Don't persist extra-infos files to disk, just process and store results
commit e1588aac0e (parent a679c13e4f)
@@ -21,6 +21,7 @@ hyper-util = { version = "0.1", features = ["full"] }
 julianday = "1.2.0"
 lazy_static = "1"
 lox-library = { git = "https://gitlab.torproject.org/vecna/lox.git", version = "0.1.0" }
 #select = "0.6.0"
 serde = "1.0.197"
 serde_json = "1.0"
 serde_with = {version = "3.7.0", features = ["json"]}

@@ -64,7 +64,9 @@ async fn update_daily_info(
     distributors: &BTreeMap<BridgeDistributor, String>,
     extra_infos_base_url: &str,
 ) {
-    update_extra_infos(&db, &extra_infos_base_url).await;
+    update_extra_infos(&db, &extra_infos_base_url)
+        .await
+        .unwrap();
     update_negative_reports(&db, &distributors).await;
     update_positive_reports(&db, &distributors).await;
     let new_blockages = guess_blockages(&db, &analyzer::ExampleAnalyzer {});
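With this change a failed download now panics the daily update task via unwrap. A minimal alternative sketch (hypothetical, not part of this commit) that logs the error and skips the cycle instead, using the Result the function now returns:

    // Hypothetical caller-side handling; update_extra_infos returns
    // Result<(), Box<dyn std::error::Error + Send + Sync>> after this commit.
    if let Err(e) = update_extra_infos(&db, &extra_infos_base_url).await {
        println!("Failed to update extra-infos: {}", e);
    }
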
@@ -3,21 +3,9 @@ Note, this is NOT a complete implementation of the document format.
 (https://spec.torproject.org/dir-spec/extra-info-document-format.html) */
 
 use chrono::DateTime;
-use http::status::StatusCode;
-use http_body_util::{BodyExt, Empty};
-use hyper::body::Bytes;
-use hyper_util::{client::legacy::Client, rt::TokioExecutor};
 use julianday::JulianDay;
 use serde::{Deserialize, Serialize};
-use std::{
-    collections::{BTreeMap, HashMap, HashSet},
-    fs::File,
-    io::{prelude::*, BufReader, Write},
-    path::Path,
-};
-
-/// Directory where we store these files
-pub const DIRECTORY: &str = "extra_infos";
+use std::collections::{BTreeMap, HashMap, HashSet};
 
 /// Fields we need from extra-info document
 #[derive(Eq, PartialEq, Hash, Serialize, Deserialize)]
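The chrono and julianday imports kept above feed the date handling in from_map below, which stores each document's date as an integral Julian day. A worked sketch of that conversion under the same crates:

    use chrono::DateTime;
    use julianday::JulianDay;

    // Convert "YYYY-MM-DD HH:MM:SS" (the format of published and
    // bridge-stats-end values) to a Julian day, as from_map does below.
    fn date_to_julian(date_str: &str) -> u32 {
        JulianDay::from(
            DateTime::parse_from_str(&(date_str.to_owned() + " +0000"), "%F %T %z")
                .unwrap()
                .date_naive(),
        )
        .inner()
        .try_into()
        .unwrap()
    }

    fn main() {
        println!("{}", date_to_julian("2024-03-26 06:31:13")); // prints 2460396
    }
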
@@ -40,182 +28,112 @@ pub struct ExtraInfo {
     pub bridge_ips: BTreeMap<String, u32>, // TODO: What size for count?
 }
 
-fn get_extra_info_or_error(entry: &HashMap<String, String>) -> Result<ExtraInfo, String> {
-    if !entry.contains_key("nickname") || !entry.contains_key("fingerprint") {
-        // How did we get here??
-        return Err("Cannot parse extra-info: Missing nickname or fingerprint".to_string());
-    }
-    if !(entry.contains_key("bridge-stats-end") || entry.contains_key("published"))
-        || !entry.contains_key("bridge-ips")
-    {
-        // Some extra-infos are missing data on connecting IPs...
-        // But we can't do anything in that case.
-        return Err(format!(
-            "Failed to parse extra-info for {} {}",
-            entry.get("nickname").unwrap(),
-            entry.get("fingerprint").unwrap()
-        ));
-    }
-    let nickname = entry.get("nickname").unwrap().to_string();
-    let fingerprint_str = entry.get("fingerprint").unwrap();
-    if fingerprint_str.len() != 40 {
-        return Err("Fingerprint must be 20 bytes".to_string());
-    }
-    let fingerprint = array_bytes::hex2array(fingerprint_str).unwrap();
-    let date: u32 = {
-        let date_str = if entry.contains_key("bridge-stats-end") {
-            let line = entry.get("bridge-stats-end").unwrap();
-            // Parse out (86400 s) from end of line
-            &line[..line.find("(").unwrap() - 1]
-        } else {
-            entry.get("published").unwrap().as_str()
-        };
-        JulianDay::from(
-            DateTime::parse_from_str(&(date_str.to_owned() + " +0000"), "%F %T %z")
-                .unwrap()
-                .date_naive(),
-        )
-        .inner()
-        .try_into()
-        .unwrap()
-    };
-    let bridge_ips_str = entry.get("bridge-ips").unwrap();
-    let mut bridge_ips: BTreeMap<String, u32> = BTreeMap::new();
-    let countries: Vec<&str> = bridge_ips_str.split(',').collect();
-    for country in countries {
-        if country != "" {
-            // bridge-ips may be empty
-            let (cc, count) = country.split_once('=').unwrap();
-            bridge_ips.insert(cc.to_string(), count.parse::<u32>().unwrap());
-        }
-    }
-
-    Ok(ExtraInfo {
-        nickname,
-        fingerprint,
-        date,
-        bridge_ips,
-    })
-}
-
-pub fn add_extra_infos<'a>(filename: &str, set: &mut HashSet<ExtraInfo>) {
-    let infile = File::open(format!("{}/{}", DIRECTORY, filename)).unwrap();
-    let reader = BufReader::new(infile);
-
-    let mut entry = HashMap::<String, String>::new();
-    for line in reader.lines() {
-        let line = line.unwrap();
-        if line.starts_with("@type bridge-extra-info ") {
-            if !entry.is_empty() {
-                let extra_info = get_extra_info_or_error(&entry);
-                if extra_info.is_ok() {
-                    set.insert(extra_info.unwrap());
-                } else {
-                    // Just print the error and continue.
-                    println!("{}", extra_info.err().unwrap());
-                }
-                entry = HashMap::<String, String>::new();
-            }
-        } else {
-            if line.starts_with("extra-info ") {
-                // extra-info line has format:
-                // extra-info <nickname> <fingerprint>
-                let line_split: Vec<&str> = line.split(' ').collect();
-                if line_split.len() != 3 {
-                    println!("Misformed extra-info line");
-                } else {
-                    entry.insert("nickname".to_string(), line_split[1].to_string());
-                    entry.insert("fingerprint".to_string(), line_split[2].to_string());
-                }
-            } else {
-                let (key, value) = match line.split_once(' ') {
-                    Some((k, v)) => (k, v),
-                    None => (line.as_str(), ""),
-                };
-                entry.insert(key.to_string(), value.to_string());
-            }
-        }
-    }
-    // Do for the last one
-    let extra_info = get_extra_info_or_error(&entry);
-    if extra_info.is_ok() {
-        set.insert(extra_info.unwrap());
-    } else {
-        println!("{}", extra_info.err().unwrap());
-    }
-}
-
-/// Download new extra-infos files and save them in DIRECTORY. This function
-/// returns the set of newly downloaded filenames.
-pub async fn download_extra_infos(
-    base_url: &str,
-) -> Result<HashSet<String>, Box<dyn std::error::Error + Send + Sync>> {
-    // Download directory of recent extra-infos
-    let url = base_url.parse().unwrap();
-    let https = hyper_rustls::HttpsConnectorBuilder::new()
-        .with_native_roots() // TODO: Pin certificate? Is this data signed/verifiable?
-        .expect("no native root CA certificates found")
-        .https_only()
-        .enable_http1()
-        .build();
-
-    let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build(https);
-
-    println!("Downloading {}", base_url);
-    let mut res = client.get(url).await?;
-
-    assert_eq!(res.status(), StatusCode::OK);
-    let mut body_str = String::from("");
-    while let Some(next) = res.frame().await {
-        let frame = next?;
-        if let Some(chunk) = frame.data_ref() {
-            body_str.push_str(&String::from_utf8(chunk.to_vec())?);
-        }
-    }
-
-    // Removed because it caused some problem...
-    //let doc = Document::from(body_str.clone().as_str());
-    // Instead, do this
-    let mut links = HashSet::<String>::new();
-    for line in body_str.lines() {
-        let begin_match = "<a href=\"";
-        let end_match = "\">";
-        if line.contains(begin_match) {
-            let link = &line[line.find(begin_match).unwrap() + begin_match.len()..];
-            if link.contains(end_match) {
-                let link = &link[0..link.find(end_match).unwrap()];
-                links.insert(link.to_string());
-            }
-        }
-    }
-
-    // Create extra-infos directory if it doesn't exist
-    std::fs::create_dir_all(&DIRECTORY)?;
-
-    let mut new_files = HashSet::<String>::new();
-
-    // Go through all the links in the page and download new files
-    for link in links {
-        if link.ends_with("-extra-infos") {
-            let filename = format!("{}/{}", DIRECTORY, link);
-
-            // Download file if it's not already downloaded
-            if !Path::new(&filename).exists() {
-                let extra_infos_url = format!("{}{}", base_url, link);
-                println!("Downloading {}", extra_infos_url);
-                let mut res = client.get(extra_infos_url.parse().unwrap()).await.unwrap();
-                assert_eq!(res.status(), StatusCode::OK);
-                let mut file = std::fs::File::create(filename).unwrap();
-                while let Some(next) = res.frame().await {
-                    let frame = next?;
-                    if let Some(chunk) = frame.data_ref() {
-                        file.write_all(&chunk)?;
-                    }
-                }
-                new_files.insert(link.to_string());
-            }
-        }
-    }
-
-    Ok(new_files)
-}
+impl ExtraInfo {
+    /// Converts a map of keys and values into an ExtraInfo if all necessary fields
+    /// are represented.
+    fn from_map(entry: &HashMap<String, String>) -> Result<Self, String> {
+        if !entry.contains_key("nickname") || !entry.contains_key("fingerprint") {
+            // How did we get here??
+            return Err("Cannot parse extra-info: Missing nickname or fingerprint".to_string());
+        }
+        if !(entry.contains_key("bridge-stats-end") || entry.contains_key("published"))
+            || !entry.contains_key("bridge-ips")
+        {
+            // Some extra-infos are missing data on connecting IPs...
+            // But we can't do anything in that case.
+            return Err(format!(
+                "Failed to parse extra-info for {} {}",
+                entry.get("nickname").unwrap(),
+                entry.get("fingerprint").unwrap()
+            ));
+        }
+        let nickname = entry.get("nickname").unwrap().to_string();
+        let fingerprint_str = entry.get("fingerprint").unwrap();
+        if fingerprint_str.len() != 40 {
+            return Err("Fingerprint must be 20 bytes".to_string());
+        }
+        let fingerprint = array_bytes::hex2array(fingerprint_str).unwrap();
+        let date: u32 = {
+            let date_str = if entry.contains_key("bridge-stats-end") {
+                let line = entry.get("bridge-stats-end").unwrap();
+                // Parse out (86400 s) from end of line
+                &line[..line.find("(").unwrap() - 1]
+            } else {
+                entry.get("published").unwrap().as_str()
+            };
+            JulianDay::from(
+                DateTime::parse_from_str(&(date_str.to_owned() + " +0000"), "%F %T %z")
+                    .unwrap()
+                    .date_naive(),
+            )
+            .inner()
+            .try_into()
+            .unwrap()
+        };
+        let bridge_ips_str = entry.get("bridge-ips").unwrap();
+        let mut bridge_ips: BTreeMap<String, u32> = BTreeMap::new();
+        let countries: Vec<&str> = bridge_ips_str.split(',').collect();
+        for country in countries {
+            if country != "" {
+                // bridge-ips may be empty
+                let (cc, count) = country.split_once('=').unwrap();
+                bridge_ips.insert(cc.to_string(), count.parse::<u32>().unwrap());
+            }
+        }
+
+        Ok(Self {
+            nickname,
+            fingerprint,
+            date,
+            bridge_ips,
+        })
+    }
+
+    /// Accepts a downloaded extra-infos file as a big string, returns a set of
+    /// the ExtraInfos represented by the file.
+    pub fn parse_file<'a>(extra_info_str: &str) -> HashSet<Self> {
+        let mut set = HashSet::<Self>::new();
+        let mut entry = HashMap::<String, String>::new();
+        for line in extra_info_str.lines() {
+            if line.starts_with("@type bridge-extra-info ") {
+                if !entry.is_empty() {
+                    let extra_info = Self::from_map(&entry);
+                    if extra_info.is_ok() {
+                        set.insert(extra_info.unwrap());
+                    } else {
+                        // Just print the error and continue.
+                        println!("{}", extra_info.err().unwrap());
+                    }
+                    entry = HashMap::<String, String>::new();
+                }
+            } else {
+                if line.starts_with("extra-info ") {
+                    // extra-info line has format:
+                    // extra-info <nickname> <fingerprint>
+                    let line_split: Vec<&str> = line.split(' ').collect();
+                    if line_split.len() != 3 {
+                        println!("Misformed extra-info line");
+                    } else {
+                        entry.insert("nickname".to_string(), line_split[1].to_string());
+                        entry.insert("fingerprint".to_string(), line_split[2].to_string());
+                    }
+                } else {
+                    let (key, value) = match line.split_once(' ') {
+                        Some((k, v)) => (k, v),
+                        None => (line, ""),
+                    };
+                    entry.insert(key.to_string(), value.to_string());
+                }
+            }
+        }
+        // Do for the last one
+        let extra_info = Self::from_map(&entry);
+        if extra_info.is_ok() {
+            set.insert(extra_info.unwrap());
+        } else {
+            println!("{}", extra_info.err().unwrap());
+        }
+        set
+    }
+}
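A minimal usage sketch of the new in-memory parsing (the sample document is hypothetical; real files come from CollecTor and carry many more fields):

    fn main() {
        // A fingerprint is 40 hex characters (20 bytes), as from_map requires.
        let sample = concat!(
            "@type bridge-extra-info 1.3\n",
            "extra-info ExampleBridge 0123456789abcdef0123456789abcdef01234567\n",
            "published 2024-03-26 06:31:13\n",
            "bridge-ips us=8,de=4\n",
        );
        let infos = ExtraInfo::parse_file(sample);
        println!("parsed {} extra-info documents", infos.len()); // parsed 1 ...
    }
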
src/lib.rs (73 changed lines)
@@ -1,5 +1,9 @@
-use hyper::{Body, Client, Method, Request};
+use http::status::StatusCode;
+use http_body_util::{BodyExt, Empty};
+use hyper::{body::Bytes, Body, Client, Method, Request};
+use hyper_util::rt::TokioExecutor;
 use lazy_static::lazy_static;
+//use select::{document::Document, predicate::Name};
 use serde::{Deserialize, Serialize};
 use sled::Db;
 use std::{
@@ -177,6 +181,31 @@ pub fn add_bridge_to_db(db: &Db, fingerprint: [u8; 20]) {
         .unwrap();
 }
 
+// Download a webpage and return it as a string
+pub async fn download(url: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+    let https = hyper_rustls::HttpsConnectorBuilder::new()
+        .with_native_roots()
+        .expect("no native root CA certificates found")
+        .https_only()
+        .enable_http1()
+        .build();
+
+    let client: hyper_util::client::legacy::Client<_, Empty<Bytes>> =
+        hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https);
+
+    println!("Downloading {}", url);
+    let mut res = client.get(url.parse()?).await?;
+    assert_eq!(res.status(), StatusCode::OK);
+    let mut body_str = String::default();
+    while let Some(next) = res.frame().await {
+        let frame = next?;
+        if let Some(chunk) = frame.data_ref() {
+            body_str.push_str(&String::from_utf8(chunk.to_vec())?);
+        }
+    }
+    Ok(body_str)
+}
+
 // Process extra-infos
 
 /// Adds the extra-info data for a single bridge to the database. If the
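A hypothetical smoke test for the new download helper (assumes network access and the tokio test macro, in line with the crate's other async tests; the URL is the one test_extra_infos uses below):

    #[tokio::test]
    async fn test_download() {
        // The CollecTor listing should mention at least one extra-infos file.
        let page = download("https://collector.torproject.org/recent/bridge-descriptors/extra-infos/")
            .await
            .unwrap();
        assert!(page.contains("-extra-infos"));
    }
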
@@ -222,7 +251,10 @@ pub fn add_extra_info_to_db(db: &Db, extra_info: ExtraInfo) {
 }
 
 /// Download new extra-infos files and add their data to the database
-pub async fn update_extra_infos(db: &Db, base_url: &str) {
+pub async fn update_extra_infos(
+    db: &Db,
+    base_url: &str,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     // Track which files have been processed. This is slightly redundant
     // because we're only downloading files we don't already have, but it
     // might be a good idea to check in case we downloaded a file but didn't
@@ -232,14 +264,38 @@ pub async fn update_extra_infos(db: &Db, base_url: &str) {
         None => HashSet::<String>::new(),
     };
 
-    let new_files = extra_info::download_extra_infos(base_url).await.unwrap();
+    let dir_page = download(base_url).await?;
+
+    // Causes Send issues, so use solution below instead
+    //let doc = Document::from(dir_page.as_str());
+    //let links = doc.find(Name("a")).filter_map(|n| n.attr("href"));
+
+    // Alternative, less robust solution
+    let mut links = HashSet::<String>::new();
+    for line in dir_page.lines() {
+        let begin_match = "<a href=\"";
+        let end_match = "\">";
+        if line.contains(begin_match) {
+            let link = &line[line.find(begin_match).unwrap() + begin_match.len()..];
+            if link.contains(end_match) {
+                let link = &link[0..link.find(end_match).unwrap()];
+                links.insert(link.to_string());
+            }
+        }
+    }
 
     let mut new_extra_infos = HashSet::<ExtraInfo>::new();
 
-    // Make set of new extra-infos
-    for extra_info_file in &new_files {
-        extra_info::add_extra_infos(&extra_info_file, &mut new_extra_infos);
-        processed_extra_infos_files.insert(extra_info_file.to_string());
+    // We should now have an iterable collection of links to consider downloading.
+    for link in links {
+        if link.ends_with("-extra-infos") && !processed_extra_infos_files.contains(&link) {
+            let extra_infos_url = format!("{}{}", base_url, link);
+            let extra_info_str = download(&extra_infos_url).await?;
+            //ExtraInfo::parse_file(&extra_info_str, &mut new_extra_infos);
+            let extra_infos = ExtraInfo::parse_file(&extra_info_str);
+            new_extra_infos.extend(extra_infos);
+            processed_extra_infos_files.insert(link);
+        }
     }
 
     // Add new extra-infos data to database
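The ad-hoc scraper above slices each line between the literal markers <a href=" and ">. A self-contained illustration of that slicing (the file name is made up, but CollecTor's recent listing links do end in -extra-infos):

    // Mirrors the link-extraction logic in update_extra_infos.
    fn extract_link(line: &str) -> Option<&str> {
        let begin_match = "<a href=\"";
        let end_match = "\">";
        let start = line.find(begin_match)? + begin_match.len();
        let rest = &line[start..];
        Some(&rest[..rest.find(end_match)?])
    }

    fn main() {
        let line = r#"<tr><td><a href="2024-03-26-20-04-00-extra-infos">…</a></td></tr>"#;
        assert_eq!(extract_link(line), Some("2024-03-26-20-04-00-extra-infos"));
    }
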
@@ -247,11 +303,14 @@ pub async fn update_extra_infos(db: &Db, base_url: &str) {
         add_extra_info_to_db(&db, extra_info);
     }
 
     // Store which files we've already downloaded and processed
     db.insert(
         b"extra_infos_files",
         bincode::serialize(&processed_extra_infos_files).unwrap(),
     )
     .unwrap();
+
+    Ok(())
 }
 
 // Process negative reports
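For reference, the read side of this bincode round-trip appears at the top of update_extra_infos; a condensed sketch of recovering the stored set (None on a first run):

    let processed_extra_infos_files: HashSet<String> =
        match db.get(b"extra_infos_files").unwrap() {
            Some(bytes) => bincode::deserialize(&bytes).unwrap(),
            None => HashSet::<String>::new(),
        };
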
@@ -162,7 +162,8 @@ async fn test_extra_infos() {
         &db,
         "https://collector.torproject.org/recent/bridge-descriptors/extra-infos/",
     )
-    .await;
+    .await
+    .unwrap();
 
     // Check that DB contains information on a bridge with high uptime
     assert!(db.contains_key("bridges").unwrap());