-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpersistence-sqlite.ts
More file actions
144 lines (135 loc) · 4.48 KB
/
persistence-sqlite.ts
File metadata and controls
144 lines (135 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Example: durable run snapshots in a local SQLite file using node:sqlite
// (built into Node.js since 22.5; no external dependency).
//
// Wire it into createAgent via `persistence: makeSqlitePersistence('./runs.db')`.
// On every run: a row is written at start, updated after each step, and
// finalized on completion. The schema is created (if missing) as soon as
// makeSqlitePersistence is called, before any run is written.
import { DatabaseSync } from 'node:sqlite'
import type {
IConversationTurn,
IPersistence,
IPlan,
IRunSnapshot,
IStepResult,
IUsage,
} from '../src/index.ts'
/**
 * The IPersistence facade plus a `close` handle for the underlying database.
 *
 * `close` is returned alongside the facade so the caller can release the DB
 * handle on shutdown. A long-lived agent process can just keep the handle
 * (node:sqlite will release it on exit), but tests / scripts should close
 * explicitly to avoid lingering file locks.
 */
export interface ISqlitePersistence extends IPersistence {
// Releases the database handle held by this persistence instance.
close: () => void
}
/**
 * Builds an IPersistence implementation that keeps one row per run in the
 * `agent_runs` table of the SQLite database at `filePath`.
 *
 * Calling this opens the database, switches it to WAL journaling and creates
 * the table and its `started_at` index when they do not exist yet.
 *
 * @param filePath - Location passed straight to `DatabaseSync`.
 * @returns The run hooks (`onRunStart`, `onStepComplete`, `onRunComplete`,
 *   `loadRun`) plus `close()`, which releases the database handle. `close()`
 *   is safe to call repeatedly; only the first successful call touches the
 *   database.
 */
export const makeSqlitePersistence = (filePath: string): ISqlitePersistence => {
  const db = new DatabaseSync(filePath)

  // Flipped once the handle has been released. node:sqlite throws when
  // close() is called on a database that is not open, so later close() calls
  // are turned into no-ops instead of being forwarded.
  let closed = false

  // WAL gives us concurrent readers + a single writer with sane crash-safety.
  // For a single-process agent this is mostly a perf win; for multiple
  // processes sharing one DB it's the correct journaling mode.
  db.exec('PRAGMA journal_mode = WAL;')
  db.exec(`
    CREATE TABLE IF NOT EXISTS agent_runs (
      run_id TEXT PRIMARY KEY,
      status TEXT NOT NULL,
      started_at INTEGER NOT NULL,
      completed_at INTEGER,
      input TEXT NOT NULL,
      history TEXT,
      plan TEXT,
      trace TEXT NOT NULL,
      usage TEXT NOT NULL,
      step_index INTEGER NOT NULL,
      iterations INTEGER NOT NULL,
      revisions INTEGER NOT NULL,
      result_text TEXT,
      error TEXT
    );
    CREATE INDEX IF NOT EXISTS agent_runs_started_at_idx
      ON agent_runs (started_at DESC);
  `)

  // Prepared once: hot path is the per-step UPSERT during a long run.
  // On conflict every column except run_id, started_at and input is
  // overwritten, so those three keep the values from the first insert.
  const upsert = db.prepare(`
    INSERT INTO agent_runs
      (run_id, status, started_at, completed_at, input, history, plan, trace, usage,
       step_index, iterations, revisions, result_text, error)
    VALUES
      (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(run_id) DO UPDATE SET
      status = excluded.status,
      completed_at = excluded.completed_at,
      history = excluded.history,
      plan = excluded.plan,
      trace = excluded.trace,
      usage = excluded.usage,
      step_index = excluded.step_index,
      iterations = excluded.iterations,
      revisions = excluded.revisions,
      result_text = excluded.result_text,
      error = excluded.error
  `)
  const select = db.prepare(`SELECT * FROM agent_runs WHERE run_id = ?`)

  // Serializes a snapshot into the column order of the INSERT above.
  // Structured fields are stored as JSON text; absent optional fields are
  // bound as SQL NULL.
  const write = (s: IRunSnapshot): void => {
    upsert.run(
      s.runId,
      s.status,
      s.startedAt,
      s.completedAt ?? null,
      s.input,
      s.history ? JSON.stringify(s.history) : null,
      s.plan ? JSON.stringify(s.plan) : null,
      JSON.stringify(s.trace),
      JSON.stringify(s.usage),
      s.stepIndex,
      s.iterations,
      s.revisions,
      s.text ?? null,
      s.error ?? null,
    )
  }

  // Row shape from the SELECT above. node:sqlite returns columns as a flat
  // record; we narrow to the columns we wrote to keep the parser explicit.
  type Row = {
    run_id: string
    status: IRunSnapshot['status']
    started_at: number
    completed_at: number | null
    input: string
    history: string | null
    plan: string | null
    trace: string
    usage: string
    step_index: number
    iterations: number
    revisions: number
    result_text: string | null
    error: string | null
  }

  // Inverse of write(): returns null when no row has the given run id,
  // otherwise parses the JSON columns back and maps SQL NULL to undefined.
  const read = (runId: string): IRunSnapshot | null => {
    const row = select.get(runId) as Row | undefined
    if (!row) {
      return null
    }
    return {
      runId: row.run_id,
      status: row.status,
      startedAt: row.started_at,
      completedAt: row.completed_at ?? undefined,
      input: row.input,
      history: row.history ? (JSON.parse(row.history) as IConversationTurn[]) : undefined,
      plan: row.plan ? (JSON.parse(row.plan) as IPlan) : undefined,
      trace: JSON.parse(row.trace) as IStepResult[],
      usage: JSON.parse(row.usage) as IUsage,
      stepIndex: row.step_index,
      iterations: row.iterations,
      revisions: row.revisions,
      text: row.result_text ?? undefined,
      error: row.error ?? undefined,
    }
  }

  return {
    onRunStart: (snapshot) => write(snapshot),
    onStepComplete: (snapshot) => write(snapshot),
    onRunComplete: (snapshot) => write(snapshot),
    loadRun: (runId) => read(runId),
    close: () => {
      if (closed) {
        return
      }
      db.close()
      // Only marked closed after db.close() returned without throwing.
      closed = true
    },
  }
}