Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant, SystemTime};

use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
use ethlambda_state_transition::is_proposer;
use ethlambda_state_transition::{is_heartbeat_committee_member, is_proposer};
use ethlambda_storage::{ALL_TABLES, Store};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -471,9 +471,18 @@ impl BlockChainServer {

// Publish to gossip network
if let Some(ref p2p) = self.p2p {
let _ = p2p.publish_attestation(signed_attestation).inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
let _ = p2p
.publish_attestation(signed_attestation.clone())
.inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
let head_state = self.store.head_state();
let num_validators = head_state.validators.len() as u64;
if is_heartbeat_committee_member(validator_id, slot, num_validators) {
let _ = p2p.publish_heartbeat_attestation(signed_attestation).inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
);
}
info!(%slot, %validator_id, "Published attestation");
}
}
Expand Down
12 changes: 10 additions & 2 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashSet;

use ethlambda_state_transition::{is_proposer, slot_is_justifiable_after};
use ethlambda_state_transition::{
is_heartbeat_committee_member, is_proposer, slot_is_justifiable_after,
};
use ethlambda_storage::{ForkCheckpoints, Store};
use ethlambda_types::{
ShortRoot,
Expand Down Expand Up @@ -39,7 +41,7 @@ fn accept_new_attestations(store: &mut Store, log_tree: bool) {
/// fork choice tree to the terminal.
pub fn update_head(store: &mut Store, log_tree: bool) {
let blocks = store.get_live_chain();
let attestations = store.extract_latest_known_attestations();
let attestations = store.get_last_slot_votes();
let old_head = store.head();
let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
Expand Down Expand Up @@ -340,6 +342,12 @@ pub fn on_gossip_attestation(
}
metrics::inc_pq_sig_attestation_signatures_valid();

let num_validators = target_state.validators.len() as u64;
// If the validator is in the heartbeat committee, persist the vote for fork choice usage.
if is_heartbeat_committee_member(validator_id, attestation.data.slot, num_validators) {
store.insert_heartbeat_vote(validator_id, attestation.data.clone());
}

// Only aggregators persist the signature for later aggregation at
// interval 2. Non-aggregators drop the validated attestation — they
// still participate in the mesh so peers see the message propagate.
Expand Down
17 changes: 17 additions & 0 deletions crates/blockchain/state_transition/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tracing::{info, warn};
pub mod justified_slots_ops;
pub mod metrics;

pub const HEARTBEAT_COMMITTEE_SIZE: usize = 4;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("target slot {target_slot} is in the past (current is {current_slot})")]
Expand Down Expand Up @@ -231,6 +233,21 @@ pub fn is_proposer(validator_index: u64, slot: u64, num_validators: u64) -> bool
current_proposer(slot, num_validators) == Some(validator_index)
}

/// Check if a validator is part of the heartbeat committee for a given slot.
///
/// The heartbeat committee is formed by the proposer and the next N validators.
pub fn is_heartbeat_committee_member(validator_index: u64, slot: u64, num_validators: u64) -> bool {
let Some(proposer) = current_proposer(slot, num_validators) else {
return false;
};
for i in 0..HEARTBEAT_COMMITTEE_SIZE as u64 {
if validator_index == (proposer + i) % num_validators {
return true;
}
}
false
}

/// Apply attestations and update justification/finalization
/// according to the Lean Consensus 3SF-mini rules.
fn process_attestations(
Expand Down
4 changes: 4 additions & 0 deletions crates/net/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use spawned_concurrency::protocol;
pub trait BlockChainToP2P: Send + Sync {
fn publish_block(&self, block: SignedBlock) -> Result<(), ActorError>;
fn publish_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>;
fn publish_heartbeat_attestation(
&self,
attestation: SignedAttestation,
) -> Result<(), ActorError>;
fn publish_aggregated_attestation(
&self,
attestation: SignedAggregatedAttestation,
Expand Down
28 changes: 26 additions & 2 deletions crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
attestation_subnet_topic,
},
};
use crate::{P2PServer, metrics};
use crate::{P2PServer, gossipsub::messages::HEARTBEAT_TOPIC_KIND, metrics};

pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let Event::Message {
Expand Down Expand Up @@ -95,7 +95,10 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
);
}
}
Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => {
Some(kind)
if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX)
|| kind == HEARTBEAT_TOPIC_KIND =>
{
info!(kind = "attestation", peer_count, "P2P message received");
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
Expand Down Expand Up @@ -196,6 +199,27 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {
);
}

pub async fn publish_heartbeat_attestation(server: &mut P2PServer, attestation: SignedAttestation) {
let slot = attestation.data.slot;
let validator = attestation.validator_id;

// Encode to SSZ
let ssz_bytes = attestation.to_ssz();

// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

// Publish to gossipsub
server
.swarm_handle
.publish(server.heartbeat_topic.clone(), compressed);
info!(
%slot,
validator,
"Published heartbeat attestation to gossipsub"
);
}

pub async fn publish_aggregated_attestation(
server: &mut P2PServer,
attestation: SignedAggregatedAttestation,
Expand Down
11 changes: 11 additions & 0 deletions crates/net/p2p/src/gossipsub/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub const FORK_DIGEST: &str = "12345678";

/// Topic kind for block gossip
pub const BLOCK_TOPIC_KIND: &str = "block";

/// Topic kind for heartbeat gossip
pub const HEARTBEAT_TOPIC_KIND: &str = "heartbeat";

/// Topic kind prefix for per-committee attestation subnets.
///
/// Full topic format: `/leanconsensus/{FORK_DIGEST}/attestation_{subnet_id}/ssz_snappy`
Expand Down Expand Up @@ -38,3 +42,10 @@ pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic
"/leanconsensus/{FORK_DIGEST}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy"
))
}

/// Build a heartbeat gossipsub topic.
pub fn heartbeat_topic() -> libp2p::gossipsub::IdentTopic {
libp2p::gossipsub::IdentTopic::new(format!(
"/leanconsensus/{FORK_DIGEST}/{HEARTBEAT_TOPIC_KIND}/ssz_snappy"
))
}
3 changes: 2 additions & 1 deletion crates/net/p2p/src/gossipsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ mod messages;
pub use encoding::decompress_message;
pub use handler::{
handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block,
publish_heartbeat_attestation,
};
pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic};
pub use messages::{aggregation_topic, attestation_subnet_topic, block_topic, heartbeat_topic};
24 changes: 22 additions & 2 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ethlambda_network_api::{
InitBlockChain, P2PToBlockChainRef,
block_chain_to_p2p::{
FetchBlock, PublishAggregatedAttestation, PublishAttestation, PublishBlock,
PublishHeartbeatAttestation,
},
};
use ethlambda_storage::Store;
Expand Down Expand Up @@ -37,8 +38,9 @@ use tracing::{info, trace, warn};

use crate::{
gossipsub::{
aggregation_topic, attestation_subnet_topic, block_topic, publish_aggregated_attestation,
publish_attestation, publish_block,
aggregation_topic, attestation_subnet_topic, block_topic, heartbeat_topic,
publish_aggregated_attestation, publish_attestation, publish_block,
publish_heartbeat_attestation,
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
Expand Down Expand Up @@ -178,6 +180,7 @@ pub struct BuiltSwarm {
pub(crate) attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic>,
pub(crate) attestation_committee_count: u64,
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
pub(crate) heartbeat_topic: libp2p::gossipsub::IdentTopic,
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
pub(crate) bootnode_addrs: HashMap<PeerId, Multiaddr>,
}
Expand Down Expand Up @@ -293,6 +296,14 @@ pub fn build_swarm(
.subscribe(&block_topic)
.unwrap();

// Subscribe to heartbeat topic (all nodes)
let heartbeat_topic = heartbeat_topic();
swarm
.behaviour_mut()
.gossipsub
.subscribe(&heartbeat_topic)
.unwrap();

// Subscribe to aggregation topic (all validators)
let aggregation_topic = aggregation_topic();
swarm
Expand Down Expand Up @@ -343,6 +354,7 @@ pub fn build_swarm(
attestation_topics,
attestation_committee_count: config.attestation_committee_count,
block_topic,
heartbeat_topic,
aggregation_topic,
bootnode_addrs,
})
Expand All @@ -368,6 +380,7 @@ impl P2P {
attestation_topics: built.attestation_topics,
attestation_committee_count: built.attestation_committee_count,
block_topic: built.block_topic,
heartbeat_topic: built.heartbeat_topic,
aggregation_topic: built.aggregation_topic,
connected_peers: HashSet::new(),
pending_root_requests: HashMap::new(),
Expand Down Expand Up @@ -404,6 +417,7 @@ pub struct P2PServer {
pub(crate) attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic>,
pub(crate) attestation_committee_count: u64,
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
pub(crate) heartbeat_topic: libp2p::gossipsub::IdentTopic,
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,

pub(crate) connected_peers: HashSet<PeerId>,
Expand Down Expand Up @@ -498,6 +512,12 @@ impl Handler<PublishAttestation> for P2PServer {
}
}

impl Handler<PublishHeartbeatAttestation> for P2PServer {
async fn handle(&mut self, msg: PublishHeartbeatAttestation, _ctx: &Context<Self>) {
publish_heartbeat_attestation(self, msg.attestation).await;
}
}

impl Handler<PublishAggregatedAttestation> for P2PServer {
async fn handle(&mut self, msg: PublishAggregatedAttestation, _ctx: &Context<Self>) {
publish_aggregated_attestation(self, msg.attestation).await;
Expand Down
44 changes: 43 additions & 1 deletion crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub enum GetForkchoiceStoreError {
/// allowing us to skip storing empty bodies and reconstruct them on read.
static EMPTY_BODY_ROOT: LazyLock<H256> = LazyLock::new(|| BlockBody::default().hash_tree_root());

const INTERVALS_PER_SLOT: u64 = 5;

/// Checkpoints to update in the forkchoice store.
///
/// Used with `Store::update_checkpoints` to update head and optionally
Expand Down Expand Up @@ -493,6 +495,8 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) {
#[derive(Clone)]
pub struct Store {
backend: Arc<dyn StorageBackend>,
/// List of votes observed in the last [`RLMD_LOOKBACK_LIMIT`] slots.
votes_per_slot: Arc<Mutex<BTreeMap<u64, HashMap<u64, AttestationData>>>>,
new_payloads: Arc<Mutex<PayloadBuffer>>,
known_payloads: Arc<Mutex<PayloadBuffer>>,
/// In-memory gossip signatures, consumed at interval 2 aggregation.
Expand Down Expand Up @@ -564,6 +568,7 @@ impl Store {
info!("Loaded store from persisted DB state");
Some(Self {
backend,
votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())),
new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))),
known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))),
gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new(
Expand Down Expand Up @@ -661,6 +666,7 @@ impl Store {

Self {
backend,
votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())),
new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))),
known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))),
gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new(
Expand Down Expand Up @@ -778,12 +784,17 @@ impl Store {
{
let pruned_chain = self.prune_live_chain(finalized.slot);
let pruned_sigs = self.prune_gossip_signatures(finalized.slot);
let pruned_votes = self.prune_heartbeat_votes(finalized.slot);
let pruned_payloads = self.prune_stale_aggregated_payloads(finalized.slot);

if pruned_chain > 0 || pruned_sigs > 0 || pruned_payloads > 0 {
info!(
finalized_slot = finalized.slot,
pruned_chain, pruned_sigs, pruned_payloads, "Pruned finalized data"
pruned_chain,
pruned_sigs,
pruned_votes,
pruned_payloads,
"Pruned finalized data"
);
}
}
Expand Down Expand Up @@ -896,6 +907,16 @@ impl Store {
gossip.prune(finalized_slot)
}

/// Prune heartbeat votes for slots <= finalized_slot.
///
/// Returns the number of entries pruned.
pub fn prune_heartbeat_votes(&mut self, finalized_slot: u64) -> usize {
let mut votes_per_slot = self.votes_per_slot.lock().unwrap();
let initial_len = votes_per_slot.len();
votes_per_slot.retain(|slot, _| *slot >= finalized_slot);
initial_len - votes_per_slot.len()
}

/// Prune aggregated payload buffers (new + known) whose target slot is at or below
/// `finalized_slot`.
///
Expand Down Expand Up @@ -1169,6 +1190,16 @@ impl Store {
.extract_latest_attestations()
}

pub fn get_last_slot_votes(&self) -> HashMap<u64, AttestationData> {
let current_slot = self.time() / INTERVALS_PER_SLOT;
self.votes_per_slot
.lock()
.unwrap()
.get(&current_slot)
.cloned()
.unwrap_or_default()
}

// ============ Known Aggregated Payloads ============
//
// "Known" aggregated payloads are active in fork choice weight calculations.
Expand Down Expand Up @@ -1352,6 +1383,15 @@ impl Store {
gossip.insert(hashed, validator_id, signature);
}

pub fn insert_heartbeat_vote(&self, validator_id: u64, data: AttestationData) {
self.votes_per_slot
.lock()
.unwrap()
.entry(data.slot)
.or_default()
.insert(validator_id, data);
}

// ============ Derived Accessors ============

/// Returns the slot of the current head block.
Expand Down Expand Up @@ -1481,6 +1521,7 @@ mod tests {
let backend = Arc::new(InMemoryBackend::new());
Self {
backend,
votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())),
new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))),
known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))),
gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new(
Expand All @@ -1494,6 +1535,7 @@ mod tests {
fn test_store_with_backend(backend: Arc<InMemoryBackend>) -> Self {
Self {
backend,
votes_per_slot: Arc::new(Mutex::new(BTreeMap::new())),
new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))),
known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))),
gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new(
Expand Down
Loading