From 25add69c4d198db0eaf7744bfa74f5df53403e61 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Mon, 28 Aug 2023 19:33:19 -0400 Subject: [PATCH] Add syncing functionality --- crates/lox-distributor/config.json | 2 +- crates/lox-distributor/src/db_handler.rs | 2 +- crates/lox-distributor/src/lox_context.rs | 75 +++++++++ crates/lox-distributor/src/main.rs | 151 ++---------------- crates/lox-distributor/src/resource_parser.rs | 113 +++++++++++++ crates/lox-library/src/lib.rs | 8 +- 6 files changed, 200 insertions(+), 151 deletions(-) diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 6578722..06f6fdc 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -3,7 +3,7 @@ "db_path": "lox_db" }, - "bridge_allocation": { + "bridge_config": { "percent_spares": 50 }, "rtype": { diff --git a/crates/lox-distributor/src/db_handler.rs b/crates/lox-distributor/src/db_handler.rs index e732da6..149cb3d 100644 --- a/crates/lox-distributor/src/db_handler.rs +++ b/crates/lox-distributor/src/db_handler.rs @@ -7,7 +7,7 @@ use sled::IVec; pub struct DB { db: sled::Db, -} + } impl DB { pub fn write_context(&mut self, context: lox_context::LoxServerContext) { diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index 4b87166..adc49ca 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -50,6 +50,79 @@ impl LoxServerContext { } } + /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not + yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not + currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something + like the following: + println!("BridgeLine to be removed: {:?}", bridgeline); + let res = context.add_unreachable(bridgeline); + if res { + println!( + "BridgeLine successfully marked unreachable: {:?}", + bridgeline + ); + } else { + println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); + //TODO probably do something else here + } + */ + + // Sync resources received from rdsys with the Lox bridgetable + pub fn sync_with_bridgetable(&self, functional: Vec, failing: Vec) { + // Check if the resource is already in the Lox bridgetable. If it is, it's probably fine + // to replace the existing resource with the incoming one to account for changes, + // unless we want to track the number of changes on the lox side? + for bridge in functional { + let res = self.update_bridge(bridge); + if res { + println!( + "BridgeLine {:?} successfully updated.", + bridge.uid_fingerprint + ); + // Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later + } else { + println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint); + self.append_extra_bridges(bridge); + } + } + // Next, handle the failing bridges. If resource last passed tests 3 hours ago, it should be replaced + // with a working resource and be removed from the bridgetable. + for bridge in failing { + let res = self.replace_with_new(bridge); + if res == lox_library::ReplaceSuccess::Replaced { + println!( + "BridgeLine {:?} successfully replaced.", + bridge.uid_fingerprint + ); + } else if res == lox_library::ReplaceSuccess::NotReplaced { + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "BridgeLine {:?} NOT replaced, saved for next update!", + bridge.uid_fingerprint + ); + self.new_to_be_replaced_bridge(bridge); + } else { + // NotFound + assert!( + res == lox_library::ReplaceSuccess::NotFound, + "ReplaceSuccess incorrectly set" + ); + println!( + "BridgeLine {:?} no longer in bridge table.", + bridge.uid_fingerprint + ); + } + } + // Finally, assign any extra_bridges to new buckets if there are enough + while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET { + let bucket = self.remove_extra_bridges(); + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + self.add_spare_bucket(bucket); + } + } + pub fn append_extra_bridges(&self, bridge: BridgeLine) { let mut extra_bridges = self.extra_bridges.lock().unwrap(); extra_bridges.push(bridge); @@ -112,6 +185,8 @@ impl LoxServerContext { } } + // Attempt to remove a bridge that is failing tests and replace it with a bridge from the + // available bridges or from a spare bucket pub fn replace_with_new(&self, bridgeline: BridgeLine) -> lox_library::ReplaceSuccess { let mut ba_obj = self.ba.lock().unwrap(); let eb_obj = self.extra_bridges.lock().unwrap(); diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index f41c40c..24c2582 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -5,7 +5,6 @@ use hyper::{ service::{make_service_fn, service_fn}, Body, Request, Response, Server, }; -use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; use rdsys_backend::{proto::Resource, request_resources}; use serde::Deserialize; @@ -28,6 +27,8 @@ use tokio::{ time::{interval, sleep}, }; +use crate::resource_parser::sort_for_parsing; + async fn shutdown_signal() { tokio::signal::ctrl_c() .await @@ -55,7 +56,7 @@ struct Args { #[derive(Debug, Deserialize)] struct Config { db: DbConfig, - bridge_allocation: BridgeConfig, + bridge_config: BridgeConfig, rtype: ResourceInfo, } @@ -75,7 +76,7 @@ impl Default for DbConfig { } // Config information for how bridges should be allocated to buckets -#[derive(Debug, Deserialize)] +#[derive(Debug, Default, Deserialize)] pub struct BridgeConfig { // The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges) // that should be allocated as spare buckets @@ -83,12 +84,6 @@ pub struct BridgeConfig { percent_spares: i32, } -impl Default for BridgeConfig { - fn default() -> BridgeConfig { - BridgeConfig { percent_spares: 0 } - } -} - #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, @@ -108,7 +103,7 @@ async fn rdsys_stream( ) { tokio::select! { start_resource_request = rdsys_request(rtype, tx) => start_resource_request, - _ = kill.recv() => {println!("Shut down rdsys request loop"); return}, + _ = kill.recv() => {println!("Shut down rdsys request loop")}, } } @@ -200,138 +195,10 @@ async fn context_manager( // period of time are removed. // If bridges are labelled as blocked_in, we should also handle blocking behaviour. } else { - // for each resource, check if the resource fingerprint is failing tests, if it is check for how long - // check if the resource is already in the Lox bridgetable - // if it is, it's probably fine to remove or replace the existing resource with the incoming one - // to account for changes unless we want to track the number of changes on the lox side? - // that should be sufficient to keep it in sync - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - /* for bridgeline in bridgelines { - //context.populate_bridgetable(bridgelines. None); - println!("What is the bridgeline: {:?}", bridgeline); - if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { - println!("BridgeLine to be replaced: {:?}", bridgeline); - let res = context.replace_with_new(bridgeline); - if res == lox_library::ReplaceSuccess::NotFound { - println!( - "BridgeLine not found in bridge_table, already updated {:?}", - bridgeline - ); - } else if res == lox_library::ReplaceSuccess::Replaced { - println!("BridgeLine successfully replaced: {:?}", bridgeline); - } else { - assert!( - res == lox_library::ReplaceSuccess::NotReplaced, - "ReplaceSuccess incorrectly set somehow" - ); - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", - bridgeline - ); - context.new_to_be_replaced_bridge(bridgeline); - } - } else if count < MAX_BRIDGES_PER_BUCKET { - bucket[count] = bridgeline; - count += 1; - } else { - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - context.add_openinv_bucket(bucket); - count = 0; - bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; - } - } - // Handle the extra buckets that were not allocated already - if count != 0 { - for val in 0..count { - if context.extra_bridges.lock().unwrap().len() - < (MAX_BRIDGES_PER_BUCKET) - { - context.append_extra_bridges(bucket[val]); - } else { - bucket = context.remove_extra_bridges(); - context.add_spare_bucket(bucket); - } - } - } - } - - - let bridgeline = parse_resource(resource); - println!("BridgeLine to be changed: {:?}", bridgeline); - let res = context.update_bridge(bridgeline); - if res { - println!("BridgeLine successfully updated: {:?}", bridgeline); - } else { - println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline); - if context.extra_bridges.lock().unwrap().len() < 2 { - context.append_extra_bridges(bridgeline); - } else { - let bucket = context.remove_extra_bridges(); - context.add_spare_bucket(bucket); - } - } - } - } - } - } - // gone resources are not the same as blocked resources. - // Instead, these are bridges which have either failed to pass tests for some period - // or have expired bridge descriptors. In both cases, the bridge is unusable, but this - // is not likely due to censorship. Therefore, we replace gone resources with new resources - // TODO: create a notion of blocked resources from information collected through various means: - // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 - // If resource last passed tests 3 hours ago, it should be replaced with a working - // resource and be removed from the bridgetable. If it has been gone for more than 7 hours, - // we should stop trying to remove it from the bridge table and assume it has successfully been - // removed already - if resource.last_passed < (Utc::now() - chrono::Duration::hours(3)) - || resource.last_passed - > (Utc::now() - chrono::Duration::hours(7)) - { - let bridgeline = parse_resource(resource); - println!("BridgeLine to be replaced: {:?}", bridgeline); - let res = context.replace_with_new(bridgeline); - if res == lox_library::ReplaceSuccess::Replaced { - println!( - "BridgeLine successfully replaced: {:?}", - bridgeline - ); - } else if res == lox_library::ReplaceSuccess::NotReplaced { - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", - bridgeline - ); - context.new_to_be_replaced_bridge(bridgeline); - } - } - } - } - } - } - */ - /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not - yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not - currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something - like the following: - println!("BridgeLine to be removed: {:?}", bridgeline); - let res = context.add_unreachable(bridgeline); - if res { - println!( - "BridgeLine successfully marked unreachable: {:?}", - bridgeline - ); - } else { - println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); - //TODO probably do something else here - } - */ + let (functional, failing) = sort_for_parsing(resources); + context.sync_with_bridgetable(functional, failing); } + // Handle any bridges that are leftover in the bridge authority from the sync context.allocate_leftover_bridges(); context.encrypt_table(); lox_db.write_context(context.clone()); @@ -407,7 +274,7 @@ async fn main() { let context_manager = spawn(async move { create_context_manager( config.db, - config.bridge_allocation, + config.bridge_config, args.roll_back_date, context_rx, kill_context, diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs index bc0d95b..e122c0a 100644 --- a/crates/lox-distributor/src/resource_parser.rs +++ b/crates/lox-distributor/src/resource_parser.rs @@ -1,6 +1,9 @@ +use chrono::{Duration, Utc}; use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET}; use rdsys_backend::proto::Resource; +pub const ACCEPTED_HOURS_OF_FAILURE: i64 = 3; + // Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable pub fn parse_into_bridgelines(resources: Vec) -> Vec { let mut bridgelines: Vec = Vec::new(); @@ -66,3 +69,113 @@ pub fn parse_into_buckets( } (buckets, leftovers) } + +// Sort Resources into those that are functional and those that are failing based on the last time +// they were passing tests. Before passing them back to the calling function, they are parsed into +// BridgeLines +pub fn sort_for_parsing(resources: Vec) -> (Vec, Vec) { + let mut functional: Vec = Vec::new(); + let mut failing: Vec = Vec::new(); + for resource in resources { + if resource.last_passed + Duration::hours(ACCEPTED_HOURS_OF_FAILURE) >= Utc::now() { + functional.push(resource); + } else { + failing.push(resource); + } + } + let functional_bridgelines = parse_into_bridgelines(functional); + let failing_bridgelines = parse_into_bridgelines(failing); + + (functional_bridgelines, failing_bridgelines) +} + +#[cfg(test)] +mod tests { + use rdsys_backend::proto::Resource; + use std::collections::HashMap; + + use chrono::{Duration, Utc}; + + use super::sort_for_parsing; + + pub fn make_resource( + rtype: String, + address: String, + port: u16, + fingerprint: String, + last_passed: i64, + ) -> Resource { + let mut flags = HashMap::new(); + flags.insert(String::from("fast"), true); + flags.insert(String::from("stable"), true); + let mut params = HashMap::new(); + params.insert( + String::from("password"), + String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"), + ); + Resource { + r#type: String::from(rtype), + blocked_in: HashMap::new(), + last_passed: Utc::now() - Duration::hours(last_passed), + protocol: String::from("tcp"), + address: String::from(address), + port: port, + fingerprint: String::from(fingerprint), + or_addresses: None, + distribution: String::from("https"), + flags: Some(flags), + params: Some(params), + } + } + + #[test] + fn test_sort_for_parsing() { + let resource_one = make_resource( + "scramblesuit".to_owned(), + "123.456.789.100".to_owned(), + 3002, + "BE84A97D02130470A1C77839954392BA979F7EE1".to_owned(), + 2, + ); + let resource_two = make_resource( + "https".to_owned(), + "123.222.333.444".to_owned(), + 6002, + "C56B9EF202130470A1C77839954392BA979F7FF9".to_owned(), + 5, + ); + let resource_three = make_resource( + "scramblesuit".to_owned(), + "444.888.222.100".to_owned(), + 3042, + "1A4C8BD902130470A1C77839954392BA979F7B46".to_owned(), + 4, + ); + let resource_four = make_resource( + "https".to_owned(), + "555.444.212.100".to_owned(), + 8022, + "FF024DC302130470A1C77839954392BA979F7AE2".to_owned(), + 3, + ); + let resource_five = make_resource( + "https".to_owned(), + "234.111.212.100".to_owned(), + 10432, + "7B4DE14CB2130470A1C77839954392BA979F7AE2".to_owned(), + 1, + ); + let mut test_vec: Vec = Vec::new(); + test_vec.push(resource_one); + test_vec.push(resource_two); + test_vec.push(resource_three); + test_vec.push(resource_four); + test_vec.push(resource_five); + let (functional, failing) = sort_for_parsing(test_vec); + assert!( + functional.len() == 2, + "There should be 2 functional bridges" + ); + assert!(failing.len() == 3, "There should be 3 failing bridges"); + } +} diff --git a/crates/lox-library/src/lib.rs b/crates/lox-library/src/lib.rs index 002ab9c..ba6d946 100644 --- a/crates/lox-library/src/lib.rs +++ b/crates/lox-library/src/lib.rs @@ -431,16 +431,11 @@ impl BridgeAuth { let reachable_bridges = self.bridge_table.reachable.clone(); for reachable_bridge in reachable_bridges { if reachable_bridge.0.uid_fingerprint == bridge.uid_fingerprint { - println!( - "Bridge from table: {:?} has same IP and Port as bridge {:?}!", - reachable_bridge.0, bridge - ); // Now we must remove the old bridge from the table and insert the new bridge in its place // i.e., in the same bucket and with the same permissions. let positions = self.bridge_table.reachable.get(&reachable_bridge.0); if let Some(v) = positions { for (bucketnum, offset) in v.iter() { - println!("Bucket num: {:?} and offset: {:?}", bucketnum, offset); let mut bridgelines = match self.bridge_table.buckets.get(bucketnum) { Some(bridgelines) => *bridgelines, None => return res, @@ -448,11 +443,10 @@ impl BridgeAuth { assert!(bridgelines[*offset] == reachable_bridge.0); bridgelines[*offset] = *bridge; self.bridge_table.buckets.insert(*bucketnum, bridgelines); - let bridgelines = match self.bridge_table.buckets.get(bucketnum) { + match self.bridge_table.buckets.get(bucketnum) { Some(bridgelines) => *bridgelines, None => return res, }; - assert!(bridgelines[*offset] != reachable_bridge.0); } res = true; } else {