diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index 532eb1b..492e17f 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -8,10 +8,11 @@ use lox_library::{ }, BridgeAuth, BridgeDb, IssuerPubKey, }; -use rdsys_backend::proto::ResourceState; +use rdsys_backend::proto::{Resource, ResourceState}; use serde::{Deserialize, Serialize}; use std::{ + cmp::Ordering, collections::HashMap, sync::{Arc, Mutex}, }; @@ -53,134 +54,149 @@ impl LoxServerContext { } } - /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not - yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not - currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something - like the following: - println!("BridgeLine to be removed: {:?}", bridgeline); - let res = context.add_unreachable(bridgeline); - if res { - println!( - "BridgeLine successfully marked unreachable: {:?}", - bridgeline - ); - } else { - println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); - //TODO probably do something else here - } - */ + pub fn handle_working_resources(&self, working_resources: Vec) -> Vec { + let mut accounted_for_bridges: Vec = Vec::new(); + let bridgelines = parse_into_bridgelines(working_resources); + for bridge in bridgelines { + /* TODO: Functionality for marking bridges as unreachable/blocked should eventually happen here. + It is currently not enabled as there is not yet a reliable way to determine that a bridge is blocked. + This means that migrations to unblocked bridges do not currently work but can be easily enabled by parsing the + list of `blocked resources` from rdsys or another source with something like the following: + let res = context.add_unreachable(bridgeline); + if res { + println!( + "BridgeLine {:?} successfully marked unreachable: {:?}", + bridgeline + ); + } else { + println!( + "BridgeLine {:?} NOT marked unreachable, saved for next update!", + bridge.uid_fingerprint + ); + } + */ + let res = self.update_bridge(bridge); + if res { + println!( + "BridgeLine {:?} successfully updated.", + bridge.uid_fingerprint + ); + accounted_for_bridges.push(bridge.uid_fingerprint); + // Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later + } else { + println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint); + self.append_extra_bridges(bridge); + } + } + accounted_for_bridges + } + + pub fn handle_not_working_resources( + &self, + not_working_resources: Vec, + mut accounted_for_bridges: Vec, + ) -> Vec { + let (grace_period, failing) = sort_for_parsing(not_working_resources); + // Update bridges in the bridge table that are failing but within the grace period + for bridge in grace_period { + let res = self.update_bridge(bridge); + if res { + println!( + "BridgeLine {:?} successfully updated.", + bridge.uid_fingerprint + ); + accounted_for_bridges.push(bridge.uid_fingerprint); + } + } + // Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago, + // it should be replaced with a working resource and be removed from the bridgetable. + for bridge in failing { + let res = self.replace_with_new(bridge); + if res == lox_library::ReplaceSuccess::Replaced { + println!( + "BridgeLine {:?} successfully replaced.", + bridge.uid_fingerprint + ); + accounted_for_bridges.push(bridge.uid_fingerprint); + } else if res == lox_library::ReplaceSuccess::NotReplaced { + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "BridgeLine {:?} NOT replaced, saved for next update!", + bridge.uid_fingerprint + ); + self.new_to_be_replaced_bridge(bridge); + accounted_for_bridges.push(bridge.uid_fingerprint); + } else { + // NotFound + assert!( + res == lox_library::ReplaceSuccess::NotFound, + "ReplaceSuccess incorrectly set" + ); + println!( + "BridgeLine {:?} no longer in bridge table.", + bridge.uid_fingerprint + ); + } + } + accounted_for_bridges + } // Sync resources received from rdsys with the Lox bridgetable pub fn sync_with_bridgetable(&self, resources: ResourceState) { - // Check if the resource is already in the Lox bridgetable. If it is, it's probably fine + // Check if each resource is already in the Lox bridgetable. If it is, it's probably fine // to replace the existing resource with the incoming one to account for changes + // save a list of accounted for bridges and deal with the unaccounted for bridges at the end let mut accounted_for_bridges: Vec = Vec::new(); + // ensure all working resources are updated and accounted for if let Some(working_resources) = resources.working { - // ensure all working resources are updated and accounted for - // save a list of unaccounted for bridges and deal with them in the next block - let bridgelines = parse_into_bridgelines(working_resources); - for bridge in bridgelines { - let res = self.update_bridge(bridge); - if res { - println!( - "BridgeLine {:?} successfully updated.", - bridge.uid_fingerprint - ); - accounted_for_bridges.push(bridge.uid_fingerprint); - // Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later - } else { - println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint); - self.append_extra_bridges(bridge); - } - } + accounted_for_bridges = self.handle_working_resources(working_resources); } if let Some(not_working_resources) = resources.not_working { - let (grace_period, failing) = sort_for_parsing(not_working_resources); - // Update bridges in the bridge table that are failing but within the grace period - for bridge in grace_period { - let res = self.update_bridge(bridge); - if res { - println!( - "BridgeLine {:?} successfully updated.", - bridge.uid_fingerprint - ); - accounted_for_bridges.push(bridge.uid_fingerprint); - } - } - // Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago, - // it should be replaced with a working resource and be removed from the bridgetable. - for bridge in failing { - let res = self.replace_with_new(bridge); - if res == lox_library::ReplaceSuccess::Replaced { - println!( - "BridgeLine {:?} successfully replaced.", - bridge.uid_fingerprint - ); - accounted_for_bridges.push(bridge.uid_fingerprint); - } else if res == lox_library::ReplaceSuccess::NotReplaced { - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "BridgeLine {:?} NOT replaced, saved for next update!", - bridge.uid_fingerprint - ); - self.new_to_be_replaced_bridge(bridge); - accounted_for_bridges.push(bridge.uid_fingerprint); - } else { - // NotFound - assert!( - res == lox_library::ReplaceSuccess::NotFound, - "ReplaceSuccess incorrectly set" - ); - println!( - "BridgeLine {:?} no longer in bridge table.", - bridge.uid_fingerprint - ); - } - } - // Make sure that all bridges are accounted for - let mut ba_clone = self.ba.lock().unwrap(); - let total_reachable = ba_clone.bridge_table.reachable.len(); - if total_reachable > accounted_for_bridges.len() { - let unaccounted_for = - ba_clone.find_and_remove_unaccounted_for_bridges(accounted_for_bridges); + accounted_for_bridges = + self.handle_not_working_resources(not_working_resources, accounted_for_bridges); + } + let mut ba_clone = self.ba.lock().unwrap(); + let total_reachable = ba_clone.bridge_table.reachable.len(); + match total_reachable.cmp(&accounted_for_bridges.len()) { + Ordering::Greater => { + let unaccounted_for = ba_clone.find_and_remove_unaccounted_for_bridges(accounted_for_bridges); for bridgeline in unaccounted_for { - let res = self.replace_with_new(bridgeline); - if res == lox_library::ReplaceSuccess::Replaced { - println!( - "BridgeLine {:?} not found in rdsys update was successfully replaced.", - bridgeline.uid_fingerprint - ); - } else if res == lox_library::ReplaceSuccess::NotReplaced { - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!( - "BridgeLine {:?} not found in rdsys update NOT replaced, saved for next update!", - bridgeline.uid_fingerprint - ); - self.new_to_be_replaced_bridge(bridgeline); - } else { - // NotFound - assert!( - res == lox_library::ReplaceSuccess::NotFound, - "ReplaceSuccess incorrectly set" - ); - println!( + match self.replace_with_new(bridgeline) { + lox_library::ReplaceSuccess::Replaced => println!( + "BridgeLine {:?} not found in rdsys update was successfully replaced.", bridgeline.uid_fingerprint), + lox_library::ReplaceSuccess::NotReplaced => { + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!("BridgeLine {:?} not found in rdsys update NOT replaced, saved for next update!", + bridgeline.uid_fingerprint); + self.new_to_be_replaced_bridge(bridgeline) + } + lox_library::ReplaceSuccess::NotFound => println!( "BridgeLine {:?} no longer in reachable bridges.", bridgeline.uid_fingerprint - ); + ), } } - // Search for extra fingerprints, assume those bridges are gone and remove - } else if total_reachable < accounted_for_bridges.len() { - println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys"); } - // Finally, assign any extra_bridges to new buckets if there are enough - while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET { - let bucket = self.remove_extra_bridges(); - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - self.add_spare_bucket(bucket); + Ordering::Less => println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys"), + _ => (), + + } + // Finally, assign any extra_bridges to new buckets if there are enough + while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET { + let bucket = self.remove_extra_bridges(); + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + let mut db_obj = self.db.lock().unwrap(); + match ba_clone.add_spare_bucket(bucket, &mut db_obj) { + Ok(_) => (), + Err(e) => { + println!("Error: {:?}", e); + for bridge in bucket { + self.append_extra_bridges(bridge); + } + } } } } @@ -262,7 +278,7 @@ impl LoxServerContext { result } - /* Uncomment when bridge blocking is finalized + /* TODO: Uncomment when bridge blocking is finalized pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool { let mut ba_obj = self.ba.lock().unwrap(); let mut db_obj = self.db.lock().unwrap(); diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 78898e8..2b36e82 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -93,7 +93,6 @@ struct ResourceInfo { // Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified // in the config.json file. -// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_stream( rtype: ResourceInfo, tx: mpsc::Sender, @@ -107,7 +106,7 @@ async fn rdsys_stream( } async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender) { - let mut interval = interval(Duration::from_secs(30)); + let mut interval = interval(Duration::from_secs(5)); loop { interval.tick().await; let resources = request_resources( @@ -133,8 +132,8 @@ async fn rdsys_bridge_parser( } } -// Parse Bridges receives a Vec from rdsys_sender and sends it to the -// Context Manager to be parsed and added to the BridgeDB +// Parse Bridges receives the resources from rdsys and sends it to the +// Context Manager to be parsed and added to the Lox BridgeDB async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { loop { let resources = rx.recv().await.unwrap(); diff --git a/crates/lox-library/src/lib.rs b/crates/lox-library/src/lib.rs index 2571893..2f92ad9 100644 --- a/crates/lox-library/src/lib.rs +++ b/crates/lox-library/src/lib.rs @@ -380,12 +380,10 @@ impl BridgeAuth { Ok(()) } - // TODO Ensure synchronization of Lox bridge_table with rdsys pub fn find_and_remove_unaccounted_for_bridges( &mut self, accounted_for_bridges: Vec, ) -> Vec { - // If there are expired blockages, separate them from the fresh blockages let mut unaccounted_for: Vec = Vec::new(); for (k, _v) in self.bridge_table.reachable.clone() { if !accounted_for_bridges.contains(&k.uid_fingerprint) { @@ -393,8 +391,6 @@ impl BridgeAuth { } } unaccounted_for - //use open_inv_keys and blocked_keys from bridge_table to remove expired keys from table. - // make sure this happens before they are removed from the structures in the bridge table } pub fn allocate_bridges( @@ -446,10 +442,9 @@ impl BridgeAuth { assert!(bridgelines[*offset] == reachable_bridge.0); bridgelines[*offset] = *bridge; self.bridge_table.buckets.insert(*bucketnum, bridgelines); - match self.bridge_table.buckets.get(bucketnum) { - Some(bridgelines) => *bridgelines, - None => return res, - }; + if self.bridge_table.buckets.get(bucketnum).is_none() { + return res; + } } res = true; } else { @@ -776,6 +771,11 @@ impl BridgeAuth { .iter() .partition(|&x| x.1 + EXPIRY_DATE < self.today()); for item in expired { + // We should check that the items were actually distributed before they are removed + if !bdb.distributed_buckets.contains(&item.0) { + // TODO: Add prometheus metric for this? + println!("This bucket was not actually distributed!"); + } let new_item = item.0; bdb.remove_blocked_or_expired_buckets(&new_item); // Remove any trust upgrade migrations from this