From eb43414574c78893f01ff6aa2ec240229b8e9339 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Wed, 22 Mar 2023 17:41:08 -0400 Subject: [PATCH] Only return Poll::Pending if the inner future returns it The waker stored in the future will only wake up and re-poll the stream if the call to future returned Poll::Pending. Because of this, if we didn't receive enough data to reconstruct the ResourceDiff, we need to loop and poll the inner future again rather than return Poll::Pending ourselves. --- crates/rdsys-backend/src/lib.rs | 78 ++++++++++++++++----------------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 75e8f11..ae8c7db 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -5,12 +5,11 @@ use bytes::{self, Buf, Bytes}; use core::pin::Pin; -use core::task::{Poll, ready}; use futures_util::{Stream, StreamExt}; use reqwest::Client; use std::io::{self, BufRead}; +use std::task::{ready, Context, Poll}; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::ReusableBoxFuture; pub mod proto; @@ -43,7 +42,7 @@ impl From for Error { /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// received from the connection to the rdsys backend pub struct ResourceStream { - inner: ReusableBoxFuture<'static, (Option, ReceiverStream)>, + inner: ReusableBoxFuture<'static, (Option, mpsc::Receiver)>, buf: Vec, partial: Option>, } @@ -51,41 +50,42 @@ pub struct ResourceStream { impl ResourceStream { pub fn new(rx: mpsc::Receiver) -> ResourceStream { ResourceStream { - inner: ReusableBoxFuture::new(make_future(ReceiverStream::new(rx))), + inner: ReusableBoxFuture::new(make_future(rx)), buf: vec![], partial: None, } } } -async fn make_future(mut rx: ReceiverStream) -> (Option, ReceiverStream) { - let result = rx.next().await; +async fn make_future(mut rx: mpsc::Receiver) -> (Option, mpsc::Receiver) { + let result = rx.recv().await; (result, rx) } impl Stream for ResourceStream { type Item = proto::ResourceDiff; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll> { - let parse = - |buffer: &mut bytes::buf::Reader, buf: &mut Vec| -> Result, Error> { - match buffer.read_until(b'\r', buf) { - Ok(_) => match buf.pop() { - Some(b'\r') => match serde_json::from_slice(buf) { - Ok(diff) => { - buf.clear(); - Ok(Some(diff)) - } - Err(e) => Err(Error::JSON(e)), - }, - Some(n) => { - buf.push(n); - Ok(None) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let parse = |buffer: &mut bytes::buf::Reader, + buf: &mut Vec| + -> Result, Error> { + match buffer.read_until(b'\r', buf) { + Ok(_) => match buf.pop() { + Some(b'\r') => match serde_json::from_slice(buf) { + Ok(diff) => { + buf.clear(); + Ok(Some(diff)) } - None => Ok(None), + Err(e) => Err(Error::JSON(e)), }, - Err(e) => Err(Error::Io(e)), - } - }; + Some(n) => { + buf.push(n); + Ok(None) + } + None => Ok(None), + }, + Err(e) => Err(Error::Io(e)), + } + }; // This clone is here to avoid having multiple mutable references to self // it's not optimal performance-wise but given that these resource streams aren't large // this feels like an acceptable trade-off to the complexity of interior mutability @@ -98,23 +98,23 @@ impl Stream for ResourceStream { } } self.buf = buf; - println!("HERE"); - let (result, rx) = ready!(self.inner.poll(cx)); - println!("HERE2"); - self.inner.set(make_future(rx)); - match result { - Some(chunk) => { - let mut buffer = chunk.reader(); - match parse(&mut buffer, &mut self.buf) { - Ok(Some(diff)) => { - self.partial = Some(buffer); - return Poll::Ready(Some(diff)); + loop { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Some(chunk) => { + let mut buffer = chunk.reader(); + match parse(&mut buffer, &mut self.buf) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Poll::Ready(Some(diff)); + } + Ok(None) => continue, + Err(_) => return Poll::Ready(None), } - Ok(None) => Poll::Pending, //maybe loop here? - Err(_) => return Poll::Ready(None), } + None => return Poll::Ready(None), } - None => Poll::Ready(None), } } }