Race condition in grabConn causes silently dropped messages and ActiveConsumerChange events on any consumer subscription type #1473

@aleks-lazic

Expected behavior

Any consumer should receive all ActiveConsumerChange notifications and all
messages published after the subscribe RPC succeeds. When a consumer subscribes,
the broker delivers messages and state changes to it — the Go client should route
them to the consumer's MessageChannel and event listener.

Actual behavior

MESSAGE and ACTIVE_CONSUMER_CHANGE commands sent by the broker immediately
after the subscribe RPC succeeds are silently dropped by the Go client. The
client logs:

WARN  Consumer not found while active consumer change  consumerID=N
WARN  Got unexpected message                           consumerID=N

The consumer is subscribed and active from the broker's perspective — the broker
correctly delivers the commands. But the Go client has not yet registered the
consumer handler in its internal map, so handleMessage and
handleActiveConsumerChange in connection.go cannot find the consumer and
discard the commands.

This race exists on every subscribe call — initial or resubscription, and
for all subscription types (failover, exclusive, shared, key-shared). In
practice the race is most reliably triggered during resubscription on a failover
subscription, because the broker delivers ActiveConsumerChange immediately on
a connection it has already been communicating on. But the underlying window
exists unconditionally.

While the broker will eventually redeliver unacked messages, this creates a
serious correctness hazard for consumers using AckCumulative. A later message
may be acknowledged cumulatively, implicitly acknowledging the dropped message
before the application ever processes it. The message is then permanently lost
from the application's perspective: no error is returned to the application, and
the only trace is the client-side WARN log lines shown above.
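
To make the cumulative-ack hazard concrete, here is a minimal sketch in plain Go. It does not use the Pulsar client: unackedAfterCumulativeAck is a hypothetical model of broker-side bookkeeping, and the integer message IDs are illustrative only.

```go
package main

import "fmt"

// unackedAfterCumulativeAck is a hypothetical model of broker-side state.
// Given the message IDs the broker delivered and a cumulative ack up to
// ackedUpTo (inclusive), it returns the IDs still eligible for redelivery.
func unackedAfterCumulativeAck(delivered []int, ackedUpTo int) []int {
	var pending []int
	for _, id := range delivered {
		if id > ackedUpTo {
			pending = append(pending, id)
		}
	}
	return pending
}

func main() {
	delivered := []int{1, 2, 3} // the broker sent all three messages
	processed := []int{2, 3}    // message 1 was dropped inside the client

	// The application cumulatively acks the last message it processed.
	last := processed[len(processed)-1]
	pending := unackedAfterCumulativeAck(delivered, last)

	fmt.Println("processed by application:", processed)
	fmt.Println("left for redelivery:", pending)
}
```

In this model nothing is left for redelivery after the ack on message 3, even though message 1 never reached the application.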

Steps to reproduce

A log trace from a real occurrence shows this key sequence:

  1. Consumer 1 subscribes
  2. A price update is published and delivered by the broker
  3. Consumer 1 receives BecameInactive (msg_count=1) — the message was
    received but the consumer wasn't ready, so it could not be processed.
    Consumer 1 is closed and the service resubscribes.
  4. Consumer 3 is created. The broker sends ActiveConsumerChange immediately —
    before AddConsumeHandler is called. The notification is dropped.
  5. BecameActive finally arrives again ~3 seconds later (broker retransmit).
    The persister starts, but the original message was already consumed by
    consumer 1, never processed, and silently lost.

Root cause

This is a race condition between the connection's read goroutine (which
dispatches incoming frames) and the consumer registration via AddConsumeHandler.

In consumer_partition.go, grabConn() registers the consumer handler after
the subscribe RPC returns:

// subscribe RPC blocks until broker responds
res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(...)

// handler registered AFTER subscribe completes
pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)  // ← too late

The broker sends ACTIVE_CONSUMER_CHANGE and MESSAGE commands on the same
connection immediately after the subscribe RPC succeeds. These are processed by
the connection's read goroutine, which looks up the consumer handler by ID.
Since AddConsumeHandler hasn't been called yet, the lookup fails and the
commands are discarded.
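
The lookup failure can be modeled deterministically with a simplified handler table. This is only a sketch: conn, addHandler, dispatch and recorder are hypothetical stand-ins, not the types in connection.go, and the frame arrival is simulated by calling dispatch directly instead of running a real read goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for a consumer handler.
type handler interface{ onMessage(payload string) }

// conn is a hypothetical, simplified connection holding a handler table
// keyed by consumer ID.
type conn struct {
	mu       sync.Mutex
	handlers map[uint64]handler
}

func newConn() *conn { return &conn{handlers: make(map[uint64]handler)} }

func (c *conn) addHandler(id uint64, h handler) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers[id] = h
}

// dispatch models what the read goroutine does with an incoming frame:
// look up the handler by consumer ID and drop the frame if none exists.
// It reports whether the frame was delivered.
func (c *conn) dispatch(id uint64, payload string) bool {
	c.mu.Lock()
	h, ok := c.handlers[id]
	c.mu.Unlock()
	if !ok {
		return false // no handler registered yet: frame is discarded
	}
	h.onMessage(payload)
	return true
}

// recorder collects every payload it is handed.
type recorder struct{ got []string }

func (r *recorder) onMessage(p string) { r.got = append(r.got, p) }

func main() {
	c := newConn()
	r := &recorder{}

	// Frame arrives in the window between subscribe success and registration.
	fmt.Println("delivered before registration:", c.dispatch(1, "m1"))

	c.addHandler(1, r)
	fmt.Println("delivered after registration:", c.dispatch(1, "m2"))
	fmt.Println("handler saw:", r.got)
}
```

The first frame is dropped purely because of ordering; the handler table itself behaves correctly.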

Proposed fix

#1476

RequestWithCnxKeySuffix is internally a GetConnection followed by a
RequestOnCnx. The fix splits these two operations apart and inserts
AddConsumeHandler in between, so the handler is registered before the broker
can send any frames.

The same-connection guarantee holds: GetConnection is called with the same
(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix) parameters that
RequestWithCnxKeySuffix already uses internally. cnxKeySuffix is stable —
it is assigned once at construction time and never changes.

// Obtain connection before sending the RPC
cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix)
if err != nil {
    return err
}

// Register handler BEFORE the subscribe RPC so no frames are missed.
// AddConsumeHandler returns an error (see the existing call in grabConn),
// so check it rather than discard it.
if err := cnx.AddConsumeHandler(pc.consumerID, pc); err != nil {
    return err
}

res, err := pc.client.rpcClient.RequestOnCnx(cnx, ...)
if err != nil {
    cnx.DeleteConsumeHandler(pc.consumerID) // clean up on failure
    return err
}

pc._setConn(cnx)
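
The register-first ordering with rollback on failure can be exercised in isolation. The sketch below is standalone Go under stated assumptions: registry, subscribeWithHandler and errSubscribeFailed are hypothetical names, and the subscribe callback stands in for the RPC. It is not the code in #1476.

```go
package main

import (
	"errors"
	"fmt"
)

// errSubscribeFailed is a hypothetical error standing in for a failed RPC.
var errSubscribeFailed = errors.New("subscribe failed")

// registry is a hypothetical handler table keyed by consumer ID.
type registry struct{ handlers map[uint64]string }

func (r *registry) add(id uint64, name string) error {
	if _, exists := r.handlers[id]; exists {
		return fmt.Errorf("consumer %d already registered", id)
	}
	r.handlers[id] = name
	return nil
}

func (r *registry) remove(id uint64) { delete(r.handlers, id) }

// subscribeWithHandler registers the handler first, then runs subscribe.
// If subscribe fails, the registration is rolled back so no stale handler
// is left behind.
func subscribeWithHandler(r *registry, id uint64, name string, subscribe func() error) error {
	if err := r.add(id, name); err != nil {
		return err
	}
	if err := subscribe(); err != nil {
		r.remove(id)
		return err
	}
	return nil
}

func main() {
	r := &registry{handlers: make(map[uint64]string)}

	// Failed subscribe: the handler must not remain registered.
	err := subscribeWithHandler(r, 1, "c1", func() error { return errSubscribeFailed })
	fmt.Println("error:", err, "| handlers left:", len(r.handlers))

	// Successful subscribe: the handler is already present while the
	// subscribe callback runs, so frames arriving then can be routed.
	err = subscribeWithHandler(r, 1, "c1", func() error {
		_, ok := r.handlers[1]
		fmt.Println("handler present during subscribe:", ok)
		return nil
	})
	fmt.Println("error:", err, "| handlers left:", len(r.handlers))
}
```

The rollback matters because a retry with the same consumer ID would otherwise hit the "already registered" path in this model.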

System configuration

Pulsar version: 4.x
Client version: pulsar-client-go v0.18.0 (also present on current master)
Go version: 1.24
