Only return Poll::Pending if the inner future returns it

The waker stored in the future will only wake up and re-poll the stream
if the call to future returned Poll::Pending. Because of this, if we
didn't receive enough data to reconstruct the ResourceDiff, we need to
loop and poll the inner future again rather than return Poll::Pending
ourselves.
This commit is contained in:
Cecylia Bocovich 2023-03-22 17:41:08 -04:00
parent e1e5e798b6
commit eb43414574
1 changed files with 39 additions and 39 deletions

View File

@ -5,12 +5,11 @@
use bytes::{self, Buf, Bytes}; use bytes::{self, Buf, Bytes};
use core::pin::Pin; use core::pin::Pin;
use core::task::{Poll, ready};
use futures_util::{Stream, StreamExt}; use futures_util::{Stream, StreamExt};
use reqwest::Client; use reqwest::Client;
use std::io::{self, BufRead}; use std::io::{self, BufRead};
use std::task::{ready, Context, Poll};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::ReusableBoxFuture; use tokio_util::sync::ReusableBoxFuture;
pub mod proto; pub mod proto;
@ -43,7 +42,7 @@ impl From<io::Error> for Error {
/// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes
/// received from the connection to the rdsys backend /// received from the connection to the rdsys backend
pub struct ResourceStream { pub struct ResourceStream {
inner: ReusableBoxFuture<'static, (Option<Bytes>, ReceiverStream<Bytes>)>, inner: ReusableBoxFuture<'static, (Option<Bytes>, mpsc::Receiver<Bytes>)>,
buf: Vec<u8>, buf: Vec<u8>,
partial: Option<bytes::buf::Reader<Bytes>>, partial: Option<bytes::buf::Reader<Bytes>>,
} }
@ -51,41 +50,42 @@ pub struct ResourceStream {
impl ResourceStream { impl ResourceStream {
pub fn new(rx: mpsc::Receiver<Bytes>) -> ResourceStream { pub fn new(rx: mpsc::Receiver<Bytes>) -> ResourceStream {
ResourceStream { ResourceStream {
inner: ReusableBoxFuture::new(make_future(ReceiverStream::new(rx))), inner: ReusableBoxFuture::new(make_future(rx)),
buf: vec![], buf: vec![],
partial: None, partial: None,
} }
} }
} }
async fn make_future(mut rx: ReceiverStream<Bytes>) -> (Option<Bytes>, ReceiverStream<Bytes>) { async fn make_future(mut rx: mpsc::Receiver<Bytes>) -> (Option<Bytes>, mpsc::Receiver<Bytes>) {
let result = rx.next().await; let result = rx.recv().await;
(result, rx) (result, rx)
} }
impl Stream for ResourceStream { impl Stream for ResourceStream {
type Item = proto::ResourceDiff; type Item = proto::ResourceDiff;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let parse = let parse = |buffer: &mut bytes::buf::Reader<Bytes>,
|buffer: &mut bytes::buf::Reader<Bytes>, buf: &mut Vec<u8>| -> Result<Option<Self::Item>, Error> { buf: &mut Vec<u8>|
match buffer.read_until(b'\r', buf) { -> Result<Option<Self::Item>, Error> {
Ok(_) => match buf.pop() { match buffer.read_until(b'\r', buf) {
Some(b'\r') => match serde_json::from_slice(buf) { Ok(_) => match buf.pop() {
Ok(diff) => { Some(b'\r') => match serde_json::from_slice(buf) {
buf.clear(); Ok(diff) => {
Ok(Some(diff)) buf.clear();
} Ok(Some(diff))
Err(e) => Err(Error::JSON(e)),
},
Some(n) => {
buf.push(n);
Ok(None)
} }
None => Ok(None), Err(e) => Err(Error::JSON(e)),
}, },
Err(e) => Err(Error::Io(e)), Some(n) => {
} buf.push(n);
}; Ok(None)
}
None => Ok(None),
},
Err(e) => Err(Error::Io(e)),
}
};
// This clone is here to avoid having multiple mutable references to self // This clone is here to avoid having multiple mutable references to self
// it's not optimal performance-wise but given that these resource streams aren't large // it's not optimal performance-wise but given that these resource streams aren't large
// this feels like an acceptable trade-off to the complexity of interior mutability // this feels like an acceptable trade-off to the complexity of interior mutability
@ -98,23 +98,23 @@ impl Stream for ResourceStream {
} }
} }
self.buf = buf; self.buf = buf;
println!("HERE"); loop {
let (result, rx) = ready!(self.inner.poll(cx)); let (result, rx) = ready!(self.inner.poll(cx));
println!("HERE2"); self.inner.set(make_future(rx));
self.inner.set(make_future(rx)); match result {
match result { Some(chunk) => {
Some(chunk) => { let mut buffer = chunk.reader();
let mut buffer = chunk.reader(); match parse(&mut buffer, &mut self.buf) {
match parse(&mut buffer, &mut self.buf) { Ok(Some(diff)) => {
Ok(Some(diff)) => { self.partial = Some(buffer);
self.partial = Some(buffer); return Poll::Ready(Some(diff));
return Poll::Ready(Some(diff)); }
Ok(None) => continue,
Err(_) => return Poll::Ready(None),
} }
Ok(None) => Poll::Pending, //maybe loop here?
Err(_) => return Poll::Ready(None),
} }
None => return Poll::Ready(None),
} }
None => Poll::Ready(None),
} }
} }
} }