Refactors lox-distributor to make division of tasks clearer

This commit is contained in:
onyinyang 2023-05-11 11:11:57 -04:00
parent 6d2361bfbc
commit bae093ef20
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
4 changed files with 151 additions and 197 deletions

View File

@ -1,26 +1,17 @@
use hyper::{
body::Bytes,
header::HeaderValue,
Body, Response,
};
use hyper::{body::Bytes, header::HeaderValue, Body, Response};
use lox::{
BridgeAuth, BridgeDb, OPENINV_LENGTH,
bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET},
proto::{
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
redeem_invite, trust_promotion,
},
IssuerPubKey,
BridgeAuth, BridgeDb, IssuerPubKey, OPENINV_LENGTH,
};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::{
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};
#[serde_as]
#[derive(Serialize, Deserialize)]
@ -84,8 +75,7 @@ pub struct LoxServerContext {
}
impl LoxServerContext {
pub fn append_extra_bridges(&self, bridge: BridgeLine){
pub fn append_extra_bridges(&self, bridge: BridgeLine) {
let mut extra_bridges = self.extra_bridges.lock().unwrap();
extra_bridges.push(bridge);
}
@ -93,12 +83,11 @@ impl LoxServerContext {
pub fn remove_extra_bridges(&self) -> [BridgeLine; MAX_BRIDGES_PER_BUCKET] {
let mut extra_bridges = self.extra_bridges.lock().unwrap();
let mut return_bridges = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
for i in 0..MAX_BRIDGES_PER_BUCKET{
for i in 0..MAX_BRIDGES_PER_BUCKET {
return_bridges[i] = extra_bridges.remove(i);
}
return_bridges
}
pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) {
@ -111,8 +100,6 @@ impl LoxServerContext {
let mut db_obj = self.db.lock().unwrap();
let mut extra_bridges = self.extra_bridges.lock().unwrap();
ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj);
}
pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
@ -133,8 +120,8 @@ impl LoxServerContext {
let mut available_bridge = eb_obj.last();
ba_obj.bridge_replace(&bridgeline, available_bridge, &mut db_obj)
}
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
@ -273,7 +260,10 @@ pub fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) ->
prepare_header(trust_promo_resp_str)
}
pub fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response<Body> {
pub fn verify_and_send_trust_migration(
request: Bytes,
context: LoxServerContext,
) -> Response<Body> {
let req: migration::Request = serde_json::from_slice(&request).unwrap();
let response = context.trust_migration(req);
let resp_str = serde_json::to_string(&response).unwrap();
@ -309,7 +299,10 @@ pub fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext)
prepare_header(check_blockage_resp_str)
}
pub fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response<Body> {
pub fn verify_and_send_blockage_migration(
request: Bytes,
context: LoxServerContext,
) -> Response<Body> {
let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap();
let response = context.blockage_migration(req);
let resp_str = serde_json::to_string(&response).unwrap();

View File

@ -1,19 +1,15 @@
use futures::future;
use futures::StreamExt;
use hyper::{
body,
header::HeaderValue,
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server, StatusCode,
Body, Request, Response, Server,
};
use lox::bridge_table::MAX_BRIDGES_PER_BUCKET;
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
use lox::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use lox::{BridgeAuth, BridgeDb};
use rdsys_backend::{proto::ResourceDiff, start_stream};
use serde::{Deserialize};
use serde::Deserialize;
use std::{
convert::Infallible,
env,
@ -25,7 +21,10 @@ use std::{
};
mod lox_context;
use lox_context::LoxServerContext;
mod request_handler;
use request_handler::handle;
mod resource_parser;
use resource_parser::parse_resource;
use tokio::{
signal, spawn,
@ -33,70 +32,6 @@ use tokio::{
time::sleep,
};
// Lox Request handling logic for each Lox request/protocol
async fn handle(
cloned_context: LoxServerContext,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
println!("Request: {:?}", req);
match req.method() {
&Method::OPTIONS => Ok(Response::builder()
.header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
.header("Access-Control-Allow-Headers", "accept, content-type")
.header("Access-Control-Allow-Methods", "POST")
.status(200)
.body(Body::from("Allow POST"))
.unwrap()),
_ => match (req.method(), req.uri().path()) {
(&Method::POST, "/invite") => Ok::<_, Infallible>(lox_context::generate_invite(cloned_context)),
(&Method::POST, "/reachability") => {
Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
}
(&Method::POST, "/pubkeys") => Ok::<_, Infallible>(lox_context::send_keys(cloned_context)),
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_open_cred(bytes, cloned_context)
}),
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_promo(bytes, cloned_context)
}),
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_migration(bytes, cloned_context)
}),
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_level_up(bytes, cloned_context)
}),
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_issue_invite(bytes, cloned_context)
}),
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
}),
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
// TEST ONLY: Block all existing bridges and add new ones for migration
lox_context::verify_and_send_check_blockage(bytes, cloned_context)
}),
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
}),
_ => {
// Return 404 not found response.
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found"))
.unwrap())
}
},
}
}
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
@ -104,7 +39,6 @@ async fn shutdown_signal() {
println!("Shut down Lox Server");
}
#[derive(Debug, Deserialize)]
struct ResourceInfo {
endpoint: String,
@ -155,9 +89,7 @@ async fn rdsys_bridge_parser(
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceDiff>) {
loop {
let resourcediff = rx.recv().await.unwrap();
let cmd = Command::Rdsys {
resourcediff,
};
let cmd = Command::Rdsys { resourcediff };
rdsys_tx.send(cmd).await.unwrap();
sleep(Duration::from_secs(1)).await;
}
@ -189,48 +121,21 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
while let Some(cmd) = context_rx.recv().await {
use Command::*;
match cmd {
Rdsys { resourcediff } => {
if let Some(new_resources) = resourcediff.new {
let mut count = 0;
let mut bucket= [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
for pt in new_resources {
println!("A NEW RESOURCE: {:?}", pt);
for resource in pt.1 {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()]
.copy_from_slice(resource.address.as_bytes());
let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
let bridgeline = BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
};
let bridgeline = parse_resource(resource);
println!("Now it's a bridgeline: {:?}", bridgeline);
if context.unreplaced_bridges.lock().unwrap().len() > 0 {
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
if res {
println!(
"BridgeLine successfully replaced: {:?}",
bridgeline
);
println!("BridgeLine successfully replaced: {:?}", bridgeline);
} else {
// Add the bridge to the list of unreplaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
@ -238,23 +143,25 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
context.new_unreplaced_bridge(bridgeline);
}
} else {
if count < MAX_BRIDGES_PER_BUCKET-1 {
bucket[count] = bridgeline;
count += 1;
} else {
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
context.add_openinv_bucket(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline;
count += 1;
} else {
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
context.add_openinv_bucket(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
}
}
}
}
}
// Handle the extra buckets that were not allocated already
if count != 0 {
for val in 0..count {
if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET-1) {
if context.extra_bridges.lock().unwrap().len()
< (MAX_BRIDGES_PER_BUCKET)
{
context.append_extra_bridges(bucket[val]);
} else {
bucket = context.remove_extra_bridges();
@ -267,31 +174,7 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
for pt in changed_resources {
println!("A NEW CHANGED RESOURCE: {:?}", pt);
for resource in pt.1 {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()]
.copy_from_slice(resource.address.as_bytes());
let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint= {:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
let bridgeline = BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
};
let bridgeline = parse_resource(resource);
println!("BridgeLine to be changed: {:?}", bridgeline);
let res = context.update_bridge(bridgeline);
if res {
@ -304,7 +187,6 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
let bucket = context.remove_extra_bridges();
context.add_spare_bucket(bucket);
}
}
}
}
@ -319,44 +201,20 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
for pt in gone_resources {
println!("A NEW GONE RESOURCE: {:?}", pt);
for resource in pt.1 {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()]
.copy_from_slice(resource.address.as_bytes());
let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
let bridgeline = BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
};
let bridgeline = parse_resource(resource);
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
if res {
println!(
"BridgeLine successfully replaced: {:?}",
bridgeline
);
println!("BridgeLine successfully replaced: {:?}", bridgeline);
} else {
// Add the bridge to the list of unreplaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline);
println!(
"'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
bridgeline
);
context.new_unreplaced_bridge(bridgeline);
}
}
}
}
@ -375,12 +233,11 @@ async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
//TODO probably do something else here
}
*/
*/
context.allocate_leftover_bridges();
context.encrypt_table();
sleep(Duration::from_millis(1)).await;
}
Request { req, sender } => {
let response = handle(context.clone(), req).await;
if let Err(e) = sender.send(response) {

View File

@ -0,0 +1,74 @@
use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode};
use std::convert::Infallible;
use crate::lox_context;
use crate::lox_context::LoxServerContext;
// Lox Request handling logic for each Lox request/protocol
pub async fn handle(
cloned_context: LoxServerContext,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
println!("Request: {:?}", req);
match req.method() {
&Method::OPTIONS => Ok(Response::builder()
.header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
.header("Access-Control-Allow-Headers", "accept, content-type")
.header("Access-Control-Allow-Methods", "POST")
.status(200)
.body(Body::from("Allow POST"))
.unwrap()),
_ => match (req.method(), req.uri().path()) {
(&Method::POST, "/invite") => {
Ok::<_, Infallible>(lox_context::generate_invite(cloned_context))
}
(&Method::POST, "/reachability") => {
Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
}
(&Method::POST, "/pubkeys") => {
Ok::<_, Infallible>(lox_context::send_keys(cloned_context))
}
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_open_cred(bytes, cloned_context)
}),
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_promo(bytes, cloned_context)
}),
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_trust_migration(bytes, cloned_context)
}),
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_level_up(bytes, cloned_context)
}),
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_issue_invite(bytes, cloned_context)
}),
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
}),
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
// TEST ONLY: Block all existing bridges and add new ones for migration
lox_context::verify_and_send_check_blockage(bytes, cloned_context)
}),
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
}),
_ => {
// Return 404 not found response.
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found"))
.unwrap())
}
},
}
}

View File

@ -0,0 +1,30 @@
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
use rdsys_backend::proto::Resource;
pub fn parse_resource(resource: Resource) -> BridgeLine {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
let resource_uid = resource
.get_uid()
.expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
}
}