diff --git a/crates/flashblocks/p2p/Cargo.toml b/crates/flashblocks/p2p/Cargo.toml index cf810b92..2bc32cc6 100644 --- a/crates/flashblocks/p2p/Cargo.toml +++ b/crates/flashblocks/p2p/Cargo.toml @@ -28,4 +28,3 @@ thiserror.workspace = true parking_lot.workspace = true chrono.workspace = true reth-tasks = { workspace = true } - diff --git a/crates/flashblocks/p2p/src/protocol/connection.rs b/crates/flashblocks/p2p/src/protocol/connection.rs index 5d2d033f..cafeb3ca 100644 --- a/crates/flashblocks/p2p/src/protocol/connection.rs +++ b/crates/flashblocks/p2p/src/protocol/connection.rs @@ -14,7 +14,7 @@ use futures::{Stream, StreamExt}; use metrics::gauge; use reth::payload::PayloadId; use reth_ethereum::network::{api::PeerId, eth_wire::multiplex::ProtocolConnection}; -use reth_network::types::ReputationChangeKind; +use reth_network::{cache::LruMap, types::ReputationChangeKind}; use std::{ pin::Pin, task::{Context, Poll, ready}, @@ -22,6 +22,15 @@ use std::{ use tokio_stream::wrappers::BroadcastStream; use tracing::{info, trace}; +/// Grace period for authorization timestamp checks to reduce false positives from +/// minor skew/races between peers. +const AUTHORIZATION_TIMESTAMP_GRACE_SEC: u64 = 10; + +/// Number of payload receive-sets cached per peer. +/// +/// This should be large enough to retain entries across the grace window. +const RECEIVED_CACHE_LEN: u32 = AUTHORIZATION_TIMESTAMP_GRACE_SEC as u32 * 20; + /// Represents a single P2P connection for the flashblocks protocol. /// /// This struct manages the bidirectional communication with a single peer in the flashblocks @@ -40,11 +49,9 @@ pub struct FlashblocksConnection { /// Receiver for peer messages to be sent to all peers. /// We send bytes over this stream to avoid repeatedly having to serialize the payloads. peer_rx: BroadcastStream, - /// Most recent payload ID received from this peer to track payload transitions. - payload_id: PayloadId, - /// A list of flashblock indices that we have already received from - /// this peer for the current payload, used to detect duplicate messages. - received: Vec, + /// Per-peer tracking of flashblocks this peer has already sent us. + /// Uses `peek` for lookups to avoid LRU promotion, giving FIFO eviction semantics. + received_cache: LruMap<(PayloadId, usize), ()>, } impl FlashblocksConnection { @@ -68,9 +75,29 @@ impl FlashblocksConnection { conn, peer_id, peer_rx, - payload_id: PayloadId::default(), - received: Vec::new(), + received_cache: LruMap::new(RECEIVED_CACHE_LEN), + } + } +} + +impl FlashblocksConnection { + /// Insert a `(payload_id, flashblock_index)` into the received cache. + /// + /// Uses [`LruMap::peek`] before insert to avoid promoting duplicates, + /// giving FIFO eviction semantics instead of LRU. + /// + /// Returns `true` if the key was newly inserted, `false` if it already existed. + fn received_cache_insert(&mut self, key: (PayloadId, usize)) -> bool { + if self.received_cache.peek(&key).is_some() { + return false; } + self.received_cache.insert(key, ()) + } + + /// Check if a `(payload_id, flashblock_index)` exists in the received cache + /// without promoting it (preserves FIFO eviction order). + fn received_cache_contains(&self, key: &(PayloadId, usize)) -> bool { + self.received_cache.peek(key).is_some() } } @@ -104,9 +131,7 @@ impl Stream for FlashblocksConnection { bytes, )) => { // Check if this flashblock actually originated from this peer. - if this.payload_id != payload_id - || this.received.get(flashblock_index) != Some(&true) - { + if !this.received_cache_contains(&(payload_id, flashblock_index)) { trace!( target: "flashblocks::p2p", peer_id = %this.peer_id, @@ -226,7 +251,7 @@ impl FlashblocksConnection { /// /// # Behavior /// - Validates timestamp to prevent replay attacks - /// - Tracks payload transitions and resets duplicate detection + /// - Tracks duplicate detection across recently seen payloads /// - Prevents duplicate flashblock spam from the same peer /// - Updates active publisher information from base payload data /// - Forwards valid payloads to the protocol handler for processing @@ -234,16 +259,24 @@ impl FlashblocksConnection { &mut self, authorized_payload: AuthorizedPayload, ) { - let mut state = self.protocol.handle.state.lock(); + let state_handle = self.protocol.handle.state.clone(); + let mut state = state_handle.lock(); let authorization = &authorized_payload.authorized.authorization; let msg = authorized_payload.msg(); - // check if this is an old payload - if authorization.timestamp < state.payload_timestamp { + // Check if this payload is older than our current view by more than the allowed + // grace window. + if authorization.timestamp + < state + .payload_timestamp + .saturating_sub(AUTHORIZATION_TIMESTAMP_GRACE_SEC) + { tracing::warn!( target: "flashblocks::p2p", peer_id = %self.peer_id, + current_timestamp = state.payload_timestamp, timestamp = authorization.timestamp, + grace_sec = AUTHORIZATION_TIMESTAMP_GRACE_SEC, "received flashblock with outdated timestamp", ); self.protocol @@ -252,12 +285,6 @@ impl FlashblocksConnection { return; } - // Check if this is a new payload from this peer - if self.payload_id != msg.payload_id { - self.payload_id = msg.payload_id; - self.received.fill(false); - } - // Check if the payload index is within the allowed range if msg.index as usize > MAX_FLASHBLOCK_INDEX { tracing::error!( @@ -272,10 +299,7 @@ impl FlashblocksConnection { } // Check if this peer is spamming us with the same payload index - let len = self.received.len(); - self.received - .resize_with(len.max(msg.index as usize + 1), || false); - if self.received[msg.index as usize] { + if !self.received_cache_insert((msg.payload_id, msg.index as usize)) { // We've already seen this index from this peer. // They could be trying to DOS us. tracing::warn!( @@ -290,7 +314,6 @@ impl FlashblocksConnection { .reputation_change(self.peer_id, ReputationChangeKind::AlreadySeenTransaction); return; } - self.received[msg.index as usize] = true; state.publishing_status.send_modify(|status| { let active_publishers = match status { diff --git a/crates/flashblocks/primitives/src/p2p.rs b/crates/flashblocks/primitives/src/p2p.rs index 37ceba23..3ef29e15 100644 --- a/crates/flashblocks/primitives/src/p2p.rs +++ b/crates/flashblocks/primitives/src/p2p.rs @@ -19,7 +19,7 @@ use crate::{error::FlashblocksError, primitives::FlashblocksPayloadV1}; pub struct Authorization { /// The unique identifier of the payload this authorization applies to pub payload_id: PayloadId, - /// Unix timestamp when this authorization was created + /// Unix timestamp in seconds corresponding to the PayloadAttributes timestamp pub timestamp: u64, /// The public key of the builder who is authorized to sign messages pub builder_vk: VerifyingKey,