Skip to content

Commit 4deeced

Browse files
ARTEMIS-5376 Preventing OME from QueueImpl::iteration while paging
1 parent 7d2a565 commit 4deeced

File tree

3 files changed

+233
-37
lines changed

3 files changed

+233
-37
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,4 +1526,7 @@ void slowConsumerDetected(String sessionID,
15261526

15271527
@LogMessage(id = 224157, value = "At least one of the components failed to start under the lockCoordinator {}. A retry will be executed", level = LogMessage.Level.INFO)
15281528
void retryLockCoordinator(String name);
1529+
1530+
@LogMessage(id = 224158, value = "The operation {} on queue {} cannot read more data from paging into memory. The operation will be interrupted.", level = LogMessage.Level.INFO)
1531+
void preventQueueManagementToFloodMemory(String operation, String queue);
15291532
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@ public int deleteMatchingReferences(Filter filter) throws Exception {
20252025

20262026
@Override
20272027
public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
2028-
return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason));
2028+
return iterQueue("deleteMatchingReferences", flushLimit, filter1, createDeleteMatchingAction(ackReason), false);
20292029
}
20302030

20312031
QueueIterateAction createDeleteMatchingAction(AckReason ackReason) {
@@ -2043,9 +2043,11 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
20432043
* This is a generic method for any method interacting on the Queue to move or delete messages Instead of duplicate
20442044
* the feature we created an abstract class where you pass the logic for each message.
20452045
*/
2046-
private int iterQueue(final int flushLimit,
2046+
private int iterQueue(final String operationName,
2047+
final int flushLimit,
20472048
final Filter filter1,
2048-
QueueIterateAction messageAction) throws Exception {
2049+
QueueIterateAction messageAction,
2050+
boolean allowDepaging) throws Exception {
20492051
int count = 0;
20502052
int txCount = 0;
20512053

@@ -2112,26 +2114,59 @@ private int iterQueue(final int flushLimit,
21122114
}
21132115

21142116
if (pageIterator != null) {
2115-
while (pageIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
2116-
PagedReference reference = pageIterator.next();
2117-
pageIterator.remove();
2117+
PageIterator theIterator;
21182118

2119-
if (messageAction.match(reference)) {
2120-
if (!messageAction.actMessage(tx, reference)) {
2121-
addTail(reference, false);
2119+
if (allowDepaging) {
2120+
theIterator = pageIterator;
2121+
} else {
2122+
// if not allowed to depage, it will open a new iterator
2123+
theIterator = pageSubscription.iterator();
2124+
}
2125+
2126+
try {
2127+
while (theIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
2128+
PagedReference reference = theIterator.next();
2129+
2130+
if (messageAction.match(reference)) {
2131+
if (messageAction.actMessage(tx, reference)) {
2132+
theIterator.remove();
2133+
} else {
2134+
if (allowDepaging) {
2135+
theIterator.remove();
2136+
addTail(reference, false);
2137+
if (!needsDepage()) {
2138+
ActiveMQServerLogger.LOGGER.preventQueueManagementToFloodMemory(operationName, String.valueOf(QueueImpl.this.getName()));
2139+
break;
2140+
}
2141+
}
2142+
}
2143+
txCount++;
2144+
count++;
2145+
} else {
2146+
if (allowDepaging) {
2147+
theIterator.remove();
2148+
addTail(reference, false);
2149+
if (!needsDepage()) {
2150+
ActiveMQServerLogger.LOGGER.preventQueueManagementToFloodMemory(operationName, String.valueOf(QueueImpl.this.getName()));
2151+
break;
2152+
}
2153+
}
2154+
}
2155+
2156+
if (txCount > 0 && txCount % flushLimit == 0) {
2157+
tx.commit();
2158+
tx = new TransactionImpl(storageManager);
2159+
txCount = 0;
21222160
}
2123-
txCount++;
2124-
count++;
2125-
} else {
2126-
addTail(reference, false);
21272161
}
21282162

2129-
if (txCount > 0 && txCount % flushLimit == 0) {
2130-
tx.commit();
2131-
tx = new TransactionImpl(storageManager);
2132-
txCount = 0;
2163+
} finally {
2164+
if (!allowDepaging) {
2165+
// close the iterator if not allowed to depage
2166+
theIterator.close();
21332167
}
21342168
}
2169+
21352170
}
21362171

21372172
if (txCount > 0) {
@@ -2441,7 +2476,7 @@ public void run() {
24412476
@Override
24422477
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
24432478

2444-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2479+
return iterQueue("sendMessageToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
24452480

24462481
@Override
24472482
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2450,14 +2485,14 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
24502485
return true;
24512486
}
24522487

2453-
}) == 1;
2488+
}, false) == 1;
24542489

24552490
}
24562491

24572492
@Override
24582493
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
24592494

2460-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2495+
return iterQueue("sendMessagesToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
24612496

24622497
@Override
24632498
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2466,7 +2501,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
24662501
return true;
24672502
}
24682503

2469-
});
2504+
}, false);
24702505

24712506
}
24722507

@@ -2476,7 +2511,7 @@ public boolean moveReference(final long messageID,
24762511
final Binding binding,
24772512
final boolean rejectDuplicate) throws Exception {
24782513

2479-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2514+
return iterQueue("moveReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
24802515

24812516
@Override
24822517
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2485,7 +2520,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
24852520
return true;
24862521
}
24872522

2488-
}) == 1;
2523+
}, false) == 1;
24892524

24902525
}
24912526

@@ -2513,7 +2548,7 @@ public int moveReferences(final int flushLimit,
25132548
final Integer expectedHits = messageCount > 0 ? messageCount : null;
25142549
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
25152550

2516-
return iterQueue(flushLimit, filter, new QueueIterateAction(expectedHits) {
2551+
return iterQueue("moveReferences", flushLimit, filter, new QueueIterateAction(expectedHits) {
25172552
@Override
25182553
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25192554
boolean ignored = false;
@@ -2537,37 +2572,37 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25372572

25382573
return true;
25392574
}
2540-
});
2575+
}, false);
25412576
}
25422577

25432578
public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
2544-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
2579+
return iterQueue("moveReferencesBetweenSnFQueues", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
25452580
@Override
25462581
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25472582
return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
25482583
}
2549-
});
2584+
}, false);
25502585
}
25512586

25522587
@Override
25532588
public boolean copyReference(final long messageID,
25542589
final SimpleString toQueue,
25552590
final Binding binding) throws Exception {
25562591

2557-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2592+
return iterQueue("copyReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
25582593

25592594
@Override
25602595
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25612596
copy(tx, toQueue, binding, ref);
25622597
return false;
25632598
}
25642599

2565-
}) == 1;
2600+
}, false) == 1;
25662601

25672602
}
25682603

25692604
public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
2570-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2605+
return iterQueue("rerouteMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
25712606
@Override
25722607
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25732608
RoutingContext routingContext = new RoutingContextImpl(tx);
@@ -2579,7 +2614,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25792614
postOffice.processRoute(m, routingContext, false);
25802615
return false;
25812616
}
2582-
});
2617+
}, false);
25832618
}
25842619

25852620
@Override
@@ -2592,7 +2627,7 @@ public int retryMessages(Filter filter, Integer expectedHits) throws Exception {
25922627

25932628
final HashMap<String, Long> queues = new HashMap<>();
25942629

2595-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {
2630+
return iterQueue("retryMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {
25962631

25972632
@Override
25982633
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2630,37 +2665,37 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26302665
}
26312666
return false;
26322667
}
2633-
});
2668+
}, false);
26342669

26352670
}
26362671

26372672
@Override
26382673
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
26392674

2640-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2675+
return iterQueue("changeReferencePriority", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
26412676

26422677
@Override
26432678
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
26442679
ref.getMessage().setPriority(newPriority);
26452680
return false;
26462681
}
26472682

2648-
}) == 1;
2683+
}, true) == 1;
26492684

26502685
}
26512686

26522687
@Override
26532688
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
26542689

2655-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2690+
return iterQueue("changeReferencesPriority", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
26562691

26572692
@Override
26582693
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
26592694
ref.getMessage().setPriority(newPriority);
26602695
return false;
26612696
}
26622697

2663-
});
2698+
}, true);
26642699

26652700
}
26662701

0 commit comments

Comments
 (0)