diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index b861b9c..cff7343 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -1,9 +1,10 @@ -use async_channel::{Sender, Receiver}; +use async_channel::{Receiver, Sender}; use futures::future; use hyper::{ body, body::Bytes, header::HeaderValue, + http::response, server::conn::AddrStream, service::{make_service_fn, service_fn}, Body, Method, Request, Response, Server, StatusCode, @@ -32,7 +33,7 @@ use std::{ use serde_json; use serde_with::serde_as; -use tokio::{spawn, time::sleep, sync::mpsc}; +use tokio::{spawn, sync::{mpsc, oneshot}, time::sleep}; #[serde_as] #[derive(Serialize, Deserialize)] @@ -208,9 +209,9 @@ impl LoxServerContext { } } + async fn handle( - context: LoxServerContext, - // addr: SocketAddr, + cloned_context: LoxServerContext, req: Request, ) -> Result, Infallible> { println!("Request: {:?}", req); @@ -223,41 +224,41 @@ async fn handle( .body(Body::from("Allow POST")) .unwrap()), _ => match (req.method(), req.uri().path()) { - (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context)), - (&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(context)), - (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context)), + (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(cloned_context)), + (&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(cloned_context)), + (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(cloned_context)), (&Method::POST, "/openreq") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_open_cred(bytes, context) + verify_and_send_open_cred(bytes, cloned_context) }), (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_trust_promo(bytes, context) + verify_and_send_trust_promo(bytes, cloned_context) }), (&Method::POST, "/trustmig") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_trust_migration(bytes, context) + verify_and_send_trust_migration(bytes, cloned_context) }), (&Method::POST, "/levelup") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_level_up(bytes, context) + verify_and_send_level_up(bytes, cloned_context) }), (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_issue_invite(bytes, context) + verify_and_send_issue_invite(bytes, cloned_context) }), (&Method::POST, "/redeem") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_redeem_invite(bytes, context) + verify_and_send_redeem_invite(bytes, cloned_context) }), (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); // TEST ONLY: Block all existing bridges and add new ones for migration - verify_and_send_check_blockage(bytes, context) + verify_and_send_check_blockage(bytes, cloned_context) }), (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); - verify_and_send_blockage_migration(bytes, context) + verify_and_send_blockage_migration(bytes, cloned_context) }), _ => { // Return 404 not found response. @@ -378,27 +379,27 @@ async fn rdsys_sender(rstream: ResourceStream, tx: Sender) { } } -async fn parse_bridges(rdsys_tx: Sender, rx: Receiver) { +async fn parse_bridges(rdsys_tx: mpsc::Sender, rx: Receiver) { loop { let resourcediff = rx.recv().await.unwrap(); - let cmd = Command::Rdsys { resourcediff: resourcediff, }; + let cmd = Command::Rdsys { + resourcediff: resourcediff, + }; rdsys_tx.send(cmd).await.unwrap(); } - } #[derive(Debug)] enum Command { - Rdsys { - resourcediff: ResourceDiff, - }, + Rdsys { resourcediff: ResourceDiff }, Request { - request: Request, - } + req: Request, + sender: oneshot::Sender, Infallible>>, + }, } // Run with cargo run -- config.json -#[tokio::main(worker_threads = 2)] +#[tokio::main] async fn main() { let args: Vec = env::args().collect(); let file = File::open(&args[1]).expect("Should have been able to read config.json file"); @@ -406,38 +407,38 @@ async fn main() { // Read the JSON contents of the file as a ResourceInfo let rtype: ResourceInfo = serde_json::from_reader(reader).unwrap(); -let (rdsys_tx, mut context_rx) = mpsc::channel(32); -let request_tx = rdsys_tx.clone(); + let (rdsys_tx, mut context_rx) = mpsc::channel(32); + let mut request_tx = rdsys_tx.clone(); let context_manager = spawn(async move { - // pass in distribution of open invite vs. hot spare buckets? - let bridgedb = BridgeDb::new(); - let lox_auth = BridgeAuth::new(bridgedb.pubkey); + // pass in distribution of open invite vs. hot spare buckets? + let bridgedb = BridgeDb::new(); + let lox_auth = BridgeAuth::new(bridgedb.pubkey); - let context = LoxServerContext { - db: Arc::new(Mutex::new(bridgedb)), - ba: Arc::new(Mutex::new(lox_auth)), - }; + let context = LoxServerContext { + db: Arc::new(Mutex::new(bridgedb)), + ba: Arc::new(Mutex::new(lox_auth)), + }; - while let Some(cmd) = context_rx.recv().await { - use Command::*; + while let Some(cmd) = context_rx.recv().await { + use Command::*; - match cmd { - Rdsys {resourcediff} => { - for new_resource in resourcediff.new { - for pt in new_resource { - println!("A NEW RESOURCE: {:?}", pt); - let mut bucket = [ - BridgeLine::default(), - BridgeLine::default(), - BridgeLine::default(), - ]; - let mut count = 0; - for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let infostr: String = format!( + match cmd { + Rdsys { resourcediff } => { + for new_resource in resourcediff.new { + for pt in new_resource { + println!("A NEW RESOURCE: {:?}", pt); + let mut bucket = [ + BridgeLine::default(), + BridgeLine::default(), + BridgeLine::default(), + ]; + let mut count = 0; + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", resource.r#type, resource.blocked_in, @@ -448,42 +449,43 @@ let request_tx = rdsys_tx.clone(); resource.flags, resource.params, ); - let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; - - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - info: info_bytes, - }; - - println!("Now it's a bridgeline: {:?}", bridgeline); - if count < 2 { - bucket[count] = bridgeline; - count += 1; - } else { - context.add_openinv_bucket(bucket); - count = 0; - bucket = [ - BridgeLine::default(), - BridgeLine::default(), - BridgeLine::default(), - ]; + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = + [0; BRIDGE_BYTES - 18]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; + + println!("Now it's a bridgeline: {:?}", bridgeline); + if count < 2 { + bucket[count] = bridgeline; + count += 1; + } else { + context.add_openinv_bucket(bucket); + count = 0; + bucket = [ + BridgeLine::default(), + BridgeLine::default(), + BridgeLine::default(), + ]; + } } } } - } - for changed_resource in resourcediff.changed { - println!("A NEW CHANGED RESOURCE: {:?}", changed_resource); - } - for gone_resource in resourcediff.gone { - for pt in gone_resource { - println!("A NEW GONE RESOURCE: {:?}", pt); - for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let infostr: String = format!( + for changed_resource in resourcediff.changed { + println!("A NEW CHANGED RESOURCE: {:?}", changed_resource); + } + for gone_resource in resourcediff.gone { + for pt in gone_resource { + println!("A NEW GONE RESOURCE: {:?}", pt); + for resource in pt.1 { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()] + .copy_from_slice(resource.address.as_bytes()); + let infostr: String = format!( "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", resource.r#type, resource.blocked_in, @@ -494,28 +496,29 @@ let request_tx = rdsys_tx.clone(); resource.flags, resource.params, ); - let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; - - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - info: info_bytes, - }; - - println!("Now it's a bridgeline: {:?}", bridgeline); - context.add_unreachable(bridgeline); + let mut info_bytes: [u8; BRIDGE_BYTES - 18] = + [0; BRIDGE_BYTES - 18]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + let bridgeline = BridgeLine { + addr: ip_bytes, + port: resource.port, + info: info_bytes, + }; + + println!("Now it's a bridgeline: {:?}", bridgeline); + context.add_unreachable(bridgeline); + } } } + context.encrypt_table(); + } + Request { req, sender } => { + let response = handle(context.clone(), req).await; + sender.send(response); } - context.encrypt_table(); } - Request {request} => { - } - } -} - }); //Sender is resource stream and receiver is bridgedb function (add_openinv_bridges) @@ -524,25 +527,38 @@ let request_tx = rdsys_tx.clone(); let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) .await .unwrap(); - let rdsys_sender_handle = spawn(async {rdsys_sender(rstream, tx).await }); + let rdsys_stream_handler = spawn(async { rdsys_sender(rstream, tx).await }); - let bridge_parser = spawn( async {parse_bridges(rdsys_tx, rx).await}); + let rdsys_resource_receiver = spawn(async { parse_bridges(rdsys_tx, rx).await }); - future::join_all([rdsys_sender_handle, bridge_parser]).await; -/* - let new_service = make_service_fn(move |_conn: &AddrStream| { - let service = service_fn(move |req| handle(context, req)); + let make_service = make_service_fn(|_conn: &AddrStream| { + let request_tx = request_tx.clone(); + let service = service_fn( move |req| { + let request_tx = request_tx.clone(); + let (response_tx, response_rx) = oneshot::channel(); + let cmd = Command::Request { + req: req, + sender: response_tx, + }; + async move { + request_tx.send(cmd).await.unwrap(); + response_rx.await.unwrap() + } + }); async move { Ok::<_, Infallible>(service) } }); let addr = SocketAddr::from(([127, 0, 0, 1], 8001)); -// let server = Server::bind(&addr).serve(new_service); -// let graceful = server.with_graceful_shutdown(shutdown_signal()); + let server = Server::bind(&addr).serve(make_service); + let graceful = server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", addr); - if let Err(e) = graceful.await { eprintln!("server error: {}", e); } -*/ + + future::join_all([rdsys_stream_handler, rdsys_resource_receiver]).await; + } + +