feat: route backups through volume agents (#860)

This commit is contained in:
Nico
2026-05-06 22:13:51 +02:00
committed by GitHub
parent e65a135676
commit 700fa1cd4a
10 changed files with 2596 additions and 15 deletions

View File

@@ -0,0 +1,2 @@
ALTER TABLE `volumes_table` ADD `agent_id` text DEFAULT 'local' NOT NULL;--> statement-breakpoint
CREATE INDEX `volumes_table_agent_id_idx` ON `volumes_table` (`agent_id`);

View File

File diff suppressed because it is too large Load Diff

View File

@@ -13,6 +13,7 @@ import type { BackupWebhooks } from "@zerobyte/core/backup-hooks";
import type { BackendConfig, BackendStatus, BackendType } from "~/schemas/volumes";
import type { NotificationConfig, NotificationType } from "~/schemas/notifications";
import type { ShortId } from "~/server/utils/branded";
import { LOCAL_AGENT_ID } from "../modules/agents/constants";
/**
* Users Table
@@ -253,12 +254,14 @@ export const volumesTable = sqliteTable(
.default(sql`(unixepoch() * 1000)`),
config: text("config", { mode: "json" }).$type<BackendConfig>().notNull(),
autoRemount: int("auto_remount", { mode: "boolean" }).notNull().default(true),
agentId: text("agent_id").notNull().default(LOCAL_AGENT_ID),
organizationId: text("organization_id")
.notNull()
.references(() => organization.id, { onDelete: "cascade" }),
},
(table) => [
unique().on(table.name, table.organizationId),
index("volumes_table_agent_id_idx").on(table.agentId),
uniqueIndex("volumes_table_org_provisioning_id_uidx").on(table.organizationId, table.provisioningId),
],
);

View File

@@ -421,6 +421,50 @@ describe("backup execution - validation failures", () => {
});
});
describe("backup execution - routing", () => {
test("fails local repository backups on non-local volume agents", async () => {
const { runBackupMock } = setup();
const volume = await createTestVolume({ agentId: "agent-remote" });
const repository = await createTestRepository();
const schedule = await createTestBackupSchedule({
volumeId: volume.id,
repositoryId: repository.id,
});
await backupsService.executeBackup(schedule.id);
const updatedSchedule = await getScheduleByIdOrShortId(schedule.id);
expect(updatedSchedule.lastBackupStatus).toBe("error");
expect(updatedSchedule.lastBackupError).toBe(
`Local repository "${repository.name}" can only be used with the local agent`,
);
expect(runBackupMock).not.toHaveBeenCalled();
});
test("routes remote repository backups through the owning volume agent", async () => {
const { runBackupMock } = setup();
const volume = await createTestVolume({ agentId: "agent-remote" });
const repository = await createTestRepository({
type: "s3",
config: {
backend: "s3",
endpoint: "https://s3.amazonaws.com",
bucket: "bucket-name",
accessKeyId: "access-key",
secretAccessKey: "secret-key",
},
});
const schedule = await createTestBackupSchedule({
volumeId: volume.id,
repositoryId: repository.id,
});
await backupsService.executeBackup(schedule.id);
expect(runBackupMock).toHaveBeenCalledWith("agent-remote", expect.objectContaining({ scheduleId: schedule.id }));
});
});
describe("stop backup", () => {
test("should keep restic warning details when backup completes with read errors", async () => {
const { resticBackupMock } = setup();

View File

@@ -10,6 +10,7 @@ import { getVolumePath } from "../volumes/helpers";
import { decryptRepositoryConfig } from "../repositories/repository-config-secrets";
import { createBackupOptions } from "./backup.helpers";
import { toErrorDetails } from "../../utils/errors";
import { BadRequestError } from "http-errors-enhanced";
const FUSE_VOLUME_BACKENDS = new Set<Volume["type"]>(["rclone", "sftp", "webdav"]);
const IGNORE_INODE_FLAG = "--ignore-inode";
@@ -23,7 +24,17 @@ type BackupExecutionRequest = {
onProgress: (progress: BackupExecutionProgress) => void;
};
const activeControllersByScheduleId = new Map<number, AbortController>();
export type { BackupExecutionResult } from "../agents/agents-manager";
const activeControllersByScheduleId = new Map<number, { abortController: AbortController; agentId: string | null }>();
const getBackupExecutionAgentId = (volume: Volume, repository: Repository) => {
if (repository.type === "local" && volume.agentId !== LOCAL_AGENT_ID) {
throw new BadRequestError(`Local repository "${repository.name}" can only be used with the local agent`);
}
return volume.agentId;
};
const createBackupRunPayload = async ({
jobId,
@@ -93,17 +104,17 @@ const executeBackupWithoutAgent = async (
export const backupExecutor = {
track: (scheduleId: number) => {
const abortController = new AbortController();
activeControllersByScheduleId.set(scheduleId, abortController);
activeControllersByScheduleId.set(scheduleId, { abortController, agentId: null });
return abortController;
},
untrack: (scheduleId: number, abortController: AbortController) => {
if (activeControllersByScheduleId.get(scheduleId) === abortController) {
if (activeControllersByScheduleId.get(scheduleId)?.abortController === abortController) {
activeControllersByScheduleId.delete(scheduleId);
}
},
execute: async (request: Omit<BackupExecutionRequest, "jobId">) => {
const trackedAbortController = activeControllersByScheduleId.get(request.scheduleId);
if (!trackedAbortController || trackedAbortController.signal !== request.signal) {
const trackedExecution = activeControllersByScheduleId.get(request.scheduleId);
if (!trackedExecution || trackedExecution.abortController.signal !== request.signal) {
throw new Error(`Backup execution for schedule ${request.scheduleId} was not tracked`);
}
@@ -119,27 +130,38 @@ export const backupExecutor = {
throw request.signal.reason || new Error("Operation aborted");
}
const executionResult = await agentManager.runBackup(LOCAL_AGENT_ID, {
const executionAgentId = getBackupExecutionAgentId(request.volume, request.repository);
trackedExecution.agentId = executionAgentId;
const executionResult = await agentManager.runBackup(executionAgentId, {
scheduleId: request.scheduleId,
payload,
signal: request.signal,
onProgress: request.onProgress,
});
if (executionResult.status === "unavailable" && !config.flags.enableLocalAgent) {
if (
executionResult.status === "unavailable" &&
executionAgentId === LOCAL_AGENT_ID &&
!config.flags.enableLocalAgent
) {
return executeBackupWithoutAgent(payload, request);
}
return executionResult;
},
cancel: async (scheduleId: number) => {
const abortController = activeControllersByScheduleId.get(scheduleId);
if (!abortController) {
const trackedExecution = activeControllersByScheduleId.get(scheduleId);
if (!trackedExecution) {
return false;
}
abortController.abort();
await agentManager.cancelBackup(LOCAL_AGENT_ID, scheduleId);
trackedExecution.abortController.abort();
if (!trackedExecution.agentId) {
return true;
}
await agentManager.cancelBackup(trackedExecution.agentId, scheduleId);
return true;
},
};

View File

@@ -29,7 +29,6 @@ import { copyToMirrors, runForget, syncSnapshotsToMirror } from "./helpers/backu
import { restic } from "../../core/restic";
import { mirrorQueries } from "./backups.queries";
import { toMessage } from "../../utils/errors";
const listSchedules = async () => {
const organizationId = getOrganizationId();
const schedules = await db.query.backupSchedulesTable.findMany({

View File

@@ -13,6 +13,7 @@ import { config as appConfig } from "~/server/core/config";
import { restic } from "~/server/core/restic";
import { db } from "~/server/db/db";
import { repositoriesTable, volumesTable } from "~/server/db/schema";
import { LOCAL_AGENT_ID } from "~/server/modules/agents/constants";
import { mapRepositoryConfigSecrets } from "~/server/modules/repositories/repository-config-secrets";
import { mapVolumeConfigSecrets } from "~/server/modules/volumes/volume-config-secrets";
import { BACKEND_TYPES, volumeConfigSchema, type BackendConfig } from "~/schemas/volumes";
@@ -154,7 +155,6 @@ const syncProvisionedRepositories = async (repositories: ProvisionedRepository[]
const existing = existingRepositories.find((r) => r.provisioningId === provisioningId);
const encryptedConfig = await encryptProvisionedRepositoryConfig(repository.config);
if (!existing) {
const id = Bun.randomUUIDv7();
@@ -228,6 +228,7 @@ const syncProvisionedVolumes = async (volumes: ProvisionedVolume[]) => {
type: volume.backend,
config: await encryptProvisionedVolumeConfig(volume.config),
autoRemount: volume.autoRemount,
agentId: LOCAL_AGENT_ID,
status: volume.autoRemount ? "mounted" : "unmounted",
organizationId: volume.organizationId,
});
@@ -239,6 +240,7 @@ const syncProvisionedVolumes = async (volumes: ProvisionedVolume[]) => {
type: volume.backend,
config: await encryptProvisionedVolumeConfig(volume.config),
autoRemount: volume.autoRemount,
agentId: LOCAL_AGENT_ID,
organizationId: volume.organizationId,
updatedAt: Date.now(),
};

View File

@@ -73,7 +73,6 @@ const createRepository = async (name: string, config: RepositoryConfig, compress
const organizationId = getOrganizationId();
const id = Bun.randomUUIDv7();
const shortId = generateShortId();
if (config.backend === "local" && !config.isExistingRepository) {
config.path = `${config.path}/${shortId}`;
}
@@ -717,7 +716,6 @@ const updateRepository = async (shortId: ShortId, updates: UpdateRepositoryBody)
const decryptedExisting = await decryptRepositoryConfig(existingConfig);
const configChanged = updates.config && JSON.stringify(decryptedExisting) !== JSON.stringify(parsedConfig);
const encryptedConfig = updates.config ? await encryptRepositoryConfig(parsedConfig) : existingConfig;
const updatedAt = Date.now();
const updatePayload = {
name: newName,

View File

@@ -10,6 +10,7 @@ import { generateShortId } from "../../utils/id";
import { getStatFs, type StatFs } from "../../utils/mountinfo";
import { withTimeout } from "../../utils/timeout";
import { createVolumeBackend } from "../backends/backend";
import { LOCAL_AGENT_ID } from "../agents/constants";
import type { UpdateVolumeBody } from "./volume.dto";
import { getVolumePath } from "./helpers";
import { logger } from "@zerobyte/core/node";
@@ -70,6 +71,7 @@ const createVolume = async (name: string, backendConfig: BackendConfig) => {
name: trimmedName,
config: encryptedConfig,
type: backendConfig.backend,
agentId: LOCAL_AGENT_ID,
organizationId,
})
.returning();
@@ -239,6 +241,7 @@ const testConnection = async (backendConfig: BackendConfig) => {
id: 0,
shortId: asShortId("test"),
name: "test-connection",
path: tempDir,
config: encryptedConfig,
createdAt: Date.now(),
updatedAt: Date.now(),
@@ -248,6 +251,7 @@ const testConnection = async (backendConfig: BackendConfig) => {
lastError: null,
provisioningId: null,
autoRemount: true,
agentId: LOCAL_AGENT_ID,
organizationId: "test-org",
};

View File

@@ -1,6 +1,7 @@
import { db } from "~/server/db/db";
import { faker } from "@faker-js/faker";
import { volumesTable, type VolumeInsert } from "~/server/db/schema";
import { LOCAL_AGENT_ID } from "~/server/modules/agents/constants";
import { ensureTestOrganization, TEST_ORG_ID } from "./organization";
import { generateShortId } from "~/server/utils/id";
@@ -17,6 +18,7 @@ export const createTestVolume = async (overrides: Partial<VolumeInsert> = {}) =>
autoRemount: true,
shortId: generateShortId(),
type: "directory",
agentId: LOCAL_AGENT_ID,
organizationId: TEST_ORG_ID,
...overrides,
};