mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-05-19 14:08:24 -04:00
refactor(server): route volume operations through agents (#862)
* refactor(server): route volume operations through agents * chore: pr feedbacks
This commit is contained in:
@@ -1,37 +0,0 @@
|
||||
import { makeDirectoryBackend } from "../../../../apps/agent/src/volume-host/backends/directory";
|
||||
import { makeNfsBackend } from "../../../../apps/agent/src/volume-host/backends/nfs";
|
||||
import { makeRcloneBackend } from "../../../../apps/agent/src/volume-host/backends/rclone";
|
||||
import { makeSftpBackend } from "../../../../apps/agent/src/volume-host/backends/sftp";
|
||||
import { makeSmbBackend } from "../../../../apps/agent/src/volume-host/backends/smb";
|
||||
import { makeWebdavBackend } from "../../../../apps/agent/src/volume-host/backends/webdav";
|
||||
import type { VolumeBackend } from "../../../../apps/agent/src/volume-host/types";
|
||||
import type { Volume } from "../../db/schema";
|
||||
import { getVolumePath } from "../volumes/helpers";
|
||||
|
||||
export type { VolumeBackend };
|
||||
|
||||
export const createVolumeBackend = (volume: Volume, mountPath = getVolumePath(volume)): VolumeBackend => {
|
||||
switch (volume.config.backend) {
|
||||
case "nfs": {
|
||||
return makeNfsBackend(volume.config, mountPath);
|
||||
}
|
||||
case "smb": {
|
||||
return makeSmbBackend(volume.config, mountPath);
|
||||
}
|
||||
case "directory": {
|
||||
return makeDirectoryBackend(volume.config, mountPath);
|
||||
}
|
||||
case "webdav": {
|
||||
return makeWebdavBackend(volume.config, mountPath);
|
||||
}
|
||||
case "rclone": {
|
||||
return makeRcloneBackend(volume.config, mountPath);
|
||||
}
|
||||
case "sftp": {
|
||||
return makeSftpBackend(volume.config, mountPath);
|
||||
}
|
||||
default: {
|
||||
throw new Error("Unsupported backend");
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -1,7 +1,7 @@
|
||||
import { afterEach, describe, expect, test, vi } from "vitest";
|
||||
import { Scheduler } from "../../../core/scheduler";
|
||||
import * as backendModule from "../../backends/backend";
|
||||
import type { VolumeBackend } from "../../backends/backend";
|
||||
import * as volumeHostModule from "../../../../../apps/agent/src/volume-host";
|
||||
import type { VolumeBackend } from "../../../../../apps/agent/src/volume-host";
|
||||
import * as bootstrapModule from "../bootstrap";
|
||||
import { createTestVolume } from "~/test/helpers/volume";
|
||||
|
||||
@@ -40,7 +40,7 @@ describe("shutdown", () => {
|
||||
|
||||
vi.spyOn(Scheduler, "stop").mockImplementation(stopScheduler);
|
||||
vi.spyOn(bootstrapModule, "stopApplicationRuntime").mockImplementation(stopApplicationRuntime);
|
||||
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(
|
||||
vi.spyOn(volumeHostModule, "createVolumeBackend").mockImplementation(
|
||||
() =>
|
||||
({
|
||||
mount: async () => ({ status: "mounted" as const }),
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { Scheduler } from "../../core/scheduler";
|
||||
import { db } from "../../db/db";
|
||||
import { logger } from "@zerobyte/core/node";
|
||||
import { createVolumeBackend } from "../backends/backend";
|
||||
import { stopApplicationRuntime } from "./bootstrap";
|
||||
import { decryptVolumeConfig } from "../volumes/volume-config-secrets";
|
||||
import { createVolumeBackend } from "../../../../apps/agent/src/volume-host";
|
||||
|
||||
export const shutdown = async () => {
|
||||
await Scheduler.stop();
|
||||
@@ -13,7 +14,11 @@ export const shutdown = async () => {
|
||||
});
|
||||
|
||||
for (const volume of volumes) {
|
||||
const backend = createVolumeBackend(volume);
|
||||
const backend = createVolumeBackend({
|
||||
...volume,
|
||||
config: await decryptVolumeConfig(volume.config),
|
||||
provisioningId: volume.provisioningId ?? null,
|
||||
});
|
||||
const { status, error } = await backend.unmount();
|
||||
|
||||
logger.info(`Volume ${volume.name} unmount status: ${status}${error ? `, error: ${error}` : ""}`);
|
||||
|
||||
@@ -1,18 +1,26 @@
|
||||
import { beforeAll, describe, expect, test } from "vitest";
|
||||
import { afterAll, beforeAll, describe, expect, test } from "vitest";
|
||||
import { db } from "~/server/db/db";
|
||||
import { volumesTable } from "~/server/db/schema";
|
||||
import { createApp } from "~/server/app";
|
||||
import { createTestSession, getAuthHeaders } from "~/test/helpers/auth";
|
||||
import { generateShortId } from "~/server/utils/id";
|
||||
import { config } from "~/server/core/config";
|
||||
|
||||
const app = createApp();
|
||||
|
||||
let session: Awaited<ReturnType<typeof createTestSession>>;
|
||||
let previousEnableLocalAgent: boolean;
|
||||
|
||||
beforeAll(async () => {
|
||||
previousEnableLocalAgent = config.flags.enableLocalAgent;
|
||||
config.flags.enableLocalAgent = false;
|
||||
session = await createTestSession();
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
config.flags.enableLocalAgent = previousEnableLocalAgent;
|
||||
});
|
||||
|
||||
const createManagedVolumeRecord = async (organizationId: string) => {
|
||||
const [volume] = await db
|
||||
.insert(volumesTable)
|
||||
|
||||
@@ -1,19 +1,26 @@
|
||||
import { afterEach, describe, expect, test, vi } from "vitest";
|
||||
const agentManagerMock = vi.hoisted(() => ({
|
||||
runVolumeCommand: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../../agents/agents-manager", () => ({
|
||||
agentManager: agentManagerMock,
|
||||
}));
|
||||
|
||||
import { volumeService } from "../volume.service";
|
||||
import { db } from "~/server/db/db";
|
||||
import { volumesTable } from "~/server/db/schema";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import * as fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { createTestSession } from "~/test/helpers/auth";
|
||||
import { withContext } from "~/server/core/request-context";
|
||||
import { asShortId } from "~/server/utils/branded";
|
||||
import { createTestVolume } from "~/test/helpers/volume";
|
||||
import * as backendModule from "../../backends/backend";
|
||||
import { config } from "~/server/core/config";
|
||||
|
||||
afterEach(() => {
|
||||
config.flags.enableLocalAgent = false;
|
||||
vi.restoreAllMocks();
|
||||
agentManagerMock.runVolumeCommand.mockReset();
|
||||
});
|
||||
|
||||
describe("volumeService.getVolume", () => {
|
||||
@@ -98,13 +105,7 @@ describe("volumeService.getVolume", () => {
|
||||
describe("volumeService.listFiles security", () => {
|
||||
test("should reject traversal outside the volume root in listFiles", async () => {
|
||||
const { organizationId, user } = await createTestSession();
|
||||
const tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-vol-svc-"));
|
||||
const volumePath = path.join(tempRoot, "vol");
|
||||
const secretPath = path.join(tempRoot, "volume-secret");
|
||||
|
||||
await fs.mkdir(volumePath, { recursive: true });
|
||||
await fs.mkdir(secretPath, { recursive: true });
|
||||
await fs.writeFile(path.join(secretPath, "secret.txt"), "top secret", "utf-8");
|
||||
agentManagerMock.runVolumeCommand.mockRejectedValue(new Error("Invalid path"));
|
||||
|
||||
const [volume] = await db
|
||||
.insert(volumesTable)
|
||||
@@ -113,44 +114,40 @@ describe("volumeService.listFiles security", () => {
|
||||
name: `test-vol-${randomUUID().slice(0, 8)}`,
|
||||
type: "directory",
|
||||
status: "mounted",
|
||||
config: { backend: "directory", path: volumePath },
|
||||
config: { backend: "directory", path: "/tmp/volume" },
|
||||
autoRemount: true,
|
||||
organizationId,
|
||||
})
|
||||
.returning();
|
||||
|
||||
try {
|
||||
await withContext({ organizationId, userId: user.id }, async () => {
|
||||
const traversalPath = `../${path.basename(secretPath)}`;
|
||||
|
||||
await expect(volumeService.listFiles(volume.shortId, traversalPath)).rejects.toThrow("Invalid path");
|
||||
});
|
||||
} finally {
|
||||
await fs.rm(tempRoot, { recursive: true, force: true });
|
||||
}
|
||||
await withContext({ organizationId, userId: user.id }, async () => {
|
||||
await expect(volumeService.listFiles(volume.shortId, "../volume-secret")).rejects.toThrow("Invalid path");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("volumeService.mountVolume", () => {
|
||||
test("unmounts any existing mount before mounting", async () => {
|
||||
test("routes unmount and mount to the owning agent before updating state", async () => {
|
||||
const { organizationId, user } = await createTestSession();
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted" });
|
||||
const unmount = vi.fn().mockResolvedValue({ status: "unmounted" });
|
||||
const mount = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
|
||||
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({
|
||||
mount,
|
||||
unmount,
|
||||
checkHealth: vi.fn().mockResolvedValue({ status: "mounted" }),
|
||||
}));
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted", agentId: "agent-1" });
|
||||
agentManagerMock.runVolumeCommand
|
||||
.mockResolvedValueOnce({ name: "volume.unmount", result: { status: "unmounted" } })
|
||||
.mockResolvedValueOnce({ name: "volume.mount", result: { status: "mounted" } });
|
||||
|
||||
await withContext({ organizationId, userId: user.id }, async () => {
|
||||
const result = await volumeService.mountVolume(volume.shortId);
|
||||
|
||||
expect(result.status).toBe("mounted");
|
||||
expect(unmount).toHaveBeenCalledOnce();
|
||||
expect(mount).toHaveBeenCalledOnce();
|
||||
expect(unmount.mock.invocationCallOrder[0]).toBeLessThan(mount.mock.invocationCallOrder[0]);
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
volume.agentId,
|
||||
expect.objectContaining({ name: "volume.unmount", volume: expect.objectContaining({ id: volume.id }) }),
|
||||
);
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
volume.agentId,
|
||||
expect.objectContaining({ name: "volume.mount", volume: expect.objectContaining({ id: volume.id }) }),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -158,15 +155,8 @@ describe("volumeService.mountVolume", () => {
|
||||
describe("volumeService.ensureHealthyVolume", () => {
|
||||
test("returns ready when the mounted volume passes its health check", async () => {
|
||||
const { organizationId, user } = await createTestSession();
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted" });
|
||||
const mount = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
const checkHealth = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
|
||||
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({
|
||||
mount,
|
||||
unmount: vi.fn().mockResolvedValue({ status: "unmounted" }),
|
||||
checkHealth,
|
||||
}));
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted", agentId: "agent-1" });
|
||||
agentManagerMock.runVolumeCommand.mockResolvedValue({ name: "volume.checkHealth", result: { status: "mounted" } });
|
||||
|
||||
await withContext({ organizationId, userId: user.id }, async () => {
|
||||
const result = await volumeService.ensureHealthyVolume(volume.shortId);
|
||||
@@ -176,22 +166,21 @@ describe("volumeService.ensureHealthyVolume", () => {
|
||||
volume: expect.objectContaining({ id: volume.id, status: "mounted", lastError: null }),
|
||||
remounted: false,
|
||||
});
|
||||
expect(checkHealth).toHaveBeenCalledOnce();
|
||||
expect(mount).not.toHaveBeenCalled();
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledOnce();
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledWith(
|
||||
volume.agentId,
|
||||
expect.objectContaining({ name: "volume.checkHealth", volume: expect.objectContaining({ id: volume.id }) }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
test("auto-remounts when the mounted volume fails its health check", async () => {
|
||||
const { organizationId, user } = await createTestSession();
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted", autoRemount: true });
|
||||
const mount = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
const checkHealth = vi.fn().mockResolvedValue({ status: "error", error: "stale mount" });
|
||||
|
||||
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({
|
||||
mount,
|
||||
unmount: vi.fn().mockResolvedValue({ status: "unmounted" }),
|
||||
checkHealth,
|
||||
}));
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted", autoRemount: true, agentId: "agent-1" });
|
||||
agentManagerMock.runVolumeCommand
|
||||
.mockResolvedValueOnce({ name: "volume.checkHealth", result: { status: "error", error: "stale mount" } })
|
||||
.mockResolvedValueOnce({ name: "volume.unmount", result: { status: "unmounted" } })
|
||||
.mockResolvedValueOnce({ name: "volume.mount", result: { status: "mounted" } });
|
||||
|
||||
await withContext({ organizationId, userId: user.id }, async () => {
|
||||
const result = await volumeService.ensureHealthyVolume(volume.shortId);
|
||||
@@ -201,8 +190,7 @@ describe("volumeService.ensureHealthyVolume", () => {
|
||||
volume: expect.objectContaining({ id: volume.id, status: "mounted", lastError: null }),
|
||||
remounted: true,
|
||||
});
|
||||
expect(checkHealth).toHaveBeenCalledOnce();
|
||||
expect(mount).toHaveBeenCalledOnce();
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledTimes(3);
|
||||
|
||||
const updatedVolume = await db.query.volumesTable.findFirst({ where: { id: volume.id } });
|
||||
expect(updatedVolume?.status).toBe("mounted");
|
||||
@@ -212,15 +200,16 @@ describe("volumeService.ensureHealthyVolume", () => {
|
||||
|
||||
test("returns not ready when the health check fails and auto-remount is disabled", async () => {
|
||||
const { organizationId, user } = await createTestSession();
|
||||
const volume = await createTestVolume({ organizationId, status: "mounted", autoRemount: false });
|
||||
const mount = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
const checkHealth = vi.fn().mockResolvedValue({ status: "error", error: "stale mount" });
|
||||
|
||||
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({
|
||||
mount,
|
||||
unmount: vi.fn().mockResolvedValue({ status: "unmounted" }),
|
||||
checkHealth,
|
||||
}));
|
||||
const volume = await createTestVolume({
|
||||
organizationId,
|
||||
status: "mounted",
|
||||
autoRemount: false,
|
||||
agentId: "agent-1",
|
||||
});
|
||||
agentManagerMock.runVolumeCommand.mockResolvedValue({
|
||||
name: "volume.checkHealth",
|
||||
result: { status: "error", error: "stale mount" },
|
||||
});
|
||||
|
||||
await withContext({ organizationId, userId: user.id }, async () => {
|
||||
const result = await volumeService.ensureHealthyVolume(volume.shortId);
|
||||
@@ -230,54 +219,17 @@ describe("volumeService.ensureHealthyVolume", () => {
|
||||
volume: expect.objectContaining({ id: volume.id, status: "error", lastError: "stale mount" }),
|
||||
reason: "stale mount",
|
||||
});
|
||||
expect(checkHealth).toHaveBeenCalledOnce();
|
||||
expect(mount).not.toHaveBeenCalled();
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("volumeService.testConnection", () => {
|
||||
test("uses an isolated temp mount path for backend test connections", async () => {
|
||||
const mount = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
const unmount = vi.fn().mockResolvedValue({ status: "unmounted" });
|
||||
const createVolumeBackendSpy = vi.spyOn(backendModule, "createVolumeBackend").mockReturnValue({
|
||||
mount,
|
||||
unmount,
|
||||
checkHealth: vi.fn(),
|
||||
});
|
||||
|
||||
await volumeService.testConnection({
|
||||
backend: "nfs",
|
||||
server: "127.0.0.1",
|
||||
exportPath: "/exports/test",
|
||||
version: "4",
|
||||
port: 2049,
|
||||
readOnly: false,
|
||||
});
|
||||
|
||||
expect(createVolumeBackendSpy).toHaveBeenCalledOnce();
|
||||
const [, mountPath] = createVolumeBackendSpy.mock.calls[0];
|
||||
expect(mountPath).toEqual(expect.stringContaining(`${path.sep}zerobyte-test-`));
|
||||
await expect(fs.access(mountPath as string)).rejects.toThrow();
|
||||
expect(mount).toHaveBeenCalledOnce();
|
||||
expect(unmount).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
test("does not fail when backend unmount already removed the temp mount path", async () => {
|
||||
const mount = vi.fn().mockResolvedValue({ status: "mounted" });
|
||||
let mountPath: string | undefined;
|
||||
const unmount = vi.fn().mockImplementation(async () => {
|
||||
await fs.rm(mountPath!, { recursive: true, force: true });
|
||||
return { status: "unmounted" };
|
||||
});
|
||||
|
||||
vi.spyOn(backendModule, "createVolumeBackend").mockImplementation((_volume, tempPath) => {
|
||||
mountPath = tempPath;
|
||||
return {
|
||||
mount,
|
||||
unmount,
|
||||
checkHealth: vi.fn(),
|
||||
};
|
||||
test("routes test connections to the local agent", async () => {
|
||||
config.flags.enableLocalAgent = true;
|
||||
agentManagerMock.runVolumeCommand.mockResolvedValue({
|
||||
name: "volume.testConnection",
|
||||
result: { success: true, message: "Connection successful" },
|
||||
});
|
||||
|
||||
await expect(
|
||||
@@ -294,9 +246,9 @@ describe("volumeService.testConnection", () => {
|
||||
message: "Connection successful",
|
||||
});
|
||||
|
||||
expect(mountPath).toEqual(expect.stringContaining(`${path.sep}zerobyte-test-`));
|
||||
await expect(fs.access(mountPath as string)).rejects.toThrow();
|
||||
expect(mount).toHaveBeenCalledOnce();
|
||||
expect(unmount).toHaveBeenCalledOnce();
|
||||
expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledWith(
|
||||
"local",
|
||||
expect.objectContaining({ name: "volume.testConnection" }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,38 +1,34 @@
|
||||
import * as fs from "node:fs/promises";
|
||||
import * as os from "node:os";
|
||||
import * as path from "node:path";
|
||||
import { and, eq } from "drizzle-orm";
|
||||
import { BadRequestError, InternalServerError, NotFoundError } from "http-errors-enhanced";
|
||||
import { db } from "../../db/db";
|
||||
import { volumesTable } from "../../db/schema";
|
||||
import { toMessage } from "../../utils/errors";
|
||||
import { generateShortId } from "../../utils/id";
|
||||
import { getStatFs, type StatFs } from "../../utils/mountinfo";
|
||||
import type { StatFs } from "../../utils/mountinfo";
|
||||
import { withTimeout } from "../../utils/timeout";
|
||||
import { createVolumeBackend } from "../backends/backend";
|
||||
import { config } from "../../core/config";
|
||||
import { LOCAL_AGENT_ID } from "../agents/constants";
|
||||
import { agentManager } from "../agents/agents-manager";
|
||||
import type { UpdateVolumeBody } from "./volume.dto";
|
||||
import { getVolumePath } from "./helpers";
|
||||
import { logger } from "@zerobyte/core/node";
|
||||
import { serverEvents } from "../../core/events";
|
||||
import type { Volume } from "../../db/schema";
|
||||
import { volumeConfigSchema, type BackendConfig } from "~/schemas/volumes";
|
||||
import { getOrganizationId } from "~/server/core/request-context";
|
||||
import { isNodeJSErrnoException } from "~/server/utils/fs";
|
||||
import { asShortId, type ShortId } from "~/server/utils/branded";
|
||||
import { type ShortId } from "~/server/utils/branded";
|
||||
import { decryptVolumeConfig, encryptVolumeConfig } from "./volume-config-secrets";
|
||||
import type { VolumeCommand, VolumeCommandResult } from "@zerobyte/contracts/agent-protocol";
|
||||
import { createVolumeBackend, getStatFs, getVolumePath } from "../../../../apps/agent/src/volume-host";
|
||||
import {
|
||||
browseFilesystem as browseHostFilesystem,
|
||||
listVolumeFiles,
|
||||
testVolumeConnection,
|
||||
} from "../../../../apps/agent/src/volume-host/operations";
|
||||
import { Effect } from "effect";
|
||||
|
||||
type EnsureHealthyVolumeResult =
|
||||
| {
|
||||
ready: true;
|
||||
volume: Volume;
|
||||
remounted: boolean;
|
||||
}
|
||||
| {
|
||||
ready: false;
|
||||
volume: Volume;
|
||||
reason: string;
|
||||
};
|
||||
| { ready: true; volume: Volume; remounted: boolean }
|
||||
| { ready: false; volume: Volume; reason: string };
|
||||
|
||||
const listVolumes = async () => {
|
||||
const organizationId = getOrganizationId();
|
||||
@@ -53,6 +49,53 @@ const findVolume = async (shortId: ShortId) => {
|
||||
});
|
||||
};
|
||||
|
||||
const runVolumeCommand = async <TCommand extends VolumeCommand>(agentId: string, command: TCommand) => {
|
||||
const result = await agentManager.runVolumeCommand(agentId, command);
|
||||
if (result.name !== command.name) {
|
||||
throw new InternalServerError(`Unexpected agent response for ${command.name}`);
|
||||
}
|
||||
|
||||
return result as Extract<VolumeCommandResult, { name: TCommand["name"] }>;
|
||||
};
|
||||
|
||||
const volumeForAgent = async (volume: Volume): Promise<Volume> => ({
|
||||
...volume,
|
||||
config: await decryptVolumeConfig(volume.config),
|
||||
});
|
||||
|
||||
const volumeForHost = async (volume: Volume): Promise<Volume> => ({
|
||||
...volume,
|
||||
shortId: volume.shortId,
|
||||
config: await decryptVolumeConfig(volume.config),
|
||||
provisioningId: volume.provisioningId ?? null,
|
||||
});
|
||||
|
||||
// TODO(agent-rollout): Remove the local host execution branch once all installs run volume operations through agents.
|
||||
const shouldRunViaAgent = (volume: Volume) => volume.agentId !== LOCAL_AGENT_ID || config.flags.enableLocalAgent;
|
||||
|
||||
const runVolumeBackendCommand = async (
|
||||
volume: Volume,
|
||||
name: "volume.mount" | "volume.unmount" | "volume.checkHealth",
|
||||
) => {
|
||||
if (!shouldRunViaAgent(volume)) {
|
||||
const backend = createVolumeBackend(await volumeForHost(volume));
|
||||
switch (name) {
|
||||
case "volume.mount":
|
||||
return backend.mount();
|
||||
case "volume.unmount":
|
||||
return backend.unmount();
|
||||
case "volume.checkHealth":
|
||||
return backend.checkHealth();
|
||||
}
|
||||
}
|
||||
|
||||
const command = await runVolumeCommand(volume.agentId, {
|
||||
name,
|
||||
volume: await volumeForAgent(volume),
|
||||
});
|
||||
return command.result;
|
||||
};
|
||||
|
||||
const createVolume = async (name: string, backendConfig: BackendConfig) => {
|
||||
const organizationId = getOrganizationId();
|
||||
const trimmedName = name.trim();
|
||||
@@ -80,8 +123,7 @@ const createVolume = async (name: string, backendConfig: BackendConfig) => {
|
||||
throw new InternalServerError("Failed to create volume");
|
||||
}
|
||||
|
||||
const backend = createVolumeBackend({ ...created, config: await decryptVolumeConfig(created.config) });
|
||||
const { error, status } = await backend.mount();
|
||||
const { error, status } = await runVolumeBackendCommand(created, "volume.mount");
|
||||
|
||||
await db
|
||||
.update(volumesTable)
|
||||
@@ -99,8 +141,7 @@ const deleteVolume = async (shortId: ShortId) => {
|
||||
throw new NotFoundError("Volume not found");
|
||||
}
|
||||
|
||||
const backend = createVolumeBackend(volume);
|
||||
await backend.unmount();
|
||||
await runVolumeBackendCommand(volume, "volume.unmount");
|
||||
await db
|
||||
.delete(volumesTable)
|
||||
.where(and(eq(volumesTable.id, volume.id), eq(volumesTable.organizationId, organizationId)));
|
||||
@@ -114,9 +155,8 @@ const mountVolume = async (shortId: ShortId) => {
|
||||
throw new NotFoundError("Volume not found");
|
||||
}
|
||||
|
||||
const backend = createVolumeBackend({ ...volume, config: await decryptVolumeConfig(volume.config) });
|
||||
await backend.unmount();
|
||||
const { error, status } = await backend.mount();
|
||||
await runVolumeBackendCommand(volume, "volume.unmount");
|
||||
const { error, status } = await runVolumeBackendCommand(volume, "volume.mount");
|
||||
|
||||
await db
|
||||
.update(volumesTable)
|
||||
@@ -138,8 +178,7 @@ const unmountVolume = async (shortId: ShortId) => {
|
||||
throw new NotFoundError("Volume not found");
|
||||
}
|
||||
|
||||
const backend = createVolumeBackend(volume);
|
||||
const { status, error } = await backend.unmount();
|
||||
const { status, error } = await runVolumeBackendCommand(volume, "volume.unmount");
|
||||
|
||||
await db
|
||||
.update(volumesTable)
|
||||
@@ -162,7 +201,16 @@ const getVolume = async (shortId: ShortId) => {
|
||||
|
||||
let statfs: Partial<StatFs> = {};
|
||||
if (volume.status === "mounted") {
|
||||
statfs = await withTimeout(getStatFs(getVolumePath(volume)), 1000, "getStatFs").catch((error) => {
|
||||
statfs = await withTimeout(
|
||||
shouldRunViaAgent(volume)
|
||||
? runVolumeCommand(volume.agentId, {
|
||||
name: "volume.statfs",
|
||||
volume: await volumeForAgent(volume),
|
||||
}).then((command) => command.result)
|
||||
: volumeForHost(volume).then((hostVolume) => getStatFs(getVolumePath(hostVolume))),
|
||||
1000,
|
||||
"volume.statfs",
|
||||
).catch((error) => {
|
||||
logger.warn(`Failed to get statfs for volume ${volume.name}: ${toMessage(error)}`);
|
||||
return {};
|
||||
});
|
||||
@@ -190,8 +238,7 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => {
|
||||
|
||||
if (configChanged) {
|
||||
logger.debug("Unmounting existing volume before applying new config");
|
||||
const backend = createVolumeBackend(existing);
|
||||
await backend.unmount();
|
||||
await runVolumeBackendCommand(existing, "volume.unmount");
|
||||
}
|
||||
|
||||
const newConfigResult = volumeConfigSchema.safeParse(volumeData.config || existing.config);
|
||||
@@ -219,8 +266,7 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => {
|
||||
}
|
||||
|
||||
if (configChanged) {
|
||||
const backend = createVolumeBackend({ ...updated, config: await decryptVolumeConfig(updated.config) });
|
||||
const { error, status } = await backend.mount();
|
||||
const { error, status } = await runVolumeBackendCommand(updated, "volume.mount");
|
||||
await db
|
||||
.update(volumesTable)
|
||||
.set({ status, lastError: error ?? null, lastHealthCheck: Date.now() })
|
||||
@@ -233,38 +279,12 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => {
|
||||
};
|
||||
|
||||
const testConnection = async (backendConfig: BackendConfig) => {
|
||||
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-test-"));
|
||||
try {
|
||||
const mockVolume = {
|
||||
id: 0,
|
||||
shortId: asShortId("test"),
|
||||
name: "test-connection",
|
||||
path: tempDir,
|
||||
config: backendConfig,
|
||||
createdAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
lastHealthCheck: Date.now(),
|
||||
type: backendConfig.backend,
|
||||
status: "unmounted" as const,
|
||||
lastError: null,
|
||||
provisioningId: null,
|
||||
autoRemount: true,
|
||||
agentId: LOCAL_AGENT_ID,
|
||||
organizationId: "test-org",
|
||||
};
|
||||
|
||||
const backend = createVolumeBackend(mockVolume, tempDir);
|
||||
const { error } = await backend.mount();
|
||||
|
||||
await backend.unmount();
|
||||
|
||||
return {
|
||||
success: !error,
|
||||
message: error ? toMessage(error) : "Connection successful",
|
||||
};
|
||||
} finally {
|
||||
await fs.rm(tempDir, { recursive: true, force: true });
|
||||
if (!config.flags.enableLocalAgent) {
|
||||
return Effect.runPromise(testVolumeConnection(backendConfig));
|
||||
}
|
||||
|
||||
const command = await runVolumeCommand(LOCAL_AGENT_ID, { name: "volume.testConnection", backendConfig });
|
||||
return command.result;
|
||||
};
|
||||
|
||||
const checkHealth = async (shortId: ShortId) => {
|
||||
@@ -275,8 +295,7 @@ const checkHealth = async (shortId: ShortId) => {
|
||||
throw new NotFoundError("Volume not found");
|
||||
}
|
||||
|
||||
const backend = createVolumeBackend(volume);
|
||||
const { error, status } = await backend.checkHealth();
|
||||
const { error, status } = await runVolumeBackendCommand(volume, "volume.checkHealth");
|
||||
|
||||
if (status !== volume.status) {
|
||||
serverEvents.emit("volume:status_changed", { organizationId, volumeName: volume.name, status });
|
||||
@@ -344,7 +363,6 @@ const ensureHealthyVolume = async (shortId: ShortId): Promise<EnsureHealthyVolum
|
||||
};
|
||||
|
||||
const DEFAULT_PAGE_SIZE = 500;
|
||||
const MAX_PAGE_SIZE = 500;
|
||||
|
||||
const listFiles = async (shortId: ShortId, subPath?: string, offset: number = 0, limit: number = DEFAULT_PAGE_SIZE) => {
|
||||
const volume = await findVolume(shortId);
|
||||
@@ -357,110 +375,32 @@ const listFiles = async (shortId: ShortId, subPath?: string, offset: number = 0,
|
||||
throw new InternalServerError("Volume is not mounted");
|
||||
}
|
||||
|
||||
const volumePath = getVolumePath(volume);
|
||||
const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath;
|
||||
const normalizedPath = path.normalize(requestedPath);
|
||||
const relative = path.relative(volumePath, normalizedPath);
|
||||
|
||||
if (relative.startsWith("..") || path.isAbsolute(relative)) {
|
||||
throw new BadRequestError("Invalid path");
|
||||
}
|
||||
|
||||
const pageSize = Math.min(Math.max(limit, 1), MAX_PAGE_SIZE);
|
||||
const startOffset = Math.max(offset, 0);
|
||||
|
||||
try {
|
||||
const dirents = await fs.readdir(normalizedPath, { withFileTypes: true });
|
||||
|
||||
dirents.sort((a, b) => {
|
||||
const aIsDir = a.isDirectory();
|
||||
const bIsDir = b.isDirectory();
|
||||
|
||||
if (aIsDir === bIsDir) {
|
||||
return a.name.localeCompare(b.name);
|
||||
}
|
||||
return aIsDir ? -1 : 1;
|
||||
});
|
||||
|
||||
const total = dirents.length;
|
||||
const paginatedDirents = dirents.slice(startOffset, startOffset + pageSize);
|
||||
|
||||
const entries = (
|
||||
await Promise.all(
|
||||
paginatedDirents.map(async (dirent) => {
|
||||
const fullPath = path.join(normalizedPath, dirent.name);
|
||||
|
||||
try {
|
||||
const stats = await fs.stat(fullPath);
|
||||
const relativePath = path.relative(volumePath, fullPath);
|
||||
|
||||
return {
|
||||
name: dirent.name,
|
||||
path: `/${relativePath}`,
|
||||
type: dirent.isDirectory() ? ("directory" as const) : ("file" as const),
|
||||
size: dirent.isFile() ? stats.size : undefined,
|
||||
modifiedAt: stats.mtimeMs,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}),
|
||||
)
|
||||
).filter((e) => e !== null);
|
||||
|
||||
return {
|
||||
files: entries,
|
||||
path: subPath || "/",
|
||||
offset: startOffset,
|
||||
limit: pageSize,
|
||||
total,
|
||||
hasMore: startOffset + pageSize < total,
|
||||
};
|
||||
} catch (error) {
|
||||
if (isNodeJSErrnoException(error) && error.code === "ENOENT") {
|
||||
throw new NotFoundError("Directory not found");
|
||||
if (!shouldRunViaAgent(volume)) {
|
||||
return await listVolumeFiles(await volumeForHost(volume), subPath, offset, limit);
|
||||
}
|
||||
|
||||
const command = await runVolumeCommand(volume.agentId, {
|
||||
name: "volume.listFiles",
|
||||
volume: await volumeForAgent(volume),
|
||||
subPath,
|
||||
offset,
|
||||
limit,
|
||||
});
|
||||
return command.result;
|
||||
} catch (error) {
|
||||
throw new InternalServerError(`Failed to list files: ${toMessage(error)}`);
|
||||
}
|
||||
};
|
||||
|
||||
const browseFilesystem = async (browsePath: string) => {
|
||||
const normalizedPath = path.normalize(browsePath);
|
||||
|
||||
try {
|
||||
const entries = await fs.readdir(normalizedPath, { withFileTypes: true });
|
||||
if (!config.flags.enableLocalAgent) {
|
||||
return await browseHostFilesystem(browsePath);
|
||||
}
|
||||
|
||||
const directories = await Promise.all(
|
||||
entries
|
||||
.filter((entry) => entry.isDirectory())
|
||||
.map(async (entry) => {
|
||||
const fullPath = path.join(normalizedPath, entry.name);
|
||||
|
||||
try {
|
||||
const stats = await fs.stat(fullPath);
|
||||
return {
|
||||
name: entry.name,
|
||||
path: fullPath,
|
||||
type: "directory" as const,
|
||||
size: undefined,
|
||||
modifiedAt: stats.mtimeMs,
|
||||
};
|
||||
} catch {
|
||||
return {
|
||||
name: entry.name,
|
||||
path: fullPath,
|
||||
type: "directory" as const,
|
||||
size: undefined,
|
||||
modifiedAt: undefined,
|
||||
};
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
return {
|
||||
directories: directories.sort((a, b) => a.name.localeCompare(b.name)),
|
||||
path: normalizedPath,
|
||||
};
|
||||
const command = await runVolumeCommand(LOCAL_AGENT_ID, { name: "filesystem.browse", path: browsePath });
|
||||
return command.result;
|
||||
} catch (error) {
|
||||
throw new InternalServerError(`Failed to browse filesystem: ${toMessage(error)}`);
|
||||
}
|
||||
|
||||
@@ -17,6 +17,17 @@ export const listVolumeFiles = async (
|
||||
) => {
|
||||
const volumePath = getVolumePath(volume);
|
||||
const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath;
|
||||
const normalizedPath = path.normalize(requestedPath);
|
||||
const requestedRelativePath = path.relative(volumePath, normalizedPath);
|
||||
|
||||
if (
|
||||
requestedRelativePath === ".." ||
|
||||
requestedRelativePath.startsWith(`..${path.sep}`) ||
|
||||
path.isAbsolute(requestedRelativePath)
|
||||
) {
|
||||
throw new Error("Invalid path");
|
||||
}
|
||||
|
||||
const pageSize = Math.min(Math.max(limit, 1), MAX_PAGE_SIZE);
|
||||
const startOffset = Math.max(offset, 0);
|
||||
|
||||
@@ -25,7 +36,7 @@ export const listVolumeFiles = async (
|
||||
const realRequestedPath = await fs.realpath(requestedPath);
|
||||
const relative = path.relative(realVolumeRoot, realRequestedPath);
|
||||
|
||||
if (relative.startsWith("..") || path.isAbsolute(relative)) {
|
||||
if (relative === ".." || relative.startsWith(`..${path.sep}`) || path.isAbsolute(relative)) {
|
||||
throw new Error("Invalid path");
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user