Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
# Required: Your L2 RPC endpoint
RPC_URL=http://localhost:8545

# Human-readable name for your chain, displayed in the explorer UI
CHAIN_NAME="My Chain"

# Optional settings (defaults shown)
START_BLOCK=0
BATCH_SIZE=100
Expand Down
5 changes: 3 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ pub struct AppState {

### Frontend API client
- Base URL: `/api` (proxied by nginx to `atlas-server:3000`)
- `GET /api/status` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected.
- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/status` polling on disconnect.
- Fast polling endpoint: `GET /api/height` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected.
- Chain status: `GET /api/status` → `{ chain_id, chain_name, block_height, total_transactions, total_addresses, indexed_at }` — full chain info, fetched once on page load.
- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/height` polling on disconnect.

## Important Conventions

Expand Down
57 changes: 51 additions & 6 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@ pub async fn get_latest_block(pool: &PgPool) -> Result<Option<Block>, sqlx::Erro
.fetch_optional(pool)
.await
}
fn exact_count_sql(table_name: &str) -> Result<&'static str, sqlx::Error> {
match table_name {
"transactions" => Ok("SELECT COUNT(*) FROM transactions"),
"addresses" => Ok("SELECT COUNT(*) FROM addresses"),
_ => Err(sqlx::Error::Protocol(format!(
"unsupported table for exact count: {table_name}"
))),
}
}

fn should_use_approximate_count(approx: i64) -> bool {
approx > 100_000
}

/// Get transactions table row count efficiently.
/// Get a table's row count efficiently.
/// - For tables > 100k rows: uses PostgreSQL's approximate count (instant, ~99% accurate)
/// - For smaller tables: uses exact COUNT(*) (fast enough)
///
/// This avoids the slow COUNT(*) full table scan on large tables.
pub async fn get_table_count(pool: &PgPool) -> Result<i64, sqlx::Error> {
let table_name = "transactions";

pub async fn get_table_count(pool: &PgPool, table_name: &str) -> Result<i64, sqlx::Error> {
// Sum approximate reltuples across partitions if any, else use parent.
// This is instant and reasonably accurate for large tables.
// Cast to float8 (f64) since reltuples is float4 and SUM returns float4
Expand Down Expand Up @@ -57,13 +68,47 @@ pub async fn get_table_count(pool: &PgPool) -> Result<i64, sqlx::Error> {
parent.0.unwrap_or(0.0) as i64
};

if approx > 100_000 {
if should_use_approximate_count(approx) {
Ok(approx)
} else {
// Exact count for small tables
let exact: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM transactions")
let exact: (i64,) = sqlx::query_as(exact_count_sql(table_name)?)
.fetch_one(pool)
.await?;
Ok(exact.0)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn exact_count_sql_whitelists_supported_tables() {
assert_eq!(
exact_count_sql("transactions").unwrap(),
"SELECT COUNT(*) FROM transactions"
);
assert_eq!(
exact_count_sql("addresses").unwrap(),
"SELECT COUNT(*) FROM addresses"
);
}

#[test]
fn exact_count_sql_rejects_unsupported_tables() {
let err = exact_count_sql("blocks").unwrap_err();
assert!(err.to_string().contains("unsupported table"));
}

#[test]
fn should_use_approximate_count_above_threshold() {
assert!(should_use_approximate_count(100_001));
}

#[test]
fn should_use_approximate_count_uses_exact_count_at_threshold_and_below() {
assert!(!should_use_approximate_count(100_000));
assert!(!should_use_approximate_count(42));
}
}
7 changes: 3 additions & 4 deletions backend/crates/atlas-server/src/api/handlers/sse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use axum::{
extract::State,
response::sse::{Event, Sse},
response::IntoResponse,
};
use futures::stream::Stream;
use serde::Serialize;
Expand Down Expand Up @@ -94,9 +95,7 @@ fn make_block_stream(
/// New connections receive only the current latest block and then stream
/// forward from in-memory committed head state. Historical catch-up stays on
/// the canonical block endpoints.
pub async fn block_events(
State(state): State<Arc<AppState>>,
) -> Sse<axum::response::sse::KeepAliveStream<SseStream>> {
pub async fn block_events(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let stream = make_block_stream(
state.pool.clone(),
state.head_tracker.clone(),
Expand All @@ -105,7 +104,7 @@ pub async fn block_events(
sse_response(stream)
}

fn sse_response<S>(stream: S) -> Sse<axum::response::sse::KeepAliveStream<SseStream>>
fn sse_response<S>(stream: S) -> impl IntoResponse
where
S: Stream<Item = Result<Event, Infallible>> + Send + 'static,
{
Expand Down
72 changes: 50 additions & 22 deletions backend/crates/atlas-server/src/api/handlers/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@ use serde::Serialize;
use std::sync::Arc;

use crate::api::error::ApiResult;
use crate::api::handlers::get_table_count;
use crate::api::AppState;

#[derive(Serialize)]
pub struct HeightResponse {
pub block_height: i64,
pub indexed_at: String,
}

#[derive(Serialize)]
pub struct ChainStatus {
pub chain_id: String,
pub chain_name: String,
pub block_height: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub indexed_at: Option<String>,
pub total_transactions: i64,
pub total_addresses: i64,
pub indexed_at: String,
}

/// GET /api/status - Lightweight endpoint for current chain status
/// Returns in <1ms, optimized for frequent polling
pub async fn get_status(State(state): State<Arc<AppState>>) -> ApiResult<Json<ChainStatus>> {
async fn latest_height_and_indexed_at(state: &AppState) -> Result<(i64, String), sqlx::Error> {
if let Some(block) = state.head_tracker.latest().await {
return Ok(Json(ChainStatus {
block_height: block.number,
indexed_at: Some(block.indexed_at.to_rfc3339()),
}));
return Ok((block.number, block.indexed_at.to_rfc3339()));
}

// Fallback: single key-value lookup from indexer_state (sub-ms, avoids blocks table)
Expand All @@ -30,15 +35,36 @@ pub async fn get_status(State(state): State<Arc<AppState>>) -> ApiResult<Json<Ch
.await?;

if let Some((block_height, updated_at)) = row {
return Ok(Json(ChainStatus {
block_height,
indexed_at: Some(updated_at.to_rfc3339()),
}));
return Ok((block_height, updated_at.to_rfc3339()));
}

Ok((0, String::new()))
}

/// GET /api/height - Lightweight endpoint for current block height.
/// Returns in <1ms, optimized for frequent polling.
pub async fn get_height(State(state): State<Arc<AppState>>) -> ApiResult<Json<HeightResponse>> {
let (block_height, indexed_at) = latest_height_and_indexed_at(&state).await?;

Ok(Json(HeightResponse {
block_height,
indexed_at,
}))
}

/// GET /api/status - Full chain status including chain ID, name, and counts.
pub async fn get_status(State(state): State<Arc<AppState>>) -> ApiResult<Json<ChainStatus>> {
let (block_height, indexed_at) = latest_height_and_indexed_at(&state).await?;
let total_transactions = get_table_count(&state.pool, "transactions").await?;
let total_addresses = get_table_count(&state.pool, "addresses").await?;

Ok(Json(ChainStatus {
block_height: 0,
indexed_at: None,
chain_id: state.chain_id.to_string(),
chain_name: state.chain_name.clone(),
block_height,
total_transactions,
total_addresses,
indexed_at,
}))
}

Expand Down Expand Up @@ -72,32 +98,34 @@ mod tests {
block_events_tx: tx,
head_tracker,
rpc_url: String::new(),
chain_id: 1,
chain_name: "Test Chain".to_string(),
}))
}

#[tokio::test]
async fn status_returns_head_tracker_block() {
async fn height_returns_head_tracker_block() {
let tracker = Arc::new(HeadTracker::empty(10));
tracker
.publish_committed_batch(vec![sample_block(42)])
.await;

let result = get_status(test_state(tracker)).await;
let Json(status) = result.unwrap_or_else(|_| panic!("get_status should not fail"));
let result = get_height(test_state(tracker)).await;
let Json(status) = result.unwrap_or_else(|_| panic!("get_height should not fail"));

assert_eq!(status.block_height, 42);
assert!(status.indexed_at.is_some());
assert!(!status.indexed_at.is_empty());
}

#[tokio::test]
async fn status_returns_latest_head_after_multiple_publishes() {
async fn height_returns_latest_head_after_multiple_publishes() {
let tracker = Arc::new(HeadTracker::empty(10));
tracker
.publish_committed_batch(vec![sample_block(10), sample_block(11), sample_block(12)])
.await;

let result = get_status(test_state(tracker)).await;
let Json(status) = result.unwrap_or_else(|_| panic!("get_status should not fail"));
let result = get_height(test_state(tracker)).await;
let Json(status) = result.unwrap_or_else(|_| panic!("get_height should not fail"));

assert_eq!(status.block_height, 12);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub async fn list_transactions(
Query(pagination): Query<Pagination>,
) -> ApiResult<Json<PaginatedResponse<Transaction>>> {
// Use optimized count (approximate for large tables, exact for small)
let total = get_table_count(&state.pool).await?;
let total = get_table_count(&state.pool, "transactions").await?;

let transactions: Vec<Transaction> = sqlx::query_as(
"SELECT hash, block_number, block_index, from_address, to_address, value, gas_price, gas_used, input_data, status, contract_created, timestamp
Expand Down
4 changes: 3 additions & 1 deletion backend/crates/atlas-server/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct AppState {
pub block_events_tx: broadcast::Sender<()>,
pub head_tracker: Arc<HeadTracker>,
pub rpc_url: String,
pub chain_id: u64,
pub chain_name: String,
}

/// Build the Axum router.
Expand Down Expand Up @@ -139,6 +141,7 @@ pub fn build_router(state: Arc<AppState>, cors_origin: Option<String>) -> Router
// Search
.route("/api/search", get(handlers::search::search))
// Status
.route("/api/height", get(handlers::status::get_height))
.route("/api/status", get(handlers::status::get_status))
// Health
.route("/health", get(|| async { "OK" }))
Expand All @@ -153,7 +156,6 @@ pub fn build_router(state: Arc<AppState>, cors_origin: Option<String>) -> Router
.layer(build_cors_layer(cors_origin))
.layer(TraceLayer::new_for_http())
}

/// Construct the CORS layer.
///
/// When `cors_origin` is `Some`, restrict to that exact origin.
Expand Down
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct Config {
/// (backwards-compatible default for development / self-hosted deployments).
pub cors_origin: Option<String>,
pub sse_replay_buffer_blocks: usize,
pub chain_name: String,
}

impl Config {
Expand Down Expand Up @@ -98,6 +99,7 @@ impl Config {
.context("Invalid API_PORT")?,
cors_origin: env::var("CORS_ORIGIN").ok(),
sse_replay_buffer_blocks,
chain_name: env::var("CHAIN_NAME").unwrap_or_else(|_| "Unknown".to_string()),
})
}
}
Expand Down
Loading
Loading