use clap::Parser; use futures::future; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, Body, Request, Response, Server, }; use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; use rdsys_backend::{proto::Resource, request_resources}; use serde::Deserialize; use std::{ convert::Infallible, fs::File, io::BufReader, net::SocketAddr, path::PathBuf, time::Duration, }; mod db_handler; use db_handler::DB; mod lox_context; mod request_handler; use request_handler::handle; mod resource_parser; use resource_parser::parse_resources; use tokio::{ signal, spawn, sync::{broadcast, mpsc, oneshot}, time::{interval, sleep}, }; async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("failed to listen for ctrl+c signal"); println!("Shut down Lox Server"); } #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { /// Name/path of the configuration file #[arg(short, long, default_value = "config.json")] config: PathBuf, // Optional Date/time to roll back to as a %Y-%m-%d_%H:%M:%S string // This argument should be passed if the lox_context should be rolled back to a // previous state due to, for example, a mass blocking event that is likely not // due to Lox user behaviour. If the exact roll back date/time is not known, the // last db entry within 24 hours from the passed roll_back_date will be used or else // the program will fail gracefully. #[arg(short, long, verbatim_doc_comment)] roll_back_date: Option, } #[derive(Debug, Deserialize)] struct Config { db: DbConfig, rtype: ResourceInfo, } #[derive(Debug, Deserialize)] pub struct DbConfig { // The path for the lox_context database, default is "lox_db" db_path: String, } impl Default for DbConfig { fn default() -> DbConfig { DbConfig { db_path: "lox_db".to_owned(), } } } #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, name: String, token: String, types: Vec, } // Populate Bridgedb from rdsys // Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified // in the config.json file. // TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_stream( rtype: ResourceInfo, tx: mpsc::Sender>, mut kill: broadcast::Receiver<()>, ) { tokio::select! { start_resource_request = rdsys_request(rtype, tx) => start_resource_request, _ = kill.recv() => {println!("Shut down rdsys request loop"); return}, } } async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender>) { let mut interval = interval(Duration::from_secs(30)); loop { interval.tick().await; let resources = request_resources( rtype.endpoint.clone(), rtype.name.clone(), rtype.token.clone(), rtype.types.clone(), ) .await .unwrap(); tx.send(resources).await.unwrap(); sleep(Duration::from_secs(30)).await; } } async fn rdsys_bridge_parser( rdsys_tx: mpsc::Sender, rx: mpsc::Receiver>, mut kill: broadcast::Receiver<()>, ) { tokio::select! { start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser , _ = kill.recv() => {println!("Shut down bridge_parser");}, } } // Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the // Context Manager to be parsed and added to the BridgeDB async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver>) { loop { let resources = rx.recv().await.unwrap(); let cmd = Command::Rdsys { resources }; rdsys_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; } } async fn create_context_manager( db_config: DbConfig, roll_back_date: Option, context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { create_context = context_manager(db_config, roll_back_date, context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager");}, } } // Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring // that the DB can be updated from the rdsys stream and client requests // can be responded to with an updated BridgeDB state async fn context_manager( db_config: DbConfig, roll_back_date: Option, mut context_rx: mpsc::Receiver, ) { let (mut lox_db, context) = match DB::open_new_or_existing_db(db_config, roll_back_date) { Ok((lox_db, context)) => (lox_db, context), Err(e) => { panic!("Error: {:?}", e); } }; while let Some(cmd) = context_rx.recv().await { use Command::*; match cmd { Rdsys { resources } => { let mut count = 0; let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; if context.bridgetable_is_empty() { // otherwise, for each resource, check if the resource fingerprint is failing tests, if it is check for how long // check if the resource is already in the Lox bridgetable // if it is, it's probably fine to remove or replace the existing resource with the incoming one // to account for changes unless we want to track the number of changes on the lox side? // that should be sufficient to keep it in sync let bridgelines = parse_resources(resources); for bridgeline in bridgelines { //context.populate_bridgetable(bridgelines. None); println!("What is the bridgeline: {:?}", bridgeline); if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { println!("BridgeLine to be replaced: {:?}", bridgeline); let res = context.replace_with_new(bridgeline); if res == lox_library::ReplaceSuccess::NotFound { println!( "BridgeLine not found in bridge_table, already updated {:?}", bridgeline ); } else if res == lox_library::ReplaceSuccess::Replaced { println!("BridgeLine successfully replaced: {:?}", bridgeline); } else { assert!( res == lox_library::ReplaceSuccess::NotReplaced, "ReplaceSuccess incorrectly set somehow" ); // Add the bridge to the list of to_be_replaced bridges in the Lox context and try // again to replace at the next update (nothing changes in the Lox Authority) println!( "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline ); context.new_to_be_replaced_bridge(bridgeline); } } else if count < MAX_BRIDGES_PER_BUCKET { bucket[count] = bridgeline; count += 1; } else { // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, // eventually also do some more fancy grouping of new resources, i.e., by type or region context.add_openinv_bucket(bucket); count = 0; bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; } } // Handle the extra buckets that were not allocated already if count != 0 { for val in 0..count { if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET) { context.append_extra_bridges(bucket[val]); } else { bucket = context.remove_extra_bridges(); context.add_spare_bucket(bucket); } } } } /* let bridgeline = parse_resource(resource); println!("BridgeLine to be changed: {:?}", bridgeline); let res = context.update_bridge(bridgeline); if res { println!("BridgeLine successfully updated: {:?}", bridgeline); } else { println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline); if context.extra_bridges.lock().unwrap().len() < 2 { context.append_extra_bridges(bridgeline); } else { let bucket = context.remove_extra_bridges(); context.add_spare_bucket(bucket); } } } } } } // gone resources are not the same as blocked resources. // Instead, these are bridges which have either failed to pass tests for some period // or have expired bridge descriptors. In both cases, the bridge is unusable, but this // is not likely due to censorship. Therefore, we replace gone resources with new resources // TODO: create a notion of blocked resources from information collected through various means: // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 // If resource last passed tests 3 hours ago, it should be replaced with a working // resource and be removed from the bridgetable. If it has been gone for more than 7 hours, // we should stop trying to remove it from the bridge table and assume it has successfully been // removed already if resource.last_passed < (Utc::now() - chrono::Duration::hours(3)) || resource.last_passed > (Utc::now() - chrono::Duration::hours(7)) { let bridgeline = parse_resource(resource); println!("BridgeLine to be replaced: {:?}", bridgeline); let res = context.replace_with_new(bridgeline); if res == lox_library::ReplaceSuccess::Replaced { println!( "BridgeLine successfully replaced: {:?}", bridgeline ); } else if res == lox_library::ReplaceSuccess::NotReplaced { // Add the bridge to the list of to_be_replaced bridges in the Lox context and try // again to replace at the next update (nothing changes in the Lox Authority) println!( "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline ); context.new_to_be_replaced_bridge(bridgeline); } } } } } } */ /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something like the following: println!("BridgeLine to be removed: {:?}", bridgeline); let res = context.add_unreachable(bridgeline); if res { println!( "BridgeLine successfully marked unreachable: {:?}", bridgeline ); } else { println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); //TODO probably do something else here } */ context.allocate_leftover_bridges(); context.encrypt_table(); lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } Request { req, sender } => { let response = handle(context.clone(), req).await; if let Err(e) = sender.send(response) { eprintln!("Server Response Error: {:?}", e); }; lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } Shutdown { shutdown_sig } => { lox_db.write_context(context.clone()); println!("Sending Shutdown Signal, all threads should shutdown."); drop(shutdown_sig); println!("Shutdown Sent."); } } } } // Each of the commands that the Context Manager handles #[derive(Debug)] enum Command { Rdsys { resources: Vec, }, Request { req: Request, sender: oneshot::Sender, Infallible>>, }, Shutdown { shutdown_sig: broadcast::Sender<()>, }, } #[tokio::main] async fn main() { let args: Args = Args::parse(); let file = File::open(&args.config).expect("Could not read config file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo let config: Config = serde_json::from_reader(reader).expect("Reading Config from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); let shutdown_cmd_tx = rdsys_tx.clone(); // create the shutdown broadcast channel and clone for every thread let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); let kill_stream = shutdown_tx.subscribe(); let kill_parser = shutdown_tx.subscribe(); let kill_context = shutdown_tx.subscribe(); // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx let shutdown_handler = spawn(async move { tokio::select! { _ = signal::ctrl_c() => { let cmd = Command::Shutdown { shutdown_sig: shutdown_tx, }; shutdown_cmd_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; _ = shutdown_rx.recv().await; } } }); let context_manager = spawn(async move { create_context_manager(config.db, args.roll_back_date, context_rx, kill_context).await }); let (tx, rx) = mpsc::channel(32); let rdsys_stream_handler = spawn(async { rdsys_stream(config.rtype, tx, kill_stream).await }); let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await }); let make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = request_tx.clone(); let service = service_fn(move |req| { let request_tx = request_tx.clone(); let (response_tx, response_rx) = oneshot::channel(); let cmd = Command::Request { req, sender: response_tx, }; async move { request_tx.send(cmd).await.unwrap(); response_rx.await.unwrap() } }); async move { Ok::<_, Infallible>(service) } }); let addr = SocketAddr::from(([127, 0, 0, 1], 8001)); let server = Server::bind(&addr).serve(make_service); let graceful = server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", addr); if let Err(e) = graceful.await { eprintln!("server error: {}", e); } future::join_all([ rdsys_stream_handler, rdsys_resource_receiver, context_manager, shutdown_handler, ]) .await; }