Skip to content

Commit 85524a9

Browse files
Processing a control message causes the outbox to throw a null reference exception (#549) (#550)
* Processing a control message causes the outbox to throw a null reference exception * Update src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs --------- Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>
1 parent 64775ee commit 85524a9

File tree

6 files changed

+236
-3
lines changed

6 files changed

+236
-3
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using AcceptanceTesting;
7+
using EndpointTemplates;
8+
using NServiceBus.AcceptanceTesting.Support;
9+
using NServiceBus.Features;
10+
using NServiceBus.Pipeline;
11+
using NServiceBus.Routing;
12+
using NServiceBus.Transport;
13+
using NServiceBus.Unicast.Transport;
14+
using NUnit.Framework;
15+
16+
[TestFixture]
17+
public class When_using_outbox_control_message : NServiceBusAcceptanceTest
18+
{
19+
[Test]
20+
public async Task Should_work()
21+
{
22+
var runSettings = new RunSettings();
23+
runSettings.DoNotRegisterDefaultPartitionKeyProvider();
24+
25+
var context = await Scenario.Define<Context>()
26+
.WithEndpoint<Endpoint>()
27+
.Done(c => c.ProcessedControlMessage)
28+
.Run(runSettings)
29+
.ConfigureAwait(false);
30+
31+
Assert.True(context.ProcessedControlMessage);
32+
}
33+
34+
public class Context : ScenarioContext
35+
{
36+
public bool ProcessedControlMessage { get; set; }
37+
}
38+
39+
public class Endpoint : EndpointConfigurationBuilder
40+
{
41+
public Endpoint() =>
42+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
43+
{
44+
config.EnableOutbox();
45+
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
46+
config.RegisterStartupTask<ControlMessageSender>();
47+
config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully");
48+
});
49+
50+
class ControlMessageSender : FeatureStartupTask
51+
{
52+
public ControlMessageSender(IMessageDispatcher dispatcher) => this.dispatcher = dispatcher;
53+
54+
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
55+
{
56+
var controlMessage = ControlMessageFactory.Create(MessageIntent.Subscribe);
57+
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint))));
58+
59+
return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken);
60+
}
61+
62+
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;
63+
64+
readonly IMessageDispatcher dispatcher;
65+
}
66+
67+
class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
68+
{
69+
public ControlMessageBehavior(Context testContext) => this.testContext = testContext;
70+
71+
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
72+
{
73+
await next();
74+
75+
testContext.ProcessedControlMessage = true;
76+
}
77+
78+
readonly Context testContext;
79+
}
80+
}
81+
}
82+
}

src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
6262
// Outbox operating at the logical stage
6363
if (!context.Extensions.TryGet<PartitionKey>(out var partitionKey))
6464
{
65-
throw new Exception("For the outbox to work the following information must be provided at latest up to the incoming physical or logical message stage. A partition key via `context.Extensions.Set<PartitionKey>(yourPartitionKey)`.");
65+
throw new Exception("For the outbox to work a partition key must be provided at latest up to the incoming physical or logical message stage. Set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'.");
6666
}
6767

6868
var containerHolder = containerHolderResolver.ResolveAndSetIfAvailable(context.Extensions);
@@ -74,6 +74,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
7474
outboxTransaction.PartitionKey = partitionKey;
7575
outboxTransaction.StorageSession.ContainerHolder = containerHolder;
7676

77+
setAsDispatchedHolder.ThrowIfContainerIsNotSet();
78+
7779
var outboxRecord = await containerHolder.Container.ReadOutboxRecord(context.MessageId, outboxTransaction.PartitionKey.Value, serializer, context.Extensions, context.CancellationToken)
7880
.ConfigureAwait(false);
7981

src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxPersister.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
using Microsoft.Azure.Cosmos;
88
using Newtonsoft.Json;
99
using Outbox;
10+
using Transport;
11+
using Headers = NServiceBus.Headers;
1012

1113
class OutboxPersister : IOutboxStorage
1214
{
@@ -39,10 +41,19 @@ public async Task<OutboxMessage> Get(string messageId, ContextBag context, Cance
3941

4042
if (!context.TryGet<PartitionKey>(out var partitionKey))
4143
{
42-
// we return null here to enable outbox work at logical stage
43-
return null;
44+
// because of the transactional session we cannot assume the incoming message is always present
45+
if (!context.TryGet<IncomingMessage>(out var incomingMessage) ||
46+
!incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader))
47+
{
48+
// we return null here to enable outbox work at logical stage
49+
return null;
50+
}
51+
52+
partitionKey = new PartitionKey(messageId);
53+
context.Set(partitionKey);
4454
}
4555

56+
setAsDispatchedHolder.ThrowIfContainerIsNotSet();
4657
setAsDispatchedHolder.PartitionKey = partitionKey;
4758

4859
var outboxRecord = await setAsDispatchedHolder.ContainerHolder.Container.ReadOutboxRecord(messageId, partitionKey, serializer, context, cancellationToken: cancellationToken)
@@ -74,6 +85,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
7485
public async Task SetAsDispatched(string messageId, ContextBag context, CancellationToken cancellationToken = default)
7586
{
7687
var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
88+
setAsDispatchedHolder.ThrowIfContainerIsNotSet();
7789

7890
var partitionKey = setAsDispatchedHolder.PartitionKey;
7991
var containerHolder = setAsDispatchedHolder.ContainerHolder;
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#nullable enable
2+
3+
namespace NServiceBus.Persistence.CosmosDB
4+
{
5+
using System;
6+
7+
static class SetAsDispatchedHolderExtensions
8+
{
9+
public static void ThrowIfContainerIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder)
10+
{
11+
if (setAsDispatchedHolder.ContainerHolder is { Container: not null })
12+
{
13+
return;
14+
}
15+
16+
throw new Exception($"For the outbox to work a container must be configured. Either configure a default one using '{nameof(CosmosPersistenceConfig.DefaultContainer)}' or set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'.");
17+
}
18+
}
19+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using EndpointTemplates;
7+
using NServiceBus.AcceptanceTesting.Support;
8+
using NUnit.Framework;
9+
10+
[TestFixture]
11+
public class When_no_container_information_is_configured : NServiceBusAcceptanceTest
12+
{
13+
[Test]
14+
public async Task Should_throw_meaningful_exception()
15+
{
16+
var runSettings = new RunSettings();
17+
runSettings.DoNotRegisterDefaultContainerInformationProvider();
18+
19+
var context = await Scenario.Define<Context>()
20+
.WithEndpoint<Endpoint>(b =>
21+
{
22+
b.DoNotFailOnErrorMessages();
23+
b.When(s => s.SendLocal(new MyMessage()));
24+
})
25+
.Done(c => c.FailedMessages.Any())
26+
.Run(runSettings);
27+
28+
var failure = context.FailedMessages.FirstOrDefault()
29+
.Value.First();
30+
31+
Assert.That(failure.Exception.Message, Does.Contain("container"));
32+
}
33+
34+
class Context : ScenarioContext
35+
{
36+
}
37+
38+
class Endpoint : EndpointConfigurationBuilder
39+
{
40+
public Endpoint() =>
41+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
42+
{
43+
config.EnableOutbox();
44+
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
45+
});
46+
47+
class MyMessageHandler : IHandleMessages<MyMessage>
48+
{
49+
public Task Handle(MyMessage message, IMessageHandlerContext context)
50+
{
51+
Assert.Fail("Should not be called");
52+
return Task.CompletedTask;
53+
}
54+
}
55+
}
56+
57+
class MyMessage : IMessage { }
58+
}
59+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using EndpointTemplates;
7+
using NServiceBus.AcceptanceTesting.Support;
8+
using NUnit.Framework;
9+
10+
[TestFixture]
11+
public class When_no_partition_key_is_configured : NServiceBusAcceptanceTest
12+
{
13+
[Test]
14+
public async Task Should_throw_meaningful_exception()
15+
{
16+
var runSettings = new RunSettings();
17+
runSettings.DoNotRegisterDefaultPartitionKeyProvider();
18+
19+
var context = await Scenario.Define<Context>()
20+
.WithEndpoint<Endpoint>(b =>
21+
{
22+
b.DoNotFailOnErrorMessages();
23+
b.When(s => s.SendLocal(new MyMessage()));
24+
})
25+
.Done(c => c.FailedMessages.Any())
26+
.Run(runSettings);
27+
28+
var failure = context.FailedMessages.FirstOrDefault()
29+
.Value.First();
30+
31+
Assert.That(failure.Exception.Message, Does.Contain("partition key"));
32+
}
33+
34+
class Context : ScenarioContext
35+
{
36+
}
37+
38+
class Endpoint : EndpointConfigurationBuilder
39+
{
40+
public Endpoint() =>
41+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
42+
{
43+
config.EnableOutbox();
44+
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
45+
});
46+
47+
class MyMessageHandler : IHandleMessages<MyMessage>
48+
{
49+
public Task Handle(MyMessage message, IMessageHandlerContext context)
50+
{
51+
Assert.Fail("Should not be called");
52+
return Task.CompletedTask;
53+
}
54+
}
55+
}
56+
57+
class MyMessage : IMessage { }
58+
}
59+
}

0 commit comments

Comments
 (0)