lox/crates/lox-distributor/src/lox_context.rs

601 lines
23 KiB
Rust
Raw Normal View History

use hyper::{body::Bytes, header::HeaderValue, Body, Response};
use lox_library::{
bridge_table::{BridgeLine, EncryptedBucket, MAX_BRIDGES_PER_BUCKET},
proto::{
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
redeem_invite, trust_promotion,
},
2023-10-31 11:15:47 -04:00
BridgeAuth, BridgeDb, ExceededMaxBridgesError, IssuerPubKey,
};
use rdsys_backend::proto::{Resource, ResourceState};
use serde::{Deserialize, Serialize};
2023-07-19 10:37:09 -04:00
use std::{
cmp::Ordering,
2023-07-19 10:37:09 -04:00
collections::HashMap,
sync::{Arc, Mutex},
};
use zkp::ProofError;
2023-10-30 12:54:59 -04:00
use crate::metrics::Metrics;
2023-10-25 13:58:24 -04:00
use crate::resource_parser::{parse_into_bridgelines, sort_for_parsing};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LoxServerContext {
pub db: Arc<Mutex<BridgeDb>>,
pub ba: Arc<Mutex<BridgeAuth>>,
pub extra_bridges: Arc<Mutex<Vec<BridgeLine>>>,
pub to_be_replaced_bridges: Arc<Mutex<Vec<BridgeLine>>>,
2023-10-30 12:54:59 -04:00
#[serde(skip)]
pub metrics: Metrics,
}
impl LoxServerContext {
2023-09-13 12:08:48 -04:00
/// Report whether the bridge authority considers itself empty.
pub fn bridgetable_is_empty(&self) -> bool {
    self.ba.lock().unwrap().is_empty()
}
/// Populate an empty bridgetable for the first time.
///
/// The first `percent_spares` percent of `buckets` (rounded down) are added
/// as spare buckets; the remaining buckets are added as open-invitation
/// buckets. `percent_spares` is clamped to `0..=100`.
pub fn populate_bridgetable(
    &self,
    buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>,
    percent_spares: i32,
) {
    // Clamping keeps the split point inside `0..=buckets.len()`; an
    // out-of-range percentage would otherwise make `split_at` panic.
    let percent = percent_spares.clamp(0, 100) as usize;
    let partition = buckets.len() * percent / 100;
    let (spares, open_invitations) = buckets.split_at(partition);
    for bucket in spares {
        self.add_spare_bucket(*bucket);
    }
    for bucket in open_invitations {
        self.add_openinv_bucket(*bucket);
    }
}
2023-09-13 12:08:48 -04:00
/// Apply an rdsys list of working resources to the bridge table.
///
/// Each resource is parsed into a `BridgeLine` and offered to
/// `update_bridge`. Bridges that were updated are counted as accounted for;
/// bridges the table does not know are stored in `extra_bridges`.
///
/// Returns the `uid_fingerprint`s of all bridges that were updated.
pub fn handle_working_resources(&self, working_resources: Vec<Resource>) -> Vec<u64> {
    let mut accounted_for_bridges: Vec<u64> = Vec::new();
    for bridge in parse_into_bridgelines(working_resources) {
        /* TODO: Functionality for marking bridges as unreachable/blocked should eventually happen here.
        It is currently not enabled as there is not yet a reliable way to determine that a bridge is blocked.
        This means that migrations to unblocked bridges do not currently work but can be easily enabled by
        parsing the list of `blocked resources` from rdsys or another source with something like the following:
            let res = context.add_unreachable(bridge);
            if res {
                println!(
                    "BridgeLine {:?} successfully marked unreachable",
                    bridge.uid_fingerprint
                );
            } else {
                println!(
                    "BridgeLine {:?} NOT marked unreachable, saved for next update!",
                    bridge.uid_fingerprint
                );
            }
        */
        if !self.update_bridge(bridge) {
            // Assume non-failing bridges that are not found in the bridge table
            // are new bridges and save them for later.
            println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint);
            self.append_extra_bridges(bridge);
            self.metrics.new_bridges.inc();
            continue;
        }
        println!(
            "BridgeLine {:?} successfully updated.",
            bridge.uid_fingerprint
        );
        accounted_for_bridges.push(bridge.uid_fingerprint);
        self.metrics.existing_or_updated_bridges.inc();
    }
    accounted_for_bridges
}
/// Apply an rdsys list of not-working resources to the bridge table.
///
/// `sort_for_parsing` splits the resources into bridges still within the
/// grace period and bridges that are failing. Grace-period bridges are simply
/// updated. Failing bridges are replaced where possible; those that cannot be
/// replaced yet are remembered in `to_be_replaced_bridges`.
///
/// Returns `accounted_for_bridges` extended with the `uid_fingerprint` of
/// every bridge that was updated, replaced, or queued for replacement.
pub fn handle_not_working_resources(
    &self,
    not_working_resources: Vec<Resource>,
    mut accounted_for_bridges: Vec<u64>,
) -> Vec<u64> {
    let (grace_period, failing) = sort_for_parsing(not_working_resources);
    // Update bridges in the bridge table that are failing but within the grace period
    for bridge in grace_period {
        if self.update_bridge(bridge) {
            println!(
                "BridgeLine {:?} successfully updated.",
                bridge.uid_fingerprint
            );
            accounted_for_bridges.push(bridge.uid_fingerprint);
            self.metrics.existing_or_updated_bridges.inc();
        }
    }
    // Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago,
    // it should be replaced with a working resource and be removed from the bridgetable.
    for bridge in failing {
        // Exhaustive match: a new `ReplaceSuccess` variant becomes a compile
        // error here instead of a runtime assertion failure.
        match self.replace_with_new(bridge) {
            lox_library::ReplaceSuccess::Replaced => {
                println!(
                    "BridgeLine {:?} successfully replaced.",
                    bridge.uid_fingerprint
                );
                accounted_for_bridges.push(bridge.uid_fingerprint);
                self.metrics.removed_bridges.inc();
            }
            lox_library::ReplaceSuccess::NotReplaced => {
                // Add the bridge to the list of to_be_replaced bridges in the Lox context and try
                // again to replace at the next update (nothing changes in the Lox Authority)
                println!(
                    "BridgeLine {:?} NOT replaced, saved for next update!",
                    bridge.uid_fingerprint
                );
                self.new_to_be_replaced_bridge(bridge);
                self.metrics.existing_or_updated_bridges.inc();
                accounted_for_bridges.push(bridge.uid_fingerprint);
            }
            lox_library::ReplaceSuccess::NotFound => {
                println!(
                    "BridgeLine {:?} no longer in bridge table.",
                    bridge.uid_fingerprint
                );
            }
        }
    }
    accounted_for_bridges
}
2023-08-28 19:33:19 -04:00
// Sync resources received from rdsys with the Lox bridgetable
pub fn sync_with_bridgetable(&self, resources: ResourceState) {
// Check if each resource is already in the Lox bridgetable. If it is, it's probably fine
// to replace the existing resource with the incoming one to account for changes
// save a list of accounted for bridges and deal with the unaccounted for bridges at the end
let mut accounted_for_bridges: Vec<u64> = Vec::new();
// ensure all working resources are updated and accounted for
if let Some(working_resources) = resources.working {
accounted_for_bridges = self.handle_working_resources(working_resources);
2023-08-28 19:33:19 -04:00
}
if let Some(not_working_resources) = resources.not_working {
accounted_for_bridges =
self.handle_not_working_resources(not_working_resources, accounted_for_bridges);
}
let mut ba_clone = self.ba.lock().unwrap();
let total_reachable = ba_clone.bridge_table.reachable.len();
match total_reachable.cmp(&accounted_for_bridges.len()) {
Ordering::Greater => {
let unaccounted_for = ba_clone.find_and_remove_unaccounted_for_bridges(accounted_for_bridges);
for bridgeline in unaccounted_for {
match self.replace_with_new(bridgeline) {
2023-10-26 12:19:28 -04:00
lox_library::ReplaceSuccess::Replaced => {
println!("BridgeLine {:?} not found in rdsys update was successfully replaced.", bridgeline.uid_fingerprint);
2023-10-26 13:22:19 -04:00
self.metrics.removed_bridges.inc();
}
lox_library::ReplaceSuccess::NotReplaced => {
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!("BridgeLine {:?} not found in rdsys update NOT replaced, saved for next update!",
bridgeline.uid_fingerprint);
2023-10-26 12:19:28 -04:00
self.new_to_be_replaced_bridge(bridgeline);
2023-10-26 13:22:19 -04:00
self.metrics.existing_or_updated_bridges.inc();
}
lox_library::ReplaceSuccess::NotFound => println!(
"BridgeLine {:?} no longer in reachable bridges.",
bridgeline.uid_fingerprint
),
}
}
}
Ordering::Less => println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys"),
_ => (),
}
// Finally, assign any extra_bridges to new buckets if there are enough
while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET {
let bucket = self.remove_extra_bridges();
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
let mut db_obj = self.db.lock().unwrap();
match ba_clone.add_spare_bucket(bucket, &mut db_obj) {
Ok(_) => (),
Err(e) => {
println!("Error: {:?}", e);
for bridge in bucket {
self.append_extra_bridges(bridge);
}
}
2023-08-28 19:33:19 -04:00
}
}
}
/// Push `bridge` onto the end of the `extra_bridges` pool.
pub fn append_extra_bridges(&self, bridge: BridgeLine) {
    self.extra_bridges.lock().unwrap().push(bridge);
}
/// Take up to `MAX_BRIDGES_PER_BUCKET` bridges off the end of the
/// `extra_bridges` pool and return them as a bucket.
///
/// Slots for which the pool has no bridge left keep `BridgeLine::default()`.
pub fn remove_extra_bridges(&self) -> [BridgeLine; MAX_BRIDGES_PER_BUCKET] {
    let mut pool = self.extra_bridges.lock().unwrap();
    let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
    for slot in &mut bucket {
        match pool.pop() {
            Some(extra) => *slot = extra,
            // Pool exhausted: the remaining slots stay at their default value.
            None => break,
        }
    }
    bucket
}
/// Discard the most recently added bridge from the `extra_bridges` pool.
///
/// Does nothing when the pool is empty.
pub fn remove_single_bridge(&self) {
    // `pop` is a no-op on an empty vector, whereas `remove(len - 1)` would
    // underflow and panic.
    self.extra_bridges.lock().unwrap().pop();
}
/// Remember `bridge` in `to_be_replaced_bridges` so it can be replaced later.
pub fn new_to_be_replaced_bridge(&self, bridge: BridgeLine) {
    self.to_be_replaced_bridges.lock().unwrap().push(bridge);
}
2023-05-08 20:30:30 -04:00
/// Hand the current `extra_bridges` pool to the bridge authority's
/// `allocate_bridges`, together with the bridge database.
pub fn allocate_leftover_bridges(&self) {
    // Lock order: authority, then database, then the extra-bridge pool.
    let mut authority = self.ba.lock().unwrap();
    let mut database = self.db.lock().unwrap();
    let mut leftovers = self.extra_bridges.lock().unwrap();
    authority.allocate_bridges(&mut leftovers, &mut database);
}
/// Add `bucket` to the bridge authority as an open-invitation bucket.
///
/// On failure the error is printed and the bucket's bridges are returned to
/// the `extra_bridges` pool.
pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
    let mut authority = self.ba.lock().unwrap();
    let mut database = self.db.lock().unwrap();
    if let Err(e) = authority.add_openinv_bridges(bucket, &mut database) {
        println!("Error: {:?}", e);
        for bridge in bucket {
            self.append_extra_bridges(bridge);
        }
    }
}
/// Add `bucket` to the bridge authority as a spare bucket.
///
/// On failure the error is printed and the bucket's bridges are returned to
/// the `extra_bridges` pool.
pub fn add_spare_bucket(&self, bucket: [BridgeLine; 3]) {
    let mut authority = self.ba.lock().unwrap();
    let mut database = self.db.lock().unwrap();
    if let Err(e) = authority.add_spare_bucket(bucket, &mut database) {
        println!("Error: {:?}", e);
        for bridge in bucket {
            self.append_extra_bridges(bridge);
        }
    }
}
2023-08-28 19:33:19 -04:00
/// Attempt to remove a bridge that is failing tests and replace it with a
/// bridge from the available bridges or from a spare bucket.
///
/// The last entry of `extra_bridges` (if any) is offered to
/// `BridgeAuth::bridge_replace` as the replacement candidate. When the
/// replacement succeeds, that entry is removed from the pool.
///
/// Returns the `ReplaceSuccess` reported by `bridge_replace`.
pub fn replace_with_new(&self, bridgeline: BridgeLine) -> lox_library::ReplaceSuccess {
    let mut ba_obj = self.ba.lock().unwrap();
    let mut eb_obj = self.extra_bridges.lock().unwrap();
    let result = ba_obj.bridge_replace(&bridgeline, eb_obj.last());
    // `.last()` doesn't remove the candidate, so drop it once it has been
    // used. Pop through the guard we already hold: locking `extra_bridges`
    // again (as `remove_single_bridge` does) would deadlock. `pop` on an
    // empty pool is a no-op, which covers the "no candidate" case.
    if result == lox_library::ReplaceSuccess::Replaced {
        eb_obj.pop();
    }
    result
}
/* TODO: Uncomment when bridge blocking is finalized
2023-07-19 10:37:09 -04:00
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
ba_obj.bridge_unreachable(&bridgeline, &mut db_obj)
}
*/
/// Forward `bridgeline` to the bridge authority's `bridge_update` and return
/// its result.
pub fn update_bridge(&self, bridgeline: BridgeLine) -> bool {
    self.ba.lock().unwrap().bridge_update(&bridgeline)
}
/// For testing only: manually advance the bridge authority's day by `num`
/// days and print the resulting date.
#[cfg(test)]
pub fn advance_days_test(&self, num: u16) {
    let mut authority = self.ba.lock().unwrap();
    authority.advance_days(num); // FOR TESTING ONLY
    let today = authority.today();
    println!("Today's date according to server: {}", today);
}
/// Return an owned copy of the bridge authority's encrypted bridge table.
pub fn encrypt_table(&self) -> HashMap<u32, EncryptedBucket> {
    let mut authority = self.ba.lock().unwrap();
    let table = authority.enc_bridge_table();
    table.clone()
}
/// Collect the bridge authority's public keys (to serialize), in the order:
/// lox, migration, migration key, reachability, invitation.
fn pubkeys(&self) -> Vec<IssuerPubKey> {
    let authority = self.ba.lock().unwrap();
    Vec::from([
        authority.lox_pub.clone(),
        authority.migration_pub.clone(),
        authority.migrationkey_pub.clone(),
        authority.reachability_pub.clone(),
        authority.invitation_pub.clone(),
    ])
}
/// Request an open invitation from the bridge database.
///
/// Increments the `k_reset_count` metric when the database's `current_k` is 1
/// after the invitation was issued. Errors from `invite` are passed through.
fn gen_invite(&self) -> Result<lox_utils::Invite, ExceededMaxBridgesError> {
    let mut database = self.db.lock().unwrap();
    let invite = database.invite()?;
    if database.current_k == 1 {
        self.metrics.k_reset_count.inc();
    }
    Ok(lox_utils::Invite { invite })
}
/// Lock the bridge authority and forward `req` to `handle_open_invite`.
fn open_inv(&self, req: open_invite::Request) -> Result<open_invite::Response, ProofError> {
    self.ba.lock().unwrap().handle_open_invite(req)
}
/// Lock the bridge authority and forward `req` to `handle_trust_promotion`.
fn trust_promo(
    &self,
    req: trust_promotion::Request,
) -> Result<trust_promotion::Response, ProofError> {
    self.ba.lock().unwrap().handle_trust_promotion(req)
}
/// Lock the bridge authority and forward `req` to `handle_migration`.
fn trust_migration(&self, req: migration::Request) -> Result<migration::Response, ProofError> {
    self.ba.lock().unwrap().handle_migration(req)
}
/// Lock the bridge authority and forward `req` to `handle_level_up`.
fn level_up(&self, req: level_up::Request) -> Result<level_up::Response, ProofError> {
    self.ba.lock().unwrap().handle_level_up(req)
}
/// Lock the bridge authority and forward `req` to `handle_issue_invite`.
fn issue_invite(
    &self,
    req: issue_invite::Request,
) -> Result<issue_invite::Response, ProofError> {
    self.ba.lock().unwrap().handle_issue_invite(req)
}
/// Lock the bridge authority and forward `req` to `handle_redeem_invite`.
fn redeem_invite(
    &self,
    req: redeem_invite::Request,
) -> Result<redeem_invite::Response, ProofError> {
    self.ba.lock().unwrap().handle_redeem_invite(req)
}
/// Lock the bridge authority and forward `req` to `handle_check_blockage`.
fn check_blockage(
    &self,
    req: check_blockage::Request,
) -> Result<check_blockage::Response, ProofError> {
    self.ba.lock().unwrap().handle_check_blockage(req)
}
/// Lock the bridge authority and forward `req` to `handle_blockage_migration`.
fn blockage_migration(
    &self,
    req: blockage_migration::Request,
) -> Result<blockage_migration::Response, ProofError> {
    self.ba.lock().unwrap().handle_blockage_migration(req)
}
// Generate and return an open invitation token
pub fn generate_invite(self) -> Response<Body> {
2023-10-31 11:15:47 -04:00
self.metrics.invites_requested.inc();
let invite = self.gen_invite();
match invite {
Ok(invite) => match serde_json::to_string(&invite) {
Ok(resp) => prepare_header(resp),
Err(e) => {
println!("Error parsing Invite to JSON");
prepare_error_header(e.to_string())
}
},
Err(e) => {
println!("Error parsing Invite to JSON");
prepare_error_header(e.to_string())
}
}
}
// Return the serialized encrypted bridge table
pub fn send_reachability_cred(self) -> Response<Body> {
let enc_table = self.encrypt_table();
let etable = lox_utils::EncBridgeTable { etable: enc_table };
match serde_json::to_string(&etable) {
Ok(resp) => prepare_header(resp),
Err(e) => {
println!("Error parsing encrypted bridgetable to JSON");
prepare_error_header(e.to_string())
}
}
}
// Return the serialized pubkeys for the Bridge Authority
pub fn send_keys(self) -> Response<Body> {
let pubkeys = self.pubkeys();
match serde_json::to_string(&pubkeys) {
Ok(resp) => prepare_header(resp),
Err(e) => {
println!("Error parsing Pubkeys to JSON");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_open_cred(self, request: Bytes) -> Response<Body> {
let req = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.open_inv(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.open_inv_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Open Invitation request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_trust_promo(self, request: Bytes) -> Response<Body> {
let req: trust_promotion::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.trust_promo(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.trust_promo_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Trust Promotion request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_trust_migration(self, request: Bytes) -> Response<Body> {
let req: migration::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.trust_migration(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.trust_mig_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Trust Migration request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_level_up(self, request: Bytes) -> Response<Body> {
let req: level_up::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.level_up(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.level_up_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Level up request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_issue_invite(self, request: Bytes) -> Response<Body> {
let req: issue_invite::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.issue_invite(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.issue_invite_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Issue invite request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_redeem_invite(self, request: Bytes) -> Response<Body> {
let req: redeem_invite::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.redeem_invite(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.redeem_invite_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Redeem Invite request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_check_blockage(self, request: Bytes) -> Response<Body> {
let req: check_blockage::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.check_blockage(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.check_blockage_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Check blockage request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
pub fn verify_and_send_blockage_migration(self, request: Bytes) -> Response<Body> {
let req: blockage_migration::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => return prepare_error_header(e.to_string()),
};
match self.blockage_migration(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
2023-10-30 12:54:59 -04:00
self.metrics.blockage_migration_count.inc();
prepare_header(response)
}
Err(e) => {
println!("Invalid Blockage Migration request, Proof Error");
prepare_error_header(e.to_string())
}
}
}
}
/// Wrap `response` in a response whose `Access-Control-Allow-Origin` header
/// is set to `*`.
fn prepare_header(response: String) -> Response<Body> {
    let mut reply = Response::new(Body::from(response));
    let any_origin = HeaderValue::from_static("*");
    reply
        .headers_mut()
        .insert("Access-Control-Allow-Origin", any_origin);
    reply
}
fn prepare_error_header(error: String) -> Response<Body> {
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(error))
.unwrap()
}