diff --git a/crates/lox-distributor/Cargo.toml b/crates/lox-distributor/Cargo.toml index b89e896..03930ce 100644 --- a/crates/lox-distributor/Cargo.toml +++ b/crates/lox-distributor/Cargo.toml @@ -10,12 +10,12 @@ async-channel = "1.8.0" base64 = "0.13" hyper = { version = "0.14.24", features = ["server"] } hex_fmt = "0.3" +futures = "0.3.26" tokio = { version = "1", features = ["full", "macros", "signal"] } rand = "0.7" serde = { version = "1.0", features = ["derive"] } serde_with = "1.9.1" serde_json = "1.0.87" -time = "0.2" lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"} rdsys_backend = { git = "https://gitlab.torproject.org/cohosh/rdsys-backend-api.git"} diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 4e4aa1a..b861b9c 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -1,3 +1,5 @@ +use async_channel::{Sender, Receiver}; +use futures::future; use hyper::{ body, body::Bytes, @@ -7,7 +9,13 @@ use hyper::{ Body, Method, Request, Response, Server, StatusCode, }; use lox::bridge_table::{BridgeLine, BRIDGE_BYTES, ENC_BUCKET_BYTES}; -use lox::proto; +use lox::{ + proto::{ + blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite, + redeem_invite, trust_promotion, + }, + IssuerPubKey, +}; use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH}; use rand::RngCore; use rdsys_backend::{proto::ResourceDiff, start_stream, ResourceStream}; @@ -19,11 +27,12 @@ use std::{ io::BufReader, net::SocketAddr, sync::{Arc, Mutex}, + time::Duration, }; use serde_json; use serde_with::serde_as; -use tokio::{spawn, sync::mpsc}; +use tokio::{spawn, time::sleep, sync::mpsc}; #[serde_as] #[derive(Serialize, Deserialize)] @@ -95,6 +104,110 @@ struct LoxServerContext { ba: Arc>, } +impl LoxServerContext { + fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + ba_obj.add_openinv_bridges(bucket, &mut db_obj); + } + + fn add_unreachable(&self, bridgeline: BridgeLine) { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + ba_obj.bridge_unreachable(&bridgeline, &mut db_obj); + } + + fn advance_days_TEST(&self, num: u16) { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.advance_days(num); // FOR TESTING ONLY + println!("Today's date according to server: {}", ba_obj.today()); + } + + fn encrypt_table(&self) -> Vec<[u8; ENC_BUCKET_BYTES]> { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.enc_bridge_table().clone() + } + + fn pubkeys(&self) -> Vec { + let ba_obj = self.ba.lock().unwrap(); + // vector of public keys (to serialize) + vec![ + ba_obj.lox_pub.clone(), + ba_obj.migration_pub.clone(), + ba_obj.migrationkey_pub.clone(), + ba_obj.reachability_pub.clone(), + ba_obj.invitation_pub.clone(), + ] + } + + fn gen_invite(&self) -> Invite { + let obj = self.db.lock().unwrap(); + return Invite { + invite: obj.invite(), + }; + } + + fn open_inv(&self, req: open_invite::Request) -> open_invite::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_open_invite(req).unwrap() + } + + fn trust_promo(&self, req: trust_promotion::Request) -> trust_promotion::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_trust_promotion(req).unwrap() + } + + fn trust_migration(&self, req: migration::Request) -> migration::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_migration(req).unwrap() + } + + fn level_up(&self, req: level_up::Request) -> level_up::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_level_up(req).unwrap() + } + + fn issue_invite(&self, req: issue_invite::Request) -> issue_invite::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_issue_invite(req).unwrap() + } + + fn redeem_invite(&self, req: redeem_invite::Request) -> redeem_invite::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_redeem_invite(req).unwrap() + } + + fn check_blockage(&self, req: check_blockage::Request) -> check_blockage::Response { + let mut ba_obj = self.ba.lock().unwrap(); + // Created 5 buckets initially, so we will add 5 hot spares (for migration) and + // block all of the existing buckets to trigger migration table propagation + // FOR TESTING ONLY, ADD 5 NEW Buckets + for _ in 0..5 { + let bucket = [random(), random(), random()]; + ba_obj.add_spare_bucket(bucket); + } + ba_obj.enc_bridge_table(); + + // FOR TESTING ONLY, BLOCK ALL BRIDGES + let mut db_obj = self.db.lock().unwrap(); + for index in 0..5 { + let b0 = ba_obj.bridge_table.buckets[index][0]; + let b1 = ba_obj.bridge_table.buckets[index][1]; + let b2 = ba_obj.bridge_table.buckets[index][2]; + ba_obj.bridge_unreachable(&b0, &mut db_obj); + ba_obj.bridge_unreachable(&b1, &mut db_obj); + ba_obj.bridge_unreachable(&b2, &mut db_obj); + } + ba_obj.enc_bridge_table(); + ba_obj.handle_check_blockage(req).unwrap() + } + + fn blockage_migration(&self, req: blockage_migration::Request) -> blockage_migration::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_blockage_migration(req).unwrap() + } +} + async fn handle( context: LoxServerContext, // addr: SocketAddr, @@ -110,43 +223,41 @@ async fn handle( .body(Body::from("Allow POST")) .unwrap()), _ => match (req.method(), req.uri().path()) { - (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)), - (&Method::GET, "/reachability") => { - Ok::<_, Infallible>(send_reachability_cred(context.ba)) - } - (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)), + (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context)), + (&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(context)), + (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context)), (&Method::POST, "/openreq") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_open_cred(bytes, context.ba) + verify_and_send_open_cred(bytes, context) }), (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_trust_promo(bytes, context.ba) + verify_and_send_trust_promo(bytes, context) }), (&Method::POST, "/trustmig") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_trust_migration(bytes, context.ba) + verify_and_send_trust_migration(bytes, context) }), (&Method::POST, "/levelup") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_level_up(bytes, context.ba) + verify_and_send_level_up(bytes, context) }), (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_issue_invite(bytes, context.ba) + verify_and_send_issue_invite(bytes, context) }), (&Method::POST, "/redeem") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_redeem_invite(bytes, context.ba) + verify_and_send_redeem_invite(bytes, context) }), (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); // TEST ONLY: Block all existing bridges and add new ones for migration - verify_and_send_check_blockage(bytes, context.ba, context.db) + verify_and_send_check_blockage(bytes, context) }), (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_blockage_migration(bytes, context.ba) + verify_and_send_blockage_migration(bytes, context) }), _ => { // Return 404 not found response. @@ -159,12 +270,8 @@ async fn handle( } } -fn generate_invite(db: Arc>) -> Response { - let obj = db.lock().unwrap(); - let invite = Invite { - invite: obj.invite(), - }; - +fn generate_invite(context: LoxServerContext) -> Response { + let invite = context.gen_invite(); let token = serde_json::to_string(&invite).unwrap(); let mut resp = Response::new(Body::from(token)); resp.headers_mut() @@ -173,11 +280,9 @@ fn generate_invite(db: Arc>) -> Response { } // Return the serialized encrypted bridge table -fn send_reachability_cred(ba: Arc>) -> Response { - let mut ba_obj = ba.lock().unwrap(); - ba_obj.advance_days(85); // FOR TESTING ONLY - println!("Today's date according to server: {}", ba_obj.today()); - let enc_table = ba_obj.enc_bridge_table().clone(); +fn send_reachability_cred(context: LoxServerContext) -> Response { + context.advance_days_TEST(85); // FOR TESTING ONLY + let enc_table = context.encrypt_table(); let etable = EncBridgeTable { etable: enc_table }; let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap())); resp.headers_mut() @@ -185,113 +290,69 @@ fn send_reachability_cred(ba: Arc>) -> Response { resp } -fn send_keys(ba: Arc>) -> Response { - let ba_obj = ba.lock().unwrap(); - // vector of public keys (to serialize) - let ba_obj_pubkeys = vec![ - &ba_obj.lox_pub, - &ba_obj.migration_pub, - &ba_obj.migrationkey_pub, - &ba_obj.reachability_pub, - &ba_obj.invitation_pub, - ]; - println!("Today's date according to server: {}", ba_obj.today()); +fn send_keys(context: LoxServerContext) -> Response { + let pubkeys = context.pubkeys(); - let mut resp = Response::new(Body::from(serde_json::to_string(&ba_obj_pubkeys).unwrap())); + let mut resp = Response::new(Body::from(serde_json::to_string(&pubkeys).unwrap())); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp } -fn verify_and_send_open_cred(request: Bytes, ba: Arc>) -> Response { - let req: proto::open_invite::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - let response = ba_obj.handle_open_invite(req).unwrap(); +fn verify_and_send_open_cred(request: Bytes, context: LoxServerContext) -> Response { + let req: open_invite::Request = serde_json::from_slice(&request).unwrap(); + let response = context.open_inv(req); let open_invite_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(open_invite_resp_str) } -fn verify_and_send_trust_promo(request: Bytes, ba: Arc>) -> Response { - let req: proto::trust_promotion::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - ba_obj.advance_days(31); // FOR TESTING ONLY - println!("Today's date according to server: {}", ba_obj.today()); - let response = ba_obj.handle_trust_promotion(req).unwrap(); +fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) -> Response { + let req: trust_promotion::Request = serde_json::from_slice(&request).unwrap(); + context.advance_days_TEST(31); // FOR TESTING ONLY + let response = context.trust_promo(req); let trust_promo_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(trust_promo_resp_str) } -fn verify_and_send_trust_migration(request: Bytes, ba: Arc>) -> Response { - let req: proto::migration::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - let response = ba_obj.handle_migration(req).unwrap(); +fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response { + let req: migration::Request = serde_json::from_slice(&request).unwrap(); + let response = context.trust_migration(req); let resp_str = serde_json::to_string(&response).unwrap(); prepare_header(resp_str) } -fn verify_and_send_level_up(request: Bytes, ba: Arc>) -> Response { - let req: proto::level_up::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - let response = ba_obj.handle_level_up(req).unwrap(); +fn verify_and_send_level_up(request: Bytes, context: LoxServerContext) -> Response { + let req: level_up::Request = serde_json::from_slice(&request).unwrap(); + let response = context.level_up(req); let level_up_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(level_up_resp_str) } -fn verify_and_send_issue_invite(request: Bytes, ba: Arc>) -> Response { - let req: proto::issue_invite::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - let response = ba_obj.handle_issue_invite(req).unwrap(); +fn verify_and_send_issue_invite(request: Bytes, context: LoxServerContext) -> Response { + let req: issue_invite::Request = serde_json::from_slice(&request).unwrap(); + let response = context.issue_invite(req); let issue_invite_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(issue_invite_resp_str) } -fn verify_and_send_redeem_invite(request: Bytes, ba: Arc>) -> Response { - let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - let response = ba_obj.handle_redeem_invite(req).unwrap(); +fn verify_and_send_redeem_invite(request: Bytes, context: LoxServerContext) -> Response { + let req: redeem_invite::Request = serde_json::from_slice(&request).unwrap(); + let response = context.redeem_invite(req); let redeem_invite_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(redeem_invite_resp_str) } -fn verify_and_send_check_blockage( - request: Bytes, - ba: Arc>, - db: Arc>, -) -> Response { - let req: proto::check_blockage::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - // Created 5 buckets initially, so we will add 5 hot spares (for migration) and - // block all of the existing buckets to trigger migration table propagation - // FOR TESTING ONLY, ADD 5 NEW Buckets - for _ in 0..5 { - let bucket = [random(), random(), random()]; - ba_obj.add_spare_bucket(bucket); - } - ba_obj.enc_bridge_table(); +fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext) -> Response { + let req: check_blockage::Request = serde_json::from_slice(&request).unwrap(); - // FOR TESTING ONLY, BLOCK ALL BRIDGES - let mut db_obj = db.lock().unwrap(); - for index in 0..5 { - let b0 = ba_obj.bridge_table.buckets[index][0]; - let b1 = ba_obj.bridge_table.buckets[index][1]; - let b2 = ba_obj.bridge_table.buckets[index][2]; - ba_obj.bridge_unreachable(&b0, &mut db_obj); - ba_obj.bridge_unreachable(&b1, &mut db_obj); - ba_obj.bridge_unreachable(&b2, &mut db_obj); - } - ba_obj.enc_bridge_table(); - let response = ba_obj.handle_check_blockage(req).unwrap(); + let response = context.check_blockage(req); let check_blockage_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(check_blockage_resp_str) } -fn verify_and_send_blockage_migration( - request: Bytes, - ba: Arc>, -) -> Response { - let req: proto::blockage_migration::Request = serde_json::from_slice(&request).unwrap(); - let mut ba_obj = ba.lock().unwrap(); - let response = ba_obj.handle_blockage_migration(req).unwrap(); +fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response { + let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap(); + let response = context.blockage_migration(req); let resp_str = serde_json::to_string(&response).unwrap(); prepare_header(resp_str) } @@ -309,8 +370,32 @@ async fn shutdown_signal() { .expect("failed to listen for ctrl+c signal"); } -// Initial bridgedb setup then: -// Listen for updates and return new bridges to be added to the bridged +async fn rdsys_sender(rstream: ResourceStream, tx: Sender) { + for diff in rstream { + println!("Received diff: {:?}", diff); //send this through a channel + tx.send(diff).await.unwrap(); + sleep(Duration::from_secs(1)).await; + } +} + +async fn parse_bridges(rdsys_tx: Sender, rx: Receiver) { + loop { + let resourcediff = rx.recv().await.unwrap(); + let cmd = Command::Rdsys { resourcediff: resourcediff, }; + rdsys_tx.send(cmd).await.unwrap(); + } + +} + +#[derive(Debug)] +enum Command { + Rdsys { + resourcediff: ResourceDiff, + }, + Request { + request: Request, + } +} // Run with cargo run -- config.json #[tokio::main(worker_threads = 2)] @@ -321,6 +406,10 @@ async fn main() { // Read the JSON contents of the file as a ResourceInfo let rtype: ResourceInfo = serde_json::from_reader(reader).unwrap(); +let (rdsys_tx, mut context_rx) = mpsc::channel(32); +let request_tx = rdsys_tx.clone(); + + let context_manager = spawn(async move { // pass in distribution of open invite vs. hot spare buckets? let bridgedb = BridgeDb::new(); let lox_auth = BridgeAuth::new(bridgedb.pubkey); @@ -329,141 +418,131 @@ async fn main() { db: Arc::new(Mutex::new(bridgedb)), ba: Arc::new(Mutex::new(lox_auth)), }; - //Sender is resource stream and receiver is bridgedb function (add_openinv_bridges) - let (tx, rx) = async_channel::bounded(3); - // to populate the bridge db - let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) - .await - .unwrap(); - spawn(async move { - for diff in rstream { - println!("Received diff: {:?}", diff); //send this through a channel - tx.send(diff).await.expect("can not add to bridgedb)") - } - }); - let context_clone = context.clone(); - let _ = spawn(async move { - while let resourcediff = rx.recv().await.unwrap() { - // spawn(async move { - for new_resource in resourcediff.new { - for pt in new_resource { - println!("A NEW RESOURCE: {:?}", pt); - let mut bucket = [ - BridgeLine::default(), - BridgeLine::default(), - BridgeLine::default(), - ]; - let mut count = 0; - for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes()); - let infostr: String = format!( - "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", - resource.r#type, - resource.blocked_in, - resource.protocol, - resource.fingerprint, - resource.or_addresses, - resource.distribution, - resource.flags, - resource.params, - ); - let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; + while let Some(cmd) = context_rx.recv().await { + use Command::*; - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - info: info_bytes, - }; - - println!("Now it's a bridgeline: {:?}", bridgeline); - if count < 2 { - bucket[count] = bridgeline; - count += 1; - } else { - let mut ba_obj = context_clone.ba.lock().unwrap(); - let mut db_obj = context_clone.db.lock().unwrap(); - ba_obj.add_openinv_bridges(bucket, &mut db_obj); - count = 0; - bucket = [ + match cmd { + Rdsys {resourcediff} => { + for new_resource in resourcediff.new { + for pt in new_resource { + println!("A NEW RESOURCE: {:?}", pt); + let mut bucket = [ BridgeLine::default(), BridgeLine::default(), BridgeLine::default(), ]; + let mut count = 0; + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( + "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", + resource.r#type, + resource.blocked_in, + resource.protocol, + resource.fingerprint, + resource.or_addresses, + resource.distribution, + resource.flags, + resource.params, + ); + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; + + println!("Now it's a bridgeline: {:?}", bridgeline); + if count < 2 { + bucket[count] = bridgeline; + count += 1; + } else { + context.add_openinv_bucket(bucket); + count = 0; + bucket = [ + BridgeLine::default(), + BridgeLine::default(), + BridgeLine::default(), + ]; + } + } } } - } - } - for changed_resource in resourcediff.changed { - println!("A NEW CHANGED RESOURCE: {:?}", changed_resource); - } - for gone_resource in resourcediff.gone { - for pt in gone_resource { - println!("A NEW GONE RESOURCE: {:?}", pt); - let mut unreachable_bridge = BridgeLine::default(); - for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes()); - let infostr: String = format!( - "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", - resource.r#type, - resource.blocked_in, - resource.protocol, - resource.fingerprint, - resource.or_addresses, - resource.distribution, - resource.flags, - resource.params, - ); - let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; - - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - info: info_bytes, - }; - - println!("Now it's a bridgeline: {:?}", bridgeline); - let mut ba_obj = context_clone.ba.lock().unwrap(); - let mut db_obj = context_clone.db.lock().unwrap(); - ba_obj.bridge_unreachable(&unreachable_bridge, &mut db_obj); + for changed_resource in resourcediff.changed { + println!("A NEW CHANGED RESOURCE: {:?}", changed_resource); } + for gone_resource in resourcediff.gone { + for pt in gone_resource { + println!("A NEW GONE RESOURCE: {:?}", pt); + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( + "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", + resource.r#type, + resource.blocked_in, + resource.protocol, + resource.fingerprint, + resource.or_addresses, + resource.distribution, + resource.flags, + resource.params, + ); + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; + + println!("Now it's a bridgeline: {:?}", bridgeline); + context.add_unreachable(bridgeline); + } + } + } + context.encrypt_table(); } + Request {request} => { + } - // Create the encrypted bridge table - let mut ba_obj = context_clone.ba.lock().unwrap(); - ba_obj.enc_bridge_table(); } +} + }); - // let new_bridgedb = task::spawn(load_bridges()); - // Create and initialize a new db and lox_auth - // Make 3 x num_buckets open invitation bridges, in sets of 3 -/* for _ in 0..num_buckets { - let bucket = [random(), random(), random()]; - lox_auth.add_openinv_bridges(bucket, &mut bridgedb); - } -*/ + //Sender is resource stream and receiver is bridgedb function (add_openinv_bridges) + let (tx, mut rx) = async_channel::unbounded(); + // to populate the bridge db + let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) + .await + .unwrap(); + let rdsys_sender_handle = spawn(async {rdsys_sender(rstream, tx).await }); + let bridge_parser = spawn( async {parse_bridges(rdsys_tx, rx).await}); + future::join_all([rdsys_sender_handle, bridge_parser]).await; +/* let new_service = make_service_fn(move |_conn: &AddrStream| { - let context = context.clone(); - let service = service_fn(move |req| { - handle(context.clone(), req) - }); + let service = service_fn(move |req| handle(context, req)); async move { Ok::<_, Infallible>(service) } }); let addr = SocketAddr::from(([127, 0, 0, 1], 8001)); - let server = Server::bind(&addr).serve(new_service); - let graceful = server.with_graceful_shutdown(shutdown_signal()); +// let server = Server::bind(&addr).serve(new_service); +// let graceful = server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", addr); if let Err(e) = graceful.await { eprintln!("server error: {}", e); } +*/ }