diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 04a015e..6578722 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -2,14 +2,17 @@ "db": { "db_path": "lox_db" + }, + "bridge_allocation": { + "percent_spares": 50 }, "rtype": { - "endpoint": "http://127.0.0.1:7100/resources", - "name": "https", - "token": "HttpsApiTokenPlaceholder", - "types": [ - "obfs2", - "scramblesuit" - ] -} + "endpoint": "http://127.0.0.1:7100/resources", + "name": "https", + "token": "HttpsApiTokenPlaceholder", + "types": [ + "obfs2", + "scramblesuit" + ] + } } \ No newline at end of file diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index 4b02b6d..4b87166 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -31,12 +31,24 @@ impl LoxServerContext { } // Populate an empty bridgetable for the first time - /* pub fn populate_bridgetable(&self, bridgelines: Vec, percent_spares: Option) { - if Some(percent_spares) { - let partition: usize = bridgelines.len()*percent_spares/100; + pub fn populate_bridgetable( + &self, + buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>, + percent_spares: i32, + ) { + let mut partition: i32 = 0; + if percent_spares != 0 { + partition = buckets.len() as i32 * percent_spares / 100; + } + let (spares, open_invitations) = buckets.split_at(partition as usize); + for bucket in spares { + self.add_spare_bucket(*bucket) } - } */ + for bucket in open_invitations { + self.add_openinv_bucket(*bucket) + } + } pub fn append_extra_bridges(&self, bridge: BridgeLine) { let mut extra_bridges = self.extra_bridges.lock().unwrap(); diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index e4a29e2..f41c40c 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -20,7 +20,7 @@ mod lox_context; mod request_handler; use request_handler::handle; mod resource_parser; -use resource_parser::parse_resources; +use resource_parser::{parse_into_bridgelines, parse_into_buckets}; use tokio::{ signal, spawn, @@ -55,10 +55,11 @@ struct Args { #[derive(Debug, Deserialize)] struct Config { db: DbConfig, - + bridge_allocation: BridgeConfig, rtype: ResourceInfo, } +// Path of the lox database #[derive(Debug, Deserialize)] pub struct DbConfig { // The path for the lox_context database, default is "lox_db" @@ -73,6 +74,21 @@ impl Default for DbConfig { } } +// Config information for how bridges should be allocated to buckets +#[derive(Debug, Deserialize)] +pub struct BridgeConfig { + // The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges) + // that should be allocated as spare buckets + // This will be calculated as the floor of buckets.len() * percent_spares / 100 + percent_spares: i32, +} + +impl Default for BridgeConfig { + fn default() -> BridgeConfig { + BridgeConfig { percent_spares: 0 } + } +} + #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, @@ -138,12 +154,13 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver, context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { - create_context = context_manager(db_config, roll_back_date, context_rx) => create_context, + create_context = context_manager(db_config, bridge_config, roll_back_date, context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager");}, } } @@ -153,6 +170,7 @@ async fn create_context_manager( // can be responded to with an updated BridgeDB state async fn context_manager( db_config: DbConfig, + bridge_config: BridgeConfig, roll_back_date: Option, mut context_rx: mpsc::Receiver, ) { @@ -167,138 +185,153 @@ async fn context_manager( use Command::*; match cmd { Rdsys { resources } => { - let mut count = 0; - let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + // If the bridgetable is not being loaded from an existing database, we will populate the + // bridgetable with all of the bridges received from rdsys if context.bridgetable_is_empty() { - // otherwise, for each resource, check if the resource fingerprint is failing tests, if it is check for how long + let bridgelines = parse_into_bridgelines(resources); + let (buckets, leftovers) = parse_into_buckets(bridgelines); + for leftover in leftovers { + context.append_extra_bridges(leftover); + } + context.populate_bridgetable(buckets, bridge_config.percent_spares); + // otherwise, we need to sync the existing bridgetable with the resources we receive from + // rdsys and ensure that all functioning bridges are correctly placed in the bridgetable + // those that have changed are updated and those that have been failing tests for an extended + // period of time are removed. + // If bridges are labelled as blocked_in, we should also handle blocking behaviour. + } else { + // for each resource, check if the resource fingerprint is failing tests, if it is check for how long // check if the resource is already in the Lox bridgetable // if it is, it's probably fine to remove or replace the existing resource with the incoming one // to account for changes unless we want to track the number of changes on the lox side? // that should be sufficient to keep it in sync - let bridgelines = parse_resources(resources); - for bridgeline in bridgelines { - //context.populate_bridgetable(bridgelines. None); - println!("What is the bridgeline: {:?}", bridgeline); - if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { - println!("BridgeLine to be replaced: {:?}", bridgeline); - let res = context.replace_with_new(bridgeline); - if res == lox_library::ReplaceSuccess::NotFound { - println!( - "BridgeLine not found in bridge_table, already updated {:?}", - bridgeline - ); - } else if res == lox_library::ReplaceSuccess::Replaced { - println!("BridgeLine successfully replaced: {:?}", bridgeline); - } else { - assert!( - res == lox_library::ReplaceSuccess::NotReplaced, - "ReplaceSuccess incorrectly set somehow" - ); - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", - bridgeline - ); - context.new_to_be_replaced_bridge(bridgeline); - } - } else if count < MAX_BRIDGES_PER_BUCKET { - bucket[count] = bridgeline; - count += 1; - } else { - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - context.add_openinv_bucket(bucket); - count = 0; - bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; - } - } - // Handle the extra buckets that were not allocated already - if count != 0 { - for val in 0..count { - if context.extra_bridges.lock().unwrap().len() - < (MAX_BRIDGES_PER_BUCKET) - { - context.append_extra_bridges(bucket[val]); - } else { - bucket = context.remove_extra_bridges(); - context.add_spare_bucket(bucket); - } - } - } - } - /* - let bridgeline = parse_resource(resource); - println!("BridgeLine to be changed: {:?}", bridgeline); - let res = context.update_bridge(bridgeline); - if res { - println!("BridgeLine successfully updated: {:?}", bridgeline); + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + /* for bridgeline in bridgelines { + //context.populate_bridgetable(bridgelines. None); + println!("What is the bridgeline: {:?}", bridgeline); + if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { + println!("BridgeLine to be replaced: {:?}", bridgeline); + let res = context.replace_with_new(bridgeline); + if res == lox_library::ReplaceSuccess::NotFound { + println!( + "BridgeLine not found in bridge_table, already updated {:?}", + bridgeline + ); + } else if res == lox_library::ReplaceSuccess::Replaced { + println!("BridgeLine successfully replaced: {:?}", bridgeline); } else { - println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline); - if context.extra_bridges.lock().unwrap().len() < 2 { - context.append_extra_bridges(bridgeline); - } else { - let bucket = context.remove_extra_bridges(); - context.add_spare_bucket(bucket); - } + assert!( + res == lox_library::ReplaceSuccess::NotReplaced, + "ReplaceSuccess incorrectly set somehow" + ); + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", + bridgeline + ); + context.new_to_be_replaced_bridge(bridgeline); } - } - } - } - } - // gone resources are not the same as blocked resources. - // Instead, these are bridges which have either failed to pass tests for some period - // or have expired bridge descriptors. In both cases, the bridge is unusable, but this - // is not likely due to censorship. Therefore, we replace gone resources with new resources - // TODO: create a notion of blocked resources from information collected through various means: - // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 - // If resource last passed tests 3 hours ago, it should be replaced with a working - // resource and be removed from the bridgetable. If it has been gone for more than 7 hours, - // we should stop trying to remove it from the bridge table and assume it has successfully been - // removed already - if resource.last_passed < (Utc::now() - chrono::Duration::hours(3)) - || resource.last_passed - > (Utc::now() - chrono::Duration::hours(7)) - { - let bridgeline = parse_resource(resource); - println!("BridgeLine to be replaced: {:?}", bridgeline); - let res = context.replace_with_new(bridgeline); - if res == lox_library::ReplaceSuccess::Replaced { - println!( - "BridgeLine successfully replaced: {:?}", - bridgeline - ); - } else if res == lox_library::ReplaceSuccess::NotReplaced { - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", - bridgeline - ); - context.new_to_be_replaced_bridge(bridgeline); - } - } - } - } - } - } - */ - /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not - yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not - currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something - like the following: - println!("BridgeLine to be removed: {:?}", bridgeline); - let res = context.add_unreachable(bridgeline); - if res { - println!( - "BridgeLine successfully marked unreachable: {:?}", - bridgeline - ); + } else if count < MAX_BRIDGES_PER_BUCKET { + bucket[count] = bridgeline; + count += 1; } else { - println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); - //TODO probably do something else here + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + context.add_openinv_bucket(bucket); + count = 0; + bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; } - */ + } + // Handle the extra buckets that were not allocated already + if count != 0 { + for val in 0..count { + if context.extra_bridges.lock().unwrap().len() + < (MAX_BRIDGES_PER_BUCKET) + { + context.append_extra_bridges(bucket[val]); + } else { + bucket = context.remove_extra_bridges(); + context.add_spare_bucket(bucket); + } + } + } + } + + + let bridgeline = parse_resource(resource); + println!("BridgeLine to be changed: {:?}", bridgeline); + let res = context.update_bridge(bridgeline); + if res { + println!("BridgeLine successfully updated: {:?}", bridgeline); + } else { + println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline); + if context.extra_bridges.lock().unwrap().len() < 2 { + context.append_extra_bridges(bridgeline); + } else { + let bucket = context.remove_extra_bridges(); + context.add_spare_bucket(bucket); + } + } + } + } + } + } + // gone resources are not the same as blocked resources. + // Instead, these are bridges which have either failed to pass tests for some period + // or have expired bridge descriptors. In both cases, the bridge is unusable, but this + // is not likely due to censorship. Therefore, we replace gone resources with new resources + // TODO: create a notion of blocked resources from information collected through various means: + // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 + // If resource last passed tests 3 hours ago, it should be replaced with a working + // resource and be removed from the bridgetable. If it has been gone for more than 7 hours, + // we should stop trying to remove it from the bridge table and assume it has successfully been + // removed already + if resource.last_passed < (Utc::now() - chrono::Duration::hours(3)) + || resource.last_passed + > (Utc::now() - chrono::Duration::hours(7)) + { + let bridgeline = parse_resource(resource); + println!("BridgeLine to be replaced: {:?}", bridgeline); + let res = context.replace_with_new(bridgeline); + if res == lox_library::ReplaceSuccess::Replaced { + println!( + "BridgeLine successfully replaced: {:?}", + bridgeline + ); + } else if res == lox_library::ReplaceSuccess::NotReplaced { + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", + bridgeline + ); + context.new_to_be_replaced_bridge(bridgeline); + } + } + } + } + } + } + */ + /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not + yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not + currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something + like the following: + println!("BridgeLine to be removed: {:?}", bridgeline); + let res = context.add_unreachable(bridgeline); + if res { + println!( + "BridgeLine successfully marked unreachable: {:?}", + bridgeline + ); + } else { + println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); + //TODO probably do something else here + } + */ + } context.allocate_leftover_bridges(); context.encrypt_table(); lox_db.write_context(context.clone()); @@ -372,7 +405,14 @@ async fn main() { }); let context_manager = spawn(async move { - create_context_manager(config.db, args.roll_back_date, context_rx, kill_context).await + create_context_manager( + config.db, + config.bridge_allocation, + args.roll_back_date, + context_rx, + kill_context, + ) + .await }); let (tx, rx) = mpsc::channel(32); diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs index ae5739e..bc0d95b 100644 --- a/crates/lox-distributor/src/resource_parser.rs +++ b/crates/lox-distributor/src/resource_parser.rs @@ -1,7 +1,8 @@ -use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES}; +use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET}; use rdsys_backend::proto::Resource; -pub fn parse_resources(resources: Vec) -> Vec { +// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable +pub fn parse_into_bridgelines(resources: Vec) -> Vec { let mut bridgelines: Vec = Vec::new(); for resource in resources { let mut ip_bytes: [u8; 16] = [0; 16]; @@ -32,3 +33,36 @@ pub fn parse_resources(resources: Vec) -> Vec { } bridgelines } + +// Allocate each Bridgeline into a bucket that will later be allocated into spare buckets or open invitation buckets +// Any leftover buckets from total_bridgelines % MAX_BRIDGES_PER_BUCKET are returned in a separate Vec +// TODO: Improve this function to sort bridgelines into buckets in a more intentional manner. This could include +// sorting bridgelines with high bandwidth into buckets that are only distributed to more trusted users or sorting +// bridgelines by location +pub fn parse_into_buckets( + mut bridgelines: Vec, +) -> (Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>, Vec) { + let mut buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]> = Vec::new(); + let mut count = 0; + let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + let mut leftovers: Vec = Vec::new(); + for bridgeline in bridgelines.clone() { + println!("What is the bridgeline: {:?}", bridgeline); + if count < MAX_BRIDGES_PER_BUCKET { + bucket[count] = bridgeline; + count += 1; + } else { + buckets.push(bucket); + count = 0; + bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + } + } + // Handle the extra buckets that were not allocated already + if count != 0 { + for _ in 0..count { + // Assumes that the unallocated bridgelines will be the last x of the passed bridgelines + leftovers.push(bridgelines.pop().unwrap()); + } + } + (buckets, leftovers) +} diff --git a/crates/rdsys-backend-api/src/lib.rs b/crates/rdsys-backend-api/src/lib.rs index 923c15a..10b6087 100644 --- a/crates/rdsys-backend-api/src/lib.rs +++ b/crates/rdsys-backend-api/src/lib.rs @@ -286,7 +286,7 @@ pub async fn request_resources( api_endpoint: String, .await.unwrap(); match response.status() { reqwest::StatusCode::OK => { - fetched_resources = match dbg!(response.json::>().await) { + fetched_resources = match response.json::>().await { Ok(fetched_resources) => Ok(fetched_resources), Err(e) => Err(Error::Reqwest(e)), };