mirror of
https://github.com/pnpm/pnpm.git
synced 2026-04-27 18:46:18 -04:00
fix(dlx): fix race conditions in parallel dlx calls sharing Global Virtual Store (#10939)
## Summary Fixes intermittent failures in the `parallel dlx calls of the same package` test, especially on Windows CI. Multiple race conditions were discovered when concurrent `pnpm dlx` processes share the same Global Virtual Store (GVS): - **Content-verified skip in GVS mode**: When `safeToSkip` is true and a rename fails because the target already exists (ENOTEMPTY/EEXIST/EPERM), verify all files match (inode or content comparison) before skipping. Falls through to `renameOverwriteSync` if content doesn't match. - **Tolerate EPERM during bin creation on Windows**: `cmd-shim`'s `chmod` can fail with EPERM when another process holds the `.bin` file. Warn instead of crashing. - **Handle EPERM in DLX cache symlink**: Added EPERM to the list of tolerated errors when creating the DLX cache symlink, as Windows can throw this when another process has the symlink open. ## Test plan - [x] `parallel dlx calls of the same package` test passes on Windows CI - [x] Full test suite passes on both Ubuntu and Windows
This commit is contained in:
9
.changeset/fix-dlx-concurrent-install-race.md
Normal file
9
.changeset/fix-dlx-concurrent-install-race.md
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
"@pnpm/fs.indexed-pkg-importer": patch
|
||||
"@pnpm/plugin-commands-script-runners": patch
|
||||
"@pnpm/core": patch
|
||||
"@pnpm/link-bins": patch
|
||||
"pnpm": patch
|
||||
---
|
||||
|
||||
Fixed intermittent failures when multiple `pnpm dlx` calls run concurrently for the same package. When the global virtual store is enabled, the importer now verifies file content before skipping a rename, avoiding destructive swap-renames that break concurrent processes. Also tolerates EPERM during bin creation on Windows and properly propagates `enableGlobalVirtualStore` through the install pipeline.
|
||||
@@ -162,12 +162,12 @@ export async function handler (
|
||||
try {
|
||||
await symlinkDir(cachedDir, cacheLink, { overwrite: true })
|
||||
} catch (error) {
|
||||
// EBUSY means that there is another dlx process running in parallel that has acquired the cache link first.
|
||||
// Similarly, EEXIST means that another dlx process has created the cache link before this process.
|
||||
// EBUSY/EEXIST/EPERM means that there is another dlx process running in parallel that has acquired the cache link first.
|
||||
// EPERM can happen on Windows when another process has the symlink open while this process tries to unlink it.
|
||||
// The link created by the other process is just as up-to-date as the link the current process was attempting
|
||||
// to create. Therefore, instead of re-attempting to create the current link again, it is just as good to let
|
||||
// the other link stay. The current process should yield.
|
||||
if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST')) {
|
||||
if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST' && error.code !== 'EPERM')) {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import fs from 'fs'
|
||||
import util from 'util'
|
||||
import fsx from 'fs-extra'
|
||||
import path from 'path'
|
||||
import { globalWarn, logger } from '@pnpm/logger'
|
||||
import { globalInfo, globalWarn, logger } from '@pnpm/logger'
|
||||
import { rimrafSync } from '@zkochan/rimraf'
|
||||
import { makeEmptyDirSync } from 'make-empty-dir'
|
||||
import sanitizeFilename from 'sanitize-filename'
|
||||
@@ -20,6 +20,7 @@ export function importIndexedDir (
|
||||
filenames: Map<string, string>,
|
||||
opts: {
|
||||
keepModulesDir?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
): void {
|
||||
const stage = pathTemp(newDir)
|
||||
@@ -61,38 +62,61 @@ They were renamed.`)
|
||||
}
|
||||
throw err
|
||||
}
|
||||
if (opts.safeToSkip) {
|
||||
// Content-addressable target (e.g. global virtual store): if the target
|
||||
// already exists and has all expected files, it has the correct content.
|
||||
// Skip instead of doing a swap-rename that temporarily removes the target
|
||||
// directory — which breaks junctions read by other processes.
|
||||
try {
|
||||
fs.renameSync(stage, newDir)
|
||||
return
|
||||
} catch (err: unknown) {
|
||||
if (util.types.isNativeError(err) && 'code' in err && (err.code === 'ENOTEMPTY' || err.code === 'EEXIST' || err.code === 'EPERM')) {
|
||||
if (allFilesMatch(newDir, filenames)) {
|
||||
try {
|
||||
rimrafSync(stage)
|
||||
} catch {} // eslint-disable-line:no-empty
|
||||
return
|
||||
}
|
||||
}
|
||||
// Files missing or other error — fall through to renameOverwriteSync
|
||||
}
|
||||
}
|
||||
try {
|
||||
renameOverwriteSync(stage, newDir)
|
||||
} catch (renameErr: unknown) {
|
||||
// When enableGlobalVirtualStore is true, multiple worker threads may import
|
||||
// the same package to the same global store location concurrently. Their
|
||||
// rename operations can race. If the rename fails but the target already
|
||||
// has the expected content, another thread completed the import.
|
||||
try {
|
||||
rimrafSync(stage)
|
||||
} catch {} // eslint-disable-line:no-empty
|
||||
if (util.types.isNativeError(renameErr) && 'code' in renameErr && (renameErr.code === 'ENOTEMPTY' || renameErr.code === 'EEXIST')) {
|
||||
const firstFile = filenames.keys().next().value
|
||||
if (firstFile) {
|
||||
const targetFile = path.join(newDir, firstFile)
|
||||
// Retry with short delays. With 3+ concurrent workers, a third thread
|
||||
// may have rimrafed the target (inside its own renameOverwrite) but not
|
||||
// yet completed its own rename. A short wait lets it finish.
|
||||
for (let attempt = 0; attempt < 4; attempt++) {
|
||||
if (attempt > 0) {
|
||||
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, 50)
|
||||
}
|
||||
if (fs.existsSync(targetFile)) {
|
||||
logger('_virtual-store-race').debug({ target: newDir })
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
throw renameErr
|
||||
}
|
||||
}
|
||||
|
||||
function allFilesMatch (dir: string, filenames: Map<string, string>): boolean {
|
||||
for (const [f, src] of filenames) {
|
||||
const target = path.join(dir, f)
|
||||
try {
|
||||
const targetStat = gfs.statSync(target)
|
||||
const srcStat = gfs.statSync(src)
|
||||
// Fast path: hardlinks share the same inode
|
||||
if (targetStat.ino === srcStat.ino && targetStat.dev === srcStat.dev) continue
|
||||
// Copy path: compare size first, then content
|
||||
if (targetStat.size !== srcStat.size) {
|
||||
globalInfo(`Re-importing "${dir}" because file "${f}" has a different size`)
|
||||
return false
|
||||
}
|
||||
if (!gfs.readFileSync(target).equals(gfs.readFileSync(src))) {
|
||||
globalInfo(`Re-importing "${dir}" because file "${f}" has different content`)
|
||||
return false
|
||||
}
|
||||
} catch {
|
||||
globalInfo(`Re-importing "${dir}" because file "${f}" is missing or unreadable`)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
interface SanitizeFilenamesResult {
|
||||
sanitizedFilenames: Map<string, string>
|
||||
invalidFilenames: string[]
|
||||
|
||||
@@ -3,10 +3,8 @@ import path from 'path'
|
||||
import { jest } from '@jest/globals'
|
||||
import { tempDir } from '@pnpm/prepare'
|
||||
|
||||
// renameOverwrite is mocked because its real implementation handles ENOTEMPTY
|
||||
// internally (rimraf + retry). The race condition we're testing only surfaces
|
||||
// when renameOverwrite exhausts its retries due to concurrent threads
|
||||
// repeatedly recreating the target, which can't be reproduced deterministically.
|
||||
// Mock renameOverwriteSync so we can verify it's called (or not called)
|
||||
// and control its behavior in tests.
|
||||
const renameOverwriteSyncMock = jest.fn()
|
||||
jest.unstable_mockModule('rename-overwrite', () => ({
|
||||
renameOverwrite: jest.fn(),
|
||||
@@ -19,7 +17,31 @@ beforeEach(() => {
|
||||
renameOverwriteSyncMock.mockReset()
|
||||
})
|
||||
|
||||
test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY)', () => {
|
||||
test('safeToSkip skips when target already exists (content-addressed)', () => {
|
||||
const tmp = tempDir()
|
||||
const srcFile = path.join(tmp, 'src', 'package.json')
|
||||
const newDir = path.join(tmp, 'dest')
|
||||
|
||||
// Create source file in CAS
|
||||
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
|
||||
fs.writeFileSync(srcFile, '{"name":"pkg","version":"1.0.0"}')
|
||||
|
||||
// Pre-create target (simulates another process that already placed it)
|
||||
fs.mkdirSync(newDir, { recursive: true })
|
||||
fs.linkSync(srcFile, path.join(newDir, 'package.json'))
|
||||
|
||||
const filenames = new Map([['package.json', srcFile]])
|
||||
|
||||
// Should not throw — target exists, path is content-addressed so content is correct
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, { safeToSkip: true })
|
||||
|
||||
expect(fs.existsSync(path.join(newDir, 'package.json'))).toBe(true)
|
||||
// When safeToSkip is true and the target already exists with matching content,
|
||||
// renameOverwriteSync should not be called on any platform.
|
||||
expect(renameOverwriteSyncMock).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
test('safeToSkip creates dir when target does not exist', () => {
|
||||
const tmp = tempDir()
|
||||
const srcFile = path.join(tmp, 'src', 'index.js')
|
||||
const newDir = path.join(tmp, 'dest')
|
||||
@@ -28,41 +50,10 @@ test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY
|
||||
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
|
||||
fs.writeFileSync(srcFile, 'content')
|
||||
|
||||
// Pre-create target with expected content (simulating another thread completed first)
|
||||
fs.mkdirSync(newDir, { recursive: true })
|
||||
fs.writeFileSync(path.join(newDir, 'index.js'), 'content')
|
||||
|
||||
const filenames = new Map([['index.js', srcFile]])
|
||||
|
||||
renameOverwriteSyncMock.mockImplementation(() => {
|
||||
throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' })
|
||||
})
|
||||
|
||||
// Should not throw — the target already has the expected content
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, {})
|
||||
// Target doesn't exist — should create it
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, { safeToSkip: true })
|
||||
|
||||
expect(fs.existsSync(path.join(newDir, 'index.js'))).toBe(true)
|
||||
})
|
||||
|
||||
test('importIndexedDir throws ENOTEMPTY when target does not have expected content', () => {
|
||||
const tmp = tempDir()
|
||||
const srcFile = path.join(tmp, 'src', 'index.js')
|
||||
const newDir = path.join(tmp, 'dest')
|
||||
|
||||
// Create source file
|
||||
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
|
||||
fs.writeFileSync(srcFile, 'content')
|
||||
|
||||
// Target exists but does NOT have the expected file
|
||||
fs.mkdirSync(newDir, { recursive: true })
|
||||
|
||||
const filenames = new Map([['index.js', srcFile]])
|
||||
|
||||
renameOverwriteSyncMock.mockImplementation(() => {
|
||||
throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' })
|
||||
})
|
||||
|
||||
expect(() => {
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, {})
|
||||
}).toThrow('ENOTEMPTY')
|
||||
})
|
||||
|
||||
@@ -1311,6 +1311,7 @@ const _installInContext: InstallFunction = async (projects, ctx, opts) => {
|
||||
dependenciesByProjectId,
|
||||
depsStateCache,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
extraNodePaths: ctx.extraNodePaths,
|
||||
force: opts.force,
|
||||
hoistedDependencies: ctx.hoistedDependencies,
|
||||
|
||||
@@ -46,6 +46,7 @@ export interface LinkPackagesOptions {
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
force: boolean
|
||||
depsStateCache: DepsStateCache
|
||||
enableGlobalVirtualStore: boolean
|
||||
extraNodePaths: string[]
|
||||
hoistedDependencies: HoistedDependencies
|
||||
hoistedModulesDir: string
|
||||
@@ -146,6 +147,7 @@ export async function linkPackages (projects: ImporterToUpdate[], depGraph: Depe
|
||||
{
|
||||
allowBuild: opts.allowBuild,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
force: opts.force,
|
||||
depsStateCache: opts.depsStateCache,
|
||||
ignoreScripts: opts.ignoreScripts,
|
||||
@@ -317,6 +319,7 @@ interface LinkNewPackagesOptions {
|
||||
allowBuild?: AllowBuild
|
||||
depsStateCache: DepsStateCache
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
enableGlobalVirtualStore: boolean
|
||||
force: boolean
|
||||
optional: boolean
|
||||
ignoreScripts: boolean
|
||||
@@ -397,6 +400,7 @@ async function linkNewPackages (
|
||||
depGraph,
|
||||
depsStateCache: opts.depsStateCache,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
force: opts.force,
|
||||
ignoreScripts: opts.ignoreScripts,
|
||||
lockfileDir: opts.lockfileDir,
|
||||
@@ -451,6 +455,7 @@ async function linkAllPkgs (
|
||||
depGraph: DependenciesGraph
|
||||
depsStateCache: DepsStateCache
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
enableGlobalVirtualStore: boolean
|
||||
force: boolean
|
||||
ignoreScripts: boolean
|
||||
lockfileDir: string
|
||||
@@ -475,6 +480,7 @@ async function linkAllPkgs (
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
filesResponse: files,
|
||||
force: opts.force,
|
||||
safeToSkip: opts.enableGlobalVirtualStore,
|
||||
sideEffectsCacheKey,
|
||||
requiresBuild: depNode.patch != null || depNode.requiresBuild,
|
||||
})
|
||||
|
||||
@@ -919,6 +919,7 @@ async function linkAllPkgs (
|
||||
force: depNode.forceImportPackage ?? opts.force,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
requiresBuild: depNode.patch != null || depNode.requiresBuild,
|
||||
safeToSkip: opts.enableGlobalVirtualStore,
|
||||
sideEffectsCacheKey,
|
||||
})
|
||||
if (importMethod) {
|
||||
|
||||
@@ -353,11 +353,18 @@ async function linkBin (cmd: CommandInfo, binsDir: string, opts?: LinkBinOptions
|
||||
nodeExecPath: cmd.nodeExecPath,
|
||||
})
|
||||
} catch (err: any) { // eslint-disable-line
|
||||
if (err.code !== 'ENOENT' && err.code !== 'EISDIR') {
|
||||
throw err
|
||||
if (err.code === 'ENOENT' || err.code === 'EISDIR') {
|
||||
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
|
||||
return
|
||||
}
|
||||
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
|
||||
return
|
||||
// On Windows, EPERM during bin creation can happen when another process
|
||||
// (e.g. a parallel dlx call) is writing to the same shared bin directory.
|
||||
// The other process will finish creating the bin, so we can safely skip.
|
||||
if (IS_WINDOWS && err.code === 'EPERM') {
|
||||
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
|
||||
return
|
||||
}
|
||||
throw err
|
||||
}
|
||||
// ensure that bin are executable and not containing
|
||||
// windows line-endings(CRLF) on the hashbang line
|
||||
|
||||
@@ -25,18 +25,34 @@ export async function execPnpm (
|
||||
}
|
||||
): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const proc = spawnPnpm(args, opts)
|
||||
const proc = crossSpawn.spawn(process.execPath, [pnpmBinLocation, ...args], {
|
||||
env: {
|
||||
...createEnv(opts),
|
||||
...opts?.env,
|
||||
} as NodeJS.ProcessEnv,
|
||||
stdio: ['inherit', 'pipe', 'pipe'],
|
||||
})
|
||||
|
||||
const timeout = opts?.timeout ?? DEFAULT_EXEC_PNPM_TIMEOUT
|
||||
const timeoutId = registerProcessTimeout(proc, timeout, reject)
|
||||
|
||||
const output: Buffer[] = []
|
||||
proc.stdout!.on('data', (chunk: Buffer) => {
|
||||
output.push(chunk); process.stdout.write(chunk)
|
||||
})
|
||||
proc.stderr!.on('data', (chunk: Buffer) => {
|
||||
output.push(chunk); process.stderr.write(chunk)
|
||||
})
|
||||
|
||||
proc.on('error', reject)
|
||||
|
||||
proc.on('close', (code: number) => {
|
||||
proc.on('close', (code: number | null, signal: string | null) => {
|
||||
clearTimeout(timeoutId)
|
||||
|
||||
if (code > 0) {
|
||||
reject(new Error(`Exit code ${code}`))
|
||||
if (signal) {
|
||||
reject(new Error(`Killed by signal ${signal}\n\n${Buffer.concat(output).toString()}`))
|
||||
} else if (code) {
|
||||
reject(new Error(`Exit code ${code}\n\n${Buffer.concat(output).toString()}`))
|
||||
} else {
|
||||
resolve()
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ export interface ImportPackageOpts {
|
||||
filesResponse: PackageFilesResponse
|
||||
force: boolean
|
||||
keepModulesDir?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
|
||||
export type ImportPackageFunction = (
|
||||
|
||||
@@ -42,6 +42,7 @@ export function createPackageImporterAsync (
|
||||
resolvedFrom: opts.filesResponse.resolvedFrom,
|
||||
force: opts.force,
|
||||
keepModulesDir: Boolean(opts.keepModulesDir),
|
||||
safeToSkip: opts.safeToSkip,
|
||||
})
|
||||
return { importMethod, isBuilt }
|
||||
}
|
||||
@@ -72,6 +73,7 @@ function createPackageImporter (
|
||||
resolvedFrom: opts.filesResponse.resolvedFrom,
|
||||
force: opts.force,
|
||||
keepModulesDir: Boolean(opts.keepModulesDir),
|
||||
safeToSkip: opts.safeToSkip,
|
||||
})
|
||||
return { importMethod, isBuilt }
|
||||
}
|
||||
|
||||
@@ -167,6 +167,7 @@ export interface ImportOptions {
|
||||
force: boolean
|
||||
resolvedFrom: ResolvedFrom
|
||||
keepModulesDir?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
|
||||
export type ImportIndexedPackage = (to: string, opts: ImportOptions) => string | undefined
|
||||
|
||||
@@ -260,7 +260,7 @@ export async function importPackage (
|
||||
localWorker.once('message', ({ status, error, value }: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any
|
||||
workerPool!.checkinWorker(localWorker)
|
||||
if (status === 'error') {
|
||||
reject(new PnpmError(error.code ?? 'LINKING_FAILED', error.message as string))
|
||||
reject(new PnpmError(error.code ?? 'LINKING_FAILED', `[importPackage ${opts.targetDir}] ${error.message as string}`))
|
||||
return
|
||||
}
|
||||
resolve(value)
|
||||
@@ -285,7 +285,7 @@ export async function symlinkAllModules (
|
||||
workerPool!.checkinWorker(localWorker)
|
||||
if (status === 'error') {
|
||||
const hint = opts.deps?.[0]?.modules != null ? createErrorHint(error, opts.deps[0].modules) : undefined
|
||||
reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', error.message as string, { hint }))
|
||||
reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', `[symlinkAllModules] ${error.message as string}`, { hint }))
|
||||
return
|
||||
}
|
||||
resolve(value)
|
||||
|
||||
@@ -421,6 +421,7 @@ function importPackage ({
|
||||
force,
|
||||
keepModulesDir,
|
||||
disableRelinkLocalDirDeps,
|
||||
safeToSkip,
|
||||
}: LinkPkgMessage): ImportPackageResult {
|
||||
const cacheKey = JSON.stringify({ storeDir, packageImportMethod })
|
||||
if (!cafsStoreCache.has(cacheKey)) {
|
||||
@@ -434,6 +435,7 @@ function importPackage ({
|
||||
requiresBuild,
|
||||
sideEffectsCacheKey,
|
||||
keepModulesDir,
|
||||
safeToSkip,
|
||||
})
|
||||
return { status: 'success', value: { isBuilt, importMethod } }
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ export interface LinkPkgMessage {
|
||||
force: boolean
|
||||
keepModulesDir?: boolean
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
|
||||
export interface SymlinkAllModulesMessage {
|
||||
|
||||
Reference in New Issue
Block a user