diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 94df26119cd94..e57261bba2ee5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -329,7 +329,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i private void individualAcknowledgeMessageIfNeeded(List positions, Map properties) { if (!(subscription instanceof PulsarCompactorSubscription)) { - subscription.acknowledgeMessage(positions, AckType.Individual, properties); + subscription.acknowledgeMessageAsync(positions, AckType.Individual, properties); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index a198905eed90c..0f4708fa39fe7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -517,6 +517,10 @@ public void doUnsubscribe(final long requestId, boolean force) { } public CompletableFuture messageAcked(CommandAck ack) { + return messageAcked(ack, false); + } + + public CompletableFuture messageAcked(CommandAck ack, boolean requirePersistedAck) { CompletableFuture future; this.lastAckedTimestamp = System.currentTimeMillis(); @@ -549,22 +553,20 @@ public CompletableFuture messageAcked(CommandAck ack) { position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); } + List positionsAcked = Collections.singletonList(position); if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) { - List positionsAcked = Collections.singletonList(position); future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked) .thenApply(unused -> 1L); + } else if (requirePersistedAck) { + future = subscription.acknowledgeMessageAsync(positionsAcked, AckType.Cumulative, properties) + .thenApply(unused -> 1L); } else { - List positionsAcked = Collections.singletonList(position); - subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); + subscription.acknowledgeMessageAsync(positionsAcked, AckType.Cumulative, properties); future = CompletableFuture.completedFuture(1L); } } else { - if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) { - future = individualAckWithTransaction(ack); - } else { - future = individualAckNormal(ack, properties); - } + future = individualAck(ack, properties, requirePersistedAck); } return future @@ -575,128 +577,170 @@ public CompletableFuture messageAcked(CommandAck ack) { }); } - //this method is for individual ack not carry the transaction - private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { - List> positionsAcked = new ArrayList<>(); + /** + * Handles individual acknowledgments, with or without a transaction. + * + *

Non-transactional acks defer pending ack state updates until persistence succeeds. + * Transactional acks immediately apply pending ack state updates (preserving the original + * timing behavior), and additionally schedule per-position cleanup on txn storage completion. + */ + private CompletableFuture individualAck(CommandAck ack, Map properties, + boolean requirePersistedAck) { + boolean hasTxn = ack.hasTxnidLeastBits() && ack.hasTxnidMostBits(); + + if (hasTxn && !isTransactionEnabled()) { + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); + } + + // Txn path needs consumer+position+batchSize tuples for transactionIndividualAcknowledge. + List>> txnPositions = + hasTxn ? new ArrayList<>() : null; + // Non-txn path needs plain positions for acknowledgeMessageAsync. + List nonTxnPositions = hasTxn ? null : new ArrayList<>(); + // Deferred completions for non-txn (applied after persistence). + List pendingAckCompletions = new ArrayList<>(); long totalAckCount = 0; + for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); - Position position; + boolean hasAckSet = msgId.getAckSetsCount() > 0; ObjectIntPair ackOwnerConsumerAndBatchSize = getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId()); + + if (hasTxn && ackOwnerConsumerAndBatchSize == null) { + log.warn() + .attr("ledgerId", msgId.getLedgerId()) + .attr("entryId", msgId.getEntryId()) + .log("Acknowledging message that was already deleted"); + continue; + } + Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - long ackedCount = 0; - int batchSize = ackOwnerConsumerAndBatchSize.rightInt(); - if (msgId.getAckSetsCount() > 0) { - long[] ackSets = new long[msgId.getAckSetsCount()]; - for (int j = 0; j < msgId.getAckSetsCount(); j++) { - ackSets[j] = msgId.getAckSetAt(j); - } - position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer); - if (isTransactionEnabled()) { - //sync the batch position bit set point, in order to delete the position in pending acks - if (Subscription.isIndividualAckMode(subType)) { - ((PersistentSubscription) subscription) - .syncBatchPositionBitSetForPendingAck(position); + + if (hasTxn) { + // Transactional: use batch size from message ID (not from pendingAcks, which may not + // exist for non-Shared subscriptions). PendingAckHandleImpl needs the actual batch size + // to correctly track ack set state. + int batchSize = msgId.hasBatchSize() ? msgId.getBatchSize() : 0; + Position position = AckSetStateUtil.createPositionWithAckSet( + msgId.getLedgerId(), msgId.getEntryId(), null); + long ackedCount; + if (hasAckSet) { + long[] ackSets = new long[msgId.getAckSetsCount()]; + for (int j = 0; j < msgId.getAckSetsCount(); j++) { + ackSets[j] = msgId.getAckSetAt(j); } + AckSetStateUtil.getAckSetState(position).setAckSet(ackSets); + ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); + } else { + ackedCount = 1; } - if (ackedCount > 0) { + txnPositions.add(Pair.of(ackOwnerConsumer, MutablePair.of(position, batchSize))); + + // TODO: If the transaction is later aborted, the unacked count is NOT restored, leading + // to an incorrect (lower) unacked message count. Fixing this requires coordinating + // with PendingAckHandle's commit/abort callbacks to defer consumer-level state + // updates until the transaction outcome is determined. + if (hasAckSet && ackedCount > 0) { boolean updated = ackOwnerConsumer.updateRemainingUnacked( position.getLedgerId(), position.getEntryId(), (int) ackedCount); if (updated) { addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); } + } else if (!hasAckSet) { + IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + position.getLedgerId(), position.getEntryId()); + if (removed != null) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); + updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); + } } + + totalAckCount += ackedCount; } else { - position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); - IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( - position.getLedgerId(), position.getEntryId()); - if (removed != null) { - ackedCount = removed.leftInt(); - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); - } + // Non-transactional: build position and compute acked count, defer state updates. + Position position = buildPosition(msgId); + int batchSize = ackOwnerConsumerAndBatchSize.rightInt(); + long ackedCount = computeAckedCount(msgId, position, ackOwnerConsumer, batchSize); + + nonTxnPositions.add(position); + pendingAckCompletions.add(new PendingAckCompletion(ackOwnerConsumer, position, + hasAckSet, ackedCount)); + totalAckCount += ackedCount; } - positionsAcked.add(Pair.of(ackOwnerConsumer, position)); + checkAckValidationError(ack, getAckPosition(hasTxn, msgId)); + } - checkAckValidationError(ack, position); + final long finalTotalAckCount = totalAckCount; - totalAckCount += ackedCount; + if (hasTxn) { + CompletableFuture txnFuture = transactionIndividualAcknowledge( + ack.getTxnidMostBits(), ack.getTxnidLeastBits(), + txnPositions.stream().map(Pair::getRight).collect(Collectors.toList())); + if (Subscription.isIndividualAckMode(subType)) { + txnFuture.whenComplete((v, e) -> + txnPositions.forEach(positionPair -> { + Consumer ackOwnerConsumer = positionPair.getLeft(); + MutablePair posAndBatch = positionPair.getRight(); + if (AckSetStateUtil.hasAckSet(posAndBatch.getLeft())) { + if (((PersistentSubscription) subscription) + .checkIsCanDeleteConsumerPendingAck(posAndBatch.left)) { + removePendingAcks(ackOwnerConsumer, posAndBatch.left); + } + } + })); + } + return txnFuture.thenApply(__ -> finalTotalAckCount); } - subscription.acknowledgeMessage(positionsAcked.stream() - .map(Pair::getRight) - .collect(Collectors.toList()), AckType.Individual, properties); - CompletableFuture completableFuture = new CompletableFuture<>(); - completableFuture.complete(totalAckCount); - if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - Position position = positionPair.getRight(); - //check if the position can remove from the consumer pending acks. - // the bit set is empty in pending ack handle. - if (AckSetStateUtil.hasAckSet(position)) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(position)) { - removePendingAcks(ackOwnerConsumer, position); - } - } - })); + + // Non-transactional + CompletableFuture ackFuture = subscription.acknowledgeMessageAsync( + nonTxnPositions, AckType.Individual, properties); + if (requirePersistedAck) { + return ackFuture.thenApply(unused -> { + applyPendingAckCompletions(pendingAckCompletions); + return finalTotalAckCount; + }); } - return completableFuture; + ackFuture.thenRun(() -> applyPendingAckCompletions(pendingAckCompletions)); + return CompletableFuture.completedFuture(finalTotalAckCount); } - - //this method is for individual ack carry the transaction - private CompletableFuture individualAckWithTransaction(CommandAck ack) { - // Individual ack - List>> positionsAcked = new ArrayList<>(); - if (!isTransactionEnabled()) { - return FutureUtil.failedFuture( - new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); + /** + * Get the ack position from the message ID. For txn acks the position is built without ack set, + * for non-txn acks it includes ack set data. + */ + private static Position getAckPosition(boolean hasTxn, MessageIdData msgId) { + if (hasTxn) { + return AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); } - - LongAdder totalAckCount = new LongAdder(); - for (int i = 0; i < ack.getMessageIdsCount(); i++) { - MessageIdData msgId = ack.getMessageIdAt(i); - Position position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); - ObjectIntPair ackOwnerConsumerAndBatchSize = getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), - msgId.getEntryId()); - if (ackOwnerConsumerAndBatchSize == null) { - log.warn() - .attr("position", position) - .log("Acknowledging message that was already deleted"); - continue; - } - Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - // acked count at least one - long ackedCount; - int batchSize; - if (msgId.hasBatchSize()) { - batchSize = msgId.getBatchSize(); - // ack batch messages set ackeCount = batchSize - ackedCount = msgId.getBatchSize(); - positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, msgId.getBatchSize()))); - } else { - // ack no batch message set ackedCount = 1 - batchSize = 0; - ackedCount = 1; - positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, (int) batchSize))); + if (msgId.getAckSetsCount() > 0) { + long[] ackSets = new long[msgId.getAckSetsCount()]; + for (int j = 0; j < msgId.getAckSetsCount(); j++) { + ackSets[j] = msgId.getAckSetAt(j); } + return AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); + } + return PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); + } - if (msgId.getAckSetsCount() > 0) { - long[] ackSets = new long[msgId.getAckSetsCount()]; - for (int j = 0; j < msgId.getAckSetsCount(); j++) { - ackSets[j] = msgId.getAckSetAt(j); - } - AckSetStateUtil.getAckSetState(position).setAckSet(ackSets); - ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); - if (ackedCount > 0) { + + private record PendingAckCompletion(Consumer consumer, Position position, boolean hasAckSet, long ackedCount) { + } + + private void applyPendingAckCompletions(List pendingAckCompletions) { + for (PendingAckCompletion pendingAckCompletion : pendingAckCompletions) { + Consumer ackOwnerConsumer = pendingAckCompletion.consumer(); + Position position = pendingAckCompletion.position(); + + if (pendingAckCompletion.hasAckSet()) { + if (pendingAckCompletion.ackedCount() > 0) { boolean updated = ackOwnerConsumer.updateRemainingUnacked( - position.getLedgerId(), position.getEntryId(), (int) ackedCount); + position.getLedgerId(), position.getEntryId(), (int) pendingAckCompletion.ackedCount()); if (updated) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) pendingAckCompletion.ackedCount()); } } } else { @@ -704,53 +748,72 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { position.getLedgerId(), position.getEntryId()); if (removed != null) { addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); - updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); } } + updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); - checkAckValidationError(ack, position); + if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { + //check if the position can remove from the consumer pending acks. + // the bit set is empty in pending ack handle. + if (AckSetStateUtil.hasAckSet(position)) { + if (((PersistentSubscription) subscription) + .checkIsCanDeleteConsumerPendingAck(position)) { + removePendingAcks(ackOwnerConsumer, position); + } + } + } + } + } + + /** + * Build the position from the message id, syncing batch index ack state if needed. + */ + private Position buildPosition(MessageIdData msgId) { + if (msgId.getAckSetsCount() > 0) { + long[] ackSets = new long[msgId.getAckSetsCount()]; + for (int j = 0; j < msgId.getAckSetsCount(); j++) { + ackSets[j] = msgId.getAckSetAt(j); + } + Position position = + AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); + if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { + ((PersistentSubscription) subscription) + .syncBatchPositionBitSetForPendingAck(position); + } + return position; + } + return PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); + } - totalAckCount.add(ackedCount); + /** + * Compute the number of newly acked batch indexes for batch-index level acks. + */ + private long computeAckedCount(MessageIdData msgId, Position position, Consumer consumer, int batchSize) { + if (msgId.getAckSetsCount() <= 0) { + // No batch-index ack set: the entire batch is being acked. + return batchSize; } - CompletableFuture completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(), - ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList())); - if (Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> - positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - MutablePair positionLongMutablePair = positionPair.getRight(); - if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { - removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); - } - } - })); + long[] ackSets = new long[msgId.getAckSetsCount()]; + for (int j = 0; j < msgId.getAckSetsCount(); j++) { + ackSets[j] = msgId.getAckSetAt(j); } - return completableFuture.thenApply(__ -> totalAckCount.sum()); - } - - private long getAckedCountForBatchIndexLevelEnabled(Position position, int batchSize, long[] ackSets, - Consumer consumer) { - long ackedCount = 0; - if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType) - && consumer.getPendingAcks().contains(position.getLedgerId(), position.getEntryId())) { - long[] cursorAckSet = getCursorAckSet(position); - if (cursorAckSet != null) { - BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); - int lastCardinality = cursorBitSet.cardinality(); - BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); - cursorBitSet.and(givenBitSet); - givenBitSet.recycle(); - int currentCardinality = cursorBitSet.cardinality(); - ackedCount = lastCardinality - currentCardinality; - cursorBitSet.recycle(); - } else { - ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); - } + if (!isAcknowledgmentAtBatchIndexLevelEnabled || !Subscription.isIndividualAckMode(subType) + || !consumer.getPendingAcks().contains(position.getLedgerId(), position.getEntryId())) { + return 0; } - return ackedCount; + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet == null) { + return batchSize - BitSet.valueOf(ackSets).cardinality(); + } + BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); + int lastCardinality = cursorBitSet.cardinality(); + BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); + cursorBitSet.and(givenBitSet); + givenBitSet.recycle(); + int currentCardinality = cursorBitSet.cardinality(); + cursorBitSet.recycle(); + return lastCardinality - currentCardinality; } private long getAckedCountForTransactionAck(int batchSize, long[] ackSets) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 944148c5dc2af..6ef6ebe8ce30f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2429,7 +2429,7 @@ protected void handleAck(CommandAck ack) { .log("Ignoring message acks during topic transfer. Total ignored ack count"); return; } - consumer.messageAcked(ack).thenRun(() -> { + consumer.messageAcked(ack, hasRequestId).thenRun(() -> { if (hasRequestId) { writeAndFlush(Commands.newAckResponse( requestId, null, null, consumerId)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 452c30b45febb..3463e07747991 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -48,7 +48,16 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { void consumerFlow(Consumer consumer, int additionalNumberOfMessages); - void acknowledgeMessage(List positions, AckType ackType, Map properties); + /** + * @deprecated Use {@link #acknowledgeMessageAsync(List, AckType, Map)} instead. + */ + @Deprecated + default void acknowledgeMessage(List positions, AckType ackType, Map properties) { + acknowledgeMessageAsync(positions, ackType, properties); + } + + CompletableFuture acknowledgeMessageAsync(List positions, AckType ackType, + Map properties); String getTopicName(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 0ea77817ff6ba..23ffc1e9e72d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -207,8 +207,10 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List position, AckType ackType, Map properties) { + public CompletableFuture acknowledgeMessageAsync(List position, AckType ackType, + Map properties) { // No-op + return CompletableFuture.completedFuture(null); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index eb3d024ab9a0d..236a55162347b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -450,8 +450,9 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { dispatcher.consumerFlow(consumer, additionalNumberOfMessages); } - @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public CompletableFuture acknowledgeMessageAsync(List positions, AckType ackType, + Map properties) { + CompletableFuture future = new CompletableFuture<>(); cursor.updateLastActive(); Position previousMarkDeletePosition = cursor.getMarkDeletedPosition(); @@ -460,21 +461,25 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { @@ -494,6 +499,8 @@ public void acknowledgeMessage(List positions, AckType ackType, Map transactionIndividualAcknowledge( @@ -506,63 +513,69 @@ public CompletableFuture transactionCumulativeAcknowledge(TxnID txnId, Lis return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions); } - private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() { + private final class AckCallback implements MarkDeleteCallback, DeleteCallback { + private final Position previousMarkDeletePosition; + private final CompletableFuture completionFuture; + + private AckCallback(Position previousMarkDeletePosition, CompletableFuture completionFuture) { + this.previousMarkDeletePosition = previousMarkDeletePosition; + this.completionFuture = completionFuture; + } + @Override public void markDeleteComplete(Object ctx) { - Position oldMD = (Position) ctx; Position newMD = cursor.getMarkDeletedPosition(); log.debug() .attr("newMD", newMD) - .attr("oldMD", oldMD) + .attr("oldMD", previousMarkDeletePosition) .log("Mark deleted messages to position from position"); - // Signal the dispatchers to give chance to take extra actions - if (dispatcher != null) { - dispatcher.afterAckMessages(null, ctx); - } - // Signal the dispatchers to give chance to take extra actions - notifyTheMarkDeletePositionChanged(oldMD); + completeAck(); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // TODO: cut consumer connection on markDeleteFailed log.debug() - .attr("ctx", ctx) + .attr("oldMD", previousMarkDeletePosition) .exceptionMessage(exception) .log("Failed to mark delete for position"); - // Signal the dispatchers to give chance to take extra actions - if (dispatcher != null) { - dispatcher.afterAckMessages(null, ctx); - } + failAck(exception); } - }; - private final DeleteCallback deleteCallback = new DeleteCallback() { @Override public void deleteComplete(Object context) { - // The value of the param "context" is a position. log.debug() - .attr("context", context) + .attr("oldMD", previousMarkDeletePosition) .log("Deleted message"); - // Signal the dispatchers to give chance to take extra actions - if (dispatcher != null) { - dispatcher.afterAckMessages(null, context); - } - notifyTheMarkDeletePositionChanged((Position) context); + completeAck(); } @Override public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.warn() - .attr("ctx", ctx) + .attr("odMD", previousMarkDeletePosition) .exceptionMessage(exception) .log("Failed to delete message"); - // Signal the dispatchers to give chance to take extra actions - if (dispatcher != null) { - dispatcher.afterAckMessages(exception, ctx); + failAck(exception); + } + + private void completeAck() { + Dispatcher currentDispatcher = dispatcher; + if (currentDispatcher != null) { + currentDispatcher.afterAckMessages(null, this); } + notifyTheMarkDeletePositionChanged(previousMarkDeletePosition); + completionFuture.complete(null); } - }; + + private void failAck(ManagedLedgerException exception) { + Dispatcher currentDispatcher = dispatcher; + if (currentDispatcher != null) { + currentDispatcher.afterAckMessages(exception, this); + } + completionFuture.completeExceptionally(exception); + } + } /** * This method is called after acknowledgements (such as individual acks) have been processed and the mark-delete diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java index b2b4e38e06cb3..d3dda05e9ea15 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java @@ -64,11 +64,13 @@ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compact } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public CompletableFuture acknowledgeMessageAsync(List positions, AckType ackType, + Map properties) { checkArgument(ackType == AckType.Cumulative); checkArgument(positions.size() == 1); checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY); + CompletableFuture completionFuture = new CompletableFuture<>(); Position position = positions.get(0); @@ -93,6 +95,7 @@ public void markDeleteComplete(Object ctx) { if (previousContext != null) { compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId()); } + completionFuture.complete(null); } @Override @@ -101,14 +104,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.debug() .exception(exception) .log("Failed to mark delete for position on compactor subscription"); + completionFuture.completeExceptionally(exception); } }, null); + }).exceptionally(ex -> { + completionFuture.completeExceptionally(ex); + return null; }); if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) { // Notify all consumer that the end of topic was reached checkAndApplyReachedEndOfTopicOrTopicMigration(topic, dispatcher.getConsumers()); } + + return completionFuture; } CompletableFuture cleanCompactedLedger() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 9595a4a628369..0b4e8d1df47fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -217,7 +217,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { - sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); + sub.acknowledgeMessageAsync(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster. log.info() @@ -228,7 +228,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { topic.createSubscription(update.getSubscriptionName(), InitialPosition.Earliest, true /* replicateSubscriptionState */, Collections.emptyMap()) .thenAccept(subscriptionCreated -> { - subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), + subscriptionCreated.acknowledgeMessageAsync(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index f947cf9ec3da4..c417fd6d18e75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -464,14 +464,19 @@ private void internalCommitTxn(TxnID txnID, Map properties, long l if (cumulativeAckOfTransaction.getKey().equals(txnID)) { pendingAckStoreFuture.thenAccept(pendingAckStore -> pendingAckStore .appendCommitMark(txnID, AckType.Cumulative).thenAccept(v -> { - log.debug() - .attr("txnId", txnID) - .log("Transaction pending ack store commit cumulative success"); - persistentSubscription.acknowledgeMessage( - Collections.singletonList(cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); - cumulativeAckOfTransaction = null; - commitFuture.complete(null); + log.debug() + .attr("txnId", txnID) + .log("Transaction pending ack store commit cumulative success"); + persistentSubscription.acknowledgeMessageAsync( + Collections.singletonList(cumulativeAckOfTransaction.getValue()), + AckType.Cumulative, properties) + .thenRun(() -> { + cumulativeAckOfTransaction = null; + commitFuture.complete(null); + }).exceptionally(ackError -> { + commitFuture.completeExceptionally(ackError); + return null; + }); }).exceptionally(e -> { log.error() .attr("txnId", txnID) @@ -496,12 +501,14 @@ private void internalCommitTxn(TxnID txnID, Map properties, long l log.debug() .attr("txnId", txnID) .log("Transaction pending ack store commit individual success"); - individualAckCommitCommon( - txnID, - pendingAckMessageForCurrentTxn, - properties); - commitFuture.complete(null); - handleLowWaterMark(txnID, lowWaterMark); + individualAckCommitCommon(txnID, pendingAckMessageForCurrentTxn, properties) + .thenRun(() -> { + commitFuture.complete(null); + handleLowWaterMark(txnID, lowWaterMark); + }).exceptionally(ackError -> { + commitFuture.completeExceptionally(ackError); + return null; + }); } else { commitFuture.complete(null); } @@ -770,7 +777,7 @@ private void individualAckAbortCommon(TxnID txnID, HashMap c protected void handleCommit(TxnID txnID, AckType ackType, Map properties) { if (ackType == AckType.Cumulative) { if (this.cumulativeAckOfTransaction != null) { - persistentSubscription.acknowledgeMessage( + persistentSubscription.acknowledgeMessageAsync( Collections.singletonList(this.cumulativeAckOfTransaction.getValue()), AckType.Cumulative, properties); } @@ -786,13 +793,16 @@ protected void handleCommit(TxnID txnID, AckType ackType, Map prop } } - private void individualAckCommitCommon(TxnID txnID, - HashMap currentTxn, - Map properties) { + private CompletableFuture individualAckCommitCommon(TxnID txnID, + HashMap currentTxn, + Map properties) { if (currentTxn != null) { - persistentSubscription.acknowledgeMessage(new ArrayList<>(currentTxn.values()), - AckType.Individual, properties); - individualAckOfTransaction.remove(txnID); + return persistentSubscription.acknowledgeMessageAsync(new ArrayList<>(currentTxn.values()), + AckType.Individual, properties).thenRun(() -> { + individualAckOfTransaction.remove(txnID); + }); + } else { + return CompletableFuture.completedFuture(null); } } @@ -1151,4 +1161,4 @@ protected void handleCacheRequest() { } } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 661877532b6a7..9eb11d55cef1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1882,7 +1882,9 @@ public void testConsumerStatsLastTimestamp() throws PulsarClientException, Pulsa assertTrue(consumedTimestamp < lastConsumedTimestamp); assertTrue(ackedTimestamp < lastAckedTimestamp); assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp); - assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp); + // consumedFlowTimestamp may change due to deferred ack completion triggering + // additional consumerFlow calls. Only verify it's not reset. + assertTrue(lastConsumedFlowTimestamp >= consumedFlowTimestamp); assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats); assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats); assertEquals(firstConsumedFlowTimestamp, firstConsumedFlowTimestamp2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 2ca25c894201b..ded5bdb6c238d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -26,7 +26,6 @@ import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -36,6 +35,7 @@ import static org.mockito.Mockito.when; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -85,7 +85,7 @@ public void setup() throws Exception { doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", cursor, false)); - doNothing().when(sub).acknowledgeMessage(any(), any(), any()); + doReturn(CompletableFuture.completedFuture(null)).when(sub).acknowledgeMessageAsync(any(), any(), any()); } @AfterMethod(alwaysRun = true) @@ -125,7 +125,7 @@ public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throw commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessageAsync(any(), any(), any()); } @Test(timeOut = 5000, dataProvider = "notIndividualAckModes") @@ -140,7 +140,7 @@ public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) th commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, times(1)).acknowledgeMessage(any(), any(), any()); + verify(sub, times(1)).acknowledgeMessageAsync(any(), any(), any()); } @Test(timeOut = 5000) @@ -156,6 +156,6 @@ public void testAckWithMoreThanNoneMessageIds() throws Exception { commandAck.addMessageId().setEntryId(0L).setLedgerId(2L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessageAsync(any(), any(), any()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index a6c6b6963cc86..3a2774bce2126 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -80,6 +80,7 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.CustomLog; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -1815,15 +1816,18 @@ public void testClosingReplicationProducerTwice() throws Exception { public void testCompactorSubscription() { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); CompactedTopic compactedTopic = mock(CompactedTopic.class); + CompactedTopicContext compactedTopicContext = mock(CompactedTopicContext.class); + LedgerHandle ledgerHandle = mock(LedgerHandle.class); + when(compactedTopicContext.getLedger()).thenReturn(ledgerHandle); when(compactedTopic.newCompactedLedger(any(Position.class), anyLong())) - .thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class))); + .thenReturn(CompletableFuture.completedFuture(compactedTopicContext)); PersistentSubscription sub = new PulsarCompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock); Position position = PositionFactory.create(1, 1); long ledgerId = 0xc0bfefeL; - sub.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, - Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId)); + sub.acknowledgeMessageAsync(Collections.singletonList(position), AckType.Cumulative, + Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId)).join(); verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); } @@ -1994,7 +1998,7 @@ public void testBacklogCursor() throws Exception { assertFalse(cursor3.isActive()); // Write messages to ledger - CountDownLatch latch = new CountDownLatch(backloggedThreshold); + CountDownLatch latch = new CountDownLatch(backloggedThreshold + 1); for (int i = 0; i < backloggedThreshold + 1; i++) { String content = "entry"; // 5 bytes ByteBuf entry = getMessageWithMetadata(content.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index e70ca49bc57fd..018488546b997 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -85,8 +85,8 @@ public void testMarkerDeleteTimes() throws Exception { PersistentSubscription persistentSubscription = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false); Position position = managedLedger.addEntry("test".getBytes()); - persistentSubscription.acknowledgeMessage(Collections.singletonList(position), - AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessageAsync(Collections.singletonList(position), + AckType.Individual, Collections.emptyMap()).join(); verify(managedLedger, times(0)).asyncReadEntry(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index deee8bee7286c..0885a3ae43ef2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -193,7 +193,7 @@ public void testCanAcknowledgeAndAbortForTransaction() throws Exception { positionList.add(PositionFactory.create(3, 5)); // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction. - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessageAsync(positionList, AckType.Individual, Collections.emptyMap()).join(); //Abort txn. persistentSubscription.endTxn(txnID1.getMostSigBits(), txnID2.getLeastSigBits(), TxnAction.ABORT_VALUE, -1); @@ -226,9 +226,9 @@ public void testAcknowledgeUpdateCursorLastActive() throws Exception { positionList.add(PositionFactory.create(1, 1)); long beforeAcknowledgeTimestamp = System.currentTimeMillis(); Thread.sleep(1); - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessageAsync(positionList, AckType.Individual, Collections.emptyMap()).join(); - // `acknowledgeMessage` should update cursor last active + // `acknowledgeMessageAsync` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index b07bcb8586dc3..7fc59469bd2ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -26,6 +26,7 @@ import static org.assertj.core.api.InstanceOfAssertFactories.INTEGER; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.AssertJUnit.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -127,15 +128,17 @@ public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws Pulsar Assert.assertEquals(pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), 0); final String topicName = "persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer"; + @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) .subscriptionName("sub") .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer() .topic(topicName) + .enableBatching(false) .create(); final int messages = 10; @@ -143,15 +146,15 @@ public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws Pulsar producer.send(("message-" + i).getBytes()); } - int received = 0; + List> received = new ArrayList<>(); for (int i = 0; i < messages; i++) { // don't ack messages here - consumer.receive(); - received++; + Message receive = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(receive); + received.add(receive); } - Assert.assertEquals(received, messages); - received = 0; + Assert.assertEquals(received.size(), messages); TopicStats stats = admin.topics().getStats(topicName); Assert.assertEquals(stats.getSubscriptions().size(), 1); @@ -162,22 +165,21 @@ public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws Pulsar Assert.assertEquals(stats.getSubscriptions().entrySet().iterator().next() .getValue().getConsumers().get(0).getUnackedMessages(), messages); - for (int i = 0; i < messages; i++) { - consumer.acknowledge(consumer.receive()); - received++; - } - - Assert.assertEquals(received, messages); - - // wait acknowledge send - Thread.sleep(2000); - - stats = admin.topics().getStats(topicName); + received.forEach(n -> { + try { + consumer.acknowledge(n); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }); - Assert.assertFalse(stats.getSubscriptions().entrySet().iterator().next() - .getValue().getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); - Assert.assertEquals(stats.getSubscriptions().entrySet().iterator().next() - .getValue().getConsumers().get(0).getUnackedMessages(), 0); + Awaitility.await().untilAsserted(() -> { + TopicStats topicStats = admin.topics().getStats(topicName); + Assert.assertFalse(topicStats.getSubscriptions().entrySet().iterator().next() + .getValue().getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + Assert.assertEquals(topicStats.getSubscriptions().entrySet().iterator().next() + .getValue().getConsumers().get(0).getUnackedMessages(), 0); + }); } @Test