Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/flashblocks/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,3 @@ thiserror.workspace = true
parking_lot.workspace = true
chrono.workspace = true
reth-tasks = { workspace = true }

75 changes: 49 additions & 26 deletions crates/flashblocks/p2p/src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@ use futures::{Stream, StreamExt};
use metrics::gauge;
use reth::payload::PayloadId;
use reth_ethereum::network::{api::PeerId, eth_wire::multiplex::ProtocolConnection};
use reth_network::types::ReputationChangeKind;
use reth_network::{cache::LruMap, types::ReputationChangeKind};
use std::{
pin::Pin,
task::{Context, Poll, ready},
};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{info, trace};

/// Grace period for authorization timestamp checks to reduce false positives from
/// minor skew/races between peers.
const AUTHORIZATION_TIMESTAMP_GRACE_SEC: u64 = 10;

/// Number of payload receive-sets cached per peer.
///
/// This should be large enough to retain entries across the grace window.
const RECEIVED_CACHE_LEN: u32 = AUTHORIZATION_TIMESTAMP_GRACE_SEC as u32 * 20;

/// Represents a single P2P connection for the flashblocks protocol.
///
/// This struct manages the bidirectional communication with a single peer in the flashblocks
Expand All @@ -40,11 +49,9 @@ pub struct FlashblocksConnection<N> {
/// Receiver for peer messages to be sent to all peers.
/// We send bytes over this stream to avoid repeatedly having to serialize the payloads.
peer_rx: BroadcastStream<PeerMsg>,
/// Most recent payload ID received from this peer to track payload transitions.
payload_id: PayloadId,
/// A list of flashblock indices that we have already received from
/// this peer for the current payload, used to detect duplicate messages.
received: Vec<bool>,
/// Per-peer tracking of flashblocks this peer has already sent us.
/// Uses `peek` for lookups to avoid LRU promotion, giving FIFO eviction semantics.
received_cache: LruMap<(PayloadId, usize), ()>,
}

impl<N: FlashblocksP2PNetworkHandle> FlashblocksConnection<N> {
Expand All @@ -68,9 +75,29 @@ impl<N: FlashblocksP2PNetworkHandle> FlashblocksConnection<N> {
conn,
peer_id,
peer_rx,
payload_id: PayloadId::default(),
received: Vec::new(),
received_cache: LruMap::new(RECEIVED_CACHE_LEN),
}
}
}

impl<N> FlashblocksConnection<N> {
/// Insert a `(payload_id, flashblock_index)` into the received cache.
///
/// Uses [`LruMap::peek`] before insert to avoid promoting duplicates,
/// giving FIFO eviction semantics instead of LRU.
///
/// Returns `true` if the key was newly inserted, `false` if it already existed.
fn received_cache_insert(&mut self, key: (PayloadId, usize)) -> bool {
if self.received_cache.peek(&key).is_some() {
return false;
}
self.received_cache.insert(key, ())
}

/// Check if a `(payload_id, flashblock_index)` exists in the received cache
/// without promoting it (preserves FIFO eviction order).
fn received_cache_contains(&self, key: &(PayloadId, usize)) -> bool {
self.received_cache.peek(key).is_some()
}
}

Expand Down Expand Up @@ -104,9 +131,7 @@ impl<N: FlashblocksP2PNetworkHandle> Stream for FlashblocksConnection<N> {
bytes,
)) => {
// Check if this flashblock actually originated from this peer.
if this.payload_id != payload_id
|| this.received.get(flashblock_index) != Some(&true)
{
if !this.received_cache_contains(&(payload_id, flashblock_index)) {
trace!(
target: "flashblocks::p2p",
peer_id = %this.peer_id,
Expand Down Expand Up @@ -226,24 +251,32 @@ impl<N: FlashblocksP2PNetworkHandle> FlashblocksConnection<N> {
///
/// # Behavior
/// - Validates timestamp to prevent replay attacks
/// - Tracks payload transitions and resets duplicate detection
/// - Tracks duplicate detection across recently seen payloads
/// - Prevents duplicate flashblock spam from the same peer
/// - Updates active publisher information from base payload data
/// - Forwards valid payloads to the protocol handler for processing
fn handle_flashblocks_payload_v1(
&mut self,
authorized_payload: AuthorizedPayload<FlashblocksPayloadV1>,
) {
let mut state = self.protocol.handle.state.lock();
let state_handle = self.protocol.handle.state.clone();
let mut state = state_handle.lock();
let authorization = &authorized_payload.authorized.authorization;
let msg = authorized_payload.msg();

// check if this is an old payload
if authorization.timestamp < state.payload_timestamp {
// Check if this payload is older than our current view by more than the allowed
// grace window.
if authorization.timestamp
< state
.payload_timestamp
.saturating_sub(AUTHORIZATION_TIMESTAMP_GRACE_SEC)
{
tracing::warn!(
target: "flashblocks::p2p",
peer_id = %self.peer_id,
current_timestamp = state.payload_timestamp,
timestamp = authorization.timestamp,
grace_sec = AUTHORIZATION_TIMESTAMP_GRACE_SEC,
"received flashblock with outdated timestamp",
);
self.protocol
Expand All @@ -252,12 +285,6 @@ impl<N: FlashblocksP2PNetworkHandle> FlashblocksConnection<N> {
return;
}

// Check if this is a new payload from this peer
if self.payload_id != msg.payload_id {
self.payload_id = msg.payload_id;
self.received.fill(false);
}

// Check if the payload index is within the allowed range
if msg.index as usize > MAX_FLASHBLOCK_INDEX {
tracing::error!(
Expand All @@ -272,10 +299,7 @@ impl<N: FlashblocksP2PNetworkHandle> FlashblocksConnection<N> {
}

// Check if this peer is spamming us with the same payload index
let len = self.received.len();
self.received
.resize_with(len.max(msg.index as usize + 1), || false);
if self.received[msg.index as usize] {
if !self.received_cache_insert((msg.payload_id, msg.index as usize)) {
// We've already seen this index from this peer.
// They could be trying to DOS us.
tracing::warn!(
Expand All @@ -290,7 +314,6 @@ impl<N: FlashblocksP2PNetworkHandle> FlashblocksConnection<N> {
.reputation_change(self.peer_id, ReputationChangeKind::AlreadySeenTransaction);
return;
}
self.received[msg.index as usize] = true;

state.publishing_status.send_modify(|status| {
let active_publishers = match status {
Expand Down
2 changes: 1 addition & 1 deletion crates/flashblocks/primitives/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{error::FlashblocksError, primitives::FlashblocksPayloadV1};
pub struct Authorization {
/// The unique identifier of the payload this authorization applies to
pub payload_id: PayloadId,
/// Unix timestamp when this authorization was created
/// Unix timestamp in seconds corresponding to the PayloadAttributes timestamp
pub timestamp: u64,
/// The public key of the builder who is authorized to sign messages
pub builder_vk: VerifyingKey,
Expand Down
Loading