Skip to content

Commit 9c1d1cc

Browse files
committed
Fix race condition in async exception handling by ensuring exception notifications are not dropped when executor is terminated
1 parent 50119e0 commit 9c1d1cc

File tree

1 file changed

+42
-13
lines changed

1 file changed

+42
-13
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,25 +2029,45 @@ public void onClientInternalException(final Throwable error) {
20292029
* @param error
20302030
*/
20312031
public void onAsyncException(Throwable error) {
2032-
if (!closed.get() && !closing.get()) {
2033-
if (this.exceptionListener != null) {
2034-
2035-
if (!(error instanceof JMSException)) {
2036-
error = JMSExceptionSupport.create(error);
2032+
if (this.exceptionListener != null) {
2033+
if (!(error instanceof JMSException)) {
2034+
error = JMSExceptionSupport.create(error);
2035+
}
2036+
final JMSException e = (JMSException) error;
2037+
// Submit directly to executor bypassing closed/closing guards
2038+
// to ensure exception notifications are never silently dropped
2039+
try {
2040+
executor.execute(() -> exceptionListener.onException(e));
2041+
} catch (final RejectedExecutionException re) {
2042+
LOG.debug("Could not notify exception listener asynchronously (executor terminated), notifying inline: {}", error.getMessage());
2043+
try {
2044+
exceptionListener.onException(e);
2045+
} catch (final Exception ex) {
2046+
LOG.debug("Exception during inline ExceptionListener notification", ex);
20372047
}
2038-
final JMSException e = (JMSException) error;
2039-
executeAsync(() -> exceptionListener.onException(e));
2040-
2041-
} else {
2042-
LOG.debug("Async exception with no exception listener: {}", error, error);
20432048
}
2049+
} else {
2050+
LOG.debug("Async exception with no exception listener: {}", error, error);
20442051
}
20452052
}
20462053

20472054
@Override
20482055
public void onException(final IOException error) {
2049-
onAsyncException(error);
2050-
executeAsync(() -> {
2056+
// Combine JMS ExceptionListener and TransportListener notifications
2057+
// into a single async task to prevent a race condition where the
2058+
// ExceptionListener (e.g. ConnectionPool) closes the connection and
2059+
// shuts down the executor before the TransportListener task is queued.
2060+
final Runnable exceptionTask = () -> {
2061+
// Notify JMS ExceptionListener first (same as onAsyncException)
2062+
if (exceptionListener != null) {
2063+
try {
2064+
final JMSException jmsError = JMSExceptionSupport.create(error);
2065+
exceptionListener.onException(jmsError);
2066+
} catch (final Exception e) {
2067+
LOG.debug("Exception during JMS ExceptionListener notification", e);
2068+
}
2069+
}
2070+
20512071
transportFailed(error);
20522072
ServiceSupport.dispose(ActiveMQConnection.this.transport);
20532073
brokerInfoReceived.countDown();
@@ -2059,7 +2079,16 @@ public void onException(final IOException error) {
20592079
for (final TransportListener listener : transportListeners) {
20602080
listener.onException(error);
20612081
}
2062-
});
2082+
};
2083+
2084+
// Submit directly to executor bypassing closed/closing guards
2085+
// to ensure transport failure handling is never silently dropped
2086+
try {
2087+
executor.execute(exceptionTask);
2088+
} catch (final RejectedExecutionException e) {
2089+
LOG.debug("Could not execute exception task asynchronously (executor terminated), executing inline");
2090+
exceptionTask.run();
2091+
}
20632092
}
20642093

20652094
@Override

0 commit comments

Comments
 (0)