Use a ReusableBoxFuture to store ReceiverStream

This commit is contained in:
Cecylia Bocovich 2023-03-22 15:05:05 -04:00
parent 0b0324c487
commit e1e5e798b6
2 changed files with 17 additions and 7 deletions

View File

@ -14,3 +14,4 @@ tokio = { version = "1", features = ["macros"]}
reqwest = { version = "0.11", features = ["stream"]} reqwest = { version = "0.11", features = ["stream"]}
tokio-stream = "0.1.12" tokio-stream = "0.1.12"
futures = "0.3.27" futures = "0.3.27"
tokio-util = "0.7.7"

View File

@ -5,12 +5,13 @@
use bytes::{self, Buf, Bytes}; use bytes::{self, Buf, Bytes};
use core::pin::Pin; use core::pin::Pin;
use core::task::Poll; use core::task::{Poll, ready};
use futures_util::{Stream, StreamExt}; use futures_util::{Stream, StreamExt};
use reqwest::Client; use reqwest::Client;
use std::io::{self, BufRead}; use std::io::{self, BufRead};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::ReusableBoxFuture;
pub mod proto; pub mod proto;
@ -42,7 +43,7 @@ impl From<io::Error> for Error {
/// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes
/// received from the connection to the rdsys backend /// received from the connection to the rdsys backend
pub struct ResourceStream { pub struct ResourceStream {
rx: ReceiverStream<Bytes>, inner: ReusableBoxFuture<'static, (Option<Bytes>, ReceiverStream<Bytes>)>,
buf: Vec<u8>, buf: Vec<u8>,
partial: Option<bytes::buf::Reader<Bytes>>, partial: Option<bytes::buf::Reader<Bytes>>,
} }
@ -50,13 +51,18 @@ pub struct ResourceStream {
impl ResourceStream { impl ResourceStream {
pub fn new(rx: mpsc::Receiver<Bytes>) -> ResourceStream { pub fn new(rx: mpsc::Receiver<Bytes>) -> ResourceStream {
ResourceStream { ResourceStream {
rx: ReceiverStream::new(rx), inner: ReusableBoxFuture::new(make_future(ReceiverStream::new(rx))),
buf: vec![], buf: vec![],
partial: None, partial: None,
} }
} }
} }
async fn make_future(mut rx: ReceiverStream<Bytes>) -> (Option<Bytes>, ReceiverStream<Bytes>) {
let result = rx.next().await;
(result, rx)
}
impl Stream for ResourceStream { impl Stream for ResourceStream {
type Item = proto::ResourceDiff; type Item = proto::ResourceDiff;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll<Option<Self::Item>> {
@ -92,8 +98,12 @@ impl Stream for ResourceStream {
} }
} }
self.buf = buf; self.buf = buf;
match Pin::new(&mut self.rx).poll_next(cx) { println!("HERE");
Poll::Ready(Some(chunk)) => { let (result, rx) = ready!(self.inner.poll(cx));
println!("HERE2");
self.inner.set(make_future(rx));
match result {
Some(chunk) => {
let mut buffer = chunk.reader(); let mut buffer = chunk.reader();
match parse(&mut buffer, &mut self.buf) { match parse(&mut buffer, &mut self.buf) {
Ok(Some(diff)) => { Ok(Some(diff)) => {
@ -104,8 +114,7 @@ impl Stream for ResourceStream {
Err(_) => return Poll::Ready(None), Err(_) => return Poll::Ready(None),
} }
} }
Poll::Ready(None) => Poll::Ready(None), None => Poll::Ready(None),
Poll::Pending => Poll::Pending,
} }
} }
} }