From 2062beac686a380fadb964c649657d9dbbfe89a3 Mon Sep 17 00:00:00 2001 From: Nico <47644445+nicotsx@users.noreply.github.com> Date: Thu, 7 May 2026 20:51:25 +0200 Subject: [PATCH] refactor(server): route volume operations through agents (#862) * refactor(server): route volume operations through agents * chore: pr feedbacks --- app/server/modules/backends/backend.ts | 37 --- .../lifecycle/__tests__/shutdown.test.ts | 6 +- app/server/modules/lifecycle/shutdown.ts | 9 +- .../__tests__/volumes.controller.test.ts | 10 +- .../volumes/__tests__/volumes.service.test.ts | 176 +++++------- app/server/modules/volumes/volume.service.ts | 262 +++++++----------- apps/agent/src/volume-host/operations.ts | 13 +- 7 files changed, 196 insertions(+), 317 deletions(-) delete mode 100644 app/server/modules/backends/backend.ts diff --git a/app/server/modules/backends/backend.ts b/app/server/modules/backends/backend.ts deleted file mode 100644 index 18b536b7..00000000 --- a/app/server/modules/backends/backend.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { makeDirectoryBackend } from "../../../../apps/agent/src/volume-host/backends/directory"; -import { makeNfsBackend } from "../../../../apps/agent/src/volume-host/backends/nfs"; -import { makeRcloneBackend } from "../../../../apps/agent/src/volume-host/backends/rclone"; -import { makeSftpBackend } from "../../../../apps/agent/src/volume-host/backends/sftp"; -import { makeSmbBackend } from "../../../../apps/agent/src/volume-host/backends/smb"; -import { makeWebdavBackend } from "../../../../apps/agent/src/volume-host/backends/webdav"; -import type { VolumeBackend } from "../../../../apps/agent/src/volume-host/types"; -import type { Volume } from "../../db/schema"; -import { getVolumePath } from "../volumes/helpers"; - -export type { VolumeBackend }; - -export const createVolumeBackend = (volume: Volume, mountPath = getVolumePath(volume)): VolumeBackend => { - switch (volume.config.backend) { - case "nfs": { - return makeNfsBackend(volume.config, mountPath); - } - case "smb": { - return makeSmbBackend(volume.config, mountPath); - } - case "directory": { - return makeDirectoryBackend(volume.config, mountPath); - } - case "webdav": { - return makeWebdavBackend(volume.config, mountPath); - } - case "rclone": { - return makeRcloneBackend(volume.config, mountPath); - } - case "sftp": { - return makeSftpBackend(volume.config, mountPath); - } - default: { - throw new Error("Unsupported backend"); - } - } -}; diff --git a/app/server/modules/lifecycle/__tests__/shutdown.test.ts b/app/server/modules/lifecycle/__tests__/shutdown.test.ts index 6e8adddf..da7aa379 100644 --- a/app/server/modules/lifecycle/__tests__/shutdown.test.ts +++ b/app/server/modules/lifecycle/__tests__/shutdown.test.ts @@ -1,7 +1,7 @@ import { afterEach, describe, expect, test, vi } from "vitest"; import { Scheduler } from "../../../core/scheduler"; -import * as backendModule from "../../backends/backend"; -import type { VolumeBackend } from "../../backends/backend"; +import * as volumeHostModule from "../../../../../apps/agent/src/volume-host"; +import type { VolumeBackend } from "../../../../../apps/agent/src/volume-host"; import * as bootstrapModule from "../bootstrap"; import { createTestVolume } from "~/test/helpers/volume"; @@ -40,7 +40,7 @@ describe("shutdown", () => { vi.spyOn(Scheduler, "stop").mockImplementation(stopScheduler); vi.spyOn(bootstrapModule, "stopApplicationRuntime").mockImplementation(stopApplicationRuntime); - vi.spyOn(backendModule, "createVolumeBackend").mockImplementation( + vi.spyOn(volumeHostModule, "createVolumeBackend").mockImplementation( () => ({ mount: async () => ({ status: "mounted" as const }), diff --git a/app/server/modules/lifecycle/shutdown.ts b/app/server/modules/lifecycle/shutdown.ts index 4deff2af..66ec7bf2 100644 --- a/app/server/modules/lifecycle/shutdown.ts +++ b/app/server/modules/lifecycle/shutdown.ts @@ -1,8 +1,9 @@ import { Scheduler } from "../../core/scheduler"; import { db } from "../../db/db"; import { logger } from "@zerobyte/core/node"; -import { createVolumeBackend } from "../backends/backend"; import { stopApplicationRuntime } from "./bootstrap"; +import { decryptVolumeConfig } from "../volumes/volume-config-secrets"; +import { createVolumeBackend } from "../../../../apps/agent/src/volume-host"; export const shutdown = async () => { await Scheduler.stop(); @@ -13,7 +14,11 @@ export const shutdown = async () => { }); for (const volume of volumes) { - const backend = createVolumeBackend(volume); + const backend = createVolumeBackend({ + ...volume, + config: await decryptVolumeConfig(volume.config), + provisioningId: volume.provisioningId ?? null, + }); const { status, error } = await backend.unmount(); logger.info(`Volume ${volume.name} unmount status: ${status}${error ? `, error: ${error}` : ""}`); diff --git a/app/server/modules/volumes/__tests__/volumes.controller.test.ts b/app/server/modules/volumes/__tests__/volumes.controller.test.ts index c577001b..df0f129c 100644 --- a/app/server/modules/volumes/__tests__/volumes.controller.test.ts +++ b/app/server/modules/volumes/__tests__/volumes.controller.test.ts @@ -1,18 +1,26 @@ -import { beforeAll, describe, expect, test } from "vitest"; +import { afterAll, beforeAll, describe, expect, test } from "vitest"; import { db } from "~/server/db/db"; import { volumesTable } from "~/server/db/schema"; import { createApp } from "~/server/app"; import { createTestSession, getAuthHeaders } from "~/test/helpers/auth"; import { generateShortId } from "~/server/utils/id"; +import { config } from "~/server/core/config"; const app = createApp(); let session: Awaited>; +let previousEnableLocalAgent: boolean; beforeAll(async () => { + previousEnableLocalAgent = config.flags.enableLocalAgent; + config.flags.enableLocalAgent = false; session = await createTestSession(); }); +afterAll(() => { + config.flags.enableLocalAgent = previousEnableLocalAgent; +}); + const createManagedVolumeRecord = async (organizationId: string) => { const [volume] = await db .insert(volumesTable) diff --git a/app/server/modules/volumes/__tests__/volumes.service.test.ts b/app/server/modules/volumes/__tests__/volumes.service.test.ts index 7b487801..9372df16 100644 --- a/app/server/modules/volumes/__tests__/volumes.service.test.ts +++ b/app/server/modules/volumes/__tests__/volumes.service.test.ts @@ -1,19 +1,26 @@ import { afterEach, describe, expect, test, vi } from "vitest"; +const agentManagerMock = vi.hoisted(() => ({ + runVolumeCommand: vi.fn(), +})); + +vi.mock("../../agents/agents-manager", () => ({ + agentManager: agentManagerMock, +})); + import { volumeService } from "../volume.service"; import { db } from "~/server/db/db"; import { volumesTable } from "~/server/db/schema"; import { randomUUID } from "node:crypto"; -import * as fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; import { createTestSession } from "~/test/helpers/auth"; import { withContext } from "~/server/core/request-context"; import { asShortId } from "~/server/utils/branded"; import { createTestVolume } from "~/test/helpers/volume"; -import * as backendModule from "../../backends/backend"; +import { config } from "~/server/core/config"; afterEach(() => { + config.flags.enableLocalAgent = false; vi.restoreAllMocks(); + agentManagerMock.runVolumeCommand.mockReset(); }); describe("volumeService.getVolume", () => { @@ -98,13 +105,7 @@ describe("volumeService.getVolume", () => { describe("volumeService.listFiles security", () => { test("should reject traversal outside the volume root in listFiles", async () => { const { organizationId, user } = await createTestSession(); - const tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-vol-svc-")); - const volumePath = path.join(tempRoot, "vol"); - const secretPath = path.join(tempRoot, "volume-secret"); - - await fs.mkdir(volumePath, { recursive: true }); - await fs.mkdir(secretPath, { recursive: true }); - await fs.writeFile(path.join(secretPath, "secret.txt"), "top secret", "utf-8"); + agentManagerMock.runVolumeCommand.mockRejectedValue(new Error("Invalid path")); const [volume] = await db .insert(volumesTable) @@ -113,44 +114,40 @@ describe("volumeService.listFiles security", () => { name: `test-vol-${randomUUID().slice(0, 8)}`, type: "directory", status: "mounted", - config: { backend: "directory", path: volumePath }, + config: { backend: "directory", path: "/tmp/volume" }, autoRemount: true, organizationId, }) .returning(); - try { - await withContext({ organizationId, userId: user.id }, async () => { - const traversalPath = `../${path.basename(secretPath)}`; - - await expect(volumeService.listFiles(volume.shortId, traversalPath)).rejects.toThrow("Invalid path"); - }); - } finally { - await fs.rm(tempRoot, { recursive: true, force: true }); - } + await withContext({ organizationId, userId: user.id }, async () => { + await expect(volumeService.listFiles(volume.shortId, "../volume-secret")).rejects.toThrow("Invalid path"); + }); }); }); describe("volumeService.mountVolume", () => { - test("unmounts any existing mount before mounting", async () => { + test("routes unmount and mount to the owning agent before updating state", async () => { const { organizationId, user } = await createTestSession(); - const volume = await createTestVolume({ organizationId, status: "mounted" }); - const unmount = vi.fn().mockResolvedValue({ status: "unmounted" }); - const mount = vi.fn().mockResolvedValue({ status: "mounted" }); - - vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({ - mount, - unmount, - checkHealth: vi.fn().mockResolvedValue({ status: "mounted" }), - })); + const volume = await createTestVolume({ organizationId, status: "mounted", agentId: "agent-1" }); + agentManagerMock.runVolumeCommand + .mockResolvedValueOnce({ name: "volume.unmount", result: { status: "unmounted" } }) + .mockResolvedValueOnce({ name: "volume.mount", result: { status: "mounted" } }); await withContext({ organizationId, userId: user.id }, async () => { const result = await volumeService.mountVolume(volume.shortId); expect(result.status).toBe("mounted"); - expect(unmount).toHaveBeenCalledOnce(); - expect(mount).toHaveBeenCalledOnce(); - expect(unmount.mock.invocationCallOrder[0]).toBeLessThan(mount.mock.invocationCallOrder[0]); + expect(agentManagerMock.runVolumeCommand).toHaveBeenNthCalledWith( + 1, + volume.agentId, + expect.objectContaining({ name: "volume.unmount", volume: expect.objectContaining({ id: volume.id }) }), + ); + expect(agentManagerMock.runVolumeCommand).toHaveBeenNthCalledWith( + 2, + volume.agentId, + expect.objectContaining({ name: "volume.mount", volume: expect.objectContaining({ id: volume.id }) }), + ); }); }); }); @@ -158,15 +155,8 @@ describe("volumeService.mountVolume", () => { describe("volumeService.ensureHealthyVolume", () => { test("returns ready when the mounted volume passes its health check", async () => { const { organizationId, user } = await createTestSession(); - const volume = await createTestVolume({ organizationId, status: "mounted" }); - const mount = vi.fn().mockResolvedValue({ status: "mounted" }); - const checkHealth = vi.fn().mockResolvedValue({ status: "mounted" }); - - vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({ - mount, - unmount: vi.fn().mockResolvedValue({ status: "unmounted" }), - checkHealth, - })); + const volume = await createTestVolume({ organizationId, status: "mounted", agentId: "agent-1" }); + agentManagerMock.runVolumeCommand.mockResolvedValue({ name: "volume.checkHealth", result: { status: "mounted" } }); await withContext({ organizationId, userId: user.id }, async () => { const result = await volumeService.ensureHealthyVolume(volume.shortId); @@ -176,22 +166,21 @@ describe("volumeService.ensureHealthyVolume", () => { volume: expect.objectContaining({ id: volume.id, status: "mounted", lastError: null }), remounted: false, }); - expect(checkHealth).toHaveBeenCalledOnce(); - expect(mount).not.toHaveBeenCalled(); + expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledOnce(); + expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledWith( + volume.agentId, + expect.objectContaining({ name: "volume.checkHealth", volume: expect.objectContaining({ id: volume.id }) }), + ); }); }); test("auto-remounts when the mounted volume fails its health check", async () => { const { organizationId, user } = await createTestSession(); - const volume = await createTestVolume({ organizationId, status: "mounted", autoRemount: true }); - const mount = vi.fn().mockResolvedValue({ status: "mounted" }); - const checkHealth = vi.fn().mockResolvedValue({ status: "error", error: "stale mount" }); - - vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({ - mount, - unmount: vi.fn().mockResolvedValue({ status: "unmounted" }), - checkHealth, - })); + const volume = await createTestVolume({ organizationId, status: "mounted", autoRemount: true, agentId: "agent-1" }); + agentManagerMock.runVolumeCommand + .mockResolvedValueOnce({ name: "volume.checkHealth", result: { status: "error", error: "stale mount" } }) + .mockResolvedValueOnce({ name: "volume.unmount", result: { status: "unmounted" } }) + .mockResolvedValueOnce({ name: "volume.mount", result: { status: "mounted" } }); await withContext({ organizationId, userId: user.id }, async () => { const result = await volumeService.ensureHealthyVolume(volume.shortId); @@ -201,8 +190,7 @@ describe("volumeService.ensureHealthyVolume", () => { volume: expect.objectContaining({ id: volume.id, status: "mounted", lastError: null }), remounted: true, }); - expect(checkHealth).toHaveBeenCalledOnce(); - expect(mount).toHaveBeenCalledOnce(); + expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledTimes(3); const updatedVolume = await db.query.volumesTable.findFirst({ where: { id: volume.id } }); expect(updatedVolume?.status).toBe("mounted"); @@ -212,15 +200,16 @@ describe("volumeService.ensureHealthyVolume", () => { test("returns not ready when the health check fails and auto-remount is disabled", async () => { const { organizationId, user } = await createTestSession(); - const volume = await createTestVolume({ organizationId, status: "mounted", autoRemount: false }); - const mount = vi.fn().mockResolvedValue({ status: "mounted" }); - const checkHealth = vi.fn().mockResolvedValue({ status: "error", error: "stale mount" }); - - vi.spyOn(backendModule, "createVolumeBackend").mockImplementation(() => ({ - mount, - unmount: vi.fn().mockResolvedValue({ status: "unmounted" }), - checkHealth, - })); + const volume = await createTestVolume({ + organizationId, + status: "mounted", + autoRemount: false, + agentId: "agent-1", + }); + agentManagerMock.runVolumeCommand.mockResolvedValue({ + name: "volume.checkHealth", + result: { status: "error", error: "stale mount" }, + }); await withContext({ organizationId, userId: user.id }, async () => { const result = await volumeService.ensureHealthyVolume(volume.shortId); @@ -230,54 +219,17 @@ describe("volumeService.ensureHealthyVolume", () => { volume: expect.objectContaining({ id: volume.id, status: "error", lastError: "stale mount" }), reason: "stale mount", }); - expect(checkHealth).toHaveBeenCalledOnce(); - expect(mount).not.toHaveBeenCalled(); + expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledOnce(); }); }); }); describe("volumeService.testConnection", () => { - test("uses an isolated temp mount path for backend test connections", async () => { - const mount = vi.fn().mockResolvedValue({ status: "mounted" }); - const unmount = vi.fn().mockResolvedValue({ status: "unmounted" }); - const createVolumeBackendSpy = vi.spyOn(backendModule, "createVolumeBackend").mockReturnValue({ - mount, - unmount, - checkHealth: vi.fn(), - }); - - await volumeService.testConnection({ - backend: "nfs", - server: "127.0.0.1", - exportPath: "/exports/test", - version: "4", - port: 2049, - readOnly: false, - }); - - expect(createVolumeBackendSpy).toHaveBeenCalledOnce(); - const [, mountPath] = createVolumeBackendSpy.mock.calls[0]; - expect(mountPath).toEqual(expect.stringContaining(`${path.sep}zerobyte-test-`)); - await expect(fs.access(mountPath as string)).rejects.toThrow(); - expect(mount).toHaveBeenCalledOnce(); - expect(unmount).toHaveBeenCalledOnce(); - }); - - test("does not fail when backend unmount already removed the temp mount path", async () => { - const mount = vi.fn().mockResolvedValue({ status: "mounted" }); - let mountPath: string | undefined; - const unmount = vi.fn().mockImplementation(async () => { - await fs.rm(mountPath!, { recursive: true, force: true }); - return { status: "unmounted" }; - }); - - vi.spyOn(backendModule, "createVolumeBackend").mockImplementation((_volume, tempPath) => { - mountPath = tempPath; - return { - mount, - unmount, - checkHealth: vi.fn(), - }; + test("routes test connections to the local agent", async () => { + config.flags.enableLocalAgent = true; + agentManagerMock.runVolumeCommand.mockResolvedValue({ + name: "volume.testConnection", + result: { success: true, message: "Connection successful" }, }); await expect( @@ -294,9 +246,9 @@ describe("volumeService.testConnection", () => { message: "Connection successful", }); - expect(mountPath).toEqual(expect.stringContaining(`${path.sep}zerobyte-test-`)); - await expect(fs.access(mountPath as string)).rejects.toThrow(); - expect(mount).toHaveBeenCalledOnce(); - expect(unmount).toHaveBeenCalledOnce(); + expect(agentManagerMock.runVolumeCommand).toHaveBeenCalledWith( + "local", + expect.objectContaining({ name: "volume.testConnection" }), + ); }); }); diff --git a/app/server/modules/volumes/volume.service.ts b/app/server/modules/volumes/volume.service.ts index 074433e0..d53cff21 100644 --- a/app/server/modules/volumes/volume.service.ts +++ b/app/server/modules/volumes/volume.service.ts @@ -1,38 +1,34 @@ -import * as fs from "node:fs/promises"; -import * as os from "node:os"; -import * as path from "node:path"; import { and, eq } from "drizzle-orm"; import { BadRequestError, InternalServerError, NotFoundError } from "http-errors-enhanced"; import { db } from "../../db/db"; import { volumesTable } from "../../db/schema"; import { toMessage } from "../../utils/errors"; import { generateShortId } from "../../utils/id"; -import { getStatFs, type StatFs } from "../../utils/mountinfo"; +import type { StatFs } from "../../utils/mountinfo"; import { withTimeout } from "../../utils/timeout"; -import { createVolumeBackend } from "../backends/backend"; +import { config } from "../../core/config"; import { LOCAL_AGENT_ID } from "../agents/constants"; +import { agentManager } from "../agents/agents-manager"; import type { UpdateVolumeBody } from "./volume.dto"; -import { getVolumePath } from "./helpers"; import { logger } from "@zerobyte/core/node"; import { serverEvents } from "../../core/events"; import type { Volume } from "../../db/schema"; import { volumeConfigSchema, type BackendConfig } from "~/schemas/volumes"; import { getOrganizationId } from "~/server/core/request-context"; -import { isNodeJSErrnoException } from "~/server/utils/fs"; -import { asShortId, type ShortId } from "~/server/utils/branded"; +import { type ShortId } from "~/server/utils/branded"; import { decryptVolumeConfig, encryptVolumeConfig } from "./volume-config-secrets"; +import type { VolumeCommand, VolumeCommandResult } from "@zerobyte/contracts/agent-protocol"; +import { createVolumeBackend, getStatFs, getVolumePath } from "../../../../apps/agent/src/volume-host"; +import { + browseFilesystem as browseHostFilesystem, + listVolumeFiles, + testVolumeConnection, +} from "../../../../apps/agent/src/volume-host/operations"; +import { Effect } from "effect"; type EnsureHealthyVolumeResult = - | { - ready: true; - volume: Volume; - remounted: boolean; - } - | { - ready: false; - volume: Volume; - reason: string; - }; + | { ready: true; volume: Volume; remounted: boolean } + | { ready: false; volume: Volume; reason: string }; const listVolumes = async () => { const organizationId = getOrganizationId(); @@ -53,6 +49,53 @@ const findVolume = async (shortId: ShortId) => { }); }; +const runVolumeCommand = async (agentId: string, command: TCommand) => { + const result = await agentManager.runVolumeCommand(agentId, command); + if (result.name !== command.name) { + throw new InternalServerError(`Unexpected agent response for ${command.name}`); + } + + return result as Extract; +}; + +const volumeForAgent = async (volume: Volume): Promise => ({ + ...volume, + config: await decryptVolumeConfig(volume.config), +}); + +const volumeForHost = async (volume: Volume): Promise => ({ + ...volume, + shortId: volume.shortId, + config: await decryptVolumeConfig(volume.config), + provisioningId: volume.provisioningId ?? null, +}); + +// TODO(agent-rollout): Remove the local host execution branch once all installs run volume operations through agents. +const shouldRunViaAgent = (volume: Volume) => volume.agentId !== LOCAL_AGENT_ID || config.flags.enableLocalAgent; + +const runVolumeBackendCommand = async ( + volume: Volume, + name: "volume.mount" | "volume.unmount" | "volume.checkHealth", +) => { + if (!shouldRunViaAgent(volume)) { + const backend = createVolumeBackend(await volumeForHost(volume)); + switch (name) { + case "volume.mount": + return backend.mount(); + case "volume.unmount": + return backend.unmount(); + case "volume.checkHealth": + return backend.checkHealth(); + } + } + + const command = await runVolumeCommand(volume.agentId, { + name, + volume: await volumeForAgent(volume), + }); + return command.result; +}; + const createVolume = async (name: string, backendConfig: BackendConfig) => { const organizationId = getOrganizationId(); const trimmedName = name.trim(); @@ -80,8 +123,7 @@ const createVolume = async (name: string, backendConfig: BackendConfig) => { throw new InternalServerError("Failed to create volume"); } - const backend = createVolumeBackend({ ...created, config: await decryptVolumeConfig(created.config) }); - const { error, status } = await backend.mount(); + const { error, status } = await runVolumeBackendCommand(created, "volume.mount"); await db .update(volumesTable) @@ -99,8 +141,7 @@ const deleteVolume = async (shortId: ShortId) => { throw new NotFoundError("Volume not found"); } - const backend = createVolumeBackend(volume); - await backend.unmount(); + await runVolumeBackendCommand(volume, "volume.unmount"); await db .delete(volumesTable) .where(and(eq(volumesTable.id, volume.id), eq(volumesTable.organizationId, organizationId))); @@ -114,9 +155,8 @@ const mountVolume = async (shortId: ShortId) => { throw new NotFoundError("Volume not found"); } - const backend = createVolumeBackend({ ...volume, config: await decryptVolumeConfig(volume.config) }); - await backend.unmount(); - const { error, status } = await backend.mount(); + await runVolumeBackendCommand(volume, "volume.unmount"); + const { error, status } = await runVolumeBackendCommand(volume, "volume.mount"); await db .update(volumesTable) @@ -138,8 +178,7 @@ const unmountVolume = async (shortId: ShortId) => { throw new NotFoundError("Volume not found"); } - const backend = createVolumeBackend(volume); - const { status, error } = await backend.unmount(); + const { status, error } = await runVolumeBackendCommand(volume, "volume.unmount"); await db .update(volumesTable) @@ -162,7 +201,16 @@ const getVolume = async (shortId: ShortId) => { let statfs: Partial = {}; if (volume.status === "mounted") { - statfs = await withTimeout(getStatFs(getVolumePath(volume)), 1000, "getStatFs").catch((error) => { + statfs = await withTimeout( + shouldRunViaAgent(volume) + ? runVolumeCommand(volume.agentId, { + name: "volume.statfs", + volume: await volumeForAgent(volume), + }).then((command) => command.result) + : volumeForHost(volume).then((hostVolume) => getStatFs(getVolumePath(hostVolume))), + 1000, + "volume.statfs", + ).catch((error) => { logger.warn(`Failed to get statfs for volume ${volume.name}: ${toMessage(error)}`); return {}; }); @@ -190,8 +238,7 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => { if (configChanged) { logger.debug("Unmounting existing volume before applying new config"); - const backend = createVolumeBackend(existing); - await backend.unmount(); + await runVolumeBackendCommand(existing, "volume.unmount"); } const newConfigResult = volumeConfigSchema.safeParse(volumeData.config || existing.config); @@ -219,8 +266,7 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => { } if (configChanged) { - const backend = createVolumeBackend({ ...updated, config: await decryptVolumeConfig(updated.config) }); - const { error, status } = await backend.mount(); + const { error, status } = await runVolumeBackendCommand(updated, "volume.mount"); await db .update(volumesTable) .set({ status, lastError: error ?? null, lastHealthCheck: Date.now() }) @@ -233,38 +279,12 @@ const updateVolume = async (shortId: ShortId, volumeData: UpdateVolumeBody) => { }; const testConnection = async (backendConfig: BackendConfig) => { - const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-test-")); - try { - const mockVolume = { - id: 0, - shortId: asShortId("test"), - name: "test-connection", - path: tempDir, - config: backendConfig, - createdAt: Date.now(), - updatedAt: Date.now(), - lastHealthCheck: Date.now(), - type: backendConfig.backend, - status: "unmounted" as const, - lastError: null, - provisioningId: null, - autoRemount: true, - agentId: LOCAL_AGENT_ID, - organizationId: "test-org", - }; - - const backend = createVolumeBackend(mockVolume, tempDir); - const { error } = await backend.mount(); - - await backend.unmount(); - - return { - success: !error, - message: error ? toMessage(error) : "Connection successful", - }; - } finally { - await fs.rm(tempDir, { recursive: true, force: true }); + if (!config.flags.enableLocalAgent) { + return Effect.runPromise(testVolumeConnection(backendConfig)); } + + const command = await runVolumeCommand(LOCAL_AGENT_ID, { name: "volume.testConnection", backendConfig }); + return command.result; }; const checkHealth = async (shortId: ShortId) => { @@ -275,8 +295,7 @@ const checkHealth = async (shortId: ShortId) => { throw new NotFoundError("Volume not found"); } - const backend = createVolumeBackend(volume); - const { error, status } = await backend.checkHealth(); + const { error, status } = await runVolumeBackendCommand(volume, "volume.checkHealth"); if (status !== volume.status) { serverEvents.emit("volume:status_changed", { organizationId, volumeName: volume.name, status }); @@ -344,7 +363,6 @@ const ensureHealthyVolume = async (shortId: ShortId): Promise { const volume = await findVolume(shortId); @@ -357,110 +375,32 @@ const listFiles = async (shortId: ShortId, subPath?: string, offset: number = 0, throw new InternalServerError("Volume is not mounted"); } - const volumePath = getVolumePath(volume); - const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath; - const normalizedPath = path.normalize(requestedPath); - const relative = path.relative(volumePath, normalizedPath); - - if (relative.startsWith("..") || path.isAbsolute(relative)) { - throw new BadRequestError("Invalid path"); - } - - const pageSize = Math.min(Math.max(limit, 1), MAX_PAGE_SIZE); - const startOffset = Math.max(offset, 0); - try { - const dirents = await fs.readdir(normalizedPath, { withFileTypes: true }); - - dirents.sort((a, b) => { - const aIsDir = a.isDirectory(); - const bIsDir = b.isDirectory(); - - if (aIsDir === bIsDir) { - return a.name.localeCompare(b.name); - } - return aIsDir ? -1 : 1; - }); - - const total = dirents.length; - const paginatedDirents = dirents.slice(startOffset, startOffset + pageSize); - - const entries = ( - await Promise.all( - paginatedDirents.map(async (dirent) => { - const fullPath = path.join(normalizedPath, dirent.name); - - try { - const stats = await fs.stat(fullPath); - const relativePath = path.relative(volumePath, fullPath); - - return { - name: dirent.name, - path: `/${relativePath}`, - type: dirent.isDirectory() ? ("directory" as const) : ("file" as const), - size: dirent.isFile() ? stats.size : undefined, - modifiedAt: stats.mtimeMs, - }; - } catch { - return null; - } - }), - ) - ).filter((e) => e !== null); - - return { - files: entries, - path: subPath || "/", - offset: startOffset, - limit: pageSize, - total, - hasMore: startOffset + pageSize < total, - }; - } catch (error) { - if (isNodeJSErrnoException(error) && error.code === "ENOENT") { - throw new NotFoundError("Directory not found"); + if (!shouldRunViaAgent(volume)) { + return await listVolumeFiles(await volumeForHost(volume), subPath, offset, limit); } + + const command = await runVolumeCommand(volume.agentId, { + name: "volume.listFiles", + volume: await volumeForAgent(volume), + subPath, + offset, + limit, + }); + return command.result; + } catch (error) { throw new InternalServerError(`Failed to list files: ${toMessage(error)}`); } }; const browseFilesystem = async (browsePath: string) => { - const normalizedPath = path.normalize(browsePath); - try { - const entries = await fs.readdir(normalizedPath, { withFileTypes: true }); + if (!config.flags.enableLocalAgent) { + return await browseHostFilesystem(browsePath); + } - const directories = await Promise.all( - entries - .filter((entry) => entry.isDirectory()) - .map(async (entry) => { - const fullPath = path.join(normalizedPath, entry.name); - - try { - const stats = await fs.stat(fullPath); - return { - name: entry.name, - path: fullPath, - type: "directory" as const, - size: undefined, - modifiedAt: stats.mtimeMs, - }; - } catch { - return { - name: entry.name, - path: fullPath, - type: "directory" as const, - size: undefined, - modifiedAt: undefined, - }; - } - }), - ); - - return { - directories: directories.sort((a, b) => a.name.localeCompare(b.name)), - path: normalizedPath, - }; + const command = await runVolumeCommand(LOCAL_AGENT_ID, { name: "filesystem.browse", path: browsePath }); + return command.result; } catch (error) { throw new InternalServerError(`Failed to browse filesystem: ${toMessage(error)}`); } diff --git a/apps/agent/src/volume-host/operations.ts b/apps/agent/src/volume-host/operations.ts index 527bee97..e2ad0f7e 100644 --- a/apps/agent/src/volume-host/operations.ts +++ b/apps/agent/src/volume-host/operations.ts @@ -17,6 +17,17 @@ export const listVolumeFiles = async ( ) => { const volumePath = getVolumePath(volume); const requestedPath = subPath ? path.join(volumePath, subPath) : volumePath; + const normalizedPath = path.normalize(requestedPath); + const requestedRelativePath = path.relative(volumePath, normalizedPath); + + if ( + requestedRelativePath === ".." || + requestedRelativePath.startsWith(`..${path.sep}`) || + path.isAbsolute(requestedRelativePath) + ) { + throw new Error("Invalid path"); + } + const pageSize = Math.min(Math.max(limit, 1), MAX_PAGE_SIZE); const startOffset = Math.max(offset, 0); @@ -25,7 +36,7 @@ export const listVolumeFiles = async ( const realRequestedPath = await fs.realpath(requestedPath); const relative = path.relative(realVolumeRoot, realRequestedPath); - if (relative.startsWith("..") || path.isAbsolute(relative)) { + if (relative === ".." || relative.startsWith(`..${path.sep}`) || path.isAbsolute(relative)) { throw new Error("Invalid path"); }