Add resourcediff stream stub for rdsys integration

This commit is contained in:
onyinyang 2023-03-03 14:09:13 -05:00
parent 9e2d341751
commit 1018dd827f
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
2 changed files with 86 additions and 17 deletions

View File

@ -7,13 +7,14 @@ edition = "2021"
[dependencies] [dependencies]
base64 = "0.13" base64 = "0.13"
hyper = "0.13" hyper = { version = "0.14.24", features = ["server"] }
hex_fmt = "0.3" hex_fmt = "0.3"
tokio = { version = "0.2", features = ["macros", "signal"] } tokio = { version = "1", features = ["full", "macros", "signal"] }
rand = "0.7" rand = "0.7"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_with = "1.9.1" serde_with = "1.9.1"
serde_json = "1.0.87" serde_json = "1.0.87"
time = "0.2" time = "0.2"
lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"} lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"}
rdsys_backend = { git = "https://gitlab.torproject.org/cohosh/rdsys-backend-api.git"}

View File

@ -16,10 +16,11 @@ use lox::bridge_table::{BridgeLine, ENC_BUCKET_BYTES};
use lox::proto; use lox::proto;
use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH}; use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
use rand::RngCore; use rand::RngCore;
use rdsys_backend::start_stream;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json; use serde_json;
use serde_with::{serde_as}; use serde_with::serde_as;
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -31,10 +32,12 @@ pub struct Invite {
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct EncBridgeTable { pub struct EncBridgeTable {
#[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")] #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")]
etable: Vec<[u8; ENC_BUCKET_BYTES]>, etable: Vec<[u8; ENC_BUCKET_BYTES]>,
} }
// Populate Bridgedb from rdsys
/// Create a random BridgeLine for testing ONLY. Do not use in production! /// Create a random BridgeLine for testing ONLY. Do not use in production!
/// This was copied directly from lox/src/bridge_table.rs in order /// This was copied directly from lox/src/bridge_table.rs in order
/// to easily initialize a bridgedb/lox_auth with structurally /// to easily initialize a bridgedb/lox_auth with structurally
@ -98,8 +101,9 @@ async fn handle(
.unwrap()), .unwrap()),
_ => match (req.method(), req.uri().path()) { _ => match (req.method(), req.uri().path()) {
(&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)), (&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)),
(&Method::POST, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)), (&Method::GET, "/reachability") => {
(&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(context.ba)), Ok::<_, Infallible>(send_reachability_cred(context.ba))
}
(&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)), (&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)),
(&Method::POST, "/openreq") => Ok::<_, Infallible>({ (&Method::POST, "/openreq") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap(); let bytes = body::to_bytes(req.into_body()).await.unwrap();
@ -125,6 +129,15 @@ async fn handle(
let bytes = body::to_bytes(req.into_body()).await.unwrap(); let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_redeem_invite(bytes, context.ba) verify_and_send_redeem_invite(bytes, context.ba)
}), }),
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
// TEST ONLY: Block all existing bridges and add new ones for migration
verify_and_send_check_blockage(bytes, context.ba, context.db)
}),
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_blockage_migration(bytes, context.ba)
}),
_ => { _ => {
// Return 404 not found response. // Return 404 not found response.
Ok(Response::builder() Ok(Response::builder()
@ -136,7 +149,6 @@ async fn handle(
} }
} }
fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> { fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
let obj = db.lock().unwrap(); let obj = db.lock().unwrap();
let invite = Invite { let invite = Invite {
@ -153,14 +165,13 @@ fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
// Return the serialized encrypted bridge table // Return the serialized encrypted bridge table
fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> { fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let mut ba_obj = ba.lock().unwrap(); let mut ba_obj = ba.lock().unwrap();
ba_obj.advance_days(85); // FOR TESTING ONLY ba_obj.advance_days(85); // FOR TESTING ONLY
println!("Today's date according to server: {}", ba_obj.today()); println!("Today's date according to server: {}", ba_obj.today());
let enc_table = ba_obj.enc_bridge_table().clone(); let enc_table = ba_obj.enc_bridge_table().clone();
let etable = EncBridgeTable { let etable = EncBridgeTable { etable: enc_table };
etable: enc_table,
};
let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap())); let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap()));
resp.headers_mut().insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp resp
} }
@ -228,18 +239,58 @@ fn verify_and_send_redeem_invite(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) ->
let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap(); let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap(); let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_redeem_invite(req).unwrap(); let response = ba_obj.handle_redeem_invite(req).unwrap();
let issue_invite_resp_str = serde_json::to_string(&response).unwrap(); let redeem_invite_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(issue_invite_resp_str) prepare_header(redeem_invite_resp_str)
} }
fn verify_and_send_check_blockage(
request: Bytes,
ba: Arc<Mutex<BridgeAuth>>,
db: Arc<Mutex<BridgeDb>>,
) -> Response<Body> {
let req: proto::check_blockage::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
// Created 5 buckets initially, so we will add 5 hot spares (for migration) and
// block all of the existing buckets to trigger migration table propagation
// FOR TESTING ONLY, ADD 5 NEW Buckets
for _ in 0..5 {
let bucket = [random(), random(), random()];
ba_obj.add_spare_bucket(bucket);
}
ba_obj.enc_bridge_table();
// FOR TESTING ONLY, BLOCK ALL BRIDGES
let mut db_obj = db.lock().unwrap();
for index in 0..5 {
let b0 = ba_obj.bridge_table.buckets[index][0];
let b1 = ba_obj.bridge_table.buckets[index][1];
let b2 = ba_obj.bridge_table.buckets[index][2];
ba_obj.bridge_unreachable(&b0, &mut db_obj);
ba_obj.bridge_unreachable(&b1, &mut db_obj);
ba_obj.bridge_unreachable(&b2, &mut db_obj);
}
ba_obj.enc_bridge_table();
let response = ba_obj.handle_check_blockage(req).unwrap();
let check_blockage_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(check_blockage_resp_str)
}
fn verify_and_send_blockage_migration(
request: Bytes,
ba: Arc<Mutex<BridgeAuth>>,
) -> Response<Body> {
let req: proto::blockage_migration::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_blockage_migration(req).unwrap();
let resp_str = serde_json::to_string(&response).unwrap();
prepare_header(resp_str)
}
fn prepare_header(response: String) -> Response<Body> { fn prepare_header(response: String) -> Response<Body> {
let mut resp = Response::new(Body::from(response)); let mut resp = Response::new(Body::from(response));
resp.headers_mut() resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp resp
} }
async fn shutdown_signal() { async fn shutdown_signal() {
@ -248,8 +299,25 @@ async fn shutdown_signal() {
.expect("failed to listen for ctrl+c signal"); .expect("failed to listen for ctrl+c signal");
} }
#[tokio::main] // Initial bridgedb setup then:
// Listen for updates and return new bridges to be added to the bridgedb
async fn load_bridges() {
let endpoint = String::from("http://127.0.0.1:7100/resource-stream");
let name = String::from("https");
let token = String::from("HttpsApiTokenPlaceholder"); //Bring in from commmand line
let types = vec![String::from("obfs2"), String::from("scramblesuit")];
let rx = start_stream(endpoint, name, token, types).await.unwrap();
for diff in rx {
println!("Received diff: {:?}", diff); //send this through a channel
}
}
#[tokio::main(worker_threads = 2)]
async fn main() { async fn main() {
//
tokio::spawn(async move { load_bridges().await; });
// let new_bridgedb = task::spawn(load_bridges());
let num_buckets = 5; let num_buckets = 5;
// Create and initialize a new db and lox_auth // Create and initialize a new db and lox_auth
let mut bridgedb = BridgeDb::new(); let mut bridgedb = BridgeDb::new();