Fix up database handling

This commit is contained in:
onyinyang 2023-08-03 18:22:39 -04:00
parent f787253ad5
commit ffb05d403a
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
4 changed files with 155 additions and 92 deletions

View File

@ -1,5 +1,7 @@
{
"db_path": "lox_db",
"db": {
"db_path": "lox_db"
},
"rtype": {
"endpoint": "http://127.0.0.1:7100/resource-stream",
"name": "https",

View File

@ -0,0 +1,105 @@
use std::sync::{Arc, Mutex};
use crate::{lox_context, DbConfig};
use chrono::prelude::*;
use lox_library::{BridgeAuth, BridgeDb};
use sled::IVec;
pub fn write_context_to_db(db: sled::Db, context: lox_context::LoxServerContext) {
let date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string();
let json_date = serde_json::to_vec(&date).unwrap();
let json_result = serde_json::to_vec(&context).unwrap();
let _ = db.insert(IVec::from(json_date), IVec::from(json_result));
}
pub fn find_existing_db(
db_config: DbConfig,
) -> Result<(sled::Db, lox_context::LoxServerContext), sled::Error> {
let context: lox_context::LoxServerContext;
let (lox_db, context) = match sled::open(db_config.db_path) {
Ok(lox_db) => {
// Check if the lox_db already exists
if lox_db.was_recovered() {
// Check if there is a roll back date and try to choose the appropriate context
// to rollback to, otherwise, take the last saved context
match db_config.roll_back_date {
// If roll back date has been specified, either the exact date or range should be set
Some(roll_back_date) => {
// If the exact date is specified and it's in the database, use that to populate the context
match roll_back_date.exact_date {
Some(exact_date) => {
match lox_db.contains_key(exact_date.clone()) {
// Find exact date/time in db and use the context from that date.
Ok(_) => {
let ivec_context = lox_db.get(exact_date).unwrap().unwrap();
context = serde_json::from_slice(&ivec_context).unwrap();
}
// If the entered date is not found, use the last saved context
Err(_) => {
println!("UNEXPECTED DATE: Specified exact date not found in lox db! Using last saved context");
context = use_last_context(lox_db.clone());
}
}
}
// Otherwise check that the start of a range has been specified
None => {
match roll_back_date.date_range_start {
Some(start) => {
// If so, ensure the end of the range has also been specified
if let Some(end) = roll_back_date.date_range_end {
let start_json: Vec<u8> =
serde_json::to_vec(&start).unwrap();
let end_json: Vec<u8> =
serde_json::to_vec(&end).unwrap();
let r: sled::Iter = lox_db.range(
IVec::from(start_json)..IVec::from(end_json),
);
let ivec_context = r.last().unwrap().unwrap().1;
context =
serde_json::from_slice(&ivec_context).unwrap();
} else {
println!("UNEXPECTED DATE: Start of range was specified but End of range was not! Using last saved context");
context = use_last_context(lox_db.clone());
}
}
// If not, roll_back_date should not have been indicated but it was! So just use the last
// context and print that this is happening.
None => {
// If exact date and rollback range are blank, use the last saved context
println!("UNEXPECTED DATE: No date specified despite indicating rollback! Using last saved context");
context = use_last_context(lox_db.clone());
}
}
}
}
}
// Use the last entry to populate the Lox context if no rollback date is set (which should be most common)
None => {
context = use_last_context(lox_db.clone());
}
}
//Otherwise, create a new Lox context
} else {
let new_db = BridgeDb::new();
let new_ba = BridgeAuth::new(new_db.pubkey);
context = lox_context::LoxServerContext {
db: Arc::new(Mutex::new(new_db)),
ba: Arc::new(Mutex::new(new_ba)),
extra_bridges: Arc::new(Mutex::new(Vec::new())),
to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())),
}
}
(lox_db, context)
}
Err(e) => {
panic!("Unable to read or create lox database! {:?}", e);
}
};
Ok((lox_db, context))
}
fn use_last_context(cloned_db: sled::Db) -> lox_context::LoxServerContext {
let ivec_context = cloned_db.last().unwrap().unwrap().1;
println!("Using db date: {:?}", ivec_context);
serde_json::from_slice(&ivec_context).unwrap()
}

View File

@ -1,47 +0,0 @@
use crate::lox_context;
use chrono::prelude::*;
use sled::IVec;
use std::{
env,
error::Error,
fs::{DirEntry, File},
io::BufReader,
path::Path,
};
pub fn read_context_from_file<P: AsRef<Path>>(
path: P,
) -> Result<lox_context::LoxServerContext, Box<dyn Error>> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let context = serde_json::from_reader(reader)?;
Ok(context)
}
pub fn write_context_to_db(db: sled::Db, context: lox_context::LoxServerContext) {
let date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string();
let json_date = serde_json::to_vec(&date).unwrap();
let json_result = serde_json::to_vec(&context).unwrap();
let _ = db.insert(IVec::from(json_date), IVec::from(json_result));
}
pub fn write_context_to_file(context: lox_context::LoxServerContext) {
let mut date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string();
let path = "_lox.json";
date.push_str(path);
let file = File::create(&date)
.expect(format!("Unable to write to db file: {:?} !", stringify!($date)).as_str());
let _ = serde_json::to_writer(file, &context);
}
pub fn check_file_exists() -> Option<DirEntry> {
let current_path = env::current_dir().expect("Unable to access current dir");
std::fs::read_dir(current_path)
.expect("Couldn't read local directory")
.flatten() // Remove failed
.filter(|f| {
f.metadata().unwrap().is_file()
&& (f.file_name().into_string().unwrap().contains("_lox.json"))
}) // Filter out directories (only consider files)
.max_by_key(|x| x.metadata().unwrap().modified().unwrap())
}

View File

@ -8,23 +8,16 @@ use hyper::{
Body, Request, Response, Server,
};
use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use lox_library::{BridgeAuth, BridgeDb};
use rdsys_backend::{proto::ResourceDiff, start_stream};
use serde::Deserialize;
use std::{
convert::Infallible,
fs::File,
io::BufReader,
net::SocketAddr,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
convert::Infallible, fs::File, io::BufReader, net::SocketAddr, path::PathBuf, time::Duration,
};
mod file_reader;
use file_reader::write_context_to_db;
mod db_handler;
use db_handler::{find_existing_db, write_context_to_db};
mod lox_context;
mod request_handler;
use request_handler::handle;
@ -60,10 +53,41 @@ struct Args {
#[derive(Debug, Deserialize)]
struct Config {
db_path: String,
db: DbConfig,
rtype: ResourceInfo,
}
#[derive(Debug, Deserialize)]
pub struct DbConfig {
// The path for the lox_context database, default is "lox_db"
db_path: String,
// Optional Date/time to roll back to
// This should be blank for normal operation and only changed if the lox_context
// should be rolled back to a previous state
roll_back_date: Option<RollBackDate>,
}
impl Default for DbConfig {
fn default() -> DbConfig {
DbConfig {
db_path: "lox_db".to_owned(),
roll_back_date: None,
}
}
}
#[derive(Debug, Deserialize)]
pub struct RollBackDate {
// Optional exact Date/time to roll back to as a %Y-%m-%d_%H:%M:%S string
// This should only be changed if the lox_context should be rolled back to a
// previous state and the exact roll back date/time is known
exact_date: Option<String>,
// Since the exact time the database last saved the context may not be obvious,
// this can be presented as a range from which the most recent date in the database will be
// selected
date_range_start: Option<String>,
date_range_end: Option<String>,
}
#[derive(Debug, Deserialize)]
struct ResourceInfo {
endpoint: String,
@ -121,12 +145,12 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<R
}
async fn create_context_manager(
backup_context_path: String,
db_config: DbConfig,
context_rx: mpsc::Receiver<Command>,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
create_context = context_manager(backup_context_path, context_rx) => create_context,
create_context = context_manager(db_config, context_rx) => create_context,
_ = kill.recv() => {println!("Shut down context_manager");},
}
}
@ -134,31 +158,11 @@ async fn create_context_manager(
// Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring
// that the DB can be updated from the rdsys stream and client requests
// can be responded to with an updated BridgeDB state
async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver<Command>) {
let context: lox_context::LoxServerContext;
let existing_db = match sled::open(db_path) {
Ok(lox_db) => {
// Check if the lox_db already exists
if lox_db.was_recovered() {
// And use the last entry to populate the Lox context if so
// TODO add functionality to specify the key or roll back to a previous time
let ivec_context = lox_db.last().unwrap().unwrap().1;
context = serde_json::from_slice(&ivec_context).unwrap();
//Otherwise, create a new Lox context
} else {
let new_db = BridgeDb::new();
let new_ba = BridgeAuth::new(new_db.pubkey);
context = lox_context::LoxServerContext {
db: Arc::new(Mutex::new(new_db)),
ba: Arc::new(Mutex::new(new_ba)),
extra_bridges: Arc::new(Mutex::new(Vec::new())),
to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())),
}
}
lox_db
}
async fn context_manager(db_config: DbConfig, mut context_rx: mpsc::Receiver<Command>) {
let (lox_db, context) = match find_existing_db(db_config) {
Ok((lox_db, context)) => (lox_db, context),
Err(e) => {
panic!("Unable to read or create lox database! {:?}", e);
panic!("Error: {:?}", e);
}
};
@ -174,6 +178,7 @@ async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver<Command
if let Some(resources) = pt.1 {
for resource in resources {
let bridgeline = parse_resource(resource);
println!("What is the bridgeline: {:?}", bridgeline);
if context.to_be_replaced_bridges.lock().unwrap().len() > 0 {
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
@ -306,7 +311,7 @@ async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver<Command
*/
context.allocate_leftover_bridges();
context.encrypt_table();
write_context_to_db(existing_db.clone(), context.clone());
write_context_to_db(lox_db.clone(), context.clone());
sleep(Duration::from_millis(1)).await;
}
Request { req, sender } => {
@ -347,8 +352,7 @@ async fn main() {
let file = File::open(&args.config).expect("Could not read config file");
let reader = BufReader::new(file);
// Read the JSON contents of the file as a ResourceInfo
let config: Config =
serde_json::from_reader(reader).expect("Reading Config from JSON failed.");
let config: Config = serde_json::from_reader(reader).expect("Reading Config from JSON failed.");
let (rdsys_tx, context_rx) = mpsc::channel(32);
let request_tx = rdsys_tx.clone();
@ -375,9 +379,8 @@ async fn main() {
}
});
let context_manager = spawn(async move {
create_context_manager(config.db_path, context_rx, kill_context).await
});
let context_manager =
spawn(async move { create_context_manager(config.db, context_rx, kill_context).await });
let (tx, rx) = mpsc::channel(32);
let rdsys_stream_handler = spawn(async { rdsys_stream(config.rtype, tx, kill_stream).await });