From 7a075d42ff83a68243d2b97103ce6e501d400936 Mon Sep 17 00:00:00 2001
From: Vecna
Date: Fri, 24 May 2024 22:55:29 -0400
Subject: [PATCH] Add simulation binary code, move server code to main

---
 Cargo.toml               |   4 +
 src/bin/server.rs        | 333 --------------------------------------
 src/bin/simulation.rs    | 178 +++++++++++++++++++++
 src/main.rs              | 334 ++++++++++++++++++++++++++++++++++++++-
 src/simulation/bridge.rs |   2 +-
 src/simulation/censor.rs |   2 +-
 src/simulation/state.rs  |   5 +-
 src/simulation/user.rs   |  43 ++---
 8 files changed, 540 insertions(+), 361 deletions(-)
 delete mode 100644 src/bin/server.rs
 create mode 100644 src/bin/simulation.rs

diff --git a/Cargo.toml b/Cargo.toml
index a80f507..22fd969 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -46,3 +46,7 @@ faketime = "0.2"
 
 [features]
 simulation = ["faketime", "lox_cli"]
+
+[[bin]]
+name = "simulation"
+required-features = ["simulation"]
diff --git a/src/bin/server.rs b/src/bin/server.rs
deleted file mode 100644
index 9083443..0000000
--- a/src/bin/server.rs
+++ /dev/null
@@ -1,333 +0,0 @@
-use troll_patrol::{request_handler::*, *};
-
-use clap::Parser;
-use futures::future;
-use futures::join;
-use hyper::{
-    server::conn::AddrStream,
-    service::{make_service_fn, service_fn},
-    Body, Request, Response, Server,
-};
-use serde::Deserialize;
-use sled::Db;
-use std::{
-    collections::{BTreeMap, HashMap, HashSet},
-    convert::Infallible,
-    fs::File,
-    io::BufReader,
-    net::SocketAddr,
-    path::PathBuf,
-    time::Duration,
-};
-use tokio::{
-    signal, spawn,
-    sync::{broadcast, mpsc, oneshot},
-    time::sleep,
-};
-#[cfg(not(features = "simulation"))]
-use tokio_cron::{Job, Scheduler};
-
-async fn shutdown_signal() {
-    tokio::signal::ctrl_c()
-        .await
-        .expect("failed to listen for ctrl+c signal");
-    println!("Shut down Troll Patrol Server");
-}
-
-#[derive(Parser, Debug)]
-#[command(author, version, about, long_about = None)]
-struct Args {
-    /// Name/path of the configuration file
-    #[arg(short, long, default_value = "config.json")]
-    config: PathBuf,
-}
-
-#[derive(Debug, Deserialize)]
-pub struct Config {
-    pub db: DbConfig,
-    // map of distributor name to IP:port to contact it
-    pub distributors: BTreeMap<BridgeDistributor, String>,
-    extra_infos_base_url: String,
-
-    // confidence required to consider a bridge blocked
-    confidence: f64,
-
-    // block open-entry bridges if they get more negative reports than this
-    max_threshold: u32,
-
-    // block open-entry bridges if they get more negative reports than
-    // scaling_factor * bridge_ips
-    scaling_factor: f64,
-
-    // minimum number of historical days for statistical analysis
-    min_historical_days: u32,
-
-    // maximum number of historical days to consider in historical analysis
-    max_historical_days: u32,
-
-    //require_bridge_token: bool,
-    port: u16,
-    updater_port: u16,
-    updater_schedule: String,
-}
-
-#[derive(Debug, Deserialize)]
-pub struct DbConfig {
-    // The path for the server database, default is "server_db"
-    pub db_path: String,
-}
-
-impl Default for DbConfig {
-    fn default() -> DbConfig {
-        DbConfig {
-            db_path: "server_db".to_owned(),
-        }
-    }
-}
-
-async fn update_daily_info(
-    db: &Db,
-    distributors: &BTreeMap<BridgeDistributor, String>,
-    extra_infos_base_url: &str,
-    confidence: f64,
-    max_threshold: u32,
-    scaling_factor: f64,
-    min_historical_days: u32,
-    max_historical_days: u32,
-) -> HashMap<[u8; 20], HashSet<String>> {
-    update_extra_infos(&db, &extra_infos_base_url)
-        .await
-        .unwrap();
-    update_negative_reports(&db, &distributors).await;
-    update_positive_reports(&db, &distributors).await;
-    let new_blockages = guess_blockages(
-        &db,
-        &analysis::NormalAnalyzer::new(max_threshold, scaling_factor),
-        confidence,
-        min_historical_days,
-        max_historical_days,
-    );
-    report_blockages(&distributors, new_blockages.clone()).await;
-
-    // Generate tomorrow's key if we don't already have it
-    new_negative_report_key(&db, get_date() + 1);
-
-    // Return new detected blockages
-    new_blockages
-}
-
-/*
-async fn run_updater(updater_tx: mpsc::Sender<Command>) {
-    updater_tx.send(Command::Update {
-
-    }).await.unwrap();
-}
-*/
-
-async fn create_context_manager(
-    db_config: DbConfig,
-    distributors: BTreeMap<BridgeDistributor, String>,
-    extra_infos_base_url: &str,
-    confidence: f64,
-    max_threshold: u32,
-    scaling_factor: f64,
-    min_historical_days: u32,
-    max_historical_days: u32,
-    context_rx: mpsc::Receiver<Command>,
-    mut kill: broadcast::Receiver<()>,
-) {
-    tokio::select! {
-        create_context = context_manager(db_config, distributors, extra_infos_base_url, confidence, max_threshold, scaling_factor, min_historical_days, max_historical_days, context_rx) => create_context,
-        _ = kill.recv() => {println!("Shut down manager");},
-    }
-}
-
-async fn context_manager(
-    db_config: DbConfig,
-    distributors: BTreeMap<BridgeDistributor, String>,
-    extra_infos_base_url: &str,
-    confidence: f64,
-    max_threshold: u32,
-    scaling_factor: f64,
-    min_historical_days: u32,
-    max_historical_days: u32,
-    mut context_rx: mpsc::Receiver<Command>,
-) {
-    let db: Db = sled::open(&db_config.db_path).unwrap();
-
-    while let Some(cmd) = context_rx.recv().await {
-        use Command::*;
-        match cmd {
-            Request { req, sender } => {
-                let response = handle(&db, req).await;
-                if let Err(e) = sender.send(response) {
-                    eprintln!("Server Response Error: {:?}", e);
-                };
-                sleep(Duration::from_millis(1)).await;
-            }
-            Shutdown { shutdown_sig } => {
-                println!("Sending Shutdown Signal, all threads should shutdown.");
-                drop(shutdown_sig);
-                println!("Shutdown Sent.");
-            }
-            Update { _req, sender } => {
-                let blockages = update_daily_info(
-                    &db,
-                    &distributors,
-                    &extra_infos_base_url,
-                    confidence,
-                    max_threshold,
-                    scaling_factor,
-                    min_historical_days,
-                    max_historical_days,
-                )
-                .await;
-                let response = if cfg!(feature = "simulation") {
-                    // Convert map keys from [u8; 20] to 40-character hex strings
-                    let mut blockages_str = HashMap::<String, HashSet<String>>::new();
-                    for (fingerprint, countries) in blockages {
-                        let fpr_string = array_bytes::bytes2hex("", fingerprint);
-                        blockages_str.insert(fpr_string, countries);
-                    }
-                    Ok(prepare_header(
-                        serde_json::to_string(&blockages_str).unwrap(),
-                    ))
-                } else {
-                    Ok(prepare_header("OK".to_string()))
-                };
-                if let Err(e) = sender.send(response) {
-                    eprintln!("Update Response Error: {:?}", e);
-                };
-                sleep(Duration::from_millis(1)).await;
-            }
-        }
-    }
-}
-
-// Each of the commands that can be handled
-#[derive(Debug)]
-enum Command {
-    Request {
-        req: Request<Body>,
-        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
-    },
-    Shutdown {
-        shutdown_sig: broadcast::Sender<()>,
-    },
-    Update {
-        _req: Request<Body>,
-        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
-    },
-}
-
-#[tokio::main]
-async fn main() {
-    let args: Args = Args::parse();
-
-    let config: Config = serde_json::from_reader(BufReader::new(
-        File::open(&args.config).expect("Could not read config file"),
-    ))
-    .expect("Reading config file from JSON failed");
-
-    let (request_tx, request_rx) = mpsc::channel(32);
-
-    let updater_tx = request_tx.clone();
-    let shutdown_cmd_tx = request_tx.clone();
-
-    // create the shutdown broadcast channel
-    let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
-    let kill = shutdown_tx.subscribe();
-
-    // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
-    let shutdown_handler = spawn(async move {
-        tokio::select! {
-            _ = signal::ctrl_c() => {
-                let cmd = Command::Shutdown {
-                    shutdown_sig: shutdown_tx,
-                };
-                shutdown_cmd_tx.send(cmd).await.unwrap();
-                sleep(Duration::from_secs(1)).await;
-
-                _ = shutdown_rx.recv().await;
-            }
-        }
-    });
-
-    // TODO: Reintroduce this
-    /*
-    #[cfg(not(feature = "simulation"))]
-    let updater = spawn(async move {
-        // Run updater once per day
-        let mut sched = Scheduler::utc();
-        sched.add(Job::new(config.updater_schedule, move || {
-            run_updater(updater_tx.clone())
-        }));
-    });
-    */
-    let context_manager = spawn(async move {
-        create_context_manager(
-            config.db,
-            config.distributors,
-            &config.extra_infos_base_url,
-            config.confidence,
-            config.max_threshold,
-            config.scaling_factor,
-            config.min_historical_days,
-            config.max_historical_days,
-            request_rx,
-            kill,
-        )
-        .await
-    });
-
-    let make_service = make_service_fn(move |_conn: &AddrStream| {
-        let request_tx = request_tx.clone();
-        let service = service_fn(move |req| {
-            let request_tx = request_tx.clone();
-            let (response_tx, response_rx) = oneshot::channel();
-            let cmd = Command::Request {
-                req,
-                sender: response_tx,
-            };
-            async move {
-                request_tx.send(cmd).await.unwrap();
-                response_rx.await.unwrap()
-            }
-        });
-        async move { Ok::<_, Infallible>(service) }
-    });
-
-    let updater_make_service = make_service_fn(move |_conn: &AddrStream| {
-        let request_tx = updater_tx.clone();
-        let service = service_fn(move |_req| {
-            let request_tx = request_tx.clone();
-            let (response_tx, response_rx) = oneshot::channel();
-            let cmd = Command::Update {
-                _req,
-                sender: response_tx,
-            };
-            async move {
-                request_tx.send(cmd).await.unwrap();
-                response_rx.await.unwrap()
-            }
-        });
-        async move { Ok::<_, Infallible>(service) }
-    });
-
-    let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
-    let server = Server::bind(&addr).serve(make_service);
-    let graceful = server.with_graceful_shutdown(shutdown_signal());
-    let updater_addr = SocketAddr::from(([127, 0, 0, 1], config.updater_port));
-    let updater_server = Server::bind(&updater_addr).serve(updater_make_service);
-    let updater_graceful = updater_server.with_graceful_shutdown(shutdown_signal());
-    println!("Listening on {}", addr);
-    println!("Updater listening on {}", updater_addr);
-    let (a, b) = join!(graceful, updater_graceful);
-    if a.is_err() {
-        eprintln!("server error: {}", a.unwrap_err());
-    }
-    if b.is_err() {
-        eprintln!("server error: {}", b.unwrap_err());
-    }
-    future::join_all([context_manager, shutdown_handler]).await;
-}
diff --git a/src/bin/simulation.rs b/src/bin/simulation.rs
new file mode 100644
index 0000000..e44b5d2
--- /dev/null
+++ b/src/bin/simulation.rs
@@ -0,0 +1,178 @@
+// Before running this, run:
+// 1. rdsys
+// 2. lox-distributor
+// 3. troll-patrol with the feature "simulation"
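+//
+// The simulation reads its settings from simulation_config.json; the field
+// names below follow the Config struct in this file. A minimal sketch, with
+// purely illustrative values (not recommendations):
+//
+// {
+//   "la_port": 8001,
+//   "la_test_port": 8005,
+//   "tp_port": 8006,
+//   "tp_test_port": 8007,
+//   "min_new_users_per_day": 1,
+//   "max_new_users_per_day": 5,
+//   "num_days": 30,
+//   "prob_connection_fails": 0.01,
+//   "prob_friend_in_same_country": 0.7,
+//   "prob_user_invites_friend": 0.1,
+//   "prob_user_is_censor": 0.05,
+//   "prob_user_submits_reports": 0.5,
+//   "probs_user_in_country": [["ru", 0.5], ["cn", 0.5]],
+//   "sharing": false
+// }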
troll-patrol with the feature "simulation" + +use troll_patrol::{ + extra_info::ExtraInfo, + increment_simulated_date, + simulation::{bridge::Bridge, censor::Censor, extra_infos_server, state::State, user::User}, +}; + +use clap::Parser; +use lox_cli::{networking::*, *}; +use rand::Rng; +use serde::Deserialize; +use std::{ + collections::{HashMap, HashSet}, + fs::File, + io::BufReader, + path::PathBuf, +}; +use tokio::spawn; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Name/path of the configuration file + #[arg(short, long, default_value = "simulation_config.json")] + config: PathBuf, +} + +#[derive(Debug, Deserialize)] +pub struct Config { + pub la_port: u16, + pub la_test_port: u16, + pub tp_port: u16, + pub tp_test_port: u16, + pub min_new_users_per_day: u32, + pub max_new_users_per_day: u32, + // How many days to simulate + pub num_days: u32, + pub prob_connection_fails: f64, + pub prob_friend_in_same_country: f64, + pub prob_user_invites_friend: f64, + pub prob_user_is_censor: f64, + pub prob_user_submits_reports: f64, + pub probs_user_in_country: Vec<(String, f64)>, + pub sharing: bool, +} + +#[tokio::main] +pub async fn main() { + let args: Args = Args::parse(); + + let config: Config = serde_json::from_reader(BufReader::new( + File::open(&args.config).expect("Could not read config file"), + )) + .expect("Reading config file from JSON failed"); + + let la_net = HyperNet { + hostname: format!("http://localhost:{}", config.la_port), + }; + let la_net_test = HyperNet { + hostname: format!("http://localhost:{}", config.la_test_port), + }; + let tp_net = HyperNet { + hostname: format!("http://localhost:{}", config.tp_port), + }; + let tp_net_test = HyperNet { + hostname: format!("http://localhost:{}", config.tp_test_port), + }; + let extra_infos_net = HyperNet { + hostname: "http://localhost:8003".to_string(), + }; + + let la_pubkeys = get_lox_auth_keys(&la_net).await; + + let state = State { + la_net, + tp_net, + la_pubkeys, + prob_connection_fails: config.prob_connection_fails, + prob_friend_in_same_country: config.prob_friend_in_same_country, + prob_user_invites_friend: config.prob_user_invites_friend, + prob_user_is_censor: config.prob_user_is_censor, + prob_user_submits_reports: config.prob_user_submits_reports, + probs_user_in_country: config.probs_user_in_country, + sharing: config.sharing, + }; + + let mut rng = rand::thread_rng(); + + // Set up censors + let mut censors = HashMap::::new(); + + // Set up bridges (no bridges yet) + let mut bridges = HashMap::<[u8; 20], Bridge>::new(); + + // Set up users (no users yet) + let mut users = Vec::::new(); + + // Set up extra-infos server + spawn(async move { + extra_infos_server::server().await; + }); + + // Main loop + for _ in 0..config.num_days { + // USER TASKS + + // Add some new users + let num_new_users: u32 = + rng.gen_range(config.min_new_users_per_day..=config.max_new_users_per_day); + for _ in 0..num_new_users { + users.push(User::new(&state).await); + } + + let mut new_users = Vec::::new(); + + // Users do daily actions + for user in &mut users { + // TODO: Refactor out connections from return + let (mut invited_friends, _connections) = + user.daily_tasks(&state, &mut bridges, &mut censors).await; + + // If this user invited any friends, add them to the list of users + new_users.append(&mut invited_friends); + } + + // Add new users + users.append(&mut new_users); + + // CENSOR TASKS + for (_, censor) in censors.iter_mut() { + censor.end_of_day_tasks(&state, &mut 
+
+        // TROLL PATROL TASKS
+        let new_blockages_resp = tp_net_test.request("/update".to_string(), vec![]).await;
+        let new_blockages: HashMap<String, HashSet<String>> =
+            serde_json::from_slice(&new_blockages_resp).unwrap();
+
+        // TODO: Track stats about new blockages
+
+        // LOX AUTHORITY TASKS
+
+        // Advance LA's time to tomorrow
+        la_net_test
+            .request(
+                "/advancedays".to_string(),
+                serde_json::to_string(&(1 as u16)).unwrap().into(),
+            )
+            .await;
+
+        // SIMULATION TASKS
+
+        // Advance simulated time to tomorrow
+        increment_simulated_date();
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index e7a11a9..9083443 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,333 @@
-fn main() {
-    println!("Hello, world!");
+use troll_patrol::{request_handler::*, *};
+
+use clap::Parser;
+use futures::future;
+use futures::join;
+use hyper::{
+    server::conn::AddrStream,
+    service::{make_service_fn, service_fn},
+    Body, Request, Response, Server,
+};
+use serde::Deserialize;
+use sled::Db;
+use std::{
+    collections::{BTreeMap, HashMap, HashSet},
+    convert::Infallible,
+    fs::File,
+    io::BufReader,
+    net::SocketAddr,
+    path::PathBuf,
+    time::Duration,
+};
+use tokio::{
+    signal, spawn,
+    sync::{broadcast, mpsc, oneshot},
+    time::sleep,
+};
+#[cfg(not(feature = "simulation"))]
+use tokio_cron::{Job, Scheduler};
+
+async fn shutdown_signal() {
+    tokio::signal::ctrl_c()
+        .await
+        .expect("failed to listen for ctrl+c signal");
+    println!("Shut down Troll Patrol Server");
+}
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    /// Name/path of the configuration file
+    #[arg(short, long, default_value = "config.json")]
+    config: PathBuf,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct Config {
+    pub db: DbConfig,
+    // map of distributor name to IP:port to contact it
+    pub distributors: BTreeMap<BridgeDistributor, String>,
+    extra_infos_base_url: String,
+
+    // confidence required to consider a bridge blocked
+    confidence: f64,
+
+    // block open-entry bridges if they get more negative reports than this
+    max_threshold: u32,
+
+    // block open-entry bridges if they get more negative reports than
+    // scaling_factor * bridge_ips
+    scaling_factor: f64,
+
+    // minimum number of historical days for statistical analysis
+    min_historical_days: u32,
+
+    // maximum number of historical days to consider in historical analysis
+    max_historical_days: u32,
+
+    //require_bridge_token: bool,
+    port: u16,
+    updater_port: u16,
+    updater_schedule: String,
+}
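+
+// An example config.json matching the fields above. This is a sketch: the
+// distributor key, URL, schedule string, and all values are illustrative
+// placeholders, not defaults:
+//
+// {
+//   "db": { "db_path": "server_db" },
+//   "distributors": { "Lox": "127.0.0.1:8002" },
+//   "extra_infos_base_url": "https://collector.torproject.org/recent/bridge-descriptors/extra-infos/",
+//   "confidence": 0.95,
+//   "max_threshold": 10,
+//   "scaling_factor": 0.25,
+//   "min_historical_days": 30,
+//   "max_historical_days": 90,
+//   "port": 8006,
+//   "updater_port": 8007,
+//   "updater_schedule": "0 0 0 * * * *"
+// }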
+
+#[derive(Debug, Deserialize)]
+pub struct DbConfig {
+    // The path for the server database, default is "server_db"
+    pub db_path: String,
+}
+
+impl Default for DbConfig {
+    fn default() -> DbConfig {
+        DbConfig {
+            db_path: "server_db".to_owned(),
+        }
+    }
+}
+
+async fn update_daily_info(
+    db: &Db,
+    distributors: &BTreeMap<BridgeDistributor, String>,
+    extra_infos_base_url: &str,
+    confidence: f64,
+    max_threshold: u32,
+    scaling_factor: f64,
+    min_historical_days: u32,
+    max_historical_days: u32,
+) -> HashMap<[u8; 20], HashSet<String>> {
+    update_extra_infos(&db, &extra_infos_base_url)
+        .await
+        .unwrap();
+    update_negative_reports(&db, &distributors).await;
+    update_positive_reports(&db, &distributors).await;
+    let new_blockages = guess_blockages(
+        &db,
+        &analysis::NormalAnalyzer::new(max_threshold, scaling_factor),
+        confidence,
+        min_historical_days,
+        max_historical_days,
+    );
+    report_blockages(&distributors, new_blockages.clone()).await;
+
+    // Generate tomorrow's key if we don't already have it
+    new_negative_report_key(&db, get_date() + 1);
+
+    // Return new detected blockages
+    new_blockages
+}
+
+/*
+async fn run_updater(updater_tx: mpsc::Sender<Command>) {
+    updater_tx.send(Command::Update {
+
+    }).await.unwrap();
+}
+*/
+
+async fn create_context_manager(
+    db_config: DbConfig,
+    distributors: BTreeMap<BridgeDistributor, String>,
+    extra_infos_base_url: &str,
+    confidence: f64,
+    max_threshold: u32,
+    scaling_factor: f64,
+    min_historical_days: u32,
+    max_historical_days: u32,
+    context_rx: mpsc::Receiver<Command>,
+    mut kill: broadcast::Receiver<()>,
+) {
+    tokio::select! {
+        create_context = context_manager(db_config, distributors, extra_infos_base_url, confidence, max_threshold, scaling_factor, min_historical_days, max_historical_days, context_rx) => create_context,
+        _ = kill.recv() => {println!("Shut down manager");},
+    }
+}
+
+async fn context_manager(
+    db_config: DbConfig,
+    distributors: BTreeMap<BridgeDistributor, String>,
+    extra_infos_base_url: &str,
+    confidence: f64,
+    max_threshold: u32,
+    scaling_factor: f64,
+    min_historical_days: u32,
+    max_historical_days: u32,
+    mut context_rx: mpsc::Receiver<Command>,
+) {
+    let db: Db = sled::open(&db_config.db_path).unwrap();
+
+    while let Some(cmd) = context_rx.recv().await {
+        use Command::*;
+        match cmd {
+            Request { req, sender } => {
+                let response = handle(&db, req).await;
+                if let Err(e) = sender.send(response) {
+                    eprintln!("Server Response Error: {:?}", e);
+                };
+                sleep(Duration::from_millis(1)).await;
+            }
+            Shutdown { shutdown_sig } => {
+                println!("Sending Shutdown Signal, all threads should shutdown.");
+                drop(shutdown_sig);
+                println!("Shutdown Sent.");
+            }
+            Update { _req, sender } => {
+                let blockages = update_daily_info(
+                    &db,
+                    &distributors,
+                    &extra_infos_base_url,
+                    confidence,
+                    max_threshold,
+                    scaling_factor,
+                    min_historical_days,
+                    max_historical_days,
+                )
+                .await;
+                let response = if cfg!(feature = "simulation") {
+                    // Convert map keys from [u8; 20] to 40-character hex strings
+                    let mut blockages_str = HashMap::<String, HashSet<String>>::new();
+                    for (fingerprint, countries) in blockages {
+                        let fpr_string = array_bytes::bytes2hex("", fingerprint);
+                        blockages_str.insert(fpr_string, countries);
+                    }
+                    Ok(prepare_header(
+                        serde_json::to_string(&blockages_str).unwrap(),
+                    ))
+                } else {
+                    Ok(prepare_header("OK".to_string()))
+                };
+                if let Err(e) = sender.send(response) {
+                    eprintln!("Update Response Error: {:?}", e);
+                };
+                sleep(Duration::from_millis(1)).await;
+            }
+        }
+    }
+}
+
+// Each of the commands that can be handled
+#[derive(Debug)]
+enum Command {
+    Request {
+        req: Request<Body>,
+        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
+    },
+    Shutdown {
+        shutdown_sig: broadcast::Sender<()>,
+    },
+    Update {
+        _req: Request<Body>,
+        sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
+    },
+}
+
+#[tokio::main]
+async fn main() {
+    let args: Args = Args::parse();
+
+    let config: Config = serde_json::from_reader(BufReader::new(
+        File::open(&args.config).expect("Could not read config file"),
+    ))
+    .expect("Reading config file from JSON failed");
+
+    let (request_tx, request_rx) = mpsc::channel(32);
+
+    let updater_tx = request_tx.clone();
+    let shutdown_cmd_tx = request_tx.clone();
+
+    // create the shutdown broadcast channel
+    let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
+    let kill = shutdown_tx.subscribe();
+
+    // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
+    let shutdown_handler = spawn(async move {
+        tokio::select! {
+            _ = signal::ctrl_c() => {
+                let cmd = Command::Shutdown {
+                    shutdown_sig: shutdown_tx,
+                };
+                shutdown_cmd_tx.send(cmd).await.unwrap();
+                sleep(Duration::from_secs(1)).await;
+
+                _ = shutdown_rx.recv().await;
+            }
+        }
+    });
+
+    // TODO: Reintroduce this
+    /*
+    #[cfg(not(feature = "simulation"))]
+    let updater = spawn(async move {
+        // Run updater once per day
+        let mut sched = Scheduler::utc();
+        sched.add(Job::new(config.updater_schedule, move || {
+            run_updater(updater_tx.clone())
+        }));
+    });
+    */
+    let context_manager = spawn(async move {
+        create_context_manager(
+            config.db,
+            config.distributors,
+            &config.extra_infos_base_url,
+            config.confidence,
+            config.max_threshold,
+            config.scaling_factor,
+            config.min_historical_days,
+            config.max_historical_days,
+            request_rx,
+            kill,
+        )
+        .await
+    });
+
+    let make_service = make_service_fn(move |_conn: &AddrStream| {
+        let request_tx = request_tx.clone();
+        let service = service_fn(move |req| {
+            let request_tx = request_tx.clone();
+            let (response_tx, response_rx) = oneshot::channel();
+            let cmd = Command::Request {
+                req,
+                sender: response_tx,
+            };
+            async move {
+                request_tx.send(cmd).await.unwrap();
+                response_rx.await.unwrap()
+            }
+        });
+        async move { Ok::<_, Infallible>(service) }
+    });
+
+    let updater_make_service = make_service_fn(move |_conn: &AddrStream| {
+        let request_tx = updater_tx.clone();
+        let service = service_fn(move |_req| {
+            let request_tx = request_tx.clone();
+            let (response_tx, response_rx) = oneshot::channel();
+            let cmd = Command::Update {
+                _req,
+                sender: response_tx,
+            };
+            async move {
+                request_tx.send(cmd).await.unwrap();
+                response_rx.await.unwrap()
+            }
+        });
+        async move { Ok::<_, Infallible>(service) }
+    });
+
+    let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
+    let server = Server::bind(&addr).serve(make_service);
+    let graceful = server.with_graceful_shutdown(shutdown_signal());
+    let updater_addr = SocketAddr::from(([127, 0, 0, 1], config.updater_port));
+    let updater_server = Server::bind(&updater_addr).serve(updater_make_service);
+    let updater_graceful = updater_server.with_graceful_shutdown(shutdown_signal());
+    println!("Listening on {}", addr);
+    println!("Updater listening on {}", updater_addr);
+    let (a, b) = join!(graceful, updater_graceful);
+    if a.is_err() {
+        eprintln!("server error: {}", a.unwrap_err());
+    }
+    if b.is_err() {
+        eprintln!("server error: {}", b.unwrap_err());
+    }
+    future::join_all([context_manager, shutdown_handler]).await;
 }
diff --git a/src/simulation/bridge.rs b/src/simulation/bridge.rs
index 9ece496..0862a12 100644
--- a/src/simulation/bridge.rs
+++ b/src/simulation/bridge.rs
@@ -73,7 +73,7 @@ impl Bridge {
         }
     }
 
-    fn reset_for_tomorrow(&mut self) {
+    pub fn reset_for_tomorrow(&mut self) {
         self.real_connections = HashMap::::new();
         self.total_connections = BTreeMap::::new();
     }
diff --git a/src/simulation/censor.rs b/src/simulation/censor.rs
index 36716fa..cbd64d2 100644
--- a/src/simulation/censor.rs
+++ b/src/simulation/censor.rs
@@ -113,7 +113,7 @@ impl Censor {
             )
             .unwrap();
             state
-                .net_tp
+                .tp_net
                 .request("/positivereport".to_string(), pr.to_json().into_bytes())
                 .await;
         }
diff --git a/src/simulation/state.rs b/src/simulation/state.rs
index 96288a0..994bba2 100644
--- a/src/simulation/state.rs
+++ b/src/simulation/state.rs
@@ -3,9 +3,8 @@ use lox_library::IssuerPubKey;
 
 pub struct State {
     pub la_pubkeys: Vec<IssuerPubKey>,
-    pub net: HyperNet,
-    pub net_test: HyperNet,
-    pub net_tp: HyperNet,
+    pub la_net: HyperNet,
+    pub tp_net: HyperNet,
     // Probability that a connection randomly fails, even though censor
     // does not block the bridge
     pub prob_connection_fails: f64,
diff --git a/src/simulation/user.rs b/src/simulation/user.rs
index dca4a23..3fc7c98 100644
--- a/src/simulation/user.rs
+++ b/src/simulation/user.rs
@@ -49,8 +49,8 @@ pub struct User {
 impl User {
     pub async fn new(state: &State) -> Self {
         let cred = get_lox_credential(
-            &state.net,
-            &get_open_invitation(&state.net).await,
+            &state.la_net,
+            &get_open_invitation(&state.la_net).await,
             get_lox_pub(&state.la_pubkeys),
         )
         .await
@@ -95,9 +95,9 @@ impl User {
 
     // TODO: This should probably return an actual error type
    pub async fn invite(&mut self, state: &State) -> Result<User, String> {
-        let etable = get_reachability_credential(&state.net).await;
+        let etable = get_reachability_credential(&state.la_net).await;
         let (new_cred, invite) = issue_invite(
-            &state.net,
+            &state.la_net,
             &self.primary_cred,
             &etable,
             get_lox_pub(&state.la_pubkeys),
@@ -107,7 +107,7 @@
         .await;
         self.primary_cred = new_cred;
         let friend_cred = redeem_invite(
-            &state.net,
+            &state.la_net,
             &invite,
             get_lox_pub(&state.la_pubkeys),
             get_invitation_pub(&state.la_pubkeys),
@@ -198,7 +198,7 @@
         let date = get_date();
         let pubkey = serde_json::from_slice::>(
             &state
-                .net_tp
+                .tp_net
                 .request(
                     "/nrkey".to_string(),
                     serde_json::to_string(&date).unwrap().into(),
@@ -209,7 +209,7 @@
         .unwrap();
         for report in reports {
             state
-                .net_tp
+                .tp_net
                 .request(
                     "/negativereport".to_string(),
                     bincode::serialize(&report.encrypt(&pubkey)).unwrap(),
@@ -221,7 +221,7 @@
     pub async fn send_positive_reports(state: &State, reports: Vec<PositiveReport>) {
         for report in reports {
             state
-                .net_tp
+                .tp_net
                 .request("/positivereport".to_string(), report.to_json().into_bytes())
                 .await;
         }
@@ -243,7 +243,7 @@
         // Download bucket to see if bridge is still reachable. (We
         // assume that this step can be done even if the user can't
         // actually talk to the LA.)
-        let (bucket, reachcred) = get_bucket(&state.net, &self.primary_cred).await;
+        let (bucket, reachcred) = get_bucket(&state.la_net, &self.primary_cred).await;
         let level = scalar_u32(&self.primary_cred.trust_level).unwrap();
 
         // Make sure each bridge in bucket is in the global bridges set
@@ -273,8 +273,8 @@
         // Can we level up the main credential?
         let can_level_up = reachcred.is_some()
             && (level == 0
-                && eligible_for_trust_promotion(&state.net, &self.primary_cred).await
-                || level > 0 && eligible_for_level_up(&state.net, &self.primary_cred).await);
+                && eligible_for_trust_promotion(&state.la_net, &self.primary_cred).await
+                || level > 0 && eligible_for_level_up(&state.la_net, &self.primary_cred).await);
 
         // Can we migrate the main credential?
         let can_migrate = reachcred.is_none() && level >= MIN_TRUST_LEVEL;
@@ -304,8 +304,8 @@
             } else {
                 // Get new credential
                 let cred = get_lox_credential(
-                    &state.net,
-                    &get_open_invitation(&state.net).await,
+                    &state.la_net,
+                    &get_open_invitation(&state.la_net).await,
                     get_lox_pub(&state.la_pubkeys),
                 )
                 .await
@@ -319,7 +319,8 @@
         };
         if second_cred.is_some() {
             let second_cred = second_cred.as_ref().unwrap();
-            let (second_bucket, second_reachcred) = get_bucket(&state.net, &second_cred).await;
+            let (second_bucket, second_reachcred) =
+                get_bucket(&state.la_net, &second_cred).await;
             if !bridges.contains_key(&second_bucket[0].fingerprint) {
                 bridges
                     .insert(
@@ -335,7 +335,7 @@
                 ) {
                     succeeded.push(second_bucket[0]);
                     if second_reachcred.is_some()
-                        && eligible_for_trust_promotion(&state.net, &second_cred).await
+                        && eligible_for_trust_promotion(&state.la_net, &second_cred).await
                     {
                         second_level_up = true;
                     }
@@ -375,7 +375,7 @@
         // let's just allow it.
         if can_level_up {
             let cred = level_up(
-                &state.net,
+                &state.la_net,
                 &self.primary_cred,
                 &reachcred.unwrap(),
                 get_lox_pub(&state.la_pubkeys),
@@ -389,7 +389,7 @@
             // Make sure censor has access to each bridge and
             // each credential
             let censor = censors.get_mut(&self.country).unwrap();
-            let (bucket, reachcred) = get_bucket(&state.net, &self.primary_cred).await;
+            let (bucket, reachcred) = get_bucket(&state.la_net, &self.primary_cred).await;
             for bl in bucket {
                 censor.learn_bridge(&bl.fingerprint);
                 censor.give_lox_cred(&bl.fingerprint, &self.primary_cred);
@@ -400,9 +400,9 @@
         else if second_level_up {
             let second_cred = second_cred.as_ref().unwrap();
             let cred = trust_migration(
-                &state.net,
+                &state.la_net,
                 &second_cred,
-                &trust_promotion(&state.net, &second_cred, get_lox_pub(&state.la_pubkeys))
+                &trust_promotion(&state.la_net, &second_cred, get_lox_pub(&state.la_pubkeys))
                     .await,
                 get_lox_pub(&state.la_pubkeys),
                 get_migration_pub(&state.la_pubkeys),
@@ -412,10 +412,10 @@
             self.secondary_cred = None;
         } else if can_migrate {
             let cred = blockage_migration(
-                &state.net,
+                &state.la_net,
                 &self.primary_cred,
                 &check_blockage(
-                    &state.net,
+                    &state.la_net,
                     &self.primary_cred,
                     get_lox_pub(&state.la_pubkeys),
                 )