- Published on
在 Express 中使用 node-cron 集成定时任务
- Authors
- Name
- Shelton Ma
1. 目录结构设计
合理的目录结构有助于代码可维护性,推荐以下结构:
src/
├── app.ts # Express 主应用入口
├── server.ts # 启动服务
├── cron/ # ✅ 定时任务相关
│ ├── task-runner.ts # 单独启动的任务入口,避免 PM2 多进程冲突
│ ├── index.ts # 任务注册 & 启动
│ ├── taskList.ts # 存放任务列表
│ ├── executeTask.ts # 封装任务执行逻辑
│ └── taskDefinitions.ts # 存放任务函数
├── controllers/
├── services/
├── middlewares/
├── utils/
├── .env
├── ecosystem.config.js # ✅ PM2 配置文件
└── package.json
2. 编写定时任务
类型支持
// src/types/task.ts export type TaskFunction = () => Promise<void>; export interface CronTask { cron: string; fn: TaskFunction; }
使用Mongodb持久化任务(可选)
// src/models/taskLog.ts import mongoose, { ObjectId, Schema } from "mongoose"; export enum TaskStatus { RUNNING = "running", SUCCESS = "success", FAILED = "failed", SKIP = "skip", INTERRUPTED = "interrupted", } export interface ITaskLogModel { _id: ObjectId; taskName: string; // 任务名称 status: TaskStatus; // 任务状态 error?: string; // 错误信息 startTime: Date; // 开始时间 endTime?: Date; // 结束时间 duration?: number; // 执行时长(毫秒) createdAt: Date; updatedAt: Date; markAsSuccess(): Promise<ITaskLogModel>; markAsFailed(error: string): Promise<ITaskLogModel>; } interface ITaskLogModelStatic { skipTask(taskName: string): Promise<ITaskLogModel>; createTask(taskName: string): Promise<ITaskLogModel>; findByTaskName( taskName: string, page?: number, pageSize?: number, sortBy?: string, sortOrder?: "asc" | "desc" ): Promise<{ data: ITaskLogModel[]; pagination: { total: number; page: number; pageSize: number; totalPages: number; }; }>; findFailedTasks( page?: number, pageSize?: number, sortBy?: string, sortOrder?: "asc" | "desc" ): Promise<{ data: ITaskLogModel[]; pagination: { total: number; page: number; pageSize: number; totalPages: number; }; }>; } const TaskLogSchema = new Schema<ITaskLogModel>( { taskName: { type: String, required: [true, "任务名称不能为空"], trim: true, maxlength: [100, "任务名称不能超过100个字符"], }, status: { type: String, enum: Object.values(TaskStatus), default: TaskStatus.RUNNING, }, startTime: { type: Date, required: [true, "开始时间不能为空"], }, endTime: { type: Date, validate: { validator: function (this: ITaskLogModel, value: Date) { return !value || value >= this.startTime; }, message: "结束时间不能早于开始时间", }, }, error: { type: String, maxlength: [1000, "错误信息不能超过1000个字符"], }, duration: { type: Number, min: [0, "执行时长不能为负数"], }, }, { timestamps: true, } ); // 在保存前计算执行时长 TaskLogSchema.pre("save", function (next) { if (this.endTime && this.startTime) { this.duration = this.endTime.getTime() - this.startTime.getTime(); } next(); }); TaskLogSchema.statics.createTask = function ( taskName: string ): Promise<ITaskLogModel> { return new this({ taskName, status: TaskStatus.RUNNING, startTime: new Date(), }).save(); }; TaskLogSchema.statics.skipTask = function ( taskName: string ): Promise<ITaskLogModel> { return new this({ taskName, status: TaskStatus.SKIP, startTime: new Date(), }).save(); }; // 添加静态方法 TaskLogSchema.statics.findByTaskName = function ( taskName: string, page: number = 1, pageSize: number = 10, sortBy: string = "createdAt", sortOrder: "asc" | "desc" = "desc" ) { const skip = (page - 1) * pageSize; const sort = { [sortBy]: sortOrder === "desc" ? -1 : 1 }; return Promise.all([ this.find({ taskName }).sort(sort).skip(skip).limit(pageSize), this.countDocuments({ taskName }), ]).then(([data, total]) => ({ data, pagination: { total, page, pageSize, totalPages: Math.ceil(total / pageSize), }, })); }; TaskLogSchema.statics.findFailedTasks = function ( page: number = 1, pageSize: number = 10, sortBy: string = "updatedAt", sortOrder: "asc" | "desc" = "desc" ) { const skip = (page - 1) * pageSize; const sort = { [sortBy]: sortOrder === "desc" ? -1 : 1 }; return Promise.all([ this.find({ status: TaskStatus.FAILED }) .sort(sort) .skip(skip) .limit(pageSize), this.countDocuments({ status: TaskStatus.FAILED }), ]).then(([data, total]) => ({ data, pagination: { total, page, pageSize, totalPages: Math.ceil(total / pageSize), }, })); }; // 添加实例方法 TaskLogSchema.methods.markAsSuccess = function () { this.status = TaskStatus.SUCCESS; this.endTime = new Date(); return this.save(); }; TaskLogSchema.methods.markAsFailed = function (error: string) { this.status = TaskStatus.FAILED; this.error = error; this.endTime = new Date(); return this.save(); }; // 优化索引 TaskLogSchema.index({ taskName: 1, createdAt: -1 }); // 按任务名称和时间查询 TaskLogSchema.index({ status: 1, updatedAt: -1 }); // 按状态和时间查询 TaskLogSchema.index({ startTime: -1 }); // 按开始时间查询 export const TaskLog = mongoose.model< ITaskLogModel, mongoose.Model<ITaskLogModel, {}, {}, {}, any> & ITaskLogModelStatic >("TaskLog", TaskLogSchema);
logTaskExecution
用于记录任务执行记录// src/utils/logTaskExecution.ts import { TaskLog } from "src/models/taskLog"; /** * The function `logTaskExecution` logs the start and completion of a task, along with its status and * any errors encountered, while also storing this information in a database. * @param {string} taskName - The `taskName` parameter is a string that represents the name or * identifier of the task being executed. It is used for logging and tracking the task's progress and * outcome. * @param taskFn - The `taskFn` parameter in the `logTaskExecution` function is a function that returns * a promise. This function represents the task that will be executed and logged by the * `logTaskExecution` function. It should be an asynchronous function that performs the actual task * logic and resolves the promise when the task is completed successfully. If the task encounters an error, it should * reject the promise, which will be caught by the `logTaskExecution` function to log the error * information. * @returns A promise that resolves to void. * @example * logTaskExecution("MyTask", async () => { * // Your task logic here * await someAsyncOperation(); * }); */ export async function logTaskExecution( taskName: string, taskFn: () => Promise<void> ) { if (!taskName || !taskFn) { throw new Error("Task name and function are required."); } if (typeof taskName !== "string") { throw new Error("Task name must be a string."); } if (typeof taskFn !== "function") { throw new Error("Task function must be a function."); } const startTime = new Date(); console.log(`[${taskName}] Task started at:`, startTime.toISOString()); let tasklog = await TaskLog.createTask(taskName); try { await taskFn(); const endTime = new Date(); console.log( `[${taskName}] Task completed successfully at:`, endTime.toISOString() ); tasklog = await tasklog.markAsSuccess(); } catch (error) { const endTime = new Date(); console.error( `[${taskName}] Task failed at:`, endTime.toISOString(), "Error:", error instanceof Error ? error : new Error(String(error)) ); await tasklog.markAsFailed( error instanceof Error ? error.message : String(error) ); } }
executeTask
封装任务执行, 用于格式化任务注册, 使用taskQueue
实现任务跳过逻辑, 避免同一任务并发执行// src/cron/executeTask.ts import { TaskLog } from "src/models/taskLog"; import { TaskFunction } from "src/types/task"; import { logTaskExecution } from "src/utils/logTaskExecution"; const taskQueue: Map<string, boolean> = new Map(); export const executeTask = async (taskName: string, taskFn: TaskFunction) => { if (taskQueue.get(taskName)) { await TaskLog.skipTask(taskName); logger.info(`Task "${taskName}" is still running, skipping...`); return; } taskQueue.set(taskName, true); logger.info(`Starting task "${taskName}" at ${new Date().toISOString()}`); try { await logTaskExecution(taskName, taskFn); logger.info(`Task "${taskName}" completed.`); } catch (error) { logger.info(`Task "${taskName}" failed:`, error); } finally { taskQueue.set(taskName, false); } };
src/cron/taskDefinitions.ts
存放任务taskList.ts
添加任务// src/cron/taskList.ts import { CronTask } from "src/types/tasks"; export const tasks: Record<string, CronTask> = {}; tasks["task1"] = { cron: "*/1 * * * *", fn: async () => { ... await task1(); }, }; tasks["task2"] = { cron: "*/2 * * * *", fn: task2, };
src/cron/index.ts
统一启动任务// src/cron/index.ts import cron from "node-cron"; import { executeTask } from "src/cron/executeTask"; import { tasks } from "src/cron/taskList"; // 统一注册任务 export function registerCronJobs() { logger.info(`${JSON.stringify(tasks)}`); Object.entries(tasks).forEach(([taskName, { cron: cronExpression, fn }]) => { cron.schedule(cronExpression, () => executeTask(taskName, fn)); }); console.log("Task queue started"); }
task-runner.ts
(单独启动的任务入口,避免 PM2 多进程冲突), 如果单进程express, 直接将startTaskQueue
导入express的启动文件PM2 推荐做法: 将所有定时任务放在 task-runner.ts 中,并使用 pm2 start task-runner.ts 单独启动.
// 若作为独立进程启动,确保入口点生效 if (require.main === module) { registerCronJobs(); console.log("定时任务已启动"); }
3. Express 启动任务入口
app.ts, 开发环境使用
// src/app.ts import express from "express"; import { registerCronJobs } from "./cron"; // ✅ 注册定时任务 (仅开发环境使用) const app = express(); // 注册路由 app.get("/", (req, res) => res.send("Hello World")); // 启动定时任务 (PM2 时不推荐,避免重复启动) if (process.env.NODE_ENV === "development") { registerCronJobs(); } export default app;
server.ts 独立启动服务
import app from "./app"; const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`🚀 Server running at http://localhost:${PORT}`); });
4. PM2 配置 (避免多进程重复启动)
// ecosystem.config.js
module.exports = {
apps: [
{
name: "app",
script: "./server.ts",
exec_mode: "cluster", // ✅ 启动多进程模式
instances: "max", // ✅ 根据 CPU 核心数自动扩展
env: {
NODE_ENV: "production",
},
},
{
name: "cron-task-runner",
script: "./src/cron/task-runner.ts", // ✅ 定时任务单独进程
exec_mode: "fork", // ✅ 独立启动,避免重复
instances: 1, // ❗ 限制仅启动 1 个实例,避免冲突
env: {
NODE_ENV: "production",
},
},
],
};
5. 启动流程 (PM2 运行)
# 启动 Express 服务
pm2 start ecosystem.config.js --only app
#启动定时任务 (单独进程)
pm2 start ecosystem.config.js --only cron-task-runner
# 查看进程状态
pm2 list
# 查看任务日志
pm2 logs cron-task-runner
6. 记录任务执行
1. 使用高阶函数
在 TypeScript/JavaScript 中,装饰器只能用于类的方法、属性、访问器、参数或类本身,不能直接用于普通函数 如果希望直接用于方法, 可以 手动包装 这个函数,而不是使用装饰器。
import cron from "node-cron";
import { MongoClient } from "mongodb";
const client = new MongoClient();
const db = client.db();
const logsCollection = db.collection("tasklogs");
/**
* The function `logTaskExecution` logs the start and completion of a task, along with its status and
* any errors encountered, while also storing this information in a database.
* @param {string} taskName - The `taskName` parameter is a string that represents the name or
* identifier of the task being executed. It is used for logging and tracking the task's progress and
* outcome.
* @param taskFn - The `taskFn` parameter in the `logTaskExecution` function is a function that returns
* a promise. This function represents the task that will be executed and logged by the
* `logTaskExecution` function. It should be an asynchronous function that performs the actual task
* logic and resolves the promise when the
*/
async function logTaskExecution(taskName: string, taskFn: () => Promise<void>) {
if (!taskName || !taskFn) {
throw new Error("Task name and function are required.");
}
if (typeof taskName !== "string") {
throw new Error("Task name must be a string.");
}
if (typeof taskFn !== "function") {
throw new Error("Task function must be a function.");
}
if (!taskFn.length) {
throw new Error("Task function must accept a callback.");
}
const startTime = new Date();
console.log(`[${taskName}] Task started at:`, startTime.toISOString());
const logEntry = { taskName, startTime, status: "running" };
const logId = (await logsCollection.insertOne(logEntry)).insertedId;
try {
await taskFn();
const endTime = new Date();
console.log(
`[${taskName}] Task completed successfully at:`,
endTime.toISOString()
);
await logsCollection.updateOne(
{ _id: logId },
{ $set: { endTime, status: "success" } }
);
} catch (error) {
const endTime = new Date();
console.error(
`[${taskName}] Task failed at:`,
endTime.toISOString(),
"Error:",
error
);
await logsCollection.updateOne(
{_id: logId },
{
$set: {
endTime,
status: "failed",
error: error instanceof Error ? error.message : "Unknown Error",
},
}
);
}
}
async function someCronTask() {
console.log("Run update...");
}
cron.schedule("01 09,14 ** *", () =>
logTaskExecution("someCronTask", someCronTask)
);