From 5ed708de661e502073c728f6f7437d8e958e3836 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 12 Jun 2026 17:24:46 -0300 Subject: [PATCH] feat: add heartbeat topic and Goldfish fork-choice --- crates/blockchain/src/lib.rs | 17 +++++-- crates/blockchain/src/store.rs | 12 ++++- crates/blockchain/state_transition/src/lib.rs | 17 +++++++ crates/net/api/src/lib.rs | 4 ++ crates/net/p2p/src/gossipsub/handler.rs | 28 +++++++++++- crates/net/p2p/src/gossipsub/messages.rs | 11 +++++ crates/net/p2p/src/gossipsub/mod.rs | 3 +- crates/net/p2p/src/lib.rs | 24 +++++++++- crates/storage/src/store.rs | 44 ++++++++++++++++++- 9 files changed, 148 insertions(+), 12 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index e0e20e6b..4d208c03 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant, SystemTime}; use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; -use ethlambda_state_transition::is_proposer; +use ethlambda_state_transition::{is_heartbeat_committee_member, is_proposer}; use ethlambda_storage::{ALL_TABLES, Store}; use ethlambda_types::{ ShortRoot, @@ -471,9 +471,18 @@ impl BlockChainServer { // Publish to gossip network if let Some(ref p2p) = self.p2p { - let _ = p2p.publish_attestation(signed_attestation).inspect_err( - |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), - ); + let _ = p2p + .publish_attestation(signed_attestation.clone()) + .inspect_err( + |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), + ); + let head_state = self.store.head_state(); + let num_validators = head_state.validators.len() as u64; + if is_heartbeat_committee_member(validator_id, slot, num_validators) { + let _ = p2p.publish_heartbeat_attestation(signed_attestation).inspect_err( + |err| error!(%slot, %validator_id, %err, "Failed to publish attestation"), + ); + } info!(%slot, %validator_id, "Published attestation"); } } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 8fd1b36c..4db5f519 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1,6 +1,8 @@ use std::collections::HashSet; -use ethlambda_state_transition::{is_proposer, slot_is_justifiable_after}; +use ethlambda_state_transition::{ + is_heartbeat_committee_member, is_proposer, slot_is_justifiable_after, +}; use ethlambda_storage::{ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, @@ -39,7 +41,7 @@ fn accept_new_attestations(store: &mut Store, log_tree: bool) { /// fork choice tree to the terminal. pub fn update_head(store: &mut Store, log_tree: bool) { let blocks = store.get_live_chain(); - let attestations = store.extract_latest_known_attestations(); + let attestations = store.get_last_slot_votes(); let old_head = store.head(); let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, @@ -340,6 +342,12 @@ pub fn on_gossip_attestation( } metrics::inc_pq_sig_attestation_signatures_valid(); + let num_validators = target_state.validators.len() as u64; + // If the validator is in the heartbeat committee, persist the vote for fork choice usage. + if is_heartbeat_committee_member(validator_id, attestation.data.slot, num_validators) { + store.insert_heartbeat_vote(validator_id, attestation.data.clone()); + } + // Only aggregators persist the signature for later aggregation at // interval 2. Non-aggregators drop the validated attestation — they // still participate in the mesh so peers see the message propagate. diff --git a/crates/blockchain/state_transition/src/lib.rs b/crates/blockchain/state_transition/src/lib.rs index ac57bc7d..a744cbd8 100644 --- a/crates/blockchain/state_transition/src/lib.rs +++ b/crates/blockchain/state_transition/src/lib.rs @@ -13,6 +13,8 @@ use tracing::{info, warn}; pub mod justified_slots_ops; pub mod metrics; +pub const HEARTBEAT_COMMITTEE_SIZE: usize = 4; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("target slot {target_slot} is in the past (current is {current_slot})")] @@ -231,6 +233,21 @@ pub fn is_proposer(validator_index: u64, slot: u64, num_validators: u64) -> bool current_proposer(slot, num_validators) == Some(validator_index) } +/// Check if a validator is part of the heartbeat committee for a given slot. +/// +/// The heartbeat committee is formed by the proposer and the next N validators. +pub fn is_heartbeat_committee_member(validator_index: u64, slot: u64, num_validators: u64) -> bool { + let Some(proposer) = current_proposer(slot, num_validators) else { + return false; + }; + for i in 0..HEARTBEAT_COMMITTEE_SIZE as u64 { + if validator_index == (proposer + i) % num_validators { + return true; + } + } + false +} + /// Apply attestations and update justification/finalization /// according to the Lean Consensus 3SF-mini rules. fn process_attestations( diff --git a/crates/net/api/src/lib.rs b/crates/net/api/src/lib.rs index 9cdbcdd0..4f00dfd2 100644 --- a/crates/net/api/src/lib.rs +++ b/crates/net/api/src/lib.rs @@ -13,6 +13,10 @@ use spawned_concurrency::protocol; pub trait BlockChainToP2P: Send + Sync { fn publish_block(&self, block: SignedBlock) -> Result<(), ActorError>; fn publish_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>; + fn publish_heartbeat_attestation( + &self, + attestation: SignedAttestation, + ) -> Result<(), ActorError>; fn publish_aggregated_attestation( &self, attestation: SignedAggregatedAttestation, diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index c257006b..0825fe1b 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -15,7 +15,7 @@ use super::{ attestation_subnet_topic, }, }; -use crate::{P2PServer, metrics}; +use crate::{P2PServer, gossipsub::messages::HEARTBEAT_TOPIC_KIND, metrics}; pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let Event::Message { @@ -95,7 +95,10 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { ); } } - Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { + Some(kind) + if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) + || kind == HEARTBEAT_TOPIC_KIND => + { info!(kind = "attestation", peer_count, "P2P message received"); let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) @@ -196,6 +199,27 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) { ); } +pub async fn publish_heartbeat_attestation(server: &mut P2PServer, attestation: SignedAttestation) { + let slot = attestation.data.slot; + let validator = attestation.validator_id; + + // Encode to SSZ + let ssz_bytes = attestation.to_ssz(); + + // Compress with raw snappy + let compressed = compress_message(&ssz_bytes); + + // Publish to gossipsub + server + .swarm_handle + .publish(server.heartbeat_topic.clone(), compressed); + info!( + %slot, + validator, + "Published heartbeat attestation to gossipsub" + ); +} + pub async fn publish_aggregated_attestation( server: &mut P2PServer, attestation: SignedAggregatedAttestation, diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 4a47f201..8a1ca264 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -9,6 +9,10 @@ pub const FORK_DIGEST: &str = "12345678"; /// Topic kind for block gossip pub const BLOCK_TOPIC_KIND: &str = "block"; + +/// Topic kind for heartbeat gossip +pub const HEARTBEAT_TOPIC_KIND: &str = "heartbeat"; + /// Topic kind prefix for per-committee attestation subnets. /// /// Full topic format: `/leanconsensus/{FORK_DIGEST}/attestation_{subnet_id}/ssz_snappy` @@ -38,3 +42,10 @@ pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic "/leanconsensus/{FORK_DIGEST}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy" )) } + +/// Build a heartbeat gossipsub topic. +pub fn heartbeat_topic() -> libp2p::gossipsub::IdentTopic { + libp2p::gossipsub::IdentTopic::new(format!( + "/leanconsensus/{FORK_DIGEST}/{HEARTBEAT_TOPIC_KIND}/ssz_snappy" + )) +} diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index b50ea4fd..2ad00233 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -5,5 +5,6 @@ mod messages; pub use encoding::decompress_message; pub use handler::{ handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, + publish_heartbeat_attestation, }; -pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic}; +pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic, heartbeat_topic}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index de34e469..466ec5cb 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -9,6 +9,7 @@ use ethlambda_network_api::{ InitBlockChain, P2PToBlockChainRef, block_chain_to_p2p::{ FetchBlock, PublishAggregatedAttestation, PublishAttestation, PublishBlock, + PublishHeartbeatAttestation, }, }; use ethlambda_storage::Store; @@ -37,8 +38,9 @@ use tracing::{info, trace, warn}; use crate::{ gossipsub::{ - aggregation_topic, attestation_subnet_topic, block_topic, publish_aggregated_attestation, - publish_attestation, publish_block, + aggregation_topic, attestation_subnet_topic, block_topic, heartbeat_topic, + publish_aggregated_attestation, publish_attestation, publish_block, + publish_heartbeat_attestation, }, req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, @@ -178,6 +180,7 @@ pub struct BuiltSwarm { pub(crate) attestation_topics: HashMap, pub(crate) attestation_committee_count: u64, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) heartbeat_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) bootnode_addrs: HashMap, } @@ -293,6 +296,14 @@ pub fn build_swarm( .subscribe(&block_topic) .unwrap(); + // Subscribe to heartbeat topic (all nodes) + let heartbeat_topic = heartbeat_topic(); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&heartbeat_topic) + .unwrap(); + // Subscribe to aggregation topic (all validators) let aggregation_topic = aggregation_topic(); swarm @@ -343,6 +354,7 @@ pub fn build_swarm( attestation_topics, attestation_committee_count: config.attestation_committee_count, block_topic, + heartbeat_topic, aggregation_topic, bootnode_addrs, }) @@ -368,6 +380,7 @@ impl P2P { attestation_topics: built.attestation_topics, attestation_committee_count: built.attestation_committee_count, block_topic: built.block_topic, + heartbeat_topic: built.heartbeat_topic, aggregation_topic: built.aggregation_topic, connected_peers: HashSet::new(), pending_root_requests: HashMap::new(), @@ -404,6 +417,7 @@ pub struct P2PServer { pub(crate) attestation_topics: HashMap, pub(crate) attestation_committee_count: u64, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) heartbeat_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, @@ -498,6 +512,12 @@ impl Handler for P2PServer { } } +impl Handler for P2PServer { + async fn handle(&mut self, msg: PublishHeartbeatAttestation, _ctx: &Context) { + publish_heartbeat_attestation(self, msg.attestation).await; + } +} + impl Handler for P2PServer { async fn handle(&mut self, msg: PublishAggregatedAttestation, _ctx: &Context) { publish_aggregated_attestation(self, msg.attestation).await; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index fb06021f..0d992aa5 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -36,6 +36,8 @@ pub enum GetForkchoiceStoreError { /// allowing us to skip storing empty bodies and reconstruct them on read. static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().hash_tree_root()); +const INTERVALS_PER_SLOT: u64 = 5; + /// Checkpoints to update in the forkchoice store. /// /// Used with `Store::update_checkpoints` to update head and optionally @@ -493,6 +495,8 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { #[derive(Clone)] pub struct Store { backend: Arc, + /// List of votes observed in the last [`RLMD_LOOKBACK_LIMIT`] slots. + votes_per_slot: Arc>>>, new_payloads: Arc>, known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. @@ -564,6 +568,7 @@ impl Store { info!("Loaded store from persisted DB state"); Some(Self { backend, + votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())), new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( @@ -661,6 +666,7 @@ impl Store { Self { backend, + votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())), new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( @@ -778,12 +784,17 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); let pruned_sigs = self.prune_gossip_signatures(finalized.slot); + let pruned_votes = self.prune_heartbeat_votes(finalized.slot); let pruned_payloads = self.prune_stale_aggregated_payloads(finalized.slot); if pruned_chain > 0 || pruned_sigs > 0 || pruned_payloads > 0 { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, pruned_payloads, "Pruned finalized data" + pruned_chain, + pruned_sigs, + pruned_votes, + pruned_payloads, + "Pruned finalized data" ); } } @@ -896,6 +907,16 @@ impl Store { gossip.prune(finalized_slot) } + /// Prune heartbeat votes for slots <= finalized_slot. + /// + /// Returns the number of entries pruned. + pub fn prune_heartbeat_votes(&mut self, finalized_slot: u64) -> usize { + let mut votes_per_slot = self.votes_per_slot.lock().unwrap(); + let initial_len = votes_per_slot.len(); + votes_per_slot.retain(|slot, _| *slot >= finalized_slot); + initial_len - votes_per_slot.len() + } + /// Prune aggregated payload buffers (new + known) whose target slot is at or below /// `finalized_slot`. /// @@ -1169,6 +1190,16 @@ impl Store { .extract_latest_attestations() } + pub fn get_last_slot_votes(&self) -> HashMap { + let current_slot = self.time() / INTERVALS_PER_SLOT; + self.votes_per_slot + .lock() + .unwrap() + .get(¤t_slot) + .cloned() + .unwrap_or_default() + } + // ============ Known Aggregated Payloads ============ // // "Known" aggregated payloads are active in fork choice weight calculations. @@ -1352,6 +1383,15 @@ impl Store { gossip.insert(hashed, validator_id, signature); } + pub fn insert_heartbeat_vote(&self, validator_id: u64, data: AttestationData) { + self.votes_per_slot + .lock() + .unwrap() + .entry(data.slot) + .or_default() + .insert(validator_id, data); + } + // ============ Derived Accessors ============ /// Returns the slot of the current head block. @@ -1481,6 +1521,7 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); Self { backend, + votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())), new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( @@ -1494,6 +1535,7 @@ mod tests { fn test_store_with_backend(backend: Arc) -> Self { Self { backend, + votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())), new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new(