diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 01b2320f99a65..32468c4889259 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -404,6 +404,92 @@ public void deleteSubscription( }); } + @POST + @Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/seek") + @ApiOperation(value = "Reset a subscription's cursor on every segment to the given" + + " wall-clock timestamp. The controller uses each segment's recorded sealed-time" + + " window to dispatch the cheapest per-segment op.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Cursor reset successfully on all segments"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission on the namespace"), + @ApiResponse(code = 404, message = "Scalable topic or subscription doesn't exist"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void seekSubscription( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Wall-clock millis since the unix epoch", required = true) + @QueryParam("timestamp") long timestampMs) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + + validateTopicOperationAsync(tn, TopicOperation.RESET_CURSOR, subscription) + .thenCompose(__ -> onControllerLeader(tn, + svc -> svc.seekSubscription(tn, subscription, timestampMs))) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()) + .attr("subscription", subscription).attr("topic", tn) + .attr("timestampMs", timestampMs) + .log("Sought subscription on scalable topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()) + .attr("subscription", subscription).attr("topic", tn) + .exception(ex).log("Failed to seek subscription"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/skip-all") + @ApiOperation(value = "Skip every undelivered message on the subscription, across every" + + " segment in the DAG (advance each per-segment cursor to the end).") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Backlog cleared successfully on all segments"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission on the namespace"), + @ApiResponse(code = 404, message = "Scalable topic or subscription doesn't exist"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void clearBacklog( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription) { + validateNamespaceName(tenant, namespace); + TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic); + + validateTopicOperationAsync(tn, TopicOperation.SKIP, subscription) + .thenCompose(__ -> onControllerLeader(tn, + svc -> svc.clearBacklog(tn, subscription))) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()) + .attr("subscription", subscription).attr("topic", tn) + .log("Cleared backlog on scalable topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()) + .attr("subscription", subscription).attr("topic", tn) + .exception(ex).log("Failed to clear subscription backlog"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + // --- Segment operations --- @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java index 0ae8b3264a5ad..507119518b07d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java @@ -228,6 +228,136 @@ public void getSubscriptionBacklog( }); } + @POST + @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/seek") + @ApiOperation(value = "Reset the segment topic's subscription cursor to the given timestamp." + + " Super-user only.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Cursor reset successfully"), + @ApiResponse(code = 401, message = "This operation requires super-user access"), + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 404, message = "Segment topic or subscription not found"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void seekSubscription( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify the parent topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true) + @PathParam("descriptor") String descriptor, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Wall-clock millis since the unix epoch", required = true) + @QueryParam("timestamp") long timestampMs, + @ApiParam(value = "Whether leader broker redirected this call to this broker.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(tenant, namespace); + TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor); + + validateSuperUserAccessAsync() + .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative)) + .thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString())) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + // Segment topic not loaded on this owner — could be ownership + // churn or a transient unload. 503 so the caller retries (and so + // the parent-topic seek can't conflate this with the + // subscription-not-found case below, which is tolerated). + throw new RestException(Response.Status.SERVICE_UNAVAILABLE, + "Segment topic not loaded: " + segmentTopic); + } + var sub = optTopic.get().getSubscription(subscription); + if (sub == null) { + throw new RestException(Response.Status.NOT_FOUND, + "Subscription not found on segment: " + subscription); + } + return sub.resetCursor(timestampMs) + .exceptionally(ex -> { + Throwable cause = ex instanceof java.util.concurrent.CompletionException + ? ex.getCause() : ex; + if (cause instanceof org.apache.pulsar.broker.service.BrokerServiceException + .SubscriptionInvalidCursorPosition) { + // Empty managed ledger — no entries to seek to. The + // cursor is already at the only valid position, so this + // is a no-op (e.g. a freshly-split active child segment + // that hasn't received any messages yet). + log.debug().attr("segment", segmentTopic) + .attr("subscription", subscription) + .log("Empty segment, treating seek as no-op"); + return null; + } + throw org.apache.pulsar.common.util.FutureUtil + .wrapToCompletionException(cause); + }); + }) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription).attr("timestampMs", timestampMs) + .exception(ex).log("Failed to seek segment subscription"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/skip-all") + @ApiOperation(value = "Skip every undelivered message on the segment topic's subscription —" + + " advance the cursor to the end. Super-user only.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Backlog cleared successfully"), + @ApiResponse(code = 401, message = "This operation requires super-user access"), + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 404, message = "Segment topic or subscription not found"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void clearSubscriptionBacklog( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify the parent topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true) + @PathParam("descriptor") String descriptor, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Whether leader broker redirected this call to this broker.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(tenant, namespace); + TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor); + + validateSuperUserAccessAsync() + .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative)) + .thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString())) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + // 503 vs 404 — see the rationale on the seek endpoint above. The + // distinction lets the parent-topic clear-backlog tolerate + // subscription-not-found while still surfacing transient unloads. + throw new RestException(Response.Status.SERVICE_UNAVAILABLE, + "Segment topic not loaded: " + segmentTopic); + } + var sub = optTopic.get().getSubscription(subscription); + if (sub == null) { + throw new RestException(Response.Status.NOT_FOUND, + "Subscription not found on segment: " + subscription); + } + return sub.clearBacklog(); + }) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .exception(ex).log("Failed to clear segment subscription backlog"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/{descriptor}") @ApiOperation(value = "Delete a segment topic. Super-user only.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 552ca321d8478..3bd562a48e100 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -603,6 +603,113 @@ private CompletableFuture deleteSubscriptionOnAllSegments(String subscript return CompletableFuture.allOf(futures); } + /** + * Reset a subscription's cursor across every segment to the given wall-clock + * timestamp. We use each segment's recorded {@code [createdAtMs, sealedAtMs)} + * window to dispatch the cheapest possible per-segment op: + * + * + * + *

Per-segment failures are surfaced (the call fails-fast). The only tolerated + * outcome is {@code 404 Not Found} from the segment endpoint, which the segment + * REST resource emits exclusively for "subscription not present on this segment" + * (e.g. the cursor hasn't been materialised yet — it will propagate lazily and + * the next seek will land it). Transient unloads / ownership churn surface as + * {@code 503} from the segment endpoint and propagate to the caller, who can + * retry the parent-level operation. + */ + public CompletableFuture seekSubscription(String subscription, long timestampMs) { + checkLeader(); + SegmentLayout layout = this.currentLayout; + CompletableFuture[] futures = layout.getAllSegments().values().stream() + .map(segment -> seekSubscriptionOnSegment(segment, subscription, timestampMs)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(futures); + } + + /** + * Skip every undelivered message on the subscription, across every segment in the + * DAG. Equivalent to advancing each per-segment cursor to the end. + */ + public CompletableFuture clearBacklog(String subscription) { + checkLeader(); + SegmentLayout layout = this.currentLayout; + CompletableFuture[] futures = layout.getAllSegments().values().stream() + .map(segment -> clearSubscriptionBacklogOnSegment(segment, subscription)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(futures); + } + + private CompletableFuture seekSubscriptionOnSegment(SegmentInfo segment, + String subscription, + long timestampMs) { + // Classify the segment relative to the requested timestamp using the recorded + // sealed-time / created-time. This is what makes the parent-level seek O(N segments) + // worth of cheap RPCs rather than O(N) timestamp-based scans of every segment's + // managed ledger. + if (segment.isSealed() && segment.sealedAtMs() > 0 + && segment.sealedAtMs() <= timestampMs) { + // Segment fully predates timestamp → skip everything on this segment. + return clearSubscriptionBacklogOnSegment(segment, subscription); + } + long effective = timestampMs; + if (segment.createdAtMs() > 0 && segment.createdAtMs() >= timestampMs) { + // Segment fully postdates timestamp → seek to start (timestamp=0 == earliest + // for managed-ledger reset-cursor-by-timestamp semantics). + effective = 0L; + } + String segmentName = toSegmentPersistentName(segment); + try { + return brokerService.getPulsar().getAdminClient() + .scalableTopics().seekSegmentSubscriptionAsync(segmentName, subscription, effective) + .exceptionally(ex -> { + Throwable cause = + org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); + if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException + .NotFoundException) { + // 404 from the segment endpoint == "subscription not present + // on this segment" (the segment endpoint uses 503 for + // "topic not loaded"). The cursor will propagate lazily; + // tolerated. + return null; + } + throw org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause); + }); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + + private CompletableFuture clearSubscriptionBacklogOnSegment(SegmentInfo segment, + String subscription) { + String segmentName = toSegmentPersistentName(segment); + try { + return brokerService.getPulsar().getAdminClient() + .scalableTopics() + .clearSegmentSubscriptionBacklogAsync(segmentName, subscription) + .exceptionally(ex -> { + Throwable cause = + org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); + if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException + .NotFoundException) { + // Subscription not present on this segment — tolerated. + // (See seek path for the 404-vs-503 contract.) + return null; + } + throw org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause); + }); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + private CompletableFuture createSubscriptionOnSegment(SegmentInfo segment, String subscription) { String persistentName = toSegmentUnderlyingPersistentName(segment); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java index e43ca06ccbd3f..12c7064e84453 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java @@ -196,6 +196,27 @@ public CompletableFuture deleteSubscription(TopicName topic, String subscr .thenCompose(controller -> controller.deleteSubscription(subscription)); } + /** + * Reset a subscription's cursor across every segment to the given wall-clock + * timestamp. Delegates to the controller leader; the controller uses each + * segment's recorded {@code [createdAtMs, sealedAtMs)} window to dispatch the + * cheapest per-segment op (skip-all, seek-by-timestamp, or seek-to-earliest). + */ + public CompletableFuture seekSubscription(TopicName topic, String subscription, + long timestampMs) { + return getOrCreateController(topic) + .thenCompose(controller -> controller.seekSubscription(subscription, timestampMs)); + } + + /** + * Skip every undelivered message on the subscription, across every segment in + * the DAG. Delegates to the controller leader. + */ + public CompletableFuture clearBacklog(TopicName topic, String subscription) { + return getOrCreateController(topic) + .thenCompose(controller -> controller.clearBacklog(subscription)); + } + /** * Get aggregated stats for a scalable topic. Read-only: does not require leadership. * Returns segment-DAG counts and per-subscription consumer counts, read from the diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index c00b192350458..8c75823ebef0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -456,6 +457,163 @@ public void testCloseReleasesLeadership() throws Exception { } } + // --- Seek subscription / clear backlog --- + + /** + * The seek path classifies each segment by its {@code [createdAtMs, sealedAtMs)} window + * against the requested timestamp. With 3 segments — one sealed entirely before the + * timestamp, one straddling, and one created entirely after — we expect: + *

+ */ + @Test + public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws Exception { + // Build a custom metadata with three segments at specific timestamps. + long t0 = 1_000_000L; + long t1 = 2_000_000L; + long t2 = 3_000_000L; + long t3 = 4_000_000L; + // Segment 0: created at t0, sealed at t1 (entirely before tSeek). + // Segment 1: created at t1, still active (straddles tSeek). + // Segment 2: created at t3, still active (entirely after tSeek). + long tSeek = t2; + + org.apache.pulsar.common.scalable.SegmentInfo seg0 = new org.apache.pulsar.common.scalable.SegmentInfo( + 0L, + org.apache.pulsar.common.scalable.HashRange.of(0x0000, 0x3FFF), + org.apache.pulsar.common.scalable.SegmentState.SEALED, + java.util.List.of(), java.util.List.of(3L), + /*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1, + /*createdAtMs*/ t0, /*sealedAtMs*/ t1); + org.apache.pulsar.common.scalable.SegmentInfo seg1 = new org.apache.pulsar.common.scalable.SegmentInfo( + 1L, + org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF), + org.apache.pulsar.common.scalable.SegmentState.ACTIVE, + java.util.List.of(), java.util.List.of(), + 0, -1, t1, -1); + org.apache.pulsar.common.scalable.SegmentInfo seg2 = new org.apache.pulsar.common.scalable.SegmentInfo( + 2L, + org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF), + org.apache.pulsar.common.scalable.SegmentState.ACTIVE, + java.util.List.of(), java.util.List.of(), + 0, -1, t3, -1); + + TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic"); + ScalableTopicMetadata md = ScalableTopicMetadata.builder() + .epoch(2).nextSegmentId(4) + .segments(java.util.Map.of(0L, seg0, 1L, seg1, 2L, seg2)) + .properties(java.util.Map.of()) + .build(); + resources.createScalableTopicAsync(seekTopic, md).get(); + ScalableTopicController c = new ScalableTopicController( + seekTopic, resources, brokerService, coordinationService); + try { + // Stub the segment-aware admin calls. + when(scalableTopics.seekSegmentSubscriptionAsync(anyString(), anyString(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + + c.initialize().get(); + c.seekSubscription("sub-a", tSeek).get(); + + // Sealed-old segment: skip-all admin call once. + verify(scalableTopics, org.mockito.Mockito.times(1)) + .clearSegmentSubscriptionBacklogAsync(anyString(), org.mockito.ArgumentMatchers.eq("sub-a")); + + // Two seek calls: one for the straddling segment (t == tSeek), one for the + // created-after segment (t == 0 because seg2.createdAtMs >= tSeek). + org.mockito.ArgumentCaptor tsCaptor = org.mockito.ArgumentCaptor.forClass(Long.class); + verify(scalableTopics, org.mockito.Mockito.times(2)) + .seekSegmentSubscriptionAsync(anyString(), + org.mockito.ArgumentMatchers.eq("sub-a"), + tsCaptor.capture()); + java.util.List sentTs = tsCaptor.getAllValues(); + assertTrue(sentTs.contains(tSeek), "expected straddling segment to receive tSeek"); + assertTrue(sentTs.contains(0L), "expected created-after segment to receive 0"); + } finally { + c.close().join(); + } + } + + /** Clear-backlog dispatches skip-all to every segment in the DAG. */ + @Test + public void testClearBacklogDispatchesSkipAllToEverySegment() throws Exception { + when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + controller.initialize().get(); + + controller.clearBacklog("sub-a").get(); + // INITIAL_SEGMENTS active segments, no sealed ones in the baseline → exactly N calls. + verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS)) + .clearSegmentSubscriptionBacklogAsync(anyString(), + org.mockito.ArgumentMatchers.eq("sub-a")); + } + + /** + * 404 from a per-segment seek means "subscription not present on that segment" — + * the controller tolerates this as success (cursor will materialise lazily). + */ + @Test + public void testSeekTolerates404SubscriptionNotFound() throws Exception { + when(scalableTopics.seekSegmentSubscriptionAsync(anyString(), anyString(), anyLong())) + .thenReturn(CompletableFuture.failedFuture( + new org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException( + new RuntimeException("Subscription not found"), + "Subscription not found", 404))); + controller.initialize().get(); + + // Should not throw — every segment's 404 is swallowed. + controller.seekSubscription("sub-a", System.currentTimeMillis()).get(); + } + + /** + * 503 from the segment endpoint == "topic not loaded yet". This is a transient + * unload (e.g. ownership churn); the controller MUST surface it so the caller can + * retry the parent-level operation, instead of silently skipping segments. + */ + @Test + public void testSeekPropagates503TransientUnload() throws Exception { + org.apache.pulsar.client.admin.PulsarAdminException unavailable = + new org.apache.pulsar.client.admin.PulsarAdminException( + new RuntimeException("Service Unavailable"), + "Segment topic not loaded", 503); + when(scalableTopics.seekSegmentSubscriptionAsync(anyString(), anyString(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(unavailable)); + controller.initialize().get(); + + java.util.concurrent.ExecutionException ex = + org.testng.Assert.expectThrows(java.util.concurrent.ExecutionException.class, + () -> controller.seekSubscription("sub-a", System.currentTimeMillis()).get()); + Throwable cause = ex.getCause() instanceof java.util.concurrent.CompletionException + ? ex.getCause().getCause() : ex.getCause(); + assertTrue(cause instanceof org.apache.pulsar.client.admin.PulsarAdminException, + "expected 503 PulsarAdminException to propagate, got " + cause); + } + + /** Same contract as seek: 503 from a per-segment clear-backlog must propagate. */ + @Test + public void testClearBacklogPropagates503TransientUnload() throws Exception { + org.apache.pulsar.client.admin.PulsarAdminException unavailable = + new org.apache.pulsar.client.admin.PulsarAdminException( + new RuntimeException("Service Unavailable"), + "Segment topic not loaded", 503); + when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.failedFuture(unavailable)); + controller.initialize().get(); + + java.util.concurrent.ExecutionException ex = + org.testng.Assert.expectThrows(java.util.concurrent.ExecutionException.class, + () -> controller.clearBacklog("sub-a").get()); + Throwable cause = ex.getCause() instanceof java.util.concurrent.CompletionException + ? ex.getCause().getCause() : ex.getCause(); + assertTrue(cause instanceof org.apache.pulsar.client.admin.PulsarAdminException, + "expected 503 PulsarAdminException to propagate, got " + cause); + } + // --- Sealed-segment GC --- /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java index 54164b2333953..28ba3bc1a7e1e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java @@ -24,9 +24,7 @@ import static org.testng.Assert.assertTrue; import java.time.Duration; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -176,14 +174,16 @@ public void testAsyncStreamConsumerReceiveAndCumulativeAck() throws Exception { } @Test - public void testAsyncCheckpointConsumerCheckpointAndSeek() throws Exception { + public void testAsyncCheckpointConsumerCheckpoint() throws Exception { + // Verifies that the async view of CheckpointConsumer surfaces the same + // checkpoint as the sync API and that it completes asynchronously. String topic = newScalableTopic(1); @Cleanup Producer producer = v5Client.newProducer(Schema.string()) .topic(topic) .create(); - for (int i = 0; i < 6; i++) { + for (int i = 0; i < 3; i++) { producer.newMessage().value("v-" + i).send(); } @@ -194,28 +194,12 @@ public void testAsyncCheckpointConsumerCheckpointAndSeek() throws Exception { .create(); AsyncCheckpointConsumer async = consumer.async(); - - // Read 3, snapshot via async, read 3 more, then async-seek back. for (int i = 0; i < 3; i++) { Message msg = async.receive().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS); assertEquals(msg.value(), "v-" + i); } Checkpoint mark = async.checkpoint().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS); assertNotNull(mark, "async checkpoint must complete with a non-null position"); - - for (int i = 3; i < 6; i++) { - Message msg = async.receive().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS); - assertEquals(msg.value(), "v-" + i); - } - - async.seek(mark).get(AWAIT.toMillis(), TimeUnit.MILLISECONDS); - Set redelivered = new HashSet<>(); - for (int i = 0; i < 3; i++) { - Message msg = async.receive().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS); - redelivered.add(msg.value()); - } - assertEquals(redelivered, Set.of("v-3", "v-4", "v-5"), - "async seek did not redeliver the post-checkpoint window"); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java index 0cd3b78c8a4bf..697491ad74a4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java @@ -32,7 +32,7 @@ * Basic end-to-end coverage for {@link CheckpointConsumer}: the unmanaged reader-style * API used by connector frameworks (Flink, Spark) — specifically the start-position * sentinels (earliest / latest), checkpoint roundtrip via {@link Checkpoint#toByteArray()}, - * resume from a saved checkpoint, and {@link CheckpointConsumer#seek(Checkpoint)}. + * and resume from a saved checkpoint. * *

All scenarios use a single-segment scalable topic to keep the focus on the * consumer surface itself; cross-segment position-vector behavior lives in the @@ -189,42 +189,6 @@ public void testCheckpointSerializationRoundtrip() throws Exception { assertEquals(received, List.of("v-3", "v-4", "v-5")); } - @Test - public void testSeekRewindsToEarlierCheckpoint() throws Exception { - String topic = newScalableTopic(1); - - @Cleanup - Producer producer = v5Client.newProducer(Schema.string()) - .topic(topic) - .create(); - for (int i = 0; i < 6; i++) { - producer.newMessage().value("v-" + i).send(); - } - - @Cleanup - CheckpointConsumer consumer = v5Client.newCheckpointConsumer(Schema.string()) - .topic(topic) - .startPosition(Checkpoint.earliest()) - .create(); - - // Read 3, snapshot, read 3 more, then seek back to the snapshot — should - // re-deliver the second batch. - for (int i = 0; i < 3; i++) { - assertEquals(consumer.receive(Duration.ofSeconds(5)).value(), "v-" + i); - } - Checkpoint mark = consumer.checkpoint(); - for (int i = 3; i < 6; i++) { - assertEquals(consumer.receive(Duration.ofSeconds(5)).value(), "v-" + i); - } - - consumer.seek(mark); - for (int i = 3; i < 6; i++) { - Message msg = consumer.receive(Duration.ofSeconds(5)); - assertNotNull(msg, "seek did not redeliver message v-" + i); - assertEquals(msg.value(), "v-" + i); - } - } - @Test public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception { String topic = newScalableTopic(1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java new file mode 100644 index 0000000000000..a5e3a3d195261 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * End-to-end coverage for the parent-topic admin seek + clear-backlog operations on a + * scalable-topic subscription. Exercises the real per-segment cursor-reset path + * (managed-ledger {@code resetCursor(timestamp)} / {@code clearBacklog}), which the + * controller-level mock-based tests do not cover. + */ +public class V5SeekSubscriptionTest extends V5ClientBaseTest { + + @Test + public void testSeekRewindsCursorAcrossSingleSegment() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled()) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("seek-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Produce 5 messages, consume + ack — drains the cursor so the rewind is observable. + for (int i = 0; i < 5; i++) { + producer.newMessage().value("v-" + i).send(); + } + for (int i = 0; i < 5; i++) { + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg); + consumer.acknowledge(msg.id()); + } + + // Snapshot the rewind target — anything published before this should be redelivered. + long mark = System.currentTimeMillis(); + // Slack for the broker's wall-clock vs the test's: producing immediately at `mark` + // can land at `mark` or `mark+1`. Sleep a tick so post-mark messages have a strictly + // later publish time. + Thread.sleep(10); + + for (int i = 5; i < 10; i++) { + producer.newMessage().value("v-" + i).send(); + } + for (int i = 5; i < 10; i++) { + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg); + consumer.acknowledge(msg.id()); + } + + // Drained — confirm. + assertNull(consumer.receive(Duration.ofMillis(500))); + + // Admin seek back to the mark — post-mark messages must be redelivered. + admin.scalableTopics().seekSubscription(topic, "seek-sub", mark); + + Set redelivered = new HashSet<>(); + for (int i = 0; i < 5; i++) { + Message msg = consumer.receive(Duration.ofSeconds(10)); + assertNotNull(msg, "expected redelivery of post-mark message #" + i); + redelivered.add(msg.value()); + consumer.acknowledge(msg.id()); + } + assertEquals(redelivered, Set.of("v-5", "v-6", "v-7", "v-8", "v-9"), + "seek must redeliver exactly the post-mark window"); + } + + /** + * Regression: a freshly-split active child segment that has received no messages + * straddles {@code timestampMs} but has an empty managed ledger; the per-segment + * {@code resetCursor} on it would have failed with {@code SubscriptionInvalidCursorPosition}, + * which used to bring the entire parent-level seek down. The fix treats empty + * segments as a no-op. + */ + @Test + public void testSeekToleratesEmptyChildSegmentsAfterSplit() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled()) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("split-seek-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Populate the initial segment. + for (int i = 0; i < 5; i++) { + producer.newMessage().value("pre-" + i).send(); + } + + // Drain so we don't redeliver them later. + for (int i = 0; i < 5; i++) { + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg); + consumer.acknowledge(msg.id()); + } + + // Split — children are empty; parent is sealed with all 5 messages. + long activeSegmentId = -1; + var meta = admin.scalableTopics().getMetadata(topic); + for (var seg : meta.getSegments().values()) { + if (seg.isActive()) { + activeSegmentId = seg.getSegmentId(); + break; + } + } + assertTrue(activeSegmentId >= 0); + admin.scalableTopics().splitSegment(topic, activeSegmentId); + + // Wait for the split to be visible. + Awaitility.await().untilAsserted(() -> { + int active = 0; + var m = admin.scalableTopics().getMetadata(topic); + for (var seg : m.getSegments().values()) { + if (seg.isActive()) { + active++; + } + } + assertEquals(active, 2, "split must produce 2 active children"); + }); + + // Seek to "now" — this exercises the bug. The two empty active children + // straddle the timestamp; without the fix, the controller's allOf would fail + // because resetCursor on an empty managed ledger throws + // SubscriptionInvalidCursorPosition. + long now = System.currentTimeMillis(); + admin.scalableTopics().seekSubscription(topic, "split-seek-sub", now); + + // After seek, no backlog (the sealed parent's data is all from before `now`, + // and the children have no data). So receive must time out. + assertNull(consumer.receive(Duration.ofSeconds(2)), + "seek across empty children must succeed and leave no undelivered messages"); + } + + @Test + public void testClearBacklogDropsAllUndeliveredMessages() throws Exception { + String topic = newScalableTopic(2); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled()) + .create(); + + // Establish the subscription via a short-lived consumer — and then close it. The + // V5 receive-queue would otherwise prefetch the produced messages into its + // client-side buffer, masking the broker-side cursor advance from clearBacklog. + // Closing also releases the segment-cursor fences so clearBacklog itself can + // fence them cleanly. + QueueConsumer bootstrap = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("clear-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + bootstrap.close(); + + for (int i = 0; i < 20; i++) { + producer.newMessage().key("k-" + i).value("v-" + i).send(); + } + + admin.scalableTopics().clearBacklog(topic, "clear-sub"); + + // Re-attach the consumer — every cursor is at the end, so no backlog remains. + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("clear-sub") + .subscribe(); + + assertNull(consumer.receive(Duration.ofSeconds(2)), + "clear-backlog must skip every undelivered message"); + + // Subsequent messages still flow through. + producer.newMessage().value("after-clear").send(); + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg); + assertEquals(msg.value(), "after-clear"); + consumer.acknowledge(msg.id()); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java index dd306c5805e86..10da169d14e08 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java @@ -206,6 +206,38 @@ CompletableFuture createSubscriptionAsync(String topic, String subscriptio */ CompletableFuture deleteSubscriptionAsync(String topic, String subscription); + /** + * Reset a subscription's cursor across every segment to the given wall-clock + * timestamp. The controller uses each segment's recorded sealed-time window to + * dispatch the cheapest per-segment op. + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @param subscription Subscription name + * @param timestampMs Wall-clock millis since the unix epoch + */ + void seekSubscription(String topic, String subscription, long timestampMs) + throws PulsarAdminException; + + /** + * Reset a subscription's cursor across every segment, asynchronously. + */ + CompletableFuture seekSubscriptionAsync(String topic, String subscription, + long timestampMs); + + /** + * Skip every undelivered message on the subscription, across every segment in the + * DAG (advance each per-segment cursor to the end). + * + * @param topic Topic name in the format "tenant/namespace/topic" + * @param subscription Subscription name + */ + void clearBacklog(String topic, String subscription) throws PulsarAdminException; + + /** + * Skip every undelivered message on the subscription, asynchronously. + */ + CompletableFuture clearBacklogAsync(String topic, String subscription); + /** * Split a segment into two halves. * @@ -298,4 +330,31 @@ CompletableFuture createSubscriptionAsync(String topic, String subscriptio */ CompletableFuture getSegmentSubscriptionBacklogAsync(String segmentTopic, String subscription); + + /** + * Reset the segment topic's subscription cursor to the given wall-clock timestamp. + * Routes to the broker that owns the segment topic. + * + *

Used internally by the parent-topic seek operation in + * {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController + * ScalableTopicController}: the controller classifies each segment by its + * {@code [createdAtMs, sealedAtMs)} window against the requested timestamp and + * dispatches per-segment seek / skip-all calls. + * + * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor}) + * @param subscription Subscription name + * @param timestampMs Wall-clock millis since the unix epoch + */ + CompletableFuture seekSegmentSubscriptionAsync(String segmentTopic, String subscription, + long timestampMs); + + /** + * Skip every undelivered message on the segment topic's subscription — advance the + * cursor to the end of the segment. + * + * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor}) + * @param subscription Subscription name + */ + CompletableFuture clearSegmentSubscriptionBacklogAsync(String segmentTopic, + String subscription); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java index 779579f1d1bb9..5a040123d2ed5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java @@ -184,6 +184,33 @@ public CompletableFuture deleteSubscriptionAsync(String topic, String subs return asyncDeleteRequest(path); } + @Override + public void seekSubscription(String topic, String subscription, long timestampMs) + throws PulsarAdminException { + sync(() -> seekSubscriptionAsync(topic, subscription, timestampMs)); + } + + @Override + public CompletableFuture seekSubscriptionAsync(String topic, String subscription, + long timestampMs) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn).path("subscriptions").path(subscription).path("seek") + .queryParam("timestamp", timestampMs); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override + public void clearBacklog(String topic, String subscription) throws PulsarAdminException { + sync(() -> clearBacklogAsync(topic, subscription)); + } + + @Override + public CompletableFuture clearBacklogAsync(String topic, String subscription) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn).path("subscriptions").path(subscription).path("skip-all"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + // --- Split --- @Override @@ -274,6 +301,30 @@ public CompletableFuture getSegmentSubscriptionBacklogAsync(String segment return asyncGetRequest(path, Long.class); } + @Override + public CompletableFuture seekSegmentSubscriptionAsync(String segmentTopic, + String subscription, + long timestampMs) { + TopicName tn = TopicName.get(segmentTopic); + WebTarget path = adminSegments + .path(tn.getTenant()).path(tn.getNamespacePortion()) + .path(tn.getLocalName()).path(tn.getSegmentDescriptor()) + .path("subscription").path(subscription).path("seek") + .queryParam("timestamp", timestampMs); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override + public CompletableFuture clearSegmentSubscriptionBacklogAsync(String segmentTopic, + String subscription) { + TopicName tn = TopicName.get(segmentTopic); + WebTarget path = adminSegments + .path(tn.getTenant()).path(tn.getNamespacePortion()) + .path(tn.getLocalName()).path(tn.getSegmentDescriptor()) + .path("subscription").path(subscription).path("skip-all"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + // --- Helpers --- private static TopicName validateTopic(String topic) { diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java index 6b24fe8943ddd..e872bb1715868 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.api.v5; import java.io.IOException; -import java.time.Instant; import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider; /** @@ -30,9 +29,12 @@ * serialized for external storage (e.g. Flink state, S3) using {@link #toByteArray()}. * *

This is the sole position type used with {@link CheckpointConsumer} — for initial - * positioning use the static factories {@link #earliest()}, {@link #latest()}, - * {@link #atTimestamp(Instant)}, or {@link #fromByteArray(byte[])} to restore from - * a previously saved checkpoint. + * positioning use the static factories {@link #earliest()}, {@link #latest()}, or + * {@link #fromByteArray(byte[])} to restore from a previously saved checkpoint. + * + *

For timestamp-based positioning, use the {@code scalable-topics seek} admin + * operation on the subscription instead — see + * {@link org.apache.pulsar.client.admin.ScalableTopics#seekSubscription}. */ public interface Checkpoint { @@ -44,13 +46,6 @@ public interface Checkpoint { */ byte[] toByteArray(); - /** - * The time at which this checkpoint was created. - * - * @return the creation timestamp of this checkpoint as an {@link Instant} - */ - Instant creationTime(); - // --- Static factories --- /** @@ -71,17 +66,6 @@ static Checkpoint latest() { return PulsarClientProvider.get().latestCheckpoint(); } - /** - * A checkpoint that positions at the first message published at or after the given timestamp. - * - * @param timestamp the timestamp to position at - * @return a {@link Checkpoint} that will start consuming from the first message at or after - * the given timestamp - */ - static Checkpoint atTimestamp(Instant timestamp) { - return PulsarClientProvider.get().checkpointAtTimestamp(timestamp); - } - /** * Deserialize a checkpoint from a byte array previously obtained via {@link #toByteArray()}. * diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java index 0d278fb3eb56e..40c4d8b3af295 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java @@ -91,17 +91,6 @@ public interface CheckpointConsumer extends Closeable { */ Checkpoint checkpoint(); - // --- Seek --- - - /** - * Seek to a previously saved checkpoint, or to a sentinel position such as - * {@link Checkpoint#earliest()} or {@link Checkpoint#latest()}. - * - * @param checkpoint the checkpoint to seek to - * @throws PulsarClientException if the seek fails or a connection error occurs - */ - void seek(Checkpoint checkpoint) throws PulsarClientException; - // --- Async --- /** diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java index e9c6153fec5c0..a54ed29e8160b 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java @@ -62,9 +62,8 @@ public interface CheckpointConsumerBuilder { /** * Set the initial position for this consumer. * - *

Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()}, - * {@link Checkpoint#atTimestamp}, or {@link Checkpoint#fromByteArray} to - * create the appropriate starting position. + *

Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()}, or + * {@link Checkpoint#fromByteArray} to create the appropriate starting position. * *

Defaults to {@link Checkpoint#latest()} if not specified. * diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java index 0963f79203523..79f507fe01b2a 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.api.v5; import java.time.Duration; -import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; @@ -88,25 +87,6 @@ public interface StreamConsumerBuilder { */ StreamConsumerBuilder subscriptionName(String subscriptionName); - // --- Seek (initial position override) --- - - /** - * Reset the subscription to a specific message ID. - * - * @param messageId the message ID to seek to - * @return this builder instance for chaining - */ - StreamConsumerBuilder seek(MessageId messageId); - - /** - * Reset the subscription to a specific timestamp. The subscription - * will be positioned at the first message published at or after this timestamp. - * - * @param timestamp the timestamp to seek to - * @return this builder instance for chaining - */ - StreamConsumerBuilder seek(Instant timestamp); - // --- Optional --- /** diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java index f7bb04bfabaa7..b663499e17530 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java @@ -69,15 +69,6 @@ public interface AsyncCheckpointConsumer { */ CompletableFuture checkpoint(); - /** - * Seek to a checkpoint asynchronously. - * - * @param checkpoint the checkpoint to seek to - * @return a {@link CompletableFuture} that completes when the consumer has been repositioned - * to the given checkpoint - */ - CompletableFuture seek(Checkpoint checkpoint); - /** * Close this consumer asynchronously. * diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java index faff56205d69a..e27a02302bc0f 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.api.v5.internal; import java.io.IOException; -import java.time.Instant; import java.util.Map; import java.util.ServiceLoader; import java.util.function.Supplier; @@ -87,8 +86,6 @@ public interface PulsarClientProvider { Checkpoint latestCheckpoint(); - Checkpoint checkpointAtTimestamp(Instant timestamp); - // --- Authentication --- Authentication authenticationToken(String token); diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java index f05cfbd4a21ce..fe98786d0d50e 100644 --- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java +++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java @@ -502,26 +502,6 @@ void checkpointConsumerRestore(PulsarClient client) throws Exception { } } - /** Time-travel seek — rewind to a specific timestamp. */ - void checkpointConsumerSeek(PulsarClient client) throws Exception { - try (var consumer = client.newCheckpointConsumer(Schema.string()) - .topic("events") - .startPosition(Checkpoint.latest()) - .create()) { - - // Seek back to replay from a specific time - consumer.seek(Checkpoint.atTimestamp(Instant.parse("2025-12-01T00:00:00Z"))); - - while (true) { - Message msg = consumer.receive(Duration.ofSeconds(5)); - if (msg == null) { - break; - } - System.out.printf("[%s] %s%n", msg.publishTime(), msg.value()); - } - } - } - // ================================================================================== // Helper types for the examples // ================================================================================== diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java index c4a308cbdce58..0391653198f41 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java @@ -137,6 +137,49 @@ void run() throws Exception { } } + @Command(description = "Reset a subscription's cursor on every segment to a given" + + " point in time. Pass --time as a relative offset (e.g. 1h, 5d) — the cursor" + + " is reset to (now - offset).") + private class SeekSubscriptionCmd extends CliCommand { + @Parameters(description = "tenant/namespace/topic", arity = "1") + private String topic; + + @Option(names = {"-s", "--subscription"}, + description = "Subscription name", required = true) + private String subscription; + + @Option(names = {"-t", "--time"}, + description = "Relative offset in the past to seek to (e.g. 1h, 5d, 30m)", + required = true, + converter = org.apache.pulsar.cli.converters.picocli.TimeUnitToMillisConverter.class) + private Long offsetMillis; + + @Override + void run() throws Exception { + long target = System.currentTimeMillis() - offsetMillis; + scalableTopics().seekSubscription(topic, subscription, target); + print("Reset subscription " + subscription + " on topic " + topic + + " to timestamp " + target + " (" + offsetMillis + "ms ago)"); + } + } + + @Command(description = "Skip every undelivered message on the subscription, across every" + + " segment in the DAG.") + private class ClearBacklogCmd extends CliCommand { + @Parameters(description = "tenant/namespace/topic", arity = "1") + private String topic; + + @Option(names = {"-s", "--subscription"}, + description = "Subscription name", required = true) + private String subscription; + + @Override + void run() throws Exception { + scalableTopics().clearBacklog(topic, subscription); + print("Cleared backlog of subscription " + subscription + " on topic " + topic); + } + } + @Command(description = "Merge two adjacent segments into one") private class MergeSegmentsCmd extends CliCommand { @Parameters(description = "tenant/namespace/topic", arity = "1") @@ -166,5 +209,7 @@ public CmdScalableTopics(Supplier admin) { addCommand("delete", new DeleteCmd()); addCommand("split-segment", new SplitSegmentCmd()); addCommand("merge-segments", new MergeSegmentsCmd()); + addCommand("seek", new SeekSubscriptionCmd()); + addCommand("clear-backlog", new ClearBacklogCmd()); } } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java index c4a7bf8d38036..ace455e7ba969 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java @@ -65,11 +65,6 @@ public CompletableFuture checkpoint() { return consumer.checkpointAsync(); } - @Override - public CompletableFuture seek(Checkpoint checkpoint) { - return consumer.seekAsync(checkpoint); - } - @Override public CompletableFuture close() { return consumer.closeAsync(); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java index 566209a2b327c..b9ae8c0a777dc 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import org.apache.pulsar.client.api.v5.Checkpoint; @@ -35,17 +34,14 @@ final class CheckpointV5 implements Checkpoint { private static final byte TYPE_REGULAR = 0; private static final byte TYPE_EARLIEST = 1; private static final byte TYPE_LATEST = 2; - private static final byte TYPE_TIMESTAMP = 3; - static final Checkpoint EARLIEST = new SentinelCheckpoint(TYPE_EARLIEST, Instant.EPOCH); - static final Checkpoint LATEST = new SentinelCheckpoint(TYPE_LATEST, Instant.EPOCH); + static final Checkpoint EARLIEST = new SentinelCheckpoint(TYPE_EARLIEST); + static final Checkpoint LATEST = new SentinelCheckpoint(TYPE_LATEST); private final Map segmentPositions; - private final Instant creationTime; - CheckpointV5(Map segmentPositions, Instant creationTime) { + CheckpointV5(Map segmentPositions) { this.segmentPositions = Map.copyOf(segmentPositions); - this.creationTime = creationTime; } /** @@ -55,16 +51,11 @@ Map segmentPositions() { return segmentPositions; } - @Override - public Instant creationTime() { - return creationTime; - } - @Override public byte[] toByteArray() { - // Format: [1 byte type] [8 bytes creationTimeMillis] [4 bytes numEntries] + // Format: [1 byte type] [4 bytes numEntries] // [for each entry: [8 bytes segmentId] [4 bytes msgIdLen] [msgIdBytes]] - int totalSize = 1 + 8 + 4; + int totalSize = 1 + 4; Map serializedIds = new HashMap<>(); for (var entry : segmentPositions.entrySet()) { byte[] idBytes = entry.getValue().toByteArray(); @@ -74,7 +65,6 @@ public byte[] toByteArray() { ByteBuffer buf = ByteBuffer.allocate(totalSize); buf.put(TYPE_REGULAR); - buf.putLong(creationTime.toEpochMilli()); buf.putInt(segmentPositions.size()); for (var entry : serializedIds.entrySet()) { buf.putLong(entry.getKey()); @@ -95,12 +85,7 @@ static Checkpoint fromByteArray(byte[] data) throws IOException { return switch (type) { case TYPE_EARLIEST -> EARLIEST; case TYPE_LATEST -> LATEST; - case TYPE_TIMESTAMP -> { - long millis = buf.getLong(); - yield new TimestampCheckpoint(Instant.ofEpochMilli(millis)); - } case TYPE_REGULAR -> { - long creationMillis = buf.getLong(); int numEntries = buf.getInt(); Map positions = new HashMap<>(); for (int i = 0; i < numEntries; i++) { @@ -111,48 +96,21 @@ static Checkpoint fromByteArray(byte[] data) throws IOException { positions.put(segmentId, org.apache.pulsar.client.api.MessageId.fromByteArray(msgIdBytes)); } - yield new CheckpointV5(positions, Instant.ofEpochMilli(creationMillis)); + yield new CheckpointV5(positions); } default -> throw new IOException("Unknown checkpoint type: " + type); }; } - static Checkpoint atTimestamp(Instant timestamp) { - return new TimestampCheckpoint(timestamp); - } - /** - * Sentinel checkpoint for earliest/latest positions. + * Sentinel checkpoint for earliest/latest positions. Encoded as a single type byte. */ - private record SentinelCheckpoint(byte type, Instant creation) implements Checkpoint { + private record SentinelCheckpoint(byte type) implements Checkpoint { @Override public byte[] toByteArray() { ByteBuffer buf = ByteBuffer.allocate(1); buf.put(type); return buf.array(); } - - @Override - public Instant creationTime() { - return creation; - } - } - - /** - * Checkpoint that positions at a specific timestamp. - */ - private record TimestampCheckpoint(Instant timestamp) implements Checkpoint { - @Override - public byte[] toByteArray() { - ByteBuffer buf = ByteBuffer.allocate(1 + 8); - buf.put(TYPE_TIMESTAMP); - buf.putLong(timestamp.toEpochMilli()); - return buf.array(); - } - - @Override - public Instant creationTime() { - return timestamp; - } } } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java index 2c53f0bcc48e6..7e8971a4ff9af 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl.v5; import java.io.IOException; -import java.time.Instant; import java.util.Map; import java.util.function.Supplier; import org.apache.pulsar.client.api.v5.Checkpoint; @@ -142,11 +141,6 @@ public Checkpoint latestCheckpoint() { return CheckpointV5.LATEST; } - @Override - public Checkpoint checkpointAtTimestamp(Instant timestamp) { - return CheckpointV5.atTimestamp(timestamp); - } - // --- Authentication --- @Override diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java index 9288780f39be9..1eedbb1456692 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java @@ -20,7 +20,6 @@ import io.github.merlimat.slog.Logger; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -221,8 +220,7 @@ public Messages receiveMulti(int maxMessages, Duration timeout) throws Pulsar * Update the checkpoint position for the segment this message belongs to. Called as * messages cross the boundary from the wire-buffer to the application — that's the * point at which a subsequent {@link #checkpoint()} should reflect "I have processed - * this message", so a {@link #seek(Checkpoint)} back to that checkpoint redelivers - * everything after it. + * this message". * *

{@code msg} may be null (timeout or interrupt path); returns it unchanged so the * caller can pass through the receive result without an extra null-check. @@ -237,22 +235,7 @@ private Message advanceCheckpoint(Message msg) { @Override public Checkpoint checkpoint() { Map positions = new HashMap<>(lastReceivedPositions); - return new CheckpointV5(positions, Instant.now()); - } - - @Override - public void seek(Checkpoint checkpoint) throws PulsarClientException { - try { - seekAsync(checkpoint).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException("Seek interrupted", e); - } catch (ExecutionException e) { - if (e.getCause() instanceof PulsarClientException pce) { - throw pce; - } - throw new PulsarClientException(e.getCause()); - } + return new CheckpointV5(positions); } @Override @@ -298,36 +281,6 @@ CompletableFuture checkpointAsync() { return CompletableFuture.completedFuture(checkpoint()); } - CompletableFuture seekAsync(Checkpoint checkpoint) { - if (checkpoint instanceof CheckpointV5 cp) { - List> futures = new ArrayList<>(); - for (var entry : cp.segmentPositions().entrySet()) { - var readerFuture = segmentReaders.get(entry.getKey()); - if (readerFuture != null) { - futures.add(readerFuture.thenCompose(r -> r.seekAsync(entry.getValue()))); - } - } - return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) - .whenComplete((__, ___) -> messageQueue.clear()); - } else if (checkpoint == CheckpointV5.EARLIEST) { - return seekAllAsync(org.apache.pulsar.client.api.MessageId.earliest); - } else if (checkpoint == CheckpointV5.LATEST) { - return seekAllAsync(org.apache.pulsar.client.api.MessageId.latest); - } else { - return CompletableFuture.failedFuture( - new PulsarClientException("Unsupported checkpoint type: " + checkpoint.getClass())); - } - } - - private CompletableFuture seekAllAsync(org.apache.pulsar.client.api.MessageId position) { - List> futures = new ArrayList<>(); - for (var readerFuture : segmentReaders.values()) { - futures.add(readerFuture.thenCompose(r -> r.seekAsync(position))); - } - return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) - .whenComplete((__, ___) -> messageQueue.clear()); - } - CompletableFuture closeAsync() { closed = true; try { diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java index 6b9f6b67e0736..3f2722a7ea878 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java @@ -19,11 +19,9 @@ package org.apache.pulsar.client.impl.v5; import java.time.Duration; -import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.api.v5.MessageId; import org.apache.pulsar.client.api.v5.PulsarClientException; import org.apache.pulsar.client.api.v5.StreamConsumer; import org.apache.pulsar.client.api.v5.StreamConsumerBuilder; @@ -126,18 +124,6 @@ public StreamConsumerBuilderV5 subscriptionName(String subscriptionName) { return this; } - @Override - public StreamConsumerBuilderV5 seek(MessageId messageId) { - // Seek will be applied after consumer creation - // Store for later use - return this; - } - - @Override - public StreamConsumerBuilderV5 seek(Instant timestamp) { - return this; - } - @Override public StreamConsumerBuilderV5 subscriptionProperties(Map properties) { conf.setSubscriptionProperties(properties); diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java index bcbe08d7e7c1a..c11381e4e9b6f 100644 --- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java +++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java @@ -22,7 +22,6 @@ import static org.testng.Assert.assertSame; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; -import java.time.Instant; import java.util.Map; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.v5.Checkpoint; @@ -39,12 +38,11 @@ private static MessageId v4(long ledger, long entry) { @Test public void testRegularCheckpointRoundtrip() throws Exception { - Instant created = Instant.ofEpochMilli(1_700_000_000_000L); Map positions = Map.of( 0L, v4(10, 20), 1L, v4(30, 40), 2L, v4(50, 60)); - CheckpointV5 original = new CheckpointV5(positions, created); + CheckpointV5 original = new CheckpointV5(positions); byte[] bytes = original.toByteArray(); Checkpoint decoded = CheckpointV5.fromByteArray(bytes); @@ -52,7 +50,6 @@ public void testRegularCheckpointRoundtrip() throws Exception { assertTrue(decoded instanceof CheckpointV5, "expected CheckpointV5, got " + decoded.getClass()); CheckpointV5 decodedV5 = (CheckpointV5) decoded; - assertEquals(decodedV5.creationTime(), created); assertEquals(decodedV5.segmentPositions().size(), 3); assertEquals(decodedV5.segmentPositions().get(0L), v4(10, 20)); assertEquals(decodedV5.segmentPositions().get(1L), v4(30, 40)); @@ -61,17 +58,15 @@ public void testRegularCheckpointRoundtrip() throws Exception { @Test public void testRegularCheckpointWithEmptyPositions() throws Exception { - Instant created = Instant.ofEpochMilli(42L); - CheckpointV5 original = new CheckpointV5(Map.of(), created); + CheckpointV5 original = new CheckpointV5(Map.of()); CheckpointV5 decoded = (CheckpointV5) CheckpointV5.fromByteArray(original.toByteArray()); - assertEquals(decoded.creationTime(), created); assertEquals(decoded.segmentPositions().size(), 0); } @Test public void testSegmentPositionsIsImmutable() { - CheckpointV5 cp = new CheckpointV5(Map.of(0L, v4(1, 2)), Instant.EPOCH); + CheckpointV5 cp = new CheckpointV5(Map.of(0L, v4(1, 2))); assertThrows(UnsupportedOperationException.class, () -> cp.segmentPositions().put(1L, v4(5, 6))); } @@ -100,26 +95,6 @@ public void testSentinelsHaveCompactEncoding() { assertEquals(CheckpointV5.LATEST.toByteArray().length, 1); } - // --- Timestamp checkpoint --- - - @Test - public void testTimestampCheckpointRoundtrip() throws Exception { - Instant ts = Instant.ofEpochMilli(1_234_567_890L); - Checkpoint cp = CheckpointV5.atTimestamp(ts); - - assertEquals(cp.creationTime(), ts); - - Checkpoint decoded = CheckpointV5.fromByteArray(cp.toByteArray()); - assertEquals(decoded.creationTime(), ts); - } - - @Test - public void testTimestampCheckpointEncodesTypeAndMillis() { - Instant ts = Instant.ofEpochMilli(555L); - byte[] bytes = CheckpointV5.atTimestamp(ts).toByteArray(); - assertEquals(bytes.length, 1 + 8, "timestamp wire: [type][millis]"); - } - // --- Error handling --- @Test