From a2d50e29d8614784169a49bb09ebb34ca548adc4 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Wed, 22 Mar 2023 19:14:56 -0400 Subject: [PATCH] Incorporate impl Stream for ResourceStream changes from rdsys-api-backend --- crates/lox-distributor/src/main.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 4013bf1..e528e81 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -1,4 +1,5 @@ use futures::future; +use futures::StreamExt; use hyper::{ body, body::Bytes, @@ -30,6 +31,7 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; + use tokio::{ signal, spawn, sync::{broadcast, mpsc, oneshot}, @@ -398,11 +400,11 @@ async fn rdsys_stream( // in the config.json file. // TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) { - let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) + let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) .await .expect("rdsys stream initialization failed. Start rdsys or check config.json"); sleep(Duration::from_millis(1)).await; - for diff in rstream { + while let Some(diff) = rstream.next().await { println!("Received diff: {:?}", diff); //send this through a channel tx.send(diff).await.unwrap(); sleep(Duration::from_secs(1)).await;