Skip to content

Commit fbf954a

Browse files
Merge pull request #385 from Particular/backport-fix
Make sure PendingTransportOperations clearing doesn't leak state from previous executions
2 parents 0c4ee85 + 75681e7 commit fbf954a

File tree

3 files changed

+79
-15
lines changed

3 files changed

+79
-15
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
namespace NServiceBus.Persistence.CosmosDB.Tests
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using NUnit.Framework;
6+
using Routing;
7+
using Transport;
8+
9+
[TestFixture]
10+
public class PendingTransportOperationsExtensionsTests
11+
{
12+
[Test]
13+
public void Should_clear_existing_operations()
14+
{
15+
var operations = new PendingTransportOperations();
16+
operations.Add(new TransportOperation(
17+
new OutgoingMessage("", new Dictionary<string, string>(), Array.Empty<byte>()),
18+
new UnicastAddressTag("someQueue")));
19+
20+
operations.Clear();
21+
22+
Assert.That(operations.Operations, Is.Empty);
23+
}
24+
25+
[Test]
26+
public void Should_support_adding_after_clearing()
27+
{
28+
var operations = new PendingTransportOperations();
29+
operations.Add(new TransportOperation(
30+
new OutgoingMessage("1", new Dictionary<string, string>(), Array.Empty<byte>()),
31+
new UnicastAddressTag("someQueue")));
32+
33+
operations.Clear();
34+
35+
operations.Add(new TransportOperation(
36+
new OutgoingMessage("2", new Dictionary<string, string>(), Array.Empty<byte>()),
37+
new UnicastAddressTag("someQueue")));
38+
39+
operations.Clear();
40+
41+
operations.Add(new TransportOperation(
42+
new OutgoingMessage("3", new Dictionary<string, string>(), Array.Empty<byte>()),
43+
new UnicastAddressTag("someQueue")));
44+
45+
Assert.That(operations.Operations, Has.Length.EqualTo(1));
46+
}
47+
}
48+
}

src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
namespace NServiceBus.Persistence.CosmosDB
22
{
33
using System;
4-
using System.Collections.Concurrent;
54
using System.Collections.Generic;
6-
using System.Linq.Expressions;
7-
using System.Reflection;
85
using System.Threading.Tasks;
96
using DelayedDelivery;
107
using DeliveryConstraints;
@@ -37,16 +34,6 @@ internal LogicalOutboxBehavior()
3734
/// <remarks>Can be renamed back to LogicalOutboxBehavior once the type is gone from the public API.</remarks>
3835
class OutboxBehavior : IBehavior<IIncomingLogicalMessageContext, IIncomingLogicalMessageContext>
3936
{
40-
static OutboxBehavior()
41-
{
42-
var field = typeof(PendingTransportOperations).GetField("operations", BindingFlags.NonPublic | BindingFlags.Instance);
43-
var targetExp = Expression.Parameter(typeof(PendingTransportOperations), "target");
44-
var fieldExp = Expression.Field(targetExp, field);
45-
var assignExp = Expression.Assign(fieldExp, Expression.Constant(new ConcurrentStack<TransportOperation>()));
46-
47-
setter = Expression.Lambda<Action<PendingTransportOperations>>(assignExp, targetExp).Compile();
48-
}
49-
5037
internal OutboxBehavior(ContainerHolderResolver containerHolderResolver, JsonSerializer serializer)
5138
{
5239
this.containerHolderResolver = containerHolderResolver;
@@ -103,7 +90,7 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
10390
outboxTransaction.SuppressStoreAndCommit = true;
10491

10592
var pendingTransportOperations = context.Extensions.Get<PendingTransportOperations>();
106-
setter(pendingTransportOperations);
93+
pendingTransportOperations.Clear();
10794

10895
foreach (var operation in outboxRecord.TransportOperations)
10996
{
@@ -160,7 +147,6 @@ static AddressTag DeserializeRoutingStrategy(Dictionary<string, string> options)
160147
}
161148

162149
readonly JsonSerializer serializer;
163-
static Action<PendingTransportOperations> setter;
164150
readonly ContainerHolderResolver containerHolderResolver;
165151
}
166152
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
namespace NServiceBus.Persistence.CosmosDB
2+
{
3+
using System;
4+
using System.Collections.Concurrent;
5+
using System.Linq.Expressions;
6+
using System.Reflection;
7+
using Transport;
8+
9+
static class PendingTransportOperationsExtensions
10+
{
11+
static PendingTransportOperationsExtensions()
12+
{
13+
var field = typeof(PendingTransportOperations).GetField("operations",
14+
BindingFlags.NonPublic | BindingFlags.Instance);
15+
var targetExp = Expression.Parameter(typeof(PendingTransportOperations), "target");
16+
var fieldExp = Expression.Field(targetExp, field);
17+
getter = Expression
18+
.Lambda<Func<PendingTransportOperations, ConcurrentStack<TransportOperation>>>(fieldExp, targetExp)
19+
.Compile();
20+
}
21+
22+
public static void Clear(this PendingTransportOperations operations)
23+
{
24+
var collection = getter(operations);
25+
collection.Clear();
26+
}
27+
28+
static readonly Func<PendingTransportOperations, ConcurrentStack<TransportOperation>> getter;
29+
}
30+
}

0 commit comments

Comments
 (0)