[feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API#25696
[feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API#25696merlimat wants to merge 1 commit intoapache:masterfrom
Conversation
Two new operational primitives on a scalable-topic subscription, exposed
as admin REST endpoints, admin client methods, and pulsar-admin CLI
subcommands:
- seek by wall-clock timestamp: reset every per-segment cursor to a
point in time. The controller uses each segment's recorded
[createdAtMs, sealedAtMs) window to dispatch the cheapest per-segment
op:
- sealed entirely before t -> skip-all (cursor to end)
- created entirely after t -> seek to timestamp 0 (earliest)
- alive at t -> seek to timestamp t
- clear backlog: dispatch skip-all on the subscription across every
segment in the DAG.
Plumbing:
- Per-segment endpoints in /segments/.../subscription/{sub}/seek and
.../skip-all (super-user, route to segment owner). They call the
standard Subscription.resetCursor / clearBacklog under the hood.
- Parent-topic endpoints in /scalable/.../subscriptions/{sub}/seek
and .../skip-all, gated on RESET_CURSOR / SKIP authz, routed to
the controller leader.
- Admin client interface + impl pairs (seekSegmentSubscription /
clearSegmentSubscriptionBacklog and seekSubscription / clearBacklog).
- CLI: `pulsar-admin scalable-topics seek <topic> --subscription <s>
--time 1h` (relative offset, computed as now - offset) and
`pulsar-admin scalable-topics clear-backlog <topic> -s <s>`.
Removals (subscription seek is now an admin operation, not a consumer
operation):
- StreamConsumerBuilder.seek(MessageId / Instant) — were placeholder
no-ops; gone. Initial position uses subscriptionInitialPosition;
timestamp seek uses the new admin API.
- CheckpointConsumer.seek(Checkpoint) and the async counterpart.
Frameworks restore from a saved checkpoint via
CheckpointConsumerBuilder.startPosition(Checkpoint).
- Checkpoint.atTimestamp(Instant) factory and the underlying
TimestampCheckpoint type — timestamp positioning is the admin
surface, not a checkpoint kind.
- Checkpoint.creationTime() — was just metadata, not part of the
position vector. Connector frameworks that need timing record it
themselves. Wire format simplifies accordingly.
Tests:
- ScalableTopicControllerTest:
testSeekSubscriptionDispatchesPerSegmentByTimestamp — three
segments at hand-picked timestamps (one fully before t, one
straddling, one fully after); asserts the right per-segment
admin call is issued for each.
testClearBacklogDispatchesSkipAllToEverySegment — N skip-all
calls for N segments.
- V5CheckpointConsumerBasicTest.testSeekRewindsToEarlierCheckpoint —
removed; corresponding example in Examples.java removed.
- V5AsyncApisTest.testAsyncCheckpointConsumerCheckpointAndSeek —
slimmed to testAsyncCheckpointConsumerCheckpoint.
- CheckpointV5Test — drop timestamp-roundtrip + creationTime
assertions; constructor calls updated to single-arg.
|
[BUG] 404 conflates "segment topic not loaded" with "subscription not found", causing silent failures.
But the segment endpoint in
The admin client only sees the status code, not the message body, so a transient unload (segment owner restarting, ownership churn) during a parent-topic seek is silently swallowed and the caller thinks the seek succeeded across all segments — when in fact one or more were skipped entirely. For a write op like seek / clear-backlog this is a real correctness bug, not just a UX wart. A few ways to fix:
The Javadoc on |
|
[BUG] Empty active segments straddling
Because Realistic worst case: split-segment just ran, the new child segments are active and empty, operator runs Options:
This is the kind of issue an end-to-end "produce → seek → consume" integration test across multiple segments would have caught — the new controller test only exercises mocks, so the managed-ledger reset-cursor path isn't covered at all. |
|
The 2 comments above from a local Claude Code review |
lhotari
left a comment
There was a problem hiding this comment.
LGTM, just check the local Claude Code review comments.
Summary
Two new operational primitives on a scalable-topic subscription, exposed as admin REST endpoints, admin client methods, and
pulsar-adminCLI subcommands.seekby wall-clock timestampReset every per-segment cursor to a point in time. The controller uses each segment's recorded
[createdAtMs, sealedAtMs)window to dispatch the cheapest per-segment op:ttttimestamp=0(earliest)ttimestamp=tclear-backlogDispatch skip-all on the subscription across every segment in the DAG.
Plumbing
/segments/.../subscription/{sub}/seekand.../skip-all(super-user, routed to segment owner). CallSubscription.resetCursor/clearBacklogunder the hood./scalable/.../subscriptions/{sub}/seekand.../skip-all, gated onRESET_CURSOR/SKIPauthz, routed to the controller leader.seekSegmentSubscription,clearSegmentSubscriptionBacklog) and parent-level (seekSubscription,clearBacklog).pulsar-admin scalable-topics seek <topic> --subscription <s> --time 1h—--timeis a relative offset; the absolute timestamp passed to the broker isnow - offset. Standard time-unit converter (1s,5m,1h,5d, …).pulsar-admin scalable-topics clear-backlog <topic> --subscription <s>.Removals
Subscription seek is now an admin operation, not a consumer operation. The following V5 client surface goes away:
StreamConsumerBuilder.seek(MessageId)andseek(Instant)— were placeholder no-ops. Initial position is set viasubscriptionInitialPosition(EARLIEST/LATEST); timestamp seek is the new admin call.CheckpointConsumer.seek(Checkpoint)and the async counterpart. Connector frameworks restore from a saved checkpoint viaCheckpointConsumerBuilder.startPosition(Checkpoint).Checkpoint.atTimestamp(Instant)factory and the underlyingTimestampCheckpointtype — timestamp positioning is the admin surface, not a checkpoint kind.Checkpoint.creationTime()— was just metadata, not part of the position vector. Wire format simplifies accordingly. Connector frameworks that need timing can record it themselves.Test plan
ScalableTopicControllerTest:testSeekSubscriptionDispatchesPerSegmentByTimestamp— three segments at hand-picked timestamps (one fully beforet, one straddling, one fully after); asserts the right per-segment admin call is issued for each.testClearBacklogDispatchesSkipAllToEverySegment— N skip-all calls for N segments.V5CheckpointConsumerBasicTest,V5CheckpointConsumerDagReplayTest,V5CheckpointConsumerGroupTest,V5AsyncApisTest,CheckpointV5Test.pulsar-broker,pulsar-client-admin-api,pulsar-client-admin,pulsar-client-api-v5,pulsar-client-v5,pulsar-client-tools).