diff --git a/src/lean_spec/node/chain/service.py b/src/lean_spec/node/chain/service.py index 0848eea16..d432edbd5 100644 --- a/src/lean_spec/node/chain/service.py +++ b/src/lean_spec/node/chain/service.py @@ -1,24 +1,4 @@ -""" -Chain service that drives consensus timing. - -Ethereum consensus runs on a clock. Every 4 seconds (1 slot), validators act -at each of the 5 intervals: - -- Interval 0: Block proposal -- Interval 1: Vote propagation -- Interval 2: Aggregation -- Interval 3: Safe target update -- Interval 4: Attestation acceptance into fork choice - -The Store has all this logic built in. But nothing drives the clock. -ChainService is that driver — a simple timer loop: - -1. Sleep until next interval boundary -2. Get current wall-clock time -3. Tick the store forward to current time -4. Update the sync service with the new store state -5. Repeat forever -""" +"""Chain service that drives consensus timing.""" from __future__ import annotations @@ -36,18 +16,7 @@ @dataclass(slots=True) class ChainService: - """ - Drives the consensus clock by periodically ticking the forkchoice store. - - ChainService is the heartbeat of a consensus client. It ensures time - advances in the Store, triggering interval-specific actions like - attestation acceptance and safe target updates. - - The service is intentionally minimal: - - Timer loop that wakes every interval - - Ticks the store forward to current time - - Updates the sync service's store reference - """ + """Drives the consensus clock by periodically ticking the fork choice store.""" sync_service: SyncService """Sync service whose store we tick.""" @@ -56,77 +25,49 @@ class ChainService: """Clock for time calculation.""" spec: LstarSpec = field(default_factory=LstarSpec) - """Fork spec driving consensus methods. Default lets tests skip wiring.""" + """Fork spec driving consensus methods.""" _running: bool = field(default=False, repr=False) """Whether the service is running.""" async def run(self) -> None: - """ - Main loop - tick the store every interval. - - This is the core of the chain service. It runs forever, sleeping - until each interval boundary and then advancing the store's time. - - The loop continues until the service is stopped. - - NOTE: We track the last handled interval to avoid skipping intervals. - If processing takes time and we end up in a new interval, we - handle it immediately instead of sleeping past it. - """ + """Tick the store forward at each interval boundary, until stopped.""" self._running = True - # Catch up store time to current wall clock (post-genesis only). - # - # - Before genesis, returns None; the main loop handles the wait. - # - After genesis, this ensures attestation validation accepts valid attestations - # (the store's time would otherwise lag behind wall-clock). + # Catch up store time to current wall clock. + # - Before genesis this returns nothing; the loop handles the wait. + # - After genesis this keeps attestation validation from rejecting valid votes. last_handled_total_interval = await self._initial_tick() while self._running: - # Get current wall-clock time. - current_time = self.clock.current_time() - genesis_time = self.clock.genesis_time - - # Wait for genesis if we're before it. - if current_time < genesis_time: - sleep_duration = int(genesis_time) - int(current_time) - await asyncio.sleep(sleep_duration) + # Wait for genesis if we are before it. + # The clock sleeps exactly until genesis when called before it. + if self.clock.current_time() < self.clock.genesis_time: + await self.clock.sleep_until_next_interval() continue - # Get current total interval count. total_interval = self.clock.total_intervals() - # If we've already handled this interval, sleep until the next boundary. + # Already handled this interval: sleep to the next boundary. already_handled = ( last_handled_total_interval is not None and total_interval <= last_handled_total_interval ) if already_handled: await self.clock.sleep_until_next_interval() - # Check if stopped during sleep. if not self._running: break - # Re-fetch interval after sleep. - # - # If still the same (e.g., time didn't advance), - # skip this iteration to avoid duplicate ticks. + # Time may not have advanced during the sleep. + # Skip this iteration to avoid ticking the same interval twice. total_interval = self.clock.total_intervals() if total_interval <= last_handled_total_interval: continue - # Tick the store forward to current interval. - # - # The store advances time interval by interval, performing - # appropriate actions at each interval. - # - # This minimal service does not produce blocks. - # Block production requires validator keys. + # Advance the store to the current interval. + # This service never proposes; block production needs validator keys. new_aggregated_attestations = await self._tick_to(total_interval) - # Publish any new aggregated attestations produced this tick. - # - # No publisher is wired in tests and offline runs. + # No publisher is wired in tests or offline runs, so guard on its presence. publish = self.sync_service.publish_aggregated_attestation if new_aggregated_attestations and publish is not None: for aggregate in new_aggregated_attestations: @@ -140,39 +81,36 @@ async def run(self) -> None: self.sync_service.store.latest_finalized.slot, ) - # Mark this interval as handled. last_handled_total_interval = total_interval async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAttestation]: """ - Advance store to target interval with skip and yield. - - When the node falls behind by more than one slot, stale intervals - are skipped. Processing every missed interval synchronously would - block the event loop, starving gossip and causing the node to fall - further behind. + Advance the store to the target interval, skipping stale work and yielding. - Between each remaining interval tick, yields to the event loop so - gossip messages can be processed. + When the node falls behind by more than one slot, stale intervals are skipped. + Processing every missed interval synchronously blocks the event loop. + That starves gossip and pushes the node further behind. - Updates the sync service's store after each tick so concurrent - gossip handlers see current time. + Between remaining ticks, yield so gossip messages can be processed. + Update the sync service store after each tick so gossip handlers see current time. Returns aggregated attestations produced during the ticks. """ store = self.sync_service.store all_new_aggregates: list[SignedAggregatedAttestation] = [] + # The target comes from wall clock, so it never moves time backward. + assert target_interval >= store.time + # Skip stale intervals when falling behind. - # # Jump to the last full slot boundary before the target. - # The final slot's worth of intervals still runs normally so that - # aggregation, safe target, and attestation acceptance happen. - intervals_per_slot = Interval(int(INTERVALS_PER_SLOT)) - gap = target_interval - store.time - if gap > intervals_per_slot: - store.time = target_interval - intervals_per_slot - self.sync_service.store = store + # The final slot still runs normally. + # That preserves aggregation, safe target, and attestation acceptance. + # + # Acceptance for the jumped slots waits for the final slot's tick. + # That is safe: acceptance is a monotone pool merge, and the head recomputes from scratch. + if target_interval - store.time > Interval(INTERVALS_PER_SLOT): + store.time = target_interval - Interval(INTERVALS_PER_SLOT) # Tick remaining intervals one at a time. while store.time < target_interval: @@ -185,29 +123,21 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte self.sync_service.store = store # Yield to the event loop so gossip handlers can run. - # Re-read store afterward: a gossip handler may have added - # blocks or attestations during the yield. + # Re-read the store: a handler may have added blocks or attestations. await asyncio.sleep(0) store = self.sync_service.store return all_new_aggregates async def _initial_tick(self) -> Interval | None: - """ - Perform initial tick to catch up store time to current wall clock. - - This is called once at startup to ensure the store's time reflects - actual wall clock time, not just the genesis anchor time. - - Returns the interval that was handled, or None if before genesis. - """ + """Catch up store time to wall clock at startup.""" current_time = self.clock.current_time() - # Only tick if we're past genesis. + # Only tick once past genesis. if current_time >= self.clock.genesis_time: target_interval = self.clock.total_intervals() - # Use _tick_to for skip + yield during catch-up. + # Reuse the skip-and-yield path for catch-up. # Discard aggregated attestations from catch-up. # During initial sync we may be many slots behind. # Publishing stale aggregations would spam the network. @@ -221,12 +151,11 @@ def stop(self) -> None: """ Stop the service. - Sets the running flag to False, causing the run() loop to exit - after completing its current sleep cycle. + The loop exits after its current sleep cycle finishes. """ self._running = False @property def is_running(self) -> bool: - """Check if the service is currently running.""" + """Whether the service is currently running.""" return self._running diff --git a/tests/lean_spec/node/chain/test_service.py b/tests/lean_spec/node/chain/test_service.py index f1295f06b..761a56ac3 100644 --- a/tests/lean_spec/node/chain/test_service.py +++ b/tests/lean_spec/node/chain/test_service.py @@ -2,625 +2,362 @@ from __future__ import annotations -from dataclasses import dataclass, field +from collections.abc import Callable, Coroutine +from dataclasses import dataclass from unittest.mock import patch +import pytest + from lean_spec.node.chain import SlotClock from lean_spec.node.chain.service import ChainService -from lean_spec.spec.forks import Interval, Slot -from lean_spec.spec.forks.lstar.config import MILLISECONDS_PER_INTERVAL -from lean_spec.spec.forks.lstar.containers import SignedAggregatedAttestation -from lean_spec.spec.ssz import ZERO_HASH, Bytes32, Uint64 -from tests.lean_spec.helpers.mocks import StoreInterceptingSpec - - -@dataclass -class MockCheckpoint: - """Mock checkpoint for the latest_finalized attribute.""" - - slot: Slot = field(default_factory=lambda: Slot(0)) +from lean_spec.spec.forks import Checkpoint, Interval, LstarSpec, Slot +from lean_spec.spec.forks.lstar.config import INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL +from lean_spec.spec.forks.lstar.containers import ( + AggregationBits, + AttestationData, + SignedAggregatedAttestation, + SingleMessageAggregate, + Store, +) +from lean_spec.spec.ssz import Boolean, ByteList512KiB, Bytes32, Uint64 +from tests.lean_spec.helpers.builders import make_store + +# One interval lasts this many wall-clock seconds. +# +# Tests express time as a multiple of it so the millisecond math stays readable. +INTERVAL_SECONDS = float(MILLISECONDS_PER_INTERVAL) / 1000.0 + + +def make_aggregate(seed: int) -> SignedAggregatedAttestation: + """Build a distinct aggregated attestation cheaply, without signing keys.""" + # The driver treats the aggregate as an opaque payload. + # The seed only makes separate instances compare unequal. + return SignedAggregatedAttestation( + data=AttestationData( + slot=Slot(2), + head=Checkpoint(root=Bytes32.zero(), slot=Slot(2)), + target=Checkpoint(root=Bytes32.zero(), slot=Slot(2)), + source=Checkpoint(root=Bytes32.zero(), slot=Slot(0)), + ), + proof=SingleMessageAggregate( + participants=AggregationBits(data=[Boolean(True)]), + proof=ByteList512KiB(data=bytes([seed])), + ), + ) + + +class ProbeSpec(LstarSpec): + """Real spec that records each tick so a test can see what the driver did.""" + + def __init__(self, emit: list[SignedAggregatedAttestation] | None = None) -> None: + """Begin with an empty tick log and remember the aggregates to emit, if any.""" + super().__init__() + self.ticks: list[tuple[int, bool, bool]] = [] + self.emit = emit or [] + + def tick_interval( # type: ignore[override] + self, store: Store, has_proposal: bool, is_aggregator: bool = False + ) -> tuple[Store, list[SignedAggregatedAttestation]]: + """Advance the real store, record the call, then emit at the aggregation interval.""" + # Delegate to the real spec so the store advances exactly as in production. + # Discard its real aggregates; this stub substitutes its own below. + store, _ = super().tick_interval(store, has_proposal, is_aggregator) + + # Record after the tick, so the logged time is the post-tick value. + self.ticks.append((int(store.time), has_proposal, is_aggregator)) + + # A real aggregator builds its proof at interval 2 of each slot. + at_aggregation_interval = int(store.time) % int(INTERVALS_PER_SLOT) == 2 + + # Emit the configured aggregates only there, and nothing on other intervals. + return store, self.emit if at_aggregation_interval else [] + + +class PublishRecorder: + """Async publisher that records every aggregate the driver hands it.""" + + def __init__(self) -> None: + """Begin with an empty record of received aggregates.""" + self.received: list[SignedAggregatedAttestation] = [] + + async def __call__(self, aggregate: SignedAggregatedAttestation) -> None: + """Record one published aggregate.""" + self.received.append(aggregate) @dataclass -class MockStore: - """Mock store that tracks tick_interval calls.""" - - time: Interval = field(default_factory=lambda: Interval(0)) - tick_calls: list[tuple[Interval, bool]] = field(default_factory=list) - head: Bytes32 = field(default_factory=lambda: ZERO_HASH) - latest_finalized: MockCheckpoint = field(default_factory=MockCheckpoint) - - def tick_interval( - self, has_proposal: bool, is_aggregator: bool = False - ) -> tuple[MockStore, list]: - """Record the tick call, advance time by one interval, and return a new store.""" - new_time = self.time + Interval(1) - new_store = MockStore( - time=new_time, - tick_calls=[*self.tick_calls, (new_time, has_proposal)], - head=self.head, - latest_finalized=self.latest_finalized, - ) - return new_store, [] +class SyncServiceStub: + """Minimal stand-in exposing only what the driver reads from the sync service.""" + store: Store + """Forkchoice store the driver ticks forward.""" -@dataclass -class MockSyncService: - """Mock sync service for testing ChainService.""" - - store: MockStore = field(default_factory=MockStore) is_aggregator: bool = False - published_aggregations: list = field(default_factory=list) - - async def publish_aggregated_attestation(self, aggregate: SignedAggregatedAttestation) -> None: - """Record published aggregations.""" - self.published_aggregations.append(aggregate) - - -class TestChainServiceLifecycle: - """Tests for ChainService start/stop lifecycle.""" - - def test_starts_not_running(self) -> None: - """ - Service initializes in stopped state. - - The running flag prevents accidental double-starts and enables graceful shutdown. - """ - sync_service = MockSyncService() - clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: 0.0) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - assert chain_service.is_running is False - - def test_stop_sets_flag(self) -> None: - """ - stop() transitions running flag from True to False. - - This allows the run loop to exit gracefully at the next sleep boundary. - """ - sync_service = MockSyncService() - clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: 0.0) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - chain_service._running = True - assert chain_service.is_running is True - - chain_service.stop() - assert chain_service.is_running is False - - async def test_run_sets_running_flag(self) -> None: - """ - run() sets the running flag before entering the main loop. - - This ensures is_running reflects actual state during execution. - """ - sync_service = MockSyncService() - clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: 0.0) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - call_count = 0 - - async def stop_on_second_call(_duration: float) -> None: - nonlocal call_count - call_count += 1 - if call_count >= 2: - chain_service.stop() - - with patch("asyncio.sleep", new=stop_on_second_call): - await chain_service.run() - - # After stopping, flag should be False. - assert chain_service.is_running is False - - -class TestIntervalTiming: - """Tests for interval boundary timing.""" - - async def test_sleep_calculation_mid_interval(self) -> None: - """ - Mid-interval sleep calculation ensures wakeup at next boundary. - - Precise boundary alignment is critical for coordinated validator actions. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - # Halfway into first interval. - current_time = float(genesis) + interval_secs / 2 - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - captured_duration: float | None = None - - async def capture_sleep(duration: float) -> None: - nonlocal captured_duration - captured_duration = duration - - with patch("asyncio.sleep", new=capture_sleep): - await chain_service.clock.sleep_until_next_interval() - - # Should sleep until next interval boundary. - expected = float(genesis) + interval_secs - current_time - assert captured_duration is not None - assert abs(captured_duration - expected) < 0.002 # floating-point tolerance - - async def test_sleep_at_interval_boundary(self) -> None: - """ - When clock reads exactly at interval boundary, sleep is one full interval. - - This tests the math of the sleep calculation, not real-world timing. - """ - genesis = Uint64(1000) - # Clock reads exactly at first interval boundary. - current_time = float(genesis + (MILLISECONDS_PER_INTERVAL // Uint64(1000))) - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - captured_duration: float | None = None - - async def capture_sleep(duration: float) -> None: - nonlocal captured_duration - captured_duration = duration - - with patch("asyncio.sleep", new=capture_sleep): - await chain_service.clock.sleep_until_next_interval() - - # At boundary, next boundary is one full interval away. - expected = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - assert captured_duration is not None - assert abs(captured_duration - expected) < 0.001 - - async def test_sleep_before_genesis(self) -> None: - """ - Before genesis, sleeps until genesis time. - - The network cannot produce valid blocks or attestations pre-genesis. - """ - genesis = Uint64(1000) - current_time = 900.0 # 100 seconds before genesis - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - captured_duration: float | None = None - - async def capture_sleep(duration: float) -> None: - nonlocal captured_duration - captured_duration = duration - - with patch("asyncio.sleep", new=capture_sleep): - await chain_service.clock.sleep_until_next_interval() - - # Should sleep until genesis. - expected = float(genesis) - current_time - assert captured_duration is not None - assert abs(captured_duration - expected) < 0.001 - - -class TestStoreTicking: - """Tests for store tick integration.""" - - async def test_ticks_store_with_current_interval(self) -> None: - """ - Store receives the current interval count on tick. - - The chain service passes intervals (not seconds) so the store - can advance time without lossy seconds→intervals conversion. - """ - genesis = Uint64(1000) - # 5 intervals after genesis = 5 * 800ms = 4.0 seconds. - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - current_time = float(genesis) + 5 * interval_secs - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - call_count = 0 - - async def stop_on_second_call(_duration: float) -> None: - nonlocal call_count - call_count += 1 - if call_count >= 2: - chain_service.stop() - - with patch("asyncio.sleep", new=stop_on_second_call): - await chain_service.run() - - # Initial tick handles all 5 intervals (0→1, 1→2, ..., 4→5). - # Main loop recognizes the interval was handled and waits. - expected_ticks = [(Interval(i), False) for i in range(1, 6)] - assert sync_service.store.tick_calls == expected_ticks - - async def test_has_proposal_always_false(self) -> None: - """ - has_proposal is always False for this minimal service. - - Block production requires validator keys, which this service does not handle. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - current_time = float(genesis) + 5 * interval_secs - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - tick_count = 0 - - async def stop_after_three(_duration: float) -> None: - nonlocal tick_count - tick_count += 1 - if tick_count >= 3: - chain_service.stop() - - with patch("asyncio.sleep", new=stop_after_three): - await chain_service.run() - - # All ticks have has_proposal=False. - assert all(proposal is False for _, proposal in sync_service.store.tick_calls) - - async def test_sync_service_store_updated(self) -> None: - """ - SyncService.store is replaced with new store after each tick. - - The Store uses immutable updates, so each tick creates a new instance. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - current_time = float(genesis) + 5 * interval_secs - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - initial_store = MockStore() - sync_service = MockSyncService(store=initial_store) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - - # No ticks before our first run. - assert sync_service.store.tick_calls == [] - - async def stop_immediately(_duration: float) -> None: - chain_service.stop() - - with patch("asyncio.sleep", new=stop_immediately): - await chain_service.run() - - # Store should have been replaced. - assert sync_service.store is not initial_store - - # Initial tick handles all 5 intervals. - assert sync_service.store.time == Interval(5) - - -class TestMultipleIntervals: - """Tests for running through multiple intervals.""" - - async def test_advances_through_intervals(self) -> None: - """ - Service advances through multiple intervals correctly. - - Each interval triggers a store tick with the current time. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - # 4 consecutive interval times. - times = [ - float(genesis) + 1 * interval_secs, - float(genesis) + 2 * interval_secs, - float(genesis) + 3 * interval_secs, - float(genesis) + 4 * interval_secs, - ] - time_index = 0 - - def advancing_time() -> float: - nonlocal time_index - if time_index < len(times): - return times[time_index] - return times[-1] - - clock = SlotClock(genesis_time=genesis, time_fn=advancing_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), + """Whether the node acts as an aggregator.""" + + publish_aggregated_attestation: ( + Callable[[SignedAggregatedAttestation], Coroutine[None, None, None]] | None + ) = None + """Callback for produced aggregates, or nothing when no publisher is wired.""" + + +def make_service( + spec: ProbeSpec, + *, + genesis_seconds: int = 1000, + time_fn: Callable[[], float] | None = None, + is_aggregator: bool = False, + publisher: Callable[[SignedAggregatedAttestation], Coroutine[None, None, None]] | None = None, +) -> ChainService: + """ + Assemble a chain service over a real store with a controllable clock. + + The clock defaults to a fixed reading at genesis. + That suits tests calling the catch-up helpers directly, which never advance time. + + Args: + spec: Recording spec the driver ticks each interval. + genesis_seconds: Genesis time in seconds, shared by the store and the clock. + time_fn: Wall-clock source; defaults to a fixed reading at genesis. + is_aggregator: Whether the node acts as an aggregator. + publisher: Callback for produced aggregates, or nothing to drop them. + + Returns: + A chain service wired to the stub sync service and the clock. + """ + sync_service = SyncServiceStub( + store=make_store(genesis_time=genesis_seconds), + is_aggregator=is_aggregator, + publish_aggregated_attestation=publisher, + ) + clock = SlotClock( + genesis_time=Uint64(genesis_seconds), + time_fn=time_fn or (lambda: float(genesis_seconds)), + ) + # The stub supplies the members the driver uses, but is not a nominal sync service. + return ChainService(sync_service=sync_service, clock=clock, spec=spec) # type: ignore[arg-type] + + +class TestCatchUp: + """Tests for the catch-up helper that walks the store to a target interval.""" + + @pytest.mark.parametrize( + ("target", "expected_tick_times"), + [ + # Already at the target: nothing to do. + (0, []), + # Within one slot: every interval ticks. + (3, [1, 2, 3]), + # Exactly one slot behind: still no skip. + (5, [1, 2, 3, 4, 5]), + # Just over one slot: the first interval is skipped. + (6, [2, 3, 4, 5, 6]), + # Four slots behind: whole stale slots are skipped, only the last slot ticks. + (20, [16, 17, 18, 19, 20]), + # Twenty slots behind: the skip math holds at scale, still one slot of ticks. + (100, [96, 97, 98, 99, 100]), + ], + ) + async def test_advances_and_skips_stale_intervals( + self, target: int, expected_tick_times: list[int] + ) -> None: + """Catch-up reaches the target, skipping stale intervals beyond one slot.""" + spec = ProbeSpec() + service = make_service(spec) + await service._tick_to(Interval(target)) + assert [time for time, _, _ in spec.ticks] == expected_tick_times + assert int(service.sync_service.store.time) == target + + async def test_never_proposes_and_forwards_aggregator_flag(self) -> None: + """Each tick reports no proposal and forwards the configured aggregator flag.""" + spec = ProbeSpec() + service = make_service(spec, is_aggregator=True) + await service._tick_to(Interval(3)) + assert spec.ticks == [(1, False, True), (2, False, True), (3, False, True)] + + async def test_rejects_a_target_before_the_current_time(self) -> None: + """Catch-up refuses a target earlier than where the store already sits.""" + service = make_service(ProbeSpec()) + service.sync_service.store.time = Interval(5) + with pytest.raises(AssertionError): + await service._tick_to(Interval(3)) + + async def test_continues_on_a_store_swapped_in_during_the_yield(self) -> None: + """A store replaced mid-catch-up is picked up on the next tick, not the stale one.""" + service = make_service(ProbeSpec()) + + # Stand in for a gossip handler that processes a block during the yield. + # It installs a fresh store object, which the driver must continue from. + swapped = make_store(genesis_time=1000) + swapped.time = Interval(1) + swap = {"done": False} + + async def swap_store_once(duration: float) -> None: + if not swap["done"]: + swap["done"] = True + service.sync_service.store = swapped + + with patch("asyncio.sleep", new=swap_store_once): + await service._tick_to(Interval(3)) + + # Reaching the target on the swapped object proves the post-yield re-read. + assert service.sync_service.store is swapped + assert int(swapped.time) == 3 + + +class TestStartupTick: + """Tests for the one-shot catch-up performed at startup.""" + + @pytest.mark.parametrize( + ("intervals_elapsed", "expected_result", "expected_tick_times"), + [ + # Before genesis: nothing ticks, the caller waits. + (-0.5, None, []), + # Exactly at genesis: the store already sits at the anchor. + (0.0, Interval(0), []), + # One slot in: five ticks, no skip. + (5.0, Interval(5), [1, 2, 3, 4, 5]), + # Four slots behind: stale intervals are skipped. + (20.0, Interval(20), [16, 17, 18, 19, 20]), + ], + ) + async def test_catches_store_up_to_wall_clock( + self, + intervals_elapsed: float, + expected_result: Interval | None, + expected_tick_times: list[int], + ) -> None: + """Startup ticks the store to the current interval, or reports waiting if pre-genesis.""" + spec = ProbeSpec() + now = 1000 + intervals_elapsed * INTERVAL_SECONDS + service = make_service(spec, time_fn=lambda: now) + result = await service._initial_tick() + assert result == expected_result + assert [time for time, _, _ in spec.ticks] == expected_tick_times + + async def test_discards_aggregates_produced_during_catch_up(self) -> None: + """Aggregates emitted while catching up are dropped, not published.""" + recorder = PublishRecorder() + service = make_service( + ProbeSpec(emit=[make_aggregate(1)]), + time_fn=lambda: 1000 + 5 * INTERVAL_SECONDS, + is_aggregator=True, + publisher=recorder, ) + await service._initial_tick() + assert recorder.received == [] - async def advance_and_stop(_duration: float) -> None: - nonlocal time_index - time_index += 1 - if time_index >= len(times): - chain_service.stop() - - with patch("asyncio.sleep", new=advance_and_stop): - await chain_service.run() - - # Initial tick at interval 1, then main loop ticks at 2, 3, 4. - # Each _tick_to call ticks exactly one interval (gap=1 each time). - assert sync_service.store.tick_calls == [ - (Interval(1), False), - (Interval(2), False), - (Interval(3), False), - (Interval(4), False), - ] - - -class TestInitialTick: - """Tests for the initial tick behavior at startup.""" - - async def test_initial_tick_skipped_before_genesis(self) -> None: - """ - Initial tick is a no-op when current time is before genesis. - - The store time should not advance before the network starts. - This ensures the main loop correctly handles the genesis wait. - """ - genesis = Uint64(1000) - current_time = 900.0 # Before genesis - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - initial_store = MockStore() - sync_service = MockSyncService(store=initial_store) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - # Run just the initial tick without the full run loop. - await chain_service._initial_tick() - - # Store should not have been ticked. - assert sync_service.store is initial_store - assert sync_service.store.tick_calls == [] - - async def test_initial_tick_executed_after_genesis(self) -> None: - """ - Initial tick advances store time when past genesis. - - This ensures attestation validation works immediately on startup. - """ - genesis = Uint64(1000) - # 5 intervals after genesis = 5 * 800ms = 4.0 seconds. - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - current_time = float(genesis) + 5 * interval_secs - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - initial_store = MockStore() - sync_service = MockSyncService(store=initial_store) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) +@dataclass +class IntervalClock: + """Wall clock that advances one interval each time the loop waits.""" - await chain_service._initial_tick() - - # Store should have been replaced and ticked through all 5 intervals. - assert sync_service.store is not initial_store - assert sync_service.store.time == Interval(5) - assert len(sync_service.store.tick_calls) == 5 - - async def test_initial_tick_at_exact_genesis(self) -> None: - """ - Initial tick is executed when current time equals genesis. - - At genesis, the network is active and the store should be initialized. - """ - genesis = Uint64(1000) - current_time = float(genesis) # Exactly at genesis - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - initial_store = MockStore() - sync_service = MockSyncService(store=initial_store) - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) + genesis_seconds: int + interval: int - await chain_service._initial_tick() - - # At interval 0, no ticks needed (store already at time=0). - assert sync_service.store.time == Interval(0) - assert sync_service.store.tick_calls == [] - - async def test_initial_tick_skips_stale_intervals(self) -> None: - """ - Initial tick skips stale intervals when far behind genesis. - - When the gap exceeds one slot, only the last slot's worth of - intervals is processed. This prevents event loop starvation. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - # 20 intervals after genesis (4 full slots). - current_time = float(genesis) + 20 * interval_secs - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) + def __call__(self) -> float: + # Land mid-interval so floating-point jitter never crosses a boundary. + return self.genesis_seconds + (self.interval + 0.5) * INTERVAL_SECONDS - await chain_service._initial_tick() - - # Gap=20 > INTERVALS_PER_SLOT(5), so skip to interval 15. - # Only last 5 intervals are ticked (15→16, ..., 19→20). - assert sync_service.store.time == Interval(20) - assert len(sync_service.store.tick_calls) == 5 - assert sync_service.store.tick_calls[0] == (Interval(16), False) - assert sync_service.store.tick_calls[-1] == (Interval(20), False) - - -class TestIntervalTracking: - """Tests for the last_handled_total_interval tracking logic.""" - - async def test_does_not_reprocess_same_interval(self) -> None: - """ - Same interval is not processed twice when processing is fast. - - The last_handled_total_interval tracks which interval was last processed - to prevent duplicate ticks if the service finishes before the next boundary. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - # Halfway into second interval (stays constant). - # 1.5 intervals * 800ms = 1200ms. total_intervals = 1200 // 800 = 1. - current_time = float(genesis) + interval_secs + interval_secs / 2 - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) + def advance(self) -> None: + """Move the clock forward by one interval.""" + self.interval += 1 - sleep_call_count = 0 - async def count_sleeps_and_stop(_duration: float) -> None: - nonlocal sleep_call_count - sleep_call_count += 1 - # After several sleep calls, stop the service. - # If deduplication works, we should see sleeps but not extra ticks. - if sleep_call_count >= 3: - chain_service.stop() +async def run_for_waits(service: ChainService, *, waits: int = 1) -> None: + """Run the loop with a frozen clock, stopping after the given number of boundary waits.""" + count = 0 - with patch("asyncio.sleep", new=count_sleeps_and_stop): - await chain_service.run() + async def stop_after_waits(duration: float) -> None: + nonlocal count + # Zero-duration sleeps are the tick loop yielding, not interval boundaries. + if duration <= 0: + return + count += 1 + if count >= waits: + service.stop() - # Only the initial tick happens (one interval: 0→1). - # The interval tracking prevents redundant ticks for the same interval. - assert sync_service.store.tick_calls == [(Interval(1), False)] + with patch("asyncio.sleep", new=stop_after_waits): + await service.run() -class TestEdgeCases: - """Tests for edge cases and boundary conditions.""" +async def run_advancing( + service: ChainService, clock: IntervalClock, *, stop_at_interval: int +) -> None: + """Run the loop, advancing the clock one interval per wait, until the target interval.""" - async def test_genesis_time_zero(self) -> None: - """ - Works correctly with genesis_time of 0. + async def advance_then_maybe_stop(duration: float) -> None: + # Ignore the zero-duration yields inside the tick loop. + if duration <= 0: + return + clock.advance() + if clock.interval >= stop_at_interval: + service.stop() - This tests the boundary condition of Unix epoch as genesis. - """ - genesis = Uint64(0) - current_time = 5 * (float(MILLISECONDS_PER_INTERVAL) / 1000.0) + with patch("asyncio.sleep", new=advance_then_maybe_stop): + await service.run() - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) - async def stop_immediately(_duration: float) -> None: - chain_service.stop() - - with patch("asyncio.sleep", new=stop_immediately): - await chain_service.run() - - # Initial tick advances through 5 intervals. - assert sync_service.store.time == Interval(5) - assert len(sync_service.store.tick_calls) == 5 - - async def test_large_genesis_time(self) -> None: - """ - Works with realistic Unix timestamp genesis times. - - Tests that large integer arithmetic works correctly. - """ - genesis = Uint64(1700000000) # Nov 2023 - # 100 intervals = 80s, plus 0.5s mid-interval offset. - # total_intervals = int(80.5 * 1000) // 800 = 80500 // 800 = 100. - current_time = float(genesis) + 100 * (float(MILLISECONDS_PER_INTERVAL) / 1000.0) + 0.5 - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), - ) +class TestRunLoop: + """Tests for the run loop that drives catch-up at each interval boundary.""" - async def stop_immediately(_duration: float) -> None: - chain_service.stop() - - with patch("asyncio.sleep", new=stop_immediately): - await chain_service.run() - - # Gap=100 > INTERVALS_PER_SLOT(5), so stale intervals are skipped. - # Only the last 5 intervals are ticked (96→97, ..., 99→100). - assert sync_service.store.time == Interval(100) - assert len(sync_service.store.tick_calls) == 5 - - async def test_stop_during_sleep(self) -> None: - """ - Service exits cleanly when stopped during sleep. - - The running flag is checked after each sleep to enable graceful shutdown. - """ - genesis = Uint64(1000) - interval_secs = float(MILLISECONDS_PER_INTERVAL) / 1000.0 - current_time = float(genesis) + 5 * interval_secs - - clock = SlotClock(genesis_time=genesis, time_fn=lambda: current_time) - sync_service = MockSyncService() - chain_service = ChainService( - sync_service=sync_service, # type: ignore[arg-type] - clock=clock, - spec=StoreInterceptingSpec(), + def test_starts_not_running(self) -> None: + """A freshly built service reports that it is not running.""" + assert make_service(ProbeSpec()).is_running is False + + def test_stop_clears_running_flag(self) -> None: + """Stopping a running service flips the flag back to not running.""" + service = make_service(ProbeSpec()) + service._running = True + assert service.is_running is True + service.stop() + assert service.is_running is False + + async def test_run_then_stop_leaves_not_running(self) -> None: + """The run loop clears the running flag once stopped.""" + service = make_service(ProbeSpec(), time_fn=lambda: 1000 + INTERVAL_SECONDS) + await run_for_waits(service) + assert service.is_running is False + + async def test_does_not_tick_before_genesis(self) -> None: + """Before genesis the loop only waits, never ticking the store.""" + spec = ProbeSpec() + service = make_service(spec, time_fn=lambda: 900.0) + await run_for_waits(service) + assert spec.ticks == [] + + async def test_does_not_retick_an_already_handled_interval(self) -> None: + """A frozen clock past startup yields repeated waits but no extra ticks.""" + spec = ProbeSpec() + # 1.5 intervals in: the total interval count truncates to 1. + service = make_service(spec, time_fn=lambda: 1000 + 1.5 * INTERVAL_SECONDS) + await run_for_waits(service, waits=3) + assert [time for time, _, _ in spec.ticks] == [1] + + async def test_waits_for_genesis_then_starts_ticking(self) -> None: + """A service started before genesis ticks once the clock crosses genesis.""" + spec = ProbeSpec() + # Interval -1 sits before genesis; each wait advances toward and past it. + clock = IntervalClock(genesis_seconds=1000, interval=-1) + service = make_service(spec, time_fn=clock) + await run_advancing(service, clock, stop_at_interval=2) + assert [time for time, _, _ in spec.ticks] == [1] + + async def test_publishes_aggregates_produced_in_the_loop(self) -> None: + """Aggregates produced by a steady-state tick reach the publisher in order.""" + recorder = PublishRecorder() + first, second = make_aggregate(1), make_aggregate(2) + clock = IntervalClock(genesis_seconds=1000, interval=1) + service = make_service( + ProbeSpec(emit=[first, second]), time_fn=clock, is_aggregator=True, publisher=recorder ) - - async def stop_during_sleep(_duration: float) -> None: - # Simulate stop being called while sleeping. - chain_service.stop() - - with patch("asyncio.sleep", new=stop_during_sleep): - await chain_service.run() - - # Service should have stopped cleanly. - assert chain_service.is_running is False - - # Initial tick handles all 5 intervals even though stop is called - # during the yield sleeps (stop only checked in main loop). - assert sync_service.store.time == Interval(5) + await run_advancing(service, clock, stop_at_interval=3) + assert recorder.received == [first, second] + + async def test_tolerates_missing_publisher(self) -> None: + """A produced aggregate with no publisher wired does not raise.""" + spec = ProbeSpec(emit=[make_aggregate(1)]) + clock = IntervalClock(genesis_seconds=1000, interval=1) + service = make_service(spec, time_fn=clock, is_aggregator=True, publisher=None) + await run_advancing(service, clock, stop_at_interval=3) + assert [time for time, _, _ in spec.ticks] == [1, 2]