Skip to content

Commit 0bcb004

Browse files
authored
Merge pull request #779 from Particular/v2-3-cherry-pick
Forward DiscardIfNotReceivedBefore with dispatch
2 parents 3fab90e + bc48dd1 commit 0bcb004

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using NServiceBus.AcceptanceTesting;
6+
using NServiceBus.Features;
7+
using NUnit.Framework;
8+
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;
9+
10+
public class DoNotDeliverBefore : BridgeAcceptanceTest
11+
{
12+
static string OriginalEndpointName = Conventions.EndpointNamingConvention(typeof(OriginalEndpoint));
13+
14+
[Test]
15+
public async Task Should_set_TTBR_correctly()
16+
{
17+
var ctx = await Scenario.Define<Context>()
18+
.WithBridge(bridgeConfiguration =>
19+
{
20+
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
21+
{
22+
Name = "DefaultTestingTransport"
23+
};
24+
bridgeTransport.AddTestEndpoint<OriginalEndpoint>();
25+
bridgeConfiguration.AddTransport(bridgeTransport);
26+
27+
var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
28+
theOtherTransport.AddTestEndpoint<MigratedEndpoint>();
29+
bridgeConfiguration.AddTransport(theOtherTransport);
30+
})
31+
.WithEndpoint<MigratedEndpoint>()
32+
.WithEndpoint<OriginalEndpoint>(endpoint => endpoint.When((session, context) =>
33+
{
34+
context.SendStartTime = DateTimeOffset.UtcNow;
35+
return Task.CompletedTask;
36+
}))
37+
.Done(c => c.SendStartTime != DateTimeOffset.MinValue && DateTimeOffset.UtcNow >= c.SendStartTime.AddSeconds(10))
38+
.Run();
39+
40+
Assert.That(ctx.NumberOfMessagesReceived, Is.EqualTo(1));
41+
}
42+
43+
public class Context : ScenarioContext
44+
{
45+
public int NumberOfMessagesReceived { get; set; }
46+
public DateTimeOffset SendStartTime { get; set; }
47+
}
48+
49+
public class MigratedEndpoint : EndpointConfigurationBuilder
50+
{
51+
public MigratedEndpoint()
52+
{
53+
CustomEndpointName(OriginalEndpointName);
54+
EndpointSetup<DefaultTestServer>(c =>
55+
{
56+
// Set concurrency to 1 to ensure that the first message
57+
// will delay sufficiently long for the TTBR setting
58+
// to expire on the second message
59+
c.LimitMessageProcessingConcurrencyTo(1);
60+
c.PurgeOnStartup(true);
61+
});
62+
}
63+
64+
public class AMessageHandler : IHandleMessages<AMessage>
65+
{
66+
Context testContext;
67+
68+
public AMessageHandler(Context testContext)
69+
{
70+
this.testContext = testContext;
71+
}
72+
73+
public async Task Handle(AMessage message, IMessageHandlerContext context)
74+
{
75+
testContext.NumberOfMessagesReceived++;
76+
77+
await Task.Delay(5000);
78+
}
79+
}
80+
}
81+
82+
public class OriginalEndpoint : EndpointConfigurationBuilder
83+
{
84+
public OriginalEndpoint() => EndpointSetup<DefaultServer>(cfg => cfg.EnableFeature<SendLocalFeature>());
85+
86+
public class SendLocalFeature : Feature
87+
{
88+
protected override void Setup(FeatureConfigurationContext context) => context.RegisterStartupTask(() => new SendLocalStartupTask());
89+
90+
class SendLocalStartupTask : FeatureStartupTask
91+
{
92+
protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
93+
{
94+
await session.SendLocal(new AMessage(), cancellationToken);
95+
await session.SendLocal(new AMessage(), cancellationToken);
96+
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
97+
}
98+
99+
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;
100+
}
101+
}
102+
}
103+
104+
[TimeToBeReceived("00:00:03")]
105+
public class AMessage : IMessage { }
106+
}

src/NServiceBus.MessagingBridge/TargetEndpointDispatcher.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
using System.Threading;
1+
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using NServiceBus.Performance.TimeToBeReceived;
36
using NServiceBus.Raw;
47
using NServiceBus.Routing;
58
using NServiceBus.Transport;
@@ -27,6 +30,13 @@ public Task Dispatch(
2730
CancellationToken cancellationToken = default)
2831
{
2932
var transportOperation = new TransportOperation(outgoingMessage, targetAddress);
33+
34+
if (outgoingMessage.Headers.TryGetValue(Headers.TimeToBeReceived, out var timeToBeReceivedValue)
35+
&& TimeSpan.TryParse(timeToBeReceivedValue, out var timeToBeReceived))
36+
{
37+
transportOperation.Properties.DiscardIfNotReceivedBefore = new DiscardIfNotReceivedBefore(timeToBeReceived);
38+
}
39+
3040
return rawEndpoint.Dispatch(new TransportOperations(transportOperation), transaction, cancellationToken);
3141
}
3242

0 commit comments

Comments
 (0)