diff --git a/.agents/skills/sqlite-schema-design/SKILL.md b/.agents/skills/sqlite-schema-design/SKILL.md
new file mode 100644
index 0000000000..2b098d5313
--- /dev/null
+++ b/.agents/skills/sqlite-schema-design/SKILL.md
@@ -0,0 +1,219 @@
+---
+name: sqlite-schema-design
+description: Design or review schemas for `crates/cloudsync` using SQLite Sync constraints, not generic SQLite advice. Use when adding synced tables, changing synced columns, or planning CloudSync-safe migrations.
+---
+
+## Goal
+
+Design tables that behave correctly under SQLite Sync's CRDT replication model.
+
+This skill is specifically for CloudSync-backed schemas:
+
+- tables are initialized through `cloudsync_init(...)`
+- sync is enabled with `cloudsync_enable(...)`
+- identifiers should be generated with `cloudsync_uuid()`
+- schema changes must go through `cloudsync_begin_alter(...)` and `cloudsync_commit_alter(...)`
+- local and cloud databases must keep the same schema
+
+Do not treat this as ordinary SQLite schema design. SQLite Sync imposes extra rules around keys, defaults, foreign keys, and schema evolution.
+
+## Workflow
+
+### 1. Decide Whether The Table Is Synced
+
+Before proposing DDL, classify the table:
+
+- synced application data: must satisfy SQLite Sync constraints
+- local-only cache or ephemeral state: should usually stay out of CloudSync
+
+Only apply this skill to synced tables or to tables that may become synced soon.
+
+### 2. Require A Stable, Globally Unique Primary Key
+
+For synced tables:
+
+- always declare an explicit primary key
+- prefer `TEXT PRIMARY KEY NOT NULL`
+- generate ids with `cloudsync_uuid()`
+- do not use auto-incrementing integer ids
+
+The SQLite Sync docs explicitly recommend UUIDv7-style globally unique ids for CRDT workloads. Integer autoincrement ids are a bad fit because multiple devices can create rows independently.
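+The lifecycle from the Goal section can be sketched end-to-end (a minimal sketch; the `document` table name is illustrative and assumes the table has already been created):
+
+```sql
+-- Register the table with CloudSync, then turn sync on.
+SELECT cloudsync_init('document');
+SELECT cloudsync_enable('document');
+
+-- New rows get globally unique ids from the extension.
+SELECT cloudsync_uuid();
+```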
+
+```sql
+CREATE TABLE document (
+  id TEXT PRIMARY KEY NOT NULL DEFAULT (cloudsync_uuid()),
+  workspace_id TEXT NOT NULL,
+  title TEXT NOT NULL DEFAULT '',
+  body TEXT NOT NULL DEFAULT '',
+  archived INTEGER NOT NULL DEFAULT 0,
+  created_at INTEGER NOT NULL DEFAULT (unixepoch()),
+  updated_at INTEGER NOT NULL DEFAULT (unixepoch())
+) STRICT;
+```
+
+If the environment cannot use a function call in `DEFAULT`, generate the id in application code, but still use `cloudsync_uuid()` as the canonical id strategy.
+
+### 3. Make Inserts Merge-Safe With Real Defaults
+
+SQLite Sync best practices call out a non-obvious CRDT constraint: merges can happen column-by-column, so missing values are much more dangerous than in a single-node SQLite app.
+
+For synced tables:
+
+- every non-primary-key `NOT NULL` column should have a meaningful `DEFAULT`
+- avoid required columns that only application code knows how to populate
+- prefer simple scalar defaults over nullable columns when the field is logically always present
+
+Good:
+
+```sql
+title TEXT NOT NULL DEFAULT ''
+archived INTEGER NOT NULL DEFAULT 0
+sort_order INTEGER NOT NULL DEFAULT 0
+```
+
+Bad (required columns with no defaults on a synced table):
+
+```sql
+title TEXT NOT NULL
+archived INTEGER NOT NULL
+```
+
+### 4. Keep The Local And Cloud Schemas Identical
+
+The getting-started docs require the local synced database and the SQLite Cloud database to share the same schema.
+
+When designing or reviewing a schema:
+
+- treat local and remote DDL as one contract
+- do not introduce "client-only" columns on synced tables
+- do not rely on drift being harmless
+- ensure migrations are applied consistently before sync resumes
+
+If a field is only needed locally, it likely belongs in a separate non-synced table.
+
+### 5. Be Conservative With Foreign Keys
+
+SQLite Sync best practices explicitly warn that foreign keys can interact poorly with CRDT replication.
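+If a foreign key is kept anyway, a hedged sketch of a merge-tolerant child table (table and column names are illustrative; assumes a parent `document` table):
+
+```sql
+-- Nullable FK: the relationship is optional, and the column's
+-- implicit DEFAULT is NULL, which never points at a missing parent.
+CREATE TABLE attachment (
+  id TEXT PRIMARY KEY NOT NULL DEFAULT (cloudsync_uuid()),
+  document_id TEXT REFERENCES document(id),
+  name TEXT NOT NULL DEFAULT ''
+) STRICT;
+
+-- Index the child side of the relationship.
+CREATE INDEX idx_attachment_document_id ON attachment(document_id);
+```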
+
+Use foreign keys on synced tables only when the integrity guarantee is worth the operational cost.
+
+If you keep them:
+
+- make child foreign key columns nullable when the relationship is optional
+- if a foreign key column has a `DEFAULT`, that default must be `NULL` or reference an actually valid parent row
+- avoid fake sentinel ids such as `'root'` unless that parent row is guaranteed to exist everywhere
+- index the child foreign key columns
+
+Prefer ownership patterns that tolerate out-of-order arrival between related rows.
+
+### 6. Avoid Triggers And Implicit Write Logic On Synced Tables
+
+SQLite Sync best practices advise minimizing triggers because they make replicated writes harder to reason about.
+
+For synced tables:
+
+- avoid triggers that mutate synced columns
+- avoid hidden side effects on insert or update
+- prefer explicit application writes
+- keep derived or bookkeeping writes in non-synced tables if possible
+
+If a trigger is unavoidable, review it as part of the replication design, not as a local SQLite convenience.
+
+### 7. Scope Uniqueness For RLS And Multi-Tenant Sync
+
+The introduction docs emphasize row-level security and multi-tenant access patterns.
+
+That changes uniqueness design:
+
+- if the real rule is "unique per workspace/user/team", encode that as a composite constraint
+- avoid globally unique business keys unless they truly span all tenants
+
+When data is tenant-scoped, prefer:
+
+```sql
+UNIQUE (workspace_id, slug)
+```
+
+over:
+
+```sql
+slug TEXT UNIQUE
+```
+
+### 8. Keep Schema Changes Inside The CloudSync Alter Window
+
+Schema changes for synced databases are not ordinary `ALTER TABLE` work. Use:
+
+1. `cloudsync_begin_alter('table_name')`
+2. perform the schema change
+3. `cloudsync_commit_alter('table_name')`
+
+Design implications:
+
+- favor additive changes over destructive rewrites
+- prefer adding columns with safe defaults
+- avoid migrations that temporarily violate sync invariants
+- plan rollouts so every replica can move cleanly to the new shape
+
+When reviewing a migration plan, reject any synced-table schema change that skips the CloudSync alter flow.
+
+### 9. Keep Sync Metadata Out Of Your Domain Schema
+
+SQLite Sync already exposes its own metadata and helpers:
+
+- `cloudsync_siteid()`
+- `cloudsync_db_version()`
+- `cloudsync_version()`
+- `cloudsync_is_enabled()`
+
+Do not duplicate these concepts as app-managed columns on synced tables unless there is a very specific product requirement.
+
+### 10. Separate Network Lifecycle From Schema Design
+
+The API set includes network setup and sync transport functions such as:
+
+- `cloudsync_network_init(...)`
+- `cloudsync_network_set_token(...)`
+- `cloudsync_network_set_apikey(...)`
+- `cloudsync_network_sync(...)`
+- `cloudsync_network_has_unsent_changes()`
+
+These matter operationally, but they are not substitutes for sound schema design.
+
+Do not design tables that assume:
+
+- sync is always online
+- rows arrive in lockstep
+- dependent rows replicate in a single transaction boundary visible to all peers
+
+Assume offline creation, delayed delivery, retries, and independent merges.
+
+## Design Defaults For Synced Tables
+
+Unless the user explicitly asks otherwise:
+
+- `TEXT PRIMARY KEY NOT NULL`
+- ids generated with `cloudsync_uuid()`
+- `STRICT` tables
+- explicit `DEFAULT` on every non-key `NOT NULL` column
+- composite uniqueness for tenant-scoped identifiers
+- minimal or no triggers
+- cautious foreign key usage
+- separate non-synced tables for local UI/cache state
+
+## Review Checklist
+
+When reviewing a CloudSync schema, ask:
+
+- Does every synced table have a globally unique primary key strategy?
+- Are new rows creatable independently on multiple devices?
+- Do all non-key required columns have defaults that make replicated inserts safe?
+- Would the schema still behave correctly if related rows arrive out of order?
+- Are foreign keys optional where replication ordering can vary?
+- Are tenant-scoped uniqueness rules modeled as composite constraints?
+- Does the migration plan use `cloudsync_begin_alter` / `cloudsync_commit_alter`?
+- Is any local-only state incorrectly mixed into a synced table?
diff --git a/Cargo.lock b/Cargo.lock
index 29800d5924..8f8fd8982a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4717,6 +4717,7 @@ name = "db-app"
 version = "0.1.0"
 dependencies = [
  "db-core2",
+ "db-migrate",
  "sqlx",
  "tokio",
 ]
@@ -4756,6 +4757,7 @@ dependencies = [
  "cloudsync",
  "db-app",
  "db-core2",
+ "db-migrate",
  "serde",
  "serde_json",
  "sqlx",
@@ -4765,6 +4767,18 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "db-migrate"
+version = "0.1.0"
+dependencies = [
+ "cloudsync",
+ "db-core2",
+ "sha2 0.10.9",
+ "sqlx",
+ "thiserror 2.0.18",
+ "tokio",
+]
+
 [[package]]
 name = "db-parser"
 version = "0.1.0"
@@ -11546,6 +11560,7 @@ dependencies = [
  "db-app",
  "db-core2",
  "db-live-query",
+ "db-migrate",
  "serde_json",
  "tempfile",
  "thiserror 2.0.18",
@@ -18602,6 +18617,7 @@ dependencies = [
  "db-app",
  "db-core2",
  "db-live-query",
+ "db-migrate",
  "serde",
  "serde_json",
  "specta",
diff --git a/Cargo.toml b/Cargo.toml
index d6daab62c1..41a1cc8669 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -86,6 +86,7 @@ hypr-data = { path = "crates/data", package = "data" }
 hypr-db-app = { path = "crates/db-app", package = "db-app" }
 hypr-db-core2 = { path = "crates/db-core2", package = "db-core2" }
 hypr-db-live-query = { path = "crates/db-live-query", package = "db-live-query" }
+hypr-db-migrate = { path = "crates/db-migrate", package = "db-migrate" }
 hypr-denoise = { path = "crates/denoise", package = "denoise" }
 hypr-detect = { path = "crates/detect", package = "detect" }
 hypr-device-monitor = { path = "crates/device-monitor", package = "device-monitor" }
diff --git a/crates/cloudsync/AGENTS.md b/crates/cloudsync/AGENTS.md
index 02289d9ac7..865453db2b 100644
--- a/crates/cloudsync/AGENTS.md
+++ b/crates/cloudsync/AGENTS.md
@@ -1 +1,29 @@
-https://github.com/sqliteai/sqlite-sync/releases/tag/1.0.12
+Release:
+- https://github.com/sqliteai/sqlite-sync/releases/tag/1.0.12
+
+Docs:
+- https://docs.sqlitecloud.io/docs/sqlite-sync-introduction
+- https://docs.sqlitecloud.io/docs/sqlite-sync-getting-started
+- https://docs.sqlitecloud.io/docs/sqlite-sync-best-practices
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-init
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-enable
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-disable
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-is-enabled
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-cleanup
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-terminate
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-version
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-siteid
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-db-version
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-uuid
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-begin-alter
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-commit-alter
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-init
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-cleanup
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-set-token
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-set-apikey
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-has-unsent-changes
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-send-changes
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-check-changes
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-sync
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-reset-sync-version
+- https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-logout
diff --git a/crates/cloudsync/src/api.rs b/crates/cloudsync/src/api.rs
new file mode 100644
index 0000000000..22d37acbff
--- /dev/null
+++ b/crates/cloudsync/src/api.rs
@@ -0,0 +1,177 @@
+use sqlx::{Executor, Sqlite};
+
+use crate::error::Error;
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-version
+pub async fn version<'e, E>(executor: E) -> Result<String, Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    Ok(sqlx::query_scalar("SELECT cloudsync_version()")
+        .fetch_one(executor)
+        .await?)
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-init
+pub async fn init<'e, E>(
+    executor: E,
+    table_name: &str,
+    crdt_algo: Option<&str>,
+    force: Option<bool>,
+) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite> + Copy,
+{
+    match (crdt_algo, force) {
+        (None, None) => {
+            sqlx::query("SELECT cloudsync_init(?)")
+                .bind(table_name)
+                .fetch_optional(executor)
+                .await?;
+        }
+        (Some(crdt_algo), None) => {
+            sqlx::query("SELECT cloudsync_init(?, ?)")
+                .bind(table_name)
+                .bind(crdt_algo)
+                .fetch_optional(executor)
+                .await?;
+        }
+        (None, Some(force)) => {
+            sqlx::query("SELECT cloudsync_init(?, NULL, ?)")
+                .bind(table_name)
+                .bind(force)
+                .fetch_optional(executor)
+                .await?;
+        }
+        (Some(crdt_algo), Some(force)) => {
+            sqlx::query("SELECT cloudsync_init(?, ?, ?)")
+                .bind(table_name)
+                .bind(crdt_algo)
+                .bind(force)
+                .fetch_optional(executor)
+                .await?;
+        }
+    }
+
+    Ok(())
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-begin-alter
+pub async fn begin_alter<'e, E>(executor: E, table_name: &str) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    sqlx::query("SELECT cloudsync_begin_alter(?)")
+        .bind(table_name)
+        .fetch_optional(executor)
+        .await?;
+
+    Ok(())
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-enable
+pub async fn enable<'e, E>(executor: E, table_name: &str) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    sqlx::query("SELECT cloudsync_enable(?)")
+        .bind(table_name)
+        .fetch_optional(executor)
+        .await?;
+
+    Ok(())
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-disable
+pub async fn disable<'e, E>(executor: E, table_name: &str) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    sqlx::query("SELECT cloudsync_disable(?)")
+        .bind(table_name)
+        .fetch_optional(executor)
+        .await?;
+
+    Ok(())
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-is-enabled
+pub async fn is_enabled<'e, E>(executor: E, table_name: &str) -> Result<bool, Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    Ok(
+        sqlx::query_scalar("SELECT cloudsync_is_enabled(?)")
+            .bind(table_name)
+            .fetch_one(executor)
+            .await?,
+    )
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-commit-alter
+pub async fn commit_alter<'e, E>(executor: E, table_name: &str) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    sqlx::query("SELECT cloudsync_commit_alter(?)")
+        .bind(table_name)
+        .fetch_optional(executor)
+        .await?;
+
+    Ok(())
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-cleanup
+pub async fn cleanup<'e, E>(executor: E, table_name: &str) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    sqlx::query("SELECT cloudsync_cleanup(?)")
+        .bind(table_name)
+        .fetch_optional(executor)
+        .await?;
+
+    Ok(())
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-siteid
+pub async fn siteid<'e, E>(executor: E) -> Result<Vec<u8>, Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    Ok(sqlx::query_scalar("SELECT cloudsync_siteid()")
+        .fetch_one(executor)
+        .await?)
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-db-version
+pub async fn db_version<'e, E>(executor: E) -> Result<i64, Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    Ok(sqlx::query_scalar("SELECT cloudsync_db_version()")
+        .fetch_one(executor)
+        .await?)
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-uuid
+pub async fn uuid<'e, E>(executor: E) -> Result<String, Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    Ok(sqlx::query_scalar("SELECT cloudsync_uuid()")
+        .fetch_one(executor)
+        .await?)
+}
+
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-terminate
+pub async fn terminate<'e, E>(executor: E) -> Result<(), Error>
+where
+    E: Executor<'e, Database = Sqlite>,
+{
+    sqlx::query("SELECT cloudsync_terminate()")
+        .fetch_optional(executor)
+        .await?;
+
+    Ok(())
+}
diff --git a/crates/cloudsync/src/init.rs b/crates/cloudsync/src/init.rs
deleted file mode 100644
index 99504938bb..0000000000
--- a/crates/cloudsync/src/init.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-use sqlx::SqlitePool;
-
-use crate::error::Error;
-
-pub async fn version(pool: &SqlitePool) -> Result<String, Error> {
-    Ok(sqlx::query_scalar("SELECT cloudsync_version()")
-        .fetch_one(pool)
-        .await?)
-}
-
-pub async fn init(
-    pool: &SqlitePool,
-    table_name: &str,
-    crdt_algo: Option<&str>,
-    force: Option<bool>,
-) -> Result<(), Error> {
-    match (crdt_algo, force) {
-        (None, None) => {
-            sqlx::query("SELECT cloudsync_init(?)")
-                .bind(table_name)
-                .fetch_optional(pool)
-                .await?;
-        }
-        (Some(crdt_algo), None) => {
-            sqlx::query("SELECT cloudsync_init(?, ?)")
-                .bind(table_name)
-                .bind(crdt_algo)
-                .fetch_optional(pool)
-                .await?;
-        }
-        (None, Some(force)) => {
-            sqlx::query("SELECT cloudsync_init(?, NULL, ?)")
-                .bind(table_name)
-                .bind(force)
-                .fetch_optional(pool)
-                .await?;
-        }
-        (Some(crdt_algo), Some(force)) => {
-            sqlx::query("SELECT cloudsync_init(?, ?, ?)")
-                .bind(table_name)
-                .bind(crdt_algo)
-                .bind(force)
-                .fetch_optional(pool)
-                .await?;
-        }
-    }
-
-    Ok(())
-}
-
-pub async fn begin_alter(pool: &SqlitePool, table_name: &str) -> Result<(), Error> {
-    sqlx::query("SELECT cloudsync_begin_alter(?)")
-        .bind(table_name)
-        .fetch_optional(pool)
-        .await?;
-
-    Ok(())
-}
-
-pub async fn commit_alter(pool: &SqlitePool, table_name: &str) -> Result<(), Error> {
-    sqlx::query("SELECT cloudsync_commit_alter(?)")
-        .bind(table_name)
-        .fetch_optional(pool)
-        .await?;
-
-    Ok(())
-}
-
-pub async fn cleanup(pool: &SqlitePool, table_name: &str) -> Result<(), Error> {
-    sqlx::query("SELECT cloudsync_cleanup(?)")
-        .bind(table_name)
-        .fetch_optional(pool)
-        .await?;
-
-    Ok(())
-}
-
-pub async fn terminate(pool: &SqlitePool) -> Result<(), Error> {
-    sqlx::query("SELECT cloudsync_terminate()")
-        .fetch_optional(pool)
-        .await?;
-
-    Ok(())
-}
diff --git a/crates/cloudsync/src/lib.rs b/crates/cloudsync/src/lib.rs
index 8b8dec2de1..700262fcdd 100644
--- a/crates/cloudsync/src/lib.rs
+++ b/crates/cloudsync/src/lib.rs
@@ -2,7 +2,7 @@
 
 mod bundle;
 mod error;
-mod init;
+mod api;
 mod network;
 
 use std::path::PathBuf;
@@ -11,7 +11,20 @@ use sqlx::sqlite::SqliteConnectOptions;
 
 pub use bundle::bundled_extension_path;
 pub use error::{Error, ErrorKind};
-pub use init::{begin_alter, cleanup, commit_alter, init, terminate, version};
+pub use api::{
+    begin_alter,
+    cleanup,
+    commit_alter,
+    db_version,
+    disable,
+    enable,
+    init,
+    is_enabled,
+    siteid,
+    terminate,
+    uuid,
+    version,
+};
 pub use network::{
     network_check_changes, network_cleanup, network_has_unsent_changes, network_init,
     network_logout, network_reset_sync_version, network_send_changes, network_set_apikey,
diff --git a/crates/cloudsync/src/network.rs b/crates/cloudsync/src/network.rs
index 1c88a35024..787b656802 100644
--- a/crates/cloudsync/src/network.rs
+++ b/crates/cloudsync/src/network.rs
@@ -2,6 +2,7 @@ use sqlx::SqlitePool;
 
 use crate::error::Error;
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-init
 pub async fn network_init(pool: &SqlitePool, connection_string: &str) -> Result<(), Error> {
     sqlx::query("SELECT cloudsync_network_init(?)")
         .bind(connection_string)
@@ -11,6 +12,7 @@ pub async fn network_init(pool: &SqlitePool, connection_string: &str) -> Result<
     Ok(())
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-set-apikey
 pub async fn network_set_apikey(pool: &SqlitePool, api_key: &str) -> Result<(), Error> {
     sqlx::query("SELECT cloudsync_network_set_apikey(?)")
         .bind(api_key)
@@ -20,6 +22,7 @@ pub async fn network_set_apikey(pool: &SqlitePool, api_key: &str) -> Result<(),
     Ok(())
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-set-token
 pub async fn network_set_token(pool: &SqlitePool, token: &str) -> Result<(), Error> {
     sqlx::query("SELECT cloudsync_network_set_token(?)")
         .bind(token)
@@ -29,6 +32,7 @@ pub async fn network_set_token(pool: &SqlitePool, token: &str) -> Result<(), Err
     Ok(())
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-cleanup
 pub async fn network_cleanup(pool: &SqlitePool) -> Result<(), Error> {
     sqlx::query("SELECT cloudsync_network_cleanup()")
         .fetch_optional(pool)
@@ -37,6 +41,7 @@ pub async fn network_cleanup(pool: &SqlitePool) -> Result<(), Error> {
     Ok(())
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-has-unsent-changes
 pub async fn network_has_unsent_changes(pool: &SqlitePool) -> Result<bool, Error> {
     Ok(
         sqlx::query_scalar("SELECT cloudsync_network_has_unsent_changes()")
@@ -45,6 +50,7 @@ pub async fn network_has_unsent_changes(pool: &SqlitePool) -> Result
     )
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-send-changes
 pub async fn network_send_changes(
     pool: &SqlitePool,
     wait_ms: Option<u32>,
@@ -78,6 +84,7 @@ pub async fn network_send_changes(
     })
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-check-changes
 pub async fn network_check_changes(
     pool: &SqlitePool,
     wait_ms: Option<u32>,
@@ -111,6 +118,7 @@ pub async fn network_check_changes(
     })
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-reset-sync-version
 pub async fn network_reset_sync_version(pool: &SqlitePool) -> Result<(), Error> {
     sqlx::query("SELECT cloudsync_network_reset_sync_version()")
         .fetch_optional(pool)
@@ -119,6 +127,7 @@ pub async fn network_reset_sync_version(pool: &SqlitePool) -> Result<(), Error>
     Ok(())
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-logout
 pub async fn network_logout(pool: &SqlitePool) -> Result<(), Error> {
     sqlx::query("SELECT cloudsync_network_logout()")
         .fetch_optional(pool)
@@ -127,6 +136,7 @@ pub async fn network_logout(pool: &SqlitePool) -> Result<(), Error> {
     Ok(())
 }
 
+/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-sync
 pub async fn network_sync(
     pool: &SqlitePool,
     wait_ms: Option<u32>,
diff --git a/crates/db-app/Cargo.toml b/crates/db-app/Cargo.toml
index ddadf6f359..87ee789ae1 100644
--- a/crates/db-app/Cargo.toml
+++ b/crates/db-app/Cargo.toml
@@ -5,7 +5,9 @@ edition = "2024"
 
 [dependencies]
 hypr-db-core2 = { workspace = true }
-sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "sqlite-unbundled", "macros", "migrate"] }
+hypr-db-migrate = { workspace = true }
+
+sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "sqlite-unbundled", "macros"] }
 
 [features]
 default = []
diff --git a/crates/db-app/migrations/20260414120000_calendars_events.sql b/crates/db-app/migrations/20260414120000_calendars_events.sql
new file mode 100644
index 0000000000..8febcf2f74
--- /dev/null
+++ b/crates/db-app/migrations/20260414120000_calendars_events.sql
@@ -0,0 +1,36 @@
+CREATE TABLE IF NOT EXISTS calendars (
+  id TEXT PRIMARY KEY NOT NULL,
+  tracking_id_calendar TEXT NOT NULL DEFAULT '',
+  name TEXT NOT NULL DEFAULT '',
+  enabled INTEGER NOT NULL DEFAULT 0,
+  provider TEXT NOT NULL DEFAULT '',
+  source TEXT NOT NULL DEFAULT '',
+  color TEXT NOT NULL DEFAULT '#888',
+  connection_id TEXT NOT NULL DEFAULT '',
+  created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
+  updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
+);
+
+CREATE TABLE IF NOT EXISTS events (
+  id TEXT PRIMARY KEY NOT NULL,
+  tracking_id_event TEXT NOT NULL DEFAULT '',
+  calendar_id TEXT NOT NULL DEFAULT '',
+  title TEXT NOT NULL DEFAULT '',
+  started_at TEXT NOT NULL DEFAULT '',
+  ended_at TEXT NOT NULL DEFAULT '',
+  location TEXT NOT NULL DEFAULT '',
+  meeting_link TEXT NOT NULL DEFAULT '',
+  description TEXT NOT NULL DEFAULT '',
+  note TEXT NOT NULL DEFAULT '',
+  recurrence_series_id TEXT NOT NULL DEFAULT '',
+  has_recurrence_rules INTEGER NOT NULL DEFAULT 0,
+  is_all_day INTEGER NOT NULL DEFAULT 0,
+  provider TEXT NOT NULL DEFAULT '',
+  participants_json TEXT,
+  created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
+  updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
+);
+
+CREATE INDEX IF NOT EXISTS idx_events_calendar_id ON events(calendar_id);
+CREATE INDEX IF NOT EXISTS idx_events_started_at ON events(started_at);
+CREATE INDEX IF NOT EXISTS idx_calendars_provider ON calendars(provider);
diff --git a/crates/db-app/src/calendar_ops.rs b/crates/db-app/src/calendar_ops.rs
new file mode 100644
index 0000000000..8cdc11478b
--- /dev/null
+++ b/crates/db-app/src/calendar_ops.rs
@@ -0,0 +1,83 @@
+use sqlx::SqlitePool;
+
+use crate::{CalendarRow, UpsertCalendar};
+
+pub async fn get_calendar(pool: &SqlitePool, id: &str) -> Result<Option<CalendarRow>, sqlx::Error> {
+    sqlx::query_as::<_, CalendarRow>("SELECT * FROM calendars WHERE id = ?")
+        .bind(id)
+        .fetch_optional(pool)
+        .await
+}
+
+pub async fn list_calendars(pool: &SqlitePool) -> Result<Vec<CalendarRow>, sqlx::Error> {
+    sqlx::query_as::<_, CalendarRow>("SELECT * FROM calendars ORDER BY name")
+        .fetch_all(pool)
+        .await
+}
+
+pub async fn upsert_calendar(
+    pool: &SqlitePool,
+    input: UpsertCalendar<'_>,
+) -> Result<(), sqlx::Error> {
+    sqlx::query(
+        "INSERT INTO calendars \
+         (id, tracking_id_calendar, name, enabled, provider, source, color, connection_id, updated_at) \
+         VALUES (?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) \
+         ON CONFLICT(id) DO UPDATE SET \
+         tracking_id_calendar = excluded.tracking_id_calendar, \
+         name = excluded.name, \
+         enabled = excluded.enabled, \
+         provider = excluded.provider, \
+         source = excluded.source, \
+         color = excluded.color, \
+         connection_id = excluded.connection_id, \
+         updated_at = excluded.updated_at",
+    )
+    .bind(input.id)
+    .bind(input.tracking_id_calendar)
+    .bind(input.name)
+    .bind(input.enabled)
+    .bind(input.provider)
+    .bind(input.source)
+    .bind(input.color)
+    .bind(input.connection_id)
+    .execute(pool)
+    .await?;
+
+    Ok(())
+}
+
+pub async fn insert_calendar_if_missing(
+    pool: &SqlitePool,
+    input: UpsertCalendar<'_>,
+) -> Result<bool, sqlx::Error> {
+    let result = sqlx::query(
+        "INSERT INTO calendars \
+         (id, tracking_id_calendar, name, enabled, provider, source, color, connection_id, \
+         created_at, updated_at) \
+         VALUES (?, ?, ?, ?, ?, ?, ?, ?, \
+         strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) \
+         ON CONFLICT(id) DO NOTHING",
+    )
+    .bind(input.id)
+    .bind(input.tracking_id_calendar)
+    .bind(input.name)
+    .bind(input.enabled)
+    .bind(input.provider)
+    .bind(input.source)
+    .bind(input.color)
+    .bind(input.connection_id)
+    .execute(pool)
+    .await?;
+
+    Ok(result.rows_affected() > 0)
+}
+
+pub async fn delete_calendar(pool: &SqlitePool, id: &str) -> Result<(), sqlx::Error> {
+    sqlx::query("DELETE FROM calendars WHERE id = ?")
+        .bind(id)
+        .execute(pool)
+        .await?;
+
+    Ok(())
+}
diff --git a/crates/db-app/src/calendar_types.rs b/crates/db-app/src/calendar_types.rs
new file mode 100644
index 0000000000..6e5340cf78
--- /dev/null
+++ b/crates/db-app/src/calendar_types.rs
@@ -0,0 +1,24 @@
+#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
+pub struct CalendarRow {
+    pub id: String,
+    pub tracking_id_calendar: String,
+    pub name: String,
+    pub enabled: bool,
+    pub provider: String,
+    pub source: String,
+    pub color: String,
+    pub connection_id: String,
+    pub created_at: String,
+    pub updated_at: String,
+}
+
+pub struct UpsertCalendar<'a> {
+    pub id: &'a str,
+    pub tracking_id_calendar: &'a str,
+    pub name: &'a str,
+    pub enabled: bool,
+    pub provider: &'a str,
+    pub source: &'a str,
+    pub color: &'a str,
+    pub connection_id: &'a str,
+}
diff --git a/crates/db-app/src/event_ops.rs b/crates/db-app/src/event_ops.rs
new file mode 100644
index 0000000000..6f9bab4a06
--- /dev/null
+++ b/crates/db-app/src/event_ops.rs
@@ -0,0 +1,117 @@
+use sqlx::SqlitePool;
+
+use crate::{EventRow, UpsertEvent};
+
+pub async fn get_event(pool: &SqlitePool, id: &str) -> Result<Option<EventRow>, sqlx::Error> {
+    sqlx::query_as::<_, EventRow>("SELECT * FROM events WHERE id = ?")
+        .bind(id)
+        .fetch_optional(pool)
+        .await
+}
+
+pub async fn list_events(pool: &SqlitePool) -> Result<Vec<EventRow>, sqlx::Error> {
+    sqlx::query_as::<_, EventRow>("SELECT * FROM events ORDER BY started_at")
+        .fetch_all(pool)
+        .await
+}
+
+pub async fn upsert_event(pool: &SqlitePool, input: UpsertEvent<'_>) -> Result<(), sqlx::Error> {
+    sqlx::query(
+        "INSERT INTO events \
+         (id, tracking_id_event, calendar_id, title, started_at, ended_at, \
+         location, meeting_link, description, note, recurrence_series_id, \
+         has_recurrence_rules, is_all_day, provider, participants_json, updated_at) \
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) \ + ON CONFLICT(id) DO UPDATE SET \ + tracking_id_event = excluded.tracking_id_event, \ + calendar_id = excluded.calendar_id, \ + title = excluded.title, \ + started_at = excluded.started_at, \ + ended_at = excluded.ended_at, \ + location = excluded.location, \ + meeting_link = excluded.meeting_link, \ + description = excluded.description, \ + note = excluded.note, \ + recurrence_series_id = excluded.recurrence_series_id, \ + has_recurrence_rules = excluded.has_recurrence_rules, \ + is_all_day = excluded.is_all_day, \ + provider = excluded.provider, \ + participants_json = excluded.participants_json, \ + updated_at = excluded.updated_at", + ) + .bind(input.id) + .bind(input.tracking_id_event) + .bind(input.calendar_id) + .bind(input.title) + .bind(input.started_at) + .bind(input.ended_at) + .bind(input.location) + .bind(input.meeting_link) + .bind(input.description) + .bind(input.note) + .bind(input.recurrence_series_id) + .bind(input.has_recurrence_rules) + .bind(input.is_all_day) + .bind(input.provider) + .bind(input.participants_json) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn insert_event_if_missing( + pool: &SqlitePool, + input: UpsertEvent<'_>, +) -> Result { + let result = sqlx::query( + "INSERT INTO events \ + (id, tracking_id_event, calendar_id, title, started_at, ended_at, \ + location, meeting_link, description, note, recurrence_series_id, \ + has_recurrence_rules, is_all_day, provider, participants_json, \ + created_at, updated_at) \ + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, \ + strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) \ + ON CONFLICT(id) DO NOTHING", + ) + .bind(input.id) + .bind(input.tracking_id_event) + .bind(input.calendar_id) + .bind(input.title) + .bind(input.started_at) + .bind(input.ended_at) + .bind(input.location) + .bind(input.meeting_link) + .bind(input.description) + 
.bind(input.note) + .bind(input.recurrence_series_id) + .bind(input.has_recurrence_rules) + .bind(input.is_all_day) + .bind(input.provider) + .bind(input.participants_json) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +pub async fn delete_event(pool: &SqlitePool, id: &str) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM events WHERE id = ?") + .bind(id) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn delete_events_by_calendar_id( + pool: &SqlitePool, + calendar_id: &str, +) -> Result { + let result = sqlx::query("DELETE FROM events WHERE calendar_id = ?") + .bind(calendar_id) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} diff --git a/crates/db-app/src/event_types.rs b/crates/db-app/src/event_types.rs new file mode 100644 index 0000000000..e8bb025cf4 --- /dev/null +++ b/crates/db-app/src/event_types.rs @@ -0,0 +1,38 @@ +#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] +pub struct EventRow { + pub id: String, + pub tracking_id_event: String, + pub calendar_id: String, + pub title: String, + pub started_at: String, + pub ended_at: String, + pub location: String, + pub meeting_link: String, + pub description: String, + pub note: String, + pub recurrence_series_id: String, + pub has_recurrence_rules: bool, + pub is_all_day: bool, + pub provider: String, + pub participants_json: Option, + pub created_at: String, + pub updated_at: String, +} + +pub struct UpsertEvent<'a> { + pub id: &'a str, + pub tracking_id_event: &'a str, + pub calendar_id: &'a str, + pub title: &'a str, + pub started_at: &'a str, + pub ended_at: &'a str, + pub location: &'a str, + pub meeting_link: &'a str, + pub description: &'a str, + pub note: &'a str, + pub recurrence_series_id: &'a str, + pub has_recurrence_rules: bool, + pub is_all_day: bool, + pub provider: &'a str, + pub participants_json: Option<&'a str>, +} diff --git a/crates/db-app/src/lib.rs b/crates/db-app/src/lib.rs index d5829dfc26..3088d44ad7 100644 --- 
a/crates/db-app/src/lib.rs +++ b/crates/db-app/src/lib.rs @@ -2,28 +2,50 @@ mod activity_ops; mod activity_types; +mod calendar_ops; +mod calendar_types; mod cloudsync; mod daily_note_ops; mod daily_note_types; mod daily_summary_ops; mod daily_summary_types; +mod event_ops; +mod event_types; mod template_ops; mod template_types; pub use activity_ops::*; pub use activity_types::*; +pub use calendar_ops::*; +pub use calendar_types::*; pub use cloudsync::*; pub use daily_note_ops::*; pub use daily_note_types::*; pub use daily_summary_ops::*; pub use daily_summary_types::*; +pub use event_ops::*; +pub use event_types::*; pub use template_ops::*; pub use template_types::*; -use sqlx::SqlitePool; - -pub async fn migrate(pool: &SqlitePool) -> Result<(), sqlx::migrate::MigrateError> { - sqlx::migrate!("./migrations").run(pool).await +pub const APP_MIGRATION_STEPS: &[hypr_db_migrate::MigrationStep] = &[ + hypr_db_migrate::MigrationStep { + id: "20260413020000_templates", + scope: hypr_db_migrate::MigrationScope::Plain, + sql: include_str!("../migrations/20260413020000_templates.sql"), + }, + hypr_db_migrate::MigrationStep { + id: "20260414120000_calendars_events", + scope: hypr_db_migrate::MigrationScope::Plain, + sql: include_str!("../migrations/20260414120000_calendars_events.sql"), + }, +]; + +pub fn schema() -> hypr_db_migrate::DbSchema { + hypr_db_migrate::DbSchema { + steps: APP_MIGRATION_STEPS, + validate_cloudsync_table: cloudsync_alter_guard_required, + } } #[cfg(test)] @@ -33,9 +55,50 @@ mod tests { use sqlx::Row; async fn test_db() -> Db3 { - let db = Db3::connect_memory_plain().await.unwrap(); - migrate(db.pool()).await.unwrap(); - db + hypr_db_migrate::open_db( + hypr_db_migrate::AppDbOpenOptions { + db: hypr_db_core2::DbOpenOptions { + storage: hypr_db_core2::DbStorage::Memory, + cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }, + migration_failure_policy: 
hypr_db_migrate::MigrationFailurePolicy::Fail,
+            },
+            schema(),
+        )
+        .await
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn schema_declares_legacy_migrations_and_cloudsync_registry() {
+        let db = hypr_db_migrate::open_db(
+            hypr_db_migrate::AppDbOpenOptions {
+                db: hypr_db_core2::DbOpenOptions {
+                    storage: hypr_db_core2::DbStorage::Memory,
+                    cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled,
+                    journal_mode_wal: true,
+                    foreign_keys: true,
+                    max_connections: Some(1),
+                },
+                migration_failure_policy: hypr_db_migrate::MigrationFailurePolicy::Fail,
+            },
+            schema(),
+        )
+        .await
+        .unwrap();
+
+        let tables: Vec<String> = sqlx::query_scalar(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_sqlx%' ORDER BY name",
+        )
+        .fetch_all(db.pool().as_ref())
+        .await
+        .unwrap();
+
+        assert!(tables.contains(&"templates".to_string()));
+        assert!(tables.contains(&"app_migrations".to_string()));
+    }

     #[tokio::test]
@@ -54,14 +117,7 @@ mod tests {
         assert_eq!(
             tables,
-            vec![
-                "activity_observation_analyses",
-                "activity_observation_events",
-                "activity_screenshots",
-                "daily_notes",
-                "daily_summaries",
-                "templates",
-            ]
+            vec!["app_migrations", "calendars", "events", "templates"]
         );
     }

@@ -276,6 +332,71 @@ mod tests {
         assert!(get_daily_summary(db.pool(), "ds1").await.unwrap().is_none());
     }

+    #[tokio::test]
+    async fn calendar_roundtrip() {
+        let db = test_db().await;
+
+        upsert_calendar(
+            db.pool(),
+            UpsertCalendar {
+                id: "cal1",
+                tracking_id_calendar: "tracking-cal-1",
+                name: "Work",
+                enabled: true,
+                provider: "google",
+                source: "team",
+                color: "#123456",
+                connection_id: "conn-1",
+            },
+        )
+        .await
+        .unwrap();
+
+        let row = get_calendar(db.pool(), "cal1").await.unwrap().unwrap();
+        assert_eq!(row.name, "Work");
+        assert!(row.enabled);
+
+        let rows = list_calendars(db.pool()).await.unwrap();
+        assert_eq!(rows.len(), 1);
+        assert_eq!(rows[0].id, "cal1");
+    }
+
+    #[tokio::test]
+    async fn event_roundtrip() {
+        let db = test_db().await;
+
upsert_event( + db.pool(), + UpsertEvent { + id: "evt1", + tracking_id_event: "tracking-evt-1", + calendar_id: "cal1", + title: "Standup", + started_at: "2026-04-15T09:00:00Z", + ended_at: "2026-04-15T09:30:00Z", + location: "", + meeting_link: "https://meet.example/1", + description: "Daily sync", + note: "", + recurrence_series_id: "series-1", + has_recurrence_rules: true, + is_all_day: false, + provider: "google", + participants_json: Some("[{\"email\":\"a@example.com\"}]"), + }, + ) + .await + .unwrap(); + + let row = get_event(db.pool(), "evt1").await.unwrap().unwrap(); + assert_eq!(row.title, "Standup"); + assert_eq!(row.calendar_id, "cal1"); + + let rows = list_events(db.pool()).await.unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].id, "evt1"); + } + #[tokio::test] async fn template_roundtrip() { let db = test_db().await; diff --git a/crates/db-core2/AGENTS.md b/crates/db-core2/AGENTS.md index 2314f3d4f7..92b586ca64 100644 --- a/crates/db-core2/AGENTS.md +++ b/crates/db-core2/AGENTS.md @@ -3,7 +3,7 @@ ## Role - `db-core2` is the database substrate layer. -- It owns `Db3`/`DbPool`, SQLite open options, pool lifecycle, migration failure policy, and per-connection SQLite wiring. +- It owns `Db3`/`DbPool`, SQLite open options, pool lifecycle, storage-recreation primitives, and per-connection SQLite wiring. - Raw SQLite hook integration belongs here, including `sqlite3_update_hook`. - Cloudsync integration also belongs here because it is part of how the database is opened and managed, not how queries are exposed to the app. - Higher layers should consume `Db3`/`DbPool` and raw table-change events from here instead of reimplementing pool setup. @@ -14,7 +14,8 @@ - Applying low-level SQLite pragmas and connection policy. - Installing per-connection hooks in `SqlitePoolOptions::after_connect`. - Exposing best-effort table-level mutation notifications for pooled writes. -- Database recreation behavior when migrations fail and policy requests it. 
+- Database recreation primitives that upper layers may invoke when their policy requests it. +- Connection-scoped CloudSync helpers that must run on one checked-out executor. - Keeping `DbPool` ergonomic as an `sqlx::SqlitePool` wrapper. ## This Crate Does Not Own @@ -33,6 +34,7 @@ - Change events are table-level, not row-level or predicate-level. - `DbPool` must continue to `Deref`/`AsRef` to `SqlitePool` so existing SQL callers stay ergonomic. - App code may supply a migration callback, but the crate must stay schema-agnostic. +- CloudSync operations that require executor affinity should be wrapped here so upper layers do not call `hypr_cloudsync` directly. - Reactive support must stay additive to normal database usage; callers that do not subscribe should see ordinary open/query behavior. ## Dependency Direction diff --git a/crates/db-core2/src/cloudsync.rs b/crates/db-core2/src/cloudsync.rs index dfb0dc0f1d..4c75f65cd6 100644 --- a/crates/db-core2/src/cloudsync.rs +++ b/crates/db-core2/src/cloudsync.rs @@ -3,7 +3,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use backon::{ExponentialBuilder, Retryable}; use serde::{Deserialize, Serialize}; -use sqlx::SqlitePool; +use sqlx::{Executor, Sqlite, SqlitePool}; use tokio::sync::{broadcast, oneshot}; use tokio::task::JoinHandle; @@ -72,6 +72,8 @@ pub enum CloudsyncRuntimeError { NotConfigured, #[error("cloudsync runtime is not started")] NotStarted, + #[error("cloudsync runtime is running; stop it first or use cloudsync_reconfigure")] + RestartRequired, #[error("cloudsync sync interval must be greater than 0")] InvalidSyncInterval, #[error(transparent)] @@ -186,11 +188,33 @@ impl Db3 { config: CloudsyncRuntimeConfig, ) -> Result<(), CloudsyncRuntimeError> { let mut runtime = self.cloudsync_runtime.lock().unwrap(); + if runtime.running { + return Err(CloudsyncRuntimeError::RestartRequired); + } runtime.config = Some(config.normalized()?); runtime.last_error = None; Ok(()) } + pub async fn 
cloudsync_reconfigure( + &self, + config: CloudsyncRuntimeConfig, + ) -> Result<(), CloudsyncRuntimeError> { + let was_running = self.cloudsync_runtime.lock().unwrap().running; + + if was_running { + self.cloudsync_stop().await?; + } + + self.cloudsync_configure(config)?; + + if was_running { + self.cloudsync_start().await?; + } + + Ok(()) + } + pub async fn cloudsync_start(&self) -> Result<(), CloudsyncRuntimeError> { if self.cloudsync_open_mode == CloudsyncOpenMode::Disabled { let mut runtime = self.cloudsync_runtime.lock().unwrap(); @@ -404,14 +428,14 @@ impl Db3 { &self, table_name: &str, ) -> Result<(), hypr_cloudsync::Error> { - hypr_cloudsync::begin_alter(self.pool.as_ref(), table_name).await + cloudsync_begin_alter_on(self.pool.as_ref(), table_name).await } pub async fn cloudsync_commit_alter( &self, table_name: &str, ) -> Result<(), hypr_cloudsync::Error> { - hypr_cloudsync::commit_alter(self.pool.as_ref(), table_name).await + cloudsync_commit_alter_on(self.pool.as_ref(), table_name).await } pub async fn cloudsync_cleanup(&self, table_name: &str) -> Result<(), hypr_cloudsync::Error> { @@ -485,6 +509,26 @@ impl Db3 { } } +pub async fn cloudsync_begin_alter_on<'e, E>( + executor: E, + table_name: &str, +) -> Result<(), hypr_cloudsync::Error> +where + E: Executor<'e, Database = Sqlite>, +{ + hypr_cloudsync::begin_alter(executor, table_name).await +} + +pub async fn cloudsync_commit_alter_on<'e, E>( + executor: E, + table_name: &str, +) -> Result<(), hypr_cloudsync::Error> +where + E: Executor<'e, Database = Sqlite>, +{ + hypr_cloudsync::commit_alter(executor, table_name).await +} + const MAX_BACKOFF_SECS: u64 = 300; async fn cloudsync_background_loop( diff --git a/crates/db-core2/src/lib.rs b/crates/db-core2/src/lib.rs index 5c90245325..f2d61931d8 100644 --- a/crates/db-core2/src/lib.rs +++ b/crates/db-core2/src/lib.rs @@ -1,21 +1,18 @@ mod cloudsync; mod pool; -use std::future::Future; use std::path::{Path, PathBuf}; -use std::pin::Pin; use 
std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; pub use hypr_cloudsync::Error; -use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use crate::cloudsync::CloudsyncRuntimeState; pub use crate::cloudsync::{ CloudsyncAuth, CloudsyncOpenMode, CloudsyncRuntimeConfig, CloudsyncRuntimeError, - CloudsyncStatus, CloudsyncTableSpec, + CloudsyncStatus, CloudsyncTableSpec, cloudsync_begin_alter_on, cloudsync_commit_alter_on, }; use crate::pool::connect_pool; pub use crate::pool::{DbPool, TableChange, TableChangeKind}; @@ -26,12 +23,6 @@ pub enum DbStorage<'a> { Memory, } -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum MigrationFailurePolicy { - Fail, - Recreate, -} - #[derive(Clone, Copy, Debug)] pub struct DbOpenOptions<'a> { pub storage: DbStorage<'a>, @@ -39,7 +30,6 @@ pub struct DbOpenOptions<'a> { pub journal_mode_wal: bool, pub foreign_keys: bool, pub max_connections: Option, - pub migration_failure_policy: MigrationFailurePolicy, } #[derive(Debug, thiserror::Error)] @@ -50,16 +40,10 @@ pub enum DbOpenError { Sqlx(#[from] sqlx::Error), #[error(transparent)] Cloudsync(#[from] hypr_cloudsync::Error), - #[error("migration failed: {0}")] - Migration(String), - #[error("failed to recreate database after migration failure: {0}")] - RecreateFailed(String), } pub type ManagedDb = std::sync::Arc; -type BoxedMigrationFuture<'a, E> = Pin> + Send + 'a>>; - const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(5); pub struct Db3 { @@ -80,32 +64,27 @@ impl std::fmt::Debug for Db3 { } } -impl Db3 { - pub async fn open_with_migrate( - options: DbOpenOptions<'_>, - migrate: F, - ) -> Result - where - F: for<'a> Fn(&'a SqlitePool) -> BoxedMigrationFuture<'a, E>, - E: std::fmt::Display, - { - match try_open_with_migrate(&options, &migrate).await { - Ok(db) => Ok(db), - Err(DbOpenError::Migration(message)) - if matches!( - options.migration_failure_policy, - MigrationFailurePolicy::Recreate - ) => - { - tracing::warn!("database migration 
failed, recreating fresh database: {message}"); - recreate_storage(&options)?; - try_open_with_migrate(&options, &migrate) - .await - .map_err(|error| DbOpenError::RecreateFailed(error.to_string())) +impl Drop for Db3 { + fn drop(&mut self) { + let task = { + let mut runtime = self.cloudsync_runtime.lock().unwrap(); + runtime.running = false; + runtime.task.take() + }; + + if let Some(mut task) = task { + if let Some(shutdown_tx) = task.shutdown_tx.take() { + let _ = shutdown_tx.send(()); } - Err(error) => Err(error), + task.join_handle.abort(); } } +} + +impl Db3 { + pub async fn open(options: DbOpenOptions<'_>) -> Result { + connect_with_options(&options).await + } pub async fn connect_local(path: impl AsRef) -> Result { if let Some(parent) = path.as_ref().parent() { @@ -176,24 +155,6 @@ impl Db3 { } } -async fn try_open_with_migrate( - options: &DbOpenOptions<'_>, - migrate: &F, -) -> Result -where - F: for<'a> Fn(&'a SqlitePool) -> BoxedMigrationFuture<'a, E>, - E: std::fmt::Display, -{ - let db = connect_with_options(options).await?; - - if let Err(error) = migrate(db.pool()).await { - db.pool.clone().close().await; - return Err(DbOpenError::Migration(error.to_string())); - } - - Ok(db) -} - async fn connect_with_options(options: &DbOpenOptions<'_>) -> Result { let mut connect_options = match options.storage { DbStorage::Local(path) => { @@ -242,7 +203,7 @@ fn apply_internal_connect_policy(connect_options: SqliteConnectOptions) -> Sqlit connect_options.busy_timeout(SQLITE_BUSY_TIMEOUT) } -fn recreate_storage(options: &DbOpenOptions<'_>) -> Result<(), DbOpenError> { +pub fn recreate_storage(options: &DbOpenOptions<'_>) -> Result<(), DbOpenError> { match options.storage { DbStorage::Local(path) => { wipe_db_file(path); @@ -270,7 +231,10 @@ fn wipe_db_file(path: &Path) { #[cfg(test)] mod tests { use super::*; - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + + use tokio::sync::oneshot; fn 
test_cloudsync_config() -> CloudsyncRuntimeConfig { CloudsyncRuntimeConfig { @@ -298,39 +262,40 @@ mod tests { } #[tokio::test] - async fn open_with_migrate_recreates_local_db_when_requested() { + async fn recreate_storage_wipes_local_db_when_requested() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("app.db"); - let attempts = AtomicUsize::new(0); - - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Recreate, - }, - |pool| { - let n = attempts.fetch_add(1, Ordering::SeqCst); - Box::pin(async move { - if n == 0 { - sqlx::query("CREATE TABLE broken (id TEXT PRIMARY KEY NOT NULL)") - .execute(pool) - .await - .unwrap(); - Err("boom") - } else { - sqlx::query("CREATE TABLE fresh (id TEXT PRIMARY KEY NOT NULL)") - .execute(pool) - .await - .unwrap(); - Ok::<(), &'static str>(()) - } - }) - }, - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) + .await + .unwrap(); + sqlx::query("CREATE TABLE broken (id TEXT PRIMARY KEY NOT NULL)") + .execute(db.pool().as_ref()) + .await + .unwrap(); + db.pool.clone().close().await; + + recreate_storage(&DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) + .unwrap(); + + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) .await .unwrap(); @@ -344,83 +309,21 @@ mod tests { .map(|row| row.0) .collect(); - assert_eq!(attempts.load(Ordering::SeqCst), 
2); - assert_eq!(tables, vec!["fresh"]); + assert!(tables.is_empty()); } #[tokio::test] - async fn open_with_migrate_returns_migration_error_when_fail_policy_is_used() { - let error = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Memory, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: false, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Err::<(), _>("nope") }), - ) - .await - .unwrap_err(); - - assert!(matches!(error, DbOpenError::Migration(message) if message == "nope")); - } - - #[tokio::test] - async fn open_with_migrate_returns_recreate_failed_when_retry_also_fails() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("app.db"); - let attempts = AtomicUsize::new(0); - - let error = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Recreate, - }, - |pool| { - let n = attempts.fetch_add(1, Ordering::SeqCst); - Box::pin(async move { - let table_name = if n == 0 { - "first_attempt" - } else { - "second_attempt" - }; - let sql = format!("CREATE TABLE {table_name} (id TEXT PRIMARY KEY NOT NULL)"); - sqlx::query(&sql).execute(pool).await.unwrap(); - Err::<(), &'static str>("still broken") - }) - }, - ) - .await - .unwrap_err(); - - assert_eq!(attempts.load(Ordering::SeqCst), 2); - assert!( - matches!(error, DbOpenError::RecreateFailed(message) if message == "migration failed: still broken") - ); - } - - #[tokio::test] - async fn open_with_migrate_applies_requested_pragmas() { + async fn open_applies_requested_pragmas() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("app.db"); - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - 
cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Ok::<(), sqlx::Error>(()) }), - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) .await .unwrap(); @@ -444,17 +347,13 @@ mod tests { #[tokio::test] async fn disabled_open_mode_keeps_cloudsync_inert() { - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Memory, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: false, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Ok::<(), sqlx::Error>(()) }), - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: false, + foreign_keys: true, + max_connections: Some(1), + }) .await .unwrap(); @@ -482,6 +381,103 @@ mod tests { assert!(matches!(error, CloudsyncRuntimeError::NotConfigured)); } + #[tokio::test] + async fn configure_rejects_live_runtime_changes() { + let db = Db3::connect_memory_plain().await.unwrap(); + db.cloudsync_configure(test_cloudsync_config()).unwrap(); + db.cloudsync_runtime.lock().unwrap().running = true; + + let error = db + .cloudsync_configure(CloudsyncRuntimeConfig { + connection_string: "sqlitecloud://demo.invalid/other.db?apikey=demo".to_string(), + ..test_cloudsync_config() + }) + .unwrap_err(); + + assert!(matches!(error, CloudsyncRuntimeError::RestartRequired)); + assert_eq!( + db.cloudsync_runtime + .lock() + .unwrap() + .config + .as_ref() + .unwrap() + .connection_string, + "sqlitecloud://demo.invalid/app.db?apikey=demo" + ); + } + + #[tokio::test] + async fn 
reconfigure_preserves_stopped_state_when_runtime_is_inert() { + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: false, + foreign_keys: true, + max_connections: Some(1), + }) + .await + .unwrap(); + db.cloudsync_configure(test_cloudsync_config()).unwrap(); + { + let mut runtime = db.cloudsync_runtime.lock().unwrap(); + runtime.running = true; + runtime.network_initialized = true; + } + + let next_config = CloudsyncRuntimeConfig { + connection_string: "sqlitecloud://demo.invalid/reconfigured.db?apikey=demo".to_string(), + sync_interval_ms: 2_000, + ..test_cloudsync_config() + }; + + db.cloudsync_reconfigure(next_config.clone()).await.unwrap(); + + let runtime = db.cloudsync_runtime.lock().unwrap(); + assert_eq!(runtime.config, Some(next_config)); + assert!(!runtime.running); + assert!(!runtime.network_initialized); + } + + #[tokio::test] + async fn dropping_db_stops_background_task_best_effort() { + struct DropFlag(Arc); + + impl Drop for DropFlag { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let db = Db3::connect_memory_plain().await.unwrap(); + let dropped = Arc::new(AtomicBool::new(false)); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let guard = DropFlag(Arc::clone(&dropped)); + let join_handle = tokio::spawn(async move { + let _guard = guard; + let _ = shutdown_rx.await; + }); + + { + let mut runtime = db.cloudsync_runtime.lock().unwrap(); + runtime.running = true; + runtime.task = Some(crate::cloudsync::CloudsyncBackgroundTask { + shutdown_tx: Some(shutdown_tx), + join_handle, + }); + } + + drop(db); + + tokio::time::timeout(std::time::Duration::from_secs(1), async { + while !dropped.load(Ordering::SeqCst) { + tokio::task::yield_now().await; + } + }) + .await + .unwrap(); + } + #[tokio::test] async fn emits_table_changes_for_local_writes() { let db = Db3::connect_memory_plain().await.unwrap(); @@ -661,27 +657,19 @@ mod tests { let 
dir = tempfile::tempdir().unwrap(); let path = dir.path().join("app.db"); - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |pool| { - Box::pin(async move { - sqlx::query("CREATE TABLE multi_conn_events (id TEXT PRIMARY KEY NOT NULL)") - .execute(pool) - .await - .unwrap(); - Ok::<(), sqlx::Error>(()) - }) - }, - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(&path), + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); + sqlx::query("CREATE TABLE multi_conn_events (id TEXT PRIMARY KEY NOT NULL)") + .execute(db.pool().as_ref()) + .await + .unwrap(); let mut changes = db.subscribe_table_changes(); let mut conn_a = db.pool().acquire().await.unwrap(); @@ -768,18 +756,14 @@ mod tests { } #[tokio::test] - async fn open_with_migrate_memory_clamps_max_connections_to_one() { - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Memory, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: false, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Ok::<(), sqlx::Error>(()) }), - ) + async fn open_memory_clamps_max_connections_to_one() { + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: false, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); diff --git a/crates/db-live-query/Cargo.toml b/crates/db-live-query/Cargo.toml index a70d0c7d57..6b0b86528a 100644 --- a/crates/db-live-query/Cargo.toml +++ b/crates/db-live-query/Cargo.toml @@ -13,8 +13,10 @@ tokio = { workspace = true, features = ["macros", 
"sync", "time"] } uuid = { workspace = true, features = ["v4"] } [dev-dependencies] -anyhow = { workspace = true } hypr-cloudsync = { workspace = true } hypr-db-app = { workspace = true } +hypr-db-migrate = { workspace = true } + +anyhow = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "time"] } diff --git a/crates/db-live-query/src/lib.rs b/crates/db-live-query/src/lib.rs index 97462d8393..b27b102204 100644 --- a/crates/db-live-query/src/lib.rs +++ b/crates/db-live-query/src/lib.rs @@ -3,519 +3,17 @@ mod error; mod explain; mod query; +mod runtime; mod schema; mod subscriptions; +mod types; mod watch; -use std::collections::HashSet; -use std::sync::Arc; - -use tokio::sync::broadcast::error::{RecvError, TryRecvError}; -use tokio::sync::watch as tokio_watch; - -use hypr_db_core2::Db3; -use schema::CatalogStore; -use subscriptions::{QueryEventPayload, RefreshJob, Registry}; - pub use error::{Error, Result}; pub use explain::extract_dependencies; +pub use runtime::DbRuntime; pub use schema::DependencyResolutionError; - -use query::{run_query, run_query_proxy}; - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum ProxyQueryMethod { - Run, - All, - Get, - Values, -} - -impl ProxyQueryMethod { - fn parse(method: &str) -> Result { - match method { - "run" => Ok(Self::Run), - "all" => Ok(Self::All), - "get" => Ok(Self::Get), - "values" => Ok(Self::Values), - _ => Err(Error::InvalidQueryMethod(method.to_string())), - } - } -} - -#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] -pub struct ProxyQueryResult { - pub rows: Vec, -} - -pub trait QueryEventSink: Clone + Send + 'static { - fn send_result(&self, rows: Vec) -> std::result::Result<(), String>; - fn send_error(&self, error: String) -> std::result::Result<(), String>; -} - -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub enum DependencyTarget { - Table(String), - VirtualTable(String), -} - -#[derive(Clone, 
Debug, Eq, PartialEq)] -pub enum DependencyAnalysis { - Reactive { targets: HashSet }, - NonReactive { reason: String }, -} - -pub struct DbRuntime { - db: Arc, - catalog: CatalogStore, - subscriptions: Registry, - shutdown_tx: tokio_watch::Sender, - dispatcher: std::sync::Mutex>>, -} - -pub struct SubscriptionRegistration { - pub id: String, - pub analysis: DependencyAnalysis, -} - -impl DbRuntime { - pub fn new(db: Arc) -> Self { - let db = db; - let catalog = CatalogStore::default(); - let subscriptions = Registry::default(); - let (shutdown_tx, mut shutdown_rx) = tokio_watch::channel(false); - let mut change_rx = db.subscribe_table_changes(); - let dispatcher_catalog = catalog.clone(); - let dispatcher_subscriptions = subscriptions.clone(); - let dispatcher_db = Arc::clone(&db); - - let dispatcher = tokio::spawn(async move { - loop { - tokio::select! { - changed = shutdown_rx.changed() => { - if changed.is_err() || *shutdown_rx.borrow() { - break; - } - } - change = change_rx.recv() => { - let jobs = match change { - Ok(first_change) => { - let mut changed_tables = HashSet::from([first_change.table]); - let mut trigger_seq = first_change.seq; - let mut rerun_all = false; - loop { - match change_rx.try_recv() { - Ok(next_change) => { - trigger_seq = trigger_seq.max(next_change.seq); - changed_tables.insert(next_change.table); - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Closed) => break, - Err(TryRecvError::Lagged(_)) => { - rerun_all = true; - } - } - } - - if rerun_all { - let trigger_seq = - dispatcher_db.pool().current_table_change_seq(); - dispatcher_subscriptions.collect_all_jobs(trigger_seq).await - } else { - match dispatcher_catalog - .canonicalize_raw_tables( - dispatcher_db.pool().as_ref(), - &changed_tables, - ) - .await - { - Ok(changed_targets) => { - dispatcher_subscriptions - .collect_jobs(&changed_targets, trigger_seq) - .await - } - Err(_) => { - dispatcher_subscriptions - .collect_all_jobs(trigger_seq) - .await - } - } - } - } 
- Err(RecvError::Closed) => break, - Err(RecvError::Lagged(_)) => { - loop { - match change_rx.try_recv() { - Ok(_) | Err(TryRecvError::Lagged(_)) => {} - Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break, - } - } - let trigger_seq = dispatcher_db.pool().current_table_change_seq(); - dispatcher_subscriptions.collect_all_jobs(trigger_seq).await - } - }; - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - if jobs.is_empty() { - continue; - } - - for job in jobs { - dispatcher_subscriptions - .refresh(&dispatcher_db, job, None) - .await; - } - } - } - } - }); - - Self { - db, - catalog, - subscriptions, - shutdown_tx, - dispatcher: std::sync::Mutex::new(Some(dispatcher)), - } - } - - pub async fn execute( - &self, - sql: String, - params: Vec, - ) -> Result> { - run_query(&self.db, &sql, ¶ms).await.map_err(Into::into) - } - - pub async fn execute_proxy( - &self, - sql: String, - params: Vec, - method: String, - ) -> Result { - let method = ProxyQueryMethod::parse(&method)?; - run_query_proxy(&self.db, &sql, ¶ms, method) - .await - .map_err(Into::into) - } - - pub async fn subscribe( - &self, - sql: String, - params: Vec, - sink: S, - ) -> Result { - let baseline_seq = self.db.pool().current_table_change_seq(); - let analysis = match self - .catalog - .analyze_query(self.db.pool().as_ref(), &sql) - .await - { - Ok(resolved) => DependencyAnalysis::Reactive { - targets: resolved.targets, - }, - Err(error) => DependencyAnalysis::NonReactive { - reason: error.to_string(), - }, - }; - let registered = self - .subscriptions - .register(sql.clone(), params.clone(), sink.clone(), analysis) - .await; - #[cfg(test)] - test_support::before_initial_payload_load().await; - let initial_payload = QueryEventPayload::load(&self.db, &sql, ¶ms).await; - - let event_result = initial_payload.send_to(&sink); - - if let Err(error) = event_result { - self.subscriptions - .unregister(®istered.registration.id) - .await; - return Err(Error::Sink(error)); - } - - if 
let Some(watch_id) = registered.reactive_watch_id { - let latest_dependency_seq = match ®istered.registration.analysis { - DependencyAnalysis::Reactive { targets } => self - .catalog - .latest_dependency_seq(self.db.pool(), targets) - .await - .ok() - .flatten() - .unwrap_or(baseline_seq), - DependencyAnalysis::NonReactive { .. } => baseline_seq, - }; - self.subscriptions - .activate(watch_id, latest_dependency_seq) - .await; - if latest_dependency_seq > baseline_seq { - self.subscriptions - .refresh( - &self.db, - RefreshJob { - watch_id, - sql, - params, - }, - Some(&initial_payload), - ) - .await; - } - } - - Ok(registered.registration) - } - - pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> { - let removed = self.subscriptions.unregister(subscription_id).await; - if removed { - Ok(()) - } else { - Err(Error::SubscriptionNotFound(subscription_id.to_string())) - } - } - - pub async fn dependency_analysis(&self, subscription_id: &str) -> Option { - self.subscriptions - .dependency_analysis(subscription_id) - .await - } - - pub fn db(&self) -> &Db3 { - self.db.as_ref() - } -} - -#[cfg(test)] -mod test_support { - use std::sync::Arc; - use std::sync::OnceLock; - use std::sync::atomic::{AtomicBool, Ordering}; - - use tokio::sync::{Mutex, Notify}; - - struct InitialPayloadHook { - reached: AtomicBool, - reached_notify: Notify, - released: AtomicBool, - release_notify: Notify, - } - - impl InitialPayloadHook { - fn new() -> Self { - Self { - reached: AtomicBool::new(false), - reached_notify: Notify::new(), - released: AtomicBool::new(false), - release_notify: Notify::new(), - } - } - } - - pub(crate) struct InitialPayloadHookHandle { - hook: Arc, - } - - fn hook_slot() -> &'static Mutex>> { - static SLOT: OnceLock>>> = OnceLock::new(); - SLOT.get_or_init(|| Mutex::new(None)) - } - - pub(crate) async fn install_initial_payload_hook() -> InitialPayloadHookHandle { - let hook = Arc::new(InitialPayloadHook::new()); - *hook_slot().lock().await = 
Some(Arc::clone(&hook));
-        InitialPayloadHookHandle { hook }
-    }
-
-    pub(crate) async fn before_initial_payload_load() {
-        let hook = hook_slot().lock().await.clone();
-        let Some(hook) = hook else {
-            return;
-        };
-
-        hook.reached.store(true, Ordering::SeqCst);
-        hook.reached_notify.notify_waiters();
-        while !hook.released.load(Ordering::SeqCst) {
-            hook.release_notify.notified().await;
-        }
-    }
-
-    impl InitialPayloadHookHandle {
-        pub(crate) async fn wait_until_reached(&self) {
-            tokio::time::timeout(std::time::Duration::from_secs(1), async {
-                while !self.hook.reached.load(Ordering::SeqCst) {
-                    self.hook.reached_notify.notified().await;
-                }
-            })
-            .await
-            .expect("initial payload hook should be reached");
-        }
-
-        pub(crate) async fn release(self) {
-            self.hook.released.store(true, Ordering::SeqCst);
-            self.hook.release_notify.notify_waiters();
-            *hook_slot().lock().await = None;
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::sync::{Arc, Mutex};
-    use std::time::Duration;
-
-    use hypr_db_core2::{DbOpenOptions, DbStorage, MigrationFailurePolicy};
-    use serde_json::json;
-
-    use super::*;
-
-    #[derive(Clone, Debug, PartialEq)]
-    enum TestEvent {
-        Result(Vec<serde_json::Value>),
-        Error(String),
-    }
-
-    #[derive(Clone)]
-    struct TestSink {
-        events: Arc<Mutex<Vec<TestEvent>>>,
-    }
-
-    impl QueryEventSink for TestSink {
-        fn send_result(&self, rows: Vec<serde_json::Value>) -> std::result::Result<(), String> {
-            self.push(TestEvent::Result(rows))
-        }
-
-        fn send_error(&self, error: String) -> std::result::Result<(), String> {
-            self.push(TestEvent::Error(error))
-        }
-    }
-
-    impl TestSink {
-        fn capture() -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
-            let events = Arc::new(Mutex::new(Vec::new()));
-            (
-                Self {
-                    events: Arc::clone(&events),
-                },
-                events,
-            )
-        }
-
-        fn push(&self, event: TestEvent) -> std::result::Result<(), String> {
-            self.events.lock().unwrap().push(event);
-            Ok(())
-        }
-    }
-
-    async fn next_event(
-        events: &Arc<Mutex<Vec<TestEvent>>>,
-        index: usize,
-    ) -> anyhow::Result<TestEvent> {
-        tokio::time::timeout(Duration::from_secs(1), async {
-            loop {
-                if let Some(event) = events.lock().unwrap().get(index).cloned() {
-                    return event;
-                }
-                tokio::time::sleep(Duration::from_millis(10)).await;
-            }
-        })
-        .await
-        .map_err(anyhow::Error::from)
-    }
-
-    async fn wait_for_stable_event_count(
-        events: &Arc<Mutex<Vec<TestEvent>>>,
-        stable_for: Duration,
-    ) -> usize {
-        let mut last_len = events.lock().unwrap().len();
-        loop {
-            tokio::time::sleep(stable_for).await;
-            let len = events.lock().unwrap().len();
-            if len == last_len {
-                return len;
-            }
-            last_len = len;
-        }
-    }
-
-    async fn setup_runtime() -> (tempfile::TempDir, sqlx::SqlitePool, DbRuntime) {
-        let dir = tempfile::tempdir().unwrap();
-        let db_path = dir.path().join("app.db");
-        let db = hypr_db_core2::Db3::open_with_migrate(
-            DbOpenOptions {
-                storage: DbStorage::Local(&db_path),
-                cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled,
-                journal_mode_wal: true,
-                foreign_keys: true,
-                max_connections: Some(4),
-                migration_failure_policy: MigrationFailurePolicy::Fail,
-            },
-            |pool| Box::pin(hypr_db_app::migrate(pool)),
-        )
-        .await
-        .unwrap();
-
-        let pool = db.pool().as_ref().clone();
-        (dir, pool, DbRuntime::new(Arc::new(db)))
-    }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn stale_init_time_broadcast_processed_after_activation_is_ignored() {
-        let (_dir, pool, runtime) = setup_runtime().await;
-        let hook = test_support::install_initial_payload_hook().await;
-        let (sink, events) = TestSink::capture();
-
-        let subscribe = tokio::spawn(async move {
-            runtime
-                .subscribe(
-                    "SELECT id FROM daily_notes WHERE id = ?".to_string(),
-                    vec![json!("note-stale-after-activation")],
-                    sink,
-                )
-                .await
-        });
-
-        hook.wait_until_reached().await;
-
-        sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
-            .bind("note-stale-after-activation")
-            .bind("2026-04-25")
-            .bind("{}")
-            .bind("user-stale")
-            .execute(&pool)
-            .await
-            .unwrap();
-
-        for idx in 0..320 {
-            sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
-                .bind(format!("note-lag-{idx}"))
-                .bind("2026-04-25")
-                .bind("{}")
-                .bind(format!("user-lag-{idx}"))
-                .execute(&pool)
-                .await
-                .unwrap();
-        }
-
-        hook.release().await;
-        subscribe.await.unwrap().unwrap();
-
-        let initial = next_event(&events, 0).await.unwrap();
-        assert_eq!(
-            initial,
-            TestEvent::Result(vec![json!({ "id": "note-stale-after-activation" })])
-        );
-
-        let stable_count = wait_for_stable_event_count(&events, Duration::from_millis(100)).await;
-        assert_eq!(stable_count, 1);
-    }
-}
-
-impl Drop for DbRuntime {
-    fn drop(&mut self) {
-        let _ = self.shutdown_tx.send(true);
-        if let Some(dispatcher) = self.dispatcher.lock().unwrap().take() {
-            dispatcher.abort();
-        }
-    }
-}
+pub use types::{
+    DependencyAnalysis, DependencyTarget, ProxyQueryMethod, ProxyQueryResult, QueryEventSink,
+    SubscriptionRegistration,
+};
diff --git a/crates/db-live-query/src/runtime.rs b/crates/db-live-query/src/runtime.rs
new file mode 100644
index 0000000000..aaedb06f32
--- /dev/null
+++ b/crates/db-live-query/src/runtime.rs
@@ -0,0 +1,441 @@
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use tokio::sync::broadcast::error::{RecvError, TryRecvError};
+use tokio::sync::watch as tokio_watch;
+
+use hypr_db_core2::Db3;
+
+use crate::error::{Error, Result};
+use crate::query::{run_query, run_query_proxy};
+use crate::schema::CatalogStore;
+use crate::subscriptions::{QueryEventPayload, RefreshJob, Registry};
+use crate::types::{
+    DependencyAnalysis, ProxyQueryMethod, ProxyQueryResult, QueryEventSink,
+    SubscriptionRegistration,
+};
+
+pub struct DbRuntime {
+    db: Arc<Db3>,
+    catalog: CatalogStore,
+    subscriptions: Registry,
+    shutdown_tx: tokio_watch::Sender<bool>,
+    dispatcher: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
+}
+
+impl DbRuntime {
+    pub fn new(db: Arc<Db3>) -> Self {
+        let db = db;
+        let catalog = CatalogStore::default();
+        let subscriptions = Registry::default();
+        let (shutdown_tx, mut shutdown_rx) = tokio_watch::channel(false);
+        let mut change_rx = db.subscribe_table_changes();
+        let dispatcher_catalog = catalog.clone();
+        let dispatcher_subscriptions = subscriptions.clone();
+        let dispatcher_db = Arc::clone(&db);
+
+        let dispatcher = tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    changed = shutdown_rx.changed() => {
+                        if changed.is_err() || *shutdown_rx.borrow() {
+                            break;
+                        }
+                    }
+                    change = change_rx.recv() => {
+                        let jobs = match change {
+                            Ok(first_change) => {
+                                let mut changed_tables = HashSet::from([first_change.table]);
+                                let mut trigger_seq = first_change.seq;
+                                let mut rerun_all = false;
+                                loop {
+                                    match change_rx.try_recv() {
+                                        Ok(next_change) => {
+                                            trigger_seq = trigger_seq.max(next_change.seq);
+                                            changed_tables.insert(next_change.table);
+                                        }
+                                        Err(TryRecvError::Empty) => break,
+                                        Err(TryRecvError::Closed) => break,
+                                        Err(TryRecvError::Lagged(_)) => {
+                                            rerun_all = true;
+                                        }
+                                    }
+                                }
+
+                                if rerun_all {
+                                    let trigger_seq =
+                                        dispatcher_db.pool().current_table_change_seq();
+                                    dispatcher_subscriptions.collect_all_jobs(trigger_seq).await
+                                } else {
+                                    match dispatcher_catalog
+                                        .canonicalize_raw_tables(
+                                            dispatcher_db.pool().as_ref(),
+                                            &changed_tables,
+                                        )
+                                        .await
+                                    {
+                                        Ok(changed_targets) => {
+                                            dispatcher_subscriptions
+                                                .collect_jobs(&changed_targets, trigger_seq)
+                                                .await
+                                        }
+                                        Err(_) => {
+                                            dispatcher_subscriptions
+                                                .collect_all_jobs(trigger_seq)
+                                                .await
+                                        }
+                                    }
+                                }
+                            }
+                            Err(RecvError::Closed) => break,
+                            Err(RecvError::Lagged(_)) => {
+                                loop {
+                                    match change_rx.try_recv() {
+                                        Ok(_) | Err(TryRecvError::Lagged(_)) => {}
+                                        Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break,
+                                    }
+                                }
+                                let trigger_seq = dispatcher_db.pool().current_table_change_seq();
+                                dispatcher_subscriptions.collect_all_jobs(trigger_seq).await
+                            }
+                        };
+                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+                        if jobs.is_empty() {
+                            continue;
+                        }
+
+                        for job in jobs {
+                            dispatcher_subscriptions
+                                .refresh(&dispatcher_db, job, None)
+                                .await;
+                        }
+                    }
+                }
+            }
+        });
+
+        Self {
+            db,
+            catalog,
+            subscriptions,
+            shutdown_tx,
+            dispatcher: std::sync::Mutex::new(Some(dispatcher)),
+        }
+    }
+
+    pub async fn execute(
+        &self,
+        sql: String,
+        params: Vec<serde_json::Value>,
+    ) -> Result<Vec<serde_json::Value>> {
+        run_query(&self.db, &sql, &params).await.map_err(Into::into)
+    }
+
+    pub async fn execute_proxy(
+        &self,
+        sql: String,
+        params: Vec<serde_json::Value>,
+        method: String,
+    ) -> Result<ProxyQueryResult> {
+        let method: ProxyQueryMethod = method.parse()?;
+        run_query_proxy(&self.db, &sql, &params, method)
+            .await
+            .map_err(Into::into)
+    }
+
+    pub async fn subscribe<S: QueryEventSink>(
+        &self,
+        sql: String,
+        params: Vec<serde_json::Value>,
+        sink: S,
+    ) -> Result<SubscriptionRegistration> {
+        let baseline_seq = self.db.pool().current_table_change_seq();
+        let analysis = match self
+            .catalog
+            .analyze_query(self.db.pool().as_ref(), &sql)
+            .await
+        {
+            Ok(resolved) => DependencyAnalysis::Reactive {
+                targets: resolved.targets,
+            },
+            Err(error) => DependencyAnalysis::NonReactive {
+                reason: error.to_string(),
+            },
+        };
+        let registered = self
+            .subscriptions
+            .register(sql.clone(), params.clone(), sink.clone(), analysis)
+            .await;
+        #[cfg(test)]
+        test_support::before_initial_payload_load().await;
+        let initial_payload = QueryEventPayload::load(&self.db, &sql, &params).await;
+
+        let event_result = initial_payload.send_to(&sink);
+
+        if let Err(error) = event_result {
+            self.subscriptions
+                .unregister(&registered.registration.id)
+                .await;
+            return Err(Error::Sink(error));
+        }
+
+        if let Some(watch_id) = registered.reactive_watch_id {
+            let latest_dependency_seq = match &registered.registration.analysis {
+                DependencyAnalysis::Reactive { targets } => self
+                    .catalog
+                    .latest_dependency_seq(self.db.pool(), targets)
+                    .await
+                    .ok()
+                    .flatten()
+                    .unwrap_or(baseline_seq),
+                DependencyAnalysis::NonReactive { .. } => baseline_seq,
+            };
+            self.subscriptions
+                .activate(watch_id, latest_dependency_seq)
+                .await;
+            if latest_dependency_seq > baseline_seq {
+                self.subscriptions
+                    .refresh(
+                        &self.db,
+                        RefreshJob {
+                            watch_id,
+                            sql,
+                            params,
+                        },
+                        Some(&initial_payload),
+                    )
+                    .await;
+            }
+        }
+
+        Ok(registered.registration)
+    }
+
+    pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
+        let removed = self.subscriptions.unregister(subscription_id).await;
+        if removed {
+            Ok(())
+        } else {
+            Err(Error::SubscriptionNotFound(subscription_id.to_string()))
+        }
+    }
+
+    pub async fn dependency_analysis(&self, subscription_id: &str) -> Option<DependencyAnalysis> {
+        self.subscriptions
+            .dependency_analysis(subscription_id)
+            .await
+    }
+
+    pub fn db(&self) -> &Db3 {
+        self.db.as_ref()
+    }
+}
+
+impl Drop for DbRuntime {
+    fn drop(&mut self) {
+        let _ = self.shutdown_tx.send(true);
+        if let Some(dispatcher) = self.dispatcher.lock().unwrap().take() {
+            dispatcher.abort();
+        }
+    }
+}
+
+#[cfg(test)]
+mod test_support {
+    use std::sync::Arc;
+    use std::sync::OnceLock;
+    use std::sync::atomic::{AtomicBool, Ordering};
+
+    use tokio::sync::{Mutex, Notify};
+
+    struct InitialPayloadHook {
+        reached: AtomicBool,
+        reached_notify: Notify,
+        released: AtomicBool,
+        release_notify: Notify,
+    }
+
+    impl InitialPayloadHook {
+        fn new() -> Self {
+            Self {
+                reached: AtomicBool::new(false),
+                reached_notify: Notify::new(),
+                released: AtomicBool::new(false),
+                release_notify: Notify::new(),
+            }
+        }
+    }
+
+    pub(crate) struct InitialPayloadHookHandle {
+        hook: Arc<InitialPayloadHook>,
+    }
+
+    fn hook_slot() -> &'static Mutex<Option<Arc<InitialPayloadHook>>> {
+        static SLOT: OnceLock<Mutex<Option<Arc<InitialPayloadHook>>>> = OnceLock::new();
+        SLOT.get_or_init(|| Mutex::new(None))
+    }
+
+    pub(crate) async fn install_initial_payload_hook() -> InitialPayloadHookHandle {
+        let hook = Arc::new(InitialPayloadHook::new());
+        *hook_slot().lock().await = Some(Arc::clone(&hook));
+        InitialPayloadHookHandle { hook }
+    }
+
+    pub(crate) async fn before_initial_payload_load() {
+        let hook = hook_slot().lock().await.clone();
+        let Some(hook) = hook else {
+            return;
+        };
+
+        hook.reached.store(true, Ordering::SeqCst);
+        hook.reached_notify.notify_waiters();
+        while !hook.released.load(Ordering::SeqCst) {
+            hook.release_notify.notified().await;
+        }
+    }
+
+    impl InitialPayloadHookHandle {
+        pub(crate) async fn wait_until_reached(&self) {
+            tokio::time::timeout(std::time::Duration::from_secs(1), async {
+                while !self.hook.reached.load(Ordering::SeqCst) {
+                    self.hook.reached_notify.notified().await;
+                }
+            })
+            .await
+            .expect("initial payload hook should be reached");
+        }
+
+        pub(crate) async fn release(self) {
+            self.hook.released.store(true, Ordering::SeqCst);
+            self.hook.release_notify.notify_waiters();
+            *hook_slot().lock().await = None;
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::{Arc, Mutex};
+    use std::time::Duration;
+
+    use hypr_db_core2::{DbOpenOptions, DbStorage};
+    use serde_json::json;
+
+    use super::*;
+    use crate::types::QueryEventSink;
+
+    #[derive(Clone, Debug, PartialEq)]
+    enum TestEvent {
+        Result(Vec<serde_json::Value>),
+        Error(String),
+    }
+
+    #[derive(Clone)]
+    struct TestSink {
+        events: Arc<Mutex<Vec<TestEvent>>>,
+    }
+
+    impl QueryEventSink for TestSink {
+        fn send_result(&self, rows: Vec<serde_json::Value>) -> std::result::Result<(), String> {
+            self.events.lock().unwrap().push(TestEvent::Result(rows));
+            Ok(())
+        }
+
+        fn send_error(&self, error: String) -> std::result::Result<(), String> {
+            self.events.lock().unwrap().push(TestEvent::Error(error));
+            Ok(())
+        }
+    }
+
+    impl TestSink {
+        fn capture() -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
+            let events = Arc::new(Mutex::new(Vec::new()));
+            (
+                Self {
+                    events: Arc::clone(&events),
+                },
+                events,
+            )
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn stale_init_time_broadcast_processed_after_activation_is_ignored() {
+        let dir = tempfile::tempdir().unwrap();
+        let db_path = dir.path().join("app.db");
+        let db = hypr_db_migrate::open_db(
+            hypr_db_migrate::AppDbOpenOptions {
+                db: DbOpenOptions {
+                    storage: DbStorage::Local(&db_path),
+                    cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled,
+                    journal_mode_wal: true,
+                    foreign_keys: true,
+                    max_connections: Some(4),
+                },
+                migration_failure_policy: hypr_db_migrate::MigrationFailurePolicy::Fail,
+            },
+            hypr_db_app::schema(),
+        )
+        .await
+        .unwrap();
+
+        let pool = db.pool().as_ref().clone();
+        let runtime = DbRuntime::new(Arc::new(db));
+
+        let hook = test_support::install_initial_payload_hook().await;
+        let (sink, events) = TestSink::capture();
+
+        let subscribe = tokio::spawn(async move {
+            runtime
+                .subscribe(
+                    "SELECT id FROM daily_notes WHERE id = ?".to_string(),
+                    vec![json!("note-stale-after-activation")],
+                    sink,
+                )
+                .await
+        });
+
+        hook.wait_until_reached().await;
+
+        sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
+            .bind("note-stale-after-activation")
+            .bind("2026-04-25")
+            .bind("{}")
+            .bind("user-stale")
+            .execute(&pool)
+            .await
+            .unwrap();
+
+        for idx in 0..320 {
+            sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
+                .bind(format!("note-lag-{idx}"))
+                .bind("2026-04-25")
+                .bind("{}")
+                .bind(format!("user-lag-{idx}"))
+                .execute(&pool)
+                .await
+                .unwrap();
+        }
+
+        hook.release().await;
+        subscribe.await.unwrap().unwrap();
+
+        let initial = tokio::time::timeout(Duration::from_secs(1), async {
+            loop {
+                if let Some(event) = events.lock().unwrap().first().cloned() {
+                    return event;
+                }
+                tokio::time::sleep(Duration::from_millis(10)).await;
+            }
+        })
+        .await
+        .unwrap();
+        assert_eq!(
+            initial,
+            TestEvent::Result(vec![json!({ "id": "note-stale-after-activation" })])
+        );
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+        assert_eq!(events.lock().unwrap().len(), 1);
+    }
+}
diff --git a/crates/db-live-query/src/types.rs b/crates/db-live-query/src/types.rs
new file mode 100644
index 0000000000..a919c2a8e6
--- /dev/null
+++ b/crates/db-live-query/src/types.rs
@@ -0,0 +1,52 @@
+use std::collections::HashSet;
+
+use crate::error::Error;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum ProxyQueryMethod {
+    Run,
+    All,
+    Get,
+    Values,
+}
+
+impl std::str::FromStr for ProxyQueryMethod {
+    type Err = Error;
+
+    fn from_str(s: &str) -> crate::error::Result<Self> {
+        match s {
+            "run" => Ok(Self::Run),
+            "all" => Ok(Self::All),
+            "get" => Ok(Self::Get),
+            "values" => Ok(Self::Values),
+            _ => Err(Error::InvalidQueryMethod(s.to_string())),
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
+pub struct ProxyQueryResult {
+    pub rows: Vec<serde_json::Value>,
+}
+
+pub trait QueryEventSink: Clone + Send + 'static {
+    fn send_result(&self, rows: Vec<serde_json::Value>) -> std::result::Result<(), String>;
+    fn send_error(&self, error: String) -> std::result::Result<(), String>;
+}
+
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub enum DependencyTarget {
+    Table(String),
+    VirtualTable(String),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum DependencyAnalysis {
+    Reactive { targets: HashSet<DependencyTarget> },
+    NonReactive { reason: String },
+}
+
+pub struct SubscriptionRegistration {
+    pub id: String,
+    pub analysis: DependencyAnalysis,
+}
diff --git a/crates/db-live-query/tests/cloudsync.rs b/crates/db-live-query/tests/cloudsync.rs
index 2727456817..4afe51ccba 100644
--- a/crates/db-live-query/tests/cloudsync.rs
+++ b/crates/db-live-query/tests/cloudsync.rs
@@ -1,65 +1,16 @@
-use std::sync::{Arc, Mutex};
+mod common;
+
 use std::time::Duration;
-use db_live_query::{DbRuntime, QueryEventSink};
+use common::{TestEvent, TestSink, next_event};
+use db_live_query::DbRuntime;
 use hypr_db_core2::Db3;
 use serde_json::json;
-#[derive(Clone, Debug, PartialEq)]
-enum TestEvent {
-    Result(Vec<serde_json::Value>),
-    Error(String),
-}
-
-#[derive(Clone)]
-struct TestSink {
-    events: Arc<Mutex<Vec<TestEvent>>>,
-}
-
-impl QueryEventSink for TestSink {
-    fn send_result(&self, rows: Vec<serde_json::Value>) -> std::result::Result<(), String> {
-        self.events.lock().unwrap().push(TestEvent::Result(rows));
-        Ok(())
-    }
-
-    fn send_error(&self, error: String) -> std::result::Result<(), String> {
-        self.events.lock().unwrap().push(TestEvent::Error(error));
-        Ok(())
-    }
-}
-
-impl TestSink {
-    fn capture() -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
-        let events = Arc::new(Mutex::new(Vec::new()));
-        (
-            Self {
-                events: Arc::clone(&events),
-            },
-            events,
-        )
-    }
-}
-
 fn connection_string() -> String {
     std::env::var("SQLITECLOUD_URL").expect("SQLITECLOUD_URL must be set")
 }
-async fn next_event(
-    events: &Arc<Mutex<Vec<TestEvent>>>,
-    index: usize,
-) -> anyhow::Result<TestEvent> {
-    tokio::time::timeout(Duration::from_secs(10), async {
-        loop {
-            if let Some(event) = events.lock().unwrap().get(index).cloned() {
-                return event;
-            }
-            tokio::time::sleep(Duration::from_millis(25)).await;
-        }
-    })
-    .await
-    .map_err(anyhow::Error::from)
-}
-
 async fn setup_db() -> Db3 {
     let db = Db3::connect_memory().await.unwrap();
@@ -101,7 +52,9 @@ async fn cloudsync_pull_refreshes_live_query_subscriptions() {
         .await
         .unwrap();
-    let initial = next_event(&events, 0).await.unwrap();
+    let initial = next_event(&events, 0, Duration::from_secs(10))
+        .await
+        .unwrap();
     assert_eq!(initial, TestEvent::Result(Vec::new()));
     sqlx::query("INSERT INTO test_sync (id, value) VALUES (cloudsync_uuid(), ?)")
@@ -117,7 +70,9 @@ async fn cloudsync_pull_refreshes_live_query_subscriptions() {
         .await
         .unwrap();
-    let event = next_event(&events, 1).await.unwrap();
+    let event = next_event(&events, 1, Duration::from_secs(10))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = event else {
         panic!("expected result event");
     };
diff --git a/crates/db-live-query/tests/common/mod.rs b/crates/db-live-query/tests/common/mod.rs
new file mode 100644
index 0000000000..52139f2032
--- /dev/null
+++ b/crates/db-live-query/tests/common/mod.rs
@@ -0,0 +1,201 @@
+#![allow(dead_code)]
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use db_live_query::{DbRuntime, QueryEventSink};
+use hypr_db_core2::{DbOpenOptions, DbStorage};
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum TestEvent {
+    Result(Vec<serde_json::Value>),
+    Error(String),
+}
+
+#[derive(Clone)]
+pub struct TestSink {
+    events: Arc<Mutex<Vec<TestEvent>>>,
+    fail_after: Option<usize>,
+    send_delay: Option<Duration>,
+    send_block: Option<SendBlock>,
+}
+
+#[derive(Clone)]
+struct SendBlock {
+    event_index: usize,
+    started: Arc<AtomicBool>,
+    release: Arc<AtomicBool>,
+}
+
+#[derive(Clone)]
+pub struct SendBlockHandle {
+    started: Arc<AtomicBool>,
+    release: Arc<AtomicBool>,
+}
+
+impl QueryEventSink for TestSink {
+    fn send_result(&self, rows: Vec<serde_json::Value>) -> std::result::Result<(), String> {
+        self.push(TestEvent::Result(rows))
+    }
+
+    fn send_error(&self, error: String) -> std::result::Result<(), String> {
+        self.push(TestEvent::Error(error))
+    }
+}
+
+impl TestSink {
+    pub fn capture() -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
+        let events = Arc::new(Mutex::new(Vec::new()));
+        (
+            Self {
+                events: Arc::clone(&events),
+                fail_after: None,
+                send_delay: None,
+                send_block: None,
+            },
+            events,
+        )
+    }
+
+    pub fn fail_after(limit: usize) -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
+        let events = Arc::new(Mutex::new(Vec::new()));
+        (
+            Self {
+                events: Arc::clone(&events),
+                fail_after: Some(limit),
+                send_delay: None,
+                send_block: None,
+            },
+            events,
+        )
+    }
+
+    pub fn with_delay(delay: Duration) -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
+        let events = Arc::new(Mutex::new(Vec::new()));
+        (
+            Self {
+                events: Arc::clone(&events),
+                fail_after: None,
+                send_delay: Some(delay),
+                send_block: None,
+            },
+            events,
+        )
+    }
+
+    pub fn with_blocked_send(
+        event_index: usize,
+    ) -> (Self, Arc<Mutex<Vec<TestEvent>>>, SendBlockHandle) {
+        let events = Arc::new(Mutex::new(Vec::new()));
+        let started = Arc::new(AtomicBool::new(false));
+        let release = Arc::new(AtomicBool::new(false));
+        (
+            Self {
+                events: Arc::clone(&events),
+                fail_after: None,
+                send_delay: None,
+                send_block: Some(SendBlock {
+                    event_index,
+                    started: Arc::clone(&started),
+                    release: Arc::clone(&release),
+                }),
+            },
+            events,
+            SendBlockHandle { started, release },
+        )
+    }
+
+    fn push(&self, event: TestEvent) -> std::result::Result<(), String> {
+        if let Some(block) = &self.send_block {
+            let event_index = self.events.lock().unwrap().len();
+            if event_index == block.event_index {
+                block.started.store(true, Ordering::SeqCst);
+                while !block.release.load(Ordering::SeqCst) {
+                    std::thread::sleep(Duration::from_millis(1));
+                }
+            }
+        }
+        if let Some(delay) = self.send_delay {
+            std::thread::sleep(delay);
+        }
+        let mut guard = self.events.lock().unwrap();
+        if self.fail_after.is_some_and(|limit| guard.len() >= limit) {
+            return Err("sink closed".to_string());
+        }
+        guard.push(event);
+        Ok(())
+    }
+}
+
+impl SendBlockHandle {
+    pub async fn wait_until_started(&self) {
+        tokio::time::timeout(Duration::from_secs(1), async {
+            while !self.started.load(Ordering::SeqCst) {
+                tokio::time::sleep(Duration::from_millis(5)).await;
+            }
+        })
+        .await
+        .expect("blocked send should start");
+    }
+
+    pub fn release(&self) {
+        self.release.store(true, Ordering::SeqCst);
+    }
+}
+
+pub async fn next_event(
+    events: &Arc<Mutex<Vec<TestEvent>>>,
+    index: usize,
+    timeout: Duration,
+) -> anyhow::Result<TestEvent> {
+    tokio::time::timeout(timeout, async {
+        loop {
+            if let Some(event) = events.lock().unwrap().get(index).cloned() {
+                return event;
+            }
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+    })
+    .await
+    .map_err(anyhow::Error::from)
+}
+
+pub async fn wait_for_stable_event_count(
+    events: &Arc<Mutex<Vec<TestEvent>>>,
+    stable_for: Duration,
+) -> usize {
+    let mut last_len = events.lock().unwrap().len();
+    loop {
+        tokio::time::sleep(stable_for).await;
+        let len = events.lock().unwrap().len();
+        if len == last_len {
+            return len;
+        }
+        last_len = len;
+    }
+}
+
+pub async fn setup_runtime() -> (tempfile::TempDir, sqlx::SqlitePool, DbRuntime) {
+    let dir = tempfile::tempdir().unwrap();
+    let db_path = dir.path().join("app.db");
+    let db = hypr_db_migrate::open_db(
+        hypr_db_migrate::AppDbOpenOptions {
+            db: DbOpenOptions {
+                storage: DbStorage::Local(&db_path),
+                cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled,
+                journal_mode_wal: true,
+                foreign_keys: true,
+                max_connections: Some(4),
+            },
+            migration_failure_policy: hypr_db_migrate::MigrationFailurePolicy::Fail,
+        },
+        hypr_db_app::schema(),
+    )
+    .await
+    .unwrap();
+
+    let pool = db.pool().as_ref().clone();
+
+    (dir, pool, DbRuntime::new(std::sync::Arc::new(db)))
+}
diff --git a/crates/db-live-query/tests/runtime.rs b/crates/db-live-query/tests/runtime.rs
index 365ad18d81..38644acbc2 100644
--- a/crates/db-live-query/tests/runtime.rs
+++ b/crates/db-live-query/tests/runtime.rs
@@ -1,201 +1,11 @@
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
+mod common;
+
 use std::time::Duration;
-use db_live_query::{DbRuntime, DependencyAnalysis, DependencyTarget, Error, QueryEventSink};
-use hypr_db_core2::{DbOpenOptions, DbStorage, MigrationFailurePolicy};
+use common::{TestEvent, TestSink, next_event, setup_runtime, wait_for_stable_event_count};
+use db_live_query::{DependencyAnalysis, DependencyTarget, Error};
 use serde_json::json;
-#[derive(Clone, Debug, PartialEq)]
-enum TestEvent {
-    Result(Vec<serde_json::Value>),
-    Error(String),
-}
-
-#[derive(Clone)]
-struct TestSink {
-    events: Arc<Mutex<Vec<TestEvent>>>,
-    fail_after: Option<usize>,
-    send_delay: Option<Duration>,
-    send_block: Option<SendBlock>,
-}
-
-#[derive(Clone)]
-struct SendBlock {
-    event_index: usize,
-    started: Arc<AtomicBool>,
-    release: Arc<AtomicBool>,
-}
-
-#[derive(Clone)]
-struct SendBlockHandle {
-    started: Arc<AtomicBool>,
-    release: Arc<AtomicBool>,
-}
-
-impl QueryEventSink for TestSink {
-    fn send_result(&self, rows: Vec<serde_json::Value>) -> std::result::Result<(), String> {
-        self.push(TestEvent::Result(rows))
-    }
-
-    fn send_error(&self, error: String) -> std::result::Result<(), String> {
-        self.push(TestEvent::Error(error))
-    }
-}
-
-impl TestSink {
-    fn capture() -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
-        let events = Arc::new(Mutex::new(Vec::new()));
-        (
-            Self {
-                events: Arc::clone(&events),
-                fail_after: None,
-                send_delay: None,
-                send_block: None,
-            },
-            events,
-        )
-    }
-
-    fn fail_after(limit: usize) -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
-        let events = Arc::new(Mutex::new(Vec::new()));
-        (
-            Self {
-                events: Arc::clone(&events),
-                fail_after: Some(limit),
-                send_delay: None,
-                send_block: None,
-            },
-            events,
-        )
-    }
-
-    fn with_delay(delay: Duration) -> (Self, Arc<Mutex<Vec<TestEvent>>>) {
-        let events = Arc::new(Mutex::new(Vec::new()));
-        (
-            Self {
-                events: Arc::clone(&events),
-                fail_after: None,
-                send_delay: Some(delay),
-                send_block: None,
-            },
-            events,
-        )
-    }
-
-    fn with_blocked_send(
-        event_index: usize,
-    ) -> (Self, Arc<Mutex<Vec<TestEvent>>>, SendBlockHandle) {
-        let events = Arc::new(Mutex::new(Vec::new()));
-        let started = Arc::new(AtomicBool::new(false));
-        let release = Arc::new(AtomicBool::new(false));
-        (
-            Self {
-                events: Arc::clone(&events),
-                fail_after: None,
-                send_delay: None,
-                send_block: Some(SendBlock {
-                    event_index,
-                    started: Arc::clone(&started),
-                    release: Arc::clone(&release),
-                }),
-            },
-            events,
-            SendBlockHandle { started, release },
-        )
-    }
-
-    fn push(&self, event: TestEvent) -> std::result::Result<(), String> {
-        if let Some(block) = &self.send_block {
-            let event_index = self.events.lock().unwrap().len();
-            if event_index == block.event_index {
-                block.started.store(true, Ordering::SeqCst);
-                while !block.release.load(Ordering::SeqCst) {
-                    std::thread::sleep(Duration::from_millis(1));
-                }
-            }
-        }
-        if let Some(delay) = self.send_delay {
-            std::thread::sleep(delay);
-        }
-        let mut guard = self.events.lock().unwrap();
-        if self.fail_after.is_some_and(|limit| guard.len() >= limit) {
-            return Err("sink closed".to_string());
-        }
-        guard.push(event);
-        Ok(())
-    }
-}
-
-impl SendBlockHandle {
-    async fn wait_until_started(&self) {
-        tokio::time::timeout(Duration::from_secs(1), async {
-            while !self.started.load(Ordering::SeqCst) {
-                tokio::time::sleep(Duration::from_millis(5)).await;
-            }
-        })
-        .await
-        .expect("blocked send should start");
-    }
-
-    fn release(&self) {
-        self.release.store(true, Ordering::SeqCst);
-    }
-}
-
-async fn next_event(
-    events: &Arc<Mutex<Vec<TestEvent>>>,
-    index: usize,
-) -> anyhow::Result<TestEvent> {
-    tokio::time::timeout(Duration::from_secs(1), async {
-        loop {
-            if let Some(event) = events.lock().unwrap().get(index).cloned() {
-                return event;
-            }
-            tokio::time::sleep(Duration::from_millis(10)).await;
-        }
-    })
-    .await
-    .map_err(anyhow::Error::from)
-}
-
-async fn wait_for_stable_event_count(
-    events: &Arc<Mutex<Vec<TestEvent>>>,
-    stable_for: Duration,
-) -> usize {
-    let mut last_len = events.lock().unwrap().len();
-    loop {
-        tokio::time::sleep(stable_for).await;
-        let len = events.lock().unwrap().len();
-        if len == last_len {
-            return len;
-        }
-        last_len = len;
-    }
-}
-
-async fn setup_runtime() -> (tempfile::TempDir, sqlx::SqlitePool, DbRuntime) {
-    let dir = tempfile::tempdir().unwrap();
-    let db_path = dir.path().join("app.db");
-    let db = hypr_db_core2::Db3::open_with_migrate(
-        DbOpenOptions {
-            storage: DbStorage::Local(&db_path),
-            cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled,
-            journal_mode_wal: true,
-            foreign_keys: true,
-            max_connections: Some(4),
-            migration_failure_policy: MigrationFailurePolicy::Fail,
-        },
-        |pool| Box::pin(hypr_db_app::migrate(pool)),
-    )
-    .await
-    .unwrap();
-
-    let pool = db.pool().as_ref().clone();
-
-    (dir, pool, DbRuntime::new(std::sync::Arc::new(db)))
-}
-
 #[tokio::test]
 async fn subscribe_sends_initial_result() {
     let (_dir, _pool, runtime) = setup_runtime().await;
@@ -210,7 +20,9 @@ async fn subscribe_sends_initial_result() {
         .await
         .unwrap();
-    let event = next_event(&events, 0).await.unwrap();
+    let event = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert_eq!(event, TestEvent::Result(Vec::new()));
 }
@@ -242,10 +54,14 @@ async fn initialization_defers_refresh_until_after_initial_snapshot() {
     subscribe.await.unwrap().unwrap();
-    let initial = next_event(&events, 0).await.unwrap();
+    let initial = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert_eq!(initial, TestEvent::Result(Vec::new()));
-    let refresh = next_event(&events, 1).await.unwrap();
+    let refresh = next_event(&events, 1, Duration::from_secs(1))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = refresh else {
         panic!("expected result event");
     };
@@ -283,7 +99,9 @@ async fn initialization_suppresses_duplicate_catch_up_payloads() {
     subscribe.await.unwrap().unwrap();
-    let initial = next_event(&events, 0).await.unwrap();
+    let initial = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert_eq!(initial, TestEvent::Result(Vec::new()));
     tokio::time::sleep(Duration::from_millis(100)).await;
@@ -361,7 +179,9 @@ async fn dependent_writes_trigger_refresh() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
         .bind("note-1")
@@ -372,7 +192,9 @@ async fn dependent_writes_trigger_refresh() {
         .await
         .unwrap();
-    let event = next_event(&events, 1).await.unwrap();
+    let event = next_event(&events, 1, Duration::from_secs(1))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = event else {
         panic!("expected result event");
     };
@@ -393,7 +215,9 @@ async fn open_transactions_do_not_refresh_until_commit() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     let mut tx = pool.begin().await.unwrap();
     sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
@@ -410,7 +234,9 @@ async fn open_transactions_do_not_refresh_until_commit() {
     tx.commit().await.unwrap();
-    let event = next_event(&events, 1).await.unwrap();
+    let event = next_event(&events, 1, Duration::from_secs(1))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = event else {
         panic!("expected result event");
     };
@@ -431,7 +257,9 @@ async fn rollback_after_write_does_not_refresh() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     let mut tx = pool.begin().await.unwrap();
     sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
@@ -472,7 +300,9 @@ async fn unrelated_writes_do_not_trigger_refresh() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     sqlx::query(
         "INSERT INTO daily_summaries (id, daily_note_id, date, content, timeline_json, topics_json, status, source_cursor_ms, source_fingerprint, generation_error, generated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
@@ -555,7 +385,9 @@ async fn fts_match_subscriptions_refresh_after_writes() {
         }
     );
-    let initial = next_event(&events, 0).await.unwrap();
+    let initial = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert_eq!(initial, TestEvent::Result(Vec::new()));
     sqlx::query("INSERT INTO docs_fts (title, body) VALUES (?, ?)")
@@ -565,7 +397,9 @@ async fn fts_match_subscriptions_refresh_after_writes() {
         .await
         .unwrap();
-    let refresh = next_event(&events, 1).await.unwrap();
+    let refresh = next_event(&events, 1, Duration::from_secs(1))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = refresh else {
         panic!("expected result event");
     };
@@ -591,7 +425,9 @@ async fn virtual_table_created_after_runtime_start_is_discovered() {
         .await
         .unwrap();
-    let initial = next_event(&events, 0).await.unwrap();
+    let initial = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert_eq!(initial, TestEvent::Result(Vec::new()));
     sqlx::query("INSERT INTO docs_fts (title, body) VALUES (?, ?)")
@@ -601,7 +437,9 @@ async fn virtual_table_created_after_runtime_start_is_discovered() {
         .await
         .unwrap();
-    let refresh = next_event(&events, 1).await.unwrap();
+    let refresh = next_event(&events, 1, Duration::from_secs(1))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = refresh else {
         panic!("expected result event");
     };
@@ -632,7 +470,9 @@ async fn unsupported_virtual_tables_are_explicitly_non_reactive() {
         DependencyAnalysis::NonReactive { .. }
     ));
-    let initial = next_event(&events, 0).await.unwrap();
+    let initial = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert_eq!(initial, TestEvent::Result(Vec::new()));
     sqlx::query("INSERT INTO docs_rtree (id, min_x, max_x) VALUES (?, ?, ?)")
@@ -661,7 +501,9 @@ async fn unsubscribe_stops_future_events() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     runtime.unsubscribe(&registration.id).await.unwrap();
     sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
@@ -691,7 +533,9 @@ async fn unsubscribe_waits_for_in_flight_refresh_delivery() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
         .bind("note-blocked-refresh")
@@ -738,7 +582,9 @@ async fn invalid_sql_sends_error_event() {
         .await
         .unwrap();
-    let event = next_event(&events, 0).await.unwrap();
+    let event = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert!(matches!(event, TestEvent::Error(_)));
 }
@@ -781,7 +627,9 @@ async fn extraction_failures_become_explicit_non_reactive_subscriptions() {
         .expect("subscription should exist");
     assert!(matches!(analysis, DependencyAnalysis::NonReactive { .. }));
-    let event = next_event(&events, 0).await.unwrap();
+    let event = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     assert!(matches!(event, TestEvent::Error(_)));
 }
@@ -799,7 +647,9 @@ async fn stale_subscribers_are_removed_after_send_failures() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
         .bind("note-stale")
@@ -835,7 +685,9 @@ async fn lagged_broadcast_receiver_resyncs_and_keeps_dispatcher_alive() {
         .await
         .unwrap();
-    let _ = next_event(&events, 0).await.unwrap();
+    let _ = next_event(&events, 0, Duration::from_secs(1))
+        .await
+        .unwrap();
     for idx in 0..320 {
         sqlx::query("INSERT INTO daily_notes (id, date, body, user_id) VALUES (?, ?, ?, ?)")
@@ -861,7 +713,9 @@ async fn lagged_broadcast_receiver_resyncs_and_keeps_dispatcher_alive() {
         .await
         .unwrap();
-    let event = next_event(&events, before).await.unwrap();
+    let event = next_event(&events, before, Duration::from_secs(1))
+        .await
+        .unwrap();
     let TestEvent::Result(rows) = event else {
         panic!("expected result event");
     };
diff --git a/crates/db-migrate/AGENTS.md b/crates/db-migrate/AGENTS.md
new file mode 100644
index 0000000000..a20ba4e4c1
--- /dev/null
+++ b/crates/db-migrate/AGENTS.md
@@ -0,0 +1,67 @@
+# `db-migrate`
+
+## Role
+
+- `db-migrate` is the app-database migration engine.
+- It owns migration orchestration, migration history bookkeeping, and failure policy around opening a database that needs schema changes.
+- It exists to keep schema declaration crates such as `db-app` focused on tables, types, ops, and migration manifests, while keeping CloudSync-sensitive migration mechanics in a core-adjacent layer.
+ +## Why This Crate Exists (why not sqlx's builtin migrator) + +sqlx's `Migrator::run()` applies SQL in a transaction and records it — but gives no hook to run per-connection setup/teardown around each DDL statement. CloudSync alter requires `cloudsync_begin_alter_on(conn, table)` → DDL → `cloudsync_commit_alter_on(conn, table)` **on the same connection**. sqlx's `apply()` owns the connection internally, so there's no way to inject these calls. + +The migration runner here reimplements the subset of sqlx's migrator that we need (checksum validation via SHA-384, history table, idempotent apply) while adding the CloudSync scope semantics. See `sqlx-core/src/migrate/` and `sqlx-sqlite/src/migrate.rs` in the sqlx repo for the upstream implementation this is based on. + +Other reasons this crate exists: +- `db-core2` is schema-agnostic substrate. It should open databases, manage pools, and expose CloudSync/SQLite primitives, but it should not know app schema history. +- `db-app` is schema declaration. It should define the CloudSync table registry and migration steps, but it should not own migration policy or retry/recreate behavior. +- CloudSync-backed schema changes introduce operational constraints that are stronger than ordinary SQLite migrations, so the runner needs to enforce them centrally instead of leaving each caller to remember them. + +## This Crate Owns + +- `AppDbOpenOptions` and migration failure policy. +- The open-and-migrate flow for app databases. +- The `app_migrations` history table for post-baseline migration steps. +- Execution of migration steps with explicit scope: + - `Plain` + - `CloudsyncAlter { table_name }` +- Validation that CloudSync alter steps only target tables declared as synced by the schema crate. +- The policy that CloudSync-enabled opens must not auto-recreate storage after migration failure. + +## This Crate Does Not Own + +- App table definitions, row types, or query/ops functions. 
+- The set of synced tables for a given app schema. +- Migration `.sql` files themselves (those live in the schema crate, embedded via `include_str!`). +- Raw SQLite/CloudSync connection setup and same-connection CloudSync alter helpers. That belongs in `db-core2`. + +## CloudSync Constraints + +Treat CloudSync-backed schema changes as a different class of migration from normal SQLite DDL. + +- For synced-table schema changes, the runner must use: + 1. `db-core2`'s connection-scoped `cloudsync_begin_alter_on(...)` + 2. run the DDL on the same checked-out connection + 3. `db-core2`'s connection-scoped `cloudsync_commit_alter_on(...)` +- Do not run `begin_alter` / DDL / `commit_alter` through a pool-level API that may hop connections. +- Do not hide CloudSync alter behavior behind SQL parsing or table-name inference. Migration steps must declare CloudSync scope explicitly. +- When CloudSync is disabled at open time, the same schema step may run without the alter wrapper so local and synced schemas remain structurally aligned. +- Automatic recreate-on-failure is forbidden for CloudSync-enabled opens. Wiping a synced database is not equivalent to recovering a local cache. + +## Design Rules + +- Keep the runner generic over schema providers. Schema crates should pass: + - migration step manifest (using `include_str!` for SQL, checksums computed at runtime via SHA-384) + - CloudSync table validator +- Prefer explicit step metadata over “magic” inspection. +- Add new migration policy here only when it is about migration execution semantics, not about schema meaning. +- If a future change only affects one app's schema contents, it probably belongs in that schema crate, not here. + +## Testing Ownership + +- Put tests here when behavior is about: + - migration history bookkeeping + - recreate/fail policy + - CloudSync alter-step validation + - open-time migration orchestration +- Do not test app-specific query behavior here. That belongs in the schema crate. 
diff --git a/crates/db-migrate/Cargo.toml b/crates/db-migrate/Cargo.toml new file mode 100644 index 0000000000..b5239ca661 --- /dev/null +++ b/crates/db-migrate/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "db-migrate" +version = "0.1.0" +edition = "2024" + +[dependencies] +hypr-cloudsync = { workspace = true } +hypr-db-core2 = { workspace = true } + +sha2 = "0.10" +sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "sqlite-unbundled"] } +thiserror = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/crates/db-migrate/src/error.rs b/crates/db-migrate/src/error.rs new file mode 100644 index 0000000000..541914939d --- /dev/null +++ b/crates/db-migrate/src/error.rs @@ -0,0 +1,20 @@ +use hypr_db_core2::DbOpenError; + +#[derive(Debug, thiserror::Error)] +pub enum AppDbOpenError { + #[error(transparent)] + Open(#[from] DbOpenError), + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + #[error(transparent)] + Cloudsync(#[from] hypr_db_core2::Error), + #[error("cloudsync-enabled databases cannot recreate storage after migration failure")] + RecreateNotAllowedWithCloudsync, + #[error("migration step {step_id} checksum changed after it was applied")] + StepChecksumMismatch { step_id: &'static str }, + #[error("cloudsync alter step {step_id} targets non-synced table {table_name}")] + InvalidCloudsyncStep { + step_id: &'static str, + table_name: &'static str, + }, +} diff --git a/crates/db-migrate/src/lib.rs b/crates/db-migrate/src/lib.rs new file mode 100644 index 0000000000..da28cec8e7 --- /dev/null +++ b/crates/db-migrate/src/lib.rs @@ -0,0 +1,121 @@ +#![forbid(unsafe_code)] + +mod error; +mod migrate; +mod schema; + +pub use error::AppDbOpenError; +pub use schema::{ + AppDbOpenOptions, DbSchema, MigrationFailurePolicy, MigrationScope, MigrationStep, +}; + +use hypr_db_core2::{CloudsyncOpenMode, Db3, DbStorage}; + +pub async fn open_db( + options: AppDbOpenOptions<'_>, + schema: DbSchema, +) 
-> Result<Db3, AppDbOpenError> { + if options.db.cloudsync_open_mode == CloudsyncOpenMode::Enabled + && matches!( + options.migration_failure_policy, + MigrationFailurePolicy::Recreate + ) + { + return Err(AppDbOpenError::RecreateNotAllowedWithCloudsync); + } + + match try_open_db(options, schema).await { + Ok(db) => Ok(db), + Err(_error) + if matches!( + options.migration_failure_policy, + MigrationFailurePolicy::Recreate + ) && options.db.cloudsync_open_mode == CloudsyncOpenMode::Disabled + && matches!(options.db.storage, DbStorage::Local(_)) => + { + hypr_db_core2::recreate_storage(&options.db)?; + try_open_db(options, schema).await + } + Err(error) => Err(error), + } +} + +async fn try_open_db( + options: AppDbOpenOptions<'_>, + schema: DbSchema, +) -> Result<Db3, AppDbOpenError> { + let db = Db3::open(options.db).await?; + + if let Err(error) = migrate::run_migrations(&db, schema).await { + db.pool().clone().close().await; + return Err(error); + } + + Ok(db) +} + +#[cfg(test)] +mod tests { + use super::*; + use hypr_db_core2::DbOpenOptions; + + fn empty_schema() -> DbSchema { + DbSchema { + steps: &[], + validate_cloudsync_table: |_table| false, + } + } + + fn test_options<'a>( + storage: DbStorage<'a>, + cloudsync_open_mode: CloudsyncOpenMode, + ) -> AppDbOpenOptions<'a> { + AppDbOpenOptions { + db: DbOpenOptions { + storage, + cloudsync_open_mode, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }, + migration_failure_policy: MigrationFailurePolicy::Fail, + } + } + + #[tokio::test] + async fn recreate_is_rejected_for_cloudsync_open_mode() { + let error = open_db( + AppDbOpenOptions { + migration_failure_policy: MigrationFailurePolicy::Recreate, + ..test_options(DbStorage::Memory, CloudsyncOpenMode::Enabled) + }, + empty_schema(), + ) + .await + .unwrap_err(); + + assert!(matches!( + error, + AppDbOpenError::RecreateNotAllowedWithCloudsync + )); + } + + #[tokio::test] + async fn open_db_bootstraps_app_migration_history() { + let db = open_db( +
test_options(DbStorage::Memory, CloudsyncOpenMode::Disabled), + empty_schema(), + ) + .await + .unwrap(); + + let tables: Vec<String> = sqlx::query_scalar( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_sqlx%' ORDER BY name", + ) + .fetch_all(db.pool().as_ref()) + .await + .unwrap(); + + assert!(tables.contains(&"app_migrations".to_string())); + } +} diff --git a/crates/db-migrate/src/migrate.rs new file mode 100644 index 0000000000..1568d36628 --- /dev/null +++ b/crates/db-migrate/src/migrate.rs @@ -0,0 +1,125 @@ +use sha2::{Digest, Sha384}; + +use hypr_db_core2::{CloudsyncOpenMode, Db3}; + +use crate::error::AppDbOpenError; +use crate::schema::{DbSchema, MigrationScope, MigrationStep}; + +fn compute_checksum(sql: &str) -> String { + let hash = Sha384::digest(sql.as_bytes()); + hash.iter().map(|b| format!("{b:02x}")).collect() +} + +pub(crate) async fn run_migrations(db: &Db3, schema: DbSchema) -> Result<(), AppDbOpenError> { + ensure_app_migrations_table(db.pool().as_ref()).await?; + run_app_migration_steps(db, schema).await?; + Ok(()) +} + +async fn ensure_app_migrations_table(pool: &sqlx::SqlitePool) -> Result<(), sqlx::Error> { + sqlx::query( + "CREATE TABLE IF NOT EXISTS app_migrations ( + id TEXT PRIMARY KEY NOT NULL, + checksum TEXT NOT NULL, + applied_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + )", + ) + .execute(pool) + .await?; + Ok(()) +} + +async fn run_app_migration_steps(db: &Db3, schema: DbSchema) -> Result<(), AppDbOpenError> { + for step in schema.steps { + validate_step(schema, step)?; + + let checksum = compute_checksum(step.sql); + + let applied_checksum: Option<String> = + sqlx::query_scalar("SELECT checksum FROM app_migrations WHERE id = ?") + .bind(step.id) + .fetch_optional(db.pool().as_ref()) + .await?; + + if let Some(applied_checksum) = applied_checksum { + if applied_checksum != checksum { + return Err(AppDbOpenError::StepChecksumMismatch { step_id: step.id }); + } +
continue; + } + + match step.scope { + MigrationScope::Plain => run_plain_step(db.pool().as_ref(), step, &checksum).await?, + MigrationScope::CloudsyncAlter { table_name } => { + run_cloudsync_step(db, step, table_name, &checksum).await? + } + } + } + + Ok(()) +} + +fn validate_step(schema: DbSchema, step: &MigrationStep) -> Result<(), AppDbOpenError> { + let MigrationScope::CloudsyncAlter { table_name } = step.scope else { + return Ok(()); + }; + + if (schema.validate_cloudsync_table)(table_name) { + return Ok(()); + } + + Err(AppDbOpenError::InvalidCloudsyncStep { + step_id: step.id, + table_name, + }) +} + +async fn run_plain_step( + pool: &sqlx::SqlitePool, + step: &MigrationStep, + checksum: &str, +) -> Result<(), AppDbOpenError> { + let mut tx = pool.begin().await?; + sqlx::raw_sql(step.sql).execute(&mut *tx).await?; + record_step(&mut *tx, step, checksum).await?; + tx.commit().await?; + Ok(()) +} + +async fn run_cloudsync_step( + db: &Db3, + step: &MigrationStep, + table_name: &'static str, + checksum: &str, +) -> Result<(), AppDbOpenError> { + let mut conn = db.pool().acquire().await?; + + if db.cloudsync_open_mode() == CloudsyncOpenMode::Enabled { + hypr_db_core2::cloudsync_begin_alter_on(&mut *conn, table_name).await?; + } + + sqlx::raw_sql(step.sql).execute(&mut *conn).await?; + + if db.cloudsync_open_mode() == CloudsyncOpenMode::Enabled { + hypr_db_core2::cloudsync_commit_alter_on(&mut *conn, table_name).await?; + } + + record_step(&mut *conn, step, checksum).await?; + Ok(()) +} + +async fn record_step<'e, E>( + executor: E, + step: &MigrationStep, + checksum: &str, +) -> Result<(), sqlx::Error> +where + E: sqlx::Executor<'e, Database = sqlx::Sqlite>, +{ + sqlx::query("INSERT INTO app_migrations (id, checksum) VALUES (?, ?)") + .bind(step.id) + .bind(checksum) + .execute(executor) + .await?; + Ok(()) +} diff --git a/crates/db-migrate/src/schema.rs b/crates/db-migrate/src/schema.rs new file mode 100644 index 0000000000..621c588c65 --- /dev/null +++ 
b/crates/db-migrate/src/schema.rs @@ -0,0 +1,32 @@ +use hypr_db_core2::DbOpenOptions; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum MigrationFailurePolicy { + Fail, + Recreate, +} + +#[derive(Clone, Copy, Debug)] +pub struct AppDbOpenOptions<'a> { + pub db: DbOpenOptions<'a>, + pub migration_failure_policy: MigrationFailurePolicy, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum MigrationScope { + Plain, + CloudsyncAlter { table_name: &'static str }, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct MigrationStep { + pub id: &'static str, + pub scope: MigrationScope, + pub sql: &'static str, +} + +#[derive(Clone, Copy)] +pub struct DbSchema { + pub steps: &'static [MigrationStep], + pub validate_cloudsync_table: fn(&str) -> bool, +} diff --git a/crates/mobile-bridge/Cargo.toml b/crates/mobile-bridge/Cargo.toml index 5ea465ac02..b24e110f54 100644 --- a/crates/mobile-bridge/Cargo.toml +++ b/crates/mobile-bridge/Cargo.toml @@ -11,6 +11,8 @@ name = "mobile_bridge" hypr-db-app = { workspace = true } hypr-db-core2 = { workspace = true } hypr-db-live-query = { workspace = true } +hypr-db-migrate = { workspace = true } + serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] } diff --git a/crates/mobile-bridge/src/db.rs b/crates/mobile-bridge/src/db.rs index 6be8579b70..09bcdab188 100644 --- a/crates/mobile-bridge/src/db.rs +++ b/crates/mobile-bridge/src/db.rs @@ -1,21 +1,24 @@ use std::path::PathBuf; -use hypr_db_core2::{CloudsyncOpenMode, Db3, DbOpenOptions, DbStorage, MigrationFailurePolicy}; +use hypr_db_core2::{CloudsyncOpenMode, Db3, DbOpenOptions, DbStorage}; +use hypr_db_migrate::AppDbOpenError; pub(crate) async fn open_app_db( db_path: &PathBuf, cloudsync_open_mode: CloudsyncOpenMode, -) -> Result { - Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(db_path), - cloudsync_open_mode, - journal_mode_wal: true, - 
foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, +) -> Result<Db3, AppDbOpenError> { + hypr_db_migrate::open_db( + hypr_db_migrate::AppDbOpenOptions { + db: DbOpenOptions { + storage: DbStorage::Local(db_path), + cloudsync_open_mode, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }, + migration_failure_policy: hypr_db_migrate::MigrationFailurePolicy::Fail, }, - |pool| Box::pin(hypr_db_app::migrate(pool)), + hypr_db_app::schema(), ) .await } diff --git a/plugins/db/Cargo.toml b/plugins/db/Cargo.toml index a65f3e229d..6f8d2e6ffc 100644 --- a/plugins/db/Cargo.toml +++ b/plugins/db/Cargo.toml @@ -11,6 +11,7 @@ description = "" hypr-db-app = { workspace = true } hypr-db-core2 = { workspace = true } hypr-db-live-query = { workspace = true } +hypr-db-migrate = { workspace = true } hypr-storage = { workspace = true } hypr-tauri-utils = { workspace = true } diff --git a/plugins/db/src/error.rs b/plugins/db/src/error.rs index 0b7ef01779..2af9fa786f 100644 --- a/plugins/db/src/error.rs +++ b/plugins/db/src/error.rs @@ -7,6 +7,8 @@ pub enum Error { #[error(transparent)] Db(#[from] hypr_db_core2::DbOpenError), #[error(transparent)] + AppDb(#[from] hypr_db_migrate::AppDbOpenError), + #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] Sqlx(#[from] sqlx::Error), diff --git a/plugins/db/src/import/calendars.rs b/plugins/db/src/import/calendars.rs new file mode 100644 index 0000000000..7214f41358 --- /dev/null +++ b/plugins/db/src/import/calendars.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; +use std::path::Path; + +use hypr_db_app::UpsertCalendar; +use sqlx::SqlitePool; + +pub async fn import_legacy_calendars_from_path( + pool: &SqlitePool, + path: &Path, +) -> crate::Result<()> { + if !path.exists() { + return Ok(()); + } + + let calendars = read_calendars_file(path)?; + for calendar in calendars { + hypr_db_app::insert_calendar_if_missing( + pool, + UpsertCalendar { + id: &calendar.id,
+ tracking_id_calendar: &calendar.tracking_id_calendar, + name: &calendar.name, + enabled: calendar.enabled, + provider: &calendar.provider, + source: &calendar.source, + color: &calendar.color, + connection_id: &calendar.connection_id, + }, + ) + .await?; + } + + Ok(()) +} + +struct LegacyCalendar { + id: String, + tracking_id_calendar: String, + name: String, + enabled: bool, + provider: String, + source: String, + color: String, + connection_id: String, +} + +fn read_calendars_file(path: &Path) -> crate::Result<Vec<LegacyCalendar>> { + let content = std::fs::read_to_string(path)?; + let Ok(table) = serde_json::from_str::<HashMap<String, serde_json::Value>>(&content) else { + return Ok(Vec::new()); + }; + + let mut calendars = Vec::new(); + for (id, row) in table { + calendars.push(LegacyCalendar { + id, + tracking_id_calendar: row + .get("tracking_id_calendar") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(), + name: row + .get("name") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(), + enabled: row + .get("enabled") + .and_then(|value| value.as_bool()) + .unwrap_or(false), + provider: row + .get("provider") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(), + source: row + .get("source") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(), + color: row + .get("color") + .and_then(|value| value.as_str()) + .unwrap_or("#888") + .to_string(), + connection_id: row + .get("connection_id") + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string(), + }); + } + + Ok(calendars) +} + +#[cfg(test)] +mod tests { + use super::*; + use hypr_db_core2::Db3; + + async fn test_db() -> Db3 { + let db = Db3::connect_memory_plain().await.unwrap(); + hypr_db_app::migrate(db.pool()).await.unwrap(); + db + } + + #[tokio::test] + async fn import_legacy_calendars_from_path_imports_rows() { + let db = test_db().await; + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("calendars.json"); + + std::fs::write( + &path, + r##"{ + "cal-1": {
"tracking_id_calendar": "track-1", + "name": "Work", + "enabled": true, + "provider": "google", + "source": "team", + "color": "#111111", + "connection_id": "conn-1" + } + }"##, + ) + .unwrap(); + + import_legacy_calendars_from_path(db.pool(), &path) + .await + .unwrap(); + + let rows = hypr_db_app::list_calendars(db.pool()).await.unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].id, "cal-1"); + assert_eq!(rows[0].name, "Work"); + } +} diff --git a/plugins/db/src/import/events.rs b/plugins/db/src/import/events.rs new file mode 100644 index 0000000000..04e73db7c8 --- /dev/null +++ b/plugins/db/src/import/events.rs @@ -0,0 +1,158 @@ +use std::collections::HashMap; +use std::path::Path; + +use hypr_db_app::UpsertEvent; +use sqlx::SqlitePool; + +pub async fn import_legacy_events_from_path(pool: &SqlitePool, path: &Path) -> crate::Result<()> { + if !path.exists() { + return Ok(()); + } + + let events = read_events_file(path)?; + for event in events { + hypr_db_app::insert_event_if_missing( + pool, + UpsertEvent { + id: &event.id, + tracking_id_event: &event.tracking_id_event, + calendar_id: &event.calendar_id, + title: &event.title, + started_at: &event.started_at, + ended_at: &event.ended_at, + location: &event.location, + meeting_link: &event.meeting_link, + description: &event.description, + note: &event.note, + recurrence_series_id: &event.recurrence_series_id, + has_recurrence_rules: event.has_recurrence_rules, + is_all_day: event.is_all_day, + provider: &event.provider, + participants_json: event.participants_json.as_deref(), + }, + ) + .await?; + } + + Ok(()) +} + +struct LegacyEvent { + id: String, + tracking_id_event: String, + calendar_id: String, + title: String, + started_at: String, + ended_at: String, + location: String, + meeting_link: String, + description: String, + note: String, + recurrence_series_id: String, + has_recurrence_rules: bool, + is_all_day: bool, + provider: String, + participants_json: Option<String>, +} + +fn str_field(row:
&serde_json::Value, key: &str) -> String { + row.get(key) + .and_then(|value| value.as_str()) + .unwrap_or("") + .to_string() +} + +fn bool_field(row: &serde_json::Value, key: &str) -> bool { + row.get(key) + .and_then(|value| value.as_bool()) + .unwrap_or(false) +} + +fn read_events_file(path: &Path) -> crate::Result<Vec<LegacyEvent>> { + let content = std::fs::read_to_string(path)?; + let Ok(table) = serde_json::from_str::<HashMap<String, serde_json::Value>>(&content) else { + return Ok(Vec::new()); + }; + + let mut events = Vec::new(); + for (id, row) in table { + let participants_json = row + .get("participants") + .filter(|value| !value.is_null()) + .map(|value| value.to_string()); + + events.push(LegacyEvent { + id, + tracking_id_event: str_field(&row, "tracking_id_event"), + calendar_id: str_field(&row, "calendar_id"), + title: str_field(&row, "title"), + started_at: str_field(&row, "started_at"), + ended_at: str_field(&row, "ended_at"), + location: str_field(&row, "location"), + meeting_link: str_field(&row, "meeting_link"), + description: str_field(&row, "description"), + note: str_field(&row, "note"), + recurrence_series_id: str_field(&row, "recurrence_series_id"), + has_recurrence_rules: bool_field(&row, "has_recurrence_rules"), + is_all_day: bool_field(&row, "is_all_day"), + provider: str_field(&row, "provider"), + participants_json, + }); + } + + Ok(events) +} + +#[cfg(test)] +mod tests { + use super::*; + use hypr_db_core2::Db3; + + async fn test_db() -> Db3 { + let db = Db3::connect_memory_plain().await.unwrap(); + hypr_db_app::migrate(db.pool()).await.unwrap(); + db + } + + #[tokio::test] + async fn import_legacy_events_from_path_imports_rows_and_serializes_participants() { + let db = test_db().await; + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("events.json"); + + std::fs::write( + &path, + r#"{ + "evt-1": { + "tracking_id_event": "track-1", + "calendar_id": "cal-1", + "title": "Standup", + "started_at": "2026-04-15T09:00:00Z", + "ended_at": "2026-04-15T09:30:00Z",
"location": "", + "meeting_link": "https://meet.example/1", + "description": "Daily sync", + "note": "", + "recurrence_series_id": "series-1", + "has_recurrence_rules": true, + "is_all_day": false, + "provider": "google", + "participants": [{"email":"a@example.com"}] + } + }"#, + ) + .unwrap(); + + import_legacy_events_from_path(db.pool(), &path) + .await + .unwrap(); + + let rows = hypr_db_app::list_events(db.pool()).await.unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].id, "evt-1"); + assert_eq!( + rows[0].participants_json.as_deref(), + Some(r#"[{"email":"a@example.com"}]"#) + ); + } +} diff --git a/plugins/db/src/import/mod.rs b/plugins/db/src/import/mod.rs index 3c27c49073..ab224ba313 100644 --- a/plugins/db/src/import/mod.rs +++ b/plugins/db/src/import/mod.rs @@ -1,18 +1,26 @@ +mod calendars; +mod events; mod templates; use std::path::PathBuf; use sqlx::SqlitePool; +use calendars::import_legacy_calendars_from_path; +use events::import_legacy_events_from_path; use templates::import_legacy_templates_from_path; +const CALENDARS_FILENAME: &str = "calendars.json"; +const EVENTS_FILENAME: &str = "events.json"; const TEMPLATES_FILENAME: &str = "templates.json"; -pub async fn import_legacy_templates( +pub async fn import_legacy_data( app: &tauri::AppHandle, pool: &SqlitePool, ) -> crate::Result<()> { let vault_base = resolve_startup_vault_base(app)?; + import_legacy_calendars_from_path(pool, &vault_base.join(CALENDARS_FILENAME)).await?; + import_legacy_events_from_path(pool, &vault_base.join(EVENTS_FILENAME)).await?; import_legacy_templates_from_path(pool, &vault_base.join(TEMPLATES_FILENAME)).await } diff --git a/plugins/db/src/lib.rs b/plugins/db/src/lib.rs index d33889c65b..cf6f93928a 100644 --- a/plugins/db/src/lib.rs +++ b/plugins/db/src/lib.rs @@ -47,8 +47,8 @@ pub fn init( .setup(move |app, _| { let pool = db.pool().clone(); let app_handle = app.app_handle().clone(); - hypr_tauri_utils::spawn("import legacy templates.json", async move { - 
import::import_legacy_templates(&app_handle, &pool).await + hypr_tauri_utils::spawn("import legacy tinybase json", async move { + import::import_legacy_data(&app_handle, &pool).await }); app.manage(std::sync::Arc::new(runtime::PluginDbRuntime::new(db))); Ok(()) @@ -118,16 +118,18 @@ mod test { async fn setup_runtime() -> (tempfile::TempDir, Arc<runtime::PluginDbRuntime>) { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("app.db"); - let db = hypr_db_core2::Db3::open_with_migrate( - hypr_db_core2::DbOpenOptions { - storage: hypr_db_core2::DbStorage::Local(&db_path), - cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: hypr_db_core2::MigrationFailurePolicy::Fail, + let db = hypr_db_migrate::open_db( + hypr_db_migrate::AppDbOpenOptions { + db: hypr_db_core2::DbOpenOptions { + storage: hypr_db_core2::DbStorage::Local(&db_path), + cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }, + migration_failure_policy: hypr_db_migrate::MigrationFailurePolicy::Fail, }, - |pool| Box::pin(hypr_db_app::migrate(pool)), + hypr_db_app::schema(), ) .await .unwrap(); diff --git a/plugins/db/src/runtime.rs b/plugins/db/src/runtime.rs index 3737ba6603..7a33ef737d 100644 --- a/plugins/db/src/runtime.rs +++ b/plugins/db/src/runtime.rs @@ -1,6 +1,6 @@ use std::path::Path; -use hypr_db_core2::{CloudsyncOpenMode, Db3, DbOpenOptions, DbStorage, MigrationFailurePolicy}; +use hypr_db_core2::{CloudsyncOpenMode, Db3, DbOpenOptions, DbStorage}; use hypr_db_live_query::QueryEventSink; use tauri::ipc::Channel; @@ -37,16 +37,18 @@ pub async fn open_app_db(db_path: Option<&Path>) -> Result { None => DbStorage::Memory, }; - let db = Db3::open_with_migrate( - DbOpenOptions { - storage, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections:
Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, + let db = hypr_db_migrate::open_db( + hypr_db_migrate::AppDbOpenOptions { + db: DbOpenOptions { + storage, + cloudsync_open_mode: CloudsyncOpenMode::Disabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }, + migration_failure_policy: hypr_db_migrate::MigrationFailurePolicy::Fail, }, - |pool| Box::pin(hypr_db_app::migrate(pool)), + hypr_db_app::schema(), ) .await?;