From 878a7734a0b6a31a1f15215942eeecbb923e31da Mon Sep 17 00:00:00 2001 From: Victor Sumner <308886+vsumner@users.noreply.github.com> Date: Wed, 25 Mar 2026 10:20:34 -0400 Subject: [PATCH] perf(cafs): skip rename on cold CAS writes, use atomic rename for recovery (#11087) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Every file extracted to the CAS goes through a temp-file-plus-rename cycle: `writeFile(temp, buffer)` then `renameOverwriteSync(temp, fileDest)`. For a typical cold install with ~30k files, this adds ~30k extra rename syscalls. ## Solution Use `writeFileExclusive()` with `{ flag: 'wx' }` (O_CREAT|O_EXCL) to write directly to the final CAS path when the file doesn't exist — skipping the temp+rename overhead. For recovery paths (corrupt/partial files, EEXIST races), fall back to the existing atomic temp+rename via `optimisticRenameOverwrite`. ### Write paths - **File doesn't exist (common cold-install path)** → `writeFileExclusive` writes directly, no rename - **File exists with correct integrity** → return immediately, no write - **File exists with wrong integrity (corruption/crash)** → atomic temp+rename recovery - **EEXIST (concurrent write)** → verify integrity; if OK return, otherwise atomic temp+rename recovery ### Concurrent safety - `writeFileExclusive` (`O_CREAT|O_EXCL`) ensures only one process creates a given CAS file - Recovery overwrites use the battle-tested `optimisticRenameOverwrite` + `pathTemp` for atomic replacement - `verifyFileIntegrity` is non-destructive (no `unlinkSync` on mismatch), safe when another process may be mid-write - A crash mid-`writeFileExclusive` can leave a partial file, recovered on next access via atomic temp+rename --------- Co-authored-by: Zoltan Kochan --- .changeset/perf-cafs-direct-write.md | 6 + store/cafs/src/checkPkgFilesIntegrity.ts | 14 +- store/cafs/src/index.ts | 3 +- store/cafs/src/writeBufferToCafs.ts | 88 ++++++----- store/cafs/src/writeFile.ts | 14 ++ store/cafs/test/writeBufferToCafs.test.ts | 174 +++++++++++++++++++++- 6 files changed, 251 insertions(+), 48 deletions(-) create mode 100644 .changeset/perf-cafs-direct-write.md diff --git a/.changeset/perf-cafs-direct-write.md b/.changeset/perf-cafs-direct-write.md new file mode 100644 index 0000000000..8fe43947e3 --- /dev/null +++ b/.changeset/perf-cafs-direct-write.md @@ -0,0 +1,6 @@ +--- +"@pnpm/store.cafs": patch +"pnpm": patch +--- + +Write CAS files directly to their final content-addressed path instead of writing to a temp file and renaming. Uses exclusive-create file mode for safe concurrent multi-process writes. Eliminates ~30k rename syscalls per cold install. diff --git a/store/cafs/src/checkPkgFilesIntegrity.ts b/store/cafs/src/checkPkgFilesIntegrity.ts index c43ce46857..9e6158cd14 100644 --- a/store/cafs/src/checkPkgFilesIntegrity.ts +++ b/store/cafs/src/checkPkgFilesIntegrity.ts @@ -162,7 +162,11 @@ function verifyFile ( rimrafSync(filename) return false } - return verifyFileIntegrity(filename, { digest: fstat.digest, algorithm }) + const passed = verifyFileIntegrity(filename, { digest: fstat.digest, algorithm }) + if (!passed) { + gfs.unlinkSync(filename) + } + return passed } // If a file was not edited, we are skipping integrity check. // We assume that nobody will manually remove a file in the store and create a new one. @@ -184,18 +188,12 @@ export function verifyFileIntegrity ( } throw err } - let computedDigest: string try { - computedDigest = crypto.hash(integrity.algorithm, data, 'hex') + return crypto.hash(integrity.algorithm, data, 'hex') === integrity.digest } catch { // Invalid algorithm (e.g., corrupted index file) - treat as verification failure return false } - const passed = computedDigest === integrity.digest - if (!passed) { - gfs.unlinkSync(filename) - } - return passed } function checkFile (filename: string, checkedAt?: number): { isModified: boolean, size: number } | null { diff --git a/store/cafs/src/index.ts b/store/cafs/src/index.ts index c70453bdcf..9683df798c 100644 --- a/store/cafs/src/index.ts +++ b/store/cafs/src/index.ts @@ -26,7 +26,7 @@ import { modeIsExecutable, } from './getFilePathInCafs.js' import { normalizeBundledManifest } from './normalizeBundledManifest.js' -import { optimisticRenameOverwrite, writeBufferToCafs } from './writeBufferToCafs.js' +import { writeBufferToCafs } from './writeBufferToCafs.js' export const HASH_ALGORITHM = 'sha512' @@ -40,7 +40,6 @@ export { type FileType, getFilePathByModeInCafs, type Integrity, - optimisticRenameOverwrite, type PackageFileInfo, type PackageFiles, type PackageFilesIndex, diff --git a/store/cafs/src/writeBufferToCafs.ts b/store/cafs/src/writeBufferToCafs.ts index 183731723f..09c285e14e 100644 --- a/store/cafs/src/writeBufferToCafs.ts +++ b/store/cafs/src/writeBufferToCafs.ts @@ -6,7 +6,7 @@ import workerThreads from 'node:worker_threads' import { renameOverwriteSync } from 'rename-overwrite' import { type Integrity, verifyFileIntegrity } from './checkPkgFilesIntegrity.js' -import { writeFile } from './writeFile.js' +import { writeFile, writeFileExclusive } from './writeFile.js' export function writeBufferToCafs ( locker: Map, @@ -23,40 +23,64 @@ export function writeBufferToCafs ( filePath: fileDest, } } - // This part is a bit redundant. - // When a file is already used by another package, - // we probably have validated its content already. - // However, there is no way to find which package index file references - // the given file. So we should revalidate the content of the file again. - if (existsSame(fileDest, integrity)) { - const checkedAt = Date.now() - locker.set(fileDest, checkedAt) - return { - checkedAt, - filePath: fileDest, + const checkedAt = writeOrCheck(fileDest, buffer, mode, integrity) + locker.set(fileDest, checkedAt) + return { + checkedAt, + filePath: fileDest, + } +} + +function writeOrCheck ( + fileDest: string, + buffer: Buffer, + mode: number | undefined, + integrity: Integrity +): number { + // Fast path: check if the file already exists on disk with correct content. + const existingFile = fs.statSync(fileDest, { throwIfNoEntry: false }) + if (existingFile) { + if (verifyFileIntegrity(fileDest, integrity)) { + return Date.now() } + // File exists but has wrong integrity (corruption/partial write). + // Use temp+rename so the replacement is atomic. + return writeFileAtomic(fileDest, buffer, mode) } - // This might be too cautious. - // The write is atomic, so in case pnpm crashes, no broken file - // will be added to the store. - // It might be a redundant step though, as we verify the contents of the - // files before linking - // - // If we don't allow --no-verify-store-integrity then we probably can write - // to the final file directly. - const temp = pathTemp(fileDest) - writeFile(temp, buffer, mode) + // File doesn't exist. Use exclusive-create (O_CREAT|O_EXCL) so that + // if another process creates the same CAS file concurrently, we get EEXIST + // instead of silently overwriting. A crash mid-write can leave a partial + // file, which is recovered by the atomic temp+rename path on next access. + try { + writeFileExclusive(fileDest, buffer, mode) + } catch (err: unknown) { + if (util.types.isNativeError(err) && 'code' in err && err.code === 'EEXIST') { + // Another process created the file. If it finished successfully, + // integrity will pass. If it crashed or is still writing, integrity + // will fail and we recover via atomic temp+rename. + if (verifyFileIntegrity(fileDest, integrity)) { + return Date.now() + } + return writeFileAtomic(fileDest, buffer, mode) + } + throw err + } // Unfortunately, "birth time" (time of file creation) is available not on all filesystems. // We log the creation time ourselves and save it in the package index file. // Having this information allows us to skip content checks for files that were not modified since "birth time". - const birthtimeMs = Date.now() + return Date.now() +} + +function writeFileAtomic ( + fileDest: string, + buffer: Buffer, + mode: number | undefined +): number { + const temp = pathTemp(fileDest) + writeFile(temp, buffer, mode) optimisticRenameOverwrite(temp, fileDest) - locker.set(fileDest, birthtimeMs) - return { - checkedAt: birthtimeMs, - filePath: fileDest, - } + return Date.now() } export function optimisticRenameOverwrite (temp: string, fileDest: string): void { @@ -91,7 +115,7 @@ export function optimisticRenameOverwrite (temp: string, fileDest: string): void * @param file - The original file path * @returns A temporary file path in the format: {basename}{pid}{threadId} */ -export function pathTemp (file: string): string { +function pathTemp (file: string): string { const basename = removeSuffix(path.basename(file)) return path.join(path.dirname(file), `${basename}${process.pid}${workerThreads.threadId}`) } @@ -105,9 +129,3 @@ function removeSuffix (filePath: string): string { } return withoutSuffix } - -function existsSame (filename: string, integrity: Integrity): boolean { - const existingFile = fs.statSync(filename, { throwIfNoEntry: false }) - if (!existingFile) return false - return verifyFileIntegrity(filename, integrity) -} diff --git a/store/cafs/src/writeFile.ts b/store/cafs/src/writeFile.ts index 01eedfa759..0e6289f534 100644 --- a/store/cafs/src/writeFile.ts +++ b/store/cafs/src/writeFile.ts @@ -13,6 +13,20 @@ export function writeFile ( fs.writeFileSync(fileDest, buffer, { mode }) } +/** + * Creates a file only if it doesn't already exist, using O_CREAT|O_EXCL. + * Throws EEXIST if the file was created by another process concurrently. + * Note: the write itself is not atomic — a crash mid-write can leave a partial file. + */ +export function writeFileExclusive ( + fileDest: string, + buffer: Buffer, + mode?: number +): void { + makeDirForFile(fileDest) + fs.writeFileSync(fileDest, buffer, { mode, flag: 'wx' }) +} + function makeDirForFile (fileDest: string): void { const dir = path.dirname(fileDest) if (!dirs.has(dir)) { diff --git a/store/cafs/test/writeBufferToCafs.test.ts b/store/cafs/test/writeBufferToCafs.test.ts index af24a9fad3..ca518a5bb5 100644 --- a/store/cafs/test/writeBufferToCafs.test.ts +++ b/store/cafs/test/writeBufferToCafs.test.ts @@ -1,23 +1,136 @@ +import { execSync } from 'node:child_process' import crypto from 'node:crypto' import fs from 'node:fs' import path from 'node:path' +import { fileURLToPath, pathToFileURL } from 'node:url' import { temporaryDirectory } from 'tempy' -import { pathTemp, writeBufferToCafs } from '../src/writeBufferToCafs.js' +import { writeBufferToCafs } from '../src/writeBufferToCafs.js' + +const testDir = path.dirname(fileURLToPath(import.meta.url)) describe('writeBufferToCafs', () => { - it('should not fail if a file already exists at the temp file location', () => { + it('should write directly to the final CAS path', () => { const storeDir = temporaryDirectory() const fileDest = 'abc' const buffer = Buffer.from('abc') const fullFileDest = path.join(storeDir, fileDest) - fs.writeFileSync(pathTemp(fullFileDest), 'ccc', 'utf8') const digest = crypto.hash('sha512', buffer, 'hex') writeBufferToCafs(new Map(), storeDir, buffer, fileDest, 420, { digest, algorithm: 'sha512' }) expect(fs.readFileSync(fullFileDest, 'utf8')).toBe('abc') }) + it('should handle EEXIST when the existing file has correct integrity', () => { + const storeDir = temporaryDirectory() + const fileDest = 'abc' + const buffer = Buffer.from('abc') + const fullFileDest = path.join(storeDir, fileDest) + const digest = crypto.hash('sha512', buffer, 'hex') + const integrity = { digest, algorithm: 'sha512' } + const locker = new Map() + + // Simulate another process creating the file before our exclusive write + fs.mkdirSync(path.dirname(fullFileDest), { recursive: true }) + fs.writeFileSync(fullFileDest, buffer) + + // Our call should find the file via stat, verify integrity, and cache it + const result = writeBufferToCafs(locker, storeDir, buffer, fileDest, 420, integrity) + expect(result.filePath).toBe(fullFileDest) + expect(locker.has(fullFileDest)).toBe(true) + expect(fs.readFileSync(fullFileDest, 'utf8')).toBe('abc') + }) + + it('should overwrite an existing file with wrong integrity without deleting it first', () => { + const storeDir = temporaryDirectory() + const fileDest = 'abc' + const buffer = Buffer.from('abc') + const fullFileDest = path.join(storeDir, fileDest) + const digest = crypto.hash('sha512', buffer, 'hex') + const integrity = { digest, algorithm: 'sha512' } + const locker = new Map() + + // Create a file with wrong content (simulating corruption or partial write) + fs.mkdirSync(path.dirname(fullFileDest), { recursive: true }) + fs.writeFileSync(fullFileDest, 'wrong content') + + // Should detect mismatch and overwrite (not delete + recreate) + const result = writeBufferToCafs(locker, storeDir, buffer, fileDest, 420, integrity) + expect(result.filePath).toBe(fullFileDest) + expect(locker.has(fullFileDest)).toBe(true) + expect(fs.readFileSync(fullFileDest, 'utf8')).toBe('abc') + }) + + it('should handle concurrent writes from multiple processes without corruption', () => { + const storeDir = temporaryDirectory() + const fileDest = 'abc' + const content = crypto.randomBytes(256 * 1024) + const fullFileDest = path.join(storeDir, fileDest) + const digest = crypto.hash('sha512', content, 'hex') + + const results = runConcurrentWorkers(storeDir, fileDest, content, digest) + + expect(results).toHaveLength(8) + for (const result of results) { + expect(result.filePath).toBe(fullFileDest) + expect(typeof result.checkedAt).toBe('number') + } + + const finalContent = fs.readFileSync(fullFileDest) + expect(finalContent).toHaveLength(content.length) + expect(crypto.hash('sha512', finalContent, 'hex')).toBe(digest) + }) + + it('should recover from a corrupt file when multiple processes write concurrently', () => { + const storeDir = temporaryDirectory() + const fileDest = 'abc' + const content = crypto.randomBytes(256 * 1024) + const fullFileDest = path.join(storeDir, fileDest) + const digest = crypto.hash('sha512', content, 'hex') + + // Pre-seed a corrupt file (simulates a previous process that crashed mid-write) + fs.mkdirSync(path.dirname(fullFileDest), { recursive: true }) + fs.writeFileSync(fullFileDest, 'partial garbage from a crashed writer') + + const results = runConcurrentWorkers(storeDir, fileDest, content, digest) + + // All workers must succeed despite the pre-existing corrupt file + expect(results).toHaveLength(8) + for (const result of results) { + expect(result.filePath).toBe(fullFileDest) + expect(typeof result.checkedAt).toBe('number') + } + + // Final file must have correct content + const finalContent = fs.readFileSync(fullFileDest) + expect(finalContent).toHaveLength(content.length) + expect(crypto.hash('sha512', finalContent, 'hex')).toBe(digest) + }) + + it('should recover from a truncated file (simulating crash mid-write)', () => { + const storeDir = temporaryDirectory() + const fileDest = 'abc' + const content = crypto.randomBytes(256 * 1024) + const fullFileDest = path.join(storeDir, fileDest) + const digest = crypto.hash('sha512', content, 'hex') + + // Pre-seed a truncated file: first 1 KB of the correct content + // (simulates a process that started writing correctly but crashed) + fs.mkdirSync(path.dirname(fullFileDest), { recursive: true }) + fs.writeFileSync(fullFileDest, content.subarray(0, 1024)) + + const results = runConcurrentWorkers(storeDir, fileDest, content, digest) + + expect(results).toHaveLength(8) + for (const result of results) { + expect(result.filePath).toBe(fullFileDest) + } + + const finalContent = fs.readFileSync(fullFileDest) + expect(finalContent).toHaveLength(content.length) + expect(crypto.hash('sha512', finalContent, 'hex')).toBe(digest) + }) + it('should populate the locker cache when a file already exists with correct integrity', () => { const storeDir = temporaryDirectory() const fileDest = 'abc' @@ -41,3 +154,58 @@ describe('writeBufferToCafs', () => { expect(cached.checkedAt).toBe(result.checkedAt) }) }) + +function runConcurrentWorkers ( + storeDir: string, + fileDest: string, + content: Buffer, + digest: string, + numWorkers = 8 +): Array<{ filePath: string, checkedAt: number }> { + const libUrl = pathToFileURL(path.resolve(testDir, '../lib/writeBufferToCafs.js')).href + const resultsDir = path.join(storeDir, '_results') + fs.mkdirSync(resultsDir, { recursive: true }) + + const workerScript = path.join(storeDir, '_worker.mjs') + fs.writeFileSync(workerScript, ` + import fs from 'node:fs'; + import path from 'node:path'; + import { writeBufferToCafs } from ${JSON.stringify(libUrl)}; + const content = Buffer.from(${JSON.stringify(content.toString('base64'))}, 'base64'); + const locker = new Map(); + const result = writeBufferToCafs(locker, ${JSON.stringify(storeDir)}, content, ${JSON.stringify(fileDest)}, 420, { digest: ${JSON.stringify(digest)}, algorithm: 'sha512' }); + fs.writeFileSync(path.join(${JSON.stringify(resultsDir)}, process.pid + '.json'), JSON.stringify(result)); + `) + + const spawnerScript = path.join(storeDir, '_spawner.mjs') + fs.writeFileSync(spawnerScript, ` + import { spawn } from 'node:child_process'; + const N = ${numWorkers}; + const workerScript = process.argv[2]; + const children = []; + for (let i = 0; i < N; i++) { + children.push(new Promise((resolve, reject) => { + const p = spawn(process.execPath, [workerScript], { stdio: 'pipe' }); + let stderr = ''; + p.stderr.on('data', d => { stderr += d; }); + p.on('exit', code => code === 0 ? resolve() : reject(new Error('Process ' + i + ' exited ' + code + ': ' + stderr))); + })); + } + const results = await Promise.allSettled(children); + const failures = results.filter(r => r.status === 'rejected'); + if (failures.length > 0) { + for (const f of failures) console.error(f.reason.message); + process.exit(1); + } + `) + + execSync(`node ${JSON.stringify(spawnerScript)} ${JSON.stringify(workerScript)}`, { + timeout: 30000, + stdio: 'pipe', + }) + + const resultFiles = fs.readdirSync(resultsDir) + return resultFiles.map(file => + JSON.parse(fs.readFileSync(path.join(resultsDir, file), 'utf8')) + ) +}