diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs
index 3e0834a..4013bf1 100644
--- a/crates/lox-distributor/src/main.rs
+++ b/crates/lox-distributor/src/main.rs
@@ -19,6 +19,8 @@ use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
use rand::RngCore;
use rdsys_backend::{proto::ResourceDiff, start_stream};
use serde::{Deserialize, Serialize};
+use serde_json;
+use serde_with::serde_as;
use std::{
convert::Infallible,
env,
@@ -28,12 +30,10 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};
-use serde_json;
-use serde_with::serde_as;
use tokio::{
- spawn,
+ signal, spawn,
sync::{broadcast, mpsc, oneshot},
- time::sleep, signal,
+ time::sleep,
};
#[serde_as]
@@ -113,10 +113,16 @@ impl LoxServerContext {
ba_obj.add_openinv_bridges(bucket, &mut db_obj);
}
- fn add_unreachable(&self, bridgeline: BridgeLine) {
+ fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
- ba_obj.bridge_unreachable(&bridgeline, &mut db_obj);
+ ba_obj.bridge_unreachable(&bridgeline, &mut db_obj)
+ }
+
+ fn update_bridge(&self, bridgeline: BridgeLine) -> bool {
+ let mut ba_obj = self.ba.lock().unwrap();
+ let mut db_obj = self.db.lock().unwrap();
+ ba_obj.bridge_update(&bridgeline, &mut db_obj)
}
fn advance_days_TEST(&self, num: u16) {
@@ -274,7 +280,6 @@ async fn handle(
}
}
-
// Generate and return an open invitation token
fn generate_invite(context: LoxServerContext) -> Response
{
let invite = context.gen_invite();
@@ -378,8 +383,12 @@ async fn shutdown_signal() {
println!("Shut down Lox Server");
}
-async fn rdsys_stream(rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>) {
- tokio:: select! {
+async fn rdsys_stream(
+ rtype: ResourceInfo,
+ tx: mpsc::Sender,
+ mut kill: broadcast::Receiver<()>,
+) {
+ tokio::select! {
start_rdsys_stream = rdsys_sender(rtype, tx) => start_rdsys_stream ,
_ = kill.recv() => {println!("Shut down rdsys stream"); return},
}
@@ -389,10 +398,10 @@ async fn rdsys_stream(rtype: ResourceInfo, tx: mpsc::Sender, mut
// in the config.json file.
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) {
- let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
+ let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
.await
.expect("rdsys stream initialization failed. Start rdsys or check config.json");
- sleep(Duration::from_millis(1)).await;
+ sleep(Duration::from_millis(1)).await;
for diff in rstream {
println!("Received diff: {:?}", diff); //send this through a channel
tx.send(diff).await.unwrap();
@@ -400,8 +409,12 @@ async fn rdsys_sender(rtype: ResourceInfo, tx: mpsc::Sender) {
}
}
-async fn rdsys_bridge_parser(rdsys_tx: mpsc::Sender, rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) {
- tokio:: select! {
+async fn rdsys_bridge_parser(
+ rdsys_tx: mpsc::Sender,
+ rx: mpsc::Receiver,
+ mut kill: broadcast::Receiver<()>,
+) {
+ tokio::select! {
start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser ,
_ = kill.recv() => {println!("Shut down bridge_parser"); return},
}
@@ -420,8 +433,11 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>) {
- tokio:: select! {
+async fn create_context_manager(
+ context_rx: mpsc::Receiver,
+ mut kill: broadcast::Receiver<()>,
+) {
+ tokio::select! {
create_context = context_manager(context_rx) => create_context,
_ = kill.recv() => {println!("Shut down context_manager"); return},
}
@@ -431,34 +447,33 @@ async fn create_context_manager(context_rx: mpsc::Receiver, mut kill: b
// that the DB can be updated from the rdsys stream and client requests
// can be responded to with an updated BridgeDB state
async fn context_manager(mut context_rx: mpsc::Receiver) {
+ let bridgedb = BridgeDb::new();
+ let lox_auth = BridgeAuth::new(bridgedb.pubkey);
- let bridgedb = BridgeDb::new();
- let lox_auth = BridgeAuth::new(bridgedb.pubkey);
+ let context = LoxServerContext {
+ db: Arc::new(Mutex::new(bridgedb)),
+ ba: Arc::new(Mutex::new(lox_auth)),
+ };
- let context = LoxServerContext {
- db: Arc::new(Mutex::new(bridgedb)),
- ba: Arc::new(Mutex::new(lox_auth)),
- };
+ while let Some(cmd) = context_rx.recv().await {
+ use Command::*;
- while let Some(cmd) = context_rx.recv().await {
- use Command::*;
-
- match cmd {
- Rdsys { resourcediff } => {
- for new_resource in resourcediff.new {
- for pt in new_resource {
- println!("A NEW RESOURCE: {:?}", pt);
- let mut bucket = [
- BridgeLine::default(),
- BridgeLine::default(),
- BridgeLine::default(),
- ];
- let mut count = 0;
- for resource in pt.1 {
- let mut ip_bytes: [u8; 16] = [0; 16];
- ip_bytes[..resource.address.len()]
- .copy_from_slice(resource.address.as_bytes());
- let infostr: String = format!(
+ match cmd {
+ Rdsys { resourcediff } => {
+ for new_resource in resourcediff.new {
+ for pt in new_resource {
+ println!("A NEW RESOURCE: {:?}", pt);
+ let mut bucket = [
+ BridgeLine::default(),
+ BridgeLine::default(),
+ BridgeLine::default(),
+ ];
+ let mut count = 0;
+ for resource in pt.1 {
+ let mut ip_bytes: [u8; 16] = [0; 16];
+ ip_bytes[..resource.address.len()]
+ .copy_from_slice(resource.address.as_bytes());
+ let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
@@ -469,43 +484,77 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
resource.flags,
resource.params,
);
- let mut info_bytes: [u8; BRIDGE_BYTES - 18] =
- [0; BRIDGE_BYTES - 18];
+ let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
- info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
- let bridgeline = BridgeLine {
- addr: ip_bytes,
- port: resource.port,
- info: info_bytes,
- };
+ info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
+ let bridgeline = BridgeLine {
+ addr: ip_bytes,
+ port: resource.port,
+ info: info_bytes,
+ };
- println!("Now it's a bridgeline: {:?}", bridgeline);
- if count < 2 {
- bucket[count] = bridgeline;
- count += 1;
- } else {
- context.add_openinv_bucket(bucket);
- count = 0;
- bucket = [
- BridgeLine::default(),
- BridgeLine::default(),
- BridgeLine::default(),
- ];
- }
- }
- }
- }
- for changed_resource in resourcediff.changed {
- println!("A NEW CHANGED RESOURCE: {:?}", changed_resource);
- }
- for gone_resource in resourcediff.gone {
- for pt in gone_resource {
- println!("A NEW GONE RESOURCE: {:?}", pt);
- for resource in pt.1 {
- let mut ip_bytes: [u8; 16] = [0; 16];
- ip_bytes[..resource.address.len()]
- .copy_from_slice(resource.address.as_bytes());
- let infostr: String = format!(
+ println!("Now it's a bridgeline: {:?}", bridgeline);
+ if count < 2 {
+ bucket[count] = bridgeline;
+ count += 1;
+ } else {
+ context.add_openinv_bucket(bucket);
+ count = 0;
+ bucket = [
+ BridgeLine::default(),
+ BridgeLine::default(),
+ BridgeLine::default(),
+ ];
+ }
+ }
+ }
+ }
+ for changed_resource in resourcediff.changed {
+ for pt in changed_resource {
+ println!("A NEW CHANGED RESOURCE: {:?}", pt);
+ for resource in pt.1 {
+ let mut ip_bytes: [u8; 16] = [0; 16];
+ ip_bytes[..resource.address.len()]
+ .copy_from_slice(resource.address.as_bytes());
+ let infostr: String = format!(
+ "type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
+ resource.r#type,
+ resource.blocked_in,
+ resource.protocol,
+ resource.fingerprint,
+ resource.or_addresses,
+ resource.distribution,
+ resource.flags,
+ resource.params,
+ );
+ let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
+
+ info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
+ let bridgeline = BridgeLine {
+ addr: ip_bytes,
+ port: resource.port,
+ info: info_bytes,
+ };
+
+ println!("BridgeLine to be changed: {:?}", bridgeline);
+ let res = context.update_bridge(bridgeline);
+ if res {
+ println!("BridgeLine successfully updated: {:?}", bridgeline);
+ } else {
+ println!("'Changed' BridgeLine NOT UPDATED!! : {:?}", bridgeline);
+ //TODO probably do something else here
+ }
+ }
+ }
+ }
+ for gone_resource in resourcediff.gone {
+ for pt in gone_resource {
+ println!("A NEW GONE RESOURCE: {:?}", pt);
+ for resource in pt.1 {
+ let mut ip_bytes: [u8; 16] = [0; 16];
+ ip_bytes[..resource.address.len()]
+ .copy_from_slice(resource.address.as_bytes());
+ let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
@@ -516,40 +565,46 @@ async fn context_manager(mut context_rx: mpsc::Receiver) {
resource.flags,
resource.params,
);
- let mut info_bytes: [u8; BRIDGE_BYTES - 18] =
- [0; BRIDGE_BYTES - 18];
+ let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
- info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
- let bridgeline = BridgeLine {
- addr: ip_bytes,
- port: resource.port,
- info: info_bytes,
- };
+ info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
+ let bridgeline = BridgeLine {
+ addr: ip_bytes,
+ port: resource.port,
+ info: info_bytes,
+ };
- println!("Now it's a bridgeline: {:?}", bridgeline);
- context.add_unreachable(bridgeline);
- }
- }
- }
- context.encrypt_table();
- sleep(Duration::from_millis(1)).await;
- }
- Request { req, sender } => {
- let response = handle(context.clone(), req).await;
- if let Err(e) = sender.send(response) {
- eprintln!("Server Response Error: {:?}", e);
- };
- sleep(Duration::from_millis(1)).await;
- }
- Shutdown { shutdown_sig} => {
+ println!("BridgeLine to be removed: {:?}", bridgeline);
+ let res = context.add_unreachable(bridgeline);
+ if res {
+ println!(
+ "BridgeLine successfully marked unreachable: {:?}",
+ bridgeline
+ );
+ } else {
+ println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
+ //TODO probably do something else here
+ }
+ }
+ }
+ }
+ context.encrypt_table();
+ sleep(Duration::from_millis(1)).await;
+ }
+ Request { req, sender } => {
+ let response = handle(context.clone(), req).await;
+ if let Err(e) = sender.send(response) {
+ eprintln!("Server Response Error: {:?}", e);
+ };
+ sleep(Duration::from_millis(1)).await;
+ }
+ Shutdown { shutdown_sig } => {
println!("Sending Shutdown Signal, all threads should shutdown.");
drop(shutdown_sig);
println!("Shutdown Sent.");
- }
- }
- }
-
-
+ }
+ }
+ }
}
// Each of the commands that the Context Manager handles
@@ -564,7 +619,7 @@ enum Command {
},
Shutdown {
shutdown_sig: broadcast::Sender<()>,
- }
+ },
}
#[tokio::main]
@@ -573,22 +628,21 @@ async fn main() {
let file = File::open(&args[1]).expect("Should have been able to read config.json file");
let reader = BufReader::new(file);
// Read the JSON contents of the file as a ResourceInfo
- let rtype: ResourceInfo = serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed.");
+ let rtype: ResourceInfo =
+ serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed.");
let (rdsys_tx, context_rx) = mpsc::channel(32);
let request_tx = rdsys_tx.clone();
let shutdown_cmd_tx = rdsys_tx.clone();
-
-
// create the shutdown broadcast channel and clone for every thread
- let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
- let kill_stream = shutdown_tx.subscribe();
- let kill_parser = shutdown_tx.subscribe();
- let kill_context= shutdown_tx.subscribe();
+ let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
+ let kill_stream = shutdown_tx.subscribe();
+ let kill_parser = shutdown_tx.subscribe();
+ let kill_context = shutdown_tx.subscribe();
-// Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
- let shutdown_handler = spawn(async move {
+ // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
+ let shutdown_handler = spawn(async move {
tokio::select! {
_ = signal::ctrl_c() => {
let cmd = Command::Shutdown {
@@ -600,15 +654,16 @@ async fn main() {
_ = shutdown_rx.recv().await;
}
}
- });
+ });
-
- let context_manager = spawn(async move { create_context_manager(context_rx, kill_context).await });
+ let context_manager =
+ spawn(async move { create_context_manager(context_rx, kill_context).await });
let (tx, rx) = mpsc::channel(32);
let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await });
- let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });
+ let rdsys_resource_receiver =
+ spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });
let make_service = make_service_fn(move |_conn: &AddrStream| {
let request_tx = request_tx.clone();
@@ -634,13 +689,11 @@ async fn main() {
if let Err(e) = graceful.await {
eprintln!("server error: {}", e);
}
- future::join_all([
+ future::join_all([
rdsys_stream_handler,
rdsys_resource_receiver,
context_manager,
shutdown_handler,
])
.await;
-
-
}