fastify sqs-consumer

Description

This plugin for Fastify 5.x (for Fastify 4.x, use version 1.x) lets you consume messages from AWS SQS queues. On Fastify shutdown, a simple wait function is called so that messages still being processed can finish.

Warning
To use this plugin, you must have correctly configured your AWS credentials.
This plugin uses @fgiova/mini-sqs-client and @fgiova/sqs-consumer to interact with SQS service.

Install

npm i @fgiova/fastify-sqs-consumer

Usage

ESM

import fastify from "fastify";
import sqsConsumer from "@fgiova/fastify-sqs-consumer";

const app = fastify();

app.register(sqsConsumer, [
    {
        arn: "arn:aws:sqs:eu-central-1:000000000000:MyQueue",
        waitTimeSeconds: 20,
        timeout: 10_000,
        batchSize: 10,
        handlerFunction: async (message, fastify) => {
            return true;
        }
    }
]);

CommonJS

const fastify = require("fastify")();

fastify.register(require("@fgiova/fastify-sqs-consumer").default, [
    {
        arn: "arn:aws:sqs:eu-central-1:000000000000:MyQueue",
        waitTimeSeconds: 20,
        timeout: 10_000,
        batchSize: 10,
        handlerFunction: async (message, fastify) => {
            return true;
        }
    }
]);

Options

Options are an array of objects with the following properties (one for each queue):

| Option | Type | Description |
|---|---|---|
| arn* | string | The ARN of the Amazon SQS queue from which messages are received. |
| handlerFunction* | function | The function that is called for each message. |
| name | string | An optional name for the consumer (useful for adding hooks after configuration). If not provided, a UUID is assigned. |
| waitTimeSeconds | number | The duration in seconds for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than waitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. Default: 20. |
| timeout | number | The duration in milliseconds after which handling of a message is considered failed. Default: 90000. |
| batchSize | number | The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1. |
| attributeNames | string[] | Array of message system attributes to retrieve for each message. |
| messageAttributeNames | string[] | Array of custom message attributes to retrieve for each message. |
| parallelExecution | boolean | Enables parallel execution of message handlers. |
| credentials | {accessKeyId, secretAccessKey} | Explicit AWS credentials (an alternative to environment configuration). |
| events | object | Event functions for the consumer (see the Events section). |
| sqs | MiniSQSClient | An initialized @fgiova/mini-sqs-client instance (useful for testing). |

* required
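
The constraints in the table can be checked before the options reach the plugin. The following is a minimal sketch, not part of the plugin: `validateConsumerOptions` is a hypothetical helper, and the ARN check only confirms that the third colon-separated segment is `sqs`.

```javascript
// Hypothetical helper, not part of the plugin: checks a consumer options
// array against the constraints listed in the Options table.
function validateConsumerOptions(consumers) {
    if (!Array.isArray(consumers)) {
        throw new TypeError("options must be an array, one entry per queue");
    }
    const errors = [];
    consumers.forEach((options, index) => {
        // arn and handlerFunction are the only required properties.
        if (typeof options.arn !== "string" || options.arn.split(":")[2] !== "sqs") {
            errors.push(`[${index}] arn must be an SQS ARN string`);
        }
        if (typeof options.handlerFunction !== "function") {
            errors.push(`[${index}] handlerFunction must be a function`);
        }
        // batchSize is optional; when present it must be an integer from 1 to 10.
        const { batchSize } = options;
        if (
            batchSize !== undefined &&
            !(Number.isInteger(batchSize) && batchSize >= 1 && batchSize <= 10)
        ) {
            errors.push(`[${index}] batchSize must be an integer between 1 and 10`);
        }
    });
    return errors;
}

// The second entry lacks handlerFunction and has an out-of-range batchSize.
const problems = validateConsumerOptions([
    {
        arn: "arn:aws:sqs:eu-central-1:000000000000:MyQueue",
        batchSize: 10,
        handlerFunction: async () => true,
    },
    { arn: "arn:aws:sqs:eu-central-1:000000000000:OtherQueue", batchSize: 25 },
]);
console.log(problems);
```

Returning a list of problems instead of throwing on the first one makes it possible to report every misconfigured queue at once.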

Decorator

The plugin decorates the Fastify instance with sqsConsumers, a Record<string, { consumer: SQSConsumer, meta: { pendingMessages: number } }> that provides access to the registered consumers and their metadata.
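
As an illustration of reading this decorator, the sketch below sums `pendingMessages` across consumers and polls until nothing is pending. Both helpers are hypothetical and rely only on the record shape described above; `exampleConsumers` is a stand-in object, and the plugin's own shutdown wait may work differently.

```javascript
// Sums pendingMessages across all consumers in a sqsConsumers-shaped record.
function totalPendingMessages(sqsConsumers) {
    return Object.values(sqsConsumers).reduce(
        (total, entry) => total + entry.meta.pendingMessages,
        0,
    );
}

// Hypothetical helper: resolves true once nothing is pending, or false if
// maxWaitMs elapses first. The plugin's own shutdown wait may differ.
async function waitForIdle(sqsConsumers, { intervalMs = 50, maxWaitMs = 5000 } = {}) {
    const deadline = Date.now() + maxWaitMs;
    while (totalPendingMessages(sqsConsumers) > 0) {
        if (Date.now() >= deadline) return false;
        await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
    return true;
}

// Stand-in for the decorated record, used here only for illustration.
const exampleConsumers = {
    orders: { consumer: {}, meta: { pendingMessages: 2 } },
    emails: { consumer: {}, meta: { pendingMessages: 1 } },
};
console.log(totalPendingMessages(exampleConsumers)); // prints 3
```

In an application, the record passed to these helpers would be `app.sqsConsumers` after the plugin has been registered.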

handlerFunction

The handler function is an async function called for each message received from the queue. Any error thrown by the function is caught, and the message is not deleted from the queue. If the function completes without throwing, the message is deleted from the queue.
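
A handler that relies on this throw-to-keep behaviour might look like the sketch below. It assumes the body arrives as a JSON string on `message.Body` (the standard SQS field name); the `orderId` payload field and the `handleOrderMessage` name are hypothetical.

```javascript
// Sketch of a handlerFunction. Assumes the message body is a JSON string on
// message.Body; the payload shape (orderId) is hypothetical.
async function handleOrderMessage(message, fastify) {
    let payload;
    try {
        payload = JSON.parse(message.Body);
    } catch (cause) {
        // Throwing means the message is not deleted from the queue.
        throw new Error("Message body is not valid JSON", { cause });
    }
    if (payload === null || typeof payload.orderId !== "string") {
        throw new Error("Message payload has no orderId");
    }
    // fastify is the second handler argument, as in the Usage examples.
    fastify.log.info({ orderId: payload.orderId }, "order processed");
    // Completing without throwing lets the message be deleted.
    return true;
}
```

The function can be passed as `handlerFunction` in the options for a queue.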

Events

| Event | Type | Description |
|---|---|---|
| onPoll | (messages: Message[]) => Promise<Message[]> | Called when the consumer polls for messages. |
| onMessage | (message: Message) => Promise | Called when the consumer receives a message. |
| onHandle | (message: Message) => Promise | Called when the consumer handles a message. |
| onHandlerSuccess | (message: Message) => Promise | Called when the consumer handles a message successfully. |
| onHandlerTimeout | (message: Message) => Promise | Called when the handler execution exceeds the configured timeout. |
| onHandlerError | (message: Message, error: Error) => Promise | Called when the handler execution throws an error. |
| onSuccess | (message: Message) => Promise | Called when the handler execution finishes successfully. |
| onError | (hook: HookName, message: Message, error: Error) => Promise | Called when the handler execution throws an uncaught error. |
| onSQSError | (error: Error, message?: Message) => Promise | Called when the consumer receives an error from the SQS service. |
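
One possible `events` object is sketched below: it keeps simple in-memory counters. The argument order follows the table above. Two things are assumptions: the message id is read from `message.MessageId` (the standard SQS field name), and each message hook returns the message it received, because the table does not state what the promises should resolve to.

```javascript
// In-memory counters, for illustration only.
const stats = { handled: 0, failed: 0, timedOut: 0, sqsErrors: 0 };

// Sketch of an events object; pass it as the `events` option of a consumer.
const events = {
    onHandlerSuccess: async (message) => {
        stats.handled += 1;
        return message; // assumption: returning the message is harmless
    },
    onHandlerTimeout: async (message) => {
        stats.timedOut += 1;
        return message;
    },
    onHandlerError: async (message, error) => {
        stats.failed += 1;
        // Assumes the id is exposed as message.MessageId.
        console.error(`handler failed for ${message.MessageId}: ${error.message}`);
        return message;
    },
    // message is optional here, matching the signature in the table.
    onSQSError: async (error, message) => {
        stats.sqsErrors += 1;
    },
};
```

Only the hooks of interest need to be defined, since every entry in the table is optional.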

License

Licensed under MIT.
