(
+ partition_prefix: &str,
+ store_name: &str,
+ codec_config: C,
+) -> immutable::Config {
+ let prefix = format!("{}-{}", partition_prefix, store_name);
+ let page_cache = make_default_cache_ref();
+
+ immutable::Config {
+ metadata_partition: format!("{prefix}-metadata"),
+ freezer_table_partition: format!("{prefix}-freezer-table"),
+ freezer_table_initial_size: 64,
+ freezer_table_resize_frequency: 10,
+ freezer_table_resize_chunk_size: 10,
+ freezer_key_partition: format!("{prefix}-freezer-key"),
+ freezer_key_page_cache: page_cache,
+ freezer_value_partition: format!("{prefix}-freezer-value"),
+ freezer_value_target_size: u64::from(DEFAULT_PAGE_SIZE),
+ freezer_value_compression: None,
+ ordinal_partition: format!("{prefix}-ordinal"),
+ items_per_section: NonZeroU64::new(DEFAULT_ITEMS_PER_SECTION).unwrap(),
+ codec_config,
+ replay_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(),
+ freezer_key_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(),
+ freezer_value_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(),
+ ordinal_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(),
+ }
+}
+
+/// Create a resolver [`Config`](resolver::p2p::Config) for the marshal's P2P block fetcher.
+pub fn resolver_config(
+ public_key: P,
+ provider: C,
+ blocker: B,
+ config: &MarshalConfig,
+) -> resolver::p2p::Config
+where
+ P: commonware_cryptography::PublicKey,
+ C: commonware_p2p::Provider,
+ B: commonware_p2p::Blocker,
+{
+ resolver::p2p::Config {
+ public_key,
+ provider,
+ blocker,
+ mailbox_size: config.mailbox_size,
+ initial: config.resolver.initial,
+ timeout: config.resolver.timeout,
+ fetch_retry_timeout: config.resolver.fetch_retry_timeout,
+ priority_requests: config.resolver.priority_requests,
+ priority_responses: config.resolver.priority_responses,
+ }
+}
diff --git a/crates/consensus/src/relay.rs b/crates/consensus/src/relay.rs
new file mode 100644
index 0000000..986b7f5
--- /dev/null
+++ b/crates/consensus/src/relay.rs
@@ -0,0 +1,246 @@
+use crate::block::ConsensusBlock;
+use commonware_consensus::Relay;
+use std::collections::BTreeMap;
+use std::sync::{Arc, Mutex, RwLock, Weak};
+
+type BlockStore = Arc>>>;
+type BlockStoreWeak = Weak>>>;
+
+/// Shared in-memory fanout used by [`EvolveRelay`] instances in the same test/process.
+///
+/// Each validator should be constructed with the same network handle so `broadcast()`
+/// can copy the full block into every peer's pending-block store.
+#[derive(Debug)]
+pub struct InMemoryRelayNetwork {
+ peers: Mutex>>,
+}
+
+impl InMemoryRelayNetwork {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ fn register(&self, blocks: &BlockStore) {
+ let mut peers = self
+ .peers
+ .lock()
+ .unwrap_or_else(|poison| poison.into_inner());
+ peers.retain(|peer| peer.strong_count() > 0);
+
+ let already_registered = peers.iter().any(|peer| {
+ peer.upgrade()
+ .is_some_and(|registered| Arc::ptr_eq(®istered, blocks))
+ });
+ if !already_registered {
+ peers.push(Arc::downgrade(blocks));
+ }
+ }
+
+ fn fanout(&self, source: &BlockStore, block: &ConsensusBlock) -> usize
+ where
+ Tx: Clone,
+ {
+ let digest = block.digest_value().0;
+ let mut delivered = 0;
+ let mut peers = self
+ .peers
+ .lock()
+ .unwrap_or_else(|poison| poison.into_inner());
+ peers.retain(|peer| {
+ let Some(store) = peer.upgrade() else {
+ return false;
+ };
+ if Arc::ptr_eq(&store, source) {
+ return true;
+ }
+ store
+ .write()
+ .unwrap_or_else(|poison| {
+ tracing::warn!("relay: recovered from poisoned write lock");
+ poison.into_inner()
+ })
+ .insert(digest, block.clone());
+ delivered += 1;
+ true
+ });
+ delivered
+ }
+}
+
+impl Default for InMemoryRelayNetwork {
+ fn default() -> Self {
+ Self {
+ peers: Mutex::new(Vec::new()),
+ }
+ }
+}
+
+/// A local in-memory relay for block broadcast.
+///
+/// Stores proposed blocks so that other participants (or the local verifier)
+/// can look them up by digest. Validators that should exchange proposals must
+/// share the same [`InMemoryRelayNetwork`].
+#[derive(Clone)]
+pub struct EvolveRelay {
+ /// Shared storage of blocks indexed by their digest.
+ blocks: BlockStore,
+ network: Arc>,
+}
+
+impl EvolveRelay {
+ pub fn new(network: Arc>, blocks: BlockStore) -> Self {
+ network.register(&blocks);
+ Self { blocks, network }
+ }
+
+ /// Retrieve a block by its digest.
+ pub fn get_block(&self, digest: &[u8; 32]) -> Option>
+ where
+ Tx: Clone,
+ {
+ self.blocks
+ .read()
+ .unwrap_or_else(|poison| {
+ tracing::warn!("relay: recovered from poisoned read lock");
+ poison.into_inner()
+ })
+ .get(digest)
+ .cloned()
+ }
+
+ /// Insert a block into the relay's store.
+ pub fn insert_block(&self, block: ConsensusBlock)
+ where
+ Tx: Clone,
+ {
+ let digest = block.digest_value().0;
+ self.blocks
+ .write()
+ .unwrap_or_else(|poison| {
+ tracing::warn!("relay: recovered from poisoned write lock");
+ poison.into_inner()
+ })
+ .insert(digest, block);
+ }
+}
+
+impl Relay for EvolveRelay
+where
+ Tx: Clone + Send + Sync + 'static,
+{
+ type Digest = commonware_cryptography::sha256::Digest;
+
+ async fn broadcast(&mut self, payload: Self::Digest) {
+ let block = self.get_block(&payload.0);
+ let Some(block) = block else {
+ tracing::warn!(
+ digest = ?payload,
+ "relay: cannot broadcast missing block"
+ );
+ return;
+ };
+
+ let recipients = self.network.fanout(&self.blocks, &block);
+ tracing::debug!(
+ digest = ?payload,
+ recipients,
+ "relay: broadcast block to registered peers"
+ );
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use evolve_core::{AccountId, FungibleAsset, InvokeRequest, Message};
    use evolve_stf_traits::Transaction;
    use std::collections::BTreeMap;
    use std::sync::{Arc, RwLock};

    /// Minimal transaction type sufficient to build a [`ConsensusBlock`].
    #[derive(Debug, Clone)]
    struct TestTx {
        id: [u8; 32],
        request: InvokeRequest,
        funds: Vec<FungibleAsset>,
    }

    impl TestTx {
        fn new(id: [u8; 32]) -> Self {
            Self {
                id,
                request: InvokeRequest::new_from_message("test", 1, Message::from_bytes(vec![])),
                funds: Vec::new(),
            }
        }
    }

    impl Transaction for TestTx {
        fn sender(&self) -> AccountId {
            AccountId::from_u64(1)
        }

        fn recipient(&self) -> AccountId {
            AccountId::from_u64(2)
        }

        fn request(&self) -> &InvokeRequest {
            &self.request
        }

        fn gas_limit(&self) -> u64 {
            21_000
        }

        fn funds(&self) -> &[FungibleAsset] {
            &self.funds
        }

        fn compute_identifier(&self) -> [u8; 32] {
            self.id
        }
    }

    #[test]
    fn insert_and_get_block_by_digest() {
        let network = Arc::new(InMemoryRelayNetwork::new());
        let store = Arc::new(RwLock::new(BTreeMap::new()));
        let relay = EvolveRelay::new(network, store);
        let block = evolve_server::Block::new(
            evolve_server::BlockHeader::new(1, 100, B256::ZERO),
            vec![TestTx::new([7u8; 32])],
        );
        let consensus_block = ConsensusBlock::new(block);
        let digest = consensus_block.digest_value().0;

        relay.insert_block(consensus_block.clone());

        let fetched = relay.get_block(&digest).expect("block should exist");
        assert_eq!(fetched.digest_value(), consensus_block.digest_value());
        assert_eq!(fetched.inner.header.number, 1);
    }

    #[tokio::test]
    async fn broadcast_copies_block_to_other_registered_relays() {
        let network = Arc::new(InMemoryRelayNetwork::new());
        let store_a = Arc::new(RwLock::new(BTreeMap::new()));
        let store_b = Arc::new(RwLock::new(BTreeMap::new()));
        let mut relay_a = EvolveRelay::new(network.clone(), store_a);
        let relay_b = EvolveRelay::new(network, store_b);

        let block = evolve_server::Block::new(
            evolve_server::BlockHeader::new(1, 100, B256::ZERO),
            vec![TestTx::new([9u8; 32])],
        );
        let consensus_block = ConsensusBlock::new(block);
        let digest = consensus_block.digest_value();
        relay_a.insert_block(consensus_block.clone());

        relay_a.broadcast(digest).await;

        let fetched = relay_b
            .get_block(&digest.0)
            .expect("peer relay should receive the broadcast block");
        assert_eq!(fetched.digest_value(), consensus_block.digest_value());
    }
}
diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs
new file mode 100644
index 0000000..34aeda9
--- /dev/null
+++ b/crates/consensus/src/reporter.rs
@@ -0,0 +1,412 @@
+use alloy_primitives::B256;
+use commonware_consensus::simplex::types::Activity;
+use commonware_consensus::Reporter;
+use commonware_cryptography::certificate::Scheme;
+use commonware_cryptography::sha256::Digest as Sha256Digest;
+use evolve_mempool::{Mempool, MempoolTx, SharedMempool};
+use evolve_stf_traits::Transaction;
+use std::collections::BTreeMap;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, RwLock};
+use tokio::sync::RwLock as TokioRwLock;
+
+use crate::block::ConsensusBlock;
+
+/// Shared chain state that the reporter updates on finalization.
+///
+/// The automaton exposes these via `last_hash()` and `height_atomic()`.
+/// Pass them to the reporter so finalization events update chain linkage.
+pub struct ChainState {
+ /// The hash of the most recently finalized block.
+ pub last_hash: Arc>,
+ /// The current chain height.
+ pub height: Arc,
+ /// Pending blocks cache (shared with automaton).
+ pub pending_blocks: Arc>>>,
+ /// Shared mempool to confirm executed transactions and requeue dropped ones.
+ pub mempool: SharedMempool>,
+}
+
+impl Clone for ChainState {
+ fn clone(&self) -> Self {
+ Self {
+ last_hash: self.last_hash.clone(),
+ height: self.height.clone(),
+ pending_blocks: self.pending_blocks.clone(),
+ mempool: self.mempool.clone(),
+ }
+ }
+}
+
+/// A reporter that logs consensus activity and updates chain state on finalization.
+///
+/// Generic over the activity type so it works with any consensus scheme.
+/// Holds an optional [`ChainState`] reference — when present, finalization
+/// events update `last_hash` so subsequent proposals use the correct parent.
+///
+/// In production, the marshal sits between simplex and this reporter,
+/// delivering ordered `Update` events. For direct (non-marshal) usage,
+/// this reporter receives raw `Activity` events.
+pub struct EvolveReporter {
+ chain_state: Option>,
+ _phantom: std::marker::PhantomData A>,
+}
+
+impl Clone for EvolveReporter {
+ fn clone(&self) -> Self {
+ Self {
+ chain_state: self.chain_state.clone(),
+ _phantom: std::marker::PhantomData,
+ }
+ }
+}
+
+impl EvolveReporter {
+ /// Create a reporter without chain state (logging only).
+ pub fn new() -> Self {
+ Self {
+ chain_state: None,
+ _phantom: std::marker::PhantomData,
+ }
+ }
+
+ /// Create a reporter with chain state for finalization updates.
+ pub fn with_chain_state(chain_state: ChainState) -> Self {
+ Self {
+ chain_state: Some(chain_state),
+ _phantom: std::marker::PhantomData,
+ }
+ }
+}
+
+impl Default for EvolveReporter {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Reporter for EvolveReporter, Tx>
+where
+ S: Scheme + Clone + Send + 'static,
+ Tx: Clone + Transaction + MempoolTx + Send + Sync + 'static,
+{
+ type Activity = Activity;
+
+ async fn report(&mut self, activity: Self::Activity) {
+ let Some(state) = self.chain_state.as_ref() else {
+ tracing::debug!("reporter: received consensus activity");
+ return;
+ };
+
+ let finalized_digest = match activity {
+ Activity::Finalization(finalization) => finalization.proposal.payload.0,
+ _ => {
+ tracing::debug!(
+ height = state.height.load(Ordering::SeqCst),
+ "reporter: received non-finalization activity"
+ );
+ return;
+ }
+ };
+
+ let finalized_block = {
+ let mut pending = state.pending_blocks.write().unwrap_or_else(|poison| {
+ tracing::warn!("reporter: recovered from poisoned pending_blocks lock");
+ poison.into_inner()
+ });
+ pending.remove(&finalized_digest)
+ };
+
+ let Some(block) = finalized_block else {
+ tracing::warn!(
+ digest = ?finalized_digest,
+ "reporter: finalization digest not found in pending blocks"
+ );
+ return;
+ };
+
+ let finalized_hash = block.block_hash();
+ let executed: Vec<_> = block
+ .inner
+ .transactions
+ .iter()
+ .map(|tx| tx.tx_id())
+ .collect();
+ if !executed.is_empty() {
+ let mut mempool = state.mempool.write().await;
+ mempool.finalize(&executed);
+ }
+
+ *state.last_hash.write().await = finalized_hash;
+ state.height.fetch_max(
+ block.inner.header.number.saturating_add(1),
+ Ordering::SeqCst,
+ );
+
+ tracing::debug!(
+ digest = ?finalized_digest,
+ block_number = block.inner.header.number,
+ block_hash = ?finalized_hash,
+ "reporter: advanced chain state from finalized activity"
+ );
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_consensus::simplex::scheme::ed25519;
    use commonware_consensus::simplex::types::{Finalization, Finalize, Proposal};
    use commonware_consensus::types::{Epoch, Round, View};
    use commonware_cryptography::{ed25519::PrivateKey, Signer as _};
    use commonware_parallel::Sequential;
    use commonware_utils::ordered::Set;
    use evolve_core::{AccountId, FungibleAsset, InvokeRequest, Message};
    use evolve_mempool::{new_shared_mempool, FifoOrdering};
    use evolve_server::{Block, BlockHeader};
    use std::sync::Arc;

    /// Minimal transaction type satisfying both `Transaction` and `MempoolTx`.
    #[derive(Clone)]
    struct TestTx {
        id: [u8; 32],
        request: InvokeRequest,
        funds: Vec<FungibleAsset>,
    }

    impl TestTx {
        fn new(id: [u8; 32]) -> Self {
            Self {
                id,
                request: InvokeRequest::new_from_message("test", 1, Message::from_bytes(vec![])),
                funds: Vec::new(),
            }
        }
    }

    impl Transaction for TestTx {
        fn sender(&self) -> AccountId {
            AccountId::from_u64(1)
        }

        fn recipient(&self) -> AccountId {
            AccountId::from_u64(2)
        }

        fn request(&self) -> &InvokeRequest {
            &self.request
        }

        fn gas_limit(&self) -> u64 {
            21_000
        }

        fn funds(&self) -> &[FungibleAsset] {
            &self.funds
        }

        fn compute_identifier(&self) -> [u8; 32] {
            self.id
        }
    }

    impl MempoolTx for TestTx {
        type OrderingKey = FifoOrdering;

        fn tx_id(&self) -> [u8; 32] {
            self.id
        }

        fn ordering_key(&self) -> Self::OrderingKey {
            FifoOrdering::new(u64::from(self.id[0]))
        }

        fn gas_limit(&self) -> u64 {
            Transaction::gas_limit(self)
        }
    }

    /// Build a deterministic single-validator ed25519 signing scheme.
    fn test_scheme() -> ed25519::Scheme {
        let private_key = PrivateKey::from_seed(7);
        let public_key = private_key.public_key();
        let participants = Set::from_iter_dedup([public_key]);
        ed25519::Scheme::signer(b"reporter-test", participants, private_key)
            .expect("signer must exist in participants")
    }

    /// Assemble a signed finalization activity carrying `digest` as payload.
    fn finalization_activity_for_digest(
        scheme: &ed25519::Scheme,
        digest: Sha256Digest,
    ) -> Activity<ed25519::Scheme> {
        let proposal = Proposal::new(
            Round::new(Epoch::zero(), View::new(1)),
            View::zero(),
            digest,
        );
        let finalize_vote = Finalize::sign(scheme, proposal).expect("finalize vote must sign");
        let finalization = Finalization::from_finalizes(scheme, [&finalize_vote], &Sequential)
            .expect("single-validator finalization must assemble");
        Activity::Finalization(finalization)
    }

    #[tokio::test]
    async fn finalization_updates_chain_state_and_evicts_pending_block() {
        let last_hash = Arc::new(TokioRwLock::new(B256::ZERO));
        let height = Arc::new(AtomicU64::new(1));
        let pending_blocks = Arc::new(RwLock::new(
            BTreeMap::<[u8; 32], ConsensusBlock<TestTx>>::new(),
        ));
        let mempool = new_shared_mempool::<TestTx>();

        let block = Block::new(
            BlockHeader::new(1, 1_000, B256::ZERO),
            vec![TestTx::new([1u8; 32])],
        );
        let consensus_block = ConsensusBlock::new(block);
        let digest = consensus_block.digest_value();
        let block_hash = consensus_block.block_hash();
        pending_blocks
            .write()
            .unwrap()
            .insert(consensus_block.digest_value().0, consensus_block);

        let chain_state = ChainState {
            last_hash: last_hash.clone(),
            height: height.clone(),
            pending_blocks: pending_blocks.clone(),
            mempool,
        };
        let mut reporter =
            EvolveReporter::<Activity<ed25519::Scheme>, TestTx>::with_chain_state(chain_state);

        let scheme = test_scheme();
        reporter
            .report(finalization_activity_for_digest(&scheme, digest))
            .await;

        assert_eq!(*last_hash.read().await, block_hash);
        assert_eq!(height.load(Ordering::SeqCst), 2);
        assert!(
            pending_blocks.read().unwrap().is_empty(),
            "finalized block should be evicted from pending cache"
        );
    }

    #[tokio::test]
    async fn non_finalization_activity_does_not_mutate_chain_state() {
        let last_hash = Arc::new(TokioRwLock::new(B256::repeat_byte(0xAA)));
        let height = Arc::new(AtomicU64::new(42));
        let pending_blocks = Arc::new(RwLock::new(
            BTreeMap::<[u8; 32], ConsensusBlock<TestTx>>::new(),
        ));
        let mempool = new_shared_mempool::<TestTx>();

        let chain_state = ChainState {
            last_hash: last_hash.clone(),
            height: height.clone(),
            pending_blocks: pending_blocks.clone(),
            mempool,
        };
        let mut reporter =
            EvolveReporter::<Activity<ed25519::Scheme>, TestTx>::with_chain_state(chain_state);

        let digest = Sha256Digest([9u8; 32]);
        let proposal = Proposal::new(
            Round::new(Epoch::zero(), View::new(2)),
            View::new(1),
            digest,
        );
        let scheme = test_scheme();
        let finalize_vote = Finalize::sign(&scheme, proposal).expect("finalize vote must sign");

        reporter.report(Activity::Finalize(finalize_vote)).await;

        assert_eq!(*last_hash.read().await, B256::repeat_byte(0xAA));
        assert_eq!(height.load(Ordering::SeqCst), 42);
        assert!(pending_blocks.read().unwrap().is_empty());
    }

    #[tokio::test]
    async fn finalization_with_unknown_digest_does_not_mutate_chain_state() {
        let last_hash = Arc::new(TokioRwLock::new(B256::repeat_byte(0xBB)));
        let height = Arc::new(AtomicU64::new(7));
        let pending_blocks = Arc::new(RwLock::new(
            BTreeMap::<[u8; 32], ConsensusBlock<TestTx>>::new(),
        ));
        let mempool = new_shared_mempool::<TestTx>();

        let chain_state = ChainState {
            last_hash: last_hash.clone(),
            height: height.clone(),
            pending_blocks: pending_blocks.clone(),
            mempool,
        };
        let mut reporter =
            EvolveReporter::<Activity<ed25519::Scheme>, TestTx>::with_chain_state(chain_state);

        let scheme = test_scheme();
        reporter
            .report(finalization_activity_for_digest(
                &scheme,
                Sha256Digest([0xCC; 32]),
            ))
            .await;

        assert_eq!(*last_hash.read().await, B256::repeat_byte(0xBB));
        assert_eq!(height.load(Ordering::SeqCst), 7);
        assert!(pending_blocks.read().unwrap().is_empty());
    }

    #[tokio::test]
    async fn finalization_confirms_transactions_in_mempool() {
        let last_hash = Arc::new(TokioRwLock::new(B256::ZERO));
        let height = Arc::new(AtomicU64::new(1));
        let pending_blocks = Arc::new(RwLock::new(
            BTreeMap::<[u8; 32], ConsensusBlock<TestTx>>::new(),
        ));
        let mempool = new_shared_mempool::<TestTx>();
        let tx = TestTx::new([0x11; 32]);
        mempool
            .write()
            .await
            .add(tx.clone())
            .expect("tx should enter mempool");
        let proposed = mempool.write().await.propose(0, 1).0;
        assert_eq!(proposed.len(), 1, "proposal should pull the test tx");

        let block = Block::new(BlockHeader::new(1, 1_000, B256::ZERO), vec![tx.clone()]);
        let consensus_block = ConsensusBlock::new(block);
        let digest = consensus_block.digest_value();
        pending_blocks
            .write()
            .unwrap()
            .insert(consensus_block.digest_value().0, consensus_block);

        let chain_state = ChainState {
            last_hash,
            height,
            pending_blocks,
            mempool: mempool.clone(),
        };
        let mut reporter =
            EvolveReporter::<Activity<ed25519::Scheme>, TestTx>::with_chain_state(chain_state);

        let scheme = test_scheme();
        reporter
            .report(finalization_activity_for_digest(&scheme, digest))
            .await;

        let pool = mempool.read().await;
        assert!(
            !pool.contains(&tx.tx_id()),
            "finalized transactions should be removed from the mempool"
        );
    }
}
diff --git a/crates/consensus/src/runner.rs b/crates/consensus/src/runner.rs
new file mode 100644
index 0000000..6c5446b
--- /dev/null
+++ b/crates/consensus/src/runner.rs
@@ -0,0 +1,120 @@
+//! Consensus lifecycle orchestrator.
+//!
+//! [`ConsensusRunner`] manages the full lifecycle of consensus + P2P subsystems:
+//! network initialization, engine startup, and graceful shutdown.
+//!
+//! # With Marshal (Production)
+//!
+//! For ordered finalized block delivery, initialize the marshal via
+//! [`crate::marshal`] and pass the [`MarshalMailbox`](crate::MarshalMailbox)
+//! as the `reporter` parameter to [`start`](ConsensusRunner::start). The
+//! marshal actor then delivers blocks in sequential height order to the
+//! application reporter. See [`crate::marshal`] for the full wiring pattern.
+//!
+//! # Without Marshal (Testing)
+//!
+//! For testing, pass any `Reporter` implementation directly — the simplex
+//! engine will report raw activity events without ordering guarantees.
+
+use crate::config::ConsensusConfig;
+use crate::engine::{SimplexScheme, SimplexSetup};
+use commonware_consensus::simplex::elector::{Config as ElectorConfig, RoundRobin};
+use commonware_consensus::simplex::types::{Activity, Context};
+use commonware_consensus::{CertifiableAutomaton, Relay, Reporter};
+use commonware_cryptography::Digest;
+use commonware_p2p::{Blocker, Receiver, Sender};
+use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
+use evolve_p2p::NetworkConfig;
+use rand_core::CryptoRngCore;
+
+/// Orchestrates the full consensus lifecycle.
+///
+/// Wires together P2P networking and the simplex consensus engine,
+/// managing startup and graceful shutdown of all subsystems.
+///
+/// # Lifecycle
+///
+/// 1. Create the runner with consensus and network configuration.
+/// 2. Call [`start`](Self::start) with the runtime context, automaton, scheme,
+/// blocker, and network channels.
+/// 3. The returned [`Handle`] completes when the runtime signals shutdown.
+///
+/// # Marshal Integration
+///
+/// For production use with ordered block delivery:
+///
+/// ```rust,ignore
+/// use evolve_consensus::marshal::*;
+///
+/// // 1. Create archive stores
+/// let certs = immutable::Archive::init(ctx, archive_config(&prefix, "certs", codec)).await?;
+/// let blocks = immutable::Archive::init(ctx, archive_config(&prefix, "blocks", ())).await?;
+///
+/// // 2. Initialize marshal
+/// let marshal_cfg = init_marshal_config::(&MarshalConfig::default(), scheme, ());
+/// let (actor, mailbox, height) = MarshalActor::init(ctx, certs, blocks, marshal_cfg).await;
+///
+/// // 3. Pass marshal mailbox as reporter to ConsensusRunner::start()
+/// let engine_handle = runner.start(ctx, automaton, relay, mailbox, scheme, blocker, ...);
+///
+/// // 4. Start marshal actor (delivers ordered blocks to app reporter)
+/// let marshal_handle = actor.start(app_reporter, broadcast_buffer, resolver);
+/// ```
+pub struct ConsensusRunner {
+ consensus_config: ConsensusConfig,
+ _network_config: NetworkConfig,
+}
+
+impl ConsensusRunner {
+ /// Create a new consensus runner.
+ pub fn new(consensus_config: ConsensusConfig, network_config: NetworkConfig) -> Self {
+ Self {
+ consensus_config,
+ _network_config: network_config,
+ }
+ }
+
+ /// Start the consensus engine with the given runtime context and components.
+ ///
+ /// The `reporter` parameter accepts any [`Reporter`] implementation.
+ /// For production, pass a [`MarshalMailbox`](crate::MarshalMailbox) to
+ /// enable ordered block delivery through the marshal actor.
+ /// For testing, pass a mock reporter directly.
+ ///
+ /// # Returns
+ ///
+ /// A [`Handle`] that completes when the engine shuts down.
+ #[allow(clippy::too_many_arguments)]
+ pub fn start(
+ &self,
+ context: E,
+ automaton: A,
+ relay: R,
+ reporter: F,
+ scheme: S,
+ blocker: B,
+ vote_network: (VS, VR),
+ certificate_network: (CS, CR),
+ resolver_network: (RS, RR),
+ ) -> Handle<()>
+ where
+ E: Clock + CryptoRngCore + Spawner + Storage + Metrics,
+ S: SimplexScheme + Clone,
+ D: Digest,
+ B: Blocker,
+ A: CertifiableAutomaton, Digest = D>,
+ R: Relay,
+ F: Reporter>,
+ RoundRobin: ElectorConfig,
+ VS: Sender,
+ VR: Receiver,
+ CS: Sender,
+ CR: Receiver,
+ RS: Sender,
+ RR: Receiver,
+ {
+ let setup = SimplexSetup::new(self.consensus_config.clone(), scheme);
+ let engine = setup.build(context, automaton, relay, reporter, blocker);
+ engine.start(vote_network, certificate_network, resolver_network)
+ }
+}
diff --git a/crates/consensus/tests/simplex_integration.rs b/crates/consensus/tests/simplex_integration.rs
new file mode 100644
index 0000000..90b0057
--- /dev/null
+++ b/crates/consensus/tests/simplex_integration.rs
@@ -0,0 +1,216 @@
+//! Integration tests for the simplex consensus engine wiring.
+//!
+//! Validates that [`SimplexSetup`] correctly configures and starts a simplex
+//! engine using ed25519 signatures, RoundRobin leader election, and the
+//! commonware simulated P2P network.
+
+use commonware_consensus::simplex::elector::RoundRobin;
+use commonware_consensus::simplex::mocks;
+use commonware_consensus::simplex::scheme::ed25519;
+use commonware_consensus::types::View;
+use commonware_consensus::Monitor;
+use commonware_cryptography::Sha256;
+use commonware_p2p::simulated::{Config as NetConfig, Network};
+use commonware_runtime::{deterministic, Metrics, Quota, Runner};
+use evolve_consensus::engine::SimplexSetup;
+use evolve_consensus::ConsensusConfig;
+use std::num::NonZeroU32;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Single-validator consensus: one validator starts consensus, proposes
+/// blocks, and finalizes them via [`SimplexSetup`].
+///
+/// Verifies:
+/// - The engine starts successfully via SimplexSetup::build
+/// - The automaton's propose() is called (blocks are produced)
+/// - Blocks are finalized (reporter receives finalization events)
+/// - No faults are detected
+#[test]
+fn single_validator_consensus_produces_and_finalizes_blocks() {
+ let executor = deterministic::Runner::timed(Duration::from_secs(120));
+ executor.start(|mut context| async move {
+ let namespace = b"evolve_test".to_vec();
+ let n = 1u32;
+
+ // Create simulated P2P network.
+ let (network, oracle) = Network::new(
+ context.with_label("network"),
+ NetConfig {
+ max_size: 1024 * 1024,
+ disconnect_on_block: true,
+ tracked_peer_sets: None,
+ },
+ );
+ network.start();
+
+ // Generate ed25519 fixture for 1 validator.
+ let fixture = ed25519::fixture(&mut context, &namespace, n);
+ let validator = fixture.participants[0].clone();
+
+ // Register validator channels on simulated network.
+ let control = oracle.control(validator.clone());
+ let quota = Quota::per_second(NonZeroU32::MAX);
+ let vote_network = control.register(0, quota).await.unwrap();
+ let certificate_network = control.register(1, quota).await.unwrap();
+ let resolver_network = control.register(2, quota).await.unwrap();
+
+ // Create mock relay, application, and reporter.
+ let relay = Arc::new(mocks::relay::Relay::new());
+ let app_cfg = mocks::application::Config {
+ hasher: Sha256::default(),
+ relay: relay.clone(),
+ me: validator.clone(),
+ propose_latency: (5.0, 1.0),
+ verify_latency: (5.0, 1.0),
+ certify_latency: (5.0, 1.0),
+ should_certify: mocks::application::Certifier::Always,
+ };
+ let (application, mailbox) =
+ mocks::application::Application::new(context.with_label("application"), app_cfg);
+ application.start();
+
+ let elector = RoundRobin::::default();
+ let reporter_cfg = mocks::reporter::Config {
+ participants: fixture.participants.clone().try_into().expect("non-empty"),
+ scheme: fixture.schemes[0].clone(),
+ elector: elector.clone(),
+ };
+ let mut reporter =
+ mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
+
+ // Subscribe to finalization events BEFORE starting the engine.
+ let (mut latest, mut monitor): (View, _) = reporter.subscribe().await;
+
+ // Use SimplexSetup to build the engine -- this is the code under test.
+ let config = ConsensusConfig {
+ chain_id: 1,
+ namespace: namespace.clone(),
+ gas_limit: 30_000_000,
+ leader_timeout: Duration::from_secs(1),
+ notarization_timeout: Duration::from_secs(2),
+ activity_timeout: Duration::from_secs(10),
+ epoch_length: 100,
+ };
+ let setup = SimplexSetup::new(config, fixture.schemes[0].clone());
+ let engine = setup.build(
+ context.with_label("engine"),
+ mailbox.clone(), // automaton
+ mailbox, // relay (Mailbox implements both)
+ reporter.clone(),
+ oracle.control(validator.clone()), // blocker
+ );
+
+ // Start the engine with P2P channels.
+ let _handle = engine.start(vote_network, certificate_network, resolver_network);
+
+ // Wait for at least 3 finalized views.
+ let target = View::new(3);
+ while latest < target {
+ latest = monitor.recv().await.expect("monitor channel closed");
+ }
+
+ // Verify: we reached the target view through the monitor.
+ assert!(
+ latest >= target,
+ "expected finalization to reach view {target:?}, got {latest:?}"
+ );
+
+ // Verify: no faults detected.
+ let faults = reporter.faults.lock().unwrap();
+ assert!(faults.is_empty(), "unexpected faults detected");
+
+ // Verify: no invalid signatures.
+ let invalid = *reporter.invalid.lock().unwrap();
+ assert_eq!(invalid, 0, "unexpected invalid signatures: {invalid}");
+ });
+}
+
+/// Verify that ConsensusRunner wires the engine correctly.
+///
+/// This test validates the higher-level ConsensusRunner API produces
+/// a running consensus engine.
+#[test]
+fn consensus_runner_starts_engine() {
+ let executor = deterministic::Runner::timed(Duration::from_secs(120));
+ executor.start(|mut context| async move {
+ let namespace = b"runner_test".to_vec();
+ let n = 1u32;
+
+ // Create simulated P2P network.
+ let (network, oracle) = Network::new(
+ context.with_label("network"),
+ NetConfig {
+ max_size: 1024 * 1024,
+ disconnect_on_block: true,
+ tracked_peer_sets: None,
+ },
+ );
+ network.start();
+
+ // Generate fixture.
+ let fixture = ed25519::fixture(&mut context, &namespace, n);
+ let validator = fixture.participants[0].clone();
+
+ // Register channels.
+ let control = oracle.control(validator.clone());
+ let quota = Quota::per_second(NonZeroU32::MAX);
+ let vote_network = control.register(0, quota).await.unwrap();
+ let certificate_network = control.register(1, quota).await.unwrap();
+ let resolver_network = control.register(2, quota).await.unwrap();
+
+ // Create mock components.
+ let relay = Arc::new(mocks::relay::Relay::new());
+ let app_cfg = mocks::application::Config {
+ hasher: Sha256::default(),
+ relay: relay.clone(),
+ me: validator.clone(),
+ propose_latency: (5.0, 1.0),
+ verify_latency: (5.0, 1.0),
+ certify_latency: (5.0, 1.0),
+ should_certify: mocks::application::Certifier::Always,
+ };
+ let (application, mailbox) =
+ mocks::application::Application::new(context.with_label("application"), app_cfg);
+ application.start();
+
+ let elector = RoundRobin::::default();
+ let reporter_cfg = mocks::reporter::Config {
+ participants: fixture.participants.clone().try_into().expect("non-empty"),
+ scheme: fixture.schemes[0].clone(),
+ elector: elector.clone(),
+ };
+ let mut reporter =
+ mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
+
+ let (mut latest, mut monitor): (View, _) = reporter.subscribe().await;
+
+ // Use ConsensusRunner -- the higher-level API under test.
+ let consensus_config = ConsensusConfig::default();
+ let network_config = evolve_p2p::NetworkConfig::default();
+ let runner = evolve_consensus::ConsensusRunner::new(consensus_config, network_config);
+ let _handle = runner.start(
+ context.with_label("runner_engine"),
+ mailbox.clone(),
+ mailbox,
+ reporter.clone(),
+ fixture.schemes[0].clone(),
+ oracle.control(validator.clone()),
+ vote_network,
+ certificate_network,
+ resolver_network,
+ );
+
+ // Wait for finalization progress.
+ let target = View::new(2);
+ while latest < target {
+ latest = monitor.recv().await.expect("monitor channel closed");
+ }
+
+ // Verify: we reached the target view through the monitor.
+ assert!(
+ latest >= target,
+ "expected finalization to reach view {target:?}, got {latest:?}"
+ );
+ });
+}
diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml
new file mode 100644
index 0000000..b655f73
--- /dev/null
+++ b/crates/p2p/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "evolve-p2p"
+version = "0.1.0"
+edition = "2021"
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+commonware-cryptography = { workspace = true }
+commonware-p2p = { workspace = true }
+commonware-utils = { workspace = true }
+
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+[lints]
+workspace = true
diff --git a/crates/p2p/src/channels.rs b/crates/p2p/src/channels.rs
new file mode 100644
index 0000000..897896d
--- /dev/null
+++ b/crates/p2p/src/channels.rs
@@ -0,0 +1,155 @@
+//! P2P channel definitions for the Evolve protocol.
+//!
+//! Commonware P2P uses numbered channels with per-channel rate limits.
+//! Each channel is independent — rate limiting and message sizing are
+//! configured per channel so that e.g. large block transfers don't starve
+//! consensus vote messages.
+
+/// Channel ID constants for the Evolve P2P protocol.
+///
+/// Each channel has its own rate limit quota and message capacity.
+/// Consensus channels (0–2) are used by simplex BFT.
+/// Application channels (3–4) are Evolve-specific.
+pub mod channel {
+    /// Consensus vote messages (notarize, finalize, nullify).
+    /// High rate, small messages (~200 bytes).
+    pub const VOTES: u32 = 0;
+
+    /// Consensus certificates (notarization, finalization, nullification).
+    /// High rate, medium messages (~2 KB with aggregated sigs).
+    pub const CERTIFICATES: u32 = 1;
+
+    /// Block resolution requests/responses.
+    /// Medium rate, large messages (full block data).
+    pub const RESOLVER: u32 = 2;
+
+    /// Block broadcast from proposer to validators.
+    /// Low rate (1 per view), large messages.
+    pub const BLOCK_BROADCAST: u32 = 3;
+
+    /// Transaction gossip between validators.
+    /// Medium rate, medium messages (encoded transactions).
+    ///
+    /// NOTE: This channel is Evolve-specific — Alto doesn't have it because
+    /// Alto has no transaction concept.
+    pub const TX_GOSSIP: u32 = 4;
+
+    /// Total number of channels; must stay equal to the highest channel ID plus one.
+    pub const COUNT: u32 = 5;
+}
+
+/// Rate-limit parameters applied to a single P2P channel.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RateLimit {
+    /// Sustained throughput: messages allowed per second.
+    pub messages_per_second: u32,
+    /// Burst capacity: largest momentary spike allowed above the sustained rate
+    pub burst: u32,
+}
+
+/// Per-channel configuration: identity, message sizing, and rate limits.
+#[derive(Clone, Debug)]
+pub struct ChannelConfig {
+    /// Channel identifier. Must match one of the constants in [`channel`].
+    pub id: u32,
+    /// Maximum encoded size of a single message on this channel, in bytes.
+    pub max_message_size: usize,
+    /// Rate limiting parameters for this channel.
+    pub rate_limit: RateLimit,
+}
+
+/// Default channel configurations for all Evolve P2P channels.
+///
+/// Values are conservative starting points. Production deployments should
+/// tune `max_message_size` and `rate_limit` based on observed traffic.
+pub fn default_channel_configs() -> Vec<ChannelConfig> {
+    vec![
+        ChannelConfig {
+            id: channel::VOTES,
+            max_message_size: 512,
+            rate_limit: RateLimit {
+                messages_per_second: 100,
+                burst: 200,
+            },
+        },
+        ChannelConfig {
+            id: channel::CERTIFICATES,
+            max_message_size: 4_096,
+            rate_limit: RateLimit {
+                messages_per_second: 100,
+                burst: 200,
+            },
+        },
+        ChannelConfig {
+            id: channel::RESOLVER,
+            max_message_size: 4_194_304, // 4 MiB
+            rate_limit: RateLimit {
+                messages_per_second: 10,
+                burst: 20,
+            },
+        },
+        ChannelConfig {
+            id: channel::BLOCK_BROADCAST,
+            max_message_size: 4_194_304, // 4 MiB
+            rate_limit: RateLimit {
+                messages_per_second: 5,
+                burst: 10,
+            },
+        },
+        ChannelConfig {
+            id: channel::TX_GOSSIP,
+            max_message_size: 65_536, // 64 KiB
+            rate_limit: RateLimit {
+                messages_per_second: 50,
+                burst: 100,
+            },
+        },
+    ]
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn channel_ids_are_unique_and_sequential() {
+        assert_eq!(channel::VOTES, 0);
+        assert_eq!(channel::CERTIFICATES, 1);
+        assert_eq!(channel::RESOLVER, 2);
+        assert_eq!(channel::BLOCK_BROADCAST, 3);
+        assert_eq!(channel::TX_GOSSIP, 4);
+        assert_eq!(channel::COUNT, 5);
+    }
+
+    #[test]
+    fn default_configs_cover_all_channels() {
+        let configs = default_channel_configs();
+        assert_eq!(configs.len(), channel::COUNT as usize);
+
+        let mut ids: Vec<u32> = configs.iter().map(|c| c.id).collect();
+        ids.sort_unstable();
+        let expected: Vec<u32> = (0..channel::COUNT).collect();
+        assert_eq!(ids, expected);
+    }
+
+    #[test]
+    fn default_configs_have_positive_limits() {
+        for cfg in default_channel_configs() {
+            assert!(
+                cfg.max_message_size > 0,
+                "channel {} has zero max_message_size",
+                cfg.id
+            );
+            assert!(
+                cfg.rate_limit.messages_per_second > 0,
+                "channel {} has zero messages_per_second",
+                cfg.id
+            );
+            assert!(
+                cfg.rate_limit.burst >= cfg.rate_limit.messages_per_second,
+                "channel {} burst is less than messages_per_second",
+                cfg.id
+            );
+        }
+    }
+}
diff --git a/crates/p2p/src/config.rs b/crates/p2p/src/config.rs
new file mode 100644
index 0000000..272c4ec
--- /dev/null
+++ b/crates/p2p/src/config.rs
@@ -0,0 +1,87 @@
+//! Network configuration for Evolve's P2P layer.
+
+use std::{net::SocketAddr, time::Duration};
+
+use crate::channels::{default_channel_configs, ChannelConfig};
+
+/// Network configuration for Evolve's authenticated P2P layer.
+///
+/// Passed to the Commonware P2P bootstrapper at startup. All fields have
+/// sensible defaults via [`Default`]; override only what your deployment
+/// requires.
+#[derive(Clone, Debug)]
+pub struct NetworkConfig {
+    /// Address to listen on for incoming P2P connections.
+    pub listen_addr: SocketAddr,
+
+    /// Bootstrapper/seed node addresses for initial peer discovery.
+    ///
+    /// Leave empty for single-node dev setups. Production nodes should list
+    /// at least one stable seed address.
+    pub bootstrappers: Vec<SocketAddr>,
+
+    /// Per-channel rate limiting and message size configuration.
+    ///
+    /// Defaults to [`default_channel_configs`] covering all five Evolve channels.
+    pub channel_configs: Vec<ChannelConfig>,
+
+    /// Maximum number of concurrent peer connections to maintain.
+    pub max_peers: usize,
+
+    /// Timeout for establishing a new peer connection.
+    pub connection_timeout: Duration,
+
+    /// Signing namespace for domain separation.
+    ///
+    /// Prevents cross-chain message replay. Must be unique per network.
+    pub namespace: Vec<u8>,
+}
+
+impl Default for NetworkConfig {
+    fn default() -> Self {
+        NetworkConfig {
+            namespace: b"_EVOLVE".to_vec(),
+            connection_timeout: Duration::from_secs(10),
+            max_peers: 50,
+            channel_configs: default_channel_configs(),
+            bootstrappers: Vec::new(),
+            listen_addr: "0.0.0.0:9000".parse().expect("static addr is valid"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::channels::channel;
+    use std::time::Duration;
+
+    #[test]
+    fn default_config_invariants() {
+        let config = NetworkConfig::default();
+
+        assert!(config.connection_timeout > Duration::ZERO);
+        assert!(config.max_peers > 0);
+        assert_eq!(config.namespace, b"_EVOLVE");
+        assert_eq!(config.channel_configs.len(), channel::COUNT as usize);
+        assert!(config.bootstrappers.is_empty());
+        assert_eq!(config.listen_addr.port(), 9000);
+    }
+
+    #[test]
+    fn custom_config_roundtrip() {
+        let addr: SocketAddr = "127.0.0.1:19000".parse().unwrap();
+        let seed: SocketAddr = "10.0.0.1:9000".parse().unwrap();
+        let config = NetworkConfig {
+            listen_addr: addr,
+            bootstrappers: vec![seed],
+            max_peers: 10,
+            namespace: b"_TEST".to_vec(),
+            ..NetworkConfig::default()
+        };
+        assert_eq!(config.namespace, b"_TEST");
+        assert_eq!(config.max_peers, 10);
+        assert_eq!(config.bootstrappers.len(), 1);
+        assert_eq!(config.listen_addr, addr);
+    }
+}
diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs
new file mode 100644
index 0000000..9105a59
--- /dev/null
+++ b/crates/p2p/src/lib.rs
@@ -0,0 +1,19 @@
+//! P2P networking configuration for Evolve's consensus layer.
+//!
+//! This crate configures Commonware's authenticated P2P networking
+//! for use in the Evolve blockchain. It provides:
+//!
+//! - [`channels`]: channel ID constants and per-channel rate limit configuration
+//! - [`NetworkConfig`]: settings for bootstrapping the P2P layer
+//! - [`ValidatorSet`]: epoch-based validator identity management
+//! - [`EpochPeerProvider`]: `Provider` implementation for peer-set management
+
+pub mod channels;
+pub mod config;
+pub mod provider;
+pub mod validator;
+
+pub use channels::{default_channel_configs, ChannelConfig, RateLimit};
+pub use config::NetworkConfig;
+pub use provider::EpochPeerProvider;
+pub use validator::{ValidatorIdentity, ValidatorSet};
diff --git a/crates/p2p/src/provider.rs b/crates/p2p/src/provider.rs
new file mode 100644
index 0000000..16cfd69
--- /dev/null
+++ b/crates/p2p/src/provider.rs
@@ -0,0 +1,433 @@
+//! Epoch-based peer set provider for the Evolve P2P layer.
+//!
+//! [`EpochPeerProvider`] implements [`commonware_p2p::Provider`] so that the
+//! Commonware authenticated P2P stack can query the current set of validator
+//! peers and receive change notifications on epoch transitions.
+
+use std::{collections::BTreeMap, fmt, sync::Arc};
+
+use commonware_cryptography::ed25519;
+use commonware_p2p::Provider;
+use commonware_utils::ordered::Set;
+use tokio::sync::{mpsc, RwLock};
+
+/// Convenience alias for the subscriber notification payload.
+///
+/// `(published_id, new_peer_set, all_tracked_peers)`
+type Notification = (u64, Set<ed25519::PublicKey>, Set<ed25519::PublicKey>);
+const SUBSCRIBER_CHANNEL_CAPACITY: usize = 64;
+
+#[derive(Debug)]
+struct ProviderState {
+    /// Peer sets indexed by epoch. BTreeMap for deterministic iteration.
+    peer_sets: BTreeMap<u64, Set<ed25519::PublicKey>>,
+    /// Latest synthetic snapshot published to subscribers when no real epoch ID
+    /// can carry the update (for example after pruning old epochs).
+    synthetic_snapshot: Option<(u64, Set<ed25519::PublicKey>)>,
+    /// Active subscriber channels. Dead senders are pruned on each notify.
+    subscribers: Vec<mpsc::Sender<Notification>>,
+    /// Union of all currently tracked peer sets.
+    all_peers: Set<ed25519::PublicKey>,
+    /// Last published notification ID.
+    last_notification_id: Option<u64>,
+}
+
+impl ProviderState {
+    fn new() -> Self {
+        Self {
+            peer_sets: BTreeMap::new(),
+            synthetic_snapshot: None,
+            subscribers: Vec::new(),
+            all_peers: Set::default(),
+            last_notification_id: None,
+        }
+    }
+
+    fn recompute_all_peers(&mut self) {
+        let all_peers: Vec<ed25519::PublicKey> = self
+            .peer_sets
+            .values()
+            .flat_map(|s| s.iter().cloned())
+            .collect();
+        self.all_peers = Set::from_iter_dedup(all_peers);
+    }
+
+    fn latest_snapshot(&self) -> Option<Notification> {
+        self.synthetic_snapshot
+            .as_ref()
+            .map(|(id, peers)| (*id, peers.clone(), self.all_peers.clone()))
+            .or_else(|| {
+                self.peer_sets
+                    .iter()
+                    .next_back()
+                    .map(|(id, peers)| (*id, peers.clone(), self.all_peers.clone()))
+            })
+    }
+
+    fn publish_notification(
+        &mut self,
+        preferred_id: u64,
+        peers: Set<ed25519::PublicKey>,
+    ) -> Notification {
+        let id = match self.last_notification_id {
+            Some(last) if preferred_id <= last => last.saturating_add(1),
+            _ => preferred_id,
+        };
+        self.last_notification_id = Some(id);
+        self.synthetic_snapshot = if self.peer_sets.contains_key(&id) {
+            None
+        } else {
+            Some((id, peers.clone()))
+        };
+        (id, peers, self.all_peers.clone())
+    }
+
+    /// Notify live subscribers and prune dead channels.
+    fn notify(&mut self, notification: Notification) {
+        self.subscribers
+            .retain(|tx| match tx.try_send(notification.clone()) {
+                Ok(()) => true,
+                Err(mpsc::error::TrySendError::Full(_)) => {
+                    tracing::warn!(
+                        peer_set_id = notification.0,
+                        "provider: dropping peer-set notification for slow subscriber"
+                    );
+                    true
+                }
+                Err(mpsc::error::TrySendError::Closed(_)) => false,
+            });
+    }
+}
+
+/// Provides peer sets to the P2P layer based on epochs.
+///
+/// When the validator set changes (new epoch), call [`update_epoch`] to
+/// register the new peer set and notify all current subscribers.
+///
+/// [`EpochPeerProvider`] is cheaply cloneable; all clones share the same
+/// underlying state via an [`Arc`].
+///
+/// [`update_epoch`]: EpochPeerProvider::update_epoch
+#[derive(Clone)]
+pub struct EpochPeerProvider {
+    inner: Arc<RwLock<ProviderState>>,
+}
+
+impl fmt::Debug for EpochPeerProvider {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("EpochPeerProvider").finish_non_exhaustive()
+    }
+}
+
+impl EpochPeerProvider {
+    /// Create a new provider with no registered peer sets.
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(RwLock::new(ProviderState::new())),
+        }
+    }
+
+    /// Register a peer set for the given epoch and notify all subscribers.
+    ///
+    /// If a peer set for `epoch` already exists it is replaced. Subscribers
+    /// receive the new set and the updated union of all tracked peers. The
+    /// notification ID is monotonic and may be newer than `epoch` when a
+    /// previous prune already published a synthetic snapshot.
+    ///
+    /// # Note
+    ///
+    /// Old epochs are not pruned — `peer_sets` grows with each call. For
+    /// production use, call [`retain_epochs`](Self::retain_epochs) periodically
+    /// to bound memory usage.
+    pub async fn update_epoch(&self, epoch: u64, peers: Set<ed25519::PublicKey>) {
+        let mut state = self.inner.write().await;
+        state.peer_sets.insert(epoch, peers.clone());
+        state.recompute_all_peers();
+        let notification = state.publish_notification(epoch, peers);
+
+        state.notify(notification);
+    }
+
+    /// Remove all epoch entries older than `min_epoch`.
+    ///
+    /// Recomputes `all_peers` from the remaining sets and notifies subscribers
+    /// with a fresh, monotonic snapshot ID so consumers observe the prune even
+    /// when the highest retained epoch does not change.
+    pub async fn retain_epochs(&self, min_epoch: u64) {
+        let mut state = self.inner.write().await;
+        state.peer_sets.retain(|&e, _| e >= min_epoch);
+        state.recompute_all_peers();
+        let notification = if let Some((epoch, peers)) = state
+            .peer_sets
+            .iter()
+            .next_back()
+            .map(|(e, p)| (*e, p.clone()))
+        {
+            state.publish_notification(epoch, peers)
+        } else {
+            state.publish_notification(min_epoch, Set::default())
+        };
+
+        state.notify(notification);
+    }
+}
+
+impl Default for EpochPeerProvider {
+    fn default() -> Self {
+        EpochPeerProvider::new()
+    }
+}
+
+impl Provider for EpochPeerProvider {
+ type PublicKey = ed25519::PublicKey;
+
+ fn peer_set(
+ &mut self,
+ id: u64,
+ ) -> impl std::future::Future