Refactoring to add a channel to pass messages to client
This commit is contained in:
parent
c715d80632
commit
34f1d0399f
|
@ -10,12 +10,12 @@ async-channel = "1.8.0"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
hyper = { version = "0.14.24", features = ["server"] }
|
hyper = { version = "0.14.24", features = ["server"] }
|
||||||
hex_fmt = "0.3"
|
hex_fmt = "0.3"
|
||||||
|
futures = "0.3.26"
|
||||||
tokio = { version = "1", features = ["full", "macros", "signal"] }
|
tokio = { version = "1", features = ["full", "macros", "signal"] }
|
||||||
rand = "0.7"
|
rand = "0.7"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_with = "1.9.1"
|
serde_with = "1.9.1"
|
||||||
serde_json = "1.0.87"
|
serde_json = "1.0.87"
|
||||||
time = "0.2"
|
|
||||||
|
|
||||||
lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"}
|
lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"}
|
||||||
rdsys_backend = { git = "https://gitlab.torproject.org/cohosh/rdsys-backend-api.git"}
|
rdsys_backend = { git = "https://gitlab.torproject.org/cohosh/rdsys-backend-api.git"}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use async_channel::{Sender, Receiver};
|
||||||
|
use futures::future;
|
||||||
use hyper::{
|
use hyper::{
|
||||||
body,
|
body,
|
||||||
body::Bytes,
|
body::Bytes,
|
||||||
|
@ -7,7 +9,13 @@ use hyper::{
|
||||||
Body, Method, Request, Response, Server, StatusCode,
|
Body, Method, Request, Response, Server, StatusCode,
|
||||||
};
|
};
|
||||||
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES, ENC_BUCKET_BYTES};
|
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES, ENC_BUCKET_BYTES};
|
||||||
use lox::proto;
|
use lox::{
|
||||||
|
proto::{
|
||||||
|
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
|
||||||
|
redeem_invite, trust_promotion,
|
||||||
|
},
|
||||||
|
IssuerPubKey,
|
||||||
|
};
|
||||||
use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
|
use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
|
||||||
use rand::RngCore;
|
use rand::RngCore;
|
||||||
use rdsys_backend::{proto::ResourceDiff, start_stream, ResourceStream};
|
use rdsys_backend::{proto::ResourceDiff, start_stream, ResourceStream};
|
||||||
|
@ -19,11 +27,12 @@ use std::{
|
||||||
io::BufReader,
|
io::BufReader,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use serde_with::serde_as;
|
use serde_with::serde_as;
|
||||||
use tokio::{spawn, sync::mpsc};
|
use tokio::{spawn, time::sleep, sync::mpsc};
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
|
@ -95,6 +104,110 @@ struct LoxServerContext {
|
||||||
ba: Arc<Mutex<BridgeAuth>>,
|
ba: Arc<Mutex<BridgeAuth>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl LoxServerContext {
|
||||||
|
fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
let mut db_obj = self.db.lock().unwrap();
|
||||||
|
ba_obj.add_openinv_bridges(bucket, &mut db_obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_unreachable(&self, bridgeline: BridgeLine) {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
let mut db_obj = self.db.lock().unwrap();
|
||||||
|
ba_obj.bridge_unreachable(&bridgeline, &mut db_obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn advance_days_TEST(&self, num: u16) {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.advance_days(num); // FOR TESTING ONLY
|
||||||
|
println!("Today's date according to server: {}", ba_obj.today());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encrypt_table(&self) -> Vec<[u8; ENC_BUCKET_BYTES]> {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.enc_bridge_table().clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pubkeys(&self) -> Vec<IssuerPubKey> {
|
||||||
|
let ba_obj = self.ba.lock().unwrap();
|
||||||
|
// vector of public keys (to serialize)
|
||||||
|
vec![
|
||||||
|
ba_obj.lox_pub.clone(),
|
||||||
|
ba_obj.migration_pub.clone(),
|
||||||
|
ba_obj.migrationkey_pub.clone(),
|
||||||
|
ba_obj.reachability_pub.clone(),
|
||||||
|
ba_obj.invitation_pub.clone(),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gen_invite(&self) -> Invite {
|
||||||
|
let obj = self.db.lock().unwrap();
|
||||||
|
return Invite {
|
||||||
|
invite: obj.invite(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_inv(&self, req: open_invite::Request) -> open_invite::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_open_invite(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn trust_promo(&self, req: trust_promotion::Request) -> trust_promotion::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_trust_promotion(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn trust_migration(&self, req: migration::Request) -> migration::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_migration(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn level_up(&self, req: level_up::Request) -> level_up::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_level_up(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn issue_invite(&self, req: issue_invite::Request) -> issue_invite::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_issue_invite(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn redeem_invite(&self, req: redeem_invite::Request) -> redeem_invite::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_redeem_invite(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_blockage(&self, req: check_blockage::Request) -> check_blockage::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
// Created 5 buckets initially, so we will add 5 hot spares (for migration) and
|
||||||
|
// block all of the existing buckets to trigger migration table propagation
|
||||||
|
// FOR TESTING ONLY, ADD 5 NEW Buckets
|
||||||
|
for _ in 0..5 {
|
||||||
|
let bucket = [random(), random(), random()];
|
||||||
|
ba_obj.add_spare_bucket(bucket);
|
||||||
|
}
|
||||||
|
ba_obj.enc_bridge_table();
|
||||||
|
|
||||||
|
// FOR TESTING ONLY, BLOCK ALL BRIDGES
|
||||||
|
let mut db_obj = self.db.lock().unwrap();
|
||||||
|
for index in 0..5 {
|
||||||
|
let b0 = ba_obj.bridge_table.buckets[index][0];
|
||||||
|
let b1 = ba_obj.bridge_table.buckets[index][1];
|
||||||
|
let b2 = ba_obj.bridge_table.buckets[index][2];
|
||||||
|
ba_obj.bridge_unreachable(&b0, &mut db_obj);
|
||||||
|
ba_obj.bridge_unreachable(&b1, &mut db_obj);
|
||||||
|
ba_obj.bridge_unreachable(&b2, &mut db_obj);
|
||||||
|
}
|
||||||
|
ba_obj.enc_bridge_table();
|
||||||
|
ba_obj.handle_check_blockage(req).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn blockage_migration(&self, req: blockage_migration::Request) -> blockage_migration::Response {
|
||||||
|
let mut ba_obj = self.ba.lock().unwrap();
|
||||||
|
ba_obj.handle_blockage_migration(req).unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle(
|
async fn handle(
|
||||||
context: LoxServerContext,
|
context: LoxServerContext,
|
||||||
// addr: SocketAddr,
|
// addr: SocketAddr,
|
||||||
|
@ -110,43 +223,41 @@ async fn handle(
|
||||||
.body(Body::from("Allow POST"))
|
.body(Body::from("Allow POST"))
|
||||||
.unwrap()),
|
.unwrap()),
|
||||||
_ => match (req.method(), req.uri().path()) {
|
_ => match (req.method(), req.uri().path()) {
|
||||||
(&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)),
|
(&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context)),
|
||||||
(&Method::GET, "/reachability") => {
|
(&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(context)),
|
||||||
Ok::<_, Infallible>(send_reachability_cred(context.ba))
|
(&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context)),
|
||||||
}
|
|
||||||
(&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)),
|
|
||||||
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
|
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_open_cred(bytes, context.ba)
|
verify_and_send_open_cred(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
|
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_trust_promo(bytes, context.ba)
|
verify_and_send_trust_promo(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
|
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_trust_migration(bytes, context.ba)
|
verify_and_send_trust_migration(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
|
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_level_up(bytes, context.ba)
|
verify_and_send_level_up(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
|
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_issue_invite(bytes, context.ba)
|
verify_and_send_issue_invite(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
|
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_redeem_invite(bytes, context.ba)
|
verify_and_send_redeem_invite(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
|
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
// TEST ONLY: Block all existing bridges and add new ones for migration
|
// TEST ONLY: Block all existing bridges and add new ones for migration
|
||||||
verify_and_send_check_blockage(bytes, context.ba, context.db)
|
verify_and_send_check_blockage(bytes, context)
|
||||||
}),
|
}),
|
||||||
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
|
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_blockage_migration(bytes, context.ba)
|
verify_and_send_blockage_migration(bytes, context)
|
||||||
}),
|
}),
|
||||||
_ => {
|
_ => {
|
||||||
// Return 404 not found response.
|
// Return 404 not found response.
|
||||||
|
@ -159,12 +270,8 @@ async fn handle(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
|
fn generate_invite(context: LoxServerContext) -> Response<Body> {
|
||||||
let obj = db.lock().unwrap();
|
let invite = context.gen_invite();
|
||||||
let invite = Invite {
|
|
||||||
invite: obj.invite(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let token = serde_json::to_string(&invite).unwrap();
|
let token = serde_json::to_string(&invite).unwrap();
|
||||||
let mut resp = Response::new(Body::from(token));
|
let mut resp = Response::new(Body::from(token));
|
||||||
resp.headers_mut()
|
resp.headers_mut()
|
||||||
|
@ -173,11 +280,9 @@ fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the serialized encrypted bridge table
|
// Return the serialized encrypted bridge table
|
||||||
fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn send_reachability_cred(context: LoxServerContext) -> Response<Body> {
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
context.advance_days_TEST(85); // FOR TESTING ONLY
|
||||||
ba_obj.advance_days(85); // FOR TESTING ONLY
|
let enc_table = context.encrypt_table();
|
||||||
println!("Today's date according to server: {}", ba_obj.today());
|
|
||||||
let enc_table = ba_obj.enc_bridge_table().clone();
|
|
||||||
let etable = EncBridgeTable { etable: enc_table };
|
let etable = EncBridgeTable { etable: enc_table };
|
||||||
let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap()));
|
let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap()));
|
||||||
resp.headers_mut()
|
resp.headers_mut()
|
||||||
|
@ -185,113 +290,69 @@ fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
||||||
resp
|
resp
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_keys(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn send_keys(context: LoxServerContext) -> Response<Body> {
|
||||||
let ba_obj = ba.lock().unwrap();
|
let pubkeys = context.pubkeys();
|
||||||
// vector of public keys (to serialize)
|
|
||||||
let ba_obj_pubkeys = vec![
|
|
||||||
&ba_obj.lox_pub,
|
|
||||||
&ba_obj.migration_pub,
|
|
||||||
&ba_obj.migrationkey_pub,
|
|
||||||
&ba_obj.reachability_pub,
|
|
||||||
&ba_obj.invitation_pub,
|
|
||||||
];
|
|
||||||
println!("Today's date according to server: {}", ba_obj.today());
|
|
||||||
|
|
||||||
let mut resp = Response::new(Body::from(serde_json::to_string(&ba_obj_pubkeys).unwrap()));
|
let mut resp = Response::new(Body::from(serde_json::to_string(&pubkeys).unwrap()));
|
||||||
resp.headers_mut()
|
resp.headers_mut()
|
||||||
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
||||||
resp
|
resp
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_open_cred(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn verify_and_send_open_cred(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
let req: proto::open_invite::Request = serde_json::from_slice(&request).unwrap();
|
let req: open_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let response = context.open_inv(req);
|
||||||
let response = ba_obj.handle_open_invite(req).unwrap();
|
|
||||||
let open_invite_resp_str = serde_json::to_string(&response).unwrap();
|
let open_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(open_invite_resp_str)
|
prepare_header(open_invite_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_trust_promo(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
let req: proto::trust_promotion::Request = serde_json::from_slice(&request).unwrap();
|
let req: trust_promotion::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
context.advance_days_TEST(31); // FOR TESTING ONLY
|
||||||
ba_obj.advance_days(31); // FOR TESTING ONLY
|
let response = context.trust_promo(req);
|
||||||
println!("Today's date according to server: {}", ba_obj.today());
|
|
||||||
let response = ba_obj.handle_trust_promotion(req).unwrap();
|
|
||||||
let trust_promo_resp_str = serde_json::to_string(&response).unwrap();
|
let trust_promo_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(trust_promo_resp_str)
|
prepare_header(trust_promo_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_trust_migration(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn verify_and_send_trust_migration(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
let req: proto::migration::Request = serde_json::from_slice(&request).unwrap();
|
let req: migration::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let response = context.trust_migration(req);
|
||||||
let response = ba_obj.handle_migration(req).unwrap();
|
|
||||||
let resp_str = serde_json::to_string(&response).unwrap();
|
let resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(resp_str)
|
prepare_header(resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_level_up(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn verify_and_send_level_up(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
let req: proto::level_up::Request = serde_json::from_slice(&request).unwrap();
|
let req: level_up::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let response = context.level_up(req);
|
||||||
let response = ba_obj.handle_level_up(req).unwrap();
|
|
||||||
let level_up_resp_str = serde_json::to_string(&response).unwrap();
|
let level_up_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(level_up_resp_str)
|
prepare_header(level_up_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_issue_invite(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn verify_and_send_issue_invite(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
let req: proto::issue_invite::Request = serde_json::from_slice(&request).unwrap();
|
let req: issue_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let response = context.issue_invite(req);
|
||||||
let response = ba_obj.handle_issue_invite(req).unwrap();
|
|
||||||
let issue_invite_resp_str = serde_json::to_string(&response).unwrap();
|
let issue_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(issue_invite_resp_str)
|
prepare_header(issue_invite_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_redeem_invite(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn verify_and_send_redeem_invite(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap();
|
let req: redeem_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let response = context.redeem_invite(req);
|
||||||
let response = ba_obj.handle_redeem_invite(req).unwrap();
|
|
||||||
let redeem_invite_resp_str = serde_json::to_string(&response).unwrap();
|
let redeem_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(redeem_invite_resp_str)
|
prepare_header(redeem_invite_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_check_blockage(
|
fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
request: Bytes,
|
let req: check_blockage::Request = serde_json::from_slice(&request).unwrap();
|
||||||
ba: Arc<Mutex<BridgeAuth>>,
|
|
||||||
db: Arc<Mutex<BridgeDb>>,
|
|
||||||
) -> Response<Body> {
|
|
||||||
let req: proto::check_blockage::Request = serde_json::from_slice(&request).unwrap();
|
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
|
||||||
// Created 5 buckets initially, so we will add 5 hot spares (for migration) and
|
|
||||||
// block all of the existing buckets to trigger migration table propagation
|
|
||||||
// FOR TESTING ONLY, ADD 5 NEW Buckets
|
|
||||||
for _ in 0..5 {
|
|
||||||
let bucket = [random(), random(), random()];
|
|
||||||
ba_obj.add_spare_bucket(bucket);
|
|
||||||
}
|
|
||||||
ba_obj.enc_bridge_table();
|
|
||||||
|
|
||||||
// FOR TESTING ONLY, BLOCK ALL BRIDGES
|
let response = context.check_blockage(req);
|
||||||
let mut db_obj = db.lock().unwrap();
|
|
||||||
for index in 0..5 {
|
|
||||||
let b0 = ba_obj.bridge_table.buckets[index][0];
|
|
||||||
let b1 = ba_obj.bridge_table.buckets[index][1];
|
|
||||||
let b2 = ba_obj.bridge_table.buckets[index][2];
|
|
||||||
ba_obj.bridge_unreachable(&b0, &mut db_obj);
|
|
||||||
ba_obj.bridge_unreachable(&b1, &mut db_obj);
|
|
||||||
ba_obj.bridge_unreachable(&b2, &mut db_obj);
|
|
||||||
}
|
|
||||||
ba_obj.enc_bridge_table();
|
|
||||||
let response = ba_obj.handle_check_blockage(req).unwrap();
|
|
||||||
let check_blockage_resp_str = serde_json::to_string(&response).unwrap();
|
let check_blockage_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(check_blockage_resp_str)
|
prepare_header(check_blockage_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_and_send_blockage_migration(
|
fn verify_and_send_blockage_migration(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||||
request: Bytes,
|
let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap();
|
||||||
ba: Arc<Mutex<BridgeAuth>>,
|
let response = context.blockage_migration(req);
|
||||||
) -> Response<Body> {
|
|
||||||
let req: proto::blockage_migration::Request = serde_json::from_slice(&request).unwrap();
|
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
|
||||||
let response = ba_obj.handle_blockage_migration(req).unwrap();
|
|
||||||
let resp_str = serde_json::to_string(&response).unwrap();
|
let resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(resp_str)
|
prepare_header(resp_str)
|
||||||
}
|
}
|
||||||
|
@ -309,8 +370,32 @@ async fn shutdown_signal() {
|
||||||
.expect("failed to listen for ctrl+c signal");
|
.expect("failed to listen for ctrl+c signal");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initial bridgedb setup then:
|
async fn rdsys_sender(rstream: ResourceStream, tx: Sender<ResourceDiff>) {
|
||||||
// Listen for updates and return new bridges to be added to the bridged
|
for diff in rstream {
|
||||||
|
println!("Received diff: {:?}", diff); //send this through a channel
|
||||||
|
tx.send(diff).await.unwrap();
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn parse_bridges(rdsys_tx: Sender<Command>, rx: Receiver<ResourceDiff>) {
|
||||||
|
loop {
|
||||||
|
let resourcediff = rx.recv().await.unwrap();
|
||||||
|
let cmd = Command::Rdsys { resourcediff: resourcediff, };
|
||||||
|
rdsys_tx.send(cmd).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Command {
|
||||||
|
Rdsys {
|
||||||
|
resourcediff: ResourceDiff,
|
||||||
|
},
|
||||||
|
Request {
|
||||||
|
request: Request<Body>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run with cargo run -- config.json
|
// Run with cargo run -- config.json
|
||||||
#[tokio::main(worker_threads = 2)]
|
#[tokio::main(worker_threads = 2)]
|
||||||
|
@ -321,6 +406,10 @@ async fn main() {
|
||||||
// Read the JSON contents of the file as a ResourceInfo
|
// Read the JSON contents of the file as a ResourceInfo
|
||||||
let rtype: ResourceInfo = serde_json::from_reader(reader).unwrap();
|
let rtype: ResourceInfo = serde_json::from_reader(reader).unwrap();
|
||||||
|
|
||||||
|
let (rdsys_tx, mut context_rx) = mpsc::channel(32);
|
||||||
|
let request_tx = rdsys_tx.clone();
|
||||||
|
|
||||||
|
let context_manager = spawn(async move {
|
||||||
// pass in distribution of open invite vs. hot spare buckets?
|
// pass in distribution of open invite vs. hot spare buckets?
|
||||||
let bridgedb = BridgeDb::new();
|
let bridgedb = BridgeDb::new();
|
||||||
let lox_auth = BridgeAuth::new(bridgedb.pubkey);
|
let lox_auth = BridgeAuth::new(bridgedb.pubkey);
|
||||||
|
@ -329,141 +418,131 @@ async fn main() {
|
||||||
db: Arc::new(Mutex::new(bridgedb)),
|
db: Arc::new(Mutex::new(bridgedb)),
|
||||||
ba: Arc::new(Mutex::new(lox_auth)),
|
ba: Arc::new(Mutex::new(lox_auth)),
|
||||||
};
|
};
|
||||||
//Sender is resource stream and receiver is bridgedb function (add_openinv_bridges)
|
|
||||||
let (tx, rx) = async_channel::bounded(3);
|
|
||||||
// to populate the bridge db
|
|
||||||
let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
spawn(async move {
|
|
||||||
for diff in rstream {
|
|
||||||
println!("Received diff: {:?}", diff); //send this through a channel
|
|
||||||
tx.send(diff).await.expect("can not add to bridgedb)")
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let context_clone = context.clone();
|
while let Some(cmd) = context_rx.recv().await {
|
||||||
let _ = spawn(async move {
|
use Command::*;
|
||||||
while let resourcediff = rx.recv().await.unwrap() {
|
|
||||||
// spawn(async move {
|
|
||||||
for new_resource in resourcediff.new {
|
|
||||||
for pt in new_resource {
|
|
||||||
println!("A NEW RESOURCE: {:?}", pt);
|
|
||||||
let mut bucket = [
|
|
||||||
BridgeLine::default(),
|
|
||||||
BridgeLine::default(),
|
|
||||||
BridgeLine::default(),
|
|
||||||
];
|
|
||||||
let mut count = 0;
|
|
||||||
for resource in pt.1 {
|
|
||||||
let mut ip_bytes: [u8; 16] = [0; 16];
|
|
||||||
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
|
|
||||||
let infostr: String = format!(
|
|
||||||
"type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
|
|
||||||
resource.r#type,
|
|
||||||
resource.blocked_in,
|
|
||||||
resource.protocol,
|
|
||||||
resource.fingerprint,
|
|
||||||
resource.or_addresses,
|
|
||||||
resource.distribution,
|
|
||||||
resource.flags,
|
|
||||||
resource.params,
|
|
||||||
);
|
|
||||||
let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
|
|
||||||
|
|
||||||
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
|
match cmd {
|
||||||
let bridgeline = BridgeLine {
|
Rdsys {resourcediff} => {
|
||||||
addr: ip_bytes,
|
for new_resource in resourcediff.new {
|
||||||
port: resource.port,
|
for pt in new_resource {
|
||||||
info: info_bytes,
|
println!("A NEW RESOURCE: {:?}", pt);
|
||||||
};
|
let mut bucket = [
|
||||||
|
|
||||||
println!("Now it's a bridgeline: {:?}", bridgeline);
|
|
||||||
if count < 2 {
|
|
||||||
bucket[count] = bridgeline;
|
|
||||||
count += 1;
|
|
||||||
} else {
|
|
||||||
let mut ba_obj = context_clone.ba.lock().unwrap();
|
|
||||||
let mut db_obj = context_clone.db.lock().unwrap();
|
|
||||||
ba_obj.add_openinv_bridges(bucket, &mut db_obj);
|
|
||||||
count = 0;
|
|
||||||
bucket = [
|
|
||||||
BridgeLine::default(),
|
BridgeLine::default(),
|
||||||
BridgeLine::default(),
|
BridgeLine::default(),
|
||||||
BridgeLine::default(),
|
BridgeLine::default(),
|
||||||
];
|
];
|
||||||
|
let mut count = 0;
|
||||||
|
for resource in pt.1 {
|
||||||
|
let mut ip_bytes: [u8; 16] = [0; 16];
|
||||||
|
ip_bytes[..resource.address.len()]
|
||||||
|
.copy_from_slice(resource.address.as_bytes());
|
||||||
|
let infostr: String = format!(
|
||||||
|
"type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
|
||||||
|
resource.r#type,
|
||||||
|
resource.blocked_in,
|
||||||
|
resource.protocol,
|
||||||
|
resource.fingerprint,
|
||||||
|
resource.or_addresses,
|
||||||
|
resource.distribution,
|
||||||
|
resource.flags,
|
||||||
|
resource.params,
|
||||||
|
);
|
||||||
|
let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
|
||||||
|
|
||||||
|
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
|
||||||
|
let bridgeline = BridgeLine {
|
||||||
|
addr: ip_bytes,
|
||||||
|
port: resource.port,
|
||||||
|
info: info_bytes,
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("Now it's a bridgeline: {:?}", bridgeline);
|
||||||
|
if count < 2 {
|
||||||
|
bucket[count] = bridgeline;
|
||||||
|
count += 1;
|
||||||
|
} else {
|
||||||
|
context.add_openinv_bucket(bucket);
|
||||||
|
count = 0;
|
||||||
|
bucket = [
|
||||||
|
BridgeLine::default(),
|
||||||
|
BridgeLine::default(),
|
||||||
|
BridgeLine::default(),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
for changed_resource in resourcediff.changed {
|
||||||
}
|
println!("A NEW CHANGED RESOURCE: {:?}", changed_resource);
|
||||||
for changed_resource in resourcediff.changed {
|
|
||||||
println!("A NEW CHANGED RESOURCE: {:?}", changed_resource);
|
|
||||||
}
|
|
||||||
for gone_resource in resourcediff.gone {
|
|
||||||
for pt in gone_resource {
|
|
||||||
println!("A NEW GONE RESOURCE: {:?}", pt);
|
|
||||||
let mut unreachable_bridge = BridgeLine::default();
|
|
||||||
for resource in pt.1 {
|
|
||||||
let mut ip_bytes: [u8; 16] = [0; 16];
|
|
||||||
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
|
|
||||||
let infostr: String = format!(
|
|
||||||
"type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
|
|
||||||
resource.r#type,
|
|
||||||
resource.blocked_in,
|
|
||||||
resource.protocol,
|
|
||||||
resource.fingerprint,
|
|
||||||
resource.or_addresses,
|
|
||||||
resource.distribution,
|
|
||||||
resource.flags,
|
|
||||||
resource.params,
|
|
||||||
);
|
|
||||||
let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
|
|
||||||
|
|
||||||
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
|
|
||||||
let bridgeline = BridgeLine {
|
|
||||||
addr: ip_bytes,
|
|
||||||
port: resource.port,
|
|
||||||
info: info_bytes,
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("Now it's a bridgeline: {:?}", bridgeline);
|
|
||||||
let mut ba_obj = context_clone.ba.lock().unwrap();
|
|
||||||
let mut db_obj = context_clone.db.lock().unwrap();
|
|
||||||
ba_obj.bridge_unreachable(&unreachable_bridge, &mut db_obj);
|
|
||||||
}
|
}
|
||||||
|
for gone_resource in resourcediff.gone {
|
||||||
|
for pt in gone_resource {
|
||||||
|
println!("A NEW GONE RESOURCE: {:?}", pt);
|
||||||
|
for resource in pt.1 {
|
||||||
|
let mut ip_bytes: [u8; 16] = [0; 16];
|
||||||
|
ip_bytes[..resource.address.len()]
|
||||||
|
.copy_from_slice(resource.address.as_bytes());
|
||||||
|
let infostr: String = format!(
|
||||||
|
"type={} blocked_in={:?} protocol={} fingerprint={} or_addresses={:?} distribution={} flags={:?} params={:?}",
|
||||||
|
resource.r#type,
|
||||||
|
resource.blocked_in,
|
||||||
|
resource.protocol,
|
||||||
|
resource.fingerprint,
|
||||||
|
resource.or_addresses,
|
||||||
|
resource.distribution,
|
||||||
|
resource.flags,
|
||||||
|
resource.params,
|
||||||
|
);
|
||||||
|
let mut info_bytes: [u8; BRIDGE_BYTES - 18] = [0; BRIDGE_BYTES - 18];
|
||||||
|
|
||||||
|
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
|
||||||
|
let bridgeline = BridgeLine {
|
||||||
|
addr: ip_bytes,
|
||||||
|
port: resource.port,
|
||||||
|
info: info_bytes,
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("Now it's a bridgeline: {:?}", bridgeline);
|
||||||
|
context.add_unreachable(bridgeline);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
context.encrypt_table();
|
||||||
}
|
}
|
||||||
|
Request {request} => {
|
||||||
|
|
||||||
}
|
}
|
||||||
// Create the encrypted bridge table
|
|
||||||
let mut ba_obj = context_clone.ba.lock().unwrap();
|
|
||||||
ba_obj.enc_bridge_table();
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
// let new_bridgedb = task::spawn(load_bridges());
|
|
||||||
// Create and initialize a new db and lox_auth
|
|
||||||
// Make 3 x num_buckets open invitation bridges, in sets of 3
|
|
||||||
/* for _ in 0..num_buckets {
|
|
||||||
let bucket = [random(), random(), random()];
|
|
||||||
lox_auth.add_openinv_bridges(bucket, &mut bridgedb);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
//Sender is resource stream and receiver is bridgedb function (add_openinv_bridges)
|
||||||
|
let (tx, mut rx) = async_channel::unbounded();
|
||||||
|
// to populate the bridge db
|
||||||
|
let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let rdsys_sender_handle = spawn(async {rdsys_sender(rstream, tx).await });
|
||||||
|
|
||||||
|
let bridge_parser = spawn( async {parse_bridges(rdsys_tx, rx).await});
|
||||||
|
|
||||||
|
future::join_all([rdsys_sender_handle, bridge_parser]).await;
|
||||||
|
/*
|
||||||
|
|
||||||
let new_service = make_service_fn(move |_conn: &AddrStream| {
|
let new_service = make_service_fn(move |_conn: &AddrStream| {
|
||||||
let context = context.clone();
|
let service = service_fn(move |req| handle(context, req));
|
||||||
let service = service_fn(move |req| {
|
|
||||||
handle(context.clone(), req)
|
|
||||||
});
|
|
||||||
async move { Ok::<_, Infallible>(service) }
|
async move { Ok::<_, Infallible>(service) }
|
||||||
});
|
});
|
||||||
|
|
||||||
let addr = SocketAddr::from(([127, 0, 0, 1], 8001));
|
let addr = SocketAddr::from(([127, 0, 0, 1], 8001));
|
||||||
let server = Server::bind(&addr).serve(new_service);
|
// let server = Server::bind(&addr).serve(new_service);
|
||||||
let graceful = server.with_graceful_shutdown(shutdown_signal());
|
// let graceful = server.with_graceful_shutdown(shutdown_signal());
|
||||||
println!("Listening on {}", addr);
|
println!("Listening on {}", addr);
|
||||||
|
|
||||||
if let Err(e) = graceful.await {
|
if let Err(e) = graceful.await {
|
||||||
eprintln!("server error: {}", e);
|
eprintln!("server error: {}", e);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue