mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-04-26 18:08:14 -04:00
fix: schedules not running if never run manually (#194)
* fix: backups that were never run manually are not picked up by job * fix(mutex): prioritize waiting exclusive locks over newly added shared tasks
This commit is contained in:
38
app/server/core/__tests__/repository-mutex.test.ts
Normal file
38
app/server/core/__tests__/repository-mutex.test.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import { test, describe, expect } from "bun:test";
|
||||
import { repoMutex } from "../repository-mutex";
|
||||
|
||||
describe("RepositoryMutex", () => {
|
||||
test("should prioritize waiting exclusive locks over new shared locks", async () => {
|
||||
const repoId = "test-repo";
|
||||
const results: string[] = [];
|
||||
|
||||
const releaseShared1 = await repoMutex.acquireShared(repoId, "backup-1");
|
||||
results.push("acquired-shared-1");
|
||||
|
||||
const exclusivePromise = repoMutex.acquireExclusive(repoId, "unlock").then((release) => {
|
||||
results.push("acquired-exclusive");
|
||||
return release;
|
||||
});
|
||||
|
||||
const shared2Promise = repoMutex.acquireShared(repoId, "backup-2").then((release) => {
|
||||
results.push("acquired-shared-2");
|
||||
return release;
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(results).toEqual(["acquired-shared-1"]);
|
||||
|
||||
releaseShared1();
|
||||
|
||||
const releaseExclusive = await exclusivePromise;
|
||||
expect(results).toEqual(["acquired-shared-1", "acquired-exclusive"]);
|
||||
|
||||
releaseExclusive();
|
||||
|
||||
const releaseShared2 = await shared2Promise;
|
||||
expect(results).toEqual(["acquired-shared-1", "acquired-exclusive", "acquired-shared-2"]);
|
||||
|
||||
releaseShared2();
|
||||
});
|
||||
});
|
||||
@@ -49,7 +49,9 @@ class RepositoryMutex {
|
||||
async acquireShared(repositoryId: string, operation: string): Promise<() => void> {
|
||||
const state = this.getOrCreateState(repositoryId);
|
||||
|
||||
if (!state.exclusiveHolder) {
|
||||
const hasExclusiveInQueue = state.waitQueue.some((item) => item.type === "exclusive");
|
||||
|
||||
if (!state.exclusiveHolder && !hasExclusiveInQueue) {
|
||||
const lockId = this.generateLockId();
|
||||
state.sharedHolders.set(lockId, {
|
||||
id: lockId,
|
||||
@@ -60,7 +62,7 @@ class RepositoryMutex {
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
`[Mutex] Waiting for shared lock on repo ${repositoryId}: ${operation} (exclusive held by: ${state.exclusiveHolder.operation})`,
|
||||
`[Mutex] Waiting for shared lock on repo ${repositoryId}: ${operation} (exclusive held by: ${state.exclusiveHolder?.operation ?? "none"}, queue: ${state.waitQueue.length})`,
|
||||
);
|
||||
const lockId = await new Promise<string>((resolve) => {
|
||||
state.waitQueue.push({ type: "shared", operation, resolve });
|
||||
|
||||
@@ -4,6 +4,7 @@ import { createTestVolume } from "~/test/helpers/volume";
|
||||
import { createTestBackupSchedule } from "~/test/helpers/backup";
|
||||
import { createTestRepository } from "~/test/helpers/repository";
|
||||
import { generateBackupOutput } from "~/test/helpers/restic";
|
||||
import { faker } from "@faker-js/faker";
|
||||
import * as spawnModule from "~/server/utils/spawn";
|
||||
|
||||
const resticBackupMock = mock(() => Promise.resolve({ exitCode: 0, stdout: "", stderr: "" }));
|
||||
@@ -150,3 +151,26 @@ describe("execute backup", () => {
|
||||
expect(updatedSchedule.lastBackupStatus).toBe("error");
|
||||
});
|
||||
});
|
||||
|
||||
describe("getSchedulesToExecute", () => {
|
||||
test("should return schedules with NULL lastBackupStatus", async () => {
|
||||
// arrange
|
||||
const volume = await createTestVolume();
|
||||
const repository = await createTestRepository();
|
||||
|
||||
const schedule = await createTestBackupSchedule({
|
||||
volumeId: volume.id,
|
||||
repositoryId: repository.id,
|
||||
enabled: true,
|
||||
cronExpression: "* * * * *",
|
||||
lastBackupStatus: null,
|
||||
nextBackupAt: faker.date.past().getTime(),
|
||||
});
|
||||
|
||||
// act
|
||||
const schedulesToExecute = await backupsService.getSchedulesToExecute();
|
||||
|
||||
// assert
|
||||
expect(schedulesToExecute).toContain(schedule.id);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { and, asc, eq, ne } from "drizzle-orm";
|
||||
import { and, asc, eq, isNull, ne, or } from "drizzle-orm";
|
||||
import cron from "node-cron";
|
||||
import { CronExpressionParser } from "cron-parser";
|
||||
import { NotFoundError, BadRequestError, ConflictError } from "http-errors-enhanced";
|
||||
@@ -393,7 +393,10 @@ const executeBackup = async (scheduleId: number, manual = false) => {
|
||||
const getSchedulesToExecute = async () => {
|
||||
const now = Date.now();
|
||||
const schedules = await db.query.backupSchedulesTable.findMany({
|
||||
where: and(eq(backupSchedulesTable.enabled, true), ne(backupSchedulesTable.lastBackupStatus, "in_progress")),
|
||||
where: and(
|
||||
eq(backupSchedulesTable.enabled, true),
|
||||
or(ne(backupSchedulesTable.lastBackupStatus, "in_progress"), isNull(backupSchedulesTable.lastBackupStatus)),
|
||||
),
|
||||
});
|
||||
|
||||
const schedulesToRun: number[] = [];
|
||||
|
||||
Reference in New Issue
Block a user