Compare commits
3 Commits
1ccd676e5c
...
d933b3b94b
Author | SHA1 | Date |
---|---|---|
|
d933b3b94b | |
|
32d68893a7 | |
|
7481fe10f9 |
|
@ -21,7 +21,6 @@ hyper-util = { version = "0.1", features = ["full"] }
|
|||
julianday = "1.2.0"
|
||||
lazy_static = "1"
|
||||
lox-library = { git = "https://gitlab.torproject.org/vecna/lox.git", version = "0.1.0" }
|
||||
select = "0.6.0"
|
||||
serde = "1.0.197"
|
||||
serde_json = "1.0"
|
||||
serde_with = {version = "3.7.0", features = ["json"]}
|
||||
|
@ -30,3 +29,4 @@ sha3 = "0.10"
|
|||
sled = "0.34.7"
|
||||
time = "0.3.30"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-cron = "0.1.2"
|
||||
|
|
|
@ -18,6 +18,7 @@ use tokio::{
|
|||
sync::{broadcast, mpsc, oneshot},
|
||||
time::sleep,
|
||||
};
|
||||
use tokio_cron::{Job, Scheduler};
|
||||
|
||||
async fn shutdown_signal() {
|
||||
tokio::signal::ctrl_c()
|
||||
|
@ -65,18 +66,27 @@ async fn update_daily_info(db: &Db, distributors: &BTreeMap<BridgeDistributor, S
|
|||
report_blockages(&distributors, new_blockages).await;
|
||||
}
|
||||
|
||||
async fn run_updater(updater_tx: mpsc::Sender<Command>) {
|
||||
updater_tx.send(Command::Update {}).await.unwrap();
|
||||
}
|
||||
|
||||
async fn create_context_manager(
|
||||
db_config: DbConfig,
|
||||
distributors: BTreeMap<BridgeDistributor, String>,
|
||||
context_rx: mpsc::Receiver<Command>,
|
||||
mut kill: broadcast::Receiver<()>,
|
||||
) {
|
||||
tokio::select! {
|
||||
create_context = context_manager(db_config, context_rx) => create_context,
|
||||
create_context = context_manager(db_config, distributors, context_rx) => create_context,
|
||||
_ = kill.recv() => {println!("Shut down manager");},
|
||||
}
|
||||
}
|
||||
|
||||
async fn context_manager(db_config: DbConfig, mut context_rx: mpsc::Receiver<Command>) {
|
||||
async fn context_manager(
|
||||
db_config: DbConfig,
|
||||
distributors: BTreeMap<BridgeDistributor, String>,
|
||||
mut context_rx: mpsc::Receiver<Command>,
|
||||
) {
|
||||
let db: Db = sled::open(&db_config.db_path).unwrap();
|
||||
|
||||
while let Some(cmd) = context_rx.recv().await {
|
||||
|
@ -94,6 +104,9 @@ async fn context_manager(db_config: DbConfig, mut context_rx: mpsc::Receiver<Com
|
|||
drop(shutdown_sig);
|
||||
println!("Shutdown Sent.");
|
||||
}
|
||||
Update {} => {
|
||||
update_daily_info(&db, &distributors).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -108,6 +121,7 @@ enum Command {
|
|||
Shutdown {
|
||||
shutdown_sig: broadcast::Sender<()>,
|
||||
},
|
||||
Update {},
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -128,11 +142,14 @@ async fn main() {
|
|||
|
||||
let (request_tx, request_rx) = mpsc::channel(32);
|
||||
|
||||
let updater_tx = request_tx.clone();
|
||||
let shutdown_cmd_tx = request_tx.clone();
|
||||
|
||||
// create the shutdown broadcast channel and clone for every thread
|
||||
let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
|
||||
let kill = shutdown_tx.subscribe();
|
||||
// TODO: Gracefully shut down updater
|
||||
let kill_updater = shutdown_tx.subscribe();
|
||||
|
||||
// Listen for ctrl_c, send signal to broadcast shutdown to all threads by dropping shutdown_tx
|
||||
let shutdown_handler = spawn(async move {
|
||||
|
@ -149,8 +166,17 @@ async fn main() {
|
|||
}
|
||||
});
|
||||
|
||||
let context_manager =
|
||||
spawn(async move { create_context_manager(config.db, request_rx, kill).await });
|
||||
let updater = spawn(async move {
|
||||
// Run updater once per day
|
||||
let mut sched = Scheduler::utc();
|
||||
sched.add(Job::new("* * 22 * * * *", move || {
|
||||
run_updater(updater_tx.clone())
|
||||
}));
|
||||
});
|
||||
|
||||
let context_manager = spawn(async move {
|
||||
create_context_manager(config.db, config.distributors, request_rx, kill).await
|
||||
});
|
||||
|
||||
let make_service = make_service_fn(move |_conn: &AddrStream| {
|
||||
let request_tx = request_tx.clone();
|
||||
|
@ -176,5 +202,5 @@ async fn main() {
|
|||
if let Err(e) = graceful.await {
|
||||
eprintln!("server error: {}", e);
|
||||
}
|
||||
future::join_all([context_manager, shutdown_handler]).await;
|
||||
future::join_all([context_manager, updater, shutdown_handler]).await;
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ use http_body_util::{BodyExt, Empty};
|
|||
use hyper::body::Bytes;
|
||||
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
|
||||
use julianday::JulianDay;
|
||||
use select::{document::Document, predicate::Name};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
|
@ -174,7 +173,21 @@ pub async fn download_extra_infos(
|
|||
}
|
||||
}
|
||||
|
||||
let doc = Document::from(body_str.as_str());
|
||||
// Removed because it caused some problem...
|
||||
//let doc = Document::from(body_str.clone().as_str());
|
||||
// Instead, do this
|
||||
let mut links = HashSet::<String>::new();
|
||||
for line in body_str.lines() {
|
||||
let begin_match = "<a href=\"";
|
||||
let end_match = "\">";
|
||||
if line.contains(begin_match) {
|
||||
let link = &line[line.find(begin_match).unwrap() + begin_match.len()..];
|
||||
if link.contains(end_match) {
|
||||
let link = &link[0..link.find(end_match).unwrap()];
|
||||
links.insert(link.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create extra-infos directory if it doesn't exist
|
||||
std::fs::create_dir_all(&DIRECTORY)?;
|
||||
|
@ -182,7 +195,6 @@ pub async fn download_extra_infos(
|
|||
let mut new_files = HashSet::<String>::new();
|
||||
|
||||
// Go through all the links in the page and download new files
|
||||
let links = doc.find(Name("a")).filter_map(|n| n.attr("href"));
|
||||
for link in links {
|
||||
if link.ends_with("-extra-infos") {
|
||||
let filename = format!("{}/{}", DIRECTORY, link);
|
||||
|
@ -191,7 +203,7 @@ pub async fn download_extra_infos(
|
|||
if !Path::new(&filename).exists() {
|
||||
let extra_infos_url = format!("{}{}", base_url, link);
|
||||
println!("Downloading {}", extra_infos_url);
|
||||
let mut res = client.get(extra_infos_url.parse().unwrap()).await?;
|
||||
let mut res = client.get(extra_infos_url.parse().unwrap()).await.unwrap();
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let mut file = std::fs::File::create(filename).unwrap();
|
||||
while let Some(next) = res.frame().await {
|
||||
|
|
Loading…
Reference in New Issue