Bullmq - Queues and Workers

Bullmq - Queues and Workers

ยท

5 min read

What is bullmq?

Bullmq is like a smart helper that ensures your app runs smoothly by managing all the time-consuming tasks without getting in your way. It's a node js library useful for making micro-service architecture. It's a queue system built on top of Redis (i.e. Redis is like a super-fast and smart storage system for computer programs. Imagine you have a magic box that can remember things really quickly and never forgets).

It's built on top of 4 classes, these are Queue, Worker, QueueEvents, FlowProducer.

To start working with this we need to first establish a connection with a Redis database.

Queue -

It's a list of jobs waiting in line to be processed one by one. You can imagine it as a road lane and the jobs be the vehicles here.

  1. Make the queue - We are making the queue of cars below.

    const queue = new Queue('Cars');

  2. Add jobs in the queue - Here paint is the job name and the object is the payload or the information associated with the job.

    await queue.add('paint', { color: 'red' });

  3. Auto removal of jobs - we can specify parameters that will remove jobs based on their status ( i.e. completed, failed).

     await myQueue.add(
       'test',
       { foo: 'bar' },
       { removeOnComplete: true, removeOnFail: true },
     );
    
  4. Adding jobs in bulk - we can simply pass the array inside the addBulk function.

     const jobs = await queue.addBulk([
       { name, data: { paint: 'car' } },
       { name, data: { paint: 'house' } },
       { name, data: { paint: 'boat' } },
     ]);
    
  5. Removing all jobs - await queue.drain();

  6. Clean jobs - it's a bit different from the remove one. It's used to remove jobs in a specific condition.

     const deletedJobIds = await queue.clean(
       60000, // jobs older than 1 min will be cleaned
       1000, // max number of jobs to clean
       'paused', // only paused state jobs will be cleaned
     );
    
  7. Obliterate - This method permanently deletes the entire queue and all its contents, making it useful when you want to completely remove the queue and all related data from your application. Keep in mind that using obliterate will result in the loss of all data associated with the queue, so it should be used with caution. await queue.obliterate();

Workers -

It's like a busy helper that does the tasks you give it. Imagine you have a list of chores, and you hire someone to do those chores for you.

The Worker class is used to create a worker that processes jobs from a queue and the Job class represents a job in the queue.

import { Worker, Job } from 'bullmq';
const worker = new Worker(queueName, async (job: Job) => {
  // Optionally report some progress about the job
  await job.updateProgress(42);

  // Optionally sending an object as progress
  await job.updateProgress({ foo: 'bar' });

  // Do something with job
  return 'some value';
});

This above code will autorun itself when the file will be executed. In order to decide at what point in code you want your workers to run you can set the autorun to false and then use worker.run() to process the jobs.

import { Worker, Job } from 'bullmq';
const worker = new Worker(
  queueName,
  async (job: Job) => {
    // Optionally report some progress
    await job.updateProgress(42);
    // Optionally sending an object as progress
    await job.updateProgress({ foo: 'bar' });
    // Do something with job
    return 'some value';
  },
  { autorun: false },
);
worker.run();

We have events also for which we can listen to our worker, these are completed, progress, failed and error.

  1. completed - an event listener on the worker instance to listen for the 'completed' event. The 'completed' event is triggered when a job processed by the worker is completed.

     worker.on('completed', (job: Job, returnvalue: any) => {
       // Do something with the return value.
     });
    
  2. progress - an event listener on the worker instance to listen for the 'progress' event. The 'progress' event is triggered when a job processed by the worker reports progress using the updateProgress method.

     worker.on('progress', (job: Job, progress: number) => {
       // Do something with the return value.
     });
    
  3. fail - The 'failed' event is triggered when a job processed by the worker encounters an error during execution.

     worker.on('failed', (job: Job, error: Error) => {
       // Do something with the return value.
     });
    
  4. error - Similar to fail but in context to the error thrown by worker. More generally, used to handle errors in various contexts, including job processing and other operations, emitted by different objects.

     worker.on('error', err => {
       // log the error
       console.error(err);
     });
    
  5. Concurrency - refers to the number of jobs that a worker can process simultaneously. It determines how many tasks a worker can work on concurrently or at the same time.

    worker.concurrency = 5; This is only possible if the worker does async operations.

  6. Shutdown - we can shut down the worker using await worker.close();

  7. Stalled jobs - refers to jobs that have been taken by a worker for processing but haven't been completed or failed within a certain time threshold.

  8. Sandboxed processors - refer to a security feature that isolates the execution of job processing code within a controlled environment.

    Define the process in separate file

     import { SandboxedJob } from 'bullmq';
     module.exports = async (job: SandboxedJob) => {
         // Do something with job
     };
    

    and connect it to the worker constructor (i.e. It's like a blueprint or a set of instructions for creating an instance of a class):

     import { Worker } from 'bullmq'
     const processorFile = path.join(__dirname, 'my_procesor.js');
     worker = new Worker(queueName, processorFile);
    
  9. Worker threads - allow you to process multiple jobs simultaneously, improving the efficiency of your job queue processing. To enable it -

     import { Worker } from 'bullmq'
    
     const processorFile = path.join(__dirname, 'my_procesor.js');
     worker = new Worker(queueName, processorFile, { useWorkerThreads: true });
    
  10. Pausing queues - A queue is paused globally when no workers will pick up any jobs from the queue.

    // pauses the queue
    await myQueue.pause();
    
    // will pause the worker after the currently being processed jobs are over
    await myWorker.pause();
    
    // pause immediately and not wait
    await myWorker.pause(true);
    

Another blog on the rest of the part will be uploaded soon.
For any doubts and queries, you can comment below ๐Ÿ˜„

ย