Async client silently drops the final DATA frame of a server-streaming RPC #311

@shvbsle

Description of problem

Async client silently drops the final DATA frame of a server-streaming RPC.

ClientReader::handle_msg in src/asynchronous/client.rs calls tokio::spawn once per incoming frame. For a server-streaming RPC where the server sends a payload and returns, ttrpc puts a DATA frame on the wire immediately followed by a FLAG_REMOTE_CLOSED frame. The two spawned tasks then race for the req_map mutex in get_resp_tx. If the close task wins, it removes the stream from the map; the data task then looks the stream up, finds nothing, and returns via the "Receiver got unknown data packet" debug path. The payload is silently dropped.
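To make the ordering dependence concrete, here is a minimal, std-only sketch of the lookup-versus-remove interaction. It is a simplified model, not ttrpc's code: `Frame`, `StreamMap`, `open`, `handle`, and `deliver` are hypothetical names, and a `std::sync::mpsc` channel stands in for the per-stream response channel. The frames are applied sequentially in a chosen order, so each outcome of the race can be observed deterministically.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical stand-in for the two frame kinds described in the issue.
#[derive(Debug, Clone)]
enum Frame {
    Data(Vec<u8>),
    RemoteClosed,
}

/// Hypothetical model of the stream map (`req_map` in the issue):
/// stream id -> sender half of that stream's channel.
struct StreamMap {
    inner: Mutex<HashMap<u32, Sender<Vec<u8>>>>,
}

impl StreamMap {
    fn new() -> Self {
        StreamMap { inner: Mutex::new(HashMap::new()) }
    }

    /// Register a stream and return the receiving half.
    fn open(&self, stream_id: u32) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().insert(stream_id, tx);
        rx
    }

    /// Handle one frame under the map lock. Returns `false` when a DATA
    /// frame finds no entry for its stream, i.e. the payload is dropped.
    fn handle(&self, stream_id: u32, frame: Frame) -> bool {
        let mut map = self.inner.lock().unwrap();
        match frame {
            Frame::Data(payload) => match map.get(&stream_id) {
                Some(tx) => tx.send(payload).is_ok(),
                None => false, // models the "unknown data packet" path
            },
            Frame::RemoteClosed => {
                // Removing the entry drops the sender and closes the stream.
                map.remove(&stream_id);
                true
            }
        }
    }
}

/// Apply `frames` to stream 1 in the given order and return what the
/// receiver observes: the payload, or `None` if the stream closed first.
fn deliver(frames: Vec<Frame>) -> Option<Vec<u8>> {
    let map = StreamMap::new();
    let rx = map.open(1);
    for frame in frames {
        map.handle(1, frame);
    }
    rx.try_recv().ok()
}
```

In this model, `deliver(vec![Frame::Data(vec![7]), Frame::RemoteClosed])` yields the payload, while `deliver(vec![Frame::RemoteClosed, Frame::Data(vec![7])])` yields `None`, which corresponds to the `Ok(None)` seen in the actual result. The mutex makes each step atomic but does nothing to preserve wire order between the two handlers.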

Steps to reproduce on current main, on any multi-core Linux host:

  1. Apply this diff to the existing example so it exercises the shape that triggers the race:
diff --git a/example/Cargo.toml b/example/Cargo.toml
--- a/example/Cargo.toml
+++ b/example/Cargo.toml
@@
-tokio = { version = "1.0.1", features = ["signal", "time"] }
+tokio = { version = "1.0.1", features = ["signal", "time", "rt-multi-thread", "macros"] }

diff --git a/example/async-stream-client.rs b/example/async-stream-client.rs
--- a/example/async-stream-client.rs
+++ b/example/async-stream-client.rs
@@
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main(flavor = "multi_thread")]
 async fn main() {
@@ async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) {
-    let mut stream = cli
-        .echo_default_value(default_ctx(), &Default::default())
-        .await
-        .unwrap();
-
-    let received = stream.recv().await.unwrap().unwrap();
-
-    assert_eq!(received.seq, 0);
-    assert_eq!(received.msg, "");
+    for _ in 0..1000 {
+        let mut stream = cli
+            .echo_default_value(default_ctx(), &Default::default())
+            .await
+            .unwrap();
+
+        let received = stream.recv().await.unwrap().unwrap();
+
+        assert_eq!(received.seq, 0);
+        assert_eq!(received.msg, "");
+    }
 }
  2. Build and run:
cargo build --release --example async-stream-server --example async-stream-client
./target/release/examples/async-stream-server &
./target/release/examples/async-stream-client

The loop alone is not enough to reproduce. The runtime flavor change is what exposes the race, because current_thread runs spawned tasks in submission order and hides it.

Expected result

echo_default_value delivers the server's payload on every iteration. stream.recv().await returns Ok(Some(received)) with received.seq == 0 and received.msg == "" all 1000 times.

Actual result

The client panics within the loop:

thread 'tokio-rt-worker' panicked at example/async-stream-client.rs:
called `Option::unwrap()` on a `None` value

stream.recv().await returned Ok(None) even though the server sent a payload. Across several reruns the example failed every time, though the iteration at which it fails varies.
