Add watched_blockages to config file and improve naming

This commit is contained in:
onyinyang 2023-11-01 13:53:58 -04:00
parent ec7ba4e343
commit cb159405a3
7 changed files with 79 additions and 46 deletions

View File

@ -5,6 +5,9 @@
},
"metrics_port": 5222,
"bridge_config": {
"watched_blockages": [
"RU"
],
"percent_spares": 50
},
"rtype": {

View File

@ -57,9 +57,14 @@ impl LoxServerContext {
}
}
pub fn handle_working_resources(&self, working_resources: Vec<Resource>) -> Vec<u64> {
pub fn handle_working_resources(
&self,
watched_blockages: Vec<String>,
working_resources: Vec<Resource>,
) -> Vec<u64> {
let mut accounted_for_bridges: Vec<u64> = Vec::new();
let (bridgelines, blocked_bridgelines) = parse_into_bridgelines(working_resources);
let (bridgelines, blocked_bridgelines) =
parse_into_bridgelines(watched_blockages, working_resources);
for bridge in blocked_bridgelines {
let res = self.add_unreachable(bridge);
if res {
@ -97,10 +102,12 @@ impl LoxServerContext {
// otherwise they are replaced with new bridges
pub fn handle_not_working_resources(
&self,
watched_blockages: Vec<String>,
not_working_resources: Vec<Resource>,
mut accounted_for_bridges: Vec<u64>,
) -> Vec<u64> {
let (grace_period, failing, blocked) = sort_for_parsing(not_working_resources);
let (grace_period, failing, blocked) =
sort_for_parsing(watched_blockages, not_working_resources);
for bridge in blocked {
let res = self.add_unreachable(bridge);
if res {
@ -162,18 +169,22 @@ impl LoxServerContext {
}
// Sync resources received from rdsys with the Lox bridgetable
pub fn sync_with_bridgetable(&self, resources: ResourceState) {
pub fn sync_with_bridgetable(&self, watched_blockages: Vec<String>, resources: ResourceState) {
// Check if each resource is already in the Lox bridgetable. If it is, it's probably fine
// to replace the existing resource with the incoming one to account for changes
// save a list of accounted for bridges and deal with the unaccounted for bridges at the end
let mut accounted_for_bridges: Vec<u64> = Vec::new();
// ensure all working resources are updated and accounted for
if let Some(working_resources) = resources.working {
accounted_for_bridges = self.handle_working_resources(working_resources);
accounted_for_bridges =
self.handle_working_resources(watched_blockages.clone(), working_resources);
}
if let Some(not_working_resources) = resources.not_working {
accounted_for_bridges =
self.handle_not_working_resources(not_working_resources, accounted_for_bridges);
accounted_for_bridges = self.handle_not_working_resources(
watched_blockages,
not_working_resources,
accounted_for_bridges,
);
}
let mut ba_clone = self.ba.lock().unwrap();
let total_reachable = ba_clone.bridge_table.reachable.len();
@ -308,7 +319,7 @@ impl LoxServerContext {
pub fn mark_blocked(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
ba_obj.bridge_unreachable(&bridgeline, &mut db_obj)
ba_obj.bridge_blocked(&bridgeline, &mut db_obj)
}
// Find the bridgeline in the Lox bridge table that matches the fingerprint

View File

@ -85,6 +85,10 @@ impl Default for DbConfig {
// Config information for how bridges should be allocated to buckets
#[derive(Debug, Default, Deserialize)]
pub struct BridgeConfig {
// A list of regions (as ISO 3166 country codes) that Lox will monitor resources for.
// Any region indicated here that is listed in the `blocked_in` field of a resource will be marked as
// blocked by Lox's bridge authority.
watched_blockages: Vec<String>,
// The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges)
// that should be allocated as spare buckets
// This will be calculated as the floor of buckets.len() * percent_spares / 100
@ -207,7 +211,10 @@ async fn context_manager(
// bridgetable with all of the working bridges received from rdsys.
if context.bridgetable_is_empty() {
if let Some(working_resources) = resources.working {
let (bridgelines, _) = parse_into_bridgelines(working_resources);
let (bridgelines, _) = parse_into_bridgelines(
bridge_config.watched_blockages.clone(),
working_resources,
);
context.metrics.new_bridges.inc_by(bridgelines.len() as u64);
let (buckets, leftovers) = parse_into_buckets(bridgelines);
for leftover in leftovers {
@ -222,7 +229,8 @@ async fn context_manager(
// If bridges are labelled as blocked_in, we should also handle blocking behaviour.
}
} else {
context.sync_with_bridgetable(resources);
context
.sync_with_bridgetable(bridge_config.watched_blockages.clone(), resources);
}
// Handle any bridges that are leftover in the bridge authority from the sync
context.allocate_leftover_bridges();

View File

@ -253,8 +253,8 @@ mod tests {
.unwrap();
assert!(bucket.1.is_some());
// Block two of our bridges
lox_auth.bridge_unreachable(&bucket.0[0], &mut bdb);
lox_auth.bridge_unreachable(&bucket.0[2], &mut bdb);
lox_auth.bridge_blocked(&bucket.0[0], &mut bdb);
lox_auth.bridge_blocked(&bucket.0[2], &mut bdb);
(cred, id, key)
}

View File

@ -1,11 +1,18 @@
use std::process::exit;
use chrono::{Duration, Utc};
use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET};
use rdsys_backend::proto::Resource;
pub const ACCEPTED_HOURS_OF_FAILURE: i64 = 3;
// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable
pub fn parse_into_bridgelines(resources: Vec<Resource>) -> (Vec<BridgeLine>, Vec<BridgeLine>) {
// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable and return
// Bridgelines as two vectors, those that are marked as blocked in a specified region (indicated in the config file)
// and those that are not blocked.
pub fn parse_into_bridgelines(
watched_blockages: Vec<String>,
resources: Vec<Resource>,
) -> (Vec<BridgeLine>, Vec<BridgeLine>) {
let mut bridgelines: Vec<BridgeLine> = Vec::new();
let mut blocked_bridgelines: Vec<BridgeLine> = Vec::new();
for resource in resources {
@ -15,34 +22,33 @@ pub fn parse_into_bridgelines(resources: Vec<Resource>) -> (Vec<BridgeLine>, Vec
.get_uid()
.expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
"type={} protocol={} fingerprint={:?} or_addresses={:?} flags={:?} params={:?}",
resource.r#type,
resource.protocol,
resource.fingerprint,
resource.or_addresses,
resource.distribution,
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
if let Some(blockage) = resource.blocked_in.get("RU") {
if *blockage {
blocked_bridgelines.push(BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
});
} else {
bridgelines.push(BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
});
let mut blocked = false;
for watched_blockage in watched_blockages.clone() {
if let Some(blockage) = resource.blocked_in.get(&watched_blockage) {
if *blockage {
blocked = true;
break;
}
}
}
if blocked {
blocked_bridgelines.push(BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
});
} else {
bridgelines.push(BridgeLine {
addr: ip_bytes,
@ -91,10 +97,11 @@ pub fn parse_into_buckets(
(buckets, leftovers)
}
// Sort Resources into those that are functional and those that are failing based on the last time
// they were passing tests. Before passing them back to the calling function, they are parsed into
// BridgeLines
// Sort Resources into those that are functional, those that are failing based on the last time
// they were passing tests, and those that are blocked in the region(s) specified in the config file.
// Before passing them back to the calling function, they are parsed into BridgeLines
pub fn sort_for_parsing(
watched_blockages: Vec<String>,
resources: Vec<Resource>,
) -> (Vec<BridgeLine>, Vec<BridgeLine>, Vec<BridgeLine>) {
let mut grace_period: Vec<Resource> = Vec::new();
@ -111,8 +118,10 @@ pub fn sort_for_parsing(
failing.push(resource);
}
}
let (grace_period_bridgelines, mut grace_period_blocked) = parse_into_bridgelines(grace_period);
let (failing_bridgelines, mut failing_blocked) = parse_into_bridgelines(failing);
let (grace_period_bridgelines, mut grace_period_blocked) =
parse_into_bridgelines(watched_blockages.clone(), grace_period);
let (failing_bridgelines, mut failing_blocked) =
parse_into_bridgelines(watched_blockages, failing);
blocked.append(&mut grace_period_blocked);
blocked.append(&mut failing_blocked);
@ -270,7 +279,9 @@ mod tests {
test_vec.push(resource_six);
test_vec.push(resource_seven);
println!("How many in test? {:?}", test_vec.len());
let (functional, failing, blocked) = sort_for_parsing(test_vec);
let mut watched_blockages: Vec<String> = Vec::new();
watched_blockages.push("RU".to_string());
let (functional, failing, blocked) = sort_for_parsing(watched_blockages, test_vec);
assert!(
functional.len() == 2,
"There should be 2 functional bridges"

View File

@ -604,7 +604,7 @@ impl BridgeAuth {
res
}
/// Mark a bridge as unreachable
/// Mark a bridge as blocked
///
/// This bridge will be removed from each of the buckets that
/// contains it. If any of those are open-invitation buckets, the
@ -618,7 +618,7 @@ impl BridgeAuth {
/// migration, change the target to the new (formerly spare) bucket.
/// Returns true if sucessful, or false if it needed a hot spare but
/// there was none available.
pub fn bridge_unreachable(&mut self, bridge: &BridgeLine, bdb: &mut BridgeDb) -> bool {
pub fn bridge_blocked(&mut self, bridge: &BridgeLine, bdb: &mut BridgeDb) -> bool {
let mut res: bool = true;
if self.bridge_table.unallocated_bridges.contains(bridge) {
let index = self

View File

@ -888,7 +888,7 @@ fn block_bridges(th: &mut TestHarness, to_block: usize) {
let ba_clone = th.ba.bridge_table.buckets.clone();
if let Some(bridgelines) = ba_clone.get(&u32::try_from(index).unwrap()) {
for bridgeline in bridgelines {
th.ba.bridge_unreachable(bridgeline, &mut th.bdb);
th.ba.bridge_blocked(bridgeline, &mut th.bdb);
}
}
}
@ -1229,7 +1229,7 @@ fn test_mark_unreachable() {
// Mark a bridge in an untrusted bucket as unreachable
let bucket6 = th.ba.bridge_table.buckets.get(&6u32).unwrap();
let b6 = bucket6[0];
th.ba.bridge_unreachable(&b6, &mut th.bdb);
th.ba.bridge_blocked(&b6, &mut th.bdb);
println!("spares = {:?}", th.ba.bridge_table.spares);
println!("tmig = {:?}", th.ba.trustup_migration_table.table);
@ -1240,7 +1240,7 @@ fn test_mark_unreachable() {
// unreachable
let bucket7 = th.ba.bridge_table.buckets.get(&7u32).unwrap();
let b7 = bucket7[0];
th.ba.bridge_unreachable(&b7, &mut th.bdb);
th.ba.bridge_blocked(&b7, &mut th.bdb);
println!("spares = {:?}", th.ba.bridge_table.spares);
println!("tmig = {:?}", th.ba.trustup_migration_table.table);
@ -1262,8 +1262,8 @@ fn test_mark_unreachable() {
let bt1 = bucket1[1];
let bucket2 = th.ba.bridge_table.buckets.get(&target).unwrap();
let bt2 = bucket2[2];
th.ba.bridge_unreachable(&bt1, &mut th.bdb);
th.ba.bridge_unreachable(&bt2, &mut th.bdb);
th.ba.bridge_blocked(&bt1, &mut th.bdb);
th.ba.bridge_blocked(&bt2, &mut th.bdb);
println!("spares = {:?}", th.ba.bridge_table.spares);
println!("tmig = {:?}", th.ba.trustup_migration_table.table);
@ -1313,8 +1313,8 @@ fn test_blockage_migration() {
assert!(bucket.1.is_some());
// Oh, no! Two of our bridges are blocked!
th.ba.bridge_unreachable(&bucket.0[0], &mut th.bdb);
th.ba.bridge_unreachable(&bucket.0[2], &mut th.bdb);
th.ba.bridge_blocked(&bucket.0[0], &mut th.bdb);
th.ba.bridge_blocked(&bucket.0[2], &mut th.bdb);
println!("spares = {:?}", th.ba.bridge_table.spares);
println!("tmig = {:?}", th.ba.trustup_migration_table.table);