Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 39 additions & 110 deletions src/lean_spec/node/chain/service.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,4 @@
"""
Chain service that drives consensus timing.

Ethereum consensus runs on a clock. Every 4 seconds (1 slot), validators act
at each of the 5 intervals:

- Interval 0: Block proposal
- Interval 1: Vote propagation
- Interval 2: Aggregation
- Interval 3: Safe target update
- Interval 4: Attestation acceptance into fork choice

The Store has all this logic built in. But nothing drives the clock.
ChainService is that driver — a simple timer loop:

1. Sleep until next interval boundary
2. Get current wall-clock time
3. Tick the store forward to current time
4. Update the sync service with the new store state
5. Repeat forever
"""
"""Chain service that drives consensus timing."""

from __future__ import annotations

Expand All @@ -36,18 +16,7 @@

@dataclass(slots=True)
class ChainService:
"""
Drives the consensus clock by periodically ticking the forkchoice store.

ChainService is the heartbeat of a consensus client. It ensures time
advances in the Store, triggering interval-specific actions like
attestation acceptance and safe target updates.

The service is intentionally minimal:
- Timer loop that wakes every interval
- Ticks the store forward to current time
- Updates the sync service's store reference
"""
"""Drives the consensus clock by periodically ticking the fork choice store."""

sync_service: SyncService
"""Sync service whose store we tick."""
Expand All @@ -56,77 +25,49 @@ class ChainService:
"""Clock for time calculation."""

spec: LstarSpec = field(default_factory=LstarSpec)
"""Fork spec driving consensus methods. Default lets tests skip wiring."""
"""Fork spec driving consensus methods."""

_running: bool = field(default=False, repr=False)
"""Whether the service is running."""

async def run(self) -> None:
"""
Main loop - tick the store every interval.

This is the core of the chain service. It runs forever, sleeping
until each interval boundary and then advancing the store's time.

The loop continues until the service is stopped.

NOTE: We track the last handled interval to avoid skipping intervals.
If processing takes time and we end up in a new interval, we
handle it immediately instead of sleeping past it.
"""
"""Tick the store forward at each interval boundary, until stopped."""
self._running = True

# Catch up store time to current wall clock (post-genesis only).
#
# - Before genesis, returns None; the main loop handles the wait.
# - After genesis, this ensures attestation validation accepts valid attestations
# (the store's time would otherwise lag behind wall-clock).
# Catch up store time to current wall clock.
# - Before genesis this returns nothing; the loop handles the wait.
# - After genesis this keeps attestation validation from rejecting valid votes.
last_handled_total_interval = await self._initial_tick()

while self._running:
# Get current wall-clock time.
current_time = self.clock.current_time()
genesis_time = self.clock.genesis_time

# Wait for genesis if we're before it.
if current_time < genesis_time:
sleep_duration = int(genesis_time) - int(current_time)
await asyncio.sleep(sleep_duration)
# Wait for genesis if we are before it.
# The clock sleeps exactly until genesis when called before it.
if self.clock.current_time() < self.clock.genesis_time:
await self.clock.sleep_until_next_interval()
continue

# Get current total interval count.
total_interval = self.clock.total_intervals()

# If we've already handled this interval, sleep until the next boundary.
# Already handled this interval: sleep to the next boundary.
already_handled = (
last_handled_total_interval is not None
and total_interval <= last_handled_total_interval
)
if already_handled:
await self.clock.sleep_until_next_interval()
# Check if stopped during sleep.
if not self._running:
break
# Re-fetch interval after sleep.
#
# If still the same (e.g., time didn't advance),
# skip this iteration to avoid duplicate ticks.
# Time may not have advanced during the sleep.
# Skip this iteration to avoid ticking the same interval twice.
total_interval = self.clock.total_intervals()
if total_interval <= last_handled_total_interval:
continue

# Tick the store forward to current interval.
#
# The store advances time interval by interval, performing
# appropriate actions at each interval.
#
# This minimal service does not produce blocks.
# Block production requires validator keys.
# Advance the store to the current interval.
# This service never proposes; block production needs validator keys.
new_aggregated_attestations = await self._tick_to(total_interval)

# Publish any new aggregated attestations produced this tick.
#
# No publisher is wired in tests and offline runs.
# No publisher is wired in tests or offline runs, so guard on its presence.
publish = self.sync_service.publish_aggregated_attestation
if new_aggregated_attestations and publish is not None:
for aggregate in new_aggregated_attestations:
Expand All @@ -140,39 +81,36 @@ async def run(self) -> None:
self.sync_service.store.latest_finalized.slot,
)

# Mark this interval as handled.
last_handled_total_interval = total_interval

async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAttestation]:
"""
Advance store to target interval with skip and yield.

When the node falls behind by more than one slot, stale intervals
are skipped. Processing every missed interval synchronously would
block the event loop, starving gossip and causing the node to fall
further behind.
Advance the store to the target interval, skipping stale work and yielding.

Between each remaining interval tick, yields to the event loop so
gossip messages can be processed.
When the node falls behind by more than one slot, stale intervals are skipped.
Processing every missed interval synchronously blocks the event loop.
That starves gossip and pushes the node further behind.

Updates the sync service's store after each tick so concurrent
gossip handlers see current time.
Between remaining ticks, yield so gossip messages can be processed.
Update the sync service store after each tick so gossip handlers see current time.

Returns aggregated attestations produced during the ticks.
"""
store = self.sync_service.store
all_new_aggregates: list[SignedAggregatedAttestation] = []

# The target comes from wall clock, so it never moves time backward.
assert target_interval >= store.time

# Skip stale intervals when falling behind.
#
# Jump to the last full slot boundary before the target.
# The final slot's worth of intervals still runs normally so that
# aggregation, safe target, and attestation acceptance happen.
intervals_per_slot = Interval(int(INTERVALS_PER_SLOT))
gap = target_interval - store.time
if gap > intervals_per_slot:
store.time = target_interval - intervals_per_slot
self.sync_service.store = store
# The final slot still runs normally.
# That preserves aggregation, safe target, and attestation acceptance.
#
# Acceptance for the jumped slots waits for the final slot's tick.
# That is safe: acceptance is a monotone pool merge, and the head recomputes from scratch.
if target_interval - store.time > Interval(INTERVALS_PER_SLOT):
store.time = target_interval - Interval(INTERVALS_PER_SLOT)

# Tick remaining intervals one at a time.
while store.time < target_interval:
Expand All @@ -185,29 +123,21 @@ async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAtte
self.sync_service.store = store

# Yield to the event loop so gossip handlers can run.
# Re-read store afterward: a gossip handler may have added
# blocks or attestations during the yield.
# Re-read the store: a handler may have added blocks or attestations.
await asyncio.sleep(0)
store = self.sync_service.store

return all_new_aggregates

async def _initial_tick(self) -> Interval | None:
"""
Perform initial tick to catch up store time to current wall clock.

This is called once at startup to ensure the store's time reflects
actual wall clock time, not just the genesis anchor time.

Returns the interval that was handled, or None if before genesis.
"""
"""Catch up store time to wall clock at startup."""
current_time = self.clock.current_time()

# Only tick if we're past genesis.
# Only tick once past genesis.
if current_time >= self.clock.genesis_time:
target_interval = self.clock.total_intervals()

# Use _tick_to for skip + yield during catch-up.
# Reuse the skip-and-yield path for catch-up.
# Discard aggregated attestations from catch-up.
# During initial sync we may be many slots behind.
# Publishing stale aggregations would spam the network.
Expand All @@ -221,12 +151,11 @@ def stop(self) -> None:
"""
Stop the service.

Sets the running flag to False, causing the run() loop to exit
after completing its current sleep cycle.
The loop exits after its current sleep cycle finishes.
"""
self._running = False

@property
def is_running(self) -> bool:
"""Check if the service is currently running."""
"""Whether the service is currently running."""
return self._running
Loading
Loading