Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,92 @@ public void deleteSubscription(
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/seek")
@ApiOperation(value = "Reset a subscription's cursor on every segment to the given"
+ " wall-clock timestamp. The controller uses each segment's recorded sealed-time"
+ " window to dispatch the cheapest per-segment op.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cursor reset successfully on all segments"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission on the namespace"),
@ApiResponse(code = 404, message = "Scalable topic or subscription doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void seekSubscription(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription name", required = true)
@PathParam("subscription") String subscription,
@ApiParam(value = "Wall-clock millis since the unix epoch", required = true)
@QueryParam("timestamp") long timestampMs) {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic);

validateTopicOperationAsync(tn, TopicOperation.RESET_CURSOR, subscription)
.thenCompose(__ -> onControllerLeader(tn,
svc -> svc.seekSubscription(tn, subscription, timestampMs)))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId())
.attr("subscription", subscription).attr("topic", tn)
.attr("timestampMs", timestampMs)
.log("Sought subscription on scalable topic");
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
log.error().attr("clientAppId", clientAppId())
.attr("subscription", subscription).attr("topic", tn)
.exception(ex).log("Failed to seek subscription");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/skip-all")
@ApiOperation(value = "Skip every undelivered message on the subscription, across every"
+ " segment in the DAG (advance each per-segment cursor to the end).")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Backlog cleared successfully on all segments"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission on the namespace"),
@ApiResponse(code = 404, message = "Scalable topic or subscription doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void clearBacklog(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription name", required = true)
@PathParam("subscription") String subscription) {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, encodedTopic);

validateTopicOperationAsync(tn, TopicOperation.SKIP, subscription)
.thenCompose(__ -> onControllerLeader(tn,
svc -> svc.clearBacklog(tn, subscription)))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId())
.attr("subscription", subscription).attr("topic", tn)
.log("Cleared backlog on scalable topic");
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
log.error().attr("clientAppId", clientAppId())
.attr("subscription", subscription).attr("topic", tn)
.exception(ex).log("Failed to clear subscription backlog");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

// --- Segment operations ---

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,136 @@ public void getSubscriptionBacklog(
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/seek")
@ApiOperation(value = "Reset the segment topic's subscription cursor to the given timestamp."
+ " Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Cursor reset successfully"),
@ApiResponse(code = 401, message = "This operation requires super-user access"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Segment topic or subscription not found"),
@ApiResponse(code = 500, message = "Internal server error")})
public void seekSubscription(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the parent topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true)
@PathParam("descriptor") String descriptor,
@ApiParam(value = "Subscription name", required = true)
@PathParam("subscription") String subscription,
@ApiParam(value = "Wall-clock millis since the unix epoch", required = true)
@QueryParam("timestamp") long timestampMs,
@ApiParam(value = "Whether leader broker redirected this call to this broker.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor);

validateSuperUserAccessAsync()
.thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative))
.thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
// Segment topic not loaded on this owner — could be ownership
// churn or a transient unload. 503 so the caller retries (and so
// the parent-topic seek can't conflate this with the
// subscription-not-found case below, which is tolerated).
throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
"Segment topic not loaded: " + segmentTopic);
}
var sub = optTopic.get().getSubscription(subscription);
if (sub == null) {
throw new RestException(Response.Status.NOT_FOUND,
"Subscription not found on segment: " + subscription);
}
return sub.resetCursor(timestampMs)
.exceptionally(ex -> {
Throwable cause = ex instanceof java.util.concurrent.CompletionException
? ex.getCause() : ex;
if (cause instanceof org.apache.pulsar.broker.service.BrokerServiceException
.SubscriptionInvalidCursorPosition) {
// Empty managed ledger — no entries to seek to. The
// cursor is already at the only valid position, so this
// is a no-op (e.g. a freshly-split active child segment
// that hasn't received any messages yet).
log.debug().attr("segment", segmentTopic)
.attr("subscription", subscription)
.log("Empty segment, treating seek as no-op");
return null;
}
throw org.apache.pulsar.common.util.FutureUtil
.wrapToCompletionException(cause);
});
})
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic)
.attr("subscription", subscription).attr("timestampMs", timestampMs)
.exception(ex).log("Failed to seek segment subscription");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/skip-all")
@ApiOperation(value = "Skip every undelivered message on the segment topic's subscription —"
+ " advance the cursor to the end. Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Backlog cleared successfully"),
@ApiResponse(code = 401, message = "This operation requires super-user access"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Segment topic or subscription not found"),
@ApiResponse(code = 500, message = "Internal server error")})
public void clearSubscriptionBacklog(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the parent topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true)
@PathParam("descriptor") String descriptor,
@ApiParam(value = "Subscription name", required = true)
@PathParam("subscription") String subscription,
@ApiParam(value = "Whether leader broker redirected this call to this broker.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor);

validateSuperUserAccessAsync()
.thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative))
.thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
// 503 vs 404 — see the rationale on the seek endpoint above. The
// distinction lets the parent-topic clear-backlog tolerate
// subscription-not-found while still surfacing transient unloads.
throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
"Segment topic not loaded: " + segmentTopic);
}
var sub = optTopic.get().getSubscription(subscription);
if (sub == null) {
throw new RestException(Response.Status.NOT_FOUND,
"Subscription not found on segment: " + subscription);
}
return sub.clearBacklog();
})
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic)
.attr("subscription", subscription)
.exception(ex).log("Failed to clear segment subscription backlog");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/{descriptor}")
@ApiOperation(value = "Delete a segment topic. Super-user only.")
Expand Down
Loading
Loading