
GroupTransactSession fetched record buffer not flushed after rebalance/fetch #1222

@tomplarge

Summary

We are using GroupTransactSession with CooperativeStickyBalancer. After a transaction is aborted due to a rebalance, PollRecords sometimes returns records starting from offsets ahead of the last committed offset. We're trying to understand whether this is expected behavior or a bug.

Environment

  • franz-go version: v1.20.5
  • Go version: 1.23

What We Observed

The following sequence of events occurred. I have annotated the full logs (attached below) with // HERE comments to highlight the key points:

  1. Transaction successfully commits partition 458 up to offset 129752
  2. Subsequent transactions correctly re-fetch partition 458 starting from offset 129753 (correct behavior, noted twice in the logs)
  3. Multiple rebalances occur during processing
  4. A transaction processes records up to offset 136638 and attempts to commit
  5. A rebalance causes the transaction to abort
  6. Offset fetch response confirms last committed offset for partition 458 is 129753
  7. kgo logs: transact session resetting to current committed state and aborted buffered records
  8. New transaction begins, PollRecords is called
  9. The issue: PollRecords returns records starting at offset 136013 for partition 458, when the committed offset is 129753
  10. Application detects stale state: EOS processing gap detected

The gap between the committed offset (129753) and the fetched offset (136013) is 6,260 offsets.
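The check in step 10 amounts to comparing, per partition, the committed offset with the first offset PollRecords returns. Below is a minimal sketch of such a check. `findGaps`, `gap` and the `tolerance` parameter are hypothetical names, not the application's actual handler and not a franz-go API. The tolerance is there because, with `ReadCommitted` isolation, transaction control markers and aborted records occupy offsets, so the first returned record can legitimately sit past the committed offset. How much slack is appropriate depends on the input topic.

```go
package main

import "fmt"

// gap records a partition whose first polled offset is not where the
// committed offset says consumption should resume. All names here are
// hypothetical and only illustrate the check.
type gap struct {
	Partition int32
	Committed int64 // committed offset: the next offset the group should read
	Fetched   int64 // lowest offset returned by the poll for this partition
}

// findGaps flags partitions whose first fetched offset is below the
// committed offset (re-reading committed data) or more than tolerance
// offsets above it (skipping data).
func findGaps(committed, firstFetched map[int32]int64, tolerance int64) []gap {
	var gaps []gap
	for p, fetched := range firstFetched {
		c, ok := committed[p]
		if !ok {
			continue // no committed offset known for this partition
		}
		if fetched < c || fetched-c > tolerance {
			gaps = append(gaps, gap{Partition: p, Committed: c, Fetched: fetched})
		}
	}
	return gaps
}

func main() {
	// Values from the logs: partition 458 jumped ahead, 333 resumed correctly.
	committed := map[int32]int64{458: 129753, 333: 240256}
	firstFetched := map[int32]int64{458: 136013, 333: 240256}

	for _, g := range findGaps(committed, firstFetched, 0) {
		fmt.Printf("partition %d: committed %d, fetched %d, difference %d\n",
			g.Partition, g.Committed, g.Fetched, g.Fetched-g.Committed)
	}
	// prints: partition 458: committed 129753, fetched 136013, difference 6260
}
```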

Code

Full kgo Configuration

Here is the kgo configuration we use in production:

func produceConsumerConfig(brokers []string, clientID, consumerGroup string, topics []string, transactionID string) []kgo.Opt {
    return []kgo.Opt{
        kgo.SeedBrokers(brokers...),
        kgo.ClientID(clientID),

        kgo.ConsumerGroup(consumerGroup),
        kgo.ConsumeTopics(topics...),

        // EOS configuration
        kgo.TransactionalID(transactionID),
        kgo.FetchIsolationLevel(kgo.ReadCommitted()),
        kgo.DisableAutoCommit(),
        kgo.RequireStableFetchOffsets(),

        // Producer settings
        kgo.StopProducerOnDataLossDetected(),
        kgo.ProducerBatchCompression(kgo.Lz4Compression()),
        kgo.RecordPartitioner(kgo.StickyKeyPartitioner(nil)),
        kgo.ProducerBatchMaxBytes(10 * 1024 * 1024),
        kgo.ProducerLinger(200 * time.Millisecond),

        // Consumer settings
        kgo.FetchMaxBytes(10_000_000),
        kgo.FetchMaxPartitionBytes(5_000_000),
        kgo.FetchMaxWait(1 * time.Second),
        kgo.ConsumePreferringLagFn(kgo.PreferLagAt(50)),

        kgo.Balancers(kgo.CooperativeStickyBalancer()),

        // Metadata settings
        kgo.MetadataMaxAge(60 * time.Second),
    }
}

Code Pattern

func main() {
    opts := produceConsumerConfig(
        []string{"my-broker:9092"},
        "my-app",
        "my-app.consumer-group",
        []string{"my-app.input-topic"},
        "my-app.transactional-id",
    )

    session, err := kgo.NewGroupTransactSession(opts...)
    if err != nil {
        panic(err) // do not continue with an unusable session
    }
    defer session.Client().Close()

    Run(context.Background(), session)
}

func Run(ctx context.Context, session *kgo.GroupTransactSession) {
    for ctx.Err() == nil {
        err := run(ctx, session)
        if err != nil {
            continue
        }
    }
}

func run(ctx context.Context, session *kgo.GroupTransactSession) error {
    for ctx.Err() == nil {
        ok, err := transaction(ctx, session)
        if err != nil {
            return err
        }
        if !ok {
            return nil
        }
    }
    return nil
}

// log and processAndProduce are application-specific helpers (not shown).
func transaction(ctx context.Context, session *kgo.GroupTransactSession) (bool, error) {
    log("beginning transaction")

    err := session.Begin()
    if err != nil {
        return false, err
    }

    log("fetching")

    fetches := session.PollRecords(ctx, 1000)
    if fetches.IsClientClosed() {
        session.End(ctx, kgo.TryAbort)
        return false, nil
    }
    err = fetches.Err()
    if err != nil {
        session.End(ctx, kgo.TryAbort)
        return false, err
    }

    records := fetches.Records()
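    if len(records) == 0 {
        // Begin has already started a transaction; end it before returning,
        // otherwise the next Begin fails because a transaction is in progress.
        session.End(ctx, kgo.TryAbort)
        return true, nil
    }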

    log("fetched", records)

    err = processAndProduce(ctx, session, records)
    if err != nil {
        session.End(ctx, kgo.TryAbort)
        return false, err
    }

    log("committing", records)

    committed, err := session.End(ctx, kgo.TryCommit)
    if err != nil {
        session.End(ctx, kgo.TryAbort)
        return false, err
    }
    if !committed {
        log("not committed")
        session.End(ctx, kgo.TryAbort)
        return true, nil
    }

    log("committed", records)
    return true, nil
}
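
The `log("fetched", records)` and `log("committing", records)` calls above appear, judging by the log lines below (e.g. `fetched map[raw-messages-keyed:map[458:[129753 136638] ...]]`), to print each partition's first and last offset. Here is a minimal sketch of producing that summary. `offsetRanges` is a hypothetical helper, and the local `record` struct stands in for `kgo.Record`, which has `Topic`, `Partition` and `Offset` fields. Real code would pass the `[]*kgo.Record` from `fetches.Records()`.

```go
package main

import "fmt"

// record mirrors the three kgo.Record fields this sketch needs
// (Topic, Partition, Offset).
type record struct {
	Topic     string
	Partition int32
	Offset    int64
}

// offsetRanges returns topic -> partition -> [first, last] offsets seen
// in a batch of polled records, matching the shape of the log lines.
func offsetRanges(records []record) map[string]map[int32][2]int64 {
	out := make(map[string]map[int32][2]int64)
	for _, r := range records {
		parts, ok := out[r.Topic]
		if !ok {
			parts = make(map[int32][2]int64)
			out[r.Topic] = parts
		}
		rng, seen := parts[r.Partition]
		if !seen {
			parts[r.Partition] = [2]int64{r.Offset, r.Offset}
			continue
		}
		if r.Offset < rng[0] {
			rng[0] = r.Offset
		}
		if r.Offset > rng[1] {
			rng[1] = r.Offset
		}
		parts[r.Partition] = rng
	}
	return out
}

func main() {
	recs := []record{
		{"raw-messages-keyed", 458, 136013},
		{"raw-messages-keyed", 458, 136500},
		{"raw-messages-keyed", 458, 137191},
		{"raw-messages-keyed", 333, 240256},
	}
	// fmt prints map keys in sorted order:
	// map[raw-messages-keyed:map[333:[240256 240256] 458:[136013 137191]]]
	fmt.Println(offsetRanges(recs))
}
```

A summary like this only shows the first and last offset, so it would not reveal holes in the middle of a partition's batch. The gap in this report is visible because the *first* offset for partition 458 is already past the committed offset.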

Our Expectation

We expected that after a transaction abort, PollRecords would return records starting from the last committed offset.

What Happened

After a transaction abort during a cooperative-sticky rebalance where the same partitions were reassigned to the same consumer, PollRecords returned records starting from a much higher offset than the committed offset.

Full Annotated Logs

I've attached a snippet of the logs below with // HERE comments highlighting the key moments. Look for these annotations:

  • // HERE - we have successfully committed partition 458 up to offset 129752
  • // HERE - we correctly re-fetch partition 458 from offset 129753 (appears twice, showing correct behavior)
  • // HERE - we try to commit partition 458 up to offset 136639, but it gets reset
  • // HERE - we see last committed offset for partition 458 is 129753
  • // HERE - expected records to be aborted
  • // HERE start of PollRecords call
  • // HERE - partition 458 returns records starting at offset 136013 when the last committed offset was 129753
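
The annotations use two conventions for the same state. "Committed partition 458 up to offset 129752" refers to the last processed record, while the offset fetch reports 129753. Both are consistent because a Kafka committed offset is the *next* offset to consume; the "issuing txn offset commit" log line shows the session committing 129753 after records through 129752 were processed. The conversion is sketched below. `nextCommitOffsets` is a hypothetical helper shown only for illustration, since the transact session computes commit offsets itself.

```go
package main

import "fmt"

// nextCommitOffsets converts the last processed offset per partition into
// the offset to commit. A committed offset names the next record to read,
// so processing through offset N commits N+1.
func nextCommitOffsets(lastProcessed map[int32]int64) map[int32]int64 {
	out := make(map[int32]int64, len(lastProcessed))
	for p, off := range lastProcessed {
		out[p] = off + 1
	}
	return out
}

func main() {
	// Partition 458: the committed transaction processed through 129752,
	// the aborted one through 136638.
	fmt.Println(nextCommitOffsets(map[int32]int64{458: 129752})) // map[458:129753]
	fmt.Println(nextCommitOffsets(map[int32]int64{458: 136638})) // map[458:136639]
}
```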

The full logs are attached as kgo-eos-logs.txt.

Annotated log excerpt (the full log is 763 lines):
// HERE - we have successfully committed partition 458 up to offset 129752
1769721237795411200,2026-01-29T21:13:58.059Z,[DEBUG] issuing txn offset commit; uncommitted: &{0 raw-message-parser.v18.parser-backfill-d678c85cb-cvps6 raw-message-parser.v18 10715 1 24 8c4d0c1b12734beb1e941996a9ccb9e1 <nil> [{raw-messages-keyed [{333 240256 0 0xc0002f2d58 {map[]}} {468 200918 0 0xc0002f2d58 {map[]}} {438 212596 0 0xc0002f2d58 {map[]}} {388 125415 0 0xc0002f2d58 {map[]}} {458 129753 0 0xc0002f2d58 {map[]}} {408 157778 0 0xc0002f2d58 {map[]}} {398 144083 0 0xc0002f2d58 {map[]}} {358 137683 0 0xc0002f2d58 {map[]}} {478 205507 0 0xc0002f2d58 {map[]}} {368 128784 0 0xc0002f2d58 {map[]}} {448 153826 0 0xc0002f2d58 {map[]}} {428 149167 0 0xc0002f2d58 {map[]}} {202 131612 0 0xc0002f2d58 {map[]}} {346 221271 0 0xc0002f2d58 {map[]}} {418 139817 0 0xc0002f2d58 {map[]}} {378 144897 0 0xc0002f2d58 {map[]}}] {map[]}}] {map[]}}
1769721238990996992,2026-01-29T21:13:59.060Z,[DEBUG] read EndTxn v3; broker: 1
1769721238991024640,2026-01-29T21:13:59.060Z,[INFO] end transaction response successfully received; transactional_id: raw-message-parser.v18.parser-backfill-d678c85cb-cvps6
1769721238991044352,2026-01-29T21:13:59.060Z,[INFO] transact session successful
1769721239029340928,2026-01-29T21:13:59.060Z,kafka_produce_consumer.commitWithCallbacks: committed map[raw-messages-keyed:map[202:[129541 131611] 333:[238033 240255] 346:[209848 221270] 358:[135513 137682] 368:[126676 128783] 378:[142717 144896] 388:[123328 125414] 398:[141933 144082] 408:[155132 157777] 418:[137645 139816] 428:[147023 149166] 438:[209876 212595] 448:[153355 153825] 458:[127732 129752] 468:[198672 200917] 478:[203333 205506]]]

// HERE - we correctly fetch partition 458 from offset 129753
1769721242602131200,2026-01-29T21:14:03.062Z,offset fetch response: &{Version:9 ThrottleMillis:0 Topics:[{Topic:raw-messages-keyed Partitions:[{Partition:438 Offset:212596 LeaderEpoch:0 Metadata:0xc0011491c0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:378 Offset:144897 LeaderEpoch:0 Metadata:0xc0011491d0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:408 Offset:157778 LeaderEpoch:0 Metadata:0xc0011491e0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:346 Offset:221271 LeaderEpoch:0 Metadata:0xc0011491f0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:418 Offset:139817 LeaderEpoch:0 Metadata:0xc001149200 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:428 Offset:149167 LeaderEpoch:0 Metadata:0xc001149220 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:202 Offset:131612 LeaderEpoch:0 Metadata:0xc001149230 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:368 Offset:128784 LeaderEpoch:0 Metadata:0xc001149240 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:478 Offset:205507 LeaderEpoch:0 Metadata:0xc001149250 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:333 Offset:240256 LeaderEpoch:0 Metadata:0xc001149260 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:358 Offset:137683 LeaderEpoch:0 Metadata:0xc001149270 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:388 Offset:125415 LeaderEpoch:0 Metadata:0xc001149290 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:458 Offset:129753 LeaderEpoch:0 Metadata:0xc0011492a0 ErrorCode:0 UnknownTags:{keyvals:map[]}}] UnknownTags:{keyvals:map[]}}] ErrorCode:0 Groups:[{Group:raw-message-parser.v18 Topics:[{Topic:raw-messages-keyed Partitions:[{Partition:438 Offset:212596 LeaderEpoch:0 Metadata:0xc0011491c0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:378 Offset:144897 LeaderEpoch:0 Metadata:0xc0011491d0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:408 Offset:157778 LeaderEpoch:0 Metadata:0xc0011491e0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:346 Offset:221271 
LeaderEpoch:0 Metadata:0xc0011491f0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:418 Offset:139817 LeaderEpoch:0 Metadata:0xc001149200 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:428 Offset:149167 LeaderEpoch:0 Metadata:0xc001149220 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:202 Offset:131612 LeaderEpoch:0 Metadata:0xc001149230 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:368 Offset:128784 LeaderEpoch:0 Metadata:0xc001149240 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:478 Offset:205507 LeaderEpoch:0 Metadata:0xc001149250 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:333 Offset:240256 LeaderEpoch:0 Metadata:0xc001149260 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:358 Offset:137683 LeaderEpoch:0 Metadata:0xc001149270 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:388 Offset:125415 LeaderEpoch:0 Metadata:0xc001149290 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:458 Offset:129753 LeaderEpoch:0 Metadata:0xc0011492a0 ErrorCode:0 UnknownTags:{keyvals:map[]}}] UnknownTags:{keyvals:map[]}}] ErrorCode:0 UnknownTags:{keyvals:map[]}}] UnknownTags:{keyvals:map[]}}
1769721242602144512,2026-01-29T21:14:03.062Z,kafka_produce_consumer.commit: eosSession.End returned (false map[raw-messages-keyed:map[202:[131612 133735] 333:[240256 242467] 346:[221271 223427] 358:[137683 139858] 368:[128784 130857] 378:[144897 147076] 388:[125415 127520] 408:[157778 159937] 418:[139817 141985] 428:[149167 151088] 438:[212596 214767] 458:[129753 131879] 478:[205507 209804]]]
1769721242602148864,2026-01-29T21:14:03.062Z,kafka_produce_consumer.commitWithCallbacks: not committed map[raw-messages-keyed:map[202:[131612 133735] 333:[240256 242467] 346:[221271 223427] 358:[137683 139858] 368:[128784 130857] 378:[144897 147076] 388:[125415 127520] 408:[157778 159937] 418:[139817 141985] 428:[149167 151088] 438:[212596 214767] 458:[129753 131879] 478:[205507 209804]]]
1769721242602155776,2026-01-29T21:14:03.062Z,[INFO] producer state set to aborting; continuing to wait via flushing
1769721242602159104,2026-01-29T21:14:03.062Z,[INFO] flushing
1769721242602190336,2026-01-29T21:14:03.062Z,[DEBUG] flushed
1769721242602192384,2026-01-29T21:14:03.062Z,[DEBUG] aborted buffered records
1769721242602202368,2026-01-29T21:14:03.062Z,[INFO] transaction session ending; was_failed: false
1769721242602216704,2026-01-29T21:14:03.062Z,[INFO] transact session resetting to current committed state (potentially after a rejoin); tried_commit: false
1769721242609976320,2026-01-29T21:14:03.062Z,kafka_produce_consumer.transaction: beginning
1769721242609979904,2026-01-29T21:14:03.062Z,[INFO] beginning transact session
1769721242609984000,2026-01-29T21:14:03.062Z,[INFO] beginning transaction; transactional_id: raw-message-parser.v18.parser-backfill-d678c85cb-cvps6
1769721242609987584,2026-01-29T21:14:03.062Z,kafka_produce_consumer.transaction: fetching
1769721242610139392,2026-01-29T21:14:03.062Z,[DEBUG] wrote Fetch v16; broker: 20001
1769721242610317056,2026-01-29T21:14:03.062Z,[DEBUG] wrote Fetch v16; broker: 20002
1769721242610602496,2026-01-29T21:14:03.062Z,[DEBUG] wrote Fetch v16; broker: 20003
1769721242610785792,2026-01-29T21:14:03.062Z,[DEBUG] wrote Fetch v16; broker: 20000
1769721242610894592,2026-01-29T21:14:03.062Z,[DEBUG] updated uncommitted; group: raw-message-parser.v18
1769721242637189376,2026-01-29T21:14:03.062Z,[DEBUG] read Fetch v16; broker: 20002
1769721242640413696,2026-01-29T21:14:03.062Z,[DEBUG] wrote Fetch v16; broker: 20002
1769721242640427008,2026-01-29T21:14:03.062Z,[DEBUG] updated uncommitted; group: raw-message-parser.v18
1769721246767622912,2026-01-29T21:14:07.064Z,kafka_produce_consumer.transaction: fetched map[raw-messages-keyed:map[333:[240256 244663] 346:[221271 234576] 358:[137683 142417] 368:[132939 133183] 378:[144897 149258] 398:[144083 148694] 428:[149167 155356] 438:[212596 221172] 448:[153826 157960] 458:[129753 136638] 468:[200918 207537] 478:[205507 216703]]]

... (a couple more transaction resets and correct re-fetches)

// HERE - we try to commit partition 458 up to offset 136639, but it gets reset
1769721250920422400,2026-01-29T21:14:11.066Z,[DEBUG] issuing txn offset commit; uncommitted: &{0 raw-message-parser.v18.parser-backfill-d678c85cb-cvps6 raw-message-parser.v18 10715 1 30 8c4d0c1b12734beb1e941996a9ccb9e1 <nil> [{raw-messages-keyed [{468 207538 0 0xc000246f18 {map[]}} {438 221173 0 0xc000246f18 {map[]}} {458 136639 0 0xc000246f18 {map[]}} {378 149259 0 0xc000246f18 {map[]}} {478 216704 0 0xc000246f18 {map[]}} {398 148695 0 0xc000246f18 {map[]}} {346 234577 0 0xc000246f18 {map[]}} {358 142418 0 0xc000246f18 {map[]}} {448 157961 0 0xc000246f18 {map[]}} {428 155357 0 0xc000246f18 {map[]}} {368 133184 0 0xc000246f18 {map[]}} {333 244664 0 0xc000246f18 {map[]}}] {map[]}}] {map[]}}
1769721250920471040,2026-01-29T21:14:11.066Z,[DEBUG] wrote TxnOffsetCommit v3; broker: 1
1769721251030829056,2026-01-29T21:14:11.066Z,[DEBUG] heartbeating; group: raw-message-parser.v18
1769721251030938624,2026-01-29T21:14:11.066Z,[DEBUG] wrote Heartbeat v4; broker: 1
1769721251096269056,2026-01-29T21:14:12.066Z,[DEBUG] read TxnOffsetCommit v3; broker: 1
1769721251188854272,2026-01-29T21:14:12.066Z,[DEBUG] read Heartbeat v4; broker: 1
1769721251188875008,2026-01-29T21:14:12.066Z,[DEBUG] heartbeat complete; group: raw-message-parser.v18
1769721251188879616,2026-01-29T21:14:12.066Z,[INFO] heartbeat errored; group: raw-message-parser.v18
1769721251188885504,2026-01-29T21:14:12.066Z,[DEBUG] heartbeating; group: raw-message-parser.v18
1769721251188919808,2026-01-29T21:14:12.066Z,[INFO] consumer calling onRevoke at the end of a session; consumer did not change any client-side subscription; group: raw-message-parser.v18
1769721251188936448,2026-01-29T21:14:12.066Z,[DEBUG] entering OnPartitionsRevoked; with: map[]
1769721251188941056,2026-01-29T21:14:12.066Z,[DEBUG] wrote Heartbeat v4; broker: 1
1769721251188943360,2026-01-29T21:14:12.066Z,[INFO] transact session in on_revoke with nothing to revoke; allowing next commit
1769721251188964352,2026-01-29T21:14:12.066Z,onPartitionsRevoked: group: raw-message-parser.v18
1769721251188977664,2026-01-29T21:14:12.066Z,onPartitionsRevoked: group: raw-message-parser.v18
1769721251241340160,2026-01-29T21:14:12.066Z,[DEBUG] read Heartbeat v4; broker: 1
1769721251241368576,2026-01-29T21:14:12.066Z,[DEBUG] heartbeat complete; group: raw-message-parser.v18
1769721251241374976,2026-01-29T21:14:12.066Z,[INFO] heartbeat errored again while waiting for user revoke to finish; group: raw-message-parser.v18
1769721251241384960,2026-01-29T21:14:12.066Z,[DEBUG] blocking commits from join&sync
1769721251241385472,2026-01-29T21:14:12.066Z,[INFO] transaction session ending; was_failed: false
1769721251241390848,2026-01-29T21:14:12.066Z,[INFO] joining group; group: raw-message-parser.v18
1769721251241406720,2026-01-29T21:14:12.066Z,[INFO] ending transaction; transactional_id: raw-message-parser.v18.parser-backfill-d678c85cb-cvps6
1769721251241455616,2026-01-29T21:14:12.066Z,[DEBUG] wrote EndTxn v3; broker: 1
1769721251241483264,2026-01-29T21:14:12.066Z,[DEBUG] wrote JoinGroup v4; broker: 1
1769721251666777344,2026-01-29T21:14:12.066Z,[DEBUG] read EndTxn v3; broker: 1
1769721251666804736,2026-01-29T21:14:12.066Z,[INFO] end transaction response successfully received; transactional_id: raw-message-parser.v18.parser-backfill-d678c85cb-cvps6
1769721251666837248,2026-01-29T21:14:12.066Z,[INFO] transact session resetting to current committed state (potentially after a rejoin); tried_commit: false
1769721251667817216,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20000
1769721251667825152,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20001
1769721251667857664,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20003
1769721251667982848,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20002
1769721251669940480,2026-01-29T21:14:12.066Z,offset fetch request: {Version:0 Group:raw-message-parser.v18 Topics:[{Topic:raw-messages-keyed Partitions:[346 398 438 448 333 358 468 478 458 368 428 378] UnknownTags:{keyvals:map[]}}] Groups:[] RequireStable:true UnknownTags:{keyvals:map[]}}
1769721251671936768,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20001
1769721251709176576,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20003
1769721251721709824,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20000

// HERE - we see last committed offset for partition 458 is 129753
1769721251735393792,2026-01-29T21:14:12.066Z,offset fetch response: &{Version:9 ThrottleMillis:0 Topics:[{Topic:raw-messages-keyed Partitions:[{Partition:346 Offset:221271 LeaderEpoch:0 Metadata:0xc015f2d550 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:398 Offset:144083 LeaderEpoch:0 Metadata:0xc015f2d560 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:438 Offset:212596 LeaderEpoch:0 Metadata:0xc015f2d570 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:448 Offset:153826 LeaderEpoch:0 Metadata:0xc015f2d580 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:333 Offset:240256 LeaderEpoch:0 Metadata:0xc015f2d590 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:358 Offset:137683 LeaderEpoch:0 Metadata:0xc015f2d5a0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:468 Offset:200918 LeaderEpoch:0 Metadata:0xc015f2d5b0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:478 Offset:205507 LeaderEpoch:0 Metadata:0xc015f2d5c0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:458 Offset:129753 LeaderEpoch:0 Metadata:0xc015f2d5d0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:368 Offset:128784 LeaderEpoch:0 Metadata:0xc015f2d5e0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:428 Offset:149167 LeaderEpoch:0 Metadata:0xc015f2d5f0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:378 Offset:144897 LeaderEpoch:0 Metadata:0xc015f2d600 ErrorCode:0 UnknownTags:{keyvals:map[]}}] UnknownTags:{keyvals:map[]}}] ErrorCode:0 Groups:[{Group:raw-message-parser.v18 Topics:[{Topic:raw-messages-keyed Partitions:[{Partition:346 Offset:221271 LeaderEpoch:0 Metadata:0xc015f2d550 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:398 Offset:144083 LeaderEpoch:0 Metadata:0xc015f2d560 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:438 Offset:212596 LeaderEpoch:0 Metadata:0xc015f2d570 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:448 Offset:153826 LeaderEpoch:0 Metadata:0xc015f2d580 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:333 Offset:240256 
LeaderEpoch:0 Metadata:0xc015f2d590 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:358 Offset:137683 LeaderEpoch:0 Metadata:0xc015f2d5a0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:468 Offset:200918 LeaderEpoch:0 Metadata:0xc015f2d5b0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:478 Offset:205507 LeaderEpoch:0 Metadata:0xc015f2d5c0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:458 Offset:129753 LeaderEpoch:0 Metadata:0xc015f2d5d0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:368 Offset:128784 LeaderEpoch:0 Metadata:0xc015f2d5e0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:428 Offset:149167 LeaderEpoch:0 Metadata:0xc015f2d5f0 ErrorCode:0 UnknownTags:{keyvals:map[]}} {Partition:378 Offset:144897 LeaderEpoch:0 Metadata:0xc015f2d600 ErrorCode:0 UnknownTags:{keyvals:map[]}}] UnknownTags:{keyvals:map[]}}] ErrorCode:0 UnknownTags:{keyvals:map[]}}] UnknownTags:{keyvals:map[]}}
1769721251735411712,2026-01-29T21:14:12.066Z,kafka_produce_consumer.commit: eosSession.End returned (false map[raw-messages-keyed:map[333:[240256 244663] 346:[221271 234576] 358:[137683 142417] 368:[132939 133183] 378:[144897 149258] 398:[144083 148694] 428:[149167 155356] 438:[212596 221172] 448:[153826 157960] 458:[129753 136638] 468:[200918 207537] 478:[205507 216703]]]
1769721251735417088,2026-01-29T21:14:12.066Z,kafka_produce_consumer.commitWithCallbacks: not committed map[raw-messages-keyed:map[333:[240256 244663] 346:[221271 234576] 358:[137683 142417] 368:[132939 133183] 378:[144897 149258] 398:[144083 148694] 428:[149167 155356] 438:[212596 221172] 448:[153826 157960] 458:[129753 136638] 468:[200918 207537] 478:[205507 216703]]]
1769721251735424000,2026-01-29T21:14:12.066Z,[INFO] producer state set to aborting; continuing to wait via flushing
1769721251735429376,2026-01-29T21:14:12.066Z,[INFO] flushing
1769721251735467776,2026-01-29T21:14:12.066Z,[DEBUG] flushed

// HERE - expected records to be aborted
1769721251735470848,2026-01-29T21:14:12.066Z,[DEBUG] aborted buffered records
1769721251735482112,2026-01-29T21:14:12.066Z,[INFO] transaction session ending; was_failed: false
1769721251735499008,2026-01-29T21:14:12.066Z,[INFO] transact session resetting to current committed state (potentially after a rejoin); tried_commit: false
1769721251743251456,2026-01-29T21:14:12.066Z,kafka_produce_consumer.transaction: beginning
1769721251743257344,2026-01-29T21:14:12.066Z,[INFO] beginning transact session
1769721251743262464,2026-01-29T21:14:12.066Z,[INFO] beginning transaction; transactional_id: raw-message-parser.v18.parser-backfill-d678c85cb-cvps6

// HERE start of `PollRecords` call, and subsequent broker Fetch requests 
1769721251743266560,2026-01-29T21:14:12.066Z,kafka_produce_consumer.transaction: fetching
1769721251743422720,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20001
1769721251743469568,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20003
1769721251743771392,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20000
1769721251743838720,2026-01-29T21:14:12.066Z,[DEBUG] updated uncommitted; group: raw-message-parser.v18
1769721251751965440,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20002
1769721251755918592,2026-01-29T21:14:12.066Z,[DEBUG] updated uncommitted; group: raw-message-parser.v18
1769721251755948544,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20002
1769721251811352576,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20002
1769721251815184384,2026-01-29T21:14:12.066Z,[DEBUG] updated uncommitted; group: raw-message-parser.v18
1769721251815230464,2026-01-29T21:14:12.066Z,[DEBUG] wrote Fetch v16; broker: 20002
1769721251863289600,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20002
1769721251876406272,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20001

// HERE - partition 458 returns records starting at offset 136013 when the last committed offset was 129753
1769721251907437312,2026-01-29T21:14:12.066Z,kafka_produce_consumer.transaction: fetched map[raw-messages-keyed:map[333:[240256 245187] 346:[221271 223427] 358:[137683 139858] 388:[125415 127520] 398:[144083 144546] 428:[149167 153244] 438:[212596 218977] 448:[153826 160063] 458:[136013 137191]]]
1769721251910696704,2026-01-29T21:14:12.066Z,kafka_produce_consumer: processing messages map[raw-messages-keyed:map[333:[240256 245187] 346:[221271 223427] 358:[137683 139858] 388:[125415 127520] 398:[144083 144546] 428:[149167 153244] 438:[212596 218977] 448:[153826 160063] 458:[136013 137191]]]
1769721251931599872,2026-01-29T21:14:12.066Z,[DEBUG] read JoinGroup v4; broker: 1
1769721251931634176,2026-01-29T21:14:12.066Z,[INFO] joined; group: raw-message-parser.v18
1769721251931639040,2026-01-29T21:14:12.066Z,[INFO] syncing; group: raw-message-parser.v18
1769721251931713536,2026-01-29T21:14:12.066Z,[DEBUG] wrote SyncGroup v5; broker: 1
1769721251984238592,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20003
1769721252010328832,2026-01-29T21:14:12.066Z,[DEBUG] read Fetch v16; broker: 20000
1769721252122486016,2026-01-29T21:14:13.067Z,[DEBUG] sharded request; req: AddPartitionsToTxn
1769721252122594560,2026-01-29T21:14:13.067Z,[DEBUG] wrote AddPartitionsToTxn v3; broker: 1
1769721252122950912,2026-01-29T21:14:13.067Z,[DEBUG] sharded request; req: AddPartitionsToTxn
1769721252122994176,2026-01-29T21:14:13.067Z,[DEBUG] wrote AddPartitionsToTxn v3; broker: 1
1769721252124034560,2026-01-29T21:14:13.067Z,[DEBUG] sharded request; req: AddPartitionsToTxn
1769721252124044800,2026-01-29T21:14:13.067Z,[DEBUG] sharded request; req: AddPartitionsToTxn
1769721252124050432,2026-01-29T21:14:13.067Z,[DEBUG] wrote AddPartitionsToTxn v3; broker: 1
1769721252124075520,2026-01-29T21:14:13.067Z,[DEBUG] wrote AddPartitionsToTxn v3; broker: 1
1769721252205819904,2026-01-29T21:14:13.067Z,messageHandler.ProcessMessages failed: EOS processing gap detected for key (partition: 458, offset: 136027, committed_offset: 129108, next_known_offset: 130800)

Questions

  1. When a transactional group session ends and OnPartitionsRevoked is called, but no partitions are actually lost (as indicated by empty added: and lost: in the logs), are the fetch offsets supposed to be refetched from the broker?

  2. Is there something we should be doing differently in our transaction loop to handle this case? Should we retry only the Commit call instead of restarting the whole Begin/Poll/Produce/Commit loop? We tried BlockRebalanceOnPoll in the past, but it seemed to make group membership less stable. If that has changed, or if it is the recommended fix here, we are happy to try it again.

Related Issues

This seems related to:

Thank you for maintaining this excellent library!
