Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ fn read_validator_keys(
validator_keys.insert(
idx,
ValidatorKeyPair {
attestation_key,
proposal_key,
attestation_key: Some(attestation_key),
proposal_key: Some(proposal_key),
},
);
}
Expand Down
101 changes: 60 additions & 41 deletions crates/blockchain/src/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,28 @@ use ethlambda_types::{
primitives::{H256, HashTreeRoot as _},
signature::{ValidatorSecretKey, ValidatorSignature},
};
use tracing::info;

use crate::metrics;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum KeyRole {
Attestation,
Proposal,
}

/// Error types for KeyManager operations.
#[derive(Debug, thiserror::Error)]
pub enum KeyManagerError {
#[error("Validator key not found for validator_id: {0}")]
ValidatorKeyNotFound(u64),
#[error("Key unavailable for validator {0}")]
KeyUnavailable(u64),
#[error("Key not prepared for slot {slot} (validator {validator_id}, {role:?})")]
KeyNotPreparedForSlot {
validator_id: u64,
role: KeyRole,
slot: u32,
},
#[error("Signing error: {0}")]
SigningError(String),
#[error("Signature conversion error: {0}")]
Expand All @@ -26,16 +39,16 @@ pub enum KeyManagerError {
/// allowing the validator to sign both an attestation and a block proposal
/// within the same slot.
pub struct ValidatorKeyPair {
pub attestation_key: ValidatorSecretKey,
pub proposal_key: ValidatorSecretKey,
pub attestation_key: Option<ValidatorSecretKey>,
pub proposal_key: Option<ValidatorSecretKey>,
}

/// Manages validator secret keys for signing attestations and block proposals.
///
/// Each validator has two independent XMSS keys: one for attestation signing
/// and one for block proposal signing.
pub struct KeyManager {
keys: HashMap<u64, ValidatorKeyPair>,
pub(crate) keys: HashMap<u64, ValidatorKeyPair>,
}

impl KeyManager {
Expand Down Expand Up @@ -79,29 +92,22 @@ impl KeyManager {
.keys
.get_mut(&validator_id)
.ok_or(KeyManagerError::ValidatorKeyNotFound(validator_id))?;
let key = key_pair
.attestation_key
.as_ref()
.ok_or(KeyManagerError::KeyUnavailable(validator_id))?;

// Advance XMSS key preparation window if the slot is outside the current window.
// Each bottom tree covers 65,536 slots; the window holds 2 at a time.
// Multiple advances may be needed if the node was offline for an extended period.
if !key_pair.attestation_key.is_prepared_for(slot) {
info!(validator_id, slot, "Advancing XMSS key preparation window");
while !key_pair.attestation_key.is_prepared_for(slot) {
let before = key_pair.attestation_key.get_prepared_interval();
key_pair.attestation_key.advance_preparation();
if key_pair.attestation_key.get_prepared_interval() == before {
return Err(KeyManagerError::SigningError(format!(
"XMSS key exhausted for validator {validator_id}: \
slot {slot} is beyond the key's activation interval"
)));
}
}
if !key.is_prepared_for(slot) {
return Err(KeyManagerError::KeyNotPreparedForSlot {
validator_id,
role: KeyRole::Attestation,
slot,
});
}

let signature: ValidatorSignature = {
let _timing = metrics::time_pq_sig_attestation_signing();
key_pair
.attestation_key
.sign(slot, message)
key.sign(slot, message)
.map_err(|e| KeyManagerError::SigningError(e.to_string()))
}?;
metrics::inc_pq_sig_attestation_signatures();
Expand All @@ -121,29 +127,20 @@ impl KeyManager {
.keys
.get_mut(&validator_id)
.ok_or(KeyManagerError::ValidatorKeyNotFound(validator_id))?;
let key = key_pair
.proposal_key
.as_ref()
.ok_or(KeyManagerError::KeyUnavailable(validator_id))?;

// Advance XMSS key preparation window if the slot is outside the current window.
// Each bottom tree covers 65,536 slots; the window holds 2 at a time.
// Multiple advances may be needed if the node was offline for an extended period.
if !key_pair.proposal_key.is_prepared_for(slot) {
info!(
if !key.is_prepared_for(slot) {
return Err(KeyManagerError::KeyNotPreparedForSlot {
validator_id,
slot, "Advancing XMSS proposal key preparation window"
);
while !key_pair.proposal_key.is_prepared_for(slot) {
let before = key_pair.proposal_key.get_prepared_interval();
key_pair.proposal_key.advance_preparation();
if key_pair.proposal_key.get_prepared_interval() == before {
return Err(KeyManagerError::SigningError(format!(
"XMSS proposal key exhausted for validator {validator_id}: \
slot {slot} is beyond the key's activation interval"
)));
}
}
role: KeyRole::Proposal,
slot,
});
}

let signature: ValidatorSignature = key_pair
.proposal_key
let signature: ValidatorSignature = key
.sign(slot, message)
.map_err(|e| KeyManagerError::SigningError(e.to_string()))?;

Expand Down Expand Up @@ -189,4 +186,26 @@ mod tests {
Err(KeyManagerError::ValidatorKeyNotFound(123))
));
}

#[test]
fn test_sign_returns_key_unavailable_when_field_is_none() {
let mut keys = HashMap::new();
keys.insert(
0,
ValidatorKeyPair {
attestation_key: None,
proposal_key: None,
},
);
let mut key_manager = KeyManager::new(keys);

assert!(matches!(
key_manager.sign_with_attestation_key(0, 0, &H256::default()),
Err(KeyManagerError::KeyUnavailable(0)),
));
assert!(matches!(
key_manager.sign_with_proposal_key(0, 0, &H256::default()),
Err(KeyManagerError::KeyUnavailable(0)),
));
}
}
137 changes: 118 additions & 19 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ use ethlambda_types::{
attestation::{SignedAggregatedAttestation, SignedAttestation},
block::{BlockSignatures, SignedBlock},
primitives::{H256, HashTreeRoot as _},
signature::ValidatorSecretKey,
};

use crate::aggregation::{
AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone,
AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker,
};
use crate::key_manager::ValidatorKeyPair;
use crate::key_manager::{KeyManagerError, KeyRole, ValidatorKeyPair};
use spawned_concurrency::actor;
use spawned_concurrency::error::ActorError;
use spawned_concurrency::message::Message;
use spawned_concurrency::protocol;
use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -171,14 +173,14 @@ impl BlockChainServer {

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
self.propose_block(slot, validator_id);
self.propose_block(slot, validator_id, ctx);
}

// Produce attestations at interval 1 (all validators including proposer).
// Reuse the same snapshot so self-delivery decisions match the rest
// of the tick.
if interval == 1 {
self.produce_attestations(slot, is_aggregator);
self.produce_attestations(slot, is_aggregator, ctx);
}

// Update safe target slot metric (updated by store.on_tick at interval 3)
Expand Down Expand Up @@ -254,7 +256,7 @@ impl BlockChainServer {
.find(|&vid| is_proposer(vid, slot, num_validators))
}

fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) {
fn produce_attestations(&mut self, slot: u64, is_aggregator: bool, ctx: &Context<Self>) {
let _timing = metrics::time_attestations_production();

// Produce attestation data once for all validators
Expand All @@ -263,14 +265,24 @@ impl BlockChainServer {
// For each registered validator, produce and publish attestation
for validator_id in self.key_manager.validator_ids() {
// Sign the attestation
let Ok(signature) = self
let signature = match self
.key_manager
.sign_attestation(validator_id, &attestation_data)
.inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to sign attestation"),
)
else {
continue;
{
Ok(sig) => sig,
Err(KeyManagerError::KeyNotPreparedForSlot {
role,
slot: target_slot,
..
}) => {
self.prepare_key_for_slot(validator_id, role, target_slot, ctx);
continue;
}
Err(KeyManagerError::KeyUnavailable(_)) => continue,
Err(err) => {
error!(%slot, %validator_id, %err, "Failed to sign attestation");
continue;
}
};

// Create signed attestation
Expand Down Expand Up @@ -302,7 +314,7 @@ impl BlockChainServer {
}

/// Build and publish a block for the given slot and validator.
fn propose_block(&mut self, slot: u64, validator_id: u64) {
fn propose_block(&mut self, slot: u64, validator_id: u64, ctx: &Context<Self>) {
info!(%slot, %validator_id, "We are the proposer for this slot");

let _timing = metrics::time_block_building();
Expand All @@ -318,14 +330,31 @@ impl BlockChainServer {

// Sign the block root with the proposal key
let block_root = block.hash_tree_root();
let Ok(proposer_signature) = self
.key_manager
.sign_block_root(validator_id, slot as u32, &block_root)
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root"))
else {
metrics::inc_block_building_failures();
return;
};
let proposer_signature =
match self
.key_manager
.sign_block_root(validator_id, slot as u32, &block_root)
{
Ok(sig) => sig,
Err(KeyManagerError::KeyNotPreparedForSlot {
role,
slot: target_slot,
..
}) => {
self.prepare_key_for_slot(validator_id, role, target_slot, ctx);
metrics::inc_block_building_failures();
return;
}
Err(KeyManagerError::KeyUnavailable(_)) => {
metrics::inc_block_building_failures();
return;
}
Err(err) => {
error!(%slot, %validator_id, %err, "Failed to sign block root");
metrics::inc_block_building_failures();
return;
}
};

// Assemble SignedBlock
let signed_block = SignedBlock {
Expand Down Expand Up @@ -357,6 +386,39 @@ impl BlockChainServer {
info!(%slot, %validator_id, "Published block");
}

/// Move the validator's key off the actor onto a `spawn_blocking` worker
/// that runs `advance_preparation` until the prepared window covers `target_slot`.
/// The worker sends back `KeyPreparedForSlot` for the actor
/// to restore the (possibly advanced) key.
fn prepare_key_for_slot(
&mut self,
validator_id: u64,
role: KeyRole,
target_slot: u32,
ctx: &Context<Self>,
) {
let Some(key_pair) = self.key_manager.keys.get_mut(&validator_id) else {
return;
};
let field = match role {
KeyRole::Attestation => &mut key_pair.attestation_key,
KeyRole::Proposal => &mut key_pair.proposal_key,
};
let Some(key) = field.take() else { return };

info!(%validator_id, ?role, %target_slot, "Preparing XMSS key for slot in background");
let actor_ref = ctx.actor_ref().clone();

tokio::task::spawn_blocking(move || {
let result = key.advance_until_prepared(target_slot);
let _ = actor_ref.send(KeyPreparedForSlot {
validator_id,
role,
key: result,
});
});
}

fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> {
store::on_block(&mut self.store, signed_block)?;
let head_slot = self.store.head_slot();
Expand Down Expand Up @@ -719,3 +781,40 @@ impl Handler<AggregationDeadline> for BlockChainServer {
}
}
}

/// Worker β†’ actor result for a background XMSS key advance.
/// `key: None` means the activation interval was exhausted.
pub(crate) struct KeyPreparedForSlot {
pub validator_id: u64,
pub role: KeyRole,
pub key: Option<ValidatorSecretKey>,
}
impl Message for KeyPreparedForSlot {
type Result = ();
}

impl Handler<KeyPreparedForSlot> for BlockChainServer {
async fn handle(&mut self, msg: KeyPreparedForSlot, _ctx: &Context<Self>) {
let KeyPreparedForSlot {
validator_id,
role,
key,
} = msg;
let Some(key_pair) = self.key_manager.keys.get_mut(&validator_id) else {
return;
};
match key {
Some(advanced) => {
info!(%validator_id, ?role, "XMSS key advance complete");
match role {
KeyRole::Attestation => key_pair.attestation_key = Some(advanced),
KeyRole::Proposal => key_pair.proposal_key = Some(advanced),
}
}
None => error!(
%validator_id, ?role,
"XMSS key activation interval exhausted; validator can no longer sign with this key"
),
}
}
}
Loading