
[Issue 1473][Consumer] Fix race in grabConn dropping messages before handler registration #1476

Merged
crossoverJie merged 1 commit into apache:master from aleks-lazic:fix/consumer-handler-registration-race-in-grabconn on May 6, 2026

Conversation

@aleks-lazic
Contributor

Motivation

MESSAGE and ACTIVE_CONSUMER_CHANGE frames sent by the broker immediately after a successful subscribe RPC are silently dropped. The client logs "Consumer not found while active consumer change" and "Got unexpected message", but the frames are permanently lost.

This happens because grabConn() calls AddConsumeHandler after the subscribe RPC returns. The broker starts delivering frames as soon as the subscribe succeeds, but the connection's read goroutine cannot find the handler yet and discards them.

This is a correctness hazard for consumers using AckCumulative: a later message acknowledged cumulatively can implicitly acknowledge the dropped message before the application ever processes it — permanent silent message loss.
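
To make the failure mode concrete, here is a minimal, self-contained Go sketch of the ordering problem. Everything in it (the conn type, addConsumeHandler, deliver) is an illustrative stand-in, not the client's actual internals:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// conn stands in for the connection's read goroutine plus its handler
// registry: frames addressed to a consumer ID with no registered handler
// are discarded, mirroring the "Got unexpected message" path.
type conn struct {
	mu       sync.Mutex
	handlers map[uint64]func(frame string)
}

func (c *conn) addConsumeHandler(id uint64, h func(string)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers[id] = h
}

func (c *conn) deliver(id uint64, frame string) {
	c.mu.Lock()
	h, ok := c.handlers[id]
	c.mu.Unlock()
	if !ok {
		fmt.Println("dropped (no handler yet):", frame)
		return
	}
	h(frame)
}

func main() {
	c := &conn{handlers: make(map[uint64]func(string))}

	// The broker may start delivering as soon as the subscribe RPC succeeds...
	go c.deliver(1, "MESSAGE m1")
	time.Sleep(10 * time.Millisecond)

	// ...but the old code registered the handler only after the RPC
	// returned, so m1 is already gone and only m2 is received.
	c.addConsumeHandler(1, func(f string) { fmt.Println("received:", f) })
	c.deliver(1, "MESSAGE m2")
}
```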

Modifications

Split RequestWithCnxKeySuffix (which is internally GetConnection + RequestOnCnx) into its two constituent operations and insert AddConsumeHandler in between, so the handler is registered before the broker can send any frames.

On subscribe failure, DeleteConsumeHandler cleans up the pre-registered handler. The timeout path sends CloseConsumer via RequestOnCnx on the same connection.

This mirrors the existing pattern in producer_partition.go, which already does GetConnection → RegisterListener → RequestOnCnx.
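
As a rough sketch of the reordered flow (illustrative only: the real grabConn works against the connection pool and internal.Connection, and every signature below is simplified):

```go
package consumer

import "errors"

// connection is a stand-in for the client's internal connection type.
type connection interface {
	AddConsumeHandler(id uint64, h func(frame string)) error
	DeleteConsumeHandler(id uint64)
	Request(cmd string) (string, error)
}

var errRequestTimeout = errors.New("request timed out")

// grabConn sketches the fixed ordering: acquire the connection, register
// the consume handler, and only then send SUBSCRIBE, so no broker frame
// can arrive before a handler exists to receive it.
func grabConn(get func() (connection, error), id uint64, h func(string)) (connection, error) {
	cnx, err := get()
	if err != nil {
		return nil, err // GetConnection failed: no handler registered
	}
	if err := cnx.AddConsumeHandler(id, h); err != nil {
		return nil, err // registration failed: no RPC sent
	}
	if _, err := cnx.Request("SUBSCRIBE"); err != nil {
		cnx.DeleteConsumeHandler(id) // no handler leak on failure
		if errors.Is(err, errRequestTimeout) {
			// Best-effort close on the SAME connection; the broker may
			// still complete the subscribe after our timeout.
			_, _ = cnx.Request("CLOSE_CONSUMER")
		}
		return nil, err
	}
	return cnx, nil
}
```

The ordering is the whole fix: once AddConsumeHandler has run, any frame the broker sends after SUBSCRIBE succeeds has a registered handler to land on.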

Verifying this change

This change adds tests and can be verified as follows:

  • TestGrabConn_HandlerRegisteredBeforeSubscribe — handler is in the map before the subscribe RPC
  • TestGrabConn_HandlerRemovedOnSubscribeFailure — no handler leak on error
  • TestGrabConn_HandlerRemovedOnSubscribeTimeout — cleanup on timeout, close sent on same connection
  • TestGrabConn_BrokerFrameDuringSubscribe — broker frame arriving mid-RPC reaches the consumer
  • TestGrabConn_GetConnectionFailure — early return, no handler registered
  • TestGrabConn_AddConsumeHandlerFailure — early return, no RPC sent

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no

Documentation

  • Does this pull request introduce a new feature? no


Copilot AI left a comment


Pull request overview

Fixes a consumer subscribe ordering race where broker frames (notably MESSAGE and ACTIVE_CONSUMER_CHANGE) could arrive immediately after (or during) subscribe and be dropped because the consumer handler wasn’t registered on the connection yet.

Changes:

  • Refactors partitionConsumer.grabConn() to explicitly GetConnection and register the consume handler before issuing the subscribe RPC via RequestOnCnx.
  • Adds cleanup on subscribe RPC failure/timeout (delete handler; send close-on-timeout on the same connection).
  • Adds targeted unit tests around handler registration ordering and cleanup behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

  • pulsar/consumer_partition.go — Reorders connection acquisition / handler registration vs. subscribe RPC to close the handler-registration race.
  • pulsar/consumer_partition_test.go — Adds new grabConn-focused tests with a spy connection/RPC client to validate ordering and cleanup paths.


@aleks-lazic force-pushed the fix/consumer-handler-registration-race-in-grabconn branch 2 times, most recently from 02d1718 to f80ea0d on April 9, 2026 at 12:26
@aleks-lazic
Contributor Author

Ready for another review @crossoverJie

@aleks-lazic
Contributor Author

aleks-lazic commented Apr 15, 2026

The failing integration tests are unrelated flaky tests:
--- FAIL: TestTokenAuth (30.00s)
--- FAIL: TestTokenAuthWithClientVersion (30.01s)

cc @crossoverJie @RobertIndie, is it possible to get another review on this PR? Thanks.

@aleks-lazic
Contributor Author

cc @crossoverJie @RobertIndie — just following up, happy to address any feedback!

@lhotari
Member

lhotari commented May 4, 2026

Local review by Claude Code

This is a code review performed locally by Claude Code (model: Opus 4.7). No automated tools posted this; a human reviewer ran /pr-review against the diff and metadata.

Verdict: Approve. Bug is real, fix is correct, tests are thorough.

Summary

The race is real and the fix is well-implemented. GetConnection → _setConn → AddConsumeHandler → RequestOnCnx mirrors the existing producer pattern in producer_partition.go:320-345. Failure paths cleanly delete the handler, restore the previous connection, and route the timeout CLOSE_CONSUMER over the same cnx rather than re-resolving from the pool (which is actually slightly more correct — consumer IDs are per-connection).

Correctness checks confirmed

  • cnx from cnxPool.GetConnection is identical to res.Cnx from RequestOnCnx(cnx, ...) (internal/rpc_client.go:175-186), so removing the post-success _setConn(res.Cnx) is safe.
  • pc.conn is atomic.Pointer[internal.Connection] (not atomic.Value), so pc.conn.Store(nil) does not panic (see the sketch after this list).
  • Reconnect callers discard the return value into a retry loop (consumer_partition.go:2055-2114); the prevConn rollback preserves prior semantics that pc.conn survives a failed reconnect unchanged.
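
The atomic.Pointer point in the second bullet is easy to check in isolation; this toy program (unrelated to the client code itself) shows why the distinction from atomic.Value matters:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type connStub struct{ name string }

func main() {
	// atomic.Pointer[T].Store takes a *T, and a nil *T is perfectly valid.
	var p atomic.Pointer[connStub]
	p.Store(nil)
	fmt.Println(p.Load()) // <nil>

	// atomic.Value.Store(nil), by contrast, panics at runtime, which is
	// why pc.conn being atomic.Pointer (not atomic.Value) matters here.
	defer func() { fmt.Println("recovered:", recover()) }()
	var v atomic.Value
	v.Store(nil) // panics: "sync/atomic: store of nil value into Value"
}
```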

Nits (non-blocking)

  1. Triplicated cleanup block — the "delete handler + restore prevConn (or store nil)" sequence is inlined three times. A small restoreConn := func() { ... } closure or a defer gated on a subscribed flag would be cleaner (see the sketch after this list).
  2. _getConn() invariant comment at consumer_partition.go:2496 claims conn is non-nil for the lifetime of the consumer. In practice it's only non-nil after the first successful subscribe (true before this PR too — initial-subscribe failure left the zero-value pointer). Worth tightening the comment in a follow-up.
  3. Test connectedCh drain — TestGrabConn_HandlerRegisteredBeforeSubscribe and TestGrabConn_BrokerFrameDuringSubscribe hit the success path, which spawns go func() { pc.connectedCh <- struct{}{} }(). The channel is buffered (size 1) so the goroutine completes, but draining it (<-pc.connectedCh) would assert correctness and protect against future buffer-size changes.
  4. Helper init asymmetry — newGrabConnTestConsumerNoConn initializes pc.availablePermits but newGrabConnTestConsumer does not. A one-line comment explaining that only the MessageReceived path needs it would help future readers.
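
For nit 1, one possible shape of that closure (purely illustrative; pcLike and its fields are stand-ins for the real partitionConsumer):

```go
package consumer

type conn struct{}

func (c *conn) DeleteConsumeHandler(id uint64) {}

type pcLike struct {
	consumerID uint64
	conn       *conn // stands in for atomic.Pointer[internal.Connection]
}

// grabConnSketch shows the cleanup hoisted into a single closure so each
// failure path is a one-liner instead of a repeated three-line block.
func (pc *pcLike) grabConnSketch(cnx, prevConn *conn, subscribe func() error) error {
	restoreConn := func() {
		cnx.DeleteConsumeHandler(pc.consumerID)
		pc.conn = prevConn // restore the previous connection (may be nil)
	}
	if err := subscribe(); err != nil {
		restoreConn()
		return err
	}
	return nil
}
```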

Intent vs implementation

The PR description's claim that the fix "mirrors the existing pattern in producer_partition.go" is accurate; the consumer version adds extra prevConn rollback that the producer doesn't bother with, which is a small improvement.

@aleks-lazic force-pushed the fix/consumer-handler-registration-race-in-grabconn branch from f80ea0d to 2ff4a03 on May 4, 2026 at 13:15
@aleks-lazic
Contributor Author

Thanks @lhotari, made the changes. Looks better 👍

@lhotari
Member

lhotari commented May 4, 2026

Something is causing integration tests to fail on 1.26. I didn't see a test failure in the logs. I'm not sure if it's passing on master branch either.

@aleks-lazic
Contributor Author

I had those failing in the past (unrelated flaky tests):
--- FAIL: TestTokenAuth (30.00s)
--- FAIL: TestTokenAuthWithClientVersion (30.01s)

But now I see that all checks have passed.

Member

@lhotari left a comment


LGTM, great job @aleks-lazic

@crossoverJie merged commit 6db892c into apache:master on May 6, 2026
11 of 12 checks passed
