mirror of
https://github.com/pnpm/pnpm.git
synced 2026-06-02 21:15:27 -04:00
fix(dlx): fix race conditions in parallel dlx calls sharing
Global Virtual Store (#10939)
Content-verified skip in GVS mode, tolerate EPERM during
bin creation on Windows, handle EPERM in DLX cache symlink.
(cherry picked from commit 62f760ec3d)
This commit is contained in:
9
.changeset/fix-dlx-concurrent-install-race.md
Normal file
9
.changeset/fix-dlx-concurrent-install-race.md
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
"@pnpm/fs.indexed-pkg-importer": patch
|
||||
"@pnpm/plugin-commands-script-runners": patch
|
||||
"@pnpm/core": patch
|
||||
"@pnpm/link-bins": patch
|
||||
"pnpm": patch
|
||||
---
|
||||
|
||||
Fixed intermittent failures when multiple `pnpm dlx` calls run concurrently for the same package. When the global virtual store is enabled, the importer now verifies file content before skipping a rename, avoiding destructive swap-renames that break concurrent processes. Also tolerates EPERM during bin creation on Windows and properly propagates `enableGlobalVirtualStore` through the install pipeline.
|
||||
@@ -162,12 +162,12 @@ export async function handler (
|
||||
try {
|
||||
await symlinkDir(cachedDir, cacheLink, { overwrite: true })
|
||||
} catch (error) {
|
||||
// EBUSY means that there is another dlx process running in parallel that has acquired the cache link first.
|
||||
// Similarly, EEXIST means that another dlx process has created the cache link before this process.
|
||||
// EBUSY/EEXIST/EPERM means that there is another dlx process running in parallel that has acquired the cache link first.
|
||||
// EPERM can happen on Windows when another process has the symlink open while this process tries to unlink it.
|
||||
// The link created by the other process is just as up-to-date as the link the current process was attempting
|
||||
// to create. Therefore, instead of re-attempting to create the current link again, it is just as good to let
|
||||
// the other link stay. The current process should yield.
|
||||
if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST')) {
|
||||
if (!util.types.isNativeError(error) || !('code' in error) || (error.code !== 'EBUSY' && error.code !== 'EEXIST' && error.code !== 'EPERM')) {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import fs from 'fs'
|
||||
import util from 'util'
|
||||
import { copySync } from 'fs-extra'
|
||||
import path from 'path'
|
||||
import { globalWarn, logger } from '@pnpm/logger'
|
||||
import { globalInfo, globalWarn, logger } from '@pnpm/logger'
|
||||
import { sync as rimraf } from '@zkochan/rimraf'
|
||||
import { sync as makeEmptyDir } from 'make-empty-dir'
|
||||
import sanitizeFilename from 'sanitize-filename'
|
||||
@@ -20,6 +20,7 @@ export function importIndexedDir (
|
||||
filenames: Record<string, string>,
|
||||
opts: {
|
||||
keepModulesDir?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
): void {
|
||||
const stage = pathTemp(newDir)
|
||||
@@ -61,27 +62,61 @@ They were renamed.`)
|
||||
}
|
||||
throw err
|
||||
}
|
||||
if (opts.safeToSkip) {
|
||||
// Content-addressable target (e.g. global virtual store): if the target
|
||||
// already exists and has all expected files, it has the correct content.
|
||||
// Skip instead of doing a swap-rename that temporarily removes the target
|
||||
// directory — which breaks junctions read by other processes.
|
||||
try {
|
||||
fs.renameSync(stage, newDir)
|
||||
return
|
||||
} catch (err: unknown) {
|
||||
if (util.types.isNativeError(err) && 'code' in err && (err.code === 'ENOTEMPTY' || err.code === 'EEXIST' || err.code === 'EPERM')) {
|
||||
if (allFilesMatch(newDir, filenames)) {
|
||||
try {
|
||||
rimraf(stage)
|
||||
} catch {} // eslint-disable-line:no-empty
|
||||
return
|
||||
}
|
||||
}
|
||||
// Files missing or other error — fall through to renameOverwriteSync
|
||||
}
|
||||
}
|
||||
try {
|
||||
renameOverwrite.sync(stage, newDir)
|
||||
} catch (renameErr: unknown) {
|
||||
// When enableGlobalVirtualStore is true, multiple worker threads may import
|
||||
// the same package to the same global store location concurrently. Their
|
||||
// rename operations can race. If the rename fails but the target already
|
||||
// has the expected content, another thread completed the import.
|
||||
try {
|
||||
rimraf(stage)
|
||||
} catch {} // eslint-disable-line:no-empty
|
||||
if (util.types.isNativeError(renameErr) && 'code' in renameErr && (renameErr.code === 'ENOTEMPTY' || renameErr.code === 'EEXIST')) {
|
||||
const firstFile = Object.keys(filenames)[0]
|
||||
if (firstFile && fs.existsSync(path.join(newDir, firstFile))) {
|
||||
logger('_virtual-store-race').debug({ target: newDir })
|
||||
return
|
||||
}
|
||||
}
|
||||
throw renameErr
|
||||
}
|
||||
}
|
||||
|
||||
function allFilesMatch (dir: string, filenames: Record<string, string>): boolean {
|
||||
for (const [f, src] of Object.entries(filenames)) {
|
||||
const target = path.join(dir, f)
|
||||
try {
|
||||
const targetStat = gfs.statSync(target)
|
||||
const srcStat = gfs.statSync(src)
|
||||
// Fast path: hardlinks share the same inode
|
||||
if (targetStat.ino === srcStat.ino && targetStat.dev === srcStat.dev) continue
|
||||
// Copy path: compare size first, then content
|
||||
if (targetStat.size !== srcStat.size) {
|
||||
globalInfo(`Re-importing "${dir}" because file "${f}" has a different size`)
|
||||
return false
|
||||
}
|
||||
if (!gfs.readFileSync(target).equals(gfs.readFileSync(src))) {
|
||||
globalInfo(`Re-importing "${dir}" because file "${f}" has different content`)
|
||||
return false
|
||||
}
|
||||
} catch {
|
||||
globalInfo(`Re-importing "${dir}" because file "${f}" is missing or unreadable`)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
interface SanitizeFilenamesResult {
|
||||
sanitizedFilenames: Record<string, string>
|
||||
invalidFilenames: string[]
|
||||
|
||||
@@ -3,10 +3,8 @@ import path from 'path'
|
||||
import { jest } from '@jest/globals'
|
||||
import { tempDir } from '@pnpm/prepare'
|
||||
|
||||
// renameOverwrite is mocked because its real implementation handles ENOTEMPTY
|
||||
// internally (rimraf + retry). The race condition we're testing only surfaces
|
||||
// when renameOverwrite exhausts its retries due to concurrent threads
|
||||
// repeatedly recreating the target, which can't be reproduced deterministically.
|
||||
// renameOverwrite is mocked so we can verify it's called (or not called)
|
||||
// and control its behavior in tests.
|
||||
jest.mock('rename-overwrite', () => {
|
||||
const fn = Object.assign(jest.fn(), { sync: jest.fn() })
|
||||
return fn
|
||||
@@ -21,7 +19,31 @@ beforeEach(() => {
|
||||
renameOverwrite.sync.mockReset()
|
||||
})
|
||||
|
||||
test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY)', () => {
|
||||
test('safeToSkip skips when target already exists (content-addressed)', () => {
|
||||
const tmp = tempDir()
|
||||
const srcFile = path.join(tmp, 'src', 'package.json')
|
||||
const newDir = path.join(tmp, 'dest')
|
||||
|
||||
// Create source file in CAS
|
||||
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
|
||||
fs.writeFileSync(srcFile, '{"name":"pkg","version":"1.0.0"}')
|
||||
|
||||
// Pre-create target (simulates another process that already placed it)
|
||||
fs.mkdirSync(newDir, { recursive: true })
|
||||
fs.linkSync(srcFile, path.join(newDir, 'package.json'))
|
||||
|
||||
const filenames: Record<string, string> = { 'package.json': srcFile }
|
||||
|
||||
// Should not throw — target exists, path is content-addressed so content is correct
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, { safeToSkip: true })
|
||||
|
||||
expect(fs.existsSync(path.join(newDir, 'package.json'))).toBe(true)
|
||||
// When safeToSkip is true and the target already exists with matching content,
|
||||
// renameOverwrite.sync should not be called on any platform.
|
||||
expect(renameOverwrite.sync).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
test('safeToSkip creates dir when target does not exist', () => {
|
||||
const tmp = tempDir()
|
||||
const srcFile = path.join(tmp, 'src', 'index.js')
|
||||
const newDir = path.join(tmp, 'dest')
|
||||
@@ -30,41 +52,10 @@ test('importIndexedDir succeeds when rename races with another thread (ENOTEMPTY
|
||||
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
|
||||
fs.writeFileSync(srcFile, 'content')
|
||||
|
||||
// Pre-create target with expected content (simulating another thread completed first)
|
||||
fs.mkdirSync(newDir, { recursive: true })
|
||||
fs.writeFileSync(path.join(newDir, 'index.js'), 'content')
|
||||
|
||||
const filenames: Record<string, string> = { 'index.js': srcFile }
|
||||
|
||||
renameOverwrite.sync.mockImplementation(() => {
|
||||
throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' })
|
||||
})
|
||||
|
||||
// Should not throw — the target already has the expected content
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, {})
|
||||
// Target doesn't exist — should create it
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, { safeToSkip: true })
|
||||
|
||||
expect(fs.existsSync(path.join(newDir, 'index.js'))).toBe(true)
|
||||
})
|
||||
|
||||
test('importIndexedDir throws ENOTEMPTY when target does not have expected content', () => {
|
||||
const tmp = tempDir()
|
||||
const srcFile = path.join(tmp, 'src', 'index.js')
|
||||
const newDir = path.join(tmp, 'dest')
|
||||
|
||||
// Create source file
|
||||
fs.mkdirSync(path.join(tmp, 'src'), { recursive: true })
|
||||
fs.writeFileSync(srcFile, 'content')
|
||||
|
||||
// Target exists but does NOT have the expected file
|
||||
fs.mkdirSync(newDir, { recursive: true })
|
||||
|
||||
const filenames: Record<string, string> = { 'index.js': srcFile }
|
||||
|
||||
renameOverwrite.sync.mockImplementation(() => {
|
||||
throw Object.assign(new Error('ENOTEMPTY: directory not empty'), { code: 'ENOTEMPTY' })
|
||||
})
|
||||
|
||||
expect(() => {
|
||||
importIndexedDir(fs.copyFileSync, newDir, filenames, {})
|
||||
}).toThrow('ENOTEMPTY')
|
||||
})
|
||||
|
||||
@@ -1294,6 +1294,7 @@ const _installInContext: InstallFunction = async (projects, ctx, opts) => {
|
||||
dependenciesByProjectId,
|
||||
depsStateCache,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
extraNodePaths: ctx.extraNodePaths,
|
||||
force: opts.force,
|
||||
hoistedDependencies: ctx.hoistedDependencies,
|
||||
|
||||
@@ -51,6 +51,7 @@ export interface LinkPackagesOptions {
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
force: boolean
|
||||
depsStateCache: DepsStateCache
|
||||
enableGlobalVirtualStore: boolean
|
||||
extraNodePaths: string[]
|
||||
hoistedDependencies: HoistedDependencies
|
||||
hoistedModulesDir: string
|
||||
@@ -151,6 +152,7 @@ export async function linkPackages (projects: ImporterToUpdate[], depGraph: Depe
|
||||
{
|
||||
allowBuild: opts.allowBuild,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
force: opts.force,
|
||||
depsStateCache: opts.depsStateCache,
|
||||
ignoreScripts: opts.ignoreScripts,
|
||||
@@ -322,6 +324,7 @@ interface LinkNewPackagesOptions {
|
||||
allowBuild?: AllowBuild
|
||||
depsStateCache: DepsStateCache
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
enableGlobalVirtualStore: boolean
|
||||
force: boolean
|
||||
optional: boolean
|
||||
ignoreScripts: boolean
|
||||
@@ -402,6 +405,7 @@ async function linkNewPackages (
|
||||
depGraph,
|
||||
depsStateCache: opts.depsStateCache,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
force: opts.force,
|
||||
ignoreScripts: opts.ignoreScripts,
|
||||
lockfileDir: opts.lockfileDir,
|
||||
@@ -456,6 +460,7 @@ async function linkAllPkgs (
|
||||
depGraph: DependenciesGraph
|
||||
depsStateCache: DepsStateCache
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
enableGlobalVirtualStore: boolean
|
||||
force: boolean
|
||||
ignoreScripts: boolean
|
||||
lockfileDir: string
|
||||
@@ -480,6 +485,7 @@ async function linkAllPkgs (
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
filesResponse: files,
|
||||
force: opts.force,
|
||||
safeToSkip: opts.enableGlobalVirtualStore,
|
||||
sideEffectsCacheKey,
|
||||
requiresBuild: depNode.patch != null || depNode.requiresBuild,
|
||||
})
|
||||
|
||||
@@ -433,6 +433,7 @@ export async function headlessInstall (opts: HeadlessOptions): Promise<Installat
|
||||
allowBuild,
|
||||
force: opts.force,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
enableGlobalVirtualStore: opts.enableGlobalVirtualStore,
|
||||
depGraph: graph,
|
||||
depsStateCache,
|
||||
ignoreScripts: opts.ignoreScripts,
|
||||
@@ -865,6 +866,7 @@ async function linkAllPkgs (
|
||||
depGraph: DependenciesGraph
|
||||
depsStateCache: DepsStateCache
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
enableGlobalVirtualStore?: boolean
|
||||
force: boolean
|
||||
ignoreScripts: boolean
|
||||
lockfileDir: string
|
||||
@@ -897,6 +899,7 @@ async function linkAllPkgs (
|
||||
force: depNode.forceImportPackage ?? opts.force,
|
||||
disableRelinkLocalDirDeps: opts.disableRelinkLocalDirDeps,
|
||||
requiresBuild: depNode.patch != null || depNode.requiresBuild,
|
||||
safeToSkip: opts.enableGlobalVirtualStore,
|
||||
sideEffectsCacheKey,
|
||||
})
|
||||
if (importMethod) {
|
||||
|
||||
@@ -332,11 +332,18 @@ async function linkBin (cmd: CommandInfo, binsDir: string, opts?: LinkBinOptions
|
||||
nodeExecPath: cmd.nodeExecPath,
|
||||
})
|
||||
} catch (err: any) { // eslint-disable-line
|
||||
if (err.code !== 'ENOENT' && err.code !== 'EISDIR') {
|
||||
throw err
|
||||
if (err.code === 'ENOENT' || err.code === 'EISDIR') {
|
||||
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
|
||||
return
|
||||
}
|
||||
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
|
||||
return
|
||||
// On Windows, EPERM during bin creation can happen when another process
|
||||
// (e.g. a parallel dlx call) is writing to the same shared bin directory.
|
||||
// The other process will finish creating the bin, so we can safely skip.
|
||||
if (IS_WINDOWS && err.code === 'EPERM') {
|
||||
globalWarn(`Failed to create bin at ${externalBinPath}. ${err.message as string}`)
|
||||
return
|
||||
}
|
||||
throw err
|
||||
}
|
||||
// ensure that bin are executable and not containing
|
||||
// windows line-endings(CRLF) on the hashbang line
|
||||
|
||||
@@ -22,18 +22,34 @@ export async function execPnpm (
|
||||
}
|
||||
): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const proc = spawnPnpm(args, opts)
|
||||
const proc = crossSpawn.spawn(process.execPath, [pnpmBinLocation, ...args], {
|
||||
env: {
|
||||
...createEnv(),
|
||||
...opts?.env,
|
||||
} as NodeJS.ProcessEnv,
|
||||
stdio: ['inherit', 'pipe', 'pipe'],
|
||||
})
|
||||
|
||||
const timeout = opts?.timeout ?? DEFAULT_EXEC_PNPM_TIMEOUT
|
||||
const timeoutId = registerProcessTimeout(proc, timeout, reject)
|
||||
|
||||
const output: Buffer[] = []
|
||||
proc.stdout!.on('data', (chunk: Buffer) => {
|
||||
output.push(chunk); process.stdout.write(chunk)
|
||||
})
|
||||
proc.stderr!.on('data', (chunk: Buffer) => {
|
||||
output.push(chunk); process.stderr.write(chunk)
|
||||
})
|
||||
|
||||
proc.on('error', reject)
|
||||
|
||||
proc.on('close', (code: number) => {
|
||||
proc.on('close', (code: number | null, signal: string | null) => {
|
||||
clearTimeout(timeoutId)
|
||||
|
||||
if (code > 0) {
|
||||
reject(new Error(`Exit code ${code}`))
|
||||
if (signal) {
|
||||
reject(new Error(`Killed by signal ${signal}\n\n${Buffer.concat(output).toString()}`))
|
||||
} else if (code) {
|
||||
reject(new Error(`Exit code ${code}\n\n${Buffer.concat(output).toString()}`))
|
||||
} else {
|
||||
resolve()
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ export interface ImportPackageOpts {
|
||||
filesResponse: PackageFilesResponse
|
||||
force: boolean
|
||||
keepModulesDir?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
|
||||
export type ImportPackageFunction = (
|
||||
|
||||
@@ -44,6 +44,7 @@ export function createPackageImporterAsync (
|
||||
resolvedFrom: opts.filesResponse.resolvedFrom,
|
||||
force: opts.force,
|
||||
keepModulesDir: Boolean(opts.keepModulesDir),
|
||||
safeToSkip: opts.safeToSkip,
|
||||
})
|
||||
return { importMethod, isBuilt }
|
||||
}
|
||||
@@ -74,6 +75,7 @@ function createPackageImporter (
|
||||
resolvedFrom: opts.filesResponse.resolvedFrom,
|
||||
force: opts.force,
|
||||
keepModulesDir: Boolean(opts.keepModulesDir),
|
||||
safeToSkip: opts.safeToSkip,
|
||||
})
|
||||
return { importMethod, isBuilt }
|
||||
}
|
||||
|
||||
@@ -184,6 +184,7 @@ export interface ImportOptions {
|
||||
force: boolean
|
||||
resolvedFrom: ResolvedFrom
|
||||
keepModulesDir?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
|
||||
export type ImportIndexedPackage = (to: string, opts: ImportOptions) => string | undefined
|
||||
|
||||
@@ -222,7 +222,7 @@ export async function importPackage (
|
||||
localWorker.once('message', ({ status, error, value }: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any
|
||||
workerPool!.checkinWorker(localWorker)
|
||||
if (status === 'error') {
|
||||
reject(new PnpmError(error.code ?? 'LINKING_FAILED', error.message as string))
|
||||
reject(new PnpmError(error.code ?? 'LINKING_FAILED', `[importPackage ${opts.targetDir}] ${error.message as string}`))
|
||||
return
|
||||
}
|
||||
resolve(value)
|
||||
@@ -247,7 +247,7 @@ export async function symlinkAllModules (
|
||||
workerPool!.checkinWorker(localWorker)
|
||||
if (status === 'error') {
|
||||
const hint = opts.deps?.[0]?.modules != null ? createErrorHint(error, opts.deps[0].modules) : undefined
|
||||
reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', error.message as string, { hint }))
|
||||
reject(new PnpmError(error.code ?? 'SYMLINK_FAILED', `[symlinkAllModules] ${error.message as string}`, { hint }))
|
||||
return
|
||||
}
|
||||
resolve(value)
|
||||
|
||||
@@ -344,6 +344,7 @@ function importPackage ({
|
||||
force,
|
||||
keepModulesDir,
|
||||
disableRelinkLocalDirDeps,
|
||||
safeToSkip,
|
||||
}: LinkPkgMessage): ImportPackageResult {
|
||||
const cacheKey = JSON.stringify({ storeDir, packageImportMethod })
|
||||
if (!cafsStoreCache.has(cacheKey)) {
|
||||
@@ -357,6 +358,7 @@ function importPackage ({
|
||||
requiresBuild,
|
||||
sideEffectsCacheKey,
|
||||
keepModulesDir,
|
||||
safeToSkip,
|
||||
})
|
||||
return { status: 'success', value: { isBuilt, importMethod } }
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ export interface LinkPkgMessage {
|
||||
force: boolean
|
||||
keepModulesDir?: boolean
|
||||
disableRelinkLocalDirDeps?: boolean
|
||||
safeToSkip?: boolean
|
||||
}
|
||||
|
||||
export interface SymlinkAllModulesMessage {
|
||||
|
||||
Reference in New Issue
Block a user