diff --git a/docs/docs/multi-tab-worker.md b/docs/docs/multi-tab-worker.md index f64a9b9ea..ac0ff14e8 100644 --- a/docs/docs/multi-tab-worker.md +++ b/docs/docs/multi-tab-worker.md @@ -53,6 +53,8 @@ In addition to having all the standard methods of the [`PGlite` interface](./api - `id: string`
This is an optional `id` to group your PGlite workers. The leader election is run between all `PGliteWorker`s with the same `id`.
If not provided, the url to the worker is concatenated with the `dataDir` option to create an id. +- `singleTab: boolean`
+ Set to `true` if using `PGliteWorker` in an environment without `navigator.locks` support, such as a Capacitor app. These webviews have no tabs so there is no need to run a leader election. Defaults to `false`. - `meta: any`
Any additional metadata you would like to pass to the worker process `init` function. diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts index 93a552173..80142e0aa 100644 --- a/packages/pglite/src/worker/index.ts +++ b/packages/pglite/src/worker/index.ts @@ -17,6 +17,7 @@ export type PGliteWorkerOptions = PGliteOptions & { meta?: any id?: string + singleTab?: boolean } export class PGliteWorker @@ -45,6 +46,8 @@ export class PGliteWorker #tabChannel?: BroadcastChannel #releaseTabCloseLock?: () => void + #singleTab: boolean + #notifyListeners = new Map void>>() #globalNotifyListeners = new Set<(channel: string, payload: string) => void>() @@ -56,6 +59,7 @@ export class PGliteWorker this.#workerProcess = worker this.#tabId = uuid() this.#extensions = options?.extensions ?? {} + this.#singleTab = options?.singleTab ?? false this.#workerHerePromise = new Promise((resolve) => { this.#workerProcess.addEventListener( @@ -148,46 +152,61 @@ export class PGliteWorker // Wait for the worker let us know it's ready await this.#workerReadyPromise - // Acquire the tab close lock, this is released then the tab, or this - // PGliteWorker instance, is closed - const tabCloseLockId = `pglite-tab-close:${this.#tabId}` - this.#releaseTabCloseLock = await acquireLock(tabCloseLockId) - - // Start the broadcast channel used to communicate with tabs and leader election - const broadcastChannelId = `pglite-broadcast:${this.#workerID}` - this.#broadcastChannel = new BroadcastChannel(broadcastChannelId) - - // Start the tab channel used to communicate with the leader directly - const tabChannelId = `pglite-tab:${this.#tabId}` - this.#tabChannel = new BroadcastChannel(tabChannelId) - - this.#broadcastChannel.addEventListener('message', async (event) => { - if (event.data.type === 'leader-here') { - this.#connected = false - this.#eventTarget.dispatchEvent(new Event('leader-change')) - this.#leaderNotifyLoop() - } else if (event.data.type === 'notify') { - this.#receiveNotification(event.data.channel, event.data.payload) - } - }) + if (this.#singleTab) { + // In single-tab mode, skip multi-tab coordination + this.#connected = true + this.#isLeader = true + this.#debug = await this.#rpc('getDebugLevel') + this.#ready = true + + // Listen for notifications directly from the worker + this.#workerProcess.addEventListener('message', async (event) => { + if (event.data.type === 'notify') { + this.#receiveNotification(event.data.channel, event.data.payload) + } + }) + } else { + // Acquire the tab close lock, this is released then the tab, or this + // PGliteWorker instance, is closed + const tabCloseLockId = `pglite-tab-close:${this.#tabId}` + this.#releaseTabCloseLock = await acquireLock(tabCloseLockId) + + // Start the broadcast channel used to communicate with tabs and leader election + const broadcastChannelId = `pglite-broadcast:${this.#workerID}` + this.#broadcastChannel = new BroadcastChannel(broadcastChannelId) + + // Start the tab channel used to communicate with the leader directly + const tabChannelId = `pglite-tab:${this.#tabId}` + this.#tabChannel = new BroadcastChannel(tabChannelId) + + this.#broadcastChannel.addEventListener('message', async (event) => { + if (event.data.type === 'leader-here') { + this.#connected = false + this.#eventTarget.dispatchEvent(new Event('leader-change')) + this.#leaderNotifyLoop() + } else if (event.data.type === 'notify') { + this.#receiveNotification(event.data.channel, event.data.payload) + } + }) - this.#tabChannel.addEventListener('message', async (event) => { - if (event.data.type === 'connected') { - this.#connected = true - this.#eventTarget.dispatchEvent(new Event('connected')) - this.#debug = await this.#rpc('getDebugLevel') - this.#ready = true - } - }) + this.#tabChannel.addEventListener('message', async (event) => { + if (event.data.type === 'connected') { + this.#connected = true + this.#eventTarget.dispatchEvent(new Event('connected')) + this.#debug = await this.#rpc('getDebugLevel') + this.#ready = true + } + }) - this.#workerProcess.addEventListener('message', async (event) => { - if (event.data.type === 'leader-now') { - this.#isLeader = true - this.#eventTarget.dispatchEvent(new Event('leader-change')) - } - }) + this.#workerProcess.addEventListener('message', async (event) => { + if (event.data.type === 'leader-now') { + this.#isLeader = true + this.#eventTarget.dispatchEvent(new Event('leader-change')) + } + }) - this.#leaderNotifyLoop() + this.#leaderNotifyLoop() + } // Init array types // We don't await this as it will result in a deadlock @@ -209,6 +228,37 @@ export class PGliteWorker method: Method, ...args: Parameters ): Promise> { + if (this.#singleTab) { + // In single-tab mode, communicate directly with the worker + const callId = uuid() + const message: WorkerRpcCall = { + type: 'rpc-call', + callId, + method, + args, + } + this.#workerProcess.postMessage(message) + return await new Promise>( + (resolve, reject) => { + const listener = (event: MessageEvent) => { + if (event.data.callId !== callId) return + this.#workerProcess.removeEventListener('message', listener) + const message: WorkerRpcResponse = event.data + if (message.type === 'rpc-return') { + resolve(message.result) + } else if (message.type === 'rpc-error') { + const error = new Error(message.error.message) + Object.assign(error, message.error) + reject(error) + } else { + reject(new Error('Invalid message')) + } + } + this.#workerProcess.addEventListener('message', listener) + }, + ) + } + const callId = uuid() const message: WorkerRpcCall = { type: 'rpc-call', @@ -522,6 +572,50 @@ export async function worker({ init }: WorkerOptions) { // Let the main thread know we are ready postMessage({ type: 'ready', id }) + const singleTab = options.singleTab ?? false + + if (singleTab) { + // Single-tab mode: skip multi-tab coordination and start directly + const db = await init(options) + + // Listen for notifications and send them to the main thread + db.onNotification((channel, payload) => { + postMessage({ type: 'notify', channel, payload }) + }) + + const api = makeWorkerApi('single-tab', db) + + // Handle RPC calls directly from the main thread + addEventListener('message', async (event) => { + const msg = event.data + if (msg.type === 'rpc-call') { + await db.waitReady + const { callId, method, args } = msg as WorkerRpcCall + try { + // @ts-ignore no apparent reason why it fails + const result = (await api[method](...args)) as WorkerRpcResult< + typeof method + >['result'] + postMessage({ + type: 'rpc-return', + callId, + result, + } satisfies WorkerRpcResult) + } catch (error) { + console.error(error) + postMessage({ + type: 'rpc-error', + callId, + error: { message: (error as Error).message }, + } satisfies WorkerRpcError) + } + } + }) + + return + } + + // Multi-tab mode: use locks and broadcast channels for coordination const electionLockId = `pglite-election-lock:${id}` const broadcastChannelId = `pglite-broadcast:${id}` const broadcastChannel = new BroadcastChannel(broadcastChannelId) @@ -620,15 +714,18 @@ function makeWorkerApi(tabId: string, db: PGlite) { // If the tab is closed and it is holding a lock, release the the locks // and rollback any pending transactions - const tabCloseLockId = `pglite-tab-close:${tabId}` - acquireLock(tabCloseLockId).then(() => { - if (transactionLockRelease) { - // rollback any pending transactions - db.exec('ROLLBACK') - } - queryLockRelease?.() - transactionLockRelease?.() - }) + // Skip this in single-tab mode (tabId === 'single-tab') + if (tabId !== 'single-tab') { + const tabCloseLockId = `pglite-tab-close:${tabId}` + acquireLock(tabCloseLockId).then(() => { + if (transactionLockRelease) { + // rollback any pending transactions + db.exec('ROLLBACK') + } + queryLockRelease?.() + transactionLockRelease?.() + }) + } return { async getDebugLevel() {