4 changes: 4 additions & 0 deletions backend/Cargo.toml
@@ -41,6 +41,10 @@ anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }

# Metrics
metrics = "0.24"
metrics-exporter-prometheus = "0.16"

# Config
dotenvy = "0.15"

2 changes: 2 additions & 0 deletions backend/crates/atlas-server/Cargo.toml
@@ -23,6 +23,8 @@ thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
dotenvy = { workspace = true }
bigdecimal = { workspace = true }
hex = { workspace = true }
18 changes: 18 additions & 0 deletions backend/crates/atlas-server/src/api/error.rs
@@ -51,6 +51,24 @@ impl IntoResponse for ApiError {
let status =
StatusCode::from_u16(self.0.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);

// Increment error counter for Prometheus alerting
let error_type = match &self.0 {
AtlasError::Database(_) => "database",
AtlasError::Internal(_) => "internal",
AtlasError::Config(_) => "config",
AtlasError::Rpc(_) => "rpc_request",
AtlasError::MetadataFetch(_) => "metadata_fetch",
_ => "",
};
if !error_type.is_empty() {
metrics::counter!(
"atlas_errors_total",
"component" => "api",
"error_type" => error_type
)
.increment(1);
}

// Determine the client-facing message based on error type.
// Internal details are logged server-side to avoid leaking stack traces or
// database internals to callers.
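For reference: once scraped, the counter above renders in the Prometheus text exposition format along these lines (label values come from the match arms; the counts are illustrative):

# TYPE atlas_errors_total counter
atlas_errors_total{component="api",error_type="database"} 3
atlas_errors_total{component="api",error_type="rpc_request"} 1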
5 changes: 5 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/faucet.rs
@@ -162,6 +162,9 @@ mod tests {
let head_tracker = Arc::new(crate::head::HeadTracker::empty(10));
let (tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();
Arc::new(AppState {
pool,
block_events_tx: tx,
@@ -178,6 +181,8 @@
background_color_light: None,
success_color: None,
error_color: None,
metrics: crate::metrics::Metrics::new(),
prometheus_handle,
})
}

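Note: build_recorder() creates a recorder without installing it as the global one, so the test fixture gets a working PrometheusHandle without touching global state (the metrics macros stay no-ops in tests). The production wiring is not part of this diff; a minimal sketch of what main() would typically do with this crate:

// Sketch only: install the recorder globally so the metrics macros record
// into it, and keep the handle for the /metrics handler to render.
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
    .install_recorder()
    .expect("failed to install Prometheus recorder");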
60 changes: 60 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/health.rs
@@ -0,0 +1,60 @@
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use serde::Serialize;
use std::sync::Arc;

use crate::api::AppState;

#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
}

/// GET /health/live — liveness probe (process is alive)
pub async fn liveness() -> impl IntoResponse {
Json(HealthResponse {
status: "ok",
reason: None,
})
}

/// GET /health/ready — readiness probe (DB reachable, indexer fresh)
pub async fn readiness(State(state): State<Arc<AppState>>) -> impl IntoResponse {
// Check DB connectivity
if let Err(e) = sqlx::query("SELECT 1").execute(&state.pool).await {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not_ready",
reason: Some(format!("database unreachable: {e}")),
}),
);
}

// Check indexer freshness (head within 5 minutes)
if let Some(block) = state.head_tracker.latest().await {
let now = chrono::Utc::now();
let age = now - block.indexed_at;
if age > chrono::Duration::minutes(5) {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not_ready",
reason: Some(format!(
"indexer stale: last block indexed {}s ago",
age.num_seconds()
)),
}),
);
}
}

(
StatusCode::OK,
Json(HealthResponse {
status: "ready",
reason: None,
}),
)
}
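A quick way to exercise both probes (the port is hypothetical; reason is omitted from the JSON when it is None thanks to skip_serializing_if):

curl -s http://localhost:8080/health/live    # 200 {"status":"ok"}
curl -s http://localhost:8080/health/ready   # 200 {"status":"ready"}, or 503
                                             # {"status":"not_ready","reason":"..."} when the
                                             # DB is unreachable or the head is older than 5 minutes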
9 changes: 9 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/metrics.rs
@@ -0,0 +1,9 @@
use axum::extract::State;
use std::sync::Arc;

use crate::api::AppState;

/// GET /metrics — Prometheus text format
pub async fn metrics(State(state): State<Arc<AppState>>) -> String {
state.prometheus_handle.render()
}
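Returning String from an Axum handler serves the body as text/plain; charset=utf-8, which Prometheus scrapers accept, so no explicit Content-Type wiring is needed here.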
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
@@ -3,7 +3,9 @@ pub mod blocks;
pub mod config;
pub mod etherscan;
pub mod faucet;
pub mod health;
pub mod logs;
pub mod metrics;
pub mod nfts;
pub mod proxy;
pub mod search;
23 changes: 21 additions & 2 deletions backend/crates/atlas-server/src/api/handlers/sse.rs
@@ -13,6 +13,7 @@ use crate::api::handlers::get_latest_block;
use crate::api::AppState;
use crate::head::HeadTracker;
use crate::indexer::DaSseUpdate;
use crate::metrics::{Metrics, SseConnectionGuard};
use atlas_common::Block;
use sqlx::PgPool;
use tracing::warn;
@@ -45,8 +46,11 @@ fn make_event_stream(
head_tracker: Arc<HeadTracker>,
mut block_rx: broadcast::Receiver<()>,
mut da_rx: broadcast::Receiver<Vec<DaSseUpdate>>,
metrics: Option<Metrics>,
) -> impl Stream<Item = Result<Event, Infallible>> + Send {
async_stream::stream! {
// Guard decrements the SSE connection gauge when the stream is dropped
let _guard = metrics.map(SseConnectionGuard::new);
let mut last_block_number: Option<i64> = None;

match head_tracker.latest().await {
@@ -147,6 +151,7 @@ pub async fn block_events(
state.head_tracker.clone(),
state.block_events_tx.subscribe(),
state.da_events_tx.subscribe(),
Some(state.metrics.clone()),
);
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
@@ -281,7 +286,13 @@ mod tests {

let (tx, _) = broadcast::channel::<()>(16);
let (da_tx, _) = broadcast::channel::<Vec<DaSseUpdate>>(16);
let stream = make_event_stream(dummy_pool(), tracker, tx.subscribe(), da_tx.subscribe());
let stream = make_event_stream(
dummy_pool(),
tracker,
tx.subscribe(),
da_tx.subscribe(),
None,
);
tokio::pin!(stream);

// Drop sender so loop terminates after the initial seed.
@@ -313,6 +324,7 @@
tracker.clone(),
tx.subscribe(),
da_tx.subscribe(),
None,
);
tokio::pin!(stream);

@@ -353,6 +365,7 @@
tracker.clone(),
tx.subscribe(),
da_tx.subscribe(),
None,
);
tokio::pin!(stream);

@@ -397,7 +410,13 @@

let (tx, _) = broadcast::channel::<()>(16);
let (da_tx, _) = broadcast::channel::<Vec<DaSseUpdate>>(1);
let stream = make_event_stream(dummy_pool(), tracker, tx.subscribe(), da_tx.subscribe());
let stream = make_event_stream(
dummy_pool(),
tracker,
tx.subscribe(),
da_tx.subscribe(),
None,
);
tokio::pin!(stream);

let _ = tokio::time::timeout(Duration::from_secs(1), stream.next())
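SseConnectionGuard is used above but implemented in crate::metrics, which this diff does not include. A plausible RAII sketch, assuming a gauge named atlas_sse_connections (the name and the struct layout are assumptions):

// Counts one live SSE connection for the lifetime of the stream.
pub struct SseConnectionGuard {
    _metrics: Metrics, // held to match the SseConnectionGuard::new(Metrics) call site
}

impl SseConnectionGuard {
    pub fn new(metrics: Metrics) -> Self {
        metrics::gauge!("atlas_sse_connections").increment(1.0);
        Self { _metrics: metrics }
    }
}

impl Drop for SseConnectionGuard {
    fn drop(&mut self) {
        // Runs when the async_stream is dropped on client disconnect.
        metrics::gauge!("atlas_sse_connections").decrement(1.0);
    }
}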
5 changes: 5 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/status.rs
@@ -104,6 +104,9 @@ mod tests {
let pool = sqlx::postgres::PgPoolOptions::new()
.connect_lazy("postgres://test@localhost:5432/test")
.expect("lazy pool");
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();
State(Arc::new(AppState {
pool,
block_events_tx: block_tx,
@@ -120,6 +123,8 @@
background_color_light: None,
success_color: None,
error_color: None,
metrics: crate::metrics::Metrics::new(),
prometheus_handle,
}))
}

25 changes: 22 additions & 3 deletions backend/crates/atlas-server/src/api/mod.rs
@@ -1,7 +1,8 @@
pub mod error;
pub mod handlers;

use axum::{routing::get, Router};
use axum::{middleware, routing::get, Router};
use metrics_exporter_prometheus::PrometheusHandle;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
@@ -13,6 +14,7 @@ use tower_http::trace::TraceLayer;
use crate::faucet::SharedFaucetBackend;
use crate::head::HeadTracker;
use crate::indexer::DaSseUpdate;
use crate::metrics::Metrics;

pub struct AppState {
pub pool: PgPool,
@@ -30,6 +32,8 @@ pub struct AppState {
pub background_color_light: Option<String>,
pub success_color: Option<String>,
pub error_color: Option<String>,
pub metrics: Metrics,
pub prometheus_handle: PrometheusHandle,
}

/// Build the Axum router.
@@ -42,6 +46,11 @@ pub fn build_router(state: Arc<AppState>, cors_origin: Option<String>) -> Router
.route("/api/events", get(handlers::sse::block_events))
.with_state(state.clone());

// Metrics route — excluded from TimeoutLayer (scrape can take a while)
let metrics_routes = Router::new()
.route("/metrics", get(handlers::metrics::metrics))
.with_state(state.clone());

let mut router = Router::new()
// Blocks
.route("/api/blocks", get(handlers::blocks::list_blocks))
@@ -171,7 +180,9 @@
// Config (white-label branding)
.route("/api/config", get(handlers::config::get_config))
// Health
.route("/health", get(|| async { "OK" }));
.route("/health", get(|| async { "OK" }))
.route("/health/live", get(handlers::health::liveness))
.route("/health/ready", get(handlers::health::readiness));

if state.faucet.is_some() {
router = router
@@ -187,8 +198,11 @@ pub fn build_router(state: Arc<AppState>, cors_origin: Option<String>) -> Router
axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10),
))
// Merge SSE routes (no TimeoutLayer so connections stay alive)
// HTTP metrics middleware — placed after routing so MatchedPath is available
.layer(middleware::from_fn(crate::metrics::http_metrics_middleware))
// Merge SSE and metrics routes (no TimeoutLayer so connections stay alive)
.merge(sse_routes)
.merge(metrics_routes)
// Shared layers applied to all routes
.layer(build_cors_layer(cors_origin))
.layer(TraceLayer::new_for_http())
@@ -262,6 +276,9 @@ mod tests {
let head_tracker = Arc::new(crate::head::HeadTracker::empty(10));
let (tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();
Arc::new(AppState {
pool,
block_events_tx: tx,
@@ -278,6 +295,8 @@
background_color_light: None,
success_color: None,
error_color: None,
metrics: Metrics::new(),
prometheus_handle,
})
}

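crate::metrics::http_metrics_middleware is layered above but not shown in this diff. A minimal sketch, assuming axum 0.7-style from_fn signatures and hypothetical metric names:

use axum::{extract::{MatchedPath, Request}, middleware::Next, response::Response};
use std::time::Instant;

pub async fn http_metrics_middleware(req: Request, next: Next) -> Response {
    // MatchedPath is only present once routing has run, which is why the
    // layer is added after the routes in build_router.
    let path = req
        .extensions()
        .get::<MatchedPath>()
        .map(|p| p.as_str().to_owned())
        .unwrap_or_else(|| "unmatched".to_owned());
    let method = req.method().to_string();

    let start = Instant::now();
    let response = next.run(req).await;

    metrics::counter!(
        "atlas_http_requests_total",
        "method" => method.clone(),
        "path" => path.clone(),
        "status" => response.status().as_u16().to_string()
    )
    .increment(1);
    metrics::histogram!(
        "atlas_http_request_duration_seconds",
        "method" => method,
        "path" => path
    )
    .record(start.elapsed().as_secs_f64());

    response
}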
9 changes: 9 additions & 0 deletions backend/crates/atlas-server/src/cli.rs
@@ -369,6 +369,15 @@ pub struct LogArgs {
help = "Log filter directive (e.g. info, atlas_server=debug)"
)]
pub level: String,

#[arg(
long = "atlas.log.format",
env = "LOG_FORMAT",
default_value = "text",
value_name = "FORMAT",
help = "Log output format: text or json"
)]
pub format: String,
}

// ── db subcommand ─────────────────────────────────────────────────────────────
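The diff adds the flag and threads it through config (below), but the tracing-subscriber wiring is not shown. A minimal sketch, using the env-filter and json features already enabled in the workspace Cargo.toml:

use tracing_subscriber::EnvFilter;

fn init_tracing(level: &str, format: &str) {
    let filter = EnvFilter::new(level);
    if format == "json" {
        tracing_subscriber::fmt().with_env_filter(filter).json().init();
    } else {
        tracing_subscriber::fmt().with_env_filter(filter).init();
    }
}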
1 change: 1 addition & 0 deletions backend/crates/atlas-server/src/config.rs
@@ -482,6 +482,7 @@ mod tests_from_run_args {
},
log: cli::LogArgs {
level: "info".to_string(),
format: "text".to_string(),
},
}
}