Skip to content

Commit c88f818

Browse files
committed
feat: adding NOMKSTREAM support for stream add methods
1 parent feb122c commit c88f818

File tree

12 files changed

+235
-41
lines changed

12 files changed

+235
-41
lines changed

docs/ReleaseNotes.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ Current package versions:
1111
- Support Redis 8.4 CAS/CAD operations (`DIGEST`, and the `IFEQ`, `IFNE`, `IFDEQ`, `IFDNE` modifiers on `SET` / `DEL`)
1212
via the new `ValueCondition` abstraction, and use CAS/CAD operations for `Lock*` APIs when possible ([#2978 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2978))
1313
- **note**: overload resolution for `StringSet[Async]` may be impacted in niche cases, requiring trivial build changes (there are no runtime-breaking changes such as missing methods)
14-
- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))
14+
- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))
1515
- Support `MSETEX` (Redis 8.4.0) for multi-key operations with expiration ([#2977 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2977))
16+
- Support `NOMKSTREAM` option in `StreamAdd` methods via `createStream` parameter (requires Redis 6.2.0+)
1617

1718
## 2.9.32
1819

docs/Streams.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,38 @@ You also have the option to override the auto-generated message ID by passing yo
3737
db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
3838
```
3939

40+
Conditional Stream Creation with NOMKSTREAM
41+
---
42+
43+
By default, `StreamAdd` automatically creates the stream if it doesn't exist. Starting with Redis 6.2.0, you can prevent automatic stream creation using the `createStream` parameter:
44+
45+
```csharp
46+
// This will return null if the stream doesn't exist
47+
var messageId = db.StreamAdd(
48+
"mystream",
49+
"field",
50+
"value",
51+
createStream: false);
52+
53+
if (messageId.IsNull)
54+
{
55+
Console.WriteLine("Stream does not exist, message was not added.");
56+
}
57+
else
58+
{
59+
Console.WriteLine($"Message added with ID: {messageId}");
60+
}
61+
```
62+
63+
**Use cases**:
64+
- **Producer-consumer scenarios**: Only add messages if a consumer has registered (by creating the stream or consumer group)
65+
- **Prevent typos**: Avoid accidentally creating streams with misspelled keys
66+
- **Conditional writes**: Only write to pre-existing streams
67+
68+
**Requirements**:
69+
- Redis 6.2.0 or higher
70+
- When `createStream: false` and the stream doesn't exist, the method returns `RedisValue.Null`
71+
4072
Reading from Streams
4173
===
4274

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2655,13 +2655,14 @@ IEnumerable<SortedSetEntry> SortedSetScan(
26552655
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
26562656
/// <param name="maxLength">The maximum length of the stream.</param>
26572657
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2658+
/// <param name="createStream">When false, the stream will not be created if it does not exist, and the command returns null. When true (default), the stream is created if it does not exist. Requires Redis 6.2.0+.</param>
26582659
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
26592660
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
26602661
/// <param name="flags">The flags to use for this operation.</param>
26612662
/// <returns>The ID of the newly created message.</returns>
26622663
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
26632664
#pragma warning disable RS0026 // different shape
2664-
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2665+
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
26652666
#pragma warning restore RS0026
26662667

26672668
/// <summary>
@@ -2674,13 +2675,14 @@ IEnumerable<SortedSetEntry> SortedSetScan(
26742675
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
26752676
/// <param name="maxLength">The maximum length of the stream.</param>
26762677
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2678+
/// <param name="createStream">When false, the stream will not be created if it does not exist, and the command returns null. When true (default), the stream is created if it does not exist. Requires Redis 6.2.0+.</param>
26772679
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
26782680
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
26792681
/// <param name="flags">The flags to use for this operation.</param>
26802682
/// <returns>The ID of the newly created message.</returns>
26812683
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
26822684
#pragma warning disable RS0026 // different shape
2683-
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2685+
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
26842686
#pragma warning restore RS0026
26852687

26862688
/// <summary>

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -650,11 +650,11 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
650650
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
651651

652652
#pragma warning disable RS0026 // similar overloads
653-
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
654-
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
653+
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, long?, bool, bool, long?, StreamTrimMode, CommandFlags)"/>
654+
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
655655

656-
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
657-
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
656+
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, bool, long?, StreamTrimMode, CommandFlags)"/>
657+
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
658658
#pragma warning restore RS0026
659659

660660
/// <inheritdoc cref="IDatabase.StreamAutoClaim(RedisKey, RedisValue, RedisValue, long, RedisValue, int?, CommandFlags)"/>

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -615,11 +615,11 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, Red
615615
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
616616
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
617617

618-
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
619-
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
618+
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
619+
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);
620620

621-
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
622-
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
621+
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
622+
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);
623623

624624
public Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
625625
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -597,11 +597,11 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str
597597
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
598598
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
599599

600-
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
601-
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
600+
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
601+
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);
602602

603-
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
604-
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
603+
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
604+
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags);
605605

606606
public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
607607
Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);

0 commit comments

Comments
 (0)