1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ dist/
!scripts/*.ts
scripts/.env
!scripts/.env.example
scripts/assets/
24 changes: 15 additions & 9 deletions CLAUDE.md
@@ -13,6 +13,7 @@ npx tsx scripts/generate-bot-token.ts
npx tsx scripts/discover-channels.ts
npx tsx scripts/chat-client.ts
npx tsx scripts/test-roundtrip.ts
npx tsx scripts/test-roundtrip-audio.ts
npx tsx scripts/test-thread.ts

# Restart the gateway after code changes
@@ -45,9 +46,9 @@ The OpenClaw framework discovers the plugin via `plugins.load.paths` in `~/.open

| File | Responsibility |
|---|---|
| `src/channel.ts` | Main plugin export (`streamchatPlugin`). Contains `handleStreamChatMessage` (inbound dispatch) and the `ChannelPlugin` adapter implementations: `config`, `outbound`, `gateway`, `status`. |
| `src/channel.ts` | Main plugin export (`streamchatPlugin`). Contains `handleStreamChatMessage` (inbound dispatch), `resolveStreamChatMedia` (inbound media download + WAV→OGG conversion), `uploadOutboundMedia` (outbound media upload), and the `ChannelPlugin` adapter implementations: `config`, `outbound`, `gateway`, `status`. |
| `src/stream-chat-runtime.ts` | `StreamChatClientRuntime` — wraps the `stream-chat` SDK. Connects as bot user (`allowServerSideConnect: true` is required for Node.js server contexts), queries + watches channels on startup, auto-watches channels added later via `notification.added_to_channel`. |
| `src/streaming.ts` | `StreamingHandler` — manages the AI streaming lifecycle per run: creates placeholder message → sends `ai_indicator` events → calls `partialUpdateMessage` on throttled chunks → finalizes on completion. |
| `src/streaming.ts` | `StreamingHandler` — manages the AI streaming lifecycle per run: creates placeholder message → sends `ai_indicator` events → calls `partialUpdateMessage` on throttled chunks → finalizes on completion with text and any queued attachments. |
| `src/run-context.ts` | `RunContextMap` — binds an OpenClaw `runId` (UUID generated per inbound message) to delivery routing state: `channelId`, `threadParentId`, `responseMessageId`. TTL of 5 min. |
| `src/envelope.ts` | `buildEnvelope` — wraps the raw message text in `[Thread]` / `[Replying to]` XML-like tags so the LLM receives thread and quote context in the single-session model. |
| `src/types.ts` | `StreamChatChannelConfig`, `ResolvedAccount`, `RunContext`, `EnvelopeResult` interfaces, plus config helper functions (`getStreamChatConfig`, `resolveStreamChatAccount`). |
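The `[Thread]` / `[Replying to]` wrapping done by `src/envelope.ts` can be sketched roughly as below. This is a hypothetical reconstruction — the real `buildEnvelope` signature, field names, and closing-tag conventions may differ; only the tag names come from the table above:

```typescript
// Hypothetical sketch of the envelope wrapping; the actual buildEnvelope
// in src/envelope.ts may take different parameters and produce different
// closing tags. Field names here are illustrative.
interface EnvelopeInput {
  text: string;
  threadParentText?: string; // first message of the thread, if any
  quotedText?: string;       // text of a quoted/replied-to message
}

function buildEnvelopeSketch(input: EnvelopeInput): string {
  let out = input.text;
  if (input.quotedText) {
    out = `[Replying to]\n${input.quotedText}\n[/Replying to]\n${out}`;
  }
  if (input.threadParentText) {
    out = `[Thread]\n${input.threadParentText}\n[/Thread]\n${out}`;
  }
  return out;
}
```

The point is that thread and quote context travel inline in the prompt text, since the single-session model has no separate conversation state per thread.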
@@ -72,11 +73,12 @@ message.new (WebSocket)
replyOptions.onPartialReply fires per streaming token (cumulative text):
delta = full.slice(lastPartialText.length) → onTextChunk (throttled partialUpdateMessage)
deliver(payload, info) called once per complete block:
info.kind === "tool" → onRunProgress (EXTERNAL_SOURCES indicator)
payload.isError → onRunError (error text + ERROR indicator)
text block → no-op (already handled token-by-token above)
info.kind === "tool" → onRunProgress (EXTERNAL_SOURCES indicator)
payload.isError → onRunError (error text + ERROR indicator)
payload.mediaUrl/mediaUrls → uploadOutboundMedia → streamingHandler.addAttachment
text block → no-op (already handled token-by-token above)
after dispatcher returns:
→ onRunCompleted (final partialUpdateMessage + ai_indicator.clear)
→ onRunCompleted (final partialUpdateMessage with text + attachments + ai_indicator.clear)
```
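The cumulative→delta step in the flow above (`delta = full.slice(lastPartialText.length)`) can be isolated as a small closure. A minimal sketch, assuming `onPartialReply` always delivers a superset of the previous text:

```typescript
// Sketch of the delta extraction done in channel.ts before calling
// onTextChunk. onPartialReply delivers cumulative text; only the new
// suffix is forwarded.
function makeDeltaExtractor(onTextChunk: (delta: string) => void) {
  let lastPartialText = "";
  return (full: string) => {
    const delta = full.slice(lastPartialText.length);
    lastPartialText = full;
    if (delta.length > 0) onTextChunk(delta);
  };
}

// Usage: two cumulative updates yield two non-overlapping deltas.
const chunks: string[] = [];
const onPartial = makeDeltaExtractor((d) => chunks.push(d));
onPartial("Hel");
onPartial("Hello"); // chunks is now ["Hel", "lo"]
```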

**Why pre-create the placeholder:** `onPartialReply` is called fire-and-forget (`void`) by OpenClaw, so it cannot safely do async work (like `channel.sendMessage`). The placeholder must exist before the first token arrives.
@@ -95,7 +97,8 @@ How each signal from the OpenClaw pipeline translates into Stream Chat API calls
| `onPartialReply` first token | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_GENERATING" })` | Transitions from THINKING on the very first token |
| `onPartialReply` per token — throttled | `client.partialUpdateMessage(msgId, { set: { text, generating: true } })` | Delta-computed from cumulative text. Odd chunks 1,3,5,7; then every N (default 15). Chained via `lastUpdatePromise` to avoid out-of-order updates |
| `deliver` with `info.kind === "tool"` | `channel.sendEvent({ type: "ai_indicator.update", ai_state: "AI_STATE_EXTERNAL_SOURCES" })` | Only emitted once per run (de-duplicated by `indicatorState`) |
| Dispatcher resolves (run complete) | `client.partialUpdateMessage(msgId, { set: { text, generating: false } })` | Final flush, waits for any in-flight partial updates first |
| `deliver` with `payload.mediaUrl` / `payload.mediaUrls` | `channel.sendFile(buffer, name, mime)` → queued as `Attachment` on the stream | Outbound audio (TTS); attachment type is `"audio"` or `"voiceRecording"` if `payload.audioAsVoice` is true |
| Dispatcher resolves (run complete) | `client.partialUpdateMessage(msgId, { set: { text, generating: false, attachments? } })` | Final flush; includes accumulated attachments if any were uploaded during the run |
| Dispatcher resolves (run complete) | `channel.sendEvent({ type: "ai_indicator.clear" })` | Clears the indicator bubble |
| Dispatcher resolves (run complete) | `channel.deleteReaction(inboundMsgId, "eyes")` → `channel.sendReaction(inboundMsgId, { type: "white_check_mark" })` | Reaction swap on the original user message |
| `deliver` with `payload.isError` | `client.partialUpdateMessage(msgId, { set: { text: "…\n\nError: …", generating: false } })` | Appends error to any partial text already accumulated |
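The throttle cadence in the table ("odd chunks 1,3,5,7; then every N, default 15") reduces to a small predicate. A sketch under the assumption that chunk indices are 1-based and the post-burst cadence is a plain modulo check (the real throttle logic in `StreamingHandler` may differ):

```typescript
// Sketch of the partial-update throttle cadence: an early burst on odd
// chunk numbers below 8, then every Nth chunk (default N = 15).
// 1-based indexing is an assumption.
function shouldFlush(chunkIndex: number, every = 15): boolean {
  if (chunkIndex < 8) return chunkIndex % 2 === 1; // chunks 1, 3, 5, 7
  return chunkIndex % every === 0;                 // then 15, 30, 45, …
}
```

The early burst makes the placeholder message fill in quickly; the sparser steady-state cadence keeps `partialUpdateMessage` traffic bounded on long responses.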
@@ -111,7 +114,8 @@ Each agent run that produces text goes through these steps in `StreamingHandler`

1. `onRunStarted` — `channel.sendMessage({ text: "", ai_generated: true })` → `ai_indicator.update(AI_STATE_THINKING)`
2. `onTextChunk` — accumulates text, switches indicator to `AI_STATE_GENERATING` on first chunk, calls `client.partialUpdateMessage({ set: { text, generating: true } })` throttled (early burst: odd chunks < 8; then every Nth chunk, default N=15)
3. `onRunCompleted` — waits for in-flight partial updates, sends final `partialUpdateMessage({ generating: false })`, sends `ai_indicator.clear`
3. `addAttachment` — called from the `deliver` callback when the agent produces media (e.g. TTS audio); uploads to Stream Chat CDN and queues the `Attachment` object on the stream state
4. `onRunCompleted` — waits for in-flight partial updates, sends final `partialUpdateMessage({ text, generating: false, attachments? })`, sends `ai_indicator.clear`

Force-stop (`ai_indicator.stop` from client) calls `onForceStop`, which clears `generating` without overwriting the accumulated text.
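The ordering guarantee behind steps 2 and 4 comes from the `lastUpdatePromise` chain. A minimal sketch of the pattern, with the Stream Chat call abstracted away (the real `StreamingHandler` holds more state — indicator status, throttle counter, attachments):

```typescript
// Sketch of the lastUpdatePromise chaining: each partial update is queued
// behind the previous one so message text never arrives out of order, and
// onRunCompleted can await the chain before the final flush.
class UpdateChain {
  private lastUpdatePromise: Promise<void> = Promise.resolve();

  enqueue(update: () => Promise<void>): Promise<void> {
    // Chain onto the previous update; swallow its failure so one bad
    // update does not wedge the rest of the chain.
    this.lastUpdatePromise = this.lastUpdatePromise.catch(() => {}).then(update);
    return this.lastUpdatePromise;
  }

  // Await all in-flight updates (used before the final flush).
  flush(): Promise<void> {
    return this.lastUpdatePromise.catch(() => {});
  }
}
```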

@@ -147,10 +151,12 @@ Config supports a flat default account or named sub-accounts:
## Key design decisions

- **Bot token in config, secret is not.** The API secret is only used in `scripts/generate-bot-token.ts` to mint a JWT. Only the resulting token is stored in `openclaw.json`.
- **`deliver` callback vs. completion signal.** `dispatchReplyWithBufferedBlockDispatcher` signals completion by resolving its promise, not by passing an `isComplete` flag. The `info.kind` parameter (`"tool" | "block" | "final"`) distinguishes delivery type. `onRunCompleted` is called after the dispatcher awaits. The `ReplyPayload` type has `text` and `isError` as the only relevant fields — there is no `markdown`, `isComplete`, or `toolName` field, despite what seems intuitive.
- **`deliver` callback vs. completion signal.** `dispatchReplyWithBufferedBlockDispatcher` signals completion by resolving its promise, not by passing an `isComplete` flag. The `info.kind` parameter (`"tool" | "block" | "final"`) distinguishes delivery type. `onRunCompleted` is called after the dispatcher awaits. The `ReplyPayload` type fields relevant to this plugin: `text`, `isError`, `mediaUrl`, `mediaUrls`, `audioAsVoice`. There is no `markdown`, `isComplete`, or `toolName` field, despite what seems intuitive.
- **Partial updates are chained via `lastUpdatePromise`.** Each `partialUpdateMessage` is `.then()`-chained onto the previous one to avoid out-of-order message text.
- **`safeSendEvent` swallows errors.** Indicator events are best-effort; a failed `ai_indicator` update must not abort message delivery. Retries: 5 attempts, exponential backoff starting at 100 ms, only on 429/5xx.
- **`seenThreads` is process-scoped.** The `Set<string>` tracking "first message in thread" lives at module level, so it persists across gateway reloads until the process restarts. This is intentional — it avoids re-sending parent context for active threads after a config reload.
- **`onTextChunk` receives deltas despite the wire protocol using full text.** `onPartialReply` provides cumulative text; `channel.ts` extracts the delta before calling `onTextChunk`. Inside `StreamingHandler`, `onTextChunk` re-accumulates deltas into `accumulatedText` and passes that full string to `partialUpdateMessage`. The round-trip is: cumulative → delta → cumulative. The delta extraction exists because `StreamingHandler` was designed around the "streaming chunks" mental model — it owns the accumulation and the throttle counter, making that API feel natural. The redundancy is intentional for architectural clarity, not a bug.
- **`activeGatewayCleanup` is a module-level registry for defence against connection accumulation.** `startAccount` stores a cleanup function per `accountId`. On the next `startAccount` call for the same account, any existing entry is force-invoked before creating the new connection. This self-heals the case where OpenClaw's in-process gateway reload calls `startAccount` without calling `stop()` first, which would otherwise leave orphaned WebSocket connections accumulating (each receiving every `message.new` event). `handleAbort` is idempotent via a `stopped` boolean guard, so it is safe to call from both the abort signal and `stop()`.
- **All three SDK event listeners are removed on cleanup.** `handleAbort` explicitly calls `client.off()` for `message.new` and `ai_indicator.stop`; `chatRuntime.stop()` removes `notification.added_to_channel` (saved as `addedToChannelHandler` in `StreamChatClientRuntime`). Failing to remove any one of these would cause listener accumulation across restarts.
- **Inbound WAV is converted to OGG before saving.** OpenClaw's `EXT_BY_MIME` table does not map `audio/wav` → `.wav`, so WAV files would be saved without an extension and never transcribed. `resolveStreamChatMedia` detects WAV MIME types (`audio/wav`, `audio/wave`, `audio/x-wav`) and converts to OGG Opus via `ffmpeg` before calling `saveMediaBuffer`. Fallback: if ffmpeg is unavailable the original WAV buffer is passed as-is.
- **Outbound audio is uploaded to Stream Chat CDN.** When the agent produces an audio file (`payload.mediaUrl`/`payload.mediaUrls` in the `deliver` callback), `uploadOutboundMedia` reads the local file, determines its MIME type via `mime-types`, and uploads it via `channel.sendFile()`. The returned CDN URL is wrapped in a Stream Chat `Attachment` object (`type: "audio"` or `"voiceRecording"` when `audioAsVoice` is true) and queued on the stream state via `addAttachment`. The attachment is included in the final `partialUpdateMessage` call by `onRunCompleted`.
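The `safeSendEvent` retry policy described above (5 attempts, exponential backoff from 100 ms, retry only on 429/5xx, errors swallowed) can be sketched as follows. The error shape with a `status` field is an assumption about what the `stream-chat` SDK throws:

```typescript
// Sketch of the safeSendEvent retry policy: best-effort delivery of
// indicator events. A failed send must never abort the run, so all
// errors are swallowed after the retry budget is exhausted.
async function safeSendEventSketch(
  send: () => Promise<void>,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms)),
): Promise<void> {
  let delay = 100; // exponential backoff starting at 100 ms
  for (let attempt = 1; attempt <= 5; attempt++) {
    try {
      await send();
      return;
    } catch (err) {
      const status = (err as { status?: number }).status ?? 0; // assumed error shape
      const retryable = status === 429 || (status >= 500 && status < 600);
      if (!retryable || attempt === 5) return; // swallow: indicators are best-effort
      await sleep(delay);
      delay *= 2;
    }
  }
}
```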
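The WAV detection and ffmpeg fallback described for `resolveStreamChatMedia` can be sketched like this. The MIME list comes from the bullet above; the temp-file plumbing and exact ffmpeg flags are assumptions (the real implementation may stream via stdin/stdout or use different codec options):

```typescript
// Sketch of the inbound WAV→OGG Opus conversion. Assumes ffmpeg is on
// PATH; if it is missing or fails, the original WAV buffer is passed
// through unchanged, matching the documented fallback.
import { spawnSync } from "node:child_process";
import { readFileSync, unlinkSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

const WAV_MIMES = new Set(["audio/wav", "audio/wave", "audio/x-wav"]);

function maybeConvertWavToOgg(buffer: Buffer, mime: string): { buffer: Buffer; mime: string } {
  if (!WAV_MIMES.has(mime)) return { buffer, mime }; // non-WAV passes through
  const inPath = join(tmpdir(), `wav-in-${process.pid}-${Date.now()}.wav`);
  const outPath = join(tmpdir(), `ogg-out-${process.pid}-${Date.now()}.ogg`);
  try {
    writeFileSync(inPath, buffer);
    const res = spawnSync("ffmpeg", ["-y", "-i", inPath, "-c:a", "libopus", outPath]);
    if (res.error || res.status !== 0) return { buffer, mime }; // ffmpeg unavailable → fall back
    return { buffer: readFileSync(outPath), mime: "audio/ogg" };
  } catch {
    return { buffer, mime };
  } finally {
    for (const p of [inPath, outPath]) {
      try { unlinkSync(p); } catch { /* best-effort cleanup */ }
    }
  }
}
```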
121 changes: 121 additions & 0 deletions scripts/test-roundtrip-audio.ts
@@ -0,0 +1,121 @@
#!/usr/bin/env npx tsx
/**
* Automated audio round-trip test: uploads a WAV file, sends it as a voice
* recording, and observes the bot's streaming response.
*
 * Requires a scripts/.env file (see scripts/.env.example) — the config()
 * call below resolves ".env" relative to this script's directory.
*/

import { config } from "dotenv";
config({ path: new URL(".env", import.meta.url).pathname });
import { StreamChat } from "stream-chat";
import { readFileSync } from "fs";

const apiKey = process.env.STREAM_API_KEY;
const userId = process.env.TEST_USER_ID;
const userToken = process.env.TEST_USER_TOKEN;
const channelId = process.env.TEST_CHANNEL_ID || "ai-test-channel";

if (!apiKey || !userId || !userToken) {
console.error("Error: STREAM_API_KEY, TEST_USER_ID, and TEST_USER_TOKEN must be set in .env");
process.exit(1);
}

const client = new StreamChat(apiKey, { allowServerSideConnect: true });
console.log(`Connecting as ${userId}...`);
await client.connectUser({ id: userId }, userToken);
console.log("Connected.");

// Watch the test channel
const channel = client.channel("messaging", channelId);
await channel.watch();
console.log(`Using channel: messaging:${channelId}`);

let gotResponse = false;

// Listen for new messages
client.on("message.new", (event) => {
if (event.user?.id === userId) return;
if (!event.message) return;
const from = event.user?.name || event.user?.id || "unknown";
const text = event.message.text || "(no text)";
const aiGenerated = event.message.ai_generated ? " [AI]" : "";
const attachments = event.message.attachments || [];
const hasAudio = attachments.some(
(a) => a.type === "voiceRecording" || a.mime_type?.startsWith("audio/"),
);
console.log(`[NEW MSG][${from}]${aiGenerated}: ${text}${hasAudio ? " [has audio]" : ""}`);
});

// Listen for AI indicators
for (const evType of ["ai_indicator.update", "ai_indicator.clear"] as const) {
client.on(evType as Parameters<typeof client.on>[0], (event) => {
const raw = event as Record<string, unknown>;
if (String(raw.type).includes("clear")) {
console.log(`[AI INDICATOR] cleared`);
} else {
console.log(`[AI INDICATOR] ${raw.ai_state}`);
}
});
}

// Listen for message updates (streaming)
client.on("message.updated", (event) => {
if (!event.message) return;
const text = event.message.text || "";
const generating = (event.message as Record<string, unknown>).generating;
const attachments = event.message.attachments || [];
const hasAudio = attachments.some(
(a) => a.type === "voiceRecording" || a.mime_type?.startsWith("audio/"),
);
if (generating) {
console.log(`[STREAMING] ${text.slice(-120)}${hasAudio ? " [has audio]" : ""}`);
} else {
console.log(`[FINAL] ${text}${hasAudio ? " [has audio]" : ""}`);
gotResponse = true;
}
});

// Read audio file
const audioPath = new URL("assets/test-audio.wav", import.meta.url).pathname;
const wavBuffer = readFileSync(audioPath);
console.log(`\nLoaded test audio: ${wavBuffer.length} bytes`);

// Upload WAV via channel.sendFile
console.log("Uploading audio file...");
const uploadResp = await channel.sendFile(wavBuffer as any, "test-audio.wav", "audio/wav");
const assetUrl = uploadResp.file;
console.log(`Uploaded → ${assetUrl.slice(0, 80)}...`);

// Send message with voiceRecording attachment
console.log("Sending audio message...");
await channel.sendMessage({
text: "",
attachments: [
{
type: "voiceRecording",
asset_url: assetUrl,
mime_type: "audio/wav",
title: "test-audio.wav",
file_size: wavBuffer.length,
duration: 5.071,
waveform_data: new Array(100).fill(0.2),
},
],
});
console.log("Audio message sent. Waiting for response...\n");

// Wait up to 60 seconds for response (audio processing is slower)
const start = Date.now();
while (!gotResponse && Date.now() - start < 60000) {
await new Promise((r) => setTimeout(r, 500));
}

if (gotResponse) {
console.log("\n✓ Audio round-trip test PASSED — got bot response.");
} else {
console.log("\n✗ Audio round-trip test TIMED OUT — no response in 60s.");
}

await client.disconnectUser();
process.exit(gotResponse ? 0 : 1);