Commit a999d9e

feat(engine): Batch trigger reloaded (#2779)
New batch trigger system with larger payloads, streaming ingestion, larger batch sizes, and a fair processing system.

This PR introduces a new `FairQueue` abstraction inspired by our own `RunQueue` that enables multi-tenant fair queueing with concurrency limits. The new `BatchQueue` is built on top of the `FairQueue`, and handles processing Batch triggers in a fair manner with per-environment concurrency limits defined per-org. Additionally, there is a global concurrency limit to prevent the BatchQueue system from creating too many runs too quickly, which can cause downstream issues.

For this new BatchQueue system we have a completely new batch trigger creation and ingestion system. Previously this was a single endpoint with a single JSON body that defined details about the batch as well as all the items in the batch. We're introducing a two-phase batch trigger ingestion system. In the first phase, the BatchTaskRun record is created (and possibly rate limited). The second phase is another endpoint that accepts an NDJSON body with each line being a single item/run with payload and options. At ingestion time all items are added to a queue, in order, and then processed by the BatchQueue system.

## New batch trigger rate limits

This PR implements a new batch trigger specific rate limit, configured on the `Organization.batchRateLimitConfig` column, and defaults using these environment variables:

- `BATCH_RATE_LIMIT_REFILL_RATE` defaults to 10
- `BATCH_RATE_LIMIT_REFILL_INTERVAL` the duration interval, defaults to `"10s"`
- `BATCH_RATE_LIMIT_MAX` defaults to 1200

This rate limiter is scoped to the environment ID and controls how many runs can be submitted via batch triggers per interval. The SDK handles the retrying side.

## Batch queue concurrency limits

The new column `Organization.batchQueueConcurrencyConfig` now defines an org specific `processingConcurrency` value, with a backup of the env var `BATCH_CONCURRENCY_LIMIT_DEFAULT` which defaults to 10. This controls how many batch queue items are processed concurrently per environment.

There is also a global rate limit for the batch queue set via the `BATCH_QUEUE_GLOBAL_RATE_LIMIT` which defaults to being disabled. If set, the entire batch queue system won't process more than `BATCH_QUEUE_GLOBAL_RATE_LIMIT` items per second. This allows controlling the maximum number of runs created per second via batch triggers.

## Batch trigger settings

- `STREAMING_BATCH_MAX_ITEMS` controls the maximum number of items in a single batch
- `STREAMING_BATCH_ITEM_MAXIMUM_SIZE` controls the maximum size of each item in a batch
- `BATCH_CONCURRENCY_DEFAULT_CONCURRENCY` controls the default environment concurrency
- `BATCH_QUEUE_DRR_QUANTUM` how many credits each environment gets each round for the DRR scheduler
- `BATCH_QUEUE_MAX_DEFICIT` the maximum deficit for the DRR scheduler
- `BATCH_QUEUE_CONSUMER_COUNT` how many queue consumers to run
- `BATCH_QUEUE_CONSUMER_INTERVAL_MS` how frequently they poll for items in the queue

### Configuration Recommendations by Use Case

**High-throughput priority (fairness acceptable at 0.98+):**

```env
BATCH_QUEUE_DRR_QUANTUM=25
BATCH_QUEUE_MAX_DEFICIT=100
BATCH_QUEUE_CONSUMER_COUNT=10
BATCH_QUEUE_CONSUMER_INTERVAL_MS=50
BATCH_CONCURRENCY_DEFAULT_CONCURRENCY=25
```

**Strict fairness priority (throughput can be lower):**

```env
BATCH_QUEUE_DRR_QUANTUM=5
BATCH_QUEUE_MAX_DEFICIT=25
BATCH_QUEUE_CONSUMER_COUNT=3
BATCH_QUEUE_CONSUMER_INTERVAL_MS=100
BATCH_CONCURRENCY_DEFAULT_CONCURRENCY=5
```
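The DRR settings in the commit message can be illustrated with a small deficit round robin sketch. This is not the `FairQueue` code from this commit: the `TenantQueue` type, the `drrRound` function, the `freeSlots` field, and the one-credit-per-item cost are all assumptions made for illustration. It only shows how a per-round quantum and a deficit cap might interact with a per-environment concurrency limit.

```typescript
// Hypothetical DRR sketch; names and cost model are assumptions, not the commit's API.
type TenantQueue<T> = {
  id: string;
  items: T[];
  deficit: number; // credits carried over between rounds
  freeSlots: number; // free concurrency slots for this tenant
};

interface DrrOptions {
  quantum: number; // credits granted per round (cf. BATCH_QUEUE_DRR_QUANTUM)
  maxDeficit: number; // cap on banked credits (cf. BATCH_QUEUE_MAX_DEFICIT)
}

// Runs one scheduling round and returns the dequeued items in order.
// Each item is assumed to cost exactly one credit.
function drrRound<T>(
  queues: TenantQueue<T>[],
  opts: DrrOptions
): Array<{ tenant: string; item: T }> {
  const out: Array<{ tenant: string; item: T }> = [];
  for (const q of queues) {
    if (q.items.length === 0) {
      q.deficit = 0; // idle tenants do not bank credits
      continue;
    }
    q.deficit = Math.min(q.deficit + opts.quantum, opts.maxDeficit);
    while (q.deficit >= 1 && q.items.length > 0 && q.freeSlots > 0) {
      out.push({ tenant: q.id, item: q.items.shift() as T });
      q.deficit -= 1;
      q.freeSlots -= 1;
    }
    if (q.items.length === 0) q.deficit = 0;
  }
  return out;
}

const queues: TenantQueue<string>[] = [
  { id: "env_a", items: ["a1", "a2", "a3", "a4"], deficit: 0, freeSlots: 1 },
  { id: "env_b", items: ["b1"], deficit: 0, freeSlots: 5 },
];
const opts: DrrOptions = { quantum: 2, maxDeficit: 3 };

const round1 = drrRound(queues, opts).map((r) => r.item); // env_a is blocked after one item
const round2 = drrRound(queues, opts).map((r) => r.item); // env_a has no free slots
const deficitWhileBlocked = queues[0].deficit; // capped at maxDeficit
queues[0].freeSlots = 5; // slots free up
const round3 = drrRound(queues, opts).map((r) => r.item);
console.log(round1, round2, deficitWhileBlocked, round3);
```

In this sketch the cap keeps a tenant that was blocked by its concurrency limit from bursting far ahead of others once slots free up, which is one plausible reading of why a lower `BATCH_QUEUE_MAX_DEFICIT` appears in the strict-fairness recommendation.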
1 parent 28a66ac commit a999d9e

92 files changed: +20803 -796 lines changed

.changeset/fluffy-crews-rhyme.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/sdk": patch
+---
+
+feat: Support for new batch trigger system

.env.example

Lines changed: 7 additions & 1 deletion
@@ -85,4 +85,10 @@ POSTHOG_PROJECT_KEY=
 # These control the server-side internal telemetry
 # INTERNAL_OTEL_TRACE_EXPORTER_URL=<URL to send traces to>
 # INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
-# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0,
+# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0
+
+# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
+# Uncomment these to send metrics to the local Prometheus via OTEL Collector:
+# INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
+# INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
+# INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS=15000

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -62,4 +62,5 @@ apps/**/public/build
 /packages/trigger-sdk/src/package.json
 /packages/python/src/package.json
 .claude
-.mcp.log
+.mcp.log
+.cursor/debug.log

apps/webapp/app/components/runs/v3/BatchStatus.tsx

Lines changed: 27 additions & 5 deletions
@@ -1,17 +1,27 @@
-import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/20/solid";
+import {
+  CheckCircleIcon,
+  ExclamationTriangleIcon,
+  XCircleIcon,
+} from "@heroicons/react/20/solid";
 import type { BatchTaskRunStatus } from "@trigger.dev/database";
 import assertNever from "assert-never";
 import { Spinner } from "~/components/primitives/Spinner";
 import { cn } from "~/utils/cn";
 
-export const allBatchStatuses = ["PENDING", "COMPLETED", "ABORTED"] as const satisfies Readonly<
-  Array<BatchTaskRunStatus>
->;
+export const allBatchStatuses = [
+  "PROCESSING",
+  "PENDING",
+  "COMPLETED",
+  "PARTIAL_FAILED",
+  "ABORTED",
+] as const satisfies Readonly<Array<BatchTaskRunStatus>>;
 
 const descriptions: Record<BatchTaskRunStatus, string> = {
+  PROCESSING: "The batch is being processed and runs are being created.",
   PENDING: "The batch has child runs that have not yet completed.",
   COMPLETED: "All the batch child runs have finished.",
-  ABORTED: "The batch was aborted because some child tasks could not be triggered.",
+  PARTIAL_FAILED: "Some runs failed to be created. Successfully created runs are still executing.",
+  ABORTED: "The batch was aborted because child tasks could not be triggered.",
 };
 
 export function descriptionForBatchStatus(status: BatchTaskRunStatus): string {
@@ -47,10 +57,14 @@ export function BatchStatusIcon({
   className: string;
 }) {
   switch (status) {
+    case "PROCESSING":
+      return <Spinner className={cn(batchStatusColor(status), className)} />;
     case "PENDING":
       return <Spinner className={cn(batchStatusColor(status), className)} />;
     case "COMPLETED":
       return <CheckCircleIcon className={cn(batchStatusColor(status), className)} />;
+    case "PARTIAL_FAILED":
+      return <ExclamationTriangleIcon className={cn(batchStatusColor(status), className)} />;
     case "ABORTED":
       return <XCircleIcon className={cn(batchStatusColor(status), className)} />;
     default: {
@@ -61,10 +75,14 @@ export function BatchStatusIcon({
 
 export function batchStatusColor(status: BatchTaskRunStatus): string {
   switch (status) {
+    case "PROCESSING":
+      return "text-blue-500";
     case "PENDING":
       return "text-pending";
     case "COMPLETED":
       return "text-success";
+    case "PARTIAL_FAILED":
+      return "text-warning";
     case "ABORTED":
       return "text-error";
     default: {
@@ -75,10 +93,14 @@ export function batchStatusColor(status: BatchTaskRunStatus): string {
 
 export function batchStatusTitle(status: BatchTaskRunStatus): string {
   switch (status) {
+    case "PROCESSING":
+      return "Processing";
     case "PENDING":
       return "In progress";
     case "COMPLETED":
       return "Completed";
+    case "PARTIAL_FAILED":
+      return "Partial failure";
     case "ABORTED":
       return "Aborted";
     default: {

apps/webapp/app/entry.server.tsx

Lines changed: 15 additions & 15 deletions
@@ -1,22 +1,28 @@
-import {
-  createReadableStreamFromReadable,
-  type DataFunctionArgs,
-  type EntryContext,
-} from "@remix-run/node"; // or cloudflare/deno
+import { createReadableStreamFromReadable, type EntryContext } from "@remix-run/node"; // or cloudflare/deno
 import { RemixServer } from "@remix-run/react";
+import { wrapHandleErrorWithSentry } from "@sentry/remix";
 import { parseAcceptLanguage } from "intl-parse-accept-language";
 import isbot from "isbot";
 import { renderToPipeableStream } from "react-dom/server";
 import { PassThrough } from "stream";
 import * as Worker from "~/services/worker.server";
+import { bootstrap } from "./bootstrap";
 import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
 import {
   OperatingSystemContextProvider,
   OperatingSystemPlatform,
 } from "./components/primitives/OperatingSystemProvider";
+import { Prisma } from "./db.server";
+import { env } from "./env.server";
+import { eventLoopMonitor } from "./eventLoopMonitor.server";
+import { logger } from "./services/logger.server";
+import { resourceMonitor } from "./services/resourceMonitor.server";
 import { singleton } from "./utils/singleton";
-import { bootstrap } from "./bootstrap";
-import { wrapHandleErrorWithSentry } from "@sentry/remix";
+import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
+import {
+  registerRunEngineEventBusHandlers,
+  setupBatchQueueCallbacks,
+} from "./v3/runEngineHandlers.server";
 
 const ABORT_DELAY = 30000;
 
@@ -228,19 +234,13 @@ process.on("uncaughtException", (error, origin) => {
 });
 
 singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
+singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
 
 export { apiRateLimiter } from "./services/apiRateLimit.server";
 export { engineRateLimiter } from "./services/engineRateLimit.server";
+export { runWithHttpContext } from "./services/httpAsyncStorage.server";
 export { socketIo } from "./v3/handleSocketIo.server";
 export { wss } from "./v3/handleWebsockets.server";
-export { runWithHttpContext } from "./services/httpAsyncStorage.server";
-import { eventLoopMonitor } from "./eventLoopMonitor.server";
-import { env } from "./env.server";
-import { logger } from "./services/logger.server";
-import { Prisma } from "./db.server";
-import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
-import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
-import { resourceMonitor } from "./services/resourceMonitor.server";
 
 if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
   eventLoopMonitor.enable();

apps/webapp/app/env.server.ts

Lines changed: 20 additions & 0 deletions
@@ -528,6 +528,7 @@ const EnvironmentSchema = z
     MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
     MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
     TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
+    BATCH_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().optional(), // Defaults to TASK_PAYLOAD_OFFLOAD_THRESHOLD if not set
     TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
     BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
     TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
@@ -537,6 +538,14 @@ const EnvironmentSchema = z
     MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
     MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
 
+    // 2-phase batch API settings
+    STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
+    STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
+    BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
+    BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
+    BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
+    BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(10),
+
     REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
     REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
     REALTIME_STREAM_TTL: z.coerce
@@ -931,6 +940,17 @@ const EnvironmentSchema = z
       .default(process.env.REDIS_TLS_DISABLED ?? "false"),
     BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
 
+    // BatchQueue DRR settings (Run Engine v2)
+    BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(25),
+    BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
+    BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
+    BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
+    // Global rate limit: max items processed per second across all consumers
+    // If not set, no global rate limiting is applied
+    BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
+    // Processing concurrency: max concurrent batch items being processed per environment
+    BATCH_CONCURRENCY_DEFAULT_CONCURRENCY: z.coerce.number().int().default(1),
+
     ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
     ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
     ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
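To make the three `BATCH_RATE_LIMIT_*` settings concrete, here is a minimal token bucket sketch. It assumes the settings map onto a conventional token bucket (a refill amount, a refill interval, and a capacity) and that each run in a batch costs one token. The `TokenBucket` class and its method names are hypothetical and are not the rate limiter used by the webapp. The example values mirror the schema defaults in this diff, with `"10s"` written as 10,000 ms.

```typescript
// Hypothetical token bucket sketch; not the webapp's rate limiter.
interface TokenBucketConfig {
  refillRate: number; // tokens added per interval (cf. BATCH_RATE_LIMIT_REFILL_RATE)
  refillIntervalMs: number; // interval length (cf. BATCH_RATE_LIMIT_REFILL_INTERVAL)
  maxTokens: number; // bucket capacity (cf. BATCH_RATE_LIMIT_MAX)
}

class TokenBucket {
  private config: TokenBucketConfig;
  private tokens: number;
  private lastRefill: number;

  constructor(config: TokenBucketConfig, now: number) {
    this.config = config;
    this.tokens = config.maxTokens; // start full
    this.lastRefill = now;
  }

  // Tries to take `cost` tokens at time `now` (ms). Returns false when the
  // caller should back off and retry later.
  tryConsume(cost: number, now: number): boolean {
    const intervals = Math.floor((now - this.lastRefill) / this.config.refillIntervalMs);
    if (intervals > 0) {
      const refilled = this.tokens + intervals * this.config.refillRate;
      this.tokens = Math.min(this.config.maxTokens, refilled);
      this.lastRefill += intervals * this.config.refillIntervalMs;
    }
    if (cost > this.tokens) return false;
    this.tokens -= cost;
    return true;
  }

  get remaining(): number {
    return this.tokens;
  }
}

const bucket = new TokenBucket(
  { refillRate: 100, refillIntervalMs: 10_000, maxTokens: 1200 },
  0
);
const accepted = bucket.tryConsume(1000, 0); // 1200 available, 200 left
const rejected = bucket.tryConsume(500, 5_000); // no refill yet, only 200 available
const afterRefill = bucket.tryConsume(300, 10_000); // one refill adds 100, so 300 available
console.log(accepted, rejected, afterRefill, bucket.remaining);
```

Time is passed in explicitly so the behaviour is deterministic. A real limiter scoped per environment would also need shared storage across processes, which this sketch leaves out.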

apps/webapp/app/presenters/v3/BatchListPresenter.server.ts

Lines changed: 1 addition & 1 deletion
@@ -195,7 +195,7 @@ WHERE
       throw new Error(`Environment not found for Batch ${batch.id}`);
     }
 
-    const hasFinished = batch.status !== "PENDING";
+    const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";
 
     return {
       id: batch.id,
Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
+import { type BatchTaskRunStatus } from "@trigger.dev/database";
+import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
+import { engine } from "~/v3/runEngine.server";
+import { BasePresenter } from "./basePresenter.server";
+
+type BatchPresenterOptions = {
+  environmentId: string;
+  batchId: string;
+  userId?: string;
+};
+
+export type BatchPresenterData = Awaited<ReturnType<BatchPresenter["call"]>>;
+
+export class BatchPresenter extends BasePresenter {
+  public async call({ environmentId, batchId, userId }: BatchPresenterOptions) {
+    const batch = await this._replica.batchTaskRun.findFirst({
+      select: {
+        id: true,
+        friendlyId: true,
+        status: true,
+        runCount: true,
+        batchVersion: true,
+        createdAt: true,
+        updatedAt: true,
+        completedAt: true,
+        processingStartedAt: true,
+        processingCompletedAt: true,
+        successfulRunCount: true,
+        failedRunCount: true,
+        idempotencyKey: true,
+        runtimeEnvironment: {
+          select: {
+            id: true,
+            type: true,
+            slug: true,
+            orgMember: {
+              select: {
+                user: {
+                  select: {
+                    id: true,
+                    name: true,
+                    displayName: true,
+                  },
+                },
+              },
+            },
+          },
+        },
+        errors: {
+          select: {
+            id: true,
+            index: true,
+            taskIdentifier: true,
+            error: true,
+            errorCode: true,
+            createdAt: true,
+          },
+          orderBy: {
+            index: "asc",
+          },
+        },
+      },
+      where: {
+        runtimeEnvironmentId: environmentId,
+        friendlyId: batchId,
+      },
+    });
+
+    if (!batch) {
+      throw new Error("Batch not found");
+    }
+
+    const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";
+    const isV2 = batch.batchVersion === "runengine:v2";
+
+    // For v2 batches in PROCESSING state, get live progress from Redis
+    // This provides real-time updates without waiting for the batch to complete
+    let liveSuccessCount = batch.successfulRunCount ?? 0;
+    let liveFailureCount = batch.failedRunCount ?? 0;
+
+    if (isV2 && batch.status === "PROCESSING") {
+      const liveProgress = await engine.getBatchQueueProgress(batch.id);
+      if (liveProgress) {
+        liveSuccessCount = liveProgress.successCount;
+        liveFailureCount = liveProgress.failureCount;
+      }
+    }
+
+    return {
+      id: batch.id,
+      friendlyId: batch.friendlyId,
+      status: batch.status as BatchTaskRunStatus,
+      runCount: batch.runCount,
+      batchVersion: batch.batchVersion,
+      isV2,
+      createdAt: batch.createdAt.toISOString(),
+      updatedAt: batch.updatedAt.toISOString(),
+      completedAt: batch.completedAt?.toISOString(),
+      processingStartedAt: batch.processingStartedAt?.toISOString(),
+      processingCompletedAt: batch.processingCompletedAt?.toISOString(),
+      finishedAt: batch.completedAt
+        ? batch.completedAt.toISOString()
+        : hasFinished
+        ? batch.updatedAt.toISOString()
+        : undefined,
+      hasFinished,
+      successfulRunCount: liveSuccessCount,
+      failedRunCount: liveFailureCount,
+      idempotencyKey: batch.idempotencyKey,
+      environment: displayableEnvironment(batch.runtimeEnvironment, userId),
+      errors: batch.errors.map((error) => ({
+        id: error.id,
+        index: error.index,
+        taskIdentifier: error.taskIdentifier,
+        error: error.error,
+        errorCode: error.errorCode,
+        createdAt: error.createdAt.toISOString(),
+      })),
+    };
+  }
+}
+
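The finish-time logic in this presenter can be read in isolation as a small pure function. The sketch below restates it outside of Prisma and the run engine. `BatchSnapshot` and `deriveFinish` are hypothetical names introduced here and are not part of the commit, and the timestamp is an arbitrary example value.

```typescript
// Hypothetical standalone restatement of the presenter's finish-time logic.
type BatchStatus = "PROCESSING" | "PENDING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED";

interface BatchSnapshot {
  status: BatchStatus;
  updatedAt: Date;
  completedAt?: Date | null;
}

function deriveFinish(batch: BatchSnapshot): {
  hasFinished: boolean;
  finishedAt: string | undefined;
} {
  // A batch is still in flight while runs are being created (PROCESSING)
  // or while created runs have not yet completed (PENDING).
  const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";

  // Prefer the explicit completion time; otherwise fall back to the last
  // update time, but only once the batch has reached a terminal status.
  const finishedAt = batch.completedAt
    ? batch.completedAt.toISOString()
    : hasFinished
    ? batch.updatedAt.toISOString()
    : undefined;

  return { hasFinished, finishedAt };
}

const updatedAt = new Date("2025-01-01T00:00:10.000Z");
const inFlight = deriveFinish({ status: "PROCESSING", updatedAt });
const aborted = deriveFinish({ status: "ABORTED", updatedAt });
console.log(inFlight, aborted);
```

Keeping `PROCESSING` out of the finished set matters because a batch in that state has no `completedAt` yet; without the check, `updatedAt` would be reported as a finish time while runs are still being created.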