perf(pacquet): share virtual-store slot linking pass (#12251)

Refs: pnpm/pnpm#12250

- share warm/cold virtual-store slot linking through one parallel helper
- emit structured pacquet install phase metrics for virtual-store partition sizes and link-slot elapsed time
- generate integrated-benchmark diagnostics artifacts and use them for the fresh pnpr cold-batch and pnpr-vs-direct guardrails
- split client registry latency/bandwidth from pnpr server registry latency, while rewriting server-origin tarball URLs back to the measured client registry path
- keep the benchmark PR comment focused on scenario tables and collapsed raw JSON; diagnostics stay available as artifacts instead of inline report noise
This commit is contained in:
Zoltan Kochan
2026-06-07 23:27:15 +02:00
committed by GitHub
parent 68ddaf9198
commit 0dc770aa7b
19 changed files with 1337 additions and 160 deletions

View File

@@ -329,7 +329,7 @@ jobs:
(
echo '## Integrated-Benchmark Report (${{ runner.os }})'
echo
echo 'Each scenario has pacquet rows (direct install) and pnpr rows (the same client through the pnpr install accelerator), so pnpr@HEAD vs pacquet@HEAD is the pnpr-vs-direct ratio. Cold-store scenarios wipe the client store between runs (warm server); hot-store scenarios keep it warm. The pacquet@HEAD rows feed the pacquet Bencher testbed; the pnpr@HEAD rows feed the pnpr testbed.'
echo 'Each scenario reports direct installs and pnpr installs. Bencher consumes pacquet@HEAD and pnpr@HEAD.'
echo
echo '### Scenario: Isolated linker: fresh restore, cold cache + cold store'
echo
@@ -381,8 +381,6 @@ jobs:
echo
echo '### Scenario: Isolated linker: fresh install, cold cache + hot store'
echo
echo 'Resolution-only: cold packument cache (full re-resolve over the registry link) with a hot store (no tarball download), so this isolates pnpr offloading the client resolution to its warm server.'
echo
cat bench-work-env/BENCHMARK_REPORT_ISOLATED_FRESH_INSTALL_COLD_CACHE_HOT_STORE.md
echo
echo '<details><summary>BENCHMARK_REPORT.json</summary>'
@@ -399,9 +397,9 @@ jobs:
# Bencher's shell_hyperfine adapter accepts, one per testbed. For
# each, keep only the chosen target's result from each scenario
# and rename `.command` to the scenario name, so Bencher names the
# benchmark after the scenario. Both tools run all four scenarios,
# benchmark after the scenario. Both tools run all five scenarios,
# so the `pacquet` testbed gets `pacquet@HEAD` and the `pnpr`
# testbed gets `pnpr@HEAD`, each across the same four scenarios.
# testbed gets `pnpr@HEAD`, each across the same scenario set.
shell: bash
run: |
# Build one bencher-results file: $1 = output path, $2 = the

1
Cargo.lock generated
View File

@@ -3145,6 +3145,7 @@ dependencies = [
"reqwest 0.13.4",
"serde",
"serde-saphyr",
"serde_json",
"tokio",
"which 8.0.2",
]

View File

@@ -2,12 +2,15 @@ use crate::{State, cli_args::supported_architectures::SupportedArchitecturesArgs
use clap::{Args, ValueEnum};
use miette::Context;
use pacquet_config::NodeLinker;
use pacquet_lockfile::Lockfile;
use pacquet_lockfile::{Lockfile, LockfileResolution};
use pacquet_package_manager::{Install, TarballPrefetcher, UpdateSeedPolicy};
use pacquet_package_manifest::DependencyGroup;
use pacquet_pnpr_client::{PnprClient, PnprClientError, ResolveOptions};
use pacquet_reporter::Reporter;
const BENCHMARK_PNPR_SERVER_REGISTRY_ENV: &str = "PACQUET_BENCHMARK_PNPR_SERVER_REGISTRY";
const BENCHMARK_PNPR_TARBALL_REWRITE_FROM_ENV: &str = "PACQUET_BENCHMARK_PNPR_TARBALL_REWRITE_FROM";
/// `--node-linker` value parser. CLI mirror of
/// [`pacquet_config::NodeLinker`] so the config crate stays free
/// of `clap` as a dependency. Converted to the canonical enum at
@@ -461,6 +464,11 @@ async fn install_via_pnpr<Reporter: self::Reporter + 'static>(
.map(serde_json::to_value)
.transpose()
.map_err(|err| miette::miette!("failed to serialize overrides: {err}"))?;
let benchmark_registry_override =
PnprBenchmarkRegistryOverride::from_env(&state.config.registry);
let resolve_registry = benchmark_registry_override
.as_ref()
.map_or_else(|| state.config.registry.clone(), |registry| registry.resolve_registry());
// Send the on-disk lockfile + the full client policy so the server
// verifies the input lockfile under *our* policy before resolving;
@@ -474,7 +482,7 @@ async fn install_via_pnpr<Reporter: self::Reporter + 'static>(
dependencies,
dev_dependencies,
optional_dependencies,
registry: state.config.registry.clone(),
registry: resolve_registry,
named_registries: state.config.named_registries.clone(),
// Forward the whole credential map: the registries a graph
// touches aren't known up front (scope-routed or tarball-URL
@@ -532,13 +540,17 @@ async fn install_via_pnpr<Reporter: self::Reporter + 'static>(
Some(prefetcher) => {
client
.resolve_streaming(opts, |pkg| {
prefetcher.prefetch(pkg.id, pkg.tarball, &pkg.integrity);
let tarball = benchmark_registry_override.as_ref().map_or_else(
|| pkg.tarball.clone(),
|registry| registry.client_tarball_url(&pkg.tarball),
);
prefetcher.prefetch(pkg.id, tarball, &pkg.integrity);
})
.await
}
None => client.resolve(opts).await,
};
let outcome = match result {
let mut outcome = match result {
Ok(outcome) => outcome,
// The server rejected the input lockfile under our policy.
// Surface the reconstructed `VerifyError` so the abort + the
@@ -551,6 +563,9 @@ async fn install_via_pnpr<Reporter: self::Reporter + 'static>(
.wrap_err("resolving dependencies via the pnpr server");
}
};
if let Some(registry) = benchmark_registry_override.as_ref() {
registry.rewrite_lockfile(&mut outcome.lockfile);
}
if state.config.lockfile {
outcome
@@ -613,5 +628,112 @@ async fn install_via_pnpr<Reporter: self::Reporter + 'static>(
Ok(())
}
struct PnprBenchmarkRegistryOverride {
resolve_registry: String,
tarball_rewrite: Option<BenchmarkRegistryRewrite>,
}
impl PnprBenchmarkRegistryOverride {
/// Benchmark-only hook for `pacquet/tasks/integrated-benchmark`.
///
/// The benchmark runs release-built pacquet and pnpr binaries, so this
/// cannot be hidden behind `#[cfg(test)]`. Keep every
/// `PACQUET_BENCHMARK_*` env read in this type: normal pnpr installs
/// take one no-op branch, while benchmark runs can ask the pnpr server
/// to resolve against a server-side registry URL and then rewrite
/// server-origin tarball URLs back to the client-facing registry. The
/// rewrite is applied before saving the lockfile because the benchmark's
/// frozen materialization must use the same client-registry path that
/// direct installs pay for.
fn from_env(client_registry: &str) -> Option<Self> {
let resolve_registry = std::env::var(BENCHMARK_PNPR_SERVER_REGISTRY_ENV)
.ok()
.filter(|registry| !registry.is_empty())
.map(|registry| normalize_registry(&registry))?;
let tarball_rewrite_from = std::env::var(BENCHMARK_PNPR_TARBALL_REWRITE_FROM_ENV)
.ok()
.filter(|registry| !registry.is_empty());
let tarball_rewrite = BenchmarkRegistryRewrite::new(
[Some(resolve_registry.as_str()), tarball_rewrite_from.as_deref()]
.into_iter()
.flatten(),
client_registry,
);
Some(Self { resolve_registry, tarball_rewrite })
}
fn resolve_registry(&self) -> String {
self.resolve_registry.clone()
}
fn client_tarball_url(&self, url: &str) -> String {
self.tarball_rewrite.as_ref().map_or_else(|| url.to_string(), |rewrite| rewrite.url(url))
}
fn rewrite_lockfile(&self, lockfile: &mut Lockfile) {
let Some(rewrite) = self.tarball_rewrite.as_ref() else { return };
let Some(packages) = lockfile.packages.as_mut() else { return };
for metadata in packages.values_mut() {
rewrite_resolution_registry(&mut metadata.resolution, rewrite);
}
}
}
struct BenchmarkRegistryRewrite {
from: Vec<String>,
to: String,
}
impl BenchmarkRegistryRewrite {
pub(super) fn new<Registry, Registries>(from: Registries, to: &str) -> Option<Self>
where
Registry: AsRef<str>,
Registries: IntoIterator<Item = Registry>,
{
let to = normalize_registry(to);
let mut from_registries = Vec::new();
for registry in from {
let registry = normalize_registry(registry.as_ref());
if registry != to && !from_registries.contains(&registry) {
from_registries.push(registry);
}
}
(!from_registries.is_empty()).then_some(Self { from: from_registries, to })
}
pub(super) fn url(&self, url: &str) -> String {
self.from
.iter()
.find_map(|from| url.strip_prefix(from))
.map_or_else(|| url.to_string(), |suffix| format!("{}{}", self.to, suffix))
}
}
fn normalize_registry(registry: &str) -> String {
if registry.ends_with('/') { registry.to_string() } else { format!("{registry}/") }
}
fn rewrite_resolution_registry(
resolution: &mut LockfileResolution,
rewrite: &BenchmarkRegistryRewrite,
) {
match resolution {
LockfileResolution::Tarball(resolution) => {
resolution.tarball = rewrite.url(&resolution.tarball);
}
LockfileResolution::Binary(resolution) => {
resolution.url = rewrite.url(&resolution.url);
}
LockfileResolution::Variations(resolution) => {
for variant in &mut resolution.variants {
rewrite_resolution_registry(&mut variant.resolution, rewrite);
}
}
LockfileResolution::Directory(_)
| LockfileResolution::Git(_)
| LockfileResolution::Registry(_) => {}
}
}
#[cfg(test)]
mod tests;

View File

@@ -1,6 +1,10 @@
use super::{InstallArgs, InstallDependencyOptions, NodeLinkerArg};
use super::{
BenchmarkRegistryRewrite, InstallArgs, InstallDependencyOptions, NodeLinkerArg,
PnprBenchmarkRegistryOverride, rewrite_resolution_registry,
};
use clap::Parser;
use pacquet_config::NodeLinker;
use pacquet_lockfile::{LockfileResolution, TarballResolution};
use pacquet_package_manifest::DependencyGroup;
use pipe_trait::Pipe;
use pretty_assertions::assert_eq;
@@ -205,3 +209,85 @@ fn node_linker_arg_into_config_matches_every_variant() {
}
}
}
#[test]
fn registry_rewrite_replaces_only_the_configured_registry_prefix() {
let rewrite = BenchmarkRegistryRewrite::new(
["http://server-registry.test"],
"http://client-registry.test",
)
.expect("different registries create a rewrite");
assert_eq!(
rewrite.url("http://server-registry.test/foo/-/foo-1.0.0.tgz"),
"http://client-registry.test/foo/-/foo-1.0.0.tgz",
);
assert_eq!(
rewrite.url("http://other-registry.test/foo/-/foo-1.0.0.tgz"),
"http://other-registry.test/foo/-/foo-1.0.0.tgz",
);
}
#[test]
fn registry_rewrite_accepts_multiple_server_registry_prefixes() {
let rewrite = BenchmarkRegistryRewrite::new(
["http://server-proxy.test", "http://server-registry.test"],
"http://client-registry.test",
)
.expect("different registries create a rewrite");
assert_eq!(
rewrite.url("http://server-proxy.test/foo/-/foo-1.0.0.tgz"),
"http://client-registry.test/foo/-/foo-1.0.0.tgz",
);
assert_eq!(
rewrite.url("http://server-registry.test/foo/-/foo-1.0.0.tgz"),
"http://client-registry.test/foo/-/foo-1.0.0.tgz",
);
}
#[test]
fn registry_rewrite_is_none_for_equal_registries_after_normalization() {
assert!(
BenchmarkRegistryRewrite::new(["http://registry.test"], "http://registry.test/").is_none(),
);
}
#[test]
fn registry_rewrite_updates_explicit_tarball_resolution_urls() {
let rewrite = BenchmarkRegistryRewrite::new(
["http://server-registry.test"],
"http://client-registry.test",
)
.expect("different registries create a rewrite");
let mut resolution = LockfileResolution::Tarball(TarballResolution {
tarball: "http://server-registry.test/foo/-/foo-1.0.0.tgz".to_string(),
integrity: None,
git_hosted: None,
path: None,
});
rewrite_resolution_registry(&mut resolution, &rewrite);
let LockfileResolution::Tarball(resolution) = resolution else {
panic!("resolution stays tarball");
};
assert_eq!(resolution.tarball, "http://client-registry.test/foo/-/foo-1.0.0.tgz");
}
#[test]
fn pnpr_benchmark_override_keeps_resolve_registry_separate_from_tarball_rewrite() {
let override_ = PnprBenchmarkRegistryOverride {
resolve_registry: "http://server-proxy.test/".to_string(),
tarball_rewrite: BenchmarkRegistryRewrite::new(
["http://server-proxy.test", "http://server-registry.test"],
"http://client-registry.test",
),
};
assert_eq!(override_.resolve_registry(), "http://server-proxy.test/");
assert_eq!(
override_.client_tarball_url("http://server-registry.test/foo/-/foo-1.0.0.tgz"),
"http://client-registry.test/foo/-/foo-1.0.0.tgz",
);
}

View File

@@ -13,7 +13,7 @@ repository.workspace = true
[dependencies]
miette = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json"] }
[lints]
workspace = true

View File

@@ -8,11 +8,17 @@ pub fn enable_tracing_by_env() {
use tracing_subscriber::{fmt, prelude::*};
let layer = common_layer(&trace_var);
tracing_subscriber::registry()
.with(layer)
.with(fmt::layer().pretty().with_file(true).with_span_events(FmtSpan::CLOSE))
.init();
if std::env::var("TRACE_FORMAT").is_ok_and(|format| format == "json") {
tracing_subscriber::registry()
.with(layer)
.with(fmt::layer().json().flatten_event(true))
.init();
} else {
tracing_subscriber::registry()
.with(layer)
.with(fmt::layer().pretty().with_file(true).with_span_events(FmtSpan::CLOSE))
.init();
}
tracing::trace!("enable_tracing_by_env");
}

View File

@@ -55,6 +55,8 @@ pub struct CreateVirtualDirBySnapshot<'a> {
/// `linkAllModules` at
/// <https://github.com/pnpm/pnpm/blob/f2981a316/installing/deps-installer/src/install/link.ts#L540>.
pub skipped: &'a SkippedSnapshots,
#[cfg(test)]
pub(crate) link_concurrency_probe: Option<&'a tests::LinkConcurrencyProbe>,
}
/// Error type of [`CreateVirtualDirBySnapshot`].
@@ -88,8 +90,14 @@ impl<'a> CreateVirtualDirBySnapshot<'a> {
package_key,
snapshot,
skipped,
#[cfg(test)]
link_concurrency_probe,
} = self;
#[cfg(test)]
let _link_concurrency_guard =
link_concurrency_probe.map(tests::LinkConcurrencyProbe::enter);
let virtual_node_modules_dir = layout.slot_dir(package_key).join("node_modules");
fs::create_dir_all(&virtual_node_modules_dir).map_err(|error| {
CreateVirtualDirError::CreateNodeModulesDir {
@@ -174,4 +182,4 @@ pub(crate) fn optimistic_wire_method(method: PackageImportMethod) -> WireImportM
}
#[cfg(test)]
mod tests;
pub(crate) mod tests;

View File

@@ -7,10 +7,84 @@ use pacquet_reporter::{
use std::{
collections::HashMap,
path::Path,
sync::{Mutex, atomic::AtomicU8},
sync::{
Condvar, Mutex,
atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
},
time::Duration,
};
use tempfile::tempdir;
pub(crate) struct LinkConcurrencyProbe {
current: AtomicUsize,
max: AtomicUsize,
wait_for_overlap: bool,
wait_started: AtomicBool,
mutex: Mutex<()>,
condvar: Condvar,
}
impl LinkConcurrencyProbe {
pub(crate) fn waiting_for_overlap() -> Self {
Self { wait_for_overlap: true, ..Self::default() }
}
pub(crate) fn max_concurrent(&self) -> usize {
self.max.load(Ordering::SeqCst)
}
pub(super) fn enter(&self) -> LinkConcurrencyGuard<'_> {
let current = self.current.fetch_add(1, Ordering::SeqCst) + 1;
let mut max = self.max.load(Ordering::SeqCst);
while current > max {
match self.max.compare_exchange_weak(max, current, Ordering::SeqCst, Ordering::SeqCst) {
Ok(_) => {
self.condvar.notify_all();
break;
}
Err(next) => max = next,
}
}
if self.wait_for_overlap && current == 1 && !self.wait_started.swap(true, Ordering::SeqCst)
{
let guard = self.mutex.lock().expect("lock link-concurrency probe");
let _ = self
.condvar
.wait_timeout_while(guard, Duration::from_secs(2), |_| {
self.max.load(Ordering::SeqCst) < 2
})
.expect("wait for overlapping link");
}
LinkConcurrencyGuard { probe: self }
}
}
impl Default for LinkConcurrencyProbe {
fn default() -> Self {
Self {
current: AtomicUsize::new(0),
max: AtomicUsize::new(0),
wait_for_overlap: false,
wait_started: AtomicBool::new(false),
mutex: Mutex::new(()),
condvar: Condvar::new(),
}
}
}
pub(super) struct LinkConcurrencyGuard<'a> {
probe: &'a LinkConcurrencyProbe,
}
impl Drop for LinkConcurrencyGuard<'_> {
fn drop(&mut self) {
self.probe.current.fetch_sub(1, Ordering::SeqCst);
self.probe.condvar.notify_all();
}
}
/// `optimistic_wire_method` is the source of truth for the
/// configured-method → wire-method mapping the `imported` event
/// reports. `Auto` and `CloneOrCopy` collapse to `Clone` (the
@@ -75,6 +149,7 @@ async fn run_emits_imported_event_after_import_indexed_dir() {
package_key: &package_key,
snapshot: &snapshot,
skipped: &skipped,
link_concurrency_probe: None,
}
.run::<RecordingReporter>()
.expect("empty-cas-paths run should succeed");

View File

@@ -5,7 +5,7 @@ use crate::{
use derive_more::{Display, Error};
use futures_util::future;
use miette::Diagnostic;
use pacquet_config::{Config, NodeLinker};
use pacquet_config::{Config, NodeLinker, PackageImportMethod};
use pacquet_deps_path::get_pkg_id_with_patch_hash;
use pacquet_lockfile::{
LockfileResolution, PackageKey, PackageMetadata, PkgIdWithPatchHash, PkgNameVerPeer,
@@ -171,6 +171,9 @@ pub struct CreateVirtualStore<'a> {
/// the fresh-resolve path's [`crate::PrefetchingResolver`] (closing
/// <https://github.com/pnpm/pnpm/issues/12241>); `None` otherwise.
pub tarball_mem_cache: Option<&'a std::sync::Arc<MemCache>>,
#[cfg(test)]
pub(crate) link_concurrency_probe:
Option<&'a crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe>,
}
/// Error type of [`CreateVirtualStore`].
@@ -217,6 +220,8 @@ impl<'a> CreateVirtualStore<'a> {
node_linker,
progress_reported,
tarball_mem_cache,
#[cfg(test)]
link_concurrency_probe,
} = self;
let is_hoisted = matches!(node_linker, NodeLinker::Hoisted);
@@ -635,6 +640,16 @@ impl<'a> CreateVirtualStore<'a> {
None => cold.push((snapshot_key, snapshot)),
}
}
tracing::info!(
target: "pacquet::install::phase",
phase = "create_virtual_store_partition",
warm = warm.len(),
cold = cold.len(),
skipped = skipped_entries.len(),
total = snapshot_entries.len(),
node_linker = ?node_linker,
"phase complete",
);
// Hoisted-mode CAS index assembly. Collected here, *before*
// the warm-batch closure consumes `warm` under the
@@ -659,21 +674,6 @@ impl<'a> CreateVirtualStore<'a> {
let import_method = config.package_import_method;
if !is_hoisted {
use rayon::prelude::*;
// Driving the warm batch from inside an `async fn` means
// the `par_iter` blocks the calling tokio worker for the
// duration. On the production multi-thread runtime that's
// fine — `block_in_place` tells the runtime to migrate any
// other futures off this worker first, so async progress
// continues on the other workers — but `block_in_place`
// panics on `current_thread` runtimes, which is what
// `#[tokio::test]` defaults to. Detect the flavor and only
// call `block_in_place` when it's safe; on
// `current_thread` we fall back to a plain inline call,
// matching how the rest of the test suite already runs
// sync work directly on the test thread (Copilot review on
// <https://github.com/pnpm/pacquet/pull/292>).
//
// Hoisted skips this batch entirely: no virtual-store slot
// gets written, so there's no per-snapshot link work to
// do — the CAS paths captured below are the only output
@@ -681,42 +681,27 @@ impl<'a> CreateVirtualStore<'a> {
// `nodeLinker === 'hoisted'` guard at
// <https://github.com/pnpm/pnpm/blob/94240bc046/installing/deps-restorer/src/index.ts#L411-L425>
// which routes all link work into `linkHoistedModules`.
let warm_work = move || {
warm.par_iter().try_for_each(|(snapshot_key, snapshot, cas_paths, cache_key)| {
let package_id = snapshot_key.without_peer().to_string();
emit_warm_snapshot_progress::<Reporter>(
&package_id,
requester,
progress_reported.contains(*cache_key),
);
crate::CreateVirtualDirBySnapshot {
layout,
cas_paths: cas_paths.as_ref(),
import_method,
logged_methods,
requester,
package_id: &package_id,
package_key: snapshot_key,
snapshot,
skipped,
}
.run::<Reporter>()
.map_err(|error| {
CreateVirtualStoreError::InstallPackageBySnapshot(
InstallPackageBySnapshotError::CreateVirtualDir(error),
)
})
let warm_slots: Vec<SlotLink<'_>> = warm
.iter()
.map(|(snapshot_key, snapshot, cas_paths, cache_key)| SlotLink {
snapshot_key,
snapshot,
cas_paths: cas_paths.as_ref(),
warm_cache_key: Some(cache_key),
})
};
let on_multi_thread = tokio::runtime::Handle::try_current().is_ok_and(|handle| {
handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread
});
if on_multi_thread {
tokio::task::block_in_place(warm_work)?;
} else {
warm_work()?;
}
.collect();
link_slots_parallel::<Reporter>(LinkSlotsParallel {
batch: "warm",
slots: &warm_slots,
layout,
import_method,
logged_methods,
requester,
skipped,
progress_reported,
#[cfg(test)]
link_concurrency_probe,
})?;
} else {
// Hoisted still wants the progress reporter to fire so
// `pnpm:progress imported`-style updates render the warm
@@ -795,6 +780,8 @@ impl<'a> CreateVirtualStore<'a> {
// below so it doesn't serialize inside this
// cooperative `try_join_all` task.
defer_link: true,
#[cfg(test)]
link_concurrency_probe,
}
.run::<Reporter>()
.await;
@@ -853,43 +840,27 @@ impl<'a> CreateVirtualStore<'a> {
// concurrently. Hoisted writes no slots, so it skips this and
// consumes `cold_cas_paths` for the per-pkg CAS index below.
if !is_hoisted && !cold_cas_paths.is_empty() {
use rayon::prelude::*;
let import_method = config.package_import_method;
let link_work = || {
cold_cas_paths.par_iter().try_for_each(|(snapshot_key, snapshot, cas_paths)| {
let package_id = snapshot_key.without_peer().to_string();
crate::CreateVirtualDirBySnapshot {
layout,
cas_paths,
import_method,
logged_methods,
requester,
package_id: &package_id,
package_key: snapshot_key,
snapshot,
skipped,
}
.run::<Reporter>()
.map_err(|error| {
CreateVirtualStoreError::InstallPackageBySnapshot(
InstallPackageBySnapshotError::CreateVirtualDir(error),
)
})
let cold_slots: Vec<SlotLink<'_>> = cold_cas_paths
.iter()
.map(|(snapshot_key, snapshot, cas_paths)| SlotLink {
snapshot_key,
snapshot,
cas_paths,
warm_cache_key: None,
})
};
// `block_in_place` (the same guard the warm batch uses)
// migrates other futures off this worker so async progress
// continues; it panics on the `current_thread` runtime that
// `#[tokio::test]` defaults to, so fall back to a plain call
// there.
let on_multi_thread = tokio::runtime::Handle::try_current().is_ok_and(|handle| {
handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread
});
if on_multi_thread {
tokio::task::block_in_place(link_work)?;
} else {
link_work()?;
}
.collect();
link_slots_parallel::<Reporter>(LinkSlotsParallel {
batch: "cold",
slots: &cold_slots,
layout,
import_method,
logged_methods,
requester,
skipped,
progress_reported,
#[cfg(test)]
link_concurrency_probe,
})?;
}
// Build the per-pkg CAS index when the install is targeting
@@ -950,6 +921,103 @@ impl<'a> CreateVirtualStore<'a> {
}
}
struct SlotLink<'a> {
snapshot_key: &'a PackageKey,
snapshot: &'a SnapshotEntry,
cas_paths: &'a HashMap<String, PathBuf>,
warm_cache_key: Option<&'a str>,
}
struct LinkSlotsParallel<'a> {
batch: &'static str,
slots: &'a [SlotLink<'a>],
layout: &'a crate::VirtualStoreLayout,
import_method: PackageImportMethod,
logged_methods: &'a AtomicU8,
requester: &'a str,
skipped: &'a SkippedSnapshots,
progress_reported: &'a SharedReportedProgressKeys,
#[cfg(test)]
link_concurrency_probe:
Option<&'a crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe>,
}
fn link_slots_parallel<Reporter: self::Reporter>(
opts: LinkSlotsParallel<'_>,
) -> Result<(), CreateVirtualStoreError> {
use rayon::prelude::*;
let LinkSlotsParallel {
batch,
slots,
layout,
import_method,
logged_methods,
requester,
skipped,
progress_reported,
#[cfg(test)]
link_concurrency_probe,
} = opts;
let phase_start = std::time::Instant::now();
let link_work = || {
slots.par_iter().try_for_each(|slot| {
let package_id = slot.snapshot_key.without_peer().to_string();
if let Some(cache_key) = slot.warm_cache_key {
emit_warm_snapshot_progress::<Reporter>(
&package_id,
requester,
progress_reported.contains(cache_key),
);
}
crate::CreateVirtualDirBySnapshot {
layout,
cas_paths: slot.cas_paths,
import_method,
logged_methods,
requester,
package_id: &package_id,
package_key: slot.snapshot_key,
snapshot: slot.snapshot,
skipped,
#[cfg(test)]
link_concurrency_probe,
}
.run::<Reporter>()
.map_err(|error| {
CreateVirtualStoreError::InstallPackageBySnapshot(
InstallPackageBySnapshotError::CreateVirtualDir(error),
)
})
})
};
// Driving the link pass from inside an `async fn` means the
// `par_iter` blocks the calling tokio worker for the duration. On
// the production multi-thread runtime, `block_in_place` migrates
// other futures off this worker so async progress continues; it
// panics on the `current_thread` runtime that `#[tokio::test]`
// defaults to, so fall back to a plain call there.
let on_multi_thread = tokio::runtime::Handle::try_current()
.is_ok_and(|handle| handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread);
if on_multi_thread {
tokio::task::block_in_place(link_work)?;
} else {
link_work()?;
}
tracing::info!(
target: "pacquet::install::phase",
phase = "link_slots",
batch,
slots = slots.len(),
elapsed_ms = phase_start.elapsed().as_millis() as u64,
"phase complete",
);
Ok(())
}
/// Build the store-index cache key for a snapshot.
///
/// Returns:

View File

@@ -1,13 +1,17 @@
use super::{
CreateVirtualStoreError, InstallPackageBySnapshotError, emit_warm_snapshot_progress,
integrity_equal, snapshot_cache_key, snapshot_deps_equal,
CreateVirtualStore, CreateVirtualStoreError, InstallPackageBySnapshotError,
emit_warm_snapshot_progress, integrity_equal, snapshot_cache_key, snapshot_deps_equal,
};
use pacquet_lockfile::{
GitResolution, LockfileResolution, PackageKey, PackageMetadata, PkgName, PkgVerPeer,
RegistryResolution, SnapshotDepRef, SnapshotEntry, TarballResolution,
};
use pacquet_reporter::{LogEvent, ProgressMessage, Reporter};
use std::{collections::HashMap, sync::Mutex};
use pacquet_reporter::{LogEvent, ProgressMessage, Reporter, SilentReporter};
use std::{
collections::HashMap,
fs,
sync::{Arc, Mutex, atomic::AtomicU8},
};
fn name(text: &str) -> PkgName {
PkgName::parse(text).expect("parse pkg name")
@@ -39,6 +43,115 @@ fn snapshot_with_dep(child: &str, ref_str: &str) -> SnapshotEntry {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cold_batch_links_slots_in_parallel() {
use crate::{AllowBuildPolicy, SkippedSnapshots, VirtualStoreLayout};
use pacquet_config::{Config, NodeLinker, PackageImportMethod};
use pacquet_store_dir::StoreIndexWriter;
use pacquet_tarball::{CacheValue, MemCache, SharedReportedProgressKeys};
if rayon::current_num_threads() < 2 {
eprintln!(
"skipping cold-batch concurrency assertion with rayon_threads={}",
rayon::current_num_threads(),
);
return;
}
let root = tempfile::tempdir().expect("create temp dir");
let workspace_root = root.path().join("workspace");
fs::create_dir_all(&workspace_root).expect("create workspace root");
let modules_dir = workspace_root.join("node_modules");
let virtual_store_dir = modules_dir.join(".pacquet");
let store_dir = root.path().join("store");
let mut config = Config::new();
config.registry = "https://registry.test".to_string();
config.store_dir = store_dir.into();
config.modules_dir = modules_dir;
config.virtual_store_dir = virtual_store_dir.clone();
config.package_import_method = PackageImportMethod::Copy;
config.offline = true;
let config = config.leak();
let mut snapshots = HashMap::new();
let mut packages = HashMap::new();
let mem_cache = Arc::new(MemCache::default());
for package_name in ["cold-a", "cold-b", "cold-c", "cold-d"] {
let package_key = key(package_name, "1.0.0");
let source_dir = workspace_root.join("prefetched").join(package_name);
fs::create_dir_all(&source_dir).expect("create prefetched package dir");
let manifest_path = source_dir.join("package.json");
fs::write(&manifest_path, format!(r#"{{"name":"{package_name}","version":"1.0.0"}}"#))
.expect("write package manifest");
let index_path = source_dir.join("index.js");
fs::write(&index_path, "module.exports = true\n").expect("write package body");
let cas_paths = HashMap::from([
("package.json".to_string(), manifest_path),
("index.js".to_string(), index_path),
]);
mem_cache.insert(
format!("https://registry.test/{package_name}/-/{package_name}-1.0.0.tgz"),
Arc::new(tokio::sync::RwLock::new(CacheValue::Available(Arc::new(cas_paths)))),
);
snapshots.insert(package_key.clone(), SnapshotEntry::default());
packages.insert(package_key.without_peer(), metadata_with_integrity(DUMMY_SHA512));
}
let allow_build_policy = AllowBuildPolicy::default();
let layout = VirtualStoreLayout::new(
config,
None,
Some(&snapshots),
Some(&packages),
Some(&allow_build_policy),
);
let skipped = SkippedSnapshots::new();
let logged_methods = AtomicU8::new(0);
let progress_reported = SharedReportedProgressKeys::default();
let (store_index_writer, writer_task) = StoreIndexWriter::spawn(&config.store_dir);
let requester = workspace_root.to_string_lossy().into_owned();
let probe =
crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe::waiting_for_overlap();
CreateVirtualStore {
http_client: &Default::default(),
config,
packages: Some(&packages),
snapshots: Some(&snapshots),
current_snapshots: None,
current_packages: None,
layout: &layout,
logged_methods: &logged_methods,
requester: &requester,
store_index_writer: &store_index_writer,
allow_build_policy: &allow_build_policy,
skipped: &skipped,
workspace_root: &workspace_root,
node_linker: NodeLinker::Isolated,
progress_reported: &progress_reported,
tarball_mem_cache: Some(&mem_cache),
link_concurrency_probe: Some(&probe),
}
.run::<SilentReporter>()
.await
.expect("all-cold virtual-store creation should succeed from the mem cache");
drop(store_index_writer);
writer_task.await.expect("join store-index writer").expect("flush store-index writer");
assert!(
probe.max_concurrent() >= 2,
"cold-batch slot linking must overlap; observed max_concurrent={} with rayon_threads={}",
probe.max_concurrent(),
rayon::current_num_threads(),
);
}
const DUMMY_SHA512: &str = "sha512-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==";
/// `emit_warm_snapshot_progress` fires `resolved` then
/// `found_in_store` when no earlier fetch path already emitted the
/// package status. Both events carry the same identifiers — pnpm's

View File

@@ -590,6 +590,7 @@ where
// leaves every warm package reported as `found_in_store`.
let progress_reported = SharedReportedProgressKeys::default();
let phase_start = std::time::Instant::now();
let CreateVirtualStoreOutput {
package_manifests,
side_effects_maps_by_snapshot,
@@ -612,10 +613,18 @@ where
node_linker,
progress_reported: &progress_reported,
tarball_mem_cache,
#[cfg(test)]
link_concurrency_probe: None,
}
.run::<Reporter>()
.await
.map_err(InstallFrozenLockfileError::CreateVirtualStore)?;
tracing::info!(
target: "pacquet::install::phase",
phase = "create_virtual_store",
elapsed_ms = phase_start.elapsed().as_millis() as u64,
"phase complete",
);
// Fold fetch-failure swallows into the live skip set so
// downstream consumers (`SymlinkDirectDependencies`,

View File

@@ -124,6 +124,9 @@ pub struct InstallPackageBySnapshot<'a> {
/// once its tarball is in the store. No effect under
/// [`NodeLinker::Hoisted`], which never writes virtual-store slots.
pub defer_link: bool,
#[cfg(test)]
pub(crate) link_concurrency_probe:
Option<&'a crate::create_virtual_dir_by_snapshot::tests::LinkConcurrencyProbe>,
}
/// Error type of [`InstallPackageBySnapshot`].
@@ -259,6 +262,8 @@ impl<'a> InstallPackageBySnapshot<'a> {
workspace_root,
node_linker,
defer_link,
#[cfg(test)]
link_concurrency_probe,
} = self;
// TODO: skip when already exists in store?
@@ -559,6 +564,8 @@ impl<'a> InstallPackageBySnapshot<'a> {
package_key,
snapshot,
skipped,
#[cfg(test)]
link_concurrency_probe,
}
.run::<Reporter>()
.map_err(InstallPackageBySnapshotError::CreateVirtualDir)?;

View File

@@ -402,6 +402,7 @@ async fn cold_batch_reuses_in_flight_prefetch_from_mem_cache() {
// back directly.
node_linker: pacquet_config::NodeLinker::Hoisted,
defer_link: false,
link_concurrency_probe: None,
}
.run::<pacquet_reporter::SilentReporter>()
.await
@@ -471,6 +472,7 @@ async fn without_mem_cache_skips_coordination_and_downloads() {
workspace_root: store_tmp.path(),
node_linker: pacquet_config::NodeLinker::Hoisted,
defer_link: false,
link_concurrency_probe: None,
}
.run::<pacquet_reporter::SilentReporter>()
.await
@@ -540,6 +542,7 @@ async fn cold_batch_falls_back_when_prefetch_failed() {
workspace_root: store_tmp.path(),
node_linker: pacquet_config::NodeLinker::Hoisted,
defer_link: false,
link_concurrency_probe: None,
}
.run::<pacquet_reporter::SilentReporter>()
.await

View File

@@ -1309,6 +1309,8 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
// batch through the mem cache makes it reuse the in-flight
// download instead.
tarball_mem_cache: Some(&tarball_mem_cache),
#[cfg(test)]
link_concurrency_probe: None,
}
.run::<Reporter>()
.await

View File

@@ -24,6 +24,7 @@ os_display = { workspace = true }
pipe-trait = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde-saphyr = { workspace = true }
tokio = { workspace = true }
which = { workspace = true }

View File

@@ -49,28 +49,36 @@ pub struct CliArgs {
#[clap(long, default_value_t = 0)]
pub pnpr_latency_ms: u64,
/// Round-trip latency, in milliseconds, to inject on the link to the
/// *registry*, applied to **every** client that touches it: direct
/// `pacquet@<rev>` / `pnpm@<rev>` installs, the `pnpr@<rev>` server's
/// resolution, and the pnpr client's tarball fetches. A request to the
/// registry-mock should cost the same regardless of who makes it, so
/// the registry-mock is uniformly as remote as the real one; pnpr's
/// advantage then shows up as fewer client round trips (one
/// `--pnpr-latency-ms` hop to the server) rather than a faster
/// backend. `0` disables injection; ignored with `--registry=npm`
/// (already remote).
/// Round-trip latency, in milliseconds, to inject on the client link
/// to the registry. Direct `pacquet@<rev>` / `pnpm@<rev>` installs and
/// pnpr clients' tarball fetches use this link. The pnpr server's own
/// resolution uses `--pnpr-server-registry-latency-ms` instead, so a
/// benchmark can model a co-located warm server resolving quickly while
/// remote clients still fetch tarballs across the slower registry link.
/// `0` disables injection; ignored with `--registry=npm` (already
/// remote).
#[clap(long, default_value_t = 0)]
pub registry_latency_ms: u64,
/// Round-trip latency, in milliseconds, to inject between each
/// `pnpr@<rev>` server and the registry it uses for resolution. Keep
/// this low (often `0`) when modeling production, where pnpr sits near
/// its registry/cache backend; otherwise the server resolves too slowly
/// and the benchmark under-represents the client's cold materialization
/// batch. Direct installs and pnpr clients' tarball fetches are
/// unaffected; they use `--registry-latency-ms`.
#[clap(long, default_value_t = 0)]
pub pnpr_server_registry_latency_ms: u64,
/// Download-bandwidth cap, in **megabits per second**, on the link to
/// the registry, applied to every client (direct installs and the pnpr
/// server + client alike), so tarball fetches take the time they would
/// over a real connection instead of being free on loopback. Loopback
/// serves at ~GB/s; the public npm registry measured ~190 Mbit/s
/// (~24 MB/s) peak on a fast link, and typical home/CI links are
/// 50200 Mbit/s. Pairs with `--registry-latency-ms` (latency
/// dominates small packages, bandwidth dominates large ones). `0`
/// leaves the registry at loopback speed; ignored with
/// the client-facing registry, applied to direct installs and pnpr
/// clients' tarball fetches, so tarballs take the time they would over
/// a real connection instead of being free on loopback. Loopback serves
/// at ~GB/s; the public npm registry measured ~190 Mbit/s (~24 MB/s)
/// peak on a fast link, and typical home/CI links are 50200 Mbit/s.
/// Pairs with `--registry-latency-ms` (latency dominates small
/// packages, bandwidth dominates large ones). `0` leaves the registry
/// at loopback speed; ignored with
/// `--registry=npm` (already remote).
#[clap(long, default_value_t = 0.0)]
pub registry_bandwidth_mbps: f64,
@@ -164,7 +172,7 @@ pub enum RegistryMode {
clippy::enum_variant_names,
reason = "the shared `Isolated` prefix mirrors the scenario slug and keeps the linker grouping legible; it stops firing once other linker buckets land"
)]
#[derive(Debug, Clone, Copy, ValueEnum)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum BenchmarkScenario {
/// No lockfile, cold cache + cold store. Mirrors `pnpm install` with nothing on disk.
#[value(name = "isolated-linker.fresh-install.cold-cache.cold-store")]
@@ -303,6 +311,18 @@ impl BenchmarkScenario {
pub fn enables_gvs(self) -> bool {
matches!(self, BenchmarkScenario::GvsFreshRestoreHotCacheHotStore)
}
/// Scenarios where pnpr's server-side resolution is expected to beat
/// or match a direct pacquet install. Hot-cache scenarios deliberately
/// skip this canary because there is little resolution work left to
/// offload and the remote pnpr hop can dominate.
pub fn expects_pnpr_not_slower_than_direct(self) -> bool {
matches!(
self,
BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore
| BenchmarkScenario::IsolatedFreshInstallColdCacheHotStore,
)
}
}
#[derive(Debug, Args)]

View File

@@ -31,6 +31,7 @@ async fn main() {
with_pnpm,
pnpr_latency_ms,
registry_latency_ms,
pnpr_server_registry_latency_ms,
registry_bandwidth_mbps,
reuse_prebuilt_binaries,
build_only,
@@ -166,8 +167,9 @@ async fn main() {
fixture_dir,
pnpr_latency_ms,
registry_latency_ms,
pnpr_server_registry_latency_ms,
registry_bandwidth_mbps,
registry_port,
registry_port: spawned_registry_port,
reuse_prebuilt_binaries,
};
if build_only {

View File

@@ -12,8 +12,11 @@ use os_display::Quotable;
use pacquet_fs::file_mode::make_file_executable;
use pacquet_registry_mock::pick_unused_port;
use pipe_trait::Pipe;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
borrow::Cow,
collections::HashMap,
fmt,
fs::{self, File},
io::Write,
@@ -24,6 +27,13 @@ use std::{
time::Duration,
};
const BENCHMARK_OUTPUT_LOG: &str = "BENCHMARK_OUTPUT.ndjson";
const BENCHMARK_DIAGNOSTICS_JSON: &str = "BENCHMARK_DIAGNOSTICS.json";
const BENCHMARK_DIAGNOSTICS_MD: &str = "BENCHMARK_DIAGNOSTICS.md";
const PNPR_DIRECT_RATIO_MAX: f64 = 1.05;
const PNPR_SERVER_REGISTRY_ENV: &str = "PACQUET_BENCHMARK_PNPR_SERVER_REGISTRY";
const PNPR_TARBALL_REWRITE_FROM_ENV: &str = "PACQUET_BENCHMARK_PNPR_TARBALL_REWRITE_FROM";
#[derive(Debug)]
pub struct WorkEnv {
pub root: PathBuf,
@@ -47,6 +57,12 @@ pub struct WorkEnv {
/// `0` leaves the registry on loopback. Ignored in `--registry=npm`
/// mode (already remote).
pub registry_latency_ms: u64,
/// Round-trip latency (ms) between a pnpr server and the registry it
/// resolves against. Separate from `registry_latency_ms` so the
/// benchmark can model a co-located server with fast metadata access
/// while clients still fetch tarballs over a remote link. Ignored in
/// `--registry=npm` mode.
pub pnpr_server_registry_latency_ms: u64,
/// Download-bandwidth cap (megabits/sec) on the link to the registry,
/// applied to every client, so tarball fetches cost real time instead
/// of being free on loopback. `0` leaves the registry at loopback
@@ -201,7 +217,7 @@ impl WorkEnv {
fs::create_dir_all(&dir).expect("create directory for the revision");
create_package_json(&dir, self.fixture_dir.as_deref());
create_pnpm_workspace(&dir, self.fixture_dir.as_deref(), registry, scenario);
create_install_script(&dir, scenario, &WorkEnv::install_command(id), id.is_pnpr());
create_install_script(&dir, scenario, &WorkEnv::install_command(id), id);
create_npmrc(&dir, registry, scenario);
may_create_lockfile(&dir, scenario, self.fixture_dir.as_deref());
save_pristine_copies(&dir);
@@ -378,7 +394,7 @@ impl WorkEnv {
.pipe(executor("pnpm run compile-only"));
}
fn benchmark(&self) {
fn benchmark(&self, pnpr_server_registry: &str) {
let scenario = self.scenario.expect("scenario set when benchmark() is reached");
// Pre-benchmark wipe of `node_modules`, `store-dir`, and
@@ -406,6 +422,10 @@ impl WorkEnv {
fs::remove_dir_all(&path).expect("pre-benchmark wipe");
}
}
let output_log = dir.join(BENCHMARK_OUTPUT_LOG);
if output_log.exists() {
fs::remove_file(output_log).expect("pre-benchmark metrics-log wipe");
}
}
// Start a pnpr server per `pnpr@<rev>` target and keep the guards
@@ -413,7 +433,7 @@ impl WorkEnv {
// the end of this method. Empty (no-op) when there are no pnpr
// targets. Spawned before the GVS pre-warm below so a pnpr target
// would have its server up if a scenario ever combines the two.
let _pnpr_servers = self.start_pnpr_servers();
let _pnpr_servers = self.start_pnpr_servers(pnpr_server_registry);
// For GVS-warm we need a pre-warm pass: hyperfine's `--warmup`
// would otherwise time-from-empty for the first run since the
@@ -462,6 +482,7 @@ impl WorkEnv {
.arg(self.root().join("BENCHMARK_REPORT.md"));
executor("hyperfine")(&mut command);
self.write_benchmark_diagnostics();
}
/// Start a pnpr resolver server for every `pnpr@<rev>`
@@ -469,14 +490,14 @@ impl WorkEnv {
/// server gets an isolated `<bench_dir>/pnpr-storage`. The returned
/// guards keep the servers alive and kill them on drop; the vec is
/// empty when no target is a pnpr target.
fn start_pnpr_servers(&self) -> Vec<PnprServer> {
fn start_pnpr_servers(&self, pnpr_server_registry: &str) -> Vec<PnprServer> {
self.benchmarked_ids()
.filter(|id| id.is_pnpr())
.map(|id| self.start_pnpr_server(id))
.map(|id| self.start_pnpr_server(id, pnpr_server_registry))
.collect()
}
fn start_pnpr_server(&self, id: BenchId) -> PnprServer {
fn start_pnpr_server(&self, id: BenchId, pnpr_server_registry: &str) -> PnprServer {
let bench_dir = self.bench_dir(id);
let binary = bench_dir.join("pacquet").join("target").join("release").join("pnpr");
assert!(
@@ -547,9 +568,19 @@ impl WorkEnv {
// bare `PNPR_SERVER` is silently ignored and the install runs
// *direct* instead of through pnpr — making every `pnpr@<rev>`
// target a duplicate of its `pacquet@<rev>` row.
// `PACQUET_BENCHMARK_PNPR_TARBALL_REWRITE_FROM` is a source
// prefix, not the client fetch path. Some registry fixtures return
// raw upstream tarball URLs even when the server resolves through a
// latency proxy; the pacquet client rewrites this prefix to its
// configured registry, which is `client_registry` from `.npmrc`.
fs::write(
bench_dir.join(".pnpr-env"),
format!("export PNPM_CONFIG_PNPR_SERVER={client_url}\n"),
format!(
"export PNPM_CONFIG_PNPR_SERVER={client_url}\n\
export {PNPR_SERVER_REGISTRY_ENV}={pnpr_server_registry}\n\
export {PNPR_TARBALL_REWRITE_FROM_ENV}={tarball_rewrite_from}\n",
tarball_rewrite_from = self.registry,
),
)
.expect("write .pnpr-env");
@@ -557,22 +588,29 @@ impl WorkEnv {
}
pub fn run(&self) {
// Virtual mode points at an already-running registry, so this
// method can only wrap it with a random local proxy and bake that
// URL into the benchmark configs. Verdaccio mode is handled in
// `main`: the proxy must own the public registry port before
// registry-mock starts so packuments advertise proxied tarball URLs.
let registry_proxy = self.start_registry_proxy();
// The client registry URL is baked into every target's config
// during `init`. Direct pacquet/pnpm and the pnpr client tarball
// materialization go through this URL. The pnpr server receives a
// separate resolve-registry URL so server-side metadata access can
// be measured independently.
let registry_proxy = self.start_client_registry_proxy();
let client_registry = registry_proxy
.as_ref()
.map(|proxy| format!("http://{}/", proxy.addr))
.unwrap_or_else(|| self.registry.clone());
let pnpr_server_registry_proxy = self.start_pnpr_server_registry_proxy();
let pnpr_server_registry = pnpr_server_registry_proxy
.as_ref()
.map(|proxy| format!("http://{}/", proxy.addr))
.unwrap_or_else(|| self.registry_cache_populator.clone());
self.init(&client_registry);
self.build();
self.benchmark();
self.benchmark(&pnpr_server_registry);
drop(pnpr_server_registry_proxy);
drop(registry_proxy);
self.verify_pnpr_targets_were_routed();
self.verify_benchmark_diagnostics();
}
/// Fail the run if a `pnpr@<rev>` target never actually went through its
@@ -596,10 +634,11 @@ impl WorkEnv {
}
/// Start a proxy in front of an external virtual registry that emulates
/// a real link (latency + bandwidth cap) for every client, or `None`
/// when neither is requested. Spawned Verdaccio registries are proxied
/// in `main` so their advertised tarball URLs use the proxied port.
fn start_registry_proxy(&self) -> Option<LatencyProxy> {
/// a real link (latency + bandwidth cap) for benchmark clients, or
/// `None` when neither is requested. Spawned Verdaccio registries are
/// proxied in `main` so their advertised tarball URLs use the proxied
/// public port.
fn start_client_registry_proxy(&self) -> Option<LatencyProxy> {
let rate_limit = mbps_to_bytes_per_sec(self.registry_bandwidth_mbps);
if (self.registry_latency_ms == 0 && rate_limit.is_none())
|| matches!(self.registry_mode, RegistryMode::Npm | RegistryMode::Verdaccio)
@@ -624,15 +663,401 @@ impl WorkEnv {
Some(proxy)
}
/// The registry a given bench id resolves against. Every benchmarked
/// target — direct *and* pnpr — uses `client_registry` (the emulated
/// link when throttling is on; the raw registry otherwise), because a
/// request to the registry-mock should cost the same regardless of who
/// makes it. The proxy-cache populator may use a separate registry URL
/// for untimed cache priming.
/// Start a latency-only proxy for pnpr server-side registry access.
/// The client still uses [`Self::start_client_registry_proxy`], which
/// may have a higher latency and a bandwidth cap for tarball fetches.
fn start_pnpr_server_registry_proxy(&self) -> Option<LatencyProxy> {
if self.pnpr_server_registry_latency_ms == 0
|| matches!(self.registry_mode, RegistryMode::Npm)
{
return None;
}
let upstream = SocketAddr::from((Ipv4Addr::LOCALHOST, self.registry_port));
let profile = LinkProfile {
one_way: Duration::from_millis(self.pnpr_server_registry_latency_ms) / 2,
rate_limit: None,
};
let proxy =
LatencyProxy::spawn(upstream, profile).expect("spawn pnpr server registry proxy");
eprintln!(
"Fronting the pnpr server registry link with {}ms round-trip latency (proxy at {})",
self.pnpr_server_registry_latency_ms, proxy.addr,
);
Some(proxy)
}
/// The registry a given bench id resolves against from the client's
/// point of view. Direct targets use this for every registry request;
/// pnpr targets keep it as the materialization registry while their
/// server receives a separate resolve-registry override in `.pnpr-env`.
/// The proxy-cache populator may use a separate registry URL for
/// untimed cache priming.
fn registry_for<'a>(&'a self, id: BenchId, client_registry: &'a str) -> &'a str {
if id.is_proxy_cache_populator() { &self.registry_cache_populator } else { client_registry }
}
fn write_benchmark_diagnostics(&self) {
let diagnostics = self.collect_benchmark_diagnostics();
let json = serde_json::to_string_pretty(&diagnostics).expect("serialize diagnostics JSON");
fs::write(self.root().join(BENCHMARK_DIAGNOSTICS_JSON), json)
.expect("write benchmark diagnostics JSON");
let markdown = render_diagnostics_markdown(&diagnostics, self.scenario);
fs::write(self.root().join(BENCHMARK_DIAGNOSTICS_MD), &markdown)
.expect("write benchmark diagnostics markdown");
}
fn collect_benchmark_diagnostics(&self) -> BenchmarkDiagnostics {
let hyperfine = read_hyperfine_report(&self.root().join("BENCHMARK_REPORT.json"));
let commands_by_name: HashMap<String, HyperfineCommand> = hyperfine
.results
.into_iter()
.map(|command| (command.name().to_string(), command))
.collect();
let targets = self
.benchmarked_ids()
.map(|id| {
let id = id.to_string();
let phase_events =
read_phase_events(&self.root().join(&id).join(BENCHMARK_OUTPUT_LOG));
let command = commands_by_name.get(&id);
BenchmarkTargetDiagnostics {
id,
hyperfine_mean_seconds: command.map(|command| command.mean),
phase_summary: summarize_phase_events(&phase_events),
phase_events,
}
})
.collect();
BenchmarkDiagnostics {
targets,
pnpr_direct_ratios: collect_pnpr_direct_ratios(&commands_by_name),
}
}
fn verify_benchmark_diagnostics(&self) {
let diagnostics = read_benchmark_diagnostics(&self.root().join(BENCHMARK_DIAGNOSTICS_JSON));
self.verify_fresh_pnpr_cold_batch(&diagnostics);
self.verify_pnpr_direct_ratios(&diagnostics);
}
fn verify_fresh_pnpr_cold_batch(&self, diagnostics: &BenchmarkDiagnostics) {
if self.scenario != Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore) {
return;
}
for target in diagnostics
.targets
.iter()
.filter(|target| requires_fresh_pnpr_cold_batch_metrics(&target.id))
{
let Some(partition) = target.phase_summary.partition.as_ref() else {
panic!(
"{id} did not emit create_virtual_store_partition metrics; \
benchmark cannot prove the pnpr fresh install exercised the cold batch",
id = target.id,
);
};
assert!(
non_trivial_cold_batch(partition.cold, partition.total),
"{id} did not exercise a non-trivial cold batch: warm={} cold={} skipped={} total={}",
partition.warm,
partition.cold,
partition.skipped,
partition.total,
id = target.id,
);
}
}
fn verify_pnpr_direct_ratios(&self, diagnostics: &BenchmarkDiagnostics) {
let Some(scenario) = self.scenario else { return };
if !scenario.expects_pnpr_not_slower_than_direct() {
return;
}
for ratio in &diagnostics.pnpr_direct_ratios {
if ratio.revision != "HEAD" {
continue;
}
assert!(
ratio.ratio <= PNPR_DIRECT_RATIO_MAX,
"pnpr@{} was slower than pacquet@{}: ratio {:.3} > {:.3} (pnpr {:.3}s, pacquet {:.3}s)",
ratio.revision,
ratio.revision,
ratio.ratio,
PNPR_DIRECT_RATIO_MAX,
ratio.pnpr_mean_seconds,
ratio.pacquet_mean_seconds,
);
}
}
}
#[derive(Debug, Deserialize)]
struct HyperfineReport {
results: Vec<HyperfineCommand>,
}
#[derive(Debug, Clone, Deserialize)]
struct HyperfineCommand {
command: String,
#[serde(default)]
command_name: Option<String>,
mean: f64,
}
impl HyperfineCommand {
fn name(&self) -> &str {
self.command_name.as_deref().unwrap_or(&self.command)
}
}
#[derive(Debug, Serialize, Deserialize)]
struct BenchmarkDiagnostics {
targets: Vec<BenchmarkTargetDiagnostics>,
pnpr_direct_ratios: Vec<PnprDirectRatio>,
}
#[derive(Debug, Serialize, Deserialize)]
struct BenchmarkTargetDiagnostics {
id: String,
hyperfine_mean_seconds: Option<f64>,
phase_summary: PhaseSummary,
phase_events: Vec<PhaseEvent>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
struct PhaseSummary {
partition: Option<PartitionMetric>,
create_virtual_store_mean_ms: Option<f64>,
link_slots: Vec<LinkSlotsMetric>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PartitionMetric {
warm: u64,
cold: u64,
skipped: u64,
total: u64,
}
#[derive(Debug, Serialize, Deserialize)]
struct LinkSlotsMetric {
batch: String,
slots: u64,
mean_ms: f64,
}
#[derive(Debug, Serialize, Deserialize)]
struct PhaseEvent {
phase: String,
elapsed_ms: Option<u64>,
warm: Option<u64>,
cold: Option<u64>,
skipped: Option<u64>,
total: Option<u64>,
batch: Option<String>,
slots: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
struct PnprDirectRatio {
revision: String,
pnpr_mean_seconds: f64,
pacquet_mean_seconds: f64,
ratio: f64,
}
fn read_hyperfine_report(path: &Path) -> HyperfineReport {
let text = fs::read_to_string(path)
.unwrap_or_else(|err| panic!("read hyperfine report at {}: {err}", path.display()));
serde_json::from_str(&text)
.unwrap_or_else(|err| panic!("parse hyperfine report at {}: {err}", path.display()))
}
fn read_benchmark_diagnostics(path: &Path) -> BenchmarkDiagnostics {
let text = fs::read_to_string(path)
.unwrap_or_else(|err| panic!("read benchmark diagnostics at {}: {err}", path.display()));
serde_json::from_str(&text)
.unwrap_or_else(|err| panic!("parse benchmark diagnostics at {}: {err}", path.display()))
}
fn read_phase_events(path: &Path) -> Vec<PhaseEvent> {
let Ok(text) = fs::read_to_string(path) else { return Vec::new() };
text.lines()
.filter_map(|line| serde_json::from_str::<Value>(line).ok())
.filter(|value| {
value.get("target").and_then(Value::as_str) == Some("pacquet::install::phase")
})
.filter_map(|value| {
let phase = event_str(&value, "phase")?.to_string();
Some(PhaseEvent {
phase,
elapsed_ms: event_u64(&value, "elapsed_ms"),
warm: event_u64(&value, "warm"),
cold: event_u64(&value, "cold"),
skipped: event_u64(&value, "skipped"),
total: event_u64(&value, "total"),
batch: event_str(&value, "batch").map(str::to_string),
slots: event_u64(&value, "slots"),
})
})
.collect()
}
fn event_field<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
value.get(key).or_else(|| value.get("fields").and_then(|fields| fields.get(key)))
}
fn event_str<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
event_field(value, key).and_then(Value::as_str)
}
fn event_u64(value: &Value, key: &str) -> Option<u64> {
let value = event_field(value, key)?;
value.as_u64().or_else(|| value.as_str().and_then(|text| text.parse().ok()))
}
fn summarize_phase_events(events: &[PhaseEvent]) -> PhaseSummary {
let partition =
events.iter().rev().find(|event| event.phase == "create_virtual_store_partition").and_then(
|event| {
Some(PartitionMetric {
warm: event.warm?,
cold: event.cold?,
skipped: event.skipped.unwrap_or(0),
total: event.total?,
})
},
);
let create_virtual_store_mean_ms = mean(
events
.iter()
.filter(|event| event.phase == "create_virtual_store")
.filter_map(|event| event.elapsed_ms)
.map(|elapsed| elapsed as f64),
);
let link_slots = ["warm", "cold"]
.into_iter()
.filter_map(|batch| {
let matching: Vec<&PhaseEvent> = events
.iter()
.filter(|event| {
event.phase == "link_slots" && event.batch.as_deref() == Some(batch)
})
.collect();
if matching.is_empty() {
return None;
}
let slots = matching.iter().filter_map(|event| event.slots).max().unwrap_or(0);
let mean_ms =
mean(matching.iter().filter_map(|event| event.elapsed_ms).map(|ms| ms as f64))?;
Some(LinkSlotsMetric { batch: batch.to_string(), slots, mean_ms })
})
.collect();
PhaseSummary { partition, create_virtual_store_mean_ms, link_slots }
}
fn mean(values: impl Iterator<Item = f64>) -> Option<f64> {
let mut total = 0.0;
let mut count = 0_u64;
for value in values {
total += value;
count += 1;
}
(count > 0).then_some(total / count as f64)
}
fn collect_pnpr_direct_ratios(
commands_by_name: &HashMap<String, HyperfineCommand>,
) -> Vec<PnprDirectRatio> {
let mut ratios: Vec<PnprDirectRatio> = commands_by_name
.iter()
.filter_map(|(name, pnpr)| {
let revision = name.strip_prefix("pnpr@")?;
let direct = commands_by_name.get(&format!("pacquet@{revision}"))?;
Some(PnprDirectRatio {
revision: revision.to_string(),
pnpr_mean_seconds: pnpr.mean,
pacquet_mean_seconds: direct.mean,
ratio: pnpr.mean / direct.mean,
})
})
.collect();
ratios.sort_by(|a, b| a.revision.cmp(&b.revision));
ratios
}
fn non_trivial_cold_batch(cold: u64, total: u64) -> bool {
cold > 0 && (total < 10 || cold.saturating_mul(10) >= total)
}
fn requires_fresh_pnpr_cold_batch_metrics(target_id: &str) -> bool {
target_id == "pnpr@HEAD"
}
fn render_diagnostics_markdown(
diagnostics: &BenchmarkDiagnostics,
scenario: Option<BenchmarkScenario>,
) -> String {
let mut out = String::from("## Pacquet benchmark diagnostics\n\n");
if scenario == Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore)
&& contains_uninstrumented_pnpr_main(diagnostics)
{
out.push_str(
"> Note: `pnpr@main` in this no-lockfile cold-store report predates the benchmark tarball URL rewrite, so newly resolved tarballs can use raw loopback registry URLs. `pnpr@HEAD` rewrites those URLs to the client-facing registry and pays the configured registry latency/bandwidth. Treat `pnpr@HEAD / pacquet@HEAD` as the guarded comparison here, not `pnpr@HEAD` versus `pnpr@main`.\n\n",
);
}
out.push_str(
"| Target | hyperfine mean | warm | cold | skipped | CreateVirtualStore mean | link warm mean | link cold mean |\n",
);
out.push_str("| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |\n");
for target in &diagnostics.targets {
let partition = target.phase_summary.partition.as_ref();
out.push_str(&format!(
"| {} | {} | {} | {} | {} | {} | {} | {} |\n",
target.id,
format_seconds(target.hyperfine_mean_seconds),
format_u64(partition.map(|metric| metric.warm)),
format_u64(partition.map(|metric| metric.cold)),
format_u64(partition.map(|metric| metric.skipped)),
format_ms(target.phase_summary.create_virtual_store_mean_ms),
format_ms(link_slots_mean(&target.phase_summary, "warm")),
format_ms(link_slots_mean(&target.phase_summary, "cold")),
));
}
if !diagnostics.pnpr_direct_ratios.is_empty() {
out.push_str("\n| Ratio | value |\n| --- | ---: |\n");
for ratio in &diagnostics.pnpr_direct_ratios {
out.push_str(&format!(
"| pnpr@{} / pacquet@{} | {:.3} |\n",
ratio.revision, ratio.revision, ratio.ratio,
));
}
}
out
}
fn contains_uninstrumented_pnpr_main(diagnostics: &BenchmarkDiagnostics) -> bool {
diagnostics
.targets
.iter()
.any(|target| target.id == "pnpr@main" && target.phase_summary.partition.is_none())
}
fn link_slots_mean(summary: &PhaseSummary, batch: &str) -> Option<f64> {
summary.link_slots.iter().find(|metric| metric.batch == batch).map(|metric| metric.mean_ms)
}
fn format_seconds(value: Option<f64>) -> String {
value.map_or_else(|| "-".to_string(), |value| format!("{value:.3}s"))
}
fn format_ms(value: Option<f64>) -> String {
value.map_or_else(|| "-".to_string(), |value| format!("{value:.1}ms"))
}
fn format_u64(value: Option<u64>) -> String {
value.map_or_else(|| "-".to_string(), |value| value.to_string())
}
/// Whether `dir` contains at least one regular file, recursively. Used to
@@ -961,13 +1386,9 @@ fn may_create_lockfile(dst_dir: &Path, scenario: BenchmarkScenario, src_dir: Opt
/// client picks up `PNPM_CONFIG_PNPR_SERVER` and routes the install
/// through it. The `source` fails loudly under `errexit` if the file is
/// missing, rather than silently falling back to a direct install.
fn create_install_script(
dir: &Path,
scenario: BenchmarkScenario,
command: &str,
needs_pnpr_env: bool,
) {
fn create_install_script(dir: &Path, scenario: BenchmarkScenario, command: &str, id: BenchId) {
let path = dir.join("install.bash");
let capture_pacquet_metrics = id.is_pacquet_like();
eprintln!("Creating script {path:?}...");
let mut file = File::create(&path).expect("create install.bash");
@@ -975,14 +1396,34 @@ fn create_install_script(
writeln!(file, "#!/bin/bash").unwrap();
writeln!(file, "set -o errexit -o nounset -o pipefail").unwrap();
writeln!(file, r#"cd "$(dirname "$0")""#).unwrap();
if needs_pnpr_env {
if id.is_pnpr() {
writeln!(file, "source ./.pnpr-env").unwrap();
}
if capture_pacquet_metrics {
// pnpm targets cannot emit pacquet phase events, so diagnostics are
// pacquet/pnpr-only. This adds a small one-sided tracing + file-I/O
// cost to pnpm comparisons, but keeps materialization regressions
// visible in the benchmark report.
writeln!(file, r#"export TRACE="${{TRACE:-pacquet::install::phase=info}}""#).unwrap();
writeln!(file, r#"export TRACE_FORMAT="${{TRACE_FORMAT:-json}}""#).unwrap();
writeln!(
file,
r#"printf '{{"benchmarkTarget":"{}","event":"runStart"}}\n' >> {}"#,
id, BENCHMARK_OUTPUT_LOG,
)
.unwrap();
}
write!(file, "exec {command}").unwrap();
if capture_pacquet_metrics {
write!(file, " --reporter ndjson").unwrap();
}
for arg in scenario.install_args() {
write!(file, " {arg}").unwrap();
}
if capture_pacquet_metrics {
write!(file, " >> {BENCHMARK_OUTPUT_LOG} 2>&1").unwrap();
}
writeln!(file).unwrap();
make_file_executable(&file).expect("make the script executable");
@@ -1016,6 +1457,12 @@ impl BenchId<'_> {
matches!(self, BenchId::PnprRevision(_))
}
/// Whether this bench id runs the Rust pacquet client, either
/// directly or through a pnpr server.
fn is_pacquet_like(self) -> bool {
matches!(self, BenchId::PacquetRevision(_) | BenchId::PnprRevision(_))
}
/// Whether this is the proxy-cache populator (untimed setup that
/// warms the registry cache), which always uses the real registry.
fn is_proxy_cache_populator(self) -> bool {
@@ -1033,3 +1480,6 @@ impl<'a> fmt::Display for BenchId<'a> {
}
}
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,206 @@
use super::{
BenchmarkScenario, HyperfineCommand, PhaseEvent, collect_pnpr_direct_ratios,
non_trivial_cold_batch, read_phase_events, render_diagnostics_markdown,
requires_fresh_pnpr_cold_batch_metrics, summarize_phase_events,
};
use std::{collections::HashMap, fs};
#[test]
fn phase_event_parser_reads_flat_and_nested_json_trace_fields() {
let path = std::env::temp_dir()
.join(format!("pacquet-integrated-benchmark-phase-events-{}.ndjson", std::process::id()));
fs::write(
&path,
r#"{"target":"pacquet::install::phase","phase":"create_virtual_store_partition","warm":3,"cold":7,"skipped":1,"total":11}"#
.to_string()
+ "\n"
+ r#"{"target":"pacquet::install::phase","fields":{"phase":"create_virtual_store","elapsed_ms":42}}"#
+ "\n"
+ r#"{"name":"pnpm:progress","status":"resolved"}"#
+ "\n",
)
.expect("write phase fixture");
let events = read_phase_events(&path);
let _ = fs::remove_file(path);
assert_eq!(events.len(), 2);
assert_eq!(events[0].phase, "create_virtual_store_partition");
assert_eq!(events[0].warm, Some(3));
assert_eq!(events[0].cold, Some(7));
assert_eq!(events[1].phase, "create_virtual_store");
assert_eq!(events[1].elapsed_ms, Some(42));
}
#[test]
fn phase_summary_reports_partition_and_means() {
let events = vec![
PhaseEvent {
phase: "create_virtual_store_partition".to_string(),
elapsed_ms: None,
warm: Some(1),
cold: Some(9),
skipped: Some(0),
total: Some(10),
batch: None,
slots: None,
},
PhaseEvent {
phase: "create_virtual_store".to_string(),
elapsed_ms: Some(100),
warm: None,
cold: None,
skipped: None,
total: None,
batch: None,
slots: None,
},
PhaseEvent {
phase: "create_virtual_store".to_string(),
elapsed_ms: Some(200),
warm: None,
cold: None,
skipped: None,
total: None,
batch: None,
slots: None,
},
PhaseEvent {
phase: "link_slots".to_string(),
elapsed_ms: Some(30),
warm: None,
cold: None,
skipped: None,
total: None,
batch: Some("cold".to_string()),
slots: Some(9),
},
];
let summary = summarize_phase_events(&events);
let partition = summary.partition.expect("partition summary");
assert_eq!(partition.warm, 1);
assert_eq!(partition.cold, 9);
assert_eq!(summary.create_virtual_store_mean_ms, Some(150.0));
assert_eq!(summary.link_slots[0].batch, "cold");
assert_eq!(summary.link_slots[0].mean_ms, 30.0);
}
#[test]
fn pnpr_direct_ratios_pair_matching_revisions() {
let commands = HashMap::from([
(
"pacquet@HEAD".to_string(),
HyperfineCommand {
command: "pacquet@HEAD".to_string(),
command_name: None,
mean: 10.0,
},
),
(
"pnpr@HEAD".to_string(),
HyperfineCommand { command: "pnpr@HEAD".to_string(), command_name: None, mean: 8.0 },
),
(
"pnpr@main".to_string(),
HyperfineCommand { command: "pnpr@main".to_string(), command_name: None, mean: 9.0 },
),
]);
let ratios = collect_pnpr_direct_ratios(&commands);
assert_eq!(ratios.len(), 1);
assert_eq!(ratios[0].revision, "HEAD");
assert_eq!(ratios[0].ratio, 0.8);
}
#[test]
fn cold_batch_canary_requires_non_trivial_cold_share() {
assert!(non_trivial_cold_batch(1, 1));
assert!(non_trivial_cold_batch(10, 100));
assert!(!non_trivial_cold_batch(0, 100));
assert!(!non_trivial_cold_batch(9, 100));
}
#[test]
fn cold_batch_metrics_canary_targets_current_pnpr_revision() {
assert!(requires_fresh_pnpr_cold_batch_metrics("pnpr@HEAD"));
assert!(!requires_fresh_pnpr_cold_batch_metrics("pnpr@main"));
assert!(!requires_fresh_pnpr_cold_batch_metrics("pacquet@HEAD"));
}
#[test]
fn diagnostics_markdown_includes_create_virtual_store_line_item() {
let markdown = render_diagnostics_markdown(
&super::BenchmarkDiagnostics {
targets: vec![super::BenchmarkTargetDiagnostics {
id: "pnpr@HEAD".to_string(),
hyperfine_mean_seconds: Some(7.5),
phase_summary: super::PhaseSummary {
partition: Some(super::PartitionMetric {
warm: 12,
cold: 88,
skipped: 0,
total: 100,
}),
create_virtual_store_mean_ms: Some(1234.0),
link_slots: vec![],
},
phase_events: vec![],
}],
pnpr_direct_ratios: vec![],
},
None,
);
assert!(markdown.contains("CreateVirtualStore mean"));
assert!(markdown.contains("1234.0ms"));
assert!(markdown.contains("| pnpr@HEAD |"));
}
#[test]
fn diagnostics_markdown_notes_fresh_install_cold_store_tarball_baseline_shift() {
let markdown = render_diagnostics_markdown(
&super::BenchmarkDiagnostics {
targets: vec![super::BenchmarkTargetDiagnostics {
id: "pnpr@main".to_string(),
hyperfine_mean_seconds: Some(1.0),
phase_summary: super::PhaseSummary::default(),
phase_events: vec![],
}],
pnpr_direct_ratios: vec![],
},
Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore),
);
assert!(markdown.contains("pnpr@main"));
assert!(markdown.contains("tarball URL rewrite"));
assert!(markdown.contains("pnpr@HEAD / pacquet@HEAD"));
}
#[test]
fn diagnostics_markdown_omits_baseline_note_after_pnpr_main_is_instrumented() {
let markdown = render_diagnostics_markdown(
&super::BenchmarkDiagnostics {
targets: vec![super::BenchmarkTargetDiagnostics {
id: "pnpr@main".to_string(),
hyperfine_mean_seconds: Some(1.0),
phase_summary: super::PhaseSummary {
partition: Some(super::PartitionMetric {
warm: 0,
cold: 1,
skipped: 0,
total: 1,
}),
create_virtual_store_mean_ms: None,
link_slots: vec![],
},
phase_events: vec![],
}],
pnpr_direct_ratios: vec![],
},
Some(BenchmarkScenario::IsolatedFreshInstallColdCacheColdStore),
);
assert!(!markdown.contains("tarball URL rewrite"));
}