From bae093ef20c9d2f41a07523a5ed2a58ce75536e3 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Thu, 11 May 2023 11:11:57 -0400 Subject: [PATCH] Refactors lox-distributor to make division of tasks clearer --- crates/lox-distributor/src/lox_context.rs | 37 ++-- crates/lox-distributor/src/main.rs | 207 +++--------------- crates/lox-distributor/src/request_handler.rs | 74 +++++++ crates/lox-distributor/src/resource_parser.rs | 30 +++ 4 files changed, 151 insertions(+), 197 deletions(-) create mode 100644 crates/lox-distributor/src/request_handler.rs create mode 100644 crates/lox-distributor/src/resource_parser.rs diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index d79ac30..c9993a0 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -1,26 +1,17 @@ - -use hyper::{ - body::Bytes, - header::HeaderValue, - Body, Response, -}; +use hyper::{body::Bytes, header::HeaderValue, Body, Response}; use lox::{ - BridgeAuth, BridgeDb, OPENINV_LENGTH, bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET}, proto::{ blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite, redeem_invite, trust_promotion, }, - IssuerPubKey, + BridgeAuth, BridgeDb, IssuerPubKey, OPENINV_LENGTH, }; use rand::RngCore; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use std::{ - sync::{Arc, Mutex}, -}; - +use std::sync::{Arc, Mutex}; #[serde_as] #[derive(Serialize, Deserialize)] @@ -84,8 +75,7 @@ pub struct LoxServerContext { } impl LoxServerContext { - - pub fn append_extra_bridges(&self, bridge: BridgeLine){ + pub fn append_extra_bridges(&self, bridge: BridgeLine) { let mut extra_bridges = self.extra_bridges.lock().unwrap(); extra_bridges.push(bridge); } @@ -93,12 +83,11 @@ impl LoxServerContext { pub fn remove_extra_bridges(&self) -> [BridgeLine; MAX_BRIDGES_PER_BUCKET] { let mut extra_bridges = self.extra_bridges.lock().unwrap(); let mut return_bridges = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; - for i in 0..MAX_BRIDGES_PER_BUCKET{ + for i in 0..MAX_BRIDGES_PER_BUCKET { return_bridges[i] = extra_bridges.remove(i); } return_bridges - } pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) { @@ -111,8 +100,6 @@ impl LoxServerContext { let mut db_obj = self.db.lock().unwrap(); let mut extra_bridges = self.extra_bridges.lock().unwrap(); ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj); - - } pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) { @@ -133,8 +120,8 @@ impl LoxServerContext { let mut available_bridge = eb_obj.last(); ba_obj.bridge_replace(&bridgeline, available_bridge, &mut db_obj) - } + pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); @@ -273,7 +260,10 @@ pub fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) -> prepare_header(trust_promo_resp_str) } -pub fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response { +pub fn verify_and_send_trust_migration( + request: Bytes, + context: LoxServerContext, +) -> Response { let req: migration::Request = serde_json::from_slice(&request).unwrap(); let response = context.trust_migration(req); let resp_str = serde_json::to_string(&response).unwrap(); @@ -309,7 +299,10 @@ pub fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext) prepare_header(check_blockage_resp_str) } -pub fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response { +pub fn verify_and_send_blockage_migration( + request: Bytes, + context: LoxServerContext, +) -> Response { let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap(); let response = context.blockage_migration(req); let resp_str = serde_json::to_string(&response).unwrap(); @@ -321,4 +314,4 @@ fn prepare_header(response: String) -> Response { resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp -} \ No newline at end of file +} diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 1dbd0bb..e775fdb 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -1,19 +1,15 @@ use futures::future; use futures::StreamExt; use hyper::{ - body, - header::HeaderValue, server::conn::AddrStream, service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, StatusCode, + Body, Request, Response, Server, }; -use lox::bridge_table::MAX_BRIDGES_PER_BUCKET; -use lox::bridge_table::{BridgeLine, BRIDGE_BYTES}; - +use lox::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; use lox::{BridgeAuth, BridgeDb}; use rdsys_backend::{proto::ResourceDiff, start_stream}; -use serde::{Deserialize}; +use serde::Deserialize; use std::{ convert::Infallible, env, @@ -25,7 +21,10 @@ use std::{ }; mod lox_context; -use lox_context::LoxServerContext; +mod request_handler; +use request_handler::handle; +mod resource_parser; +use resource_parser::parse_resource; use tokio::{ signal, spawn, @@ -33,70 +32,6 @@ use tokio::{ time::sleep, }; -// Lox Request handling logic for each Lox request/protocol -async fn handle( - cloned_context: LoxServerContext, - req: Request, -) -> Result, Infallible> { - println!("Request: {:?}", req); - match req.method() { - &Method::OPTIONS => Ok(Response::builder() - .header("Access-Control-Allow-Origin", HeaderValue::from_static("*")) - .header("Access-Control-Allow-Headers", "accept, content-type") - .header("Access-Control-Allow-Methods", "POST") - .status(200) - .body(Body::from("Allow POST")) - .unwrap()), - _ => match (req.method(), req.uri().path()) { - (&Method::POST, "/invite") => Ok::<_, Infallible>(lox_context::generate_invite(cloned_context)), - (&Method::POST, "/reachability") => { - Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context)) - } - (&Method::POST, "/pubkeys") => Ok::<_, Infallible>(lox_context::send_keys(cloned_context)), - (&Method::POST, "/openreq") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_open_cred(bytes, cloned_context) - }), - (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_trust_promo(bytes, cloned_context) - }), - (&Method::POST, "/trustmig") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_trust_migration(bytes, cloned_context) - }), - (&Method::POST, "/levelup") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_level_up(bytes, cloned_context) - }), - (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_issue_invite(bytes, cloned_context) - }), - (&Method::POST, "/redeem") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_redeem_invite(bytes, cloned_context) - }), - (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - // TEST ONLY: Block all existing bridges and add new ones for migration - lox_context::verify_and_send_check_blockage(bytes, cloned_context) - }), - (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - lox_context::verify_and_send_blockage_migration(bytes, cloned_context) - }), - _ => { - // Return 404 not found response. - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("Not found")) - .unwrap()) - } - }, - } -} - async fn shutdown_signal() { tokio::signal::ctrl_c() .await @@ -104,7 +39,6 @@ async fn shutdown_signal() { println!("Shut down Lox Server"); } - #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, @@ -155,9 +89,7 @@ async fn rdsys_bridge_parser( async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { let resourcediff = rx.recv().await.unwrap(); - let cmd = Command::Rdsys { - resourcediff, - }; + let cmd = Command::Rdsys { resourcediff }; rdsys_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; } @@ -189,48 +121,21 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { while let Some(cmd) = context_rx.recv().await { use Command::*; - match cmd { Rdsys { resourcediff } => { if let Some(new_resources) = resourcediff.new { let mut count = 0; - let mut bucket= [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; for pt in new_resources { println!("A NEW RESOURCE: {:?}", pt); for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource"); - let infostr: String = format!( - "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}", - resource.r#type, - resource.blocked_in, - resource.protocol, - resource.fingerprint, - resource.or_addresses, - resource.distribution, - resource.flags, - resource.params, - ); - let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26]; - - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - uid_fingerprint: resource_uid, - info: info_bytes, - }; + let bridgeline = parse_resource(resource); println!("Now it's a bridgeline: {:?}", bridgeline); if context.unreplaced_bridges.lock().unwrap().len() > 0 { println!("BridgeLine to be replaced: {:?}", bridgeline); let res = context.replace_with_new(bridgeline); if res { - println!( - "BridgeLine successfully replaced: {:?}", - bridgeline - ); + println!("BridgeLine successfully replaced: {:?}", bridgeline); } else { // Add the bridge to the list of unreplaced bridges in the Lox context and try // again to replace at the next update (nothing changes in the Lox Authority) @@ -238,23 +143,25 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { context.new_unreplaced_bridge(bridgeline); } } else { - if count < MAX_BRIDGES_PER_BUCKET-1 { - bucket[count] = bridgeline; - count += 1; - } else { - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - context.add_openinv_bucket(bucket); - count = 0; - bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + if count < MAX_BRIDGES_PER_BUCKET { + bucket[count] = bridgeline; + count += 1; + } else { + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + context.add_openinv_bucket(bucket); + count = 0; + bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + } } } - } } // Handle the extra buckets that were not allocated already if count != 0 { for val in 0..count { - if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET-1) { + if context.extra_bridges.lock().unwrap().len() + < (MAX_BRIDGES_PER_BUCKET) + { context.append_extra_bridges(bucket[val]); } else { bucket = context.remove_extra_bridges(); @@ -267,31 +174,7 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { for pt in changed_resources { println!("A NEW CHANGED RESOURCE: {:?}", pt); for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource"); - let infostr: String = format!( - "type={} blocked_in={:?} protocol={} fingerprint= {:?} or_addresses={:?} distribution={} flags={:?} params={:?}", - resource.r#type, - resource.blocked_in, - resource.protocol, - resource.fingerprint, - resource.or_addresses, - resource.distribution, - resource.flags, - resource.params, - ); - let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26]; - - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - uid_fingerprint: resource_uid, - info: info_bytes, - }; - + let bridgeline = parse_resource(resource); println!("BridgeLine to be changed: {:?}", bridgeline); let res = context.update_bridge(bridgeline); if res { @@ -304,7 +187,6 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { let bucket = context.remove_extra_bridges(); context.add_spare_bucket(bucket); } - } } } @@ -319,44 +201,20 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { for pt in gone_resources { println!("A NEW GONE RESOURCE: {:?}", pt); for resource in pt.1 { - let mut ip_bytes: [u8; 16] = [0; 16]; - ip_bytes[..resource.address.len()] - .copy_from_slice(resource.address.as_bytes()); - let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource"); - let infostr: String = format!( - "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}", - resource.r#type, - resource.blocked_in, - resource.protocol, - resource.fingerprint, - resource.or_addresses, - resource.distribution, - resource.flags, - resource.params, - ); - let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26]; - - info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - let bridgeline = BridgeLine { - addr: ip_bytes, - port: resource.port, - uid_fingerprint: resource_uid, - info: info_bytes, - }; + let bridgeline = parse_resource(resource); println!("BridgeLine to be replaced: {:?}", bridgeline); let res = context.replace_with_new(bridgeline); if res { - println!( - "BridgeLine successfully replaced: {:?}", - bridgeline - ); + println!("BridgeLine successfully replaced: {:?}", bridgeline); } else { // Add the bridge to the list of unreplaced bridges in the Lox context and try // again to replace at the next update (nothing changes in the Lox Authority) - println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline); + println!( + "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", + bridgeline + ); context.new_unreplaced_bridge(bridgeline); } - } } } @@ -375,12 +233,11 @@ async fn context_manager(mut context_rx: mpsc::Receiver) { println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); //TODO probably do something else here } - */ + */ context.allocate_leftover_bridges(); context.encrypt_table(); sleep(Duration::from_millis(1)).await; } - Request { req, sender } => { let response = handle(context.clone(), req).await; if let Err(e) = sender.send(response) { diff --git a/crates/lox-distributor/src/request_handler.rs b/crates/lox-distributor/src/request_handler.rs new file mode 100644 index 0000000..98682a4 --- /dev/null +++ b/crates/lox-distributor/src/request_handler.rs @@ -0,0 +1,74 @@ +use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode}; + +use std::convert::Infallible; + +use crate::lox_context; +use crate::lox_context::LoxServerContext; + +// Lox Request handling logic for each Lox request/protocol +pub async fn handle( + cloned_context: LoxServerContext, + req: Request, +) -> Result, Infallible> { + println!("Request: {:?}", req); + match req.method() { + &Method::OPTIONS => Ok(Response::builder() + .header("Access-Control-Allow-Origin", HeaderValue::from_static("*")) + .header("Access-Control-Allow-Headers", "accept, content-type") + .header("Access-Control-Allow-Methods", "POST") + .status(200) + .body(Body::from("Allow POST")) + .unwrap()), + _ => match (req.method(), req.uri().path()) { + (&Method::POST, "/invite") => { + Ok::<_, Infallible>(lox_context::generate_invite(cloned_context)) + } + (&Method::POST, "/reachability") => { + Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context)) + } + (&Method::POST, "/pubkeys") => { + Ok::<_, Infallible>(lox_context::send_keys(cloned_context)) + } + (&Method::POST, "/openreq") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_open_cred(bytes, cloned_context) + }), + (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_trust_promo(bytes, cloned_context) + }), + (&Method::POST, "/trustmig") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_trust_migration(bytes, cloned_context) + }), + (&Method::POST, "/levelup") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_level_up(bytes, cloned_context) + }), + (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_issue_invite(bytes, cloned_context) + }), + (&Method::POST, "/redeem") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_redeem_invite(bytes, cloned_context) + }), + (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + // TEST ONLY: Block all existing bridges and add new ones for migration + lox_context::verify_and_send_check_blockage(bytes, cloned_context) + }), + (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + lox_context::verify_and_send_blockage_migration(bytes, cloned_context) + }), + _ => { + // Return 404 not found response. + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found")) + .unwrap()) + } + }, + } +} diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs new file mode 100644 index 0000000..6790fa5 --- /dev/null +++ b/crates/lox-distributor/src/resource_parser.rs @@ -0,0 +1,30 @@ +use lox::bridge_table::{BridgeLine, BRIDGE_BYTES}; +use rdsys_backend::proto::Resource; + +pub fn parse_resource(resource: Resource) -> BridgeLine { + let mut ip_bytes: [u8; 16] = [0; 16]; + ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes()); + let resource_uid = resource + .get_uid() + .expect("Unable to get Fingerprint UID of resource"); + let infostr: String = format!( + "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}", + resource.r#type, + resource.blocked_in, + resource.protocol, + resource.fingerprint, + resource.or_addresses, + resource.distribution, + resource.flags, + resource.params, +); + let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26]; + + info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); + BridgeLine { + addr: ip_bytes, + port: resource.port, + uid_fingerprint: resource_uid, + info: info_bytes, + } +}