Fix up some minor issues + fix bug in context sync

This commit is contained in:
onyinyang 2023-10-16 17:47:08 -04:00
parent 89fb0fbf4b
commit 4f7b96a603
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
3 changed files with 143 additions and 128 deletions

View File

@ -8,10 +8,11 @@ use lox_library::{
}, },
BridgeAuth, BridgeDb, IssuerPubKey, BridgeAuth, BridgeDb, IssuerPubKey,
}; };
use rdsys_backend::proto::ResourceState; use rdsys_backend::proto::{Resource, ResourceState};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
cmp::Ordering,
collections::HashMap, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
@ -53,134 +54,149 @@ impl LoxServerContext {
} }
} }
/* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not pub fn handle_working_resources(&self, working_resources: Vec<Resource>) -> Vec<u64> {
yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not let mut accounted_for_bridges: Vec<u64> = Vec::new();
currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something let bridgelines = parse_into_bridgelines(working_resources);
like the following: for bridge in bridgelines {
println!("BridgeLine to be removed: {:?}", bridgeline); /* TODO: Functionality for marking bridges as unreachable/blocked should eventually happen here.
let res = context.add_unreachable(bridgeline); It is currently not enabled as there is not yet a reliable way to determine that a bridge is blocked.
if res { This means that migrations to unblocked bridges do not currently work but can be easily enabled by parsing the
println!( list of `blocked resources` from rdsys or another source with something like the following:
"BridgeLine successfully marked unreachable: {:?}", let res = context.add_unreachable(bridgeline);
bridgeline if res {
); println!(
} else { "BridgeLine {:?} successfully marked unreachable: {:?}",
println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline); bridgeline
//TODO probably do something else here );
} } else {
*/ println!(
"BridgeLine {:?} NOT marked unreachable, saved for next update!",
bridge.uid_fingerprint
);
}
*/
let res = self.update_bridge(bridge);
if res {
println!(
"BridgeLine {:?} successfully updated.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
// Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later
} else {
println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint);
self.append_extra_bridges(bridge);
}
}
accounted_for_bridges
}
pub fn handle_not_working_resources(
&self,
not_working_resources: Vec<Resource>,
mut accounted_for_bridges: Vec<u64>,
) -> Vec<u64> {
let (grace_period, failing) = sort_for_parsing(not_working_resources);
// Update bridges in the bridge table that are failing but within the grace period
for bridge in grace_period {
let res = self.update_bridge(bridge);
if res {
println!(
"BridgeLine {:?} successfully updated.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
}
}
// Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago,
// it should be replaced with a working resource and be removed from the bridgetable.
for bridge in failing {
let res = self.replace_with_new(bridge);
if res == lox_library::ReplaceSuccess::Replaced {
println!(
"BridgeLine {:?} successfully replaced.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
} else if res == lox_library::ReplaceSuccess::NotReplaced {
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!(
"BridgeLine {:?} NOT replaced, saved for next update!",
bridge.uid_fingerprint
);
self.new_to_be_replaced_bridge(bridge);
accounted_for_bridges.push(bridge.uid_fingerprint);
} else {
// NotFound
assert!(
res == lox_library::ReplaceSuccess::NotFound,
"ReplaceSuccess incorrectly set"
);
println!(
"BridgeLine {:?} no longer in bridge table.",
bridge.uid_fingerprint
);
}
}
accounted_for_bridges
}
// Sync resources received from rdsys with the Lox bridgetable // Sync resources received from rdsys with the Lox bridgetable
pub fn sync_with_bridgetable(&self, resources: ResourceState) { pub fn sync_with_bridgetable(&self, resources: ResourceState) {
// Check if the resource is already in the Lox bridgetable. If it is, it's probably fine // Check if each resource is already in the Lox bridgetable. If it is, it's probably fine
// to replace the existing resource with the incoming one to account for changes // to replace the existing resource with the incoming one to account for changes
// save a list of accounted for bridges and deal with the unaccounted for bridges at the end
let mut accounted_for_bridges: Vec<u64> = Vec::new(); let mut accounted_for_bridges: Vec<u64> = Vec::new();
// ensure all working resources are updated and accounted for
if let Some(working_resources) = resources.working { if let Some(working_resources) = resources.working {
// ensure all working resources are updated and accounted for accounted_for_bridges = self.handle_working_resources(working_resources);
// save a list of unaccounted for bridges and deal with them in the next block
let bridgelines = parse_into_bridgelines(working_resources);
for bridge in bridgelines {
let res = self.update_bridge(bridge);
if res {
println!(
"BridgeLine {:?} successfully updated.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
// Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later
} else {
println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint);
self.append_extra_bridges(bridge);
}
}
} }
if let Some(not_working_resources) = resources.not_working { if let Some(not_working_resources) = resources.not_working {
let (grace_period, failing) = sort_for_parsing(not_working_resources); accounted_for_bridges =
// Update bridges in the bridge table that are failing but within the grace period self.handle_not_working_resources(not_working_resources, accounted_for_bridges);
for bridge in grace_period { }
let res = self.update_bridge(bridge); let mut ba_clone = self.ba.lock().unwrap();
if res { let total_reachable = ba_clone.bridge_table.reachable.len();
println!( match total_reachable.cmp(&accounted_for_bridges.len()) {
"BridgeLine {:?} successfully updated.", Ordering::Greater => {
bridge.uid_fingerprint let unaccounted_for = ba_clone.find_and_remove_unaccounted_for_bridges(accounted_for_bridges);
);
accounted_for_bridges.push(bridge.uid_fingerprint);
}
}
// Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago,
// it should be replaced with a working resource and be removed from the bridgetable.
for bridge in failing {
let res = self.replace_with_new(bridge);
if res == lox_library::ReplaceSuccess::Replaced {
println!(
"BridgeLine {:?} successfully replaced.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
} else if res == lox_library::ReplaceSuccess::NotReplaced {
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!(
"BridgeLine {:?} NOT replaced, saved for next update!",
bridge.uid_fingerprint
);
self.new_to_be_replaced_bridge(bridge);
accounted_for_bridges.push(bridge.uid_fingerprint);
} else {
// NotFound
assert!(
res == lox_library::ReplaceSuccess::NotFound,
"ReplaceSuccess incorrectly set"
);
println!(
"BridgeLine {:?} no longer in bridge table.",
bridge.uid_fingerprint
);
}
}
// Make sure that all bridges are accounted for
let mut ba_clone = self.ba.lock().unwrap();
let total_reachable = ba_clone.bridge_table.reachable.len();
if total_reachable > accounted_for_bridges.len() {
let unaccounted_for =
ba_clone.find_and_remove_unaccounted_for_bridges(accounted_for_bridges);
for bridgeline in unaccounted_for { for bridgeline in unaccounted_for {
let res = self.replace_with_new(bridgeline); match self.replace_with_new(bridgeline) {
if res == lox_library::ReplaceSuccess::Replaced { lox_library::ReplaceSuccess::Replaced => println!(
println!( "BridgeLine {:?} not found in rdsys update was successfully replaced.", bridgeline.uid_fingerprint),
"BridgeLine {:?} not found in rdsys update was successfully replaced.", lox_library::ReplaceSuccess::NotReplaced => {
bridgeline.uid_fingerprint // Add the bridge to the list of to_be_replaced bridges in the Lox context and try
); // again to replace at the next update (nothing changes in the Lox Authority)
} else if res == lox_library::ReplaceSuccess::NotReplaced { println!("BridgeLine {:?} not found in rdsys update NOT replaced, saved for next update!",
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try bridgeline.uid_fingerprint);
// again to replace at the next update (nothing changes in the Lox Authority) self.new_to_be_replaced_bridge(bridgeline)
println!( }
"BridgeLine {:?} not found in rdsys update NOT replaced, saved for next update!", lox_library::ReplaceSuccess::NotFound => println!(
bridgeline.uid_fingerprint
);
self.new_to_be_replaced_bridge(bridgeline);
} else {
// NotFound
assert!(
res == lox_library::ReplaceSuccess::NotFound,
"ReplaceSuccess incorrectly set"
);
println!(
"BridgeLine {:?} no longer in reachable bridges.", "BridgeLine {:?} no longer in reachable bridges.",
bridgeline.uid_fingerprint bridgeline.uid_fingerprint
); ),
} }
} }
// Search for extra fingerprints, assume those bridges are gone and remove
} else if total_reachable < accounted_for_bridges.len() {
println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys");
} }
// Finally, assign any extra_bridges to new buckets if there are enough Ordering::Less => println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys"),
while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET { _ => (),
let bucket = self.remove_extra_bridges();
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, }
// eventually also do some more fancy grouping of new resources, i.e., by type or region // Finally, assign any extra_bridges to new buckets if there are enough
self.add_spare_bucket(bucket); while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET {
let bucket = self.remove_extra_bridges();
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
let mut db_obj = self.db.lock().unwrap();
match ba_clone.add_spare_bucket(bucket, &mut db_obj) {
Ok(_) => (),
Err(e) => {
println!("Error: {:?}", e);
for bridge in bucket {
self.append_extra_bridges(bridge);
}
}
} }
} }
} }
@ -262,7 +278,7 @@ impl LoxServerContext {
result result
} }
/* Uncomment when bridge blocking is finalized /* TODO: Uncomment when bridge blocking is finalized
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool { pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap(); let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap(); let mut db_obj = self.db.lock().unwrap();

View File

@ -93,7 +93,6 @@ struct ResourceInfo {
// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified // Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified
// in the config.json file. // in the config.json file.
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
async fn rdsys_stream( async fn rdsys_stream(
rtype: ResourceInfo, rtype: ResourceInfo,
tx: mpsc::Sender<ResourceState>, tx: mpsc::Sender<ResourceState>,
@ -107,7 +106,7 @@ async fn rdsys_stream(
} }
async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender<ResourceState>) { async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender<ResourceState>) {
let mut interval = interval(Duration::from_secs(30)); let mut interval = interval(Duration::from_secs(5));
loop { loop {
interval.tick().await; interval.tick().await;
let resources = request_resources( let resources = request_resources(
@ -133,8 +132,8 @@ async fn rdsys_bridge_parser(
} }
} }
// Parse Bridges receives a Vec<Resource> from rdsys_sender and sends it to the // Parse Bridges receives the resources from rdsys and sends it to the
// Context Manager to be parsed and added to the BridgeDB // Context Manager to be parsed and added to the Lox BridgeDB
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceState>) { async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceState>) {
loop { loop {
let resources = rx.recv().await.unwrap(); let resources = rx.recv().await.unwrap();

View File

@ -380,12 +380,10 @@ impl BridgeAuth {
Ok(()) Ok(())
} }
// TODO Ensure synchronization of Lox bridge_table with rdsys
pub fn find_and_remove_unaccounted_for_bridges( pub fn find_and_remove_unaccounted_for_bridges(
&mut self, &mut self,
accounted_for_bridges: Vec<u64>, accounted_for_bridges: Vec<u64>,
) -> Vec<BridgeLine> { ) -> Vec<BridgeLine> {
// If there are expired blockages, separate them from the fresh blockages
let mut unaccounted_for: Vec<BridgeLine> = Vec::new(); let mut unaccounted_for: Vec<BridgeLine> = Vec::new();
for (k, _v) in self.bridge_table.reachable.clone() { for (k, _v) in self.bridge_table.reachable.clone() {
if !accounted_for_bridges.contains(&k.uid_fingerprint) { if !accounted_for_bridges.contains(&k.uid_fingerprint) {
@ -393,8 +391,6 @@ impl BridgeAuth {
} }
} }
unaccounted_for unaccounted_for
//use open_inv_keys and blocked_keys from bridge_table to remove expired keys from table.
// make sure this happens before they are removed from the structures in the bridge table
} }
pub fn allocate_bridges( pub fn allocate_bridges(
@ -446,10 +442,9 @@ impl BridgeAuth {
assert!(bridgelines[*offset] == reachable_bridge.0); assert!(bridgelines[*offset] == reachable_bridge.0);
bridgelines[*offset] = *bridge; bridgelines[*offset] = *bridge;
self.bridge_table.buckets.insert(*bucketnum, bridgelines); self.bridge_table.buckets.insert(*bucketnum, bridgelines);
match self.bridge_table.buckets.get(bucketnum) { if self.bridge_table.buckets.get(bucketnum).is_none() {
Some(bridgelines) => *bridgelines, return res;
None => return res, }
};
} }
res = true; res = true;
} else { } else {
@ -776,6 +771,11 @@ impl BridgeAuth {
.iter() .iter()
.partition(|&x| x.1 + EXPIRY_DATE < self.today()); .partition(|&x| x.1 + EXPIRY_DATE < self.today());
for item in expired { for item in expired {
// We should check that the items were actually distributed before they are removed
if !bdb.distributed_buckets.contains(&item.0) {
// TODO: Add prometheus metric for this?
println!("This bucket was not actually distributed!");
}
let new_item = item.0; let new_item = item.0;
bdb.remove_blocked_or_expired_buckets(&new_item); bdb.remove_blocked_or_expired_buckets(&new_item);
// Remove any trust upgrade migrations from this // Remove any trust upgrade migrations from this