Move notification delivery to a queue/worker#3659
Move notification delivery to a queue/worker#3659CarsonF wants to merge 5 commits intonotifications/emailfrom
Conversation
d9168d2 to
7f4007f
Compare
Otherwise, users could try to load these pending DB resources before the tx makes them live.
e5f7188 to
a890a15
Compare
There was a problem hiding this comment.
Pull request overview
This PR moves notification delivery (broadcast + email) out of the request path and into a BullMQ queue/worker, with supporting changes to resource loading so queue workers don’t require a GraphQL context.
Changes:
- Enqueue notification delivery after DB commit via
NotificationDeliveryQueue. - Add
NotificationDeliveryWorkerto handle preference-based channel routing and actual delivery. - Adjust
ResourceLoadercontext selection to support non-GQL execution paths.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/core/resources/resource.loader.ts | Attempts to allow loaders to operate without a GQL context when an identity context exists. |
| src/components/notifications/notification.service.ts | Moves delivery from inline execution to an afterCommit queue job. |
| src/components/notifications/notification.module.ts | Registers the new queue and worker in the notifications module. |
| src/components/notifications/notification-delivery.worker.ts | New worker that resolves channels and delivers via broadcaster + email with retry progress tracking. |
| src/components/notifications/notification-delivery.queue.ts | New queue definition and job options for notification delivery retries/retention. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| const context = this.identity.currentMaybe | ||
| ? // If we have an ALS identity, the loaders are configured to use that. | ||
| // Just fake an object id, as it will be ignored. | ||
| // See: cc912734a68439fe03a992b365d6af4bbb02cff3 | ||
| // Duplicating this check here to prevent requiring a GQL Context to exist. | ||
| // If we've configured an Identity, like Identity.asUser() then that is sufficient. | ||
| // And queue workers do not have a GQL Context. | ||
| IDENTITY_CONTEXT_ID | ||
| : this.config.isCli | ||
| ? CLI_CONTEXT_ID | ||
| : this.contextHost.context; |
There was a problem hiding this comment.
identity.currentMaybe throws if no AsyncLocalStorage session context exists (see SessionHost.currentMaybe), so this conditional can itself crash in queue workers / other non-request paths. Use identity.currentIfInCtx (or guard with try/catch) when you just want to detect whether an ALS identity context is active.
| this.txHooks.afterCommit.add(async () => { | ||
| await this.deliveryQueue.add('deliver', { | ||
| typeName: this.typeClassToName.get(type)!, | ||
| notification, |
There was a problem hiding this comment.
Queueing the full notification DTO will be JSON-serialized by BullMQ, which strips class prototypes and will coerce Luxon DateTime fields (e.g. createdAt, readAt) into strings. The delivery worker then receives a different runtime shape than Notification, which can break strategies (especially renderEmail). Prefer queueing stable primitives (e.g. notification.id, typeName, recipients) and re-loading the notification in the worker (or explicitly serialize/rehydrate temporal fields).
| notification, | |
| notificationId: notification.id, |
| const createProgressManager = <T extends object>( | ||
| job: Job, | ||
| ): JobProgressor<T> => { | ||
| let current = job.progress as T; | ||
| return { | ||
| get: () => current, | ||
| set: async (getNext: (prev: T) => T) => { | ||
| const next = getNext(current); | ||
| current = next; | ||
| await job.updateProgress(next); |
There was a problem hiding this comment.
createProgressManager() initializes current from job.progress. In BullMQ the default progress is a number (typically 0), so the first progress.set(prev => ({ ...prev, ... })) will throw when spreading a non-object. Initialize current to {} when job.progress is not a plain object (or when it's null/undefined).
No description provided.