diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs
index 862beda2e..4f300eda1 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxState.cs
@@ -2,11 +2,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
+using System.Collections.Generic;
using System.ComponentModel;
using System.Runtime.Serialization;
namespace Microsoft.StreamProcessing.Internal
{
+ ///
+ /// The value representation with its timestamp.
+ ///
+ /// The type of the underlying elements being aggregated.
+ [DataContract]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public struct ValueAndTimestamp
+ {
+ ///
+ /// The timestamp of the payload.
+ ///
+ [DataMember]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public long timestamp;
+
+ ///
+ /// Payload of the event
+ ///
+ [DataMember]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public T value;
+ }
+
///
/// The state object used in minimum and maximum aggregates.
///
@@ -22,6 +46,13 @@ public struct MinMaxState
[EditorBrowsable(EditorBrowsableState.Never)]
public SortedMultiSet savedValues;
+ ///
+ /// List of values and its timestamp
+ ///
+ [DataMember]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public ElasticCircularBuffer> values;
+
///
/// The current value if the aggregate were to be computed immediately.
///
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
deleted file mode 100644
index fe31f2630..000000000
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-// *********************************************************************
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License
-// *********************************************************************
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.Contracts;
-using System.Linq.Expressions;
-using Microsoft.StreamProcessing.Internal;
-
-namespace Microsoft.StreamProcessing.Aggregates
-{
- internal sealed class SlidingMaxAggregate : IAggregate, T>
- {
- private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1;
- private readonly Comparison comparer;
-
- public SlidingMaxAggregate(QueryContainer container) : this(ComparerExpression.Default, container) { }
-
- public SlidingMaxAggregate(IComparerExpression comparer, QueryContainer container)
- {
- Contract.Requires(comparer != null);
- this.comparer = comparer.GetCompareExpr().Compile();
-
- var generator = comparer.CreateSortedDictionaryGenerator(container);
- Expression>, MinMaxState>> template
- = (g) => new MinMaxState { savedValues = new SortedMultiSet(g), currentValue = default, currentTimestamp = InvalidSyncTime };
- var replaced = template.ReplaceParametersInBody(generator);
- this.initialState = Expression.Lambda>>(replaced);
- }
-
- private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
-
- public Expression, long, T, MinMaxState>> Accumulate()
- => (state, timestamp, input) => Accumulate(state, timestamp, input);
-
- private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
- {
- if (timestamp == state.currentTimestamp)
- {
- if (this.comparer(input, state.currentValue) > 0)
- state.currentValue = input;
- }
- else
- {
- if (state.currentTimestamp != InvalidSyncTime)
- state.savedValues.Add(state.currentValue);
- state.currentTimestamp = timestamp;
- state.currentValue = input;
- }
- return state;
- }
-
- public Expression, long, T, MinMaxState>> Deaccumulate()
- => (state, timestamp, input) => state; // never invoked, hence not implemented
-
- public Expression, MinMaxState, MinMaxState>> Difference()
- => (leftSet, rightSet) => Difference(leftSet, rightSet);
-
- private static MinMaxState Difference(MinMaxState leftSet, MinMaxState rightSet)
- {
- if (leftSet.currentTimestamp != InvalidSyncTime)
- {
- leftSet.currentTimestamp = InvalidSyncTime;
- leftSet.savedValues.Add(leftSet.currentValue);
- }
- if (rightSet.currentTimestamp != InvalidSyncTime)
- {
- rightSet.currentTimestamp = InvalidSyncTime;
- rightSet.savedValues.Add(rightSet.currentValue);
- }
-
- leftSet.savedValues.RemoveAll(rightSet.savedValues);
- return leftSet;
- }
-
- public Expression, T>> ComputeResult() => state => ComputeResult(state);
-
- private T ComputeResult(MinMaxState state)
- {
- if (state.savedValues.IsEmpty)
- return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue;
- else
- {
- if (state.currentTimestamp == InvalidSyncTime)
- return state.savedValues.Last();
- else
- {
- var last = state.savedValues.Last();
- return this.comparer(last, state.currentValue) > 0 ? last : state.currentValue;
- }
- }
- }
- }
-}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
deleted file mode 100644
index 9ad193bec..000000000
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-// *********************************************************************
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License
-// *********************************************************************
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.Contracts;
-using System.Linq.Expressions;
-using Microsoft.StreamProcessing.Internal;
-
-namespace Microsoft.StreamProcessing.Aggregates
-{
- internal sealed class SlidingMinAggregate : IAggregate, T>
- {
- private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1;
- private readonly Comparison comparer;
-
- public SlidingMinAggregate(QueryContainer container) : this(ComparerExpression.Default, container) { }
-
- public SlidingMinAggregate(IComparerExpression comparer, QueryContainer container)
- {
- Contract.Requires(comparer != null);
- this.comparer = comparer.GetCompareExpr().Compile();
-
- var generator = comparer.CreateSortedDictionaryGenerator(container);
- Expression>, MinMaxState>> template
- = (g) => new MinMaxState { savedValues = new SortedMultiSet(g), currentValue = default, currentTimestamp = InvalidSyncTime };
- var replaced = template.ReplaceParametersInBody(generator);
- this.initialState = Expression.Lambda>>(replaced);
- }
-
- private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
-
- public Expression, long, T, MinMaxState>> Accumulate()
- => (state, timestamp, input) => Accumulate(state, timestamp, input);
-
- private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
- {
- if (timestamp == state.currentTimestamp)
- {
- if (this.comparer(input, state.currentValue) < 0)
- state.currentValue = input;
- }
- else
- {
- if (state.currentTimestamp != InvalidSyncTime)
- state.savedValues.Add(state.currentValue);
- state.currentTimestamp = timestamp;
- state.currentValue = input;
- }
- return state;
- }
-
- public Expression, long, T, MinMaxState>> Deaccumulate()
- => (state, timestamp, input) => state; // never invoked, hence not implemented
-
- public Expression, MinMaxState, MinMaxState>> Difference()
- => (leftSet, rightSet) => Difference(leftSet, rightSet);
-
- private static MinMaxState Difference(MinMaxState leftSet, MinMaxState rightSet)
- {
- if (leftSet.currentTimestamp != InvalidSyncTime)
- {
- leftSet.currentTimestamp = InvalidSyncTime;
- leftSet.savedValues.Add(leftSet.currentValue);
- }
- if (rightSet.currentTimestamp != InvalidSyncTime)
- {
- rightSet.currentTimestamp = InvalidSyncTime;
- rightSet.savedValues.Add(rightSet.currentValue);
- }
-
- leftSet.savedValues.RemoveAll(rightSet.savedValues);
- return leftSet;
- }
-
- public Expression, T>> ComputeResult() => state => ComputeResult(state);
-
- private T ComputeResult(MinMaxState state)
- {
- if (state.savedValues.IsEmpty)
- return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue;
- else
- {
- if (state.currentTimestamp == InvalidSyncTime)
- return state.savedValues.First();
- else
- {
- var first = state.savedValues.First();
- return this.comparer(first, state.currentValue) < 0 ? first : state.currentValue;
- }
- }
- }
- }
-}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs
new file mode 100644
index 000000000..895162d57
--- /dev/null
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinMaxAggregate.cs
@@ -0,0 +1,162 @@
+// *********************************************************************
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License
+// *********************************************************************
+using System;
+using System.Diagnostics.Contracts;
+using System.Linq.Expressions;
+using Microsoft.StreamProcessing.Internal;
+
+namespace Microsoft.StreamProcessing.Aggregates
+{
+ /* The following invariant holds for the state:
+ * 1. The 'values' list is always in decreasing order of payload - that happens as we append only smaller values than last.
+ * 2. The 'values.timestamp' is always in increasing order - as the new values are always appended.
+ * 3. The 'currentTimestamp' is higher than all timestamps of the 'state.values' list
+ * 4. The 'currentValue' payload need not be the lowest (relative to 'state.values')
+ * -
+ * Further for usage from HoppingPipe:
+ * 5. The 'ecq.state.values' list will always be empty, optimizing difference operation
+ * 6. The difference calculation happens by timestamp, and uses one bulk remove (RemoveRange/Clear) for values.
+ */
+ internal abstract class SlidingMinMaxAggregate : IAggregate, T>
+ {
+ private static readonly long InvalidSyncTime = StreamEvent.MinSyncTime - 1;
+
+ // The comparer is used as if we are working for Max operation. For Min, we reverse the expression
+ private readonly Comparison comparer;
+
+ public SlidingMinMaxAggregate(IComparerExpression comparer)
+ {
+ Contract.Requires(comparer != null);
+ this.comparer = comparer.GetCompareExpr().Compile();
+ }
+
+ public Expression>> InitialState()
+ => () => new MinMaxState { values = new ElasticCircularBuffer>(), currentValue = default, currentTimestamp = InvalidSyncTime };
+
+ public Expression, long, T, MinMaxState>> Accumulate()
+ => (state, timestamp, input) => Accumulate(state, timestamp, input);
+
+ private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
+ {
+ if (timestamp == state.currentTimestamp)
+ {
+ if (this.comparer(input, state.currentValue) > 0)
+ state.currentValue = input;
+ }
+ else
+ {
+ // Save only if new input is smaller,
+ // If the current input is larger (or equal), we never require older value as they expire before current
+ if (state.currentTimestamp != InvalidSyncTime &&
+ this.comparer(input, state.currentValue) < 0)
+ {
+ PushToCollection(state);
+ }
+ state.currentTimestamp = timestamp;
+ state.currentValue = input;
+ }
+ return state;
+ }
+
+ private void PushToCollection(MinMaxState state)
+ {
+ Contract.Assert(state.currentTimestamp != InvalidSyncTime);
+ Contract.Assert(state.values.Count == 0 || state.values.PeekLast().timestamp <= state.currentTimestamp); // Ensure new timestamp is higher
+
+ while (state.values.Count != 0 &&
+ this.comparer(state.values.PeekLast().value, state.currentValue) <= 0)
+ {
+ state.values.PopLast();
+ }
+
+ var newValue = new ValueAndTimestamp { timestamp = state.currentTimestamp, value = state.currentValue };
+ state.values.Enqueue(ref newValue);
+ }
+
+ public Expression, long, T, MinMaxState>> Deaccumulate()
+ => (state, timestamp, input) => Deaccumulate(state, timestamp, input);
+
+ private static MinMaxState Deaccumulate(MinMaxState state, long timestamp, T input)
+ {
+ throw new NotImplementedException($"{nameof(SlidingMinMaxAggregate)} does not implement Deaccumulate()");
+ }
+
+ public Expression, MinMaxState, MinMaxState>> Difference()
+ => (leftSet, rightSet) => Difference(leftSet, rightSet);
+
+ private static MinMaxState Difference(MinMaxState leftSet, MinMaxState rightSet)
+ {
+ long maxRightTimestamp;
+
+ if (rightSet.currentTimestamp != InvalidSyncTime)
+ {
+ maxRightTimestamp = rightSet.currentTimestamp;
+ }
+ else
+ {
+ if (rightSet.values.Count == 0)
+ return leftSet;
+
+ // The right set will never contain values for HoppingPipe, adding below for completeness
+ maxRightTimestamp = rightSet.values.PeekLast().timestamp;
+ }
+
+ if (leftSet.currentTimestamp != InvalidSyncTime &&
+ leftSet.currentTimestamp <= maxRightTimestamp)
+ {
+ leftSet.currentTimestamp = InvalidSyncTime; // Discard all values if rightSet's maxTime covers it.
+ leftSet.values.Clear();
+ return leftSet;
+ }
+ else
+ {
+ while (leftSet.values.Count != 0 &&
+ leftSet.values.PeekFirst().timestamp <= maxRightTimestamp)
+ {
+ leftSet.values.Dequeue();
+ }
+ }
+
+ return leftSet;
+ }
+
+ public Expression, T>> ComputeResult() => state => ComputeResult(state);
+
+ private T ComputeResult(MinMaxState state)
+ {
+ if (state.values.Count == 0)
+ return (state.currentTimestamp == InvalidSyncTime) ? default : state.currentValue;
+ else
+ {
+ var first = state.values.PeekFirst().value;
+
+ if (state.currentTimestamp == InvalidSyncTime)
+ return first;
+ else
+ {
+ return this.comparer(first, state.currentValue) > 0 ? first : state.currentValue;
+ }
+ }
+ }
+ }
+
+ internal sealed class SlidingMaxAggregate : SlidingMinMaxAggregate
+ {
+ public SlidingMaxAggregate()
+ : this(ComparerExpression.Default) { }
+
+ public SlidingMaxAggregate(IComparerExpression comparer)
+ : base(comparer) { }
+ }
+
+ internal sealed class SlidingMinAggregate : SlidingMinMaxAggregate
+ {
+ public SlidingMinAggregate()
+ : this(ComparerExpression.Default) { }
+
+ public SlidingMinAggregate(IComparerExpression comparer)
+ : base(ComparerExpression.Reverse(comparer)) { }
+ }
+}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
index 15ebdacff..cb6ed60b4 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
@@ -20,24 +20,15 @@ public TopKAggregate(int k, IComparerExpression rankComparer, QueryContainer
: this(k, rankComparer, ComparerExpression.Default, container) { }
public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpression overallComparer, QueryContainer container)
- : base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
+ : base(ThenOrderBy(ComparerExpression.Reverse(rankComparer), overallComparer), container)
{
Contract.Requires(rankComparer != null);
Contract.Requires(overallComparer != null);
Contract.Requires(k > 0);
- this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
+ this.compiledRankComparer = ComparerExpression.Reverse(rankComparer).GetCompareExpr().Compile();
this.k = k;
}
- private static IComparerExpression Reverse(IComparerExpression comparer)
- {
- Contract.Requires(comparer != null);
- var expression = comparer.GetCompareExpr();
- Expression> template = (left, right) => CallInliner.Call(expression, right, left);
- var reversedExpression = template.InlineCalls();
- return new ComparerExpression(reversedExpression);
- }
-
private static IComparerExpression ThenOrderBy(IComparerExpression comparer1, IComparerExpression comparer2)
{
Contract.Requires(comparer1 != null);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
index 3df34156c..e85838d8f 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
@@ -21,7 +21,7 @@ public sealed class CircularBuffer
[DataMember]
private int capacityMask = 0xfff;
[DataMember]
- internal T[] Items;
+ internal T[] Items = null;
[DataMember]
internal int head = 0;
[DataMember]
@@ -31,7 +31,7 @@ public sealed class CircularBuffer
/// Currently for internal use only - do not use directly.
///
[EditorBrowsable(EditorBrowsableState.Never)]
- public CircularBuffer() => this.Items = new T[this.capacityMask + 1];
+ public CircularBuffer() { }
///
/// Currently for internal use only - do not use directly.
@@ -44,7 +44,6 @@ public CircularBuffer(int capacity)
var temp = 8;
while (temp <= capacity) temp <<= 1;
- this.Items = new T[temp];
this.capacityMask = temp - 1;
}
@@ -72,6 +71,7 @@ public void Enqueue(ref T value)
{
int next = (this.tail + 1) & this.capacityMask;
if (next == this.head) throw new InvalidOperationException("The list is full!");
+ if (this.Items == null) this.Items = new T[this.capacityMask + 1];
this.Items[this.tail] = value;
this.tail = next;
}
@@ -91,6 +91,21 @@ public T Dequeue()
return ret;
}
+ ///
+ /// Currently for internal use only - do not use directly.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public T PopLast()
+ {
+ if (this.head == this.tail) throw new InvalidOperationException("The list is empty!");
+ int oldtail = this.tail;
+ this.tail = (this.tail - 1) & this.capacityMask;
+ var ret = this.Items[oldtail];
+ this.Items[oldtail] = default;
+ return ret;
+ }
+
///
/// Currently for internal use only - do not use directly.
///
@@ -105,6 +120,13 @@ public T Dequeue()
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => this.head == this.tail;
+ ///
+ /// Removes alll elements from the list - do not use directly.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public void Clear() => this.head = this.tail = 0;
+
///
/// Currently for internal use only - do not use directly.
///
@@ -150,6 +172,17 @@ public ElasticCircularBuffer()
this.Count = 0;
}
+ ///
+ /// Currently for internal use only - do not use directly.
+ ///
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public void Clear()
+ {
+ this.tail = this.head = this.buffers.First;
+ this.head.Value.Clear();
+ this.Count = 0;
+ }
+
///
/// Currently for internal use only - do not use directly.
///
@@ -203,6 +236,27 @@ public T Dequeue()
return this.head.Value.Dequeue();
}
+ ///
+ /// Currently for internal use only - do not use directly.
+ ///
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public T PopLast()
+ {
+ if (this.tail.Value.IsEmpty())
+ {
+ if (this.head == this.tail)
+ throw new InvalidOperationException("The list is empty!");
+
+ this.tail = this.tail.Previous;
+ if (this.tail == null) this.tail = this.buffers.Last;
+ }
+
+ this.Count--;
+ return this.tail.Value.PopLast();
+ }
+
///
/// Currently for internal use only - do not use directly.
///
diff --git a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs
index f39ea9dbf..dc1fdf047 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Utilities/ComparerExpression.cs
@@ -5,7 +5,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Linq;
+using System.Diagnostics.Contracts;
using System.Linq.Expressions;
using System.Reflection;
@@ -244,6 +244,16 @@ private static ConditionalExpression MakeComparisonExpression(ParameterExpressio
internal static bool IsSimpleDefault(IComparerExpression input)
=> input == Default && input is PrimitiveComparerExpression;
+
+ public static IComparerExpression Reverse(IComparerExpression comparer)
+ {
+ Contract.Requires(comparer != null);
+
+ var expression = comparer.GetCompareExpr();
+ Expression> template = (left, right) => CallInliner.Call(expression, right, left);
+ var reversedExpression = template.InlineCalls();
+ return new ComparerExpression(reversedExpression);
+ }
}
internal class PrimitiveComparerExpression : ComparerExpression
diff --git a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs
index 198521763..749cce699 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs
@@ -90,7 +90,7 @@ public IAggregate, TValue> Min(Expression()
: this.Properties.IsConstantDuration
- ? new SlidingMinAggregate(this.Properties.QueryContainer)
+ ? new SlidingMinAggregate()
: (IAggregate, TValue>)new MinAggregate(this.Properties.QueryContainer);
return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
@@ -108,7 +108,7 @@ public IAggregate, TValue> Min(
var aggregate = this.Properties.IsTumbling
? new TumblingMinAggregate(comparer)
: this.Properties.IsConstantDuration
- ? new SlidingMinAggregate(comparer, this.Properties.QueryContainer)
+ ? new SlidingMinAggregate(comparer)
: (IAggregate, TValue>)new MinAggregate(comparer, this.Properties.QueryContainer);
return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
@@ -130,7 +130,7 @@ public IAggregate, TValue> Max(Expression()
: this.Properties.IsConstantDuration
- ? new SlidingMaxAggregate(this.Properties.QueryContainer)
+ ? new SlidingMaxAggregate()
: (IAggregate, TValue>)new MaxAggregate(this.Properties.QueryContainer);
return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
@@ -147,7 +147,7 @@ public IAggregate, TValue> Max(Expression(comparer)
: this.Properties.IsConstantDuration
- ? new SlidingMaxAggregate(comparer, this.Properties.QueryContainer)
+ ? new SlidingMaxAggregate(comparer)
: (IAggregate, TValue>)new MaxAggregate(comparer, this.Properties.QueryContainer);
return aggregate.SkipNulls().Wrap(selector).ApplyFilter(this.Filter);
diff --git a/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs
new file mode 100644
index 000000000..2625ead81
--- /dev/null
+++ b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowMinMaxAggregate.cs
@@ -0,0 +1,229 @@
+// *********************************************************************
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License
+// *********************************************************************
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.StreamProcessing;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace SimpleTesting
+{
+ [TestClass]
+ public class HoppingWindowMinMaxAggregate : TestWithConfigSettingsAndMemoryLeakDetection
+ {
+ private const long HopSize = 10;
+ private const long WindowSize = 4 * HopSize;
+
+ private StreamEvent EndEvent = StreamEvent.CreatePunctuation(StreamEvent.InfinitySyncTime);
+ private Random random = new Random(Seed: (int)DateTime.UtcNow.Ticks);
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinMaxAggregateSimple()
+ {
+ var input = new[]
+ {
+ StreamEvent.CreateStart(1, 10),
+ StreamEvent.CreateStart(3, 20),
+ StreamEvent.CreateStart(6, 10),
+ StreamEvent.CreateStart(8, 30),
+ StreamEvent.CreateStart(12, 20),
+ StreamEvent.CreateStart(18, 10),
+ EndEvent
+ };
+
+ var output = input.ToStreamable().HoppingWindowLifetime(10, 5).Max();
+
+ var correctValues = new[] { 20, 30, 30, 20, 10 };
+ var correctEvents = new List>();
+ for (int i = 0; i < correctValues.Length; i++)
+ {
+ correctEvents.Add(StreamEvent.CreateStart(5 * (i + 1), correctValues[i]));
+ correctEvents.Add(StreamEvent.CreateEnd(5 * (i + 2), 5 * (i + 1), correctValues[i]));
+ }
+ correctEvents.Add(EndEvent);
+
+ CollectionAssert.AreEqual(correctEvents, output.ToStreamEventArray());
+ }
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinAggregateSimple()
+ {
+ var input = new[]
+ {
+ StreamEvent.CreateStart(1, 30),
+ StreamEvent.CreateStart(3, 20),
+ StreamEvent.CreateStart(6, 30),
+ StreamEvent.CreateStart(8, 10),
+ StreamEvent.CreateStart(12, 20),
+ StreamEvent.CreateStart(18, 30),
+ EndEvent
+ };
+
+ var output = input.ToStreamable().HoppingWindowLifetime(10, 5).Min();
+
+ var correctValues = new[] { 20, 10, 10, 20, 30 };
+ var correctEvents = new List>();
+ for (int i = 0; i < correctValues.Length; i++)
+ {
+ correctEvents.Add(StreamEvent.CreateStart(5 * (i + 1), correctValues[i]));
+ correctEvents.Add(StreamEvent.CreateEnd(5 * (i + 2), 5 * (i + 1), correctValues[i]));
+ }
+ correctEvents.Add(EndEvent);
+
+ CollectionAssert.AreEqual(correctEvents, output.ToStreamEventArray());
+ }
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinMaxAggregateRandomDistribution()
+ {
+ // Distribution: [1,2,3,4,5] hops : 10% each, Closely-Spaced: 50%
+ GenerateDataAndTestInput(
+ numValues: 1000,
+ valueGenerator: v => random.Next(100, 110),
+ distanceGenerator: () =>
+ {
+ var hopType = random.Next(1, 11);
+ return (hopType < 6) ? (hopType * HopSize) : (hopType - 5);
+ });
+ }
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinMaxAggregateUnaligned()
+ {
+ GenerateDataAndTestInput(
+ numValues: 1000,
+ valueGenerator: v => random.Next(100, 110),
+ distanceGenerator: () => 10,
+ windowSize: 27);
+
+ GenerateDataAndTestInput(
+ numValues: 1000,
+ valueGenerator: v => random.Next(100, 110),
+ distanceGenerator: () => 10,
+ windowSize: 17);
+ }
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinMaxAggregateAllIncreasing()
+ {
+ GenerateDataAndTestInput(
+ numValues: 1000,
+ valueGenerator: v => v + random.Next(1, 11),
+ distanceGenerator: () => HopSize);
+ }
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinMaxAggregateAllDecreasing()
+ {
+ GenerateDataAndTestInput(
+ numValues: 1000,
+ valueGenerator: v => (v == 0) ? 10000 : v - random.Next(1, 11),
+ distanceGenerator: () => HopSize);
+ }
+
+ [TestMethod, TestCategory("Gated")]
+ public void TestHoppingWindowMinMaxAggregateCyclingValues()
+ {
+ // Test values 1 -> 2 -> 3 -> 1, and run for 1/2, 1, 2, 4, 8 intervals of hops
+ for (long distance = HopSize / 2; distance <= HopSize * 8; distance *= 2)
+ {
+ GenerateDataAndTestInput(
+ numValues: 10,
+ valueGenerator: v => (v % 3) + 1,
+ distanceGenerator: () => distance);
+ }
+ }
+
+ private void GenerateDataAndTestInput(
+ int numValues,
+ Func valueGenerator,
+ Func distanceGenerator,
+ long windowSize = WindowSize)
+ {
+ var input = new List>();
+ long maxStartTime = 0;
+
+ long startTime = 0;
+ int value = 100;
+ for (int i = 0; i < numValues; i++)
+ {
+ startTime += distanceGenerator();
+ value = valueGenerator(value);
+ input.Add(StreamEvent.CreateStart(startTime, value));
+ maxStartTime = Math.Max(maxStartTime, startTime);
+ }
+ input.Add(EndEvent);
+
+ TestHoppingWindowMaxAggregateInternal(input, maxStartTime, windowSize);
+
+ TestHoppingWindowMinAggregateInternal(input, maxStartTime, windowSize);
+ }
+
+ private void TestHoppingWindowMinAggregateInternal(IEnumerable> streamEvents, long maxStartTime, long windowSize)
+ {
+ TestHoppingWindowMinMaxAggregateInternal(streamEvents, isMax: true, maxStartTime, windowSize);
+ }
+
+ private void TestHoppingWindowMaxAggregateInternal(IEnumerable> streamEvents, long maxStartTime, long windowSize)
+ {
+ TestHoppingWindowMinMaxAggregateInternal(streamEvents, isMax: false, maxStartTime, windowSize);
+ }
+
+ private void TestHoppingWindowMinMaxAggregateInternal(IEnumerable> streamEvents, bool isMax, long maxStartTime, long windowSize)
+ {
+ var output = streamEvents.ToStreamable().HoppingWindowLifetime(windowSize, HopSize).Max();
+
+ var correct = new List>();
+
+ maxStartTime += windowSize;
+ for (long startTime = 0; startTime < maxStartTime; startTime += HopSize)
+ {
+ var eventsInWindow = streamEvents.Where(x => x.StartTime > (startTime - windowSize) && x.StartTime <= startTime);
+
+ if (!eventsInWindow.Any())
+ continue;
+
+ var fun = isMax ? new Func((x, y) => x > y) : ((x, y) => x < y);
+ var max = eventsInWindow.Aggregate((state, input) => state.Payload > input.Payload ? state : input);
+
+ correct.Add(StreamEvent.CreateStart(startTime, max.Payload));
+ correct.Add(StreamEvent.CreateEnd(startTime + HopSize, startTime, max.Payload));
+ }
+
+ var actual = NormalizeToInterval(output.ToStreamEventArray());
+
+ Assert.IsTrue(Enumerable.SequenceEqual(NormalizeToInterval(correct), actual));
+ }
+
+ private IEnumerable> NormalizeToInterval(IEnumerable> streamEvents)
+ {
+ var result = new List>();
+
+ var endEvents = streamEvents.Where(se => se.Kind == StreamEventKind.End || se.Kind == StreamEventKind.Interval);
+
+ if (!endEvents.Any())
+ return result;
+
+ var firstEvent = endEvents.First();
+ StreamEvent curInterval = StreamEvent.CreateInterval(firstEvent.StartTime, firstEvent.EndTime, firstEvent.Payload);
+
+ foreach (var streamEvent in endEvents.Skip(1))
+ {
+ if ((streamEvent.StartTime == curInterval.EndTime || streamEvent.Kind == StreamEventKind.Interval) && // Merge into current interval if payload is same
+ streamEvent.Payload == curInterval.Payload)
+ {
+ curInterval.OtherTime = streamEvent.EndTime;
+ }
+ else
+ {
+ result.Add(curInterval);
+ curInterval = StreamEvent.CreateInterval(streamEvent.StartTime, streamEvent.EndTime, streamEvent.Payload); ;
+ }
+ }
+ result.Add(curInterval);
+ return result;
+ }
+ }
+}
diff --git a/Sources/Test/SimpleTesting/Program.cs b/Sources/Test/SimpleTesting/Program.cs
index e1b97e1e9..80f4397f2 100644
--- a/Sources/Test/SimpleTesting/Program.cs
+++ b/Sources/Test/SimpleTesting/Program.cs
@@ -49,7 +49,7 @@ public static IStreamable ToCleanStreamable(this Stre
return input.OrderBy(v => v.SyncTime).ToArray().ToStreamable();
}
- public static IStreamable ToStreamable(this StreamEvent[] input)
+ public static IStreamable ToStreamable(this IEnumerable> input)
{
Invariant.IsNotNull(input, "input");
diff --git a/Sources/Test/SimpleTesting/SimpleTesting.csproj b/Sources/Test/SimpleTesting/SimpleTesting.csproj
index a024e50d4..55fa71817 100644
--- a/Sources/Test/SimpleTesting/SimpleTesting.csproj
+++ b/Sources/Test/SimpleTesting/SimpleTesting.csproj
@@ -63,6 +63,7 @@
+