refactor(mutext): persist repository locks in database (#895)

* refactor(mutext): persist repository locks in database

* fix: clean up promoted repository lock on queued abort

* fix: throttle repository lock cleanup during polling
This commit is contained in:
Nico
2026-05-19 18:59:40 +02:00
committed by GitHub
parent b292f94186
commit 94f6d0529f
7 changed files with 3446 additions and 339 deletions

View File

@@ -0,0 +1,28 @@
CREATE TABLE `repository_lock_waiters` (
`id` text PRIMARY KEY,
`repository_id` text NOT NULL,
`type` text NOT NULL,
`operation` text NOT NULL,
`owner_id` text NOT NULL,
`requested_at` integer NOT NULL,
`expires_at` integer NOT NULL,
`heartbeat_at` integer NOT NULL
);
--> statement-breakpoint
CREATE TABLE `repository_locks` (
`id` text PRIMARY KEY,
`repository_id` text NOT NULL,
`type` text NOT NULL,
`operation` text NOT NULL,
`owner_id` text NOT NULL,
`acquired_at` integer NOT NULL,
`expires_at` integer NOT NULL,
`heartbeat_at` integer NOT NULL
);
--> statement-breakpoint
CREATE INDEX `repository_lock_waiters_repository_id_idx` ON `repository_lock_waiters` (`repository_id`);--> statement-breakpoint
CREATE INDEX `repository_lock_waiters_expires_at_idx` ON `repository_lock_waiters` (`expires_at`);--> statement-breakpoint
CREATE INDEX `repository_lock_waiters_owner_id_idx` ON `repository_lock_waiters` (`owner_id`);--> statement-breakpoint
CREATE INDEX `repository_locks_repository_id_idx` ON `repository_locks` (`repository_id`);--> statement-breakpoint
CREATE INDEX `repository_locks_expires_at_idx` ON `repository_locks` (`expires_at`);--> statement-breakpoint
CREATE INDEX `repository_locks_owner_id_idx` ON `repository_locks` (`owner_id`);

View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,17 @@
import { describe, expect, test } from "vitest";
import { describe, expect, test, vi } from "vitest";
import { eq } from "drizzle-orm";
import { db } from "~/server/db/db";
import { repositoryLocksTable, repositoryLockWaitersTable } from "~/server/db/schema";
import { repoMutex } from "../repository-mutex";
const acquireWithin = <T>(promise: Promise<T>, ms = 500) =>
Promise.race([
promise,
new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
}),
]);
describe("RepositoryMutex", () => {
test("should prioritize waiting exclusive locks over new shared locks", async () => {
const repoId = "test-repo";
@@ -327,6 +338,43 @@ describe("RepositoryMutex", () => {
expect(repoMutex.isLocked(repoB)).toBe(false);
});
test("should not leave a promoted shared lock behind if that waiter aborts before observing acquisition", async () => {
vi.useFakeTimers();
const repoId = "shared-abort-after-promotion";
const releaseExclusive = await repoMutex.acquireExclusive(repoId, "holder");
const firstSharedPromise = repoMutex.acquireShared(repoId, "shared-1");
try {
await vi.advanceTimersByTimeAsync(100);
const controller = new AbortController();
const secondSharedPromise = repoMutex.acquireShared(repoId, "shared-2", controller.signal);
await vi.advanceTimersByTimeAsync(140);
releaseExclusive();
// The first shared waiter wakes first and promotes both shared waiters.
await vi.advanceTimersByTimeAsync(10);
const releaseShared1 = await firstSharedPromise;
controller.abort(new Error("abort after promotion"));
await expect(secondSharedPromise).rejects.toThrow("abort after promotion");
releaseShared1();
const remainingLocks = await db.query.repositoryLocksTable.findMany({
where: { repositoryId: { eq: repoId } },
orderBy: { operation: "asc" },
});
expect(remainingLocks).toEqual([]);
} finally {
await db.delete(repositoryLockWaitersTable).where(eq(repositoryLockWaitersTable.repositoryId, repoId));
await db.delete(repositoryLocksTable).where(eq(repositoryLocksTable.repositoryId, repoId));
vi.useRealTimers();
}
});
test("should safely handle multiple calls to the release function", async () => {
const repoId = "idempotent-release";
@@ -376,4 +424,100 @@ describe("RepositoryMutex", () => {
releaseExclusive();
expect(repoMutex.isLocked(repoId)).toBe(false);
});
test("should ignore and clean expired active lock rows during acquisition", async () => {
const repoId = "expired-active-lock";
const expiredLockId = "expired-active-lock-row";
const now = Date.now();
await db.insert(repositoryLocksTable).values({
id: expiredLockId,
repositoryId: repoId,
type: "exclusive",
operation: "stale-check",
ownerId: "stale-owner",
acquiredAt: now - 10_000,
expiresAt: now - 1,
heartbeatAt: now - 10_000,
});
const releaseShared = await acquireWithin(repoMutex.acquireShared(repoId, "backup"));
try {
const expiredLock = await db.query.repositoryLocksTable.findFirst({
where: { id: { eq: expiredLockId } },
});
expect(expiredLock).toBeUndefined();
expect(repoMutex.isLocked(repoId)).toBe(true);
} finally {
releaseShared();
await db.delete(repositoryLocksTable).where(eq(repositoryLocksTable.repositoryId, repoId));
}
});
test("should ignore and clean expired waiters during acquisition", async () => {
const repoId = "expired-waiter";
const expiredWaiterId = "expired-waiter-row";
const now = Date.now();
await db.insert(repositoryLockWaitersTable).values({
id: expiredWaiterId,
repositoryId: repoId,
type: "exclusive",
operation: "stale-exclusive",
ownerId: "stale-owner",
requestedAt: now - 10_000,
expiresAt: now - 1,
heartbeatAt: now - 10_000,
});
const releaseShared = await acquireWithin(repoMutex.acquireShared(repoId, "backup"));
try {
const expiredWaiter = await db.query.repositoryLockWaitersTable.findFirst({
where: { id: { eq: expiredWaiterId } },
});
expect(expiredWaiter).toBeUndefined();
expect(repoMutex.isLocked(repoId)).toBe(true);
} finally {
releaseShared();
await db.delete(repositoryLockWaitersTable).where(eq(repositoryLockWaitersTable.repositoryId, repoId));
await db.delete(repositoryLocksTable).where(eq(repositoryLocksTable.repositoryId, repoId));
}
});
test("should release only the caller lock row", async () => {
const repoId = "release-own-row";
const foreignLockId = "foreign-release-own-row";
const releaseShared = await repoMutex.acquireShared(repoId, "owned-shared");
const now = Date.now();
try {
await db.insert(repositoryLocksTable).values({
id: foreignLockId,
repositoryId: repoId,
type: "shared",
operation: "foreign-shared",
ownerId: "foreign-owner",
acquiredAt: now,
expiresAt: now + 60_000,
heartbeatAt: now,
});
releaseShared();
const remainingLocks = await db.query.repositoryLocksTable.findMany({
where: { repositoryId: { eq: repoId } },
orderBy: { operation: "asc" },
});
expect(remainingLocks.map((lock) => lock.operation)).toEqual(["foreign-shared"]);
expect(repoMutex.isLocked(repoId)).toBe(true);
} finally {
releaseShared();
await db.delete(repositoryLocksTable).where(eq(repositoryLocksTable.repositoryId, repoId));
}
});
});

View File

@@ -1,4 +1,12 @@
import { and, eq, lte } from "drizzle-orm";
import { logger } from "@zerobyte/core/node";
import { db } from "../db/db";
import {
repositoryLocksTable,
repositoryLockWaitersTable,
type RepositoryLock,
type RepositoryLockWaiter,
} from "../db/schema";
type LockType = "shared" | "exclusive";
@@ -8,292 +16,349 @@ interface LockRequest {
operation: string;
}
interface LockHolder {
interface AcquiredLock {
id: string;
repositoryId: string;
type: LockType;
operation: string;
acquiredAt: number;
}
interface RepositoryLockState {
sharedHolders: Map<string, LockHolder>;
exclusiveHolder: LockHolder | null;
waitQueue: Array<{
type: LockType;
operation: string;
resolve: (lockId: string) => void;
}>;
}
type RepositoryMutexTransaction = Parameters<Parameters<typeof db.transaction>[0]>[0];
type HeartbeatTarget = "lock" | "waiter";
type QueueAttempt = { status: "acquired"; lock: AcquiredLock } | { status: "waiting" } | { status: "missing" };
const LOCK_LEASE_MS = 30_000;
const LOCK_HEARTBEAT_MS = 5_000;
const LOCK_POLL_MS = 250;
const LOCK_POLL_CLEANUP_MS = 5_000;
class RepositoryMutex {
private locks = new Map<string, RepositoryLockState>();
private changeListeners = new Map<string, Set<() => void>>();
private lockIdCounter = 0;
private getOrCreateState(repositoryId: string): RepositoryLockState {
let state = this.locks.get(repositoryId);
if (!state) {
state = {
sharedHolders: new Map(),
exclusiveHolder: null,
waitQueue: [],
};
this.locks.set(repositoryId, state);
}
return state;
}
private ownerId = `owner_${Bun.randomUUIDv7()}`;
private heartbeatTimers = new Map<string, ReturnType<typeof setInterval>>();
private nextPollCleanupAt = 0;
private generateLockId(): string {
return `lock_${++this.lockIdCounter}_${Date.now()}`;
return `lock_${Bun.randomUUIDv7()}`;
}
private cleanupStateIfEmpty(repositoryId: string): void {
const state = this.locks.get(repositoryId);
if (state && state.sharedHolders.size === 0 && !state.exclusiveHolder && state.waitQueue.length === 0) {
this.locks.delete(repositoryId);
}
private abortReason(signal: AbortSignal) {
return signal.reason || new Error("Operation aborted");
}
private notifyChange(repositoryId: string): void {
const listeners = this.changeListeners.get(repositoryId);
if (!listeners) {
return;
}
for (const listener of listeners) {
listener();
}
}
private canAcquireImmediately(state: RepositoryLockState | undefined, type: LockType): boolean {
if (!state) {
return true;
}
if (type === "shared") {
const hasExclusiveInQueue = state.waitQueue.some((item) => item.type === "exclusive");
return !state.exclusiveHolder && !hasExclusiveInQueue;
}
return !state.exclusiveHolder && state.sharedHolders.size === 0 && state.waitQueue.length === 0;
}
private waitForChange(repositoryIds: string[], signal?: AbortSignal) {
private throwIfAborted(signal?: AbortSignal) {
if (signal?.aborted) {
throw signal.reason || new Error("Operation aborted");
throw this.abortReason(signal);
}
}
private releaseIfAborted(releaseLock: () => void, signal?: AbortSignal) {
if (!signal?.aborted) return;
releaseLock();
throw this.abortReason(signal);
}
private waitForPoll(signal?: AbortSignal) {
this.throwIfAborted(signal);
return new Promise<void>((resolve, reject) => {
const uniqueRepositoryIds = [...new Set(repositoryIds)];
const cleanupCallbacks: Array<() => void> = [];
let settled = false;
const timeout = setTimeout(() => settle(resolve), LOCK_POLL_MS);
const onAbort = () => {
settle(() => reject(this.abortReason(signal!)));
};
const cleanup = () => {
clearTimeout(timeout);
signal?.removeEventListener("abort", onAbort);
};
const settle = (callback: () => void) => {
if (settled) {
return;
}
if (settled) return;
settled = true;
for (const cleanup of cleanupCallbacks) {
cleanup();
}
cleanup();
callback();
};
for (const repositoryId of uniqueRepositoryIds) {
let listeners = this.changeListeners.get(repositoryId);
if (!listeners) {
listeners = new Set();
this.changeListeners.set(repositoryId, listeners);
}
const listener = () => settle(resolve);
listeners.add(listener);
cleanupCallbacks.push(() => {
const currentListeners = this.changeListeners.get(repositoryId);
if (!currentListeners) {
return;
}
currentListeners.delete(listener);
if (currentListeners.size === 0) {
this.changeListeners.delete(repositoryId);
}
});
}
if (signal) {
const onAbort = () => {
settle(() => reject(signal.reason || new Error("Operation aborted")));
};
signal.addEventListener("abort", onAbort);
cleanupCallbacks.push(() => {
signal.removeEventListener("abort", onAbort);
});
}
signal?.addEventListener("abort", onAbort, { once: true });
});
}
private tryAcquireMany(requests: LockRequest[]) {
for (const request of requests) {
if (!this.canAcquireImmediately(this.locks.get(request.repositoryId), request.type)) {
return null;
}
private cleanupExpired(tx: RepositoryMutexTransaction, now: number) {
tx.delete(repositoryLocksTable).where(lte(repositoryLocksTable.expiresAt, now)).run();
tx.delete(repositoryLockWaitersTable).where(lte(repositoryLockWaitersTable.expiresAt, now)).run();
}
private cleanupExpiredDuringPolling(tx: RepositoryMutexTransaction, now: number) {
if (now < this.nextPollCleanupAt) return;
this.cleanupExpired(tx, now);
this.nextPollCleanupAt = now + LOCK_POLL_CLEANUP_MS;
}
private getActiveLocks(tx: RepositoryMutexTransaction, repositoryId: string, now: number) {
return tx.query.repositoryLocksTable
.findMany({
where: { AND: [{ repositoryId: { eq: repositoryId } }, { expiresAt: { gt: now } }] },
orderBy: { acquiredAt: "asc", id: "asc" },
})
.sync();
}
private getWaiters(tx: RepositoryMutexTransaction, repositoryId: string, now: number) {
return tx.query.repositoryLockWaitersTable
.findMany({
where: { AND: [{ repositoryId: { eq: repositoryId } }, { expiresAt: { gt: now } }] },
orderBy: { requestedAt: "asc", id: "asc" },
})
.sync();
}
private getActiveLockById(tx: RepositoryMutexTransaction, lockId: string, now: number) {
return tx.query.repositoryLocksTable
.findFirst({ where: { AND: [{ id: { eq: lockId } }, { expiresAt: { gt: now } }] } })
.sync();
}
private getWaiterById(tx: RepositoryMutexTransaction, waiterId: string, now: number) {
return tx.query.repositoryLockWaitersTable
.findFirst({ where: { AND: [{ id: { eq: waiterId } }, { expiresAt: { gt: now } }] } })
.sync();
}
private canAcquireImmediately(type: LockType, activeLocks: RepositoryLock[], waiters: RepositoryLockWaiter[]) {
if (type === "shared") {
return (
!activeLocks.some((lock) => lock.type === "exclusive") &&
!waiters.some((waiter) => waiter.type === "exclusive")
);
}
const releases = requests.map((request) => {
const state = this.getOrCreateState(request.repositoryId);
const lockId = this.generateLockId();
return activeLocks.length === 0 && waiters.length === 0;
}
if (request.type === "shared") {
state.sharedHolders.set(lockId, {
id: lockId,
operation: request.operation,
acquiredAt: Date.now(),
});
} else {
state.exclusiveHolder = {
id: lockId,
operation: request.operation,
acquiredAt: Date.now(),
};
}
return this.createRelease(request.type, request.repositoryId, lockId);
});
let released = false;
return () => {
if (released) {
return;
}
released = true;
for (const release of releases.toReversed()) {
release();
}
private insertLock(
tx: RepositoryMutexTransaction,
request: LockRequest & { id: string; ownerId: string },
now: number,
) {
const lock = {
id: request.id,
repositoryId: request.repositoryId,
type: request.type,
operation: request.operation,
ownerId: request.ownerId,
acquiredAt: now,
expiresAt: now + LOCK_LEASE_MS,
heartbeatAt: now,
};
tx.insert(repositoryLocksTable).values(lock).run();
return lock;
}
async acquireShared(repositoryId: string, operation: string, signal?: AbortSignal): Promise<() => void> {
if (signal?.aborted) {
throw signal.reason || new Error("Operation aborted");
}
private tryAcquireManyRows(requests: LockRequest[]) {
const now = Date.now();
const state = this.getOrCreateState(repositoryId);
return db.transaction((tx) => {
this.cleanupExpired(tx, now);
const hasExclusiveInQueue = state.waitQueue.some((item) => item.type === "exclusive");
for (const request of requests) {
const activeLocks = this.getActiveLocks(tx, request.repositoryId, now);
const waiters = this.getWaiters(tx, request.repositoryId, now);
if (!state.exclusiveHolder && !hasExclusiveInQueue) {
const lockId = this.generateLockId();
state.sharedHolders.set(lockId, {
id: lockId,
operation,
acquiredAt: Date.now(),
});
return () => this.releaseShared(repositoryId, lockId);
}
logger.debug(
`[Mutex] Waiting for shared lock on repo ${repositoryId}: ${operation} (exclusive held by: ${state.exclusiveHolder?.operation ?? "none"}, queue: ${state.waitQueue.length})`,
);
let onAbort: () => void = () => {};
let lockId: string | undefined;
try {
lockId = await new Promise<string>((resolve, reject) => {
const waiter = { type: "shared" as const, operation, resolve };
state.waitQueue.push(waiter);
if (signal) {
onAbort = () => {
const index = state.waitQueue.indexOf(waiter);
if (index !== -1) {
state.waitQueue.splice(index, 1);
this.cleanupStateIfEmpty(repositoryId);
this.notifyChange(repositoryId);
reject(signal.reason || new Error("Operation aborted"));
}
};
signal.addEventListener("abort", onAbort);
if (!this.canAcquireImmediately(request.type, activeLocks, waiters)) {
return null;
}
});
} finally {
signal?.removeEventListener("abort", onAbort);
}
if (signal?.aborted) {
if (lockId) {
this.releaseShared(repositoryId, lockId);
}
throw signal.reason || new Error("Operation aborted");
}
return this.createRelease("shared", repositoryId, lockId!);
return requests.map((request) =>
this.insertLock(tx, { ...request, id: this.generateLockId(), ownerId: this.ownerId }, now),
);
});
}
async acquireExclusive(repositoryId: string, operation: string, signal?: AbortSignal): Promise<() => void> {
if (signal?.aborted) {
throw signal.reason || new Error("Operation aborted");
private tryAcquireImmediately(request: LockRequest, signal?: AbortSignal) {
const locks = this.tryAcquireManyRows([request]);
if (!locks || locks.length === 0) return null;
const [lock] = locks;
const releaseLock = this.createRelease(lock);
this.releaseIfAborted(releaseLock, signal);
return releaseLock;
}
private createWaiter(request: LockRequest, waiterId: string) {
const now = Date.now();
db.transaction((tx) => {
this.cleanupExpired(tx, now);
tx.insert(repositoryLockWaitersTable)
.values({
id: waiterId,
repositoryId: request.repositoryId,
type: request.type,
operation: request.operation,
ownerId: this.ownerId,
requestedAt: now,
expiresAt: now + LOCK_LEASE_MS,
heartbeatAt: now,
})
.run();
});
}
private deleteWaiter(waiterId: string) {
db.delete(repositoryLockWaitersTable)
.where(
and(eq(repositoryLockWaitersTable.id, waiterId), eq(repositoryLockWaitersTable.ownerId, this.ownerId)),
)
.run();
}
private deleteWaiterRow(tx: RepositoryMutexTransaction, waiterId: string): void {
tx.delete(repositoryLockWaitersTable).where(eq(repositoryLockWaitersTable.id, waiterId)).run();
}
private promoteWaiter(tx: RepositoryMutexTransaction, waiter: RepositoryLockWaiter, now: number) {
this.deleteWaiterRow(tx, waiter.id);
return this.insertLock(tx, { ...waiter, id: waiter.id }, now);
}
private getLeadingSharedWaiters(waiters: RepositoryLockWaiter[]) {
const leadingSharedWaiters: RepositoryLockWaiter[] = [];
for (const waiter of waiters) {
if (waiter.type === "exclusive") break;
leadingSharedWaiters.push(waiter);
}
const state = this.getOrCreateState(repositoryId);
return leadingSharedWaiters;
}
if (!state.exclusiveHolder && state.sharedHolders.size === 0 && state.waitQueue.length === 0) {
const lockId = this.generateLockId();
state.exclusiveHolder = {
id: lockId,
operation,
acquiredAt: Date.now(),
};
return () => this.releaseExclusive(repositoryId, lockId);
}
private tryPromoteWaiter(waiterId: string): QueueAttempt {
const now = Date.now();
logger.debug(
`[Mutex] Waiting for exclusive lock on repo ${repositoryId}: ${operation} (shared: ${state.sharedHolders.size}, exclusive: ${state.exclusiveHolder ? "yes" : "no"}, queue: ${state.waitQueue.length})`,
);
return db.transaction((tx) => {
this.cleanupExpiredDuringPolling(tx, now);
let onAbort: () => void = () => {};
let lockId: string | undefined;
try {
lockId = await new Promise<string>((resolve, reject) => {
const waiter = { type: "exclusive" as const, operation, resolve };
state.waitQueue.push(waiter);
if (signal) {
onAbort = () => {
const index = state.waitQueue.indexOf(waiter);
if (index !== -1) {
state.waitQueue.splice(index, 1);
this.cleanupStateIfEmpty(repositoryId);
this.notifyChange(repositoryId);
reject(signal.reason || new Error("Operation aborted"));
}
};
signal.addEventListener("abort", onAbort);
}
});
} finally {
signal?.removeEventListener("abort", onAbort);
}
if (signal?.aborted) {
if (lockId) {
this.releaseExclusive(repositoryId, lockId);
const activeLock = this.getActiveLockById(tx, waiterId, now);
if (activeLock) {
return { status: "acquired", lock: activeLock };
}
throw signal.reason || new Error("Operation aborted");
const waiter = this.getWaiterById(tx, waiterId, now);
if (!waiter) {
return { status: "missing" };
}
const activeLocks = this.getActiveLocks(tx, waiter.repositoryId, now);
const waiters = this.getWaiters(tx, waiter.repositoryId, now);
if (waiter.type === "exclusive") {
if (activeLocks.length > 0 || waiters[0]?.id !== waiter.id) {
return { status: "waiting" };
}
return { status: "acquired", lock: this.promoteWaiter(tx, waiter, now) };
}
if (activeLocks.some((lock) => lock.type === "exclusive")) {
return { status: "waiting" };
}
const leadingSharedWaiters = this.getLeadingSharedWaiters(waiters);
if (!leadingSharedWaiters.some((queuedWaiter) => queuedWaiter.id === waiter.id)) {
return { status: "waiting" };
}
let acquiredLock: AcquiredLock | null = null;
for (const sharedWaiter of leadingSharedWaiters) {
const lock = this.promoteWaiter(tx, sharedWaiter, now);
if (sharedWaiter.id === waiter.id) {
acquiredLock = lock;
}
}
if (!acquiredLock) {
return { status: "waiting" };
}
return { status: "acquired", lock: acquiredLock };
});
}
private async waitForQueuedLock(request: LockRequest, signal?: AbortSignal) {
this.throwIfAborted(signal);
const waiterId = this.generateLockId();
this.createWaiter(request, waiterId);
this.startHeartbeat("waiter", waiterId);
try {
while (true) {
this.throwIfAborted(signal);
const attempt = this.tryPromoteWaiter(waiterId);
if (attempt.status === "acquired") {
this.stopHeartbeat(waiterId);
const releaseLock = this.createRelease(attempt.lock);
this.releaseIfAborted(releaseLock, signal);
return releaseLock;
}
if (attempt.status === "missing") {
this.createWaiter(request, waiterId);
this.startHeartbeat("waiter", waiterId);
}
await this.waitForPoll(signal);
}
} catch (error) {
this.stopHeartbeat(waiterId);
this.deleteWaiter(waiterId);
this.release({ id: waiterId });
throw error;
}
}
async acquireShared(repositoryId: string, operation: string, signal?: AbortSignal) {
this.throwIfAborted(signal);
const request: LockRequest = { repositoryId, type: "shared", operation };
const releaseLock = this.tryAcquireImmediately(request, signal);
if (releaseLock) {
return releaseLock;
}
logger.debug(`[Mutex] Acquired exclusive lock for repo ${repositoryId}: ${operation} (${lockId})`);
logger.debug(`[Mutex] Waiting for shared lock on repo ${repositoryId}: ${operation}`);
return await this.waitForQueuedLock(request, signal);
}
return this.createRelease("exclusive", repositoryId, lockId!);
async acquireExclusive(repositoryId: string, operation: string, signal?: AbortSignal) {
this.throwIfAborted(signal);
const request: LockRequest = { repositoryId, type: "exclusive", operation };
const releaseLock = this.tryAcquireImmediately(request, signal);
if (releaseLock) {
logger.debug(`[Mutex] Acquired exclusive lock for repo ${repositoryId}: ${operation}`);
return releaseLock;
}
logger.debug(`[Mutex] Waiting for exclusive lock on repo ${repositoryId}: ${operation}`);
const queuedReleaseLock = await this.waitForQueuedLock(request, signal);
logger.debug(`[Mutex] Acquired exclusive lock for repo ${repositoryId}: ${operation}`);
return queuedReleaseLock;
}
async acquireMany(requests: LockRequest[], signal?: AbortSignal) {
if (signal?.aborted) {
throw signal.reason || new Error("Operation aborted");
}
this.throwIfAborted(signal);
if (requests.length === 0) {
return () => {};
@@ -309,119 +374,123 @@ class RepositoryMutex {
const sortedRequests = [...requests].sort((a, b) => a.repositoryId.localeCompare(b.repositoryId));
while (true) {
const releaseLocks = this.tryAcquireMany(sortedRequests);
if (releaseLocks) {
const locks = this.tryAcquireManyRows(sortedRequests);
if (locks) {
const releaseLocks = this.createReleaseMany(locks);
this.releaseIfAborted(releaseLocks, signal);
return releaseLocks;
}
await this.waitForChange(
sortedRequests.map((request) => request.repositoryId),
signal,
);
if (signal?.aborted) {
throw signal.reason || new Error("Operation aborted");
}
await this.waitForPoll(signal);
}
}
private releaseShared(repositoryId: string, lockId: string): void {
const state = this.locks.get(repositoryId);
if (!state) {
return;
}
isLocked(repositoryId: string) {
const now = Date.now();
const holder = state.sharedHolders.get(lockId);
if (!holder) {
return;
}
state.sharedHolders.delete(lockId);
const duration = Date.now() - holder.acquiredAt;
logger.debug(`[Mutex] Released shared lock for repo ${repositoryId}: ${holder.operation} (held for ${duration}ms)`);
this.processWaitQueue(repositoryId);
this.cleanupStateIfEmpty(repositoryId);
this.notifyChange(repositoryId);
return db.transaction((tx) => {
this.cleanupExpired(tx, now);
return this.getActiveLocks(tx, repositoryId, now).length > 0;
});
}
private releaseExclusive(repositoryId: string, lockId: string): void {
const state = this.locks.get(repositoryId);
if (!state) {
return;
}
if (!state.exclusiveHolder || state.exclusiveHolder.id !== lockId) {
return;
}
const duration = Date.now() - state.exclusiveHolder.acquiredAt;
logger.debug(
`[Mutex] Released exclusive lock for repo ${repositoryId}: ${state.exclusiveHolder.operation} (held for ${duration}ms)`,
);
state.exclusiveHolder = null;
this.processWaitQueue(repositoryId);
this.cleanupStateIfEmpty(repositoryId);
this.notifyChange(repositoryId);
}
private processWaitQueue(repositoryId: string): void {
const state = this.locks.get(repositoryId);
if (!state || state.waitQueue.length === 0) {
return;
}
if (state.exclusiveHolder) {
return;
}
const firstWaiter = state.waitQueue[0];
if (firstWaiter.type === "exclusive") {
if (state.sharedHolders.size === 0) {
state.waitQueue.shift();
const lockId = this.generateLockId();
state.exclusiveHolder = {
id: lockId,
operation: firstWaiter.operation,
acquiredAt: Date.now(),
};
firstWaiter.resolve(lockId);
}
} else {
while (state.waitQueue.length > 0 && state.waitQueue[0].type === "shared") {
const waiter = state.waitQueue.shift();
if (!waiter) break;
const lockId = this.generateLockId();
state.sharedHolders.set(lockId, {
id: lockId,
operation: waiter.operation,
acquiredAt: Date.now(),
});
waiter.resolve(lockId);
}
}
}
isLocked(repositoryId: string): boolean {
const state = this.locks.get(repositoryId);
if (!state) return false;
return state.exclusiveHolder !== null || state.sharedHolders.size > 0;
}
private createRelease(type: LockType, repositoryId: string, lockId: string) {
private createReleaseMany(locks: AcquiredLock[]) {
const releases = locks.map((lock) => this.createRelease(lock));
let released = false;
return () => {
if (released) return;
released = true;
if (type === "shared") {
this.releaseShared(repositoryId, lockId);
} else {
this.releaseExclusive(repositoryId, lockId);
for (const release of releases.toReversed()) {
release();
}
};
}
private createRelease(lock: AcquiredLock) {
this.startHeartbeat("lock", lock.id);
let released = false;
return () => {
if (released) return;
released = true;
this.stopHeartbeat(lock.id);
this.release(lock);
};
}
private release(lock: Pick<AcquiredLock, "id">) {
const releasedLock = db.transaction((tx) => {
const row = tx.query.repositoryLocksTable
.findFirst({ where: { AND: [{ id: { eq: lock.id } }, { ownerId: { eq: this.ownerId } }] } })
.sync();
if (!row) return null;
tx.delete(repositoryLocksTable)
.where(and(eq(repositoryLocksTable.id, lock.id), eq(repositoryLocksTable.ownerId, this.ownerId)))
.run();
return row;
});
if (!releasedLock) return;
const duration = Date.now() - releasedLock.acquiredAt;
logger.debug(
`[Mutex] Released ${releasedLock.type} lock for repo ${releasedLock.repositoryId}: ${releasedLock.operation} (held for ${duration}ms)`,
);
}
private startHeartbeat(target: HeartbeatTarget, lockId: string) {
this.stopHeartbeat(lockId);
const heartbeat = () => {
const now = Date.now();
const values = { heartbeatAt: now, expiresAt: now + LOCK_LEASE_MS };
try {
if (target === "lock") {
db.update(repositoryLocksTable)
.set(values)
.where(and(eq(repositoryLocksTable.id, lockId), eq(repositoryLocksTable.ownerId, this.ownerId)))
.run();
} else {
db.update(repositoryLockWaitersTable)
.set(values)
.where(
and(
eq(repositoryLockWaitersTable.id, lockId),
eq(repositoryLockWaitersTable.ownerId, this.ownerId),
),
)
.run();
}
} catch (error) {
logger.warn(`[Mutex] Failed to heartbeat ${target} ${lockId}: ${String(error)}`);
}
};
const timer = setInterval(heartbeat, LOCK_HEARTBEAT_MS);
if (timer && "unref" in timer) {
timer.unref();
}
this.heartbeatTimers.set(lockId, timer);
}
private stopHeartbeat(lockId: string) {
const timer = this.heartbeatTimers.get(lockId);
if (!timer) {
return;
}
clearInterval(timer);
this.heartbeatTimers.delete(lockId);
}
}
export const repoMutex = new RepositoryMutex();

View File

@@ -33,6 +33,7 @@ const runMigrations = async () => {
migrate(db, { migrationsFolder });
sqlite.run("PRAGMA foreign_keys = ON;");
sqlite.run("PRAGMA busy_timeout = 5000;");
};
export const runDbMigrations = () => {

View File

@@ -310,6 +310,48 @@ export const repositoriesTable = sqliteTable(
export type Repository = typeof repositoriesTable.$inferSelect;
export type RepositoryInsert = typeof repositoriesTable.$inferInsert;
export type RepositoryLockType = "shared" | "exclusive";
export const repositoryLocksTable = sqliteTable(
"repository_locks",
{
id: text("id").primaryKey(),
repositoryId: text("repository_id").notNull(),
type: text("type").$type<RepositoryLockType>().notNull(),
operation: text("operation").notNull(),
ownerId: text("owner_id").notNull(),
acquiredAt: int("acquired_at", { mode: "number" }).notNull(),
expiresAt: int("expires_at", { mode: "number" }).notNull(),
heartbeatAt: int("heartbeat_at", { mode: "number" }).notNull(),
},
(table) => [
index("repository_locks_repository_id_idx").on(table.repositoryId),
index("repository_locks_expires_at_idx").on(table.expiresAt),
index("repository_locks_owner_id_idx").on(table.ownerId),
],
);
export type RepositoryLock = typeof repositoryLocksTable.$inferSelect;
export const repositoryLockWaitersTable = sqliteTable(
"repository_lock_waiters",
{
id: text("id").primaryKey(),
repositoryId: text("repository_id").notNull(),
type: text("type").$type<RepositoryLockType>().notNull(),
operation: text("operation").notNull(),
ownerId: text("owner_id").notNull(),
requestedAt: int("requested_at", { mode: "number" }).notNull(),
expiresAt: int("expires_at", { mode: "number" }).notNull(),
heartbeatAt: int("heartbeat_at", { mode: "number" }).notNull(),
},
(table) => [
index("repository_lock_waiters_repository_id_idx").on(table.repositoryId),
index("repository_lock_waiters_expires_at_idx").on(table.expiresAt),
index("repository_lock_waiters_owner_id_idx").on(table.ownerId),
],
);
export type RepositoryLockWaiter = typeof repositoryLockWaitersTable.$inferSelect;
/**
* Backup Schedules Table
*/

View File

@@ -149,7 +149,10 @@ const runAndStoreRepositoryStats = async (repository: Repository): Promise<Resti
.update(repositoriesTable)
.set({ stats, statsUpdatedAt: Date.now() })
.where(
and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)),
and(
eq(repositoriesTable.id, repository.id),
eq(repositoriesTable.organizationId, repository.organizationId),
),
);
return stats;
@@ -190,7 +193,10 @@ const deleteRepository = async (shortId: ShortId) => {
await db
.delete(repositoriesTable)
.where(
and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)),
and(
eq(repositoriesTable.id, repository.id),
eq(repositoriesTable.organizationId, repository.organizationId),
),
);
cache.delByPrefix(cacheKeys.repository.all(repository.id));
@@ -505,7 +511,10 @@ const checkHealth = async (shortId: ShortId) => {
lastError: error,
})
.where(
and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)),
and(
eq(repositoriesTable.id, repository.id),
eq(repositoriesTable.organizationId, repository.organizationId),
),
);
return { lastError: error };
@@ -737,7 +746,9 @@ const updateRepository = async (shortId: ShortId, updates: UpdateRepositoryBody)
const [updated] = await db
.update(repositoriesTable)
.set(updatePayload)
.where(and(eq(repositoriesTable.id, existing.id), eq(repositoriesTable.organizationId, existing.organizationId)))
.where(
and(eq(repositoriesTable.id, existing.id), eq(repositoriesTable.organizationId, existing.organizationId)),
)
.returning();
if (!updated) {
@@ -792,11 +803,12 @@ const execResticCommand = async (
}
addCommonArgs(resticArgs, env, repository.config);
const result = await safeSpawn({ command: "restic", args: resticArgs, env, signal, onStdout, onStderr });
await cleanupTemporaryKeys(env, resticDeps);
return { exitCode: result.exitCode };
try {
const result = await safeSpawn({ command: "restic", args: resticArgs, env, signal, onStdout, onStderr });
return { exitCode: result.exitCode };
} finally {
await cleanupTemporaryKeys(env, resticDeps);
}
};
const getRetentionCategories = async (repositoryId: ShortId, scheduleId?: ShortId) => {