diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 8da74a553ddf5..b1d4e8cecbd14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -127,16 +127,24 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { messagesHaveFixedDelay = false; return false; } - log.debug() - .attr("ledgerId", ledgerId) - .attr("entryId", entryId) - .attr("deliveryInMs", () -> deliverAt - clock.millis()) - .log("Add message"); - long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>()) - .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) - .add(entryId); - delayedMessagesCount.incrementAndGet(); + + log.debug() + .attr("ledgerId", ledgerId) + .attr("entryId", entryId) + .attr("deliveryInMs", () -> deliverAt - clock.millis()) + .log("Add message"); + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + + Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + // Roaring64Bitmap does not store duplicates, so track if it a new element + // so we can keep delayedMessagesCount in sync + boolean isNew = !bitmap.contains(entryId); + + if (isNew) { + bitmap.add(entryId); + delayedMessagesCount.incrementAndGet(); + } updateTimer(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c569cf5b68cb2..9bcf9153572a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -427,7 +427,7 @@ public synchronized void readMoreEntries() { } } - protected Predicate createReadEntriesSkipConditionForNormalRead() { + protected synchronized Predicate createReadEntriesSkipConditionForNormalRead() { Predicate skipCondition = null; // Filter out and skip read delayed messages exist in DelayedDeliveryTracker if (delayedDeliveryTracker.isPresent()) { @@ -1378,12 +1378,12 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { } @Override - public long getNumberOfDelayedMessages() { + public synchronized long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @Override - public CompletableFuture clearDelayedMessages() { + public synchronized CompletableFuture clearDelayedMessages() { if (!topic.isDelayedDeliveryEnabled()) { return CompletableFuture.completedFuture(null); } @@ -1464,11 +1464,11 @@ public PersistentTopic getTopic() { } - public long getDelayedTrackerMemoryUsage() { + public synchronized long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public Map getBucketDelayedIndexStats() { + public synchronized Map getBucketDelayedIndexStats() { if (delayedDeliveryTracker.isEmpty()) { return Collections.emptyMap(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 6a01add4f2e2e..276f8c038a67c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -1146,7 +1146,7 @@ public boolean initializeDispatchRateLimiterIfNeeded() { } @Override - public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { if (!topic.isDelayedDeliveryEnabled()) { // If broker has the feature disabled, always deliver messages immediately return false; @@ -1212,12 +1212,12 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { } @Override - public long getNumberOfDelayedMessages() { + public synchronized long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @Override - public CompletableFuture clearDelayedMessages() { + public synchronized CompletableFuture clearDelayedMessages() { if (!topic.isDelayedDeliveryEnabled()) { return CompletableFuture.completedFuture(null); } @@ -1291,11 +1291,11 @@ public PersistentTopic getTopic() { } - public long getDelayedTrackerMemoryUsage() { + public synchronized long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public Map getBucketDelayedIndexStats() { + public synchronized Map getBucketDelayedIndexStats() { if (delayedDeliveryTracker.isEmpty()) { return Collections.emptyMap(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index e25595072d3c9..322992d7b1cc5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -274,4 +274,14 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc tracker.close(); } + @Test(dataProvider = "delayedTracker") + public void testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) throws Exception { + tracker.addMessage(1, 1, 50); + tracker.addMessage(1, 1, 50); + tracker.addMessage(1, 1, 50); + + clockTime.set(60); + + tracker.getScheduledMessages(10); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java index 5c87c74d9bb42..e81e4bb6c1cc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java @@ -20,7 +20,12 @@ import com.carrotsearch.hppc.ObjectSet; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.CustomLog; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -34,6 +39,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; @@ -152,4 +158,97 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception { // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); } + + @Test + public void testRaceConditionInTrackDelayedDelivery() throws Exception { + final int numThreads = 16; + final int operationsPerThread = 2000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + final AtomicReference firstException = new AtomicReference<>(); + + final String topicName = newTopicName(); + final String subscription = "s1"; + + // Needed to create the topic + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + + Subscription sub = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(topic).when(sub).getTopic(); + + PersistentDispatcherMultipleConsumersClassic dispatcher = + new PersistentDispatcherMultipleConsumersClassic(topic, cursor, sub); + + // Align all writes to the same bucket + // This is the key which triggers the race condition + long deliverAt = System.currentTimeMillis() + 5000; + + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()) + .setDeliverAtTime(deliverAt); + + @Cleanup("shutdown") + ExecutorService executorService = Executors.newFixedThreadPool(32); + + // Start clear message thread + for (int i = 0; i < numThreads / 2; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.clearDelayedMessages(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start track delayed delivery thread + for (int i = numThreads / 2; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.trackDelayedDelivery(1, 1, messageMetadata); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + + if (errors.get() > 0) { + Exception exception = firstException.get(); + if (exception != null) { + System.err.println("First exception caught: " + exception.getMessage()); + exception.printStackTrace(); + } + } + Assert.assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index e5da7850dfd63..cf91f29988e38 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -20,7 +20,12 @@ import com.carrotsearch.hppc.ObjectSet; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.CustomLog; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -34,6 +39,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; @@ -152,4 +158,97 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception { // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); } + + @Test + public void testRaceConditionInTrackDelayedDelivery() throws Exception { + final int numThreads = 16; + final int operationsPerThread = 2000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + final AtomicReference firstException = new AtomicReference<>(); + + final String topicName = newTopicName(); + final String subscription = "s1"; + + // Needed to create the topic + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + + Subscription sub = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(topic).when(sub).getTopic(); + + PersistentDispatcherMultipleConsumers dispatcher = + new PersistentDispatcherMultipleConsumers(topic, cursor, sub); + + // Align all writes to the same bucket + // This is the key which triggers the race condition + long deliverAt = System.currentTimeMillis() + 5000; + + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()) + .setDeliverAtTime(deliverAt); + + @Cleanup("shutdown") + ExecutorService executorService = Executors.newFixedThreadPool(32); + + // Start clear message thread + for (int i = 0; i < numThreads / 2; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.clearDelayedMessages(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start track delayed delivery thread + for (int i = numThreads / 2; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.trackDelayedDelivery(1, 1, messageMetadata); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + + if (errors.get() > 0) { + Exception exception = firstException.get(); + if (exception != null) { + System.err.println("First exception caught: " + exception.getMessage()); + exception.printStackTrace(); + } + } + Assert.assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + } }