Skip to content

Commit 76ffbf6

Browse files
authored
Merge pull request #777 from Particular/forward-DiscardIfNotReceivedBefore
Forward DiscardIfNotReceivedBefore with dispatch
2 parents 2531b0b + c25232d commit 76ffbf6

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using NServiceBus.AcceptanceTesting;
6+
using NServiceBus.Features;
7+
using NUnit.Framework;
8+
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;
9+
10+
public class DoNotDeliverBefore : BridgeAcceptanceTest
11+
{
12+
static string OriginalEndpointName = Conventions.EndpointNamingConvention(typeof(OriginalEndpoint));
13+
14+
[Test]
15+
public async Task Should_set_TTBR_correctly()
16+
{
17+
var ctx = await Scenario.Define<Context>()
18+
.WithBridge((bridgeConfiguration, transportBeingTested) =>
19+
{
20+
transportBeingTested.HasEndpoint("_");
21+
bridgeConfiguration.AddTransport(transportBeingTested);
22+
23+
bridgeConfiguration.AddTestTransportEndpoint(new BridgeEndpoint(OriginalEndpointName));
24+
})
25+
.WithEndpoint<MigratedEndpoint>()
26+
.WithEndpoint<OriginalEndpoint>(endpoint => endpoint.When((session, context) =>
27+
{
28+
context.SendStartTime = DateTimeOffset.UtcNow;
29+
return Task.CompletedTask;
30+
}))
31+
.Done(c => c.SendStartTime != DateTimeOffset.MinValue && DateTimeOffset.UtcNow >= c.SendStartTime.AddSeconds(10))
32+
.Run();
33+
34+
Assert.That(ctx.NumberOfMessagesReceived, Is.EqualTo(1));
35+
}
36+
37+
public class Context : BridgeScenarioContext
38+
{
39+
public int NumberOfMessagesReceived { get; set; }
40+
public DateTimeOffset SendStartTime { get; set; }
41+
}
42+
43+
public class MigratedEndpoint : EndpointConfigurationBuilder
44+
{
45+
public MigratedEndpoint()
46+
{
47+
CustomEndpointName(OriginalEndpointName);
48+
EndpointSetup<DefaultTestServer>(c =>
49+
{
50+
// Set concurrency to 1 to ensure that the first message
51+
// will delay sufficiently long for the TTBR setting
52+
// to expire on the second message
53+
c.LimitMessageProcessingConcurrencyTo(1);
54+
c.PurgeOnStartup(true);
55+
});
56+
}
57+
58+
public class AMessageHandler(Context testContext) : IHandleMessages<AMessage>
59+
{
60+
public async Task Handle(AMessage message, IMessageHandlerContext context)
61+
{
62+
testContext.NumberOfMessagesReceived++;
63+
64+
await Task.Delay(5000);
65+
}
66+
}
67+
}
68+
69+
public class OriginalEndpoint : EndpointConfigurationBuilder
70+
{
71+
public OriginalEndpoint() => EndpointSetup<DefaultServer>(cfg => cfg.EnableFeature<SendLocalFeature>());
72+
73+
public class SendLocalFeature : Feature
74+
{
75+
protected override void Setup(FeatureConfigurationContext context) => context.RegisterStartupTask(() => new SendLocalStartupTask());
76+
77+
class SendLocalStartupTask : FeatureStartupTask
78+
{
79+
protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
80+
{
81+
await session.SendLocal(new AMessage(), cancellationToken);
82+
await session.SendLocal(new AMessage(), cancellationToken);
83+
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
84+
}
85+
86+
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;
87+
}
88+
}
89+
}
90+
91+
[TimeToBeReceived("00:00:03")]
92+
public class AMessage : IMessage;
93+
}

src/NServiceBus.MessagingBridge/TargetEndpointDispatcher.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
using System.Threading;
1+
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using NServiceBus.Performance.TimeToBeReceived;
36
using NServiceBus.Raw;
47
using NServiceBus.Routing;
58
using NServiceBus.Transport;
@@ -22,6 +25,13 @@ public Task Dispatch(
2225
CancellationToken cancellationToken = default)
2326
{
2427
var transportOperation = new TransportOperation(outgoingMessage, targetAddress);
28+
29+
if (outgoingMessage.Headers.TryGetValue(Headers.TimeToBeReceived, out var timeToBeReceivedValue)
30+
&& TimeSpan.TryParse(timeToBeReceivedValue, out var timeToBeReceived))
31+
{
32+
transportOperation.Properties.DiscardIfNotReceivedBefore = new DiscardIfNotReceivedBefore(timeToBeReceived);
33+
}
34+
2535
return rawEndpoint.Dispatch(new TransportOperations(transportOperation), transaction, cancellationToken);
2636
}
2737

0 commit comments

Comments
 (0)