feat: save tarballs before unpacking

...and unpack tarballs one by one

Close #600
This commit is contained in:
zkochan
2017-02-26 19:48:13 +02:00
parent 65d7c20763
commit be633ffb73
8 changed files with 132 additions and 22 deletions

View File

@@ -26,7 +26,7 @@ const defaults = () => (<StrictPnpmOptions>{
engineStrict: false,
metaCache: new Map(),
networkConcurrency: 16,
fetchingConcurrency: 4,
fetchingConcurrency: 16,
lockStaleDuration: 60 * 1000, // 1 minute
})

View File

@@ -80,6 +80,7 @@ export default async function fetch (
resolution: <Resolution>resolution,
loggedPkg: options.loggedPkg,
got: options.got,
localRegistry: options.localRegistry,
}))
if (fetchingPkg == null) {
@@ -114,6 +115,7 @@ async function fetchToStore (opts: {
resolution: Resolution,
loggedPkg: LoggedPkg,
got: Got,
localRegistry: string,
}): Promise<Boolean> {
const target = opts.target
const targetExists = await exists(target)
@@ -137,6 +139,7 @@ async function fetchToStore (opts: {
await fetchResolution(opts.resolution, targetStage, {
got: opts.got,
loggedPkg: opts.loggedPkg,
localRegistry: opts.localRegistry,
})
logStatus({
status: 'fetched',

View File

@@ -5,9 +5,13 @@ import spawn = require('cross-spawn')
import execa = require('execa')
import {IncomingMessage} from 'http'
import * as unpackStream from 'unpack-stream'
import existsFile = require('exists-file')
import pLimit = require('p-limit')
import {Resolution} from '../resolve'
import {Got} from '../network/got'
import logStatus from '../logging/logInstallStatus'
import parseNpmTarballUrl from 'parse-npm-tarball-url'
import {escapeHost} from '../resolve/npm/getRegistryFolderName'
const gitLogger = logger('git')
@@ -15,7 +19,8 @@ const fetchLogger = logger('fetch')
export type FetchOptions = {
loggedPkg: LoggedPkg,
got: Got
got: Got,
localRegistry: string,
}
export type PackageDist = {
@@ -103,15 +108,45 @@ export function fetchFromTarball (dir: string, dist: PackageDist, opts: FetchOpt
}
export async function fetchFromRemoteTarball (dir: string, dist: PackageDist, opts: FetchOptions) {
const stream: IncomingMessage = await opts.got.getStream(dist.tarball)
await unpackStream.remote(stream, dir, {
const localTarballPath = getLocalTarballPath(dist.tarball, opts.localRegistry)
if (!await existsFile(localTarballPath)) {
await opts.got.download(dist.tarball, localTarballPath, {
shasum: dist.shasum,
onStart: () => logStatus({status: 'fetching', pkg: opts.loggedPkg}),
onProgress: (done: number, total: number) =>
logStatus({
status: 'fetching',
pkg: opts.loggedPkg,
progress: { done, total },
})
})
}
await fetchFromLocalTarball(dir, {
shasum: dist.shasum,
onStart: () => logStatus({status: 'fetching', pkg: opts.loggedPkg}),
onProgress: (done: number, total: number) => logStatus({status: 'fetching', pkg: opts.loggedPkg, progress: { done, total }})
tarball: localTarballPath,
})
fetchLogger.debug(`finish ${dist.shasum} ${dist.tarball}`)
}
export async function fetchFromLocalTarball (dir: string, dist: PackageDist) {
await unpackStream.local(fs.createReadStream(dist.tarball), dir)
/**
 * Builds the path inside `localRegistry` at which the tarball fetched from
 * `tarballUrl` is kept.
 *
 * When `parseNpmTarballUrl` recognizes the URL, the path is
 * `<localRegistry>/<escaped host>/<package name>/<unscoped name>-<version>.tgz`.
 * Otherwise the URL with its leading `<protocol>://` (and an optional `git@`)
 * stripped is appended to `localRegistry`.
 *
 * @param tarballUrl URL the tarball is downloaded from
 * @param localRegistry directory under which tarballs are stored
 * @returns the path of the local copy of the tarball
 */
function getLocalTarballPath (tarballUrl: string, localRegistry: string) {
  const parsed = parseNpmTarballUrl(tarballUrl)
  if (!parsed) {
    // Not recognized as an npm tarball URL: reuse the URL itself, minus the protocol, as the relative path
    const urlWithoutProtocol = tarballUrl.replace(/^.*:\/\/(git@)?/, '')
    return path.join(localRegistry, urlWithoutProtocol)
  }
  const hostFolder = escapeHost(parsed.host)
  const pkgName = parsed.pkg.name
  const fileName = `${unscope(pkgName)}-${parsed.pkg.version}.tgz`
  return path.join(localRegistry, hostFolder, pkgName, fileName)
}
/**
 * Strips the scope from a package name.
 *
 * A name that does not start with `@` is returned as is. For a name that
 * does, the segment between the first and the second `/` is returned
 * (`@scope/name` gives `name`).
 *
 * @param pkgName package name, scoped or not
 * @returns the package name without its `@scope/` prefix
 */
function unscope (pkgName: string) {
  if (pkgName.charAt(0) !== '@') {
    return pkgName
  }
  const segments = pkgName.split('/')
  return segments[1]
}
// Limiter with a concurrency of 1: tarballs are unpacked one at a time
const limitUnpack = pLimit(1)

/**
 * Unpacks a tarball stored on the local file system into `dir`.
 *
 * The unpacking is scheduled through `limitUnpack`, so it waits for any
 * unpacking that is already in progress.
 *
 * @param dir directory the tarball is unpacked into
 * @param dist package dist whose `tarball` field is the path of the local tarball file
 */
async function fetchFromLocalTarball (dir: string, dist: PackageDist) {
  const unpack = () => {
    const tarballStream = fs.createReadStream(dist.tarball)
    return unpackStream.local(tarballStream, dir)
  }
  await limitUnpack(unpack)
}

View File

@@ -1,8 +1,11 @@
import {IncomingMessage} from 'http'
import pauseStream = require('pause-stream')
import getRegistryAuthInfo = require('registry-auth-token')
import memoize = require('lodash.memoize')
import pLimit = require('p-limit')
import fs = require('mz/fs')
import crypto = require('crypto')
import mkdirp = require('mkdirp-promise')
import path = require('path')
export type RequestParams = {
auth?: {
@@ -18,7 +21,11 @@ export type HttpResponse = {
}
export type Got = {
getStream: (url: string) => Promise<IncomingMessage>,
download(url: string, saveto: string, opts: {
onStart?: () => void,
onProgress?: (downloaded: number, totalSize: number) => void,
shasum?: string
}): Promise<void>,
getJSON<T>(url: string): Promise<T>,
}
@@ -39,16 +46,59 @@ export default (client: NpmRegistryClient, opts: {networkConcurrency: number}):
}))
}
const getStream = function (url: string): Promise<IncomingMessage> {
return limit(() => new Promise((resolve, reject) => {
client.fetch(url, createOptions(url), (err: Error, res: IncomingMessage) => {
if (err) return reject(err)
const ps = pauseStream()
// without pausing, gunzip/tar-fs would miss the beginning of the stream
res.pipe(ps.pause())
resolve(ps)
function download (url: string, saveto: string, opts: {
onStart?: () => void,
onProgress?: (downloaded: number, totalSize: number) => void,
shasum?: string
}): Promise<void> {
return limit(async () => {
const stage = `${saveto}+stage`
await mkdirp(path.dirname(stage))
return new Promise((resolve, reject) => {
client.fetch(url, createOptions(url), async (err: Error, res: IncomingMessage) => {
if (err) return reject(err)
const writeStream = fs.createWriteStream(stage)
const actualShasum = crypto.createHash('sha1')
res
.on('response', start)
.on('data', (_: Buffer) => { actualShasum.update(_) })
.on('error', reject)
.pipe(writeStream)
.on('error', reject)
.on('finish', finish)
function start (res: IncomingMessage) {
if (res.statusCode !== 200) {
return reject(new Error(`Invalid response: ${res.statusCode}`))
}
if (opts.onStart) opts.onStart()
if (opts.onProgress && ('content-length' in res.headers)) {
const onProgress = opts.onProgress
let downloaded = 0
let size = +res.headers['content-length']
res.on('data', (chunk: Buffer) => {
downloaded += chunk.length
onProgress(downloaded, size)
})
}
}
async function finish () {
const digest = actualShasum.digest('hex')
if (opts.shasum && digest !== opts.shasum) {
reject(new Error(`Incorrect shasum (expected ${opts.shasum}, got ${digest})`))
return
}
await fs.rename(stage, saveto)
resolve()
}
})
})
}))
})
}
function createOptions (url: string): RequestParams {
@@ -75,6 +125,6 @@ export default (client: NpmRegistryClient, opts: {networkConcurrency: number}):
return {
getJSON: memoize(getJSON),
getStream: getStream,
download,
}
}

View File

@@ -0,0 +1,9 @@
import url = require('url')
/**
 * Extracts the host (host name plus port, when one is present) from a URL.
 *
 * @param rawUrl the URL to read the host from
 * @returns the host part of `rawUrl`
 * @throws Error when no host can be found in `rawUrl`
 */
export default function getHost (rawUrl: string) {
  const { host } = url.parse(rawUrl)
  if (host) {
    return host
  }
  throw new Error(`Couldn't get host from ${rawUrl}`)
}

View File

@@ -0,0 +1,11 @@
import url = require('url')
import getHost from './getHost'
/**
 * Returns the folder name used for a registry: the host of the registry URL
 * passed through `escapeHost`.
 *
 * @param registryUrl URL of the registry
 * @returns the escaped host of `registryUrl`
 */
export default function (registryUrl: string): string {
  const registryHost = getHost(registryUrl)
  const folderName = escapeHost(registryHost)
  return folderName
}
/**
 * Escapes a URL host by replacing its colons with `+`.
 *
 * Every colon is replaced, not only the first one, so a host containing
 * several colons (for instance an IPv6 literal with a port, `[::1]:4873`)
 * comes out without any colon left. Hosts with at most one colon give the
 * same result as before.
 *
 * @param host host part of a URL, optionally followed by `:<port>`
 * @returns `host` with each `:` replaced by `+`
 */
export function escapeHost (host: string) {
  // A string pattern would only replace the first match; the global regex replaces all of them
  return host.replace(/:/g, '+')
}

View File

@@ -5,6 +5,7 @@ import {PackageSpec, ResolveOptions, TarballResolution, ResolveResult} from '..'
import logStatus from '../../logging/logInstallStatus'
import loadPkgMeta, {PackageMeta} from './loadPackageMeta'
import createPkgId from './createNpmPkgId'
import getHost from './getHost'
export {PackageMeta}
@@ -37,7 +38,7 @@ export default async function resolveNpm (spec: PackageSpec, opts: ResolveOption
spec.raw + '\n' + message)
throw err
}
const registryHost = <string>url.parse(correctPkg.dist.tarball).host
const registryHost = getHost(correctPkg.dist.tarball)
const id = createPkgId(registryHost, correctPkg.name, correctPkg.version)
const resolution: TarballResolution = {

View File

@@ -7,6 +7,7 @@ import {Got} from '../../network/got'
import {PackageSpec} from '..'
import {Package} from '../../types'
import createPkgId from './createNpmPkgId'
import getRegistryFolderName from './getRegistryFolderName'
import logger from 'pnpm-logger'
import pLimit = require('p-limit')
@@ -38,7 +39,7 @@ export default async function loadPkgMetaNonCached (
return <PackageMeta>metaCache.get(spec.name)
}
const registry = (<string>url.parse(registryUrl(spec.scope)).host).replace(':', '+')
const registry = getRegistryFolderName(registryUrl(spec.scope))
const pkgMirror = path.join(localRegistry, registry, spec.name)
const limit = metafileOperationLimits[pkgMirror] = metafileOperationLimits[pkgMirror] || pLimit(1)