diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 669d096..78b3e1e 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -4,6 +4,7 @@ }, "lox_authority_port": 8001, + "test_port": 8005, "troll_patrol_port": 8002, "metrics_port": 5222, "bridge_config": { diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 9808f23..25f3fb3 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -3,7 +3,7 @@ use clap::Parser; use curve25519_dalek::ristretto::RistrettoBasepointTable; -use futures::future; +use futures::{future, join}; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, @@ -33,6 +33,8 @@ mod request_handler; use request_handler::handle; mod resource_parser; use resource_parser::{parse_into_bridgelines, parse_into_buckets}; +mod test_handler; +use test_handler::handle as test_handle; mod troll_patrol_handler; use troll_patrol_handler::handle as tp_handle; @@ -71,6 +73,7 @@ struct Config { db: DbConfig, metrics_port: u16, lox_authority_port: u16, + test_port: u16, troll_patrol_port: u16, bridge_config: BridgeConfig, rtype: ResourceInfo, @@ -263,6 +266,14 @@ async fn context_manager( lox_db.write_context(context.clone()); sleep(Duration::from_millis(1)).await; } + TestRequest { req, sender } => { + let response = test_handle(context.clone(), req).await; + if let Err(e) = sender.send(response) { + eprintln!("Server Response Error: {:?}", e); + }; + lox_db.write_context(context.clone()); + sleep(Duration::from_millis(1)).await; + } TpRequest { req, sender } => { let response = tp_handle(context.clone(), &mut Htables, req).await; if let Err(e) = sender.send(response) { @@ -291,6 +302,10 @@ enum Command { req: Request, sender: oneshot::Sender, Infallible>>, }, + TestRequest { + req: Request, + sender: oneshot::Sender, Infallible>>, + }, TpRequest { req: Request, sender: oneshot::Sender, Infallible>>, @@ -311,6 +326,7 @@ async fn main() { let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); + let test_request_tx = rdsys_tx.clone(); let tp_request_tx = rdsys_tx.clone(); let shutdown_cmd_tx = rdsys_tx.clone(); @@ -385,6 +401,23 @@ async fn main() { async move { Ok::<_, Infallible>(service) } }); + let test_make_service = make_service_fn(move |_conn: &AddrStream| { + let request_tx = test_request_tx.clone(); + let service = service_fn(move |req| { + let request_tx = request_tx.clone(); + let (response_tx, response_rx) = oneshot::channel(); + let cmd = Command::Request { + req, + sender: response_tx, + }; + async move { + request_tx.send(cmd).await.unwrap(); + response_rx.await.unwrap() + } + }); + async move { Ok::<_, Infallible>(service) } + }); + let tp_make_service = make_service_fn(move |_conn: &AddrStream| { let request_tx = tp_request_tx.clone(); let service = service_fn(move |req| { @@ -406,19 +439,27 @@ async fn main() { let pub_addr = SocketAddr::from(([127, 0, 0, 1], config.lox_authority_port)); let server = Server::bind(&pub_addr).serve(make_service); let graceful = server.with_graceful_shutdown(shutdown_signal()); + // Address for test commands + let test_addr = SocketAddr::from(([127, 0, 0, 1], config.test_port)); + let test_server = Server::bind(&test_addr).serve(test_make_service); + let test_graceful = test_server.with_graceful_shutdown(shutdown_signal()); // Address for connections from Troll Patrol let tp_addr = SocketAddr::from(([127, 0, 0, 1], config.troll_patrol_port)); let tp_server = Server::bind(&tp_addr).serve(tp_make_service); let tp_graceful = tp_server.with_graceful_shutdown(shutdown_signal()); println!("Listening on {}", pub_addr); + println!("Listening on {}", test_addr); println!("Listening on {}", tp_addr); - let (a, b) = future::join(graceful, tp_graceful).await; + let (a, b, c) = join!(graceful, test_graceful, tp_graceful); if a.is_err() { eprintln!("server error: {}", a.unwrap_err()); } if b.is_err() { eprintln!("server error: {}", b.unwrap_err()); } + if c.is_err() { + eprintln!("server error: {}", c.unwrap_err()); + } future::join_all([ metrics_handler, rdsys_request_handler, diff --git a/crates/lox-distributor/src/request_handler.rs b/crates/lox-distributor/src/request_handler.rs index 3f9a8ed..f58969d 100644 --- a/crates/lox-distributor/src/request_handler.rs +++ b/crates/lox-distributor/src/request_handler.rs @@ -56,11 +56,6 @@ pub async fn handle( let bytes = body::to_bytes(req.into_body()).await.unwrap(); cloned_context.verify_and_send_blockage_migration(bytes) }), - //#[cfg(test)] - (&Method::POST, "/advancedays") => Ok::<_, Infallible>({ - let bytes = body::to_bytes(req.into_body()).await.unwrap(); - cloned_context.advance_days_with_response_test(bytes) - }), _ => { // Return 404 not found response. Ok(Response::builder() diff --git a/crates/lox-distributor/src/test_handler.rs b/crates/lox-distributor/src/test_handler.rs new file mode 100644 index 0000000..2665ad0 --- /dev/null +++ b/crates/lox-distributor/src/test_handler.rs @@ -0,0 +1,32 @@ +use crate::lox_context::LoxServerContext; +use hyper::{body, header::HeaderValue, Body, Method, Request, Response, StatusCode}; +use std::convert::Infallible; + +// Handle requests used in testing +pub async fn handle( + cloned_context: LoxServerContext, + req: Request, +) -> Result, Infallible> { + match req.method() { + &Method::OPTIONS => Ok(Response::builder() + .header("Access-Control-Allow-Origin", HeaderValue::from_static("*")) + .header("Access-Control-Allow-Headers", "accept, content-type") + .header("Access-Control-Allow-Methods", "POST") + .status(200) + .body(Body::from("Allow POST")) + .unwrap()), + _ => match (req.method(), req.uri().path()) { + (&Method::POST, "/advancedays") => Ok::<_, Infallible>({ + let bytes = body::to_bytes(req.into_body()).await.unwrap(); + cloned_context.advance_days_with_response_test(bytes) + }), + _ => { + // Return 404 not found response. + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found")) + .unwrap()) + } + }, + } +}