Skip to content

Commit 431c853

Browse files
sanityclaude
and authored
fix: prevent pipe_stream stall on multi-hop PUT forwarding (#3568)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2a959ae commit 431c853

File tree

2 files changed

+267
-23
lines changed

2 files changed

+267
-23
lines changed

crates/core/src/transport/peer_connection/outbound_stream.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,51 @@ pub(super) async fn pipe_stream<S: super::super::Socket, T: TimeSource>(
361361
let mut fragment_number = 1u32;
362362
let mut pending_metadata = metadata;
363363

364-
while let Some(result) = stream.next().await {
365-
let payload = match result {
364+
// Inactivity timeout for piped streams. If no fragment arrives from the
365+
// inbound buffer within this duration, the pipe fails rather than hanging
366+
// indefinitely. This matches STREAM_INACTIVITY_TIMEOUT used by assemble().
367+
use super::streaming::STREAM_INACTIVITY_TIMEOUT;
368+
let inactivity_timeout = STREAM_INACTIVITY_TIMEOUT;
369+
370+
loop {
371+
// Use tokio::select! with time_source.sleep() for DST compatibility.
372+
// tokio::time::timeout uses real timers which don't advance in
373+
// VirtualTime simulation tests.
374+
let next_fragment = tokio::select! {
375+
result = stream.next() => {
376+
match result {
377+
Some(r) => r,
378+
None => break, // Stream complete
379+
}
380+
}
381+
_ = time_source.sleep(inactivity_timeout) => {
382+
// No fragment arrived within the inactivity timeout
383+
let elapsed = time_source.now().saturating_sub(start_time);
384+
tracing::warn!(
385+
stream_id = %outbound_stream_id.0,
386+
destination = %destination_addr,
387+
sent_so_far,
388+
total_bytes,
389+
fragment_number,
390+
elapsed_ms = elapsed.as_millis(),
391+
"pipe_stream stalled: no fragment received within {}s",
392+
inactivity_timeout.as_secs()
393+
);
394+
emit_transfer_failed(
395+
outbound_stream_id.0 as u64,
396+
destination_addr,
397+
sent_so_far,
398+
format!(
399+
"pipe stalled: no fragment for {}s (sent {sent_so_far}/{total_bytes} bytes)",
400+
inactivity_timeout.as_secs()
401+
),
402+
elapsed.as_millis() as u64,
403+
TransferDirection::Send,
404+
);
405+
return Err(TransportError::ConnectionClosed(destination_addr));
406+
}
407+
};
408+
let payload = match next_fragment {
366409
Ok(data) => data,
367410
Err(e) => {
368411
let elapsed = time_source.now().saturating_sub(start_time);

crates/core/src/transport/peer_connection/streaming.rs

Lines changed: 222 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
3232
use bytes::Bytes;
3333
use dashmap::DashMap;
34+
use event_listener::EventListener;
3435
use futures::Stream;
36+
use std::future::Future;
3537
use std::pin::Pin;
3638
use std::sync::Arc;
3739
use std::task::{Context, Poll, Waker};
@@ -198,6 +200,7 @@ impl StreamHandle {
198200
next_fragment: 1,
199201
bytes_read: 0,
200202
auto_reclaim: false,
203+
listener: None,
201204
}
202205
}
203206

@@ -246,15 +249,35 @@ impl StreamHandle {
246249
next_fragment: 1,
247250
bytes_read: 0,
248251
auto_reclaim: true,
252+
listener: None,
249253
}
250254
}
251255

252256
/// Forks this handle, creating an independent consumer.
253257
///
254-
/// The forked handle shares the same underlying buffer but maintains
255-
/// its own read position when used with `.stream()`.
258+
/// The forked handle shares the same underlying buffer but has its own
259+
/// independent `SyncState` (cancelled flag and wakers). This means:
260+
/// - Cancelling the original handle does NOT cancel the fork
261+
/// - Each consumer has its own waker list
262+
/// - Fragment notifications reach the fork via the buffer's `Event` notifier
263+
///
264+
/// This independence is critical for piped stream forwarding: when the
265+
/// upstream `PeerConnection` drops and cancels its handles, the pipe task
266+
/// (which reads from a forked handle) must continue forwarding data that's
267+
/// already in the shared buffer.
256268
pub fn fork(&self) -> Self {
257-
self.clone()
269+
// Preserve the current cancelled state so forking an already-dead
270+
// stream doesn't resurrect it. Future cancellations on the original
271+
// won't propagate to the fork (independent SyncState).
272+
let already_cancelled = self.sync.read().cancelled;
273+
let mut sync = SyncState::new();
274+
sync.cancelled = already_cancelled;
275+
Self {
276+
buffer: self.buffer.clone(),
277+
sync: Arc::new(parking_lot::RwLock::new(sync)),
278+
stream_id: self.stream_id,
279+
total_bytes: self.total_bytes,
280+
}
258281
}
259282

260283
/// Inserts a fragment into the stream buffer.
@@ -415,6 +438,11 @@ pub struct StreamingInboundStream {
415438
/// If true, fragments are taken (removed) from the buffer after reading.
416439
/// This enables progressive memory reclamation for single-consumer scenarios.
417440
auto_reclaim: bool,
441+
/// Listener for buffer data_available notifications.
442+
/// Created when poll_next returns Pending, consumed when notified.
443+
/// Uses the buffer's `event_listener::Event` which is fired by every
444+
/// `buffer.insert()` — independent of which handle called `push_fragment`.
445+
listener: Option<Pin<Box<EventListener>>>,
418446
}
419447

420448
impl StreamingInboundStream {
@@ -460,7 +488,7 @@ impl Stream for StreamingInboundStream {
460488
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
461489
let next_idx = self.next_fragment;
462490

463-
// Check cancelled state (sync lock)
491+
// Check cancelled state (per-handle, independent for forks)
464492
if self.handle.sync.read().cancelled {
465493
return Poll::Ready(Some(Err(StreamError::Cancelled)));
466494
}
@@ -480,31 +508,83 @@ impl Stream for StreamingInboundStream {
480508
return Poll::Ready(None);
481509
}
482510

483-
// Try to get the next fragment (lock-free)
511+
// Try to get the next fragment (lock-free fast path)
484512
if let Some(data) = self.try_get_fragment(next_idx) {
513+
self.listener = None;
485514
self.next_fragment = next_idx + 1;
486515
self.bytes_read += data.len() as u64;
487516
return Poll::Ready(Some(Ok(data)));
488517
}
489518

490-
// Fragment not yet available, register waker
491-
{
492-
let mut sync = self.handle.sync.write();
493-
// Re-check cancelled after acquiring write lock
494-
if sync.cancelled {
495-
return Poll::Ready(Some(Err(StreamError::Cancelled)));
496-
}
497-
// Re-check buffer (fragment may have arrived)
498-
if let Some(data) = self.try_get_fragment(next_idx) {
499-
drop(sync); // Release lock before modifying self
500-
self.next_fragment = next_idx + 1;
501-
self.bytes_read += data.len() as u64;
502-
return Poll::Ready(Some(Ok(data)));
503-
}
504-
sync.wakers.push(cx.waker().clone());
519+
// Fragment not yet available.
520+
//
521+
// Following the event_listener pattern: create the listener BEFORE
522+
// re-checking the condition. This prevents a TOCTOU race where a
523+
// notification fires between the check and listener creation.
524+
//
525+
// We use the buffer's `data_available` Event (fired by every
526+
// buffer.insert()) as the primary notification mechanism. This works
527+
// for both original and forked handles since buffer.insert() fires
528+
// regardless of which handle called push_fragment().
529+
//
530+
// cancel() also fires buffer.notifier().notify(), so the listener
531+
// covers both data arrival and cancellation.
532+
if self.listener.is_none() {
533+
self.listener = Some(Box::pin(self.handle.buffer.notifier().listen()));
534+
}
535+
536+
// Re-check after listener creation (the event_listener pattern):
537+
// if a fragment arrived between the fast-path check and listen(),
538+
// we catch it here instead of missing the notification.
539+
if self.handle.sync.read().cancelled {
540+
self.listener = None;
541+
return Poll::Ready(Some(Err(StreamError::Cancelled)));
542+
}
543+
if let Some(data) = self.try_get_fragment(next_idx) {
544+
self.listener = None;
545+
self.next_fragment = next_idx + 1;
546+
self.bytes_read += data.len() as u64;
547+
return Poll::Ready(Some(Ok(data)));
505548
}
506549

507-
Poll::Pending
550+
// Poll the listener. The listener is guaranteed to be Some here
551+
// (set above), but we use if-let to satisfy the no-unwrap rule.
552+
if let Some(listener) = self.listener.as_mut() {
553+
match listener.as_mut().poll(cx) {
554+
Poll::Ready(()) => {
555+
// Notified — clear listener and re-check buffer inline
556+
// (avoids wake_by_ref spin-loop).
557+
self.listener = None;
558+
if self.handle.sync.read().cancelled {
559+
return Poll::Ready(Some(Err(StreamError::Cancelled)));
560+
}
561+
if let Some(data) = self.try_get_fragment(next_idx) {
562+
self.next_fragment = next_idx + 1;
563+
self.bytes_read += data.len() as u64;
564+
return Poll::Ready(Some(Ok(data)));
565+
}
566+
// Spurious notification. Create a fresh listener,
567+
// re-check, and register waker.
568+
self.listener = Some(Box::pin(self.handle.buffer.notifier().listen()));
569+
if let Some(data) = self.try_get_fragment(next_idx) {
570+
self.listener = None;
571+
self.next_fragment = next_idx + 1;
572+
self.bytes_read += data.len() as u64;
573+
return Poll::Ready(Some(Ok(data)));
574+
}
575+
if let Some(new_listener) = self.listener.as_mut() {
576+
match new_listener.as_mut().poll(cx) {
577+
Poll::Ready(()) => self.listener = None,
578+
Poll::Pending => {}
579+
}
580+
}
581+
Poll::Pending
582+
}
583+
Poll::Pending => Poll::Pending,
584+
}
585+
} else {
586+
Poll::Pending
587+
}
508588
}
509589
}
510590

@@ -656,6 +736,127 @@ mod tests {
656736
assert_eq!(handle.try_assemble(), forked.try_assemble());
657737
}
658738

739+
#[test]
740+
fn test_fork_independent_cancellation() {
741+
use super::super::streaming_buffer::FRAGMENT_PAYLOAD_SIZE;
742+
743+
// Use exact fragment size so one fragment completes the stream
744+
let total = FRAGMENT_PAYLOAD_SIZE as u64;
745+
let data = vec![42u8; FRAGMENT_PAYLOAD_SIZE];
746+
let handle = StreamHandle::new(make_stream_id(), total);
747+
handle.push_fragment(1, Bytes::from(data.clone())).unwrap();
748+
749+
let forked = handle.fork();
750+
751+
// Cancel the original — simulates PeerConnection::Drop
752+
handle.cancel();
753+
754+
// Original is cancelled
755+
assert!(handle.sync.read().cancelled);
756+
assert!(matches!(
757+
handle.push_fragment(1, Bytes::from(vec![0u8; 10])),
758+
Err(StreamError::Cancelled)
759+
));
760+
761+
// Fork is NOT cancelled — pipe task can continue reading
762+
assert!(!forked.sync.read().cancelled);
763+
764+
// Fork can still read data from the shared buffer
765+
assert!(forked.is_complete());
766+
assert_eq!(forked.try_assemble(), Some(data));
767+
}
768+
769+
#[tokio::test]
770+
async fn test_fork_stream_reads_after_original_cancel() {
771+
use super::super::streaming_buffer::FRAGMENT_PAYLOAD_SIZE;
772+
773+
let total = (FRAGMENT_PAYLOAD_SIZE * 2) as u64;
774+
let handle = StreamHandle::new(make_stream_id(), total);
775+
776+
// Push both fragments via the original handle
777+
handle
778+
.push_fragment(1, Bytes::from(vec![1u8; FRAGMENT_PAYLOAD_SIZE]))
779+
.unwrap();
780+
handle
781+
.push_fragment(2, Bytes::from(vec![2u8; FRAGMENT_PAYLOAD_SIZE]))
782+
.unwrap();
783+
784+
// Fork and create a stream from it
785+
let forked = handle.fork();
786+
let mut stream = forked.stream();
787+
788+
// Cancel the original (simulates PeerConnection drop)
789+
handle.cancel();
790+
791+
// The forked stream should still be able to read all fragments
792+
let chunk1 = stream.next().await;
793+
assert!(chunk1.is_some());
794+
assert_eq!(chunk1.unwrap().unwrap().len(), FRAGMENT_PAYLOAD_SIZE);
795+
796+
let chunk2 = stream.next().await;
797+
assert!(chunk2.is_some());
798+
assert_eq!(chunk2.unwrap().unwrap().len(), FRAGMENT_PAYLOAD_SIZE);
799+
800+
// Stream complete
801+
let chunk3 = stream.next().await;
802+
assert!(chunk3.is_none());
803+
}
804+
805+
#[tokio::test]
806+
async fn test_fork_incremental_wakeup() {
807+
use super::super::streaming_buffer::FRAGMENT_PAYLOAD_SIZE;
808+
809+
let total = (FRAGMENT_PAYLOAD_SIZE * 3) as u64;
810+
let handle = StreamHandle::new(make_stream_id(), total);
811+
812+
// Push only fragment #1
813+
handle
814+
.push_fragment(1, Bytes::from(vec![1u8; FRAGMENT_PAYLOAD_SIZE]))
815+
.unwrap();
816+
817+
// Fork and start reading
818+
let forked = handle.fork();
819+
let mut stream = forked.stream();
820+
821+
// Read fragment #1 (available immediately)
822+
let chunk1 = stream.next().await;
823+
assert!(chunk1.is_some());
824+
assert_eq!(chunk1.unwrap().unwrap(), vec![1u8; FRAGMENT_PAYLOAD_SIZE]);
825+
826+
// Fragment #2 arrives later via the ORIGINAL handle's push_fragment.
827+
// The forked stream must wake up via the buffer's EventListener
828+
// (not via SyncState wakers, which are on the fork's independent sync).
829+
let handle_clone = handle.clone();
830+
tokio::spawn(async move {
831+
tokio::task::yield_now().await;
832+
handle_clone
833+
.push_fragment(2, Bytes::from(vec![2u8; FRAGMENT_PAYLOAD_SIZE]))
834+
.unwrap();
835+
});
836+
837+
// The forked stream should receive fragment #2 via EventListener
838+
let chunk2 = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()).await;
839+
assert!(
840+
chunk2.is_ok(),
841+
"fork should wake up when fragment arrives via original handle"
842+
);
843+
let chunk2 = chunk2.unwrap();
844+
assert!(chunk2.is_some());
845+
assert_eq!(chunk2.unwrap().unwrap(), vec![2u8; FRAGMENT_PAYLOAD_SIZE]);
846+
847+
// Push fragment #3 and verify
848+
handle
849+
.push_fragment(3, Bytes::from(vec![3u8; FRAGMENT_PAYLOAD_SIZE]))
850+
.unwrap();
851+
let chunk3 = stream.next().await;
852+
assert!(chunk3.is_some());
853+
assert_eq!(chunk3.unwrap().unwrap(), vec![3u8; FRAGMENT_PAYLOAD_SIZE]);
854+
855+
// Stream complete
856+
let end = stream.next().await;
857+
assert!(end.is_none());
858+
}
859+
659860
#[tokio::test]
660861
async fn test_streaming_inbound_stream_basic() {
661862
let handle = StreamHandle::new(make_stream_id(), 15);

0 commit comments

Comments
 (0)