diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index a6feb7e..dd96749 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -5,6 +5,9 @@ }, "metrics_port": 5222, "bridge_config": { + "watched_blockages": [ + "RU" + ], "percent_spares": 50 }, "rtype": { diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index a8ce538..9420692 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -57,9 +57,14 @@ impl LoxServerContext { } } - pub fn handle_working_resources(&self, working_resources: Vec) -> Vec { + pub fn handle_working_resources( + &self, + watched_blockages: Vec, + working_resources: Vec, + ) -> Vec { let mut accounted_for_bridges: Vec = Vec::new(); - let (bridgelines, blocked_bridgelines) = parse_into_bridgelines(working_resources); + let (bridgelines, blocked_bridgelines) = + parse_into_bridgelines(watched_blockages, working_resources); for bridge in blocked_bridgelines { let res = self.add_unreachable(bridge); if res { @@ -97,10 +102,12 @@ impl LoxServerContext { // otherwise they are replaced with new bridges pub fn handle_not_working_resources( &self, + watched_blockages: Vec, not_working_resources: Vec, mut accounted_for_bridges: Vec, ) -> Vec { - let (grace_period, failing, blocked) = sort_for_parsing(not_working_resources); + let (grace_period, failing, blocked) = + sort_for_parsing(watched_blockages, not_working_resources); for bridge in blocked { let res = self.add_unreachable(bridge); if res { @@ -162,18 +169,22 @@ impl LoxServerContext { } // Sync resources received from rdsys with the Lox bridgetable - pub fn sync_with_bridgetable(&self, resources: ResourceState) { + pub fn sync_with_bridgetable(&self, watched_blockages: Vec, resources: ResourceState) { // Check if each resource is already in the Lox bridgetable. If it is, it's probably fine // to replace the existing resource with the incoming one to account for changes // save a list of accounted for bridges and deal with the unaccounted for bridges at the end let mut accounted_for_bridges: Vec = Vec::new(); // ensure all working resources are updated and accounted for if let Some(working_resources) = resources.working { - accounted_for_bridges = self.handle_working_resources(working_resources); + accounted_for_bridges = + self.handle_working_resources(watched_blockages.clone(), working_resources); } if let Some(not_working_resources) = resources.not_working { - accounted_for_bridges = - self.handle_not_working_resources(not_working_resources, accounted_for_bridges); + accounted_for_bridges = self.handle_not_working_resources( + watched_blockages, + not_working_resources, + accounted_for_bridges, + ); } let mut ba_clone = self.ba.lock().unwrap(); let total_reachable = ba_clone.bridge_table.reachable.len(); @@ -308,7 +319,7 @@ impl LoxServerContext { pub fn mark_blocked(&self, bridgeline: BridgeLine) -> bool { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); - ba_obj.bridge_unreachable(&bridgeline, &mut db_obj) + ba_obj.bridge_blocked(&bridgeline, &mut db_obj) } // Find the bridgeline in the Lox bridge table that matches the fingerprint diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index abb099e..8840481 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -85,6 +85,10 @@ impl Default for DbConfig { // Config information for how bridges should be allocated to buckets #[derive(Debug, Default, Deserialize)] pub struct BridgeConfig { + // A list of regions (as ISO 3166 country codes) that Lox will monitor resources for. + // Any region indicated here that is listed in the `blocked_in` field of a resource will be marked as + // blocked by Lox's bridge authority. + watched_blockages: Vec, // The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges) // that should be allocated as spare buckets // This will be calculated as the floor of buckets.len() * percent_spares / 100 @@ -207,7 +211,10 @@ async fn context_manager( // bridgetable with all of the working bridges received from rdsys. if context.bridgetable_is_empty() { if let Some(working_resources) = resources.working { - let (bridgelines, _) = parse_into_bridgelines(working_resources); + let (bridgelines, _) = parse_into_bridgelines( + bridge_config.watched_blockages.clone(), + working_resources, + ); context.metrics.new_bridges.inc_by(bridgelines.len() as u64); let (buckets, leftovers) = parse_into_buckets(bridgelines); for leftover in leftovers { @@ -222,7 +229,8 @@ async fn context_manager( // If bridges are labelled as blocked_in, we should also handle blocking behaviour. } } else { - context.sync_with_bridgetable(resources); + context + .sync_with_bridgetable(bridge_config.watched_blockages.clone(), resources); } // Handle any bridges that are leftover in the bridge authority from the sync context.allocate_leftover_bridges(); diff --git a/crates/lox-distributor/src/request_handler.rs b/crates/lox-distributor/src/request_handler.rs index 48ca6a8..3be045f 100644 --- a/crates/lox-distributor/src/request_handler.rs +++ b/crates/lox-distributor/src/request_handler.rs @@ -253,8 +253,8 @@ mod tests { .unwrap(); assert!(bucket.1.is_some()); // Block two of our bridges - lox_auth.bridge_unreachable(&bucket.0[0], &mut bdb); - lox_auth.bridge_unreachable(&bucket.0[2], &mut bdb); + lox_auth.bridge_blocked(&bucket.0[0], &mut bdb); + lox_auth.bridge_blocked(&bucket.0[2], &mut bdb); (cred, id, key) } diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs index 208aa57..9b79726 100644 --- a/crates/lox-distributor/src/resource_parser.rs +++ b/crates/lox-distributor/src/resource_parser.rs @@ -1,11 +1,18 @@ +use std::process::exit; + use chrono::{Duration, Utc}; use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET}; use rdsys_backend::proto::Resource; pub const ACCEPTED_HOURS_OF_FAILURE: i64 = 3; -// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable -pub fn parse_into_bridgelines(resources: Vec) -> (Vec, Vec) { +// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable and return +// Bridgelines as two vectors, those that are marked as blocked in a specified region (indicated in the config file) +// and those that are not blocked. +pub fn parse_into_bridgelines( + watched_blockages: Vec, + resources: Vec, +) -> (Vec, Vec) { let mut bridgelines: Vec = Vec::new(); let mut blocked_bridgelines: Vec = Vec::new(); for resource in resources { @@ -15,34 +22,33 @@ pub fn parse_into_bridgelines(resources: Vec) -> (Vec, Vec .get_uid() .expect("Unable to get Fingerprint UID of resource"); let infostr: String = format!( - "type={} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}", + "type={} protocol={} fingerprint={:?} or_addresses={:?} flags={:?} params={:?}", resource.r#type, resource.protocol, resource.fingerprint, resource.or_addresses, - resource.distribution, resource.flags, resource.params, ); let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26]; info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes()); - if let Some(blockage) = resource.blocked_in.get("RU") { - if *blockage { - blocked_bridgelines.push(BridgeLine { - addr: ip_bytes, - port: resource.port, - uid_fingerprint: resource_uid, - info: info_bytes, - }); - } else { - bridgelines.push(BridgeLine { - addr: ip_bytes, - port: resource.port, - uid_fingerprint: resource_uid, - info: info_bytes, - }); + let mut blocked = false; + for watched_blockage in watched_blockages.clone() { + if let Some(blockage) = resource.blocked_in.get(&watched_blockage) { + if *blockage { + blocked = true; + break; + } } + } + if blocked { + blocked_bridgelines.push(BridgeLine { + addr: ip_bytes, + port: resource.port, + uid_fingerprint: resource_uid, + info: info_bytes, + }); } else { bridgelines.push(BridgeLine { addr: ip_bytes, @@ -91,10 +97,11 @@ pub fn parse_into_buckets( (buckets, leftovers) } -// Sort Resources into those that are functional and those that are failing based on the last time -// they were passing tests. Before passing them back to the calling function, they are parsed into -// BridgeLines +// Sort Resources into those that are functional, those that are failing based on the last time +// they were passing tests, and those that are blocked in the region(s) specified in the config file. +// Before passing them back to the calling function, they are parsed into BridgeLines pub fn sort_for_parsing( + watched_blockages: Vec, resources: Vec, ) -> (Vec, Vec, Vec) { let mut grace_period: Vec = Vec::new(); @@ -111,8 +118,10 @@ pub fn sort_for_parsing( failing.push(resource); } } - let (grace_period_bridgelines, mut grace_period_blocked) = parse_into_bridgelines(grace_period); - let (failing_bridgelines, mut failing_blocked) = parse_into_bridgelines(failing); + let (grace_period_bridgelines, mut grace_period_blocked) = + parse_into_bridgelines(watched_blockages.clone(), grace_period); + let (failing_bridgelines, mut failing_blocked) = + parse_into_bridgelines(watched_blockages, failing); blocked.append(&mut grace_period_blocked); blocked.append(&mut failing_blocked); @@ -270,7 +279,9 @@ mod tests { test_vec.push(resource_six); test_vec.push(resource_seven); println!("How many in test? {:?}", test_vec.len()); - let (functional, failing, blocked) = sort_for_parsing(test_vec); + let mut watched_blockages: Vec = Vec::new(); + watched_blockages.push("RU".to_string()); + let (functional, failing, blocked) = sort_for_parsing(watched_blockages, test_vec); assert!( functional.len() == 2, "There should be 2 functional bridges" diff --git a/crates/lox-library/src/lib.rs b/crates/lox-library/src/lib.rs index cbbae94..3afd928 100644 --- a/crates/lox-library/src/lib.rs +++ b/crates/lox-library/src/lib.rs @@ -604,7 +604,7 @@ impl BridgeAuth { res } - /// Mark a bridge as unreachable + /// Mark a bridge as blocked /// /// This bridge will be removed from each of the buckets that /// contains it. If any of those are open-invitation buckets, the @@ -618,7 +618,7 @@ impl BridgeAuth { /// migration, change the target to the new (formerly spare) bucket. /// Returns true if sucessful, or false if it needed a hot spare but /// there was none available. - pub fn bridge_unreachable(&mut self, bridge: &BridgeLine, bdb: &mut BridgeDb) -> bool { + pub fn bridge_blocked(&mut self, bridge: &BridgeLine, bdb: &mut BridgeDb) -> bool { let mut res: bool = true; if self.bridge_table.unallocated_bridges.contains(bridge) { let index = self diff --git a/crates/lox-library/src/tests.rs b/crates/lox-library/src/tests.rs index 3c47e84..bb2a986 100644 --- a/crates/lox-library/src/tests.rs +++ b/crates/lox-library/src/tests.rs @@ -888,7 +888,7 @@ fn block_bridges(th: &mut TestHarness, to_block: usize) { let ba_clone = th.ba.bridge_table.buckets.clone(); if let Some(bridgelines) = ba_clone.get(&u32::try_from(index).unwrap()) { for bridgeline in bridgelines { - th.ba.bridge_unreachable(bridgeline, &mut th.bdb); + th.ba.bridge_blocked(bridgeline, &mut th.bdb); } } } @@ -1229,7 +1229,7 @@ fn test_mark_unreachable() { // Mark a bridge in an untrusted bucket as unreachable let bucket6 = th.ba.bridge_table.buckets.get(&6u32).unwrap(); let b6 = bucket6[0]; - th.ba.bridge_unreachable(&b6, &mut th.bdb); + th.ba.bridge_blocked(&b6, &mut th.bdb); println!("spares = {:?}", th.ba.bridge_table.spares); println!("tmig = {:?}", th.ba.trustup_migration_table.table); @@ -1240,7 +1240,7 @@ fn test_mark_unreachable() { // unreachable let bucket7 = th.ba.bridge_table.buckets.get(&7u32).unwrap(); let b7 = bucket7[0]; - th.ba.bridge_unreachable(&b7, &mut th.bdb); + th.ba.bridge_blocked(&b7, &mut th.bdb); println!("spares = {:?}", th.ba.bridge_table.spares); println!("tmig = {:?}", th.ba.trustup_migration_table.table); @@ -1262,8 +1262,8 @@ fn test_mark_unreachable() { let bt1 = bucket1[1]; let bucket2 = th.ba.bridge_table.buckets.get(&target).unwrap(); let bt2 = bucket2[2]; - th.ba.bridge_unreachable(&bt1, &mut th.bdb); - th.ba.bridge_unreachable(&bt2, &mut th.bdb); + th.ba.bridge_blocked(&bt1, &mut th.bdb); + th.ba.bridge_blocked(&bt2, &mut th.bdb); println!("spares = {:?}", th.ba.bridge_table.spares); println!("tmig = {:?}", th.ba.trustup_migration_table.table); @@ -1313,8 +1313,8 @@ fn test_blockage_migration() { assert!(bucket.1.is_some()); // Oh, no! Two of our bridges are blocked! - th.ba.bridge_unreachable(&bucket.0[0], &mut th.bdb); - th.ba.bridge_unreachable(&bucket.0[2], &mut th.bdb); + th.ba.bridge_blocked(&bucket.0[0], &mut th.bdb); + th.ba.bridge_blocked(&bucket.0[2], &mut th.bdb); println!("spares = {:?}", th.ba.bridge_table.spares); println!("tmig = {:?}", th.ba.trustup_migration_table.table);