From df4b66856084fd12a51e4618dc074acaef9ad4cd Mon Sep 17 00:00:00 2001 From: Nico <47644445+nicotsx@users.noreply.github.com> Date: Thu, 7 May 2026 18:11:57 +0200 Subject: [PATCH] feat(agent): add volume operation RPC (#861) --- app/server/jobs/cleanup-dangling.ts | 2 +- .../__tests__/agents-manager.backups.test.ts | 31 ++- app/server/modules/agents/agents-manager.ts | 19 +- .../modules/agents/controller/server.ts | 30 ++- .../modules/agents/controller/session.ts | 106 +++++++-- app/server/modules/backends/backend.ts | 25 +-- .../backends/directory/directory-backend.ts | 53 ----- .../modules/backends/nfs/nfs-backend.ts | 135 ----------- .../modules/backends/rclone/rclone-backend.ts | 131 ----------- .../modules/backends/sftp/sftp-backend.ts | 212 ------------------ .../modules/backends/smb/smb-backend.ts | 141 ------------ .../modules/backends/webdav/webdav-backend.ts | 159 ------------- .../modules/volumes/volume-config-secrets.ts | 4 + app/server/modules/volumes/volume.service.ts | 14 +- .../src/__tests__/controller-session.test.ts | 53 +++++ apps/agent/src/commands/index.ts | 4 + apps/agent/src/commands/volume.ts | 93 ++++++++ apps/agent/src/controller-session.ts | 11 +- .../src/volume-host/backends/directory.ts | 52 +++++ apps/agent/src/volume-host/backends/nfs.ts | 112 +++++++++ apps/agent/src/volume-host/backends/rclone.ts | 124 ++++++++++ apps/agent/src/volume-host/backends/sftp.ts | 162 +++++++++++++ apps/agent/src/volume-host/backends/smb.ts | 124 ++++++++++ .../src/volume-host/backends/utils.test.ts | 14 +- .../agent/src/volume-host/backends/utils.ts | 36 ++- apps/agent/src/volume-host/backends/webdav.ts | 144 ++++++++++++ apps/agent/src/volume-host/constants.ts | 9 + apps/agent/src/volume-host/fs.ts | 66 ++++++ apps/agent/src/volume-host/index.ts | 31 +++ apps/agent/src/volume-host/operations.ts | 190 ++++++++++++++++ apps/agent/src/volume-host/paths.ts | 10 + apps/agent/src/volume-host/timeout.ts | 21 ++ apps/agent/src/volume-host/types.ts | 67 ++++++ packages/contracts/src/agent-protocol.ts | 111 +++++++++ 34 files changed, 1591 insertions(+), 905 deletions(-) delete mode 100644 app/server/modules/backends/directory/directory-backend.ts delete mode 100644 app/server/modules/backends/nfs/nfs-backend.ts delete mode 100644 app/server/modules/backends/rclone/rclone-backend.ts delete mode 100644 app/server/modules/backends/sftp/sftp-backend.ts delete mode 100644 app/server/modules/backends/smb/smb-backend.ts delete mode 100644 app/server/modules/backends/webdav/webdav-backend.ts create mode 100644 apps/agent/src/commands/volume.ts create mode 100644 apps/agent/src/volume-host/backends/directory.ts create mode 100644 apps/agent/src/volume-host/backends/nfs.ts create mode 100644 apps/agent/src/volume-host/backends/rclone.ts create mode 100644 apps/agent/src/volume-host/backends/sftp.ts create mode 100644 apps/agent/src/volume-host/backends/smb.ts rename app/server/modules/backends/utils/__tests__/backend-utils.test.ts => apps/agent/src/volume-host/backends/utils.test.ts (75%) rename app/server/modules/backends/utils/backend-utils.ts => apps/agent/src/volume-host/backends/utils.ts (62%) create mode 100644 apps/agent/src/volume-host/backends/webdav.ts create mode 100644 apps/agent/src/volume-host/constants.ts create mode 100644 apps/agent/src/volume-host/fs.ts create mode 100644 apps/agent/src/volume-host/index.ts create mode 100644 apps/agent/src/volume-host/operations.ts create mode 100644 apps/agent/src/volume-host/paths.ts create mode 100644 apps/agent/src/volume-host/timeout.ts create mode 100644 apps/agent/src/volume-host/types.ts diff --git a/app/server/jobs/cleanup-dangling.ts b/app/server/jobs/cleanup-dangling.ts index b89ba0bd..a1c7eef7 100644 --- a/app/server/jobs/cleanup-dangling.ts +++ b/app/server/jobs/cleanup-dangling.ts @@ -5,7 +5,7 @@ import { volumeService } from "../modules/volumes/volume.service"; import { readMountInfo } from "../utils/mountinfo"; import { getVolumePath } from "../modules/volumes/helpers"; import { logger } from "@zerobyte/core/node"; -import { executeUnmount } from "../modules/backends/utils/backend-utils"; +import { executeUnmount } from "../../../apps/agent/src/volume-host/backends/utils"; import { toMessage } from "../utils/errors"; import { VOLUME_MOUNT_BASE } from "../core/constants"; import { db } from "../db/db"; diff --git a/app/server/modules/agents/__tests__/agents-manager.backups.test.ts b/app/server/modules/agents/__tests__/agents-manager.backups.test.ts index 25830203..eaab3870 100644 --- a/app/server/modules/agents/__tests__/agents-manager.backups.test.ts +++ b/app/server/modules/agents/__tests__/agents-manager.backups.test.ts @@ -4,7 +4,7 @@ import { fromAny, fromPartial } from "@total-typescript/shoehorn"; import { Effect } from "effect"; import { agentManager, type ProcessWithAgentRuntime } from "../agents-manager"; import type { AgentManagerRuntime } from "../controller/server"; -import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; +import type { BackupRunPayload, VolumeCommand, VolumeCommandResponsePayload } from "@zerobyte/contracts/agent-protocol"; const setAgentRuntime = (agentManagerRuntime: Partial | null) => { (process as ProcessWithAgentRuntime).__zerobyteAgentRuntime = { @@ -46,3 +46,32 @@ test("cancelBackup resolves a running backup when the cancel command cannot be d scheduleId: "schedule-1", }); }); + +test("runVolumeCommand sends the command to the selected agent", async () => { + const runVolumeCommand = vi.fn(() => + Effect.succeed({ + commandId: "command-1", + status: "success", + command: { name: "volume.mount", result: { status: "mounted" } }, + } satisfies VolumeCommandResponsePayload), + ); + setAgentRuntime({ runVolumeCommand }); + + const command = fromPartial({ name: "volume.mount", volume: { agentId: "agent-1" } }); + + await expect(agentManager.runVolumeCommand("agent-1", command)).resolves.toEqual({ + name: "volume.mount", + result: { status: "mounted" }, + }); + expect(runVolumeCommand).toHaveBeenCalledWith("agent-1", command); +}); + +test("runVolumeCommand fails when the selected agent is unavailable", async () => { + setAgentRuntime(null); + + const command = fromPartial({ name: "volume.mount", volume: { agentId: "agent-1" } }); + + await expect(agentManager.runVolumeCommand("agent-1", command)).rejects.toThrow( + "Volume agent agent-1 is not connected", + ); +}); diff --git a/app/server/modules/agents/agents-manager.ts b/app/server/modules/agents/agents-manager.ts index e82b8dde..962a4e03 100644 --- a/app/server/modules/agents/agents-manager.ts +++ b/app/server/modules/agents/agents-manager.ts @@ -1,5 +1,5 @@ import { logger } from "@zerobyte/core/node"; -import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; +import type { BackupRunPayload, VolumeCommand, VolumeCommandResult } from "@zerobyte/contracts/agent-protocol"; import { Effect } from "effect"; import { config } from "../../core/config"; import { createAgentManagerRuntime, type AgentManagerEvent } from "./controller/server"; @@ -266,6 +266,23 @@ export const agentManager = { cancelBackup: async (agentId: string, scheduleId: number) => { return requestBackupCancellation(agentId, scheduleId); }, + runVolumeCommand: async (agentId: string, command: VolumeCommand): Promise => { + const runtime = getAgentManagerRuntime(); + if (!runtime) { + throw new Error(`Volume agent ${agentId} is not connected`); + } + + const response = await Effect.runPromise(runtime.runVolumeCommand(agentId, command)); + if (!response) { + throw new Error(`Failed to send volume command ${command.name} to agent ${agentId}`); + } + + if (response.status === "error") { + throw new Error(response.error); + } + + return response.command; + }, }; export const startLocalAgent = async () => { diff --git a/app/server/modules/agents/controller/server.ts b/app/server/modules/agents/controller/server.ts index d5201fa4..995ba237 100644 --- a/app/server/modules/agents/controller/server.ts +++ b/app/server/modules/agents/controller/server.ts @@ -1,7 +1,13 @@ import { Data, Effect, Exit, Fiber, Scope } from "effect"; import { logger } from "@zerobyte/core/node"; import { toMessage } from "@zerobyte/core/utils"; -import type { AgentMessage, BackupCancelPayload, BackupRunPayload } from "@zerobyte/contracts/agent-protocol"; +import type { + AgentMessage, + BackupCancelPayload, + BackupRunPayload, + VolumeCommand, + VolumeCommandResponsePayload, +} from "@zerobyte/contracts/agent-protocol"; import { createControllerAgentSession, type AgentConnectionData, @@ -319,10 +325,30 @@ export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) => logger.warn(`Cannot cancel backup command. Agent ${agentId} is no longer accepting commands.`); return false; } - logger.info(`Sent backup cancel for command ${payload.jobId} to agent ${agentId}`); return true; }), + runVolumeCommand: ( + agentId: string, + command: VolumeCommand, + ): Effect.Effect => + Effect.gen(function* () { + const session = getSession(agentId); + + if (!session) { + yield* logger.effect.warn(`Cannot send volume command ${command.name}. Agent ${agentId} is not connected.`); + return null; + } + + if (!(yield* session.isReady())) { + yield* logger.effect.warn(`Cannot send volume command ${command.name}. Agent ${agentId} is not ready.`); + return null; + } + + const result = yield* session.runVolumeCommand(command); + yield* logger.effect.info(`Completed volume command ${command.name} on agent ${agentId}`); + return result; + }), stop, }; } diff --git a/app/server/modules/agents/controller/session.ts b/app/server/modules/agents/controller/session.ts index c2c3d9c7..5a68402e 100644 --- a/app/server/modules/agents/controller/session.ts +++ b/app/server/modules/agents/controller/session.ts @@ -1,4 +1,4 @@ -import { Effect, Queue, Ref, type Scope } from "effect"; +import { Deferred, Effect, Queue, Ref, type Scope } from "effect"; import type { AgentKind } from "../../../db/schema"; import { createControllerMessage, @@ -7,6 +7,8 @@ import { type BackupCancelPayload, type BackupRunPayload, type ControllerWireMessage, + type VolumeCommand, + type VolumeCommandResponsePayload, } from "@zerobyte/contracts/agent-protocol"; import { logger } from "@zerobyte/core/node"; import { toMessage } from "@zerobyte/core/utils"; @@ -27,13 +29,23 @@ type SessionState = { lastPongAt: number | null; }; -export type ControllerAgentSessionEvent = AgentMessage | { type: "agent.disconnected" }; +type PendingCommand = { + deferred: Deferred.Deferred; + description: string; +}; + +export type ControllerAgentSessionEvent = + | Exclude + | { + type: "agent.disconnected"; + }; export type ControllerAgentSession = { readonly connectionId: string; handleMessage: (data: string) => Effect.Effect; sendBackup: (payload: BackupRunPayload) => Effect.Effect; sendBackupCancel: (payload: BackupCancelPayload) => Effect.Effect; + runVolumeCommand: (command: VolumeCommand) => Effect.Effect; isReady: () => Effect.Effect; run: Effect.Effect; }; @@ -45,6 +57,7 @@ export const createControllerAgentSession = ( Effect.gen(function* () { let isClosed = false; const outboundQueue = yield* Queue.bounded(64); + const pendingCommands = yield* Ref.make(new Map()); const state = yield* Ref.make({ isReady: false, lastSeenAt: null, @@ -63,9 +76,33 @@ export const createControllerAgentSession = ( const updateState = (update: (current: SessionState) => SessionState) => Ref.update(state, update); + const setPendingCommand = (commandId: string, pending: PendingCommand) => + Ref.update(pendingCommands, (current) => new Map(current).set(commandId, pending)); + + const removePendingCommand = (commandId: string) => + Ref.modify(pendingCommands, (current) => { + const pending = current.get(commandId) ?? null; + const next = new Map(current); + next.delete(commandId); + return [pending, next]; + }); + + const rejectPendingCommands = Effect.gen(function* () { + const pendingCommandEntries = yield* Ref.get(pendingCommands); + yield* Ref.set(pendingCommands, new Map()); + + for (const pending of pendingCommandEntries.values()) { + yield* Deferred.fail( + pending.deferred, + new Error(`Agent session closed before ${pending.description} completed`), + ); + } + }); + const releaseSession = Effect.gen(function* () { const disconnectedAt = Date.now(); yield* updateState((current) => ({ ...current, isReady: false, lastSeenAt: disconnectedAt })); + yield* rejectPendingCommands; yield* onEvent({ type: "agent.disconnected" }); yield* Queue.shutdown(outboundQueue); @@ -129,20 +166,42 @@ export const createControllerAgentSession = ( return yield* Effect.never; }); + const handleVolumeCommandResult = (payload: VolumeCommandResponsePayload) => + Effect.gen(function* () { + const pending = yield* removePendingCommand(payload.commandId); + if (!pending) { + yield* logger.effect.warn(`Received response for unknown volume command ${payload.commandId}`); + return; + } + + yield* Deferred.succeed(pending.deferred, payload); + }); + const handleAgentMessage = (message: AgentMessage) => Effect.gen(function* () { - if (message.type === "agent.ready") { - const readyAt = Date.now(); - yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt })); - yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`); + switch (message.type) { + case "agent.ready": { + const readyAt = Date.now(); + yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt })); + yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`); + yield* onEvent(message); + break; + } + case "heartbeat.pong": { + const seenAt = Date.now(); + yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt })); + yield* onEvent(message); + break; + } + case "volume.commandResult": { + yield* handleVolumeCommandResult(message.payload); + break; + } + default: { + yield* onEvent(message); + break; + } } - - if (message.type === "heartbeat.pong") { - const seenAt = Date.now(); - yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt })); - } - - yield* onEvent(message); }); return { @@ -166,6 +225,27 @@ export const createControllerAgentSession = ( }, sendBackup: (payload) => offerOutbound(createControllerMessage("backup.run", payload)), sendBackupCancel: (payload) => offerOutbound(createControllerMessage("backup.cancel", payload)), + runVolumeCommand: (command) => + Effect.gen(function* () { + const commandId = Bun.randomUUIDv7(); + const description = `volume command ${command.name}`; + const deferred = yield* Deferred.make(); + yield* setPendingCommand(commandId, { deferred, description }); + + const queued = yield* offerOutbound(createControllerMessage("volume.command", { commandId, command })); + if (!queued) { + yield* removePendingCommand(commandId); + return yield* Effect.fail(new Error(`Failed to queue volume command ${command.name}`)); + } + + return yield* Deferred.await(deferred).pipe( + Effect.timeoutFail({ + duration: "60 seconds", + onTimeout: () => new Error(`Volume command ${command.name} timed out`), + }), + Effect.ensuring(removePendingCommand(commandId)), + ); + }), isReady: () => Ref.get(state).pipe(Effect.map((current) => current.isReady)), run, }; diff --git a/app/server/modules/backends/backend.ts b/app/server/modules/backends/backend.ts index f6eb3e3a..18b536b7 100644 --- a/app/server/modules/backends/backend.ts +++ b/app/server/modules/backends/backend.ts @@ -1,23 +1,14 @@ -import type { BackendStatus } from "~/schemas/volumes"; +import { makeDirectoryBackend } from "../../../../apps/agent/src/volume-host/backends/directory"; +import { makeNfsBackend } from "../../../../apps/agent/src/volume-host/backends/nfs"; +import { makeRcloneBackend } from "../../../../apps/agent/src/volume-host/backends/rclone"; +import { makeSftpBackend } from "../../../../apps/agent/src/volume-host/backends/sftp"; +import { makeSmbBackend } from "../../../../apps/agent/src/volume-host/backends/smb"; +import { makeWebdavBackend } from "../../../../apps/agent/src/volume-host/backends/webdav"; +import type { VolumeBackend } from "../../../../apps/agent/src/volume-host/types"; import type { Volume } from "../../db/schema"; import { getVolumePath } from "../volumes/helpers"; -import { makeDirectoryBackend } from "./directory/directory-backend"; -import { makeNfsBackend } from "./nfs/nfs-backend"; -import { makeRcloneBackend } from "./rclone/rclone-backend"; -import { makeSmbBackend } from "./smb/smb-backend"; -import { makeWebdavBackend } from "./webdav/webdav-backend"; -import { makeSftpBackend } from "./sftp/sftp-backend"; -type OperationResult = { - error?: string; - status: BackendStatus; -}; - -export type VolumeBackend = { - mount: () => Promise; - unmount: () => Promise; - checkHealth: () => Promise; -}; +export type { VolumeBackend }; export const createVolumeBackend = (volume: Volume, mountPath = getVolumePath(volume)): VolumeBackend => { switch (volume.config.backend) { diff --git a/app/server/modules/backends/directory/directory-backend.ts b/app/server/modules/backends/directory/directory-backend.ts deleted file mode 100644 index 8fa02ccf..00000000 --- a/app/server/modules/backends/directory/directory-backend.ts +++ /dev/null @@ -1,53 +0,0 @@ -import * as fs from "node:fs/promises"; -import { toMessage } from "../../../utils/errors"; -import { logger } from "@zerobyte/core/node"; -import type { VolumeBackend } from "../backend"; -import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes"; - -const mount = async (config: BackendConfig, _volumePath: string) => { - if (config.backend !== "directory") { - return { status: BACKEND_STATUS.error, error: "Invalid backend type" }; - } - - logger.info("Mounting directory volume from:", config.path); - - try { - await fs.access(config.path); - const stats = await fs.stat(config.path); - - if (!stats.isDirectory()) { - return { status: BACKEND_STATUS.error, error: "Path is not a directory" }; - } - - return { status: BACKEND_STATUS.mounted }; - } catch (error) { - logger.error("Failed to mount directory volume:", error); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -const unmount = async () => { - logger.info("Cannot unmount directory volume."); - return { status: BACKEND_STATUS.unmounted }; -}; - -const checkHealth = async (config: BackendConfig) => { - if (config.backend !== "directory") { - return { status: BACKEND_STATUS.error, error: "Invalid backend type" }; - } - - try { - await fs.access(config.path); - - return { status: BACKEND_STATUS.mounted }; - } catch (error) { - logger.error("Directory health check failed:", error); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -export const makeDirectoryBackend = (config: BackendConfig, volumePath: string): VolumeBackend => ({ - mount: () => mount(config, volumePath), - unmount, - checkHealth: () => checkHealth(config), -}); diff --git a/app/server/modules/backends/nfs/nfs-backend.ts b/app/server/modules/backends/nfs/nfs-backend.ts deleted file mode 100644 index 2f0fafdc..00000000 --- a/app/server/modules/backends/nfs/nfs-backend.ts +++ /dev/null @@ -1,135 +0,0 @@ -import * as fs from "node:fs/promises"; -import * as os from "node:os"; -import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes"; -import { OPERATION_TIMEOUT } from "../../../core/constants"; -import { toMessage } from "../../../utils/errors"; -import { logger } from "@zerobyte/core/node"; -import { getMountForPath } from "../../../utils/mountinfo"; -import { withTimeout } from "../../../utils/timeout"; -import type { VolumeBackend } from "../backend"; -import { assertMounted, executeMount, executeUnmount } from "../utils/backend-utils"; - -const mount = async (config: BackendConfig, path: string) => { - logger.debug(`Mounting volume ${path}...`); - - if (config.backend !== "nfs") { - logger.error("Provided config is not for NFS backend"); - return { - status: BACKEND_STATUS.error, - error: "Provided config is not for NFS backend", - }; - } - - if (os.platform() !== "linux") { - logger.error("NFS mounting is only supported on Linux hosts."); - return { - status: BACKEND_STATUS.error, - error: "NFS mounting is only supported on Linux hosts.", - }; - } - - const { status } = await checkHealth(path); - if (status === "mounted") { - return { status: BACKEND_STATUS.mounted }; - } - - if (status === "error") { - logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`); - await unmount(path); - } - - const run = async () => { - await fs.mkdir(path, { recursive: true }); - - const source = `${config.server}:${config.exportPath}`; - const options = [`vers=${config.version}`, `port=${config.port}`]; - if (config.version === "3") { - options.push("nolock"); - } - if (config.readOnly) { - options.push("ro"); - } - const args = ["-t", "nfs", "-o", options.join(","), source, path]; - - logger.debug(`Mounting volume ${path}...`); - logger.info(`Executing mount: mount ${args.join(" ")}`); - - try { - await executeMount(args); - } catch (error) { - logger.warn(`Initial NFS mount failed, retrying with -i flag: ${toMessage(error)}`); - // Fallback with -i flag if the first mount fails using the mount helper - await executeMount(["-i", ...args]); - } - - logger.info(`NFS volume at ${path} mounted successfully.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "NFS mount"); - } catch (err) { - logger.error("Error mounting NFS volume", { error: toMessage(err) }); - return { status: BACKEND_STATUS.error, error: toMessage(err) }; - } -}; - -const unmount = async (path: string) => { - if (os.platform() !== "linux") { - logger.error("NFS unmounting is only supported on Linux hosts."); - return { - status: BACKEND_STATUS.error, - error: "NFS unmounting is only supported on Linux hosts.", - }; - } - - const run = async () => { - const mount = await getMountForPath(path); - if (!mount || mount.mountPoint !== path) { - logger.debug(`Path ${path} is not a mount point. Skipping unmount.`); - return { status: BACKEND_STATUS.unmounted }; - } - - await executeUnmount(path); - - await fs.rmdir(path).catch(() => {}); - - logger.info(`NFS volume at ${path} unmounted successfully.`); - return { status: BACKEND_STATUS.unmounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "NFS unmount"); - } catch (err) { - logger.error("Error unmounting NFS volume", { - path, - error: toMessage(err), - }); - return { status: BACKEND_STATUS.error, error: toMessage(err) }; - } -}; - -const checkHealth = async (path: string) => { - const run = async () => { - await assertMounted(path, (fstype) => fstype.startsWith("nfs")); - - logger.debug(`NFS volume at ${path} is healthy and mounted.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "NFS health check"); - } catch (error) { - const message = toMessage(error); - if (message !== "Volume is not mounted") { - logger.error("NFS volume health check failed:", message); - } - return { status: BACKEND_STATUS.error, error: message }; - } -}; - -export const makeNfsBackend = (config: BackendConfig, path: string): VolumeBackend => ({ - mount: () => mount(config, path), - unmount: () => unmount(path), - checkHealth: () => checkHealth(path), -}); diff --git a/app/server/modules/backends/rclone/rclone-backend.ts b/app/server/modules/backends/rclone/rclone-backend.ts deleted file mode 100644 index d4d8d241..00000000 --- a/app/server/modules/backends/rclone/rclone-backend.ts +++ /dev/null @@ -1,131 +0,0 @@ -import * as fs from "node:fs/promises"; -import * as os from "node:os"; -import { OPERATION_TIMEOUT, RCLONE_CONFIG_FILE } from "../../../core/constants"; -import { toMessage } from "../../../utils/errors"; -import { logger } from "@zerobyte/core/node"; -import { getMountForPath } from "../../../utils/mountinfo"; -import { withTimeout } from "../../../utils/timeout"; -import type { VolumeBackend } from "../backend"; -import { assertMounted, executeUnmount } from "../utils/backend-utils"; -import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes"; -import { safeExec } from "@zerobyte/core/node"; -import { config as zbConfig } from "~/server/core/config"; - -const mount = async (config: BackendConfig, path: string) => { - logger.debug(`Mounting rclone volume ${path}...`); - - if (config.backend !== "rclone") { - logger.error("Provided config is not for rclone backend"); - return { status: BACKEND_STATUS.error, error: "Provided config is not for rclone backend" }; - } - - if (os.platform() !== "linux") { - logger.error("Rclone mounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "Rclone mounting is only supported on Linux hosts." }; - } - - const { status } = await checkHealth(path); - if (status === "mounted") { - return { status: BACKEND_STATUS.mounted }; - } - - if (status === "error") { - logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`); - await unmount(path); - } - - const run = async () => { - await fs.mkdir(path, { recursive: true }); - - const remotePath = `${config.remote}:${config.path}`; - const args = ["mount", remotePath, path, "--daemon"]; - - if (config.readOnly) { - args.push("--read-only"); - } - - args.push("--vfs-cache-mode", "writes"); - args.push("--allow-non-empty"); - args.push("--allow-other"); - - logger.debug(`Mounting rclone volume ${path}...`); - logger.info(`Executing rclone: rclone ${args.join(" ")}`); - - const result = await safeExec({ - command: "rclone", - args, - env: { RCLONE_CONFIG: RCLONE_CONFIG_FILE }, - timeout: zbConfig.serverIdleTimeout * 1000, - }); - - if (result.exitCode !== 0) { - const errorMsg = result.stderr.toString() || result.stdout.toString() || "Unknown error"; - throw new Error(`Failed to mount rclone volume: ${errorMsg}`); - } - - logger.info(`Rclone volume at ${path} mounted successfully.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), zbConfig.serverIdleTimeout * 1000, "Rclone mount"); - } catch (error) { - const errorMsg = toMessage(error); - - logger.error("Error mounting rclone volume", { error: errorMsg }); - return { status: BACKEND_STATUS.error, error: errorMsg }; - } -}; - -const unmount = async (path: string) => { - if (os.platform() !== "linux") { - logger.error("Rclone unmounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "Rclone unmounting is only supported on Linux hosts." }; - } - - const run = async () => { - const mount = await getMountForPath(path); - if (!mount || mount.mountPoint !== path) { - logger.debug(`Path ${path} is not a mount point. Skipping unmount.`); - return { status: BACKEND_STATUS.unmounted }; - } - - await executeUnmount(path); - await fs.rmdir(path).catch(() => {}); - - logger.info(`Rclone volume at ${path} unmounted successfully.`); - return { status: BACKEND_STATUS.unmounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone unmount"); - } catch (error) { - logger.error("Error unmounting rclone volume", { path, error: toMessage(error) }); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -const checkHealth = async (path: string) => { - const run = async () => { - await assertMounted(path, (fstype) => fstype.includes("rclone")); - - logger.debug(`Rclone volume at ${path} is healthy and mounted.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone health check"); - } catch (error) { - const message = toMessage(error); - if (message !== "Volume is not mounted") { - logger.error("Rclone volume health check failed:", message); - } - return { status: BACKEND_STATUS.error, error: message }; - } -}; - -export const makeRcloneBackend = (config: BackendConfig, path: string): VolumeBackend => ({ - mount: () => mount(config, path), - unmount: () => unmount(path), - checkHealth: () => checkHealth(path), -}); diff --git a/app/server/modules/backends/sftp/sftp-backend.ts b/app/server/modules/backends/sftp/sftp-backend.ts deleted file mode 100644 index 9fbb2587..00000000 --- a/app/server/modules/backends/sftp/sftp-backend.ts +++ /dev/null @@ -1,212 +0,0 @@ -import * as fs from "node:fs/promises"; -import * as os from "node:os"; -import * as path from "node:path"; -import { spawn } from "node:child_process"; -import { OPERATION_TIMEOUT, SSH_KEYS_DIR } from "../../../core/constants"; -import { cryptoUtils } from "../../../utils/crypto"; -import { toMessage } from "../../../utils/errors"; -import { logger, FILE_MODES, writeFileWithMode } from "@zerobyte/core/node"; -import { getMountForPath } from "../../../utils/mountinfo"; -import { withTimeout } from "../../../utils/timeout"; -import type { VolumeBackend } from "../backend"; -import { executeUnmount } from "../utils/backend-utils"; -import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes"; - -const getPrivateKeyPath = (mountPath: string) => { - const name = path.basename(mountPath); - return path.join(SSH_KEYS_DIR, `${name}.key`); -}; - -const getKnownHostsPath = (mountPath: string) => { - const name = path.basename(mountPath); - return path.join(SSH_KEYS_DIR, `${name}.known_hosts`); -}; - -const mount = async (config: BackendConfig, mountPath: string) => { - logger.debug(`Mounting SFTP volume ${mountPath}...`); - - if (config.backend !== "sftp") { - logger.error("Provided config is not for SFTP backend"); - return { status: BACKEND_STATUS.error, error: "Provided config is not for SFTP backend" }; - } - - if (os.platform() !== "linux") { - logger.error("SFTP mounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "SFTP mounting is only supported on Linux hosts." }; - } - - const { status } = await checkHealth(mountPath); - if (status === "mounted") { - return { status: BACKEND_STATUS.mounted }; - } - - if (status === "error") { - logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`); - await unmount(mountPath); - } - - const run = async () => { - await fs.mkdir(mountPath, { recursive: true }); - await fs.mkdir(SSH_KEYS_DIR, { recursive: true }); - - const { uid, gid } = os.userInfo(); - const options = [ - "reconnect", - "ServerAliveInterval=15", - "ServerAliveCountMax=3", - "allow_other", - `uid=${uid}`, - `gid=${gid}`, - ]; - - if (config.skipHostKeyCheck) { - options.push("StrictHostKeyChecking=no", "UserKnownHostsFile=/dev/null"); - } else if (config.knownHosts) { - const knownHostsPath = getKnownHostsPath(mountPath); - await writeFileWithMode(knownHostsPath, config.knownHosts, FILE_MODES.ownerReadWrite); - options.push(`UserKnownHostsFile=${knownHostsPath}`, "StrictHostKeyChecking=yes"); - } else { - options.push("StrictHostKeyChecking=yes"); - } - - if (config.readOnly) { - options.push("ro"); - } - - if (config.port) { - options.push(`port=${config.port}`); - } - - const keyPath = getPrivateKeyPath(mountPath); - if (config.privateKey) { - const decryptedKey = await cryptoUtils.resolveSecret(config.privateKey); - let normalizedKey = decryptedKey.replace(/\r\n/g, "\n"); - if (!normalizedKey.endsWith("\n")) { - normalizedKey += "\n"; - } - await writeFileWithMode(keyPath, normalizedKey, FILE_MODES.ownerReadWrite); - options.push(`IdentityFile=${keyPath}`); - } - - const source = `${config.username}@${config.host}:${config.path || ""}`; - const args = [source, mountPath, "-o", options.join(",")]; - - logger.debug(`Mounting SFTP volume ${mountPath}...`); - - const runSshfs = async (mountArgs: string[], password?: string) => { - return new Promise((resolve, reject) => { - const child = spawn("sshfs", mountArgs, { stdio: ["pipe", "pipe", "pipe"] }); - let stdout = ""; - let stderr = ""; - - child.stdout.setEncoding("utf8"); - child.stderr.setEncoding("utf8"); - - child.stdout.on("data", (data) => { - stdout += data; - }); - - child.stderr.on("data", (data) => { - stderr += data; - }); - - child.on("error", (error) => { - reject(new Error(`Failed to start sshfs: ${error.message}`)); - }); - - child.on("close", (code) => { - if (code === 0) { - resolve(); - return; - } - - const errorMsg = stderr.trim() || stdout.trim() || "Unknown error"; - reject(new Error(`Failed to mount SFTP volume: ${errorMsg}`)); - }); - - if (password) { - child.stdin.write(password); - } - child.stdin.end(); - }); - }; - - if (config.password) { - const password = await cryptoUtils.resolveSecret(config.password); - args.push("-o", "password_stdin"); - logger.info(`Executing sshfs: sshfs ${args.join(" ")}`); - await runSshfs(args, password); - } else { - logger.info(`Executing sshfs: sshfs ${args.join(" ")}`); - await runSshfs(args); - } - - logger.info(`SFTP volume at ${mountPath} mounted successfully.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT * 2, "SFTP mount"); - } catch (error) { - const errorMsg = toMessage(error); - logger.error("Error mounting SFTP volume", { error: errorMsg }); - return { status: BACKEND_STATUS.error, error: errorMsg }; - } -}; - -const unmount = async (mountPath: string) => { - if (os.platform() !== "linux") { - logger.error("SFTP unmounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "SFTP unmounting is only supported on Linux hosts." }; - } - - const run = async () => { - const mount = await getMountForPath(mountPath); - if (!mount || mount.mountPoint !== mountPath) { - logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`); - } else { - await executeUnmount(mountPath); - } - - const keyPath = getPrivateKeyPath(mountPath); - await fs.unlink(keyPath).catch(() => {}); - - const knownHostsPath = getKnownHostsPath(mountPath); - await fs.unlink(knownHostsPath).catch(() => {}); - - await fs.rmdir(mountPath).catch(() => {}); - - logger.info(`SFTP volume at ${mountPath} unmounted successfully.`); - return { status: BACKEND_STATUS.unmounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "SFTP unmount"); - } catch (error) { - logger.error("Error unmounting SFTP volume", { mountPath, error: toMessage(error) }); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -const checkHealth = async (mountPath: string) => { - const mount = await getMountForPath(mountPath); - - if (!mount || mount.mountPoint !== mountPath) { - return { status: BACKEND_STATUS.unmounted }; - } - - if (mount.fstype !== "fuse.sshfs") { - return { - status: BACKEND_STATUS.error, - error: `Invalid filesystem type: ${mount.fstype} (expected fuse.sshfs)`, - }; - } - - return { status: BACKEND_STATUS.mounted }; -}; - -export const makeSftpBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({ - mount: () => mount(config, mountPath), - unmount: () => unmount(mountPath), - checkHealth: () => checkHealth(mountPath), -}); diff --git a/app/server/modules/backends/smb/smb-backend.ts b/app/server/modules/backends/smb/smb-backend.ts deleted file mode 100644 index a4d51a45..00000000 --- a/app/server/modules/backends/smb/smb-backend.ts +++ /dev/null @@ -1,141 +0,0 @@ -import * as fs from "node:fs/promises"; -import * as os from "node:os"; -import { OPERATION_TIMEOUT } from "../../../core/constants"; -import { cryptoUtils } from "../../../utils/crypto"; -import { toMessage } from "../../../utils/errors"; -import { logger } from "@zerobyte/core/node"; -import { getMountForPath } from "../../../utils/mountinfo"; -import { withTimeout } from "../../../utils/timeout"; -import type { VolumeBackend } from "../backend"; -import { assertMounted, executeMount, executeUnmount } from "../utils/backend-utils"; -import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes"; - -const mount = async (config: BackendConfig, path: string) => { - logger.debug(`Mounting SMB volume ${path}...`); - - if (config.backend !== "smb") { - logger.error("Provided config is not for SMB backend"); - return { status: BACKEND_STATUS.error, error: "Provided config is not for SMB backend" }; - } - - if (os.platform() !== "linux") { - logger.error("SMB mounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "SMB mounting is only supported on Linux hosts." }; - } - - const { status } = await checkHealth(path); - if (status === "mounted") { - return { status: BACKEND_STATUS.mounted }; - } - - if (status === "error") { - logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`); - await unmount(path); - } - - const run = async () => { - await fs.mkdir(path, { recursive: true }); - - const source = `//${config.server}/${config.share}`; - const { uid, gid } = os.userInfo(); - - const options = [`port=${config.port}`, `uid=${uid}`, `gid=${gid}`, "iocharset=utf8"]; - - if (config.guest) { - options.push("username=guest", "password="); - } else { - const password = await cryptoUtils.resolveSecret(config.password ?? ""); - const safePassword = password.replace(/\\/g, "\\\\").replace(/,/g, "\\,"); - - options.push(`username=${config.username ?? "user"}`, `password=${safePassword}`); - } - - if (config.domain) { - options.push(`domain=${config.domain}`); - } - - if (config.vers && config.vers !== "auto") { - options.push(`vers=${config.vers}`); - } - - if (config.readOnly) { - options.push("ro"); - } - - const args = ["-t", "cifs", "-o", options.join(","), source, path]; - - logger.debug(`Mounting SMB volume ${path}...`); - logger.info(`Executing SMB mount for ${source} at ${path}`); - - try { - await executeMount(args); - } catch (error) { - logger.error(`SMB mount failed: ${toMessage(error)}`); - throw error; - } - - logger.info(`SMB volume at ${path} mounted successfully.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "SMB mount"); - } catch (error) { - logger.error("Error mounting SMB volume", { error: toMessage(error) }); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -const unmount = async (path: string) => { - if (os.platform() !== "linux") { - logger.error("SMB unmounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "SMB unmounting is only supported on Linux hosts." }; - } - - const run = async () => { - const mount = await getMountForPath(path); - if (!mount || mount.mountPoint !== path) { - logger.debug(`Path ${path} is not a mount point. Skipping unmount.`); - return { status: BACKEND_STATUS.unmounted }; - } - - await executeUnmount(path); - - await fs.rmdir(path).catch(() => {}); - - logger.info(`SMB volume at ${path} unmounted successfully.`); - return { status: BACKEND_STATUS.unmounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "SMB unmount"); - } catch (error) { - logger.error("Error unmounting SMB volume", { path, error: toMessage(error) }); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -const checkHealth = async (path: string) => { - const run = async () => { - await assertMounted(path, (fstype) => fstype === "cifs"); - - logger.debug(`SMB volume at ${path} is healthy and mounted.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "SMB health check"); - } catch (error) { - const message = toMessage(error); - if (message !== "Volume is not mounted") { - logger.error("SMB volume health check failed:", message); - } - return { status: BACKEND_STATUS.error, error: message }; - } -}; - -export const makeSmbBackend = (config: BackendConfig, path: string): VolumeBackend => ({ - mount: () => mount(config, path), - unmount: () => unmount(path), - checkHealth: () => checkHealth(path), -}); diff --git a/app/server/modules/backends/webdav/webdav-backend.ts b/app/server/modules/backends/webdav/webdav-backend.ts deleted file mode 100644 index 7885be84..00000000 --- a/app/server/modules/backends/webdav/webdav-backend.ts +++ /dev/null @@ -1,159 +0,0 @@ -import * as fs from "node:fs/promises"; -import * as os from "node:os"; -import { OPERATION_TIMEOUT } from "../../../core/constants"; -import { cryptoUtils } from "../../../utils/crypto"; -import { toMessage } from "../../../utils/errors"; -import { logger } from "@zerobyte/core/node"; -import { getMountForPath } from "../../../utils/mountinfo"; -import { withTimeout } from "../../../utils/timeout"; -import type { VolumeBackend } from "../backend"; -import { assertMounted, executeMount, executeUnmount } from "../utils/backend-utils"; -import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes"; - -const mount = async (config: BackendConfig, path: string) => { - logger.debug(`Mounting WebDAV volume ${path}...`); - - if (config.backend !== "webdav") { - logger.error("Provided config is not for WebDAV backend"); - return { status: BACKEND_STATUS.error, error: "Provided config is not for WebDAV backend" }; - } - - if (os.platform() !== "linux") { - logger.error("WebDAV mounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "WebDAV mounting is only supported on Linux hosts." }; - } - - const { status } = await checkHealth(path); - if (status === "mounted") { - return { status: BACKEND_STATUS.mounted }; - } - - if (status === "error") { - logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`); - await unmount(path); - } - - const run = async () => { - await fs.mkdir(path, { recursive: true }).catch((err) => { - logger.warn(`Failed to create directory ${path}: ${err.message}`); - }); - - const protocol = config.ssl ? "https" : "http"; - const defaultPort = config.ssl ? 443 : 80; - const port = config.port !== defaultPort ? `:${config.port}` : ""; - const source = `${protocol}://${config.server}${port}${config.path}`; - - const { uid, gid } = os.userInfo(); - const options = config.readOnly - ? [`uid=${uid}`, `gid=${gid}`, "file_mode=0444", "dir_mode=0555", "ro"] - : [`uid=${uid}`, `gid=${gid}`, "file_mode=0664", "dir_mode=0775"]; - - if (config.username && config.password) { - const password = await cryptoUtils.resolveSecret(config.password); - const secretsFile = "/etc/davfs2/secrets"; - const entry = [source, config.username, password].map((value) => value.replace(/[\r\n\t\s]+/g, " ")).join(" "); - const secretsContent = `${entry}\n`; - await fs.appendFile(secretsFile, secretsContent, { mode: 0o600 }); - } - - logger.debug(`Mounting WebDAV volume ${path}...`); - - const args = ["-t", "davfs", "-o", options.join(","), source, path]; - - try { - await executeMount(args); - } catch (error) { - logger.warn(`Initial WebDAV mount failed, retrying with -i flag: ${toMessage(error)}`); - // Fallback with -i flag if the first mount fails using the mount helper - await executeMount(["-i", ...args]); - } - - logger.info(`WebDAV volume at ${path} mounted successfully.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV mount"); - } catch (error) { - const errorMsg = toMessage(error); - - if (errorMsg.includes("already mounted")) { - return { status: BACKEND_STATUS.mounted }; - } - - logger.error("Error mounting WebDAV volume", { error: errorMsg }); - - if (errorMsg.includes("option") && errorMsg.includes("requires argument")) { - return { - status: BACKEND_STATUS.error, - error: "Invalid mount options. Please check your WebDAV server configuration.", - }; - } else if (errorMsg.includes("connection refused") || errorMsg.includes("Connection refused")) { - return { - status: BACKEND_STATUS.error, - error: "Cannot connect to WebDAV server. Please check the server address and port.", - }; - } else if (errorMsg.includes("unauthorized") || errorMsg.includes("Unauthorized")) { - return { - status: BACKEND_STATUS.error, - error: "Authentication failed. Please check your username and password.", - }; - } - - return { status: BACKEND_STATUS.error, error: errorMsg }; - } -}; - -const unmount = async (path: string) => { - if (os.platform() !== "linux") { - logger.error("WebDAV unmounting is only supported on Linux hosts."); - return { status: BACKEND_STATUS.error, error: "WebDAV unmounting is only supported on Linux hosts." }; - } - - const run = async () => { - const mount = await getMountForPath(path); - if (!mount || mount.mountPoint !== path) { - logger.debug(`Path ${path} is not a mount point. Skipping unmount.`); - return { status: BACKEND_STATUS.unmounted }; - } - - await executeUnmount(path); - - await fs.rmdir(path).catch(() => {}); - - logger.info(`WebDAV volume at ${path} unmounted successfully.`); - return { status: BACKEND_STATUS.unmounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV unmount"); - } catch (error) { - logger.error("Error unmounting WebDAV volume", { path, error: toMessage(error) }); - return { status: BACKEND_STATUS.error, error: toMessage(error) }; - } -}; - -const checkHealth = async (path: string) => { - const run = async () => { - await assertMounted(path, (fstype) => fstype === "fuse" || fstype === "davfs"); - - logger.debug(`WebDAV volume at ${path} is healthy and mounted.`); - return { status: BACKEND_STATUS.mounted }; - }; - - try { - return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV health check"); - } catch (error) { - const message = toMessage(error); - if (message !== "Volume is not mounted") { - logger.error("WebDAV volume health check failed:", message); - } - return { status: BACKEND_STATUS.error, error: message }; - } -}; - -export const makeWebdavBackend = (config: BackendConfig, path: string): VolumeBackend => ({ - mount: () => mount(config, path), - unmount: () => unmount(path), - checkHealth: () => checkHealth(path), -}); diff --git a/app/server/modules/volumes/volume-config-secrets.ts b/app/server/modules/volumes/volume-config-secrets.ts index 2f9bf1c7..1bda9efe 100644 --- a/app/server/modules/volumes/volume-config-secrets.ts +++ b/app/server/modules/volumes/volume-config-secrets.ts @@ -32,3 +32,7 @@ export const mapVolumeConfigSecrets = async ( export const encryptVolumeConfig = async (config: BackendConfig): Promise => { return await mapVolumeConfigSecrets(config, cryptoUtils.sealSecret); }; + +export const decryptVolumeConfig = async (config: BackendConfig): Promise => { + return await mapVolumeConfigSecrets(config, cryptoUtils.resolveSecret); +}; diff --git a/app/server/modules/volumes/volume.service.ts b/app/server/modules/volumes/volume.service.ts index 0c5f4ba3..074433e0 100644 --- a/app/server/modules/volumes/volume.service.ts +++ b/app/server/modules/volumes/volume.service.ts @@ -20,7 +20,7 @@ import { volumeConfigSchema, type BackendConfig } from "~/schemas/volumes"; import { getOrganizationId } from "~/server/core/request-context"; import { isNodeJSErrnoException } from "~/server/utils/fs"; import { asShortId, type ShortId } from "~/server/utils/branded"; -import { encryptVolumeConfig } from "./volume-config-secrets"; +import { decryptVolumeConfig, encryptVolumeConfig } from "./volume-config-secrets"; type EnsureHealthyVolumeResult = | { @@ -80,7 +80,7 @@ const createVolume = async (name: string, backendConfig: BackendConfig) => { throw new InternalServerError("Failed to create volume"); } - const backend = createVolumeBackend(created); + const backend = createVolumeBackend({ ...created, config: await decryptVolumeConfig(created.config) }); const { error, status } = await backend.mount(); await db @@ -114,7 +114,7 @@ const mountVolume = async (shortId: ShortId) => { throw new NotFoundError("Volume not found"); } - const backend = createVolumeBackend(volume); + const backend = createVolumeBackend({ ...volume, config: await decryptVolumeConfig(volume.config) }); await backend.unmount(); const { error, status } = await backend.mount(); @@ -219,7 +219,7 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => { } if (configChanged) { - const backend = createVolumeBackend(updated); + const backend = createVolumeBackend({ ...updated, config: await decryptVolumeConfig(updated.config) }); const { error, status } = await backend.mount(); await db .update(volumesTable) @@ -235,18 +235,16 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => { const testConnection = async (backendConfig: BackendConfig) => { const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-test-")); try { - const encryptedConfig = await encryptVolumeConfig(backendConfig); - const mockVolume = { id: 0, shortId: asShortId("test"), name: "test-connection", path: tempDir, - config: encryptedConfig, + config: backendConfig, createdAt: Date.now(), updatedAt: Date.now(), lastHealthCheck: Date.now(), - type: encryptedConfig.backend, + type: backendConfig.backend, status: "unmounted" as const, lastError: null, provisioningId: null, diff --git a/apps/agent/src/__tests__/controller-session.test.ts b/apps/agent/src/__tests__/controller-session.test.ts index c6dea013..3745c1c7 100644 --- a/apps/agent/src/__tests__/controller-session.test.ts +++ b/apps/agent/src/__tests__/controller-session.test.ts @@ -95,3 +95,56 @@ test("closes the websocket when an outbound send throws", async () => { session.close(); } }); + +test("continues processing inbound messages after a volume command fails", async () => { + const outboundMessages: string[] = []; + const session = createControllerSession( + fromPartial({ + send: (message: string) => { + outboundMessages.push(message); + }, + }), + ); + + try { + session.onMessage( + createControllerMessage("volume.command", { + commandId: "command-1", + command: { + name: "filesystem.browse", + path: "/path/that/does/not/exist", + }, + }), + ); + session.onMessage(createControllerMessage("heartbeat.ping", { sentAt: 123 })); + + await waitForExpect(() => { + const parsedMessages = outboundMessages.map((message) => parseAgentMessage(message)); + const volumeResult = parsedMessages.find( + (message) => message?.success && message.data.type === "volume.commandResult", + ); + const heartbeatPong = parsedMessages.find( + (message) => message?.success && message.data.type === "heartbeat.pong", + ); + + expect(volumeResult?.success).toBe(true); + if (!volumeResult || !volumeResult.success || volumeResult.data.type !== "volume.commandResult") { + return; + } + + expect(volumeResult.data.payload).toEqual({ + commandId: "command-1", + status: "error", + error: "ENOENT: no such file or directory, scandir '/path/that/does/not/exist'", + }); + expect(heartbeatPong?.success).toBe(true); + if (!heartbeatPong || !heartbeatPong.success || heartbeatPong.data.type !== "heartbeat.pong") { + return; + } + + expect(heartbeatPong.data.payload).toEqual({ sentAt: 123 }); + }); + } finally { + session.close(); + } +}); diff --git a/apps/agent/src/commands/index.ts b/apps/agent/src/commands/index.ts index a17360df..6f73e0c6 100644 --- a/apps/agent/src/commands/index.ts +++ b/apps/agent/src/commands/index.ts @@ -3,6 +3,7 @@ import { handleBackupCancelCommand } from "./backup-cancel"; import { handleBackupRunCommand } from "./backup-run"; import type { ControllerCommandContext } from "../context"; import { handleHeartbeatPingCommand } from "./heartbeat-ping"; +import { handleVolumeCommand } from "./volume"; export const handleControllerCommand = (context: ControllerCommandContext, message: ControllerMessage) => { switch (message.type) { @@ -12,6 +13,9 @@ export const handleControllerCommand = (context: ControllerCommandContext, messa case "backup.cancel": { return handleBackupCancelCommand(context, message.payload); } + case "volume.command": { + return handleVolumeCommand(context, message.payload); + } case "heartbeat.ping": { return handleHeartbeatPingCommand(context, message.payload); } diff --git a/apps/agent/src/commands/volume.ts b/apps/agent/src/commands/volume.ts new file mode 100644 index 00000000..55b3c1a5 --- /dev/null +++ b/apps/agent/src/commands/volume.ts @@ -0,0 +1,93 @@ +import { Effect, Data } from "effect"; +import { createAgentMessage, type VolumeCommand, type VolumeCommandPayload } from "@zerobyte/contracts/agent-protocol"; +import { toMessage } from "@zerobyte/core/utils"; +import { createVolumeBackend, getStatFs, getVolumePath, type AgentVolume, type BackendConfig } from "../volume-host"; +import { browseFilesystem, listVolumeFiles, testVolumeConnection } from "../volume-host/operations"; +import type { ControllerCommandContext } from "../context"; + +type VolumeBackedCommand = Extract; + +class VolumeCommandError extends Data.TaggedError("StopAgentManagerServerError")<{ + cause: unknown; +}> {} + +const asVolume = (volume: VolumeBackedCommand["volume"]): AgentVolume => ({ + ...volume, + config: volume.config as BackendConfig, + provisioningId: volume.provisioningId ?? null, +}); + +const runBackendOperation = ( + command: Extract, + operation: "mount" | "unmount" | "checkHealth", +) => + Effect.tryPromise({ + try: () => { + const backend = createVolumeBackend(asVolume(command.volume)); + return backend[operation](); + }, + catch: (error) => new VolumeCommandError({ cause: error }), + }); + +const executeVolumeCommand = (command: VolumeCommand) => + Effect.gen(function* () { + switch (command.name) { + case "volume.mount": + return { name: command.name, result: yield* runBackendOperation(command, "mount") }; + case "volume.unmount": + return { name: command.name, result: yield* runBackendOperation(command, "unmount") }; + case "volume.checkHealth": + return { name: command.name, result: yield* runBackendOperation(command, "checkHealth") }; + case "volume.statfs": { + const result = yield* Effect.tryPromise({ + try: () => getStatFs(getVolumePath(asVolume(command.volume))), + catch: (error) => new VolumeCommandError({ cause: error }), + }); + return { name: command.name, result }; + } + case "volume.listFiles": { + const result = yield* Effect.tryPromise({ + try: () => listVolumeFiles(asVolume(command.volume), command.subPath, command.offset, command.limit), + catch: (error) => new VolumeCommandError({ cause: error }), + }); + + return { name: command.name, result }; + } + case "volume.testConnection": { + const result = yield* testVolumeConnection(command.backendConfig as BackendConfig); + return { name: command.name, result }; + } + case "filesystem.browse": + const result = yield* Effect.tryPromise({ + try: () => browseFilesystem(command.path), + catch: (error) => new VolumeCommandError({ cause: error }), + }); + return { name: command.name, result }; + } + }); + +export const handleVolumeCommand = (context: ControllerCommandContext, payload: VolumeCommandPayload) => { + return Effect.gen(function* () { + const command = yield* executeVolumeCommand(payload.command); + + yield* context.offerOutbound( + createAgentMessage("volume.commandResult", { + commandId: payload.commandId, + status: "success", + command, + }), + ); + + return command; + }).pipe( + Effect.tapError((error) => { + return context.offerOutbound( + createAgentMessage("volume.commandResult", { + commandId: payload.commandId, + status: "error", + error: toMessage(error?.cause), + }), + ); + }), + ); +}; diff --git a/apps/agent/src/controller-session.ts b/apps/agent/src/controller-session.ts index d8f82e24..07331166 100644 --- a/apps/agent/src/controller-session.ts +++ b/apps/agent/src/controller-session.ts @@ -120,7 +120,16 @@ export const createControllerSession = (ws: WebSocket): ControllerSession => { return; } - yield* handleControllerCommand(commandContext, parsed.data); + const commandEffect: Effect.Effect = handleControllerCommand( + commandContext, + parsed.data, + ); + + yield* commandEffect.pipe( + Effect.catchAll((error) => + Effect.sync(() => logger.error(`Failed to handle controller message: ${toMessage(error)}`)), + ), + ); }), ), ); diff --git a/apps/agent/src/volume-host/backends/directory.ts b/apps/agent/src/volume-host/backends/directory.ts new file mode 100644 index 00000000..67151268 --- /dev/null +++ b/apps/agent/src/volume-host/backends/directory.ts @@ -0,0 +1,52 @@ +import * as fs from "node:fs/promises"; +import { logger } from "@zerobyte/core/node"; +import { toMessage } from "@zerobyte/core/utils"; +import type { BackendConfig, VolumeBackend } from "../types"; + +const mount = async (config: BackendConfig) => { + if (config.backend !== "directory") { + return { status: "error" as const, error: "Invalid backend type" }; + } + + logger.info("Mounting directory volume from:", config.path); + + try { + await fs.access(config.path); + const stats = await fs.stat(config.path); + + if (!stats.isDirectory()) { + return { status: "error" as const, error: "Path is not a directory" }; + } + + return { status: "mounted" as const }; + } catch (error) { + logger.error("Failed to mount directory volume:", error); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +const unmount = async () => { + logger.info("Cannot unmount directory volume."); + return { status: "unmounted" as const }; +}; + +const checkHealth = async (config: BackendConfig) => { + if (config.backend !== "directory") { + return { status: "error" as const, error: "Invalid backend type" }; + } + + try { + await fs.access(config.path); + + return { status: "mounted" as const }; + } catch (error) { + logger.error("Directory health check failed:", error); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +export const makeDirectoryBackend = (config: BackendConfig, _: string): VolumeBackend => ({ + mount: () => mount(config), + unmount, + checkHealth: () => checkHealth(config), +}); diff --git a/apps/agent/src/volume-host/backends/nfs.ts b/apps/agent/src/volume-host/backends/nfs.ts new file mode 100644 index 00000000..803e9a45 --- /dev/null +++ b/apps/agent/src/volume-host/backends/nfs.ts @@ -0,0 +1,112 @@ +import * as fs from "node:fs/promises"; +import * as os from "node:os"; +import { logger } from "@zerobyte/core/node"; +import { toMessage } from "@zerobyte/core/utils"; +import { OPERATION_TIMEOUT } from "../constants"; +import { withTimeout } from "../timeout"; +import { getMountForPath } from "../fs"; +import type { BackendConfig, VolumeBackend } from "../types"; +import { assertMounted, executeMount, executeUnmount } from "./utils"; + +const checkHealth = async (mountPath: string) => { + const run = async () => { + await assertMounted(mountPath, (fstype) => fstype.startsWith("nfs")); + + logger.debug(`NFS volume at ${mountPath} is healthy and mounted.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "NFS health check"); + } catch (error) { + const message = toMessage(error); + if (message !== "Volume is not mounted") { + logger.error("NFS volume health check failed:", message); + } + return { status: "error" as const, error: message }; + } +}; + +const unmount = async (mountPath: string) => { + if (os.platform() !== "linux") { + logger.error("NFS unmounting is only supported on Linux hosts."); + return { status: "error" as const, error: "NFS unmounting is only supported on Linux hosts." }; + } + + const run = async () => { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) { + logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`); + return { status: "unmounted" as const }; + } + + await executeUnmount(mountPath); + await fs.rmdir(mountPath).catch(() => {}); + + logger.info(`NFS volume at ${mountPath} unmounted successfully.`); + return { status: "unmounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "NFS unmount"); + } catch (error) { + logger.error("Error unmounting NFS volume", { mountPath, error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +const mount = async (config: BackendConfig, mountPath: string) => { + logger.debug(`Mounting volume ${mountPath}...`); + + if (config.backend !== "nfs") { + logger.error("Provided config is not for NFS backend"); + return { status: "error" as const, error: "Provided config is not for NFS backend" }; + } + + if (os.platform() !== "linux") { + logger.error("NFS mounting is only supported on Linux hosts."); + return { status: "error" as const, error: "NFS mounting is only supported on Linux hosts." }; + } + + const { status } = await checkHealth(mountPath); + if (status === "mounted") return { status: "mounted" as const }; + + if (status === "error") { + logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`); + await unmount(mountPath); + } + + const run = async () => { + await fs.mkdir(mountPath, { recursive: true }); + const options = [`vers=${config.version}`, `port=${config.port}`]; + if (config.version === "3") options.push("nolock"); + if (config.readOnly) options.push("ro"); + const args = ["-t", "nfs", "-o", options.join(","), `${config.server}:${config.exportPath}`, mountPath]; + + logger.debug(`Mounting volume ${mountPath}...`); + logger.info(`Executing mount: mount ${args.join(" ")}`); + + try { + await executeMount(args); + } catch (error) { + logger.warn(`Initial NFS mount failed, retrying with -i flag: ${toMessage(error)}`); + await executeMount(["-i", ...args]); + } + + logger.info(`NFS volume at ${mountPath} mounted successfully.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "NFS mount"); + } catch (error) { + logger.error("Error mounting NFS volume", { error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +export const makeNfsBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({ + mount: () => mount(config, mountPath), + unmount: () => unmount(mountPath), + checkHealth: () => checkHealth(mountPath), +}); diff --git a/apps/agent/src/volume-host/backends/rclone.ts b/apps/agent/src/volume-host/backends/rclone.ts new file mode 100644 index 00000000..011f2cf4 --- /dev/null +++ b/apps/agent/src/volume-host/backends/rclone.ts @@ -0,0 +1,124 @@ +import * as fs from "node:fs/promises"; +import * as os from "node:os"; +import { logger, safeExec } from "@zerobyte/core/node"; +import { toMessage } from "@zerobyte/core/utils"; +import { OPERATION_TIMEOUT, RCLONE_CONFIG_FILE, RCLONE_TIMEOUT } from "../constants"; +import { withTimeout } from "../timeout"; +import { getMountForPath } from "../fs"; +import type { BackendConfig, VolumeBackend } from "../types"; +import { assertMounted, executeUnmount } from "./utils"; + +const checkHealth = async (mountPath: string) => { + const run = async () => { + await assertMounted(mountPath, (fstype) => fstype.includes("rclone")); + + logger.debug(`Rclone volume at ${mountPath} is healthy and mounted.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone health check"); + } catch (error) { + const message = toMessage(error); + if (message !== "Volume is not mounted") { + logger.error("Rclone volume health check failed:", message); + } + return { status: "error" as const, error: message }; + } +}; + +const unmount = async (mountPath: string) => { + if (os.platform() !== "linux") { + logger.error("Rclone unmounting is only supported on Linux hosts."); + return { status: "error" as const, error: "Rclone unmounting is only supported on Linux hosts." }; + } + + const run = async () => { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) { + logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`); + return { status: "unmounted" as const }; + } + + await executeUnmount(mountPath); + await fs.rmdir(mountPath).catch(() => {}); + + logger.info(`Rclone volume at ${mountPath} unmounted successfully.`); + return { status: "unmounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone unmount"); + } catch (error) { + logger.error("Error unmounting rclone volume", { mountPath, error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +const mount = async (config: BackendConfig, mountPath: string) => { + logger.debug(`Mounting rclone volume ${mountPath}...`); + + if (config.backend !== "rclone") { + logger.error("Provided config is not for rclone backend"); + return { status: "error" as const, error: "Provided config is not for rclone backend" }; + } + + if (os.platform() !== "linux") { + logger.error("Rclone mounting is only supported on Linux hosts."); + return { status: "error" as const, error: "Rclone mounting is only supported on Linux hosts." }; + } + + const { status } = await checkHealth(mountPath); + if (status === "mounted") return { status: "mounted" as const }; + + if (status === "error") { + logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`); + await unmount(mountPath); + } + + const run = async () => { + await fs.mkdir(mountPath, { recursive: true }); + const args = [ + "mount", + `${config.remote}:${config.path}`, + mountPath, + "--daemon", + "--vfs-cache-mode", + "writes", + "--allow-non-empty", + "--allow-other", + ]; + if (config.readOnly) args.push("--read-only"); + + logger.debug(`Mounting rclone volume ${mountPath}...`); + logger.info(`Executing rclone: rclone ${args.join(" ")}`); + + const result = await safeExec({ + command: "rclone", + args, + env: { RCLONE_CONFIG: RCLONE_CONFIG_FILE }, + timeout: RCLONE_TIMEOUT, + }); + if (result.exitCode !== 0) { + const errorMsg = result.stderr.toString() || result.stdout.toString() || "Unknown error"; + throw new Error(`Failed to mount rclone volume: ${errorMsg}`); + } + + logger.info(`Rclone volume at ${mountPath} mounted successfully.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), RCLONE_TIMEOUT, "Rclone mount"); + } catch (error) { + const errorMsg = toMessage(error); + logger.error("Error mounting rclone volume", { error: errorMsg }); + return { status: "error" as const, error: errorMsg }; + } +}; + +export const makeRcloneBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({ + mount: () => mount(config, mountPath), + unmount: () => unmount(mountPath), + checkHealth: () => checkHealth(mountPath), +}); diff --git a/apps/agent/src/volume-host/backends/sftp.ts b/apps/agent/src/volume-host/backends/sftp.ts new file mode 100644 index 00000000..021ea77f --- /dev/null +++ b/apps/agent/src/volume-host/backends/sftp.ts @@ -0,0 +1,162 @@ +import * as fs from "node:fs/promises"; +import { createHash } from "node:crypto"; +import * as os from "node:os"; +import * as path from "node:path"; +import { spawn } from "node:child_process"; +import { FILE_MODES, logger, writeFileWithMode } from "@zerobyte/core/node"; +import { toMessage } from "@zerobyte/core/utils"; +import { OPERATION_TIMEOUT, SSH_KEYS_DIR } from "../constants"; +import { getMountForPath } from "../fs"; +import { withTimeout } from "../timeout"; +import type { BackendConfig, VolumeBackend } from "../types"; +import { executeUnmount } from "./utils"; + +const getMountPathHash = (mountPath: string) => createHash("sha256").update(mountPath).digest("hex").slice(0, 16); +const getPrivateKeyPath = (mountPath: string) => path.join(SSH_KEYS_DIR, `${getMountPathHash(mountPath)}.key`); +const getKnownHostsPath = (mountPath: string) => path.join(SSH_KEYS_DIR, `${getMountPathHash(mountPath)}.known_hosts`); + +const runSshfs = async (args: string[], password?: string) => + new Promise((resolve, reject) => { + const child = spawn("sshfs", args, { stdio: ["pipe", "pipe", "pipe"] }); + let stdout = ""; + let stderr = ""; + + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + + child.stdout.on("data", (data) => { + stdout += data; + }); + child.stderr.on("data", (data) => { + stderr += data; + }); + child.on("error", (error) => { + reject(new Error(`Failed to start sshfs: ${error.message}`)); + }); + child.on("close", (code) => { + if (code === 0) { + resolve(); + return; + } + + const errorMsg = stderr.trim() || stdout.trim() || "Unknown error"; + reject(new Error(`Failed to mount SFTP volume: ${errorMsg}`)); + }); + + if (password) child.stdin.write(password); + child.stdin.end(); + }); + +const checkHealth = async (mountPath: string) => { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) return { status: "unmounted" as const }; + if (mount.fstype !== "fuse.sshfs") { + return { status: "error" as const, error: `Invalid filesystem type: ${mount.fstype} (expected fuse.sshfs)` }; + } + return { status: "mounted" as const }; +}; + +const unmount = async (mountPath: string) => { + if (os.platform() !== "linux") { + logger.error("SFTP unmounting is only supported on Linux hosts."); + return { status: "error" as const, error: "SFTP unmounting is only supported on Linux hosts." }; + } + + const run = async () => { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) { + logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`); + } else { + await executeUnmount(mountPath); + } + + await fs.unlink(getPrivateKeyPath(mountPath)).catch(() => {}); + await fs.unlink(getKnownHostsPath(mountPath)).catch(() => {}); + await fs.rmdir(mountPath).catch(() => {}); + + logger.info(`SFTP volume at ${mountPath} unmounted successfully.`); + return { status: "unmounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "SFTP unmount"); + } catch (error) { + logger.error("Error unmounting SFTP volume", { mountPath, error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +const mount = async (config: BackendConfig, mountPath: string) => { + logger.debug(`Mounting SFTP volume ${mountPath}...`); + + if (config.backend !== "sftp") { + logger.error("Provided config is not for SFTP backend"); + return { status: "error" as const, error: "Provided config is not for SFTP backend" }; + } + + if (os.platform() !== "linux") { + logger.error("SFTP mounting is only supported on Linux hosts."); + return { status: "error" as const, error: "SFTP mounting is only supported on Linux hosts." }; + } + + const { status } = await checkHealth(mountPath); + if (status === "mounted") return { status: "mounted" as const }; + + if (status === "error") { + logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`); + await unmount(mountPath); + } + + const run = async () => { + await fs.mkdir(mountPath, { recursive: true }); + await fs.mkdir(SSH_KEYS_DIR, { recursive: true }); + const { uid, gid } = os.userInfo(); + const options = [ + "reconnect", + "ServerAliveInterval=15", + "ServerAliveCountMax=3", + "allow_other", + `uid=${uid}`, + `gid=${gid}`, + ]; + + if (config.skipHostKeyCheck) { + options.push("StrictHostKeyChecking=no", "UserKnownHostsFile=/dev/null"); + } else if (config.knownHosts) { + await writeFileWithMode(getKnownHostsPath(mountPath), config.knownHosts, FILE_MODES.ownerReadWrite); + options.push(`UserKnownHostsFile=${getKnownHostsPath(mountPath)}`, "StrictHostKeyChecking=yes"); + } else { + options.push("StrictHostKeyChecking=yes"); + } + + if (config.readOnly) options.push("ro"); + if (config.port) options.push(`port=${config.port}`); + if (config.privateKey) { + let key = config.privateKey.replace(/\r\n/g, "\n"); + if (!key.endsWith("\n")) key += "\n"; + await writeFileWithMode(getPrivateKeyPath(mountPath), key, FILE_MODES.ownerReadWrite); + options.push(`IdentityFile=${getPrivateKeyPath(mountPath)}`); + } + + const args = [`${config.username}@${config.host}:${config.path || ""}`, mountPath, "-o", options.join(",")]; + if (config.password) args.push("-o", "password_stdin"); + logger.info(`Executing sshfs: sshfs ${args.join(" ")}`); + await runSshfs(args, config.password); + + logger.info(`SFTP volume at ${mountPath} mounted successfully.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT * 2, "SFTP mount"); + } catch (error) { + logger.error("Error mounting SFTP volume", { error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +export const makeSftpBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({ + mount: () => mount(config, mountPath), + unmount: () => unmount(mountPath), + checkHealth: () => checkHealth(mountPath), +}); diff --git a/apps/agent/src/volume-host/backends/smb.ts b/apps/agent/src/volume-host/backends/smb.ts new file mode 100644 index 00000000..767d27d5 --- /dev/null +++ b/apps/agent/src/volume-host/backends/smb.ts @@ -0,0 +1,124 @@ +import * as fs from "node:fs/promises"; +import * as os from "node:os"; +import { logger } from "@zerobyte/core/node"; +import { toMessage } from "@zerobyte/core/utils"; +import { OPERATION_TIMEOUT } from "../constants"; +import { withTimeout } from "../timeout"; +import { getMountForPath } from "../fs"; +import type { BackendConfig, VolumeBackend } from "../types"; +import { assertMounted, executeMount, executeUnmount } from "./utils"; + +const checkHealth = async (mountPath: string) => { + const run = async () => { + await assertMounted(mountPath, (fstype) => fstype === "cifs"); + + logger.debug(`SMB volume at ${mountPath} is healthy and mounted.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "SMB health check"); + } catch (error) { + const message = toMessage(error); + if (message !== "Volume is not mounted") { + logger.error("SMB volume health check failed:", message); + } + return { status: "error" as const, error: message }; + } +}; + +const unmount = async (mountPath: string) => { + if (os.platform() !== "linux") { + logger.error("SMB unmounting is only supported on Linux hosts."); + return { status: "error" as const, error: "SMB unmounting is only supported on Linux hosts." }; + } + + const run = async () => { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) { + logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`); + return { status: "unmounted" as const }; + } + + await executeUnmount(mountPath); + await fs.rmdir(mountPath).catch(() => {}); + + logger.info(`SMB volume at ${mountPath} unmounted successfully.`); + return { status: "unmounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "SMB unmount"); + } catch (error) { + logger.error("Error unmounting SMB volume", { mountPath, error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +const mount = async (config: BackendConfig, mountPath: string) => { + logger.debug(`Mounting SMB volume ${mountPath}...`); + + if (config.backend !== "smb") { + logger.error("Provided config is not for SMB backend"); + return { status: "error" as const, error: "Provided config is not for SMB backend" }; + } + + if (os.platform() !== "linux") { + logger.error("SMB mounting is only supported on Linux hosts."); + return { status: "error" as const, error: "SMB mounting is only supported on Linux hosts." }; + } + + const { status } = await checkHealth(mountPath); + if (status === "mounted") return { status: "mounted" as const }; + + if (status === "error") { + logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`); + await unmount(mountPath); + } + + const run = async () => { + await fs.mkdir(mountPath, { recursive: true }); + const { uid, gid } = os.userInfo(); + const options = [`port=${config.port}`, `uid=${uid}`, `gid=${gid}`, "iocharset=utf8"]; + + if (config.guest) { + options.push("username=guest", "password="); + } else { + const safePassword = (config.password ?? "").replace(/\\/g, "\\\\").replace(/,/g, "\\,"); + options.push(`username=${config.username ?? "user"}`, `password=${safePassword}`); + } + + if (config.domain) options.push(`domain=${config.domain}`); + if (config.vers && config.vers !== "auto") options.push(`vers=${config.vers}`); + if (config.readOnly) options.push("ro"); + + const source = `//${config.server}/${config.share}`; + const args = ["-t", "cifs", "-o", options.join(","), source, mountPath]; + + logger.debug(`Mounting SMB volume ${mountPath}...`); + logger.info(`Executing SMB mount for ${source} at ${mountPath}`); + + try { + await executeMount(args); + } catch (error) { + logger.error(`SMB mount failed: ${toMessage(error)}`); + throw error; + } + + logger.info(`SMB volume at ${mountPath} mounted successfully.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "SMB mount"); + } catch (error) { + logger.error("Error mounting SMB volume", { error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +export const makeSmbBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({ + mount: () => mount(config, mountPath), + unmount: () => unmount(mountPath), + checkHealth: () => checkHealth(mountPath), +}); diff --git a/app/server/modules/backends/utils/__tests__/backend-utils.test.ts b/apps/agent/src/volume-host/backends/utils.test.ts similarity index 75% rename from app/server/modules/backends/utils/__tests__/backend-utils.test.ts rename to apps/agent/src/volume-host/backends/utils.test.ts index afee747c..7ca06b93 100644 --- a/app/server/modules/backends/utils/__tests__/backend-utils.test.ts +++ b/apps/agent/src/volume-host/backends/utils.test.ts @@ -9,8 +9,8 @@ vi.mock("node:fs/promises", async (importOriginal) => { }; }); -vi.mock("../../../../utils/mountinfo", async (importOriginal) => { - const actual = await importOriginal(); +vi.mock("../fs", async (importOriginal) => { + const actual = await importOriginal(); return { ...actual, @@ -19,14 +19,14 @@ vi.mock("../../../../utils/mountinfo", async (importOriginal) => { }); import * as fs from "node:fs/promises"; -import * as mountinfo from "../../../../utils/mountinfo"; -import { assertMounted } from "../backend-utils"; +import * as volumeFs from "../fs"; +import { assertMounted } from "./utils"; afterEach(() => { vi.restoreAllMocks(); }); -describe("assertMountedFilesystem", () => { +describe("assertMounted", () => { test("throws when the path is not accessible", async () => { vi.mocked(fs.access).mockRejectedValueOnce(new Error("missing")); @@ -37,7 +37,7 @@ describe("assertMountedFilesystem", () => { test("throws when the mount filesystem does not match", async () => { vi.mocked(fs.access).mockResolvedValueOnce(undefined); - vi.mocked(mountinfo.getMountForPath).mockResolvedValueOnce({ + vi.mocked(volumeFs.getMountForPath).mockResolvedValueOnce({ mountPoint: "/tmp/volume", fstype: "cifs", }); @@ -49,7 +49,7 @@ describe("assertMountedFilesystem", () => { test("accepts a matching mounted filesystem", async () => { vi.mocked(fs.access).mockResolvedValueOnce(undefined); - vi.mocked(mountinfo.getMountForPath).mockResolvedValueOnce({ + vi.mocked(volumeFs.getMountForPath).mockResolvedValueOnce({ mountPoint: "/tmp/volume", fstype: "nfs4", }); diff --git a/app/server/modules/backends/utils/backend-utils.ts b/apps/agent/src/volume-host/backends/utils.ts similarity index 62% rename from app/server/modules/backends/utils/backend-utils.ts rename to apps/agent/src/volume-host/backends/utils.ts index a9e9d2b6..7a5ac091 100644 --- a/app/server/modules/backends/utils/backend-utils.ts +++ b/apps/agent/src/volume-host/backends/utils.ts @@ -1,7 +1,6 @@ import * as fs from "node:fs/promises"; -import { logger } from "@zerobyte/core/node"; -import { safeExec } from "@zerobyte/core/node"; -import { getMountForPath } from "../../../utils/mountinfo"; +import { logger, safeExec } from "@zerobyte/core/node"; +import { getMountForPath } from "../fs"; export const executeMount = async (args: string[]): Promise => { const shouldBeVerbose = process.env.LOG_LEVEL === "debug" || process.env.NODE_ENV !== "production"; @@ -10,7 +9,6 @@ export const executeMount = async (args: string[]): Promise => { logger.debug(`Executing mount ${effectiveArgs.join(" ")}`); const result = await safeExec({ command: "mount", args: effectiveArgs, timeout: 10000 }); - const stdout = result.stdout.toString().trim(); const stderr = result.stderr.toString().trim(); @@ -26,37 +24,29 @@ export const executeMount = async (args: string[]): Promise => { throw new Error(`Mount command failed with exit code ${result.exitCode}: ${stderr || stdout || "unknown error"}`); }; -export const executeUnmount = async (path: string): Promise => { - let stderr: string | undefined; - - logger.debug(`Executing umount -l ${path}`); - const result = await safeExec({ command: "umount", args: ["-l", path], timeout: 10000 }); - - stderr = result.stderr.toString(); - - if (stderr?.trim()) { - logger.warn(stderr.trim()); - } +export const executeUnmount = async (mountPath: string): Promise => { + logger.debug(`Executing umount -l ${mountPath}`); + const result = await safeExec({ command: "umount", args: ["-l", mountPath], timeout: 10000 }); + const stderr = result.stderr.toString(); + if (stderr.trim()) logger.warn(stderr.trim()); if (result.exitCode !== 0) { - throw new Error(`Mount command failed with exit code ${result.exitCode}: ${stderr?.trim()}`); + throw new Error(`Mount command failed with exit code ${result.exitCode}: ${stderr.trim()}`); } }; -export const assertMounted = async (path: string, isExpectedFilesystem: (fstype: string) => boolean) => { +export const assertMounted = async (mountPath: string, isExpectedFilesystem: (fstype: string) => boolean) => { try { - await fs.access(path); + await fs.access(mountPath); } catch { throw new Error("Volume is not mounted"); } - const mount = await getMountForPath(path); - - if (!mount || mount.mountPoint !== path) { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) { throw new Error("Volume is not mounted"); } - if (!isExpectedFilesystem(mount.fstype)) { - throw new Error(`Path ${path} is not mounted as correct fstype (found ${mount.fstype}).`); + throw new Error(`Path ${mountPath} is not mounted as correct fstype (found ${mount.fstype}).`); } }; diff --git a/apps/agent/src/volume-host/backends/webdav.ts b/apps/agent/src/volume-host/backends/webdav.ts new file mode 100644 index 00000000..dd3548e8 --- /dev/null +++ b/apps/agent/src/volume-host/backends/webdav.ts @@ -0,0 +1,144 @@ +import * as fs from "node:fs/promises"; +import * as os from "node:os"; +import { logger } from "@zerobyte/core/node"; +import { toMessage } from "@zerobyte/core/utils"; +import { OPERATION_TIMEOUT } from "../constants"; +import { withTimeout } from "../timeout"; +import { getMountForPath } from "../fs"; +import type { BackendConfig, VolumeBackend } from "../types"; +import { assertMounted, executeMount, executeUnmount } from "./utils"; + +const checkHealth = async (mountPath: string) => { + const run = async () => { + await assertMounted(mountPath, (fstype) => fstype === "fuse" || fstype === "davfs"); + + logger.debug(`WebDAV volume at ${mountPath} is healthy and mounted.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV health check"); + } catch (error) { + const message = toMessage(error); + if (message !== "Volume is not mounted") { + logger.error("WebDAV volume health check failed:", message); + } + return { status: "error" as const, error: message }; + } +}; + +const unmount = async (mountPath: string) => { + if (os.platform() !== "linux") { + logger.error("WebDAV unmounting is only supported on Linux hosts."); + return { status: "error" as const, error: "WebDAV unmounting is only supported on Linux hosts." }; + } + + const run = async () => { + const mount = await getMountForPath(mountPath); + if (!mount || mount.mountPoint !== mountPath) { + logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`); + return { status: "unmounted" as const }; + } + + await executeUnmount(mountPath); + await fs.rmdir(mountPath).catch(() => {}); + + logger.info(`WebDAV volume at ${mountPath} unmounted successfully.`); + return { status: "unmounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV unmount"); + } catch (error) { + logger.error("Error unmounting WebDAV volume", { mountPath, error: toMessage(error) }); + return { status: "error" as const, error: toMessage(error) }; + } +}; + +const mount = async (config: BackendConfig, mountPath: string) => { + logger.debug(`Mounting WebDAV volume ${mountPath}...`); + + if (config.backend !== "webdav") { + logger.error("Provided config is not for WebDAV backend"); + return { status: "error" as const, error: "Provided config is not for WebDAV backend" }; + } + + if (os.platform() !== "linux") { + logger.error("WebDAV mounting is only supported on Linux hosts."); + return { status: "error" as const, error: "WebDAV mounting is only supported on Linux hosts." }; + } + + const { status } = await checkHealth(mountPath); + if (status === "mounted") return { status: "mounted" as const }; + + if (status === "error") { + logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`); + await unmount(mountPath); + } + + const run = async () => { + await fs.mkdir(mountPath, { recursive: true }).catch((error) => { + logger.warn(`Failed to create directory ${mountPath}: ${toMessage(error)}`); + }); + const protocol = config.ssl ? "https" : "http"; + const defaultPort = config.ssl ? 443 : 80; + const source = `${protocol}://${config.server}${config.port !== defaultPort ? `:${config.port}` : ""}${config.path}`; + const { uid, gid } = os.userInfo(); + const options = config.readOnly + ? [`uid=${uid}`, `gid=${gid}`, "file_mode=0444", "dir_mode=0555", "ro"] + : [`uid=${uid}`, `gid=${gid}`, "file_mode=0664", "dir_mode=0775"]; + + if (config.username && config.password) { + const entry = [source, config.username, config.password] + .map((value) => value.replace(/[\r\n\t\s]+/g, " ")) + .join(" "); + await fs.appendFile("/etc/davfs2/secrets", `${entry}\n`, { mode: 0o600 }); + } + + logger.debug(`Mounting WebDAV volume ${mountPath}...`); + + const args = ["-t", "davfs", "-o", options.join(","), source, mountPath]; + try { + await executeMount(args); + } catch (error) { + logger.warn(`Initial WebDAV mount failed, retrying with -i flag: ${toMessage(error)}`); + await executeMount(["-i", ...args]); + } + + logger.info(`WebDAV volume at ${mountPath} mounted successfully.`); + return { status: "mounted" as const }; + }; + + try { + return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV mount"); + } catch (error) { + const message = toMessage(error); + if (message.includes("already mounted")) return { status: "mounted" as const }; + + logger.error("Error mounting WebDAV volume", { error: message }); + + if (message.includes("option") && message.includes("requires argument")) { + return { + status: "error" as const, + error: "Invalid mount options. Please check your WebDAV server configuration.", + }; + } + if (message.includes("connection refused") || message.includes("Connection refused")) { + return { + status: "error" as const, + error: "Cannot connect to WebDAV server. Please check the server address and port.", + }; + } + if (message.includes("unauthorized") || message.includes("Unauthorized")) { + return { status: "error" as const, error: "Authentication failed. Please check your username and password." }; + } + + return { status: "error" as const, error: message }; + } +}; + +export const makeWebdavBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({ + mount: () => mount(config, mountPath), + unmount: () => unmount(mountPath), + checkHealth: () => checkHealth(mountPath), +}); diff --git a/apps/agent/src/volume-host/constants.ts b/apps/agent/src/volume-host/constants.ts new file mode 100644 index 00000000..c11d135d --- /dev/null +++ b/apps/agent/src/volume-host/constants.ts @@ -0,0 +1,9 @@ +import * as path from "node:path"; + +export const OPERATION_TIMEOUT = 5000; +export const VOLUME_MOUNT_BASE = process.env.ZEROBYTE_VOLUMES_DIR || "/var/lib/zerobyte/volumes"; +export const SSH_KEYS_DIR = "/var/lib/zerobyte/ssh"; +export const RCLONE_CONFIG_DIR = process.env.RCLONE_CONFIG_DIR || "/root/.config/rclone"; +export const RCLONE_CONFIG_FILE = path.join(RCLONE_CONFIG_DIR, "rclone.conf"); +const serverIdleTimeout = Number(process.env.SERVER_IDLE_TIMEOUT ?? 60); +export const RCLONE_TIMEOUT = (Number.isFinite(serverIdleTimeout) ? serverIdleTimeout : 60) * 1000; diff --git a/apps/agent/src/volume-host/fs.ts b/apps/agent/src/volume-host/fs.ts new file mode 100644 index 00000000..a97a47ec --- /dev/null +++ b/apps/agent/src/volume-host/fs.ts @@ -0,0 +1,66 @@ +import * as fs from "node:fs/promises"; +import { isPathWithin } from "@zerobyte/core/utils"; + +type MountInfo = { + mountPoint: string; + fstype: string; +}; + +const unescapeMount = (value: string) => + value.replace(/\\([0-7]{3})/g, (_, oct) => String.fromCharCode(parseInt(oct, 8))); + +export const readMountInfo = async (): Promise => { + const text = await fs.readFile("/proc/self/mountinfo", "utf-8"); + const result: MountInfo[] = []; + + for (const line of text.split("\n")) { + if (!line) continue; + const sep = line.indexOf(" - "); + if (sep === -1) continue; + + const left = line.slice(0, sep).split(" "); + const right = line.slice(sep + 3).split(" "); + const mpRaw = left[4]; + const fstype = right[0]; + + if (!mpRaw || !fstype) continue; + result.push({ mountPoint: unescapeMount(mpRaw), fstype }); + } + + return result; +}; + +export const getMountForPath = async (targetPath: string): Promise => { + const mounts = await readMountInfo(); + let best: MountInfo | undefined; + + for (const mount of mounts) { + if (!isPathWithin(mount.mountPoint, targetPath)) continue; + if (!best || mount.mountPoint.length > best.mountPoint.length) { + best = mount; + } + } + + return best; +}; + +export const getStatFs = async (mountPoint: string) => { + const stat = await fs.statfs(mountPoint, { bigint: true }); + const unit = stat.bsize > 0n ? stat.bsize : 1n; + const blocks = stat.blocks > 0n ? stat.blocks : 0n; + let bfree = stat.bfree > 0n ? stat.bfree : 0n; + if (bfree > blocks) bfree = blocks; + const bavail = stat.bavail > 0n ? stat.bavail : 0n; + const max = BigInt(Number.MAX_SAFE_INTEGER); + const toNumber = (value: bigint) => (value > max ? Number.MAX_SAFE_INTEGER : Number(value)); + + return { + total: toNumber(blocks * unit), + used: toNumber((blocks - bfree) * unit), + free: toNumber(bavail * unit), + }; +}; + +export const isNodeJSErrnoException = (error: unknown): error is NodeJS.ErrnoException => { + return error instanceof Error && "code" in error; +}; diff --git a/apps/agent/src/volume-host/index.ts b/apps/agent/src/volume-host/index.ts new file mode 100644 index 00000000..d704326e --- /dev/null +++ b/apps/agent/src/volume-host/index.ts @@ -0,0 +1,31 @@ +import { makeDirectoryBackend } from "./backends/directory"; +import { makeNfsBackend } from "./backends/nfs"; +import { makeRcloneBackend } from "./backends/rclone"; +import { makeSftpBackend } from "./backends/sftp"; +import { makeSmbBackend } from "./backends/smb"; +import { makeWebdavBackend } from "./backends/webdav"; +import { getVolumePath } from "./paths"; +import type { AgentVolume, VolumeBackend } from "./types"; + +export { getStatFs, isNodeJSErrnoException } from "./fs"; +export { getVolumePath } from "./paths"; +export type { AgentVolume, BackendConfig, VolumeBackend } from "./types"; + +export const createVolumeBackend = (volume: AgentVolume, mountPath = getVolumePath(volume)): VolumeBackend => { + switch (volume.config.backend) { + case "directory": + return makeDirectoryBackend(volume.config, mountPath); + case "nfs": + return makeNfsBackend(volume.config, mountPath); + case "smb": + return makeSmbBackend(volume.config, mountPath); + case "webdav": + return makeWebdavBackend(volume.config, mountPath); + case "rclone": + return makeRcloneBackend(volume.config, mountPath); + case "sftp": + return makeSftpBackend(volume.config, mountPath); + } + + throw new Error("Unsupported backend"); +}; diff --git a/apps/agent/src/volume-host/operations.ts b/apps/agent/src/volume-host/operations.ts new file mode 100644 index 00000000..527bee97 --- /dev/null +++ b/apps/agent/src/volume-host/operations.ts @@ -0,0 +1,190 @@ +import * as fs from "node:fs/promises"; +import * as os from "node:os"; +import * as path from "node:path"; +import { toMessage } from "@zerobyte/core/utils"; +import { createVolumeBackend, getVolumePath, isNodeJSErrnoException } from "."; +import type { AgentVolume, BackendConfig } from "./types"; +import { Data, Effect } from "effect"; + +const DEFAULT_PAGE_SIZE = 500; +const MAX_PAGE_SIZE = 500; + +export const listVolumeFiles = async ( + volume: AgentVolume, + subPath?: string, + offset: number = 0, + limit: number = DEFAULT_PAGE_SIZE, +) => { + const volumePath = getVolumePath(volume); + const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath; + const pageSize = Math.min(Math.max(limit, 1), MAX_PAGE_SIZE); + const startOffset = Math.max(offset, 0); + + try { + const realVolumeRoot = await fs.realpath(volumePath); + const realRequestedPath = await fs.realpath(requestedPath); + const relative = path.relative(realVolumeRoot, realRequestedPath); + + if (relative.startsWith("..") || path.isAbsolute(relative)) { + throw new Error("Invalid path"); + } + + const dirents = await fs.readdir(realRequestedPath, { withFileTypes: true }); + + dirents.sort((a, b) => { + const aIsDir = a.isDirectory(); + const bIsDir = b.isDirectory(); + + if (aIsDir === bIsDir) { + return a.name.localeCompare(b.name); + } + return aIsDir ? -1 : 1; + }); + + const total = dirents.length; + const paginatedDirents = dirents.slice(startOffset, startOffset + pageSize); + + const entries = ( + await Promise.all( + paginatedDirents.map(async (dirent) => { + const fullPath = path.join(realRequestedPath, dirent.name); + + try { + const stats = await fs.stat(fullPath); + const relativePath = path.relative(realVolumeRoot, fullPath); + + return { + name: dirent.name, + path: `/${relativePath}`, + type: dirent.isDirectory() ? ("directory" as const) : ("file" as const), + size: dirent.isFile() ? stats.size : undefined, + modifiedAt: stats.mtimeMs, + }; + } catch { + return null; + } + }), + ) + ).filter((entry) => entry !== null); + + return { + files: entries, + path: subPath || "/", + offset: startOffset, + limit: pageSize, + total, + hasMore: startOffset + pageSize < total, + }; + } catch (error) { + if (isNodeJSErrnoException(error) && error.code === "ENOENT") { + throw new Error("Directory not found"); + } + if (toMessage(error) === "Invalid path") { + throw error; + } + throw new Error(`Failed to list files: ${toMessage(error)}`); + } +}; + +export const browseFilesystem = async (browsePath: string) => { + const normalizedPath = path.normalize(browsePath); + const entries = await fs.readdir(normalizedPath, { withFileTypes: true }); + + const directories = await Promise.all( + entries + .filter((entry) => entry.isDirectory()) + .map(async (entry) => { + const fullPath = path.join(normalizedPath, entry.name); + + try { + const stats = await fs.stat(fullPath); + return { + name: entry.name, + path: fullPath, + type: "directory" as const, + size: undefined, + modifiedAt: stats.mtimeMs, + }; + } catch { + return { + name: entry.name, + path: fullPath, + type: "directory" as const, + size: undefined, + modifiedAt: undefined, + }; + } + }), + ); + + return { + directories: directories.sort((a, b) => a.name.localeCompare(b.name)), + path: normalizedPath, + }; +}; + +class TempDirError extends Data.TaggedError("TempDirError")<{ + cause: unknown; +}> {} + +class CleanupError extends Data.TaggedError("CleanupError")<{ + cause: unknown; + tempDir: string; +}> {} + +class MountError extends Data.TaggedError("MountError")<{ + cause: unknown; +}> {} + +const createTempDir = Effect.acquireRelease( + Effect.tryPromise({ + try: () => fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-test-")), + catch: (error) => new TempDirError({ cause: error }), + }), + (tempDir) => + Effect.tryPromise({ + try: () => fs.rm(tempDir, { recursive: true, force: true }), + catch: (error) => new CleanupError({ cause: error, tempDir }), + }).pipe(Effect.orDie), +); + +export const testVolumeConnection = (backendConfig: BackendConfig) => + Effect.scoped( + Effect.gen(function* () { + const tempDir = yield* createTempDir; + + const mockVolume: AgentVolume = { + id: 0, + shortId: "test", + name: "test-connection", + config: backendConfig, + createdAt: Date.now(), + updatedAt: Date.now(), + lastHealthCheck: Date.now(), + type: backendConfig.backend, + status: "unmounted", + lastError: null, + provisioningId: null, + autoRemount: true, + agentId: "local", + organizationId: "test-org", + }; + + const backend = createVolumeBackend(mockVolume, tempDir); + + const mountResult = yield* Effect.tryPromise({ + try: () => backend.mount(), + catch: (error) => new MountError({ cause: error }), + }); + + yield* Effect.tryPromise({ + try: () => backend.unmount(), + catch: () => undefined, + }); + + return { + success: !mountResult.error, + message: mountResult.error ? toMessage(mountResult.error) : "Connection successful", + }; + }), + ); diff --git a/apps/agent/src/volume-host/paths.ts b/apps/agent/src/volume-host/paths.ts new file mode 100644 index 00000000..f5a1b59e --- /dev/null +++ b/apps/agent/src/volume-host/paths.ts @@ -0,0 +1,10 @@ +import { VOLUME_MOUNT_BASE } from "./constants"; +import type { AgentVolume } from "./types"; + +export const getVolumePath = (volume: AgentVolume) => { + if (volume.config.backend === "directory") { + return volume.config.path; + } + + return `${VOLUME_MOUNT_BASE}/${volume.shortId}/_data`; +}; diff --git a/apps/agent/src/volume-host/timeout.ts b/apps/agent/src/volume-host/timeout.ts new file mode 100644 index 00000000..336ca1e0 --- /dev/null +++ b/apps/agent/src/volume-host/timeout.ts @@ -0,0 +1,21 @@ +class TimeoutError extends Error { + code = "ETIMEOUT"; + constructor(message: string) { + super(message); + this.name = "TimeoutError"; + } +} + +export const withTimeout = async (promise: Promise, ms: number, label: string): Promise => { + let timeout: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeout = setTimeout(() => reject(new TimeoutError(`${label} timed out after ${ms}ms`)), ms); + }), + ]); + } finally { + if (timeout) clearTimeout(timeout); + } +}; diff --git a/apps/agent/src/volume-host/types.ts b/apps/agent/src/volume-host/types.ts new file mode 100644 index 00000000..72b32826 --- /dev/null +++ b/apps/agent/src/volume-host/types.ts @@ -0,0 +1,67 @@ +export type BackendStatus = "mounted" | "unmounted" | "error"; + +type BaseConfig = { backend: string; readOnly?: boolean }; + +export type BackendConfig = + | (BaseConfig & { backend: "directory"; path: string }) + | (BaseConfig & { backend: "nfs"; server: string; exportPath: string; port: number; version: "3" | "4" | "4.1" }) + | (BaseConfig & { + backend: "smb"; + server: string; + share: string; + username?: string; + password?: string; + guest?: boolean; + vers?: "1.0" | "2.0" | "2.1" | "3.0" | "auto"; + domain?: string; + port: number; + }) + | (BaseConfig & { + backend: "webdav"; + server: string; + path: string; + username?: string; + password?: string; + port: number; + ssl?: boolean; + }) + | (BaseConfig & { backend: "rclone"; remote: string; path: string }) + | (BaseConfig & { + backend: "sftp"; + host: string; + port: number; + username: string; + password?: string; + privateKey?: string; + path: string; + skipHostKeyCheck?: boolean; + knownHosts?: string; + }); + +export type AgentVolume = { + id: number; + shortId: string; + name: string; + config: BackendConfig; + createdAt: number; + updatedAt: number; + lastHealthCheck: number; + type: string; + status: BackendStatus; + lastError: string | null; + provisioningId?: string | null; + autoRemount: boolean; + agentId: string; + organizationId: string; +}; + +export type OperationResult = { + status: BackendStatus; + error?: string; +}; + +export type VolumeBackend = { + mount: () => Promise; + unmount: () => Promise; + checkHealth: () => Promise; +}; diff --git a/packages/contracts/src/agent-protocol.ts b/packages/contracts/src/agent-protocol.ts index 2cbd1c75..2be20a16 100644 --- a/packages/contracts/src/agent-protocol.ts +++ b/packages/contracts/src/agent-protocol.ts @@ -51,6 +51,111 @@ const backupCancelSchema = z.object({ payload: z.object({ jobId: z.string(), scheduleId: z.string() }), }); +const backendStatusSchema = z.enum(["mounted", "unmounted", "error"]); + +const volumeSchema = z.object({ + id: z.number(), + shortId: z.string(), + name: z.string(), + path: z.string().nullable().optional(), + config: z.record(z.string(), z.unknown()).and(z.object({ backend: z.string() })), + createdAt: z.number(), + updatedAt: z.number(), + lastHealthCheck: z.number(), + type: z.string(), + status: backendStatusSchema, + lastError: z.string().nullable(), + provisioningId: z.string().nullable().optional(), + autoRemount: z.boolean(), + agentId: z.string(), + organizationId: z.string(), +}); + +const volumeOperationResultSchema = z.object({ + status: backendStatusSchema, + error: z.string().optional(), +}); + +const statfsSchema = z.object({ + total: z.number().optional(), + used: z.number().optional(), + free: z.number().optional(), +}); + +const fileEntrySchema = z.object({ + name: z.string(), + path: z.string(), + type: z.enum(["directory", "file"]), + size: z.number().optional(), + modifiedAt: z.number().optional(), +}); + +const directoryEntrySchema = z.object({ + name: z.string(), + path: z.string(), + type: z.literal("directory"), + size: z.undefined().optional(), + modifiedAt: z.number().optional(), +}); + +const volumeCommandSchema = z.discriminatedUnion("name", [ + z.object({ name: z.literal("volume.mount"), volume: volumeSchema }), + z.object({ name: z.literal("volume.unmount"), volume: volumeSchema }), + z.object({ name: z.literal("volume.checkHealth"), volume: volumeSchema }), + z.object({ name: z.literal("volume.statfs"), volume: volumeSchema }), + z.object({ + name: z.literal("volume.listFiles"), + volume: volumeSchema, + subPath: z.string().optional(), + offset: z.number(), + limit: z.number(), + }), + z.object({ name: z.literal("volume.testConnection"), backendConfig: z.record(z.string(), z.unknown()) }), + z.object({ name: z.literal("filesystem.browse"), path: z.string() }), +]); + +const volumeCommandRequestSchema = z.object({ + type: z.literal("volume.command"), + payload: z.object({ + commandId: z.string(), + command: volumeCommandSchema, + }), +}); + +const volumeCommandResultSchema = z.discriminatedUnion("name", [ + z.object({ name: z.literal("volume.mount"), result: volumeOperationResultSchema }), + z.object({ name: z.literal("volume.unmount"), result: volumeOperationResultSchema }), + z.object({ name: z.literal("volume.checkHealth"), result: volumeOperationResultSchema }), + z.object({ name: z.literal("volume.statfs"), result: statfsSchema }), + z.object({ + name: z.literal("volume.listFiles"), + result: z.object({ + files: z.array(fileEntrySchema), + path: z.string(), + offset: z.number(), + limit: z.number(), + total: z.number(), + hasMore: z.boolean(), + }), + }), + z.object({ + name: z.literal("volume.testConnection"), + result: z.object({ success: z.boolean(), message: z.string() }), + }), + z.object({ + name: z.literal("filesystem.browse"), + result: z.object({ directories: z.array(directoryEntrySchema), path: z.string() }), + }), +]); + +const volumeCommandResponseSchema = z.object({ + type: z.literal("volume.commandResult"), + payload: z.discriminatedUnion("status", [ + z.object({ commandId: z.string(), status: z.literal("success"), command: volumeCommandResultSchema }), + z.object({ commandId: z.string(), status: z.literal("error"), error: z.string() }), + ]), +}); + const heartbeatPingSchema = z.object({ type: z.literal("heartbeat.ping"), payload: z.object({ sentAt: z.number() }), @@ -113,6 +218,7 @@ const heartbeatPongSchema = z.object({ const controllerMessageSchema = z.discriminatedUnion("type", [ backupRunSchema, backupCancelSchema, + volumeCommandRequestSchema, heartbeatPingSchema, ]); const agentMessageSchema = z.discriminatedUnion("type", [ @@ -122,6 +228,7 @@ const agentMessageSchema = z.discriminatedUnion("type", [ backupCompletedSchema, backupFailedSchema, backupCancelledSchema, + volumeCommandResponseSchema, heartbeatPongSchema, ]); @@ -132,6 +239,10 @@ export type BackupProgressPayload = z.infer["payloa export type BackupCompletedPayload = z.infer["payload"]; export type BackupFailedPayload = z.infer["payload"]; export type BackupCancelledPayload = z.infer["payload"]; +export type VolumeCommandPayload = z.infer["payload"]; +export type VolumeCommand = z.infer; +export type VolumeCommandResult = z.infer; +export type VolumeCommandResponsePayload = z.infer["payload"]; export type ControllerMessage = z.infer; export type AgentMessage = z.infer;