diff --git a/app/server/modules/backups/__tests__/backups.service.execution.test.ts b/app/server/modules/backups/__tests__/backups.service.execution.test.ts index 9edc89a8..5d6818f3 100644 --- a/app/server/modules/backups/__tests__/backups.service.execution.test.ts +++ b/app/server/modules/backups/__tests__/backups.service.execution.test.ts @@ -300,6 +300,7 @@ describe("backup execution - validation failures", () => { test("persists latest backup progress while preserving execution", async () => { const { runBackupMock } = setup(); + const updateProgressSpy = vi.spyOn(taskStore, "updateProgress"); const volume = await createTestVolume(); const repository = await createTestRepository(); const schedule = await createTestBackupSchedule({ @@ -319,6 +320,17 @@ describe("backup execution - validation failures", () => { bytes_done: 250, current_files: ["file.txt"], }); + request.onProgress({ + message_type: "status", + seconds_elapsed: 2, + seconds_remaining: 8, + percent_done: 0.5, + total_files: 100, + files_done: 50, + total_bytes: 1000, + bytes_done: 500, + current_files: ["later.txt"], + }); return { status: "completed", @@ -332,12 +344,13 @@ describe("backup execution - validation failures", () => { const task = await getBackupTaskForSchedule(schedule.id); expect(task?.status).toBe("succeeded"); + expect(updateProgressSpy).toHaveBeenCalledTimes(1); expect(task?.progress).toMatchObject({ kind: "backup", progress: { - percent_done: 0.25, - bytes_done: 250, - current_files: ["file.txt"], + percent_done: 0.5, + bytes_done: 500, + current_files: ["later.txt"], }, }); }); diff --git a/app/server/modules/backups/backups.service.ts b/app/server/modules/backups/backups.service.ts index 0ebd3767..5be1a270 100644 --- a/app/server/modules/backups/backups.service.ts +++ b/app/server/modules/backups/backups.service.ts @@ -31,6 +31,7 @@ import { mirrorQueries } from "./backups.queries"; import { runEffectPromise, toMessage } from "../../utils/errors"; import { Effect } from "effect"; import { taskStore } from "../tasks/tasks.store"; +import { createTaskProgressBuffer } from "../tasks/progress-buffer"; const BACKUP_TASK_RESOURCE_TYPE = "backup_schedule"; @@ -448,6 +449,11 @@ const executeBackup = async (scheduleId: number, manual = false) => { }); const abortController = backupExecutor.track(scheduleId); + const progressBuffer = createTaskProgressBuffer(task.id, { + onError: (error) => { + logger.error(`Failed to persist backup task progress for ${task.id}: ${toMessage(error)}`); + }, + }); let domainHandlerCompleted = false; try { @@ -470,22 +476,20 @@ const executeBackup = async (scheduleId: number, manual = false) => { signal: abortController.signal, onProgress: (progress) => { updateBackupProgress(ctx, progress); - try { - taskStore.updateProgress(task.id, { kind: "backup", progress }); - } catch (error) { - logger.error(`Failed to persist backup task progress for ${task.id}: ${toMessage(error)}`); - } + progressBuffer.update({ kind: "backup", progress }); }, }); switch (executionResult.status) { case "unavailable": { + progressBuffer.flush(); await handleBackupFailure(scheduleId, ctx.organizationId, executionResult.error, manual, ctx); domainHandlerCompleted = true; taskStore.fail(task.id, toMessage(executionResult.error)); return; } case "completed": + progressBuffer.flush(); await finalizeSuccessfulBackup( ctx, executionResult.exitCode, @@ -501,12 +505,14 @@ const executeBackup = async (scheduleId: number, manual = false) => { }); return; case "failed": { + progressBuffer.flush(); await handleBackupFailure(scheduleId, ctx.organizationId, executionResult.error, manual, ctx); domainHandlerCompleted = true; taskStore.fail(task.id, toMessage(executionResult.error)); return; } case "cancelled": + progressBuffer.flush(); await handleBackupCancellation(scheduleId, ctx.organizationId, executionResult.message); domainHandlerCompleted = true; taskStore.cancel(task.id, executionResult.message ?? "Backup was stopped by the user"); @@ -517,6 +523,7 @@ const executeBackup = async (scheduleId: number, manual = false) => { } } catch (error) { if (abortController.signal.aborted) { + progressBuffer.flush(); taskStore.cancel(task.id, "Backup was stopped by the user"); return; } @@ -525,9 +532,11 @@ const executeBackup = async (scheduleId: number, manual = false) => { throw error; } + progressBuffer.flush(); await handleBackupFailure(scheduleId, ctx.organizationId, error, manual, ctx); taskStore.fail(task.id, toMessage(error)); } finally { + progressBuffer.dispose(); backupExecutor.untrack(scheduleId, abortController); cache.del(cacheKeys.backup.progress(scheduleId)); } diff --git a/app/server/modules/tasks/progress-buffer.ts b/app/server/modules/tasks/progress-buffer.ts new file mode 100644 index 00000000..c6590b22 --- /dev/null +++ b/app/server/modules/tasks/progress-buffer.ts @@ -0,0 +1,66 @@ +import { taskStore } from "./tasks.store"; +import type { TaskProgress } from "./tasks.schemas"; + +type TaskProgressBufferOptions = { + intervalMs?: number; + onError?: (error: unknown) => void; +}; + +const DEFAULT_PROGRESS_PERSIST_INTERVAL_MS = 15_000; + +export const createTaskProgressBuffer = (taskId: string, options: TaskProgressBufferOptions = {}) => { + const intervalMs = options.intervalMs ?? DEFAULT_PROGRESS_PERSIST_INTERVAL_MS; + let latestProgress: TaskProgress | null = null; + let dirty = false; + let disposed = false; + let timer: ReturnType | null = null; + + const clearTimer = () => { + if (!timer) return; + clearTimeout(timer); + timer = null; + }; + + const persistLatest = () => { + if (!latestProgress || !dirty || disposed) return; + + try { + taskStore.updateProgress(taskId, latestProgress); + dirty = false; + } catch (error) { + options.onError?.(error); + } + }; + + const schedulePersist = () => { + if (disposed || timer) return; + + timer = setTimeout(() => { + timer = null; + persistLatest(); + + if (dirty) { + schedulePersist(); + } + }, intervalMs); + timer.unref?.(); + }; + + return { + update: (progress: TaskProgress) => { + if (disposed) return; + + latestProgress = progress; + dirty = true; + schedulePersist(); + }, + flush: () => { + clearTimer(); + persistLatest(); + }, + dispose: () => { + disposed = true; + clearTimer(); + }, + }; +}; diff --git a/packages/core/src/restic/commands/__tests__/backup.test.ts b/packages/core/src/restic/commands/__tests__/backup.test.ts index 7c7591e2..91a87cc3 100644 --- a/packages/core/src/restic/commands/__tests__/backup.test.ts +++ b/packages/core/src/restic/commands/__tests__/backup.test.ts @@ -97,6 +97,7 @@ const runBackupError = (...args: Parameters) => Effect.runPromise afterEach(() => { vi.restoreAllMocks(); + delete process.env.RESTIC_PROGRESS_FPS; }); describe("backup command", () => { @@ -251,6 +252,23 @@ describe("backup command", () => { expect(getEnv()?.RCLONE_CONFIG).toBe(mockDeps.rcloneConfigFile); }); + + test("defaults restic progress output to one update per second", async () => { + const { getEnv } = setup(); + + await runBackup(config, "/mnt/data", { organizationId: "org-1" }, mockDeps); + + expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("1"); + }); + + test("respects an explicit RESTIC_PROGRESS_FPS environment value", async () => { + process.env.RESTIC_PROGRESS_FPS = "2"; + const { getEnv } = setup(); + + await runBackup(config, "/mnt/data", { organizationId: "org-1" }, mockDeps); + + expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("2"); + }); }); describe("exit code handling", () => { diff --git a/packages/core/src/restic/commands/__tests__/restore.test.ts b/packages/core/src/restic/commands/__tests__/restore.test.ts index 914c4063..28b86a92 100644 --- a/packages/core/src/restic/commands/__tests__/restore.test.ts +++ b/packages/core/src/restic/commands/__tests__/restore.test.ts @@ -49,10 +49,12 @@ type SetupOptions = { const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {}) => { let capturedArgs: string[] = []; + let capturedEnv: SafeSpawnParams["env"]; vi.spyOn(cleanupModule, "cleanupTemporaryKeys").mockImplementation(() => Promise.resolve()); vi.spyOn(spawnModule, "safeSpawn").mockImplementation((params: SafeSpawnParams) => { capturedArgs = params.args; + capturedEnv = params.env; if (spawnError) { return Promise.reject(spawnError); } @@ -85,6 +87,7 @@ const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {}) return { getArgs: () => capturedArgs, + getEnv: () => capturedEnv, getRestoreArg, getOptionValues, }; @@ -92,6 +95,7 @@ const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {}) afterEach(() => { vi.restoreAllMocks(); + delete process.env.RESTIC_PROGRESS_FPS; }); const runRestore = (...args: Parameters) => Effect.runPromise(restore(...args)); @@ -229,6 +233,35 @@ describe("restore command", () => { }); describe("progress callbacks", () => { + test("defaults restic progress output to one update per second", async () => { + const { getEnv } = setup(); + + await runRestore( + config, + "snapshot-123", + "/tmp/restore-target", + { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, + mockDeps, + ); + + expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("1"); + }); + + test("respects an explicit RESTIC_PROGRESS_FPS environment value", async () => { + process.env.RESTIC_PROGRESS_FPS = "2"; + const { getEnv } = setup(); + + await runRestore( + config, + "snapshot-123", + "/tmp/restore-target", + { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, + mockDeps, + ); + + expect(getEnv()?.RESTIC_PROGRESS_FPS).toBe("2"); + }); + test("calls onProgress with parsed status updates", async () => { const progressUpdates: unknown[] = []; setup({ onSpawnCall: (params) => params.onStdout?.(validProgressLine) }); diff --git a/packages/core/src/restic/commands/backup.ts b/packages/core/src/restic/commands/backup.ts index e0170a42..725fdb8e 100644 --- a/packages/core/src/restic/commands/backup.ts +++ b/packages/core/src/restic/commands/backup.ts @@ -2,7 +2,6 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { Data, Effect } from "effect"; -import { throttle } from "es-toolkit"; import type { CompressionMode, RepositoryConfig } from "../schemas"; import { type ResticBackupProgressDto, resticBackupOutputSchema, resticBackupProgressSchema } from "../restic-dto"; import { addCommonArgs } from "../helpers/add-common-args"; @@ -133,18 +132,15 @@ export const backup = ( } const stderrLines: string[] = []; - const logData = throttle((data: string) => { - logger.info(data.trim()); - }, 5000); + const resticProgressFps = process.env.RESTIC_PROGRESS_FPS ?? "1"; logger.debug(`Executing: restic ${args.join(" ")}`); const res = await safeSpawn({ command: "restic", args, - env: { ...env, RESTIC_PROGRESS_FPS: "1" }, + env: { ...env, RESTIC_PROGRESS_FPS: resticProgressFps }, signal: options.signal, onStdout: (data) => { - logData(data); if (!options.onProgress) { return; } diff --git a/packages/core/src/restic/commands/restore.ts b/packages/core/src/restic/commands/restore.ts index b531d25b..909e00bf 100644 --- a/packages/core/src/restic/commands/restore.ts +++ b/packages/core/src/restic/commands/restore.ts @@ -98,13 +98,14 @@ export const restore = ( args.push("--", restoreArg); const onProgress = options.onProgress; + const resticProgressFps = process.env.RESTIC_PROGRESS_FPS ?? "1"; logger.debug(`Executing: restic ${args.join(" ")}`); const res = yield* Effect.tryPromise(() => safeSpawn({ command: "restic", args, - env: { ...env, RESTIC_PROGRESS_FPS: "1" }, + env: { ...env, RESTIC_PROGRESS_FPS: resticProgressFps }, signal: options.signal, onStdout: (data) => { if (!onProgress) {