Merge changes from upstream

Vecna 2023-11-06 16:08:12 -05:00
commit 6f1ccd9485
28 changed files with 1566 additions and 698 deletions

Cargo.lock (generated) — file diff suppressed because it is too large.

@ -12,23 +12,26 @@ keywords = ["tor", "lox", "bridges"]
[dependencies]
julianday = "1.2.0"
base64 = "0.13.1"
base64 = "0.21.5"
hyper = { version = "0.14.27", features = ["server"] }
hex_fmt = "0.3"
futures = "0.3.28"
time = "0.3.28"
futures = "0.3.29"
time = "0.3.30"
tokio = { version = "1", features = ["full", "macros", "signal"] }
rand = "0.8.5"
reqwest = { version = "0.11", features = ["json", "stream"]}
serde = { version = "1.0", features = ["derive", "rc"] }
serde_with = "3.3.0"
zkp = "0.8.0"
serde_with = "3.4.0"
zkp = { git = "https://gitlab.torproject.org/onyinyang/lox-zkp" }
lox-library = { path = "../lox-library", version = "0.1.0"}
lox_utils = { path = "../lox-utils", version = "0.1.0"}
rdsys_backend = { path = "../rdsys-backend-api", version = "0.2"}
clap = { version = "4.4.2", features = ["derive"] }
serde_json = "1.0.105"
clap = { version = "4.4.7", features = ["derive"] }
serde_json = "1.0.108"
prometheus = "0.13.3"
sled = "0.34.7"
prometheus-client = "0.22.0"
[dependencies.chrono]
version = "0.4.27"
version = "0.4.31"
features = ["serde"]


@ -1,12 +1,11 @@
# Lox Distributor
The Lox distributor receives resources from [rdsys](https://gitlab.torproject.org/tpo/anti-censorship/rdsys) and writes them to [Lox
BridgeLines](https://git-crysp.uwaterloo.ca/iang/lox/src/master/src/bridge_table.rs#L42). Concurrently, it receives and responds to requests from [Lox clients](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm).
BridgeLines](https://git-crysp.uwaterloo.ca/iang/lox/src/master/src/bridge_table.rs#L42). Concurrently, it receives and responds to requests from [Lox clients](https://gitlab.torproject.org/tpo/anti-censorship/lox/lox-wasm). It saves the [LoxContext](https://gitlab.torproject.org/tpo/anti-censorship/lox-rs/-/blob/main/crates/lox-distributor/src/lox_context.rs) to a database every time the Lox bridgetable is updated and before the distributor shuts down.
## Configure rdsys stream
A test `config.json` is included for testing on a local instance of rdsys. This
can be edited to correspond to the desired types of resources and endpoints.
can be edited to correspond to the desired types of resources, endpoints, and database configuration.
## Test Run
@ -26,6 +25,13 @@ Finally run rdsys:
./backend --config config.json
```
## Database Config
The database has a few configuration options. The path where the database
should be read/written can be specified in the `config.json`. Rolling back to a
previous version of the database is also possible by passing the
`roll_back_date` flag at runtime and providing the date/time as a `%Y-%m-%d_%H:%M:%S` string. Pass this argument if the `LoxContext` should be rolled back to a previous state due to, for example, a mass blocking event that is likely not the result of Lox user behaviour. If the exact roll-back date/time is not known, the last db entry within 24 hours of the passed `roll_back_date` will be used; if no such entry exists, the program will fail gracefully.
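For example, to roll the context back to roughly midnight on 5 November 2023 (the flag spelling assumes clap's default kebab-case naming of the `roll_back_date` argument):
```
cargo run -- --roll-back-date 2023-11-05_00:00:00
```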
### Run Lox Distributor locally
Simply run `cargo run -- config.json` :)


@ -1,9 +1,19 @@
{
"endpoint": "http://127.0.0.1:7100/resource-stream",
"name": "https",
"token": "HttpsApiTokenPlaceholder",
"types": [
"obfs2",
"scramblesuit"
]
"db": {
"db_path": "lox_db"
},
"metrics_port": 5222,
"bridge_config": {
"percent_spares": 50
},
"rtype": {
"endpoint": "http://127.0.0.1:7100/resources",
"name": "https",
"token": "HttpsApiTokenPlaceholder",
"types": [
"obfs2",
"scramblesuit"
]
}
}

File diff suppressed because one or more lines are too long


@ -1,39 +0,0 @@
use crate::lox_context;
use chrono::prelude::*;
use std::{
env,
error::Error,
fs::{DirEntry, File},
io::BufReader,
path::Path,
};
pub fn read_context_from_file<P: AsRef<Path>>(
path: P,
) -> Result<lox_context::LoxServerContext, Box<dyn Error>> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let context = serde_json::from_reader(reader)?;
Ok(context)
}
pub fn write_context_to_file(context: lox_context::LoxServerContext) {
let mut date = Local::now().format("%Y-%m-%d_%H:%M:%S").to_string();
let path = "_lox.json";
date.push_str(path);
let file = File::create(&date)
.expect(format!("Unable to write to db file: {:?} !", stringify!($date)).as_str());
let _ = serde_json::to_writer(file, &context);
}
pub fn check_db_exists() -> Option<DirEntry> {
let current_path = env::current_dir().expect("Unable to access current dir");
std::fs::read_dir(current_path)
.expect("Couldn't read local directory")
.flatten() // Remove failed
.filter(|f| {
f.metadata().unwrap().is_file()
&& (f.file_name().into_string().unwrap().contains("_lox.json"))
}) // Filter out directories (only consider files)
.max_by_key(|x| x.metadata().unwrap().modified().unwrap())
}


@ -6,25 +6,212 @@ use lox_library::{
blockage_migration, check_blockage, issue_invite, level_up, migration, open_invite,
redeem_invite, trust_promotion,
},
BridgeAuth, BridgeDb, IssuerPubKey,
BridgeAuth, BridgeDb, ExceededMaxBridgesError, IssuerPubKey,
};
use rdsys_backend::proto::{Resource, ResourceState};
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::HashMap,
sync::{Arc, Mutex},
};
use zkp::ProofError;
#[derive(Clone, Serialize, Deserialize)]
use crate::metrics::Metrics;
use crate::resource_parser::{parse_into_bridgelines, sort_for_parsing};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LoxServerContext {
pub db: Arc<Mutex<BridgeDb>>,
pub ba: Arc<Mutex<BridgeAuth>>,
pub extra_bridges: Arc<Mutex<Vec<BridgeLine>>>,
pub to_be_replaced_bridges: Arc<Mutex<Vec<BridgeLine>>>,
#[serde(skip)]
pub metrics: Metrics,
}
impl LoxServerContext {
pub fn bridgetable_is_empty(&self) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
ba_obj.is_empty()
}
// Populate an empty bridgetable for the first time
pub fn populate_bridgetable(
&self,
buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>,
percent_spares: i32,
) {
let mut partition: i32 = 0;
if percent_spares != 0 {
partition = buckets.len() as i32 * percent_spares / 100;
}
let (spares, open_invitations) = buckets.split_at(partition as usize);
for bucket in spares {
self.add_spare_bucket(*bucket)
}
for bucket in open_invitations {
self.add_openinv_bucket(*bucket)
}
}
pub fn handle_working_resources(&self, working_resources: Vec<Resource>) -> Vec<u64> {
let mut accounted_for_bridges: Vec<u64> = Vec::new();
let bridgelines = parse_into_bridgelines(working_resources);
for bridge in bridgelines {
/* TODO: Functionality for marking bridges as unreachable/blocked should eventually happen here.
It is currently not enabled as there is not yet a reliable way to determine that a bridge is blocked.
This means that migrations to unblocked bridges do not currently work but can be easily enabled by parsing the
list of `blocked resources` from rdsys or another source with something like the following:
let res = context.add_unreachable(bridgeline);
if res {
println!(
"BridgeLine {:?} successfully marked unreachable: {:?}",
bridgeline
);
} else {
println!(
"BridgeLine {:?} NOT marked unreachable, saved for next update!",
bridge.uid_fingerprint
);
}
*/
let res = self.update_bridge(bridge);
if res {
println!(
"BridgeLine {:?} successfully updated.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
self.metrics.existing_or_updated_bridges.inc();
// Assume non-failing bridges that are not found in the bridge table are new bridges and save them for later
} else {
println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridge.uid_fingerprint);
self.append_extra_bridges(bridge);
self.metrics.new_bridges.inc();
}
}
accounted_for_bridges
}
pub fn handle_not_working_resources(
&self,
not_working_resources: Vec<Resource>,
mut accounted_for_bridges: Vec<u64>,
) -> Vec<u64> {
let (grace_period, failing) = sort_for_parsing(not_working_resources);
// Update bridges in the bridge table that are failing but within the grace period
for bridge in grace_period {
let res = self.update_bridge(bridge);
if res {
println!(
"BridgeLine {:?} successfully updated.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
self.metrics.existing_or_updated_bridges.inc();
}
}
// Next, handle the failing bridges. If resource last passed tests >=ACCEPTED_HOURS_OF_FAILURE ago,
// it should be replaced with a working resource and be removed from the bridgetable.
for bridge in failing {
let res = self.replace_with_new(bridge);
if res == lox_library::ReplaceSuccess::Replaced {
println!(
"BridgeLine {:?} successfully replaced.",
bridge.uid_fingerprint
);
accounted_for_bridges.push(bridge.uid_fingerprint);
self.metrics.removed_bridges.inc();
} else if res == lox_library::ReplaceSuccess::NotReplaced {
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!(
"BridgeLine {:?} NOT replaced, saved for next update!",
bridge.uid_fingerprint
);
self.new_to_be_replaced_bridge(bridge);
self.metrics.existing_or_updated_bridges.inc();
accounted_for_bridges.push(bridge.uid_fingerprint);
} else {
// NotFound
assert!(
res == lox_library::ReplaceSuccess::NotFound,
"ReplaceSuccess incorrectly set"
);
println!(
"BridgeLine {:?} no longer in bridge table.",
bridge.uid_fingerprint
);
}
}
accounted_for_bridges
}
// Sync resources received from rdsys with the Lox bridgetable
pub fn sync_with_bridgetable(&self, resources: ResourceState) {
// Check if each resource is already in the Lox bridgetable. If it is, it's probably fine
// to replace the existing resource with the incoming one to account for changes.
// Save a list of accounted-for bridges and deal with the unaccounted-for bridges at the end.
let mut accounted_for_bridges: Vec<u64> = Vec::new();
// ensure all working resources are updated and accounted for
if let Some(working_resources) = resources.working {
accounted_for_bridges = self.handle_working_resources(working_resources);
}
if let Some(not_working_resources) = resources.not_working {
accounted_for_bridges =
self.handle_not_working_resources(not_working_resources, accounted_for_bridges);
}
let mut ba_clone = self.ba.lock().unwrap();
let total_reachable = ba_clone.bridge_table.reachable.len();
match total_reachable.cmp(&accounted_for_bridges.len()) {
Ordering::Greater => {
let unaccounted_for = ba_clone.find_and_remove_unaccounted_for_bridges(accounted_for_bridges);
for bridgeline in unaccounted_for {
match self.replace_with_new(bridgeline) {
lox_library::ReplaceSuccess::Replaced => {
println!("BridgeLine {:?} not found in rdsys update was successfully replaced.", bridgeline.uid_fingerprint);
self.metrics.removed_bridges.inc();
}
lox_library::ReplaceSuccess::NotReplaced => {
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!("BridgeLine {:?} not found in rdsys update NOT replaced, saved for next update!",
bridgeline.uid_fingerprint);
self.new_to_be_replaced_bridge(bridgeline);
self.metrics.existing_or_updated_bridges.inc();
}
lox_library::ReplaceSuccess::NotFound => println!(
"BridgeLine {:?} no longer in reachable bridges.",
bridgeline.uid_fingerprint
),
}
}
}
Ordering::Less => println!("Something unexpected occurred: The number of reachable bridges should not be less than those updated from rdsys"),
_ => (),
}
// Finally, assign any extra_bridges to new buckets if there are enough
while self.extra_bridges.lock().unwrap().len() >= MAX_BRIDGES_PER_BUCKET {
let bucket = self.remove_extra_bridges();
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
let mut db_obj = self.db.lock().unwrap();
match ba_clone.add_spare_bucket(bucket, &mut db_obj) {
Ok(_) => (),
Err(e) => {
println!("Error: {:?}", e);
for bridge in bucket {
self.append_extra_bridges(bridge);
}
}
}
}
}
pub fn append_extra_bridges(&self, bridge: BridgeLine) {
let mut extra_bridges = self.extra_bridges.lock().unwrap();
extra_bridges.push(bridge);
@ -87,6 +274,8 @@ impl LoxServerContext {
}
}
// Attempt to remove a bridge that is failing tests and replace it with a bridge from the
// available bridges or from a spare bucket
pub fn replace_with_new(&self, bridgeline: BridgeLine) -> lox_library::ReplaceSuccess {
let mut ba_obj = self.ba.lock().unwrap();
let eb_obj = self.extra_bridges.lock().unwrap();
@ -100,7 +289,7 @@ impl LoxServerContext {
result
}
/* Uncomment when bridge blocking is finalized
/* TODO: Uncomment when bridge blocking is finalized
pub fn add_unreachable(&self, bridgeline: BridgeLine) -> bool {
let mut ba_obj = self.ba.lock().unwrap();
let mut db_obj = self.db.lock().unwrap();
@ -143,10 +332,16 @@ impl LoxServerContext {
self.ba.lock().unwrap().today()
}
fn gen_invite(&self) -> lox_utils::Invite {
fn gen_invite(&self) -> Result<lox_utils::Invite, ExceededMaxBridgesError> {
let mut obj = self.db.lock().unwrap();
lox_utils::Invite {
invite: obj.invite(),
match obj.invite() {
Ok(invite) => {
if obj.current_k == 1 {
self.metrics.k_reset_count.inc();
}
Ok(lox_utils::Invite { invite })
}
Err(e) => Err(e),
}
}
@ -207,9 +402,16 @@ impl LoxServerContext {
// Generate and return an open invitation token
pub fn generate_invite(self) -> Response<Body> {
self.metrics.invites_requested.inc();
let invite = self.gen_invite();
match serde_json::to_string(&invite) {
Ok(resp) => prepare_header(resp),
match invite {
Ok(invite) => match serde_json::to_string(&invite) {
Ok(resp) => prepare_header(resp),
Err(e) => {
println!("Error parsing Invite to JSON");
prepare_error_header(e.to_string())
}
},
Err(e) => {
println!("Error parsing Invite to JSON");
prepare_error_header(e.to_string())
@ -261,6 +463,7 @@ impl LoxServerContext {
match self.open_inv(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.open_inv_count.inc();
prepare_header(response)
}
Err(e) => {
@ -278,6 +481,7 @@ impl LoxServerContext {
match self.trust_promo(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.trust_promo_count.inc();
prepare_header(response)
}
Err(e) => {
@ -295,6 +499,7 @@ impl LoxServerContext {
match self.trust_migration(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.trust_mig_count.inc();
prepare_header(response)
}
Err(e) => {
@ -312,6 +517,7 @@ impl LoxServerContext {
match self.level_up(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.level_up_count.inc();
prepare_header(response)
}
Err(e) => {
@ -329,6 +535,7 @@ impl LoxServerContext {
match self.issue_invite(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.issue_invite_count.inc();
prepare_header(response)
}
Err(e) => {
@ -346,6 +553,7 @@ impl LoxServerContext {
match self.redeem_invite(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.redeem_invite_count.inc();
prepare_header(response)
}
Err(e) => {
@ -363,6 +571,7 @@ impl LoxServerContext {
match self.check_blockage(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.check_blockage_count.inc();
prepare_header(response)
}
Err(e) => {
@ -380,6 +589,7 @@ impl LoxServerContext {
match self.blockage_migration(req) {
Ok(resp) => {
let response = serde_json::to_string(&resp).unwrap();
self.metrics.blockage_migration_count.inc();
prepare_header(response)
}
Err(e) => {


@ -1,40 +1,38 @@
use chrono::Utc;
use clap::Parser;
use futures::future;
use futures::StreamExt;
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use lox_library::{BridgeAuth, BridgeDb};
use rdsys_backend::{proto::ResourceDiff, start_stream};
use prometheus_client::registry::Registry;
use rdsys_backend::{proto::ResourceState, request_resources};
use serde::Deserialize;
use std::{
convert::Infallible,
fs::File,
io::BufReader,
net::SocketAddr,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
mod file_reader;
use file_reader::{check_db_exists, read_context_from_file, write_context_to_file};
mod db_handler;
use db_handler::DB;
mod lox_context;
mod metrics;
use metrics::Metrics;
mod request_handler;
use request_handler::handle;
mod resource_parser;
use resource_parser::parse_resource;
use resource_parser::{parse_into_bridgelines, parse_into_buckets};
use tokio::{
signal, spawn,
sync::{broadcast, mpsc, oneshot},
time::sleep,
time::{interval, sleep},
};
async fn shutdown_signal() {
@ -51,11 +49,46 @@ struct Args {
#[arg(short, long, default_value = "config.json")]
config: PathBuf,
/// Optional name/path of the lox context json backup file
/// Used to populate the Lox context. If none is provided, an empty
/// Lox context will be created
// Optional Date/time to roll back to as a %Y-%m-%d_%H:%M:%S string
// This argument should be passed if the lox_context should be rolled back to a
// previous state due to, for example, a mass blocking event that is likely not
// due to Lox user behaviour. If the exact roll back date/time is not known, the
// last db entry within 24 hours from the passed roll_back_date will be used or else
// the program will fail gracefully.
#[arg(short, long, verbatim_doc_comment)]
backup_context: Option<PathBuf>,
roll_back_date: Option<String>,
}
#[derive(Debug, Deserialize)]
struct Config {
db: DbConfig,
metrics_port: u16,
bridge_config: BridgeConfig,
rtype: ResourceInfo,
}
// Path of the lox database
#[derive(Debug, Deserialize)]
pub struct DbConfig {
// The path for the lox_context database, default is "lox_db"
db_path: String,
}
impl Default for DbConfig {
fn default() -> DbConfig {
DbConfig {
db_path: "lox_db".to_owned(),
}
}
}
// Config information for how bridges should be allocated to buckets
#[derive(Debug, Default, Deserialize)]
pub struct BridgeConfig {
// The percentage of buckets (made up of MAX_BRIDGES_PER_BUCKET bridges)
// that should be allocated as spare buckets
// This will be calculated as the floor of buckets.len() * percent_spares / 100
percent_spares: i32,
}
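As a worked example of the arithmetic above (an editorial sketch, not part of this commit): with 10 buckets and `percent_spares = 50`, the partition index is `10 * 50 / 100 = 5`, so the first 5 buckets become spares and the remainder open-invitation buckets.
```rust
// Sketch: how percent_spares partitions buckets (names here are illustrative).
fn split_spares(num_buckets: usize, percent_spares: i32) -> (usize, usize) {
    // Integer division gives the floor, matching populate_bridgetable
    let spares = (num_buckets as i32 * percent_spares / 100) as usize;
    (spares, num_buckets - spares)
}

#[test]
fn fifty_percent_spares() {
    assert_eq!(split_spares(10, 50), (5, 5));
    assert_eq!(split_spares(7, 50), (3, 4)); // floor(7 * 50 / 100) = 3
}
```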
#[derive(Debug, Deserialize)]
@ -69,32 +102,37 @@ struct ResourceInfo {
// Rdsys sender creates a ResourceStream with the api_endpoint, resource token and type specified
// in the config.json file.
// TODO: ensure this stream gracefully shuts down on the ctrl_c command.
async fn rdsys_stream(
rtype: ResourceInfo,
tx: mpsc::Sender<ResourceDiff>,
tx: mpsc::Sender<ResourceState>,
mut kill: broadcast::Receiver<()>,
) {
let mut rstream = start_stream(rtype.endpoint, rtype.name, rtype.token, rtype.types)
.await
.expect("rdsys stream initialization failed. Start rdsys or check config.json");
loop {
tokio::select! {
res = rstream.next() => {
match res {
Some(diff) => tx.send(diff).await.unwrap(),
None => return,
}
},
_ = kill.recv() => {println!("Shut down rdsys stream"); return},
tokio::select! {
start_resource_request = rdsys_request(rtype, tx) => start_resource_request,
_ = kill.recv() => {println!("Shut down rdsys request loop")},
}
}
}
async fn rdsys_request(rtype: ResourceInfo, tx: mpsc::Sender<ResourceState>) {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
let resources = request_resources(
rtype.endpoint.clone(),
rtype.name.clone(),
rtype.token.clone(),
rtype.types.clone(),
)
.await
.unwrap();
tx.send(resources).await.unwrap();
}
}
async fn rdsys_bridge_parser(
rdsys_tx: mpsc::Sender<Command>,
rx: mpsc::Receiver<ResourceDiff>,
rx: mpsc::Receiver<ResourceState>,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
@ -103,24 +141,38 @@ async fn rdsys_bridge_parser(
}
}
// Parse Bridges receives a ResourceDiff from rdsys_sender and sends it to the
// Context Manager to be parsed and added to the BridgeDB
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceDiff>) {
// Parse Bridges receives the resources from rdsys and sends it to the
// Context Manager to be parsed and added to the Lox BridgeDB
async fn parse_bridges(rdsys_tx: mpsc::Sender<Command>, mut rx: mpsc::Receiver<ResourceState>) {
loop {
let resourcediff = rx.recv().await.unwrap();
let cmd = Command::Rdsys { resourcediff };
let resources = rx.recv().await.unwrap();
let cmd = Command::Rdsys { resources };
rdsys_tx.send(cmd).await.unwrap();
sleep(Duration::from_secs(1)).await;
}
}
async fn start_metrics_collector(
metrics_addr: SocketAddr,
registry: Registry,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
lox_metrics = metrics::start_metrics_server(metrics_addr, registry) => lox_metrics,
_ = kill.recv() => {println!("Shut down metrics server");},
}
}
async fn create_context_manager(
backup_context_path: Option<PathBuf>,
db_config: DbConfig,
bridge_config: BridgeConfig,
roll_back_date: Option<String>,
metrics: Metrics,
context_rx: mpsc::Receiver<Command>,
mut kill: broadcast::Receiver<()>,
) {
tokio::select! {
create_context = context_manager(backup_context_path, context_rx) => create_context,
create_context = context_manager(db_config, bridge_config, roll_back_date, metrics, context_rx) => create_context,
_ = kill.recv() => {println!("Shut down context_manager");},
}
}
@ -128,170 +180,50 @@ async fn create_context_manager(
// Context Manager handles the Lox BridgeDB and Bridge Authority, ensuring
// that the DB can be updated from the rdsys stream and client requests
// can be responded to with an updated BridgeDB state
async fn context_manager(db_path: Option<PathBuf>, mut context_rx: mpsc::Receiver<Command>) {
let context: lox_context::LoxServerContext;
if let Some(existing_db) = db_path.as_deref() {
context = read_context_from_file(existing_db).unwrap();
} else if let Some(last_modified_file) = check_db_exists() {
println!("Reading from file {:?}", last_modified_file);
context = read_context_from_file(&last_modified_file.path()).unwrap();
} else {
let new_db = BridgeDb::new();
let new_ba = BridgeAuth::new(new_db.pubkey);
context = lox_context::LoxServerContext {
db: Arc::new(Mutex::new(new_db)),
ba: Arc::new(Mutex::new(new_ba)),
extra_bridges: Arc::new(Mutex::new(Vec::new())),
to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())),
}
}
async fn context_manager(
db_config: DbConfig,
bridge_config: BridgeConfig,
roll_back_date: Option<String>,
metrics: Metrics,
mut context_rx: mpsc::Receiver<Command>,
) {
let (mut lox_db, context) =
match DB::open_new_or_existing_db(db_config, roll_back_date, metrics) {
Ok((lox_db, context)) => (lox_db, context),
Err(e) => {
panic!("Error: {:?}", e);
}
};
while let Some(cmd) = context_rx.recv().await {
use Command::*;
match cmd {
Rdsys { resourcediff } => {
if let Some(new_resources) = resourcediff.new {
let mut count = 0;
let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
for pt in new_resources {
println!("A NEW RESOURCE: {:?}", pt);
if let Some(resources) = pt.1 {
for resource in resources {
let bridgeline = parse_resource(resource);
println!("Now it's a bridgeline: {:?}", bridgeline);
if context.to_be_replaced_bridges.lock().unwrap().len() > 0 {
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
if res == lox_library::ReplaceSuccess::NotFound {
println!(
"BridgeLine not found in bridge_table, already updated {:?}",
bridgeline
);
} else if res == lox_library::ReplaceSuccess::Replaced {
println!(
"BridgeLine successfully replaced: {:?}",
bridgeline
);
} else {
assert!(
res == lox_library::ReplaceSuccess::NotReplaced,
"ReplaceSuccess incorrectly set somehow"
);
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!("'Gone' BridgeLine NOT replaced, saved for next update! : {:?}", bridgeline);
context.new_to_be_replaced_bridge(bridgeline);
}
} else if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline;
count += 1;
} else {
// TODO: Decide the circumstances under which a bridge is allocated to an open_inv or spare bucket,
// eventually also do some more fancy grouping of new resources, i.e., by type or region
context.add_openinv_bucket(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
}
}
}
}
// Handle the extra buckets that were not allocated already
if count != 0 {
for val in 0..count {
if context.extra_bridges.lock().unwrap().len()
< (MAX_BRIDGES_PER_BUCKET)
{
context.append_extra_bridges(bucket[val]);
} else {
bucket = context.remove_extra_bridges();
context.add_spare_bucket(bucket);
}
Rdsys { resources } => {
// If the bridgetable is not being loaded from an existing database, we will populate the
// bridgetable with all of the working bridges received from rdsys.
if context.bridgetable_is_empty() {
if let Some(working_resources) = resources.working {
let bridgelines = parse_into_bridgelines(working_resources);
context.metrics.new_bridges.inc_by(bridgelines.len() as u64);
let (buckets, leftovers) = parse_into_buckets(bridgelines);
for leftover in leftovers {
context.append_extra_bridges(leftover);
}
context.populate_bridgetable(buckets, bridge_config.percent_spares);
// Otherwise, we need to sync the existing bridgetable with the resources we receive from
// rdsys and ensure that all functioning bridges are correctly placed in the bridgetable:
// those that have changed are updated, and those that have been failing tests for an
// extended period of time are removed.
// If bridges are labelled as blocked_in, we should also handle blocking behaviour.
}
} else {
context.sync_with_bridgetable(resources);
}
if let Some(changed_resources) = resourcediff.changed {
for pt in changed_resources {
println!("A NEW CHANGED RESOURCE: {:?}", pt);
if let Some(resources) = pt.1 {
for resource in resources {
let bridgeline = parse_resource(resource);
println!("BridgeLine to be changed: {:?}", bridgeline);
let res = context.update_bridge(bridgeline);
if res {
println!("BridgeLine successfully updated: {:?}", bridgeline);
} else {
println!("BridgeLine: {:?} not found in Lox's Bridgetable. Save it as a new resource for now!", bridgeline);
if context.extra_bridges.lock().unwrap().len() < 2 {
context.append_extra_bridges(bridgeline);
} else {
let bucket = context.remove_extra_bridges();
context.add_spare_bucket(bucket);
}
}
}
}
}
}
// gone resources are not the same as blocked resources.
// Instead, these are bridges which have either failed to pass tests for some period
// or have expired bridge descriptors. In both cases, the bridge is unusable, but this
// is not likely due to censorship. Therefore, we replace gone resources with new resources
// TODO: create a notion of blocked resources from information collected through various means:
// https://gitlab.torproject.org/tpo/anti-censorship/censorship-analysis/-/issues/40035
if let Some(gone_resources) = resourcediff.gone {
for pt in gone_resources {
println!("A NEW GONE RESOURCE: {:?}", pt);
if let Some(resources) = pt.1 {
for resource in resources {
// If resource last passed tests 3 hours ago, it should be replaced with a working
// resource and be removed from the bridgetable. If it has been gone for more than 7 hours,
// we should stop trying to remove it from the bridge table and assume it has successfully been
// removed already
if resource.last_passed < (Utc::now() - chrono::Duration::hours(3))
|| resource.last_passed
> (Utc::now() - chrono::Duration::hours(7))
{
let bridgeline = parse_resource(resource);
println!("BridgeLine to be replaced: {:?}", bridgeline);
let res = context.replace_with_new(bridgeline);
if res == lox_library::ReplaceSuccess::Replaced {
println!(
"BridgeLine successfully replaced: {:?}",
bridgeline
);
} else if res == lox_library::ReplaceSuccess::NotReplaced {
// Add the bridge to the list of to_be_replaced bridges in the Lox context and try
// again to replace at the next update (nothing changes in the Lox Authority)
println!(
"'Gone' BridgeLine NOT replaced, saved for next update! : {:?}",
bridgeline
);
context.new_to_be_replaced_bridge(bridgeline);
}
}
}
}
}
}
/* Functionality for marking bridges as unreachable/blocked is currently not enabled as there is not
yet a reliable way to determine that a bridge is blocked. This means that migrations to unblocked bridges do not
currently work but can be easily enabled with a list of `blocked resources` from rdsys or another source with something
like the following:
println!("BridgeLine to be removed: {:?}", bridgeline);
let res = context.add_unreachable(bridgeline);
if res {
println!(
"BridgeLine successfully marked unreachable: {:?}",
bridgeline
);
} else {
println!("'Gone' BridgeLine NOT REMOVED!! : {:?}", bridgeline);
//TODO probably do something else here
}
*/
// Handle any bridges that are leftover in the bridge authority from the sync
context.allocate_leftover_bridges();
context.encrypt_table();
write_context_to_file(context.clone());
lox_db.write_context(context.clone());
sleep(Duration::from_millis(1)).await;
}
Request { req, sender } => {
@ -299,9 +231,11 @@ async fn context_manager(db_path: Option<PathBuf>, mut context_rx: mpsc::Receive
if let Err(e) = sender.send(response) {
eprintln!("Server Response Error: {:?}", e);
};
lox_db.write_context(context.clone());
sleep(Duration::from_millis(1)).await;
}
Shutdown { shutdown_sig } => {
lox_db.write_context(context.clone());
println!("Sending Shutdown Signal, all threads should shutdown.");
drop(shutdown_sig);
println!("Shutdown Sent.");
@ -314,7 +248,7 @@ async fn context_manager(db_path: Option<PathBuf>, mut context_rx: mpsc::Receive
#[derive(Debug)]
enum Command {
Rdsys {
resourcediff: ResourceDiff,
resources: ResourceState,
},
Request {
req: Request<Body>,
@ -332,8 +266,7 @@ async fn main() {
let file = File::open(&args.config).expect("Could not read config file");
let reader = BufReader::new(file);
// Read the JSON contents of the file as a Config
let rtype: ResourceInfo =
serde_json::from_reader(reader).expect("Reading ResourceInfo from JSON failed.");
let config: Config = serde_json::from_reader(reader).expect("Reading Config from JSON failed.");
let (rdsys_tx, context_rx) = mpsc::channel(32);
let request_tx = rdsys_tx.clone();
@ -342,6 +275,7 @@ async fn main() {
// create the shutdown broadcast channel and clone for every thread
let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16);
let kill_stream = shutdown_tx.subscribe();
let kill_metrics = shutdown_tx.subscribe();
let kill_parser = shutdown_tx.subscribe();
let kill_context = shutdown_tx.subscribe();
@ -360,12 +294,27 @@ async fn main() {
}
});
let metrics = Metrics::default();
let registry = metrics.register();
let metrics_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), config.metrics_port);
let metrics_handler =
spawn(async move { start_metrics_collector(metrics_addr, registry, kill_metrics).await });
let context_manager = spawn(async move {
create_context_manager(args.backup_context, context_rx, kill_context).await
create_context_manager(
config.db,
config.bridge_config,
args.roll_back_date,
metrics,
context_rx,
kill_context,
)
.await
});
let (tx, rx) = mpsc::channel(32);
let rdsys_stream_handler = spawn(async { rdsys_stream(rtype, tx, kill_stream).await });
let rdsys_request_handler = spawn(async { rdsys_stream(config.rtype, tx, kill_stream).await });
let rdsys_resource_receiver =
spawn(async { rdsys_bridge_parser(rdsys_tx, rx, kill_parser).await });
@ -395,7 +344,8 @@ async fn main() {
eprintln!("server error: {}", e);
}
future::join_all([
rdsys_stream_handler,
metrics_handler,
rdsys_request_handler,
rdsys_resource_receiver,
context_manager,
shutdown_handler,


@ -0,0 +1,187 @@
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};
use std::{future::Future, io, net::SocketAddr, pin::Pin, sync::Arc};
use tokio::signal::unix::{signal, SignalKind};
#[derive(Debug, Clone)]
pub struct Metrics {
pub existing_or_updated_bridges: Counter,
pub new_bridges: Counter,
pub removed_bridges: Counter,
pub blocked_bridges: Counter,
pub open_inv_count: Counter,
pub trust_promo_count: Counter,
pub trust_mig_count: Counter,
pub level_up_count: Counter,
pub issue_invite_count: Counter,
pub redeem_invite_count: Counter,
pub check_blockage_count: Counter,
pub blockage_migration_count: Counter,
pub k_reset_count: Counter,
pub invites_requested: Counter,
}
impl Default for Metrics {
fn default() -> Self {
// Create counters.
let existing_or_updated_bridges = Counter::default();
let new_bridges = Counter::default();
let removed_bridges = Counter::default();
let blocked_bridges = Counter::default();
let open_inv_count = Counter::default();
let trust_promo_count = Counter::default();
let trust_mig_count = Counter::default();
let level_up_count = Counter::default();
let issue_invite_count = Counter::default();
let redeem_invite_count = Counter::default();
let check_blockage_count = Counter::default();
let blockage_migration_count = Counter::default();
let k_reset_count = Counter::default();
let invites_requested = Counter::default();
Metrics {
existing_or_updated_bridges,
new_bridges,
removed_bridges,
blocked_bridges,
open_inv_count,
trust_promo_count,
trust_mig_count,
level_up_count,
issue_invite_count,
redeem_invite_count,
check_blockage_count,
blockage_migration_count,
k_reset_count,
invites_requested,
}
}
}
impl Metrics {
pub fn register(&self) -> Registry {
// Create a Registry and register Counter.
let mut r = <Registry>::with_prefix("lox-metrics");
r.register(
"existing_or_updated_bridges",
"number of existing or updated bridges recorded at rdsys sync",
self.existing_or_updated_bridges.clone(),
);
r.register(
"new_bridges",
"number of new bridges added to bridge table",
self.new_bridges.clone(),
);
r.register(
"removed_bridges",
"number of bridges removed from the bridgetable",
self.removed_bridges.clone(),
);
r.register(
"blocked_bridges",
"number of bridges blocked",
self.blocked_bridges.clone(),
);
r.register(
"open_inv_counter",
"number of open invitations distributed",
self.open_inv_count.clone(),
);
r.register(
"trust_promo_counter",
"number of trust promotions requests",
self.trust_promo_count.clone(),
);
r.register(
"trust_mig_counter",
"number of trust migrations requests",
self.trust_mig_count.clone(),
);
r.register(
"level_up_counter",
"number of level up requests",
self.level_up_count.clone(),
);
r.register(
"issue_invite_counter",
"number of issue invite requests",
self.issue_invite_count.clone(),
);
r.register(
"redeem_invite_counter",
"number of level up requests",
self.redeem_invite_count.clone(),
);
r.register(
"check_blockage_counter",
"number of check blockage requests",
self.check_blockage_count.clone(),
);
r.register(
"blockage_migration_counter",
"number of blockage migration requests",
self.blockage_migration_count.clone(),
);
r.register(
"k_reset_counter",
"number of times k has reset to 0",
self.k_reset_count.clone(),
);
r.register(
"invites_requested",
"number of invites requested",
self.invites_requested.clone(),
);
r
}
}
/// Start an HTTP server to report metrics.
pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) {
let mut shutdown_stream = signal(SignalKind::terminate()).unwrap();
eprintln!("Starting metrics server on {metrics_addr}");
let registry = Arc::new(registry);
Server::bind(&metrics_addr)
.serve(make_service_fn(move |_conn| {
let registry = registry.clone();
async move {
let handler = make_handler(registry);
Ok::<_, io::Error>(service_fn(handler))
}
}))
.with_graceful_shutdown(async move {
shutdown_stream.recv().await;
})
.await
.unwrap();
}
/// This function returns an HTTP handler (i.e., another function)
pub fn make_handler(
registry: Arc<Registry>,
) -> impl Fn(Request<Body>) -> Pin<Box<dyn Future<Output = io::Result<Response<Body>>> + Send>> {
// This closure accepts a request and responds with the OpenMetrics encoding of our metrics.
move |_req: Request<Body>| {
let reg = registry.clone();
Box::pin(async move {
let mut buf = String::new();
encode(&mut buf, &reg.clone())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map(|_| {
let body = Body::from(buf);
Response::builder()
.header(
hyper::header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(body)
.unwrap()
})
})
}
}
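With the sample `config.json` above (`metrics_port: 5222`), the counters can be scraped with a plain HTTP GET; note the handler ignores the request path, so any path returns the OpenMetrics encoding:
```
curl http://127.0.0.1:5222/metrics
```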


@ -74,9 +74,11 @@ pub async fn handle(
#[cfg(test)]
mod tests {
use crate::lox_context;
use crate::metrics::Metrics;
use super::*;
use base64::{engine::general_purpose, Engine as _};
use chrono::{Duration, Utc};
use julianday::JulianDay;
use lox_library::{
@ -235,6 +237,7 @@ mod tests {
ba: Arc::new(Mutex::new(lox_auth)),
extra_bridges: Arc::new(Mutex::new(Vec::new())),
to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())),
metrics: Metrics::default(),
};
Self { context }
}
@ -298,7 +301,7 @@ mod tests {
rng.fill_bytes(&mut cert);
let infostr: String = format!(
"obfs4 cert={}, iat-mode=0",
base64::encode_config(cert, base64::STANDARD_NO_PAD)
general_purpose::STANDARD_NO_PAD.encode(cert)
);
res.info[..infostr.len()].copy_from_slice(infostr.as_bytes());
res


@ -1,13 +1,19 @@
use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES};
use chrono::{Duration, Utc};
use lox_library::bridge_table::{BridgeLine, BRIDGE_BYTES, MAX_BRIDGES_PER_BUCKET};
use rdsys_backend::proto::Resource;
pub fn parse_resource(resource: Resource) -> BridgeLine {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
let resource_uid = resource
.get_uid()
.expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
pub const ACCEPTED_HOURS_OF_FAILURE: i64 = 3;
// Parse each resource from rdsys into a Bridgeline as expected by the Lox Bridgetable
pub fn parse_into_bridgelines(resources: Vec<Resource>) -> Vec<BridgeLine> {
let mut bridgelines: Vec<BridgeLine> = Vec::new();
for resource in resources {
let mut ip_bytes: [u8; 16] = [0; 16];
ip_bytes[..resource.address.len()].copy_from_slice(resource.address.as_bytes());
let resource_uid = resource
.get_uid()
.expect("Unable to get Fingerprint UID of resource");
let infostr: String = format!(
"type={} blocked_in={:?} protocol={} fingerprint={:?} or_addresses={:?} distribution={} flags={:?} params={:?}",
resource.r#type,
resource.blocked_in,
@ -18,13 +24,167 @@ pub fn parse_resource(resource: Resource) -> BridgeLine {
resource.flags,
resource.params,
);
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
let mut info_bytes: [u8; BRIDGE_BYTES - 26] = [0; BRIDGE_BYTES - 26];
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
info_bytes[..infostr.len()].copy_from_slice(infostr.as_bytes());
bridgelines.push(BridgeLine {
addr: ip_bytes,
port: resource.port,
uid_fingerprint: resource_uid,
info: info_bytes,
})
}
bridgelines
}
// Allocate each Bridgeline into a bucket that will later be allocated into spare buckets or open invitation buckets
// Any leftover bridgelines (total_bridgelines % MAX_BRIDGES_PER_BUCKET) are returned in a separate Vec<BridgeLine>
// TODO: Improve this function to sort bridgelines into buckets in a more intentional manner. This could include
// sorting bridgelines with high bandwidth into buckets that are only distributed to more trusted users or sorting
// bridgelines by location
pub fn parse_into_buckets(
mut bridgelines: Vec<BridgeLine>,
) -> (Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>, Vec<BridgeLine>) {
let mut buckets: Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]> = Vec::new();
let mut count = 0;
let mut bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
let mut leftovers: Vec<BridgeLine> = Vec::new();
for bridgeline in bridgelines.clone() {
println!(
"Added bridge with fingerprint: {:?}",
bridgeline.uid_fingerprint
);
if count < MAX_BRIDGES_PER_BUCKET {
bucket[count] = bridgeline;
count += 1;
} else {
buckets.push(bucket);
count = 0;
bucket = [BridgeLine::default(); MAX_BRIDGES_PER_BUCKET];
}
}
// Handle the extra buckets that were not allocated already
if count != 0 {
for _ in 0..count {
// Assumes that the unallocated bridgelines will be the last x of the passed bridgelines
leftovers.push(bridgelines.pop().unwrap());
}
}
(buckets, leftovers)
}
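The intended allocation can also be expressed with `slice::chunks_exact`, which yields full buckets and exposes the partial remainder as leftovers (an editorial sketch of equivalent behaviour, assuming order-preserving allocation; not code from this commit):
```rust
use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};

// Sketch: split bridgelines into full buckets; the trailing partial chunk
// becomes the leftovers.
pub fn buckets_via_chunks(
    bridgelines: Vec<BridgeLine>,
) -> (Vec<[BridgeLine; MAX_BRIDGES_PER_BUCKET]>, Vec<BridgeLine>) {
    let mut chunks = bridgelines.chunks_exact(MAX_BRIDGES_PER_BUCKET);
    let buckets = (&mut chunks)
        .map(|chunk| <[BridgeLine; MAX_BRIDGES_PER_BUCKET]>::try_from(chunk).unwrap())
        .collect();
    let leftovers = chunks.remainder().to_vec();
    (buckets, leftovers)
}
```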
// Sort Resources into those that are functional and those that are failing based on the last time
// they were passing tests. Before passing them back to the calling function, they are parsed into
// BridgeLines
pub fn sort_for_parsing(resources: Vec<Resource>) -> (Vec<BridgeLine>, Vec<BridgeLine>) {
let mut grace_period: Vec<Resource> = Vec::new();
let mut failing: Vec<Resource> = Vec::new();
for resource in resources {
// TODO: Maybe filter for untested resources first if last_passed alone would skew
// the filter in an unintended direction
if resource.test_result.last_passed + Duration::hours(ACCEPTED_HOURS_OF_FAILURE)
>= Utc::now()
{
grace_period.push(resource);
} else {
failing.push(resource);
}
}
let grace_period_bridgelines = parse_into_bridgelines(grace_period);
let failing_bridgelines = parse_into_bridgelines(failing);
(grace_period_bridgelines, failing_bridgelines)
}
#[cfg(test)]
mod tests {
use rdsys_backend::proto::{Resource, TestResults};
use std::collections::HashMap;
use chrono::{Duration, Utc};
use super::sort_for_parsing;
pub fn make_resource(
rtype: String,
address: String,
port: u16,
fingerprint: String,
last_passed: i64,
) -> Resource {
let mut flags = HashMap::new();
flags.insert(String::from("fast"), true);
flags.insert(String::from("stable"), true);
let mut params = HashMap::new();
params.insert(
String::from("password"),
String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"),
);
Resource {
r#type: String::from(rtype),
blocked_in: HashMap::new(),
test_result: TestResults {
last_passed: Utc::now() - Duration::hours(last_passed),
},
protocol: String::from("tcp"),
address: String::from(address),
port: port,
fingerprint: String::from(fingerprint),
or_addresses: None,
distribution: String::from("https"),
flags: Some(flags),
params: Some(params),
}
}
#[test]
fn test_sort_for_parsing() {
let resource_one = make_resource(
"scramblesuit".to_owned(),
"123.456.789.100".to_owned(),
3002,
"BE84A97D02130470A1C77839954392BA979F7EE1".to_owned(),
2,
);
let resource_two = make_resource(
"https".to_owned(),
"123.222.333.444".to_owned(),
6002,
"C56B9EF202130470A1C77839954392BA979F7FF9".to_owned(),
5,
);
let resource_three = make_resource(
"scramblesuit".to_owned(),
"444.888.222.100".to_owned(),
3042,
"1A4C8BD902130470A1C77839954392BA979F7B46".to_owned(),
4,
);
let resource_four = make_resource(
"https".to_owned(),
"555.444.212.100".to_owned(),
8022,
"FF024DC302130470A1C77839954392BA979F7AE2".to_owned(),
3,
);
let resource_five = make_resource(
"https".to_owned(),
"234.111.212.100".to_owned(),
10432,
"7B4DE14CB2130470A1C77839954392BA979F7AE2".to_owned(),
1,
);
let mut test_vec: Vec<Resource> = Vec::new();
test_vec.push(resource_one);
test_vec.push(resource_two);
test_vec.push(resource_three);
test_vec.push(resource_four);
test_vec.push(resource_five);
let (functional, failing) = sort_for_parsing(test_vec);
assert!(
functional.len() == 2,
"There should be 2 functional bridges"
);
assert!(failing.len() == 3, "There should be 3 failing bridges");
}
}


@ -9,28 +9,24 @@ description = "Main Lox library with protocols and functions that make up Lox"
keywords = ["tor", "lox", "bridges"]
[dependencies]
curve25519-dalek = { package = "curve25519-dalek-ng", version = "3", default-features = false, features = ["serde", "std"] }
ed25519-dalek = { version = "1", features = ["serde"] }
# zkp = { version = "0.8", features = ["debug-transcript"] }
zkp = "0.8"
curve25519-dalek = { version = "4", default-features = false, features = ["serde", "rand_core", "digest"] }
ed25519-dalek = { version = "2", features = ["serde", "rand_core"] }
bincode = "1"
chrono = "0.4"
rand = "0.7"
serde = "1.0.188"
serde_with = {version = "3.3.0", features = ["json"]}
sha2 = "0.9"
rand = { version = "0.8", features = ["std_rng"]}
serde = "1.0.190"
serde_with = {version = "3.4.0", features = ["json"]}
sha2 = "0.10"
statistical = "1.0.0"
lazy_static = "1"
hex_fmt = "0.3"
aes-gcm = "0.8"
base64 = "0.13"
time = "0.3.28"
subtle = "2.4"
thiserror = "1.0.48"
aes-gcm = { version = "0.10", features =["aes"]}
base64 = "0.21"
time = "0.3.30"
prometheus = "0.13.3"
subtle = "2.5"
thiserror = "1.0.49"
zkp = { git = "https://gitlab.torproject.org/onyinyang/lox-zkp" }
[features]
default = ["u64_backend"]
u32_backend = ["curve25519-dalek/u32_backend"]
u64_backend = ["curve25519-dalek/u64_backend"]
simd_backend = ["curve25519-dalek/simd_backend"]
fast = []


@ -11,8 +11,9 @@ use super::cred;
use super::IssuerPrivKey;
use super::CMZ_B_TABLE;
use aes_gcm::aead;
use aes_gcm::aead::{generic_array::GenericArray, Aead, NewAead};
use aes_gcm::Aes128Gcm;
use aes_gcm::aead::{generic_array::GenericArray, Aead};
use aes_gcm::{Aes128Gcm, KeyInit};
use base64::{engine::general_purpose, Engine as _};
use curve25519_dalek::ristretto::CompressedRistretto;
use curve25519_dalek::ristretto::RistrettoBasepointTable;
use curve25519_dalek::scalar::Scalar;
@ -149,8 +150,12 @@ impl BridgeLine {
let date = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
let (optP, optQ) = if date > 0 {
(
CompressedRistretto::from_slice(&data[pos + 4..pos + 36]).decompress(),
CompressedRistretto::from_slice(&data[pos + 36..]).decompress(),
CompressedRistretto::from_slice(&data[pos + 4..pos + 36])
.expect("Unable to extract P from bucket")
.decompress(),
CompressedRistretto::from_slice(&data[pos + 36..])
.expect("Unable to extract Q from bucket")
.decompress(),
)
} else {
(None, None)
@ -170,6 +175,7 @@ impl BridgeLine {
(bridges, None)
}
}
/// Create a random BridgeLine for testing
#[cfg(test)]
pub fn random() -> Self {
@ -197,7 +203,7 @@ impl BridgeLine {
rng.fill_bytes(&mut cert);
let infostr: String = format!(
"obfs4 cert={}, iat-mode=0",
base64::encode_config(cert, base64::STANDARD_NO_PAD)
general_purpose::STANDARD_NO_PAD.encode(cert)
);
res.info[..infostr.len()].copy_from_slice(infostr.as_bytes());
res


@ -4,8 +4,7 @@
This implementation just keeps the table of seen ids in memory, but a
production one would of course use a disk-backed database. */
use std::cmp::Eq;
use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::Hash;
use serde::{Deserialize, Serialize};
@ -14,7 +13,7 @@ use serde::{Deserialize, Serialize};
/// seen ids. IdType will typically be Scalar.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct DupFilter<IdType: Hash + Eq + Copy + Serialize> {
seen_table: HashMap<IdType, ()>,
seen_table: HashSet<IdType>,
}
/// A return type indicating whether the item was fresh (not previously
@ -30,7 +29,7 @@ impl<IdType: Hash + Eq + Copy + Serialize> DupFilter<IdType> {
/// to the seen table. Return Seen if it is already in the table,
/// Fresh if not.
pub fn check(&self, id: &IdType) -> SeenType {
if self.seen_table.contains_key(id) {
if self.seen_table.contains(id) {
SeenType::Seen
} else {
SeenType::Fresh
@ -41,9 +40,9 @@ impl<IdType: Hash + Eq + Copy + Serialize> DupFilter<IdType> {
/// table, and add it if not. Return Fresh if it was not already
/// in the table, and Seen if it was.
pub fn filter(&mut self, id: &IdType) -> SeenType {
match self.seen_table.insert(*id, ()) {
None => SeenType::Fresh,
Some(()) => SeenType::Seen,
match self.seen_table.insert(*id) {
true => SeenType::Fresh,
false => SeenType::Seen,
}
}
}
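A minimal usage sketch of the set-backed filter (editorial; the import path is assumed):
```rust
use lox_library::dup_filter::{DupFilter, SeenType}; // assumed public path

fn main() {
    let mut filter = DupFilter::<u32>::default();
    assert!(matches!(filter.filter(&7), SeenType::Fresh)); // first sighting
    assert!(matches!(filter.filter(&7), SeenType::Seen)); // duplicate detected
    assert!(matches!(filter.check(&8), SeenType::Fresh)); // check() does not insert
}
```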


@ -22,6 +22,7 @@ pub mod cred;
pub mod dup_filter;
pub mod migration_table;
use chrono::Duration;
use chrono::{DateTime, Utc};
use sha2::Sha512;
@ -36,7 +37,7 @@ use rand::Rng;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use ed25519_dalek::{Keypair, PublicKey, Signature, SignatureError, Signer, Verifier};
use ed25519_dalek::{Signature, SignatureError, Signer, SigningKey, Verifier, VerifyingKey};
use subtle::ConstantTimeEq;
use std::collections::HashSet;
@ -57,7 +58,7 @@ lazy_static! {
pub static ref CMZ_B: RistrettoPoint = dalek_constants::RISTRETTO_BASEPOINT_POINT;
pub static ref CMZ_A_TABLE: RistrettoBasepointTable = RistrettoBasepointTable::create(&CMZ_A);
pub static ref CMZ_B_TABLE: RistrettoBasepointTable =
dalek_constants::RISTRETTO_BASEPOINT_TABLE;
dalek_constants::RISTRETTO_BASEPOINT_TABLE.clone();
}
// EXPIRY_DATE is set to EXPIRY_DATE days for open-entry and blocked buckets in order to match
@ -78,6 +79,12 @@ pub enum NoAvailableIDError {
ExhaustedIndexer,
}
#[derive(Error, Debug)]
pub enum ExceededMaxBridgesError {
#[error("The maximum number of bridges has already been distributed today, please try again tomorrow!")]
ExceededMaxBridges,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssuerPrivKey {
x0tilde: Scalar,
@ -123,20 +130,26 @@ impl IssuerPubKey {
}
}
// Number of times a given invitation is distributed
pub const OPENINV_K: u32 = 10;
// TODO: Decide on maximum daily number of invitations to be distributed
pub const MAX_DAILY_BRIDGES: u32 = 100;
/// The BridgeDb. This will typically be a singleton object. The
/// BridgeDb's role is simply to issue signed "open invitations" to
/// people who are not yet part of the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct BridgeDb {
/// The keypair for signing open invitations
keypair: Keypair,
keypair: SigningKey,
/// The public key for verifying open invitations
pub pubkey: PublicKey,
pub pubkey: VerifyingKey,
/// The set of open-invitation buckets
openinv_buckets: HashSet<u32>,
distributed_buckets: Vec<u32>,
current_k: u32,
#[serde(skip)]
today: DateTime<Utc>,
pub current_k: u32,
pub daily_bridges_distributed: u32,
}
/// An open invitation is a [u8; OPENINV_LENGTH] where the first 32
@ -152,14 +165,16 @@ impl BridgeDb {
/// Create the BridgeDb.
pub fn new() -> Self {
let mut csprng = OsRng {};
let keypair = Keypair::generate(&mut csprng);
let pubkey = keypair.public;
let keypair = SigningKey::generate(&mut csprng);
let pubkey = keypair.verifying_key();
Self {
keypair,
pubkey,
openinv_buckets: Default::default(),
distributed_buckets: Default::default(),
today: Utc::now(),
current_k: 0,
daily_bridges_distributed: 0,
}
}
@ -189,30 +204,39 @@ impl BridgeDb {
/// Produce an open invitation such that the next k users, where k is <
/// OPENINV_K, will receive the same open invitation bucket
/// chosen randomly from the set of open-invitation buckets.
pub fn invite(&mut self) -> [u8; OPENINV_LENGTH] {
pub fn invite(&mut self) -> Result<[u8; OPENINV_LENGTH], ExceededMaxBridgesError> {
let mut res: [u8; OPENINV_LENGTH] = [0; OPENINV_LENGTH];
let mut rng = rand::thread_rng();
// Choose a random invitation id (a Scalar) and serialize it
let id = Scalar::random(&mut rng);
res[0..32].copy_from_slice(&id.to_bytes());
let bucket_num: u32;
if self.current_k < OPENINV_K && !self.distributed_buckets.is_empty() {
bucket_num = *self.distributed_buckets.last().unwrap();
self.current_k += 1;
} else {
// Choose a random bucket number (from the set of open
// invitation buckets) and serialize it
let openinv_vec: Vec<&u32> = self.openinv_buckets.iter().collect();
bucket_num = *openinv_vec[rng.gen_range(0, openinv_vec.len())];
self.mark_distributed(bucket_num);
self.remove_openinv(&bucket_num);
self.current_k = 1;
if Utc::now() >= (self.today + Duration::days(1)) {
self.today = Utc::now();
self.daily_bridges_distributed = 0;
}
if self.daily_bridges_distributed < MAX_DAILY_BRIDGES {
if self.current_k < OPENINV_K && !self.distributed_buckets.is_empty() {
bucket_num = *self.distributed_buckets.last().unwrap();
self.current_k += 1;
} else {
// Choose a random bucket number (from the set of open
// invitation buckets) and serialize it
let openinv_vec: Vec<&u32> = self.openinv_buckets.iter().collect();
bucket_num = *openinv_vec[rng.gen_range(0..openinv_vec.len())];
self.mark_distributed(bucket_num);
self.remove_openinv(&bucket_num);
self.current_k = 1;
self.daily_bridges_distributed += 1;
}
res[32..(32 + 4)].copy_from_slice(&bucket_num.to_le_bytes());
// Sign the first 36 bytes and serialize it
let sig = self.keypair.sign(&res[0..(32 + 4)]);
res[(32 + 4)..].copy_from_slice(&sig.to_bytes());
Ok(res)
} else {
Err(ExceededMaxBridgesError::ExceededMaxBridges)
}
res[32..(32 + 4)].copy_from_slice(&bucket_num.to_le_bytes());
// Sign the first 36 bytes and serialize it
let sig = self.keypair.sign(&res[0..(32 + 4)]);
res[(32 + 4)..].copy_from_slice(&sig.to_bytes());
res
}
/// Verify an open invitation. Returns the invitation id and the
@ -221,7 +245,7 @@ impl BridgeDb {
/// before.
pub fn verify(
invitation: [u8; OPENINV_LENGTH],
pubkey: PublicKey,
pubkey: VerifyingKey,
) -> Result<(Scalar, u32), SignatureError> {
// Pull out the signature and verify it
let sig = Signature::try_from(&invitation[(32 + 4)..])?;
@ -229,11 +253,13 @@ impl BridgeDb {
// The signature passed. Pull out the bucket number and then
// the invitation id
let bucket = u32::from_le_bytes(invitation[32..(32 + 4)].try_into().unwrap());
match Scalar::from_canonical_bytes(invitation[0..32].try_into().unwrap()) {
let s = Scalar::from_canonical_bytes(invitation[0..32].try_into().unwrap());
if s.is_some().into() {
return Ok((s.unwrap(), bucket));
} else {
// It should never happen that there's a valid signature on
// an invalid serialization of a Scalar, but check anyway.
None => Err(SignatureError::new()),
Some(s) => Ok((s, bucket)),
return Err(SignatureError::new());
}
}
}
@ -269,7 +295,7 @@ pub struct BridgeAuth {
pub invitation_pub: IssuerPubKey,
/// The public key of the BridgeDb issuing open invitations
pub bridgedb_pub: PublicKey,
pub bridgedb_pub: VerifyingKey,
/// The bridge table
pub bridge_table: BridgeTable,
@ -294,7 +320,7 @@ pub struct BridgeAuth {
}
impl BridgeAuth {
pub fn new(bridgedb_pub: PublicKey) -> Self {
pub fn new(bridgedb_pub: VerifyingKey) -> Self {
// Create the private and public keys for each of the types of
// credential, each with the appropriate number of attributes
let lox_priv = IssuerPrivKey::new(6);
@ -330,6 +356,10 @@ impl BridgeAuth {
}
}
pub fn is_empty(&mut self) -> bool {
self.bridge_table.buckets.is_empty()
}
/// Insert a set of open invitation bridges.
///
/// Each of the bridges will be given its own open invitation
@ -376,19 +406,17 @@ impl BridgeAuth {
Ok(())
}
// TODO Ensure synchronization of Lox bridge_table with rdsys
pub fn sync_table(&mut self) {
// Create a hashtable (?) of bridges in the lox distributor from new resources
// accept the hashtable and recreate the bridge table from the hash table here
// using existing reachable bridges, other table checks and placements from existing bridge table
// If bridges are in reachable bridges, put them in the table with their Vec
// How to check for bridges that aren't there/are extra?
// After going through the update, make sure bridges in the table are the same and deal with discrepencies
// This will be the bad/annoying part
//also use open_inv_keys and blocked_keys from bridge_table to remove expired keys from table.
// make sure this happens before they are removed from the structures in the bridge table
pub fn find_and_remove_unaccounted_for_bridges(
&mut self,
accounted_for_bridges: Vec<u64>,
) -> Vec<BridgeLine> {
let mut unaccounted_for: Vec<BridgeLine> = Vec::new();
for (k, _v) in self.bridge_table.reachable.clone() {
if !accounted_for_bridges.contains(&k.uid_fingerprint) {
unaccounted_for.push(k);
}
}
unaccounted_for
}
pub fn allocate_bridges(
@ -428,16 +456,11 @@ impl BridgeAuth {
let reachable_bridges = self.bridge_table.reachable.clone();
for reachable_bridge in reachable_bridges {
if reachable_bridge.0.uid_fingerprint == bridge.uid_fingerprint {
println!(
"Bridge from table: {:?} has same IP and Port as bridge {:?}!",
reachable_bridge.0, bridge
);
// Now we must remove the old bridge from the table and insert the new bridge in its place
// i.e., in the same bucket and with the same permissions.
let positions = self.bridge_table.reachable.get(&reachable_bridge.0);
if let Some(v) = positions {
for (bucketnum, offset) in v.iter() {
println!("Bucket num: {:?} and offset: {:?}", bucketnum, offset);
let mut bridgelines = match self.bridge_table.buckets.get(bucketnum) {
Some(bridgelines) => *bridgelines,
None => return res,
@ -445,11 +468,9 @@ impl BridgeAuth {
assert!(bridgelines[*offset] == reachable_bridge.0);
bridgelines[*offset] = *bridge;
self.bridge_table.buckets.insert(*bucketnum, bridgelines);
let bridgelines = match self.bridge_table.buckets.get(bucketnum) {
Some(bridgelines) => *bridgelines,
None => return res,
};
assert!(bridgelines[*offset] != reachable_bridge.0);
if self.bridge_table.buckets.get(bucketnum).is_none() {
return res;
}
}
res = true;
} else {
@ -776,6 +797,11 @@ impl BridgeAuth {
.iter()
.partition(|&x| x.1 + EXPIRY_DATE < self.today());
for item in expired {
// We should check that the items were actually distributed before they are removed
if !bdb.distributed_buckets.contains(&item.0) {
// TODO: Add prometheus metric for this?
println!("This bucket was not actually distributed!");
}
let new_item = item.0;
bdb.remove_blocked_or_expired_buckets(&new_item);
// Remove any trust upgrade migrations from this

View File

@ -16,8 +16,8 @@ use curve25519_dalek::scalar::Scalar;
use sha2::Digest;
use sha2::Sha256;
use aes_gcm::aead::{generic_array::GenericArray, Aead, NewAead};
use aes_gcm::Aes128Gcm;
use aes_gcm::aead::{generic_array::GenericArray, Aead};
use aes_gcm::{Aes128Gcm, KeyInit};
use rand::RngCore;
use std::collections::HashMap;
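
The import change tracks aes-gcm 0.10, where the `NewAead` trait was retired and key construction moved to the `KeyInit` trait. A minimal sketch of decryption under the new API; the function and its arguments are illustrative.

```
use aes_gcm::aead::{generic_array::GenericArray, Aead};
use aes_gcm::{Aes128Gcm, KeyInit};

// Sketch: build an AES-128-GCM cipher with the 0.10 API and decrypt
// one ciphertext; `new` now comes from `KeyInit` instead of `NewAead`.
fn decrypt_example(key: &[u8; 16], nonce: &[u8; 12], ct: &[u8]) -> Option<Vec<u8>> {
    let cipher = Aes128Gcm::new(GenericArray::from_slice(key));
    cipher.decrypt(GenericArray::from_slice(nonce), ct).ok()
}
```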
@ -249,8 +249,12 @@ pub fn decrypt_cred(
let mut to_bucket_bytes: [u8; 32] = [0; 32];
to_bucket_bytes.copy_from_slice(&plaintextbytes[..32]);
let to_bucket = Scalar::from_bytes_mod_order(to_bucket_bytes);
let P = CompressedRistretto::from_slice(&plaintextbytes[32..64]).decompress()?;
let Q = CompressedRistretto::from_slice(&plaintextbytes[64..]).decompress()?;
let P = CompressedRistretto::from_slice(&plaintextbytes[32..64])
.expect("Unable to extract P from bucket")
.decompress()?;
let Q = CompressedRistretto::from_slice(&plaintextbytes[64..])
.expect("Unable to extract Q from bucket")
.decompress()?;
Some(Migration {
P,

View File

@ -26,9 +26,7 @@ and a new Lox credential to be issued:
- trust_level: revealed to be 2 less than the trust_level above
- level_since: today
- invites_remaining: revealed to be LEVEL_INVITATIONS for the new trust
level [Actually, there's a bug in the zkp crate that's triggered when
a public value is 0 (the identity element of the Ristretto group), so
we treat this field as blinded, but the _server_ encrypts the value.]
level
- blockages: blinded, but proved in ZK that it's one more than the
blockages above
@ -104,11 +102,9 @@ pub struct Response {
// The fields for the new Lox credential
P: RistrettoPoint,
EncQ: (RistrettoPoint, RistrettoPoint),
EncInvRemain: (RistrettoPoint, RistrettoPoint),
id_server: Scalar,
TId: RistrettoPoint,
TBucket: RistrettoPoint,
TInvRemain: RistrettoPoint,
TBlockages: RistrettoPoint,
// The ZKP
@ -155,11 +151,10 @@ define_proof! {
blindissue,
"Blockage Migration Blind Issuing",
(x0, x0tilde, xid, xbucket, xlevel, xsince, xinvremain, xblockages,
s, b, tid, tbucket, tinvremain, tblockages),
s, b, tid, tbucket, tblockages),
(P, EncQ0, EncQ1, X0, Xid, Xbucket, Xlevel, Xsince, Xinvremain,
Xblockages, Plevel, Psince, TId, TBucket, TInvRemain, TBlockages,
D, EncId0, EncId1, EncBucket0, EncBucket1, EncInvRemain0,
EncInvRemain1, EncBlockages0, EncBlockages1),
Xblockages, Plevel, Psince, Pinvremain, TId, TBucket, TBlockages,
D, EncId0, EncId1, EncBucket0, EncBucket1, EncBlockages0, EncBlockages1),
(A, B):
Xid = (xid*A),
Xlevel = (xlevel*A),
@ -173,14 +168,12 @@ define_proof! {
TId = (tid*A),
TBucket = (b*Xbucket),
TBucket = (tbucket*A),
TInvRemain = (b*Xinvremain),
TInvRemain = (tinvremain*A),
TBlockages = (b*Xblockages),
TBlockages = (tblockages*A),
EncQ0 = (s*B + tid*EncId0 + tbucket*EncBucket0
+ tinvremain*EncInvRemain0 + tblockages*EncBlockages0),
+ tblockages*EncBlockages0),
EncQ1 = (s*D + tid*EncId1 + tbucket*EncBucket1
+ tinvremain*EncInvRemain1 + tblockages*EncBlockages1
+ tblockages*EncBlockages1
+ x0*P + xlevel*Plevel + xsince*Psince)
}
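
For readers unfamiliar with the (lox-)zkp `define_proof!` macro being edited here: it takes a generated module name, a transcript label, the secret scalars, the public points, the common parameters, and then the linear statements to prove. A toy statement in the same shape, with names that are illustrative and unrelated to the Lox protocol:

```
use zkp::define_proof;

define_proof! {
    dlog_demo,             // generated module name
    "Discrete Log Demo",   // domain-separation label
    (x),                   // secret scalars
    (X),                   // public points
    (B):                   // common parameters
    X = (x * B)            // statement: X is x times basepoint B
}
```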
@ -288,7 +281,7 @@ pub fn request(
&migration_cred.to_bucket * Btable + ebucket * D,
);
let eblockages = Scalar::random(&mut rng);
let new_blockages = lox_cred.blockages + Scalar::one();
let new_blockages = lox_cred.blockages + Scalar::ONE;
let EncBlockages = (
&eblockages * Btable,
&new_blockages * Btable + eblockages * D,
@ -484,14 +477,6 @@ impl BridgeAuth {
// invitations for moving from level i to level i+1)
let invremain: Scalar = LEVEL_INVITATIONS[(level - 3) as usize].into();
// Because of the bug in the zkp crate, encrypt the invites
// remaining instead of sending it in the clear
let sinvremain = Scalar::random(&mut rng);
let EncInvRemain = (
&sinvremain * Btable,
&invremain * Btable + sinvremain * req.D,
);
// Compute the MAC on the visible attributes
let b = Scalar::random(&mut rng);
let P = &b * Btable;
@ -512,9 +497,6 @@ impl BridgeAuth {
let tbucket = self.lox_priv.x[2] * b;
let TBucket = &tbucket * Atable;
let EncQBucket = (tbucket * req.EncBucket.0, tbucket * req.EncBucket.1);
let tinvremain = self.lox_priv.x[5] * b;
let TInvRemain = &tinvremain * Atable;
let EncQInvRemain = (tinvremain * EncInvRemain.0, tinvremain * EncInvRemain.1);
let tblockages = self.lox_priv.x[6] * b;
let TBlockages = &tblockages * Atable;
let EncQBlockages = (
@ -523,8 +505,8 @@ impl BridgeAuth {
);
let EncQ = (
EncQHc.0 + EncQId.0 + EncQBucket.0 + EncQInvRemain.0 + EncQBlockages.0,
EncQHc.1 + EncQId.1 + EncQBucket.1 + EncQInvRemain.1 + EncQBlockages.1,
EncQHc.0 + EncQId.0 + EncQBucket.0 + EncQBlockages.0,
EncQHc.1 + EncQId.1 + EncQBucket.1 + EncQBlockages.1,
);
let mut transcript = Transcript::new(b"blockage migration issuing");
@ -545,17 +527,15 @@ impl BridgeAuth {
Xblockages: &self.lox_pub.X[6],
Plevel: &(trust_level * P),
Psince: &(level_since * P),
Pinvremain: &(invremain * P),
TId: &TId,
TBucket: &TBucket,
TInvRemain: &TInvRemain,
TBlockages: &TBlockages,
D: &req.D,
EncId0: &EncId.0,
EncId1: &EncId.1,
EncBucket0: &req.EncBucket.0,
EncBucket1: &req.EncBucket.1,
EncInvRemain0: &EncInvRemain.0,
EncInvRemain1: &EncInvRemain.1,
EncBlockages0: &req.EncBlockages.0,
EncBlockages1: &req.EncBlockages.1,
x0: &self.lox_priv.x[0],
@ -570,7 +550,6 @@ impl BridgeAuth {
b: &b,
tid: &tid,
tbucket: &tbucket,
tinvremain: &tinvremain,
tblockages: &tblockages,
},
)
@ -580,11 +559,9 @@ impl BridgeAuth {
level_since,
P,
EncQ,
EncInvRemain,
id_server,
TId,
TBucket,
TInvRemain,
TBlockages,
piBlindIssue,
})
@ -601,7 +578,6 @@ pub fn handle_response(
let A: &RistrettoPoint = &CMZ_A;
let B: &RistrettoPoint = &CMZ_B;
let Btable: &RistrettoBasepointTable = &CMZ_B_TABLE;
if resp.P.is_identity() {
return Err(ProofError::VerificationFailure);
}
@ -627,13 +603,6 @@ pub fn handle_response(
// moving from level i to level i+1)
let invremain: Scalar = LEVEL_INVITATIONS[(new_level - 1) as usize].into();
// Decrypt EncInvRemain
let recv_invremain = resp.EncInvRemain.1 - (state.d * resp.EncInvRemain.0);
if recv_invremain != &invremain * Btable {
return Err(ProofError::VerificationFailure);
}
// Verify the proof
let mut transcript = Transcript::new(b"blockage migration issuing");
blindissue::verify_compact(
@ -654,17 +623,15 @@ pub fn handle_response(
Xblockages: &lox_pub.X[6].compress(),
Plevel: &(state.trust_level * resp.P).compress(),
Psince: &(resp.level_since * resp.P).compress(),
Pinvremain: &(invremain * resp.P).compress(),
TId: &resp.TId.compress(),
TBucket: &resp.TBucket.compress(),
TInvRemain: &resp.TInvRemain.compress(),
TBlockages: &resp.TBlockages.compress(),
D: &state.D.compress(),
EncId0: &EncId.0.compress(),
EncId1: &EncId.1.compress(),
EncBucket0: &state.EncBucket.0.compress(),
EncBucket1: &state.EncBucket.1.compress(),
EncInvRemain0: &resp.EncInvRemain.0.compress(),
EncInvRemain1: &resp.EncInvRemain.1.compress(),
EncBlockages0: &state.EncBlockages.0.compress(),
EncBlockages1: &state.EncBlockages.1.compress(),
},
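
The deleted `EncInvRemain` handling above was a lifted-ElGamal round trip: the server encrypted `invremain` to the client's key `D = d*B`, and the client decrypted and compared. A sketch of that check, with illustrative names, for reference:

```
use curve25519_dalek::ristretto::RistrettoPoint;
use curve25519_dalek::scalar::Scalar;

// Lifted ElGamal: (C0, C1) = (e*B, m*B + e*D) with D = d*B. The holder
// of d recovers m*B as C1 - d*C0 and checks it against the expected
// value (here, invremain * B).
fn decrypts_to(
    c: (RistrettoPoint, RistrettoPoint),
    d: Scalar,
    expected: RistrettoPoint,
) -> bool {
    c.1 - d * c.0 == expected
}
```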

View File

@ -269,7 +269,7 @@ pub fn request(
// Ensure the credential can be correctly shown: it must be the case
// that invites_remaining not be 0
if lox_cred.invites_remaining == Scalar::zero() {
if lox_cred.invites_remaining == Scalar::ZERO {
return Err(ProofError::VerificationFailure);
}
// The buckets in the Lox and Bucket Reachability credentials have
@ -286,7 +286,7 @@ pub fn request(
return Err(ProofError::VerificationFailure);
}
// The new invites_remaining
let new_invites_remaining = lox_cred.invites_remaining - Scalar::one();
let new_invites_remaining = lox_cred.invites_remaining - Scalar::ONE;
// Blind showing the Lox credential

View File

@ -168,7 +168,7 @@ pub fn request(
// This protocol only allows migrating from trust level 0 to trust
// level 1
if lox_cred.trust_level != Scalar::zero() {
if lox_cred.trust_level != Scalar::ZERO {
return Err(ProofError::VerificationFailure);
}
@ -323,7 +323,7 @@ impl BridgeAuth {
}
// We only currently support migrating from trust level 0
if req.trust_level != Scalar::zero() {
if req.trust_level != Scalar::ZERO {
return Err(ProofError::VerificationFailure);
}
@ -387,7 +387,7 @@ impl BridgeAuth {
// Create the trust_level attribute (Scalar), which will be
// level 1
let trust_level: Scalar = Scalar::one();
let trust_level: Scalar = Scalar::ONE;
// Create the level_since attribute (Scalar), which is today's
// Julian date
@ -513,7 +513,7 @@ pub fn handle_response(
Xlevel: &lox_pub.X[3].compress(),
Xsince: &lox_pub.X[4].compress(),
// The new trust level is 1
Plevel: &(Scalar::one() * resp.P).compress(),
Plevel: &(Scalar::ONE * resp.P).compress(),
Psince: &(resp.level_since * resp.P).compress(),
TId: &resp.TId.compress(),
TBucket: &resp.TBucket.compress(),
@ -533,9 +533,9 @@ pub fn handle_response(
Q,
id,
bucket: state.to_bucket,
trust_level: Scalar::one(),
trust_level: Scalar::ONE,
level_since: resp.level_since,
invites_remaining: Scalar::zero(),
blockages: Scalar::zero(),
invites_remaining: Scalar::ZERO,
blockages: Scalar::ZERO,
})
}
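
These `Scalar::ZERO`/`Scalar::ONE` hunks track the curve25519-dalek 4.x change that replaced the `zero()`/`one()` constructors with associated constants. A one-line sketch of a fresh level-1 credential's numeric attributes, as constructed above:

```
use curve25519_dalek::scalar::Scalar;

// curve25519-dalek 4.x: Scalar::zero()/Scalar::one() are now the
// associated constants Scalar::ZERO/Scalar::ONE.
fn fresh_attributes() -> (Scalar, Scalar, Scalar) {
    // (trust_level, invites_remaining, blockages) for a new level-1 holder
    (Scalar::ONE, Scalar::ZERO, Scalar::ZERO)
}
```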

View File

@ -324,10 +324,10 @@ pub fn handle_response(
Q,
id,
bucket: resp.bucket,
trust_level: Scalar::zero(),
trust_level: Scalar::ZERO,
level_since: resp.level_since,
invites_remaining: Scalar::zero(),
blockages: Scalar::zero(),
invites_remaining: Scalar::ZERO,
blockages: Scalar::ZERO,
},
resp.bridge_line,
))

View File

@ -458,7 +458,7 @@ impl BridgeAuth {
let EncId = (req.EncIdClient.0, req.EncIdClient.1 + &id_server * Btable);
// The trust level for invitees is always 1
let level = Scalar::one();
let level = Scalar::ONE;
// The invites remaining for invitees is always 0 (as
// appropriate for trust level 1), so we don't need to actually
@ -611,9 +611,9 @@ pub fn handle_response(
Q,
id,
bucket: state.bucket,
trust_level: Scalar::one(),
trust_level: Scalar::ONE,
level_since: resp.level_since,
invites_remaining: Scalar::zero(),
invites_remaining: Scalar::ZERO,
blockages: state.blockages,
})
}

View File

@ -64,7 +64,7 @@ impl TestHarness {
fn open_invite(&mut self) -> (PerfStat, (cred::Lox, bridge_table::BridgeLine)) {
// Issue an open invitation
let inv = self.bdb.invite();
let inv = self.bdb.invite().unwrap();
let req_start = Instant::now();
// Use it to get a Lox credential
@ -394,17 +394,21 @@ fn test_open_invite() {
assert!(bridgeline == bucket.0[0]);
}
#[test]
fn test_k_invites() {
let mut th = TestHarness::new();
for i in 0..25 {
let _ = th.open_invite();
if (i+1) % OPENINV_K != 0 {
assert!(th.bdb.current_k == (i+1)%OPENINV_K, "the current_k should be (i+1)%OPENINV_K");
if (i + 1) % OPENINV_K != 0 {
assert!(
th.bdb.current_k == (i + 1) % OPENINV_K,
"the current_k should be (i+1)%OPENINV_K"
);
} else {
assert!(th.bdb.current_k == OPENINV_K, "the current_k should be OPENINV_K");
assert!(
th.bdb.current_k == OPENINV_K,
"the current_k should be OPENINV_K"
);
}
}
}
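
A small sketch of the counter behaviour this test pins down, assuming (as the assertions imply) that `current_k` cycles from 1 through OPENINV_K and resets after each full batch of open invitations; the constant's value here is illustrative.

```
const OPENINV_K: u32 = 10; // illustrative; the real constant lives in lox-library

struct InviteCounter {
    current_k: u32, // starts at 0 before any invite is issued
}

impl InviteCounter {
    fn record_invite(&mut self) {
        if self.current_k == OPENINV_K {
            self.current_k = 1; // start a new batch
        } else {
            self.current_k += 1;
        }
    }
}
```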
@ -871,7 +875,7 @@ fn block_bridges(th: &mut TestHarness, to_block: usize) {
let mut rng = rand::thread_rng();
while block_index.len() < to_block {
let rand_num = rng.gen_range(1, blockable_range);
let rand_num = rng.gen_range(1..blockable_range);
if !th.bdb.openinv_buckets.contains(&(rand_num as u32))
&& !th.bdb.distributed_buckets.contains(&(rand_num as u32))
&& !block_index.contains(&rand_num)
@ -1041,7 +1045,7 @@ fn test_bridge_replace() {
let table_size = th.ba.bridge_table.buckets.len();
let mut num = 100000;
while !th.ba.bridge_table.buckets.contains_key(&num) {
num = rand::thread_rng().gen_range(0, th.ba.bridge_table.counter);
num = rand::thread_rng().gen_range(0..th.ba.bridge_table.counter);
}
let replaceable_bucket = *th.ba.bridge_table.buckets.get(&num).unwrap();
let replacement_bridge = &replaceable_bucket[0];
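
Both changes in this file are the rand 0.8 migration: `gen_range` now takes a single `Range` argument instead of two endpoints. A minimal before/after sketch:

```
use rand::Rng;

fn pick_bucket(counter: u32) -> u32 {
    // rand 0.7: rng.gen_range(0, counter)
    // rand 0.8: rng.gen_range(0..counter)
    rand::thread_rng().gen_range(0..counter)
}
```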

View File

@ -10,7 +10,7 @@ fn test_bridgedb() {
for i in &[1u32, 5, 7, 12, 19, 20, 22] {
bdb.insert_openinv(*i);
}
let inv = bdb.invite();
let inv = bdb.invite().unwrap();
println!("{:?}", inv);
let res = BridgeDb::verify(inv, bdb.pubkey);
println!("{:?}", res);

View File

@ -14,8 +14,8 @@ repository = "https://gitlab.torproject.org/tpo/anti-censorship/lox.git/"
[dependencies]
lox-library = {path = "../lox-library", version = "0.1.0"}
serde = "1"
serde_json = "1.0.105"
serde_with = "3.3.0"
serde_json = "1.0.108"
serde_with = "3.4.0"
[features]

View File

@ -10,19 +10,20 @@ license = "MIT"
crate-type = ["cdylib"]
[dependencies]
getrandom = { version = "0.2", features = ["js"] }
julianday = "1.2.0"
lazy_static = "1.4.0"
lox-library = { path = "../lox-library", version = "0.1.0" }
lox_utils = { path = "../lox-utils", version = "0.1.0" }
wasm-bindgen = "0.2"
time = "0.3.28"
serde_json = "1.0.105"
time = "0.3.30"
serde_json = "1.0.108"
console_error_panic_hook = "0.1.7"
js-sys = "0.3.64"
js-sys = "0.3.65"
rand = { version = "0.7", features = ["wasm-bindgen"] }
zkp = "0.8.0"
zkp = { git = "https://gitlab.torproject.org/onyinyang/lox-zkp" }
[dependencies.chrono]
version = "0.4.27"
version = "0.4.31"
features = ["serde", "wasmbind"]

View File

@ -13,10 +13,10 @@ serde = { version = "1", features = ["derive"]}
bytes = "1"
hex = "0.4.3"
crc64 = "2.0.0"
sha1 = "0.10.5"
tokio = { version = "1", features = ["macros"]}
reqwest = { version = "0.11", features = ["stream"]}
sha1 = "0.10.6"
tokio = { version = "1", features = ["full", "macros"] }
reqwest = { version = "0.11", features = ["json", "stream"]}
tokio-stream = "0.1.14"
futures = "0.3.28"
tokio-util = "0.7.8"
chrono = { version = "0.4.27", features = ["serde", "clock"] }
futures = "0.3.29"
tokio-util = "0.7.10"
chrono = { version = "0.4.31", features = ["serde", "clock"] }

View File

@ -6,7 +6,7 @@
use bytes::{self, Buf, Bytes};
use core::pin::Pin;
use futures_util::{Stream, StreamExt};
use reqwest::Client;
use reqwest::{Client, StatusCode};
use std::io::{self, BufRead};
use std::task::{ready, Context, Poll};
use tokio::sync::mpsc;
@ -19,6 +19,7 @@ pub enum Error {
Reqwest(reqwest::Error),
Io(io::Error),
JSON(serde_json::Error),
String(StatusCode),
}
impl From<serde_json::Error> for Error {
@ -220,7 +221,7 @@ mod tests {
/// }
/// }
/// ```
///
pub async fn start_stream(
api_endpoint: String,
name: String,
@ -260,3 +261,39 @@ pub async fn start_stream(
});
Ok(ResourceStream::new(rx))
}
pub async fn request_resources(
    api_endpoint: String,
    name: String,
    token: String,
    resource_types: Vec<String>,
) -> Result<proto::ResourceState, Error> {
    let req = proto::ResourceRequest {
        request_origin: name,
        resource_types,
    };
    let json = serde_json::to_string(&req)?;
    let auth_value = format!("Bearer {}", token);
    let client = Client::new();
    // Propagate request failures instead of panicking on them
    let response = client
        .get(api_endpoint)
        .header("Authorization", &auth_value)
        .body(json)
        .send()
        .await
        .map_err(Error::Reqwest)?;
    match response.status() {
        reqwest::StatusCode::OK => response
            .json::<proto::ResourceState>()
            .await
            .map_err(Error::Reqwest),
        other => Err(Error::String(other)),
    }
}
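
A hedged usage sketch for the new one-shot fetch; the endpoint, distributor name, token, and resource types are placeholders.

```
#[tokio::main]
async fn main() {
    match request_resources(
        "http://rdsys.example/resources".to_string(),
        "lox".to_string(),
        "TokenPlaceholder".to_string(),
        vec!["obfs4".to_string()],
    )
    .await
    {
        Ok(state) => {
            let working = state.working.map_or(0, |w| w.len());
            println!("fetched {} working resources", working);
        }
        Err(_) => eprintln!("rdsys resource request failed"),
    }
}
```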

View File

@ -10,12 +10,17 @@ pub struct ResourceRequest {
pub resource_types: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct TestResults {
pub last_passed: DateTime<Utc>,
}
/// Representation of a bridge resource
#[derive(Deserialize, PartialEq, Eq, Debug)]
pub struct Resource {
pub r#type: String,
pub blocked_in: HashMap<String, bool>,
pub last_passed: DateTime<Utc>,
pub test_result: TestResults,
pub protocol: String,
pub address: String,
pub port: u16,
@ -51,6 +56,13 @@ impl Resource {
}
}
/// A ResourceState holds the current sets of working and not-working resources reported by rdsys
#[derive(Deserialize, PartialEq, Eq, Debug)]
pub struct ResourceState {
pub working: Option<Vec<Resource>>,
pub not_working: Option<Vec<Resource>>,
}
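
A small sketch of deserializing the new struct, in the style of the tests below; the JSON body is illustrative.

```
#[test]
fn deserialize_resource_state_sketch() {
    let body = r#"{ "working": [], "not_working": null }"#;
    let state: ResourceState = serde_json::from_str(body).unwrap();
    assert!(state.working.unwrap().is_empty());
    assert!(state.not_working.is_none());
}
```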
/// A ResourceDiff holds information about new, changed, or pruned resources
#[derive(Deserialize, PartialEq, Eq, Debug)]
pub struct ResourceDiff {
@ -92,7 +104,9 @@ mod tests {
let bridge = Resource {
r#type: String::from("scramblesuit"),
blocked_in: HashMap::new(),
last_passed: "2023-05-30T14:20:28Z".parse::<DateTime<Utc>>().unwrap(),
test_result: TestResults {
last_passed: "2023-05-30T14:20:28Z".parse::<DateTime<Utc>>().unwrap(),
},
protocol: String::from("tcp"),
address: String::from("216.117.3.62"),
port: 63174,
@ -107,7 +121,9 @@ mod tests {
{
"type": "scramblesuit",
"blocked_in": {},
"last_passed": "2023-05-30T14:20:28.000+00:00",
"test_result" : {
"last_passed": "2023-05-30T14:20:28.000+00:00"
},
"protocol": "tcp",
"address": "216.117.3.62",
"port": 63174,
@ -135,7 +151,9 @@ mod tests {
{
"type": "obfs2",
"blocked_in": {},
"last_passed": "2023-05-30T11:42:28.000+07:00",
"test_result" : {
"last_passed": "2023-05-30T11:42:28.000+07:00"
},
"Location": null,
"protocol": "tcp",
"address": "176.247.216.207",
@ -153,7 +171,9 @@ mod tests {
{
"type": "obfs2",
"blocked_in": {},
"last_passed": "2023-05-30T12:20:28.000+07:00",
"test_result" : {
"last_passed": "2023-05-30T12:20:28.000+07:00"
},
"protocol": "tcp",
"address": "133.69.16.145",
"port": 58314,
@ -172,7 +192,9 @@ mod tests {
{
"type": "scramblesuit",
"blocked_in": {},
"last_passed": "2023-05-30T14:20:28.000+07:00",
"test_result" : {
"last_passed": "2023-05-30T14:20:28.000+07:00"
},
"protocol": "tcp",
"address": "216.117.3.62",
"port": 63174,