diff --git a/Cargo.lock b/Cargo.lock index 38c8b0c..7346aea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1043,6 +1043,7 @@ dependencies = [ "lox_utils", "rand 0.8.5", "rdsys_backend", + "reqwest", "serde", "serde_json", "serde_with", diff --git a/crates/lox-distributor/Cargo.toml b/crates/lox-distributor/Cargo.toml index 5aad2a3..ac48ac5 100644 --- a/crates/lox-distributor/Cargo.toml +++ b/crates/lox-distributor/Cargo.toml @@ -19,6 +19,7 @@ futures = "0.3.28" time = "0.3.29" tokio = { version = "1", features = ["full", "macros", "signal"] } rand = "0.8.5" +reqwest = { version = "0.11", features = ["json", "stream"]} serde = { version = "1.0", features = ["derive", "rc"] } serde_with = "3.4.0" zkp = "0.8.0" diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 444b52e..04a015e 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -4,7 +4,7 @@ }, "rtype": { - "endpoint": "http://127.0.0.1:7100/resource-stream", + "endpoint": "http://127.0.0.1:7100/resources", "name": "https", "token": "HttpsApiTokenPlaceholder", "types": [ diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 5d1d619..6f834c1 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -9,7 +9,7 @@ use hyper::{ }; use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; -use rdsys_backend::{proto::ResourceDiff, proto::Resource, request_resources, start_stream}; +use rdsys_backend::{proto::Resource, proto::ResourceDiff, request_resources, start_stream}; use serde::Deserialize; use std::{ @@ -27,7 +27,7 @@ use resource_parser::parse_resource; use tokio::{ signal, spawn, sync::{broadcast, mpsc, oneshot}, - time::sleep, + time::{interval, sleep}, }; async fn shutdown_signal() { @@ -89,29 +89,36 @@ struct ResourceInfo { // TODO: ensure this stream gracefully shutdowns on the ctrl_c command. async fn rdsys_stream( rtype: ResourceInfo, - tx: mpsc::Sender, + tx: mpsc::Sender>, mut kill: broadcast::Receiver<()>, ) { - let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types) - .await - .expect("rdsys stream initialization failed. Start rdsys or check config.json"); - loop { - tokio::select! { - res = rstream.next() => { - match res { - Some(diff) => tx.send(diff).await.unwrap(), - None => return, - } - }, - _ = kill.recv() => {println!("Shut down rdsys stream"); return}, + tokio::select! { + start_resource_request = rdsys_request(rtype, tx) => start_resource_request, + _ = kill.recv() => {println!("Shut down rdsys request loop"); return}, - } + } +} + +async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender>) { + let mut interval = interval(Duration::from_secs(30)); + loop { + interval.tick().await; + let resources = request_resources( + rtype.endpoint.clone(), + rtype.name.clone(), + rtype.token.clone(), + rtype.types.clone(), + ) + .await + .unwrap(); + tx.send(resources).await.unwrap(); + sleep(Duration::from_secs(30)).await; } } async fn rdsys_bridge_parser( rdsys_tx: mpsc::Sender, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, mut kill: broadcast::Receiver<()>, ) { tokio::select! { @@ -122,10 +129,10 @@ async fn rdsys_bridge_parser( // Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the // Context Manager to be parsed and added to the BridgeDB -async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver) { +async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver>) { loop { - let resourcediff = rx.recv().await.unwrap(); - let cmd = Command::Rdsys { resourcediff }; + let resources = rx.recv().await.unwrap(); + let cmd = Command::Rdsys { resources }; rdsys_tx.send(cmd).await.unwrap(); sleep(Duration::from_secs(1)).await; } @@ -161,71 +168,58 @@ async fn context_manager( while let Some(cmd) = context_rx.recv().await { use Command::*; match cmd { - Rdsys { resourcediff } => { - if let Some(new_resources) = resourcediff.new { - let mut count = 0; - let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; - for pt in new_resources { - println!("A NEW RESOURCE: {:?}", pt); - if let Some(resources) = pt.1 { - for resource in resources { - let bridgeline = parse_resource(resource); - println!("What is the bridgeline: {:?}", bridgeline); - if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { - println!("BridgeLine to be replaced: {:?}", bridgeline); - let res = context.replace_with_new(bridgeline); - if res == lox_library::ReplaceSuccess::NotFound { - println!( - "BridgeLine not found in bridge_table, already updated {:?}", - bridgeline - ); - } else if res == lox_library::ReplaceSuccess::Replaced { - println!( - "BridgeLine successfully replaced: {:?}", - bridgeline - ); - } else { - assert!( - res == lox_library::ReplaceSuccess::NotReplaced, - "ReplaceSuccess incorrectly set somehow" - ); - // Add the bridge to the list of to_be_replaced bridges in the Lox context and try - // again to replace at the next update (nothing changes in the Lox Authority) - println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline); - context.new_to_be_replaced_bridge(bridgeline); - } - } else if count < MAX_BRIDGES_PER_BUCKET { - bucket[count] = bridgeline; - count += 1; - } else { - // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, - // eventually also do some more fancy grouping of new resources, i.e., by type or region - context.add_openinv_bucket(bucket); - count = 0; - bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; - } - } + Rdsys { resources } => { + let mut count = 0; + let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; + for resource in resources { + let bridgeline = parse_resource(resource); + println!("What is the bridgeline: {:?}", bridgeline); + if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { + println!("BridgeLine to be replaced: {:?}", bridgeline); + let res = context.replace_with_new(bridgeline); + if res == lox_library::ReplaceSuccess::NotFound { + println!( + "BridgeLine not found in bridge_table, already updated {:?}", + bridgeline + ); + } else if res == lox_library::ReplaceSuccess::Replaced { + println!("BridgeLine successfully replaced: {:?}", bridgeline); + } else { + assert!( + res == lox_library::ReplaceSuccess::NotReplaced, + "ReplaceSuccess incorrectly set somehow" + ); + // Add the bridge to the list of to_be_replaced bridges in the Lox context and try + // again to replace at the next update (nothing changes in the Lox Authority) + println!( + "'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", + bridgeline + ); + context.new_to_be_replaced_bridge(bridgeline); } + } else if count < MAX_BRIDGES_PER_BUCKET { + bucket[count] = bridgeline; + count += 1; + } else { + // TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket, + // eventually also do some more fancy grouping of new resources, i.e., by type or region + context.add_openinv_bucket(bucket); + count = 0; + bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET]; } - // Handle the extra buckets that were not allocated already - if count != 0 { - for val in 0..count { - if context.extra_bridges.lock().unwrap().len() - < (MAX_BRIDGES_PER_BUCKET) - { - context.append_extra_bridges(bucket[val]); - } else { - bucket = context.remove_extra_bridges(); - context.add_spare_bucket(bucket); - } + } + // Handle the extra buckets that were not allocated already + if count != 0 { + for val in 0..count { + if context.extra_bridges.lock().unwrap().len() < (MAX_BRIDGES_PER_BUCKET) { + context.append_extra_bridges(bucket[val]); + } else { + bucket = context.remove_extra_bridges(); + context.add_spare_bucket(bucket); } } } - if let Some(changed_resources) = resourcediff.changed { - for pt in changed_resources { - println!("A NEW CHANGED RESOURCE: {:?}", pt); - if let Some(resources) = pt.1 { - for resource in resources { + /* let bridgeline = parse_resource(resource); println!("BridgeLine to be changed: {:?}", bridgeline); let res = context.update_bridge(bridgeline); @@ -250,11 +244,6 @@ async fn context_manager( // is not likely due to censorship. Therefore, we replace gone resources with new resources // TODO: create a notion of blocked resources from information collected through various means: // https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035 - if let Some(gone_resources) = resourcediff.gone { - for pt in gone_resources { - println!("A NEW GONE RESOURCE: {:?}", pt); - if let Some(resources) = pt.1 { - for resource in resources { // If resource last passed tests 3 hours ago, it should be replaced with a working // resource and be removed from the bridgetable. If it has been gone for more than 7 hours, // we should stop trying to remove it from the bridge table and assume it has successfully been @@ -285,6 +274,7 @@ async fn context_manager( } } } + */ /* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something @@ -328,7 +318,7 @@ async fn context_manager( #[derive(Debug)] enum Command { Rdsys { - resourcediff: ResourceDiff, + resources: Vec, }, Request { req: Request, diff --git a/crates/rdsys-backend-api/src/lib.rs b/crates/rdsys-backend-api/src/lib.rs index 8e2c8eb..00cc4aa 100644 --- a/crates/rdsys-backend-api/src/lib.rs +++ b/crates/rdsys-backend-api/src/lib.rs @@ -298,7 +298,7 @@ pub async fn request_resources( api_endpoint: String, println!("Success? {:?}", response); match response.status() { reqwest::StatusCode::OK => { - fetched_resources = match response.json::>().await { + fetched_resources = match dbg!(response.json::>().await) { Ok(fetched_resources) => Ok(fetched_resources), Err(e) => Err(Error::Reqwest(e)), };