diff --git a/crates/lox-distributor/README.md b/crates/lox-distributor/README.md index 329e769..884f417 100644 --- a/crates/lox-distributor/README.md +++ b/crates/lox-distributor/README.md @@ -2,14 +2,80 @@ The Lox distributor receives resources from [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) and writes them to [Lox BridgeLines](https://git-crysp.uwaterloo.ca/iang/lox/src/master/src/bridge_table.rs#L42). Concurrently, it receives and responds to requests from [Lox clients](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm). It saves the [LoxContext](https://gitlab.torproject.org/tpo/anti-censorship/lox-rs/-/blob/main/crates/lox-distributor/src/lox_context.rs) to a database every time the Lox bridgetable is updated and before the distributor is shutdown. -## Configure rdsys stream -A test `config.json` is included for testing on a local instance of rdsys. This -can be edited to correspond to the desired types of resources, endpoints and database configuration. + +## Configuration + +A test `config.json` is included for testing on a local instance of rdsys. There are several configurable +fields in this config file: + +### DB Config + +The DB config `db` accepts a `db_path` where the Lox distributor will look for or create a new Lox database as follows: + +``` +"db": { + "db_path": "path/to/db" +} +``` + +### Rdsys Config + +The rdsys request `rtype` has the following fields: + + `endpoint` the endpoint of the rdsys instance that the distributor will make requests to, + + `name` the type of distributor we are requesting. In most cases this should be `lox`, + + `token` the corresponding Api Token, + + `types` the type of bridges that are being accepted. + + Example configuration: + ``` + "rtype": { + "endpoint": "http://127.0.0.1:7100/resources", + "name": "lox", + "token": "LoxApiTokenPlaceholder", + "types": [ + "obfs2", + "scramblesuit" + ] + } + ``` + +### Bridge Config + +The Bridge config, `bridge_config` has the following fields: + +`watched_blockages` lists the regions (as ISO 3166 country codes) that Lox will monitor for listed blockages + +`percent_spares` is the percentage of buckets that should be allocated as hot spares (as opposed to open invitation buckets) + +Example configuration: +``` + "bridge_config": { + "watched_blockages": [ + "RU" + ], + "percent_spares": 50 + }, +``` +### Metrics Port + +The `metrics_port` field is the port that the prometheus server will run on. + +### Command Line Arguments for Advanced Database Config + +There are a few configurations for the Lox database that can be passed as arguments at run time since they are not likely to be suitable as persistent configuration options. + +Rolling back to a previous version of the database is possible by passing the +`roll_back_date` flag at runtime and providing the date/time as a `%Y-%m-%d_%H:%M:%S` string. This argument should be passed if the `LoxContext` should be rolled back to a previous state due to, for example, a mass blocking event that is likely not due to Lox user behaviour. If the exact roll back date/time is not known, the last db entry within 24 hours from the passed `roll_back_date` will be used or else the program will fail gracefully. + ## Test Run -For testing purposes, you will need a running instance of rdsys as well as a running Lox client. +For testing purposes, you will need a running instance of [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) as well as a running Lox client. ### Run rdsys locally @@ -25,13 +91,6 @@ Finally run rdsys: ./backend --config config.json ``` -## Database Config - -The database has a few configuration options. The path for where the database -should be read/written can be specified in the `config.json`. Rolling back to a -previous version of the database is also possible by passing the -`roll_back_date` flag at runtime and providing the date/time as a `%Y-%m-%d_%H:%M:%S` string. This argument should be passed if the `LoxContext` should be rolled back to a previous state due to, for example, a mass blocking event that is likely not due to Lox user behaviour. If the exact roll back date/time is not known, the last db entry within 24 hours from the passed `roll_back_date` will be used or else the program will fail gracefully. - ### Run Lox Distributor locally Simply run `cargo run -- config.json` :) diff --git a/crates/lox-distributor/src/db_handler.rs b/crates/lox-distributor/src/db_handler.rs index acf0cfe..85c5be3 100644 --- a/crates/lox-distributor/src/db_handler.rs +++ b/crates/lox-distributor/src/db_handler.rs @@ -6,11 +6,14 @@ use chrono::{naive::Days, DateTime, Local, NaiveDateTime, Utc}; use lox_library::{BridgeAuth, BridgeDb}; use sled::IVec; +// Database of Lox Distributor State pub struct DB { db: sled::Db, } impl DB { + // Writes the Lox context to the lox database with "context_%Y-%m-%d_%H:%M:%S" as the + // database key pub fn write_context(&mut self, context: lox_context::LoxServerContext) { let date = Local::now().format("context_%Y-%m-%d_%H:%M:%S").to_string(); let json_result = serde_json::to_vec(&context).unwrap(); @@ -28,6 +31,9 @@ impl DB { ); } + // If roll_back_date is empty, opens the most recent entry in the lox database or if none exists, creates a + // new database. If roll_back_date is not empty, use the specified date to roll back to a previous lox-context + // either exactly the entry at the roll_back_date or within 24 hours from thhe roll_back_date. pub fn open_new_or_existing_db( db_config: DbConfig, roll_back_date: Option, @@ -62,6 +68,7 @@ impl DB { } } +// Logic for finding the correct context to open from the database fn read_lox_context_from_db( lox_db: sled::Db, roll_back_date: Option, @@ -117,6 +124,7 @@ fn compute_startdate_string(date_range_end: String) -> Option> { dt.with_timezone(&Utc).checked_sub_days(Days::new(1)) } +// Use the last context that was entered into the database fn use_last_context(lox_db: sled::Db) -> lox_context::LoxServerContext { let ivec_context = lox_db.last().unwrap().unwrap(); let ivec_date: String = String::from_utf8(ivec_context.0.to_vec()).unwrap(); diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index 744ace1..011f94d 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -96,6 +96,10 @@ impl LoxServerContext { accounted_for_bridges } + // When syncing resources with rdsys, handle the non-working resources + // Those that are blocked in the target region are marked as unreachable/blocked + // All others are matched by fingerprint and if they are still in the grace period, they are updated + // otherwise they are replaced with new bridges pub fn handle_not_working_resources( &self, not_working_resources: Vec, @@ -239,6 +243,9 @@ impl LoxServerContext { to_be_replaced_bridges.push(bridge); } + // Add extra_bridges to the Lox bridge table as open invitation bridges + // TODO: Add some consideration for whether or not bridges should be sorted as + // open invitation buckets or hot spare buckets pub fn allocate_leftover_bridges(&self) { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); @@ -246,6 +253,7 @@ impl LoxServerContext { ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj); } + // Add an open invitation bucket to the Lox db pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); @@ -260,6 +268,7 @@ impl LoxServerContext { } } + // Add a hot spare bucket to the Lox db pub fn add_spare_bucket(&self, bucket: [BridgeLine; 3]) { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); @@ -296,7 +305,17 @@ impl LoxServerContext { ba_obj.bridge_unreachable(&bridgeline, &mut db_obj) } */ + // Mark bridges as blocked + pub fn mark_blocked(&self, bridgeline: BridgeLine) -> bool { + let mut ba_obj = self.ba.lock().unwrap(); + let mut db_obj = self.db.lock().unwrap(); + ba_obj.bridge_unreachable(&bridgeline, &mut db_obj) + } + // Find the bridgeline in the Lox bridge table that matches the fingerprint + // of the bridgeline passed by argument. Once found, replace it with the bridgeline + // passed by argument to ensure all fields besides the fingerprint are updated + // appropriately. pub fn update_bridge(&self, bridgeline: BridgeLine) -> bool { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.bridge_update(&bridgeline) @@ -311,11 +330,13 @@ impl LoxServerContext { println!("Today's date according to server: {}", ba_obj.today()); } + // Encrypts the Lox bridge table, should be called after every sync pub fn encrypt_table(&self) -> HashMap { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.enc_bridge_table().clone() } + // Returns a vector of the Lox Authority's public keys fn pubkeys(&self) -> Vec { let ba_obj = self.ba.lock().unwrap(); // vector of public keys (to serialize) @@ -328,6 +349,8 @@ impl LoxServerContext { ] } + // Generates a Lox invitation if fewer than MAX_BRIDGES_PER_DAY have been + // requested on a given day fn gen_invite(&self) -> Result { let mut obj = self.db.lock().unwrap(); match obj.invite() { @@ -341,11 +364,13 @@ impl LoxServerContext { } } + // Returns a valid open_invite::Response if the open_invite::Request is valid fn open_inv(&self, req: open_invite::Request) -> Result { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_open_invite(req) } + // Returns a valid trust_promotion:: Response if the trust_promotion::Request is valid fn trust_promo( &self, req: trust_promotion::Request, @@ -354,16 +379,19 @@ impl LoxServerContext { ba_obj.handle_trust_promotion(req) } + // Returns a valid trust_migration::Response if the trust_migration::Request is valid fn trust_migration(&self, req: migration::Request) -> Result { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_migration(req) } + // Returns a valid level_up:: Response if the level_up::Request is valid fn level_up(&self, req: level_up::Request) -> Result { let mut ba_obj = self.ba.lock().unwrap(); ba_obj.handle_level_up(req) } + // Returns a valid issue_invite::Response if the issue_invite::Request is valid fn issue_invite( &self, req: issue_invite::Request, @@ -372,6 +400,7 @@ impl LoxServerContext { ba_obj.handle_issue_invite(req) } + // Returns a valid redeem_invite::Response if the redeem_invite::Request is valid fn redeem_invite( &self, req: redeem_invite::Request, @@ -380,6 +409,7 @@ impl LoxServerContext { ba_obj.handle_redeem_invite(req) } + // Returns a valid check_blockage::Response if the check_blockage::Request is valid fn check_blockage( &self, req: check_blockage::Request, @@ -388,6 +418,7 @@ impl LoxServerContext { ba_obj.handle_check_blockage(req) } + // Returns a valid blockage_migration::Response if the blockage_migration::Request is valid fn blockage_migration( &self, req: blockage_migration::Request, @@ -396,7 +427,7 @@ impl LoxServerContext { ba_obj.handle_blockage_migration(req) } - // Generate and return an open invitation token + // Generate and return an open invitation token as an HTTP response pub fn generate_invite(self) -> Response { self.metrics.invites_requested.inc(); let invite = self.gen_invite(); @@ -415,7 +446,7 @@ impl LoxServerContext { } } - // Return the serialized encrypted bridge table + // Return the serialized encrypted bridge table as an HTTP response pub fn send_reachability_cred(self) -> Response { let enc_table = self.encrypt_table(); let etable = lox_utils::EncBridgeTable { etable: enc_table }; @@ -428,7 +459,7 @@ impl LoxServerContext { } } - // Return the serialized pubkeys for the Bridge Authority + // Return the serialized pubkeys for the Bridge Authority as an HTTP response pub fn send_keys(self) -> Response { let pubkeys = self.pubkeys(); match serde_json::to_string(&pubkeys) { @@ -440,6 +471,7 @@ impl LoxServerContext { } } + // Verify the open invitation request and return the result as an HTTP response pub fn verify_and_send_open_cred(self, request: Bytes) -> Response { let req = match serde_json::from_slice(&request) { Ok(req) => req, @@ -458,6 +490,7 @@ impl LoxServerContext { } } + // Verify the trust promotion request and return the result as an HTTP response pub fn verify_and_send_trust_promo(self, request: Bytes) -> Response { let req: trust_promotion::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -476,6 +509,7 @@ impl LoxServerContext { } } + // Verify the trust migration request and return the result as an HTTP response pub fn verify_and_send_trust_migration(self, request: Bytes) -> Response { let req: migration::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -494,6 +528,7 @@ impl LoxServerContext { } } + // Verify the level up request and return the result as an HTTP response pub fn verify_and_send_level_up(self, request: Bytes) -> Response { let req: level_up::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -512,6 +547,7 @@ impl LoxServerContext { } } + // Verify the open invitation request and return the result as an HTTP response pub fn verify_and_send_issue_invite(self, request: Bytes) -> Response { let req: issue_invite::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -530,6 +566,7 @@ impl LoxServerContext { } } + // Verify the redeem invite request and return the result as an HTTP response pub fn verify_and_send_redeem_invite(self, request: Bytes) -> Response { let req: redeem_invite::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -548,6 +585,7 @@ impl LoxServerContext { } } + // Verify the check blockage request and return the result as an HTTP response pub fn verify_and_send_check_blockage(self, request: Bytes) -> Response { let req: check_blockage::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -566,6 +604,7 @@ impl LoxServerContext { } } + // Verify the blockage migration request and return the result as an HTTP response pub fn verify_and_send_blockage_migration(self, request: Bytes) -> Response { let req: blockage_migration::Request = match serde_json::from_slice(&request) { Ok(req) => req, @@ -585,6 +624,7 @@ impl LoxServerContext { } } +// Prepare HTTP Response for successful Server Request fn prepare_header(response: String) -> Response { let mut resp = Response::new(Body::from(response)); resp.headers_mut() @@ -592,6 +632,7 @@ fn prepare_header(response: String) -> Response { resp } +// Prepare HTTP Response for errored Server Request fn prepare_error_header(error: String) -> Response { Response::builder() .status(hyper::StatusCode::BAD_REQUEST) diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 290110a..c6a79df 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -100,9 +100,9 @@ struct ResourceInfo { } // Populate Bridgedb from rdsys -// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified +// Rdsys sender creates a Resource request with the api_endpoint, resource token and type specified // in the config.json file. -async fn rdsys_stream( +async fn rdsys_request_creator( rtype: ResourceInfo, tx: mpsc::Sender, mut kill: broadcast::Receiver<()>, @@ -114,6 +114,8 @@ async fn rdsys_stream( } } +// Makes a request to rdsys for the full set of Resources assigned to lox every interval +// (defined in the function) async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender) { let mut interval = interval(Duration::from_secs(5)); loop { @@ -130,6 +132,7 @@ async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender) { } } +// Parse bridges received from rdsys and sync with Lox context async fn rdsys_bridge_parser( rdsys_tx: mpsc::Sender, rx: mpsc::Receiver, @@ -152,6 +155,7 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver, ) -> impl Fn(Request) -> Pin>> + Send>> {