Add syncing functionality
This commit is contained in:
parent
6cc8033051
commit
25add69c4d
|
@ -3,7 +3,7 @@
|
|||
"db_path": "lox_db"
|
||||
|
||||
},
|
||||
"bridge_allocation": {
|
||||
"bridge_config": {
|
||||
"percent_spares": 50
|
||||
},
|
||||
"rtype": {
|
||||
|
|
|
@ -7,7 +7,7 @@ use sled::IVec;
|
|||
|
||||
pub struct DB {
|
||||
db: sled::Db,
|
||||
}
|
||||
}
|
||||
|
||||
impl DB {
|
||||
pub fn write_context(&mut self, context: lox_context::LoxServerContext) {
|
||||
|
|
|
@ -50,6 +50,79 @@ impl LoxServerContext {
|
|||
}
|
||||
}
|
||||
|
||||
/* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not
|
||||
yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not
|
||||
currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something
|
||||
like the following:
|
||||
println!("BridgeLine to be removed: {:?}", bridgeline);
|
||||
let res = context.add_unreachable(bridgeline);
|
||||
if res {
|
||||
println!(
|
||||
"BridgeLine successfully marked unreachable: {:?}",
|
||||
bridgeline
|
||||
);
|
||||
} else {
|
||||
println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
|
||||
//TODO probably do something else here
|
||||
}
|
||||
*/
|
||||
|
||||
// Sync resources received from rdsys with the Lox bridgetable
|
||||
pub fn sync_with_bridgetable(&self, functional: Vec<BridgeLine>, failing: Vec<BridgeLine>) {
|
||||
// Check if the resource is already in the Lox bridgetable. If it is, it's probably fine
|
||||
// to replace the existing resource with the incoming one to account for changes,
|
||||
// unless we want to track the number of changes on the lox side?
|
||||
for bridge in functional {
|
||||
let res = self.update_bridge(bridge);
|
||||
if res {
|
||||
println!(
|
||||
"BridgeLine {:?} successfully updated.",
|
||||
bridge.uid_fingerprint
|
||||
);
|
||||
// Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later
|
||||
} else {
|
||||
println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint);
|
||||
self.append_extra_bridges(bridge);
|
||||
}
|
||||
}
|
||||
// Next, handle the failing bridges. If resource last passed tests 3 hours ago, it should be replaced
|
||||
// with a working resource and be removed from the bridgetable.
|
||||
for bridge in failing {
|
||||
let res = self.replace_with_new(bridge);
|
||||
if res == lox_library::ReplaceSuccess::Replaced {
|
||||
println!(
|
||||
"BridgeLine {:?} successfully replaced.",
|
||||
bridge.uid_fingerprint
|
||||
);
|
||||
} else if res == lox_library::ReplaceSuccess::NotReplaced {
|
||||
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
|
||||
// again to replace at the next update (nothing changes in the Lox Authority)
|
||||
println!(
|
||||
"BridgeLine {:?} NOT replaced, saved for next update!",
|
||||
bridge.uid_fingerprint
|
||||
);
|
||||
self.new_to_be_replaced_bridge(bridge);
|
||||
} else {
|
||||
// NotFound
|
||||
assert!(
|
||||
res == lox_library::ReplaceSuccess::NotFound,
|
||||
"ReplaceSuccess incorrectly set"
|
||||
);
|
||||
println!(
|
||||
"BridgeLine {:?} no longer in bridge table.",
|
||||
bridge.uid_fingerprint
|
||||
);
|
||||
}
|
||||
}
|
||||
// Finally, assign any extra_bridges to new buckets if there are enough
|
||||
while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET {
|
||||
let bucket = self.remove_extra_bridges();
|
||||
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
|
||||
// eventually also do some more fancy grouping of new resources, i.e., by type or region
|
||||
self.add_spare_bucket(bucket);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn append_extra_bridges(&self, bridge: BridgeLine) {
|
||||
let mut extra_bridges = self.extra_bridges.lock().unwrap();
|
||||
extra_bridges.push(bridge);
|
||||
|
@ -112,6 +185,8 @@ impl LoxServerContext {
|
|||
}
|
||||
}
|
||||
|
||||
// Attempt to remove a bridge that is failing tests and replace it with a bridge from the
|
||||
// available bridges or from a spare bucket
|
||||
pub fn replace_with_new(&self, bridgeline: BridgeLine) -> lox_library::ReplaceSuccess {
|
||||
let mut ba_obj = self.ba.lock().unwrap();
|
||||
let eb_obj = self.extra_bridges.lock().unwrap();
|
||||
|
|
|
@ -5,7 +5,6 @@ use hyper::{
|
|||
service::{make_service_fn, service_fn},
|
||||
Body, Request, Response, Server,
|
||||
};
|
||||
use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
|
||||
|
||||
use rdsys_backend::{proto::Resource, request_resources};
|
||||
use serde::Deserialize;
|
||||
|
@ -28,6 +27,8 @@ use tokio::{
|
|||
time::{interval, sleep},
|
||||
};
|
||||
|
||||
use crate::resource_parser::sort_for_parsing;
|
||||
|
||||
async fn shutdown_signal() {
|
||||
tokio::signal::ctrl_c()
|
||||
.await
|
||||
|
@ -55,7 +56,7 @@ struct Args {
|
|||
#[derive(Debug, Deserialize)]
|
||||
struct Config {
|
||||
db: DbConfig,
|
||||
bridge_allocation: BridgeConfig,
|
||||
bridge_config: BridgeConfig,
|
||||
rtype: ResourceInfo,
|
||||
}
|
||||
|
||||
|
@ -75,7 +76,7 @@ impl Default for DbConfig {
|
|||
}
|
||||
|
||||
// Config information for how bridges should be allocated to buckets
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Default, Deserialize)]
|
||||
pub struct BridgeConfig {
|
||||
// The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges)
|
||||
// that should be allocated as spare buckets
|
||||
|
@ -83,12 +84,6 @@ pub struct BridgeConfig {
|
|||
percent_spares: i32,
|
||||
}
|
||||
|
||||
impl Default for BridgeConfig {
|
||||
fn default() -> BridgeConfig {
|
||||
BridgeConfig { percent_spares: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResourceInfo {
|
||||
endpoint: String,
|
||||
|
@ -108,7 +103,7 @@ async fn rdsys_stream(
|
|||
) {
|
||||
tokio::select! {
|
||||
start_resource_request = rdsys_request(rtype, tx) => start_resource_request,
|
||||
_ = kill.recv() => {println!("Shut down rdsys request loop"); return},
|
||||
_ = kill.recv() => {println!("Shut down rdsys request loop")},
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -200,138 +195,10 @@ async fn context_manager(
|
|||
// period of time are removed.
|
||||
// If bridges are labelled as blocked_in, we should also handle blocking behaviour.
|
||||
} else {
|
||||
// for each resource, check if the resource fingerprint is failing tests, if it is check for how long
|
||||
// check if the resource is already in the Lox bridgetable
|
||||
// if it is, it's probably fine to remove or replace the existing resource with the incoming one
|
||||
// to account for changes unless we want to track the number of changes on the lox side?
|
||||
// that should be sufficient to keep it in sync
|
||||
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
|
||||
// eventually also do some more fancy grouping of new resources, i.e., by type or region
|
||||
/* for bridgeline in bridgelines {
|
||||
//context.populate_bridgetable(bridgelines. None);
|
||||
println!("What is the bridgeline: {:?}", bridgeline);
|
||||
if context.to_be_replaced_bridges.lock().unwrap().len() > 0 {
|
||||
println!("BridgeLine to be replaced: {:?}", bridgeline);
|
||||
let res = context.replace_with_new(bridgeline);
|
||||
if res == lox_library::ReplaceSuccess::NotFound {
|
||||
println!(
|
||||
"BridgeLine not found in bridge_table, already updated {:?}",
|
||||
bridgeline
|
||||
);
|
||||
} else if res == lox_library::ReplaceSuccess::Replaced {
|
||||
println!("BridgeLine successfully replaced: {:?}", bridgeline);
|
||||
} else {
|
||||
assert!(
|
||||
res == lox_library::ReplaceSuccess::NotReplaced,
|
||||
"ReplaceSuccess incorrectly set somehow"
|
||||
);
|
||||
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
|
||||
// again to replace at the next update (nothing changes in the Lox Authority)
|
||||
println!(
|
||||
"'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
|
||||
bridgeline
|
||||
);
|
||||
context.new_to_be_replaced_bridge(bridgeline);
|
||||
}
|
||||
} else if count < MAX_BRIDGES_PER_BUCKET {
|
||||
bucket[count] = bridgeline;
|
||||
count += 1;
|
||||
} else {
|
||||
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
|
||||
// eventually also do some more fancy grouping of new resources, i.e., by type or region
|
||||
context.add_openinv_bucket(bucket);
|
||||
count = 0;
|
||||
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
|
||||
}
|
||||
}
|
||||
// Handle the extra buckets that were not allocated already
|
||||
if count != 0 {
|
||||
for val in 0..count {
|
||||
if context.extra_bridges.lock().unwrap().len()
|
||||
< (MAX_BRIDGES_PER_BUCKET)
|
||||
{
|
||||
context.append_extra_bridges(bucket[val]);
|
||||
} else {
|
||||
bucket = context.remove_extra_bridges();
|
||||
context.add_spare_bucket(bucket);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
let bridgeline = parse_resource(resource);
|
||||
println!("BridgeLine to be changed: {:?}", bridgeline);
|
||||
let res = context.update_bridge(bridgeline);
|
||||
if res {
|
||||
println!("BridgeLine successfully updated: {:?}", bridgeline);
|
||||
} else {
|
||||
println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline);
|
||||
if context.extra_bridges.lock().unwrap().len() < 2 {
|
||||
context.append_extra_bridges(bridgeline);
|
||||
} else {
|
||||
let bucket = context.remove_extra_bridges();
|
||||
context.add_spare_bucket(bucket);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// gone resources are not the same as blocked resources.
|
||||
// Instead, these are bridges which have either failed to pass tests for some period
|
||||
// or have expired bridge descriptors. In both cases, the bridge is unusable, but this
|
||||
// is not likely due to censorship. Therefore, we replace gone resources with new resources
|
||||
// TODO: create a notion of blocked resources from information collected through various means:
|
||||
// https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035
|
||||
// If resource last passed tests 3 hours ago, it should be replaced with a working
|
||||
// resource and be removed from the bridgetable. If it has been gone for more than 7 hours,
|
||||
// we should stop trying to remove it from the bridge table and assume it has successfully been
|
||||
// removed already
|
||||
if resource.last_passed < (Utc::now() - chrono::Duration::hours(3))
|
||||
|| resource.last_passed
|
||||
> (Utc::now() - chrono::Duration::hours(7))
|
||||
{
|
||||
let bridgeline = parse_resource(resource);
|
||||
println!("BridgeLine to be replaced: {:?}", bridgeline);
|
||||
let res = context.replace_with_new(bridgeline);
|
||||
if res == lox_library::ReplaceSuccess::Replaced {
|
||||
println!(
|
||||
"BridgeLine successfully replaced: {:?}",
|
||||
bridgeline
|
||||
);
|
||||
} else if res == lox_library::ReplaceSuccess::NotReplaced {
|
||||
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
|
||||
// again to replace at the next update (nothing changes in the Lox Authority)
|
||||
println!(
|
||||
"'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
|
||||
bridgeline
|
||||
);
|
||||
context.new_to_be_replaced_bridge(bridgeline);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
/* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not
|
||||
yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not
|
||||
currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something
|
||||
like the following:
|
||||
println!("BridgeLine to be removed: {:?}", bridgeline);
|
||||
let res = context.add_unreachable(bridgeline);
|
||||
if res {
|
||||
println!(
|
||||
"BridgeLine successfully marked unreachable: {:?}",
|
||||
bridgeline
|
||||
);
|
||||
} else {
|
||||
println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
|
||||
//TODO probably do something else here
|
||||
}
|
||||
*/
|
||||
let (functional, failing) = sort_for_parsing(resources);
|
||||
context.sync_with_bridgetable(functional, failing);
|
||||
}
|
||||
// Handle any bridges that are leftover in the bridge authority from the sync
|
||||
context.allocate_leftover_bridges();
|
||||
context.encrypt_table();
|
||||
lox_db.write_context(context.clone());
|
||||
|
@ -407,7 +274,7 @@ async fn main() {
|
|||
let context_manager = spawn(async move {
|
||||
create_context_manager(
|
||||
config.db,
|
||||
config.bridge_allocation,
|
||||
config.bridge_config,
|
||||
args.roll_back_date,
|
||||
context_rx,
|
||||
kill_context,
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use chrono::{Duration, Utc};
|
||||
use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET};
|
||||
use rdsys_backend::proto::Resource;
|
||||
|
||||
pub const ACCEPTED_HOURS_OF_FAILURE: i64 = 3;
|
||||
|
||||
// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable
|
||||
pub fn parse_into_bridgelines(resources: Vec<Resource>) -> Vec<BridgeLine> {
|
||||
let mut bridgelines: Vec<BridgeLine> = Vec::new();
|
||||
|
@ -66,3 +69,113 @@ pub fn parse_into_buckets(
|
|||
}
|
||||
(buckets, leftovers)
|
||||
}
|
||||
|
||||
// Sort Resources into those that are functional and those that are failing based on the last time
|
||||
// they were passing tests. Before passing them back to the calling function, they are parsed into
|
||||
// BridgeLines
|
||||
pub fn sort_for_parsing(resources: Vec<Resource>) -> (Vec<BridgeLine>, Vec<BridgeLine>) {
|
||||
let mut functional: Vec<Resource> = Vec::new();
|
||||
let mut failing: Vec<Resource> = Vec::new();
|
||||
for resource in resources {
|
||||
if resource.last_passed + Duration::hours(ACCEPTED_HOURS_OF_FAILURE) >= Utc::now() {
|
||||
functional.push(resource);
|
||||
} else {
|
||||
failing.push(resource);
|
||||
}
|
||||
}
|
||||
let functional_bridgelines = parse_into_bridgelines(functional);
|
||||
let failing_bridgelines = parse_into_bridgelines(failing);
|
||||
|
||||
(functional_bridgelines, failing_bridgelines)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rdsys_backend::proto::Resource;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use chrono::{Duration, Utc};
|
||||
|
||||
use super::sort_for_parsing;
|
||||
|
||||
pub fn make_resource(
|
||||
rtype: String,
|
||||
address: String,
|
||||
port: u16,
|
||||
fingerprint: String,
|
||||
last_passed: i64,
|
||||
) -> Resource {
|
||||
let mut flags = HashMap::new();
|
||||
flags.insert(String::from("fast"), true);
|
||||
flags.insert(String::from("stable"), true);
|
||||
let mut params = HashMap::new();
|
||||
params.insert(
|
||||
String::from("password"),
|
||||
String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"),
|
||||
);
|
||||
Resource {
|
||||
r#type: String::from(rtype),
|
||||
blocked_in: HashMap::new(),
|
||||
last_passed: Utc::now() - Duration::hours(last_passed),
|
||||
protocol: String::from("tcp"),
|
||||
address: String::from(address),
|
||||
port: port,
|
||||
fingerprint: String::from(fingerprint),
|
||||
or_addresses: None,
|
||||
distribution: String::from("https"),
|
||||
flags: Some(flags),
|
||||
params: Some(params),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sort_for_parsing() {
|
||||
let resource_one = make_resource(
|
||||
"scramblesuit".to_owned(),
|
||||
"123.456.789.100".to_owned(),
|
||||
3002,
|
||||
"BE84A97D02130470A1C77839954392BA979F7EE1".to_owned(),
|
||||
2,
|
||||
);
|
||||
let resource_two = make_resource(
|
||||
"https".to_owned(),
|
||||
"123.222.333.444".to_owned(),
|
||||
6002,
|
||||
"C56B9EF202130470A1C77839954392BA979F7FF9".to_owned(),
|
||||
5,
|
||||
);
|
||||
let resource_three = make_resource(
|
||||
"scramblesuit".to_owned(),
|
||||
"444.888.222.100".to_owned(),
|
||||
3042,
|
||||
"1A4C8BD902130470A1C77839954392BA979F7B46".to_owned(),
|
||||
4,
|
||||
);
|
||||
let resource_four = make_resource(
|
||||
"https".to_owned(),
|
||||
"555.444.212.100".to_owned(),
|
||||
8022,
|
||||
"FF024DC302130470A1C77839954392BA979F7AE2".to_owned(),
|
||||
3,
|
||||
);
|
||||
let resource_five = make_resource(
|
||||
"https".to_owned(),
|
||||
"234.111.212.100".to_owned(),
|
||||
10432,
|
||||
"7B4DE14CB2130470A1C77839954392BA979F7AE2".to_owned(),
|
||||
1,
|
||||
);
|
||||
let mut test_vec: Vec<Resource> = Vec::new();
|
||||
test_vec.push(resource_one);
|
||||
test_vec.push(resource_two);
|
||||
test_vec.push(resource_three);
|
||||
test_vec.push(resource_four);
|
||||
test_vec.push(resource_five);
|
||||
let (functional, failing) = sort_for_parsing(test_vec);
|
||||
assert!(
|
||||
functional.len() == 2,
|
||||
"There should be 2 functional bridges"
|
||||
);
|
||||
assert!(failing.len() == 3, "There should be 3 failing bridges");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -431,16 +431,11 @@ impl BridgeAuth {
|
|||
let reachable_bridges = self.bridge_table.reachable.clone();
|
||||
for reachable_bridge in reachable_bridges {
|
||||
if reachable_bridge.0.uid_fingerprint == bridge.uid_fingerprint {
|
||||
println!(
|
||||
"Bridge from table: {:?} has same IP and Port as bridge {:?}!",
|
||||
reachable_bridge.0, bridge
|
||||
);
|
||||
// Now we must remove the old bridge from the table and insert the new bridge in its place
|
||||
// i.e., in the same bucket and with the same permissions.
|
||||
let positions = self.bridge_table.reachable.get(&reachable_bridge.0);
|
||||
if let Some(v) = positions {
|
||||
for (bucketnum, offset) in v.iter() {
|
||||
println!("Bucket num: {:?} and offset: {:?}", bucketnum, offset);
|
||||
let mut bridgelines = match self.bridge_table.buckets.get(bucketnum) {
|
||||
Some(bridgelines) => *bridgelines,
|
||||
None => return res,
|
||||
|
@ -448,11 +443,10 @@ impl BridgeAuth {
|
|||
assert!(bridgelines[*offset] == reachable_bridge.0);
|
||||
bridgelines[*offset] = *bridge;
|
||||
self.bridge_table.buckets.insert(*bucketnum, bridgelines);
|
||||
let bridgelines = match self.bridge_table.buckets.get(bucketnum) {
|
||||
match self.bridge_table.buckets.get(bucketnum) {
|
||||
Some(bridgelines) => *bridgelines,
|
||||
None => return res,
|
||||
};
|
||||
assert!(bridgelines[*offset] != reachable_bridge.0);
|
||||
}
|
||||
res = true;
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue