From f787253ad55d3cf56b5f7f9b56c16a20527d166d Mon Sep 17 00:00:00 2001 From: onyinyang Date: Tue, 1 Aug 2023 15:09:46 -0400 Subject: [PATCH] Added support for lox-context storage through sled --- Cargo.lock | 127 +++++++++++++++++++++- crates/lox-distributor/Cargo.toml | 1 + crates/lox-distributor/config.json | 3 + crates/lox-distributor/src/file_reader.rs | 10 +- crates/lox-distributor/src/lox_context.rs | 2 +- crates/lox-distributor/src/main.rs | 61 +++++++---- 6 files changed, 175 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d862c3..694bc44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -366,12 +366,43 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crc64" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2707e3afba5e19b75d582d88bc79237418f2a2a2d673d01cf9b03633b46e98f3" +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg 1.1.0", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -579,6 +610,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -674,6 +715,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -911,6 +961,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -1008,6 +1067,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "sled", "time 0.3.28", "tokio", "zkp", @@ -1070,6 +1130,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg 1.1.0", +] + [[package]] name = "merlin" version = "2.0.1" @@ -1288,6 +1357,17 @@ dependencies = [ "libm", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1295,7 +1375,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.8", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -1306,7 +1400,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "smallvec", "windows-targets", ] @@ -1575,6 +1669,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1815,6 +1918,22 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", +] + [[package]] name = "smallvec" version = "1.11.0" @@ -1878,7 +1997,7 @@ checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", + "redox_syscall 0.3.5", "rustix", "windows-sys", ] @@ -1969,7 +2088,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/crates/lox-distributor/Cargo.toml b/crates/lox-distributor/Cargo.toml index 52ce60a..8ff6556 100644 --- a/crates/lox-distributor/Cargo.toml +++ b/crates/lox-distributor/Cargo.toml @@ -28,6 +28,7 @@ lox_utils = { path = "../lox-utils", version = "0.1.0"} rdsys_backend = { path = "../rdsys-backend-api", version = "0.2"} clap = { version = "4.4.2", features = ["derive"] } serde_json = "1.0.105" +sled = "0.34.7" [dependencies.chrono] version = "0.4.27" diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 177e5a2..4e070bc 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -1,4 +1,6 @@ { + "db_path": "lox_db", + "rtype": { "endpoint": "http://127.0.0.1:7100/resource-stream", "name": "https", "token": "HttpsApiTokenPlaceholder", @@ -6,4 +8,5 @@ "obfs2", "scramblesuit" ] +} } \ No newline at end of file diff --git a/crates/lox-distributor/src/file_reader.rs b/crates/lox-distributor/src/file_reader.rs index 3a0846f..2b98554 100644 --- a/crates/lox-distributor/src/file_reader.rs +++ b/crates/lox-distributor/src/file_reader.rs @@ -1,5 +1,6 @@ use crate::lox_context; use chrono::prelude::*; +use sled::IVec; use std::{ env, error::Error, @@ -17,6 +18,13 @@ pub fn read_context_from_file>( Ok(context) } +pub fn write_context_to_db(db: sled::Db, context: lox_context::LoxServerContext) { + let date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string(); + let json_date = serde_json::to_vec(&date).unwrap(); + let json_result = serde_json::to_vec(&context).unwrap(); + let _ = db.insert(IVec::from(json_date), IVec::from(json_result)); +} + pub fn write_context_to_file(context: lox_context::LoxServerContext) { let mut date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string(); let path = "_lox.json"; @@ -26,7 +34,7 @@ pub fn write_context_to_file(context: lox_context::LoxServerContext) { let _ = serde_json::to_writer(file, &context); } -pub fn check_db_exists() -> Option { +pub fn check_file_exists() -> Option { let current_path = env::current_dir().expect("Unable to access current dir"); std::fs::read_dir(current_path) .expect("Couldn't read local directory") diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index b732880..a777a0d 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -16,7 +16,7 @@ use std::{ }; use zkp::ProofError; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct LoxServerContext { pub db: Arc>, pub ba: Arc>, diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index 018661c..34df0b3 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -24,7 +24,7 @@ use std::{ }; mod file_reader; -use file_reader::{check_db_exists, read_context_from_file, write_context_to_file}; +use file_reader::write_context_to_db; mod lox_context; mod request_handler; use request_handler::handle; @@ -58,6 +58,12 @@ struct Args { backup_context: Option, } +#[derive(Debug, Deserialize)] +struct Config { + db_path: String, + rtype: ResourceInfo, + +} #[derive(Debug, Deserialize)] struct ResourceInfo { endpoint: String, @@ -115,7 +121,7 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver, + backup_context_path: String, context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { @@ -128,23 +134,33 @@ async fn create_context_manager( // Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring // that the DB can be updated from the rdsys stream and client requests // can be responded to with an updated BridgeDB state -async fn context_manager(db_path: Option, mut context_rx: mpsc::Receiver) { +async fn context_manager(db_path: String, mut context_rx: mpsc::Receiver) { let context: lox_context::LoxServerContext; - if let Some(existing_db) = db_path.as_deref() { - context = read_context_from_file(existing_db).unwrap(); - } else if let Some(last_modified_file) = check_db_exists() { - println!("Reading from file {:?}", last_modified_file); - context = read_context_from_file(&last_modified_file.path()).unwrap(); - } else { - let new_db = BridgeDb::new(); - let new_ba = BridgeAuth::new(new_db.pubkey); - context = lox_context::LoxServerContext { - db: Arc::new(Mutex::new(new_db)), - ba: Arc::new(Mutex::new(new_ba)), - extra_bridges: Arc::new(Mutex::new(Vec::new())), - to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())), + let existing_db = match sled::open(db_path) { + Ok(lox_db) => { + // Check if the lox_db already exists + if lox_db.was_recovered() { + // And use the last entry to populate the Lox context if so + // TODO add functionality to specify the key or roll back to a previous time + let ivec_context = lox_db.last().unwrap().unwrap().1; + context = serde_json::from_slice(&ivec_context).unwrap(); + //Otherwise, create a new Lox context + } else { + let new_db = BridgeDb::new(); + let new_ba = BridgeAuth::new(new_db.pubkey); + context = lox_context::LoxServerContext { + db: Arc::new(Mutex::new(new_db)), + ba: Arc::new(Mutex::new(new_ba)), + extra_bridges: Arc::new(Mutex::new(Vec::new())), + to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())), + } + } + lox_db } - } + Err(e) => { + panic!("Unable to read or create lox database! {:?}", e); + } + }; while let Some(cmd) = context_rx.recv().await { use Command::*; @@ -158,7 +174,6 @@ async fn context_manager(db_path: Option, mut context_rx: mpsc::Receive if let Some(resources) = pt.1 { for resource in resources { let bridgeline = parse_resource(resource); - println!("Now it's a bridgeline: {:?}", bridgeline); if context.to_be_replaced_bridges.lock().unwrap().len() > 0 { println!("BridgeLine to be replaced: {:?}", bridgeline); let res = context.replace_with_new(bridgeline); @@ -291,7 +306,7 @@ async fn context_manager(db_path: Option, mut context_rx: mpsc::Receive */ context.allocate_leftover_bridges(); context.encrypt_table(); - write_context_to_file(context.clone()); + write_context_to_db(existing_db.clone(), context.clone()); sleep(Duration::from_millis(1)).await; } Request { req, sender } => { @@ -332,8 +347,8 @@ async fn main() { let file = File::open(&args.config).expect("Could not read config file"); let reader = BufReader::new(file); // Read the JSON contents of the file as a ResourceInfo - let rtype: ResourceInfo = - serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed."); + let config: Config = + serde_json::from_reader(reader).expect("Reading Config from JSON failed."); let (rdsys_tx, context_rx) = mpsc::channel(32); let request_tx = rdsys_tx.clone(); @@ -361,11 +376,11 @@ async fn main() { }); let context_manager = spawn(async move { - create_context_manager(args.backup_context, context_rx, kill_context).await + create_context_manager(config.db_path, context_rx, kill_context).await }); let (tx, rx) = mpsc::channel(32); - let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await }); + let rdsys_stream_handler = spawn(async { rdsys_stream(config.rtype, tx, kill_stream).await }); let rdsys_resource_receiver = spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });