use futures::future; use hyper::{ body, body::Bytes, header::HeaderValue, server::conn::AddrStream, service::{make_service_fn, service_fn}, Body, Method, Request, Response, Server, StatusCode, }; use lox::bridge_table::{BridgeLine, BRIDGE_BYTES, ENC_BUCKET_BYTES}; use lox::{ proto::{ blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite, redeem_invite, trust_promotion, }, IssuerPubKey, }; use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH}; use rand::RngCore; use rdsys_backend::{proto::ResourceDiff, start_stream}; use serde::{Deserialize, Serialize}; use std::{ convert::Infallible, env, fs::File, io::BufReader, net::SocketAddr, sync::{Arc, Mutex}, time::Duration, }; use serde_json; use serde_with::serde_as; use tokio::{ spawn, sync::{broadcast, mpsc, oneshot}, time::sleep, signal, }; #[serde_as] #[derive(Serialize, Deserialize)] pub struct Invite { #[serde_as(as = "[_; OPENINV_LENGTH]")] invite: [u8; OPENINV_LENGTH], } #[serde_as] #[derive(Serialize, Deserialize)] pub struct EncBridgeTable { #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")] etable: Vec<[u8; ENC_BUCKET_BYTES]>, } #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, name: String, token: String, types: Vec, } // Populate Bridgedb from rdsys /// Create a random BridgeLine for testing ONLY. Do not use in production! /// This was copied directly from lox/src/bridge_table.rs in order /// to easily initialize a bridgedb/lox_auth with structurally /// correct buckets to be used for Lox requests/verifications/responses. /// In production, existing bridges should be translated into this format /// in a private function and sorted into buckets (3 bridges/bucket is suggested /// but experience may suggest something else) in some intelligent way. pub fn random() -> BridgeLine { let mut rng = rand::thread_rng(); let mut res: BridgeLine = BridgeLine::default(); // Pick a random 4-byte address let mut addr: [u8; 4] = [0; 4]; rng.fill_bytes(&mut addr); // If the leading byte is 224 or more, that's not a valid IPv4 // address. Choose an IPv6 address instead (but don't worry too // much about it being well formed). if addr[0] >= 224 { rng.fill_bytes(&mut res.addr); } else { // Store an IPv4 address as a v4-mapped IPv6 address res.addr[10] = 255; res.addr[11] = 255; res.addr[12..16].copy_from_slice(&addr); }; let ports: [u16; 4] = [443, 4433, 8080, 43079]; let portidx = (rng.next_u32() % 4) as usize; res.port = ports[portidx]; let mut fingerprint: [u8; 20] = [0; 20]; let mut cert: [u8; 52] = [0; 52]; rng.fill_bytes(&mut fingerprint); rng.fill_bytes(&mut cert); let infostr: String = format!( "obfs4 {} cert={} iat-mode=0", hex_fmt::HexFmt(fingerprint), base64::encode_config(cert, base64::STANDARD_NO_PAD) ); res.info[..infostr.len()].copy_from_slice(infostr.as_bytes()); res } #[derive(Clone)] struct LoxServerContext { db: Arc>, ba: Arc>, } impl LoxServerContext { fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); ba_obj.add_openinv_bridges(bucket, &mut db_obj); } fn add_unreachable(&self, bridgeline: BridgeLine) { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); ba_obj.bridge_unreachable(&bridgeline, &mut db_obj); } fn advance_days_TEST(&self, num: u16) { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.advance_days(num); // FOR TESTING ONLY println!("Today's date according to server: {}", ba_obj.today()); } fn encrypt_table(&self) -> Vec<[u8; ENC_BUCKET_BYTES]> { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.enc_bridge_table().clone() } fn pubkeys(&self) -> Vec { let ba_obj = self.ba.lock().unwrap(); // vector of public keys (to serialize) vec![ ba_obj.lox_pub.clone(), ba_obj.migration_pub.clone(), ba_obj.migrationkey_pub.clone(), ba_obj.reachability_pub.clone(), ba_obj.invitation_pub.clone(), ] } fn gen_invite(&self) -> Invite { let obj = self.db.lock().unwrap(); return Invite { invite: obj.invite(), }; } fn open_inv(&self, req: open_invite::Request) -> open_invite::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_open_invite(req).unwrap() } fn trust_promo(&self, req: trust_promotion::Request) -> trust_promotion::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_trust_promotion(req).unwrap() } fn trust_migration(&self, req: migration::Request) -> migration::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_migration(req).unwrap() } fn level_up(&self, req: level_up::Request) -> level_up::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_level_up(req).unwrap() } fn issue_invite(&self, req: issue_invite::Request) -> issue_invite::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_issue_invite(req).unwrap() } fn redeem_invite(&self, req: redeem_invite::Request) -> redeem_invite::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_redeem_invite(req).unwrap() } fn check_blockage(&self, req: check_blockage::Request) -> check_blockage::Response { let mut ba_obj = self.ba.lock().unwrap(); // Created 5 buckets initially, so we will add 5 hot spares (for migration) and // block all of the existing buckets to trigger migration table propagation // FOR TESTING ONLY, ADD 5 NEW Buckets for _ in 0..5 { let bucket = [random(), random(), random()]; ba_obj.add_spare_bucket(bucket); } ba_obj.enc_bridge_table(); // FOR TESTING ONLY, BLOCK ALL BRIDGES let mut db_obj = self.db.lock().unwrap(); for index in 0..5 { let b0 = ba_obj.bridge_table.buckets[index][0]; let b1 = ba_obj.bridge_table.buckets[index][1]; let b2 = ba_obj.bridge_table.buckets[index][2]; ba_obj.bridge_unreachable(&b0, &mut db_obj); ba_obj.bridge_unreachable(&b1, &mut db_obj); ba_obj.bridge_unreachable(&b2, &mut db_obj); } ba_obj.enc_bridge_table(); ba_obj.handle_check_blockage(req).unwrap() } fn blockage_migration(&self, req: blockage_migration::Request) -> blockage_migration::Response { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_blockage_migration(req).unwrap() } } // Lox Request handling logic for each Lox request/protocol async fn handle( cloned_context: LoxServerContext, req: Request, ) -> Result, Infallible> { println!("Request: {:?}", req); match req.method() { &Method::OPTIONS => Ok(Response::builder() .header("Access-Control-Allow-Origin", HeaderValue::from_static("*")) .header("Access-Control-Allow-Headers", "accept, content-type") .header("Access-Control-Allow-Methods", "POST") .status(200) .body(Body::from("Allow POST")) .unwrap()), _ => match (req.method(), req.uri().path()) { (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(cloned_context)), (&Method::GET, "/reachability") => { Ok::<_, Infallible>(send_reachability_cred(cloned_context)) } (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(cloned_context)), (&Method::POST, "/openreq") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_open_cred(bytes, cloned_context) }), (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_trust_promo(bytes, cloned_context) }), (&Method::POST, "/trustmig") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_trust_migration(bytes, cloned_context) }), (&Method::POST, "/levelup") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_level_up(bytes, cloned_context) }), (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_issue_invite(bytes, cloned_context) }), (&Method::POST, "/redeem") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_redeem_invite(bytes, cloned_context) }), (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); // TEST ONLY: Block all existing bridges and add new ones for migration verify_and_send_check_blockage(bytes, cloned_context) }), (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_blockage_migration(bytes, cloned_context) }), _ => { // Return 404 not found response. Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from("Not found")) .unwrap()) } }, } } // Generate and return an open invitation token fn generate_invite(context: LoxServerContext) -> Response { let invite = context.gen_invite(); let token = serde_json::to_string(&invite).unwrap(); let mut resp = Response::new(Body::from(token)); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp } // Return the serialized encrypted bridge table fn send_reachability_cred(context: LoxServerContext) -> Response { context.advance_days_TEST(85); // FOR TESTING ONLY let enc_table = context.encrypt_table(); let etable = EncBridgeTable { etable: enc_table }; let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap())); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp } // Return the serialized pubkeys for the Bridge Authority fn send_keys(context: LoxServerContext) -> Response { let pubkeys = context.pubkeys(); let mut resp = Response::new(Body::from(serde_json::to_string(&pubkeys).unwrap())); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp } fn verify_and_send_open_cred(request: Bytes, context: LoxServerContext) -> Response { let req: open_invite::Request = serde_json::from_slice(&request).unwrap(); let response = context.open_inv(req); let open_invite_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(open_invite_resp_str) } fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) -> Response { let req: trust_promotion::Request = serde_json::from_slice(&request).unwrap(); context.advance_days_TEST(31); // FOR TESTING ONLY let response = context.trust_promo(req); let trust_promo_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(trust_promo_resp_str) } fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response { let req: migration::Request = serde_json::from_slice(&request).unwrap(); let response = context.trust_migration(req); let resp_str = serde_json::to_string(&response).unwrap(); prepare_header(resp_str) } fn verify_and_send_level_up(request: Bytes, context: LoxServerContext) -> Response { let req: level_up::Request = serde_json::from_slice(&request).unwrap(); let response = context.level_up(req); let level_up_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(level_up_resp_str) } fn verify_and_send_issue_invite(request: Bytes, context: LoxServerContext) -> Response { let req: issue_invite::Request = serde_json::from_slice(&request).unwrap(); let response = context.issue_invite(req); let issue_invite_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(issue_invite_resp_str) } fn verify_and_send_redeem_invite(request: Bytes, context: LoxServerContext) -> Response { let req: redeem_invite::Request = serde_json::from_slice(&request).unwrap(); let response = context.redeem_invite(req); let redeem_invite_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(redeem_invite_resp_str) } fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext) -> Response { let req: check_blockage::Request = serde_json::from_slice(&request).unwrap(); let response = context.check_blockage(req); let check_blockage_resp_str = serde_json::to_string(&response).unwrap(); prepare_header(check_blockage_resp_str) } fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response { let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap(); let response = context.blockage_migration(req); let resp_str = serde_json::to_string(&response).unwrap(); prepare_header(resp_str) } fn prepare_header(response: String) -> Response { let mut resp = Response::new(Body::from(response)); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp } async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("failed to listen for ctrl+c signal"); println!("Shut down Lox Server"); } async fn rdsys_stream(rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>) { tokio:: select! { start_rdsys_stream = rdsys_sender(rtype, tx) => start_rdsys_stream , _ = kill.recv() => {println!("Shut down rdsys stream"); return}, } } // Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified // in the config.json file. // TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) { let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) .await .expect("rdsys stream initialization failed. Start rdsys or check config.json"); sleep(Duration::from_millis(1)).await; for diff in rstream { println!("Received diff: {:?}", diff); //send this through a channel tx.send(diff).await.unwrap(); sleep(Duration::from_secs(1)).await; } } async fn rdsys_bridge_parser(rdsys_tx: mpsc::Sender, rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) { tokio:: select! { start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser , _ = kill.recv() => {println!("Shut down bridge_parser"); return}, } } // Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the // Context Manager to be parsed and added to the BridgeDB async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { let resourcediff = rx.recv().await.unwrap(); let cmd = Command::Rdsys { resourcediff: resourcediff, }; rdsys_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; } } async fn create_context_manager(context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) { tokio:: select! { create_context = context_manager(context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager"); return}, } } // Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring // that the DB can be updated from the rdsys stream and client requests // can be responded to with an updated BridgeDB state async fn context_manager(mut context_rx: mpsc::Receiver) { let bridgedb = BridgeDb::new(); let lox_auth = BridgeAuth::new(bridgedb.pubkey); let context = LoxServerContext { db: Arc::new(Mutex::new(bridgedb)), ba: Arc::new(Mutex::new(lox_auth)), }; while let Some(cmd) = context_rx.recv().await { use Command::*; match cmd { Rdsys { resourcediff } => { for new_resource in resourcediff.new { for pt in new_resource { println!("A NEW RESOURCE: {:?}", pt); let mut bucket = [ BridgeLine::default(), BridgeLine::default(), BridgeLine::default(), ]; let mut count = 0; for resource in pt.1 { let mut ip_bytes: [u8; 16] = [0; 16]; ip_bytes[..resource.address.len()] .copy_from_slice(resource.address.as_bytes()); let infostr: String = format!( "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", resource.r#type, resource.blocked_in, resource.protocol, resource.fingerprint, resource.or_addresses, resource.distribution, resource.flags, resource.params, ); let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); let bridgeline = BridgeLine { addr: ip_bytes, port: resource.port, info: info_bytes, }; println!("Now it's a bridgeline: {:?}", bridgeline); if count < 2 { bucket[count] = bridgeline; count += 1; } else { context.add_openinv_bucket(bucket); count = 0; bucket = [ BridgeLine::default(), BridgeLine::default(), BridgeLine::default(), ]; } } } } for changed_resource in resourcediff.changed { println!("A NEW CHANGED RESOURCE: {:?}", changed_resource); } for gone_resource in resourcediff.gone { for pt in gone_resource { println!("A NEW GONE RESOURCE: {:?}", pt); for resource in pt.1 { let mut ip_bytes: [u8; 16] = [0; 16]; ip_bytes[..resource.address.len()] .copy_from_slice(resource.address.as_bytes()); let infostr: String = format!( "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}", resource.r#type, resource.blocked_in, resource.protocol, resource.fingerprint, resource.or_addresses, resource.distribution, resource.flags, resource.params, ); let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18]; info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); let bridgeline = BridgeLine { addr: ip_bytes, port: resource.port, info: info_bytes, }; println!("Now it's a bridgeline: {:?}", bridgeline); context.add_unreachable(bridgeline); } } } context.encrypt_table(); sleep(Duration::from_millis(1)).await; } Request { req, sender } => { let response = handle(context.clone(), req).await; if let Err(e) = sender.send(response) { eprintln!("Server Response Error: {:?}", e); }; sleep(Duration::from_millis(1)).await; } Shutdown { shutdown_sig} => { println!("Sending Shutdown Signal, all threads should shutdown."); drop(shutdown_sig); println!("Shutdown Sent."); } } } } // Each of the commands that the Context Manager handles #[derive(Debug)] enum Command { Rdsys { resourcediff: ResourceDiff, }, Request { req: Request, sender: oneshot::Sender, Infallible>>, }, Shutdown { shutdown_sig: broadcast::Sender<()>, } } #[tokio::main] async fn main() { let args: Vec = env::args().collect(); let file = File::open(&args[1]).expect("Should have been able to read config.json file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo let rtype: ResourceInfo = serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); let shutdown_cmd_tx = rdsys_tx.clone(); // create the shutdown broadcast channel and clone for every thread let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); let kill_stream = shutdown_tx.subscribe(); let kill_parser = shutdown_tx.subscribe(); let kill_context= shutdown_tx.subscribe(); // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx let shutdown_handler = spawn(async move { tokio::select! { _ = signal::ctrl_c() => { let cmd = Command::Shutdown { shutdown_sig: shutdown_tx, }; shutdown_cmd_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; _ = shutdown_rx.recv().await; } } }); let context_manager = spawn(async move { create_context_manager(context_rx, kill_context).await }); let (tx, rx) = mpsc::channel(32); let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await }); let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await }); let make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = request_tx.clone(); let service = service_fn(move |req| { let request_tx = request_tx.clone(); let (response_tx, response_rx) = oneshot::channel(); let cmd = Command::Request { req: req, sender: response_tx, }; async move { request_tx.send(cmd).await.unwrap(); response_rx.await.unwrap() } }); async move { Ok::<_, Infallible>(service) } }); let addr = SocketAddr::from(([127, 0, 0, 1], 8001)); let server = Server::bind(&addr).serve(make_service); let graceful = server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", addr); if let Err(e) = graceful.await { eprintln!("server error: {}", e); } future::join_all([ rdsys_stream_handler, rdsys_resource_receiver, context_manager, shutdown_handler, ]) .await; }