use hyper::{
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};
use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};
use std::{future::Future, io, net::SocketAddr, pin::Pin, sync::Arc};
use tokio::signal::unix::{signal, SignalKind};

/// The set of counters exported by this service.
///
/// Every field is an independent [`Counter`]. Call [`Metrics::register`] to
/// obtain a [`Registry`] that exposes all of them.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Number of existing or updated bridges recorded at rdsys sync.
    pub existing_or_updated_bridges: Counter,
    /// Number of new bridges added to the bridge table.
    pub new_bridges: Counter,
    /// Number of bridges removed from the bridge table.
    pub removed_bridges: Counter,
    /// Number of bridges blocked.
    pub blocked_bridges: Counter,
    /// Number of open invitations distributed.
    pub open_inv_count: Counter,
    /// Number of trust promotion requests.
    pub trust_promo_count: Counter,
    /// Number of trust migration requests.
    pub trust_mig_count: Counter,
    /// Number of level up requests.
    pub level_up_count: Counter,
    /// Number of issue invite requests.
    pub issue_invite_count: Counter,
    /// Number of redeem invite requests.
    pub redeem_invite_count: Counter,
    /// Number of check blockage requests.
    pub check_blockage_count: Counter,
    /// Number of blockage migration requests.
    pub blockage_migration_count: Counter,
    /// Number of times k has reset to 0.
    pub k_reset_count: Counter,
    /// Number of invites requested.
    pub invites_requested: Counter,
}

impl Metrics {
    /// Builds a [`Registry`] and registers every counter of `self` in it.
    ///
    /// A clone of each counter is handed to the registry (in
    /// `prometheus_client`, a cloned `Counter` is documented to share its
    /// underlying value with the original), so `self` can keep being used to
    /// record events after this call.
    pub fn register(&self) -> Registry {
        // NOTE(review): classic Prometheus metric names match
        // `[a-zA-Z_:][a-zA-Z0-9_:]*`, so the hyphen in this prefix may be
        // rejected or rewritten by some scrapers. It is left untouched here
        // because existing dashboards may depend on the emitted names.
        let mut r = Registry::with_prefix("lox-metrics");

        r.register(
            "existing_or_updated_bridges",
            "number of existing or updated bridges recorded at rdsys sync",
            self.existing_or_updated_bridges.clone(),
        );
        r.register(
            "new_bridges",
            "number of new bridges added to bridge table",
            self.new_bridges.clone(),
        );
        r.register(
            "removed_bridges",
            "number of bridges removed from the bridgetable",
            self.removed_bridges.clone(),
        );
        r.register(
            "blocked_bridges",
            "number of bridges blocked",
            self.blocked_bridges.clone(),
        );
        r.register(
            "open_inv_counter",
            "number of open invitations distributed",
            self.open_inv_count.clone(),
        );
        r.register(
            "trust_promo_counter",
            "number of trust promotions requests",
            self.trust_promo_count.clone(),
        );
        r.register(
            "trust_mig_counter",
            "number of trust migrations requests",
            self.trust_mig_count.clone(),
        );
        r.register(
            "level_up_counter",
            "number of level up requests",
            self.level_up_count.clone(),
        );
        r.register(
            "issue_invite_counter",
            "number of issue invite requests",
            self.issue_invite_count.clone(),
        );
        // The help text used to be a copy of the level-up description.
        r.register(
            "redeem_invite_counter",
            "number of redeem invite requests",
            self.redeem_invite_count.clone(),
        );
        r.register(
            "check_blockage_counter",
            "number of check blockage requests",
            self.check_blockage_count.clone(),
        );
        r.register(
            "blockage_migration_counter",
            "number of blockage migration requests",
            self.blockage_migration_count.clone(),
        );
        r.register(
            "k_reset_counter",
            "number of times k has reset to 0",
            self.k_reset_count.clone(),
        );
        r.register(
            "invites_requested",
            "number of invites requested",
            self.invites_requested.clone(),
        );
        r
    }
}

/// Start a HTTP server to report metrics.
pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) { let mut shutdown_stream = signal(SignalKind::terminate()).unwrap(); eprintln!("Starting metrics server on {metrics_addr}"); let registry = Arc::new(registry); Server::bind(&metrics_addr) .serve(make_service_fn(move |_conn| { let registry = registry.clone(); async move { let handler = make_handler(registry); Ok::<_, io::Error>(service_fn(handler)) } })) .with_graceful_shutdown(async move { shutdown_stream.recv().await; }) .await .unwrap(); } /// This function returns an HTTP handler (i.e. another function) pub fn make_handler( registry: Arc, ) -> impl Fn(Request) -> Pin>> + Send>> { // This closure accepts a request and responds with the OpenMetrics encoding of our metrics. move |_req: Request| { let reg = registry.clone(); Box::pin(async move { let mut buf = String::new(); encode(&mut buf, ®.clone()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) .map(|_| { let body = Body::from(buf); Response::builder() .header( hyper::header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8", ) .body(body) .unwrap() }) }) } }