lox/crates/lox-distributor/src/metrics.rs

156 lines
5.1 KiB
Rust
Raw Normal View History

2023-10-25 13:58:24 -04:00
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};
use std::{future::Future, io, net::SocketAddr, pin::Pin, sync::Arc};
use tokio::signal::unix::{signal, SignalKind};
2023-10-30 12:54:59 -04:00
/// The set of counters this module tracks.
///
/// `Metrics::register` registers a clone of every field in a `Registry`
/// under the metric name noted on each field below.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// Registered as `open_inv_counter`: number of open invitations distributed.
    pub open_inv_count: Counter,
    /// Registered as `trust_promo_counter`: number of trust promotion requests.
    pub trust_promo_count: Counter,
    /// Registered as `trust_mig_counter`: number of trust migration requests.
    pub trust_mig_count: Counter,
    /// Registered as `level_up_counter`: number of level up requests.
    pub level_up_count: Counter,
    /// Registered as `issue_invite_counter`: number of issue invite requests.
    pub issue_invite_count: Counter,
    /// Registered as `redeem_invite_counter`; presumably counts redeem invite
    /// requests (confirm against the code that increments it).
    pub redeem_invite_count: Counter,
    /// Registered as `check_blockage_counter`: number of check blockage requests.
    pub check_blockage_count: Counter,
    /// Registered as `blockage_migration_counter`: number of blockage migration requests.
    pub blockage_migration_count: Counter,
    /// Registered as `k_reset_counter`: number of times k has reset to 0.
    pub k_reset_count: Counter,
    /// Registered as `buckets_requested_today`: number of buckets used today.
    pub buckets_requested_today: Counter,
}
impl Default for Metrics {
fn default() -> Self {
// Create counters.
2023-10-25 13:58:24 -04:00
let open_inv_count = Counter::default();
let trust_promo_count = Counter::default();
let trust_mig_count = Counter::default();
let level_up_count = Counter::default();
let issue_invite_count = Counter::default();
let redeem_invite_count = Counter::default();
let check_blockage_count = Counter::default();
let blockage_migration_count = Counter::default();
let k_reset_count = Counter::default();
let buckets_requested_today = Counter::default();
Metrics {
open_inv_count,
trust_promo_count,
trust_mig_count,
level_up_count,
issue_invite_count,
redeem_invite_count,
check_blockage_count,
blockage_migration_count,
k_reset_count,
buckets_requested_today,
}
}
}
impl Metrics {
pub fn register(&self) -> Registry {
// Create a Registry and register Counter.
let mut r = <Registry>::with_prefix("lox-metrics");
r.register(
2023-10-30 12:54:59 -04:00
"open_inv_counter",
"number of open invitations distributed",
2023-10-25 13:58:24 -04:00
self.open_inv_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"trust_promo_counter",
"number of trust promotions requests",
2023-10-25 13:58:24 -04:00
self.trust_promo_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"trust_mig_counter",
"number of trust migrations requests",
2023-10-25 13:58:24 -04:00
self.trust_mig_count.clone(),
);
r.register(
"level_up_counter",
"number of level up requests",
self.level_up_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"issue_invite_counter",
"number of issue invite requests",
2023-10-25 13:58:24 -04:00
self.issue_invite_count.clone(),
);
r.register(
"redeem_invite_counter",
"number of level up requests",
self.redeem_invite_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"check_blockage_counter",
"number of check blockage requests",
2023-10-25 13:58:24 -04:00
self.check_blockage_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"blockage_migration_counter",
"number of blockage migration requests",
2023-10-25 13:58:24 -04:00
self.blockage_migration_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"k_reset_counter",
"number of times k has reset to 0",
2023-10-25 13:58:24 -04:00
self.k_reset_count.clone(),
);
r.register(
2023-10-30 12:54:59 -04:00
"buckets_requested_today",
"number of buckets used today",
2023-10-25 13:58:24 -04:00
self.buckets_requested_today.clone(),
);
r
}
}
2023-10-30 12:54:59 -04:00
2023-10-25 13:58:24 -04:00
/// Start a HTTP server to report metrics.
pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) {
let mut shutdown_stream = signal(SignalKind::terminate()).unwrap();
2023-10-30 12:54:59 -04:00
2023-10-25 13:58:24 -04:00
eprintln!("Starting metrics server on {metrics_addr}");
2023-10-30 12:54:59 -04:00
2023-10-25 13:58:24 -04:00
let registry = Arc::new(registry);
Server::bind(&metrics_addr)
.serve(make_service_fn(move |_conn| {
let registry = registry.clone();
async move {
let handler = make_handler(registry);
Ok::<_, io::Error>(service_fn(handler))
}
}))
.with_graceful_shutdown(async move {
shutdown_stream.recv().await;
})
.await
.unwrap();
}
/// This function returns a HTTP handler (i.e. another function)
pub fn make_handler(
registry: Arc<Registry>,
) -> impl Fn(Request<Body>) -> Pin<Box<dyn Future<Output = io::Result<Response<Body>>> + Send>> {
// This closure accepts a request and responds with the OpenMetrics encoding of our metrics.
move |_req: Request<Body>| {
let reg = registry.clone();
Box::pin(async move {
let mut buf = String::new();
encode(&mut buf, &reg.clone())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map(|_| {
let body = Body::from(buf);
Response::builder()
.header(
hyper::header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(body)
.unwrap()
})
})
2023-10-30 12:54:59 -04:00
}
}