diff --git a/Cargo.toml b/Cargo.toml index be78d42..e88c30d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,17 +12,19 @@ chrono = "0.4" clap = { version = "4.4.14", features = ["derive"] } curve25519-dalek = { version = "4", default-features = false, features = ["serde", "rand_core", "digest"] } ed25519-dalek = { version = "2", features = ["serde", "rand_core"] } +futures = "0.3.30" +http = "1" http-body-util = "0.1" -hyper = { version = "1", features = ["full"] } +hyper = { version = "0.14.28", features = ["full"] } hyper-rustls = "0.26.0" hyper-util = { version = "0.1", features = ["full"] } julianday = "1.2.0" lazy_static = "1" lox-library = { git = "https://gitlab.torproject.org/vecna/lox.git", version = "0.1.0" } select = "0.6.0" -serde = "1.0.195" +serde = "1.0.197" serde_json = "1.0" -serde_with = {version = "3.5.0", features = ["json"]} +serde_with = {version = "3.7.0", features = ["json"]} sha1 = "0.10" sha3 = "0.10" sled = "0.34.7" diff --git a/src/bin/server.rs b/src/bin/server.rs index db8a7cb..94682d9 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -2,25 +2,66 @@ use troll_patrol::{ extra_info::{self, ExtraInfo}, //negative_report::SerializableNegativeReport, //positive_report::SerializablePositiveReport, + request_handler::handle, *, }; use clap::Parser; +use futures::future; +use hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, +}; +use serde::Deserialize; use sled::Db; -use std::{collections::HashSet, path::PathBuf}; +use std::{ + collections::HashSet, convert::Infallible, fs::File, io::BufReader, net::SocketAddr, + path::PathBuf, time::Duration, +}; +use tokio::{ + signal, spawn, + sync::{broadcast, mpsc, oneshot}, + time::sleep, +}; -#[tokio::main] -async fn main() { - // TODO: Currently, we're processing extra-infos here, but we want to: - // 1. Run a server to accept incoming reports - // 2. Periodically (daily): - // a) download new extra-infos - // b) determine whether we think each bridge is blocked or not - // c) report these results to the LA - // 3. Store all our data +async fn shutdown_signal() { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl+c signal"); + println!("Shut down Troll Patrol Server"); +} - let db: Db = sled::open(&CONFIG.db.db_path).unwrap(); +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Name/path of the configuration file + #[arg(short, long, default_value = "config.json")] + config: PathBuf, +} +#[derive(Debug, Deserialize)] +pub struct Config { + pub db: DbConfig, + //require_bridge_token: bool, + port: u16, +} + +#[derive(Debug, Deserialize)] +pub struct DbConfig { + // The path for the server database, default is "server_db" + pub db_path: String, +} + +impl Default for DbConfig { + fn default() -> DbConfig { + DbConfig { + db_path: "server_db".to_owned(), + } + } +} + +async fn update_extra_infos(db: &Db) { // Track which files have been processed. This is slightly redundant // because we're only downloading files we don't already have, but it // might be a good idea to check in case we downloaded a file but didn't @@ -45,5 +86,123 @@ async fn main() { add_extra_info_to_db(&db, extra_info); } - db.insert(b"extra_infos_files", bincode::serialize(&processed_extra_infos_files).unwrap()).unwrap(); + db.insert( + b"extra_infos_files", + bincode::serialize(&processed_extra_infos_files).unwrap(), + ) + .unwrap(); +} + +async fn create_context_manager( + db_config: DbConfig, + context_rx: mpsc::Receiver, + mut kill: broadcast::Receiver<()>, +) { + tokio::select! { + create_context = context_manager(db_config, context_rx) => create_context, + _ = kill.recv() => {println!("Shut down manager");}, + } +} + +async fn context_manager(db_config: DbConfig, mut context_rx: mpsc::Receiver) { + let db: Db = sled::open(&db_config.db_path).unwrap(); + + while let Some(cmd) = context_rx.recv().await { + use Command::*; + match cmd { + Request { req, sender } => { + let response = handle(&db, req).await; + if let Err(e) = sender.send(response) { + eprintln!("Server Response Error: {:?}", e); + }; + sleep(Duration::from_millis(1)).await; + } + Shutdown { shutdown_sig } => { + println!("Sending Shutdown Signal, all threads should shutdown."); + drop(shutdown_sig); + println!("Shutdown Sent."); + } + } + } +} + +// Each of the commands that can be handled +#[derive(Debug)] +enum Command { + Request { + req: Request, + sender: oneshot::Sender, Infallible>>, + }, + Shutdown { + shutdown_sig: broadcast::Sender<()>, + }, +} + +#[tokio::main] +async fn main() { + // TODO: Currently, we're processing extra-infos here, but we want to: + // 2. Periodically (daily): + // a) download new extra-infos + // b) determine whether we think each bridge is blocked or not + // c) report these results to the LA + // 3. Store all our data + + let args: Args = Args::parse(); + + let config: Config = serde_json::from_reader(BufReader::new( + File::open(&args.config).expect("Could not read config file"), + )) + .expect("Reading config file from JSON failed"); + + let (request_tx, request_rx) = mpsc::channel(32); + + let shutdown_cmd_tx = request_tx.clone(); + + // create the shutdown broadcast channel and clone for every thread + let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); + let kill = shutdown_tx.subscribe(); + + // Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx + let shutdown_handler = spawn(async move { + tokio::select! { + _ = signal::ctrl_c() => { + let cmd = Command::Shutdown { + shutdown_sig: shutdown_tx, + }; + shutdown_cmd_tx.send(cmd).await.unwrap(); + sleep(Duration::from_secs(1)).await; + + _ = shutdown_rx.recv().await; + } + } + }); + + let context_manager = + spawn(async move { create_context_manager(config.db, request_rx, kill).await }); + + let make_service = make_service_fn(move |_conn: &AddrStream| { + let request_tx = request_tx.clone(); + let service = service_fn(move |req| { + let request_tx = request_tx.clone(); + let (response_tx, response_rx) = oneshot::channel(); + let cmd = Command::Request { + req, + sender: response_tx, + }; + async move { + request_tx.send(cmd).await.unwrap(); + response_rx.await.unwrap() + } + }); + async move { Ok::<_, Infallible>(service) } + }); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.port)); + let server = Server::bind(&addr).serve(make_service); + let graceful = server.with_graceful_shutdown(shutdown_signal()); + println!("Listening on {}", addr); + if let Err(e) = graceful.await { + eprintln!("server error: {}", e); + } + future::join_all([context_manager, shutdown_handler]).await; } diff --git a/src/extra_info.rs b/src/extra_info.rs index 47dbbbb..d3c7d42 100644 --- a/src/extra_info.rs +++ b/src/extra_info.rs @@ -3,6 +3,7 @@ Note, this is NOT a complete implementation of the document format. (https://spec.torproject.org/dir-spec/extra-info-document-format.html) */ use chrono::DateTime; +use http::status::StatusCode; use http_body_util::{BodyExt, Empty}; use hyper::body::Bytes; use hyper_util::{client::legacy::Client, rt::TokioExecutor}; @@ -164,7 +165,7 @@ pub async fn download_extra_infos( println!("Downloading {}", base_url); let mut res = client.get(url).await?; - assert_eq!(res.status(), hyper::StatusCode::OK); + assert_eq!(res.status(), StatusCode::OK); let mut body_str = String::from(""); while let Some(next) = res.frame().await { let frame = next?; @@ -191,7 +192,7 @@ pub async fn download_extra_infos( let extra_infos_url = format!("{}{}", base_url, link); println!("Downloading {}", extra_infos_url); let mut res = client.get(extra_infos_url.parse().unwrap()).await?; - assert_eq!(res.status(), hyper::StatusCode::OK); + assert_eq!(res.status(), StatusCode::OK); let mut file = std::fs::File::create(filename).unwrap(); while let Some(next) = res.frame().await { let frame = next?; diff --git a/src/lib.rs b/src/lib.rs index 82d449c..a656ebd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,50 +4,22 @@ use sled::Db; use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt, - fs::File, - io::BufReader, }; pub mod bridge_verification_info; pub mod extra_info; pub mod negative_report; pub mod positive_report; +pub mod request_handler; use extra_info::*; use negative_report::*; use positive_report::*; -#[derive(Debug, Deserialize)] -pub struct Config { - pub db: DbConfig, - //require_bridge_token: bool, -} - -#[derive(Debug, Deserialize)] -pub struct DbConfig { - // The path for the server database, default is "server_db" - pub db_path: String, -} - -impl Default for DbConfig { - fn default() -> DbConfig { - DbConfig { - db_path: "server_db".to_owned(), - } - } -} - lazy_static! { // known country codes based on Tor geoIP database // Produced with `cat /usr/share/tor/geoip{,6} | grep -v ^# | grep -o ..$ | sort | uniq | tr '[:upper:]' '[:lower:]' | tr '\n' ',' | sed 's/,/","/g'` pub static ref COUNTRY_CODES: HashSet<&'static str> = HashSet::from(["??","ad","ae","af","ag","ai","al","am","ao","ap","aq","ar","as","at","au","aw","ax","az","ba","bb","bd","be","bf","bg","bh","bi","bj","bl","bm","bn","bo","bq","br","bs","bt","bv","bw","by","bz","ca","cc","cd","cf","cg","ch","ci","ck","cl","cm","cn","co","cr","cs","cu","cv","cw","cx","cy","cz","de","dj","dk","dm","do","dz","ec","ee","eg","eh","er","es","et","eu","fi","fj","fk","fm","fo","fr","ga","gb","gd","ge","gf","gg","gh","gi","gl","gm","gn","gp","gq","gr","gs","gt","gu","gw","gy","hk","hm","hn","hr","ht","hu","id","ie","il","im","in","io","iq","ir","is","it","je","jm","jo","jp","ke","kg","kh","ki","km","kn","kp","kr","kw","ky","kz","la","lb","lc","li","lk","lr","ls","lt","lu","lv","ly","ma","mc","md","me","mf","mg","mh","mk","ml","mm","mn","mo","mp","mq","mr","ms","mt","mu","mv","mw","mx","my","mz","na","nc","ne","nf","ng","ni","nl","no","np","nr","nu","nz","om","pa","pe","pf","pg","ph","pk","pl","pm","pn","pr","ps","pt","pw","py","qa","re","ro","rs","ru","rw","sa","sb","sc","sd","se","sg","sh","si","sj","sk","sl","sm","sn","so","sr","ss","st","sv","sx","sy","sz","tc","td","tf","tg","th","tj","tk","tl","tm","tn","to","tr","tt","tv","tw","tz","ua","ug","um","us","uy","uz","va","vc","ve","vg","vi","vn","vu","wf","ws","ye","yt","za","zm","zw"]); - - // read config data at run time - pub static ref CONFIG: Config = serde_json::from_reader( - BufReader::new( - File::open("config.json").expect("Could not read config file") // TODO: Make config filename configurable - ) - ).expect("Reading config file from JSON failed"); } /// Get Julian date @@ -237,3 +209,65 @@ pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) { db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } + +/// Negative reports can be deduplicated, so we store to-be-processed +/// negative reports as a map of [report] to [count of report]. Add this +/// NR to that map (or create a new map if necessary). +pub fn save_negative_report_to_process(db: &Db, nr: NegativeReport) { + // We serialize the negative reports as strings to use them as map keys. + let mut reports = match db.get("nrs-to-process").unwrap() { + Some(v) => bincode::deserialize(&v).unwrap(), + None => BTreeMap::>::new(), + }; + // Store to-be-processed reports with key [fingerprint]_[date] + let map_key = format!( + "{}_{}", + array_bytes::bytes2hex("", &nr.fingerprint), + &nr.date + ); + let serialized_nr = nr.to_json(); + if reports.contains_key(&map_key) { + let nr_map = reports.get_mut(&map_key).unwrap(); + if nr_map.contains_key(&serialized_nr) { + let prev_count = nr_map.get(&serialized_nr).unwrap(); + nr_map.insert(serialized_nr, prev_count + 1); + } else { + nr_map.insert(serialized_nr, 1); + } + } else { + let mut nr_map = BTreeMap::::new(); + nr_map.insert(serialized_nr, 1); + reports.insert(map_key, nr_map); + } + // Commit changes to database + db.insert("nrs-to-process", bincode::serialize(&reports).unwrap()) + .unwrap(); +} + +/// We store to-be-processed positive reports as a vector. Add this PR +/// to that vector (or create a new vector if necessary). +pub fn save_positive_report_to_process(db: &Db, pr: PositiveReport) { + let mut reports = match db.get("prs-to-process").unwrap() { + Some(v) => bincode::deserialize(&v).unwrap(), + None => BTreeMap::>::new(), + }; + // Store to-be-processed reports with key [fingerprint]_[date] + let map_key = format!( + "{}_{}", + array_bytes::bytes2hex("", &pr.fingerprint), + &pr.date + ); + if reports.contains_key(&map_key) { + reports + .get_mut(&map_key) + .unwrap() + .push(pr.to_serializable_report()); + } else { + let mut prs = Vec::::new(); + prs.push(pr.to_serializable_report()); + reports.insert(map_key, prs); + } + // Commit changes to database + db.insert("prs-to-process", bincode::serialize(&reports).unwrap()) + .unwrap(); +} diff --git a/src/negative_report.rs b/src/negative_report.rs index 95d8095..843a2fb 100644 --- a/src/negative_report.rs +++ b/src/negative_report.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use sha1::{Digest, Sha1}; use sha3::Sha3_256; -#[derive(Debug)] +#[derive(Debug, Serialize)] pub enum NegativeReportError { DateInFuture, FailedToDeserialize, // couldn't deserialize to SerializableNegativeReport @@ -102,6 +102,14 @@ impl NegativeReport { } } + /// Deserializes the report from slice, eliding the underlying process + pub fn from_slice(slice: &[u8]) -> Result { + match serde_json::from_slice::(&slice) { + Ok(v) => v.to_report(), + Err(_) => Err(NegativeReportError::FailedToDeserialize), + } + } + /// Verify the report pub fn verify(self, bridge_info: &BridgeVerificationInfo) -> bool { match self.bridge_pok { diff --git a/src/positive_report.rs b/src/positive_report.rs index 25b5a90..e28da2e 100644 --- a/src/positive_report.rs +++ b/src/positive_report.rs @@ -1,7 +1,7 @@ // For Lox-related code where points are uppercase and scalars are lowercase #![allow(non_snake_case)] -use crate::{bridge_verification_info::BridgeVerificationInfo, get_date, CONFIG, COUNTRY_CODES}; +use crate::{bridge_verification_info::BridgeVerificationInfo, get_date, COUNTRY_CODES}; use curve25519_dalek::ristretto::RistrettoBasepointTable; use ed25519_dalek::{Signature, Signer, SigningKey, Verifier}; @@ -12,7 +12,7 @@ use std::option::Option; pub const REQUIRE_BRIDGE_TOKEN: bool = false; -#[derive(Debug)] +#[derive(Debug, Serialize)] pub enum PositiveReportError { DateInFuture, FailedToDeserialize, // couldn't deserialize to SerializablePositiveReport @@ -105,6 +105,14 @@ impl PositiveReport { } } + /// Deserializes the report from slice, eliding the underlying process + pub fn from_slice(slice: &[u8]) -> Result { + match serde_json::from_slice::(&slice) { + Ok(v) => v.to_report(), + Err(_) => Err(PositiveReportError::FailedToDeserialize), + } + } + /// Verify report pub fn verify( self, diff --git a/src/request_handler.rs b/src/request_handler.rs new file mode 100644 index 0000000..19ac8f0 --- /dev/null +++ b/src/request_handler.rs @@ -0,0 +1,61 @@ +use crate::{negative_report::NegativeReport, positive_report::PositiveReport, *}; +use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode}; +use serde_json::json; +use sled::Db; +use std::convert::Infallible; + +// Handle submitted reports +pub async fn handle(db: &Db, req: Request) -> Result, Infallible> { + match req.method() { + &Method::OPTIONS => Ok(Response::builder() + .header("Access-Control-Allow-Origin", HeaderValue::from_static("*")) + .header("Access-Control-Allow-Headers", "accept, content-type") + .header("Access-Control-Allow-Methods", "POST") + .status(200) + .body(Body::from("Allow POST")) + .unwrap()), + _ => match (req.method(), req.uri().path()) { + (&Method::POST, "/negativereport") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + let nr = match NegativeReport::from_slice(&bytes) { + Ok(nr) => nr, + Err(e) => { + let response = json!({"error": e}); + let val = serde_json::to_string(&response).unwrap(); + return Ok(prepare_header(val)); + } + }; + save_negative_report_to_process(&db, nr); + prepare_header("OK".to_string()) + }), + (&Method::POST, "/positivereport") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + let pr = match PositiveReport::from_slice(&bytes) { + Ok(pr) => pr, + Err(e) => { + let response = json!({"error": e}); + let val = serde_json::to_string(&response).unwrap(); + return Ok(prepare_header(val)); + } + }; + save_positive_report_to_process(&db, pr); + prepare_header("OK".to_string()) + }), + _ => { + // Return 404 not found response. + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found")) + .unwrap()) + } + }, + } +} + +// Prepare HTTP Response for successful Server Request +fn prepare_header(response: String) -> Response { + let mut resp = Response::new(Body::from(response)); + resp.headers_mut() + .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); + resp +}