mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-04-17 21:37:06 -04:00
* feat(agent): add standalone agent runtime
* fix(agent): add Bun and DOM types to agent tsconfig
* refactor: wrap backup error in a tagged effect error
* feat(controller): add agent manager and session handling
* feat(backups): execute backups through the agent
* fix(agent): harden disconnect and send-failure handling
* fix: rebase conflicts
* test: simplify mocks
* refactor: split agent runtime state
* fix(backup): keep old path when agent is disabled
* fix: pr feedbacks
156 lines
4.6 KiB
TypeScript
156 lines
4.6 KiB
TypeScript
import { Effect } from "effect";
|
|
import type { BackupSchedule, Volume, Repository } from "../../db/schema";
|
|
import { config } from "../../core/config";
|
|
import { restic, resticDeps } from "../../core/restic";
|
|
import type { BackupRunPayload } from "@zerobyte/contracts/agent-protocol";
|
|
import { agentManager, type BackupExecutionProgress } from "../agents/agents-manager";
|
|
import { getVolumePath } from "../volumes/helpers";
|
|
import { decryptRepositoryConfig } from "../repositories/repository-config-secrets";
|
|
import { createBackupOptions } from "./backup.helpers";
|
|
import { toErrorDetails } from "../../utils/errors";
|
|
|
|
// Agent identifier passed to `agentManager.runBackup` / `agentManager.cancelBackup`
// for every backup started or cancelled from this module.
const LOCAL_AGENT_ID = "local";
|
|
|
|
/**
 * Everything `backupExecutor.execute` needs to run one backup.
 */
type BackupExecutionRequest = {
  /** Numeric schedule id; also the key under which the abort controller is tracked. */
  scheduleId: number;
  /** Organization whose restic password is looked up for the run. */
  organizationId: string;

  /** Schedule used to derive the backup options. */
  schedule: BackupSchedule;
  /** Volume whose path becomes the backup source path. */
  volume: Volume;
  /** Target repository; its stored config is decrypted before use. */
  repository: Repository;

  /** Abort signal; `execute` requires it to be the signal of the tracked controller. */
  signal: AbortSignal;
  /** Progress callback forwarded to whichever backend performs the backup. */
  onProgress: (progress: BackupExecutionProgress) => void;
};
|
|
|
|
// Type-only re-export of the agent manager's result type (presumably so callers
// of this module can import it from here — confirm against usages).
export type { BackupExecutionResult } from "../agents/agents-manager";
|
|
|
|
// Abort controllers registered via `backupExecutor.track`, keyed by schedule id.
// `execute` refuses to run unless the request's signal belongs to the entry stored here.
const activeControllersByScheduleId = new Map<number, AbortController>();
|
|
|
|
/**
 * Builds the `BackupRunPayload` for a single backup job.
 *
 * Resolves the source path from the volume, derives the backup options from the
 * schedule, decrypts the repository configuration and resolves the
 * organization's restic password, then bundles these with the runtime values
 * read from `resticDeps`.
 *
 * @param request - The execution request plus the job id generated for this run.
 * @returns The payload handed to the agent manager (or to the in-process fallback).
 */
const createBackupRunPayload = async (
  request: BackupExecutionRequest & { jobId: string },
): Promise<BackupRunPayload> => {
  const { jobId, schedule, volume, repository, organizationId } = request;

  const sourcePath = getVolumePath(volume);

  // Drop `signal` from the generated options; it is not part of the payload.
  const { signal: _signal, ...backupOptions } = createBackupOptions(schedule, sourcePath);

  const repositoryConfig = await decryptRepositoryConfig(repository.config);
  const storedPassword = await resticDeps.getOrganizationResticPassword(organizationId);
  const password = await resticDeps.resolveSecret(storedPassword);

  const options = {
    ...backupOptions,
    // Use "auto" when the repository does not specify a compression mode.
    compressionMode: repository.compressionMode ?? "auto",
  };

  const runtime = {
    password,
    cacheDir: resticDeps.resticCacheDir,
    passFile: resticDeps.resticPassFile,
    defaultExcludes: resticDeps.defaultExcludes,
    rcloneConfigFile: resticDeps.rcloneConfigFile,
    hostname: resticDeps.hostname,
  };

  return {
    jobId,
    scheduleId: schedule.shortId,
    organizationId,
    sourcePath,
    repositoryConfig,
    options,
    runtime,
  };
};
|
|
|
|
/**
 * Runs a backup in-process through `restic.backup` instead of through an agent.
 *
 * Never rejects: failures surfaced by the Effect pipeline and anything thrown
 * while running it are both reported as a `{ status: "failed" }` result.
 *
 * @param payload - The prepared backup payload.
 * @param param1 - The abort signal and progress callback taken from the request.
 * @returns `{ status: "completed", exitCode, result, warningDetails }` on success,
 *          otherwise `{ status: "failed", error }`.
 */
const executeBackupWithoutAgent = async (
  payload: BackupRunPayload,
  { signal, onProgress }: Pick<BackupExecutionRequest, "signal" | "onProgress">,
) => {
  // Single place that shapes a failure result from any thrown/failed value.
  const failed = (cause: unknown) => ({
    status: "failed" as const,
    error: toErrorDetails(cause),
  });

  try {
    const backupEffect = restic.backup(payload.repositoryConfig, payload.sourcePath, {
      ...payload.options,
      organizationId: payload.organizationId,
      signal,
      onProgress,
    });

    // Fold the Effect's failure channel into a tagged value so it can be inspected
    // without relying on `runPromise` rejecting.
    const outcome = await Effect.runPromise(
      backupEffect.pipe(
        Effect.map((result) => ({ success: true as const, result })),
        Effect.catchAll((error) => Effect.succeed({ success: false as const, error })),
      ),
    );

    if (outcome.success) {
      const { exitCode, result, warningDetails } = outcome.result;
      return { status: "completed" as const, exitCode, result, warningDetails };
    }

    return failed(outcome.error);
  } catch (error) {
    // Anything that escaped the pipeline above (i.e. `runPromise` rejected or a call threw).
    return failed(error);
  }
};
|
|
|
|
/**
 * Entry points for running and cancelling schedule backups.
 *
 * Expected call order: `track` -> `execute` (with the tracked controller's signal)
 * -> `untrack`. `cancel` may be called at any time for a tracked schedule.
 */
export const backupExecutor = {
  /**
   * Creates an abort controller for the schedule and stores it, replacing any
   * controller previously stored under the same schedule id.
   */
  track: (scheduleId: number) => {
    const controller = new AbortController();
    activeControllersByScheduleId.set(scheduleId, controller);
    return controller;
  },

  /**
   * Removes the stored controller for the schedule, but only if it is the very
   * controller passed in; a different controller stored under the id is left alone.
   */
  untrack: (scheduleId: number, abortController: AbortController) => {
    const current = activeControllersByScheduleId.get(scheduleId);
    if (current !== abortController) {
      return;
    }
    activeControllersByScheduleId.delete(scheduleId);
  },

  /**
   * Executes a backup for a tracked schedule.
   *
   * The backup is sent to the local agent via `agentManager.runBackup`. When that
   * reports `"unavailable"` and `config.flags.enableLocalAgent` is off, the backup
   * is run in-process instead.
   *
   * @throws Error when the request's signal is not the tracked controller's signal.
   * @throws The signal's abort reason (or a generic `Error`) when the signal is
   *         already aborted before, or right after, the payload is prepared.
   */
  execute: async (request: Omit<BackupExecutionRequest, "jobId">) => {
    const { scheduleId, signal, onProgress } = request;

    const tracked = activeControllersByScheduleId.get(scheduleId);
    const isTracked = tracked !== undefined && tracked.signal === signal;
    if (!isTracked) {
      throw new Error(`Backup execution for schedule ${scheduleId} was not tracked`);
    }

    const throwIfAborted = () => {
      if (signal.aborted) {
        throw signal.reason || new Error("Operation aborted");
      }
    };

    throwIfAborted();

    const jobId = Bun.randomUUIDv7();
    const payload = await createBackupRunPayload({ ...request, jobId });

    // Preparing the payload is asynchronous, so re-check for a cancellation that
    // arrived in the meantime.
    throwIfAborted();

    const agentResult = await agentManager.runBackup(LOCAL_AGENT_ID, {
      scheduleId,
      payload,
      signal,
      onProgress,
    });

    const agentUnavailable = agentResult.status === "unavailable";
    if (agentUnavailable && !config.flags.enableLocalAgent) {
      return executeBackupWithoutAgent(payload, request);
    }

    return agentResult;
  },

  /**
   * Aborts the tracked controller for the schedule and asks the agent manager to
   * cancel the backup.
   *
   * @returns `false` when nothing is tracked for the schedule, `true` otherwise.
   */
  cancel: async (scheduleId: number) => {
    const controller = activeControllersByScheduleId.get(scheduleId);
    if (controller === undefined) {
      return false;
    }

    controller.abort();
    await agentManager.cancelBackup(LOCAL_AGENT_ID, scheduleId);
    return true;
  },
};
|