diff --git a/crates/lox-distributor/Cargo.toml b/crates/lox-distributor/Cargo.toml index 03930ce..7b41d7f 100644 --- a/crates/lox-distributor/Cargo.toml +++ b/crates/lox-distributor/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-channel = "1.8.0" base64 = "0.13" hyper = { version = "0.14.24", features = ["server"] } hex_fmt = "0.3" diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 6998e04..3e0834a 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -1,4 +1,3 @@ -use async_channel::{Receiver, Sender}; use futures::future; use hyper::{ body, @@ -211,6 +210,7 @@ impl LoxServerContext { } } +// Lox Request handling logic for each Lox request/protocol async fn handle( cloned_context: LoxServerContext, req: Request, @@ -274,6 +274,8 @@ async fn handle( } } + +// Generate and return an open invitation token fn generate_invite(context: LoxServerContext) -> Response { let invite = context.gen_invite(); let token = serde_json::to_string(&invite).unwrap(); @@ -294,6 +296,7 @@ fn send_reachability_cred(context: LoxServerContext) -> Response { resp } +// Return the serialized pubkeys for the Bridge Authority fn send_keys(context: LoxServerContext) -> Response { let pubkeys = context.pubkeys(); @@ -375,14 +378,17 @@ async fn shutdown_signal() { println!("Shut down Lox Server"); } -async fn rdsys_stream(rtype: ResourceInfo, tx: Sender, mut kill: broadcast::Receiver<()>) { +async fn rdsys_stream(rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>) { tokio:: select! { start_rdsys_stream = rdsys_sender(rtype, tx) => start_rdsys_stream , _ = kill.recv() => {println!("Shut down rdsys stream"); return}, } } -async fn rdsys_sender(rtype: ResourceInfo, tx: Sender) { +// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified +// in the config.json file. +// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. +async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) { let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) .await .expect("rdsys stream initialization failed. Start rdsys or check config.json"); @@ -394,14 +400,16 @@ async fn rdsys_sender(rtype: ResourceInfo, tx: Sender) { } } -async fn rdsys_bridge_parser(rdsys_tx: mpsc::Sender, rx: Receiver, mut kill: broadcast::Receiver<()>) { +async fn rdsys_bridge_parser(rdsys_tx: mpsc::Sender, rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) { tokio:: select! { start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser , _ = kill.recv() => {println!("Shut down bridge_parser"); return}, } } -async fn parse_bridges(rdsys_tx: mpsc::Sender, rx: Receiver) { +// Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the +// Context Manager to be parsed and added to the BridgeDB +async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { let resourcediff = rx.recv().await.unwrap(); let cmd = Command::Rdsys { @@ -419,8 +427,10 @@ async fn create_context_manager(context_rx: mpsc::Receiver, mut kill: b } } +// Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring +// that the DB can be updated from the rdsys stream and client requests +// can be responded to with an updated BridgeDB state async fn context_manager(mut context_rx: mpsc::Receiver) { - // pass in distribution of open invite vs. hot spare buckets? let bridgedb = BridgeDb::new(); let lox_auth = BridgeAuth::new(bridgedb.pubkey); @@ -531,12 +541,18 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { }; sleep(Duration::from_millis(1)).await; } + Shutdown { shutdown_sig} => { + println!("Sending Shutdown Signal, all threads should shutdown."); + drop(shutdown_sig); + println!("Shutdown Sent."); + } } } } +// Each of the commands that the Context Manager handles #[derive(Debug)] enum Command { Rdsys { @@ -546,6 +562,9 @@ enum Command { req: Request, sender: oneshot::Sender, Infallible>>, }, + Shutdown { + shutdown_sig: broadcast::Sender<()>, + } } #[tokio::main] @@ -554,14 +573,15 @@ async fn main() { let file = File::open(&args[1]).expect("Should have been able to read config.json file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo - let rtype: ResourceInfo = serde_json::from_reader(reader).unwrap(); + let rtype: ResourceInfo = serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); + let shutdown_cmd_tx = rdsys_tx.clone(); - // create the shutdown broadcast channel + // create the shutdown broadcast channel and clone for every thread let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); let kill_stream = shutdown_tx.subscribe(); let kill_parser = shutdown_tx.subscribe(); @@ -571,20 +591,21 @@ async fn main() { let shutdown_handler = spawn(async move { tokio::select! { _ = signal::ctrl_c() => { - drop(shutdown_tx); - println!("Kill Sent, all threads should shutdown."); + let cmd = Command::Shutdown { + shutdown_sig: shutdown_tx, + }; + shutdown_cmd_tx.send(cmd).await.unwrap(); + sleep(Duration::from_secs(1)).await; _ = shutdown_rx.recv().await; - println!("Receiving shutdown signals. . ."); } } }); - let context_manager = spawn(async move { create_context_manager(context_rx, kill_context).await }); - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = mpsc::channel(32); let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await }); let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });