From 1018dd827f5d20a3907e7f5a23479062cfd389ef Mon Sep 17 00:00:00 2001 From: onyinyang Date: Fri, 3 Mar 2023 14:09:13 -0500 Subject: [PATCH] Add resourcediff stream stub for rdsys integration --- crates/lox-distributor/Cargo.toml | 7 ++- crates/lox-distributor/src/main.rs | 96 +++++++++++++++++++++++++----- 2 files changed, 86 insertions(+), 17 deletions(-) diff --git a/crates/lox-distributor/Cargo.toml b/crates/lox-distributor/Cargo.toml index fe70cb6..1e9f513 100644 --- a/crates/lox-distributor/Cargo.toml +++ b/crates/lox-distributor/Cargo.toml @@ -7,13 +7,14 @@ edition = "2021" [dependencies] base64 = "0.13" -hyper = "0.13" +hyper = { version = "0.14.24", features = ["server"] } hex_fmt = "0.3" -tokio = { version = "0.2", features = ["macros", "signal"] } +tokio = { version = "1", features = ["full", "macros", "signal"] } rand = "0.7" serde = { version = "1.0", features = ["derive"] } serde_with = "1.9.1" serde_json = "1.0.87" time = "0.2" -lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"} \ No newline at end of file +lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"} +rdsys_backend = { git = "https://gitlab.torproject.org/cohosh/rdsys-backend-api.git"} diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 7b275b0..3932211 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -16,10 +16,11 @@ use lox::bridge_table::{BridgeLine, ENC_BUCKET_BYTES}; use lox::proto; use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH}; use rand::RngCore; +use rdsys_backend::start_stream; use serde::{Deserialize, Serialize}; use serde_json; -use serde_with::{serde_as}; +use serde_with::serde_as; #[serde_as] #[derive(Serialize, Deserialize)] @@ -31,10 +32,12 @@ pub struct Invite { #[serde_as] #[derive(Serialize, Deserialize)] pub struct EncBridgeTable { - #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")] + #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")] etable: Vec<[u8; ENC_BUCKET_BYTES]>, } +// Populate Bridgedb from rdsys + /// Create a random BridgeLine for testing ONLY. Do not use in production! /// This was copied directly from lox/src/bridge_table.rs in order /// to easily initialize a bridgedb/lox_auth with structurally @@ -98,8 +101,9 @@ async fn handle( .unwrap()), _ => match (req.method(), req.uri().path()) { (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)), - (&Method::POST, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)), - (&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(context.ba)), + (&Method::GET, "/reachability") => { + Ok::<_, Infallible>(send_reachability_cred(context.ba)) + } (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)), (&Method::POST, "/openreq") => Ok::<_, Infallible>({ let bytes = body::to_bytes(req.into_body()).await.unwrap(); @@ -125,6 +129,15 @@ async fn handle( let bytes = body::to_bytes(req.into_body()).await.unwrap(); verify_and_send_redeem_invite(bytes, context.ba) }), + (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + // TEST ONLY: Block all existing bridges and add new ones for migration + verify_and_send_check_blockage(bytes, context.ba, context.db) + }), + (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + verify_and_send_blockage_migration(bytes, context.ba) + }), _ => { // Return 404 not found response. Ok(Response::builder() @@ -136,7 +149,6 @@ async fn handle( } } - fn generate_invite(db: Arc>) -> Response { let obj = db.lock().unwrap(); let invite = Invite { @@ -153,14 +165,13 @@ fn generate_invite(db: Arc>) -> Response { // Return the serialized encrypted bridge table fn send_reachability_cred(ba: Arc>) -> Response { let mut ba_obj = ba.lock().unwrap(); - ba_obj.advance_days(85); // FOR TESTING ONLY + ba_obj.advance_days(85); // FOR TESTING ONLY println!("Today's date according to server: {}", ba_obj.today()); let enc_table = ba_obj.enc_bridge_table().clone(); - let etable = EncBridgeTable { - etable: enc_table, - }; + let etable = EncBridgeTable { etable: enc_table }; let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap())); - resp.headers_mut().insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); + resp.headers_mut() + .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp } @@ -228,18 +239,58 @@ fn verify_and_send_redeem_invite(request: Bytes, ba: Arc>) -> let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap(); let mut ba_obj = ba.lock().unwrap(); let response = ba_obj.handle_redeem_invite(req).unwrap(); - let issue_invite_resp_str = serde_json::to_string(&response).unwrap(); - prepare_header(issue_invite_resp_str) + let redeem_invite_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(redeem_invite_resp_str) } +fn verify_and_send_check_blockage( + request: Bytes, + ba: Arc>, + db: Arc>, +) -> Response { + let req: proto::check_blockage::Request = serde_json::from_slice(&request).unwrap(); + let mut ba_obj = ba.lock().unwrap(); + // Created 5 buckets initially, so we will add 5 hot spares (for migration) and + // block all of the existing buckets to trigger migration table propagation + // FOR TESTING ONLY, ADD 5 NEW Buckets + for _ in 0..5 { + let bucket = [random(), random(), random()]; + ba_obj.add_spare_bucket(bucket); + } + ba_obj.enc_bridge_table(); + // FOR TESTING ONLY, BLOCK ALL BRIDGES + let mut db_obj = db.lock().unwrap(); + for index in 0..5 { + let b0 = ba_obj.bridge_table.buckets[index][0]; + let b1 = ba_obj.bridge_table.buckets[index][1]; + let b2 = ba_obj.bridge_table.buckets[index][2]; + ba_obj.bridge_unreachable(&b0, &mut db_obj); + ba_obj.bridge_unreachable(&b1, &mut db_obj); + ba_obj.bridge_unreachable(&b2, &mut db_obj); + } + ba_obj.enc_bridge_table(); + let response = ba_obj.handle_check_blockage(req).unwrap(); + let check_blockage_resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(check_blockage_resp_str) +} + +fn verify_and_send_blockage_migration( + request: Bytes, + ba: Arc>, +) -> Response { + let req: proto::blockage_migration::Request = serde_json::from_slice(&request).unwrap(); + let mut ba_obj = ba.lock().unwrap(); + let response = ba_obj.handle_blockage_migration(req).unwrap(); + let resp_str = serde_json::to_string(&response).unwrap(); + prepare_header(resp_str) +} fn prepare_header(response: String) -> Response { let mut resp = Response::new(Body::from(response)); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp - } async fn shutdown_signal() { @@ -248,8 +299,25 @@ async fn shutdown_signal() { .expect("failed to listen for ctrl+c signal"); } -#[tokio::main] +// Initial bridgedb setup then: +// Listen for updates and return new bridges to be added to the bridgedb +async fn load_bridges() { + let endpoint = String::from("http://127.0.0.1:7100/resource-stream"); + let name = String::from("https"); + let token = String::from("HttpsApiTokenPlaceholder"); //Bring in from commmand line + let types = vec![String::from("obfs2"), String::from("scramblesuit")]; + let rx = start_stream(endpoint, name, token, types).await.unwrap(); + for diff in rx { + println!("Received diff: {:?}", diff); //send this through a channel + } +} + +#[tokio::main(worker_threads = 2)] async fn main() { + +// + tokio::spawn(async move { load_bridges().await; }); + // let new_bridgedb = task::spawn(load_bridges()); let num_buckets = 5; // Create and initialize a new db and lox_auth let mut bridgedb = BridgeDb::new();