Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i

private void individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<String, Long> properties) {
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(positions, AckType.Individual, properties);
subscription.acknowledgeMessageAsync(positions, AckType.Individual, properties);
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2429,7 +2429,7 @@ protected void handleAck(CommandAck ack) {
.log("Ignoring message acks during topic transfer. Total ignored ack count");
return;
}
consumer.messageAcked(ack).thenRun(() -> {
consumer.messageAcked(ack, hasRequestId).thenRun(() -> {
if (hasRequestId) {
writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,16 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException {

void consumerFlow(Consumer consumer, int additionalNumberOfMessages);

void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties);
/**
* @deprecated Use {@link #acknowledgeMessageAsync(List, AckType, Map)} instead.
*/
@Deprecated
default void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
acknowledgeMessageAsync(positions, ackType, properties);
}

CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions, AckType ackType,
Map<String, Long> properties);

String getTopicName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,10 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
}

@Override
public void acknowledgeMessage(List<Position> position, AckType ackType, Map<String, Long> properties) {
public CompletableFuture<Void> acknowledgeMessageAsync(List<Position> position, AckType ackType,
Map<String, Long> properties) {
// No-op
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,9 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}

@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
public CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions, AckType ackType,
Map<String, Long> properties) {
CompletableFuture<Void> future = new CompletableFuture<>();
cursor.updateLastActive();
Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();

Expand All @@ -460,21 +461,25 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
if (positions.size() != 1) {
log.warn()
.log("Invalid cumulative ack received with multiple message ids.");
return;
future.completeExceptionally(
new IllegalArgumentException("Invalid cumulative ack received with multiple message ids."));
return future;
}

Position position = positions.get(0);
log.debug()
.attr("position", position)
.log("Cumulative ack on");
AckCallback callback = new AckCallback(previousMarkDeletePosition, future);
cursor.asyncMarkDelete(position, mergeCursorProperties(properties),
markDeleteCallback, previousMarkDeletePosition);
callback, callback);

} else {
log.debug()
.attr("positions", positions)
.log("Individual acks on");
cursor.asyncDelete(positions, deleteCallback, previousMarkDeletePosition);
AckCallback callback = new AckCallback(previousMarkDeletePosition, future);
cursor.asyncDelete(positions, callback, callback);
if (config.isTransactionCoordinatorEnabled()) {
positions.forEach(position -> {
if ((cursor.isMessageDeleted(position))) {
Expand All @@ -494,6 +499,8 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
checkAndApplyReachedEndOfTopicOrTopicMigration(topic, dispatcher.getConsumers());
}
}

return future;
}

public CompletableFuture<Void> transactionIndividualAcknowledge(
Expand All @@ -506,63 +513,69 @@ public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, Lis
return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
}

private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
private final class AckCallback implements MarkDeleteCallback, DeleteCallback {
private final Position previousMarkDeletePosition;
private final CompletableFuture<Void> completionFuture;

private AckCallback(Position previousMarkDeletePosition, CompletableFuture<Void> completionFuture) {
this.previousMarkDeletePosition = previousMarkDeletePosition;
this.completionFuture = completionFuture;
}

@Override
public void markDeleteComplete(Object ctx) {
Position oldMD = (Position) ctx;
Position newMD = cursor.getMarkDeletedPosition();
log.debug()
.attr("newMD", newMD)
.attr("oldMD", oldMD)
.attr("oldMD", previousMarkDeletePosition)
.log("Mark deleted messages to position from position");
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
// Signal the dispatchers to give chance to take extra actions
notifyTheMarkDeletePositionChanged(oldMD);
completeAck();
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
// TODO: cut consumer connection on markDeleteFailed
log.debug()
.attr("ctx", ctx)
.attr("oldMD", previousMarkDeletePosition)
.exceptionMessage(exception)
.log("Failed to mark delete for position");
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
failAck(exception);
}
};

private final DeleteCallback deleteCallback = new DeleteCallback() {
@Override
public void deleteComplete(Object context) {
// The value of the param "context" is a position.
log.debug()
.attr("context", context)
.attr("oldMD", previousMarkDeletePosition)
.log("Deleted message");
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, context);
}
notifyTheMarkDeletePositionChanged((Position) context);
completeAck();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn()
.attr("ctx", ctx)
.attr("odMD", previousMarkDeletePosition)
.exceptionMessage(exception)
.log("Failed to delete message");
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
failAck(exception);
}

private void completeAck() {
Dispatcher currentDispatcher = dispatcher;
if (currentDispatcher != null) {
currentDispatcher.afterAckMessages(null, this);
}
notifyTheMarkDeletePositionChanged(previousMarkDeletePosition);
completionFuture.complete(null);
}
};

private void failAck(ManagedLedgerException exception) {
Dispatcher currentDispatcher = dispatcher;
if (currentDispatcher != null) {
currentDispatcher.afterAckMessages(exception, this);
}
completionFuture.completeExceptionally(exception);
}
}

/**
* This method is called after acknowledgements (such as individual acks) have been processed and the mark-delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compact
}

@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
public CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions, AckType ackType,
Map<String, Long> properties) {
checkArgument(ackType == AckType.Cumulative);
checkArgument(positions.size() == 1);
checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
CompletableFuture<Void> completionFuture = new CompletableFuture<>();

Position position = positions.get(0);

Expand All @@ -93,6 +95,7 @@ public void markDeleteComplete(Object ctx) {
if (previousContext != null) {
compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
}
completionFuture.complete(null);
}

@Override
Expand All @@ -101,14 +104,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.debug()
.exception(exception)
.log("Failed to mark delete for position on compactor subscription");
completionFuture.completeExceptionally(exception);
}
}, null);
}).exceptionally(ex -> {
completionFuture.completeExceptionally(ex);
return null;
});

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
checkAndApplyReachedEndOfTopicOrTopicMigration(topic, dispatcher.getConsumers());
}

return completionFuture;
}

CompletableFuture<Void> cleanCompactedLedger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {

PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
if (sub != null) {
sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
sub.acknowledgeMessageAsync(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
} else {
// Subscription doesn't exist. We need to force the creation of the subscription in this cluster.
log.info()
Expand All @@ -228,7 +228,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest,
true /* replicateSubscriptionState */, Collections.emptyMap())
.thenAccept(subscriptionCreated -> {
subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos),
subscriptionCreated.acknowledgeMessageAsync(Collections.singletonList(pos),
AckType.Cumulative, Collections.emptyMap());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,14 +464,19 @@ private void internalCommitTxn(TxnID txnID, Map<String, Long> properties, long l
if (cumulativeAckOfTransaction.getKey().equals(txnID)) {
pendingAckStoreFuture.thenAccept(pendingAckStore -> pendingAckStore
.appendCommitMark(txnID, AckType.Cumulative).thenAccept(v -> {
log.debug()
.attr("txnId", txnID)
.log("Transaction pending ack store commit cumulative success");
persistentSubscription.acknowledgeMessage(
Collections.singletonList(cumulativeAckOfTransaction.getValue()),
AckType.Cumulative, properties);
cumulativeAckOfTransaction = null;
commitFuture.complete(null);
log.debug()
.attr("txnId", txnID)
.log("Transaction pending ack store commit cumulative success");
persistentSubscription.acknowledgeMessageAsync(
Collections.singletonList(cumulativeAckOfTransaction.getValue()),
AckType.Cumulative, properties)
.thenRun(() -> {
cumulativeAckOfTransaction = null;
commitFuture.complete(null);
}).exceptionally(ackError -> {
commitFuture.completeExceptionally(ackError);
return null;
});
}).exceptionally(e -> {
log.error()
.attr("txnId", txnID)
Expand All @@ -496,12 +501,14 @@ private void internalCommitTxn(TxnID txnID, Map<String, Long> properties, long l
log.debug()
.attr("txnId", txnID)
.log("Transaction pending ack store commit individual success");
individualAckCommitCommon(
txnID,
pendingAckMessageForCurrentTxn,
properties);
commitFuture.complete(null);
handleLowWaterMark(txnID, lowWaterMark);
individualAckCommitCommon(txnID, pendingAckMessageForCurrentTxn, properties)
.thenRun(() -> {
commitFuture.complete(null);
handleLowWaterMark(txnID, lowWaterMark);
}).exceptionally(ackError -> {
commitFuture.completeExceptionally(ackError);
return null;
});
} else {
commitFuture.complete(null);
}
Expand Down Expand Up @@ -770,7 +777,7 @@ private void individualAckAbortCommon(TxnID txnID, HashMap<Position, Position> c
protected void handleCommit(TxnID txnID, AckType ackType, Map<String, Long> properties) {
if (ackType == AckType.Cumulative) {
if (this.cumulativeAckOfTransaction != null) {
persistentSubscription.acknowledgeMessage(
persistentSubscription.acknowledgeMessageAsync(
Collections.singletonList(this.cumulativeAckOfTransaction.getValue()),
AckType.Cumulative, properties);
}
Expand All @@ -786,13 +793,16 @@ protected void handleCommit(TxnID txnID, AckType ackType, Map<String, Long> prop
}
}

private void individualAckCommitCommon(TxnID txnID,
HashMap<Position, Position> currentTxn,
Map<String, Long> properties) {
private CompletableFuture<Void> individualAckCommitCommon(TxnID txnID,
HashMap<Position, Position> currentTxn,
Map<String, Long> properties) {
if (currentTxn != null) {
persistentSubscription.acknowledgeMessage(new ArrayList<>(currentTxn.values()),
AckType.Individual, properties);
individualAckOfTransaction.remove(txnID);
return persistentSubscription.acknowledgeMessageAsync(new ArrayList<>(currentTxn.values()),
AckType.Individual, properties).thenRun(() -> {
individualAckOfTransaction.remove(txnID);
});
} else {
return CompletableFuture.completedFuture(null);
}
}

Expand Down Expand Up @@ -1151,4 +1161,4 @@ protected void handleCacheRequest() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,9 @@ public void testConsumerStatsLastTimestamp() throws PulsarClientException, Pulsa
assertTrue(consumedTimestamp < lastConsumedTimestamp);
assertTrue(ackedTimestamp < lastAckedTimestamp);
assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp);
assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp);
// consumedFlowTimestamp may change due to deferred ack completion triggering
// additional consumerFlow calls. Only verify it's not reset.
assertTrue(lastConsumedFlowTimestamp >= consumedFlowTimestamp);
assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);
assertEquals(firstConsumedFlowTimestamp, firstConsumedFlowTimestamp2);
Expand Down
Loading
Loading