Skip to content

Commit 591f5c7

Browse files
Processing a control message causes the outbox to throw a null reference exception (#549) (#551)
* Processing a control message causes the outbox to throw a null reference exception * Update src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net> --------- Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net> # Conflicts: # src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs
1 parent c2e8d21 commit 591f5c7

File tree

6 files changed

+245
-3
lines changed

6 files changed

+245
-3
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using EndpointTemplates;
7+
using Extensibility;
8+
using NServiceBus.AcceptanceTesting.Support;
9+
using NServiceBus.Features;
10+
using NServiceBus.Pipeline;
11+
using NServiceBus.Routing;
12+
using NServiceBus.Transport;
13+
using NServiceBus.Unicast.Transport;
14+
using NUnit.Framework;
15+
16+
[TestFixture]
17+
public class When_using_outbox_control_message : NServiceBusAcceptanceTest
18+
{
19+
[Test]
20+
public async Task Should_work()
21+
{
22+
var runSettings = new RunSettings();
23+
runSettings.DoNotRegisterDefaultPartitionKeyProvider();
24+
25+
var context = await Scenario.Define<Context>()
26+
.WithEndpoint<Endpoint>()
27+
.Done(c => c.ProcessedControlMessage)
28+
.Run(runSettings)
29+
.ConfigureAwait(false);
30+
31+
Assert.True(context.ProcessedControlMessage);
32+
}
33+
34+
public class Context : ScenarioContext
35+
{
36+
public bool ProcessedControlMessage { get; set; }
37+
}
38+
39+
public class Endpoint : EndpointConfigurationBuilder
40+
{
41+
public Endpoint() =>
42+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
43+
{
44+
config.EnableOutbox();
45+
config.ConfigureTransport().Transactions(TransportTransactionMode.ReceiveOnly);
46+
config.EnableFeature<ControlMessageSender>();
47+
config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully");
48+
});
49+
50+
class ControlMessageSender : Feature
51+
{
52+
protected override void Setup(FeatureConfigurationContext context)
53+
{
54+
context.Container.ConfigureComponent<StartupTask>(DependencyLifecycle.InstancePerCall);
55+
context.RegisterStartupTask(b => b.Build<StartupTask>());
56+
}
57+
58+
class StartupTask : FeatureStartupTask
59+
{
60+
public StartupTask(IDispatchMessages dispatcher) => this.dispatcher = dispatcher;
61+
62+
protected override Task OnStart(IMessageSession session)
63+
{
64+
var controlMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);
65+
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint))));
66+
67+
return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), new ContextBag());
68+
}
69+
70+
protected override Task OnStop(IMessageSession sessio) => Task.CompletedTask;
71+
72+
readonly IDispatchMessages dispatcher;
73+
}
74+
}
75+
76+
class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
77+
{
78+
public ControlMessageBehavior(Context testContext) => this.testContext = testContext;
79+
80+
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
81+
{
82+
await next();
83+
84+
testContext.ProcessedControlMessage = true;
85+
}
86+
87+
readonly Context testContext;
88+
}
89+
}
90+
}
91+
}

src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
6565
// Outbox operating at the logical stage
6666
if (!context.Extensions.TryGet<PartitionKey>(out var partitionKey))
6767
{
68-
throw new Exception("For the outbox to work the following information must be provided at latest up to the incoming physical or logical message stage. A partition key via `context.Extensions.Set<PartitionKey>(yourPartitionKey)`.");
68+
throw new Exception("For the outbox to work a partition key must be provided at latest up to the incoming physical or logical message stage. Set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'.");
6969
}
7070

7171
var containerHolder = containerHolderResolver.ResolveAndSetIfAvailable(context.Extensions);
@@ -77,6 +77,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
7777
outboxTransaction.PartitionKey = partitionKey;
7878
outboxTransaction.StorageSession.ContainerHolder = containerHolder;
7979

80+
setAsDispatchedHolder.ThrowIfContainerIsNotSet();
81+
8082
var outboxRecord = await containerHolder.Container.ReadOutboxRecord(context.MessageId, outboxTransaction.PartitionKey.Value, serializer, context.Extensions)
8183
.ConfigureAwait(false);
8284

src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxPersister.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
using Microsoft.Azure.Cosmos;
66
using Newtonsoft.Json;
77
using Outbox;
8+
using Transport;
9+
using Headers = NServiceBus.Headers;
810

911
class OutboxPersister : IOutboxStorage
1012
{
@@ -37,10 +39,19 @@ public async Task<OutboxMessage> Get(string messageId, ContextBag context)
3739

3840
if (!context.TryGet<PartitionKey>(out var partitionKey))
3941
{
40-
// we return null here to enable outbox work at logical stage
41-
return null;
42+
// because of the transactional session we cannot assume the incoming message is always present
43+
if (!context.TryGet<IncomingMessage>(out var incomingMessage) ||
44+
!incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader))
45+
{
46+
// we return null here to enable outbox work at logical stage
47+
return null;
48+
}
49+
50+
partitionKey = new PartitionKey(messageId);
51+
context.Set(partitionKey);
4252
}
4353

54+
setAsDispatchedHolder.ThrowIfContainerIsNotSet();
4455
setAsDispatchedHolder.PartitionKey = partitionKey;
4556

4657
var outboxRecord = await setAsDispatchedHolder.ContainerHolder.Container.ReadOutboxRecord(messageId, partitionKey, serializer, context)
@@ -72,6 +83,7 @@ public Task Store(OutboxMessage message, OutboxTransaction transaction, ContextB
7283
public async Task SetAsDispatched(string messageId, ContextBag context)
7384
{
7485
var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
86+
setAsDispatchedHolder.ThrowIfContainerIsNotSet();
7587

7688
var partitionKey = setAsDispatchedHolder.PartitionKey;
7789
var containerHolder = setAsDispatchedHolder.ContainerHolder;
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#nullable enable
2+
3+
namespace NServiceBus.Persistence.CosmosDB
4+
{
5+
using System;
6+
7+
static class SetAsDispatchedHolderExtensions
8+
{
9+
public static void ThrowIfContainerIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder)
10+
{
11+
if (setAsDispatchedHolder.ContainerHolder != null && setAsDispatchedHolder.ContainerHolder.Container != null)
12+
{
13+
return;
14+
}
15+
16+
throw new Exception($"For the outbox to work a container must be configured. Either configure a default one using '{nameof(CosmosPersistenceConfig.DefaultContainer)}' or set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'.");
17+
}
18+
}
19+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using EndpointTemplates;
7+
using NServiceBus.AcceptanceTesting.Support;
8+
using NUnit.Framework;
9+
10+
[TestFixture]
11+
public class When_no_container_information_is_configured : NServiceBusAcceptanceTest
12+
{
13+
[Test]
14+
public async Task Should_throw_meaningful_exception()
15+
{
16+
var runSettings = new RunSettings();
17+
runSettings.DoNotRegisterDefaultContainerInformationProvider();
18+
19+
var context = await Scenario.Define<Context>()
20+
.WithEndpoint<Endpoint>(b =>
21+
{
22+
b.DoNotFailOnErrorMessages();
23+
b.When(s => s.SendLocal(new MyMessage()));
24+
})
25+
.Done(c => c.FailedMessages.Any())
26+
.Run(runSettings);
27+
28+
var failure = context.FailedMessages.FirstOrDefault()
29+
.Value.First();
30+
31+
Assert.That(failure.Exception.Message, Does.Contain("container"));
32+
}
33+
34+
class Context : ScenarioContext
35+
{
36+
}
37+
38+
class Endpoint : EndpointConfigurationBuilder
39+
{
40+
public Endpoint() =>
41+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
42+
{
43+
config.EnableOutbox();
44+
config.ConfigureTransport().Transactions(TransportTransactionMode.ReceiveOnly);
45+
});
46+
47+
class MyMessageHandler : IHandleMessages<MyMessage>
48+
{
49+
public Task Handle(MyMessage message, IMessageHandlerContext context)
50+
{
51+
Assert.Fail("Should not be called");
52+
return Task.CompletedTask;
53+
}
54+
}
55+
}
56+
57+
class MyMessage : IMessage { }
58+
}
59+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using EndpointTemplates;
7+
using NServiceBus.AcceptanceTesting.Support;
8+
using NUnit.Framework;
9+
10+
[TestFixture]
11+
public class When_no_partition_key_is_configured : NServiceBusAcceptanceTest
12+
{
13+
[Test]
14+
public async Task Should_throw_meaningful_exception()
15+
{
16+
var runSettings = new RunSettings();
17+
runSettings.DoNotRegisterDefaultPartitionKeyProvider();
18+
19+
var context = await Scenario.Define<Context>()
20+
.WithEndpoint<Endpoint>(b =>
21+
{
22+
b.DoNotFailOnErrorMessages();
23+
b.When(s => s.SendLocal(new MyMessage()));
24+
})
25+
.Done(c => c.FailedMessages.Any())
26+
.Run(runSettings);
27+
28+
var failure = context.FailedMessages.FirstOrDefault()
29+
.Value.First();
30+
31+
Assert.That(failure.Exception.Message, Does.Contain("partition key"));
32+
}
33+
34+
class Context : ScenarioContext
35+
{
36+
}
37+
38+
class Endpoint : EndpointConfigurationBuilder
39+
{
40+
public Endpoint() =>
41+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
42+
{
43+
config.EnableOutbox();
44+
config.ConfigureTransport().Transactions(TransportTransactionMode.ReceiveOnly);
45+
});
46+
47+
class MyMessageHandler : IHandleMessages<MyMessage>
48+
{
49+
public Task Handle(MyMessage message, IMessageHandlerContext context)
50+
{
51+
Assert.Fail("Should not be called");
52+
return Task.CompletedTask;
53+
}
54+
}
55+
}
56+
57+
class MyMessage : IMessage { }
58+
}
59+
}

0 commit comments

Comments
 (0)