Skip to content

Commit cb51c62

Browse files
authored
fix: prevent GET retry storms cycling through same peers (#3584)
1 parent a0e108b commit cb51c62

File tree

2 files changed

+215
-8
lines changed

2 files changed

+215
-8
lines changed

crates/core/src/operations/get.rs

Lines changed: 210 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ const MAX_RETRIES: usize = 10;
3131
/// Maximum number of peer attempts at each hop level
3232
const DEFAULT_MAX_BREADTH: usize = 3;
3333

34+
/// Minimum HTL for speculative retries.
35+
///
36+
/// Retries use a reduced HTL (capped at current_hop) to avoid full-depth
37+
/// traversal storms. This floor ensures retries still reach peers 2-3 hops
38+
/// away, which is the minimum useful search depth in any topology.
39+
const MIN_RETRY_HTL: usize = 3;
40+
3441
pub(crate) fn start_op(
3542
instance_id: ContractInstanceId,
3643
fetch_contract: bool,
@@ -710,6 +717,11 @@ impl GetOp {
710717
visited: visited.clone(),
711718
}));
712719

720+
// Note: handle_abort uses current_hop directly (not the attempts-based
721+
// reduction from retry_with_next_alternative). Connection aborts are
722+
// immediate failures, not timeout-based retries, so they don't contribute
723+
// to retry storms the same way. The bloom filter and tried_peers already
724+
// prevent cycling through the same peers. (#3570)
713725
let msg = GetMsg::Request {
714726
id: self.id,
715727
instance_id,
@@ -978,11 +990,15 @@ impl GetOp {
978990
};
979991
match state {
980992
GetState::AwaitingResponse(mut data) => {
981-
// If local alternatives exhausted, inject fallback peers we haven't tried
993+
// If local alternatives exhausted, inject fallback peers we haven't tried.
994+
// Filter through BOTH tried_peers (this hop) AND visited bloom filter
995+
// (all hops) to avoid retry storms cycling through the same peers (#3570).
982996
if data.alternatives.is_empty() && !fallback_peers.is_empty() {
983997
for peer in fallback_peers {
984998
if let Some(addr) = peer.socket_addr() {
985-
if !data.tried_peers.contains(&addr) {
999+
if !data.tried_peers.contains(&addr)
1000+
&& !data.visited.probably_visited(addr)
1001+
{
9861002
data.alternatives.push(peer.clone());
9871003
}
9881004
}
@@ -1007,6 +1023,9 @@ impl GetOp {
10071023
let fetch_contract = data.fetch_contract;
10081024
if let Some(addr) = next_target.socket_addr() {
10091025
data.tried_peers.insert(addr);
1026+
// Mark in bloom filter so downstream peers won't forward back,
1027+
// and future retries won't select this peer again (#3570).
1028+
data.visited.mark_visited(addr);
10101029
}
10111030
tracing::info!(
10121031
tx = %self.id,
@@ -1019,13 +1038,22 @@ impl GetOp {
10191038
data.attempts_at_hop += 1;
10201039
let visited = data.visited.clone();
10211040

1041+
// Reduce HTL on each retry to avoid full-depth traversal storms (#3570).
1042+
// At the originator, current_hop == max_hops_to_live, so we can't use
1043+
/// current_hop alone. Instead, divide the HTL by the retry attempt count,
1044+
// floored at MIN_RETRY_HTL. This limits the blast radius of retries
1045+
// while still allowing the request to reach nearby contract holders.
1046+
let retry_htl = (max_hops_to_live / (data.attempts_at_hop.max(1)))
1047+
.max(MIN_RETRY_HTL)
1048+
.min(max_hops_to_live);
1049+
10221050
self.state = Some(GetState::AwaitingResponse(data));
10231051

10241052
let msg = GetMsg::Request {
10251053
id: self.id,
10261054
instance_id,
10271055
fetch_contract,
1028-
htl: max_hops_to_live,
1056+
htl: retry_htl,
10291057
visited,
10301058
};
10311059

@@ -4065,6 +4093,164 @@ mod tests {
40654093
assert!(result.is_err(), "All fallback peers already tried");
40664094
}
40674095

4096+
/// Verify that retry HTL decreases with each attempt (#3570).
4097+
///
4098+
/// At the originator, current_hop == max_hops_to_live, so the old code
4099+
/// (htl: max_hops_to_live) always sent full-depth retries. The fix divides
4100+
/// HTL by attempts_at_hop, creating progressively shorter retry chains.
4101+
#[test]
4102+
fn retry_htl_decreases_with_attempts() {
4103+
let alt1 = make_peer(6001);
4104+
let alt2 = make_peer(6002);
4105+
let alt3 = make_peer(6003);
4106+
let op = make_awaiting_op(vec![alt1, alt2, alt3], &[]);
4107+
4108+
// make_awaiting_op sets attempts_at_hop=1, then retry increments to 2 before computing HTL.
4109+
// First retry: attempts_at_hop becomes 2 → 10/2 = 5
4110+
let (op, msg) = op
4111+
.retry_with_next_alternative(10, &[])
4112+
.unwrap_or_else(|_| panic!("retry 1 failed"));
4113+
match &msg {
4114+
GetMsg::Request { htl, .. } => {
4115+
assert_eq!(
4116+
*htl, 5,
4117+
"First retry: 10/2=5 (attempts_at_hop incremented to 2)"
4118+
);
4119+
}
4120+
other @ GetMsg::Response { .. }
4121+
| other @ GetMsg::ResponseStreaming { .. }
4122+
| other @ GetMsg::ResponseStreamingAck { .. }
4123+
| other @ GetMsg::ForwardingAck { .. } => {
4124+
panic!("Expected Request, got {other}")
4125+
}
4126+
}
4127+
4128+
// Second retry: attempts_at_hop becomes 3 → 10/3 = 3
4129+
let (op, msg) = op
4130+
.retry_with_next_alternative(10, &[])
4131+
.unwrap_or_else(|_| panic!("retry 2 failed"));
4132+
match &msg {
4133+
GetMsg::Request { htl, .. } => {
4134+
assert_eq!(
4135+
*htl, MIN_RETRY_HTL,
4136+
"Second retry: 10/3=3 (clamped to MIN_RETRY_HTL)"
4137+
);
4138+
}
4139+
other @ GetMsg::Response { .. }
4140+
| other @ GetMsg::ResponseStreaming { .. }
4141+
| other @ GetMsg::ResponseStreamingAck { .. }
4142+
| other @ GetMsg::ForwardingAck { .. } => {
4143+
panic!("Expected Request, got {other}")
4144+
}
4145+
}
4146+
4147+
// Third retry: attempts_at_hop becomes 4 → 10/4 = 2 → clamped to MIN_RETRY_HTL=3
4148+
let (_op, msg) = op
4149+
.retry_with_next_alternative(10, &[])
4150+
.unwrap_or_else(|_| panic!("retry 3 failed"));
4151+
match &msg {
4152+
GetMsg::Request { htl, .. } => {
4153+
assert_eq!(
4154+
*htl, MIN_RETRY_HTL,
4155+
"Third retry: 10/4=2 → clamped to MIN_RETRY_HTL"
4156+
);
4157+
}
4158+
other @ GetMsg::Response { .. }
4159+
| other @ GetMsg::ResponseStreaming { .. }
4160+
| other @ GetMsg::ResponseStreamingAck { .. }
4161+
| other @ GetMsg::ForwardingAck { .. } => {
4162+
panic!("Expected Request, got {other}")
4163+
}
4164+
}
4165+
}
4166+
4167+
/// Verify that MIN_RETRY_HTL floor is applied even when max_htl is low.
4168+
#[test]
4169+
fn retry_htl_floor_at_min() {
4170+
let alt = make_peer(7001);
4171+
let mut op = make_awaiting_op(vec![alt], &[]);
4172+
// Simulate many attempts to push HTL below floor
4173+
if let Some(GetState::AwaitingResponse(ref mut data)) = op.state {
4174+
data.attempts_at_hop = 20;
4175+
}
4176+
4177+
let (_op, msg) = op
4178+
.retry_with_next_alternative(10, &[])
4179+
.unwrap_or_else(|_| panic!("retry failed"));
4180+
match &msg {
4181+
GetMsg::Request { htl, .. } => {
4182+
assert!(
4183+
*htl >= MIN_RETRY_HTL,
4184+
"HTL {} should not fall below MIN_RETRY_HTL {}",
4185+
htl,
4186+
MIN_RETRY_HTL
4187+
);
4188+
}
4189+
other @ GetMsg::Response { .. }
4190+
| other @ GetMsg::ResponseStreaming { .. }
4191+
| other @ GetMsg::ResponseStreamingAck { .. }
4192+
| other @ GetMsg::ForwardingAck { .. } => {
4193+
panic!("Expected Request, got {other}")
4194+
}
4195+
}
4196+
}
4197+
4198+
/// Verify that bloom-filter-visited peers are excluded from fallback injection (#3570).
4199+
#[test]
4200+
fn retry_dbf_fallback_skips_bloom_visited() {
4201+
let visited_peer = make_peer(8001);
4202+
let fresh_peer = make_peer(8002);
4203+
4204+
// Create op with the visited peer marked in the bloom filter
4205+
let mut op = make_awaiting_op(vec![], &[]);
4206+
if let Some(GetState::AwaitingResponse(ref mut data)) = op.state {
4207+
if let Some(addr) = visited_peer.socket_addr() {
4208+
data.visited.mark_visited(addr);
4209+
}
4210+
// Remove visited_peer from tried_peers so only bloom filter catches it
4211+
if let Some(addr) = visited_peer.socket_addr() {
4212+
data.tried_peers.remove(&addr);
4213+
}
4214+
}
4215+
4216+
let result = op.retry_with_next_alternative(7, &[visited_peer.clone(), fresh_peer.clone()]);
4217+
assert!(
4218+
result.is_ok(),
4219+
"Should find fresh_peer despite visited_peer in bloom"
4220+
);
4221+
4222+
let (new_op, _msg) = result.unwrap_or_else(|_| panic!("retry failed"));
4223+
if let Some(GetState::AwaitingResponse(data)) = &new_op.state {
4224+
assert_eq!(
4225+
data.alternatives.len(),
4226+
0,
4227+
"Only fresh_peer should be injected (visited_peer filtered by bloom)"
4228+
);
4229+
} else {
4230+
panic!("Expected AwaitingResponse state");
4231+
}
4232+
}
4233+
4234+
/// Verify that retry targets are marked in the bloom filter (#3570).
4235+
#[test]
4236+
fn retry_marks_target_in_bloom_filter() {
4237+
let alt = make_peer(9001);
4238+
let alt_addr = alt.socket_addr().unwrap();
4239+
let op = make_awaiting_op(vec![alt], &[]);
4240+
4241+
let (new_op, _msg) = op
4242+
.retry_with_next_alternative(7, &[])
4243+
.unwrap_or_else(|_| panic!("retry failed"));
4244+
if let Some(GetState::AwaitingResponse(data)) = &new_op.state {
4245+
assert!(
4246+
data.visited.probably_visited(alt_addr),
4247+
"Retry target should be marked in bloom filter"
4248+
);
4249+
} else {
4250+
panic!("Expected AwaitingResponse state");
4251+
}
4252+
}
4253+
40684254
#[test]
40694255
fn retry_wrong_state_returns_err() {
40704256
// PrepareRequest state
@@ -4328,10 +4514,22 @@ mod tests {
43284514
panic!("GC retry should produce a message when alternatives exist")
43294515
});
43304516

4331-
// Verify the retry message targets the first alternative
4517+
// Verify the retry message uses reduced HTL (not full max_htl).
4518+
// make_awaiting_op sets attempts_at_hop=1, retry increments to 2 → 7/2=3.
43324519
match &msg {
43334520
GetMsg::Request { htl, .. } => {
4334-
assert_eq!(*htl, max_htl, "Retry should use max_htl");
4521+
assert!(
4522+
*htl <= max_htl,
4523+
"Retry HTL ({}) should not exceed max_htl ({})",
4524+
htl,
4525+
max_htl
4526+
);
4527+
assert!(
4528+
*htl >= MIN_RETRY_HTL,
4529+
"Retry HTL ({}) should be at least MIN_RETRY_HTL ({})",
4530+
htl,
4531+
MIN_RETRY_HTL
4532+
);
43354533
}
43364534
GetMsg::Response { .. }
43374535
| GetMsg::ResponseStreaming { .. }
@@ -4467,7 +4665,12 @@ mod tests {
44674665
assert_eq!(deser_id, id);
44684666
assert_eq!(deser_iid, instance_id);
44694667
}
4470-
other => panic!("Expected ForwardingAck, got {other}"),
4668+
other @ GetMsg::Request { .. }
4669+
| other @ GetMsg::Response { .. }
4670+
| other @ GetMsg::ResponseStreaming { .. }
4671+
| other @ GetMsg::ResponseStreamingAck { .. } => {
4672+
panic!("Expected ForwardingAck, got {other}")
4673+
}
44714674
}
44724675
}
44734676

@@ -4574,7 +4777,7 @@ mod tests {
45744777
let base_ms = 1_700_000_000_000;
45754778
GlobalSimulationTime::set_time_ms(base_ms);
45764779

4577-
let mut op = make_awaiting_op(vec![make_peer(5001), make_peer(5002)], &[]);
4780+
let op = make_awaiting_op(vec![make_peer(5001), make_peer(5002)], &[]);
45784781
assert_eq!(op.speculative_paths, 0);
45794782
assert!(!op.ack_received);
45804783

crates/core/src/operations/subscribe/tests.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2094,6 +2094,10 @@ fn subscribe_forwarding_ack_serde_roundtrip() {
20942094
assert_eq!(deser_id, id);
20952095
assert_eq!(deser_iid, instance_id);
20962096
}
2097-
other => panic!("Expected ForwardingAck, got {other}"),
2097+
other @ SubscribeMsg::Request { .. }
2098+
| other @ SubscribeMsg::Response { .. }
2099+
| other @ SubscribeMsg::Unsubscribe { .. } => {
2100+
panic!("Expected ForwardingAck, got {other}")
2101+
}
20982102
}
20992103
}

0 commit comments

Comments
 (0)