import { BadRequestError, NotFoundError } from "http-errors-enhanced";
import { logger } from "@zerobyte/core/node";
import type { BackupSchedule, Repository } from "../../../db/schema";
import { restic } from "../../../core/restic";
import { repoMutex } from "../../../core/repository-mutex";
import { serverEvents } from "../../../core/events";
import { cache, cacheKeys } from "../../../utils/cache";
import { runEffectPromise, toMessage } from "../../../utils/errors";
import { getOrganizationId } from "~/server/core/request-context";
import { mirrorQueries, repositoryQueries, scheduleQueries } from "../backups.queries";

/**
 * Returns `value` unchanged when it is truthy; otherwise throws a
 * `NotFoundError` carrying `message`.
 */
function requireFound<T>(value: T | null | undefined, message: string): T {
	if (!value) {
		throw new NotFoundError(message);
	}
	return value;
}

/**
 * Applies a schedule's retention policy to a repository through `restic.forget`.
 *
 * The forget call runs while an exclusive lock on the repository is held; the
 * lock is released whether or not the call succeeds. After a successful forget,
 * cache entries under the repository's key prefix are dropped.
 *
 * @param scheduleId - Numeric id of the backup schedule whose policy is applied.
 * @param repositoryId - Repository to operate on; falls back to the schedule's own repository.
 * @param organizationIdOverride - Organization id to use instead of the one from `getOrganizationId()`.
 * @throws NotFoundError when the schedule or the repository cannot be found.
 * @throws BadRequestError when the schedule has no retention policy.
 */
export async function runForget(scheduleId: number, repositoryId?: string, organizationIdOverride?: string) {
	const organizationId = organizationIdOverride ?? getOrganizationId();

	const schedule = requireFound(
		await scheduleQueries.findById(scheduleId, organizationId),
		"Backup schedule not found",
	);

	const policy = schedule.retentionPolicy;
	if (!policy) {
		throw new BadRequestError("No retention policy configured for this schedule");
	}

	const targetRepositoryId = repositoryId ?? schedule.repositoryId;
	const repository = requireFound(
		await repositoryQueries.findById(targetRepositoryId, organizationId),
		"Repository not found",
	);

	logger.info(`running retention policy (forget) for schedule ${scheduleId}`);

	const release = await repoMutex.acquireExclusive(repository.id, `forget:${scheduleId}`);
	try {
		const forgetEffect = restic.forget(repository.config, policy, { tag: schedule.shortId, organizationId });
		await runEffectPromise(forgetEffect);
		cache.delByPrefix(cacheKeys.repository.all(repository.id));
	} finally {
		release();
	}

	logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
}

/**
 * Copies a schedule's snapshots from `sourceRepository` to every enabled mirror
 * of that schedule, one mirror after another.
 *
 * Returns without doing anything when the schedule has no enabled mirrors.
 *
 * @param scheduleId - Numeric id of the backup schedule.
 * @param sourceRepository - Repository the snapshots are copied from.
 * @param retentionPolicy - When truthy, a forget run is started on each mirror after its copy.
 * @param organizationIdOverride - Organization id to use instead of the one from `getOrganizationId()`.
 * @throws NotFoundError when the schedule cannot be found.
 */
export async function copyToMirrors(
	scheduleId: number,
	sourceRepository: Repository,
	retentionPolicy: BackupSchedule["retentionPolicy"],
	organizationIdOverride?: string,
) {
	const organizationId = organizationIdOverride ?? getOrganizationId();

	const schedule = requireFound(
		await scheduleQueries.findById(scheduleId, organizationId),
		"Backup schedule not found",
	);

	const mirrors = await mirrorQueries.findEnabledBySchedule(scheduleId);
	if (mirrors.length === 0) {
		return;
	}

	logger.info(`[Background] Copying snapshots to ${mirrors.length} mirror repositories for schedule ${scheduleId}`);

	// Deliberately sequential: each mirror copy is awaited before the next one starts.
	for (const target of mirrors) {
		await copyToSingleMirror(scheduleId, schedule, sourceRepository, target, retentionPolicy, organizationId);
	}
}

/**
 * Copies snapshots of a schedule from its source repository into one specific
 * mirror repository, recording progress on the mirror row and emitting
 * `mirror:started` / `mirror:completed` events.
 *
 * Lookup failures are thrown before any work starts. Errors raised after that
 * point (status update, locking, copy) are logged, stored as the mirror's last
 * copy error and reported through the `mirror:completed` event instead of
 * being rethrown.
 *
 * @param scheduleId - Numeric id of the backup schedule.
 * @param mirrorRepositoryId - Id of the mirror repository to sync into.
 * @param organizationIdOverride - Organization id to use instead of the one from `getOrganizationId()`.
 * @param snapshotIds - Optional snapshot ids forwarded to `restic.copy`.
 * @throws NotFoundError when the schedule, the mirror link, the source repository or the mirror repository is missing.
 */
export async function syncSnapshotsToMirror(
	scheduleId: number,
	mirrorRepositoryId: string,
	organizationIdOverride?: string,
	snapshotIds?: string[],
) {
	const organizationId = organizationIdOverride ?? getOrganizationId();

	const schedule = requireFound(
		await scheduleQueries.findById(scheduleId, organizationId),
		"Backup schedule not found",
	);

	// Only the existence of the schedule/mirror link matters here; the row itself is not used.
	requireFound(
		await mirrorQueries.findByScheduleAndRepository(scheduleId, mirrorRepositoryId),
		"Mirror not found for this schedule",
	);

	const sourceRepository = requireFound(
		await repositoryQueries.findById(schedule.repositoryId, organizationId),
		"Source repository not found",
	);

	const mirrorRepository = requireFound(
		await repositoryQueries.findById(mirrorRepositoryId, organizationId),
		"Mirror repository not found",
	);

	// Fields shared by every event emitted for this sync.
	const eventBase = {
		organizationId,
		scheduleId: schedule.shortId,
		repositoryId: mirrorRepository.shortId,
		repositoryName: mirrorRepository.name,
	};

	try {
		logger.info(`[Background] Syncing all snapshots to mirror repository: ${mirrorRepository.name}`);
		serverEvents.emit("mirror:started", { ...eventBase });

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyStatus: "in_progress",
			lastCopyError: null,
		});

		// Shared lock on the source, exclusive lock on the mirror, for the duration of the copy.
		const unlock = await repoMutex.acquireMany([
			{ repositoryId: sourceRepository.id, type: "shared", operation: `mirror_sync_source:${scheduleId}` },
			{ repositoryId: mirrorRepository.id, type: "exclusive", operation: `mirror_sync:${scheduleId}` },
		]);
		try {
			const copyEffect = restic.copy(sourceRepository.config, mirrorRepository.config, {
				tag: schedule.shortId,
				organizationId,
				snapshotIds,
			});
			await runEffectPromise(copyEffect);
			cache.delByPrefix(cacheKeys.repository.all(mirrorRepository.id));
		} finally {
			unlock();
		}

		if (schedule.retentionPolicy) {
			// Not awaited: the forget run proceeds in the background and its failure is only logged.
			void runForget(scheduleId, mirrorRepository.id, organizationId).catch((error) => {
				logger.error(
					`Failed to run retention policy for mirror repository ${mirrorRepository.name}: ${toMessage(error)}`,
				);
			});
		}

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "success",
			lastCopyError: null,
		});

		logger.info(`[Background] Successfully synced all snapshots to mirror repository: ${mirrorRepository.name}`);
		serverEvents.emit("mirror:completed", { ...eventBase, status: "success" });
	} catch (error) {
		const errorMessage = toMessage(error);
		logger.error(
			`[Background] Failed to sync all snapshots to mirror repository ${mirrorRepository.name}: ${errorMessage}`,
		);

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "error",
			lastCopyError: errorMessage,
		});

		serverEvents.emit("mirror:completed", { ...eventBase, status: "error", error: errorMessage });
	}
}

/**
 * Copies a schedule's snapshots from `sourceRepository` into a single mirror,
 * recording progress on the mirror row and emitting `mirror:started` /
 * `mirror:completed` events.
 *
 * Errors raised while updating status, locking or copying are logged, stored
 * as the mirror's last copy error and reported through the `mirror:completed`
 * event instead of being rethrown.
 *
 * @param scheduleId - Numeric id of the backup schedule.
 * @param schedule - The schedule row; its `shortId` is used as the copy tag and in events.
 * @param sourceRepository - Repository the snapshots are copied from.
 * @param mirror - Mirror entry holding the target repository and its id.
 * @param retentionPolicy - When truthy, a forget run is started on the mirror after the copy.
 * @param organizationId - Organization id passed to restic and included in events.
 */
async function copyToSingleMirror(
	scheduleId: number,
	schedule: BackupSchedule,
	sourceRepository: Repository,
	mirror: {
		repositoryId: string;
		repository: Repository;
	},
	retentionPolicy: BackupSchedule["retentionPolicy"],
	organizationId: string,
) {
	// Fields shared by every event emitted for this mirror copy.
	const eventBase = {
		organizationId,
		scheduleId: schedule.shortId,
		repositoryId: mirror.repository.shortId,
		repositoryName: mirror.repository.name,
	};

	try {
		logger.info(`[Background] Copying to mirror repository: ${mirror.repository.name}`);
		serverEvents.emit("mirror:started", { ...eventBase });

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyStatus: "in_progress",
			lastCopyError: null,
		});

		// Shared lock on the source, exclusive lock on the mirror, for the duration of the copy.
		const unlock = await repoMutex.acquireMany([
			{ repositoryId: sourceRepository.id, type: "shared", operation: `mirror_source:${scheduleId}` },
			{ repositoryId: mirror.repository.id, type: "exclusive", operation: `mirror:${scheduleId}` },
		]);
		try {
			const copyEffect = restic.copy(sourceRepository.config, mirror.repository.config, {
				tag: schedule.shortId,
				organizationId,
			});
			await runEffectPromise(copyEffect);
			cache.delByPrefix(cacheKeys.repository.all(mirror.repository.id));
		} finally {
			unlock();
		}

		if (retentionPolicy) {
			// Not awaited: the forget run proceeds in the background and its failure is only logged.
			void runForget(scheduleId, mirror.repository.id, organizationId).catch((error) => {
				logger.error(
					`Failed to run retention policy for mirror repository ${mirror.repository.name}: ${toMessage(error)}`,
				);
			});
		}

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "success",
			lastCopyError: null,
		});

		logger.info(`[Background] Successfully copied to mirror repository: ${mirror.repository.name}`);
		serverEvents.emit("mirror:completed", { ...eventBase, status: "success" });
	} catch (error) {
		const errorMessage = toMessage(error);
		logger.error(`[Background] Failed to copy to mirror repository ${mirror.repository.name}: ${errorMessage}`);

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "error",
			lastCopyError: errorMessage,
		});

		serverEvents.emit("mirror:completed", { ...eventBase, status: "error", error: errorMessage });
	}
}