Skip to content

Commit a3fe0cc

Browse files
committed
fix(cloudflare): fix streaming responses
1 parent e24b7cb commit a3fe0cc

File tree

6 files changed

+266
-124
lines changed

6 files changed

+266
-124
lines changed

docs/deploy/cloudflare.mdx

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,16 @@ app.post("/sandbox/:name/prompt", async (c) => {
7575
const sandbox = await getReadySandbox(c.req.param("name"), c.env);
7676

7777
const sdk = await SandboxAgent.connect({
78-
fetch: (input, init) => sandbox.containerFetch(input as Request | string | URL, init, PORT),
78+
fetch: (input, init) =>
79+
sandbox.containerFetch(
80+
input as Request | string | URL,
81+
{
82+
...(init ?? {}),
83+
// Avoid passing AbortSignal through containerFetch; it can drop streamed session updates.
84+
signal: undefined,
85+
},
86+
PORT,
87+
),
7988
});
8089

8190
const session = await sdk.createSession({ agent: "codex" });
@@ -103,6 +112,33 @@ export default app;
103112
Create the SDK client inside the Worker using custom `fetch` backed by `sandbox.containerFetch(...)`.
104113
This keeps all Sandbox Agent calls inside the Cloudflare sandbox routing path and does not require a `baseUrl`.
105114

115+
## Troubleshooting streaming updates
116+
117+
If you only receive:
118+
- outbound `session/prompt`
119+
- final `{ stopReason: "end_turn" }`
120+
121+
then the streamed update channel dropped. In Cloudflare sandbox paths, this is typically caused by forwarding `AbortSignal` from SDK fetch init into `containerFetch(...)`.
122+
123+
Fix:
124+
125+
```ts
126+
const sdk = await SandboxAgent.connect({
127+
fetch: (input, init) =>
128+
sandbox.containerFetch(
129+
input as Request | string | URL,
130+
{
131+
...(init ?? {}),
132+
// Avoid passing AbortSignal through containerFetch; it can drop streamed session updates.
133+
signal: undefined,
134+
},
135+
PORT,
136+
),
137+
});
138+
```
139+
140+
This keeps prompt completion behavior the same, but restores streamed text/tool updates.
141+
106142
## Local development
107143

108144
```bash

examples/cloudflare/README.md

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,44 @@ curl http://localhost:8787
3939
Test prompt routing through the SDK with a custom sandbox fetch handler:
4040

4141
```bash
42-
curl -X POST "http://localhost:8787/sandbox/demo/prompt" \
42+
curl -N -X POST "http://localhost:8787/sandbox/demo/prompt" \
4343
-H "Content-Type: application/json" \
44+
-H "Accept: text/event-stream" \
4445
-d '{"agent":"codex","prompt":"Reply with one short sentence."}'
4546
```
4647

47-
The response includes `events`, an array of all recorded session events for that prompt.
48+
The response is an SSE stream with events:
49+
- `session.created`
50+
- `session.event`
51+
- `prompt.completed`
52+
- `done`
53+
54+
### Troubleshooting: only two events
55+
56+
If you only see:
57+
- outbound `session/prompt`
58+
- inbound prompt result with `stopReason: "end_turn"`
59+
60+
then ACP `session/update` notifications are not flowing. In Cloudflare sandbox paths this can happen if you forward `AbortSignal` from SDK fetch init into `containerFetch(...)` for long-lived ACP SSE requests.
61+
62+
Use:
63+
64+
```ts
65+
const sdk = await SandboxAgent.connect({
66+
fetch: (input, init) =>
67+
sandbox.containerFetch(
68+
input as Request | string | URL,
69+
{
70+
...(init ?? {}),
71+
// Avoid passing AbortSignal through containerFetch; it can drop ACP SSE updates.
72+
signal: undefined,
73+
},
74+
PORT,
75+
),
76+
});
77+
```
78+
79+
Without `session/update` events, assistant text/tool deltas will not appear in UI streams.
4880

4981
## Deploy
5082

examples/cloudflare/src/index.ts

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { getSandbox, type Sandbox } from "@cloudflare/sandbox";
22
import { Hono } from "hono";
33
import { HTTPException } from "hono/http-exception";
4-
import { runPromptTest, type PromptTestRequest } from "./prompt-test";
4+
import { streamSSE } from "hono/streaming";
5+
import { runPromptEndpointStream, type PromptRequest } from "./prompt-endpoint";
56

67
export { Sandbox } from "@cloudflare/sandbox";
78

@@ -49,7 +50,15 @@ async function getReadySandbox(name: string, env: Bindings): Promise<Sandbox> {
4950

5051
async function proxyToSandbox(sandbox: Sandbox, request: Request, path: string): Promise<Response> {
5152
const query = new URL(request.url).search;
52-
return sandbox.containerFetch(new Request(`http://localhost${path}${query}`, request), PORT);
53+
return sandbox.containerFetch(
54+
`http://localhost${path}${query}`,
55+
{
56+
method: request.method,
57+
headers: request.headers,
58+
body: request.body,
59+
},
60+
PORT,
61+
);
5362
}
5463

5564
const app = new Hono<AppEnv>();
@@ -63,15 +72,34 @@ app.post("/sandbox/:name/prompt", async (c) => {
6372
throw new HTTPException(400, { message: "Content-Type must be application/json" });
6473
}
6574

66-
let payload: PromptTestRequest;
75+
let payload: PromptRequest;
6776
try {
68-
payload = await c.req.json<PromptTestRequest>();
77+
payload = await c.req.json<PromptRequest>();
6978
} catch {
7079
throw new HTTPException(400, { message: "Invalid JSON body" });
7180
}
7281

7382
const sandbox = await getReadySandbox(c.req.param("name"), c.env);
74-
return c.json(await runPromptTest(sandbox, payload, PORT));
83+
return streamSSE(c, async (stream) => {
84+
try {
85+
await runPromptEndpointStream(sandbox, payload, PORT, async (event) => {
86+
await stream.writeSSE({
87+
event: event.type,
88+
data: JSON.stringify(event),
89+
});
90+
});
91+
await stream.writeSSE({
92+
event: "done",
93+
data: JSON.stringify({ ok: true }),
94+
});
95+
} catch (error) {
96+
const message = error instanceof Error ? error.message : String(error);
97+
await stream.writeSSE({
98+
event: "error",
99+
data: JSON.stringify({ message }),
100+
});
101+
}
102+
});
75103
});
76104

77105
app.all("/sandbox/:name/proxy/*", async (c) => {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import type { Sandbox } from "@cloudflare/sandbox";
2+
import { SandboxAgent } from "sandbox-agent";
3+
4+
export type PromptRequest = {
5+
agent?: string;
6+
prompt?: string;
7+
};
8+
9+
export async function runPromptEndpointStream(
10+
sandbox: Sandbox,
11+
request: PromptRequest,
12+
port: number,
13+
emit: (event: { type: string; [key: string]: unknown }) => Promise<void> | void,
14+
): Promise<void> {
15+
const client = await SandboxAgent.connect({
16+
fetch: (req, init) =>
17+
sandbox.containerFetch(
18+
req,
19+
{
20+
...(init ?? {}),
21+
// Cloudflare containerFetch may drop long-lived update streams when
22+
// a forwarded AbortSignal is cancelled; clear it for this path.
23+
signal: undefined,
24+
},
25+
port,
26+
),
27+
});
28+
29+
let unsubscribe: (() => void) | undefined;
30+
try {
31+
const session = await client.createSession({
32+
agent: request.agent ?? "codex",
33+
});
34+
35+
const promptText =
36+
request.prompt?.trim() || "Reply with a short confirmation.";
37+
await emit({
38+
type: "session.created",
39+
sessionId: session.id,
40+
agent: session.agent,
41+
prompt: promptText,
42+
});
43+
44+
let pendingWrites: Promise<void> = Promise.resolve();
45+
unsubscribe = session.onEvent((event) => {
46+
pendingWrites = pendingWrites
47+
.then(async () => {
48+
await emit({ type: "session.event", event });
49+
})
50+
.catch(() => {});
51+
});
52+
53+
const response = await session.prompt([{ type: "text", text: promptText }]);
54+
await pendingWrites;
55+
await emit({ type: "prompt.response", response });
56+
await emit({ type: "prompt.completed" });
57+
} finally {
58+
if (unsubscribe) {
59+
unsubscribe();
60+
}
61+
await Promise.race([
62+
client.dispose(),
63+
new Promise((resolve) => setTimeout(resolve, 250)),
64+
]);
65+
}
66+
}

examples/cloudflare/src/prompt-test.ts

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)