Skip to content

Commit 0c6a56b

Browse files
committed
Fix address mappings for when more than two transports are being used (#526)
1 parent d2eb3c7 commit 0c6a56b

File tree

7 files changed

+180
-104
lines changed

7 files changed

+180
-104
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using NServiceBus;
5+
using NServiceBus.Raw;
6+
using NServiceBus.Transport;
7+
8+
/// <summary>
9+
/// How to map an address depends upon which transport the message is being sent to,
10+
/// so it is necessary to have an address mapping dictionary for each transport
11+
/// </summary>
12+
class AddressMap(IReadOnlyDictionary<string, IStartableRawEndpoint> dispatchers) : IAddressMap
13+
{
14+
readonly Dictionary<string, TransportAddressMap> addressMap =
15+
dispatchers.ToDictionary(dispatcher => dispatcher.Key, _ => new TransportAddressMap());
16+
17+
readonly HashSet<string> endpoints = [];
18+
19+
public void Add(BridgeTransport transport, BridgeEndpoint endpoint)
20+
{
21+
if (!endpoints.Add(endpoint.Name))
22+
{
23+
throw new InvalidOperationException($"{endpoint.Name} has already been added to the address map.");
24+
}
25+
26+
var queueAddress = new QueueAddress(endpoint.Name);
27+
28+
foreach (var targetTransport in addressMap.Keys)
29+
{
30+
var targetAddress = DetermineTransportAddress(targetTransport, transport.Name, endpoint.QueueAddress, queueAddress);
31+
32+
foreach (var sourceTransport in addressMap.Keys)
33+
{
34+
var sourceAddress = DetermineTransportAddress(sourceTransport, transport.Name, endpoint.QueueAddress, queueAddress);
35+
36+
addressMap[targetTransport][sourceAddress] = targetAddress;
37+
}
38+
}
39+
}
40+
41+
string DetermineTransportAddress(string transportToMap, string endpointTransport, string overriddenQueueAddress, QueueAddress queueAddress) => transportToMap == endpointTransport
42+
? overriddenQueueAddress ?? dispatchers[transportToMap].ToTransportAddress(queueAddress)
43+
: dispatchers[transportToMap].ToTransportAddress(queueAddress);
44+
45+
public bool TryTranslate(string targetTransport, string address, out string bestMatch)
46+
{
47+
var transportAddressMappings = addressMap[targetTransport];
48+
49+
if (transportAddressMappings.TryGetValue(address, out bestMatch))
50+
{
51+
return true;
52+
}
53+
54+
bestMatch = address.GetClosestMatch(transportAddressMappings.Keys);
55+
return false;
56+
}
57+
}
58+
59+
class TransportAddressMap : Dictionary<string, string>
60+
{
61+
}

src/NServiceBus.MessagingBridge/EndpointRegistry.cs

Lines changed: 62 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,59 +15,77 @@ public async Task<IEnumerable<ProxyRegistration>> Initialize(IReadOnlyCollection
1515
{
1616
// Assume that it is the number of endpoints that is more likely to scale up in size than the number of transports (typically only 2).
1717
// Therefore in cases where it might matter, it is more efficient to iterate over the transports multiple times.
18-
transportConfigurationMappings = transportConfigurations.ToDictionary(t => t.Name, t => t);
1918

2019
IList<ProxyRegistration> proxyRegistrations = [];
2120

22-
foreach (var targetTransport in transportConfigurations)
21+
await CreateAndRegisterDispatchers(transportConfigurations, proxyRegistrations, cancellationToken)
22+
.ConfigureAwait(false);
23+
24+
var allEndpoints = transportConfigurations.SelectMany(
25+
transport => transport.Endpoints.Select(endpoint => (transport, endpoint)));
26+
27+
foreach (var (transport, endpoint) in allEndpoints)
2328
{
24-
// Create the dispatcher for this transport
25-
var dispatchEndpoint = await EndpointProxyFactory.CreateDispatcher(targetTransport, cancellationToken).ConfigureAwait(false);
29+
AddressMap.Add(transport, endpoint);
30+
31+
var dispatcher = dispatchers[transport.Name];
32+
var queueAddress = new QueueAddress(endpoint.Name);
33+
var targetTransportAddress = dispatcher.ToTransportAddress(queueAddress);
34+
endpointAddressMappings[endpoint.Name] = endpoint.QueueAddress ?? targetTransportAddress;
35+
targetEndpointDispatchers[endpoint.Name] = new TargetEndpointDispatcher(transport.Name, dispatcher, targetTransportAddress);
36+
37+
await CreateAndRegisterProxies(transport, endpoint, transportConfigurations, proxyRegistrations, cancellationToken)
38+
.ConfigureAwait(false);
39+
}
40+
41+
return proxyRegistrations;
42+
}
2643

44+
async Task CreateAndRegisterDispatchers(
45+
IReadOnlyCollection<BridgeTransport> transportConfigurations,
46+
IList<ProxyRegistration> proxyRegistrations,
47+
CancellationToken cancellationToken)
48+
{
49+
foreach (var transport in transportConfigurations)
50+
{
51+
var dispatcher = await EndpointProxyFactory.CreateDispatcher(transport, cancellationToken).ConfigureAwait(false);
52+
dispatchers.Add(transport.Name, dispatcher);
2753
proxyRegistrations.Add(new ProxyRegistration
2854
{
2955
Endpoint = null,
30-
TranportName = targetTransport.Name,
31-
RawEndpoint = dispatchEndpoint
56+
RawEndpoint = dispatcher
3257
});
58+
}
59+
60+
AddressMap = new AddressMap(dispatchers);
61+
}
3362

34-
// create required proxy endpoints on all transports
35-
foreach (var endpointToSimulate in targetTransport.Endpoints)
63+
async Task CreateAndRegisterProxies(
64+
BridgeTransport targetTransport,
65+
BridgeEndpoint targetEndpoint,
66+
IReadOnlyCollection<BridgeTransport> transportConfigurations,
67+
IList<ProxyRegistration> proxyRegistrations,
68+
CancellationToken cancellationToken)
69+
{
70+
// Endpoint will need to be proxied on the other transports
71+
foreach (var proxyTransport in transportConfigurations)
72+
{
73+
if (proxyTransport.Name == targetTransport.Name)
3674
{
37-
// Endpoint will need to be proxied on the other transports
38-
foreach (var proxyTransport in transportConfigurationMappings.Where(kvp => kvp.Key != targetTransport.Name).Select(kvp => kvp.Value))
39-
{
40-
var startableEndpointProxy = await endpointProxyFactory.CreateProxy(endpointToSimulate, proxyTransport, cancellationToken)
41-
.ConfigureAwait(false);
42-
43-
logger.LogInformation("Proxy for endpoint {endpoint} created on {transport}", endpointToSimulate.Name, proxyTransport.Name);
44-
45-
var queueAddress = new QueueAddress(endpointToSimulate.Name);
46-
var targetTransportAddress = dispatchEndpoint.ToTransportAddress(queueAddress);
47-
var sourceTransportAddress = startableEndpointProxy.ToTransportAddress(queueAddress);
48-
49-
endpointAddressMappings[endpointToSimulate.Name] = endpointToSimulate.QueueAddress ?? targetTransportAddress;
50-
targetEndpointAddressMappings[targetTransportAddress] = sourceTransportAddress;
51-
if (targetTransportAddress != sourceTransportAddress)
52-
{
53-
// Also add the reverse mapping so that any messages that were in-flight before a bridge configuration change
54-
// can still have their address translated correctly. This also allows for the case where duplicate logical endpoints
55-
// are running as a competing consumer with the bridge in canary/parallel deployment scenarios.
56-
targetEndpointAddressMappings[sourceTransportAddress] = targetTransportAddress;
57-
}
58-
targetEndpointDispatchers[endpointToSimulate.Name] = new TargetEndpointDispatcher(targetTransport.Name, dispatchEndpoint, targetTransportAddress);
59-
60-
proxyRegistrations.Add(new ProxyRegistration
61-
{
62-
Endpoint = endpointToSimulate,
63-
TranportName = proxyTransport.Name,
64-
RawEndpoint = startableEndpointProxy
65-
});
66-
}
75+
continue;
6776
}
68-
}
6977

70-
return proxyRegistrations;
78+
var startableEndpointProxy = await endpointProxyFactory.CreateProxy(targetEndpoint, proxyTransport, cancellationToken)
79+
.ConfigureAwait(false);
80+
81+
logger.LogInformation("Proxy for endpoint {endpoint} created on {transport}", targetTransport.Name, proxyTransport.Name);
82+
83+
proxyRegistrations.Add(new ProxyRegistration
84+
{
85+
Endpoint = targetEndpoint,
86+
RawEndpoint = startableEndpointProxy
87+
});
88+
}
7189
}
7290

7391
public TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpointName)
@@ -77,21 +95,12 @@ public TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpoin
7795
return endpointDispatcher;
7896
}
7997

80-
var nearestMatch = GetClosestMatchForExceptionMessage(sourceEndpointName, targetEndpointDispatchers.Keys);
98+
var nearestMatch = sourceEndpointName.GetClosestMatch(targetEndpointDispatchers.Keys);
8199

82100
throw new Exception($"No target endpoint dispatcher could be found for endpoint: {sourceEndpointName}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {nearestMatch}");
83101
}
84102

85-
public bool TryTranslateToTargetAddress(string sourceAddress, out string bestMatch)
86-
{
87-
if (targetEndpointAddressMappings.TryGetValue(sourceAddress, out bestMatch))
88-
{
89-
return true;
90-
}
91-
92-
bestMatch = GetClosestMatchForExceptionMessage(sourceAddress, targetEndpointAddressMappings.Keys);
93-
return false;
94-
}
103+
public IAddressMap AddressMap { get; private set; }
95104

96105
public string GetEndpointAddress(string endpointName)
97106
{
@@ -100,30 +109,18 @@ public string GetEndpointAddress(string endpointName)
100109
return address;
101110
}
102111

103-
var nearestMatch = GetClosestMatchForExceptionMessage(endpointName, endpointAddressMappings.Keys);
112+
var nearestMatch = endpointName.GetClosestMatch(endpointAddressMappings.Keys) ?? "(No mappings registered)";
104113

105114
throw new Exception($"No address mapping could be found for endpoint: {endpointName}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {nearestMatch}");
106115
}
107116

108-
static string GetClosestMatchForExceptionMessage(string sourceEndpointName, IEnumerable<string> items)
109-
{
110-
var calculator = new Levenshtein(sourceEndpointName.ToLower());
111-
var nearestMatch = items
112-
.OrderBy(x => calculator.DistanceFrom(x.ToLower()))
113-
.FirstOrDefault();
114-
return nearestMatch ?? "(No mappings registered)";
115-
}
116-
117-
Dictionary<string, BridgeTransport> transportConfigurationMappings = [];
118-
117+
readonly Dictionary<string, IStartableRawEndpoint> dispatchers = [];
119118
readonly Dictionary<string, TargetEndpointDispatcher> targetEndpointDispatchers = [];
120-
readonly Dictionary<string, string> targetEndpointAddressMappings = [];
121119
readonly Dictionary<string, string> endpointAddressMappings = [];
122120

123121
public class ProxyRegistration
124122
{
125123
public BridgeEndpoint Endpoint;
126-
public string TranportName;
127124
public IStartableRawEndpoint RawEndpoint;
128125
}
129126
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using NServiceBus;
2+
3+
interface IAddressMap
4+
{
5+
void Add(BridgeTransport transport, BridgeEndpoint endpoint);
6+
7+
bool TryTranslate(string targetTransport, string address, out string bestMatch);
8+
}

src/NServiceBus.MessagingBridge/IEndpointRegistry.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
{
33
TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpointName);
44

5-
bool TryTranslateToTargetAddress(string sourceAddress, out string bestMatch);
5+
IAddressMap AddressMap { get; }
66

77
string GetEndpointAddress(string endpointName);
88
}

src/NServiceBus.MessagingBridge/MessageShovel.cs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,11 @@
66
using NServiceBus.Faults;
77
using NServiceBus.Transport;
88

9-
sealed class MessageShovel : IMessageShovel
9+
sealed class MessageShovel(
10+
ILogger<MessageShovel> logger,
11+
IEndpointRegistry targetEndpointRegistry,
12+
FinalizedBridgeConfiguration finalizedBridgeConfiguration) : IMessageShovel
1013
{
11-
public MessageShovel(
12-
ILogger<MessageShovel> logger,
13-
IEndpointRegistry targetEndpointRegistry,
14-
FinalizedBridgeConfiguration finalizedBridgeConfiguration)
15-
{
16-
this.logger = logger;
17-
this.targetEndpointRegistry = targetEndpointRegistry;
18-
translateReplyToAddressForFailedMessages = finalizedBridgeConfiguration.TranslateReplyToAddressForFailedMessages;
19-
}
20-
2114
public async Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default)
2215
{
2316
TargetEndpointDispatcher targetEndpointDispatcher = null;
@@ -26,13 +19,14 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
2619
targetEndpointDispatcher = targetEndpointRegistry.GetTargetEndpointDispatcher(transferContext.SourceEndpointName);
2720

2821
var messageContext = transferContext.MessageToTransfer;
22+
var targetTransport = targetEndpointDispatcher.TransportName;
2923

3024
var messageToSend = new OutgoingMessage(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body);
3125
messageToSend.Headers.Remove(BridgeHeaders.FailedQ);
3226

33-
var length = transferContext.SourceTransport.Length + targetEndpointDispatcher.TransportName.Length + 2 /* ->*/;
27+
var length = transferContext.SourceTransport.Length + targetTransport.Length + 2 /* ->*/;
3428
var transferDetails = string.Create(length,
35-
(Source: transferContext.SourceTransport, Target: targetEndpointDispatcher.TransportName),
29+
(Source: transferContext.SourceTransport, Target: targetTransport),
3630
static (chars, context) =>
3731
{
3832
var position = 0;
@@ -48,12 +42,12 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
4842
//This is a failed message forwarded to ServiceControl. We need to transform the FailedQ header so that ServiceControl returns the message
4943
//to the correct queue/transport on the other side
5044

51-
TransformAddressHeader(messageToSend, targetEndpointRegistry, FaultsHeaderKeys.FailedQ);
45+
TransformAddressHeader(messageToSend, targetTransport, FaultsHeaderKeys.FailedQ);
5246

5347
if (translateReplyToAddressForFailedMessages)
5448
{
5549
//Try to translate the ReplyToAddress, this is needed when an endpoint is migrated to the ServiceControl side before this message is retried
56-
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress, throwOnError: false);
50+
TransformAddressHeader(messageToSend, targetTransport, Headers.ReplyToAddress, throwOnError: false);
5751
}
5852
}
5953
else if (IsAuditMessage(messageToSend))
@@ -64,27 +58,27 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
6458
else if (IsRetryMessage(messageToSend))
6559
{
6660
//Transform the retry ack queue address
67-
TransformAddressHeader(messageToSend, targetEndpointRegistry, "ServiceControl.Retry.AcknowledgementQueue");
61+
TransformAddressHeader(messageToSend, targetTransport, "ServiceControl.Retry.AcknowledgementQueue");
6862

6963
if (translateReplyToAddressForFailedMessages)
7064
{
7165
//This is a message retried from ServiceControl. We try to translate its ReplyToAddress.
72-
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress, throwOnError: false);
66+
TransformAddressHeader(messageToSend, targetTransport, Headers.ReplyToAddress, throwOnError: false);
7367
}
7468
}
7569
else if (IsRetryEditedMessage(messageToSend))
7670
{
7771
// This is a retry message that has been edited going from one side of the bridge to another
7872
// The ReplyToAddress is transformed to allow for replies to be delivered
7973
messageToSend.Headers[BridgeHeaders.Transfer] = transferDetails;
80-
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress, !translateReplyToAddressForFailedMessages);
74+
TransformAddressHeader(messageToSend, targetTransport, Headers.ReplyToAddress, !translateReplyToAddressForFailedMessages);
8175
}
8276
else
8377
{
8478
// This is a regular message sent between the endpoints on different sides of the bridge.
8579
// The ReplyToAddress is transformed to allow for replies to be delivered
8680
messageToSend.Headers[BridgeHeaders.Transfer] = transferDetails;
87-
TransformRegularMessageReplyToAddress(transferContext, messageToSend, targetEndpointRegistry);
81+
TransformRegularMessageReplyToAddress(transferContext, messageToSend, targetTransport);
8882
}
8983

9084
await targetEndpointDispatcher.Dispatch(
@@ -118,7 +112,7 @@ await targetEndpointDispatcher.Dispatch(
118112
void TransformRegularMessageReplyToAddress(
119113
TransferContext transferContext,
120114
OutgoingMessage messageToSend,
121-
IEndpointRegistry targetEndpointRegistry)
115+
string targetTransport)
122116
{
123117
if (!messageToSend.Headers.TryGetValue(Headers.ReplyToAddress, out var headerValue))
124118
{
@@ -133,36 +127,34 @@ void TransformRegularMessageReplyToAddress(
133127
}
134128
else
135129
{
136-
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress);
130+
TransformAddressHeader(messageToSend, targetTransport, Headers.ReplyToAddress);
137131
}
138132
}
139133

140134
void TransformAddressHeader(
141135
OutgoingMessage messageToSend,
142-
IEndpointRegistry endpointRegistry,
136+
string targetTransport,
143137
string addressHeaderKey,
144138
bool throwOnError = true)
145139
{
146-
if (!messageToSend.Headers.TryGetValue(addressHeaderKey, out var sourceAddress))
140+
if (!messageToSend.Headers.TryGetValue(addressHeaderKey, out var address))
147141
{
148142
return;
149143
}
150144

151-
if (endpointRegistry.TryTranslateToTargetAddress(sourceAddress, out string bestMatch))
145+
if (targetEndpointRegistry.AddressMap.TryTranslate(targetTransport, address, out string bestMatch))
152146
{
153147
messageToSend.Headers[addressHeaderKey] = bestMatch;
154148
}
155149
else if (throwOnError == false)
156150
{
157-
logger.LogWarning($"Could not translate {sourceAddress} address. Consider using `.HasEndpoint()` method to add missing endpoint declaration.");
151+
logger.LogWarning($"Could not translate {address} address. Consider using `.HasEndpoint()` method to add missing endpoint declaration.");
158152
}
159153
else
160154
{
161-
throw new Exception($"No target address mapping could be found for source address: {sourceAddress}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {bestMatch}");
155+
throw new Exception($"No target address mapping could be found for source address: {address}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {bestMatch ?? "(No mappings registered)"}");
162156
}
163157
}
164158

165-
readonly ILogger<MessageShovel> logger;
166-
readonly IEndpointRegistry targetEndpointRegistry;
167-
readonly bool translateReplyToAddressForFailedMessages;
159+
readonly bool translateReplyToAddressForFailedMessages = finalizedBridgeConfiguration.TranslateReplyToAddressForFailedMessages;
168160
}

0 commit comments

Comments
 (0)