Skip to content

Commit f0c4d57

Browse files
ARTEMIS-5376 Preventing OME from QueueImpl::iteration while paging
1 parent 7d2a565 commit f0c4d57

File tree

3 files changed

+227
-37
lines changed

3 files changed

+227
-37
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,4 +1526,7 @@ void slowConsumerDetected(String sessionID,
15261526

15271527
@LogMessage(id = 224157, value = "At least one of the components failed to start under the lockCoordinator {}. A retry will be executed", level = LogMessage.Level.INFO)
15281528
void retryLockCoordinator(String name);
1529+
1530+
@LogMessage(id = 224158, value = "The operation {} on queue {} cannot read more data from paging into memory. The operation will be interrupted.", level = LogMessage.Level.INFO)
1531+
void preventQueueManagementToFloodMemory(String operation, String queue);
15291532
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

Lines changed: 71 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@ public int deleteMatchingReferences(Filter filter) throws Exception {
20252025

20262026
@Override
20272027
public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
2028-
return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason));
2028+
return iterQueue("deleteMatchingReferences", flushLimit, filter1, createDeleteMatchingAction(ackReason), false);
20292029
}
20302030

20312031
QueueIterateAction createDeleteMatchingAction(AckReason ackReason) {
@@ -2043,9 +2043,11 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
20432043
* This is a generic method for any method interacting on the Queue to move or delete messages Instead of duplicate
20442044
* the feature we created an abstract class where you pass the logic for each message.
20452045
*/
2046-
private int iterQueue(final int flushLimit,
2046+
private int iterQueue(final String operationName,
2047+
final int flushLimit,
20472048
final Filter filter1,
2048-
QueueIterateAction messageAction) throws Exception {
2049+
QueueIterateAction messageAction,
2050+
boolean allowDepaging) throws Exception {
20492051
int count = 0;
20502052
int txCount = 0;
20512053

@@ -2112,26 +2114,58 @@ private int iterQueue(final int flushLimit,
21122114
}
21132115

21142116
if (pageIterator != null) {
2115-
while (pageIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
2116-
PagedReference reference = pageIterator.next();
2117-
pageIterator.remove();
2117+
PageIterator theIterator;
21182118

2119-
if (messageAction.match(reference)) {
2120-
if (!messageAction.actMessage(tx, reference)) {
2121-
addTail(reference, false);
2119+
if (allowDepaging) {
2120+
theIterator = pageIterator;
2121+
} else {
2122+
// if not allowed to depage, it will open a new iterator
2123+
theIterator = pageSubscription.iterator();
2124+
}
2125+
2126+
try {
2127+
while (theIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
2128+
PagedReference reference = theIterator.next();
2129+
boolean matched = messageAction.match(reference);
2130+
boolean acted = false;
2131+
2132+
if (matched) {
2133+
acted = messageAction.actMessage(tx, reference);
2134+
}
2135+
2136+
if (allowDepaging) {
2137+
// When depaging is allowed, we remove from paging and potentially add to queue tail
2138+
theIterator.remove();
2139+
2140+
if (!acted) {
2141+
// Put non-matching or non-acted messages back to queue tail
2142+
addTail(reference, false);
2143+
if (!needsDepage()) {
2144+
ActiveMQServerLogger.LOGGER.preventQueueManagementToFloodMemory(operationName, String.valueOf(QueueImpl.this.getName()));
2145+
break;
2146+
}
2147+
}
2148+
}
2149+
2150+
if (matched) {
2151+
txCount++;
2152+
count++;
2153+
}
2154+
2155+
if (txCount > 0 && txCount % flushLimit == 0) {
2156+
tx.commit();
2157+
tx = new TransactionImpl(storageManager);
2158+
txCount = 0;
21222159
}
2123-
txCount++;
2124-
count++;
2125-
} else {
2126-
addTail(reference, false);
21272160
}
21282161

2129-
if (txCount > 0 && txCount % flushLimit == 0) {
2130-
tx.commit();
2131-
tx = new TransactionImpl(storageManager);
2132-
txCount = 0;
2162+
} finally {
2163+
if (!allowDepaging) {
2164+
// close the iterator if not allowed to depage
2165+
theIterator.close();
21332166
}
21342167
}
2168+
21352169
}
21362170

21372171
if (txCount > 0) {
@@ -2441,7 +2475,7 @@ public void run() {
24412475
@Override
24422476
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
24432477

2444-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2478+
return iterQueue("sendMessageToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
24452479

24462480
@Override
24472481
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2450,14 +2484,14 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
24502484
return true;
24512485
}
24522486

2453-
}) == 1;
2487+
}, false) == 1;
24542488

24552489
}
24562490

24572491
@Override
24582492
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
24592493

2460-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2494+
return iterQueue("sendMessagesToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
24612495

24622496
@Override
24632497
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2466,7 +2500,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
24662500
return true;
24672501
}
24682502

2469-
});
2503+
}, false);
24702504

24712505
}
24722506

@@ -2476,7 +2510,7 @@ public boolean moveReference(final long messageID,
24762510
final Binding binding,
24772511
final boolean rejectDuplicate) throws Exception {
24782512

2479-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2513+
return iterQueue("moveReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
24802514

24812515
@Override
24822516
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2485,7 +2519,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
24852519
return true;
24862520
}
24872521

2488-
}) == 1;
2522+
}, false) == 1;
24892523

24902524
}
24912525

@@ -2513,7 +2547,7 @@ public int moveReferences(final int flushLimit,
25132547
final Integer expectedHits = messageCount > 0 ? messageCount : null;
25142548
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
25152549

2516-
return iterQueue(flushLimit, filter, new QueueIterateAction(expectedHits) {
2550+
return iterQueue("moveReferences", flushLimit, filter, new QueueIterateAction(expectedHits) {
25172551
@Override
25182552
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25192553
boolean ignored = false;
@@ -2537,37 +2571,37 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25372571

25382572
return true;
25392573
}
2540-
});
2574+
}, false);
25412575
}
25422576

25432577
public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
2544-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
2578+
return iterQueue("moveReferencesBetweenSnFQueues", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
25452579
@Override
25462580
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25472581
return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
25482582
}
2549-
});
2583+
}, false);
25502584
}
25512585

25522586
@Override
25532587
public boolean copyReference(final long messageID,
25542588
final SimpleString toQueue,
25552589
final Binding binding) throws Exception {
25562590

2557-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2591+
return iterQueue("copyReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
25582592

25592593
@Override
25602594
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25612595
copy(tx, toQueue, binding, ref);
25622596
return false;
25632597
}
25642598

2565-
}) == 1;
2599+
}, false) == 1;
25662600

25672601
}
25682602

25692603
public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
2570-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2604+
return iterQueue("rerouteMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
25712605
@Override
25722606
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
25732607
RoutingContext routingContext = new RoutingContextImpl(tx);
@@ -2579,7 +2613,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25792613
postOffice.processRoute(m, routingContext, false);
25802614
return false;
25812615
}
2582-
});
2616+
}, false);
25832617
}
25842618

25852619
@Override
@@ -2592,7 +2626,7 @@ public int retryMessages(Filter filter, Integer expectedHits) throws Exception {
25922626

25932627
final HashMap<String, Long> queues = new HashMap<>();
25942628

2595-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {
2629+
return iterQueue("retryMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {
25962630

25972631
@Override
25982632
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2630,37 +2664,37 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26302664
}
26312665
return false;
26322666
}
2633-
});
2667+
}, false);
26342668

26352669
}
26362670

26372671
@Override
26382672
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
26392673

2640-
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2674+
return iterQueue("changeReferencePriority", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
26412675

26422676
@Override
26432677
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
26442678
ref.getMessage().setPriority(newPriority);
26452679
return false;
26462680
}
26472681

2648-
}) == 1;
2682+
}, true) == 1;
26492683

26502684
}
26512685

26522686
@Override
26532687
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
26542688

2655-
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2689+
return iterQueue("changeReferencesPriority", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
26562690

26572691
@Override
26582692
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
26592693
ref.getMessage().setPriority(newPriority);
26602694
return false;
26612695
}
26622696

2663-
});
2697+
}, true);
26642698

26652699
}
26662700

0 commit comments

Comments
 (0)