refactor: buffer backup / restore progress (#956)

This commit is contained in:
Nico
2026-06-08 17:36:20 +02:00
committed by GitHub
parent a2e621345a
commit 5dabe9dec5
7 changed files with 151 additions and 15 deletions

View File

@@ -300,6 +300,7 @@ describe("backup execution - validation failures", () => {
test("persists latest backup progress while preserving execution", async () => {
const { runBackupMock } = setup();
const updateProgressSpy = vi.spyOn(taskStore, "updateProgress");
const volume = await createTestVolume();
const repository = await createTestRepository();
const schedule = await createTestBackupSchedule({
@@ -319,6 +320,17 @@ describe("backup execution - validation failures", () => {
bytes_done: 250,
current_files: ["file.txt"],
});
request.onProgress({
message_type: "status",
seconds_elapsed: 2,
seconds_remaining: 8,
percent_done: 0.5,
total_files: 100,
files_done: 50,
total_bytes: 1000,
bytes_done: 500,
current_files: ["later.txt"],
});
return {
status: "completed",
@@ -332,12 +344,13 @@ describe("backup execution - validation failures", () => {
const task = await getBackupTaskForSchedule(schedule.id);
expect(task?.status).toBe("succeeded");
expect(updateProgressSpy).toHaveBeenCalledTimes(1);
expect(task?.progress).toMatchObject({
kind: "backup",
progress: {
percent_done: 0.25,
bytes_done: 250,
current_files: ["file.txt"],
percent_done: 0.5,
bytes_done: 500,
current_files: ["later.txt"],
},
});
});

View File

@@ -31,6 +31,7 @@ import { mirrorQueries } from "./backups.queries";
import { runEffectPromise, toMessage } from "../../utils/errors";
import { Effect } from "effect";
import { taskStore } from "../tasks/tasks.store";
import { createTaskProgressBuffer } from "../tasks/progress-buffer";
const BACKUP_TASK_RESOURCE_TYPE = "backup_schedule";
@@ -448,6 +449,11 @@ const executeBackup = async (scheduleId: number, manual = false) => {
});
const abortController = backupExecutor.track(scheduleId);
const progressBuffer = createTaskProgressBuffer(task.id, {
onError: (error) => {
logger.error(`Failed to persist backup task progress for ${task.id}: ${toMessage(error)}`);
},
});
let domainHandlerCompleted = false;
try {
@@ -470,22 +476,20 @@ const executeBackup = async (scheduleId: number, manual = false) => {
signal: abortController.signal,
onProgress: (progress) => {
updateBackupProgress(ctx, progress);
try {
taskStore.updateProgress(task.id, { kind: "backup", progress });
} catch (error) {
logger.error(`Failed to persist backup task progress for ${task.id}: ${toMessage(error)}`);
}
progressBuffer.update({ kind: "backup", progress });
},
});
switch (executionResult.status) {
case "unavailable": {
progressBuffer.flush();
await handleBackupFailure(scheduleId, ctx.organizationId, executionResult.error, manual, ctx);
domainHandlerCompleted = true;
taskStore.fail(task.id, toMessage(executionResult.error));
return;
}
case "completed":
progressBuffer.flush();
await finalizeSuccessfulBackup(
ctx,
executionResult.exitCode,
@@ -501,12 +505,14 @@ const executeBackup = async (scheduleId: number, manual = false) => {
});
return;
case "failed": {
progressBuffer.flush();
await handleBackupFailure(scheduleId, ctx.organizationId, executionResult.error, manual, ctx);
domainHandlerCompleted = true;
taskStore.fail(task.id, toMessage(executionResult.error));
return;
}
case "cancelled":
progressBuffer.flush();
await handleBackupCancellation(scheduleId, ctx.organizationId, executionResult.message);
domainHandlerCompleted = true;
taskStore.cancel(task.id, executionResult.message ?? "Backup was stopped by the user");
@@ -517,6 +523,7 @@ const executeBackup = async (scheduleId: number, manual = false) => {
}
} catch (error) {
if (abortController.signal.aborted) {
progressBuffer.flush();
taskStore.cancel(task.id, "Backup was stopped by the user");
return;
}
@@ -525,9 +532,11 @@ const executeBackup = async (scheduleId: number, manual = false) => {
throw error;
}
progressBuffer.flush();
await handleBackupFailure(scheduleId, ctx.organizationId, error, manual, ctx);
taskStore.fail(task.id, toMessage(error));
} finally {
progressBuffer.dispose();
backupExecutor.untrack(scheduleId, abortController);
cache.del(cacheKeys.backup.progress(scheduleId));
}

View File

@@ -0,0 +1,66 @@
import { taskStore } from "./tasks.store";
import type { TaskProgress } from "./tasks.schemas";
type TaskProgressBufferOptions = {
intervalMs?: number;
onError?: (error: unknown) => void;
};
const DEFAULT_PROGRESS_PERSIST_INTERVAL_MS = 15_000;
export const createTaskProgressBuffer = (taskId: string, options: TaskProgressBufferOptions = {}) => {
const intervalMs = options.intervalMs ?? DEFAULT_PROGRESS_PERSIST_INTERVAL_MS;
let latestProgress: TaskProgress | null = null;
let dirty = false;
let disposed = false;
let timer: ReturnType<typeof setTimeout> | null = null;
const clearTimer = () => {
if (!timer) return;
clearTimeout(timer);
timer = null;
};
const persistLatest = () => {
if (!latestProgress || !dirty || disposed) return;
try {
taskStore.updateProgress(taskId, latestProgress);
dirty = false;
} catch (error) {
options.onError?.(error);
}
};
const schedulePersist = () => {
if (disposed || timer) return;
timer = setTimeout(() => {
timer = null;
persistLatest();
if (dirty) {
schedulePersist();
}
}, intervalMs);
timer.unref?.();
};
return {
update: (progress: TaskProgress) => {
if (disposed) return;
latestProgress = progress;
dirty = true;
schedulePersist();
},
flush: () => {
clearTimer();
persistLatest();
},
dispose: () => {
disposed = true;
clearTimer();
},
};
};

View File

@@ -97,6 +97,7 @@ const runBackupError = (...args: Parameters<typeof backup>) => Effect.runPromise
afterEach(() => {
vi.restoreAllMocks();
delete process.env.RESTIC_PROGRESS_FPS;
});
describe("backup command", () => {
@@ -251,6 +252,23 @@ describe("backup command", () => {
expect(getEnv()?.RCLONE_CONFIG).toBe(mockDeps.rcloneConfigFile);
});
test("defaults restic progress output to one update per second", async () => {
const { getEnv } = setup();
await runBackup(config, "/mnt/data", { organizationId: "org-1" }, mockDeps);
expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("1");
});
test("respects an explicit RESTIC_PROGRESS_FPS environment value", async () => {
process.env.RESTIC_PROGRESS_FPS = "2";
const { getEnv } = setup();
await runBackup(config, "/mnt/data", { organizationId: "org-1" }, mockDeps);
expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("2");
});
});
describe("exit code handling", () => {

View File

@@ -49,10 +49,12 @@ type SetupOptions = {
const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {}) => {
let capturedArgs: string[] = [];
let capturedEnv: SafeSpawnParams["env"];
vi.spyOn(cleanupModule, "cleanupTemporaryKeys").mockImplementation(() => Promise.resolve());
vi.spyOn(spawnModule, "safeSpawn").mockImplementation((params: SafeSpawnParams) => {
capturedArgs = params.args;
capturedEnv = params.env;
if (spawnError) {
return Promise.reject(spawnError);
}
@@ -85,6 +87,7 @@ const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {})
return {
getArgs: () => capturedArgs,
getEnv: () => capturedEnv,
getRestoreArg,
getOptionValues,
};
@@ -92,6 +95,7 @@ const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {})
afterEach(() => {
vi.restoreAllMocks();
delete process.env.RESTIC_PROGRESS_FPS;
});
const runRestore = (...args: Parameters<typeof restore>) => Effect.runPromise(restore(...args));
@@ -229,6 +233,35 @@ describe("restore command", () => {
});
describe("progress callbacks", () => {
test("defaults restic progress output to one update per second", async () => {
const { getEnv } = setup();
await runRestore(
config,
"snapshot-123",
"/tmp/restore-target",
{ organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" },
mockDeps,
);
expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("1");
});
test("respects an explicit RESTIC_PROGRESS_FPS environment value", async () => {
process.env.RESTIC_PROGRESS_FPS = "2";
const { getEnv } = setup();
await runRestore(
config,
"snapshot-123",
"/tmp/restore-target",
{ organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" },
mockDeps,
);
expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("2");
});
test("calls onProgress with parsed status updates", async () => {
const progressUpdates: unknown[] = [];
setup({ onSpawnCall: (params) => params.onStdout?.(validProgressLine) });

View File

@@ -2,7 +2,6 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { Data, Effect } from "effect";
import { throttle } from "es-toolkit";
import type { CompressionMode, RepositoryConfig } from "../schemas";
import { type ResticBackupProgressDto, resticBackupOutputSchema, resticBackupProgressSchema } from "../restic-dto";
import { addCommonArgs } from "../helpers/add-common-args";
@@ -133,18 +132,15 @@ export const backup = (
}
const stderrLines: string[] = [];
const logData = throttle((data: string) => {
logger.info(data.trim());
}, 5000);
const resticProgressFps = process.env.RESTIC_PROGRESS_FPS ?? "1";
logger.debug(`Executing: restic ${args.join(" ")}`);
const res = await safeSpawn({
command: "restic",
args,
env: { ...env, RESTIC_PROGRESS_FPS: "1" },
env: { ...env, RESTIC_PROGRESS_FPS: resticProgressFps },
signal: options.signal,
onStdout: (data) => {
logData(data);
if (!options.onProgress) {
return;
}

View File

@@ -98,13 +98,14 @@ export const restore = (
args.push("--", restoreArg);
const onProgress = options.onProgress;
const resticProgressFps = process.env.RESTIC_PROGRESS_FPS ?? "1";
logger.debug(`Executing: restic ${args.join(" ")}`);
const res = yield* Effect.tryPromise(() =>
safeSpawn({
command: "restic",
args,
env: { ...env, RESTIC_PROGRESS_FPS: "1" },
env: { ...env, RESTIC_PROGRESS_FPS: resticProgressFps },
signal: options.signal,
onStdout: (data) => {
if (!onProgress) {