From f2626eb800668a735ebdb1c288889ade91592ef0 Mon Sep 17 00:00:00 2001 From: Vecna Date: Mon, 15 Apr 2024 03:09:14 -0400 Subject: [PATCH] Add analyzer which evaluates data as multivariate normal distribution --- Cargo.toml | 2 + src/analyzer.rs | 253 +++++++++++++++++++++++++++++++++++++++++++++- src/bin/server.rs | 35 ++++++- 3 files changed, 283 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f348a3f..9002753 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ hyper-util = { version = "0.1", features = ["full"] } julianday = "1.2.0" lazy_static = "1" lox-library = { git = "https://gitlab.torproject.org/vecna/lox.git", version = "0.1.0" } +nalgebra = "0.29" rand = { version = "0.8" } #select = "0.6.0" serde = "1.0.197" @@ -29,6 +30,7 @@ serde_with = {version = "3.7.0", features = ["json"]} sha1 = "0.10" sha3 = "0.10" sled = "0.34.7" +statrs = "0.16" time = "0.3.30" tokio = { version = "1", features = ["full"] } tokio-cron = "0.1.2" diff --git a/src/analyzer.rs b/src/analyzer.rs index 74e799d..666383d 100644 --- a/src/analyzer.rs +++ b/src/analyzer.rs @@ -1,16 +1,19 @@ -use crate::BridgeInfo; -use std::collections::HashSet; +use crate::{get_date, BridgeInfo, BridgeInfoType}; +use lox_library::proto::trust_promotion::UNTRUSTED_INTERVAL; +use nalgebra::DVector; +use statrs::distribution::{Continuous, MultivariateNormal}; +use std::collections::{BTreeMap, HashSet}; /// Provides a function for predicting which countries block this bridge pub trait Analyzer { fn blocked_in(&self, bridge_info: &BridgeInfo, confidence: f64) -> HashSet; } +/// Dummy example that just tells us about blockages we already know about pub struct ExampleAnalyzer {} -/// Dummy example which just tells us about blockages we already know about impl Analyzer for ExampleAnalyzer { - fn blocked_in(&self, bridge_info: &BridgeInfo, confidence: f64) -> HashSet { + fn blocked_in(&self, bridge_info: &BridgeInfo, _confidence: f64) -> HashSet { let mut blocked_in = HashSet::::new(); for (country, info) in &bridge_info.info_by_country { if info.blocked { @@ -20,3 +23,245 @@ impl Analyzer for ExampleAnalyzer { blocked_in } } + +/// Model data as multivariate normal distribution +pub struct NormalAnalyzer { + max_threshold: u32, + scaling_factor: f64, +} + +impl NormalAnalyzer { + pub fn new(max_threshold: u32, scaling_factor: f64) -> Self { + Self { + max_threshold, + scaling_factor, + } + } + + fn mean_vector_and_covariance_matrix(data: &[&[u32]]) -> (Vec, Vec) { + let n = data.len(); + + // Compute mean vector + let mean_vec = { + let mut mean_vec = Vec::::new(); + for var in data { + mean_vec.push({ + let mut sum = 0.0; + for count in *var { + sum += *count as f64; + } + sum / var.len() as f64 + }); + } + mean_vec + }; + + // Compute covariance matrix + let cov_mat = { + let mut cov_mat = Vec::::new(); + // We don't need to recompute Syx, but we currently do + for i in 0..n { + for j in 0..n { + cov_mat.push({ + let var1 = data[i]; + let var1_mean = mean_vec[i]; + + let var2 = data[j]; + let var2_mean = mean_vec[j]; + + assert_eq!(var1.len(), var2.len()); + + let mut sum = 0.0; + for index in 0..var1.len() { + sum += + (var1[index] as f64 - var1_mean) * (var2[index] as f64 - var2_mean); + } + sum / var1.len() as f64 + }); + } + } + cov_mat + }; + + (mean_vec, cov_mat) + } + + /// Evaluate open-entry bridge based on only today's data + fn stage_one(&self, bridge_ips_today: u32, negative_reports_today: u32) -> bool { + negative_reports_today > self.max_threshold + || f64::from(negative_reports_today) > self.scaling_factor * f64::from(bridge_ips_today) + } + + /// Evaluate invite-only bridge based on last 30 days + fn stage_two( + &self, + confidence: f64, + bridge_ips: &[u32], + bridge_ips_today: u32, + negative_reports: &[u32], + negative_reports_today: u32, + ) -> bool { + assert!(bridge_ips.len() >= UNTRUSTED_INTERVAL as usize); + assert_eq!(bridge_ips.len(), negative_reports.len()); + + let (mean_vec, cov_mat) = + Self::mean_vector_and_covariance_matrix(&[bridge_ips, negative_reports]); + let bridge_ips_mean = mean_vec[0]; + let negative_reports_mean = mean_vec[1]; + + let mvn = MultivariateNormal::new(mean_vec, cov_mat).unwrap(); + if mvn.pdf(&DVector::from_vec(vec![ + bridge_ips_today as f64, + negative_reports_today as f64, + ])) < confidence + { + (negative_reports_today as f64) > negative_reports_mean + || (bridge_ips_today as f64) < bridge_ips_mean + } else { + false + } + } + + /// Evaluate invite-only bridge with lv3+ users submitting positive reports + fn stage_three( + &self, + confidence: f64, + bridge_ips: &[u32], + bridge_ips_today: u32, + negative_reports: &[u32], + negative_reports_today: u32, + positive_reports: &[u32], + positive_reports_today: u32, + ) -> bool { + assert!(bridge_ips.len() >= UNTRUSTED_INTERVAL as usize); + assert_eq!(bridge_ips.len(), negative_reports.len()); + assert_eq!(bridge_ips.len(), positive_reports.len()); + + let (mean_vec, cov_mat) = Self::mean_vector_and_covariance_matrix(&[ + bridge_ips, + negative_reports, + positive_reports, + ]); + let bridge_ips_mean = mean_vec[0]; + let negative_reports_mean = mean_vec[1]; + let positive_reports_mean = mean_vec[2]; + + let mvn = MultivariateNormal::new(mean_vec, cov_mat).unwrap(); + if mvn.pdf(&DVector::from_vec(vec![ + bridge_ips_today as f64, + negative_reports_today as f64, + positive_reports_today as f64, + ])) < confidence + { + (negative_reports_today as f64) > negative_reports_mean + || (bridge_ips_today as f64) < bridge_ips_mean + || (positive_reports_today as f64) < positive_reports_mean + } else { + false + } + } +} + +impl Analyzer for NormalAnalyzer { + fn blocked_in(&self, bridge_info: &BridgeInfo, confidence: f64) -> HashSet { + // TODO: Re-evaluate past days if we have backdated reports + let mut blocked_in = HashSet::::new(); + let today = get_date(); + let age = bridge_info.first_seen - today; + for (country, info) in &bridge_info.info_by_country { + if info.blocked { + // Assume bridges never become unblocked + blocked_in.insert(country.to_string()); + } else { + // Get today's values + let new_map_binding = BTreeMap::::new(); + // TODO: Evaluate on yesterday if we don't have data for today? + let today_info = match info.info_by_day.get(&today) { + Some(v) => v, + None => &new_map_binding, + }; + let bridge_ips_today = match today_info.get(&BridgeInfoType::BridgeIps) { + Some(v) => *v, + None => 0, + }; + let negative_reports_today = match today_info.get(&BridgeInfoType::NegativeReports) + { + Some(v) => *v, + None => 0, + }; + let positive_reports_today = match today_info.get(&BridgeInfoType::PositiveReports) + { + Some(v) => *v, + None => 0, + }; + + if age < UNTRUSTED_INTERVAL { + // open-entry bridge + if self.stage_one(bridge_ips_today, negative_reports_today) { + blocked_in.insert(country.to_string()); + } + } else { + // invite-only bridge + let mut bridge_ips = [0; UNTRUSTED_INTERVAL as usize]; + let mut negative_reports = [0; UNTRUSTED_INTERVAL as usize]; + let mut positive_reports = [0; UNTRUSTED_INTERVAL as usize]; + let mut stage_3 = false; + + // Populate time series + for i in 0..UNTRUSTED_INTERVAL { + let date = today - UNTRUSTED_INTERVAL + i - 1; + let new_map_binding = BTreeMap::::new(); + let day_info = match info.info_by_day.get(&date) { + Some(v) => v, + None => &new_map_binding, + }; + bridge_ips[i as usize] = match day_info.get(&BridgeInfoType::BridgeIps) { + Some(v) => *v, + None => 0, + }; + negative_reports[i as usize] = + match day_info.get(&BridgeInfoType::NegativeReports) { + Some(v) => *v, + None => 0, + }; + positive_reports[i as usize] = + match day_info.get(&BridgeInfoType::PositiveReports) { + Some(v) => { + stage_3 = true; + *v + } + None => 0, + }; + } + + if stage_3 { + // We've seen positive reports + if self.stage_three( + confidence, + &bridge_ips, + bridge_ips_today, + &negative_reports, + negative_reports_today, + &positive_reports, + positive_reports_today, + ) { + blocked_in.insert(country.to_string()); + } + } else { + // We have not seen positive reports + if self.stage_two( + confidence, + &bridge_ips, + bridge_ips_today, + &negative_reports, + negative_reports_today, + ) { + blocked_in.insert(country.to_string()); + } + } + } + } + } + blocked_in + } +} diff --git a/src/bin/server.rs b/src/bin/server.rs index 70fdf69..1e8ac0e 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -41,8 +41,17 @@ pub struct Config { // map of distributor name to IP:port to contact it pub distributors: BTreeMap, extra_infos_base_url: String, + // confidence required to consider a bridge blocked confidence: f64, + + // block open-entry bridges if they get more negative reports than this + max_threshold: u32, + + // block open-entry bridges if they get more negative reports than + // scaling_factor * bridge_ips + scaling_factor: f64, + //require_bridge_token: bool, port: u16, updater_schedule: String, @@ -67,13 +76,19 @@ async fn update_daily_info( distributors: &BTreeMap, extra_infos_base_url: &str, confidence: f64, + max_threshold: u32, + scaling_factor: f64, ) { update_extra_infos(&db, &extra_infos_base_url) .await .unwrap(); update_negative_reports(&db, &distributors).await; update_positive_reports(&db, &distributors).await; - let new_blockages = guess_blockages(&db, &analyzer::ExampleAnalyzer {}, confidence); + let new_blockages = guess_blockages( + &db, + &analyzer::NormalAnalyzer::new(max_threshold, scaling_factor), + confidence, + ); report_blockages(&distributors, new_blockages).await; } @@ -86,11 +101,13 @@ async fn create_context_manager( distributors: BTreeMap, extra_infos_base_url: &str, confidence: f64, + max_threshold: u32, + scaling_factor: f64, context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { - create_context = context_manager(db_config, distributors, extra_infos_base_url, confidence, context_rx) => create_context, + create_context = context_manager(db_config, distributors, extra_infos_base_url, confidence, max_threshold, scaling_factor, context_rx) => create_context, _ = kill.recv() => {println!("Shut down manager");}, } } @@ -100,6 +117,8 @@ async fn context_manager( distributors: BTreeMap, extra_infos_base_url: &str, confidence: f64, + max_threshold: u32, + scaling_factor: f64, mut context_rx: mpsc::Receiver, ) { let db: Db = sled::open(&db_config.db_path).unwrap(); @@ -120,7 +139,15 @@ async fn context_manager( println!("Shutdown Sent."); } Update {} => { - update_daily_info(&db, &distributors, &extra_infos_base_url, confidence).await; + update_daily_info( + &db, + &distributors, + &extra_infos_base_url, + confidence, + max_threshold, + scaling_factor, + ) + .await; } } } @@ -188,6 +215,8 @@ async fn main() { config.distributors, &config.extra_infos_base_url, config.confidence, + config.max_threshold, + config.scaling_factor, request_rx, kill, )