Skip to content

Commit e7df028

Browse files
tabish121gemmellr
authored andcommitted
ARTEMIS-5914 Bypass link creation in remote federation when possible
Bypass link creation from the target broker back to the source when remote federation is configured if the outcome is known to be a failure due to the federation user not having access to send into the queue on the remote.
1 parent 6b455f4 commit e7df028

File tree

8 files changed

+651
-11
lines changed

8 files changed

+651
-11
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,18 @@ synchronized void processRemoteQueueAdded(String addressName, String queueName)
537537
});
538538
}
539539

540+
/**
541+
* {@return true if this is the federation source instance, false if it is the target}
542+
*/
543+
abstract boolean isFederationSource();
544+
545+
/**
546+
* {@return true if this is the federation target instance, false if it is the source}
547+
*/
548+
final boolean isFederationTarget() {
549+
return !isFederationSource();
550+
}
551+
540552
/**
541553
* Error signaling API that must be implemented by the specific federation implementation to handle error when
542554
* creating a federation resource such as an outgoing receiver link.

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,21 @@
3232

3333
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
3434
import org.apache.activemq.artemis.api.core.ActiveMQException;
35+
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
3536
import org.apache.activemq.artemis.api.core.RoutingType;
3637
import org.apache.activemq.artemis.api.core.SimpleString;
3738
import org.apache.activemq.artemis.core.filter.Filter;
3839
import org.apache.activemq.artemis.core.postoffice.Binding;
3940
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
4041
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
42+
import org.apache.activemq.artemis.core.security.CheckType;
4143
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
4244
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
4345
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
4446
import org.apache.activemq.artemis.core.transaction.Transaction;
4547
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
4648
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
49+
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
4750
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
4851
import org.apache.activemq.artemis.utils.CompositeAddress;
4952
import org.slf4j.Logger;
@@ -251,6 +254,23 @@ private void checkBindingForMatch(Binding binding) {
251254
return;
252255
}
253256

257+
// Target brokers which have been sent remote federation policies might not have write
258+
// access via the logged in user to the address with demand which we are attempting to
259+
// federate messages to so instead of creating a receiver that will fail when the remote
260+
// routes a message to it we can just omit creating the link in the first place.
261+
if (federation.isFederationTarget()) {
262+
try {
263+
session.getSessionSPI().check(addressInfo.getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
264+
} catch (ActiveMQSecurityException e) {
265+
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
266+
addressInfo.getName().toString(), "User does not have send permission to configured address.");
267+
return;
268+
} catch (Exception ex) {
269+
logger.warn("Caught unknown exception from security check on address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
270+
return;
271+
}
272+
}
273+
254274
createOrUpdateFederatedAddressConsumerForBinding(addressInfo, queueBinding);
255275
} else {
256276
reactIfQueueBindingMatchesAnyDivertTarget(queueBinding);
@@ -289,6 +309,23 @@ private void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding divertBindi
289309
return;
290310
}
291311

312+
// Target brokers which have been sent remote federation policies might not have write access
313+
// via the logged in user to the address this divert is attached to which means we don't need
314+
// to track this divert. Since we don't add the divert to the tracking map future demand on
315+
// divert that would otherwise match the address includes won't trigger federation attempts.
316+
if (federation.isFederationTarget()) {
317+
try {
318+
session.getSessionSPI().check(addressInfo.getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
319+
} catch (ActiveMQSecurityException e) {
320+
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
321+
addressInfo.getName().toString(), "User does not have send permission to configured address.");
322+
return;
323+
} catch (Exception ex) {
324+
logger.warn("Caught unknown exception from security check on address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
325+
return;
326+
}
327+
}
328+
292329
// We only need to check if we've never seen the divert before, afterwards we will
293330
// be checking it any time a new QueueBinding is added instead.
294331
if (!divertsTracking.containsKey(divertBinding)) {
@@ -337,15 +374,15 @@ private void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding
337374

338375
final SimpleString queueAddress = queueBinding.getAddress();
339376

340-
divertsTracking.entrySet().forEach((e) -> {
341-
final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress();
342-
final DivertBinding divertBinding = e.getKey();
377+
divertsTracking.entrySet().forEach((divert) -> {
378+
final SimpleString forwardAddress = divert.getKey().getDivert().getForwardAddress();
379+
final DivertBinding divertBinding = divert.getKey();
343380

344381
// Check matched diverts to see if the QueueBinding address matches the address or
345382
// addresses (composite diverts) of the Divert and if so then we can check if we need
346383
// to create demand on the source address on the remote if we haven't done so already.
347384

348-
if (!e.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) {
385+
if (!divert.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) {
349386
// The plugin can block the demand totally here either based on the divert itself
350387
// or the queue that's attached to the divert.
351388
if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) {
@@ -360,7 +397,7 @@ private void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding
360397

361398
// Each divert that forwards to the address the queue is bound to we add demand
362399
// in the diverts tracker.
363-
e.getValue().add(queueBinding);
400+
divert.getValue().add(queueBinding);
364401

365402
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());
366403

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828

2929
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
3030
import org.apache.activemq.artemis.api.core.ActiveMQException;
31+
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
32+
import org.apache.activemq.artemis.api.core.SimpleString;
3133
import org.apache.activemq.artemis.core.filter.Filter;
3234
import org.apache.activemq.artemis.core.postoffice.Binding;
3335
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
36+
import org.apache.activemq.artemis.core.security.CheckType;
3437
import org.apache.activemq.artemis.core.server.Queue;
3538
import org.apache.activemq.artemis.core.server.ServerConsumer;
3639
import org.apache.activemq.artemis.core.server.ServerSession;
@@ -41,6 +44,7 @@
4144
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
4245
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
4346
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
47+
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
4448
import org.apache.activemq.artemis.utils.CompositeAddress;
4549
import org.slf4j.Logger;
4650
import org.slf4j.LoggerFactory;
@@ -164,8 +168,9 @@ private void checkQueueForMatch(Queue queue) {
164168

165169
private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
166170
final String queueName = consumer.getQueue().getName().toString();
171+
final SimpleString addressName = consumer.getQueueAddress();
167172

168-
if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), queueName)) {
173+
if (testIfQueueMatchesPolicy(addressName.toString(), queueName)) {
169174
final boolean federationConsumer = isFederationConsumer(consumer);
170175

171176
// We should ignore federation consumers from remote peers but configuration does allow
@@ -175,6 +180,23 @@ private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
175180
return;
176181
}
177182

183+
// Target brokers which have been sent remote federation policies might not have write
184+
// access via the logged in user to the queue with demand which we are attempting to
185+
// federate messages to so instead of creating a receiver that will fail when the remote
186+
// routes a message to it we can just omit creating the link in the first place.
187+
if (federation.isFederationTarget()) {
188+
try {
189+
session.getSessionSPI().check(addressName, consumer.getQueue().getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
190+
} catch (ActiveMQSecurityException e) {
191+
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedQueueFederation(
192+
queueName, "User does not have send permission to configured queue.");
193+
return;
194+
} catch (Exception ex) {
195+
logger.warn("Caught unknown exception from security check on queue:{} send permissions: cannot federate:", queueName, ex);
196+
return;
197+
}
198+
}
199+
178200
logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding());
179201

180202
final AMQPFederationQueueConsumerManager entry;

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ protected void signalError(Exception cause) {
294294
brokerConnection.runtimeError(cause);
295295
}
296296

297+
@Override
298+
boolean isFederationSource() {
299+
return true;
300+
}
301+
297302
@Override
298303
void registerFederationManagement() throws Exception {
299304
AMQPFederationManagementSupport.registerFederationSource(this);

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ protected void signalError(Exception cause) {
142142
connection.close(new ErrorCondition(condition, description));
143143
}
144144

145+
@Override
146+
boolean isFederationSource() {
147+
return false;
148+
}
149+
145150
@Override
146151
void registerFederationManagement() throws Exception {
147152
if (brokerConnection.isManagable()) {

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,10 @@ public interface ActiveMQAMQPProtocolLogger {
8282

8383
@LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on address {}, queueID={}", level = LogMessage.Level.WARN)
8484
void ackRetryFailed(Object ackRetryInformation, Object address, long queueID);
85+
86+
@LogMessage(id = 111013, value = "AMQP Federation target skipped federation of address {}, reason={}", level = LogMessage.Level.WARN)
87+
void federationTargetSkippedAddressFederation(String address, String reason);
88+
89+
@LogMessage(id = 111014, value = "AMQP Federation target skipped federation of queue {}, reason={}", level = LogMessage.Level.WARN)
90+
void federationTargetSkippedQueueFederation(String address, String reason);
8591
}

0 commit comments

Comments
 (0)