diff --git a/bin/ethlambda/src/checkpoint_sync.rs b/bin/ethlambda/src/checkpoint_sync.rs index 1f9deecb..e8ffa1e1 100644 --- a/bin/ethlambda/src/checkpoint_sync.rs +++ b/bin/ethlambda/src/checkpoint_sync.rs @@ -19,6 +19,8 @@ pub enum CheckpointSyncError { Http(#[from] reqwest::Error), #[error("SSZ deserialization failed: {0:?}")] SszDecode(DecodeError), + #[error("Storage error: {0}")] + Storage(#[from] ethlambda_storage::Error), #[error("checkpoint state slot cannot be 0")] SlotIsZero, #[error("checkpoint state has no validators")] diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 3c3f816c..85c21d24 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -169,7 +169,8 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()) + .expect("failed to spawn blockchain actor"); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the // AggregatorController — subnet subscriptions are decided once here and @@ -458,7 +459,8 @@ async fn fetch_initial_state( let Some(checkpoint_url) = checkpoint_url else { info!("No checkpoint sync URL provided, initializing from genesis state"); let genesis_state = State::from_genesis(genesis.genesis_time, validators); - return Ok(Store::from_anchor_state(backend, genesis_state)); + return Store::from_anchor_state(backend, genesis_state) + .map_err(checkpoint_sync::CheckpointSyncError::from); }; // Checkpoint sync path @@ -476,5 +478,6 @@ async fn fetch_initial_state( ); // Store the anchor state and header, without body - Ok(Store::from_anchor_state(backend, state)) + Store::from_anchor_state(backend, state) + .map_err(checkpoint_sync::CheckpointSyncError::from) } diff --git 
a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 77c98826..df9ae5cd 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,6 +1,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, SystemTime}; +type Error = ethlambda_storage::Error; + use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; use ethlambda_state_transition::is_proposer; use ethlambda_storage::{ALL_TABLES, Store}; @@ -52,10 +54,10 @@ impl BlockChain { store: Store, validator_keys: HashMap, aggregator: AggregatorController, - ) -> BlockChain { + ) -> Result { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); - let genesis_time = store.config().genesis_time; + let genesis_time = store.config()?.genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); let handle = BlockChainServer { store, @@ -75,7 +77,7 @@ impl BlockChain { handle.context(), block_chain_protocol::Tick, ); - BlockChain { handle } + Ok(BlockChain { handle }) } pub fn actor_ref(&self) -> &ActorRef { @@ -118,8 +120,8 @@ pub struct BlockChainServer { } impl BlockChainServer { - async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context) { - let genesis_time_ms = self.store.config().genesis_time * 1000; + async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context) -> Result<(), Error>{ + let genesis_time_ms = self.store.config()?.genesis_time * 1000; // Calculate current slot and interval from milliseconds let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -128,9 +130,9 @@ impl BlockChainServer { // Fail fast: a state with zero validators is invalid and would cause // panics in proposer selection and attestation processing. 
- if self.store.head_state().validators.is_empty() { + if self.store.head_state()?.validators.is_empty() { error!("Head state has no validators, skipping tick"); - return; + return Ok(()); } // Update current slot metric @@ -146,9 +148,11 @@ impl BlockChainServer { // At interval 0, check if we will propose (but don't build the block yet). // Tick forkchoice first to accept attestations, then build the block // using the freshly-accepted attestations. - let proposer_validator_id = (interval == 0 && slot > 0) - .then(|| self.get_our_proposer(slot)) - .flatten(); + let proposer_validator_id = if interval == 0 && slot > 0 { + self.get_our_proposer(slot)? + } else { + None + }; // Tick the store first - this accepts attestations at interval 0 if we have a proposal store::on_tick( @@ -174,9 +178,10 @@ impl BlockChainServer { } // Update safe target slot metric (updated by store.on_tick at interval 3) - metrics::update_safe_target_slot(self.store.safe_target_slot()); + metrics::update_safe_target_slot(self.store.safe_target_slot()?); // Update head slot metric (head may change when attestations are promoted at intervals 0/4) - metrics::update_head_slot(self.store.head_slot()); + metrics::update_head_slot(self.store.head_slot()?); + Ok(()) } /// Kick off a committee-signature aggregation session: @@ -236,21 +241,25 @@ impl BlockChainServer { } /// Returns the validator ID if any of our validators is the proposer for this slot. 
- fn get_our_proposer(&self, slot: u64) -> Option { - let head_state = self.store.head_state(); + fn get_our_proposer(&self, slot: u64) -> Result, Error> { + let head_state = self.store.head_state()?; let num_validators = head_state.validators.len() as u64; - self.key_manager + Ok(self.key_manager .validator_ids() .into_iter() - .find(|&vid| is_proposer(vid, slot, num_validators)) + .find(|&vid| is_proposer(vid, slot, num_validators))) } fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) { let _timing = metrics::time_attestations_production(); // Produce attestation data once for all validators - let attestation_data = store::produce_attestation_data(&self.store, slot); + let Ok(attestation_data) = store::produce_attestation_data(&self.store, slot) + .inspect_err(|err| error!(%slot, %err, "Failed to produce attestation data")) + else { + return; + }; // For each registered validator, produce and publish attestation for validator_id in self.key_manager.validator_ids() { @@ -351,14 +360,14 @@ impl BlockChainServer { fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> { store::on_block(&mut self.store, signed_block)?; - let head_slot = self.store.head_slot(); + let head_slot = self.store.head_slot()?; metrics::update_head_slot(head_slot); - metrics::update_latest_justified_slot(self.store.latest_justified().slot); - metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); + metrics::update_latest_justified_slot(self.store.latest_justified()?.slot); + metrics::update_latest_finalized_slot(self.store.latest_finalized()?.slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); // Update sync status based on head slot vs wall clock slot - let current_slot = self.store.time() / INTERVALS_PER_SLOT; + let current_slot = self.store.time()? 
/ INTERVALS_PER_SLOT; let status = if head_slot >= current_slot { metrics::SyncStatus::Synced } else { @@ -381,13 +390,15 @@ impl BlockChainServer { // Here we process blocks iteratively, to avoid recursive calls that could // cause a stack overflow. while let Some(block) = queue.pop_front() { - self.process_or_pend_block(block, &mut queue); + let _ = self.process_or_pend_block(block, &mut queue) + .inspect_err(|e| error!(%e, "Failed to process or pend block")); } // Prune old states and blocks AFTER the entire cascade completes. // Running this mid-cascade would delete states that pending children // still need, causing re-processing loops when fallback pruning is active. - self.store.prune_old_data(); + let _ = self.store.prune_old_data() + .inspect_err(|e| error!(%e, "Failed to prune old data")); } /// Try to process a single block. If its parent state is missing, store it @@ -397,7 +408,7 @@ impl BlockChainServer { &mut self, signed_block: SignedBlock, queue: &mut VecDeque, - ) { + ) -> Result<(), Error> { let slot = signed_block.message.slot; let block_root = signed_block.message.hash_tree_root(); let parent_root = signed_block.message.parent_root; @@ -407,13 +418,13 @@ impl BlockChainServer { // already part of the canonical chain and cannot affect fork choice. // Discard any pending children: since we won't process this block, // children referencing it as parent would remain stuck indefinitely. - if slot <= self.store.latest_finalized().slot { + if slot <= self.store.latest_finalized()?.slot { self.discard_pending_subtree(block_root); - return; + return Ok(()); } // Check if parent state exists before attempting to process - if !self.store.has_state(&parent_root) { + if !self.store.has_state(&parent_root)? { info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending"); // Resolve the actual missing ancestor by walking the chain. 
A stale entry @@ -427,7 +438,8 @@ impl BlockChainServer { self.pending_block_parents.insert(block_root, missing_root); // Persist block data to DB (no LiveChain entry — invisible to fork choice) - self.store.insert_pending_block(block_root, signed_block); + let _ = self.store.insert_pending_block(block_root, signed_block) + .inspect_err(|e| warn!(%block_root, %e, "Failed to persist pending block")); // Store only the H256 reference in memory self.pending_blocks @@ -439,16 +451,22 @@ impl BlockChainServer { // session, the actual missing block is further up the chain. // Note: this loop always terminates — blocks reference parents by hash, // so a cycle would require a hash collision. - while let Some(header) = self.store.get_block_header(&missing_root) { - if self.store.has_state(&header.parent_root) { + while let Ok(Some(header)) = self.store.get_block_header(&missing_root) { + if self.store.has_state(&header.parent_root)? { // Parent state available — enqueue for processing, cascade // handles the rest via the outer loop. 
- let block = self - .store - .get_signed_block(&missing_root) - .expect("header and parent state exist, so the full signed block must too"); - queue.push_back(block); - return; + match self.store.get_signed_block(&missing_root) { + Ok(Some(block)) => { + queue.push_back(block); + } + Ok(None) => { + warn!(%missing_root, "Pending block missing from DB during walk-up"); + } + Err(e) => { + error!(%missing_root, %e, "DB error fetching pending block during walk-up"); + } + } + return Ok(()); } // Block exists but parent doesn't have state — register as pending // so the cascade works when the true ancestor arrives @@ -463,7 +481,7 @@ impl BlockChainServer { // Request the actual missing block from network self.request_missing_block(missing_root); - return; + return Ok(()); } // Parent exists, proceed with processing @@ -491,6 +509,7 @@ impl BlockChainServer { ); } } + Ok(()) } fn request_missing_block(&mut self, block_root: H256) { @@ -520,7 +539,7 @@ impl BlockChainServer { self.pending_block_parents.remove(&block_root); // Load block data from DB - let Some(child_block) = self.store.get_signed_block(&block_root) else { + let Ok(Some(child_block)) = self.store.get_signed_block(&block_root) else { warn!( block_root = %ShortRoot(&block_root.0), "Pending block missing from DB, skipping" @@ -580,7 +599,8 @@ impl BlockChainServer { let timestamp = SystemTime::UNIX_EPOCH .elapsed() .expect("already past the unix epoch"); - self.on_tick(timestamp.as_millis() as u64, ctx).await; + let _ = self.on_tick(timestamp.as_millis() as u64, ctx).await + .inspect_err(|e| error!(%e, "Tick failed")); // Schedule the next tick at the next 800ms interval boundary let ms_since_epoch = timestamp.as_millis() as u64; let ms_to_next_interval = diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 45b3b220..e34160f9 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -42,19 +42,20 @@ fn accept_new_attestations(store: &mut Store, 
log_tree: bool) { store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); - update_head(store, log_tree); + let _ = update_head(store, log_tree) + .inspect_err(|e| warn!(%e, "Failed to update head")); } /// Update the head based on the fork choice rule. /// /// When `log_tree` is true, also computes block weights and logs an ASCII /// fork choice tree to the terminal. -fn update_head(store: &mut Store, log_tree: bool) { - let blocks = store.get_live_chain(); +fn update_head(store: &mut Store, log_tree: bool) -> Result<(), StoreError>{ + let blocks = store.get_live_chain()?; let attestations = store.extract_latest_known_attestations(); - let old_head = store.head(); + let old_head = store.head()?; let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head( - store.latest_justified().root, + store.latest_justified()?.root, &blocks, &attestations, 0, @@ -64,19 +65,23 @@ fn update_head(store: &mut Store, log_tree: bool) { metrics::observe_fork_choice_reorg_depth(depth); info!(%old_head, %new_head, depth, "Fork choice reorg detected"); } - store.update_checkpoints(ForkCheckpoints::head_only(new_head)); + store.update_checkpoints(ForkCheckpoints::head_only(new_head))?; if old_head != new_head { let old_slot = store .get_block_header(&old_head) + .ok() + .flatten() .map(|h| h.slot) .unwrap_or(0); let new_slot = store .get_block_header(&new_head) + .ok() + .flatten() .map(|h| h.slot) .unwrap_or(0); - let justified_slot = store.latest_justified().slot; - let finalized_slot = store.latest_finalized().slot; + let justified_slot = store.latest_justified()?.slot; + let finalized_slot = store.latest_finalized()?.slot; info!( head_slot = new_slot, head_root = %ShortRoot(&new_head.0), @@ -93,11 +98,12 @@ fn update_head(store: &mut Store, log_tree: bool) { &blocks, &weights, new_head, - 
store.latest_justified(), - store.latest_finalized(), + store.latest_justified()?, + store.latest_finalized()?, ); info!("\n{tree}"); } + Ok(()) } /// Update the safe target for attestation. @@ -111,21 +117,23 @@ fn update_head(store: &mut Store, log_tree: bool) { /// Counting "known" would let a node keep advancing its safe target on stale /// evidence even when live participation has collapsed: exactly the failure /// mode safe target is supposed to prevent. See leanSpec PR #680. -fn update_safe_target(store: &mut Store) { - let head_state = store.get_state(&store.head()).expect("head state exists"); +fn update_safe_target(store: &mut Store) -> Result<(), StoreError> { + let head_state = store.get_state(&store.head()?)? + .ok_or_else(|| StoreError::Storage("head state not found".into()))?; let num_validators = head_state.validators.len() as u64; let min_target_score = (num_validators * 2).div_ceil(3); - let blocks = store.get_live_chain(); + let blocks = store.get_live_chain()?; let attestations = store.extract_latest_new_attestations(); let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( - store.latest_justified().root, + store.latest_justified()?.root, &blocks, &attestations, min_target_score, ); - store.set_safe_target(safe_target); + store.set_safe_target(safe_target)?; + Ok(()) } /// Validate incoming attestation before processing. @@ -141,14 +149,14 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() // Availability Check - We cannot count a vote if we haven't seen the blocks involved. let source_header = store - .get_block_header(&data.source.root) + .get_block_header(&data.source.root)? .ok_or(StoreError::UnknownSourceBlock(data.source.root))?; let target_header = store - .get_block_header(&data.target.root) + .get_block_header(&data.target.root)? 
.ok_or(StoreError::UnknownTargetBlock(data.target.root))?; let head_header = store - .get_block_header(&data.head.root) + .get_block_header(&data.head.root)? .ok_or(StoreError::UnknownHeadBlock(data.head.root))?; // Topology Check - Source must be older than Target, and Head must be at least as recent. @@ -184,7 +192,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() // Time Check - Validate attestation is not too far in the future. // We allow a small margin for clock disparity (1 slot), but no further. - let current_slot = store.time() / INTERVALS_PER_SLOT; + let current_slot = store.time()? / INTERVALS_PER_SLOT; if data.slot > current_slot + 1 { return Err(StoreError::AttestationTooFarInFuture { attestation_slot: data.slot, @@ -203,26 +211,31 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// interval = store.time() % INTERVALS_PER_SLOT pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { // Convert UNIX timestamp (ms) to interval count since genesis - let genesis_time_ms = store.config().genesis_time * 1000; + let Ok(config) = store.config() else { return }; + let genesis_time_ms = config.genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); let time = time_delta_ms / MILLISECONDS_PER_INTERVAL; // If we're more than a slot behind, fast-forward to a slot before. // Operations are idempotent, so this should be fine. 
- if time.saturating_sub(store.time()) > INTERVALS_PER_SLOT { - store.set_time(time - INTERVALS_PER_SLOT); + let current_time = store.time().unwrap_or(0); + if time.saturating_sub(current_time) > INTERVALS_PER_SLOT { + let _ = store.set_time(time - INTERVALS_PER_SLOT); } - while store.time() < time { - store.set_time(store.time() + 1); + loop { + let Ok(t) = store.time() else { return }; + if t >= time { break; } + if store.set_time(t + 1).is_err() { return; } - let slot = store.time() / INTERVALS_PER_SLOT; - let interval = store.time() % INTERVALS_PER_SLOT; + let Ok(new_time) = store.time() else { return }; + let slot = new_time / INTERVALS_PER_SLOT; + let interval = new_time % INTERVALS_PER_SLOT; trace!(%slot, %interval, "processing tick"); // has_proposal is only signaled for the final tick (matching Python spec behavior) - let is_final_tick = store.time() == time; + let is_final_tick = new_time == time; let should_signal_proposal = has_proposal && is_final_tick; // NOTE: here we assume on_tick never skips intervals. @@ -245,7 +258,8 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { } 3 => { // Update safe target for validators - update_safe_target(store); + let _ = update_safe_target(store) + .inspect_err(|e| warn!(%e, "Failed to update safe target")); } 4 => { // End of slot - accept accumulated attestations and log tree @@ -280,7 +294,7 @@ pub fn on_gossip_attestation( let target = attestation.data.target; let target_state = store - .get_state(&target.root) + .get_state(&target.root)? .ok_or(StoreError::MissingTargetState(target.root))?; if validator_id >= target_state.validators.len() as u64 { return Err(StoreError::InvalidValidatorIndex); @@ -343,7 +357,7 @@ pub fn on_gossip_aggregated_attestation( .inspect_err(|_| metrics::inc_attestations_invalid())?; let target_state = store - .get_state(&aggregated.data.target.root) + .get_state(&aggregated.data.target.root)? 
.ok_or(StoreError::MissingTargetState(aggregated.data.target.root))?; let validators = &target_state.validators; let num_validators = validators.len() as u64; @@ -437,7 +451,7 @@ fn on_block_core( let slot = block.slot; // Skip duplicate blocks (idempotent operation) - if store.has_state(&block_root) { + if store.has_state(&block_root)? { return Ok(()); } @@ -446,7 +460,7 @@ fn on_block_core( // This check ensures the state has been computed for the parent block. let parent_state = store - .get_state(&block.parent_root) + .get_state(&block.parent_root)? .ok_or(StoreError::MissingParentState { parent_root: block.parent_root, slot, @@ -491,18 +505,18 @@ fn on_block_core( post_state.latest_block_header.state_root = state_root; // Update justified/finalized checkpoints if they have higher slots - let justified = (post_state.latest_justified.slot > store.latest_justified().slot) + let justified = (post_state.latest_justified.slot > store.latest_justified()?.slot) .then_some(post_state.latest_justified); - let finalized = (post_state.latest_finalized.slot > store.latest_finalized().slot) + let finalized = (post_state.latest_finalized.slot > store.latest_finalized()?.slot) .then_some(post_state.latest_finalized); if justified.is_some() || finalized.is_some() { - store.update_checkpoints(ForkCheckpoints::new(store.head(), justified, finalized)); + store.update_checkpoints(ForkCheckpoints::new(store.head()?, justified, finalized))?; } // Store signed block and state - store.insert_signed_block(block_root, signed_block.clone()); - store.insert_state(block_root, post_state); + store.insert_signed_block(block_root, signed_block.clone())?; + store.insert_state(block_root, post_state)?; // Process block body attestations and their signatures let aggregated_attestations = &block.body.attestations; @@ -523,7 +537,7 @@ fn on_block_core( store.insert_known_aggregated_payloads_batch(known_entries); // Update forkchoice head based on new block and attestations - update_head(store, 
false); + update_head(store, false)?; let block_total = block_start.elapsed(); info!( @@ -541,11 +555,11 @@ fn on_block_core( /// Calculate target checkpoint for validator attestations. /// /// NOTE: this assumes that we have all the blocks from the head back to the latest finalized. -pub fn get_attestation_target(store: &Store) -> Checkpoint { +pub fn get_attestation_target(store: &Store) -> Result { get_attestation_target_with_checkpoints( store, - store.latest_justified(), - store.latest_finalized(), + store.latest_justified()?, + store.latest_finalized()?, ) } @@ -563,16 +577,16 @@ pub fn get_attestation_target_with_checkpoints( store: &Store, justified: Checkpoint, finalized: Checkpoint, -) -> Checkpoint { +) -> Result { // Start from current head - let mut target_block_root = store.head(); + let mut target_block_root = store.head()?; let mut target_header = store - .get_block_header(&target_block_root) - .expect("head block exists"); + .get_block_header(&target_block_root)? + .ok_or_else(|| StoreError::Storage("head block header not found".into()))?; let safe_target_block_slot = store - .get_block_header(&store.safe_target()) - .expect("safe target exists") + .get_block_header(&store.safe_target()?)? + .ok_or_else(|| StoreError::Storage("safe target block header not found".into()))? .slot; // Walk back toward safe target (up to `JUSTIFICATION_LOOKBACK_SLOTS` steps) @@ -583,8 +597,8 @@ pub fn get_attestation_target_with_checkpoints( if target_header.slot > safe_target_block_slot { target_block_root = target_header.parent_root; target_header = store - .get_block_header(&target_block_root) - .expect("parent block exists"); + .get_block_header(&target_block_root)? 
+ .ok_or_else(|| StoreError::Storage("parent block header not found".into()))?; } else { break; } @@ -601,8 +615,8 @@ pub fn get_attestation_target_with_checkpoints( { target_block_root = target_header.parent_root; target_header = store - .get_block_header(&target_block_root) - .expect("parent block exists"); + .get_block_header(&target_block_root)? + .ok_or_else(|| StoreError::Storage("parent block header not found".into()))?; } // Guard: clamp target to justified (not in the spec). // @@ -622,13 +636,13 @@ pub fn get_attestation_target_with_checkpoints( justified_slot = justified.slot, "Attestation target walked behind justified source, clamping to justified" ); - return justified; + return Ok(justified); } - Checkpoint { + Ok(Checkpoint { root: target_block_root, slot: target_header.slot, - } + }) } /// Produce attestation data for the given slot. @@ -640,34 +654,36 @@ pub fn get_attestation_target_with_checkpoints( /// fixed-point attestation loop in `build_block`. /// /// See: -pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { - let head_root = store.head(); +pub fn produce_attestation_data(store: &Store, slot: u64) -> Result { + let head_root = store.head()?; + + let head_slot = store + .get_block_header(&head_root)? + .ok_or_else(|| StoreError::Storage("head block header not found".into()))? + .slot; let head_checkpoint = Checkpoint { root: head_root, - slot: store - .get_block_header(&head_root) - .expect("head block exists") - .slot, + slot: head_slot, }; - let target_checkpoint = get_attestation_target(store); + let target_checkpoint = get_attestation_target(store)?; - AttestationData { + Ok(AttestationData { slot, head: head_checkpoint, target: target_checkpoint, - source: store.latest_justified(), - } + source: store.latest_justified()?, + }) } /// Get the head for block proposal at the given slot. /// /// Ensures store is up-to-date and processes any pending attestations /// before returning the canonical head. 
-fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { +fn get_proposal_head(store: &mut Store, slot: u64) -> Result { // Calculate time corresponding to this slot - let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; + let slot_time_ms = store.config()?.genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) on_tick(store, slot_time_ms, true); @@ -675,7 +691,7 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Process any pending attestations before proposal accept_new_attestations(store, false); - store.head() + store.head().map_err(StoreError::from) } /// Produce a block and per-aggregated-attestation signature payloads for the target slot. @@ -688,9 +704,9 @@ pub fn produce_block_with_signatures( validator_index: u64, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { // Get parent block and state to build upon - let head_root = get_proposal_head(store, slot); + let head_root = get_proposal_head(store, slot)?; let head_state = store - .get_state(&head_root) + .get_state(&head_root)? .ok_or(StoreError::MissingParentState { parent_root: head_root, slot, @@ -709,7 +725,7 @@ pub fn produce_block_with_signatures( // Get known aggregated payloads: data_root -> (AttestationData, Vec) let aggregated_payloads = store.known_aggregated_payloads(); - let known_block_roots = store.get_block_roots(); + let known_block_roots = store.get_block_roots()?; let (block, signatures, post_checkpoints) = { let _timing = metrics::time_block_building_payload_aggregation(); @@ -728,7 +744,7 @@ pub fn produce_block_with_signatures( // see justification advance, degrading liveness: the fixed-point loop in // `build_block` is expected to incorporate pool attestations that close // any divergence inherited from a minority fork. 
- let store_justified_slot = store.latest_justified().slot; + let store_justified_slot = store.latest_justified()?.slot; if post_checkpoints.justified.slot < store_justified_slot { return Err(StoreError::JustifiedDivergenceNotClosed { block_justified_slot: post_checkpoints.justified.slot, @@ -849,6 +865,15 @@ pub enum StoreError { block_justified_slot: u64, store_justified_slot: u64, }, + + #[error("Storage error: {0}")] + Storage(#[source] ethlambda_storage::Error), +} + +impl From for StoreError { + fn from(e: ethlambda_storage::Error) -> Self { + Self::Storage(e) + } } /// Compute the bitwise union (OR) of two AggregationBits bitfields. @@ -1275,8 +1300,8 @@ fn reorg_depth(old_head: H256, new_head: H256, store: &Store) -> Option { return None; } - let old_head_header = store.get_block_header(&old_head)?; - let new_head_header = store.get_block_header(&new_head)?; + let old_head_header = store.get_block_header(&old_head).ok().flatten()?; + let new_head_header = store.get_block_header(&new_head).ok().flatten()?; let old_slot = old_head_header.slot; let new_slot = new_head_header.slot; @@ -1292,7 +1317,7 @@ fn reorg_depth(old_head: H256, new_head: H256, store: &Store) -> Option { // Bounded to avoid unbounded walks in pathological cases. 
const MAX_REORG_DEPTH: u64 = 128; let mut depth: u64 = 0; - while let Some(current_header) = store.get_block_header(¤t_root) { + while let Some(current_header) = store.get_block_header(¤t_root).ok().flatten() { if current_header.slot <= target_slot { // We've reached the target slot - check if we're at the target block return (current_root != target_root).then_some(depth); @@ -1609,9 +1634,9 @@ mod tests { }, }; let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store::get_forkchoice_store(backend, genesis_state, genesis_block); + let mut store = Store::get_forkchoice_store(backend, genesis_state, genesis_block).unwrap(); - let head_root = store.head(); + let head_root = store.head().unwrap(); let att_data = AttestationData { slot: 0, head: Checkpoint { diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 13d8cf7e..5dbb6b31 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -48,7 +48,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let anchor_block: Block = test.anchor_block.into(); let genesis_time = anchor_state.config.genesis_time; let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store::get_forkchoice_store(backend, anchor_state, anchor_block); + let mut store = Store::get_forkchoice_store(backend, anchor_state, anchor_block).unwrap(); // Block registry: maps block labels to their roots let mut block_registry: HashMap = HashMap::new(); @@ -193,7 +193,7 @@ fn validate_checks( // Validate time check: fixtures encode the expected store time in intervals // since genesis (matching `Store::time()`). 
if let Some(expected_time) = checks.time { - let actual_time = st.time(); + let actual_time = st.time().unwrap(); if actual_time != expected_time { return Err(format!( "Step {}: time mismatch: expected {}, got {}", @@ -226,7 +226,7 @@ fn validate_checks( } // Validate attestationTargetSlot if let Some(expected_slot) = checks.attestation_target_slot { - let target = store::get_attestation_target(st); + let target = store::get_attestation_target(st).unwrap(); if target.slot != expected_slot { return Err(format!( "Step {}: attestationTargetSlot mismatch: expected {}, got {}", @@ -236,7 +236,7 @@ fn validate_checks( } // Also validate the root matches a block at this slot - let blocks = st.get_live_chain(); + let blocks = st.get_live_chain().unwrap(); let block_found = blocks .iter() .any(|(root, (slot, _))| *slot == expected_slot && *root == target.root); @@ -257,10 +257,11 @@ fn validate_checks( // Validate headSlot if let Some(expected_slot) = checks.head_slot { - let head_root = st.head(); + let head_root = st.head().unwrap(); let head_header = st .get_block_header(&head_root) - .ok_or_else(|| format!("Step {}: head block not found", step_idx))?; + .map_err(|e| format!("Step {step_idx}: DB error reading head block: {e}"))? 
+ .ok_or_else(|| format!("Step {step_idx}: head block not found"))?; if head_header.slot != expected_slot { return Err(format!( "Step {}: headSlot mismatch: expected {}, got {}", @@ -272,7 +273,7 @@ fn validate_checks( // Validate headRoot (resolved from headRootLabel if headRoot not provided) if let Some(ref expected_root) = resolved_head_root { - let head_root = st.head(); + let head_root = st.head().unwrap(); if head_root != *expected_root { return Err(format!( "Step {}: headRoot mismatch: expected {:?}, got {:?}", @@ -284,7 +285,7 @@ fn validate_checks( // Validate latestJustifiedSlot if let Some(expected_slot) = checks.latest_justified_slot { - let justified = st.latest_justified(); + let justified = st.latest_justified().unwrap(); if justified.slot != expected_slot { return Err(format!( "Step {}: latestJustifiedSlot mismatch: expected {}, got {}", @@ -296,7 +297,7 @@ fn validate_checks( // Validate latestJustifiedRoot (resolved from label if root not provided) if let Some(ref expected_root) = resolved_justified_root { - let justified = st.latest_justified(); + let justified = st.latest_justified().unwrap(); if justified.root != *expected_root { return Err(format!( "Step {}: latestJustifiedRoot mismatch: expected {:?}, got {:?}", @@ -308,7 +309,7 @@ fn validate_checks( // Validate latestFinalizedSlot if let Some(expected_slot) = checks.latest_finalized_slot { - let finalized = st.latest_finalized(); + let finalized = st.latest_finalized().unwrap(); if finalized.slot != expected_slot { return Err(format!( "Step {}: latestFinalizedSlot mismatch: expected {}, got {}", @@ -320,7 +321,7 @@ fn validate_checks( // Validate latestFinalizedRoot (resolved from label if root not provided) if let Some(ref expected_root) = resolved_finalized_root { - let finalized = st.latest_finalized(); + let finalized = st.latest_finalized().unwrap(); if finalized.root != *expected_root { return Err(format!( "Step {}: latestFinalizedRoot mismatch: expected {:?}, got {:?}", @@ -434,7 
+435,7 @@ fn validate_lexicographic_head_among( .into()); } - let blocks = st.get_live_chain(); + let blocks = st.get_live_chain().unwrap(); let known_attestations: HashMap = st.extract_latest_known_attestations(); // Resolve all fork labels to roots and compute their weights @@ -510,7 +511,7 @@ fn validate_lexicographic_head_among( .expect("fork_data is not empty"); // Verify the current head matches the lexicographically highest root - let actual_head_root = st.head(); + let actual_head_root = st.head().unwrap(); if actual_head_root != expected_head_root { let highest_label = fork_data .iter() diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index e7c1a888..8ffad8bd 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -44,7 +44,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Initialize the store with the anchor state and block let genesis_time = anchor_state.config.genesis_time; let backend = Arc::new(InMemoryBackend::new()); - let mut st = Store::get_forkchoice_store(backend, anchor_state, anchor_block); + let mut st = Store::get_forkchoice_store(backend, anchor_state, anchor_block).unwrap(); // Step 2: Run the state transition function with the block fixture let signed_block: SignedBlock = test.signed_block.into(); diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 1f4b9aa9..fdcb4099 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -110,10 +110,10 @@ async fn handle_blocks_by_root_request( let mut blocks = Vec::new(); for root in request.roots.iter() { - if let Some(signed_block) = server.store.get_signed_block(root) { + if let Ok(Some(signed_block)) = server.store.get_signed_block(root) { blocks.push(signed_block); } - // Missing blocks are silently skipped (per spec) + // Missing blocks and DB errors are silently skipped 
(per spec) } let found = blocks.len(); @@ -172,12 +172,14 @@ async fn handle_blocks_by_root_response( /// Build a Status message from the current Store state. pub fn build_status(store: &Store) -> Status { - let finalized = store.latest_finalized(); - let head_root = store.head(); + let finalized = store.latest_finalized().unwrap_or_default(); + let head_root = store.head().unwrap_or_default(); let head_slot = store .get_block_header(&head_root) - .expect("head block exists") - .slot; + .ok() + .flatten() + .map(|h| h.slot) + .unwrap_or(0); Status { finalized, head: Checkpoint { diff --git a/crates/net/rpc/src/fork_choice.rs b/crates/net/rpc/src/fork_choice.rs index 75fb2702..7cebd750 100644 --- a/crates/net/rpc/src/fork_choice.rs +++ b/crates/net/rpc/src/fork_choice.rs @@ -30,26 +30,38 @@ pub struct ForkChoiceNode { pub async fn get_fork_choice( axum::extract::State(store): axum::extract::State, ) -> impl IntoResponse { - let blocks = store.get_live_chain(); + use axum::http::StatusCode; + + macro_rules! 
try_store { + ($e:expr) => { + match $e { + Ok(v) => v, + Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + } + }; + } + + let blocks = try_store!(store.get_live_chain()); let attestations = store.extract_latest_known_attestations(); - let justified = store.latest_justified(); - let finalized = store.latest_finalized(); + let justified = try_store!(store.latest_justified()); + let finalized = try_store!(store.latest_finalized()); let start_slot = finalized.slot; let weights = ethlambda_fork_choice::compute_block_weights(start_slot, &blocks, &attestations); - let head = store.head(); - let safe_target = store.safe_target(); + let head = try_store!(store.head()); + let safe_target = try_store!(store.safe_target()); - let head_state = store.head_state(); - let validator_count = head_state.validators.len() as u64; + let validator_count = try_store!(store.head_state()).validators.len() as u64; let nodes: Vec = blocks .iter() .map(|(root, &(slot, parent_root))| { let proposer_index = store .get_block_header(root) + .ok() + .flatten() .map(|h| h.proposer_index) .unwrap_or(0); @@ -106,7 +118,7 @@ mod tests { async fn test_get_fork_choice_returns_json() { let state = create_test_state(); let backend = Arc::new(InMemoryBackend::new()); - let store = Store::from_anchor_state(backend, state); + let store = Store::from_anchor_state(backend, state).unwrap(); let app = build_test_router(store); @@ -143,7 +155,7 @@ mod tests { async fn test_get_fork_choice_ui_returns_html() { let state = create_test_state(); let backend = Arc::new(InMemoryBackend::new()); - let store = Store::from_anchor_state(backend, state); + let store = Store::from_anchor_state(backend, state).unwrap(); let app = build_test_router(store); diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index acec7fa1..b2247773 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,7 +1,8 @@ use std::net::SocketAddr; use axum::{ - Extension, Json, Router, 
http::HeaderValue, http::header, response::IntoResponse, routing::get, + Extension, Json, Router, http::HeaderValue, http::StatusCode, http::header, + response::IntoResponse, routing::get, }; use ethlambda_storage::Store; use ethlambda_types::aggregator::AggregatorController; @@ -79,25 +80,31 @@ fn build_debug_router() -> Router { async fn get_latest_finalized_state( axum::extract::State(store): axum::extract::State, ) -> impl IntoResponse { - let finalized = store.latest_finalized(); - let mut state = store - .get_state(&finalized.root) - .expect("finalized state exists"); - - // Zero state_root to match the canonical post-state representation. - // The spec's state_transition sets state_root to zero during process_block_header, - // and only fills it in lazily at the next slot's process_slots. - // Serving the canonical form ensures checkpoint sync interoperability. - state.latest_block_header.state_root = H256::ZERO; - - ssz_response(state.to_ssz()) + let finalized = match store.latest_finalized() { + Ok(cp) => cp, + Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + }; + match store.get_state(&finalized.root) { + Ok(Some(mut state)) => { + // Zero state_root to match the canonical post-state representation. + // The spec's state_transition sets state_root to zero during process_block_header, + // and only fills it in lazily at the next slot's process_slots. + // Serving the canonical form ensures checkpoint sync interoperability. 
+ state.latest_block_header.state_root = H256::ZERO; + ssz_response(state.to_ssz()) + } + Ok(None) => StatusCode::NOT_FOUND.into_response(), + Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), + } } async fn get_latest_justified_state( axum::extract::State(store): axum::extract::State, ) -> impl IntoResponse { - let checkpoint = store.latest_justified(); - json_response(checkpoint) + match store.latest_justified() { + Ok(checkpoint) => json_response(checkpoint), + Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), + } } fn json_response(value: T) -> axum::response::Response { @@ -176,7 +183,7 @@ mod tests { async fn test_get_latest_justified_checkpoint() { let state = create_test_state(); let backend = Arc::new(InMemoryBackend::new()); - let store = Store::from_anchor_state(backend, state); + let store = Store::from_anchor_state(backend, state).unwrap(); let app = build_api_router(store.clone()); @@ -196,7 +203,7 @@ mod tests { let checkpoint: serde_json::Value = serde_json::from_slice(&body).unwrap(); // The justified checkpoint should match the store's latest justified - let expected = store.latest_justified(); + let expected = store.latest_justified().unwrap(); assert_eq!( checkpoint, json!({ @@ -212,11 +219,11 @@ mod tests { use libssz::SszEncode; let state = create_test_state(); let backend = Arc::new(InMemoryBackend::new()); - let store = Store::from_anchor_state(backend, state); + let store = Store::from_anchor_state(backend, state).unwrap(); // Build expected SSZ with zeroed state_root (canonical post-state form) - let finalized = store.latest_finalized(); - let mut expected_state = store.get_state(&finalized.root).unwrap(); + let finalized = store.latest_finalized().unwrap(); + let mut expected_state = store.get_state(&finalized.root).unwrap().unwrap(); expected_state.latest_block_header.state_root = H256::ZERO; let expected_ssz = expected_state.to_ssz(); diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 
9c30f9c8..15bec00b 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -2,5 +2,5 @@ mod api; pub mod backend; mod store; -pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; +pub use api::{ALL_TABLES, Error, StorageBackend, StorageReadView, StorageWriteBatch, Table}; pub use store::{ForkCheckpoints, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 4a3d4fd5..4ccd152f 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, LazyLock, Mutex}; /// allowing us to skip storing empty bodies and reconstruct them on read. static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().hash_tree_root()); -use crate::api::{StorageBackend, StorageWriteBatch, Table}; +use crate::api::{StorageBackend, StorageWriteBatch, Table, Error}; use ethlambda_types::{ attestation::{AttestationData, HashedAttestationData}, @@ -427,7 +427,7 @@ impl Store { /// /// Uses the state's `latest_block_header` as the anchor block header. /// No block body is stored since it's not available. 
- pub fn from_anchor_state(backend: Arc, anchor_state: State) -> Self { + pub fn from_anchor_state(backend: Arc, anchor_state: State) -> Result { Self::init_store(backend, anchor_state, None) } @@ -444,7 +444,7 @@ impl Store { backend: Arc, anchor_state: State, anchor_block: Block, - ) -> Self { + ) -> Result { // Compare headers with state_root zeroed (init_store handles state_root separately) let mut state_header = anchor_state.latest_block_header.clone(); let mut block_header = anchor_block.header(); @@ -466,7 +466,7 @@ impl Store { backend: Arc, mut anchor_state: State, anchor_body: Option, - ) -> Self { + ) -> Result { // Save original state_root for validation let original_state_root = anchor_state.latest_block_header.state_root; @@ -494,7 +494,7 @@ impl Store { // Insert initial data { - let mut batch = backend.begin_write().expect("write batch"); + let mut batch = backend.begin_write()?; // Metadata let metadata_entries = vec![ @@ -505,74 +505,60 @@ impl Store { (KEY_LATEST_JUSTIFIED.to_vec(), anchor_checkpoint.to_ssz()), (KEY_LATEST_FINALIZED.to_vec(), anchor_checkpoint.to_ssz()), ]; - batch - .put_batch(Table::Metadata, metadata_entries) - .expect("put metadata"); + batch.put_batch(Table::Metadata, metadata_entries)?; // Block header let header_entries = vec![( anchor_block_root.to_ssz(), anchor_state.latest_block_header.to_ssz(), )]; - batch - .put_batch(Table::BlockHeaders, header_entries) - .expect("put block header"); + batch.put_batch(Table::BlockHeaders, header_entries)?; // Block body (if provided) if let Some(body) = anchor_body { let body_entries = vec![(anchor_block_root.to_ssz(), body.to_ssz())]; - batch - .put_batch(Table::BlockBodies, body_entries) - .expect("put block body"); + batch.put_batch(Table::BlockBodies, body_entries)?; } // State let state_entries = vec![(anchor_block_root.to_ssz(), anchor_state.to_ssz())]; - batch - .put_batch(Table::States, state_entries) - .expect("put state"); + batch.put_batch(Table::States, state_entries)?; 
// Live chain index let index_entries = vec![( encode_live_chain_key(anchor_state.latest_block_header.slot, &anchor_block_root), anchor_state.latest_block_header.parent_root.to_ssz(), )]; - batch - .put_batch(Table::LiveChain, index_entries) - .expect("put live chain index"); + batch.put_batch(Table::LiveChain, index_entries)?; - batch.commit().expect("commit"); + batch.commit()?; } info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - Self { + Ok(Self { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), - } + }) } // ============ Metadata Helpers ============ - fn get_metadata(&self, key: &[u8]) -> T { - let view = self.backend.begin_read().expect("read view"); - let bytes = view - .get(Table::Metadata, key) - .expect("get") - .expect("metadata key exists"); - T::from_ssz_bytes(&bytes).expect("valid encoding") + fn get_metadata(&self, key: &[u8]) -> Result { + let view = self.backend.begin_read()?; + let bytes = view.get(Table::Metadata, key)?.ok_or("Metadata key not found")?; + T::from_ssz_bytes(&bytes).map_err(|e| e.into()) } - fn set_metadata(&self, key: &[u8], value: &T) { - let mut batch = self.backend.begin_write().expect("write batch"); + fn set_metadata(&self, key: &[u8], value: &T) -> Result<(), Error>{ + let mut batch = self.backend.begin_write()?; batch - .put_batch(Table::Metadata, vec![(key.to_vec(), value.to_ssz())]) - .expect("put metadata"); - batch.commit().expect("commit"); + .put_batch(Table::Metadata, vec![(key.to_vec(), value.to_ssz())])?; + batch.commit() } // ============ Time ============ @@ -582,50 +568,50 @@ impl Store { /// Each increment represents one 800ms interval. 
Derive slot/interval as: /// slot = time() / INTERVALS_PER_SLOT /// interval = time() % INTERVALS_PER_SLOT - pub fn time(&self) -> u64 { + pub fn time(&self) -> Result { self.get_metadata(KEY_TIME) } /// Sets the current store time. - pub fn set_time(&mut self, time: u64) { - self.set_metadata(KEY_TIME, &time); + pub fn set_time(&mut self, time: u64) -> Result<(), Error> { + self.set_metadata(KEY_TIME, &time) } // ============ Config ============ /// Returns the chain configuration. - pub fn config(&self) -> ChainConfig { + pub fn config(&self) -> Result { self.get_metadata(KEY_CONFIG) } // ============ Head ============ /// Returns the current head block root. - pub fn head(&self) -> H256 { + pub fn head(&self) -> Result { self.get_metadata(KEY_HEAD) } // ============ Safe Target ============ /// Returns the safe target block root for attestations. - pub fn safe_target(&self) -> H256 { + pub fn safe_target(&self) -> Result { self.get_metadata(KEY_SAFE_TARGET) } /// Sets the safe target block root. - pub fn set_safe_target(&mut self, safe_target: H256) { - self.set_metadata(KEY_SAFE_TARGET, &safe_target); + pub fn set_safe_target(&mut self, safe_target: H256) -> Result<(), Error> { + self.set_metadata(KEY_SAFE_TARGET, &safe_target) } // ============ Checkpoints ============ /// Returns the latest justified checkpoint. - pub fn latest_justified(&self) -> Checkpoint { + pub fn latest_justified(&self) -> Result { self.get_metadata(KEY_LATEST_JUSTIFIED) } /// Returns the latest finalized checkpoint. - pub fn latest_finalized(&self) -> Checkpoint { + pub fn latest_finalized(&self) -> Result { self.get_metadata(KEY_LATEST_FINALIZED) } @@ -638,9 +624,9 @@ impl Store { /// - Finalized is updated if provided. /// /// When finalization advances, prunes the LiveChain index. 
- pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) { + pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) -> Result<(), Error> { // Read old finalized slot before updating metadata - let old_finalized_slot = self.latest_finalized().slot; + let old_finalized_slot = self.latest_finalized()?.slot; let mut entries = vec![(KEY_HEAD.to_vec(), checkpoints.head.to_ssz())]; @@ -652,18 +638,18 @@ impl Store { entries.push((KEY_LATEST_FINALIZED.to_vec(), finalized.to_ssz())); } - let mut batch = self.backend.begin_write().expect("write batch"); - batch.put_batch(Table::Metadata, entries).expect("put"); - batch.commit().expect("commit"); + let mut batch = self.backend.begin_write()?; + batch.put_batch(Table::Metadata, entries)?; + batch.commit()?; // Lightweight pruning that should happen immediately on finalization advance: // live chain index, signatures, and attestation data. These are cheap and // affect fork choice correctness (live chain) or attestation processing. // Heavy state/block pruning is deferred to prune_old_data(). - if let Some(finalized) = checkpoints.finalized + Ok(if let Some(finalized) = checkpoints.finalized && finalized.slot > old_finalized_slot { - let pruned_chain = self.prune_live_chain(finalized.slot); + let pruned_chain = self.prune_live_chain(finalized.slot)?; let pruned_sigs = self.prune_gossip_signatures(finalized.slot); if pruned_chain > 0 || pruned_sigs > 0 { @@ -672,7 +658,7 @@ impl Store { pruned_chain, pruned_sigs, "Pruned finalized data" ); } - } + }) } /// Prune old states and blocks to keep storage bounded. @@ -681,13 +667,13 @@ impl Store { /// pruning until after a batch of blocks has been fully processed. Running /// this mid-cascade would delete states that pending children still need, /// causing infinite re-processing loops when fallback pruning is active. 
- pub fn prune_old_data(&mut self) { - let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; - let pruned_states = self.prune_old_states(&protected_roots); - let pruned_blocks = self.prune_old_blocks(&protected_roots); - if pruned_states > 0 || pruned_blocks > 0 { + pub fn prune_old_data(&mut self) -> Result<(), Error> { + let protected_roots = [self.latest_finalized()?.root, self.latest_justified()?.root]; + let pruned_states = self.prune_old_states(&protected_roots)?; + let pruned_blocks = self.prune_old_blocks(&protected_roots)?; + Ok(if pruned_states > 0 || pruned_blocks > 0 { info!(pruned_states, pruned_blocks, "Pruned old states and blocks"); - } + }) } // ============ Blocks ============ @@ -696,15 +682,14 @@ impl Store { /// /// Iterates only the LiveChain table, avoiding Block deserialization. /// Returns only non-finalized blocks, automatically pruned on finalization. - pub fn get_live_chain(&self) -> HashMap { - let view = self.backend.begin_read().expect("read view"); - view.prefix_iterator(Table::LiveChain, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { + pub fn get_live_chain(&self) -> Result, Error> { + let view = self.backend.begin_read()?; + view.prefix_iterator(Table::LiveChain, &[])? + .map(|res| -> Result<(H256, (u64, H256)), Error> { + let (k, v) = res?; let (slot, root) = decode_live_chain_key(&k); - let parent_root = H256::from_ssz_bytes(&v).expect("valid parent_root"); - (root, (slot, parent_root)) + let parent_root = H256::from_ssz_bytes(&v).map_err(|e| -> Error { Box::new(e) })?; + Ok((root, (slot, parent_root))) }) .collect() } @@ -712,14 +697,13 @@ impl Store { /// Get all known block roots as HashSet. /// /// Useful for checking block existence without deserializing. 
- pub fn get_block_roots(&self) -> HashSet { - let view = self.backend.begin_read().expect("read view"); - view.prefix_iterator(Table::LiveChain, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, _)| { + pub fn get_block_roots(&self) -> Result, Error> { + let view = self.backend.begin_read()?; + view.prefix_iterator(Table::LiveChain, &[])? + .map(|res| -> Result { + let (k, _) = res?; let (_, root) = decode_live_chain_key(&k); - root + Ok(root) }) .collect() } @@ -730,14 +714,13 @@ impl Store { /// LiveChain index is pruned. /// /// Returns the number of entries pruned. - pub fn prune_live_chain(&mut self, finalized_slot: u64) -> usize { - let view = self.backend.begin_read().expect("read view"); + pub fn prune_live_chain(&mut self, finalized_slot: u64) -> Result { + let view = self.backend.begin_read()?; // Collect keys to delete - stop once we hit finalized_slot // Keys are sorted by slot (big-endian encoding) so we can stop early let keys_to_delete: Vec<_> = view - .prefix_iterator(Table::LiveChain, &[]) - .expect("iterator") + .prefix_iterator(Table::LiveChain, &[])? .filter_map(|res| res.ok()) .take_while(|(k, _)| { let (slot, _) = decode_live_chain_key(k); @@ -749,15 +732,13 @@ impl Store { let count = keys_to_delete.len(); if count == 0 { - return 0; + return Ok(0); } - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::LiveChain, keys_to_delete) - .expect("delete non-finalized chain entries"); - batch.commit().expect("commit"); - count + let mut batch = self.backend.begin_write()?; + batch.delete_batch(Table::LiveChain, keys_to_delete)?; + batch.commit()?; + Ok(count) } /// Prune gossip signatures for slots <= finalized_slot. @@ -774,23 +755,22 @@ impl Store { /// states whose roots appear in `protected_roots` (finalized, justified). /// /// Returns the number of states pruned. 
- pub fn prune_old_states(&mut self, protected_roots: &[H256]) -> usize { - let view = self.backend.begin_read().expect("read view"); + pub fn prune_old_states(&mut self, protected_roots: &[H256]) -> Result { + let view = self.backend.begin_read()?; // Collect (root_bytes, slot) from BlockHeaders to determine state age. + // Intentionally skip corrupt entries with filter_map + ok(). let mut entries: Vec<(Vec, u64)> = view - .prefix_iterator(Table::BlockHeaders, &[]) - .expect("iterator") + .prefix_iterator(Table::BlockHeaders, &[])? .filter_map(|res| res.ok()) - .map(|(key, value)| { - let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); - (key.to_vec(), header.slot) + .filter_map(|(key, value)| { + BlockHeader::from_ssz_bytes(&value).ok().map(|h| (key.to_vec(), h.slot)) }) .collect(); drop(view); if entries.len() <= STATES_TO_KEEP { - return 0; + return Ok(0); } // Sort by slot descending (newest first) @@ -808,13 +788,11 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::States, keys_to_delete) - .expect("delete old states"); - batch.commit().expect("commit"); + let mut batch = self.backend.begin_write()?; + batch.delete_batch(Table::States, keys_to_delete)?; + batch.commit()?; } - count + Ok(count) } /// Prune old blocks beyond the retention window. @@ -824,22 +802,21 @@ impl Store { /// Deletes from `BlockHeaders`, `BlockBodies`, and `BlockSignatures`. /// /// Returns the number of blocks pruned. - pub fn prune_old_blocks(&mut self, protected_roots: &[H256]) -> usize { - let view = self.backend.begin_read().expect("read view"); + pub fn prune_old_blocks(&mut self, protected_roots: &[H256]) -> Result { + let view = self.backend.begin_read()?; + // Intentionally skip corrupt entries with filter_map + ok(). 
let mut entries: Vec<(Vec, u64)> = view - .prefix_iterator(Table::BlockHeaders, &[]) - .expect("iterator") + .prefix_iterator(Table::BlockHeaders, &[])? .filter_map(|res| res.ok()) - .map(|(key, value)| { - let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); - (key.to_vec(), header.slot) + .filter_map(|(key, value)| { + BlockHeader::from_ssz_bytes(&value).ok().map(|h| (key.to_vec(), h.slot)) }) .collect(); drop(view); if entries.len() <= BLOCKS_TO_KEEP { - return 0; + return Ok(0); } // Sort by slot descending (newest first) @@ -856,27 +833,21 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) - .expect("delete old block headers"); - batch - .delete_batch(Table::BlockBodies, keys_to_delete.clone()) - .expect("delete old block bodies"); - batch - .delete_batch(Table::BlockSignatures, keys_to_delete) - .expect("delete old block signatures"); - batch.commit().expect("commit"); + let mut batch = self.backend.begin_write()?; + batch.delete_batch(Table::BlockHeaders, keys_to_delete.clone())?; + batch.delete_batch(Table::BlockBodies, keys_to_delete.clone())?; + batch.delete_batch(Table::BlockSignatures, keys_to_delete)?; + batch.commit()?; } - count + Ok(count) } /// Get the block header by root. - pub fn get_block_header(&self, root: &H256) -> Option { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::BlockHeaders, &root.to_ssz()) - .expect("get") - .map(|bytes| BlockHeader::from_ssz_bytes(&bytes).expect("valid header")) + pub fn get_block_header(&self, root: &H256) -> Result, Error> { + let view = self.backend.begin_read()?; + view.get(Table::BlockHeaders, &root.to_ssz())? 
+ .map(|bytes| BlockHeader::from_ssz_bytes(&bytes).map_err(|e| -> Error { Box::new(e) })) + .transpose() } // ============ Signed Blocks ============ @@ -890,10 +861,10 @@ impl Store { /// /// When the block is later processed via [`insert_signed_block`](Self::insert_signed_block), /// the same keys are overwritten (idempotent) and a `LiveChain` entry is added. - pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlock) { - let mut batch = self.backend.begin_write().expect("write batch"); - write_signed_block(batch.as_mut(), &root, signed_block); - batch.commit().expect("commit"); + pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlock) -> Result<(), Error> { + let mut batch = self.backend.begin_write()?; + write_signed_block(batch.as_mut(), &root, signed_block)?; + batch.commit() } /// Insert a signed block, storing the block and signatures separately. @@ -903,75 +874,81 @@ impl Store { /// only storing signatures for non-genesis blocks. /// /// Takes ownership to avoid cloning large signature data. - pub fn insert_signed_block(&mut self, root: H256, signed_block: SignedBlock) { - let mut batch = self.backend.begin_write().expect("write batch"); - let block = write_signed_block(batch.as_mut(), &root, signed_block); + pub fn insert_signed_block(&mut self, root: H256, signed_block: SignedBlock) -> Result<(), Error> { + let mut batch = self.backend.begin_write()?; + let block = write_signed_block(batch.as_mut(), &root, signed_block)?; let index_entries = vec![( encode_live_chain_key(block.slot, &root), block.parent_root.to_ssz(), )]; - batch - .put_batch(Table::LiveChain, index_entries) - .expect("put non-finalized chain index"); + batch.put_batch(Table::LiveChain, index_entries)?; - batch.commit().expect("commit"); + batch.commit() } /// Get a signed block by combining header, body, and signatures. /// /// Returns None if any of the components are not found. /// Note: Genesis block has no entry in BlockSignatures table. 
- pub fn get_signed_block(&self, root: &H256) -> Option { - let view = self.backend.begin_read().expect("read view"); + pub fn get_signed_block(&self, root: &H256) -> Result, Error> { + let view = self.backend.begin_read()?; let key = root.to_ssz(); - let header_bytes = view.get(Table::BlockHeaders, &key).expect("get")?; - let sig_bytes = view.get(Table::BlockSignatures, &key).expect("get")?; + let Some(header_bytes) = view.get(Table::BlockHeaders, &key)? else { + return Ok(None); + }; + let Some(sig_bytes) = view.get(Table::BlockSignatures, &key)? else { + return Ok(None); + }; - let header = BlockHeader::from_ssz_bytes(&header_bytes).expect("valid header"); + let header = BlockHeader::from_ssz_bytes(&header_bytes) + .map_err(|e| -> Error { Box::new(e) })?; // Use empty body if header indicates empty, otherwise fetch from DB let body = if header.body_root == *EMPTY_BODY_ROOT { BlockBody::default() } else { - let body_bytes = view.get(Table::BlockBodies, &key).expect("get")?; - BlockBody::from_ssz_bytes(&body_bytes).expect("valid body") + let Some(body_bytes) = view.get(Table::BlockBodies, &key)? else { + return Ok(None); + }; + BlockBody::from_ssz_bytes(&body_bytes) + .map_err(|e| -> Error { Box::new(e) })? }; let block = Block::from_header_and_body(header, body); - let signature = BlockSignatures::from_ssz_bytes(&sig_bytes).expect("valid signatures"); + let signature = BlockSignatures::from_ssz_bytes(&sig_bytes) + .map_err(|e| -> Error { Box::new(e) })?; - Some(SignedBlock { + Ok(Some(SignedBlock { message: block, signature, - }) + })) } // ============ States ============ /// Returns the state for the given block root. 
- pub fn get_state(&self, root: &H256) -> Option<State> { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::States, &root.to_ssz()) - .expect("get") - .map(|bytes| State::from_ssz_bytes(&bytes).expect("valid state")) + pub fn get_state(&self, root: &H256) -> Result<Option<State>, Error> { + let view = self.backend.begin_read()?; + view.get(Table::States, &root.to_ssz())? + .map(|bytes| State::from_ssz_bytes(&bytes).map_err(|e| -> Error { Box::new(e) })) + .transpose() } /// Returns whether a state exists for the given block root. - pub fn has_state(&self, root: &H256) -> bool { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::States, &root.to_ssz()) - .expect("get") - .is_some() + pub fn has_state(&self, root: &H256) -> Result<bool, Error> { + let view = self.backend.begin_read()?; + Ok(view.get(Table::States, &root.to_ssz())? + .is_some()) } /// Stores a state indexed by block root. - pub fn insert_state(&mut self, root: H256, state: State) { - let mut batch = self.backend.begin_write().expect("write batch"); + pub fn insert_state(&mut self, root: H256, state: State) -> Result<(), Error> { + let mut batch = self.backend.begin_write()?; let entries = vec![(root.to_ssz(), state.to_ssz())]; - batch.put_batch(Table::States, entries).expect("put state"); - batch.commit().expect("commit"); + batch.put_batch(Table::States, entries)?; + batch.commit() } // ============ Attestation Extraction ============ @@ -1156,23 +1133,23 @@ impl Store { // ============ Derived Accessors ============ /// Returns the slot of the current head block. - pub fn head_slot(&self) -> u64 { - self.get_block_header(&self.head()) - .expect("head block exists") - .slot + pub fn head_slot(&self) -> Result<u64, Error> { + Ok(self.get_block_header(&self.head()?)? + .ok_or_else(|| -> Error { "head block header not found".into() })? + .slot) } /// Returns the slot of the current safe target block.
- pub fn safe_target_slot(&self) -> u64 { - self.get_block_header(&self.safe_target()) - .expect("safe target exists") - .slot + pub fn safe_target_slot(&self) -> Result<u64, Error> { + Ok(self.get_block_header(&self.safe_target()?)? + .ok_or_else(|| -> Error { "safe target block header not found".into() })? + .slot) } /// Returns a clone of the head state. - pub fn head_state(&self) -> State { - self.get_state(&self.head()) - .expect("head state is always available") + pub fn head_state(&self) -> Result<State, Error> { + self.get_state(&self.head()?)? + .ok_or_else(|| "head state not found".into()) } } @@ -1184,7 +1161,7 @@ fn write_signed_block( batch: &mut dyn StorageWriteBatch, root: &H256, signed_block: SignedBlock, -) -> Block { +) -> Result<Block, Error> { let SignedBlock { message: block, signature, @@ -1194,24 +1171,18 @@ let root_bytes = root.to_ssz(); let header_entries = vec![(root_bytes.clone(), header.to_ssz())]; - batch - .put_batch(Table::BlockHeaders, header_entries) - .expect("put block header"); + batch.put_batch(Table::BlockHeaders, header_entries)?; // Skip storing empty bodies - they can be reconstructed from the header's body_root if header.body_root != *EMPTY_BODY_ROOT { let body_entries = vec![(root_bytes.clone(), block.body.to_ssz())]; - batch - .put_batch(Table::BlockBodies, body_entries) - .expect("put block body"); + batch.put_batch(Table::BlockBodies, body_entries)?; } let sig_entries = vec![(root_bytes, signature.to_ssz())]; - batch - .put_batch(Table::BlockSignatures, sig_entries) - .expect("put block signatures"); + batch.put_batch(Table::BlockSignatures, sig_entries)?; - block + Ok(block) } #[cfg(test)] @@ -1318,7 +1289,7 @@ mod tests { BLOCKS_TO_KEEP ); - let pruned = store.prune_old_blocks(&[]); + let pruned = store.prune_old_blocks(&[]).unwrap(); assert_eq!(pruned, 0); assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), @@ -1337,7 +1308,7 @@ } assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), total); -
let pruned = store.prune_old_blocks(&[]); + let pruned = store.prune_old_blocks(&[]).unwrap(); assert_eq!(pruned, 10); assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), @@ -1375,7 +1346,7 @@ mod tests { // Protect the two oldest blocks (slots 0 and 1) let finalized_root = root(0); let justified_root = root(1); - let pruned = store.prune_old_blocks(&[finalized_root, justified_root]); + let pruned = store.prune_old_blocks(&[finalized_root, justified_root]).unwrap(); // 10 would be pruned, but 2 are protected assert_eq!(pruned, 8); @@ -1418,7 +1389,7 @@ mod tests { STATES_TO_KEEP ); - let pruned = store.prune_old_states(&[]); + let pruned = store.prune_old_states(&[]).unwrap(); assert_eq!(pruned, 0); } @@ -1434,7 +1405,7 @@ mod tests { } assert_eq!(count_entries(backend.as_ref(), Table::States), total); - let pruned = store.prune_old_states(&[]); + let pruned = store.prune_old_states(&[]).unwrap(); assert_eq!(pruned, 5); assert_eq!( count_entries(backend.as_ref(), Table::States), @@ -1464,7 +1435,7 @@ mod tests { let finalized_root = root(0); let justified_root = root(2); - let pruned = store.prune_old_states(&[finalized_root, justified_root]); + let pruned = store.prune_old_states(&[finalized_root, justified_root]).unwrap(); // 5 would be pruned, but 2 are protected assert_eq!(pruned, 3); @@ -1525,13 +1496,13 @@ mod tests { // Use the last inserted root as head. Calling update_checkpoints with // head_only triggers the fallback path (finalization doesn't advance). let head_root = root(total_states as u64 - 1); - store.update_checkpoints(ForkCheckpoints::head_only(head_root)); + store.update_checkpoints(ForkCheckpoints::head_only(head_root)).unwrap(); // update_checkpoints no longer prunes states/blocks inline — the caller // must invoke prune_old_data() separately (after a block cascade completes). 
assert_eq!(count_entries(backend.as_ref(), Table::States), total_states); - store.prune_old_data(); + store.prune_old_data().unwrap(); // 3005 headers total. Top 3000 by slot are kept in the retention window, // leaving 5 candidates. 2 are protected (finalized + justified), @@ -1576,8 +1547,8 @@ mod tests { // Use the last inserted root as head let head_root = root(STATES_TO_KEEP as u64 - 1); - store.update_checkpoints(ForkCheckpoints::head_only(head_root)); - store.prune_old_data(); + store.update_checkpoints(ForkCheckpoints::head_only(head_root)).unwrap(); + store.prune_old_data().unwrap(); // Nothing should be pruned (within retention window) assert_eq!(