diff --git a/Cargo.lock b/Cargo.lock index 29800d5924..2356fb8ba3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4717,6 +4717,7 @@ name = "db-app" version = "0.1.0" dependencies = [ "db-core2", + "db-migrate", "sqlx", "tokio", ] @@ -4754,8 +4755,8 @@ version = "0.1.0" dependencies = [ "anyhow", "cloudsync", - "db-app", "db-core2", + "db-migrate", "serde", "serde_json", "sqlx", @@ -4765,6 +4766,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "db-migrate" +version = "0.1.0" +dependencies = [ + "db-core2", + "sqlx", + "thiserror 2.0.18", + "tokio", +] + [[package]] name = "db-parser" version = "0.1.0" @@ -11546,6 +11557,7 @@ dependencies = [ "db-app", "db-core2", "db-live-query", + "db-migrate", "serde_json", "tempfile", "thiserror 2.0.18", @@ -18602,6 +18614,7 @@ dependencies = [ "db-app", "db-core2", "db-live-query", + "db-migrate", "serde", "serde_json", "specta", diff --git a/Cargo.toml b/Cargo.toml index d6daab62c1..41a1cc8669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ hypr-data = { path = "crates/data", package = "data" } hypr-db-app = { path = "crates/db-app", package = "db-app" } hypr-db-core2 = { path = "crates/db-core2", package = "db-core2" } hypr-db-live-query = { path = "crates/db-live-query", package = "db-live-query" } +hypr-db-migrate = { path = "crates/db-migrate", package = "db-migrate" } hypr-denoise = { path = "crates/denoise", package = "denoise" } hypr-detect = { path = "crates/detect", package = "detect" } hypr-device-monitor = { path = "crates/device-monitor", package = "device-monitor" } diff --git a/crates/cloudsync/src/api.rs b/crates/cloudsync/src/api.rs new file mode 100644 index 0000000000..f69d738084 --- /dev/null +++ b/crates/cloudsync/src/api.rs @@ -0,0 +1,175 @@ +use sqlx::{Executor, Sqlite}; + +use crate::error::Error; + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-version +pub async fn version<'e, E>(executor: E) -> Result +where + E: Executor<'e, Database = Sqlite>, +{ + Ok(sqlx::query_scalar("SELECT cloudsync_version()") + .fetch_one(executor) + .await?) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-init +pub async fn init<'e, E>( + executor: E, + table_name: &str, + crdt_algo: Option<&str>, + force: Option, +) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite> + Copy, +{ + match (crdt_algo, force) { + (None, None) => { + sqlx::query("SELECT cloudsync_init(?)") + .bind(table_name) + .fetch_optional(executor) + .await?; + } + (Some(crdt_algo), None) => { + sqlx::query("SELECT cloudsync_init(?, ?)") + .bind(table_name) + .bind(crdt_algo) + .fetch_optional(executor) + .await?; + } + (None, Some(force)) => { + sqlx::query("SELECT cloudsync_init(?, NULL, ?)") + .bind(table_name) + .bind(force) + .fetch_optional(executor) + .await?; + } + (Some(crdt_algo), Some(force)) => { + sqlx::query("SELECT cloudsync_init(?, ?, ?)") + .bind(table_name) + .bind(crdt_algo) + .bind(force) + .fetch_optional(executor) + .await?; + } + } + + Ok(()) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-begin-alter +pub async fn begin_alter<'e, E>(executor: E, table_name: &str) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite>, +{ + sqlx::query("SELECT cloudsync_begin_alter(?)") + .bind(table_name) + .fetch_optional(executor) + .await?; + + Ok(()) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-enable +pub async fn enable<'e, E>(executor: E, table_name: &str) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite>, +{ + sqlx::query("SELECT cloudsync_enable(?)") + .bind(table_name) + .fetch_optional(executor) + .await?; + + Ok(()) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-disable +pub async fn disable<'e, E>(executor: E, table_name: &str) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite>, +{ + sqlx::query("SELECT cloudsync_disable(?)") + .bind(table_name) + .fetch_optional(executor) + .await?; + + Ok(()) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-is-enabled +pub async fn is_enabled<'e, E>(executor: E, table_name: &str) -> Result +where + E: Executor<'e, Database = Sqlite>, +{ + Ok(sqlx::query_scalar("SELECT cloudsync_is_enabled(?)") + .bind(table_name) + .fetch_one(executor) + .await?) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-commit-alter +pub async fn commit_alter<'e, E>(executor: E, table_name: &str) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite>, +{ + sqlx::query("SELECT cloudsync_commit_alter(?)") + .bind(table_name) + .fetch_optional(executor) + .await?; + + Ok(()) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-cleanup +pub async fn cleanup<'e, E>(executor: E, table_name: &str) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite>, +{ + sqlx::query("SELECT cloudsync_cleanup(?)") + .bind(table_name) + .fetch_optional(executor) + .await?; + + Ok(()) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-siteid +pub async fn siteid<'e, E>(executor: E) -> Result, Error> +where + E: Executor<'e, Database = Sqlite>, +{ + Ok(sqlx::query_scalar("SELECT cloudsync_siteid()") + .fetch_one(executor) + .await?) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-db-version +pub async fn db_version<'e, E>(executor: E) -> Result +where + E: Executor<'e, Database = Sqlite>, +{ + Ok(sqlx::query_scalar("SELECT cloudsync_db_version()") + .fetch_one(executor) + .await?) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-uuid +pub async fn uuid<'e, E>(executor: E) -> Result +where + E: Executor<'e, Database = Sqlite>, +{ + Ok(sqlx::query_scalar("SELECT cloudsync_uuid()") + .fetch_one(executor) + .await?) +} + +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-terminate +pub async fn terminate<'e, E>(executor: E) -> Result<(), Error> +where + E: Executor<'e, Database = Sqlite>, +{ + sqlx::query("SELECT cloudsync_terminate()") + .fetch_optional(executor) + .await?; + + Ok(()) +} diff --git a/crates/cloudsync/src/init.rs b/crates/cloudsync/src/init.rs deleted file mode 100644 index 99504938bb..0000000000 --- a/crates/cloudsync/src/init.rs +++ /dev/null @@ -1,84 +0,0 @@ -use sqlx::SqlitePool; - -use crate::error::Error; - -pub async fn version(pool: &SqlitePool) -> Result { - Ok(sqlx::query_scalar("SELECT cloudsync_version()") - .fetch_one(pool) - .await?) -} - -pub async fn init( - pool: &SqlitePool, - table_name: &str, - crdt_algo: Option<&str>, - force: Option, -) -> Result<(), Error> { - match (crdt_algo, force) { - (None, None) => { - sqlx::query("SELECT cloudsync_init(?)") - .bind(table_name) - .fetch_optional(pool) - .await?; - } - (Some(crdt_algo), None) => { - sqlx::query("SELECT cloudsync_init(?, ?)") - .bind(table_name) - .bind(crdt_algo) - .fetch_optional(pool) - .await?; - } - (None, Some(force)) => { - sqlx::query("SELECT cloudsync_init(?, NULL, ?)") - .bind(table_name) - .bind(force) - .fetch_optional(pool) - .await?; - } - (Some(crdt_algo), Some(force)) => { - sqlx::query("SELECT cloudsync_init(?, ?, ?)") - .bind(table_name) - .bind(crdt_algo) - .bind(force) - .fetch_optional(pool) - .await?; - } - } - - Ok(()) -} - -pub async fn begin_alter(pool: &SqlitePool, table_name: &str) -> Result<(), Error> { - sqlx::query("SELECT cloudsync_begin_alter(?)") - .bind(table_name) - .fetch_optional(pool) - .await?; - - Ok(()) -} - -pub async fn commit_alter(pool: &SqlitePool, table_name: &str) -> Result<(), Error> { - sqlx::query("SELECT cloudsync_commit_alter(?)") - .bind(table_name) - .fetch_optional(pool) - .await?; - - Ok(()) -} - -pub async fn cleanup(pool: &SqlitePool, table_name: &str) -> Result<(), Error> { - sqlx::query("SELECT cloudsync_cleanup(?)") - .bind(table_name) - .fetch_optional(pool) - .await?; - - Ok(()) -} - -pub async fn terminate(pool: &SqlitePool) -> Result<(), Error> { - sqlx::query("SELECT cloudsync_terminate()") - .fetch_optional(pool) - .await?; - - Ok(()) -} diff --git a/crates/cloudsync/src/lib.rs b/crates/cloudsync/src/lib.rs index 8b8dec2de1..9f333ba15f 100644 --- a/crates/cloudsync/src/lib.rs +++ b/crates/cloudsync/src/lib.rs @@ -1,17 +1,20 @@ #![forbid(unsafe_code)] +mod api; mod bundle; mod error; -mod init; mod network; use std::path::PathBuf; use sqlx::sqlite::SqliteConnectOptions; +pub use api::{ + begin_alter, cleanup, commit_alter, db_version, disable, enable, init, is_enabled, siteid, + terminate, uuid, version, +}; pub use bundle::bundled_extension_path; pub use error::{Error, ErrorKind}; -pub use init::{begin_alter, cleanup, commit_alter, init, terminate, version}; pub use network::{ network_check_changes, network_cleanup, network_has_unsent_changes, network_init, network_logout, network_reset_sync_version, network_send_changes, network_set_apikey, diff --git a/crates/cloudsync/src/network.rs b/crates/cloudsync/src/network.rs index 1c88a35024..787b656802 100644 --- a/crates/cloudsync/src/network.rs +++ b/crates/cloudsync/src/network.rs @@ -2,6 +2,7 @@ use sqlx::SqlitePool; use crate::error::Error; +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-init pub async fn network_init(pool: &SqlitePool, connection_string: &str) -> Result<(), Error> { sqlx::query("SELECT cloudsync_network_init(?)") .bind(connection_string) @@ -11,6 +12,7 @@ pub async fn network_init(pool: &SqlitePool, connection_string: &str) -> Result< Ok(()) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-set-apikey pub async fn network_set_apikey(pool: &SqlitePool, api_key: &str) -> Result<(), Error> { sqlx::query("SELECT cloudsync_network_set_apikey(?)") .bind(api_key) @@ -20,6 +22,7 @@ pub async fn network_set_apikey(pool: &SqlitePool, api_key: &str) -> Result<(), Ok(()) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-set-token pub async fn network_set_token(pool: &SqlitePool, token: &str) -> Result<(), Error> { sqlx::query("SELECT cloudsync_network_set_token(?)") .bind(token) @@ -29,6 +32,7 @@ pub async fn network_set_token(pool: &SqlitePool, token: &str) -> Result<(), Err Ok(()) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-cleanup pub async fn network_cleanup(pool: &SqlitePool) -> Result<(), Error> { sqlx::query("SELECT cloudsync_network_cleanup()") .fetch_optional(pool) @@ -37,6 +41,7 @@ pub async fn network_cleanup(pool: &SqlitePool) -> Result<(), Error> { Ok(()) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-has-unsent-changes pub async fn network_has_unsent_changes(pool: &SqlitePool) -> Result { Ok( sqlx::query_scalar("SELECT cloudsync_network_has_unsent_changes()") @@ -45,6 +50,7 @@ pub async fn network_has_unsent_changes(pool: &SqlitePool) -> Result, @@ -78,6 +84,7 @@ pub async fn network_send_changes( }) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-check-changes pub async fn network_check_changes( pool: &SqlitePool, wait_ms: Option, @@ -111,6 +118,7 @@ pub async fn network_check_changes( }) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-reset-sync-version pub async fn network_reset_sync_version(pool: &SqlitePool) -> Result<(), Error> { sqlx::query("SELECT cloudsync_network_reset_sync_version()") .fetch_optional(pool) @@ -119,6 +127,7 @@ pub async fn network_reset_sync_version(pool: &SqlitePool) -> Result<(), Error> Ok(()) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-logout pub async fn network_logout(pool: &SqlitePool) -> Result<(), Error> { sqlx::query("SELECT cloudsync_network_logout()") .fetch_optional(pool) @@ -127,6 +136,7 @@ pub async fn network_logout(pool: &SqlitePool) -> Result<(), Error> { Ok(()) } +/// https://docs.sqlitecloud.io/docs/sqlite-sync-api-cloudsync-network-sync pub async fn network_sync( pool: &SqlitePool, wait_ms: Option, diff --git a/crates/db-app/Cargo.toml b/crates/db-app/Cargo.toml index ddadf6f359..87ee789ae1 100644 --- a/crates/db-app/Cargo.toml +++ b/crates/db-app/Cargo.toml @@ -5,7 +5,9 @@ edition = "2024" [dependencies] hypr-db-core2 = { workspace = true } -sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "sqlite-unbundled", "macros", "migrate"] } +hypr-db-migrate = { workspace = true } + +sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "sqlite-unbundled", "macros"] } [features] default = [] diff --git a/crates/db-app/src/lib.rs b/crates/db-app/src/lib.rs index fb68c72c20..2157cb15b3 100644 --- a/crates/db-app/src/lib.rs +++ b/crates/db-app/src/lib.rs @@ -28,10 +28,24 @@ pub use event_types::*; pub use template_ops::*; pub use template_types::*; -use sqlx::SqlitePool; - -pub async fn migrate(pool: &SqlitePool) -> Result<(), sqlx::migrate::MigrateError> { - sqlx::migrate!("./migrations").run(pool).await +pub const APP_MIGRATION_STEPS: &[hypr_db_migrate::MigrationStep] = &[ + hypr_db_migrate::MigrationStep { + id: "20260413020000_templates", + scope: hypr_db_migrate::MigrationScope::Plain, + sql: include_str!("../migrations/20260413020000_templates.sql"), + }, + hypr_db_migrate::MigrationStep { + id: "20260414120000_calendars_events", + scope: hypr_db_migrate::MigrationScope::Plain, + sql: include_str!("../migrations/20260414120000_calendars_events.sql"), + }, +]; + +pub fn schema() -> hypr_db_migrate::DbSchema { + hypr_db_migrate::DbSchema { + steps: APP_MIGRATION_STEPS, + validate_cloudsync_table: cloudsync_alter_guard_required, + } } #[cfg(test)] @@ -41,11 +55,43 @@ mod tests { use sqlx::Row; async fn test_db() -> Db3 { - let db = Db3::connect_memory_plain().await.unwrap(); - migrate(db.pool()).await.unwrap(); + let db = Db3::open(hypr_db_core2::DbOpenOptions { + storage: hypr_db_core2::DbStorage::Memory, + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) + .await + .unwrap(); + hypr_db_migrate::migrate(&db, schema()).await.unwrap(); db } + #[tokio::test] + async fn schema_declares_legacy_migrations_and_cloudsync_registry() { + let db = Db3::open(hypr_db_core2::DbOpenOptions { + storage: hypr_db_core2::DbStorage::Memory, + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) + .await + .unwrap(); + hypr_db_migrate::migrate(&db, schema()).await.unwrap(); + + let tables: Vec = sqlx::query_scalar( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE '_sqlx%' ORDER BY name", + ) + .fetch_all(db.pool().as_ref()) + .await + .unwrap(); + + assert!(tables.contains(&"templates".to_string())); + assert!(tables.contains(&"_char_migrations".to_string())); + } + #[tokio::test] async fn migrations_apply_cleanly() { let db = test_db().await; @@ -62,16 +108,7 @@ mod tests { assert_eq!( tables, - vec![ - "activity_observation_analyses", - "activity_observation_events", - "activity_screenshots", - "calendars", - "daily_notes", - "daily_summaries", - "events", - "templates", - ] + vec!["_char_migrations", "calendars", "events", "templates"] ); } diff --git a/crates/db-core2/AGENTS.md b/crates/db-core2/AGENTS.md index 2314f3d4f7..92b586ca64 100644 --- a/crates/db-core2/AGENTS.md +++ b/crates/db-core2/AGENTS.md @@ -3,7 +3,7 @@ ## Role - `db-core2` is the database substrate layer. -- It owns `Db3`/`DbPool`, SQLite open options, pool lifecycle, migration failure policy, and per-connection SQLite wiring. +- It owns `Db3`/`DbPool`, SQLite open options, pool lifecycle, storage-recreation primitives, and per-connection SQLite wiring. - Raw SQLite hook integration belongs here, including `sqlite3_update_hook`. - Cloudsync integration also belongs here because it is part of how the database is opened and managed, not how queries are exposed to the app. - Higher layers should consume `Db3`/`DbPool` and raw table-change events from here instead of reimplementing pool setup. @@ -14,7 +14,8 @@ - Applying low-level SQLite pragmas and connection policy. - Installing per-connection hooks in `SqlitePoolOptions::after_connect`. - Exposing best-effort table-level mutation notifications for pooled writes. -- Database recreation behavior when migrations fail and policy requests it. +- Database recreation primitives that upper layers may invoke when their policy requests it. +- Connection-scoped CloudSync helpers that must run on one checked-out executor. - Keeping `DbPool` ergonomic as an `sqlx::SqlitePool` wrapper. ## This Crate Does Not Own @@ -33,6 +34,7 @@ - Change events are table-level, not row-level or predicate-level. - `DbPool` must continue to `Deref`/`AsRef` to `SqlitePool` so existing SQL callers stay ergonomic. - App code may supply a migration callback, but the crate must stay schema-agnostic. +- CloudSync operations that require executor affinity should be wrapped here so upper layers do not call `hypr_cloudsync` directly. - Reactive support must stay additive to normal database usage; callers that do not subscribe should see ordinary open/query behavior. ## Dependency Direction diff --git a/crates/db-core2/src/cloudsync.rs b/crates/db-core2/src/cloudsync.rs index dfb0dc0f1d..f51e1d619e 100644 --- a/crates/db-core2/src/cloudsync.rs +++ b/crates/db-core2/src/cloudsync.rs @@ -3,19 +3,13 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use backon::{ExponentialBuilder, Retryable}; use serde::{Deserialize, Serialize}; -use sqlx::SqlitePool; +use sqlx::{Executor, Sqlite, SqlitePool}; use tokio::sync::{broadcast, oneshot}; use tokio::task::JoinHandle; use crate::Db3; use crate::pool::TableChange; -#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum CloudsyncOpenMode { - Disabled, - Enabled, -} #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] @@ -53,7 +47,7 @@ pub enum CloudsyncErrorKind { #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct CloudsyncStatus { - pub open_mode: CloudsyncOpenMode, + pub cloudsync_enabled: bool, pub extension_loaded: bool, pub configured: bool, pub running: bool, @@ -72,6 +66,8 @@ pub enum CloudsyncRuntimeError { NotConfigured, #[error("cloudsync runtime is not started")] NotStarted, + #[error("cloudsync runtime is running; stop it first or use cloudsync_reconfigure")] + RestartRequired, #[error("cloudsync sync interval must be greater than 0")] InvalidSyncInterval, #[error(transparent)] @@ -163,8 +159,8 @@ impl std::fmt::Debug for CloudsyncBackgroundTask { } impl Db3 { - pub fn cloudsync_open_mode(&self) -> CloudsyncOpenMode { - self.cloudsync_open_mode + pub fn cloudsync_enabled(&self) -> bool { + self.cloudsync_enabled } pub fn has_cloudsync(&self) -> bool { @@ -186,13 +182,35 @@ impl Db3 { config: CloudsyncRuntimeConfig, ) -> Result<(), CloudsyncRuntimeError> { let mut runtime = self.cloudsync_runtime.lock().unwrap(); + if runtime.running { + return Err(CloudsyncRuntimeError::RestartRequired); + } runtime.config = Some(config.normalized()?); runtime.last_error = None; Ok(()) } + pub async fn cloudsync_reconfigure( + &self, + config: CloudsyncRuntimeConfig, + ) -> Result<(), CloudsyncRuntimeError> { + let was_running = self.cloudsync_runtime.lock().unwrap().running; + + if was_running { + self.cloudsync_stop().await?; + } + + self.cloudsync_configure(config)?; + + if was_running { + self.cloudsync_start().await?; + } + + Ok(()) + } + pub async fn cloudsync_start(&self) -> Result<(), CloudsyncRuntimeError> { - if self.cloudsync_open_mode == CloudsyncOpenMode::Disabled { + if !self.cloudsync_enabled { let mut runtime = self.cloudsync_runtime.lock().unwrap(); runtime.running = false; runtime.network_initialized = false; @@ -270,7 +288,7 @@ impl Db3 { let _ = task.join_handle.await; } - if self.cloudsync_open_mode == CloudsyncOpenMode::Disabled { + if !self.cloudsync_enabled { let mut runtime = self.cloudsync_runtime.lock().unwrap(); runtime.network_initialized = false; runtime.last_error = None; @@ -317,14 +335,14 @@ impl Db3 { }; let has_unsent_changes = - if self.cloudsync_open_mode == CloudsyncOpenMode::Enabled && network_initialized { + if self.cloudsync_enabled && network_initialized { Some(self.cloudsync_network_has_unsent_changes().await?) } else { None }; Ok(CloudsyncStatus { - open_mode: self.cloudsync_open_mode, + cloudsync_enabled: self.cloudsync_enabled, extension_loaded: self.has_cloudsync(), configured: config.is_some(), running, @@ -339,7 +357,7 @@ impl Db3 { } pub async fn cloudsync_trigger_sync(&self) -> Result { - if self.cloudsync_open_mode == CloudsyncOpenMode::Disabled { + if !self.cloudsync_enabled { let mut runtime = self.cloudsync_runtime.lock().unwrap(); runtime.last_error = None; return Ok(0); @@ -404,14 +422,14 @@ impl Db3 { &self, table_name: &str, ) -> Result<(), hypr_cloudsync::Error> { - hypr_cloudsync::begin_alter(self.pool.as_ref(), table_name).await + cloudsync_begin_alter_on(self.pool.as_ref(), table_name).await } pub async fn cloudsync_commit_alter( &self, table_name: &str, ) -> Result<(), hypr_cloudsync::Error> { - hypr_cloudsync::commit_alter(self.pool.as_ref(), table_name).await + cloudsync_commit_alter_on(self.pool.as_ref(), table_name).await } pub async fn cloudsync_cleanup(&self, table_name: &str) -> Result<(), hypr_cloudsync::Error> { @@ -485,6 +503,26 @@ impl Db3 { } } +pub async fn cloudsync_begin_alter_on<'e, E>( + executor: E, + table_name: &str, +) -> Result<(), hypr_cloudsync::Error> +where + E: Executor<'e, Database = Sqlite>, +{ + hypr_cloudsync::begin_alter(executor, table_name).await +} + +pub async fn cloudsync_commit_alter_on<'e, E>( + executor: E, + table_name: &str, +) -> Result<(), hypr_cloudsync::Error> +where + E: Executor<'e, Database = Sqlite>, +{ + hypr_cloudsync::commit_alter(executor, table_name).await +} + const MAX_BACKOFF_SECS: u64 = 300; async fn cloudsync_background_loop( diff --git a/crates/db-core2/src/lib.rs b/crates/db-core2/src/lib.rs index 5c90245325..546f5d4eb3 100644 --- a/crates/db-core2/src/lib.rs +++ b/crates/db-core2/src/lib.rs @@ -1,21 +1,18 @@ mod cloudsync; mod pool; -use std::future::Future; use std::path::{Path, PathBuf}; -use std::pin::Pin; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; pub use hypr_cloudsync::Error; -use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use crate::cloudsync::CloudsyncRuntimeState; pub use crate::cloudsync::{ - CloudsyncAuth, CloudsyncOpenMode, CloudsyncRuntimeConfig, CloudsyncRuntimeError, - CloudsyncStatus, CloudsyncTableSpec, + CloudsyncAuth, CloudsyncRuntimeConfig, CloudsyncRuntimeError, CloudsyncStatus, + CloudsyncTableSpec, cloudsync_begin_alter_on, cloudsync_commit_alter_on, }; use crate::pool::connect_pool; pub use crate::pool::{DbPool, TableChange, TableChangeKind}; @@ -26,20 +23,13 @@ pub enum DbStorage<'a> { Memory, } -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum MigrationFailurePolicy { - Fail, - Recreate, -} - #[derive(Clone, Copy, Debug)] pub struct DbOpenOptions<'a> { pub storage: DbStorage<'a>, - pub cloudsync_open_mode: CloudsyncOpenMode, + pub cloudsync_enabled: bool, pub journal_mode_wal: bool, pub foreign_keys: bool, pub max_connections: Option, - pub migration_failure_policy: MigrationFailurePolicy, } #[derive(Debug, thiserror::Error)] @@ -50,20 +40,14 @@ pub enum DbOpenError { Sqlx(#[from] sqlx::Error), #[error(transparent)] Cloudsync(#[from] hypr_cloudsync::Error), - #[error("migration failed: {0}")] - Migration(String), - #[error("failed to recreate database after migration failure: {0}")] - RecreateFailed(String), } pub type ManagedDb = std::sync::Arc; -type BoxedMigrationFuture<'a, E> = Pin> + Send + 'a>>; - const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(5); pub struct Db3 { - pub(crate) cloudsync_open_mode: CloudsyncOpenMode, + pub(crate) cloudsync_enabled: bool, pub(crate) cloudsync_path: Option, pub(crate) cloudsync_runtime: Arc>, pub(crate) pool: DbPool, @@ -73,39 +57,34 @@ impl std::fmt::Debug for Db3 { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let runtime = self.cloudsync_runtime.lock().unwrap(); f.debug_struct("Db3") - .field("cloudsync_open_mode", &self.cloudsync_open_mode) + .field("cloudsync_enabled", &self.cloudsync_enabled) .field("cloudsync_path", &self.cloudsync_path) .field("cloudsync_runtime", &*runtime) .finish_non_exhaustive() } } -impl Db3 { - pub async fn open_with_migrate( - options: DbOpenOptions<'_>, - migrate: F, - ) -> Result - where - F: for<'a> Fn(&'a SqlitePool) -> BoxedMigrationFuture<'a, E>, - E: std::fmt::Display, - { - match try_open_with_migrate(&options, &migrate).await { - Ok(db) => Ok(db), - Err(DbOpenError::Migration(message)) - if matches!( - options.migration_failure_policy, - MigrationFailurePolicy::Recreate - ) => - { - tracing::warn!("database migration failed, recreating fresh database: {message}"); - recreate_storage(&options)?; - try_open_with_migrate(&options, &migrate) - .await - .map_err(|error| DbOpenError::RecreateFailed(error.to_string())) +impl Drop for Db3 { + fn drop(&mut self) { + let task = { + let mut runtime = self.cloudsync_runtime.lock().unwrap(); + runtime.running = false; + runtime.task.take() + }; + + if let Some(mut task) = task { + if let Some(shutdown_tx) = task.shutdown_tx.take() { + let _ = shutdown_tx.send(()); } - Err(error) => Err(error), + task.join_handle.abort(); } } +} + +impl Db3 { + pub async fn open(options: DbOpenOptions<'_>) -> Result { + connect_with_options(&options).await + } pub async fn connect_local(path: impl AsRef) -> Result { if let Some(parent) = path.as_ref().parent() { @@ -118,7 +97,7 @@ impl Db3 { let pool = connect_pool(options, None).await.map_err(Error::from)?; Ok(Self { - cloudsync_open_mode: CloudsyncOpenMode::Enabled, + cloudsync_enabled: true, cloudsync_path: Some(cloudsync_path), cloudsync_runtime: Arc::new(Mutex::new(CloudsyncRuntimeState::default())), pool, @@ -132,7 +111,7 @@ impl Db3 { let pool = connect_pool(options, Some(1)).await.map_err(Error::from)?; Ok(Self { - cloudsync_open_mode: CloudsyncOpenMode::Enabled, + cloudsync_enabled: true, cloudsync_path: Some(cloudsync_path), cloudsync_runtime: Arc::new(Mutex::new(CloudsyncRuntimeState::default())), pool, @@ -150,7 +129,7 @@ impl Db3 { let pool = connect_pool(options, None).await?; Ok(Self { - cloudsync_open_mode: CloudsyncOpenMode::Disabled, + cloudsync_enabled: false, cloudsync_path: None, cloudsync_runtime: Arc::new(Mutex::new(CloudsyncRuntimeState::default())), pool, @@ -164,7 +143,7 @@ impl Db3 { let pool = connect_pool(options, Some(1)).await?; Ok(Self { - cloudsync_open_mode: CloudsyncOpenMode::Disabled, + cloudsync_enabled: false, cloudsync_path: None, cloudsync_runtime: Arc::new(Mutex::new(CloudsyncRuntimeState::default())), pool, @@ -176,24 +155,6 @@ impl Db3 { } } -async fn try_open_with_migrate( - options: &DbOpenOptions<'_>, - migrate: &F, -) -> Result -where - F: for<'a> Fn(&'a SqlitePool) -> BoxedMigrationFuture<'a, E>, - E: std::fmt::Display, -{ - let db = connect_with_options(options).await?; - - if let Err(error) = migrate(db.pool()).await { - db.pool.clone().close().await; - return Err(DbOpenError::Migration(error.to_string())); - } - - Ok(db) -} - async fn connect_with_options(options: &DbOpenOptions<'_>) -> Result { let mut connect_options = match options.storage { DbStorage::Local(path) => { @@ -217,7 +178,7 @@ async fn connect_with_options(options: &DbOpenOptions<'_>) -> Result) -> Result Sqlit connect_options.busy_timeout(SQLITE_BUSY_TIMEOUT) } -fn recreate_storage(options: &DbOpenOptions<'_>) -> Result<(), DbOpenError> { - match options.storage { - DbStorage::Local(path) => { - wipe_db_file(path); - if options.cloudsync_open_mode == CloudsyncOpenMode::Enabled { - let connect_options = SqliteConnectOptions::new().filename(path); - let (_, cloudsync_path) = hypr_cloudsync::apply(connect_options)?; - wipe_db_file(&cloudsync_path); - } - } - DbStorage::Memory => {} - } - - Ok(()) -} - -fn wipe_db_file(path: &Path) { - for suffix in ["", "-wal", "-shm", "-journal"] { - let file = PathBuf::from(format!("{}{suffix}", path.display())); - if file.exists() { - let _ = std::fs::remove_file(file); - } - } -} - #[cfg(test)] mod tests { use super::*; - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + + use tokio::sync::oneshot; fn test_cloudsync_config() -> CloudsyncRuntimeConfig { CloudsyncRuntimeConfig { @@ -298,129 +237,17 @@ mod tests { } #[tokio::test] - async fn open_with_migrate_recreates_local_db_when_requested() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("app.db"); - let attempts = AtomicUsize::new(0); - - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Recreate, - }, - |pool| { - let n = attempts.fetch_add(1, Ordering::SeqCst); - Box::pin(async move { - if n == 0 { - sqlx::query("CREATE TABLE broken (id TEXT PRIMARY KEY NOT NULL)") - .execute(pool) - .await - .unwrap(); - Err("boom") - } else { - sqlx::query("CREATE TABLE fresh (id TEXT PRIMARY KEY NOT NULL)") - .execute(pool) - .await - .unwrap(); - Ok::<(), &'static str>(()) - } - }) - }, - ) - .await - .unwrap(); - - let tables: Vec = sqlx::query_as::<_, (String,)>( - "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name", - ) - .fetch_all(db.pool().as_ref()) - .await - .unwrap() - .into_iter() - .map(|row| row.0) - .collect(); - - assert_eq!(attempts.load(Ordering::SeqCst), 2); - assert_eq!(tables, vec!["fresh"]); - } - - #[tokio::test] - async fn open_with_migrate_returns_migration_error_when_fail_policy_is_used() { - let error = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Memory, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: false, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Err::<(), _>("nope") }), - ) - .await - .unwrap_err(); - - assert!(matches!(error, DbOpenError::Migration(message) if message == "nope")); - } - - #[tokio::test] - async fn open_with_migrate_returns_recreate_failed_when_retry_also_fails() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("app.db"); - let attempts = AtomicUsize::new(0); - - let error = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Recreate, - }, - |pool| { - let n = attempts.fetch_add(1, Ordering::SeqCst); - Box::pin(async move { - let table_name = if n == 0 { - "first_attempt" - } else { - "second_attempt" - }; - let sql = format!("CREATE TABLE {table_name} (id TEXT PRIMARY KEY NOT NULL)"); - sqlx::query(&sql).execute(pool).await.unwrap(); - Err::<(), &'static str>("still broken") - }) - }, - ) - .await - .unwrap_err(); - - assert_eq!(attempts.load(Ordering::SeqCst), 2); - assert!( - matches!(error, DbOpenError::RecreateFailed(message) if message == "migration failed: still broken") - ); - } - - #[tokio::test] - async fn open_with_migrate_applies_requested_pragmas() { + async fn open_applies_requested_pragmas() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("app.db"); - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Ok::<(), sqlx::Error>(()) }), - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) .await .unwrap(); @@ -444,21 +271,17 @@ mod tests { #[tokio::test] async fn disabled_open_mode_keeps_cloudsync_inert() { - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Memory, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: false, - foreign_keys: true, - max_connections: Some(1), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Ok::<(), sqlx::Error>(()) }), - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_enabled: false, + journal_mode_wal: false, + foreign_keys: true, + max_connections: Some(1), + }) .await .unwrap(); - assert_eq!(db.cloudsync_open_mode(), CloudsyncOpenMode::Disabled); + assert!(!db.cloudsync_enabled()); assert!(!db.has_cloudsync()); db.cloudsync_configure(test_cloudsync_config()).unwrap(); @@ -469,7 +292,7 @@ mod tests { assert!(!status.extension_loaded); assert!(!status.running); assert!(!status.network_initialized); - assert_eq!(status.open_mode, CloudsyncOpenMode::Disabled); + assert!(!status.cloudsync_enabled); db.cloudsync_stop().await.unwrap(); } @@ -482,6 +305,103 @@ mod tests { assert!(matches!(error, CloudsyncRuntimeError::NotConfigured)); } + #[tokio::test] + async fn configure_rejects_live_runtime_changes() { + let db = Db3::connect_memory_plain().await.unwrap(); + db.cloudsync_configure(test_cloudsync_config()).unwrap(); + db.cloudsync_runtime.lock().unwrap().running = true; + + let error = db + .cloudsync_configure(CloudsyncRuntimeConfig { + connection_string: "sqlitecloud://demo.invalid/other.db?apikey=demo".to_string(), + ..test_cloudsync_config() + }) + .unwrap_err(); + + assert!(matches!(error, CloudsyncRuntimeError::RestartRequired)); + assert_eq!( + db.cloudsync_runtime + .lock() + .unwrap() + .config + .as_ref() + .unwrap() + .connection_string, + "sqlitecloud://demo.invalid/app.db?apikey=demo" + ); + } + + #[tokio::test] + async fn reconfigure_preserves_stopped_state_when_runtime_is_inert() { + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_enabled: false, + journal_mode_wal: false, + foreign_keys: true, + max_connections: Some(1), + }) + .await + .unwrap(); + db.cloudsync_configure(test_cloudsync_config()).unwrap(); + { + let mut runtime = db.cloudsync_runtime.lock().unwrap(); + runtime.running = true; + runtime.network_initialized = true; + } + + let next_config = CloudsyncRuntimeConfig { + connection_string: "sqlitecloud://demo.invalid/reconfigured.db?apikey=demo".to_string(), + sync_interval_ms: 2_000, + ..test_cloudsync_config() + }; + + db.cloudsync_reconfigure(next_config.clone()).await.unwrap(); + + let runtime = db.cloudsync_runtime.lock().unwrap(); + assert_eq!(runtime.config, Some(next_config)); + assert!(!runtime.running); + assert!(!runtime.network_initialized); + } + + #[tokio::test] + async fn dropping_db_stops_background_task_best_effort() { + struct DropFlag(Arc); + + impl Drop for DropFlag { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let db = Db3::connect_memory_plain().await.unwrap(); + let dropped = Arc::new(AtomicBool::new(false)); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let guard = DropFlag(Arc::clone(&dropped)); + let join_handle = tokio::spawn(async move { + let _guard = guard; + let _ = shutdown_rx.await; + }); + + { + let mut runtime = db.cloudsync_runtime.lock().unwrap(); + runtime.running = true; + runtime.task = Some(crate::cloudsync::CloudsyncBackgroundTask { + shutdown_tx: Some(shutdown_tx), + join_handle, + }); + } + + drop(db); + + tokio::time::timeout(std::time::Duration::from_secs(1), async { + while !dropped.load(Ordering::SeqCst) { + tokio::task::yield_now().await; + } + }) + .await + .unwrap(); + } + #[tokio::test] async fn emits_table_changes_for_local_writes() { let db = Db3::connect_memory_plain().await.unwrap(); @@ -661,27 +581,19 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("app.db"); - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&path), - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |pool| { - Box::pin(async move { - sqlx::query("CREATE TABLE multi_conn_events (id TEXT PRIMARY KEY NOT NULL)") - .execute(pool) - .await - .unwrap(); - Ok::<(), sqlx::Error>(()) - }) - }, - ) + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(&path), + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); + sqlx::query("CREATE TABLE multi_conn_events (id TEXT PRIMARY KEY NOT NULL)") + .execute(db.pool().as_ref()) + .await + .unwrap(); let mut changes = db.subscribe_table_changes(); let mut conn_a = db.pool().acquire().await.unwrap(); @@ -768,18 +680,14 @@ mod tests { } #[tokio::test] - async fn open_with_migrate_memory_clamps_max_connections_to_one() { - let db = Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Memory, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: false, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |_pool| Box::pin(async { Ok::<(), sqlx::Error>(()) }), - ) + async fn open_memory_clamps_max_connections_to_one() { + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_enabled: false, + journal_mode_wal: false, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); diff --git a/crates/db-live-query/Cargo.toml b/crates/db-live-query/Cargo.toml index a70d0c7d57..e374da7d30 100644 --- a/crates/db-live-query/Cargo.toml +++ b/crates/db-live-query/Cargo.toml @@ -13,8 +13,9 @@ tokio = { workspace = true, features = ["macros", "sync", "time"] } uuid = { workspace = true, features = ["v4"] } [dev-dependencies] -anyhow = { workspace = true } hypr-cloudsync = { workspace = true } -hypr-db-app = { workspace = true } +hypr-db-migrate = { workspace = true } + +anyhow = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "time"] } diff --git a/crates/db-live-query/src/runtime.rs b/crates/db-live-query/src/runtime.rs index 64969f84a8..8ac7503e0d 100644 --- a/crates/db-live-query/src/runtime.rs +++ b/crates/db-live-query/src/runtime.rs @@ -318,7 +318,7 @@ mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; - use hypr_db_core2::{DbOpenOptions, DbStorage, MigrationFailurePolicy}; + use hypr_db_core2::{DbOpenOptions, DbStorage}; use serde_json::json; use super::*; @@ -350,7 +350,12 @@ mod tests { impl TestSink { fn capture() -> (Self, Arc>>) { let events = Arc::new(Mutex::new(Vec::new())); - (Self { events: Arc::clone(&events) }, events) + ( + Self { + events: Arc::clone(&events), + }, + events, + ) } } @@ -358,19 +363,18 @@ mod tests { async fn stale_init_time_broadcast_processed_after_activation_is_ignored() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("app.db"); - let db = hypr_db_core2::Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |pool| Box::pin(hypr_db_app::migrate(pool)), - ) + let db = hypr_db_core2::Db3::open(DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); + hypr_db_migrate::migrate(&db, hypr_db_app::schema()) + .await + .unwrap(); let pool = db.pool().as_ref().clone(); let runtime = DbRuntime::new(Arc::new(db)); diff --git a/crates/db-live-query/tests/common/mod.rs b/crates/db-live-query/tests/common/mod.rs index 344b04ad36..32c8cdc880 100644 --- a/crates/db-live-query/tests/common/mod.rs +++ b/crates/db-live-query/tests/common/mod.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use db_live_query::{DbRuntime, QueryEventSink}; -use hypr_db_core2::{DbOpenOptions, DbStorage, MigrationFailurePolicy}; +use hypr_db_core2::{DbOpenOptions, DbStorage}; #[derive(Clone, Debug, PartialEq)] pub enum TestEvent { @@ -179,19 +179,18 @@ pub async fn wait_for_stable_event_count( pub async fn setup_runtime() -> (tempfile::TempDir, sqlx::SqlitePool, DbRuntime) { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("app.db"); - let db = hypr_db_core2::Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(&db_path), - cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |pool| Box::pin(hypr_db_app::migrate(pool)), - ) + let db = hypr_db_core2::Db3::open(DbOpenOptions { + storage: DbStorage::Local(&db_path), + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); + hypr_db_migrate::migrate(&db, hypr_db_app::schema()) + .await + .unwrap(); let pool = db.pool().as_ref().clone(); diff --git a/crates/db-migrate/AGENTS.md b/crates/db-migrate/AGENTS.md new file mode 100644 index 0000000000..15929002ad --- /dev/null +++ b/crates/db-migrate/AGENTS.md @@ -0,0 +1,115 @@ +# `db-migrate` + +## Purpose + +`db-migrate` owns app-database migration execution. + +- Input: a checked-open `Db3` from `db-core2` plus a schema manifest from a schema crate such as `db-app` +- Output: schema changes applied and recorded in `_sqlx_migrations` + +This crate exists to keep: + +- `db-core2` focused on database opening, pooling, and SQLite/CloudSync primitives +- schema crates focused on migration manifests and table meaning +- CloudSync-sensitive migration mechanics enforced in one place + +## Model + +Treat this crate as a narrow port of `sqlx` migration behavior, not as a custom migration system. + +It should preserve the usual `sqlx` SQLite semantics: + +- `_sqlx_migrations` history table +- ordered apply +- checksum validation +- dirty-version detection +- idempotent re-run behavior + +The only intentional divergence is explicit per-step scope: + +- `Plain` +- `CloudsyncAlter { table_name }` + +## Why Not Just Use `sqlx::Migrator` + +Built-in `sqlx` migrator logic is close, but it does not expose a hook for: + +1. `cloudsync_begin_alter_on(conn, table)` +2. run DDL on that same `conn` +3. `cloudsync_commit_alter_on(conn, table)` + +For ordinary SQLite migrations, pool-level execution is fine. +For CloudSync alter steps, it is not. + +The invariant is: + +```text +same checked-out connection: + begin_alter + DDL + commit_alter +``` + +Not: + +```text +pool: + begin_alter -> conn A + DDL -> conn B + commit -> conn C +``` + +`max_connections = 1` is not a real substitute. The requirement is explicit ownership of one connection across the whole alter protocol, not merely "the pool only has one connection available right now." + +## API + +```rust +pub async fn migrate(db: &Db3, schema: DbSchema) -> Result<(), MigrateError> +``` + +Callers must open the database first. This crate does not own connection setup or storage configuration. + +## Ownership Boundary + +This crate owns: + +- migration orchestration +- translation from `MigrationStep` to `sqlx::migrate::Migration` +- validation of step ids, duplicate versions, and CloudSync-target eligibility +- execution semantics for `Plain` vs `CloudsyncAlter` +- `_sqlx_migrations` bookkeeping + +This crate does not own: + +- pool creation or database opening +- CloudSync extension loading or network/runtime setup +- app table definitions, row types, or query APIs +- migration SQL contents +- inference of whether a step "looks like" a CloudSync alter + +If a change is about schema meaning, it probably belongs in the schema crate. +If a change is about how migrations are executed, it probably belongs here. + +## Rules + +- Keep behavior as close to upstream `sqlx` as possible. +- Add divergence only when CloudSync or connection-control requirements force it. +- Make migration scope explicit in the manifest; do not infer it from SQL text. +- For `CloudsyncAlter`, use connection-scoped helpers from `db-core2`, never pool-level wrappers. +- When CloudSync is disabled, the same `CloudsyncAlter` step should fall back to normal SQLite execution so local and synced schemas stay aligned. +- Prefer a small, auditable port over a growing custom framework. + +## Testing + +Tests here should cover migration execution semantics, not app behavior. + +Keep coverage focused on: + +- plain migration parity with `sqlx` expectations +- idempotent re-runs +- checksum/version validation failures +- manifest validation failures +- CloudSync alter behavior on one checked-out connection +- CloudSync-disabled fallback for `CloudsyncAlter` + +Do not put app-specific query or domain tests here. diff --git a/crates/db-migrate/Cargo.toml b/crates/db-migrate/Cargo.toml new file mode 100644 index 0000000000..322312f4bb --- /dev/null +++ b/crates/db-migrate/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "db-migrate" +version = "0.1.0" +edition = "2024" + +[dependencies] +hypr-db-core2 = { workspace = true } + +sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "sqlite-unbundled", "migrate"] } +thiserror = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/crates/db-migrate/src/error.rs b/crates/db-migrate/src/error.rs new file mode 100644 index 0000000000..004e0d396a --- /dev/null +++ b/crates/db-migrate/src/error.rs @@ -0,0 +1,22 @@ +#[derive(Debug, thiserror::Error)] +pub enum MigrateError { + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + #[error(transparent)] + SqlxMigrate(#[from] sqlx::migrate::MigrateError), + #[error( + "migration step id {step_id} must match _ with a positive integer version" + )] + InvalidStepId { step_id: &'static str }, + #[error("migration version {version} is declared by both {first_step_id} and {second_step_id}")] + DuplicateStepVersion { + version: i64, + first_step_id: &'static str, + second_step_id: &'static str, + }, + #[error("cloudsync alter step {step_id} targets non-synced table {table_name}")] + InvalidCloudsyncStep { + step_id: &'static str, + table_name: &'static str, + }, +} diff --git a/crates/db-migrate/src/lib.rs b/crates/db-migrate/src/lib.rs new file mode 100644 index 0000000000..39497e775a --- /dev/null +++ b/crates/db-migrate/src/lib.rs @@ -0,0 +1,51 @@ +#![forbid(unsafe_code)] + +mod error; +mod migrate; +mod schema; + +pub use error::MigrateError; +pub use schema::{DbSchema, MigrationScope, MigrationStep}; + +use hypr_db_core2::Db3; + +pub async fn migrate(db: &Db3, schema: DbSchema) -> Result<(), MigrateError> { + migrate::run_migrations(db, schema).await +} + +#[cfg(test)] +mod tests { + use super::*; + use hypr_db_core2::{DbOpenOptions, DbStorage}; + + fn empty_schema() -> DbSchema { + DbSchema { + steps: &[], + validate_cloudsync_table: |_table| false, + } + } + + #[tokio::test] + async fn migrate_bootstraps_migration_history() { + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Memory, + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(1), + }) + .await + .unwrap(); + + migrate(&db, empty_schema()).await.unwrap(); + + let tables: Vec = sqlx::query_scalar( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name", + ) + .fetch_all(db.pool().as_ref()) + .await + .unwrap(); + + assert!(tables.contains(&"_sqlx_migrations".to_string())); + } +} diff --git a/crates/db-migrate/src/migrate.rs b/crates/db-migrate/src/migrate.rs new file mode 100644 index 0000000000..aad4f29fad --- /dev/null +++ b/crates/db-migrate/src/migrate.rs @@ -0,0 +1,300 @@ +use std::borrow::Cow; +use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::pin::Pin; +use std::time::{Duration, Instant}; + +use hypr_db_core2::Db3; +use sqlx::migrate::{ + AppliedMigration, Migrate, MigrateError as SqlxMigrateError, Migration, MigrationType, +}; +use sqlx::{Executor, Sqlite, SqliteConnection}; + +use crate::error::MigrateError; +use crate::schema::{DbSchema, MigrationScope, MigrationStep}; + +type BoxFuture<'a, T> = Pin + Send + 'a>>; + +struct DbMigrateConnection<'a> { + db: &'a Db3, + conn: sqlx::pool::PoolConnection, + scopes_by_version: HashMap, +} + +impl<'a> DbMigrateConnection<'a> { + fn new( + db: &'a Db3, + conn: sqlx::pool::PoolConnection, + scopes_by_version: HashMap, + ) -> Self { + Self { + db, + conn, + scopes_by_version, + } + } +} + +pub(crate) async fn run_migrations(db: &Db3, schema: DbSchema) -> Result<(), MigrateError> { + let resolved = resolve_migrations(schema)?; + let scopes_by_version = resolved + .iter() + .map(|(step, migration)| (migration.version, step.scope)) + .collect(); + let migrations: Vec<_> = resolved + .into_iter() + .map(|(_, migration)| migration) + .collect(); + + let conn = db.pool().acquire().await?; + let mut conn = DbMigrateConnection::new(db, conn, scopes_by_version); + run_direct(&migrations, &mut conn).await?; + Ok(()) +} + +async fn run_direct(migrations: &[Migration], conn: &mut C) -> Result<(), SqlxMigrateError> +where + C: Migrate, +{ + conn.lock().await?; + conn.ensure_migrations_table().await?; + + if let Some(version) = conn.dirty_version().await? { + return Err(SqlxMigrateError::Dirty(version)); + } + + let applied_migrations = conn.list_applied_migrations().await?; + validate_applied_migrations(&applied_migrations, migrations)?; + + let applied_migrations: HashMap<_, _> = applied_migrations + .into_iter() + .map(|migration| (migration.version, migration)) + .collect(); + + for migration in migrations { + if migration.migration_type.is_down_migration() { + continue; + } + + match applied_migrations.get(&migration.version) { + Some(applied_migration) => { + if migration.checksum != applied_migration.checksum { + return Err(SqlxMigrateError::VersionMismatch(migration.version)); + } + } + None => { + conn.apply(migration).await?; + } + } + } + + conn.unlock().await?; + Ok(()) +} + +fn validate_applied_migrations( + applied_migrations: &[AppliedMigration], + migrations: &[Migration], +) -> Result<(), SqlxMigrateError> { + let versions: HashSet<_> = migrations + .iter() + .map(|migration| migration.version) + .collect(); + + for applied_migration in applied_migrations { + if !versions.contains(&applied_migration.version) { + return Err(SqlxMigrateError::VersionMissing(applied_migration.version)); + } + } + + Ok(()) +} + +fn resolve_migrations( + schema: DbSchema, +) -> Result, MigrateError> { + let mut seen_versions = HashMap::new(); + let mut migrations = Vec::with_capacity(schema.steps.len()); + + for step in schema.steps { + validate_step(schema, step)?; + + let (version, description) = parse_step_id(step.id)?; + + if let Some(first_step_id) = seen_versions.insert(version, step.id) { + return Err(MigrateError::DuplicateStepVersion { + version, + first_step_id, + second_step_id: step.id, + }); + } + + migrations.push(( + step, + Migration::new( + version, + Cow::Borrowed(description), + MigrationType::Simple, + Cow::Borrowed(step.sql), + step.sql.starts_with("-- no-transaction"), + ), + )); + } + + migrations.sort_by_key(|(_, migration)| migration.version); + Ok(migrations) +} + +fn validate_step(schema: DbSchema, step: &MigrationStep) -> Result<(), MigrateError> { + let MigrationScope::CloudsyncAlter { table_name } = step.scope else { + return Ok(()); + }; + + if (schema.validate_cloudsync_table)(table_name) { + return Ok(()); + } + + Err(MigrateError::InvalidCloudsyncStep { + step_id: step.id, + table_name, + }) +} + +fn parse_step_id(step_id: &'static str) -> Result<(i64, &'static str), MigrateError> { + let Some((version, description)) = step_id.split_once('_') else { + return Err(MigrateError::InvalidStepId { step_id }); + }; + + let version = version + .parse::() + .ok() + .filter(|version| *version > 0) + .ok_or(MigrateError::InvalidStepId { step_id })?; + + if description.is_empty() { + return Err(MigrateError::InvalidStepId { step_id }); + } + + Ok((version, description)) +} + +fn cloudsync_error(err: impl std::error::Error + Send + Sync + 'static) -> SqlxMigrateError { + SqlxMigrateError::Execute(sqlx::Error::config(err)) +} + +impl Migrate for DbMigrateConnection<'_> { + fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), SqlxMigrateError>> { + ::ensure_migrations_table(&mut *self.conn) + } + + fn dirty_version(&mut self) -> BoxFuture<'_, Result, SqlxMigrateError>> { + ::dirty_version(&mut *self.conn) + } + + fn list_applied_migrations( + &mut self, + ) -> BoxFuture<'_, Result, SqlxMigrateError>> { + ::list_applied_migrations(&mut *self.conn) + } + + fn lock(&mut self) -> BoxFuture<'_, Result<(), SqlxMigrateError>> { + ::lock(&mut *self.conn) + } + + fn unlock(&mut self) -> BoxFuture<'_, Result<(), SqlxMigrateError>> { + ::unlock(&mut *self.conn) + } + + fn apply<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let scope = self + .scopes_by_version + .get(&migration.version) + .copied() + .unwrap_or(MigrationScope::Plain); + + match scope { + MigrationScope::Plain => { + ::apply(&mut *self.conn, migration).await + } + MigrationScope::CloudsyncAlter { table_name } => { + if !self.db.cloudsync_enabled() { + return ::apply(&mut *self.conn, migration) + .await; + } + + let start = Instant::now(); + + hypr_db_core2::cloudsync_begin_alter_on(&mut *self.conn, table_name) + .await + .map_err(cloudsync_error)?; + + execute_migration(&mut *self.conn, migration).await?; + + hypr_db_core2::cloudsync_commit_alter_on(&mut *self.conn, table_name) + .await + .map_err(cloudsync_error)?; + + let elapsed = start.elapsed(); + update_execution_time(&mut *self.conn, migration.version, elapsed).await?; + + Ok(elapsed) + } + } + }) + } + + fn revert<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + ::revert(&mut *self.conn, migration) + } +} + +async fn execute_migration( + conn: &mut SqliteConnection, + migration: &Migration, +) -> Result<(), SqlxMigrateError> { + conn.execute(&*migration.sql) + .await + .map_err(|err| SqlxMigrateError::ExecuteMigration(err, migration.version))?; + + sqlx::query( + r#" +INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) +VALUES ( ?1, ?2, TRUE, ?3, -1 ) + "#, + ) + .bind(migration.version) + .bind(&*migration.description) + .bind(&*migration.checksum) + .execute(&mut *conn) + .await?; + + Ok(()) +} + +async fn update_execution_time( + conn: &mut SqliteConnection, + version: i64, + elapsed: Duration, +) -> Result<(), SqlxMigrateError> { + #[allow(clippy::cast_possible_truncation)] + sqlx::query( + r#" +UPDATE _sqlx_migrations +SET execution_time = ?1 +WHERE version = ?2 + "#, + ) + .bind(elapsed.as_nanos() as i64) + .bind(version) + .execute(&mut *conn) + .await?; + + Ok(()) +} diff --git a/crates/db-migrate/src/schema.rs b/crates/db-migrate/src/schema.rs new file mode 100644 index 0000000000..061a1abda7 --- /dev/null +++ b/crates/db-migrate/src/schema.rs @@ -0,0 +1,18 @@ +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum MigrationScope { + Plain, + CloudsyncAlter { table_name: &'static str }, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct MigrationStep { + pub id: &'static str, + pub scope: MigrationScope, + pub sql: &'static str, +} + +#[derive(Clone, Copy)] +pub struct DbSchema { + pub steps: &'static [MigrationStep], + pub validate_cloudsync_table: fn(&str) -> bool, +} diff --git a/crates/db-migrate/tests/e2e.rs b/crates/db-migrate/tests/e2e.rs new file mode 100644 index 0000000000..e18a210d12 --- /dev/null +++ b/crates/db-migrate/tests/e2e.rs @@ -0,0 +1,289 @@ +use db_migrate::{DbSchema, MigrateError, MigrationScope, MigrationStep, migrate}; +use hypr_db_core2::Db3; + +const CREATE_WIDGETS_SQL: &str = r#" +CREATE TABLE widgets ( + id INTEGER PRIMARY KEY NOT NULL, + name TEXT NOT NULL DEFAULT '' +) +"#; + +const CREATE_CLOUDSYNC_WIDGETS_SQL: &str = r#" +CREATE TABLE widgets ( + id TEXT PRIMARY KEY NOT NULL, + name TEXT NOT NULL DEFAULT '' +) +"#; + +const CREATE_WIDGETS_WITH_STATUS_SQL: &str = r#" +CREATE TABLE widgets ( + id INTEGER PRIMARY KEY NOT NULL, + name TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'draft' +) +"#; + +const SEED_WIDGETS_SQL: &str = r#" +INSERT INTO widgets (id, name) VALUES (1, 'alpha') +"#; + +const ADD_SLUG_SQL: &str = r#" +ALTER TABLE widgets ADD COLUMN slug TEXT NOT NULL DEFAULT '' +"#; + +const PLAIN_SCHEMA_STEPS: &[MigrationStep] = &[ + MigrationStep { + id: "20260415010101_create_widgets", + scope: MigrationScope::Plain, + sql: CREATE_WIDGETS_SQL, + }, + MigrationStep { + id: "20260415010102_seed_widgets", + scope: MigrationScope::Plain, + sql: SEED_WIDGETS_SQL, + }, +]; + +const MODIFIED_PLAIN_SCHEMA_STEPS: &[MigrationStep] = &[MigrationStep { + id: "20260415010101_create_widgets", + scope: MigrationScope::Plain, + sql: CREATE_WIDGETS_WITH_STATUS_SQL, +}]; + +const MISSING_VERSION_SCHEMA_STEPS: &[MigrationStep] = &[MigrationStep { + id: "20260415010102_seed_widgets", + scope: MigrationScope::Plain, + sql: SEED_WIDGETS_SQL, +}]; + +const DUPLICATE_VERSION_SCHEMA_STEPS: &[MigrationStep] = &[ + MigrationStep { + id: "20260415020101_create_widgets", + scope: MigrationScope::Plain, + sql: CREATE_WIDGETS_SQL, + }, + MigrationStep { + id: "20260415020101_seed_widgets", + scope: MigrationScope::Plain, + sql: SEED_WIDGETS_SQL, + }, +]; + +const INVALID_STEP_ID_SCHEMA_STEPS: &[MigrationStep] = &[MigrationStep { + id: "invalid_step_id", + scope: MigrationScope::Plain, + sql: CREATE_WIDGETS_SQL, +}]; + +const CLOUDSYNC_BASE_STEPS: &[MigrationStep] = &[MigrationStep { + id: "20260415030101_create_widgets", + scope: MigrationScope::Plain, + sql: CREATE_CLOUDSYNC_WIDGETS_SQL, +}]; + +const CLOUDSYNC_ALTER_STEPS: &[MigrationStep] = &[ + MigrationStep { + id: "20260415030101_create_widgets", + scope: MigrationScope::Plain, + sql: CREATE_CLOUDSYNC_WIDGETS_SQL, + }, + MigrationStep { + id: "20260415030102_add_slug", + scope: MigrationScope::CloudsyncAlter { + table_name: "widgets", + }, + sql: ADD_SLUG_SQL, + }, +]; + +fn schema(steps: &'static [MigrationStep], validate_cloudsync_table: fn(&str) -> bool) -> DbSchema { + DbSchema { + steps, + validate_cloudsync_table, + } +} + +fn never_synced(_: &str) -> bool { + false +} + +fn widgets_synced(table_name: &str) -> bool { + table_name == "widgets" +} + +async fn open_plain_db() -> Db3 { + Db3::connect_memory_plain().await.unwrap() +} + +async fn applied_versions(db: &Db3) -> Vec { + sqlx::query_scalar("SELECT version FROM _sqlx_migrations ORDER BY version") + .fetch_all(db.pool().as_ref()) + .await + .unwrap() +} + +async fn widget_names(db: &Db3) -> Vec { + sqlx::query_scalar("SELECT name FROM widgets ORDER BY id") + .fetch_all(db.pool().as_ref()) + .await + .unwrap() +} + +async fn widget_columns(db: &Db3) -> Vec { + sqlx::query_scalar("SELECT name FROM pragma_table_info('widgets') ORDER BY cid") + .fetch_all(db.pool().as_ref()) + .await + .unwrap() +} + +#[tokio::test] +async fn plain_migrations_apply_and_remain_idempotent() { + let db = open_plain_db().await; + + migrate(&db, schema(PLAIN_SCHEMA_STEPS, never_synced)) + .await + .unwrap(); + + assert_eq!( + applied_versions(&db).await, + vec![20260415010101, 20260415010102] + ); + assert_eq!(widget_names(&db).await, vec!["alpha".to_string()]); + + migrate(&db, schema(PLAIN_SCHEMA_STEPS, never_synced)) + .await + .unwrap(); + + assert_eq!( + applied_versions(&db).await, + vec![20260415010101, 20260415010102] + ); + assert_eq!(widget_names(&db).await, vec!["alpha".to_string()]); +} + +#[tokio::test] +async fn changed_checksum_is_rejected() { + let db = open_plain_db().await; + + migrate(&db, schema(&PLAIN_SCHEMA_STEPS[..1], never_synced)) + .await + .unwrap(); + + let err = migrate(&db, schema(MODIFIED_PLAIN_SCHEMA_STEPS, never_synced)) + .await + .unwrap_err(); + + assert!(matches!( + err, + MigrateError::SqlxMigrate(sqlx::migrate::MigrateError::VersionMismatch(20260415010101)) + )); +} + +#[tokio::test] +async fn missing_applied_version_is_rejected() { + let db = open_plain_db().await; + + migrate(&db, schema(PLAIN_SCHEMA_STEPS, never_synced)) + .await + .unwrap(); + + let err = migrate(&db, schema(MISSING_VERSION_SCHEMA_STEPS, never_synced)) + .await + .unwrap_err(); + + assert!(matches!( + err, + MigrateError::SqlxMigrate(sqlx::migrate::MigrateError::VersionMissing(20260415010101)) + )); +} + +#[tokio::test] +async fn invalid_manifest_metadata_is_rejected() { + let db = open_plain_db().await; + + let invalid_id = migrate(&db, schema(INVALID_STEP_ID_SCHEMA_STEPS, never_synced)) + .await + .unwrap_err(); + assert!(matches!( + invalid_id, + MigrateError::InvalidStepId { + step_id: "invalid_step_id" + } + )); + + let duplicate_version = migrate(&db, schema(DUPLICATE_VERSION_SCHEMA_STEPS, never_synced)) + .await + .unwrap_err(); + assert!(matches!( + duplicate_version, + MigrateError::DuplicateStepVersion { + version: 20260415020101, + .. + } + )); +} + +#[tokio::test] +async fn cloudsync_alter_scope_falls_back_to_plain_when_cloudsync_is_disabled() { + let db = open_plain_db().await; + + migrate(&db, schema(CLOUDSYNC_BASE_STEPS, widgets_synced)) + .await + .unwrap(); + migrate(&db, schema(CLOUDSYNC_ALTER_STEPS, widgets_synced)) + .await + .unwrap(); + + assert_eq!( + applied_versions(&db).await, + vec![20260415030101, 20260415030102] + ); + assert_eq!( + widget_columns(&db).await, + vec!["id".to_string(), "name".to_string(), "slug".to_string()] + ); +} + +#[cfg(any( + all(test, target_os = "macos", target_arch = "aarch64"), + all(test, target_os = "macos", target_arch = "x86_64"), + all(test, target_os = "linux", target_env = "gnu", target_arch = "aarch64"), + all(test, target_os = "linux", target_env = "gnu", target_arch = "x86_64"), + all( + test, + target_os = "linux", + target_env = "musl", + target_arch = "aarch64" + ), + all(test, target_os = "linux", target_env = "musl", target_arch = "x86_64"), + all(test, target_os = "windows", target_arch = "x86_64"), +))] +#[tokio::test] +async fn cloudsync_alter_scope_runs_successfully_on_a_cloudsync_table() { + let db = Db3::connect_memory().await.unwrap(); + + migrate(&db, schema(CLOUDSYNC_BASE_STEPS, widgets_synced)) + .await + .unwrap(); + + db.cloudsync_init("widgets", None, None).await.unwrap(); + + let enabled: bool = sqlx::query_scalar("SELECT cloudsync_is_enabled('widgets')") + .fetch_one(db.pool().as_ref()) + .await + .unwrap(); + assert!(enabled); + + migrate(&db, schema(CLOUDSYNC_ALTER_STEPS, widgets_synced)) + .await + .unwrap(); + + assert_eq!( + applied_versions(&db).await, + vec![20260415030101, 20260415030102] + ); + assert_eq!( + widget_columns(&db).await, + vec!["id".to_string(), "name".to_string(), "slug".to_string()] + ); +} diff --git a/crates/mobile-bridge/Cargo.toml b/crates/mobile-bridge/Cargo.toml index 5ea465ac02..b24e110f54 100644 --- a/crates/mobile-bridge/Cargo.toml +++ b/crates/mobile-bridge/Cargo.toml @@ -11,6 +11,8 @@ name = "mobile_bridge" hypr-db-app = { workspace = true } hypr-db-core2 = { workspace = true } hypr-db-live-query = { workspace = true } +hypr-db-migrate = { workspace = true } + serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] } diff --git a/crates/mobile-bridge/src/db.rs b/crates/mobile-bridge/src/db.rs index 6be8579b70..57d11be224 100644 --- a/crates/mobile-bridge/src/db.rs +++ b/crates/mobile-bridge/src/db.rs @@ -1,21 +1,23 @@ use std::path::PathBuf; -use hypr_db_core2::{CloudsyncOpenMode, Db3, DbOpenOptions, DbStorage, MigrationFailurePolicy}; +use hypr_db_core2::{Db3, DbOpenOptions, DbStorage}; + +use crate::error::OpenAppDbError; pub(crate) async fn open_app_db( db_path: &PathBuf, - cloudsync_open_mode: CloudsyncOpenMode, -) -> Result { - Db3::open_with_migrate( - DbOpenOptions { - storage: DbStorage::Local(db_path), - cloudsync_open_mode, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |pool| Box::pin(hypr_db_app::migrate(pool)), - ) - .await + cloudsync_enabled: bool, +) -> Result { + let db = Db3::open(DbOpenOptions { + storage: DbStorage::Local(db_path), + cloudsync_enabled, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) + .await?; + + hypr_db_migrate::migrate(&db, hypr_db_app::schema()).await?; + + Ok(db) } diff --git a/crates/mobile-bridge/src/error.rs b/crates/mobile-bridge/src/error.rs index 0d4d0e82f7..e7b73e795e 100644 --- a/crates/mobile-bridge/src/error.rs +++ b/crates/mobile-bridge/src/error.rs @@ -56,3 +56,11 @@ pub(crate) fn serialization_error(error: serde_json::Error) -> BridgeError { reason: error.to_string(), } } + +#[derive(Debug, thiserror::Error)] +pub(crate) enum OpenAppDbError { + #[error(transparent)] + Open(#[from] hypr_db_core2::DbOpenError), + #[error(transparent)] + Migrate(#[from] hypr_db_migrate::MigrateError), +} diff --git a/crates/mobile-bridge/src/lib.rs b/crates/mobile-bridge/src/lib.rs index 47b1e7cf5a..6a894f187a 100644 --- a/crates/mobile-bridge/src/lib.rs +++ b/crates/mobile-bridge/src/lib.rs @@ -38,12 +38,9 @@ impl MobileDbBridge { reason: error.to_string(), })?; let path = std::path::PathBuf::from(db_path); - let cloudsync_open_mode = match cloudsync_open_mode.as_deref() { - Some("enabled") => hypr_db_core2::CloudsyncOpenMode::Enabled, - _ => hypr_db_core2::CloudsyncOpenMode::Disabled, - }; + let cloudsync_enabled = cloudsync_open_mode.as_deref() == Some("enabled"); let db = runtime - .block_on(db::open_app_db(&path, cloudsync_open_mode)) + .block_on(db::open_app_db(&path, cloudsync_enabled)) .map_err(|error| BridgeError::OpenFailed { reason: error.to_string(), })?; diff --git a/plugins/db/Cargo.toml b/plugins/db/Cargo.toml index a65f3e229d..6f8d2e6ffc 100644 --- a/plugins/db/Cargo.toml +++ b/plugins/db/Cargo.toml @@ -11,6 +11,7 @@ description = "" hypr-db-app = { workspace = true } hypr-db-core2 = { workspace = true } hypr-db-live-query = { workspace = true } +hypr-db-migrate = { workspace = true } hypr-storage = { workspace = true } hypr-tauri-utils = { workspace = true } diff --git a/plugins/db/src/error.rs b/plugins/db/src/error.rs index 0b7ef01779..014258286e 100644 --- a/plugins/db/src/error.rs +++ b/plugins/db/src/error.rs @@ -7,6 +7,8 @@ pub enum Error { #[error(transparent)] Db(#[from] hypr_db_core2::DbOpenError), #[error(transparent)] + Migrate(#[from] hypr_db_migrate::MigrateError), + #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] Sqlx(#[from] sqlx::Error), diff --git a/plugins/db/src/lib.rs b/plugins/db/src/lib.rs index 2ff3d25193..075e4d2c99 100644 --- a/plugins/db/src/lib.rs +++ b/plugins/db/src/lib.rs @@ -118,19 +118,18 @@ mod test { async fn setup_runtime() -> (tempfile::TempDir, Arc) { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("app.db"); - let db = hypr_db_core2::Db3::open_with_migrate( - hypr_db_core2::DbOpenOptions { - storage: hypr_db_core2::DbStorage::Local(&db_path), - cloudsync_open_mode: hypr_db_core2::CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: hypr_db_core2::MigrationFailurePolicy::Fail, - }, - |pool| Box::pin(hypr_db_app::migrate(pool)), - ) + let db = hypr_db_core2::Db3::open(hypr_db_core2::DbOpenOptions { + storage: hypr_db_core2::DbStorage::Local(&db_path), + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) .await .unwrap(); + hypr_db_migrate::migrate(&db, hypr_db_app::schema()) + .await + .unwrap(); (dir, Arc::new(runtime::PluginDbRuntime::new(Arc::new(db)))) } diff --git a/plugins/db/src/runtime.rs b/plugins/db/src/runtime.rs index 3737ba6603..7af60fea1b 100644 --- a/plugins/db/src/runtime.rs +++ b/plugins/db/src/runtime.rs @@ -1,6 +1,6 @@ use std::path::Path; -use hypr_db_core2::{CloudsyncOpenMode, Db3, DbOpenOptions, DbStorage, MigrationFailurePolicy}; +use hypr_db_core2::{Db3, DbOpenOptions, DbStorage}; use hypr_db_live_query::QueryEventSink; use tauri::ipc::Channel; @@ -37,18 +37,16 @@ pub async fn open_app_db(db_path: Option<&Path>) -> Result { None => DbStorage::Memory, }; - let db = Db3::open_with_migrate( - DbOpenOptions { - storage, - cloudsync_open_mode: CloudsyncOpenMode::Disabled, - journal_mode_wal: true, - foreign_keys: true, - max_connections: Some(4), - migration_failure_policy: MigrationFailurePolicy::Fail, - }, - |pool| Box::pin(hypr_db_app::migrate(pool)), - ) + let db = Db3::open(DbOpenOptions { + storage, + cloudsync_enabled: false, + journal_mode_wal: true, + foreign_keys: true, + max_connections: Some(4), + }) .await?; + hypr_db_migrate::migrate(&db, hypr_db_app::schema()).await?; + Ok(db) }