Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions src/lean_spec/node/anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Two sources land on the same return shape:

- Genesis: synthesise the store from the genesis validator set.
- Checkpoint: fetch a finalized state from a peer and build the store from it.
- Checkpoint: fetch a finalized block and state from a peer and build the store.

Once the store exists the protocol cannot tell the two sources apart.
"""
Expand All @@ -19,14 +19,12 @@
from lean_spec.node.networking.reqresp.message import Status
from lean_spec.node.sync.checkpoint_sync import (
CheckpointSyncError,
fetch_finalized_block,
fetch_finalized_state,
verify_checkpoint_state,
)
from lean_spec.spec.crypto.merkleization import hash_tree_root
from lean_spec.spec.forks import (
AggregatedAttestations,
Block,
BlockBody,
Checkpoint,
ForkProtocol,
Slot,
Expand Down Expand Up @@ -81,10 +79,12 @@ async def from_checkpoint(
validator_index: ValidatorIndex | None,
) -> Anchor:
"""
Build an anchor by fetching a finalized state from a peer.
Build an anchor by fetching a finalized block and state from a peer.

The fetched state replaces the genesis validator set.
Deposits and exits since genesis are already baked into it.
The fetched block anchors the store at the same finalized root the
network agrees on; a source that cannot serve it cannot be used.

Args:
url: HTTP endpoint of the node serving the checkpoint state.
Expand All @@ -96,6 +96,10 @@ async def from_checkpoint(
CheckpointSyncError: For every failure mode covering transport,
structural verification, and genesis-time mismatch.
"""
# The block comes first: it is small, so an incapable source fails
# fast before the multi-megabyte state download starts.
signed_block = await fetch_finalized_block(url)

state = await fetch_finalized_state(url, fork.state_class)

# Catches a corrupt download before it contaminates the forkchoice store.
Expand All @@ -110,24 +114,17 @@ async def from_checkpoint(
f"local={genesis.genesis_time}"
)

# Reconstruct the anchor block from the header embedded in the state.
# A header stored before its post-state root carries a zero placeholder;
# in that case we recompute the root from the state itself.
# Fork choice only needs identity and lineage, so the body is left empty.
header = state.latest_block_header
state_root = (
header.state_root if header.state_root != Bytes32.zero() else hash_tree_root(state)
)
anchor_block = Block(
slot=header.slot,
proposer_index=header.proposer_index,
parent_root=header.parent_root,
state_root=state_root,
body=BlockBody(attestations=AggregatedAttestations(data=[])),
)
# Both fetches read the snapshot at the finalized root.
# A pairing mismatch means finalization advanced between the two
# requests; refetching is the fix.
if signed_block.block.state_root != hash_tree_root(state):
raise CheckpointSyncError(
"anchor block / state mismatch; "
"source advanced finalization between requests, retry"
)

# The protocol return type is structural, but only one concrete store ships.
store = cast(Store, fork.create_store(state, anchor_block, validator_index))
store = cast(Store, fork.create_store(state, signed_block.block, validator_index))
head_slot = store.blocks[store.head].slot

return cls(
Expand Down
60 changes: 60 additions & 0 deletions src/lean_spec/node/api/endpoints/blocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Blocks endpoint handlers."""

from __future__ import annotations

import asyncio
import logging

from aiohttp import web

logger = logging.getLogger(__name__)


async def handle_finalized(request: web.Request) -> web.Response:
"""
Handle finalized signed block request.

Returns the signed block for the finalized checkpoint as raw SSZ bytes
(not snappy compressed).

Together with the finalized state endpoint, this gives a
checkpoint-syncing peer the (state, signed block) anchor pair.
External consumers (other client implementations and the hive
simulator) bootstrap their fork-choice store from this pair.

The fork-choice store holds only unsigned blocks.
Serving a signed block therefore needs a separate signed-block source,
injected into the server by the embedding node.
Nodes without such a source answer with 503.

Response: SSZ-encoded SignedBlock (binary, application/octet-stream)

Status Codes:
200 OK: Signed block returned successfully.
404 Not Found: Finalized signed block not available on this node.
503 Service Unavailable: Store or signed-block source not initialized.
"""
store_getter = request.app.get("store_getter")
store = store_getter() if store_getter else None

if store is None:
raise web.HTTPServiceUnavailable(reason="Store not initialized")

signed_block_getter = request.app.get("signed_block_getter")

if signed_block_getter is None:
raise web.HTTPServiceUnavailable(reason="Signed block source not configured")

signed_block = signed_block_getter(store.latest_finalized.root)

if signed_block is None:
raise web.HTTPNotFound(reason="Finalized signed block not available")

# Implementation detail: offload CPU-intensive encoding to thread pool
try:
ssz_bytes = await asyncio.to_thread(signed_block.encode_bytes)
except Exception as exception:
logger.error("Failed to encode signed block: %s", exception)
raise web.HTTPInternalServerError(reason="Encoding failed") from exception

return web.Response(body=ssz_bytes, content_type="application/octet-stream")
2 changes: 2 additions & 0 deletions src/lean_spec/node/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from lean_spec.node.api.endpoints import (
aggregator,
blocks,
checkpoints,
fork_choice,
health,
Expand All @@ -21,6 +22,7 @@
ROUTES: dict[str, Handler] = {
"/lean/v0/health": health.handle,
"/lean/v0/states/finalized": states.handle_finalized,
"/lean/v0/blocks/finalized": blocks.handle_finalized,
"/lean/v0/checkpoints/justified": checkpoints.handle_justified,
"/lean/v0/fork_choice": fork_choice.handle,
"/metrics": metrics.handle,
Expand Down
17 changes: 16 additions & 1 deletion src/lean_spec/node/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

from lean_spec.node.api.aggregator_controller import AggregatorController
from lean_spec.node.api.routes import ADMIN_ROUTES, ROUTES
from lean_spec.spec.forks import LstarSpec, Store
from lean_spec.spec.forks import LstarSpec, SignedBlock, Store
from lean_spec.spec.ssz import Bytes32

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -67,6 +68,16 @@ class ApiServer:
store_getter: Callable[[], Store | None] | None = None
"""Callable that returns the current Store instance."""

signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None = None
"""
Optional callable returning the signed block for a block root.

The fork-choice store retains only unsigned blocks, so serving the
checkpoint-sync anchor block needs a separate signed-block source.
The embedding node injects one here.
When absent, the finalized block endpoint returns 503.
"""

aggregator_controller: AggregatorController | None = None
"""
Optional controller for toggling the aggregator role at runtime.
Expand Down Expand Up @@ -96,6 +107,10 @@ async def start(self) -> None:
# Store the store_getter in app for handlers that need store access
app["store_getter"] = self.store_getter

# Expose the signed-block lookup for endpoints serving signed blocks.
# Absence is fine; the finalized block endpoint returns 503 when unset.
app["signed_block_getter"] = self.signed_block_getter

# Expose the fork spec for handlers that drive consensus computations.
app["spec"] = self.spec

Expand Down
59 changes: 58 additions & 1 deletion src/lean_spec/node/sync/checkpoint_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import httpx

from lean_spec.spec.crypto.merkleization import hash_tree_root
from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, State
from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, SignedBlock, State

logger = logging.getLogger(__name__)

Expand All @@ -34,6 +34,13 @@
FINALIZED_STATE_ENDPOINT: Final = "/lean/v0/states/finalized"
"""API endpoint for fetching finalized state. Follows Beacon API conventions."""

FINALIZED_BLOCK_ENDPOINT: Final = "/lean/v0/blocks/finalized"
"""API endpoint for fetching the signed block matching the finalized state.

Together with the state, this forms the anchor pair for store creation.
Checkpoint sync requires the source to serve it.
"""


class CheckpointSyncError(Exception):
"""
Expand Down Expand Up @@ -102,6 +109,56 @@ async def fetch_finalized_state(url: str, state_class: type[State]) -> State:
raise CheckpointSyncError(f"Failed to fetch state: {exception}") from exception


async def fetch_finalized_block(url: str) -> SignedBlock:
"""
Fetch the signed block matching the finalized state via checkpoint sync.

The returned block carries the real body, so its hash tree root equals
the finalized root the rest of the network agrees on.
The caller must verify the block's state root against the fetched state
before pairing them into a store.

Args:
url: Base URL of the node API (e.g., "http://localhost:5052").

Returns:
The finalized signed block.

Raises:
CheckpointSyncError: If the request fails or block bytes are invalid.
"""
base_url = url.rstrip("/")
full_url = f"{base_url}{FINALIZED_BLOCK_ENDPOINT}"

logger.info("Fetching finalized signed block from %s", full_url)

headers = {"Accept": "application/octet-stream"}

try:
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
response = await client.get(full_url, headers=headers)
response.raise_for_status()

ssz_data = response.content
logger.info("Downloaded %d bytes of SSZ signed block data", len(ssz_data))

signed_block = SignedBlock.decode_bytes(ssz_data)
logger.info("Deserialized signed block at slot %s", signed_block.block.slot)

return signed_block

except httpx.RequestError as exception:
raise CheckpointSyncError(
f"Network error while connecting to {exception.request.url}: {exception}"
) from exception
except httpx.HTTPStatusError as exception:
raise CheckpointSyncError(
f"HTTP error {exception.response.status_code}: {exception.response.text[:200]}"
) from exception
except Exception as exception:
raise CheckpointSyncError(f"Failed to fetch signed block: {exception}") from exception


def verify_checkpoint_state(state: State) -> bool:
"""
Verify that a checkpoint state is structurally valid.
Expand Down
16 changes: 16 additions & 0 deletions tests/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

from consensus_testing import make_genesis_store
from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig
from lean_spec.spec.forks import SignedBlock
from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate
from lean_spec.spec.ssz import ByteList512KiB, Bytes32

# Default port for auto-started local server
DEFAULT_PORT = 15099
Expand Down Expand Up @@ -67,11 +70,24 @@ def _create_server(self) -> ApiServer:
"""Create the API server with a test store and aggregator controller."""
store = make_genesis_store(num_validators=3, observer=True, genesis_time=int(time.time()))

def signed_block_for(root: Bytes32) -> SignedBlock | None:
# The store retains only unsigned blocks.
# Wrap the anchor block with an empty proof, like a node
# serving a genesis anchor that no proposer ever signed.
block = store.blocks.get(root)
if block is None:
return None
return SignedBlock(
block=block,
proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")),
)

controller = _make_conformance_controller(initial=False)
config = ApiServerConfig(host="127.0.0.1", port=self.port)
return ApiServer(
config=config,
store_getter=lambda: store,
signed_block_getter=signed_block_for,
aggregator_controller=controller,
)

Expand Down
54 changes: 54 additions & 0 deletions tests/api/endpoints/test_blocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Tests for the blocks endpoints."""

import httpx

from lean_spec.spec.crypto.merkleization import hash_tree_root
from lean_spec.spec.forks import SignedBlock
from lean_spec.spec.forks.lstar import State


def get_finalized_block(server_url: str) -> httpx.Response:
"""Fetch the finalized signed block from the server."""
return httpx.get(
f"{server_url}/lean/v0/blocks/finalized",
headers={"Accept": "application/octet-stream"},
)


class TestFinalizedBlock:
"""Tests for the /lean/v0/blocks/finalized endpoint."""

def test_returns_200(self, server_url: str) -> None:
"""Finalized block endpoint returns 200 status code."""
response = get_finalized_block(server_url)
assert response.status_code == 200

def test_content_type_is_octet_stream(self, server_url: str) -> None:
"""Finalized block endpoint returns octet-stream content type."""
response = get_finalized_block(server_url)
content_type = response.headers.get("content-type", "")
assert "application/octet-stream" in content_type

def test_ssz_deserializes(self, server_url: str) -> None:
"""Finalized block SSZ bytes deserialize to a valid SignedBlock object."""
response = get_finalized_block(server_url)
signed_block = SignedBlock.decode_bytes(response.content)
assert signed_block is not None

def test_state_root_matches_finalized_state(self, server_url: str) -> None:
"""
Returned block's state root equals the finalized state's hash tree root.

Store creation from a checkpoint asserts exactly this.
If it fails, the (state, signed block) pair cannot bootstrap a store.
"""
block_response = get_finalized_block(server_url)
signed_block = SignedBlock.decode_bytes(block_response.content)

state_response = httpx.get(
f"{server_url}/lean/v0/states/finalized",
headers={"Accept": "application/octet-stream"},
)
state = State.decode_bytes(state_response.content)

assert signed_block.block.state_root == hash_tree_root(state)
Loading
Loading