mirror of
https://github.com/pnpm/pnpm.git
synced 2026-04-27 18:46:18 -04:00
perf(cafs): skip rename on cold CAS writes, use atomic rename for recovery (#11087)
## Problem
Every file extracted to the CAS goes through a temp-file-plus-rename cycle: `writeFile(temp, buffer)` then `renameOverwriteSync(temp, fileDest)`. For a typical cold install with ~30k files, this adds ~30k extra rename syscalls.
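In miniature, the per-file cycle being removed looks like this (a simplified sketch, not the actual pnpm code; the real temp naming and helpers appear in the diff below):

```ts
import fs from 'node:fs'

// Old shape of a CAS write: two steps per file, the second being the
// rename syscall this PR eliminates on the cold path.
function writeViaTempAndRename (fileDest: string, buffer: Buffer): void {
  const temp = `${fileDest}${process.pid}` // simplified temp naming
  fs.writeFileSync(temp, buffer)
  fs.renameSync(temp, fileDest) // one extra syscall per extracted file
}
```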
## Solution
Use `writeFileExclusive()` with `{ flag: 'wx' }` (O_CREAT|O_EXCL) to write directly to the final CAS path when the file doesn't exist — skipping the temp+rename overhead. For recovery paths (corrupt/partial files, EEXIST races), fall back to the existing atomic temp+rename via `optimisticRenameOverwrite`.
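Node's `wx` flag maps to `O_CREAT|O_EXCL`: the call either creates the file or fails with `EEXIST`; it never truncates an existing file. A minimal sketch of those semantics (illustrative only; `tryExclusiveWrite` is not part of the PR):

```ts
import fs from 'node:fs'

// Returns true if we created the file, false if another writer beat us to it.
function tryExclusiveWrite (fileDest: string, buffer: Buffer): boolean {
  try {
    fs.writeFileSync(fileDest, buffer, { flag: 'wx' }) // O_WRONLY|O_CREAT|O_EXCL
    return true
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === 'EEXIST') return false
    throw err
  }
}
```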
### Write paths
- **File doesn't exist (common cold-install path)** → `writeFileExclusive` writes directly, no rename
- **File exists with correct integrity** → return immediately, no write
- **File exists with wrong integrity (corruption/crash)** → atomic temp+rename recovery
- **EEXIST (concurrent write)** → verify integrity; if OK return, otherwise atomic temp+rename recovery (all four paths are condensed in the sketch below)
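A condensed sketch of that control flow, with stand-in helpers (`contentOk` and `writeAtomic` are illustrative, not the PR's code; the real implementation is `writeOrCheck` in the diff below):

```ts
import crypto from 'node:crypto'
import fs from 'node:fs'

// Stand-ins for the PR's real helpers (verifyFileIntegrity and the
// temp+rename recovery path).
const digestOf = (data: Buffer): string => crypto.hash('sha512', data, 'hex')
const contentOk = (file: string, expected: Buffer): boolean =>
  digestOf(fs.readFileSync(file)) === digestOf(expected)
const writeAtomic = (file: string, buffer: Buffer): void => {
  const temp = `${file}${process.pid}` // simplified pathTemp
  fs.writeFileSync(temp, buffer)
  fs.renameSync(temp, file) // atomic replace within one directory
}

function writeOrCheckSketch (fileDest: string, buffer: Buffer): void {
  if (fs.existsSync(fileDest)) {
    if (contentOk(fileDest, buffer)) return // exists, correct: no write
    writeAtomic(fileDest, buffer) // exists, wrong: atomic recovery
    return
  }
  try {
    // Common cold-install path: direct exclusive-create write, no rename.
    fs.writeFileSync(fileDest, buffer, { flag: 'wx' })
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code !== 'EEXIST') throw err
    // A concurrent writer won the race: trust it only if its content verifies.
    if (contentOk(fileDest, buffer)) return
    writeAtomic(fileDest, buffer)
  }
}
```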
### Concurrent safety
- `writeFileExclusive` (`O_CREAT|O_EXCL`) ensures only one process creates a given CAS file
- Recovery overwrites use the battle-tested `optimisticRenameOverwrite` + `pathTemp` for atomic replacement (the temp-naming scheme is sketched after this list)
- `verifyFileIntegrity` is non-destructive (no `unlinkSync` on mismatch), safe when another process may be mid-write
- A crash mid-`writeFileExclusive` can leave a partial file, recovered on next access via atomic temp+rename
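Recovery stays atomic because the temp file is created in the destination's own directory, so the final `rename(2)` never crosses a filesystem boundary, and because the temp name embeds the pid and worker-thread id, so concurrent writers never collide on it. A sketch of that naming, mirroring `pathTemp` from the diff (minus its suffix trimming):

```ts
import path from 'node:path'
import workerThreads from 'node:worker_threads'

// Unique per process and worker thread; same directory as the destination
// so the final rename cannot cross a filesystem boundary.
function tempPathFor (file: string): string {
  return path.join(
    path.dirname(file),
    `${path.basename(file)}${process.pid}${workerThreads.threadId}`
  )
}
```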
---------
Co-authored-by: Zoltan Kochan <z@kochan.io>
**.changeset/perf-cafs-direct-write.md** (new file)

```diff
@@ -0,0 +1,6 @@
+---
+"@pnpm/store.cafs": patch
+"pnpm": patch
+---
+
+Write CAS files directly to their final content-addressed path instead of writing to a temp file and renaming. Uses exclusive-create file mode for safe concurrent multi-process writes. Eliminates ~30k rename syscalls per cold install.
```
**checkPkgFilesIntegrity.ts**

```diff
@@ -162,7 +162,11 @@ function verifyFile (
     rimrafSync(filename)
     return false
   }
-  return verifyFileIntegrity(filename, { digest: fstat.digest, algorithm })
+  const passed = verifyFileIntegrity(filename, { digest: fstat.digest, algorithm })
+  if (!passed) {
+    gfs.unlinkSync(filename)
+  }
+  return passed
 }
 
 // If a file was not edited, we are skipping integrity check.
 // We assume that nobody will manually remove a file in the store and create a new one.
@@ -184,18 +188,12 @@ export function verifyFileIntegrity (
     }
     throw err
   }
-  let computedDigest: string
   try {
-    computedDigest = crypto.hash(integrity.algorithm, data, 'hex')
+    return crypto.hash(integrity.algorithm, data, 'hex') === integrity.digest
   } catch {
     // Invalid algorithm (e.g., corrupted index file) - treat as verification failure
     return false
   }
-  const passed = computedDigest === integrity.digest
-  if (!passed) {
-    gfs.unlinkSync(filename)
-  }
-  return passed
 }
 
 function checkFile (filename: string, checkedAt?: number): { isModified: boolean, size: number } | null {
```
**store.cafs entry module**

```diff
@@ -26,7 +26,7 @@ import {
   modeIsExecutable,
 } from './getFilePathInCafs.js'
 import { normalizeBundledManifest } from './normalizeBundledManifest.js'
-import { optimisticRenameOverwrite, writeBufferToCafs } from './writeBufferToCafs.js'
+import { writeBufferToCafs } from './writeBufferToCafs.js'
 
 export const HASH_ALGORITHM = 'sha512'
 
@@ -40,7 +40,6 @@ export {
   type FileType,
   getFilePathByModeInCafs,
   type Integrity,
-  optimisticRenameOverwrite,
   type PackageFileInfo,
   type PackageFiles,
   type PackageFilesIndex,
```
**writeBufferToCafs.ts**

```diff
@@ -6,7 +6,7 @@ import workerThreads from 'node:worker_threads'
 import { renameOverwriteSync } from 'rename-overwrite'
 
 import { type Integrity, verifyFileIntegrity } from './checkPkgFilesIntegrity.js'
-import { writeFile } from './writeFile.js'
+import { writeFile, writeFileExclusive } from './writeFile.js'
 
 export function writeBufferToCafs (
   locker: Map<string, number>,
@@ -23,40 +23,64 @@ export function writeBufferToCafs (
       filePath: fileDest,
     }
   }
-  // This part is a bit redundant.
-  // When a file is already used by another package,
-  // we probably have validated its content already.
-  // However, there is no way to find which package index file references
-  // the given file. So we should revalidate the content of the file again.
-  if (existsSame(fileDest, integrity)) {
-    const checkedAt = Date.now()
-    locker.set(fileDest, checkedAt)
-    return {
-      checkedAt,
-      filePath: fileDest,
-    }
-  }
-
-  // This might be too cautious.
-  // The write is atomic, so in case pnpm crashes, no broken file
-  // will be added to the store.
-  // It might be a redundant step though, as we verify the contents of the
-  // files before linking
-  //
-  // If we don't allow --no-verify-store-integrity then we probably can write
-  // to the final file directly.
-  const temp = pathTemp(fileDest)
-  writeFile(temp, buffer, mode)
-  // Unfortunately, "birth time" (time of file creation) is available not on all filesystems.
-  // We log the creation time ourselves and save it in the package index file.
-  // Having this information allows us to skip content checks for files that were not modified since "birth time".
-  const birthtimeMs = Date.now()
-  optimisticRenameOverwrite(temp, fileDest)
-  locker.set(fileDest, birthtimeMs)
-  return {
-    checkedAt: birthtimeMs,
-    filePath: fileDest,
-  }
+  const checkedAt = writeOrCheck(fileDest, buffer, mode, integrity)
+  locker.set(fileDest, checkedAt)
+  return {
+    checkedAt,
+    filePath: fileDest,
+  }
+}
+
+function writeOrCheck (
+  fileDest: string,
+  buffer: Buffer,
+  mode: number | undefined,
+  integrity: Integrity
+): number {
+  // Fast path: check if the file already exists on disk with correct content.
+  const existingFile = fs.statSync(fileDest, { throwIfNoEntry: false })
+  if (existingFile) {
+    if (verifyFileIntegrity(fileDest, integrity)) {
+      return Date.now()
+    }
+    // File exists but has wrong integrity (corruption/partial write).
+    // Use temp+rename so the replacement is atomic.
+    return writeFileAtomic(fileDest, buffer, mode)
+  }
+
+  // File doesn't exist. Use exclusive-create (O_CREAT|O_EXCL) so that
+  // if another process creates the same CAS file concurrently, we get EEXIST
+  // instead of silently overwriting. A crash mid-write can leave a partial
+  // file, which is recovered by the atomic temp+rename path on next access.
+  try {
+    writeFileExclusive(fileDest, buffer, mode)
+  } catch (err: unknown) {
+    if (util.types.isNativeError(err) && 'code' in err && err.code === 'EEXIST') {
+      // Another process created the file. If it finished successfully,
+      // integrity will pass. If it crashed or is still writing, integrity
+      // will fail and we recover via atomic temp+rename.
+      if (verifyFileIntegrity(fileDest, integrity)) {
+        return Date.now()
+      }
+      return writeFileAtomic(fileDest, buffer, mode)
+    }
+    throw err
+  }
+  return Date.now()
+}
+
+function writeFileAtomic (
+  fileDest: string,
+  buffer: Buffer,
+  mode: number | undefined
+): number {
+  const temp = pathTemp(fileDest)
+  writeFile(temp, buffer, mode)
+  optimisticRenameOverwrite(temp, fileDest)
+  return Date.now()
 }
 
 export function optimisticRenameOverwrite (temp: string, fileDest: string): void {
@@ -91,7 +115,7 @@ export function optimisticRenameOverwrite (temp: string, fileDest: string): void
  * @param file - The original file path
  * @returns A temporary file path in the format: {basename}{pid}{threadId}
  */
-export function pathTemp (file: string): string {
+function pathTemp (file: string): string {
   const basename = removeSuffix(path.basename(file))
   return path.join(path.dirname(file), `${basename}${process.pid}${workerThreads.threadId}`)
 }
@@ -105,9 +129,3 @@ function removeSuffix (filePath: string): string {
   }
   return withoutSuffix
 }
-
-function existsSame (filename: string, integrity: Integrity): boolean {
-  const existingFile = fs.statSync(filename, { throwIfNoEntry: false })
-  if (!existingFile) return false
-  return verifyFileIntegrity(filename, integrity)
-}
```
**writeFile.ts**

```diff
@@ -13,6 +13,20 @@ export function writeFile (
   fs.writeFileSync(fileDest, buffer, { mode })
 }
 
+/**
+ * Creates a file only if it doesn't already exist, using O_CREAT|O_EXCL.
+ * Throws EEXIST if the file was created by another process concurrently.
+ * Note: the write itself is not atomic — a crash mid-write can leave a partial file.
+ */
+export function writeFileExclusive (
+  fileDest: string,
+  buffer: Buffer,
+  mode?: number
+): void {
+  makeDirForFile(fileDest)
+  fs.writeFileSync(fileDest, buffer, { mode, flag: 'wx' })
+}
+
 function makeDirForFile (fileDest: string): void {
   const dir = path.dirname(fileDest)
   if (!dirs.has(dir)) {
```
**writeBufferToCafs tests**

```diff
@@ -1,23 +1,136 @@
+import { execSync } from 'node:child_process'
 import crypto from 'node:crypto'
 import fs from 'node:fs'
 import path from 'node:path'
+import { fileURLToPath, pathToFileURL } from 'node:url'
 
 import { temporaryDirectory } from 'tempy'
 
-import { pathTemp, writeBufferToCafs } from '../src/writeBufferToCafs.js'
+import { writeBufferToCafs } from '../src/writeBufferToCafs.js'
+
+const testDir = path.dirname(fileURLToPath(import.meta.url))
 
 describe('writeBufferToCafs', () => {
-  it('should not fail if a file already exists at the temp file location', () => {
+  it('should write directly to the final CAS path', () => {
     const storeDir = temporaryDirectory()
     const fileDest = 'abc'
     const buffer = Buffer.from('abc')
     const fullFileDest = path.join(storeDir, fileDest)
-    fs.writeFileSync(pathTemp(fullFileDest), 'ccc', 'utf8')
     const digest = crypto.hash('sha512', buffer, 'hex')
     writeBufferToCafs(new Map(), storeDir, buffer, fileDest, 420, { digest, algorithm: 'sha512' })
     expect(fs.readFileSync(fullFileDest, 'utf8')).toBe('abc')
   })
 
+  it('should handle EEXIST when the existing file has correct integrity', () => {
+    const storeDir = temporaryDirectory()
+    const fileDest = 'abc'
+    const buffer = Buffer.from('abc')
+    const fullFileDest = path.join(storeDir, fileDest)
+    const digest = crypto.hash('sha512', buffer, 'hex')
+    const integrity = { digest, algorithm: 'sha512' }
+    const locker = new Map<string, number>()
+
+    // Simulate another process creating the file before our exclusive write
+    fs.mkdirSync(path.dirname(fullFileDest), { recursive: true })
+    fs.writeFileSync(fullFileDest, buffer)
+
+    // Our call should find the file via stat, verify integrity, and cache it
+    const result = writeBufferToCafs(locker, storeDir, buffer, fileDest, 420, integrity)
+    expect(result.filePath).toBe(fullFileDest)
+    expect(locker.has(fullFileDest)).toBe(true)
+    expect(fs.readFileSync(fullFileDest, 'utf8')).toBe('abc')
+  })
+
+  it('should overwrite an existing file with wrong integrity without deleting it first', () => {
+    const storeDir = temporaryDirectory()
+    const fileDest = 'abc'
+    const buffer = Buffer.from('abc')
+    const fullFileDest = path.join(storeDir, fileDest)
+    const digest = crypto.hash('sha512', buffer, 'hex')
+    const integrity = { digest, algorithm: 'sha512' }
+    const locker = new Map<string, number>()
+
+    // Create a file with wrong content (simulating corruption or partial write)
+    fs.mkdirSync(path.dirname(fullFileDest), { recursive: true })
+    fs.writeFileSync(fullFileDest, 'wrong content')
+
+    // Should detect mismatch and overwrite (not delete + recreate)
+    const result = writeBufferToCafs(locker, storeDir, buffer, fileDest, 420, integrity)
+    expect(result.filePath).toBe(fullFileDest)
+    expect(locker.has(fullFileDest)).toBe(true)
+    expect(fs.readFileSync(fullFileDest, 'utf8')).toBe('abc')
+  })
+
+  it('should handle concurrent writes from multiple processes without corruption', () => {
+    const storeDir = temporaryDirectory()
+    const fileDest = 'abc'
+    const content = crypto.randomBytes(256 * 1024)
+    const fullFileDest = path.join(storeDir, fileDest)
+    const digest = crypto.hash('sha512', content, 'hex')
+
+    const results = runConcurrentWorkers(storeDir, fileDest, content, digest)
+
+    expect(results).toHaveLength(8)
+    for (const result of results) {
+      expect(result.filePath).toBe(fullFileDest)
+      expect(typeof result.checkedAt).toBe('number')
+    }
+
+    const finalContent = fs.readFileSync(fullFileDest)
+    expect(finalContent).toHaveLength(content.length)
+    expect(crypto.hash('sha512', finalContent, 'hex')).toBe(digest)
+  })
+
+  it('should recover from a corrupt file when multiple processes write concurrently', () => {
+    const storeDir = temporaryDirectory()
+    const fileDest = 'abc'
+    const content = crypto.randomBytes(256 * 1024)
+    const fullFileDest = path.join(storeDir, fileDest)
+    const digest = crypto.hash('sha512', content, 'hex')
+
+    // Pre-seed a corrupt file (simulates a previous process that crashed mid-write)
+    fs.mkdirSync(path.dirname(fullFileDest), { recursive: true })
+    fs.writeFileSync(fullFileDest, 'partial garbage from a crashed writer')
+
+    const results = runConcurrentWorkers(storeDir, fileDest, content, digest)
+
+    // All workers must succeed despite the pre-existing corrupt file
+    expect(results).toHaveLength(8)
+    for (const result of results) {
+      expect(result.filePath).toBe(fullFileDest)
+      expect(typeof result.checkedAt).toBe('number')
+    }
+
+    // Final file must have correct content
+    const finalContent = fs.readFileSync(fullFileDest)
+    expect(finalContent).toHaveLength(content.length)
+    expect(crypto.hash('sha512', finalContent, 'hex')).toBe(digest)
+  })
+
+  it('should recover from a truncated file (simulating crash mid-write)', () => {
+    const storeDir = temporaryDirectory()
+    const fileDest = 'abc'
+    const content = crypto.randomBytes(256 * 1024)
+    const fullFileDest = path.join(storeDir, fileDest)
+    const digest = crypto.hash('sha512', content, 'hex')
+
+    // Pre-seed a truncated file: first 1 KB of the correct content
+    // (simulates a process that started writing correctly but crashed)
+    fs.mkdirSync(path.dirname(fullFileDest), { recursive: true })
+    fs.writeFileSync(fullFileDest, content.subarray(0, 1024))
+
+    const results = runConcurrentWorkers(storeDir, fileDest, content, digest)
+
+    expect(results).toHaveLength(8)
+    for (const result of results) {
+      expect(result.filePath).toBe(fullFileDest)
+    }
+
+    const finalContent = fs.readFileSync(fullFileDest)
+    expect(finalContent).toHaveLength(content.length)
+    expect(crypto.hash('sha512', finalContent, 'hex')).toBe(digest)
+  })
+
   it('should populate the locker cache when a file already exists with correct integrity', () => {
     const storeDir = temporaryDirectory()
     const fileDest = 'abc'
@@ -41,3 +154,58 @@ describe('writeBufferToCafs', () => {
     expect(cached.checkedAt).toBe(result.checkedAt)
   })
 })
+
+function runConcurrentWorkers (
+  storeDir: string,
+  fileDest: string,
+  content: Buffer,
+  digest: string,
+  numWorkers = 8
+): Array<{ filePath: string, checkedAt: number }> {
+  const libUrl = pathToFileURL(path.resolve(testDir, '../lib/writeBufferToCafs.js')).href
+  const resultsDir = path.join(storeDir, '_results')
+  fs.mkdirSync(resultsDir, { recursive: true })
+
+  const workerScript = path.join(storeDir, '_worker.mjs')
+  fs.writeFileSync(workerScript, `
+import fs from 'node:fs';
+import path from 'node:path';
+import { writeBufferToCafs } from ${JSON.stringify(libUrl)};
+const content = Buffer.from(${JSON.stringify(content.toString('base64'))}, 'base64');
+const locker = new Map();
+const result = writeBufferToCafs(locker, ${JSON.stringify(storeDir)}, content, ${JSON.stringify(fileDest)}, 420, { digest: ${JSON.stringify(digest)}, algorithm: 'sha512' });
+fs.writeFileSync(path.join(${JSON.stringify(resultsDir)}, process.pid + '.json'), JSON.stringify(result));
+`)
+
+  const spawnerScript = path.join(storeDir, '_spawner.mjs')
+  fs.writeFileSync(spawnerScript, `
+import { spawn } from 'node:child_process';
+const N = ${numWorkers};
+const workerScript = process.argv[2];
+const children = [];
+for (let i = 0; i < N; i++) {
+  children.push(new Promise((resolve, reject) => {
+    const p = spawn(process.execPath, [workerScript], { stdio: 'pipe' });
+    let stderr = '';
+    p.stderr.on('data', d => { stderr += d; });
+    p.on('exit', code => code === 0 ? resolve() : reject(new Error('Process ' + i + ' exited ' + code + ': ' + stderr)));
+  }));
+}
+const results = await Promise.allSettled(children);
+const failures = results.filter(r => r.status === 'rejected');
+if (failures.length > 0) {
+  for (const f of failures) console.error(f.reason.message);
+  process.exit(1);
+}
+`)
+
+  execSync(`node ${JSON.stringify(spawnerScript)} ${JSON.stringify(workerScript)}`, {
+    timeout: 30000,
+    stdio: 'pipe',
+  })
+
+  const resultFiles = fs.readdirSync(resultsDir)
+  return resultFiles.map(file =>
+    JSON.parse(fs.readFileSync(path.join(resultsDir, file), 'utf8'))
+  )
+}
```