Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/multi-tab-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ In addition to having all the standard methods of the [`PGlite` interface](./api
- `id: string`<br>
This is an optional `id` to group your PGlite workers. The leader election is run between all `PGliteWorker`s with the same `id`.<br>
If not provided, the url to the worker is concatenated with the `dataDir` option to create an id.
- `singleTab: boolean`<br>
Set to `true` if using `PGliteWorker` in an environment without `navigator.locks` support, such as a Capacitor app. These webviews have no tabs so there is no need to run a leader election. Defaults to `false`.
- `meta: any`<br>
Any additional metadata you would like to pass to the worker process `init` function.

Expand Down
189 changes: 143 additions & 46 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type PGliteWorkerOptions<E extends Extensions = Extensions> =
PGliteOptions<E> & {
meta?: any
id?: string
singleTab?: boolean
}

export class PGliteWorker
Expand Down Expand Up @@ -45,6 +46,8 @@ export class PGliteWorker
#tabChannel?: BroadcastChannel
#releaseTabCloseLock?: () => void

#singleTab: boolean

#notifyListeners = new Map<string, Set<(payload: string) => void>>()
#globalNotifyListeners = new Set<(channel: string, payload: string) => void>()

Expand All @@ -56,6 +59,7 @@ export class PGliteWorker
this.#workerProcess = worker
this.#tabId = uuid()
this.#extensions = options?.extensions ?? {}
this.#singleTab = options?.singleTab ?? false

this.#workerHerePromise = new Promise<void>((resolve) => {
this.#workerProcess.addEventListener(
Expand Down Expand Up @@ -148,46 +152,61 @@ export class PGliteWorker
// Wait for the worker let us know it's ready
await this.#workerReadyPromise

// Acquire the tab close lock, this is released then the tab, or this
// PGliteWorker instance, is closed
const tabCloseLockId = `pglite-tab-close:${this.#tabId}`
this.#releaseTabCloseLock = await acquireLock(tabCloseLockId)

// Start the broadcast channel used to communicate with tabs and leader election
const broadcastChannelId = `pglite-broadcast:${this.#workerID}`
this.#broadcastChannel = new BroadcastChannel(broadcastChannelId)

// Start the tab channel used to communicate with the leader directly
const tabChannelId = `pglite-tab:${this.#tabId}`
this.#tabChannel = new BroadcastChannel(tabChannelId)

this.#broadcastChannel.addEventListener('message', async (event) => {
if (event.data.type === 'leader-here') {
this.#connected = false
this.#eventTarget.dispatchEvent(new Event('leader-change'))
this.#leaderNotifyLoop()
} else if (event.data.type === 'notify') {
this.#receiveNotification(event.data.channel, event.data.payload)
}
})
if (this.#singleTab) {
// In single-tab mode, skip multi-tab coordination
this.#connected = true
this.#isLeader = true
this.#debug = await this.#rpc('getDebugLevel')
this.#ready = true

// Listen for notifications directly from the worker
this.#workerProcess.addEventListener('message', async (event) => {
if (event.data.type === 'notify') {
this.#receiveNotification(event.data.channel, event.data.payload)
}
})
} else {
// Acquire the tab close lock, this is released then the tab, or this
// PGliteWorker instance, is closed
const tabCloseLockId = `pglite-tab-close:${this.#tabId}`
this.#releaseTabCloseLock = await acquireLock(tabCloseLockId)

// Start the broadcast channel used to communicate with tabs and leader election
const broadcastChannelId = `pglite-broadcast:${this.#workerID}`
this.#broadcastChannel = new BroadcastChannel(broadcastChannelId)

// Start the tab channel used to communicate with the leader directly
const tabChannelId = `pglite-tab:${this.#tabId}`
this.#tabChannel = new BroadcastChannel(tabChannelId)

this.#broadcastChannel.addEventListener('message', async (event) => {
if (event.data.type === 'leader-here') {
this.#connected = false
this.#eventTarget.dispatchEvent(new Event('leader-change'))
this.#leaderNotifyLoop()
} else if (event.data.type === 'notify') {
this.#receiveNotification(event.data.channel, event.data.payload)
}
})

this.#tabChannel.addEventListener('message', async (event) => {
if (event.data.type === 'connected') {
this.#connected = true
this.#eventTarget.dispatchEvent(new Event('connected'))
this.#debug = await this.#rpc('getDebugLevel')
this.#ready = true
}
})
this.#tabChannel.addEventListener('message', async (event) => {
if (event.data.type === 'connected') {
this.#connected = true
this.#eventTarget.dispatchEvent(new Event('connected'))
this.#debug = await this.#rpc('getDebugLevel')
this.#ready = true
}
})

this.#workerProcess.addEventListener('message', async (event) => {
if (event.data.type === 'leader-now') {
this.#isLeader = true
this.#eventTarget.dispatchEvent(new Event('leader-change'))
}
})
this.#workerProcess.addEventListener('message', async (event) => {
if (event.data.type === 'leader-now') {
this.#isLeader = true
this.#eventTarget.dispatchEvent(new Event('leader-change'))
}
})

this.#leaderNotifyLoop()
this.#leaderNotifyLoop()
}

// Init array types
// We don't await this as it will result in a deadlock
Expand All @@ -209,6 +228,37 @@ export class PGliteWorker
method: Method,
...args: Parameters<WorkerApi[Method]>
): Promise<ReturnType<WorkerApi[Method]>> {
if (this.#singleTab) {
// In single-tab mode, communicate directly with the worker
const callId = uuid()
const message: WorkerRpcCall<Method> = {
type: 'rpc-call',
callId,
method,
args,
}
this.#workerProcess.postMessage(message)
return await new Promise<ReturnType<WorkerApi[Method]>>(
(resolve, reject) => {
const listener = (event: MessageEvent) => {
if (event.data.callId !== callId) return
this.#workerProcess.removeEventListener('message', listener)
const message: WorkerRpcResponse<Method> = event.data
if (message.type === 'rpc-return') {
resolve(message.result)
} else if (message.type === 'rpc-error') {
const error = new Error(message.error.message)
Object.assign(error, message.error)
reject(error)
} else {
reject(new Error('Invalid message'))
}
}
this.#workerProcess.addEventListener('message', listener)
},
)
}

const callId = uuid()
const message: WorkerRpcCall<Method> = {
type: 'rpc-call',
Expand Down Expand Up @@ -522,6 +572,50 @@ export async function worker({ init }: WorkerOptions) {
// Let the main thread know we are ready
postMessage({ type: 'ready', id })

const singleTab = options.singleTab ?? false

if (singleTab) {
// Single-tab mode: skip multi-tab coordination and start directly
const db = await init(options)

// Listen for notifications and send them to the main thread
db.onNotification((channel, payload) => {
postMessage({ type: 'notify', channel, payload })
})

const api = makeWorkerApi('single-tab', db)

// Handle RPC calls directly from the main thread
addEventListener('message', async (event) => {
const msg = event.data
if (msg.type === 'rpc-call') {
await db.waitReady
const { callId, method, args } = msg as WorkerRpcCall<WorkerRpcMethod>
try {
// @ts-ignore no apparent reason why it fails
const result = (await api[method](...args)) as WorkerRpcResult<
typeof method
>['result']
postMessage({
type: 'rpc-return',
callId,
result,
} satisfies WorkerRpcResult<typeof method>)
} catch (error) {
console.error(error)
postMessage({
type: 'rpc-error',
callId,
error: { message: (error as Error).message },
} satisfies WorkerRpcError)
}
}
})

return
}

// Multi-tab mode: use locks and broadcast channels for coordination
const electionLockId = `pglite-election-lock:${id}`
const broadcastChannelId = `pglite-broadcast:${id}`
const broadcastChannel = new BroadcastChannel(broadcastChannelId)
Expand Down Expand Up @@ -620,15 +714,18 @@ function makeWorkerApi(tabId: string, db: PGlite) {

// If the tab is closed and it is holding a lock, release the the locks
// and rollback any pending transactions
const tabCloseLockId = `pglite-tab-close:${tabId}`
acquireLock(tabCloseLockId).then(() => {
if (transactionLockRelease) {
// rollback any pending transactions
db.exec('ROLLBACK')
}
queryLockRelease?.()
transactionLockRelease?.()
})
// Skip this in single-tab mode (tabId === 'single-tab')
if (tabId !== 'single-tab') {
const tabCloseLockId = `pglite-tab-close:${tabId}`
acquireLock(tabCloseLockId).then(() => {
if (transactionLockRelease) {
// rollback any pending transactions
db.exec('ROLLBACK')
}
queryLockRelease?.()
transactionLockRelease?.()
})
}

return {
async getDebugLevel() {
Expand Down