Refactors lox-distributor to make division of tasks clearer

This commit is contained in:
onyinyang 2023-05-11 11:11:57 -04:00
parent da4d7c962f
commit 3e3f50c215
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
4 changed files with 151 additions and 197 deletions

View File

@ -1,26 +1,17 @@
use hyper::{body::Bytes, header::HeaderValue, Body, Response};
use hyper::{
body::Bytes,
header::HeaderValue,
Body, Response,
};
use lox::{ use lox::{
BridgeAuth, BridgeDb, OPENINV_LENGTH,
bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET}, bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET},
proto::{ proto::{
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite, blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
redeem_invite, trust_promotion, redeem_invite, trust_promotion,
}, },
IssuerPubKey, BridgeAuth, BridgeDb, IssuerPubKey, OPENINV_LENGTH,
}; };
use rand::RngCore; use rand::RngCore;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::serde_as; use serde_with::serde_as;
use std::{ use std::sync::{Arc, Mutex};
sync::{Arc, Mutex},
};
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -84,7 +75,6 @@ pub struct LoxServerContext {
} }
impl LoxServerContext { impl LoxServerContext {
pub fn append_extra_bridges(&self, bridge: BridgeLine) { pub fn append_extra_bridges(&self, bridge: BridgeLine) {
let mut extra_bridges = self.extra_bridges.lock().unwrap(); let mut extra_bridges = self.extra_bridges.lock().unwrap();
extra_bridges.push(bridge); extra_bridges.push(bridge);
@ -98,7 +88,6 @@ impl LoxServerContext {
} }
return_bridges return_bridges
} }
pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) { pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) {
@ -111,8 +100,6 @@ impl LoxServerContext {
let mut db_obj = self.db.lock().unwrap(); let mut db_obj = self.db.lock().unwrap();
let mut extra_bridges = self.extra_bridges.lock().unwrap(); let mut extra_bridges = self.extra_bridges.lock().unwrap();
ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj); ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj);
} }
pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) { pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
@ -133,8 +120,8 @@ impl LoxServerContext {
let mut available_bridge = eb_obj.last(); let mut available_bridge = eb_obj.last();
ba_obj.bridge_replace(&bridgeline, available_bridge, &mut db_obj) ba_obj.bridge_replace(&bridgeline, available_bridge, &mut db_obj)
} }
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool { pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap(); let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap(); let mut db_obj = self.db.lock().unwrap();
@ -273,7 +260,10 @@ pub fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) ->
prepare_header(trust_promo_resp_str) prepare_header(trust_promo_resp_str)
} }
pub fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response<Body> { pub fn verify_and_send_trust_migration(
request: Bytes,
context: LoxServerContext,
) -> Response<Body> {
let req: migration::Request = serde_json::from_slice(&request).unwrap(); let req: migration::Request = serde_json::from_slice(&request).unwrap();
let response = context.trust_migration(req); let response = context.trust_migration(req);
let resp_str = serde_json::to_string(&response).unwrap(); let resp_str = serde_json::to_string(&response).unwrap();
@ -309,7 +299,10 @@ pub fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext)
prepare_header(check_blockage_resp_str) prepare_header(check_blockage_resp_str)
} }
pub fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response<Body> { pub fn verify_and_send_blockage_migration(
request: Bytes,
context: LoxServerContext,
) -> Response<Body> {
let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap(); let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap();
let response = context.blockage_migration(req); let response = context.blockage_migration(req);
let resp_str = serde_json::to_string(&response).unwrap(); let resp_str = serde_json::to_string(&response).unwrap();

View File

@ -1,19 +1,15 @@
use futures::future; use futures::future;
use futures::StreamExt; use futures::StreamExt;
use hyper::{ use hyper::{
body,
header::HeaderValue,
server::conn::AddrStream, server::conn::AddrStream,
service::{make_service_fn, service_fn}, service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server, StatusCode, Body, Request, Response, Server,
}; };
use lox::bridge_table::MAX_BRIDGES_PER_BUCKET; use lox::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
use lox::{BridgeAuth, BridgeDb}; use lox::{BridgeAuth, BridgeDb};
use rdsys_backend::{proto::ResourceDiff, start_stream}; use rdsys_backend::{proto::ResourceDiff, start_stream};
use serde::{Deserialize}; use serde::Deserialize;
use std::{ use std::{
convert::Infallible, convert::Infallible,
env, env,
@ -25,7 +21,10 @@ use std::{
}; };
mod lox_context; mod lox_context;
use lox_context::LoxServerContext; mod request_handler;
use request_handler::handle;
mod resource_parser;
use resource_parser::parse_resource;
use tokio::{ use tokio::{
signal, spawn, signal, spawn,
@ -33,70 +32,6 @@ use tokio::{
time::sleep, time::sleep,
}; };
// Lox Request handling logic for each Lox request/protocol
async fn handle(
cloned_context: LoxServerContext,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
println!("Request: {:?}", req);
match req.method() {
&Method::OPTIONS => Ok(Response::builder()
.header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
.header("Access-Control-Allow-Headers", "accept, content-type")
.header("Access-Control-Allow-Methods", "POST")
.status(200)
.body(Body::from("Allow POST"))
.unwrap()),
_ => match (req.method(), req.uri().path()) {
(&Method::POST, "/invite") => Ok::<_, Infallible>(lox_context::generate_invite(cloned_context)),
(&Method::POST, "/reachability") => {
Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
}
(&Method::POST, "/pubkeys") => Ok::<_, Infallible>(lox_context::send_keys(cloned_context)),
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_open_cred(bytes, cloned_context)
}),
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_promo(bytes, cloned_context)
}),
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_migration(bytes, cloned_context)
}),
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_level_up(bytes, cloned_context)
}),
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_issue_invite(bytes, cloned_context)
}),
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
}),
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
// TEST ONLY: Block all existing bridges and add new ones for migration
lox_context::verify_and_send_check_blockage(bytes, cloned_context)
}),
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
}),
_ => {
// Return 404 not found response.
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found"))
.unwrap())
}
},
}
}
async fn shutdown_signal() { async fn shutdown_signal() {
tokio::signal::ctrl_c() tokio::signal::ctrl_c()
.await .await
@ -104,7 +39,6 @@ async fn shutdown_signal() {
println!("Shut down Lox Server"); println!("Shut down Lox Server");
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct ResourceInfo { struct ResourceInfo {
endpoint: String, endpoint: String,
@ -155,9 +89,7 @@ async fn rdsys_bridge_parser(
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceDiff>) { async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceDiff>) {
loop { loop {
let resourcediff = rx.recv().await.unwrap(); let resourcediff = rx.recv().await.unwrap();
let cmd = Command::Rdsys { let cmd = Command::Rdsys { resourcediff };
resourcediff,
};
rdsys_tx.send(cmd).await.unwrap(); rdsys_tx.send(cmd).await.unwrap();
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
} }
@ -189,7 +121,6 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
while let Some(cmd) = context_rx.recv().await { while let Some(cmd) = context_rx.recv().await {
use Command::*; use Command::*;
match cmd { match cmd {
Rdsys { resourcediff } => { Rdsys { resourcediff } => {
if let Some(new_resources) = resourcediff.new { if let Some(new_resources) = resourcediff.new {
@ -198,39 +129,13 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
for pt in new_resources { for pt in new_resources {
println!("A NEW RESOURCE: {:?}", pt); println!("A NEW RESOURCE: {:?}", pt);
for resource in pt.1 { for resource in pt.1 {
let mut ip_bytes: [u8; 16] = [0; 16]; let bridgeline = parse_resource(resource);
ip_bytes[..resource.address.len()]
.copy_from_slice(resource.address.as_bytes());
let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
let bridgeline = BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
};
println!("Now it's a bridgeline: {:?}", bridgeline); println!("Now it's a bridgeline: {:?}", bridgeline);
if context.unreplaced_bridges.lock().unwrap().len() > 0 { if context.unreplaced_bridges.lock().unwrap().len() > 0 {
println!("BridgeLine to be replaced: {:?}", bridgeline); println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline); let res = context.replace_with_new(bridgeline);
if res { if res {
println!( println!("BridgeLine successfully replaced: {:?}", bridgeline);
"BridgeLine successfully replaced: {:?}",
bridgeline
);
} else { } else {
// Add the bridge to the list of unreplaced bridges in the Lox context and try // Add the bridge to the list of unreplaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority) // again to replace at the next update (nothing changes in the Lox Authority)
@ -238,7 +143,7 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
context.new_unreplaced_bridge(bridgeline); context.new_unreplaced_bridge(bridgeline);
} }
} else { } else {
if count < MAX_BRIDGES_PER_BUCKET-1 { if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline; bucket[count] = bridgeline;
count += 1; count += 1;
} else { } else {
@ -254,7 +159,9 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
// Handle the extra buckets that were not allocated already // Handle the extra buckets that were not allocated already
if count != 0 { if count != 0 {
for val in 0..count { for val in 0..count {
if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET-1) { if context.extra_bridges.lock().unwrap().len()
< (MAX_BRIDGES_PER_BUCKET)
{
context.append_extra_bridges(bucket[val]); context.append_extra_bridges(bucket[val]);
} else { } else {
bucket = context.remove_extra_bridges(); bucket = context.remove_extra_bridges();
@ -267,31 +174,7 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
for pt in changed_resources { for pt in changed_resources {
println!("A NEW CHANGED RESOURCE: {:?}", pt); println!("A NEW CHANGED RESOURCE: {:?}", pt);
for resource in pt.1 { for resource in pt.1 {
let mut ip_bytes: [u8; 16] = [0; 16]; let bridgeline = parse_resource(resource);
ip_bytes[..resource.address.len()]
.copy_from_slice(resource.address.as_bytes());
let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint= {:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
let bridgeline = BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
};
println!("BridgeLine to be changed: {:?}", bridgeline); println!("BridgeLine to be changed: {:?}", bridgeline);
let res = context.update_bridge(bridgeline); let res = context.update_bridge(bridgeline);
if res { if res {
@ -304,7 +187,6 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
let bucket = context.remove_extra_bridges(); let bucket = context.remove_extra_bridges();
context.add_spare_bucket(bucket); context.add_spare_bucket(bucket);
} }
} }
} }
} }
@ -319,44 +201,20 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
for pt in gone_resources { for pt in gone_resources {
println!("A NEW GONE RESOURCE: {:?}", pt); println!("A NEW GONE RESOURCE: {:?}", pt);
for resource in pt.1 { for resource in pt.1 {
let mut ip_bytes: [u8; 16] = [0; 16]; let bridgeline = parse_resource(resource);
ip_bytes[..resource.address.len()]
.copy_from_slice(resource.address.as_bytes());
let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
let bridgeline = BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
};
println!("BridgeLine to be replaced: {:?}", bridgeline); println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline); let res = context.replace_with_new(bridgeline);
if res { if res {
println!( println!("BridgeLine successfully replaced: {:?}", bridgeline);
"BridgeLine successfully replaced: {:?}",
bridgeline
);
} else { } else {
// Add the bridge to the list of unreplaced bridges in the Lox context and try // Add the bridge to the list of unreplaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority) // again to replace at the next update (nothing changes in the Lox Authority)
println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline); println!(
"'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
bridgeline
);
context.new_unreplaced_bridge(bridgeline); context.new_unreplaced_bridge(bridgeline);
} }
} }
} }
} }
@ -380,7 +238,6 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
context.encrypt_table(); context.encrypt_table();
sleep(Duration::from_millis(1)).await; sleep(Duration::from_millis(1)).await;
} }
Request { req, sender } => { Request { req, sender } => {
let response = handle(context.clone(), req).await; let response = handle(context.clone(), req).await;
if let Err(e) = sender.send(response) { if let Err(e) = sender.send(response) {

View File

@ -0,0 +1,74 @@
use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode};
use std::convert::Infallible;
use crate::lox_context;
use crate::lox_context::LoxServerContext;
// Lox Request handling logic for each Lox request/protocol
pub async fn handle(
cloned_context: LoxServerContext,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
println!("Request: {:?}", req);
match req.method() {
&Method::OPTIONS => Ok(Response::builder()
.header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
.header("Access-Control-Allow-Headers", "accept, content-type")
.header("Access-Control-Allow-Methods", "POST")
.status(200)
.body(Body::from("Allow POST"))
.unwrap()),
_ => match (req.method(), req.uri().path()) {
(&Method::POST, "/invite") => {
Ok::<_, Infallible>(lox_context::generate_invite(cloned_context))
}
(&Method::POST, "/reachability") => {
Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
}
(&Method::POST, "/pubkeys") => {
Ok::<_, Infallible>(lox_context::send_keys(cloned_context))
}
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_open_cred(bytes, cloned_context)
}),
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_promo(bytes, cloned_context)
}),
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_migration(bytes, cloned_context)
}),
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_level_up(bytes, cloned_context)
}),
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_issue_invite(bytes, cloned_context)
}),
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
}),
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
// TEST ONLY: Block all existing bridges and add new ones for migration
lox_context::verify_and_send_check_blockage(bytes, cloned_context)
}),
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
}),
_ => {
// Return 404 not found response.
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found"))
.unwrap())
}
},
}
}

View File

@ -0,0 +1,30 @@
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
use rdsys_backend::proto::Resource;
pub fn parse_resource(resource: Resource) -> BridgeLine {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
let resource_uid = resource
.get_uid()
.expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
}
}