From be633ffb73e10bb19f0ba4e95fb16d88e128c8f8 Mon Sep 17 00:00:00 2001 From: zkochan Date: Sun, 26 Feb 2017 19:48:13 +0200 Subject: [PATCH] feat: save tarballs before unpacking ...and unpack tarballs one by one Close #600 --- src/api/extendOptions.ts | 2 +- src/install/fetch.ts | 3 + src/install/fetchResolution.ts | 49 +++++++++++++--- src/network/got.ts | 74 ++++++++++++++++++++---- src/resolve/npm/getHost.ts | 9 +++ src/resolve/npm/getRegistryFolderName.ts | 11 ++++ src/resolve/npm/index.ts | 3 +- src/resolve/npm/loadPackageMeta.ts | 3 +- 8 files changed, 132 insertions(+), 22 deletions(-) create mode 100644 src/resolve/npm/getHost.ts create mode 100644 src/resolve/npm/getRegistryFolderName.ts diff --git a/src/api/extendOptions.ts b/src/api/extendOptions.ts index 807ebae6b0..c4236e25fe 100644 --- a/src/api/extendOptions.ts +++ b/src/api/extendOptions.ts @@ -26,7 +26,7 @@ const defaults = () => ({ engineStrict: false, metaCache: new Map(), networkConcurrency: 16, - fetchingConcurrency: 4, + fetchingConcurrency: 16, lockStaleDuration: 60 * 1000, // 1 minute }) diff --git a/src/install/fetch.ts b/src/install/fetch.ts index 1cb19a05ce..19ef9ade0f 100644 --- a/src/install/fetch.ts +++ b/src/install/fetch.ts @@ -80,6 +80,7 @@ export default async function fetch ( resolution: resolution, loggedPkg: options.loggedPkg, got: options.got, + localRegistry: options.localRegistry, })) if (fetchingPkg == null) { @@ -114,6 +115,7 @@ async function fetchToStore (opts: { resolution: Resolution, loggedPkg: LoggedPkg, got: Got, + localRegistry: string, }): Promise { const target = opts.target const targetExists = await exists(target) @@ -137,6 +139,7 @@ async function fetchToStore (opts: { await fetchResolution(opts.resolution, targetStage, { got: opts.got, loggedPkg: opts.loggedPkg, + localRegistry: opts.localRegistry, }) logStatus({ status: 'fetched', diff --git a/src/install/fetchResolution.ts b/src/install/fetchResolution.ts index 9004031795..d6c07545a3 100644 --- a/src/install/fetchResolution.ts +++ b/src/install/fetchResolution.ts @@ -5,9 +5,13 @@ import spawn = require('cross-spawn') import execa = require('execa') import {IncomingMessage} from 'http' import * as unpackStream from 'unpack-stream' +import existsFile = require('exists-file') +import pLimit = require('p-limit') import {Resolution} from '../resolve' import {Got} from '../network/got' import logStatus from '../logging/logInstallStatus' +import parseNpmTarballUrl from 'parse-npm-tarball-url' +import {escapeHost} from '../resolve/npm/getRegistryFolderName' const gitLogger = logger('git') @@ -15,7 +19,8 @@ const fetchLogger = logger('fetch') export type FetchOptions = { loggedPkg: LoggedPkg, - got: Got + got: Got, + localRegistry: string, } export type PackageDist = { @@ -103,15 +108,45 @@ export function fetchFromTarball (dir: string, dist: PackageDist, opts: FetchOpt } export async function fetchFromRemoteTarball (dir: string, dist: PackageDist, opts: FetchOptions) { - const stream: IncomingMessage = await opts.got.getStream(dist.tarball) - await unpackStream.remote(stream, dir, { + const localTarballPath = getLocalTarballPath(dist.tarball, opts.localRegistry) + if (!await existsFile(localTarballPath)) { + await opts.got.download(dist.tarball, localTarballPath, { + shasum: dist.shasum, + onStart: () => logStatus({status: 'fetching', pkg: opts.loggedPkg}), + onProgress: (done: number, total: number) => + logStatus({ + status: 'fetching', + pkg: opts.loggedPkg, + progress: { done, total }, + }) + }) + } + await fetchFromLocalTarball(dir, { shasum: dist.shasum, - onStart: () => logStatus({status: 'fetching', pkg: opts.loggedPkg}), - onProgress: (done: number, total: number) => logStatus({status: 'fetching', pkg: opts.loggedPkg, progress: { done, total }}) + tarball: localTarballPath, }) fetchLogger.debug(`finish ${dist.shasum} ${dist.tarball}`) } -export async function fetchFromLocalTarball (dir: string, dist: PackageDist) { - await unpackStream.local(fs.createReadStream(dist.tarball), dir) +function getLocalTarballPath (tarballUrl: string, localRegistry: string) { + const tarball = parseNpmTarballUrl(tarballUrl) + if (tarball) { + const escapedHost = escapeHost(tarball.host) + return path.join(localRegistry, escapedHost, tarball.pkg.name, + `${unscope(tarball.pkg.name)}-${tarball.pkg.version}.tgz`) + } + return path.join(localRegistry, tarballUrl.replace(/^.*:\/\/(git@)?/, '')) +} + +function unscope (pkgName: string) { + if (pkgName[0] === '@') { + return pkgName.split('/')[1] + } + return pkgName +} + +const limitUnpack = pLimit(1) + +async function fetchFromLocalTarball (dir: string, dist: PackageDist) { + await limitUnpack(() => unpackStream.local(fs.createReadStream(dist.tarball), dir)) } diff --git a/src/network/got.ts b/src/network/got.ts index 32644c8de1..60a318a411 100644 --- a/src/network/got.ts +++ b/src/network/got.ts @@ -1,8 +1,11 @@ import {IncomingMessage} from 'http' -import pauseStream = require('pause-stream') import getRegistryAuthInfo = require('registry-auth-token') import memoize = require('lodash.memoize') import pLimit = require('p-limit') +import fs = require('mz/fs') +import crypto = require('crypto') +import mkdirp = require('mkdirp-promise') +import path = require('path') export type RequestParams = { auth?: { @@ -18,7 +21,11 @@ export type HttpResponse = { } export type Got = { - getStream: (url: string) => Promise, + download(url: string, saveto: string, opts: { + onStart?: () => void, + onProgress?: (downloaded: number, totalSize: number) => void, + shasum?: string + }): Promise, getJSON(url: string): Promise, } @@ -39,16 +46,59 @@ export default (client: NpmRegistryClient, opts: {networkConcurrency: number}): })) } - const getStream = function (url: string): Promise { - return limit(() => new Promise((resolve, reject) => { - client.fetch(url, createOptions(url), (err: Error, res: IncomingMessage) => { - if (err) return reject(err) - const ps = pauseStream() - // without pausing, gunzip/tar-fs would miss the beginning of the stream - res.pipe(ps.pause()) - resolve(ps) + function download (url: string, saveto: string, opts: { + onStart?: () => void, + onProgress?: (downloaded: number, totalSize: number) => void, + shasum?: string + }): Promise { + return limit(async () => { + const stage = `${saveto}+stage` + await mkdirp(path.dirname(stage)) + + return new Promise((resolve, reject) => { + client.fetch(url, createOptions(url), async (err: Error, res: IncomingMessage) => { + if (err) return reject(err) + const writeStream = fs.createWriteStream(stage) + const actualShasum = crypto.createHash('sha1') + + res + .on('response', start) + .on('data', (_: Buffer) => { actualShasum.update(_) }) + .on('error', reject) + .pipe(writeStream) + .on('error', reject) + .on('finish', finish) + + function start (res: IncomingMessage) { + if (res.statusCode !== 200) { + return reject(new Error(`Invalid response: ${res.statusCode}`)) + } + + if (opts.onStart) opts.onStart() + if (opts.onProgress && ('content-length' in res.headers)) { + const onProgress = opts.onProgress + let downloaded = 0 + let size = +res.headers['content-length'] + res.on('data', (chunk: Buffer) => { + downloaded += chunk.length + onProgress(downloaded, size) + }) + } + } + + async function finish () { + const digest = actualShasum.digest('hex') + if (opts.shasum && digest !== opts.shasum) { + reject(new Error(`Incorrect shasum (expected ${opts.shasum}, got ${digest})`)) + return + } + + await fs.rename(stage, saveto) + resolve() + } + }) }) - })) + }) } function createOptions (url: string): RequestParams { @@ -75,6 +125,6 @@ export default (client: NpmRegistryClient, opts: {networkConcurrency: number}): return { getJSON: memoize(getJSON), - getStream: getStream, + download, } } diff --git a/src/resolve/npm/getHost.ts b/src/resolve/npm/getHost.ts new file mode 100644 index 0000000000..aa22b11696 --- /dev/null +++ b/src/resolve/npm/getHost.ts @@ -0,0 +1,9 @@ +import url = require('url') + +export default function getHost (rawUrl: string) { + const urlObj = url.parse(rawUrl) + if (!urlObj || !urlObj.host) { + throw new Error(`Couldn't get host from ${rawUrl}`) + } + return urlObj.host +} diff --git a/src/resolve/npm/getRegistryFolderName.ts b/src/resolve/npm/getRegistryFolderName.ts new file mode 100644 index 0000000000..c19878cb8d --- /dev/null +++ b/src/resolve/npm/getRegistryFolderName.ts @@ -0,0 +1,11 @@ +import url = require('url') +import getHost from './getHost' + +export default function (registryUrl: string): string { + const host = getHost(registryUrl) + return escapeHost(host) +} + +export function escapeHost (host: string) { + return host.replace(':', '+') +} diff --git a/src/resolve/npm/index.ts b/src/resolve/npm/index.ts index 2c5d6b44d0..cad393bfc2 100644 --- a/src/resolve/npm/index.ts +++ b/src/resolve/npm/index.ts @@ -5,6 +5,7 @@ import {PackageSpec, ResolveOptions, TarballResolution, ResolveResult} from '..' import logStatus from '../../logging/logInstallStatus' import loadPkgMeta, {PackageMeta} from './loadPackageMeta' import createPkgId from './createNpmPkgId' +import getHost from './getHost' export {PackageMeta} @@ -37,7 +38,7 @@ export default async function resolveNpm (spec: PackageSpec, opts: ResolveOption spec.raw + '\n' + message) throw err } - const registryHost = url.parse(correctPkg.dist.tarball).host + const registryHost = getHost(correctPkg.dist.tarball) const id = createPkgId(registryHost, correctPkg.name, correctPkg.version) const resolution: TarballResolution = { diff --git a/src/resolve/npm/loadPackageMeta.ts b/src/resolve/npm/loadPackageMeta.ts index b2dabab723..fb66674b72 100644 --- a/src/resolve/npm/loadPackageMeta.ts +++ b/src/resolve/npm/loadPackageMeta.ts @@ -7,6 +7,7 @@ import {Got} from '../../network/got' import {PackageSpec} from '..' import {Package} from '../../types' import createPkgId from './createNpmPkgId' +import getRegistryFolderName from './getRegistryFolderName' import logger from 'pnpm-logger' import pLimit = require('p-limit') @@ -38,7 +39,7 @@ export default async function loadPkgMetaNonCached ( return metaCache.get(spec.name) } - const registry = (url.parse(registryUrl(spec.scope)).host).replace(':', '+') + const registry = getRegistryFolderName(registryUrl(spec.scope)) const pkgMirror = path.join(localRegistry, registry, spec.name) const limit = metafileOperationLimits[pkgMirror] = metafileOperationLimits[pkgMirror] || pLimit(1)