diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs
index d79ac30..c9993a0 100644
--- a/crates/lox-distributor/src/lox_context.rs
+++ b/crates/lox-distributor/src/lox_context.rs
@@ -1,26 +1,17 @@
-
-use hyper::{
- body::Bytes,
- header::HeaderValue,
- Body, Response,
-};
+use hyper::{body::Bytes, header::HeaderValue, Body, Response};
use lox::{
- BridgeAuth, BridgeDb, OPENINV_LENGTH,
bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET},
proto::{
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
redeem_invite, trust_promotion,
},
- IssuerPubKey,
+ BridgeAuth, BridgeDb, IssuerPubKey, OPENINV_LENGTH,
};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
-use std::{
- sync::{Arc, Mutex},
-};
-
+use std::sync::{Arc, Mutex};
#[serde_as]
#[derive(Serialize, Deserialize)]
@@ -84,8 +75,7 @@ pub struct LoxServerContext {
}
impl LoxServerContext {
-
- pub fn append_extra_bridges(&self, bridge: BridgeLine){
+ pub fn append_extra_bridges(&self, bridge: BridgeLine) {
let mut extra_bridges = self.extra_bridges.lock().unwrap();
extra_bridges.push(bridge);
}
@@ -93,12 +83,11 @@ impl LoxServerContext {
pub fn remove_extra_bridges(&self) -> [BridgeLine; MAX_BRIDGES_PER_BUCKET] {
let mut extra_bridges = self.extra_bridges.lock().unwrap();
let mut return_bridges = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
- for i in 0..MAX_BRIDGES_PER_BUCKET{
+ for i in 0..MAX_BRIDGES_PER_BUCKET {
return_bridges[i] = extra_bridges.remove(i);
}
return_bridges
-
}
pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) {
@@ -111,8 +100,6 @@ impl LoxServerContext {
let mut db_obj = self.db.lock().unwrap();
let mut extra_bridges = self.extra_bridges.lock().unwrap();
ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj);
-
-
}
pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
@@ -133,8 +120,8 @@ impl LoxServerContext {
let mut available_bridge = eb_obj.last();
ba_obj.bridge_replace(&bridgeline, available_bridge, &mut db_obj)
-
}
+
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
@@ -273,7 +260,10 @@ pub fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) ->
prepare_header(trust_promo_resp_str)
}
-pub fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response
{
+pub fn verify_and_send_trust_migration(
+ request: Bytes,
+ context: LoxServerContext,
+) -> Response {
let req: migration::Request = serde_json::from_slice(&request).unwrap();
let response = context.trust_migration(req);
let resp_str = serde_json::to_string(&response).unwrap();
@@ -309,7 +299,10 @@ pub fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext)
prepare_header(check_blockage_resp_str)
}
-pub fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response {
+pub fn verify_and_send_blockage_migration(
+ request: Bytes,
+ context: LoxServerContext,
+) -> Response {
let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap();
let response = context.blockage_migration(req);
let resp_str = serde_json::to_string(&response).unwrap();
@@ -321,4 +314,4 @@ fn prepare_header(response: String) -> Response {
resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp
-}
\ No newline at end of file
+}
diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs
index 1dbd0bb..e775fdb 100644
--- a/crates/lox-distributor/src/main.rs
+++ b/crates/lox-distributor/src/main.rs
@@ -1,19 +1,15 @@
use futures::future;
use futures::StreamExt;
use hyper::{
- body,
- header::HeaderValue,
server::conn::AddrStream,
service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server, StatusCode,
+ Body, Request, Response, Server,
};
-use lox::bridge_table::MAX_BRIDGES_PER_BUCKET;
-use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
-
+use lox::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use lox::{BridgeAuth, BridgeDb};
use rdsys_backend::{proto::ResourceDiff, start_stream};
-use serde::{Deserialize};
+use serde::Deserialize;
use std::{
convert::Infallible,
env,
@@ -25,7 +21,10 @@ use std::{
};
mod lox_context;
-use lox_context::LoxServerContext;
+mod request_handler;
+use request_handler::handle;
+mod resource_parser;
+use resource_parser::parse_resource;
use tokio::{
signal, spawn,
@@ -33,70 +32,6 @@ use tokio::{
time::sleep,
};
-// Lox Request handling logic for each Lox request/protocol
-async fn handle(
- cloned_context: LoxServerContext,
- req: Request,
-) -> Result, Infallible> {
- println!("Request: {:?}", req);
- match req.method() {
- &Method::OPTIONS => Ok(Response::builder()
- .header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
- .header("Access-Control-Allow-Headers", "accept, content-type")
- .header("Access-Control-Allow-Methods", "POST")
- .status(200)
- .body(Body::from("Allow POST"))
- .unwrap()),
- _ => match (req.method(), req.uri().path()) {
- (&Method::POST, "/invite") => Ok::<_, Infallible>(lox_context::generate_invite(cloned_context)),
- (&Method::POST, "/reachability") => {
- Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
- }
- (&Method::POST, "/pubkeys") => Ok::<_, Infallible>(lox_context::send_keys(cloned_context)),
- (&Method::POST, "/openreq") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_open_cred(bytes, cloned_context)
- }),
- (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_trust_promo(bytes, cloned_context)
- }),
- (&Method::POST, "/trustmig") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_trust_migration(bytes, cloned_context)
- }),
- (&Method::POST, "/levelup") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_level_up(bytes, cloned_context)
- }),
- (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_issue_invite(bytes, cloned_context)
- }),
- (&Method::POST, "/redeem") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
- }),
- (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- // TEST ONLY: Block all existing bridges and add new ones for migration
- lox_context::verify_and_send_check_blockage(bytes, cloned_context)
- }),
- (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
- let bytes = body::to_bytes(req.into_body()).await.unwrap();
- lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
- }),
- _ => {
- // Return 404 not found response.
- Ok(Response::builder()
- .status(StatusCode::NOT_FOUND)
- .body(Body::from("Not found"))
- .unwrap())
- }
- },
- }
-}
-
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
@@ -104,7 +39,6 @@ async fn shutdown_signal() {
println!("Shut down Lox Server");
}
-
#[derive(Debug, Deserialize)]
struct ResourceInfo {
endpoint: String,
@@ -155,9 +89,7 @@ async fn rdsys_bridge_parser(
async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) {
loop {
let resourcediff = rx.recv().await.unwrap();
- let cmd = Command::Rdsys {
- resourcediff,
- };
+ let cmd = Command::Rdsys { resourcediff };
rdsys_tx.send(cmd).await.unwrap();
sleep(Duration::from_secs(1)).await;
}
@@ -189,48 +121,21 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
while let Some(cmd) = context_rx.recv().await {
use Command::*;
-
match cmd {
Rdsys { resourcediff } => {
if let Some(new_resources) = resourcediff.new {
let mut count = 0;
- let mut bucket= [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
+ let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
for pt in new_resources {
println!("A NEW RESOURCE: {:?}", pt);
for resource in pt.1 {
- let mut ip_bytes: [u8; 16] = [0; 16];
- ip_bytes[..resource.address.len()]
- .copy_from_slice(resource.address.as_bytes());
- let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
- let infostr: String = format!(
- "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
- resource.r#type,
- resource.blocked_in,
- resource.protocol,
- resource.fingerprint,
- resource.or_addresses,
- resource.distribution,
- resource.flags,
- resource.params,
- );
- let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
-
- info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
- let bridgeline = BridgeLine {
- addr: ip_bytes,
- port: resource.port,
- uid_fingerprint: resource_uid,
- info: info_bytes,
- };
+ let bridgeline = parse_resource(resource);
println!("Now it's a bridgeline: {:?}", bridgeline);
if context.unreplaced_bridges.lock().unwrap().len() > 0 {
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
if res {
- println!(
- "BridgeLine successfully replaced: {:?}",
- bridgeline
- );
+ println!("BridgeLine successfully replaced: {:?}", bridgeline);
} else {
// Add the bridge to the list of unreplaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
@@ -238,23 +143,25 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
context.new_unreplaced_bridge(bridgeline);
}
} else {
- if count < MAX_BRIDGES_PER_BUCKET-1 {
- bucket[count] = bridgeline;
- count += 1;
- } else {
- // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
- // eventually also do some more fancy grouping of new resources, i.e., by type or region
- context.add_openinv_bucket(bucket);
- count = 0;
- bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
+ if count < MAX_BRIDGES_PER_BUCKET {
+ bucket[count] = bridgeline;
+ count += 1;
+ } else {
+ // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
+ // eventually also do some more fancy grouping of new resources, i.e., by type or region
+ context.add_openinv_bucket(bucket);
+ count = 0;
+ bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
+ }
}
}
- }
}
// Handle the extra buckets that were not allocated already
if count != 0 {
for val in 0..count {
- if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET-1) {
+ if context.extra_bridges.lock().unwrap().len()
+ < (MAX_BRIDGES_PER_BUCKET)
+ {
context.append_extra_bridges(bucket[val]);
} else {
bucket = context.remove_extra_bridges();
@@ -267,31 +174,7 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
for pt in changed_resources {
println!("A NEW CHANGED RESOURCE: {:?}", pt);
for resource in pt.1 {
- let mut ip_bytes: [u8; 16] = [0; 16];
- ip_bytes[..resource.address.len()]
- .copy_from_slice(resource.address.as_bytes());
- let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
- let infostr: String = format!(
- "type={} blocked_in={:?} protocol={} fingerprint= {:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
- resource.r#type,
- resource.blocked_in,
- resource.protocol,
- resource.fingerprint,
- resource.or_addresses,
- resource.distribution,
- resource.flags,
- resource.params,
- );
- let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
-
- info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
- let bridgeline = BridgeLine {
- addr: ip_bytes,
- port: resource.port,
- uid_fingerprint: resource_uid,
- info: info_bytes,
- };
-
+ let bridgeline = parse_resource(resource);
println!("BridgeLine to be changed: {:?}", bridgeline);
let res = context.update_bridge(bridgeline);
if res {
@@ -304,7 +187,6 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
let bucket = context.remove_extra_bridges();
context.add_spare_bucket(bucket);
}
-
}
}
}
@@ -319,44 +201,20 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
for pt in gone_resources {
println!("A NEW GONE RESOURCE: {:?}", pt);
for resource in pt.1 {
- let mut ip_bytes: [u8; 16] = [0; 16];
- ip_bytes[..resource.address.len()]
- .copy_from_slice(resource.address.as_bytes());
- let resource_uid = resource.get_uid().expect("Unable to get Fingerprint UID of resource");
- let infostr: String = format!(
- "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
- resource.r#type,
- resource.blocked_in,
- resource.protocol,
- resource.fingerprint,
- resource.or_addresses,
- resource.distribution,
- resource.flags,
- resource.params,
- );
- let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
-
- info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
- let bridgeline = BridgeLine {
- addr: ip_bytes,
- port: resource.port,
- uid_fingerprint: resource_uid,
- info: info_bytes,
- };
+ let bridgeline = parse_resource(resource);
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
if res {
- println!(
- "BridgeLine successfully replaced: {:?}",
- bridgeline
- );
+ println!("BridgeLine successfully replaced: {:?}", bridgeline);
} else {
// Add the bridge to the list of unreplaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
- println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline);
+ println!(
+ "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
+ bridgeline
+ );
context.new_unreplaced_bridge(bridgeline);
}
-
}
}
}
@@ -375,12 +233,11 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
//TODO probably do something else here
}
- */
+ */
context.allocate_leftover_bridges();
context.encrypt_table();
sleep(Duration::from_millis(1)).await;
}
-
Request { req, sender } => {
let response = handle(context.clone(), req).await;
if let Err(e) = sender.send(response) {
diff --git a/crates/lox-distributor/src/request_handler.rs b/crates/lox-distributor/src/request_handler.rs
new file mode 100644
index 0000000..98682a4
--- /dev/null
+++ b/crates/lox-distributor/src/request_handler.rs
@@ -0,0 +1,74 @@
+use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode};
+
+use std::convert::Infallible;
+
+use crate::lox_context;
+use crate::lox_context::LoxServerContext;
+
+// Lox Request handling logic for each Lox request/protocol
+pub async fn handle(
+ cloned_context: LoxServerContext,
+ req: Request,
+) -> Result, Infallible> {
+ println!("Request: {:?}", req);
+ match req.method() {
+ &Method::OPTIONS => Ok(Response::builder()
+ .header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
+ .header("Access-Control-Allow-Headers", "accept, content-type")
+ .header("Access-Control-Allow-Methods", "POST")
+ .status(200)
+ .body(Body::from("Allow POST"))
+ .unwrap()),
+ _ => match (req.method(), req.uri().path()) {
+ (&Method::POST, "/invite") => {
+ Ok::<_, Infallible>(lox_context::generate_invite(cloned_context))
+ }
+ (&Method::POST, "/reachability") => {
+ Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
+ }
+ (&Method::POST, "/pubkeys") => {
+ Ok::<_, Infallible>(lox_context::send_keys(cloned_context))
+ }
+ (&Method::POST, "/openreq") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_open_cred(bytes, cloned_context)
+ }),
+ (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_trust_promo(bytes, cloned_context)
+ }),
+ (&Method::POST, "/trustmig") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_trust_migration(bytes, cloned_context)
+ }),
+ (&Method::POST, "/levelup") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_level_up(bytes, cloned_context)
+ }),
+ (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_issue_invite(bytes, cloned_context)
+ }),
+ (&Method::POST, "/redeem") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
+ }),
+ (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ // TEST ONLY: Block all existing bridges and add new ones for migration
+ lox_context::verify_and_send_check_blockage(bytes, cloned_context)
+ }),
+ (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
+ let bytes = body::to_bytes(req.into_body()).await.unwrap();
+ lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
+ }),
+ _ => {
+ // Return 404 not found response.
+ Ok(Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::from("Not found"))
+ .unwrap())
+ }
+ },
+ }
+}
diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs
new file mode 100644
index 0000000..6790fa5
--- /dev/null
+++ b/crates/lox-distributor/src/resource_parser.rs
@@ -0,0 +1,30 @@
+use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
+use rdsys_backend::proto::Resource;
+
+pub fn parse_resource(resource: Resource) -> BridgeLine {
+ let mut ip_bytes: [u8; 16] = [0; 16];
+ ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
+ let resource_uid = resource
+ .get_uid()
+ .expect("Unable to get Fingerprint UID of resource");
+ let infostr: String = format!(
+ "type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
+ resource.r#type,
+ resource.blocked_in,
+ resource.protocol,
+ resource.fingerprint,
+ resource.or_addresses,
+ resource.distribution,
+ resource.flags,
+ resource.params,
+);
+ let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
+
+ info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
+ BridgeLine {
+ addr: ip_bytes,
+ port: resource.port,
+ uid_fingerprint: resource_uid,
+ info: info_bytes,
+ }
+}