refactor: dedpulicate volume schemas across packages (#864)

This commit is contained in:
Nico
2026-05-09 15:36:25 +02:00
committed by GitHub
parent 2ada5acd5a
commit aa7da321ba
32 changed files with 341 additions and 298 deletions

View File

@@ -1,5 +1,5 @@
import { Cloud, Folder, Server } from "lucide-react";
import type { BackendType } from "~/schemas/volumes";
import type { BackendType } from "@zerobyte/contracts/volumes";
type VolumeIconProps = {
backend: BackendType;

View File

@@ -25,7 +25,7 @@ import {
smbConfigSchema,
volumeConfigSchema,
webdavConfigSchema,
} from "~/schemas/volumes";
} from "@zerobyte/contracts/volumes";
import { testConnectionMutation } from "../../../api-client/@tanstack/react-query.gen";
import { Tooltip, TooltipContent, TooltipTrigger } from "../../../components/ui/tooltip";
import { useSystemInfo } from "~/client/hooks/use-system-info";

View File

@@ -10,7 +10,7 @@ import type {
ResticStatsDto,
} from "@zerobyte/core/restic";
import type { BackupWebhooks } from "@zerobyte/core/backup-hooks";
import type { BackendConfig, BackendStatus, BackendType } from "~/schemas/volumes";
import type { BackendConfig, BackendStatus, BackendType } from "@zerobyte/contracts/volumes";
import type { NotificationConfig, NotificationType } from "~/schemas/notifications";
import type { ShortId } from "~/server/utils/branded";
import { LOCAL_AGENT_ID } from "../modules/agents/constants";

View File

@@ -3,6 +3,7 @@ import { afterEach, expect, test, vi } from "vitest";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
import { createAgentMessage } from "@zerobyte/contracts/agent-protocol";
import type { Volume } from "@zerobyte/contracts/volumes";
import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants";
const agentsServiceMocks = vi.hoisted(() => ({
@@ -36,6 +37,22 @@ const createSocket = (id: string, agentId = LOCAL_AGENT_ID) => ({
close: vi.fn(),
});
const backupVolume = {
id: 1,
shortId: "volume-1",
name: "Volume 1",
config: { backend: "directory", path: "/tmp" },
createdAt: 0,
updatedAt: 0,
lastHealthCheck: 0,
type: "directory",
status: "mounted" as const,
lastError: null,
autoRemount: true,
agentId: LOCAL_AGENT_ID,
organizationId: "org-1",
} satisfies Volume;
type CapturedFetch = NonNullable<Parameters<typeof Bun.serve>[0]["fetch"]>;
const invokeFetch = (fetch: CapturedFetch | undefined, request: Request, srv: Parameters<CapturedFetch>[1]) => {
@@ -223,6 +240,7 @@ test("closing a replaced connection reports disconnect without marking the activ
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: backupVolume,
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {},
runtime: {
@@ -253,6 +271,7 @@ test("sendBackup is only delivered after the agent is ready", async () => {
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: backupVolume,
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {},
runtime: {

View File

@@ -3,6 +3,7 @@ import { expect, test, vi } from "vitest";
import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
import { createAgentMessage, type AgentMessage } from "@zerobyte/contracts/agent-protocol";
import type { Volume } from "@zerobyte/contracts/volumes";
import { LOCAL_AGENT_ID, LOCAL_AGENT_KIND, LOCAL_AGENT_NAME } from "../constants";
import { createControllerAgentSession } from "../controller/session";
@@ -51,6 +52,22 @@ const createSession = (
}
};
const backupVolume = {
id: 1,
shortId: "volume-1",
name: "Volume 1",
config: { backend: "directory", path: "/tmp" },
createdAt: 0,
updatedAt: 0,
lastHealthCheck: 0,
type: "directory",
status: "mounted" as const,
lastError: null,
autoRemount: true,
agentId: LOCAL_AGENT_ID,
organizationId: "org-1",
} satisfies Volume;
test("closing the session scope interrupts the session runner", async () => {
const { run, closeAsync } = createSession();
const fiber = run();
@@ -85,6 +102,7 @@ test("sendBackup only queues the transport message", () => {
scheduleId: "schedule-queued",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: backupVolume,
repositoryConfig: {
backend: "local",
path: "/tmp/repository",

View File

@@ -12,7 +12,7 @@ import * as spawnModule from "@zerobyte/core/node";
import type { SafeSpawnParams } from "@zerobyte/core/node";
import { logger } from "@zerobyte/core/node";
import { restic } from "~/server/core/restic";
import { NotFoundError, BadRequestError } from "http-errors-enhanced";
import { NotFoundError } from "http-errors-enhanced";
import { fromAny } from "@total-typescript/shoehorn";
import { scheduleQueries } from "../backups.queries";
import { repositoriesService } from "~/server/modules/repositories/repositories.service";
@@ -23,6 +23,7 @@ import { createAgentBackupMocks } from "~/test/helpers/agent-mock";
import { getScheduleByIdOrShortId } from "../helpers/backup-schedule-lookups";
import { volumeService } from "~/server/modules/volumes/volume.service";
import { db } from "~/server/db/db";
import { config } from "~/server/core/config";
const setup = () => {
const resticBackupMock = vi.fn((_: SafeSpawnParams) =>
@@ -49,31 +50,33 @@ const setup = () => {
vi.spyOn(agentManager, "runBackup").mockImplementation(runBackupMock);
vi.spyOn(agentManager, "cancelBackup").mockImplementation(cancelBackupMock);
vi.spyOn(context, "getOrganizationId").mockReturnValue(TEST_ORG_ID);
const ensureHealthyVolumeMock = vi.spyOn(volumeService, "ensureHealthyVolume").mockImplementation(async (shortId) => {
const volume = await db.query.volumesTable.findFirst({
where: {
AND: [{ shortId: { eq: shortId } }, { organizationId: TEST_ORG_ID }],
},
});
const ensureHealthyVolumeMock = vi
.spyOn(volumeService, "ensureHealthyVolume")
.mockImplementation(async (shortId) => {
const volume = await db.query.volumesTable.findFirst({
where: {
AND: [{ shortId: { eq: shortId } }, { organizationId: TEST_ORG_ID }],
},
});
if (!volume) {
throw new NotFoundError("Volume not found");
}
if (!volume) {
throw new NotFoundError("Volume not found");
}
if (volume.status !== "mounted") {
return {
ready: false as const,
volume,
reason: "Volume is not mounted",
};
}
if (volume.status !== "mounted") {
return {
ready: false as const,
ready: true as const,
volume,
reason: "Volume is not mounted",
remounted: false,
};
}
return {
ready: true as const,
volume,
remounted: false,
};
});
});
return {
resticBackupMock,
@@ -88,10 +91,11 @@ const setup = () => {
afterEach(() => {
vi.restoreAllMocks();
config.flags.enableLocalAgent = true;
});
describe("backup execution - validation failures", () => {
test("should fail backup when volume is not mounted", async () => {
test("does not fail validation when the agent runtime owns volume readiness", async () => {
// arrange
const { resticBackupMock } = setup();
const volume = await createTestVolume({ status: "unmounted" });
@@ -105,55 +109,7 @@ describe("backup execution - validation failures", () => {
const result = await backupsService.validateBackupExecution(schedule.id);
// assert
expect(result.type).toBe("failure");
if (result.type === "failure") {
expect(result.error).toBeInstanceOf(BadRequestError);
expect(result.error.message).toBe("Volume is not mounted");
}
expect(resticBackupMock).not.toHaveBeenCalled();
});
test("runs a preflight volume health check before starting a backup", async () => {
setup();
const volume = await createTestVolume();
const repository = await createTestRepository();
const schedule = await createTestBackupSchedule({
volumeId: volume.id,
repositoryId: repository.id,
});
const ensureHealthyVolumeSpy = vi.spyOn(volumeService, "ensureHealthyVolume").mockResolvedValue({
ready: true,
volume,
remounted: false,
});
const result = await backupsService.validateBackupExecution(schedule.id);
expect(result.type).toBe("success");
expect(ensureHealthyVolumeSpy).toHaveBeenCalledWith(volume.shortId);
});
test("fails validation when the preflight health check cannot recover the volume", async () => {
const { resticBackupMock } = setup();
const volume = await createTestVolume();
const repository = await createTestRepository();
const schedule = await createTestBackupSchedule({
volumeId: volume.id,
repositoryId: repository.id,
});
vi.spyOn(volumeService, "ensureHealthyVolume").mockResolvedValue({
ready: false,
volume: { ...volume, status: "error", lastError: "stale mount" },
reason: "stale mount",
});
const result = await backupsService.validateBackupExecution(schedule.id);
expect(result.type).toBe("failure");
if (result.type === "failure") {
expect(result.error).toBeInstanceOf(BadRequestError);
expect(result.error.message).toBe("stale mount");
}
expect(resticBackupMock).not.toHaveBeenCalled();
});
@@ -461,7 +417,10 @@ describe("backup execution - routing", () => {
await backupsService.executeBackup(schedule.id);
expect(runBackupMock).toHaveBeenCalledWith("agent-remote", expect.objectContaining({ scheduleId: schedule.id }));
expect(runBackupMock).toHaveBeenCalledWith(
"agent-remote",
expect.objectContaining({ scheduleId: schedule.id }),
);
});
});
@@ -874,7 +833,9 @@ describe("retention policy - runForget", () => {
});
// act & assert
await expect(backupsService.runForget(schedule.id, "non-existent-repo")).rejects.toThrow("Repository not found");
await expect(backupsService.runForget(schedule.id, "non-existent-repo")).rejects.toThrow(
"Repository not found",
);
});
});

View File

@@ -7,6 +7,7 @@ import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import { agentManager, type BackupExecutionProgress } from "../agents/agents-manager";
import { LOCAL_AGENT_ID } from "../agents/constants";
import { getVolumePath } from "../volumes/helpers";
import { decryptVolumeConfig } from "../volumes/volume-config-secrets";
import { decryptRepositoryConfig } from "../repositories/repository-config-secrets";
import { createBackupOptions } from "./backup.helpers";
import { toErrorDetails } from "../../utils/errors";
@@ -43,7 +44,9 @@ const createBackupRunPayload = async ({
repository,
organizationId,
}: BackupExecutionRequest & { jobId: string }): Promise<BackupRunPayload> => {
// TODO: compute the source path on the agent so backup payloads do not carry controller-local paths.
const sourcePath = getVolumePath(volume);
const agentVolume = { ...volume, config: await decryptVolumeConfig(volume.config) };
const { signal: _, ...options } = createBackupOptions(schedule, sourcePath);
if (FUSE_VOLUME_BACKENDS.has(volume.type) && !options.customResticParams.includes(IGNORE_INODE_FLAG)) {
@@ -59,6 +62,7 @@ const createBackupRunPayload = async ({
scheduleId: schedule.shortId,
organizationId,
sourcePath,
volume: agentVolume,
repositoryConfig,
options: {
...options,

View File

@@ -1,7 +1,7 @@
import { z } from "zod";
import { describeRoute, resolver } from "hono-openapi";
import { backupWebhooksSchema } from "@zerobyte/core/backup-hooks";
import { volumeSchema } from "../volumes/volume.dto";
import { publicVolumeSchema } from "@zerobyte/contracts/volumes";
import { repositorySchema } from "../repositories/repositories.dto";
import { backupProgressEventSchema } from "~/schemas/events-dto";
@@ -39,7 +39,7 @@ const backupScheduleSchema = z.object({
nextBackupAt: z.number().nullable(),
createdAt: z.number(),
updatedAt: z.number(),
volume: volumeSchema,
volume: publicVolumeSchema,
repository: repositorySchema,
});

View File

@@ -13,6 +13,8 @@ import { scheduleQueries } from "../backups.queries";
import type { BackupExecutionProgress } from "../../agents/agents-manager";
import { repositoriesService } from "../../repositories/repositories.service";
import { volumeService } from "../../volumes/volume.service";
import { config } from "../../../core/config";
import { LOCAL_AGENT_ID } from "../../agents/constants";
import { copyToMirrors, runForget } from "./backup-maintenance";
interface BackupContext {
@@ -40,6 +42,9 @@ type ValidationSkipped = {
type ValidationResult = ValidationSuccess | ValidationFailure | ValidationSkipped;
const requiresControllerLocalVolumeReadiness = (volume: Volume) =>
volume.agentId === LOCAL_AGENT_ID && !config.flags.enableLocalAgent;
export function getBackupProgress(scheduleId: number): BackupProgressEventDto | undefined {
return cache.get<BackupProgressEventDto>(cacheKeys.backup.progress(scheduleId));
}
@@ -72,6 +77,13 @@ export async function validateBackupExecution(scheduleId: number, manual = false
return { type: "failure", error: new NotFoundError("Repository not found"), partialContext: { schedule, volume } };
}
if (!requiresControllerLocalVolumeReadiness(volume)) {
return {
type: "success",
context: { schedule, volume, repository, organizationId },
};
}
const volumeReadiness = await volumeService.ensureHealthyVolume(volume.shortId);
if (!volumeReadiness.ready) {

View File

@@ -13,7 +13,7 @@ import { logger } from "@zerobyte/core/node";
import { toMessage } from "~/server/utils/errors";
import { cryptoUtils } from "~/server/utils/crypto";
import type { RepositoryConfig } from "@zerobyte/core/restic";
import type { BackendConfig } from "~/schemas/volumes";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import type { NotificationConfig } from "~/schemas/notifications";
import { RESTIC_PASS_FILE } from "~/server/core/constants";
import { symmetricDecrypt, symmetricEncrypt } from "better-auth/crypto";

View File

@@ -16,7 +16,7 @@ import { repositoriesTable, volumesTable } from "~/server/db/schema";
import { LOCAL_AGENT_ID } from "~/server/modules/agents/constants";
import { mapRepositoryConfigSecrets } from "~/server/modules/repositories/repository-config-secrets";
import { mapVolumeConfigSecrets } from "~/server/modules/volumes/volume-config-secrets";
import { BACKEND_TYPES, volumeConfigSchema, type BackendConfig } from "~/schemas/volumes";
import { BACKEND_TYPES, volumeConfigSchema, type BackendConfig } from "@zerobyte/contracts/volumes";
import { cryptoUtils } from "~/server/utils/crypto";
import { toMessage } from "~/server/utils/errors";
import { generateShortId } from "~/server/utils/id";

View File

@@ -1,4 +1,4 @@
import type { BackendConfig } from "~/schemas/volumes";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { cryptoUtils, transformOptionalSecret, type SecretTransformer } from "~/server/utils/crypto";
export const mapVolumeConfigSecrets = async (

View File

@@ -1,21 +1,16 @@
import { z } from "zod";
import { describeRoute, resolver } from "hono-openapi";
import { BACKEND_STATUS, BACKEND_TYPES, volumeConfigSchema } from "~/schemas/volumes";
import {
browseFilesystemResponseSchema,
listVolumeFilesResponseSchema,
publicVolumeSchema,
statfsSchema,
testVolumeConnectionResponseSchema,
volumeConfigSchema,
volumeOperationResultSchema,
} from "@zerobyte/contracts/volumes";
export const volumeSchema = z.object({
id: z.number(),
shortId: z.string(),
provisioningId: z.string().nullable(),
name: z.string(),
type: z.enum(BACKEND_TYPES),
status: z.enum(BACKEND_STATUS),
lastError: z.string().nullable(),
createdAt: z.number(),
updatedAt: z.number(),
lastHealthCheck: z.number(),
config: volumeConfigSchema,
autoRemount: z.boolean(),
});
const volumeSchema = publicVolumeSchema;
const listVolumesResponse = volumeSchema.array();
export type ListVolumesDto = z.infer<typeof listVolumesResponse>;
@@ -80,12 +75,6 @@ export const deleteVolumeDto = describeRoute({
},
});
const statfsSchema = z.object({
total: z.number(),
used: z.number(),
free: z.number(),
});
const getVolumeResponse = z.object({
volume: volumeSchema,
statfs: statfsSchema,
@@ -146,10 +135,7 @@ export const testConnectionBody = z.object({
config: volumeConfigSchema,
});
const testConnectionResponse = z.object({
success: z.boolean(),
message: z.string(),
});
const testConnectionResponse = testVolumeConnectionResponseSchema;
export const testConnectionDto = describeRoute({
description: "Test connection to backend",
@@ -167,10 +153,7 @@ export const testConnectionDto = describeRoute({
},
});
const mountVolumeResponse = z.object({
error: z.string().optional(),
status: z.enum(BACKEND_STATUS),
});
const mountVolumeResponse = volumeOperationResultSchema;
export const mountVolumeDto = describeRoute({
description: "Mount a volume",
@@ -188,10 +171,7 @@ export const mountVolumeDto = describeRoute({
},
});
const unmountVolumeResponse = z.object({
error: z.string().optional(),
status: z.enum(BACKEND_STATUS),
});
const unmountVolumeResponse = volumeOperationResultSchema;
export const unmountVolumeDto = describeRoute({
description: "Unmount a volume",
@@ -209,10 +189,7 @@ export const unmountVolumeDto = describeRoute({
},
});
const healthCheckResponse = z.object({
error: z.string().optional(),
status: z.enum(BACKEND_STATUS),
});
const healthCheckResponse = volumeOperationResultSchema;
export const healthCheckDto = describeRoute({
description: "Perform a health check on a volume",
@@ -233,22 +210,7 @@ export const healthCheckDto = describeRoute({
},
});
const fileEntrySchema = z.object({
name: z.string(),
path: z.string(),
type: z.enum(["file", "directory"]),
size: z.number().optional(),
modifiedAt: z.number().optional(),
});
const listFilesResponse = z.object({
files: fileEntrySchema.array(),
path: z.string(),
offset: z.number(),
limit: z.number(),
total: z.number(),
hasMore: z.boolean(),
});
const listFilesResponse = listVolumeFilesResponseSchema;
export type ListFilesDto = z.infer<typeof listFilesResponse>;
export const listFilesQuery = z.object({
@@ -273,10 +235,7 @@ export const listFilesDto = describeRoute({
},
});
const browseFilesystemResponse = z.object({
directories: fileEntrySchema.array(),
path: z.string(),
});
const browseFilesystemResponse = browseFilesystemResponseSchema;
export type BrowseFilesystemDto = z.infer<typeof browseFilesystemResponse>;
export const browseFilesystemDto = describeRoute({

View File

@@ -13,23 +13,18 @@ import type { UpdateVolumeBody } from "./volume.dto";
import { logger } from "@zerobyte/core/node";
import { serverEvents } from "../../core/events";
import type { Volume } from "../../db/schema";
import { volumeConfigSchema, type BackendConfig } from "~/schemas/volumes";
import { volumeConfigSchema, type BackendConfig, type Volume as AgentVolume } from "@zerobyte/contracts/volumes";
import { Effect } from "effect";
import { getOrganizationId } from "~/server/core/request-context";
import { type ShortId } from "~/server/utils/branded";
import { decryptVolumeConfig, encryptVolumeConfig } from "./volume-config-secrets";
import type { VolumeCommand, VolumeCommandResult } from "@zerobyte/contracts/agent-protocol";
import {
createVolumeBackend,
getStatFs,
getVolumePath,
type AgentVolume,
} from "../../../../apps/agent/src/volume-host";
import { createVolumeBackend, getStatFs, getVolumePath } from "../../../../apps/agent/src/volume-host";
import {
browseFilesystem as browseHostFilesystem,
listVolumeFiles,
testVolumeConnection,
} from "../../../../apps/agent/src/volume-host/operations";
import { Effect } from "effect";
type EnsureHealthyVolumeResult =
| { ready: true; volume: Volume; remounted: boolean }
@@ -70,6 +65,7 @@ const volumeForAgent = async (volume: Volume): Promise<Volume> => ({
const volumeForHost = async (volume: Volume): Promise<AgentVolume> => ({
...volume,
shortId: volume.shortId,
config: await decryptVolumeConfig(volume.config),
provisioningId: volume.provisioningId ?? null,
});

View File

@@ -34,6 +34,21 @@ test("emits backup.failed when a backup command hits a restic error", async () =
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/missing-source",
volume: {
id: 1,
shortId: "volume-1",
name: "Volume 1",
config: { backend: "directory", path: "/tmp" },
createdAt: 0,
updatedAt: 0,
lastHealthCheck: 0,
type: "directory",
status: "mounted",
lastError: null,
autoRemount: true,
agentId: "local",
organizationId: "org-1",
},
repositoryConfig: {
backend: "local",
path: "/tmp/test-repository",

View File

@@ -40,6 +40,21 @@ const createRunPayload = (overrides: Partial<BackupRunPayload> = {}) =>
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: {
id: 1,
shortId: "volume-1",
name: "Volume 1",
config: { backend: "directory", path: "/tmp" },
createdAt: 0,
updatedAt: 0,
lastHealthCheck: 0,
type: "directory",
status: "mounted",
lastError: null,
autoRemount: true,
agentId: "local",
organizationId: "org-1",
},
repositoryConfig: {
backend: "local",
path: "/tmp/repository",
@@ -325,6 +340,21 @@ test("waits for running-job registration before returning to the processor loop"
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: {
id: 1,
shortId: "volume-1",
name: "Volume 1",
config: { backend: "directory", path: "/tmp" },
createdAt: 0,
updatedAt: 0,
lastHealthCheck: 0,
type: "directory",
status: "mounted",
lastError: null,
autoRemount: true,
agentId: "local",
organizationId: "org-1",
},
repositoryConfig: {
backend: "local",
path: "/tmp/repository",

View File

@@ -1,11 +1,51 @@
import { Effect, Runtime } from "effect";
import { Data, Effect, Runtime } from "effect";
import { createAgentMessage, type BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import type { Volume } from "@zerobyte/contracts/volumes";
import { runBackupLifecycle } from "@zerobyte/core/backup-hooks";
import { logger } from "@zerobyte/core/node";
import { type ResticDeps } from "@zerobyte/core/restic";
import { createRestic } from "@zerobyte/core/restic/server";
import { toMessage } from "@zerobyte/core/utils";
import type { ControllerCommandContext } from "../context";
import { createVolumeBackend } from "../volume-host";
class VolumeReadinessError extends Data.TaggedError("VolumeReadinessError")<{
readonly _tag: "VolumeReadinessError";
message: string;
}> {}
const ensureHealthyVolume = (volume: Volume) =>
Effect.gen(function* () {
if (volume.status === "unmounted") {
return yield* new VolumeReadinessError({
message: `Volume ${volume.name} is not mounted`,
});
}
const backend = createVolumeBackend(volume);
let failureReason = volume.lastError ?? "Volume health check failed";
if (volume.status !== "error") {
const health = yield* Effect.promise(() => backend.checkHealth());
if (health.status === "mounted") {
return;
}
failureReason = health.error ?? failureReason;
}
if (!volume.autoRemount) {
return yield* new VolumeReadinessError({ message: failureReason });
}
logger.warn(
`${volume.name} is not healthy. Auto-remount is enabled, attempting to remount. Reason: ${failureReason}`,
);
const remount = yield* Effect.promise(() => backend.mount());
if (remount.status !== "mounted") {
return yield* new VolumeReadinessError({ message: remount.error ?? failureReason });
}
});
export const handleBackupRunCommand = (context: ControllerCommandContext, payload: BackupRunPayload) => {
return Effect.gen(function* () {
@@ -57,6 +97,8 @@ export const handleBackupRunCommand = (context: ControllerCommandContext, payloa
const restic = createRestic(deps);
const runtime = yield* Effect.runtime<never>();
yield* ensureHealthyVolume(payload.volume);
const backupResult = yield* runBackupLifecycle({
restic,
repositoryConfig: payload.repositoryConfig,
@@ -111,7 +153,19 @@ export const handleBackupRunCommand = (context: ControllerCommandContext, payloa
yield* sendCancelled(backupResult.message);
return;
}
}).pipe(Effect.ensuring(context.deleteRunningJob(payload.jobId))),
}).pipe(
Effect.catchAll((error) =>
context.offerOutbound(
createAgentMessage("backup.failed", {
jobId: payload.jobId,
scheduleId: payload.scheduleId,
error: error.message,
errorDetails: toMessage(error),
}),
),
),
Effect.ensuring(context.deleteRunningJob(payload.jobId)),
),
);
}).pipe(Effect.asVoid);
};

View File

@@ -1,7 +1,8 @@
import { Effect, Data } from "effect";
import { createAgentMessage, type VolumeCommand, type VolumeCommandPayload } from "@zerobyte/contracts/agent-protocol";
import type { BackendConfig, Volume as AgentVolume } from "@zerobyte/contracts/volumes";
import { toMessage } from "@zerobyte/core/utils";
import { createVolumeBackend, getStatFs, getVolumePath, type AgentVolume, type BackendConfig } from "../volume-host";
import { createVolumeBackend, getStatFs, getVolumePath } from "../volume-host";
import { browseFilesystem, listVolumeFiles, testVolumeConnection } from "../volume-host/operations";
import type { ControllerCommandContext } from "../context";

View File

@@ -1,9 +1,9 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import type { Volume as AgentVolume } from "@zerobyte/contracts/volumes";
import { afterEach, expect, test } from "vitest";
import { listVolumeFiles } from "../operations";
import type { AgentVolume } from "../types";
let tempRoot: string | undefined;

View File

@@ -1,7 +1,8 @@
import * as fs from "node:fs/promises";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import type { BackendConfig, VolumeBackend } from "../types";
import type { VolumeBackend } from "../types";
const mount = async (config: BackendConfig) => {
if (config.backend !== "directory") {

View File

@@ -1,11 +1,12 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import type { VolumeBackend } from "../types";
import { assertMounted, executeMount, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {

View File

@@ -1,11 +1,12 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { logger, safeExec } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT, RCLONE_CONFIG_FILE, RCLONE_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import type { VolumeBackend } from "../types";
import { assertMounted, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {

View File

@@ -3,12 +3,13 @@ import { createHash } from "node:crypto";
import * as os from "node:os";
import * as path from "node:path";
import { spawn } from "node:child_process";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { FILE_MODES, logger, writeFileWithMode } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT, SSH_KEYS_DIR } from "../constants";
import { getMountForPath } from "../fs";
import { withTimeout } from "../timeout";
import type { BackendConfig, VolumeBackend } from "../types";
import type { VolumeBackend } from "../types";
import { executeUnmount } from "./utils";
const getMountPathHash = (mountPath: string) => createHash("sha256").update(mountPath).digest("hex").slice(0, 16);

View File

@@ -1,11 +1,12 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import type { VolumeBackend } from "../types";
import { assertMounted, executeMount, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {

View File

@@ -1,11 +1,12 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import type { BackendConfig } from "@zerobyte/contracts/volumes";
import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { OPERATION_TIMEOUT } from "../constants";
import { withTimeout } from "../timeout";
import { getMountForPath } from "../fs";
import type { BackendConfig, VolumeBackend } from "../types";
import type { VolumeBackend } from "../types";
import { assertMounted, executeMount, executeUnmount } from "./utils";
const checkHealth = async (mountPath: string) => {

View File

@@ -5,11 +5,11 @@ import { makeSftpBackend } from "./backends/sftp";
import { makeSmbBackend } from "./backends/smb";
import { makeWebdavBackend } from "./backends/webdav";
import { getVolumePath } from "./paths";
import type { AgentVolume, VolumeBackend } from "./types";
import type { Volume as AgentVolume } from "@zerobyte/contracts/volumes";
import type { VolumeBackend } from "./types";
export { getStatFs, isNodeJSErrnoException } from "./fs";
export { getVolumePath } from "./paths";
export type { AgentVolume, BackendConfig, VolumeBackend } from "./types";
export const createVolumeBackend = (volume: AgentVolume, mountPath = getVolumePath(volume)): VolumeBackend => {
switch (volume.config.backend) {
@@ -25,7 +25,9 @@ export const createVolumeBackend = (volume: AgentVolume, mountPath = getVolumePa
return makeRcloneBackend(volume.config, mountPath);
case "sftp":
return makeSftpBackend(volume.config, mountPath);
default: {
const _exhaustive: never = volume.config;
throw new Error("Unsupported backend");
}
}
throw new Error("Unsupported backend");
};

View File

@@ -1,10 +1,10 @@
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import type { BackendConfig, Volume as AgentVolume } from "@zerobyte/contracts/volumes";
import { toMessage } from "@zerobyte/core/utils";
import { createVolumeBackend, getVolumePath, isNodeJSErrnoException } from ".";
import type { AgentVolume, BackendConfig } from "./types";
import { Data, Effect } from "effect";
import { createVolumeBackend, getVolumePath, isNodeJSErrnoException } from ".";
const DEFAULT_PAGE_SIZE = 500;
const MAX_PAGE_SIZE = 500;

View File

@@ -1,7 +1,7 @@
import { VOLUME_MOUNT_BASE } from "./constants";
import type { AgentVolume } from "./types";
import type { Volume } from "@zerobyte/contracts/volumes";
export const getVolumePath = (volume: AgentVolume) => {
export const getVolumePath = (volume: Volume) => {
if (volume.config.backend === "directory") {
return volume.config.path;
}

View File

@@ -1,67 +1,7 @@
export type BackendStatus = "mounted" | "unmounted" | "error";
type BaseConfig = { backend: string; readOnly?: boolean };
export type BackendConfig =
| (BaseConfig & { backend: "directory"; path: string })
| (BaseConfig & { backend: "nfs"; server: string; exportPath: string; port: number; version: "3" | "4" | "4.1" })
| (BaseConfig & {
backend: "smb";
server: string;
share: string;
username?: string;
password?: string;
guest?: boolean;
vers?: "1.0" | "2.0" | "2.1" | "3.0" | "auto";
domain?: string;
port: number;
})
| (BaseConfig & {
backend: "webdav";
server: string;
path: string;
username?: string;
password?: string;
port: number;
ssl?: boolean;
})
| (BaseConfig & { backend: "rclone"; remote: string; path: string })
| (BaseConfig & {
backend: "sftp";
host: string;
port: number;
username: string;
password?: string;
privateKey?: string;
path: string;
skipHostKeyCheck?: boolean;
knownHosts?: string;
});
export type AgentVolume = {
id: number;
shortId: string;
name: string;
config: BackendConfig;
createdAt: number;
updatedAt: number;
lastHealthCheck: number;
type: string;
status: BackendStatus;
lastError: string | null;
provisioningId?: string | null;
autoRemount: boolean;
agentId: string;
organizationId: string;
};
export type OperationResult = {
status: BackendStatus;
error?: string;
};
import type { VolumeOperationResult } from "@zerobyte/contracts/volumes";
export type VolumeBackend = {
mount: () => Promise<OperationResult>;
unmount: () => Promise<OperationResult>;
checkHealth: () => Promise<OperationResult>;
mount: () => Promise<VolumeOperationResult>;
unmount: () => Promise<VolumeOperationResult>;
checkHealth: () => Promise<VolumeOperationResult>;
};

View File

@@ -7,13 +7,19 @@
"types": "./src/agent-protocol.ts",
"import": "./src/agent-protocol.ts",
"default": "./src/agent-protocol.ts"
},
"./volumes": {
"types": "./src/volumes.ts",
"import": "./src/volumes.ts",
"default": "./src/volumes.ts"
}
},
"scripts": {
"tsc": "tsc --noEmit"
},
"dependencies": {
"@zerobyte/core": "workspace:*"
"@zerobyte/core": "workspace:*",
"zod": "^4.0.1"
},
"devDependencies": {
"@types/bun": "latest"

View File

@@ -7,6 +7,15 @@ import {
resticBackupProgressSchema,
type CompressionMode,
} from "@zerobyte/core/restic";
import {
browseFilesystemResponseSchema,
listVolumeFilesResponseSchema,
statfsSchema,
testVolumeConnectionResponseSchema,
volumeConfigSchema,
volumeOperationResultSchema,
volumeSchema,
} from "./volumes";
const compressionModeSchema = z.enum(["off", "auto", "max"]) satisfies z.ZodType<CompressionMode>;
@@ -37,6 +46,7 @@ const backupRunSchema = z.object({
scheduleId: z.string(),
organizationId: z.string(),
sourcePath: z.string(),
volume: volumeSchema,
repositoryConfig: repositoryConfigSchema,
options: backupExecutionOptionsSchema,
runtime: backupRuntimeSchema,
@@ -51,53 +61,6 @@ const backupCancelSchema = z.object({
payload: z.object({ jobId: z.string(), scheduleId: z.string() }),
});
const backendStatusSchema = z.enum(["mounted", "unmounted", "error"]);
const volumeSchema = z.object({
id: z.number(),
shortId: z.string(),
name: z.string(),
path: z.string().nullable().optional(),
config: z.record(z.string(), z.unknown()).and(z.object({ backend: z.string() })),
createdAt: z.number(),
updatedAt: z.number(),
lastHealthCheck: z.number(),
type: z.string(),
status: backendStatusSchema,
lastError: z.string().nullable(),
provisioningId: z.string().nullable().optional(),
autoRemount: z.boolean(),
agentId: z.string(),
organizationId: z.string(),
});
const volumeOperationResultSchema = z.object({
status: backendStatusSchema,
error: z.string().optional(),
});
const statfsSchema = z.object({
total: z.number().optional(),
used: z.number().optional(),
free: z.number().optional(),
});
const fileEntrySchema = z.object({
name: z.string(),
path: z.string(),
type: z.enum(["directory", "file"]),
size: z.number().optional(),
modifiedAt: z.number().optional(),
});
const directoryEntrySchema = z.object({
name: z.string(),
path: z.string(),
type: z.literal("directory"),
size: z.undefined().optional(),
modifiedAt: z.number().optional(),
});
const volumeCommandSchema = z.discriminatedUnion("name", [
z.object({ name: z.literal("volume.mount"), volume: volumeSchema }),
z.object({ name: z.literal("volume.unmount"), volume: volumeSchema }),
@@ -110,7 +73,7 @@ const volumeCommandSchema = z.discriminatedUnion("name", [
offset: z.number(),
limit: z.number(),
}),
z.object({ name: z.literal("volume.testConnection"), backendConfig: z.record(z.string(), z.unknown()) }),
z.object({ name: z.literal("volume.testConnection"), backendConfig: volumeConfigSchema }),
z.object({ name: z.literal("filesystem.browse"), path: z.string() }),
]);
@@ -127,25 +90,9 @@ const volumeCommandResultSchema = z.discriminatedUnion("name", [
z.object({ name: z.literal("volume.unmount"), result: volumeOperationResultSchema }),
z.object({ name: z.literal("volume.checkHealth"), result: volumeOperationResultSchema }),
z.object({ name: z.literal("volume.statfs"), result: statfsSchema }),
z.object({
name: z.literal("volume.listFiles"),
result: z.object({
files: z.array(fileEntrySchema),
path: z.string(),
offset: z.number(),
limit: z.number(),
total: z.number(),
hasMore: z.boolean(),
}),
}),
z.object({
name: z.literal("volume.testConnection"),
result: z.object({ success: z.boolean(), message: z.string() }),
}),
z.object({
name: z.literal("filesystem.browse"),
result: z.object({ directories: z.array(directoryEntrySchema), path: z.string() }),
}),
z.object({ name: z.literal("volume.listFiles"), result: listVolumeFilesResponseSchema }),
z.object({ name: z.literal("volume.testConnection"), result: testVolumeConnectionResponseSchema }),
z.object({ name: z.literal("filesystem.browse"), result: browseFilesystemResponseSchema }),
]);
const volumeCommandResponseSchema = z.object({

View File

@@ -86,7 +86,7 @@ export const sftpConfigSchema = z.object({
knownHosts: z.string().optional(),
});
const volumeConfigSchemaBase = z.discriminatedUnion("backend", [
export const volumeConfigSchema = z.discriminatedUnion("backend", [
nfsConfigSchema,
smbConfigSchema,
webdavConfigSchema,
@@ -95,8 +95,6 @@ const volumeConfigSchemaBase = z.discriminatedUnion("backend", [
sftpConfigSchema,
]);
export const volumeConfigSchema = volumeConfigSchemaBase;
export type BackendConfig = z.infer<typeof volumeConfigSchema>;
export const BACKEND_STATUS = {
@@ -106,3 +104,78 @@ export const BACKEND_STATUS = {
} as const;
export type BackendStatus = keyof typeof BACKEND_STATUS;
export const backendStatusSchema = z.enum(BACKEND_STATUS);
export const volumeSchema = z.object({
id: z.number(),
shortId: z.string(),
name: z.string(),
path: z.string().nullable().optional(),
config: volumeConfigSchema,
createdAt: z.number(),
updatedAt: z.number(),
lastHealthCheck: z.number(),
type: z.enum(BACKEND_TYPES),
status: backendStatusSchema,
lastError: z.string().nullable(),
provisioningId: z.string().nullable().optional(),
autoRemount: z.boolean(),
agentId: z.string(),
organizationId: z.string(),
});
export type Volume = z.infer<typeof volumeSchema>;
export const publicVolumeSchema = volumeSchema.omit({
agentId: true,
organizationId: true,
path: true,
});
export type PublicVolume = z.infer<typeof publicVolumeSchema>;
export const volumeOperationResultSchema = z.object({
status: backendStatusSchema,
error: z.string().optional(),
});
export type VolumeOperationResult = z.infer<typeof volumeOperationResultSchema>;
export const statfsSchema = z.object({
total: z.number().optional(),
used: z.number().optional(),
free: z.number().optional(),
});
export const fileEntrySchema = z.object({
name: z.string(),
path: z.string(),
type: z.enum(["directory", "file"]),
size: z.number().optional(),
modifiedAt: z.number().optional(),
});
export const directoryEntrySchema = fileEntrySchema.extend({
type: z.literal("directory"),
size: z.undefined().optional(),
});
export const listVolumeFilesResponseSchema = z.object({
files: z.array(fileEntrySchema),
path: z.string(),
offset: z.number(),
limit: z.number(),
total: z.number(),
hasMore: z.boolean(),
});
export const testVolumeConnectionResponseSchema = z.object({
success: z.boolean(),
message: z.string(),
});
export const browseFilesystemResponseSchema = z.object({
directories: z.array(directoryEntrySchema),
path: z.string(),
});