-
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmod.rs
More file actions
114 lines (93 loc) · 2.79 KB
/
mod.rs
File metadata and controls
114 lines (93 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use crate::collectors::Collector;
use anyhow::Result;
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use prometheus::Registry;
use sqlx::PgPool;
use std::sync::Arc;
use tracing::{debug, info_span, instrument, warn};
use tracing_futures::Instrument as _;
pub mod replica;
use replica::ReplicaCollector;
pub mod stat_replication;
use stat_replication::StatReplicationCollector;
pub mod slots;
use slots::ReplicationSlotsCollector;
#[derive(Clone, Default)]
pub struct ReplicationCollector {
subs: Vec<Arc<dyn Collector + Send + Sync>>,
}
impl ReplicationCollector {
#[must_use]
pub fn new() -> Self {
Self {
subs: vec![
Arc::new(ReplicaCollector::new()),
Arc::new(StatReplicationCollector::new()),
Arc::new(ReplicationSlotsCollector::new()),
],
}
}
}
impl Collector for ReplicationCollector {
fn name(&self) -> &'static str {
"replication"
}
#[instrument(
skip(self, registry),
level = "info",
err,
fields(collector = "replication")
)]
fn register_metrics(&self, registry: &Registry) -> Result<()> {
for sub in &self.subs {
let span = info_span!("collector.register_metrics", sub_collector = %sub.name());
let res = sub.register_metrics(registry);
match res {
Ok(()) => debug!(collector = sub.name(), "registered metrics"),
Err(ref e) => {
warn!(collector = sub.name(), error = %e, "failed to register metrics");
}
}
res?;
drop(span);
}
Ok(())
}
#[instrument(
skip(self, pool),
level = "info",
err,
fields(collector = "replication", otel.kind = "internal")
)]
fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let mut tasks = FuturesUnordered::new();
for sub in &self.subs {
let span = info_span!("collector.collect", sub_collector = %sub.name(), otel.kind = "internal");
tasks.push(sub.collect(pool).instrument(span));
}
while let Some(res) = tasks.next().await {
res?;
}
Ok(())
})
}
fn enabled_by_default(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_replication_collector_name() {
let collector = ReplicationCollector::new();
assert_eq!(collector.name(), "replication");
}
#[test]
fn test_replication_collector_not_enabled_by_default() {
let collector = ReplicationCollector::new();
assert!(!collector.enabled_by_default());
}
}