From 0dc770aa7bdfa9dbe12ff9b2ca9e29c34b28031d Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Sun, 7 Jun 2026 23:27:15 +0200 Subject: [PATCH] perf(pacquet): share virtual-store slot linking pass (#12251) Refs: pnpm/pnpm#12250 - share warm/cold virtual-store slot linking through one parallel helper - emit structured pacquet install phase metrics for virtual-store partition sizes and link-slot elapsed time - generate integrated-benchmark diagnostics artifacts and use them for the fresh pnpr cold-batch and pnpr-vs-direct guardrails - split client registry latency/bandwidth from pnpr server registry latency, while rewriting server-origin tarball URLs back to the measured client registry path - keep the benchmark PR comment focused on scenario tables and collapsed raw JSON; diagnostics stay available as artifacts instead of inline report noise --- .../pacquet-integrated-benchmark.yml | 8 +- Cargo.lock | 1 + pacquet/crates/cli/src/cli_args/install.rs | 130 ++++- .../crates/cli/src/cli_args/install/tests.rs | 88 ++- pacquet/crates/diagnostics/Cargo.toml | 2 +- .../crates/diagnostics/src/local_tracing.rs | 16 +- .../src/create_virtual_dir_by_snapshot.rs | 10 +- .../create_virtual_dir_by_snapshot/tests.rs | 77 ++- .../src/create_virtual_store.rs | 242 ++++++--- .../src/create_virtual_store/tests.rs | 121 ++++- .../src/install_frozen_lockfile.rs | 9 + .../src/install_package_by_snapshot.rs | 7 + .../src/install_package_by_snapshot/tests.rs | 3 + .../src/install_with_fresh_lockfile.rs | 2 + pacquet/tasks/integrated-benchmark/Cargo.toml | 1 + .../integrated-benchmark/src/cli_args.rs | 58 +- .../tasks/integrated-benchmark/src/main.rs | 4 +- .../integrated-benchmark/src/work_env.rs | 512 ++++++++++++++++-- .../src/work_env/tests.rs | 206 +++++++ 19 files changed, 1337 insertions(+), 160 deletions(-) create mode 100644 pacquet/tasks/integrated-benchmark/src/work_env/tests.rs diff --git a/.github/workflows/pacquet-integrated-benchmark.yml b/.github/workflows/pacquet-integrated-benchmark.yml index 2ce884f341..d689401a6d 100644 --- a/.github/workflows/pacquet-integrated-benchmark.yml +++ b/.github/workflows/pacquet-integrated-benchmark.yml @@ -329,7 +329,7 @@ jobs: ( echo '## Integrated-Benchmark Report (${{ runner.os }})' echo - echo 'Each scenario has pacquet rows (direct install) and pnpr rows (the same client through the pnpr install accelerator), so pnpr@HEAD vs pacquet@HEAD is the pnpr-vs-direct ratio. Cold-store scenarios wipe the client store between runs (warm server); hot-store scenarios keep it warm. The pacquet@HEAD rows feed the pacquet Bencher testbed; the pnpr@HEAD rows feed the pnpr testbed.' + echo 'Each scenario reports direct installs and pnpr installs. Bencher consumes pacquet@HEAD and pnpr@HEAD.' echo echo '### Scenario: Isolated linker: fresh restore, cold cache + cold store' echo @@ -381,8 +381,6 @@ jobs: echo echo '### Scenario: Isolated linker: fresh install, cold cache + hot store' echo - echo 'Resolution-only: cold packument cache (full re-resolve over the registry link) with a hot store (no tarball download), so this isolates pnpr offloading the client resolution to its warm server.' - echo cat bench-work-env/BENCHMARK_REPORT_ISOLATED_FRESH_INSTALL_COLD_CACHE_HOT_STORE.md echo echo '
BENCHMARK_REPORT.json' @@ -399,9 +397,9 @@ jobs: # Bencher's shell_hyperfine adapter accepts, one per testbed. For # each, keep only the chosen target's result from each scenario # and rename `.command` to the scenario name, so Bencher names the - # benchmark after the scenario. Both tools run all four scenarios, + # benchmark after the scenario. Both tools run all five scenarios, # so the `pacquet` testbed gets `pacquet@HEAD` and the `pnpr` - # testbed gets `pnpr@HEAD`, each across the same four scenarios. + # testbed gets `pnpr@HEAD`, each across the same scenario set. shell: bash run: | # Build one bencher-results file: $1 = output path, $2 = the diff --git a/Cargo.lock b/Cargo.lock index f172959948..a700f93eb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3145,6 +3145,7 @@ dependencies = [ "reqwest 0.13.4", "serde", "serde-saphyr", + "serde_json", "tokio", "which 8.0.2", ] diff --git a/pacquet/crates/cli/src/cli_args/install.rs b/pacquet/crates/cli/src/cli_args/install.rs index 96df264a97..35d35a04c9 100644 --- a/pacquet/crates/cli/src/cli_args/install.rs +++ b/pacquet/crates/cli/src/cli_args/install.rs @@ -2,12 +2,15 @@ use crate::{State, cli_args::supported_architectures::SupportedArchitecturesArgs use clap::{Args, ValueEnum}; use miette::Context; use pacquet_config::NodeLinker; -use pacquet_lockfile::Lockfile; +use pacquet_lockfile::{Lockfile, LockfileResolution}; use pacquet_package_manager::{Install, TarballPrefetcher, UpdateSeedPolicy}; use pacquet_package_manifest::DependencyGroup; use pacquet_pnpr_client::{PnprClient, PnprClientError, ResolveOptions}; use pacquet_reporter::Reporter; +const BENCHMARK_PNPR_SERVER_REGISTRY_ENV: &str = "PACQUET_BENCHMARK_PNPR_SERVER_REGISTRY"; +const BENCHMARK_PNPR_TARBALL_REWRITE_FROM_ENV: &str = "PACQUET_BENCHMARK_PNPR_TARBALL_REWRITE_FROM"; + /// `--node-linker` value parser. CLI mirror of /// [`pacquet_config::NodeLinker`] so the config crate stays free /// of `clap` as a dependency. Converted to the canonical enum at @@ -461,6 +464,11 @@ async fn install_via_pnpr( .map(serde_json::to_value) .transpose() .map_err(|err| miette::miette!("failed to serialize overrides: {err}"))?; + let benchmark_registry_override = + PnprBenchmarkRegistryOverride::from_env(&state.config.registry); + let resolve_registry = benchmark_registry_override + .as_ref() + .map_or_else(|| state.config.registry.clone(), |registry| registry.resolve_registry()); // Send the on-disk lockfile + the full client policy so the server // verifies the input lockfile under *our* policy before resolving; @@ -474,7 +482,7 @@ async fn install_via_pnpr( dependencies, dev_dependencies, optional_dependencies, - registry: state.config.registry.clone(), + registry: resolve_registry, named_registries: state.config.named_registries.clone(), // Forward the whole credential map: the registries a graph // touches aren't known up front (scope-routed or tarball-URL @@ -532,13 +540,17 @@ async fn install_via_pnpr( Some(prefetcher) => { client .resolve_streaming(opts, |pkg| { - prefetcher.prefetch(pkg.id, pkg.tarball, &pkg.integrity); + let tarball = benchmark_registry_override.as_ref().map_or_else( + || pkg.tarball.clone(), + |registry| registry.client_tarball_url(&pkg.tarball), + ); + prefetcher.prefetch(pkg.id, tarball, &pkg.integrity); }) .await } None => client.resolve(opts).await, }; - let outcome = match result { + let mut outcome = match result { Ok(outcome) => outcome, // The server rejected the input lockfile under our policy. // Surface the reconstructed `VerifyError` so the abort + the @@ -551,6 +563,9 @@ async fn install_via_pnpr( .wrap_err("resolving dependencies via the pnpr server"); } }; + if let Some(registry) = benchmark_registry_override.as_ref() { + registry.rewrite_lockfile(&mut outcome.lockfile); + } if state.config.lockfile { outcome @@ -613,5 +628,112 @@ async fn install_via_pnpr( Ok(()) } +struct PnprBenchmarkRegistryOverride { + resolve_registry: String, + tarball_rewrite: Option, +} + +impl PnprBenchmarkRegistryOverride { + /// Benchmark-only hook for `pacquet/tasks/integrated-benchmark`. + /// + /// The benchmark runs release-built pacquet and pnpr binaries, so this + /// cannot be hidden behind `#[cfg(test)]`. Keep every + /// `PACQUET_BENCHMARK_*` env read in this type: normal pnpr installs + /// take one no-op branch, while benchmark runs can ask the pnpr server + /// to resolve against a server-side registry URL and then rewrite + /// server-origin tarball URLs back to the client-facing registry. The + /// rewrite is applied before saving the lockfile because the benchmark's + /// frozen materialization must use the same client-registry path that + /// direct installs pay for. + fn from_env(client_registry: &str) -> Option { + let resolve_registry = std::env::var(BENCHMARK_PNPR_SERVER_REGISTRY_ENV) + .ok() + .filter(|registry| !registry.is_empty()) + .map(|registry| normalize_registry(®istry))?; + let tarball_rewrite_from = std::env::var(BENCHMARK_PNPR_TARBALL_REWRITE_FROM_ENV) + .ok() + .filter(|registry| !registry.is_empty()); + let tarball_rewrite = BenchmarkRegistryRewrite::new( + [Some(resolve_registry.as_str()), tarball_rewrite_from.as_deref()] + .into_iter() + .flatten(), + client_registry, + ); + Some(Self { resolve_registry, tarball_rewrite }) + } + + fn resolve_registry(&self) -> String { + self.resolve_registry.clone() + } + + fn client_tarball_url(&self, url: &str) -> String { + self.tarball_rewrite.as_ref().map_or_else(|| url.to_string(), |rewrite| rewrite.url(url)) + } + + fn rewrite_lockfile(&self, lockfile: &mut Lockfile) { + let Some(rewrite) = self.tarball_rewrite.as_ref() else { return }; + let Some(packages) = lockfile.packages.as_mut() else { return }; + for metadata in packages.values_mut() { + rewrite_resolution_registry(&mut metadata.resolution, rewrite); + } + } +} + +struct BenchmarkRegistryRewrite { + from: Vec, + to: String, +} + +impl BenchmarkRegistryRewrite { + pub(super) fn new(from: Registries, to: &str) -> Option + where + Registry: AsRef, + Registries: IntoIterator, + { + let to = normalize_registry(to); + let mut from_registries = Vec::new(); + for registry in from { + let registry = normalize_registry(registry.as_ref()); + if registry != to && !from_registries.contains(®istry) { + from_registries.push(registry); + } + } + (!from_registries.is_empty()).then_some(Self { from: from_registries, to }) + } + + pub(super) fn url(&self, url: &str) -> String { + self.from + .iter() + .find_map(|from| url.strip_prefix(from)) + .map_or_else(|| url.to_string(), |suffix| format!("{}{}", self.to, suffix)) + } +} + +fn normalize_registry(registry: &str) -> String { + if registry.ends_with('/') { registry.to_string() } else { format!("{registry}/") } +} + +fn rewrite_resolution_registry( + resolution: &mut LockfileResolution, + rewrite: &BenchmarkRegistryRewrite, +) { + match resolution { + LockfileResolution::Tarball(resolution) => { + resolution.tarball = rewrite.url(&resolution.tarball); + } + LockfileResolution::Binary(resolution) => { + resolution.url = rewrite.url(&resolution.url); + } + LockfileResolution::Variations(resolution) => { + for variant in &mut resolution.variants { + rewrite_resolution_registry(&mut variant.resolution, rewrite); + } + } + LockfileResolution::Directory(_) + | LockfileResolution::Git(_) + | LockfileResolution::Registry(_) => {} + } +} + #[cfg(test)] mod tests; diff --git a/pacquet/crates/cli/src/cli_args/install/tests.rs b/pacquet/crates/cli/src/cli_args/install/tests.rs index d0bb740aba..21124bb009 100644 --- a/pacquet/crates/cli/src/cli_args/install/tests.rs +++ b/pacquet/crates/cli/src/cli_args/install/tests.rs @@ -1,6 +1,10 @@ -use super::{InstallArgs, InstallDependencyOptions, NodeLinkerArg}; +use super::{ + BenchmarkRegistryRewrite, InstallArgs, InstallDependencyOptions, NodeLinkerArg, + PnprBenchmarkRegistryOverride, rewrite_resolution_registry, +}; use clap::Parser; use pacquet_config::NodeLinker; +use pacquet_lockfile::{LockfileResolution, TarballResolution}; use pacquet_package_manifest::DependencyGroup; use pipe_trait::Pipe; use pretty_assertions::assert_eq; @@ -205,3 +209,85 @@ fn node_linker_arg_into_config_matches_every_variant() { } } } + +#[test] +fn registry_rewrite_replaces_only_the_configured_registry_prefix() { + let rewrite = BenchmarkRegistryRewrite::new( + ["http://server-registry.test"], + "http://client-registry.test", + ) + .expect("different registries create a rewrite"); + + assert_eq!( + rewrite.url("http://server-registry.test/foo/-/foo-1.0.0.tgz"), + "http://client-registry.test/foo/-/foo-1.0.0.tgz", + ); + assert_eq!( + rewrite.url("http://other-registry.test/foo/-/foo-1.0.0.tgz"), + "http://other-registry.test/foo/-/foo-1.0.0.tgz", + ); +} + +#[test] +fn registry_rewrite_accepts_multiple_server_registry_prefixes() { + let rewrite = BenchmarkRegistryRewrite::new( + ["http://server-proxy.test", "http://server-registry.test"], + "http://client-registry.test", + ) + .expect("different registries create a rewrite"); + + assert_eq!( + rewrite.url("http://server-proxy.test/foo/-/foo-1.0.0.tgz"), + "http://client-registry.test/foo/-/foo-1.0.0.tgz", + ); + assert_eq!( + rewrite.url("http://server-registry.test/foo/-/foo-1.0.0.tgz"), + "http://client-registry.test/foo/-/foo-1.0.0.tgz", + ); +} + +#[test] +fn registry_rewrite_is_none_for_equal_registries_after_normalization() { + assert!( + BenchmarkRegistryRewrite::new(["http://registry.test"], "http://registry.test/").is_none(), + ); +} + +#[test] +fn registry_rewrite_updates_explicit_tarball_resolution_urls() { + let rewrite = BenchmarkRegistryRewrite::new( + ["http://server-registry.test"], + "http://client-registry.test", + ) + .expect("different registries create a rewrite"); + let mut resolution = LockfileResolution::Tarball(TarballResolution { + tarball: "http://server-registry.test/foo/-/foo-1.0.0.tgz".to_string(), + integrity: None, + git_hosted: None, + path: None, + }); + + rewrite_resolution_registry(&mut resolution, &rewrite); + + let LockfileResolution::Tarball(resolution) = resolution else { + panic!("resolution stays tarball"); + }; + assert_eq!(resolution.tarball, "http://client-registry.test/foo/-/foo-1.0.0.tgz"); +} + +#[test] +fn pnpr_benchmark_override_keeps_resolve_registry_separate_from_tarball_rewrite() { + let override_ = PnprBenchmarkRegistryOverride { + resolve_registry: "http://server-proxy.test/".to_string(), + tarball_rewrite: BenchmarkRegistryRewrite::new( + ["http://server-proxy.test", "http://server-registry.test"], + "http://client-registry.test", + ), + }; + + assert_eq!(override_.resolve_registry(), "http://server-proxy.test/"); + assert_eq!( + override_.client_tarball_url("http://server-registry.test/foo/-/foo-1.0.0.tgz"), + "http://client-registry.test/foo/-/foo-1.0.0.tgz", + ); +} diff --git a/pacquet/crates/diagnostics/Cargo.toml b/pacquet/crates/diagnostics/Cargo.toml index cda06b6732..f1d041e2a0 100644 --- a/pacquet/crates/diagnostics/Cargo.toml +++ b/pacquet/crates/diagnostics/Cargo.toml @@ -13,7 +13,7 @@ repository.workspace = true [dependencies] miette = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true } +tracing-subscriber = { workspace = true, features = ["json"] } [lints] workspace = true diff --git a/pacquet/crates/diagnostics/src/local_tracing.rs b/pacquet/crates/diagnostics/src/local_tracing.rs index 04eb9740d7..8d8f2639f7 100644 --- a/pacquet/crates/diagnostics/src/local_tracing.rs +++ b/pacquet/crates/diagnostics/src/local_tracing.rs @@ -8,11 +8,17 @@ pub fn enable_tracing_by_env() { use tracing_subscriber::{fmt, prelude::*}; let layer = common_layer(&trace_var); - - tracing_subscriber::registry() - .with(layer) - .with(fmt::layer().pretty().with_file(true).with_span_events(FmtSpan::CLOSE)) - .init(); + if std::env::var("TRACE_FORMAT").is_ok_and(|format| format == "json") { + tracing_subscriber::registry() + .with(layer) + .with(fmt::layer().json().flatten_event(true)) + .init(); + } else { + tracing_subscriber::registry() + .with(layer) + .with(fmt::layer().pretty().with_file(true).with_span_events(FmtSpan::CLOSE)) + .init(); + } tracing::trace!("enable_tracing_by_env"); } diff --git a/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot.rs b/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot.rs index ee6033f58b..6c7d4beb24 100644 --- a/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot.rs +++ b/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot.rs @@ -55,6 +55,8 @@ pub struct CreateVirtualDirBySnapshot<'a> { /// `linkAllModules` at /// . pub skipped: &'a SkippedSnapshots, + #[cfg(test)] + pub(crate) link_concurrency_probe: Option<&'a tests::LinkConcurrencyProbe>, } /// Error type of [`CreateVirtualDirBySnapshot`]. @@ -88,8 +90,14 @@ impl<'a> CreateVirtualDirBySnapshot<'a> { package_key, snapshot, skipped, + #[cfg(test)] + link_concurrency_probe, } = self; + #[cfg(test)] + let _link_concurrency_guard = + link_concurrency_probe.map(tests::LinkConcurrencyProbe::enter); + let virtual_node_modules_dir = layout.slot_dir(package_key).join("node_modules"); fs::create_dir_all(&virtual_node_modules_dir).map_err(|error| { CreateVirtualDirError::CreateNodeModulesDir { @@ -174,4 +182,4 @@ pub(crate) fn optimistic_wire_method(method: PackageImportMethod) -> WireImportM } #[cfg(test)] -mod tests; +pub(crate) mod tests; diff --git a/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot/tests.rs b/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot/tests.rs index a17cf764f6..002cf1a0ed 100644 --- a/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot/tests.rs +++ b/pacquet/crates/package-manager/src/create_virtual_dir_by_snapshot/tests.rs @@ -7,10 +7,84 @@ use pacquet_reporter::{ use std::{ collections::HashMap, path::Path, - sync::{Mutex, atomic::AtomicU8}, + sync::{ + Condvar, Mutex, + atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}, + }, + time::Duration, }; use tempfile::tempdir; +pub(crate) struct LinkConcurrencyProbe { + current: AtomicUsize, + max: AtomicUsize, + wait_for_overlap: bool, + wait_started: AtomicBool, + mutex: Mutex<()>, + condvar: Condvar, +} + +impl LinkConcurrencyProbe { + pub(crate) fn waiting_for_overlap() -> Self { + Self { wait_for_overlap: true, ..Self::default() } + } + + pub(crate) fn max_concurrent(&self) -> usize { + self.max.load(Ordering::SeqCst) + } + + pub(super) fn enter(&self) -> LinkConcurrencyGuard<'_> { + let current = self.current.fetch_add(1, Ordering::SeqCst) + 1; + let mut max = self.max.load(Ordering::SeqCst); + while current > max { + match self.max.compare_exchange_weak(max, current, Ordering::SeqCst, Ordering::SeqCst) { + Ok(_) => { + self.condvar.notify_all(); + break; + } + Err(next) => max = next, + } + } + + if self.wait_for_overlap && current == 1 && !self.wait_started.swap(true, Ordering::SeqCst) + { + let guard = self.mutex.lock().expect("lock link-concurrency probe"); + let _ = self + .condvar + .wait_timeout_while(guard, Duration::from_secs(2), |_| { + self.max.load(Ordering::SeqCst) < 2 + }) + .expect("wait for overlapping link"); + } + + LinkConcurrencyGuard { probe: self } + } +} + +impl Default for LinkConcurrencyProbe { + fn default() -> Self { + Self { + current: AtomicUsize::new(0), + max: AtomicUsize::new(0), + wait_for_overlap: false, + wait_started: AtomicBool::new(false), + mutex: Mutex::new(()), + condvar: Condvar::new(), + } + } +} + +pub(super) struct LinkConcurrencyGuard<'a> { + probe: &'a LinkConcurrencyProbe, +} + +impl Drop for LinkConcurrencyGuard<'_> { + fn drop(&mut self) { + self.probe.current.fetch_sub(1, Ordering::SeqCst); + self.probe.condvar.notify_all(); + } +} + /// `optimistic_wire_method` is the source of truth for the /// configured-method → wire-method mapping the `imported` event /// reports. `Auto` and `CloneOrCopy` collapse to `Clone` (the @@ -75,6 +149,7 @@ async fn run_emits_imported_event_after_import_indexed_dir() { package_key: &package_key, snapshot: &snapshot, skipped: &skipped, + link_concurrency_probe: None, } .run::() .expect("empty-cas-paths run should succeed"); diff --git a/pacquet/crates/package-manager/src/create_virtual_store.rs b/pacquet/crates/package-manager/src/create_virtual_store.rs index 63630f150a..d57c6d4c87 100644 --- a/pacquet/crates/package-manager/src/create_virtual_store.rs +++ b/pacquet/crates/package-manager/src/create_virtual_store.rs @@ -5,7 +5,7 @@ use crate::{ use derive_more::{Display, Error}; use futures_util::future; use miette::Diagnostic; -use pacquet_config::{Config, NodeLinker}; +use pacquet_config::{Config, NodeLinker, PackageImportMethod}; use pacquet_deps_path::get_pkg_id_with_patch_hash; use pacquet_lockfile::{ LockfileResolution, PackageKey, PackageMetadata, PkgIdWithPatchHash, PkgNameVerPeer, @@ -171,6 +171,9 @@ pub struct CreateVirtualStore<'a> { /// the fresh-resolve path's [`crate::PrefetchingResolver`] (closing /// ); `None` otherwise. pub tarball_mem_cache: Option<&'a std::sync::Arc>, + #[cfg(test)] + pub(crate) link_concurrency_probe: + Option<&'a crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe>, } /// Error type of [`CreateVirtualStore`]. @@ -217,6 +220,8 @@ impl<'a> CreateVirtualStore<'a> { node_linker, progress_reported, tarball_mem_cache, + #[cfg(test)] + link_concurrency_probe, } = self; let is_hoisted = matches!(node_linker, NodeLinker::Hoisted); @@ -635,6 +640,16 @@ impl<'a> CreateVirtualStore<'a> { None => cold.push((snapshot_key, snapshot)), } } + tracing::info!( + target: "pacquet::install::phase", + phase = "create_virtual_store_partition", + warm = warm.len(), + cold = cold.len(), + skipped = skipped_entries.len(), + total = snapshot_entries.len(), + node_linker = ?node_linker, + "phase complete", + ); // Hoisted-mode CAS index assembly. Collected here, *before* // the warm-batch closure consumes `warm` under the @@ -659,21 +674,6 @@ impl<'a> CreateVirtualStore<'a> { let import_method = config.package_import_method; if !is_hoisted { - use rayon::prelude::*; - // Driving the warm batch from inside an `async fn` means - // the `par_iter` blocks the calling tokio worker for the - // duration. On the production multi-thread runtime that's - // fine — `block_in_place` tells the runtime to migrate any - // other futures off this worker first, so async progress - // continues on the other workers — but `block_in_place` - // panics on `current_thread` runtimes, which is what - // `#[tokio::test]` defaults to. Detect the flavor and only - // call `block_in_place` when it's safe; on - // `current_thread` we fall back to a plain inline call, - // matching how the rest of the test suite already runs - // sync work directly on the test thread (Copilot review on - // ). - // // Hoisted skips this batch entirely: no virtual-store slot // gets written, so there's no per-snapshot link work to // do — the CAS paths captured below are the only output @@ -681,42 +681,27 @@ impl<'a> CreateVirtualStore<'a> { // `nodeLinker === 'hoisted'` guard at // // which routes all link work into `linkHoistedModules`. - let warm_work = move || { - warm.par_iter().try_for_each(|(snapshot_key, snapshot, cas_paths, cache_key)| { - let package_id = snapshot_key.without_peer().to_string(); - emit_warm_snapshot_progress::( - &package_id, - requester, - progress_reported.contains(*cache_key), - ); - - crate::CreateVirtualDirBySnapshot { - layout, - cas_paths: cas_paths.as_ref(), - import_method, - logged_methods, - requester, - package_id: &package_id, - package_key: snapshot_key, - snapshot, - skipped, - } - .run::() - .map_err(|error| { - CreateVirtualStoreError::InstallPackageBySnapshot( - InstallPackageBySnapshotError::CreateVirtualDir(error), - ) - }) + let warm_slots: Vec> = warm + .iter() + .map(|(snapshot_key, snapshot, cas_paths, cache_key)| SlotLink { + snapshot_key, + snapshot, + cas_paths: cas_paths.as_ref(), + warm_cache_key: Some(cache_key), }) - }; - let on_multi_thread = tokio::runtime::Handle::try_current().is_ok_and(|handle| { - handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread - }); - if on_multi_thread { - tokio::task::block_in_place(warm_work)?; - } else { - warm_work()?; - } + .collect(); + link_slots_parallel::(LinkSlotsParallel { + batch: "warm", + slots: &warm_slots, + layout, + import_method, + logged_methods, + requester, + skipped, + progress_reported, + #[cfg(test)] + link_concurrency_probe, + })?; } else { // Hoisted still wants the progress reporter to fire so // `pnpm:progress imported`-style updates render the warm @@ -795,6 +780,8 @@ impl<'a> CreateVirtualStore<'a> { // below so it doesn't serialize inside this // cooperative `try_join_all` task. defer_link: true, + #[cfg(test)] + link_concurrency_probe, } .run::() .await; @@ -853,43 +840,27 @@ impl<'a> CreateVirtualStore<'a> { // concurrently. Hoisted writes no slots, so it skips this and // consumes `cold_cas_paths` for the per-pkg CAS index below. if !is_hoisted && !cold_cas_paths.is_empty() { - use rayon::prelude::*; - let import_method = config.package_import_method; - let link_work = || { - cold_cas_paths.par_iter().try_for_each(|(snapshot_key, snapshot, cas_paths)| { - let package_id = snapshot_key.without_peer().to_string(); - crate::CreateVirtualDirBySnapshot { - layout, - cas_paths, - import_method, - logged_methods, - requester, - package_id: &package_id, - package_key: snapshot_key, - snapshot, - skipped, - } - .run::() - .map_err(|error| { - CreateVirtualStoreError::InstallPackageBySnapshot( - InstallPackageBySnapshotError::CreateVirtualDir(error), - ) - }) + let cold_slots: Vec> = cold_cas_paths + .iter() + .map(|(snapshot_key, snapshot, cas_paths)| SlotLink { + snapshot_key, + snapshot, + cas_paths, + warm_cache_key: None, }) - }; - // `block_in_place` (the same guard the warm batch uses) - // migrates other futures off this worker so async progress - // continues; it panics on the `current_thread` runtime that - // `#[tokio::test]` defaults to, so fall back to a plain call - // there. - let on_multi_thread = tokio::runtime::Handle::try_current().is_ok_and(|handle| { - handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread - }); - if on_multi_thread { - tokio::task::block_in_place(link_work)?; - } else { - link_work()?; - } + .collect(); + link_slots_parallel::(LinkSlotsParallel { + batch: "cold", + slots: &cold_slots, + layout, + import_method, + logged_methods, + requester, + skipped, + progress_reported, + #[cfg(test)] + link_concurrency_probe, + })?; } // Build the per-pkg CAS index when the install is targeting @@ -950,6 +921,103 @@ impl<'a> CreateVirtualStore<'a> { } } +struct SlotLink<'a> { + snapshot_key: &'a PackageKey, + snapshot: &'a SnapshotEntry, + cas_paths: &'a HashMap, + warm_cache_key: Option<&'a str>, +} + +struct LinkSlotsParallel<'a> { + batch: &'static str, + slots: &'a [SlotLink<'a>], + layout: &'a crate::VirtualStoreLayout, + import_method: PackageImportMethod, + logged_methods: &'a AtomicU8, + requester: &'a str, + skipped: &'a SkippedSnapshots, + progress_reported: &'a SharedReportedProgressKeys, + #[cfg(test)] + link_concurrency_probe: + Option<&'a crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe>, +} + +fn link_slots_parallel( + opts: LinkSlotsParallel<'_>, +) -> Result<(), CreateVirtualStoreError> { + use rayon::prelude::*; + + let LinkSlotsParallel { + batch, + slots, + layout, + import_method, + logged_methods, + requester, + skipped, + progress_reported, + #[cfg(test)] + link_concurrency_probe, + } = opts; + + let phase_start = std::time::Instant::now(); + let link_work = || { + slots.par_iter().try_for_each(|slot| { + let package_id = slot.snapshot_key.without_peer().to_string(); + if let Some(cache_key) = slot.warm_cache_key { + emit_warm_snapshot_progress::( + &package_id, + requester, + progress_reported.contains(cache_key), + ); + } + + crate::CreateVirtualDirBySnapshot { + layout, + cas_paths: slot.cas_paths, + import_method, + logged_methods, + requester, + package_id: &package_id, + package_key: slot.snapshot_key, + snapshot: slot.snapshot, + skipped, + #[cfg(test)] + link_concurrency_probe, + } + .run::() + .map_err(|error| { + CreateVirtualStoreError::InstallPackageBySnapshot( + InstallPackageBySnapshotError::CreateVirtualDir(error), + ) + }) + }) + }; + // Driving the link pass from inside an `async fn` means the + // `par_iter` blocks the calling tokio worker for the duration. On + // the production multi-thread runtime, `block_in_place` migrates + // other futures off this worker so async progress continues; it + // panics on the `current_thread` runtime that `#[tokio::test]` + // defaults to, so fall back to a plain call there. + let on_multi_thread = tokio::runtime::Handle::try_current() + .is_ok_and(|handle| handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread); + if on_multi_thread { + tokio::task::block_in_place(link_work)?; + } else { + link_work()?; + } + tracing::info!( + target: "pacquet::install::phase", + phase = "link_slots", + batch, + slots = slots.len(), + elapsed_ms = phase_start.elapsed().as_millis() as u64, + "phase complete", + ); + + Ok(()) +} + /// Build the store-index cache key for a snapshot. /// /// Returns: diff --git a/pacquet/crates/package-manager/src/create_virtual_store/tests.rs b/pacquet/crates/package-manager/src/create_virtual_store/tests.rs index 5ec76e4969..1db144ef01 100644 --- a/pacquet/crates/package-manager/src/create_virtual_store/tests.rs +++ b/pacquet/crates/package-manager/src/create_virtual_store/tests.rs @@ -1,13 +1,17 @@ use super::{ - CreateVirtualStoreError, InstallPackageBySnapshotError, emit_warm_snapshot_progress, - integrity_equal, snapshot_cache_key, snapshot_deps_equal, + CreateVirtualStore, CreateVirtualStoreError, InstallPackageBySnapshotError, + emit_warm_snapshot_progress, integrity_equal, snapshot_cache_key, snapshot_deps_equal, }; use pacquet_lockfile::{ GitResolution, LockfileResolution, PackageKey, PackageMetadata, PkgName, PkgVerPeer, RegistryResolution, SnapshotDepRef, SnapshotEntry, TarballResolution, }; -use pacquet_reporter::{LogEvent, ProgressMessage, Reporter}; -use std::{collections::HashMap, sync::Mutex}; +use pacquet_reporter::{LogEvent, ProgressMessage, Reporter, SilentReporter}; +use std::{ + collections::HashMap, + fs, + sync::{Arc, Mutex, atomic::AtomicU8}, +}; fn name(text: &str) -> PkgName { PkgName::parse(text).expect("parse pkg name") @@ -39,6 +43,115 @@ fn snapshot_with_dep(child: &str, ref_str: &str) -> SnapshotEntry { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn cold_batch_links_slots_in_parallel() { + use crate::{AllowBuildPolicy, SkippedSnapshots, VirtualStoreLayout}; + use pacquet_config::{Config, NodeLinker, PackageImportMethod}; + use pacquet_store_dir::StoreIndexWriter; + use pacquet_tarball::{CacheValue, MemCache, SharedReportedProgressKeys}; + + if rayon::current_num_threads() < 2 { + eprintln!( + "skipping cold-batch concurrency assertion with rayon_threads={}", + rayon::current_num_threads(), + ); + return; + } + + let root = tempfile::tempdir().expect("create temp dir"); + let workspace_root = root.path().join("workspace"); + fs::create_dir_all(&workspace_root).expect("create workspace root"); + let modules_dir = workspace_root.join("node_modules"); + let virtual_store_dir = modules_dir.join(".pacquet"); + let store_dir = root.path().join("store"); + + let mut config = Config::new(); + config.registry = "https://registry.test".to_string(); + config.store_dir = store_dir.into(); + config.modules_dir = modules_dir; + config.virtual_store_dir = virtual_store_dir.clone(); + config.package_import_method = PackageImportMethod::Copy; + config.offline = true; + let config = config.leak(); + + let mut snapshots = HashMap::new(); + let mut packages = HashMap::new(); + let mem_cache = Arc::new(MemCache::default()); + for package_name in ["cold-a", "cold-b", "cold-c", "cold-d"] { + let package_key = key(package_name, "1.0.0"); + let source_dir = workspace_root.join("prefetched").join(package_name); + fs::create_dir_all(&source_dir).expect("create prefetched package dir"); + let manifest_path = source_dir.join("package.json"); + fs::write(&manifest_path, format!(r#"{{"name":"{package_name}","version":"1.0.0"}}"#)) + .expect("write package manifest"); + let index_path = source_dir.join("index.js"); + fs::write(&index_path, "module.exports = true\n").expect("write package body"); + + let cas_paths = HashMap::from([ + ("package.json".to_string(), manifest_path), + ("index.js".to_string(), index_path), + ]); + mem_cache.insert( + format!("https://registry.test/{package_name}/-/{package_name}-1.0.0.tgz"), + Arc::new(tokio::sync::RwLock::new(CacheValue::Available(Arc::new(cas_paths)))), + ); + + snapshots.insert(package_key.clone(), SnapshotEntry::default()); + packages.insert(package_key.without_peer(), metadata_with_integrity(DUMMY_SHA512)); + } + + let allow_build_policy = AllowBuildPolicy::default(); + let layout = VirtualStoreLayout::new( + config, + None, + Some(&snapshots), + Some(&packages), + Some(&allow_build_policy), + ); + let skipped = SkippedSnapshots::new(); + let logged_methods = AtomicU8::new(0); + let progress_reported = SharedReportedProgressKeys::default(); + let (store_index_writer, writer_task) = StoreIndexWriter::spawn(&config.store_dir); + let requester = workspace_root.to_string_lossy().into_owned(); + let probe = + crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe::waiting_for_overlap(); + + CreateVirtualStore { + http_client: &Default::default(), + config, + packages: Some(&packages), + snapshots: Some(&snapshots), + current_snapshots: None, + current_packages: None, + layout: &layout, + logged_methods: &logged_methods, + requester: &requester, + store_index_writer: &store_index_writer, + allow_build_policy: &allow_build_policy, + skipped: &skipped, + workspace_root: &workspace_root, + node_linker: NodeLinker::Isolated, + progress_reported: &progress_reported, + tarball_mem_cache: Some(&mem_cache), + link_concurrency_probe: Some(&probe), + } + .run::() + .await + .expect("all-cold virtual-store creation should succeed from the mem cache"); + + drop(store_index_writer); + writer_task.await.expect("join store-index writer").expect("flush store-index writer"); + + assert!( + probe.max_concurrent() >= 2, + "cold-batch slot linking must overlap; observed max_concurrent={} with rayon_threads={}", + probe.max_concurrent(), + rayon::current_num_threads(), + ); +} + +const DUMMY_SHA512: &str = "sha512-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=="; + /// `emit_warm_snapshot_progress` fires `resolved` then /// `found_in_store` when no earlier fetch path already emitted the /// package status. Both events carry the same identifiers — pnpm's diff --git a/pacquet/crates/package-manager/src/install_frozen_lockfile.rs b/pacquet/crates/package-manager/src/install_frozen_lockfile.rs index 410d934aa2..aba37cdb2d 100644 --- a/pacquet/crates/package-manager/src/install_frozen_lockfile.rs +++ b/pacquet/crates/package-manager/src/install_frozen_lockfile.rs @@ -590,6 +590,7 @@ where // leaves every warm package reported as `found_in_store`. let progress_reported = SharedReportedProgressKeys::default(); + let phase_start = std::time::Instant::now(); let CreateVirtualStoreOutput { package_manifests, side_effects_maps_by_snapshot, @@ -612,10 +613,18 @@ where node_linker, progress_reported: &progress_reported, tarball_mem_cache, + #[cfg(test)] + link_concurrency_probe: None, } .run::() .await .map_err(InstallFrozenLockfileError::CreateVirtualStore)?; + tracing::info!( + target: "pacquet::install::phase", + phase = "create_virtual_store", + elapsed_ms = phase_start.elapsed().as_millis() as u64, + "phase complete", + ); // Fold fetch-failure swallows into the live skip set so // downstream consumers (`SymlinkDirectDependencies`, diff --git a/pacquet/crates/package-manager/src/install_package_by_snapshot.rs b/pacquet/crates/package-manager/src/install_package_by_snapshot.rs index 0e17d0fc72..0e6da2cb72 100644 --- a/pacquet/crates/package-manager/src/install_package_by_snapshot.rs +++ b/pacquet/crates/package-manager/src/install_package_by_snapshot.rs @@ -124,6 +124,9 @@ pub struct InstallPackageBySnapshot<'a> { /// once its tarball is in the store. No effect under /// [`NodeLinker::Hoisted`], which never writes virtual-store slots. pub defer_link: bool, + #[cfg(test)] + pub(crate) link_concurrency_probe: + Option<&'a crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe>, } /// Error type of [`InstallPackageBySnapshot`]. @@ -259,6 +262,8 @@ impl<'a> InstallPackageBySnapshot<'a> { workspace_root, node_linker, defer_link, + #[cfg(test)] + link_concurrency_probe, } = self; // TODO: skip when already exists in store? @@ -559,6 +564,8 @@ impl<'a> InstallPackageBySnapshot<'a> { package_key, snapshot, skipped, + #[cfg(test)] + link_concurrency_probe, } .run::() .map_err(InstallPackageBySnapshotError::CreateVirtualDir)?; diff --git a/pacquet/crates/package-manager/src/install_package_by_snapshot/tests.rs b/pacquet/crates/package-manager/src/install_package_by_snapshot/tests.rs index e95da23c4f..9cae51cc7c 100644 --- a/pacquet/crates/package-manager/src/install_package_by_snapshot/tests.rs +++ b/pacquet/crates/package-manager/src/install_package_by_snapshot/tests.rs @@ -402,6 +402,7 @@ async fn cold_batch_reuses_in_flight_prefetch_from_mem_cache() { // back directly. node_linker: pacquet_config::NodeLinker::Hoisted, defer_link: false, + link_concurrency_probe: None, } .run::() .await @@ -471,6 +472,7 @@ async fn without_mem_cache_skips_coordination_and_downloads() { workspace_root: store_tmp.path(), node_linker: pacquet_config::NodeLinker::Hoisted, defer_link: false, + link_concurrency_probe: None, } .run::() .await @@ -540,6 +542,7 @@ async fn cold_batch_falls_back_when_prefetch_failed() { workspace_root: store_tmp.path(), node_linker: pacquet_config::NodeLinker::Hoisted, defer_link: false, + link_concurrency_probe: None, } .run::() .await diff --git a/pacquet/crates/package-manager/src/install_with_fresh_lockfile.rs b/pacquet/crates/package-manager/src/install_with_fresh_lockfile.rs index 978c5d887c..4847fbdd28 100644 --- a/pacquet/crates/package-manager/src/install_with_fresh_lockfile.rs +++ b/pacquet/crates/package-manager/src/install_with_fresh_lockfile.rs @@ -1309,6 +1309,8 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList> // batch through the mem cache makes it reuse the in-flight // download instead. tarball_mem_cache: Some(&tarball_mem_cache), + #[cfg(test)] + link_concurrency_probe: None, } .run::() .await diff --git a/pacquet/tasks/integrated-benchmark/Cargo.toml b/pacquet/tasks/integrated-benchmark/Cargo.toml index 3d4b0cef31..908937bfa9 100644 --- a/pacquet/tasks/integrated-benchmark/Cargo.toml +++ b/pacquet/tasks/integrated-benchmark/Cargo.toml @@ -24,6 +24,7 @@ os_display = { workspace = true } pipe-trait = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } serde-saphyr = { workspace = true } tokio = { workspace = true } which = { workspace = true } diff --git a/pacquet/tasks/integrated-benchmark/src/cli_args.rs b/pacquet/tasks/integrated-benchmark/src/cli_args.rs index e653a39e44..a485312d1f 100644 --- a/pacquet/tasks/integrated-benchmark/src/cli_args.rs +++ b/pacquet/tasks/integrated-benchmark/src/cli_args.rs @@ -49,28 +49,36 @@ pub struct CliArgs { #[clap(long, default_value_t = 0)] pub pnpr_latency_ms: u64, - /// Round-trip latency, in milliseconds, to inject on the link to the - /// *registry*, applied to **every** client that touches it: direct - /// `pacquet@` / `pnpm@` installs, the `pnpr@` server's - /// resolution, and the pnpr client's tarball fetches. A request to the - /// registry-mock should cost the same regardless of who makes it, so - /// the registry-mock is uniformly as remote as the real one; pnpr's - /// advantage then shows up as fewer client round trips (one - /// `--pnpr-latency-ms` hop to the server) rather than a faster - /// backend. `0` disables injection; ignored with `--registry=npm` - /// (already remote). + /// Round-trip latency, in milliseconds, to inject on the client link + /// to the registry. Direct `pacquet@` / `pnpm@` installs and + /// pnpr clients' tarball fetches use this link. The pnpr server's own + /// resolution uses `--pnpr-server-registry-latency-ms` instead, so a + /// benchmark can model a co-located warm server resolving quickly while + /// remote clients still fetch tarballs across the slower registry link. + /// `0` disables injection; ignored with `--registry=npm` (already + /// remote). #[clap(long, default_value_t = 0)] pub registry_latency_ms: u64, + /// Round-trip latency, in milliseconds, to inject between each + /// `pnpr@` server and the registry it uses for resolution. Keep + /// this low (often `0`) when modeling production, where pnpr sits near + /// its registry/cache backend; otherwise the server resolves too slowly + /// and the benchmark under-represents the client's cold materialization + /// batch. Direct installs and pnpr clients' tarball fetches are + /// unaffected; they use `--registry-latency-ms`. + #[clap(long, default_value_t = 0)] + pub pnpr_server_registry_latency_ms: u64, + /// Download-bandwidth cap, in **megabits per second**, on the link to - /// the registry, applied to every client (direct installs and the pnpr - /// server + client alike), so tarball fetches take the time they would - /// over a real connection instead of being free on loopback. Loopback - /// serves at ~GB/s; the public npm registry measured ~190 Mbit/s - /// (~24 MB/s) peak on a fast link, and typical home/CI links are - /// 50–200 Mbit/s. Pairs with `--registry-latency-ms` (latency - /// dominates small packages, bandwidth dominates large ones). `0` - /// leaves the registry at loopback speed; ignored with + /// the client-facing registry, applied to direct installs and pnpr + /// clients' tarball fetches, so tarballs take the time they would over + /// a real connection instead of being free on loopback. Loopback serves + /// at ~GB/s; the public npm registry measured ~190 Mbit/s (~24 MB/s) + /// peak on a fast link, and typical home/CI links are 50–200 Mbit/s. + /// Pairs with `--registry-latency-ms` (latency dominates small + /// packages, bandwidth dominates large ones). `0` leaves the registry + /// at loopback speed; ignored with /// `--registry=npm` (already remote). #[clap(long, default_value_t = 0.0)] pub registry_bandwidth_mbps: f64, @@ -164,7 +172,7 @@ pub enum RegistryMode { clippy::enum_variant_names, reason = "the shared `Isolated` prefix mirrors the scenario slug and keeps the linker grouping legible; it stops firing once other linker buckets land" )] -#[derive(Debug, Clone, Copy, ValueEnum)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] pub enum BenchmarkScenario { /// No lockfile, cold cache + cold store. Mirrors `pnpm install` with nothing on disk. #[value(name = "isolated-linker.fresh-install.cold-cache.cold-store")] @@ -303,6 +311,18 @@ impl BenchmarkScenario { pub fn enables_gvs(self) -> bool { matches!(self, BenchmarkScenario::GvsFreshRestoreHotCacheHotStore) } + + /// Scenarios where pnpr's server-side resolution is expected to beat + /// or match a direct pacquet install. Hot-cache scenarios deliberately + /// skip this canary because there is little resolution work left to + /// offload and the remote pnpr hop can dominate. + pub fn expects_pnpr_not_slower_than_direct(self) -> bool { + matches!( + self, + BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore + | BenchmarkScenario::IsolatedFreshInstallColdCacheHotStore, + ) + } } #[derive(Debug, Args)] diff --git a/pacquet/tasks/integrated-benchmark/src/main.rs b/pacquet/tasks/integrated-benchmark/src/main.rs index 63c8287f5f..4db97d1dc6 100644 --- a/pacquet/tasks/integrated-benchmark/src/main.rs +++ b/pacquet/tasks/integrated-benchmark/src/main.rs @@ -31,6 +31,7 @@ async fn main() { with_pnpm, pnpr_latency_ms, registry_latency_ms, + pnpr_server_registry_latency_ms, registry_bandwidth_mbps, reuse_prebuilt_binaries, build_only, @@ -166,8 +167,9 @@ async fn main() { fixture_dir, pnpr_latency_ms, registry_latency_ms, + pnpr_server_registry_latency_ms, registry_bandwidth_mbps, - registry_port, + registry_port: spawned_registry_port, reuse_prebuilt_binaries, }; if build_only { diff --git a/pacquet/tasks/integrated-benchmark/src/work_env.rs b/pacquet/tasks/integrated-benchmark/src/work_env.rs index ae35efb738..7837443cdf 100644 --- a/pacquet/tasks/integrated-benchmark/src/work_env.rs +++ b/pacquet/tasks/integrated-benchmark/src/work_env.rs @@ -12,8 +12,11 @@ use os_display::Quotable; use pacquet_fs::file_mode::make_file_executable; use pacquet_registry_mock::pick_unused_port; use pipe_trait::Pipe; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use std::{ borrow::Cow, + collections::HashMap, fmt, fs::{self, File}, io::Write, @@ -24,6 +27,13 @@ use std::{ time::Duration, }; +const BENCHMARK_OUTPUT_LOG: &str = "BENCHMARK_OUTPUT.ndjson"; +const BENCHMARK_DIAGNOSTICS_JSON: &str = "BENCHMARK_DIAGNOSTICS.json"; +const BENCHMARK_DIAGNOSTICS_MD: &str = "BENCHMARK_DIAGNOSTICS.md"; +const PNPR_DIRECT_RATIO_MAX: f64 = 1.05; +const PNPR_SERVER_REGISTRY_ENV: &str = "PACQUET_BENCHMARK_PNPR_SERVER_REGISTRY"; +const PNPR_TARBALL_REWRITE_FROM_ENV: &str = "PACQUET_BENCHMARK_PNPR_TARBALL_REWRITE_FROM"; + #[derive(Debug)] pub struct WorkEnv { pub root: PathBuf, @@ -47,6 +57,12 @@ pub struct WorkEnv { /// `0` leaves the registry on loopback. Ignored in `--registry=npm` /// mode (already remote). pub registry_latency_ms: u64, + /// Round-trip latency (ms) between a pnpr server and the registry it + /// resolves against. Separate from `registry_latency_ms` so the + /// benchmark can model a co-located server with fast metadata access + /// while clients still fetch tarballs over a remote link. Ignored in + /// `--registry=npm` mode. + pub pnpr_server_registry_latency_ms: u64, /// Download-bandwidth cap (megabits/sec) on the link to the registry, /// applied to every client, so tarball fetches cost real time instead /// of being free on loopback. `0` leaves the registry at loopback @@ -201,7 +217,7 @@ impl WorkEnv { fs::create_dir_all(&dir).expect("create directory for the revision"); create_package_json(&dir, self.fixture_dir.as_deref()); create_pnpm_workspace(&dir, self.fixture_dir.as_deref(), registry, scenario); - create_install_script(&dir, scenario, &WorkEnv::install_command(id), id.is_pnpr()); + create_install_script(&dir, scenario, &WorkEnv::install_command(id), id); create_npmrc(&dir, registry, scenario); may_create_lockfile(&dir, scenario, self.fixture_dir.as_deref()); save_pristine_copies(&dir); @@ -378,7 +394,7 @@ impl WorkEnv { .pipe(executor("pnpm run compile-only")); } - fn benchmark(&self) { + fn benchmark(&self, pnpr_server_registry: &str) { let scenario = self.scenario.expect("scenario set when benchmark() is reached"); // Pre-benchmark wipe of `node_modules`, `store-dir`, and @@ -406,6 +422,10 @@ impl WorkEnv { fs::remove_dir_all(&path).expect("pre-benchmark wipe"); } } + let output_log = dir.join(BENCHMARK_OUTPUT_LOG); + if output_log.exists() { + fs::remove_file(output_log).expect("pre-benchmark metrics-log wipe"); + } } // Start a pnpr server per `pnpr@` target and keep the guards @@ -413,7 +433,7 @@ impl WorkEnv { // the end of this method. Empty (no-op) when there are no pnpr // targets. Spawned before the GVS pre-warm below so a pnpr target // would have its server up if a scenario ever combines the two. - let _pnpr_servers = self.start_pnpr_servers(); + let _pnpr_servers = self.start_pnpr_servers(pnpr_server_registry); // For GVS-warm we need a pre-warm pass: hyperfine's `--warmup` // would otherwise time-from-empty for the first run since the @@ -462,6 +482,7 @@ impl WorkEnv { .arg(self.root().join("BENCHMARK_REPORT.md")); executor("hyperfine")(&mut command); + self.write_benchmark_diagnostics(); } /// Start a pnpr resolver server for every `pnpr@` @@ -469,14 +490,14 @@ impl WorkEnv { /// server gets an isolated `/pnpr-storage`. The returned /// guards keep the servers alive and kill them on drop; the vec is /// empty when no target is a pnpr target. - fn start_pnpr_servers(&self) -> Vec { + fn start_pnpr_servers(&self, pnpr_server_registry: &str) -> Vec { self.benchmarked_ids() .filter(|id| id.is_pnpr()) - .map(|id| self.start_pnpr_server(id)) + .map(|id| self.start_pnpr_server(id, pnpr_server_registry)) .collect() } - fn start_pnpr_server(&self, id: BenchId) -> PnprServer { + fn start_pnpr_server(&self, id: BenchId, pnpr_server_registry: &str) -> PnprServer { let bench_dir = self.bench_dir(id); let binary = bench_dir.join("pacquet").join("target").join("release").join("pnpr"); assert!( @@ -547,9 +568,19 @@ impl WorkEnv { // bare `PNPR_SERVER` is silently ignored and the install runs // *direct* instead of through pnpr — making every `pnpr@` // target a duplicate of its `pacquet@` row. + // `PACQUET_BENCHMARK_PNPR_TARBALL_REWRITE_FROM` is a source + // prefix, not the client fetch path. Some registry fixtures return + // raw upstream tarball URLs even when the server resolves through a + // latency proxy; the pacquet client rewrites this prefix to its + // configured registry, which is `client_registry` from `.npmrc`. fs::write( bench_dir.join(".pnpr-env"), - format!("export PNPM_CONFIG_PNPR_SERVER={client_url}\n"), + format!( + "export PNPM_CONFIG_PNPR_SERVER={client_url}\n\ + export {PNPR_SERVER_REGISTRY_ENV}={pnpr_server_registry}\n\ + export {PNPR_TARBALL_REWRITE_FROM_ENV}={tarball_rewrite_from}\n", + tarball_rewrite_from = self.registry, + ), ) .expect("write .pnpr-env"); @@ -557,22 +588,29 @@ impl WorkEnv { } pub fn run(&self) { - // Virtual mode points at an already-running registry, so this - // method can only wrap it with a random local proxy and bake that - // URL into the benchmark configs. Verdaccio mode is handled in - // `main`: the proxy must own the public registry port before - // registry-mock starts so packuments advertise proxied tarball URLs. - let registry_proxy = self.start_registry_proxy(); + // The client registry URL is baked into every target's config + // during `init`. Direct pacquet/pnpm and the pnpr client tarball + // materialization go through this URL. The pnpr server receives a + // separate resolve-registry URL so server-side metadata access can + // be measured independently. + let registry_proxy = self.start_client_registry_proxy(); let client_registry = registry_proxy .as_ref() .map(|proxy| format!("http://{}/", proxy.addr)) .unwrap_or_else(|| self.registry.clone()); + let pnpr_server_registry_proxy = self.start_pnpr_server_registry_proxy(); + let pnpr_server_registry = pnpr_server_registry_proxy + .as_ref() + .map(|proxy| format!("http://{}/", proxy.addr)) + .unwrap_or_else(|| self.registry_cache_populator.clone()); self.init(&client_registry); self.build(); - self.benchmark(); + self.benchmark(&pnpr_server_registry); + drop(pnpr_server_registry_proxy); drop(registry_proxy); self.verify_pnpr_targets_were_routed(); + self.verify_benchmark_diagnostics(); } /// Fail the run if a `pnpr@` target never actually went through its @@ -596,10 +634,11 @@ impl WorkEnv { } /// Start a proxy in front of an external virtual registry that emulates - /// a real link (latency + bandwidth cap) for every client, or `None` - /// when neither is requested. Spawned Verdaccio registries are proxied - /// in `main` so their advertised tarball URLs use the proxied port. - fn start_registry_proxy(&self) -> Option { + /// a real link (latency + bandwidth cap) for benchmark clients, or + /// `None` when neither is requested. Spawned Verdaccio registries are + /// proxied in `main` so their advertised tarball URLs use the proxied + /// public port. + fn start_client_registry_proxy(&self) -> Option { let rate_limit = mbps_to_bytes_per_sec(self.registry_bandwidth_mbps); if (self.registry_latency_ms == 0 && rate_limit.is_none()) || matches!(self.registry_mode, RegistryMode::Npm | RegistryMode::Verdaccio) @@ -624,15 +663,401 @@ impl WorkEnv { Some(proxy) } - /// The registry a given bench id resolves against. Every benchmarked - /// target — direct *and* pnpr — uses `client_registry` (the emulated - /// link when throttling is on; the raw registry otherwise), because a - /// request to the registry-mock should cost the same regardless of who - /// makes it. The proxy-cache populator may use a separate registry URL - /// for untimed cache priming. + /// Start a latency-only proxy for pnpr server-side registry access. + /// The client still uses [`Self::start_client_registry_proxy`], which + /// may have a higher latency and a bandwidth cap for tarball fetches. + fn start_pnpr_server_registry_proxy(&self) -> Option { + if self.pnpr_server_registry_latency_ms == 0 + || matches!(self.registry_mode, RegistryMode::Npm) + { + return None; + } + let upstream = SocketAddr::from((Ipv4Addr::LOCALHOST, self.registry_port)); + let profile = LinkProfile { + one_way: Duration::from_millis(self.pnpr_server_registry_latency_ms) / 2, + rate_limit: None, + }; + let proxy = + LatencyProxy::spawn(upstream, profile).expect("spawn pnpr server registry proxy"); + eprintln!( + "Fronting the pnpr server registry link with {}ms round-trip latency (proxy at {})", + self.pnpr_server_registry_latency_ms, proxy.addr, + ); + Some(proxy) + } + + /// The registry a given bench id resolves against from the client's + /// point of view. Direct targets use this for every registry request; + /// pnpr targets keep it as the materialization registry while their + /// server receives a separate resolve-registry override in `.pnpr-env`. + /// The proxy-cache populator may use a separate registry URL for + /// untimed cache priming. fn registry_for<'a>(&'a self, id: BenchId, client_registry: &'a str) -> &'a str { if id.is_proxy_cache_populator() { &self.registry_cache_populator } else { client_registry } } + + fn write_benchmark_diagnostics(&self) { + let diagnostics = self.collect_benchmark_diagnostics(); + let json = serde_json::to_string_pretty(&diagnostics).expect("serialize diagnostics JSON"); + fs::write(self.root().join(BENCHMARK_DIAGNOSTICS_JSON), json) + .expect("write benchmark diagnostics JSON"); + let markdown = render_diagnostics_markdown(&diagnostics, self.scenario); + fs::write(self.root().join(BENCHMARK_DIAGNOSTICS_MD), &markdown) + .expect("write benchmark diagnostics markdown"); + } + + fn collect_benchmark_diagnostics(&self) -> BenchmarkDiagnostics { + let hyperfine = read_hyperfine_report(&self.root().join("BENCHMARK_REPORT.json")); + let commands_by_name: HashMap = hyperfine + .results + .into_iter() + .map(|command| (command.name().to_string(), command)) + .collect(); + let targets = self + .benchmarked_ids() + .map(|id| { + let id = id.to_string(); + let phase_events = + read_phase_events(&self.root().join(&id).join(BENCHMARK_OUTPUT_LOG)); + let command = commands_by_name.get(&id); + BenchmarkTargetDiagnostics { + id, + hyperfine_mean_seconds: command.map(|command| command.mean), + phase_summary: summarize_phase_events(&phase_events), + phase_events, + } + }) + .collect(); + + BenchmarkDiagnostics { + targets, + pnpr_direct_ratios: collect_pnpr_direct_ratios(&commands_by_name), + } + } + + fn verify_benchmark_diagnostics(&self) { + let diagnostics = read_benchmark_diagnostics(&self.root().join(BENCHMARK_DIAGNOSTICS_JSON)); + self.verify_fresh_pnpr_cold_batch(&diagnostics); + self.verify_pnpr_direct_ratios(&diagnostics); + } + + fn verify_fresh_pnpr_cold_batch(&self, diagnostics: &BenchmarkDiagnostics) { + if self.scenario != Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore) { + return; + } + for target in diagnostics + .targets + .iter() + .filter(|target| requires_fresh_pnpr_cold_batch_metrics(&target.id)) + { + let Some(partition) = target.phase_summary.partition.as_ref() else { + panic!( + "{id} did not emit create_virtual_store_partition metrics; \ + benchmark cannot prove the pnpr fresh install exercised the cold batch", + id = target.id, + ); + }; + assert!( + non_trivial_cold_batch(partition.cold, partition.total), + "{id} did not exercise a non-trivial cold batch: warm={} cold={} skipped={} total={}", + partition.warm, + partition.cold, + partition.skipped, + partition.total, + id = target.id, + ); + } + } + + fn verify_pnpr_direct_ratios(&self, diagnostics: &BenchmarkDiagnostics) { + let Some(scenario) = self.scenario else { return }; + if !scenario.expects_pnpr_not_slower_than_direct() { + return; + } + for ratio in &diagnostics.pnpr_direct_ratios { + if ratio.revision != "HEAD" { + continue; + } + assert!( + ratio.ratio <= PNPR_DIRECT_RATIO_MAX, + "pnpr@{} was slower than pacquet@{}: ratio {:.3} > {:.3} (pnpr {:.3}s, pacquet {:.3}s)", + ratio.revision, + ratio.revision, + ratio.ratio, + PNPR_DIRECT_RATIO_MAX, + ratio.pnpr_mean_seconds, + ratio.pacquet_mean_seconds, + ); + } + } +} + +#[derive(Debug, Deserialize)] +struct HyperfineReport { + results: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct HyperfineCommand { + command: String, + #[serde(default)] + command_name: Option, + mean: f64, +} + +impl HyperfineCommand { + fn name(&self) -> &str { + self.command_name.as_deref().unwrap_or(&self.command) + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct BenchmarkDiagnostics { + targets: Vec, + pnpr_direct_ratios: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +struct BenchmarkTargetDiagnostics { + id: String, + hyperfine_mean_seconds: Option, + phase_summary: PhaseSummary, + phase_events: Vec, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +struct PhaseSummary { + partition: Option, + create_virtual_store_mean_ms: Option, + link_slots: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PartitionMetric { + warm: u64, + cold: u64, + skipped: u64, + total: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +struct LinkSlotsMetric { + batch: String, + slots: u64, + mean_ms: f64, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PhaseEvent { + phase: String, + elapsed_ms: Option, + warm: Option, + cold: Option, + skipped: Option, + total: Option, + batch: Option, + slots: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PnprDirectRatio { + revision: String, + pnpr_mean_seconds: f64, + pacquet_mean_seconds: f64, + ratio: f64, +} + +fn read_hyperfine_report(path: &Path) -> HyperfineReport { + let text = fs::read_to_string(path) + .unwrap_or_else(|err| panic!("read hyperfine report at {}: {err}", path.display())); + serde_json::from_str(&text) + .unwrap_or_else(|err| panic!("parse hyperfine report at {}: {err}", path.display())) +} + +fn read_benchmark_diagnostics(path: &Path) -> BenchmarkDiagnostics { + let text = fs::read_to_string(path) + .unwrap_or_else(|err| panic!("read benchmark diagnostics at {}: {err}", path.display())); + serde_json::from_str(&text) + .unwrap_or_else(|err| panic!("parse benchmark diagnostics at {}: {err}", path.display())) +} + +fn read_phase_events(path: &Path) -> Vec { + let Ok(text) = fs::read_to_string(path) else { return Vec::new() }; + text.lines() + .filter_map(|line| serde_json::from_str::(line).ok()) + .filter(|value| { + value.get("target").and_then(Value::as_str) == Some("pacquet::install::phase") + }) + .filter_map(|value| { + let phase = event_str(&value, "phase")?.to_string(); + Some(PhaseEvent { + phase, + elapsed_ms: event_u64(&value, "elapsed_ms"), + warm: event_u64(&value, "warm"), + cold: event_u64(&value, "cold"), + skipped: event_u64(&value, "skipped"), + total: event_u64(&value, "total"), + batch: event_str(&value, "batch").map(str::to_string), + slots: event_u64(&value, "slots"), + }) + }) + .collect() +} + +fn event_field<'a>(value: &'a Value, key: &str) -> Option<&'a Value> { + value.get(key).or_else(|| value.get("fields").and_then(|fields| fields.get(key))) +} + +fn event_str<'a>(value: &'a Value, key: &str) -> Option<&'a str> { + event_field(value, key).and_then(Value::as_str) +} + +fn event_u64(value: &Value, key: &str) -> Option { + let value = event_field(value, key)?; + value.as_u64().or_else(|| value.as_str().and_then(|text| text.parse().ok())) +} + +fn summarize_phase_events(events: &[PhaseEvent]) -> PhaseSummary { + let partition = + events.iter().rev().find(|event| event.phase == "create_virtual_store_partition").and_then( + |event| { + Some(PartitionMetric { + warm: event.warm?, + cold: event.cold?, + skipped: event.skipped.unwrap_or(0), + total: event.total?, + }) + }, + ); + let create_virtual_store_mean_ms = mean( + events + .iter() + .filter(|event| event.phase == "create_virtual_store") + .filter_map(|event| event.elapsed_ms) + .map(|elapsed| elapsed as f64), + ); + let link_slots = ["warm", "cold"] + .into_iter() + .filter_map(|batch| { + let matching: Vec<&PhaseEvent> = events + .iter() + .filter(|event| { + event.phase == "link_slots" && event.batch.as_deref() == Some(batch) + }) + .collect(); + if matching.is_empty() { + return None; + } + let slots = matching.iter().filter_map(|event| event.slots).max().unwrap_or(0); + let mean_ms = + mean(matching.iter().filter_map(|event| event.elapsed_ms).map(|ms| ms as f64))?; + Some(LinkSlotsMetric { batch: batch.to_string(), slots, mean_ms }) + }) + .collect(); + PhaseSummary { partition, create_virtual_store_mean_ms, link_slots } +} + +fn mean(values: impl Iterator) -> Option { + let mut total = 0.0; + let mut count = 0_u64; + for value in values { + total += value; + count += 1; + } + (count > 0).then_some(total / count as f64) +} + +fn collect_pnpr_direct_ratios( + commands_by_name: &HashMap, +) -> Vec { + let mut ratios: Vec = commands_by_name + .iter() + .filter_map(|(name, pnpr)| { + let revision = name.strip_prefix("pnpr@")?; + let direct = commands_by_name.get(&format!("pacquet@{revision}"))?; + Some(PnprDirectRatio { + revision: revision.to_string(), + pnpr_mean_seconds: pnpr.mean, + pacquet_mean_seconds: direct.mean, + ratio: pnpr.mean / direct.mean, + }) + }) + .collect(); + ratios.sort_by(|a, b| a.revision.cmp(&b.revision)); + ratios +} + +fn non_trivial_cold_batch(cold: u64, total: u64) -> bool { + cold > 0 && (total < 10 || cold.saturating_mul(10) >= total) +} + +fn requires_fresh_pnpr_cold_batch_metrics(target_id: &str) -> bool { + target_id == "pnpr@HEAD" +} + +fn render_diagnostics_markdown( + diagnostics: &BenchmarkDiagnostics, + scenario: Option, +) -> String { + let mut out = String::from("## Pacquet benchmark diagnostics\n\n"); + if scenario == Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore) + && contains_uninstrumented_pnpr_main(diagnostics) + { + out.push_str( + "> Note: `pnpr@main` in this no-lockfile cold-store report predates the benchmark tarball URL rewrite, so newly resolved tarballs can use raw loopback registry URLs. `pnpr@HEAD` rewrites those URLs to the client-facing registry and pays the configured registry latency/bandwidth. Treat `pnpr@HEAD / pacquet@HEAD` as the guarded comparison here, not `pnpr@HEAD` versus `pnpr@main`.\n\n", + ); + } + out.push_str( + "| Target | hyperfine mean | warm | cold | skipped | CreateVirtualStore mean | link warm mean | link cold mean |\n", + ); + out.push_str("| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |\n"); + for target in &diagnostics.targets { + let partition = target.phase_summary.partition.as_ref(); + out.push_str(&format!( + "| {} | {} | {} | {} | {} | {} | {} | {} |\n", + target.id, + format_seconds(target.hyperfine_mean_seconds), + format_u64(partition.map(|metric| metric.warm)), + format_u64(partition.map(|metric| metric.cold)), + format_u64(partition.map(|metric| metric.skipped)), + format_ms(target.phase_summary.create_virtual_store_mean_ms), + format_ms(link_slots_mean(&target.phase_summary, "warm")), + format_ms(link_slots_mean(&target.phase_summary, "cold")), + )); + } + if !diagnostics.pnpr_direct_ratios.is_empty() { + out.push_str("\n| Ratio | value |\n| --- | ---: |\n"); + for ratio in &diagnostics.pnpr_direct_ratios { + out.push_str(&format!( + "| pnpr@{} / pacquet@{} | {:.3} |\n", + ratio.revision, ratio.revision, ratio.ratio, + )); + } + } + out +} + +fn contains_uninstrumented_pnpr_main(diagnostics: &BenchmarkDiagnostics) -> bool { + diagnostics + .targets + .iter() + .any(|target| target.id == "pnpr@main" && target.phase_summary.partition.is_none()) +} + +fn link_slots_mean(summary: &PhaseSummary, batch: &str) -> Option { + summary.link_slots.iter().find(|metric| metric.batch == batch).map(|metric| metric.mean_ms) +} + +fn format_seconds(value: Option) -> String { + value.map_or_else(|| "-".to_string(), |value| format!("{value:.3}s")) +} + +fn format_ms(value: Option) -> String { + value.map_or_else(|| "-".to_string(), |value| format!("{value:.1}ms")) +} + +fn format_u64(value: Option) -> String { + value.map_or_else(|| "-".to_string(), |value| value.to_string()) } /// Whether `dir` contains at least one regular file, recursively. Used to @@ -961,13 +1386,9 @@ fn may_create_lockfile(dst_dir: &Path, scenario: BenchmarkScenario, src_dir: Opt /// client picks up `PNPM_CONFIG_PNPR_SERVER` and routes the install /// through it. The `source` fails loudly under `errexit` if the file is /// missing, rather than silently falling back to a direct install. -fn create_install_script( - dir: &Path, - scenario: BenchmarkScenario, - command: &str, - needs_pnpr_env: bool, -) { +fn create_install_script(dir: &Path, scenario: BenchmarkScenario, command: &str, id: BenchId) { let path = dir.join("install.bash"); + let capture_pacquet_metrics = id.is_pacquet_like(); eprintln!("Creating script {path:?}..."); let mut file = File::create(&path).expect("create install.bash"); @@ -975,14 +1396,34 @@ fn create_install_script( writeln!(file, "#!/bin/bash").unwrap(); writeln!(file, "set -o errexit -o nounset -o pipefail").unwrap(); writeln!(file, r#"cd "$(dirname "$0")""#).unwrap(); - if needs_pnpr_env { + if id.is_pnpr() { writeln!(file, "source ./.pnpr-env").unwrap(); } + if capture_pacquet_metrics { + // pnpm targets cannot emit pacquet phase events, so diagnostics are + // pacquet/pnpr-only. This adds a small one-sided tracing + file-I/O + // cost to pnpm comparisons, but keeps materialization regressions + // visible in the benchmark report. + writeln!(file, r#"export TRACE="${{TRACE:-pacquet::install::phase=info}}""#).unwrap(); + writeln!(file, r#"export TRACE_FORMAT="${{TRACE_FORMAT:-json}}""#).unwrap(); + writeln!( + file, + r#"printf '{{"benchmarkTarget":"{}","event":"runStart"}}\n' >> {}"#, + id, BENCHMARK_OUTPUT_LOG, + ) + .unwrap(); + } write!(file, "exec {command}").unwrap(); + if capture_pacquet_metrics { + write!(file, " --reporter ndjson").unwrap(); + } for arg in scenario.install_args() { write!(file, " {arg}").unwrap(); } + if capture_pacquet_metrics { + write!(file, " >> {BENCHMARK_OUTPUT_LOG} 2>&1").unwrap(); + } writeln!(file).unwrap(); make_file_executable(&file).expect("make the script executable"); @@ -1016,6 +1457,12 @@ impl BenchId<'_> { matches!(self, BenchId::PnprRevision(_)) } + /// Whether this bench id runs the Rust pacquet client, either + /// directly or through a pnpr server. + fn is_pacquet_like(self) -> bool { + matches!(self, BenchId::PacquetRevision(_) | BenchId::PnprRevision(_)) + } + /// Whether this is the proxy-cache populator (untimed setup that /// warms the registry cache), which always uses the real registry. fn is_proxy_cache_populator(self) -> bool { @@ -1033,3 +1480,6 @@ impl<'a> fmt::Display for BenchId<'a> { } } } + +#[cfg(test)] +mod tests; diff --git a/pacquet/tasks/integrated-benchmark/src/work_env/tests.rs b/pacquet/tasks/integrated-benchmark/src/work_env/tests.rs new file mode 100644 index 0000000000..8a15f98486 --- /dev/null +++ b/pacquet/tasks/integrated-benchmark/src/work_env/tests.rs @@ -0,0 +1,206 @@ +use super::{ + BenchmarkScenario, HyperfineCommand, PhaseEvent, collect_pnpr_direct_ratios, + non_trivial_cold_batch, read_phase_events, render_diagnostics_markdown, + requires_fresh_pnpr_cold_batch_metrics, summarize_phase_events, +}; +use std::{collections::HashMap, fs}; + +#[test] +fn phase_event_parser_reads_flat_and_nested_json_trace_fields() { + let path = std::env::temp_dir() + .join(format!("pacquet-integrated-benchmark-phase-events-{}.ndjson", std::process::id())); + fs::write( + &path, + r#"{"target":"pacquet::install::phase","phase":"create_virtual_store_partition","warm":3,"cold":7,"skipped":1,"total":11}"# + .to_string() + + "\n" + + r#"{"target":"pacquet::install::phase","fields":{"phase":"create_virtual_store","elapsed_ms":42}}"# + + "\n" + + r#"{"name":"pnpm:progress","status":"resolved"}"# + + "\n", + ) + .expect("write phase fixture"); + + let events = read_phase_events(&path); + let _ = fs::remove_file(path); + + assert_eq!(events.len(), 2); + assert_eq!(events[0].phase, "create_virtual_store_partition"); + assert_eq!(events[0].warm, Some(3)); + assert_eq!(events[0].cold, Some(7)); + assert_eq!(events[1].phase, "create_virtual_store"); + assert_eq!(events[1].elapsed_ms, Some(42)); +} + +#[test] +fn phase_summary_reports_partition_and_means() { + let events = vec![ + PhaseEvent { + phase: "create_virtual_store_partition".to_string(), + elapsed_ms: None, + warm: Some(1), + cold: Some(9), + skipped: Some(0), + total: Some(10), + batch: None, + slots: None, + }, + PhaseEvent { + phase: "create_virtual_store".to_string(), + elapsed_ms: Some(100), + warm: None, + cold: None, + skipped: None, + total: None, + batch: None, + slots: None, + }, + PhaseEvent { + phase: "create_virtual_store".to_string(), + elapsed_ms: Some(200), + warm: None, + cold: None, + skipped: None, + total: None, + batch: None, + slots: None, + }, + PhaseEvent { + phase: "link_slots".to_string(), + elapsed_ms: Some(30), + warm: None, + cold: None, + skipped: None, + total: None, + batch: Some("cold".to_string()), + slots: Some(9), + }, + ]; + + let summary = summarize_phase_events(&events); + let partition = summary.partition.expect("partition summary"); + assert_eq!(partition.warm, 1); + assert_eq!(partition.cold, 9); + assert_eq!(summary.create_virtual_store_mean_ms, Some(150.0)); + assert_eq!(summary.link_slots[0].batch, "cold"); + assert_eq!(summary.link_slots[0].mean_ms, 30.0); +} + +#[test] +fn pnpr_direct_ratios_pair_matching_revisions() { + let commands = HashMap::from([ + ( + "pacquet@HEAD".to_string(), + HyperfineCommand { + command: "pacquet@HEAD".to_string(), + command_name: None, + mean: 10.0, + }, + ), + ( + "pnpr@HEAD".to_string(), + HyperfineCommand { command: "pnpr@HEAD".to_string(), command_name: None, mean: 8.0 }, + ), + ( + "pnpr@main".to_string(), + HyperfineCommand { command: "pnpr@main".to_string(), command_name: None, mean: 9.0 }, + ), + ]); + + let ratios = collect_pnpr_direct_ratios(&commands); + + assert_eq!(ratios.len(), 1); + assert_eq!(ratios[0].revision, "HEAD"); + assert_eq!(ratios[0].ratio, 0.8); +} + +#[test] +fn cold_batch_canary_requires_non_trivial_cold_share() { + assert!(non_trivial_cold_batch(1, 1)); + assert!(non_trivial_cold_batch(10, 100)); + assert!(!non_trivial_cold_batch(0, 100)); + assert!(!non_trivial_cold_batch(9, 100)); +} + +#[test] +fn cold_batch_metrics_canary_targets_current_pnpr_revision() { + assert!(requires_fresh_pnpr_cold_batch_metrics("pnpr@HEAD")); + assert!(!requires_fresh_pnpr_cold_batch_metrics("pnpr@main")); + assert!(!requires_fresh_pnpr_cold_batch_metrics("pacquet@HEAD")); +} + +#[test] +fn diagnostics_markdown_includes_create_virtual_store_line_item() { + let markdown = render_diagnostics_markdown( + &super::BenchmarkDiagnostics { + targets: vec![super::BenchmarkTargetDiagnostics { + id: "pnpr@HEAD".to_string(), + hyperfine_mean_seconds: Some(7.5), + phase_summary: super::PhaseSummary { + partition: Some(super::PartitionMetric { + warm: 12, + cold: 88, + skipped: 0, + total: 100, + }), + create_virtual_store_mean_ms: Some(1234.0), + link_slots: vec![], + }, + phase_events: vec![], + }], + pnpr_direct_ratios: vec![], + }, + None, + ); + + assert!(markdown.contains("CreateVirtualStore mean")); + assert!(markdown.contains("1234.0ms")); + assert!(markdown.contains("| pnpr@HEAD |")); +} + +#[test] +fn diagnostics_markdown_notes_fresh_install_cold_store_tarball_baseline_shift() { + let markdown = render_diagnostics_markdown( + &super::BenchmarkDiagnostics { + targets: vec![super::BenchmarkTargetDiagnostics { + id: "pnpr@main".to_string(), + hyperfine_mean_seconds: Some(1.0), + phase_summary: super::PhaseSummary::default(), + phase_events: vec![], + }], + pnpr_direct_ratios: vec![], + }, + Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore), + ); + + assert!(markdown.contains("pnpr@main")); + assert!(markdown.contains("tarball URL rewrite")); + assert!(markdown.contains("pnpr@HEAD / pacquet@HEAD")); +} + +#[test] +fn diagnostics_markdown_omits_baseline_note_after_pnpr_main_is_instrumented() { + let markdown = render_diagnostics_markdown( + &super::BenchmarkDiagnostics { + targets: vec![super::BenchmarkTargetDiagnostics { + id: "pnpr@main".to_string(), + hyperfine_mean_seconds: Some(1.0), + phase_summary: super::PhaseSummary { + partition: Some(super::PartitionMetric { + warm: 0, + cold: 1, + skipped: 0, + total: 1, + }), + create_virtual_store_mean_ms: None, + link_slots: vec![], + }, + phase_events: vec![], + }], + pnpr_direct_ratios: vec![], + }, + Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore), + ); + + assert!(!markdown.contains("tarball URL rewrite")); +}