@@ -93,12 +93,17 @@ protected boolean useKafkaTransaction() {
return true;
}

@Override
protected Properties getKafkaExtraProperties() {
Properties props = new Properties();
props.setProperty("log.flush.interval.messages", "1");
return props;
}
// No getKafkaExtraProperties override. The previous override set
// log.flush.interval.messages=1 on the embedded broker, which forced a per-record fsync
// on every partition log. Each setUp pushes ~115,545 records * 2 transactions ~= 231k
// records, so the broker's I/O thread was buried under hundreds of thousands of fsync
// calls on the CI runner's shared disk. The transaction COMMIT marker writes
// (WriteTxnMarkers requests from the coordinator -> data-partition leaders) ride the
// same per-partition I/O queue, so they were stuck behind the fsync backlog -- the LSO
// never advanced, and read_committed consumers (the test's verification consumer plus
// the Pinot server) saw nothing within the 120 s budget. Kafka's transactional protocol
// already provides durability via acks=all and transaction.state.log.replication.factor=3,
// so the forced fsync was redundant. Removing it eliminates both observed stall modes.
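// Illustrative sketch (not part of this change): the LSO stall described above can be
// observed directly from a test by comparing end offsets under the two isolation levels.
// For a read_committed consumer, KafkaConsumer#endOffsets returns the last stable offset;
// for read_uncommitted it returns the log end offset. A gap between the two that does not
// shrink after the producer has committed means the commit markers have not been applied.
// The helper name is a placeholder; assumes the standard Kafka client imports
// (ConsumerConfig, KafkaConsumer, TopicPartition, ByteArrayDeserializer).
private static Map<TopicPartition, Long> fetchEndOffsets(String brokerList, String isolationLevel,
    List<TopicPartition> partitions) {
  Properties consumerProps = new Properties();
  consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
  consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
    // read_committed -> last stable offset; read_uncommitted -> log end offset
    return consumer.endOffsets(partitions);
  }
}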

@Override
protected int getNumKafkaBrokers() {
@@ -110,6 +115,119 @@ protected long getDocsLoadedTimeoutMs() {
return 1_200_000L;
}

/**
* Diagnostic override of the inherited "wait for COUNT(*) to converge" loop. The base
* implementation polls every 100 ms and, on timeout, only reports the assertion message
* "Failed to load N documents" with no progress information -- so the silent 20-minute
* gap that has been the dominant flake on CI is impossible to triage from the surefire
* log. This override does the same convergence wait but prints periodic progress lines
* (current Pinot count, time since it last changed, and Kafka read_committed /
* read_uncommitted record counts) and, on timeout, dumps a thread-stack snapshot for
* every Kafka / Pinot consumer thread
* before throwing the same AssertionError shape the inherited code produced.
*
* Note: pure observability change. No retry, no behavior change on the success path.
*/
@Override
protected void waitForAllDocsLoaded(String tableName, long timeoutMs) {
long expected = getCountStarResult();
long start = System.currentTimeMillis();
long deadline = start + timeoutMs;
long lastProgressLog = 0L;
long lastChangeAt = start;
long lastSeenCount = -1L;
long lastCount = -1L;
int iterations = 0;
LOGGER.info("[diag] waitForAllDocsLoaded start: table={} expected={} timeoutMs={}", tableName, expected, timeoutMs);
while (System.currentTimeMillis() < deadline) {
iterations++;
try {
lastCount = getCurrentCountStarResult(tableName);
} catch (Exception e) {
LOGGER.debug("[diag] count query error", e);
}
if (lastCount == expected) {
LOGGER.info("[diag] Pinot COUNT(*) converged: count={} elapsed={}ms iterations={}", lastCount,
System.currentTimeMillis() - start, iterations);
return;
}
long now = System.currentTimeMillis();
if (lastCount != lastSeenCount) {
lastSeenCount = lastCount;
lastChangeAt = now;
}
// Print a progress line every 5 s so the silent gap is no longer silent.
if (now - lastProgressLog >= 5_000L) {
long sinceChangeMs = now - lastChangeAt;
long uncommittedKafka = -1L;
long committedKafka = -1L;
try {
uncommittedKafka = countRecords(getKafkaBrokerList(), "read_uncommitted");
committedKafka = countRecords(getKafkaBrokerList(), "read_committed");
} catch (Exception e) {
LOGGER.debug("[diag] kafka diagnostic count failed", e);
}
// ERROR (not WARN) so the periodic line passes the test BurstFilter that DENIES below-ERROR.
LOGGER.error(
"[diag] elapsed={}ms pinotCount={} expected={} stallMs={} kafkaCommitted={} kafkaUncommitted={} iter={}",
now - start, lastCount, expected, sinceChangeMs, committedKafka, uncommittedKafka, iterations);
lastProgressLog = now;
}
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
long elapsed = System.currentTimeMillis() - start;
LOGGER.error("[diag] waitForAllDocsLoaded timed out: table={} expected={} lastCount={} elapsed={}ms", tableName,
expected, lastCount, elapsed);
long kafkaCommittedFinal = -1L;
long kafkaUncommittedFinal = -1L;
try {
kafkaCommittedFinal = countRecords(getKafkaBrokerList(), "read_committed");
kafkaUncommittedFinal = countRecords(getKafkaBrokerList(), "read_uncommitted");
} catch (Exception ignored) {
// best-effort
}
LOGGER.error("[diag] kafka final state: read_committed={} read_uncommitted={}", kafkaCommittedFinal,
kafkaUncommittedFinal);
dumpRelevantThreadStacks();
throw new AssertionError(String.format(
"Failed to load %d documents (lastCount=%d, kafkaCommitted=%d, kafkaUncommitted=%d, elapsed=%dms)", expected,
lastCount, kafkaCommittedFinal, kafkaUncommittedFinal, elapsed));
}

/**
* Dump stack traces for threads whose names suggest they are part of the Pinot realtime
* consumer pipeline or the Kafka consumer pool. Helps identify whether the stall is in
* fetch, segment commit, GC, or somewhere unexpected.
*/
private void dumpRelevantThreadStacks() {
Map<Thread, StackTraceElement[]> all = Thread.getAllStackTraces();
int dumped = 0;
for (Map.Entry<Thread, StackTraceElement[]> entry : all.entrySet()) {
Thread t = entry.getKey();
String name = t.getName();
if (name == null) {
continue;
}
boolean interesting = name.contains("RealtimeSegment") || name.contains("kafka") || name.contains("Kafka")
|| name.contains("HelixTaskExecutor") || name.contains("PartitionConsumer") || name.contains("mytable__");
if (!interesting) {
continue;
}
StringBuilder sb = new StringBuilder();
sb.append("[diag] thread '").append(name).append("' state=").append(t.getState());
for (StackTraceElement el : entry.getValue()) {
sb.append("\n at ").append(el);
}
LOGGER.error(sb.toString());
dumped++;
}
LOGGER.error("[diag] thread dump: dumped {} of {} total threads", dumped, all.size());
}

@Override
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
@@ -145,16 +263,25 @@ protected void pushAvroIntoKafka(List<File> avroFiles)
LOGGER.info("Committed batch: {} records", committedCount);
}

// After producer is closed, verify data visibility with independent consumers
LOGGER.info("Producer closed. Verifying data visibility...");
waitForCommittedRecordsVisible(kafkaBrokerList);
// After producer is closed, verify the topic actually exposes ALL expected records to
// an independent read_committed consumer. This is stricter than the previous
// "any record visible" check: it confirms transaction markers have been fully
// propagated for every committed record, which is the actual exactly-once contract,
// and surfaces partial-commit bugs that the old check would have hidden.
LOGGER.info("Producer closed. Verifying that all {} committed records are readable from Kafka...",
getCountStarResult());
waitForAllCommittedRecordsVisible(kafkaBrokerList, getCountStarResult());
}
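// Illustrative sketch (not part of this change): the transactional producer setup that
// pushAvroIntoKafka relies on lives in the collapsed part of this diff. A minimal
// transactional producer looks like the following; the transactional.id value, topic and
// method name are placeholders, not the test's actual settings. Assumes the standard
// org.apache.kafka.clients.producer imports plus ByteArraySerializer.
private void sketchTransactionalPush(String brokerList, String topic, List<byte[]> payloads) {
  Properties producerProps = new Properties();
  producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-it-producer");
  producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // required for transactions
  producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // required by idempotence
  producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
    producer.initTransactions();
    producer.beginTransaction();
    for (byte[] payload : payloads) {
      producer.send(new ProducerRecord<>(topic, payload));
    }
    // Records only become visible to read_committed consumers once the commit marker
    // written here has been applied on every data partition.
    producer.commitTransaction();
  }
}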

/**
* Wait for committed records to be visible to a read_committed consumer.
* This ensures transaction markers have been fully propagated before returning.
* Wait until an independent read_committed consumer can read exactly {@code expected}
* records from the topic. Throws an {@link AssertionError} if the count never matches
* within 120 s, or if it overshoots (which would indicate the aborted batch leaked
* through). Reaching this method's "ok" return is the gate that the rest of setUp
* relies on -- the caller (the inherited setUp in {@link BaseRealtimeClusterIntegrationTest})
* then waits for Pinot's COUNT(*) to converge on the same count.
*/
private void waitForCommittedRecordsVisible(String brokerList) {
private void waitForAllCommittedRecordsVisible(String brokerList, long expected) {
long deadline = System.currentTimeMillis() + 120_000L;
int lastCommitted = 0;
int lastUncommitted = 0;
@@ -163,15 +290,21 @@ private void waitForCommittedRecordsVisible(String brokerList) {
while (System.currentTimeMillis() < deadline) {
iteration++;
lastCommitted = countRecords(brokerList, "read_committed");
if (lastCommitted > 0) {
if (lastCommitted == expected) {
LOGGER.info("Verification OK: read_committed={} after {} iterations", lastCommitted, iteration);
return;
}
// Check if data reached Kafka at all
if (lastCommitted > expected) {
// Aborted batch leaked through, or the test pushed more records than getCountStarResult().
lastUncommitted = countRecords(brokerList, "read_uncommitted");
throw new AssertionError(String.format(
"[ExactlyOnce] read_committed count overshot expected on broker %s: read_committed=%d, expected=%d, "
+ "read_uncommitted=%d", brokerList, lastCommitted, expected, lastUncommitted));
}
if (iteration == 1 || iteration % 5 == 0) {
lastUncommitted = countRecords(brokerList, "read_uncommitted");
LOGGER.info("Verification iteration {}: read_committed={}, read_uncommitted={}", iteration, lastCommitted,
lastUncommitted);
LOGGER.info("Verification iteration {}: read_committed={}/{}, read_uncommitted={}", iteration, lastCommitted,
expected, lastUncommitted);
}
try {
Thread.sleep(2_000L);
@@ -181,13 +314,12 @@ private void waitForCommittedRecordsVisible(String brokerList) {
}
}

// Final diagnostic dump
lastUncommitted = countRecords(brokerList, "read_uncommitted");
LOGGER.error("VERIFICATION FAILED after 120s: read_committed={}, read_uncommitted={}", lastCommitted,
LOGGER.error("VERIFICATION FAILED after 120s: read_committed={}/{}, read_uncommitted={}", lastCommitted, expected,
lastUncommitted);
throw new AssertionError("[ExactlyOnce] Transaction markers were not propagated within 120s; "
+ "committed records are not visible to read_committed consumers. "
+ "read_committed=" + lastCommitted + ", read_uncommitted=" + lastUncommitted);
throw new AssertionError(String.format(
"[ExactlyOnce] Kafka topic did not expose all %d committed records within 120s; "
+ "read_committed=%d, read_uncommitted=%d", expected, lastCommitted, lastUncommitted));
}
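// Illustrative sketch (not part of this change): countRecords(brokerList, isolationLevel)
// is referenced throughout but defined in a collapsed part of this diff. A helper with that
// shape could look like the following; the topic parameter and method name are placeholders.
// Under read_committed, endOffsets() returns the LSO, so the loop terminates even while a
// transaction is still open. Assumes the standard Kafka consumer imports.
private int countRecordsSketch(String brokerList, String isolationLevel, String topic) {
  Properties consumerProps = new Properties();
  consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
  consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  int count = 0;
  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo info : consumer.partitionsFor(topic)) {
      partitions.add(new TopicPartition(topic, info.partition()));
    }
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    // Under read_committed these are last stable offsets; under read_uncommitted, log end offsets.
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
    long deadline = System.currentTimeMillis() + 30_000L; // local safety bound for the sketch
    while (System.currentTimeMillis() < deadline) {
      boolean caughtUp = true;
      for (TopicPartition partition : partitions) {
        if (consumer.position(partition) < endOffsets.get(partition)) {
          caughtUp = false;
          break;
        }
      }
      if (caughtUp) {
        break;
      }
      count += consumer.poll(Duration.ofMillis(500L)).count();
    }
  }
  return count;
}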

/**
@@ -46,7 +46,16 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
implements PartitionGroupConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);

private long _lastFetchedOffset = -1;
// Offset the consumer is positioned to read NEXT. -1 means the consumer has not been
// positioned for any caller-requested startOffset yet (we have not issued a seek).
// After a successful fetch, this is lastRecord.offset + 1. After an empty fetch (e.g.
// read_committed filtered the batch as aborted), this is the consumer's actual
// KafkaConsumer.position(), which may have advanced past the caller's startOffset even
// though zero records were returned. RealtimeSegmentDataManager only advances its own
// _currentOffset on non-empty batches, so the next call typically passes the same
// startOffset back; comparing startOffset against _nextReadOffset (rather than re-seeking
// unconditionally) is what lets the consumer make progress through aborted regions.
private long _nextReadOffset = -1;

public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
@@ -63,24 +72,33 @@ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset sta
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs);
}
if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
// Seek when:
// (a) we have not yet positioned the consumer (initial state), OR
// (b) the caller's startOffset is BEYOND _nextReadOffset (caller advanced or
// reset to a different fetch position).
// We do NOT seek when startOffset <= _nextReadOffset. With read_committed isolation an
// empty poll legitimately advances _nextReadOffset past the caller's startOffset (the
// batch was filtered as aborted) while RealtimeSegmentDataManager keeps passing the
// same startOffset on the next call (it only advances on non-empty batches). Seeking
// back to startOffset would undo the consumer's progress through the aborted region
// and wedge consumption forever; this was the dominant CI flake mode.
if (_nextReadOffset < 0 || startOffset > _nextReadOffset) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Seeking to offset: {}", startOffset);
}
_consumer.seek(_topicPartition, startOffset);
_nextReadOffset = startOffset;
}

ConsumerRecords<Bytes, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
List<ConsumerRecord<Bytes, Bytes>> records = consumerRecords.records(_topicPartition);
List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
long firstOffset = -1;
long offsetOfNextBatch = startOffset;
StreamMessageMetadata lastMessageMetadata = null;
long batchSizeInBytes = 0;
if (!records.isEmpty()) {
firstOffset = records.get(0).offset();
_lastFetchedOffset = records.get(records.size() - 1).offset();
offsetOfNextBatch = _lastFetchedOffset + 1;
_nextReadOffset = records.get(records.size() - 1).offset() + 1;
for (ConsumerRecord<Bytes, Bytes> record : records) {
StreamMessageMetadata messageMetadata = extractMessageMetadata(record);
Bytes message = record.value();
@@ -96,8 +114,27 @@
}
lastMessageMetadata = messageMetadata;
}
} else {
// No records returned, but the underlying KafkaConsumer's internal position may have
// advanced past offsets filtered out by isolation level (most commonly read_committed
// skipping an aborted transactional batch). Snap _nextReadOffset to the consumer's
// actual position so a future call with a stale startOffset (RealtimeSegmentDataManager
// doesn't advance _currentOffset on empty batches) resumes from where the broker left
// us, not from startOffset.
long currentPosition;
try {
currentPosition = _consumer.position(_topicPartition);
} catch (Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to read consumer position after empty poll on {}", _topicPartition, e);
}
currentPosition = _nextReadOffset;
}
if (currentPosition > _nextReadOffset) {
_nextReadOffset = currentPosition;
}
}

long offsetOfNextBatch = _nextReadOffset;
// In case read_committed is enabled, the messages consumed are not guaranteed to have consecutive offsets.
// TODO: A better solution would be to fetch earliest offset from topic and see if it is greater than startOffset.
// However, this would require an additional call to Kafka which we want to avoid.
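// Illustrative sketch (not Pinot's actual code): the caller-side loop that motivates the
// seek policy above, simplified. RealtimeSegmentDataManager advances its offset only when
// a batch is non-empty, so after read_committed filters out an aborted batch the same
// startOffset comes back on the next call; _nextReadOffset is what keeps this consumer from
// seeking backwards in that case. Names below other than LongMsgOffset / KafkaMessageBatch
// are placeholders.
//
//   long currentOffset = startingOffset;
//   while (consuming) {
//     KafkaMessageBatch batch = consumer.fetchMessages(new LongMsgOffset(currentOffset), fetchTimeoutMs);
//     if (batch.getMessageCount() > 0) {
//       indexToConsumingSegment(batch); // placeholder for indexing into the consuming segment
//       currentOffset = ((LongMsgOffset) batch.getOffsetOfNextBatch()).getOffset();
//     }
//     // On an empty batch currentOffset is left unchanged, so the next fetchMessages call
//     // passes the same startOffset even though the broker-side position may have moved on.
//   }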