Refactor modules
This commit is contained in:
parent
9d448b00b4
commit
04e7caef31
|
@ -1,203 +0,0 @@
|
||||||
use bytes::{self, Buf, Bytes};
|
|
||||||
use futures_util::StreamExt;
|
|
||||||
use reqwest::Client;
|
|
||||||
use std::io::{self, BufRead};
|
|
||||||
use std::sync::mpsc;
|
|
||||||
use tokio;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Error {
|
|
||||||
Reqwest(reqwest::Error),
|
|
||||||
Io(io::Error),
|
|
||||||
JSON(serde_json::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<serde_json::Error> for Error {
|
|
||||||
fn from(value: serde_json::Error) -> Self {
|
|
||||||
Self::JSON(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<reqwest::Error> for Error {
|
|
||||||
fn from(value: reqwest::Error) -> Self {
|
|
||||||
Self::Reqwest(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<io::Error> for Error {
|
|
||||||
fn from(value: io::Error) -> Self {
|
|
||||||
Self::Io(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ResourceStream {
|
|
||||||
rx: mpsc::Receiver<Bytes>,
|
|
||||||
buf: Vec<u8>,
|
|
||||||
partial: Option<bytes::buf::Reader<Bytes>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Iterator for ResourceStream {
|
|
||||||
type Item = crate::ResourceDiff;
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
let mut parse =
|
|
||||||
|buffer: &mut bytes::buf::Reader<Bytes>| -> Result<Option<Self::Item>, Error> {
|
|
||||||
match buffer.read_until(b'\r', &mut self.buf) {
|
|
||||||
Ok(_) => match self.buf.pop() {
|
|
||||||
Some(b'\r') => match serde_json::from_slice(&self.buf) {
|
|
||||||
Ok(diff) => {
|
|
||||||
self.buf.clear();
|
|
||||||
return Ok(Some(diff));
|
|
||||||
}
|
|
||||||
Err(e) => return Err(Error::JSON(e)),
|
|
||||||
},
|
|
||||||
Some(n) => {
|
|
||||||
self.buf.push(n);
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
None => return Ok(None),
|
|
||||||
},
|
|
||||||
Err(e) => Err(Error::Io(e)),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(p) = &mut self.partial {
|
|
||||||
match parse(p) {
|
|
||||||
Ok(Some(diff)) => return Some(diff),
|
|
||||||
Ok(None) => self.partial = None,
|
|
||||||
Err(_) => return None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for chunk in &self.rx {
|
|
||||||
let mut buffer = chunk.reader();
|
|
||||||
match parse(&mut buffer) {
|
|
||||||
Ok(Some(diff)) => {
|
|
||||||
self.partial = Some(buffer);
|
|
||||||
return Some(diff);
|
|
||||||
}
|
|
||||||
Ok(None) => continue,
|
|
||||||
Err(_) => return None,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_resource() {
|
|
||||||
let chunk = Bytes::from_static(
|
|
||||||
b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r",
|
|
||||||
);
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
tx.send(chunk).unwrap();
|
|
||||||
let mut diffs = ResourceStream {
|
|
||||||
rx: rx,
|
|
||||||
partial: None,
|
|
||||||
buf: vec![],
|
|
||||||
};
|
|
||||||
let res = diffs.next();
|
|
||||||
assert_ne!(res, None);
|
|
||||||
if let Some(diff) = res {
|
|
||||||
assert_eq!(diff.new, None);
|
|
||||||
assert_eq!(diff.full_update, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_across_chunks() {
|
|
||||||
let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
|
|
||||||
let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r");
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
tx.send(chunk1).unwrap();
|
|
||||||
tx.send(chunk2).unwrap();
|
|
||||||
let mut diffs = ResourceStream {
|
|
||||||
rx: rx,
|
|
||||||
partial: None,
|
|
||||||
buf: vec![],
|
|
||||||
};
|
|
||||||
let res = diffs.next();
|
|
||||||
assert_ne!(res, None);
|
|
||||||
if let Some(diff) = res {
|
|
||||||
assert_eq!(diff.new, None);
|
|
||||||
assert_eq!(diff.full_update, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_multi_diff_partial_chunks() {
|
|
||||||
let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
|
|
||||||
let chunk2 =
|
|
||||||
Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed");
|
|
||||||
let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}");
|
|
||||||
let chunk4 = Bytes::from_static(b"\r");
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
tx.send(chunk1).unwrap();
|
|
||||||
tx.send(chunk2).unwrap();
|
|
||||||
tx.send(chunk3).unwrap();
|
|
||||||
tx.send(chunk4).unwrap();
|
|
||||||
let mut diffs = ResourceStream {
|
|
||||||
rx: rx,
|
|
||||||
partial: None,
|
|
||||||
buf: vec![],
|
|
||||||
};
|
|
||||||
let mut res = diffs.next();
|
|
||||||
assert_ne!(res, None);
|
|
||||||
if let Some(diff) = res {
|
|
||||||
assert_eq!(diff.new, None);
|
|
||||||
assert_eq!(diff.full_update, true);
|
|
||||||
}
|
|
||||||
res = diffs.next();
|
|
||||||
assert_ne!(res, None);
|
|
||||||
if let Some(diff) = res {
|
|
||||||
assert_eq!(diff.new, None);
|
|
||||||
assert_eq!(diff.full_update, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start_stream(
|
|
||||||
api_endpoint: String,
|
|
||||||
name: String,
|
|
||||||
token: String,
|
|
||||||
resource_types: Vec<String>,
|
|
||||||
) -> Result<ResourceStream, Error> {
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
|
|
||||||
let req = crate::ResourceRequest {
|
|
||||||
request_origin: name,
|
|
||||||
resource_types: resource_types,
|
|
||||||
};
|
|
||||||
let json = serde_json::to_string(&req)?;
|
|
||||||
|
|
||||||
let auth_value = format!("Bearer {}", token);
|
|
||||||
|
|
||||||
let client = Client::new();
|
|
||||||
|
|
||||||
let mut stream = client
|
|
||||||
.get(api_endpoint)
|
|
||||||
.header("Authorization", &auth_value)
|
|
||||||
.body(json)
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.bytes_stream();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
while let Some(chunk) = stream.next().await {
|
|
||||||
let bytes = match chunk {
|
|
||||||
Ok(b) => b,
|
|
||||||
Err(_e) => {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
tx.send(bytes).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(ResourceStream {
|
|
||||||
rx: rx,
|
|
||||||
buf: vec![],
|
|
||||||
partial: None,
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,36 +1,87 @@
|
||||||
use std::collections::HashMap;
|
use bytes::{self, Buf, Bytes};
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use reqwest::Client;
|
||||||
|
use std::io::{self, BufRead};
|
||||||
|
use std::sync::mpsc;
|
||||||
|
use tokio;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
pub mod proto;
|
||||||
|
|
||||||
pub mod http;
|
#[derive(Debug)]
|
||||||
|
pub enum Error {
|
||||||
#[derive(Serialize)]
|
Reqwest(reqwest::Error),
|
||||||
pub struct ResourceRequest {
|
Io(io::Error),
|
||||||
request_origin: String,
|
JSON(serde_json::Error),
|
||||||
resource_types: Vec<String>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, PartialEq, Debug)]
|
impl From<serde_json::Error> for Error {
|
||||||
pub struct Resource {
|
fn from(value: serde_json::Error) -> Self {
|
||||||
r#type: String,
|
Self::JSON(value)
|
||||||
blocked_in: HashMap<String, bool>,
|
}
|
||||||
protocol: String,
|
|
||||||
address: String,
|
|
||||||
port: u16,
|
|
||||||
fingerprint: String,
|
|
||||||
#[serde(rename = "or-addresses")]
|
|
||||||
or_addresses: Option<Vec<String>>,
|
|
||||||
distribution: String,
|
|
||||||
flags: Option<HashMap<String, bool>>,
|
|
||||||
params: Option<HashMap<String, String>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, PartialEq, Debug)]
|
impl From<reqwest::Error> for Error {
|
||||||
pub struct ResourceDiff {
|
fn from(value: reqwest::Error) -> Self {
|
||||||
new: Option<HashMap<String, Vec<Resource>>>,
|
Self::Reqwest(value)
|
||||||
changed: Option<HashMap<String, Vec<Resource>>>,
|
}
|
||||||
gone: Option<HashMap<String, Vec<Resource>>>,
|
}
|
||||||
full_update: bool,
|
|
||||||
|
impl From<io::Error> for Error {
|
||||||
|
fn from(value: io::Error) -> Self {
|
||||||
|
Self::Io(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ResourceStream {
|
||||||
|
rx: mpsc::Receiver<Bytes>,
|
||||||
|
buf: Vec<u8>,
|
||||||
|
partial: Option<bytes::buf::Reader<Bytes>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for ResourceStream {
|
||||||
|
type Item = proto::ResourceDiff;
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
let mut parse =
|
||||||
|
|buffer: &mut bytes::buf::Reader<Bytes>| -> Result<Option<Self::Item>, Error> {
|
||||||
|
match buffer.read_until(b'\r', &mut self.buf) {
|
||||||
|
Ok(_) => match self.buf.pop() {
|
||||||
|
Some(b'\r') => match serde_json::from_slice(&self.buf) {
|
||||||
|
Ok(diff) => {
|
||||||
|
self.buf.clear();
|
||||||
|
return Ok(Some(diff));
|
||||||
|
}
|
||||||
|
Err(e) => return Err(Error::JSON(e)),
|
||||||
|
},
|
||||||
|
Some(n) => {
|
||||||
|
self.buf.push(n);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
None => return Ok(None),
|
||||||
|
},
|
||||||
|
Err(e) => Err(Error::Io(e)),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(p) = &mut self.partial {
|
||||||
|
match parse(p) {
|
||||||
|
Ok(Some(diff)) => return Some(diff),
|
||||||
|
Ok(None) => self.partial = None,
|
||||||
|
Err(_) => return None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for chunk in &self.rx {
|
||||||
|
let mut buffer = chunk.reader();
|
||||||
|
match parse(&mut buffer) {
|
||||||
|
Ok(Some(diff)) => {
|
||||||
|
self.partial = Some(buffer);
|
||||||
|
return Some(diff);
|
||||||
|
}
|
||||||
|
Ok(None) => continue,
|
||||||
|
Err(_) => return None,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -38,168 +89,117 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn serialize_resource_request() {
|
fn parse_resource() {
|
||||||
let req = ResourceRequest {
|
let chunk = Bytes::from_static(
|
||||||
request_origin: String::from("https"),
|
b"{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}\r",
|
||||||
resource_types: vec![String::from("obfs2"), String::from("scramblesuit")],
|
|
||||||
};
|
|
||||||
let json = serde_json::to_string(&req).unwrap();
|
|
||||||
assert_eq!(
|
|
||||||
json,
|
|
||||||
"{\"request_origin\":\"https\",\"resource_types\":[\"obfs2\",\"scramblesuit\"]}"
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn deserialize_resource() {
|
|
||||||
let mut flags = HashMap::new();
|
|
||||||
flags.insert(String::from("fast"), true);
|
|
||||||
flags.insert(String::from("stable"), true);
|
|
||||||
let mut params = HashMap::new();
|
|
||||||
params.insert(
|
|
||||||
String::from("password"),
|
|
||||||
String::from("ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"),
|
|
||||||
);
|
);
|
||||||
let bridge = Resource {
|
let (tx, rx) = mpsc::channel();
|
||||||
r#type: String::from("scramblesuit"),
|
tx.send(chunk).unwrap();
|
||||||
blocked_in: HashMap::new(),
|
let mut diffs = ResourceStream {
|
||||||
protocol: String::from("tcp"),
|
rx: rx,
|
||||||
address: String::from("216.117.3.62"),
|
partial: None,
|
||||||
port: 63174,
|
buf: vec![],
|
||||||
fingerprint: String::from("BE84A97D02130470A1C77839954392BA979F7EE1"),
|
|
||||||
or_addresses: None,
|
|
||||||
distribution: String::from("https"),
|
|
||||||
flags: Some(flags),
|
|
||||||
params: Some(params),
|
|
||||||
};
|
};
|
||||||
|
let res = diffs.next();
|
||||||
let data = r#"
|
assert_ne!(res, None);
|
||||||
{
|
if let Some(diff) = res {
|
||||||
"type": "scramblesuit",
|
assert_eq!(diff.new, None);
|
||||||
"blocked_in": {},
|
|
||||||
"protocol": "tcp",
|
|
||||||
"address": "216.117.3.62",
|
|
||||||
"port": 63174,
|
|
||||||
"fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1",
|
|
||||||
"or-addresses": null,
|
|
||||||
"distribution": "https",
|
|
||||||
"flags": {
|
|
||||||
"fast": true,
|
|
||||||
"stable": true
|
|
||||||
},
|
|
||||||
"params": {
|
|
||||||
"password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
|
|
||||||
}
|
|
||||||
}"#;
|
|
||||||
let res: Resource = serde_json::from_str(data).unwrap();
|
|
||||||
assert_eq!(bridge, res);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn deserialize_resource_diff() {
|
|
||||||
let data = r#"
|
|
||||||
{
|
|
||||||
"new": {
|
|
||||||
"obfs2": [
|
|
||||||
{
|
|
||||||
"type": "obfs2",
|
|
||||||
"blocked_in": {},
|
|
||||||
"Location": null,
|
|
||||||
"protocol": "tcp",
|
|
||||||
"address": "176.247.216.207",
|
|
||||||
"port": 42810,
|
|
||||||
"fingerprint": "10282810115283F99ADE5CFE42D49644F45D715D",
|
|
||||||
"or-addresses": null,
|
|
||||||
"distribution": "https",
|
|
||||||
"flags": {
|
|
||||||
"fast": true,
|
|
||||||
"stable": true,
|
|
||||||
"running": true,
|
|
||||||
"valid": true
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "obfs2",
|
|
||||||
"blocked_in": {},
|
|
||||||
"protocol": "tcp",
|
|
||||||
"address": "133.69.16.145",
|
|
||||||
"port": 58314,
|
|
||||||
"fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1",
|
|
||||||
"or-addresses": null,
|
|
||||||
"distribution": "https",
|
|
||||||
"flags": {
|
|
||||||
"fast": true,
|
|
||||||
"stable": true,
|
|
||||||
"running": true,
|
|
||||||
"valid": true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"scramblesuit": [
|
|
||||||
{
|
|
||||||
"type": "scramblesuit",
|
|
||||||
"blocked_in": {},
|
|
||||||
"protocol": "tcp",
|
|
||||||
"address": "216.117.3.62",
|
|
||||||
"port": 63174,
|
|
||||||
"fingerprint": "BE84A97D02130470A1C77839954392BA979F7EE1",
|
|
||||||
"or-addresses": null,
|
|
||||||
"distribution": "https",
|
|
||||||
"flags": {
|
|
||||||
"fast": true,
|
|
||||||
"stable": true,
|
|
||||||
"running": true,
|
|
||||||
"valid": true
|
|
||||||
},
|
|
||||||
"params": {
|
|
||||||
"password": "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"changed": null,
|
|
||||||
"gone": null,
|
|
||||||
"full_update": true
|
|
||||||
}"#;
|
|
||||||
let diff: ResourceDiff = serde_json::from_str(data).unwrap();
|
|
||||||
assert_ne!(diff.new, None);
|
|
||||||
assert_eq!(diff.changed, None);
|
|
||||||
assert_eq!(diff.gone, None);
|
|
||||||
assert_eq!(diff.full_update, true);
|
assert_eq!(diff.full_update, true);
|
||||||
if let Some(new) = diff.new {
|
|
||||||
assert_eq!(new["obfs2"][0].r#type, "obfs2");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn deserialize_empty_resource_diff() {
|
fn parse_across_chunks() {
|
||||||
let data = r#"
|
let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
|
||||||
{
|
let chunk2 = Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r");
|
||||||
"new": null,
|
let (tx, rx) = mpsc::channel();
|
||||||
"changed": null,
|
tx.send(chunk1).unwrap();
|
||||||
"gone": null,
|
tx.send(chunk2).unwrap();
|
||||||
"full_update": true
|
let mut diffs = ResourceStream {
|
||||||
}"#;
|
rx: rx,
|
||||||
let diff: ResourceDiff = serde_json::from_str(data).unwrap();
|
partial: None,
|
||||||
let empty_diff = ResourceDiff {
|
buf: vec![],
|
||||||
new: None,
|
|
||||||
changed: None,
|
|
||||||
gone: None,
|
|
||||||
full_update: true,
|
|
||||||
};
|
};
|
||||||
assert_eq!(diff, empty_diff);
|
let res = diffs.next();
|
||||||
|
assert_ne!(res, None);
|
||||||
|
if let Some(diff) = res {
|
||||||
|
assert_eq!(diff.new, None);
|
||||||
|
assert_eq!(diff.full_update, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn deserialize_empty_condensed_diff() {
|
fn parse_multi_diff_partial_chunks() {
|
||||||
let data = "{\"new\": null,\"changed\": null,\"gone\": null,\"full_update\": true}";
|
let chunk1 = Bytes::from_static(b"{\"new\": null,\"changed\": null,");
|
||||||
let diff: ResourceDiff = serde_json::from_str(data).unwrap();
|
let chunk2 =
|
||||||
let empty_diff = ResourceDiff {
|
Bytes::from_static(b"\"gone\": null,\"full_update\": true}\r{\"new\": null,\"changed");
|
||||||
new: None,
|
let chunk3 = Bytes::from_static(b"\": null,\"gone\": null,\"full_update\": true}");
|
||||||
changed: None,
|
let chunk4 = Bytes::from_static(b"\r");
|
||||||
gone: None,
|
let (tx, rx) = mpsc::channel();
|
||||||
full_update: true,
|
tx.send(chunk1).unwrap();
|
||||||
|
tx.send(chunk2).unwrap();
|
||||||
|
tx.send(chunk3).unwrap();
|
||||||
|
tx.send(chunk4).unwrap();
|
||||||
|
let mut diffs = ResourceStream {
|
||||||
|
rx: rx,
|
||||||
|
partial: None,
|
||||||
|
buf: vec![],
|
||||||
};
|
};
|
||||||
assert_eq!(diff, empty_diff);
|
let mut res = diffs.next();
|
||||||
|
assert_ne!(res, None);
|
||||||
|
if let Some(diff) = res {
|
||||||
|
assert_eq!(diff.new, None);
|
||||||
|
assert_eq!(diff.full_update, true);
|
||||||
|
}
|
||||||
|
res = diffs.next();
|
||||||
|
assert_ne!(res, None);
|
||||||
|
if let Some(diff) = res {
|
||||||
|
assert_eq!(diff.new, None);
|
||||||
|
assert_eq!(diff.full_update, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_stream(
|
||||||
|
api_endpoint: String,
|
||||||
|
name: String,
|
||||||
|
token: String,
|
||||||
|
resource_types: Vec<String>,
|
||||||
|
) -> Result<ResourceStream, Error> {
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let req = proto::ResourceRequest {
|
||||||
|
request_origin: name,
|
||||||
|
resource_types: resource_types,
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&req)?;
|
||||||
|
|
||||||
|
let auth_value = format!("Bearer {}", token);
|
||||||
|
|
||||||
|
let client = Client::new();
|
||||||
|
|
||||||
|
let mut stream = client
|
||||||
|
.get(api_endpoint)
|
||||||
|
.header("Authorization", &auth_value)
|
||||||
|
.body(json)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.bytes_stream();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
let bytes = match chunk {
|
||||||
|
Ok(b) => b,
|
||||||
|
Err(_e) => {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tx.send(bytes).unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(ResourceStream {
|
||||||
|
rx: rx,
|
||||||
|
buf: vec![],
|
||||||
|
partial: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue