Files
zerobyte/app/server/modules/backups/helpers/backup-maintenance.ts
Nico d4436b0cdc refactor(restic): all commands return effects (#924)
* refactor(restic): all commands are effects

* fix(restic): preserve effect failure errors

* chore: pr feedbacks
2026-05-30 10:10:54 +02:00

267 lines
8.1 KiB
TypeScript

import { BadRequestError, NotFoundError } from "http-errors-enhanced";
import { logger } from "@zerobyte/core/node";
import type { BackupSchedule, Repository } from "../../../db/schema";
import { restic } from "../../../core/restic";
import { repoMutex } from "../../../core/repository-mutex";
import { serverEvents } from "../../../core/events";
import { cache, cacheKeys } from "../../../utils/cache";
import { runEffectPromise, toMessage } from "../../../utils/errors";
import { getOrganizationId } from "~/server/core/request-context";
import { mirrorQueries, repositoryQueries, scheduleQueries } from "../backups.queries";
/**
 * Applies a backup schedule's retention policy to a repository by running
 * `restic.forget` for snapshots tagged with the schedule's short id.
 *
 * @param scheduleId - Id of the schedule whose retention policy is applied.
 * @param repositoryId - Repository to prune; when omitted, the schedule's own
 *   `repositoryId` is used.
 * @param organizationIdOverride - Organization to scope the lookups to; when
 *   omitted, the value returned by `getOrganizationId()` is used.
 * @throws NotFoundError when the schedule or the repository cannot be found.
 * @throws BadRequestError when the schedule has no retention policy.
 */
export async function runForget(scheduleId: number, repositoryId?: string, organizationIdOverride?: string) {
	const organizationId = organizationIdOverride ?? getOrganizationId();

	const schedule = await scheduleQueries.findById(scheduleId, organizationId);
	if (!schedule) throw new NotFoundError("Backup schedule not found");

	const policy = schedule.retentionPolicy;
	if (!policy) throw new BadRequestError("No retention policy configured for this schedule");

	const targetRepositoryId = repositoryId ?? schedule.repositoryId;
	const repository = await repositoryQueries.findById(targetRepositoryId, organizationId);
	if (!repository) throw new NotFoundError("Repository not found");

	logger.info(`running retention policy (forget) for schedule ${scheduleId}`);

	// The exclusive lock is held for the whole forget run and always released,
	// even when the restic effect fails.
	const unlock = await repoMutex.acquireExclusive(repository.id, `forget:${scheduleId}`);
	try {
		const forgetEffect = restic.forget(repository.config, policy, {
			tag: schedule.shortId,
			organizationId,
		});
		await runEffectPromise(forgetEffect);

		// Drop cached entries for this repository only after a successful forget.
		const cachePrefix = cacheKeys.repository.all(repository.id);
		cache.delByPrefix(cachePrefix);
	} finally {
		unlock();
	}

	logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
}
/**
 * Copies a schedule's snapshots from the source repository to every mirror
 * returned by `mirrorQueries.findEnabledBySchedule`, one mirror at a time.
 *
 * Does nothing (beyond validating the schedule) when no mirrors are returned.
 *
 * @param scheduleId - Id of the schedule whose snapshots are mirrored.
 * @param sourceRepository - Repository the snapshots are copied from.
 * @param retentionPolicy - Retention policy forwarded to each mirror copy.
 * @param organizationIdOverride - Organization to scope the schedule lookup to;
 *   when omitted, the value returned by `getOrganizationId()` is used.
 * @throws NotFoundError when the schedule cannot be found.
 */
export async function copyToMirrors(
	scheduleId: number,
	sourceRepository: Repository,
	retentionPolicy: BackupSchedule["retentionPolicy"],
	organizationIdOverride?: string,
) {
	const organizationId = organizationIdOverride ?? getOrganizationId();

	const schedule = await scheduleQueries.findById(scheduleId, organizationId);
	if (!schedule) throw new NotFoundError("Backup schedule not found");

	const enabledMirrors = await mirrorQueries.findEnabledBySchedule(scheduleId);
	const mirrorCount = enabledMirrors.length;

	if (mirrorCount !== 0) {
		logger.info(`[Background] Copying snapshots to ${mirrorCount} mirror repositories for schedule ${scheduleId}`);

		// Mirrors are processed sequentially: each copy is awaited before the next starts.
		for (const target of enabledMirrors) {
			await copyToSingleMirror(scheduleId, schedule, sourceRepository, target, retentionPolicy, organizationId);
		}
	}
}
/**
 * Copies snapshots of a schedule from its source repository into one specific
 * mirror repository, recording progress in the mirror status row and
 * announcing it through `mirror:started` / `mirror:completed` server events.
 *
 * Failures raised after validation are caught: they are logged, stored as
 * `lastCopyError`, and reported via a `mirror:completed` event with
 * `status: "error"` instead of being rethrown.
 * NOTE(review): if the status write inside the error handler itself rejects,
 * that rejection propagates and the completion event is not emitted.
 *
 * @param scheduleId - Id of the schedule whose snapshots are synced.
 * @param mirrorRepositoryId - Id of the mirror repository to copy into.
 * @param organizationIdOverride - Organization to scope lookups to; when
 *   omitted, the value returned by `getOrganizationId()` is used.
 * @param snapshotIds - Optional snapshot ids forwarded unchanged to `restic.copy`.
 * @throws NotFoundError when the schedule, the mirror link, the source
 *   repository, or the mirror repository cannot be found.
 */
export async function syncSnapshotsToMirror(
	scheduleId: number,
	mirrorRepositoryId: string,
	organizationIdOverride?: string,
	snapshotIds?: string[],
) {
	const organizationId = organizationIdOverride ?? getOrganizationId();

	const schedule = await scheduleQueries.findById(scheduleId, organizationId);
	if (!schedule) throw new NotFoundError("Backup schedule not found");

	// Only the existence of the schedule/mirror link is checked; the row is not used afterwards.
	const mirrorLink = await mirrorQueries.findByScheduleAndRepository(scheduleId, mirrorRepositoryId);
	if (!mirrorLink) throw new NotFoundError("Mirror not found for this schedule");

	const sourceRepository = await repositoryQueries.findById(schedule.repositoryId, organizationId);
	if (!sourceRepository) throw new NotFoundError("Source repository not found");

	const mirrorRepository = await repositoryQueries.findById(mirrorRepositoryId, organizationId);
	if (!mirrorRepository) throw new NotFoundError("Mirror repository not found");

	const mirrorName = mirrorRepository.name;
	// Fields shared by every mirror event emitted from this function.
	const eventBase = {
		organizationId,
		scheduleId: schedule.shortId,
		repositoryId: mirrorRepository.shortId,
		repositoryName: mirrorName,
	};

	try {
		logger.info(`[Background] Syncing all snapshots to mirror repository: ${mirrorName}`);
		serverEvents.emit("mirror:started", { ...eventBase });

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyStatus: "in_progress",
			lastCopyError: null,
		});

		// Source is locked shared, the mirror exclusive; both are released once
		// the copy settles, whether it succeeded or failed.
		const unlockAll = await repoMutex.acquireMany([
			{ repositoryId: sourceRepository.id, type: "shared", operation: `mirror_sync_source:${scheduleId}` },
			{ repositoryId: mirrorRepository.id, type: "exclusive", operation: `mirror_sync:${scheduleId}` },
		]);
		try {
			const copyEffect = restic.copy(sourceRepository.config, mirrorRepository.config, {
				tag: schedule.shortId,
				organizationId,
				snapshotIds,
			});
			await runEffectPromise(copyEffect);

			const cachePrefix = cacheKeys.repository.all(mirrorRepository.id);
			cache.delByPrefix(cachePrefix);
		} finally {
			unlockAll();
		}

		// Retention on the mirror is started without being awaited; a failure
		// there is only logged and does not change the copy status below.
		if (schedule.retentionPolicy) {
			void runForget(scheduleId, mirrorRepository.id, organizationId).catch((forgetError) => {
				logger.error(
					`Failed to run retention policy for mirror repository ${mirrorName}: ${toMessage(forgetError)}`,
				);
			});
		}

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "success",
			lastCopyError: null,
		});

		logger.info(`[Background] Successfully synced all snapshots to mirror repository: ${mirrorName}`);
		serverEvents.emit("mirror:completed", { ...eventBase, status: "success" });
	} catch (failure) {
		const reason = toMessage(failure);
		logger.error(`[Background] Failed to sync all snapshots to mirror repository ${mirrorName}: ${reason}`);

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "error",
			lastCopyError: reason,
		});

		serverEvents.emit("mirror:completed", { ...eventBase, status: "error", error: reason });
	}
}
/**
 * Copies a schedule's snapshots from the source repository into a single
 * mirror, recording progress in the mirror status row and announcing it
 * through `mirror:started` / `mirror:completed` server events.
 *
 * Failures are caught: they are logged, stored as `lastCopyError`, and
 * reported via a `mirror:completed` event with `status: "error"` instead of
 * being rethrown.
 * NOTE(review): if the status write inside the error handler itself rejects,
 * that rejection propagates and the completion event is not emitted.
 *
 * @param scheduleId - Id of the schedule, used for status rows and lock labels.
 * @param schedule - Schedule record; its `shortId` tags the copy and the events.
 * @param sourceRepository - Repository the snapshots are copied from.
 * @param mirror - Mirror link together with its target repository.
 * @param retentionPolicy - When truthy, retention is started on the mirror after the copy.
 * @param organizationId - Organization passed to restic, the events, and `runForget`.
 */
async function copyToSingleMirror(
	scheduleId: number,
	schedule: BackupSchedule,
	sourceRepository: Repository,
	mirror: {
		repositoryId: string;
		repository: Repository;
	},
	retentionPolicy: BackupSchedule["retentionPolicy"],
	organizationId: string,
) {
	const target = mirror.repository;
	const targetName = target.name;
	// Fields shared by every mirror event emitted from this function.
	const eventBase = {
		organizationId,
		scheduleId: schedule.shortId,
		repositoryId: target.shortId,
		repositoryName: targetName,
	};

	try {
		logger.info(`[Background] Copying to mirror repository: ${targetName}`);
		serverEvents.emit("mirror:started", { ...eventBase });

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyStatus: "in_progress",
			lastCopyError: null,
		});

		// Source is locked shared, the mirror exclusive; both are released once
		// the copy settles, whether it succeeded or failed.
		const unlockAll = await repoMutex.acquireMany([
			{ repositoryId: sourceRepository.id, type: "shared", operation: `mirror_source:${scheduleId}` },
			{ repositoryId: target.id, type: "exclusive", operation: `mirror:${scheduleId}` },
		]);
		try {
			const copyEffect = restic.copy(sourceRepository.config, target.config, {
				tag: schedule.shortId,
				organizationId,
			});
			await runEffectPromise(copyEffect);

			const cachePrefix = cacheKeys.repository.all(target.id);
			cache.delByPrefix(cachePrefix);
		} finally {
			unlockAll();
		}

		// Retention on the mirror is started without being awaited; a failure
		// there is only logged and does not change the copy status below.
		if (retentionPolicy) {
			void runForget(scheduleId, target.id, organizationId).catch((forgetError) => {
				logger.error(
					`Failed to run retention policy for mirror repository ${targetName}: ${toMessage(forgetError)}`,
				);
			});
		}

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "success",
			lastCopyError: null,
		});

		logger.info(`[Background] Successfully copied to mirror repository: ${targetName}`);
		serverEvents.emit("mirror:completed", { ...eventBase, status: "success" });
	} catch (failure) {
		const reason = toMessage(failure);
		logger.error(`[Background] Failed to copy to mirror repository ${targetName}: ${reason}`);

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "error",
			lastCopyError: reason,
		});

		serverEvents.emit("mirror:completed", { ...eventBase, status: "error", error: reason });
	}
}