use hyper::{Body, Client, Method, Request}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use sled::Db; use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt, }; pub mod bridge_verification_info; pub mod extra_info; pub mod negative_report; pub mod positive_report; pub mod request_handler; use extra_info::*; use negative_report::*; use positive_report::*; lazy_static! { // known country codes based on Tor geoIP database // Produced with `cat /usr/share/tor/geoip{,6} | grep -v ^# | grep -o ..$ | sort | uniq | tr '[:upper:]' '[:lower:]' | tr '\n' ',' | sed 's/,/","/g'` pub static ref COUNTRY_CODES: HashSet<&'static str> = HashSet::from(["??","ad","ae","af","ag","ai","al","am","ao","ap","aq","ar","as","at","au","aw","ax","az","ba","bb","bd","be","bf","bg","bh","bi","bj","bl","bm","bn","bo","bq","br","bs","bt","bv","bw","by","bz","ca","cc","cd","cf","cg","ch","ci","ck","cl","cm","cn","co","cr","cs","cu","cv","cw","cx","cy","cz","de","dj","dk","dm","do","dz","ec","ee","eg","eh","er","es","et","eu","fi","fj","fk","fm","fo","fr","ga","gb","gd","ge","gf","gg","gh","gi","gl","gm","gn","gp","gq","gr","gs","gt","gu","gw","gy","hk","hm","hn","hr","ht","hu","id","ie","il","im","in","io","iq","ir","is","it","je","jm","jo","jp","ke","kg","kh","ki","km","kn","kp","kr","kw","ky","kz","la","lb","lc","li","lk","lr","ls","lt","lu","lv","ly","ma","mc","md","me","mf","mg","mh","mk","ml","mm","mn","mo","mp","mq","mr","ms","mt","mu","mv","mw","mx","my","mz","na","nc","ne","nf","ng","ni","nl","no","np","nr","nu","nz","om","pa","pe","pf","pg","ph","pk","pl","pm","pn","pr","ps","pt","pw","py","qa","re","ro","rs","ru","rw","sa","sb","sc","sd","se","sg","sh","si","sj","sk","sl","sm","sn","so","sr","ss","st","sv","sx","sy","sz","tc","td","tf","tg","th","tj","tk","tl","tm","tn","to","tr","tt","tv","tw","tz","ua","ug","um","us","uy","uz","va","vc","ve","vg","vi","vn","vu","wf","ws","ye","yt","za","zm","zw"]); } /// Get Julian date pub fn get_date() -> u32 { time::OffsetDateTime::now_utc() .date() .to_julian_day() .try_into() .unwrap() } #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeDistributor { Lox, } /// All the info for a bridge, to be stored in the database #[derive(Serialize, Deserialize)] pub struct BridgeInfo { /// hashed fingerprint (SHA-1 hash of 20-byte bridge ID) pub fingerprint: [u8; 20], /// nickname of bridge (probably not necessary) pub nickname: String, /// first Julian date we started collecting data on this bridge pub first_seen: u32, /// list of countries where the bridge is believed to be blocked pub blocked_in: Vec, /// map of dates to data for that day pub info_by_day: HashMap, } impl BridgeInfo { pub fn new(fingerprint: [u8; 20], nickname: &String) -> Self { Self { fingerprint: fingerprint, nickname: nickname.to_string(), first_seen: get_date(), blocked_in: Vec::::new(), info_by_day: HashMap::::new(), } } } impl fmt::Display for BridgeInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = format!( "fingerprint:{}\n", array_bytes::bytes2hex("", self.fingerprint).as_str() ); str.push_str(format!("nickname: {}\n", self.nickname).as_str()); str.push_str(format!("first_seen: {}\n", self.first_seen).as_str()); str.push_str("blocked_in:"); for country in &self.blocked_in { str.push_str(format!("\n {}", country).as_str()); } str.push_str("info_by_day:"); for day in self.info_by_day.keys() { str.push_str(format!("\n day: {}", day).as_str()); let daily_info = self.info_by_day.get(day).unwrap(); for line in daily_info.to_string().lines() { str.push_str(format!("\n {}", line).as_str()); } } write!(f, "{}", str) } } #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeInfoType { BridgeIps, NegativeReports, PositiveReports, } /// Information about bridge reachability, gathered daily #[derive(Serialize, Deserialize)] pub struct DailyBridgeInfo { pub info_by_country: BTreeMap>, } impl DailyBridgeInfo { pub fn new() -> Self { Self { info_by_country: BTreeMap::>::new(), } } pub fn add_info( &mut self, info_type: BridgeInfoType, count_per_country: &BTreeMap, ) { for country in count_per_country.keys() { if self.info_by_country.contains_key(country) { let info = self.info_by_country.get_mut(country).unwrap(); if !info.contains_key(&info_type) { info.insert( info_type, *count_per_country.get(&country.to_string()).unwrap(), ); } else if info_type == BridgeInfoType::BridgeIps { // Use newest value we've seen today if info.get(&info_type).unwrap() < count_per_country.get(country).unwrap() { info.insert( BridgeInfoType::BridgeIps, *count_per_country.get(&country.to_string()).unwrap(), ); } } else { let new_count = info.get(&info_type).unwrap() + *count_per_country.get(&country.to_string()).unwrap(); info.insert(info_type, new_count); } } else { let mut info = BTreeMap::::new(); info.insert( info_type, *count_per_country.get(&country.to_string()).unwrap(), ); self.info_by_country.insert(country.to_string(), info); } } } } impl fmt::Display for DailyBridgeInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = String::from("info:"); for country in self.info_by_country.keys() { let info = self.info_by_country.get(country).unwrap(); let ip_count = match info.get(&BridgeInfoType::BridgeIps) { Some(v) => v, None => &0, }; let nr_count = match info.get(&BridgeInfoType::NegativeReports) { Some(v) => v, None => &0, }; let pr_count = match info.get(&BridgeInfoType::PositiveReports) { Some(v) => v, None => &0, }; if ip_count > &0 || nr_count > &0 || pr_count > &0 { str.push_str( format!( "\n cc: {}\n connections: {}\n negative reports: {}\n positive reports: {}", country, ip_count, nr_count, pr_count, ) .as_str(), ); } } write!(f, "{}", str) } } // Process extra-infos /// Adds the extra-info data for a single bridge to the database. If the /// database already contains an extra-info for this bridge for thid date, /// but this extra-info contains different data for some reason, use the /// greater count of connections from each country. pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) { let fingerprint = extra_info.fingerprint; let mut bridge_info = match db.get(&fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BridgeInfo::new(fingerprint, &extra_info.nickname), }; // If we already have an entry, compare it with the new one. For each // country:count mapping, use the greater of the two counts. if bridge_info.info_by_day.contains_key(&extra_info.date) { let daily_bridge_info = bridge_info.info_by_day.get_mut(&extra_info.date).unwrap(); daily_bridge_info.add_info(BridgeInfoType::BridgeIps, &extra_info.bridge_ips); } else { // No existing entry; make a new one. let mut daily_bridge_info = DailyBridgeInfo { info_by_country: BTreeMap::>::new(), }; daily_bridge_info.add_info(BridgeInfoType::BridgeIps, &extra_info.bridge_ips); bridge_info .info_by_day .insert(extra_info.date, daily_bridge_info); } // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } /// Download new extra-infos files and add their data to the database pub async fn update_extra_infos(db: &Db) { // Track which files have been processed. This is slightly redundant // because we're only downloading files we don't already have, but it // might be a good idea to check in case we downloaded a file but didn't // process it for some reason. let mut processed_extra_infos_files = match db.get(b"extra_infos_files").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::::new(), }; let new_files = extra_info::download_extra_infos().await.unwrap(); let mut new_extra_infos = HashSet::::new(); // Make set of new extra-infos for extra_info_file in &new_files { extra_info::add_extra_infos(&extra_info_file, &mut new_extra_infos); processed_extra_infos_files.insert(extra_info_file.to_string()); } // Add new extra-infos data to database for extra_info in new_extra_infos { add_extra_info_to_db(&db, extra_info); } db.insert( b"extra_infos_files", bincode::serialize(&processed_extra_infos_files).unwrap(), ) .unwrap(); } // Process negative reports /// Negative reports can be deduplicated, so we store to-be-processed /// negative reports as a map of [report] to [count of report]. Add this /// NR to that map (or create a new map if necessary). pub fn save_negative_report_to_process(db: &Db, nr: NegativeReport) { // We serialize the negative reports as strings to use them as map keys. let mut reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[date] let map_key = format!( "{}_{}", array_bytes::bytes2hex("", &nr.fingerprint), &nr.date ); let serialized_nr = nr.to_json(); if reports.contains_key(&map_key) { let nr_map = reports.get_mut(&map_key).unwrap(); if nr_map.contains_key(&serialized_nr) { let prev_count = nr_map.get(&serialized_nr).unwrap(); nr_map.insert(serialized_nr, prev_count + 1); } else { nr_map.insert(serialized_nr, 1); } } else { let mut nr_map = BTreeMap::::new(); nr_map.insert(serialized_nr, 1); reports.insert(map_key, nr_map); } // Commit changes to database db.insert("nrs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } /// Sends a collection of negative reports to the Lox Authority and returns the /// number of valid reports returned by the server. The negative reports in the /// collection should all have the same bridge fingerprint, date, country, and /// distributor. pub async fn verify_negative_reports( distributors: &BTreeMap, reports: &BTreeMap, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } // Get one report, assume the rest have the same distributor let first_report: SerializableNegativeReport = serde_json::from_str(reports.first_key_value().unwrap().0).unwrap(); let distributor = first_report.distributor; let client = Client::new(); let uri: String = (distributors.get(&distributor).unwrap().to_owned() + "/verifynegative") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's negative reports and store the count of verified reports in /// the database. pub async fn update_negative_reports(db: &Db, distributors: &BTreeMap) { let mut all_negative_reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; for bridge_date in all_negative_reports.keys() { // We could parse the fingerprint and date: //let fingerprint: [u8; 20] = array_bytes::hex2array(&bridge_date[0..40]).unwrap(); //let date: u32 = &bridge_date[41..].parse().unwrap(); // but instead, let's just get it from the first report let reports = all_negative_reports.get(bridge_date).unwrap(); if !reports.is_empty() { let first_report: SerializableNegativeReport = serde_json::from_str(reports.first_key_value().unwrap().0).unwrap(); let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country; let count_valid = verify_negative_reports(&distributors, reports).await; let mut count_per_country = BTreeMap::::new(); count_per_country.insert(country, count_valid).unwrap(); let mut bridge_info = match db.get(&fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), // It should already exist, unless the bridge hasn't published // any bridge stats. None => BridgeInfo::new(fingerprint, &"".to_string()), }; // Add the new report count to it if bridge_info.info_by_day.contains_key(&date) { let daily_bridge_info = bridge_info.info_by_day.get_mut(&date).unwrap(); daily_bridge_info.add_info(BridgeInfoType::NegativeReports, &count_per_country); // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } else { // No existing entry; make a new one. let mut daily_bridge_info = DailyBridgeInfo::new(); daily_bridge_info.add_info(BridgeInfoType::NegativeReports, &count_per_country); bridge_info.info_by_day.insert(date, daily_bridge_info); // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } } } // TODO: Would it be cheaper to just recreate it? all_negative_reports.clear(); // Remove the now-processed reports from the database db.insert( "nrs-to-process", bincode::serialize(&all_negative_reports).unwrap(), ) .unwrap(); } // Process positive reports /// We store to-be-processed positive reports as a vector. Add this PR /// to that vector (or create a new vector if necessary). pub fn save_positive_report_to_process(db: &Db, pr: PositiveReport) { let mut reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[date] let map_key = format!( "{}_{}", array_bytes::bytes2hex("", &pr.fingerprint), &pr.date ); if reports.contains_key(&map_key) { reports .get_mut(&map_key) .unwrap() .push(pr.to_serializable_report()); } else { let mut prs = Vec::::new(); prs.push(pr.to_serializable_report()); reports.insert(map_key, prs); } // Commit changes to database db.insert("prs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } /// Sends a collection of positive reports to the Lox Authority and returns the /// number of valid reports returned by the server. The positive reports in the /// collection should all have the same bridge fingerprint, date, and country. pub async fn verify_positive_reports( distributors: &BTreeMap, reports: &Vec, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } let client = Client::new(); let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/verifypositive") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's positive reports and store the count of verified reports in /// the database. pub async fn update_positive_reports(db: &Db, distributors: &BTreeMap) { let mut all_positive_reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; for bridge_date in all_positive_reports.keys() { // We could parse the fingerprint and date: //let fingerprint: [u8; 20] = array_bytes::hex2array(&bridge_date[0..40]).unwrap(); //let date: u32 = &bridge_date[41..].parse().unwrap(); // but instead, let's just get it from the first report let reports = all_positive_reports.get(bridge_date).unwrap(); if !reports.is_empty() { let first_report = &reports[0]; let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country.clone(); let count_valid = verify_positive_reports(&distributors, reports).await; let mut count_per_country = BTreeMap::::new(); count_per_country.insert(country, count_valid).unwrap(); let mut bridge_info = match db.get(&fingerprint).unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), // It should already exist, unless the bridge hasn't published // any bridge stats. None => BridgeInfo::new(fingerprint, &"".to_string()), }; // Add the new report count to it if bridge_info.info_by_day.contains_key(&date) { let daily_bridge_info = bridge_info.info_by_day.get_mut(&date).unwrap(); daily_bridge_info.add_info(BridgeInfoType::PositiveReports, &count_per_country); // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } else { // No existing entry; make a new one. let mut daily_bridge_info = DailyBridgeInfo::new(); daily_bridge_info.add_info(BridgeInfoType::PositiveReports, &count_per_country); bridge_info.info_by_day.insert(date, daily_bridge_info); // Commit changes to database db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap()) .unwrap(); } } } // TODO: Would it be cheaper to just recreate it? all_positive_reports.clear(); // Remove the now-processed reports from the database db.insert( "prs-to-process", bincode::serialize(&all_positive_reports).unwrap(), ) .unwrap(); } // TODO: function to mark a bridge as blocked