[feat] PIP-473: Metadata-Driven Transactions for Scalable Topics #25693
merlimat wants to merge 3 commits into apache:master
Conversation
Sub-PIP of PIP-460 (Scalable Topics). Defines transactional support for scalable topics by replacing the in-stream commit/abort marker model with metadata-store-backed state. Adds parallel implementations of TransactionBuffer, PendingAckStore, and Transaction Coordinator that write nothing to any data stream, so sealed segments (split/merge) no longer strand in-flight transactions. Reuses dispatcher, client API, and TC wire commands; legacy in-stream-marker components remain unchanged for persistent:// topics.
Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
lhotari
left a comment
I've added some review comments and questions.
> - **TransactionBuffer (TB)** — a per-`PersistentTopic` component that buffers transactional writes in the topic's data stream, tracks aborted transaction IDs, and gates the dispatcher's read horizon (`maxReadPosition`) so that uncommitted entries are not delivered. The TB persists its state in a per-namespace system topic (`__transaction_buffer_snapshot`).
> - **PendingAckStore** — a per-(topic, subscription) component that records transactional acknowledgments in a sibling persistent topic (`<topic>-<sub>__transaction_pending_ack`), applying them to the cursor only when the transaction commits.
>
> When a transaction ends, the TC sends `END_TXN_ON_PARTITION` (and `END_TXN_ON_SUBSCRIPTION` for acks) to every participant. The TB then writes a **commit or abort marker** as a regular entry in the topic's managed ledger. The dispatcher discovers committed/aborted state by replaying these markers and consulting the in-memory aborted-txn set.
>
> 2. Compose correctly with the scalable-topic lifecycle — including splits, merges, and segments sealed mid-transaction.
> 3. Do not require duplicating data (each `producer.send` produces a single managed-ledger append).
> 4. Reuse as much of the existing transaction surface as possible — interfaces, dispatcher integration, client API — so that we are not re-litigating well-understood concerns.
> 5. Coexist with v4 transactions on `persistent://` topics with no behavior change for those topics.
Does this mean a transaction ID for a v4 or v5 transaction can be used interchangeably between the two clients?
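For context on the contract the quoted text describes, here is a minimal sketch of the dispatcher-facing TransactionBuffer surface: the dispatcher reads no further than `maxReadPosition` and drops entries from aborted transactions. The class and field names are illustrative, not taken from the PIP or the Pulsar codebase:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the dispatcher-facing TB contract. In the
// metadata-driven implementation, onTxnFinalized would be fed by
// metadata-store watches instead of replayed in-stream markers.
public class TransactionBufferSketch {
    private final AtomicLong maxReadPosition = new AtomicLong(-1);
    private final Set<Long> abortedTxns = ConcurrentHashMap.newKeySet();

    /** The dispatcher reads no entries beyond this position. */
    public long getMaxReadPosition() {
        return maxReadPosition.get();
    }

    /** The dispatcher filters out entries whose txn id is in the aborted set. */
    public boolean isTxnAborted(long txnId) {
        return abortedTxns.contains(txnId);
    }

    /** Called when a transaction commits or aborts. */
    public void onTxnFinalized(long txnId, boolean committed, long newMaxReadPosition) {
        if (!committed) {
            abortedTxns.add(txnId);
        }
        // The read horizon only ever moves forward.
        maxReadPosition.accumulateAndGet(newMaxReadPosition, Math::max);
    }
}
```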
> > **Move transactional state out of the data stream and into the metadata store.**
>
> Concretely: keep all existing components and interfaces, and add a parallel implementation of `TransactionBuffer`, `PendingAckStore`, **and Transaction Coordinator** that writes nothing to any data stream. Their state lives entirely in the metadata store. The legacy in-stream-marker components remain, unchanged, for `persistent://` topics; the new metadata-driven components handle `segment://` topics. The dispatcher's contract is unchanged.
>
> Why introduce a v5 TC rather than reuse the legacy one: the legacy TC stores its log in a system topic (`__transaction_log_*`), which carries the operational concerns of any system topic — compaction can lead to long recovery times, leadership has to be maintained, and recovery is on the data path. With the metadata store available we can have a TC whose state is just a few key-value records, no log, no system topic, no per-broker in-memory replay. Running both TC implementations in parallel keeps v4 transactions byte-for-byte unchanged while the v5 path uses the simpler design.
What would be the migration path, and the possible rollback path, for existing topic data that was written in the v4 transaction format?
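For concreteness, the key-value records mentioned in the quoted text seem to follow a small path scheme (`/txn/<txnId>`, `/txn-op/<txnId>/<seq>`). A sketch of hypothetical key-construction helpers, purely as an illustration of that scheme (the exact layout is defined in the PIP document, not here):

```java
// Illustrative helpers mirroring the key paths quoted in this thread.
// The string formats are assumptions based on the examples in the text.
public class TxnKeySketch {
    /** Transaction header record: /txn/<txnId> */
    static String header(long txnId) {
        return "/txn/" + txnId;
    }

    /** Per-participant op record: /txn-op/<txnId>/<seq> */
    static String opRecord(long txnId, long seq) {
        return "/txn-op/" + txnId + "/" + seq;
    }

    /** Prefix the GC sweep's orphan check would scan: /txn-op/<txnId>/ */
    static String opPrefix(long txnId) {
        return "/txn-op/" + txnId + "/";
    }
}
```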
> A finalized transaction (`COMMITTED` or `ABORTED`) is removed in two phases:
>
> 1. **Per-participant materialization.** When the TC fans out end-txn, each participant broker materializes the decision (commit: advance subscription cursors for acks, evict header cache; abort: drop ops). Once a participant has finished its materialization for `<txnId>`, it deletes its op records (`/txn-op/<txnId>/<seq>` for ops it owns).
Is it possible to remove any op records for finalized txns before the data has been deleted? As long as the data is retained, don't the txn records need to exist so that the entries can be consistently consumed? Without txn commit/abort information, it wouldn't be possible to determine whether certain records should be skipped when a new subscription or a reader/checkpoint consumer reads the records.
> #### Subscribe / dispatch
>
> Unchanged. The dispatcher polls `tb.getMaxReadPosition()` and filters by `tb.isTxnAborted(msg)`. The `MetadataTransactionBuffer` answers both from its in-memory caches, fed by metadata-store watches.
There's currently a performance issue (either additional latency or a CPU spin loop) when the dispatcher polls `getMaxReadPosition`. This happens when the dispatcher has read all entries up to the maxReadPosition.
For example, for the read operation in `PersistentDispatcherMultipleConsumers.readMoreEntries`: the read completes, and after completion a new read is issued. This loop keeps going until the maxReadPosition moves forward.
A better solution would be for the dispatcher to be notified by the transaction component when the maxReadPosition has advanced, so that there wouldn't be a need to poll.
Btw, the current dispatcher code should be optimized for v4 transactions as well, since even with polling it could be improved.
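The push-based alternative suggested above could look roughly like the following. This is a sketch only; the interface and method names are hypothetical, not existing Pulsar APIs:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of push-based maxReadPosition advancement: instead of the
// dispatcher re-issuing reads in a loop, it registers a listener that the
// transaction component invokes when the horizon moves. Names are hypothetical.
public class MaxReadPositionNotifier {
    public interface Listener {
        void onMaxReadPositionAdvanced(long newPosition);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private long maxReadPosition = -1;

    public synchronized void addListener(Listener l) {
        listeners.add(l);
    }

    /** Called by the transaction component when a txn commits or aborts. */
    public synchronized void advanceTo(long position) {
        if (position <= maxReadPosition) {
            return; // stale or duplicate advance: the horizon only moves forward
        }
        maxReadPosition = position;
        for (Listener l : listeners) {
            // e.g. the dispatcher re-triggers its readMoreEntries equivalent here
            l.onMaxReadPositionAdvanced(position);
        }
    }
}
```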
> 1. **Per-participant materialization.** When the TC fans out end-txn, each participant broker materializes the decision (commit: advance subscription cursors for acks, evict header cache; abort: drop ops). Once a participant has finished its materialization for `<txnId>`, it deletes its op records (`/txn-op/<txnId>/<seq>` for ops it owns).
> 2. **Header GC sweep.** A periodic sweep scans `idx:txn-by-final-state` for entries past a configurable retention window (e.g. 60 s after `finalized_ms`). For each, it verifies no `/txn-op/<txnId>/*` records remain (orphan check from a participant crash), forces deletion of any leftovers, and finally deletes the header `/txn/<txnId>`.
Local Claude Code review comment (I'm not sure how accurate this is):
[BUG] GC orphan-cleanup window vs. slow participant can lose committed work. Step 1 (per-participant materialization) and step 2 (header GC sweep) interleave optimistically. If a participant is partitioned/slow and hasn't materialized when the 60 s retention window elapses, the GC sweep finds the participant's `/txn-op/<txnId>/*` records still present and "forces deletion of any leftovers". When the participant reconnects and runs `MetadataPendingAckStore.commit` for that txn, its `idx:acks-by-segment-subscription` range query returns empty, so no acks are ever applied to the cursor — silent loss of committed acks. The text needs either a much larger retention window with a rationale, a participant-liveness check before forced deletion, or watcher-driven materialization that is acknowledged before the GC may proceed.
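One way to close the race this comment describes is the second option it lists: make the sweep conditional on every participant having acknowledged its materialization. A sketch under that assumption (all class, method, and broker names here are hypothetical):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a GC sweep that refuses to force-delete op records until every
// participant has confirmed it materialized the commit/abort decision,
// avoiding the silent-ack-loss scenario. All names are hypothetical.
public class TxnGcSweepSketch {
    // txnId -> participants that have NOT yet confirmed materialization
    private final Map<Long, Set<String>> pendingParticipants = new ConcurrentHashMap<>();

    /** Record the participant set when the TC finalizes a transaction. */
    public void registerFinalizedTxn(long txnId, Set<String> participants) {
        Set<String> pending = ConcurrentHashMap.newKeySet();
        pending.addAll(participants);
        pendingParticipants.put(txnId, pending);
    }

    /** A participant acks after it has materialized the decision locally. */
    public void ackMaterialized(long txnId, String participant) {
        Set<String> pending = pendingParticipants.get(txnId);
        if (pending != null) {
            pending.remove(participant);
        }
    }

    /** The sweep may delete /txn/<txnId> and /txn-op/<txnId>/* only when safe. */
    public boolean safeToGc(long txnId, long ageMs, long retentionMs) {
        Set<String> pending = pendingParticipants.get(txnId);
        boolean allAcked = pending == null || pending.isEmpty();
        return allAcked && ageMs >= retentionMs;
    }
}
```

This trades GC promptness for safety: a permanently dead participant would block GC for its txns, so a real design would still need a liveness/ownership-transfer escape hatch.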
Summary
Sub-PIP of PIP-460: Scalable Topics.
Proposes a transaction model for scalable topics that replaces the in-stream commit/abort marker mechanism with metadata-store-backed state, so transactions compose correctly with the segment lifecycle (splits, merges, sealed segments).
The shape of the change:
- `MetadataTransactionBuffer` — new `TransactionBuffer` implementation for `segment://` topics. Writes nothing to the topic's data stream; transactional state lives as records and secondary indexes in the metadata store.
- `MetadataPendingAckStore` — new `PendingAckStore` implementation for `segment://` subscriptions. No sibling pending-ack topic.
- `MetadataStore` extensions — surface partition-key co-location, sequential keys, and secondary indexes with range-watch. Default backend (Oxia) maps to native primitives; ZooKeeper and others can implement them sub-optimally without affecting correctness.

Existing `persistent://` topic behavior is unchanged. v4 transactions keep their current implementation byte-for-byte. The PIP is self-contained — full data model, end-to-end flows (publish, end-txn, dispatch, recovery, GC), backward-compat, and rejected alternatives are in the document.
Test plan