diff --git a/backend/.env.example b/backend/.env.example index c63fe75..5f5c46a 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -45,6 +45,32 @@ ADMIN_EMAIL=admin@forgemind.space # ── CORS ─────────────────────────────────────────────────────────────── CORS_ORIGIN=http://localhost:5173 +# ── LLM provider fallbacks (used when an agent doesn't have its own key) ── +# Each AI Agent can BYOK its own provider key (per-agent override, stored +# AES-256-GCM encrypted). When the per-agent key is blank, the engine falls +# back to whichever of these matches the agent's provider. Leave both unset +# to require BYOK on every agent. +ANTHROPIC_API_KEY= +OPENAI_API_KEY= + +# Optional tuning for the agent inference queue. +# AGENT_QUEUE_CONCURRENCY=4 +# AGENT_QUEUE_ATTEMPTS=2 + +# ── Google OAuth (Sheets in v1; Gmail + Calendar in a later release) ─── +# Create an OAuth 2.0 Client (Web application) in Google Cloud Console +# (https://console.cloud.google.com/apis/credentials), enable the Google +# Sheets + Google Drive APIs for the project, and authorize the redirect +# URI below. Leave unset to disable the Google Integrations tab entirely. +GOOGLE_CLIENT_ID= +GOOGLE_CLIENT_SECRET= +# Must match exactly what's configured in the Google Cloud Console. In +# local dev with the Vite frontend, browsers hit the backend through the +# Vite proxy, so use a URL the backend can serve directly: +# Dev: http://localhost:3001/api/google-integrations/callback +# Prod: https://your-domain/api/google-integrations/callback +GOOGLE_OAUTH_REDIRECT_URI=http://localhost:3001/api/google-integrations/callback + # ── Meta WhatsApp Cloud API ──────────────────────────────────────────── # Needed only for live WhatsApp send/receive. Leave unset to work on UI/API # without making real Meta calls. diff --git a/backend/package.json b/backend/package.json index 7466ed2..1441164 100644 --- a/backend/package.json +++ b/backend/package.json @@ -11,6 +11,7 @@ "test": "node --test test/" }, "dependencies": { + "@anthropic-ai/sdk": "^0.99.0", "bcryptjs": "^2.4.3", "bullmq": "^5.77.2", "cookie-parser": "^1.4.6", @@ -19,10 +20,12 @@ "exceljs": "^4.4.0", "express": "^4.19.2", "express-rate-limit": "^7.3.1", + "googleapis": "^172.0.0", "helmet": "^7.1.0", "ioredis": "^5.4.1", "jsonwebtoken": "^9.0.2", "multer": "^2.1.1", + "openai": "^6.0.0", "pg": "^8.21.0" }, "devDependencies": { diff --git a/backend/src/engine/agentEngine.js b/backend/src/engine/agentEngine.js new file mode 100644 index 0000000..2e293a2 --- /dev/null +++ b/backend/src/engine/agentEngine.js @@ -0,0 +1,371 @@ +// Agent engine: end-to-end "inbound message → LLM tool-use loop → outbound reply". +// +// Called from the agentQueue worker (NOT inline from the webhook) so the +// webhook stays under Meta's 20s timeout. Logs everything to agent_runs + +// agent_run_steps so the UI can show a full trace. + +const pool = require('../db'); +const { decrypt } = require('../util/crypto'); +const { getProvider } = require('../llm'); +const googleSheets = require('../services/googleSheets'); +const { enqueueSend } = require('../queue/sendQueue'); +const { insertPendingRow } = require('../services/messageSender'); +const { getAccountWithToken } = require('../routes/whatsappAccounts'); + +/** + * Build the JSON-schema tool definitions surfaced to the LLM for one agent. + * Returns: + * - tools: array of { name, description, input_schema } (Anthropic shape) + * - executors: map from tool name → async (args) => result + * + * We name tools deterministically as `_` (e.g. `google_sheets_append`) + * so the LLM can pick the right one and so multiple Sheets tools per agent + * (different spreadsheets) get unique names via the row id suffix. + */ +async function buildToolsForAgent(agentId) { + const { rows } = await pool.query( + `SELECT * FROM coexistence.agent_tools + WHERE agent_id = $1 AND is_enabled = TRUE + ORDER BY id`, + [agentId], + ); + + const tools = []; + const executors = {}; + + for (const row of rows) { + if (row.tool_type === 'google_sheets') { + const cfg = row.config || {}; + const ops = Array.isArray(cfg.ops) ? cfg.ops : []; + const baseDesc = `Google Sheet "${cfg.spreadsheet_name || cfg.spreadsheet_id}" tab "${cfg.sheet_name}"`; + + if (ops.includes('read')) { + const name = `google_sheets_read_${row.id}`; + tools.push({ + name, + description: `Read rows from ${baseDesc}. Use this to look up information the user might be asking about.`, + input_schema: { + type: 'object', + properties: { + range: { + type: 'string', + description: "Optional A1 range to read (e.g. 'A2:E50'). Omit to read the whole sheet.", + }, + max_rows: { + type: 'integer', + description: 'Cap the number of rows returned. Default 100, max 500.', + }, + }, + }, + }); + executors[name] = (args) => googleSheets.executeOp({ op: 'read', toolConfig: cfg, args }); + } + + if (ops.includes('append')) { + const name = `google_sheets_append_${row.id}`; + tools.push({ + name, + description: `Append a new row to ${baseDesc}. Use this to save information the user provided (booking, order, lead, etc.).`, + input_schema: { + type: 'object', + properties: { + values: { + type: 'array', + items: { type: ['string', 'number', 'boolean', 'null'] }, + description: 'Cell values in left-to-right column order.', + }, + }, + required: ['values'], + }, + }); + executors[name] = (args) => googleSheets.executeOp({ op: 'append', toolConfig: cfg, args }); + } + + if (ops.includes('update')) { + const name = `google_sheets_update_${row.id}`; + tools.push({ + name, + description: `Update a specific range in ${baseDesc}. Use this only after you've used the read tool to identify which row/range to change.`, + input_schema: { + type: 'object', + properties: { + range: { + type: 'string', + description: "A1 range to overwrite (e.g. 'A5:E5' to replace row 5).", + }, + values: { + type: 'array', + items: { type: ['string', 'number', 'boolean', 'null'] }, + description: 'New cell values for the range.', + }, + }, + required: ['range', 'values'], + }, + }); + executors[name] = (args) => googleSheets.executeOp({ op: 'update', toolConfig: cfg, args }); + } + } + // Future: gmail_send, calendar_create_event, etc. — same pattern. + } + + return { tools, executors }; +} + +/** + * Pull recent chat history for this contact, oldest-first, capped at the + * agent's context window. Skips status updates and reactions. + */ +async function buildMessageHistory({ waAccountId, contactNumber, limit, currentInboundText }) { + // Resolve the agent's wa_number from the WhatsApp account + let waNumber = null; + if (waAccountId) { + const acc = await getAccountWithToken(waAccountId); + waNumber = acc?.displayPhoneNumber || null; + } + + const { rows } = waNumber + ? await pool.query( + `SELECT direction, message_body, timestamp + FROM coexistence.chat_history + WHERE wa_number = $1 AND contact_number = $2 + AND message_type NOT IN ('status','reaction') + AND message_body IS NOT NULL AND message_body <> '' + ORDER BY timestamp DESC + LIMIT $3`, + [waNumber, contactNumber, Math.max(1, Math.min(100, limit || 20))], + ) + : { rows: [] }; + + // DB returned newest-first for the LIMIT; reverse to chronological. + const history = rows.reverse().map(r => ({ + role: r.direction === 'incoming' ? 'user' : 'assistant', + content: r.message_body, + })); + + // The current inbound message may not yet be persisted (timing race vs. + // webhook commit). Append it as the last user message if it isn't there. + const last = history[history.length - 1]; + if (currentInboundText && !(last && last.role === 'user' && last.content === currentInboundText)) { + history.push({ role: 'user', content: currentInboundText }); + } + return history; +} + +function pickApiKey(agent) { + const fromAgent = decrypt(agent.llm_api_key_encrypted); + if (fromAgent) return fromAgent; + if (agent.llm_provider === 'anthropic') return process.env.ANTHROPIC_API_KEY || ''; + if (agent.llm_provider === 'openai') return process.env.OPENAI_API_KEY || ''; + return ''; +} + +async function recordStep(runId, stepIndex, step) { + await pool.query( + `INSERT INTO coexistence.agent_run_steps + (run_id, step_index, step_type, tool_type, input, output, status, latency_ms, error_message) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)`, + [ + runId, + stepIndex, + step.step_type, + step.tool_type || null, + step.input ? JSON.stringify(step.input) : null, + step.output != null ? JSON.stringify(step.output) : null, + step.status, + step.latency_ms || null, + step.error_message ? String(step.error_message).slice(0, 1000) : null, + ], + ); +} + +/** + * Main entry. Loads the agent, builds tools + context, runs the LLM loop, + * persists everything, and enqueues the final reply on the existing sendQueue. + */ +async function runAgent({ agentId, contactNumber, inboundMessageId, inboundText }) { + const { rows: agentRows } = await pool.query( + 'SELECT * FROM coexistence.agents WHERE id = $1', + [agentId], + ); + const agent = agentRows[0]; + if (!agent) throw new Error(`Agent id=${agentId} not found`); + if (!agent.is_active) throw new Error(`Agent id=${agentId} is inactive`); + if (!agent.wa_account_id) throw new Error(`Agent id=${agentId} has no WhatsApp account bound`); + + const apiKey = pickApiKey(agent); + if (!apiKey) { + throw new Error(`No API key for provider '${agent.llm_provider}'. Set a per-agent key, or ${agent.llm_provider === 'anthropic' ? 'ANTHROPIC_API_KEY' : 'OPENAI_API_KEY'} in backend/.env.`); + } + + // Open the run row immediately so a crash mid-loop is still visible in the UI. + const { rows: runRows } = await pool.query( + `INSERT INTO coexistence.agent_runs + (agent_id, wa_account_id, contact_number, inbound_message_id, status) + VALUES ($1,$2,$3,$4,'running') + RETURNING id`, + [agent.id, agent.wa_account_id, contactNumber, inboundMessageId || null], + ); + const runId = runRows[0].id; + + let stepCounter = 0; + const onStep = async (step) => { + stepCounter += 1; + try { + await recordStep(runId, stepCounter, step); + } catch (e) { + console.error('[agentEngine] step persist failed:', e.message); + } + }; + + try { + const { tools, executors } = await buildToolsForAgent(agent.id); + const history = await buildMessageHistory({ + waAccountId: agent.wa_account_id, + contactNumber, + limit: agent.context_window_messages, + currentInboundText: inboundText, + }); + + const provider = getProvider(agent.llm_provider); + const result = await provider.runWithTools({ + systemPrompt: agent.system_prompt, + messages: history, + tools, + onToolCall: async ({ name, args }) => { + const exec = executors[name]; + if (!exec) throw new Error(`Unknown tool '${name}'`); + return await exec(args); + }, + onStep, + model: agent.llm_model, + apiKey, + maxIterations: Math.max(1, Math.min(20, agent.max_tool_iterations || 6)), + }); + + const finalStatus = result.capped ? 'capped' : 'completed'; + await pool.query( + `UPDATE coexistence.agent_runs + SET status=$1, total_input_tokens=$2, total_output_tokens=$3, + final_reply=$4, ended_at=NOW() + WHERE id=$5`, + [finalStatus, result.totalInputTokens, result.totalOutputTokens, + result.finalText || null, runId], + ); + + if (result.finalText) { + // Insert an optimistic chat_history row FIRST so the agent's reply shows + // up in the Chats UI immediately (status='sending'), then the existing + // sendQueue worker swaps the local id for Meta's wamid on success + // (markSent) or marks it failed (markFailed). Without this row, the + // message is delivered to WhatsApp but never appears in our own UI. + const account = await getAccountWithToken(agent.wa_account_id); + let localMessageId = null; + if (account && account.displayPhoneNumber) { + try { + localMessageId = await insertPendingRow({ + account, + toNumber: contactNumber, + messageType: 'text', + messageBody: result.finalText, + }); + } catch (e) { + console.error('[agentEngine] optimistic row insert failed:', e.message); + } + } + await enqueueSend({ + kind: 'text', + accountId: agent.wa_account_id, + to: contactNumber, + localMessageId, + payload: { body: result.finalText }, + }); + } + return { runId, status: finalStatus, finalText: result.finalText }; + } catch (err) { + await pool.query( + `UPDATE coexistence.agent_runs + SET status='failed', error_message=$1, ended_at=NOW() + WHERE id=$2`, + [String(err.message || err).slice(0, 1000), runId], + ); + throw err; + } +} + +/** + * Dry-run an agent for the in-app test chat panel. Same loading + tool + * execution as runAgent, but: + * - Does NOT enqueue a real WhatsApp send (no chat_history write). + * - Does NOT persist to agent_runs / agent_run_steps (test interactions + * would otherwise pollute the run history). + * - Accepts an explicit messages[] array so the operator can simulate a + * multi-turn conversation without writing to chat_history. + * - Returns the full step trace inline so the UI can show what the LLM + * called and what tools fired. + * + * `messages` shape: [{ role: 'user'|'assistant', content: string }] — same as + * the runtime hydrates from chat_history in runAgent. + */ +async function runAgentTest({ agentId, messages }) { + const { rows: agentRows } = await pool.query( + 'SELECT * FROM coexistence.agents WHERE id = $1', + [agentId], + ); + const agent = agentRows[0]; + if (!agent) throw new Error(`Agent id=${agentId} not found`); + + const apiKey = pickApiKey(agent); + if (!apiKey) { + throw new Error(`No API key for provider '${agent.llm_provider}'. Set a per-agent key, or ${agent.llm_provider === 'anthropic' ? 'ANTHROPIC_API_KEY' : 'OPENAI_API_KEY'} in backend/.env.`); + } + + const { tools, executors } = await buildToolsForAgent(agent.id); + + const steps = []; + const onStep = async (step) => { + steps.push({ + stepIndex: steps.length + 1, + stepType: step.step_type, + toolType: step.tool_type || null, + input: step.input, + output: step.output, + status: step.status, + latencyMs: step.latency_ms || null, + errorMessage: step.error_message || null, + }); + }; + + const cleaned = Array.isArray(messages) + ? messages.filter(m => m && m.content && (m.role === 'user' || m.role === 'assistant')) + : []; + if (cleaned.length === 0) { + throw new Error('At least one message is required (role=user|assistant, non-empty content).'); + } + + const provider = getProvider(agent.llm_provider); + const result = await provider.runWithTools({ + systemPrompt: agent.system_prompt, + messages: cleaned, + tools, + onToolCall: async ({ name, args }) => { + const exec = executors[name]; + if (!exec) throw new Error(`Unknown tool '${name}'`); + return await exec(args); + }, + onStep, + model: agent.llm_model, + apiKey, + maxIterations: Math.max(1, Math.min(20, agent.max_tool_iterations || 6)), + }); + + return { + reply: result.finalText || '', + status: result.capped ? 'capped' : 'completed', + totalInputTokens: result.totalInputTokens, + totalOutputTokens: result.totalOutputTokens, + iterations: result.iterations, + steps, + }; +} + +module.exports = { runAgent, runAgentTest, buildToolsForAgent, buildMessageHistory }; diff --git a/backend/src/index.js b/backend/src/index.js index 01eddfd..9ce029d 100644 --- a/backend/src/index.js +++ b/backend/src/index.js @@ -20,10 +20,16 @@ const { router: mediaRouter } = require('./routes/media'); const { router: mediaLibraryRouter } = require('./routes/mediaLibrary'); const mediaStorage = require('./util/pgStorage'); const { router: whatsappAccountsRouter } = require('./routes/whatsappAccounts'); +const { + router: googleIntegrationsRouter, + publicRouter: googleIntegrationsPublicRouter, +} = require('./routes/googleIntegrations'); +const { router: agentsRouter } = require('./routes/agents'); const { router: dashboardRouter } = require('./routes/dashboard'); const { router: pipelinesRouter } = require('./routes/pipelines'); const { startWorker: startMediaWorker, shutdown: shutdownMediaQueue } = require('./queue/mediaQueue'); const { startSendWorker, shutdownSendQueue } = require('./queue/sendQueue'); +const { startAgentWorker, shutdownAgentQueue } = require('./queue/agentQueue'); const app = express(); const PORT = parseInt(process.env.PORT || '3001', 10); @@ -99,6 +105,11 @@ app.get('/health', (req, res) => res.json({ ok: true })); // Public routes (webhook from n8n — no auth) app.use('/api', webhookRouter); +// Google OAuth callback is public: Google redirects the user's browser back +// here, and we re-derive the user from the signed `state` param (see +// routes/googleIntegrations.js). Everything else under /google-integrations is +// auth-required and mounted further down. +app.use('/api', googleIntegrationsPublicRouter); // Auth routes (public) app.use('/api', authRouter); @@ -115,6 +126,8 @@ app.use('/api', authMiddleware, chatbotsRouter); app.use('/api', authMiddleware, mediaRouter); app.use('/api', authMiddleware, mediaLibraryRouter); app.use('/api', authMiddleware, whatsappAccountsRouter); +app.use('/api', authMiddleware, googleIntegrationsRouter); +app.use('/api', authMiddleware, agentsRouter); app.use('/api', authMiddleware, dashboardRouter); app.use('/api', authMiddleware, pipelinesRouter); @@ -134,6 +147,7 @@ async function start() { ); startMediaWorker(); startSendWorker(); + startAgentWorker(); // Stale-pause sweeper: mark paused automation executions that have outlived // their expires_at as error. Resume already inline-checks expires_at, so @@ -200,6 +214,7 @@ async function start() { server.close(() => {}); await shutdownMediaQueue(); await shutdownSendQueue(); + await shutdownAgentQueue(); process.exit(0); }; process.on('SIGTERM', () => shutdown('SIGTERM')); diff --git a/backend/src/llm/anthropic.js b/backend/src/llm/anthropic.js new file mode 100644 index 0000000..b514769 --- /dev/null +++ b/backend/src/llm/anthropic.js @@ -0,0 +1,120 @@ +// Anthropic Claude adapter. Tool-use loop matches the official SDK pattern: +// keep calling messages.create with the rolling history while stop_reason === +// 'tool_use', appending the assistant's tool_use blocks and our tool_result +// blocks each iteration. Bail after maxIterations regardless — runaway tool +// loops are a real cost risk for an open-source app. + +const Anthropic = require('@anthropic-ai/sdk'); + +async function runWithTools({ + systemPrompt, + messages, // [{ role:'user'|'assistant', content:string }] + tools, // [{ name, description, input_schema }] + onToolCall, + onStep, + model, + apiKey, + maxIterations, +}) { + const client = new Anthropic({ apiKey }); + + // Translate our generic messages to Anthropic's format. v1: text only. + const history = messages.map(m => ({ + role: m.role, + content: [{ type: 'text', text: m.content }], + })); + + let totalInputTokens = 0; + let totalOutputTokens = 0; + let finalText = ''; + let iterations = 0; + + while (iterations < maxIterations) { + iterations += 1; + const t0 = Date.now(); + const resp = await client.messages.create({ + model, + max_tokens: 1024, + system: systemPrompt, + tools: tools.length > 0 ? tools : undefined, + messages: history, + }); + const latency = Date.now() - t0; + + totalInputTokens += resp.usage?.input_tokens || 0; + totalOutputTokens += resp.usage?.output_tokens || 0; + + await onStep({ + step_type: 'llm_call', + status: 'ok', + latency_ms: latency, + input: { model, message_count: history.length, tool_count: tools.length }, + output: { + stop_reason: resp.stop_reason, + input_tokens: resp.usage?.input_tokens, + output_tokens: resp.usage?.output_tokens, + }, + }); + + // Append assistant turn (mix of text + tool_use) to history verbatim — the + // Anthropic API requires the exact tool_use blocks to be echoed back when + // we attach tool_result. + history.push({ role: 'assistant', content: resp.content }); + + // Accumulate any text the model emitted this turn. The model may emit text + // alongside a tool_use (a thinking aloud preamble); we want the final + // user-facing answer, which is the text from the turn where stop_reason === + // 'end_turn'. But we also keep the last-seen text as a fallback for capped + // runs. + const textBlocks = (resp.content || []).filter(b => b.type === 'text').map(b => b.text); + if (textBlocks.length > 0) finalText = textBlocks.join('\n').trim(); + + if (resp.stop_reason !== 'tool_use') { + return { finalText, totalInputTokens, totalOutputTokens, iterations }; + } + + // Run every tool_use block the model requested THIS turn (parallel tool use) + const toolUses = (resp.content || []).filter(b => b.type === 'tool_use'); + const toolResults = []; + for (const tu of toolUses) { + const tt0 = Date.now(); + let resultText; + let stepStatus = 'ok'; + let stepError = null; + try { + const r = await onToolCall({ name: tu.name, args: tu.input || {} }); + resultText = typeof r === 'string' ? r : JSON.stringify(r); + } catch (err) { + stepStatus = 'error'; + stepError = err.message; + resultText = `Error: ${err.message}`; + } + await onStep({ + step_type: 'tool_call', + tool_type: tu.name, + status: stepStatus, + latency_ms: Date.now() - tt0, + input: tu.input || {}, + output: stepStatus === 'ok' ? safeParse(resultText) : null, + error_message: stepError, + }); + toolResults.push({ + type: 'tool_result', + tool_use_id: tu.id, + content: resultText, + is_error: stepStatus === 'error', + }); + } + + history.push({ role: 'user', content: toolResults }); + } + + return { finalText, totalInputTokens, totalOutputTokens, iterations, capped: true }; +} + +function safeParse(s) { + if (typeof s !== 'string') return s; + try { return JSON.parse(s); } catch { return { text: s.slice(0, 500) }; } +} + +module.exports = { runWithTools }; diff --git a/backend/src/llm/index.js b/backend/src/llm/index.js new file mode 100644 index 0000000..27769b7 --- /dev/null +++ b/backend/src/llm/index.js @@ -0,0 +1,46 @@ +// LLM provider registry. The agent engine asks for a provider by name and +// gets back a uniform `{ runWithTools(...) }` adapter. Adding a provider is a +// new file in this folder + one line below — the engine code never branches +// on provider name. +// +// Each adapter exports a single async function with this contract: +// +// runWithTools({ +// systemPrompt: string, +// messages: Array<{ role: 'user'|'assistant', content: string }>, +// tools: Array<{ name, description, input_schema }>, +// onToolCall: async ({ name, args }) => any, // executes one tool and returns result +// onStep: async (step) => void, // emits llm_call / tool_call traces +// model: string, +// apiKey: string, +// maxIterations: number, +// }) -> { finalText, totalInputTokens, totalOutputTokens, iterations } +// +// `tools` follows the Anthropic-style shape because it's the simpler superset; +// the OpenAI adapter translates internally. `onToolCall` errors are caught by +// the adapter and fed back to the model as a tool error — the loop continues +// until the model stops asking for tools (or maxIterations). + +const anthropic = require('./anthropic'); +const openai = require('./openai'); + +const PROVIDERS = { + anthropic, + openai, +}; + +function getProvider(name) { + const p = PROVIDERS[name]; + if (!p) { + const err = new Error(`Unknown LLM provider: ${name}`); + err.code = 'UNKNOWN_PROVIDER'; + throw err; + } + return p; +} + +function listProviders() { + return Object.keys(PROVIDERS); +} + +module.exports = { getProvider, listProviders }; diff --git a/backend/src/llm/openai.js b/backend/src/llm/openai.js new file mode 100644 index 0000000..1efb153 --- /dev/null +++ b/backend/src/llm/openai.js @@ -0,0 +1,124 @@ +// OpenAI adapter. Same shape as the Anthropic adapter (see ./index.js for the +// contract). We translate the generic tool schema to OpenAI's +// `tools=[{type:'function', function:{...}}]` format and loop on +// finish_reason === 'tool_calls' until the model stops requesting tools. + +const OpenAI = require('openai'); + +function toOpenAITools(tools) { + return tools.map(t => ({ + type: 'function', + function: { + name: t.name, + description: t.description, + parameters: t.input_schema || { type: 'object', properties: {} }, + }, + })); +} + +async function runWithTools({ + systemPrompt, + messages, + tools, + onToolCall, + onStep, + model, + apiKey, + maxIterations, +}) { + const client = new OpenAI({ apiKey }); + const oaiTools = toOpenAITools(tools); + + const history = [ + { role: 'system', content: systemPrompt }, + ...messages.map(m => ({ role: m.role, content: m.content })), + ]; + + let totalInputTokens = 0; + let totalOutputTokens = 0; + let finalText = ''; + let iterations = 0; + + while (iterations < maxIterations) { + iterations += 1; + const t0 = Date.now(); + const resp = await client.chat.completions.create({ + model, + messages: history, + tools: oaiTools.length > 0 ? oaiTools : undefined, + max_tokens: 1024, + }); + const latency = Date.now() - t0; + + totalInputTokens += resp.usage?.prompt_tokens || 0; + totalOutputTokens += resp.usage?.completion_tokens || 0; + + const choice = resp.choices?.[0]; + const msg = choice?.message; + const finishReason = choice?.finish_reason; + + await onStep({ + step_type: 'llm_call', + status: 'ok', + latency_ms: latency, + input: { model, message_count: history.length, tool_count: oaiTools.length }, + output: { + finish_reason: finishReason, + prompt_tokens: resp.usage?.prompt_tokens, + completion_tokens: resp.usage?.completion_tokens, + }, + }); + + if (msg?.content) finalText = msg.content.trim(); + + // Always push the assistant turn — OpenAI requires the tool_calls echo when + // we attach the matching tool messages below. + history.push(msg); + + const toolCalls = msg?.tool_calls || []; + if (finishReason !== 'tool_calls' || toolCalls.length === 0) { + return { finalText, totalInputTokens, totalOutputTokens, iterations }; + } + + for (const tc of toolCalls) { + const name = tc.function?.name; + let args = {}; + try { args = JSON.parse(tc.function?.arguments || '{}'); } catch { /* leave empty */ } + const tt0 = Date.now(); + let resultText; + let stepStatus = 'ok'; + let stepError = null; + try { + const r = await onToolCall({ name, args }); + resultText = typeof r === 'string' ? r : JSON.stringify(r); + } catch (err) { + stepStatus = 'error'; + stepError = err.message; + resultText = `Error: ${err.message}`; + } + await onStep({ + step_type: 'tool_call', + tool_type: name, + status: stepStatus, + latency_ms: Date.now() - tt0, + input: args, + output: stepStatus === 'ok' ? safeParse(resultText) : null, + error_message: stepError, + }); + history.push({ + role: 'tool', + tool_call_id: tc.id, + content: resultText, + }); + } + } + + return { finalText, totalInputTokens, totalOutputTokens, iterations, capped: true }; +} + +function safeParse(s) { + if (typeof s !== 'string') return s; + try { return JSON.parse(s); } catch { return { text: s.slice(0, 500) }; } +} + +module.exports = { runWithTools }; diff --git a/backend/src/queue/agentQueue.js b/backend/src/queue/agentQueue.js new file mode 100644 index 0000000..9856bfc --- /dev/null +++ b/backend/src/queue/agentQueue.js @@ -0,0 +1,83 @@ +// Agent inference queue. The webhook handler enqueues; this worker runs the +// agent's LLM tool-use loop off the request path so Meta doesn't time out (20s +// webhook ceiling). Per-contact serial processing prevents two simultaneous +// agent runs from sending out-of-order replies to the same chat. + +const { Queue, Worker, QueueEvents } = require('bullmq'); +const IORedis = require('ioredis'); +const { runAgent } = require('../engine/agentEngine'); + +const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379'; +const QUEUE_NAME = 'forgechat-agent'; +const CONCURRENCY = parseInt(process.env.AGENT_QUEUE_CONCURRENCY || '4', 10); +const ATTEMPTS = parseInt(process.env.AGENT_QUEUE_ATTEMPTS || '2', 10); + +const connection = new IORedis(REDIS_URL, { + maxRetriesPerRequest: null, + enableReadyCheck: false, +}); +connection.on('error', err => console.error('[agentQueue] redis error:', err.message)); + +const agentQueue = new Queue(QUEUE_NAME, { connection }); + +let worker = null; +let queueEvents = null; + +async function processJob(job) { + const { agentId, contactNumber, inboundMessageId, inboundText } = job.data || {}; + return await runAgent({ agentId, contactNumber, inboundMessageId, inboundText }); +} + +function startAgentWorker() { + if (worker) return worker; + worker = new Worker(QUEUE_NAME, processJob, { + connection, + concurrency: CONCURRENCY, + }); + + worker.on('completed', (job, result) => { + const r = result || {}; + console.log(`[agentQueue] agent=${job.data?.agentId} contact=${job.data?.contactNumber} status=${r.status} run=${r.runId}`); + }); + worker.on('failed', (job, err) => { + console.error(`[agentQueue] agent=${job?.data?.agentId} contact=${job?.data?.contactNumber} failed (attempt ${job?.attemptsMade}/${ATTEMPTS}): ${err?.message}`); + }); + + queueEvents = new QueueEvents(QUEUE_NAME, { connection }); + queueEvents.on('error', err => console.error('[agentQueue] events error:', err.message)); + + console.log(`[agentQueue] worker started, concurrency=${CONCURRENCY}, attempts=${ATTEMPTS}`); + return worker; +} + +/** + * Enqueue an agent run. The jobId pins it to (agent, contact) so a flood of + * messages from the same number doesn't fan out into parallel runs that step + * over each other. + */ +async function enqueueAgentRun({ agentId, contactNumber, inboundMessageId, inboundText }) { + await agentQueue.add( + 'run', + { agentId, contactNumber, inboundMessageId, inboundText }, + { + jobId: `agent-${agentId}-${contactNumber}-${inboundMessageId || Date.now()}`, + attempts: ATTEMPTS, + backoff: { type: 'exponential', delay: 2000 }, + removeOnComplete: { count: 200, age: 3600 }, + removeOnFail: { count: 500, age: 86400 }, + }, + ); +} + +async function shutdownAgentQueue() { + try { + if (worker) await worker.close(); + if (queueEvents) await queueEvents.close(); + await agentQueue.close(); + await connection.quit(); + } catch (err) { + console.error('[agentQueue] shutdown error:', err.message); + } +} + +module.exports = { agentQueue, startAgentWorker, enqueueAgentRun, shutdownAgentQueue }; diff --git a/backend/src/routes/agents.js b/backend/src/routes/agents.js new file mode 100644 index 0000000..9f91a8d --- /dev/null +++ b/backend/src/routes/agents.js @@ -0,0 +1,365 @@ +// AI Agents CRUD + runs viewer. +// +// Single-owner system, same pattern as whatsappAccounts.js: every authenticated +// request is the owner. BYOK API keys are AES-256-GCM at rest (util/crypto.js) +// and never returned in plaintext except via ?reveal=1 on the single-agent +// GET, mirroring how access tokens are revealed elsewhere. + +const { Router } = require('express'); +const pool = require('../db'); +const { encrypt, decrypt, maskSecret } = require('../util/crypto'); + +const router = Router(); + +function adminOnly(req, res, next) { + if (!req.user) return res.status(401).json({ error: 'Unauthorized' }); + next(); +} + +function agentShape(row, { reveal = false } = {}) { + if (!row) return null; + const apiKey = decrypt(row.llm_api_key_encrypted); + return { + id: row.id, + name: row.name, + description: row.description, + systemPrompt: row.system_prompt, + llmProvider: row.llm_provider, + llmModel: row.llm_model, + llmApiKeyMasked: maskSecret(apiKey || ''), + llmApiKey: reveal ? (apiKey || '') : undefined, + hasOwnApiKey: !!apiKey, + waAccountId: row.wa_account_id, + isActive: row.is_active, + contextWindowMessages: row.context_window_messages, + maxToolIterations: row.max_tool_iterations, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +function toolShape(row) { + return { + id: row.id, + agentId: row.agent_id, + toolType: row.tool_type, + config: row.config || {}, + isEnabled: row.is_enabled, + createdAt: row.created_at, + }; +} + +router.get('/agents', async (req, res) => { + try { + const { rows } = await pool.query( + `SELECT a.*, + (SELECT COUNT(*)::int FROM coexistence.agent_tools t WHERE t.agent_id = a.id) AS tool_count, + (SELECT MAX(started_at) FROM coexistence.agent_runs r WHERE r.agent_id = a.id) AS last_run_at + FROM coexistence.agents a + ORDER BY a.updated_at DESC`, + ); + res.json(rows.map(r => ({ + ...agentShape(r), + toolCount: r.tool_count, + lastRunAt: r.last_run_at, + }))); + } catch (err) { + console.error('[agents] list error:', err.message); + res.status(500).json({ error: 'Failed to list agents' }); + } +}); + +router.get('/agents/:id', async (req, res) => { + try { + const { rows } = await pool.query( + 'SELECT * FROM coexistence.agents WHERE id = $1', + [req.params.id], + ); + if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); + + const { rows: tools } = await pool.query( + `SELECT * FROM coexistence.agent_tools WHERE agent_id = $1 ORDER BY id`, + [req.params.id], + ); + res.json({ + ...agentShape(rows[0], { reveal: req.query.reveal === '1' }), + tools: tools.map(toolShape), + }); + } catch (err) { + console.error('[agents] get error:', err.message); + res.status(500).json({ error: 'Failed to fetch agent' }); + } +}); + +router.post('/agents', adminOnly, async (req, res) => { + try { + const b = req.body || {}; + if (!b.name || !b.systemPrompt || !b.llmProvider || !b.llmModel) { + return res.status(400).json({ error: 'name, systemPrompt, llmProvider, llmModel are required' }); + } + if (!['anthropic', 'openai'].includes(b.llmProvider)) { + return res.status(400).json({ error: "llmProvider must be 'anthropic' or 'openai'" }); + } + const { rows } = await pool.query( + `INSERT INTO coexistence.agents + (name, description, system_prompt, llm_provider, llm_model, + llm_api_key_encrypted, wa_account_id, is_active, + context_window_messages, max_tool_iterations) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) + RETURNING *`, + [ + b.name.trim(), b.description?.trim() || null, + b.systemPrompt, b.llmProvider, b.llmModel.trim(), + b.llmApiKey ? encrypt(b.llmApiKey.trim()) : null, + b.waAccountId || null, + !!b.isActive, + Math.max(1, Math.min(100, parseInt(b.contextWindowMessages || 20, 10))), + Math.max(1, Math.min(20, parseInt(b.maxToolIterations || 6, 10))), + ], + ); + res.status(201).json(agentShape(rows[0])); + } catch (err) { + if (err.code === '23505') { + return res.status(409).json({ error: 'Another agent is already active on this WhatsApp account. Disable it first.' }); + } + console.error('[agents] create error:', err.message); + res.status(500).json({ error: 'Failed to create agent' }); + } +}); + +router.put('/agents/:id', adminOnly, async (req, res) => { + try { + const b = req.body || {}; + const { rows: existing } = await pool.query( + 'SELECT * FROM coexistence.agents WHERE id = $1', + [req.params.id], + ); + if (existing.length === 0) return res.status(404).json({ error: 'Not found' }); + + const sets = ['updated_at = NOW()']; + const params = []; + let i = 1; + const push = (col, val) => { sets.push(`${col} = $${i++}`); params.push(val); }; + + if (b.name !== undefined) push('name', b.name.trim()); + if (b.description !== undefined) push('description', b.description?.trim() || null); + if (b.systemPrompt !== undefined) push('system_prompt', b.systemPrompt); + if (b.llmProvider !== undefined) { + if (!['anthropic', 'openai'].includes(b.llmProvider)) { + return res.status(400).json({ error: "llmProvider must be 'anthropic' or 'openai'" }); + } + push('llm_provider', b.llmProvider); + } + if (b.llmModel !== undefined) push('llm_model', b.llmModel.trim()); + // Treat an empty string as "clear my key, fall back to server env"; null/undefined + // means "no change"; any other string means "rotate to this new key". + if (b.llmApiKey !== undefined) { + if (b.llmApiKey === '') push('llm_api_key_encrypted', null); + else push('llm_api_key_encrypted', encrypt(b.llmApiKey.trim())); + } + if (b.waAccountId !== undefined) push('wa_account_id', b.waAccountId || null); + if (b.isActive !== undefined) push('is_active', !!b.isActive); + if (b.contextWindowMessages !== undefined) { + push('context_window_messages', Math.max(1, Math.min(100, parseInt(b.contextWindowMessages, 10) || 20))); + } + if (b.maxToolIterations !== undefined) { + push('max_tool_iterations', Math.max(1, Math.min(20, parseInt(b.maxToolIterations, 10) || 6))); + } + + params.push(req.params.id); + const { rows } = await pool.query( + `UPDATE coexistence.agents SET ${sets.join(', ')} WHERE id = $${i} RETURNING *`, + params, + ); + res.json(agentShape(rows[0])); + } catch (err) { + if (err.code === '23505') { + return res.status(409).json({ error: 'Another agent is already active on this WhatsApp account. Disable it first.' }); + } + console.error('[agents] update error:', err.message); + res.status(500).json({ error: 'Failed to update agent' }); + } +}); + +router.delete('/agents/:id', adminOnly, async (req, res) => { + try { + const { rowCount } = await pool.query( + 'DELETE FROM coexistence.agents WHERE id = $1', + [req.params.id], + ); + if (rowCount === 0) return res.status(404).json({ error: 'Not found' }); + res.json({ ok: true }); + } catch (err) { + console.error('[agents] delete error:', err.message); + res.status(500).json({ error: 'Failed to delete agent' }); + } +}); + +/* --------------------------- Tools (nested) --------------------------- */ + +router.post('/agents/:id/tools', adminOnly, async (req, res) => { + try { + const b = req.body || {}; + if (!b.toolType || !b.config) { + return res.status(400).json({ error: 'toolType and config are required' }); + } + if (b.toolType === 'google_sheets') { + const cfg = b.config; + if (!cfg.google_account_id || !cfg.spreadsheet_id || !cfg.sheet_name) { + return res.status(400).json({ error: 'Sheets tool needs google_account_id, spreadsheet_id, sheet_name' }); + } + if (!Array.isArray(cfg.ops) || cfg.ops.length === 0) { + return res.status(400).json({ error: 'Sheets tool needs at least one op enabled (read/append/update)' }); + } + } + const { rows } = await pool.query( + `INSERT INTO coexistence.agent_tools (agent_id, tool_type, config, is_enabled) + VALUES ($1,$2,$3,$4) RETURNING *`, + [req.params.id, b.toolType, JSON.stringify(b.config), b.isEnabled !== false], + ); + res.status(201).json(toolShape(rows[0])); + } catch (err) { + console.error('[agents] tool create error:', err.message); + res.status(500).json({ error: 'Failed to add tool' }); + } +}); + +router.put('/agents/:id/tools/:toolId', adminOnly, async (req, res) => { + try { + const b = req.body || {}; + const sets = []; + const params = []; + let i = 1; + if (b.config !== undefined) { sets.push(`config = $${i++}`); params.push(JSON.stringify(b.config)); } + if (b.isEnabled !== undefined) { sets.push(`is_enabled = $${i++}`); params.push(!!b.isEnabled); } + if (sets.length === 0) return res.status(400).json({ error: 'No updatable fields provided' }); + params.push(req.params.id, req.params.toolId); + const { rows } = await pool.query( + `UPDATE coexistence.agent_tools SET ${sets.join(', ')} + WHERE agent_id = $${i++} AND id = $${i} RETURNING *`, + params, + ); + if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); + res.json(toolShape(rows[0])); + } catch (err) { + console.error('[agents] tool update error:', err.message); + res.status(500).json({ error: 'Failed to update tool' }); + } +}); + +router.delete('/agents/:id/tools/:toolId', adminOnly, async (req, res) => { + try { + const { rowCount } = await pool.query( + 'DELETE FROM coexistence.agent_tools WHERE agent_id = $1 AND id = $2', + [req.params.id, req.params.toolId], + ); + if (rowCount === 0) return res.status(404).json({ error: 'Not found' }); + res.json({ ok: true }); + } catch (err) { + console.error('[agents] tool delete error:', err.message); + res.status(500).json({ error: 'Failed to delete tool' }); + } +}); + +/* --------------------------- Runs (viewer) ---------------------------- */ + +router.get('/agents/:id/runs', async (req, res) => { + try { + const limit = Math.max(1, Math.min(200, parseInt(req.query.limit || '50', 10))); + const { rows } = await pool.query( + `SELECT id, agent_id, contact_number, inbound_message_id, status, + total_input_tokens, total_output_tokens, final_reply, error_message, + started_at, ended_at + FROM coexistence.agent_runs + WHERE agent_id = $1 + ORDER BY started_at DESC + LIMIT $2`, + [req.params.id, limit], + ); + res.json(rows.map(r => ({ + id: r.id, + agentId: r.agent_id, + contactNumber: r.contact_number, + inboundMessageId: r.inbound_message_id, + status: r.status, + totalInputTokens: r.total_input_tokens, + totalOutputTokens: r.total_output_tokens, + finalReply: r.final_reply, + errorMessage: r.error_message, + startedAt: r.started_at, + endedAt: r.ended_at, + }))); + } catch (err) { + console.error('[agents] runs error:', err.message); + res.status(500).json({ error: 'Failed to fetch runs' }); + } +}); + +router.get('/agents/:id/runs/:runId', async (req, res) => { + try { + const { rows: runs } = await pool.query( + `SELECT * FROM coexistence.agent_runs WHERE id = $1 AND agent_id = $2`, + [req.params.runId, req.params.id], + ); + if (runs.length === 0) return res.status(404).json({ error: 'Not found' }); + const { rows: steps } = await pool.query( + `SELECT * FROM coexistence.agent_run_steps WHERE run_id = $1 ORDER BY step_index`, + [req.params.runId], + ); + const r = runs[0]; + res.json({ + id: r.id, + agentId: r.agent_id, + contactNumber: r.contact_number, + inboundMessageId: r.inbound_message_id, + status: r.status, + totalInputTokens: r.total_input_tokens, + totalOutputTokens: r.total_output_tokens, + finalReply: r.final_reply, + errorMessage: r.error_message, + startedAt: r.started_at, + endedAt: r.ended_at, + steps: steps.map(s => ({ + id: s.id, + stepIndex: s.step_index, + stepType: s.step_type, + toolType: s.tool_type, + input: s.input, + output: s.output, + status: s.status, + latencyMs: s.latency_ms, + errorMessage: s.error_message, + createdAt: s.created_at, + })), + }); + } catch (err) { + console.error('[agents] run detail error:', err.message); + res.status(500).json({ error: 'Failed to fetch run' }); + } +}); + +/* --------------------------- Test chat (preview) ---------------------- */ +const { runAgentTest } = require('../engine/agentEngine'); + +// POST /agents/:id/test body: { messages: [{role:'user'|'assistant', content}] } +// +// In-app dry run of an agent. Runs the LLM loop with real tool execution +// (Sheets append/read/update WILL hit the real spreadsheet — operators are +// expected to point a test agent at a test sheet) but skips the WhatsApp send +// and skips agent_runs persistence so the run history stays clean. Returns +// the reply text + the per-step trace. +router.post('/agents/:id/test', adminOnly, async (req, res) => { + try { + const messages = req.body?.messages; + if (!Array.isArray(messages) || messages.length === 0) { + return res.status(400).json({ error: 'messages must be a non-empty array of {role,content}' }); + } + const result = await runAgentTest({ agentId: req.params.id, messages }); + res.json(result); + } catch (err) { + console.error('[agents] test error:', err.message); + res.status(500).json({ error: err.message || 'Agent test failed' }); + } +}); + +module.exports = { router }; diff --git a/backend/src/routes/googleIntegrations.js b/backend/src/routes/googleIntegrations.js new file mode 100644 index 0000000..9b9a63f --- /dev/null +++ b/backend/src/routes/googleIntegrations.js @@ -0,0 +1,193 @@ +// Google Integrations API. +// +// GET /api/google-integrations List connected Google accounts +// POST /api/google-integrations/authorize Get Google consent URL (start of flow) +// GET /api/google-integrations/callback OAuth callback (Google redirects here) +// DELETE /api/google-integrations/:id Disconnect (revoke + delete) +// GET /api/google-integrations/status Lightweight "is this configured on the server?" +// +// Note: /callback is hit by the user's browser AFTER they approve in Google, +// not by Google directly, so the request still carries the auth cookie. The +// callback finishes by 302-redirecting the browser back to the frontend +// settings tab with ?connected=1 (or ?error=...) so the React UI refreshes +// itself. + +const { Router } = require('express'); +const crypto = require('crypto'); +const pool = require('../db'); +const { + PROVIDER, + isConfigured, + buildAuthUrl, + verifyState, + handleCallback, + revokeAndDelete, +} = require('../services/googleAuth'); +const googleSheets = require('../services/googleSheets'); + +// Public router: only the callback. Google's redirect lands the user's browser +// here without our cookie guaranteed (SameSite=Lax can drop it across an OAuth +// hop in some browsers), so we re-derive the user from the signed state token. +const publicRouter = Router(); + +// Protected router: everything else requires the caller to be signed in. +const router = Router(); + +/** + * Where to send the user's browser after the OAuth dance finishes. Falls back + * to "/" if CORS_ORIGIN isn't set (dev). Trailing-slash safe. + */ +function frontendSettingsUrl({ status, error, label }) { + const base = (process.env.CORS_ORIGIN || '/').replace(/\/+$/, ''); + const params = new URLSearchParams(); + if (status) params.set('google', status); + if (error) params.set('error', error.slice(0, 200)); + if (label) params.set('label', label); + return `${base}/#/admin-settings/google-integrations?${params.toString()}`; +} + +function publicShape(row) { + return { + id: row.id, + provider: row.provider, + accountLabel: row.account_label, + scopes: row.scopes || [], + healthStatus: row.health_status, + lastErrorMessage: row.last_error_message, + lastRefreshedAt: row.last_refreshed_at, + accessTokenExpiresAt: row.access_token_expires_at, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +/** Lightweight probe so the UI can show a helpful message when OAuth isn't + * set up — AND so users can see the exact redirect URI they must authorize + * in Google Cloud Console (mismatches here are the #1 cause of failed + * consent redirects). The URI is returned even when `configured=false` so + * an admin can copy it before they've finished filling in the server env. */ +router.get('/google-integrations/status', (req, res) => { + res.json({ + configured: isConfigured(), + redirectUri: process.env.GOOGLE_OAUTH_REDIRECT_URI || '', + }); +}); + +router.get('/google-integrations', async (req, res) => { + try { + const { rows } = await pool.query( + `SELECT * FROM coexistence.oauth_credentials + WHERE user_id = $1 AND provider = $2 + ORDER BY created_at DESC`, + [req.user.id, PROVIDER], + ); + res.json(rows.map(publicShape)); + } catch (err) { + console.error('[google-integrations] list error:', err.message); + res.status(500).json({ error: 'Failed to list Google integrations' }); + } +}); + +/** + * Returns the URL the frontend should send the user to. The frontend opens it + * as a full-window navigation (not a popup) — Google's consent screen breaks + * inside popups in many browser configurations, and the callback redirects + * cleanly back to the settings tab anyway. + */ +router.post('/google-integrations/authorize', (req, res) => { + if (!isConfigured()) { + return res.status(501).json({ error: 'Google OAuth is not configured on this server. Set GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_OAUTH_REDIRECT_URI in backend/.env and restart.' }); + } + try { + const nonce = crypto.randomBytes(16).toString('hex'); + const url = buildAuthUrl({ userId: req.user.id, nonce }); + res.json({ authUrl: url }); + } catch (err) { + console.error('[google-integrations] authorize error:', err.message); + res.status(500).json({ error: err.message || 'Failed to start Google authorization' }); + } +}); + +/** + * Google redirects the user's browser here with ?code=... &state=... (or + * ?error=... if the user denied consent). Always 302's back to the frontend + * — never returns JSON — so the UX is a single tab switch. + * + * No authMiddleware: this is mounted on the public auth path because Google's + * redirect can't carry our session cookie predictably across some browsers' + * SameSite rules. We re-derive the user from the signed state token. + */ +publicRouter.get('/google-integrations/callback', async (req, res) => { + const { code, state, error } = req.query; + if (error) { + return res.redirect(frontendSettingsUrl({ status: 'error', error: String(error) })); + } + if (!code || !state) { + return res.redirect(frontendSettingsUrl({ status: 'error', error: 'Missing code or state' })); + } + const payload = verifyState(String(state)); + if (!payload) { + return res.redirect(frontendSettingsUrl({ status: 'error', error: 'Invalid or expired state' })); + } + try { + const row = await handleCallback({ code: String(code), userId: payload.uid }); + res.redirect(frontendSettingsUrl({ status: 'connected', label: row.account_label })); + } catch (err) { + console.error('[google-integrations] callback error:', err.message); + res.redirect(frontendSettingsUrl({ status: 'error', error: err.message || 'OAuth callback failed' })); + } +}); + +// Pick-list endpoints used by the agent's Sheets tool config UI. +// Both scope the credential lookup to the caller's user_id so one user can't +// list another user's Google data. +router.get('/google-integrations/:id/spreadsheets', async (req, res) => { + try { + const { rows } = await pool.query( + 'SELECT id FROM coexistence.oauth_credentials WHERE id = $1 AND user_id = $2 AND provider = $3', + [req.params.id, req.user.id, PROVIDER], + ); + if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); + const files = await googleSheets.listSpreadsheets(req.params.id, { query: req.query.q || '' }); + res.json(files); + } catch (err) { + console.error('[google-integrations] list spreadsheets error:', err.message); + res.status(500).json({ error: err.message || 'Failed to list spreadsheets' }); + } +}); + +router.get('/google-integrations/:id/spreadsheets/:spreadsheetId/tabs', async (req, res) => { + try { + const { rows } = await pool.query( + 'SELECT id FROM coexistence.oauth_credentials WHERE id = $1 AND user_id = $2 AND provider = $3', + [req.params.id, req.user.id, PROVIDER], + ); + if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); + const tabs = await googleSheets.listSheetTabs(req.params.id, req.params.spreadsheetId); + res.json(tabs); + } catch (err) { + console.error('[google-integrations] list tabs error:', err.message); + res.status(500).json({ error: err.message || 'Failed to list sheet tabs' }); + } +}); + +router.delete('/google-integrations/:id', async (req, res) => { + try { + // Make sure the credential being deleted belongs to the caller — without + // this scope check any authenticated user could disconnect anyone else's + // Google account by guessing IDs. + const { rows } = await pool.query( + 'SELECT id FROM coexistence.oauth_credentials WHERE id = $1 AND user_id = $2', + [req.params.id, req.user.id], + ); + if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); + const ok = await revokeAndDelete(req.params.id); + if (!ok) return res.status(404).json({ error: 'Not found' }); + res.json({ ok: true }); + } catch (err) { + console.error('[google-integrations] delete error:', err.message); + res.status(500).json({ error: 'Failed to disconnect Google account' }); + } +}); + +module.exports = { router, publicRouter }; diff --git a/backend/src/routes/webhook.js b/backend/src/routes/webhook.js index c65ca9f..3a34555 100644 --- a/backend/src/routes/webhook.js +++ b/backend/src/routes/webhook.js @@ -3,6 +3,7 @@ const pool = require('../db'); const { decrypt } = require('../util/crypto'); const { safeEqual, verifyMetaSignature } = require('../util/webhookSignature'); const { evaluateTriggers, resumeAutomation } = require('../engine/automationEngine'); +const agentRouter = require('../services/agentRouter'); const { markPending, MEDIA_TYPES } = require('../services/mediaDownloader'); const { enqueueMediaDownload } = require('../queue/mediaQueue'); @@ -328,7 +329,18 @@ router.post('/webhook/whatsapp', async (req, res) => { } continue; // do not also fire fresh triggers } - await evaluateTriggers(record); + const fired = await evaluateTriggers(record); + // Agent fall-through: if no keyword automation matched, hand the + // message to the agent bound to this WhatsApp account (if any active + // agent exists). evaluateTriggers returns the array of executions + // it created; an empty array means nothing fired. + if (!fired || fired.length === 0) { + try { + await agentRouter.routeIfActive(record); + } catch (agentErr) { + console.error('[webhook] Agent routing error:', agentErr.message); + } + } } catch (triggerErr) { console.error('[webhook] Trigger evaluation error:', triggerErr.message); } diff --git a/backend/src/services/agentRouter.js b/backend/src/services/agentRouter.js new file mode 100644 index 0000000..8fc2b9a --- /dev/null +++ b/backend/src/services/agentRouter.js @@ -0,0 +1,51 @@ +// Agent router. Called from the webhook after evaluateTriggers() returns. +// Decides whether to hand an inbound message to the active agent for that +// WhatsApp account. +// +// Precedence (handled by the caller): +// 1. Paused automation execution awaiting a reply → resume that, skip agent. +// 2. Keyword automation fires on this message → run it, skip agent. +// 3. Otherwise → this router enqueues the agent run (if any agent is active +// for the inbound WA number). + +const pool = require('../db'); +const { enqueueAgentRun } = require('../queue/agentQueue'); + +/** + * Look up the active agent (if any) for the WhatsApp account that received + * `record`, and enqueue a run. Returns the run job's metadata or null. + * + * - Matches the WA account by its display_phone_number (digits-only) against + * the inbound record's wa_number; falls back to phone_number_id. + * - The DB enforces at most one active agent per WA account (partial unique + * index on agents(wa_account_id) WHERE is_active=TRUE), so this query is + * guaranteed to return ≤1 row. + */ +async function routeIfActive(record) { + if (!record || record.direction !== 'incoming') return null; + if (record.message_type === 'status' || record.message_type === 'reaction') return null; + if (!record.message_body || !record.contact_number) return null; + + const { rows } = await pool.query( + `SELECT a.id, a.wa_account_id + FROM coexistence.agents a + JOIN coexistence.whatsapp_accounts w ON w.id = a.wa_account_id + WHERE a.is_active = TRUE + AND (regexp_replace(w.display_phone_number, '\\D', '', 'g') = $1 + OR w.phone_number_id = $2) + LIMIT 1`, + [record.wa_number || '', record.phone_number_id || ''], + ); + if (rows.length === 0) return null; + + const agent = rows[0]; + await enqueueAgentRun({ + agentId: agent.id, + contactNumber: record.contact_number, + inboundMessageId: record.message_id || null, + inboundText: record.message_body, + }); + return { agentId: agent.id }; +} + +module.exports = { routeIfActive }; diff --git a/backend/src/services/googleAuth.js b/backend/src/services/googleAuth.js new file mode 100644 index 0000000..454db4a --- /dev/null +++ b/backend/src/services/googleAuth.js @@ -0,0 +1,268 @@ +// Google OAuth helper. Owns the OAuth2 client lifecycle (consent URL → token +// exchange → store-encrypted → refresh) for the generic coexistence.oauth_credentials +// table, with provider='google'. v1 scopes cover Google Sheets only; Gmail + +// Calendar will append scopes here in a follow-up release without schema change. +// +// State CSRF: the OAuth `state` param is a short-lived JWT signed with JWT_SECRET +// that pins the request to a specific user_id + nonce. The callback rejects any +// mismatch, so a returning Google redirect can't be replayed against another user. + +const { google } = require('googleapis'); +const jwt = require('jsonwebtoken'); +const pool = require('../db'); +const { encrypt, decrypt } = require('../util/crypto'); + +const PROVIDER = 'google'; + +// v1: Sheets-only. Drive.file is needed so the user's spreadsheet picker can +// list files the user explicitly grants access to (non-broad: only files +// created or opened with the app, NOT the user's entire Drive). +const SHEETS_SCOPES = [ + 'https://www.googleapis.com/auth/drive.file', + 'https://www.googleapis.com/auth/spreadsheets', + // openid + email + profile let us derive the account_label (email) without a + // second API call. + 'openid', + 'email', + 'profile', +]; + +const STATE_TTL_SECONDS = 10 * 60; // 10 min consent window + +function isConfigured() { + return !!(process.env.GOOGLE_CLIENT_ID && + process.env.GOOGLE_CLIENT_SECRET && + process.env.GOOGLE_OAUTH_REDIRECT_URI); +} + +function buildOAuthClient() { + if (!isConfigured()) { + const err = new Error('Google OAuth is not configured on this server. Set GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_OAUTH_REDIRECT_URI.'); + err.code = 'GOOGLE_OAUTH_NOT_CONFIGURED'; + throw err; + } + return new google.auth.OAuth2( + process.env.GOOGLE_CLIENT_ID, + process.env.GOOGLE_CLIENT_SECRET, + process.env.GOOGLE_OAUTH_REDIRECT_URI, + ); +} + +function signState({ userId, nonce }) { + return jwt.sign( + { uid: userId, n: nonce, kind: 'google_oauth_state' }, + process.env.JWT_SECRET || 'forgecrm-dev-secret-change-me', + { expiresIn: STATE_TTL_SECONDS }, + ); +} + +function verifyState(state) { + try { + const payload = jwt.verify( + state, + process.env.JWT_SECRET || 'forgecrm-dev-secret-change-me', + ); + if (payload.kind !== 'google_oauth_state') return null; + return payload; + } catch { + return null; + } +} + +/** + * Build the Google consent URL the user is sent to. + * access_type=offline + prompt=consent force Google to return a refresh_token + * every time (without prompt=consent, Google only emits a refresh_token on the + * FIRST consent — re-connecting the same account would leave us tokenless). + */ +function buildAuthUrl({ userId, nonce, scopes = SHEETS_SCOPES }) { + const client = buildOAuthClient(); + // NOTE: `include_granted_scopes` is intentionally OFF. With it on, Google + // adds back every scope the user has ever granted to this OAuth Client + // (e.g. Gmail, Calendar, full Drive from a previous setup) on top of what + // we're requesting now. v1 of this feature only needs Sheets + the narrow + // `drive.file` scope to populate the spreadsheet picker, so we request + // exactly those. If the user previously granted broader scopes, they + // should revoke this app at https://myaccount.google.com/permissions + // before reconnecting to get a clean minimal-permissions consent screen. + return client.generateAuthUrl({ + access_type: 'offline', + prompt: 'consent', + scope: scopes, + state: signState({ userId, nonce }), + }); +} + +/** + * Exchange an auth code for tokens, derive the user's Google email from the + * id_token, and upsert into oauth_credentials (encrypted). + */ +async function handleCallback({ code, userId }) { + const client = buildOAuthClient(); + const { tokens } = await client.getToken(code); + // tokens: { access_token, refresh_token, id_token, expiry_date, scope, token_type } + + if (!tokens.refresh_token) { + // This can happen if the user previously consented and we didn't pass + // prompt=consent — unrecoverable here because Google won't give us a refresh + // token, so the connection would silently die in an hour. Surface clearly. + const err = new Error('Google did not return a refresh token. Revoke the app from your Google Account → Security → Third-party access, then try connecting again.'); + err.code = 'NO_REFRESH_TOKEN'; + throw err; + } + + // Pull the email out of the id_token (no extra API call). The id_token is a + // JWT signed by Google; we trust the values because we just got it over TLS + // straight from Google's token endpoint via the official SDK. + let email = null; + if (tokens.id_token) { + const parts = tokens.id_token.split('.'); + if (parts.length === 3) { + try { + const payload = JSON.parse(Buffer.from(parts[1], 'base64').toString('utf8')); + if (payload && typeof payload.email === 'string') email = payload.email; + } catch { /* ignore — fall through to label below */ } + } + } + const accountLabel = email || `google-${Date.now()}`; + + const scopes = (tokens.scope || '').split(' ').filter(Boolean); + const expiresAt = tokens.expiry_date ? new Date(tokens.expiry_date) : null; + + // Upsert: re-connecting the same Google account updates the existing row + // (refresh token rotates) instead of failing with a unique-violation. + const { rows } = await pool.query( + `INSERT INTO coexistence.oauth_credentials + (user_id, provider, account_label, + refresh_token_encrypted, access_token_encrypted, access_token_expires_at, + scopes, health_status, last_refreshed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, 'ok', NOW()) + ON CONFLICT (user_id, provider, account_label) DO UPDATE + SET refresh_token_encrypted = EXCLUDED.refresh_token_encrypted, + access_token_encrypted = EXCLUDED.access_token_encrypted, + access_token_expires_at = EXCLUDED.access_token_expires_at, + scopes = EXCLUDED.scopes, + health_status = 'ok', + last_error_message = NULL, + last_refreshed_at = NOW(), + updated_at = NOW() + RETURNING *`, + [ + userId, PROVIDER, accountLabel, + encrypt(tokens.refresh_token), encrypt(tokens.access_token || ''), + expiresAt, scopes, + ], + ); + return rows[0]; +} + +/** + * Return a valid access token for the given credential row, refreshing via the + * refresh_token if the cached one expired (or is within a 60s safety window). + * Persists the refreshed access_token + expires_at back to the DB. + */ +async function getAccessToken(credentialId) { + const { rows } = await pool.query( + 'SELECT * FROM coexistence.oauth_credentials WHERE id = $1', + [credentialId], + ); + if (rows.length === 0) { + const err = new Error('OAuth credential not found'); + err.code = 'CREDENTIAL_NOT_FOUND'; + throw err; + } + const row = rows[0]; + + const now = Date.now(); + const expiresAt = row.access_token_expires_at ? new Date(row.access_token_expires_at).getTime() : 0; + const cached = decrypt(row.access_token_encrypted); + if (cached && expiresAt > now + 60 * 1000) { + return cached; + } + + // Refresh. + const client = buildOAuthClient(); + const refreshToken = decrypt(row.refresh_token_encrypted); + if (!refreshToken) { + await markUnhealthy(credentialId, 'Refresh token missing or unreadable; reconnect this Google account.'); + const err = new Error('Refresh token unavailable; reconnect this Google account.'); + err.code = 'REFRESH_TOKEN_MISSING'; + throw err; + } + client.setCredentials({ refresh_token: refreshToken }); + let creds; + try { + const r = await client.refreshAccessToken(); + creds = r.credentials; + } catch (err) { + await markUnhealthy(credentialId, `Refresh failed: ${err.message}`); + throw err; + } + const newAccess = creds.access_token; + const newExpiry = creds.expiry_date ? new Date(creds.expiry_date) : null; + await pool.query( + `UPDATE coexistence.oauth_credentials + SET access_token_encrypted = $1, + access_token_expires_at = $2, + health_status = 'ok', + last_error_message = NULL, + last_refreshed_at = NOW(), + updated_at = NOW() + WHERE id = $3`, + [encrypt(newAccess), newExpiry, credentialId], + ); + return newAccess; +} + +async function markUnhealthy(credentialId, message) { + try { + await pool.query( + `UPDATE coexistence.oauth_credentials + SET health_status = 'error', + last_error_message = $1, + updated_at = NOW() + WHERE id = $2`, + [String(message || '').slice(0, 1000), credentialId], + ); + } catch (e) { + console.error('[googleAuth] markUnhealthy failed:', e.message); + } +} + +/** + * Best-effort token revocation at Google's end. Always followed by a DB delete + * by the caller — we don't want a Google API hiccup to leave a dead row. + */ +async function revokeAndDelete(credentialId) { + const { rows } = await pool.query( + 'SELECT refresh_token_encrypted FROM coexistence.oauth_credentials WHERE id = $1', + [credentialId], + ); + if (rows.length === 0) return false; + const refreshToken = decrypt(rows[0].refresh_token_encrypted); + if (refreshToken) { + try { + const client = buildOAuthClient(); + await client.revokeToken(refreshToken); + } catch (e) { + console.warn('[googleAuth] revokeToken failed (deleting locally anyway):', e.message); + } + } + const { rowCount } = await pool.query( + 'DELETE FROM coexistence.oauth_credentials WHERE id = $1', + [credentialId], + ); + return rowCount > 0; +} + +module.exports = { + PROVIDER, + SHEETS_SCOPES, + isConfigured, + buildAuthUrl, + verifyState, + handleCallback, + getAccessToken, + markUnhealthy, + revokeAndDelete, +}; diff --git a/backend/src/services/googleSheets.js b/backend/src/services/googleSheets.js new file mode 100644 index 0000000..180c56f --- /dev/null +++ b/backend/src/services/googleSheets.js @@ -0,0 +1,169 @@ +// Google Sheets service wrapper. +// +// Every call resolves a fresh access token via googleAuth.getAccessToken(), +// which silently refreshes the cached token if it's about to expire. Tools +// hand us a credentialId; we never see plain tokens. +// +// Three operations are exposed as agent tools: `read`, `append`, `update`. +// Plus picker helpers (`listSpreadsheets`, `listSheetTabs`) used by the UI +// when the operator is configuring which sheet an agent should touch. + +const { google } = require('googleapis'); +const { getAccessToken } = require('./googleAuth'); + +async function authedSheets(credentialId) { + const token = await getAccessToken(credentialId); + const oauth2 = new google.auth.OAuth2(); + oauth2.setCredentials({ access_token: token }); + return google.sheets({ version: 'v4', auth: oauth2 }); +} + +async function authedDrive(credentialId) { + const token = await getAccessToken(credentialId); + const oauth2 = new google.auth.OAuth2(); + oauth2.setCredentials({ access_token: token }); + return google.drive({ version: 'v3', auth: oauth2 }); +} + +/** + * Pick-list for the UI. Returns spreadsheets the user has either created with + * us or explicitly opened — we never list their full Drive (the requested + * scope is `drive.file`, not the full `drive` scope). + */ +async function listSpreadsheets(credentialId, { pageSize = 50, query = '' } = {}) { + const drive = await authedDrive(credentialId); + const safeQuery = query.replace(/'/g, "\\'").slice(0, 100); + const q = [ + "mimeType='application/vnd.google-apps.spreadsheet'", + 'trashed=false', + safeQuery ? `name contains '${safeQuery}'` : null, + ].filter(Boolean).join(' and '); + const { data } = await drive.files.list({ + q, + pageSize, + fields: 'files(id,name,modifiedTime)', + orderBy: 'modifiedTime desc', + }); + return data.files || []; +} + +/** + * List the tab (sheet) names inside one spreadsheet, so the operator can pick + * which tab the agent reads/writes. + */ +async function listSheetTabs(credentialId, spreadsheetId) { + const sheets = await authedSheets(credentialId); + const { data } = await sheets.spreadsheets.get({ + spreadsheetId, + fields: 'sheets(properties(sheetId,title,gridProperties(rowCount,columnCount)))', + }); + return (data.sheets || []).map(s => ({ + sheetId: s.properties.sheetId, + title: s.properties.title, + rowCount: s.properties.gridProperties?.rowCount, + columnCount: s.properties.gridProperties?.columnCount, + })); +} + +/** + * Tool op: read a range from the configured sheet. + * args.range — optional A1 (defaults to the whole tab if omitted) + * args.max_rows — soft cap so the LLM doesn't get a wall of data + */ +async function read({ credentialId, spreadsheetId, sheetName, args = {} }) { + const sheets = await authedSheets(credentialId); + const range = args.range + ? (args.range.includes('!') ? args.range : `'${sheetName}'!${args.range}`) + : `'${sheetName}'`; + const { data } = await sheets.spreadsheets.values.get({ + spreadsheetId, + range, + valueRenderOption: 'UNFORMATTED_VALUE', + }); + const rows = data.values || []; + const maxRows = Math.max(1, Math.min(500, parseInt(args.max_rows || 100, 10))); + return { + range: data.range, + rowCount: rows.length, + truncated: rows.length > maxRows, + rows: rows.slice(0, maxRows), + }; +} + +/** + * Tool op: append a row. `args.values` is an array of cell values (left-to-right). + * USER_ENTERED so dates/numbers/formulas behave like a human typed them. + */ +async function append({ credentialId, spreadsheetId, sheetName, args = {} }) { + if (!Array.isArray(args.values)) { + throw new Error('append requires args.values (array)'); + } + const sheets = await authedSheets(credentialId); + const { data } = await sheets.spreadsheets.values.append({ + spreadsheetId, + range: `'${sheetName}'`, + valueInputOption: 'USER_ENTERED', + insertDataOption: 'INSERT_ROWS', + requestBody: { values: [args.values] }, + }); + return { + updatedRange: data.updates?.updatedRange, + updatedRows: data.updates?.updatedRows, + updatedCells: data.updates?.updatedCells, + }; +} + +/** + * Tool op: write `args.values` into a specific range (`args.range`). + * Used to update an existing row the LLM identified via `read`. + */ +async function update({ credentialId, spreadsheetId, sheetName, args = {} }) { + if (!args.range) throw new Error('update requires args.range'); + if (!Array.isArray(args.values)) throw new Error('update requires args.values (array)'); + const sheets = await authedSheets(credentialId); + // Single row: wrap in an outer array; matrix: pass through. + const values = Array.isArray(args.values[0]) ? args.values : [args.values]; + const range = args.range.includes('!') ? args.range : `'${sheetName}'!${args.range}`; + const { data } = await sheets.spreadsheets.values.update({ + spreadsheetId, + range, + valueInputOption: 'USER_ENTERED', + requestBody: { values }, + }); + return { + updatedRange: data.updatedRange, + updatedRows: data.updatedRows, + updatedCells: data.updatedCells, + }; +} + +/** + * Dispatcher used by the agent engine. Looks at the tool's `config.ops` to + * gate which operations the LLM is allowed to call — defense in depth, since + * the LLM only ever sees the ops we expose to it in the tool schema anyway. + */ +async function executeOp({ op, toolConfig, args }) { + const allowed = Array.isArray(toolConfig.ops) ? toolConfig.ops : []; + if (!allowed.includes(op)) { + throw new Error(`Operation '${op}' is not enabled for this Sheets tool. Enabled: ${allowed.join(', ') || 'none'}`); + } + const ctx = { + credentialId: toolConfig.google_account_id, + spreadsheetId: toolConfig.spreadsheet_id, + sheetName: toolConfig.sheet_name, + args, + }; + if (op === 'read') return read(ctx); + if (op === 'append') return append(ctx); + if (op === 'update') return update(ctx); + throw new Error(`Unknown Sheets op: ${op}`); +} + +module.exports = { + listSpreadsheets, + listSheetTabs, + read, + append, + update, + executeOp, +}; diff --git a/db/migrations/050_oauth_credentials.sql b/db/migrations/050_oauth_credentials.sql new file mode 100644 index 0000000..cdbf327 --- /dev/null +++ b/db/migrations/050_oauth_credentials.sql @@ -0,0 +1,28 @@ +-- 050: Generic OAuth credentials. +-- +-- Provider-keyed so the same table holds Google (v1: Sheets), and later Gmail, +-- Calendar, Slack, HubSpot, etc. — no schema migration when we add providers. +-- refresh/access tokens are AES-256-GCM ciphertexts produced by +-- backend/src/util/crypto.js. NEVER store plain tokens in these columns. +-- +-- Indexing: lookup by (user_id, provider) is the hot path (list-my-google-accounts). + +CREATE TABLE IF NOT EXISTS coexistence.oauth_credentials ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL REFERENCES coexistence.forgecrm_users(id) ON DELETE CASCADE, + provider TEXT NOT NULL, -- 'google' in v1 + account_label TEXT NOT NULL, -- e.g. user's Google email + refresh_token_encrypted TEXT NOT NULL, + access_token_encrypted TEXT, + access_token_expires_at TIMESTAMPTZ, + scopes TEXT[] NOT NULL DEFAULT '{}', -- e.g. ['drive.file','spreadsheets'] + health_status TEXT NOT NULL DEFAULT 'ok', -- 'ok' | 'error' + last_error_message TEXT, + last_refreshed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (user_id, provider, account_label) +); + +CREATE INDEX IF NOT EXISTS idx_oauth_credentials_user_provider + ON coexistence.oauth_credentials (user_id, provider); diff --git a/db/migrations/051_agents.sql b/db/migrations/051_agents.sql new file mode 100644 index 0000000..572f7a8 --- /dev/null +++ b/db/migrations/051_agents.sql @@ -0,0 +1,88 @@ +-- 051: AI Agents (standalone, not workflow nodes). +-- +-- An "agent" is a configured LLM that handles inbound WhatsApp messages on a +-- bound WA account: system prompt + provider + model + BYOK key + tools. Tools +-- are stored separately so we can extend the tool registry (Gmail, Calendar, +-- HTTP, ...) without changing the agent row shape. +-- +-- Per the agreed plan, agents are *always-on* for their bound WA account — the +-- webhook's automation evaluation runs first, and only when no keyword auto +-- fires do we hand the message to the active agent. Precedence is enforced in +-- backend/src/services/agentRouter.js, not here. +-- +-- llm_api_key_encrypted is AES-256-GCM (backend/src/util/crypto.js). Null +-- means "fall back to ANTHROPIC_API_KEY / OPENAI_API_KEY from the server env". + +CREATE TABLE IF NOT EXISTS coexistence.agents ( + id BIGSERIAL PRIMARY KEY, + name TEXT NOT NULL, + description TEXT, + system_prompt TEXT NOT NULL, + llm_provider TEXT NOT NULL CHECK (llm_provider IN ('anthropic','openai')), + llm_model TEXT NOT NULL, + llm_api_key_encrypted TEXT, + wa_account_id BIGINT REFERENCES coexistence.whatsapp_accounts(id) ON DELETE SET NULL, + is_active BOOLEAN NOT NULL DEFAULT FALSE, + context_window_messages INT NOT NULL DEFAULT 20, + max_tool_iterations INT NOT NULL DEFAULT 6, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Single active agent per WA account: enforced with a partial unique index so +-- multiple paused/inactive drafts can coexist for the same number while only +-- one ever takes inbound traffic. +CREATE UNIQUE INDEX IF NOT EXISTS idx_agents_one_active_per_account + ON coexistence.agents (wa_account_id) + WHERE is_active = TRUE; + +CREATE TABLE IF NOT EXISTS coexistence.agent_tools ( + id BIGSERIAL PRIMARY KEY, + agent_id BIGINT NOT NULL REFERENCES coexistence.agents(id) ON DELETE CASCADE, + tool_type TEXT NOT NULL, -- 'google_sheets' in v1 + config JSONB NOT NULL, -- shape varies by tool_type; see backend + is_enabled BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_agent_tools_agent + ON coexistence.agent_tools (agent_id); + +-- One row per LLM-invocation chain (i.e. per inbound message handled). +CREATE TABLE IF NOT EXISTS coexistence.agent_runs ( + id BIGSERIAL PRIMARY KEY, + agent_id BIGINT NOT NULL REFERENCES coexistence.agents(id) ON DELETE CASCADE, + wa_account_id BIGINT REFERENCES coexistence.whatsapp_accounts(id) ON DELETE SET NULL, + contact_number TEXT NOT NULL, + inbound_message_id TEXT, + status TEXT NOT NULL CHECK (status IN ('running','completed','failed','capped')), + total_input_tokens INT, + total_output_tokens INT, + final_reply TEXT, + error_message TEXT, + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + ended_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_agent_runs_agent_started + ON coexistence.agent_runs (agent_id, started_at DESC); + +CREATE INDEX IF NOT EXISTS idx_agent_runs_contact + ON coexistence.agent_runs (contact_number, started_at DESC); + +CREATE TABLE IF NOT EXISTS coexistence.agent_run_steps ( + id BIGSERIAL PRIMARY KEY, + run_id BIGINT NOT NULL REFERENCES coexistence.agent_runs(id) ON DELETE CASCADE, + step_index INT NOT NULL, + step_type TEXT NOT NULL CHECK (step_type IN ('llm_call','tool_call')), + tool_type TEXT, -- set when step_type='tool_call' + input JSONB, + output JSONB, + status TEXT NOT NULL CHECK (status IN ('ok','error')), + latency_ms INT, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_agent_run_steps_run + ON coexistence.agent_run_steps (run_id, step_index); diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 878b1e1..3652772 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -15,10 +15,12 @@ import AdminSettingsPage from './pages/AdminSettingsPage.jsx'; import MediaLibraryPage from './pages/MediaLibraryPage.jsx'; import AboutUsPage from './pages/AboutUsPage.jsx'; import PipelinesPage from './pages/PipelinesPage.jsx'; +import AiAgentBuilderPage from './pages/AiAgentBuilderPage.jsx'; const VALID_PAGES = new Set([ 'home', 'chatbot-builder', 'template-builder', 'chats', 'contacts', 'pipelines', 'bulk-message', 'admin-settings', 'media-library', 'about', + 'ai-agent-builder', ]); export default function App() { @@ -96,6 +98,7 @@ export default function App() { case 'media-library': return ; case 'bulk-message': return ; case 'chatbot-builder': return ; + case 'ai-agent-builder': return ; case 'about': return ; case 'admin-settings': return ; default: return ; diff --git a/frontend/src/api.js b/frontend/src/api.js index ea02ae2..aab821c 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -151,6 +151,37 @@ export const api = { update: (id, data) => req(`/whatsapp-accounts/${id}`, { method: 'PUT', body: JSON.stringify(data) }), delete: (id) => req(`/whatsapp-accounts/${id}`, { method: 'DELETE' }), }, + // Google integrations (v1: Google Sheets only; Gmail + Calendar in a later + // release reuse the same /google-integrations table and OAuth flow). + googleIntegrations: { + status: () => req('/google-integrations/status'), + list: () => req('/google-integrations'), + authorize: () => req('/google-integrations/authorize', { method: 'POST' }), + disconnect: (id) => req(`/google-integrations/${id}`, { method: 'DELETE' }), + listSpreadsheets: (id, q = '') => + req(`/google-integrations/${id}/spreadsheets${q ? `?q=${encodeURIComponent(q)}` : ''}`), + listTabs: (id, spreadsheetId) => + req(`/google-integrations/${id}/spreadsheets/${encodeURIComponent(spreadsheetId)}/tabs`), + }, + // AI Agents — standalone LLM-driven chat handlers bound to a WhatsApp account. + agents: { + list: () => req('/agents'), + get: (id, reveal = false) => req(`/agents/${id}${reveal ? '?reveal=1' : ''}`), + create: (data) => req('/agents', { method: 'POST', body: JSON.stringify(data) }), + update: (id, data) => req(`/agents/${id}`, { method: 'PUT', body: JSON.stringify(data) }), + delete: (id) => req(`/agents/${id}`, { method: 'DELETE' }), + runs: (id, limit = 50) => req(`/agents/${id}/runs?limit=${limit}`), + run: (id, runId) => req(`/agents/${id}/runs/${runId}`), + addTool: (id, data) => req(`/agents/${id}/tools`, { method: 'POST', body: JSON.stringify(data) }), + updateTool: (id, toolId, data) => + req(`/agents/${id}/tools/${toolId}`, { method: 'PUT', body: JSON.stringify(data) }), + removeTool: (id, toolId) => req(`/agents/${id}/tools/${toolId}`, { method: 'DELETE' }), + // Dry-run an agent without sending the reply to WhatsApp — used by the + // "Test chat" panel inside the agent editor. + test: (id, messages) => req(`/agents/${id}/test`, { + method: 'POST', body: JSON.stringify({ messages }), + }), + }, pipelines: { list: () => req('/pipelines'), create: (name) => req('/pipelines', { method: 'POST', body: JSON.stringify({ name }) }), diff --git a/frontend/src/components/Sidebar.jsx b/frontend/src/components/Sidebar.jsx index 375f1de..ab9b29d 100644 --- a/frontend/src/components/Sidebar.jsx +++ b/frontend/src/components/Sidebar.jsx @@ -1,12 +1,13 @@ import { Home, Zap, LayoutTemplate, MessageCircle, Users, - Megaphone, Image as ImageIcon, Info, KanbanSquare, + Megaphone, Image as ImageIcon, Info, KanbanSquare, Bot, } from 'lucide-react'; import { C, FONT } from '../constants.js'; const NAV_ITEMS = [ { id: 'home', label: 'Home', Icon: Home }, { id: 'chatbot-builder', label: 'Automations', Icon: Zap }, + { id: 'ai-agent-builder', label: 'AI Agents', Icon: Bot }, { id: 'template-builder', label: 'Template Builder', Icon: LayoutTemplate }, { id: 'media-library', label: 'Media', Icon: ImageIcon }, { id: 'chats', label: 'Chats', Icon: MessageCircle }, diff --git a/frontend/src/components/agents/AgentEditor.jsx b/frontend/src/components/agents/AgentEditor.jsx new file mode 100644 index 0000000..68516ed --- /dev/null +++ b/frontend/src/components/agents/AgentEditor.jsx @@ -0,0 +1,442 @@ +import { useState, useEffect, useCallback } from 'react'; +import { Save, Trash2, Eye, EyeOff, Loader2, Plus, AlertCircle } from 'lucide-react'; +import { api } from '../../api.js'; +import { C, FONT, MONO } from '../../constants.js'; +import DeleteConfirmModal from '../DeleteConfirmModal.jsx'; +import AgentToolsList from './AgentToolsList.jsx'; +import AgentRunsViewer from './AgentRunsViewer.jsx'; +import TestChat from './TestChat.jsx'; + +const PROVIDERS = [ + { + value: 'anthropic', + label: 'Anthropic Claude', + models: [ + { value: 'claude-opus-4-7', label: 'Claude Opus 4.7 (most capable)' }, + { value: 'claude-sonnet-4-6', label: 'Claude Sonnet 4.6 (balanced)' }, + { value: 'claude-haiku-4-5-20251001', label: 'Claude Haiku 4.5 (fastest)' }, + ], + }, + { + value: 'openai', + label: 'OpenAI', + models: [ + { value: 'gpt-4o', label: 'GPT-4o' }, + { value: 'gpt-4o-mini', label: 'GPT-4o mini' }, + { value: 'gpt-4-turbo', label: 'GPT-4 Turbo' }, + ], + }, +]; + +const BLANK = { + name: '', + description: '', + systemPrompt: 'You are a helpful WhatsApp assistant. Keep replies concise.', + llmProvider: 'anthropic', + llmModel: 'claude-sonnet-4-6', + llmApiKey: '', + waAccountId: '', + isActive: false, + contextWindowMessages: 20, + maxToolIterations: 6, +}; + +export default function AgentEditor({ agentId, waAccounts, user, onDone, onCancel }) { + const isCreate = agentId == null; + const [form, setForm] = useState(BLANK); + const [tools, setTools] = useState([]); + const [loading, setLoading] = useState(!isCreate); + const [saving, setSaving] = useState(false); + const [error, setError] = useState(''); + const [showKey, setShowKey] = useState(false); + const [pendingDelete, setPendingDelete] = useState(false); + const [advancedOpen, setAdvancedOpen] = useState(false); + + const refresh = useCallback(async () => { + if (isCreate) return; + setLoading(true); + setError(''); + try { + const a = await api.agents.get(agentId); + setForm({ + name: a.name || '', + description: a.description || '', + systemPrompt: a.systemPrompt || '', + llmProvider: a.llmProvider, + llmModel: a.llmModel, + llmApiKey: '', + waAccountId: a.waAccountId || '', + isActive: !!a.isActive, + contextWindowMessages: a.contextWindowMessages || 20, + maxToolIterations: a.maxToolIterations || 6, + hasOwnApiKey: !!a.hasOwnApiKey, + llmApiKeyMasked: a.llmApiKeyMasked || '', + }); + setTools(a.tools || []); + } catch (e) { + setError(prettyError(e)); + } finally { + setLoading(false); + } + }, [agentId, isCreate]); + + useEffect(() => { refresh(); }, [refresh]); + + // If switching provider, snap model to the new provider's first model. + const setProvider = (provider) => { + const p = PROVIDERS.find(x => x.value === provider); + setForm(f => ({ + ...f, + llmProvider: provider, + llmModel: p?.models[0]?.value || f.llmModel, + })); + }; + + const handleSave = async () => { + setSaving(true); + setError(''); + try { + const payload = { ...form }; + // Don't send the key field if the user didn't touch it (otherwise we'd + // wipe the saved encrypted key on every edit). + if (!payload.llmApiKey) delete payload.llmApiKey; + if (isCreate) { + const created = await api.agents.create(payload); + // After create, switch to "edit" mode for this new agent so the user + // can add tools without losing context. We do that by calling onDone() + // and trusting the parent to navigate; but to keep the flow simple in + // v1, return to the list. + onDone(created.id); + } else { + await api.agents.update(agentId, payload); + onDone(agentId); + } + } catch (e) { + setError(prettyError(e)); + } finally { + setSaving(false); + } + }; + + const handleDelete = async () => { + try { + await api.agents.delete(agentId); + setPendingDelete(false); + onDone(); + } catch (e) { + setError(prettyError(e)); + setPendingDelete(false); + } + }; + + if (loading) { + return ( +
+ Loading… +
+ ); + } + + const provider = PROVIDERS.find(p => p.value === form.llmProvider) || PROVIDERS[0]; + const isAdmin = user?.role === 'admin'; + + return ( +
+ {error && ( +
+ {error} +
+ )} + +
+ + + setForm(f => ({ ...f, name: e.target.value }))} + placeholder="e.g. Booking Assistant" style={inputStyle} /> + + + + + setForm(f => ({ ...f, description: e.target.value }))} + placeholder="What does this agent do?" style={inputStyle} /> + + +
+ +
+