Published on

在 Express 中使用 node-cron 集成定时任务

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

1. 目录结构设计

合理的目录结构有助于代码可维护性,推荐以下结构:

.
 ├── src/
 │   ├── app.ts                  # Express 主应用入口
 │   ├── server.ts               # 启动服务
 │   ├── cron/                   # ✅ 定时任务相关
 │   │   ├── task-runner.ts      # 单独启动的任务入口,避免 PM2 多进程冲突
 │   │   ├── index.ts            # 任务注册 & 启动
 │   │   ├── taskList.ts         # 存放任务列表
 │   │   ├── executeTask.ts      # 封装任务执行逻辑
 │   │   └── taskDefinitions.ts  # 存放任务函数
 │   ├── controllers/
 │   ├── models/                 # 数据模型 (如 taskLog.ts)
 │   ├── services/
 │   ├── middlewares/
 │   ├── types/                  # 类型定义 (如 task.ts)
 │   └── utils/
 ├── .env
 ├── ecosystem.config.js     # ✅ PM2 配置文件
 └── package.json

2. 编写定时任务

  1. 类型支持

    // src/types/task.ts
    /** An asynchronous task body: takes no arguments and returns a promise. */
    export type TaskFunction = () => Promise<void>;
    
    /** A schedulable task: a cron expression paired with the function to run. */
    export interface CronTask {
      /** Cron expression handed to `cron.schedule` when the task is registered. */
      cron: string;
      /** Function passed to `executeTask` each time the schedule fires. */
      fn: TaskFunction;
    }
    
  2. 使用 MongoDB 持久化任务(可选)

    // src/models/taskLog.ts
    import mongoose, { ObjectId, Schema } from "mongoose";
    
    /** Values stored in the `status` field of a task log record. */
    export enum TaskStatus {
      /** Written by `createTask`; also the schema default for `status`. */
      RUNNING = "running",
      /** Written by `markAsSuccess`. */
      SUCCESS = "success",
      /** Written by `markAsFailed`. */
      FAILED = "failed",
      /** Written by `skipTask`. */
      SKIP = "skip",
      /** Not written by any code in this file — presumably for runs cut short; TODO confirm. */
      INTERRUPTED = "interrupted",
    }
    
    /** Shape of a task log document, including the instance methods attached to the schema. */
    export interface ITaskLogModel {
      _id: ObjectId;
      taskName: string; // Task name
      status: TaskStatus; // Task status
      error?: string; // Error message
      startTime: Date; // Start time
      endTime?: Date; // End time
      duration?: number; // Execution time in milliseconds
      createdAt: Date;
      updatedAt: Date;
      /** Sets the status to SUCCESS, stamps `endTime`, and saves the record. */
      markAsSuccess(): Promise<ITaskLogModel>;
      /** Sets the status to FAILED, stores `error`, stamps `endTime`, and saves the record. */
      markAsFailed(error: string): Promise<ITaskLogModel>;
    }
    
    /** One page of task log records together with the paging metadata describing it. */
    type TaskLogPage = {
      data: ITaskLogModel[];
      pagination: {
        total: number;
        page: number;
        pageSize: number;
        totalPages: number;
      };
    };
    
    /** Static helpers available on the TaskLog model. */
    interface ITaskLogModelStatic {
      /** Stores a record with status SKIP for the given task. */
      skipTask(taskName: string): Promise<ITaskLogModel>;
      /** Stores a record with status RUNNING for the given task. */
      createTask(taskName: string): Promise<ITaskLogModel>;
      /** Pages through the records of one task. */
      findByTaskName(
        taskName: string,
        page?: number,
        pageSize?: number,
        sortBy?: string,
        sortOrder?: "asc" | "desc"
      ): Promise<TaskLogPage>;
      /** Pages through the records whose status is FAILED. */
      findFailedTasks(
        page?: number,
        pageSize?: number,
        sortBy?: string,
        sortOrder?: "asc" | "desc"
      ): Promise<TaskLogPage>;
    }
    
    // Mongoose schema for task log records. The validation messages below are
    // runtime strings and are intentionally left as written.
    const TaskLogSchema = new Schema<ITaskLogModel>(
      {
        taskName: {
          type: String,
          required: [true, "任务名称不能为空"],
          trim: true,
          maxlength: [100, "任务名称不能超过100个字符"],
        },
        status: {
          type: String,
          enum: Object.values(TaskStatus),
          default: TaskStatus.RUNNING,
        },
        startTime: {
          type: Date,
          required: [true, "开始时间不能为空"],
        },
        endTime: {
          type: Date,
          validate: {
            // An unset endTime passes; a set one must not precede startTime.
            validator: function (this: ITaskLogModel, value: Date) {
              return !value || value >= this.startTime;
            },
            message: "结束时间不能早于开始时间",
          },
        },
        error: {
          type: String,
          maxlength: [1000, "错误信息不能超过1000个字符"],
        },
        // Filled in by the pre-save hook from startTime and endTime.
        duration: {
          type: Number,
          min: [0, "执行时长不能为负数"],
        },
      },
      {
        // Adds the createdAt / updatedAt fields declared on ITaskLogModel.
        timestamps: true,
      }
    );
    
    // Before each save, derive the duration (in milliseconds) from the two timestamps.
    TaskLogSchema.pre("save", function (next) {
      const { startTime, endTime } = this;
      const hasBothTimestamps = Boolean(endTime && startTime);
      if (hasBothTimestamps) {
        this.duration = endTime!.getTime() - startTime.getTime();
      }
      next();
    });
    
    // Creates and saves a RUNNING record whose start time is "now".
    TaskLogSchema.statics.createTask = function (
      taskName: string
    ): Promise<ITaskLogModel> {
      const startedAt = new Date();
      const record = new this({
        taskName,
        status: TaskStatus.RUNNING,
        startTime: startedAt,
      });
      return record.save();
    };
    
    // Creates and saves a SKIP record whose start time is "now".
    TaskLogSchema.statics.skipTask = function (
      taskName: string
    ): Promise<ITaskLogModel> {
      const skippedAt = new Date();
      const record = new this({
        taskName,
        status: TaskStatus.SKIP,
        startTime: skippedAt,
      });
      return record.save();
    };
    
    // Static methods
    /**
     * Returns one page of log records for a task together with paging metadata.
     *
     * `page` and `pageSize` are normalized before use: anything that is not a
     * finite number >= 1 falls back to the default (1 and 10), and fractions are
     * floored. This avoids a negative skip, an unlimited `limit(0)` query, and a
     * division by zero in `totalPages`.
     *
     * @param taskName - Task whose records are returned.
     * @param page - 1-based page number.
     * @param pageSize - Number of records per page.
     * @param sortBy - Field to sort on.
     * @param sortOrder - Sort direction.
     * @returns The page of records and the pagination actually applied.
     */
    TaskLogSchema.statics.findByTaskName = async function (
      taskName: string,
      page: number = 1,
      pageSize: number = 10,
      sortBy: string = "createdAt",
      sortOrder: "asc" | "desc" = "desc"
    ) {
      const safePage = Number.isFinite(page) && page >= 1 ? Math.floor(page) : 1;
      const safePageSize =
        Number.isFinite(pageSize) && pageSize >= 1 ? Math.floor(pageSize) : 10;
      const filter = { taskName };
      const sort = { [sortBy]: sortOrder === "desc" ? -1 : 1 };
    
      // The page query and the count are independent, so run them in parallel.
      const [data, total] = await Promise.all([
        this.find(filter)
          .sort(sort)
          .skip((safePage - 1) * safePageSize)
          .limit(safePageSize),
        this.countDocuments(filter),
      ]);
    
      return {
        data,
        pagination: {
          total,
          page: safePage,
          pageSize: safePageSize,
          totalPages: Math.ceil(total / safePageSize),
        },
      };
    };
    
    /**
     * Returns one page of records whose status is FAILED, with paging metadata.
     *
     * `page` and `pageSize` are normalized before use: anything that is not a
     * finite number >= 1 falls back to the default (1 and 10), and fractions are
     * floored. This avoids a negative skip, an unlimited `limit(0)` query, and a
     * division by zero in `totalPages`.
     *
     * @param page - 1-based page number.
     * @param pageSize - Number of records per page.
     * @param sortBy - Field to sort on.
     * @param sortOrder - Sort direction.
     * @returns The page of records and the pagination actually applied.
     */
    TaskLogSchema.statics.findFailedTasks = async function (
      page: number = 1,
      pageSize: number = 10,
      sortBy: string = "updatedAt",
      sortOrder: "asc" | "desc" = "desc"
    ) {
      const safePage = Number.isFinite(page) && page >= 1 ? Math.floor(page) : 1;
      const safePageSize =
        Number.isFinite(pageSize) && pageSize >= 1 ? Math.floor(pageSize) : 10;
      const filter = { status: TaskStatus.FAILED };
      const sort = { [sortBy]: sortOrder === "desc" ? -1 : 1 };
    
      // The page query and the count are independent, so run them in parallel.
      const [data, total] = await Promise.all([
        this.find(filter)
          .sort(sort)
          .skip((safePage - 1) * safePageSize)
          .limit(safePageSize),
        this.countDocuments(filter),
      ]);
    
      return {
        data,
        pagination: {
          total,
          page: safePage,
          pageSize: safePageSize,
          totalPages: Math.ceil(total / safePageSize),
        },
      };
    };
    
    // Instance methods
    // Flags the record as SUCCESS, stamps the end time, and saves it.
    TaskLogSchema.methods.markAsSuccess = function () {
      const finishedAt = new Date();
      this.endTime = finishedAt;
      this.status = TaskStatus.SUCCESS;
      return this.save();
    };
    
    /**
     * Flags the record as FAILED, stores the error message, stamps the end time,
     * and saves it.
     *
     * The message is truncated to the schema's 1000-character limit for `error`;
     * without this, a long message fails validation on save and the record
     * keeps its previous status.
     *
     * @param error - Error message to store.
     * @returns The promise returned by `save()`.
     */
    TaskLogSchema.methods.markAsFailed = function (error: string) {
      // Must stay in sync with the `maxlength` declared for `error` in the schema.
      const maxErrorLength = 1000;
      this.status = TaskStatus.FAILED;
      this.error =
        error.length > maxErrorLength ? error.slice(0, maxErrorLength) : error;
      this.endTime = new Date();
      return this.save();
    };
    
    // Indexes
    TaskLogSchema.index({ taskName: 1, createdAt: -1 }); // Query by task name and time
    TaskLogSchema.index({ status: 1, updatedAt: -1 }); // Query by status and time
    TaskLogSchema.index({ startTime: -1 }); // Query by start time
    
    // The second type argument adds the static helpers declared in
    // ITaskLogModelStatic to the model's type.
    export const TaskLog = mongoose.model<
      ITaskLogModel,
      mongoose.Model<ITaskLogModel, {}, {}, {}, any> & ITaskLogModelStatic
    >("TaskLog", TaskLogSchema);
    
  3. logTaskExecution 用于记录任务执行记录

    // src/utils/logTaskExecution.ts
    import { TaskLog } from "src/models/taskLog";
    /**
     * Runs a task and records the run in the TaskLog collection.
     *
     * A RUNNING record is created before the task starts and is marked as
     * success or failed once the task settles. An error thrown by the task is
     * logged and stored on the record, not rethrown. Errors from the log storage
     * itself (`createTask`, `markAsSuccess`, `markAsFailed`) do propagate, so a
     * persistence failure is never mistaken for a task failure.
     *
     * @param taskName - Name used in console output and stored on the record.
     * @param taskFn - The task to run; reject the promise to signal failure.
     * @returns A promise that resolves once the record has been updated.
     * @throws Error if `taskName` or `taskFn` is missing or has the wrong type.
     * @example
     * logTaskExecution("MyTask", async () => {
     *   // Your task logic here
     *   await someAsyncOperation();
     * });
     */
    export async function logTaskExecution(
      taskName: string,
      taskFn: () => Promise<void>
    ): Promise<void> {
      if (!taskName || !taskFn) {
        throw new Error("Task name and function are required.");
      }
      if (typeof taskName !== "string") {
        throw new Error("Task name must be a string.");
      }
      if (typeof taskFn !== "function") {
        throw new Error("Task function must be a function.");
      }
      const startTime = new Date();
      console.log(`[${taskName}] Task started at:`, startTime.toISOString());
      const taskLog = await TaskLog.createTask(taskName);
    
      // Only the task itself runs inside the try block, so a failure while
      // saving the outcome cannot be reported as a task failure.
      let failed = false;
      let failure: unknown;
      try {
        await taskFn();
      } catch (error) {
        failed = true;
        failure = error;
      }
    
      const endTime = new Date();
      if (!failed) {
        console.log(
          `[${taskName}] Task completed successfully at:`,
          endTime.toISOString()
        );
        await taskLog.markAsSuccess();
        return;
      }
    
      console.error(
        `[${taskName}] Task failed at:`,
        endTime.toISOString(),
        "Error:",
        failure instanceof Error ? failure : new Error(String(failure))
      );
      await taskLog.markAsFailed(
        failure instanceof Error ? failure.message : String(failure)
      );
    }
    
  4. executeTask封装任务执行, 用于格式化任务注册, 使用taskQueue实现任务跳过逻辑, 避免同一任务并发执行

    // src/cron/executeTask.ts
    import { TaskLog } from "src/models/taskLog";
    import { TaskFunction } from "src/types/task";
    import { logTaskExecution } from "src/utils/logTaskExecution";
    
    // Names of the tasks that are currently executing in this process.
    const taskQueue: Map<string, boolean> = new Map();
    
    /**
     * Runs a task unless a previous run with the same name is still in progress.
     *
     * An overlapping run is recorded as skipped and dropped. This function never
     * rejects: it is called from a cron callback where nothing awaits it, so
     * every failure is logged here instead.
     *
     * NOTE(review): `logger` is not imported in this snippet — it is assumed to
     * be a project logger exposing `info` and `error`; confirm.
     *
     * @param taskName - Unique task name, used as the concurrency key.
     * @param taskFn - The task to run.
     */
    export const executeTask = async (taskName: string, taskFn: TaskFunction) => {
      if (taskQueue.get(taskName)) {
        logger.info(`Task "${taskName}" is still running, skipping...`);
        try {
          await TaskLog.skipTask(taskName);
        } catch (error) {
          // Failing to store the skip record must not surface as an unhandled rejection.
          logger.error(`Failed to record skipped run of task "${taskName}":`, error);
        }
        return;
      }
    
      taskQueue.set(taskName, true);
      logger.info(`Starting task "${taskName}" at ${new Date().toISOString()}`);
    
      try {
        await logTaskExecution(taskName, taskFn);
        // logTaskExecution records task errors itself, so "completed" means the
        // run finished, not necessarily that the task succeeded.
        logger.info(`Task "${taskName}" completed.`);
      } catch (error) {
        logger.error(`Task "${taskName}" failed:`, error);
      } finally {
        // Remove the entry instead of storing `false` so the map does not
        // accumulate stale keys.
        taskQueue.delete(taskName);
      }
    };
    
  5. src/cron/taskDefinitions.ts存放任务

  6. taskList.ts 添加任务

    // src/cron/taskList.ts
    // NOTE(review): assumes task1 and task2 are exported from taskDefinitions.ts — confirm.
    import { task1, task2 } from "src/cron/taskDefinitions";
    import { CronTask } from "src/types/task";
    
    // Registry of scheduled tasks, keyed by task name.
    export const tasks: Record<string, CronTask> = {};
    
    // Every minute; wrap the task in a function when extra work is needed around it.
    tasks["task1"] = {
      cron: "*/1 * * * *",
      fn: async () => {
        // ... any work that has to happen before the task runs
        await task1();
      },
    };
    
    // Every 2 minutes; a task function can also be registered directly.
    tasks["task2"] = {
      cron: "*/2 * * * *",
      fn: task2,
    };
    
  7. src/cron/index.ts统一启动任务

    // src/cron/index.ts
    import cron from "node-cron";
    import { executeTask } from "src/cron/executeTask";
    import { tasks } from "src/cron/taskList";
    
    /**
     * Registers every entry of `tasks` with node-cron. Each scheduled callback
     * hands the task to `executeTask` under its registry key.
     */
    export function registerCronJobs() {
      logger.info(JSON.stringify(tasks));
      for (const [taskName, task] of Object.entries(tasks)) {
        const { cron: cronExpression, fn } = task;
        const runTask = () => executeTask(taskName, fn);
        cron.schedule(cronExpression, runTask);
      }
      console.log("Task queue started");
    }
    
  8. task-runner.ts (单独启动的任务入口,避免 PM2 多进程冲突)。如果是单进程 Express,直接将 registerCronJobs 导入 Express 的启动文件即可

    PM2 推荐做法: 将所有定时任务放在 task-runner.ts 中,并使用 pm2 start task-runner.ts 单独启动.

    // src/cron/task-runner.ts
    import { registerCronJobs } from "./index";
    
    // Register the jobs only when this file is the process entry point,
    // not when it is imported by another module.
    if (require.main === module) {
      registerCronJobs();
      console.log("定时任务已启动");
    }
    

3. Express 启动任务入口

  1. app.ts, 开发环境使用

    // src/app.ts
    import express from "express";
    import { registerCronJobs } from "./cron";  // ✅ 注册定时任务 (仅开发环境使用)
    
    const app = express();
    
    // Routes
    app.get("/", (_req, res) => {
      return res.send("Hello World");
    });
    
    // Start the cron jobs in-process only in development; under PM2 this is not
    // recommended because it would start them more than once.
    const isDevelopment = process.env.NODE_ENV === "development";
    if (isDevelopment) {
      registerCronJobs();
    }
    
    export default app;
    
  2. server.ts 独立启动服务

    import app from "./app";
    
    const PORT = process.env.PORT || 3000;
    
    // Callback handed to app.listen; prints the local URL of the server.
    function onListening() {
      console.log(`🚀 Server running at http://localhost:${PORT}`);
    }
    
    app.listen(PORT, onListening);
    

4. PM2 配置 (避免多进程重复启动)

// ecosystem.config.js
module.exports = {
  apps: [
    {
      name: "app",
      script: "./server.ts",
      exec_mode: "cluster",     // ✅ 启动多进程模式
      instances: "max",         // ✅ 根据 CPU 核心数自动扩展
      env: {
        NODE_ENV: "production",
      },
    },
    {
      name: "cron-task-runner",
      script: "./src/cron/task-runner.ts",  // ✅ 定时任务单独进程
      exec_mode: "fork",                    // ✅ 独立启动,避免重复
      instances: 1,                         // ❗ 限制仅启动 1 个实例,避免冲突
      env: {
        NODE_ENV: "production",
      },
    },
  ],
};

5. 启动流程 (PM2 运行)

# 启动 Express 服务
pm2 start ecosystem.config.js --only app

# 启动定时任务 (单独进程)
pm2 start ecosystem.config.js --only cron-task-runner

# 查看进程状态
pm2 list

# 查看任务日志
pm2 logs cron-task-runner

6. 记录任务执行

1. 使用高阶函数

在 TypeScript/JavaScript 中,装饰器只能用于类的方法、属性、访问器、参数或类本身,不能直接用于普通函数。如果希望对普通函数实现同样的效果,可以用高阶函数手动包装这个函数,而不是使用装饰器。

import cron from "node-cron";
import { MongoClient } from "mongodb";

// MongoClient requires a connection string. Read it from the environment and
// fall back to a local instance when MONGODB_URI is not set.
const client = new MongoClient(
  process.env.MONGODB_URI ?? "mongodb://localhost:27017"
);
// With no argument, db() uses the database named in the connection string.
const db = client.db();
const logsCollection = db.collection("tasklogs");

/**
 * Runs a task and records the run in the `tasklogs` collection.
 *
 * A document with status "running" is inserted before the task starts and is
 * updated to "success" or "failed" (with the error message) once the task
 * settles. An error thrown by the task is logged and stored, not rethrown.
 * Errors from the database calls themselves do propagate, so a persistence
 * failure is never recorded as a task failure.
 *
 * @param taskName - Name used in console output and stored on the log document.
 * @param taskFn - The task to run; it takes no arguments and rejects on failure.
 * @returns A promise that resolves once the log document has been updated.
 * @throws Error if `taskName` or `taskFn` is missing or has the wrong type.
 */
async function logTaskExecution(taskName: string, taskFn: () => Promise<void>) {
  if (!taskName || !taskFn) {
    throw new Error("Task name and function are required.");
  }
  if (typeof taskName !== "string") {
    throw new Error("Task name must be a string.");
  }
  if (typeof taskFn !== "function") {
    throw new Error("Task function must be a function.");
  }
  // No arity check here: tasks are called with no arguments, so requiring
  // `taskFn.length > 0` would reject every valid task.
  const startTime = new Date();
  console.log(`[${taskName}] Task started at:`, startTime.toISOString());

  const logEntry = { taskName, startTime, status: "running" };
  const logId = (await logsCollection.insertOne(logEntry)).insertedId;

  // Only the task itself runs inside the try block, so a failure while
  // saving the outcome cannot be reported as a task failure.
  let failed = false;
  let failure: unknown;
  try {
    await taskFn();
  } catch (error) {
    failed = true;
    failure = error;
  }

  const endTime = new Date();
  if (!failed) {
    console.log(
      `[${taskName}] Task completed successfully at:`,
      endTime.toISOString()
    );
    await logsCollection.updateOne(
      { _id: logId },
      { $set: { endTime, status: "success" } }
    );
    return;
  }

  console.error(
    `[${taskName}] Task failed at:`,
    endTime.toISOString(),
    "Error:",
    failure
  );
  await logsCollection.updateOne(
    { _id: logId },
    {
      $set: {
        endTime,
        status: "failed",
        // Keep whatever information a non-Error throw value carries.
        error: failure instanceof Error ? failure.message : String(failure),
      },
    }
  );
}

/** Example task: prints a message and finishes. */
async function someCronTask(): Promise<void> {
  const message = "Run update...";
  console.log(message);
}

// Five-field cron expression: minute 1 of hours 9 and 14, every day.
// The previous "01 09,14 ** *" had only four fields and was not valid.
cron.schedule("1 9,14 * * *", () =>
  logTaskExecution("someCronTask", someCronTask)
);

2. 使用装饰器, 参考: 以记录定时任务需求为例, 演示装饰器的使用