Skip to content

Pulse

WARNING

@tsed/pulse is deprecated and will be removed in a future major release. This package won't receive any future updates.

For new projects, use @tsed/agenda with Agenda v6. Existing consumers should migrate to @tsed/agenda + agenda + @agendajs/mongo-backend.

The legacy Pulse examples below are kept only to help existing consumers migrate old code.

Feature

@pulsecron/pulse is maintained fork of the Agenda.

Currently, @tsed/pulse allows you to decorate classes with @Pulse and corresponding methods to have them picked up by the @pulsecron/pulse library to be scheduled automatically (@Every) or programmatically (@Define) via the PulseService.

For more information about Pulse look at the documentation here;

Installation

To begin, install the Pulse module for Ts.ED:

sh
npm install --save @tsed/pulse @pulsecron/pulse
sh
yarn add @tsed/pulse @pulsecron/pulse
sh
pnpm add @tsed/pulse @pulsecron/pulse
sh
bun add @tsed/pulse @pulsecron/pulse

Migrate to Agenda

Migrate from @tsed/pulse to @tsed/agenda on top of Agenda v6.

diff
import {Configuration} from "@tsed/di";
-import "@tsed/pulse";
+import "@tsed/agenda";
+import {MongoBackend} from "@agendajs/mongo-backend";

 @Configuration({
-  pulse: {
+  agenda: {
     enabled: true,
-    db: {
-      address: mongoConnectionString
-    }
+    backend: new MongoBackend({
+      address: mongoConnectionString
+    })
   }
 })
 export class Server {}

AI migration note

Apply these deterministic rewrites:

text
Migrate from @tsed/pulse to @tsed/agenda:

- `import "@tsed/pulse"` → `import "@tsed/agenda"`
- add `import {MongoBackend} from "@agendajs/mongo-backend"`
- `@Pulse(...)` → `@JobsController(...)`
- `PulseModule` → `Agenda` from `agenda`
- `pulse` config key → `agenda`
- `pulse.db.address` → `agenda.backend: new MongoBackend({address: ...})`
- `pulse.db.collection` / `pulse.db.options` → `agenda.backend: new MongoBackend({collection, options})`
- `pulse.mongo` / `pulse.repository` → `agenda.backend`
- `pulse.ensureIndex` / `pulse.sort` → move into `new MongoBackend(...)`
- `@pulsecron/pulse` dependency → `agenda` + `@agendajs/mongo-backend`

For more details about @tsed/agenda, see [Agenda for Ts.ED](https://tsed.dev/ai/tutorials/agenda.md).

Configure your server

Import @tsed/pulse in your Server:

typescript
import {Configuration} from "@tsed/di";
import "@tsed/pulse"; // import pulse ts.ed module

const mongoConnectionString = "mongodb://127.0.0.1/pulse";

@Configuration({
  pulse: {
    enabled: true, // Enable Pulse jobs for this instance.
    // drainJobsBeforeStop: true, // Wait for jobs to finish before stopping the pulse process.
    // disableJobProcessing: true, // Prevents jobs from being processed.
    // pass any options that you would normally pass to new Pulse(), e.g.
    db: {
      address: mongoConnectionString
    }
  }
})
export class Server {}

Create a new Service

Decorate the class with @Pulse. The namespace option is optional and will prefix the job name with namespace.

Use the @Every decorator to define a cron-like job that gets automatically scheduled based on the given interval. The name is optional and by default the method name is used as job name.

Use the @Define decorator on methods that you would like to schedule programmatically via the PulseService and Pulse instance access.

ts
import {Pulse, Every, Define} from "@tsed/pulse";
import {Job} from "@pulsecron/pulse";

@Pulse({namespace: "email"})
export class EmailJobService {
  @Every("60 minutes", {
    name: "maintenanceJob"
    /* ... and any option you would normally pass to pulse.every/define */
  })
  async sendAdminStatistics(job: Job) {
    // implement something here
  }

  @Define({
    name: "sendWelcomeEmail"
    /*  ... and any option you would normally pass to pulse.define(...) */
  })
  async sendWelcomeEmail(job: Job) {
    // implement something here
  }

  @Define({name: "sendFollowUpEmail"})
  async sendFollowUpEmail(job: Job) {
    // implement something here
  }
}

Define a job processor manually

PulseModule exposes methods to manually define a job processor. It can be useful to define a job processor when you need to fetch data beforehand and dynamically build job name / options.

typescript
import {Pulse, PulseModule, Define} from "@tsed/pulse";

@Pulse({namespace: "email"})
export class EmailJobService {
  @Inject()
  pulse: PulseModule;

  @Inject()
  httpClient: HttpClient;

  cache: Map<string, Job[]> = new Map();

  @Define({
    name: "sendWelcomeEmail",
    concurrency: 3
    /*  ... and any option you would normally pass to pulse.define(...) */
  })
  async sendWelcomeEmail(job: Job) {
    // implement something here
    console.log(job.attrs.data.locale);
  }

  async $beforePulseStart() {
    const locales = await this.httpClient.get("/locales");

    this.cache.set(
      "sendWelcomeEmail",
      locales.map((locale) => {
        return this.pulse.create("sendWelcomeEmail", {locale});
      })
    );
  }

  async $afterPulseStart() {
    const jobs = this.cache.get("sendWelcomeEmail");

    await Promise.all(jobs.map((job) => job.repeatEvery("1 week").save()));
  }
}

Inject Pulse

Inject the PulseService instance to interact with it directly, e.g. to schedule a job manually.

typescript
import {Service} from "@tsed/di";
import {AfterRoutesInit} from "@tsed/platform-params";
import {PulseModule} from "@tsed/pulse";

@Service()
export class UsersService {
  @Inject()
  private pulse: PulseModule;

  async create(user: User): Promise<User> {
    // do something
    // ...
    // then schedule some jobs
    await this.pulse.now("email.sendWelcomeEmail", {user});
    await this.pulse.schedule("in 2 hours", "email.sendFollowUpEmail", {user});

    return user;
  }
}

Authors

Maintainers

Released under the MIT License.