mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-02-20 07:45:30 -05:00
454 lines
14 KiB
TypeScript
454 lines
14 KiB
TypeScript
import { NotFoundError, BadRequestError, ConflictError } from "http-errors-enhanced";
|
|
import type { BackupSchedule, Volume, Repository } from "../../db/schema";
|
|
import { restic } from "../../utils/restic";
|
|
import { logger } from "../../utils/logger";
|
|
import { cache } from "../../utils/cache";
|
|
import { getVolumePath } from "../volumes/helpers";
|
|
import { toMessage } from "../../utils/errors";
|
|
import { serverEvents } from "../../core/events";
|
|
import { notificationsService } from "../notifications/notifications.service";
|
|
import { repoMutex } from "../../core/repository-mutex";
|
|
import { getOrganizationId } from "~/server/core/request-context";
|
|
import { scheduleQueries, mirrorQueries, repositoryQueries } from "./backups.queries";
|
|
import { calculateNextRun, createBackupOptions } from "./backup.helpers";
|
|
import type { ResticBackupOutputDto } from "~/schemas/restic-dto";
|
|
|
|
const runningBackups = new Map<number, AbortController>();
|
|
|
|
interface BackupContext {
|
|
schedule: BackupSchedule;
|
|
volume: Volume;
|
|
repository: Repository;
|
|
organizationId: string;
|
|
}
|
|
type ValidationSuccess = {
|
|
type: "success";
|
|
context: BackupContext;
|
|
};
|
|
type ValidationFailure = {
|
|
type: "failure";
|
|
error: Error;
|
|
partialContext?: Partial<BackupContext>;
|
|
};
|
|
type ValidationSkipped = {
|
|
type: "skipped";
|
|
reason: string;
|
|
};
|
|
type ValidationResult = ValidationSuccess | ValidationFailure | ValidationSkipped;
|
|
|
|
const validateBackupExecution = async (scheduleId: number, manual = false): Promise<ValidationResult> => {
|
|
const organizationId = getOrganizationId();
|
|
const result = await scheduleQueries.findById(scheduleId, organizationId);
|
|
|
|
if (!result) {
|
|
return { type: "failure", error: new NotFoundError("Backup schedule not found") };
|
|
}
|
|
|
|
const { volume, repository, ...schedule } = result;
|
|
|
|
if (!schedule) {
|
|
return { type: "failure", error: new NotFoundError("Backup schedule not found") };
|
|
}
|
|
|
|
if (!schedule.enabled && !manual) {
|
|
logger.info(`Backup schedule ${scheduleId} is disabled. Skipping execution.`);
|
|
return { type: "skipped", reason: "Backup schedule is disabled" };
|
|
}
|
|
|
|
if (schedule.lastBackupStatus === "in_progress") {
|
|
logger.info(`Backup schedule ${scheduleId} is already in progress. Skipping execution.`);
|
|
return { type: "skipped", reason: "Backup is already in progress" };
|
|
}
|
|
|
|
if (!volume) {
|
|
return { type: "failure", error: new NotFoundError("Volume not found"), partialContext: { schedule } };
|
|
}
|
|
|
|
if (!repository) {
|
|
return { type: "failure", error: new NotFoundError("Repository not found"), partialContext: { schedule, volume } };
|
|
}
|
|
|
|
if (volume.status !== "mounted") {
|
|
return {
|
|
type: "failure",
|
|
error: new BadRequestError("Volume is not mounted"),
|
|
partialContext: { schedule, volume, repository },
|
|
};
|
|
}
|
|
|
|
return {
|
|
type: "success",
|
|
context: { schedule, volume, repository, organizationId },
|
|
};
|
|
};
|
|
|
|
const emitBackupStarted = (ctx: BackupContext, scheduleId: number) => {
|
|
logger.info(
|
|
`Starting backup ${ctx.schedule.name} for volume ${ctx.volume.name} to repository ${ctx.repository.name}`,
|
|
);
|
|
|
|
serverEvents.emit("backup:started", {
|
|
organizationId: ctx.organizationId,
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
});
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, "start", {
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
scheduleName: ctx.schedule.name,
|
|
})
|
|
.catch((error) => {
|
|
logger.error(`Failed to send backup start notification: ${toMessage(error)}`);
|
|
});
|
|
};
|
|
|
|
const runBackupOperation = async (ctx: BackupContext, signal: AbortSignal) => {
|
|
const volumePath = getVolumePath(ctx.volume);
|
|
const backupOptions = createBackupOptions(ctx.schedule, volumePath, signal);
|
|
|
|
const releaseBackupLock = await repoMutex.acquireShared(ctx.repository.id, `backup:${ctx.volume.name}`, signal);
|
|
|
|
try {
|
|
const result = await restic.backup(ctx.repository.config, volumePath, {
|
|
...backupOptions,
|
|
compressionMode: ctx.repository.compressionMode ?? "auto",
|
|
organizationId: ctx.organizationId,
|
|
onProgress: (progress) => {
|
|
serverEvents.emit("backup:progress", {
|
|
organizationId: ctx.organizationId,
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
...progress,
|
|
});
|
|
},
|
|
});
|
|
return result;
|
|
} finally {
|
|
releaseBackupLock();
|
|
}
|
|
};
|
|
|
|
const finalizeSuccessfulBackup = async (
|
|
ctx: BackupContext,
|
|
scheduleId: number,
|
|
exitCode: number,
|
|
result: ResticBackupOutputDto | null,
|
|
) => {
|
|
const finalStatus = exitCode === 0 ? "success" : "warning";
|
|
|
|
if (ctx.schedule.retentionPolicy) {
|
|
void runForget(scheduleId).catch((error) => {
|
|
logger.error(`Failed to run retention policy for schedule ${scheduleId}: ${toMessage(error)}`);
|
|
});
|
|
}
|
|
|
|
void copyToMirrors(scheduleId, ctx.repository, ctx.schedule.retentionPolicy).catch((error) => {
|
|
logger.error(`Background mirror copy failed for schedule ${scheduleId}: ${toMessage(error)}`);
|
|
});
|
|
|
|
cache.delByPrefix(`snapshots:${ctx.repository.id}:`);
|
|
cache.del(`retention:${ctx.schedule.shortId}`);
|
|
|
|
const nextBackupAt = calculateNextRun(ctx.schedule.cronExpression);
|
|
await scheduleQueries.updateStatus(scheduleId, ctx.organizationId, {
|
|
lastBackupAt: Date.now(),
|
|
lastBackupStatus: finalStatus,
|
|
lastBackupError: null,
|
|
nextBackupAt,
|
|
});
|
|
|
|
if (finalStatus === "warning") {
|
|
logger.warn(
|
|
`Backup ${ctx.schedule.name} completed with warnings for volume ${ctx.volume.name} to repository ${ctx.repository.name}`,
|
|
);
|
|
} else {
|
|
logger.info(
|
|
`Backup ${ctx.schedule.name} completed successfully for volume ${ctx.volume.name} to repository ${ctx.repository.name}`,
|
|
);
|
|
}
|
|
|
|
serverEvents.emit("backup:completed", {
|
|
organizationId: ctx.organizationId,
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
status: finalStatus,
|
|
summary: result ?? undefined,
|
|
});
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, finalStatus, {
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
scheduleName: ctx.schedule.name,
|
|
summary: result ?? undefined,
|
|
})
|
|
.catch((error) => {
|
|
logger.error(`Failed to send backup success notification: ${toMessage(error)}`);
|
|
});
|
|
};
|
|
|
|
const handleValidationResult = async (scheduleId: number, result: ValidationFailure | ValidationSkipped) => {
|
|
const organizationId = getOrganizationId();
|
|
|
|
if (result.type === "skipped") {
|
|
logger.info(`Backup execution for schedule ${scheduleId} was skipped: ${result.reason}`);
|
|
return;
|
|
}
|
|
|
|
await handleBackupFailure(scheduleId, organizationId, result.error, result.partialContext);
|
|
};
|
|
|
|
const handleBackupFailure = async (
|
|
scheduleId: number,
|
|
organizationId: string,
|
|
error: unknown,
|
|
partialContext?: Partial<BackupContext>,
|
|
): Promise<void> => {
|
|
const errorMessage = toMessage(error);
|
|
|
|
await scheduleQueries.updateStatus(scheduleId, organizationId, {
|
|
lastBackupAt: Date.now(),
|
|
lastBackupStatus: "error",
|
|
lastBackupError: errorMessage,
|
|
});
|
|
|
|
if (partialContext?.schedule && partialContext?.volume && partialContext?.repository) {
|
|
const ctx = partialContext as BackupContext;
|
|
|
|
logger.error(
|
|
`Backup ${ctx.schedule.name} failed for volume ${ctx.volume.name} to repository ${ctx.repository.name}: ${errorMessage}`,
|
|
);
|
|
|
|
serverEvents.emit("backup:completed", {
|
|
organizationId,
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
status: "error",
|
|
});
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, "failure", {
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
scheduleName: ctx.schedule.name,
|
|
error: errorMessage,
|
|
})
|
|
.catch((notifError) => {
|
|
logger.error(`Failed to send backup failure notification: ${toMessage(notifError)}`);
|
|
});
|
|
}
|
|
};
|
|
|
|
const executeBackup = async (scheduleId: number, manual = false): Promise<void> => {
|
|
const result = await validateBackupExecution(scheduleId, manual);
|
|
|
|
if (result.type !== "success") {
|
|
return handleValidationResult(scheduleId, result);
|
|
}
|
|
|
|
const { context: ctx } = result;
|
|
emitBackupStarted(ctx, scheduleId);
|
|
|
|
const nextBackupAt = calculateNextRun(ctx.schedule.cronExpression);
|
|
|
|
await scheduleQueries.updateStatus(scheduleId, ctx.organizationId, {
|
|
lastBackupStatus: "in_progress",
|
|
lastBackupError: null,
|
|
nextBackupAt,
|
|
});
|
|
|
|
const abortController = new AbortController();
|
|
runningBackups.set(scheduleId, abortController);
|
|
|
|
try {
|
|
const backupResult = await runBackupOperation(ctx, abortController.signal);
|
|
await finalizeSuccessfulBackup(ctx, scheduleId, backupResult.exitCode, backupResult.result);
|
|
} catch (error) {
|
|
await handleBackupFailure(scheduleId, ctx.organizationId, error, ctx);
|
|
} finally {
|
|
runningBackups.delete(scheduleId);
|
|
}
|
|
};
|
|
|
|
const getSchedulesToExecute = async () => {
|
|
const organizationId = getOrganizationId();
|
|
return scheduleQueries.findExecutable(organizationId);
|
|
};
|
|
|
|
const stopBackup = async (scheduleId: number) => {
|
|
const organizationId = getOrganizationId();
|
|
const schedule = await scheduleQueries.findById(scheduleId, organizationId);
|
|
|
|
if (!schedule) {
|
|
throw new NotFoundError("Backup schedule not found");
|
|
}
|
|
|
|
try {
|
|
const abortController = runningBackups.get(scheduleId);
|
|
if (!abortController) {
|
|
throw new ConflictError("No backup is currently running for this schedule");
|
|
}
|
|
|
|
logger.info(`Stopping backup for schedule ${scheduleId}`);
|
|
abortController.abort();
|
|
} finally {
|
|
await scheduleQueries.updateStatus(scheduleId, organizationId, {
|
|
lastBackupStatus: "warning",
|
|
lastBackupError: "Backup was stopped by user",
|
|
});
|
|
}
|
|
};
|
|
|
|
const runForget = async (scheduleId: number, repositoryId?: string) => {
|
|
const organizationId = getOrganizationId();
|
|
const schedule = await scheduleQueries.findById(scheduleId, organizationId);
|
|
|
|
if (!schedule) {
|
|
throw new NotFoundError("Backup schedule not found");
|
|
}
|
|
|
|
if (!schedule.retentionPolicy) {
|
|
throw new BadRequestError("No retention policy configured for this schedule");
|
|
}
|
|
|
|
const repository = await repositoryQueries.findById(repositoryId ?? schedule.repositoryId, organizationId);
|
|
|
|
if (!repository) {
|
|
throw new NotFoundError("Repository not found");
|
|
}
|
|
|
|
logger.info(`running retention policy (forget) for schedule ${scheduleId}`);
|
|
const releaseLock = await repoMutex.acquireExclusive(repository.id, `forget:${scheduleId}`);
|
|
|
|
try {
|
|
await restic.forget(repository.config, schedule.retentionPolicy, { tag: schedule.shortId, organizationId });
|
|
cache.delByPrefix(`snapshots:${repository.id}:`);
|
|
cache.del(`retention:${schedule.shortId}`);
|
|
} finally {
|
|
releaseLock();
|
|
}
|
|
|
|
logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
|
|
};
|
|
|
|
const copyToMirrors = async (
|
|
scheduleId: number,
|
|
sourceRepository: Repository,
|
|
retentionPolicy: BackupSchedule["retentionPolicy"],
|
|
) => {
|
|
const organizationId = getOrganizationId();
|
|
const schedule = await scheduleQueries.findById(scheduleId, organizationId);
|
|
|
|
if (!schedule) {
|
|
throw new NotFoundError("Backup schedule not found");
|
|
}
|
|
|
|
const mirrors = await mirrorQueries.findEnabledBySchedule(scheduleId);
|
|
|
|
if (mirrors.length === 0) {
|
|
return;
|
|
}
|
|
|
|
logger.info(`[Background] Copying snapshots to ${mirrors.length} mirror repositories for schedule ${scheduleId}`);
|
|
|
|
for (const mirror of mirrors) {
|
|
await copyToSingleMirror(scheduleId, schedule, sourceRepository, mirror, retentionPolicy, organizationId);
|
|
}
|
|
};
|
|
|
|
const copyToSingleMirror = async (
|
|
scheduleId: number,
|
|
schedule: BackupSchedule,
|
|
sourceRepository: Repository,
|
|
mirror: {
|
|
repositoryId: string;
|
|
repository: Repository;
|
|
},
|
|
retentionPolicy: BackupSchedule["retentionPolicy"],
|
|
organizationId: string,
|
|
) => {
|
|
try {
|
|
logger.info(`[Background] Copying to mirror repository: ${mirror.repository.name}`);
|
|
|
|
serverEvents.emit("mirror:started", {
|
|
organizationId,
|
|
scheduleId: schedule.shortId,
|
|
repositoryId: mirror.repository.shortId,
|
|
repositoryName: mirror.repository.name,
|
|
});
|
|
|
|
await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
|
|
lastCopyStatus: "in_progress",
|
|
lastCopyError: null,
|
|
});
|
|
|
|
const releaseSource = await repoMutex.acquireShared(sourceRepository.id, `mirror_source:${scheduleId}`);
|
|
const releaseMirror = await repoMutex.acquireShared(mirror.repository.id, `mirror:${scheduleId}`);
|
|
|
|
try {
|
|
await restic.copy(sourceRepository.config, mirror.repository.config, { tag: schedule.shortId, organizationId });
|
|
cache.delByPrefix(`snapshots:${mirror.repository.id}:`);
|
|
} finally {
|
|
releaseSource();
|
|
releaseMirror();
|
|
}
|
|
|
|
if (retentionPolicy) {
|
|
void runForget(scheduleId, mirror.repository.id).catch((error) => {
|
|
logger.error(
|
|
`Failed to run retention policy for mirror repository ${mirror.repository.name}: ${toMessage(error)}`,
|
|
);
|
|
});
|
|
}
|
|
|
|
await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
|
|
lastCopyAt: Date.now(),
|
|
lastCopyStatus: "success",
|
|
lastCopyError: null,
|
|
});
|
|
|
|
logger.info(`[Background] Successfully copied to mirror repository: ${mirror.repository.name}`);
|
|
|
|
serverEvents.emit("mirror:completed", {
|
|
organizationId,
|
|
scheduleId: schedule.shortId,
|
|
repositoryId: mirror.repository.shortId,
|
|
repositoryName: mirror.repository.name,
|
|
status: "success",
|
|
});
|
|
} catch (error) {
|
|
const errorMessage = toMessage(error);
|
|
logger.error(`[Background] Failed to copy to mirror repository ${mirror.repository.name}: ${errorMessage}`);
|
|
|
|
await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
|
|
lastCopyAt: Date.now(),
|
|
lastCopyStatus: "error",
|
|
lastCopyError: errorMessage,
|
|
});
|
|
|
|
serverEvents.emit("mirror:completed", {
|
|
organizationId,
|
|
scheduleId: schedule.shortId,
|
|
repositoryId: mirror.repository.shortId,
|
|
repositoryName: mirror.repository.name,
|
|
status: "error",
|
|
error: errorMessage,
|
|
});
|
|
}
|
|
};
|
|
|
|
export const backupsExecutionService = {
|
|
executeBackup,
|
|
validateBackupExecution,
|
|
getSchedulesToExecute,
|
|
stopBackup,
|
|
runForget,
|
|
copyToMirrors,
|
|
};
|