Skip to content

Commit 57c91bc

Browse files
authored
Fix issue with nested DataLoader dispatching. (#9121)
1 parent bcc2224 commit 57c91bc

File tree

10 files changed

+213
-82
lines changed

10 files changed

+213
-82
lines changed

src/GreenDonut/src/GreenDonut/Instrumentation/AggregateDataLoaderDiagnosticEventListener.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,11 @@ public override void BatchEvaluated(int openBatches)
9292
}
9393

9494
/// <inheritdoc />
95-
public override void BatchDispatched(int dispatchedBatches, bool inParallel)
95+
public override void BatchDispatched(int dispatchedBatches)
9696
{
9797
for (var i = 0; i < listeners.Length; i++)
9898
{
99-
listeners[i].BatchDispatched(dispatchedBatches, inParallel);
99+
listeners[i].BatchDispatched(dispatchedBatches);
100100
}
101101
}
102102

src/GreenDonut/src/GreenDonut/Instrumentation/DataLoaderDiagnosticEventListener.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public virtual void BatchEvaluated(int openBatches)
5959
{ }
6060

6161
/// <inheritdoc />
62-
public virtual void BatchDispatched(int dispatchedBatches, bool inParallel)
62+
public virtual void BatchDispatched(int dispatchedBatches)
6363
{ }
6464

6565
private sealed class EmptyActivityScope : IDisposable

src/GreenDonut/src/GreenDonut/Instrumentation/IDataLoaderDiagnosticEvents.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,5 @@ void BatchItemError<TKey>(
9595
/// <param name="dispatchedBatches">
9696
/// The number of batches that have been dispatched.
9797
/// </param>
98-
/// <param name="inParallel">
99-
/// Indicates whether the batches have been dispatched in parallel.
100-
/// </param>
101-
void BatchDispatched(int dispatchedBatches, bool inParallel);
98+
void BatchDispatched(int dispatchedBatches);
10299
}

src/HotChocolate/AspNetCore/benchmarks/k6/Catalog.API/Program.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
.AddDefaultBatchDispatcher(
3636
new HotChocolate.Fetching.BatchDispatcherOptions
3737
{
38-
EnableParallelBatches = false,
39-
MaxParallelBatches = 4,
4038
MaxBatchWaitTimeUs = 50_000
4139
})
4240
.AddDiagnosticEventListener<BenchmarkDataLoaderDiagnosticEventListener>()

src/HotChocolate/Core/src/Types/Fetching/BatchDispatcher.cs

Lines changed: 84 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public sealed partial class BatchDispatcher : IBatchDispatcher
2828
private readonly IDataLoaderDiagnosticEvents _diagnosticEvents;
2929
private readonly BatchDispatcherOptions _options;
3030
private List<Task>? _dispatchTasks;
31+
private List<Task>? _inFlightDispatches;
3132
private int _openBatches;
3233
private long _lastSubscribed;
3334
private long _lastEnqueued;
@@ -43,6 +44,15 @@ public BatchDispatcher(IDataLoaderDiagnosticEvents diagnosticEvents, BatchDispat
4344
ArgumentNullException.ThrowIfNull(diagnosticEvents);
4445

4546
_diagnosticEvents = diagnosticEvents;
47+
48+
// Guard against `default(BatchDispatcherOptions)` which zeroes all fields,
49+
// bypassing the struct's field initializers and silently disabling age-based
50+
// forced dispatch.
51+
if (options.MaxBatchWaitTimeUs == 0)
52+
{
53+
options.MaxBatchWaitTimeUs = 50_000;
54+
}
55+
4656
_options = options;
4757
_coordinatorCts.Token.Register(_signal.Set);
4858
}
@@ -91,7 +101,8 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken)
91101
using var scope = _diagnosticEvents.RunBatchDispatchCoordinator();
92102

93103
var backlog = new PriorityQueue<Batch, long>();
94-
_dispatchTasks ??= new List<Task>(_options.MaxParallelBatches);
104+
_dispatchTasks ??= new List<Task>(4);
105+
_inFlightDispatches ??= new List<Task>(4);
95106

96107
Send(BatchDispatchEventType.CoordinatorStarted);
97108

@@ -106,7 +117,11 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken)
106117
return;
107118
}
108119

109-
await EvaluateAndDispatchAsync(backlog, _dispatchTasks, stoppingToken);
120+
await EvaluateAndDispatchAsync(
121+
backlog,
122+
_dispatchTasks,
123+
_inFlightDispatches,
124+
stoppingToken);
110125
}
111126
}
112127
catch (Exception ex)
@@ -124,17 +139,30 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken)
124139
private async Task EvaluateAndDispatchAsync(
125140
PriorityQueue<Batch, long> backlog,
126141
List<Task> dispatchTasks,
142+
List<Task> inFlightDispatches,
127143
CancellationToken stoppingToken)
128144
{
129-
var noDispatchCycles = 0;
145+
var idleCycles = 0;
130146

131147
while (!stoppingToken.IsCancellationRequested)
132148
{
149+
var completedDispatches = await CompleteInFlightDispatchesAsync(inFlightDispatches)
150+
.ConfigureAwait(false);
151+
152+
if (completedDispatches > 0)
153+
{
154+
_diagnosticEvents.BatchDispatched(completedDispatches);
155+
Send(BatchDispatchEventType.Dispatched);
156+
idleCycles = 0;
157+
}
158+
133159
var openBatches = Volatile.Read(ref _openBatches);
134160
long lastModified = 0;
135161

136-
// If we have no open batches to evaluate, we can stop
137-
// and wait for another signal.
162+
// If we have no open batches to evaluate and all in-flight dispatches
163+
// are completed, we can stop and wait for another signal.
164+
// If there are in-flight dispatches still running we also stop and
165+
// wait for their completion signal.
138166
if (openBatches == 0)
139167
{
140168
return;
@@ -146,35 +174,13 @@ private async Task EvaluateAndDispatchAsync(
146174

147175
EvaluateOpenBatches(ref lastModified, backlog, dispatchTasks);
148176

149-
// If the evaluation selected batches for dispatch.
177+
// If the evaluation selected batches for dispatch, we register them
178+
// as in-flight and continue evaluation without waiting for completion.
150179
if (dispatchTasks.Count > 0)
151180
{
152-
// We wait for all dispatch tasks to be completed before we reset the signal
153-
// that lets us pause the evaluation. Only then will we send a message to
154-
// the subscribed executors to reevaluate if they can continue execution.
155-
if (dispatchTasks.Count == 1)
156-
{
157-
await dispatchTasks[0];
158-
}
159-
else
160-
{
161-
if (_options.EnableParallelBatches)
162-
{
163-
await Task.WhenAll(dispatchTasks);
164-
}
165-
else
166-
{
167-
foreach (var task in dispatchTasks)
168-
{
169-
await task.ConfigureAwait(false);
170-
}
171-
}
172-
}
173-
174-
_diagnosticEvents.BatchDispatched(dispatchTasks.Count, _options.EnableParallelBatches);
175-
_signal.TryResetToIdle();
176-
Send(BatchDispatchEventType.Dispatched);
177-
return;
181+
RegisterInFlightDispatches(dispatchTasks, inFlightDispatches);
182+
idleCycles = 0;
183+
continue;
178184
}
179185

180186
// Signal that we have evaluated all enqueued tasks without dispatching any.
@@ -185,17 +191,58 @@ private async Task EvaluateAndDispatchAsync(
185191
// data requirements to the open batches.
186192
await WaitForMoreBatchActivityAsync(lastModified);
187193

188-
// After 10 cycles without dispatch, we add a small delay to provide backpressure.
189-
if (noDispatchCycles >= 10)
194+
// After 10 cycles without dispatch, insert a delay to avoid busy-spinning.
195+
// The first 10 cycles run tight (only the conditional yield in
196+
// WaitForMoreBatchActivityAsync) to give resolvers time to fill batches.
197+
if (idleCycles++ >= 10)
190198
{
191199
await Task.Delay(10, stoppingToken);
192-
noDispatchCycles = 0;
200+
idleCycles = 0;
201+
}
202+
}
203+
}
204+
205+
private void RegisterInFlightDispatches(
206+
List<Task> dispatchTasks,
207+
List<Task> inFlightDispatches)
208+
{
209+
foreach (var dispatchTask in dispatchTasks)
210+
{
211+
if (!dispatchTask.IsCompleted)
212+
{
213+
_ = dispatchTask.ContinueWith(
214+
static (_, state) => ((AsyncAutoResetEvent)state!).Set(),
215+
_signal,
216+
CancellationToken.None,
217+
TaskContinuationOptions.ExecuteSynchronously,
218+
TaskScheduler.Default);
193219
}
194220

195-
noDispatchCycles++;
221+
inFlightDispatches.Add(dispatchTask);
196222
}
197223
}
198224

225+
private static async Task<int> CompleteInFlightDispatchesAsync(List<Task> inFlightDispatches)
226+
{
227+
var completedDispatches = 0;
228+
229+
for (var i = inFlightDispatches.Count - 1; i >= 0; i--)
230+
{
231+
var dispatchTask = inFlightDispatches[i];
232+
233+
if (!dispatchTask.IsCompleted)
234+
{
235+
continue;
236+
}
237+
238+
await dispatchTask.ConfigureAwait(false);
239+
inFlightDispatches.RemoveAt(i);
240+
completedDispatches++;
241+
}
242+
243+
return completedDispatches;
244+
}
245+
199246
private void EvaluateOpenBatches(
200247
ref long lastModified,
201248
PriorityQueue<Batch, long> backlog,
@@ -231,7 +278,7 @@ private void EvaluateOpenBatches(
231278
// we force dispatch it regardless of its status to prevent starvation
232279
// under continuous high load.
233280
//
234-
// We stop evaluation once we've dispatched MaxParallelBatches or when we have touched all batches.
281+
// We stop evaluation when we have touched all batches.
235282
if (singleBatch is not null)
236283
{
237284
// we have an optimized path if there is only a single batch to evaluate.
@@ -250,7 +297,6 @@ private void EvaluateMultipleOpenBatches(
250297
{
251298
var now = Stopwatch.GetTimestamp();
252299
var maxBatchAgeUs = _options.MaxBatchWaitTimeUs;
253-
var maxParallelBatches = _options.MaxParallelBatches;
254300

255301
while (backlog.TryDequeue(out var batch, out _))
256302
{
@@ -276,11 +322,6 @@ private void EvaluateMultipleOpenBatches(
276322
Interlocked.Decrement(ref _openBatches);
277323
dispatchTasks.Add(batch.DispatchAsync());
278324
}
279-
280-
if (dispatchTasks.Count == maxParallelBatches)
281-
{
282-
break;
283-
}
284325
}
285326
}
286327

src/HotChocolate/Core/src/Types/Fetching/BatchDispatcherOptions.cs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ namespace HotChocolate.Fetching;
55
/// </summary>
66
public struct BatchDispatcherOptions
77
{
8-
private int _maxParallelBatches = 4;
98
private long _maxBatchWaitTimeUs = 50_000;
109

1110
/// <summary>
@@ -15,24 +14,6 @@ public BatchDispatcherOptions()
1514
{
1615
}
1716

18-
/// <summary>
19-
/// Gets or sets a value indicating whether batches can be dispatched in parallel.
20-
/// </summary>
21-
public bool EnableParallelBatches { get; set; } = false;
22-
23-
/// <summary>
24-
/// Gets or sets the maximum number of batches that can be dispatched in parallel.
25-
/// </summary>
26-
public int MaxParallelBatches
27-
{
28-
readonly get => _maxParallelBatches;
29-
set
30-
{
31-
ArgumentOutOfRangeException.ThrowIfLessThan(value, 2, nameof(MaxParallelBatches));
32-
_maxParallelBatches = value;
33-
}
34-
}
35-
3617
/// <summary>
3718
/// Gets or sets the maximum wait time in microseconds before a batch is forcefully dispatched.
3819
/// Disable max batch wait time by setting this value to 0.

0 commit comments

Comments
 (0)