Skip to content

BullMQ

Feature

The BullMQ Module for Ts.ED allows you to decorate a class using the @JobController decorator and implement the JobMethods interface provided by the module. Repeatable Jobs can also be defined using this decorator.

For more information about BullMQ look at the documentation here;

Installation

To begin, install the BullMQ module for Ts.ED:

sh
npm install @tsed/bullmq bullmq
sh
yarn add @tsed/bullmq bullmq
sh
pnpm add @tsed/bullmq bullmq
sh
bun add @tsed/bullmq bullmq

Configure the Server

Import the @tsed/bullmq module in your server

ts
import {Configuration} from "@tsed/di";
import "@tsed/bullmq"; // import bullmq ts.ed module

@Configuration({
  bullmq: {
    // Define queue names.
    // Note: Since v7.60.0 this options is not required anymore, excepted if queue is not defined in JobController decorator
    queues: ["default", "special"],
    connection: {
      // redisio connection options
    },
    defaultQueueOptions: {
      // Default queue options which are applied to every queue
      // Can be extended/overridden by `queueOptions`
    },
    queueOptions: {
      special: {
        // Specify additional queue options by queue name
      }
    },
    // Disable the creation of any worker.
    // All other worker configuration will be ignored
    disableWorker: true,
    // Specify for which queues to start a worker for.
    // When undefined falls back to all queues specified.
    workerQueues: ["default"],
    defaultWorkerOptions: {
      // Default worker options which are applied to every worker
      // Can be extended/overridden by `workerOptions`
    },
    workerOptions: {
      special: {
        // Specify additional worker options by queue name
      }
    }
  }
})
export class Server {}

Define a Job

A job is defined as a class decorated with the @JobController decorator and implementing the JobMethods interface of the @tsed/bullmq package

ts
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
  public handle(payload: {msg: string}) {
    console.info("New message incoming", payload.msg);
  }
}

You can also specify a non default queue as the second argument in the decorator and add any other job specific options as a third argument

ts
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("other-example", "other-queue", {
  attempts: 42
})
class OtherExampleJob implements JobMethods {
  public handle(payload: {num: number}) {
    console.info("look at my awesome number: ", payload.num);
  }
}

Defining a custom job id within a job

The JobMethods interface has an optional method jobId, which when implemented instructs the dispatcher to use it when defining the id for the job.

The method will accept the job payload, and since it is defined within the job class, it will also have access to all injected services.

ts
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("example-with-custom-id")
class ExampleJobWithCustomId implements JobMethods {
  public handle(payload: {num: number}) {
    console.info("look at my awesome number: ", payload.num);
  }

  public jobId(payload: {num: number}): string {
    return `very realistic job id #${payload.num}`;
  }
}

Keep in mind, though, that when defining a job using the dispatcher and dispatching the job, the ID defined using the dispatcher will take precedence!

ts
this.dispatcher(ExampleJobWithCustomId, {num: 1}); // id: 'very realistic job id #1'
this.dispatcher(ExampleJobWithCustomId, {num: 2}, {jobId: "I do my own thing!"}); // id: 'I do my own thing!'

Defining a repeating job

Jobs that should be run regularly on a schedule can also easily be defined using the @JobController decorator. Doing so will automatically dispatch it without any data.

ts
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("my-cron-job", "default", {
  repeat: {
    pattern: "* * * * *"
  }
})
class MyCronJob implements JobMethods {
  public handle() {
    console.info("I run every minute!");
  }
}

To register the job, you now need to import it into the server so that it can be detected.

ts
import {Configuration} from "@tsed/di";
import "@tsed/bullmq"; // import bullmq ts.ed module

import "./jobs/MyCronJob";

@Configuration()
// server configuration
export class Server {}

Defining a fallback job

Because the @JobController requires a name, you can not use it in case you have dynamic job names. For this usecase there is a @FallbackJobController, which allows you to define a fallback method globally or per queue:

ts
import {FallbackJobController, JobMethods} from "@tsed/bullmq";

@FallbackJobController("foo")
class FooFallbackController implements JobMethods {
  public handle() {
    console.info(`I run for every job within the "foo" queue, which doesn't have its own JobController`);
  }
}

@FallbackJobController()
class GlobalFallbackController implements JobMethods {
  public handle() {
    console.info(`I run for every job in every other queue, which doesn't have its own JobController`);
  }
}

You also have to register the fallback job in the server:

ts
import {Configuration} from "@tsed/di";
import "@tsed/bullmq"; // import bullmq ts.ed module

import "./jobs/MyFallbackJobs";

@Configuration()
// server configuration
export class Server {}

Dispatching jobs

Dispatching jobs is done via the JobDispatcher service that takes the job to be dispatched and its payload.

ts
import {Service} from "@tsed/di";
import {JobDispatcher} from "@tsed/bullmq";
import {ExampleJob} from "./jobs/ExampleJob";

@Service()
class MyService {
  constructor(private readonly dispatcher: JobDispatcher) {}

  public async doingSomething() {
    await this.dispatcher.dispatch(ExampleJob, {msg: "this message is part of the payload for the job"});

    console.info("I just dispatched a job!");
  }
}

In addition to statically defined job options when declaring the job, custom job options can also be set when dispatching the job. This allows, for example, the job to be delayed from its original dispatch time.

ts
import {Service} from "@tsed/di";
import {JobDispatcher} from "@tsed/bullmq";
import {ExampleJob} from "./jobs/ExampleJob";

@Service()
class MyService {
  constructor(private readonly dispatcher: JobDispatcher) {}

  public async doingSomething() {
    await this.dispatcher.dispatch(
      ExampleJob,
      {msg: "this message is part of the payload for the job"},
      {
        delay: 600_000 // 10 minutes in milliseconds
      }
    );

    console.info("I just dispatched a job!");
  }
}

In case you want to be more flexible, you can also dispatch a job via a name or a queue/name combination.

Note: This normally only makes sense when you have a Fallback Job configured.

ts
import {Service} from "@tsed/di";
import {JobDispatcher} from "@tsed/bullmq";
import {ExampleJob} from "./jobs/ExampleJob";

@Service()
class MyService {
  constructor(private readonly dispatcher: JobDispatcher) {}

  public async doingSomething() {
    // When passing only a name, the job will be dispatched in a queue named "default"
    await this.dispatcher.dispatch("some-name", {msg: "this message is part of the payload for the job"});

    // You can also specifiy which queue to use
    await this.dispatcher.dispatch({queue: "some-queue", name: "some-name"}, {msg: "this message is part of the payload for the job"});
  }
}

Inject a Queue v7.60.0+

While JobDispatcher is the recommended way to dispatch jobs use class token, this can be useful in some cases to manipulate the queue directly. You can inject a queue using the @InjectQueue decorator.

ts
import {InjectQueue, JobController} from "@tsed/bullmq";
import {Queue} from "bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
  @InjectQueue("default")
  private readonly queue?: Queue;

  $onInit() {
    if (this.queue) {
      // do something with the queue
      this.queue.add("some-job", {msg: "this message is part of the payload for the job"});
    }
  }

  public handle(payload: {msg: string}) {
    console.info("New message incoming", payload.msg);
  }
}

Inject a Worker v7.60.0+

You can also inject a worker using the @InjectWorker decorator.

ts
import {InjectWorker, JobController} from "@tsed/bullmq";

@JobController("example")
class ExampleJob implements JobMethods {
  @InjectWorker("default")
  private readonly worker?: Worker;

  $onInit() {
    if (this.worker) {
      // do something with the worker
      this.worker.on("completed", (job) => {
        console.log("Job completed", job);
      });
    }
  }

  public handle(payload: {msg: string}) {
    console.info("New message incoming", payload.msg);
  }
}

Authors

Maintainers

Released under the MIT License.