Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,24 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
messagesHaveFixedDelay = false;
return false;
}
log.debug()
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.attr("deliveryInMs", () -> deliverAt - clock.millis())
.log("Add message");
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
delayedMessagesCount.incrementAndGet();

log.debug()
.attr("ledgerId", ledgerId)
.attr("entryId", entryId)
.attr("deliveryInMs", () -> deliverAt - clock.millis())
.log("Add message");
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);

Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
// Roaring64Bitmap does not store duplicates, so track whether this is a new element
// so that delayedMessagesCount stays in sync with the bitmap contents
boolean isNew = !bitmap.contains(entryId);

if (isNew) {
bitmap.add(entryId);
delayedMessagesCount.incrementAndGet();
}

updateTimer();

Expand Down
Comment thread
chamons marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public synchronized void readMoreEntries() {
}
}

protected Predicate<Position> createReadEntriesSkipConditionForNormalRead() {
protected synchronized Predicate<Position> createReadEntriesSkipConditionForNormalRead() {
Predicate<Position> skipCondition = null;
// Filter out and skip reading delayed messages that already exist in the DelayedDeliveryTracker
if (delayedDeliveryTracker.isPresent()) {
Expand Down Expand Up @@ -1378,12 +1378,12 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
}

/**
 * Returns the value of {@code getNumberOfDelayedMessages()} from the delayed delivery tracker,
 * or {@code 0} when no tracker is present.
 *
 * NOTE(review): the two signature lines below are the before/after sides of a diff hunk;
 * only the second one carries {@code synchronized}.
 */
@Override
public long getNumberOfDelayedMessages() {
public synchronized long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}

@Override
public CompletableFuture<Void> clearDelayedMessages() {
public synchronized CompletableFuture<Void> clearDelayedMessages() {
if (!topic.isDelayedDeliveryEnabled()) {
return CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -1464,11 +1464,11 @@ public PersistentTopic getTopic() {
}


/**
 * Returns the value of {@code getBufferMemoryUsage()} from the delayed delivery tracker,
 * or {@code 0} when no tracker is present.
 *
 * NOTE(review): the two signature lines below are the before/after sides of a diff hunk;
 * only the second one carries {@code synchronized}.
 */
public long getDelayedTrackerMemoryUsage() {
public synchronized long getDelayedTrackerMemoryUsage() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}

public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
public synchronized Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
if (delayedDeliveryTracker.isEmpty()) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ public boolean initializeDispatchRateLimiterIfNeeded() {
}

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
public synchronized boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!topic.isDelayedDeliveryEnabled()) {
// If broker has the feature disabled, always deliver messages immediately
return false;
Expand Down Expand Up @@ -1212,12 +1212,12 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
}

/**
 * Returns the value of {@code getNumberOfDelayedMessages()} from the delayed delivery tracker,
 * or {@code 0} when no tracker is present.
 *
 * NOTE(review): the two signature lines below are the before/after sides of a diff hunk;
 * only the second one carries {@code synchronized}.
 */
@Override
public long getNumberOfDelayedMessages() {
public synchronized long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}

@Override
public CompletableFuture<Void> clearDelayedMessages() {
public synchronized CompletableFuture<Void> clearDelayedMessages() {
if (!topic.isDelayedDeliveryEnabled()) {
return CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -1291,11 +1291,11 @@ public PersistentTopic getTopic() {
}


/**
 * Returns the value of {@code getBufferMemoryUsage()} from the delayed delivery tracker,
 * or {@code 0} when no tracker is present.
 *
 * NOTE(review): the two signature lines below are the before/after sides of a diff hunk;
 * only the second one carries {@code synchronized}.
 */
public long getDelayedTrackerMemoryUsage() {
public synchronized long getDelayedTrackerMemoryUsage() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}

public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
public synchronized Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
if (delayedDeliveryTracker.isEmpty()) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,14 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc
tracker.close();
}

/**
 * Adds the same (ledgerId, entryId) pair several times with the same delivery time and
 * verifies that the tracker's delayed-message counter stays consistent with what
 * {@code getScheduledMessages} actually hands back.
 *
 * NOTE(review): assumes the provider's clock starts before t=50 so the adds are accepted
 * rather than rejected as already due — confirm against the "delayedTracker" data provider.
 *
 * @param tracker tracker instance supplied by the "delayedTracker" data provider
 */
@Test(dataProvider = "delayedTracker")
public void testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) throws Exception {
    tracker.addMessage(1, 1, 50);
    tracker.addMessage(1, 1, 50);
    tracker.addMessage(1, 1, 50);

    // The duplicate adds must be counted once, otherwise the counter drifts from the stored entries.
    org.testng.Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 1L);

    // Move the clock past the delivery time so the entry becomes due.
    clockTime.set(60);

    org.testng.Assert.assertEquals(tracker.getScheduledMessages(10).size(), 1);
    org.testng.Assert.assertEquals(tracker.getNumberOfDelayedMessages(), 0L);

    // Release the tracker, as the other tests in this class do.
    tracker.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

import com.carrotsearch.hppc.ObjectSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -34,6 +39,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -152,4 +158,97 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception {
// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}

/**
 * Hammers {@code clearDelayedMessages()} and {@code trackDelayedDelivery()} on one
 * {@link PersistentDispatcherMultipleConsumersClassic} from many threads at once and fails
 * if any worker throws.
 *
 * Half of the workers clear the delayed messages, the other half keep tracking the same
 * (ledgerId, entryId) with the same delivery time. The first failure raised by a worker is
 * attached as the cause of the test failure.
 */
@Test
public void testRaceConditionInTrackDelayedDelivery() throws Exception {
    final int numThreads = 16;
    final int operationsPerThread = 2000;
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CountDownLatch doneLatch = new CountDownLatch(numThreads);
    final AtomicInteger errors = new AtomicInteger(0);
    final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    final String topicName = newTopicName();
    final String subscription = "s1";

    // Needed to create the topic; closed automatically when the test exits.
    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName).subscriptionName(subscription)
            .subscriptionType(SubscriptionType.Shared).subscribe();

    PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get();

    ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
    Mockito.doReturn(subscription).when(cursor).getName();

    Subscription sub = Mockito.mock(PersistentSubscription.class);
    Mockito.doReturn(topic).when(sub).getTopic();

    PersistentDispatcherMultipleConsumersClassic dispatcher =
            new PersistentDispatcherMultipleConsumersClassic(topic, cursor, sub);

    // Align all writes to the same bucket.
    // This is the key which triggers the race condition.
    long deliverAt = System.currentTimeMillis() + 5000;

    MessageMetadata messageMetadata = new MessageMetadata()
            .setSequenceId(1)
            .setProducerName("testProducer")
            .setPartitionKeyB64Encoded(false)
            .setPublishTime(System.currentTimeMillis())
            .setDeliverAtTime(deliverAt);

    // One pool thread per worker: every worker blocks on startLatch, so they must all be
    // runnable at once. shutdownNow interrupts workers that are still running if the test
    // bails out early (e.g. on timeout).
    @Cleanup("shutdownNow")
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

    // Start clear message threads
    for (int i = 0; i < numThreads / 2; i++) {
        executorService.submit(() -> {
            try {
                startLatch.await();
                for (int j = 0; j < operationsPerThread; j++) {
                    dispatcher.clearDelayedMessages();
                    Thread.sleep(1);
                }
            } catch (Throwable t) {
                // Catch Throwable so Errors are recorded instead of vanishing into the discarded Future.
                if (t instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                errors.incrementAndGet();
                firstFailure.compareAndSet(null, t);
            } finally {
                doneLatch.countDown();
            }
        });
    }

    // Start track delayed delivery threads
    for (int i = numThreads / 2; i < numThreads; i++) {
        executorService.submit(() -> {
            try {
                startLatch.await();
                for (int j = 0; j < operationsPerThread; j++) {
                    dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
                    Thread.sleep(1);
                }
            } catch (Throwable t) {
                // Catch Throwable so Errors are recorded instead of vanishing into the discarded Future.
                if (t instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                errors.incrementAndGet();
                firstFailure.compareAndSet(null, t);
            } finally {
                doneLatch.countDown();
            }
        });
    }

    startLatch.countDown();
    Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");

    // Surface the first worker failure as the cause so it shows up in the test report.
    Throwable failure = firstFailure.get();
    if (failure != null) {
        throw new AssertionError("No exceptions should occur during concurrent operations", failure);
    }
    Assert.assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

import com.carrotsearch.hppc.ObjectSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.CustomLog;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -34,6 +39,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -152,4 +158,97 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception {
// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}

@Test
public void testRaceConditionInTrackDelayedDelivery() throws Exception {
Comment thread
chamons marked this conversation as resolved.
final int numThreads = 16;
final int operationsPerThread = 2000;
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(numThreads);
final AtomicInteger errors = new AtomicInteger(0);
final AtomicReference<Exception> firstException = new AtomicReference<>();

final String topicName = newTopicName();
final String subscription = "s1";

// Needed to create the topic
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName).subscriptionName(subscription)
.subscriptionType(SubscriptionType.Shared).subscribe();

PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get();

ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
Mockito.doReturn(subscription).when(cursor).getName();

Subscription sub = Mockito.mock(PersistentSubscription.class);
Mockito.doReturn(topic).when(sub).getTopic();

PersistentDispatcherMultipleConsumers dispatcher =
new PersistentDispatcherMultipleConsumers(topic, cursor, sub);

// Align all writes to the same bucket
// This is the key which triggers the race condition
long deliverAt = System.currentTimeMillis() + 5000;

MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(1)
.setProducerName("testProducer")
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis())
.setDeliverAtTime(deliverAt);

@Cleanup("shutdown")
ExecutorService executorService = Executors.newFixedThreadPool(32);
Comment thread
chamons marked this conversation as resolved.

// Start clear message thread
for (int i = 0; i < numThreads / 2; i++) {
executorService.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < operationsPerThread; j++) {
dispatcher.clearDelayedMessages();
Thread.sleep(1);
}
} catch (Exception e) {
errors.incrementAndGet();
firstException.compareAndSet(null, e);
e.printStackTrace();
} finally {
doneLatch.countDown();
}
});
}

// Start track delayed delivery thread
for (int i = numThreads / 2; i < numThreads; i++) {
executorService.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < operationsPerThread; j++) {
dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
Thread.sleep(1);
}
} catch (Exception e) {
errors.incrementAndGet();
firstException.compareAndSet(null, e);
e.printStackTrace();
} finally {
doneLatch.countDown();
}
});
}

startLatch.countDown();
Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");

if (errors.get() > 0) {
Exception exception = firstException.get();
if (exception != null) {
System.err.println("First exception caught: " + exception.getMessage());
exception.printStackTrace();
}
}
Assert.assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
}
}
Loading