From 9d448b00b49cc777d0d9c6e42b1e5db4648919a4 Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Mon, 30 Jan 2023 10:22:32 -0500 Subject: [PATCH] Return an iterator over ResourceDiff items Rather than returning a Receiver, instead return a ResourceStream struct that implements the Iterator trait so callers can simply iterate over the ResourceDiff items received. --- crates/rdsys-backend/Cargo.toml | 2 + crates/rdsys-backend/src/http.rs | 175 ++++++++++++++++++++++++++++--- crates/rdsys-backend/src/lib.rs | 13 +++ 3 files changed, 178 insertions(+), 12 deletions(-) diff --git a/crates/rdsys-backend/Cargo.toml b/crates/rdsys-backend/Cargo.toml index 43e6861..df6fbc6 100644 --- a/crates/rdsys-backend/Cargo.toml +++ b/crates/rdsys-backend/Cargo.toml @@ -9,5 +9,7 @@ edition = "2021" serde_json = "1" futures-util = { version = "0.3"} serde = { version = "1", features = ["derive"]} +bytes = "1" +tokio = "1" reqwest = { version = "0.11", features = ["stream"]} diff --git a/crates/rdsys-backend/src/http.rs b/crates/rdsys-backend/src/http.rs index b03d742..90971e4 100644 --- a/crates/rdsys-backend/src/http.rs +++ b/crates/rdsys-backend/src/http.rs @@ -1,12 +1,15 @@ -use reqwest::Client; +use bytes::{self, Buf, Bytes}; use futures_util::StreamExt; -use std::io; +use reqwest::Client; +use std::io::{self, BufRead}; +use std::sync::mpsc; +use tokio; #[derive(Debug)] pub enum Error { - Reqwest (reqwest::Error), - Io (io::Error), - JSON (serde_json::Error), + Reqwest(reqwest::Error), + Io(io::Error), + JSON(serde_json::Error), } impl From for Error { @@ -27,7 +30,141 @@ impl From for Error { } } -pub async fn start_stream(api_endpoint: String, name: String, token: String, resource_types: Vec ) -> Result<(), Error> { +pub struct ResourceStream { + rx: mpsc::Receiver, + buf: Vec, + partial: Option>, +} + +impl Iterator for ResourceStream { + type Item = crate::ResourceDiff; + fn next(&mut self) -> Option { + let mut parse = + |buffer: &mut bytes::buf::Reader| -> Result, Error> { + match buffer.read_until(b'\r', &mut self.buf) { + Ok(_) => match self.buf.pop() { + Some(b'\r') => match serde_json::from_slice(&self.buf) { + Ok(diff) => { + self.buf.clear(); + return Ok(Some(diff)); + } + Err(e) => return Err(Error::JSON(e)), + }, + Some(n) => { + self.buf.push(n); + return Ok(None); + } + None => return Ok(None), + }, + Err(e) => Err(Error::Io(e)), + } + }; + + if let Some(p) = &mut self.partial { + match parse(p) { + Ok(Some(diff)) => return Some(diff), + Ok(None) => self.partial = None, + Err(_) => return None, + } + } + for chunk in &self.rx { + let mut buffer = chunk.reader(); + match parse(&mut buffer) { + Ok(Some(diff)) => { + self.partial = Some(buffer); + return Some(diff); + } + Ok(None) => continue, + Err(_) => return None, + }; + } + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_resource() { + let chunk = Bytes::from_static( + b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r", + ); + let (tx, rx) = mpsc::channel(); + tx.send(chunk).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], + }; + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + } + + #[test] + fn parse_across_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], + }; + let res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + } + + #[test] + fn parse_multi_diff_partial_chunks() { + let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,"); + let chunk2 = + Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed"); + let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}"); + let chunk4 = Bytes::from_static(b"\r"); + let (tx, rx) = mpsc::channel(); + tx.send(chunk1).unwrap(); + tx.send(chunk2).unwrap(); + tx.send(chunk3).unwrap(); + tx.send(chunk4).unwrap(); + let mut diffs = ResourceStream { + rx: rx, + partial: None, + buf: vec![], + }; + let mut res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + res = diffs.next(); + assert_ne!(res, None); + if let Some(diff) = res { + assert_eq!(diff.new, None); + assert_eq!(diff.full_update, true); + } + } +} + +pub async fn start_stream( + api_endpoint: String, + name: String, + token: String, + resource_types: Vec, +) -> Result { + let (tx, rx) = mpsc::channel(); let req = crate::ResourceRequest { request_origin: name, @@ -35,18 +172,32 @@ pub async fn start_stream(api_endpoint: String, name: String, token: String, res }; let json = serde_json::to_string(&req)?; - let auth_value = format!("Bearer {}",token); + let auth_value = format!("Bearer {}", token); let client = Client::new(); - let mut stream = client.get(api_endpoint) + + let mut stream = client + .get(api_endpoint) .header("Authorization", &auth_value) .body(json) .send() .await? .bytes_stream(); - while let Some(chunk) = stream.next().await { - println!("Chunk: {:?}", chunk?); - }; - Ok(()) + tokio::spawn(async move { + while let Some(chunk) = stream.next().await { + let bytes = match chunk { + Ok(b) => b, + Err(_e) => { + return; + } + }; + tx.send(bytes).unwrap(); + } + }); + Ok(ResourceStream { + rx: rx, + buf: vec![], + partial: None, + }) } diff --git a/crates/rdsys-backend/src/lib.rs b/crates/rdsys-backend/src/lib.rs index 2192df7..2c63af2 100644 --- a/crates/rdsys-backend/src/lib.rs +++ b/crates/rdsys-backend/src/lib.rs @@ -189,4 +189,17 @@ mod tests { }; assert_eq!(diff, empty_diff); } + + #[test] + fn deserialize_empty_condensed_diff() { + let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}"; + let diff: ResourceDiff = serde_json::from_str(data).unwrap(); + let empty_diff = ResourceDiff { + new: None, + changed: None, + gone: None, + full_update: true, + }; + assert_eq!(diff, empty_diff); + } }