Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
use clap::Parser;
use ethlambda_blockchain::key_manager::ValidatorKeyPair;
use ethlambda_network_api::{InitBlockChain, InitP2P, ToBlockChainToP2PRef, ToP2PToBlockChainRef};
use ethlambda_p2p::{Bootnode, P2P, SwarmConfig, build_swarm, parse_enrs};
use ethlambda_p2p::{Bootnode, P2P, PeerId, SwarmConfig, build_swarm, parse_enrs};
use ethlambda_types::primitives::H256;
use ethlambda_types::{
aggregator::AggregatorController,
Expand Down Expand Up @@ -141,7 +141,7 @@ async fn main() -> eyre::Result<()> {
"Loaded genesis configuration"
);

populate_name_registry(&validator_config);
let node_names = load_node_names(&validator_config);
let bootnodes = read_bootnodes(&bootnodes_path);

let validator_keys =
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn main() -> eyre::Result<()> {
})
.expect("failed to build swarm");

let p2p = P2P::spawn(built, store.clone());
let p2p = P2P::spawn(built, store.clone(), node_names);

// Wire actors together via protocol refs
blockchain
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn main() -> eyre::Result<()> {
Ok(())
}

fn populate_name_registry(validator_config: impl AsRef<Path>) {
fn load_node_names(validator_config: impl AsRef<Path>) -> HashMap<PeerId, String> {
#[derive(Deserialize)]
struct Validator {
name: String,
Expand All @@ -244,8 +244,9 @@ fn populate_name_registry(validator_config: impl AsRef<Path>) {
.map(|v| (v.name, v.privkey))
.collect();

// Populates a dictionary used for labeling metrics with node names
ethlambda_p2p::metrics::populate_name_registry(names_and_privkeys);
let names = ethlambda_p2p::derive_peer_ids(names_and_privkeys);
info!(count = names.len(), "Loaded node-name registry");
names
}

fn read_bootnodes(bootnodes_path: impl AsRef<Path>) -> Vec<Bootnode> {
Expand Down
66 changes: 58 additions & 8 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use ethrex_p2p::types::NodeRecord;
use ethrex_rlp::decode::RLPDecode;
use futures::StreamExt;
use libp2p::{
Multiaddr, PeerId, StreamProtocol,
Multiaddr, StreamProtocol,
gossipsub::{MessageAuthenticity, ValidationMode},
identity::{PublicKey, secp256k1},
identity::{Keypair, PublicKey, secp256k1},
multiaddr::Protocol,
request_response::{self, OutboundRequestId},
swarm::{NetworkBehaviour, SwarmEvent},
Expand Down Expand Up @@ -51,7 +51,7 @@ pub mod metrics;
mod req_resp;
pub(crate) mod swarm_adapter;

pub use metrics::populate_name_registry;
pub use libp2p::PeerId;

// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1280ms, 2560ms
const MAX_FETCH_RETRIES: u32 = 10;
Expand Down Expand Up @@ -282,7 +282,7 @@ pub struct P2P {

impl P2P {
/// Build swarm, start I/O adapter, spawn actor, and wire the swarm event stream.
pub fn spawn(built: BuiltSwarm, store: Store) -> P2P {
pub fn spawn(built: BuiltSwarm, store: Store, node_names: HashMap<PeerId, String>) -> P2P {
let (swarm_stream, swarm_handle) = swarm_adapter::start_swarm_adapter(built.swarm);

let server = P2PServer {
Expand All @@ -297,6 +297,7 @@ impl P2P {
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
bootnode_addrs: built.bootnode_addrs,
node_names,
};
let handle = server.start();
spawn_listener(handle.context(), swarm_stream.map(WrappedSwarmEvent));
Expand Down Expand Up @@ -332,6 +333,16 @@ pub struct P2PServer {
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}

impl P2PServer {
fn resolve_node_name(&self, peer_id: Option<&PeerId>) -> &str {
peer_id
.and_then(|p| self.node_names.get(p))
.map(String::as_str)
.unwrap_or("unknown")
}
}

// Protocol trait for internal messages only (retry scheduling).
Expand Down Expand Up @@ -459,7 +470,11 @@ async fn handle_swarm_event(
if num_established.get() == 1 {
server.connected_peers.insert(peer_id);
let peer_count = server.connected_peers.len();
metrics::notify_peer_connected(&Some(peer_id), direction, "success");
metrics::notify_peer_connected(
server.resolve_node_name(Some(&peer_id)),
direction,
"success",
);
// Send status request on first connection to this peer
let our_status = build_status(&server.store);
let our_finalized_slot = our_status.finalized.slot;
Expand Down Expand Up @@ -512,7 +527,11 @@ async fn handle_swarm_event(
if num_established == 0 {
server.connected_peers.remove(&peer_id);
let peer_count = server.connected_peers.len();
metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);
metrics::notify_peer_disconnected(
server.resolve_node_name(Some(&peer_id)),
direction,
reason,
);

info!(
%peer_id,
Expand Down Expand Up @@ -541,7 +560,11 @@ async fn handle_swarm_event(
} else {
"error"
};
metrics::notify_peer_connected(&peer_id, "outbound", result);
metrics::notify_peer_connected(
server.resolve_node_name(peer_id.as_ref()),
"outbound",
result,
);
warn!(?peer_id, %error, "Outgoing connection error");

// Schedule redial if this was a bootnode
Expand All @@ -558,7 +581,11 @@ async fn handle_swarm_event(
}
}
SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
metrics::notify_peer_connected(&peer_id, "inbound", "error");
metrics::notify_peer_connected(
server.resolve_node_name(peer_id.as_ref()),
"inbound",
"error",
);
warn!(%error, "Incoming connection error");
}
_ => {
Expand All @@ -567,6 +594,29 @@ async fn handle_swarm_event(
}
}

// --- Node identity helpers ---

/// Derive each entry's `PeerId` from its secp256k1 private key.
///
/// Drops entries whose key fails to parse, with a `warn!` per drop.
pub fn derive_peer_ids(names_and_privkeys: HashMap<String, H256>) -> HashMap<PeerId, String> {
names_and_privkeys
.into_iter()
.filter_map(|(name, mut privkey)| {
match secp256k1::SecretKey::try_from_bytes(&mut privkey.0) {
Ok(privkey) => {
let pubkey = Keypair::from(secp256k1::Keypair::from(privkey)).public();
Some((PeerId::from_public_key(&pubkey), name))
}
Err(err) => {
warn!(%name, %err, "Skipping node-name registry entry: invalid secp256k1 privkey");
None
}
}
})
.collect()
}

// --- Bootnode parsing ---

pub struct Bootnode {
Expand Down
49 changes: 5 additions & 44 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,8 @@
//! Prometheus metrics for the P2P network layer.

use std::{
collections::HashMap,
sync::{LazyLock, RwLock},
};
use std::sync::LazyLock;

use ethlambda_metrics::*;
use ethlambda_types::primitives::H256;
use libp2p::{
PeerId,
identity::{Keypair, secp256k1},
};

static NODE_NAME_REGISTRY: LazyLock<RwLock<HashMap<PeerId, &'static str>>> =
LazyLock::new(|| RwLock::new(HashMap::new()));

pub fn populate_name_registry(names_and_privkeys: HashMap<String, H256>) {
let mut registry = NODE_NAME_REGISTRY.write().unwrap();
*registry = names_and_privkeys
.into_iter()
.filter_map(|(name, mut privkey)| {
let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey.0) else {
return None;
};
let pubkey = Keypair::from(secp256k1::Keypair::from(privkey)).public();
let peer_id = PeerId::from_public_key(&pubkey);
// NOTE: we leak the name string to get a 'static lifetime.
// In reality, the name registry is not expected to be read, so it should be safe
// to turn these strings to &'static str.
Some((peer_id, &*name.leak()))
})
.collect();
}

fn resolve(peer_id: &Option<PeerId>) -> &'static str {
let registry = NODE_NAME_REGISTRY.read().unwrap();
peer_id
.as_ref()
.and_then(|peer_id| registry.get(peer_id))
.unwrap_or(&"unknown")
}

static LEAN_CONNECTED_PEERS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
Expand Down Expand Up @@ -146,25 +109,23 @@ pub fn set_attestation_committee_subnet(subnet_id: u64) {
///
/// If `result` is "success", the connected peer count is incremented.
/// The connection event counter is always incremented.
pub fn notify_peer_connected(peer_id: &Option<PeerId>, direction: &str, result: &str) {
pub fn notify_peer_connected(node_name: &str, direction: &str, result: &str) {
LEAN_PEER_CONNECTION_EVENTS_TOTAL
.with_label_values(&[direction, result])
.inc();

if result == "success" {
let name = resolve(peer_id);
LEAN_CONNECTED_PEERS.with_label_values(&[name]).inc();
LEAN_CONNECTED_PEERS.with_label_values(&[node_name]).inc();
}
}

/// Notify that a peer disconnected.
///
/// Decrements the connected peer count and increments the disconnection event counter.
pub fn notify_peer_disconnected(peer_id: &Option<PeerId>, direction: &str, reason: &str) {
pub fn notify_peer_disconnected(node_name: &str, direction: &str, reason: &str) {
LEAN_PEER_DISCONNECTION_EVENTS_TOTAL
.with_label_values(&[direction, reason])
.inc();

let name = resolve(peer_id);
LEAN_CONNECTED_PEERS.with_label_values(&[name]).dec();
LEAN_CONNECTED_PEERS.with_label_values(&[node_name]).dec();
}