Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/ethlambda/src/checkpoint_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub enum CheckpointSyncError {
Http(#[from] reqwest::Error),
#[error("SSZ deserialization failed: {0:?}")]
SszDecode(DecodeError),
#[error("Storage error: {0}")]
Storage(#[from] ethlambda_storage::Error),
#[error("checkpoint state slot cannot be 0")]
SlotIsZero,
#[error("checkpoint state has no validators")]
Expand Down
9 changes: 6 additions & 3 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ async fn main() -> eyre::Result<()> {
// and the API server (which exposes GET/POST admin endpoints).
let aggregator = AggregatorController::new(options.is_aggregator);

let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone());
let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone())
.expect("failed to spawn blockchain actor");

// Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the
// AggregatorController — subnet subscriptions are decided once here and
Expand Down Expand Up @@ -458,7 +459,8 @@ async fn fetch_initial_state(
let Some(checkpoint_url) = checkpoint_url else {
info!("No checkpoint sync URL provided, initializing from genesis state");
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
return Ok(Store::from_anchor_state(backend, genesis_state));
return Store::from_anchor_state(backend, genesis_state)
.map_err(checkpoint_sync::CheckpointSyncError::from);
};

// Checkpoint sync path
Expand All @@ -476,5 +478,6 @@ async fn fetch_initial_state(
);

// Store the anchor state and header, without body
Ok(Store::from_anchor_state(backend, state))
Store::from_anchor_state(backend, state)
.map_err(checkpoint_sync::CheckpointSyncError::from)
}
98 changes: 59 additions & 39 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, SystemTime};

type Error = ethlambda_storage::Error;

use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
use ethlambda_state_transition::is_proposer;
use ethlambda_storage::{ALL_TABLES, Store};
Expand Down Expand Up @@ -52,10 +54,10 @@ impl BlockChain {
store: Store,
validator_keys: HashMap<u64, ValidatorKeyPair>,
aggregator: AggregatorController,
) -> BlockChain {
) -> Result<BlockChain, Error> {
metrics::set_is_aggregator(aggregator.is_enabled());
metrics::set_node_sync_status(metrics::SyncStatus::Idle);
let genesis_time = store.config().genesis_time;
let genesis_time = store.config()?.genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
let handle = BlockChainServer {
store,
Expand All @@ -75,7 +77,7 @@ impl BlockChain {
handle.context(),
block_chain_protocol::Tick,
);
BlockChain { handle }
Ok(BlockChain { handle })
}

pub fn actor_ref(&self) -> &ActorRef<BlockChainServer> {
Expand Down Expand Up @@ -118,8 +120,8 @@ pub struct BlockChainServer {
}

impl BlockChainServer {
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) {
let genesis_time_ms = self.store.config().genesis_time * 1000;
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) -> Result<(), Error>{
let genesis_time_ms = self.store.config()?.genesis_time * 1000;

// Calculate current slot and interval from milliseconds
let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms);
Expand All @@ -128,9 +130,9 @@ impl BlockChainServer {

// Fail fast: a state with zero validators is invalid and would cause
// panics in proposer selection and attestation processing.
if self.store.head_state().validators.is_empty() {
if self.store.head_state()?.validators.is_empty() {
error!("Head state has no validators, skipping tick");
return;
return Ok(());
}

// Update current slot metric
Expand All @@ -146,9 +148,11 @@ impl BlockChainServer {
// At interval 0, check if we will propose (but don't build the block yet).
// Tick forkchoice first to accept attestations, then build the block
// using the freshly-accepted attestations.
let proposer_validator_id = (interval == 0 && slot > 0)
.then(|| self.get_our_proposer(slot))
.flatten();
let proposer_validator_id = if interval == 0 && slot > 0 {
self.get_our_proposer(slot)?
} else {
None
};

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(
Expand All @@ -174,9 +178,10 @@ impl BlockChainServer {
}

// Update safe target slot metric (updated by store.on_tick at interval 3)
metrics::update_safe_target_slot(self.store.safe_target_slot());
metrics::update_safe_target_slot(self.store.safe_target_slot()?);
// Update head slot metric (head may change when attestations are promoted at intervals 0/4)
metrics::update_head_slot(self.store.head_slot());
metrics::update_head_slot(self.store.head_slot()?);
Ok(())
}

/// Kick off a committee-signature aggregation session:
Expand Down Expand Up @@ -236,21 +241,25 @@ impl BlockChainServer {
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 DB error silently treated as missing block during walk-up

In process_or_pend_block, the walk-up loop uses while let Ok(Some(header)) = self.store.get_block_header(&missing_root), which breaks silently on both Ok(None) and Err(...). When a storage error occurs mid-walk, the code falls through to self.request_missing_block(missing_root) — treating a read failure as if the block is genuinely absent from the DB. This issues a spurious P2P request for a block the node already holds, and on re-delivery the cycle restarts.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/blockchain/src/lib.rs
Line: 242

Comment:
**DB error silently treated as missing block during walk-up**

In `process_or_pend_block`, the walk-up loop uses `while let Ok(Some(header)) = self.store.get_block_header(&missing_root)`, which breaks silently on both `Ok(None)` and `Err(...)`. When a storage error occurs mid-walk, the code falls through to `self.request_missing_block(missing_root)` — treating a read failure as if the block is genuinely absent from the DB. This issues a spurious P2P request for a block the node already holds, and on re-delivery the cycle restarts.

How can I resolve this? If you propose a fix, please make it concise.

/// Returns the validator ID if any of our validators is the proposer for this slot.
fn get_our_proposer(&self, slot: u64) -> Option<u64> {
let head_state = self.store.head_state();
fn get_our_proposer(&self, slot: u64) -> Result<Option<u64>, Error> {
let head_state = self.store.head_state()?;
let num_validators = head_state.validators.len() as u64;

self.key_manager
Ok(self.key_manager
.validator_ids()
.into_iter()
.find(|&vid| is_proposer(vid, slot, num_validators))
.find(|&vid| is_proposer(vid, slot, num_validators)))
}

fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) {
let _timing = metrics::time_attestations_production();

// Produce attestation data once for all validators
let attestation_data = store::produce_attestation_data(&self.store, slot);
let Ok(attestation_data) = store::produce_attestation_data(&self.store, slot)
.inspect_err(|err| error!(%slot, %err, "Failed to produce attestation data"))
else {
return;
};

// For each registered validator, produce and publish attestation
for validator_id in self.key_manager.validator_ids() {
Expand Down Expand Up @@ -351,14 +360,14 @@ impl BlockChainServer {

fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> {
store::on_block(&mut self.store, signed_block)?;
let head_slot = self.store.head_slot();
let head_slot = self.store.head_slot()?;
metrics::update_head_slot(head_slot);
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
metrics::update_latest_justified_slot(self.store.latest_justified()?.slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized()?.slot);
metrics::update_validators_count(self.key_manager.validator_ids().len() as u64);

// Update sync status based on head slot vs wall clock slot
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
let current_slot = self.store.time()? / INTERVALS_PER_SLOT;
let status = if head_slot >= current_slot {
metrics::SyncStatus::Synced
} else {
Expand All @@ -381,13 +390,15 @@ impl BlockChainServer {
// Here we process blocks iteratively, to avoid recursive calls that could
// cause a stack overflow.
while let Some(block) = queue.pop_front() {
self.process_or_pend_block(block, &mut queue);
let _ = self.process_or_pend_block(block, &mut queue)
.inspect_err(|e| error!(%e, "Failed to process or pend block"));
}

// Prune old states and blocks AFTER the entire cascade completes.
// Running this mid-cascade would delete states that pending children
// still need, causing re-processing loops when fallback pruning is active.
self.store.prune_old_data();
let _ = self.store.prune_old_data()
.inspect_err(|e| error!(%e, "Failed to prune old data"));
}

/// Try to process a single block. If its parent state is missing, store it
Expand All @@ -397,7 +408,7 @@ impl BlockChainServer {
&mut self,
signed_block: SignedBlock,
queue: &mut VecDeque<SignedBlock>,
) {
) -> Result<(), Error> {
let slot = signed_block.message.slot;
let block_root = signed_block.message.hash_tree_root();
let parent_root = signed_block.message.parent_root;
Expand All @@ -407,13 +418,13 @@ impl BlockChainServer {
// already part of the canonical chain and cannot affect fork choice.
// Discard any pending children: since we won't process this block,
// children referencing it as parent would remain stuck indefinitely.
if slot <= self.store.latest_finalized().slot {
if slot <= self.store.latest_finalized()?.slot {
self.discard_pending_subtree(block_root);
return;
return Ok(());
}

// Check if parent state exists before attempting to process
if !self.store.has_state(&parent_root) {
if !self.store.has_state(&parent_root)? {
info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending");

// Resolve the actual missing ancestor by walking the chain. A stale entry
Expand All @@ -427,7 +438,8 @@ impl BlockChainServer {
self.pending_block_parents.insert(block_root, missing_root);

// Persist block data to DB (no LiveChain entry — invisible to fork choice)
self.store.insert_pending_block(block_root, signed_block);
let _ = self.store.insert_pending_block(block_root, signed_block)
.inspect_err(|e| warn!(%block_root, %e, "Failed to persist pending block"));

// Store only the H256 reference in memory
self.pending_blocks
Expand All @@ -439,16 +451,22 @@ impl BlockChainServer {
// session, the actual missing block is further up the chain.
// Note: this loop always terminates — blocks reference parents by hash,
// so a cycle would require a hash collision.
while let Some(header) = self.store.get_block_header(&missing_root) {
if self.store.has_state(&header.parent_root) {
while let Ok(Some(header)) = self.store.get_block_header(&missing_root) {
if self.store.has_state(&header.parent_root)? {
// Parent state available — enqueue for processing, cascade
// handles the rest via the outer loop.
let block = self
.store
.get_signed_block(&missing_root)
.expect("header and parent state exist, so the full signed block must too");
queue.push_back(block);
return;
match self.store.get_signed_block(&missing_root) {
Ok(Some(block)) => {
queue.push_back(block);
}
Ok(None) => {
warn!(%missing_root, "Pending block missing from DB during walk-up");
}
Err(e) => {
error!(%missing_root, %e, "DB error fetching pending block during walk-up");
}
}
return Ok(());
}
// Block exists but parent doesn't have state — register as pending
// so the cascade works when the true ancestor arrives
Expand All @@ -463,7 +481,7 @@ impl BlockChainServer {

// Request the actual missing block from network
self.request_missing_block(missing_root);
return;
return Ok(());
}

// Parent exists, proceed with processing
Expand Down Expand Up @@ -491,6 +509,7 @@ impl BlockChainServer {
);
}
}
Ok(())
}

fn request_missing_block(&mut self, block_root: H256) {
Expand Down Expand Up @@ -520,7 +539,7 @@ impl BlockChainServer {
self.pending_block_parents.remove(&block_root);

// Load block data from DB
let Some(child_block) = self.store.get_signed_block(&block_root) else {
let Ok(Some(child_block)) = self.store.get_signed_block(&block_root) else {
warn!(
block_root = %ShortRoot(&block_root.0),
"Pending block missing from DB, skipping"
Expand Down Expand Up @@ -580,7 +599,8 @@ impl BlockChainServer {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_millis() as u64, ctx).await;
let _ = self.on_tick(timestamp.as_millis() as u64, ctx).await
.inspect_err(|e| error!(%e, "Tick failed"));
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
Expand Down
Loading