Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }

# Metrics
metrics = "0.24"
metrics-exporter-prometheus = "0.16"

# Config
dotenvy = "0.15"

Expand All @@ -55,7 +59,7 @@ hex = "0.4"
chrono = { version = "0.4", features = ["serde"] }

# Testing
testcontainers = "0.27"
testcontainers = { version = "0.27", features = ["blocking"] }
testcontainers-modules = { version = "0.15", features = ["postgres"] }

# CLI
Expand Down
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
dotenvy = { workspace = true }
bigdecimal = { workspace = true }
hex = { workspace = true }
Expand Down
46 changes: 46 additions & 0 deletions backend/crates/atlas-server/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,34 @@ impl Deref for ApiError {
}
}

fn error_type(err: &AtlasError) -> Option<&'static str> {
match err {
AtlasError::Database(_) => Some("database"),
AtlasError::Internal(_) => Some("internal"),
AtlasError::Config(_) => Some("config"),
AtlasError::Rpc(_) => Some("rpc_request"),
AtlasError::MetadataFetch(_) => Some("metadata_fetch"),
_ => None,
}
}

impl IntoResponse for ApiError {
fn into_response(self) -> Response {
use atlas_common::AtlasError;

let status =
StatusCode::from_u16(self.0.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);

// Increment error counter for Prometheus alerting
if let Some(error_type) = error_type(&self.0) {
metrics::counter!(
"atlas_errors_total",
"component" => "api",
"error_type" => error_type
)
.increment(1);
}
Comment thread
pthmas marked this conversation as resolved.

// Determine the client-facing message based on error type.
// Internal details are logged server-side to avoid leaking stack traces or
// database internals to callers.
Expand Down Expand Up @@ -119,6 +140,31 @@ mod tests {
use super::*;
use axum::body::to_bytes;

#[test]
fn error_type_maps_expected_variants() {
assert_eq!(
error_type(&AtlasError::Database(sqlx::Error::RowNotFound)),
Some("database")
);
assert_eq!(
error_type(&AtlasError::Internal("x".to_string())),
Some("internal")
);
assert_eq!(
error_type(&AtlasError::Config("x".to_string())),
Some("config")
);
assert_eq!(
error_type(&AtlasError::Rpc("x".to_string())),
Some("rpc_request")
);
assert_eq!(
error_type(&AtlasError::MetadataFetch("x".to_string())),
Some("metadata_fetch")
);
assert_eq!(error_type(&AtlasError::NotFound("x".to_string())), None);
}

#[tokio::test]
async fn too_many_requests_sets_retry_after_header_and_body() {
let response = ApiError(AtlasError::TooManyRequests {
Expand Down
5 changes: 5 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ mod tests {
let head_tracker = Arc::new(crate::head::HeadTracker::empty(10));
let (tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();
Arc::new(AppState {
pool,
block_events_tx: tx,
Expand All @@ -178,6 +181,8 @@ mod tests {
background_color_light: None,
success_color: None,
error_color: None,
metrics: crate::metrics::Metrics::new(),
prometheus_handle,
})
}

Expand Down
194 changes: 194 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::sync::Arc;

use crate::api::AppState;

const MAX_INDEXER_AGE_MINUTES: i64 = 5;

#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
}

fn readiness_status(
latest_indexed_at: Option<DateTime<Utc>>,
now: DateTime<Utc>,
) -> (StatusCode, HealthResponse) {
let Some(indexed_at) = latest_indexed_at else {
return (
StatusCode::SERVICE_UNAVAILABLE,
HealthResponse {
status: "not_ready",
reason: Some("indexer state unavailable".to_string()),
},
);
};

let age = now - indexed_at;
if age > chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES) {
return (
StatusCode::SERVICE_UNAVAILABLE,
HealthResponse {
status: "not_ready",
reason: Some(format!(
"indexer stale: last block indexed {}s ago",
age.num_seconds()
)),
},
);
}

(
StatusCode::OK,
HealthResponse {
status: "ready",
reason: None,
},
)
}

/// GET /health/live — liveness probe (process is alive)
pub async fn liveness() -> impl IntoResponse {
Json(HealthResponse {
status: "ok",
reason: None,
})
}

/// GET /health/ready — readiness probe (DB reachable, indexer fresh)
pub async fn readiness(State(state): State<Arc<AppState>>) -> impl IntoResponse {
// Check DB connectivity
if let Err(e) = sqlx::query("SELECT 1").execute(&state.pool).await {
tracing::warn!(error = %e, "readiness database check failed");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not_ready",
reason: Some("database unreachable".to_string()),
}),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
);
}

let latest = match super::status::latest_indexed_block(state.as_ref()).await {
Ok(latest) => latest,
Err(e) => {
tracing::warn!(error = %e, "readiness indexer state check failed");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not_ready",
reason: Some("indexer state unavailable".to_string()),
}),
);
}
};

let (status, body) = readiness_status(latest.map(|(_, indexed_at)| indexed_at), Utc::now());
(status, Json(body))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::head::HeadTracker;
use crate::metrics::Metrics;
use axum::body::to_bytes;
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use tokio::sync::broadcast;

fn app_state(pool: sqlx::PgPool, head_tracker: Arc<HeadTracker>) -> Arc<AppState> {
let (block_tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();

Arc::new(AppState {
pool,
block_events_tx: block_tx,
da_events_tx: da_tx,
head_tracker,
rpc_url: String::new(),
da_tracking_enabled: false,
faucet: None,
chain_id: 1,
chain_name: "Test Chain".to_string(),
chain_logo_url: None,
accent_color: None,
background_color_dark: None,
background_color_light: None,
success_color: None,
error_color: None,
metrics: Metrics::new(),
prometheus_handle,
})
}

async fn json_response(response: axum::response::Response) -> (StatusCode, serde_json::Value) {
let status = response.status();
let body = to_bytes(response.into_body(), usize::MAX)
.await
.expect("read response body");
let json = serde_json::from_slice(&body).expect("parse json response");
(status, json)
}

#[tokio::test]
async fn liveness_returns_ok() {
let (status, json) = json_response(liveness().await.into_response()).await;

assert_eq!(status, StatusCode::OK);
assert_eq!(json["status"], "ok");
assert!(json.get("reason").is_none());
}

#[tokio::test]
async fn readiness_returns_unavailable_when_database_is_down() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres://postgres:postgres@127.0.0.1:1/atlas")
.expect("create lazy pool");
let state = app_state(pool, Arc::new(HeadTracker::empty(10)));

let (status, json) = json_response(readiness(State(state)).await.into_response()).await;

assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(json["status"], "not_ready");
assert_eq!(json["reason"], "database unreachable");
}

#[test]
fn readiness_returns_unavailable_when_indexer_state_is_missing() {
let (status, body) = readiness_status(None, Utc::now());
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body.status, "not_ready");
assert_eq!(body.reason.as_deref(), Some("indexer state unavailable"));
}

#[test]
fn readiness_returns_unavailable_for_stale_indexer_state() {
let (status, body) = readiness_status(
Some(Utc::now() - chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES + 1)),
Utc::now(),
);
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body.status, "not_ready");
assert!(body
.reason
.as_deref()
.expect("reason string")
.contains("indexer stale"));
}

#[test]
fn readiness_returns_ready_for_fresh_indexer_state() {
let (status, body) = readiness_status(Some(Utc::now()), Utc::now());
assert_eq!(status, StatusCode::OK);
assert_eq!(body.status, "ready");
assert!(body.reason.is_none());
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
63 changes: 63 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use axum::extract::State;
use std::sync::Arc;

use crate::api::AppState;

/// GET /metrics — Prometheus text format
pub async fn metrics(State(state): State<Arc<AppState>>) -> String {
state.prometheus_handle.render()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

#[cfg(test)]
mod tests {
use super::*;
use crate::head::HeadTracker;
use crate::metrics::Metrics;
use sqlx::postgres::PgPoolOptions;
use std::sync::OnceLock;
use tokio::sync::broadcast;

fn test_prometheus_handle() -> metrics_exporter_prometheus::PrometheusHandle {
static PROMETHEUS_HANDLE: OnceLock<metrics_exporter_prometheus::PrometheusHandle> =
OnceLock::new();

PROMETHEUS_HANDLE
.get_or_init(crate::metrics::install_prometheus_recorder)
.clone()
}

#[tokio::test]
async fn metrics_handler_renders_prometheus_output() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres://test@localhost:5432/test")
.expect("lazy pool");
let (block_tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = test_prometheus_handle();
let recorder_metrics = Metrics::new();
recorder_metrics.set_indexer_head_block(42);
let state = Arc::new(AppState {
pool,
block_events_tx: block_tx,
da_events_tx: da_tx,
head_tracker: Arc::new(HeadTracker::empty(10)),
rpc_url: String::new(),
da_tracking_enabled: false,
faucet: None,
chain_id: 1,
chain_name: "Test Chain".to_string(),
chain_logo_url: None,
accent_color: None,
background_color_dark: None,
background_color_light: None,
success_color: None,
error_color: None,
metrics: recorder_metrics,
prometheus_handle,
});

let body = super::metrics(State(state)).await;

assert!(body.contains("atlas_indexer_head_block"));
}
}
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ pub mod blocks;
pub mod config;
pub mod etherscan;
pub mod faucet;
pub mod health;
pub mod logs;
pub mod metrics;
pub mod nfts;
pub mod proxy;
pub mod search;
Expand Down
Loading
Loading