Skip to content

Commit 73cc056

Browse files
authored
[Fusion] Improved Scheduler (#9118)
1 parent 14a4893 commit 73cc056

File tree

1 file changed

+13
-25
lines changed

1 file changed

+13
-25
lines changed

src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/FusionRequestExecutor.cs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Collections.Concurrent;
12
using System.Runtime.CompilerServices;
23
using HotChocolate.Execution;
34
using HotChocolate.Features;
@@ -216,7 +217,7 @@ private async IAsyncEnumerable<OperationResult> ExecuteBatchStream(
216217
var requestCount = requests.Count;
217218
var tasks = Interlocked.Exchange(ref _taskList, null) ?? new List<Task>(requestCount);
218219

219-
var completed = new List<OperationResult>();
220+
var completed = new ConcurrentQueue<OperationResult>();
220221

221222
for (var i = 0; i < requestCount; i++)
222223
{
@@ -225,7 +226,7 @@ private async IAsyncEnumerable<OperationResult> ExecuteBatchStream(
225226

226227
var buffer = new OperationResult[Math.Min(16, requestCount)];
227228

228-
while (tasks.Count > 0 || completed.Count > 0)
229+
while (tasks.Count > 0 || !completed.IsEmpty)
229230
{
230231
var count = completed.TryDequeueRange(buffer);
231232

@@ -234,7 +235,7 @@ private async IAsyncEnumerable<OperationResult> ExecuteBatchStream(
234235
yield return buffer[i];
235236
}
236237

237-
if (completed.Count == 0 && tasks.Count > 0)
238+
if (completed.IsEmpty && tasks.Count > 0)
238239
{
239240
await Task.WhenAny(tasks).ConfigureAwait(false);
240241

@@ -280,7 +281,7 @@ private static IOperationRequest WithServices(
280281
private async Task ExecuteBatchItemAsync(
281282
IOperationRequest request,
282283
int requestIndex,
283-
List<OperationResult> completed,
284+
ConcurrentQueue<OperationResult> completed,
284285
CancellationToken cancellationToken)
285286
{
286287
var result = await ExecuteAsync(request, requestIndex, cancellationToken).ConfigureAwait(false);
@@ -289,7 +290,7 @@ private async Task ExecuteBatchItemAsync(
289290

290291
private static async Task UnwrapBatchItemResultAsync(
291292
IExecutionResult result,
292-
List<OperationResult> completed,
293+
ConcurrentQueue<OperationResult> completed,
293294
CancellationToken cancellationToken)
294295
{
295296
switch (result)
@@ -340,30 +341,17 @@ private static async Task UnwrapBatchItemResultAsync(
340341
public ValueTask DisposeAsync() => Schema.DisposeAsync();
341342
}
342343

343-
file static class ListExtensions
344+
file static class ConcurrentQueueExtensions
344345
{
345-
public static void Enqueue<T>(this List<T> queue, T item)
346+
public static int TryDequeueRange<T>(this ConcurrentQueue<T> queue, T[] buffer)
346347
{
347-
lock (queue)
348-
{
349-
queue.Insert(0, item);
350-
}
351-
}
348+
var i = 0;
352349

353-
public static int TryDequeueRange<T>(this List<T> queue, T[] buffer)
354-
{
355-
lock (queue)
350+
while (i < buffer.Length && queue.TryDequeue(out var value))
356351
{
357-
var count = Math.Min(queue.Count, buffer.Length);
358-
var j = 0;
359-
360-
for (var i = count - 1; i >= 0; i--)
361-
{
362-
buffer[j++] = queue[i];
363-
queue.RemoveAt(i);
364-
}
365-
366-
return count;
352+
buffer[i++] = value;
367353
}
354+
355+
return i;
368356
}
369357
}

0 commit comments

Comments
 (0)