Skip to content

Commit aa1cb38

Browse files
authored
Merge pull request #134 from Particular/release-0-2-1
2 parents 9f50bf4 + eae25e1 commit aa1cb38

File tree

3 files changed

+85
-6
lines changed

3 files changed

+85
-6
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
namespace NServiceBus.PersistenceTesting.Outbox
2+
{
3+
using System;
4+
using System.Threading.Tasks;
5+
using NServiceBus.Outbox;
6+
using NUnit.Framework;
7+
8+
[TestFixtureSource(typeof(PersistenceTestsConfiguration), "OutboxVariants")]
9+
class ExtendedOutboxStorageTests
10+
{
11+
public ExtendedOutboxStorageTests(TestVariant param)
12+
{
13+
this.param = param;
14+
}
15+
16+
[OneTimeSetUp]
17+
public async Task OneTimeSetUp()
18+
{
19+
configuration = new PersistenceTestsConfiguration(param) { OutboxTimeToLiveInSeconds = 1 };
20+
await configuration.Configure();
21+
}
22+
23+
[OneTimeTearDown]
24+
public async Task OneTimeTearDown()
25+
{
26+
await configuration.Cleanup();
27+
}
28+
29+
[Test]
30+
public async Task Should_expire_dispatched_messages()
31+
{
32+
configuration.RequiresOutboxSupport();
33+
34+
var storage = configuration.OutboxStorage;
35+
var ctx = configuration.GetContextBagForOutbox();
36+
37+
var messageId = Guid.NewGuid().ToString();
38+
await storage.Get(messageId, ctx);
39+
40+
var messageToStore = new OutboxMessage(messageId, new[] { new TransportOperation("x", null, null, null) });
41+
using (var transaction = await storage.BeginTransaction(ctx))
42+
{
43+
await storage.Store(messageToStore, transaction, ctx);
44+
45+
await transaction.Commit();
46+
}
47+
48+
await storage.SetAsDispatched(messageId, configuration.GetContextBagForOutbox());
49+
50+
OutboxMessage message = null;
51+
for (int i = 1; i < 10; i++)
52+
{
53+
message = await storage.Get(messageId, configuration.GetContextBagForOutbox());
54+
if (message != null)
55+
{
56+
await Task.Delay(i * 550);
57+
}
58+
}
59+
60+
Assert.That(message, Is.Null, "The outbox record was not expired.");
61+
}
62+
63+
IPersistenceTestsConfiguration configuration;
64+
TestVariant param;
65+
}
66+
}

src/NServiceBus.Persistence.CosmosDB.PersistenceTests/PersistenceTestsConfiguration.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public partial class PersistenceTestsConfiguration : IProvideCosmosClient
3232

3333
public CosmosClient Client { get; } = SetupFixture.CosmosDbClient;
3434

35+
public int OutboxTimeToLiveInSeconds { get; set; } = 100;
36+
3537
public Task Configure()
3638
{
3739
// with this we have a partition key per run which makes things naturally isolated
@@ -46,14 +48,17 @@ public Task Configure()
4648
var resolver = new ContainerHolderResolver(this, new ContainerInformation(SetupFixture.ContainerName, partitionKeyPath), SetupFixture.DatabaseName);
4749
SynchronizedStorage = new StorageSessionFactory(resolver, null);
4850
SagaStorage = new SagaPersister(serializer, false);
49-
OutboxStorage = new OutboxPersister(resolver, serializer, 100);
51+
OutboxStorage = new OutboxPersister(resolver, serializer, OutboxTimeToLiveInSeconds);
5052

5153
GetContextBagForSagaStorage = () =>
5254
{
5355
var contextBag = new ContextBag();
5456
// This populates the partition key required to participate in a shared transaction
55-
var setAsDispatchedHolder = new SetAsDispatchedHolder { PartitionKey = new PartitionKey(partitionKey) };
56-
setAsDispatchedHolder.ContainerHolder = resolver.ResolveAndSetIfAvailable(contextBag);
57+
var setAsDispatchedHolder = new SetAsDispatchedHolder
58+
{
59+
PartitionKey = new PartitionKey(partitionKey),
60+
ContainerHolder = resolver.ResolveAndSetIfAvailable(contextBag)
61+
};
5762
contextBag.Set(setAsDispatchedHolder);
5863
contextBag.Set(new PartitionKey(partitionKey));
5964
return contextBag;
@@ -63,8 +68,11 @@ public Task Configure()
6368
{
6469
var contextBag = new ContextBag();
6570
// This populates the partition key required to participate in a shared transaction
66-
var setAsDispatchedHolder = new SetAsDispatchedHolder { PartitionKey = new PartitionKey(partitionKey) };
67-
setAsDispatchedHolder.ContainerHolder = resolver.ResolveAndSetIfAvailable(contextBag);
71+
var setAsDispatchedHolder = new SetAsDispatchedHolder
72+
{
73+
PartitionKey = new PartitionKey(partitionKey),
74+
ContainerHolder = resolver.ResolveAndSetIfAvailable(contextBag)
75+
};
6876
contextBag.Set(setAsDispatchedHolder);
6977
contextBag.Set(new PartitionKey(partitionKey));
7078
return contextBag;

src/NServiceBus.Persistence.CosmosDB/Installer.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ await clientProvider.Client.CreateDatabaseIfNotExistsAsync(installerSettings.Dat
3939

4040
var database = clientProvider.Client.GetDatabase(installerSettings.DatabaseName);
4141

42-
var containerProperties = new ContainerProperties(installerSettings.ContainerName, installerSettings.PartitionKeyPath);
42+
var containerProperties =
43+
new ContainerProperties(installerSettings.ContainerName, installerSettings.PartitionKeyPath)
44+
{
45+
// in order for individual items TTL to work (example outbox records)
46+
DefaultTimeToLive = -1
47+
};
4348

4449
await database.CreateContainerIfNotExistsAsync(containerProperties)
4550
.ConfigureAwait(false);

0 commit comments

Comments
 (0)