perf: faster integrity verification of the store

If a file in the store was never modified, we are not checking its
integrity.

PR #2876
This commit is contained in:
Zoltan Kochan
2020-09-19 23:20:19 +03:00
committed by GitHub
parent 05f032774e
commit 0a65440431
18 changed files with 199 additions and 65 deletions

View File

@@ -0,0 +1,7 @@
---
"@pnpm/package-requester": minor
"@pnpm/package-store": minor
"@pnpm/store-controller-types": minor
---
A new field added to the package files index: `checkedAt`. `checkedAt` is the timestamp (number of milliseconds), when the file's content was verified the last time.

View File

@@ -0,0 +1,6 @@
---
"@pnpm/cafs": major
"@pnpm/fetcher-base": major
---
`generatingIntegrity` replaced with `writeResult`. When files are added to the store, the store returns not only the file's integrity as a result, but also the exact time when the file's content was verified with its integrity.

View File

@@ -0,0 +1,5 @@
---
"@pnpm/cafs": minor
---
If a file in the store was never modified, we can skip checking its integrity.

View File

@@ -23,7 +23,6 @@
"get-stream": "^6.0.0",
"mz": "^2.7.0",
"p-limit": "^3.0.2",
"path-exists": "^4.0.0",
"path-temp": "^2.0.0",
"rename-overwrite": "^3.0.0",
"ssri": "6.0.1",

View File

@@ -1,9 +1,12 @@
import { DeferredManifestPromise, FilesIndex } from '@pnpm/fetcher-base'
import {
DeferredManifestPromise,
FilesIndex,
FileWriteResult,
} from '@pnpm/fetcher-base'
import { parseJsonBuffer } from './parseJson'
import path = require('path')
import fs = require('mz/fs')
import pLimit = require('p-limit')
import ssri = require('ssri')
const limit = pLimit(20)
@@ -11,21 +14,21 @@ const MAX_BULK_SIZE = 1 * 1024 * 1024 // 1MB
export default async function (
cafs: {
addStream: (stream: NodeJS.ReadableStream, mode: number) => Promise<ssri.Integrity>
addBuffer: (buffer: Buffer, mode: number) => Promise<ssri.Integrity>
addStream: (stream: NodeJS.ReadableStream, mode: number) => Promise<FileWriteResult>
addBuffer: (buffer: Buffer, mode: number) => Promise<FileWriteResult>
},
dirname: string,
manifest?: DeferredManifestPromise
) {
const index = {}
): Promise<FilesIndex> {
const index: FilesIndex = {}
await _retrieveFileIntegrities(cafs, dirname, dirname, index, manifest)
return index
}
async function _retrieveFileIntegrities (
cafs: {
addStream: (stream: NodeJS.ReadableStream, mode: number) => Promise<ssri.Integrity>
addBuffer: (buffer: Buffer, mode: number) => Promise<ssri.Integrity>
addStream: (stream: NodeJS.ReadableStream, mode: number) => Promise<FileWriteResult>
addBuffer: (buffer: Buffer, mode: number) => Promise<FileWriteResult>
},
rootDir: string,
currDir: string,
@@ -43,7 +46,7 @@ async function _retrieveFileIntegrities (
}
if (stat.isFile()) {
const relativePath = path.relative(rootDir, fullPath)
const generatingIntegrity = limit(async () => {
const writeResult = limit(async () => {
if (deferredManifest && rootDir === currDir && file === 'package.json') {
const buffer = await fs.readFile(fullPath)
parseJsonBuffer(buffer, deferredManifest)
@@ -56,9 +59,9 @@ async function _retrieveFileIntegrities (
return cafs.addStream(fs.createReadStream(fullPath), stat.mode)
})
index[relativePath] = {
generatingIntegrity,
mode: stat.mode,
size: stat.size,
writeResult,
}
}
}))

View File

@@ -1,12 +1,11 @@
import { Duplex, PassThrough } from 'stream'
import { DeferredManifestPromise, FilesIndex } from '@pnpm/fetcher-base'
import { DeferredManifestPromise, FilesIndex, FileWriteResult } from '@pnpm/fetcher-base'
import { parseJsonStream } from './parseJson'
import decompress = require('decompress-maybe')
import ssri = require('ssri')
import tar = require('tar-stream')
export default async function (
addStreamToCafs: (fileStream: PassThrough, mode: number) => Promise<ssri.Integrity>,
addStreamToCafs: (fileStream: PassThrough, mode: number) => Promise<FileWriteResult>,
_ignore: null | ((filename: string) => Boolean),
stream: NodeJS.ReadableStream,
manifest?: DeferredManifestPromise
@@ -25,11 +24,11 @@ export default async function (
if (filename === 'package.json' && manifest) {
parseJsonStream(fileStream, manifest)
}
const generatingIntegrity = addStreamToCafs(fileStream, header.mode!)
const writeResult = addStreamToCafs(fileStream, header.mode!)
filesIndex[filename] = {
generatingIntegrity,
mode: header.mode!,
size: header.size,
writeResult,
}
next()
})

View File

@@ -44,34 +44,47 @@ export default async function (
return verified
}
type FileInfo = Pick<PackageFileInfo, 'size' | 'checkedAt'> & {
integrity: string | ssri.IntegrityLike
}
async function verifyFile (
filename: string,
fstat: { size: number, integrity: string },
fstat: FileInfo,
deferredManifest?: DeferredManifestPromise
) {
if (fstat.size > MAX_BULK_SIZE && !deferredManifest) {
try {
const ok = Boolean(await ssri.checkStream(fs.createReadStream(filename), fstat.integrity))
const currentFile = await checkFile(filename, fstat.checkedAt)
if (!currentFile) return false
if (currentFile.isModified) {
if (currentFile.size !== fstat.size) {
await rimraf(filename)
return false
}
return verifyFileIntegrity(filename, fstat, deferredManifest)
}
if (deferredManifest) {
parseJsonBuffer(await fs.readFile(filename), deferredManifest)
}
// If a file was not edited, we are skipping integrity check.
// We assume that nobody will manually remove a file in the store and create a new one.
return true
}
export async function verifyFileIntegrity (
filename: string,
expectedFile: FileInfo,
deferredManifest?: DeferredManifestPromise
) {
try {
if (expectedFile.size > MAX_BULK_SIZE && !deferredManifest) {
const ok = Boolean(await ssri.checkStream(fs.createReadStream(filename), expectedFile.integrity))
if (!ok) {
await rimraf(filename)
}
return ok
} catch (err) {
switch (err.code) {
case 'ENOENT': return false
case 'EINTEGRITY': {
// Broken files are removed from the store
await rimraf(filename)
return false
}
}
throw err
}
}
try {
const data = await fs.readFile(filename)
const ok = Boolean(ssri.checkData(data, fstat.integrity))
const ok = Boolean(ssri.checkData(data, expectedFile.integrity))
if (!ok) {
await rimraf(filename)
} else if (deferredManifest) {
@@ -90,3 +103,16 @@ async function verifyFile (
throw err
}
}
async function checkFile (filename: string, checkedAt?: number) {
try {
const { mtimeMs, size } = await fs.stat(filename)
return {
isModified: (mtimeMs - (checkedAt ?? 0)) > 100,
size,
}
} catch (err) {
if (err.code === 'ENOENT') return null
throw err
}
}

View File

@@ -1,4 +1,4 @@
import { Hash } from 'ssri'
import { IntegrityLike } from 'ssri'
import path = require('path')
import ssri = require('ssri')
@@ -8,7 +8,7 @@ export type FileType = 'exec' | 'nonexec' | 'index'
export function getFilePathByModeInCafs (
cafsDir: string,
integrity: string | Hash,
integrity: string | IntegrityLike,
mode: number
) {
const fileType = modeIsExecutable(mode) ? 'exec' : 'nonexec'
@@ -17,14 +17,14 @@ export function getFilePathByModeInCafs (
export default function getFilePathInCafs (
cafsDir: string,
integrity: string | Hash,
integrity: string | IntegrityLike,
fileType: FileType
) {
return path.join(cafsDir, contentPathFromIntegrity(integrity, fileType))
}
function contentPathFromIntegrity (
integrity: string | Hash,
integrity: string | IntegrityLike,
fileType: FileType
) {
const sri = ssri.parse(integrity, { single: true })

View File

@@ -1,8 +1,10 @@
import { PackageFileInfo } from '@pnpm/store-controller-types'
import { FileWriteResult } from '@pnpm/fetcher-base'
import addFilesFromDir from './addFilesFromDir'
import addFilesFromTarball from './addFilesFromTarball'
import checkFilesIntegrity, {
PackageFilesIndex,
verifyFileIntegrity,
} from './checkFilesIntegrity'
import getFilePathInCafs, {
contentPathFromHex,
@@ -11,9 +13,9 @@ import getFilePathInCafs, {
modeIsExecutable,
} from './getFilePathInCafs'
import writeFile from './writeFile'
import fs = require('mz/fs')
import path = require('path')
import getStream = require('get-stream')
import exists = require('path-exists')
import pathTemp = require('path-temp')
import renameOverwrite = require('rename-overwrite')
import ssri = require('ssri')
@@ -42,40 +44,51 @@ async function addStreamToCafs (
writeBufferToCafs: WriteBufferToCafs,
fileStream: NodeJS.ReadableStream,
mode: number
): Promise<ssri.Integrity> {
): Promise<FileWriteResult> {
const buffer = await getStream.buffer(fileStream)
return addBufferToCafs(writeBufferToCafs, buffer, mode)
}
type WriteBufferToCafs = (buffer: Buffer, fileDest: string, mode: number | undefined) => Promise<void>
type WriteBufferToCafs = (buffer: Buffer, fileDest: string, mode: number | undefined, integrity: ssri.IntegrityLike) => Promise<number>
async function addBufferToCafs (
writeBufferToCafs: WriteBufferToCafs,
buffer: Buffer,
mode: number
): Promise<ssri.Integrity> {
): Promise<FileWriteResult> {
const integrity = ssri.fromData(buffer)
const isExecutable = modeIsExecutable(mode)
const fileDest = contentPathFromHex(isExecutable ? 'exec' : 'nonexec', integrity.hexDigest())
await writeBufferToCafs(buffer, fileDest, isExecutable ? 0o755 : undefined)
return integrity
const checkedAt = await writeBufferToCafs(
buffer,
fileDest,
isExecutable ? 0o755 : undefined,
integrity
)
return { checkedAt, integrity }
}
async function writeBufferToCafs (
locker: Map<string, Promise<void>>,
locker: Map<string, Promise<number>>,
cafsDir: string,
buffer: Buffer,
fileDest: string,
mode: number | undefined
) {
mode: number | undefined,
integrity: ssri.IntegrityLike
): Promise<number> {
fileDest = path.join(cafsDir, fileDest)
if (locker.has(fileDest)) {
await locker.get(fileDest)
return
return locker.get(fileDest)!
}
const p = (async () => {
// This is a slow operation. Should be rewritten
if (await exists(fileDest)) return
// This part is a bit redundant.
// When a file is already used by another package,
// we probably have validated its content already.
// However, there is no way to find which package index file references
// the given file. So we should revalidate the content of the file again.
if (await existsSame(fileDest, integrity)) {
return Date.now()
}
// This might be too cautious.
// The write is atomic, so in case pnpm crashes, no broken file
@@ -87,8 +100,26 @@ async function writeBufferToCafs (
// to the final file directly.
const temp = pathTemp(path.dirname(fileDest))
await writeFile(temp, buffer, mode)
// Unfortunately, "birth time" (time of file creation) is available not on all filesystems.
// We log the creation time ourselves and save it in the package index file.
// Having this information allows us to skip content checks for files that were not modified since "birth time".
const birthtimeMs = Date.now()
await renameOverwrite(temp, fileDest)
return birthtimeMs
})()
locker.set(fileDest, p)
await p
return p
}
async function existsSame (filename: string, integrity: ssri.IntegrityLike) {
let existingFile: fs.Stats | undefined
try {
existingFile = await fs.stat(filename)
} catch (err) {
return false
}
return verifyFileIntegrity(filename, {
size: existingFile.size,
integrity,
})
}

View File

@@ -0,0 +1 @@
foo

View File

@@ -1,15 +1,54 @@
import createCafs from '../src'
import fs = require('fs')
import createCafs, {
checkFilesIntegrity,
getFilePathInCafs,
} from '../src'
import fs = require('mz/fs')
import path = require('path')
import tempy = require('tempy')
describe('cafs', () => {
test('unpack', async () => {
it('unpack', async () => {
const dest = tempy.directory()
const cafs = createCafs(dest)
const filesIndex = await cafs.addFilesFromTarball(
fs.createReadStream(path.join(__dirname, '../__fixtures__/node-gyp-6.1.0.tgz'))
)
expect(Object.keys(filesIndex)).toHaveLength(121)
const pkgFile = filesIndex['package.json']
expect(pkgFile.size).toBe(1121)
expect(pkgFile.mode).toBe(420)
const { checkedAt, integrity } = await pkgFile.writeResult
expect(typeof checkedAt).toBe('number')
expect(integrity.toString()).toBe('sha512-8xCvrlC7W3TlwXxetv5CZTi53szYhmT7tmpXF/ttNthtTR9TC7Y7WJFPmJToHaSQ4uObuZyOARdOJYNYuTSbXA==')
})
it('replaces an already existing file, if the integrity of it was broken', async () => {
const storeDir = tempy.directory()
const srcDir = path.join(__dirname, 'fixtures/one-file')
const addFiles = () => createCafs(storeDir).addFilesFromDir(srcDir)
let filesIndex = await addFiles()
const { integrity } = await filesIndex['foo.txt'].writeResult
// Modifying the file in the store
const filePath = getFilePathInCafs(storeDir, integrity, 'nonexec')
await fs.appendFile(filePath, 'bar')
filesIndex = await addFiles()
await filesIndex['foo.txt'].writeResult
expect(await fs.readFile(filePath, 'utf8')).toBe('foo\n')
})
})
describe('checkFilesIntegrity()', () => {
it("doesn't fail if file was removed from the store", async () => {
const storeDir = tempy.directory()
expect(await checkFilesIntegrity(storeDir, {
foo: {
integrity: 'sha512-8xCvrlC7W3TlwXxetv5CZTi53szYhmT7tmpXF/ttNthtTR9TC7Y7WJFPmJToHaSQ4uObuZyOARdOJYNYuTSbXA==',
mode: 420,
size: 10,
},
})).toBeFalsy()
})
})

View File

@@ -1,6 +1,6 @@
import { Resolution } from '@pnpm/resolver-base'
import { DependencyManifest } from '@pnpm/types'
import { Integrity } from 'ssri'
import { IntegrityLike } from 'ssri'
export interface Cafs {
addFilesFromDir: (dir: string, manifest?: DeferredManifestPromise) => Promise<FilesIndex>
@@ -29,10 +29,15 @@ export interface FetchResult {
filesIndex: FilesIndex
}
export interface FileWriteResult {
checkedAt: number
integrity: IntegrityLike
}
export interface FilesIndex {
[filename: string]: {
mode: number
size: number
generatingIntegrity: Promise<Integrity>
writeResult: Promise<FileWriteResult>
}
}

View File

@@ -23,7 +23,7 @@ test('fetch', async t => {
}
)
t.ok(filesIndex['package.json'])
t.ok(await filesIndex['package.json'].generatingIntegrity)
t.ok(await filesIndex['package.json'].writeResult)
t.equal((await manifest.promise).name, 'is-positive')
t.end()
})

View File

@@ -456,12 +456,16 @@ function fetchToStore (
// Ideally, files wouldn't care about when integrity is calculated.
// However, we can only rename the temp folder once we know the package name.
// And we cannot rename the temp folder till we're calculating integrities.
const integrity = {}
const integrity: Record<string, PackageFileInfo> = {}
await Promise.all(
Object.keys(filesIndex)
.map(async (filename) => {
const fileIntegrity = await filesIndex[filename].generatingIntegrity
const {
checkedAt,
integrity: fileIntegrity,
} = await filesIndex[filename].writeResult
integrity[filename] = {
checkedAt,
integrity: fileIntegrity.toString(), // TODO: use the raw Integrity object
mode: filesIndex[filename].mode,
size: filesIndex[filename].size,

View File

@@ -1,6 +1,6 @@
/// <reference path="../../../typings/index.d.ts" />
import { promisify } from 'util'
import { getFilePathInCafs } from '@pnpm/cafs'
import { getFilePathInCafs, PackageFilesIndex } from '@pnpm/cafs'
import createClient from '@pnpm/client'
import { streamParser } from '@pnpm/logger'
import createPackageRequester, { PackageFilesResponse, PackageResponse } from '@pnpm/package-requester'
@@ -8,6 +8,7 @@ import pkgIdToFilename from '@pnpm/pkgid-to-filename'
import { DependencyManifest } from '@pnpm/types'
import delay from 'delay'
import path = require('path')
import loadJsonFile = require('load-json-file')
import fs = require('mz/fs')
import ncpCB = require('ncp')
import nock = require('nock')
@@ -358,6 +359,10 @@ test('fetchPackageToStore()', async (t) => {
'returned info about files after fetch completed')
t.notOk(files.fromStore)
const indexFile = await loadJsonFile<PackageFilesIndex>(fetchResult.filesIndexFile)
t.ok(indexFile, 'index file is written')
t.ok(typeof indexFile.files['package.json'].checkedAt, 'number')
t.ok(fetchResult.finishing())
const fetchResult2 = packageRequester.fetchPackageToStore({
@@ -650,6 +655,7 @@ test('refetch package to store if it has been modified', async (t) => {
indexJsFile = getFilePathInCafs(cafsDir, filesIndex['index.js'].integrity, 'nonexec')
}
await delay(200)
// Adding some content to the file to change its integrity
await fs.appendFile(indexJsFile, '// foobar')

View File

@@ -69,12 +69,16 @@ export default async function (
const sideEffectsIndex = await packageRequester.cafs.addFilesFromDir(builtPkgLocation)
// TODO: move this to a function
// This is duplicated in @pnpm/package-requester
const integrity = {}
const integrity: Record<string, PackageFileInfo> = {}
await Promise.all(
Object.keys(sideEffectsIndex)
.map(async (filename) => {
const fileIntegrity = await sideEffectsIndex[filename].generatingIntegrity
const {
checkedAt,
integrity: fileIntegrity,
} = await sideEffectsIndex[filename].writeResult
integrity[filename] = {
checkedAt,
integrity: fileIntegrity.toString(), // TODO: use the raw Integrity object
mode: sideEffectsIndex[filename].mode,
size: sideEffectsIndex[filename].size,

View File

@@ -64,6 +64,7 @@ export type ImportPackageFunction = (
) => Promise<{ isBuilt: boolean, importMethod: undefined | string }>
export interface PackageFileInfo {
checkedAt?: number // Nullable for backward compatibility
integrity: string
mode: number
size: number

2
pnpm-lock.yaml generated
View File

@@ -114,7 +114,6 @@ importers:
get-stream: 6.0.0
mz: 2.7.0
p-limit: 3.0.2
path-exists: 4.0.0
path-temp: 2.0.0
rename-overwrite: 3.0.0
ssri: 6.0.1
@@ -141,7 +140,6 @@ importers:
get-stream: ^6.0.0
mz: ^2.7.0
p-limit: ^3.0.2
path-exists: ^4.0.0
path-temp: ^2.0.0
rename-overwrite: ^3.0.0
ssri: 6.0.1