diff --git a/crates/rdsys-backend-api/.gitignore b/crates/rdsys-backend-api/.gitignore
new file mode 100644
index 0000000..4fffb2f
--- /dev/null
+++ b/crates/rdsys-backend-api/.gitignore
@@ -0,0 +1,2 @@
+/target
+/Cargo.lock
diff --git a/crates/rdsys-backend-api/Cargo.toml b/crates/rdsys-backend-api/Cargo.toml
new file mode 100644
index 0000000..7e8d187
--- /dev/null
+++ b/crates/rdsys-backend-api/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "rdsys_backend"
+authors = ["Cecylia Bocovich "]
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde_json = "1"
+futures-util = { version = "0.3"}
+serde = { version = "1", features = ["derive"]}
+bytes = "1"
+hex = "0.4.3"
+crc64 = "2.0.0"
+sha1 = "0.10.5"
+tokio = { version = "1", features = ["macros", "rt", "sync"]}
+reqwest = { version = "0.11", features = ["stream"]}
+tokio-stream = "0.1.12"
+futures = "0.3.27"
+tokio-util = "0.7.7"
diff --git a/crates/rdsys-backend-api/README.md b/crates/rdsys-backend-api/README.md
new file mode 100644
index 0000000..2cfa114
--- /dev/null
+++ b/crates/rdsys-backend-api/README.md
@@ -0,0 +1,20 @@
+# rdsys backend API
+
+### Usage
+
+```rust
+use futures_util::StreamExt;
+use rdsys_backend::start_stream;
+
+#[tokio::main]
+async fn main() {
+    let endpoint = String::from("http://127.0.0.1:7100/resource-stream");
+    let name = String::from("https");
+    let token = String::from("HttpsApiTokenPlaceholder");
+    let types = vec![String::from("obfs2"), String::from("scramblesuit")];
+    let mut rx = start_stream(endpoint, name, token, types).await.unwrap();
+    while let Some(diff) = rx.next().await {
+        println!("Received diff: {:?}", diff);
+    }
+}
+```
diff --git a/crates/rdsys-backend-api/src/lib.rs b/crates/rdsys-backend-api/src/lib.rs
new file mode 100644
index 0000000..ae8c7db
--- /dev/null
+++ b/crates/rdsys-backend-api/src/lib.rs
@@ -0,0 +1,272 @@
+//! # Rdsys Backend Distributor API
+//!
+//! `rdsys_backend` is an implementation of the rdsys backend API
+//! https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/doc/backend-api.md
+
+use bytes::{self, Buf, Bytes};
+use core::pin::Pin;
+use futures_util::{Stream, StreamExt};
+use reqwest::Client;
+use std::io::{self, BufRead};
+use std::task::{ready, Context, Poll};
+use tokio::sync::mpsc;
+use tokio_util::sync::ReusableBoxFuture;
+
+pub mod proto;
+
+/// Errors that can occur while requesting or decoding resources
+#[derive(Debug)]
+pub enum Error {
+    /// The HTTP request failed
+    Reqwest(reqwest::Error),
+    /// Reading from a received chunk failed
+    Io(io::Error),
+    /// Serializing the request or deserializing a diff failed
+    JSON(serde_json::Error),
+}
+
+impl From<serde_json::Error> for Error {
+    fn from(value: serde_json::Error) -> Self {
+        Self::JSON(value)
+    }
+}
+
+impl From<reqwest::Error> for Error {
+    fn from(value: reqwest::Error) -> Self {
+        Self::Reqwest(value)
+    }
+}
+
+impl From<io::Error> for Error {
+    fn from(value: io::Error) -> Self {
+        Self::Io(value)
+    }
+}
+
+/// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes
+/// received from the connection to the rdsys backend
+pub struct ResourceStream {
+    /// In-flight receive on the chunk channel; hands the receiver back when it resolves
+    inner: ReusableBoxFuture<'static, (Option<Bytes>, mpsc::Receiver<Bytes>)>,
+    /// Bytes of the diff currently being assembled (no terminating `\r` seen yet)
+    buf: Vec<u8>,
+    /// Unread remainder of the chunk that completed the previously returned diff
+    partial: Option<bytes::buf::Reader<Bytes>>,
+}
+
+impl ResourceStream {
+    /// Wraps the receiving end of a channel of raw response chunks
+    pub fn new(rx: mpsc::Receiver<Bytes>) -> ResourceStream {
+        ResourceStream {
+            inner: ReusableBoxFuture::new(make_future(rx)),
+            buf: vec![],
+            partial: None,
+        }
+    }
+}
+
+/// Waits for the next chunk and returns it along with the receiver, so that the
+/// receiver can be moved into the next future
+async fn make_future(mut rx: mpsc::Receiver<Bytes>) -> (Option<Bytes>, mpsc::Receiver<Bytes>) {
+    let result = rx.recv().await;
+    (result, rx)
+}
+
+impl Stream for ResourceStream {
+    type Item = proto::ResourceDiff;
+
+    /// Yields the next `\r`-terminated ResourceDiff. The stream ends when the sending
+    /// side of the channel is closed, or when a chunk cannot be read or parsed.
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Reads from `buffer` into `buf` up to and including the next `\r`. Gives back
+        // the parsed diff if the delimiter was found, or None if `buffer` ran out first
+        // (what was read so far stays in `buf`).
+        let parse = |buffer: &mut bytes::buf::Reader<Bytes>,
+                     buf: &mut Vec<u8>|
+         -> Result<Option<proto::ResourceDiff>, Error> {
+            buffer.read_until(b'\r', buf)?;
+            if buf.last() != Some(&b'\r') {
+                return Ok(None);
+            }
+            buf.pop();
+            let diff: Result<proto::ResourceDiff, _> = serde_json::from_slice(buf);
+            // The bytes of this diff are used up whether or not they parsed
+            buf.clear();
+            Ok(Some(diff?))
+        };
+        // `ResourceStream` is `Unpin`, so it can be reborrowed as a plain mutable reference,
+        // which lets `partial` and `buf` be borrowed mutably at the same time
+        let this = &mut *self;
+        // First use up what is left of the chunk that produced the previous diff
+        if let Some(p) = &mut this.partial {
+            match parse(p, &mut this.buf) {
+                Ok(Some(diff)) => return Poll::Ready(Some(diff)),
+                Ok(None) => this.partial = None,
+                Err(_) => return Poll::Ready(None),
+            }
+        }
+        loop {
+            let (result, rx) = ready!(this.inner.poll(cx));
+            this.inner.set(make_future(rx));
+            match result {
+                Some(chunk) => {
+                    let mut buffer = chunk.reader();
+                    match parse(&mut buffer, &mut this.buf) {
+                        Ok(Some(diff)) => {
+                            this.partial = Some(buffer);
+                            return Poll::Ready(Some(diff));
+                        }
+                        Ok(None) => continue,
+                        Err(_) => return Poll::Ready(None),
+                    }
+                }
+                None => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn parse_resource() {
+        let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
+        let chunk = Bytes::from_static(
+            b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r",
+        );
+        let (tx, rx) = mpsc::channel(100);
+        tx.send(chunk).await.unwrap();
+        let mut diffs = ResourceStream::new(rx);
+        let res = Pin::new(&mut diffs).poll_next(&mut cx);
+        assert_ne!(res, Poll::Ready(None));
+        assert_ne!(res, Poll::Pending);
+        if let Poll::Ready(Some(diff)) = res {
+            assert_eq!(diff.new, None);
+            assert_eq!(diff.full_update, true);
+        }
+    }
+
+    #[tokio::test]
+    async fn parse_across_chunks() {
+        let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
+        let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
+        let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r");
+        let (tx, rx) = mpsc::channel(100);
+        tx.send(chunk1).await.unwrap();
+        tx.send(chunk2).await.unwrap();
+        let mut diffs = ResourceStream::new(rx);
+        let mut res = Pin::new(&mut diffs).poll_next(&mut cx);
+        while let Poll::Pending = res {
+            res = Pin::new(&mut diffs).poll_next(&mut cx);
+        }
+        assert_ne!(res, Poll::Ready(None));
+        assert_ne!(res, Poll::Pending);
+        if let Poll::Ready(Some(diff)) = res {
+            assert_eq!(diff.new, None);
+            assert_eq!(diff.full_update, true);
+        }
+    }
+
+    #[tokio::test]
+    async fn parse_multi_diff_partial_chunks() {
+        let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref());
+        let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
+        let chunk2 =
+            Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed");
+        let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}");
+        let chunk4 = Bytes::from_static(b"\r");
+        let (tx, rx) = mpsc::channel(100);
+        tx.send(chunk1).await.unwrap();
+        tx.send(chunk2).await.unwrap();
+        tx.send(chunk3).await.unwrap();
+        tx.send(chunk4).await.unwrap();
+        let mut diffs = ResourceStream::new(rx);
+        let mut res = Pin::new(&mut diffs).poll_next(&mut cx);
+        while let Poll::Pending = res {
+            res = Pin::new(&mut diffs).poll_next(&mut cx);
+        }
+        assert_ne!(res, Poll::Ready(None));
+        assert_ne!(res, Poll::Pending);
+        if let Poll::Ready(Some(diff)) = res {
+            assert_eq!(diff.new, None);
+            assert_eq!(diff.full_update, true);
+        }
+        res = Pin::new(&mut diffs).poll_next(&mut cx);
+        while let Poll::Pending = res {
+            res = Pin::new(&mut diffs).poll_next(&mut cx);
+        }
+        assert_ne!(res, Poll::Ready(None));
+        assert_ne!(res, Poll::Pending);
+        if let Poll::Ready(Some(diff)) = res {
+            assert_eq!(diff.new, None);
+            assert_eq!(diff.full_update, true);
+        }
+    }
+}
+
+/// Makes an http connection to the rdsys backend api endpoint and returns a ResourceStream
+/// if successful
+///
+/// The response body is forwarded to the stream by a spawned task, which stops when the
+/// response ends or fails, or when the returned ResourceStream is dropped.
+///
+/// # Errors
+///
+/// Returns `Error::JSON` if the request body cannot be serialized and `Error::Reqwest` if
+/// sending the request fails.
+///
+/// # Examples
+///
+/// ```ignore
+/// use futures_util::StreamExt;
+/// use rdsys_backend::start_stream;
+///
+/// let endpoint = String::from("http://127.0.0.1:7100/resource-stream");
+/// let name = String::from("https");
+/// let token = String::from("HttpsApiTokenPlaceholder");
+/// let types = vec![String::from("obfs2"), String::from("scramblesuit")];
+/// let mut stream = start_stream(endpoint, name, token, types).await.unwrap();
+/// while let Some(diff) = stream.next().await {
+///     println!("Received diff: {:?}", diff);
+/// }
+/// ```
+pub async fn start_stream(
+    api_endpoint: String,
+    name: String,
+    token: String,
+    resource_types: Vec<String>,
+) -> Result<ResourceStream, Error> {
+    let (tx, rx) = mpsc::channel(100);
+
+    let req = proto::ResourceRequest {
+        request_origin: name,
+        resource_types,
+    };
+    let json = serde_json::to_string(&req)?;
+
+    let auth_value = format!("Bearer {}", token);
+
+    let client = Client::new();
+
+    let mut stream = client
+        .get(api_endpoint)
+        .header("Authorization", &auth_value)
+        .body(json)
+        .send()
+        .await?
+        .bytes_stream();
+
+    tokio::spawn(async move {
+        // Forward chunks until the response ends or fails
+        while let Some(Ok(bytes)) = stream.next().await {
+            // The receiver is closed once the ResourceStream is dropped; stop quietly
+            // rather than panicking inside the task
+            if tx.send(bytes).await.is_err() {
+                break;
+            }
+        }
+    });
+    Ok(ResourceStream::new(rx))
+}
diff --git a/crates/rdsys-backend-api/src/proto.rs b/crates/rdsys-backend-api/src/proto.rs
new file mode 100644
index 0000000..9ecbaec
--- /dev/null
+++ b/crates/rdsys-backend-api/src/proto.rs
@@ -0,0 +1,233 @@
+use serde::{Deserialize, Serialize};
+use sha1::{Digest, Sha1};
+use std::collections::HashMap;
+
+/// The body of the request for resources made to the rdsys backend
+#[derive(Serialize)]
+pub struct ResourceRequest {
+    pub request_origin: String,
+    pub resource_types: Vec<String>,
+}
+
+/// Representation of a bridge resource
+#[derive(Deserialize, PartialEq, Eq, Debug)]
+pub struct Resource {
+    pub r#type: String,
+    // NOTE(review): the tests only cover an empty map here — confirm the value type
+    pub blocked_in: HashMap<String, bool>,
+    pub protocol: String,
+    pub address: String,
+    pub port: u16,
+    pub fingerprint: String,
+    // NOTE(review): the tests only cover null here — confirm the element type
+    #[serde(rename = "or-addresses")]
+    pub or_addresses: Option<Vec<String>>,
+    pub distribution: String,
+    pub flags: Option<HashMap<String, bool>>,
+    pub params: Option<HashMap<String, String>>,
+}
+
+impl Resource {
+    /// get_uid creates a unique identifier of the resource from a hash of the fingerprint
+    /// and bridge type. A similar process is used in rdsys
+    /// https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/pkg/usecases/resources/bridges.go#L99
+    /// however, the golang and rust implementations of crc64 lead to different hash values.
+    /// The polynomial used for rust's crc64 package is: https://docs.rs/crc64/2.0.0/src/crc64/lib.rs.html#8
+    /// using "Jones" coefficients. Changing go's polynomial to match rust's still doesn't make the hashes the same.
+    /// We use the get_uid in this case for an identifier in the distributor so as long as it is unique, it doesn't
+    /// strictly need to match the value in rdsys' backend.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the fingerprint is not a valid hex string.
+    pub fn get_uid(&self) -> Result<u64, hex::FromHexError> {
+        let hex_fingerprint = hex::decode(&self.fingerprint)?;
+
+        let mut hasher = Sha1::new();
+        hasher.update(hex_fingerprint);
+        let result_fingerprint = hasher.finalize();
+        let uid_string = self.r#type.clone() + &hex::encode_upper(result_fingerprint);
+
+        Ok(crc64::crc64(0, uid_string.as_bytes()))
+    }
+}
+
+/// A ResourceDiff holds information about new, changed, or pruned resources
+#[derive(Deserialize, PartialEq, Eq, Debug)]
+pub struct ResourceDiff {
+    pub new: Option<HashMap<String, Vec<Resource>>>,
+    pub changed: Option<HashMap<String, Vec<Resource>>>,
+    pub gone: Option<HashMap<String, Vec<Resource>>>,
+    pub full_update: bool,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn serialize_resource_request() {
+        let req = ResourceRequest {
+            request_origin: String::from("https"),
+            resource_types: vec![String::from("obfs2"), String::from("scramblesuit")],
+        };
+        let json = serde_json::to_string(&req).unwrap();
+        assert_eq!(
+            json,
+            "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}"
+        )
+    }
+
+    #[test]
+    fn deserialize_resource() {
+        let mut flags = HashMap::new();
+        flags.insert(String::from("fast"), true);
+        flags.insert(String::from("stable"), true);
+        let mut params = HashMap::new();
+        params.insert(
+            String::from("password"),
+            String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"),
+        );
+        let bridge = Resource {
+            r#type: String::from("scramblesuit"),
+            blocked_in: HashMap::new(),
+            protocol: String::from("tcp"),
+            address: String::from("216.117.3.62"),
+            port: 63174,
+            fingerprint: String::from("BE84A97D02130470A1C77839954392BA979F7EE1"),
+            or_addresses: None,
+            distribution: String::from("https"),
+            flags: Some(flags),
+            params: Some(params),
+        };
+
+        let data = r#"
+            {
+                "type": "scramblesuit",
+                "blocked_in": {},
+                "protocol": "tcp",
+                "address": "216.117.3.62",
+                "port": 63174,
+                "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1",
+                "or-addresses": null,
+                "distribution": "https",
+                "flags": {
+                    "fast": true,
+                    "stable": true
+                },
+                "params": {
+                    "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
+                }
+            }"#;
+        let res: Resource = serde_json::from_str(data).unwrap();
+        assert_eq!(bridge, res);
+    }
+
+    #[test]
+    fn deserialize_resource_diff() {
+        let data = r#"
+            {
+                "new": {
+                    "obfs2": [
+                        {
+                            "type": "obfs2",
+                            "blocked_in": {},
+                            "Location": null,
+                            "protocol": "tcp",
+                            "address": "176.247.216.207",
+                            "port": 42810,
+                            "fingerprint": "10282810115283F99ADE5CFE42D49644F45D715D",
+                            "or-addresses": null,
+                            "distribution": "https",
+                            "flags": {
+                                "fast": true,
+                                "stable": true,
+                                "running": true,
+                                "valid": true
+                            }
+                        },
+                        {
+                            "type": "obfs2",
+                            "blocked_in": {},
+                            "protocol": "tcp",
+                            "address": "133.69.16.145",
+                            "port": 58314,
+                            "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1",
+                            "or-addresses": null,
+                            "distribution": "https",
+                            "flags": {
+                                "fast": true,
+                                "stable": true,
+                                "running": true,
+                                "valid": true
+                            }
+                        }
+                    ],
+                    "scramblesuit": [
+                        {
+                            "type": "scramblesuit",
+                            "blocked_in": {},
+                            "protocol": "tcp",
+                            "address": "216.117.3.62",
+                            "port": 63174,
+                            "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1",
+                            "or-addresses": null,
+                            "distribution": "https",
+                            "flags": {
+                                "fast": true,
+                                "stable": true,
+                                "running": true,
+                                "valid": true
+                            },
+                            "params": {
+                                "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
+                            }
+                        }
+                    ]
+                },
+                "changed": null,
+                "gone": null,
+                "full_update": true
+            }"#;
+        let diff: ResourceDiff = serde_json::from_str(data).unwrap();
+        assert_ne!(diff.new, None);
+        assert_eq!(diff.changed, None);
+        assert_eq!(diff.gone, None);
+        assert_eq!(diff.full_update, true);
+        if let Some(new) = diff.new {
+            assert_eq!(new["obfs2"][0].r#type, "obfs2");
+        }
+    }
+
+    #[test]
+    fn deserialize_empty_resource_diff() {
+        let data = r#"
+            {
+                "new": null,
+                "changed": null,
+                "gone": null,
+                "full_update": true
+            }"#;
+        let diff: ResourceDiff = serde_json::from_str(data).unwrap();
+        let empty_diff = ResourceDiff {
+            new: None,
+            changed: None,
+            gone: None,
+            full_update: true,
+        };
+        assert_eq!(diff, empty_diff);
+    }
+
+    #[test]
+    fn deserialize_empty_condensed_diff() {
+        let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}";
+        let diff: ResourceDiff = serde_json::from_str(data).unwrap();
+        let empty_diff = ResourceDiff {
+            new: None,
+            changed: None,
+            gone: None,
+            full_update: true,
+        };
+        assert_eq!(diff, empty_diff);
+    }
+}