troll-patrol/src/simulation/extra_infos_server.rs

164 lines
5.3 KiB
Rust

use crate::extra_info::ExtraInfo;
use hyper::{
body::{self, Bytes},
header::HeaderValue,
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use serde_json::json;
use std::{collections::HashSet, convert::Infallible, net::SocketAddr, time::Duration};
use tokio::{
spawn,
sync::{broadcast, mpsc, oneshot},
time::sleep,
};
async fn serve_extra_infos(
extra_infos_pages: &mut Vec<String>,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
match req.method() {
&Method::OPTIONS => Ok(Response::builder()
.header("Access-Control-Allow-Origin", HeaderValue::from_static("*"))
.header("Access-Control-Allow-Headers", "accept, content-type")
.header("Access-Control-Allow-Methods", "POST")
.status(200)
.body(Body::from("Allow POST"))
.unwrap()),
_ => match req.uri().path() {
"/" => Ok::<_, Infallible>(serve_index(&extra_infos_pages)),
"/add" => Ok::<_, Infallible>({
let bytes = body::to_bytes(req.into_body()).await.unwrap();
add_extra_infos(extra_infos_pages, bytes)
}),
path => Ok::<_, Infallible>({
// Serve the requested file
serve_extra_infos_file(&extra_infos_pages, path)
}),
},
}
}
pub async fn server() {
let (context_tx, context_rx) = mpsc::channel(32);
let request_tx = context_tx.clone();
let shutdown_cmd_tx = context_tx.clone();
let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
let kill_context = shutdown_tx.subscribe();
let context_manager =
spawn(async move { create_context_manager(context_rx, kill_context).await });
let addr = SocketAddr::from(([127, 0, 0, 1], 8004));
let make_svc = make_service_fn(move |_conn: &AddrStream| {
let request_tx = request_tx.clone();
let service = service_fn(move |req| {
let request_tx = request_tx.clone();
let (response_tx, response_rx) = oneshot::channel();
let cmd = Command::Request {
req,
sender: response_tx,
};
async move {
request_tx.send(cmd).await.unwrap();
response_rx.await.unwrap()
}
});
async move { Ok::<_, Infallible>(service) }
});
let server = Server::bind(&addr).serve(make_svc);
println!("Listening on localhost:8004");
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
async fn create_context_manager(
context_rx: mpsc::Receiver<Command>,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
create_context = context_manager(context_rx) => create_context,
_ = kill.recv() => {println!("Shut down context_manager");},
}
}
async fn context_manager(mut context_rx: mpsc::Receiver<Command>) {
let mut extra_infos_pages = Vec::<String>::new();
while let Some(cmd) = context_rx.recv().await {
use Command::*;
match cmd {
Request { req, sender } => {
let response = serve_extra_infos(&mut extra_infos_pages, req).await;
if let Err(e) = sender.send(response) {
eprintln!("Server Response Error: {:?}", e);
}
sleep(Duration::from_millis(1)).await;
}
Shutdown { shutdown_sig } => {
drop(shutdown_sig);
}
}
}
}
#[derive(Debug)]
enum Command {
Request {
req: Request<Body>,
sender: oneshot::Sender<Result<Response<Body>, Infallible>>,
},
Shutdown {
shutdown_sig: broadcast::Sender<()>,
},
}
fn add_extra_infos(extra_infos_pages: &mut Vec<String>, request: Bytes) -> Response<Body> {
let extra_infos: HashSet<ExtraInfo> = match serde_json::from_slice(&request) {
Ok(req) => req,
Err(e) => {
let response = json!({"error": e.to_string()});
let val = serde_json::to_string(&response).unwrap();
return prepare_header(val);
}
};
let mut extra_infos_file = String::new();
for extra_info in extra_infos {
extra_infos_file.push_str(extra_info.to_string().as_str());
}
extra_infos_pages.push(extra_infos_file);
prepare_header("OK".to_string())
}
fn serve_index(extra_infos_pages: &Vec<String>) -> Response<Body> {
let mut body_str = String::new();
for i in 0..extra_infos_pages.len() {
body_str
.push_str(format!("<a href=\"{}-extra-infos\">{}-extra-infos</a>\n", i, i).as_str());
}
prepare_header(body_str)
}
fn serve_extra_infos_file(extra_infos_pages: &Vec<String>, path: &str) -> Response<Body> {
if path.ends_with("-extra-infos") {
if let Ok(index) = &path[1..(path.len() - "-extra-infos".len())].parse::<usize>() {
if extra_infos_pages.len() > *index {
return prepare_header(extra_infos_pages[*index].clone());
}
}
}
prepare_header("Not a valid file".to_string())
}
// Prepare HTTP Response for successful Server Request
fn prepare_header(response: String) -> Response<Body> {
let mut resp = Response::new(Body::from(response));
resp.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
resp
}