Reconfiguring lox distributor handling of rdsys resources

This commit is contained in:
onyinyang 2023-08-28 14:14:41 -04:00
parent be0d026fe8
commit 6cc8033051
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
5 changed files with 231 additions and 142 deletions

View File

@ -2,6 +2,9 @@
"db": {
"db_path": "lox_db"
},
"bridge_allocation": {
"percent_spares": 50
},
"rtype": {
"endpoint": "http://127.0.0.1:7100/resources",
@ -11,5 +14,5 @@
"obfs2",
"scramblesuit"
]
}
}
}

View File

@ -31,12 +31,24 @@ impl LoxServerContext {
}
// Populate an empty bridgetable for the first time
/* pub fn populate_bridgetable(&self, bridgelines: Vec<BridgeLine>, percent_spares: Option<usize>) {
if Some(percent_spares) {
let partition: usize = bridgelines.len()*percent_spares/100;
pub fn populate_bridgetable(
&self,
buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>,
percent_spares: i32,
) {
let mut partition: i32 = 0;
if percent_spares != 0 {
partition = buckets.len() as i32 * percent_spares / 100;
}
let (spares, open_invitations) = buckets.split_at(partition as usize);
for bucket in spares {
self.add_spare_bucket(*bucket)
}
} */
for bucket in open_invitations {
self.add_openinv_bucket(*bucket)
}
}
pub fn append_extra_bridges(&self, bridge: BridgeLine) {
let mut extra_bridges = self.extra_bridges.lock().unwrap();

View File

@ -20,7 +20,7 @@ mod lox_context;
mod request_handler;
use request_handler::handle;
mod resource_parser;
use resource_parser::parse_resources;
use resource_parser::{parse_into_bridgelines, parse_into_buckets};
use tokio::{
signal, spawn,
@ -55,10 +55,11 @@ struct Args {
#[derive(Debug, Deserialize)]
struct Config {
db: DbConfig,
bridge_allocation: BridgeConfig,
rtype: ResourceInfo,
}
// Path of the lox database
#[derive(Debug, Deserialize)]
pub struct DbConfig {
// The path for the lox_context database, default is "lox_db"
@ -73,6 +74,21 @@ impl Default for DbConfig {
}
}
// Config information for how bridges should be allocated to buckets
#[derive(Debug, Deserialize)]
pub struct BridgeConfig {
// The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges)
// that should be allocated as spare buckets
// This will be calculated as the floor of buckets.len() * percent_spares / 100
percent_spares: i32,
}
impl Default for BridgeConfig {
fn default() -> BridgeConfig {
BridgeConfig { percent_spares: 0 }
}
}
#[derive(Debug, Deserialize)]
struct ResourceInfo {
endpoint: String,
@ -138,12 +154,13 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<V
async fn create_context_manager(
db_config: DbConfig,
bridge_config: BridgeConfig,
roll_back_date: Option<String>,
context_rx: mpsc::Receiver<Command>,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
create_context = context_manager(db_config, roll_back_date, context_rx) => create_context,
create_context = context_manager(db_config, bridge_config, roll_back_date, context_rx) => create_context,
_ = kill.recv() => {println!("Shut down context_manager");},
}
}
@ -153,6 +170,7 @@ async fn create_context_manager(
// can be responded to with an updated BridgeDB state
async fn context_manager(
db_config: DbConfig,
bridge_config: BridgeConfig,
roll_back_date: Option<String>,
mut context_rx: mpsc::Receiver<Command>,
) {
@ -167,16 +185,29 @@ async fn context_manager(
use Command::*;
match cmd {
Rdsys { resources } => {
let mut count = 0;
let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
// If the bridgetable is not being loaded from an existing database, we will populate the
// bridgetable with all of the bridges received from rdsys
if context.bridgetable_is_empty() {
// otherwise, for each resource, check if the resource fingerprint is failing tests, if it is check for how long
let bridgelines = parse_into_bridgelines(resources);
let (buckets, leftovers) = parse_into_buckets(bridgelines);
for leftover in leftovers {
context.append_extra_bridges(leftover);
}
context.populate_bridgetable(buckets, bridge_config.percent_spares);
// otherwise, we need to sync the existing bridgetable with the resources we receive from
// rdsys and ensure that all functioning bridges are correctly placed in the bridgetable
// those that have changed are updated and those that have been failing tests for an extended
// period of time are removed.
// If bridges are labelled as blocked_in, we should also handle blocking behaviour.
} else {
// for each resource, check if the resource fingerprint is failing tests, if it is check for how long
// check if the resource is already in the Lox bridgetable
// if it is, it's probably fine to remove or replace the existing resource with the incoming one
// to account for changes unless we want to track the number of changes on the lox side?
// that should be sufficient to keep it in sync
let bridgelines = parse_resources(resources);
for bridgeline in bridgelines {
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
/* for bridgeline in bridgelines {
//context.populate_bridgetable(bridgelines. None);
println!("What is the bridgeline: {:?}", bridgeline);
if context.to_be_replaced_bridges.lock().unwrap().len() > 0 {
@ -227,7 +258,8 @@ async fn context_manager(
}
}
}
/*
let bridgeline = parse_resource(resource);
println!("BridgeLine to be changed: {:?}", bridgeline);
let res = context.update_bridge(bridgeline);
@ -299,6 +331,7 @@ async fn context_manager(
//TODO probably do something else here
}
*/
}
context.allocate_leftover_bridges();
context.encrypt_table();
lox_db.write_context(context.clone());
@ -372,7 +405,14 @@ async fn main() {
});
let context_manager = spawn(async move {
create_context_manager(config.db, args.roll_back_date, context_rx, kill_context).await
create_context_manager(
config.db,
config.bridge_allocation,
args.roll_back_date,
context_rx,
kill_context,
)
.await
});
let (tx, rx) = mpsc::channel(32);

View File

@ -1,7 +1,8 @@
use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES};
use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET};
use rdsys_backend::proto::Resource;
pub fn parse_resources(resources: Vec<Resource>) -> Vec<BridgeLine> {
// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable
pub fn parse_into_bridgelines(resources: Vec<Resource>) -> Vec<BridgeLine> {
let mut bridgelines: Vec<BridgeLine> = Vec::new();
for resource in resources {
let mut ip_bytes: [u8; 16] = [0; 16];
@ -32,3 +33,36 @@ pub fn parse_resources(resources: Vec<Resource>) -> Vec<BridgeLine> {
}
bridgelines
}
// Allocate each Bridgeline into a bucket that will later be allocated into spare buckets or open invitation buckets
// Any leftover buckets from total_bridgelines % MAX_BRIDGES_PER_BUCKET are returned in a separate Vec<Bridgeline>
// TODO: Improve this function to sort bridgelines into buckets in a more intentional manner. This could include
// sorting bridgelines with high bandwidth into buckets that are only distributed to more trusted users or sorting
// bridgelines by location
pub fn parse_into_buckets(
mut bridgelines: Vec<BridgeLine>,
) -> (Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>, Vec<BridgeLine>) {
let mut buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]> = Vec::new();
let mut count = 0;
let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
let mut leftovers: Vec<BridgeLine> = Vec::new();
for bridgeline in bridgelines.clone() {
println!("What is the bridgeline: {:?}", bridgeline);
if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline;
count += 1;
} else {
buckets.push(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
}
}
// Handle the extra buckets that were not allocated already
if count != 0 {
for _ in 0..count {
// Assumes that the unallocated bridgelines will be the last x of the passed bridgelines
leftovers.push(bridgelines.pop().unwrap());
}
}
(buckets, leftovers)
}

View File

@ -286,7 +286,7 @@ pub async fn request_resources( api_endpoint: String,
.await.unwrap();
match response.status() {
reqwest::StatusCode::OK => {
fetched_resources = match dbg!(response.json::<Vec<proto::Resource>>().await) {
fetched_resources = match response.json::<Vec<proto::Resource>>().await {
Ok(fetched_resources) => Ok(fetched_resources),
Err(e) => Err(Error::Reqwest(e)),
};