From cb17c44e55b6867273a66946d5cdda06c0579d79 Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Thu, 12 Mar 2026 21:12:12 +0100 Subject: [PATCH] fix(dlx): fix race conditions in parallel dlx calls sharing Global Virtual Store (#10939) Content-verified skip in GVS mode, tolerate EPERM during bin creation on Windows, handle EPERM in DLX cache symlink. (cherry picked from commit 62f760ec3d812863da7d1fffb3252938f83dabde) --- .changeset/fix-dlx-concurrent-install-race.md | 9 +++ .../plugin-commands-script-runners/src/dlx.ts | 6 +- .../src/importIndexedDir.ts | 59 ++++++++++++---- .../test/importIndexedDir.race.test.ts | 67 ++++++++----------- pkg-manager/core/src/install/index.ts | 1 + pkg-manager/core/src/install/link.ts | 6 ++ pkg-manager/headless/src/index.ts | 3 + pkg-manager/link-bins/src/index.ts | 15 +++-- pnpm/test/utils/execPnpm.ts | 24 +++++-- store/cafs-types/src/index.ts | 1 + store/create-cafs-store/src/index.ts | 2 + store/store-controller-types/src/index.ts | 1 + worker/src/index.ts | 4 +- worker/src/start.ts | 2 + worker/src/types.ts | 1 + 15 files changed, 138 insertions(+), 63 deletions(-) create mode 100644 .changeset/fix-dlx-concurrent-install-race.md diff --git a/.changeset/fix-dlx-concurrent-install-race.md b/.changeset/fix-dlx-concurrent-install-race.md new file mode 100644 index 0000000000..ab027ff71e --- /dev/null +++ b/.changeset/fix-dlx-concurrent-install-race.md @@ -0,0 +1,9 @@ +--- +"@pnpm/fs.indexed-pkg-importer": patch +"@pnpm/plugin-commands-script-runners": patch +"@pnpm/core": patch +"@pnpm/link-bins": patch +"pnpm": patch +--- + +Fixed intermittent failures when multiple `pnpm dlx` calls run concurrently for the same package. When the global virtual store is enabled, the importer now verifies file content before skipping a rename, avoiding destructive swap-renames that break concurrent processes. Also tolerates EPERM during bin creation on Windows and properly propagates `enableGlobalVirtualStore` through the install pipeline. 
diff --git a/exec/plugin-commands-script-runners/src/dlx.ts b/exec/plugin-commands-script-runners/src/dlx.ts index 7c02ba141a..d39cb7fa0c 100644 --- a/exec/plugin-commands-script-runners/src/dlx.ts +++ b/exec/plugin-commands-script-runners/src/dlx.ts @@ -162,12 +162,12 @@ export async function handler ( try { await symlinkDir(cachedDir, cacheLink, { overwrite: true }) } catch (error) { - // EBUSY means that there is another dlx process running in parallel that has acquired the cache link first. - // Similarly, EEXIST means that another dlx process has created the cache link before this process. + // EBUSY/EEXIST/EPERM mean that there is another dlx process running in parallel that has acquired the cache link first. + // EPERM can happen on Windows when another process has the symlink open while this process tries to unlink it. // The link created by the other process is just as up-to-date as the link the current process was attempting // to create. Therefore, instead of re-attempting to create the current link again, it is just as good to let // the other link stay. The current process should yield. 
- if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST')) { + if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST' && error.code !== 'EPERM')) { throw error } } diff --git a/fs/indexed-pkg-importer/src/importIndexedDir.ts b/fs/indexed-pkg-importer/src/importIndexedDir.ts index 6d06479c89..35f03c0f24 100644 --- a/fs/indexed-pkg-importer/src/importIndexedDir.ts +++ b/fs/indexed-pkg-importer/src/importIndexedDir.ts @@ -2,7 +2,7 @@ import fs from 'fs' import util from 'util' import { copySync } from 'fs-extra' import path from 'path' -import { globalWarn, logger } from '@pnpm/logger' +import { globalInfo, globalWarn, logger } from '@pnpm/logger' import { sync as rimraf } from '@zkochan/rimraf' import { sync as makeEmptyDir } from 'make-empty-dir' import sanitizeFilename from 'sanitize-filename' @@ -20,6 +20,7 @@ export function importIndexedDir ( filenames: Record, opts: { keepModulesDir?: boolean + safeToSkip?: boolean } ): void { const stage = pathTemp(newDir) @@ -61,27 +62,61 @@ They were renamed.`) } throw err } + if (opts.safeToSkip) { + // Content-addressable target (e.g. global virtual store): if the target + // already exists and has all expected files, it has the correct content. + // Skip instead of doing a swap-rename that temporarily removes the target + // directory — which breaks junctions read by other processes. 
+ try { + fs.renameSync(stage, newDir) + return + } catch (err: unknown) { + if (util.types.isNativeError(err) && 'code' in err && (err.code === 'ENOTEMPTY' || err.code === 'EEXIST' || err.code === 'EPERM')) { + if (allFilesMatch(newDir, filenames)) { + try { + rimraf(stage) + } catch {} // eslint-disable-line:no-empty + return + } + } + // Files missing or other error — fall through to renameOverwriteSync + } + } try { renameOverwrite.sync(stage, newDir) } catch (renameErr: unknown) { - // When enableGlobalVirtualStore is true, multiple worker threads may import - // the same package to the same global store location concurrently. Their - // rename operations can race. If the rename fails but the target already - // has the expected content, another thread completed the import. try { rimraf(stage) } catch {} // eslint-disable-line:no-empty - if (util.types.isNativeError(renameErr) && 'code' in renameErr && (renameErr.code === 'ENOTEMPTY' || renameErr.code === 'EEXIST')) { - const firstFile = Object.keys(filenames)[0] - if (firstFile && fs.existsSync(path.join(newDir, firstFile))) { - logger('_virtual-store-race').debug({ target: newDir }) - return - } - } throw renameErr } } +function allFilesMatch (dir: string, filenames: Record): boolean { + for (const [f, src] of Object.entries(filenames)) { + const target = path.join(dir, f) + try { + const targetStat = gfs.statSync(target) + const srcStat = gfs.statSync(src) + // Fast path: hardlinks share the same inode + if (targetStat.ino === srcStat.ino && targetStat.dev === srcStat.dev) continue + // Copy path: compare size first, then content + if (targetStat.size !== srcStat.size) { + globalInfo(`Re-importing "${dir}" because file "${f}" has a different size`) + return false + } + if (!gfs.readFileSync(target).equals(gfs.readFileSync(src))) { + globalInfo(`Re-importing "${dir}" because file "${f}" has different content`) + return false + } + } catch { + globalInfo(`Re-importing "${dir}" because file "${f}" is missing 
or unreadable`) + return false + } + } + return true +} + interface SanitizeFilenamesResult { sanitizedFilenames: Record invalidFilenames: string[] diff --git a/fs/indexed-pkg-importer/test/importIndexedDir.race.test.ts b/fs/indexed-pkg-importer/test/importIndexedDir.race.test.ts index 2dc04931d9..0e9c398661 100644 --- a/fs/indexed-pkg-importer/test/importIndexedDir.race.test.ts +++ b/fs/indexed-pkg-importer/test/importIndexedDir.race.test.ts @@ -3,10 +3,8 @@ import path from 'path' import { jest } from '@jest/globals' import { tempDir } from '@pnpm/prepare' -// renameOverwrite is mocked because its real implementation handles ENOTEMPTY -// internally (rimraf + retry). The race condition we're testing only surfaces -// when renameOverwrite exhausts its retries due to concurrent threads -// repeatedly recreating the target, which can't be reproduced deterministically. +// renameOverwrite is mocked so we can verify it's called (or not called) +// and control its behavior in tests. jest.mock('rename-overwrite', () => { const fn = Object.assign(jest.fn(), { sync: jest.fn() }) return fn @@ -21,7 +19,31 @@ beforeEach(() => { renameOverwrite.sync.mockReset() }) -test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY)', () => { +test('safeToSkip skips when target already exists (content-addressed)', () => { + const tmp = tempDir() + const srcFile = path.join(tmp, 'src', 'package.json') + const newDir = path.join(tmp, 'dest') + + // Create source file in CAS + fs.mkdirSync(path.join(tmp, 'src'), { recursive: true }) + fs.writeFileSync(srcFile, '{"name":"pkg","version":"1.0.0"}') + + // Pre-create target (simulates another process that already placed it) + fs.mkdirSync(newDir, { recursive: true }) + fs.linkSync(srcFile, path.join(newDir, 'package.json')) + + const filenames: Record = { 'package.json': srcFile } + + // Should not throw — target exists, path is content-addressed so content is correct + importIndexedDir(fs.copyFileSync, newDir, 
filenames, { safeToSkip: true }) + + expect(fs.existsSync(path.join(newDir, 'package.json'))).toBe(true) + // When safeToSkip is true and the target already exists with matching content, + // renameOverwrite.sync should not be called on any platform. + expect(renameOverwrite.sync).not.toHaveBeenCalled() +}) + +test('safeToSkip creates dir when target does not exist', () => { const tmp = tempDir() const srcFile = path.join(tmp, 'src', 'index.js') const newDir = path.join(tmp, 'dest') @@ -30,41 +52,10 @@ test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY fs.mkdirSync(path.join(tmp, 'src'), { recursive: true }) fs.writeFileSync(srcFile, 'content') - // Pre-create target with expected content (simulating another thread completed first) - fs.mkdirSync(newDir, { recursive: true }) - fs.writeFileSync(path.join(newDir, 'index.js'), 'content') - const filenames: Record = { 'index.js': srcFile } - renameOverwrite.sync.mockImplementation(() => { - throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' }) - }) - - // Should not throw — the target already has the expected content - importIndexedDir(fs.copyFileSync, newDir, filenames, {}) + // Target doesn't exist — should create it + importIndexedDir(fs.copyFileSync, newDir, filenames, { safeToSkip: true }) expect(fs.existsSync(path.join(newDir, 'index.js'))).toBe(true) }) - -test('importIndexedDir throws ENOTEMPTY when target does not have expected content', () => { - const tmp = tempDir() - const srcFile = path.join(tmp, 'src', 'index.js') - const newDir = path.join(tmp, 'dest') - - // Create source file - fs.mkdirSync(path.join(tmp, 'src'), { recursive: true }) - fs.writeFileSync(srcFile, 'content') - - // Target exists but does NOT have the expected file - fs.mkdirSync(newDir, { recursive: true }) - - const filenames: Record = { 'index.js': srcFile } - - renameOverwrite.sync.mockImplementation(() => { - throw Object.assign(new Error('ENOTEMPTY: directory not 
empty'), { code: 'ENOTEMPTY' }) - }) - - expect(() => { - importIndexedDir(fs.copyFileSync, newDir, filenames, {}) - }).toThrow('ENOTEMPTY') -}) diff --git a/pkg-manager/core/src/install/index.ts b/pkg-manager/core/src/install/index.ts index 8ce8f573fc..cc28b86305 100644 --- a/pkg-manager/core/src/install/index.ts +++ b/pkg-manager/core/src/install/index.ts @@ -1294,6 +1294,7 @@ const _installInContext: InstallFunction = async (projects, ctx, opts) => { dependenciesByProjectId, depsStateCache, disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps, + enableGlobalVirtualStore: opts.enableGlobalVirtualStore, extraNodePaths: ctx.extraNodePaths, force: opts.force, hoistedDependencies: ctx.hoistedDependencies, diff --git a/pkg-manager/core/src/install/link.ts b/pkg-manager/core/src/install/link.ts index 72b6a89da1..98c24098e9 100644 --- a/pkg-manager/core/src/install/link.ts +++ b/pkg-manager/core/src/install/link.ts @@ -51,6 +51,7 @@ export interface LinkPackagesOptions { disableRelinkLocalDirDeps?: boolean force: boolean depsStateCache: DepsStateCache + enableGlobalVirtualStore: boolean extraNodePaths: string[] hoistedDependencies: HoistedDependencies hoistedModulesDir: string @@ -151,6 +152,7 @@ export async function linkPackages (projects: ImporterToUpdate[], depGraph: Depe { allowBuild: opts.allowBuild, disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps, + enableGlobalVirtualStore: opts.enableGlobalVirtualStore, force: opts.force, depsStateCache: opts.depsStateCache, ignoreScripts: opts.ignoreScripts, @@ -322,6 +324,7 @@ interface LinkNewPackagesOptions { allowBuild?: AllowBuild depsStateCache: DepsStateCache disableRelinkLocalDirDeps?: boolean + enableGlobalVirtualStore: boolean force: boolean optional: boolean ignoreScripts: boolean @@ -402,6 +405,7 @@ async function linkNewPackages ( depGraph, depsStateCache: opts.depsStateCache, disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps, + enableGlobalVirtualStore: opts.enableGlobalVirtualStore, force: 
opts.force, ignoreScripts: opts.ignoreScripts, lockfileDir: opts.lockfileDir, @@ -456,6 +460,7 @@ async function linkAllPkgs ( depGraph: DependenciesGraph depsStateCache: DepsStateCache disableRelinkLocalDirDeps?: boolean + enableGlobalVirtualStore: boolean force: boolean ignoreScripts: boolean lockfileDir: string @@ -480,6 +485,7 @@ async function linkAllPkgs ( disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps, filesResponse: files, force: opts.force, + safeToSkip: opts.enableGlobalVirtualStore, sideEffectsCacheKey, requiresBuild: depNode.patch != null || depNode.requiresBuild, }) diff --git a/pkg-manager/headless/src/index.ts b/pkg-manager/headless/src/index.ts index 1987d61baf..9163e648c5 100644 --- a/pkg-manager/headless/src/index.ts +++ b/pkg-manager/headless/src/index.ts @@ -433,6 +433,7 @@ export async function headlessInstall (opts: HeadlessOptions): Promise { await new Promise((resolve, reject) => { - const proc = spawnPnpm(args, opts) + const proc = crossSpawn.spawn(process.execPath, [pnpmBinLocation, ...args], { + env: { + ...createEnv(), + ...opts?.env, + } as NodeJS.ProcessEnv, + stdio: ['inherit', 'pipe', 'pipe'], + }) const timeout = opts?.timeout ?? 
DEFAULT_EXEC_PNPM_TIMEOUT const timeoutId = registerProcessTimeout(proc, timeout, reject) + const output: Buffer[] = [] + proc.stdout!.on('data', (chunk: Buffer) => { + output.push(chunk); process.stdout.write(chunk) + }) + proc.stderr!.on('data', (chunk: Buffer) => { + output.push(chunk); process.stderr.write(chunk) + }) + proc.on('error', reject) - proc.on('close', (code: number) => { + proc.on('close', (code: number | null, signal: string | null) => { clearTimeout(timeoutId) - if (code > 0) { - reject(new Error(`Exit code ${code}`)) + if (signal) { + reject(new Error(`Killed by signal ${signal}\n\n${Buffer.concat(output).toString()}`)) + } else if (code) { + reject(new Error(`Exit code ${code}\n\n${Buffer.concat(output).toString()}`)) } else { resolve() } diff --git a/store/cafs-types/src/index.ts b/store/cafs-types/src/index.ts index ab595e8d11..976412a992 100644 --- a/store/cafs-types/src/index.ts +++ b/store/cafs-types/src/index.ts @@ -39,6 +39,7 @@ export interface ImportPackageOpts { filesResponse: PackageFilesResponse force: boolean keepModulesDir?: boolean + safeToSkip?: boolean } export type ImportPackageFunction = ( diff --git a/store/create-cafs-store/src/index.ts b/store/create-cafs-store/src/index.ts index c1356333bc..27dfc1461d 100644 --- a/store/create-cafs-store/src/index.ts +++ b/store/create-cafs-store/src/index.ts @@ -44,6 +44,7 @@ export function createPackageImporterAsync ( resolvedFrom: opts.filesResponse.resolvedFrom, force: opts.force, keepModulesDir: Boolean(opts.keepModulesDir), + safeToSkip: opts.safeToSkip, }) return { importMethod, isBuilt } } @@ -74,6 +75,7 @@ function createPackageImporter ( resolvedFrom: opts.filesResponse.resolvedFrom, force: opts.force, keepModulesDir: Boolean(opts.keepModulesDir), + safeToSkip: opts.safeToSkip, }) return { importMethod, isBuilt } } diff --git a/store/store-controller-types/src/index.ts b/store/store-controller-types/src/index.ts index 4b75d761bc..44615ea1e8 100644 --- 
a/store/store-controller-types/src/index.ts +++ b/store/store-controller-types/src/index.ts @@ -184,6 +184,7 @@ export interface ImportOptions { force: boolean resolvedFrom: ResolvedFrom keepModulesDir?: boolean + safeToSkip?: boolean } export type ImportIndexedPackage = (to: string, opts: ImportOptions) => string | undefined diff --git a/worker/src/index.ts b/worker/src/index.ts index 3031d8f0aa..8593306476 100644 --- a/worker/src/index.ts +++ b/worker/src/index.ts @@ -222,7 +222,7 @@ export async function importPackage ( localWorker.once('message', ({ status, error, value }: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any workerPool!.checkinWorker(localWorker) if (status === 'error') { - reject(new PnpmError(error.code ?? 'LINKING_FAILED', error.message as string)) + reject(new PnpmError(error.code ?? 'LINKING_FAILED', `[importPackage ${opts.targetDir}] ${error.message as string}`)) return } resolve(value) @@ -247,7 +247,7 @@ export async function symlinkAllModules ( workerPool!.checkinWorker(localWorker) if (status === 'error') { const hint = opts.deps?.[0]?.modules != null ? createErrorHint(error, opts.deps[0].modules) : undefined - reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', error.message as string, { hint })) + reject(new PnpmError(error.code ?? 
'SYMLINK_FAILED', `[symlinkAllModules] ${error.message as string}`, { hint })) return } resolve(value) diff --git a/worker/src/start.ts b/worker/src/start.ts index 62074d5b71..3ef4c2e0fb 100644 --- a/worker/src/start.ts +++ b/worker/src/start.ts @@ -344,6 +344,7 @@ function importPackage ({ force, keepModulesDir, disableRelinkLocalDirDeps, + safeToSkip, }: LinkPkgMessage): ImportPackageResult { const cacheKey = JSON.stringify({ storeDir, packageImportMethod }) if (!cafsStoreCache.has(cacheKey)) { @@ -357,6 +358,7 @@ function importPackage ({ requiresBuild, sideEffectsCacheKey, keepModulesDir, + safeToSkip, }) return { status: 'success', value: { isBuilt, importMethod } } } diff --git a/worker/src/types.ts b/worker/src/types.ts index 4df618bbc6..24b4765573 100644 --- a/worker/src/types.ts +++ b/worker/src/types.ts @@ -33,6 +33,7 @@ export interface LinkPkgMessage { force: boolean keepModulesDir?: boolean disableRelinkLocalDirDeps?: boolean + safeToSkip?: boolean } export interface SymlinkAllModulesMessage {