mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-06-04 06:28:45 -04:00
342 lines
11 KiB
TypeScript
342 lines
11 KiB
TypeScript
import { BadRequestError, NotFoundError } from "http-errors-enhanced";
|
|
import { logger } from "@zerobyte/core/node";
|
|
import type { ResticBackupOutputDto } from "@zerobyte/core/restic";
|
|
import type { BackupSchedule, Repository, Volume } from "../../../db/schema";
|
|
import { serverEvents } from "../../../core/events";
|
|
import { cache, cacheKeys } from "../../../utils/cache";
|
|
import { toErrorDetails, toMessage } from "../../../utils/errors";
|
|
import { notificationsService } from "../../notifications/notifications.service";
|
|
import { getOrganizationId } from "~/server/core/request-context";
|
|
import type { BackupProgressEventDto } from "~/schemas/events-dto";
|
|
import { calculateNextRun } from "../backup.helpers";
|
|
import { scheduleQueries } from "../backups.queries";
|
|
import type { BackupExecutionProgress } from "../../agents/agents-manager";
|
|
import { repositoriesService } from "../../repositories/repositories.service";
|
|
import { volumeService } from "../../volumes/volume.service";
|
|
import { config } from "../../../core/config";
|
|
import { LOCAL_AGENT_ID } from "../../agents/constants";
|
|
import { copyToMirrors, runForget } from "./backup-maintenance";
|
|
|
|
interface BackupContext {
|
|
schedule: BackupSchedule;
|
|
volume: Volume;
|
|
repository: Repository;
|
|
organizationId: string;
|
|
}
|
|
|
|
type ValidationSuccess = {
|
|
type: "success";
|
|
context: BackupContext;
|
|
};
|
|
|
|
type ValidationFailure = {
|
|
type: "failure";
|
|
error: Error;
|
|
partialContext?: Partial<BackupContext>;
|
|
};
|
|
|
|
type ValidationSkipped = {
|
|
type: "skipped";
|
|
reason: string;
|
|
};
|
|
|
|
type ValidationResult = ValidationSuccess | ValidationFailure | ValidationSkipped;
|
|
|
|
const requiresControllerLocalVolumeReadiness = (volume: Volume) =>
|
|
volume.agentId === LOCAL_AGENT_ID && !config.flags.enableLocalAgent;
|
|
|
|
export function getBackupProgress(scheduleId: number): BackupProgressEventDto | undefined {
|
|
return cache.get<BackupProgressEventDto>(cacheKeys.backup.progress(scheduleId));
|
|
}
|
|
|
|
export async function validateBackupExecution(scheduleId: number, manual = false): Promise<ValidationResult> {
|
|
const organizationId = getOrganizationId();
|
|
const result = await scheduleQueries.findById(scheduleId, organizationId);
|
|
|
|
if (!result) {
|
|
return { type: "failure", error: new NotFoundError("Backup schedule not found") };
|
|
}
|
|
|
|
const { volume, repository, ...schedule } = result;
|
|
|
|
if (!schedule.enabled && !manual) {
|
|
logger.info(`Backup schedule ${scheduleId} is disabled. Skipping execution.`);
|
|
return { type: "skipped", reason: "Backup schedule is disabled" };
|
|
}
|
|
|
|
if (schedule.lastBackupStatus === "in_progress") {
|
|
logger.info(`Backup schedule ${scheduleId} is already in progress. Skipping execution.`);
|
|
return { type: "skipped", reason: "Backup is already in progress" };
|
|
}
|
|
|
|
if (!volume) {
|
|
return { type: "failure", error: new NotFoundError("Volume not found"), partialContext: { schedule } };
|
|
}
|
|
|
|
if (!repository) {
|
|
return { type: "failure", error: new NotFoundError("Repository not found"), partialContext: { schedule, volume } };
|
|
}
|
|
|
|
if (!requiresControllerLocalVolumeReadiness(volume)) {
|
|
return {
|
|
type: "success",
|
|
context: { schedule, volume, repository, organizationId },
|
|
};
|
|
}
|
|
|
|
const volumeReadiness = await volumeService.ensureHealthyVolume(volume.shortId);
|
|
|
|
if (!volumeReadiness.ready) {
|
|
return {
|
|
type: "failure",
|
|
error: new BadRequestError(volumeReadiness.reason),
|
|
partialContext: { schedule, volume: volumeReadiness.volume, repository },
|
|
};
|
|
}
|
|
|
|
return {
|
|
type: "success",
|
|
context: { schedule, volume: volumeReadiness.volume, repository, organizationId },
|
|
};
|
|
}
|
|
|
|
export async function handleValidationResult(
|
|
scheduleId: number,
|
|
result: ValidationFailure | ValidationSkipped,
|
|
manual: boolean,
|
|
) {
|
|
const organizationId = getOrganizationId();
|
|
|
|
if (result.type === "skipped") {
|
|
logger.info(`Backup execution for schedule ${scheduleId} was skipped: ${result.reason}`);
|
|
return;
|
|
}
|
|
|
|
await handleBackupFailure(scheduleId, organizationId, result.error, manual, result.partialContext);
|
|
}
|
|
|
|
export function emitBackupStarted(ctx: BackupContext, scheduleId: number) {
|
|
logger.info(
|
|
`Starting backup ${ctx.schedule.name} for volume ${ctx.volume.name} to repository ${ctx.repository.name}`,
|
|
);
|
|
|
|
serverEvents.emit("backup:started", {
|
|
organizationId: ctx.organizationId,
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
});
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, "start", {
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
scheduleName: ctx.schedule.name,
|
|
})
|
|
.catch((error) => {
|
|
logger.error(`Failed to send backup start notification: ${toMessage(error)}`);
|
|
});
|
|
}
|
|
|
|
export function updateBackupProgress(ctx: BackupContext, progress: BackupExecutionProgress) {
|
|
const progressEvent = {
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
...progress,
|
|
};
|
|
|
|
cache.set(cacheKeys.backup.progress(ctx.schedule.id), progressEvent, 60 * 60);
|
|
|
|
serverEvents.emit("backup:progress", {
|
|
organizationId: ctx.organizationId,
|
|
...progressEvent,
|
|
});
|
|
}
|
|
|
|
export async function finalizeSuccessfulBackup(
|
|
ctx: BackupContext,
|
|
exitCode: number,
|
|
result: ResticBackupOutputDto | null,
|
|
warningDetails: string | null,
|
|
) {
|
|
const scheduleId = ctx.schedule.id;
|
|
const finalStatus = exitCode === 0 && !warningDetails ? "success" : "warning";
|
|
|
|
if (ctx.schedule.retentionPolicy) {
|
|
void runForget(scheduleId, undefined, ctx.organizationId).catch((error) => {
|
|
logger.error(`Failed to run retention policy for schedule ${scheduleId}: ${toMessage(error)}`);
|
|
});
|
|
}
|
|
|
|
void copyToMirrors(scheduleId, ctx.repository, ctx.schedule.retentionPolicy, ctx.organizationId).catch((error) => {
|
|
logger.error(`Background mirror copy failed for schedule ${scheduleId}: ${toMessage(error)}`);
|
|
});
|
|
|
|
cache.delByPrefix(cacheKeys.repository.all(ctx.repository.id));
|
|
|
|
void repositoriesService.refreshRepositoryStats(ctx.repository.shortId).catch((error) => {
|
|
logger.error(
|
|
`Background repository stats refresh failed for schedule ${scheduleId} (${ctx.repository.shortId}): ${toMessage(error)}`,
|
|
);
|
|
});
|
|
|
|
await scheduleQueries.updateStatus(scheduleId, ctx.organizationId, {
|
|
lastBackupAt: Date.now(),
|
|
lastBackupStatus: finalStatus,
|
|
lastBackupError: finalStatus === "warning" ? warningDetails : null,
|
|
nextBackupAt: ctx.schedule.cronExpression ? calculateNextRun(ctx.schedule.cronExpression) : null,
|
|
failureRetryCount: 0,
|
|
});
|
|
|
|
if (finalStatus === "warning") {
|
|
logger.warn(
|
|
`Backup ${ctx.schedule.name} completed with warnings for volume ${ctx.volume.name} to repository ${ctx.repository.name}`,
|
|
);
|
|
} else {
|
|
logger.info(
|
|
`Backup ${ctx.schedule.name} completed successfully for volume ${ctx.volume.name} to repository ${ctx.repository.name}`,
|
|
);
|
|
}
|
|
|
|
serverEvents.emit("backup:completed", {
|
|
organizationId: ctx.organizationId,
|
|
scheduleId: ctx.schedule.shortId,
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
status: finalStatus,
|
|
summary: result ?? undefined,
|
|
});
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, finalStatus, {
|
|
volumeName: ctx.volume.name,
|
|
repositoryName: ctx.repository.name,
|
|
scheduleName: ctx.schedule.name,
|
|
error: finalStatus === "warning" ? (warningDetails ?? undefined) : undefined,
|
|
summary: result ?? undefined,
|
|
})
|
|
.catch((error) => {
|
|
logger.error(`Failed to send backup success notification: ${toMessage(error)}`);
|
|
});
|
|
}
|
|
|
|
export async function handleBackupFailure(
|
|
scheduleId: number,
|
|
organizationId: string,
|
|
error: unknown,
|
|
manual: boolean,
|
|
partialContext?: Partial<BackupContext>,
|
|
) {
|
|
const errorMessage = toMessage(error);
|
|
const errorDetails = toErrorDetails(error);
|
|
|
|
await scheduleQueries.updateStatus(scheduleId, organizationId, {
|
|
lastBackupAt: Date.now(),
|
|
lastBackupStatus: "error",
|
|
lastBackupError: errorDetails,
|
|
});
|
|
|
|
if (!partialContext?.schedule || !partialContext?.volume || !partialContext?.repository) {
|
|
return;
|
|
}
|
|
|
|
// Determine if the backup should be retried
|
|
const schedule = partialContext.schedule;
|
|
const currentRetryCount = schedule.failureRetryCount;
|
|
const maxRetries = schedule.maxRetries;
|
|
const shouldRetry = currentRetryCount < maxRetries;
|
|
const nextRetryBackupAt = Date.now() + schedule.retryDelay;
|
|
const nextScheduledBackupAt = schedule.cronExpression ? calculateNextRun(schedule.cronExpression) : null;
|
|
|
|
if (!manual && shouldRetry && nextScheduledBackupAt && nextRetryBackupAt < nextScheduledBackupAt) {
|
|
await scheduleQueries.updateStatus(scheduleId, organizationId, {
|
|
nextBackupAt: nextRetryBackupAt,
|
|
failureRetryCount: currentRetryCount + 1,
|
|
});
|
|
|
|
const delayMinutes = Math.round((schedule.retryDelay / (60 * 1000)) * 10) / 10;
|
|
|
|
logger.error(
|
|
`Backup ${schedule.name} failed. Scheduling retry ${currentRetryCount + 1}/${maxRetries} for ${delayMinutes} minutes from now: ${errorMessage}`,
|
|
);
|
|
|
|
if (partialContext?.volume && partialContext?.repository) {
|
|
serverEvents.emit("backup:completed", {
|
|
organizationId,
|
|
scheduleId: schedule.shortId,
|
|
volumeName: partialContext.volume.name,
|
|
repositoryName: partialContext.repository.name,
|
|
status: "error",
|
|
});
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, "failure", {
|
|
volumeName: partialContext.volume.name,
|
|
repositoryName: partialContext.repository.name,
|
|
scheduleName: schedule.name,
|
|
error: `${errorDetails}\n\nRetrying in ${delayMinutes} minutes (attempt ${currentRetryCount + 1}/${maxRetries})`,
|
|
})
|
|
.catch((notifError) => {
|
|
logger.error(`Failed to send backup failure notification: ${toMessage(notifError)}`);
|
|
});
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
await scheduleQueries.updateStatus(scheduleId, organizationId, {
|
|
failureRetryCount: 0,
|
|
});
|
|
|
|
const { volume, repository } = partialContext;
|
|
|
|
if (manual) {
|
|
logger.error(
|
|
`Manual backup ${schedule.name} failed for volume ${volume.name} to repository ${repository.name}: ${errorMessage}`,
|
|
);
|
|
} else {
|
|
logger.error(
|
|
`Backup ${schedule.name} failed after ${maxRetries} retries for volume ${volume.name} to repository ${repository.name}: ${errorMessage}`,
|
|
);
|
|
}
|
|
|
|
serverEvents.emit("backup:completed", {
|
|
organizationId,
|
|
scheduleId: schedule.shortId,
|
|
volumeName: volume.name,
|
|
repositoryName: repository.name,
|
|
status: "error",
|
|
});
|
|
|
|
let errorNotificationMessage = `${errorDetails}`;
|
|
if (!manual && currentRetryCount > 0) {
|
|
errorNotificationMessage = `${errorDetails}\n\nFailed after ${currentRetryCount} retry attempts.`;
|
|
}
|
|
|
|
notificationsService
|
|
.sendBackupNotification(scheduleId, "failure", {
|
|
volumeName: volume.name,
|
|
repositoryName: repository.name,
|
|
scheduleName: schedule.name,
|
|
error: errorNotificationMessage,
|
|
})
|
|
.catch((notifyError) => {
|
|
logger.error(`Failed to send backup failure notification: ${toMessage(notifyError)}`);
|
|
});
|
|
}
|
|
|
|
export async function handleBackupCancellation(
|
|
scheduleId: number,
|
|
organizationId: string,
|
|
message?: string,
|
|
shouldSetLastBackupAt = true,
|
|
) {
|
|
await scheduleQueries.updateStatus(scheduleId, organizationId, {
|
|
lastBackupAt: shouldSetLastBackupAt ? Date.now() : undefined,
|
|
lastBackupStatus: "warning",
|
|
lastBackupError: message ?? "Backup was stopped by the user",
|
|
failureRetryCount: 0,
|
|
});
|
|
}
|