From 22ef5d157d459d4c5e2e808ebcfa0b285b73084a Mon Sep 17 00:00:00 2001 From: onyinyang Date: Fri, 24 Mar 2023 11:31:01 -0400 Subject: [PATCH] Updating bridges to make sense --- crates/lox-distributor/src/main.rs | 31 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index e528e81..ff43253 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -124,7 +124,7 @@ impl LoxServerContext { fn update_bridge(&self, bridgeline: BridgeLine) -> bool { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); - ba_obj.bridge_update(&bridgeline, &mut db_obj) + ba_obj.bridge_update(&bridgeline) } fn advance_days_TEST(&self, num: u16) { @@ -385,29 +385,28 @@ async fn shutdown_signal() { println!("Shut down Lox Server"); } +// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified +// in the config.json file. +// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_stream( rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>, ) { - tokio::select! { - start_rdsys_stream = rdsys_sender(rtype, tx) => start_rdsys_stream , - _ = kill.recv() => {println!("Shut down rdsys stream"); return}, - } -} - -// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified -// in the config.json file. -// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. -async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) { let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) .await .expect("rdsys stream initialization failed. Start rdsys or check config.json"); - sleep(Duration::from_millis(1)).await; - while let Some(diff) = rstream.next().await { - println!("Received diff: {:?}", diff); //send this through a channel - tx.send(diff).await.unwrap(); - sleep(Duration::from_secs(1)).await; + loop { + tokio::select! { + res = rstream.next() => { + match res { + Some(diff) => tx.send(diff).await.unwrap(), + None => return, + } + }, + _ = kill.recv() => {println!("Shut down rdsys stream"); return}, + + } } }