fix(dlx): fix race conditions in parallel dlx calls sharing Global Virtual Store (#10939)

## Summary

Fixes intermittent failures in the `parallel dlx calls of the same package` test, especially on Windows CI. Multiple race conditions were discovered when concurrent `pnpm dlx` processes share the same Global Virtual Store (GVS):

- **Content-verified skip in GVS mode**: When `safeToSkip` is true and a rename fails because the target already exists (ENOTEMPTY/EEXIST/EPERM), verify all files match (inode or content comparison) before skipping. Falls through to `renameOverwriteSync` if content doesn't match.
- **Tolerate EPERM during bin creation on Windows**: `cmd-shim`'s `chmod` can fail with EPERM when another process holds the `.bin` file. Warn instead of crashing.
- **Handle EPERM in DLX cache symlink**: Added EPERM to the list of tolerated errors when creating the DLX cache symlink, as Windows can throw this when another process has the symlink open.

## Test plan

- [x] `parallel dlx calls of the same package` test passes on Windows CI
- [x] Full test suite passes on both Ubuntu and Windows
This commit is contained in:
Zoltan Kochan
2026-03-12 21:06:32 +01:00
committed by GitHub
parent 39afb24ce6
commit 62f760ec3d
15 changed files with 136 additions and 74 deletions

View File

@@ -0,0 +1,9 @@
---
"@pnpm/fs.indexed-pkg-importer": patch
"@pnpm/plugin-commands-script-runners": patch
"@pnpm/core": patch
"@pnpm/link-bins": patch
"pnpm": patch
---
Fixed intermittent failures when multiple `pnpm dlx` calls run concurrently for the same package. When the global virtual store is enabled, the importer now verifies file content before skipping a rename, avoiding destructive swap-renames that break concurrent processes. Also tolerates EPERM during bin creation on Windows and properly propagates `enableGlobalVirtualStore` through the install pipeline.

View File

@@ -162,12 +162,12 @@ export async function handler (
try {
await symlinkDir(cachedDir, cacheLink, { overwrite: true })
} catch (error) {
// EBUSY means that there is another dlx process running in parallel that has acquired the cache link first.
// Similarly, EEXIST means that another dlx process has created the cache link before this process.
// EBUSY/EEXIST/EPERM means that there is another dlx process running in parallel that has acquired the cache link first.
// EPERM can happen on Windows when another process has the symlink open while this process tries to unlink it.
// The link created by the other process is just as up-to-date as the link the current process was attempting
// to create. Therefore, instead of re-attempting to create the current link again, it is just as good to let
// the other link stay. The current process should yield.
if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST')) {
if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST' && error.code !== 'EPERM')) {
throw error
}
}

View File

@@ -2,7 +2,7 @@ import fs from 'fs'
import util from 'util'
import fsx from 'fs-extra'
import path from 'path'
import { globalWarn, logger } from '@pnpm/logger'
import { globalInfo, globalWarn, logger } from '@pnpm/logger'
import { rimrafSync } from '@zkochan/rimraf'
import { makeEmptyDirSync } from 'make-empty-dir'
import sanitizeFilename from 'sanitize-filename'
@@ -20,6 +20,7 @@ export function importIndexedDir (
filenames: Map<string, string>,
opts: {
keepModulesDir?: boolean
safeToSkip?: boolean
}
): void {
const stage = pathTemp(newDir)
@@ -61,38 +62,61 @@ They were renamed.`)
}
throw err
}
if (opts.safeToSkip) {
// Content-addressable target (e.g. global virtual store): if the target
// already exists and has all expected files, it has the correct content.
// Skip instead of doing a swap-rename that temporarily removes the target
// directory — which breaks junctions read by other processes.
try {
fs.renameSync(stage, newDir)
return
} catch (err: unknown) {
if (util.types.isNativeError(err) && 'code' in err && (err.code === 'ENOTEMPTY' || err.code === 'EEXIST' || err.code === 'EPERM')) {
if (allFilesMatch(newDir, filenames)) {
try {
rimrafSync(stage)
} catch {} // eslint-disable-line:no-empty
return
}
}
// Files missing or other error — fall through to renameOverwriteSync
}
}
try {
renameOverwriteSync(stage, newDir)
} catch (renameErr: unknown) {
// When enableGlobalVirtualStore is true, multiple worker threads may import
// the same package to the same global store location concurrently. Their
// rename operations can race. If the rename fails but the target already
// has the expected content, another thread completed the import.
try {
rimrafSync(stage)
} catch {} // eslint-disable-line:no-empty
if (util.types.isNativeError(renameErr) && 'code' in renameErr && (renameErr.code === 'ENOTEMPTY' || renameErr.code === 'EEXIST')) {
const firstFile = filenames.keys().next().value
if (firstFile) {
const targetFile = path.join(newDir, firstFile)
// Retry with short delays. With 3+ concurrent workers, a third thread
// may have rimrafed the target (inside its own renameOverwrite) but not
// yet completed its own rename. A short wait lets it finish.
for (let attempt = 0; attempt < 4; attempt++) {
if (attempt > 0) {
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, 50)
}
if (fs.existsSync(targetFile)) {
logger('_virtual-store-race').debug({ target: newDir })
return
}
}
}
}
throw renameErr
}
}
/**
 * Checks whether every expected file already present in `dir` is identical
 * to its source in the content-addressable store.
 *
 * A file counts as matching when it shares an inode with the source (a
 * hardlink import) or, failing that, when its bytes are equal (a copy
 * import). Any missing/unreadable file or mismatch is logged and makes the
 * whole check fail, signalling the caller to re-import the directory.
 */
function allFilesMatch (dir: string, filenames: Map<string, string>): boolean {
  for (const [relPath, sourcePath] of filenames) {
    const targetPath = path.join(dir, relPath)
    try {
      const targetStat = gfs.statSync(targetPath)
      const sourceStat = gfs.statSync(sourcePath)
      // Hardlinked files share an inode on the same device, so their
      // content is identical without reading a single byte.
      const isSameHardlink = targetStat.ino === sourceStat.ino && targetStat.dev === sourceStat.dev
      if (isSameHardlink) continue
      // Copied files: a cheap size comparison first, full content read only
      // when the sizes agree.
      if (targetStat.size !== sourceStat.size) {
        globalInfo(`Re-importing "${dir}" because file "${relPath}" has a different size`)
        return false
      }
      const contentsEqual = gfs.readFileSync(targetPath).equals(gfs.readFileSync(sourcePath))
      if (!contentsEqual) {
        globalInfo(`Re-importing "${dir}" because file "${relPath}" has different content`)
        return false
      }
    } catch {
      // stat or read failed — treat the file as absent and re-import.
      globalInfo(`Re-importing "${dir}" because file "${relPath}" is missing or unreadable`)
      return false
    }
  }
  return true
}
interface SanitizeFilenamesResult {
sanitizedFilenames: Map<string, string>
invalidFilenames: string[]

View File

@@ -3,10 +3,8 @@ import path from 'path'
import { jest } from '@jest/globals'
import { tempDir } from '@pnpm/prepare'
// renameOverwrite is mocked because its real implementation handles ENOTEMPTY
// internally (rimraf + retry). The race condition we're testing only surfaces
// when renameOverwrite exhausts its retries due to concurrent threads
// repeatedly recreating the target, which can't be reproduced deterministically.
// Mock renameOverwriteSync so we can verify it's called (or not called)
// and control its behavior in tests.
const renameOverwriteSyncMock = jest.fn()
jest.unstable_mockModule('rename-overwrite', () => ({
renameOverwrite: jest.fn(),
@@ -19,7 +17,31 @@ beforeEach(() => {
renameOverwriteSyncMock.mockReset()
})
test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY)', () => {
test('safeToSkip skips when target already exists (content-addressed)', () => {
  const tmp = tempDir()
  const sourceDir = path.join(tmp, 'src')
  const sourceFile = path.join(sourceDir, 'package.json')
  const destDir = path.join(tmp, 'dest')
  // Lay down the source file, standing in for the content-addressable store.
  fs.mkdirSync(sourceDir, { recursive: true })
  fs.writeFileSync(sourceFile, '{"name":"pkg","version":"1.0.0"}')
  // Pre-create the target as a hardlink to the same content, simulating a
  // concurrent process that already completed the import.
  fs.mkdirSync(destDir, { recursive: true })
  fs.linkSync(sourceFile, path.join(destDir, 'package.json'))
  // The destination path is content-addressed: its mere existence implies the
  // content is correct, so this must not throw.
  importIndexedDir(fs.copyFileSync, destDir, new Map([['package.json', sourceFile]]), { safeToSkip: true })
  expect(fs.existsSync(path.join(destDir, 'package.json'))).toBe(true)
  // Since the target already held matching content, no destructive
  // swap-rename may be attempted on any platform.
  expect(renameOverwriteSyncMock).not.toHaveBeenCalled()
})
test('safeToSkip creates dir when target does not exist', () => {
const tmp = tempDir()
const srcFile = path.join(tmp, 'src', 'index.js')
const newDir = path.join(tmp, 'dest')
@@ -28,41 +50,10 @@ test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
fs.writeFileSync(srcFile, 'content')
// Pre-create target with expected content (simulating another thread completed first)
fs.mkdirSync(newDir, { recursive: true })
fs.writeFileSync(path.join(newDir, 'index.js'), 'content')
const filenames = new Map([['index.js', srcFile]])
renameOverwriteSyncMock.mockImplementation(() => {
throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' })
})
// Should not throw — the target already has the expected content
importIndexedDir(fs.copyFileSync, newDir, filenames, {})
// Target doesn't exist — should create it
importIndexedDir(fs.copyFileSync, newDir, filenames, { safeToSkip: true })
expect(fs.existsSync(path.join(newDir, 'index.js'))).toBe(true)
})
test('importIndexedDir throws ENOTEMPTY when target does not have expected content', () => {
const tmp = tempDir()
const srcFile = path.join(tmp, 'src', 'index.js')
const newDir = path.join(tmp, 'dest')
// Create source file
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
fs.writeFileSync(srcFile, 'content')
// Target exists but does NOT have the expected file
fs.mkdirSync(newDir, { recursive: true })
const filenames = new Map([['index.js', srcFile]])
renameOverwriteSyncMock.mockImplementation(() => {
throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' })
})
expect(() => {
importIndexedDir(fs.copyFileSync, newDir, filenames, {})
}).toThrow('ENOTEMPTY')
})

View File

@@ -1311,6 +1311,7 @@ const _installInContext: InstallFunction = async (projects, ctx, opts) => {
dependenciesByProjectId,
depsStateCache,
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
extraNodePaths: ctx.extraNodePaths,
force: opts.force,
hoistedDependencies: ctx.hoistedDependencies,

View File

@@ -46,6 +46,7 @@ export interface LinkPackagesOptions {
disableRelinkLocalDirDeps?: boolean
force: boolean
depsStateCache: DepsStateCache
enableGlobalVirtualStore: boolean
extraNodePaths: string[]
hoistedDependencies: HoistedDependencies
hoistedModulesDir: string
@@ -146,6 +147,7 @@ export async function linkPackages (projects: ImporterToUpdate[], depGraph: Depe
{
allowBuild: opts.allowBuild,
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
force: opts.force,
depsStateCache: opts.depsStateCache,
ignoreScripts: opts.ignoreScripts,
@@ -317,6 +319,7 @@ interface LinkNewPackagesOptions {
allowBuild?: AllowBuild
depsStateCache: DepsStateCache
disableRelinkLocalDirDeps?: boolean
enableGlobalVirtualStore: boolean
force: boolean
optional: boolean
ignoreScripts: boolean
@@ -397,6 +400,7 @@ async function linkNewPackages (
depGraph,
depsStateCache: opts.depsStateCache,
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
force: opts.force,
ignoreScripts: opts.ignoreScripts,
lockfileDir: opts.lockfileDir,
@@ -451,6 +455,7 @@ async function linkAllPkgs (
depGraph: DependenciesGraph
depsStateCache: DepsStateCache
disableRelinkLocalDirDeps?: boolean
enableGlobalVirtualStore: boolean
force: boolean
ignoreScripts: boolean
lockfileDir: string
@@ -475,6 +480,7 @@ async function linkAllPkgs (
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
filesResponse: files,
force: opts.force,
safeToSkip: opts.enableGlobalVirtualStore,
sideEffectsCacheKey,
requiresBuild: depNode.patch != null || depNode.requiresBuild,
})

View File

@@ -919,6 +919,7 @@ async function linkAllPkgs (
force: depNode.forceImportPackage ?? opts.force,
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
requiresBuild: depNode.patch != null || depNode.requiresBuild,
safeToSkip: opts.enableGlobalVirtualStore,
sideEffectsCacheKey,
})
if (importMethod) {

View File

@@ -353,11 +353,18 @@ async function linkBin (cmd: CommandInfo, binsDir: string, opts?: LinkBinOptions
nodeExecPath: cmd.nodeExecPath,
})
} catch (err: any) { // eslint-disable-line
if (err.code !== 'ENOENT' && err.code !== 'EISDIR') {
throw err
if (err.code === 'ENOENT' || err.code === 'EISDIR') {
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
return
}
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
return
// On Windows, EPERM during bin creation can happen when another process
// (e.g. a parallel dlx call) is writing to the same shared bin directory.
// The other process will finish creating the bin, so we can safely skip.
if (IS_WINDOWS && err.code === 'EPERM') {
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
return
}
throw err
}
// ensure that bin are executable and not containing
// windows line-endings(CRLF) on the hashbang line

View File

@@ -25,18 +25,34 @@ export async function execPnpm (
}
): Promise<void> {
await new Promise<void>((resolve, reject) => {
const proc = spawnPnpm(args, opts)
const proc = crossSpawn.spawn(process.execPath, [pnpmBinLocation, ...args], {
env: {
...createEnv(opts),
...opts?.env,
} as NodeJS.ProcessEnv,
stdio: ['inherit', 'pipe', 'pipe'],
})
const timeout = opts?.timeout ?? DEFAULT_EXEC_PNPM_TIMEOUT
const timeoutId = registerProcessTimeout(proc, timeout, reject)
const output: Buffer[] = []
proc.stdout!.on('data', (chunk: Buffer) => {
output.push(chunk); process.stdout.write(chunk)
})
proc.stderr!.on('data', (chunk: Buffer) => {
output.push(chunk); process.stderr.write(chunk)
})
proc.on('error', reject)
proc.on('close', (code: number) => {
proc.on('close', (code: number | null, signal: string | null) => {
clearTimeout(timeoutId)
if (code > 0) {
reject(new Error(`Exit code ${code}`))
if (signal) {
reject(new Error(`Killed by signal ${signal}\n\n${Buffer.concat(output).toString()}`))
} else if (code) {
reject(new Error(`Exit code ${code}\n\n${Buffer.concat(output).toString()}`))
} else {
resolve()
}

View File

@@ -36,6 +36,7 @@ export interface ImportPackageOpts {
filesResponse: PackageFilesResponse
force: boolean
keepModulesDir?: boolean
safeToSkip?: boolean
}
export type ImportPackageFunction = (

View File

@@ -42,6 +42,7 @@ export function createPackageImporterAsync (
resolvedFrom: opts.filesResponse.resolvedFrom,
force: opts.force,
keepModulesDir: Boolean(opts.keepModulesDir),
safeToSkip: opts.safeToSkip,
})
return { importMethod, isBuilt }
}
@@ -72,6 +73,7 @@ function createPackageImporter (
resolvedFrom: opts.filesResponse.resolvedFrom,
force: opts.force,
keepModulesDir: Boolean(opts.keepModulesDir),
safeToSkip: opts.safeToSkip,
})
return { importMethod, isBuilt }
}

View File

@@ -167,6 +167,7 @@ export interface ImportOptions {
force: boolean
resolvedFrom: ResolvedFrom
keepModulesDir?: boolean
safeToSkip?: boolean
}
export type ImportIndexedPackage = (to: string, opts: ImportOptions) => string | undefined

View File

@@ -260,7 +260,7 @@ export async function importPackage (
localWorker.once('message', ({ status, error, value }: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any
workerPool!.checkinWorker(localWorker)
if (status === 'error') {
reject(new PnpmError(error.code ?? 'LINKING_FAILED', error.message as string))
reject(new PnpmError(error.code ?? 'LINKING_FAILED', `[importPackage ${opts.targetDir}] ${error.message as string}`))
return
}
resolve(value)
@@ -285,7 +285,7 @@ export async function symlinkAllModules (
workerPool!.checkinWorker(localWorker)
if (status === 'error') {
const hint = opts.deps?.[0]?.modules != null ? createErrorHint(error, opts.deps[0].modules) : undefined
reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', error.message as string, { hint }))
reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', `[symlinkAllModules] ${error.message as string}`, { hint }))
return
}
resolve(value)

View File

@@ -421,6 +421,7 @@ function importPackage ({
force,
keepModulesDir,
disableRelinkLocalDirDeps,
safeToSkip,
}: LinkPkgMessage): ImportPackageResult {
const cacheKey = JSON.stringify({ storeDir, packageImportMethod })
if (!cafsStoreCache.has(cacheKey)) {
@@ -434,6 +435,7 @@ function importPackage ({
requiresBuild,
sideEffectsCacheKey,
keepModulesDir,
safeToSkip,
})
return { status: 'success', value: { isBuilt, importMethod } }
}

View File

@@ -33,6 +33,7 @@ export interface LinkPkgMessage {
force: boolean
keepModulesDir?: boolean
disableRelinkLocalDirDeps?: boolean
safeToSkip?: boolean
}
export interface SymlinkAllModulesMessage {