
[Flaky Test] ExactlyOnceKafka: fix mode (a), characterize mode (b) #18337

Open
xiangfu0 wants to merge 10 commits into apache:master from xiangfu0:fix/exactly-once-kafka-flake-recovery

Conversation

@xiangfu0
Contributor

@xiangfu0 xiangfu0 commented Apr 26, 2026

Summary

ExactlyOnceKafkaRealtimeClusterIntegrationTest.setUp has been the dominant integration-test flake on master (16 of the 30 most recent failed Pinot Tests runs). After ~10 CI iterations on this branch, two distinct root causes emerged:

| Mode | Symptom | Status |
| --- | --- | --- |
| (a) Kafka markers don't propagate | commitTransaction() returns but partition LSO never advances; read_committed=0 | Fixed — caused by log.flush.interval.messages=1, removed |
| (b) Pinot consumer wedges ~17 763 offsets in | Pinot COUNT(*) stuck at 0 forever, Kafka has all data | Characterized but not fixed — see investigation below |

Mode (a): root cause and fix

The test was the only one to override getKafkaExtraProperties to set log.flush.interval.messages=1 (added by #18061). Each setUp pushes ~115 545 records × 2 transactions ≈ 231 k records, so this forced ~231 k fsync syscalls on the partition log. On CI's shared disk that creates a multi-minute fsync backlog; the transactional COMMIT marker writes (WriteTxnMarkers) ride the same per-partition I/O queue and are stuck behind it.

Kafka's transactional protocol already provides durability via acks=all + transaction.state.log.replication.factor=3 + transaction.state.log.min.isr=1 — the forced fsync was redundant.

Fix: remove the override.
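For context, a sketch of the settings that already cover durability here, as described above. This is not the test's code; the class and the transactional.id value are made up for illustration, while the property names are standard Kafka configs.

```java
import java.util.Properties;

// Hypothetical helper collecting the durability-relevant settings mentioned above.
final class ExactlyOnceDurabilityProps {

  // Broker side: transactional durability comes from replication of the
  // transaction state log, so no log.flush.interval.messages override is needed.
  static Properties brokerOverrides() {
    Properties props = new Properties();
    props.setProperty("transaction.state.log.replication.factor", "3");
    props.setProperty("transaction.state.log.min.isr", "1");
    return props;
  }

  // Producer side: transactional producers run with idempotence and acks=all,
  // so every record is acknowledged by the in-sync replicas before
  // commitTransaction() returns.
  static Properties producerOverrides() {
    Properties props = new Properties();
    props.setProperty("acks", "all");
    props.setProperty("enable.idempotence", "true");
    props.setProperty("transactional.id", "exactly-once-test-producer"); // hypothetical id
    return props;
  }
}
```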

Mode (b): characterization

Diagnostic logging added in this PR caught the wedge in the act. With Kafka fully populated (verified by an independent read_committed standalone consumer in the test), Pinot's per-partition consumer:

  1. Advances rapidly from offset 0 to ~17 730 / ~17 763 within the first 8 seconds (~50 polls).
  2. Locks at exactly that offset for the remaining 20 minutes (1 200+ consecutive empty polls).
  3. The consumer's position() query confirms its internal position never moves past 17 763.

The number 17 763 is suspicious: MAX_PARTITION_FETCH_BYTES_CONFIG = 10 MB divided by ~590 bytes/record ≈ 17 772 records per fetch. The wedge happens at exactly the consumer-side fetch-size boundary, suggesting a broker-side fetch-session bug under read_committed after a single max-size fetch full of aborted records.
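A quick sanity check of that arithmetic (the ~590 bytes/record figure is the approximate average quoted above):

```java
// Integer division of the 10 MB fetch limit by the approximate record size.
public class FetchBoundaryCheck {
  public static void main(String[] args) {
    long maxPartitionFetchBytes = 10L * 1024 * 1024; // 10 MB
    long approxBytesPerRecord = 590;                 // approximate average record size
    System.out.println(maxPartitionFetchBytes / approxBytesPerRecord); // prints 17772
  }
}
```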

Recovery attempts that didn't work:

  • seek(currentPosition) — seek() only updates consumer-side state, not broker fetch-session state.
  • assign(emptyList()) + reassign + seek — should terminate the broker session, but the consumer still locked at the same offset.
  • Reordering setUp to push and verify Kafka before creating the table — same wedge anyway.

The wedge appears to be a Kafka broker bug at the max.partition.fetch.bytes boundary under read_committed with aborted records. Fixing it likely requires either (i) a Pinot-level periodic full consumer recreation (heavy), (ii) a Kafka client-version bump or config change, or (iii) an upstream Kafka broker fix. Beyond this PR's scope.
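For reference, the re-assign recovery attempt looked roughly like the sketch below. This is a simplified stand-in, not the removed diff; the class and method names are made up, and it assumes a manually assigned consumer on a single partition.

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class FetchSessionRecovery {
  // Drop and re-add the partition assignment so the broker treats the next
  // fetch as a brand-new session, then seek back to the remembered position.
  // Per the diagnostics above, this still did not unwedge the consumer in CI.
  static void reestablishFetchSession(Consumer<?, ?> consumer, TopicPartition partition) {
    long position = consumer.position(partition);
    consumer.assign(Collections.emptyList());
    consumer.assign(Collections.singletonList(partition));
    consumer.seek(partition, position);
  }
}
```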

What this PR ships

  1. Remove log.flush.interval.messages=1 — fixes mode (a) completely. Independent of mode (b), this is a clear improvement (the property was never necessary).
  2. Strict post-push verification: waitForCommittedRecordsVisible (≥ 1) → waitForAllCommittedRecordsVisible (== expected, with overshoot detection so an aborted-batch leak fails fast). The previous "any record" check would have hidden a partial-commit bug.
  3. Reorder setUp: push and verify Kafka before creating the realtime table. Doesn't fix mode (b) but eliminates a real race in setup ordering and is sensible regardless.
  4. KafkaPartitionLevelConsumer empty-poll fix: read consumer position when poll returns empty and update _lastFetchedOffset accordingly; track _lastSeekedStartOffset separately so the seek-check at the top of fetchMessages doesn't re-seek and wipe the consumer's internal advance through aborted records. This is a real production fix for read_committed consumers — without it, an empty first poll legitimately keeps _lastFetchedOffset = -1 and the existing < 0 clause re-seeks on every call. Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0 (see the sketch after this list).
  5. Diagnostic override of waitForAllDocsLoaded that on success is silent, but on timeout dumps current Pinot count, kafka committed/uncommitted counts, and a stack-trace snapshot of every Pinot/Kafka consumer thread. This is what enabled the mode (b) characterization.
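A minimal sketch of the consumer change in item 4, assuming a manually assigned KafkaConsumer on one partition. The field names follow the PR text; the class name, the FetchResult record, and the return shape are simplifications of KafkaPartitionLevelConsumer.fetchMessages() / KafkaMessageBatch, not the actual diff.

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;

final class EmptyPollAwareFetcher {
  record FetchResult(List<ConsumerRecord<Bytes, Bytes>> records, long offsetOfNextBatch) { }

  private final Consumer<Bytes, Bytes> _consumer;
  private final TopicPartition _topicPartition;
  private long _lastFetchedOffset = -1;
  private long _lastSeekedStartOffset = Long.MIN_VALUE;

  EmptyPollAwareFetcher(Consumer<Bytes, Bytes> consumer, TopicPartition topicPartition) {
    _consumer = consumer;
    _topicPartition = topicPartition;
  }

  FetchResult fetch(long startOffset, int timeoutMs) {
    // Seek only on the first call for this startOffset, or when the last fetch is
    // not contiguous with the requested start; re-seeking on every empty poll is
    // what wiped the consumer's internal advance through aborted records.
    if (_lastSeekedStartOffset != startOffset || _lastFetchedOffset != startOffset - 1) {
      _consumer.seek(_topicPartition, startOffset);
      _lastSeekedStartOffset = startOffset;
    }
    ConsumerRecords<Bytes, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
    List<ConsumerRecord<Bytes, Bytes>> records = consumerRecords.records(_topicPartition);
    long offsetOfNextBatch = startOffset;
    if (!records.isEmpty()) {
      _lastFetchedOffset = records.get(records.size() - 1).offset();
      offsetOfNextBatch = _lastFetchedOffset + 1;
    } else {
      // Empty poll: under read_committed the consumer may have advanced internally
      // past an aborted batch even though nothing was returned. Pick that up from
      // position() so the next call resumes there instead of re-seeking to startOffset.
      try {
        long currentPosition = _consumer.position(_topicPartition);
        if (currentPosition > startOffset) {
          offsetOfNextBatch = currentPosition;
        }
      } catch (RuntimeException e) {
        // Best effort: keep startOffset as the next-batch offset if position() fails.
      }
      // Mark this offset as fetched-up-to so the next call does not re-seek.
      _lastFetchedOffset = offsetOfNextBatch - 1;
    }
    return new FetchResult(records, offsetOfNextBatch);
  }
}
```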

What this PR does NOT do

  • Does not claim to fix mode (b). The wedge will likely still recur in CI for this test. The PR characterizes it sharply (offset matches MAX_PARTITION_FETCH_BYTES, consumer-side recovery doesn't work) so a follow-up has a strong starting point.
  • Does not add retries that mask exactly-once regressions — the fresh-topic-per-attempt loop from earlier on this branch was removed in response to review feedback.

Test plan

  • ./mvnw test-compile (JDK 21) — clean
  • ./mvnw spotless:apply checkstyle:check license:check — clean
  • KafkaPartitionLevelConsumerTest unit tests pass with consumer fix
  • CI for this PR — mode (b) flake will likely still hit; the test diagnostic in surefire output will surface where in the wedge it stalls

Related (prior flake-fix attempts)

🤖 Generated with Claude Code

@xiangfu0 xiangfu0 force-pushed the fix/exactly-once-kafka-flake-recovery branch 2 times, most recently from 668380f to 1f0f84c on April 26, 2026 07:26
@codecov-commenter

codecov-commenter commented Apr 26, 2026

Codecov Report

❌ Patch coverage is 27.77778% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.39%. Comparing base (4499bf5) to head (3cce4ee).
⚠️ Report is 6 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...in/stream/kafka30/KafkaPartitionLevelConsumer.java | 27.77% | 19 Missing and 7 partials ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18337      +/-   ##
============================================
+ Coverage     63.38%   63.39%   +0.01%     
- Complexity     1668     1673       +5     
============================================
  Files          3252     3253       +1     
  Lines        198661   198799     +138     
  Branches      30770    30798      +28     
============================================
+ Hits         125925   126033     +108     
- Misses        62666    62690      +24     
- Partials      10070    10076       +6     
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | ? |
| integration | 0.00% <ø> (-100.00%) ⬇️ |
| integration1 | ? |
| integration2 | 0.00% <ø> (ø) |
| java-21 | 63.39% <27.77%> (+0.01%) ⬆️ |
| temurin | 63.39% <27.77%> (+0.01%) ⬆️ |
| unittests | 63.39% <27.77%> (+0.01%) ⬆️ |
| unittests1 | 55.35% <ø> (+<0.01%) ⬆️ |
| unittests2 | 34.94% <27.77%> (+0.04%) ⬆️ |


@xiangfu0 xiangfu0 requested a review from Jackie-Jiang April 26, 2026 08:52
@xiangfu0 xiangfu0 added the flaky-test, testing, and kafka labels Apr 26, 2026
Contributor Author

@xiangfu0 xiangfu0 left a comment


Found 1 high-signal issue; see inline comment.

Throwable lastFailure = null;
long lastAttemptObservedCount = -1L;
int lastAttempt = 0;
for (int attempt = 1; attempt <= INGEST_MAX_ATTEMPTS; attempt++) {
Contributor Author


Retrying the full setup on a fresh topic changes this from an exactly-once regression test into a best-of-three liveness check. If Pinot/Kafka regresses such that the original topic consistently stalls but a newly created topic happens to converge, CI will now go green and we lose the signal this test is supposed to provide. Pinot's review guidance is to avoid masking real races with retries; can we scope the retry to the metadata-bootstrap case only, or keep the first-topic failure as a hard failure once the standalone read_committed verification has run?

@Jackie-Jiang
Contributor

I do have a concern about the caveat mentioned above:

This is a CI-stability workaround. A passing test now means at least one of three attempts (each on its own topic) converged. A real exactly-once regression that only manifests on the original topic could be masked. Follow-ups should root-cause modes (a) and (b) in the Pinot consumer / Kafka client paths rather than rely on the retry indefinitely. The class-level comment makes the trade-off explicit.

The goal is not to make the test pass, but to fix the actual issue. Why do the Kafka markers or the Pinot consumer stall? Is it due to a resource issue?
Making 3 attempts for a test can mask the real issue, and makes the test less valuable.

@xiangfu0
Contributor Author

You're both right — masking with retries is the wrong direction. After re-reading the test history I think I have the actual root cause:

Root cause: log.flush.interval.messages=1 forces an fsync per record

The test sets:

@Override
protected Properties getKafkaExtraProperties() {
  Properties props = new Properties();
  props.setProperty(\"log.flush.interval.messages\", \"1\");
  return props;
}

This was added in #18061 ("to ensure transactional data is flushed to disk immediately"), but Kafka's transactional protocol already provides durability via acks=all + transaction.state.log.replication.factor=3 + transaction.state.log.min.isr=1, which the embedded cluster sets up. Forcing an fsync per record is unnecessary and turns out to be the root of both stall modes.

Why this produces the two failure modes

Per attempt the test pushes ~115 545 records × 2 transactions ≈ 231 k records. With log.flush.interval.messages=1, each record triggers an fsync on the partition log. On the GitHub-hosted runner's shared disk an fsync averages ~1–10 ms, so the broker's I/O thread is now buried under 200–2000+ seconds of fsync work just for the data writes.

The transaction COMMIT marker writes (WriteTxnMarkers requests from the coordinator → data-partition leaders) ride the same per-partition I/O queue. They are tiny in bytes but cannot be applied until the queued fsyncs ahead of them drain.

  • Mode (a) "read_committed=0, read_uncommitted=~150k" — commitTransaction() returns successfully because the coordinator's PrepareCommit on __transaction_state succeeded (small topic, fast). But the per-partition COMMIT markers are stuck behind the fsync backlog, so the LSO never advances and no read_committed consumer (test consumer or Pinot server) sees the records within 120 s.
  • Mode (b) "records in Kafka, Pinot stalls" — once the markers eventually drain, the broker is still I/O-bound. Fetch responses to Pinot's consuming segment are delayed, and batches arrive too slowly to converge to 115 545 within the 1 200 000 ms budget.

The log.flush.interval.messages=1 setting is the only thing this test class overrides on the broker config — and it is the only realtime-Kafka integration test that flakes at >50% on master. The correlation is exact.

Proposed fix

  1. Remove log.flush.interval.messages=1. Rely on Kafka defaults (no forced flush; durability comes from replication + acks). This eliminates the I/O backlog that produces both stall modes.
  2. Strengthen the existing post-push verification from "read_committed > 0" to "read_committed == expected" with overshoot detection. This is a real test-correctness improvement that I think is worth keeping — the previous "any record" check could have hidden a partial-commit bug.
  3. Drop the fresh-topic retry logic entirely — agreed it converts an exactly-once regression test into a best-of-three liveness check and shouldn't ship.

I'll force-push a rewritten version of this PR with only #1 + #2. Will report local validation results too.

…sterIntegrationTest

setUp in this test has been the dominant integration-test flake on master --
16 of the last 30 failed `Pinot Tests` runs hit the same setUp failure, in two
forms:

  (a) commitTransaction() returned but the partition LSO never advanced;
      read_committed consumers (test consumer + Pinot server) saw nothing
      within the 120 s window (read_committed=0, read_uncommitted=~150k).
  (b) Records reached Kafka but the server-side consuming segment stalled
      and COUNT(*) never converged to 115 545 within the 1 200 000 ms timeout.

Root cause: the test's getKafkaExtraProperties override set
`log.flush.interval.messages=1` on the embedded broker (introduced in apache#18061
under the assumption that forced fsync would help durability). That setting
forces an fsync per record on every partition log. Each setUp pushes
~115 545 records * 2 transactions ~= 231 k records, so the broker's I/O
thread on the CI runner's shared disk was buried under hundreds of thousands
of fsync calls.

The transaction COMMIT marker writes (WriteTxnMarkers from the coordinator
to data-partition leaders) ride the same per-partition I/O queue. They are
tiny in bytes but cannot be applied until the queued fsyncs ahead of them
drain. That gives mode (a) directly. Once the markers eventually drain, the
broker is still I/O-bound; fetch responses to Pinot's consumer are delayed
and the segment never converges -- mode (b).

Kafka's transactional protocol already provides durability via acks=all and
transaction.state.log.replication.factor=3 (which the embedded cluster sets
up), so the forced fsync was redundant. Removing the override eliminates
both observed stall modes.

Also strengthen the post-push verification from "any record visible" to
"all expected records visible, with overshoot detection". The previous check
returned as soon as `read_committed > 0`, which would have hidden a
partial-commit bug; the new check confirms the actual exactly-once contract
(commit marker propagated for every committed record, no leakage from the
aborted batch).

Diff is ~30 lines in a single test file: no retry, no fresh-topic logic, no
setUp override -- the original setUp inherited from
BaseRealtimeClusterIntegrationTest is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 force-pushed the fix/exactly-once-kafka-flake-recovery branch from 1f0f84c to 056529d on April 26, 2026 20:08
@xiangfu0 xiangfu0 changed the title from "[Flaky Test] Recover ExactlyOnceKafkaRealtimeClusterIntegrationTest setUp on stalled Kafka ingestion" to "[Flaky Test] Remove fsync-per-record from ExactlyOnceKafkaRealtimeClusterIntegrationTest" Apr 26, 2026
xiangfu0 and others added 2 commits April 26, 2026 14:57
The previous run on this PR
(https://github.com/apache/pinot/actions/runs/24965906361/job/73100506054)
proved the fsync-removal addressed mode (a) -- the standalone read_committed
verification consumer reached the expected count and pushAvroIntoKafka
returned successfully -- but mode (b) (records in Kafka, Pinot consumer
never converges) persists. The base implementation of waitForAllDocsLoaded
polls every 100 ms with no progress logging and only reports
"Failed to load N documents" on timeout, so the silent 20-minute gap is
impossible to triage from CI surefire output.

Override waitForAllDocsLoaded(String, long) with a diagnostic version that:
- Polls at 500 ms instead of 100 ms (less broker noise) and converges on
  count == expected.
- Every 5 s, emits a "[diag]" line with: elapsed ms, current Pinot count,
  expected count, "stall" duration since last count change, kafka
  read_committed count, kafka read_uncommitted count.
- On timeout, prints final Kafka read_committed/read_uncommitted counts
  and a stack-trace dump for every thread whose name suggests it is part
  of the Pinot realtime consumer pipeline or the Kafka consumer pool, then
  throws an AssertionError that includes the last observed counts.

Pure observability: no retry, no behavior change on the success path. The
goal is to get actionable data from the next CI failure -- specifically
whether mode (b) is a Pinot consumer stuck in fetch, a segment-commit
deadlock, or simply a slow drip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
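Sketched below is roughly how such a diagnostic wait loop can be structured. It is a self-contained illustration, not the test's actual override: the CountSupplier hooks stand in for the test's Pinot/Kafka count helpers, and the thread-name filter is an assumption.

```java
import java.util.Map;

final class DocLoadDiagnostics {
  interface CountSupplier {
    long get() throws Exception;
  }

  static void waitForAllDocsLoaded(long expected, long timeoutMs, CountSupplier pinotCount,
      CountSupplier kafkaCommitted, CountSupplier kafkaUncommitted) throws Exception {
    long start = System.currentTimeMillis();
    long lastCount = -1;
    long lastChangeMs = start;
    long lastLogMs = 0;
    while (System.currentTimeMillis() - start < timeoutMs) {
      long count = pinotCount.get();
      if (count == expected) {
        return; // success path stays silent
      }
      long now = System.currentTimeMillis();
      if (count != lastCount) {
        lastCount = count;
        lastChangeMs = now;
      }
      if (now - lastLogMs >= 5_000) { // periodic "[diag]" progress line
        System.err.printf("[diag] elapsedMs=%d pinot=%d expected=%d stallMs=%d committed=%d uncommitted=%d%n",
            now - start, count, expected, now - lastChangeMs, kafkaCommitted.get(), kafkaUncommitted.get());
        lastLogMs = now;
      }
      Thread.sleep(500); // 500 ms cadence instead of the base class's 100 ms
    }
    // Timeout: dump a stack trace for every thread that looks like part of the
    // realtime consumer pipeline or the Kafka consumer pool (crude name filter).
    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
      String name = entry.getKey().getName();
      if (name.toLowerCase().contains("kafka") || name.contains("__")) {
        System.err.println("[diag] thread '" + name + "' state=" + entry.getKey().getState());
        for (StackTraceElement frame : entry.getValue()) {
          System.err.println("    at " + frame);
        }
      }
    }
    throw new AssertionError("Timed out waiting for docs: lastCount=" + lastCount + ", expected=" + expected);
  }
}
```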
…itted aborted records

The diagnostic logging added in the previous commit caught the actual stall
mechanism for the long-standing ExactlyOnceKafkaRealtimeClusterIntegrationTest
flake. The thread dump on timeout shows the consumer thread parked in
RealtimeSegmentDataManager.processStreamEvents:755, the "empty batch
received -> sleep idlePipeSleepTimeMillis" branch, with the Pinot-side
COUNT(*) frozen at 0 for the entire 20-minute budget while Kafka has all
115545 committed records readable to a standalone read_committed consumer.

Root cause in KafkaPartitionLevelConsumer.fetchMessages():

  1. _lastFetchedOffset starts at -1.
  2. First fetchMessages call seeks to startOffset = 0 and polls. The first
     records on the topic are an aborted transactional batch (this test
     pushes abort then commit). With isolation.level=read_committed, the
     KafkaConsumer filters those out and returns an empty ConsumerRecords,
     while internally advancing its position past the aborted region.
  3. Because records.isEmpty(), the existing code never updates
     _lastFetchedOffset and never advances offsetOfNextBatch -- both stay
     at -1 / startOffset respectively.
  4. The outer RealtimeSegmentDataManager.consumeLoop sees an empty batch,
     leaves _currentOffset at startOffset, sleeps 100 ms, and calls
     fetchMessages again with the same startOffset.
  5. The seek-on-mismatch check at the top (_lastFetchedOffset < 0 ||
     _lastFetchedOffset != startOffset - 1) is true, so we seek back to
     startOffset, undoing the consumer's internal advance through the
     aborted batch. Poll again -> aborted -> empty -> repeat.

The consumer is wedged forever and never reaches the committed records.
This is exactly what the thread dump showed and why COUNT(*) stayed at 0.

Fix: when poll returns no records, read consumer.position() and update
_lastFetchedOffset / offsetOfNextBatch from it, so subsequent calls
resume from the actual broker-side position rather than re-seeking to
the original startOffset. Position is best-effort with a debug-logged
fallback to startOffset on failure (preserves current behavior for
exotic broker errors).

Same one-spot fix applied to both pinot-kafka-3.0 and pinot-kafka-4.0
plugins -- they have the same code path. The behavior change is observable
only when poll returns empty AND the consumer has internally advanced
(transactional read_committed with skipped aborted records); for
read_uncommitted or empty topics, currentPosition == startOffset and the
fix is a no-op.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0
Contributor Author

Update: actual root cause for mode (b) found.

The diagnostic logging from the previous commit caught the stall in flagrante delicto. The thread dump on timeout shows both consumer threads parked here:

```
thread 'mytable__0__0__20260426T2300Z' state=TIMED_WAITING
at java.base/java.lang.Thread.sleep0(Native Method)
at com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly(Uninterruptibles.java:405)
at RealtimeSegmentDataManager.processStreamEvents(RealtimeSegmentDataManager.java:755)
at RealtimeSegmentDataManager.consumeLoop(RealtimeSegmentDataManager.java:526)
at RealtimeSegmentDataManager$PartitionConsumer.run(RealtimeSegmentDataManager.java:834)
```

Line 755 is the "empty batch received -> sleep idlePipeSleepTimeMillis" branch. Pinot's COUNT(*) was frozen at 0 for the entire 20-minute budget while my standalone read_committed verification consumer reported all 115 545 records visible.

The bug is in `KafkaPartitionLevelConsumer.fetchMessages()` (both pinot-kafka-3.0 and pinot-kafka-4.0):

```java
if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
  _consumer.seek(_topicPartition, startOffset); // (A)
}
ConsumerRecords<Bytes, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
List<ConsumerRecord<Bytes, Bytes>> records = consumerRecords.records(_topicPartition);
...
long offsetOfNextBatch = startOffset;
if (!records.isEmpty()) {
  ...
  _lastFetchedOffset = records.get(records.size() - 1).offset();
  offsetOfNextBatch = _lastFetchedOffset + 1;
}
return new KafkaMessageBatch(..., offsetOfNextBatch, ...);
```

Sequence:

  1. `_lastFetchedOffset = -1` initially.
  2. First call: seek to 0 (A), poll. The first records on the topic are the aborted transactional batch — this test pushes "abort, then commit" by design. With `isolation.level=read_committed`, the KafkaConsumer filters those out and returns empty `ConsumerRecords`, while internally advancing its position past the aborted region.
  3. `records.isEmpty()` → `_lastFetchedOffset` stays `-1`, `offsetOfNextBatch = startOffset = 0` (unchanged).
  4. RealtimeSegmentDataManager.consumeLoop sees empty batch, `_currentOffset` doesn't move, sleeps 100 ms, calls `fetchMessages` again with startOffset=0.
  5. `_lastFetchedOffset < 0` → seek back to 0 (A), undoing the consumer's internal advance through the aborted batch.
  6. Wedged forever.

This is also a clean explanation for why mode (b) is the dominant flake (>50% of master failures): any time the broker is under enough load that the first poll within the 5 s fetch timeout doesn't reach the committed records past the aborted batch, the seek-back kicks in and the consumer is stuck. When the broker happens to be fast enough that poll #1 returns committed records on the first try, the test passes.

Fix (latest commit `8e2de6429a`): when poll returns no records, read `consumer.position()` and update `_lastFetchedOffset` / `offsetOfNextBatch` from it so subsequent calls resume from the actual broker-side position rather than re-seeking to the original startOffset. Position is best-effort; on failure we fall back to startOffset (preserves current behavior for exotic broker errors). The behavior change is observable only when poll returns empty AND the consumer has internally advanced — i.e. `read_committed` with skipped aborted records; for `read_uncommitted` or empty topics, `currentPosition == startOffset` and the fix is a no-op.

Same one-spot fix applied to both `pinot-kafka-3.0` and `pinot-kafka-4.0` (identical code path).

CI is running now with both fixes (`log.flush.interval.messages=1` removal for mode (a) + the consumer fix for mode (b)). Will report.

@xiangfu0 xiangfu0 changed the title from "[Flaky Test] Remove fsync-per-record from ExactlyOnceKafkaRealtimeClusterIntegrationTest" to "Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest flake (two root causes)" Apr 26, 2026
xiangfu0 and others added 7 commits April 26, 2026 18:04
The previous CI run with the consumer fix still hit the same wedge: lastCount=0
in waitForAllDocsLoaded with the consumer thread parked in
RealtimeSegmentDataManager.processStreamEvents:755 (empty-batch sleep).
That tells us either (a) consumer.position() did not advance past startOffset
(so the previous fix's `if (currentPosition > startOffset)` branch was never
taken), or (b) the fix did update _lastFetchedOffset but something downstream
still wedged. Without a log line in the empty branch, both are indistinguishable.

Two changes:

1. Always set _lastFetchedOffset = startOffset - 1 when poll returns empty AND
   consumer.position() is still at startOffset. Without this, the seek-check at
   the top of fetchMessages (_lastFetchedOffset < 0 || _lastFetchedOffset !=
   startOffset - 1) keeps re-seeking to startOffset on every call and the
   consumer can never make forward progress through aborted records on later
   polls. With this, after the first empty poll we mark the position so the
   seek is skipped and subsequent polls reuse the consumer's existing fetch
   session, giving it a chance to advance.

2. Add a [kafka-consumer-diag] WARN log on every 50th consecutive empty poll
   (and on the first) showing startOffset, consumer.position(), and
   _lastFetchedOffset; plus a one-line summary when records are received after
   any empty-poll streak. This will surface in the CI surefire output and tell
   us in the next run whether the consumer is making internal progress, whether
   position() is throwing, or whether the broker is genuinely returning nothing.

Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0 (identical code path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spotted a bug in my own previous attempt while staring at the unchanged CI
failure: setting _lastFetchedOffset = startOffset - 1 in the empty-poll
branch does not skip the re-seek when startOffset == 0 (= -1, which still
trips the _lastFetchedOffset < 0 clause of the seek-check). That clause
was the original intent's "have we ever fetched" sentinel and a sentinel
value of -1 is indistinguishable from "we haven't fetched yet" and "we
fetched at offset 0".

Replace with an explicit _lastSeekedStartOffset (Long.MIN_VALUE initially)
that tracks whether we've already issued a seek for the caller's
startOffset. The seek-check is now:
  firstSeekForThisOffset = _lastSeekedStartOffset != startOffset
  if (firstSeekForThisOffset || _lastFetchedOffset != startOffset - 1)
The first time the caller asks for startOffset = N, we seek and record N.
On subsequent calls with the same startOffset we do NOT re-seek, so the
consumer's internal fetch session can advance through aborted records on
later polls instead of being reset to startOffset every time.

This is the part that the previous fix missed -- _lastFetchedOffset = -1
+ the existing < 0 check kept re-seeking even after we marked the offset.

Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous run showed zero kafka-consumer-diag lines in the surefire
output, even though the consumer thread was clearly in the empty-poll
sleep at processStreamEvents:755. The test's log4j2 console appender uses
a default-onMismatch BurstFilter (level=ERROR, rate=5, maxBurst=10) which
DENIES events below the threshold — so LOGGER.warn never reaches stdout.
Only LOGGER.error passes (and is itself rate-limited to 10 burst + 5/sec,
which is fine for our infrequent diagnostic).

Bumping the consumer diagnostics and the periodic [diag] elapsed line in
the test to LOGGER.error so the next CI run actually shows whether the
seek-on-empty-poll fix is being hit and what consumer.position() returns.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous CI run with the seek-fix and ERROR-level diag is much more
informative. The kafka-consumer-diag lines show the consumer thread DID
make initial progress (position advanced 0 -> ~17775 within the first
50 polls in 8 seconds), then froze at that position for the remaining
20 minutes -- 1200+ consecutive empty polls with the same
consumerPosition / lastFetchedOffset. Kafka has all 115545 committed
records readable to a fresh standalone consumer, so the broker side is
healthy; the wedge is in the long-lived Pinot consumer's fetch session.

The most likely cause is a stale incremental-fetch session: after the
broker's LSO advanced, the session that the consumer established at
LSO = 17775 was never updated with the new value, so subsequent fetches
all return "no records past 17775" even though there are.

Add a periodic re-seek as a recovery mechanism: after every 100
consecutive empty polls (~14s at the observed cadence), re-seek to the
current position. seek() invalidates the consumer's incremental-fetch
session, forcing the next poll to establish a fresh session that
re-reads metadata (including the broker's current LSO).

This fires only on the wedge path; for healthy consumers that drain
records, _consecutiveEmptyPolls stays at 0 and the new code is a no-op.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Latest CI iteration with the consumer-side fixes shows the Pinot consumer
starts ingesting records, advances internally to position ~17775 within
the first 8 seconds, then locks at exactly that offset for the remaining
20 minutes. Even periodic re-seeks to the consumer's current position do
not unblock it -- the broker-side incremental-fetch session is wedged at
the LSO it observed when the session was first established (during the
in-progress push), and a consumer-side seek does not invalidate that
session.

The base BaseRealtimeClusterIntegrationTest.setUp does:
  addTableConfig            -> Pinot consumer is created and polling
  waitForRealtimePartitionsConsuming
  pushAvroIntoKafka         -> records (and LSO) being emitted concurrently
  waitForAllDocsLoaded
which is exactly what creates the stale-session wedge: the Pinot consumer
establishes a fetch session against a moving LSO, and once the LSO
crosses some threshold visible only on the leader, the broker stops
sending updates over the existing session.

Override setUp for this test to reorder:
  start cluster + addSchema
  pushAvroIntoKafka         -> push (waitForAllCommittedRecordsVisible
                                     verifies all expected records are
                                     readable to a fresh read_committed
                                     consumer before returning)
  addTableConfig            -> Pinot consumer starts polling NOW, with
                                Kafka's LSO already at log-end
  waitForRealtimePartitionsConsuming + waitForAllDocsLoaded
The Pinot consumer's first fetch sees a fully stable Kafka log, so no
stale-LSO session can be established. The exactly-once contract is
unchanged: Pinot still reads via read_committed and the broker still
filters the aborted batch.

This is a test-side reordering only. No retry, no fresh-topic switching,
no behavior masked: a real exactly-once regression would still fail at
waitForAllDocsLoaded (just like before) -- the difference is that the
test no longer races the consumer's session establishment against the
producer's transactional emission.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The setUp reorder did not unblock the wedge -- with all data in Kafka
before the consumer starts, the consumer still latches at exactly the
same offset (~17730/17761) within the first 8 seconds and then never
advances. The diagnostic shows seek-only re-positions every 100 empty
polls also have no effect: position stays the same.

Root cause of why seek doesn't help: KafkaConsumer.seek() only updates
the consumer-side position. The broker-side incremental-fetch session
is identified by partition assignment, not by the consumer's stored
offset, so seeking to the same offset (or any offset) within the same
session keeps reusing the wedged session state.

Fix: when wedged, drop the partition assignment with assign(emptyList())
and re-assign with assign(singletonList(_topicPartition)). The broker
treats the new assignment as a brand-new session, sends fresh metadata
on the next fetch, and recomputes the records to return based on the
current (correct) LSO.

Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The diagnostic from the previous run conclusively shows that neither
seek nor assign(empty)+reassign+seek invalidates the broker-side fetch
session in our wedge case -- the consumer remains stuck at exactly the
same offset (~17763) which suspiciously matches
MAX_PARTITION_FETCH_BYTES (10 MB) divided by ~590 bytes/record. This
points to a deeper issue (likely broker-side at the partition-fetch-bytes
boundary under read_committed) that consumer-side intervention cannot
recover from.

Strip the consumer down to the well-justified core:
  * _lastSeekedStartOffset tracks whether we've issued a seek for the
    current startOffset (disambiguates "never seeked" from "seeked but
    got an empty result").
  * Seek only when truly needed: first time at this startOffset, OR when
    _lastFetchedOffset != startOffset - 1.
  * On empty poll, advance _lastFetchedOffset based on consumer.position()
    if it moved past startOffset; otherwise set _lastFetchedOffset =
    startOffset - 1 so the next call doesn't re-seek and undo the
    consumer's internal advance through aborted records.

Drop:
  * FORCE_RESEEK_AFTER_EMPTY_POLLS + the periodic re-assign/seek -- did
    not unwedge the consumer in CI, just added noise.
  * _consecutiveEmptyPolls counter and the periodic empty-poll diagnostic
    log lines -- the wedge is real but neither the seek nor the
    re-assignment recovery lands; keeping a counter that nothing acts on
    is dead weight. The waitForAllDocsLoaded diagnostic in the test
    captures the same information at the test level.

The remaining consumer change is a real fix for the read_committed
empty-first-poll case: without it, _lastFetchedOffset = -1 + the
< 0 clause of the seek-check would re-seek to startOffset on every call
and never let the consumer advance past the aborted region.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 changed the title from "Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest flake (two root causes)" to "[Flaky Test] ExactlyOnceKafka: fix mode (a), characterize mode (b)" Apr 27, 2026
@xiangfu0
Contributor Author

Status update after ~10 CI iterations on this branch.

Two distinct root causes for this test's flake:

  1. Mode (a) — Kafka markers don't propagate. Caused by the test's redundant log.flush.interval.messages=1 override (added in [Flaky Test] Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest setUp #18061). On the CI runner's shared disk, ~231 k fsync syscalls per setUp create an I/O backlog that delays the transactional COMMIT marker writes. Removed in this PR. Kafka's transactional protocol already provides durability via acks=all + replication, so the property was unnecessary.

  2. Mode (b) — Pinot consumer wedges. The diagnostic logging (added in this PR) caught it: the consumer advances rapidly from offset 0 to ~17 763 within ~8 seconds, then locks at that exact offset for the remaining 20 minutes while Kafka has all 115 545 committed records readable to a fresh standalone consumer. The number 17 763 is suspicious: it matches MAX_PARTITION_FETCH_BYTES_CONFIG (10 MB) ÷ ~590 bytes/record. The wedge happens at exactly the consumer-side fetch-size boundary.

I tried several recovery approaches in the consumer:

  • seek(currentPosition) — no effect, since seek() only updates consumer-side position, not the broker-side fetch session.
  • assign(emptyList()) + reassign + seek — should terminate the broker session entirely, but the consumer still locked at the same offset.

Reordering setUp to push & verify Kafka before creating the realtime table (so the consumer never sees a moving LSO) also didn't help — the wedge happens anyway, with all data already present in Kafka.

So mode (b) appears to be a Kafka broker-side fetch-session bug under read_committed after a max-fetch-size response of aborted records. Beyond what consumer-side intervention can recover. Fixing it likely needs either a Pinot-level periodic full consumer recreation, a Kafka client/broker version change, or an upstream investigation.

This PR ships the mode (a) fix, the strict post-push verification, the setUp reorder, the read_committed empty-poll consumer fix (production-relevant), and the diagnostic logging that enabled the mode (b) characterization. It does not claim to fix mode (b); the test will still flake on that path. PR description updated with the full story.

The retry / fresh-topic logic from earlier on this branch is removed per your earlier feedback — the changes here are all bug fixes, not retries that mask regressions.
