Skip to content

Commit 62a99db

Browse files
authored
GH-6792: Add headers mapping to JMS channels (#10838)
* GH-6792: Add headers mapping to JMS channels Fixes: #6792 Currently, by default `AbstractJmsChannel` implementations use Java serialization for storing `Message<?>` instance into JMS destination. The deserialized message is produced from the channel as is. There might be use-cases when some JMS properties could be useful in downstream flows. * Add `DefaultJmsHeaderMapper` into the `AbstractJmsChannel` as a conditional logic to map headers to/from JMS when `jmsTemplate.getMessageConverter()` is not a `MessagingMessageConverter`. Otherwise, then conversion of the message and header mapping is done in that converter * Refactor logic in the `AbstractJmsChannel` to common API * Fix Nullability formatting in the `PollableJmsChannel` * Use lambda style for `MessageListener` in the `SubscribableJmsChannel` instead of inner extra class * Ensure in the `PollableJmsChannelTests` and `SubscribableJmsChannelTests` that header mapper does it job * Fix `JmsChannelParserTests` to not assert on the `messageBuilderFactory` since now a `MessageListener` in the `SubscribableJmsChannel` is a direct method of the class * Fix `JmsChannelHistoryTests` verification according to a new internal logic of the `AbstractJmsChannel` * Document new feature in the `jms.adoc` and `whats-new.adoc` # Conflicts: # src/reference/antora/modules/ROOT/pages/whats-new.adoc * More fixes and cleanup for JMS Channels * Use lambda-based `logger.debug()` in the `PollableJmsChannel` instead of `if (logger.isDebugEnabled()) {`: the `message` variable is now effectively `final` * Remove redundant null checks from the `SubscribableJmsChannel` * Add more header tests into the `PollableJmsChannelTests` & `SubscribableJmsChannelTests` * Clean up `JmsChannelParserTests` and verify the `messageBuilderFactory` settings
1 parent 60f4a73 commit 62a99db

File tree

9 files changed

+230
-237
lines changed

9 files changed

+230
-237
lines changed

spring-integration-jms/src/main/java/org/springframework/integration/jms/channel/AbstractJmsChannel.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,26 @@
1616

1717
package org.springframework.integration.jms.channel;
1818

19+
import jakarta.jms.JMSException;
20+
1921
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2022
import org.springframework.integration.channel.AbstractMessageChannel;
23+
import org.springframework.integration.jms.DefaultJmsHeaderMapper;
2124
import org.springframework.integration.jms.DynamicJmsTemplateProperties;
2225
import org.springframework.jms.core.JmsTemplate;
26+
import org.springframework.jms.support.converter.MessageConverter;
27+
import org.springframework.jms.support.converter.MessagingMessageConverter;
2328
import org.springframework.messaging.Message;
29+
import org.springframework.messaging.MessageHeaders;
30+
import org.springframework.messaging.MessagingException;
2431
import org.springframework.util.Assert;
2532

2633
/**
2734
* A base {@link AbstractMessageChannel} implementation for JMS-backed message channels.
2835
*
2936
* @author Mark Fisher
3037
* @author Gary Russell
38+
* @author Artem Bilan
3139
*
3240
* @since 7.0
3341
*
@@ -36,27 +44,62 @@
3644
*/
3745
public abstract class AbstractJmsChannel extends AbstractMessageChannel {
3846

39-
private final JmsTemplate jmsTemplate;
47+
protected final DefaultJmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();
48+
49+
protected final JmsTemplate jmsTemplate;
4050

4151
public AbstractJmsChannel(JmsTemplate jmsTemplate) {
4252
Assert.notNull(jmsTemplate, "jmsTemplate must not be null");
4353
this.jmsTemplate = jmsTemplate;
4454
}
4555

46-
JmsTemplate getJmsTemplate() {
47-
return this.jmsTemplate;
48-
}
49-
5056
@Override
5157
protected boolean doSend(Message<?> message, long timeout) {
5258
try {
5359
DynamicJmsTemplateProperties.setPriority(new IntegrationMessageHeaderAccessor(message).getPriority());
54-
this.jmsTemplate.convertAndSend(message);
60+
MessageConverter messageConverter = this.jmsTemplate.getMessageConverter();
61+
this.jmsTemplate.send((session) -> {
62+
jakarta.jms.Message jmsMessage = messageConverter.toMessage(message, session);
63+
if (!(messageConverter instanceof MessagingMessageConverter)) {
64+
MessageHeaders headers = message.getHeaders();
65+
this.headerMapper.fromHeaders(headers, jmsMessage);
66+
}
67+
return jmsMessage;
68+
});
5569
}
5670
finally {
5771
DynamicJmsTemplateProperties.clearPriority();
5872
}
5973
return true;
6074
}
6175

76+
protected Message<?> fromJmsMessage(jakarta.jms.Message message) {
77+
MessageConverter converter = this.jmsTemplate.getMessageConverter();
78+
try {
79+
Object converted = converter.fromMessage(message);
80+
Message<?> messageToSend;
81+
if (converted instanceof Message<?> convertedMessage) {
82+
messageToSend = convertedMessage;
83+
if (!(converter instanceof MessagingMessageConverter)) {
84+
messageToSend = getMessageBuilderFactory()
85+
.fromMessage(messageToSend)
86+
.copyHeadersIfAbsent(this.headerMapper.toHeaders(message))
87+
.build();
88+
}
89+
}
90+
else {
91+
messageToSend = getMessageBuilderFactory()
92+
.withPayload(converted)
93+
.copyHeaders(this.headerMapper.toHeaders(message))
94+
.build();
95+
}
96+
97+
return messageToSend;
98+
}
99+
catch (JMSException ex) {
100+
throw new MessagingException("failed to convert incoming JMS Message", ex);
101+
}
102+
103+
}
104+
62105
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/channel/PollableJmsChannel.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ public void setMessageSelector(String messageSelector) {
6161
}
6262

6363
@Override
64-
@Nullable
65-
public Message<?> receive(long timeout) {
64+
public @Nullable Message<?> receive(long timeout) {
6665
try {
6766
DynamicJmsTemplateProperties.setReceiveTimeout(timeout);
6867
return receive();
@@ -73,8 +72,7 @@ public Message<?> receive(long timeout) {
7372
}
7473

7574
@Override
76-
@Nullable
77-
public Message<?> receive() {
75+
public @Nullable Message<?> receive() {
7876
ChannelInterceptorList interceptorList = getIChannelInterceptorList();
7977
Deque<ChannelInterceptor> interceptorStack = null;
8078
boolean counted = false;
@@ -109,23 +107,22 @@ public Message<?> receive() {
109107
}
110108
}
111109

112-
@Nullable
113-
private Message<?> receiveAndConvertToMessage() {
114-
Object object;
110+
private @Nullable Message<?> receiveAndConvertToMessage() {
111+
jakarta.jms.Message jmsMessage;
115112
if (this.messageSelector == null) {
116-
object = getJmsTemplate().receiveAndConvert();
113+
jmsMessage = this.jmsTemplate.receive();
117114
}
118115
else {
119-
object = getJmsTemplate().receiveSelectedAndConvert(this.messageSelector);
116+
jmsMessage = this.jmsTemplate.receiveSelected(this.messageSelector);
120117
}
121118

122-
if (object == null) {
119+
if (jmsMessage == null) {
123120
logger.trace(() -> "postReceive on channel '" + this + "', message is null");
124121
return null;
125122
}
126-
Message<?> message = object instanceof Message<?> msg
127-
? msg
128-
: getMessageBuilderFactory().withPayload(object).build();
123+
124+
Message<?> message = fromJmsMessage(jmsMessage);
125+
129126
logger.debug(() -> "postReceive on channel '" + this + "', message: " + message);
130127

131128
return message;

spring-integration-jms/src/main/java/org/springframework/integration/jms/channel/SubscribableJmsChannel.java

Lines changed: 20 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,19 @@
1717
package org.springframework.integration.jms.channel;
1818

1919
import jakarta.jms.MessageListener;
20-
import org.apache.commons.logging.Log;
21-
import org.apache.commons.logging.LogFactory;
2220

2321
import org.springframework.integration.MessageDispatchingException;
2422
import org.springframework.integration.channel.BroadcastCapableChannel;
2523
import org.springframework.integration.dispatcher.AbstractDispatcher;
2624
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
27-
import org.springframework.integration.dispatcher.MessageDispatcher;
2825
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2926
import org.springframework.integration.dispatcher.UnicastingDispatcher;
30-
import org.springframework.integration.support.MessageBuilderFactory;
3127
import org.springframework.integration.support.management.ManageableSmartLifecycle;
3228
import org.springframework.jms.core.JmsTemplate;
3329
import org.springframework.jms.listener.AbstractMessageListenerContainer;
34-
import org.springframework.jms.support.converter.MessageConverter;
3530
import org.springframework.messaging.Message;
3631
import org.springframework.messaging.MessageDeliveryException;
3732
import org.springframework.messaging.MessageHandler;
38-
import org.springframework.messaging.MessagingException;
3933
import org.springframework.util.Assert;
4034

4135
/**
@@ -79,15 +73,11 @@ public void setMaxSubscribers(int maxSubscribers) {
7973

8074
@Override
8175
public boolean subscribe(MessageHandler handler) {
82-
Assert.state(this.dispatcher != null,
83-
"'MessageDispatcher' must not be null. This channel might not have been initialized");
8476
return this.dispatcher.addHandler(handler);
8577
}
8678

8779
@Override
8880
public boolean unsubscribe(MessageHandler handler) {
89-
Assert.state(this.dispatcher != null,
90-
"'MessageDispatcher' must not be null. This channel might not have been initialized");
9181
return this.dispatcher.removeHandler(handler);
9282
}
9383

@@ -102,12 +92,8 @@ public void onInit() {
10292
return;
10393
}
10494
super.onInit();
105-
boolean isPubSub = isBroadcast();
106-
configureDispatcher(isPubSub);
107-
MessageListener listener =
108-
new DispatchingMessageListener(getJmsTemplate(), this.dispatcher, this, isPubSub,
109-
getMessageBuilderFactory());
110-
this.container.setMessageListener(listener);
95+
configureDispatcher(isBroadcast());
96+
this.container.setMessageListener((MessageListener) this::receiveAndDispatch);
11197
if (!this.container.isActive()) {
11298
this.container.afterPropertiesSet();
11399
}
@@ -140,115 +126,54 @@ private void configureDispatcher(boolean isPubSub) {
140126

141127
@Override
142128
public boolean isAutoStartup() {
143-
return (this.container != null) && this.container.isAutoStartup();
129+
return this.container.isAutoStartup();
144130
}
145131

146132
@Override
147133
public int getPhase() {
148-
return (this.container != null) ? this.container.getPhase() : 0;
134+
return this.container.getPhase();
149135
}
150136

151137
@Override
152138
public boolean isRunning() {
153-
return (this.container != null) && this.container.isRunning();
139+
return this.container.isRunning();
154140
}
155141

156142
@Override
157143
public void start() {
158-
if (this.container != null) {
159-
this.container.start();
160-
}
144+
this.container.start();
161145
}
162146

163147
@Override
164148
public void stop() {
165-
if (this.container != null) {
166-
this.container.stop();
167-
}
149+
this.container.stop();
168150
}
169151

170152
@Override
171153
public void stop(Runnable callback) {
172-
if (this.container != null) {
173-
this.container.stop(callback);
174-
}
175-
else {
176-
callback.run();
177-
}
154+
this.container.stop(callback);
178155
}
179156

180157
@Override
181158
public void destroy() {
182-
if (this.container != null) {
183-
this.container.destroy();
184-
}
159+
this.container.destroy();
185160
}
186161

187-
private static final class DispatchingMessageListener implements MessageListener {
188-
189-
private final Log logger = LogFactory.getLog(this.getClass());
190-
191-
private final JmsTemplate jmsTemplate;
192-
193-
private final MessageDispatcher dispatcher;
194-
195-
private final SubscribableJmsChannel channel;
196-
197-
private final boolean isPubSub;
198-
199-
private final MessageBuilderFactory messageBuilderFactory;
200-
201-
DispatchingMessageListener(JmsTemplate jmsTemplate,
202-
MessageDispatcher dispatcher, SubscribableJmsChannel channel, boolean isPubSub,
203-
MessageBuilderFactory messageBuilderFactory) {
204-
205-
this.jmsTemplate = jmsTemplate;
206-
this.dispatcher = dispatcher;
207-
this.channel = channel;
208-
this.isPubSub = isPubSub;
209-
this.messageBuilderFactory = messageBuilderFactory;
162+
private void receiveAndDispatch(jakarta.jms.Message message) {
163+
Message<?> messageToSend = fromJmsMessage(message);
164+
try {
165+
this.dispatcher.dispatch(messageToSend);
210166
}
211-
212-
@SuppressWarnings("NullAway") // Dataflow analysis limitation
213-
@Override
214-
public void onMessage(jakarta.jms.Message message) {
215-
Message<?> messageToSend = null;
216-
try {
217-
MessageConverter converter = this.jmsTemplate.getMessageConverter();
218-
Object converted = null;
219-
if (converter != null) {
220-
converted = converter.fromMessage(message);
221-
}
222-
if (converted != null) {
223-
messageToSend =
224-
converted instanceof Message<?> convertedMessage
225-
? convertedMessage
226-
: this.messageBuilderFactory.withPayload(converted).build();
227-
this.dispatcher.dispatch(messageToSend);
228-
}
229-
else if (this.logger.isWarnEnabled()) {
230-
this.logger.warn("No converter found, or converter returned null for: " + message
231-
+ ", no Message to dispatch");
232-
}
167+
catch (MessageDispatchingException ex) {
168+
String exceptionMessage = ex.getMessage() + " for jms-channel '" + this.getFullChannelName() + "'.";
169+
if (isBroadcast()) {
170+
// log only for backwards compatibility with pub/sub
171+
this.logger.warn(ex, exceptionMessage);
233172
}
234-
catch (MessageDispatchingException ex) {
235-
String exceptionMessage = ex.getMessage() + " for jms-channel '"
236-
+ this.channel.getFullChannelName() + "'.";
237-
if (this.isPubSub) {
238-
// log only for backwards compatibility with pub/sub
239-
if (this.logger.isWarnEnabled()) {
240-
this.logger.warn(exceptionMessage, ex);
241-
}
242-
}
243-
else {
244-
throw new MessageDeliveryException(messageToSend, exceptionMessage, ex);
245-
}
246-
}
247-
catch (Exception ex) {
248-
throw new MessagingException("failed to handle incoming JMS Message", ex);
173+
else {
174+
throw new MessageDeliveryException(messageToSend, exceptionMessage, ex);
249175
}
250176
}
251-
252177
}
253178

254179
}

0 commit comments

Comments
 (0)