From c837a131218bbe5acf5cdc11e6e97a515d9ccbd4 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Fri, 20 Jan 2023 16:01:03 -0500 Subject: [PATCH 01/16] Initial commit --- crates/rdsys-backend/.gitignore | 2 + crates/rdsys-backend/Cargo.toml | 10 ++ crates/rdsys-backend/src/lib.rs | 157 ++++++++++++++++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 crates/rdsys-backend/.gitignore create mode 100644 crates/rdsys-backend/Cargo.toml create mode 100644 crates/rdsys-backend/src/lib.rs diff --git a/crates/rdsys-backend/.gitignore b/crates/rdsys-backend/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/crates/rdsys-backend/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml new file mode 100644 index 0000000..14adce3 --- /dev/null +++ b/crates/rdsys-backend/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rdsys-backend" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde_json = "1" +serde = { version = "1", features = ["derive"]} diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs new file mode 100644 index 0000000..8900381 --- /dev/null +++ b/crates/rdsys-backend/src/lib.rs @@ -0,0 +1,157 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize)] +pub struct ResourceRequest { + request_origin: String, + resource_types: Vec, +} + +#[derive(Deserialize,PartialEq,Debug)] +pub struct Resource { + r#type: String, + blocked_in: HashMap, + protocol: String, + address: String, + port: u16, + fingerprint: String, + #[serde(rename="or-addresses")] + or_addresses: Option>, + distribution: String, + flags: Option>, + params: Option>, +} + +#[derive(Deserialize)] +pub struct ResourceDiff { + new: Option>>, + changed: Option>>, + gone: Option>>, + full_update: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serialize_resource_request() { + let req = ResourceRequest { + request_origin: String::from("https"), + resource_types: vec![String::from("obfs2"), String::from("scramblesuit")], + }; + let json = serde_json::to_string(&req).unwrap(); + assert_eq!(json, "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}") + } + + #[test] + fn deserialize_resource() { + let mut flags = HashMap::new(); + flags.insert(String::from("fast"), true); + flags.insert(String::from("stable"), true); + let mut params = HashMap::new(); + params.insert(String::from("password"), String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567")); + let bridge = Resource { + r#type: String::from("scramblesuit"), + blocked_in: HashMap::new(), + protocol: String::from("tcp"), + address: String::from("216.117.3.62"), + port: 63174, + fingerprint: String::from("BE84A97D02130470A1C77839954392BA979F7EE1"), + or_addresses: None, + distribution: String::from("https"), + flags: Some(flags), + params: Some(params), + }; + + let data = r#" + { + "type": "scramblesuit", + "blocked_in": {}, + "protocol": "tcp", + "address": "216.117.3.62", + "port": 63174, + "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true + }, + "params": { + "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" + } + }"#; + let res: Resource = serde_json::from_str(data).unwrap(); + assert_eq!(bridge, res); + } + + #[test] + fn deserialize_resource_diff() { + let data = r#" + { + "new": { + "obfs2": [ + { + "type": "obfs2", + "blocked_in": {}, + "protocol": "tcp", + "address": "176.247.216.207", + "port": 42810, + "fingerprint": "10282810115283F99ADE5CFE42D49644F45D715D", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true, + "running": true, + "valid": true + } + }, + { + "type": "obfs2", + "blocked_in": {}, + "protocol": "tcp", + "address": "133.69.16.145", + "port": 58314, + "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true, + "running": true, + "valid": true + } + } + ], + "scramblesuit": [ + { + "type": "scramblesuit", + "blocked_in": {}, + "protocol": "tcp", + "address": "216.117.3.62", + "port": 63174, + "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true, + "running": true, + "valid": true + }, + "params": { + "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" + } + } + ] + }, + "changed": null, + "gone": null, + "full_update": true + }"#; + let _diff: ResourceDiff = serde_json::from_str(data).unwrap(); + } +} From d5c98510db5a5b4d41c917cc05954da41e94a304 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Fri, 20 Jan 2023 16:12:29 -0500 Subject: [PATCH 02/16] Run `cargo fmt` --- crates/rdsys-backend/src/lib.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 8900381..459eb46 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -8,7 +8,7 @@ pub struct ResourceRequest { resource_types: Vec, } -#[derive(Deserialize,PartialEq,Debug)] +#[derive(Deserialize, PartialEq, Debug)] pub struct Resource { r#type: String, blocked_in: HashMap, @@ -16,18 +16,18 @@ pub struct Resource { address: String, port: u16, fingerprint: String, - #[serde(rename="or-addresses")] + #[serde(rename = "or-addresses")] or_addresses: Option>, distribution: String, - flags: Option>, - params: Option>, + flags: Option>, + params: Option>, } #[derive(Deserialize)] pub struct ResourceDiff { - new: Option>>, - changed: Option>>, - gone: Option>>, + new: Option>>, + changed: Option>>, + gone: Option>>, full_update: bool, } @@ -42,7 +42,10 @@ mod tests { resource_types: vec![String::from("obfs2"), String::from("scramblesuit")], }; let json = serde_json::to_string(&req).unwrap(); - assert_eq!(json, "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}") + assert_eq!( + json, + "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}" + ) } #[test] @@ -51,7 +54,10 @@ mod tests { flags.insert(String::from("fast"), true); flags.insert(String::from("stable"), true); let mut params = HashMap::new(); - params.insert(String::from("password"), String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567")); + params.insert( + String::from("password"), + String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"), + ); let bridge = Resource { r#type: String::from("scramblesuit"), blocked_in: HashMap::new(), From 64247c1ceb04de65996c33a90791c4a337c5e7bf Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 23 Jan 2023 12:58:41 -0500 Subject: [PATCH 03/16] Add more tests for resource serialization --- crates/rdsys-backend/src/lib.rs | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 459eb46..1887bea 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -23,7 +23,7 @@ pub struct Resource { params: Option>, } -#[derive(Deserialize)] +#[derive(Deserialize, PartialEq, Debug)] pub struct ResourceDiff { new: Option>>, changed: Option>>, @@ -102,6 +102,7 @@ mod tests { { "type": "obfs2", "blocked_in": {}, + "Location": null, "protocol": "tcp", "address": "176.247.216.207", "port": 42810, @@ -158,6 +159,32 @@ mod tests { "gone": null, "full_update": true }"#; - let _diff: ResourceDiff = serde_json::from_str(data).unwrap(); + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + assert_ne!(diff.new, None); + assert_eq!(diff.changed, None); + assert_eq!(diff.gone, None); + assert_eq!(diff.full_update, true); + if let Some(new) = diff.new { + assert_eq!(new["obfs2"][0].r#type, "obfs2"); + } + } + + #[test] + fn deserialize_empty_resource_diff() { + let data = r#" + { + "new": null, + "changed": null, + "gone": null, + "full_update": true + }"#; + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + let empty_diff = ResourceDiff { + new: None, + changed: None, + gone: None, + full_update: true, + }; + assert_eq!(diff, empty_diff); } } From c89aa4a9f9b24c698c69228abed3f8b89293bf28 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Sun, 29 Jan 2023 14:18:39 -0500 Subject: [PATCH 04/16] Add function to start a resource-stream connection At the moment the start_stream function very simply prints out each response as it arrives. --- crates/rdsys-backend/Cargo.toml | 5 ++- crates/rdsys-backend/src/http.rs | 52 ++++++++++++++++++++++++++++++++ crates/rdsys-backend/src/lib.rs | 2 ++ 3 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 crates/rdsys-backend/src/http.rs diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index 14adce3..43e6861 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "rdsys-backend" +name = "rdsys_backend" version = "0.1.0" edition = "2021" @@ -7,4 +7,7 @@ edition = "2021" [dependencies] serde_json = "1" +futures-util = { version = "0.3"} serde = { version = "1", features = ["derive"]} + +reqwest = { version = "0.11", features = ["stream"]} diff --git a/crates/rdsys-backend/src/http.rs b/crates/rdsys-backend/src/http.rs new file mode 100644 index 0000000..b03d742 --- /dev/null +++ b/crates/rdsys-backend/src/http.rs @@ -0,0 +1,52 @@ +use reqwest::Client; +use futures_util::StreamExt; +use std::io; + +#[derive(Debug)] +pub enum Error { + Reqwest (reqwest::Error), + Io (io::Error), + JSON (serde_json::Error), +} + +impl From for Error { + fn from(value: serde_json::Error) -> Self { + Self::JSON(value) + } +} + +impl From for Error { + fn from(value: reqwest::Error) -> Self { + Self::Reqwest(value) + } +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +pub async fn start_stream(api_endpoint: String, name: String, token: String, resource_types: Vec ) -> Result<(), Error> { + + let req = crate::ResourceRequest { + request_origin: name, + resource_types: resource_types, + }; + let json = serde_json::to_string(&req)?; + + let auth_value = format!("Bearer {}",token); + + let client = Client::new(); + let mut stream = client.get(api_endpoint) + .header("Authorization", &auth_value) + .body(json) + .send() + .await? + .bytes_stream(); + + while let Some(chunk) = stream.next().await { + println!("Chunk: {:?}", chunk?); + }; + Ok(()) +} diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 1887bea..2192df7 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; +pub mod http; + #[derive(Serialize)] pub struct ResourceRequest { request_origin: String, From 9d448b00b49cc777d0d9c6e42b1e5db4648919a4 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 10:22:32 -0500 Subject: [PATCH 05/16] Return an iterator over ResourceDiff items Rather than returning a Receiver, instead return a ResourceStream struct that implements the Iterator trait so callers can simply iterate over the ResourceDiff items received. --- crates/rdsys-backend/Cargo.toml | 2 + crates/rdsys-backend/src/http.rs | 175 ++++++++++++++++++++++++++++--- crates/rdsys-backend/src/lib.rs | 13 +++ 3 files changed, 178 insertions(+), 12 deletions(-) diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index 43e6861..df6fbc6 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -9,5 +9,7 @@ edition = "2021" serde_json = "1" futures-util = { version = "0.3"} serde = { version = "1", features = ["derive"]} +bytes = "1" +tokio = "1" reqwest = { version = "0.11", features = ["stream"]} diff --git a/crates/rdsys-backend/src/http.rs b/crates/rdsys-backend/src/http.rs index b03d742..90971e4 100644 --- a/crates/rdsys-backend/src/http.rs +++ b/crates/rdsys-backend/src/http.rs @@ -1,12 +1,15 @@ -use reqwest::Client; +use bytes::{self, Buf, Bytes}; use futures_util::StreamExt; -use std::io; +use reqwest::Client; +use std::io::{self, BufRead}; +use std::sync::mpsc; +use tokio; #[derive(Debug)] pub enum Error { - Reqwest (reqwest::Error), - Io (io::Error), - JSON (serde_json::Error), + Reqwest(reqwest::Error), + Io(io::Error), + JSON(serde_json::Error), } impl From for Error { @@ -27,7 +30,141 @@ impl From for Error { } } -pub async fn start_stream(api_endpoint: String, name: String, token: String, resource_types: Vec ) -> Result<(), Error> { +pub struct ResourceStream { + rx: mpsc::Receiver, + buf: Vec, + partial: Option>, +} + +impl Iterator for ResourceStream { + type Item = crate::ResourceDiff; + fn next(&mut self) -> Option { + let mut parse = + |buffer: &mut bytes::buf::Reader| -> Result, Error> { + match buffer.read_until(b'\r', &mut self.buf) { + Ok(_) => match self.buf.pop() { + Some(b'\r') => match serde_json::from_slice(&self.buf) { + Ok(diff) => { + self.buf.clear(); + return Ok(Some(diff)); + } + Err(e) => return Err(Error::JSON(e)), + }, + Some(n) => { + self.buf.push(n); + return Ok(None); + } + None => return Ok(None), + }, + Err(e) => Err(Error::Io(e)), + } + }; + + if let Some(p) = &mut self.partial { + match parse(p) { + Ok(Some(diff)) => return Some(diff), + Ok(None) => self.partial = None, + Err(_) => return None, + } + } + for chunk in &self.rx { + let mut buffer = chunk.reader(); + match parse(&mut buffer) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Some(diff); + } + Ok(None) => continue, + Err(_) => return None, + }; + } + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_resource() { + let chunk = Bytes::from_static( + b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", + ); + let (tx, rx) = mpsc::channel(); + tx.send(chunk).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], + }; + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + } + + #[test] + fn parse_across_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], + }; + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + } + + #[test] + fn parse_multi_diff_partial_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = + Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); + let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); + let chunk4 = Bytes::from_static(b"\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + tx.send(chunk3).unwrap(); + tx.send(chunk4).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], + }; + let mut res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + } +} + +pub async fn start_stream( + api_endpoint: String, + name: String, + token: String, + resource_types: Vec, +) -> Result { + let (tx, rx) = mpsc::channel(); let req = crate::ResourceRequest { request_origin: name, @@ -35,18 +172,32 @@ pub async fn start_stream(api_endpoint: String, name: String, token: String, res }; let json = serde_json::to_string(&req)?; - let auth_value = format!("Bearer {}",token); + let auth_value = format!("Bearer {}", token); let client = Client::new(); - let mut stream = client.get(api_endpoint) + + let mut stream = client + .get(api_endpoint) .header("Authorization", &auth_value) .body(json) .send() .await? .bytes_stream(); - while let Some(chunk) = stream.next().await { - println!("Chunk: {:?}", chunk?); - }; - Ok(()) + tokio::spawn(async move { + while let Some(chunk) = stream.next().await { + let bytes = match chunk { + Ok(b) => b, + Err(_e) => { + return; + } + }; + tx.send(bytes).unwrap(); + } + }); + Ok(ResourceStream { + rx: rx, + buf: vec![], + partial: None, + }) } diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 2192df7..2c63af2 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -189,4 +189,17 @@ mod tests { }; assert_eq!(diff, empty_diff); } + + #[test] + fn deserialize_empty_condensed_diff() { + let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}"; + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + let empty_diff = ResourceDiff { + new: None, + changed: None, + gone: None, + full_update: true, + }; + assert_eq!(diff, empty_diff); + } } From 04e7caef3137702185b5101430b286f7d1ec99ae Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 15:08:30 -0500 Subject: [PATCH 06/16] Refactor modules --- crates/rdsys-backend/src/http.rs | 203 ------------------ crates/rdsys-backend/src/lib.rs | 358 +++++++++++++++---------------- 2 files changed, 179 insertions(+), 382 deletions(-) delete mode 100644 crates/rdsys-backend/src/http.rs diff --git a/crates/rdsys-backend/src/http.rs b/crates/rdsys-backend/src/http.rs deleted file mode 100644 index 90971e4..0000000 --- a/crates/rdsys-backend/src/http.rs +++ /dev/null @@ -1,203 +0,0 @@ -use bytes::{self, Buf, Bytes}; -use futures_util::StreamExt; -use reqwest::Client; -use std::io::{self, BufRead}; -use std::sync::mpsc; -use tokio; - -#[derive(Debug)] -pub enum Error { - Reqwest(reqwest::Error), - Io(io::Error), - JSON(serde_json::Error), -} - -impl From for Error { - fn from(value: serde_json::Error) -> Self { - Self::JSON(value) - } -} - -impl From for Error { - fn from(value: reqwest::Error) -> Self { - Self::Reqwest(value) - } -} - -impl From for Error { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} - -pub struct ResourceStream { - rx: mpsc::Receiver, - buf: Vec, - partial: Option>, -} - -impl Iterator for ResourceStream { - type Item = crate::ResourceDiff; - fn next(&mut self) -> Option { - let mut parse = - |buffer: &mut bytes::buf::Reader| -> Result, Error> { - match buffer.read_until(b'\r', &mut self.buf) { - Ok(_) => match self.buf.pop() { - Some(b'\r') => match serde_json::from_slice(&self.buf) { - Ok(diff) => { - self.buf.clear(); - return Ok(Some(diff)); - } - Err(e) => return Err(Error::JSON(e)), - }, - Some(n) => { - self.buf.push(n); - return Ok(None); - } - None => return Ok(None), - }, - Err(e) => Err(Error::Io(e)), - } - }; - - if let Some(p) = &mut self.partial { - match parse(p) { - Ok(Some(diff)) => return Some(diff), - Ok(None) => self.partial = None, - Err(_) => return None, - } - } - for chunk in &self.rx { - let mut buffer = chunk.reader(); - match parse(&mut buffer) { - Ok(Some(diff)) => { - self.partial = Some(buffer); - return Some(diff); - } - Ok(None) => continue, - Err(_) => return None, - }; - } - None - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parse_resource() { - let chunk = Bytes::from_static( - b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", - ); - let (tx, rx) = mpsc::channel(); - tx.send(chunk).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - } - - #[test] - fn parse_across_chunks() { - let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); - let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); - let (tx, rx) = mpsc::channel(); - tx.send(chunk1).unwrap(); - tx.send(chunk2).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - } - - #[test] - fn parse_multi_diff_partial_chunks() { - let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); - let chunk2 = - Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); - let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); - let chunk4 = Bytes::from_static(b"\r"); - let (tx, rx) = mpsc::channel(); - tx.send(chunk1).unwrap(); - tx.send(chunk2).unwrap(); - tx.send(chunk3).unwrap(); - tx.send(chunk4).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let mut res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - } -} - -pub async fn start_stream( - api_endpoint: String, - name: String, - token: String, - resource_types: Vec, -) -> Result { - let (tx, rx) = mpsc::channel(); - - let req = crate::ResourceRequest { - request_origin: name, - resource_types: resource_types, - }; - let json = serde_json::to_string(&req)?; - - let auth_value = format!("Bearer {}", token); - - let client = Client::new(); - - let mut stream = client - .get(api_endpoint) - .header("Authorization", &auth_value) - .body(json) - .send() - .await? - .bytes_stream(); - - tokio::spawn(async move { - while let Some(chunk) = stream.next().await { - let bytes = match chunk { - Ok(b) => b, - Err(_e) => { - return; - } - }; - tx.send(bytes).unwrap(); - } - }); - Ok(ResourceStream { - rx: rx, - buf: vec![], - partial: None, - }) -} diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 2c63af2..7b23e51 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -1,36 +1,87 @@ -use std::collections::HashMap; +use bytes::{self, Buf, Bytes}; +use futures_util::StreamExt; +use reqwest::Client; +use std::io::{self, BufRead}; +use std::sync::mpsc; +use tokio; -use serde::{Deserialize, Serialize}; +pub mod proto; -pub mod http; - -#[derive(Serialize)] -pub struct ResourceRequest { - request_origin: String, - resource_types: Vec, +#[derive(Debug)] +pub enum Error { + Reqwest(reqwest::Error), + Io(io::Error), + JSON(serde_json::Error), } -#[derive(Deserialize, PartialEq, Debug)] -pub struct Resource { - r#type: String, - blocked_in: HashMap, - protocol: String, - address: String, - port: u16, - fingerprint: String, - #[serde(rename = "or-addresses")] - or_addresses: Option>, - distribution: String, - flags: Option>, - params: Option>, +impl From for Error { + fn from(value: serde_json::Error) -> Self { + Self::JSON(value) + } } -#[derive(Deserialize, PartialEq, Debug)] -pub struct ResourceDiff { - new: Option>>, - changed: Option>>, - gone: Option>>, - full_update: bool, +impl From for Error { + fn from(value: reqwest::Error) -> Self { + Self::Reqwest(value) + } +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +pub struct ResourceStream { + rx: mpsc::Receiver, + buf: Vec, + partial: Option>, +} + +impl Iterator for ResourceStream { + type Item = proto::ResourceDiff; + fn next(&mut self) -> Option { + let mut parse = + |buffer: &mut bytes::buf::Reader| -> Result, Error> { + match buffer.read_until(b'\r', &mut self.buf) { + Ok(_) => match self.buf.pop() { + Some(b'\r') => match serde_json::from_slice(&self.buf) { + Ok(diff) => { + self.buf.clear(); + return Ok(Some(diff)); + } + Err(e) => return Err(Error::JSON(e)), + }, + Some(n) => { + self.buf.push(n); + return Ok(None); + } + None => return Ok(None), + }, + Err(e) => Err(Error::Io(e)), + } + }; + + if let Some(p) = &mut self.partial { + match parse(p) { + Ok(Some(diff)) => return Some(diff), + Ok(None) => self.partial = None, + Err(_) => return None, + } + } + for chunk in &self.rx { + let mut buffer = chunk.reader(); + match parse(&mut buffer) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Some(diff); + } + Ok(None) => continue, + Err(_) => return None, + }; + } + None + } } #[cfg(test)] @@ -38,168 +89,117 @@ mod tests { use super::*; #[test] - fn serialize_resource_request() { - let req = ResourceRequest { - request_origin: String::from("https"), - resource_types: vec![String::from("obfs2"), String::from("scramblesuit")], - }; - let json = serde_json::to_string(&req).unwrap(); - assert_eq!( - json, - "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}" - ) - } - - #[test] - fn deserialize_resource() { - let mut flags = HashMap::new(); - flags.insert(String::from("fast"), true); - flags.insert(String::from("stable"), true); - let mut params = HashMap::new(); - params.insert( - String::from("password"), - String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"), + fn parse_resource() { + let chunk = Bytes::from_static( + b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", ); - let bridge = Resource { - r#type: String::from("scramblesuit"), - blocked_in: HashMap::new(), - protocol: String::from("tcp"), - address: String::from("216.117.3.62"), - port: 63174, - fingerprint: String::from("BE84A97D02130470A1C77839954392BA979F7EE1"), - or_addresses: None, - distribution: String::from("https"), - flags: Some(flags), - params: Some(params), + let (tx, rx) = mpsc::channel(); + tx.send(chunk).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], }; - - let data = r#" - { - "type": "scramblesuit", - "blocked_in": {}, - "protocol": "tcp", - "address": "216.117.3.62", - "port": 63174, - "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true - }, - "params": { - "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" - } - }"#; - let res: Resource = serde_json::from_str(data).unwrap(); - assert_eq!(bridge, res); - } - - #[test] - fn deserialize_resource_diff() { - let data = r#" - { - "new": { - "obfs2": [ - { - "type": "obfs2", - "blocked_in": {}, - "Location": null, - "protocol": "tcp", - "address": "176.247.216.207", - "port": 42810, - "fingerprint": "10282810115283F99ADE5CFE42D49644F45D715D", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true, - "running": true, - "valid": true - } - }, - { - "type": "obfs2", - "blocked_in": {}, - "protocol": "tcp", - "address": "133.69.16.145", - "port": 58314, - "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true, - "running": true, - "valid": true - } - } - ], - "scramblesuit": [ - { - "type": "scramblesuit", - "blocked_in": {}, - "protocol": "tcp", - "address": "216.117.3.62", - "port": 63174, - "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true, - "running": true, - "valid": true - }, - "params": { - "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" - } - } - ] - }, - "changed": null, - "gone": null, - "full_update": true - }"#; - let diff: ResourceDiff = serde_json::from_str(data).unwrap(); - assert_ne!(diff.new, None); - assert_eq!(diff.changed, None); - assert_eq!(diff.gone, None); - assert_eq!(diff.full_update, true); - if let Some(new) = diff.new { - assert_eq!(new["obfs2"][0].r#type, "obfs2"); + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); } } #[test] - fn deserialize_empty_resource_diff() { - let data = r#" - { - "new": null, - "changed": null, - "gone": null, - "full_update": true - }"#; - let diff: ResourceDiff = serde_json::from_str(data).unwrap(); - let empty_diff = ResourceDiff { - new: None, - changed: None, - gone: None, - full_update: true, + fn parse_across_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], }; - assert_eq!(diff, empty_diff); + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } } #[test] - fn deserialize_empty_condensed_diff() { - let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}"; - let diff: ResourceDiff = serde_json::from_str(data).unwrap(); - let empty_diff = ResourceDiff { - new: None, - changed: None, - gone: None, - full_update: true, + fn parse_multi_diff_partial_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = + Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); + let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); + let chunk4 = Bytes::from_static(b"\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + tx.send(chunk3).unwrap(); + tx.send(chunk4).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], }; - assert_eq!(diff, empty_diff); + let mut res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } } } + +pub async fn start_stream( + api_endpoint: String, + name: String, + token: String, + resource_types: Vec, +) -> Result { + let (tx, rx) = mpsc::channel(); + + let req = proto::ResourceRequest { + request_origin: name, + resource_types: resource_types, + }; + let json = serde_json::to_string(&req)?; + + let auth_value = format!("Bearer {}", token); + + let client = Client::new(); + + let mut stream = client + .get(api_endpoint) + .header("Authorization", &auth_value) + .body(json) + .send() + .await? + .bytes_stream(); + + tokio::spawn(async move { + while let Some(chunk) = stream.next().await { + let bytes = match chunk { + Ok(b) => b, + Err(_e) => { + return; + } + }; + tx.send(bytes).unwrap(); + } + }); + Ok(ResourceStream { + rx: rx, + buf: vec![], + partial: None, + }) +} From 685541176be6d5d8ebc4bdf3811d2f1e02dc79fb Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 15:23:01 -0500 Subject: [PATCH 07/16] Add documentation --- crates/rdsys-backend/src/lib.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 7b23e51..6ef21a6 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -1,3 +1,8 @@ +//! # Rdsys Backend Distributor API +//! +//! `rdsys_backend` is an implementation of the rdsys backend API +//! https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/doc/backend-api.md + use bytes::{self, Buf, Bytes}; use futures_util::StreamExt; use reqwest::Client; @@ -32,6 +37,8 @@ impl From for Error { } } +/// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes +/// received from the connection to the rdsys backend pub struct ResourceStream { rx: mpsc::Receiver, buf: Vec, @@ -160,6 +167,23 @@ mod tests { } } +/// Makes an http connection to the rdsys backend api endpoint and returns a ResourceStream +/// if successful +/// +/// # Examples +/// use rdsys_backend::start_stream; +/// +/// ``` +/// let endpoint = String::from("http://127.0.0.1:7100/resource-stream"); +/// let name = String::from("https"); +/// let token = String::from("HttpsApiTokenPlaceholder"); +/// let types = vec![String::from("obfs2"), String::from("scramblesuit")]; +/// let rx = start_stream(endpoint, name, token, types).await.unwrap(); +/// for diff in rx { +/// println!("Received diff: {:?}", diff); +/// } +/// ``` +/// pub async fn start_stream( api_endpoint: String, name: String, From 0f60fabae69088fc0371f971dd81b311c17e88ef Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 15:24:47 -0500 Subject: [PATCH 08/16] Add README --- crates/rdsys-backend/README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 crates/rdsys-backend/README.md diff --git a/crates/rdsys-backend/README.md b/crates/rdsys-backend/README.md new file mode 100644 index 0000000..bcff125 --- /dev/null +++ b/crates/rdsys-backend/README.md @@ -0,0 +1,20 @@ +# rdsys backend API + +### Usage + +``` +use rdsys_backend::start_stream; +use tokio; + +#[tokio::main] +async fn main() { + let endpoint = String::from("http://127.0.0.1:7100/resource-stream"); + let name = String::from("https"); + let token = String::from("HttpsApiTokenPlaceholder"); + let types = vec![String::from("obfs2"), String::from("scramblesuit")]; + let rx = start_stream(endpoint, name, token, types).await.unwrap(); + for diff in rx { + println!("Received diff: {:?}", diff); + } +} +``` From 6eb8f575ca8cfd4a6bacbf40a8af8609673c847e Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 15:24:58 -0500 Subject: [PATCH 09/16] Add proto.rs after module refactor --- crates/rdsys-backend/src/proto.rs | 206 ++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 crates/rdsys-backend/src/proto.rs diff --git a/crates/rdsys-backend/src/proto.rs b/crates/rdsys-backend/src/proto.rs new file mode 100644 index 0000000..bb96fb6 --- /dev/null +++ b/crates/rdsys-backend/src/proto.rs @@ -0,0 +1,206 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +/// The body of the request for resources made to the rdsys backend +#[derive(Serialize)] +pub struct ResourceRequest { + pub request_origin: String, + pub resource_types: Vec, +} + +/// Representation of a bridge resource +#[derive(Deserialize, PartialEq, Debug)] +pub struct Resource { + pub r#type: String, + pub blocked_in: HashMap, + pub protocol: String, + pub address: String, + pub port: u16, + pub fingerprint: String, + #[serde(rename = "or-addresses")] + pub or_addresses: Option>, + pub distribution: String, + pub flags: Option>, + pub params: Option>, +} + +/// A ResourceDiff holds information about new, changed, or pruned resources +#[derive(Deserialize, PartialEq, Debug)] +pub struct ResourceDiff { + pub new: Option>>, + pub changed: Option>>, + pub gone: Option>>, + pub full_update: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serialize_resource_request() { + let req = ResourceRequest { + request_origin: String::from("https"), + resource_types: vec![String::from("obfs2"), String::from("scramblesuit")], + }; + let json = serde_json::to_string(&req).unwrap(); + assert_eq!( + json, + "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}" + ) + } + + #[test] + fn deserialize_resource() { + let mut flags = HashMap::new(); + flags.insert(String::from("fast"), true); + flags.insert(String::from("stable"), true); + let mut params = HashMap::new(); + params.insert( + String::from("password"), + String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"), + ); + let bridge = Resource { + r#type: String::from("scramblesuit"), + blocked_in: HashMap::new(), + protocol: String::from("tcp"), + address: String::from("216.117.3.62"), + port: 63174, + fingerprint: String::from("BE84A97D02130470A1C77839954392BA979F7EE1"), + or_addresses: None, + distribution: String::from("https"), + flags: Some(flags), + params: Some(params), + }; + + let data = r#" + { + "type": "scramblesuit", + "blocked_in": {}, + "protocol": "tcp", + "address": "216.117.3.62", + "port": 63174, + "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true + }, + "params": { + "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" + } + }"#; + let res: Resource = serde_json::from_str(data).unwrap(); + assert_eq!(bridge, res); + } + + #[test] + fn deserialize_resource_diff() { + let data = r#" + { + "new": { + "obfs2": [ + { + "type": "obfs2", + "blocked_in": {}, + "Location": null, + "protocol": "tcp", + "address": "176.247.216.207", + "port": 42810, + "fingerprint": "10282810115283F99ADE5CFE42D49644F45D715D", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true, + "running": true, + "valid": true + } + }, + { + "type": "obfs2", + "blocked_in": {}, + "protocol": "tcp", + "address": "133.69.16.145", + "port": 58314, + "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true, + "running": true, + "valid": true + } + } + ], + "scramblesuit": [ + { + "type": "scramblesuit", + "blocked_in": {}, + "protocol": "tcp", + "address": "216.117.3.62", + "port": 63174, + "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", + "or-addresses": null, + "distribution": "https", + "flags": { + "fast": true, + "stable": true, + "running": true, + "valid": true + }, + "params": { + "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" + } + } + ] + }, + "changed": null, + "gone": null, + "full_update": true + }"#; + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + assert_ne!(diff.new, None); + assert_eq!(diff.changed, None); + assert_eq!(diff.gone, None); + assert_eq!(diff.full_update, true); + if let Some(new) = diff.new { + assert_eq!(new["obfs2"][0].r#type, "obfs2"); + } + } + + #[test] + fn deserialize_empty_resource_diff() { + let data = r#" + { + "new": null, + "changed": null, + "gone": null, + "full_update": true + }"#; + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + let empty_diff = ResourceDiff { + new: None, + changed: None, + gone: None, + full_update: true, + }; + assert_eq!(diff, empty_diff); + } + + #[test] + fn deserialize_empty_condensed_diff() { + let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}"; + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + let empty_diff = ResourceDiff { + new: None, + changed: None, + gone: None, + full_update: true, + }; + assert_eq!(diff, empty_diff); + } +} From 46d51315b6520e77eadca7e26d3020b97a3ac801 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 15:25:41 -0500 Subject: [PATCH 10/16] Use rust listing in markdown --- crates/rdsys-backend/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rdsys-backend/README.md b/crates/rdsys-backend/README.md index bcff125..2cfa114 100644 --- a/crates/rdsys-backend/README.md +++ b/crates/rdsys-backend/README.md @@ -2,7 +2,7 @@ ### Usage -``` +```rust use rdsys_backend::start_stream; use tokio; From 1da6e245ccdd6252490212ebd6fb802571293b75 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 17:05:38 -0500 Subject: [PATCH 11/16] Formatting and documentation fixes --- crates/rdsys-backend/src/lib.rs | 16 ++++++++-------- crates/rdsys-backend/src/proto.rs | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 6ef21a6..b54a67c 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -8,7 +8,6 @@ use futures_util::StreamExt; use reqwest::Client; use std::io::{self, BufRead}; use std::sync::mpsc; -use tokio; pub mod proto; @@ -55,15 +54,15 @@ impl Iterator for ResourceStream { Some(b'\r') => match serde_json::from_slice(&self.buf) { Ok(diff) => { self.buf.clear(); - return Ok(Some(diff)); + Ok(Some(diff)) } - Err(e) => return Err(Error::JSON(e)), + Err(e) => Err(Error::JSON(e)), }, Some(n) => { self.buf.push(n); - return Ok(None); + Ok(None) } - None => return Ok(None), + None => Ok(None), }, Err(e) => Err(Error::Io(e)), } @@ -171,9 +170,10 @@ mod tests { /// if successful /// /// # Examples +/// +/// ```ignore /// use rdsys_backend::start_stream; /// -/// ``` /// let endpoint = String::from("http://127.0.0.1:7100/resource-stream"); /// let name = String::from("https"); /// let token = String::from("HttpsApiTokenPlaceholder"); @@ -194,7 +194,7 @@ pub async fn start_stream( let req = proto::ResourceRequest { request_origin: name, - resource_types: resource_types, + resource_types, }; let json = serde_json::to_string(&req)?; @@ -222,7 +222,7 @@ pub async fn start_stream( } }); Ok(ResourceStream { - rx: rx, + rx, buf: vec![], partial: None, }) diff --git a/crates/rdsys-backend/src/proto.rs b/crates/rdsys-backend/src/proto.rs index bb96fb6..f81af42 100644 --- a/crates/rdsys-backend/src/proto.rs +++ b/crates/rdsys-backend/src/proto.rs @@ -10,11 +10,11 @@ pub struct ResourceRequest { } /// Representation of a bridge resource -#[derive(Deserialize, PartialEq, Debug)] +#[derive(Deserialize, PartialEq, Eq, Debug)] pub struct Resource { pub r#type: String, pub blocked_in: HashMap, - pub protocol: String, + pub protocol: String, pub address: String, pub port: u16, pub fingerprint: String, @@ -26,7 +26,7 @@ pub struct Resource { } /// A ResourceDiff holds information about new, changed, or pruned resources -#[derive(Deserialize, PartialEq, Debug)] +#[derive(Deserialize, PartialEq, Eq, Debug)] pub struct ResourceDiff { pub new: Option>>, pub changed: Option>>, From 0b0324c4874c882e748d5f0d96ef450dc91d247f Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Tue, 21 Mar 2023 17:37:15 -0400 Subject: [PATCH 12/16] Change ResourceStream from an Iterator into a Stream --- crates/rdsys-backend/Cargo.toml | 5 +- crates/rdsys-backend/src/lib.rs | 182 ++++++++++++++++++-------------- 2 files changed, 106 insertions(+), 81 deletions(-) diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index df6fbc6..cdaa5a0 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -10,6 +10,7 @@ serde_json = "1" futures-util = { version = "0.3"} serde = { version = "1", features = ["derive"]} bytes = "1" -tokio = "1" - +tokio = { version = "1", features = ["macros"]} reqwest = { version = "0.11", features = ["stream"]} +tokio-stream = "0.1.12" +futures = "0.3.27" diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index b54a67c..7ac1c52 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -4,10 +4,13 @@ //! https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/doc/backend-api.md use bytes::{self, Buf, Bytes}; -use futures_util::StreamExt; +use core::pin::Pin; +use core::task::Poll; +use futures_util::{Stream, StreamExt}; use reqwest::Client; use std::io::{self, BufRead}; -use std::sync::mpsc; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; pub mod proto; @@ -39,27 +42,37 @@ impl From for Error { /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// received from the connection to the rdsys backend pub struct ResourceStream { - rx: mpsc::Receiver, + rx: ReceiverStream, buf: Vec, partial: Option>, } -impl Iterator for ResourceStream { +impl ResourceStream { + pub fn new(rx: mpsc::Receiver) -> ResourceStream { + ResourceStream { + rx: ReceiverStream::new(rx), + buf: vec![], + partial: None, + } + } +} + +impl Stream for ResourceStream { type Item = proto::ResourceDiff; - fn next(&mut self) -> Option { - let mut parse = - |buffer: &mut bytes::buf::Reader| -> Result, Error> { - match buffer.read_until(b'\r', &mut self.buf) { - Ok(_) => match self.buf.pop() { - Some(b'\r') => match serde_json::from_slice(&self.buf) { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll> { + let parse = + |buffer: &mut bytes::buf::Reader, buf: &mut Vec| -> Result, Error> { + match buffer.read_until(b'\r', buf) { + Ok(_) => match buf.pop() { + Some(b'\r') => match serde_json::from_slice(buf) { Ok(diff) => { - self.buf.clear(); + buf.clear(); Ok(Some(diff)) } Err(e) => Err(Error::JSON(e)), }, Some(n) => { - self.buf.push(n); + buf.push(n); Ok(None) } None => Ok(None), @@ -67,26 +80,33 @@ impl Iterator for ResourceStream { Err(e) => Err(Error::Io(e)), } }; - + // This clone is here to avoid having multiple mutable references to self + // it's not optimal performance-wise but given that these resource streams aren't large + // this feels like an acceptable trade-off to the complexity of interior mutability + let mut buf = self.buf.clone(); if let Some(p) = &mut self.partial { - match parse(p) { - Ok(Some(diff)) => return Some(diff), + match parse(p, &mut buf) { + Ok(Some(diff)) => return Poll::Ready(Some(diff)), Ok(None) => self.partial = None, - Err(_) => return None, + Err(_) => return Poll::Ready(None), } } - for chunk in &self.rx { - let mut buffer = chunk.reader(); - match parse(&mut buffer) { - Ok(Some(diff)) => { - self.partial = Some(buffer); - return Some(diff); + self.buf = buf; + match Pin::new(&mut self.rx).poll_next(cx) { + Poll::Ready(Some(chunk)) => { + let mut buffer = chunk.reader(); + match parse(&mut buffer, &mut self.buf) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Poll::Ready(Some(diff)); + } + Ok(None) => Poll::Pending, //maybe loop here? + Err(_) => return Poll::Ready(None), } - Ok(None) => continue, - Err(_) => return None, - }; + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } - None } } @@ -94,72 +114,76 @@ impl Iterator for ResourceStream { mod tests { use super::*; - #[test] - fn parse_resource() { + #[tokio::test] + async fn parse_resource() { + let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); let chunk = Bytes::from_static( b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", ); - let (tx, rx) = mpsc::channel(); - tx.send(chunk).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { + let (tx, rx) = mpsc::channel(100); + tx.send(chunk).await.unwrap(); + let mut diffs = ResourceStream::new(rx); + let res = Pin::new(&mut diffs).poll_next(&mut cx); + assert_ne!(res, Poll::Ready(None)); + assert_ne!(res, Poll::Pending); + if let Poll::Ready(Some(diff)) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } } - #[test] - fn parse_across_chunks() { + #[tokio::test] + async fn parse_across_chunks() { + let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); - let (tx, rx) = mpsc::channel(); - tx.send(chunk1).unwrap(); - tx.send(chunk2).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { + let (tx, rx) = mpsc::channel(100); + tx.send(chunk1).await.unwrap(); + tx.send(chunk2).await.unwrap(); + let mut diffs = ResourceStream::new(rx); + let mut res = Pin::new(&mut diffs).poll_next(&mut cx); + while let Poll::Pending = res { + res = Pin::new(&mut diffs).poll_next(&mut cx); + } + assert_ne!(res, Poll::Ready(None)); + assert_ne!(res, Poll::Pending); + if let Poll::Ready(Some(diff)) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } } - #[test] - fn parse_multi_diff_partial_chunks() { + #[tokio::test] + async fn parse_multi_diff_partial_chunks() { + let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); let chunk4 = Bytes::from_static(b"\r"); - let (tx, rx) = mpsc::channel(); - tx.send(chunk1).unwrap(); - tx.send(chunk2).unwrap(); - tx.send(chunk3).unwrap(); - tx.send(chunk4).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let mut res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { + let (tx, rx) = mpsc::channel(100); + tx.send(chunk1).await.unwrap(); + tx.send(chunk2).await.unwrap(); + tx.send(chunk3).await.unwrap(); + tx.send(chunk4).await.unwrap(); + let mut diffs = ResourceStream::new(rx); + let mut res = Pin::new(&mut diffs).poll_next(&mut cx); + while let Poll::Pending = res { + res = Pin::new(&mut diffs).poll_next(&mut cx); + } + assert_ne!(res, Poll::Ready(None)); + assert_ne!(res, Poll::Pending); + if let Poll::Ready(Some(diff)) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } - res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { + res = Pin::new(&mut diffs).poll_next(&mut cx); + while let Poll::Pending = res { + res = Pin::new(&mut diffs).poll_next(&mut cx); + } + assert_ne!(res, Poll::Ready(None)); + assert_ne!(res, Poll::Pending); + if let Poll::Ready(Some(diff)) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } @@ -178,9 +202,13 @@ mod tests { /// let name = String::from("https"); /// let token = String::from("HttpsApiTokenPlaceholder"); /// let types = vec![String::from("obfs2"), String::from("scramblesuit")]; -/// let rx = start_stream(endpoint, name, token, types).await.unwrap(); -/// for diff in rx { -/// println!("Received diff: {:?}", diff); +/// let stream = start_stream(endpoint, name, token, types).await.unwrap(); +/// loop { +/// match Pin::new(&mut stream).poll_next(&mut cx) { +/// Poll::Ready(Some(diff)) => println!("Received diff: {:?}", diff), +/// Poll::Ready(None) => break, +/// Poll::Pending => continue, +/// } /// } /// ``` /// @@ -190,7 +218,7 @@ pub async fn start_stream( token: String, resource_types: Vec, ) -> Result { - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel(100); let req = proto::ResourceRequest { request_origin: name, @@ -218,12 +246,8 @@ pub async fn start_stream( return; } }; - tx.send(bytes).unwrap(); + tx.send(bytes).await.unwrap(); } }); - Ok(ResourceStream { - rx, - buf: vec![], - partial: None, - }) + Ok(ResourceStream::new(rx)) } From e1e5e798b64fffffd82a7c9055bfdb1e30c6223c Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Wed, 22 Mar 2023 15:05:05 -0400 Subject: [PATCH 13/16] Use a ReusableBoxFuture to store ReceiverStream --- crates/rdsys-backend/Cargo.toml | 1 + crates/rdsys-backend/src/lib.rs | 23 ++++++++++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index cdaa5a0..e30c954 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -14,3 +14,4 @@ tokio = { version = "1", features = ["macros"]} reqwest = { version = "0.11", features = ["stream"]} tokio-stream = "0.1.12" futures = "0.3.27" +tokio-util = "0.7.7" diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 7ac1c52..75e8f11 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -5,12 +5,13 @@ use bytes::{self, Buf, Bytes}; use core::pin::Pin; -use core::task::Poll; +use core::task::{Poll, ready}; use futures_util::{Stream, StreamExt}; use reqwest::Client; use std::io::{self, BufRead}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::ReusableBoxFuture; pub mod proto; @@ -42,7 +43,7 @@ impl From for Error { /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// received from the connection to the rdsys backend pub struct ResourceStream { - rx: ReceiverStream, + inner: ReusableBoxFuture<'static, (Option, ReceiverStream)>, buf: Vec, partial: Option>, } @@ -50,13 +51,18 @@ pub struct ResourceStream { impl ResourceStream { pub fn new(rx: mpsc::Receiver) -> ResourceStream { ResourceStream { - rx: ReceiverStream::new(rx), + inner: ReusableBoxFuture::new(make_future(ReceiverStream::new(rx))), buf: vec![], partial: None, } } } +async fn make_future(mut rx: ReceiverStream) -> (Option, ReceiverStream) { + let result = rx.next().await; + (result, rx) +} + impl Stream for ResourceStream { type Item = proto::ResourceDiff; fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll> { @@ -92,8 +98,12 @@ impl Stream for ResourceStream { } } self.buf = buf; - match Pin::new(&mut self.rx).poll_next(cx) { - Poll::Ready(Some(chunk)) => { + println!("HERE"); + let (result, rx) = ready!(self.inner.poll(cx)); + println!("HERE2"); + self.inner.set(make_future(rx)); + match result { + Some(chunk) => { let mut buffer = chunk.reader(); match parse(&mut buffer, &mut self.buf) { Ok(Some(diff)) => { @@ -104,8 +114,7 @@ impl Stream for ResourceStream { Err(_) => return Poll::Ready(None), } } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + None => Poll::Ready(None), } } } From eb43414574c78893f01ff6aa2ec240229b8e9339 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Wed, 22 Mar 2023 17:41:08 -0400 Subject: [PATCH 14/16] Only return Poll::Pending if the inner future returns it The waker stored in the future will only wake up and re-poll the stream if the call to future returned Poll::Pending. Because of this, if we didn't receive enough data to reconstruct the ResourceDiff, we need to loop and poll the inner future again rather than return Poll::Pending ourselves. --- crates/rdsys-backend/src/lib.rs | 78 ++++++++++++++++----------------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 75e8f11..ae8c7db 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -5,12 +5,11 @@ use bytes::{self, Buf, Bytes}; use core::pin::Pin; -use core::task::{Poll, ready}; use futures_util::{Stream, StreamExt}; use reqwest::Client; use std::io::{self, BufRead}; +use std::task::{ready, Context, Poll}; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::ReusableBoxFuture; pub mod proto; @@ -43,7 +42,7 @@ impl From for Error { /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// received from the connection to the rdsys backend pub struct ResourceStream { - inner: ReusableBoxFuture<'static, (Option, ReceiverStream)>, + inner: ReusableBoxFuture<'static, (Option, mpsc::Receiver)>, buf: Vec, partial: Option>, } @@ -51,41 +50,42 @@ pub struct ResourceStream { impl ResourceStream { pub fn new(rx: mpsc::Receiver) -> ResourceStream { ResourceStream { - inner: ReusableBoxFuture::new(make_future(ReceiverStream::new(rx))), + inner: ReusableBoxFuture::new(make_future(rx)), buf: vec![], partial: None, } } } -async fn make_future(mut rx: ReceiverStream) -> (Option, ReceiverStream) { - let result = rx.next().await; +async fn make_future(mut rx: mpsc::Receiver) -> (Option, mpsc::Receiver) { + let result = rx.recv().await; (result, rx) } impl Stream for ResourceStream { type Item = proto::ResourceDiff; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut core::task::Context) -> Poll> { - let parse = - |buffer: &mut bytes::buf::Reader, buf: &mut Vec| -> Result, Error> { - match buffer.read_until(b'\r', buf) { - Ok(_) => match buf.pop() { - Some(b'\r') => match serde_json::from_slice(buf) { - Ok(diff) => { - buf.clear(); - Ok(Some(diff)) - } - Err(e) => Err(Error::JSON(e)), - }, - Some(n) => { - buf.push(n); - Ok(None) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let parse = |buffer: &mut bytes::buf::Reader, + buf: &mut Vec| + -> Result, Error> { + match buffer.read_until(b'\r', buf) { + Ok(_) => match buf.pop() { + Some(b'\r') => match serde_json::from_slice(buf) { + Ok(diff) => { + buf.clear(); + Ok(Some(diff)) } - None => Ok(None), + Err(e) => Err(Error::JSON(e)), }, - Err(e) => Err(Error::Io(e)), - } - }; + Some(n) => { + buf.push(n); + Ok(None) + } + None => Ok(None), + }, + Err(e) => Err(Error::Io(e)), + } + }; // This clone is here to avoid having multiple mutable references to self // it's not optimal performance-wise but given that these resource streams aren't large // this feels like an acceptable trade-off to the complexity of interior mutability @@ -98,23 +98,23 @@ impl Stream for ResourceStream { } } self.buf = buf; - println!("HERE"); - let (result, rx) = ready!(self.inner.poll(cx)); - println!("HERE2"); - self.inner.set(make_future(rx)); - match result { - Some(chunk) => { - let mut buffer = chunk.reader(); - match parse(&mut buffer, &mut self.buf) { - Ok(Some(diff)) => { - self.partial = Some(buffer); - return Poll::Ready(Some(diff)); + loop { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Some(chunk) => { + let mut buffer = chunk.reader(); + match parse(&mut buffer, &mut self.buf) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Poll::Ready(Some(diff)); + } + Ok(None) => continue, + Err(_) => return Poll::Ready(None), } - Ok(None) => Poll::Pending, //maybe loop here? - Err(_) => return Poll::Ready(None), } + None => return Poll::Ready(None), } - None => Poll::Ready(None), } } } From dafe1b0a1de58f6117d3199ff1eaba97b0c67c93 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Tue, 4 Apr 2023 13:27:47 -0400 Subject: [PATCH 15/16] Add get_uid function for API resource --- crates/rdsys-backend/Cargo.toml | 3 +++ crates/rdsys-backend/src/main.rs | 22 ++++++++++++++++++++++ crates/rdsys-backend/src/proto.rs | 28 ++++++++++++++++++++++++++-- 3 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 crates/rdsys-backend/src/main.rs diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index e30c954..782f534 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -10,6 +10,9 @@ serde_json = "1" futures-util = { version = "0.3"} serde = { version = "1", features = ["derive"]} bytes = "1" +hex = "0.4.3" +crc64 = "2.0.0" +sha1 = "0.10.5" tokio = { version = "1", features = ["macros"]} reqwest = { version = "0.11", features = ["stream"]} tokio-stream = "0.1.12" diff --git a/crates/rdsys-backend/src/main.rs b/crates/rdsys-backend/src/main.rs new file mode 100644 index 0000000..88c6ad7 --- /dev/null +++ b/crates/rdsys-backend/src/main.rs @@ -0,0 +1,22 @@ +use sha1::{Sha1, Digest}; + +fn get_uid(fingerprint: String, pt_type: String) -> Result { + let hex_fingerprint = match hex::decode(fingerprint) { + Ok(hex_fingerprint) => hex_fingerprint, + Err(e) => return Err(e), + }; + + let mut hasher = Sha1::new(); + hasher.update(hex_fingerprint); + let result_fingerprint = hasher.finalize(); + let uid_string = pt_type+&hex::encode_upper(result_fingerprint); + Ok(crc64::crc64(0, uid_string.as_bytes())) + } + + +fn main() { +let hex_stuff = get_uid("FD8DC7EF92F1F14D00CF9D6F6297A3468B59E707".to_string(), "obfs4".to_string()); +println!("The result is: {:?}", hex_stuff); +let hex_stuff2 = get_uid("FD8DC7EF92F1F14D00CF9D6F6297A3468B59E707".to_string(), "scramblesuit".to_string()); +println!("The result is: {:?}", hex_stuff2); +} \ No newline at end of file diff --git a/crates/rdsys-backend/src/proto.rs b/crates/rdsys-backend/src/proto.rs index f81af42..9ecbaec 100644 --- a/crates/rdsys-backend/src/proto.rs +++ b/crates/rdsys-backend/src/proto.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; - use serde::{Deserialize, Serialize}; +use sha1::{Digest, Sha1}; +use std::collections::HashMap; /// The body of the request for resources made to the rdsys backend #[derive(Serialize)] @@ -25,6 +25,30 @@ pub struct Resource { pub params: Option>, } +impl Resource { + /// get_uid creates a unique identifier of the resource from a hash of the fingerprint + /// and bridge type. A similar process is used in rdsys + /// https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/pkg/usecases/resources/bridges.go#L99 + /// however, the golang and rust implementations of crc64 lead to different hash values. + /// The polynomial used for rust's crc64 package is: https://docs.rs/crc64/2.0.0/src/crc64/lib.rs.html#8 + /// using "Jones" coefficients. Changing go's polynomial to match rust's still doesn't make the hashes the same. + /// We use the get_uid in this case for an identifier in the distributor so as long as it is unique, it doesn't + /// strictly need to match the value in rdsys' backend. + pub fn get_uid(self: &Self) -> Result { + let hex_fingerprint = match hex::decode(self.fingerprint.clone()) { + Ok(hex_fingerprint) => hex_fingerprint, + Err(e) => return Err(e), + }; + + let mut hasher = Sha1::new(); + hasher.update(hex_fingerprint); + let result_fingerprint = hasher.finalize(); + let uid_string = self.r#type.clone() + &hex::encode_upper(result_fingerprint); + + Ok(crc64::crc64(0, uid_string.as_bytes())) + } +} + /// A ResourceDiff holds information about new, changed, or pruned resources #[derive(Deserialize, PartialEq, Eq, Debug)] pub struct ResourceDiff { From ba7116aa1695ee47e93e81d2380dfeba078a398b Mon Sep 17 00:00:00 2001 From: onyinyang Date: Mon, 10 Apr 2023 11:24:53 -0400 Subject: [PATCH 16/16] Removing test function --- crates/rdsys-backend/.gitignore => .gitignore | 0 crates/rdsys-backend/Cargo.toml => Cargo.toml | 0 crates/rdsys-backend/README.md => README.md | 0 crates/rdsys-backend/src/main.rs | 22 ------------------- {crates/rdsys-backend/src => src}/lib.rs | 0 {crates/rdsys-backend/src => src}/proto.rs | 0 6 files changed, 22 deletions(-) rename crates/rdsys-backend/.gitignore => .gitignore (100%) rename crates/rdsys-backend/Cargo.toml => Cargo.toml (100%) rename crates/rdsys-backend/README.md => README.md (100%) delete mode 100644 crates/rdsys-backend/src/main.rs rename {crates/rdsys-backend/src => src}/lib.rs (100%) rename {crates/rdsys-backend/src => src}/proto.rs (100%) diff --git a/crates/rdsys-backend/.gitignore b/.gitignore similarity index 100% rename from crates/rdsys-backend/.gitignore rename to .gitignore diff --git a/crates/rdsys-backend/Cargo.toml b/Cargo.toml similarity index 100% rename from crates/rdsys-backend/Cargo.toml rename to Cargo.toml diff --git a/crates/rdsys-backend/README.md b/README.md similarity index 100% rename from crates/rdsys-backend/README.md rename to README.md diff --git a/crates/rdsys-backend/src/main.rs b/crates/rdsys-backend/src/main.rs deleted file mode 100644 index 88c6ad7..0000000 --- a/crates/rdsys-backend/src/main.rs +++ /dev/null @@ -1,22 +0,0 @@ -use sha1::{Sha1, Digest}; - -fn get_uid(fingerprint: String, pt_type: String) -> Result { - let hex_fingerprint = match hex::decode(fingerprint) { - Ok(hex_fingerprint) => hex_fingerprint, - Err(e) => return Err(e), - }; - - let mut hasher = Sha1::new(); - hasher.update(hex_fingerprint); - let result_fingerprint = hasher.finalize(); - let uid_string = pt_type+&hex::encode_upper(result_fingerprint); - Ok(crc64::crc64(0, uid_string.as_bytes())) - } - - -fn main() { -let hex_stuff = get_uid("FD8DC7EF92F1F14D00CF9D6F6297A3468B59E707".to_string(), "obfs4".to_string()); -println!("The result is: {:?}", hex_stuff); -let hex_stuff2 = get_uid("FD8DC7EF92F1F14D00CF9D6F6297A3468B59E707".to_string(), "scramblesuit".to_string()); -println!("The result is: {:?}", hex_stuff2); -} \ No newline at end of file diff --git a/crates/rdsys-backend/src/lib.rs b/src/lib.rs similarity index 100% rename from crates/rdsys-backend/src/lib.rs rename to src/lib.rs diff --git a/crates/rdsys-backend/src/proto.rs b/src/proto.rs similarity index 100% rename from crates/rdsys-backend/src/proto.rs rename to src/proto.rs