From 47f645a4f99ea17b76553d6b3d3c4b364759f1e6 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Mon, 20 Mar 2023 12:42:40 -0400 Subject: [PATCH] Add distributor logic for bridge updates from rdsys, handling for unsuccessful removals/updates --- crates/lox-distributor/src/main.rs | 295 +++++++++++++++++------------ 1 file changed, 174 insertions(+), 121 deletions(-) diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 3e0834a..4013bf1 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -19,6 +19,8 @@ use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH}; use rand::RngCore; use rdsys_backend::{proto::ResourceDiff, start_stream}; use serde::{Deserialize, Serialize}; +use serde_json; +use serde_with::serde_as; use std::{ convert::Infallible, env, @@ -28,12 +30,10 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; -use serde_json; -use serde_with::serde_as; use tokio::{ - spawn, + signal, spawn, sync::{broadcast, mpsc, oneshot}, - time::sleep, signal, + time::sleep, }; #[serde_as] @@ -113,10 +113,16 @@ impl LoxServerContext { ba_obj.add_openinv_bridges(bucket, &mut db_obj); } - fn add_unreachable(&self, bridgeline: BridgeLine) { + fn add_unreachable(&self, bridgeline: BridgeLine) -> bool { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); - ba_obj.bridge_unreachable(&bridgeline, &mut db_obj); + ba_obj.bridge_unreachable(&bridgeline, &mut db_obj) + } + + fn update_bridge(&self, bridgeline: BridgeLine) -> bool { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + ba_obj.bridge_update(&bridgeline, &mut db_obj) } fn advance_days_TEST(&self, num: u16) { @@ -274,7 +280,6 @@ async fn handle( } } - // Generate and return an open invitation token fn generate_invite(context: LoxServerContext) -> Response { let invite = context.gen_invite(); @@ -378,8 +383,12 @@ async fn shutdown_signal() { println!("Shut down Lox Server"); } -async fn rdsys_stream(rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>) { - tokio:: select! { +async fn rdsys_stream( + rtype: ResourceInfo, + tx: mpsc::Sender, + mut kill: broadcast::Receiver<()>, +) { + tokio::select! { start_rdsys_stream = rdsys_sender(rtype, tx) => start_rdsys_stream , _ = kill.recv() => {println!("Shut down rdsys stream"); return}, } @@ -389,10 +398,10 @@ async fn rdsys_stream(rtype: ResourceInfo, tx: mpsc::Sender, mut // in the config.json file. // TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) { - let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) + let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) .await .expect("rdsys stream initialization failed. Start rdsys or check config.json"); - sleep(Duration::from_millis(1)).await; + sleep(Duration::from_millis(1)).await; for diff in rstream { println!("Received diff: {:?}", diff); //send this through a channel tx.send(diff).await.unwrap(); @@ -400,8 +409,12 @@ async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) { } } -async fn rdsys_bridge_parser(rdsys_tx: mpsc::Sender, rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) { - tokio:: select! { +async fn rdsys_bridge_parser( + rdsys_tx: mpsc::Sender, + rx: mpsc::Receiver, + mut kill: broadcast::Receiver<()>, +) { + tokio::select! { start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser , _ = kill.recv() => {println!("Shut down bridge_parser"); return}, } @@ -420,8 +433,11 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) { - tokio:: select! { +async fn create_context_manager( + context_rx: mpsc::Receiver, + mut kill: broadcast::Receiver<()>, +) { + tokio::select! { create_context = context_manager(context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager"); return}, } @@ -431,34 +447,33 @@ async fn create_context_manager(context_rx: mpsc::Receiver, mut kill: b // that the DB can be updated from the rdsys stream and client requests // can be responded to with an updated BridgeDB state async fn context_manager(mut context_rx: mpsc::Receiver) { + let bridgedb = BridgeDb::new(); + let lox_auth = BridgeAuth::new(bridgedb.pubkey); - let bridgedb = BridgeDb::new(); - let lox_auth = BridgeAuth::new(bridgedb.pubkey); + let context = LoxServerContext { + db: Arc::new(Mutex::new(bridgedb)), + ba: Arc::new(Mutex::new(lox_auth)), + }; - let context = LoxServerContext { - db: Arc::new(Mutex::new(bridgedb)), - ba: Arc::new(Mutex::new(lox_auth)), - }; + while let Some(cmd) = context_rx.recv().await { + use Command::*; - while let Some(cmd) = context_rx.recv().await { - use Command::*; - - match cmd { - Rdsys { resourcediff } => { - for new_resource in resourcediff.new { - for pt in new_resource { - println!("A NEW RESOURCE: {:?}", pt); - let mut bucket = [ - BridgeLine::default(), - BridgeLine::default(), - BridgeLine::default(), - ]; - let mut count = 0; - for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let infostr: String = format!( + match cmd { + Rdsys { resourcediff } => { + for new_resource in resourcediff.new { + for pt in new_resource { + println!("A NEW RESOURCE: {:?}", pt); + let mut bucket = [ + BridgeLine::default(), + BridgeLine::default(), + BridgeLine::default(), + ]; + let mut count = 0; + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", resource.r#type, resource.blocked_in, @@ -469,43 +484,77 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { resource.flags, resource.params, ); - let mut info_bytes: [u8; BRIDGE_BYTES - 18] = - [0; BRIDGE_BYTES - 18]; + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - info: info_bytes, - }; + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; - println!("Now it's a bridgeline: {:?}", bridgeline); - if count < 2 { - bucket[count] = bridgeline; - count += 1; - } else { - context.add_openinv_bucket(bucket); - count = 0; - bucket = [ - BridgeLine::default(), - BridgeLine::default(), - BridgeLine::default(), - ]; - } - } - } - } - for changed_resource in resourcediff.changed { - println!("A NEW CHANGED RESOURCE: {:?}", changed_resource); - } - for gone_resource in resourcediff.gone { - for pt in gone_resource { - println!("A NEW GONE RESOURCE: {:?}", pt); - for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let infostr: String = format!( + println!("Now it's a bridgeline: {:?}", bridgeline); + if count < 2 { + bucket[count] = bridgeline; + count += 1; + } else { + context.add_openinv_bucket(bucket); + count = 0; + bucket = [ + BridgeLine::default(), + BridgeLine::default(), + BridgeLine::default(), + ]; + } + } + } + } + for changed_resource in resourcediff.changed { + for pt in changed_resource { + println!("A NEW CHANGED RESOURCE: {:?}", pt); + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( + "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", + resource.r#type, + resource.blocked_in, + resource.protocol, + resource.fingerprint, + resource.or_addresses, + resource.distribution, + resource.flags, + resource.params, + ); + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; + + println!("BridgeLine to be changed: {:?}", bridgeline); + let res = context.update_bridge(bridgeline); + if res { + println!("BridgeLine successfully updated: {:?}", bridgeline); + } else { + println!("'Changed' BridgeLine NOT UPDATED!! : {:?}", bridgeline); + //TODO probably do something else here + } + } + } + } + for gone_resource in resourcediff.gone { + for pt in gone_resource { + println!("A NEW GONE RESOURCE: {:?}", pt); + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", resource.r#type, resource.blocked_in, @@ -516,40 +565,46 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { resource.flags, resource.params, ); - let mut info_bytes: [u8; BRIDGE_BYTES - 18] = - [0; BRIDGE_BYTES - 18]; + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - info: info_bytes, - }; + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; - println!("Now it's a bridgeline: {:?}", bridgeline); - context.add_unreachable(bridgeline); - } - } - } - context.encrypt_table(); - sleep(Duration::from_millis(1)).await; - } - Request { req, sender } => { - let response = handle(context.clone(), req).await; - if let Err(e) = sender.send(response) { - eprintln!("Server Response Error: {:?}", e); - }; - sleep(Duration::from_millis(1)).await; - } - Shutdown { shutdown_sig} => { + println!("BridgeLine to be removed: {:?}", bridgeline); + let res = context.add_unreachable(bridgeline); + if res { + println!( + "BridgeLine successfully marked unreachable: {:?}", + bridgeline + ); + } else { + println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); + //TODO probably do something else here + } + } + } + } + context.encrypt_table(); + sleep(Duration::from_millis(1)).await; + } + Request { req, sender } => { + let response = handle(context.clone(), req).await; + if let Err(e) = sender.send(response) { + eprintln!("Server Response Error: {:?}", e); + }; + sleep(Duration::from_millis(1)).await; + } + Shutdown { shutdown_sig } => { println!("Sending Shutdown Signal, all threads should shutdown."); drop(shutdown_sig); println!("Shutdown Sent."); - } - } - } - - + } + } + } } // Each of the commands that the Context Manager handles @@ -564,7 +619,7 @@ enum Command { }, Shutdown { shutdown_sig: broadcast::Sender<()>, - } + }, } #[tokio::main] @@ -573,22 +628,21 @@ async fn main() { let file = File::open(&args[1]).expect("Should have been able to read config.json file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo - let rtype: ResourceInfo = serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed."); + let rtype: ResourceInfo = + serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); let shutdown_cmd_tx = rdsys_tx.clone(); - - // create the shutdown broadcast channel and clone for every thread - let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); - let kill_stream = shutdown_tx.subscribe(); - let kill_parser = shutdown_tx.subscribe(); - let kill_context= shutdown_tx.subscribe(); + let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); + let kill_stream = shutdown_tx.subscribe(); + let kill_parser = shutdown_tx.subscribe(); + let kill_context = shutdown_tx.subscribe(); -// Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx - let shutdown_handler = spawn(async move { + // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx + let shutdown_handler = spawn(async move { tokio::select! { _ = signal::ctrl_c() => { let cmd = Command::Shutdown { @@ -600,15 +654,16 @@ async fn main() { _ = shutdown_rx.recv().await; } } - }); + }); - - let context_manager = spawn(async move { create_context_manager(context_rx, kill_context).await }); + let context_manager = + spawn(async move { create_context_manager(context_rx, kill_context).await }); let (tx, rx) = mpsc::channel(32); let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await }); - let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await }); + let rdsys_resource_receiver = + spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await }); let make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = request_tx.clone(); @@ -634,13 +689,11 @@ async fn main() { if let Err(e) = graceful.await { eprintln!("server error: {}", e); } - future::join_all([ + future::join_all([ rdsys_stream_handler, rdsys_resource_receiver, context_manager, shutdown_handler, ]) .await; - - }