Reconfigure distributor to handle static rdsys req

This commit is contained in:
onyinyang 2023-08-25 11:21:38 -04:00
parent 96cf4ab764
commit d4c54e969c
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
5 changed files with 80 additions and 88 deletions

1
Cargo.lock generated
View File

@ -1043,6 +1043,7 @@ dependencies = [
"lox_utils", "lox_utils",
"rand 0.8.5", "rand 0.8.5",
"rdsys_backend", "rdsys_backend",
"reqwest",
"serde", "serde",
"serde_json", "serde_json",
"serde_with", "serde_with",

View File

@ -19,6 +19,7 @@ futures = "0.3.28"
time = "0.3.29" time = "0.3.29"
tokio = { version = "1", features = ["full", "macros", "signal"] } tokio = { version = "1", features = ["full", "macros", "signal"] }
rand = "0.8.5" rand = "0.8.5"
reqwest = { version = "0.11", features = ["json", "stream"]}
serde = { version = "1.0", features = ["derive", "rc"] } serde = { version = "1.0", features = ["derive", "rc"] }
serde_with = "3.4.0" serde_with = "3.4.0"
zkp = "0.8.0" zkp = "0.8.0"

View File

@ -4,7 +4,7 @@
}, },
"rtype": { "rtype": {
"endpoint": "http://127.0.0.1:7100/resource-stream", "endpoint": "http://127.0.0.1:7100/resources",
"name": "https", "name": "https",
"token": "HttpsApiTokenPlaceholder", "token": "HttpsApiTokenPlaceholder",
"types": [ "types": [

View File

@ -9,7 +9,7 @@ use hyper::{
}; };
use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use rdsys_backend::{proto::ResourceDiff, proto::Resource, request_resources, start_stream}; use rdsys_backend::{proto::Resource, proto::ResourceDiff, request_resources, start_stream};
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
@ -27,7 +27,7 @@ use resource_parser::parse_resource;
use tokio::{ use tokio::{
signal, spawn, signal, spawn,
sync::{broadcast, mpsc, oneshot}, sync::{broadcast, mpsc, oneshot},
time::sleep, time::{interval, sleep},
}; };
async fn shutdown_signal() { async fn shutdown_signal() {
@ -89,29 +89,36 @@ struct ResourceInfo {
// TODO: ensure this stream gracefully shutdowns on the ctrl_c command. // TODO: ensure this stream gracefully shutdowns on the ctrl_c command.
async fn rdsys_stream( async fn rdsys_stream(
rtype: ResourceInfo, rtype: ResourceInfo,
tx: mpsc::Sender<ResourceDiff>, tx: mpsc::Sender<Vec<Resource>>,
mut kill: broadcast::Receiver<()>, mut kill: broadcast::Receiver<()>,
) { ) {
let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) tokio::select! {
.await start_resource_request = rdsys_request(rtype, tx) => start_resource_request,
.expect("rdsys stream initialization failed. Start rdsys or check config.json"); _ = kill.recv() => {println!("Shut down rdsys request loop"); return},
loop {
tokio::select! {
res = rstream.next() => {
match res {
Some(diff) => tx.send(diff).await.unwrap(),
None => return,
}
},
_ = kill.recv() => {println!("Shut down rdsys stream"); return},
} }
}
async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender<Vec<Resource>>) {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
let resources = request_resources(
rtype.endpoint.clone(),
rtype.name.clone(),
rtype.token.clone(),
rtype.types.clone(),
)
.await
.unwrap();
tx.send(resources).await.unwrap();
sleep(Duration::from_secs(30)).await;
} }
} }
async fn rdsys_bridge_parser( async fn rdsys_bridge_parser(
rdsys_tx: mpsc::Sender<Command>, rdsys_tx: mpsc::Sender<Command>,
rx: mpsc::Receiver<ResourceDiff>, rx: mpsc::Receiver<Vec<Resource>>,
mut kill: broadcast::Receiver<()>, mut kill: broadcast::Receiver<()>,
) { ) {
tokio::select! { tokio::select! {
@ -122,10 +129,10 @@ async fn rdsys_bridge_parser(
// Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the // Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the
// Context Manager to be parsed and added to the BridgeDB // Context Manager to be parsed and added to the BridgeDB
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceDiff>) { async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<Vec<Resource>>) {
loop { loop {
let resourcediff = rx.recv().await.unwrap(); let resources = rx.recv().await.unwrap();
let cmd = Command::Rdsys { resourcediff }; let cmd = Command::Rdsys { resources };
rdsys_tx.send(cmd).await.unwrap(); rdsys_tx.send(cmd).await.unwrap();
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
} }
@ -161,71 +168,58 @@ async fn context_manager(
while let Some(cmd) = context_rx.recv().await { while let Some(cmd) = context_rx.recv().await {
use Command::*; use Command::*;
match cmd { match cmd {
Rdsys { resourcediff } => { Rdsys { resources } => {
if let Some(new_resources) = resourcediff.new { let mut count = 0;
let mut count = 0; let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; for resource in resources {
for pt in new_resources { let bridgeline = parse_resource(resource);
println!("A NEW RESOURCE: {:?}", pt); println!("What is the bridgeline: {:?}", bridgeline);
if let Some(resources) = pt.1 { if context.to_be_replaced_bridges.lock().unwrap().len() > 0 {
for resource in resources { println!("BridgeLine to be replaced: {:?}", bridgeline);
let bridgeline = parse_resource(resource); let res = context.replace_with_new(bridgeline);
println!("What is the bridgeline: {:?}", bridgeline); if res == lox_library::ReplaceSuccess::NotFound {
if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { println!(
println!("BridgeLine to be replaced: {:?}", bridgeline); "BridgeLine not found in bridge_table, already updated {:?}",
let res = context.replace_with_new(bridgeline); bridgeline
if res == lox_library::ReplaceSuccess::NotFound { );
println!( } else if res == lox_library::ReplaceSuccess::Replaced {
"BridgeLine not found in bridge_table, already updated {:?}", println!("BridgeLine successfully replaced: {:?}", bridgeline);
bridgeline } else {
); assert!(
} else if res == lox_library::ReplaceSuccess::Replaced { res == lox_library::ReplaceSuccess::NotReplaced,
println!( "ReplaceSuccess incorrectly set somehow"
"BridgeLine successfully replaced: {:?}", );
bridgeline // Add the bridge to the list of to_be_replaced bridges in the Lox context and try
); // again to replace at the next update (nothing changes in the Lox Authority)
} else { println!(
assert!( "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
res == lox_library::ReplaceSuccess::NotReplaced, bridgeline
"ReplaceSuccess incorrectly set somehow" );
); context.new_to_be_replaced_bridge(bridgeline);
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline);
context.new_to_be_replaced_bridge(bridgeline);
}
} else if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline;
count += 1;
} else {
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
context.add_openinv_bucket(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
}
}
} }
} else if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline;
count += 1;
} else {
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
context.add_openinv_bucket(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
} }
// Handle the extra buckets that were not allocated already }
if count != 0 { // Handle the extra buckets that were not allocated already
for val in 0..count { if count != 0 {
if context.extra_bridges.lock().unwrap().len() for val in 0..count {
< (MAX_BRIDGES_PER_BUCKET) if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET) {
{ context.append_extra_bridges(bucket[val]);
context.append_extra_bridges(bucket[val]); } else {
} else { bucket = context.remove_extra_bridges();
bucket = context.remove_extra_bridges(); context.add_spare_bucket(bucket);
context.add_spare_bucket(bucket);
}
} }
} }
} }
if let Some(changed_resources) = resourcediff.changed { /*
for pt in changed_resources {
println!("A NEW CHANGED RESOURCE: {:?}", pt);
if let Some(resources) = pt.1 {
for resource in resources {
let bridgeline = parse_resource(resource); let bridgeline = parse_resource(resource);
println!("BridgeLine to be changed: {:?}", bridgeline); println!("BridgeLine to be changed: {:?}", bridgeline);
let res = context.update_bridge(bridgeline); let res = context.update_bridge(bridgeline);
@ -250,11 +244,6 @@ async fn context_manager(
// is not likely due to censorship. Therefore, we replace gone resources with new resources // is not likely due to censorship. Therefore, we replace gone resources with new resources
// TODO: create a notion of blocked resources from information collected through various means: // TODO: create a notion of blocked resources from information collected through various means:
// https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035
if let Some(gone_resources) = resourcediff.gone {
for pt in gone_resources {
println!("A NEW GONE RESOURCE: {:?}", pt);
if let Some(resources) = pt.1 {
for resource in resources {
// If resource last passed tests 3 hours ago, it should be replaced with a working // If resource last passed tests 3 hours ago, it should be replaced with a working
// resource and be removed from the bridgetable. If it has been gone for more than 7 hours, // resource and be removed from the bridgetable. If it has been gone for more than 7 hours,
// we should stop trying to remove it from the bridge table and assume it has successfully been // we should stop trying to remove it from the bridge table and assume it has successfully been
@ -285,6 +274,7 @@ async fn context_manager(
} }
} }
} }
*/
/* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not
yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not
currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something
@ -328,7 +318,7 @@ async fn context_manager(
#[derive(Debug)] #[derive(Debug)]
enum Command { enum Command {
Rdsys { Rdsys {
resourcediff: ResourceDiff, resources: Vec<Resource>,
}, },
Request { Request {
req: Request<Body>, req: Request<Body>,

View File

@ -298,7 +298,7 @@ pub async fn request_resources( api_endpoint: String,
println!("Success? {:?}", response); println!("Success? {:?}", response);
match response.status() { match response.status() {
reqwest::StatusCode::OK => { reqwest::StatusCode::OK => {
fetched_resources = match response.json::<Vec<proto::Resource>>().await { fetched_resources = match dbg!(response.json::<Vec<proto::Resource>>().await) {
Ok(fetched_resources) => Ok(fetched_resources), Ok(fetched_resources) => Ok(fetched_resources),
Err(e) => Err(Error::Reqwest(e)), Err(e) => Err(Error::Reqwest(e)),
}; };