refactor: move backup path resolution into agent contract (#872)

This commit is contained in:
Nico
2026-05-10 21:46:03 +02:00
committed by GitHub
parent aa7da321ba
commit a58fe82d48
18 changed files with 521 additions and 318 deletions

View File

@@ -1,31 +1,11 @@
import { readFileSync } from "node:fs";
import os from "node:os";
import { prettifyError, z } from "zod";
import "dotenv/config";
import { resolveResticHostname } from "../../../apps/agent/src/restic/hostname";
import { buildAllowedHosts } from "../lib/auth/base-url";
import { toMessage } from "@zerobyte/core/utils";
const unquote = (str: string) => str.trim().replace(/^(['"])(.*)\1$/, "$2");
const getResticHostname = () => {
try {
const mountinfo = readFileSync("/proc/self/mountinfo", "utf-8");
const hostnameLine = mountinfo.split("\n").find((line) => line.includes(" /etc/hostname "));
const hostname = os.hostname();
if (hostnameLine) {
const containerIdMatch = hostnameLine.match(/[0-9a-f]{64}/);
const containerId = containerIdMatch ? containerIdMatch[0] : null;
if (containerId?.startsWith(hostname)) {
return "zerobyte";
}
return hostname || "zerobyte";
}
} catch {}
return "zerobyte";
};
const envSchema = z
.object({
@@ -124,7 +104,7 @@ const envSchema = z
serverIp: s.SERVER_IP,
serverIdleTimeout: s.SERVER_IDLE_TIMEOUT,
webhookTimeout: s.WEBHOOK_TIMEOUT,
resticHostname: s.RESTIC_HOSTNAME || getResticHostname(),
resticHostname: s.RESTIC_HOSTNAME || resolveResticHostname(),
port: s.PORT,
migrationsPath: s.MIGRATIONS_PATH,
appVersion: s.APP_VERSION,

View File

@@ -53,6 +53,35 @@ const backupVolume = {
organizationId: "org-1",
} satisfies Volume;
const readyPayload = {
agentId: LOCAL_AGENT_ID,
protocolVersion: 1,
hostname: "host",
platform: "linux",
capabilities: { backup: true },
};
const backupPayload = {
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
volume: backupVolume,
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {
oneFileSystem: false,
excludePatterns: null,
excludeIfPresent: null,
includePaths: null,
includePatterns: null,
customResticParams: null,
compressionMode: "auto" as const,
},
runtime: { password: "password" },
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],
webhookTimeoutMs: 60_000,
};
type CapturedFetch = NonNullable<Parameters<typeof Bun.serve>[0]["fetch"]>;
const invokeFetch = (fetch: CapturedFetch | undefined, request: Request, srv: Parameters<CapturedFetch>[1]) => {
@@ -162,7 +191,7 @@ test("websocket lifecycle updates agent connection status", async () => {
const socket = createSocket("connection-1");
await websocket?.open?.(fromPartial(socket));
await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }));
await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", readyPayload));
await websocket?.message?.(fromPartial(socket), createAgentMessage("heartbeat.pong", { sentAt: 123 }));
await websocket?.close?.(fromPartial(socket), 1000, "done");
await Effect.runPromise(runtime.stop);
@@ -173,7 +202,12 @@ test("websocket lifecycle updates agent connection status", async () => {
agentName: LOCAL_AGENT_NAME,
agentKind: LOCAL_AGENT_KIND,
});
expect(agentsServiceMocks.markAgentOnline).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number));
expect(agentsServiceMocks.markAgentOnline).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number), {
backup: true,
protocolVersion: 1,
hostname: "host",
platform: "linux",
});
expect(agentsServiceMocks.markAgentSeen).toHaveBeenCalledWith(LOCAL_AGENT_ID, expect.any(Number));
expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledWith(LOCAL_AGENT_ID);
expect(stop).toHaveBeenCalledWith(true);
@@ -226,36 +260,14 @@ test("closing a replaced connection reports disconnect without marking the activ
await websocket?.open?.(fromPartial(oldSocket));
await websocket?.open?.(fromPartial(newSocket));
await websocket?.message?.(fromPartial(newSocket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }));
await websocket?.message?.(fromPartial(newSocket), createAgentMessage("agent.ready", readyPayload));
await websocket?.close?.(fromPartial(oldSocket), 1000, "replaced");
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({ type: "agent.disconnected", agentId: LOCAL_AGENT_ID }),
);
expect(agentsServiceMocks.markAgentOffline).toHaveBeenCalledTimes(offlineCallsBeforeClose);
expect(
await Effect.runPromise(
runtime.sendBackup(LOCAL_AGENT_ID, {
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: backupVolume,
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {},
runtime: {
password: "password",
cacheDir: "/tmp/cache",
passFile: "/tmp/pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],
webhookTimeoutMs: 60_000,
}),
),
).toBe(true);
expect(await Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, backupPayload))).toBe(true);
await Effect.runPromise(runtime.stop);
});
@@ -266,30 +278,12 @@ test("sendBackup is only delivered after the agent is ready", async () => {
const { runtime } = await startRuntime();
const websocket = serve.mock.calls[0]?.[0].websocket;
const socket = createSocket("connection-1");
const payload = {
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: backupVolume,
repositoryConfig: { backend: "local" as const, path: "/tmp/repository" },
options: {},
runtime: {
password: "password",
cacheDir: "/tmp/cache",
passFile: "/tmp/pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],
webhookTimeoutMs: 60_000,
};
const payload = backupPayload;
await websocket?.open?.(fromPartial(socket));
await expect(Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, payload))).resolves.toBe(false);
await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID }));
await websocket?.message?.(fromPartial(socket), createAgentMessage("agent.ready", readyPayload));
await expect(Effect.runPromise(runtime.sendBackup(LOCAL_AGENT_ID, payload))).resolves.toBe(true);
await waitForExpect(() => {

View File

@@ -101,19 +101,22 @@ test("sendBackup only queues the transport message", () => {
jobId: "job-queued",
scheduleId: "schedule-queued",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: backupVolume,
repositoryConfig: {
backend: "local",
path: "/tmp/repository",
},
options: {},
options: {
oneFileSystem: false,
excludePatterns: null,
excludeIfPresent: null,
includePaths: null,
includePatterns: null,
customResticParams: null,
compressionMode: "auto",
},
runtime: {
password: "password",
cacheDir: "/tmp/cache",
passFile: "/tmp/pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],
@@ -142,10 +145,29 @@ test("agent.ready marks the session ready and forwards the event", () => {
const { session, close } = createSession(onEvent);
expect(Effect.runSync(session.isReady())).toBe(false);
Effect.runSync(session.handleMessage(createAgentMessage("agent.ready", { agentId: LOCAL_AGENT_ID })));
Effect.runSync(
session.handleMessage(
createAgentMessage("agent.ready", {
agentId: LOCAL_AGENT_ID,
protocolVersion: 1,
hostname: "host",
platform: "linux",
capabilities: { backup: true },
}),
),
);
expect(Effect.runSync(session.isReady())).toBe(true);
expect(onEvent).toHaveBeenCalledWith({ type: "agent.ready", payload: { agentId: LOCAL_AGENT_ID } });
expect(onEvent).toHaveBeenCalledWith({
type: "agent.ready",
payload: {
agentId: LOCAL_AGENT_ID,
protocolVersion: 1,
hostname: "host",
platform: "linux",
capabilities: { backup: true },
},
});
close();
});

View File

@@ -95,9 +95,10 @@ const updateAgentRuntime = async (agentId: string, values: Partial<Agent>) => {
return updatedAgent;
};
const markAgentOnline = async (agentId: string, readyAt = Date.now()) => {
const markAgentOnline = async (agentId: string, readyAt = Date.now(), metadata?: AgentCapabilities) => {
return updateAgentRuntime(agentId, {
status: "online",
capabilities: metadata,
lastSeenAt: readyAt,
lastReadyAt: readyAt,
updatedAt: readyAt,

View File

@@ -93,7 +93,12 @@ export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) =>
case "agent.ready": {
const at = Date.now();
return Effect.promise(async () => {
await agentsService.markAgentOnline(agentId, at);
await agentsService.markAgentOnline(agentId, at, {
...event.payload.capabilities,
protocolVersion: event.payload.protocolVersion,
hostname: event.payload.hostname,
platform: event.payload.platform,
});
});
}
case "heartbeat.pong": {
@@ -328,7 +333,9 @@ export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) =>
return false;
}
logger.info(`Sent backup command ${payload.jobId} to agent ${agentId} for schedule ${payload.scheduleId}`);
logger.info(
`Sent backup command ${payload.jobId} to agent ${agentId} for schedule ${payload.scheduleId}`,
);
return true;
}),
cancelBackup: (agentId: string, payload: BackupCancelPayload) =>
@@ -355,12 +362,16 @@ export function createAgentManagerRuntime(onEvent: (event: AgentManagerEvent) =>
const session = getSession(agentId);
if (!session) {
yield* logger.effect.warn(`Cannot send volume command ${command.name}. Agent ${agentId} is not connected.`);
yield* logger.effect.warn(
`Cannot send volume command ${command.name}. Agent ${agentId} is not connected.`,
);
return null;
}
if (!(yield* session.isReady())) {
yield* logger.effect.warn(`Cannot send volume command ${command.name}. Agent ${agentId} is not ready.`);
yield* logger.effect.warn(
`Cannot send volume command ${command.name}. Agent ${agentId} is not ready.`,
);
return null;
}

View File

@@ -1,23 +1,10 @@
import path from "node:path";
import fc from "fast-check";
import { describe, expect, test } from "vitest";
import { fromAny } from "@total-typescript/shoehorn";
import { createBackupOptions, processPattern } from "../backup.helpers";
import { createBackupOptions } from "../backup.helpers";
type BackupScheduleInput = Parameters<typeof createBackupOptions>[0];
const safePatternSegmentArb = fc
.array(fc.constantFrom("a", "b", "c", "x", "y", "z", "0", "1", "2", "-", "_", ".", " "), {
minLength: 1,
maxLength: 12,
})
.map((chars) => chars.join(""))
.filter((segment) => segment.trim() !== "" && segment !== "." && segment !== "..");
const safeRelativePatternArb = fc
.array(safePatternSegmentArb, { minLength: 1, maxLength: 5 })
.map((segments) => segments.join("/"));
const createSchedule = (overrides: Partial<BackupScheduleInput> = {}): BackupScheduleInput =>
fromAny({
shortId: "sched-1234",
@@ -57,43 +44,6 @@ describe("executeBackup - include / exclude patterns", () => {
});
});
test("should handle the case where a subfolder has the exact same name as the volume name", () => {
// arrange
const volumeName = "SyncFolder";
const volumePath = `/${volumeName}`;
const selectedPath = `/${volumeName}`;
const schedule = createSchedule({
includePaths: [selectedPath],
});
const signal = new AbortController().signal;
// act
const options = createBackupOptions(schedule, volumePath, signal);
// assert
expect(options.includePaths).toEqual([path.join(volumePath, volumeName)]);
});
test("should correctly mix relative and absolute patterns", () => {
// arrange
const volumePath = "/var/lib/zerobyte/volumes/vol456/_data";
const relativeInclude = "relative/include";
const anchoredInclude = "/anchored/include";
const schedule = createSchedule({
includePatterns: [relativeInclude, anchoredInclude],
});
const signal = new AbortController().signal;
// act
const options = createBackupOptions(schedule, volumePath, signal);
// assert
expect(options.includePatterns).toEqual([
path.join(volumePath, relativeInclude),
path.join(volumePath, "anchored/include"),
]);
});
test("should handle empty include and exclude patterns", () => {
// arrange
const schedule = createSchedule({
@@ -110,79 +60,4 @@ describe("executeBackup - include / exclude patterns", () => {
expect(options.includePatterns).toEqual([]);
expect(options.exclude).toEqual([]);
});
test("processPattern keeps relative and negated relative patterns unchanged", () => {
expect(processPattern("relative/include", "/volume")).toBe("relative/include");
expect(processPattern("!*.log", "/volume")).toBe("!*.log");
});
test("rejects include patterns that escape the volume root", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
const signal = new AbortController().signal;
expect(() =>
createBackupOptions(
createSchedule({
includePatterns: ["../../../../etc/shadow", "/../etc/passwd", "!/../../secrets.txt"],
}),
volumePath,
signal,
),
).toThrow("Include pattern escapes volume root");
});
test("anchors relative glob include patterns to the volume path", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
const schedule = createSchedule({
includePatterns: ["**/*.xyz", "*.zip", "!**/*.tmp"],
});
const signal = new AbortController().signal;
const options = createBackupOptions(schedule, volumePath, signal);
expect(options.includePatterns).toEqual([
path.join(volumePath, "**/*.xyz"),
path.join(volumePath, "*.zip"),
`!${path.join(volumePath, "**/*.tmp")}`,
]);
});
test("anchors generated include patterns under the volume path", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
fc.assert(
fc.property(safeRelativePatternArb, fc.boolean(), fc.boolean(), (pattern, anchored, negated) => {
const rawPattern = `${negated ? "!" : ""}${anchored ? "/" : ""}${pattern}`;
const expected = path.join(volumePath, pattern);
expect(processPattern(rawPattern, volumePath, true)).toBe(negated ? `!${expected}` : expected);
}),
);
});
test("rejects generated include patterns that escape the volume root", () => {
const volumePath = "/volume/root";
fc.assert(
fc.property(safeRelativePatternArb, fc.boolean(), (pattern, negated) => {
const escapingPattern = `${negated ? "!" : ""}${"../".repeat(8)}${pattern}`;
expect(() => processPattern(escapingPattern, volumePath, true)).toThrow("Include pattern escapes volume root");
}),
);
});
test("keeps selected include paths separate from include patterns", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
const schedule = createSchedule({
includePaths: ["/movies [1]"],
includePatterns: ["**/*.txt"],
});
const signal = new AbortController().signal;
const options = createBackupOptions(schedule, volumePath, signal);
expect(options.includePaths).toEqual([path.join(volumePath, "movies [1]")]);
expect(options.includePatterns).toEqual([path.join(volumePath, "**/*.txt")]);
});
});

View File

@@ -44,14 +44,8 @@ const createBackupRunPayload = async ({
repository,
organizationId,
}: BackupExecutionRequest & { jobId: string }): Promise<BackupRunPayload> => {
// TODO: compute the source path on the agent so backup payloads do not carry controller-local paths.
const sourcePath = getVolumePath(volume);
const agentVolume = { ...volume, config: await decryptVolumeConfig(volume.config) };
const { signal: _, ...options } = createBackupOptions(schedule, sourcePath);
if (FUSE_VOLUME_BACKENDS.has(volume.type) && !options.customResticParams.includes(IGNORE_INODE_FLAG)) {
options.customResticParams = [...options.customResticParams, IGNORE_INODE_FLAG];
}
const customResticParams = schedule.customResticParams ?? [];
const repositoryConfig = await decryptRepositoryConfig(repository.config);
const encryptedResticPassword = await resticDeps.getOrganizationResticPassword(organizationId);
@@ -61,20 +55,22 @@ const createBackupRunPayload = async ({
jobId,
scheduleId: schedule.shortId,
organizationId,
sourcePath,
volume: agentVolume,
repositoryConfig,
options: {
...options,
oneFileSystem: schedule.oneFileSystem,
excludePatterns: schedule.excludePatterns,
excludeIfPresent: schedule.excludeIfPresent,
includePaths: schedule.includePaths,
includePatterns: schedule.includePatterns,
customResticParams:
FUSE_VOLUME_BACKENDS.has(volume.type) && !customResticParams.includes(IGNORE_INODE_FLAG)
? [...customResticParams, IGNORE_INODE_FLAG]
: customResticParams,
compressionMode: repository.compressionMode ?? "auto",
},
runtime: {
password: resticPassword,
cacheDir: resticDeps.resticCacheDir,
passFile: resticDeps.resticPassFile,
defaultExcludes: resticDeps.defaultExcludes,
rcloneConfigFile: resticDeps.rcloneConfigFile,
hostname: resticDeps.hostname,
},
webhooks: schedule.backupWebhooks ?? { pre: null, post: null },
webhookAllowedOrigins: config.webhookAllowedOrigins,
@@ -84,17 +80,25 @@ const createBackupRunPayload = async ({
const executeBackupWithoutAgent = async (
payload: BackupRunPayload,
{ signal, onProgress }: Pick<BackupExecutionRequest, "signal" | "onProgress">,
{ schedule, volume, signal, onProgress }: BackupExecutionRequest,
) => {
const sourcePath = getVolumePath(volume);
const { signal: _, ...backupOptions } = createBackupOptions(schedule, sourcePath, signal);
const options = {
...backupOptions,
customResticParams: payload.options.customResticParams ?? [],
compressionMode: payload.options.compressionMode,
};
return Effect.runPromise(
runBackupLifecycle({
restic,
repositoryConfig: payload.repositoryConfig,
sourcePath: payload.sourcePath,
sourcePath,
jobId: payload.jobId,
scheduleId: payload.scheduleId,
organizationId: payload.organizationId,
options: payload.options,
options,
webhooks: payload.webhooks,
webhookAllowedOrigins: payload.webhookAllowedOrigins,
webhookTimeoutMs: payload.webhookTimeoutMs,

View File

@@ -1,5 +1,5 @@
import { CronExpressionParser } from "cron-parser";
import path from "node:path";
import { createBackupOptions as createAgentBackupOptions } from "../../../../apps/agent/src/commands/backup.helpers";
import type { BackupSchedule } from "~/server/db/schema";
import { toMessage } from "~/server/utils/errors";
import { logger } from "@zerobyte/core/node";
@@ -28,45 +28,20 @@ export const isValidCron = (expression: string) => {
}
};
export const processPattern = (pattern: string, volumePath: string, relative = false) => {
const isNegated = pattern.startsWith("!");
const p = isNegated ? pattern.slice(1) : pattern;
const ensurePatternIsWithinVolume = (candidate: string) => {
const resolvedVolumePath = path.resolve(volumePath);
const resolvedCandidatePath = path.resolve(volumePath, candidate);
const relativePath = path.relative(resolvedVolumePath, resolvedCandidatePath);
if (relativePath === ".." || relativePath.startsWith(`..${path.sep}`) || path.isAbsolute(relativePath)) {
throw new Error(`Include pattern escapes volume root: ${pattern}`);
}
};
if (!p.startsWith("/")) {
if (!relative) return pattern;
ensurePatternIsWithinVolume(p);
const processed = path.join(volumePath, p);
return isNegated ? `!${processed}` : processed;
}
if (relative) {
ensurePatternIsWithinVolume(p.slice(1));
}
const processed = path.join(volumePath, p.slice(1));
return isNegated ? `!${processed}` : processed;
};
export const createBackupOptions = (schedule: BackupSchedule, volumePath: string, signal?: AbortSignal) => ({
tags: [schedule.shortId],
oneFileSystem: schedule.oneFileSystem,
signal,
exclude: schedule.excludePatterns ? schedule.excludePatterns.map((p) => processPattern(p, volumePath)) : undefined,
excludeIfPresent: schedule.excludeIfPresent ?? undefined,
includePaths: schedule.includePaths
? schedule.includePaths.map((p) => processPattern(p, volumePath, true))
: undefined,
includePatterns: schedule.includePatterns
? schedule.includePatterns.map((p) => processPattern(p, volumePath, true))
: undefined,
customResticParams: schedule.customResticParams ?? [],
});
export const createBackupOptions = (schedule: BackupSchedule, volumePath: string, signal?: AbortSignal) =>
createAgentBackupOptions(
{
scheduleId: schedule.shortId,
options: {
oneFileSystem: schedule.oneFileSystem,
excludePatterns: schedule.excludePatterns,
excludeIfPresent: schedule.excludeIfPresent,
includePaths: schedule.includePaths,
includePatterns: schedule.includePatterns,
customResticParams: schedule.customResticParams,
compressionMode: "auto",
},
},
volumePath,
signal,
);

View File

@@ -33,7 +33,6 @@ test("emits backup.failed when a backup command hits a restic error", async () =
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/missing-source",
volume: {
id: 1,
shortId: "volume-1",
@@ -53,13 +52,17 @@ test("emits backup.failed when a backup command hits a restic error", async () =
backend: "local",
path: "/tmp/test-repository",
},
options: {},
options: {
oneFileSystem: false,
excludePatterns: null,
excludeIfPresent: null,
includePaths: null,
includePatterns: null,
customResticParams: null,
compressionMode: "auto",
},
runtime: {
password: "password",
cacheDir: "/tmp/restic-cache",
passFile: "/tmp/restic-pass",
defaultExcludes: [],
rcloneConfigFile: "/root/.config/rclone/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],

View File

@@ -6,9 +6,9 @@ import waitForExpect from "wait-for-expect";
import { fromPartial } from "@total-typescript/shoehorn";
import { parseAgentMessage, type BackupCancelPayload, type BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import * as resticServer from "@zerobyte/core/restic/server";
import { handleBackupCancelCommand } from "./backup-cancel";
import { handleBackupRunCommand } from "./backup-run";
import type { ControllerCommandContext, RunningJob } from "../context";
import { handleBackupCancelCommand } from "../backup-cancel";
import { handleBackupRunCommand } from "../backup-run";
import type { ControllerCommandContext, RunningJob } from "../../context";
const server = setupServer();
@@ -39,7 +39,6 @@ const createRunPayload = (overrides: Partial<BackupRunPayload> = {}) =>
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: {
id: 1,
shortId: "volume-1",
@@ -59,13 +58,17 @@ const createRunPayload = (overrides: Partial<BackupRunPayload> = {}) =>
backend: "local",
path: "/tmp/repository",
},
options: {},
options: {
oneFileSystem: false,
excludePatterns: null,
excludeIfPresent: null,
includePaths: null,
includePatterns: null,
customResticParams: null,
compressionMode: "auto",
},
runtime: {
password: "password",
cacheDir: "/tmp/restic-cache",
passFile: "/tmp/restic-pass",
defaultExcludes: [],
rcloneConfigFile: "/tmp/rclone.conf",
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: ["http://localhost:8080"],
@@ -339,7 +342,6 @@ test("waits for running-job registration before returning to the processor loop"
jobId: "job-1",
scheduleId: "schedule-1",
organizationId: "org-1",
sourcePath: "/tmp/source",
volume: {
id: 1,
shortId: "volume-1",
@@ -359,12 +361,17 @@ test("waits for running-job registration before returning to the processor loop"
backend: "local",
path: "/tmp/repository",
},
options: {},
options: {
oneFileSystem: false,
excludePatterns: null,
excludeIfPresent: null,
includePaths: null,
includePatterns: null,
customResticParams: null,
compressionMode: "auto",
},
runtime: {
password: "password",
cacheDir: "/tmp/restic-cache",
passFile: "/tmp/restic-pass",
defaultExcludes: [],
},
webhooks: { pre: null, post: null },
webhookAllowedOrigins: [],

View File

@@ -0,0 +1,171 @@
import path from "node:path";
import fc from "fast-check";
import { describe, expect, test } from "vitest";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
import { createBackupOptions, processPattern } from "../backup.helpers";
type BackupPathOptions = BackupRunPayload["options"];
const safePatternSegmentArb = fc
.array(fc.constantFrom("a", "b", "c", "x", "y", "z", "0", "1", "2", "-", "_", ".", " "), {
minLength: 1,
maxLength: 12,
})
.map((chars) => chars.join(""))
.filter((segment) => segment.trim() !== "" && segment !== "." && segment !== "..");
const safeRelativePatternArb = fc
.array(safePatternSegmentArb, { minLength: 1, maxLength: 5 })
.map((segments) => segments.join("/"));
const createPathOptions = (overrides: Partial<BackupPathOptions> = {}): BackupPathOptions => ({
oneFileSystem: false,
includePaths: [],
includePatterns: [],
excludePatterns: [],
excludeIfPresent: [],
customResticParams: [],
compressionMode: "auto",
...overrides,
});
const createOptions = (options: BackupPathOptions, volumePath: string, signal?: AbortSignal) =>
createBackupOptions({ scheduleId: "sched-1234", options }, volumePath, signal);
describe("backup path options", () => {
test("builds include and exclude patterns", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
const signal = new AbortController().signal;
const options = createOptions(
createPathOptions({
includePaths: ["/Photos"],
includePatterns: ["*.zip", "!/Temp", "!*.log"],
excludePatterns: [".DS_Store", "/Config", "!/Important", "!*.tmp"],
excludeIfPresent: [".nobackup"],
}),
volumePath,
signal,
);
expect(options).toMatchObject({
tags: ["sched-1234"],
signal,
includePaths: [path.join(volumePath, "Photos")],
includePatterns: [
path.join(volumePath, "*.zip"),
`!${path.join(volumePath, "Temp")}`,
`!${path.join(volumePath, "*.log")}`,
],
exclude: [".DS_Store", path.join(volumePath, "Config"), `!${path.join(volumePath, "Important")}`, "!*.tmp"],
excludeIfPresent: [".nobackup"],
});
});
test("keeps relative and negated relative exclude patterns unchanged", () => {
expect(processPattern("relative/include", "/volume")).toBe("relative/include");
expect(processPattern("!*.log", "/volume")).toBe("!*.log");
});
test("anchors relative glob include patterns to the volume path", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
const options = createOptions(
createPathOptions({ includePatterns: ["**/*.xyz", "*.zip", "!**/*.tmp"] }),
volumePath,
);
expect(options.includePatterns).toEqual([
path.join(volumePath, "**/*.xyz"),
path.join(volumePath, "*.zip"),
`!${path.join(volumePath, "**/*.tmp")}`,
]);
});
test("handles a selected subfolder with the exact same name as the volume path", () => {
const volumeName = "SyncFolder";
const volumePath = `/${volumeName}`;
const options = createOptions(createPathOptions({ includePaths: [`/${volumeName}`] }), volumePath);
expect(options.includePaths).toEqual([path.join(volumePath, volumeName)]);
});
test("mixes relative and absolute include patterns", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol456/_data";
const relativeInclude = "relative/include";
const anchoredInclude = "/anchored/include";
const options = createOptions(
createPathOptions({ includePatterns: [relativeInclude, anchoredInclude] }),
volumePath,
);
expect(options.includePatterns).toEqual([
path.join(volumePath, relativeInclude),
path.join(volumePath, "anchored/include"),
]);
});
test("handles empty include and exclude patterns", () => {
const options = createOptions(
createPathOptions({ includePatterns: [], excludePatterns: [] }),
"/var/lib/zerobyte/volumes/vol999/_data",
);
expect(options.includePaths).toEqual([]);
expect(options.includePatterns).toEqual([]);
expect(options.exclude).toEqual([]);
});
test("rejects include patterns that escape the volume root", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
expect(() =>
createOptions(
createPathOptions({
includePatterns: ["../../../../etc/shadow", "/../etc/passwd", "!/../../secrets.txt"],
}),
volumePath,
),
).toThrow("Include pattern escapes volume root");
});
test("anchors generated include patterns under the volume path", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
fc.assert(
fc.property(safeRelativePatternArb, fc.boolean(), fc.boolean(), (pattern, anchored, negated) => {
const rawPattern = `${negated ? "!" : ""}${anchored ? "/" : ""}${pattern}`;
const expected = path.join(volumePath, pattern);
expect(processPattern(rawPattern, volumePath, true)).toBe(negated ? `!${expected}` : expected);
}),
);
});
test("rejects generated include patterns that escape the volume root", () => {
const volumePath = "/volume/root";
fc.assert(
fc.property(safeRelativePatternArb, fc.boolean(), (pattern, negated) => {
const escapingPattern = `${negated ? "!" : ""}${"../".repeat(8)}${pattern}`;
expect(() => processPattern(escapingPattern, volumePath, true)).toThrow(
"Include pattern escapes volume root",
);
}),
);
});
test("keeps selected include paths separate from include patterns", () => {
const volumePath = "/var/lib/zerobyte/volumes/vol123/_data";
const options = createOptions(
createPathOptions({ includePaths: ["/movies [1]"], includePatterns: ["**/*.txt"] }),
volumePath,
);
expect(options.includePaths).toEqual([path.join(volumePath, "movies [1]")]);
expect(options.includePatterns).toEqual([path.join(volumePath, "**/*.txt")]);
});
});

View File

@@ -3,11 +3,12 @@ import { createAgentMessage, type BackupRunPayload } from "@zerobyte/contracts/a
import type { Volume } from "@zerobyte/contracts/volumes";
import { runBackupLifecycle } from "@zerobyte/core/backup-hooks";
import { logger } from "@zerobyte/core/node";
import { type ResticDeps } from "@zerobyte/core/restic";
import { createRestic } from "@zerobyte/core/restic/server";
import { toMessage } from "@zerobyte/core/utils";
import type { ControllerCommandContext } from "../context";
import { createVolumeBackend } from "../volume-host";
import { resticDeps } from "../restic/deps";
import { createVolumeBackend, getVolumePath } from "../volume-host";
import { createBackupOptions } from "./backup.helpers";
class VolumeReadinessError extends Data.TaggedError("VolumeReadinessError")<{
readonly _tag: "VolumeReadinessError";
@@ -84,29 +85,21 @@ export const handleBackupRunCommand = (context: ControllerCommandContext, payloa
}),
);
const deps: ResticDeps = {
resolveSecret: async (encrypted) => encrypted,
getOrganizationResticPassword: async () => payload.runtime.password,
resticCacheDir: payload.runtime.cacheDir,
resticPassFile: payload.runtime.passFile,
defaultExcludes: payload.runtime.defaultExcludes,
hostname: payload.runtime.hostname,
rcloneConfigFile: payload.runtime.rcloneConfigFile,
};
const restic = createRestic(deps);
const restic = createRestic(resticDeps(payload.runtime.password));
const runtime = yield* Effect.runtime<never>();
yield* ensureHealthyVolume(payload.volume);
const sourcePath = getVolumePath(payload.volume);
const options = createBackupOptions(payload, sourcePath, abortController.signal);
const backupResult = yield* runBackupLifecycle({
restic,
repositoryConfig: payload.repositoryConfig,
sourcePath: payload.sourcePath,
sourcePath,
jobId: payload.jobId,
scheduleId: payload.scheduleId,
organizationId: payload.organizationId,
options: payload.options,
options,
webhooks: payload.webhooks,
webhookAllowedOrigins: payload.webhookAllowedOrigins,
webhookTimeoutMs: payload.webhookTimeoutMs,

View File

@@ -0,0 +1,48 @@
import path from "node:path";
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
type BackupOptions = BackupRunPayload["options"];
export const processPattern = (pattern: string, volumePath: string, relative = false) => {
const isNegated = pattern.startsWith("!");
const p = isNegated ? pattern.slice(1) : pattern;
const ensurePatternIsWithinVolume = (candidate: string) => {
const resolvedVolumePath = path.resolve(volumePath);
const resolvedCandidatePath = path.resolve(volumePath, candidate);
const relativePath = path.relative(resolvedVolumePath, resolvedCandidatePath);
if (relativePath === ".." || relativePath.startsWith(`..${path.sep}`) || path.isAbsolute(relativePath)) {
throw new Error(`Include pattern escapes volume root: ${pattern}`);
}
};
if (!p.startsWith("/")) {
if (!relative) return pattern;
ensurePatternIsWithinVolume(p);
const processed = path.join(volumePath, p);
return isNegated ? `!${processed}` : processed;
}
if (relative) {
ensurePatternIsWithinVolume(p.slice(1));
}
const processed = path.join(volumePath, p.slice(1));
return isNegated ? `!${processed}` : processed;
};
export const createBackupOptions = (
params: { scheduleId: string; options: BackupOptions },
volumePath: string,
signal?: AbortSignal,
) => ({
tags: [params.scheduleId],
oneFileSystem: params.options.oneFileSystem,
signal,
exclude: params.options.excludePatterns?.map((p) => processPattern(p, volumePath)) ?? undefined,
excludeIfPresent: params.options.excludeIfPresent ?? undefined,
includePaths: params.options.includePaths?.map((p) => processPattern(p, volumePath, true)) ?? undefined,
includePatterns: params.options.includePatterns?.map((p) => processPattern(p, volumePath, true)) ?? undefined,
customResticParams: params.options.customResticParams ?? [],
compressionMode: params.options.compressionMode,
});

View File

@@ -9,6 +9,7 @@ import { logger } from "@zerobyte/core/node";
import { toMessage } from "@zerobyte/core/utils";
import { handleControllerCommand } from "./commands";
import type { ControllerCommandContext, RunningJob } from "./context";
import { resolveResticHostname } from "./restic/hostname";
export type ControllerSession = {
onOpen: () => void;
@@ -136,7 +137,17 @@ export const createControllerSession = (ws: WebSocket): ControllerSession => {
return {
onOpen: () => {
void Effect.runPromise(offerOutbound(createAgentMessage("agent.ready", { agentId: "" }))).catch((error) => {
void Effect.runPromise(
offerOutbound(
createAgentMessage("agent.ready", {
agentId: "",
protocolVersion: 1,
hostname: resolveResticHostname(),
platform: process.platform,
capabilities: { backup: true, volume: true, restic: true },
}),
),
).catch((error) => {
logger.error(`Failed to queue ready message: ${toMessage(error)}`);
});
},

View File

@@ -0,0 +1,62 @@
import { afterEach, expect, test, vi } from "vitest";
const fsMock = vi.hoisted(() => ({
readFileSync: vi.fn(),
}));
const osMock = vi.hoisted(() => ({
hostname: vi.fn(),
}));
vi.mock("node:fs", () => fsMock);
vi.mock("node:os", () => ({ default: osMock }));
afterEach(() => {
delete process.env.RESTIC_HOSTNAME;
fsMock.readFileSync.mockReset();
osMock.hostname.mockReset();
});
test("uses the configured RESTIC_HOSTNAME when present", async () => {
process.env.RESTIC_HOSTNAME = "configured-host";
const { resolveResticHostname } = await import("../hostname");
expect(resolveResticHostname()).toBe("configured-host");
expect(fsMock.readFileSync).not.toHaveBeenCalled();
});
test("normalizes Docker container IDs to the stable Zerobyte hostname", async () => {
const containerId = "abc123".padEnd(64, "0");
fsMock.readFileSync.mockReturnValue(`123 456 0:1 / ${containerId} /etc/hostname rw - ext4 /dev/root rw`);
osMock.hostname.mockReturnValue("abc123");
const { resolveResticHostname } = await import("../hostname");
expect(resolveResticHostname()).toBe("zerobyte");
});
test("keeps non-container hostnames from mountinfo", async () => {
const containerId = "def456".padEnd(64, "0");
fsMock.readFileSync.mockReturnValue(`123 456 0:1 / ${containerId} /etc/hostname rw - ext4 /dev/root rw`);
osMock.hostname.mockReturnValue("backup-host");
const { resolveResticHostname } = await import("../hostname");
expect(resolveResticHostname()).toBe("backup-host");
});
test("uses the stable Zerobyte hostname when mountinfo is unavailable", async () => {
fsMock.readFileSync.mockImplementation(() => {
throw new Error("unavailable");
});
osMock.hostname.mockReturnValue("ephemeral-container-host");
const { resolveResticHostname } = await import("../hostname");
expect(resolveResticHostname()).toBe("zerobyte");
});
test("uses the stable Zerobyte hostname when hostname mount is missing", async () => {
fsMock.readFileSync.mockReturnValue("123 456 0:1 / / rw - overlay overlay rw");
osMock.hostname.mockReturnValue("ephemeral-container-host");
const { resolveResticHostname } = await import("../hostname");
expect(resolveResticHostname()).toBe("zerobyte");
});

View File

@@ -0,0 +1,20 @@
import path from "node:path";
import type { ResticDeps } from "@zerobyte/core/restic";
import { resolveResticHostname } from "./hostname";
const REPOSITORY_BASE = process.env.ZEROBYTE_REPOSITORIES_DIR || "/var/lib/zerobyte/repositories";
const RESTIC_CACHE_DIR = process.env.RESTIC_CACHE_DIR || "/var/lib/zerobyte/restic/cache";
const RESTIC_PASS_FILE = process.env.RESTIC_PASS_FILE || "/var/lib/zerobyte/data/restic.pass";
const RCLONE_CONFIG_DIR = process.env.RCLONE_CONFIG_DIR || "/root/.config/rclone";
const RCLONE_CONFIG_FILE = path.join(RCLONE_CONFIG_DIR, "rclone.conf");
const DEFAULT_EXCLUDES = [RESTIC_PASS_FILE, REPOSITORY_BASE];
export const resticDeps = (password: string): ResticDeps => ({
resolveSecret: async (encrypted) => encrypted,
getOrganizationResticPassword: async () => password,
resticCacheDir: RESTIC_CACHE_DIR,
resticPassFile: RESTIC_PASS_FILE,
defaultExcludes: DEFAULT_EXCLUDES,
hostname: resolveResticHostname(),
rcloneConfigFile: RCLONE_CONFIG_FILE,
});

View File

@@ -0,0 +1,27 @@
import { readFileSync } from "node:fs";
import os from "node:os";
export const resolveResticHostname = () => {
if (process.env.RESTIC_HOSTNAME) {
return process.env.RESTIC_HOSTNAME;
}
try {
const mountinfo = readFileSync("/proc/self/mountinfo", "utf-8");
const hostnameLine = mountinfo.split("\n").find((line) => line.includes(" /etc/hostname "));
if (hostnameLine) {
const hostname = os.hostname();
const containerIdMatch = hostnameLine.match(/[0-9a-f]{64}/);
const containerId = containerIdMatch ? containerIdMatch[0] : null;
if (containerId?.startsWith(hostname)) {
return "zerobyte";
}
return hostname || "zerobyte";
}
} catch {}
return "zerobyte";
};

View File

@@ -20,23 +20,17 @@ import {
const compressionModeSchema = z.enum(["off", "auto", "max"]) satisfies z.ZodType<CompressionMode>;
const backupExecutionOptionsSchema = z.object({
tags: z.array(z.string()).optional(),
oneFileSystem: z.boolean().optional(),
exclude: z.array(z.string()).optional(),
excludeIfPresent: z.array(z.string()).optional(),
includePaths: z.array(z.string()).optional(),
includePatterns: z.array(z.string()).optional(),
customResticParams: z.array(z.string()).optional(),
compressionMode: compressionModeSchema.optional(),
oneFileSystem: z.boolean(),
excludePatterns: z.array(z.string()).nullable(),
excludeIfPresent: z.array(z.string()).nullable(),
includePaths: z.array(z.string()).nullable(),
includePatterns: z.array(z.string()).nullable(),
customResticParams: z.array(z.string()).nullable(),
compressionMode: compressionModeSchema,
});
const backupRuntimeSchema = z.object({
password: z.string(),
cacheDir: z.string(),
passFile: z.string(),
defaultExcludes: z.array(z.string()),
hostname: z.string().optional(),
rcloneConfigFile: z.string(),
});
const backupRunSchema = z.object({
@@ -45,7 +39,6 @@ const backupRunSchema = z.object({
jobId: z.string(),
scheduleId: z.string(),
organizationId: z.string(),
sourcePath: z.string(),
volume: volumeSchema,
repositoryConfig: repositoryConfigSchema,
options: backupExecutionOptionsSchema,
@@ -110,7 +103,13 @@ const heartbeatPingSchema = z.object({
const agentReadySchema = z.object({
type: z.literal("agent.ready"),
payload: z.object({ agentId: z.string() }),
payload: z.object({
agentId: z.string(),
protocolVersion: z.number(),
hostname: z.string(),
platform: z.string(),
capabilities: z.record(z.string(), z.unknown()),
}),
});
const backupStartedSchema = z.object({