Add resourcediff stream stub for rdsys integration
This commit is contained in:
parent
efaf68de42
commit
453bcf34ee
|
@ -7,13 +7,14 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
hyper = "0.13"
|
hyper = { version = "0.14.24", features = ["server"] }
|
||||||
hex_fmt = "0.3"
|
hex_fmt = "0.3"
|
||||||
tokio = { version = "0.2", features = ["macros", "signal"] }
|
tokio = { version = "1", features = ["full", "macros", "signal"] }
|
||||||
rand = "0.7"
|
rand = "0.7"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_with = "1.9.1"
|
serde_with = "1.9.1"
|
||||||
serde_json = "1.0.87"
|
serde_json = "1.0.87"
|
||||||
time = "0.2"
|
time = "0.2"
|
||||||
|
|
||||||
lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"}
|
lox = { git = "https://gitlab.torproject.org/onyinyang/lox.git"}
|
||||||
|
rdsys_backend = { git = "https://gitlab.torproject.org/cohosh/rdsys-backend-api.git"}
|
||||||
|
|
|
@ -16,10 +16,11 @@ use lox::bridge_table::{BridgeLine, ENC_BUCKET_BYTES};
|
||||||
use lox::proto;
|
use lox::proto;
|
||||||
use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
|
use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
|
||||||
use rand::RngCore;
|
use rand::RngCore;
|
||||||
|
use rdsys_backend::start_stream;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use serde_with::{serde_as};
|
use serde_with::serde_as;
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
|
@ -31,10 +32,12 @@ pub struct Invite {
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub struct EncBridgeTable {
|
pub struct EncBridgeTable {
|
||||||
#[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")]
|
#[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")]
|
||||||
etable: Vec<[u8; ENC_BUCKET_BYTES]>,
|
etable: Vec<[u8; ENC_BUCKET_BYTES]>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Populate Bridgedb from rdsys
|
||||||
|
|
||||||
/// Create a random BridgeLine for testing ONLY. Do not use in production!
|
/// Create a random BridgeLine for testing ONLY. Do not use in production!
|
||||||
/// This was copied directly from lox/src/bridge_table.rs in order
|
/// This was copied directly from lox/src/bridge_table.rs in order
|
||||||
/// to easily initialize a bridgedb/lox_auth with structurally
|
/// to easily initialize a bridgedb/lox_auth with structurally
|
||||||
|
@ -98,8 +101,9 @@ async fn handle(
|
||||||
.unwrap()),
|
.unwrap()),
|
||||||
_ => match (req.method(), req.uri().path()) {
|
_ => match (req.method(), req.uri().path()) {
|
||||||
(&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)),
|
(&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)),
|
||||||
(&Method::POST, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)),
|
(&Method::GET, "/reachability") => {
|
||||||
(&Method::GET, "/reachability") => Ok::<_, Infallible>(send_reachability_cred(context.ba)),
|
Ok::<_, Infallible>(send_reachability_cred(context.ba))
|
||||||
|
}
|
||||||
(&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)),
|
(&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)),
|
||||||
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
|
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
|
@ -125,6 +129,15 @@ async fn handle(
|
||||||
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
verify_and_send_redeem_invite(bytes, context.ba)
|
verify_and_send_redeem_invite(bytes, context.ba)
|
||||||
}),
|
}),
|
||||||
|
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
|
||||||
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
|
// TEST ONLY: Block all existing bridges and add new ones for migration
|
||||||
|
verify_and_send_check_blockage(bytes, context.ba, context.db)
|
||||||
|
}),
|
||||||
|
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
|
||||||
|
let bytes = body::to_bytes(req.into_body()).await.unwrap();
|
||||||
|
verify_and_send_blockage_migration(bytes, context.ba)
|
||||||
|
}),
|
||||||
_ => {
|
_ => {
|
||||||
// Return 404 not found response.
|
// Return 404 not found response.
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
|
@ -136,7 +149,6 @@ async fn handle(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
|
fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
|
||||||
let obj = db.lock().unwrap();
|
let obj = db.lock().unwrap();
|
||||||
let invite = Invite {
|
let invite = Invite {
|
||||||
|
@ -153,14 +165,13 @@ fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
|
||||||
// Return the serialized encrypted bridge table
|
// Return the serialized encrypted bridge table
|
||||||
fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let mut ba_obj = ba.lock().unwrap();
|
||||||
ba_obj.advance_days(85); // FOR TESTING ONLY
|
ba_obj.advance_days(85); // FOR TESTING ONLY
|
||||||
println!("Today's date according to server: {}", ba_obj.today());
|
println!("Today's date according to server: {}", ba_obj.today());
|
||||||
let enc_table = ba_obj.enc_bridge_table().clone();
|
let enc_table = ba_obj.enc_bridge_table().clone();
|
||||||
let etable = EncBridgeTable {
|
let etable = EncBridgeTable { etable: enc_table };
|
||||||
etable: enc_table,
|
|
||||||
};
|
|
||||||
let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap()));
|
let mut resp = Response::new(Body::from(serde_json::to_string(&etable).unwrap()));
|
||||||
resp.headers_mut().insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
resp.headers_mut()
|
||||||
|
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
||||||
resp
|
resp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,18 +239,58 @@ fn verify_and_send_redeem_invite(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) ->
|
||||||
let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap();
|
let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||||
let mut ba_obj = ba.lock().unwrap();
|
let mut ba_obj = ba.lock().unwrap();
|
||||||
let response = ba_obj.handle_redeem_invite(req).unwrap();
|
let response = ba_obj.handle_redeem_invite(req).unwrap();
|
||||||
let issue_invite_resp_str = serde_json::to_string(&response).unwrap();
|
let redeem_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
prepare_header(issue_invite_resp_str)
|
prepare_header(redeem_invite_resp_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn verify_and_send_check_blockage(
|
||||||
|
request: Bytes,
|
||||||
|
ba: Arc<Mutex<BridgeAuth>>,
|
||||||
|
db: Arc<Mutex<BridgeDb>>,
|
||||||
|
) -> Response<Body> {
|
||||||
|
let req: proto::check_blockage::Request = serde_json::from_slice(&request).unwrap();
|
||||||
|
let mut ba_obj = ba.lock().unwrap();
|
||||||
|
// Created 5 buckets initially, so we will add 5 hot spares (for migration) and
|
||||||
|
// block all of the existing buckets to trigger migration table propagation
|
||||||
|
// FOR TESTING ONLY, ADD 5 NEW Buckets
|
||||||
|
for _ in 0..5 {
|
||||||
|
let bucket = [random(), random(), random()];
|
||||||
|
ba_obj.add_spare_bucket(bucket);
|
||||||
|
}
|
||||||
|
ba_obj.enc_bridge_table();
|
||||||
|
|
||||||
|
// FOR TESTING ONLY, BLOCK ALL BRIDGES
|
||||||
|
let mut db_obj = db.lock().unwrap();
|
||||||
|
for index in 0..5 {
|
||||||
|
let b0 = ba_obj.bridge_table.buckets[index][0];
|
||||||
|
let b1 = ba_obj.bridge_table.buckets[index][1];
|
||||||
|
let b2 = ba_obj.bridge_table.buckets[index][2];
|
||||||
|
ba_obj.bridge_unreachable(&b0, &mut db_obj);
|
||||||
|
ba_obj.bridge_unreachable(&b1, &mut db_obj);
|
||||||
|
ba_obj.bridge_unreachable(&b2, &mut db_obj);
|
||||||
|
}
|
||||||
|
ba_obj.enc_bridge_table();
|
||||||
|
let response = ba_obj.handle_check_blockage(req).unwrap();
|
||||||
|
let check_blockage_resp_str = serde_json::to_string(&response).unwrap();
|
||||||
|
prepare_header(check_blockage_resp_str)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_and_send_blockage_migration(
|
||||||
|
request: Bytes,
|
||||||
|
ba: Arc<Mutex<BridgeAuth>>,
|
||||||
|
) -> Response<Body> {
|
||||||
|
let req: proto::blockage_migration::Request = serde_json::from_slice(&request).unwrap();
|
||||||
|
let mut ba_obj = ba.lock().unwrap();
|
||||||
|
let response = ba_obj.handle_blockage_migration(req).unwrap();
|
||||||
|
let resp_str = serde_json::to_string(&response).unwrap();
|
||||||
|
prepare_header(resp_str)
|
||||||
|
}
|
||||||
|
|
||||||
fn prepare_header(response: String) -> Response<Body> {
|
fn prepare_header(response: String) -> Response<Body> {
|
||||||
let mut resp = Response::new(Body::from(response));
|
let mut resp = Response::new(Body::from(response));
|
||||||
resp.headers_mut()
|
resp.headers_mut()
|
||||||
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
||||||
resp
|
resp
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
async fn shutdown_signal() {
|
||||||
|
@ -248,8 +299,25 @@ async fn shutdown_signal() {
|
||||||
.expect("failed to listen for ctrl+c signal");
|
.expect("failed to listen for ctrl+c signal");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
// Initial bridgedb setup then:
|
||||||
|
// Listen for updates and return new bridges to be added to the bridgedb
|
||||||
|
async fn load_bridges() {
|
||||||
|
let endpoint = String::from("http://127.0.0.1:7100/resource-stream");
|
||||||
|
let name = String::from("https");
|
||||||
|
let token = String::from("HttpsApiTokenPlaceholder"); //Bring in from commmand line
|
||||||
|
let types = vec![String::from("obfs2"), String::from("scramblesuit")];
|
||||||
|
let rx = start_stream(endpoint, name, token, types).await.unwrap();
|
||||||
|
for diff in rx {
|
||||||
|
println!("Received diff: {:?}", diff); //send this through a channel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main(worker_threads = 2)]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
|
||||||
|
//
|
||||||
|
tokio::spawn(async move { load_bridges().await; });
|
||||||
|
// let new_bridgedb = task::spawn(load_bridges());
|
||||||
let num_buckets = 5;
|
let num_buckets = 5;
|
||||||
// Create and initialize a new db and lox_auth
|
// Create and initialize a new db and lox_auth
|
||||||
let mut bridgedb = BridgeDb::new();
|
let mut bridgedb = BridgeDb::new();
|
||||||
|
|
Loading…
Reference in New Issue