Incorporate impl Stream for ResourceStream changes from rdsys-api-backend

This commit is contained in:
onyinyang 2023-03-22 19:14:56 -04:00
parent 47f645a4f9
commit a2d50e29d8
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
1 changed files with 4 additions and 2 deletions

View File

@ -1,4 +1,5 @@
use futures::future; use futures::future;
use futures::StreamExt;
use hyper::{ use hyper::{
body, body,
body::Bytes, body::Bytes,
@ -30,6 +31,7 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration, time::Duration,
}; };
use tokio::{ use tokio::{
signal, spawn, signal, spawn,
sync::{broadcast, mpsc, oneshot}, sync::{broadcast, mpsc, oneshot},
@ -398,11 +400,11 @@ async fn rdsys_stream(
// in the config.json file. // in the config.json file.
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. // TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender<ResourceDiff>) { async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender<ResourceDiff>) {
let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
.await .await
.expect("rdsys stream initialization failed. Start rdsys or check config.json"); .expect("rdsys stream initialization failed. Start rdsys or check config.json");
sleep(Duration::from_millis(1)).await; sleep(Duration::from_millis(1)).await;
for diff in rstream { while let Some(diff) = rstream.next().await {
println!("Received diff: {:?}", diff); //send this through a channel println!("Received diff: {:?}", diff); //send this through a channel
tx.send(diff).await.unwrap(); tx.send(diff).await.unwrap();
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;