From 2597cb53f7972663fecd38cc2eec54e348a8cdb7 Mon Sep 17 00:00:00 2001 From: onyinyang Date: Wed, 25 Oct 2023 13:58:24 -0400 Subject: [PATCH] Add prometheus http server --- Cargo.lock | 511 +++++++++++++++++++++- crates/lox-distributor/Cargo.toml | 3 + crates/lox-distributor/config.json | 1 + crates/lox-distributor/src/db_handler.rs | 11 +- crates/lox-distributor/src/lox_context.rs | 2 +- crates/lox-distributor/src/main.rs | 43 +- crates/lox-distributor/src/metrics.rs | 192 +++++--- 7 files changed, 661 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e7eb2e..5b8ef71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,198 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "actix-codec" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "617a8268e3537fe1d8c9ead925fca49ef6400927ee7bc26750e90ecee14ce4b8" +dependencies = [ + "bitflags 1.3.2", + "bytes", + "futures-core", + "futures-sink", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-http" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92ef85799cba03f76e4f7c10f533e66d87c9a7e7055f3391f09000ad8351bc9" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-service", + "actix-utils", + "ahash", + "base64", + "bitflags 2.3.3", + "brotli", + "bytes", + "bytestring", + "derive_more", + "encoding_rs", + "flate2", + "futures-core", + "h2", + "http", + "httparse", + "httpdate", + "itoa", + "language-tags", + "local-channel", + "mime", + "percent-encoding", + "pin-project-lite", + "rand 0.8.5", + "sha1", + "smallvec", + "tokio", + "tokio-util", + "tracing", + "zstd", +] + +[[package]] +name = "actix-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" +dependencies = [ + "quote", + "syn 2.0.32", +] + +[[package]] +name = "actix-router" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66ff4d247d2b160861fa2866457e85706833527840e4133f8f49aa423a38799" +dependencies = [ + "bytestring", + "http", + "regex", + "serde", + "tracing", +] + +[[package]] +name = "actix-rt" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" +dependencies = [ + "futures-core", + "tokio", +] + +[[package]] +name = "actix-server" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "futures-util", + "mio", + "socket2 0.5.5", + "tokio", + "tracing", +] + +[[package]] +name = "actix-service" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" +dependencies = [ + "futures-core", + "paste", + "pin-project-lite", +] + +[[package]] +name = "actix-utils" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" +dependencies = [ + "local-waker", + "pin-project-lite", +] + +[[package]] +name = "actix-web" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4a5b5e29603ca8c94a77c65cf874718ceb60292c5a5c3e5f4ace041af462b9" +dependencies = [ + "actix-codec", + "actix-http", + "actix-macros", + "actix-router", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "actix-web-codegen", + "ahash", + "bytes", + "bytestring", + "cfg-if", + "cookie", + "derive_more", + "encoding_rs", + "futures-core", + "futures-util", + "itoa", + "language-tags", + "log", + "mime", + "once_cell", + "pin-project-lite", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "socket2 0.5.5", + "time", + "url", +] + +[[package]] +name = "actix-web-codegen" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb1f50ebbb30eca122b188319a4398b3f7bb4a8cdf50ecfb73bfc6a3c3ce54f5" +dependencies = [ + "actix-router", + "proc-macro2", + "quote", + "syn 2.0.32", +] + +[[package]] +name = "actix-web-prom" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23f332a652836b8f3a6876103c70c9ed436d0e69fa779ab5d7f57b1d5c8d488" +dependencies = [ + "actix-web", + "futures-core", + "pin-project-lite", + "prometheus", + "regex", +] + [[package]] name = "addr2line" version = "0.20.0" @@ -52,6 +244,43 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" +dependencies = [ + "cfg-if", + "getrandom 0.2.10", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "aho-corasick" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +dependencies = [ + "memchr", +] + +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -187,6 +416,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da74e2b81409b1b743f8f0c62cc6254afefb8b8e50bbfe3735550f7aeefa3448" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -205,11 +455,23 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "bytestring" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "238e4886760d98c4f899360c834fa93e62cf7f721ac3c2da375cbdf4b8679aae" +dependencies = [ + "bytes", +] + [[package]] name = "cc" version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -273,7 +535,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -313,6 +575,23 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "cookie" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -422,7 +701,7 @@ checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -446,7 +725,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.32", ] [[package]] @@ -457,7 +736,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -480,6 +759,19 @@ dependencies = [ "serde", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -490,6 +782,12 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "ed25519" version = "2.2.2" @@ -563,6 +861,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0870c84016d4b481be5c9f323c24f65e31e901ae618f0e80f4308fb00de1d2d" +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -665,7 +973,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -865,7 +1173,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -976,6 +1284,15 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "jobserver" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.64" @@ -1003,6 +1320,12 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.4.0" @@ -1011,9 +1334,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "linux-raw-sys" @@ -1021,6 +1344,23 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +[[package]] +name = "local-channel" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a493488de5f18c8ffcba89eebb8532ffc562dc400490eb65b84893fae0b178" +dependencies = [ + "futures-core", + "futures-sink", + "local-waker", +] + +[[package]] +name = "local-waker" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1" + [[package]] name = "lock_api" version = "0.4.10" @@ -1041,6 +1381,8 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" name = "lox-distributor" version = "0.1.0" dependencies = [ + "actix-web", + "actix-web-prom", "base64", "chrono", "clap", @@ -1051,6 +1393,7 @@ dependencies = [ "lox-library", "lox_utils", "prometheus", + "prometheus-client", "rand 0.8.5", "rdsys_backend", "reqwest", @@ -1165,6 +1508,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -1318,7 +1662,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -1387,6 +1731,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "percent-encoding" version = "2.3.0" @@ -1475,6 +1825,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prometheus-client" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "510c4f1c9d81d556458f94c98f857748130ea9737bbd6053da497503b26ea63c" +dependencies = [ + "dtoa", + "itoa", + "parking_lot 0.12.1", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -1713,6 +2086,35 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "regex" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" + [[package]] name = "reqwest" version = "0.11.18" @@ -1847,7 +2249,7 @@ checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -1899,7 +2301,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -1980,6 +2382,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "spki" version = "0.7.2" @@ -2014,9 +2426,20 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" -version = "2.0.29" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" dependencies = [ "proc-macro2", "quote", @@ -2053,7 +2476,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -2115,7 +2538,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.9", "tokio-macros", "windows-sys", ] @@ -2128,7 +2551,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -2179,6 +2602,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-core", ] @@ -2306,7 +2730,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.32", "wasm-bindgen-shared", ] @@ -2340,7 +2764,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2480,6 +2904,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "zerocopy" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8db0ac2df3d060f81ec0380ccc5b71c2a7c092cfced671feeee1320e95559c87" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b6093bc6d5265ff40b479c834cdd25d8e20784781a2a29a8106327393d0a9ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "zeroize" version = "1.6.0" @@ -2497,7 +2941,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -2512,3 +2956,32 @@ dependencies = [ "serde_derive", "thiserror", ] + +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.9+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/crates/lox-distributor/Cargo.toml b/crates/lox-distributor/Cargo.toml index ea017e0..d6af14e 100644 --- a/crates/lox-distributor/Cargo.toml +++ b/crates/lox-distributor/Cargo.toml @@ -30,6 +30,9 @@ clap = { version = "4.4.7", features = ["derive"] } serde_json = "1.0.108" prometheus = "0.13.3" sled = "0.34.7" +actix-web = "4.4.0" +actix-web-prom = "0.7.0" +prometheus-client = "0.22.0" [dependencies.chrono] version = "0.4.31" diff --git a/crates/lox-distributor/config.json b/crates/lox-distributor/config.json index 06f6fdc..438cab1 100644 --- a/crates/lox-distributor/config.json +++ b/crates/lox-distributor/config.json @@ -3,6 +3,7 @@ "db_path": "lox_db" }, + "metrics_port": 5222, "bridge_config": { "percent_spares": 50 }, diff --git a/crates/lox-distributor/src/db_handler.rs b/crates/lox-distributor/src/db_handler.rs index f14ded6..acf0cfe 100644 --- a/crates/lox-distributor/src/db_handler.rs +++ b/crates/lox-distributor/src/db_handler.rs @@ -31,6 +31,7 @@ impl DB { pub fn open_new_or_existing_db( db_config: DbConfig, roll_back_date: Option, + metrics: Metrics, ) -> Result<(DB, lox_context::LoxServerContext), sled::Error> { let mut context: lox_context::LoxServerContext; let (lox_db, context) = match sled::open(db_config.db_path) { @@ -38,7 +39,7 @@ impl DB { // Check if the lox_db already exists if lox_db.was_recovered() { context = read_lox_context_from_db(lox_db.clone(), roll_back_date); - context.metrics = Metrics::default(); + context.metrics = metrics; //Otherwise, create a new Lox context } else { let new_db = BridgeDb::new(); @@ -48,7 +49,7 @@ impl DB { ba: Arc::new(Mutex::new(new_ba)), extra_bridges: Arc::new(Mutex::new(Vec::new())), to_be_replaced_bridges: Arc::new(Mutex::new(Vec::new())), - metrics: Metrics::default(), + metrics, }; } (DB { db: lox_db }, context) @@ -126,12 +127,14 @@ fn use_last_context(lox_db: sled::Db) -> lox_context::LoxServerContext { #[cfg(test)] mod tests { use super::lox_context::LoxServerContext; - use super::DB; use super::DbConfig; + use super::Metrics; + use super::DB; #[test] fn test_write_context() { - let (mut lox_db, _context) = DB::open_new_or_existing_db(DbConfig::default(), None).unwrap(); + let (mut lox_db, _context) = + DB::open_new_or_existing_db(DbConfig::default(), None, Metrics::default()).unwrap(); assert!( lox_db.db.is_empty(), "db read from context that shouldn't exist" diff --git a/crates/lox-distributor/src/lox_context.rs b/crates/lox-distributor/src/lox_context.rs index da77437..317692a 100644 --- a/crates/lox-distributor/src/lox_context.rs +++ b/crates/lox-distributor/src/lox_context.rs @@ -18,8 +18,8 @@ use std::{ }; use zkp::ProofError; -use crate::resource_parser::{parse_into_bridgelines, sort_for_parsing}; use crate::metrics::Metrics; +use crate::resource_parser::{parse_into_bridgelines, sort_for_parsing}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LoxServerContext { diff --git a/crates/lox-distributor/src/main.rs b/crates/lox-distributor/src/main.rs index ce77d90..8aba55a 100644 --- a/crates/lox-distributor/src/main.rs +++ b/crates/lox-distributor/src/main.rs @@ -6,17 +6,24 @@ use hyper::{ Body, Request, Response, Server, }; +use prometheus_client::registry::Registry; use rdsys_backend::{proto::ResourceState, request_resources}; use serde::Deserialize; use std::{ - convert::Infallible, fs::File, io::BufReader, net::SocketAddr, path::PathBuf, time::Duration, + convert::Infallible, + fs::File, + io::BufReader, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + time::Duration, }; mod db_handler; use db_handler::DB; mod lox_context; mod metrics; +use metrics::Metrics; mod request_handler; use request_handler::handle; mod resource_parser; @@ -55,6 +62,7 @@ struct Args { #[derive(Debug, Deserialize)] struct Config { db: DbConfig, + metrics_port: u16, bridge_config: BridgeConfig, rtype: ResourceInfo, } @@ -144,15 +152,23 @@ async fn parse_bridges(rdsys_tx: mpsc::Sender, mut rx: mpsc::Receiver){ + tokio::select! { + lox_metrics = metrics::start_metrics_server(metrics_addr, registry) => lox_metrics, + _ = kill.recv() => {println!("Shut down metrics server");}, + } +} + async fn create_context_manager( db_config: DbConfig, bridge_config: BridgeConfig, roll_back_date: Option, + metrics: Metrics, context_rx: mpsc::Receiver, mut kill: broadcast::Receiver<()>, ) { tokio::select! { - create_context = context_manager(db_config, bridge_config, roll_back_date, context_rx) => create_context, + create_context = context_manager(db_config, bridge_config, roll_back_date, metrics, context_rx) => create_context, _ = kill.recv() => {println!("Shut down context_manager");}, } } @@ -164,14 +180,16 @@ async fn context_manager( db_config: DbConfig, bridge_config: BridgeConfig, roll_back_date: Option, + metrics: Metrics, mut context_rx: mpsc::Receiver, ) { - let (mut lox_db, context) = match DB::open_new_or_existing_db(db_config, roll_back_date) { - Ok((lox_db, context)) => (lox_db, context), - Err(e) => { - panic!("Error: {:?}", e); - } - }; + let (mut lox_db, context) = + match DB::open_new_or_existing_db(db_config, roll_back_date, metrics) { + Ok((lox_db, context)) => (lox_db, context), + Err(e) => { + panic!("Error: {:?}", e); + } + }; while let Some(cmd) = context_rx.recv().await { use Command::*; @@ -252,6 +270,7 @@ async fn main() { // create the shutdown broadcast channel and clone for every thread let (shutdown_tx, mut shutdown_rx) = broadcast::channel(16); let kill_stream = shutdown_tx.subscribe(); + let kill_metrics = shutdown_tx.subscribe(); let kill_parser = shutdown_tx.subscribe(); let kill_context = shutdown_tx.subscribe(); @@ -270,11 +289,18 @@ async fn main() { } }); + let metrics = Metrics::default(); + let registry = metrics.register(); + let metrics_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), config.metrics_port); + let metrics_handler = + spawn(async move { start_metrics_collector(metrics_addr, registry, kill_metrics).await }); + let context_manager = spawn(async move { create_context_manager( config.db, config.bridge_config, args.roll_back_date, + metrics, context_rx, kill_context, ) @@ -312,6 +338,7 @@ async fn main() { eprintln!("server error: {}", e); } future::join_all([ + metrics_handler, rdsys_request_handler, rdsys_resource_receiver, context_manager, diff --git a/crates/lox-distributor/src/metrics.rs b/crates/lox-distributor/src/metrics.rs index 5fbe171..9a12684 100644 --- a/crates/lox-distributor/src/metrics.rs +++ b/crates/lox-distributor/src/metrics.rs @@ -1,5 +1,10 @@ -use futures::executor::block_on; -use prometheus::{Counter, Encoder, Opts, Registry, TextEncoder}; +use hyper::{ + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, +}; +use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry}; +use std::{future::Future, io, net::SocketAddr, pin::Pin, sync::Arc}; +use tokio::signal::unix::{signal, SignalKind}; #[derive(Debug, Clone)] pub struct Metrics { @@ -18,75 +23,17 @@ pub struct Metrics { impl Default for Metrics { fn default() -> Self { // Create counters. - let open_inv_count = Counter::with_opts(Opts::new( - "open_inv_counter", - "number of open invitations distributed", - )) - .unwrap(); - let trust_promo_count = Counter::with_opts(Opts::new( - "trust_promo_counter", - "number of trust promotions requests", - )) - .unwrap(); - let trust_mig_count = Counter::with_opts(Opts::new( - "trust_mig_counter", - "number of trust migrations requests", - )) - .unwrap(); - let level_up_count = - Counter::with_opts(Opts::new("level_up_counter", "number of level up requests")) - .unwrap(); - let issue_invite_count = Counter::with_opts(Opts::new( - "issue_invite_counter", - "number of issue invite requests", - )) - .unwrap(); - let redeem_invite_count = - Counter::with_opts(Opts::new("redeem_invite_counter", "number of level up requests")) - .unwrap(); - let check_blockage_count = Counter::with_opts(Opts::new( - "check_blockage_counter", - "number of check blockage requests", - )) - .unwrap(); - let blockage_migration_count = Counter::with_opts(Opts::new( - "blockage_migration_counter", - "number of blockage migration requests", - )) - .unwrap(); - let k_reset_count = Counter::with_opts(Opts::new( - "k_reset_counter", - "number of times k has reset to 0", - )) - .unwrap(); - let buckets_requested_today = Counter::with_opts(Opts::new( - "buckets_requested_today", - "number of buckets used today", - )) - .unwrap(); + let open_inv_count = Counter::default(); + let trust_promo_count = Counter::default(); + let trust_mig_count = Counter::default(); + let level_up_count = Counter::default(); + let issue_invite_count = Counter::default(); + let redeem_invite_count = Counter::default(); + let check_blockage_count = Counter::default(); + let blockage_migration_count = Counter::default(); + let k_reset_count = Counter::default(); + let buckets_requested_today = Counter::default(); - // Create a Registry and register Counter. - let r = Registry::new(); - r.register(Box::new(open_inv_count.clone())).unwrap(); - r.register(Box::new(trust_promo_count.clone())).unwrap(); - r.register(Box::new(trust_mig_count.clone())).unwrap(); - r.register(Box::new(level_up_count.clone())).unwrap(); - r.register(Box::new(issue_invite_count.clone())).unwrap(); - r.register(Box::new(redeem_invite_count.clone())).unwrap(); - r.register(Box::new(check_blockage_count.clone())).unwrap(); - r.register(Box::new(blockage_migration_count.clone())).unwrap(); - r.register(Box::new(k_reset_count.clone())).unwrap(); - r.register(Box::new(buckets_requested_today.clone())).unwrap(); - - // Gather the metrics. - /* let mut buffer = vec![]; - let encoder = TextEncoder::new(); - let metric_families = r.gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); -*/ - - // Output to the standard output. - // println!("{}", String::from_utf8(buffer).unwrap()); Metrics { open_inv_count, trust_promo_count, @@ -101,3 +48,108 @@ impl Default for Metrics { } } } + +impl Metrics { + pub fn register(&self) -> Registry { + // Create a Registry and register Counter. + let mut r = ::with_prefix("lox-metrics"); + r.register( + "open_inv_counter", + "number of open invitations distributed", + self.open_inv_count.clone(), + ); + r.register( + "trust_promo_counter", + "number of trust promotions requests", + self.trust_promo_count.clone(), + ); + r.register( + "trust_mig_counter", + "number of trust migrations requests", + self.trust_mig_count.clone(), + ); + r.register( + "level_up_counter", + "number of level up requests", + self.level_up_count.clone(), + ); + r.register( + "issue_invite_counter", + "number of issue invite requests", + self.issue_invite_count.clone(), + ); + r.register( + "redeem_invite_counter", + "number of level up requests", + self.redeem_invite_count.clone(), + ); + r.register( + "check_blockage_counter", + "number of check blockage requests", + self.check_blockage_count.clone(), + ); + r.register( + "blockage_migration_counter", + "number of blockage migration requests", + self.blockage_migration_count.clone(), + ); + r.register( + "k_reset_counter", + "number of times k has reset to 0", + self.k_reset_count.clone(), + ); + r.register( + "buckets_requested_today", + "number of buckets used today", + self.buckets_requested_today.clone(), + ); + r + } +} + +/// Start a HTTP server to report metrics. +pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) { + let mut shutdown_stream = signal(SignalKind::terminate()).unwrap(); + + eprintln!("Starting metrics server on {metrics_addr}"); + + let registry = Arc::new(registry); + Server::bind(&metrics_addr) + .serve(make_service_fn(move |_conn| { + let registry = registry.clone(); + async move { + let handler = make_handler(registry); + Ok::<_, io::Error>(service_fn(handler)) + } + })) + .with_graceful_shutdown(async move { + shutdown_stream.recv().await; + }) + .await + .unwrap(); +} + +/// This function returns a HTTP handler (i.e. another function) +pub fn make_handler( + registry: Arc, +) -> impl Fn(Request) -> Pin>> + Send>> { + // This closure accepts a request and responds with the OpenMetrics encoding of our metrics. + move |_req: Request| { + let reg = registry.clone(); + Box::pin(async move { + let mut buf = String::new(); + encode(&mut buf, ®.clone()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .map(|_| { + let body = Body::from(buf); + Response::builder() + .header( + hyper::header::CONTENT_TYPE, + "application/openmetrics-text; version=1.0.0; charset=utf-8", + ) + .body(body) + .unwrap() + }) + }) + } +}