diff --git a/crates/rdsys-backend/src/http.rs b/crates/rdsys-backend/src/http.rs deleted file mode 100644 index 90971e4..0000000 --- a/crates/rdsys-backend/src/http.rs +++ /dev/null @@ -1,203 +0,0 @@ -use bytes::{self, Buf, Bytes}; -use futures_util::StreamExt; -use reqwest::Client; -use std::io::{self, BufRead}; -use std::sync::mpsc; -use tokio; - -#[derive(Debug)] -pub enum Error { - Reqwest(reqwest::Error), - Io(io::Error), - JSON(serde_json::Error), -} - -impl From for Error { - fn from(value: serde_json::Error) -> Self { - Self::JSON(value) - } -} - -impl From for Error { - fn from(value: reqwest::Error) -> Self { - Self::Reqwest(value) - } -} - -impl From for Error { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} - -pub struct ResourceStream { - rx: mpsc::Receiver, - buf: Vec, - partial: Option>, -} - -impl Iterator for ResourceStream { - type Item = crate::ResourceDiff; - fn next(&mut self) -> Option { - let mut parse = - |buffer: &mut bytes::buf::Reader| -> Result, Error> { - match buffer.read_until(b'\r', &mut self.buf) { - Ok(_) => match self.buf.pop() { - Some(b'\r') => match serde_json::from_slice(&self.buf) { - Ok(diff) => { - self.buf.clear(); - return Ok(Some(diff)); - } - Err(e) => return Err(Error::JSON(e)), - }, - Some(n) => { - self.buf.push(n); - return Ok(None); - } - None => return Ok(None), - }, - Err(e) => Err(Error::Io(e)), - } - }; - - if let Some(p) = &mut self.partial { - match parse(p) { - Ok(Some(diff)) => return Some(diff), - Ok(None) => self.partial = None, - Err(_) => return None, - } - } - for chunk in &self.rx { - let mut buffer = chunk.reader(); - match parse(&mut buffer) { - Ok(Some(diff)) => { - self.partial = Some(buffer); - return Some(diff); - } - Ok(None) => continue, - Err(_) => return None, - }; - } - None - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parse_resource() { - let chunk = Bytes::from_static( - b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", - ); - let (tx, rx) = mpsc::channel(); - tx.send(chunk).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - } - - #[test] - fn parse_across_chunks() { - let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); - let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); - let (tx, rx) = mpsc::channel(); - tx.send(chunk1).unwrap(); - tx.send(chunk2).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - } - - #[test] - fn parse_multi_diff_partial_chunks() { - let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); - let chunk2 = - Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); - let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); - let chunk4 = Bytes::from_static(b"\r"); - let (tx, rx) = mpsc::channel(); - tx.send(chunk1).unwrap(); - tx.send(chunk2).unwrap(); - tx.send(chunk3).unwrap(); - tx.send(chunk4).unwrap(); - let mut diffs = ResourceStream { - rx: rx, - partial: None, - buf: vec![], - }; - let mut res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - res = diffs.next(); - assert_ne!(res, None); - if let Some(diff) = res { - assert_eq!(diff.new, None); - assert_eq!(diff.full_update, true); - } - } -} - -pub async fn start_stream( - api_endpoint: String, - name: String, - token: String, - resource_types: Vec, -) -> Result { - let (tx, rx) = mpsc::channel(); - - let req = crate::ResourceRequest { - request_origin: name, - resource_types: resource_types, - }; - let json = serde_json::to_string(&req)?; - - let auth_value = format!("Bearer {}", token); - - let client = Client::new(); - - let mut stream = client - .get(api_endpoint) - .header("Authorization", &auth_value) - .body(json) - .send() - .await? - .bytes_stream(); - - tokio::spawn(async move { - while let Some(chunk) = stream.next().await { - let bytes = match chunk { - Ok(b) => b, - Err(_e) => { - return; - } - }; - tx.send(bytes).unwrap(); - } - }); - Ok(ResourceStream { - rx: rx, - buf: vec![], - partial: None, - }) -} diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 2c63af2..7b23e51 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -1,36 +1,87 @@ -use std::collections::HashMap; +use bytes::{self, Buf, Bytes}; +use futures_util::StreamExt; +use reqwest::Client; +use std::io::{self, BufRead}; +use std::sync::mpsc; +use tokio; -use serde::{Deserialize, Serialize}; +pub mod proto; -pub mod http; - -#[derive(Serialize)] -pub struct ResourceRequest { - request_origin: String, - resource_types: Vec, +#[derive(Debug)] +pub enum Error { + Reqwest(reqwest::Error), + Io(io::Error), + JSON(serde_json::Error), } -#[derive(Deserialize, PartialEq, Debug)] -pub struct Resource { - r#type: String, - blocked_in: HashMap, - protocol: String, - address: String, - port: u16, - fingerprint: String, - #[serde(rename = "or-addresses")] - or_addresses: Option>, - distribution: String, - flags: Option>, - params: Option>, +impl From for Error { + fn from(value: serde_json::Error) -> Self { + Self::JSON(value) + } } -#[derive(Deserialize, PartialEq, Debug)] -pub struct ResourceDiff { - new: Option>>, - changed: Option>>, - gone: Option>>, - full_update: bool, +impl From for Error { + fn from(value: reqwest::Error) -> Self { + Self::Reqwest(value) + } +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +pub struct ResourceStream { + rx: mpsc::Receiver, + buf: Vec, + partial: Option>, +} + +impl Iterator for ResourceStream { + type Item = proto::ResourceDiff; + fn next(&mut self) -> Option { + let mut parse = + |buffer: &mut bytes::buf::Reader| -> Result, Error> { + match buffer.read_until(b'\r', &mut self.buf) { + Ok(_) => match self.buf.pop() { + Some(b'\r') => match serde_json::from_slice(&self.buf) { + Ok(diff) => { + self.buf.clear(); + return Ok(Some(diff)); + } + Err(e) => return Err(Error::JSON(e)), + }, + Some(n) => { + self.buf.push(n); + return Ok(None); + } + None => return Ok(None), + }, + Err(e) => Err(Error::Io(e)), + } + }; + + if let Some(p) = &mut self.partial { + match parse(p) { + Ok(Some(diff)) => return Some(diff), + Ok(None) => self.partial = None, + Err(_) => return None, + } + } + for chunk in &self.rx { + let mut buffer = chunk.reader(); + match parse(&mut buffer) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Some(diff); + } + Ok(None) => continue, + Err(_) => return None, + }; + } + None + } } #[cfg(test)] @@ -38,168 +89,117 @@ mod tests { use super::*; #[test] - fn serialize_resource_request() { - let req = ResourceRequest { - request_origin: String::from("https"), - resource_types: vec![String::from("obfs2"), String::from("scramblesuit")], - }; - let json = serde_json::to_string(&req).unwrap(); - assert_eq!( - json, - "{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}" - ) - } - - #[test] - fn deserialize_resource() { - let mut flags = HashMap::new(); - flags.insert(String::from("fast"), true); - flags.insert(String::from("stable"), true); - let mut params = HashMap::new(); - params.insert( - String::from("password"), - String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"), + fn parse_resource() { + let chunk = Bytes::from_static( + b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", ); - let bridge = Resource { - r#type: String::from("scramblesuit"), - blocked_in: HashMap::new(), - protocol: String::from("tcp"), - address: String::from("216.117.3.62"), - port: 63174, - fingerprint: String::from("BE84A97D02130470A1C77839954392BA979F7EE1"), - or_addresses: None, - distribution: String::from("https"), - flags: Some(flags), - params: Some(params), + let (tx, rx) = mpsc::channel(); + tx.send(chunk).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], }; - - let data = r#" - { - "type": "scramblesuit", - "blocked_in": {}, - "protocol": "tcp", - "address": "216.117.3.62", - "port": 63174, - "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true - }, - "params": { - "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" - } - }"#; - let res: Resource = serde_json::from_str(data).unwrap(); - assert_eq!(bridge, res); - } - - #[test] - fn deserialize_resource_diff() { - let data = r#" - { - "new": { - "obfs2": [ - { - "type": "obfs2", - "blocked_in": {}, - "Location": null, - "protocol": "tcp", - "address": "176.247.216.207", - "port": 42810, - "fingerprint": "10282810115283F99ADE5CFE42D49644F45D715D", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true, - "running": true, - "valid": true - } - }, - { - "type": "obfs2", - "blocked_in": {}, - "protocol": "tcp", - "address": "133.69.16.145", - "port": 58314, - "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true, - "running": true, - "valid": true - } - } - ], - "scramblesuit": [ - { - "type": "scramblesuit", - "blocked_in": {}, - "protocol": "tcp", - "address": "216.117.3.62", - "port": 63174, - "fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1", - "or-addresses": null, - "distribution": "https", - "flags": { - "fast": true, - "stable": true, - "running": true, - "valid": true - }, - "params": { - "password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" - } - } - ] - }, - "changed": null, - "gone": null, - "full_update": true - }"#; - let diff: ResourceDiff = serde_json::from_str(data).unwrap(); - assert_ne!(diff.new, None); - assert_eq!(diff.changed, None); - assert_eq!(diff.gone, None); - assert_eq!(diff.full_update, true); - if let Some(new) = diff.new { - assert_eq!(new["obfs2"][0].r#type, "obfs2"); + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); } } #[test] - fn deserialize_empty_resource_diff() { - let data = r#" - { - "new": null, - "changed": null, - "gone": null, - "full_update": true - }"#; - let diff: ResourceDiff = serde_json::from_str(data).unwrap(); - let empty_diff = ResourceDiff { - new: None, - changed: None, - gone: None, - full_update: true, + fn parse_across_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], }; - assert_eq!(diff, empty_diff); + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } } #[test] - fn deserialize_empty_condensed_diff() { - let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}"; - let diff: ResourceDiff = serde_json::from_str(data).unwrap(); - let empty_diff = ResourceDiff { - new: None, - changed: None, - gone: None, - full_update: true, + fn parse_multi_diff_partial_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = + Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); + let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); + let chunk4 = Bytes::from_static(b"\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + tx.send(chunk3).unwrap(); + tx.send(chunk4).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], }; - assert_eq!(diff, empty_diff); + let mut res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } } } + +pub async fn start_stream( + api_endpoint: String, + name: String, + token: String, + resource_types: Vec, +) -> Result { + let (tx, rx) = mpsc::channel(); + + let req = proto::ResourceRequest { + request_origin: name, + resource_types: resource_types, + }; + let json = serde_json::to_string(&req)?; + + let auth_value = format!("Bearer {}", token); + + let client = Client::new(); + + let mut stream = client + .get(api_endpoint) + .header("Authorization", &auth_value) + .body(json) + .send() + .await? + .bytes_stream(); + + tokio::spawn(async move { + while let Some(chunk) = stream.next().await { + let bytes = match chunk { + Ok(b) => b, + Err(_e) => { + return; + } + }; + tx.send(bytes).unwrap(); + } + }); + Ok(ResourceStream { + rx: rx, + buf: vec![], + partial: None, + }) +}