refactor: use RxJS instead of "most" in @pnpm/default-reporter

PR #2841
This commit is contained in:
Zoltan Kochan
2020-09-10 11:46:38 +03:00
committed by GitHub
parent 755632720d
commit 09b42d3ab0
28 changed files with 577 additions and 584 deletions

View File

@@ -0,0 +1,5 @@
---
"@pnpm/default-reporter": minor
---
Use RxJS instead of "most".

View File

@@ -34,16 +34,15 @@
"@pnpm/core-loggers": "workspace:4.2.0",
"@pnpm/error": "workspace:1.3.1",
"@pnpm/types": "workspace:6.2.0",
"@zkochan/zen-push": "1.0.0",
"ansi-diff": "^1.1.1",
"chalk": "^4.1.0",
"most": "^1.8.1",
"normalize-path": "^3.0.0",
"pretty-bytes": "^5.4.1",
"pretty-ms": "^7.0.0",
"pretty-time": "^1.1.0",
"ramda": "^0.27.1",
"right-pad": "^1.0.1",
"rxjs": "^6.6.3",
"semver": "^7.3.2",
"stacktracey": "^1.2.127",
"string-length": "^4.0.1",

View File

@@ -1,13 +1,13 @@
import { Config } from '@pnpm/config'
import * as logs from '@pnpm/core-loggers'
import { LogLevel } from '@pnpm/logger'
import PushStream from '@zkochan/zen-push'
import * as Rx from 'rxjs'
import { map, mergeAll } from 'rxjs/operators'
import { EOL } from './constants'
import mergeOutputs from './mergeOutputs'
import reporterForClient from './reporterForClient'
import reporterForServer from './reporterForServer'
import createDiffer = require('ansi-diff')
import most = require('most')
export default function (
opts: {
@@ -26,7 +26,8 @@ export default function (
}
) {
if (opts.context.argv[0] === 'server') {
const log$ = most.fromEvent<logs.Log>('data', opts.streamParser)
// eslint-disable-next-line
const log$ = Rx.fromEvent<logs.Log>(opts.streamParser as any, 'data')
reporterForServer(log$, opts.context.config)
return
}
@@ -75,28 +76,28 @@ export function toOutput$ (
config?: Config
}
}
): most.Stream<string> {
): Rx.Observable<string> {
opts = opts || {}
const contextPushStream = new PushStream()
const fetchingProgressPushStream = new PushStream()
const progressPushStream = new PushStream()
const stagePushStream = new PushStream()
const deprecationPushStream = new PushStream()
const summaryPushStream = new PushStream()
const lifecyclePushStream = new PushStream()
const statsPushStream = new PushStream()
const packageImportMethodPushStream = new PushStream()
const installCheckPushStream = new PushStream()
const registryPushStream = new PushStream()
const rootPushStream = new PushStream()
const packageManifestPushStream = new PushStream()
const linkPushStream = new PushStream()
const otherPushStream = new PushStream()
const hookPushStream = new PushStream()
const skippedOptionalDependencyPushStream = new PushStream()
const scopePushStream = new PushStream()
const requestRetryPushStream = new PushStream()
setTimeout(() => { // setTimeout is a workaround for a strange bug in most https://github.com/cujojs/most/issues/491
const contextPushStream = new Rx.Subject<logs.ContextLog>()
const fetchingProgressPushStream = new Rx.Subject<logs.FetchingProgressLog>()
const progressPushStream = new Rx.Subject<logs.ProgressLog>()
const stagePushStream = new Rx.Subject<logs.StageLog>()
const deprecationPushStream = new Rx.Subject<logs.DeprecationLog>()
const summaryPushStream = new Rx.Subject<logs.SummaryLog>()
const lifecyclePushStream = new Rx.Subject<logs.LifecycleLog>()
const statsPushStream = new Rx.Subject<logs.StatsLog>()
const packageImportMethodPushStream = new Rx.Subject<logs.PackageImportMethodLog>()
const installCheckPushStream = new Rx.Subject<logs.InstallCheckLog>()
const registryPushStream = new Rx.Subject<logs.RegistryLog>()
const rootPushStream = new Rx.Subject<logs.RootLog>()
const packageManifestPushStream = new Rx.Subject<logs.PackageManifestLog>()
const linkPushStream = new Rx.Subject<logs.LinkLog>()
const otherPushStream = new Rx.Subject<logs.Log>()
const hookPushStream = new Rx.Subject<logs.HookLog>()
const skippedOptionalDependencyPushStream = new Rx.Subject<logs.SkippedOptionalDependencyLog>()
const scopePushStream = new Rx.Subject<logs.ScopeLog>()
const requestRetryPushStream = new Rx.Subject<logs.RequestRetryLog>()
setTimeout(() => {
opts.streamParser['on']('data', (log: logs.Log) => {
switch (log.name) {
case 'pnpm:context':
@@ -163,27 +164,27 @@ export function toOutput$ (
})
}, 0)
const log$ = {
context: most.from<logs.ContextLog>(contextPushStream.observable),
deprecation: most.from<logs.DeprecationLog>(deprecationPushStream.observable),
fetchingProgress: most.from<logs.FetchingProgressLog>(fetchingProgressPushStream.observable),
hook: most.from<logs.HookLog>(hookPushStream.observable),
installCheck: most.from<logs.InstallCheckLog>(installCheckPushStream.observable),
lifecycle: most.from<logs.LifecycleLog>(lifecyclePushStream.observable),
link: most.from<logs.LinkLog>(linkPushStream.observable),
other: most.from<logs.Log>(otherPushStream.observable),
packageImportMethod: most.from<logs.PackageImportMethodLog>(packageImportMethodPushStream.observable),
packageManifest: most.from<logs.PackageManifestLog>(packageManifestPushStream.observable),
progress: most.from<logs.ProgressLog>(progressPushStream.observable),
registry: most.from<logs.RegistryLog>(registryPushStream.observable),
requestRetry: most.from<logs.RequestRetryLog>(requestRetryPushStream.observable),
root: most.from<logs.RootLog>(rootPushStream.observable),
scope: most.from<logs.ScopeLog>(scopePushStream.observable),
skippedOptionalDependency: most.from<logs.SkippedOptionalDependencyLog>(skippedOptionalDependencyPushStream.observable),
stage: most.from<logs.StageLog>(stagePushStream.observable),
stats: most.from<logs.StatsLog>(statsPushStream.observable),
summary: most.from<logs.SummaryLog>(summaryPushStream.observable),
context: Rx.from(contextPushStream),
deprecation: Rx.from(deprecationPushStream),
fetchingProgress: Rx.from(fetchingProgressPushStream),
hook: Rx.from(hookPushStream),
installCheck: Rx.from(installCheckPushStream),
lifecycle: Rx.from(lifecyclePushStream),
link: Rx.from(linkPushStream),
other: Rx.from(otherPushStream),
packageImportMethod: Rx.from(packageImportMethodPushStream),
packageManifest: Rx.from(packageManifestPushStream),
progress: Rx.from(progressPushStream),
registry: Rx.from(registryPushStream),
requestRetry: Rx.from(requestRetryPushStream),
root: Rx.from(rootPushStream),
scope: Rx.from(scopePushStream),
skippedOptionalDependency: Rx.from(skippedOptionalDependencyPushStream),
stage: Rx.from(stagePushStream),
stats: Rx.from(statsPushStream),
summary: Rx.from(summaryPushStream),
}
const outputs: Array<most.Stream<most.Stream<{msg: string}>>> = reporterForClient(
const outputs: Array<Rx.Observable<Rx.Observable<{msg: string}>>> = reporterForClient(
log$,
{
appendOnly: opts.reportingOptions?.appendOnly,
@@ -199,10 +200,11 @@ export function toOutput$ (
)
if (opts.reportingOptions?.appendOnly) {
return most.join(
most.mergeArray(outputs)
.map((log: most.Stream<{msg: string}>) => log.map((msg) => msg.msg))
)
return Rx.merge(...outputs)
.pipe(
map((log: Rx.Observable<{msg: string}>) => log.pipe(map((msg) => msg.msg))),
mergeAll()
)
}
return mergeOutputs(outputs).multicast()
return mergeOutputs(outputs)
}

View File

@@ -1,40 +1,42 @@
import { EOL } from './constants'
import most = require('most')
import * as Rx from 'rxjs'
import { filter, map, mergeAll, scan } from 'rxjs/operators'
export default function mergeOutputs (outputs: Array<most.Stream<most.Stream<{msg: string}>>>): most.Stream<string> {
export default function mergeOutputs (outputs: Array<Rx.Observable<Rx.Observable<{msg: string}>>>): Rx.Observable<string> {
let blockNo = 0
let fixedBlockNo = 0
let started = false
return most.join(
most.mergeArray(outputs)
.map((log: most.Stream<{msg: string}>) => {
let currentBlockNo = -1
let currentFixedBlockNo = -1
return log
.map((msg) => {
if (msg['fixed']) {
if (currentFixedBlockNo === -1) {
currentFixedBlockNo = fixedBlockNo++
}
return {
blockNo: currentFixedBlockNo,
fixed: true,
msg: msg.msg,
}
}
if (currentBlockNo === -1) {
currentBlockNo = blockNo++
let previousOuput: string | null = null
return Rx.merge(...outputs).pipe(
map((log: Rx.Observable<{msg: string}>) => {
let currentBlockNo = -1
let currentFixedBlockNo = -1
return log.pipe(
map((msg) => {
if (msg['fixed']) {
if (currentFixedBlockNo === -1) {
currentFixedBlockNo = fixedBlockNo++
}
return {
blockNo: currentBlockNo,
fixed: false,
msg: typeof msg === 'string' ? msg : msg.msg, // eslint-disable-line
prevFixedBlockNo: currentFixedBlockNo,
blockNo: currentFixedBlockNo,
fixed: true,
msg: msg.msg,
}
})
})
)
.scan((acc, log) => {
}
if (currentBlockNo === -1) {
currentBlockNo = blockNo++
}
return {
blockNo: currentBlockNo,
fixed: false,
msg: typeof msg === 'string' ? msg : msg.msg, // eslint-disable-line
prevFixedBlockNo: currentFixedBlockNo,
}
})
)
}),
mergeAll(),
scan((acc, log) => {
if (log.fixed) {
acc.fixedBlocks[log.blockNo] = log.msg
} else {
@@ -42,8 +44,8 @@ export default function mergeOutputs (outputs: Array<most.Stream<most.Stream<{ms
acc.blocks[log.blockNo] = log.msg
}
return acc
}, { fixedBlocks: [], blocks: [] } as {fixedBlocks: string[], blocks: string[]})
.map((sections) => {
}, { fixedBlocks: [], blocks: [] } as {fixedBlocks: string[], blocks: string[]}),
map((sections) => {
const fixedBlocks = sections.fixedBlocks.filter(Boolean)
const nonFixedPart = sections.blocks.filter(Boolean).join(EOL)
if (!fixedBlocks.length) {
@@ -54,14 +56,21 @@ export default function mergeOutputs (outputs: Array<most.Stream<most.Stream<{ms
return fixedPart
}
return `${nonFixedPart}${EOL}${fixedPart}`
})
.filter((msg) => {
}),
filter((msg) => {
if (started) {
return true
}
if (msg === '') return false
started = true
return true
}),
filter((msg) => {
if (msg !== previousOuput) {
previousOuput = msg
return true
}
return false
})
.skipRepeats()
)
}

View File

@@ -1,6 +1,7 @@
import { Config } from '@pnpm/config'
import * as logs from '@pnpm/core-loggers'
import { LogLevel } from '@pnpm/logger'
import * as Rx from 'rxjs'
import reportBigTarballsProgress from './reportBigTarballsProgress'
import reportContext from './reportContext'
import reportDeprecations from './reportDeprecations'
@@ -14,29 +15,28 @@ import reportScope from './reportScope'
import reportSkippedOptionalDependencies from './reportSkippedOptionalDependencies'
import reportStats from './reportStats'
import reportSummary from './reportSummary'
import most = require('most')
export default function (
log$: {
context: most.Stream<logs.ContextLog>
fetchingProgress: most.Stream<logs.FetchingProgressLog>
progress: most.Stream<logs.ProgressLog>
stage: most.Stream<logs.StageLog>
deprecation: most.Stream<logs.DeprecationLog>
summary: most.Stream<logs.SummaryLog>
lifecycle: most.Stream<logs.LifecycleLog>
stats: most.Stream<logs.StatsLog>
installCheck: most.Stream<logs.InstallCheckLog>
registry: most.Stream<logs.RegistryLog>
root: most.Stream<logs.RootLog>
packageManifest: most.Stream<logs.PackageManifestLog>
requestRetry: most.Stream<logs.RequestRetryLog>
link: most.Stream<logs.LinkLog>
other: most.Stream<logs.Log>
hook: most.Stream<logs.HookLog>
scope: most.Stream<logs.ScopeLog>
skippedOptionalDependency: most.Stream<logs.SkippedOptionalDependencyLog>
packageImportMethod: most.Stream<logs.PackageImportMethodLog>
context: Rx.Observable<logs.ContextLog>
fetchingProgress: Rx.Observable<logs.FetchingProgressLog>
progress: Rx.Observable<logs.ProgressLog>
stage: Rx.Observable<logs.StageLog>
deprecation: Rx.Observable<logs.DeprecationLog>
summary: Rx.Observable<logs.SummaryLog>
lifecycle: Rx.Observable<logs.LifecycleLog>
stats: Rx.Observable<logs.StatsLog>
installCheck: Rx.Observable<logs.InstallCheckLog>
registry: Rx.Observable<logs.RegistryLog>
root: Rx.Observable<logs.RootLog>
packageManifest: Rx.Observable<logs.PackageManifestLog>
requestRetry: Rx.Observable<logs.RequestRetryLog>
link: Rx.Observable<logs.LinkLog>
other: Rx.Observable<logs.Log>
hook: Rx.Observable<logs.HookLog>
scope: Rx.Observable<logs.ScopeLog>
skippedOptionalDependency: Rx.Observable<logs.SkippedOptionalDependencyLog>
packageImportMethod: Rx.Observable<logs.PackageImportMethodLog>
},
opts: {
appendOnly?: boolean
@@ -49,11 +49,11 @@ export default function (
throttleProgress?: number
width?: number
}
): Array<most.Stream<most.Stream<{msg: string}>>> {
): Array<Rx.Observable<Rx.Observable<{msg: string}>>> {
const width = opts.width ?? process.stdout.columns ?? 80
const cwd = opts.pnpmConfig?.dir ?? process.cwd()
const outputs: Array<most.Stream<most.Stream<{msg: string}>>> = [
const outputs: Array<Rx.Observable<Rx.Observable<{msg: string}>>> = [
reportProgress(log$, {
cwd,
throttleProgress: opts.throttleProgress,

View File

@@ -1,6 +1,7 @@
import * as logs from '@pnpm/core-loggers'
import { PackageManifest } from '@pnpm/types'
import most = require('most')
import * as Rx from 'rxjs'
import { filter, map, mapTo, reduce, scan, startWith, take } from 'rxjs/operators'
import R = require('ramda')
export interface PackageDiff {
@@ -27,28 +28,31 @@ export const propertyByDependencyType = {
export default function (
log$: {
deprecation: most.Stream<logs.DeprecationLog>
summary: most.Stream<logs.SummaryLog>
root: most.Stream<logs.RootLog>
packageManifest: most.Stream<logs.PackageManifestLog>
deprecation: Rx.Observable<logs.DeprecationLog>
summary: Rx.Observable<logs.SummaryLog>
root: Rx.Observable<logs.RootLog>
packageManifest: Rx.Observable<logs.PackageManifestLog>
},
opts: {
prefix: string
}
) {
const deprecationSet$ = log$.deprecation
.filter((log) => log.prefix === opts.prefix)
.scan((acc, log) => {
acc.add(log.pkgId)
return acc
}, new Set())
.pipe(
filter((log) => log.prefix === opts.prefix),
scan((acc, log) => {
acc.add(log.pkgId)
return acc
}, new Set()),
startWith(new Set())
)
const pkgsDiff$ = most.combine(
(rootLog, deprecationSet) => [rootLog, deprecationSet],
log$.root.filter((log) => log.prefix === opts.prefix),
const filterPrefix = filter((log: { prefix: string }) => log.prefix === opts.prefix)
const pkgsDiff$ = Rx.combineLatest(
log$.root.pipe(filterPrefix),
deprecationSet$
)
.scan((pkgsDiff, args) => {
).pipe(
scan((pkgsDiff, args) => {
const rootLog = args[0]
const deprecationSet = args[1] as Set<string>
if (rootLog['added']) {
@@ -83,57 +87,80 @@ export default function (
nodeModulesOnly: Map<PackageDiff>
optional: Map<PackageDiff>
prod: Map<PackageDiff>
}),
startWith({
dev: {},
nodeModulesOnly: {},
optional: {},
peer: {},
prod: {},
})
const packageManifest$ = most.fromPromise(
most.merge(
log$.packageManifest.filter((log) => log.prefix === opts.prefix),
log$.summary.filter((log) => log.prefix === opts.prefix).constant({})
)
.take(2)
.reduce(R.merge, {} as any) // eslint-disable-line @typescript-eslint/no-explicit-any
)
return most.combine(
(pkgsDiff, packageManifests: { initial?: PackageManifest, updated?: PackageManifest }) => {
if (!packageManifests['initial'] || !packageManifests['updated']) return pkgsDiff
const packageManifest$ = Rx.merge(
log$.packageManifest.pipe(filterPrefix),
log$.summary.pipe(filterPrefix, mapTo({}))
)
.pipe(
take(2),
reduce(R.merge, {} as any) // eslint-disable-line @typescript-eslint/no-explicit-any
)
const initialPackageManifest = removeOptionalFromProdDeps(packageManifests['initial'])
const updatedPackageManifest = removeOptionalFromProdDeps(packageManifests['updated'])
for (const depType of ['peer', 'prod', 'optional', 'dev']) {
const prop = propertyByDependencyType[depType]
const initialDeps = Object.keys(initialPackageManifest[prop] || {})
const updatedDeps = Object.keys(updatedPackageManifest[prop] || {})
const removedDeps = R.difference(initialDeps, updatedDeps)
for (const removedDep of removedDeps) {
if (!pkgsDiff[depType][`-${removedDep}`]) {
pkgsDiff[depType][`-${removedDep}`] = {
added: false,
name: removedDep,
version: initialPackageManifest[prop][removedDep],
}
}
}
const addedDeps = R.difference(updatedDeps, initialDeps)
for (const addedDep of addedDeps) {
if (!pkgsDiff[depType][`+${addedDep}`]) {
pkgsDiff[depType][`+${addedDep}`] = {
added: true,
name: addedDep,
version: updatedPackageManifest[prop][addedDep],
}
}
}
}
return pkgsDiff
},
return Rx.combineLatest(
pkgsDiff$,
packageManifest$
)
.pipe(
map(
([pkgsDiff, packageManifests]: [
{
dev: Map<PackageDiff>
nodeModulesOnly: Map<PackageDiff>
optional: Map<PackageDiff>
prod: Map<PackageDiff>
},
{
initial?: PackageManifest
updated?: PackageManifest
}
]) => {
if (!packageManifests['initial'] || !packageManifests['updated']) return pkgsDiff
const initialPackageManifest = removeOptionalFromProdDeps(packageManifests['initial'])
const updatedPackageManifest = removeOptionalFromProdDeps(packageManifests['updated'])
for (const depType of ['peer', 'prod', 'optional', 'dev']) {
const prop = propertyByDependencyType[depType]
const initialDeps = Object.keys(initialPackageManifest[prop] || {})
const updatedDeps = Object.keys(updatedPackageManifest[prop] || {})
const removedDeps = R.difference(initialDeps, updatedDeps)
for (const removedDep of removedDeps) {
if (!pkgsDiff[depType][`-${removedDep}`]) {
pkgsDiff[depType][`-${removedDep}`] = {
added: false,
name: removedDep,
version: initialPackageManifest[prop][removedDep],
}
}
}
const addedDeps = R.difference(updatedDeps, initialDeps)
for (const addedDep of addedDeps) {
if (!pkgsDiff[depType][`+${addedDep}`]) {
pkgsDiff[depType][`+${addedDep}`] = {
added: true,
name: addedDep,
version: updatedPackageManifest[prop][addedDep],
}
}
}
}
return pkgsDiff
}
)
)
}
function removeOptionalFromProdDeps (pkg: PackageManifest): PackageManifest {

View File

@@ -1,32 +1,33 @@
import { FetchingProgressLog } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import { filter, map, startWith } from 'rxjs/operators'
import {
hlPkgId,
hlValue,
} from './outputConstants'
import most = require('most')
import prettyBytes = require('pretty-bytes')
const BIG_TARBALL_SIZE = 1024 * 1024 * 5 // 5 MB
export default (
log$: {
fetchingProgress: most.Stream<FetchingProgressLog>
fetchingProgress: Rx.Observable<FetchingProgressLog>
}
) => {
return log$.fetchingProgress
.filter((log) => log.status === 'started' &&
return log$.fetchingProgress.pipe(
filter((log: FetchingProgressLog) => log.status === 'started' &&
typeof log.size === 'number' && log.size >= BIG_TARBALL_SIZE &&
// When retrying the download, keep the existing progress line.
// Fixing issue: https://github.com/pnpm/pnpm/issues/1013
log.attempt === 1
)
.map((startedLog) => {
),
map((startedLog: FetchingProgressLog) => {
const size = prettyBytes(startedLog['size'])
return log$.fetchingProgress
.filter((log) => log.status === 'in_progress' && log.packageId === startedLog['packageId'])
.map((log) => log['downloaded'])
.startWith(0)
.map((downloadedRaw) => {
return log$.fetchingProgress.pipe(
filter((log: FetchingProgressLog) => log.status === 'in_progress' && log.packageId === startedLog['packageId']),
map((log: FetchingProgressLog) => log['downloaded']),
startWith(0),
map((downloadedRaw: number) => {
const done = startedLog['size'] === downloadedRaw
const downloaded = prettyBytes(downloadedRaw)
return {
@@ -34,5 +35,7 @@ export default (
msg: `Downloading ${hlPkgId(startedLog['packageId'])}: ${hlValue(downloaded)}/${hlValue(size)}${done ? ', done' : ''}`,
}
})
)
})
)
}

View File

@@ -1,42 +1,47 @@
import { ContextLog, PackageImportMethodLog } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import { map, take } from 'rxjs/operators'
import path = require('path')
import most = require('most')
export default (
log$: {
context: most.Stream<ContextLog>
packageImportMethod: most.Stream<PackageImportMethodLog>
context: Rx.Observable<ContextLog>
packageImportMethod: Rx.Observable<PackageImportMethodLog>
},
opts: { cwd: string }
) => {
return most.combine(
(context, packageImportMethod) => {
if (context.currentLockfileExists) {
return most.never()
}
let method!: string
switch (packageImportMethod.method) {
case 'copy':
method = 'copied'
break
case 'clone':
method = 'cloned'
break
case 'hardlink':
method = 'hard linked'
break
default:
method = packageImportMethod.method
break
}
return most.of({
msg: `\
return Rx.combineLatest(
log$.context.pipe(take(1)),
log$.packageImportMethod.pipe(take(1))
)
.pipe(
map(
([context, packageImportMethod]) => {
if (context.currentLockfileExists) {
return Rx.NEVER
}
let method!: string
switch (packageImportMethod.method) {
case 'copy':
method = 'copied'
break
case 'clone':
method = 'cloned'
break
case 'hardlink':
method = 'hard linked'
break
default:
method = packageImportMethod.method
break
}
return Rx.of({
msg: `\
Packages are ${method} from the content-addressable store to the virtual store.
Content-addressable store is at: ${context.storeDir}
Virtual store is at: ${path.relative(opts.cwd, context.virtualStoreDir)}`,
})
},
log$.context.take(1),
log$.packageImportMethod.take(1)
)
})
}
)
)
}

View File

@@ -1,28 +1,29 @@
import { DeprecationLog } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import { filter, map } from 'rxjs/operators'
import formatWarn from './utils/formatWarn'
import { zoomOut } from './utils/zooming'
import chalk = require('chalk')
import most = require('most')
export default (
deprecation$: most.Stream<DeprecationLog>,
deprecation$: Rx.Observable<DeprecationLog>,
opts: {
cwd: string
isRecursive: boolean
}
) => {
return deprecation$
return deprecation$.pipe(
// print warnings only about deprecated packages from the root
.filter((log) => log.depth === 0)
.map((log) => {
filter((log) => log.depth === 0),
map((log) => {
if (!opts.isRecursive && log.prefix === opts.cwd) {
return {
return Rx.of({
msg: formatWarn(`${chalk.red('deprecated')} ${log.pkgName}@${log.pkgVersion}: ${log.deprecated}`),
}
})
}
return {
return Rx.of({
msg: zoomOut(opts.cwd, log.prefix, formatWarn(`${chalk.red('deprecated')} ${log.pkgName}@${log.pkgVersion}`)),
}
})
})
.map(most.of)
)
}

View File

@@ -1,17 +1,18 @@
import { HookLog } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import { map } from 'rxjs/operators'
import { autozoom } from './utils/zooming'
import chalk = require('chalk')
import most = require('most')
export default (
hook$: most.Stream<HookLog>,
hook$: Rx.Observable<HookLog>,
opts: {
cwd: string
isRecursive: boolean
}
) => {
return hook$
.map((log) => ({
return hook$.pipe(
map((log) => Rx.of({
msg: autozoom(
opts.cwd,
log.prefix,
@@ -21,5 +22,5 @@ export default (
}
),
}))
.map(most.of)
)
}

View File

@@ -1,19 +1,20 @@
import { InstallCheckLog } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import { filter, map } from 'rxjs/operators'
import formatWarn from './utils/formatWarn'
import { autozoom } from './utils/zooming'
import most = require('most')
export default (
installCheck$: most.Stream<InstallCheckLog>,
installCheck$: Rx.Observable<InstallCheckLog>,
opts: {
cwd: string
}
) => {
return installCheck$
.map(formatInstallCheck.bind(null, opts.cwd))
.filter(Boolean)
.map((msg) => ({ msg }))
.map(most.of) as most.Stream<most.Stream<{msg: string}>>
return installCheck$.pipe(
map((log) => formatInstallCheck(opts.cwd, log)),
filter(Boolean),
map((msg) => Rx.of({ msg }))
)
}
function formatInstallCheck (

View File

@@ -1,13 +1,13 @@
import { LifecycleLog } from '@pnpm/core-loggers'
import PushStream from '@zkochan/zen-push'
import * as Rx from 'rxjs'
import { map } from 'rxjs/operators'
import { EOL } from '../constants'
import formatPrefix, { formatPrefixNoTrim } from './utils/formatPrefix'
import {
hlValue,
} from './outputConstants'
import formatPrefix, { formatPrefixNoTrim } from './utils/formatPrefix'
import path = require('path')
import chalk = require('chalk')
import most = require('most')
import path = require('path')
import prettyTime = require('pretty-time')
import stripAnsi = require('strip-ansi')
@@ -24,7 +24,7 @@ type ColorByPkg = Map<string, (txt: string) => string>
export default (
log$: {
lifecycle: most.Stream<LifecycleLog>
lifecycle: Rx.Observable<LifecycleLog>
},
opts: {
appendOnly?: boolean
@@ -36,10 +36,11 @@ export default (
// in order to reduce flickering
if (opts.appendOnly) {
const streamLifecycleOutput = createStreamLifecycleOutput(opts.cwd)
return log$.lifecycle
.map((log: LifecycleLog) => most.of({
return log$.lifecycle.pipe(
map((log: LifecycleLog) => Rx.of({
msg: streamLifecycleOutput(log),
}))
)
}
const lifecycleMessages: {
[depPath: string]: {
@@ -51,9 +52,9 @@ export default (
}
} = {}
const lifecycleStreamByDepPath: {
[depPath: string]: PushStream<{ msg: string }>
[depPath: string]: Rx.Subject<{ msg: string }>
} = {}
const lifecyclePushStream = new PushStream()
const lifecyclePushStream = new Rx.Subject<Rx.Observable<{ msg: string }>>()
// TODO: handle promise of .forEach?!
log$.lifecycle // eslint-disable-line
@@ -76,8 +77,8 @@ export default (
delete lifecycleMessages[key]
}
if (!lifecycleStreamByDepPath[key]) {
lifecycleStreamByDepPath[key] = new PushStream()
lifecyclePushStream.next(most.from(lifecycleStreamByDepPath[key].observable))
lifecycleStreamByDepPath[key] = new Rx.Subject<{ msg: string }>()
lifecyclePushStream.next(Rx.from(lifecycleStreamByDepPath[key]))
}
lifecycleStreamByDepPath[key].next({ msg })
if (exit) {
@@ -85,7 +86,7 @@ export default (
}
})
return most.from(lifecyclePushStream.observable)
return Rx.from(lifecyclePushStream)
}
function renderCollapsedScriptOutput (

View File

@@ -1,12 +1,12 @@
import { Config } from '@pnpm/config'
import { Log, RegistryLog } from '@pnpm/core-loggers'
import { LogLevel } from '@pnpm/logger'
import PushStream from '@zkochan/zen-push'
import reportError from '../reportError'
import formatWarn from './utils/formatWarn'
import { autozoom } from './utils/zooming'
import * as Rx from 'rxjs'
import { filter, map } from 'rxjs/operators'
import os = require('os')
import most = require('most')
// eslint-disable:object-literal-sort-keys
const LOG_LEVEL_NUMBER: Record<LogLevel, number> = {
@@ -21,8 +21,8 @@ const MAX_SHOWN_WARNINGS = 5
export default (
log$: {
registry: most.Stream<RegistryLog>
other: most.Stream<Log>
registry: Rx.Observable<RegistryLog>
other: Rx.Observable<Log>
},
opts: {
cwd: string
@@ -33,25 +33,26 @@ export default (
) => {
const maxLogLevel = LOG_LEVEL_NUMBER[opts.logLevel ?? 'info'] ?? LOG_LEVEL_NUMBER['info']
const reportWarning = makeWarningReporter(opts)
return most.merge(log$.registry, log$.other)
.filter((obj) => LOG_LEVEL_NUMBER[obj.level] <= maxLogLevel &&
(obj.level !== 'info' || !obj['prefix'] || obj['prefix'] === opts.cwd))
.map((obj) => {
return Rx.merge(log$.registry, log$.other).pipe(
filter((obj) => LOG_LEVEL_NUMBER[obj.level] <= maxLogLevel &&
(obj.level !== 'info' || !obj['prefix'] || obj['prefix'] === opts.cwd)),
map((obj) => {
switch (obj.level) {
case 'warn': {
return reportWarning(obj)
}
case 'error':
if (obj['message']?.['prefix'] && obj['message']['prefix'] !== opts.cwd) {
return most.of({
return Rx.of({
msg: `${obj['message']['prefix'] as string}:` + os.EOL + reportError(obj, opts.config),
})
}
return most.of({ msg: reportError(obj, opts.config) })
return Rx.of({ msg: reportError(obj, opts.config) })
default:
return most.of({ msg: obj['message'] })
return Rx.of({ msg: obj['message'] })
}
})
)
}
// Sometimes, when installing new dependencies that rely on many peer dependencies,
@@ -65,21 +66,21 @@ function makeWarningReporter (
}
) {
let warningsCounter = 0
let collapsedWarnings: PushStream<{ msg: string }>
let collapsedWarnings: Rx.Subject<{ msg: string }>
return (obj: { prefix: string, message: string }) => {
warningsCounter++
if (warningsCounter <= MAX_SHOWN_WARNINGS) {
return most.of({ msg: autozoom(opts.cwd, obj.prefix, formatWarn(obj.message), opts) })
return Rx.of({ msg: autozoom(opts.cwd, obj.prefix, formatWarn(obj.message), opts) })
}
const warningMsg = formatWarn(`${warningsCounter - MAX_SHOWN_WARNINGS} other warnings`)
if (!collapsedWarnings) {
collapsedWarnings = new PushStream()
collapsedWarnings = new Rx.Subject()
// For some reason, without using setTimeout, the warning summary is printed above the rest of the warnings
// Even though the summary event happens last. Probably a bug in "most".
setTimeout(() => collapsedWarnings.next({ msg: warningMsg }), 0)
return most.from(collapsedWarnings.observable)
return Rx.from(collapsedWarnings)
}
setTimeout(() => collapsedWarnings!.next({ msg: warningMsg }), 0)
return most.never()
return Rx.NEVER
}
}

View File

@@ -1,8 +1,8 @@
import { ProgressLog, StageLog } from '@pnpm/core-loggers'
import PushStream from '@zkochan/zen-push'
import * as Rx from 'rxjs'
import { filter, map, mapTo, sampleTime, takeWhile, startWith, take } from 'rxjs/operators'
import { hlValue } from './outputConstants'
import { zoomOut } from './utils/zooming'
import most = require('most')
interface ProgressStats {
fetched: number
@@ -11,15 +11,15 @@ interface ProgressStats {
}
interface ModulesInstallProgress {
importingDone$: most.Stream<boolean>
progress$: most.Stream<ProgressStats>
importingDone$: Rx.Observable<boolean>
progress$: Rx.Observable<ProgressStats>
requirer: string
}
export default (
log$: {
progress: most.Stream<ProgressLog>
stage: most.Stream<StageLog>
progress: Rx.Observable<ProgressLog>
stage: Rx.Observable<StageLog>
},
opts: {
cwd: string
@@ -30,101 +30,100 @@ export default (
? throttledProgressOutput.bind(null, opts.throttleProgress)
: nonThrottledProgressOutput
return getModulesInstallProgress$(log$.stage, log$.progress)
.map(({ importingDone$, progress$, requirer }) => {
return getModulesInstallProgress$(log$.stage, log$.progress).pipe(
map(({ importingDone$, progress$, requirer }) => {
const output$ = progressOutput(importingDone$, progress$)
if (requirer === opts.cwd) {
return output$
}
return output$.map((msg: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any
msg['msg'] = zoomOut(opts.cwd, requirer, msg['msg'])
return msg
})
return output$.pipe(
map((msg: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any
msg['msg'] = zoomOut(opts.cwd, requirer, msg['msg'])
return msg
})
)
})
)
}
function throttledProgressOutput (
throttleProgress: number,
importingDone$: most.Stream<boolean>,
progress$: most.Stream<ProgressStats>
importingDone$: Rx.Observable<boolean>,
progress$: Rx.Observable<ProgressStats>
) {
// Reporting is done every `throttleProgress` milliseconds
// and once all packages are fetched.
const sampler = most.merge(
most.periodic(throttleProgress).until(importingDone$.skip(1)),
return Rx.combineLatest(
progress$.pipe(sampleTime(throttleProgress)),
importingDone$
)
return most.sample(
createStatusMessage,
sampler,
progress$,
importingDone$
)
// Avoid logs after all resolved packages were downloaded.
// Fixing issue: https://github.com/pnpm/pnpm/issues/1028#issuecomment-364782901
.skipAfter((msg) => msg['done'] === true)
.pipe(
map(createStatusMessage),
// Avoid logs after all resolved packages were downloaded.
// Fixing issue: https://github.com/pnpm/pnpm/issues/1028#issuecomment-364782901
takeWhile((msg) => msg['done'] !== true, true)
)
}
function nonThrottledProgressOutput (
importingDone$: most.Stream<boolean>,
progress$: most.Stream<ProgressStats>
importingDone$: Rx.Observable<boolean>,
progress$: Rx.Observable<ProgressStats>
) {
return most.combine(
createStatusMessage,
return Rx.combineLatest(
progress$,
importingDone$
)
.pipe(map(createStatusMessage))
}
function getModulesInstallProgress$ (
stage$: most.Stream<StageLog>,
progress$: most.Stream<ProgressLog>
): most.Stream<ModulesInstallProgress> {
const modulesInstallProgressPushStream = new PushStream<ModulesInstallProgress>()
stage$: Rx.Observable<StageLog>,
progress$: Rx.Observable<ProgressLog>
): Rx.Observable<ModulesInstallProgress> {
const modulesInstallProgressPushStream = new Rx.Subject<ModulesInstallProgress>()
const progessStatsPushStreamByRequirer = getProgessStatsPushStreamByRequirer(progress$)
const stagePushStreamByRequirer: {
[requirer: string]: PushStream<StageLog>
[requirer: string]: Rx.Subject<StageLog>
} = {}
stage$
.forEach((log: StageLog) => {
if (!stagePushStreamByRequirer[log.prefix]) {
stagePushStreamByRequirer[log.prefix] = new PushStream<StageLog>()
stagePushStreamByRequirer[log.prefix] = new Rx.Subject<StageLog>()
if (!progessStatsPushStreamByRequirer[log.prefix]) {
progessStatsPushStreamByRequirer[log.prefix] = new PushStream()
progessStatsPushStreamByRequirer[log.prefix] = new Rx.Subject()
}
modulesInstallProgressPushStream.next({
importingDone$: stage$ToImportingDone$(most.from(stagePushStreamByRequirer[log.prefix].observable)),
progress$: most.from(progessStatsPushStreamByRequirer[log.prefix].observable),
importingDone$: stage$ToImportingDone$(Rx.from(stagePushStreamByRequirer[log.prefix])),
progress$: Rx.from(progessStatsPushStreamByRequirer[log.prefix]),
requirer: log.prefix,
})
}
setTimeout(() => { // without this, `pnpm m i` might hang in some cases
stagePushStreamByRequirer[log.prefix].next(log)
if (log.stage === 'importing_done') {
progessStatsPushStreamByRequirer[log.prefix].complete()
stagePushStreamByRequirer[log.prefix].complete()
}
}, 0)
stagePushStreamByRequirer[log.prefix].next(log)
if (log.stage === 'importing_done') {
progessStatsPushStreamByRequirer[log.prefix].complete()
stagePushStreamByRequirer[log.prefix].complete()
}
})
.catch(() => {})
return most.from(modulesInstallProgressPushStream.observable)
return Rx.from(modulesInstallProgressPushStream)
}
function stage$ToImportingDone$ (stage$: most.Stream<StageLog>) {
function stage$ToImportingDone$ (stage$: Rx.Observable<StageLog>) {
return stage$
.filter((log: StageLog) => log.stage === 'importing_done')
.constant(true)
.take(1)
.startWith(false)
.multicast()
.pipe(
filter((log: StageLog) => log.stage === 'importing_done'),
mapTo(true),
take(1),
startWith(false)
)
}
function getProgessStatsPushStreamByRequirer (progress$: most.Stream<ProgressLog>) {
function getProgessStatsPushStreamByRequirer (progress$: Rx.Observable<ProgressLog>) {
const progessStatsPushStreamByRequirer: {
[requirer: string]: PushStream<ProgressStats>
[requirer: string]: Rx.Subject<ProgressStats>
} = {}
const previousProgressStatsByRequirer: { [requirer: string]: ProgressStats } = {}
@@ -149,7 +148,7 @@ function getProgessStatsPushStreamByRequirer (progress$: most.Stream<ProgressLog
break
}
if (!progessStatsPushStreamByRequirer[log.requester]) {
progessStatsPushStreamByRequirer[log.requester] = new PushStream<ProgressStats>()
progessStatsPushStreamByRequirer[log.requester] = new Rx.Subject<ProgressStats>()
}
progessStatsPushStreamByRequirer[log.requester].next(previousProgressStatsByRequirer[log.requester])
})
@@ -158,10 +157,7 @@ function getProgessStatsPushStreamByRequirer (progress$: most.Stream<ProgressLog
return progessStatsPushStreamByRequirer
}
function createStatusMessage (
progress: ProgressStats,
importingDone: boolean
) {
function createStatusMessage ([progress, importingDone]: [ProgressStats, boolean]) {
const msg = `Resolving: total ${hlValue(progress.resolved.toString())}, reused ${hlValue(progress.reused.toString())}, downloaded ${hlValue(progress.fetched.toString())}`
if (importingDone) {
return {

View File

@@ -1,19 +1,21 @@
import { RequestRetryLog } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import { map } from 'rxjs/operators'
import formatWarn from './utils/formatWarn'
import most = require('most')
import prettyMilliseconds = require('pretty-ms')
export default (
requestRetry$: most.Stream<RequestRetryLog>
requestRetry$: Rx.Observable<RequestRetryLog>
) => {
return requestRetry$
.map((log) => {
return requestRetry$.pipe(
map((log) => {
const retriesLeft = log.maxRetries - log.attempt + 1
const errorCode = log.error['httpStatusCode'] || log.error['status'] || log.error['errno'] || log.error['code']
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
const msg = `${log.method} ${log.url} error (${errorCode}). \
Will retry in ${prettyMilliseconds(log.timeout, { verbose: true })}. \
${retriesLeft} retries left.`
return most.of({ msg: formatWarn(msg) })
return Rx.of({ msg: formatWarn(msg) })
})
)
}

View File

@@ -1,5 +1,6 @@
import { ScopeLog } from '@pnpm/core-loggers'
import most = require('most')
import * as Rx from 'rxjs'
import { map, take } from 'rxjs/operators'
const COMMANDS_THAT_REPORT_SCOPE = new Set([
'install',
@@ -14,21 +15,21 @@ const COMMANDS_THAT_REPORT_SCOPE = new Set([
])
export default (
scope$: most.Stream<ScopeLog>,
scope$: Rx.Observable<ScopeLog>,
opts: {
isRecursive: boolean
cmd: string
}
) => {
if (!COMMANDS_THAT_REPORT_SCOPE.has(opts.cmd)) {
return most.never()
return Rx.NEVER
}
return scope$
.take(1)
.map((log) => {
return scope$.pipe(
take(1),
map((log) => {
if (log.selected === 1 && typeof log.total !== 'number') {
if (!log.workspacePrefix) return most.never()
if (!opts.isRecursive) return most.of({ msg: 'Scope: current workspace package' })
if (!log.workspacePrefix) return Rx.NEVER
if (!opts.isRecursive) return Rx.of({ msg: 'Scope: current workspace package' })
}
let msg = 'Scope: '
@@ -47,6 +48,7 @@ export default (
msg += ' projects'
}
return most.of({ msg })
return Rx.of({ msg })
})
)
}

View File

@@ -1,18 +1,20 @@
import { SkippedOptionalDependencyLog } from '@pnpm/core-loggers'
import most = require('most')
import * as Rx from 'rxjs'
import { filter, map } from 'rxjs/operators'
export default (
skippedOptionalDependency$: most.Stream<SkippedOptionalDependencyLog>,
skippedOptionalDependency$: Rx.Observable<SkippedOptionalDependencyLog>,
opts: {
cwd: string
}
) => {
return skippedOptionalDependency$
.filter((log) => Boolean(log['prefix'] === opts.cwd && log.parents && log.parents.length === 0))
.map((log) => most.of({
return skippedOptionalDependency$.pipe(
filter((log) => Boolean(log['prefix'] === opts.cwd && log.parents && log.parents.length === 0)),
map((log) => Rx.of({
msg: `info: ${
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
log.package['id'] || log.package.name && (`${log.package.name}@${log.package.version}`) || log.package['pref']
} is an optional dependency and failed compatibility check. Excluding it from installation.`,
}))
)
}

View File

@@ -1,18 +1,19 @@
import { StatsLog } from '@pnpm/core-loggers'
import { zoomOut } from './utils/zooming'
import * as Rx from 'rxjs'
import { filter, take, reduce, map } from 'rxjs/operators'
import { EOL } from '../constants'
import {
ADDED_CHAR,
REMOVED_CHAR,
} from './outputConstants'
import { zoomOut } from './utils/zooming'
import chalk = require('chalk')
import most = require('most')
import R = require('ramda')
import stringLength = require('string-length')
export default (
log$: {
stats: most.Stream<StatsLog>
stats: Rx.Observable<StatsLog>
},
opts: {
cmd: string
@@ -23,7 +24,7 @@ export default (
) => {
const stats$ = opts.isRecursive
? log$.stats
: log$.stats.filter((log) => log.prefix !== opts.cwd)
: log$.stats.pipe(filter((log) => log.prefix !== opts.cwd))
const outputs = [
statsForNotCurrentPackage(stats$, {
@@ -45,32 +46,30 @@ export default (
}
function statsForCurrentPackage (
stats$: most.Stream<StatsLog>,
stats$: Rx.Observable<StatsLog>,
opts: {
cmd: string
currentPrefix: string
width: number
}
) {
return most.fromPromise(
stats$
.filter((log) => log.prefix === opts.currentPrefix)
.take((opts.cmd === 'install' || opts.cmd === 'install-test' || opts.cmd === 'add' || opts.cmd === 'update') ? 2 : 1)
.reduce((acc, log) => {
if (typeof log['added'] === 'number') {
acc['added'] = log['added']
} else if (typeof log['removed'] === 'number') {
acc['removed'] = log['removed']
}
return acc
}, {})
)
.map((stats) => {
return stats$.pipe(
filter((log) => log.prefix === opts.currentPrefix),
take((opts.cmd === 'install' || opts.cmd === 'install-test' || opts.cmd === 'add' || opts.cmd === 'update') ? 2 : 1),
reduce((acc, log) => {
if (typeof log['added'] === 'number') {
acc['added'] = log['added']
} else if (typeof log['removed'] === 'number') {
acc['removed'] = log['removed']
}
return acc
}, {}),
map((stats) => {
if (!stats['removed'] && !stats['added']) {
if (opts.cmd === 'link') {
return most.never()
return Rx.NEVER
}
return most.of({ msg: 'Already up-to-date' })
return Rx.of({ msg: 'Already up-to-date' })
}
let msg = 'Packages:'
@@ -83,22 +82,24 @@ function statsForCurrentPackage (
msg += ' ' + chalk.red(`-${stats['removed'].toString()}`)
}
msg += EOL + printPlusesAndMinuses(opts.width, (stats['added'] || 0), (stats['removed'] || 0))
return most.of({ msg })
return Rx.of({ msg })
})
)
}
function statsForNotCurrentPackage (
stats$: most.Stream<StatsLog>,
stats$: Rx.Observable<StatsLog>,
opts: {
cmd: string
currentPrefix: string
width: number
}
) {
const stats = {}
const cookedStats$ = (
opts.cmd !== 'remove'
? stats$
.loop((stats, log) => {
? stats$.pipe(
map((log) => {
// As of pnpm v2.9.0, during `pnpm recursive link`, logging of removed stats happens twice
// 1. during linking
// 2. during installing
@@ -115,14 +116,15 @@ function statsForNotCurrentPackage (
} else {
const value = { ...stats[log.prefix], ...log }
delete stats[log.prefix]
return { seed: stats, value }
return value
}
}, {})
)
: stats$
)
return cookedStats$
.filter((stats) => stats !== null && (stats['removed'] || stats['added']))
.map((stats) => {
return cookedStats$.pipe(
filter((stats) => stats !== null && (stats['removed'] || stats['added'])),
map((stats) => {
const parts = [] as string[]
if (stats['added']) {
@@ -137,8 +139,9 @@ function statsForNotCurrentPackage (
let msg = zoomOut(opts.currentPrefix, stats['prefix'], parts.join(' '))
const rest = Math.max(0, opts.width - 1 - stringLength(msg))
msg += ' ' + printPlusesAndMinuses(rest, roundStats(stats['added'] || 0), roundStats(stats['removed'] || 0))
return most.of({ msg })
return Rx.of({ msg })
})
)
}
function padStep (s: string, step: number) {

View File

@@ -1,31 +1,32 @@
import { Config } from '@pnpm/config'
import {
DeprecationLog,
PackageManifestLog,
RootLog,
SummaryLog,
} from '@pnpm/core-loggers'
import { Config } from '@pnpm/config'
import * as Rx from 'rxjs'
import { map, take } from 'rxjs/operators'
import { EOL } from '../constants'
import {
ADDED_CHAR,
REMOVED_CHAR,
} from './outputConstants'
import getPkgsDiff, {
PackageDiff,
propertyByDependencyType,
} from './pkgsDiff'
import path = require('path')
import {
ADDED_CHAR,
REMOVED_CHAR,
} from './outputConstants'
import chalk = require('chalk')
import most = require('most')
import path = require('path')
import R = require('ramda')
import semver = require('semver')
export default (
log$: {
deprecation: most.Stream<DeprecationLog>
summary: most.Stream<SummaryLog>
root: most.Stream<RootLog>
packageManifest: most.Stream<PackageManifestLog>
deprecation: Rx.Observable<DeprecationLog>
summary: Rx.Observable<SummaryLog>
root: Rx.Observable<RootLog>
packageManifest: Rx.Observable<PackageManifestLog>
},
opts: {
cwd: string
@@ -34,33 +35,33 @@ export default (
) => {
const pkgsDiff$ = getPkgsDiff(log$, { prefix: opts.cwd })
const summaryLog$ = log$.summary
.take(1)
const summaryLog$ = log$.summary.pipe(take(1))
return most.combine(
(pkgsDiff) => {
let msg = ''
for (const depType of ['prod', 'optional', 'peer', 'dev', 'nodeModulesOnly']) {
const diffs = R.values(pkgsDiff[depType])
if (diffs.length) {
msg += EOL
if (opts.pnpmConfig?.global) {
msg += chalk.cyanBright(`${opts.cwd}:`)
} else {
msg += chalk.cyanBright(`${propertyByDependencyType[depType] as string}:`)
}
msg += EOL
msg += printDiffs(diffs, { prefix: opts.cwd })
msg += EOL
}
}
return { msg }
},
return Rx.combineLatest(
pkgsDiff$,
summaryLog$
)
.take(1)
.map(most.of)
.pipe(
take(1),
map(([pkgsDiff]) => {
let msg = ''
for (const depType of ['prod', 'optional', 'peer', 'dev', 'nodeModulesOnly']) {
const diffs = R.values(pkgsDiff[depType])
if (diffs.length) {
msg += EOL
if (opts.pnpmConfig?.global) {
msg += chalk.cyanBright(`${opts.cwd}:`)
} else {
msg += chalk.cyanBright(`${propertyByDependencyType[depType] as string}:`)
}
msg += EOL
msg += printDiffs(diffs, { prefix: opts.cwd })
msg += EOL
}
}
return Rx.of({ msg })
})
)
}
function printDiffs (

View File

@@ -1,11 +1,11 @@
import { Config } from '@pnpm/config'
import { Log } from '@pnpm/core-loggers'
import * as Rx from 'rxjs'
import reportError from './reportError'
import chalk = require('chalk')
import most = require('most')
export default function (
log$: most.Stream<Log>,
log$: Rx.Observable<Log>,
config?: Config
) {
log$.subscribe({

View File

@@ -20,6 +20,7 @@ import './reportingLifecycleScripts'
import './reportingProgress'
import './reportingRequestRetry'
import './reportingScope'
import { map, skip, take } from 'rxjs/operators'
import chalk = require('chalk')
import normalizeNewline = require('normalize-newline')
import path = require('path')
@@ -181,7 +182,7 @@ test('prints summary (of current package only)', t => {
t.plan(1)
output$.skip(2).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(2), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -264,7 +265,7 @@ test('prints summary for global installation', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -308,7 +309,7 @@ test('prints added peer dependency', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -375,7 +376,7 @@ test('prints summary correctly when the same package is specified both in option
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -395,7 +396,7 @@ test('prints summary when some packages fail', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -462,7 +463,7 @@ test('prints info', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -483,7 +484,7 @@ test('prints added/removed stats during installation', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -506,7 +507,7 @@ test('prints added/removed stats during installation when 0 removed', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -529,7 +530,7 @@ test('prints only the added stats if nothing was removed', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -551,7 +552,7 @@ test('prints only the removed stats if nothing was added', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -574,7 +575,7 @@ test('prints only the added stats if nothing was removed and a lot added', t =>
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -597,7 +598,7 @@ test('prints only the removed stats if nothing was added and a lot removed', t =
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -620,7 +621,7 @@ test('prints at least one remove sign when removed !== 0', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -644,7 +645,7 @@ test('prints at least one add sign when added !== 0', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -665,7 +666,7 @@ test('prints just removed during uninstallation', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -718,7 +719,7 @@ test('prints added/removed stats and warnings during recursive installation', t
t.plan(1)
output$.skip(8).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(8), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -751,7 +752,7 @@ test('recursive installation: prints only the added stats if nothing was removed
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -775,7 +776,7 @@ test('recursive installation: prints only the removed stats if nothing was added
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -799,7 +800,7 @@ test('recursive installation: prints at least one remove sign when removed !== 0
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -823,7 +824,7 @@ test('recursive installation: prints at least one add sign when added !== 0', t
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -846,7 +847,7 @@ test('recursive uninstall: prints removed packages number', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -873,7 +874,7 @@ test('install: print hook message', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -900,7 +901,7 @@ test('recursive: print hook message', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -934,7 +935,7 @@ test('prints skipped optional dependency info message', t => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -960,7 +961,7 @@ test('logLevel=default', t => {
t.plan(1)
output$.skip(2).take(1).subscribe({
output$.pipe(skip(2), take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -991,7 +992,7 @@ test('logLevel=warn', t => {
t.plan(1)
output$.skip(1).take(1).subscribe({
output$.pipe(skip(1), take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -1021,7 +1022,7 @@ test('logLevel=error', t => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -1053,7 +1054,7 @@ test('warnings are collapsed', t => {
t.plan(1)
output$.skip(6).take(1).subscribe({
output$.pipe(skip(6), take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {

View File

@@ -4,6 +4,7 @@ import {
createStreamParser,
} from '@pnpm/logger'
import delay from 'delay'
import { take } from 'rxjs/operators'
import test = require('tape')
test('print context and import method info', (t) => {
@@ -25,7 +26,7 @@ test('print context and import method info', (t) => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {

View File

@@ -3,6 +3,7 @@ import PnpmError from '@pnpm/error'
import logger, {
createStreamParser,
} from '@pnpm/logger'
import { map, take } from 'rxjs/operators'
import path = require('path')
import chalk = require('chalk')
import loadJsonFile = require('load-json-file')
@@ -23,7 +24,7 @@ test('prints generic error', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -45,7 +46,7 @@ test('prints generic error when recursive install fails', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -64,7 +65,7 @@ test('prints no matching version error when many dist-tags exist', async (t) =>
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -94,7 +95,7 @@ test('prints no matching version error when only the latest dist-tag exists', as
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -119,7 +120,7 @@ test('prints suggestions when an internet-connection related error happens', asy
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -156,7 +157,7 @@ test('prints test error', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -178,7 +179,7 @@ test('prints command error with exit code', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -201,7 +202,7 @@ test('prints command error without exit code', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -223,7 +224,7 @@ test('prints unsupported pnpm version error', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -255,7 +256,7 @@ test('prints unsupported Node version error', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -284,7 +285,7 @@ test('prints unsupported pnpm and Node versions error', async (t) => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -326,7 +327,7 @@ test('prints error even if the error object not passed in through the message ob
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -347,7 +348,7 @@ test('prints error without packages stacktrace when pkgsStack is empty', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -374,7 +375,7 @@ test('prints error with packages stacktrace - depth 1 and hint', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -408,7 +409,7 @@ test('prints error with packages stacktrace - depth 2', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -430,7 +431,7 @@ test('prints error and hint', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -463,7 +464,7 @@ test('prints authorization error with auth settings', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -496,7 +497,7 @@ test('prints authorization error without auth settings, where there are none', t
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {

View File

@@ -1,6 +1,7 @@
import { lifecycleLogger } from '@pnpm/core-loggers'
import { toOutput$ } from '@pnpm/default-reporter'
import { createStreamParser } from '@pnpm/logger'
import { map, skip, take } from 'rxjs/operators'
import path = require('path')
import chalk = require('chalk')
import normalizeNewline = require('normalize-newline')
@@ -114,7 +115,7 @@ test('groups lifecycle output', t => {
t.plan(1)
output$.skip(9).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(9), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: (output: string) => {
@@ -235,7 +236,7 @@ test('groups lifecycle output when append-only is used', t => {
const allOutputs = [] as string[]
output$.take(11).map(normalizeNewline).subscribe({
output$.pipe(take(11), map(normalizeNewline)).subscribe({
complete: () => {
t.equal(allOutputs.join(EOL), `\
${chalk.cyan('packages/foo')} ${PREINSTALL}$ node foo
@@ -355,7 +356,7 @@ test('groups lifecycle output when streamLifecycleOutput is used', t => {
t.plan(1)
output$.skip(11).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(11), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: (output: string) => {
@@ -409,7 +410,7 @@ test('collapse lifecycle output when it has too many lines', t => {
t.plan(1)
output$.skip(101).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(101), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: (output: string) => {
@@ -523,7 +524,7 @@ test('collapses lifecycle output of packages from node_modules', t => {
t.plan(1)
output$.skip(5).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(5), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: (output: string) => {
@@ -569,7 +570,7 @@ test('output of failed optional dependency is not shown', t => {
t.plan(1)
output$.skip(1).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(1), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: (output: string) => {
@@ -611,7 +612,7 @@ test('output of failed non-optional dependency is printed', t => {
t.plan(1)
output$.skip(1).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(1), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: (output: string) => {
@@ -671,7 +672,7 @@ test['skip']('prints lifecycle progress', t => {
const childOutputColor = chalk.grey
const childOutputError = chalk.red
output$.skip(3).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(3), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {

View File

@@ -9,9 +9,8 @@ import { toOutput$ } from '@pnpm/default-reporter'
import logger, {
createStreamParser,
} from '@pnpm/logger'
import delay from 'delay'
import { map, skip, take } from 'rxjs/operators'
import chalk = require('chalk')
import most = require('most')
import normalizeNewline = require('normalize-newline')
import test = require('tape')
@@ -42,7 +41,7 @@ test('prints progress beginning', t => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -72,7 +71,7 @@ test('prints progress beginning of node_modules from not cwd', t => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -105,7 +104,7 @@ test('prints progress beginning when appendOnly is true', t => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -138,7 +137,7 @@ test('prints progress beginning during recursive install', t => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -159,7 +158,7 @@ test('prints progress on first download', async t => {
streamParser: createStreamParser(),
})
output$.skip(1).take(1).subscribe({
output$.pipe(skip(1), take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -179,8 +178,6 @@ test('prints progress on first download', async t => {
status: 'resolved',
})
await delay(10)
progressLogger.debug({
packageId,
requester: '/src/project',
@@ -200,7 +197,7 @@ test('moves fixed line to the end', async t => {
streamParser: createStreamParser(),
})
output$.skip(3).take(1).map(normalizeNewline).subscribe({
output$.pipe(skip(3), take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -221,8 +218,6 @@ test('moves fixed line to the end', async t => {
status: 'resolved',
})
await delay(10)
progressLogger.debug({
packageId,
requester: prefix,
@@ -230,8 +225,6 @@ test('moves fixed line to the end', async t => {
})
logger.warn({ message: 'foo', prefix })
await delay(10) // w/o delay warning goes below for some reason. Started to happen after switch to most
stageLogger.debug({
prefix: prefix,
stage: 'resolution_done',
@@ -255,7 +248,7 @@ test('prints "Already up-to-date"', t => {
t.plan(1)
output$.take(1).map(normalizeNewline).subscribe({
output$.pipe(take(1), map(normalizeNewline)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -267,8 +260,7 @@ test('prints "Already up-to-date"', t => {
test('prints progress of big files download', async t => {
t.plan(6)
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
let output$ = toOutput$({
const output$ = toOutput$({
context: {
argv: ['install'],
config: { dir: '/src/project' } as Config,
@@ -276,66 +268,48 @@ test('prints progress of big files download', async t => {
reportingOptions: { throttleProgress: 0 },
streamParser: createStreamParser(),
})
.map(normalizeNewline) as most.Stream<string>
const stream$: Array<most.Stream<string>> = []
const pkgId1 = 'registry.npmjs.org/foo/1.0.0'
const pkgId2 = 'registry.npmjs.org/bar/2.0.0'
const pkgId3 = 'registry.npmjs.org/qar/3.0.0'
stream$.push(
output$.take(1)
.tap(output => t.equal(output, `Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}`))
)
output$ = output$.skip(1)
stream$.push(
output$.take(1)
.tap(output => t.equal(output, `\
output$.pipe(
map(normalizeNewline),
map((output, index) => {
switch (index) {
case 0:
t.equal(output, `Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}`)
return
case 1:
t.equal(output, `\
Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}
Downloading ${hlPkgId(pkgId1)}: ${hlValue('0 B')}/${hlValue('10.5 MB')}`))
)
output$ = output$.skip(1)
stream$.push(
output$.take(1)
.tap(output => t.equal(output, `\
Downloading ${hlPkgId(pkgId1)}: ${hlValue('0 B')}/${hlValue('10.5 MB')}`)
return
case 2:
t.equal(output, `\
Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}
Downloading ${hlPkgId(pkgId1)}: ${hlValue('5.77 MB')}/${hlValue('10.5 MB')}`))
)
output$ = output$.skip(2)
stream$.push(
output$.take(1)
.tap(output => t.equal(output, `\
Downloading ${hlPkgId(pkgId1)}: ${hlValue('5.77 MB')}/${hlValue('10.5 MB')}`)
return
case 4:
t.equal(output, `\
Resolving: total ${hlValue('2')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}
Downloading ${hlPkgId(pkgId1)}: ${hlValue('7.34 MB')}/${hlValue('10.5 MB')}`, 'downloading of small package not reported'))
)
output$ = output$.skip(3)
stream$.push(
output$.take(1)
.tap(output => t.equal(output, `\
Downloading ${hlPkgId(pkgId1)}: ${hlValue('7.34 MB')}/${hlValue('10.5 MB')}`, 'downloading of small package not reported')
return
case 7:
t.equal(output, `\
Resolving: total ${hlValue('3')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}
Downloading ${hlPkgId(pkgId1)}: ${hlValue('7.34 MB')}/${hlValue('10.5 MB')}
Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`))
)
output$ = output$.skip(1)
stream$.push(
output$.take(1)
.tap(output => t.equal(output, `\
Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)
return
case 8:
t.equal(output, `\
Downloading ${hlPkgId(pkgId1)}: ${hlValue('10.5 MB')}/${hlValue('10.5 MB')}, done
Resolving: total ${hlValue('3')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}
Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`))
Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)
return // eslint-disable-line
}
})
)
most.mergeArray(stream$)
.subscribe({
complete: () => t.end(),
error: t.end,
@@ -353,8 +327,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`))
status: 'resolved',
})
await delay(10)
fetchingProgressLogger.debug({
attempt: 1,
packageId: pkgId1,
@@ -362,8 +334,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`))
status: 'started',
})
await delay(10)
fetchingProgressLogger.debug({
downloaded: 1024 * 1024 * 5.5, // 5.5 MB
packageId: pkgId1,
@@ -376,8 +346,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`))
status: 'resolved',
})
await delay(10)
fetchingProgressLogger.debug({
attempt: 1,
packageId: pkgId1,
@@ -404,8 +372,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`))
status: 'started',
})
await delay(10)
fetchingProgressLogger.debug({
downloaded: 1024 * 1024 * 19, // 19 MB
packageId: pkgId3,

View File

@@ -3,6 +3,7 @@ import { toOutput$ } from '@pnpm/default-reporter'
import {
createStreamParser,
} from '@pnpm/logger'
import { take } from 'rxjs/operators'
import chalk = require('chalk')
import test = require('tape')
@@ -27,7 +28,7 @@ test('print warning about request retry', (t) => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {

View File

@@ -3,6 +3,7 @@ import { toOutput$ } from '@pnpm/default-reporter'
import logger, {
createStreamParser,
} from '@pnpm/logger'
import { take } from 'rxjs/operators'
import test = require('tape')
const scopeLogger = logger<object>('scope')
@@ -22,7 +23,7 @@ test('prints scope of non-recursive install in a workspace', (t) => {
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -48,7 +49,7 @@ test('prints scope of recursive install in a workspace when not all packages are
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -74,7 +75,7 @@ test('prints scope of recursive install in a workspace when all packages are sel
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -99,7 +100,7 @@ test('prints scope of recursive install not in a workspace when not all packages
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {
@@ -124,7 +125,7 @@ test('prints scope of recursive install not in a workspace when all packages are
t.plan(1)
output$.take(1).subscribe({
output$.pipe(take(1)).subscribe({
complete: () => t.end(),
error: t.end,
next: output => {

60
pnpm-lock.yaml generated
View File

@@ -258,16 +258,15 @@ importers:
'@pnpm/core-loggers': 'link:../core-loggers'
'@pnpm/error': 'link:../error'
'@pnpm/types': 'link:../types'
'@zkochan/zen-push': 1.0.0
ansi-diff: 1.1.1
chalk: 4.1.0
most: 1.8.1_most@1.8.1
normalize-path: 3.0.0
pretty-bytes: 5.4.1
pretty-ms: 7.0.0
pretty-time: 1.1.0
ramda: 0.27.1
right-pad: 1.0.1
rxjs: 6.6.3
semver: 7.3.2
stacktracey: 1.2.127
string-length: 4.0.1
@@ -294,13 +293,11 @@ importers:
'@types/pretty-time': ^1.1.0
'@types/ramda': ^0.27.15
'@types/semver': ^7.3.3
'@zkochan/zen-push': 1.0.0
ansi-diff: ^1.1.1
chalk: ^4.1.0
delay: ^4.4.0
ghooks: 2.0.4
load-json-file: ^6.2.0
most: ^1.8.1
normalize-newline: 3.0.0
normalize-path: ^3.0.0
pretty-bytes: ^5.4.1
@@ -308,6 +305,7 @@ importers:
pretty-time: ^1.1.0
ramda: ^0.27.1
right-pad: ^1.0.1
rxjs: ^6.6.3
semver: ^7.3.2
stacktracey: ^1.2.127
string-length: ^4.0.1
@@ -3450,19 +3448,6 @@ packages:
dev: true
resolution:
integrity: sha512-J6VClfQSVgR6958eIDTGjfdCrELy1eT+SHeoSMomnvRQVktZMnEA5edIr5ovRFNw5y+Bk/jyoevPzGYod96mhw==
/@most/multicast/1.3.0_most@1.8.1:
dependencies:
'@most/prelude': 1.7.3
most: 1.8.1_most@1.8.1
dev: false
peerDependencies:
most: ^1.0.1
resolution:
integrity: sha512-DWH8AShgp5bXn+auGzf5tzPxvpmEvQJd0CNsApOci1LDF4eAEcnw4HQOr2Jaa+L92NbDYFKBSXxll+i7r1ikvw==
/@most/prelude/1.7.3:
dev: false
resolution:
integrity: sha512-qWWEnA22UP1lzFfKx75XMut6DUUXGRKe7qv2k+Bgs7ju8lwb5RjsZYyQZ+VcsYvHcIavHKzseLlBMLOe2CvUZw==
/@mrmlnc/readdir-enhanced/2.2.1:
dependencies:
call-me-maybe: 1.0.1
@@ -3912,10 +3897,6 @@ packages:
dev: true
resolution:
integrity: sha512-o5Pl/qux8JEuFpof5fUg/Fl6R3N26ExJlsmWpB67ayrEJDMIxWdM9NDJacisXeNB7YW+vij8onctH8Pr1Zhi5g==
/@types/zen-observable/0.8.1:
dev: false
resolution:
integrity: sha512-wmk0xQI6Yy7Fs/il4EpOcflG4uonUpYGqvZARESLc2oy4u69fkatFLbJOeW4Q6awO15P4rduAe6xkwHevpXcUQ==
/@typescript-eslint/eslint-plugin/4.1.0:
dependencies:
'@typescript-eslint/experimental-utils': 4.1.0
@@ -4148,15 +4129,6 @@ packages:
node: '>=8.15'
resolution:
integrity: sha512-uWMEF7fdc6C3VGTaW6Z9G9rYS41ulS0Lz+a3lGlDGji42kI6FSVVLI9s8bZ4ZR4l4Hs28MJHHVN8cOqvNlN86w==
/@zkochan/zen-push/1.0.0:
dependencies:
'@types/zen-observable': 0.8.1
zen-observable: 0.8.15
dev: false
engines:
node: '>=6'
resolution:
integrity: sha512-nHBmtlomBTExmjq9VlSqh33sJI08L4n6g4zkzSh8cHBz+J5uscuXLeLTAZwo9/6W6BngC2z+4G7RKDFgAtB0Fw==
/JSONStream/1.3.5:
dependencies:
jsonparse: 1.3.1
@@ -9214,16 +9186,6 @@ packages:
optional: true
resolution:
integrity: sha512-al0MUK7cpIcglMv3YF13qSgdAIqxHTO7brRtaz3DlSULbqfazqkc5kEjNrLDOM7fsjshoFIihnU8snrP7zUvhQ==
/most/1.8.1_most@1.8.1:
dependencies:
'@most/multicast': 1.3.0_most@1.8.1
'@most/prelude': 1.7.3
symbol-observable: 1.2.0
dev: false
peerDependencies:
most: '*'
resolution:
integrity: sha512-sN3lHtEvVNq5ay/pT93fKQ2XVW2uwbUypSWGt/DYCxGKuyvxaNd8mProK0M4k6irqeh1WD3pWrg4RdrCPohqDQ==
/ms/2.0.0:
resolution:
integrity: sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=
@@ -11104,6 +11066,14 @@ packages:
dev: true
resolution:
integrity: sha1-Gc5QLKVyZl87ZHsQk5+X/RYV8QI=
/rxjs/6.6.3:
dependencies:
tslib: 1.13.0
dev: false
engines:
npm: '>=2.0.0'
resolution:
integrity: sha512-trsQc+xYYXZ3urjOiJOuCOa5N3jAZ3eiSpQB5hIT8zGlL2QfnHLJ2r7GMkBGuIausdJN1OneaI6gQlsqNHHmZQ==
/safe-buffer/5.1.2:
resolution:
integrity: sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==
@@ -11797,12 +11767,6 @@ packages:
node: '>=8'
resolution:
integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==
/symbol-observable/1.2.0:
dev: false
engines:
node: '>=0.10.0'
resolution:
integrity: sha512-e900nM8RRtGhlV36KGEU9k65K3mPb1WV70OdjfxlG2EAuM1noi/E/BaW/uMhL7bPEssK8QV57vN3esixjUvcXQ==
/symbol-tree/3.2.4:
dev: true
resolution:
@@ -12989,7 +12953,3 @@ packages:
node: '>=6'
resolution:
integrity: sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==
/zen-observable/0.8.15:
dev: false
resolution:
integrity: sha512-PQ2PC7R9rslx84ndNBZB/Dkv8V8fZEpk83RLgXtYd0fwUgEjseMn1Dgajh2x6S8QbZAFa9p2qVCEuYZNgve0dQ==