//! Collects per-country bridge usage data (Tor extra-info descriptors) and
//! user-submitted negative/positive reports, stores them in a sled database,
//! guesses which countries block each bridge, and reports blockages to the
//! bridge distributor.

use hyper::{Body, Client, Method, Request};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use sled::Db;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fmt,
};

pub mod analyzer;
pub mod bridge_verification_info;
pub mod extra_info;
pub mod negative_report;
pub mod positive_report;
pub mod request_handler;

use analyzer::Analyzer;
use extra_info::*;
use negative_report::*;
use positive_report::*;

lazy_static! {
    // known country codes based on Tor geoIP database
    // Produced with `cat /usr/share/tor/geoip{,6} | grep -v ^# | grep -o ..$ | sort | uniq | tr '[:upper:]' '[:lower:]' | tr '\n' ',' | sed 's/,/","/g'`
    pub static ref COUNTRY_CODES: HashSet<&'static str> = HashSet::from(["??","ad","ae","af","ag","ai","al","am","ao","ap","aq","ar","as","at","au","aw","ax","az","ba","bb","bd","be","bf","bg","bh","bi","bj","bl","bm","bn","bo","bq","br","bs","bt","bv","bw","by","bz","ca","cc","cd","cf","cg","ch","ci","ck","cl","cm","cn","co","cr","cs","cu","cv","cw","cx","cy","cz","de","dj","dk","dm","do","dz","ec","ee","eg","eh","er","es","et","eu","fi","fj","fk","fm","fo","fr","ga","gb","gd","ge","gf","gg","gh","gi","gl","gm","gn","gp","gq","gr","gs","gt","gu","gw","gy","hk","hm","hn","hr","ht","hu","id","ie","il","im","in","io","iq","ir","is","it","je","jm","jo","jp","ke","kg","kh","ki","km","kn","kp","kr","kw","ky","kz","la","lb","lc","li","lk","lr","ls","lt","lu","lv","ly","ma","mc","md","me","mf","mg","mh","mk","ml","mm","mn","mo","mp","mq","mr","ms","mt","mu","mv","mw","mx","my","mz","na","nc","ne","nf","ng","ni","nl","no","np","nr","nu","nz","om","pa","pe","pf","pg","ph","pk","pl","pm","pn","pr","ps","pt","pw","py","qa","re","ro","rs","ru","rw","sa","sb","sc","sd","se","sg","sh","si","sj","sk","sl","sm","sn","so","sr","ss","st","sv","sx","sy","sz","tc","td","tf","tg","th","tj","tk","tl","tm","tn","to","tr","tt","tv","tw","tz","ua","ug","um","us","uy","uz","va","vc","ve","vg","vi","vn","vu","wf","ws","ye","yt","za","zm","zw"]);
}

/// Get Julian date
pub fn get_date() -> u32 { time::OffsetDateTime::now_utc() .date() .to_julian_day() .try_into() .unwrap() } #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeDistributor { Lox, } /// All the info for a bridge, to be stored in the database #[derive(Serialize, Deserialize)] pub struct BridgeInfo { /// hashed fingerprint (SHA-1 hash of 20-byte bridge ID) pub fingerprint: [u8; 20], /// nickname of bridge (probably not necessary) pub nickname: String, /// first Julian date we started collecting data on this bridge pub first_seen: u32, /// map of countries to data for this bridge in that country pub info_by_country: HashMap, } impl BridgeInfo { pub fn new(fingerprint: [u8; 20], nickname: &String) -> Self { Self { fingerprint: fingerprint, nickname: nickname.to_string(), first_seen: get_date(), info_by_country: HashMap::::new(), } } } impl fmt::Display for BridgeInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = format!( "fingerprint:{}\n", array_bytes::bytes2hex("", self.fingerprint).as_str() ); str.push_str(format!("nickname: {}\n", self.nickname).as_str()); str.push_str(format!("first_seen: {}\n", self.first_seen).as_str()); str.push_str("info_by_country:"); for country in self.info_by_country.keys() { str.push_str(format!("\n country: {}", country).as_str()); let country_info = self.info_by_country.get(country).unwrap(); for line in country_info.to_string().lines() { str.push_str(format!("\n {}", line).as_str()); } } write!(f, "{}", str) } } #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub enum BridgeInfoType { BridgeIps, NegativeReports, PositiveReports, } /// Information about bridge reachability from a given country #[derive(Serialize, Deserialize)] pub struct BridgeCountryInfo { pub info_by_day: BTreeMap>, pub blocked: bool, } impl BridgeCountryInfo { pub fn new() -> Self { Self { info_by_day: BTreeMap::>::new(), blocked: false, } } pub fn 
add_info(&mut self, info_type: BridgeInfoType, date: u32, count: u32) { if self.info_by_day.contains_key(&date) { let info = self.info_by_day.get_mut(&date).unwrap(); if !info.contains_key(&info_type) { info.insert(info_type, count); } else if info_type == BridgeInfoType::BridgeIps { if *info.get(&info_type).unwrap() < count { // Use highest value we've seen today info.insert(info_type, count); } } else { // Add count to previous count for reports let new_count = info.get(&info_type).unwrap() + count; info.insert(info_type, new_count); } } else { let mut info = BTreeMap::::new(); info.insert(info_type, count); self.info_by_day.insert(date, info); } } } impl fmt::Display for BridgeCountryInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut str = String::from("info:"); for date in self.info_by_day.keys() { let info = self.info_by_day.get(date).unwrap(); let ip_count = match info.get(&BridgeInfoType::BridgeIps) { Some(v) => v, None => &0, }; let nr_count = match info.get(&BridgeInfoType::NegativeReports) { Some(v) => v, None => &0, }; let pr_count = match info.get(&BridgeInfoType::PositiveReports) { Some(v) => v, None => &0, }; if ip_count > &0 || nr_count > &0 || pr_count > &0 { str.push_str( format!( "\n date: {}\n connections: {}\n negative reports: {}\n positive reports: {}", date, ip_count, nr_count, pr_count, ) .as_str(), ); } } write!(f, "{}", str) } } // Process extra-infos /// Adds the extra-info data for a single bridge to the database. If the /// database already contains an extra-info for this bridge for thid date, /// but this extra-info contains different data for some reason, use the /// greater count of connections from each country. 
pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) { let mut bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::<[u8; 20], BridgeInfo>::new(), }; let fingerprint = extra_info.fingerprint; if !bridges.contains_key(&fingerprint) { bridges.insert( fingerprint, BridgeInfo::new(fingerprint, &extra_info.nickname), ); } let bridge_info = bridges.get_mut(&fingerprint).unwrap(); for country in extra_info.bridge_ips.keys() { if bridge_info.info_by_country.contains_key::(country) { bridge_info .info_by_country .get_mut(country) .unwrap() .add_info( BridgeInfoType::BridgeIps, extra_info.date, *extra_info.bridge_ips.get(country).unwrap(), ); } else { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(); bridge_country_info.add_info( BridgeInfoType::BridgeIps, extra_info.date, *extra_info.bridge_ips.get(country).unwrap(), ); bridge_info .info_by_country .insert(country.to_string(), bridge_country_info); } } // Commit changes to database db.insert("bridges", bincode::serialize(&bridges).unwrap()) .unwrap(); } /// Download new extra-infos files and add their data to the database pub async fn update_extra_infos(db: &Db) { // Track which files have been processed. This is slightly redundant // because we're only downloading files we don't already have, but it // might be a good idea to check in case we downloaded a file but didn't // process it for some reason. 
let mut processed_extra_infos_files = match db.get(b"extra_infos_files").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => HashSet::::new(), }; let new_files = extra_info::download_extra_infos().await.unwrap(); let mut new_extra_infos = HashSet::::new(); // Make set of new extra-infos for extra_info_file in &new_files { extra_info::add_extra_infos(&extra_info_file, &mut new_extra_infos); processed_extra_infos_files.insert(extra_info_file.to_string()); } // Add new extra-infos data to database for extra_info in new_extra_infos { add_extra_info_to_db(&db, extra_info); } db.insert( b"extra_infos_files", bincode::serialize(&processed_extra_infos_files).unwrap(), ) .unwrap(); } // Process negative reports /// Negative reports can be deduplicated, so we store to-be-processed /// negative reports as a map of [report] to [count of report]. Add this /// NR to that map (or create a new map if necessary). pub fn save_negative_report_to_process(db: &Db, nr: NegativeReport) { // We serialize the negative reports as strings to use them as map keys. 
let mut reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[country]_[date] let map_key = format!( "{}_{}_{}", array_bytes::bytes2hex("", &nr.fingerprint), &nr.country, &nr.date, ); let serialized_nr = nr.to_json(); if reports.contains_key(&map_key) { let nr_map = reports.get_mut(&map_key).unwrap(); if nr_map.contains_key(&serialized_nr) { let prev_count = nr_map.get(&serialized_nr).unwrap(); nr_map.insert(serialized_nr, prev_count + 1); } else { nr_map.insert(serialized_nr, 1); } } else { let mut nr_map = BTreeMap::::new(); nr_map.insert(serialized_nr, 1); reports.insert(map_key, nr_map); } // Commit changes to database db.insert("nrs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } /// Sends a collection of negative reports to the Lox Authority and returns the /// number of valid reports returned by the server. The negative reports in the /// collection should all have the same bridge fingerprint, date, country, and /// distributor. 
pub async fn verify_negative_reports( distributors: &BTreeMap, reports: &BTreeMap, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } // Get one report, assume the rest have the same distributor let first_report: SerializableNegativeReport = serde_json::from_str(reports.first_key_value().unwrap().0).unwrap(); let distributor = first_report.distributor; let client = Client::new(); let uri: String = (distributors.get(&distributor).unwrap().to_owned() + "/verifynegative") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's negative reports and store the count of verified reports in /// the database. pub async fn update_negative_reports(db: &Db, distributors: &BTreeMap) { let mut all_negative_reports = match db.get("nrs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Key is [fingerprint]_[country]_[date] for bridge_country_date in all_negative_reports.keys() { let reports = all_negative_reports.get(bridge_country_date).unwrap(); if !reports.is_empty() { let first_report: SerializableNegativeReport = serde_json::from_str(reports.first_key_value().unwrap().0).unwrap(); let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country; let count_valid = verify_negative_reports(&distributors, reports).await; let mut bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::<[u8; 20], BridgeInfo>::new(), }; // Get bridge info or make new one if !bridges.contains_key(&fingerprint) { // This case shouldn't happen unless the bridge hasn't published // any bridge stats. 
bridges.insert(fingerprint, BridgeInfo::new(fingerprint, &"".to_string())); } let bridge_info = bridges.get_mut(&fingerprint).unwrap(); // Add the new report count to it if bridge_info.info_by_country.contains_key(&country) { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); bridge_country_info.add_info(BridgeInfoType::NegativeReports, date, count_valid); } else { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(); bridge_country_info.add_info(BridgeInfoType::NegativeReports, date, count_valid); bridge_info .info_by_country .insert(country, bridge_country_info); } // Commit changes to database db.insert("bridges", bincode::serialize(&bridges).unwrap()) .unwrap(); } } // TODO: Would it be cheaper to just recreate it? all_negative_reports.clear(); // Remove the now-processed reports from the database db.insert( "nrs-to-process", bincode::serialize(&all_negative_reports).unwrap(), ) .unwrap(); } // Process positive reports /// We store to-be-processed positive reports as a vector. Add this PR /// to that vector (or create a new vector if necessary). pub fn save_positive_report_to_process(db: &Db, pr: PositiveReport) { let mut reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Store to-be-processed reports with key [fingerprint]_[country]_[date] let map_key = format!( "{}_{}_{}", array_bytes::bytes2hex("", &pr.fingerprint), &pr.country, &pr.date, ); if reports.contains_key(&map_key) { reports .get_mut(&map_key) .unwrap() .push(pr.to_serializable_report()); } else { let mut prs = Vec::::new(); prs.push(pr.to_serializable_report()); reports.insert(map_key, prs); } // Commit changes to database db.insert("prs-to-process", bincode::serialize(&reports).unwrap()) .unwrap(); } /// Sends a collection of positive reports to the Lox Authority and returns the /// number of valid reports returned by the server. 
The positive reports in the /// collection should all have the same bridge fingerprint, date, and country. pub async fn verify_positive_reports( distributors: &BTreeMap, reports: &Vec, ) -> u32 { // Don't make a network call if we don't have any reports anyway if reports.is_empty() { return 0; } let client = Client::new(); let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/verifypositive") .parse() .unwrap(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&reports).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let buf = hyper::body::to_bytes(resp).await.unwrap(); serde_json::from_slice(&buf).unwrap() } /// Process today's positive reports and store the count of verified reports in /// the database. pub async fn update_positive_reports(db: &Db, distributors: &BTreeMap) { let mut all_positive_reports = match db.get("prs-to-process").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::>::new(), }; // Key is [fingerprint]_[country]_[date] for bridge_country_date in all_positive_reports.keys() { let reports = all_positive_reports.get(bridge_country_date).unwrap(); if !reports.is_empty() { let first_report = &reports[0]; let fingerprint = first_report.fingerprint; let date = first_report.date; let country = first_report.country.clone(); let count_valid = verify_positive_reports(&distributors, reports).await; // Get bridge data from database let mut bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::<[u8; 20], BridgeInfo>::new(), }; // Get bridge info or make new one if !bridges.contains_key(&fingerprint) { // This case shouldn't happen unless the bridge hasn't published // any bridge stats. 
bridges.insert(fingerprint, BridgeInfo::new(fingerprint, &"".to_string())); } let bridge_info = bridges.get_mut(&fingerprint).unwrap(); // Add the new report count to it if bridge_info.info_by_country.contains_key(&country) { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); bridge_country_info.add_info(BridgeInfoType::PositiveReports, date, count_valid); } else { // No existing entry; make a new one. let mut bridge_country_info = BridgeCountryInfo::new(); bridge_country_info.add_info(BridgeInfoType::PositiveReports, date, count_valid); bridge_info .info_by_country .insert(country, bridge_country_info); } // Commit changes to database db.insert("bridges", bincode::serialize(&bridges).unwrap()) .unwrap(); } } // TODO: Would it be cheaper to just recreate it? all_positive_reports.clear(); // Remove the now-processed reports from the database db.insert( "prs-to-process", bincode::serialize(&all_positive_reports).unwrap(), ) .unwrap(); } // Verdict on bridge reachability /// Guess which countries block a bridge. 
This function returns a map of new /// blockages (fingerprint : set of countries which block the bridge) pub fn guess_blockages(db: &Db, analyzer: &dyn Analyzer) -> HashMap<[u8; 20], HashSet> { // Map of bridge fingerprint to set of countries which newly block it let mut blockages = HashMap::<[u8; 20], HashSet>::new(); // Get bridge data from database let mut bridges = match db.get("bridges").unwrap() { Some(v) => bincode::deserialize(&v).unwrap(), None => BTreeMap::<[u8; 20], BridgeInfo>::new(), }; // Guess for each bridge for (fingerprint, bridge_info) in &mut bridges { let mut new_blockages = HashSet::::new(); let blocked_in = analyzer.blocked_in(&bridge_info); for country in blocked_in { let bridge_country_info = bridge_info.info_by_country.get_mut(&country).unwrap(); if !bridge_country_info.blocked { new_blockages.insert(country.to_string()); // Mark bridge as blocked when db gets updated bridge_country_info.blocked = true; } } blockages.insert(*fingerprint, new_blockages); } // Commit changes to database db.insert("bridges", bincode::serialize(&bridges).unwrap()) .unwrap(); // Return map of new blockages blockages } /// Report blocked bridges to bridge distributor pub async fn report_blockages( distributors: &BTreeMap, blockages: HashMap<[u8; 20], HashSet>, ) { // For now, only report to Lox // TODO: Support more distributors let uri: String = (distributors .get(&BridgeDistributor::Lox) .unwrap() .to_owned() + "/reportblocked") .parse() .unwrap(); // Convert map keys from [u8; 20] to 40-character hex strings let mut blockages_str = HashMap::>::new(); for (fingerprint, countries) in blockages { let fpr_string = array_bytes::bytes2hex("", fingerprint); blockages_str.insert(fpr_string, countries); } // Report blocked bridges to bridge distributor let client = Client::new(); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(serde_json::to_string(&blockages_str).unwrap())) .unwrap(); let resp = client.request(req).await.unwrap(); let 
buf = hyper::body::to_bytes(resp).await.unwrap(); let resp_str: String = serde_json::from_slice(&buf).unwrap(); assert_eq!("OK", resp_str); } // Unit tests #[cfg(test)] mod tests;