Skip to content

Commit 1bb6cdb

Browse files
authored
Merge pull request #589 from Particular/many-transports-fix-v3.1
Fix address mappings for when more than two transports are being used - 3.1
2 parents 1b38a0e + 0c6a56b commit 1bb6cdb

20 files changed

+452
-183
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ jobs:
8989
echo "AWS_REGION=${{ secrets.AWS_REGION }}" | Out-File -FilePath $Env:GITHUB_ENV -Encoding utf-8 -Append
9090
- name: Setup RabbitMQ
9191
if: matrix.transport == 'RabbitMQ'
92-
uses: Particular/setup-rabbitmq-action@v1.7.0
92+
uses: Particular/setup-rabbitmq-action@v1.7.1
9393
with:
9494
connection-string-name: RabbitMQTransport_ConnectionString
9595
tag: RabbitMQTransportBridge
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using NServiceBus.AcceptanceTesting;
5+
using NServiceBus.Pipeline;
6+
using NUnit.Framework;
7+
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;
8+
9+
public class ReplyToAddress : BridgeAcceptanceTest
10+
{
11+
[Test]
12+
public async Task Should_translate_address_for_already_migrated_endpoint()
13+
{
14+
var ctx = await Scenario.Define<Context>()
15+
.WithEndpoint<SendingEndpoint>(builder =>
16+
{
17+
builder.DoNotFailOnErrorMessages();
18+
builder.When(c => c.EndpointsStarted, (session, _) =>
19+
{
20+
var options = new SendOptions();
21+
options.SetDestination(Conventions.EndpointNamingConvention(typeof(SecondMigratedEndpoint)));
22+
23+
return session.Send(new ADelayedMessage(), options);
24+
});
25+
})
26+
.WithEndpoint<FirstMigratedEndpoint>()
27+
.WithEndpoint<SecondMigratedEndpoint>()
28+
.WithBridge(bridgeConfiguration =>
29+
{
30+
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
31+
{
32+
Name = "DefaultTestingTransport"
33+
};
34+
bridgeTransport.AddTestEndpoint<SendingEndpoint>();
35+
bridgeConfiguration.AddTransport(bridgeTransport);
36+
37+
var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
38+
theOtherTransport.AddTestEndpoint<FirstMigratedEndpoint>();
39+
theOtherTransport.AddTestEndpoint<SecondMigratedEndpoint>();
40+
bridgeConfiguration.AddTransport(theOtherTransport);
41+
})
42+
.Done(c => c.ADelayedMessageReceived)
43+
.Run();
44+
45+
}
46+
47+
public class SendingEndpoint : EndpointConfigurationBuilder
48+
{
49+
public SendingEndpoint() => EndpointSetup<DefaultTestServer>((c, runDescriptor) =>
50+
c.Pipeline.Register(new OverrideReplyToAddress(), "Checks that the retry confirmation arrived"));
51+
52+
class OverrideReplyToAddress : Behavior<IOutgoingPhysicalMessageContext>
53+
{
54+
public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
55+
{
56+
context.Headers[Headers.ReplyToAddress] = Conventions.EndpointNamingConvention(typeof(FirstMigratedEndpoint));
57+
await next();
58+
59+
}
60+
}
61+
}
62+
63+
public class FirstMigratedEndpoint : EndpointConfigurationBuilder
64+
{
65+
public FirstMigratedEndpoint() => EndpointSetup<DefaultServer>();
66+
}
67+
68+
public class SecondMigratedEndpoint : EndpointConfigurationBuilder
69+
{
70+
public SecondMigratedEndpoint() => EndpointSetup<DefaultServer>();
71+
72+
class ADelayedMessageHandler : IHandleMessages<ADelayedMessage>
73+
{
74+
public ADelayedMessageHandler(Context context) => testContext = context;
75+
76+
public Task Handle(ADelayedMessage message, IMessageHandlerContext context)
77+
{
78+
testContext.ADelayedMessageReceived = true;
79+
return Task.CompletedTask;
80+
}
81+
82+
readonly Context testContext;
83+
}
84+
}
85+
86+
public class Context : ScenarioContext
87+
{
88+
public bool ADelayedMessageReceived { get; set; }
89+
}
90+
91+
public class ADelayedMessage : IMessage
92+
{
93+
}
94+
}

src/AcceptanceTests/Shared/Support/BridgeAcceptanceTest.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public Task TearDown()
4141
}
4242

4343
protected TransportDefinition TransportBeingTested => bridgeTransportDefinition.TransportDefinition;
44-
protected TransportDefinition TestTransport;
4544

4645
BridgeTransportDefinition bridgeTransportDefinition;
4746
}

src/AcceptanceTests/Shared/Support/DefaultTestServer.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
public class DefaultTestServer : IEndpointSetupTemplate
1010
{
11+
const string StorageDirectory = "DefaultTestingTransport";
12+
1113
#pragma warning disable PS0013 // Add a CancellationToken parameter type argument
1214
public virtual async Task<EndpointConfiguration> GetConfiguration(RunDescriptor runDescriptor, EndpointCustomizationConfiguration endpointConfiguration, Func<EndpointConfiguration, Task> configurationBuilderCustomization)
1315
#pragma warning restore PS0013 // Add a CancellationToken parameter type argument
@@ -25,7 +27,7 @@ public virtual async Task<EndpointConfiguration> GetConfiguration(RunDescriptor
2527

2628
configuration.RegisterComponentsAndInheritanceHierarchy(runDescriptor);
2729

28-
var transportDefinition = GetTestTransportDefinition();
30+
var transportDefinition = GetTransportDefinition();
2931
configuration.UseTransport(transportDefinition);
3032

3133
runDescriptor.OnTestCompleted(_ =>
@@ -43,12 +45,17 @@ public virtual async Task<EndpointConfiguration> GetConfiguration(RunDescriptor
4345
return configuration;
4446
}
4547

46-
public static AcceptanceTestingTransport GetTestTransportDefinition()
48+
protected static AcceptanceTestingTransport GetTransportDefinition(string instanceId)
4749
{
4850
var testRunId = TestContext.CurrentContext.Test.ID;
51+
4952
//make sure to run in a non-default directory to not clash with learning transport and other acceptance tests
50-
var storageDir = Path.Combine(Path.GetTempPath(), testRunId, "DefaultTestingTransport");
53+
var storagePath = Path.Combine(Path.GetTempPath(), testRunId, instanceId);
5154

52-
return new AcceptanceTestingTransport { StorageLocation = storageDir };
55+
return new AcceptanceTestingTransport { StorageLocation = storagePath };
5356
}
57+
58+
protected virtual AcceptanceTestingTransport GetTransportDefinition() => GetTransportDefinition(StorageDirectory);
59+
60+
public static AcceptanceTestingTransport GetTestTransportDefinition() => GetTransportDefinition(StorageDirectory);
5461
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using NServiceBus;
2+
3+
public class ReceivingTestServer : DefaultTestServer
4+
{
5+
const string StorageDirectory = "ReceiverTestingTransport";
6+
7+
protected override AcceptanceTestingTransport GetTransportDefinition() => GetTransportDefinition(StorageDirectory);
8+
9+
public static AcceptanceTestingTransport GetReceivingTransportDefinition() => GetTransportDefinition(StorageDirectory);
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using NServiceBus;
2+
3+
public class SendingTestServer : DefaultTestServer
4+
{
5+
const string StorageDirectory = "SenderTestingTransport";
6+
7+
protected override AcceptanceTestingTransport GetTransportDefinition() => GetTransportDefinition(StorageDirectory);
8+
9+
public static AcceptanceTestingTransport GetSendingTransportDefinition() => GetTransportDefinition(StorageDirectory);
10+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
using System.Threading.Tasks;
2+
using NServiceBus;
3+
using NServiceBus.AcceptanceTesting;
4+
using NUnit.Framework;
5+
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;
6+
7+
public class ThreeTransports : BridgeAcceptanceTest
8+
{
9+
[Test(Description = "Replicates issue reported in https://github.com/Particular/NServiceBus.MessagingBridge/issues/369")]
10+
public async Task Should_translate_address_correctly_for_target_transport()
11+
{
12+
var endpointOnTestingTransportName = Conventions.EndpointNamingConvention(typeof(EndpointOnTestingTransport));
13+
var endpointOnTransportUnderTestName = Conventions.EndpointNamingConvention(typeof(EndpointOnTransportUnderTest));
14+
15+
var options = new SendOptions();
16+
options.SetDestination(Conventions.EndpointNamingConvention(typeof(ReceivingEndpoint)));
17+
18+
var ctx = await Scenario.Define<Context>()
19+
.WithEndpoint<ReceivingEndpoint>()
20+
.WithEndpoint<EndpointOnTestingTransport>(builder => builder
21+
.When(c => c.EndpointsStarted, (session, _) => session.Send(new SomeMessage { From = endpointOnTestingTransportName }, options)))
22+
.WithEndpoint<EndpointOnTransportUnderTest>(builder => builder
23+
.When(c => c.EndpointsStarted, (session, _) => session.Send(new SomeMessage { From = endpointOnTransportUnderTestName }, options)))
24+
.WithBridge(bridgeConfiguration =>
25+
{
26+
bridgeConfiguration.TranslateReplyToAddressForFailedMessages();
27+
28+
var receivingTransport = new TestableBridgeTransport(ReceivingTestServer.GetReceivingTransportDefinition())
29+
{
30+
Name = "ReceivingTransport"
31+
};
32+
receivingTransport.AddTestEndpoint<ReceivingEndpoint>();
33+
bridgeConfiguration.AddTransport(receivingTransport);
34+
35+
var acceptanceTestingTransport = new TestableBridgeTransport(SendingTestServer.GetSendingTransportDefinition())
36+
{
37+
Name = "SendingAcceptanceTestingTransportName"
38+
};
39+
acceptanceTestingTransport.AddTestEndpoint<EndpointOnTestingTransport>();
40+
bridgeConfiguration.AddTransport(acceptanceTestingTransport);
41+
42+
var transportUnderTest = new TestableBridgeTransport(TransportBeingTested)
43+
{
44+
Name = "TransportUnderTest"
45+
};
46+
transportUnderTest.AddTestEndpoint<EndpointOnTransportUnderTest>();
47+
bridgeConfiguration.AddTransport(transportUnderTest);
48+
})
49+
.Done(c => c.ReceivedMessageCount == 2)
50+
.Run();
51+
52+
}
53+
54+
public class ReceivingEndpoint : EndpointConfigurationBuilder
55+
{
56+
public ReceivingEndpoint() => EndpointSetup<ReceivingTestServer>();
57+
58+
class SomeMessageHandler(Context context) : IHandleMessages<SomeMessage>
59+
{
60+
public Task Handle(SomeMessage message, IMessageHandlerContext context)
61+
{
62+
Assert.Multiple(() =>
63+
{
64+
Assert.That(context.MessageHeaders.TryGetValue("NServiceBus.ReplyToAddress", out var headerValue), Is.True);
65+
Assert.That(headerValue, Is.EqualTo(message.From));
66+
});
67+
68+
testContext.ReceivedMessageCount++;
69+
70+
return Task.CompletedTask;
71+
}
72+
73+
readonly Context testContext = context;
74+
}
75+
}
76+
77+
public class EndpointOnTestingTransport : EndpointConfigurationBuilder
78+
{
79+
public EndpointOnTestingTransport() => EndpointSetup<SendingTestServer>();
80+
}
81+
82+
public class EndpointOnTransportUnderTest : EndpointConfigurationBuilder
83+
{
84+
public EndpointOnTransportUnderTest() => EndpointSetup<DefaultServer>();
85+
}
86+
87+
public class Context : ScenarioContext
88+
{
89+
public int ReceivedMessageCount { get; set; }
90+
}
91+
92+
public class SomeMessage : IMessage
93+
{
94+
public string From { get; set; }
95+
}
96+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using NServiceBus;
5+
using NServiceBus.Raw;
6+
using NServiceBus.Transport;
7+
8+
/// <summary>
9+
/// How to map an address depends upon which transport the message is being sent to,
10+
/// so it is necessary to have an address mapping dictionary for each transport
11+
/// </summary>
12+
class AddressMap(IReadOnlyDictionary<string, IStartableRawEndpoint> dispatchers) : IAddressMap
13+
{
14+
readonly Dictionary<string, TransportAddressMap> addressMap =
15+
dispatchers.ToDictionary(dispatcher => dispatcher.Key, _ => new TransportAddressMap());
16+
17+
readonly HashSet<string> endpoints = [];
18+
19+
public void Add(BridgeTransport transport, BridgeEndpoint endpoint)
20+
{
21+
if (!endpoints.Add(endpoint.Name))
22+
{
23+
throw new InvalidOperationException($"{endpoint.Name} has already been added to the address map.");
24+
}
25+
26+
var queueAddress = new QueueAddress(endpoint.Name);
27+
28+
foreach (var targetTransport in addressMap.Keys)
29+
{
30+
var targetAddress = DetermineTransportAddress(targetTransport, transport.Name, endpoint.QueueAddress, queueAddress);
31+
32+
foreach (var sourceTransport in addressMap.Keys)
33+
{
34+
var sourceAddress = DetermineTransportAddress(sourceTransport, transport.Name, endpoint.QueueAddress, queueAddress);
35+
36+
addressMap[targetTransport][sourceAddress] = targetAddress;
37+
}
38+
}
39+
}
40+
41+
string DetermineTransportAddress(string transportToMap, string endpointTransport, string overriddenQueueAddress, QueueAddress queueAddress) => transportToMap == endpointTransport
42+
? overriddenQueueAddress ?? dispatchers[transportToMap].ToTransportAddress(queueAddress)
43+
: dispatchers[transportToMap].ToTransportAddress(queueAddress);
44+
45+
public bool TryTranslate(string targetTransport, string address, out string bestMatch)
46+
{
47+
var transportAddressMappings = addressMap[targetTransport];
48+
49+
if (transportAddressMappings.TryGetValue(address, out bestMatch))
50+
{
51+
return true;
52+
}
53+
54+
bestMatch = address.GetClosestMatch(transportAddressMappings.Keys);
55+
return false;
56+
}
57+
}
58+
59+
class TransportAddressMap : Dictionary<string, string>
60+
{
61+
}

src/NServiceBus.MessagingBridge/Configuration/BridgeConfiguration.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,14 @@ internal FinalizedBridgeConfiguration FinalizeConfiguration(ILogger<BridgeConfig
4949
throw new InvalidOperationException("At least two transports needs to be configured");
5050
}
5151

52-
var tranportsWithNoEndpoints = transportConfigurations.Where(tc => !tc.Endpoints.Any())
53-
.Select(t => t.Name).ToArray();
52+
var allEndpoints = transportConfigurations
53+
.SelectMany(t => t.Endpoints).ToArray();
5454

55-
if (tranportsWithNoEndpoints.Any())
55+
if (!allEndpoints.Any())
5656
{
57-
var endpointNames = string.Join(", ", tranportsWithNoEndpoints);
58-
throw new InvalidOperationException($"At least one endpoint needs to be configured for transport(s): {endpointNames}");
57+
throw new InvalidOperationException($"At least one endpoint needs to be configured");
5958
}
6059

61-
var allEndpoints = transportConfigurations
62-
.SelectMany(t => t.Endpoints).ToArray();
63-
6460
var duplicatedEndpoints = allEndpoints
6561
.GroupBy(e => e.Name)
6662
.Where(g => g.Count() > 1)

src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ public Task<IStartableRawEndpoint> CreateProxy(
5858
return RawEndpoint.Create(transportEndpointConfiguration, cancellationToken);
5959
}
6060

61+
public static Task<IStartableRawEndpoint> CreateDispatcher(BridgeTransport transportConfiguration, CancellationToken cancellationToken = default)
62+
{
63+
var endpointConfiguration = RawEndpointConfiguration.CreateSendOnly($"bridge-dispatcher-{transportConfiguration.Name}", transportConfiguration.TransportDefinition);
64+
65+
return RawEndpoint.Create(endpointConfiguration, cancellationToken);
66+
}
67+
6168
static bool IsSubscriptionMessage(IReadOnlyDictionary<string, string> messageContextHeaders)
6269
{
6370
var messageIntent = default(MessageIntent);

0 commit comments

Comments
 (0)