perf: improve tarball fetching speed (#6819)

This commit is contained in:
Zoltan Kochan
2023-08-09 01:41:28 +03:00
committed by GitHub
parent b4e8b75b67
commit 96e165c7ff
3 changed files with 46 additions and 55 deletions

View File

@@ -0,0 +1,6 @@
---
"@pnpm/tarball-fetcher": patch
"pnpm": patch
---
Performance optimizations. Package tarballs are now download directly to memory and built to an ArrayBuffer. Hashing and other operations are avoided until the stream has been fully received [#6819](https://github.com/pnpm/pnpm/pull/6819).

View File

@@ -7,6 +7,7 @@ import { type FetchFromRegistry } from '@pnpm/fetching-types'
import * as retry from '@zkochan/retry'
import throttle from 'lodash.throttle'
import ssri from 'ssri'
import { Readable } from 'stream'
import { BadTarballError } from './errorTypes'
const BIG_TARBALL_SIZE = 1024 * 1024 * 5 // 5 MB
@@ -158,27 +159,51 @@ export function createDownloader (
? throttle(opts.onProgress, 500)
: undefined
let downloaded = 0
res.body!.on('data', (chunk: Buffer) => {
const chunks: Buffer[] = []
// This will handle the 'data', 'error', and 'end' events.
for await (const chunk of res.body!) {
chunks.push(chunk as Buffer)
downloaded += chunk.length
if (onProgress != null) onProgress(downloaded)
})
onProgress?.(downloaded)
}
if (size !== null && size !== downloaded) {
throw new BadTarballError({
expectedSize: size,
receivedSize: downloaded,
tarballUrl: url,
})
}
// eslint-disable-next-line no-async-promise-executor
return await new Promise<FetchResult>(async (resolve, reject) => {
const stream = res.body!
.on('error', reject)
const data: Buffer = Buffer.from(new ArrayBuffer(downloaded))
let offset: number = 0
for (const chunk of chunks) {
chunk.copy(data, offset)
offset += chunk.length
}
try {
const [integrityCheckResult, filesIndex] = await Promise.all([
opts.integrity ? safeCheckStream(res.body, opts.integrity, url) : true,
opts.cafs.addFilesFromTarball(res.body!, opts.manifest),
waitTillClosed({ stream, size, getDownloaded: () => downloaded, url }),
])
if (integrityCheckResult !== true) {
// eslint-disable-next-line
throw integrityCheckResult
if (opts.integrity) {
try {
ssri.checkData(data, opts.integrity, { error: true })
} catch (err: any) { // eslint-disable-line
throw new TarballIntegrityError({
algorithm: err.algorithm,
expected: err.expected,
found: err.found,
sri: err.sri,
url,
})
}
}
const streamForTarball = new Readable({
read () {
this.push(data)
this.push(null)
},
})
const filesIndex = await opts.cafs.addFilesFromTarball(streamForTarball, opts.manifest)
resolve({ filesIndex })
} catch (err: any) { // eslint-disable-line
// If the error is not an integrity check error, then it happened during extracting the tarball
@@ -201,43 +226,3 @@ export function createDownloader (
}
}
}
async function safeCheckStream (stream: any, integrity: string, url: string): Promise<true | Error> { // eslint-disable-line @typescript-eslint/no-explicit-any
try {
await ssri.checkStream(stream, integrity)
return true
} catch (err: any) { // eslint-disable-line
return new TarballIntegrityError({
algorithm: err['algorithm'],
expected: err['expected'],
found: err['found'],
sri: err['sri'],
url,
})
}
}
async function waitTillClosed (
opts: {
stream: NodeJS.EventEmitter
size: null | number
getDownloaded: () => number
url: string
}
) {
return new Promise<void>((resolve, reject) => {
opts.stream.on('end', () => {
const downloaded = opts.getDownloaded()
if (opts.size !== null && opts.size !== downloaded) {
const err = new BadTarballError({
expectedSize: opts.size,
receivedSize: downloaded,
tarballUrl: opts.url,
})
reject(err)
return
}
resolve()
})
})
}

View File

@@ -130,7 +130,7 @@ test('fail when integrity check fails two times in a row', async () => {
new TarballIntegrityError({
algorithm: 'sha512',
expected: 'sha1-HssnaJydJVE+rbyZFKc/VAi+enY=',
found: 'sha512-VuFL1iPaIxJK/k3gTxStIkc6+wSiDwlLdnCWNZyapsVLobu/0onvGOZolASZpfBFiDJYrOIGiDzgLIULTW61Vg== sha1-ACjKMFA7S6uRFXSDFfH4aT+4B4Y=',
found: 'sha1-ACjKMFA7S6uRFXSDFfH4aT+4B4Y=',
sri: '',
url: resolution.tarball,
})