// Allow points to be capital letters #![allow(non_snake_case)] use clap::Parser; use curve25519_dalek::ristretto::RistrettoBasepointTable; use futures::{future, join}; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, Body, Request, Response, Server, }; use prometheus_client::registry::Registry; use rdsys_backend::{proto::ResourceState, request_resources}; use serde::Deserialize; use std::{ collections::HashMap, convert::Infallible, fs::File, io::BufReader, net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, time::Duration, }; mod db_handler; use db_handler::DB; mod fake_resource_state; mod lox_context; mod metrics; use metrics::Metrics; mod request_handler; use request_handler::handle; mod resource_parser; use resource_parser::{parse_into_bridgelines, parse_into_buckets}; mod test_handler; use test_handler::handle as test_handle; mod troll_patrol_handler; use troll_patrol_handler::handle as tp_handle; use tokio::{ signal, spawn, sync::{broadcast, mpsc, oneshot}, time::{interval, sleep}, }; async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("failed to listen for ctrl+c signal"); println!("Shut down Lox Server"); } #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { /// Name/path of the configuration file #[arg(short, long, default_value = "config.json")] config: PathBuf, // Optional Date/time to roll back to as a %Y-%m-%d_%H:%M:%S string // This argument should be passed if the lox_context should be rolled back to a // previous state due to, for example, a mass blocking event that is likely not // due to Lox user behaviour. If the exact roll back date/time is not known, the // last db entry within 24 hours from the passed roll_back_date will be used or else // the program will fail gracefully. #[arg(short, long, verbatim_doc_comment)] roll_back_date: Option, } #[derive(Debug, Deserialize)] struct Config { db: DbConfig, metrics_port: u16, lox_authority_port: u16, test_port: u16, troll_patrol_port: u16, bridge_config: BridgeConfig, rtype: ResourceInfo, } // Path of the lox database #[derive(Debug, Deserialize)] pub struct DbConfig { // The path for the lox_context database, default is "lox_db" db_path: String, } impl Default for DbConfig { fn default() -> DbConfig { DbConfig { db_path: "lox_db".to_owned(), } } } // Config information for how bridges should be allocated to buckets #[derive(Debug, Default, Deserialize)] pub struct BridgeConfig { // A list of regions (as ISO 3166 country codes) that Lox will monitor resources for. // Any region indicated here that is listed in the `blocked_in` field of a resource will be marked as // blocked by Lox's bridge authority. watched_blockages: Vec, // The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges) // that should be allocated as spare buckets // This will be calculated as the floor of buckets.len() * percent_spares / 100 percent_spares: i32, } #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, name: String, token: String, types: Vec, } // Populate Bridgedb from rdsys // Rdsys sender creates a Resource request with the api_endpoint, resource token and type specified // in the config.json file. async fn rdsys_request_creator( rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>, ) { tokio::select! { start_resource_request = rdsys_request(rtype, tx) => start_resource_request, _ = kill.recv() => {println!("Shut down rdsys request loop")}, } } // Makes a request to rdsys for the full set of Resources assigned to lox every interval // (defined in the function) async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender) { let mut interval = interval(Duration::from_secs(120)); loop { interval.tick().await; let resources = match request_resources( rtype.endpoint.clone(), rtype.name.clone(), rtype.token.clone(), rtype.types.clone(), ) .await { Ok(resources) => resources, Err(e) => { println!("No resources received from rdsys: {:?}", e); continue; } }; tx.send(resources).await.unwrap(); } } // Parse bridges received from rdsys and sync with Lox context async fn rdsys_bridge_parser( rdsys_tx: mpsc::Sender, rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { start_bridge_parser = parse_bridges(rdsys_tx, rx) => start_bridge_parser , _ = kill.recv() => {println!("Shut down bridge_parser");}, } } // Parse Bridges receives the resources from rdsys and sends it to the // Context Manager to be parsed and added to the Lox BridgeDB async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { let resources = rx.recv().await.unwrap(); let cmd = Command::Rdsys { resources }; rdsys_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; } } // Create a prometheus metrics server async fn start_metrics_collector( metrics_addr: SocketAddr, registry: Registry, mut kill: broadcast::Receiver<()>, ) { tokio::select! { lox_metrics = metrics::start_metrics_server(metrics_addr, registry) => lox_metrics, _ = kill.recv() => {println!("Shut down metrics server");}, } } async fn create_context_manager( db_config: DbConfig, bridge_config: BridgeConfig, roll_back_date: Option, Htables: HashMap, metrics: Metrics, context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { create_context = context_manager(db_config, bridge_config, roll_back_date, Htables, metrics, context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager");}, } } // Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring // that the DB can be updated from the rdsys stream and client requests // can be responded to with an updated BridgeDB state async fn context_manager( db_config: DbConfig, bridge_config: BridgeConfig, roll_back_date: Option, mut Htables: HashMap, metrics: Metrics, mut context_rx: mpsc::Receiver, ) { let (mut lox_db, context) = match DB::open_new_or_existing_db(db_config, roll_back_date, metrics) { Ok((lox_db, context)) => (lox_db, context), Err(e) => { panic!("Error: {:?}", e); } }; while let Some(cmd) = context_rx.recv().await { use Command::*; match cmd { Rdsys { resources } => { // If the bridgetable is not being loaded from an existing database, we will populate the // bridgetable with all of the working bridges received from rdsys. if context.bridgetable_is_empty() { if let Some(working_resources) = resources.working { let (bridgelines, _) = parse_into_bridgelines( bridge_config.watched_blockages.clone(), working_resources, ); context.metrics.new_bridges.inc_by(bridgelines.len() as u64); let (buckets, leftovers) = parse_into_buckets(bridgelines); for leftover in leftovers { context.append_extra_bridges(leftover); } context.populate_bridgetable(buckets, bridge_config.percent_spares); // otherwise, we need to sync the existing bridgetable with the resources we receive from // rdsys and ensure that all functioning bridges are correctly placed in the bridgetable // those that have changed are updated and those that have been failing tests for an extended // period of time are removed. // If bridges are labelled as blocked_in, we should also handle blocking behaviour. } } else { context .sync_with_bridgetable(bridge_config.watched_blockages.clone(), resources); } // Handle any bridges that are leftover in the bridge authority from the sync context.allocate_leftover_bridges(); context.encrypt_table(); lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } Request { req, sender } => { let response = handle(context.clone(), req).await; if let Err(e) = sender.send(response) { eprintln!("Server Response Error: {:?}", e); }; lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } TestRequest { req, sender } => { let response = test_handle(context.clone(), req).await; if let Err(e) = sender.send(response) { eprintln!("Server Response Error: {:?}", e); }; lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } TpRequest { req, sender } => { let response = tp_handle(context.clone(), &mut Htables, req).await; if let Err(e) = sender.send(response) { eprintln!("Server Response Error: {:?}", e); }; lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } Shutdown { shutdown_sig } => { lox_db.write_context(context.clone()); println!("Sending Shutdown Signal, all threads should shutdown."); drop(shutdown_sig); println!("Shutdown Sent."); } } } } // Each of the commands that the Context Manager handles #[derive(Debug)] enum Command { Rdsys { resources: ResourceState, }, Request { req: Request, sender: oneshot::Sender, Infallible>>, }, TestRequest { req: Request, sender: oneshot::Sender, Infallible>>, }, TpRequest { req: Request, sender: oneshot::Sender, Infallible>>, }, Shutdown { shutdown_sig: broadcast::Sender<()>, }, } #[tokio::main] async fn main() { let args: Args = Args::parse(); let file = File::open(&args.config).expect("Could not read config file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo let config: Config = serde_json::from_reader(reader).expect("Reading Config from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); let test_request_tx = rdsys_tx.clone(); let tp_request_tx = rdsys_tx.clone(); let shutdown_cmd_tx = rdsys_tx.clone(); // create the shutdown broadcast channel and clone for every thread let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); let kill_stream = shutdown_tx.subscribe(); let kill_metrics = shutdown_tx.subscribe(); let kill_parser = shutdown_tx.subscribe(); let kill_context = shutdown_tx.subscribe(); // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx let shutdown_handler = spawn(async move { tokio::select! { _ = signal::ctrl_c() => { let cmd = Command::Shutdown { shutdown_sig: shutdown_tx, }; shutdown_cmd_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; _ = shutdown_rx.recv().await; } } }); let metrics = Metrics::default(); let registry = metrics.register(); let metrics_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), config.metrics_port); let metrics_handler = spawn(async move { start_metrics_collector(metrics_addr, registry, kill_metrics).await }); // Store a basepoint table for each day's H for TP positive reports. // RistrettoBasepointTables are not serializable, so don't save these to disk. // Just keep a map during each run and recompute when we restart. let Htables = HashMap::::new(); let context_manager = spawn(async move { create_context_manager( config.db, config.bridge_config, args.roll_back_date, Htables, metrics, context_rx, kill_context, ) .await }); let (tx, rx) = mpsc::channel(32); let rdsys_request_handler = spawn(async { rdsys_request_creator(config.rtype, tx, kill_stream).await }); let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await }); let make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = request_tx.clone(); let service = service_fn(move |req| { let request_tx = request_tx.clone(); let (response_tx, response_rx) = oneshot::channel(); let cmd = Command::Request { req, sender: response_tx, }; async move { request_tx.send(cmd).await.unwrap(); response_rx.await.unwrap() } }); async move { Ok::<_, Infallible>(service) } }); let test_make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = test_request_tx.clone(); let service = service_fn(move |req| { let request_tx = request_tx.clone(); let (response_tx, response_rx) = oneshot::channel(); let cmd = Command::TestRequest { req, sender: response_tx, }; async move { request_tx.send(cmd).await.unwrap(); response_rx.await.unwrap() } }); async move { Ok::<_, Infallible>(service) } }); let tp_make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = tp_request_tx.clone(); let service = service_fn(move |req| { let request_tx = request_tx.clone(); let (response_tx, response_rx) = oneshot::channel(); let cmd = Command::TpRequest { req, sender: response_tx, }; async move { request_tx.send(cmd).await.unwrap(); response_rx.await.unwrap() } }); async move { Ok::<_, Infallible>(service) } }); // Public address let pub_addr = SocketAddr::from(([127, 0, 0, 1], config.lox_authority_port)); let server = Server::bind(&pub_addr).serve(make_service); let graceful = server.with_graceful_shutdown(shutdown_signal()); // Address for test commands let test_addr = SocketAddr::from(([127, 0, 0, 1], config.test_port)); let test_server = Server::bind(&test_addr).serve(test_make_service); let test_graceful = test_server.with_graceful_shutdown(shutdown_signal()); // Address for connections from Troll Patrol let tp_addr = SocketAddr::from(([127, 0, 0, 1], config.troll_patrol_port)); let tp_server = Server::bind(&tp_addr).serve(tp_make_service); let tp_graceful = tp_server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", pub_addr); println!("Listening on {}", test_addr); println!("Listening on {}", tp_addr); let (a, b, c) = join!(graceful, test_graceful, tp_graceful); if a.is_err() { eprintln!("server error: {}", a.unwrap_err()); } if b.is_err() { eprintln!("server error: {}", b.unwrap_err()); } if c.is_err() { eprintln!("server error: {}", c.unwrap_err()); } future::join_all([ metrics_handler, rdsys_request_handler, rdsys_resource_receiver, context_manager, shutdown_handler, ]) .await; }