Skip to content

Commit 44a4a59

Browse files
josephjclarkclaude
andauthored
Release/worker (#1160)
* Log limits (#1156) * engine: support payload limits per event * add log limit tests * tests * type * changeset * formatting * update changeset * remove rubbish * Engine: try to make calculating payload sizes more efficient (#1155) * scaffolding: make ensure-payload-size async and add slots for two algorithms * first swing at async traversal This actually increases memory overhead * new traversal algorithm with queue, rather than recuion * add benchmarking util 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * tidying * optimisations Still doesn't get close * add streaming algo * update benchmark * remove traversal algorithm * tidy * tidy up * tidy docs * remove ignore * remove old traverse stats * typing * restore await * add a synchronous publish option important for process exit events * revert debug code --------- Co-authored-by: Claude <noreply@anthropic.com> * Worker: support optional batching of log events (#1162) * worker: re-write core event handler to allow more control of sequencing * tidy tests and finish * types * add basic tests * more testing * Add batching and tests * more tests * options and more tests * types * copy new log stuff from other branch * updates for live testing against lightning Works great against new and legacy app, with or without batching * options for interval and limit * fix test * copy across old test * fix type * add batch log test * upate default batch size * changeset * run integration tests in batch mode * update tests * remove logging * docs * versions: worker@1.20. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 697c1df commit 44a4a59

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2111
-316
lines changed

integration-tests/worker/src/init.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export const initWorker = async (
4747
secret: crypto.randomUUID(),
4848
collectionsVersion: '1.0.0',
4949
messageTimeoutSeconds: 0.01,
50+
batchLogs: true,
5051
...workerArgs,
5152
});
5253

integration-tests/worker/test/integration.test.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -988,9 +988,11 @@ test.serial('Redact logs which exceed the payload limit', (t) => {
988988
};
989989

990990
lightning.on('run:log', (evt) => {
991-
if (evt.payload.source === 'JOB') {
992-
t.regex(evt.payload.message[0], /redacted/i);
993-
}
991+
evt.payload.logs.forEach((log) => {
992+
if (log.source === 'JOB') {
993+
t.regex(log.message[0], /redacted/i);
994+
}
995+
});
994996
});
995997

996998
lightning.enqueueRun(run);
@@ -1066,11 +1068,13 @@ test.serial(
10661068
const rtLogs = [];
10671069

10681070
lightning.on('run:log', (e) => {
1069-
if (e.payload.source === 'JOB') {
1070-
jobLogs.push(e.payload);
1071-
} else if (e.payload.source === 'R/T') {
1072-
rtLogs.push(e.payload);
1073-
}
1071+
e.payload.logs.forEach((log) => {
1072+
if (log.source === 'JOB') {
1073+
jobLogs.push(log);
1074+
} else if (log.source === 'R/T') {
1075+
rtLogs.push(log);
1076+
}
1077+
});
10741078
});
10751079

10761080
lightning.once('run:complete', () => {

packages/engine-multi/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# engine-multi
22

3+
## 1.9.0
4+
5+
### Minor Changes
6+
7+
- 5417e97: Accept log-payload-limit-mb (defaults to 1mb)
8+
9+
### Patch Changes
10+
11+
- Updated dependencies [5417e97]
12+
- @openfn/lexicon@1.3.0
13+
314
## 1.8.5
415

516
### Patch Changes

packages/engine-multi/memtest.js

Whitespace-only changes.

packages/engine-multi/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/engine-multi",
3-
"version": "1.8.5",
3+
"version": "1.9.0",
44
"description": "Multi-process runtime engine",
55
"main": "dist/index.js",
66
"type": "module",
@@ -20,7 +20,8 @@
2020
"@openfn/lexicon": "workspace:^",
2121
"@openfn/logger": "workspace:*",
2222
"@openfn/runtime": "workspace:*",
23-
"fast-safe-stringify": "^2.1.1"
23+
"fast-safe-stringify": "^2.1.1",
24+
"json-stream-stringify": "^3.1.6"
2425
},
2526
"devDependencies": {
2627
"@types/node": "^18.19.127",

packages/engine-multi/src/api/execute.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import preloadCredentials from './preload-credentials';
1616
import { ExecutionError } from '../errors';
1717
import type { RunOptions } from '../worker/thread/run';
1818
import { COMPILE_COMPLETE, COMPILE_START, ExternalEvents } from '../events';
19+
import type { PayloadLimits } from '../worker/thread/runtime';
1920

2021
const execute = async (context: ExecutionContext) => {
2122
const { state, callWorker, logger, options } = context;
@@ -47,9 +48,18 @@ const execute = async (context: ExecutionContext) => {
4748
profilePollInteval: context.options.profilePollInterval,
4849
} as RunOptions;
4950

51+
// Construct the payload limits object
52+
const payloadLimits: PayloadLimits = {};
53+
if (options.payloadLimitMb !== undefined) {
54+
payloadLimits.default = options.payloadLimitMb;
55+
}
56+
if (options.logPayloadLimitMb !== undefined) {
57+
payloadLimits[workerEvents.LOG] = options.logPayloadLimitMb;
58+
}
59+
5060
const workerOptions = {
5161
memoryLimitMb: options.memoryLimitMb,
52-
payloadLimitMb: options.payloadLimitMb,
62+
payloadLimits,
5363
timeout: options.runTimeoutMs,
5464
};
5565

packages/engine-multi/src/engine.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ export type EngineOptions = {
7474
maxWorkers?: number;
7575
memoryLimitMb?: number;
7676
payloadLimitMb?: number;
77+
logPayloadLimitMb?: number;
7778
repoDir: string;
7879
resolvers?: LazyResolvers;
7980
runtimelogger?: Logger;
@@ -109,6 +110,7 @@ const createEngine = async (
109110
const defaultMemoryLimit = options.memoryLimitMb || DEFAULT_MEMORY_LIMIT_MB;
110111
const defaultPayloadLimit =
111112
options.payloadLimitMb || DEFAULT_PAYLOAD_LIMIT_MB;
113+
const defaultLogPayloadLimit = options.logPayloadLimitMb;
112114
const defaultProfile = options.profile ?? DEFAULT_PROFILE;
113115
const defaultProfilePollInterval =
114116
options.profilePollInterval ?? DEFAULT_PROFILE_POLL_INTERVAL;
@@ -170,6 +172,7 @@ const createEngine = async (
170172
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
171173
memoryLimitMb: opts.memoryLimitMb ?? defaultMemoryLimit,
172174
payloadLimitMb: opts.payloadLimitMb ?? defaultPayloadLimit,
175+
logPayloadLimitMb: opts.logPayloadLimitMb ?? defaultLogPayloadLimit,
173176
jobLogLevel: opts.jobLogLevel,
174177
profile: defaultProfile,
175178
profilePollInterval: defaultProfilePollInterval,

packages/engine-multi/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export type ExecutionContextConstructor = {
4646

4747
export type ExecuteOptions = {
4848
payloadLimitMb?: number;
49+
logPayloadLimitMb?: number;
4950
memoryLimitMb?: number;
5051
resolvers?: LazyResolvers;
5152
runTimeoutMs?: number;

packages/engine-multi/src/util/ensure-payload-size.ts

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { JsonStreamStringify } from 'json-stream-stringify';
2+
13
// This specifies which keys of an event payload to potentially redact
24
// if they are too big
35
const KEYS_TO_VERIFY = ['state', 'final_state', 'log'];
@@ -11,18 +13,22 @@ const replacements: Record<string, any> = {
1113
},
1214
};
1315

14-
export const verify = (value: any, limit_mb: number = 10) => {
16+
export const verify = async (
17+
value: any,
18+
limit_mb: number = 10,
19+
algo: 'stringify' | 'stream' = 'stringify'
20+
) => {
1521
if (value && !isNaN(limit_mb)) {
16-
let size_mb = 0;
17-
try {
18-
const str = typeof value === 'string' ? value : JSON.stringify(value);
19-
const size_bytes = Buffer.byteLength(str, 'utf8');
20-
size_mb = size_bytes / 1024 / 1024;
21-
} catch (e) {
22-
// do nothing
22+
const limitBytes = limit_mb * 1024 * 1024;
23+
24+
let sizeBytes: number;
25+
if (algo === 'stream') {
26+
sizeBytes = await calculateSizeStream(value, limitBytes);
27+
} else {
28+
sizeBytes = calculateSizeStringify(value);
2329
}
2430

25-
if (size_mb > limit_mb) {
31+
if (sizeBytes > limitBytes) {
2632
const e = new Error();
2733
// @ts-ignore
2834
e.name = 'PAYLOAD_TOO_LARGE';
@@ -32,12 +38,39 @@ export const verify = (value: any, limit_mb: number = 10) => {
3238
}
3339
};
3440

35-
export default (payload: any, limit_mb: number = 10) => {
41+
export const calculateSizeStringify = (value: any): number => {
42+
const str = typeof value === 'string' ? value : JSON.stringify(value);
43+
const size_bytes = Buffer.byteLength(str, 'utf8');
44+
return size_bytes;
45+
};
46+
47+
export const calculateSizeStream = async (
48+
value: any,
49+
limit?: number
50+
): Promise<number> => {
51+
let size_bytes = 0;
52+
53+
const stream = new JsonStreamStringify(value);
54+
55+
for await (const chunk of stream) {
56+
// Each chunk is a string token from the JSON output
57+
size_bytes += Buffer.byteLength(chunk, 'utf8');
58+
59+
if (limit !== undefined && size_bytes > limit) {
60+
break;
61+
}
62+
}
63+
stream.destroy();
64+
65+
return size_bytes;
66+
};
67+
68+
export default async (payload: any, limit_mb: number = 10) => {
3669
const newPayload = { ...payload };
3770

3871
for (const key of KEYS_TO_VERIFY) {
3972
try {
40-
verify(payload[key], limit_mb);
73+
await verify(payload[key], limit_mb);
4174
} catch (e) {
4275
Object.assign(newPayload[key], replacements[key] ?? replacements.default);
4376
newPayload.redacted = true;

packages/engine-multi/src/worker/pool.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from './events';
1313
import { HANDLED_EXIT_CODE } from '../events';
1414
import { Logger } from '@openfn/logger';
15+
import type { PayloadLimits } from './thread/runtime';
1516

1617
export type PoolOptions = {
1718
capacity?: number; // defaults to 5
@@ -34,7 +35,7 @@ export type ExecOpts = {
3435
timeout?: number; // ms
3536

3637
memoryLimitMb?: number;
37-
payloadLimitMb?: number;
38+
payloadLimits?: PayloadLimits;
3839
};
3940

4041
export type ChildProcessPool = Array<ChildProcess | false>;
@@ -212,7 +213,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
212213
args,
213214
options: {
214215
memoryLimitMb: opts.memoryLimitMb,
215-
payloadLimitMb: opts.payloadLimitMb,
216+
payloadLimits: opts.payloadLimits,
216217
},
217218
} as RunTaskEvent);
218219
} catch (e) {

0 commit comments

Comments
 (0)