perf: read the package manifests from the memory, when possible

When a new package is being added to the store, its manifest is streamed
in the memory. So instead of reading the manifest from the filesystem,
we can parse the stream from the memory.

PR #2525
This commit is contained in:
Zoltan Kochan
2020-05-03 17:38:10 +03:00
committed by GitHub
parent 64bae33c4a
commit 42e6490d18
21 changed files with 202 additions and 54 deletions

View File

@@ -0,0 +1,6 @@
---
"@pnpm/package-requester": minor
"@pnpm/store-controller-types": minor
---
The fetch package to store function does not need the pkgName anymore.

View File

@@ -0,0 +1,10 @@
---
"@pnpm/cafs": minor
"@pnpm/fetcher-base": minor
"@pnpm/git-fetcher": minor
"@pnpm/package-requester": minor
"pnpm": minor
"@pnpm/tarball-fetcher": minor
---
When a new package is being added to the store, its manifest is streamed in the memory. So instead of reading the manifest from the filesystem, we can parse the stream from the memory.

View File

@@ -16,6 +16,7 @@
"dependencies": {
"@pnpm/fetcher-base": "workspace:7.0.0-alpha.1",
"@zkochan/rimraf": "1.0.0",
"concat-stream": "^2.0.0",
"decompress-maybe": "^1.0.0",
"get-stream": "5.1.0",
"mz": "2.7.0",
@@ -27,6 +28,7 @@
"tar-stream": "^2.1.2"
},
"devDependencies": {
"@types/concat-stream": "^1.6.0",
"@types/mz": "2.7.0",
"@types/node": "^13.13.4",
"@types/ssri": "^6.0.2",

View File

@@ -1,8 +1,9 @@
import { FilesIndex } from '@pnpm/fetcher-base'
import { DeferredManifestPromise, FilesIndex } from '@pnpm/fetcher-base'
import fs = require('mz/fs')
import pLimit from 'p-limit'
import path = require('path')
import ssri = require('ssri')
import { parseJsonBuffer } from './parseJson'
const limit = pLimit(20)
@@ -14,9 +15,10 @@ export default async function (
addBuffer: (buffer: Buffer, mode: number) => Promise<ssri.Integrity>,
},
dirname: string,
manifest?: DeferredManifestPromise,
) {
const index = {}
await _retrieveFileIntegrities(cafs, dirname, dirname, index)
await _retrieveFileIntegrities(cafs, dirname, dirname, index, manifest)
return index
}
@@ -28,6 +30,7 @@ async function _retrieveFileIntegrities (
rootDir: string,
currDir: string,
index: FilesIndex,
deferredManifest?: DeferredManifestPromise,
) {
try {
const files = await fs.readdir(currDir)
@@ -40,12 +43,20 @@ async function _retrieveFileIntegrities (
}
if (stat.isFile()) {
const relativePath = path.relative(rootDir, fullPath)
const generatingIntegrity = limit(async () => {
if (deferredManifest && rootDir === currDir && file === 'package.json') {
const buffer = await fs.readFile(fullPath)
parseJsonBuffer(buffer, deferredManifest)
return cafs.addBuffer(buffer, stat.mode)
}
if (stat.size < MAX_BULK_SIZE) {
const buffer = await fs.readFile(fullPath)
return cafs.addBuffer(buffer, stat.mode)
}
return cafs.addStream(fs.createReadStream(fullPath), stat.mode)
})
index[relativePath] = {
generatingIntegrity: limit(() => {
return stat.size < MAX_BULK_SIZE
? fs.readFile(fullPath).then((buffer) => cafs.addBuffer(buffer, stat.mode))
: cafs.addStream(fs.createReadStream(fullPath), stat.mode)
}),
generatingIntegrity,
mode: stat.mode,
size: stat.size,
}

View File

@@ -1,13 +1,15 @@
import { FilesIndex } from '@pnpm/fetcher-base'
import { DeferredManifestPromise, FilesIndex } from '@pnpm/fetcher-base'
import decompress = require('decompress-maybe')
import ssri = require('ssri')
import { Duplex, PassThrough } from 'stream'
import tar = require('tar-stream')
import { parseJsonStream } from './parseJson'
export default async function (
addStreamToCafs: (fileStream: PassThrough, mode: number) => Promise<ssri.Integrity>,
_ignore: null | ((filename: string) => Boolean),
stream: NodeJS.ReadableStream,
manifest?: DeferredManifestPromise,
): Promise<FilesIndex> {
const ignore = _ignore ? _ignore : () => false
const extract = tar.extract()
@@ -20,6 +22,9 @@ export default async function (
next()
return
}
if (filename === 'package.json' && manifest) {
parseJsonStream(fileStream, manifest)
}
const generatingIntegrity = addStreamToCafs(fileStream, header.mode!)
filesIndex[filename] = {
generatingIntegrity,

View File

@@ -1,8 +1,10 @@
import { DeferredManifestPromise } from '@pnpm/fetcher-base'
import rimraf = require('@zkochan/rimraf')
import fs = require('mz/fs')
import pLimit from 'p-limit'
import ssri = require('ssri')
import { getFilePathInCafs } from '.'
import { parseJsonBuffer } from './parseJson'
const limit = pLimit(20)
const MAX_BULK_SIZE = 1 * 1024 * 1024 // 1MB
@@ -10,6 +12,7 @@ const MAX_BULK_SIZE = 1 * 1024 * 1024 // 1MB
export default async function (
cafsDir: string,
integrityObj: Record<string, { size: number, mode: number, integrity: string }>,
manifest?: DeferredManifestPromise,
) {
let verified = true
await Promise.all(
@@ -24,6 +27,7 @@ export default async function (
!await verifyFile(
getFilePathInCafs(cafsDir, fstat),
fstat,
f === 'package.json' ? manifest : undefined,
)
) {
verified = false
@@ -34,8 +38,12 @@ export default async function (
return verified
}
async function verifyFile (filename: string, fstat: { size: number, integrity: string }) {
if (fstat.size > MAX_BULK_SIZE) {
async function verifyFile (
filename: string,
fstat: { size: number, integrity: string },
deferredManifest?: DeferredManifestPromise,
) {
if (fstat.size > MAX_BULK_SIZE && !deferredManifest) {
try {
const ok = Boolean(await ssri.checkStream(fs.createReadStream(filename), fstat.integrity))
if (!ok) {
@@ -60,6 +68,8 @@ async function verifyFile (filename: string, fstat: { size: number, integrity: s
const ok = Boolean(ssri.checkData(data, fstat.integrity))
if (!ok) {
await rimraf(filename)
} else if (deferredManifest) {
parseJsonBuffer(data, deferredManifest)
}
return ok
} catch (err) {

View File

@@ -0,0 +1,23 @@
import { DeferredManifestPromise } from '@pnpm/fetcher-base'
import concatStream = require('concat-stream')
import { PassThrough } from 'stream'
export function parseJsonBuffer (
buffer: Buffer,
deferred: DeferredManifestPromise,
) {
try {
deferred.resolve(JSON.parse(buffer.toString()))
} catch (err) {
deferred.reject(err)
}
}
export function parseJsonStream (
stream: PassThrough,
deferred: DeferredManifestPromise,
) {
stream.pipe(
concatStream((buffer) => parseJsonBuffer(buffer, deferred)),
)
}

View File

@@ -31,6 +31,7 @@
"homepage": "https://github.com/pnpm/pnpm/blob/master/packages/fetcher-base#readme",
"dependencies": {
"@pnpm/resolver-base": "workspace:7.0.0",
"@pnpm/types": "workspace:5.0.0",
"@types/ssri": "^6.0.2"
},
"funding": "https://opencollective.com/pnpm"

View File

@@ -1,18 +1,25 @@
import { Resolution } from '@pnpm/resolver-base'
import { DependencyManifest } from '@pnpm/types'
import { Integrity } from 'ssri'
export type Cafs = {
addFilesFromDir: (dir: string) => Promise<FilesIndex>,
addFilesFromTarball: (stream: NodeJS.ReadableStream) => Promise<FilesIndex>,
addFilesFromDir: (dir: string, manifest?: DeferredManifestPromise) => Promise<FilesIndex>,
addFilesFromTarball: (stream: NodeJS.ReadableStream, manifest?: DeferredManifestPromise) => Promise<FilesIndex>,
}
export interface FetchOptions {
cachedTarballLocation: string,
manifest?: DeferredManifestPromise,
lockfileDir: string,
onStart?: (totalSize: number | null, attempt: number) => void,
onProgress?: (downloaded: number) => void,
}
export type DeferredManifestPromise = {
resolve: (manifest: DependencyManifest) => void,
reject: (err: Error) => void,
}
export type FetchFunction = (
cafs: Cafs,
resolution: Resolution,

View File

@@ -11,6 +11,9 @@
"references": [
{
"path": "../resolver-base"
},
{
"path": "../types"
}
]
}

View File

@@ -37,7 +37,9 @@
},
"devDependencies": {
"@pnpm/cafs": "workspace:1.0.0-alpha.1",
"@pnpm/git-fetcher": "link:"
"@pnpm/git-fetcher": "link:",
"@pnpm/types": "workspace:5.0.0",
"p-defer": "^3.0.0"
},
"funding": "https://opencollective.com/pnpm"
}

View File

@@ -1,4 +1,4 @@
import { Cafs } from '@pnpm/fetcher-base'
import { Cafs, DeferredManifestPromise } from '@pnpm/fetcher-base'
import rimraf = require('@zkochan/rimraf')
import execa = require('execa')
import path = require('path')
@@ -13,6 +13,7 @@ export default () => {
},
opts: {
cafs: Cafs,
manifest?: DeferredManifestPromise,
},
) {
const tempLocation = tempy.directory()
@@ -21,7 +22,7 @@ export default () => {
// removing /.git to make directory integrity calculation faster
await rimraf(path.join(tempLocation, '.git'))
return {
filesIndex: await opts.cafs.addFilesFromDir(tempLocation),
filesIndex: await opts.cafs.addFilesFromDir(tempLocation, opts.manifest),
}
},
}

View File

@@ -1,6 +1,8 @@
///<reference path="../../../typings/index.d.ts"/>
import createCafs from '@pnpm/cafs'
import createFetcher from '@pnpm/git-fetcher'
import { DependencyManifest } from '@pnpm/types'
import pDefer = require('p-defer')
import test = require('tape')
import tempy = require('tempy')
@@ -8,13 +10,16 @@ test('fetch', async t => {
const cafsDir = tempy.directory()
t.comment(`cafs at ${cafsDir}`)
const fetch = createFetcher().git
const fetchResult = await fetch({
const manifest = pDefer<DependencyManifest>()
const { filesIndex } = await fetch({
commit: 'c9b30e71d704cd30fa71f2edd1ecc7dcc4985493',
repo: 'https://github.com/kevva/is-positive.git',
}, {
cafs: createCafs(cafsDir),
manifest,
})
t.ok(fetchResult.filesIndex['package.json'])
t.ok(await fetchResult.filesIndex['package.json'].generatingIntegrity)
t.ok(filesIndex['package.json'])
t.ok(await filesIndex['package.json'].generatingIntegrity)
t.equal((await manifest.promise).name, 'is-positive')
t.end()
})

View File

@@ -14,6 +14,9 @@
},
{
"path": "../cafs"
},
{
"path": "../types"
}
]
}

View File

@@ -5,6 +5,7 @@ import createCafs, {
import { fetchingProgressLogger } from '@pnpm/core-loggers'
import {
Cafs,
DeferredManifestPromise,
FetchFunction,
FetchOptions,
FetchResult,
@@ -40,6 +41,7 @@ import pShare = require('promise-share')
import R = require('ramda')
import ssri = require('ssri')
import writeJsonFile = require('write-json-file')
import safeDeferredPromise from './safeDeferredPromise'
const TARBALL_INTEGRITY_FILENAME = 'tarball-integrity'
const packageRequestLogger = logger('package-requester')
@@ -214,7 +216,6 @@ async function resolveAndFetch (
force: forceFetch,
lockfileDir: options.lockfileDir,
pkgId: id,
pkgName: manifest?.name,
resolution: resolution,
})
@@ -242,7 +243,10 @@ async function resolveAndFetch (
function fetchToStore (
ctx: {
checkFilesIntegrity: (integrity: Record<string, { size: number, mode: number, integrity: string }>) => Promise<boolean>,
checkFilesIntegrity: (
integrity: Record<string, { size: number, mode: number, integrity: string }>,
manifest?: DeferredManifestPromise,
) => Promise<boolean>,
fetch: (
packageId: string,
resolution: Resolution,
@@ -263,7 +267,6 @@ function fetchToStore (
opts: {
fetchRawManifest?: boolean,
force: boolean,
pkgName?: string,
pkgId: string,
lockfileDir: string,
resolution: Resolution,
@@ -386,23 +389,30 @@ function fetchToStore (
// ignoring. It is fine if the integrity file is not present. Just refetch the package
}
// if target exists and it wasn't modified, then no need to refetch it
if (integrity && await ctx.checkFilesIntegrity(integrity)) {
files.resolve({
filesIndex: integrity,
fromStore: true,
})
if (opts.fetchRawManifest) {
readBundledManifest(ctx.getFilePathInCafs(integrity['package.json']))
.then(bundledManifest.resolve)
.catch(bundledManifest.reject)
if (integrity) {
const manifest = opts.fetchRawManifest
? safeDeferredPromise<DependencyManifest>()
: undefined
const verified = await ctx.checkFilesIntegrity(integrity, manifest)
if (verified) {
files.resolve({
filesIndex: integrity,
fromStore: true,
})
if (manifest) {
manifest()
.then((manifest) => bundledManifest.resolve(pickBundledManifest(manifest)))
.catch(bundledManifest.reject)
}
finishing.resolve(undefined)
return
}
finishing.resolve(undefined)
return
packageRequestLogger.warn({
message: `Refetching ${target} to store. It was either modified or had no integrity checksums`,
prefix: opts.lockfileDir,
})
}
packageRequestLogger.warn({
message: `Refetching ${target} to store. It was either modified or had no integrity checksums`,
prefix: opts.lockfileDir,
})
}
// We fetch into targetStage directory first and then fs.rename() it to the
@@ -414,12 +424,21 @@ function fetchToStore (
// As much tarballs should be downloaded simultaneously as possible.
const priority = (++ctx.requestsQueue['counter'] % ctx.requestsQueue['concurrency'] === 0 ? -1 : 1) * 1000 // tslint:disable-line
const fetchManifest = opts.fetchRawManifest
? safeDeferredPromise<DependencyManifest>()
: undefined
if (fetchManifest) {
fetchManifest()
.then((manifest) => bundledManifest.resolve(pickBundledManifest(manifest)))
.catch(bundledManifest.reject)
}
const fetchedPackage = await ctx.requestsQueue.add(() => ctx.fetch(
opts.pkgId,
opts.resolution,
{
cachedTarballLocation: path.join(ctx.storeDir, opts.pkgId, 'packed.tgz'),
lockfileDir: opts.lockfileDir,
manifest: fetchManifest,
onProgress: (downloaded) => {
fetchingProgressLogger.debug({
downloaded,
@@ -458,15 +477,6 @@ function fetchToStore (
await writeJsonFile(path.join(target, 'integrity.json'), integrity, { indent: undefined })
finishing.resolve(undefined)
let pkgName: string | undefined = opts.pkgName
if (!pkgName || opts.fetchRawManifest) {
const manifest = await readPackage(ctx.getFilePathInCafs(integrity['package.json'])) as DependencyManifest
bundledManifest.resolve(pickBundledManifest(manifest))
if (!pkgName) {
pkgName = manifest.name
}
}
if (isLocalTarballDep && opts.resolution['integrity']) { // tslint:disable-line:no-string-literal
await fs.writeFile(path.join(target, TARBALL_INTEGRITY_FILENAME), opts.resolution['integrity'], 'utf8') // tslint:disable-line:no-string-literal
}

View File

@@ -0,0 +1,13 @@
import pShare = require('promise-share')
export default function safeDeferredPromise<T> () {
let resolve!: (v: T) => void
let reject!: (err: Error) => void
const promiseFn = pShare(new Promise<T>((_resolve, _reject) => {
resolve = _resolve
reject = _reject
}))
return Object.assign(promiseFn, { resolve, reject })
}

View File

@@ -560,7 +560,7 @@ test('always return a package manifest in the response', async t => {
nock.cleanAll()
const requestPackage = createPackageRequester(resolve, fetch, {
networkConcurrency: 1,
storeDir: '.store',
storeDir: tempy.directory(),
storeIndex: {},
verifyStoreIntegrity: true,
})

View File

@@ -71,7 +71,6 @@ export interface FetchPackageToStoreOptions {
force: boolean,
lockfileDir: string,
pkgId: string,
pkgName?: string,
resolution: Resolution,
}

View File

@@ -1,5 +1,10 @@
import PnpmError from '@pnpm/error'
import { Cafs, FetchResult, FilesIndex } from '@pnpm/fetcher-base'
import {
Cafs,
DeferredManifestPromise,
FetchResult,
FilesIndex,
} from '@pnpm/fetcher-base'
import createFetcher from 'fetch-from-npm-registry'
import { IncomingMessage } from 'http'
import fs = require('mz/fs')
@@ -57,6 +62,7 @@ export type DownloadFunction = (url: string, saveto: string, opts: {
alwaysAuth: boolean | undefined,
},
cafs: Cafs,
manifest?: DeferredManifestPromise,
registry?: string,
onStart?: (totalSize: number | null, attempt: number) => void,
onProgress?: (downloaded: number) => void,
@@ -107,6 +113,7 @@ export default (
alwaysAuth: boolean | undefined,
},
cafs: Cafs,
manifest?: DeferredManifestPromise,
registry?: string,
onStart?: (totalSize: number | null, attempt: number) => void,
onProgress?: (downloaded: number) => void,
@@ -173,7 +180,7 @@ export default (
try {
const [integrityCheckResult, filesIndex] = await Promise.all([
opts.integrity && safeCheckStream(res.body, opts.integrity, url) || true,
opts.cafs.addFilesFromTarball(res.body),
opts.cafs.addFilesFromTarball(res.body, opts.manifest),
waitTillClosed({ stream, size, getDownloaded: () => downloaded, url }),
])
if (integrityCheckResult !== true) {

View File

@@ -1,6 +1,7 @@
import PnpmError from '@pnpm/error'
import {
Cafs,
DeferredManifestPromise,
FetchFunction,
FetchOptions,
FetchResult,
@@ -90,6 +91,7 @@ function fetchFromTarball (
const tarball = path.join(opts.lockfileDir, resolution.tarball.slice(5))
return fetchFromLocalTarball(cafs, tarball, {
integrity: resolution.integrity,
manifest: opts.manifest,
})
}
return ctx.fetchFromRemoteTarball(cafs, resolution, opts)
@@ -115,6 +117,7 @@ async function fetchFromRemoteTarball (
try {
return await fetchFromLocalTarball(cafs, opts.cachedTarballLocation, {
integrity: dist.integrity,
manifest: opts.manifest,
})
} catch (err) {
// ignore errors for missing files or broken/partial archives
@@ -151,6 +154,7 @@ async function fetchFromRemoteTarball (
auth,
cafs,
integrity: dist.integrity,
manifest: opts.manifest,
onProgress: opts.onProgress,
onStart: opts.onStart,
registry: dist.registry,
@@ -163,17 +167,18 @@ async function fetchFromLocalTarball (
tarball: string,
opts: {
integrity?: string,
manifest?: DeferredManifestPromise,
},
): Promise<FetchResult> {
try {
const tarballStream = fs.createReadStream(tarball)
const [filesIndex] = (
const [fetchResult] = (
await Promise.all([
cafs.addFilesFromTarball(tarballStream),
cafs.addFilesFromTarball(tarballStream, opts.manifest),
opts.integrity && (ssri.checkStream(tarballStream, opts.integrity) as any), // tslint:disable-line
])
)
return { filesIndex }
return { filesIndex: fetchResult }
} catch (err) {
err.attempts = 1
err.resource = tarball

29
pnpm-lock.yaml generated
View File

@@ -96,6 +96,7 @@ importers:
dependencies:
'@pnpm/fetcher-base': 'link:../fetcher-base'
'@zkochan/rimraf': 1.0.0
concat-stream: 2.0.0
decompress-maybe: 1.0.0
get-stream: 5.1.0
mz: 2.7.0
@@ -106,6 +107,7 @@ importers:
ssri: 6.0.1
tar-stream: 2.1.2
devDependencies:
'@types/concat-stream': 1.6.0
'@types/mz': 2.7.0
'@types/node': 13.13.4
'@types/ssri': 6.0.2
@@ -113,11 +115,13 @@ importers:
tempy: 0.5.0
specifiers:
'@pnpm/fetcher-base': 'workspace:7.0.0-alpha.1'
'@types/concat-stream': ^1.6.0
'@types/mz': 2.7.0
'@types/node': ^13.13.4
'@types/ssri': ^6.0.2
'@types/tar-stream': ^2.1.0
'@zkochan/rimraf': 1.0.0
concat-stream: ^2.0.0
decompress-maybe: ^1.0.0
get-stream: 5.1.0
mz: 2.7.0
@@ -387,9 +391,11 @@ importers:
packages/fetcher-base:
dependencies:
'@pnpm/resolver-base': 'link:../resolver-base'
'@pnpm/types': 'link:../types'
'@types/ssri': 6.0.2
specifiers:
'@pnpm/resolver-base': 'workspace:7.0.0'
'@pnpm/types': 'workspace:5.0.0'
'@types/ssri': ^6.0.2
packages/filter-lockfile:
dependencies:
@@ -547,12 +553,16 @@ importers:
devDependencies:
'@pnpm/cafs': 'link:../cafs'
'@pnpm/git-fetcher': 'link:'
'@pnpm/types': 'link:../types'
p-defer: 3.0.0
specifiers:
'@pnpm/cafs': 'workspace:1.0.0-alpha.1'
'@pnpm/fetcher-base': 'workspace:7.0.0-alpha.1'
'@pnpm/git-fetcher': 'link:'
'@pnpm/types': 'workspace:5.0.0'
'@zkochan/rimraf': 1.0.0
execa: 4.0.0
p-defer: ^3.0.0
tempy: 0.5.0
packages/git-resolver:
dependencies:
@@ -3537,6 +3547,12 @@ packages:
dev: true
resolution:
integrity: sha512-htRqZr5qn8EzMelhX/Xmx142z218lLyGaeZ3YR8jlze4TATRU9huKKvuBmAJEW4LCC4pnY1N6JAm6p85fMHjhg==
/@types/concat-stream/1.6.0:
dependencies:
'@types/node': 13.13.4
dev: true
resolution:
integrity: sha1-OU2+C7X+5Gs42JZzXoto7yOQ0A0=
/@types/configstore/4.0.0:
dev: true
resolution:
@@ -5318,6 +5334,17 @@ packages:
'0': node >= 0.8
resolution:
integrity: sha512-27HBghJxjiZtIk3Ycvn/4kbJk/1uZuJFfuPEns6LaEvpvG1f0hTea8lilrouyo9mVc2GWdcEZ8OLoGmSADlrCw==
/concat-stream/2.0.0:
dependencies:
buffer-from: 1.1.1
inherits: 2.0.4
readable-stream: 3.6.0
typedarray: 0.0.6
dev: false
engines:
'0': node >= 6.0
resolution:
integrity: sha512-MWufYdFw53ccGjCA+Ol7XJYpAlW6/prSMzuPOTRnJGcGzuhLn4Scrz7qf6o8bROZ514ltazcIFJZevcfbo0x7A==
/config-chain/1.1.12:
dependencies:
ini: 1.3.5
@@ -9457,7 +9484,6 @@ packages:
resolution:
integrity: sha1-n26xgvbJqozXQwBKfU+WsZaw+ww=
/p-defer/3.0.0:
dev: false
engines:
node: '>=8'
resolution:
@@ -11960,7 +11986,6 @@ packages:
resolution:
integrity: sha512-zdu8XMNEDepKKR+XYOXAVPtWui0ly0NtohUscw+UmaHiAWT8hrV1rr//H6V+0DvJ3OQ19S979M0laLfX8rm82Q==
/typedarray/0.0.6:
dev: true
resolution:
integrity: sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=
/typescript/3.8.3: