Add static request capabilities to rdsys-api-backend

This commit is contained in:
onyinyang 2023-08-23 17:26:03 -04:00
parent ee4f53dbd4
commit 96cf4ab764
No known key found for this signature in database
GPG Key ID: 156A6435430C2036
3 changed files with 55 additions and 5 deletions

View File

@ -9,7 +9,7 @@ use hyper::{
}; };
use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET}; use lox_library::bridge_table::{BridgeLine, MAX_BRIDGES_PER_BUCKET};
use rdsys_backend::{proto::ResourceDiff, start_stream}; use rdsys_backend::{proto::ResourceDiff, proto::Resource, request_resources, start_stream};
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{

View File

@ -14,8 +14,8 @@ bytes = "1"
hex = "0.4.3" hex = "0.4.3"
crc64 = "2.0.0" crc64 = "2.0.0"
sha1 = "0.10.6" sha1 = "0.10.6"
tokio = { version = "1", features = ["macros"]} tokio = { version = "1", features = ["full", "macros"] }
reqwest = { version = "0.11", features = ["stream"]} reqwest = { version = "0.11", features = ["json", "stream"]}
tokio-stream = "0.1.14" tokio-stream = "0.1.14"
futures = "0.3.28" futures = "0.3.28"
tokio-util = "0.7.9" tokio-util = "0.7.9"

View File

@ -6,7 +6,7 @@
use bytes::{self, Buf, Bytes}; use bytes::{self, Buf, Bytes};
use core::pin::Pin; use core::pin::Pin;
use futures_util::{Stream, StreamExt}; use futures_util::{Stream, StreamExt};
use reqwest::Client; use reqwest::{Client, StatusCode};
use std::io::{self, BufRead}; use std::io::{self, BufRead};
use std::task::{ready, Context, Poll}; use std::task::{ready, Context, Poll};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -19,6 +19,7 @@ pub enum Error {
Reqwest(reqwest::Error), Reqwest(reqwest::Error),
Io(io::Error), Io(io::Error),
JSON(serde_json::Error), JSON(serde_json::Error),
String(StatusCode),
} }
impl From<serde_json::Error> for Error { impl From<serde_json::Error> for Error {
@ -39,6 +40,17 @@ impl From<io::Error> for Error {
} }
} }
pub struct StaticResourceRequest {}
impl StaticResourceRequest {
pub fn new(rx: mpsc::Receiver<Bytes>) -> StaticResourceRequest {
StaticResourceRequest {
}
}
}
/// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes /// An iterable wrapper of ResourceDiff items for the streamed chunks of Bytes
/// received from the connection to the rdsys backend /// received from the connection to the rdsys backend
pub struct ResourceStream { pub struct ResourceStream {
@ -220,7 +232,7 @@ mod tests {
/// } /// }
/// } /// }
/// ``` /// ```
///
pub async fn start_stream( pub async fn start_stream(
api_endpoint: String, api_endpoint: String,
name: String, name: String,
@ -260,3 +272,41 @@ pub async fn start_stream(
}); });
Ok(ResourceStream::new(rx)) Ok(ResourceStream::new(rx))
} }
pub async fn request_resources( api_endpoint: String,
name: String,
token: String,
resource_types: Vec<String>,
) -> Result<Vec<proto::Resource>, Error> {
let fetched_resources: Result<Vec<proto::Resource>, Error>;
let req = proto::ResourceRequest {
request_origin: name,
resource_types,
};
let json = serde_json::to_string(&req)?;
let auth_value = format!("Bearer {}", token);
let client = Client::new();
let response = client
.get(api_endpoint)
.header("Authorization", &auth_value)
.body(json)
.send()
.await.unwrap();
println!("Success? {:?}", response);
match response.status() {
reqwest::StatusCode::OK => {
fetched_resources = match response.json::<Vec<proto::Resource>>().await {
Ok(fetched_resources) => Ok(fetched_resources),
Err(e) => Err(Error::Reqwest(e)),
};
}
other => {
fetched_resources = Err(Error::String(other))
}
};
println!("Resources: {:?}", fetched_resources);
fetched_resources
}