Queues are a powerful design pattern that helps you deal with common application scaling and performance challenges. Some examples of problems that queues can help you solve are:
- Smooth out processing peaks. For example, if users can initiate resource-intensive tasks at arbitrary times, you can add these tasks to a queue instead of performing them synchronously. Then you can have worker processes pull tasks from the queue in a controlled manner. You can easily add new Queue consumers to scale up the back-end task handling as the application scales up.
- Break up monolithic tasks that may otherwise block the Node.js event loop. For example, if a user request requires CPU-intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive.
- Provide a reliable communication channel across various services. For example, you can queue tasks (jobs) in one process or service, and consume them in another. You can be notified (by listening for status events) upon completion, error or other state changes in the job life cycle from any process or service. When Queue producers or consumers fail, their state is preserved and task handling can restart automatically when nodes are restarted.
Nest provides the @nestjs/bull package as an abstraction/wrapper on top of Bull, a popular, well-supported, high-performance Node.js-based queue system implementation. The package makes it easy to integrate Bull queues into your application in a Nest-friendly way.
Bull uses Redis to persist job data, so you'll need to have Redis installed on your system. Because it is Redis-backed, your Queue architecture can be completely distributed and platform-independent. For example, you can have some Queue producers and consumers and listeners running in Nest on one (or several) nodes, and other producers, consumers and listeners running on other Node.js platforms on other network nodes.
This chapter covers the @nestjs/bull package. We also recommend reading the Bull documentation for more background and specific implementation details.
To begin using it, we first install the required dependencies: the @nestjs/bull package and bull itself.
Once the installation process is complete, we can import the BullModule into the root AppModule.
The forRoot() method is used to register a bull package configuration object that will be used by all queues registered in the application (unless specified otherwise). A configuration object consists of the following properties:
- limiter: RateLimiter - Options to control the rate at which the queue's jobs are processed. See RateLimiter for more information. Optional.
- redis: RedisOpts - Options to configure the Redis connection. See RedisOpts for more information. Optional.
- prefix: string - Prefix for all queue keys. Optional.
- defaultJobOptions: JobOpts - Options to control the default settings for new jobs. See JobOpts for more information. Optional.
- settings: AdvancedSettings - Advanced Queue configuration settings. These should usually not be changed. See AdvancedSettings for more information. Optional.
All the options are optional, providing detailed control over queue behavior. These are passed directly to the Bull Queue constructor. Read more about these options here.
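As a minimal sketch of the root registration described above, the fragment below passes only the redis property to forRoot(). The host and port are assumptions (a local Redis instance on its default port), not values taken from this chapter.

```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      // Assumed connection details: a local Redis on its default port.
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}
```

Any of the other properties listed above (limiter, prefix, defaultJobOptions, settings) could be added to the same object.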
To register a queue, import the BullModule#registerQueue() dynamic module, passing it a configuration object that contains the queue's name.
Hint: Create multiple queues by passing multiple comma-separated configuration objects to the registerQueue() method.
The registerQueue() method is used to instantiate and/or register queues. Queues are shared across modules and processes that connect to the same underlying Redis database with the same credentials. Each queue is unique by its name property. A queue name is used as both an injection token (for injecting the queue into controllers/providers), and as an argument to decorators to associate consumer classes and listeners with queues.
You can also override some of the pre-configured options for a specific queue by passing them in that queue's configuration object.
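The following fragment sketches both forms of registration. The 'audio' queue name is the one used throughout this chapter. The 'video' queue, the MediaModule class, and port 6380 are hypothetical and only illustrate a per-queue override.

```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    // Registers a queue named 'audio' that uses the shared forRoot() configuration.
    BullModule.registerQueue({ name: 'audio' }),
    // Hypothetical second queue that overrides the Redis port for itself only.
    BullModule.registerQueue({ name: 'video', redis: { port: 6380 } }),
  ],
})
export class MediaModule {}
```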
Since jobs are persisted in Redis, each time a specific named queue is instantiated (e.g., when an app is started/restarted), it attempts to process any old jobs that may exist from a previous unfinished session.
Each queue can have one or many producers, consumers, and listeners. Consumers retrieve jobs from the queue in a specific order: FIFO (the default), LIFO, or according to priorities. Controlling queue processing order is discussed here.
If your queues connect to multiple different Redis instances, you can use a technique called named configurations. This feature allows you to register several configurations under specified keys, which you can then refer to in the queue options.
For example, assuming that you have an additional Redis instance (apart from the default one) used by a few queues registered in your application, you can register its configuration by passing a configuration key as the first argument to forRoot(). The key (for example, 'alternative-config') can be any arbitrary string.
With this in place, you can now point to this configuration by setting the configKey property in the registerQueue() options object.
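A sketch of a named configuration and a queue that refers to it. The port number and the 'video' queue name are assumptions chosen for illustration; only the 'alternative-config' key comes from the text above.

```typescript
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    // Registers a second configuration under the key 'alternative-config'.
    BullModule.forRoot('alternative-config', {
      redis: { port: 6381 }, // assumed port of the additional Redis instance
    }),
    // This queue connects using the named configuration instead of the default one.
    BullModule.registerQueue({
      configKey: 'alternative-config',
      name: 'video', // hypothetical queue name
    }),
  ],
})
export class AppModule {}
```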
Job producers add jobs to queues. Producers are typically application services (Nest providers). To add jobs to a queue, first inject the queue into the service using the @InjectQueue() decorator.
Hint: The @InjectQueue() decorator identifies the queue by its name, as provided in the registerQueue() method call (e.g., 'audio').
Now, add a job by calling the queue's add() method, passing a user-defined job object.
Jobs may have unique names. This allows you to create specialized consumers that will only process jobs with a given name.
Warning: When using named jobs, you must create processors for each unique name added to a queue, or the queue will complain that you are missing a processor for the given job. See here for more information on consuming named jobs.
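A producer might look like the sketch below. The AudioService class, its method names, and the { file } payload shape are hypothetical; the queue name 'audio' and the job name 'transcode' follow the examples used in this chapter.

```typescript
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class AudioService {
  // The token 'audio' must match the name passed to registerQueue().
  constructor(@InjectQueue('audio') private readonly audioQueue: Queue) {}

  // Adds an unnamed job. The payload shape is arbitrary but must be serializable.
  async enqueue(file: string): Promise<void> {
    await this.audioQueue.add({ file });
  }

  // Adds a job named 'transcode', which requires a processor registered for that name.
  async enqueueTranscode(file: string): Promise<void> {
    await this.audioQueue.add('transcode', { file });
  }
}
```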
Jobs can have additional options associated with them. Pass an options object after the job argument in the Queue.add() method. Job options properties are:
- priority: number - Optional priority value. Ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that using priorities has a slight impact on performance, so use them with caution.
- delay: number - An amount of time (milliseconds) to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized.
- attempts: number - The total number of attempts to try the job until it completes.
- repeat: RepeatOpts - Repeat job according to a cron specification. See RepeatOpts.
- backoff: number | BackoffOpts - Backoff setting for automatic retries if the job fails. See BackoffOpts.
- lifo: boolean - If true, adds the job to the right end of the queue instead of the left (default false).
- timeout: number - The number of milliseconds after which the job should fail with a timeout error.
- jobId: number | string - Override the job ID. By default, the job ID is a unique integer, but you can use this setting to override it. If you use this option, it is up to you to ensure the jobId is unique. If you attempt to add a job with an id that already exists, it will not be added.
- removeOnComplete: boolean | number - If true, removes the job when it successfully completes. A number specifies the number of jobs to keep. Default behavior is to keep the job in the completed set.
- removeOnFail: boolean | number - If true, removes the job when it fails after all attempts. A number specifies the number of jobs to keep. Default behavior is to keep the job in the failed set.
- stackTraceLimit: number - Limits the number of stack trace lines that will be recorded in the stacktrace.
Here are a few examples of customizing jobs with job options.
To delay the start of a job, use the delay configuration property.
To add a job to the right end of the queue (process the job as LIFO (Last In First Out)), set the lifo property of the configuration object to true.
To prioritize a job, use the priority property.
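The three customizations above can be sketched as follows. To keep the example runnable without Redis, it types the queue with a small structural interface (AddOnlyQueue, an assumption made for this sketch) that mirrors the add(data, opts) call shape. In a Nest provider the queue would instead be the injected Bull Queue. The payload and the option values are illustrative only.

```typescript
// Subset of the job options listed above; names match Bull's option names.
interface JobOptionsSubset {
  delay?: number;
  lifo?: boolean;
  priority?: number;
}

// Hypothetical stand-in for the part of the queue API used in this sketch.
interface AddOnlyQueue<T> {
  add(data: T, opts?: JobOptionsSubset): Promise<unknown>;
}

async function enqueueExamples(queue: AddOnlyQueue<{ foo: string }>): Promise<void> {
  // Wait at least 3 seconds before the job can be processed.
  await queue.add({ foo: 'bar' }, { delay: 3000 });
  // Add the job to the right end of the queue (LIFO).
  await queue.add({ foo: 'bar' }, { lifo: true });
  // Priority 1 is the highest, so 2 is processed after any priority-1 jobs.
  await queue.add({ foo: 'bar' }, { priority: 2 });
}
```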
A consumer is a class defining methods that either process jobs added into the queue, or listen for events on the queue, or both. Declare a consumer class using the @Processor() decorator. The decorator's string argument (e.g., 'audio') is the name of the queue to be associated with the class methods.
Within a consumer class, declare job handlers by decorating handler methods with the @Process() decorator.
The decorated method (e.g., transcode()) is called whenever the worker is idle and there are jobs to process in the queue. This handler method receives the job object as its only argument. The value returned by the handler method is stored in the job object and can be accessed later on, for example in a listener for the completed event.
Job objects have multiple methods that allow you to interact with their state. For example, a handler can call the progress() method to update the job's progress. See here for the complete Job object API reference.
You can designate that a job handler method will handle only jobs of a certain type (jobs with a specific name) by passing that name to the @Process() decorator. You can have multiple @Process() handlers in a given consumer class, corresponding to each job type (name). When you use named jobs, be sure to have a handler corresponding to each name.
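A consumer for the 'audio' queue might be sketched as below. The AudioConsumer class name, the { file } payload, and the ten-step loop are placeholders for real transcoding work. The handler is bound to the job name 'transcode'; calling @Process() with no argument would declare an unnamed handler instead.

```typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
  // Handles only jobs that were added with the name 'transcode'.
  @Process('transcode')
  async transcode(job: Job<{ file: string }>) {
    for (let step = 1; step <= 10; step++) {
      // Placeholder for one unit of real work on job.data.file.
      await job.progress(step * 10); // report progress as a percentage
    }
    // The returned value is stored on the job and is available to
    // listeners for the completed event.
    return { file: job.data.file, done: true };
  }
}
```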
Bull generates a set of useful events when queue and/or job state changes occur. Nest provides a set of decorators that allow subscribing to a core set of standard events. These are exported from the @nestjs/bull package.
Event listeners must be declared within a consumer class (i.e., within a class decorated with the @Processor() decorator). To listen for an event, use one of the decorators in the table below to declare a handler for the event. For example, to listen to the event emitted when a job enters the active state in the audio queue, decorate a method of the audio consumer class with @OnQueueActive().
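A minimal sketch of such a listener, assuming a consumer class named AudioConsumer (hypothetical) and a log message chosen for illustration:

```typescript
import { Processor, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
  // Called when a job in the 'audio' queue enters the active state
  // in this process (a local event).
  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}`,
    );
  }
}
```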
Since Bull operates in a distributed (multi-node) environment, it defines the concept of event locality. This concept recognizes that events may be triggered either entirely within a single process, or on shared queues from different processes. A local event is one that is produced when an action or state change is triggered on a queue in the local process. In other words, when your event producers and consumers are local to a single process, all events happening on queues are local.
When a queue is shared across multiple processes, we encounter the possibility of global events. For a listener in one process to receive an event notification triggered by another process, it must register for a global event.
Event handlers are invoked whenever their corresponding event is emitted. The handler is called with the signature shown in the table below, providing access to information relevant to the event. We discuss one key difference between local and global event handler signatures below.
| Local event listeners | Global event listeners | Handler method signature / When fired |
When listening for global events, the method signatures can be slightly different from their local counterpart. Specifically, any method signature that receives job objects in the local version instead receives a jobId (number) in the global version. To get a reference to the actual job object in such a case, use the Queue#getJob method. This call should be awaited, and therefore the handler should be declared async.
Hint: To access the Queue object (to make a getJob() call), you must of course inject it. Also, the Queue must be registered in the module where you are injecting it.
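The sketch below shows a global completed listener that looks the job up by its id. The class name and the log format are assumptions. Because getJob() may not find the job (for example, if it was already removed), the example guards against a missing job.

```typescript
import { Processor, InjectQueue, OnGlobalQueueCompleted } from '@nestjs/bull';
import { Queue } from 'bull';

@Processor('audio')
export class AudioConsumer {
  // The queue is injected so that the handler can call getJob().
  constructor(@InjectQueue('audio') private readonly audioQueue: Queue) {}

  // Global handlers receive a job id rather than a job object.
  @OnGlobalQueueCompleted()
  async onGlobalCompleted(jobId: number, result: unknown) {
    const job = await this.audioQueue.getJob(jobId);
    if (!job) {
      return; // the job is no longer stored in Redis
    }
    console.log(`(Global) job ${job.id} completed with result:`, result);
  }
}
```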
In addition to the specific event listener decorators, you can also use the generic @OnQueueEvent() decorator in combination with either BullQueueEvents or BullQueueGlobalEvents enums. Read more about events here.
Queues have an API that allows you to perform management functions like pausing and resuming, retrieving the count of jobs in various states, and several more. You can find the full queue API here. Invoke any of these methods directly on the Queue object.
Pause a queue with the pause() method call. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized.
To resume a paused queue, use the resume() method.
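One way to combine the two calls is a small helper that pauses a queue around a maintenance task and always resumes it afterwards. This is a sketch, not part of the package: withQueuePaused and the PausableQueue interface are hypothetical names, and the interface only mirrors the pause() and resume() methods so the example runs without Redis. In an application you would pass the injected Queue object.

```typescript
// Hypothetical stand-in for the two queue methods used in this sketch.
interface PausableQueue {
  pause(): Promise<void>;
  resume(): Promise<void>;
}

async function withQueuePaused(
  queue: PausableQueue,
  task: () => Promise<void>,
): Promise<void> {
  await queue.pause(); // no new jobs are picked up from here on
  try {
    await task();
  } finally {
    await queue.resume(); // resume even if the task throws
  }
}
```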
Job handlers can also be run in a separate (forked) process. This has several advantages:
- The process is sandboxed so if it crashes it does not affect the worker.
- You can run blocking code without affecting the queue (jobs will not stall).
- Much better utilization of multi-core CPUs.
- Fewer connections to Redis.
Please note that because your function is being executed in a forked process, Dependency Injection (and IoC container) won't be available. That means that your processor function will need to contain (or create) all instances of external dependencies it needs.
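A processor file for a forked process might be sketched as below. The local JobLike and DoneCallback types are simplified stand-ins (assumptions) for the corresponding Bull types, declared here so the file has no injected or imported dependencies. The function name and the returned summary string are illustrative.

```typescript
// Simplified stand-ins for Bull's Job and DoneCallback types.
type DoneCallback = (error: Error | null, value?: unknown) => void;

interface JobLike {
  id: number | string;
  data: unknown;
}

// The default export is the function that runs in the forked process.
export default function processJob(job: JobLike, done: DoneCallback): void {
  try {
    // No Nest providers are available here, so anything the job needs
    // must be created inside this file.
    const summary = `job ${job.id}: ${JSON.stringify(job.data)}`;
    done(null, summary);
  } catch (err) {
    done(err instanceof Error ? err : new Error(String(err)));
  }
}
```

Assuming the compiled file is named processor.js, its path would be listed in the processors array of the registerQueue() options for the queue.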
You may want to pass bull options asynchronously instead of statically. In this case, use the forRootAsync() method, which provides several ways to deal with async configuration. Likewise, if you want to pass queue options asynchronously, use the registerQueueAsync() method.
One approach is to use a factory function, supplied through the useFactory property.
Our factory behaves like any other asynchronous provider (e.g., it can be async and it's able to inject dependencies through inject).
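A sketch of the factory approach, assuming the @nestjs/config package supplies ConfigModule and ConfigService. The QUEUE_HOST and QUEUE_PORT keys are hypothetical configuration names.

```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      // The factory receives the providers listed in `inject`, in order.
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get<string>('QUEUE_HOST'),
          // Environment values are usually strings, so convert explicitly.
          port: Number(configService.get('QUEUE_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}
```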
Alternatively, you can use the useClass syntax.
The useClass construction will instantiate BullConfigService inside BullModule and use it to provide an options object by calling createBullOptions(). Note that this means that the BullConfigService has to implement the BullOptionsFactory interface.
In order to prevent the creation of BullConfigService inside BullModule and use a provider imported from a different module, you can use the useExisting syntax.
This construction works the same as useClass with one critical difference: BullModule will look up imported modules to reuse an existing ConfigService instead of instantiating a new one.
A working example is available here.