2023-01-30 15:23:01 -05:00
|
|
|
//! # Rdsys Backend Distributor API
|
|
|
|
//!
|
|
|
|
//! `rdsys_backend` is an implementation of the rdsys backend API, documented at
//! <https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/main/doc/backend-api.md>.
|
|
|
|
|
2023-01-30 15:08:30 -05:00
|
|
|
use bytes::{self, Buf, Bytes};
|
|
|
|
use futures_util::StreamExt;
|
|
|
|
use reqwest::Client;
|
|
|
|
use std::io::{self, BufRead};
|
|
|
|
use std::sync::mpsc;
|
|
|
|
use tokio;
|
2023-01-20 16:01:03 -05:00
|
|
|
|
2023-01-30 15:08:30 -05:00
|
|
|
pub mod proto;
|
2023-01-20 16:01:03 -05:00
|
|
|
|
2023-01-30 15:08:30 -05:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum Error {
|
|
|
|
Reqwest(reqwest::Error),
|
|
|
|
Io(io::Error),
|
|
|
|
JSON(serde_json::Error),
|
|
|
|
}
|
2023-01-29 14:18:39 -05:00
|
|
|
|
2023-01-30 15:08:30 -05:00
|
|
|
impl From<serde_json::Error> for Error {
|
|
|
|
fn from(value: serde_json::Error) -> Self {
|
|
|
|
Self::JSON(value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<reqwest::Error> for Error {
|
|
|
|
fn from(value: reqwest::Error) -> Self {
|
|
|
|
Self::Reqwest(value)
|
|
|
|
}
|
2023-01-20 16:01:03 -05:00
|
|
|
}
|
|
|
|
|
2023-01-30 15:08:30 -05:00
|
|
|
impl From<io::Error> for Error {
|
|
|
|
fn from(value: io::Error) -> Self {
|
|
|
|
Self::Io(value)
|
|
|
|
}
|
2023-01-20 16:01:03 -05:00
|
|
|
}
|
|
|
|
|
2023-01-30 15:23:01 -05:00
|
|
|
/// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes
|
|
|
|
/// received from the connection to the rdsys backend
|
2023-01-30 15:08:30 -05:00
|
|
|
pub struct ResourceStream {
|
|
|
|
rx: mpsc::Receiver<Bytes>,
|
|
|
|
buf: Vec<u8>,
|
|
|
|
partial: Option<bytes::buf::Reader<Bytes>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Iterator for ResourceStream {
|
|
|
|
type Item = proto::ResourceDiff;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
let mut parse =
|
|
|
|
|buffer: &mut bytes::buf::Reader<Bytes>| -> Result<Option<Self::Item>, Error> {
|
|
|
|
match buffer.read_until(b'\r', &mut self.buf) {
|
|
|
|
Ok(_) => match self.buf.pop() {
|
|
|
|
Some(b'\r') => match serde_json::from_slice(&self.buf) {
|
|
|
|
Ok(diff) => {
|
|
|
|
self.buf.clear();
|
|
|
|
return Ok(Some(diff));
|
|
|
|
}
|
|
|
|
Err(e) => return Err(Error::JSON(e)),
|
|
|
|
},
|
|
|
|
Some(n) => {
|
|
|
|
self.buf.push(n);
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
None => return Ok(None),
|
|
|
|
},
|
|
|
|
Err(e) => Err(Error::Io(e)),
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Some(p) = &mut self.partial {
|
|
|
|
match parse(p) {
|
|
|
|
Ok(Some(diff)) => return Some(diff),
|
|
|
|
Ok(None) => self.partial = None,
|
|
|
|
Err(_) => return None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for chunk in &self.rx {
|
|
|
|
let mut buffer = chunk.reader();
|
|
|
|
match parse(&mut buffer) {
|
|
|
|
Ok(Some(diff)) => {
|
|
|
|
self.partial = Some(buffer);
|
|
|
|
return Some(diff);
|
|
|
|
}
|
|
|
|
Ok(None) => continue,
|
|
|
|
Err(_) => return None,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
None
|
|
|
|
}
|
2023-01-20 16:01:03 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Queues `chunks` on a fresh channel and wraps the receiving end in a
    /// `ResourceStream` with empty parser state. The sender is handed back so
    /// the channel stays open for the whole test.
    fn stream_over(chunks: &[&'static [u8]]) -> (mpsc::Sender<Bytes>, ResourceStream) {
        let (tx, rx) = mpsc::channel();
        for &chunk in chunks {
            tx.send(Bytes::from_static(chunk)).unwrap();
        }
        let stream = ResourceStream {
            rx,
            partial: None,
            buf: vec![],
        };
        (tx, stream)
    }

    /// Pulls the next item and checks it is a full update with no `new` entry.
    fn assert_next_is_full_update(stream: &mut ResourceStream) {
        let res = stream.next();
        assert_ne!(res, None);
        if let Some(diff) = res {
            assert_eq!(diff.new, None);
            assert!(diff.full_update);
        }
    }

    /// A single chunk holding exactly one complete record.
    #[test]
    fn parse_resource() {
        let (_tx, mut diffs) = stream_over(&[
            b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r",
        ]);
        assert_next_is_full_update(&mut diffs);
    }

    /// One record split over two chunks.
    #[test]
    fn parse_across_chunks() {
        let (_tx, mut diffs) = stream_over(&[
            b"{\"new\": null,\"changed\": null,",
            b"\"gone\": null,\"full_update\": true}\r",
        ]);
        assert_next_is_full_update(&mut diffs);
    }

    /// Two records whose boundaries do not line up with chunk boundaries,
    /// including a terminator that arrives in a chunk of its own.
    #[test]
    fn parse_multi_diff_partial_chunks() {
        let (_tx, mut diffs) = stream_over(&[
            b"{\"new\": null,\"changed\": null,",
            b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed",
            b"\": null,\"gone\": null,\"full_update\": true}",
            b"\r",
        ]);
        assert_next_is_full_update(&mut diffs);
        assert_next_is_full_update(&mut diffs);
    }
}
|
2023-01-30 15:08:30 -05:00
|
|
|
|
2023-01-30 15:23:01 -05:00
|
|
|
/// Makes an http connection to the rdsys backend api endpoint and returns a ResourceStream
|
|
|
|
/// if successful
|
|
|
|
///
|
|
|
|
/// # Examples
|
|
|
|
/// use rdsys_backend::start_stream;
|
|
|
|
///
|
|
|
|
/// ```
|
|
|
|
/// let endpoint = String::from("http://127.0.0.1:7100/resource-stream");
|
|
|
|
/// let name = String::from("https");
|
|
|
|
/// let token = String::from("HttpsApiTokenPlaceholder");
|
|
|
|
/// let types = vec![String::from("obfs2"), String::from("scramblesuit")];
|
|
|
|
/// let rx = start_stream(endpoint, name, token, types).await.unwrap();
|
|
|
|
/// for diff in rx {
|
|
|
|
/// println!("Received diff: {:?}", diff);
|
|
|
|
/// }
|
|
|
|
/// ```
|
|
|
|
///
|
2023-01-30 15:08:30 -05:00
|
|
|
pub async fn start_stream(
|
|
|
|
api_endpoint: String,
|
|
|
|
name: String,
|
|
|
|
token: String,
|
|
|
|
resource_types: Vec<String>,
|
|
|
|
) -> Result<ResourceStream, Error> {
|
|
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
|
|
|
|
let req = proto::ResourceRequest {
|
|
|
|
request_origin: name,
|
|
|
|
resource_types: resource_types,
|
|
|
|
};
|
|
|
|
let json = serde_json::to_string(&req)?;
|
|
|
|
|
|
|
|
let auth_value = format!("Bearer {}", token);
|
|
|
|
|
|
|
|
let client = Client::new();
|
|
|
|
|
|
|
|
let mut stream = client
|
|
|
|
.get(api_endpoint)
|
|
|
|
.header("Authorization", &auth_value)
|
|
|
|
.body(json)
|
|
|
|
.send()
|
|
|
|
.await?
|
|
|
|
.bytes_stream();
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
while let Some(chunk) = stream.next().await {
|
|
|
|
let bytes = match chunk {
|
|
|
|
Ok(b) => b,
|
|
|
|
Err(_e) => {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
tx.send(bytes).unwrap();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
Ok(ResourceStream {
|
|
|
|
rx: rx,
|
|
|
|
buf: vec![],
|
|
|
|
partial: None,
|
|
|
|
})
|
|
|
|
}
|