Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 210 additions & 7 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ const MAX_RETRIES: usize = 10;
/// Maximum number of peer attempts at each hop level
const DEFAULT_MAX_BREADTH: usize = 3;

/// Minimum HTL for speculative retries.
///
/// Retries use a reduced HTL (capped at current_hop) to avoid full-depth
/// traversal storms. This floor ensures retries still reach peers 2-3 hops
/// away, which is the minimum useful search depth in any topology.
/// Background on the retry-storm problem this bounds: #3570.
const MIN_RETRY_HTL: usize = 3;

pub(crate) fn start_op(
instance_id: ContractInstanceId,
fetch_contract: bool,
Expand Down Expand Up @@ -710,6 +717,11 @@ impl GetOp {
visited: visited.clone(),
}));

// Note: handle_abort uses current_hop directly (not the attempts-based
// reduction from retry_with_next_alternative). Connection aborts are
// immediate failures, not timeout-based retries, so they don't contribute
// to retry storms the same way. The bloom filter and tried_peers already
// prevent cycling through the same peers. (#3570)
let msg = GetMsg::Request {
id: self.id,
instance_id,
Expand Down Expand Up @@ -978,11 +990,15 @@ impl GetOp {
};
match state {
GetState::AwaitingResponse(mut data) => {
// If local alternatives exhausted, inject fallback peers we haven't tried
// If local alternatives exhausted, inject fallback peers we haven't tried.
// Filter through BOTH tried_peers (this hop) AND visited bloom filter
// (all hops) to avoid retry storms cycling through the same peers (#3570).
if data.alternatives.is_empty() && !fallback_peers.is_empty() {
for peer in fallback_peers {
if let Some(addr) = peer.socket_addr() {
if !data.tried_peers.contains(&addr) {
if !data.tried_peers.contains(&addr)
&& !data.visited.probably_visited(addr)
{
data.alternatives.push(peer.clone());
}
}
Expand All @@ -1007,6 +1023,9 @@ impl GetOp {
let fetch_contract = data.fetch_contract;
if let Some(addr) = next_target.socket_addr() {
data.tried_peers.insert(addr);
// Mark in bloom filter so downstream peers won't forward back,
// and future retries won't select this peer again (#3570).
data.visited.mark_visited(addr);
}
tracing::info!(
tx = %self.id,
Expand All @@ -1019,13 +1038,22 @@ impl GetOp {
data.attempts_at_hop += 1;
let visited = data.visited.clone();

// Reduce HTL on each retry to avoid full-depth traversal storms (#3570).
// At the originator, current_hop == max_hops_to_live, so we can't use
// current_hop alone. Instead, divide the max HTL by the attempt count
// (e.g. 10 → 5 → 3 → ...), floored at MIN_RETRY_HTL. This limits the
// blast radius of retries while still reaching nearby contract holders.
let retry_htl = (max_hops_to_live / (data.attempts_at_hop.max(1)))
.max(MIN_RETRY_HTL)
.min(max_hops_to_live);

self.state = Some(GetState::AwaitingResponse(data));

let msg = GetMsg::Request {
id: self.id,
instance_id,
fetch_contract,
htl: max_hops_to_live,
htl: retry_htl,
visited,
};

Expand Down Expand Up @@ -4065,6 +4093,164 @@ mod tests {
assert!(result.is_err(), "All fallback peers already tried");
}

/// Verify that retry HTL decreases with each attempt (#3570).
///
/// At the originator, current_hop == max_hops_to_live, so the old code
/// (htl: max_hops_to_live) always sent full-depth retries. The fix divides
/// HTL by attempts_at_hop, creating progressively shorter retry chains.
#[test]
fn retry_htl_decreases_with_attempts() {
    // Pull the HTL out of a Request; any other variant is a test failure.
    fn request_htl(msg: &GetMsg) -> usize {
        match msg {
            GetMsg::Request { htl, .. } => *htl,
            other @ GetMsg::Response { .. }
            | other @ GetMsg::ResponseStreaming { .. }
            | other @ GetMsg::ResponseStreamingAck { .. }
            | other @ GetMsg::ForwardingAck { .. } => {
                panic!("Expected Request, got {other}")
            }
        }
    }

    let alternatives = vec![make_peer(6001), make_peer(6002), make_peer(6003)];
    let op = make_awaiting_op(alternatives, &[]);

    // make_awaiting_op sets attempts_at_hop=1; each retry increments it
    // before computing the reduced HTL.

    // First retry: attempts_at_hop becomes 2 → 10/2 = 5
    let (op, msg) = op
        .retry_with_next_alternative(10, &[])
        .unwrap_or_else(|_| panic!("retry 1 failed"));
    assert_eq!(
        request_htl(&msg),
        5,
        "First retry: 10/2=5 (attempts_at_hop incremented to 2)"
    );

    // Second retry: attempts_at_hop becomes 3 → 10/3 = 3
    let (op, msg) = op
        .retry_with_next_alternative(10, &[])
        .unwrap_or_else(|_| panic!("retry 2 failed"));
    assert_eq!(
        request_htl(&msg),
        MIN_RETRY_HTL,
        "Second retry: 10/3=3 (clamped to MIN_RETRY_HTL)"
    );

    // Third retry: attempts_at_hop becomes 4 → 10/4 = 2 → clamped to MIN_RETRY_HTL=3
    let (_op, msg) = op
        .retry_with_next_alternative(10, &[])
        .unwrap_or_else(|_| panic!("retry 3 failed"));
    assert_eq!(
        request_htl(&msg),
        MIN_RETRY_HTL,
        "Third retry: 10/4=2 → clamped to MIN_RETRY_HTL"
    );
}

/// Verify that MIN_RETRY_HTL floor is applied even when max_htl is low.
#[test]
fn retry_htl_floor_at_min() {
    let mut op = make_awaiting_op(vec![make_peer(7001)], &[]);

    // Force an absurd attempt count so the raw division (10/21 = 0)
    // falls well below the floor.
    if let Some(GetState::AwaitingResponse(ref mut data)) = op.state {
        data.attempts_at_hop = 20;
    }

    let (_op, msg) = op
        .retry_with_next_alternative(10, &[])
        .unwrap_or_else(|_| panic!("retry failed"));

    match &msg {
        GetMsg::Request { htl, .. } => assert!(
            *htl >= MIN_RETRY_HTL,
            "HTL {} should not fall below MIN_RETRY_HTL {}",
            htl,
            MIN_RETRY_HTL
        ),
        other @ GetMsg::Response { .. }
        | other @ GetMsg::ResponseStreaming { .. }
        | other @ GetMsg::ResponseStreamingAck { .. }
        | other @ GetMsg::ForwardingAck { .. } => {
            panic!("Expected Request, got {other}")
        }
    }
}

/// Verify that bloom-filter-visited peers are excluded from fallback injection (#3570).
#[test]
fn retry_dbf_fallback_skips_bloom_visited() {
    let visited_peer = make_peer(8001);
    let fresh_peer = make_peer(8002);

    // No local alternatives; mark visited_peer ONLY in the bloom filter
    // (removed from tried_peers) so the bloom check alone must reject it.
    let mut op = make_awaiting_op(vec![], &[]);
    if let Some(GetState::AwaitingResponse(ref mut data)) = op.state {
        if let Some(addr) = visited_peer.socket_addr() {
            data.visited.mark_visited(addr);
            data.tried_peers.remove(&addr);
        }
    }

    let result = op.retry_with_next_alternative(7, &[visited_peer.clone(), fresh_peer.clone()]);
    assert!(
        result.is_ok(),
        "Should find fresh_peer despite visited_peer in bloom"
    );

    // fresh_peer was injected and then immediately consumed as the retry
    // target, so no alternatives remain queued.
    let (new_op, _msg) = result.unwrap_or_else(|_| panic!("retry failed"));
    match &new_op.state {
        Some(GetState::AwaitingResponse(data)) => assert_eq!(
            data.alternatives.len(),
            0,
            "Only fresh_peer should be injected (visited_peer filtered by bloom)"
        ),
        _ => panic!("Expected AwaitingResponse state"),
    }
}

/// Verify that retry targets are marked in the bloom filter (#3570).
#[test]
fn retry_marks_target_in_bloom_filter() {
    let alt = make_peer(9001);
    let alt_addr = alt.socket_addr().unwrap();

    // Retrying should consume `alt` as the target and record it as visited.
    let (new_op, _msg) = make_awaiting_op(vec![alt], &[])
        .retry_with_next_alternative(7, &[])
        .unwrap_or_else(|_| panic!("retry failed"));

    match &new_op.state {
        Some(GetState::AwaitingResponse(data)) => assert!(
            data.visited.probably_visited(alt_addr),
            "Retry target should be marked in bloom filter"
        ),
        _ => panic!("Expected AwaitingResponse state"),
    }
}

#[test]
fn retry_wrong_state_returns_err() {
// PrepareRequest state
Expand Down Expand Up @@ -4328,10 +4514,22 @@ mod tests {
panic!("GC retry should produce a message when alternatives exist")
});

// Verify the retry message targets the first alternative
// Verify the retry message uses reduced HTL (not full max_htl).
// make_awaiting_op sets attempts_at_hop=1, retry increments to 2 → 7/2=3.
match &msg {
GetMsg::Request { htl, .. } => {
assert_eq!(*htl, max_htl, "Retry should use max_htl");
assert!(
*htl <= max_htl,
"Retry HTL ({}) should not exceed max_htl ({})",
htl,
max_htl
);
assert!(
*htl >= MIN_RETRY_HTL,
"Retry HTL ({}) should be at least MIN_RETRY_HTL ({})",
htl,
MIN_RETRY_HTL
);
}
GetMsg::Response { .. }
| GetMsg::ResponseStreaming { .. }
Expand Down Expand Up @@ -4467,7 +4665,12 @@ mod tests {
assert_eq!(deser_id, id);
assert_eq!(deser_iid, instance_id);
}
other => panic!("Expected ForwardingAck, got {other}"),
other @ GetMsg::Request { .. }
| other @ GetMsg::Response { .. }
| other @ GetMsg::ResponseStreaming { .. }
| other @ GetMsg::ResponseStreamingAck { .. } => {
panic!("Expected ForwardingAck, got {other}")
}
}
}

Expand Down Expand Up @@ -4574,7 +4777,7 @@ mod tests {
let base_ms = 1_700_000_000_000;
GlobalSimulationTime::set_time_ms(base_ms);

let mut op = make_awaiting_op(vec![make_peer(5001), make_peer(5002)], &[]);
let op = make_awaiting_op(vec![make_peer(5001), make_peer(5002)], &[]);
assert_eq!(op.speculative_paths, 0);
assert!(!op.ack_received);

Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/operations/subscribe/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,10 @@ fn subscribe_forwarding_ack_serde_roundtrip() {
assert_eq!(deser_id, id);
assert_eq!(deser_iid, instance_id);
}
other => panic!("Expected ForwardingAck, got {other}"),
other @ SubscribeMsg::Request { .. }
| other @ SubscribeMsg::Response { .. }
| other @ SubscribeMsg::Unsubscribe { .. } => {
panic!("Expected ForwardingAck, got {other}")
}
}
}
Loading