//! # Rdsys Backend Distributor API //! //! `rdsys_backend` is an implementation of the rdsys backend API //! https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/doc/backend-api.md use bytes::{self, Buf, Bytes}; use futures_util::StreamExt; use reqwest::Client; use std::io::{self, BufRead}; use std::sync::mpsc; pub mod proto; #[derive(Debug)] pub enum Error { Reqwest(reqwest::Error), Io(io::Error), JSON(serde_json::Error), } impl From for Error { fn from(value: serde_json::Error) -> Self { Self::JSON(value) } } impl From for Error { fn from(value: reqwest::Error) -> Self { Self::Reqwest(value) } } impl From for Error { fn from(value: io::Error) -> Self { Self::Io(value) } } /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// received from the connection to the rdsys backend pub struct ResourceStream { rx: mpsc::Receiver, buf: Vec, partial: Option>, } impl Iterator for ResourceStream { type Item = proto::ResourceDiff; fn next(&mut self) -> Option { let mut parse = |buffer: &mut bytes::buf::Reader| -> Result, Error> { match buffer.read_until(b'\r', &mut self.buf) { Ok(_) => match self.buf.pop() { Some(b'\r') => match serde_json::from_slice(&self.buf) { Ok(diff) => { self.buf.clear(); Ok(Some(diff)) } Err(e) => Err(Error::JSON(e)), }, Some(n) => { self.buf.push(n); Ok(None) } None => Ok(None), }, Err(e) => Err(Error::Io(e)), } }; if let Some(p) = &mut self.partial { match parse(p) { Ok(Some(diff)) => return Some(diff), Ok(None) => self.partial = None, Err(_) => return None, } } for chunk in &self.rx { let mut buffer = chunk.reader(); match parse(&mut buffer) { Ok(Some(diff)) => { self.partial = Some(buffer); return Some(diff); } Ok(None) => continue, Err(_) => return None, }; } None } } #[cfg(test)] mod tests { use super::*; #[test] fn parse_resource() { let chunk = Bytes::from_static( b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", ); let (tx, rx) = mpsc::channel(); tx.send(chunk).unwrap(); let mut diffs = ResourceStream { rx: rx, partial: None, buf: vec![], }; let res = diffs.next(); assert_ne!(res, None); if let Some(diff) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } } #[test] fn parse_across_chunks() { let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); let (tx, rx) = mpsc::channel(); tx.send(chunk1).unwrap(); tx.send(chunk2).unwrap(); let mut diffs = ResourceStream { rx: rx, partial: None, buf: vec![], }; let res = diffs.next(); assert_ne!(res, None); if let Some(diff) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } } #[test] fn parse_multi_diff_partial_chunks() { let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); let chunk4 = Bytes::from_static(b"\r"); let (tx, rx) = mpsc::channel(); tx.send(chunk1).unwrap(); tx.send(chunk2).unwrap(); tx.send(chunk3).unwrap(); tx.send(chunk4).unwrap(); let mut diffs = ResourceStream { rx: rx, partial: None, buf: vec![], }; let mut res = diffs.next(); assert_ne!(res, None); if let Some(diff) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } res = diffs.next(); assert_ne!(res, None); if let Some(diff) = res { assert_eq!(diff.new, None); assert_eq!(diff.full_update, true); } } } /// Makes an http connection to the rdsys backend api endpoint and returns a ResourceStream /// if successful /// /// # Examples /// /// ```ignore /// use rdsys_backend::start_stream; /// /// let endpoint = String::from("http://127.0.0.1:7100/resource-stream"); /// let name = String::from("https"); /// let token = String::from("HttpsApiTokenPlaceholder"); /// let types = vec![String::from("obfs2"), String::from("scramblesuit")]; /// let rx = start_stream(endpoint, name, token, types).await.unwrap(); /// for diff in rx { /// println!("Received diff: {:?}", diff); /// } /// ``` /// pub async fn start_stream( api_endpoint: String, name: String, token: String, resource_types: Vec, ) -> Result { let (tx, rx) = mpsc::channel(); let req = proto::ResourceRequest { request_origin: name, resource_types, }; let json = serde_json::to_string(&req)?; let auth_value = format!("Bearer {}", token); let client = Client::new(); let mut stream = client .get(api_endpoint) .header("Authorization", &auth_value) .body(json) .send() .await? .bytes_stream(); tokio::spawn(async move { while let Some(chunk) = stream.next().await { let bytes = match chunk { Ok(b) => b, Err(_e) => { return; } }; tx.send(bytes).unwrap(); } }); Ok(ResourceStream { rx, buf: vec![], partial: None, }) }