diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index adc49ca..b237419 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -8,6 +8,7 @@ use lox_library::{ }, BridgeAuth, BridgeDb, IssuerPubKey, }; +use rdsys_backend::proto::ResourceState; use serde::{Deserialize, Serialize}; use std::{ @@ -16,6 +17,8 @@ use std::{ }; use zkp::ProofError; +use crate::resource_parser::{parse_into_bridgelines, sort_for_parsing}; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LoxServerContext { pub db: Arc>, @@ -68,58 +71,89 @@ impl LoxServerContext { */ // Sync resources received from rdsys with the Lox bridgetable - pub fn sync_with_bridgetable(&self, functional: Vec, failing: Vec) { + pub fn sync_with_bridgetable(&self, resources: ResourceState) { // Check if the resource is already in the Lox bridgetable. If it is, it's probably fine - // to replace the existing resource with the incoming one to account for changes, - // unless we want to track the number of changes on the lox side? - for bridge in functional { - let res = self.update_bridge(bridge); - if res { - println!( - "BridgeLine {:?} successfully updated.", - bridge.uid_fingerprint - ); - // Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later - } else { - println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint); - self.append_extra_bridges(bridge); + // to replace the existing resource with the incoming one to account for changes + let mut unaccounted_for_bridges: Vec = Vec::new(); + let mut accounted_for_bridges: Vec = Vec::new(); + if let Some(working_resources) = resources.working { + // ensure all working resources are updated and accounted for + // save a list of unaccounted for bridges and deal with them in the next block + let bridgelines = parse_into_bridgelines(working_resources); + for bridge in bridgelines { + let res = self.update_bridge(bridge); + if res { + println!( + "BridgeLine {:?} successfully updated.", + bridge.uid_fingerprint + ); + accounted_for_bridges.push(bridge.uid_fingerprint); + // Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later + } else { + println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint); + self.append_extra_bridges(bridge); + } } } - // Next, handle the failing bridges. If resource last passed tests 3 hours ago, it should be replaced - // with a working resource and be removed from the bridgetable. - for bridge in failing { - let res = self.replace_with_new(bridge); - if res == lox_library::ReplaceSuccess::Replaced { - println!( - "BridgeLine {:?} successfully replaced.", - bridge.uid_fingerprint - ); - } else if res == lox_library::ReplaceSuccess::NotReplaced { - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "BridgeLine {:?} NOT replaced, saved for next update!", - bridge.uid_fingerprint - ); - self.new_to_be_replaced_bridge(bridge); - } else { - // NotFound - assert!( - res == lox_library::ReplaceSuccess::NotFound, - "ReplaceSuccess incorrectly set" - ); - println!( - "BridgeLine {:?} no longer in bridge table.", - bridge.uid_fingerprint - ); + if let Some(not_working_resources) = resources.not_working { + let (grace_period, failing) = sort_for_parsing(not_working_resources); + // Update bridges in the bridge table that are failing but within the grace period + for bridge in grace_period { + let res = self.update_bridge(bridge); + if res { + println!( + "BridgeLine {:?} successfully updated.", + bridge.uid_fingerprint + ); + accounted_for_bridges.push(bridge.uid_fingerprint); + } + } + // Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago, + // it should be replaced with a working resource and be removed from the bridgetable. + for bridge in failing { + let res = self.replace_with_new(bridge); + if res == lox_library::ReplaceSuccess::Replaced { + println!( + "BridgeLine {:?} successfully replaced.", + bridge.uid_fingerprint + ); + accounted_for_bridges.push(bridge.uid_fingerprint); + } else if res == lox_library::ReplaceSuccess::NotReplaced { + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "BridgeLine {:?} NOT replaced, saved for next update!", + bridge.uid_fingerprint + ); + self.new_to_be_replaced_bridge(bridge); + accounted_for_bridges.push(bridge.uid_fingerprint); + } else { + // NotFound + assert!( + res == lox_library::ReplaceSuccess::NotFound, + "ReplaceSuccess incorrectly set" + ); + println!( + "BridgeLine {:?} no longer in bridge table.", + bridge.uid_fingerprint + ); + } + } + // Make sure that all bridges are accounted for + let ba_clone = self.ba.lock().unwrap(); + let total_reachable = ba_clone.bridge_table.reachable.len(); + if total_reachable > accounted_for_bridges.len() { + // Search for extra fingerprints, assume those bridges are gone and remove? + } else if total_reachable < accounted_for_bridges.len() { + println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys"); + } + // Finally, assign any extra_bridges to new buckets if there are enough + while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET { + let bucket = self.remove_extra_bridges(); + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + self.add_spare_bucket(bucket); } - } - // Finally, assign any extra_bridges to new buckets if there are enough - while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET { - let bucket = self.remove_extra_bridges(); - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - self.add_spare_bucket(bucket); } } diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 0387a3f..78898e8 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -6,7 +6,7 @@ use hyper::{ Body, Request, Response, Server, }; -use rdsys_backend::{proto::Resource, request_resources}; +use rdsys_backend::{proto::ResourceState, request_resources}; use serde::Deserialize; use std::{ @@ -27,8 +27,6 @@ use tokio::{ time::{interval, sleep}, }; -use crate::resource_parser::sort_for_parsing; - async fn shutdown_signal() { tokio::signal::ctrl_c() .await @@ -98,7 +96,7 @@ struct ResourceInfo { // TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_stream( rtype: ResourceInfo, - tx: mpsc::Sender>, + tx: mpsc::Sender, mut kill: broadcast::Receiver<()>, ) { tokio::select! { @@ -108,7 +106,7 @@ async fn rdsys_stream( } } -async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender>) { +async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender) { let mut interval = interval(Duration::from_secs(30)); loop { interval.tick().await; @@ -126,7 +124,7 @@ async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender>) { async fn rdsys_bridge_parser( rdsys_tx: mpsc::Sender, - rx: mpsc::Receiver>, + rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { @@ -137,7 +135,7 @@ async fn rdsys_bridge_parser( // Parse Bridges receives a Vec from rdsys_sender and sends it to the // Context Manager to be parsed and added to the BridgeDB -async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver>) { +async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { let resources = rx.recv().await.unwrap(); let cmd = Command::Rdsys { resources }; @@ -180,22 +178,24 @@ async fn context_manager( match cmd { Rdsys { resources } => { // If the bridgetable is not being loaded from an existing database, we will populate the - // bridgetable with all of the bridges received from rdsys + // bridgetable with all of the working bridges received from rdsys. if context.bridgetable_is_empty() { - let bridgelines = parse_into_bridgelines(resources); - let (buckets, leftovers) = parse_into_buckets(bridgelines); - for leftover in leftovers { - context.append_extra_bridges(leftover); + if let Some(working_resources) = resources.working { + let bridgelines = parse_into_bridgelines(working_resources); + let (buckets, leftovers) = parse_into_buckets(bridgelines); + for leftover in leftovers { + context.append_extra_bridges(leftover); + } + context.populate_bridgetable(buckets, bridge_config.percent_spares); + + // otherwise, we need to sync the existing bridgetable with the resources we receive from + // rdsys and ensure that all functioning bridges are correctly placed in the bridgetable + // those that have changed are updated and those that have been failing tests for an extended + // period of time are removed. + // If bridges are labelled as blocked_in, we should also handle blocking behaviour. } - context.populate_bridgetable(buckets, bridge_config.percent_spares); - // otherwise, we need to sync the existing bridgetable with the resources we receive from - // rdsys and ensure that all functioning bridges are correctly placed in the bridgetable - // those that have changed are updated and those that have been failing tests for an extended - // period of time are removed. - // If bridges are labelled as blocked_in, we should also handle blocking behaviour. } else { - let (functional, failing) = sort_for_parsing(resources); - context.sync_with_bridgetable(functional, failing); + context.sync_with_bridgetable(resources); } // Handle any bridges that are leftover in the bridge authority from the sync context.allocate_leftover_bridges(); @@ -225,7 +225,7 @@ async fn context_manager( #[derive(Debug)] enum Command { Rdsys { - resources: Vec, + resources: ResourceState, }, Request { req: Request, diff --git a/crates/lox-distributor/src/resource_parser.rs b/crates/lox-distributor/src/resource_parser.rs index 09863ed..9136450 100644 --- a/crates/lox-distributor/src/resource_parser.rs +++ b/crates/lox-distributor/src/resource_parser.rs @@ -77,24 +77,28 @@ pub fn parse_into_buckets( // they were passing tests. Before passing them back to the calling function, they are parsed into // BridgeLines pub fn sort_for_parsing(resources: Vec) -> (Vec, Vec) { - let mut functional: Vec = Vec::new(); + let mut grace_period: Vec = Vec::new(); let mut failing: Vec = Vec::new(); for resource in resources { - if resource.last_passed + Duration::hours(ACCEPTED_HOURS_OF_FAILURE) >= Utc::now() { - functional.push(resource); + // TODO: Maybe filter for untested resources first if last_passed alone would skew + // the filter in an unintended direction + if resource.test_result.last_passed + Duration::hours(ACCEPTED_HOURS_OF_FAILURE) + >= Utc::now() + { + grace_period.push(resource); } else { failing.push(resource); } } - let functional_bridgelines = parse_into_bridgelines(functional); + let grace_period_bridgelines = parse_into_bridgelines(grace_period); let failing_bridgelines = parse_into_bridgelines(failing); - (functional_bridgelines, failing_bridgelines) + (grace_period_bridgelines, failing_bridgelines) } #[cfg(test)] mod tests { - use rdsys_backend::proto::Resource; + use rdsys_backend::proto::{Resource, TestResults}; use std::collections::HashMap; use chrono::{Duration, Utc}; @@ -119,7 +123,9 @@ mod tests { Resource { r#type: String::from(rtype), blocked_in: HashMap::new(), - last_passed: Utc::now() - Duration::hours(last_passed), + test_result: TestResults { + last_passed: Utc::now() - Duration::hours(last_passed), + }, protocol: String::from("tcp"), address: String::from(address), port: port, diff --git a/crates/rdsys-backend-api/src/lib.rs b/crates/rdsys-backend-api/src/lib.rs index f3aa8b3..51e2bdc 100644 --- a/crates/rdsys-backend-api/src/lib.rs +++ b/crates/rdsys-backend-api/src/lib.rs @@ -267,8 +267,8 @@ pub async fn request_resources( name: String, token: String, resource_types: Vec, -) -> Result, Error> { - let fetched_resources: Result, Error>; +) -> Result { + let fetched_resources: Result; let req = proto::ResourceRequest { request_origin: name, resource_types, @@ -288,7 +288,7 @@ pub async fn request_resources( .unwrap(); match response.status() { reqwest::StatusCode::OK => { - fetched_resources = match response.json::>().await { + fetched_resources = match response.json::().await { Ok(fetched_resources) => Ok(fetched_resources), Err(e) => Err(Error::Reqwest(e)), }; diff --git a/crates/rdsys-backend-api/src/proto.rs b/crates/rdsys-backend-api/src/proto.rs index af0a026..ecfeee9 100644 --- a/crates/rdsys-backend-api/src/proto.rs +++ b/crates/rdsys-backend-api/src/proto.rs @@ -10,12 +10,18 @@ pub struct ResourceRequest { pub resource_types: Vec, } + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct TestResults { + pub last_passed: DateTime +} + /// Representation of a bridge resource #[derive(Deserialize, PartialEq, Eq, Debug)] pub struct Resource { pub r#type: String, pub blocked_in: HashMap, - pub last_passed: DateTime, + pub test_result: TestResults, pub protocol: String, pub address: String, pub port: u16, @@ -51,6 +57,13 @@ impl Resource { } } +/// A ResourceState holds information about new, changed, or pruned resources +#[derive(Deserialize, PartialEq, Eq, Debug)] +pub struct ResourceState { + pub working: Option>, + pub not_working: Option>, +} + /// A ResourceDiff holds information about new, changed, or pruned resources #[derive(Deserialize, PartialEq, Eq, Debug)] pub struct ResourceDiff { @@ -92,7 +105,7 @@ mod tests { let bridge = Resource { r#type: String::from("scramblesuit"), blocked_in: HashMap::new(), - last_passed: "2023-05-30T14:20:28Z".parse::>().unwrap(), + test_result: TestResults { last_passed: "2023-05-30T14:20:28Z".parse::>().unwrap() }, protocol: String::from("tcp"), address: String::from("216.117.3.62"), port: 63174, @@ -107,7 +120,9 @@ mod tests { { "type": "scramblesuit", "blocked_in": {}, - "last_passed": "2023-05-30T14:20:28.000+00:00", + "test_result" : { + "last_passed": "2023-05-30T14:20:28.000+00:00" + }, "protocol": "tcp", "address": "216.117.3.62", "port": 63174, @@ -126,7 +141,7 @@ mod tests { assert_eq!(bridge, res); } - #[test] + #[test] fn deserialize_resource_diff() { let data = r#" { @@ -135,7 +150,9 @@ mod tests { { "type": "obfs2", "blocked_in": {}, - "last_passed": "2023-05-30T11:42:28.000+07:00", + "test_result" : { + "last_passed": "2023-05-30T11:42:28.000+07:00" + }, "Location": null, "protocol": "tcp", "address": "176.247.216.207", @@ -153,7 +170,9 @@ mod tests { { "type": "obfs2", "blocked_in": {}, - "last_passed": "2023-05-30T12:20:28.000+07:00", + "test_result" : { + "last_passed": "2023-05-30T12:20:28.000+07:00" + }, "protocol": "tcp", "address": "133.69.16.145", "port": 58314, @@ -172,7 +191,9 @@ mod tests { { "type": "scramblesuit", "blocked_in": {}, - "last_passed": "2023-05-30T14:20:28.000+07:00", + "test_result" : { + "last_passed": "2023-05-30T14:20:28.000+07:00" + }, "protocol": "tcp", "address": "216.117.3.62", "port": 63174,