Return an iterator over ResourceDiff items
Rather than returning a Receiver<Bytes>, instead return a ResourceStream struct that implements the Iterator trait so callers can simply iterate over the ResourceDiff items received.
This commit is contained in:
parent
c89aa4a9f9
commit
9d448b00b4
|
@ -9,5 +9,7 @@ edition = "2021"
|
|||
serde_json = "1"
|
||||
futures-util = { version = "0.3"}
|
||||
serde = { version = "1", features = ["derive"]}
|
||||
bytes = "1"
|
||||
tokio = "1"
|
||||
|
||||
reqwest = { version = "0.11", features = ["stream"]}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use reqwest::Client;
|
||||
use bytes::{self, Buf, Bytes};
|
||||
use futures_util::StreamExt;
|
||||
use std::io;
|
||||
use reqwest::Client;
|
||||
use std::io::{self, BufRead};
|
||||
use std::sync::mpsc;
|
||||
use tokio;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
|
@ -27,7 +30,141 @@ impl From<io::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn start_stream(api_endpoint: String, name: String, token: String, resource_types: Vec<String> ) -> Result<(), Error> {
|
||||
pub struct ResourceStream {
|
||||
rx: mpsc::Receiver<Bytes>,
|
||||
buf: Vec<u8>,
|
||||
partial: Option<bytes::buf::Reader<Bytes>>,
|
||||
}
|
||||
|
||||
impl Iterator for ResourceStream {
|
||||
type Item = crate::ResourceDiff;
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let mut parse =
|
||||
|buffer: &mut bytes::buf::Reader<Bytes>| -> Result<Option<Self::Item>, Error> {
|
||||
match buffer.read_until(b'\r', &mut self.buf) {
|
||||
Ok(_) => match self.buf.pop() {
|
||||
Some(b'\r') => match serde_json::from_slice(&self.buf) {
|
||||
Ok(diff) => {
|
||||
self.buf.clear();
|
||||
return Ok(Some(diff));
|
||||
}
|
||||
Err(e) => return Err(Error::JSON(e)),
|
||||
},
|
||||
Some(n) => {
|
||||
self.buf.push(n);
|
||||
return Ok(None);
|
||||
}
|
||||
None => return Ok(None),
|
||||
},
|
||||
Err(e) => Err(Error::Io(e)),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(p) = &mut self.partial {
|
||||
match parse(p) {
|
||||
Ok(Some(diff)) => return Some(diff),
|
||||
Ok(None) => self.partial = None,
|
||||
Err(_) => return None,
|
||||
}
|
||||
}
|
||||
for chunk in &self.rx {
|
||||
let mut buffer = chunk.reader();
|
||||
match parse(&mut buffer) {
|
||||
Ok(Some(diff)) => {
|
||||
self.partial = Some(buffer);
|
||||
return Some(diff);
|
||||
}
|
||||
Ok(None) => continue,
|
||||
Err(_) => return None,
|
||||
};
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_resource() {
|
||||
let chunk = Bytes::from_static(
|
||||
b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r",
|
||||
);
|
||||
let (tx, rx) = mpsc::channel();
|
||||
tx.send(chunk).unwrap();
|
||||
let mut diffs = ResourceStream {
|
||||
rx: rx,
|
||||
partial: None,
|
||||
buf: vec![],
|
||||
};
|
||||
let res = diffs.next();
|
||||
assert_ne!(res, None);
|
||||
if let Some(diff) = res {
|
||||
assert_eq!(diff.new, None);
|
||||
assert_eq!(diff.full_update, true);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_across_chunks() {
|
||||
let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
|
||||
let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r");
|
||||
let (tx, rx) = mpsc::channel();
|
||||
tx.send(chunk1).unwrap();
|
||||
tx.send(chunk2).unwrap();
|
||||
let mut diffs = ResourceStream {
|
||||
rx: rx,
|
||||
partial: None,
|
||||
buf: vec![],
|
||||
};
|
||||
let res = diffs.next();
|
||||
assert_ne!(res, None);
|
||||
if let Some(diff) = res {
|
||||
assert_eq!(diff.new, None);
|
||||
assert_eq!(diff.full_update, true);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_multi_diff_partial_chunks() {
|
||||
let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
|
||||
let chunk2 =
|
||||
Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed");
|
||||
let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}");
|
||||
let chunk4 = Bytes::from_static(b"\r");
|
||||
let (tx, rx) = mpsc::channel();
|
||||
tx.send(chunk1).unwrap();
|
||||
tx.send(chunk2).unwrap();
|
||||
tx.send(chunk3).unwrap();
|
||||
tx.send(chunk4).unwrap();
|
||||
let mut diffs = ResourceStream {
|
||||
rx: rx,
|
||||
partial: None,
|
||||
buf: vec![],
|
||||
};
|
||||
let mut res = diffs.next();
|
||||
assert_ne!(res, None);
|
||||
if let Some(diff) = res {
|
||||
assert_eq!(diff.new, None);
|
||||
assert_eq!(diff.full_update, true);
|
||||
}
|
||||
res = diffs.next();
|
||||
assert_ne!(res, None);
|
||||
if let Some(diff) = res {
|
||||
assert_eq!(diff.new, None);
|
||||
assert_eq!(diff.full_update, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_stream(
|
||||
api_endpoint: String,
|
||||
name: String,
|
||||
token: String,
|
||||
resource_types: Vec<String>,
|
||||
) -> Result<ResourceStream, Error> {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let req = crate::ResourceRequest {
|
||||
request_origin: name,
|
||||
|
@ -38,15 +175,29 @@ pub async fn start_stream(api_endpoint: String, name: String, token: String, res
|
|||
let auth_value = format!("Bearer {}", token);
|
||||
|
||||
let client = Client::new();
|
||||
let mut stream = client.get(api_endpoint)
|
||||
|
||||
let mut stream = client
|
||||
.get(api_endpoint)
|
||||
.header("Authorization", &auth_value)
|
||||
.body(json)
|
||||
.send()
|
||||
.await?
|
||||
.bytes_stream();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(chunk) = stream.next().await {
|
||||
println!("Chunk: {:?}", chunk?);
|
||||
};
|
||||
Ok(())
|
||||
let bytes = match chunk {
|
||||
Ok(b) => b,
|
||||
Err(_e) => {
|
||||
return;
|
||||
}
|
||||
};
|
||||
tx.send(bytes).unwrap();
|
||||
}
|
||||
});
|
||||
Ok(ResourceStream {
|
||||
rx: rx,
|
||||
buf: vec![],
|
||||
partial: None,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -189,4 +189,17 @@ mod tests {
|
|||
};
|
||||
assert_eq!(diff, empty_diff);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_empty_condensed_diff() {
|
||||
let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}";
|
||||
let diff: ResourceDiff = serde_json::from_str(data).unwrap();
|
||||
let empty_diff = ResourceDiff {
|
||||
new: None,
|
||||
changed: None,
|
||||
gone: None,
|
||||
full_update: true,
|
||||
};
|
||||
assert_eq!(diff, empty_diff);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue