mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-04-18 13:57:52 -04:00
133 lines
2.7 KiB
TypeScript
133 lines
2.7 KiB
TypeScript
import { CronExpressionParser } from "cron-parser";
|
|
import { logger } from "@zerobyte/core/node";
|
|
|
|
export abstract class Job {
|
|
abstract run(): Promise<unknown>;
|
|
}
|
|
|
|
type JobConstructor = new () => Job;
|
|
|
|
class ScheduledTask {
|
|
private timer: ReturnType<typeof setTimeout> | null = null;
|
|
private active = true;
|
|
private running = false;
|
|
private readonly timeZone = Intl.DateTimeFormat().resolvedOptions().timeZone || undefined;
|
|
|
|
constructor(
|
|
private readonly jobName: string,
|
|
private readonly cronExpression: string,
|
|
private readonly run: () => Promise<void>,
|
|
) {
|
|
CronExpressionParser.parse(this.cronExpression);
|
|
this.scheduleNext();
|
|
}
|
|
|
|
private getDelay(fromDate: Date) {
|
|
const interval = CronExpressionParser.parse(this.cronExpression, {
|
|
currentDate: fromDate,
|
|
tz: this.timeZone,
|
|
});
|
|
const nextRun = interval.next().toDate();
|
|
return Math.max(0, nextRun.getTime() - Date.now());
|
|
}
|
|
|
|
private clearTimer() {
|
|
if (this.timer) {
|
|
clearTimeout(this.timer);
|
|
this.timer = null;
|
|
}
|
|
}
|
|
|
|
private stopScheduling(error: unknown) {
|
|
this.active = false;
|
|
this.clearTimer();
|
|
logger.error(`Stopping scheduled job ${this.jobName} after cron parsing failed:`, error);
|
|
}
|
|
|
|
private scheduleNext(fromDate = new Date()) {
|
|
if (!this.active) return;
|
|
|
|
const delay = this.getDelay(fromDate);
|
|
this.timer = setTimeout(() => {
|
|
void this.tick();
|
|
}, delay);
|
|
}
|
|
|
|
private async tick() {
|
|
if (!this.active) return;
|
|
this.timer = null;
|
|
|
|
try {
|
|
this.scheduleNext(new Date());
|
|
} catch (error) {
|
|
this.stopScheduling(error);
|
|
return;
|
|
}
|
|
|
|
if (this.running) {
|
|
logger.warn(`Skipping overlapping run for job ${this.jobName}`);
|
|
return;
|
|
}
|
|
|
|
this.running = true;
|
|
try {
|
|
await this.run();
|
|
} finally {
|
|
this.running = false;
|
|
}
|
|
}
|
|
|
|
async stop() {
|
|
this.active = false;
|
|
this.clearTimer();
|
|
}
|
|
|
|
async destroy() {
|
|
await this.stop();
|
|
}
|
|
}
|
|
|
|
class SchedulerClass {
|
|
private tasks: ScheduledTask[] = [];
|
|
|
|
async start() {
|
|
logger.info("Scheduler started");
|
|
}
|
|
|
|
build(JobClass: JobConstructor) {
|
|
const job = new JobClass();
|
|
return {
|
|
schedule: (cronExpression: string) => {
|
|
const task = new ScheduledTask(JobClass.name, cronExpression, async () => {
|
|
try {
|
|
await job.run();
|
|
} catch (error) {
|
|
logger.error(`Job ${JobClass.name} failed:`, error);
|
|
}
|
|
});
|
|
|
|
this.tasks.push(task);
|
|
logger.info(`Scheduled job ${JobClass.name} with cron: ${cronExpression}`);
|
|
},
|
|
};
|
|
}
|
|
|
|
async stop() {
|
|
for (const task of this.tasks) {
|
|
await task.stop();
|
|
}
|
|
this.tasks = [];
|
|
logger.info("Scheduler stopped");
|
|
}
|
|
|
|
async clear() {
|
|
for (const task of this.tasks) {
|
|
await task.destroy();
|
|
}
|
|
this.tasks = [];
|
|
logger.info("Scheduler cleared all tasks");
|
|
}
|
|
}
|
|
|
|
export const Scheduler = new SchedulerClass();
|