From 2318b6bdd003ee4c101fa48e4f27d50dbf6e4da3 Mon Sep 17 00:00:00 2001 From: Nico <47644445+nicotsx@users.noreply.github.com> Date: Fri, 5 Jun 2026 18:07:59 +0200 Subject: [PATCH] fix: limit concurrent ls to 2 in flight calls (#948) * fix: limit concurrent ls to 2 in flight calls * refactor: get shared lock after semaphore take --- .../__tests__/snapshot-tree-browser.test.tsx | 23 --- .../file-browsers/snapshot-tree-browser.tsx | 11 -- .../modules/backups/routes/backup-details.tsx | 4 +- .../__tests__/repositories.service.test.ts | 135 ++++++++++++++++++ .../repositories/repositories.service.ts | 69 +++++---- 5 files changed, 179 insertions(+), 63 deletions(-) diff --git a/app/client/components/file-browsers/__tests__/snapshot-tree-browser.test.tsx b/app/client/components/file-browsers/__tests__/snapshot-tree-browser.test.tsx index 352e167f..23df6252 100644 --- a/app/client/components/file-browsers/__tests__/snapshot-tree-browser.test.tsx +++ b/app/client/components/file-browsers/__tests__/snapshot-tree-browser.test.tsx @@ -224,29 +224,6 @@ describe("SnapshotTreeBrowser", () => { }); }); - test("prefetches using the query path when display and query roots differ", async () => { - const requests = mockListSnapshotFiles(); - - renderSnapshotTreeBrowser(); - - const row = await screen.findByRole("button", { name: "project" }); - const initialRequestCount = requests.length; - - await userEvent.hover(row); - - await waitFor(() => { - expect(requests.length).toBe(initialRequestCount + 1); - }); - - expect(requests.at(-1)).toEqual({ - shortId: "repo-1", - snapshotId: "snap-1", - path: "/mnt/project", - offset: "0", - limit: "500", - }); - }); - test("shows the query root contents when display and query roots differ", async () => { mockListSnapshotFiles(); diff --git a/app/client/components/file-browsers/snapshot-tree-browser.tsx b/app/client/components/file-browsers/snapshot-tree-browser.tsx index 6491e45f..cf16b3d8 100644 --- a/app/client/components/file-browsers/snapshot-tree-browser.tsx +++ b/app/client/components/file-browsers/snapshot-tree-browser.tsx @@ -5,7 +5,6 @@ import { FileBrowser, type FileBrowserUiProps } from "~/client/components/file-b import { useFileBrowser } from "~/client/hooks/use-file-browser"; import { parseError } from "~/client/lib/errors"; import { isPathWithin, normalizeAbsolutePath } from "@zerobyte/core/utils"; -import { logger } from "~/client/lib/logger"; function createPathPrefixFns(basePath: string) { return { @@ -84,16 +83,6 @@ export const SnapshotTreeBrowser = (props: SnapshotTreeBrowserProps) => { }), ); }, - prefetchFolder: (displayPath) => { - void queryClient - .prefetchQuery( - listSnapshotFilesOptions({ - path: { shortId: repositoryId, snapshotId }, - query: { path: displayPath, offset: 0, limit: pageSize }, - }), - ) - .catch((e) => logger.error(e)); - }, pathTransform: displayPathFns, }); diff --git a/app/client/modules/backups/routes/backup-details.tsx b/app/client/modules/backups/routes/backup-details.tsx index 20086559..75173a3a 100644 --- a/app/client/modules/backups/routes/backup-details.tsx +++ b/app/client/modules/backups/routes/backup-details.tsx @@ -63,9 +63,7 @@ export function ScheduleDetailsPage(props: Props) { const queryClient = useQueryClient(); const navigate = useNavigate(); const searchParams = useSearch({ from: "/(dashboard)/backups/$backupId/" }); - const [selectedSnapshotId, setSelectedSnapshotId] = useState( - initialSnapshotId ?? loaderData.snapshots?.at(-1)?.short_id, - ); + const [selectedSnapshotId, setSelectedSnapshotId] = useState(initialSnapshotId); const [showDeleteConfirm, setShowDeleteConfirm] = useState(false); const [snapshotToDelete, setSnapshotToDelete] = useState(null); diff --git a/app/server/modules/repositories/__tests__/repositories.service.test.ts b/app/server/modules/repositories/__tests__/repositories.service.test.ts index b5c6ccc4..9d50bf5b 100644 --- a/app/server/modules/repositories/__tests__/repositories.service.test.ts +++ b/app/server/modules/repositories/__tests__/repositories.service.test.ts @@ -208,6 +208,141 @@ describe("repositoriesService repository stats", () => { }); }); +describe("repositoriesService.listSnapshotFiles", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + test("limits concurrent restic ls commands per repository", async () => { + const repository = await createTestRepository(session.organizationId); + let active = 0; + let maxActive = 0; + let releaseAll = false; + let exclusiveAcquired = false; + let releaseExclusive: (() => void) | undefined; + let exclusivePromise: Promise<() => void> | undefined; + const releaseWaiters: Array<() => void> = []; + const exclusiveController = new AbortController(); + + const releaseWaitingCommands = () => { + const waiters = releaseWaiters.splice(0); + for (const release of waiters) { + release(); + } + }; + + const resolveWithin = async (promise: Promise, timeoutMs: number) => { + return await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error(`Expected promise to resolve within ${timeoutMs}ms`)); + }, timeoutMs); + + promise.then( + (value) => { + clearTimeout(timeout); + resolve(value); + }, + (error) => { + clearTimeout(timeout); + reject(error); + }, + ); + }); + }; + + const lsSpy = vi.spyOn(restic, "ls").mockImplementation((_config, snapshotId, _path, options) => + Effect.promise(async () => { + active++; + maxActive = Math.max(maxActive, active); + + try { + if (!releaseAll) { + await new Promise((resolve) => releaseWaiters.push(resolve)); + } + + return { + snapshot: { + id: snapshotId, + short_id: snapshotId, + time: new Date().toISOString(), + tree: "tree", + paths: ["/"], + hostname: "host", + struct_type: "snapshot" as const, + message_type: "snapshot" as const, + }, + nodes: [], + pagination: { + offset: options.offset ?? 0, + limit: options.limit ?? 500, + total: 0, + hasMore: false, + }, + }; + } finally { + active--; + } + }), + ); + + const calls = Array.from({ length: 4 }, (_, index) => + withContext({ organizationId: session.organizationId, userId: session.user.id }, () => + repositoriesService.listSnapshotFiles(repository.shortId, `snapshot-${index}`, "/", { + offset: 0, + limit: 100, + }), + ), + ); + + try { + await waitForExpect(() => { + expect(releaseWaiters).toHaveLength(2); + }); + expect(maxActive).toBe(2); + + exclusivePromise = repoMutex + .acquireExclusive(repository.id, "delete", exclusiveController.signal) + .then((release) => { + exclusiveAcquired = true; + releaseExclusive = release; + return release; + }); + + releaseWaitingCommands(); + + releaseExclusive = await resolveWithin(exclusivePromise, 2000); + expect(exclusiveAcquired).toBe(true); + expect(active).toBe(0); + + releaseExclusive(); + releaseExclusive = undefined; + + await waitForExpect(() => { + expect(releaseWaiters).toHaveLength(2); + }); + expect(maxActive).toBe(2); + + releaseWaitingCommands(); + await Promise.all(calls); + } finally { + if (releaseExclusive) { + releaseExclusive(); + } else { + exclusiveController.abort(); + } + releaseAll = true; + releaseWaitingCommands(); + await Promise.allSettled(calls); + if (exclusivePromise) { + await Promise.allSettled([exclusivePromise]); + } + } + + expect(lsSpy).toHaveBeenCalledTimes(4); + expect(maxActive).toBeLessThanOrEqual(2); + }); +}); + describe("repositoriesService.dumpSnapshot", () => { afterEach(() => { vi.restoreAllMocks(); diff --git a/app/server/modules/repositories/repositories.service.ts b/app/server/modules/repositories/repositories.service.ts index 5e04404c..c0343ede 100644 --- a/app/server/modules/repositories/repositories.service.ts +++ b/app/server/modules/repositories/repositories.service.ts @@ -42,6 +42,7 @@ import type { ParsedTask, TaskInput } from "../tasks/tasks.schemas"; import { Effect } from "effect"; const runningDoctors = new Map(); +const lsLimiters = new Map(); const RESTORE_TASK_RESOURCE_TYPE = "repository"; type RestoreTaskInput = Extract; @@ -99,6 +100,15 @@ const updateActiveRestoreTask = (restoreId: string, eventName: string, update: ( } }; +const getLsLimiter = (repositoryId: string) => { + let limiter = lsLimiters.get(repositoryId); + if (!limiter) { + limiter = Effect.runSync(Effect.makeSemaphore(2)); + lsLimiters.set(repositoryId, limiter); + } + return limiter; +}; + const findActiveRestoreTask = ( organizationId: string, repositoryShortId: string, @@ -469,36 +479,43 @@ const listSnapshotFiles = async ( }; } - const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`); + const limiter = getLsLimiter(repository.id); + await runEffectPromise(limiter.take(1)); + try { - const result = await runEffectPromise( - restic.ls(repository.config, snapshotId, path, { organizationId, offset, limit }), - ); + const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`); + try { + const result = await runEffectPromise( + restic.ls(repository.config, snapshotId, path, { organizationId, offset, limit }), + ); - if (!result.snapshot) { - throw new NotFoundError("Snapshot not found or empty"); + if (!result.snapshot) { + throw new NotFoundError("Snapshot not found or empty"); + } + + const response = { + snapshot: { + id: result.snapshot.id, + short_id: result.snapshot.short_id, + time: result.snapshot.time, + hostname: result.snapshot.hostname, + paths: result.snapshot.paths, + }, + files: result.nodes, + offset: result.pagination.offset, + limit: result.pagination.limit, + total: result.pagination.total, + hasMore: result.pagination.hasMore, + }; + + cache.set(cacheKey, result); + + return response; + } finally { + releaseLock(); } - - const response = { - snapshot: { - id: result.snapshot.id, - short_id: result.snapshot.short_id, - time: result.snapshot.time, - hostname: result.snapshot.hostname, - paths: result.snapshot.paths, - }, - files: result.nodes, - offset: result.pagination.offset, - limit: result.pagination.limit, - total: result.pagination.total, - hasMore: result.pagination.hasMore, - }; - - cache.set(cacheKey, result); - - return response; } finally { - releaseLock(); + await runEffectPromise(limiter.release(1)); } };