Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,18 @@ synchronized void processRemoteQueueAdded(String addressName, String queueName)
});
}

/**
* {@return true if this is the federation source instance, false if it is the target}
*/
abstract boolean isFederationSource();

/**
* {@return true if this is the federation target instance, false if it is the source}
*/
final boolean isFederationTarget() {
return !isFederationSource();
}

/**
* Error signaling API that must be implemented by the specific federation implementation to handle error when
* creating a federation resource such as an outgoing receiver link.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
Expand Down Expand Up @@ -251,6 +254,23 @@ private void checkBindingForMatch(Binding binding) {
return;
}

// Target brokers which have been sent remote federation policies might not have write
// access via the logged in user to the address with demand which we are attempting to
// federate messages to so instead of creating a receiver that will fail when the remote
// routes a message to it we can just omit creating the link in the first place.
if (federation.isFederationTarget()) {
try {
session.getSessionSPI().check(addressInfo.getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
} catch (ActiveMQSecurityException e) {
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
addressInfo.getName().toString(), "User does not have send permission to configured address.");
return;
} catch (Exception ex) {
logger.warn("Caught unknown exception from security check on address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
return;
}
}

createOrUpdateFederatedAddressConsumerForBinding(addressInfo, queueBinding);
} else {
reactIfQueueBindingMatchesAnyDivertTarget(queueBinding);
Expand Down Expand Up @@ -289,6 +309,23 @@ private void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding divertBindi
return;
}

// Target brokers which have been sent remote federation policies might not have write access
// via the logged in user to the address this divert is attached to which means we don't need
// to track this divert. Since we don't add the divert to the tracking map future demand on
// divert that would otherwise match the address includes won't trigger federation attempts.
if (federation.isFederationTarget()) {
try {
session.getSessionSPI().check(addressInfo.getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
} catch (ActiveMQSecurityException e) {
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
addressInfo.getName().toString(), "User does not have send permission to configured address.");
return;
} catch (Exception ex) {
logger.warn("Caught unknown exception from security check on address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
return;
}
}

// We only need to check if we've never seen the divert before, afterwards we will
// be checking it any time a new QueueBinding is added instead.
if (!divertsTracking.containsKey(divertBinding)) {
Expand Down Expand Up @@ -337,15 +374,15 @@ private void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding

final SimpleString queueAddress = queueBinding.getAddress();

divertsTracking.entrySet().forEach((e) -> {
final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress();
final DivertBinding divertBinding = e.getKey();
divertsTracking.entrySet().forEach((divert) -> {
final SimpleString forwardAddress = divert.getKey().getDivert().getForwardAddress();
final DivertBinding divertBinding = divert.getKey();

// Check matched diverts to see if the QueueBinding address matches the address or
// addresses (composite diverts) of the Divert and if so then we can check if we need
// to create demand on the source address on the remote if we haven't done so already.

if (!e.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) {
if (!divert.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) {
// The plugin can block the demand totally here either based on the divert itself
// or the queue that's attached to the divert.
if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) {
Expand All @@ -360,7 +397,7 @@ private void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding

// Each divert that forwards to the address the queue is bound to we add demand
// in the diverts tracker.
e.getValue().add(queueBinding);
divert.getValue().add(queueBinding);

final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
Expand All @@ -41,6 +44,7 @@
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -164,8 +168,9 @@ private void checkQueueForMatch(Queue queue) {

private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
final String queueName = consumer.getQueue().getName().toString();
final SimpleString addressName = consumer.getQueueAddress();

if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), queueName)) {
if (testIfQueueMatchesPolicy(addressName.toString(), queueName)) {
final boolean federationConsumer = isFederationConsumer(consumer);

// We should ignore federation consumers from remote peers but configuration does allow
Expand All @@ -175,6 +180,23 @@ private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
return;
}

// Target brokers which have been sent remote federation policies might not have write
// access via the logged in user to the queue with demand which we are attempting to
// federate messages to so instead of creating a receiver that will fail when the remote
// routes a message to it we can just omit creating the link in the first place.
if (federation.isFederationTarget()) {
try {
session.getSessionSPI().check(addressName, consumer.getQueue().getName(), CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
} catch (ActiveMQSecurityException e) {
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedQueueFederation(
queueName, "User does not have send permission to configured queue.");
return;
} catch (Exception ex) {
logger.warn("Caught unknown exception from security check on queue:{} send permissions: cannot federate:", queueName, ex);
return;
}
}

logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding());

final AMQPFederationQueueConsumerManager entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ protected void signalError(Exception cause) {
brokerConnection.runtimeError(cause);
}

@Override
boolean isFederationSource() {
return true;
}

@Override
void registerFederationManagement() throws Exception {
AMQPFederationManagementSupport.registerFederationSource(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ protected void signalError(Exception cause) {
connection.close(new ErrorCondition(condition, description));
}

@Override
boolean isFederationSource() {
return false;
}

@Override
void registerFederationManagement() throws Exception {
if (brokerConnection.isManagable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ public interface ActiveMQAMQPProtocolLogger {

@LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on address {}, queueID={}", level = LogMessage.Level.WARN)
void ackRetryFailed(Object ackRetryInformation, Object address, long queueID);

@LogMessage(id = 111013, value = "AMQP Federation target skipped federation of address {}, reason={}", level = LogMessage.Level.WARN)
void federationTargetSkippedAddressFederation(String address, String reason);

@LogMessage(id = 111014, value = "AMQP Federation target skipped federation of queue {}, reason={}", level = LogMessage.Level.WARN)
void federationTargetSkippedQueueFederation(String address, String reason);
}
Loading