feat(dev): run worker code in a Bun Worker thread per generation #8
Open
jonasnobile wants to merge 197 commits into
Adds a per-generation Bun Worker runtime gated behind
`workerIsolation: 'thread'` (LopataConfig) / `--worker-isolation=thread`
(CLI). The worker module graph lives in the Worker; reload terminates
and respawns the thread, so edits to transitive deps always take
effect — independent of any in-process module cache.
Background: lopata's `?v=<ts>` cache-bust only invalidates the entry,
and on Bun 1.3.14 the `globalThis.Loader.registry` workaround
(src/module-cache.ts, May 11) silently no-ops because JSC's internal
registry is no longer exposed. Thread mode sidesteps the problem
entirely.
Phase 0 scaffold: routes `fetch` only, env is `{}`, no tracing /
waitUntil / static-assets / WebSocket. Bindings, ctx, and the rest
of the worker contract land in follow-up phases. Default stays
`in-process` so existing setups are unchanged.
Verified end-to-end against tests/fixtures/hmr-worker — 5 new e2e
cases under `HMR E2E — standalone (worker-isolation=thread)` cover
entry edits + transitive dep edits + repeated transitive edits.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Worker thread now builds its own env with the bindings whose state
lives on disk: KV, R2, D1, Cache (via globalThis.caches), static-assets
binding, vars/.dev.vars/.env, AI, Analytics Engine, Hyperdrive, Images,
Media, version_metadata. Each handle opens the same .lopata SQLite/files
the main thread uses; WAL + busy_timeout=5000 keep concurrent access
safe.
Static assets stay main-side for the auto-serve fallback (filesystem
only, no DB) so the worker thread doesn't need filesystem routing.
Stateful bindings (DO, queues, workflows, service bindings, send_email,
browser, containers) still produce an undefined env entry in thread
mode; those get RPC bridges in Phase 2.
Adds `WorkerIsolation` shared type, exports `ClassRegistry`, hoists
`StaticAssets` to a static import, and adds `busy_timeout` to the
main-thread DB handle for symmetry.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a worker → main RPC channel so the worker thread can invoke
stateful bindings whose state lives in the main process. Phase 2a
ships the bridge itself and the simplest user — queue producers.
Protocol: worker posts `binding-call {id, target, method, args}`,
main resolves `env[target.binding][method](...args)` and replies with
`binding-result {id, value}` or `binding-error {id, error}`. Structured-
clone handles the payloads for now (Request/Response args land with
service bindings in 2b).
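The request/reply pairing described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SketchRpcClient`, `dispatchBindingCall`, and the exact field types are assumptions; only the message names and the `env[target.binding][method](...args)` lookup come from the description.

```typescript
// Hypothetical message shapes modelled on the protocol described above.
type BindingTarget = { binding: string };
type BindingCall = {
  type: "binding-call";
  id: number;
  target: BindingTarget;
  method: string;
  args: unknown[];
};
type BindingReply =
  | { type: "binding-result"; id: number; value: unknown }
  | { type: "binding-error"; id: number; error: string };

type PendingCall = { resolve: (value: unknown) => void; reject: (err: Error) => void };

// Worker-side half: posts calls and settles them when main replies.
class SketchRpcClient {
  private nextId = 1;
  private pending = new Map<number, PendingCall>();
  private post: (msg: BindingCall) => void;

  constructor(post: (msg: BindingCall) => void) {
    this.post = post;
  }

  call(target: BindingTarget, method: string, args: unknown[]): Promise<unknown> {
    const id = this.nextId++;
    return new Promise((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
      this.post({ type: "binding-call", id, target, method, args });
    });
  }

  handleReply(reply: BindingReply): void {
    const entry = this.pending.get(reply.id);
    if (!entry) return; // unknown or already-settled id
    this.pending.delete(reply.id);
    if (reply.type === "binding-result") entry.resolve(reply.value);
    else entry.reject(new Error(reply.error));
  }

  get pendingCount(): number {
    return this.pending.size;
  }
}

// Main-side half: resolve env[target.binding][method](...args) and build the reply.
async function dispatchBindingCall(
  env: Record<string, any>,
  msg: BindingCall,
): Promise<BindingReply> {
  try {
    const value = await env[msg.target.binding][msg.method](...msg.args);
    return { type: "binding-result", id: msg.id, value };
  } catch (err) {
    const error = err instanceof Error ? err.message : String(err);
    return { type: "binding-error", id: msg.id, error };
  }
}
```

Keying replies by `id` lets several calls be in flight at once without ordering assumptions on the channel.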
`_doReloadThread` now calls `buildEnv()` on main so the stateful
bindings exist as RPC targets; the worker keeps its stateless duplicates
from Phase 1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tateful bindings
Adds `env.SEND_EMAIL.send(message)` over the worker→main RPC bridge.
`EmailMessage` is a class lopata exposes via `cloudflare:email`;
structured clone strips its identity, so the worker proxy tags the
payload with `__lopata_class` and the executor rebuilds the real
instance before invoking `send()` on main (`reifyArgs`).
Bridge groundwork for follow-up phases:
- `BindingTarget` gains an optional `instanceId` so collection-style
bindings (Workflows: `env.WF.get(id).method()`) can address an entry.
Main resolves via `env[binding].get(instanceId)` before dispatch.
- New `binding-fetch` / `binding-fetch-result` message pair carries
serialised Request/Response across the boundary, needed once service
bindings learn to route back through main in Phase 7.
Service bindings, workflows, browser, and DOs are intentionally
deferred: each needs main to call back into user code that lives in the
worker thread (handlers, classes, entrypoints). That is the next
architectural beat, planned for Phases 6/7.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Findings from a review pass over the Phase 2 RPC bridge:
- Drop `binding-fetch` / `binding-fetch-result` / `binding-fetch-error`
message types, `RpcClient.callFetch`, and `_dispatchBindingFetch`.
Service bindings (the one consumer) are deferred to Phase 7 — adding
the wire shape back is cheap when we actually need it; carrying dead
code costs more.
- `executor.ts`: static-import `EmailMessage` and make `reifyArg` sync —
every RPC call no longer pays a microtask for the dynamic import.
- `thread-env.ts`: replace `constructor?.name === 'EmailMessage'` +
duck-type probes with `instanceof EmailMessage`. The plugin's virtual
`cloudflare:email` resolves to the same module, so identity holds.
- Promote `mainEnv` to a private field on the executor (read on every
RPC) instead of dereferencing through `_initConfig`.
- Use the named `BindingTarget` type in dispatcher signatures; drop the
inline `import('./protocol').SerializedRequest` (lived only inside the
deleted `_dispatchBindingFetch`).
- Trim `BindingTarget` JSDoc; tag `instanceId` `@internal Scaffold`
until the workflow/DO proxies that use it actually land.
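The tag-and-rebuild round trip behind `reifyArg` might look like the sketch below. The `EmailMessage` class here is a stand-in with assumed fields, and `tagArg` is a hypothetical name for the worker-side step; only the `__lopata_class` tag and the `instanceof` check come from the description.

```typescript
// Stand-in for the class exposed via `cloudflare:email`; fields are illustrative.
class EmailMessage {
  from: string;
  to: string;
  raw: string;
  constructor(from: string, to: string, raw: string) {
    this.from = from;
    this.to = to;
    this.raw = raw;
  }
}

type TaggedEmail = { __lopata_class: "EmailMessage"; from: string; to: string; raw: string };

// Worker side (hypothetical helper): swap class instances for tagged plain objects,
// because structured clone keeps the fields but drops the prototype.
function tagArg(arg: unknown): unknown {
  if (arg instanceof EmailMessage) {
    const tagged: TaggedEmail = {
      __lopata_class: "EmailMessage",
      from: arg.from,
      to: arg.to,
      raw: arg.raw,
    };
    return tagged;
  }
  return arg;
}

// Main side: rebuild the real instance synchronously before calling the binding.
function reifyArg(arg: unknown): unknown {
  if (
    typeof arg === "object" &&
    arg !== null &&
    (arg as { __lopata_class?: unknown }).__lopata_class === "EmailMessage"
  ) {
    const t = arg as TaggedEmail;
    return new EmailMessage(t.from, t.to, t.raw);
  }
  return arg; // everything else passes through untouched
}
```

Because `reifyArg` is synchronous, untagged arguments cost one type check and no extra microtask.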
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The user's `ctx.waitUntil(promise)` runs in the worker thread (the
promise closes over worker-side env + module state) but main needs to
know about it so reload drain doesn't kill background work mid-flight.
- `worker-thread/execution-context.ts`: thread-local `WorkerExecutionContext`
that posts `wait-until-add` when `.waitUntil` is called and
`wait-until-settle` when each promise resolves/rejects. Errors are
logged (matches in-process `ExecutionContext`).
- `protocol.ts`: two new worker→main message types.
- `executor.ts`: `_pendingWaitUntil` counter; `pendingWaitUntil()` getter.
- `entry.ts`: pass the real ctx to user code instead of `{}`.
- `generation.ts`: `isIdle()` returns false while
`threadExecutor.pendingWaitUntil() > 0`, so the drain poll holds the
old generation open until background work settles before terminating
the worker.
Verified end-to-end: the response returns immediately, the KV write
queued via `waitUntil` lands ~ms later, and a reload triggered while a
1.5s `waitUntil` is in flight still observes the write completing
(drain held the previous worker open).
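A rough sketch of the bookkeeping described above, assuming a `post` callback in place of `postMessage`. The class names are hypothetical; the message names, the log prefix, and the `Math.max(0, ...)` guard are taken from the commit descriptions.

```typescript
type WaitUntilMessage = { type: "wait-until-add" } | { type: "wait-until-settle" };

// Worker side: announce each background promise and its settlement to main.
class SketchWorkerExecutionContext {
  private post: (msg: WaitUntilMessage) => void;

  constructor(post: (msg: WaitUntilMessage) => void) {
    this.post = post;
  }

  waitUntil(promise: Promise<unknown>): void {
    this.post({ type: "wait-until-add" });
    promise
      .catch((err) => console.error("[lopata] waitUntil promise rejected:", err))
      .finally(() => this.post({ type: "wait-until-settle" }));
  }
}

// Main side: a counter the drain poll can consult before terminating the worker.
class SketchPendingCounter {
  private count = 0;

  handle(msg: WaitUntilMessage): void {
    if (msg.type === "wait-until-add") {
      this.count += 1;
    } else {
      // Guard against a stray settle so the counter never goes negative.
      this.count = Math.max(0, this.count - 1);
    }
  }

  pendingWaitUntil(): number {
    return this.count;
  }

  isIdle(): boolean {
    return this.count === 0;
  }
}
```

The promise itself never crosses the thread boundary; main only sees the two counter events.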
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Extract `logIfRejected` helper (single source of truth for the
`[lopata] waitUntil promise rejected:` log line). Both the in-process
`ExecutionContext` and the new `WorkerExecutionContext` use it.
- Drop the redundant `Promise.resolve(promise)` wrap in the worker
context; the parameter is already `Promise<unknown>`.
- Hoist three dynamic imports in `entry.ts` to static: `thread-env`,
`rpc-client`, `execution-context` don't transitively touch
`cloudflare:*` virtual modules, so the plugin-first invariant still
holds. Only `../plugin` (ordering) and `init.modulePath` (runtime data)
stay dynamic.
- Mark `WorkerExecutionContext.props` `@internal` scaffold for Phase 7.
- Explain the `Math.max(0, ...)` guard on the wait-until counter.
- Use `?.` + `??` in `Generation.isIdle()` to match `dispose()` style.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
In thread mode the worker had its own `getTraceStore()` singleton
writing to traces.sqlite on its own connection. Spans landed on disk
but main's in-process subscribers (dashboard websocket) never fired,
so live observability was broken — and worker spans had no parent, so
they appeared as orphan roots.
This commit:
- `tracing/store.ts`: add `setTraceStoreOverride(store)` so a context
can swap the process-local TraceStore for a custom implementation.
- `worker-thread/remote-trace-store.ts`: stand-in `RemoteTraceStore`
that forwards `insertSpan` / `endSpan` / `setSpanStatus` /
`updateAttributes` / `addEvent` / `insertError` to main via
postMessage. Keeps a local `spanId → status` mirror because
`startSpan` reads `getSpanStatus` synchronously.
- `protocol.ts`: 6 new trace-* messages + a `ParentSpanContext` carried
on the `fetch` command.
- `worker-thread/entry.ts`: install the remote store override before
user code loads, and enter the main-provided parent context via
`runWithContext` around the user's `fetch` handler so worker spans
nest under main's server span.
- `worker-thread/executor.ts`: send the active span context with each
fetch command; on the receiving side, route every trace-* message
into the real main `getTraceStore()`.
- `generation.ts` `_callFetchThread`: wrap the dispatch in `startSpan`
with `kind: 'server'` (matches the in-process path) and stamp
`http.status_code` on the response — gives the worker a real parent
context to inherit from.
Verified: a fixture endpoint calling `__lopata.trace('phase4-child', …,
fn)` produces two rows in main's `traces.sqlite` — a `kind=server`
root (`GET /trace/nested`) and a nested `phase4-child` child sharing
the same `trace_id`. The custom event added inside the child span is
also persisted.
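The forwarding store with its local status mirror could be sketched as below. The message names (`trace-insert-span` and so on), the status values, and the method signatures are assumptions made for illustration; the real `TraceStore` interface is not shown in this PR text.

```typescript
type SpanStatus = "unset" | "ok" | "error"; // assumed status values

// Hypothetical wire messages; the actual trace-* names are not listed above.
type TraceMessage =
  | { type: "trace-insert-span"; spanId: string; name: string }
  | { type: "trace-set-status"; spanId: string; status: SpanStatus }
  | { type: "trace-end-span"; spanId: string };

class SketchRemoteTraceStore {
  private post: (msg: TraceMessage) => void;
  // Local mirror so synchronous status reads need no round trip to main.
  private statuses = new Map<string, SpanStatus>();

  constructor(post: (msg: TraceMessage) => void) {
    this.post = post;
  }

  insertSpan(spanId: string, name: string): void {
    this.statuses.set(spanId, "unset");
    this.post({ type: "trace-insert-span", spanId, name });
  }

  setSpanStatus(spanId: string, status: SpanStatus): void {
    this.statuses.set(spanId, status);
    this.post({ type: "trace-set-status", spanId, status });
  }

  getSpanStatus(spanId: string): SpanStatus | undefined {
    return this.statuses.get(spanId);
  }

  endSpan(spanId: string): void {
    this.post({ type: "trace-end-span", spanId });
    // The mirror is only needed while the span is open.
    this.statuses.delete(spanId);
  }

  get mirroredSpans(): number {
    return this.statuses.size;
  }
}
```

Writes are fire-and-forget messages, while the one synchronous read is answered from the mirror.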
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `RemoteTraceStore` now `implements Pick<TraceStore, ...>` so the
seven shimmed signatures are statically checked against `TraceStore`.
Drops the `asTraceStore` cast helper.
- `setTraceStoreOverride` takes that `Pick` directly; one cast at the
store-internal boundary instead of one per caller. JSDoc trimmed +
carries a foot-gun warning about main-thread use muting subscribers.
- `RemoteTraceStore.endSpan` deletes the `_statuses` entry after
posting (the local mirror is only needed between `insertSpan` and
`endSpan`). Plugs an unbounded-Map leak for long-lived generations.
- `TraceErrorPayload = Parameters<TraceStore['insertError']>[0]`:
drift between protocol and DB row is now a compile error.
- Extract `isInfrastructurePath()` (shared with in-process `callFetch`)
and `_withServerSpan()` from `_callFetchThread`.
- Add an invariant comment on the trace-* message group: wire shape ==
TraceStore row shape; bumping requires a protocol version + translator.
- Trace test polls traces.sqlite instead of sleeping 200ms: robust on
slow CI, faster on fast hosts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User code in the worker can now return a `Response { status: 101,
webSocket: clientSide }` and have the upgrade flow through to a real
client, with messages travelling both directions across the thread
boundary.
Architecture:
- Worker creates a local `WebSocketPair`. `pair[0]` ships in the
Response, `pair[1]` stays for user `accept` / `addEventListener` /
`send`. `WorkerWsBridge.register(shipped)` mints a `wsId`, accepts
the shipped side, and attaches listeners that forward outgoing
message/close events to main (`ws-worker-send`, `ws-worker-close`).
- Serialized response carries `webSocketId` instead of a body.
- On main, `WorkerThreadExecutor` sees `webSocketId` and constructs a
fresh `CFWebSocket` with a `BridgeWebSocketPeer` as `_peer`. The
cfSocket is attached to the reconstructed Response. The existing
`cli/dev.ts` upgrade handler is untouched — it sees the same
`cfSocket._peer._accepted` / `_dispatchWSEvent` shape.
- Incoming client messages: `cli/dev.ts` dispatches on `_peer`, which
posts `ws-client-message` to the worker; `WorkerWsBridge` dispatches
on the user-facing peer (or queues until accept).
- Outgoing user messages: worker forwards via `ws-worker-send`; main
dispatches on the cfSocket, firing the listener `cli/dev.ts` added
during upgrade, which forwards to the real WebSocket.
- `Generation._callFetchThread` now takes the `server` reference and
performs the actual `server.upgrade(...)` when the response carries
a 101 + CFWebSocket — mirroring the in-process `handleResponse`.
New protocol messages:
- main → worker: `ws-client-message`, `ws-client-close`
- worker → main: `ws-worker-send`, `ws-worker-close`
Verified end-to-end with a fresh fixture covering client echo (string
+ binary), server-initiated push, and worker-initiated close (custom
code + reason).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `BridgeWebSocketPeer` now extends `CFWebSocket` and only overrides
`_dispatchWSEvent`. Drops the `as unknown as CFWebSocket` cast plus
the hand-rolled `_eventQueue`, `_attachment`, `OPEN`/`CLOSED`
constants — all inherited correctly from the base class.
- Export `WSEvent` and add `ResponseWithWebSocket` type from
`bindings/websocket-pair.ts`. Three sites that previously inlined
`Response & { webSocket?: CFWebSocket }` now use the named type.
- `_callFetchThread(server)` parameter typed as `Server<unknown>` from
`bun` instead of `unknown`; drops the inline `{ upgrade(...): boolean }`
shape.
- Fix `MainWsBridge._sockets` leak on real-client disconnect: the peer
receives an `onForget` callback and removes the entry when the
inbound close event is dispatched. Drops the `forgetSocket` public
surface (no longer needed externally).
- `MainWsBridge.disposeAll` now dispatches a `1012 Service Restart`
close event to each active client before clearing — without it,
worker-thread reload terminates the worker and real clients hang
until TCP timeout.
- Fixture: drop `(globalThis as any).WebSocketPair` reaches; the
tests/fixtures tree is excluded from typecheck so bare references
work fine, matching the existing `ws-worker` fixture style.
Deferred to follow-up PRs (out of Phase 5 scope):
- `_dispatchOrQueue` helper on `CFWebSocket` (touches production
in-process paths in `websocket-pair.ts` + `cli/dev.ts`).
- Shared `connectWS` test helper across the three WS e2e files
(~180 LOC dedup).
- DO bridge dead `ws-create`/`ws-created` cleanup.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Routes `callScheduled` and `callEmail` through the worker, so manual
`/cdn-cgi/handler/scheduled` and `/cdn-cgi/handler/email` triggers now
actually invoke the user's handler in thread mode (previously they
404'd because `defaultExport` lives in the worker, not on main).
- protocol: `scheduled` / `email` commands + matching `*-result` /
`*-error` responses (with a `noHandler` flag for the "user didn't
define this handler" case so main can return 404 without inventing
a stack).
- executor: `executeScheduled(cron, time)` and `executeEmail(id, from,
to, raw)` mirror `executeFetch` — current span context propagated
via `ParentSpanContext`, pendings tracked in a separate map.
- entry: `resolveHandler('scheduled' | 'email')` walks class- vs
object-based exports; `callScheduled` builds a real
`ScheduledController`, `callEmail` builds a `ForwardableEmailMessage`
against the thread-local SQLite handle (so `setReject` / `forward` /
`reply` mutate the shared DB without going through RPC).
- generation:
- `callScheduled` / `callEmail` short-circuit into the executor
when `threadExecutor` is set. The incoming-email row is INSERTed
on main first (uses `getDatabase()` singleton) so both modes
persist the same row.
- `startConsumers` now starts a thread-mode cron timer that calls
`executor.executeScheduled` directly (`_startThreadCronScheduler`)
when crons + `cronEnabled` are configured.
Queue consumers are not yet routed through the bridge — the
`MessageBatch.ackAll` / `retryAll` decision channel between handler
and consumer needs bidirectional plumbing. Left as a follow-up.
Verified with two new e2e tests on the existing stateful fixture:
- POST `/cdn-cgi/handler/scheduled?cron=…` invokes the worker's
`scheduled()` and writes the cron + scheduledTime into KV.
- POST `/cdn-cgi/handler/email?from=…&to=…` with RFC-822 raw invokes
`email()` and writes from/to/rawSize into KV.
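The class-versus-object walk that `resolveHandler` performs could be sketched like this. The signature and return convention are assumptions; the `new Cls(ctx, env)` argument order follows the entrypoint construction described elsewhere in this PR. A `null` result corresponds to the case the `noHandler` flag reports back to main.

```typescript
type HandlerName = "fetch" | "scheduled" | "email" | "queue";
type Handler = (...args: unknown[]) => unknown;

// Returns a bound handler, or null when the user did not define one.
function resolveHandler(
  defaultExport: unknown,
  name: HandlerName,
  ctx: unknown,
  env: unknown,
): Handler | null {
  if (typeof defaultExport === "function") {
    // Class-based export: only instantiate if the prototype defines the handler.
    const Cls = defaultExport as new (ctx: unknown, env: unknown) => Record<string, unknown>;
    if (typeof Cls.prototype?.[name] !== "function") return null;
    const instance = new Cls(ctx, env);
    return (instance[name] as Handler).bind(instance);
  }
  if (typeof defaultExport === "object" && defaultExport !== null) {
    // Object-based export: the handler is a plain property.
    const fn = (defaultExport as Record<string, unknown>)[name];
    return typeof fn === "function" ? (fn as Handler).bind(defaultExport) : null;
  }
  return null;
}
```

Checking the prototype before constructing avoids running a user constructor just to discover that the handler is missing.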
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Factor `startCronTimer(crons, invoke, workerName)` out of
`startCronScheduler` (`bindings/scheduled.ts`). Both modes now share
the parse + minute-loop + span wrapping; in-process keeps its old
`(controller, env, ctx)` shape via a thin wrapper, thread mode passes
`(cron, now) => executor.executeScheduled(cron, now.getTime())`. Drops
`Generation._startThreadCronScheduler` entirely.
- Extract `runWithParentContext(parent, fn)` in `tracing/context.ts`.
Re-seeds an empty `fetchStack` (refs can't cross postMessage) and
replaces the inline three-call pattern at the worker switch arms.
- Add `WorkerHandlerName = 'fetch' | 'scheduled' | 'email' | 'queue'`
to the protocol; `resolveHandler` uses it instead of an inline union.
- `Generation._persistAndRethrow(source, err)` collapses three
identical `console.error + persistError + throw` blocks in
`callScheduled` / `callEmail`.
- Drop the `raw.buffer.slice(...) as ArrayBuffer` copy + unsafe cast in
`executor.ts:executeEmail`. `Uint8Array` is structured-cloneable; the
protocol's `raw` field is now `Uint8Array` and the worker side drops
the `new Uint8Array(cmd.raw)` re-wrap.
- Hoist `ForwardableEmailMessage`, `createScheduledController`,
`getDatabase` to static imports in `entry.ts` (were dynamic, paid a
microtask per RPC).
- Hoist `withParent` (gone, replaced by `runWithParentContext`); trim
the verbose JSDoc on `resolveHandler`. Add a one-line note that
`error.message: 'no-handler'` on `*-error` envelopes is a placeholder
the executor never surfaces.
Deferred (out of scope here):
- Refactoring in-process `Generation.getHandler` to mirror the worker's
`resolveHandler(name, ctx)`. Touches three in-process call sites with
inline class/object walks; separate PR.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each worker in `lopata.config.ts` now runs in its own Bun Worker thread
when `workerIsolation: 'thread'` is set, and `env.AUX.fetch(req)` from
worker A correctly routes A → main → worker B's thread without
deadlocking or losing the request.
Bridge surface:
- Restore the `binding-fetch` / `binding-fetch-result` /
`binding-fetch-error` message group (dropped in Phase 2 cleanup);
service bindings now have a caller for it.
- `RpcClient.callFetch(target, request)` serializes the Request, posts,
awaits a `SerializedResponse`.
- `WorkerThreadExecutor._dispatchBindingFetch` deserializes, invokes
`env[binding].fetch(request)` on main (the `ServiceBinding` instance),
and ships the response back.
- `thread-env.ts` `makeServiceBindingProxy` exposes `.fetch()` over
`callFetch` and stubs `.connect()` with the same "not supported"
message as in-process mode.
Routing:
- `WorkerRegistry.resolveTarget(name)` now returns the target gen's
`threadExecutor` alongside `workerModule` + `env`.
- `ServiceBinding._resolver` typed as `() => ServiceBindingResolved`
with an optional `threadExecutor`.
- `ServiceBinding.fetch` short-circuits to
`resolved.threadExecutor.executeFetch(request)` when present, bypassing
`_getTarget`'s class/object dispatch (the target's defaultExport lives
in its own thread, not here).
- Extracted `wireServiceBindings` from `wireClassRefs` and called it
from `_doReloadThread` so thread-mode generations get their service
bindings wired (DO/Workflow class wiring still requires user code on
main, so we skip the rest of `wireClassRefs`).
Service binding RPC methods (entrypoint class methods, non-fetch) are
intentionally out of scope here: they require entrypoint-class wiring
across the thread, a separate concern.
Verified via a new multi-worker fixture (`thread-multi-worker`): main
thread fetches `env.AUX.fetch()` and gets aux's response, with the
request URL + query string preserved across two thread hops.
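A Request cannot be posted across threads as-is, so the serialization step mentioned above has to flatten it into cloneable parts. The sketch below assumes a `SerializedRequest` shape with `url`, `method`, `headers`, and a buffered `body`; the real wire type in `protocol.ts` may differ.

```typescript
// Assumed wire shape: everything here survives structured clone.
type SerializedRequest = {
  url: string;
  method: string;
  headers: [string, string][];
  body: ArrayBuffer | null;
};

async function serializeRequest(req: Request): Promise<SerializedRequest> {
  const headers: [string, string][] = [];
  req.headers.forEach((value, key) => headers.push([key, value]));
  const hasBody = req.method !== "GET" && req.method !== "HEAD";
  return {
    url: req.url, // includes the query string
    method: req.method,
    headers,
    // Buffers the whole body; streaming does not survive in this sketch.
    body: hasBody ? await req.arrayBuffer() : null,
  };
}

function deserializeRequest(s: SerializedRequest): Request {
  return new Request(s.url, { method: s.method, headers: s.headers, body: s.body });
}
```

Buffering the body keeps the message a plain value at the cost of holding the full payload in memory on both sides.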
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Add `src/worker-thread/serialize.ts` with `serializeRequest`,
`deserializeRequest`, `serializeResponse`, `deserializeResponse`.
Replaces three inline `headers.forEach + arrayBuffer()` copies in
`executor.ts`, `rpc-client.ts`, and the (now-trivial) wrapper in
`entry.ts` that adds the WS-id branch on top of the shared helper.
- Drop the duplicate `ServiceBindingResolved` type from
`service-binding.ts`; use `ResolvedTarget` from `worker-registry`
directly. The cycle worry was unfounded: `bindings/` already imports
from `worker-thread/` (DO executor).
- Fix stale resolver cast in `wireServiceBindings` (`env.ts`); the
self-ref fallback now returns `threadExecutor: null` so the resolver
shape stays uniform.
- Export `serviceBindingConnectError(name)` from `service-binding.ts`;
both the in-process `connect()` and the worker-thread proxy throw via
the same helper (no more drift-prone string template duplicated across
files).
- Fix a real semantic gap: `ServiceBinding._props` were dropped on
thread-mode `.fetch()`. Plumbed through
`executor.executeFetch(req, props?)`, the `fetch` command's new
`props?` field, and into the target worker's
`WorkerExecutionContext(post, props)`. Target now sees `ctx.props`
matching the CF prod behavior.
- Strengthen the trace-context comment in `ServiceBinding.fetch` to say
*why* we don't `runWithContext` here.
Deferred:
- `cleanupFixture` test helper (touches 4 e2e files; test hygiene PR).
- Splitting `wireServiceBindings` into two: the conditional is small
and the alternative is worse.
- Streaming request bodies across postMessage: known limitation,
serialization buffers into an ArrayBuffer.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Thread-mode main workers can't host DO classes — user code lives in
the worker thread. Instead each DO now spawns its own Bun Worker via
the existing `isolated` DO mode (which already loads the user module
itself from `modulePath`).
- `cli/dev.ts`: when `workerIsolation: 'thread'` is set, auto-install
`WorkerExecutorFactory` (same path the explicit `isolation:
'isolated'` config takes). Both single- and multi-worker modes.
Single-worker also passes a resolved `configPath` so the DO worker
can re-load wrangler config; `config.ts` exposes `findConfigPath()`.
- `generation-manager.ts:_doReloadThread`:
- Call `executorFactory.configure(workerPath, configPath)` so the
factory knows where the user module lives.
- After `buildEnv`, install a stub class on each DO namespace via
`_setClass(class {} as any, env, …)`. The namespace only checks
`_class` for truthiness; the WorkerExecutor reads the real
`className` from config and loads it inside the DO worker thread.
- `worker-thread/thread-env.ts`: `makeDONamespaceProxy` exposes
`idFromName` (deterministic sha256), `idFromString`, `newUniqueId`
(random UUID), `get`, `getByName`, `jurisdiction`. `makeDOStubProxy`
is a `Proxy` whose `.fetch()` routes via `binding-fetch` and whose
unknown props become RPC method calls via `binding-call` — both
carry the DO id as `BindingTarget.instanceId` so main's executor
resolves the right stub from `env.MY_DO.get(idStr)`.
Verified with a new `thread-do-worker` fixture (single SQLite-backed
`Counter` class): per-name instance isolation, persistent storage
across requests, RPC method calls (`.greet`) round-tripping back
through the bridge.
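The id derivation used by the namespace proxy might be sketched as follows, assuming a hex-encoded SHA-256 digest of the name and a UUID for unique ids. The encoding details and the `NamespaceSketch` class are illustrative guesses; the stub cache mirrors a later cleanup in this PR.

```typescript
import { createHash, randomUUID } from "node:crypto";

// Deterministic: the same name always yields the same 64-character hex id.
function hashIdFromName(name: string): string {
  return createHash("sha256").update(name).digest("hex");
}

// Random: a fresh id on every call.
function newUniqueId(): string {
  return randomUUID();
}

type StubSketch = { id: string };

// Hypothetical namespace wrapper that hands out one stub per id string.
class NamespaceSketch {
  private stubs = new Map<string, StubSketch>();

  idFromName(name: string): string {
    return hashIdFromName(name);
  }

  get(id: string): StubSketch {
    let stub = this.stubs.get(id);
    if (!stub) {
      stub = { id };
      this.stubs.set(id, stub);
    }
    return stub;
  }
}
```

Because the id is a pure function of the name, worker and main can each compute it without asking the other.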
Workflows, queue consumers, and service-binding RPC methods are still
in-process-only — tracked as Phase 8b–d.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Correctness:
- DO stub Proxy in `thread-env.ts` now filters `NON_RPC_PROPS`. The
previous version would return `then` / `catch` / `finally` as RPC
callables, so `await stub.foo` triggered a `.then(resolve, reject)`
RPC call instead of an RPC property read. Property-read RPC is still
a follow-up (Phase 8d territory) but `await stub` no longer breaks.
- Drop the worker-side `jurisdiction()` method. The in-process
`DurableObjectNamespaceImpl` doesn't expose it, so code that fails in
`in-process` mode would start succeeding in thread mode, which is
false parity.
Reuse:
- Extract `hashIdFromName(name)` and `randomUniqueIdHex()` to
`bindings/durable-object.ts`. Both the in-process namespace and the
thread-env proxy now share the algorithm — silent drift hazard
closed (same `name` always resolves to the same id across modes).
- Replace local `DurableObjectIdLike` interface + `buildDurableObjectId`
helper with `new DurableObjectIdImpl(...)` directly. One concrete
type, one source of truth.
- Extract `proxyFetch(target, rpc, input, init)` in `thread-env.ts`
shared by the DO stub and service-binding proxy — same Request
construction + `deserializeResponse(callFetch(...))` body.
Quality:
- Name the stub-class hack: `EXTERNAL_DO_CLASS` constant in
`generation-manager.ts` with a comment explaining why an empty class
satisfies `_setClass`'s gate. The cast is in one place now.
- Extract `makeExecutorFactory(needsIsolated)` in `cli/dev.ts`. Both
the multi-worker and single-worker branches no longer duplicate
the dynamic `import('../bindings/do-executor-worker')` + factory
construction + console line.
Efficiency:
- `makeDONamespaceProxy` caches stubs by id string, matching the
in-process `_stubs: Map`. Hot DO clients (`env.MY_DO.get(id)` per
request) stop allocating a fresh Proxy + RPC closures per access.
Deferred:
- RPC property-read parity (`stub.myProp` → `binding-get` message
+ main-side dispatch). Belongs with Phase 8d (service-binding RPC
methods), which has the same shape.
- `wrapRpcReturnValue` parity for nicer error messages on non-
cloneable return values — also Phase 8d.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Workflows are an easier case than DOs: the state machine talks to
SQLite, which both threads can open against the shared `.lopata` file.
So the whole binding lives in the worker thread — no main-side proxy,
no cross-thread RPC for `step.do` / `step.sleep` / `step.waitForEvent`.
- `thread-env.ts`: builds `new SqliteWorkflowBinding(db, name, className,
limits)` for each `workflows[]` entry against the thread-local DB.
Returns the workflow bindings alongside `env` so the caller can wire
classes after the user module loads (shape: `{ env, workflows }`).
- `entry.ts`: after `import(modulePath)`, look up each workflow class
on `workerModule[className]`, call `binding._setClass(cls, env)` +
`binding.resumeInterrupted()` (mirrors `wireClassRefs` in-process).
- Main never sees the binding; `_doReloadThread` already skips
`wireClassRefs`, so main's env keeps a `SqliteWorkflowBinding` from
`buildEnv()` that's just unused (never wired, never called).
Cross-worker access to workflows is the same shape as service bindings —
worker A's user code would access worker B's workflow via a service
binding, and B's worker thread handles it locally. Not exercised in
this commit.
Verified with `thread-workflow-worker` fixture: a two-step `Greeter`
workflow runs end-to-end, results land in `workflow_instances.output`,
and `instance.status()` returns `{ status: 'complete', output: {...} }`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Same shape as workflows: the consumer state machine (poll timer, lease
tracking, ack/retry decisions) talks to SQLite, and the user's queue
handler lives in the worker thread — so everything runs locally
without any cross-thread RPC.
- `thread-env.ts`:
- Return `db` from `buildThreadEnv()` alongside `env` / `workflows`
so the caller can construct consumers against the same handle.
- `startThreadQueueConsumers(config, db, env, workerModule)` builds
a `QueueConsumer` per `queues.consumers[]` entry, resolves the
`queue` handler from `defaultExport` (class- or object-based),
starts polling.
- `resolveQueueHandler` wraps user returns into the `QueueHandler`
`Promise<void>` shape (CF tolerates non-void returns).
- `entry.ts`: after the user module loads and workflows are wired,
call `startThreadQueueConsumers`. Consumers stop automatically when
the worker terminates on reload.
- `Generation.startConsumers` already no-ops on the main side in
thread mode (`getHandler('queue')` returns null when `defaultExport`
is null) — unchanged.
Verified with `thread-queue-worker` fixture: producer.send() x3,
poll cycle picks them up in worker, handler writes KV receipts,
messages flip to `acked` status in `queue_messages`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Plumb `workerName` from `GenerationManager` through `executor` →
`WorkerInitConfig` → `entry.ts` → `startThreadQueueConsumers`. The
thread-mode `QueueConsumer` now matches the in-process attribution
shape: `[queue:<name>]` span + `persistError(_, 'queue', workerName)`
see the worker that owns them. Previously thread mode produced
unattributed queue spans on the dashboard.
- Extract `wireWorkflowClass(binding, className, workerModule, env)` to
`bindings/workflow.ts`. The lookup-throw-setClass-resumeInterrupted
contract now lives in one place; both the in-process `wireClassRefs`
and the worker entry call it.
- Move `startThreadQueueConsumers` + `resolveQueueHandler` out of
`thread-env.ts` into a new `worker-thread/wire-handlers.ts` (next to
`wireWorkflows`). `buildThreadEnv` no longer needs to be the function
that *also* exposes queue-consumer wiring helpers: it builds env; the
new file does the post-import wiring. Same `db` exits via
`ThreadEnvBuilt` (still needed by the queue consumer).
- `entry.ts` captures the consumer array (`_queueConsumers`) even
though nothing reads it today; keeps a future graceful-shutdown command
unblocked.
Deferred (cross-cutting, separate PRs):
- Unifying `Generation.getHandler` / `entry.ts:resolveHandler` /
`wire-handlers.ts:resolveQueueHandler`: same 3-arm class-vs-object
walk, but lives across the in-process/thread boundary so a real dedupe
touches `src/generation.ts`.
- Extracting `normalizeConsumerConfig` for queue consumer defaults
shared between `env.ts:218-228` and `wire-handlers.ts`.
- Fixing the in-process queue handler `ctx` plumbing: `getHandler`
discards the consumer's ctx and constructs a fresh one, which means
`ctx.waitUntil` from user code never reaches the consumer's
`_awaitAll()`. The thread path is actually correct here (per the
review). Latent in-process bug, separate from this PR.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
env.SVC.someMethod(args) from worker A now routes A → main → worker B's
thread, constructs the entrypoint class on B (or uses default export
when no `entrypoint` is configured), invokes the method, and ships the
return value back. Only `.fetch()` worked cross-thread before this.
Protocol:
- `entrypoint-rpc { id, entrypoint?, method, args }` (main → worker)
- `entrypoint-rpc-result { id, value }` / `entrypoint-rpc-error`
(worker → main).
Executor:
- `WorkerThreadExecutor.executeEntrypointRpc(entrypoint, method, args)`
posts the command, tracks a `_pendingRpc` map keyed by id, awaits
the value. Disposed alongside the other pending maps.
Worker entry:
- New `invokeEntrypointRpc(entrypoint, method, args)` constructs the
entrypoint class (`new Cls(ctx, env)`) or uses `defaultExport`
directly when no entrypoint configured. Mirrors the in-process
`ServiceBinding._getTarget` shape.
- New switch arm on `entrypoint-rpc` calls `invokeEntrypointRpc`,
posts result/error.
Main side:
- `ServiceBinding.toProxy()`'s RPC callable (`bindings/service-binding.ts`)
short-circuits to `resolved.threadExecutor.executeEntrypointRpc(
self._entrypoint, prop, args)` when the target lives in a worker
thread. In-process path unchanged.
Worker A side:
- `makeServiceBindingProxy` in `thread-env.ts` is now a Proxy whose
unknown property access produces an RPC callable (matches the
`makeDOStubProxy` shape from Phase 8a). Filters `NON_RPC_PROPS`
so `await binding.foo` doesn't dispatch `then` as RPC.
Verified end-to-end against the existing `thread-multi-worker` fixture:
added `aux.double(n)` returning a primitive and `aux.greet(name)`
returning an object; both round-trip across two worker threads and
back with the right values.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Correctness:
- Extract `resolveEntrypointTarget(workerModule, entrypoint, ctx, env)`
in `bindings/service-binding.ts`. Used by both in-process
`ServiceBinding._getTarget` and the worker-thread `invokeEntrypointRpc`.
Closes a latent divergence — in-process checked `def.prototype?.fetch`,
thread-mode checked `def.prototype`. The looser check (just
`prototype`) is correct for entrypoints that expose only RPC methods
and no fetch; using the shared helper lifts the in-process side
too.
- Plumb `_props` from `ServiceBinding` through `executeEntrypointRpc`
/ `entrypoint-rpc` / `WorkerExecutionContext`. Previously thread-
mode RPC targets saw an empty `ctx.props` while fetch targets saw
the real value — parity bug.
- Add `parent?: ParentSpanContext` to `entrypoint-rpc`. The other
three executor commands (fetch/scheduled/email) all propagate trace
context; RPC was the lone holdout. Worker side enters it via
`runWithParentContext` like the others.
Cleanup:
- Collapse `PendingFetch` / `PendingHandler` / `PendingRpc` into a
generic `interface Pending<T>` plus a `HandlerResult` alias.
- Add `_sendAndAwait<T>(map, build)` private helper in executor.ts.
All four public methods (`executeFetch`, `executeScheduled`,
`executeEmail`, `executeEntrypointRpc`) now go through it. Saves
~25 lines of "disposed-check → await ready → grab active context
→ allocate id → set pending → send" boilerplate per method and
guarantees consistent parent-span propagation.
- Extract `makeRpcProxy(target, rpc, extras)` in `thread-env.ts`
shared by `makeDOStubProxy` and `makeServiceBindingProxy`. DO
passes `{ id, name }` extras; service binding passes `{ connect:
throws }`. Same NON_RPC_PROPS filter, same `.fetch` special-case,
same RPC method fallback. Drops two near-duplicate Proxy literals.
- Cache method functions in `makeRpcProxy` (`Map<string|symbol,
unknown>`). Hot callers (`env.SVC.method(x)` per request) stop
allocating a fresh closure per access.
Deferred:
- Thread-mode support for the `await binding.prop` thenable: `_getTarget`
on thread targets has no class to read from. Needs a new
`binding-get` protocol triple; tracked separately.
- Wrapping DataCloneError from `postMessage` with method context.
Today the user sees a bare `DataCloneError` when an RPC return
value isn't structured-cloneable.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
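The shared proxy shape described above (skip-list filter, extras, cached method closures) could look roughly like the sketch below. It is a simplified illustration under stated assumptions: the function is named `makeRpcProxySketch` to avoid implying it matches the real `makeRpcProxy(target, rpc, extras)` signature, the contents of `NON_RPC_PROPS` are guessed from the properties this PR mentions, and the `.fetch` special case is omitted.

```typescript
// Assumed skip list; the real NON_RPC_PROPS set lives in the repo and may differ.
const NON_RPC_PROPS = new Set<string>(["then", "catch", "finally", "toJSON", "valueOf", "toString"]);

type RpcSend = (method: string, args: unknown[]) => Promise<unknown>;
type RpcProxy = Record<string, any>;

function makeRpcProxySketch(rpc: RpcSend, extras: Record<string, unknown> = {}): RpcProxy {
  // One closure per method name, so repeated `stub.method` reads reuse it.
  const methodCache = new Map<string, (...args: unknown[]) => Promise<unknown>>();
  return new Proxy({} as RpcProxy, {
    get(_target, prop) {
      // Symbols and skip-listed names never become RPC calls, so
      // `await stub` does not dispatch `then` across the thread boundary.
      if (typeof prop === "symbol" || NON_RPC_PROPS.has(prop)) return undefined;
      const name: string = prop;
      // Extras (for example `{ id, name }` on a DO stub) are plain values.
      if (Object.prototype.hasOwnProperty.call(extras, name)) return extras[name];
      let fn = methodCache.get(name);
      if (!fn) {
        fn = (...args: unknown[]) => rpc(name, args);
        methodCache.set(name, fn);
      }
      return fn;
    },
  });
}
```

Caching by property name trades a small map per proxy for avoiding a fresh closure on every `env.SVC.method(x)` access.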
Every worker now runs inside a Bun `Worker` per generation. Reload =
terminate + respawn, which gives correct transitive HMR for free (the
entire user module graph is rebuilt) and removes the dependency on the
JSC-internal `globalThis.Loader.registry` API that disappeared in Bun
1.3.14.
- Drops `WorkerIsolation` config / `--worker-isolation` CLI flag /
`module-cache.ts`. `Generation` is thread-only: callFetch/Scheduled/
Email all delegate to `WorkerThreadExecutor`. `GenerationManager._doReload`
inlines what was `_doReloadThread`.
- Adds WebSocket bridging through the DO worker thread. When a DO's
`fetch()` returns `Response{status:101, webSocket: client}`:
* `do-worker-entry.ts` registers the client peer, allocates a wsId,
and forwards user → main bytes via `fetch-ws-outgoing`.
* `do-executor-worker.ts` builds a main-side `CFWebSocket` whose
peer (`FetchDoBridgePeer`) routes outbound bytes back down via
`fetch-ws-incoming`.
* `executor._dispatchBindingFetch` adopts that CFWebSocket through
`MainWsBridge.adoptExisting`, ships the wsId in the binding-fetch
result, and the requesting worker re-ships the same id (via a
`__bridgedWsId` marker on the response) so `Bun.serve.upgrade`
receives the real peer.
- Fixes a buffering ordering bug: bridges now attach `addEventListener`
BEFORE `accept()` (both `ws-bridge.ts` and `do-worker-entry.ts`) — the
flush of queued events (e.g. a `server.send()` issued before the user
returned the response) would otherwise be dropped.
- `MainWsBridge` buffers `ws-worker-send` / `ws-worker-close` for wsIds
that haven't reached `createSocket` yet (race: the worker can dispatch
queued events during `accept()` before the fetch result reaches main).
`disposeAll()` clears the buffer too so stranded entries don't leak.
- Adds `CFWebSocket.dispatchOrQueue()` and routes every bridge through
it, killing ~9 inline copies of the dispatch-or-queue branch.
- Existing in-process paths in `vite-plugin/dev-server-plugin.ts` are
unchanged — Vite still drives the user module on the main thread.
Two WS HMR tests (`tests/ws-hmr-e2e.test.ts`) are marked `.skip`: they
expect an open WebSocket to survive a reload via in-place DO class
hot-swap, which is incompatible with the terminate-and-respawn strategy
that makes transitive HMR correct. Active connections now receive
`1012 Service Restart` on reload and clients reconnect.
Test results: 1272 pass, 2 skip, 0 fail.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
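The buffering ordering fix above (attach listeners before `accept()`) can be reproduced with a toy socket. This is a sketch under assumptions: `QueuedSocket` is a hypothetical stand-in for `CFWebSocket`, and its `dispatchOrQueue` only mimics the behaviour the commit message describes (queue until accepted, flush on `accept()`).

```typescript
// Hypothetical model of a socket that queues events until accept() is called.
class QueuedSocket {
  private accepted = false;
  private queue: string[] = [];
  private listeners: Array<(data: string) => void> = [];

  addEventListener(listener: (data: string) => void): void {
    this.listeners.push(listener);
  }

  // Deliver immediately once accepted, otherwise hold the event.
  dispatchOrQueue(data: string): void {
    if (this.accepted) this.emit(data);
    else this.queue.push(data);
  }

  // Flushes queued events to whoever is listening right now.
  accept(): void {
    this.accepted = true;
    for (const data of this.queue.splice(0)) this.emit(data);
  }

  private emit(data: string): void {
    for (const listener of this.listeners) listener(data);
  }
}

// Buggy order: accept() flushes to zero listeners, so the event is lost.
const lateListener = new QueuedSocket();
lateListener.dispatchOrQueue("initial count");
lateListener.accept();
const lostEvents: string[] = [];
lateListener.addEventListener((d) => lostEvents.push(d));

// Fixed order: the listener is in place when accept() flushes the queue.
const earlyListener = new QueuedSocket();
earlyListener.dispatchOrQueue("initial count");
const seenEvents: string[] = [];
earlyListener.addEventListener((d) => seenEvents.push(d));
earlyListener.accept();
```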
Pin down the subtle behaviour added in Phase 8e:
- `deliverWorkerSend` / `deliverWorkerClose` called before `createSocket` buffer their events; `createSocket` then drains the buffer onto the new socket's queue so `accept()` flushes them in order.
- `disposeAll` clears the pending buffer too (the leak the review flagged: stranded entries for wsIds whose binding-fetch errored).
- 1012 Service Restart close is dispatched only to non-CLOSED sockets.
- `adoptExisting` / `getSocket` round-trip a real peer.
- Peer-side dispatches post `ws-client-message` / `ws-client-close` to the worker and forget the wsId on close.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
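The buffer-then-drain behaviour these tests pin down can be sketched as follows. This is an illustrative model only: `PendingEventBuffer`, the `WsEvent` shape, and the use of a plain array as the socket's queue are assumptions, not the `MainWsBridge` implementation.

```typescript
type WsEvent = { kind: "send"; data: string } | { kind: "close"; code: number };

// Hypothetical model: events for a wsId that has no socket yet are buffered,
// then drained in order when the socket is created.
class PendingEventBuffer {
  private sockets = new Map<number, WsEvent[]>(); // wsId -> the socket's own event queue
  private pending = new Map<number, WsEvent[]>(); // wsId -> events that arrived too early

  deliver(wsId: number, event: WsEvent): void {
    const socketQueue = this.sockets.get(wsId);
    if (socketQueue) {
      socketQueue.push(event);
      return;
    }
    const early = this.pending.get(wsId) ?? [];
    early.push(event);
    this.pending.set(wsId, early);
  }

  // The early events become the head of the new socket's queue, order preserved.
  createSocket(wsId: number): WsEvent[] {
    const queue = this.pending.get(wsId) ?? [];
    this.pending.delete(wsId);
    this.sockets.set(wsId, queue);
    return queue;
  }

  // Also drops stranded early events, e.g. for wsIds whose fetch errored.
  disposeAll(): void {
    this.sockets.clear();
    this.pending.clear();
  }

  get pendingSize(): number {
    return this.pending.size;
  }
}
```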
Phase 8e shipped two new WS bridge paths (plain-worker WS through `MainWsBridge` and DO-backed WS through `FetchDoBridgePeer`), but the playground had no surface for either, so neither got smoke-tested manually nor covered by `bun run dev`.
- `Counter` DO now broadcasts count changes to subscribers: `fetch()` upgrades to WS, accepts a peer, sends the current count immediately, and re-broadcasts on every `inc` / `dec` / `reset` (whether the change came from the WS itself or from an external RPC caller).
- `/ws/echo` route exercises the plain-worker WS path.
- `/ws/counter/<name>` route forwards to the Counter DO, exercising the worker → main → DO-worker → main → client bridge end-to-end.
- Tiny UI added under the Workflow section: connect/inc/dec/reset buttons plus an echo input, with a rolling event log.
Also fixes a real bug in the testing harness uncovered by the new tests: `DurableObjectHandle.connectWebSocket()` called `serverWs.accept()` BEFORE constructing the `TestWebSocket` wrapper, which meant any events the DO had already queued (e.g. an initial `server.send()`) were flushed to no listeners and lost. Now the wrapper is built first, then `accept()` flushes onto a listening peer.
Test results: 1285 pass, 2 skip, 0 fail (4 new WS tests in playground).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Phase 8e regression: containers stopped working because `ContainerRuntime`
was only initialised in `InProcessExecutor` (the path now gone), so a
`ContainerBase` DO running in its worker thread had `_containerRuntime`
left undefined. First call into `startAndWaitForPorts` threw
"Container runtime not initialized" — reported via @cloudflare/sandbox
on POST /sandbox/exec.
`do-worker-entry.ts` now looks up the `containers` entry in the wrangler
config whose `class_name` matches the DO's namespace, instantiates a
fresh `ContainerRuntime` + `DockerManager` inside the worker, attaches
the runtime to `DurableObjectState.container`, and (after constructing
the user instance) calls `_wireRuntime` on it when it extends
`ContainerBase`. Mirrors the wiring `InProcessExecutor` used to do.
Each DO worker gets its own `DockerManager` — fine because the manager
is stateless above the docker daemon (it shells out via `bun $`), and
`allocatePort` uses `Bun.serve({port: 0})` which the OS already
de-duplicates.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`cloudflare/sandbox:0.7.4` only publishes an amd64 manifest, so the build failed on arm64 hosts with "no match for platform in manifest". Add `--platform=linux/amd64` to force the amd64 pull; Docker Desktop runs it under Rosetta. Build now succeeds on M-series Macs. The base image will need a real arm64 build (or a multi-arch variant from Cloudflare) before the constant-platform docker lint warning can be addressed properly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cloudflare removed multi-arch publishing for `cloudflare/sandbox` on purpose — production Containers run on linux/amd64, and an arm64 image would let Apple Silicon hosts silently pull a different build than prod. Re-word the Dockerfile comment to reflect intent (dev/prod parity) rather than a presumed gap on Cloudflare's side. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DOs running in their worker thread had `this.env.X` undefined for any stateful binding: `do-worker-env.ts` only constructed KV/R2/D1/queue producers/nested DOs/vars. ErrorBridge crashed with "undefined is not an object (evaluating 'this.env.FAILING[method]')" on the playground's /error-bridge/rpc/* routes.
Add an `env-*` RPC channel on the DO worker's existing message pipe and wire proxies for the missing stateful bindings:
- New `DOMainMessage.env-call` / `env-fetch` and matching `DOWorkerMessage.env-*-result` / `env-*-error` carry the request and reply. `WorkerExecutor` resolves the binding against the executor's configured main env (the same instance the user-worker uses, so cross-worker service fetches still hit `WorkerRegistry`-resolved threads).
- `createDoEnvRpc` exposes a tiny `call` / `callFetch` / `handle` API modelled on `worker-thread/rpc-client.ts`. `buildWorkerEnv` accepts the rpc and registers `makeEnvBindingProxy` entries for `services`, `send_email`, and `workflows`. Same `then`-skipping that the main worker proxy uses so `await stub.foo` doesn't try to RPC `.then`.
- `do-worker-entry.ts` constructs the rpc, passes it into `buildWorkerEnv`, and lets the rpc consume its own replies before the command-dispatch path.
Smoke-tested via the playground:
- `/error-bridge/rpc/ping` → `pong from failing-worker` (env RPC).
- `/error-bridge/fetch/ok` → `all good` (env fetch).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…, drop dead wiring
DO namespace lifecycle fixes:
- the container-DO branch of buildEnv always constructed a fresh namespace, ignoring existingNamespaces. Per reload the old namespace was orphaned (its destroy() deliberately keeps WS-holding executors) while a new one spun up an executor for the same id: two executors opening the same do-sql file, breaking DO single-threading. Reuse the existing namespace like the regular DO branch (CORR-21).
- _doNamespaces was only ever .set(), never pruned. A DO class removed from config left a stale namespace that kept firing persisted alarms (spawning threads for a class that no longer exists) and kept its 30s eviction interval alive until process exit. Force-destroy + drop entries whose class left the registry (CORR-26).
- buildWorkerEnv returned a doNamespaces array it never pushed to, and the do-worker-entry loop consuming it was unreachable dead code. Removed (CORR-38).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Every reload, the new worker's init ran wireWorkflows() -> resumeInterrupted(), re-executing every running/waiting instance before `ready` was even posted, while the old worker (terminated only after drain + up to 10s grace) was still executing them. With no lease/guard, the in-flight step ran concurrently in two threads: duplicate external side effects and racing workflow_step_attempts writes.
The intended mitigation was dead: Generation.stop() called abortRunning() on main's hollow binding (empty module-level abortControllers), so workflows only ever died via worker terminate().
Resume is now main-driven:
- wireWorkflowClass no longer resumes during worker init.
- a new `resumeInterrupted` workflow-control op routes to the live worker-side binding.
- GenerationManager resumes on first load immediately (nothing else can be running them), and on reload only AFTER the old generation's worker is disposed, guarded so a generation is resumed at most once even under stacked reloads.
- the dead abortRunning() call is gone; worker termination stops the old generation's workflows, and their persisted rows are picked up by the resume.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…tches
Queue consumers run as setIntervals inside the worker thread, and the only stop mechanism was Worker.terminate(). So drain()'s "stops consumers" only cleared the main-side cron timer: for the whole grace period the OLD generation kept polling the shared queue and competing with the new generation for fresh messages, and a mid-batch worker could be terminated even when "idle" (lease expiry then redelivered with the retry counter bumped, pushing messages toward the DLQ on repeated reloads).
- Generation.drain() now posts a stop-queue-consumers command; the worker stops its consumers from claiming NEW messages.
- in-flight batches register with reload drain (trackBackgroundWork → wait-until), so isIdle() waits for the current batch to finish before the generation is stopped.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
On the user-worker channel, a binding fetch returning 101 set
response.webSocket to a bare { __bridgedWsId } tag — not a CFWebSocket. Only
the reship-to-client flow worked; the CF-documented pattern
`(await env.DO.get(id).fetch(req)).webSocket.accept()` threw an opaque
`accept is not a function`. A regression vs in-process mode.
Mirror the DO env-binding channel onto the user-worker channel: main adopts the
upstream peer with bridgeEvents:true into a dedicated _envBindingWsBridge and
ships its events to the worker, where proxyFetch reconstructs a real user-facing
CFWebSocket via WsGuestBridge.createBridgedSocket. User code can now consume it
(.accept()/.send()/.addEventListener). Reship still works: returning the bridged
socket re-registers it on the worker's top-level WS bridge, double-bridging
through to the real client. New env-ws-incoming/-close-in (main→worker) and
env-ws-outgoing/-close-out (worker→main) messages carry the events.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…bodies
Nothing tied a cross-thread stream bridge's lifetime to consumption, so a handler that never read (or only partially read) a request body, or a caller that dropped a binding-fetch response body, left the sender's pump parked in acquireCredit forever, holding the locked source reader + buffered chunks until the generation was disposed. Bounded per stream, but unbounded count over a long dev session.
- StreamReceiver registers each reconstructed stream with a FinalizationRegistry that cancels the sender's pump when the stream is garbage-collected before terminating. GC only collects truly-unreferenced streams, so a body still held by ctx.waitUntil is never wrongly cancelled. Covers all channels (top-level request body, rpc-fetch + DO-fetch response bodies) (CORR-11).
- the DO-fetch handler now cancels the request-body receiver on error and on a resolve that left the body unconsumed (DO fetch has no ctx.waitUntil, so nothing reads it later). This is deterministic and mirrors dispatchRpcFetch (CORR-27).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
In thread mode the main-side _checkSubrequestLimit was effectively dead: every binding call from a worker arrives as its own RPC dispatch wrapped in runWithParentContext, which re-seeds subrequests.count to 0 — so each call counted to exactly 1 and maxSubrequests could never trip (all user workers run in threads now). Count on the worker side instead, in the thread-env binding proxies' fetch/call path: the worker's fetch context is seeded once per top-level request and shared across all its binding calls via AsyncLocalStorage, so the budget accumulates correctly and a runaway worker hits the 1000-subrequest limit. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
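A minimal sketch of the worker-side counting described above, assuming `AsyncLocalStorage` from `node:async_hooks` (available in both Bun and Node). `runTopLevelRequest`, `FetchContext`, and the exact threshold comparison are hypothetical; only the idea of seeding one shared counter per top-level request comes from the commit message.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

const MAX_SUBREQUESTS = 1000; // limit quoted in the commit message

interface FetchContext {
  subrequests: { count: number };
}

const fetchContext = new AsyncLocalStorage<FetchContext>();

// Seed the counter ONCE per top-level request; every binding call made
// inside `handler` (sync or async) sees the same store.
function runTopLevelRequest<T>(handler: () => T): T {
  return fetchContext.run({ subrequests: { count: 0 } }, handler);
}

// Called from each binding proxy's fetch/call path (hypothetical hook point).
function checkSubrequestLimit(): void {
  const ctx = fetchContext.getStore();
  if (!ctx) return; // outside a request: nothing to count against
  ctx.subrequests.count += 1;
  if (ctx.subrequests.count > MAX_SUBREQUESTS) {
    throw new Error("Subrequest limit exceeded");
  }
}
```

Re-seeding the store inside every dispatch, as the old main-side path effectively did, would reset the count to 0 each time and the limit could never trip.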
…boundary
serializeRequestShell shipped only url/method/headers, so the worker's rebuilt
Request had a signal that never fired. Pre-branch handlers got the live
Bun.serve Request, so request.signal.addEventListener('abort', cleanup) (the
common SSE / long-poll cleanup pattern) silently lost its hook in thread mode.
Main now listens to the original request.signal and posts fetch-abort{id} on
client disconnect; the worker keeps an AbortController per in-flight fetch,
passes its signal to the rebuilt Request, and aborts it on fetch-abort. The
controller is kept alive until a streamed response body finishes (so an SSE
loop can still observe the signal) and dropped on completion. (Streamed bodies
already cancelled via stream-cancel; this covers code that watches the signal
directly.)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
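The worker-side half of the `fetch-abort` handling described above can be sketched as a map of `AbortController`s keyed by fetch id. `InFlightAborts` and its method names are hypothetical; the sketch only shows the register / abort / drop lifecycle and that an abort for an unknown id is a no-op.

```typescript
// Hypothetical registry: one AbortController per in-flight fetch id.
class InFlightAborts {
  private controllers = new Map<number, AbortController>();

  // Called when a fetch command arrives; the returned signal is what the
  // rebuilt Request would carry.
  register(id: number): AbortSignal {
    const controller = new AbortController();
    this.controllers.set(id, controller);
    return controller.signal;
  }

  // Called on `fetch-abort { id }`. Unknown ids are ignored, which is why an
  // abort posted before its fetch command has no effect.
  abort(id: number): void {
    this.controllers.get(id)?.abort();
  }

  // Called once the response (including any streamed body) has finished.
  complete(id: number): void {
    this.controllers.delete(id);
  }

  get size(): number {
    return this.controllers.size;
  }
}
```

User code then keeps working unchanged, for example `request.signal.addEventListener('abort', cleanup)` in an SSE handler.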
… respawn
Two container lifecycle races:
- ContainerRuntime.start() adopted any running container with the matching name, even one left by a crashed/foreign lopata process (labels are immutable, so an adopted container keeps the dead creator's lopata.pid). Its config/env/ports may not match what we'd build now. inspect() now returns labels and start() only adopts a running container whose lopata.pid is THIS process; otherwise it removes and recreates. Combined with the reaper's existing live-pid check this makes the reaper-vs-restored-alarm race benign (CORR-23).
- on eviction/reload the executor leaves _executors synchronously while dispose() runs the docker rm cleanup fire-and-forget; a request in that window created a new executor with the SAME container name, and if its docker run landed first the old cleanup could docker rm -f the replacement. The namespace now tracks in-flight disposals per id (container DOs only) and a respawn awaits the prior teardown before its first command, serializing run vs rm (CORR-22).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The port-mapping adoption regression test's inspect() mock predated the adoption pid-check; give it the current-process lopata.pid label so it exercises the own-process adoption path it's testing. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…handlers init-error, entrypoint-rpc-error, entrypoint-rpc-get-error and workflow-control-error hand-rolled new Error(message)+stack+name, dropping the cause chain and custom props (err.code, status, data) that serializeError already ships on the wire. entrypoint-rpc-error is the cross-worker service-binding RPC path, so user code matching on e.code/e.cause silently broke only in thread mode. (CORR-2) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…waitUntil logIfRejected called promise.catch() directly — a non-thenable (ctx.waitUntil(undefined), tolerated by CF) threw synchronously after wait-until-add was already posted, so wait-until-settle never fired, isIdle() stayed false, and every reload waited the full grace period. Coerce via Promise.resolve in logIfRejected (fixes the main-side ExecutionContext throw too) and cover the add/settle pairing. (CORR-4) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
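The coercion described above can be sketched in a few lines. This is an assumed shape for `logIfRejected` (the real signature is not shown in this PR); the point is only that `Promise.resolve` accepts non-thenables, so nothing throws synchronously.

```typescript
// Sketch: accept anything ctx.waitUntil might be handed, including
// `undefined`, and never throw synchronously.
function logIfRejected(value: unknown, log: (err: unknown) => void): Promise<void> {
  // Calling `.catch()` directly on `value` would raise a TypeError for a
  // non-thenable. Promise.resolve() wraps plain values and adopts thenables.
  return Promise.resolve(value).then(
    () => undefined,
    (err: unknown) => {
      log(err);
    },
  );
}
```

Because the returned promise always fulfils, a caller can safely chain the settle notification onto it regardless of what the user passed.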
…yWhile mirror p.finally(postState) creates a new promise that adopts the callback's rejection; nothing awaited it, so a rejecting blockConcurrencyWhile callback (documented CF behavior — it resets the DO) produced an unhandled-rejection report inside the DO worker even when user code handled the returned promise. Use the both-callbacks then() form so the mirror branch can't reject. (CORR-5) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The init-error branch rejected pendings and flipped _disposed but left the failed thread alive (SQLite handle, container/health timers) until the 30s idle reaper or the next access — each retry against a broken DO transiently leaked another thread. Mirror the onerror teardown: terminate, drop the handle, dispose bridges/streams/rpc. (CORR-6) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…s stopped finish() resumed interrupted workflows as soon as ITS generation stopped, but rapid reloads leave several older generations in overlapping grace windows — another older generation could still be mid-execution of the very instances resumeInterrupted selects (DB rows stay 'running' until the step settles), running one workflow instance in two worker threads at once. Each finish now re-checks that no generation older than the active one is still alive before resuming. (CORR-1) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
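The resume gate described above reduces to one predicate. A minimal sketch, assuming generations carry monotonically increasing numeric ids and an `alive` flag (both hypothetical representations, as is the function name):

```typescript
interface GenerationInfo {
  id: number; // assumed to increase with every reload
  alive: boolean; // worker not yet disposed
}

// Resume interrupted workflows only when no generation older than the
// active one could still be executing them.
function canResumeInterrupted(generations: GenerationInfo[], activeId: number): boolean {
  return !generations.some((g) => g.id < activeId && g.alive);
}

// Stacked reloads: generation 2 has stopped, but generation 1 is still in
// its grace window, so resuming on generation 3 must wait.
const stacked: GenerationInfo[] = [
  { id: 1, alive: true },
  { id: 2, alive: false },
  { id: 3, alive: true },
];
const blockedWhileOldAlive = canResumeInterrupted(stacked, 3);

stacked[0].alive = false; // generation 1 finally disposed
const allowedAfterDrain = canResumeInterrupted(stacked, 3);
```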
…e open A streamed DO fetch settles its pending command at response headers, dropping _inFlightCount to 0 while do-stream-chunk messages keep flowing. isActive() then reported idle, so _evictIdle (after the 120s idle window) and _disposeExecutorWhenIdle (reload swap) could dispose the executor mid-body — disposeAll error()s a body the caller is still reading (SSE, slow download). Count open inbound/outbound fetch streams in isActive(), mirroring the top-level generation drain's openStreamCount() guard. (CORR-3) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…n the test The sender is seeded `window` credits and the receiver's pull-driven grants add up to `window` more while its queue fills, so the hard bound on in-flight chunks is ~2x STREAM_BACKPRESSURE_WINDOW, not 1x as the doc comment implied. Both halves are intentional (seeding 0 would clock a cold pump to one chunk per postMessage round-trip), so fix the docs and tighten the stall test to the 2x ceiling instead of changing behavior. (CORR-7) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ntainer teardown The _priorDisposalAwaited boolean flipped synchronously, so two concurrent first commands to a fresh container-DO executor raced: the second saw the flag set, skipped the await, and could trigger docker run for the same container name while the prior executor's docker rm was still in flight — the race 3335a76 targeted. Replace the flag with a shared gate promise every _sendCommand awaits (settled after the first, so steady-state cost is one microtask). (CORR-8) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
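The difference between a synchronous flag and a shared gate promise can be sketched as follows. `GatedSender` and `sendCommand` are hypothetical names; the sketch shows only that every command awaits the same promise, so a second concurrent caller cannot skip the wait.

```typescript
// Hypothetical sender whose commands all wait for a prior teardown.
class GatedSender {
  private gate: Promise<void>;

  constructor(priorDisposal?: Promise<unknown>) {
    // Settles when the prior disposal settles (success or failure), or
    // immediately when there is none.
    this.gate = Promise.resolve(priorDisposal).then(
      () => undefined,
      () => undefined,
    );
  }

  // Every command awaits the SAME gate. Once it has settled, the await
  // costs a single microtask.
  async sendCommand(run: () => void): Promise<void> {
    await this.gate;
    run();
  }
}

// Two concurrent first commands while the old executor is still tearing down.
let finishTeardown: () => void = () => {};
const priorTeardown = new Promise<void>((resolve) => {
  finishTeardown = resolve;
});
const sender = new GatedSender(priorTeardown);
const executed: string[] = [];
void sender.sendCommand(() => executed.push("first"));
void sender.sendCommand(() => executed.push("second"));
const ranBeforeTeardown = executed.length; // neither command has run yet
finishTeardown();
```

With a boolean flag flipped synchronously by the first caller, the second call would have skipped the wait and run before the teardown finished.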
…sted A client that disconnected before dispatch has signal.aborted already true, so onAbort() fired inside the build callback — posting fetch-abort BEFORE the fetch command. The worker no-ops aborts for unknown ids, so the rebuilt Request's signal never fired and pre-registered cleanup listeners never ran. Registering in afterPost keeps abort-after-fetch ordered (postMessage is FIFO; the worker registers the AbortController synchronously on 'fetch'). Covered by a direct executeFetch test with a pre-aborted signal — verified to fail under the old ordering. (CORR-9) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…l channel A DO's workflow binding was a generic env-RPC proxy landing on main's HOLLOW SqliteWorkflowBinding — create() threw 'Workflow class not wired yet' and get(id) handles silently no-oped against main's empty per-process registries (and a real SqliteWorkflowInstance couldn't cross the boundary anyway). Model the public Workflow API as WorkflowControlOp round-trips instead: the DO proxy calls executeControl on main, which forwards through the thread router to the user worker that owns the live state machine. Adds id to the create op and a status op (doubles as get()'s existence check). E2E-covered with a DO that creates a workflow and reads its status — verified to fail without the proxy. (COMP-1) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…it window _disposeExecutorWhenIdle only recorded the disposal in _disposing once its isActive() poll ended, so a respawn for the same container-DO id created mid-poll read _priorDisposal: undefined — and the deferred docker rm could later remove the container the fresh executor had adopted by name. Register a gate promise in _disposing before the poll starts and settle it when the eventual dispose completes. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The enforcement 1c1e263 moved to the worker (checkSubrequestLimit in thread-env.ts) shipped untested — only the dead main-side path had coverage. Add an e2e: >1000 binding RPC calls within one request trips 'Subrequest limit exceeded' (a per-call context re-seed would never trip), and a following request starts from zero. (TEST-1) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
isIdle() was unit-tested but nothing proved the GenerationManager drain wiring actually keeps the old generation's worker alive: a streamed body started before a reload completes untruncated, and a slow handler that hadn't returned its Response yet still serves the OLD code's response while the next request gets the new generation. (TEST-2) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…the DO alive The one race named in the review brief with no coverage: an outbound env-call (this.env.SVC.ping() leaving the DO worker toward main) runs inside a pending inbound command, so isActive() must hold off _evictIdle until it settles. Pin that with a gated service stub and a forced eviction scan mid-call. (TEST-3) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The architecture doc still named main-ws-bridge.ts / ws-bridge.ts — files that no longer exist; WS bridging was consolidated into ws-bridge-shared.ts (WsHostBridge/WsGuestBridge parameterized by envelope callbacks, shared by the user-worker and DO-worker channels). (ARCH-1) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Commit 434f4ed made proxyFetch always reconstruct binding-fetch WebSockets as real CFWebSockets via createBridgedSocket, so the { __bridgedWsId } union member in serializeResponse was assigned nowhere and its reship branch was unreachable. Replace it with a plain guard for a 101 response whose webSocket isn't a CFWebSocket. Also simplify executeFetch's getSocket() ?? register() — a top-level fetch-result id is always a fresh guest-side registration (env-binding adoptions live on _envBindingWsBridge), so getSocket always missed and the comment described reuse that doesn't occur. (CORR-10, CORR-11) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ardown dispose(), onerror and the init-error path each hand-rolled the same mark-disposed / terminate / reject-pending / disposeAll sequence — the drift that let init-error skip termination (CORR-6) in the first place. One private _teardown(error) now serves all three (counterpart of WorkerThreadExecutor._failAll; cross-class extraction skipped on purpose — the pending-map and bridge sets differ enough that a generic helper would obscure more than it shares). Net behavior change: dispose() now also rejects a still-pending ready promise, so a command awaiting ready during a pre-ready dispose rejects instead of hanging. (ARCH-2) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…e windows End-to-end coverage for the CORR-1 resume gate: generation 1 runs a slow workflow step while pinned non-idle by an open stream, two reloads land mid-step, and generation 2 (also briefly pinned so its drain finishes via the async waitUntilIdle poll, after the active pointer moved to generation 3) releases first. Pre-fix that resumed the still-running instance on generation 3 and the step callback executed twice — verified: the test fails with runs=2 without the gate. The fixture records one KV marker per step-callback execution. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…rkers
Behavior coverage for the deserializeError fix (CORR-2): a cross-worker
service-binding RPC method throwing Object.assign(new Error(...),
{ code, status, cause }) must reach the caller with all of it intact.
Verified to fail against the old hand-rolled new Error(message)
handler.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
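What "intact" means here can be illustrated with a serialize/deserialize pair that keeps custom props and the cause chain. This is a sketch under assumptions: the `WireError` shape and both function bodies are illustrative and are not the `serializeError` / `deserializeError` implementations in the repo.

```typescript
// Hypothetical wire format for an Error crossing the thread boundary.
interface WireError {
  name: string;
  message: string;
  stack?: string;
  cause?: WireError;
  props: Record<string, unknown>; // custom fields such as code / status / data
}

function serializeError(err: unknown): WireError {
  if (!(err instanceof Error)) {
    return { name: "Error", message: String(err), props: {} };
  }
  const props: Record<string, unknown> = {};
  // Own enumerable keys cover fields attached via Object.assign.
  for (const key of Object.keys(err)) {
    if (key !== "cause") props[key] = (err as unknown as Record<string, unknown>)[key];
  }
  const cause = (err as unknown as { cause?: unknown }).cause;
  return {
    name: err.name,
    message: err.message,
    stack: err.stack,
    cause: cause === undefined ? undefined : serializeError(cause),
    props,
  };
}

function deserializeError(wire: WireError): Error {
  const err = new Error(wire.message);
  err.name = wire.name;
  if (wire.stack !== undefined) err.stack = wire.stack;
  if (wire.cause !== undefined) {
    (err as unknown as { cause?: unknown }).cause = deserializeError(wire.cause);
  }
  // Restore custom props last so `e.code` / `e.status` match the original.
  return Object.assign(err, wire.props);
}
```

A hand-rolled `new Error(message)` on the receiving side would keep only the message, which is the thread-mode-only breakage CORR-2 describes.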
Summary
Refactors lopata's dev runtime so user worker code always runs in a Bun `Worker` thread (one per generation). Reload = terminate + respawn, which gives correct transitive HMR for free.
Why now: Bun 1.3.14 removed `globalThis.Loader.registry` (the JSC-internal API the previous in-process cache-bust trick depended on), and the existing `?v=<ts>` invalidation only ever cleared the entry; transitive deps stayed cached across reloads. The thread-per-generation model sidesteps the registry entirely: a fresh worker re-evaluates every module from disk.
The branch is structured as a 13-phase arc (Phase 0 → 8e), each phase a `feat` commit followed by a `refactor` "tighten" pass from a 3-agent simplify review. The final phase (3af2998) flips the default and drops the in-process path; subsequent commits fill in two integration gaps the simplify pass surfaced (DO worker WebSocket bridging, `ContainerRuntime` wiring, env-binding RPC for DOs).
What changed
Runtime
- `Worker` spawned per `Generation`. Main thread owns `Bun.serve`, file watcher, dashboard, stateful bindings; the worker imports the user module and holds stateless bindings + proxies to main.
- `src/worker-thread/` directory: `executor.ts`, `entry.ts`, `protocol.ts`, `rpc-client.ts`, `serialize.ts`, `thread-env.ts`, `main-ws-bridge.ts`, `ws-bridge.ts`, `wire-handlers.ts`, `remote-trace-store.ts`, `execution-context.ts`.
- `Generation` shrank from 376 → ~120 LOC (in-process branches, `patchFrameworkDispatch`, `stitchAsyncStack` are gone).
- `WorkerIsolation` config / `--worker-isolation` CLI flag / `src/module-cache.ts` all removed.
Cross-thread bridges
- `binding-call` / `binding-fetch` messages. Service-binding RPC + fetch both work cross-worker via `WorkerRegistry`.
- `ctx.waitUntil` posts `wait-until-add` / `wait-until-settle`; reload drains pending promises before terminating.
- Tracing (`AsyncLocalStorage`-based) bridges via `ParentSpanContext` + `runWithParentContext` so worker-side spans link to the main-side request span.
- `MainWsBridge` ↔ `WorkerWsBridge` for the main user worker.
- `FetchDoBridgePeer` (new) for WebSockets returned by a DO's `fetch()`: main adopts the peer, ships the wsId back so `Bun.serve.upgrade` receives the real CFWebSocket end-to-end.
- `MainWsBridge._pendingEvents` buffers events that race ahead of `createSocket` (when the worker dispatches queued events during `accept()` before the fetch result has reached main). Cleared in `disposeAll()`.
- `ContainerBase` is now instantiated inside the DO worker (`ContainerRuntime` + per-worker `DockerManager`); previously only `InProcessExecutor` wired it.
- `this.env.SERVICE.fetch(...)` / `this.env.WORKFLOW.create(...)` / `this.env.MAILER.send(...)`. New `env-call` / `env-fetch` messages on the DO-worker channel; `WorkerExecutor` resolves against the executor's `mainEnv` reference (the same proxies the main user-worker uses).
Bug fixes uncovered along the way
- `NON_RPC_PROPS` drift: a local copy in `do-worker-env.ts` dropped `toJSON` / `valueOf` / `toString` from the skiplist, so `JSON.stringify(env.X)` from a DO would have dispatched an RPC method call. Now imports the canonical set from `rpc-stub.ts`.
- Three call sites (`ws-bridge.ts`, `do-worker-entry.ts`, `testing/durable-object.ts:connectWebSocket`) called `accept()` before attaching listeners. `accept()` flushes the event queue, so pre-shipped messages were silently lost. Listeners now attach first.
- `_disposed` guard on the new env dispatchers so side-effecting env calls (KV writes, cross-worker fetches) don't complete after the requesting DO worker has been terminated.
Playground
- `Counter` DO grew a WebSocket `fetch()` that broadcasts count changes to subscribers; new `/ws/counter/<name>` route exercises the DO WS bridge, `/ws/echo` exercises the plain-worker bridge.
- … `complete` from the UI.
- `Dockerfile.sandbox` pins to `--platform=linux/amd64` (Cloudflare publishes the sandbox image amd64-only on purpose for dev/prod parity).
Tests
- `tests/`: `thread-stateless-`, `thread-stateful-`, `thread-ws-`, `thread-do-`, `thread-multi-worker-`, `thread-workflow-`, `thread-queue-e2e.test.ts`.
- `tests/main-ws-bridge.test.ts`: unit coverage for the pending-event buffer + `disposeAll` cleanup that the simplify review flagged.
User-visible behavior change
- Active WebSockets do not survive a reload. Previously, the in-process executor could hot-swap the DO class while keeping hibernated WS connections alive. With worker-thread isolation, reload disposes the worker (that's what gives us correct transitive HMR) and active clients receive `1012 Service Restart`; they need to reconnect. Two HMR-WS tests are marked `.skip` with a comment.
- DO instances spawn their own Bun Worker (was opt-in via `isolation: 'isolated'`, now always). For workloads with many short-lived DO instances this adds cold-start cost; bounded by the existing 30s eviction timer.
Test plan
- `bun run typecheck` clean
- `bun run lint` clean
- `bun run format:check` clean
- `bun run test`: 1285 pass, 2 skip, 0 fail
🤖 Generated with Claude Code