Store bridge data by fingerprint and store set of fingerprints

We had been storing all the data as a huge map under the key 'bridges' so that we could iterate over all keys in the map as a way of iterating over all bridges. This caused the database to grow way too big, so we don't do that anymore. Now, we store each bridge's data with the bridge fingerprint as the key, and we store a set of bridge fingerprints under the key 'bridges'.
This commit is contained in:
Vecna 2024-04-06 01:09:43 -04:00
parent b915aea094
commit a679c13e4f
4 changed files with 103 additions and 54 deletions

View File

@ -40,6 +40,7 @@ pub struct Config {
pub db: DbConfig,
// map of distributor name to IP:port to contact it
pub distributors: BTreeMap<BridgeDistributor, String>,
extra_infos_base_url: String,
//require_bridge_token: bool,
port: u16,
}
@ -58,8 +59,12 @@ impl Default for DbConfig {
}
}
async fn update_daily_info(db: &Db, distributors: &BTreeMap<BridgeDistributor, String>) {
update_extra_infos(&db).await;
async fn update_daily_info(
db: &Db,
distributors: &BTreeMap<BridgeDistributor, String>,
extra_infos_base_url: &str,
) {
update_extra_infos(&db, &extra_infos_base_url).await;
update_negative_reports(&db, &distributors).await;
update_positive_reports(&db, &distributors).await;
let new_blockages = guess_blockages(&db, &analyzer::ExampleAnalyzer {});
@ -73,11 +78,12 @@ async fn run_updater(updater_tx: mpsc::Sender<Command>) {
async fn create_context_manager(
db_config: DbConfig,
distributors: BTreeMap<BridgeDistributor, String>,
extra_infos_base_url: &str,
context_rx: mpsc::Receiver<Command>,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
create_context = context_manager(db_config, distributors, context_rx) => create_context,
create_context = context_manager(db_config, distributors, extra_infos_base_url, context_rx) => create_context,
_ = kill.recv() => {println!("Shut down manager");},
}
}
@ -85,6 +91,7 @@ async fn create_context_manager(
async fn context_manager(
db_config: DbConfig,
distributors: BTreeMap<BridgeDistributor, String>,
extra_infos_base_url: &str,
mut context_rx: mpsc::Receiver<Command>,
) {
let db: Db = sled::open(&db_config.db_path).unwrap();
@ -105,7 +112,7 @@ async fn context_manager(
println!("Shutdown Sent.");
}
Update {} => {
update_daily_info(&db, &distributors).await;
update_daily_info(&db, &distributors, &extra_infos_base_url).await;
}
}
}
@ -175,7 +182,14 @@ async fn main() {
});
let context_manager = spawn(async move {
create_context_manager(config.db, config.distributors, request_rx, kill).await
create_context_manager(
config.db,
config.distributors,
&config.extra_infos_base_url,
request_rx,
kill,
)
.await
});
let make_service = make_service_fn(move |_conn: &AddrStream| {

View File

@ -148,9 +148,9 @@ pub fn add_extra_infos<'a>(filename: &str, set: &mut HashSet<ExtraInfo>) {
/// Download new extra-infos files and save them in DIRECTORY. This function
/// returns the set of newly downloaded filenames.
pub async fn download_extra_infos(
base_url: &str,
) -> Result<HashSet<String>, Box<dyn std::error::Error + Send + Sync>> {
// Download directory of recent extra-infos
let base_url = "https://collector.torproject.org/recent/bridge-descriptors/extra-infos/";
let url = base_url.parse().unwrap();
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots() // TODO: Pin certificate? Is this data signed/verifiable?

View File

@ -165,6 +165,18 @@ impl fmt::Display for BridgeCountryInfo {
}
}
/// Records `fingerprint` in the set of all known bridges.
///
/// The database keeps a set of bridge fingerprints under the key `"bridges"`
/// so that we can later iterate over every known bridge. This function adds
/// one fingerprint to that set, starting from an empty set if the key does
/// not exist yet.
///
/// The set is only written back when the fingerprint was not already in it,
/// which avoids re-serializing and re-inserting the whole set when nothing
/// changed.
///
/// # Panics
///
/// Panics if the database read or write fails, or if the value stored under
/// `"bridges"` cannot be deserialized as a `HashSet<[u8; 20]>`.
pub fn add_bridge_to_db(db: &Db, fingerprint: [u8; 20]) {
    let mut bridges = match db.get("bridges").unwrap() {
        Some(v) => bincode::deserialize(&v).unwrap(),
        None => HashSet::<[u8; 20]>::new(),
    };
    // `insert` returns false when the fingerprint was already present; the
    // stored set is then already correct, so skip the redundant write.
    if bridges.insert(fingerprint) {
        db.insert("bridges", bincode::serialize(&bridges).unwrap())
            .unwrap();
    }
}
// Process extra-infos
/// Adds the extra-info data for a single bridge to the database. If the
@ -172,18 +184,14 @@ impl fmt::Display for BridgeCountryInfo {
/// but this extra-info contains different data for some reason, use the
/// greater count of connections from each country.
pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) {
let mut bridges = match db.get("bridges").unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => BTreeMap::<[u8; 20], BridgeInfo>::new(),
};
let fingerprint = extra_info.fingerprint;
if !bridges.contains_key(&fingerprint) {
bridges.insert(
fingerprint,
BridgeInfo::new(fingerprint, &extra_info.nickname),
);
}
let bridge_info = bridges.get_mut(&fingerprint).unwrap();
let mut bridge_info = match db.get(fingerprint).unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => {
add_bridge_to_db(&db, fingerprint);
BridgeInfo::new(fingerprint, &extra_info.nickname)
}
};
for country in extra_info.bridge_ips.keys() {
if bridge_info.info_by_country.contains_key::<String>(country) {
bridge_info
@ -209,12 +217,12 @@ pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) {
}
}
// Commit changes to database
db.insert("bridges", bincode::serialize(&bridges).unwrap())
db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap())
.unwrap();
}
/// Download new extra-infos files and add their data to the database
pub async fn update_extra_infos(db: &Db) {
pub async fn update_extra_infos(db: &Db, base_url: &str) {
// Track which files have been processed. This is slightly redundant
// because we're only downloading files we don't already have, but it
// might be a good idea to check in case we downloaded a file but didn't
@ -224,7 +232,7 @@ pub async fn update_extra_infos(db: &Db) {
None => HashSet::<String>::new(),
};
let new_files = extra_info::download_extra_infos().await.unwrap();
let new_files = extra_info::download_extra_infos(base_url).await.unwrap();
let mut new_extra_infos = HashSet::<ExtraInfo>::new();
@ -331,18 +339,16 @@ pub async fn update_negative_reports(db: &Db, distributors: &BTreeMap<BridgeDist
let country = first_report.country;
let count_valid = verify_negative_reports(&distributors, reports).await;
let mut bridges = match db.get("bridges").unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => BTreeMap::<[u8; 20], BridgeInfo>::new(),
};
// Get bridge info or make new one
if !bridges.contains_key(&fingerprint) {
// This case shouldn't happen unless the bridge hasn't published
// any bridge stats.
bridges.insert(fingerprint, BridgeInfo::new(fingerprint, &"".to_string()));
}
let bridge_info = bridges.get_mut(&fingerprint).unwrap();
let mut bridge_info = match db.get(fingerprint).unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => {
// This case shouldn't happen unless the bridge hasn't
// published any bridge stats.
add_bridge_to_db(&db, fingerprint);
BridgeInfo::new(fingerprint, &String::default())
}
};
// Add the new report count to it
if bridge_info.info_by_country.contains_key(&country) {
@ -358,7 +364,7 @@ pub async fn update_negative_reports(db: &Db, distributors: &BTreeMap<BridgeDist
}
// Commit changes to database
db.insert("bridges", bincode::serialize(&bridges).unwrap())
db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap())
.unwrap();
}
}
@ -449,19 +455,16 @@ pub async fn update_positive_reports(db: &Db, distributors: &BTreeMap<BridgeDist
let country = first_report.country.clone();
let count_valid = verify_positive_reports(&distributors, reports).await;
// Get bridge data from database
let mut bridges = match db.get("bridges").unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => BTreeMap::<[u8; 20], BridgeInfo>::new(),
};
// Get bridge info or make new one
if !bridges.contains_key(&fingerprint) {
// This case shouldn't happen unless the bridge hasn't published
// any bridge stats.
bridges.insert(fingerprint, BridgeInfo::new(fingerprint, &"".to_string()));
}
let bridge_info = bridges.get_mut(&fingerprint).unwrap();
let mut bridge_info = match db.get(fingerprint).unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => {
// This case shouldn't happen unless the bridge hasn't
// published any bridge stats.
add_bridge_to_db(&db, fingerprint);
BridgeInfo::new(fingerprint, &String::default())
}
};
// Add the new report count to it
if bridge_info.info_by_country.contains_key(&country) {
@ -476,7 +479,7 @@ pub async fn update_positive_reports(db: &Db, distributors: &BTreeMap<BridgeDist
.insert(country, bridge_country_info);
}
// Commit changes to database
db.insert("bridges", bincode::serialize(&bridges).unwrap())
db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap())
.unwrap();
}
}
@ -498,14 +501,16 @@ pub fn guess_blockages(db: &Db, analyzer: &dyn Analyzer) -> HashMap<[u8; 20], Ha
// Map of bridge fingerprint to set of countries which newly block it
let mut blockages = HashMap::<[u8; 20], HashSet<String>>::new();
// Get bridge data from database
let mut bridges = match db.get("bridges").unwrap() {
// Get list of bridges from database
let bridges = match db.get("bridges").unwrap() {
Some(v) => bincode::deserialize(&v).unwrap(),
None => BTreeMap::<[u8; 20], BridgeInfo>::new(),
None => HashSet::<[u8; 20]>::new(),
};
// Guess for each bridge
for (fingerprint, bridge_info) in &mut bridges {
for fingerprint in bridges {
let mut bridge_info: BridgeInfo =
bincode::deserialize(&db.get(fingerprint).unwrap().unwrap()).unwrap();
let mut new_blockages = HashSet::<String>::new();
let blocked_in = analyzer.blocked_in(&bridge_info);
for country in blocked_in {
@ -516,12 +521,12 @@ pub fn guess_blockages(db: &Db, analyzer: &dyn Analyzer) -> HashMap<[u8; 20], Ha
bridge_country_info.blocked = true;
}
}
blockages.insert(*fingerprint, new_blockages);
}
blockages.insert(fingerprint, new_blockages);
// Commit changes to database
db.insert("bridges", bincode::serialize(&bridges).unwrap())
.unwrap();
// Commit changes to database
db.insert(fingerprint, bincode::serialize(&bridge_info).unwrap())
.unwrap();
}
// Return map of new blockages
blockages

View File

@ -144,6 +144,36 @@ pub fn random() -> BridgeLine {
res
}
/// Downloads the recent extra-infos files and checks that a known bridge ends
/// up both in the `"bridges"` fingerprint set and under its own fingerprint
/// key in the database.
///
/// NOTE: this test downloads from the Tor collector URL below and uses the
/// on-disk database `test_db`, so it needs network access and a writable
/// working directory. It also depends on the hard-coded bridge still being
/// present in the recent extra-infos files.
#[tokio::test]
async fn test_extra_infos() {
    // Fingerprint of the bridge we expect to find after the update
    let bridge_to_test: [u8; 20] =
        array_bytes::hex2array("72E12B89136B45BBC81D1EF0AC7DDDBB91B148DB").unwrap();

    // Open test database
    let db: Db = sled::open("test_db").unwrap();

    // Delete all data in test DB so the assertions below start from a known
    // empty state
    db.clear().unwrap();
    assert!(!db.contains_key("bridges").unwrap());
    assert!(!db.contains_key(bridge_to_test).unwrap());

    // Download and process recent extra-infos files
    update_extra_infos(
        &db,
        "https://collector.torproject.org/recent/bridge-descriptors/extra-infos/",
    )
    .await;

    // Check that DB contains information on a bridge with high uptime:
    // first that its fingerprint is in the set of known bridges...
    assert!(db.contains_key("bridges").unwrap());
    let bridges: HashSet<[u8; 20]> =
        bincode::deserialize(&db.get("bridges").unwrap().unwrap()).unwrap();
    assert!(bridges.contains(&bridge_to_test));

    // ...and then that its data is stored under its own fingerprint key.
    assert!(db.contains_key(bridge_to_test).unwrap());

    // The value itself is not inspected; this only verifies that the stored
    // bytes deserialize as a `BridgeInfo` (the unwrap panics otherwise).
    let _bridge_info: BridgeInfo =
        bincode::deserialize(&db.get(bridge_to_test).unwrap().unwrap()).unwrap();
}
#[test]
fn test_negative_reports() {
let mut th = TestHarness::new();