From 8fedeef4d1275d14746e5406cd8f441ecae4ffbd Mon Sep 17 00:00:00 2001 From: Nico <47644445+nicotsx@users.noreply.github.com> Date: Sun, 31 May 2026 19:39:20 +0200 Subject: [PATCH] feat: add restore agent RPC foundation (#929) * feat: add restore agent RPC foundation * chore: temp event handlers * refactor: export restore progress from dto file --- .../modules/agents/controller/server.ts | 9 + app/server/modules/backups/backup.helpers.ts | 2 +- .../src/commands/__tests__/restore.test.ts | 122 ++++++++++++ .../commands/{ => __tests__}/volume.test.ts | 8 +- apps/agent/src/commands/backup-cancel.ts | 2 +- apps/agent/src/commands/backup-run.ts | 8 +- .../__tests__/backup.helpers.test.ts | 0 .../commands/{ => helpers}/backup.helpers.ts | 0 apps/agent/src/commands/index.ts | 8 + apps/agent/src/commands/restore-cancel.ts | 21 ++ apps/agent/src/commands/restore.ts | 110 +++++++++++ apps/agent/src/context.ts | 7 +- apps/agent/src/controller-session.ts | 2 +- packages/contracts/src/agent-protocol.ts | 85 +++++++- .../restic/commands/__tests__/restore.test.ts | 187 +++++++++--------- packages/core/src/restic/commands/restore.ts | 136 ++++++------- packages/core/src/restic/index.ts | 2 - packages/core/src/restic/restic-dto.ts | 11 ++ 18 files changed, 541 insertions(+), 179 deletions(-) create mode 100644 apps/agent/src/commands/__tests__/restore.test.ts rename apps/agent/src/commands/{ => __tests__}/volume.test.ts (93%) rename apps/agent/src/commands/{ => helpers}/__tests__/backup.helpers.test.ts (100%) rename apps/agent/src/commands/{ => helpers}/backup.helpers.ts (100%) create mode 100644 apps/agent/src/commands/restore-cancel.ts create mode 100644 apps/agent/src/commands/restore.ts diff --git a/app/server/modules/agents/controller/server.ts b/app/server/modules/agents/controller/server.ts index b3de99f5..9d7670b8 100644 --- a/app/server/modules/agents/controller/server.ts +++ b/app/server/modules/agents/controller/server.ts @@ -109,6 +109,15 @@ export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) => case "agent.disconnected": { return Effect.sync(() => onEvent({ type: "agent.disconnected", agentId, agentName })); } + case "restore.cancelled": + case "restore.completed": + case "restore.failed": + case "restore.progress": + case "restore.started": { + // TODO: once we implement the app side + return Effect.void; + } + default: { return Effect.sync(() => onEvent({ ...event, agentId, agentName })); } diff --git a/app/server/modules/backups/backup.helpers.ts b/app/server/modules/backups/backup.helpers.ts index dcafb27e..095f831d 100644 --- a/app/server/modules/backups/backup.helpers.ts +++ b/app/server/modules/backups/backup.helpers.ts @@ -1,5 +1,5 @@ import { CronExpressionParser } from "cron-parser"; -import { createBackupOptions as createAgentBackupOptions } from "../../../../apps/agent/src/commands/backup.helpers"; +import { createBackupOptions as createAgentBackupOptions } from "../../../../apps/agent/src/commands/helpers/backup.helpers"; import type { BackupSchedule } from "~/server/db/schema"; import { toMessage } from "~/server/utils/errors"; import { logger } from "@zerobyte/core/node"; diff --git a/apps/agent/src/commands/__tests__/restore.test.ts b/apps/agent/src/commands/__tests__/restore.test.ts new file mode 100644 index 00000000..03f7143a --- /dev/null +++ b/apps/agent/src/commands/__tests__/restore.test.ts @@ -0,0 +1,122 @@ +import { Effect } from "effect"; +import { afterEach, expect, test, vi } from "vitest"; +import waitForExpect from "wait-for-expect"; +import { fromPartial } from "@total-typescript/shoehorn"; +import { parseAgentMessage, type RestoreRunPayload } from "@zerobyte/contracts/agent-protocol"; +import * as resticServer from "@zerobyte/core/restic/server"; +import { handleRestoreCancelCommand } from "../restore-cancel"; +import { handleRestoreRunCommand } from "../restore"; +import type { ControllerCommandContext, RunningJob } from "../../context"; + +afterEach(() => { + vi.restoreAllMocks(); +}); + +const createRunPayload = (overrides: Partial = {}) => + fromPartial({ + restoreId: "restore-1", + organizationId: "org-1", + repositoryId: "repo-1", + snapshotId: "snapshot-1", + target: `${process.cwd()}/restore-target`, + repositoryConfig: { backend: "local", path: "/tmp/repository" }, + runtime: { password: "password" }, + options: { organizationId: "org-1", basePath: "/" }, + ...overrides, + }); + +const createContext = () => { + const outboundMessages: string[] = []; + const runningJobs = new Map(); + + const context: ControllerCommandContext = { + getRunningJob: (jobId) => Effect.succeed(runningJobs.get(jobId)), + setRunningJob: (jobId, job) => + Effect.sync(() => { + runningJobs.set(jobId, job); + }), + deleteRunningJob: (jobId) => + Effect.sync(() => { + runningJobs.delete(jobId); + }), + offerOutbound: (message) => + Effect.sync(() => { + outboundMessages.push(message); + return true; + }), + }; + + return { context, runningJobs, messages: () => outboundMessages.map((message) => parseAgentMessage(message)) }; +}; + +test("forks restore execution and emits lifecycle events", async () => { + vi.spyOn(resticServer, "createRestic").mockReturnValue( + fromPartial({ + restore: () => + Effect.succeed({ + message_type: "summary" as const, + files_restored: 2, + files_skipped: 1, + bytes_skipped: 0, + }), + }), + ); + const payload = createRunPayload(); + const { context, runningJobs, messages } = createContext(); + + await Effect.runPromise( + Effect.gen(function* () { + yield* handleRestoreRunCommand(context, payload); + yield* Effect.promise(() => + waitForExpect(() => { + expect(runningJobs.has(payload.restoreId)).toBe(false); + }), + ); + }), + ); + + expect(messages().flatMap((message) => (message?.success ? [message.data.type] : []))).toEqual([ + "restore.started", + "restore.completed", + ]); +}); + +test("cancels a running restore with the shared running job registry", async () => { + vi.spyOn(resticServer, "createRestic").mockReturnValue( + fromPartial({ + restore: (_config: unknown, _snapshotId: string, _target: string, options: { signal?: AbortSignal }) => + Effect.tryPromise( + () => + new Promise((_resolve, reject) => { + options.signal?.addEventListener("abort", () => reject(new Error("aborted")), { + once: true, + }); + }), + ), + }), + ); + const payload = createRunPayload(); + const { context, runningJobs, messages } = createContext(); + + await Effect.runPromise( + Effect.gen(function* () { + yield* handleRestoreRunCommand(context, payload); + yield* Effect.promise(() => + waitForExpect(() => { + expect(runningJobs.get(payload.restoreId)?.kind).toBe("restore"); + }), + ); + + yield* handleRestoreCancelCommand(context, { restoreId: payload.restoreId }); + + yield* Effect.promise(() => + waitForExpect(() => { + expect(runningJobs.has(payload.restoreId)).toBe(false); + expect( + messages().some((message) => message?.success && message.data.type === "restore.cancelled"), + ).toBe(true); + }), + ); + }), + ); +}); diff --git a/apps/agent/src/commands/volume.test.ts b/apps/agent/src/commands/__tests__/volume.test.ts similarity index 93% rename from apps/agent/src/commands/volume.test.ts rename to apps/agent/src/commands/__tests__/volume.test.ts index 28c3fbdf..07527838 100644 --- a/apps/agent/src/commands/volume.test.ts +++ b/apps/agent/src/commands/__tests__/volume.test.ts @@ -6,7 +6,7 @@ import { type AgentWireMessage, type VolumeCommandPayload, } from "@zerobyte/contracts/agent-protocol"; -import type { ControllerCommandContext } from "../context"; +import type { ControllerCommandContext } from "../../context"; const volumeHostMock = vi.hoisted(() => ({ createVolumeBackend: vi.fn(), @@ -20,10 +20,10 @@ const operationsMock = vi.hoisted(() => ({ testVolumeConnection: vi.fn(), })); -vi.mock("../volume-host", () => volumeHostMock); -vi.mock("../volume-host/operations", () => operationsMock); +vi.mock("../../volume-host", () => volumeHostMock); +vi.mock("../../volume-host/operations", () => operationsMock); -import { handleVolumeCommand } from "./volume"; +import { handleVolumeCommand } from "../volume"; afterEach(() => { vi.restoreAllMocks(); diff --git a/apps/agent/src/commands/backup-cancel.ts b/apps/agent/src/commands/backup-cancel.ts index ec09d033..24c3cf67 100644 --- a/apps/agent/src/commands/backup-cancel.ts +++ b/apps/agent/src/commands/backup-cancel.ts @@ -11,7 +11,7 @@ export const handleBackupCancelCommand = (context: ControllerCommandContext, pay return; } - if (running.scheduleId !== payload.scheduleId) { + if (running.kind !== "backup" || running.scheduleId !== payload.scheduleId) { logger.warn(`Ignoring cancel for backup ${payload.jobId} due to schedule mismatch ${payload.scheduleId}`); return; } diff --git a/apps/agent/src/commands/backup-run.ts b/apps/agent/src/commands/backup-run.ts index 17e53e36..c7812741 100644 --- a/apps/agent/src/commands/backup-run.ts +++ b/apps/agent/src/commands/backup-run.ts @@ -8,7 +8,7 @@ import { toMessage } from "@zerobyte/core/utils"; import type { ControllerCommandContext } from "../context"; import { resticDeps } from "../restic/deps"; import { createVolumeBackend, getVolumePath } from "../volume-host"; -import { createBackupOptions } from "./backup.helpers"; +import { createBackupOptions } from "./helpers/backup.helpers"; class VolumeReadinessError extends Data.TaggedError("VolumeReadinessError")<{ readonly _tag: "VolumeReadinessError"; @@ -64,7 +64,11 @@ export const handleBackupRunCommand = (context: ControllerCommandContext, payloa yield* logger.effect.info(`Starting backup ${payload.jobId} for schedule ${payload.scheduleId}`); const abortController = new AbortController(); - yield* context.setRunningJob(payload.jobId, { scheduleId: payload.scheduleId, abortController }); + yield* context.setRunningJob(payload.jobId, { + kind: "backup", + scheduleId: payload.scheduleId, + abortController, + }); yield* Effect.fork( Effect.gen(function* () { diff --git a/apps/agent/src/commands/__tests__/backup.helpers.test.ts b/apps/agent/src/commands/helpers/__tests__/backup.helpers.test.ts similarity index 100% rename from apps/agent/src/commands/__tests__/backup.helpers.test.ts rename to apps/agent/src/commands/helpers/__tests__/backup.helpers.test.ts diff --git a/apps/agent/src/commands/backup.helpers.ts b/apps/agent/src/commands/helpers/backup.helpers.ts similarity index 100% rename from apps/agent/src/commands/backup.helpers.ts rename to apps/agent/src/commands/helpers/backup.helpers.ts diff --git a/apps/agent/src/commands/index.ts b/apps/agent/src/commands/index.ts index 6f73e0c6..8e75b907 100644 --- a/apps/agent/src/commands/index.ts +++ b/apps/agent/src/commands/index.ts @@ -3,6 +3,8 @@ import { handleBackupCancelCommand } from "./backup-cancel"; import { handleBackupRunCommand } from "./backup-run"; import type { ControllerCommandContext } from "../context"; import { handleHeartbeatPingCommand } from "./heartbeat-ping"; +import { handleRestoreCancelCommand } from "./restore-cancel"; +import { handleRestoreRunCommand } from "./restore"; import { handleVolumeCommand } from "./volume"; export const handleControllerCommand = (context: ControllerCommandContext, message: ControllerMessage) => { @@ -16,6 +18,12 @@ export const handleControllerCommand = (context: ControllerCommandContext, messa case "volume.command": { return handleVolumeCommand(context, message.payload); } + case "restore.run": { + return handleRestoreRunCommand(context, message.payload); + } + case "restore.cancel": { + return handleRestoreCancelCommand(context, message.payload); + } case "heartbeat.ping": { return handleHeartbeatPingCommand(context, message.payload); } diff --git a/apps/agent/src/commands/restore-cancel.ts b/apps/agent/src/commands/restore-cancel.ts new file mode 100644 index 00000000..e97b54aa --- /dev/null +++ b/apps/agent/src/commands/restore-cancel.ts @@ -0,0 +1,21 @@ +import { Effect } from "effect"; +import { type RestoreCancelPayload } from "@zerobyte/contracts/agent-protocol"; +import { logger } from "@zerobyte/core/node"; +import type { ControllerCommandContext } from "../context"; + +export const handleRestoreCancelCommand = (context: ControllerCommandContext, payload: RestoreCancelPayload) => { + return Effect.gen(function* () { + const running = yield* context.getRunningJob(payload.restoreId); + if (!running) { + logger.warn(`Restore ${payload.restoreId} is not running`); + return; + } + + if (running.kind !== "restore") { + logger.warn(`Ignoring restore cancel for non-restore job ${payload.restoreId}`); + return; + } + + running.abortController.abort(); + }); +}; diff --git a/apps/agent/src/commands/restore.ts b/apps/agent/src/commands/restore.ts new file mode 100644 index 00000000..ccb6bdc4 --- /dev/null +++ b/apps/agent/src/commands/restore.ts @@ -0,0 +1,110 @@ +import os from "node:os"; +import path from "node:path"; +import { Effect, Runtime } from "effect"; +import { createAgentMessage, type RestoreRunPayload } from "@zerobyte/contracts/agent-protocol"; +import { createRestic } from "@zerobyte/core/restic/server"; +import { isPathWithin, toMessage } from "@zerobyte/core/utils"; +import { logger } from "@zerobyte/core/node"; +import type { ControllerCommandContext } from "../context"; +import { resticDeps } from "../restic/deps"; + +const REPOSITORY_BASE = process.env.ZEROBYTE_REPOSITORIES_DIR || "/var/lib/zerobyte/repositories"; +const RESTIC_PASS_FILE = process.env.RESTIC_PASS_FILE || "/var/lib/zerobyte/data/restic.pass"; + +const getBlockedRestoreTargets = () => + [REPOSITORY_BASE, path.dirname(RESTIC_PASS_FILE), os.tmpdir()].map((target) => path.resolve(target)); + +const assertAllowedRestoreTarget = (target: string) => { + const resolvedTarget = path.resolve(target); + + for (const blockedTarget of getBlockedRestoreTargets()) { + if (isPathWithin(blockedTarget, resolvedTarget)) { + throw new Error( + "Restore target path is not allowed. Restoring to this path could overwrite critical system files or application data.", + ); + } + } +}; + +export const handleRestoreRunCommand = (context: ControllerCommandContext, payload: RestoreRunPayload) => { + return Effect.gen(function* () { + const restoreContext = { + restoreId: payload.restoreId, + organizationId: payload.organizationId, + repositoryId: payload.repositoryId, + snapshotId: payload.snapshotId, + }; + + const existing = yield* context.getRunningJob(payload.restoreId); + if (existing) { + yield* context.offerOutbound( + createAgentMessage("restore.failed", { + ...restoreContext, + error: "Restore job is already running", + }), + ); + return; + } + + logger.info(`Starting restore ${payload.restoreId} for snapshot ${payload.snapshotId}`); + const abortController = new AbortController(); + yield* context.setRunningJob(payload.restoreId, { kind: "restore", abortController }); + + yield* Effect.fork( + Effect.gen(function* () { + assertAllowedRestoreTarget(payload.target); + + const runtime = yield* Effect.runtime(); + const restic = createRestic(resticDeps(payload.runtime.password)); + + yield* context.offerOutbound(createAgentMessage("restore.started", restoreContext)); + + const result = yield* restic.restore(payload.repositoryConfig, payload.snapshotId, payload.target, { + ...payload.options, + signal: abortController.signal, + onProgress: (progress) => { + void Runtime.runPromise( + runtime, + context.offerOutbound( + createAgentMessage("restore.progress", { + ...restoreContext, + progress, + }), + ), + ).catch((error) => { + logger.error(`Failed to send restore progress update: ${toMessage(error)}`); + }); + }, + }); + + yield* context.offerOutbound( + createAgentMessage("restore.completed", { + ...restoreContext, + result, + }), + ); + }).pipe( + Effect.catchAll((error) => { + if (abortController.signal.aborted) { + return context.offerOutbound( + createAgentMessage("restore.cancelled", { + ...restoreContext, + message: "Restore was cancelled", + }), + ); + } + + const errorMessage = toMessage(error); + return context.offerOutbound( + createAgentMessage("restore.failed", { + ...restoreContext, + error: errorMessage, + errorDetails: errorMessage, + }), + ); + }), + Effect.ensuring(context.deleteRunningJob(payload.restoreId)), + ), + ); + }).pipe(Effect.asVoid); +}; diff --git a/apps/agent/src/context.ts b/apps/agent/src/context.ts index 4daa68bc..5d47e966 100644 --- a/apps/agent/src/context.ts +++ b/apps/agent/src/context.ts @@ -1,10 +1,9 @@ import type { AgentWireMessage } from "@zerobyte/contracts/agent-protocol"; import type { Effect } from "effect"; -export type RunningJob = { - scheduleId: string; - abortController: AbortController; -}; +export type RunningJob = + | { kind: "backup"; scheduleId: string; abortController: AbortController } + | { kind: "restore"; abortController: AbortController }; export type ControllerCommandContext = { getRunningJob: (jobId: string) => Effect.Effect; diff --git a/apps/agent/src/controller-session.ts b/apps/agent/src/controller-session.ts index f6fa6949..69d2a05d 100644 --- a/apps/agent/src/controller-session.ts +++ b/apps/agent/src/controller-session.ts @@ -144,7 +144,7 @@ export const createControllerSession = (ws: WebSocket): ControllerSession => { protocolVersion: 1, hostname: resolveResticHostname(), platform: process.platform, - capabilities: { backup: true, volume: true, restic: true }, + capabilities: { backup: true, restore: true, volume: true, restic: true }, }), ), ).catch((error) => { diff --git a/packages/contracts/src/agent-protocol.ts b/packages/contracts/src/agent-protocol.ts index bd78ad58..748661a8 100644 --- a/packages/contracts/src/agent-protocol.ts +++ b/packages/contracts/src/agent-protocol.ts @@ -5,6 +5,8 @@ import { repositoryConfigSchema, resticBackupOutputSchema, resticBackupProgressSchema, + resticRestoreOutputSchema, + restoreProgressSchema, type CompressionMode, } from "@zerobyte/core/restic"; import { @@ -29,7 +31,7 @@ const backupExecutionOptionsSchema = z.object({ compressionMode: compressionModeSchema, }); -const backupRuntimeSchema = z.object({ +const commandRuntimeSchema = z.object({ password: z.string(), }); @@ -42,7 +44,7 @@ const backupRunSchema = z.object({ volume: volumeSchema, repositoryConfig: repositoryConfigSchema, options: backupExecutionOptionsSchema, - runtime: backupRuntimeSchema, + runtime: commandRuntimeSchema, webhooks: backupWebhooksSchema, webhookAllowedOrigins: z.array(z.string()), webhookTimeoutMs: z.number(), @@ -96,6 +98,71 @@ const volumeCommandResponseSchema = z.object({ ]), }); +const restoreIdentitySchema = z.object({ + restoreId: z.string(), + organizationId: z.string(), + repositoryId: z.string(), + snapshotId: z.string(), +}); + +const restoreRunSchema = z.object({ + type: z.literal("restore.run"), + payload: restoreIdentitySchema.extend({ + target: z.string(), + repositoryConfig: repositoryConfigSchema, + runtime: commandRuntimeSchema, + options: z.object({ + basePath: z.string().optional(), + organizationId: z.string(), + include: z.array(z.string()).optional(), + selectedItemKind: z.enum(["file", "dir"]).optional(), + exclude: z.array(z.string()).optional(), + excludeXattr: z.array(z.string()).optional(), + delete: z.boolean().optional(), + overwrite: z.enum(["always", "if-changed", "if-newer", "never"]).optional(), + }), + }), +}); + +const restoreCancelSchema = z.object({ + type: z.literal("restore.cancel"), + payload: z.object({ restoreId: z.string() }), +}); + +const restoreStartedSchema = z.object({ + type: z.literal("restore.started"), + payload: restoreIdentitySchema, +}); + +const restoreProgressMessageSchema = z.object({ + type: z.literal("restore.progress"), + payload: restoreIdentitySchema.extend({ + progress: restoreProgressSchema, + }), +}); + +const restoreCompletedSchema = z.object({ + type: z.literal("restore.completed"), + payload: restoreIdentitySchema.extend({ + result: resticRestoreOutputSchema, + }), +}); + +const restoreFailedSchema = z.object({ + type: z.literal("restore.failed"), + payload: restoreIdentitySchema.extend({ + error: z.string(), + errorDetails: z.string().optional(), + }), +}); + +const restoreCancelledSchema = z.object({ + type: z.literal("restore.cancelled"), + payload: restoreIdentitySchema.extend({ + message: z.string().optional(), + }), +}); + const heartbeatPingSchema = z.object({ type: z.literal("heartbeat.ping"), payload: z.object({ sentAt: z.number() }), @@ -165,6 +232,8 @@ const controllerMessageSchema = z.discriminatedUnion("type", [ backupRunSchema, backupCancelSchema, volumeCommandRequestSchema, + restoreRunSchema, + restoreCancelSchema, heartbeatPingSchema, ]); const agentMessageSchema = z.discriminatedUnion("type", [ @@ -175,6 +244,11 @@ const agentMessageSchema = z.discriminatedUnion("type", [ backupFailedSchema, backupCancelledSchema, volumeCommandResponseSchema, + restoreStartedSchema, + restoreProgressMessageSchema, + restoreCompletedSchema, + restoreFailedSchema, + restoreCancelledSchema, heartbeatPongSchema, ]); @@ -189,6 +263,13 @@ export type VolumeCommandPayload = z.infer["p export type VolumeCommand = z.infer; export type VolumeCommandResult = z.infer; export type VolumeCommandResponsePayload = z.infer["payload"]; +export type RestoreRunPayload = z.infer["payload"]; +export type RestoreCancelPayload = z.infer["payload"]; +export type RestoreStartedPayload = z.infer["payload"]; +export type RestoreProgressPayload = z.infer["payload"]; +export type RestoreCompletedPayload = z.infer["payload"]; +export type RestoreFailedPayload = z.infer["payload"]; +export type RestoreCancelledPayload = z.infer["payload"]; export type ControllerMessage = z.infer; export type AgentMessage = z.infer; diff --git a/packages/core/src/restic/commands/__tests__/restore.test.ts b/packages/core/src/restic/commands/__tests__/restore.test.ts index a043a923..914c4063 100644 --- a/packages/core/src/restic/commands/__tests__/restore.test.ts +++ b/packages/core/src/restic/commands/__tests__/restore.test.ts @@ -1,11 +1,11 @@ import { afterEach, describe, expect, test, vi } from "vitest"; +import { Effect } from "effect"; import * as cleanupModule from "../../helpers/cleanup-temporary-keys"; import * as spawnModule from "../../../node/spawn"; import { ResticError } from "../../error"; import { restore } from "../restore"; import type { ResticDeps } from "../../types"; import type { SafeSpawnParams, SpawnResult } from "../../../node/spawn"; -import { Effect } from "effect"; const mockDeps: ResticDeps = { resolveSecret: async (s) => s, @@ -44,14 +44,19 @@ const config = { type SetupOptions = { spawnResult?: Partial; onSpawnCall?: (params: SafeSpawnParams) => void | Promise; + spawnError?: unknown; }; -const setup = ({ spawnResult = {}, onSpawnCall }: SetupOptions = {}) => { +const setup = ({ spawnResult = {}, onSpawnCall, spawnError }: SetupOptions = {}) => { let capturedArgs: string[] = []; vi.spyOn(cleanupModule, "cleanupTemporaryKeys").mockImplementation(() => Promise.resolve()); vi.spyOn(spawnModule, "safeSpawn").mockImplementation((params: SafeSpawnParams) => { capturedArgs = params.args; + if (spawnError) { + return Promise.reject(spawnError); + } + return Promise.resolve(onSpawnCall?.(params)).then(() => ({ exitCode: 0, summary: successfulRestoreSummary, @@ -89,25 +94,26 @@ afterEach(() => { vi.restoreAllMocks(); }); +const runRestore = (...args: Parameters) => Effect.runPromise(restore(...args)); +const runRestoreError = (...args: Parameters) => Effect.runPromise(Effect.flip(restore(...args))); + describe("restore command", () => { describe("path selection", () => { test("uses the common ancestor as restore root and strips includes for non-root targets", async () => { const { getRestoreArg, getOptionValues } = setup(); - await Effect.runPromise( - restore( - config, - "snapshot-456", - "/tmp/restore-target", - { - organizationId: "org-1", - include: [ - "/var/lib/zerobyte/volumes/vol123/_data/Documents/report.pdf", - "/var/lib/zerobyte/volumes/vol123/_data/Photos/summer.jpg", - ], - }, - mockDeps, - ), + await runRestore( + config, + "snapshot-456", + "/tmp/restore-target", + { + organizationId: "org-1", + include: [ + "/var/lib/zerobyte/volumes/vol123/_data/Documents/report.pdf", + "/var/lib/zerobyte/volumes/vol123/_data/Photos/summer.jpg", + ], + }, + mockDeps, ); expect(getRestoreArg()).toBe("snapshot-456:/var/lib/zerobyte/volumes/vol123/_data"); @@ -117,18 +123,16 @@ describe("restore command", () => { test("restores a selected file from its parent directory for non-root targets", async () => { const { getRestoreArg, getOptionValues } = setup(); - await Effect.runPromise( - restore( - config, - "snapshot-single-file", - "/tmp/restore-target", - { - organizationId: "org-1", - include: ["/var/lib/zerobyte/volumes/vol123/_data/archive/backup.20260301-233001.7z"], - selectedItemKind: "file", - }, - mockDeps, - ), + await runRestore( + config, + "snapshot-single-file", + "/tmp/restore-target", + { + organizationId: "org-1", + include: ["/var/lib/zerobyte/volumes/vol123/_data/archive/backup.20260301-233001.7z"], + selectedItemKind: "file", + }, + mockDeps, ); expect(getRestoreArg()).toBe("snapshot-single-file:/var/lib/zerobyte/volumes/vol123/_data/archive"); @@ -138,17 +142,15 @@ describe("restore command", () => { test("treats flag-like snapshot IDs as positional restore args", async () => { const { getArgs, getRestoreArg } = setup(); - await Effect.runPromise( - restore( - config, - "--help", - "/tmp/restore-target", - { - organizationId: "org-1", - basePath: "/var/lib/zerobyte/volumes/vol123/_data", - }, - mockDeps, - ), + await runRestore( + config, + "--help", + "/tmp/restore-target", + { + organizationId: "org-1", + basePath: "/var/lib/zerobyte/volumes/vol123/_data", + }, + mockDeps, ); const separatorIndex = getArgs().indexOf("--"); @@ -161,14 +163,12 @@ describe("restore command", () => { test("returns a parsed restore summary on success", async () => { setup(); - const result = await Effect.runPromise( - restore( - config, - "snapshot-123", - "/tmp/restore-target", - { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, - mockDeps, - ), + const result = await runRestore( + config, + "snapshot-123", + "/tmp/restore-target", + { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, + mockDeps, ); expect(result).toMatchObject({ @@ -181,32 +181,41 @@ describe("restore command", () => { test("throws ResticError when the command fails", async () => { setup({ spawnResult: { exitCode: 1, summary: "", error: "restore failed" } }); - const error = await Effect.runPromise( - Effect.flip( - restore( - config, - "snapshot-123", - "/tmp/restore-target", - { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, - mockDeps, - ), - ), - ); - - expect(error).toBeInstanceOf(ResticError); - }); - - test("falls back to an empty summary when restic output cannot be parsed", async () => { - setup({ spawnResult: { summary: "not-json" } }); - - const result = await Effect.runPromise( - restore( + await expect( + runRestoreError( config, "snapshot-123", "/tmp/restore-target", { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, mockDeps, ), + ).resolves.toBeInstanceOf(ResticError); + }); + + test("cleans up temporary keys when spawning restic rejects", async () => { + const cleanupSpy = vi.spyOn(cleanupModule, "cleanupTemporaryKeys"); + setup({ spawnError: new Error("spawn failed") }); + + await runRestoreError( + config, + "snapshot-123", + "/tmp/restore-target", + { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, + mockDeps, + ); + + expect(cleanupSpy).toHaveBeenCalledTimes(1); + }); + + test("falls back to an empty summary when restic output cannot be parsed", async () => { + setup({ spawnResult: { summary: "not-json" } }); + + const result = await runRestore( + config, + "snapshot-123", + "/tmp/restore-target", + { organizationId: "org-1", basePath: "/var/lib/zerobyte/volumes/vol123/_data" }, + mockDeps, ); expect(result).toEqual({ @@ -224,18 +233,16 @@ describe("restore command", () => { const progressUpdates: unknown[] = []; setup({ onSpawnCall: (params) => params.onStdout?.(validProgressLine) }); - await Effect.runPromise( - restore( - config, - "snapshot-123", - "/tmp/restore-target", - { - organizationId: "org-1", - basePath: "/var/lib/zerobyte/volumes/vol123/_data", - onProgress: (progress) => progressUpdates.push(progress), - }, - mockDeps, - ), + await runRestore( + config, + "snapshot-123", + "/tmp/restore-target", + { + organizationId: "org-1", + basePath: "/var/lib/zerobyte/volumes/vol123/_data", + onProgress: (progress) => progressUpdates.push(progress), + }, + mockDeps, ); expect(progressUpdates).toHaveLength(1); @@ -255,18 +262,16 @@ describe("restore command", () => { }, }); - await Effect.runPromise( - restore( - config, - "snapshot-123", - "/tmp/restore-target", - { - organizationId: "org-1", - basePath: "/var/lib/zerobyte/volumes/vol123/_data", - onProgress: (progress) => progressUpdates.push(progress), - }, - mockDeps, - ), + await runRestore( + config, + "snapshot-123", + "/tmp/restore-target", + { + organizationId: "org-1", + basePath: "/var/lib/zerobyte/volumes/vol123/_data", + onProgress: (progress) => progressUpdates.push(progress), + }, + mockDeps, ); expect(progressUpdates).toHaveLength(0); diff --git a/packages/core/src/restic/commands/restore.ts b/packages/core/src/restic/commands/restore.ts index 24cc72d9..9b6b8f7e 100644 --- a/packages/core/src/restic/commands/restore.ts +++ b/packages/core/src/restic/commands/restore.ts @@ -1,5 +1,4 @@ import path from "node:path"; -import { z } from "zod"; import { throttle } from "es-toolkit"; import { findCommonAncestor } from "../../utils/common-ancestor"; import { addCommonArgs } from "../helpers/add-common-args"; @@ -8,8 +7,13 @@ import { buildRepoUrl } from "../helpers/build-repo-url"; import { cleanupTemporaryKeys } from "../helpers/cleanup-temporary-keys"; import { type RepositoryConfig, type OverwriteMode } from "../schemas"; import { logger, safeSpawn } from "../../node"; -import { createResticError, isResticError } from "../error"; -import { resticRestoreOutputSchema, type ResticRestoreOutputDto } from "../restic-dto"; +import { createResticError, isResticError, type AnyResticError } from "../error"; +import { + restoreProgressSchema, + resticRestoreOutputSchema, + type RestoreProgress, + type ResticRestoreOutputDto, +} from "../restic-dto"; import type { ResticDeps } from "../types"; import { Data, Effect } from "effect"; import { toMessage } from "../../utils"; @@ -19,18 +23,6 @@ class ResticRestoreCommandError extends Data.TaggedError("ResticRestoreCommandEr message: string; }> {} -const restoreProgressSchema = z.object({ - message_type: z.enum(["status", "summary"]), - seconds_elapsed: z.number().default(0), - percent_done: z.number().default(0), - total_files: z.number().default(0), - files_restored: z.number().default(0), - total_bytes: z.number().default(0), - bytes_restored: z.number().default(0), -}); - -export type RestoreProgress = z.infer; - export const restore = ( config: RepositoryConfig, snapshotId: string, @@ -48,23 +40,21 @@ export const restore = ( signal?: AbortSignal; }, deps: ResticDeps, -) => { - return Effect.tryPromise({ - try: async () => { - const repoUrl = buildRepoUrl(config); - const env = await buildEnv(config, options.organizationId, deps); - - let restoreArg = snapshotId; +): Effect.Effect => { + return Effect.scoped( + Effect.gen(function* () { + const repoUrl = yield* Effect.try(() => buildRepoUrl(config)); + const env = yield* Effect.acquireRelease( + Effect.tryPromise(() => buildEnv(config, options.organizationId, deps)), + (env) => Effect.promise(() => cleanupTemporaryKeys(env, deps)), + ); const includes = options.include?.length ? options.include : [options.basePath ?? "/"]; const commonAncestor = options.selectedItemKind === "file" && includes.length === 1 ? path.posix.dirname(includes[0] ?? "/") : findCommonAncestor(includes); - - if (target !== "/") { - restoreArg = `${snapshotId}:${commonAncestor}`; - } + const restoreArg = target === "/" ? snapshotId : `${snapshotId}:${commonAncestor}`; const args = ["--repo", repoUrl, "restore", "--target", target]; @@ -87,21 +77,19 @@ export const restore = ( if (!includesCoverRestoreRoot) { for (const pattern of strippedIncludes) { - if (pattern !== "" && pattern !== ".") { - args.push("--include", pattern); - } + args.push("--include", pattern); } } } } - if (options.exclude && options.exclude.length > 0) { + if (options.exclude?.length) { for (const pattern of options.exclude) { args.push("--exclude", pattern); } } - if (options.excludeXattr && options.excludeXattr.length > 0) { + if (options.excludeXattr?.length) { for (const xattr of options.excludeXattr) { args.push("--exclude-xattr", xattr); } @@ -110,50 +98,53 @@ export const restore = ( addCommonArgs(args, env, config); args.push("--", restoreArg); + const onProgress = options.onProgress; const streamProgress = throttle((data: string) => { - if (options.onProgress) { - try { - const jsonData = JSON.parse(data); - if (jsonData.message_type !== "status") { - return; - } + if (!onProgress) { + return; + } - const progress = restoreProgressSchema.safeParse(jsonData); - if (progress.success) { - options.onProgress(progress.data); - } else { - logger.error(progress.error.message); - } - } catch { - // Ignore JSON parse errors for non-JSON lines + try { + const jsonData = JSON.parse(data); + if (jsonData.message_type !== "status") { + return; } + + const progress = restoreProgressSchema.safeParse(jsonData); + if (progress.success) { + onProgress(progress.data); + } else { + logger.error(progress.error.message); + } + } catch { + // Ignore JSON parse errors for non-JSON lines } }, 1000); logger.debug(`Executing: restic ${args.join(" ")}`); - const res = await safeSpawn({ - command: "restic", - args, - env, - signal: options.signal, - onStdout: (data) => { - if (options.onProgress) { - streamProgress(data); - } - }, - }); - - await cleanupTemporaryKeys(env, deps); + const res = yield* Effect.tryPromise(() => + safeSpawn({ + command: "restic", + args, + env, + signal: options.signal, + onStdout: (data) => { + if (onProgress) { + streamProgress(data); + } + }, + }), + ); if (res.exitCode !== 0) { logger.error(`Restic restore failed: ${res.error}`); - throw createResticError(res.exitCode, res.stderr || res.error); + return yield* Effect.fail(createResticError(res.exitCode, res.stderr || res.error)); } const lastLine = res.summary.trim(); let summaryLine: unknown = {}; try { - summaryLine = JSON.parse(lastLine ?? "{}"); + summaryLine = JSON.parse(lastLine); } catch { logger.warn("Failed to parse restic restore output JSON summary.", lastLine); summaryLine = {}; @@ -181,16 +172,19 @@ export const restore = ( ); return result.data; - }, - catch: (error) => { - if (isResticError(error)) { - return error; - } + }).pipe( + Effect.catchAll((error): Effect.Effect => { + if (isResticError(error)) { + return Effect.fail(error); + } - return new ResticRestoreCommandError({ - cause: error, - message: toMessage(error), - }); - }, - }); + return Effect.fail( + new ResticRestoreCommandError({ + cause: error, + message: toMessage(error), + }), + ); + }), + ), + ); }; diff --git a/packages/core/src/restic/index.ts b/packages/core/src/restic/index.ts index 0900cf5c..2f975a25 100644 --- a/packages/core/src/restic/index.ts +++ b/packages/core/src/restic/index.ts @@ -1,8 +1,6 @@ export * from "./schemas"; export * from "./restic-dto"; export { isResticError, ResticError, ResticLockError } from "./error"; - -export type { RestoreProgress } from "./commands/restore"; export type { ResticDeps, ResticEnv, diff --git a/packages/core/src/restic/restic-dto.ts b/packages/core/src/restic/restic-dto.ts index 2d3b36bf..344cb53f 100644 --- a/packages/core/src/restic/restic-dto.ts +++ b/packages/core/src/restic/restic-dto.ts @@ -54,6 +54,16 @@ export const resticRestoreOutputSchema = z.object({ bytes_skipped: z.number().optional(), }); +export const restoreProgressSchema = z.object({ + message_type: z.enum(["status", "summary"]), + seconds_elapsed: z.number().default(0), + percent_done: z.number().default(0), + total_files: z.number().default(0), + files_restored: z.number().default(0), + total_bytes: z.number().default(0), + bytes_restored: z.number().default(0), +}); + export const resticStatsSchema = z.object({ total_size: z.number().default(0), total_uncompressed_size: z.number().default(0), @@ -70,4 +80,5 @@ export type ResticBackupProgressMetricsDto = z.infer; export type ResticRestoreOutputDto = z.infer; +export type RestoreProgress = z.infer; export type ResticStatsDto = z.infer;