lox/crates/lox-distributor/src/main.rs

427 lines
17 KiB
Rust
Raw Normal View History

use hyper::{
body,
body::Bytes,
2022-11-15 22:04:04 -05:00
header::HeaderValue,
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server, StatusCode,
};
use lox::bridge_table::{BridgeLine, ENC_BUCKET_BYTES, BRIDGE_BYTES};
use lox::proto;
use lox::{BridgeAuth, BridgeDb, OPENINV_LENGTH};
use rand::RngCore;
use rdsys_backend::{proto::ResourceDiff, start_stream, ResourceStream};
use serde::{Deserialize, Serialize};
use std::{
convert::Infallible,
env,
fs::File,
io::BufReader,
net::SocketAddr,
sync::{Arc, Mutex},
};
use serde_json;
use serde_with::serde_as;
use tokio::{spawn, sync::mpsc};
/// JSON payload returned by the `/invite` endpoint: one open-invitation token.
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct Invite {
    // Fixed-size token of `OPENINV_LENGTH` bytes, (de)serialized through the
    // `serde_with` array adapter named in the attribute below.
    #[serde_as(as = "[_; OPENINV_LENGTH]")]
    invite: [u8; OPENINV_LENGTH],
}
/// JSON payload returned by the `/reachability` endpoint: the encrypted
/// bridge table, one fixed-size entry per bucket.
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct EncBridgeTable {
    // Each entry is exactly `ENC_BUCKET_BYTES` long; the `serde_with` adapter
    // named below handles the fixed-size arrays inside the vector.
    #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")]
    etable: Vec<[u8; ENC_BUCKET_BYTES]>,
}
/// Connection settings for the rdsys resource stream, deserialized from the
/// JSON config file given on the command line. `main` forwards the four
/// fields, in this order, to `start_stream`.
#[derive(Debug, Deserialize)]
struct ResourceInfo {
    // First argument to `start_stream`.
    endpoint: String,
    // Second argument to `start_stream`.
    name: String,
    // Third argument to `start_stream`.
    token: String,
    // Fourth argument to `start_stream`.
    types: Vec<String>,
}
// Populate Bridgedb from rdsys
/// Build a randomly filled `BridgeLine`. FOR TESTING ONLY, never for production.
///
/// Copied from lox/src/bridge_table.rs so that a bridgedb/lox_auth can be
/// seeded with structurally valid buckets for exercising Lox
/// requests/verifications/responses. A production deployment should instead
/// translate real bridges into this format in a private function and sort them
/// into buckets (3 bridges/bucket is the suggested starting point) in some
/// intelligent way.
pub fn random() -> BridgeLine {
    const PORT_CHOICES: [u16; 4] = [443, 4433, 8080, 43079];

    let mut rng = rand::thread_rng();
    let mut line: BridgeLine = BridgeLine::default();

    // Draw four random bytes as a candidate IPv4 address.
    let mut v4_candidate = [0u8; 4];
    rng.fill_bytes(&mut v4_candidate);
    if v4_candidate[0] < 224 {
        // Usable IPv4 address: store it in v4-mapped IPv6 form.
        line.addr[10] = 255;
        line.addr[11] = 255;
        line.addr[12..16].copy_from_slice(&v4_candidate);
    } else {
        // A leading byte of 224 or more is not a valid IPv4 address, so use
        // random bytes as an IPv6 address instead (well-formedness is not a
        // concern here).
        rng.fill_bytes(&mut line.addr);
    }

    line.port = PORT_CHOICES[(rng.next_u32() % 4) as usize];

    let mut fingerprint = [0u8; 20];
    let mut cert = [0u8; 52];
    rng.fill_bytes(&mut fingerprint);
    rng.fill_bytes(&mut cert);

    let info = format!(
        "obfs4 {} cert={} iat-mode=0",
        hex_fmt::HexFmt(fingerprint),
        base64::encode_config(cert, base64::STANDARD_NO_PAD)
    );
    let info_bytes = info.as_bytes();
    line.info[..info_bytes.len()].copy_from_slice(info_bytes);
    line
}
/// Shared server state handed to every request handler. Cloning only clones
/// the two `Arc` handles, so all clones refer to the same database and
/// authority.
#[derive(Clone)]
struct LoxServerContext {
    // Bridge database; `generate_invite` reads invitations from it.
    db: Arc<Mutex<BridgeDb>>,
    // Bridge authority; the `verify_and_send_*` handlers call into it.
    ba: Arc<Mutex<BridgeAuth>>,
}
async fn handle(
context: LoxServerContext,
// addr: SocketAddr,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
2023-02-06 13:57:23 -05:00
println!("Request: {:?}", req);
match req.method() {
&Method::OPTIONS => Ok(Response::builder()
.header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
.header("Access-Control-Allow-Headers", "accept, content-type")
.header("Access-Control-Allow-Methods", "POST")
.status(200)
.body(Body::from("Allow POST"))
.unwrap()),
_ => match (req.method(), req.uri().path()) {
(&Method::GET, "/invite") => Ok::<_, Infallible>(generate_invite(context.db)),
(&Method::GET, "/reachability") => {
Ok::<_, Infallible>(send_reachability_cred(context.ba))
}
2023-02-06 13:57:23 -05:00
(&Method::GET, "/pubkeys") => Ok::<_, Infallible>(send_keys(context.ba)),
(&Method::POST, "/openreq") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_open_cred(bytes, context.ba)
}),
(&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_trust_promo(bytes, context.ba)
}),
2023-02-13 23:58:38 -05:00
(&Method::POST, "/trustmig") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_trust_migration(bytes, context.ba)
}),
(&Method::POST, "/levelup") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_level_up(bytes, context.ba)
}),
2023-02-24 19:10:38 -05:00
(&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_issue_invite(bytes, context.ba)
}),
2023-02-27 14:17:07 -05:00
(&Method::POST, "/redeem") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_redeem_invite(bytes, context.ba)
}),
(&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
// TEST ONLY: Block all existing bridges and add new ones for migration
verify_and_send_check_blockage(bytes, context.ba, context.db)
}),
(&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
verify_and_send_blockage_migration(bytes, context.ba)
}),
_ => {
// Return 404 not found response.
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not found"))
.unwrap())
}
},
}
}
fn generate_invite(db: Arc<Mutex<lox::BridgeDb>>) -> Response<Body> {
let obj = db.lock().unwrap();
let invite = Invite {
invite: obj.invite(),
};
2023-02-13 16:28:24 -05:00
let token = serde_json::to_string(&invite).unwrap();
let mut resp = Response::new(Body::from(token));
resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp
}
// Respond with the encrypted bridge table, serialized as JSON.
//
// NOTE: the 85-day clock advance below is test scaffolding and runs on every
// call.
fn send_reachability_cred(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
    let mut bridge_auth = ba.lock().unwrap();
    bridge_auth.advance_days(85); // FOR TESTING ONLY
    println!("Today's date according to server: {}", bridge_auth.today());

    let payload = EncBridgeTable {
        etable: bridge_auth.enc_bridge_table().clone(),
    };
    let json = serde_json::to_string(&payload).unwrap();

    let mut reply = Response::new(Body::from(json));
    let cors_origin = HeaderValue::from_static("*");
    reply
        .headers_mut()
        .insert("Access-Control-Allow-Origin", cors_origin);
    reply
}
2023-02-06 13:57:23 -05:00
fn send_keys(ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let ba_obj = ba.lock().unwrap();
// vector of public keys (to serialize)
let ba_obj_pubkeys = vec![
&ba_obj.lox_pub,
&ba_obj.migration_pub,
&ba_obj.migrationkey_pub,
&ba_obj.reachability_pub,
&ba_obj.invitation_pub,
];
2023-02-13 16:28:24 -05:00
println!("Today's date according to server: {}", ba_obj.today());
2023-02-06 13:57:23 -05:00
let mut resp = Response::new(Body::from(serde_json::to_string(&ba_obj_pubkeys).unwrap()));
resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp
}
fn verify_and_send_open_cred(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let req: proto::open_invite::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_open_invite(req).unwrap();
let open_invite_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(open_invite_resp_str)
}
fn verify_and_send_trust_promo(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let req: proto::trust_promotion::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
ba_obj.advance_days(31); // FOR TESTING ONLY
2023-02-13 16:28:24 -05:00
println!("Today's date according to server: {}", ba_obj.today());
let response = ba_obj.handle_trust_promotion(req).unwrap();
let trust_promo_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(trust_promo_resp_str)
}
2023-02-13 23:58:38 -05:00
fn verify_and_send_trust_migration(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let req: proto::migration::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_migration(req).unwrap();
let resp_str = serde_json::to_string(&response).unwrap();
prepare_header(resp_str)
}
fn verify_and_send_level_up(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let req: proto::level_up::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_level_up(req).unwrap();
let level_up_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(level_up_resp_str)
}
2023-02-24 19:10:38 -05:00
fn verify_and_send_issue_invite(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let req: proto::issue_invite::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_issue_invite(req).unwrap();
let issue_invite_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(issue_invite_resp_str)
}
2023-02-27 14:17:07 -05:00
fn verify_and_send_redeem_invite(request: Bytes, ba: Arc<Mutex<BridgeAuth>>) -> Response<Body> {
let req: proto::redeem_invite::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_redeem_invite(req).unwrap();
let redeem_invite_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(redeem_invite_resp_str)
2023-02-27 14:17:07 -05:00
}
fn verify_and_send_check_blockage(
request: Bytes,
ba: Arc<Mutex<BridgeAuth>>,
db: Arc<Mutex<BridgeDb>>,
) -> Response<Body> {
let req: proto::check_blockage::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
// Created 5 buckets initially, so we will add 5 hot spares (for migration) and
// block all of the existing buckets to trigger migration table propagation
// FOR TESTING ONLY, ADD 5 NEW Buckets
for _ in 0..5 {
let bucket = [random(), random(), random()];
ba_obj.add_spare_bucket(bucket);
}
ba_obj.enc_bridge_table();
// FOR TESTING ONLY, BLOCK ALL BRIDGES
let mut db_obj = db.lock().unwrap();
for index in 0..5 {
let b0 = ba_obj.bridge_table.buckets[index][0];
let b1 = ba_obj.bridge_table.buckets[index][1];
let b2 = ba_obj.bridge_table.buckets[index][2];
ba_obj.bridge_unreachable(&b0, &mut db_obj);
ba_obj.bridge_unreachable(&b1, &mut db_obj);
ba_obj.bridge_unreachable(&b2, &mut db_obj);
}
ba_obj.enc_bridge_table();
let response = ba_obj.handle_check_blockage(req).unwrap();
let check_blockage_resp_str = serde_json::to_string(&response).unwrap();
prepare_header(check_blockage_resp_str)
}
2023-02-27 14:17:07 -05:00
fn verify_and_send_blockage_migration(
request: Bytes,
ba: Arc<Mutex<BridgeAuth>>,
) -> Response<Body> {
let req: proto::blockage_migration::Request = serde_json::from_slice(&request).unwrap();
let mut ba_obj = ba.lock().unwrap();
let response = ba_obj.handle_blockage_migration(req).unwrap();
let resp_str = serde_json::to_string(&response).unwrap();
prepare_header(resp_str)
}
2023-02-24 19:10:38 -05:00
fn prepare_header(response: String) -> Response<Body> {
let mut resp = Response::new(Body::from(response));
2023-02-13 23:58:38 -05:00
resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp
}
/// Completes once Ctrl+C is received; `main` passes it to the server's
/// graceful shutdown. Panics if the signal listener cannot be installed.
async fn shutdown_signal() {
    let listened = tokio::signal::ctrl_c().await;
    listened.expect("failed to listen for ctrl+c signal");
}
// Start streaming bridge resources from rdsys in the background, set up the
// (test) bridgedb, then serve Lox requests over HTTP.
// Run with cargo run -- config.json
#[tokio::main(worker_threads = 2)]
async fn main() {
    // `nth(1)` rather than `args[1]`: a missing argument then reports usage
    // instead of an index-out-of-bounds panic.
    let config_path = env::args()
        .nth(1)
        .expect("Usage: cargo run -- config.json");
    let file = File::open(&config_path).expect("Should have been able to read config.json file");
    let reader = BufReader::new(file);
    // Read the JSON contents of the file as a ResourceInfo
    let rtype: ResourceInfo =
        serde_json::from_reader(reader).expect("config.json should describe a ResourceInfo");
    // TODO: pass in distribution of open invite vs. hot spare buckets?
    let num_buckets = 5;
    let mut bridgedb = BridgeDb::new();
    let mut lox_auth = BridgeAuth::new(bridgedb.pubkey);

    // Sender is the resource stream; the receiver will eventually feed the
    // bridgedb (add_openinv_bridges).
    let (tx, rx) = async_channel::bounded(3);
    let rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
        .await
        .unwrap();
    spawn(async move {
        for diff in rstream {
            println!("Received diff: {:?}", diff);
            // A send error means the receiving task is gone; stop forwarding
            // instead of panicking.
            if tx.send(diff).await.is_err() {
                eprintln!("resource diff receiver closed; stopping resource stream");
                break;
            }
        }
    });

    // Consume diffs in a background task so the HTTP server below can start.
    // The loop ends cleanly once the sender is dropped and the channel drains.
    spawn(async move {
        while let Ok(resourcediff) = rx.recv().await {
            // A diff is not guaranteed to contain new resources.
            if let Some(new_resources) = resourcediff.new {
                for new_resource in new_resources {
                    println!("A NEW RESOURCE: {:?}", new_resource);
                    match new_resource.0.as_str() {
                        "obfs2" => {
                            println!("Obfs2!");
                            for resource in new_resource.1 {
                                // Parse the textual address and store it the way
                                // `random()` does: 16 bytes, IPv4 as v4-mapped IPv6.
                                let parsed = resource
                                    .address
                                    .trim_start_matches('[')
                                    .trim_end_matches(']')
                                    .parse::<std::net::IpAddr>();
                                let ip_bytes: [u8; 16] = match parsed {
                                    Ok(std::net::IpAddr::V4(v4)) => v4.to_ipv6_mapped().octets(),
                                    Ok(std::net::IpAddr::V6(v6)) => v6.octets(),
                                    Err(e) => {
                                        eprintln!(
                                            "Skipping resource with unparsable address {:?}: {}",
                                            resource.address, e
                                        );
                                        continue;
                                    }
                                };
                                // TODO: resource.params should supply the cert;
                                // a placeholder is used until that is wired up.
                                let infostr: String = format!(
                                    "obfs4 {} fingerprint={} cert={} iat-mode=0",
                                    resource.r#type,
                                    resource.fingerprint,
                                    base64::encode_config(
                                        "super secret password",
                                        base64::STANDARD_NO_PAD
                                    ),
                                );
                                let mut info_bytes: [u8; BRIDGE_BYTES - 18] =
                                    [0; BRIDGE_BYTES - 18];
                                // Guard the copy: an over-long info string would
                                // otherwise panic on the slice bounds.
                                if infostr.len() > info_bytes.len() {
                                    eprintln!(
                                        "Skipping resource: info string is {} bytes, limit is {}",
                                        infostr.len(),
                                        info_bytes.len()
                                    );
                                    continue;
                                }
                                info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
                                let bline = BridgeLine {
                                    addr: ip_bytes,
                                    port: resource.port,
                                    info: info_bytes,
                                };
                                println!("Now it's a bridgeline: {:?}", bline);
                            }
                        }
                        "scramblesuit" => println!("Scramblesuit!"),
                        "obfs4" => println!("Obfs4!"),
                        "meek" => println!("Meek!"),
                        "snowflake" => println!("Snowflake!"),
                        _ => println!("Other unknown"),
                    }
                }
            }
            for changed_resource in resourcediff.changed {
                println!("A NEW CHANGED RESOURCE: {:?}", changed_resource);
            }
            for gone_resource in resourcediff.gone {
                println!("A NEW GONE RESOURCE: {:?}", gone_resource);
            }
            // TODO: parse resource diff into BridgeLines and add open inv bridges
        }
    });

    // Create and initialize a new db and lox_auth
    // Make 3 x num_buckets open invitation bridges, in sets of 3
    for _ in 0..num_buckets {
        let bucket = [random(), random(), random()];
        lox_auth.add_openinv_bridges(bucket, &mut bridgedb);
    }
    // Create the encrypted bridge table
    lox_auth.enc_bridge_table();

    let context = LoxServerContext {
        db: Arc::new(Mutex::new(bridgedb)),
        ba: Arc::new(Mutex::new(lox_auth)),
    };
    let new_service = make_service_fn(move |_conn: &AddrStream| {
        let context = context.clone();
        let service = service_fn(move |req| handle(context.clone(), req));
        async move { Ok::<_, Infallible>(service) }
    });
    let addr = SocketAddr::from(([127, 0, 0, 1], 8001));
    let server = Server::bind(&addr).serve(new_service);
    let graceful = server.with_graceful_shutdown(shutdown_signal());
    println!("Listening on {}", addr);
    if let Err(e) = graceful.await {
        eprintln!("server error: {}", e);
    }
}