Fixed up inline comments and README for Lox Distributor

This commit is contained in:
onyinyang 2023-11-07 00:10:38 -05:00
parent ea37984da6
commit 73b04b9a19
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
5 changed files with 130 additions and 18 deletions

View File

@ -2,14 +2,80 @@
The Lox distributor receives resources from [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) and writes them to [Lox
BridgeLines](https://git-crysp.uwaterloo.ca/iang/lox/src/master/src/bridge_table.rs#L42). Concurrently, it receives and responds to requests from [Lox clients](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm). It saves the [LoxContext](https://gitlab.torproject.org/tpo/anti-censorship/lox-rs/-/blob/main/crates/lox-distributor/src/lox_context.rs) to a database every time the Lox bridgetable is updated and before the distributor is shutdown.
## Configure rdsys stream
A test `config.json` is included for testing on a local instance of rdsys. This
can be edited to correspond to the desired types of resources, endpoints and database configuration.
## Configuration
A test `config.json` is included for testing on a local instance of rdsys. There are several configurable
fields in this config file:
### DB Config
The DB config `db` accepts a `db_path` where the Lox distributor will look for or create a new Lox database as follows:
```
"db": {
"db_path": "path/to/db"
}
```
### Rdsys Config
The rdsys request `rtype` has the following fields:
`endpoint` the endpoint of the rdsys instance that the distributor will make requests to,
`name` the type of distributor we are requesting. In most cases this should be `lox`,
`token` the corresponding Api Token,
`types` the type of bridges that are being accepted.
Example configuration:
```
"rtype": {
"endpoint": "http://127.0.0.1:7100/resources",
"name": "lox",
"token": "LoxApiTokenPlaceholder",
"types": [
"obfs2",
"scramblesuit"
]
}
```
### Bridge Config
The Bridge config, `bridge_config` has the following fields:
`watched_blockages` lists the regions (as ISO 3166 country codes) that Lox will monitor for listed blockages
`percent_spares` is the percentage of buckets that should be allocated as hot spares (as opposed to open invitation buckets)
Example configuration:
```
"bridge_config": {
"watched_blockages": [
"RU"
],
"percent_spares": 50
},
```
### Metrics Port
The `metrics_port` field is the port that the prometheus server will run on.
### Command Line Arguments for Advanced Database Config
There are a few configurations for the Lox database that can be passed as arguments at run time since they are not likely to be suitable as persistent configuration options.
Rolling back to a previous version of the database is possible by passing the
`roll_back_date` flag at runtime and providing the date/time as a `%Y-%m-%d_%H:%M:%S` string. This argument should be passed if the `LoxContext` should be rolled back to a previous state due to, for example, a mass blocking event that is likely not due to Lox user behaviour. If the exact roll back date/time is not known, the last db entry within 24 hours from the passed `roll_back_date` will be used or else the program will fail gracefully.
## Test Run
For testing purposes, you will need a running instance of rdsys as well as a running Lox client.
For testing purposes, you will need a running instance of [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) as well as a running Lox client.
### Run rdsys locally
@ -25,13 +91,6 @@ Finally run rdsys:
./backend --config config.json
```
## Database Config
The database has a few configuration options. The path for where the database
should be read/written can be specified in the `config.json`. Rolling back to a
previous version of the database is also possible by passing the
`roll_back_date` flag at runtime and providing the date/time as a `%Y-%m-%d_%H:%M:%S` string. This argument should be passed if the `LoxContext` should be rolled back to a previous state due to, for example, a mass blocking event that is likely not due to Lox user behaviour. If the exact roll back date/time is not known, the last db entry within 24 hours from the passed `roll_back_date` will be used or else the program will fail gracefully.
### Run Lox Distributor locally
Simply run `cargo run -- config.json` :)

View File

@ -6,11 +6,14 @@ use chrono::{naive::Days, DateTime, Local, NaiveDateTime, Utc};
use lox_library::{BridgeAuth, BridgeDb};
use sled::IVec;
// Database of Lox Distributor State
pub struct DB {
db: sled::Db,
}
impl DB {
// Writes the Lox context to the lox database with "context_%Y-%m-%d_%H:%M:%S" as the
// database key
pub fn write_context(&mut self, context: lox_context::LoxServerContext) {
let date = Local::now().format("context_%Y-%m-%d_%H:%M:%S").to_string();
let json_result = serde_json::to_vec(&context).unwrap();
@ -28,6 +31,9 @@ impl DB {
);
}
// If roll_back_date is empty, opens the most recent entry in the lox database or if none exists, creates a
// new database. If roll_back_date is not empty, use the specified date to roll back to a previous lox-context
// either exactly the entry at the roll_back_date or within 24 hours from thhe roll_back_date.
pub fn open_new_or_existing_db(
db_config: DbConfig,
roll_back_date: Option<String>,
@ -62,6 +68,7 @@ impl DB {
}
}
// Logic for finding the correct context to open from the database
fn read_lox_context_from_db(
lox_db: sled::Db,
roll_back_date: Option<String>,
@ -117,6 +124,7 @@ fn compute_startdate_string(date_range_end: String) -> Option<DateTime<Utc>> {
dt.with_timezone(&Utc).checked_sub_days(Days::new(1))
}
// Use the last context that was entered into the database
fn use_last_context(lox_db: sled::Db) -> lox_context::LoxServerContext {
let ivec_context = lox_db.last().unwrap().unwrap();
let ivec_date: String = String::from_utf8(ivec_context.0.to_vec()).unwrap();

View File

@ -96,6 +96,10 @@ impl LoxServerContext {
accounted_for_bridges
}
// When syncing resources with rdsys, handle the non-working resources
// Those that are blocked in the target region are marked as unreachable/blocked
// All others are matched by fingerprint and if they are still in the grace period, they are updated
// otherwise they are replaced with new bridges
pub fn handle_not_working_resources(
&self,
not_working_resources: Vec<Resource>,
@ -239,6 +243,9 @@ impl LoxServerContext {
to_be_replaced_bridges.push(bridge);
}
// Add extra_bridges to the Lox bridge table as open invitation bridges
// TODO: Add some consideration for whether or not bridges should be sorted as
// open invitation buckets or hot spare buckets
pub fn allocate_leftover_bridges(&self) {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
@ -246,6 +253,7 @@ impl LoxServerContext {
ba_obj.allocate_bridges(&mut extra_bridges, &mut db_obj);
}
// Add an open invitation bucket to the Lox db
pub fn add_openinv_bucket(&self, bucket: [BridgeLine; 3]) {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
@ -260,6 +268,7 @@ impl LoxServerContext {
}
}
// Add a hot spare bucket to the Lox db
pub fn add_spare_bucket(&self, bucket: [BridgeLine; 3]) {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
@ -296,7 +305,17 @@ impl LoxServerContext {
ba_obj.bridge_unreachable(&bridgeline, &mut db_obj)
}
*/
// Mark bridges as blocked
pub fn mark_blocked(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
ba_obj.bridge_unreachable(&bridgeline, &mut db_obj)
}
// Find the bridgeline in the Lox bridge table that matches the fingerprint
// of the bridgeline passed by argument. Once found, replace it with the bridgeline
// passed by argument to ensure all fields besides the fingerprint are updated
// appropriately.
pub fn update_bridge(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
ba_obj.bridge_update(&bridgeline)
@ -311,11 +330,13 @@ impl LoxServerContext {
println!("Today's date according to server: {}", ba_obj.today());
}
// Encrypts the Lox bridge table, should be called after every sync
pub fn encrypt_table(&self) -> HashMap<u32, EncryptedBucket> {
let mut ba_obj = self.ba.lock().unwrap();
ba_obj.enc_bridge_table().clone()
}
// Returns a vector of the Lox Authority's public keys
fn pubkeys(&self) -> Vec<IssuerPubKey> {
let ba_obj = self.ba.lock().unwrap();
// vector of public keys (to serialize)
@ -328,6 +349,8 @@ impl LoxServerContext {
]
}
// Generates a Lox invitation if fewer than MAX_BRIDGES_PER_DAY have been
// requested on a given day
fn gen_invite(&self) -> Result<lox_utils::Invite, ExceededMaxBridgesError> {
let mut obj = self.db.lock().unwrap();
match obj.invite() {
@ -341,11 +364,13 @@ impl LoxServerContext {
}
}
// Returns a valid open_invite::Response if the open_invite::Request is valid
fn open_inv(&self, req: open_invite::Request) -> Result<open_invite::Response, ProofError> {
let mut ba_obj = self.ba.lock().unwrap();
ba_obj.handle_open_invite(req)
}
// Returns a valid trust_promotion:: Response if the trust_promotion::Request is valid
fn trust_promo(
&self,
req: trust_promotion::Request,
@ -354,16 +379,19 @@ impl LoxServerContext {
ba_obj.handle_trust_promotion(req)
}
// Returns a valid trust_migration::Response if the trust_migration::Request is valid
fn trust_migration(&self, req: migration::Request) -> Result<migration::Response, ProofError> {
let mut ba_obj = self.ba.lock().unwrap();
ba_obj.handle_migration(req)
}
// Returns a valid level_up:: Response if the level_up::Request is valid
fn level_up(&self, req: level_up::Request) -> Result<level_up::Response, ProofError> {
let mut ba_obj = self.ba.lock().unwrap();
ba_obj.handle_level_up(req)
}
// Returns a valid issue_invite::Response if the issue_invite::Request is valid
fn issue_invite(
&self,
req: issue_invite::Request,
@ -372,6 +400,7 @@ impl LoxServerContext {
ba_obj.handle_issue_invite(req)
}
// Returns a valid redeem_invite::Response if the redeem_invite::Request is valid
fn redeem_invite(
&self,
req: redeem_invite::Request,
@ -380,6 +409,7 @@ impl LoxServerContext {
ba_obj.handle_redeem_invite(req)
}
// Returns a valid check_blockage::Response if the check_blockage::Request is valid
fn check_blockage(
&self,
req: check_blockage::Request,
@ -388,6 +418,7 @@ impl LoxServerContext {
ba_obj.handle_check_blockage(req)
}
// Returns a valid blockage_migration::Response if the blockage_migration::Request is valid
fn blockage_migration(
&self,
req: blockage_migration::Request,
@ -396,7 +427,7 @@ impl LoxServerContext {
ba_obj.handle_blockage_migration(req)
}
// Generate and return an open invitation token
// Generate and return an open invitation token as an HTTP response
pub fn generate_invite(self) -> Response<Body> {
self.metrics.invites_requested.inc();
let invite = self.gen_invite();
@ -415,7 +446,7 @@ impl LoxServerContext {
}
}
// Return the serialized encrypted bridge table
// Return the serialized encrypted bridge table as an HTTP response
pub fn send_reachability_cred(self) -> Response<Body> {
let enc_table = self.encrypt_table();
let etable = lox_utils::EncBridgeTable { etable: enc_table };
@ -428,7 +459,7 @@ impl LoxServerContext {
}
}
// Return the serialized pubkeys for the Bridge Authority
// Return the serialized pubkeys for the Bridge Authority as an HTTP response
pub fn send_keys(self) -> Response<Body> {
let pubkeys = self.pubkeys();
match serde_json::to_string(&pubkeys) {
@ -440,6 +471,7 @@ impl LoxServerContext {
}
}
// Verify the open invitation request and return the result as an HTTP response
pub fn verify_and_send_open_cred(self, request: Bytes) -> Response<Body> {
let req = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -458,6 +490,7 @@ impl LoxServerContext {
}
}
// Verify the trust promotion request and return the result as an HTTP response
pub fn verify_and_send_trust_promo(self, request: Bytes) -> Response<Body> {
let req: trust_promotion::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -476,6 +509,7 @@ impl LoxServerContext {
}
}
// Verify the trust migration request and return the result as an HTTP response
pub fn verify_and_send_trust_migration(self, request: Bytes) -> Response<Body> {
let req: migration::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -494,6 +528,7 @@ impl LoxServerContext {
}
}
// Verify the level up request and return the result as an HTTP response
pub fn verify_and_send_level_up(self, request: Bytes) -> Response<Body> {
let req: level_up::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -512,6 +547,7 @@ impl LoxServerContext {
}
}
// Verify the open invitation request and return the result as an HTTP response
pub fn verify_and_send_issue_invite(self, request: Bytes) -> Response<Body> {
let req: issue_invite::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -530,6 +566,7 @@ impl LoxServerContext {
}
}
// Verify the redeem invite request and return the result as an HTTP response
pub fn verify_and_send_redeem_invite(self, request: Bytes) -> Response<Body> {
let req: redeem_invite::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -548,6 +585,7 @@ impl LoxServerContext {
}
}
// Verify the check blockage request and return the result as an HTTP response
pub fn verify_and_send_check_blockage(self, request: Bytes) -> Response<Body> {
let req: check_blockage::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -566,6 +604,7 @@ impl LoxServerContext {
}
}
// Verify the blockage migration request and return the result as an HTTP response
pub fn verify_and_send_blockage_migration(self, request: Bytes) -> Response<Body> {
let req: blockage_migration::Request = match serde_json::from_slice(&request) {
Ok(req) => req,
@ -585,6 +624,7 @@ impl LoxServerContext {
}
}
// Prepare HTTP Response for successful Server Request
fn prepare_header(response: String) -> Response<Body> {
let mut resp = Response::new(Body::from(response));
resp.headers_mut()
@ -592,6 +632,7 @@ fn prepare_header(response: String) -> Response<Body> {
resp
}
// Prepare HTTP Response for errored Server Request
fn prepare_error_header(error: String) -> Response<Body> {
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)

View File

@ -100,9 +100,9 @@ struct ResourceInfo {
}
// Populate Bridgedb from rdsys
// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified
// Rdsys sender creates a Resource request with the api_endpoint, resource token and type specified
// in the config.json file.
async fn rdsys_stream(
async fn rdsys_request_creator(
rtype: ResourceInfo,
tx: mpsc::Sender<ResourceState>,
mut kill: broadcast::Receiver<()>,
@ -114,6 +114,8 @@ async fn rdsys_stream(
}
}
// Makes a request to rdsys for the full set of Resources assigned to lox every interval
// (defined in the function)
async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender<ResourceState>) {
let mut interval = interval(Duration::from_secs(5));
loop {
@ -130,6 +132,7 @@ async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender<ResourceState>) {
}
}
// Parse bridges received from rdsys and sync with Lox context
async fn rdsys_bridge_parser(
rdsys_tx: mpsc::Sender<Command>,
rx: mpsc::Receiver<ResourceState>,
@ -152,6 +155,7 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<R
}
}
// Create a prometheus metrics server
async fn start_metrics_collector(
metrics_addr: SocketAddr,
registry: Registry,
@ -314,7 +318,7 @@ async fn main() {
});
let (tx, rx) = mpsc::channel(32);
let rdsys_request_handler = spawn(async { rdsys_stream(config.rtype, tx, kill_stream).await });
let rdsys_request_handler = spawn(async { rdsys_request_creator(config.rtype, tx, kill_stream).await });
let rdsys_resource_receiver =
spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });

View File

@ -161,7 +161,7 @@ pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry)
.unwrap();
}
/// This function returns a HTTP handler (i.e. another function)
/// This function returns an HTTP handler (i.e. another function)
pub fn make_handler(
registry: Arc<Registry>,
) -> impl Fn(Request<Body>) -> Pin<Box<dyn Future<Output = io::Result<Response<Body>>> + Send>> {