From 09b42d3ab06caa81ec8f75c53453625496bc0e8a Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Thu, 10 Sep 2020 11:46:38 +0300 Subject: [PATCH] refactor: use RxJS instead of "most" in @pnpm/default-reporter PR #2841 --- .changeset/few-balloons-press.md | 5 + packages/default-reporter/package.json | 3 +- packages/default-reporter/src/index.ts | 100 ++++++------ packages/default-reporter/src/mergeOutputs.ts | 77 ++++++---- .../src/reporterForClient/index.ts | 44 +++--- .../src/reporterForClient/pkgsDiff.ts | 145 +++++++++++------- .../reportBigTarballsProgress.ts | 25 +-- .../src/reporterForClient/reportContext.ts | 65 ++++---- .../reporterForClient/reportDeprecations.ts | 21 +-- .../src/reporterForClient/reportHooks.ts | 11 +- .../reporterForClient/reportInstallChecks.ts | 15 +- .../reportLifecycleScripts.ts | 25 +-- .../src/reporterForClient/reportMisc.ts | 33 ++-- .../src/reporterForClient/reportProgress.ts | 114 +++++++------- .../reporterForClient/reportRequestRetry.ts | 12 +- .../src/reporterForClient/reportScope.ts | 20 +-- .../reportSkippedOptionalDependencies.ts | 12 +- .../src/reporterForClient/reportStats.ts | 63 ++++---- .../src/reporterForClient/reportSummary.ts | 69 +++++---- .../default-reporter/src/reporterForServer.ts | 4 +- packages/default-reporter/test/index.ts | 57 +++---- .../default-reporter/test/reportingContext.ts | 3 +- .../default-reporter/test/reportingErrors.ts | 37 ++--- .../test/reportingLifecycleScripts.ts | 17 +- .../test/reportingProgress.ts | 110 +++++-------- .../test/reportingRequestRetry.ts | 3 +- .../default-reporter/test/reportingScope.ts | 11 +- pnpm-lock.yaml | 60 ++------ 28 files changed, 577 insertions(+), 584 deletions(-) create mode 100644 .changeset/few-balloons-press.md diff --git a/.changeset/few-balloons-press.md b/.changeset/few-balloons-press.md new file mode 100644 index 0000000000..af220f5024 --- /dev/null +++ b/.changeset/few-balloons-press.md @@ -0,0 +1,5 @@ +--- +"@pnpm/default-reporter": minor +--- + +Use RxJS instead of "most". diff --git a/packages/default-reporter/package.json b/packages/default-reporter/package.json index 8877506ce0..49dfdbd56a 100644 --- a/packages/default-reporter/package.json +++ b/packages/default-reporter/package.json @@ -34,16 +34,15 @@ "@pnpm/core-loggers": "workspace:4.2.0", "@pnpm/error": "workspace:1.3.1", "@pnpm/types": "workspace:6.2.0", - "@zkochan/zen-push": "1.0.0", "ansi-diff": "^1.1.1", "chalk": "^4.1.0", - "most": "^1.8.1", "normalize-path": "^3.0.0", "pretty-bytes": "^5.4.1", "pretty-ms": "^7.0.0", "pretty-time": "^1.1.0", "ramda": "^0.27.1", "right-pad": "^1.0.1", + "rxjs": "^6.6.3", "semver": "^7.3.2", "stacktracey": "^1.2.127", "string-length": "^4.0.1", diff --git a/packages/default-reporter/src/index.ts b/packages/default-reporter/src/index.ts index bd21350bb9..6fb5800946 100644 --- a/packages/default-reporter/src/index.ts +++ b/packages/default-reporter/src/index.ts @@ -1,13 +1,13 @@ import { Config } from '@pnpm/config' import * as logs from '@pnpm/core-loggers' import { LogLevel } from '@pnpm/logger' -import PushStream from '@zkochan/zen-push' +import * as Rx from 'rxjs' +import { map, mergeAll } from 'rxjs/operators' import { EOL } from './constants' import mergeOutputs from './mergeOutputs' import reporterForClient from './reporterForClient' import reporterForServer from './reporterForServer' import createDiffer = require('ansi-diff') -import most = require('most') export default function ( opts: { @@ -26,7 +26,8 @@ export default function ( } ) { if (opts.context.argv[0] === 'server') { - const log$ = most.fromEvent('data', opts.streamParser) + // eslint-disable-next-line + const log$ = Rx.fromEvent(opts.streamParser as any, 'data') reporterForServer(log$, opts.context.config) return } @@ -75,28 +76,28 @@ export function toOutput$ ( config?: Config } } -): most.Stream { +): Rx.Observable { opts = opts || {} - const contextPushStream = new PushStream() - const fetchingProgressPushStream = new PushStream() - const progressPushStream = new PushStream() - const stagePushStream = new PushStream() - const deprecationPushStream = new PushStream() - const summaryPushStream = new PushStream() - const lifecyclePushStream = new PushStream() - const statsPushStream = new PushStream() - const packageImportMethodPushStream = new PushStream() - const installCheckPushStream = new PushStream() - const registryPushStream = new PushStream() - const rootPushStream = new PushStream() - const packageManifestPushStream = new PushStream() - const linkPushStream = new PushStream() - const otherPushStream = new PushStream() - const hookPushStream = new PushStream() - const skippedOptionalDependencyPushStream = new PushStream() - const scopePushStream = new PushStream() - const requestRetryPushStream = new PushStream() - setTimeout(() => { // setTimeout is a workaround for a strange bug in most https://github.com/cujojs/most/issues/491 + const contextPushStream = new Rx.Subject() + const fetchingProgressPushStream = new Rx.Subject() + const progressPushStream = new Rx.Subject() + const stagePushStream = new Rx.Subject() + const deprecationPushStream = new Rx.Subject() + const summaryPushStream = new Rx.Subject() + const lifecyclePushStream = new Rx.Subject() + const statsPushStream = new Rx.Subject() + const packageImportMethodPushStream = new Rx.Subject() + const installCheckPushStream = new Rx.Subject() + const registryPushStream = new Rx.Subject() + const rootPushStream = new Rx.Subject() + const packageManifestPushStream = new Rx.Subject() + const linkPushStream = new Rx.Subject() + const otherPushStream = new Rx.Subject() + const hookPushStream = new Rx.Subject() + const skippedOptionalDependencyPushStream = new Rx.Subject() + const scopePushStream = new Rx.Subject() + const requestRetryPushStream = new Rx.Subject() + setTimeout(() => { opts.streamParser['on']('data', (log: logs.Log) => { switch (log.name) { case 'pnpm:context': @@ -163,27 +164,27 @@ export function toOutput$ ( }) }, 0) const log$ = { - context: most.from(contextPushStream.observable), - deprecation: most.from(deprecationPushStream.observable), - fetchingProgress: most.from(fetchingProgressPushStream.observable), - hook: most.from(hookPushStream.observable), - installCheck: most.from(installCheckPushStream.observable), - lifecycle: most.from(lifecyclePushStream.observable), - link: most.from(linkPushStream.observable), - other: most.from(otherPushStream.observable), - packageImportMethod: most.from(packageImportMethodPushStream.observable), - packageManifest: most.from(packageManifestPushStream.observable), - progress: most.from(progressPushStream.observable), - registry: most.from(registryPushStream.observable), - requestRetry: most.from(requestRetryPushStream.observable), - root: most.from(rootPushStream.observable), - scope: most.from(scopePushStream.observable), - skippedOptionalDependency: most.from(skippedOptionalDependencyPushStream.observable), - stage: most.from(stagePushStream.observable), - stats: most.from(statsPushStream.observable), - summary: most.from(summaryPushStream.observable), + context: Rx.from(contextPushStream), + deprecation: Rx.from(deprecationPushStream), + fetchingProgress: Rx.from(fetchingProgressPushStream), + hook: Rx.from(hookPushStream), + installCheck: Rx.from(installCheckPushStream), + lifecycle: Rx.from(lifecyclePushStream), + link: Rx.from(linkPushStream), + other: Rx.from(otherPushStream), + packageImportMethod: Rx.from(packageImportMethodPushStream), + packageManifest: Rx.from(packageManifestPushStream), + progress: Rx.from(progressPushStream), + registry: Rx.from(registryPushStream), + requestRetry: Rx.from(requestRetryPushStream), + root: Rx.from(rootPushStream), + scope: Rx.from(scopePushStream), + skippedOptionalDependency: Rx.from(skippedOptionalDependencyPushStream), + stage: Rx.from(stagePushStream), + stats: Rx.from(statsPushStream), + summary: Rx.from(summaryPushStream), } - const outputs: Array>> = reporterForClient( + const outputs: Array>> = reporterForClient( log$, { appendOnly: opts.reportingOptions?.appendOnly, @@ -199,10 +200,11 @@ export function toOutput$ ( ) if (opts.reportingOptions?.appendOnly) { - return most.join( - most.mergeArray(outputs) - .map((log: most.Stream<{msg: string}>) => log.map((msg) => msg.msg)) - ) + return Rx.merge(...outputs) + .pipe( + map((log: Rx.Observable<{msg: string}>) => log.pipe(map((msg) => msg.msg))), + mergeAll() + ) } - return mergeOutputs(outputs).multicast() + return mergeOutputs(outputs) } diff --git a/packages/default-reporter/src/mergeOutputs.ts b/packages/default-reporter/src/mergeOutputs.ts index a42b34c548..c6f166be99 100644 --- a/packages/default-reporter/src/mergeOutputs.ts +++ b/packages/default-reporter/src/mergeOutputs.ts @@ -1,40 +1,42 @@ import { EOL } from './constants' -import most = require('most') +import * as Rx from 'rxjs' +import { filter, map, mergeAll, scan } from 'rxjs/operators' -export default function mergeOutputs (outputs: Array>>): most.Stream { +export default function mergeOutputs (outputs: Array>>): Rx.Observable { let blockNo = 0 let fixedBlockNo = 0 let started = false - return most.join( - most.mergeArray(outputs) - .map((log: most.Stream<{msg: string}>) => { - let currentBlockNo = -1 - let currentFixedBlockNo = -1 - return log - .map((msg) => { - if (msg['fixed']) { - if (currentFixedBlockNo === -1) { - currentFixedBlockNo = fixedBlockNo++ - } - return { - blockNo: currentFixedBlockNo, - fixed: true, - msg: msg.msg, - } - } - if (currentBlockNo === -1) { - currentBlockNo = blockNo++ + let previousOuput: string | null = null + return Rx.merge(...outputs).pipe( + map((log: Rx.Observable<{msg: string}>) => { + let currentBlockNo = -1 + let currentFixedBlockNo = -1 + return log.pipe( + map((msg) => { + if (msg['fixed']) { + if (currentFixedBlockNo === -1) { + currentFixedBlockNo = fixedBlockNo++ } return { - blockNo: currentBlockNo, - fixed: false, - msg: typeof msg === 'string' ? msg : msg.msg, // eslint-disable-line - prevFixedBlockNo: currentFixedBlockNo, + blockNo: currentFixedBlockNo, + fixed: true, + msg: msg.msg, } - }) - }) - ) - .scan((acc, log) => { + } + if (currentBlockNo === -1) { + currentBlockNo = blockNo++ + } + return { + blockNo: currentBlockNo, + fixed: false, + msg: typeof msg === 'string' ? msg : msg.msg, // eslint-disable-line + prevFixedBlockNo: currentFixedBlockNo, + } + }) + ) + }), + mergeAll(), + scan((acc, log) => { if (log.fixed) { acc.fixedBlocks[log.blockNo] = log.msg } else { @@ -42,8 +44,8 @@ export default function mergeOutputs (outputs: Array { + }, { fixedBlocks: [], blocks: [] } as {fixedBlocks: string[], blocks: string[]}), + map((sections) => { const fixedBlocks = sections.fixedBlocks.filter(Boolean) const nonFixedPart = sections.blocks.filter(Boolean).join(EOL) if (!fixedBlocks.length) { @@ -54,14 +56,21 @@ export default function mergeOutputs (outputs: Array { + }), + filter((msg) => { if (started) { return true } if (msg === '') return false started = true return true + }), + filter((msg) => { + if (msg !== previousOuput) { + previousOuput = msg + return true + } + return false }) - .skipRepeats() + ) } diff --git a/packages/default-reporter/src/reporterForClient/index.ts b/packages/default-reporter/src/reporterForClient/index.ts index 6a8dc8b535..bbbcc5db1d 100644 --- a/packages/default-reporter/src/reporterForClient/index.ts +++ b/packages/default-reporter/src/reporterForClient/index.ts @@ -1,6 +1,7 @@ import { Config } from '@pnpm/config' import * as logs from '@pnpm/core-loggers' import { LogLevel } from '@pnpm/logger' +import * as Rx from 'rxjs' import reportBigTarballsProgress from './reportBigTarballsProgress' import reportContext from './reportContext' import reportDeprecations from './reportDeprecations' @@ -14,29 +15,28 @@ import reportScope from './reportScope' import reportSkippedOptionalDependencies from './reportSkippedOptionalDependencies' import reportStats from './reportStats' import reportSummary from './reportSummary' -import most = require('most') export default function ( log$: { - context: most.Stream - fetchingProgress: most.Stream - progress: most.Stream - stage: most.Stream - deprecation: most.Stream - summary: most.Stream - lifecycle: most.Stream - stats: most.Stream - installCheck: most.Stream - registry: most.Stream - root: most.Stream - packageManifest: most.Stream - requestRetry: most.Stream - link: most.Stream - other: most.Stream - hook: most.Stream - scope: most.Stream - skippedOptionalDependency: most.Stream - packageImportMethod: most.Stream + context: Rx.Observable + fetchingProgress: Rx.Observable + progress: Rx.Observable + stage: Rx.Observable + deprecation: Rx.Observable + summary: Rx.Observable + lifecycle: Rx.Observable + stats: Rx.Observable + installCheck: Rx.Observable + registry: Rx.Observable + root: Rx.Observable + packageManifest: Rx.Observable + requestRetry: Rx.Observable + link: Rx.Observable + other: Rx.Observable + hook: Rx.Observable + scope: Rx.Observable + skippedOptionalDependency: Rx.Observable + packageImportMethod: Rx.Observable }, opts: { appendOnly?: boolean @@ -49,11 +49,11 @@ export default function ( throttleProgress?: number width?: number } -): Array>> { +): Array>> { const width = opts.width ?? process.stdout.columns ?? 80 const cwd = opts.pnpmConfig?.dir ?? process.cwd() - const outputs: Array>> = [ + const outputs: Array>> = [ reportProgress(log$, { cwd, throttleProgress: opts.throttleProgress, diff --git a/packages/default-reporter/src/reporterForClient/pkgsDiff.ts b/packages/default-reporter/src/reporterForClient/pkgsDiff.ts index cb8f775301..dc04c962ad 100644 --- a/packages/default-reporter/src/reporterForClient/pkgsDiff.ts +++ b/packages/default-reporter/src/reporterForClient/pkgsDiff.ts @@ -1,6 +1,7 @@ import * as logs from '@pnpm/core-loggers' import { PackageManifest } from '@pnpm/types' -import most = require('most') +import * as Rx from 'rxjs' +import { filter, map, mapTo, reduce, scan, startWith, take } from 'rxjs/operators' import R = require('ramda') export interface PackageDiff { @@ -27,28 +28,31 @@ export const propertyByDependencyType = { export default function ( log$: { - deprecation: most.Stream - summary: most.Stream - root: most.Stream - packageManifest: most.Stream + deprecation: Rx.Observable + summary: Rx.Observable + root: Rx.Observable + packageManifest: Rx.Observable }, opts: { prefix: string } ) { const deprecationSet$ = log$.deprecation - .filter((log) => log.prefix === opts.prefix) - .scan((acc, log) => { - acc.add(log.pkgId) - return acc - }, new Set()) + .pipe( + filter((log) => log.prefix === opts.prefix), + scan((acc, log) => { + acc.add(log.pkgId) + return acc + }, new Set()), + startWith(new Set()) + ) - const pkgsDiff$ = most.combine( - (rootLog, deprecationSet) => [rootLog, deprecationSet], - log$.root.filter((log) => log.prefix === opts.prefix), + const filterPrefix = filter((log: { prefix: string }) => log.prefix === opts.prefix) + const pkgsDiff$ = Rx.combineLatest( + log$.root.pipe(filterPrefix), deprecationSet$ - ) - .scan((pkgsDiff, args) => { + ).pipe( + scan((pkgsDiff, args) => { const rootLog = args[0] const deprecationSet = args[1] as Set if (rootLog['added']) { @@ -83,57 +87,80 @@ export default function ( nodeModulesOnly: Map optional: Map prod: Map + }), + startWith({ + dev: {}, + nodeModulesOnly: {}, + optional: {}, + peer: {}, + prod: {}, }) - - const packageManifest$ = most.fromPromise( - most.merge( - log$.packageManifest.filter((log) => log.prefix === opts.prefix), - log$.summary.filter((log) => log.prefix === opts.prefix).constant({}) - ) - .take(2) - .reduce(R.merge, {} as any) // eslint-disable-line @typescript-eslint/no-explicit-any ) - return most.combine( - (pkgsDiff, packageManifests: { initial?: PackageManifest, updated?: PackageManifest }) => { - if (!packageManifests['initial'] || !packageManifests['updated']) return pkgsDiff + const packageManifest$ = Rx.merge( + log$.packageManifest.pipe(filterPrefix), + log$.summary.pipe(filterPrefix, mapTo({})) + ) + .pipe( + take(2), + reduce(R.merge, {} as any) // eslint-disable-line @typescript-eslint/no-explicit-any + ) - const initialPackageManifest = removeOptionalFromProdDeps(packageManifests['initial']) - const updatedPackageManifest = removeOptionalFromProdDeps(packageManifests['updated']) - - for (const depType of ['peer', 'prod', 'optional', 'dev']) { - const prop = propertyByDependencyType[depType] - const initialDeps = Object.keys(initialPackageManifest[prop] || {}) - const updatedDeps = Object.keys(updatedPackageManifest[prop] || {}) - const removedDeps = R.difference(initialDeps, updatedDeps) - - for (const removedDep of removedDeps) { - if (!pkgsDiff[depType][`-${removedDep}`]) { - pkgsDiff[depType][`-${removedDep}`] = { - added: false, - name: removedDep, - version: initialPackageManifest[prop][removedDep], - } - } - } - - const addedDeps = R.difference(updatedDeps, initialDeps) - - for (const addedDep of addedDeps) { - if (!pkgsDiff[depType][`+${addedDep}`]) { - pkgsDiff[depType][`+${addedDep}`] = { - added: true, - name: addedDep, - version: updatedPackageManifest[prop][addedDep], - } - } - } - } - return pkgsDiff - }, + return Rx.combineLatest( pkgsDiff$, packageManifest$ ) + .pipe( + map( + ([pkgsDiff, packageManifests]: [ + { + dev: Map + nodeModulesOnly: Map + optional: Map + prod: Map + }, + { + initial?: PackageManifest + updated?: PackageManifest + } + ]) => { + if (!packageManifests['initial'] || !packageManifests['updated']) return pkgsDiff + + const initialPackageManifest = removeOptionalFromProdDeps(packageManifests['initial']) + const updatedPackageManifest = removeOptionalFromProdDeps(packageManifests['updated']) + + for (const depType of ['peer', 'prod', 'optional', 'dev']) { + const prop = propertyByDependencyType[depType] + const initialDeps = Object.keys(initialPackageManifest[prop] || {}) + const updatedDeps = Object.keys(updatedPackageManifest[prop] || {}) + const removedDeps = R.difference(initialDeps, updatedDeps) + + for (const removedDep of removedDeps) { + if (!pkgsDiff[depType][`-${removedDep}`]) { + pkgsDiff[depType][`-${removedDep}`] = { + added: false, + name: removedDep, + version: initialPackageManifest[prop][removedDep], + } + } + } + + const addedDeps = R.difference(updatedDeps, initialDeps) + + for (const addedDep of addedDeps) { + if (!pkgsDiff[depType][`+${addedDep}`]) { + pkgsDiff[depType][`+${addedDep}`] = { + added: true, + name: addedDep, + version: updatedPackageManifest[prop][addedDep], + } + } + } + } + return pkgsDiff + } + ) + ) } function removeOptionalFromProdDeps (pkg: PackageManifest): PackageManifest { diff --git a/packages/default-reporter/src/reporterForClient/reportBigTarballsProgress.ts b/packages/default-reporter/src/reporterForClient/reportBigTarballsProgress.ts index b33788d5af..fde7a574cd 100644 --- a/packages/default-reporter/src/reporterForClient/reportBigTarballsProgress.ts +++ b/packages/default-reporter/src/reporterForClient/reportBigTarballsProgress.ts @@ -1,32 +1,33 @@ import { FetchingProgressLog } from '@pnpm/core-loggers' +import * as Rx from 'rxjs' +import { filter, map, startWith } from 'rxjs/operators' import { hlPkgId, hlValue, } from './outputConstants' -import most = require('most') import prettyBytes = require('pretty-bytes') const BIG_TARBALL_SIZE = 1024 * 1024 * 5 // 5 MB export default ( log$: { - fetchingProgress: most.Stream + fetchingProgress: Rx.Observable } ) => { - return log$.fetchingProgress - .filter((log) => log.status === 'started' && + return log$.fetchingProgress.pipe( + filter((log: FetchingProgressLog) => log.status === 'started' && typeof log.size === 'number' && log.size >= BIG_TARBALL_SIZE && // When retrying the download, keep the existing progress line. // Fixing issue: https://github.com/pnpm/pnpm/issues/1013 log.attempt === 1 - ) - .map((startedLog) => { + ), + map((startedLog: FetchingProgressLog) => { const size = prettyBytes(startedLog['size']) - return log$.fetchingProgress - .filter((log) => log.status === 'in_progress' && log.packageId === startedLog['packageId']) - .map((log) => log['downloaded']) - .startWith(0) - .map((downloadedRaw) => { + return log$.fetchingProgress.pipe( + filter((log: FetchingProgressLog) => log.status === 'in_progress' && log.packageId === startedLog['packageId']), + map((log: FetchingProgressLog) => log['downloaded']), + startWith(0), + map((downloadedRaw: number) => { const done = startedLog['size'] === downloadedRaw const downloaded = prettyBytes(downloadedRaw) return { @@ -34,5 +35,7 @@ export default ( msg: `Downloading ${hlPkgId(startedLog['packageId'])}: ${hlValue(downloaded)}/${hlValue(size)}${done ? ', done' : ''}`, } }) + ) }) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportContext.ts b/packages/default-reporter/src/reporterForClient/reportContext.ts index e937f41535..8a96c45edf 100644 --- a/packages/default-reporter/src/reporterForClient/reportContext.ts +++ b/packages/default-reporter/src/reporterForClient/reportContext.ts @@ -1,42 +1,47 @@ import { ContextLog, PackageImportMethodLog } from '@pnpm/core-loggers' +import * as Rx from 'rxjs' +import { map, take } from 'rxjs/operators' import path = require('path') -import most = require('most') export default ( log$: { - context: most.Stream - packageImportMethod: most.Stream + context: Rx.Observable + packageImportMethod: Rx.Observable }, opts: { cwd: string } ) => { - return most.combine( - (context, packageImportMethod) => { - if (context.currentLockfileExists) { - return most.never() - } - let method!: string - switch (packageImportMethod.method) { - case 'copy': - method = 'copied' - break - case 'clone': - method = 'cloned' - break - case 'hardlink': - method = 'hard linked' - break - default: - method = packageImportMethod.method - break - } - return most.of({ - msg: `\ + return Rx.combineLatest( + log$.context.pipe(take(1)), + log$.packageImportMethod.pipe(take(1)) + ) + .pipe( + map( + ([context, packageImportMethod]) => { + if (context.currentLockfileExists) { + return Rx.NEVER + } + let method!: string + switch (packageImportMethod.method) { + case 'copy': + method = 'copied' + break + case 'clone': + method = 'cloned' + break + case 'hardlink': + method = 'hard linked' + break + default: + method = packageImportMethod.method + break + } + return Rx.of({ + msg: `\ Packages are ${method} from the content-addressable store to the virtual store. Content-addressable store is at: ${context.storeDir} Virtual store is at: ${path.relative(opts.cwd, context.virtualStoreDir)}`, - }) - }, - log$.context.take(1), - log$.packageImportMethod.take(1) - ) + }) + } + ) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportDeprecations.ts b/packages/default-reporter/src/reporterForClient/reportDeprecations.ts index b487ee9a3d..ac95683684 100644 --- a/packages/default-reporter/src/reporterForClient/reportDeprecations.ts +++ b/packages/default-reporter/src/reporterForClient/reportDeprecations.ts @@ -1,28 +1,29 @@ import { DeprecationLog } from '@pnpm/core-loggers' +import * as Rx from 'rxjs' +import { filter, map } from 'rxjs/operators' import formatWarn from './utils/formatWarn' import { zoomOut } from './utils/zooming' import chalk = require('chalk') -import most = require('most') export default ( - deprecation$: most.Stream, + deprecation$: Rx.Observable, opts: { cwd: string isRecursive: boolean } ) => { - return deprecation$ + return deprecation$.pipe( // print warnings only about deprecated packages from the root - .filter((log) => log.depth === 0) - .map((log) => { + filter((log) => log.depth === 0), + map((log) => { if (!opts.isRecursive && log.prefix === opts.cwd) { - return { + return Rx.of({ msg: formatWarn(`${chalk.red('deprecated')} ${log.pkgName}@${log.pkgVersion}: ${log.deprecated}`), - } + }) } - return { + return Rx.of({ msg: zoomOut(opts.cwd, log.prefix, formatWarn(`${chalk.red('deprecated')} ${log.pkgName}@${log.pkgVersion}`)), - } + }) }) - .map(most.of) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportHooks.ts b/packages/default-reporter/src/reporterForClient/reportHooks.ts index 75e89d8180..93f472024a 100644 --- a/packages/default-reporter/src/reporterForClient/reportHooks.ts +++ b/packages/default-reporter/src/reporterForClient/reportHooks.ts @@ -1,17 +1,18 @@ import { HookLog } from '@pnpm/core-loggers' +import * as Rx from 'rxjs' +import { map } from 'rxjs/operators' import { autozoom } from './utils/zooming' import chalk = require('chalk') -import most = require('most') export default ( - hook$: most.Stream, + hook$: Rx.Observable, opts: { cwd: string isRecursive: boolean } ) => { - return hook$ - .map((log) => ({ + return hook$.pipe( + map((log) => Rx.of({ msg: autozoom( opts.cwd, log.prefix, @@ -21,5 +22,5 @@ export default ( } ), })) - .map(most.of) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportInstallChecks.ts b/packages/default-reporter/src/reporterForClient/reportInstallChecks.ts index 0864a32c3f..88ac3ade0a 100644 --- a/packages/default-reporter/src/reporterForClient/reportInstallChecks.ts +++ b/packages/default-reporter/src/reporterForClient/reportInstallChecks.ts @@ -1,19 +1,20 @@ import { InstallCheckLog } from '@pnpm/core-loggers' +import * as Rx from 'rxjs' +import { filter, map } from 'rxjs/operators' import formatWarn from './utils/formatWarn' import { autozoom } from './utils/zooming' -import most = require('most') export default ( - installCheck$: most.Stream, + installCheck$: Rx.Observable, opts: { cwd: string } ) => { - return installCheck$ - .map(formatInstallCheck.bind(null, opts.cwd)) - .filter(Boolean) - .map((msg) => ({ msg })) - .map(most.of) as most.Stream> + return installCheck$.pipe( + map((log) => formatInstallCheck(opts.cwd, log)), + filter(Boolean), + map((msg) => Rx.of({ msg })) + ) } function formatInstallCheck ( diff --git a/packages/default-reporter/src/reporterForClient/reportLifecycleScripts.ts b/packages/default-reporter/src/reporterForClient/reportLifecycleScripts.ts index 811590fc68..1190f15d9c 100644 --- a/packages/default-reporter/src/reporterForClient/reportLifecycleScripts.ts +++ b/packages/default-reporter/src/reporterForClient/reportLifecycleScripts.ts @@ -1,13 +1,13 @@ import { LifecycleLog } from '@pnpm/core-loggers' -import PushStream from '@zkochan/zen-push' +import * as Rx from 'rxjs' +import { map } from 'rxjs/operators' import { EOL } from '../constants' +import formatPrefix, { formatPrefixNoTrim } from './utils/formatPrefix' import { hlValue, } from './outputConstants' -import formatPrefix, { formatPrefixNoTrim } from './utils/formatPrefix' -import path = require('path') import chalk = require('chalk') -import most = require('most') +import path = require('path') import prettyTime = require('pretty-time') import stripAnsi = require('strip-ansi') @@ -24,7 +24,7 @@ type ColorByPkg = Map string> export default ( log$: { - lifecycle: most.Stream + lifecycle: Rx.Observable }, opts: { appendOnly?: boolean @@ -36,10 +36,11 @@ export default ( // in order to reduce flickering if (opts.appendOnly) { const streamLifecycleOutput = createStreamLifecycleOutput(opts.cwd) - return log$.lifecycle - .map((log: LifecycleLog) => most.of({ + return log$.lifecycle.pipe( + map((log: LifecycleLog) => Rx.of({ msg: streamLifecycleOutput(log), })) + ) } const lifecycleMessages: { [depPath: string]: { @@ -51,9 +52,9 @@ export default ( } } = {} const lifecycleStreamByDepPath: { - [depPath: string]: PushStream<{ msg: string }> + [depPath: string]: Rx.Subject<{ msg: string }> } = {} - const lifecyclePushStream = new PushStream() + const lifecyclePushStream = new Rx.Subject>() // TODO: handle promise of .forEach?! log$.lifecycle // eslint-disable-line @@ -76,8 +77,8 @@ export default ( delete lifecycleMessages[key] } if (!lifecycleStreamByDepPath[key]) { - lifecycleStreamByDepPath[key] = new PushStream() - lifecyclePushStream.next(most.from(lifecycleStreamByDepPath[key].observable)) + lifecycleStreamByDepPath[key] = new Rx.Subject<{ msg: string }>() + lifecyclePushStream.next(Rx.from(lifecycleStreamByDepPath[key])) } lifecycleStreamByDepPath[key].next({ msg }) if (exit) { @@ -85,7 +86,7 @@ export default ( } }) - return most.from(lifecyclePushStream.observable) + return Rx.from(lifecyclePushStream) } function renderCollapsedScriptOutput ( diff --git a/packages/default-reporter/src/reporterForClient/reportMisc.ts b/packages/default-reporter/src/reporterForClient/reportMisc.ts index 4f472afb4f..5b1ba2eeb4 100644 --- a/packages/default-reporter/src/reporterForClient/reportMisc.ts +++ b/packages/default-reporter/src/reporterForClient/reportMisc.ts @@ -1,12 +1,12 @@ import { Config } from '@pnpm/config' import { Log, RegistryLog } from '@pnpm/core-loggers' import { LogLevel } from '@pnpm/logger' -import PushStream from '@zkochan/zen-push' import reportError from '../reportError' import formatWarn from './utils/formatWarn' import { autozoom } from './utils/zooming' +import * as Rx from 'rxjs' +import { filter, map } from 'rxjs/operators' import os = require('os') -import most = require('most') // eslint-disable:object-literal-sort-keys const LOG_LEVEL_NUMBER: Record = { @@ -21,8 +21,8 @@ const MAX_SHOWN_WARNINGS = 5 export default ( log$: { - registry: most.Stream - other: most.Stream + registry: Rx.Observable + other: Rx.Observable }, opts: { cwd: string @@ -33,25 +33,26 @@ export default ( ) => { const maxLogLevel = LOG_LEVEL_NUMBER[opts.logLevel ?? 'info'] ?? LOG_LEVEL_NUMBER['info'] const reportWarning = makeWarningReporter(opts) - return most.merge(log$.registry, log$.other) - .filter((obj) => LOG_LEVEL_NUMBER[obj.level] <= maxLogLevel && - (obj.level !== 'info' || !obj['prefix'] || obj['prefix'] === opts.cwd)) - .map((obj) => { + return Rx.merge(log$.registry, log$.other).pipe( + filter((obj) => LOG_LEVEL_NUMBER[obj.level] <= maxLogLevel && + (obj.level !== 'info' || !obj['prefix'] || obj['prefix'] === opts.cwd)), + map((obj) => { switch (obj.level) { case 'warn': { return reportWarning(obj) } case 'error': if (obj['message']?.['prefix'] && obj['message']['prefix'] !== opts.cwd) { - return most.of({ + return Rx.of({ msg: `${obj['message']['prefix'] as string}:` + os.EOL + reportError(obj, opts.config), }) } - return most.of({ msg: reportError(obj, opts.config) }) + return Rx.of({ msg: reportError(obj, opts.config) }) default: - return most.of({ msg: obj['message'] }) + return Rx.of({ msg: obj['message'] }) } }) + ) } // Sometimes, when installing new dependencies that rely on many peer dependencies, @@ -65,21 +66,21 @@ function makeWarningReporter ( } ) { let warningsCounter = 0 - let collapsedWarnings: PushStream<{ msg: string }> + let collapsedWarnings: Rx.Subject<{ msg: string }> return (obj: { prefix: string, message: string }) => { warningsCounter++ if (warningsCounter <= MAX_SHOWN_WARNINGS) { - return most.of({ msg: autozoom(opts.cwd, obj.prefix, formatWarn(obj.message), opts) }) + return Rx.of({ msg: autozoom(opts.cwd, obj.prefix, formatWarn(obj.message), opts) }) } const warningMsg = formatWarn(`${warningsCounter - MAX_SHOWN_WARNINGS} other warnings`) if (!collapsedWarnings) { - collapsedWarnings = new PushStream() + collapsedWarnings = new Rx.Subject() // For some reason, without using setTimeout, the warning summary is printed above the rest of the warnings // Even though the summary event happens last. Probably a bug in "most". setTimeout(() => collapsedWarnings.next({ msg: warningMsg }), 0) - return most.from(collapsedWarnings.observable) + return Rx.from(collapsedWarnings) } setTimeout(() => collapsedWarnings!.next({ msg: warningMsg }), 0) - return most.never() + return Rx.NEVER } } diff --git a/packages/default-reporter/src/reporterForClient/reportProgress.ts b/packages/default-reporter/src/reporterForClient/reportProgress.ts index 6b196782eb..d9f74428bd 100644 --- a/packages/default-reporter/src/reporterForClient/reportProgress.ts +++ b/packages/default-reporter/src/reporterForClient/reportProgress.ts @@ -1,8 +1,8 @@ import { ProgressLog, StageLog } from '@pnpm/core-loggers' -import PushStream from '@zkochan/zen-push' +import * as Rx from 'rxjs' +import { filter, map, mapTo, sampleTime, takeWhile, startWith, take } from 'rxjs/operators' import { hlValue } from './outputConstants' import { zoomOut } from './utils/zooming' -import most = require('most') interface ProgressStats { fetched: number @@ -11,15 +11,15 @@ interface ProgressStats { } interface ModulesInstallProgress { - importingDone$: most.Stream - progress$: most.Stream + importingDone$: Rx.Observable + progress$: Rx.Observable requirer: string } export default ( log$: { - progress: most.Stream - stage: most.Stream + progress: Rx.Observable + stage: Rx.Observable }, opts: { cwd: string @@ -30,101 +30,100 @@ export default ( ? throttledProgressOutput.bind(null, opts.throttleProgress) : nonThrottledProgressOutput - return getModulesInstallProgress$(log$.stage, log$.progress) - .map(({ importingDone$, progress$, requirer }) => { + return getModulesInstallProgress$(log$.stage, log$.progress).pipe( + map(({ importingDone$, progress$, requirer }) => { const output$ = progressOutput(importingDone$, progress$) if (requirer === opts.cwd) { return output$ } - return output$.map((msg: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any - msg['msg'] = zoomOut(opts.cwd, requirer, msg['msg']) - return msg - }) + return output$.pipe( + map((msg: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any + msg['msg'] = zoomOut(opts.cwd, requirer, msg['msg']) + return msg + }) + ) }) + ) } function throttledProgressOutput ( throttleProgress: number, - importingDone$: most.Stream, - progress$: most.Stream + importingDone$: Rx.Observable, + progress$: Rx.Observable ) { // Reporting is done every `throttleProgress` milliseconds // and once all packages are fetched. - const sampler = most.merge( - most.periodic(throttleProgress).until(importingDone$.skip(1)), + return Rx.combineLatest( + progress$.pipe(sampleTime(throttleProgress)), importingDone$ ) - return most.sample( - createStatusMessage, - sampler, - progress$, - importingDone$ - ) - // Avoid logs after all resolved packages were downloaded. - // Fixing issue: https://github.com/pnpm/pnpm/issues/1028#issuecomment-364782901 - .skipAfter((msg) => msg['done'] === true) + .pipe( + map(createStatusMessage), + // Avoid logs after all resolved packages were downloaded. + // Fixing issue: https://github.com/pnpm/pnpm/issues/1028#issuecomment-364782901 + takeWhile((msg) => msg['done'] !== true, true) + ) } function nonThrottledProgressOutput ( - importingDone$: most.Stream, - progress$: most.Stream + importingDone$: Rx.Observable, + progress$: Rx.Observable ) { - return most.combine( - createStatusMessage, + return Rx.combineLatest( progress$, importingDone$ ) + .pipe(map(createStatusMessage)) } function getModulesInstallProgress$ ( - stage$: most.Stream, - progress$: most.Stream -): most.Stream { - const modulesInstallProgressPushStream = new PushStream() + stage$: Rx.Observable, + progress$: Rx.Observable +): Rx.Observable { + const modulesInstallProgressPushStream = new Rx.Subject() const progessStatsPushStreamByRequirer = getProgessStatsPushStreamByRequirer(progress$) const stagePushStreamByRequirer: { - [requirer: string]: PushStream + [requirer: string]: Rx.Subject } = {} stage$ .forEach((log: StageLog) => { if (!stagePushStreamByRequirer[log.prefix]) { - stagePushStreamByRequirer[log.prefix] = new PushStream() + stagePushStreamByRequirer[log.prefix] = new Rx.Subject() if (!progessStatsPushStreamByRequirer[log.prefix]) { - progessStatsPushStreamByRequirer[log.prefix] = new PushStream() + progessStatsPushStreamByRequirer[log.prefix] = new Rx.Subject() } modulesInstallProgressPushStream.next({ - importingDone$: stage$ToImportingDone$(most.from(stagePushStreamByRequirer[log.prefix].observable)), - progress$: most.from(progessStatsPushStreamByRequirer[log.prefix].observable), + importingDone$: stage$ToImportingDone$(Rx.from(stagePushStreamByRequirer[log.prefix])), + progress$: Rx.from(progessStatsPushStreamByRequirer[log.prefix]), requirer: log.prefix, }) } - setTimeout(() => { // without this, `pnpm m i` might hang in some cases - stagePushStreamByRequirer[log.prefix].next(log) - if (log.stage === 'importing_done') { - progessStatsPushStreamByRequirer[log.prefix].complete() - stagePushStreamByRequirer[log.prefix].complete() - } - }, 0) + stagePushStreamByRequirer[log.prefix].next(log) + if (log.stage === 'importing_done') { + progessStatsPushStreamByRequirer[log.prefix].complete() + stagePushStreamByRequirer[log.prefix].complete() + } }) .catch(() => {}) - return most.from(modulesInstallProgressPushStream.observable) + return Rx.from(modulesInstallProgressPushStream) } -function stage$ToImportingDone$ (stage$: most.Stream) { +function stage$ToImportingDone$ (stage$: Rx.Observable) { return stage$ - .filter((log: StageLog) => log.stage === 'importing_done') - .constant(true) - .take(1) - .startWith(false) - .multicast() + .pipe( + filter((log: StageLog) => log.stage === 'importing_done'), + mapTo(true), + take(1), + startWith(false) + ) } -function getProgessStatsPushStreamByRequirer (progress$: most.Stream) { +function getProgessStatsPushStreamByRequirer (progress$: Rx.Observable) { const progessStatsPushStreamByRequirer: { - [requirer: string]: PushStream + [requirer: string]: Rx.Subject } = {} const previousProgressStatsByRequirer: { [requirer: string]: ProgressStats } = {} @@ -149,7 +148,7 @@ function getProgessStatsPushStreamByRequirer (progress$: most.Stream() + progessStatsPushStreamByRequirer[log.requester] = new Rx.Subject() } progessStatsPushStreamByRequirer[log.requester].next(previousProgressStatsByRequirer[log.requester]) }) @@ -158,10 +157,7 @@ function getProgessStatsPushStreamByRequirer (progress$: most.Stream + requestRetry$: Rx.Observable ) => { - return requestRetry$ - .map((log) => { + return requestRetry$.pipe( + map((log) => { const retriesLeft = log.maxRetries - log.attempt + 1 const errorCode = log.error['httpStatusCode'] || log.error['status'] || log.error['errno'] || log.error['code'] // eslint-disable-next-line @typescript-eslint/restrict-template-expressions const msg = `${log.method} ${log.url} error (${errorCode}). \ Will retry in ${prettyMilliseconds(log.timeout, { verbose: true })}. \ ${retriesLeft} retries left.` - return most.of({ msg: formatWarn(msg) }) + return Rx.of({ msg: formatWarn(msg) }) }) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportScope.ts b/packages/default-reporter/src/reporterForClient/reportScope.ts index b7aa4e19fe..adad0b7e72 100644 --- a/packages/default-reporter/src/reporterForClient/reportScope.ts +++ b/packages/default-reporter/src/reporterForClient/reportScope.ts @@ -1,5 +1,6 @@ import { ScopeLog } from '@pnpm/core-loggers' -import most = require('most') +import * as Rx from 'rxjs' +import { map, take } from 'rxjs/operators' const COMMANDS_THAT_REPORT_SCOPE = new Set([ 'install', @@ -14,21 +15,21 @@ const COMMANDS_THAT_REPORT_SCOPE = new Set([ ]) export default ( - scope$: most.Stream, + scope$: Rx.Observable, opts: { isRecursive: boolean cmd: string } ) => { if (!COMMANDS_THAT_REPORT_SCOPE.has(opts.cmd)) { - return most.never() + return Rx.NEVER } - return scope$ - .take(1) - .map((log) => { + return scope$.pipe( + take(1), + map((log) => { if (log.selected === 1 && typeof log.total !== 'number') { - if (!log.workspacePrefix) return most.never() - if (!opts.isRecursive) return most.of({ msg: 'Scope: current workspace package' }) + if (!log.workspacePrefix) return Rx.NEVER + if (!opts.isRecursive) return Rx.of({ msg: 'Scope: current workspace package' }) } let msg = 'Scope: ' @@ -47,6 +48,7 @@ export default ( msg += ' projects' } - return most.of({ msg }) + return Rx.of({ msg }) }) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportSkippedOptionalDependencies.ts b/packages/default-reporter/src/reporterForClient/reportSkippedOptionalDependencies.ts index 3075d63140..089f4d4ca1 100644 --- a/packages/default-reporter/src/reporterForClient/reportSkippedOptionalDependencies.ts +++ b/packages/default-reporter/src/reporterForClient/reportSkippedOptionalDependencies.ts @@ -1,18 +1,20 @@ import { SkippedOptionalDependencyLog } from '@pnpm/core-loggers' -import most = require('most') +import * as Rx from 'rxjs' +import { filter, map } from 'rxjs/operators' export default ( - skippedOptionalDependency$: most.Stream, + skippedOptionalDependency$: Rx.Observable, opts: { cwd: string } ) => { - return skippedOptionalDependency$ - .filter((log) => Boolean(log['prefix'] === opts.cwd && log.parents && log.parents.length === 0)) - .map((log) => most.of({ + return skippedOptionalDependency$.pipe( + filter((log) => Boolean(log['prefix'] === opts.cwd && log.parents && log.parents.length === 0)), + map((log) => Rx.of({ msg: `info: ${ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions log.package['id'] || log.package.name && (`${log.package.name}@${log.package.version}`) || log.package['pref'] } is an optional dependency and failed compatibility check. Excluding it from installation.`, })) + ) } diff --git a/packages/default-reporter/src/reporterForClient/reportStats.ts b/packages/default-reporter/src/reporterForClient/reportStats.ts index def2e797bf..e32ed506a0 100644 --- a/packages/default-reporter/src/reporterForClient/reportStats.ts +++ b/packages/default-reporter/src/reporterForClient/reportStats.ts @@ -1,18 +1,19 @@ import { StatsLog } from '@pnpm/core-loggers' -import { zoomOut } from './utils/zooming' +import * as Rx from 'rxjs' +import { filter, take, reduce, map } from 'rxjs/operators' import { EOL } from '../constants' import { ADDED_CHAR, REMOVED_CHAR, } from './outputConstants' +import { zoomOut } from './utils/zooming' import chalk = require('chalk') -import most = require('most') import R = require('ramda') import stringLength = require('string-length') export default ( log$: { - stats: most.Stream + stats: Rx.Observable }, opts: { cmd: string @@ -23,7 +24,7 @@ export default ( ) => { const stats$ = opts.isRecursive ? log$.stats - : log$.stats.filter((log) => log.prefix !== opts.cwd) + : log$.stats.pipe(filter((log) => log.prefix !== opts.cwd)) const outputs = [ statsForNotCurrentPackage(stats$, { @@ -45,32 +46,30 @@ export default ( } function statsForCurrentPackage ( - stats$: most.Stream, + stats$: Rx.Observable, opts: { cmd: string currentPrefix: string width: number } ) { - return most.fromPromise( - stats$ - .filter((log) => log.prefix === opts.currentPrefix) - .take((opts.cmd === 'install' || opts.cmd === 'install-test' || opts.cmd === 'add' || opts.cmd === 'update') ? 2 : 1) - .reduce((acc, log) => { - if (typeof log['added'] === 'number') { - acc['added'] = log['added'] - } else if (typeof log['removed'] === 'number') { - acc['removed'] = log['removed'] - } - return acc - }, {}) - ) - .map((stats) => { + return stats$.pipe( + filter((log) => log.prefix === opts.currentPrefix), + take((opts.cmd === 'install' || opts.cmd === 'install-test' || opts.cmd === 'add' || opts.cmd === 'update') ? 2 : 1), + reduce((acc, log) => { + if (typeof log['added'] === 'number') { + acc['added'] = log['added'] + } else if (typeof log['removed'] === 'number') { + acc['removed'] = log['removed'] + } + return acc + }, {}), + map((stats) => { if (!stats['removed'] && !stats['added']) { if (opts.cmd === 'link') { - return most.never() + return Rx.NEVER } - return most.of({ msg: 'Already up-to-date' }) + return Rx.of({ msg: 'Already up-to-date' }) } let msg = 'Packages:' @@ -83,22 +82,24 @@ function statsForCurrentPackage ( msg += ' ' + chalk.red(`-${stats['removed'].toString()}`) } msg += EOL + printPlusesAndMinuses(opts.width, (stats['added'] || 0), (stats['removed'] || 0)) - return most.of({ msg }) + return Rx.of({ msg }) }) + ) } function statsForNotCurrentPackage ( - stats$: most.Stream, + stats$: Rx.Observable, opts: { cmd: string currentPrefix: string width: number } ) { + const stats = {} const cookedStats$ = ( opts.cmd !== 'remove' - ? stats$ - .loop((stats, log) => { + ? stats$.pipe( + map((log) => { // As of pnpm v2.9.0, during `pnpm recursive link`, logging of removed stats happens twice // 1. during linking // 2. during installing @@ -115,14 +116,15 @@ function statsForNotCurrentPackage ( } else { const value = { ...stats[log.prefix], ...log } delete stats[log.prefix] - return { seed: stats, value } + return value } }, {}) + ) : stats$ ) - return cookedStats$ - .filter((stats) => stats !== null && (stats['removed'] || stats['added'])) - .map((stats) => { + return cookedStats$.pipe( + filter((stats) => stats !== null && (stats['removed'] || stats['added'])), + map((stats) => { const parts = [] as string[] if (stats['added']) { @@ -137,8 +139,9 @@ function statsForNotCurrentPackage ( let msg = zoomOut(opts.currentPrefix, stats['prefix'], parts.join(' ')) const rest = Math.max(0, opts.width - 1 - stringLength(msg)) msg += ' ' + printPlusesAndMinuses(rest, roundStats(stats['added'] || 0), roundStats(stats['removed'] || 0)) - return most.of({ msg }) + return Rx.of({ msg }) }) + ) } function padStep (s: string, step: number) { diff --git a/packages/default-reporter/src/reporterForClient/reportSummary.ts b/packages/default-reporter/src/reporterForClient/reportSummary.ts index 5d4875bfe6..f198d404b4 100644 --- a/packages/default-reporter/src/reporterForClient/reportSummary.ts +++ b/packages/default-reporter/src/reporterForClient/reportSummary.ts @@ -1,31 +1,32 @@ -import { Config } from '@pnpm/config' import { DeprecationLog, PackageManifestLog, RootLog, SummaryLog, } from '@pnpm/core-loggers' +import { Config } from '@pnpm/config' +import * as Rx from 'rxjs' +import { map, take } from 'rxjs/operators' import { EOL } from '../constants' -import { - ADDED_CHAR, - REMOVED_CHAR, -} from './outputConstants' import getPkgsDiff, { PackageDiff, propertyByDependencyType, } from './pkgsDiff' -import path = require('path') +import { + ADDED_CHAR, + REMOVED_CHAR, +} from './outputConstants' import chalk = require('chalk') -import most = require('most') +import path = require('path') import R = require('ramda') import semver = require('semver') export default ( log$: { - deprecation: most.Stream - summary: most.Stream - root: most.Stream - packageManifest: most.Stream + deprecation: Rx.Observable + summary: Rx.Observable + root: Rx.Observable + packageManifest: Rx.Observable }, opts: { cwd: string @@ -34,33 +35,33 @@ export default ( ) => { const pkgsDiff$ = getPkgsDiff(log$, { prefix: opts.cwd }) - const summaryLog$ = log$.summary - .take(1) + const summaryLog$ = log$.summary.pipe(take(1)) - return most.combine( - (pkgsDiff) => { - let msg = '' - for (const depType of ['prod', 'optional', 'peer', 'dev', 'nodeModulesOnly']) { - const diffs = R.values(pkgsDiff[depType]) - if (diffs.length) { - msg += EOL - if (opts.pnpmConfig?.global) { - msg += chalk.cyanBright(`${opts.cwd}:`) - } else { - msg += chalk.cyanBright(`${propertyByDependencyType[depType] as string}:`) - } - msg += EOL - msg += printDiffs(diffs, { prefix: opts.cwd }) - msg += EOL - } - } - return { msg } - }, + return Rx.combineLatest( pkgsDiff$, summaryLog$ ) - .take(1) - .map(most.of) + .pipe( + take(1), + map(([pkgsDiff]) => { + let msg = '' + for (const depType of ['prod', 'optional', 'peer', 'dev', 'nodeModulesOnly']) { + const diffs = R.values(pkgsDiff[depType]) + if (diffs.length) { + msg += EOL + if (opts.pnpmConfig?.global) { + msg += chalk.cyanBright(`${opts.cwd}:`) + } else { + msg += chalk.cyanBright(`${propertyByDependencyType[depType] as string}:`) + } + msg += EOL + msg += printDiffs(diffs, { prefix: opts.cwd }) + msg += EOL + } + } + return Rx.of({ msg }) + }) + ) } function printDiffs ( diff --git a/packages/default-reporter/src/reporterForServer.ts b/packages/default-reporter/src/reporterForServer.ts index eed8f4945f..6ac91b7f66 100644 --- a/packages/default-reporter/src/reporterForServer.ts +++ b/packages/default-reporter/src/reporterForServer.ts @@ -1,11 +1,11 @@ import { Config } from '@pnpm/config' import { Log } from '@pnpm/core-loggers' +import * as Rx from 'rxjs' import reportError from './reportError' import chalk = require('chalk') -import most = require('most') export default function ( - log$: most.Stream, + log$: Rx.Observable, config?: Config ) { log$.subscribe({ diff --git a/packages/default-reporter/test/index.ts b/packages/default-reporter/test/index.ts index 4b1a09f0fd..927d6c7c90 100644 --- a/packages/default-reporter/test/index.ts +++ b/packages/default-reporter/test/index.ts @@ -20,6 +20,7 @@ import './reportingLifecycleScripts' import './reportingProgress' import './reportingRequestRetry' import './reportingScope' +import { map, skip, take } from 'rxjs/operators' import chalk = require('chalk') import normalizeNewline = require('normalize-newline') import path = require('path') @@ -181,7 +182,7 @@ test('prints summary (of current package only)', t => { t.plan(1) - output$.skip(2).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(2), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -264,7 +265,7 @@ test('prints summary for global installation', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -308,7 +309,7 @@ test('prints added peer dependency', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -375,7 +376,7 @@ test('prints summary correctly when the same package is specified both in option t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -395,7 +396,7 @@ test('prints summary when some packages fail', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -462,7 +463,7 @@ test('prints info', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -483,7 +484,7 @@ test('prints added/removed stats during installation', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -506,7 +507,7 @@ test('prints added/removed stats during installation when 0 removed', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -529,7 +530,7 @@ test('prints only the added stats if nothing was removed', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -551,7 +552,7 @@ test('prints only the removed stats if nothing was added', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -574,7 +575,7 @@ test('prints only the added stats if nothing was removed and a lot added', t => t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -597,7 +598,7 @@ test('prints only the removed stats if nothing was added and a lot removed', t = t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -620,7 +621,7 @@ test('prints at least one remove sign when removed !== 0', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -644,7 +645,7 @@ test('prints at least one add sign when added !== 0', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -665,7 +666,7 @@ test('prints just removed during uninstallation', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -718,7 +719,7 @@ test('prints added/removed stats and warnings during recursive installation', t t.plan(1) - output$.skip(8).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(8), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -751,7 +752,7 @@ test('recursive installation: prints only the added stats if nothing was removed t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -775,7 +776,7 @@ test('recursive installation: prints only the removed stats if nothing was added t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -799,7 +800,7 @@ test('recursive installation: prints at least one remove sign when removed !== 0 t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -823,7 +824,7 @@ test('recursive installation: prints at least one add sign when added !== 0', t t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -846,7 +847,7 @@ test('recursive uninstall: prints removed packages number', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -873,7 +874,7 @@ test('install: print hook message', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -900,7 +901,7 @@ test('recursive: print hook message', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -934,7 +935,7 @@ test('prints skipped optional dependency info message', t => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -960,7 +961,7 @@ test('logLevel=default', t => { t.plan(1) - output$.skip(2).take(1).subscribe({ + output$.pipe(skip(2), take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -991,7 +992,7 @@ test('logLevel=warn', t => { t.plan(1) - output$.skip(1).take(1).subscribe({ + output$.pipe(skip(1), take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -1021,7 +1022,7 @@ test('logLevel=error', t => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -1053,7 +1054,7 @@ test('warnings are collapsed', t => { t.plan(1) - output$.skip(6).take(1).subscribe({ + output$.pipe(skip(6), take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { diff --git a/packages/default-reporter/test/reportingContext.ts b/packages/default-reporter/test/reportingContext.ts index 490c959308..33bf0a78cc 100644 --- a/packages/default-reporter/test/reportingContext.ts +++ b/packages/default-reporter/test/reportingContext.ts @@ -4,6 +4,7 @@ import { createStreamParser, } from '@pnpm/logger' import delay from 'delay' +import { take } from 'rxjs/operators' import test = require('tape') test('print context and import method info', (t) => { @@ -25,7 +26,7 @@ test('print context and import method info', (t) => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { diff --git a/packages/default-reporter/test/reportingErrors.ts b/packages/default-reporter/test/reportingErrors.ts index 60cdcb02a9..1b64854a29 100644 --- a/packages/default-reporter/test/reportingErrors.ts +++ b/packages/default-reporter/test/reportingErrors.ts @@ -3,6 +3,7 @@ import PnpmError from '@pnpm/error' import logger, { createStreamParser, } from '@pnpm/logger' +import { map, take } from 'rxjs/operators' import path = require('path') import chalk = require('chalk') import loadJsonFile = require('load-json-file') @@ -23,7 +24,7 @@ test('prints generic error', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -45,7 +46,7 @@ test('prints generic error when recursive install fails', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -64,7 +65,7 @@ test('prints no matching version error when many dist-tags exist', async (t) => t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -94,7 +95,7 @@ test('prints no matching version error when only the latest dist-tag exists', as t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -119,7 +120,7 @@ test('prints suggestions when an internet-connection related error happens', asy t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -156,7 +157,7 @@ test('prints test error', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -178,7 +179,7 @@ test('prints command error with exit code', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -201,7 +202,7 @@ test('prints command error without exit code', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -223,7 +224,7 @@ test('prints unsupported pnpm version error', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -255,7 +256,7 @@ test('prints unsupported Node version error', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -284,7 +285,7 @@ test('prints unsupported pnpm and Node versions error', async (t) => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -326,7 +327,7 @@ test('prints error even if the error object not passed in through the message ob t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -347,7 +348,7 @@ test('prints error without packages stacktrace when pkgsStack is empty', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -374,7 +375,7 @@ test('prints error with packages stacktrace - depth 1 and hint', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -408,7 +409,7 @@ test('prints error with packages stacktrace - depth 2', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -430,7 +431,7 @@ test('prints error and hint', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -463,7 +464,7 @@ test('prints authorization error with auth settings', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -496,7 +497,7 @@ test('prints authorization error without auth settings, where there are none', t t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { diff --git a/packages/default-reporter/test/reportingLifecycleScripts.ts b/packages/default-reporter/test/reportingLifecycleScripts.ts index b9200c6c16..90dcc01918 100644 --- a/packages/default-reporter/test/reportingLifecycleScripts.ts +++ b/packages/default-reporter/test/reportingLifecycleScripts.ts @@ -1,6 +1,7 @@ import { lifecycleLogger } from '@pnpm/core-loggers' import { toOutput$ } from '@pnpm/default-reporter' import { createStreamParser } from '@pnpm/logger' +import { map, skip, take } from 'rxjs/operators' import path = require('path') import chalk = require('chalk') import normalizeNewline = require('normalize-newline') @@ -114,7 +115,7 @@ test('groups lifecycle output', t => { t.plan(1) - output$.skip(9).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(9), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: (output: string) => { @@ -235,7 +236,7 @@ test('groups lifecycle output when append-only is used', t => { const allOutputs = [] as string[] - output$.take(11).map(normalizeNewline).subscribe({ + output$.pipe(take(11), map(normalizeNewline)).subscribe({ complete: () => { t.equal(allOutputs.join(EOL), `\ ${chalk.cyan('packages/foo')} ${PREINSTALL}$ node foo @@ -355,7 +356,7 @@ test('groups lifecycle output when streamLifecycleOutput is used', t => { t.plan(1) - output$.skip(11).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(11), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: (output: string) => { @@ -409,7 +410,7 @@ test('collapse lifecycle output when it has too many lines', t => { t.plan(1) - output$.skip(101).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(101), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: (output: string) => { @@ -523,7 +524,7 @@ test('collapses lifecycle output of packages from node_modules', t => { t.plan(1) - output$.skip(5).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(5), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: (output: string) => { @@ -569,7 +570,7 @@ test('output of failed optional dependency is not shown', t => { t.plan(1) - output$.skip(1).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(1), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: (output: string) => { @@ -611,7 +612,7 @@ test('output of failed non-optional dependency is printed', t => { t.plan(1) - output$.skip(1).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(1), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: (output: string) => { @@ -671,7 +672,7 @@ test['skip']('prints lifecycle progress', t => { const childOutputColor = chalk.grey const childOutputError = chalk.red - output$.skip(3).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(3), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { diff --git a/packages/default-reporter/test/reportingProgress.ts b/packages/default-reporter/test/reportingProgress.ts index b23d2ffb5a..655611300c 100644 --- a/packages/default-reporter/test/reportingProgress.ts +++ b/packages/default-reporter/test/reportingProgress.ts @@ -9,9 +9,8 @@ import { toOutput$ } from '@pnpm/default-reporter' import logger, { createStreamParser, } from '@pnpm/logger' -import delay from 'delay' +import { map, skip, take } from 'rxjs/operators' import chalk = require('chalk') -import most = require('most') import normalizeNewline = require('normalize-newline') import test = require('tape') @@ -42,7 +41,7 @@ test('prints progress beginning', t => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -72,7 +71,7 @@ test('prints progress beginning of node_modules from not cwd', t => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -105,7 +104,7 @@ test('prints progress beginning when appendOnly is true', t => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -138,7 +137,7 @@ test('prints progress beginning during recursive install', t => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -159,7 +158,7 @@ test('prints progress on first download', async t => { streamParser: createStreamParser(), }) - output$.skip(1).take(1).subscribe({ + output$.pipe(skip(1), take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -179,8 +178,6 @@ test('prints progress on first download', async t => { status: 'resolved', }) - await delay(10) - progressLogger.debug({ packageId, requester: '/src/project', @@ -200,7 +197,7 @@ test('moves fixed line to the end', async t => { streamParser: createStreamParser(), }) - output$.skip(3).take(1).map(normalizeNewline).subscribe({ + output$.pipe(skip(3), take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -221,8 +218,6 @@ test('moves fixed line to the end', async t => { status: 'resolved', }) - await delay(10) - progressLogger.debug({ packageId, requester: prefix, @@ -230,8 +225,6 @@ test('moves fixed line to the end', async t => { }) logger.warn({ message: 'foo', prefix }) - await delay(10) // w/o delay warning goes below for some reason. Started to happen after switch to most - stageLogger.debug({ prefix: prefix, stage: 'resolution_done', @@ -255,7 +248,7 @@ test('prints "Already up-to-date"', t => { t.plan(1) - output$.take(1).map(normalizeNewline).subscribe({ + output$.pipe(take(1), map(normalizeNewline)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -267,8 +260,7 @@ test('prints "Already up-to-date"', t => { test('prints progress of big files download', async t => { t.plan(6) - // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion - let output$ = toOutput$({ + const output$ = toOutput$({ context: { argv: ['install'], config: { dir: '/src/project' } as Config, @@ -276,66 +268,48 @@ test('prints progress of big files download', async t => { reportingOptions: { throttleProgress: 0 }, streamParser: createStreamParser(), }) - .map(normalizeNewline) as most.Stream - const stream$: Array> = [] const pkgId1 = 'registry.npmjs.org/foo/1.0.0' const pkgId2 = 'registry.npmjs.org/bar/2.0.0' const pkgId3 = 'registry.npmjs.org/qar/3.0.0' - stream$.push( - output$.take(1) - .tap(output => t.equal(output, `Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}`)) - ) - - output$ = output$.skip(1) - - stream$.push( - output$.take(1) - .tap(output => t.equal(output, `\ + output$.pipe( + map(normalizeNewline), + map((output, index) => { + switch (index) { + case 0: + t.equal(output, `Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')}`) + return + case 1: + t.equal(output, `\ Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')} -Downloading ${hlPkgId(pkgId1)}: ${hlValue('0 B')}/${hlValue('10.5 MB')}`)) - ) - - output$ = output$.skip(1) - - stream$.push( - output$.take(1) - .tap(output => t.equal(output, `\ +Downloading ${hlPkgId(pkgId1)}: ${hlValue('0 B')}/${hlValue('10.5 MB')}`) + return + case 2: + t.equal(output, `\ Resolving: total ${hlValue('1')}, reused ${hlValue('0')}, downloaded ${hlValue('0')} -Downloading ${hlPkgId(pkgId1)}: ${hlValue('5.77 MB')}/${hlValue('10.5 MB')}`)) - ) - - output$ = output$.skip(2) - - stream$.push( - output$.take(1) - .tap(output => t.equal(output, `\ +Downloading ${hlPkgId(pkgId1)}: ${hlValue('5.77 MB')}/${hlValue('10.5 MB')}`) + return + case 4: + t.equal(output, `\ Resolving: total ${hlValue('2')}, reused ${hlValue('0')}, downloaded ${hlValue('0')} -Downloading ${hlPkgId(pkgId1)}: ${hlValue('7.34 MB')}/${hlValue('10.5 MB')}`, 'downloading of small package not reported')) - ) - - output$ = output$.skip(3) - - stream$.push( - output$.take(1) - .tap(output => t.equal(output, `\ +Downloading ${hlPkgId(pkgId1)}: ${hlValue('7.34 MB')}/${hlValue('10.5 MB')}`, 'downloading of small package not reported') + return + case 7: + t.equal(output, `\ Resolving: total ${hlValue('3')}, reused ${hlValue('0')}, downloaded ${hlValue('0')} Downloading ${hlPkgId(pkgId1)}: ${hlValue('7.34 MB')}/${hlValue('10.5 MB')} -Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)) - ) - - output$ = output$.skip(1) - - stream$.push( - output$.take(1) - .tap(output => t.equal(output, `\ +Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`) + return + case 8: + t.equal(output, `\ Downloading ${hlPkgId(pkgId1)}: ${hlValue('10.5 MB')}/${hlValue('10.5 MB')}, done Resolving: total ${hlValue('3')}, reused ${hlValue('0')}, downloaded ${hlValue('0')} -Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)) +Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`) + return // eslint-disable-line + } + }) ) - - most.mergeArray(stream$) .subscribe({ complete: () => t.end(), error: t.end, @@ -353,8 +327,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)) status: 'resolved', }) - await delay(10) - fetchingProgressLogger.debug({ attempt: 1, packageId: pkgId1, @@ -362,8 +334,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)) status: 'started', }) - await delay(10) - fetchingProgressLogger.debug({ downloaded: 1024 * 1024 * 5.5, // 5.5 MB packageId: pkgId1, @@ -376,8 +346,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)) status: 'resolved', }) - await delay(10) - fetchingProgressLogger.debug({ attempt: 1, packageId: pkgId1, @@ -404,8 +372,6 @@ Downloading ${hlPkgId(pkgId3)}: ${hlValue('19.9 MB')}/${hlValue('21 MB')}`)) status: 'started', }) - await delay(10) - fetchingProgressLogger.debug({ downloaded: 1024 * 1024 * 19, // 19 MB packageId: pkgId3, diff --git a/packages/default-reporter/test/reportingRequestRetry.ts b/packages/default-reporter/test/reportingRequestRetry.ts index 04d0f610ca..8b77d8315d 100644 --- a/packages/default-reporter/test/reportingRequestRetry.ts +++ b/packages/default-reporter/test/reportingRequestRetry.ts @@ -3,6 +3,7 @@ import { toOutput$ } from '@pnpm/default-reporter' import { createStreamParser, } from '@pnpm/logger' +import { take } from 'rxjs/operators' import chalk = require('chalk') import test = require('tape') @@ -27,7 +28,7 @@ test('print warning about request retry', (t) => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { diff --git a/packages/default-reporter/test/reportingScope.ts b/packages/default-reporter/test/reportingScope.ts index 8f9d76307c..850ba14d82 100644 --- a/packages/default-reporter/test/reportingScope.ts +++ b/packages/default-reporter/test/reportingScope.ts @@ -3,6 +3,7 @@ import { toOutput$ } from '@pnpm/default-reporter' import logger, { createStreamParser, } from '@pnpm/logger' +import { take } from 'rxjs/operators' import test = require('tape') const scopeLogger = logger('scope') @@ -22,7 +23,7 @@ test('prints scope of non-recursive install in a workspace', (t) => { t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -48,7 +49,7 @@ test('prints scope of recursive install in a workspace when not all packages are t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -74,7 +75,7 @@ test('prints scope of recursive install in a workspace when all packages are sel t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -99,7 +100,7 @@ test('prints scope of recursive install not in a workspace when not all packages t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { @@ -124,7 +125,7 @@ test('prints scope of recursive install not in a workspace when all packages are t.plan(1) - output$.take(1).subscribe({ + output$.pipe(take(1)).subscribe({ complete: () => t.end(), error: t.end, next: output => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1b26a64faf..e4a1560784 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,16 +258,15 @@ importers: '@pnpm/core-loggers': 'link:../core-loggers' '@pnpm/error': 'link:../error' '@pnpm/types': 'link:../types' - '@zkochan/zen-push': 1.0.0 ansi-diff: 1.1.1 chalk: 4.1.0 - most: 1.8.1_most@1.8.1 normalize-path: 3.0.0 pretty-bytes: 5.4.1 pretty-ms: 7.0.0 pretty-time: 1.1.0 ramda: 0.27.1 right-pad: 1.0.1 + rxjs: 6.6.3 semver: 7.3.2 stacktracey: 1.2.127 string-length: 4.0.1 @@ -294,13 +293,11 @@ importers: '@types/pretty-time': ^1.1.0 '@types/ramda': ^0.27.15 '@types/semver': ^7.3.3 - '@zkochan/zen-push': 1.0.0 ansi-diff: ^1.1.1 chalk: ^4.1.0 delay: ^4.4.0 ghooks: 2.0.4 load-json-file: ^6.2.0 - most: ^1.8.1 normalize-newline: 3.0.0 normalize-path: ^3.0.0 pretty-bytes: ^5.4.1 @@ -308,6 +305,7 @@ importers: pretty-time: ^1.1.0 ramda: ^0.27.1 right-pad: ^1.0.1 + rxjs: ^6.6.3 semver: ^7.3.2 stacktracey: ^1.2.127 string-length: ^4.0.1 @@ -3450,19 +3448,6 @@ packages: dev: true resolution: integrity: sha512-J6VClfQSVgR6958eIDTGjfdCrELy1eT+SHeoSMomnvRQVktZMnEA5edIr5ovRFNw5y+Bk/jyoevPzGYod96mhw== - /@most/multicast/1.3.0_most@1.8.1: - dependencies: - '@most/prelude': 1.7.3 - most: 1.8.1_most@1.8.1 - dev: false - peerDependencies: - most: ^1.0.1 - resolution: - integrity: sha512-DWH8AShgp5bXn+auGzf5tzPxvpmEvQJd0CNsApOci1LDF4eAEcnw4HQOr2Jaa+L92NbDYFKBSXxll+i7r1ikvw== - /@most/prelude/1.7.3: - dev: false - resolution: - integrity: sha512-qWWEnA22UP1lzFfKx75XMut6DUUXGRKe7qv2k+Bgs7ju8lwb5RjsZYyQZ+VcsYvHcIavHKzseLlBMLOe2CvUZw== /@mrmlnc/readdir-enhanced/2.2.1: dependencies: call-me-maybe: 1.0.1 @@ -3912,10 +3897,6 @@ packages: dev: true resolution: integrity: sha512-o5Pl/qux8JEuFpof5fUg/Fl6R3N26ExJlsmWpB67ayrEJDMIxWdM9NDJacisXeNB7YW+vij8onctH8Pr1Zhi5g== - /@types/zen-observable/0.8.1: - dev: false - resolution: - integrity: sha512-wmk0xQI6Yy7Fs/il4EpOcflG4uonUpYGqvZARESLc2oy4u69fkatFLbJOeW4Q6awO15P4rduAe6xkwHevpXcUQ== /@typescript-eslint/eslint-plugin/4.1.0: dependencies: '@typescript-eslint/experimental-utils': 4.1.0 @@ -4148,15 +4129,6 @@ packages: node: '>=8.15' resolution: integrity: sha512-uWMEF7fdc6C3VGTaW6Z9G9rYS41ulS0Lz+a3lGlDGji42kI6FSVVLI9s8bZ4ZR4l4Hs28MJHHVN8cOqvNlN86w== - /@zkochan/zen-push/1.0.0: - dependencies: - '@types/zen-observable': 0.8.1 - zen-observable: 0.8.15 - dev: false - engines: - node: '>=6' - resolution: - integrity: sha512-nHBmtlomBTExmjq9VlSqh33sJI08L4n6g4zkzSh8cHBz+J5uscuXLeLTAZwo9/6W6BngC2z+4G7RKDFgAtB0Fw== /JSONStream/1.3.5: dependencies: jsonparse: 1.3.1 @@ -9214,16 +9186,6 @@ packages: optional: true resolution: integrity: sha512-al0MUK7cpIcglMv3YF13qSgdAIqxHTO7brRtaz3DlSULbqfazqkc5kEjNrLDOM7fsjshoFIihnU8snrP7zUvhQ== - /most/1.8.1_most@1.8.1: - dependencies: - '@most/multicast': 1.3.0_most@1.8.1 - '@most/prelude': 1.7.3 - symbol-observable: 1.2.0 - dev: false - peerDependencies: - most: '*' - resolution: - integrity: sha512-sN3lHtEvVNq5ay/pT93fKQ2XVW2uwbUypSWGt/DYCxGKuyvxaNd8mProK0M4k6irqeh1WD3pWrg4RdrCPohqDQ== /ms/2.0.0: resolution: integrity: sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g= @@ -11104,6 +11066,14 @@ packages: dev: true resolution: integrity: sha1-Gc5QLKVyZl87ZHsQk5+X/RYV8QI= + /rxjs/6.6.3: + dependencies: + tslib: 1.13.0 + dev: false + engines: + npm: '>=2.0.0' + resolution: + integrity: sha512-trsQc+xYYXZ3urjOiJOuCOa5N3jAZ3eiSpQB5hIT8zGlL2QfnHLJ2r7GMkBGuIausdJN1OneaI6gQlsqNHHmZQ== /safe-buffer/5.1.2: resolution: integrity: sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g== @@ -11797,12 +11767,6 @@ packages: node: '>=8' resolution: integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw== - /symbol-observable/1.2.0: - dev: false - engines: - node: '>=0.10.0' - resolution: - integrity: sha512-e900nM8RRtGhlV36KGEU9k65K3mPb1WV70OdjfxlG2EAuM1noi/E/BaW/uMhL7bPEssK8QV57vN3esixjUvcXQ== /symbol-tree/3.2.4: dev: true resolution: @@ -12989,7 +12953,3 @@ packages: node: '>=6' resolution: integrity: sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q== - /zen-observable/0.8.15: - dev: false - resolution: - integrity: sha512-PQ2PC7R9rslx84ndNBZB/Dkv8V8fZEpk83RLgXtYd0fwUgEjseMn1Dgajh2x6S8QbZAFa9p2qVCEuYZNgve0dQ==