From ffb05d403a1205e570b028ce566c217a28d38cb4 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Thu, 3 Aug 2023 18:22:39 -0400 Subject: [PATCH] Fix up database handling --- crates/lox-distributor/config.json | 4 +- crates/lox-distributor/src/db_handler.rs | 105 ++++++++++++++++++++++ crates/lox-distributor/src/file_reader.rs | 47 ---------- crates/lox-distributor/src/main.rs | 91 ++++++++++--------- 4 files changed, 155 insertions(+), 92 deletions(-) create mode 100644 crates/lox-distributor/src/db_handler.rs delete mode 100644 crates/lox-distributor/src/file_reader.rs diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 4e070bc..d73ecb9 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -1,5 +1,7 @@ { - "db_path": "lox_db", + "db": { + "db_path": "lox_db" + }, "rtype": { "endpoint": "http://127.0.0.1:7100/resource-stream", "name": "https", diff --git a/crates/lox-distributor/src/db_handler.rs b/crates/lox-distributor/src/db_handler.rs new file mode 100644 index 0000000..040ef29 --- /dev/null +++ b/crates/lox-distributor/src/db_handler.rs @@ -0,0 +1,105 @@ +use std::sync::{Arc, Mutex}; + +use crate::{lox_context, DbConfig}; +use chrono::prelude::*; +use lox_library::{BridgeAuth, BridgeDb}; +use sled::IVec; + +pub fn write_context_to_db(db: sled::Db, context: lox_context::LoxServerContext) { + let date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string(); + let json_date = serde_json::to_vec(&date).unwrap(); + let json_result = serde_json::to_vec(&context).unwrap(); + let _ = db.insert(IVec::from(json_date), IVec::from(json_result)); +} + +pub fn find_existing_db( + db_config: DbConfig, +) -> Result<(sled::Db, lox_context::LoxServerContext), sled::Error> { + let context: lox_context::LoxServerContext; + let (lox_db, context) = match sled::open(db_config.db_path) { + Ok(lox_db) => { + // Check if the lox_db already exists + if lox_db.was_recovered() { + // Check if there is a roll back date and try to choose the appropriate context + // to rollback to, otherwise, take the last saved context + match db_config.roll_back_date { + // If roll back date has been specified, either the exact date or range should be set + Some(roll_back_date) => { + // If the exact date is specified and it's in the database, use that to populate the context + match roll_back_date.exact_date { + Some(exact_date) => { + match lox_db.contains_key(exact_date.clone()) { + // Find exact date/time in db and use the context from that date. + Ok(_) => { + let ivec_context = lox_db.get(exact_date).unwrap().unwrap(); + context = serde_json::from_slice(&ivec_context).unwrap(); + } + // If the entered date is not found, use the last saved context + Err(_) => { + println!("UNEXPECTED DATE: Specified exact date not found in lox db! Using last saved context"); + context = use_last_context(lox_db.clone()); + } + } + } + // Otherwise check that the start of a range has been specified + None => { + match roll_back_date.date_range_start { + Some(start) => { + // If so, ensure the end of the range has also been specified + if let Some(end) = roll_back_date.date_range_end { + let start_json: Vec = + serde_json::to_vec(&start).unwrap(); + let end_json: Vec = + serde_json::to_vec(&end).unwrap(); + let r: sled::Iter = lox_db.range( + IVec::from(start_json)..IVec::from(end_json), + ); + let ivec_context = r.last().unwrap().unwrap().1; + context = + serde_json::from_slice(&ivec_context).unwrap(); + } else { + println!("UNEXPECTED DATE: Start of range was specified but End of range was not! Using last saved context"); + context = use_last_context(lox_db.clone()); + } + } + // If not, roll_back_date should not have been indicated but it was! So just use the last + // context and print that this is happening. + None => { + // If exact date and rollback range are blank, use the last saved context + println!("UNEXPECTED DATE: No date specified despite indicating rollback! Using last saved context"); + context = use_last_context(lox_db.clone()); + } + } + } + } + } + // Use the last entry to populate the Lox context if no rollback date is set (which should be most common) + None => { + context = use_last_context(lox_db.clone()); + } + } + //Otherwise, create a new Lox context + } else { + let new_db = BridgeDb::new(); + let new_ba = BridgeAuth::new(new_db.pubkey); + context = lox_context::LoxServerContext { + db: Arc::new(Mutex::new(new_db)), + ba: Arc::new(Mutex::new(new_ba)), + extra_bridges: Arc::new(Mutex::new(Vec::new())), + to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())), + } + } + (lox_db, context) + } + Err(e) => { + panic!("Unable to read or create lox database! {:?}", e); + } + }; + Ok((lox_db, context)) +} + +fn use_last_context(cloned_db: sled::Db) -> lox_context::LoxServerContext { + let ivec_context = cloned_db.last().unwrap().unwrap().1; + println!("Using db date: {:?}", ivec_context); + serde_json::from_slice(&ivec_context).unwrap() +} diff --git a/crates/lox-distributor/src/file_reader.rs b/crates/lox-distributor/src/file_reader.rs deleted file mode 100644 index 2b98554..0000000 --- a/crates/lox-distributor/src/file_reader.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::lox_context; -use chrono::prelude::*; -use sled::IVec; -use std::{ - env, - error::Error, - fs::{DirEntry, File}, - io::BufReader, - path::Path, -}; - -pub fn read_context_from_file>( - path: P, -) -> Result> { - let file = File::open(path)?; - let reader = BufReader::new(file); - let context = serde_json::from_reader(reader)?; - Ok(context) -} - -pub fn write_context_to_db(db: sled::Db, context: lox_context::LoxServerContext) { - let date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string(); - let json_date = serde_json::to_vec(&date).unwrap(); - let json_result = serde_json::to_vec(&context).unwrap(); - let _ = db.insert(IVec::from(json_date), IVec::from(json_result)); -} - -pub fn write_context_to_file(context: lox_context::LoxServerContext) { - let mut date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string(); - let path = "_lox.json"; - date.push_str(path); - let file = File::create(&date) - .expect(format!("Unable to write to db file: {:?} !", stringify!($date)).as_str()); - let _ = serde_json::to_writer(file, &context); -} - -pub fn check_file_exists() -> Option { - let current_path = env::current_dir().expect("Unable to access current dir"); - std::fs::read_dir(current_path) - .expect("Couldn't read local directory") - .flatten() // Remove failed - .filter(|f| { - f.metadata().unwrap().is_file() - && (f.file_name().into_string().unwrap().contains("_lox.json")) - }) // Filter out directories (only consider files) - .max_by_key(|x| x.metadata().unwrap().modified().unwrap()) -} diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 34df0b3..19a73ad 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -8,23 +8,16 @@ use hyper::{ Body, Request, Response, Server, }; use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; -use lox_library::{BridgeAuth, BridgeDb}; use rdsys_backend::{proto::ResourceDiff, start_stream}; use serde::Deserialize; use std::{ - convert::Infallible, - fs::File, - io::BufReader, - net::SocketAddr, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, + convert::Infallible, fs::File, io::BufReader, net::SocketAddr, path::PathBuf, time::Duration, }; -mod file_reader; -use file_reader::write_context_to_db; +mod db_handler; +use db_handler::{find_existing_db, write_context_to_db}; mod lox_context; mod request_handler; use request_handler::handle; @@ -60,10 +53,41 @@ struct Args { #[derive(Debug, Deserialize)] struct Config { - db_path: String, + db: DbConfig, rtype: ResourceInfo, - } + +#[derive(Debug, Deserialize)] +pub struct DbConfig { + // The path for the lox_context database, default is "lox_db" + db_path: String, + // Optional Date/time to roll back to + // This should be blank for normal operation and only changed if the lox_context + // should be rolled back to a previous state + roll_back_date: Option, +} + +impl Default for DbConfig { + fn default() -> DbConfig { + DbConfig { + db_path: "lox_db".to_owned(), + roll_back_date: None, + } + } +} +#[derive(Debug, Deserialize)] +pub struct RollBackDate { + // Optional exact Date/time to roll back to as a %Y-%m-%d_%H:%M:%S string + // This should only be changed if the lox_context should be rolled back to a + // previous state and the exact roll back date/time is known + exact_date: Option, + // Since the exact time the database last saved the context may not be obvious, + // this can be presented as a range from which the most recent date in the database will be + // selected + date_range_start: Option, + date_range_end: Option, +} + #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, @@ -121,12 +145,12 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { - create_context = context_manager(backup_context_path, context_rx) => create_context, + create_context = context_manager(db_config, context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager");}, } } @@ -134,31 +158,11 @@ async fn create_context_manager( // Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring // that the DB can be updated from the rdsys stream and client requests // can be responded to with an updated BridgeDB state -async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver) { - let context: lox_context::LoxServerContext; - let existing_db = match sled::open(db_path) { - Ok(lox_db) => { - // Check if the lox_db already exists - if lox_db.was_recovered() { - // And use the last entry to populate the Lox context if so - // TODO add functionality to specify the key or roll back to a previous time - let ivec_context = lox_db.last().unwrap().unwrap().1; - context = serde_json::from_slice(&ivec_context).unwrap(); - //Otherwise, create a new Lox context - } else { - let new_db = BridgeDb::new(); - let new_ba = BridgeAuth::new(new_db.pubkey); - context = lox_context::LoxServerContext { - db: Arc::new(Mutex::new(new_db)), - ba: Arc::new(Mutex::new(new_ba)), - extra_bridges: Arc::new(Mutex::new(Vec::new())), - to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())), - } - } - lox_db - } +async fn context_manager(db_config: DbConfig, mut context_rx: mpsc::Receiver) { + let (lox_db, context) = match find_existing_db(db_config) { + Ok((lox_db, context)) => (lox_db, context), Err(e) => { - panic!("Unable to read or create lox database! {:?}", e); + panic!("Error: {:?}", e); } }; @@ -174,6 +178,7 @@ async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver 0 { println!("BridgeLine to be replaced: {:?}", bridgeline); let res = context.replace_with_new(bridgeline); @@ -306,7 +311,7 @@ async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver { @@ -347,8 +352,7 @@ async fn main() { let file = File::open(&args.config).expect("Could not read config file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo - let config: Config = - serde_json::from_reader(reader).expect("Reading Config from JSON failed."); + let config: Config = serde_json::from_reader(reader).expect("Reading Config from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); @@ -375,9 +379,8 @@ async fn main() { } }); - let context_manager = spawn(async move { - create_context_manager(config.db_path, context_rx, kill_context).await - }); + let context_manager = + spawn(async move { create_context_manager(config.db, context_rx, kill_context).await }); let (tx, rx) = mpsc::channel(32); let rdsys_stream_handler = spawn(async { rdsys_stream(config.rtype, tx, kill_stream).await });