diff --git a/crates/lox-distributor/.gitignore b/crates/lox-distributor/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/crates/lox-distributor/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/crates/lox-distributor/Cargo.toml similarity index 100% rename from Cargo.toml rename to crates/lox-distributor/Cargo.toml diff --git a/crates/lox-distributor/README.md b/crates/lox-distributor/README.md new file mode 100644 index 0000000..cbfe60f --- /dev/null +++ b/crates/lox-distributor/README.md @@ -0,0 +1,35 @@ +# Lox Distributor + +The Lox distributor receives resources from [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) and writes them to [Lox +BridgeLines](https://git-crysp.uwaterloo.ca/iang/lox/src/master/src/bridge_table.rs#L42). Concurrently, it receives and responds to requests from [Lox clients](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm). + +## Configure rdsys stream + +A test `config.json` is included for testing on a local instance of rdsys. This +can be edited to correspond to the desired types of resources and endpoints. + +## Test Run + +For testing purposes, you will need a running instance of rdsys as well as a running Lox client. + +### Run rdsys locally + +First clone rdsys from [here](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) then change into the backend directory: + +``` +cd rdsys/cmd/backend +``` + +Finally run rdsys: + +``` + ./backend --config config.json +``` + +### Run Lox Distributor locally + +Simply run `cargo run -- config.json` :) + +### Run a Lox client locally + +First clone lox-wasm from [here](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm). Follow the instructions in the [README](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm/-/blob/main/README.md) to build and test the Lox client. diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json new file mode 100644 index 0000000..177e5a2 --- /dev/null +++ b/crates/lox-distributor/config.json @@ -0,0 +1,9 @@ +{ + "endpoint": "http://127.0.0.1:7100/resource-stream", + "name": "https", + "token": "HttpsApiTokenPlaceholder", + "types": [ + "obfs2", + "scramblesuit" + ] +} \ No newline at end of file diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs new file mode 100644 index 0000000..6c476e2 --- /dev/null +++ b/crates/lox-distributor/src/lox_context.rs @@ -0,0 +1,325 @@ +use hyper::{body::Bytes, header::HeaderValue, Body, Response}; + +use lox::{ + bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET}, + proto::{ + blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite, + redeem_invite, trust_promotion, + }, + BridgeAuth, BridgeDb, IssuerPubKey, OPENINV_LENGTH, +}; +use rand::RngCore; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use std::sync::{Arc, Mutex}; + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub struct Invite { + #[serde_as(as = "[_; OPENINV_LENGTH]")] + invite: [u8; OPENINV_LENGTH], +} + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub struct EncBridgeTable { + #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")] + etable: Vec<[u8; ENC_BUCKET_BYTES]>, +} + +/// Create a random BridgeLine for testing ONLY. Do not use in production! +/// This was copied directly from lox/src/bridge_table.rs in order +/// to easily initialize a bridgedb/lox_auth with structurally +/// correct buckets to be used for Lox requests/verifications/responses. +/// In production, existing bridges should be translated into this format +/// in a private function and sorted into buckets (3 bridges/bucket is suggested +/// but experience may suggest something else) in some intelligent way. + +pub fn random() -> BridgeLine { + let mut rng = rand::thread_rng(); + let mut res: BridgeLine = BridgeLine::default(); + // Pick a random 4-byte address + let mut addr: [u8; 4] = [0; 4]; + rng.fill_bytes(&mut addr); + // If the leading byte is 224 or more, that's not a valid IPv4 + // address. Choose an IPv6 address instead (but don't worry too + // much about it being well formed). + if addr[0] >= 224 { + rng.fill_bytes(&mut res.addr); + } else { + // Store an IPv4 address as a v4-mapped IPv6 address + res.addr[10] = 255; + res.addr[11] = 255; + res.addr[12..16].copy_from_slice(&addr); + }; + let ports: [u16; 4] = [443, 4433, 8080, 43079]; + let portidx = (rng.next_u32() % 4) as usize; + res.port = ports[portidx]; + res.uid_fingerprint = rng.next_u64(); + let mut cert: [u8; 52] = [0; 52]; + rng.fill_bytes(&mut cert); + let infostr: String = format!( + "obfs4 cert={}, iat-mode=0", + base64::encode_config(cert, base64::STANDARD_NO_PAD) + ); + res.info[..infostr.len()].copy_from_slice(infostr.as_bytes()); + res +} + +#[derive(Clone)] +pub struct LoxServerContext { + pub db: Arc>, + pub ba: Arc>, + pub extra_bridges: Arc>>, + pub unreplaced_bridges: Arc>>, +} + +impl LoxServerContext { + pub fn append_extra_bridges(&self, bridge: BridgeLine) { + let mut extra_bridges = self.extra_bridges.lock().unwrap(); + extra_bridges.push(bridge); + } + + pub fn remove_extra_bridges(&self) -> [BridgeLine; MAX_BRIDGES_PER_BUCKET] { + let mut extra_bridges = self.extra_bridges.lock().unwrap(); + let mut return_bridges = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + for i in 0..MAX_BRIDGES_PER_BUCKET { + return_bridges[i] = extra_bridges.remove(i); + } + + return_bridges + } + + pub fn remove_single_bridge(&self) { + let mut extra_bridges = self.extra_bridges.lock().unwrap(); + let length = extra_bridges.len(); + _ = extra_bridges.remove(length - 1) + } + + pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) { + let mut unreplaced_bridges = self.unreplaced_bridges.lock().unwrap(); + unreplaced_bridges.push(bridge); + } + + pub fn allocate_leftover_bridges(&self) { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + let mut extra_bridges = self.extra_bridges.lock().unwrap(); + ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj); + } + + pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + ba_obj.add_openinv_bridges(bucket, &mut db_obj); + } + + pub fn add_spare_bucket(&self, bucket: [BridgeLine; 3]) { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.add_spare_bucket(bucket); + } + + pub fn replace_with_new(&self, bridgeline: BridgeLine) -> bool { + let mut ba_obj = self.ba.lock().unwrap(); + let eb_obj = self.extra_bridges.lock().unwrap(); + let available_bridge = eb_obj.last(); + // .last() doesn't actually remove the object so we still have to do that + if eb_obj.len() > 0 { + self.remove_single_bridge(); + } + ba_obj.bridge_replace(&bridgeline, available_bridge) + } + + pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + ba_obj.bridge_unreachable(&bridgeline, &mut db_obj) + } + + pub fn update_bridge(&self, bridgeline: BridgeLine) -> bool { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.bridge_update(&bridgeline) + } + + fn advance_days_TEST(&self, num: u16) { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.advance_days(num); // FOR TESTING ONLY + println!("Today's date according to server: {}", ba_obj.today()); + } + + pub fn encrypt_table(&self) -> Vec<[u8; ENC_BUCKET_BYTES]> { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.enc_bridge_table().clone() + } + + fn pubkeys(&self) -> Vec { + let ba_obj = self.ba.lock().unwrap(); + // vector of public keys (to serialize) + vec![ + ba_obj.lox_pub.clone(), + ba_obj.migration_pub.clone(), + ba_obj.migrationkey_pub.clone(), + ba_obj.reachability_pub.clone(), + ba_obj.invitation_pub.clone(), + ] + } + + fn gen_invite(&self) -> Invite { + let obj = self.db.lock().unwrap(); + Invite { + invite: obj.invite(), + } + } + + fn open_inv(&self, req: open_invite::Request) -> open_invite::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_open_invite(req).unwrap() + } + + fn trust_promo(&self, req: trust_promotion::Request) -> trust_promotion::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_trust_promotion(req).unwrap() + } + + fn trust_migration(&self, req: migration::Request) -> migration::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_migration(req).unwrap() + } + + fn level_up(&self, req: level_up::Request) -> level_up::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_level_up(req).unwrap() + } + + fn issue_invite(&self, req: issue_invite::Request) -> issue_invite::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_issue_invite(req).unwrap() + } + + fn redeem_invite(&self, req: redeem_invite::Request) -> redeem_invite::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_redeem_invite(req).unwrap() + } + + fn check_blockage(&self, req: check_blockage::Request) -> check_blockage::Response { + let mut ba_obj = self.ba.lock().unwrap(); + // Created 5 buckets initially, so we will add 5 hot spares (for migration) and + // block all of the existing buckets to trigger migration table propagation + // FOR TESTING ONLY, ADD 5 NEW Buckets + for _ in 0..5 { + let bucket = [random(), random(), random()]; + ba_obj.add_spare_bucket(bucket); + } + ba_obj.enc_bridge_table(); + + // FOR TESTING ONLY, BLOCK ALL BRIDGES + let mut db_obj = self.db.lock().unwrap(); + for index in 0..5 { + let b0 = ba_obj.bridge_table.buckets[index][0]; + let b1 = ba_obj.bridge_table.buckets[index][1]; + let b2 = ba_obj.bridge_table.buckets[index][2]; + ba_obj.bridge_unreachable(&b0, &mut db_obj); + ba_obj.bridge_unreachable(&b1, &mut db_obj); + ba_obj.bridge_unreachable(&b2, &mut db_obj); + } + ba_obj.enc_bridge_table(); + ba_obj.handle_check_blockage(req).unwrap() + } + + fn blockage_migration(&self, req: blockage_migration::Request) -> blockage_migration::Response { + let mut ba_obj = self.ba.lock().unwrap(); + ba_obj.handle_blockage_migration(req).unwrap() + } +} + +// Generate and return an open invitation token +pub fn generate_invite(context: LoxServerContext) -> Response { + let invite = context.gen_invite(); + let token = serde_json::to_string(&invite).unwrap(); + prepare_header(token) +} + +// Return the serialized encrypted bridge table +pub fn send_reachability_cred(context: LoxServerContext) -> Response { + context.advance_days_TEST(85); // FOR TESTING ONLY + let enc_table = context.encrypt_table(); + let etable = EncBridgeTable { etable: enc_table }; + prepare_header(serde_json::to_string(&etable).unwrap()) +} + +// Return the serialized pubkeys for the Bridge Authority +pub fn send_keys(context: LoxServerContext) -> Response { + let pubkeys = context.pubkeys(); + prepare_header(serde_json::to_string(&pubkeys).unwrap()) +} + +pub fn verify_and_send_open_cred(request: Bytes, context: LoxServerContext) -> Response { + let req: open_invite::Request = serde_json::from_slice(&request).unwrap(); + let response = context.open_inv(req); + let open_invite_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(open_invite_resp_str) +} + +pub fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) -> Response { + let req: trust_promotion::Request = serde_json::from_slice(&request).unwrap(); + context.advance_days_TEST(31); // FOR TESTING ONLY + let response = context.trust_promo(req); + let trust_promo_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(trust_promo_resp_str) +} + +pub fn verify_and_send_trust_migration( + request: Bytes, + context: LoxServerContext, +) -> Response { + let req: migration::Request = serde_json::from_slice(&request).unwrap(); + let response = context.trust_migration(req); + let resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(resp_str) +} + +pub fn verify_and_send_level_up(request: Bytes, context: LoxServerContext) -> Response { + let req: level_up::Request = serde_json::from_slice(&request).unwrap(); + let response = context.level_up(req); + let level_up_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(level_up_resp_str) +} + +pub fn verify_and_send_issue_invite(request: Bytes, context: LoxServerContext) -> Response { + let req: issue_invite::Request = serde_json::from_slice(&request).unwrap(); + let response = context.issue_invite(req); + let issue_invite_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(issue_invite_resp_str) +} + +pub fn verify_and_send_redeem_invite(request: Bytes, context: LoxServerContext) -> Response { + let req: redeem_invite::Request = serde_json::from_slice(&request).unwrap(); + let response = context.redeem_invite(req); + let redeem_invite_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(redeem_invite_resp_str) +} + +pub fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext) -> Response { + let req: check_blockage::Request = serde_json::from_slice(&request).unwrap(); + + let response = context.check_blockage(req); + let check_blockage_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(check_blockage_resp_str) +} + +pub fn verify_and_send_blockage_migration( + request: Bytes, + context: LoxServerContext, +) -> Response { + let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap(); + let response = context.blockage_migration(req); + let resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(resp_str) +} + +fn prepare_header(response: String) -> Response { + let mut resp = Response::new(Body::from(response)); + resp.headers_mut() + .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); + resp +} diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs new file mode 100644 index 0000000..7c62334 --- /dev/null +++ b/crates/lox-distributor/src/main.rs @@ -0,0 +1,344 @@ +use futures::future; +use futures::StreamExt; +use hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, +}; +use lox::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; +use lox::{BridgeAuth, BridgeDb}; + +use rdsys_backend::{proto::ResourceDiff, start_stream}; +use serde::Deserialize; +use std::{ + convert::Infallible, + env, + fs::File, + io::BufReader, + net::SocketAddr, + sync::{Arc, Mutex}, + time::Duration, +}; + +mod lox_context; +mod request_handler; +use request_handler::handle; +mod resource_parser; +use resource_parser::parse_resource; + +use tokio::{ + signal, spawn, + sync::{broadcast, mpsc, oneshot}, + time::sleep, +}; + +async fn shutdown_signal() { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl+c signal"); + println!("Shut down Lox Server"); +} + +#[derive(Debug, Deserialize)] +struct ResourceInfo { + endpoint: String, + name: String, + token: String, + types: Vec, +} +// Populate Bridgedb from rdsys + +// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified +// in the config.json file. +// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. +async fn rdsys_stream( + rtype: ResourceInfo, + tx: mpsc::Sender, + mut kill: broadcast::Receiver<()>, +) { + let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) + .await + .expect("rdsys stream initialization failed. Start rdsys or check config.json"); + loop { + tokio::select! { + res = rstream.next() => { + match res { + Some(diff) => tx.send(diff).await.unwrap(), + None => return, + } + }, + _ = kill.recv() => {println!("Shut down rdsys stream"); return}, + + } + } +} + +async fn rdsys_bridge_parser( + rdsys_tx: mpsc::Sender, + rx: mpsc::Receiver, + mut kill: broadcast::Receiver<()>, +) { + tokio::select! { + start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser , + _ = kill.recv() => {println!("Shut down bridge_parser");}, + } +} + +// Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the +// Context Manager to be parsed and added to the BridgeDB +async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { + loop { + let resourcediff = rx.recv().await.unwrap(); + let cmd = Command::Rdsys { resourcediff }; + rdsys_tx.send(cmd).await.unwrap(); + sleep(Duration::from_secs(1)).await; + } +} + +async fn create_context_manager( + context_rx: mpsc::Receiver, + mut kill: broadcast::Receiver<()>, +) { + tokio::select! { + create_context = context_manager(context_rx) => create_context, + _ = kill.recv() => {println!("Shut down context_manager");}, + } +} + +// Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring +// that the DB can be updated from the rdsys stream and client requests +// can be responded to with an updated BridgeDB state +async fn context_manager(mut context_rx: mpsc::Receiver) { + let bridgedb = BridgeDb::new(); + let lox_auth = BridgeAuth::new(bridgedb.pubkey); + + let context = lox_context::LoxServerContext { + db: Arc::new(Mutex::new(bridgedb)), + ba: Arc::new(Mutex::new(lox_auth)), + extra_bridges: Arc::new(Mutex::new(Vec::new())), + unreplaced_bridges: Arc::new(Mutex::new(Vec::new())), + }; + + while let Some(cmd) = context_rx.recv().await { + use Command::*; + match cmd { + Rdsys { resourcediff } => { + if let Some(new_resources) = resourcediff.new { + let mut count = 0; + let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + for pt in new_resources { + println!("A NEW RESOURCE: {:?}", pt); + for resource in pt.1 { + let bridgeline = parse_resource(resource); + println!("Now it's a bridgeline: {:?}", bridgeline); + if context.unreplaced_bridges.lock().unwrap().len() > 0 { + println!("BridgeLine to be replaced: {:?}", bridgeline); + let res = context.replace_with_new(bridgeline); + if res { + println!("BridgeLine successfully replaced: {:?}", bridgeline); + } else { + // Add the bridge to the list of unreplaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline); + context.new_unreplaced_bridge(bridgeline); + } + } else if count < MAX_BRIDGES_PER_BUCKET { + bucket[count] = bridgeline; + count += 1; + } else { + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + context.add_openinv_bucket(bucket); + count = 0; + bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + } + } + } + // Handle the extra buckets that were not allocated already + if count != 0 { + for val in 0..count { + if context.extra_bridges.lock().unwrap().len() + < (MAX_BRIDGES_PER_BUCKET) + { + context.append_extra_bridges(bucket[val]); + } else { + bucket = context.remove_extra_bridges(); + context.add_spare_bucket(bucket); + } + } + } + } + if let Some(changed_resources) = resourcediff.changed { + for pt in changed_resources { + println!("A NEW CHANGED RESOURCE: {:?}", pt); + for resource in pt.1 { + let bridgeline = parse_resource(resource); + println!("BridgeLine to be changed: {:?}", bridgeline); + let res = context.update_bridge(bridgeline); + if res { + println!("BridgeLine successfully updated: {:?}", bridgeline); + } else { + println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline); + if context.extra_bridges.lock().unwrap().len() < 2 { + context.append_extra_bridges(bridgeline); + } else { + let bucket = context.remove_extra_bridges(); + context.add_spare_bucket(bucket); + } + } + } + } + } + // gone resources are not the same as blocked resources. + // Instead, these are bridges which have either failed to pass tests for some period + // or have expired bridge descriptors. In both cases, the bridge is unusable, but this + // is not likely due to censorship. Therefore, we replace gone resources with new resources + // TODO: create a notion of blocked resources from information collected through various means: + // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 + if let Some(gone_resources) = resourcediff.gone { + for pt in gone_resources { + println!("A NEW GONE RESOURCE: {:?}", pt); + for resource in pt.1 { + let bridgeline = parse_resource(resource); + println!("BridgeLine to be replaced: {:?}", bridgeline); + let res = context.replace_with_new(bridgeline); + if res { + println!("BridgeLine successfully replaced: {:?}", bridgeline); + } else { + // Add the bridge to the list of unreplaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", + bridgeline + ); + context.new_unreplaced_bridge(bridgeline); + } + } + } + } + /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not + yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not + currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something + like the following: + println!("BridgeLine to be removed: {:?}", bridgeline); + let res = context.add_unreachable(bridgeline); + if res { + println!( + "BridgeLine successfully marked unreachable: {:?}", + bridgeline + ); + } else { + println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); + //TODO probably do something else here + } + */ + context.allocate_leftover_bridges(); + context.encrypt_table(); + sleep(Duration::from_millis(1)).await; + } + Request { req, sender } => { + let response = handle(context.clone(), req).await; + if let Err(e) = sender.send(response) { + eprintln!("Server Response Error: {:?}", e); + }; + sleep(Duration::from_millis(1)).await; + } + Shutdown { shutdown_sig } => { + println!("Sending Shutdown Signal, all threads should shutdown."); + drop(shutdown_sig); + println!("Shutdown Sent."); + } + } + } +} + +// Each of the commands that the Context Manager handles +#[derive(Debug)] +enum Command { + Rdsys { + resourcediff: ResourceDiff, + }, + Request { + req: Request, + sender: oneshot::Sender, Infallible>>, + }, + Shutdown { + shutdown_sig: broadcast::Sender<()>, + }, +} + +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + let file = File::open(&args[1]).expect("Should have been able to read config.json file"); + let reader = BufReader::new(file); + // Read the JSON contents of the file as a ResourceInfo + let rtype: ResourceInfo = + serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed."); + + let (rdsys_tx, context_rx) = mpsc::channel(32); + let request_tx = rdsys_tx.clone(); + let shutdown_cmd_tx = rdsys_tx.clone(); + + // create the shutdown broadcast channel and clone for every thread + let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); + let kill_stream = shutdown_tx.subscribe(); + let kill_parser = shutdown_tx.subscribe(); + let kill_context = shutdown_tx.subscribe(); + + // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx + let shutdown_handler = spawn(async move { + tokio::select! { + _ = signal::ctrl_c() => { + let cmd = Command::Shutdown { + shutdown_sig: shutdown_tx, + }; + shutdown_cmd_tx.send(cmd).await.unwrap(); + sleep(Duration::from_secs(1)).await; + + _ = shutdown_rx.recv().await; + } + } + }); + + let context_manager = + spawn(async move { create_context_manager(context_rx, kill_context).await }); + + let (tx, rx) = mpsc::channel(32); + let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await }); + + let rdsys_resource_receiver = + spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await }); + + let make_service = make_service_fn(move |_conn: &AddrStream| { + let request_tx = request_tx.clone(); + let service = service_fn(move |req| { + let request_tx = request_tx.clone(); + let (response_tx, response_rx) = oneshot::channel(); + let cmd = Command::Request { + req, + sender: response_tx, + }; + async move { + request_tx.send(cmd).await.unwrap(); + response_rx.await.unwrap() + } + }); + async move { Ok::<_, Infallible>(service) } + }); + + let addr = SocketAddr::from(([127, 0, 0, 1], 8001)); + let server = Server::bind(&addr).serve(make_service); + let graceful = server.with_graceful_shutdown(shutdown_signal()); + println!("Listening on {}", addr); + if let Err(e) = graceful.await { + eprintln!("server error: {}", e); + } + future::join_all([ + rdsys_stream_handler, + rdsys_resource_receiver, + context_manager, + shutdown_handler, + ]) + .await; +} diff --git a/crates/lox-distributor/src/request_handler.rs b/crates/lox-distributor/src/request_handler.rs new file mode 100644 index 0000000..98682a4 --- /dev/null +++ b/crates/lox-distributor/src/request_handler.rs @@ -0,0 +1,74 @@ +use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode}; + +use std::convert::Infallible; + +use crate::lox_context; +use crate::lox_context::LoxServerContext; + +// Lox Request handling logic for each Lox request/protocol +pub async fn handle( + cloned_context: LoxServerContext, + req: Request, +) -> Result, Infallible> { + println!("Request: {:?}", req); + match req.method() { + &Method::OPTIONS => Ok(Response::builder() + .header("Access-Control-Allow-Origin", HeaderValue::from_static("*")) + .header("Access-Control-Allow-Headers", "accept, content-type") + .header("Access-Control-Allow-Methods", "POST") + .status(200) + .body(Body::from("Allow POST")) + .unwrap()), + _ => match (req.method(), req.uri().path()) { + (&Method::POST, "/invite") => { + Ok::<_, Infallible>(lox_context::generate_invite(cloned_context)) + } + (&Method::POST, "/reachability") => { + Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context)) + } + (&Method::POST, "/pubkeys") => { + Ok::<_, Infallible>(lox_context::send_keys(cloned_context)) + } + (&Method::POST, "/openreq") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_open_cred(bytes, cloned_context) + }), + (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_trust_promo(bytes, cloned_context) + }), + (&Method::POST, "/trustmig") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_trust_migration(bytes, cloned_context) + }), + (&Method::POST, "/levelup") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_level_up(bytes, cloned_context) + }), + (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_issue_invite(bytes, cloned_context) + }), + (&Method::POST, "/redeem") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_redeem_invite(bytes, cloned_context) + }), + (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + // TEST ONLY: Block all existing bridges and add new ones for migration + lox_context::verify_and_send_check_blockage(bytes, cloned_context) + }), + (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_blockage_migration(bytes, cloned_context) + }), + _ => { + // Return 404 not found response. + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found")) + .unwrap()) + } + }, + } +} diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs new file mode 100644 index 0000000..6790fa5 --- /dev/null +++ b/crates/lox-distributor/src/resource_parser.rs @@ -0,0 +1,30 @@ +use lox::bridge_table::{BridgeLine, BRIDGE_BYTES}; +use rdsys_backend::proto::Resource; + +pub fn parse_resource(resource: Resource) -> BridgeLine { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes()); + let resource_uid = resource + .get_uid() + .expect("Unable to get Fingerprint UID of resource"); + let infostr: String = format!( + "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}", + resource.r#type, + resource.blocked_in, + resource.protocol, + resource.fingerprint, + resource.or_addresses, + resource.distribution, + resource.flags, + resource.params, +); + let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + BridgeLine { + addr: ip_bytes, + port: resource.port, + uid_fingerprint: resource_uid, + info: info_bytes, + } +}