diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index cdaa5a0..e30c954 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -14,3 +14,4 @@ tokio = { version = "1", features = ["macros"]} reqwest = { version = "0.11", features = ["stream"]} tokio-stream = "0.1.12" futures = "0.3.27" +tokio-util = "0.7.7" diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 7ac1c52..75e8f11 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -5,12 +5,13 @@ use bytes::{self, Buf, Bytes}; use core::pin::Pin; -use core::task::Poll; +use core::task::{Poll, ready}; use futures_util::{Stream, StreamExt}; use reqwest::Client; use std::io::{self, BufRead}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::ReusableBoxFuture; pub mod proto; @@ -42,7 +43,7 @@ impl From for Error { /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// received from the connection to the rdsys backend pub struct ResourceStream { - rx: ReceiverStream, + inner: ReusableBoxFuture<'static, (Option, ReceiverStream)>, buf: Vec, partial: Option>, } @@ -50,13 +51,18 @@ pub struct ResourceStream { impl ResourceStream { pub fn new(rx: mpsc::Receiver) -> ResourceStream { ResourceStream { - rx: ReceiverStream::new(rx), + inner: ReusableBoxFuture::new(make_future(ReceiverStream::new(rx))), buf: vec![], partial: None, } } } +async fn make_future(mut rx: ReceiverStream) -> (Option, ReceiverStream) { + let result = rx.next().await; + (result, rx) +} + impl Stream for ResourceStream { type Item = proto::ResourceDiff; fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll> { @@ -92,8 +98,12 @@ impl Stream for ResourceStream { } } self.buf = buf; - match Pin::new(&mut self.rx).poll_next(cx) { - Poll::Ready(Some(chunk)) => { + println!("HERE"); + let (result, rx) = ready!(self.inner.poll(cx)); + println!("HERE2"); + self.inner.set(make_future(rx)); + match result { + Some(chunk) => { let mut buffer = chunk.reader(); match parse(&mut buffer, &mut self.buf) { Ok(Some(diff)) => { @@ -104,8 +114,7 @@ impl Stream for ResourceStream { Err(_) => return Poll::Ready(None), } } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + None => Poll::Ready(None), } } }