From 895623a2a2e36bf7fac07c17c4f16078e3be0390 Mon Sep 17 00:00:00 2001 From: Vecna Date: Fri, 24 May 2024 21:35:05 -0400 Subject: [PATCH] Set up daily updater to be controlled by simulation --- config.json | 3 ++ src/bin/server.rs | 103 +++++++++++++++++++++++++++++++++-------- src/request_handler.rs | 2 +- 3 files changed, 87 insertions(+), 21 deletions(-) diff --git a/config.json b/config.json index 1bb922e..9cafea3 100644 --- a/config.json +++ b/config.json @@ -10,7 +10,10 @@ "confidence": 0.95, "max_threshold": 5, "scaling_factor": 0.25, + "min_historical_days": 30, + "max_historical_days": 30, "port": 8003, "require_bridge_token": false, + "updater_port": 8123, "updater_schedule": "* * 22 * * * *" } diff --git a/src/bin/server.rs b/src/bin/server.rs index 184619d..9083443 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,7 +1,8 @@ -use troll_patrol::{request_handler::handle, *}; +use troll_patrol::{request_handler::*, *}; use clap::Parser; use futures::future; +use futures::join; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, @@ -10,14 +11,20 @@ use hyper::{ use serde::Deserialize; use sled::Db; use std::{ - collections::BTreeMap, convert::Infallible, fs::File, io::BufReader, net::SocketAddr, - path::PathBuf, time::Duration, + collections::{BTreeMap, HashMap, HashSet}, + convert::Infallible, + fs::File, + io::BufReader, + net::SocketAddr, + path::PathBuf, + time::Duration, }; use tokio::{ signal, spawn, sync::{broadcast, mpsc, oneshot}, time::sleep, }; +#[cfg(not(features = "simulation"))] use tokio_cron::{Job, Scheduler}; async fn shutdown_signal() { @@ -60,6 +67,7 @@ pub struct Config { //require_bridge_token: bool, port: u16, + updater_port: u16, updater_schedule: String, } @@ -86,7 +94,7 @@ async fn update_daily_info( scaling_factor: f64, min_historical_days: u32, max_historical_days: u32, -) { +) -> HashMap<[u8; 20], HashSet> { update_extra_infos(&db, &extra_infos_base_url) .await .unwrap(); @@ -99,15 +107,22 @@ async fn update_daily_info( min_historical_days, max_historical_days, ); - report_blockages(&distributors, new_blockages).await; + report_blockages(&distributors, new_blockages.clone()).await; // Generate tomorrow's key if we don't already have it new_negative_report_key(&db, get_date() + 1); + + // Return new detected blockages + new_blockages } +/* async fn run_updater(updater_tx: mpsc::Sender) { - updater_tx.send(Command::Update {}).await.unwrap(); + updater_tx.send(Command::Update { + + }).await.unwrap(); } +*/ async fn create_context_manager( db_config: DbConfig, @@ -155,8 +170,8 @@ async fn context_manager( drop(shutdown_sig); println!("Shutdown Sent."); } - Update {} => { - update_daily_info( + Update { _req, sender } => { + let blockages = update_daily_info( &db, &distributors, &extra_infos_base_url, @@ -167,6 +182,23 @@ async fn context_manager( max_historical_days, ) .await; + let response = if cfg!(feature = "simulation") { + // Convert map keys from [u8; 20] to 40-character hex strings + let mut blockages_str = HashMap::>::new(); + for (fingerprint, countries) in blockages { + let fpr_string = array_bytes::bytes2hex("", fingerprint); + blockages_str.insert(fpr_string, countries); + } + Ok(prepare_header( + serde_json::to_string(&blockages_str).unwrap(), + )) + } else { + Ok(prepare_header("OK".to_string())) + }; + if let Err(e) = sender.send(response) { + eprintln!("Update Response Error: {:?}", e); + }; + sleep(Duration::from_millis(1)).await; } } } @@ -182,7 +214,10 @@ enum Command { Shutdown { shutdown_sig: broadcast::Sender<()>, }, - Update {}, + Update { + _req: Request, + sender: oneshot::Sender, Infallible>>, + }, } #[tokio::main] @@ -218,14 +253,17 @@ async fn main() { } }); - let updater = spawn(async move { - // Run updater once per day - let mut sched = Scheduler::utc(); - sched.add(Job::new(config.updater_schedule, move || { - run_updater(updater_tx.clone()) - })); - }); - + // TODO: Reintroduce this + /* + #[cfg(not(feature = "simulation"))] + let updater = spawn(async move { + // Run updater once per day + let mut sched = Scheduler::utc(); + sched.add(Job::new(config.updater_schedule, move || { + run_updater(updater_tx.clone()) + })); + }); + */ let context_manager = spawn(async move { create_context_manager( config.db, @@ -259,12 +297,37 @@ async fn main() { async move { Ok::<_, Infallible>(service) } }); + let updater_make_service = make_service_fn(move |_conn: &AddrStream| { + let request_tx = updater_tx.clone(); + let service = service_fn(move |_req| { + let request_tx = request_tx.clone(); + let (response_tx, response_rx) = oneshot::channel(); + let cmd = Command::Update { + _req, + sender: response_tx, + }; + async move { + request_tx.send(cmd).await.unwrap(); + response_rx.await.unwrap() + } + }); + async move { Ok::<_, Infallible>(service) } + }); + let addr = SocketAddr::from(([0, 0, 0, 0], config.port)); let server = Server::bind(&addr).serve(make_service); let graceful = server.with_graceful_shutdown(shutdown_signal()); + let updater_addr = SocketAddr::from(([127, 0, 0, 1], config.updater_port)); + let updater_server = Server::bind(&updater_addr).serve(updater_make_service); + let updater_graceful = updater_server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", addr); - if let Err(e) = graceful.await { - eprintln!("server error: {}", e); + println!("Updater listening on {}", updater_addr); + let (a, b) = join!(graceful, updater_graceful); + if a.is_err() { + eprintln!("server error: {}", a.unwrap_err()); } - future::join_all([context_manager, updater, shutdown_handler]).await; + if b.is_err() { + eprintln!("server error: {}", b.unwrap_err()); + } + future::join_all([context_manager, shutdown_handler]).await; } diff --git a/src/request_handler.rs b/src/request_handler.rs index 8f23739..3d7ea7c 100644 --- a/src/request_handler.rs +++ b/src/request_handler.rs @@ -76,7 +76,7 @@ pub async fn handle(db: &Db, req: Request) -> Result, Infal } // Prepare HTTP Response for successful Server Request -fn prepare_header(response: String) -> Response { +pub fn prepare_header(response: String) -> Response { let mut resp = Response::new(Body::from(response)); resp.headers_mut() .insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));