fix: limit concurrent ls to 2 in flight calls (#948)

* fix: limit concurrent ls to 2 in flight calls

* refactor: get shared lock after semaphore take
This commit is contained in:
Nico
2026-06-05 18:07:59 +02:00
committed by GitHub
parent 34fc4f0cbb
commit 2318b6bdd0
5 changed files with 179 additions and 63 deletions

View File

@@ -224,29 +224,6 @@ describe("SnapshotTreeBrowser", () => {
});
});
test("prefetches using the query path when display and query roots differ", async () => {
const requests = mockListSnapshotFiles();
renderSnapshotTreeBrowser();
const row = await screen.findByRole("button", { name: "project" });
const initialRequestCount = requests.length;
await userEvent.hover(row);
await waitFor(() => {
expect(requests.length).toBe(initialRequestCount + 1);
});
expect(requests.at(-1)).toEqual({
shortId: "repo-1",
snapshotId: "snap-1",
path: "/mnt/project",
offset: "0",
limit: "500",
});
});
test("shows the query root contents when display and query roots differ", async () => {
mockListSnapshotFiles();

View File

@@ -5,7 +5,6 @@ import { FileBrowser, type FileBrowserUiProps } from "~/client/components/file-b
import { useFileBrowser } from "~/client/hooks/use-file-browser";
import { parseError } from "~/client/lib/errors";
import { isPathWithin, normalizeAbsolutePath } from "@zerobyte/core/utils";
import { logger } from "~/client/lib/logger";
function createPathPrefixFns(basePath: string) {
return {
@@ -84,16 +83,6 @@ export const SnapshotTreeBrowser = (props: SnapshotTreeBrowserProps) => {
}),
);
},
prefetchFolder: (displayPath) => {
void queryClient
.prefetchQuery(
listSnapshotFilesOptions({
path: { shortId: repositoryId, snapshotId },
query: { path: displayPath, offset: 0, limit: pageSize },
}),
)
.catch((e) => logger.error(e));
},
pathTransform: displayPathFns,
});

View File

@@ -63,9 +63,7 @@ export function ScheduleDetailsPage(props: Props) {
const queryClient = useQueryClient();
const navigate = useNavigate();
const searchParams = useSearch({ from: "/(dashboard)/backups/$backupId/" });
const [selectedSnapshotId, setSelectedSnapshotId] = useState<string | undefined>(
initialSnapshotId ?? loaderData.snapshots?.at(-1)?.short_id,
);
const [selectedSnapshotId, setSelectedSnapshotId] = useState<string | undefined>(initialSnapshotId);
const [showDeleteConfirm, setShowDeleteConfirm] = useState(false);
const [snapshotToDelete, setSnapshotToDelete] = useState<string | null>(null);

View File

@@ -208,6 +208,141 @@ describe("repositoriesService repository stats", () => {
});
});
describe("repositoriesService.listSnapshotFiles", () => {
afterEach(() => {
vi.restoreAllMocks();
});
test("limits concurrent restic ls commands per repository", async () => {
const repository = await createTestRepository(session.organizationId);
let active = 0;
let maxActive = 0;
let releaseAll = false;
let exclusiveAcquired = false;
let releaseExclusive: (() => void) | undefined;
let exclusivePromise: Promise<() => void> | undefined;
const releaseWaiters: Array<() => void> = [];
const exclusiveController = new AbortController();
const releaseWaitingCommands = () => {
const waiters = releaseWaiters.splice(0);
for (const release of waiters) {
release();
}
};
const resolveWithin = async <T>(promise: Promise<T>, timeoutMs: number) => {
return await new Promise<T>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Expected promise to resolve within ${timeoutMs}ms`));
}, timeoutMs);
promise.then(
(value) => {
clearTimeout(timeout);
resolve(value);
},
(error) => {
clearTimeout(timeout);
reject(error);
},
);
});
};
const lsSpy = vi.spyOn(restic, "ls").mockImplementation((_config, snapshotId, _path, options) =>
Effect.promise(async () => {
active++;
maxActive = Math.max(maxActive, active);
try {
if (!releaseAll) {
await new Promise<void>((resolve) => releaseWaiters.push(resolve));
}
return {
snapshot: {
id: snapshotId,
short_id: snapshotId,
time: new Date().toISOString(),
tree: "tree",
paths: ["/"],
hostname: "host",
struct_type: "snapshot" as const,
message_type: "snapshot" as const,
},
nodes: [],
pagination: {
offset: options.offset ?? 0,
limit: options.limit ?? 500,
total: 0,
hasMore: false,
},
};
} finally {
active--;
}
}),
);
const calls = Array.from({ length: 4 }, (_, index) =>
withContext({ organizationId: session.organizationId, userId: session.user.id }, () =>
repositoriesService.listSnapshotFiles(repository.shortId, `snapshot-${index}`, "/", {
offset: 0,
limit: 100,
}),
),
);
try {
await waitForExpect(() => {
expect(releaseWaiters).toHaveLength(2);
});
expect(maxActive).toBe(2);
exclusivePromise = repoMutex
.acquireExclusive(repository.id, "delete", exclusiveController.signal)
.then((release) => {
exclusiveAcquired = true;
releaseExclusive = release;
return release;
});
releaseWaitingCommands();
releaseExclusive = await resolveWithin(exclusivePromise, 2000);
expect(exclusiveAcquired).toBe(true);
expect(active).toBe(0);
releaseExclusive();
releaseExclusive = undefined;
await waitForExpect(() => {
expect(releaseWaiters).toHaveLength(2);
});
expect(maxActive).toBe(2);
releaseWaitingCommands();
await Promise.all(calls);
} finally {
if (releaseExclusive) {
releaseExclusive();
} else {
exclusiveController.abort();
}
releaseAll = true;
releaseWaitingCommands();
await Promise.allSettled(calls);
if (exclusivePromise) {
await Promise.allSettled([exclusivePromise]);
}
}
expect(lsSpy).toHaveBeenCalledTimes(4);
expect(maxActive).toBeLessThanOrEqual(2);
});
});
describe("repositoriesService.dumpSnapshot", () => {
afterEach(() => {
vi.restoreAllMocks();

View File

@@ -42,6 +42,7 @@ import type { ParsedTask, TaskInput } from "../tasks/tasks.schemas";
import { Effect } from "effect";
const runningDoctors = new Map<string, AbortController>();
const lsLimiters = new Map<string, Effect.Semaphore>();
const RESTORE_TASK_RESOURCE_TYPE = "repository";
type RestoreTaskInput = Extract<TaskInput, { kind: "restore" }>;
@@ -99,6 +100,15 @@ const updateActiveRestoreTask = (restoreId: string, eventName: string, update: (
}
};
const getLsLimiter = (repositoryId: string) => {
let limiter = lsLimiters.get(repositoryId);
if (!limiter) {
limiter = Effect.runSync(Effect.makeSemaphore(2));
lsLimiters.set(repositoryId, limiter);
}
return limiter;
};
const findActiveRestoreTask = (
organizationId: string,
repositoryShortId: string,
@@ -469,36 +479,43 @@ const listSnapshotFiles = async (
};
}
const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`);
const limiter = getLsLimiter(repository.id);
await runEffectPromise(limiter.take(1));
try {
const result = await runEffectPromise(
restic.ls(repository.config, snapshotId, path, { organizationId, offset, limit }),
);
const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`);
try {
const result = await runEffectPromise(
restic.ls(repository.config, snapshotId, path, { organizationId, offset, limit }),
);
if (!result.snapshot) {
throw new NotFoundError("Snapshot not found or empty");
if (!result.snapshot) {
throw new NotFoundError("Snapshot not found or empty");
}
const response = {
snapshot: {
id: result.snapshot.id,
short_id: result.snapshot.short_id,
time: result.snapshot.time,
hostname: result.snapshot.hostname,
paths: result.snapshot.paths,
},
files: result.nodes,
offset: result.pagination.offset,
limit: result.pagination.limit,
total: result.pagination.total,
hasMore: result.pagination.hasMore,
};
cache.set(cacheKey, result);
return response;
} finally {
releaseLock();
}
const response = {
snapshot: {
id: result.snapshot.id,
short_id: result.snapshot.short_id,
time: result.snapshot.time,
hostname: result.snapshot.hostname,
paths: result.snapshot.paths,
},
files: result.nodes,
offset: result.pagination.offset,
limit: result.pagination.limit,
total: result.pagination.total,
hasMore: result.pagination.hasMore,
};
cache.set(cacheKey, result);
return response;
} finally {
releaseLock();
await runEffectPromise(limiter.release(1));
}
};