diff --git a/src/lean_spec/node/anchor.py b/src/lean_spec/node/anchor.py index da58644ef..119971b4e 100644 --- a/src/lean_spec/node/anchor.py +++ b/src/lean_spec/node/anchor.py @@ -5,7 +5,7 @@ Two sources land on the same return shape: - Genesis: synthesise the store from the genesis validator set. -- Checkpoint: fetch a finalized state from a peer and build the store from it. +- Checkpoint: fetch a finalized block and state from a peer and build the store. Once the store exists the protocol cannot tell the two sources apart. """ @@ -19,14 +19,12 @@ from lean_spec.node.networking.reqresp.message import Status from lean_spec.node.sync.checkpoint_sync import ( CheckpointSyncError, + fetch_finalized_block, fetch_finalized_state, verify_checkpoint_state, ) from lean_spec.spec.crypto.merkleization import hash_tree_root from lean_spec.spec.forks import ( - AggregatedAttestations, - Block, - BlockBody, Checkpoint, ForkProtocol, Slot, @@ -81,10 +79,12 @@ async def from_checkpoint( validator_index: ValidatorIndex | None, ) -> Anchor: """ - Build an anchor by fetching a finalized state from a peer. + Build an anchor by fetching a finalized block and state from a peer. The fetched state replaces the genesis validator set. Deposits and exits since genesis are already baked into it. + The fetched block anchors the store at the same finalized root the + network agrees on; a source that cannot serve it cannot be used. Args: url: HTTP endpoint of the node serving the checkpoint state. @@ -96,6 +96,10 @@ async def from_checkpoint( CheckpointSyncError: For every failure mode covering transport, structural verification, and genesis-time mismatch. """ + # The block comes first: it is small, so an incapable source fails + # fast before the multi-megabyte state download starts. + signed_block = await fetch_finalized_block(url) + state = await fetch_finalized_state(url, fork.state_class) # Catches a corrupt download before it contaminates the forkchoice store. @@ -110,24 +114,17 @@ async def from_checkpoint( f"local={genesis.genesis_time}" ) - # Reconstruct the anchor block from the header embedded in the state. - # A header stored before its post-state root carries a zero placeholder; - # in that case we recompute the root from the state itself. - # Fork choice only needs identity and lineage, so the body is left empty. - header = state.latest_block_header - state_root = ( - header.state_root if header.state_root != Bytes32.zero() else hash_tree_root(state) - ) - anchor_block = Block( - slot=header.slot, - proposer_index=header.proposer_index, - parent_root=header.parent_root, - state_root=state_root, - body=BlockBody(attestations=AggregatedAttestations(data=[])), - ) + # Both fetches read the snapshot at the finalized root. + # A pairing mismatch means finalization advanced between the two + # requests; refetching is the fix. + if signed_block.block.state_root != hash_tree_root(state): + raise CheckpointSyncError( + "anchor block / state mismatch; " + "source advanced finalization between requests, retry" + ) # The protocol return type is structural, but only one concrete store ships. - store = cast(Store, fork.create_store(state, anchor_block, validator_index)) + store = cast(Store, fork.create_store(state, signed_block.block, validator_index)) head_slot = store.blocks[store.head].slot return cls( diff --git a/src/lean_spec/node/api/endpoints/blocks.py b/src/lean_spec/node/api/endpoints/blocks.py new file mode 100644 index 000000000..2a7a2585e --- /dev/null +++ b/src/lean_spec/node/api/endpoints/blocks.py @@ -0,0 +1,60 @@ +"""Blocks endpoint handlers.""" + +from __future__ import annotations + +import asyncio +import logging + +from aiohttp import web + +logger = logging.getLogger(__name__) + + +async def handle_finalized(request: web.Request) -> web.Response: + """ + Handle finalized signed block request. + + Returns the signed block for the finalized checkpoint as raw SSZ bytes + (not snappy compressed). + + Together with the finalized state endpoint, this gives a + checkpoint-syncing peer the (state, signed block) anchor pair. + External consumers (other client implementations and the hive + simulator) bootstrap their fork-choice store from this pair. + + The fork-choice store holds only unsigned blocks. + Serving a signed block therefore needs a separate signed-block source, + injected into the server by the embedding node. + Nodes without such a source answer with 503. + + Response: SSZ-encoded SignedBlock (binary, application/octet-stream) + + Status Codes: + 200 OK: Signed block returned successfully. + 404 Not Found: Finalized signed block not available on this node. + 503 Service Unavailable: Store or signed-block source not initialized. + """ + store_getter = request.app.get("store_getter") + store = store_getter() if store_getter else None + + if store is None: + raise web.HTTPServiceUnavailable(reason="Store not initialized") + + signed_block_getter = request.app.get("signed_block_getter") + + if signed_block_getter is None: + raise web.HTTPServiceUnavailable(reason="Signed block source not configured") + + signed_block = signed_block_getter(store.latest_finalized.root) + + if signed_block is None: + raise web.HTTPNotFound(reason="Finalized signed block not available") + + # Implementation detail: offload CPU-intensive encoding to thread pool + try: + ssz_bytes = await asyncio.to_thread(signed_block.encode_bytes) + except Exception as exception: + logger.error("Failed to encode signed block: %s", exception) + raise web.HTTPInternalServerError(reason="Encoding failed") from exception + + return web.Response(body=ssz_bytes, content_type="application/octet-stream") diff --git a/src/lean_spec/node/api/routes.py b/src/lean_spec/node/api/routes.py index 1f03bd3d9..edb4ca73b 100644 --- a/src/lean_spec/node/api/routes.py +++ b/src/lean_spec/node/api/routes.py @@ -8,6 +8,7 @@ from lean_spec.node.api.endpoints import ( aggregator, + blocks, checkpoints, fork_choice, health, @@ -21,6 +22,7 @@ ROUTES: dict[str, Handler] = { "/lean/v0/health": health.handle, "/lean/v0/states/finalized": states.handle_finalized, + "/lean/v0/blocks/finalized": blocks.handle_finalized, "/lean/v0/checkpoints/justified": checkpoints.handle_justified, "/lean/v0/fork_choice": fork_choice.handle, "/metrics": metrics.handle, diff --git a/src/lean_spec/node/api/server.py b/src/lean_spec/node/api/server.py index 321408991..b98998856 100644 --- a/src/lean_spec/node/api/server.py +++ b/src/lean_spec/node/api/server.py @@ -16,7 +16,8 @@ from lean_spec.node.api.aggregator_controller import AggregatorController from lean_spec.node.api.routes import ADMIN_ROUTES, ROUTES -from lean_spec.spec.forks import LstarSpec, Store +from lean_spec.spec.forks import LstarSpec, SignedBlock, Store +from lean_spec.spec.ssz import Bytes32 logger = logging.getLogger(__name__) @@ -67,6 +68,16 @@ class ApiServer: store_getter: Callable[[], Store | None] | None = None """Callable that returns the current Store instance.""" + signed_block_getter: Callable[[Bytes32], SignedBlock | None] | None = None + """ + Optional callable returning the signed block for a block root. + + The fork-choice store retains only unsigned blocks, so serving the + checkpoint-sync anchor block needs a separate signed-block source. + The embedding node injects one here. + When absent, the finalized block endpoint returns 503. + """ + aggregator_controller: AggregatorController | None = None """ Optional controller for toggling the aggregator role at runtime. @@ -96,6 +107,10 @@ async def start(self) -> None: # Store the store_getter in app for handlers that need store access app["store_getter"] = self.store_getter + # Expose the signed-block lookup for endpoints serving signed blocks. + # Absence is fine; the finalized block endpoint returns 503 when unset. + app["signed_block_getter"] = self.signed_block_getter + # Expose the fork spec for handlers that drive consensus computations. app["spec"] = self.spec diff --git a/src/lean_spec/node/sync/checkpoint_sync.py b/src/lean_spec/node/sync/checkpoint_sync.py index 70fc6dd59..66f5eb5c0 100644 --- a/src/lean_spec/node/sync/checkpoint_sync.py +++ b/src/lean_spec/node/sync/checkpoint_sync.py @@ -24,7 +24,7 @@ import httpx from lean_spec.spec.crypto.merkleization import hash_tree_root -from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, State +from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, SignedBlock, State logger = logging.getLogger(__name__) @@ -34,6 +34,13 @@ FINALIZED_STATE_ENDPOINT: Final = "/lean/v0/states/finalized" """API endpoint for fetching finalized state. Follows Beacon API conventions.""" +FINALIZED_BLOCK_ENDPOINT: Final = "/lean/v0/blocks/finalized" +"""API endpoint for fetching the signed block matching the finalized state. + +Together with the state, this forms the anchor pair for store creation. +Checkpoint sync requires the source to serve it. +""" + class CheckpointSyncError(Exception): """ @@ -102,6 +109,56 @@ async def fetch_finalized_state(url: str, state_class: type[State]) -> State: raise CheckpointSyncError(f"Failed to fetch state: {exception}") from exception +async def fetch_finalized_block(url: str) -> SignedBlock: + """ + Fetch the signed block matching the finalized state via checkpoint sync. + + The returned block carries the real body, so its hash tree root equals + the finalized root the rest of the network agrees on. + The caller must verify the block's state root against the fetched state + before pairing them into a store. + + Args: + url: Base URL of the node API (e.g., "http://localhost:5052"). + + Returns: + The finalized signed block. + + Raises: + CheckpointSyncError: If the request fails or block bytes are invalid. + """ + base_url = url.rstrip("/") + full_url = f"{base_url}{FINALIZED_BLOCK_ENDPOINT}" + + logger.info("Fetching finalized signed block from %s", full_url) + + headers = {"Accept": "application/octet-stream"} + + try: + async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: + response = await client.get(full_url, headers=headers) + response.raise_for_status() + + ssz_data = response.content + logger.info("Downloaded %d bytes of SSZ signed block data", len(ssz_data)) + + signed_block = SignedBlock.decode_bytes(ssz_data) + logger.info("Deserialized signed block at slot %s", signed_block.block.slot) + + return signed_block + + except httpx.RequestError as exception: + raise CheckpointSyncError( + f"Network error while connecting to {exception.request.url}: {exception}" + ) from exception + except httpx.HTTPStatusError as exception: + raise CheckpointSyncError( + f"HTTP error {exception.response.status_code}: {exception.response.text[:200]}" + ) from exception + except Exception as exception: + raise CheckpointSyncError(f"Failed to fetch signed block: {exception}") from exception + + def verify_checkpoint_state(state: State) -> bool: """ Verify that a checkpoint state is structurally valid. diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 24a498e66..a64515195 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -11,6 +11,9 @@ from consensus_testing import make_genesis_store from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.spec.forks import SignedBlock +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 # Default port for auto-started local server DEFAULT_PORT = 15099 @@ -67,11 +70,24 @@ def _create_server(self) -> ApiServer: """Create the API server with a test store and aggregator controller.""" store = make_genesis_store(num_validators=3, observer=True, genesis_time=int(time.time())) + def signed_block_for(root: Bytes32) -> SignedBlock | None: + # The store retains only unsigned blocks. + # Wrap the anchor block with an empty proof, like a node + # serving a genesis anchor that no proposer ever signed. + block = store.blocks.get(root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + controller = _make_conformance_controller(initial=False) config = ApiServerConfig(host="127.0.0.1", port=self.port) return ApiServer( config=config, store_getter=lambda: store, + signed_block_getter=signed_block_for, aggregator_controller=controller, ) diff --git a/tests/api/endpoints/test_blocks.py b/tests/api/endpoints/test_blocks.py new file mode 100644 index 000000000..e0f4d4146 --- /dev/null +++ b/tests/api/endpoints/test_blocks.py @@ -0,0 +1,54 @@ +"""Tests for the blocks endpoints.""" + +import httpx + +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import SignedBlock +from lean_spec.spec.forks.lstar import State + + +def get_finalized_block(server_url: str) -> httpx.Response: + """Fetch the finalized signed block from the server.""" + return httpx.get( + f"{server_url}/lean/v0/blocks/finalized", + headers={"Accept": "application/octet-stream"}, + ) + + +class TestFinalizedBlock: + """Tests for the /lean/v0/blocks/finalized endpoint.""" + + def test_returns_200(self, server_url: str) -> None: + """Finalized block endpoint returns 200 status code.""" + response = get_finalized_block(server_url) + assert response.status_code == 200 + + def test_content_type_is_octet_stream(self, server_url: str) -> None: + """Finalized block endpoint returns octet-stream content type.""" + response = get_finalized_block(server_url) + content_type = response.headers.get("content-type", "") + assert "application/octet-stream" in content_type + + def test_ssz_deserializes(self, server_url: str) -> None: + """Finalized block SSZ bytes deserialize to a valid SignedBlock object.""" + response = get_finalized_block(server_url) + signed_block = SignedBlock.decode_bytes(response.content) + assert signed_block is not None + + def test_state_root_matches_finalized_state(self, server_url: str) -> None: + """ + Returned block's state root equals the finalized state's hash tree root. + + Store creation from a checkpoint asserts exactly this. + If it fails, the (state, signed block) pair cannot bootstrap a store. + """ + block_response = get_finalized_block(server_url) + signed_block = SignedBlock.decode_bytes(block_response.content) + + state_response = httpx.get( + f"{server_url}/lean/v0/states/finalized", + headers={"Accept": "application/octet-stream"}, + ) + state = State.decode_bytes(state_response.content) + + assert signed_block.block.state_root == hash_tree_root(state) diff --git a/tests/node/api/test_server.py b/tests/node/api/test_server.py index 1f616aef0..f466b4d44 100644 --- a/tests/node/api/test_server.py +++ b/tests/node/api/test_server.py @@ -12,12 +12,17 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass, field import httpx from lean_spec.node.api import AggregatorController, ApiServer, ApiServerConfig +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import SignedBlock from lean_spec.spec.forks.lstar import Store +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 @dataclass(slots=True) @@ -102,6 +107,102 @@ async def test_returns_503_when_store_not_initialized(self) -> None: await server.aclose() +class TestFinalizedBlockEndpoint: + """Tests for the /lean/v0/blocks/finalized endpoint.""" + + @staticmethod + def _signed_block_getter_for(store: Store) -> Callable[[Bytes32], SignedBlock | None]: + """Build a signed-block lookup wrapping the store's unsigned blocks.""" + + def signed_block_for(root: Bytes32) -> SignedBlock | None: + block = store.blocks.get(root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + + return signed_block_for + + async def test_returns_503_when_store_not_initialized(self) -> None: + """Endpoint returns 503 Service Unavailable when store is not set.""" + config = ApiServerConfig(port=15071) + server = ApiServer(config=config) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15071/lean/v0/blocks/finalized") + + assert response.status_code == 503 + + finally: + await server.aclose() + + async def test_returns_503_without_signed_block_source(self, base_store: Store) -> None: + """Endpoint returns 503 when no signed-block source is configured.""" + config = ApiServerConfig(port=15072) + server = ApiServer(config=config, store_getter=lambda: base_store) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15072/lean/v0/blocks/finalized") + + assert response.status_code == 503 + + finally: + await server.aclose() + + async def test_returns_404_when_block_unavailable(self, base_store: Store) -> None: + """Endpoint returns 404 when the source has no block for the finalized root.""" + config = ApiServerConfig(port=15073) + server = ApiServer( + config=config, + store_getter=lambda: base_store, + signed_block_getter=lambda root: None, + ) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15073/lean/v0/blocks/finalized") + + assert response.status_code == 404 + + finally: + await server.aclose() + + async def test_returns_finalized_anchor_block(self, base_store: Store) -> None: + """Endpoint serves the signed block matching the finalized checkpoint root.""" + config = ApiServerConfig(port=15074) + server = ApiServer( + config=config, + store_getter=lambda: base_store, + signed_block_getter=self._signed_block_getter_for(base_store), + ) + + await server.start() + + try: + async with httpx.AsyncClient() as client: + response = await client.get("http://127.0.0.1:15074/lean/v0/blocks/finalized") + + assert response.status_code == 200 + assert "application/octet-stream" in response.headers["content-type"] + + signed_block = SignedBlock.decode_bytes(response.content) + + assert hash_tree_root(signed_block.block) == base_store.latest_finalized.root + + finally: + await server.aclose() + + class TestJustifiedCheckpointEndpoint: """Tests for the /lean/v0/checkpoints/justified endpoint error handling.""" diff --git a/tests/node/sync/test_checkpoint_sync.py b/tests/node/sync/test_checkpoint_sync.py index e2257a25f..a7545f9c3 100644 --- a/tests/node/sync/test_checkpoint_sync.py +++ b/tests/node/sync/test_checkpoint_sync.py @@ -9,14 +9,18 @@ from lean_spec.node.api import ApiServer, ApiServerConfig from lean_spec.node.sync.checkpoint_sync import ( + FINALIZED_BLOCK_ENDPOINT, FINALIZED_STATE_ENDPOINT, CheckpointSyncError, + fetch_finalized_block, fetch_finalized_state, verify_checkpoint_state, ) -from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, Slot +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import VALIDATOR_REGISTRY_LIMIT, SignedBlock, Slot from lean_spec.spec.forks.lstar import State, Store -from lean_spec.spec.forks.lstar.containers import Validators +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate, Validators +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 class _MockTransport(httpx.AsyncBaseTransport): @@ -211,6 +215,79 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response: assert captured == [f"http://example.com{FINALIZED_STATE_ENDPOINT}"] +class TestFetchFinalizedBlock: + """ + Tests for error handling when fetching the finalized block over HTTP. + + Mirrors the state-fetch error tests: failures are injected through the + transport so the real httpx client and error wrapping run unchanged. + """ + + async def test_network_error_raises_checkpoint_sync_error(self) -> None: + """TCP-level failure surfaces as CheckpointSyncError with the URL.""" + transport = _MockTransport( + exc=httpx.RequestError( + "connection refused", + request=httpx.Request("GET", f"http://example.com{FINALIZED_BLOCK_ENDPOINT}"), + ) + ) + + with ( + patch( + "lean_spec.node.sync.checkpoint_sync.httpx.AsyncClient", + return_value=httpx.AsyncClient(transport=transport), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await fetch_finalized_block("http://example.com") + assert str(exception_info.value) == ( + "Network error while connecting to " + "http://example.com/lean/v0/blocks/finalized: connection refused" + ) + + @pytest.mark.parametrize( + ("status_code", "status_text"), + [ + (404, "Not Found"), + (503, "Service Unavailable"), + ], + ) + async def test_http_error_response_raises_checkpoint_sync_error( + self, status_code: int, status_text: str + ) -> None: + """ + Non-success HTTP status surfaces as CheckpointSyncError with the code. + + Covers missing endpoints (404) and sources without a signed-block + source (503), the two cases the anchor builder falls back on. + """ + transport = _MockTransport(status=status_code, content=status_text.encode()) + + with ( + patch( + "lean_spec.node.sync.checkpoint_sync.httpx.AsyncClient", + return_value=httpx.AsyncClient(transport=transport), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await fetch_finalized_block("http://example.com") + assert str(exception_info.value) == f"HTTP error {status_code}: {status_text}" + + async def test_corrupt_ssz_raises_checkpoint_sync_error(self) -> None: + """Corrupt response body surfaces as CheckpointSyncError.""" + transport = _MockTransport(content=b"\xff\xfe corrupt") + + with ( + patch( + "lean_spec.node.sync.checkpoint_sync.httpx.AsyncClient", + return_value=httpx.AsyncClient(transport=transport), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await fetch_finalized_block("http://example.com") + assert str(exception_info.value).startswith("Failed to fetch signed block: ") + + class TestCheckpointSyncClientServerIntegration: """Integration tests for checkpoint sync client fetching from server.""" @@ -232,3 +309,46 @@ async def test_client_fetches_and_deserializes_state(self, base_store: Store) -> finally: await server.aclose() + + async def test_client_fetches_and_deserializes_block(self, base_store: Store) -> None: + """Client fetches the finalized block the server's signed-block source provides.""" + + def signed_block_for(root: Bytes32) -> SignedBlock | None: + block = base_store.blocks.get(root) + if block is None: + return None + return SignedBlock( + block=block, + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) + + config = ApiServerConfig(port=15075) + server = ApiServer( + config=config, + store_getter=lambda: base_store, + signed_block_getter=signed_block_for, + ) + + await server.start() + + try: + signed_block = await fetch_finalized_block("http://127.0.0.1:15075") + + assert hash_tree_root(signed_block.block) == base_store.latest_finalized.root + + finally: + await server.aclose() + + async def test_block_fetch_raises_when_source_unconfigured(self, base_store: Store) -> None: + """A server without a signed-block source yields the 503 error path.""" + config = ApiServerConfig(port=15076) + server = ApiServer(config=config, store_getter=lambda: base_store) + + await server.start() + + try: + with pytest.raises(CheckpointSyncError, match="HTTP error 503"): + await fetch_finalized_block("http://127.0.0.1:15076") + + finally: + await server.aclose() diff --git a/tests/node/test_anchor.py b/tests/node/test_anchor.py index 0979ee86f..0188f405f 100644 --- a/tests/node/test_anchor.py +++ b/tests/node/test_anchor.py @@ -6,13 +6,24 @@ import pytest -from consensus_testing import make_genesis_state +from consensus_testing import make_genesis_block, make_genesis_state from lean_spec.node.anchor import Anchor from lean_spec.node.genesis import GenesisConfig from lean_spec.node.sync.checkpoint_sync import CheckpointSyncError -from lean_spec.spec.forks import Slot +from lean_spec.spec.crypto.merkleization import hash_tree_root +from lean_spec.spec.forks import SignedBlock, Slot +from lean_spec.spec.forks.lstar import State +from lean_spec.spec.forks.lstar.containers import MultiMessageAggregate from lean_spec.spec.forks.lstar.spec import LstarSpec -from lean_spec.spec.ssz import Bytes32 +from lean_spec.spec.ssz import ByteList512KiB, Bytes32 + + +def _signed_genesis_block(state: State) -> SignedBlock: + """Wrap the genesis block matching a state with an empty proof.""" + return SignedBlock( + block=make_genesis_block(state), + proof=MultiMessageAggregate(proof=ByteList512KiB(data=b"")), + ) class TestAnchorFromGenesis: @@ -43,6 +54,11 @@ async def test_genesis_time_mismatch_raises(self) -> None: ) with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=_signed_genesis_block(checkpoint_state), + ), patch( "lean_spec.node.anchor.fetch_finalized_state", new_callable=AsyncMock, @@ -66,6 +82,11 @@ async def test_verification_failure_raises(self) -> None: ) with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=_signed_genesis_block(checkpoint_state), + ), patch( "lean_spec.node.anchor.fetch_finalized_state", new_callable=AsyncMock, @@ -85,13 +106,41 @@ async def test_verification_failure_raises(self) -> None: ) assert str(exception_info.value) == ("checkpoint state failed structural verification") - async def test_network_error_propagates(self) -> None: - """Network errors surface as CheckpointSyncError.""" + async def test_block_fetch_failure_propagates(self) -> None: + """A source that cannot serve the finalized block aborts checkpoint sync.""" local_genesis = GenesisConfig.model_validate( {"GENESIS_TIME": 1000, "GENESIS_VALIDATORS": []} ) with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + side_effect=CheckpointSyncError("HTTP error 503: no signed block source"), + ), + pytest.raises(CheckpointSyncError) as exception_info, + ): + await Anchor.from_checkpoint( + url="http://localhost:5052", + genesis=local_genesis, + fork=LstarSpec(), + validator_index=None, + ) + assert str(exception_info.value) == "HTTP error 503: no signed block source" + + async def test_state_fetch_failure_propagates(self) -> None: + """Network errors on the state fetch surface as CheckpointSyncError.""" + checkpoint_state = make_genesis_state(num_validators=3, genesis_time=1000) + local_genesis = GenesisConfig.model_validate( + {"GENESIS_TIME": 1000, "GENESIS_VALIDATORS": []} + ) + + with ( + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=_signed_genesis_block(checkpoint_state), + ), patch( "lean_spec.node.anchor.fetch_finalized_state", new_callable=AsyncMock, @@ -111,14 +160,22 @@ async def test_success_builds_store_and_status(self) -> None: """Successful checkpoint sync produces a populated anchor.""" genesis_time = 1000 checkpoint_state = make_genesis_state(num_validators=3, genesis_time=genesis_time) + signed_block = _signed_genesis_block(checkpoint_state) local_genesis = GenesisConfig.model_validate( {"GENESIS_TIME": genesis_time, "GENESIS_VALIDATORS": []} ) - with patch( - "lean_spec.node.anchor.fetch_finalized_state", - new_callable=AsyncMock, - return_value=checkpoint_state, + with ( + patch( + "lean_spec.node.anchor.fetch_finalized_state", + new_callable=AsyncMock, + return_value=checkpoint_state, + ), + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=signed_block, + ), ): anchor = await Anchor.from_checkpoint( url="http://localhost:5052", @@ -130,3 +187,36 @@ async def test_success_builds_store_and_status(self) -> None: assert anchor.store is not None assert anchor.validators == checkpoint_state.validators assert anchor.initial_status.finalized == anchor.store.latest_finalized + # The anchor is keyed by the fetched block's root, so the store's + # finalized checkpoint matches the root the network agrees on. + assert anchor.store.latest_finalized.root == hash_tree_root(signed_block.block) + + async def test_block_state_pairing_mismatch_raises(self) -> None: + """A block not matching the fetched state raises instead of falling back.""" + genesis_time = 1000 + checkpoint_state = make_genesis_state(num_validators=3, genesis_time=genesis_time) + other_state = make_genesis_state(num_validators=4, genesis_time=genesis_time) + mismatched_block = _signed_genesis_block(other_state) + local_genesis = GenesisConfig.model_validate( + {"GENESIS_TIME": genesis_time, "GENESIS_VALIDATORS": []} + ) + + with ( + patch( + "lean_spec.node.anchor.fetch_finalized_state", + new_callable=AsyncMock, + return_value=checkpoint_state, + ), + patch( + "lean_spec.node.anchor.fetch_finalized_block", + new_callable=AsyncMock, + return_value=mismatched_block, + ), + pytest.raises(CheckpointSyncError, match="mismatch"), + ): + await Anchor.from_checkpoint( + url="http://localhost:5052", + genesis=local_genesis, + fork=LstarSpec(), + validator_index=None, + )