diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java index 594896a966d..c77914b11b2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java @@ -537,6 +537,18 @@ synchronized void processRemoteQueueAdded(String addressName, String queueName) }); } + /** + * {@return true if this is the federation source instance, false if it is the target} + */ + abstract boolean isFederationSource(); + + /** + * {@return true if this is the federation target instance, false if it is the source} + */ + final boolean isFederationTarget() { + return !isFederationSource(); + } + /** * Error signaling API that must be implemented by the specific federation implementation to handle error when * creating a federation resource such as an outgoing receiver link. diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java index bb2852bc331..6db4d5c2c26 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java @@ -32,18 +32,21 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; +import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; @@ -251,6 +254,23 @@ private void checkBindingForMatch(Binding binding) { return; } + // Target brokers which have been sent remote federation policies might not have write + // access via the logged in user to the address with demand which we are attempting to + // federate messages to so instead of creating a receiver that will fail when the remote + // routes a message to it we can just omit creating the link in the first place. + if (federation.isFederationTarget()) { + try { + session.getSessionSPI().check(addressInfo.getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth()); + } catch (ActiveMQSecurityException e) { + ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation( + addressInfo.getName().toString(), "User does not have send permission to configured address."); + return; + } catch (Exception ex) { + logger.warn("Caught unknown exception from security check on address:{} send permissions: cannot federate:", addressInfo.getName(), ex); + return; + } + } + createOrUpdateFederatedAddressConsumerForBinding(addressInfo, queueBinding); } else { reactIfQueueBindingMatchesAnyDivertTarget(queueBinding); @@ -289,6 +309,23 @@ private void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding divertBindi return; } + // Target brokers which have been sent remote federation policies might not have write access + // via the logged in user to the address this divert is attached to which means we don't need + // to track this divert. Since we don't add the divert to the tracking map future demand on + // divert that would otherwise match the address includes won't trigger federation attempts. + if (federation.isFederationTarget()) { + try { + session.getSessionSPI().check(addressInfo.getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth()); + } catch (ActiveMQSecurityException e) { + ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation( + addressInfo.getName().toString(), "User does not have send permission to configured address."); + return; + } catch (Exception ex) { + logger.warn("Caught unknown exception from security check on address:{} send permissions: cannot federate:", addressInfo.getName(), ex); + return; + } + } + // We only need to check if we've never seen the divert before, afterwards we will // be checking it any time a new QueueBinding is added instead. if (!divertsTracking.containsKey(divertBinding)) { @@ -337,15 +374,15 @@ private void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding final SimpleString queueAddress = queueBinding.getAddress(); - divertsTracking.entrySet().forEach((e) -> { - final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress(); - final DivertBinding divertBinding = e.getKey(); + divertsTracking.entrySet().forEach((divert) -> { + final SimpleString forwardAddress = divert.getKey().getDivert().getForwardAddress(); + final DivertBinding divertBinding = divert.getKey(); // Check matched diverts to see if the QueueBinding address matches the address or // addresses (composite diverts) of the Divert and if so then we can check if we need // to create demand on the source address on the remote if we haven't done so already. - if (!e.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) { + if (!divert.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) { // The plugin can block the demand totally here either based on the divert itself // or the queue that's attached to the divert. if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) { @@ -360,7 +397,7 @@ private void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding // Each divert that forwards to the address the queue is bound to we add demand // in the diverts tracker. - e.getValue().add(queueBinding); + divert.getValue().add(queueBinding); final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java index 375c5a49022..041e75a1146 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java @@ -28,9 +28,12 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; @@ -41,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,8 +168,9 @@ private void checkQueueForMatch(Queue queue) { private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { final String queueName = consumer.getQueue().getName().toString(); + final SimpleString addressName = consumer.getQueueAddress(); - if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), queueName)) { + if (testIfQueueMatchesPolicy(addressName.toString(), queueName)) { final boolean federationConsumer = isFederationConsumer(consumer); // We should ignore federation consumers from remote peers but configuration does allow @@ -175,6 +180,23 @@ private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { return; } + // Target brokers which have been sent remote federation policies might not have write + // access via the logged in user to the queue with demand which we are attempting to + // federate messages to so instead of creating a receiver that will fail when the remote + // routes a message to it we can just omit creating the link in the first place. + if (federation.isFederationTarget()) { + try { + session.getSessionSPI().check(addressName, consumer.getQueue().getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth()); + } catch (ActiveMQSecurityException e) { + ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedQueueFederation( + queueName, "User does not have send permission to configured queue."); + return; + } catch (Exception ex) { + logger.warn("Caught unknown exception from security check on queue:{} send permissions: cannot federate:", queueName, ex); + return; + } + } + logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding()); final AMQPFederationQueueConsumerManager entry; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java index c48e0afd1ee..14e4caa7e92 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java @@ -294,6 +294,11 @@ protected void signalError(Exception cause) { brokerConnection.runtimeError(cause); } + @Override + boolean isFederationSource() { + return true; + } + @Override void registerFederationManagement() throws Exception { AMQPFederationManagementSupport.registerFederationSource(this); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java index aa0a69ebf16..39d8664f0ce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java @@ -142,6 +142,11 @@ protected void signalError(Exception cause) { connection.close(new ErrorCondition(condition, description)); } + @Override + boolean isFederationSource() { + return false; + } + @Override void registerFederationManagement() throws Exception { if (brokerConnection.isManagable()) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java index c5162161e7f..890cd48664e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java @@ -82,4 +82,10 @@ public interface ActiveMQAMQPProtocolLogger { @LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on address {}, queueID={}", level = LogMessage.Level.WARN) void ackRetryFailed(Object ackRetryInformation, Object address, long queueID); + + @LogMessage(id = 111013, value = "AMQP Federation target skipped federation of address {}, reason={}", level = LogMessage.Level.WARN) + void federationTargetSkippedAddressFederation(String address, String reason); + + @LogMessage(id = 111014, value = "AMQP Federation target skipped federation of queue {}, reason={}", level = LogMessage.Level.WARN) + void federationTargetSkippedQueueFederation(String address, String reason); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index 4c678e339b3..167e0075cde 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -29,9 +29,11 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_NAME; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_POLICY_NAME; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V1; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V2; @@ -68,15 +70,16 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; - import java.lang.invoke.MethodHandles; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -106,12 +109,14 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement; +import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -123,6 +128,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.Wait; @@ -154,6 +160,12 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected String federationUser = "fed_user"; + protected String federationPass = "fed_pass"; + + protected String demandUser = "demand_user"; + protected String demandPass = "demand_pass"; + @Override protected String getConfiguredProtocols() { return "AMQP,CORE"; @@ -7033,6 +7045,305 @@ private void doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalD } } + @Test + @Timeout(20) + public void testRemoteFederationDoesNotCreateLinksForAddressItCannotWriteTo() throws Exception { + final String allowedAddress = getTestName(); + final String restrictedAddress = getTestName() + "restricted"; // Federation user cannot send here. + + configureSecurity(server, allowedAddress, restrictedAddress); + server.start(); + + final Set includes = new HashSet<>(); + includes.add(allowedAddress); + includes.add(restrictedAddress); + + final Map properties = new HashMap<>(); + properties.put(ADDRESS_RECEIVER_IDLE_TIMEOUT, 1); + + final FederationReceiveFromAddressPolicy policy = + new FederationReceiveFromAddressPolicy("test-address-policy", + true, 30_000L, 1000L, 1, false, + includes, null, properties, null, + DEFAULT_WILDCARD_CONFIGURATION); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, federationUser, federationPass, "test", true); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDisposition().withSettled(true).withState().accepted(); + + sendAddresPolicyToRemote(peer, policy); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(startsWith(allowedAddress)) + .and() + .respondInKind(); // Server detected demand + peer.expectFlow().withLinkCredit(1000); + peer.remoteTransfer().withBody().withString("test-message") + .also() + .withDeliveryId(1) + .queue(); + peer.expectDisposition().withSettled(true).withState().accepted(); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Demand on the unrestricted address should generate federation links and move the message + try (Connection connection = factory.createConnection(demandUser, demandPass)) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(allowedAddress)); + + connection.start(); + + final Message message = consumer.receive(5_000); + assertNotNull(message); + assertInstanceOf(TextMessage.class, message); + assertEquals("test-message", ((TextMessage) message).getText()); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectFlow().withLinkCredit(999).withDrain(true) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDetach(); // demand will be gone and receiver link should close. + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand on the restricted address should result in no federation links being attempted. + try (Connection connection = factory.createConnection(demandUser, demandPass)) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(restrictedAddress)); + + connection.start(); + + final Message message = consumer.receiveNoWait(); + assertNull(message); + } + + // This would fail if the server tries creating a federation link + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteFederationDoesNotCreateLinksForAddressItCannotWriteToThatHasDemandOnEnabledDivert() throws Exception { + final String fowardAllowed = "forward-allowed"; + final String fowardRestricted = "forward-restricted"; + final String allowedAddress = getTestName(); + final String restrictedAddress = getTestName() + "restricted"; // Federation user cannot send here. + + configureSecurity(server, allowedAddress, restrictedAddress, fowardAllowed, fowardRestricted); + server.start(); + + final DivertConfiguration divertConfig1 = new DivertConfiguration().setAddress(allowedAddress) + .setForwardingAddress(fowardAllowed) + .setName("test-divert-1"); + final DivertConfiguration divertConfig2 = new DivertConfiguration().setAddress(restrictedAddress) + .setForwardingAddress(fowardRestricted) + .setName("test-divert-2"); + + server.deployDivert(divertConfig1); + server.deployDivert(divertConfig2); + + // The addresses need to exist before divert tracking engages in the address policy manager. + server.addAddressInfo(new AddressInfo(SimpleString.of(allowedAddress), RoutingType.MULTICAST)); + server.addAddressInfo(new AddressInfo(SimpleString.of(restrictedAddress), RoutingType.MULTICAST)); + + final Set includes = new HashSet<>(); + includes.add(allowedAddress); + includes.add(restrictedAddress); + + final Map properties = new HashMap<>(); + properties.put(ADDRESS_RECEIVER_IDLE_TIMEOUT, 1); + + final FederationReceiveFromAddressPolicy policy = + new FederationReceiveFromAddressPolicy("test-address-policy", + true, 30_000L, 1000L, 1, true, + includes, null, properties, null, + DEFAULT_WILDCARD_CONFIGURATION); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, federationUser, federationPass, "test", true); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDisposition().withSettled(true).withState().accepted(); + + sendAddresPolicyToRemote(peer, policy); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(startsWith(allowedAddress)) + .and() + .respondInKind(); // Server detected demand based on policy we sent it + peer.expectFlow().withLinkCredit(1000); + peer.remoteTransfer().withBody().withString("test-message") + .also() + .withDeliveryId(1) + .queue(); + peer.expectDisposition().withSettled(true).withState().accepted(); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Demand on the unrestricted address should generate federation links and move the message + try (Connection connection = factory.createConnection(demandUser, demandPass)) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(fowardAllowed)); + + connection.start(); + + final Message message = consumer.receive(5_000); + assertNotNull(message); + assertInstanceOf(TextMessage.class, message); + assertEquals("test-message", ((TextMessage) message).getText()); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectFlow().withLinkCredit(999).withDrain(true) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDetach(); // demand will be gone and receiver link should close. + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand on the restricted address should result in no federation links being attempted. + try (Connection connection = factory.createConnection(demandUser, demandPass)) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(fowardRestricted)); + + connection.start(); + + final Message message = consumer.receiveNoWait(); + assertNull(message); + } + + // This would fail if the server tries creating a federation link since we would + // receive unexpected attach frames for the remote federation receiver. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteBrokerFederationReceiverRejectedWhenSecurityRestricted() throws Exception { + final String allowedAddress = getTestName(); + final String restrictedAddress = getTestName() + "restricted"; // Federation user cannot receive here. + + configureSecurity(server, allowedAddress, restrictedAddress); + server.start(); + server.createQueue(QueueConfiguration.of(allowedAddress).setRoutingType(RoutingType.MULTICAST) + .setAddress(allowedAddress) + .setAutoCreated(false)); + server.createQueue(QueueConfiguration.of(restrictedAddress).setRoutingType(RoutingType.MULTICAST) + .setAddress(restrictedAddress) + .setAutoCreated(false)); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, federationUser, federationPass, "test", true); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(allowedAddress) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(allowedAddress); + + // Connect to remote as if an address had demand and matched our federation policy + // This uses the allowed address so it should attach without issue. + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allowedAddress) + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(allowedAddress) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(restrictedAddress) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withNullSource(); + peer.expectDetach().withError(AmqpError.UNAUTHORIZED_ACCESS.toString()).respond(); + + // Connect to remote as if an address had demand and matched our federation policy + // This uses the restricted address so it should fail to attach. + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(restrictedAddress) + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(restrictedAddress) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + protected void configureSecurity(ActiveMQServer server, String allowed, String restricted, String... userAllowedOnly) { + ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); + + // User additions + securityManager.getConfiguration().addUser(federationUser, federationPass); + securityManager.getConfiguration().addRole(federationUser, "federate"); + + securityManager.getConfiguration().addUser(demandUser, demandPass); + securityManager.getConfiguration().addRole(demandUser, "user"); + + // Configure roles + HierarchicalRepository> securityRepository = server.getSecurityRepository(); + Set allowedRoles = new HashSet<>(); + allowedRoles.add(new Role("federate", true, true, false, false, false, false, true, true, false, false, false, false)); + allowedRoles.add(new Role("user", true, true, true, true, true, false, true, true, true, false, false, false)); + + Set restrictedRoles = new HashSet<>(); + restrictedRoles.add(new Role("federate", false, false, false, false, false, false, true, true, false, false, false, false)); + restrictedRoles.add(new Role("user", true, true, true, true, true, false, true, true, true, false, false, false)); + + // Only user can access the given security matches + for (String securityMatch : userAllowedOnly) { + securityRepository.addMatch(securityMatch, restrictedRoles); + } + + securityRepository.addMatch(restricted, restrictedRoles); + securityRepository.addMatch(allowed, allowedRoles); + securityRepository.addMatch(FEDERATION_BASE_VALIDATION_ADDRESS, allowedRoles); + + server.getConfiguration().setSecurityEnabled(true); + } + private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address); @@ -7131,6 +7442,14 @@ private static void scriptFederationConnectToRemote(ProtonTestClient peer, Strin } private static void scriptFederationConnectToRemote(ProtonTestClient peer, String federationName, int amqpCredits, int amqpLowCredits, boolean eventsSender, boolean eventsReceiver, boolean fqqnAddressSubs) { + scriptFederationConnectToRemote(peer, null, null, federationName, amqpCredits, amqpLowCredits, eventsSender, eventsReceiver, fqqnAddressSubs); + } + + private static void scriptFederationConnectToRemote(ProtonTestClient peer, String user, String password, String federationName, boolean fqqnAddressSubs) { + scriptFederationConnectToRemote(peer, user, password, federationName, AmqpSupport.AMQP_CREDITS_DEFAULT, AmqpSupport.AMQP_LOW_CREDITS_DEFAULT, false, false, fqqnAddressSubs); + } + + private static void scriptFederationConnectToRemote(ProtonTestClient peer, String user, String password, String federationName, int amqpCredits, int amqpLowCredits, boolean eventsSender, boolean eventsReceiver, boolean fqqnAddressSubs) { final String federationControlLinkName = "Federation:control:" + UUID.randomUUID().toString(); final Map federationConfiguration = new HashMap<>(); @@ -7140,8 +7459,14 @@ private static void scriptFederationConnectToRemote(ProtonTestClient peer, Strin final Map senderProperties = new HashMap<>(); senderProperties.put(FEDERATION_CONFIGURATION.toString(), federationConfiguration); senderProperties.put(FEDERATION_VERSION.toString(), fqqnAddressSubs ? FEDERATION_V2 : FEDERATION_V1); + senderProperties.put(FEDERATION_NAME.toString(), federationName); + + if (user == null && password == null) { + peer.queueClientSaslAnonymousConnect(); + } else { + peer.queueClientSaslPlainConnect(user, password); + } - peer.queueClientSaslAnonymousConnect(); peer.remoteOpen().queue(); peer.expectOpen(); peer.remoteBegin().queue(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index bdaf6cd7066..2ae83caec94 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -33,9 +33,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -54,10 +56,12 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; +import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -67,6 +71,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.Wait; @@ -92,6 +97,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK; @@ -99,6 +105,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_RECEIVER_PRIORITY; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V2; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_NAME; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_VERSION; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; @@ -134,6 +141,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; /** * Tests for AMQP Broker federation handling of the receive from and send to queue policy configuration handling. @@ -142,6 +150,12 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected String federationUser = "fed_user"; + protected String federationPass = "fed_pass"; + + protected String demandUser = "demand_user"; + protected String demandPass = "demand_pass"; + @Override protected String getConfiguredProtocols() { return "AMQP,CORE"; @@ -201,8 +215,8 @@ private void doTestFederationCreatesQueueReceiverLinkForQueueMatch(RoutingType r server.getConfiguration().addAMQPConnection(amqpConnection); server.start(); server.createQueue(QueueConfiguration.of("test").setRoutingType(routingType) - .setAddress("test") - .setAutoCreated(false)); + .setAddress("test") + .setAutoCreated(false)); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectAttach().ofReceiver() @@ -5547,6 +5561,169 @@ private void doTestDrainReceiverOnTransientErrorsConfiguredAtFederationLevel(boo } } + @Test + @Timeout(20) + public void testRemoteFederationDoesNotCreateLinksForQueuesItCannotWriteTo() throws Exception { + final String allowedQueue = getTestName(); + final String restrictedQueue = getTestName() + "restricted"; // Federation user cannot send here. + + configureSecurity(server, allowedQueue, restrictedQueue); + server.start(); + + final Collection> includes = new ArrayList<>(); + includes.add(new AbstractMap.SimpleEntry<>("#", allowedQueue)); + includes.add(new AbstractMap.SimpleEntry<>("#", restrictedQueue)); + + final Map properties = new HashMap<>(); + properties.put(QUEUE_RECEIVER_IDLE_TIMEOUT, 1); + + final FederationReceiveFromQueuePolicy policy = + new FederationReceiveFromQueuePolicy("test-queue-policy", + true, -2, includes, null, properties, null, + DEFAULT_WILDCARD_CONFIGURATION); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, federationUser, federationPass, "test"); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDisposition().withSettled(true).withState().accepted(); + + sendQueuePolicyToRemote(peer, policy); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withSource().withAddress(allowedQueue + "::" + allowedQueue) + .and() + .respondInKind(); // Server detected demand based on policy we sent it + peer.expectFlow().withLinkCredit(1000); + peer.remoteTransfer().withBody().withString("test-message") + .also() + .withDeliveryId(1) + .queue(); + peer.expectDisposition().withSettled(true).withState().accepted(); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Demand on the unrestricted queue should generate federation links and move the message + try (Connection connection = factory.createConnection(demandUser, demandPass)) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(allowedQueue)); + + connection.start(); + + final Message message = consumer.receive(5_000); + assertNotNull(message); + assertInstanceOf(TextMessage.class, message); + assertEquals("test-message", ((TextMessage) message).getText()); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectFlow().withLinkCredit(999).withDrain(true) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDetach(); // demand will be gone and receiver link should close. + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Demand on the restricted queue should result in no federation links being attempted. + try (Connection connection = factory.createConnection(demandUser, demandPass)) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(restrictedQueue)); + + connection.start(); + + final Message message = consumer.receiveNoWait(); + assertNull(message); + } + + // This would fail if the server tries creating a federation link since we would + // receive unexpected attach frames for the remote federation receiver. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteBrokerFederationReceiverRejectedWhenSecurityRestricted() throws Exception { + final String allowedQueue = getTestName(); + final String restrictedQueue = getTestName() + "restricted"; // Federation user cannot receive here. + + configureSecurity(server, allowedQueue, restrictedQueue); + server.start(); + server.createQueue(QueueConfiguration.of(allowedQueue).setRoutingType(RoutingType.ANYCAST) + .setAddress(allowedQueue) + .setAutoCreated(false)); + server.createQueue(QueueConfiguration.of(restrictedQueue).setRoutingType(RoutingType.ANYCAST) + .setAddress(restrictedQueue) + .setAutoCreated(false)); + + final String allowedFQQN = allowedQueue + "::" + allowedQueue; + final String restrictedFQQN = restrictedQueue + "::" + restrictedQueue; + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, federationUser, federationPass, "test"); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(allowedFQQN) + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withSource().withAddress(allowedFQQN); + + // Connect to remote as if an queue had demand and matched our federation policy + // This uses the allowed queue so it should attach without issue. + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allowedFQQN) + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(allowedFQQN) + .withCapabilities("queue") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(restrictedFQQN) + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withNullSource(); + peer.expectDetach().withError(AmqpError.UNAUTHORIZED_ACCESS.toString()).respond(); + + // Connect to remote as if an queue had demand and matched our federation policy + // This uses the restricted queue so it should fail to attach. + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(restrictedFQQN) + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(restrictedFQQN) + .withCapabilities("queue") + .and() + .withTarget().and() + .now(); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address); @@ -5652,6 +5829,15 @@ private void scriptFederationConnectToRemote(ProtonTestClient peer, String feder } private void scriptFederationConnectToRemote(ProtonTestClient peer, String federationName, int amqpCredits, int amqpLowCredits, boolean eventsSender, boolean eventsReceiver) { + scriptFederationConnectToRemote(peer, null, null, federationName, amqpCredits, amqpLowCredits, eventsSender, eventsReceiver); + } + + private void scriptFederationConnectToRemote(ProtonTestClient peer, String username, String password, String federationName) { + scriptFederationConnectToRemote(peer, username, password, federationName, AmqpSupport.AMQP_CREDITS_DEFAULT, AmqpSupport.AMQP_LOW_CREDITS_DEFAULT, false, false); + } + + private void scriptFederationConnectToRemote(ProtonTestClient peer, String user, String password, String federationName, int amqpCredits, int amqpLowCredits, boolean eventsSender, boolean eventsReceiver) { + final String federationControlLinkName = "Federation:control:" + UUID.randomUUID().toString(); final Map federationConfiguration = new HashMap<>(); @@ -5660,8 +5846,14 @@ private void scriptFederationConnectToRemote(ProtonTestClient peer, String feder final Map senderProperties = new HashMap<>(); senderProperties.put(FEDERATION_CONFIGURATION.toString(), federationConfiguration); + senderProperties.put(FEDERATION_NAME.toString(), federationName); + + if (user == null && password == null) { + peer.queueClientSaslAnonymousConnect(); + } else { + peer.queueClientSaslPlainConnect(user, password); + } - peer.queueClientSaslAnonymousConnect(); peer.remoteOpen().queue(); peer.expectOpen(); peer.remoteBegin().queue(); @@ -5741,6 +5933,42 @@ private void scriptFederationConnectToRemote(ProtonTestClient peer, String feder } } + @Override + protected void configureBrokerSecurity(ActiveMQServer server) { + if (isSecurityEnabled()) { + enableSecurity(server); + } else { + server.getConfiguration().setSecurityEnabled(false); + } + } + + private void configureSecurity(ActiveMQServer server, String allowedQueue, String restrictedQueue) { + ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); + + // User additions + securityManager.getConfiguration().addUser(federationUser, federationPass); + securityManager.getConfiguration().addRole(federationUser, "federate"); + + securityManager.getConfiguration().addUser(demandUser, demandPass); + securityManager.getConfiguration().addRole(demandUser, "user"); + + // Configure roles + HierarchicalRepository> securityRepository = server.getSecurityRepository(); + Set value = new HashSet<>(); + value.add(new Role("federate", true, true, false, false, false, false, true, true, false, false, false, false)); + value.add(new Role("user", true, true, true, false, false, false, true, true, true, false, false, false)); + + Set restricted = new HashSet<>(); + restricted.add(new Role("federate", false, false, false, false, false, false, true, true, false, false, false, false)); + restricted.add(new Role("user", true, true, true, false, false, false, true, true, true, false, false, false)); + + securityRepository.addMatch(restrictedQueue, restricted); + securityRepository.addMatch(allowedQueue, value); + securityRepository.addMatch(FEDERATION_BASE_VALIDATION_ADDRESS, value); + + server.getConfiguration().setSecurityEnabled(true); + } + private class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin { public final AtomicBoolean started = new AtomicBoolean();