Skip to content

Commit 54c5124

Browse files
tabish121gemmellr
authored andcommitted
ARTEMIS-5941 Route wildcard subscriptions direct to demand bindings
Avoid using the WildcardAddressManager to route to an actual wild-card address when a federation link is federating for demand on the wild-card address as that method will throw if assertions are enabled and has likely not been tested for that use.
1 parent 8cc9f68 commit 54c5124

File tree

3 files changed

+271
-6
lines changed

3 files changed

+271
-6
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -653,12 +653,22 @@ private FederationConsumerInfo createConsumerInfo(AddressInfo address, Binding b
653653
}
654654

655655
private boolean isUseConduitConsumer() {
656-
// Only use binding filters when configured to do so and the remote supports FQQN subscriptions because
657-
// we need to be able to open multiple uniquely named queues for an address if more than one consumer with
658-
// differing filters are present and prior to FQQN subscription support we used a simple link name that
659-
// would not be unique amongst multiple consumers.
660-
return configuration.isIgnoreAddressBindingFilters() ||
661-
!manager.getFederation().getCapabilities().isUseFQQNAddressSubscriptions();
656+
if (!manager.getCapabilities().isUseFQQNAddressSubscriptions()) {
657+
// prior to FQQN subscription support we used a simple link name that would not be unique amongst
658+
// multiple consumers on the same address so for most features or behaviors that come later we cannot
659+
// action them properly and we must fallback to a conduit consumer strategy.
660+
return true;
661+
} else if (manager.getWildcardConfiguration().isWild(addressInfo.getName())) {
662+
// For a wildcard subscription we want to treat it as a bindings aware consumer and route messages only
663+
// to the bindings we have tracked as demand on the address. The broker wild-card code is not known to
664+
// support direct sends to the literal wildcard address and could fail to route messages correctly.
665+
return false;
666+
} else {
667+
// If we get here then the only consideration is if we are honoring consumers filters on the address
668+
// which decides if we can route straight to the address or must route to specific bindings groups that
669+
// all share the same filter.
670+
return configuration.isIgnoreAddressBindingFilters();
671+
}
662672
}
663673

664674
private String generateQueueName(AddressInfo address, Binding binding, boolean ignoreFilters) {

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.lang.invoke.MethodHandles;
2020
import java.util.concurrent.atomic.AtomicBoolean;
2121
import org.apache.activemq.artemis.api.core.ActiveMQException;
22+
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
2223
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
2324
import org.apache.activemq.artemis.core.server.Divert;
2425
import org.apache.activemq.artemis.core.server.Queue;
@@ -39,10 +40,15 @@ public abstract class AMQPFederationLocalPolicyManager extends AMQPFederationPol
3940

4041
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
4142

43+
private final WildcardConfiguration wildcardConfiguration;
44+
4245
protected volatile AMQPFederationConsumerConfiguration configuration;
46+
protected volatile AMQPFederationCapabilities capabilities;
4347

4448
public AMQPFederationLocalPolicyManager(AMQPFederation federation, AMQPFederationMetrics metrics, FederationReceiveFromResourcePolicy policy) throws ActiveMQException {
4549
super(federation, metrics, policy.getPolicyName(), policy.getPolicyType());
50+
51+
this.wildcardConfiguration = federation.getWildcardConfiguration();
4652
}
4753

4854
/**
@@ -57,6 +63,20 @@ protected AMQPFederationConsumerConfiguration getConfiguration() {
5763
return configuration;
5864
}
5965

66+
/**
67+
* {@return the known connection capabilities at this time the method is called}
68+
*/
69+
protected AMQPFederationCapabilities getCapabilities() {
70+
return capabilities;
71+
}
72+
73+
/**
74+
* {@return the wild-card configuration the federation was configured with when created}
75+
*/
76+
protected WildcardConfiguration getWildcardConfiguration() {
77+
return wildcardConfiguration;
78+
}
79+
6080
@Override
6181
protected final void handleManagerInitialized() {
6282
server.registerBrokerPlugin(this);
@@ -104,6 +124,7 @@ protected final void handleConnectionRestored() {
104124
// Capture state for the current connection on each connection as different URIs could have different options we
105125
// need to capture in the current configuration state.
106126
configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), getPolicy().getProperties());
127+
capabilities = federation.getCapabilities();
107128

108129
updateStateAfterConnect(configuration, session);
109130

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7312,6 +7312,240 @@ public void testRemoteBrokerFederationReceiverRejectedWhenSecurityRestricted() t
73127312
}
73137313
}
73147314

7315+
@Test
7316+
@Timeout(20)
7317+
public void testWildcardSubscriptionRoutedToBindingWhenFederatedFromRemote() throws Exception {
7318+
try (ProtonTestServer peer = new ProtonTestServer()) {
7319+
peer.expectSASLAnonymousConnect();
7320+
peer.expectOpen().respond();
7321+
peer.expectBegin().respond();
7322+
peer.expectAttach().ofSender()
7323+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
7324+
.withProperty(FEDERATION_VERSION.toString(), FEDERATION_V2)
7325+
.respondInKind()
7326+
.withProperty(FEDERATION_VERSION.toString(), FEDERATION_V2);
7327+
peer.expectAttach().ofReceiver()
7328+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
7329+
.respondInKind();
7330+
peer.expectFlow().withLinkCredit(10);
7331+
peer.start();
7332+
7333+
final String wildcardAddressA = getTestName() + ".A.#";
7334+
final String wildcardAddressB = getTestName() + ".B.#";
7335+
7336+
final URI remoteURI = peer.getServerURI();
7337+
logger.info("Test started, peer listening on: {}", remoteURI);
7338+
7339+
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
7340+
receiveFromAddress.setName("address-policy");
7341+
receiveFromAddress.addToIncludes(wildcardAddressA);
7342+
receiveFromAddress.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
7343+
7344+
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
7345+
element.setName(getTestName());
7346+
element.addLocalAddressPolicy(receiveFromAddress);
7347+
7348+
final AMQPBrokerConnectConfiguration amqpConnection =
7349+
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
7350+
amqpConnection.setReconnectAttempts(0);// No reconnects
7351+
amqpConnection.addElement(element);
7352+
7353+
server.getConfiguration().addAMQPConnection(amqpConnection);
7354+
server.start();
7355+
7356+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7357+
peer.expectAttach().ofReceiver()
7358+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
7359+
.withName(allOf(containsString(wildcardAddressA),
7360+
containsString("address-receiver"),
7361+
containsString(server.getNodeID().toString())))
7362+
.withSource().withAddress(not(containsString("filterId"))).also()
7363+
.respondInKind();
7364+
peer.expectFlow().withLinkCredit(1000);
7365+
7366+
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
7367+
7368+
try (Connection connection = factory.createConnection()) {
7369+
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
7370+
// Both should have the message routed to them by a single federation receiver link to the remote
7371+
final MessageConsumer consumerA1 = session.createConsumer(session.createTopic(wildcardAddressA));
7372+
final MessageConsumer consumerA2 = session.createConsumer(session.createTopic(wildcardAddressA));
7373+
// Should not match and should not create any federation links
7374+
final MessageConsumer consumerB = session.createConsumer(session.createTopic(wildcardAddressB));
7375+
7376+
connection.start();
7377+
7378+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7379+
peer.expectDisposition().withSettled(true).withState().accepted();
7380+
peer.remoteTransfer().withMessage()
7381+
.withBody().withString("test-message").also()
7382+
.withDeliveryId(0)
7383+
.later(10);
7384+
7385+
final Message messageA1 = consumerA1.receive(5_000);
7386+
assertNotNull(messageA1);
7387+
assertInstanceOf(TextMessage.class, messageA1);
7388+
assertEquals("test-message", ((TextMessage) messageA1).getText());
7389+
7390+
final Message messageA2 = consumerA2.receive(5_000);
7391+
assertNotNull(messageA2);
7392+
assertInstanceOf(TextMessage.class, messageA2);
7393+
assertEquals("test-message", ((TextMessage) messageA2).getText());
7394+
7395+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7396+
peer.expectFlow().withLinkCredit(999).withDrain(true)
7397+
.respond()
7398+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
7399+
peer.expectDetach(); // demand will be gone and receiver link should close.
7400+
7401+
assertNull(consumerB.receiveNoWait());
7402+
}
7403+
7404+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7405+
peer.close();
7406+
}
7407+
}
7408+
7409+
@Test
7410+
@Timeout(20)
7411+
public void testWildcardSubscriptionWithFiltersRoutedToBindingsWhenFederatedFromRemote() throws Exception {
7412+
try (ProtonTestServer peer = new ProtonTestServer()) {
7413+
peer.expectSASLAnonymousConnect();
7414+
peer.expectOpen().respond();
7415+
peer.expectBegin().respond();
7416+
peer.expectAttach().ofSender()
7417+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
7418+
.withProperty(FEDERATION_VERSION.toString(), FEDERATION_V2)
7419+
.respondInKind()
7420+
.withProperty(FEDERATION_VERSION.toString(), FEDERATION_V2);
7421+
peer.expectAttach().ofReceiver()
7422+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
7423+
.respondInKind();
7424+
peer.expectFlow().withLinkCredit(10);
7425+
peer.start();
7426+
7427+
final String wildcardAddressA = getTestName() + ".A.#";
7428+
final String wildcardAddressB = getTestName() + ".B.#";
7429+
final String expectedJMSFilter = "color='red'";
7430+
final AtomicReference<Attach> capturedAttach = new AtomicReference<>();
7431+
final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
7432+
final org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong jmsSelectorCode =
7433+
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
7434+
7435+
final URI remoteURI = peer.getServerURI();
7436+
logger.info("Test started, peer listening on: {}", remoteURI);
7437+
7438+
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
7439+
receiveFromAddress.setName("address-policy");
7440+
receiveFromAddress.addToIncludes(wildcardAddressA);
7441+
receiveFromAddress.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
7442+
receiveFromAddress.addProperty(IGNORE_ADDRESS_BINDING_FILTERS, "false");
7443+
7444+
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
7445+
element.setName(getTestName());
7446+
element.addLocalAddressPolicy(receiveFromAddress);
7447+
7448+
final AMQPBrokerConnectConfiguration amqpConnection =
7449+
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
7450+
amqpConnection.setReconnectAttempts(0);// No reconnects
7451+
amqpConnection.addElement(element);
7452+
7453+
server.getConfiguration().addAMQPConnection(amqpConnection);
7454+
server.start();
7455+
7456+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7457+
peer.expectAttach().ofReceiver()
7458+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
7459+
.withCapture(attach -> capturedAttach.set(attach))
7460+
.withName(allOf(containsString(wildcardAddressA),
7461+
containsString("address-receiver"),
7462+
containsString(server.getNodeID().toString())))
7463+
.withSource().withAddress(containsString("filterId")).also()
7464+
.respondInKind();
7465+
peer.expectFlow().withLinkCredit(1000);
7466+
7467+
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
7468+
7469+
try (Connection connection = factory.createConnection()) {
7470+
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
7471+
// Both should have the message routed to them by a single federation receiver link to the remote
7472+
// that link should have a filter assigned to it so the remote only sends matches and nothing else.
7473+
final MessageConsumer consumerA1 = session.createConsumer(session.createTopic(wildcardAddressA), "color='red'");
7474+
final MessageConsumer consumerA2 = session.createConsumer(session.createTopic(wildcardAddressA), "color='red'");
7475+
// Should not match and should not create any federation links
7476+
final MessageConsumer consumerB = session.createConsumer(session.createTopic(wildcardAddressB));
7477+
7478+
connection.start();
7479+
7480+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7481+
peer.expectDisposition().withSettled(true).withState().accepted();
7482+
peer.remoteTransfer().withMessage()
7483+
.withBody().withString("test-message").also()
7484+
.withApplicationProperties().withProperty("color", "red").also()
7485+
.withDeliveryId(0)
7486+
.later(10);
7487+
7488+
final Message messageA1 = consumerA1.receive(5_000);
7489+
assertNotNull(messageA1);
7490+
assertInstanceOf(TextMessage.class, messageA1);
7491+
assertEquals("test-message", ((TextMessage) messageA1).getText());
7492+
7493+
final Message messageA2 = consumerA2.receive(5_000);
7494+
assertNotNull(messageA2);
7495+
assertInstanceOf(TextMessage.class, messageA2);
7496+
assertEquals("test-message", ((TextMessage) messageA2).getText());
7497+
7498+
final Map<Symbol, Object> filtersMap1 = capturedAttach.get().getSource().getFilter();
7499+
7500+
assertNotNull(filtersMap1);
7501+
assertTrue(filtersMap1.containsKey(jmsSelectorKey));
7502+
final DescribedType jmsSelectorEntry = (DescribedType) filtersMap1.get(jmsSelectorKey);
7503+
assertNotNull(jmsSelectorEntry);
7504+
assertEquals(jmsSelectorEntry.getDescriptor(), jmsSelectorCode);
7505+
assertEquals(jmsSelectorEntry.getDescribed().toString(), expectedJMSFilter);
7506+
7507+
// Attach another consumer on the same wild card address but with a different filter and we should
7508+
// see a new receiver link opened as that binding needs its own receiver.
7509+
peer.expectAttach().ofReceiver()
7510+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
7511+
.withCapture(attach -> capturedAttach.set(attach))
7512+
.withName(allOf(containsString(wildcardAddressA),
7513+
containsString("address-receiver"),
7514+
containsString(server.getNodeID().toString())))
7515+
.withSource().withAddress(containsString("filterId")).also()
7516+
.respondInKind();
7517+
peer.expectFlow().withLinkCredit(1000);
7518+
7519+
final MessageConsumer consumerA3 = session.createConsumer(session.createTopic(wildcardAddressA), "color='blue'");
7520+
7521+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7522+
7523+
assertNull(consumerA3.receiveNoWait()); // Should be nothing to route here.
7524+
7525+
// Consumer A3 close should trigger federation link close with no messages read.
7526+
peer.expectFlow().withLinkCredit(1000).withDrain(true)
7527+
.respond()
7528+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
7529+
peer.expectDetach();
7530+
7531+
consumerA3.close();
7532+
7533+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7534+
7535+
// Now the other two consumers will close and the link should be torn down.
7536+
peer.expectFlow().withLinkCredit(999).withDrain(true)
7537+
.respond()
7538+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
7539+
peer.expectDetach();
7540+
7541+
assertNull(consumerB.receive(100));
7542+
}
7543+
7544+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
7545+
peer.close();
7546+
}
7547+
}
7548+
73157549
protected void configureSecurity(ActiveMQServer server, String allowed, String restricted, String... userAllowedOnly) {
73167550
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
73177551

0 commit comments

Comments
 (0)