Updating bridges to make sense

This commit is contained in:
onyinyang 2023-03-24 11:31:01 -04:00
parent 3f5d497573
commit 22ef5d157d
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
1 changed files with 15 additions and 16 deletions

View File

@ -124,7 +124,7 @@ impl LoxServerContext {
fn update_bridge(&self, bridgeline: BridgeLine) -> bool { fn update_bridge(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap(); let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap(); let mut db_obj = self.db.lock().unwrap();
ba_obj.bridge_update(&bridgeline, &mut db_obj) ba_obj.bridge_update(&bridgeline)
} }
fn advance_days_TEST(&self, num: u16) { fn advance_days_TEST(&self, num: u16) {
@ -385,29 +385,28 @@ async fn shutdown_signal() {
println!("Shut down Lox Server"); println!("Shut down Lox Server");
} }
// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified
// in the config.json file.
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
async fn rdsys_stream( async fn rdsys_stream(
rtype: ResourceInfo, rtype: ResourceInfo,
tx: mpsc::Sender<ResourceDiff>, tx: mpsc::Sender<ResourceDiff>,
mut kill: broadcast::Receiver<()>, mut kill: broadcast::Receiver<()>,
) { ) {
tokio::select! {
start_rdsys_stream = rdsys_sender(rtype, tx) => start_rdsys_stream ,
_ = kill.recv() => {println!("Shut down rdsys stream"); return},
}
}
// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified
// in the config.json file.
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender<ResourceDiff>) {
let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
.await .await
.expect("rdsys stream initialization failed. Start rdsys or check config.json"); .expect("rdsys stream initialization failed. Start rdsys or check config.json");
sleep(Duration::from_millis(1)).await; loop {
while let Some(diff) = rstream.next().await { tokio::select! {
println!("Received diff: {:?}", diff); //send this through a channel res = rstream.next() => {
tx.send(diff).await.unwrap(); match res {
sleep(Duration::from_secs(1)).await; Some(diff) => tx.send(diff).await.unwrap(),
None => return,
}
},
_ = kill.recv() => {println!("Shut down rdsys stream"); return},
}
} }
} }