Files
pnpm/src/createDownloader.ts
Zoltan Kochan efb8115503 refactor: save to a temp folder
BREAKING CHANGE:

programmatic API changed
2018-02-10 00:07:57 +02:00

210 lines
6.1 KiB
TypeScript

import {FetchResult} from '@pnpm/fetcher-base'
import createFetcher from 'fetch-from-npm-registry'
import createWriteStreamAtomic = require('fs-write-stream-atomic')
import {IncomingMessage} from 'http'
import mkdirp = require('mkdirp-promise')
import path = require('path')
import pathTemp = require('path-temp')
import retry = require('retry')
import rimraf = require('rimraf')
import ssri = require('ssri')
import unpackStream = require('unpack-stream')
import urlLib = require('url')
import {BadTarballError} from './errorTypes'
export interface HttpResponse {
body: string
}
export type DownloadFunction = (url: string, saveto: string, opts: {
auth?: {
scope: string,
token: string | undefined,
password: string | undefined,
username: string | undefined,
email: string | undefined,
auth: string | undefined,
alwaysAuth: string | undefined,
},
unpackTo: string,
registry?: string,
onStart?: (totalSize: number | null, attempt: number) => void,
onProgress?: (downloaded: number) => void,
ignore?: (filename: string) => boolean,
integrity?: string
generatePackageIntegrity?: boolean,
}) => Promise<FetchResult>
export interface NpmRegistryClient {
get: (url: string, getOpts: object, cb: (err: Error, data: object, raw: object, res: HttpResponse) => void) => void,
fetch: (url: string, opts: {auth?: object}, cb: (err: Error, res: IncomingMessage) => void) => void,
}
export default (
gotOpts: {
alwaysAuth: boolean,
registry: string,
// proxy
proxy?: string,
localAddress?: string,
// ssl
ca?: string,
cert?: string,
key?: string,
strictSSL?: boolean,
// retry
retry?: {
retries?: number,
factor?: number,
minTimeout?: number,
maxTimeout?: number,
randomize?: boolean,
},
userAgent?: string,
},
): DownloadFunction => {
const fetchFromNpmRegistry = createFetcher(gotOpts)
const retryOpts = {
retries: 2,
factor: 10,
minTimeout: 1e4, // 10 seconds
maxTimeout: 6e4, // 1 minute
...gotOpts.retry
}
return async function download (url: string, saveto: string, opts: {
auth?: {
scope: string,
token: string | undefined,
password: string | undefined,
username: string | undefined,
email: string | undefined,
auth: string | undefined,
alwaysAuth: string | undefined,
},
unpackTo: string,
registry?: string,
onStart?: (totalSize: number | null, attempt: number) => void,
onProgress?: (downloaded: number) => void,
ignore?: (filename: string) => boolean,
integrity?: string,
generatePackageIntegrity?: boolean,
}): Promise<FetchResult> {
await mkdirp(path.dirname(saveto))
// If a tarball is hosted on a different place than the manifest, only send
// credentials on `alwaysAuth`
const shouldAuth = opts.auth && (
opts.auth.alwaysAuth ||
!opts.registry ||
urlLib.parse(url).host === urlLib.parse(opts.registry).host
)
const op = retry.operation(retryOpts)
return new Promise<FetchResult>((resolve, reject) => {
op.attempt((currentAttempt) => {
fetch(currentAttempt)
.then(resolve)
.catch((err) => {
if (err.code === 'E403') {
reject(err)
return
}
if (op.retry(err)) {
return
}
reject(op.mainError())
})
})
})
async function fetch (currentAttempt: number): Promise<FetchResult> {
try {
const res = await fetchFromNpmRegistry(url, {auth: shouldAuth && opts.auth as any || undefined}) // tslint:disable-line
if (res.status !== 200) {
const err = new Error(`${res.status} ${res.statusText}: ${url}`)
// tslint:disable
err['code'] = `E${res.status}`
err['uri'] = url
err['response'] = res
// tslint:enable
throw err
}
const contentLength = res.headers.has('content-length') && res.headers.get('content-length')
const size = typeof contentLength === 'string'
? parseInt(contentLength, 10)
: null
if (opts.onStart) {
opts.onStart(size, currentAttempt)
}
const onProgress = opts.onProgress
let downloaded = 0
res.body.on('data', (chunk: Buffer) => {
downloaded += chunk.length
if (onProgress) onProgress(downloaded)
})
const writeStream = createWriteStreamAtomic(saveto)
return await new Promise<FetchResult>((resolve, reject) => {
const stream = res.body
.on('error', reject)
.pipe(writeStream)
.on('error', reject)
const tempLocation = pathTemp(opts.unpackTo)
Promise.all([
opts.integrity && ssri.checkStream(res.body, opts.integrity),
unpackStream.local(res.body, tempLocation, {
generateIntegrity: opts.generatePackageIntegrity,
ignore: opts.ignore,
}),
waitTillClosed({ stream, size, getDownloaded: () => downloaded, url }),
])
.then((vals) => resolve({ tempLocation, filesIndex: vals[1] }))
.catch((err) => {
rimraf(tempLocation, (err) => {
// Just ignoring this error
// A redundant stage folder won't break anything
})
reject(err)
})
})
} catch (err) {
err.attempts = currentAttempt
err.resource = url
throw err
}
}
}
}
function waitTillClosed (
opts: {
stream: NodeJS.ReadableStream,
size: null | number,
getDownloaded: () => number,
url: string,
},
) {
return new Promise((resolve, reject) => {
opts.stream.on('close', () => {
const downloaded = opts.getDownloaded()
if (opts.size !== null && opts.size !== downloaded) {
const err = new BadTarballError({
expectedSize: opts.size,
receivedSize: downloaded,
tarballUrl: opts.url,
})
reject(err)
return
}
resolve()
})
})
}