Skip to content

Commit d2eb3c7

Browse files
committed
Backport refactoring of bridge startup which includes creating dedicated dispatchers for each configured transport
1 parent 04c4de7 commit d2eb3c7

File tree

8 files changed

+193
-122
lines changed

8 files changed

+193
-122
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using NServiceBus.AcceptanceTesting;
5+
using NServiceBus.Pipeline;
6+
using NUnit.Framework;
7+
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;
8+
9+
public class ReplyToAddress : BridgeAcceptanceTest
10+
{
11+
[Test]
12+
public async Task Should_translate_address_for_already_migrated_endpoint()
13+
{
14+
var ctx = await Scenario.Define<Context>()
15+
.WithEndpoint<SendingEndpoint>(builder =>
16+
{
17+
builder.DoNotFailOnErrorMessages();
18+
builder.When(c => c.EndpointsStarted, (session, _) =>
19+
{
20+
var options = new SendOptions();
21+
options.SetDestination(Conventions.EndpointNamingConvention(typeof(SecondMigratedEndpoint)));
22+
23+
return session.Send(new ADelayedMessage(), options);
24+
});
25+
})
26+
.WithEndpoint<FirstMigratedEndpoint>()
27+
.WithEndpoint<SecondMigratedEndpoint>()
28+
.WithBridge(bridgeConfiguration =>
29+
{
30+
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
31+
{
32+
Name = "DefaultTestingTransport"
33+
};
34+
bridgeTransport.AddTestEndpoint<SendingEndpoint>();
35+
bridgeConfiguration.AddTransport(bridgeTransport);
36+
37+
var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
38+
theOtherTransport.AddTestEndpoint<FirstMigratedEndpoint>();
39+
theOtherTransport.AddTestEndpoint<SecondMigratedEndpoint>();
40+
bridgeConfiguration.AddTransport(theOtherTransport);
41+
})
42+
.Done(c => c.ADelayedMessageReceived)
43+
.Run();
44+
45+
}
46+
47+
public class SendingEndpoint : EndpointConfigurationBuilder
48+
{
49+
public SendingEndpoint() => EndpointSetup<DefaultTestServer>((c, runDescriptor) =>
50+
c.Pipeline.Register(new OverrideReplyToAddress(), "Checks that the retry confirmation arrived"));
51+
52+
class OverrideReplyToAddress : Behavior<IOutgoingPhysicalMessageContext>
53+
{
54+
public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
55+
{
56+
context.Headers[Headers.ReplyToAddress] = Conventions.EndpointNamingConvention(typeof(FirstMigratedEndpoint));
57+
await next();
58+
59+
}
60+
}
61+
}
62+
63+
public class FirstMigratedEndpoint : EndpointConfigurationBuilder
64+
{
65+
public FirstMigratedEndpoint() => EndpointSetup<DefaultServer>();
66+
}
67+
68+
public class SecondMigratedEndpoint : EndpointConfigurationBuilder
69+
{
70+
public SecondMigratedEndpoint() => EndpointSetup<DefaultServer>();
71+
72+
class ADelayedMessageHandler : IHandleMessages<ADelayedMessage>
73+
{
74+
public ADelayedMessageHandler(Context context) => testContext = context;
75+
76+
public Task Handle(ADelayedMessage message, IMessageHandlerContext context)
77+
{
78+
testContext.ADelayedMessageReceived = true;
79+
return Task.CompletedTask;
80+
}
81+
82+
readonly Context testContext;
83+
}
84+
}
85+
86+
public class Context : ScenarioContext
87+
{
88+
public bool ADelayedMessageReceived { get; set; }
89+
}
90+
91+
public class ADelayedMessage : IMessage
92+
{
93+
}
94+
}

src/NServiceBus.MessagingBridge/Configuration/BridgeConfiguration.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,14 @@ internal FinalizedBridgeConfiguration FinalizeConfiguration(ILogger<BridgeConfig
4949
throw new InvalidOperationException("At least two transports needs to be configured");
5050
}
5151

52-
var tranportsWithNoEndpoints = transportConfigurations.Where(tc => !tc.Endpoints.Any())
53-
.Select(t => t.Name).ToArray();
52+
var allEndpoints = transportConfigurations
53+
.SelectMany(t => t.Endpoints).ToArray();
5454

55-
if (tranportsWithNoEndpoints.Any())
55+
if (!allEndpoints.Any())
5656
{
57-
var endpointNames = string.Join(", ", tranportsWithNoEndpoints);
58-
throw new InvalidOperationException($"At least one endpoint needs to be configured for transport(s): {endpointNames}");
57+
throw new InvalidOperationException($"At least one endpoint needs to be configured");
5958
}
6059

61-
var allEndpoints = transportConfigurations
62-
.SelectMany(t => t.Endpoints).ToArray();
63-
6460
var duplicatedEndpoints = allEndpoints
6561
.GroupBy(e => e.Name)
6662
.Where(g => g.Count() > 1)

src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ public Task<IStartableRawEndpoint> CreateProxy(
5858
return RawEndpoint.Create(transportEndpointConfiguration, cancellationToken);
5959
}
6060

61+
public static Task<IStartableRawEndpoint> CreateDispatcher(BridgeTransport transportConfiguration, CancellationToken cancellationToken = default)
62+
{
63+
var endpointConfiguration = RawEndpointConfiguration.CreateSendOnly($"bridge-dispatcher-{transportConfiguration.Name}", transportConfiguration.TransportDefinition);
64+
65+
return RawEndpoint.Create(endpointConfiguration, cancellationToken);
66+
}
67+
6168
static bool IsSubscriptionMessage(IReadOnlyDictionary<string, string> messageContextHeaders)
6269
{
6370
var messageIntent = default(MessageIntent);

src/NServiceBus.MessagingBridge/EndpointRegistry.cs

Lines changed: 59 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,73 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Extensions.Logging;
47
using NServiceBus;
58
using NServiceBus.Raw;
69
using NServiceBus.Transport;
710

8-
class EndpointRegistry : IEndpointRegistry
11+
class EndpointRegistry(EndpointProxyFactory endpointProxyFactory, ILogger<StartableBridge> logger) : IEndpointRegistry
912
{
10-
public void RegisterDispatcher(
11-
BridgeEndpoint endpoint,
12-
string targetTransportName,
13-
IStartableRawEndpoint rawEndpoint)
13+
//NOTE: This method cannot have a return type of IAsyncEnumerable as all the endpoints need to be configured on the bridge before the bridge can start processing messages.
14+
public async Task<IEnumerable<ProxyRegistration>> Initialize(IReadOnlyCollection<BridgeTransport> transportConfigurations, CancellationToken cancellationToken = default)
1415
{
15-
registrations.Add(new ProxyRegistration
16-
{
17-
Endpoint = endpoint,
18-
TranportName = targetTransportName,
19-
RawEndpoint = rawEndpoint
20-
});
21-
}
22-
23-
public void ApplyMappings(IReadOnlyCollection<BridgeTransport> transportConfigurations)
24-
{
25-
foreach (var registration in registrations)
26-
{
27-
var endpoint = registration.Endpoint;
28-
29-
// target transport is the transport where this endpoint is actually running
30-
var targetTransport = transportConfigurations.Single(t => t.Endpoints.Any(e => e.Name == endpoint.Name));
16+
// Assume that it is the number of endpoints that is more likely to scale up in size than the number of transports (typically only 2).
17+
// Therefore in cases where it might matter, it is more efficient to iterate over the transports multiple times.
18+
transportConfigurationMappings = transportConfigurations.ToDictionary(t => t.Name, t => t);
3119

32-
// just pick the first proxy that is running on the target transport since
33-
// we just need to be able to send messages to that transport
34-
var proxyEndpoint = registrations
35-
.First(r => r.TranportName == targetTransport.Name)
36-
.RawEndpoint;
20+
IList<ProxyRegistration> proxyRegistrations = [];
3721

38-
//This value represents in fact the endpoint's name. It is wrapped in the QueueAddress class only because
39-
//the ToTransportAddress API expects it.
40-
var endpointName = new QueueAddress(endpoint.Name);
41-
42-
var transportAddress = endpoint.QueueAddress ?? proxyEndpoint.ToTransportAddress(endpointName);
43-
44-
endpointAddressMappings[registration.Endpoint.Name] = transportAddress;
45-
46-
targetEndpointAddressMappings[transportAddress] = registration.RawEndpoint.ToTransportAddress(endpointName);
47-
48-
targetEndpointDispatchers[registration.Endpoint.Name] = new TargetEndpointDispatcher(
49-
targetTransport.Name,
50-
proxyEndpoint,
51-
transportAddress);
22+
foreach (var targetTransport in transportConfigurations)
23+
{
24+
// Create the dispatcher for this transport
25+
var dispatchEndpoint = await EndpointProxyFactory.CreateDispatcher(targetTransport, cancellationToken).ConfigureAwait(false);
26+
27+
proxyRegistrations.Add(new ProxyRegistration
28+
{
29+
Endpoint = null,
30+
TranportName = targetTransport.Name,
31+
RawEndpoint = dispatchEndpoint
32+
});
33+
34+
// create required proxy endpoints on all transports
35+
foreach (var endpointToSimulate in targetTransport.Endpoints)
36+
{
37+
// Endpoint will need to be proxied on the other transports
38+
foreach (var proxyTransport in transportConfigurationMappings.Where(kvp => kvp.Key != targetTransport.Name).Select(kvp => kvp.Value))
39+
{
40+
var startableEndpointProxy = await endpointProxyFactory.CreateProxy(endpointToSimulate, proxyTransport, cancellationToken)
41+
.ConfigureAwait(false);
42+
43+
logger.LogInformation("Proxy for endpoint {endpoint} created on {transport}", endpointToSimulate.Name, proxyTransport.Name);
44+
45+
var queueAddress = new QueueAddress(endpointToSimulate.Name);
46+
var targetTransportAddress = dispatchEndpoint.ToTransportAddress(queueAddress);
47+
var sourceTransportAddress = startableEndpointProxy.ToTransportAddress(queueAddress);
48+
49+
endpointAddressMappings[endpointToSimulate.Name] = endpointToSimulate.QueueAddress ?? targetTransportAddress;
50+
targetEndpointAddressMappings[targetTransportAddress] = sourceTransportAddress;
51+
if (targetTransportAddress != sourceTransportAddress)
52+
{
53+
// Also add the reverse mapping so that any messages that were in-flight before a bridge configuration change
54+
// can still have their address translated correctly. This also allows for the case where duplicate logical endpoints
55+
// are running as a competing consumer with the bridge in canary/parallel deployment scenarios.
56+
targetEndpointAddressMappings[sourceTransportAddress] = targetTransportAddress;
57+
}
58+
targetEndpointDispatchers[endpointToSimulate.Name] = new TargetEndpointDispatcher(targetTransport.Name, dispatchEndpoint, targetTransportAddress);
59+
60+
proxyRegistrations.Add(new ProxyRegistration
61+
{
62+
Endpoint = endpointToSimulate,
63+
TranportName = proxyTransport.Name,
64+
RawEndpoint = startableEndpointProxy
65+
});
66+
}
67+
}
5268
}
69+
70+
return proxyRegistrations;
5371
}
5472

5573
public TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpointName)
@@ -96,12 +114,11 @@ static string GetClosestMatchForExceptionMessage(string sourceEndpointName, IEnu
96114
return nearestMatch ?? "(No mappings registered)";
97115
}
98116

99-
public IEnumerable<ProxyRegistration> Registrations => registrations;
117+
Dictionary<string, BridgeTransport> transportConfigurationMappings = [];
100118

101119
readonly Dictionary<string, TargetEndpointDispatcher> targetEndpointDispatchers = [];
102120
readonly Dictionary<string, string> targetEndpointAddressMappings = [];
103121
readonly Dictionary<string, string> endpointAddressMappings = [];
104-
readonly List<ProxyRegistration> registrations = [];
105122

106123
public class ProxyRegistration
107124
{

src/NServiceBus.MessagingBridge/RawEndpoints/InitializableRawEndpoint.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace NServiceBus.Raw
22
{
33
using NServiceBus.Transport;
4+
using System;
45
using System.Collections.Generic;
56
using System.Threading;
67
using System.Threading.Tasks;
@@ -20,13 +21,18 @@ public async Task<IStartableRawEndpoint> Initialize(CancellationToken cancellati
2021
null); //null means "not hosted by core", transport SHOULD adjust accordingly to not assume things
2122

2223
var usePubSub = rawEndpointConfiguration.TransportDefinition.SupportsPublishSubscribe && !rawEndpointConfiguration.PublishAndSubscribeDisabled;
23-
var receivers = new[]{
24-
new ReceiveSettings(
25-
rawEndpointConfiguration.EndpointName,
26-
new QueueAddress(rawEndpointConfiguration.EndpointName),
27-
usePubSub,
28-
false,
29-
rawEndpointConfiguration.PoisonMessageQueue)};
24+
var receivers = Array.Empty<ReceiveSettings>();
25+
26+
if (!rawEndpointConfiguration.SendOnly)
27+
{
28+
receivers = [
29+
new ReceiveSettings(
30+
rawEndpointConfiguration.EndpointName,
31+
new QueueAddress(rawEndpointConfiguration.EndpointName),
32+
usePubSub,
33+
false,
34+
rawEndpointConfiguration.PoisonMessageQueue)];
35+
}
3036

3137
var sendingQueues = new List<string>(rawEndpointConfiguration.AdditionalQueues);
3238

src/NServiceBus.MessagingBridge/RawEndpoints/StartableRawEndpoint.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ public StartableRawEndpoint(
1414
RawCriticalError criticalError)
1515
{
1616
this.criticalError = criticalError;
17-
messagePump = transportInfrastructure.Receivers.Values.First();
17+
messagePump = transportInfrastructure.Receivers.Values.FirstOrDefault();
1818
dispatcher = transportInfrastructure.Dispatcher;
1919
this.rawEndpointConfiguration = rawEndpointConfiguration;
2020
this.transportInfrastructure = transportInfrastructure;
21-
SubscriptionManager = messagePump.Subscriptions;
22-
TransportAddress = messagePump.ReceiveAddress;
21+
SubscriptionManager = messagePump?.Subscriptions;
22+
TransportAddress = messagePump?.ReceiveAddress;
2323
}
2424

2525
public async Task<IReceivingRawEndpoint> Start(CancellationToken cancellationToken = default)

0 commit comments

Comments
 (0)