mirror of
https://github.com/nicotsx/zerobyte.git
synced 2026-06-04 06:28:45 -04:00
* refactor(restic): all commands are effects * fix(restic): preserve effect failure errors * chore: pr feedbacks
267 lines
8.1 KiB
TypeScript
267 lines
8.1 KiB
TypeScript
import { BadRequestError, NotFoundError } from "http-errors-enhanced";
|
|
import { logger } from "@zerobyte/core/node";
|
|
import type { BackupSchedule, Repository } from "../../../db/schema";
|
|
import { restic } from "../../../core/restic";
|
|
import { repoMutex } from "../../../core/repository-mutex";
|
|
import { serverEvents } from "../../../core/events";
|
|
import { cache, cacheKeys } from "../../../utils/cache";
|
|
import { runEffectPromise, toMessage } from "../../../utils/errors";
|
|
import { getOrganizationId } from "~/server/core/request-context";
|
|
import { mirrorQueries, repositoryQueries, scheduleQueries } from "../backups.queries";
|
|
|
|
/**
 * Applies a backup schedule's retention policy to a repository via `restic.forget`.
 *
 * The repository defaults to the schedule's own repository unless `repositoryId`
 * is given. An exclusive repository lock is held for the duration of the forget
 * call and is always released, even when the call fails. After a successful
 * forget, cached entries under the repository's cache prefix are dropped.
 *
 * @param scheduleId - Id of the backup schedule whose retention policy is applied.
 * @param repositoryId - Optional repository to run against instead of `schedule.repositoryId`.
 * @param organizationIdOverride - Optional organization id; falls back to `getOrganizationId()`.
 * @throws NotFoundError when the schedule or the repository cannot be found.
 * @throws BadRequestError when the schedule has no retention policy.
 */
export async function runForget(scheduleId: number, repositoryId?: string, organizationIdOverride?: string) {
	const orgId = organizationIdOverride ?? getOrganizationId();

	const backupSchedule = await scheduleQueries.findById(scheduleId, orgId);
	if (!backupSchedule) {
		throw new NotFoundError("Backup schedule not found");
	}

	const { retentionPolicy } = backupSchedule;
	if (!retentionPolicy) {
		throw new BadRequestError("No retention policy configured for this schedule");
	}

	const targetRepositoryId = repositoryId ?? backupSchedule.repositoryId;
	const targetRepository = await repositoryQueries.findById(targetRepositoryId, orgId);
	if (!targetRepository) {
		throw new NotFoundError("Repository not found");
	}

	logger.info(`running retention policy (forget) for schedule ${scheduleId}`);

	const unlock = await repoMutex.acquireExclusive(targetRepository.id, `forget:${scheduleId}`);
	try {
		// The effect is built inside the try so that the lock is released even
		// if constructing it throws synchronously.
		const forgetEffect = restic.forget(targetRepository.config, retentionPolicy, {
			tag: backupSchedule.shortId,
			organizationId: orgId,
		});
		await runEffectPromise(forgetEffect);

		// Only reached when forget succeeded: drop cached data for this repository.
		cache.delByPrefix(cacheKeys.repository.all(targetRepository.id));
	} finally {
		unlock();
	}

	logger.info(`Retention policy applied successfully for schedule ${scheduleId}`);
}
|
|
|
|
/**
 * Copies a schedule's snapshots to every enabled mirror, one mirror at a time.
 *
 * Returns without doing anything further when the schedule has no enabled
 * mirrors. Each mirror is handled by `copyToSingleMirror`, awaited before the
 * next one starts.
 *
 * @param scheduleId - Id of the backup schedule whose mirrors are processed.
 * @param sourceRepository - Repository the snapshots are copied from.
 * @param retentionPolicy - Retention policy forwarded to each per-mirror copy.
 * @param organizationIdOverride - Optional organization id; falls back to `getOrganizationId()`.
 * @throws NotFoundError when the schedule cannot be found.
 */
export async function copyToMirrors(
	scheduleId: number,
	sourceRepository: Repository,
	retentionPolicy: BackupSchedule["retentionPolicy"],
	organizationIdOverride?: string,
) {
	const orgId = organizationIdOverride ?? getOrganizationId();

	const backupSchedule = await scheduleQueries.findById(scheduleId, orgId);
	if (!backupSchedule) {
		throw new NotFoundError("Backup schedule not found");
	}

	const enabledMirrors = await mirrorQueries.findEnabledBySchedule(scheduleId);
	const mirrorCount = enabledMirrors.length;
	if (mirrorCount === 0) {
		return;
	}

	logger.info(`[Background] Copying snapshots to ${mirrorCount} mirror repositories for schedule ${scheduleId}`);

	// Deliberately sequential: each copy is awaited before the next begins.
	for (const target of enabledMirrors) {
		await copyToSingleMirror(scheduleId, backupSchedule, sourceRepository, target, retentionPolicy, orgId);
	}
}
|
|
|
|
/**
 * Copies snapshots from a schedule's source repository into one of its mirror
 * repositories and records the outcome.
 *
 * Lookups (schedule, mirror link, source repository, mirror repository) happen
 * first and throw `NotFoundError` when anything is missing. After that, every
 * failure is caught: it is logged, stored as the mirror's `lastCopyError` with
 * status `"error"`, and announced through a `mirror:completed` event.
 *
 * While copying, a shared lock is held on the source repository and an exclusive
 * lock on the mirror repository; both are released once the copy settles. When
 * the schedule has a retention policy, `runForget` is started for the mirror
 * repository without being awaited, and its failure is only logged.
 *
 * @param scheduleId - Id of the backup schedule.
 * @param mirrorRepositoryId - Id of the mirror repository to copy into.
 * @param organizationIdOverride - Optional organization id; falls back to `getOrganizationId()`.
 * @param snapshotIds - Optional snapshot ids forwarded to `restic.copy`.
 * @throws NotFoundError when the schedule, mirror link, source repository or mirror repository is missing.
 */
export async function syncSnapshotsToMirror(
	scheduleId: number,
	mirrorRepositoryId: string,
	organizationIdOverride?: string,
	snapshotIds?: string[],
) {
	const orgId = organizationIdOverride ?? getOrganizationId();

	const backupSchedule = await scheduleQueries.findById(scheduleId, orgId);
	if (!backupSchedule) {
		throw new NotFoundError("Backup schedule not found");
	}

	// Only the existence of the schedule/mirror link matters here.
	const mirrorLink = await mirrorQueries.findByScheduleAndRepository(scheduleId, mirrorRepositoryId);
	if (!mirrorLink) {
		throw new NotFoundError("Mirror not found for this schedule");
	}

	const source = await repositoryQueries.findById(backupSchedule.repositoryId, orgId);
	if (!source) {
		throw new NotFoundError("Source repository not found");
	}

	const target = await repositoryQueries.findById(mirrorRepositoryId, orgId);
	if (!target) {
		throw new NotFoundError("Mirror repository not found");
	}

	// Fields common to every mirror event emitted below; rebuilt on each call.
	const eventBase = () => ({
		organizationId: orgId,
		scheduleId: backupSchedule.shortId,
		repositoryId: target.shortId,
		repositoryName: target.name,
	});

	try {
		logger.info(`[Background] Syncing all snapshots to mirror repository: ${target.name}`);

		serverEvents.emit("mirror:started", eventBase());

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyStatus: "in_progress",
			lastCopyError: null,
		});

		const unlockAll = await repoMutex.acquireMany([
			{ repositoryId: source.id, type: "shared", operation: `mirror_sync_source:${scheduleId}` },
			{ repositoryId: target.id, type: "exclusive", operation: `mirror_sync:${scheduleId}` },
		]);

		try {
			const copyEffect = restic.copy(source.config, target.config, {
				tag: backupSchedule.shortId,
				organizationId: orgId,
				snapshotIds,
			});
			await runEffectPromise(copyEffect);

			// Only reached when the copy succeeded: drop cached data for the mirror.
			cache.delByPrefix(cacheKeys.repository.all(target.id));
		} finally {
			unlockAll();
		}

		if (backupSchedule.retentionPolicy) {
			// Fire-and-forget: a retention failure is logged and does not affect
			// the copy status recorded below.
			void runForget(scheduleId, target.id, orgId).catch((forgetFailure) => {
				logger.error(
					`Failed to run retention policy for mirror repository ${target.name}: ${toMessage(forgetFailure)}`,
				);
			});
		}

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "success",
			lastCopyError: null,
		});

		logger.info(`[Background] Successfully synced all snapshots to mirror repository: ${target.name}`);

		serverEvents.emit("mirror:completed", { ...eventBase(), status: "success" });
	} catch (failure) {
		const reason = toMessage(failure);

		logger.error(`[Background] Failed to sync all snapshots to mirror repository ${target.name}: ${reason}`);

		await mirrorQueries.updateStatus(scheduleId, mirrorRepositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "error",
			lastCopyError: reason,
		});

		serverEvents.emit("mirror:completed", { ...eventBase(), status: "error", error: reason });
	}
}
|
|
|
|
/**
 * Copies a schedule's snapshots from the source repository into one mirror
 * repository and records the outcome.
 *
 * Errors raised inside the main flow are caught: they are logged, stored as the
 * mirror's `lastCopyError` with status `"error"`, and announced through a
 * `mirror:completed` event.
 *
 * While copying, a shared lock is held on the source repository and an exclusive
 * lock on the mirror repository; both are released once the copy settles. When
 * `retentionPolicy` is set, `runForget` is started for the mirror repository
 * without being awaited, and its failure is only logged.
 *
 * @param scheduleId - Id of the backup schedule.
 * @param schedule - The schedule record; its `shortId` is used as the copy tag and in events.
 * @param sourceRepository - Repository the snapshots are copied from.
 * @param mirror - Mirror entry holding the target repository and its id.
 * @param retentionPolicy - When truthy, retention is run on the mirror after the copy.
 * @param organizationId - Organization id forwarded to restic, events and `runForget`.
 */
async function copyToSingleMirror(
	scheduleId: number,
	schedule: BackupSchedule,
	sourceRepository: Repository,
	mirror: {
		repositoryId: string;
		repository: Repository;
	},
	retentionPolicy: BackupSchedule["retentionPolicy"],
	organizationId: string,
) {
	const target = mirror.repository;

	// Fields common to every mirror event emitted below; rebuilt on each call.
	const eventBase = () => ({
		organizationId,
		scheduleId: schedule.shortId,
		repositoryId: target.shortId,
		repositoryName: target.name,
	});

	try {
		logger.info(`[Background] Copying to mirror repository: ${target.name}`);

		serverEvents.emit("mirror:started", eventBase());

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyStatus: "in_progress",
			lastCopyError: null,
		});

		const unlockAll = await repoMutex.acquireMany([
			{ repositoryId: sourceRepository.id, type: "shared", operation: `mirror_source:${scheduleId}` },
			{ repositoryId: target.id, type: "exclusive", operation: `mirror:${scheduleId}` },
		]);

		try {
			const copyEffect = restic.copy(sourceRepository.config, target.config, {
				tag: schedule.shortId,
				organizationId,
			});
			await runEffectPromise(copyEffect);

			// Only reached when the copy succeeded: drop cached data for the mirror.
			cache.delByPrefix(cacheKeys.repository.all(target.id));
		} finally {
			unlockAll();
		}

		if (retentionPolicy) {
			// Fire-and-forget: a retention failure is logged and does not affect
			// the copy status recorded below.
			void runForget(scheduleId, target.id, organizationId).catch((forgetFailure) => {
				logger.error(
					`Failed to run retention policy for mirror repository ${target.name}: ${toMessage(forgetFailure)}`,
				);
			});
		}

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "success",
			lastCopyError: null,
		});

		logger.info(`[Background] Successfully copied to mirror repository: ${target.name}`);

		serverEvents.emit("mirror:completed", { ...eventBase(), status: "success" });
	} catch (failure) {
		const reason = toMessage(failure);

		logger.error(`[Background] Failed to copy to mirror repository ${target.name}: ${reason}`);

		await mirrorQueries.updateStatus(scheduleId, mirror.repositoryId, {
			lastCopyAt: Date.now(),
			lastCopyStatus: "error",
			lastCopyError: reason,
		});

		serverEvents.emit("mirror:completed", { ...eventBase(), status: "error", error: reason });
	}
}
|