Merge 'lox-distributor/main' into lox-workspace
This commit is contained in:
commit
98218f7bf9
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
Cargo.lock
|
|
@ -0,0 +1,35 @@
|
|||
# Lox Distributor
|
||||
|
||||
The Lox distributor receives resources from [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) and writes them to [Lox
|
||||
BridgeLines](https://git-crysp.uwaterloo.ca/iang/lox/src/master/src/bridge_table.rs#L42). Concurrently, it receives and responds to requests from [Lox clients](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm).
|
||||
|
||||
## Configure rdsys stream
|
||||
|
||||
A test `config.json` is included for testing on a local instance of rdsys. This
|
||||
can be edited to correspond to the desired types of resources and endpoints.
|
||||
|
||||
## Test Run
|
||||
|
||||
For testing purposes, you will need a running instance of rdsys as well as a running Lox client.
|
||||
|
||||
### Run rdsys locally
|
||||
|
||||
First clone rdsys from [here](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) then change into the backend directory:
|
||||
|
||||
```
|
||||
cd rdsys/cmd/backend
|
||||
```
|
||||
|
||||
Finally run rdsys:
|
||||
|
||||
```
|
||||
./backend --config config.json
|
||||
```
|
||||
|
||||
### Run Lox Distributor locally
|
||||
|
||||
Simply run `cargo run -- config.json` :)
|
||||
|
||||
### Run a Lox client locally
|
||||
|
||||
First clone lox-wasm from [here](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm). Follow the instructions in the [README](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm/-/blob/main/README.md) to build and test the Lox client.
|
|
@ -0,0 +1,9 @@
|
|||
{
|
||||
"endpoint": "http://127.0.0.1:7100/resource-stream",
|
||||
"name": "https",
|
||||
"token": "HttpsApiTokenPlaceholder",
|
||||
"types": [
|
||||
"obfs2",
|
||||
"scramblesuit"
|
||||
]
|
||||
}
|
|
@ -0,0 +1,325 @@
|
|||
use hyper::{body::Bytes, header::HeaderValue, Body, Response};
|
||||
|
||||
use lox::{
|
||||
bridge_table::{BridgeLine, ENC_BUCKET_BYTES, MAX_BRIDGES_PER_BUCKET},
|
||||
proto::{
|
||||
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
|
||||
redeem_invite, trust_promotion,
|
||||
},
|
||||
BridgeAuth, BridgeDb, IssuerPubKey, OPENINV_LENGTH,
|
||||
};
|
||||
use rand::RngCore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Wire format for an open invitation token handed out to new Lox clients.
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct Invite {
    // serde_as adapter (de)serializes the fixed-size byte array
    #[serde_as(as = "[_; OPENINV_LENGTH]")]
    invite: [u8; OPENINV_LENGTH],
}
|
||||
|
||||
/// Serializable wrapper around the encrypted bridge table sent to clients.
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct EncBridgeTable {
    // one fixed-size encrypted blob per bucket
    #[serde_as(as = "Vec<[_; ENC_BUCKET_BYTES]>")]
    etable: Vec<[u8; ENC_BUCKET_BYTES]>,
}
|
||||
|
||||
/// Create a random BridgeLine for testing ONLY. Do not use in production!
/// This was copied directly from lox/src/bridge_table.rs in order
/// to easily initialize a bridgedb/lox_auth with structurally
/// correct buckets to be used for Lox requests/verifications/responses.
/// In production, existing bridges should be translated into this format
/// in a private function and sorted into buckets (3 bridges/bucket is suggested
/// but experience may suggest something else) in some intelligent way.

pub fn random() -> BridgeLine {
    let mut rng = rand::thread_rng();
    let mut res: BridgeLine = BridgeLine::default();
    // Pick a random 4-byte address
    let mut addr: [u8; 4] = [0; 4];
    rng.fill_bytes(&mut addr);
    // If the leading byte is 224 or more, that's not a valid IPv4
    // address. Choose an IPv6 address instead (but don't worry too
    // much about it being well formed).
    if addr[0] >= 224 {
        rng.fill_bytes(&mut res.addr);
    } else {
        // Store an IPv4 address as a v4-mapped IPv6 address
        res.addr[10] = 255;
        res.addr[11] = 255;
        res.addr[12..16].copy_from_slice(&addr);
    };
    // Pick one of a few plausible ports at random
    let ports: [u16; 4] = [443, 4433, 8080, 43079];
    let portidx = (rng.next_u32() % 4) as usize;
    res.port = ports[portidx];
    res.uid_fingerprint = rng.next_u64();
    // Fabricate an obfs4-style info string from 52 random cert bytes
    let mut cert: [u8; 52] = [0; 52];
    rng.fill_bytes(&mut cert);
    let infostr: String = format!(
        "obfs4 cert={}, iat-mode=0",
        base64::encode_config(cert, base64::STANDARD_NO_PAD)
    );
    // NOTE(review): assumes infostr always fits in res.info — holds for
    // this fixed format, but would panic if the format string grew.
    res.info[..infostr.len()].copy_from_slice(infostr.as_bytes());
    res
}
|
||||
|
||||
/// Shared, clonable state for the Lox distributor. Every field is wrapped
/// in Arc<Mutex<..>> so clones handed to request handlers and the rdsys
/// update path all observe the same underlying state.
#[derive(Clone)]
pub struct LoxServerContext {
    // open-invitation database
    pub db: Arc<Mutex<BridgeDb>>,
    // Lox bridge authority (credential issuing, bridge table)
    pub ba: Arc<Mutex<BridgeAuth>>,
    // bridges received from rdsys that are not yet allocated to a bucket
    pub extra_bridges: Arc<Mutex<Vec<BridgeLine>>>,
    // 'gone' bridges whose replacement failed; retried on the next update
    pub unreplaced_bridges: Arc<Mutex<Vec<BridgeLine>>>,
}
|
||||
|
||||
impl LoxServerContext {
|
||||
pub fn append_extra_bridges(&self, bridge: BridgeLine) {
|
||||
let mut extra_bridges = self.extra_bridges.lock().unwrap();
|
||||
extra_bridges.push(bridge);
|
||||
}
|
||||
|
||||
pub fn remove_extra_bridges(&self) -> [BridgeLine; MAX_BRIDGES_PER_BUCKET] {
|
||||
let mut extra_bridges = self.extra_bridges.lock().unwrap();
|
||||
let mut return_bridges = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
|
||||
for i in 0..MAX_BRIDGES_PER_BUCKET {
|
||||
return_bridges[i] = extra_bridges.remove(i);
|
||||
}
|
||||
|
||||
return_bridges
|
||||
}
|
||||
|
||||
pub fn remove_single_bridge(&self) {
|
||||
let mut extra_bridges = self.extra_bridges.lock().unwrap();
|
||||
let length = extra_bridges.len();
|
||||
_ = extra_bridges.remove(length - 1)
|
||||
}
|
||||
|
||||
pub fn new_unreplaced_bridge(&self, bridge: BridgeLine) {
|
||||
let mut unreplaced_bridges = self.unreplaced_bridges.lock().unwrap();
|
||||
unreplaced_bridges.push(bridge);
|
||||
}
|
||||
|
||||
pub fn allocate_leftover_bridges(&self) {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
let mut db_obj = self.db.lock().unwrap();
|
||||
let mut extra_bridges = self.extra_bridges.lock().unwrap();
|
||||
ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj);
|
||||
}
|
||||
|
||||
pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
let mut db_obj = self.db.lock().unwrap();
|
||||
ba_obj.add_openinv_bridges(bucket, &mut db_obj);
|
||||
}
|
||||
|
||||
pub fn add_spare_bucket(&self, bucket: [BridgeLine; 3]) {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.add_spare_bucket(bucket);
|
||||
}
|
||||
|
||||
pub fn replace_with_new(&self, bridgeline: BridgeLine) -> bool {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
let eb_obj = self.extra_bridges.lock().unwrap();
|
||||
let available_bridge = eb_obj.last();
|
||||
// .last() doesn't actually remove the object so we still have to do that
|
||||
if eb_obj.len() > 0 {
|
||||
self.remove_single_bridge();
|
||||
}
|
||||
ba_obj.bridge_replace(&bridgeline, available_bridge)
|
||||
}
|
||||
|
||||
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
let mut db_obj = self.db.lock().unwrap();
|
||||
ba_obj.bridge_unreachable(&bridgeline, &mut db_obj)
|
||||
}
|
||||
|
||||
pub fn update_bridge(&self, bridgeline: BridgeLine) -> bool {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.bridge_update(&bridgeline)
|
||||
}
|
||||
|
||||
fn advance_days_TEST(&self, num: u16) {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.advance_days(num); // FOR TESTING ONLY
|
||||
println!("Today's date according to server: {}", ba_obj.today());
|
||||
}
|
||||
|
||||
pub fn encrypt_table(&self) -> Vec<[u8; ENC_BUCKET_BYTES]> {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.enc_bridge_table().clone()
|
||||
}
|
||||
|
||||
fn pubkeys(&self) -> Vec<IssuerPubKey> {
|
||||
let ba_obj = self.ba.lock().unwrap();
|
||||
// vector of public keys (to serialize)
|
||||
vec![
|
||||
ba_obj.lox_pub.clone(),
|
||||
ba_obj.migration_pub.clone(),
|
||||
ba_obj.migrationkey_pub.clone(),
|
||||
ba_obj.reachability_pub.clone(),
|
||||
ba_obj.invitation_pub.clone(),
|
||||
]
|
||||
}
|
||||
|
||||
fn gen_invite(&self) -> Invite {
|
||||
let obj = self.db.lock().unwrap();
|
||||
Invite {
|
||||
invite: obj.invite(),
|
||||
}
|
||||
}
|
||||
|
||||
fn open_inv(&self, req: open_invite::Request) -> open_invite::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_open_invite(req).unwrap()
|
||||
}
|
||||
|
||||
fn trust_promo(&self, req: trust_promotion::Request) -> trust_promotion::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_trust_promotion(req).unwrap()
|
||||
}
|
||||
|
||||
fn trust_migration(&self, req: migration::Request) -> migration::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_migration(req).unwrap()
|
||||
}
|
||||
|
||||
fn level_up(&self, req: level_up::Request) -> level_up::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_level_up(req).unwrap()
|
||||
}
|
||||
|
||||
fn issue_invite(&self, req: issue_invite::Request) -> issue_invite::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_issue_invite(req).unwrap()
|
||||
}
|
||||
|
||||
fn redeem_invite(&self, req: redeem_invite::Request) -> redeem_invite::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_redeem_invite(req).unwrap()
|
||||
}
|
||||
|
||||
fn check_blockage(&self, req: check_blockage::Request) -> check_blockage::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
// Created 5 buckets initially, so we will add 5 hot spares (for migration) and
|
||||
// block all of the existing buckets to trigger migration table propagation
|
||||
// FOR TESTING ONLY, ADD 5 NEW Buckets
|
||||
for _ in 0..5 {
|
||||
let bucket = [random(), random(), random()];
|
||||
ba_obj.add_spare_bucket(bucket);
|
||||
}
|
||||
ba_obj.enc_bridge_table();
|
||||
|
||||
// FOR TESTING ONLY, BLOCK ALL BRIDGES
|
||||
let mut db_obj = self.db.lock().unwrap();
|
||||
for index in 0..5 {
|
||||
let b0 = ba_obj.bridge_table.buckets[index][0];
|
||||
let b1 = ba_obj.bridge_table.buckets[index][1];
|
||||
let b2 = ba_obj.bridge_table.buckets[index][2];
|
||||
ba_obj.bridge_unreachable(&b0, &mut db_obj);
|
||||
ba_obj.bridge_unreachable(&b1, &mut db_obj);
|
||||
ba_obj.bridge_unreachable(&b2, &mut db_obj);
|
||||
}
|
||||
ba_obj.enc_bridge_table();
|
||||
ba_obj.handle_check_blockage(req).unwrap()
|
||||
}
|
||||
|
||||
fn blockage_migration(&self, req: blockage_migration::Request) -> blockage_migration::Response {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
ba_obj.handle_blockage_migration(req).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
// Generate and return an open invitation token
|
||||
pub fn generate_invite(context: LoxServerContext) -> Response<Body> {
|
||||
let invite = context.gen_invite();
|
||||
let token = serde_json::to_string(&invite).unwrap();
|
||||
prepare_header(token)
|
||||
}
|
||||
|
||||
// Return the serialized encrypted bridge table
|
||||
pub fn send_reachability_cred(context: LoxServerContext) -> Response<Body> {
|
||||
context.advance_days_TEST(85); // FOR TESTING ONLY
|
||||
let enc_table = context.encrypt_table();
|
||||
let etable = EncBridgeTable { etable: enc_table };
|
||||
prepare_header(serde_json::to_string(&etable).unwrap())
|
||||
}
|
||||
|
||||
// Return the serialized pubkeys for the Bridge Authority
|
||||
pub fn send_keys(context: LoxServerContext) -> Response<Body> {
|
||||
let pubkeys = context.pubkeys();
|
||||
prepare_header(serde_json::to_string(&pubkeys).unwrap())
|
||||
}
|
||||
|
||||
pub fn verify_and_send_open_cred(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||
let req: open_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||
let response = context.open_inv(req);
|
||||
let open_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(open_invite_resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_trust_promo(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||
let req: trust_promotion::Request = serde_json::from_slice(&request).unwrap();
|
||||
context.advance_days_TEST(31); // FOR TESTING ONLY
|
||||
let response = context.trust_promo(req);
|
||||
let trust_promo_resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(trust_promo_resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_trust_migration(
|
||||
request: Bytes,
|
||||
context: LoxServerContext,
|
||||
) -> Response<Body> {
|
||||
let req: migration::Request = serde_json::from_slice(&request).unwrap();
|
||||
let response = context.trust_migration(req);
|
||||
let resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_level_up(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||
let req: level_up::Request = serde_json::from_slice(&request).unwrap();
|
||||
let response = context.level_up(req);
|
||||
let level_up_resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(level_up_resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_issue_invite(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||
let req: issue_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||
let response = context.issue_invite(req);
|
||||
let issue_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(issue_invite_resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_redeem_invite(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||
let req: redeem_invite::Request = serde_json::from_slice(&request).unwrap();
|
||||
let response = context.redeem_invite(req);
|
||||
let redeem_invite_resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(redeem_invite_resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_check_blockage(request: Bytes, context: LoxServerContext) -> Response<Body> {
|
||||
let req: check_blockage::Request = serde_json::from_slice(&request).unwrap();
|
||||
|
||||
let response = context.check_blockage(req);
|
||||
let check_blockage_resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(check_blockage_resp_str)
|
||||
}
|
||||
|
||||
pub fn verify_and_send_blockage_migration(
|
||||
request: Bytes,
|
||||
context: LoxServerContext,
|
||||
) -> Response<Body> {
|
||||
let req: blockage_migration::Request = serde_json::from_slice(&request).unwrap();
|
||||
let response = context.blockage_migration(req);
|
||||
let resp_str = serde_json::to_string(&response).unwrap();
|
||||
prepare_header(resp_str)
|
||||
}
|
||||
|
||||
fn prepare_header(response: String) -> Response<Body> {
|
||||
let mut resp = Response::new(Body::from(response));
|
||||
resp.headers_mut()
|
||||
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
|
||||
resp
|
||||
}
|
|
@ -0,0 +1,344 @@
|
|||
use futures::future;
|
||||
use futures::StreamExt;
|
||||
use hyper::{
|
||||
server::conn::AddrStream,
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Request, Response, Server,
|
||||
};
|
||||
use lox::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
|
||||
use lox::{BridgeAuth, BridgeDb};
|
||||
|
||||
use rdsys_backend::{proto::ResourceDiff, start_stream};
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
env,
|
||||
fs::File,
|
||||
io::BufReader,
|
||||
net::SocketAddr,
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
mod lox_context;
|
||||
mod request_handler;
|
||||
use request_handler::handle;
|
||||
mod resource_parser;
|
||||
use resource_parser::parse_resource;
|
||||
|
||||
use tokio::{
|
||||
signal, spawn,
|
||||
sync::{broadcast, mpsc, oneshot},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
async fn shutdown_signal() {
|
||||
tokio::signal::ctrl_c()
|
||||
.await
|
||||
.expect("failed to listen for ctrl+c signal");
|
||||
println!("Shut down Lox Server");
|
||||
}
|
||||
|
||||
/// Connection details for one rdsys resource stream, read from config.json.
#[derive(Debug, Deserialize)]
struct ResourceInfo {
    // rdsys resource-stream URL, e.g. "http://127.0.0.1:7100/resource-stream"
    endpoint: String,
    // distributor name registered with rdsys (e.g. "https")
    name: String,
    // API token expected by the rdsys endpoint
    token: String,
    // resource (bridge) types to subscribe to, e.g. "obfs2"
    types: Vec<String>,
}
|
||||
// Populate Bridgedb from rdsys

// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified
// in the config.json file.
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
//
// Forwards every ResourceDiff from the rdsys stream into `tx` until the
// stream ends or a shutdown broadcast arrives on `kill`.
async fn rdsys_stream(
    rtype: ResourceInfo,
    tx: mpsc::Sender<ResourceDiff>,
    mut kill: broadcast::Receiver<()>,
) {
    let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
        .await
        .expect("rdsys stream initialization failed. Start rdsys or check config.json");
    loop {
        tokio::select! {
            // Next diff from rdsys; None means the stream is exhausted.
            res = rstream.next() => {
                match res {
                    // NOTE(review): unwrap panics if the receiving parser
                    // task has already exited — acceptable only at shutdown.
                    Some(diff) => tx.send(diff).await.unwrap(),
                    None => return,
                }
            },
            _ = kill.recv() => {println!("Shut down rdsys stream"); return},

        }
    }
}
|
||||
|
||||
// Runs the bridge parser until either it finishes on its own or a
// shutdown broadcast arrives on `kill`, whichever happens first.
async fn rdsys_bridge_parser(
    rdsys_tx: mpsc::Sender<Command>,
    rx: mpsc::Receiver<ResourceDiff>,
    mut kill: broadcast::Receiver<()>,
) {
    tokio::select! {
        start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser ,
        _ = kill.recv() => {println!("Shut down bridge_parser");},
    }
}
|
||||
|
||||
// Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the
|
||||
// Context Manager to be parsed and added to the BridgeDB
|
||||
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceDiff>) {
|
||||
loop {
|
||||
let resourcediff = rx.recv().await.unwrap();
|
||||
let cmd = Command::Rdsys { resourcediff };
|
||||
rdsys_tx.send(cmd).await.unwrap();
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Runs the context manager until it finishes or a shutdown broadcast
// arrives on `kill`, whichever happens first.
async fn create_context_manager(
    context_rx: mpsc::Receiver<Command>,
    mut kill: broadcast::Receiver<()>,
) {
    tokio::select! {
        create_context = context_manager(context_rx) => create_context,
        _ = kill.recv() => {println!("Shut down context_manager");},
    }
}
|
||||
|
||||
// Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring
// that the DB can be updated from the rdsys stream and client requests
// can be responded to with an updated BridgeDB state
//
// This is the single owner of Lox state: all mutation happens here, in
// response to Command messages, so the rdsys path and the HTTP path never
// race each other.
async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
    let bridgedb = BridgeDb::new();
    let lox_auth = BridgeAuth::new(bridgedb.pubkey);

    let context = lox_context::LoxServerContext {
        db: Arc::new(Mutex::new(bridgedb)),
        ba: Arc::new(Mutex::new(lox_auth)),
        extra_bridges: Arc::new(Mutex::new(Vec::new())),
        unreplaced_bridges: Arc::new(Mutex::new(Vec::new())),
    };

    // Loop until every Command sender is dropped (recv() returns None).
    while let Some(cmd) = context_rx.recv().await {
        use Command::*;
        match cmd {
            Rdsys { resourcediff } => {
                // New bridges: group into buckets of MAX_BRIDGES_PER_BUCKET,
                // but first consume them as replacements for any bridges
                // still waiting in unreplaced_bridges.
                if let Some(new_resources) = resourcediff.new {
                    let mut count = 0;
                    let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
                    for pt in new_resources {
                        println!("A NEW RESOURCE: {:?}", pt);
                        for resource in pt.1 {
                            let bridgeline = parse_resource(resource);
                            println!("Now it's a bridgeline: {:?}", bridgeline);
                            if context.unreplaced_bridges.lock().unwrap().len() > 0 {
                                println!("BridgeLine to be replaced: {:?}", bridgeline);
                                let res = context.replace_with_new(bridgeline);
                                if res {
                                    println!("BridgeLine successfully replaced: {:?}", bridgeline);
                                } else {
                                    // Add the bridge to the list of unreplaced bridges in the Lox context and try
                                    // again to replace at the next update (nothing changes in the Lox Authority)
                                    println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline);
                                    context.new_unreplaced_bridge(bridgeline);
                                }
                            } else if count < MAX_BRIDGES_PER_BUCKET {
                                bucket[count] = bridgeline;
                                count += 1;
                            } else {
                                // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
                                // eventually also do some more fancy grouping of new resources, i.e., by type or region
                                context.add_openinv_bucket(bucket);
                                count = 0;
                                bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
                            }
                        }
                    }
                    // Handle the extra buckets that were not allocated already
                    if count != 0 {
                        for val in 0..count {
                            // Pool partial buckets; once the pool itself
                            // fills a bucket, promote it to a spare.
                            if context.extra_bridges.lock().unwrap().len()
                                < (MAX_BRIDGES_PER_BUCKET)
                            {
                                context.append_extra_bridges(bucket[val]);
                            } else {
                                bucket = context.remove_extra_bridges();
                                context.add_spare_bucket(bucket);
                            }
                        }
                    }
                }
                // Changed bridges: update in place; if unknown to the
                // table, treat as a new resource for now.
                if let Some(changed_resources) = resourcediff.changed {
                    for pt in changed_resources {
                        println!("A NEW CHANGED RESOURCE: {:?}", pt);
                        for resource in pt.1 {
                            let bridgeline = parse_resource(resource);
                            println!("BridgeLine to be changed: {:?}", bridgeline);
                            let res = context.update_bridge(bridgeline);
                            if res {
                                println!("BridgeLine successfully updated: {:?}", bridgeline);
                            } else {
                                println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline);
                                // NOTE(review): threshold 2 here vs.
                                // MAX_BRIDGES_PER_BUCKET above — confirm
                                // whether this asymmetry is intentional.
                                if context.extra_bridges.lock().unwrap().len() < 2 {
                                    context.append_extra_bridges(bridgeline);
                                } else {
                                    let bucket = context.remove_extra_bridges();
                                    context.add_spare_bucket(bucket);
                                }
                            }
                        }
                    }
                }
                // gone resources are not the same as blocked resources.
                // Instead, these are bridges which have either failed to pass tests for some period
                // or have expired bridge descriptors. In both cases, the bridge is unusable, but this
                // is not likely due to censorship. Therefore, we replace gone resources with new resources
                // TODO: create a notion of blocked resources from information collected through various means:
                // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035
                if let Some(gone_resources) = resourcediff.gone {
                    for pt in gone_resources {
                        println!("A NEW GONE RESOURCE: {:?}", pt);
                        for resource in pt.1 {
                            let bridgeline = parse_resource(resource);
                            println!("BridgeLine to be replaced: {:?}", bridgeline);
                            let res = context.replace_with_new(bridgeline);
                            if res {
                                println!("BridgeLine successfully replaced: {:?}", bridgeline);
                            } else {
                                // Add the bridge to the list of unreplaced bridges in the Lox context and try
                                // again to replace at the next update (nothing changes in the Lox Authority)
                                println!(
                                    "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
                                    bridgeline
                                );
                                context.new_unreplaced_bridge(bridgeline);
                            }
                        }
                    }
                }
                /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not
                yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not
                currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something
                like the following:
                println!("BridgeLine to be removed: {:?}", bridgeline);
                let res = context.add_unreachable(bridgeline);
                if res {
                    println!(
                        "BridgeLine successfully marked unreachable: {:?}",
                        bridgeline
                    );
                } else {
                    println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
                    //TODO probably do something else here
                }
                */
                // Flush leftovers into buckets and refresh the encrypted
                // table so clients see a consistent snapshot.
                context.allocate_leftover_bridges();
                context.encrypt_table();
                sleep(Duration::from_millis(1)).await;
            }
            // HTTP request forwarded from the hyper service; the response
            // travels back over the oneshot sender.
            Request { req, sender } => {
                let response = handle(context.clone(), req).await;
                if let Err(e) = sender.send(response) {
                    eprintln!("Server Response Error: {:?}", e);
                };
                sleep(Duration::from_millis(1)).await;
            }
            // Dropping the broadcast sender closes the channel, which every
            // worker task interprets as the shutdown signal.
            Shutdown { shutdown_sig } => {
                println!("Sending Shutdown Signal, all threads should shutdown.");
                drop(shutdown_sig);
                println!("Shutdown Sent.");
            }
        }
    }
}
|
||||
|
||||
// Each of the commands that the Context Manager handles
#[derive(Debug)]
enum Command {
    // A diff of new/changed/gone bridges streamed from rdsys.
    Rdsys {
        resourcediff: ResourceDiff,
    },
    // An HTTP request from a Lox client; the response is returned over
    // the oneshot sender.
    Request {
        req: Request<Body>,
        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
    },
    // Carries the broadcast sender so the context manager can drop it,
    // closing the channel and signalling every worker to exit.
    Shutdown {
        shutdown_sig: broadcast::Sender<()>,
    },
}
|
||||
|
||||
// Entry point: wires up the rdsys stream, the bridge parser, the context
// manager, and the hyper HTTP server, then waits for all of them to exit.
#[tokio::main]
async fn main() {
    // NOTE(review): panics if no config path is given on the command line.
    let args: Vec<String> = env::args().collect();
    let file = File::open(&args[1]).expect("Should have been able to read config.json file");
    let reader = BufReader::new(file);
    // Read the JSON contents of the file as a ResourceInfo
    let rtype: ResourceInfo =
        serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed.");

    // Command channel into the context manager, shared by three producers.
    let (rdsys_tx, context_rx) = mpsc::channel(32);
    let request_tx = rdsys_tx.clone();
    let shutdown_cmd_tx = rdsys_tx.clone();

    // create the shutdown broadcast channel and clone for every thread
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
    let kill_stream = shutdown_tx.subscribe();
    let kill_parser = shutdown_tx.subscribe();
    let kill_context = shutdown_tx.subscribe();

    // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
    let shutdown_handler = spawn(async move {
        tokio::select! {
            _ = signal::ctrl_c() => {
                // Ownership of shutdown_tx moves into the command; the
                // context manager drops it to close the broadcast channel.
                let cmd = Command::Shutdown {
                    shutdown_sig: shutdown_tx,
                };
                shutdown_cmd_tx.send(cmd).await.unwrap();
                sleep(Duration::from_secs(1)).await;

                _ = shutdown_rx.recv().await;
            }
        }
    });

    let context_manager =
        spawn(async move { create_context_manager(context_rx, kill_context).await });

    // Channel carrying raw ResourceDiffs from the stream to the parser.
    let (tx, rx) = mpsc::channel(32);
    let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await });

    let rdsys_resource_receiver =
        spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });

    // Each HTTP request becomes a Command::Request; the handler awaits the
    // context manager's reply on a oneshot channel.
    let make_service = make_service_fn(move |_conn: &AddrStream| {
        let request_tx = request_tx.clone();
        let service = service_fn(move |req| {
            let request_tx = request_tx.clone();
            let (response_tx, response_rx) = oneshot::channel();
            let cmd = Command::Request {
                req,
                sender: response_tx,
            };
            async move {
                request_tx.send(cmd).await.unwrap();
                response_rx.await.unwrap()
            }
        });
        async move { Ok::<_, Infallible>(service) }
    });

    let addr = SocketAddr::from(([127, 0, 0, 1], 8001));
    let server = Server::bind(&addr).serve(make_service);
    let graceful = server.with_graceful_shutdown(shutdown_signal());
    println!("Listening on {}", addr);
    if let Err(e) = graceful.await {
        eprintln!("server error: {}", e);
    }
    // Wait for every background task to finish before exiting.
    future::join_all([
        rdsys_stream_handler,
        rdsys_resource_receiver,
        context_manager,
        shutdown_handler,
    ])
    .await;
}
|
|
@ -0,0 +1,74 @@
|
|||
use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode};
|
||||
|
||||
use std::convert::Infallible;
|
||||
|
||||
use crate::lox_context;
|
||||
use crate::lox_context::LoxServerContext;
|
||||
|
||||
// Lox Request handling logic for each Lox request/protocol
//
// Routes each incoming HTTP request to the matching Lox protocol handler
// in lox_context. OPTIONS requests get a CORS preflight response; unknown
// paths get 404. POST bodies are read fully before dispatch.
pub async fn handle(
    cloned_context: LoxServerContext,
    req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
    println!("Request: {:?}", req);
    match req.method() {
        // CORS preflight: advertise POST and the accepted headers.
        &Method::OPTIONS => Ok(Response::builder()
            .header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
            .header("Access-Control-Allow-Headers", "accept, content-type")
            .header("Access-Control-Allow-Methods", "POST")
            .status(200)
            .body(Body::from("Allow POST"))
            .unwrap()),
        _ => match (req.method(), req.uri().path()) {
            // Body-less endpoints: invitation, table, and key distribution.
            (&Method::POST, "/invite") => {
                Ok::<_, Infallible>(lox_context::generate_invite(cloned_context))
            }
            (&Method::POST, "/reachability") => {
                Ok::<_, Infallible>(lox_context::send_reachability_cred(cloned_context))
            }
            (&Method::POST, "/pubkeys") => {
                Ok::<_, Infallible>(lox_context::send_keys(cloned_context))
            }
            // Protocol endpoints: each reads the full body and hands the
            // bytes to the matching verify_and_send_* function.
            (&Method::POST, "/openreq") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_open_cred(bytes, cloned_context)
            }),
            (&Method::POST, "/trustpromo") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_trust_promo(bytes, cloned_context)
            }),
            (&Method::POST, "/trustmig") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_trust_migration(bytes, cloned_context)
            }),
            (&Method::POST, "/levelup") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_level_up(bytes, cloned_context)
            }),
            (&Method::POST, "/issueinvite") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_issue_invite(bytes, cloned_context)
            }),
            (&Method::POST, "/redeem") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_redeem_invite(bytes, cloned_context)
            }),
            (&Method::POST, "/checkblockage") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                // TEST ONLY: Block all existing bridges and add new ones for migration
                lox_context::verify_and_send_check_blockage(bytes, cloned_context)
            }),
            (&Method::POST, "/blockagemigration") => Ok::<_, Infallible>({
                let bytes = body::to_bytes(req.into_body()).await.unwrap();
                lox_context::verify_and_send_blockage_migration(bytes, cloned_context)
            }),
            _ => {
                // Return 404 not found response.
                Ok(Response::builder()
                    .status(StatusCode::NOT_FOUND)
                    .body(Body::from("Not found"))
                    .unwrap())
            }
        },
    }
}
|
|
@ -0,0 +1,30 @@
|
|||
use lox::bridge_table::{BridgeLine, BRIDGE_BYTES};
|
||||
use rdsys_backend::proto::Resource;
|
||||
|
||||
pub fn parse_resource(resource: Resource) -> BridgeLine {
|
||||
let mut ip_bytes: [u8; 16] = [0; 16];
|
||||
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
|
||||
let resource_uid = resource
|
||||
.get_uid()
|
||||
.expect("Unable to get Fingerprint UID of resource");
|
||||
let infostr: String = format!(
|
||||
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
|
||||
resource.r#type,
|
||||
resource.blocked_in,
|
||||
resource.protocol,
|
||||
resource.fingerprint,
|
||||
resource.or_addresses,
|
||||
resource.distribution,
|
||||
resource.flags,
|
||||
resource.params,
|
||||
);
|
||||
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
|
||||
|
||||
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
|
||||
BridgeLine {
|
||||
addr: ip_bytes,
|
||||
port: resource.port,
|
||||
uid_fingerprint: resource_uid,
|
||||
info: info_bytes,
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue