feat(agent): add volume operation RPC (#861)

This commit is contained in:
Nico
2026-05-07 18:11:57 +02:00
committed by GitHub
parent 5e4742488f
commit df4b668560
34 changed files with 1591 additions and 905 deletions

View File

@@ -5,7 +5,7 @@ import { volumeService } from "../modules/volumes/volume.service";
import { readMountInfo } from "../utils/mountinfo";
import { getVolumePath } from "../modules/volumes/helpers";
import { logger } from "@zerobyte/core/node";
import { executeUnmount } from "../modules/backends/utils/backend-utils";
import { executeUnmount } from "../../../apps/agent/src/volume-host/backends/utils";
import { toMessage } from "../utils/errors";
import { VOLUME_MOUNT_BASE } from "../core/constants";
import { db } from "../db/db";

View File

@@ -4,7 +4,7 @@ import { fromAny, fromPartial } from "@total-typescript/shoehorn";
import { Effect } from "effect";
import { agentManager, type ProcessWithAgentRuntime } from "../agents-manager";
import type { AgentManagerRuntime } from "../controller/server";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import type { BackupRunPayload, VolumeCommand, VolumeCommandResponsePayload } from "@zerobyte/contracts/agent-protocol";
const setAgentRuntime = (agentManagerRuntime: Partial<AgentManagerRuntime> | null) => {
(process as ProcessWithAgentRuntime).__zerobyteAgentRuntime = {
@@ -46,3 +46,32 @@ test("cancelBackup resolves a running backup when the cancel command cannot be d
scheduleId: "schedule-1",
});
});
test("runVolumeCommand sends the command to the selected agent", async () => {
const runVolumeCommand = vi.fn(() =>
Effect.succeed({
commandId: "command-1",
status: "success",
command: { name: "volume.mount", result: { status: "mounted" } },
} satisfies VolumeCommandResponsePayload),
);
setAgentRuntime({ runVolumeCommand });
const command = fromPartial<VolumeCommand>({ name: "volume.mount", volume: { agentId: "agent-1" } });
await expect(agentManager.runVolumeCommand("agent-1", command)).resolves.toEqual({
name: "volume.mount",
result: { status: "mounted" },
});
expect(runVolumeCommand).toHaveBeenCalledWith("agent-1", command);
});
test("runVolumeCommand fails when the selected agent is unavailable", async () => {
setAgentRuntime(null);
const command = fromPartial<VolumeCommand>({ name: "volume.mount", volume: { agentId: "agent-1" } });
await expect(agentManager.runVolumeCommand("agent-1", command)).rejects.toThrow(
"Volume agent agent-1 is not connected",
);
});

View File

@@ -1,5 +1,5 @@
import { logger } from "@zerobyte/core/node";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import type { BackupRunPayload, VolumeCommand, VolumeCommandResult } from "@zerobyte/contracts/agent-protocol";
import { Effect } from "effect";
import { config } from "../../core/config";
import { createAgentManagerRuntime, type AgentManagerEvent } from "./controller/server";
@@ -266,6 +266,23 @@ export const agentManager = {
cancelBackup: async (agentId: string, scheduleId: number) => {
return requestBackupCancellation(agentId, scheduleId);
},
runVolumeCommand: async (agentId: string, command: VolumeCommand): Promise<VolumeCommandResult> => {
const runtime = getAgentManagerRuntime();
if (!runtime) {
throw new Error(`Volume agent ${agentId} is not connected`);
}
const response = await Effect.runPromise(runtime.runVolumeCommand(agentId, command));
if (!response) {
throw new Error(`Failed to send volume command ${command.name} to agent ${agentId}`);
}
if (response.status === "error") {
throw new Error(response.error);
}
return response.command;
},
};
export const startLocalAgent = async () => {

View File

@@ -1,7 +1,13 @@
import { Data, Effect, Exit, Fiber, Scope } from "effect";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import type { AgentMessage, BackupCancelPayload, BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import type {
AgentMessage,
BackupCancelPayload,
BackupRunPayload,
VolumeCommand,
VolumeCommandResponsePayload,
} from "@zerobyte/contracts/agent-protocol";
import {
createControllerAgentSession,
type AgentConnectionData,
@@ -319,10 +325,30 @@ export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) =>
logger.warn(`Cannot cancel backup command. Agent ${agentId} is no longer accepting commands.`);
return false;
}
logger.info(`Sent backup cancel for command ${payload.jobId} to agent ${agentId}`);
return true;
}),
runVolumeCommand: (
agentId: string,
command: VolumeCommand,
): Effect.Effect<VolumeCommandResponsePayload | null, Error> =>
Effect.gen(function* () {
const session = getSession(agentId);
if (!session) {
yield* logger.effect.warn(`Cannot send volume command ${command.name}. Agent ${agentId} is not connected.`);
return null;
}
if (!(yield* session.isReady())) {
yield* logger.effect.warn(`Cannot send volume command ${command.name}. Agent ${agentId} is not ready.`);
return null;
}
const result = yield* session.runVolumeCommand(command);
yield* logger.effect.info(`Completed volume command ${command.name} on agent ${agentId}`);
return result;
}),
stop,
};
}

View File

@@ -1,4 +1,4 @@
import { Effect, Queue, Ref, type Scope } from "effect";
import { Deferred, Effect, Queue, Ref, type Scope } from "effect";
import type { AgentKind } from "../../../db/schema";
import {
createControllerMessage,
@@ -7,6 +7,8 @@ import {
type BackupCancelPayload,
type BackupRunPayload,
type ControllerWireMessage,
type VolumeCommand,
type VolumeCommandResponsePayload,
} from "@zerobyte/contracts/agent-protocol";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
@@ -27,13 +29,23 @@ type SessionState = {
lastPongAt: number | null;
};
export type ControllerAgentSessionEvent = AgentMessage | { type: "agent.disconnected" };
type PendingCommand = {
deferred: Deferred.Deferred<VolumeCommandResponsePayload, Error>;
description: string;
};
export type ControllerAgentSessionEvent =
| Exclude<AgentMessage, { type: "volume.commandResult" }>
| {
type: "agent.disconnected";
};
export type ControllerAgentSession = {
readonly connectionId: string;
handleMessage: (data: string) => Effect.Effect<void>;
sendBackup: (payload: BackupRunPayload) => Effect.Effect<boolean>;
sendBackupCancel: (payload: BackupCancelPayload) => Effect.Effect<boolean>;
runVolumeCommand: (command: VolumeCommand) => Effect.Effect<VolumeCommandResponsePayload, Error>;
isReady: () => Effect.Effect<boolean>;
run: Effect.Effect<void, never, Scope.Scope>;
};
@@ -45,6 +57,7 @@ export const createControllerAgentSession = (
Effect.gen(function* () {
let isClosed = false;
const outboundQueue = yield* Queue.bounded<ControllerWireMessage>(64);
const pendingCommands = yield* Ref.make(new Map<string, PendingCommand>());
const state = yield* Ref.make<SessionState>({
isReady: false,
lastSeenAt: null,
@@ -63,9 +76,33 @@ export const createControllerAgentSession = (
const updateState = (update: (current: SessionState) => SessionState) => Ref.update(state, update);
const setPendingCommand = (commandId: string, pending: PendingCommand) =>
Ref.update(pendingCommands, (current) => new Map(current).set(commandId, pending));
const removePendingCommand = (commandId: string) =>
Ref.modify(pendingCommands, (current) => {
const pending = current.get(commandId) ?? null;
const next = new Map(current);
next.delete(commandId);
return [pending, next];
});
const rejectPendingCommands = Effect.gen(function* () {
const pendingCommandEntries = yield* Ref.get(pendingCommands);
yield* Ref.set(pendingCommands, new Map());
for (const pending of pendingCommandEntries.values()) {
yield* Deferred.fail(
pending.deferred,
new Error(`Agent session closed before ${pending.description} completed`),
);
}
});
const releaseSession = Effect.gen(function* () {
const disconnectedAt = Date.now();
yield* updateState((current) => ({ ...current, isReady: false, lastSeenAt: disconnectedAt }));
yield* rejectPendingCommands;
yield* onEvent({ type: "agent.disconnected" });
yield* Queue.shutdown(outboundQueue);
@@ -129,20 +166,42 @@ export const createControllerAgentSession = (
return yield* Effect.never;
});
const handleVolumeCommandResult = (payload: VolumeCommandResponsePayload) =>
Effect.gen(function* () {
const pending = yield* removePendingCommand(payload.commandId);
if (!pending) {
yield* logger.effect.warn(`Received response for unknown volume command ${payload.commandId}`);
return;
}
yield* Deferred.succeed(pending.deferred, payload);
});
const handleAgentMessage = (message: AgentMessage) =>
Effect.gen(function* () {
if (message.type === "agent.ready") {
const readyAt = Date.now();
yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt }));
yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`);
switch (message.type) {
case "agent.ready": {
const readyAt = Date.now();
yield* updateState((current) => ({ ...current, isReady: true, lastSeenAt: readyAt }));
yield* logger.effect.info(`Agent "${socket.data.agentName}" (${socket.data.agentId}) is ready`);
yield* onEvent(message);
break;
}
case "heartbeat.pong": {
const seenAt = Date.now();
yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt }));
yield* onEvent(message);
break;
}
case "volume.commandResult": {
yield* handleVolumeCommandResult(message.payload);
break;
}
default: {
yield* onEvent(message);
break;
}
}
if (message.type === "heartbeat.pong") {
const seenAt = Date.now();
yield* updateState((current) => ({ ...current, lastSeenAt: seenAt, lastPongAt: message.payload.sentAt }));
}
yield* onEvent(message);
});
return {
@@ -166,6 +225,27 @@ export const createControllerAgentSession = (
},
sendBackup: (payload) => offerOutbound(createControllerMessage("backup.run", payload)),
sendBackupCancel: (payload) => offerOutbound(createControllerMessage("backup.cancel", payload)),
runVolumeCommand: (command) =>
Effect.gen(function* () {
const commandId = Bun.randomUUIDv7();
const description = `volume command ${command.name}`;
const deferred = yield* Deferred.make<VolumeCommandResponsePayload, Error>();
yield* setPendingCommand(commandId, { deferred, description });
const queued = yield* offerOutbound(createControllerMessage("volume.command", { commandId, command }));
if (!queued) {
yield* removePendingCommand(commandId);
return yield* Effect.fail(new Error(`Failed to queue volume command ${command.name}`));
}
return yield* Deferred.await(deferred).pipe(
Effect.timeoutFail({
duration: "60 seconds",
onTimeout: () => new Error(`Volume command ${command.name} timed out`),
}),
Effect.ensuring(removePendingCommand(commandId)),
);
}),
isReady: () => Ref.get(state).pipe(Effect.map((current) => current.isReady)),
run,
};

View File

@@ -1,23 +1,14 @@
import type { BackendStatus } from "~/schemas/volumes";
import { makeDirectoryBackend } from "../../../../apps/agent/src/volume-host/backends/directory";
import { makeNfsBackend } from "../../../../apps/agent/src/volume-host/backends/nfs";
import { makeRcloneBackend } from "../../../../apps/agent/src/volume-host/backends/rclone";
import { makeSftpBackend } from "../../../../apps/agent/src/volume-host/backends/sftp";
import { makeSmbBackend } from "../../../../apps/agent/src/volume-host/backends/smb";
import { makeWebdavBackend } from "../../../../apps/agent/src/volume-host/backends/webdav";
import type { VolumeBackend } from "../../../../apps/agent/src/volume-host/types";
import type { Volume } from "../../db/schema";
import { getVolumePath } from "../volumes/helpers";
import { makeDirectoryBackend } from "./directory/directory-backend";
import { makeNfsBackend } from "./nfs/nfs-backend";
import { makeRcloneBackend } from "./rclone/rclone-backend";
import { makeSmbBackend } from "./smb/smb-backend";
import { makeWebdavBackend } from "./webdav/webdav-backend";
import { makeSftpBackend } from "./sftp/sftp-backend";
type OperationResult = {
error?: string;
status: BackendStatus;
};
export type VolumeBackend = {
mount: () => Promise<OperationResult>;
unmount: () => Promise<OperationResult>;
checkHealth: () => Promise<OperationResult>;
};
export type { VolumeBackend };
export const createVolumeBackend = (volume: Volume, mountPath = getVolumePath(volume)): VolumeBackend => {
switch (volume.config.backend) {

View File

@@ -1,53 +0,0 @@
import * as fs from "node:fs/promises";
import { toMessage } from "../../../utils/errors";
import { logger } from "@zerobyte/core/node";
import type { VolumeBackend } from "../backend";
import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes";
const mount = async (config: BackendConfig, _volumePath: string) => {
if (config.backend !== "directory") {
return { status: BACKEND_STATUS.error, error: "Invalid backend type" };
}
logger.info("Mounting directory volume from:", config.path);
try {
await fs.access(config.path);
const stats = await fs.stat(config.path);
if (!stats.isDirectory()) {
return { status: BACKEND_STATUS.error, error: "Path is not a directory" };
}
return { status: BACKEND_STATUS.mounted };
} catch (error) {
logger.error("Failed to mount directory volume:", error);
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
const unmount = async () => {
logger.info("Cannot unmount directory volume.");
return { status: BACKEND_STATUS.unmounted };
};
const checkHealth = async (config: BackendConfig) => {
if (config.backend !== "directory") {
return { status: BACKEND_STATUS.error, error: "Invalid backend type" };
}
try {
await fs.access(config.path);
return { status: BACKEND_STATUS.mounted };
} catch (error) {
logger.error("Directory health check failed:", error);
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
export const makeDirectoryBackend = (config: BackendConfig, volumePath: string): VolumeBackend => ({
mount: () => mount(config, volumePath),
unmount,
checkHealth: () => checkHealth(config),
});

View File

@@ -1,135 +0,0 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes";
import { OPERATION_TIMEOUT } from "../../../core/constants";
import { toMessage } from "../../../utils/errors";
import { logger } from "@zerobyte/core/node";
import { getMountForPath } from "../../../utils/mountinfo";
import { withTimeout } from "../../../utils/timeout";
import type { VolumeBackend } from "../backend";
import { assertMounted, executeMount, executeUnmount } from "../utils/backend-utils";
const mount = async (config: BackendConfig, path: string) => {
logger.debug(`Mounting volume ${path}...`);
if (config.backend !== "nfs") {
logger.error("Provided config is not for NFS backend");
return {
status: BACKEND_STATUS.error,
error: "Provided config is not for NFS backend",
};
}
if (os.platform() !== "linux") {
logger.error("NFS mounting is only supported on Linux hosts.");
return {
status: BACKEND_STATUS.error,
error: "NFS mounting is only supported on Linux hosts.",
};
}
const { status } = await checkHealth(path);
if (status === "mounted") {
return { status: BACKEND_STATUS.mounted };
}
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`);
await unmount(path);
}
const run = async () => {
await fs.mkdir(path, { recursive: true });
const source = `${config.server}:${config.exportPath}`;
const options = [`vers=${config.version}`, `port=${config.port}`];
if (config.version === "3") {
options.push("nolock");
}
if (config.readOnly) {
options.push("ro");
}
const args = ["-t", "nfs", "-o", options.join(","), source, path];
logger.debug(`Mounting volume ${path}...`);
logger.info(`Executing mount: mount ${args.join(" ")}`);
try {
await executeMount(args);
} catch (error) {
logger.warn(`Initial NFS mount failed, retrying with -i flag: ${toMessage(error)}`);
// Fallback with -i flag if the first mount fails using the mount helper
await executeMount(["-i", ...args]);
}
logger.info(`NFS volume at ${path} mounted successfully.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "NFS mount");
} catch (err) {
logger.error("Error mounting NFS volume", { error: toMessage(err) });
return { status: BACKEND_STATUS.error, error: toMessage(err) };
}
};
const unmount = async (path: string) => {
if (os.platform() !== "linux") {
logger.error("NFS unmounting is only supported on Linux hosts.");
return {
status: BACKEND_STATUS.error,
error: "NFS unmounting is only supported on Linux hosts.",
};
}
const run = async () => {
const mount = await getMountForPath(path);
if (!mount || mount.mountPoint !== path) {
logger.debug(`Path ${path} is not a mount point. Skipping unmount.`);
return { status: BACKEND_STATUS.unmounted };
}
await executeUnmount(path);
await fs.rmdir(path).catch(() => {});
logger.info(`NFS volume at ${path} unmounted successfully.`);
return { status: BACKEND_STATUS.unmounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "NFS unmount");
} catch (err) {
logger.error("Error unmounting NFS volume", {
path,
error: toMessage(err),
});
return { status: BACKEND_STATUS.error, error: toMessage(err) };
}
};
const checkHealth = async (path: string) => {
const run = async () => {
await assertMounted(path, (fstype) => fstype.startsWith("nfs"));
logger.debug(`NFS volume at ${path} is healthy and mounted.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "NFS health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("NFS volume health check failed:", message);
}
return { status: BACKEND_STATUS.error, error: message };
}
};
export const makeNfsBackend = (config: BackendConfig, path: string): VolumeBackend => ({
mount: () => mount(config, path),
unmount: () => unmount(path),
checkHealth: () => checkHealth(path),
});

View File

@@ -1,131 +0,0 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { OPERATION_TIMEOUT, RCLONE_CONFIG_FILE } from "../../../core/constants";
import { toMessage } from "../../../utils/errors";
import { logger } from "@zerobyte/core/node";
import { getMountForPath } from "../../../utils/mountinfo";
import { withTimeout } from "../../../utils/timeout";
import type { VolumeBackend } from "../backend";
import { assertMounted, executeUnmount } from "../utils/backend-utils";
import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes";
import { safeExec } from "@zerobyte/core/node";
import { config as zbConfig } from "~/server/core/config";
const mount = async (config: BackendConfig, path: string) => {
logger.debug(`Mounting rclone volume ${path}...`);
if (config.backend !== "rclone") {
logger.error("Provided config is not for rclone backend");
return { status: BACKEND_STATUS.error, error: "Provided config is not for rclone backend" };
}
if (os.platform() !== "linux") {
logger.error("Rclone mounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "Rclone mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(path);
if (status === "mounted") {
return { status: BACKEND_STATUS.mounted };
}
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`);
await unmount(path);
}
const run = async () => {
await fs.mkdir(path, { recursive: true });
const remotePath = `${config.remote}:${config.path}`;
const args = ["mount", remotePath, path, "--daemon"];
if (config.readOnly) {
args.push("--read-only");
}
args.push("--vfs-cache-mode", "writes");
args.push("--allow-non-empty");
args.push("--allow-other");
logger.debug(`Mounting rclone volume ${path}...`);
logger.info(`Executing rclone: rclone ${args.join(" ")}`);
const result = await safeExec({
command: "rclone",
args,
env: { RCLONE_CONFIG: RCLONE_CONFIG_FILE },
timeout: zbConfig.serverIdleTimeout * 1000,
});
if (result.exitCode !== 0) {
const errorMsg = result.stderr.toString() || result.stdout.toString() || "Unknown error";
throw new Error(`Failed to mount rclone volume: ${errorMsg}`);
}
logger.info(`Rclone volume at ${path} mounted successfully.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), zbConfig.serverIdleTimeout * 1000, "Rclone mount");
} catch (error) {
const errorMsg = toMessage(error);
logger.error("Error mounting rclone volume", { error: errorMsg });
return { status: BACKEND_STATUS.error, error: errorMsg };
}
};
const unmount = async (path: string) => {
if (os.platform() !== "linux") {
logger.error("Rclone unmounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "Rclone unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(path);
if (!mount || mount.mountPoint !== path) {
logger.debug(`Path ${path} is not a mount point. Skipping unmount.`);
return { status: BACKEND_STATUS.unmounted };
}
await executeUnmount(path);
await fs.rmdir(path).catch(() => {});
logger.info(`Rclone volume at ${path} unmounted successfully.`);
return { status: BACKEND_STATUS.unmounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone unmount");
} catch (error) {
logger.error("Error unmounting rclone volume", { path, error: toMessage(error) });
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
const checkHealth = async (path: string) => {
const run = async () => {
await assertMounted(path, (fstype) => fstype.includes("rclone"));
logger.debug(`Rclone volume at ${path} is healthy and mounted.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("Rclone volume health check failed:", message);
}
return { status: BACKEND_STATUS.error, error: message };
}
};
export const makeRcloneBackend = (config: BackendConfig, path: string): VolumeBackend => ({
mount: () => mount(config, path),
unmount: () => unmount(path),
checkHealth: () => checkHealth(path),
});

View File

@@ -1,212 +0,0 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import { spawn } from "node:child_process";
import { OPERATION_TIMEOUT, SSH_KEYS_DIR } from "../../../core/constants";
import { cryptoUtils } from "../../../utils/crypto";
import { toMessage } from "../../../utils/errors";
import { logger, FILE_MODES, writeFileWithMode } from "@zerobyte/core/node";
import { getMountForPath } from "../../../utils/mountinfo";
import { withTimeout } from "../../../utils/timeout";
import type { VolumeBackend } from "../backend";
import { executeUnmount } from "../utils/backend-utils";
import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes";
const getPrivateKeyPath = (mountPath: string) => {
const name = path.basename(mountPath);
return path.join(SSH_KEYS_DIR, `${name}.key`);
};
const getKnownHostsPath = (mountPath: string) => {
const name = path.basename(mountPath);
return path.join(SSH_KEYS_DIR, `${name}.known_hosts`);
};
const mount = async (config: BackendConfig, mountPath: string) => {
logger.debug(`Mounting SFTP volume ${mountPath}...`);
if (config.backend !== "sftp") {
logger.error("Provided config is not for SFTP backend");
return { status: BACKEND_STATUS.error, error: "Provided config is not for SFTP backend" };
}
if (os.platform() !== "linux") {
logger.error("SFTP mounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "SFTP mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(mountPath);
if (status === "mounted") {
return { status: BACKEND_STATUS.mounted };
}
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`);
await unmount(mountPath);
}
const run = async () => {
await fs.mkdir(mountPath, { recursive: true });
await fs.mkdir(SSH_KEYS_DIR, { recursive: true });
const { uid, gid } = os.userInfo();
const options = [
"reconnect",
"ServerAliveInterval=15",
"ServerAliveCountMax=3",
"allow_other",
`uid=${uid}`,
`gid=${gid}`,
];
if (config.skipHostKeyCheck) {
options.push("StrictHostKeyChecking=no", "UserKnownHostsFile=/dev/null");
} else if (config.knownHosts) {
const knownHostsPath = getKnownHostsPath(mountPath);
await writeFileWithMode(knownHostsPath, config.knownHosts, FILE_MODES.ownerReadWrite);
options.push(`UserKnownHostsFile=${knownHostsPath}`, "StrictHostKeyChecking=yes");
} else {
options.push("StrictHostKeyChecking=yes");
}
if (config.readOnly) {
options.push("ro");
}
if (config.port) {
options.push(`port=${config.port}`);
}
const keyPath = getPrivateKeyPath(mountPath);
if (config.privateKey) {
const decryptedKey = await cryptoUtils.resolveSecret(config.privateKey);
let normalizedKey = decryptedKey.replace(/\r\n/g, "\n");
if (!normalizedKey.endsWith("\n")) {
normalizedKey += "\n";
}
await writeFileWithMode(keyPath, normalizedKey, FILE_MODES.ownerReadWrite);
options.push(`IdentityFile=${keyPath}`);
}
const source = `${config.username}@${config.host}:${config.path || ""}`;
const args = [source, mountPath, "-o", options.join(",")];
logger.debug(`Mounting SFTP volume ${mountPath}...`);
const runSshfs = async (mountArgs: string[], password?: string) => {
return new Promise<void>((resolve, reject) => {
const child = spawn("sshfs", mountArgs, { stdio: ["pipe", "pipe", "pipe"] });
let stdout = "";
let stderr = "";
child.stdout.setEncoding("utf8");
child.stderr.setEncoding("utf8");
child.stdout.on("data", (data) => {
stdout += data;
});
child.stderr.on("data", (data) => {
stderr += data;
});
child.on("error", (error) => {
reject(new Error(`Failed to start sshfs: ${error.message}`));
});
child.on("close", (code) => {
if (code === 0) {
resolve();
return;
}
const errorMsg = stderr.trim() || stdout.trim() || "Unknown error";
reject(new Error(`Failed to mount SFTP volume: ${errorMsg}`));
});
if (password) {
child.stdin.write(password);
}
child.stdin.end();
});
};
if (config.password) {
const password = await cryptoUtils.resolveSecret(config.password);
args.push("-o", "password_stdin");
logger.info(`Executing sshfs: sshfs ${args.join(" ")}`);
await runSshfs(args, password);
} else {
logger.info(`Executing sshfs: sshfs ${args.join(" ")}`);
await runSshfs(args);
}
logger.info(`SFTP volume at ${mountPath} mounted successfully.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT * 2, "SFTP mount");
} catch (error) {
const errorMsg = toMessage(error);
logger.error("Error mounting SFTP volume", { error: errorMsg });
return { status: BACKEND_STATUS.error, error: errorMsg };
}
};
const unmount = async (mountPath: string) => {
if (os.platform() !== "linux") {
logger.error("SFTP unmounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "SFTP unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`);
} else {
await executeUnmount(mountPath);
}
const keyPath = getPrivateKeyPath(mountPath);
await fs.unlink(keyPath).catch(() => {});
const knownHostsPath = getKnownHostsPath(mountPath);
await fs.unlink(knownHostsPath).catch(() => {});
await fs.rmdir(mountPath).catch(() => {});
logger.info(`SFTP volume at ${mountPath} unmounted successfully.`);
return { status: BACKEND_STATUS.unmounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SFTP unmount");
} catch (error) {
logger.error("Error unmounting SFTP volume", { mountPath, error: toMessage(error) });
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
const checkHealth = async (mountPath: string) => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
return { status: BACKEND_STATUS.unmounted };
}
if (mount.fstype !== "fuse.sshfs") {
return {
status: BACKEND_STATUS.error,
error: `Invalid filesystem type: ${mount.fstype} (expected fuse.sshfs)`,
};
}
return { status: BACKEND_STATUS.mounted };
};
export const makeSftpBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({
mount: () => mount(config, mountPath),
unmount: () => unmount(mountPath),
checkHealth: () => checkHealth(mountPath),
});

View File

@@ -1,141 +0,0 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { OPERATION_TIMEOUT } from "../../../core/constants";
import { cryptoUtils } from "../../../utils/crypto";
import { toMessage } from "../../../utils/errors";
import { logger } from "@zerobyte/core/node";
import { getMountForPath } from "../../../utils/mountinfo";
import { withTimeout } from "../../../utils/timeout";
import type { VolumeBackend } from "../backend";
import { assertMounted, executeMount, executeUnmount } from "../utils/backend-utils";
import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes";
const mount = async (config: BackendConfig, path: string) => {
logger.debug(`Mounting SMB volume ${path}...`);
if (config.backend !== "smb") {
logger.error("Provided config is not for SMB backend");
return { status: BACKEND_STATUS.error, error: "Provided config is not for SMB backend" };
}
if (os.platform() !== "linux") {
logger.error("SMB mounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "SMB mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(path);
if (status === "mounted") {
return { status: BACKEND_STATUS.mounted };
}
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`);
await unmount(path);
}
const run = async () => {
await fs.mkdir(path, { recursive: true });
const source = `//${config.server}/${config.share}`;
const { uid, gid } = os.userInfo();
const options = [`port=${config.port}`, `uid=${uid}`, `gid=${gid}`, "iocharset=utf8"];
if (config.guest) {
options.push("username=guest", "password=");
} else {
const password = await cryptoUtils.resolveSecret(config.password ?? "");
const safePassword = password.replace(/\\/g, "\\\\").replace(/,/g, "\\,");
options.push(`username=${config.username ?? "user"}`, `password=${safePassword}`);
}
if (config.domain) {
options.push(`domain=${config.domain}`);
}
if (config.vers && config.vers !== "auto") {
options.push(`vers=${config.vers}`);
}
if (config.readOnly) {
options.push("ro");
}
const args = ["-t", "cifs", "-o", options.join(","), source, path];
logger.debug(`Mounting SMB volume ${path}...`);
logger.info(`Executing SMB mount for ${source} at ${path}`);
try {
await executeMount(args);
} catch (error) {
logger.error(`SMB mount failed: ${toMessage(error)}`);
throw error;
}
logger.info(`SMB volume at ${path} mounted successfully.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SMB mount");
} catch (error) {
logger.error("Error mounting SMB volume", { error: toMessage(error) });
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
const unmount = async (path: string) => {
if (os.platform() !== "linux") {
logger.error("SMB unmounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "SMB unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(path);
if (!mount || mount.mountPoint !== path) {
logger.debug(`Path ${path} is not a mount point. Skipping unmount.`);
return { status: BACKEND_STATUS.unmounted };
}
await executeUnmount(path);
await fs.rmdir(path).catch(() => {});
logger.info(`SMB volume at ${path} unmounted successfully.`);
return { status: BACKEND_STATUS.unmounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SMB unmount");
} catch (error) {
logger.error("Error unmounting SMB volume", { path, error: toMessage(error) });
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
const checkHealth = async (path: string) => {
const run = async () => {
await assertMounted(path, (fstype) => fstype === "cifs");
logger.debug(`SMB volume at ${path} is healthy and mounted.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SMB health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("SMB volume health check failed:", message);
}
return { status: BACKEND_STATUS.error, error: message };
}
};
export const makeSmbBackend = (config: BackendConfig, path: string): VolumeBackend => ({
mount: () => mount(config, path),
unmount: () => unmount(path),
checkHealth: () => checkHealth(path),
});

View File

@@ -1,159 +0,0 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { OPERATION_TIMEOUT } from "../../../core/constants";
import { cryptoUtils } from "../../../utils/crypto";
import { toMessage } from "../../../utils/errors";
import { logger } from "@zerobyte/core/node";
import { getMountForPath } from "../../../utils/mountinfo";
import { withTimeout } from "../../../utils/timeout";
import type { VolumeBackend } from "../backend";
import { assertMounted, executeMount, executeUnmount } from "../utils/backend-utils";
import { BACKEND_STATUS, type BackendConfig } from "~/schemas/volumes";
const mount = async (config: BackendConfig, path: string) => {
logger.debug(`Mounting WebDAV volume ${path}...`);
if (config.backend !== "webdav") {
logger.error("Provided config is not for WebDAV backend");
return { status: BACKEND_STATUS.error, error: "Provided config is not for WebDAV backend" };
}
if (os.platform() !== "linux") {
logger.error("WebDAV mounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "WebDAV mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(path);
if (status === "mounted") {
return { status: BACKEND_STATUS.mounted };
}
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${path} before mounting...`);
await unmount(path);
}
const run = async () => {
await fs.mkdir(path, { recursive: true }).catch((err) => {
logger.warn(`Failed to create directory ${path}: ${err.message}`);
});
const protocol = config.ssl ? "https" : "http";
const defaultPort = config.ssl ? 443 : 80;
const port = config.port !== defaultPort ? `:${config.port}` : "";
const source = `${protocol}://${config.server}${port}${config.path}`;
const { uid, gid } = os.userInfo();
const options = config.readOnly
? [`uid=${uid}`, `gid=${gid}`, "file_mode=0444", "dir_mode=0555", "ro"]
: [`uid=${uid}`, `gid=${gid}`, "file_mode=0664", "dir_mode=0775"];
if (config.username && config.password) {
const password = await cryptoUtils.resolveSecret(config.password);
const secretsFile = "/etc/davfs2/secrets";
const entry = [source, config.username, password].map((value) => value.replace(/[\r\n\t\s]+/g, " ")).join(" ");
const secretsContent = `${entry}\n`;
await fs.appendFile(secretsFile, secretsContent, { mode: 0o600 });
}
logger.debug(`Mounting WebDAV volume ${path}...`);
const args = ["-t", "davfs", "-o", options.join(","), source, path];
try {
await executeMount(args);
} catch (error) {
logger.warn(`Initial WebDAV mount failed, retrying with -i flag: ${toMessage(error)}`);
// Fallback with -i flag if the first mount fails using the mount helper
await executeMount(["-i", ...args]);
}
logger.info(`WebDAV volume at ${path} mounted successfully.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV mount");
} catch (error) {
const errorMsg = toMessage(error);
if (errorMsg.includes("already mounted")) {
return { status: BACKEND_STATUS.mounted };
}
logger.error("Error mounting WebDAV volume", { error: errorMsg });
if (errorMsg.includes("option") && errorMsg.includes("requires argument")) {
return {
status: BACKEND_STATUS.error,
error: "Invalid mount options. Please check your WebDAV server configuration.",
};
} else if (errorMsg.includes("connection refused") || errorMsg.includes("Connection refused")) {
return {
status: BACKEND_STATUS.error,
error: "Cannot connect to WebDAV server. Please check the server address and port.",
};
} else if (errorMsg.includes("unauthorized") || errorMsg.includes("Unauthorized")) {
return {
status: BACKEND_STATUS.error,
error: "Authentication failed. Please check your username and password.",
};
}
return { status: BACKEND_STATUS.error, error: errorMsg };
}
};
const unmount = async (path: string) => {
if (os.platform() !== "linux") {
logger.error("WebDAV unmounting is only supported on Linux hosts.");
return { status: BACKEND_STATUS.error, error: "WebDAV unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(path);
if (!mount || mount.mountPoint !== path) {
logger.debug(`Path ${path} is not a mount point. Skipping unmount.`);
return { status: BACKEND_STATUS.unmounted };
}
await executeUnmount(path);
await fs.rmdir(path).catch(() => {});
logger.info(`WebDAV volume at ${path} unmounted successfully.`);
return { status: BACKEND_STATUS.unmounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV unmount");
} catch (error) {
logger.error("Error unmounting WebDAV volume", { path, error: toMessage(error) });
return { status: BACKEND_STATUS.error, error: toMessage(error) };
}
};
const checkHealth = async (path: string) => {
const run = async () => {
await assertMounted(path, (fstype) => fstype === "fuse" || fstype === "davfs");
logger.debug(`WebDAV volume at ${path} is healthy and mounted.`);
return { status: BACKEND_STATUS.mounted };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("WebDAV volume health check failed:", message);
}
return { status: BACKEND_STATUS.error, error: message };
}
};
export const makeWebdavBackend = (config: BackendConfig, path: string): VolumeBackend => ({
mount: () => mount(config, path),
unmount: () => unmount(path),
checkHealth: () => checkHealth(path),
});

View File

@@ -32,3 +32,7 @@ export const mapVolumeConfigSecrets = async (
export const encryptVolumeConfig = async (config: BackendConfig): Promise<BackendConfig> => {
return await mapVolumeConfigSecrets(config, cryptoUtils.sealSecret);
};
export const decryptVolumeConfig = async (config: BackendConfig): Promise<BackendConfig> => {
return await mapVolumeConfigSecrets(config, cryptoUtils.resolveSecret);
};

View File

@@ -20,7 +20,7 @@ import { volumeConfigSchema, type BackendConfig } from "~/schemas/volumes";
import { getOrganizationId } from "~/server/core/request-context";
import { isNodeJSErrnoException } from "~/server/utils/fs";
import { asShortId, type ShortId } from "~/server/utils/branded";
import { encryptVolumeConfig } from "./volume-config-secrets";
import { decryptVolumeConfig, encryptVolumeConfig } from "./volume-config-secrets";
type EnsureHealthyVolumeResult =
| {
@@ -80,7 +80,7 @@ const createVolume = async (name: string, backendConfig: BackendConfig) => {
throw new InternalServerError("Failed to create volume");
}
const backend = createVolumeBackend(created);
const backend = createVolumeBackend({ ...created, config: await decryptVolumeConfig(created.config) });
const { error, status } = await backend.mount();
await db
@@ -114,7 +114,7 @@ const mountVolume = async (shortId: ShortId) => {
throw new NotFoundError("Volume not found");
}
const backend = createVolumeBackend(volume);
const backend = createVolumeBackend({ ...volume, config: await decryptVolumeConfig(volume.config) });
await backend.unmount();
const { error, status } = await backend.mount();
@@ -219,7 +219,7 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => {
}
if (configChanged) {
const backend = createVolumeBackend(updated);
const backend = createVolumeBackend({ ...updated, config: await decryptVolumeConfig(updated.config) });
const { error, status } = await backend.mount();
await db
.update(volumesTable)
@@ -235,18 +235,16 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => {
const testConnection = async (backendConfig: BackendConfig) => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-test-"));
try {
const encryptedConfig = await encryptVolumeConfig(backendConfig);
const mockVolume = {
id: 0,
shortId: asShortId("test"),
name: "test-connection",
path: tempDir,
config: encryptedConfig,
config: backendConfig,
createdAt: Date.now(),
updatedAt: Date.now(),
lastHealthCheck: Date.now(),
type: encryptedConfig.backend,
type: backendConfig.backend,
status: "unmounted" as const,
lastError: null,
provisioningId: null,

View File

@@ -95,3 +95,56 @@ test("closes the websocket when an outbound send throws", async () => {
session.close();
}
});
test("continues processing inbound messages after a volume command fails", async () => {
const outboundMessages: string[] = [];
const session = createControllerSession(
fromPartial({
send: (message: string) => {
outboundMessages.push(message);
},
}),
);
try {
session.onMessage(
createControllerMessage("volume.command", {
commandId: "command-1",
command: {
name: "filesystem.browse",
path: "/path/that/does/not/exist",
},
}),
);
session.onMessage(createControllerMessage("heartbeat.ping", { sentAt: 123 }));
await waitForExpect(() => {
const parsedMessages = outboundMessages.map((message) => parseAgentMessage(message));
const volumeResult = parsedMessages.find(
(message) => message?.success && message.data.type === "volume.commandResult",
);
const heartbeatPong = parsedMessages.find(
(message) => message?.success && message.data.type === "heartbeat.pong",
);
expect(volumeResult?.success).toBe(true);
if (!volumeResult || !volumeResult.success || volumeResult.data.type !== "volume.commandResult") {
return;
}
expect(volumeResult.data.payload).toEqual({
commandId: "command-1",
status: "error",
error: "ENOENT: no such file or directory, scandir '/path/that/does/not/exist'",
});
expect(heartbeatPong?.success).toBe(true);
if (!heartbeatPong || !heartbeatPong.success || heartbeatPong.data.type !== "heartbeat.pong") {
return;
}
expect(heartbeatPong.data.payload).toEqual({ sentAt: 123 });
});
} finally {
session.close();
}
});

View File

@@ -3,6 +3,7 @@ import { handleBackupCancelCommand } from "./backup-cancel";
import { handleBackupRunCommand } from "./backup-run";
import type { ControllerCommandContext } from "../context";
import { handleHeartbeatPingCommand } from "./heartbeat-ping";
import { handleVolumeCommand } from "./volume";
export const handleControllerCommand = (context: ControllerCommandContext, message: ControllerMessage) => {
switch (message.type) {
@@ -12,6 +13,9 @@ export const handleControllerCommand = (context: ControllerCommandContext, messa
case "backup.cancel": {
return handleBackupCancelCommand(context, message.payload);
}
case "volume.command": {
return handleVolumeCommand(context, message.payload);
}
case "heartbeat.ping": {
return handleHeartbeatPingCommand(context, message.payload);
}

View File

@@ -0,0 +1,93 @@
import { Effect, Data } from "effect";
import { createAgentMessage, type VolumeCommand, type VolumeCommandPayload } from "@zerobyte/contracts/agent-protocol";
import { toMessage } from "@zerobyte/core/utils";
import { createVolumeBackend, getStatFs, getVolumePath, type AgentVolume, type BackendConfig } from "../volume-host";
import { browseFilesystem, listVolumeFiles, testVolumeConnection } from "../volume-host/operations";
import type { ControllerCommandContext } from "../context";
type VolumeBackedCommand = Extract<VolumeCommand, { volume: unknown }>;
class VolumeCommandError extends Data.TaggedError("StopAgentManagerServerError")<{
cause: unknown;
}> {}
const asVolume = (volume: VolumeBackedCommand["volume"]): AgentVolume => ({
...volume,
config: volume.config as BackendConfig,
provisioningId: volume.provisioningId ?? null,
});
const runBackendOperation = (
command: Extract<VolumeCommand, { volume: unknown }>,
operation: "mount" | "unmount" | "checkHealth",
) =>
Effect.tryPromise({
try: () => {
const backend = createVolumeBackend(asVolume(command.volume));
return backend[operation]();
},
catch: (error) => new VolumeCommandError({ cause: error }),
});
const executeVolumeCommand = (command: VolumeCommand) =>
Effect.gen(function* () {
switch (command.name) {
case "volume.mount":
return { name: command.name, result: yield* runBackendOperation(command, "mount") };
case "volume.unmount":
return { name: command.name, result: yield* runBackendOperation(command, "unmount") };
case "volume.checkHealth":
return { name: command.name, result: yield* runBackendOperation(command, "checkHealth") };
case "volume.statfs": {
const result = yield* Effect.tryPromise({
try: () => getStatFs(getVolumePath(asVolume(command.volume))),
catch: (error) => new VolumeCommandError({ cause: error }),
});
return { name: command.name, result };
}
case "volume.listFiles": {
const result = yield* Effect.tryPromise({
try: () => listVolumeFiles(asVolume(command.volume), command.subPath, command.offset, command.limit),
catch: (error) => new VolumeCommandError({ cause: error }),
});
return { name: command.name, result };
}
case "volume.testConnection": {
const result = yield* testVolumeConnection(command.backendConfig as BackendConfig);
return { name: command.name, result };
}
case "filesystem.browse":
const result = yield* Effect.tryPromise({
try: () => browseFilesystem(command.path),
catch: (error) => new VolumeCommandError({ cause: error }),
});
return { name: command.name, result };
}
});
export const handleVolumeCommand = (context: ControllerCommandContext, payload: VolumeCommandPayload) => {
return Effect.gen(function* () {
const command = yield* executeVolumeCommand(payload.command);
yield* context.offerOutbound(
createAgentMessage("volume.commandResult", {
commandId: payload.commandId,
status: "success",
command,
}),
);
return command;
}).pipe(
Effect.tapError((error) => {
return context.offerOutbound(
createAgentMessage("volume.commandResult", {
commandId: payload.commandId,
status: "error",
error: toMessage(error?.cause),
}),
);
}),
);
};

View File

@@ -120,7 +120,16 @@ export const createControllerSession = (ws: WebSocket): ControllerSession => {
return;
}
yield* handleControllerCommand(commandContext, parsed.data);
const commandEffect: Effect.Effect<unknown, unknown, never> = handleControllerCommand(
commandContext,
parsed.data,
);
yield* commandEffect.pipe(
Effect.catchAll((error) =>
Effect.sync(() => logger.error(`Failed to handle controller message: ${toMessage(error)}`)),
),
);
}),
),
);

View File

@@ -0,0 +1,52 @@
import * as fs from "node:fs/promises";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import type { BackendConfig, VolumeBackend } from "../types";
const mount = async (config: BackendConfig) => {
if (config.backend !== "directory") {
return { status: "error" as const, error: "Invalid backend type" };
}
logger.info("Mounting directory volume from:", config.path);
try {
await fs.access(config.path);
const stats = await fs.stat(config.path);
if (!stats.isDirectory()) {
return { status: "error" as const, error: "Path is not a directory" };
}
return { status: "mounted" as const };
} catch (error) {
logger.error("Failed to mount directory volume:", error);
return { status: "error" as const, error: toMessage(error) };
}
};
const unmount = async () => {
logger.info("Cannot unmount directory volume.");
return { status: "unmounted" as const };
};
const checkHealth = async (config: BackendConfig) => {
if (config.backend !== "directory") {
return { status: "error" as const, error: "Invalid backend type" };
}
try {
await fs.access(config.path);
return { status: "mounted" as const };
} catch (error) {
logger.error("Directory health check failed:", error);
return { status: "error" as const, error: toMessage(error) };
}
};
export const makeDirectoryBackend = (config: BackendConfig, _: string): VolumeBackend => ({
mount: () => mount(config),
unmount,
checkHealth: () => checkHealth(config),
});

View File

@@ -0,0 +1,112 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import { assertMounted, executeMount, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {
const run = async () => {
await assertMounted(mountPath, (fstype) => fstype.startsWith("nfs"));
logger.debug(`NFS volume at ${mountPath} is healthy and mounted.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "NFS health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("NFS volume health check failed:", message);
}
return { status: "error" as const, error: message };
}
};
const unmount = async (mountPath: string) => {
if (os.platform() !== "linux") {
logger.error("NFS unmounting is only supported on Linux hosts.");
return { status: "error" as const, error: "NFS unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`);
return { status: "unmounted" as const };
}
await executeUnmount(mountPath);
await fs.rmdir(mountPath).catch(() => {});
logger.info(`NFS volume at ${mountPath} unmounted successfully.`);
return { status: "unmounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "NFS unmount");
} catch (error) {
logger.error("Error unmounting NFS volume", { mountPath, error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
const mount = async (config: BackendConfig, mountPath: string) => {
logger.debug(`Mounting volume ${mountPath}...`);
if (config.backend !== "nfs") {
logger.error("Provided config is not for NFS backend");
return { status: "error" as const, error: "Provided config is not for NFS backend" };
}
if (os.platform() !== "linux") {
logger.error("NFS mounting is only supported on Linux hosts.");
return { status: "error" as const, error: "NFS mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(mountPath);
if (status === "mounted") return { status: "mounted" as const };
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`);
await unmount(mountPath);
}
const run = async () => {
await fs.mkdir(mountPath, { recursive: true });
const options = [`vers=${config.version}`, `port=${config.port}`];
if (config.version === "3") options.push("nolock");
if (config.readOnly) options.push("ro");
const args = ["-t", "nfs", "-o", options.join(","), `${config.server}:${config.exportPath}`, mountPath];
logger.debug(`Mounting volume ${mountPath}...`);
logger.info(`Executing mount: mount ${args.join(" ")}`);
try {
await executeMount(args);
} catch (error) {
logger.warn(`Initial NFS mount failed, retrying with -i flag: ${toMessage(error)}`);
await executeMount(["-i", ...args]);
}
logger.info(`NFS volume at ${mountPath} mounted successfully.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "NFS mount");
} catch (error) {
logger.error("Error mounting NFS volume", { error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
export const makeNfsBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({
mount: () => mount(config, mountPath),
unmount: () => unmount(mountPath),
checkHealth: () => checkHealth(mountPath),
});

View File

@@ -0,0 +1,124 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { logger, safeExec } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT, RCLONE_CONFIG_FILE, RCLONE_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import { assertMounted, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {
const run = async () => {
await assertMounted(mountPath, (fstype) => fstype.includes("rclone"));
logger.debug(`Rclone volume at ${mountPath} is healthy and mounted.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("Rclone volume health check failed:", message);
}
return { status: "error" as const, error: message };
}
};
const unmount = async (mountPath: string) => {
if (os.platform() !== "linux") {
logger.error("Rclone unmounting is only supported on Linux hosts.");
return { status: "error" as const, error: "Rclone unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`);
return { status: "unmounted" as const };
}
await executeUnmount(mountPath);
await fs.rmdir(mountPath).catch(() => {});
logger.info(`Rclone volume at ${mountPath} unmounted successfully.`);
return { status: "unmounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "Rclone unmount");
} catch (error) {
logger.error("Error unmounting rclone volume", { mountPath, error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
const mount = async (config: BackendConfig, mountPath: string) => {
logger.debug(`Mounting rclone volume ${mountPath}...`);
if (config.backend !== "rclone") {
logger.error("Provided config is not for rclone backend");
return { status: "error" as const, error: "Provided config is not for rclone backend" };
}
if (os.platform() !== "linux") {
logger.error("Rclone mounting is only supported on Linux hosts.");
return { status: "error" as const, error: "Rclone mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(mountPath);
if (status === "mounted") return { status: "mounted" as const };
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`);
await unmount(mountPath);
}
const run = async () => {
await fs.mkdir(mountPath, { recursive: true });
const args = [
"mount",
`${config.remote}:${config.path}`,
mountPath,
"--daemon",
"--vfs-cache-mode",
"writes",
"--allow-non-empty",
"--allow-other",
];
if (config.readOnly) args.push("--read-only");
logger.debug(`Mounting rclone volume ${mountPath}...`);
logger.info(`Executing rclone: rclone ${args.join(" ")}`);
const result = await safeExec({
command: "rclone",
args,
env: { RCLONE_CONFIG: RCLONE_CONFIG_FILE },
timeout: RCLONE_TIMEOUT,
});
if (result.exitCode !== 0) {
const errorMsg = result.stderr.toString() || result.stdout.toString() || "Unknown error";
throw new Error(`Failed to mount rclone volume: ${errorMsg}`);
}
logger.info(`Rclone volume at ${mountPath} mounted successfully.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), RCLONE_TIMEOUT, "Rclone mount");
} catch (error) {
const errorMsg = toMessage(error);
logger.error("Error mounting rclone volume", { error: errorMsg });
return { status: "error" as const, error: errorMsg };
}
};
export const makeRcloneBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({
mount: () => mount(config, mountPath),
unmount: () => unmount(mountPath),
checkHealth: () => checkHealth(mountPath),
});

View File

@@ -0,0 +1,162 @@
import * as fs from "node:fs/promises";
import { createHash } from "node:crypto";
import * as os from "node:os";
import * as path from "node:path";
import { spawn } from "node:child_process";
import { FILE_MODES, logger, writeFileWithMode } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT, SSH_KEYS_DIR } from "../constants";
import { getMountForPath } from "../fs";
import { withTimeout } from "../timeout";
import type { BackendConfig, VolumeBackend } from "../types";
import { executeUnmount } from "./utils";
const getMountPathHash = (mountPath: string) => createHash("sha256").update(mountPath).digest("hex").slice(0, 16);
const getPrivateKeyPath = (mountPath: string) => path.join(SSH_KEYS_DIR, `${getMountPathHash(mountPath)}.key`);
const getKnownHostsPath = (mountPath: string) => path.join(SSH_KEYS_DIR, `${getMountPathHash(mountPath)}.known_hosts`);
const runSshfs = async (args: string[], password?: string) =>
new Promise<void>((resolve, reject) => {
const child = spawn("sshfs", args, { stdio: ["pipe", "pipe", "pipe"] });
let stdout = "";
let stderr = "";
child.stdout.setEncoding("utf8");
child.stderr.setEncoding("utf8");
child.stdout.on("data", (data) => {
stdout += data;
});
child.stderr.on("data", (data) => {
stderr += data;
});
child.on("error", (error) => {
reject(new Error(`Failed to start sshfs: ${error.message}`));
});
child.on("close", (code) => {
if (code === 0) {
resolve();
return;
}
const errorMsg = stderr.trim() || stdout.trim() || "Unknown error";
reject(new Error(`Failed to mount SFTP volume: ${errorMsg}`));
});
if (password) child.stdin.write(password);
child.stdin.end();
});
const checkHealth = async (mountPath: string) => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) return { status: "unmounted" as const };
if (mount.fstype !== "fuse.sshfs") {
return { status: "error" as const, error: `Invalid filesystem type: ${mount.fstype} (expected fuse.sshfs)` };
}
return { status: "mounted" as const };
};
const unmount = async (mountPath: string) => {
if (os.platform() !== "linux") {
logger.error("SFTP unmounting is only supported on Linux hosts.");
return { status: "error" as const, error: "SFTP unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`);
} else {
await executeUnmount(mountPath);
}
await fs.unlink(getPrivateKeyPath(mountPath)).catch(() => {});
await fs.unlink(getKnownHostsPath(mountPath)).catch(() => {});
await fs.rmdir(mountPath).catch(() => {});
logger.info(`SFTP volume at ${mountPath} unmounted successfully.`);
return { status: "unmounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SFTP unmount");
} catch (error) {
logger.error("Error unmounting SFTP volume", { mountPath, error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
const mount = async (config: BackendConfig, mountPath: string) => {
logger.debug(`Mounting SFTP volume ${mountPath}...`);
if (config.backend !== "sftp") {
logger.error("Provided config is not for SFTP backend");
return { status: "error" as const, error: "Provided config is not for SFTP backend" };
}
if (os.platform() !== "linux") {
logger.error("SFTP mounting is only supported on Linux hosts.");
return { status: "error" as const, error: "SFTP mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(mountPath);
if (status === "mounted") return { status: "mounted" as const };
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`);
await unmount(mountPath);
}
const run = async () => {
await fs.mkdir(mountPath, { recursive: true });
await fs.mkdir(SSH_KEYS_DIR, { recursive: true });
const { uid, gid } = os.userInfo();
const options = [
"reconnect",
"ServerAliveInterval=15",
"ServerAliveCountMax=3",
"allow_other",
`uid=${uid}`,
`gid=${gid}`,
];
if (config.skipHostKeyCheck) {
options.push("StrictHostKeyChecking=no", "UserKnownHostsFile=/dev/null");
} else if (config.knownHosts) {
await writeFileWithMode(getKnownHostsPath(mountPath), config.knownHosts, FILE_MODES.ownerReadWrite);
options.push(`UserKnownHostsFile=${getKnownHostsPath(mountPath)}`, "StrictHostKeyChecking=yes");
} else {
options.push("StrictHostKeyChecking=yes");
}
if (config.readOnly) options.push("ro");
if (config.port) options.push(`port=${config.port}`);
if (config.privateKey) {
let key = config.privateKey.replace(/\r\n/g, "\n");
if (!key.endsWith("\n")) key += "\n";
await writeFileWithMode(getPrivateKeyPath(mountPath), key, FILE_MODES.ownerReadWrite);
options.push(`IdentityFile=${getPrivateKeyPath(mountPath)}`);
}
const args = [`${config.username}@${config.host}:${config.path || ""}`, mountPath, "-o", options.join(",")];
if (config.password) args.push("-o", "password_stdin");
logger.info(`Executing sshfs: sshfs ${args.join(" ")}`);
await runSshfs(args, config.password);
logger.info(`SFTP volume at ${mountPath} mounted successfully.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT * 2, "SFTP mount");
} catch (error) {
logger.error("Error mounting SFTP volume", { error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
export const makeSftpBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({
mount: () => mount(config, mountPath),
unmount: () => unmount(mountPath),
checkHealth: () => checkHealth(mountPath),
});

View File

@@ -0,0 +1,124 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import { assertMounted, executeMount, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {
const run = async () => {
await assertMounted(mountPath, (fstype) => fstype === "cifs");
logger.debug(`SMB volume at ${mountPath} is healthy and mounted.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SMB health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("SMB volume health check failed:", message);
}
return { status: "error" as const, error: message };
}
};
const unmount = async (mountPath: string) => {
if (os.platform() !== "linux") {
logger.error("SMB unmounting is only supported on Linux hosts.");
return { status: "error" as const, error: "SMB unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`);
return { status: "unmounted" as const };
}
await executeUnmount(mountPath);
await fs.rmdir(mountPath).catch(() => {});
logger.info(`SMB volume at ${mountPath} unmounted successfully.`);
return { status: "unmounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SMB unmount");
} catch (error) {
logger.error("Error unmounting SMB volume", { mountPath, error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
const mount = async (config: BackendConfig, mountPath: string) => {
logger.debug(`Mounting SMB volume ${mountPath}...`);
if (config.backend !== "smb") {
logger.error("Provided config is not for SMB backend");
return { status: "error" as const, error: "Provided config is not for SMB backend" };
}
if (os.platform() !== "linux") {
logger.error("SMB mounting is only supported on Linux hosts.");
return { status: "error" as const, error: "SMB mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(mountPath);
if (status === "mounted") return { status: "mounted" as const };
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`);
await unmount(mountPath);
}
const run = async () => {
await fs.mkdir(mountPath, { recursive: true });
const { uid, gid } = os.userInfo();
const options = [`port=${config.port}`, `uid=${uid}`, `gid=${gid}`, "iocharset=utf8"];
if (config.guest) {
options.push("username=guest", "password=");
} else {
const safePassword = (config.password ?? "").replace(/\\/g, "\\\\").replace(/,/g, "\\,");
options.push(`username=${config.username ?? "user"}`, `password=${safePassword}`);
}
if (config.domain) options.push(`domain=${config.domain}`);
if (config.vers && config.vers !== "auto") options.push(`vers=${config.vers}`);
if (config.readOnly) options.push("ro");
const source = `//${config.server}/${config.share}`;
const args = ["-t", "cifs", "-o", options.join(","), source, mountPath];
logger.debug(`Mounting SMB volume ${mountPath}...`);
logger.info(`Executing SMB mount for ${source} at ${mountPath}`);
try {
await executeMount(args);
} catch (error) {
logger.error(`SMB mount failed: ${toMessage(error)}`);
throw error;
}
logger.info(`SMB volume at ${mountPath} mounted successfully.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "SMB mount");
} catch (error) {
logger.error("Error mounting SMB volume", { error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
export const makeSmbBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({
mount: () => mount(config, mountPath),
unmount: () => unmount(mountPath),
checkHealth: () => checkHealth(mountPath),
});

View File

@@ -9,8 +9,8 @@ vi.mock("node:fs/promises", async (importOriginal) => {
};
});
vi.mock("../../../../utils/mountinfo", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../../../utils/mountinfo")>();
vi.mock("../fs", async (importOriginal) => {
const actual = await importOriginal<typeof import("../fs")>();
return {
...actual,
@@ -19,14 +19,14 @@ vi.mock("../../../../utils/mountinfo", async (importOriginal) => {
});
import * as fs from "node:fs/promises";
import * as mountinfo from "../../../../utils/mountinfo";
import { assertMounted } from "../backend-utils";
import * as volumeFs from "../fs";
import { assertMounted } from "./utils";
afterEach(() => {
vi.restoreAllMocks();
});
describe("assertMountedFilesystem", () => {
describe("assertMounted", () => {
test("throws when the path is not accessible", async () => {
vi.mocked(fs.access).mockRejectedValueOnce(new Error("missing"));
@@ -37,7 +37,7 @@ describe("assertMountedFilesystem", () => {
test("throws when the mount filesystem does not match", async () => {
vi.mocked(fs.access).mockResolvedValueOnce(undefined);
vi.mocked(mountinfo.getMountForPath).mockResolvedValueOnce({
vi.mocked(volumeFs.getMountForPath).mockResolvedValueOnce({
mountPoint: "/tmp/volume",
fstype: "cifs",
});
@@ -49,7 +49,7 @@ describe("assertMountedFilesystem", () => {
test("accepts a matching mounted filesystem", async () => {
vi.mocked(fs.access).mockResolvedValueOnce(undefined);
vi.mocked(mountinfo.getMountForPath).mockResolvedValueOnce({
vi.mocked(volumeFs.getMountForPath).mockResolvedValueOnce({
mountPoint: "/tmp/volume",
fstype: "nfs4",
});

View File

@@ -1,7 +1,6 @@
import * as fs from "node:fs/promises";
import { logger } from "@zerobyte/core/node";
import { safeExec } from "@zerobyte/core/node";
import { getMountForPath } from "../../../utils/mountinfo";
import { logger, safeExec } from "@zerobyte/core/node";
import { getMountForPath } from "../fs";
export const executeMount = async (args: string[]): Promise<void> => {
const shouldBeVerbose = process.env.LOG_LEVEL === "debug" || process.env.NODE_ENV !== "production";
@@ -10,7 +9,6 @@ export const executeMount = async (args: string[]): Promise<void> => {
logger.debug(`Executing mount ${effectiveArgs.join(" ")}`);
const result = await safeExec({ command: "mount", args: effectiveArgs, timeout: 10000 });
const stdout = result.stdout.toString().trim();
const stderr = result.stderr.toString().trim();
@@ -26,37 +24,29 @@ export const executeMount = async (args: string[]): Promise<void> => {
throw new Error(`Mount command failed with exit code ${result.exitCode}: ${stderr || stdout || "unknown error"}`);
};
export const executeUnmount = async (path: string): Promise<void> => {
let stderr: string | undefined;
logger.debug(`Executing umount -l ${path}`);
const result = await safeExec({ command: "umount", args: ["-l", path], timeout: 10000 });
stderr = result.stderr.toString();
if (stderr?.trim()) {
logger.warn(stderr.trim());
}
export const executeUnmount = async (mountPath: string): Promise<void> => {
logger.debug(`Executing umount -l ${mountPath}`);
const result = await safeExec({ command: "umount", args: ["-l", mountPath], timeout: 10000 });
const stderr = result.stderr.toString();
if (stderr.trim()) logger.warn(stderr.trim());
if (result.exitCode !== 0) {
throw new Error(`Mount command failed with exit code ${result.exitCode}: ${stderr?.trim()}`);
throw new Error(`Mount command failed with exit code ${result.exitCode}: ${stderr.trim()}`);
}
};
export const assertMounted = async (path: string, isExpectedFilesystem: (fstype: string) => boolean) => {
export const assertMounted = async (mountPath: string, isExpectedFilesystem: (fstype: string) => boolean) => {
try {
await fs.access(path);
await fs.access(mountPath);
} catch {
throw new Error("Volume is not mounted");
}
const mount = await getMountForPath(path);
if (!mount || mount.mountPoint !== path) {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
throw new Error("Volume is not mounted");
}
if (!isExpectedFilesystem(mount.fstype)) {
throw new Error(`Path ${path} is not mounted as correct fstype (found ${mount.fstype}).`);
throw new Error(`Path ${mountPath} is not mounted as correct fstype (found ${mount.fstype}).`);
}
};

View File

@@ -0,0 +1,144 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import { assertMounted, executeMount, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {
const run = async () => {
await assertMounted(mountPath, (fstype) => fstype === "fuse" || fstype === "davfs");
logger.debug(`WebDAV volume at ${mountPath} is healthy and mounted.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV health check");
} catch (error) {
const message = toMessage(error);
if (message !== "Volume is not mounted") {
logger.error("WebDAV volume health check failed:", message);
}
return { status: "error" as const, error: message };
}
};
const unmount = async (mountPath: string) => {
if (os.platform() !== "linux") {
logger.error("WebDAV unmounting is only supported on Linux hosts.");
return { status: "error" as const, error: "WebDAV unmounting is only supported on Linux hosts." };
}
const run = async () => {
const mount = await getMountForPath(mountPath);
if (!mount || mount.mountPoint !== mountPath) {
logger.debug(`Path ${mountPath} is not a mount point. Skipping unmount.`);
return { status: "unmounted" as const };
}
await executeUnmount(mountPath);
await fs.rmdir(mountPath).catch(() => {});
logger.info(`WebDAV volume at ${mountPath} unmounted successfully.`);
return { status: "unmounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV unmount");
} catch (error) {
logger.error("Error unmounting WebDAV volume", { mountPath, error: toMessage(error) });
return { status: "error" as const, error: toMessage(error) };
}
};
const mount = async (config: BackendConfig, mountPath: string) => {
logger.debug(`Mounting WebDAV volume ${mountPath}...`);
if (config.backend !== "webdav") {
logger.error("Provided config is not for WebDAV backend");
return { status: "error" as const, error: "Provided config is not for WebDAV backend" };
}
if (os.platform() !== "linux") {
logger.error("WebDAV mounting is only supported on Linux hosts.");
return { status: "error" as const, error: "WebDAV mounting is only supported on Linux hosts." };
}
const { status } = await checkHealth(mountPath);
if (status === "mounted") return { status: "mounted" as const };
if (status === "error") {
logger.debug(`Trying to unmount any existing mounts at ${mountPath} before mounting...`);
await unmount(mountPath);
}
const run = async () => {
await fs.mkdir(mountPath, { recursive: true }).catch((error) => {
logger.warn(`Failed to create directory ${mountPath}: ${toMessage(error)}`);
});
const protocol = config.ssl ? "https" : "http";
const defaultPort = config.ssl ? 443 : 80;
const source = `${protocol}://${config.server}${config.port !== defaultPort ? `:${config.port}` : ""}${config.path}`;
const { uid, gid } = os.userInfo();
const options = config.readOnly
? [`uid=${uid}`, `gid=${gid}`, "file_mode=0444", "dir_mode=0555", "ro"]
: [`uid=${uid}`, `gid=${gid}`, "file_mode=0664", "dir_mode=0775"];
if (config.username && config.password) {
const entry = [source, config.username, config.password]
.map((value) => value.replace(/[\r\n\t\s]+/g, " "))
.join(" ");
await fs.appendFile("/etc/davfs2/secrets", `${entry}\n`, { mode: 0o600 });
}
logger.debug(`Mounting WebDAV volume ${mountPath}...`);
const args = ["-t", "davfs", "-o", options.join(","), source, mountPath];
try {
await executeMount(args);
} catch (error) {
logger.warn(`Initial WebDAV mount failed, retrying with -i flag: ${toMessage(error)}`);
await executeMount(["-i", ...args]);
}
logger.info(`WebDAV volume at ${mountPath} mounted successfully.`);
return { status: "mounted" as const };
};
try {
return await withTimeout(run(), OPERATION_TIMEOUT, "WebDAV mount");
} catch (error) {
const message = toMessage(error);
if (message.includes("already mounted")) return { status: "mounted" as const };
logger.error("Error mounting WebDAV volume", { error: message });
if (message.includes("option") && message.includes("requires argument")) {
return {
status: "error" as const,
error: "Invalid mount options. Please check your WebDAV server configuration.",
};
}
if (message.includes("connection refused") || message.includes("Connection refused")) {
return {
status: "error" as const,
error: "Cannot connect to WebDAV server. Please check the server address and port.",
};
}
if (message.includes("unauthorized") || message.includes("Unauthorized")) {
return { status: "error" as const, error: "Authentication failed. Please check your username and password." };
}
return { status: "error" as const, error: message };
}
};
export const makeWebdavBackend = (config: BackendConfig, mountPath: string): VolumeBackend => ({
mount: () => mount(config, mountPath),
unmount: () => unmount(mountPath),
checkHealth: () => checkHealth(mountPath),
});

View File

@@ -0,0 +1,9 @@
import * as path from "node:path";
export const OPERATION_TIMEOUT = 5000;
export const VOLUME_MOUNT_BASE = process.env.ZEROBYTE_VOLUMES_DIR || "/var/lib/zerobyte/volumes";
export const SSH_KEYS_DIR = "/var/lib/zerobyte/ssh";
export const RCLONE_CONFIG_DIR = process.env.RCLONE_CONFIG_DIR || "/root/.config/rclone";
export const RCLONE_CONFIG_FILE = path.join(RCLONE_CONFIG_DIR, "rclone.conf");
const serverIdleTimeout = Number(process.env.SERVER_IDLE_TIMEOUT ?? 60);
export const RCLONE_TIMEOUT = (Number.isFinite(serverIdleTimeout) ? serverIdleTimeout : 60) * 1000;

View File

@@ -0,0 +1,66 @@
import * as fs from "node:fs/promises";
import { isPathWithin } from "@zerobyte/core/utils";
type MountInfo = {
mountPoint: string;
fstype: string;
};
const unescapeMount = (value: string) =>
value.replace(/\\([0-7]{3})/g, (_, oct) => String.fromCharCode(parseInt(oct, 8)));
export const readMountInfo = async (): Promise<MountInfo[]> => {
const text = await fs.readFile("/proc/self/mountinfo", "utf-8");
const result: MountInfo[] = [];
for (const line of text.split("\n")) {
if (!line) continue;
const sep = line.indexOf(" - ");
if (sep === -1) continue;
const left = line.slice(0, sep).split(" ");
const right = line.slice(sep + 3).split(" ");
const mpRaw = left[4];
const fstype = right[0];
if (!mpRaw || !fstype) continue;
result.push({ mountPoint: unescapeMount(mpRaw), fstype });
}
return result;
};
export const getMountForPath = async (targetPath: string): Promise<MountInfo | undefined> => {
const mounts = await readMountInfo();
let best: MountInfo | undefined;
for (const mount of mounts) {
if (!isPathWithin(mount.mountPoint, targetPath)) continue;
if (!best || mount.mountPoint.length > best.mountPoint.length) {
best = mount;
}
}
return best;
};
export const getStatFs = async (mountPoint: string) => {
const stat = await fs.statfs(mountPoint, { bigint: true });
const unit = stat.bsize > 0n ? stat.bsize : 1n;
const blocks = stat.blocks > 0n ? stat.blocks : 0n;
let bfree = stat.bfree > 0n ? stat.bfree : 0n;
if (bfree > blocks) bfree = blocks;
const bavail = stat.bavail > 0n ? stat.bavail : 0n;
const max = BigInt(Number.MAX_SAFE_INTEGER);
const toNumber = (value: bigint) => (value > max ? Number.MAX_SAFE_INTEGER : Number(value));
return {
total: toNumber(blocks * unit),
used: toNumber((blocks - bfree) * unit),
free: toNumber(bavail * unit),
};
};
export const isNodeJSErrnoException = (error: unknown): error is NodeJS.ErrnoException => {
return error instanceof Error && "code" in error;
};

View File

@@ -0,0 +1,31 @@
import { makeDirectoryBackend } from "./backends/directory";
import { makeNfsBackend } from "./backends/nfs";
import { makeRcloneBackend } from "./backends/rclone";
import { makeSftpBackend } from "./backends/sftp";
import { makeSmbBackend } from "./backends/smb";
import { makeWebdavBackend } from "./backends/webdav";
import { getVolumePath } from "./paths";
import type { AgentVolume, VolumeBackend } from "./types";
export { getStatFs, isNodeJSErrnoException } from "./fs";
export { getVolumePath } from "./paths";
export type { AgentVolume, BackendConfig, VolumeBackend } from "./types";
export const createVolumeBackend = (volume: AgentVolume, mountPath = getVolumePath(volume)): VolumeBackend => {
switch (volume.config.backend) {
case "directory":
return makeDirectoryBackend(volume.config, mountPath);
case "nfs":
return makeNfsBackend(volume.config, mountPath);
case "smb":
return makeSmbBackend(volume.config, mountPath);
case "webdav":
return makeWebdavBackend(volume.config, mountPath);
case "rclone":
return makeRcloneBackend(volume.config, mountPath);
case "sftp":
return makeSftpBackend(volume.config, mountPath);
}
throw new Error("Unsupported backend");
};

View File

@@ -0,0 +1,190 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import { toMessage } from "@zerobyte/core/utils";
import { createVolumeBackend, getVolumePath, isNodeJSErrnoException } from ".";
import type { AgentVolume, BackendConfig } from "./types";
import { Data, Effect } from "effect";
const DEFAULT_PAGE_SIZE = 500;
const MAX_PAGE_SIZE = 500;
export const listVolumeFiles = async (
volume: AgentVolume,
subPath?: string,
offset: number = 0,
limit: number = DEFAULT_PAGE_SIZE,
) => {
const volumePath = getVolumePath(volume);
const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath;
const pageSize = Math.min(Math.max(limit, 1), MAX_PAGE_SIZE);
const startOffset = Math.max(offset, 0);
try {
const realVolumeRoot = await fs.realpath(volumePath);
const realRequestedPath = await fs.realpath(requestedPath);
const relative = path.relative(realVolumeRoot, realRequestedPath);
if (relative.startsWith("..") || path.isAbsolute(relative)) {
throw new Error("Invalid path");
}
const dirents = await fs.readdir(realRequestedPath, { withFileTypes: true });
dirents.sort((a, b) => {
const aIsDir = a.isDirectory();
const bIsDir = b.isDirectory();
if (aIsDir === bIsDir) {
return a.name.localeCompare(b.name);
}
return aIsDir ? -1 : 1;
});
const total = dirents.length;
const paginatedDirents = dirents.slice(startOffset, startOffset + pageSize);
const entries = (
await Promise.all(
paginatedDirents.map(async (dirent) => {
const fullPath = path.join(realRequestedPath, dirent.name);
try {
const stats = await fs.stat(fullPath);
const relativePath = path.relative(realVolumeRoot, fullPath);
return {
name: dirent.name,
path: `/${relativePath}`,
type: dirent.isDirectory() ? ("directory" as const) : ("file" as const),
size: dirent.isFile() ? stats.size : undefined,
modifiedAt: stats.mtimeMs,
};
} catch {
return null;
}
}),
)
).filter((entry) => entry !== null);
return {
files: entries,
path: subPath || "/",
offset: startOffset,
limit: pageSize,
total,
hasMore: startOffset + pageSize < total,
};
} catch (error) {
if (isNodeJSErrnoException(error) && error.code === "ENOENT") {
throw new Error("Directory not found");
}
if (toMessage(error) === "Invalid path") {
throw error;
}
throw new Error(`Failed to list files: ${toMessage(error)}`);
}
};
export const browseFilesystem = async (browsePath: string) => {
const normalizedPath = path.normalize(browsePath);
const entries = await fs.readdir(normalizedPath, { withFileTypes: true });
const directories = await Promise.all(
entries
.filter((entry) => entry.isDirectory())
.map(async (entry) => {
const fullPath = path.join(normalizedPath, entry.name);
try {
const stats = await fs.stat(fullPath);
return {
name: entry.name,
path: fullPath,
type: "directory" as const,
size: undefined,
modifiedAt: stats.mtimeMs,
};
} catch {
return {
name: entry.name,
path: fullPath,
type: "directory" as const,
size: undefined,
modifiedAt: undefined,
};
}
}),
);
return {
directories: directories.sort((a, b) => a.name.localeCompare(b.name)),
path: normalizedPath,
};
};
class TempDirError extends Data.TaggedError("TempDirError")<{
cause: unknown;
}> {}
class CleanupError extends Data.TaggedError("CleanupError")<{
cause: unknown;
tempDir: string;
}> {}
class MountError extends Data.TaggedError("MountError")<{
cause: unknown;
}> {}
const createTempDir = Effect.acquireRelease(
Effect.tryPromise({
try: () => fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-test-")),
catch: (error) => new TempDirError({ cause: error }),
}),
(tempDir) =>
Effect.tryPromise({
try: () => fs.rm(tempDir, { recursive: true, force: true }),
catch: (error) => new CleanupError({ cause: error, tempDir }),
}).pipe(Effect.orDie),
);
export const testVolumeConnection = (backendConfig: BackendConfig) =>
Effect.scoped(
Effect.gen(function* () {
const tempDir = yield* createTempDir;
const mockVolume: AgentVolume = {
id: 0,
shortId: "test",
name: "test-connection",
config: backendConfig,
createdAt: Date.now(),
updatedAt: Date.now(),
lastHealthCheck: Date.now(),
type: backendConfig.backend,
status: "unmounted",
lastError: null,
provisioningId: null,
autoRemount: true,
agentId: "local",
organizationId: "test-org",
};
const backend = createVolumeBackend(mockVolume, tempDir);
const mountResult = yield* Effect.tryPromise({
try: () => backend.mount(),
catch: (error) => new MountError({ cause: error }),
});
yield* Effect.tryPromise({
try: () => backend.unmount(),
catch: () => undefined,
});
return {
success: !mountResult.error,
message: mountResult.error ? toMessage(mountResult.error) : "Connection successful",
};
}),
);

View File

@@ -0,0 +1,10 @@
import { VOLUME_MOUNT_BASE } from "./constants";
import type { AgentVolume } from "./types";
export const getVolumePath = (volume: AgentVolume) => {
if (volume.config.backend === "directory") {
return volume.config.path;
}
return `${VOLUME_MOUNT_BASE}/${volume.shortId}/_data`;
};

View File

@@ -0,0 +1,21 @@
class TimeoutError extends Error {
code = "ETIMEOUT";
constructor(message: string) {
super(message);
this.name = "TimeoutError";
}
}
export const withTimeout = async <T>(promise: Promise<T>, ms: number, label: string): Promise<T> => {
let timeout: ReturnType<typeof setTimeout> | undefined;
try {
return await Promise.race([
promise,
new Promise<never>((_, reject) => {
timeout = setTimeout(() => reject(new TimeoutError(`${label} timed out after ${ms}ms`)), ms);
}),
]);
} finally {
if (timeout) clearTimeout(timeout);
}
};

View File

@@ -0,0 +1,67 @@
export type BackendStatus = "mounted" | "unmounted" | "error";
type BaseConfig = { backend: string; readOnly?: boolean };
export type BackendConfig =
| (BaseConfig & { backend: "directory"; path: string })
| (BaseConfig & { backend: "nfs"; server: string; exportPath: string; port: number; version: "3" | "4" | "4.1" })
| (BaseConfig & {
backend: "smb";
server: string;
share: string;
username?: string;
password?: string;
guest?: boolean;
vers?: "1.0" | "2.0" | "2.1" | "3.0" | "auto";
domain?: string;
port: number;
})
| (BaseConfig & {
backend: "webdav";
server: string;
path: string;
username?: string;
password?: string;
port: number;
ssl?: boolean;
})
| (BaseConfig & { backend: "rclone"; remote: string; path: string })
| (BaseConfig & {
backend: "sftp";
host: string;
port: number;
username: string;
password?: string;
privateKey?: string;
path: string;
skipHostKeyCheck?: boolean;
knownHosts?: string;
});
export type AgentVolume = {
id: number;
shortId: string;
name: string;
config: BackendConfig;
createdAt: number;
updatedAt: number;
lastHealthCheck: number;
type: string;
status: BackendStatus;
lastError: string | null;
provisioningId?: string | null;
autoRemount: boolean;
agentId: string;
organizationId: string;
};
export type OperationResult = {
status: BackendStatus;
error?: string;
};
export type VolumeBackend = {
mount: () => Promise<OperationResult>;
unmount: () => Promise<OperationResult>;
checkHealth: () => Promise<OperationResult>;
};

View File

@@ -51,6 +51,111 @@ const backupCancelSchema = z.object({
payload: z.object({ jobId: z.string(), scheduleId: z.string() }),
});
const backendStatusSchema = z.enum(["mounted", "unmounted", "error"]);
const volumeSchema = z.object({
id: z.number(),
shortId: z.string(),
name: z.string(),
path: z.string().nullable().optional(),
config: z.record(z.string(), z.unknown()).and(z.object({ backend: z.string() })),
createdAt: z.number(),
updatedAt: z.number(),
lastHealthCheck: z.number(),
type: z.string(),
status: backendStatusSchema,
lastError: z.string().nullable(),
provisioningId: z.string().nullable().optional(),
autoRemount: z.boolean(),
agentId: z.string(),
organizationId: z.string(),
});
const volumeOperationResultSchema = z.object({
status: backendStatusSchema,
error: z.string().optional(),
});
const statfsSchema = z.object({
total: z.number().optional(),
used: z.number().optional(),
free: z.number().optional(),
});
const fileEntrySchema = z.object({
name: z.string(),
path: z.string(),
type: z.enum(["directory", "file"]),
size: z.number().optional(),
modifiedAt: z.number().optional(),
});
const directoryEntrySchema = z.object({
name: z.string(),
path: z.string(),
type: z.literal("directory"),
size: z.undefined().optional(),
modifiedAt: z.number().optional(),
});
const volumeCommandSchema = z.discriminatedUnion("name", [
z.object({ name: z.literal("volume.mount"), volume: volumeSchema }),
z.object({ name: z.literal("volume.unmount"), volume: volumeSchema }),
z.object({ name: z.literal("volume.checkHealth"), volume: volumeSchema }),
z.object({ name: z.literal("volume.statfs"), volume: volumeSchema }),
z.object({
name: z.literal("volume.listFiles"),
volume: volumeSchema,
subPath: z.string().optional(),
offset: z.number(),
limit: z.number(),
}),
z.object({ name: z.literal("volume.testConnection"), backendConfig: z.record(z.string(), z.unknown()) }),
z.object({ name: z.literal("filesystem.browse"), path: z.string() }),
]);
const volumeCommandRequestSchema = z.object({
type: z.literal("volume.command"),
payload: z.object({
commandId: z.string(),
command: volumeCommandSchema,
}),
});
const volumeCommandResultSchema = z.discriminatedUnion("name", [
z.object({ name: z.literal("volume.mount"), result: volumeOperationResultSchema }),
z.object({ name: z.literal("volume.unmount"), result: volumeOperationResultSchema }),
z.object({ name: z.literal("volume.checkHealth"), result: volumeOperationResultSchema }),
z.object({ name: z.literal("volume.statfs"), result: statfsSchema }),
z.object({
name: z.literal("volume.listFiles"),
result: z.object({
files: z.array(fileEntrySchema),
path: z.string(),
offset: z.number(),
limit: z.number(),
total: z.number(),
hasMore: z.boolean(),
}),
}),
z.object({
name: z.literal("volume.testConnection"),
result: z.object({ success: z.boolean(), message: z.string() }),
}),
z.object({
name: z.literal("filesystem.browse"),
result: z.object({ directories: z.array(directoryEntrySchema), path: z.string() }),
}),
]);
const volumeCommandResponseSchema = z.object({
type: z.literal("volume.commandResult"),
payload: z.discriminatedUnion("status", [
z.object({ commandId: z.string(), status: z.literal("success"), command: volumeCommandResultSchema }),
z.object({ commandId: z.string(), status: z.literal("error"), error: z.string() }),
]),
});
const heartbeatPingSchema = z.object({
type: z.literal("heartbeat.ping"),
payload: z.object({ sentAt: z.number() }),
@@ -113,6 +218,7 @@ const heartbeatPongSchema = z.object({
const controllerMessageSchema = z.discriminatedUnion("type", [
backupRunSchema,
backupCancelSchema,
volumeCommandRequestSchema,
heartbeatPingSchema,
]);
const agentMessageSchema = z.discriminatedUnion("type", [
@@ -122,6 +228,7 @@ const agentMessageSchema = z.discriminatedUnion("type", [
backupCompletedSchema,
backupFailedSchema,
backupCancelledSchema,
volumeCommandResponseSchema,
heartbeatPongSchema,
]);
@@ -132,6 +239,10 @@ export type BackupProgressPayload = z.infer<typeof backupProgressSchema>["payloa
export type BackupCompletedPayload = z.infer<typeof backupCompletedSchema>["payload"];
export type BackupFailedPayload = z.infer<typeof backupFailedSchema>["payload"];
export type BackupCancelledPayload = z.infer<typeof backupCancelledSchema>["payload"];
export type VolumeCommandPayload = z.infer<typeof volumeCommandRequestSchema>["payload"];
export type VolumeCommand = z.infer<typeof volumeCommandSchema>;
export type VolumeCommandResult = z.infer<typeof volumeCommandResultSchema>;
export type VolumeCommandResponsePayload = z.infer<typeof volumeCommandResponseSchema>["payload"];
export type ControllerMessage = z.infer<typeof controllerMessageSchema>;
export type AgentMessage = z.infer<typeof agentMessageSchema>;