Skip to content

Commit 62a119a

Browse files
Ensure non succesful SDK status codes are handled gracefully (#854) (#857)
* Make sure the outbox recording reading validates the response message for success * Make sure the saga persistence read validates the response for successful status code * Cosmetics * OutboxPersister Get tests * SagaPersister Get tests * Remove the outbox etag setting since it is never used (and shouldn't be since we rely on the set as dispatched being idempotent) --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent 322d1f7 commit 62a119a

File tree

7 files changed

+347
-95
lines changed

7 files changed

+347
-95
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
namespace NServiceBus.Persistence.CosmosDB.Tests
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.IO;
6+
using System.Linq;
7+
using System.Net;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Extensibility;
11+
using Microsoft.Azure.Cosmos;
12+
using Microsoft.Azure.Cosmos.Scripts;
13+
using Newtonsoft.Json;
14+
using NUnit.Framework;
15+
16+
[TestFixture]
17+
public class OutboxPersisterTests
18+
{
19+
[Test]
20+
public async Task Should_return_null_when_not_found()
21+
{
22+
var fakeCosmosClient = new FakeCosmosClient(new FakeContainer
23+
{
24+
ReadItemStreamOutboxRecord = () => new ResponseMessage(HttpStatusCode.NotFound)
25+
});
26+
27+
var containerHolderHolderResolver = new ContainerHolderResolver(new FakeProvider(fakeCosmosClient),
28+
new ContainerInformation("fakeContainer", new PartitionKeyPath("")), "fakeDatabase");
29+
30+
var persister = new OutboxPersister(containerHolderHolderResolver, JsonSerializer.Create(), 0);
31+
32+
var contextBag = new ContextBag();
33+
contextBag.Set(new PartitionKey("somePartitionKey"));
34+
35+
var outboxRecord = await persister.Get("someMessageId", contextBag);
36+
37+
Assert.That(outboxRecord, Is.Null);
38+
}
39+
40+
[Test]
41+
public void Should_rethrow_unsuccessful_status()
42+
{
43+
var fakeCosmosClient = new FakeCosmosClient(new FakeContainer
44+
{
45+
// not testing more status codes since we would effectively be testing EnsureSuccessfulStatus
46+
ReadItemStreamOutboxRecord = () => new ResponseMessage(HttpStatusCode.TooManyRequests)
47+
});
48+
49+
var containerHolderHolderResolver = new ContainerHolderResolver(new FakeProvider(fakeCosmosClient),
50+
new ContainerInformation("fakeContainer", new PartitionKeyPath("")), "fakeDatabase");
51+
52+
var persister = new OutboxPersister(containerHolderHolderResolver, JsonSerializer.Create(), 0);
53+
54+
var contextBag = new ContextBag();
55+
contextBag.Set(new PartitionKey("somePartitionKey"));
56+
57+
Assert.ThrowsAsync<CosmosException>(async () => await persister.Get("someMessage", contextBag));
58+
}
59+
60+
class FakeContainer : Container
61+
{
62+
public override Task<ResponseMessage> ReadItemStreamAsync(string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = new()) => Task.FromResult(ReadItemStreamOutboxRecord());
63+
64+
public Func<ResponseMessage> ReadItemStreamOutboxRecord = () => new ResponseMessage(HttpStatusCode.OK);
65+
66+
#region Not Implemented Members
67+
68+
public override string Id => throw new NotImplementedException();
69+
70+
public override Database Database => throw new NotImplementedException();
71+
72+
public override Conflicts Conflicts => throw new NotImplementedException();
73+
74+
public override Scripts Scripts => throw new NotImplementedException();
75+
76+
public override Task<ItemResponse<T>> CreateItemAsync<T>(T item, PartitionKey? partitionKey = null, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
77+
public override Task<ResponseMessage> CreateItemStreamAsync(Stream streamPayload, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
78+
public override TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey) => throw new NotImplementedException();
79+
public override Task<ContainerResponse> DeleteContainerAsync(ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
80+
public override Task<ResponseMessage> DeleteContainerStreamAsync(ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
81+
public override Task<ItemResponse<T>> DeleteItemAsync<T>(string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
82+
public override Task<ResponseMessage> PatchItemStreamAsync(string id, PartitionKey partitionKey, IReadOnlyList<PatchOperation> patchOperations, PatchItemRequestOptions requestOptions = null, CancellationToken cancellationToken = new()) => throw new NotImplementedException();
83+
public override Task<ResponseMessage> DeleteItemStreamAsync(string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
84+
public override ChangeFeedEstimator GetChangeFeedEstimator(string processorName, Container leaseContainer) => throw new NotImplementedException();
85+
public override ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder(string processorName, ChangesEstimationHandler estimationDelegate, TimeSpan? estimationPeriod = null) => throw new NotImplementedException();
86+
public override FeedIterator<T> GetChangeFeedIterator<T>(ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedMode changeFeedMode, ChangeFeedRequestOptions changeFeedRequestOptions = null) => throw new NotImplementedException();
87+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(string processorName, ChangesHandler<T> onChangesDelegate) => throw new NotImplementedException();
88+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(string processorName, ChangeFeedHandler<T> onChangesDelegate) => throw new NotImplementedException();
89+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder(string processorName, ChangeFeedStreamHandler onChangesDelegate) => throw new NotImplementedException();
90+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(string processorName, ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate) => throw new NotImplementedException();
91+
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint(string processorName, ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate) => throw new NotImplementedException();
92+
public override FeedIterator GetChangeFeedStreamIterator(ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedMode changeFeedMode, ChangeFeedRequestOptions changeFeedRequestOptions = null) => throw new NotImplementedException();
93+
public override Task<IReadOnlyList<FeedRange>> GetFeedRangesAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();
94+
95+
public override FeedIterator<T> GetItemQueryIterator<T>(FeedRange feedRange, QueryDefinition queryDefinition,
96+
string continuationToken = null, QueryRequestOptions requestOptions = null) =>
97+
throw new NotImplementedException();
98+
99+
public override IOrderedQueryable<T> GetItemLinqQueryable<T>(bool allowSynchronousQueryExecution = false, string continuationToken = null, QueryRequestOptions requestOptions = null, CosmosLinqSerializerOptions linqSerializerOptions = null) => throw new NotImplementedException();
100+
public override FeedIterator<T> GetItemQueryIterator<T>(QueryDefinition queryDefinition, string continuationToken = null, QueryRequestOptions requestOptions = null) => throw new NotImplementedException();
101+
public override FeedIterator<T> GetItemQueryIterator<T>(string queryText = null, string continuationToken = null, QueryRequestOptions requestOptions = null) => throw new NotImplementedException();
102+
103+
public override FeedIterator GetItemQueryStreamIterator(FeedRange feedRange, QueryDefinition queryDefinition, string continuationToken,
104+
QueryRequestOptions requestOptions = null) =>
105+
throw new NotImplementedException();
106+
107+
public override FeedIterator GetItemQueryStreamIterator(QueryDefinition queryDefinition, string continuationToken = null, QueryRequestOptions requestOptions = null) => throw new NotImplementedException();
108+
public override FeedIterator GetItemQueryStreamIterator(string queryText = null, string continuationToken = null, QueryRequestOptions requestOptions = null) => throw new NotImplementedException();
109+
public override Task<ContainerResponse> ReadContainerAsync(ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
110+
public override Task<ResponseMessage> ReadContainerStreamAsync(ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
111+
public override Task<ItemResponse<T>> ReadItemAsync<T>(string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
112+
public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(IReadOnlyList<(string id, PartitionKey partitionKey)> items, ReadManyRequestOptions readManyRequestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
113+
public override Task<ItemResponse<T>> PatchItemAsync<T>(string id, PartitionKey partitionKey, IReadOnlyList<PatchOperation> patchOperations, PatchItemRequestOptions requestOptions = null, CancellationToken cancellationToken = new()) => throw new NotImplementedException();
114+
public override Task<ResponseMessage> ReadManyItemsStreamAsync(IReadOnlyList<(string id, PartitionKey partitionKey)> items, ReadManyRequestOptions readManyRequestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
115+
public override Task<int?> ReadThroughputAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();
116+
public override Task<ThroughputResponse> ReadThroughputAsync(RequestOptions requestOptions, CancellationToken cancellationToken = default) => throw new NotImplementedException();
117+
public override Task<ContainerResponse> ReplaceContainerAsync(ContainerProperties containerProperties, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
118+
public override Task<ResponseMessage> ReplaceContainerStreamAsync(ContainerProperties containerProperties, ContainerRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
119+
public override Task<ItemResponse<T>> ReplaceItemAsync<T>(T item, string id, PartitionKey? partitionKey = null, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
120+
public override Task<ResponseMessage> ReplaceItemStreamAsync(Stream streamPayload, string id, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
121+
public override Task<ThroughputResponse> ReplaceThroughputAsync(int throughput, RequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
122+
public override Task<ThroughputResponse> ReplaceThroughputAsync(ThroughputProperties throughputProperties, RequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
123+
public override Task<ItemResponse<T>> UpsertItemAsync<T>(T item, PartitionKey? partitionKey = null, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
124+
public override Task<ResponseMessage> UpsertItemStreamAsync(Stream streamPayload, PartitionKey partitionKey, ItemRequestOptions requestOptions = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
125+
126+
#endregion
127+
}
128+
}
129+
}

0 commit comments

Comments
 (0)