From 96e165c7ff89ba47f0ff03c7eca459f58ce3ff2a Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Wed, 9 Aug 2023 01:41:28 +0300 Subject: [PATCH] perf: improve tarball fetching speed (#6819) --- .changeset/stale-coats-happen.md | 6 ++ .../src/remoteTarballFetcher.ts | 93 ++++++++----------- fetching/tarball-fetcher/test/fetch.ts | 2 +- 3 files changed, 46 insertions(+), 55 deletions(-) create mode 100644 .changeset/stale-coats-happen.md diff --git a/.changeset/stale-coats-happen.md b/.changeset/stale-coats-happen.md new file mode 100644 index 0000000000..a45e5910bc --- /dev/null +++ b/.changeset/stale-coats-happen.md @@ -0,0 +1,6 @@ +--- +"@pnpm/tarball-fetcher": patch +"pnpm": patch +--- + +Performance optimizations. Package tarballs are now download directly to memory and built to an ArrayBuffer. Hashing and other operations are avoided until the stream has been fully received [#6819](https://github.com/pnpm/pnpm/pull/6819). diff --git a/fetching/tarball-fetcher/src/remoteTarballFetcher.ts b/fetching/tarball-fetcher/src/remoteTarballFetcher.ts index 969f2f3302..bf8cd290c1 100644 --- a/fetching/tarball-fetcher/src/remoteTarballFetcher.ts +++ b/fetching/tarball-fetcher/src/remoteTarballFetcher.ts @@ -7,6 +7,7 @@ import { type FetchFromRegistry } from '@pnpm/fetching-types' import * as retry from '@zkochan/retry' import throttle from 'lodash.throttle' import ssri from 'ssri' +import { Readable } from 'stream' import { BadTarballError } from './errorTypes' const BIG_TARBALL_SIZE = 1024 * 1024 * 5 // 5 MB @@ -158,27 +159,51 @@ export function createDownloader ( ? throttle(opts.onProgress, 500) : undefined let downloaded = 0 - res.body!.on('data', (chunk: Buffer) => { + const chunks: Buffer[] = [] + // This will handle the 'data', 'error', and 'end' events. + for await (const chunk of res.body!) { + chunks.push(chunk as Buffer) downloaded += chunk.length - if (onProgress != null) onProgress(downloaded) - }) + onProgress?.(downloaded) + } + if (size !== null && size !== downloaded) { + throw new BadTarballError({ + expectedSize: size, + receivedSize: downloaded, + tarballUrl: url, + }) + } // eslint-disable-next-line no-async-promise-executor return await new Promise(async (resolve, reject) => { - const stream = res.body! - .on('error', reject) + const data: Buffer = Buffer.from(new ArrayBuffer(downloaded)) + let offset: number = 0 + for (const chunk of chunks) { + chunk.copy(data, offset) + offset += chunk.length + } try { - const [integrityCheckResult, filesIndex] = await Promise.all([ - opts.integrity ? safeCheckStream(res.body, opts.integrity, url) : true, - opts.cafs.addFilesFromTarball(res.body!, opts.manifest), - waitTillClosed({ stream, size, getDownloaded: () => downloaded, url }), - ]) - if (integrityCheckResult !== true) { - // eslint-disable-next-line - throw integrityCheckResult + if (opts.integrity) { + try { + ssri.checkData(data, opts.integrity, { error: true }) + } catch (err: any) { // eslint-disable-line + throw new TarballIntegrityError({ + algorithm: err.algorithm, + expected: err.expected, + found: err.found, + sri: err.sri, + url, + }) + } } - + const streamForTarball = new Readable({ + read () { + this.push(data) + this.push(null) + }, + }) + const filesIndex = await opts.cafs.addFilesFromTarball(streamForTarball, opts.manifest) resolve({ filesIndex }) } catch (err: any) { // eslint-disable-line // If the error is not an integrity check error, then it happened during extracting the tarball @@ -201,43 +226,3 @@ export function createDownloader ( } } } - -async function safeCheckStream (stream: any, integrity: string, url: string): Promise { // eslint-disable-line @typescript-eslint/no-explicit-any - try { - await ssri.checkStream(stream, integrity) - return true - } catch (err: any) { // eslint-disable-line - return new TarballIntegrityError({ - algorithm: err['algorithm'], - expected: err['expected'], - found: err['found'], - sri: err['sri'], - url, - }) - } -} - -async function waitTillClosed ( - opts: { - stream: NodeJS.EventEmitter - size: null | number - getDownloaded: () => number - url: string - } -) { - return new Promise((resolve, reject) => { - opts.stream.on('end', () => { - const downloaded = opts.getDownloaded() - if (opts.size !== null && opts.size !== downloaded) { - const err = new BadTarballError({ - expectedSize: opts.size, - receivedSize: downloaded, - tarballUrl: opts.url, - }) - reject(err) - return - } - resolve() - }) - }) -} diff --git a/fetching/tarball-fetcher/test/fetch.ts b/fetching/tarball-fetcher/test/fetch.ts index b1a83a8556..8e6129fa92 100644 --- a/fetching/tarball-fetcher/test/fetch.ts +++ b/fetching/tarball-fetcher/test/fetch.ts @@ -130,7 +130,7 @@ test('fail when integrity check fails two times in a row', async () => { new TarballIntegrityError({ algorithm: 'sha512', expected: 'sha1-HssnaJydJVE+rbyZFKc/VAi+enY=', - found: 'sha512-VuFL1iPaIxJK/k3gTxStIkc6+wSiDwlLdnCWNZyapsVLobu/0onvGOZolASZpfBFiDJYrOIGiDzgLIULTW61Vg== sha1-ACjKMFA7S6uRFXSDFfH4aT+4B4Y=', + found: 'sha1-ACjKMFA7S6uRFXSDFfH4aT+4B4Y=', sri: '', url: resolution.tarball, })