From 657d322b155eb3733136c2fd9053a4a3ed0bc694 Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Thu, 11 Jun 2026 02:58:36 +0200 Subject: [PATCH] perf(network): schedule tarball downloads by estimated pipeline work (#12309) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary When the download connection pool saturates, freed slots are granted by a two-class scheduling policy instead of FIFO: - **Latency class** (packument/metadata fetches, which gate resolution progress): served FIFO. - **Throughput class** (tarball downloads): ranked by **estimated total pipeline work** — `unpackedSize + 3000 × fileCount` — so the most expensive download+extract jobs start first (longest-processing-time-first; a large archive that starts last runs alone at single-connection throughput while every other slot idles, see [pnpm/pnpm#12230](https://github.com/pnpm/pnpm/issues/12230)). The per-file term prices the fixed CAS-write/hash overhead, so a many-small-files package ranks as the long job it actually is. - **Neither class can starve the other**: downloads are guaranteed a reserved half of the pool (strict metadata-first was measured to serialize cold installs — no tarball got a slot until resolution drained, costing the whole resolve/fetch overlap), and metadata wins beyond that reserve (a download backlog can't stall resolution). Both directions are work-conserving. ### How the size hints travel - Local fresh installs read `dist.unpackedSize` / `dist.fileCount` off the resolver-fetched manifest (also fixes exact decompression-buffer preallocation on the prefetch path, previously hardcoded `None`). - The pnpr `/v1/resolve` `package` frame carries both as optional `unpackedSize` / `fileCount` fields (omitted when the registry never published them; old clients and servers interoperate unchanged). - pnpr frozen restores: the lockfile records no sizes, but the verification fan-out fetches each entry's metadata anyway — the npm verifier records both stats into an optional `ObservedDistStats` sink as a side product of the tarball-URL binding check, and the frozen fast path announces every verified tarball as a sized `package` frame before `done` (URLs derived by the same `tarball_url_and_integrity` the client materialization uses). Verdict-cache hits fetch no metadata and keep the bare `done` frame. - pnpr's abbreviated metadata now **preserves** `unpackedSize`/`fileCount` instead of stripping them, since pacquet reads both. - Resolve-time tarball fetches (tarball deps' manifests come from their archives) acquire in the latency class — they gate the resolver's walk. ### Benchmark tooling - The integrated benchmark's latency proxy gained `--registry-slow-start`: per-connection TCP slow start (RFC 6928 initial window, doubling per delivered window toward the bandwidth cap), so scheduling effects that depend on per-connection ramp-up are measurable. - Fixed a macOS bug where the proxy's accepted sockets inherited the listener's `O_NONBLOCK` and every proxied connection died on its first read — all shaped benchmark traffic silently failed before this. ## Measurements Fixture: ~110 direct deps / 1308 packages (~90 MB wire), `isolated-linker.fresh-install.cold-cache.cold-store`, local mirror of real npm behind the shaped proxy (30 ms RTT, 80 Mbit/s per-connection cap, TCP slow start). **Drift-controlled interleaved comparison** (4 alternating blocks x 4 runs each; sequential multi-target sessions on this machine showed up to +75% session-order drift, so block-paired ABAB is the only design we trust): | target | mean +/- sd (n=16) | | --- | --- | | baseline FIFO | 14.36 s +/- 0.54 | | this PR | **14.06 s +/- 0.70** | The PR wins **all 4 paired blocks** (-0.18 s to -0.50 s, mean -0.30 s, ~2%). A scheduler ablation (reserve+FIFO, smallest-first, unpackedSize-only, work with K=3k and K=10k per file) ordered as the pipeline model predicts, but the per-variant deltas sit inside the session-drift noise, so only the FIFO-vs-full-design pairing is claimed. K in [3000, 10000] is indistinguishable. **The starvation fix is the load-bearing piece, established mechanistically rather than by wall clock:** with strict metadata-first priority (an intermediate design), cold-install event timelines showed 4-7 s windows at install start with zero tarball activity - downloads never won a slot during the resolution burst, serializing the resolve and fetch phases. The reserved share removes those gaps entirely and the worst observed cold-install runs with it are within ~1 s of the median, where unreserved variants showed multi-second stragglers. Real-registry A/B (15 randomized cold-install pairs against npmjs) is noise-bound on a saturated ~100 Mbit link (+/-3 s registry variance), median -0.17 s in this PR's favor - consistent with "never slower." --- pacquet/crates/cli/src/cli_args/install.rs | 8 +- .../src/node_resolver.rs | 1 + .../env-installer/src/install_config_deps.rs | 1 + pacquet/crates/network/src/lib.rs | 61 ++++- .../crates/network/src/priority_semaphore.rs | 243 ++++++++++++++++++ .../network/src/priority_semaphore/tests.rs | 166 ++++++++++++ .../src/build_resolution_verifiers.rs | 9 +- pacquet/crates/package-manager/src/install.rs | 1 + .../src/install_package_by_snapshot.rs | 14 +- .../src/install_package_from_registry.rs | 19 +- .../src/prefetching_resolver.rs | 4 +- .../src/resolution_observer.rs | 15 +- .../package-manager/src/tarball_prefetch.rs | 20 +- pacquet/crates/pnpr-client/src/lib.rs | 36 ++- pacquet/crates/pnpr-client/src/tests.rs | 20 +- .../resolving-git-resolver/src/runners.rs | 1 + .../src/create_npm_resolution_verifier.rs | 51 ++++ .../create_npm_resolution_verifier/tests.rs | 59 ++++- .../crates/resolving-npm-resolver/src/lib.rs | 3 +- .../src/lookup_context.rs | 9 + pacquet/crates/tarball/src/lib.rs | 57 +++- pacquet/crates/tarball/src/tests.rs | 46 +++- .../integrated-benchmark/src/cli_args.rs | 10 + .../integrated-benchmark/src/latency_proxy.rs | 64 ++++- .../src/latency_proxy/tests.rs | 60 ++++- .../tasks/integrated-benchmark/src/main.rs | 3 + .../integrated-benchmark/src/work_env.rs | 4 + pacquet/tasks/micro-benchmark/src/main.rs | 1 + pnpr/crates/pnpr/src/resolver.rs | 156 +++++++++-- pnpr/crates/pnpr/src/resolver/tests.rs | 74 ++++++ pnpr/crates/pnpr/src/upstream.rs | 13 +- pnpr/crates/pnpr/src/upstream/tests.rs | 9 +- 32 files changed, 1151 insertions(+), 87 deletions(-) create mode 100644 pacquet/crates/network/src/priority_semaphore.rs create mode 100644 pacquet/crates/network/src/priority_semaphore/tests.rs diff --git a/pacquet/crates/cli/src/cli_args/install.rs b/pacquet/crates/cli/src/cli_args/install.rs index 39ca801363..b6eefa4fd7 100644 --- a/pacquet/crates/cli/src/cli_args/install.rs +++ b/pacquet/crates/cli/src/cli_args/install.rs @@ -545,7 +545,13 @@ async fn install_via_pnpr( || pkg.tarball.clone(), |registry| registry.client_tarball_url(&pkg.tarball), ); - prefetcher.prefetch(pkg.id, tarball, &pkg.integrity); + prefetcher.prefetch( + pkg.id, + tarball, + &pkg.integrity, + pkg.unpacked_size, + pkg.file_count, + ); }) .await } diff --git a/pacquet/crates/engine-runtime-node-resolver/src/node_resolver.rs b/pacquet/crates/engine-runtime-node-resolver/src/node_resolver.rs index 651876e11a..a66080355c 100644 --- a/pacquet/crates/engine-runtime-node-resolver/src/node_resolver.rs +++ b/pacquet/crates/engine-runtime-node-resolver/src/node_resolver.rs @@ -96,6 +96,7 @@ pub struct NodeResolver { } impl NodeResolver { + #[must_use] pub fn new(http_client: Arc) -> Self { Self { http_client, node_download_mirrors: HashMap::new(), offline: false } } diff --git a/pacquet/crates/env-installer/src/install_config_deps.rs b/pacquet/crates/env-installer/src/install_config_deps.rs index 609fb255d1..c63a86277b 100644 --- a/pacquet/crates/env-installer/src/install_config_deps.rs +++ b/pacquet/crates/env-installer/src/install_config_deps.rs @@ -183,6 +183,7 @@ async fn materialize( verified_files_cache: SharedVerifiedFilesCache::default(), package_integrity: integrity, package_unpacked_size: None, + package_file_count: None, package_url: tarball, package_id: &package_id, auth_headers: opts.auth_headers, diff --git a/pacquet/crates/network/src/lib.rs b/pacquet/crates/network/src/lib.rs index 204580994d..fde7cd0304 100644 --- a/pacquet/crates/network/src/lib.rs +++ b/pacquet/crates/network/src/lib.rs @@ -1,4 +1,5 @@ mod auth; +mod priority_semaphore; mod proxy; mod retry; #[cfg(test)] @@ -10,13 +11,13 @@ pub use proxy::{NoProxySetting, ProxyConfig, ProxyError}; pub use retry::{RetryOpts, send_with_retry, should_retry_status}; pub use tls::{PerRegistryTls, RegistryTls, TlsConfig, TlsError}; +use priority_semaphore::{Permit, PrioritySemaphore}; use proxy::{NoProxyMatcher, parse_proxy_url, strip_userinfo}; use reqwest::{ Certificate, Client, Identity, Proxy, header::{HeaderMap, HeaderValue, USER_AGENT}, }; use std::{collections::HashMap, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration}; -use tokio::sync::{Semaphore, SemaphorePermit}; /// Fallback `User-Agent` for the install client's no-config /// constructors ([`ThrottledClient::new_for_installs`], @@ -38,6 +39,15 @@ use tokio::sync::{Semaphore, SemaphorePermit}; /// The install client therefore always sends one. pub const DEFAULT_USER_AGENT: &str = "pnpm"; +/// Permit priority used by [`ThrottledClient::acquire`] / +/// [`ThrottledClient::acquire_for_url`] for callers that don't pass an +/// explicit one. Marks the request as latency class — packument and +/// other metadata fetches that gate resolution progress — served FIFO +/// and preferred over size-prioritized downloads beyond the downloads' +/// reserved share of the pool (see the `priority_semaphore` module +/// docs for the two-class grant policy). +pub const UNPRIORITIZED: u64 = u64::MAX; + /// Default per-request timeout in milliseconds, matching pnpm v11's /// `fetchTimeout` default of `60000` /// ([`config/reader/src/index.ts:151`](https://github.com/pnpm/pnpm/blob/1819226b51/config/reader/src/index.ts#L151)). @@ -77,7 +87,8 @@ impl Default for NetworkSettings { } } -/// Wrapper around [`Client`] with concurrent request limit enforced by the [`Semaphore`] mechanism. +/// Wrapper around [`Client`] with a concurrent request limit enforced +/// by a priority-ordered semaphore (`priority_semaphore` module). /// /// Holds a default [`Client`] for the top-level proxy / TLS config /// plus an optional map of per-registry clients keyed by nerf-darted @@ -87,9 +98,20 @@ impl Default for NetworkSettings { /// client. The semaphore is shared across both — bounding the total /// concurrent socket count regardless of which registry a request /// targets. +/// +/// When the pool saturates, freed slots are granted by a two-class +/// policy (see the `priority_semaphore` module docs): requests +/// acquired without an explicit priority ([`Self::acquire`], +/// [`Self::acquire_for_url`]) form the FIFO latency class (typically +/// metadata fetches gating resolution progress), while downloads pass +/// their estimated pipeline work through +/// [`Self::acquire_for_url_with_priority`] and are guaranteed a +/// reserved share of the pool, granted most-expensive-first — so the +/// longest download jobs start early and neither class starves the +/// other. #[derive(Debug)] pub struct ThrottledClient { - semaphore: Semaphore, + semaphore: PrioritySemaphore, client: Client, /// Per-registry clients keyed by nerf-darted URI. Empty when no /// `//host/:cert=…` / `:key=…` / `:ca=…` / `:cafile=…` / @@ -120,7 +142,7 @@ pub struct ThrottledClient { /// still draining, and the per-process FD count overruns the /// platform limit — surfacing as `EMFILE` "too many open files". pub struct ThrottledClientGuard<'a> { - _permit: SemaphorePermit<'a>, + _permit: Permit, client: &'a Client, } @@ -139,8 +161,7 @@ impl ThrottledClient { /// against [`default_network_concurrency`] — typically the full /// `send + body-consume` lifetime, not just `.send()`. pub async fn acquire(&self) -> ThrottledClientGuard<'_> { - let permit = - self.semaphore.acquire().await.expect("semaphore shouldn't have been closed this soon"); + let permit = self.semaphore.acquire(UNPRIORITIZED).await; ThrottledClientGuard { _permit: permit, client: &self.client } } @@ -290,7 +311,7 @@ impl ThrottledClient { } Ok(ThrottledClient { - semaphore: Semaphore::new(settings.network_concurrency), + semaphore: PrioritySemaphore::new(settings.network_concurrency), client: default_client, per_registry: per_registry_clients, routing: per_registry.clone(), @@ -304,7 +325,7 @@ impl ThrottledClient { /// test-suite budget instead of waiting on TCP retry. #[must_use] pub fn from_client(client: Client) -> Self { - let semaphore = Semaphore::new(default_network_concurrency()); + let semaphore = PrioritySemaphore::new(default_network_concurrency()); ThrottledClient { semaphore, client, @@ -333,8 +354,22 @@ impl ThrottledClient { /// just to satisfy the type signature — the lookup itself works /// on the raw string form. pub async fn acquire_for_url(&self, url: &str) -> ThrottledClientGuard<'_> { - let permit = - self.semaphore.acquire().await.expect("semaphore shouldn't have been closed this soon"); + self.acquire_for_url_with_priority(url, UNPRIORITIZED).await + } + + /// [`Self::acquire_for_url`], but queueing behind the saturated + /// pool at an explicit `priority` instead of [`UNPRIORITIZED`] — + /// the throughput class of the two-class grant policy. Tarball + /// downloads pass their estimated pipeline work (0 when unknown) + /// so that freed slots go to the most expensive pending archive + /// first — the longest download+extract jobs start earliest and + /// never end up running alone after the small ones drained. + pub async fn acquire_for_url_with_priority( + &self, + url: &str, + priority: u64, + ) -> ThrottledClientGuard<'_> { + let permit = self.semaphore.acquire(priority).await; let client = self .routing .pick_for_url(url) @@ -502,9 +537,9 @@ pub enum ForInstallsError { #[diagnostic(transparent)] Tls(#[error(source)] TlsError), - /// `network_concurrency` resolved to `0`. A zero-permit - /// [`Semaphore`] would make every `acquire` block forever, hanging - /// the install. pnpm rejects the same value — its `p-queue` throws + /// `network_concurrency` resolved to `0`. A zero-permit semaphore + /// would make every `acquire` block forever, hanging the install. + /// pnpm rejects the same value — its `p-queue` throws /// `Expected concurrency to be a number from 1 and up` — so pacquet /// fails fast rather than deadlock. #[display("networkConcurrency must be at least 1")] diff --git a/pacquet/crates/network/src/priority_semaphore.rs b/pacquet/crates/network/src/priority_semaphore.rs new file mode 100644 index 0000000000..278220a7d9 --- /dev/null +++ b/pacquet/crates/network/src/priority_semaphore.rs @@ -0,0 +1,243 @@ +//! Class-aware permit dispenser backing +//! [`ThrottledClient`](crate::ThrottledClient). +//! +//! Two request classes share the pool: +//! +//! * **Latency** ([`crate::UNPRIORITIZED`]) — packument and other +//! metadata fetches that gate resolution progress. Served FIFO. +//! * **Throughput** (any other priority) — tarball downloads, ranked +//! by their estimated pipeline work so the most expensive archives +//! claim freed slots first (the longest-processing-time-first +//! scheduling argument; see +//! [pnpm/pnpm#12230](https://github.com/pnpm/pnpm/issues/12230) for +//! the per-connection bandwidth measurements behind it). +//! +//! Neither class may starve the other. Strictly preferring latency +//! work was measured to *serialize* a cold fresh install — during the +//! resolution burst no tarball ever got a slot, so the download +//! pipeline started only after resolution finished, costing the whole +//! resolve/fetch overlap. Instead the throughput class is guaranteed a +//! reserved share of the pool (half, rounded up — but never all of +//! it): when downloads hold +//! fewer than the reserve, a freed slot goes to the largest pending +//! download even while metadata is queued; beyond the reserve, queued +//! metadata wins. Both directions are work-conserving — either class +//! may use the whole pool while the other has nothing pending. + +use std::{ + cmp::Ordering, + collections::{BinaryHeap, VecDeque}, + sync::{Arc, Mutex}, +}; +use tokio::sync::oneshot; + +use crate::UNPRIORITIZED; + +/// Counting semaphore with the two-class grant policy described in the +/// module docs. Cancel-safe: dropping a waiting [`acquire`] future +/// gives up its place in line, and a permit granted to a waiter that +/// was cancelled in the same instant passes on to the next waiter +/// instead of leaking. +/// +/// [`acquire`]: Self::acquire +pub(crate) struct PrioritySemaphore { + state: Arc>, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum Class { + Latency, + Throughput, +} + +impl Class { + fn of(priority: u64) -> Class { + if priority == UNPRIORITIZED { Class::Latency } else { Class::Throughput } + } +} + +struct SemState { + free: usize, + latency_in_flight: usize, + throughput_in_flight: usize, + /// Minimum number of slots queued throughput work can always + /// grow into, even while latency work is queued. + throughput_reserve: usize, + /// Registration counter used as the FIFO tie-break. + next_seq: u64, + latency_waiters: VecDeque, + throughput_waiters: BinaryHeap, +} + +struct Waiter { + priority: u64, + seq: u64, + tx: oneshot::Sender, +} + +/// `BinaryHeap` is a max-heap, so "greatest" pops first: order by +/// priority, then by *earlier* registration among equals. +impl Ord for Waiter { + fn cmp(&self, other: &Self) -> Ordering { + self.priority.cmp(&other.priority).then_with(|| other.seq.cmp(&self.seq)) + } +} + +impl PartialOrd for Waiter { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for Waiter { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Eq for Waiter {} + +/// Owned permit returned by [`PrioritySemaphore::acquire`]. Dropping it +/// hands the slot to the next waiter per the class policy (or back to +/// the free-permit count when nobody is waiting). +pub(crate) struct Permit { + state: Arc>, + class: Class, + /// Cleared when the permit's release has been handed off inside + /// [`release`] (a waiter vanished between pop and send), so this + /// permit's own `Drop` must not release a second slot. + armed: bool, +} + +impl Drop for Permit { + fn drop(&mut self) { + if self.armed { + release(&self.state, self.class); + } + } +} + +impl PrioritySemaphore { + pub(crate) fn new(permits: usize) -> Self { + PrioritySemaphore { + state: Arc::new(Mutex::new(SemState { + free: permits, + latency_in_flight: 0, + throughput_in_flight: 0, + // Half the pool, but never all of it: a reserve that + // covers every permit (the `permits == 1` case) would + // invert the starvation guarantee — queued downloads + // would block metadata outright instead of sharing. + throughput_reserve: permits.div_ceil(2).min(permits.saturating_sub(1)), + next_seq: 0, + latency_waiters: VecDeque::new(), + throughput_waiters: BinaryHeap::new(), + })), + } + } + + /// Wait for a permit. Free permits are claimed immediately; when + /// the pool is saturated the caller queues in its class and is + /// woken per the grant policy in the module docs. + pub(crate) async fn acquire(&self, priority: u64) -> Permit { + let class = Class::of(priority); + let rx = { + let mut state = self.state.lock().expect("priority semaphore lock poisoned"); + if state.free > 0 { + state.free -= 1; + *state.count_mut(class) += 1; + return Permit { state: Arc::clone(&self.state), class, armed: true }; + } + let (tx, rx) = oneshot::channel(); + let seq = state.next_seq; + state.next_seq += 1; + let waiter = Waiter { priority, seq, tx }; + match class { + Class::Latency => state.latency_waiters.push_back(waiter), + Class::Throughput => state.throughput_waiters.push(waiter), + } + rx + }; + rx.await.expect("priority semaphore state dropped while a permit was awaited") + } + + #[cfg(test)] + pub(crate) fn queued_waiters(&self) -> usize { + let state = self.state.lock().expect("priority semaphore lock poisoned"); + state.latency_waiters.len() + state.throughput_waiters.len() + } + + #[cfg(test)] + pub(crate) fn available_permits(&self) -> usize { + self.state.lock().expect("priority semaphore lock poisoned").free + } +} + +impl SemState { + fn count_mut(&mut self, class: Class) -> &mut usize { + match class { + Class::Latency => &mut self.latency_in_flight, + Class::Throughput => &mut self.throughput_in_flight, + } + } + + /// Pop the waiter the grant policy picks next, or `None` when both + /// queues are empty: throughput work below its reserve goes first, + /// then queued latency work, then throughput work above the + /// reserve. + fn next_waiter(&mut self) -> Option<(Waiter, Class)> { + if self.throughput_in_flight < self.throughput_reserve + && let Some(waiter) = self.throughput_waiters.pop() + { + return Some((waiter, Class::Throughput)); + } + if let Some(waiter) = self.latency_waiters.pop_front() { + return Some((waiter, Class::Latency)); + } + self.throughput_waiters.pop().map(|waiter| (waiter, Class::Throughput)) + } +} + +impl std::fmt::Debug for PrioritySemaphore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = self.state.lock().expect("priority semaphore lock poisoned"); + f.debug_struct("PrioritySemaphore") + .field("free", &state.free) + .field("latency_in_flight", &state.latency_in_flight) + .field("throughput_in_flight", &state.throughput_in_flight) + .field("latency_waiters", &state.latency_waiters.len()) + .field("throughput_waiters", &state.throughput_waiters.len()) + .finish() + } +} + +/// Hand a freed slot to the next waiter per the grant policy, skipping +/// waiters whose `acquire` future was dropped while queued; with no +/// live waiter the slot returns to the free-permit count. +fn release(state_arc: &Arc>, released: Class) { + let mut state = state_arc.lock().expect("priority semaphore lock poisoned"); + *state.count_mut(released) -= 1; + loop { + let Some((waiter, class)) = state.next_waiter() else { + state.free += 1; + return; + }; + let permit = Permit { state: Arc::clone(state_arc), class, armed: true }; + match waiter.tx.send(permit) { + Ok(()) => { + *state.count_mut(class) += 1; + return; + } + Err(mut returned) => { + // The receiver was dropped before the send, so this + // permit was never observed — disarm it and offer the + // slot to the next waiter. (Dropping it armed would + // re-enter `release` and deadlock on `state_arc`.) + returned.armed = false; + } + } + } +} + +#[cfg(test)] +mod tests; diff --git a/pacquet/crates/network/src/priority_semaphore/tests.rs b/pacquet/crates/network/src/priority_semaphore/tests.rs new file mode 100644 index 0000000000..89c7170e66 --- /dev/null +++ b/pacquet/crates/network/src/priority_semaphore/tests.rs @@ -0,0 +1,166 @@ +use pretty_assertions::assert_eq; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use super::PrioritySemaphore; +use crate::UNPRIORITIZED; + +/// Spawn a task that acquires with `priority`, records `label` on +/// grant, and releases immediately. Returns once the waiter is queued, +/// so registration order across calls is deterministic. +async fn spawn_waiter( + sem: &Arc, + order: &Arc>>, + label: &'static str, + priority: u64, +) -> tokio::task::JoinHandle<()> { + let queued_before = sem.queued_waiters(); + let handle = { + let sem = Arc::clone(sem); + let order = Arc::clone(order); + tokio::spawn(async move { + let _permit = sem.acquire(priority).await; + order.lock().unwrap().push(label); + }) + }; + while sem.queued_waiters() == queued_before { + tokio::task::yield_now().await; + } + handle +} + +#[tokio::test] +async fn saturated_semaphore_grants_highest_priority_first() { + let sem = Arc::new(PrioritySemaphore::new(1)); + let holder = sem.acquire(0).await; + + let order = Arc::new(Mutex::new(Vec::new())); + let mut handles = Vec::new(); + for (label, priority) in [("small", 1), ("large", 500), ("medium", 30)] { + handles.push(spawn_waiter(&sem, &order, label, priority).await); + } + + drop(holder); + for handle in handles { + handle.await.unwrap(); + } + assert_eq!(*order.lock().unwrap(), vec!["large", "medium", "small"]); +} + +#[tokio::test] +async fn equal_priorities_are_granted_fifo() { + let sem = Arc::new(PrioritySemaphore::new(1)); + let holder = sem.acquire(0).await; + + let order = Arc::new(Mutex::new(Vec::new())); + let mut handles = Vec::new(); + for label in ["first", "second", "third"] { + handles.push(spawn_waiter(&sem, &order, label, 7).await); + } + + drop(holder); + for handle in handles { + handle.await.unwrap(); + } + assert_eq!(*order.lock().unwrap(), vec!["first", "second", "third"]); +} + +#[tokio::test] +async fn cancelled_waiter_passes_its_grant_to_the_next_waiter() { + let sem = Arc::new(PrioritySemaphore::new(1)); + let holder = sem.acquire(0).await; + + let order = Arc::new(Mutex::new(Vec::new())); + let cancelled = spawn_waiter(&sem, &order, "cancelled", 100).await; + let survivor = spawn_waiter(&sem, &order, "survivor", 1).await; + + cancelled.abort(); + let abort_result = cancelled.await; + dbg!(&abort_result); + assert!(abort_result.unwrap_err().is_cancelled()); + + drop(holder); + tokio::time::timeout(Duration::from_secs(5), survivor) + .await + .expect("the freed permit should reach the surviving waiter") + .unwrap(); + assert_eq!(*order.lock().unwrap(), vec!["survivor"]); +} + +#[tokio::test] +async fn released_permit_with_no_waiters_returns_to_the_free_pool() { + let sem = PrioritySemaphore::new(1); + drop(sem.acquire(0).await); + drop(sem.acquire(0).await); + assert_eq!(sem.queued_waiters(), 0); +} + +/// During a metadata flood, queued downloads still get slots up to +/// their reserved share — strict metadata-first was measured to +/// serialize cold installs (no download starts until resolution +/// drains), so the reserve is what keeps the two phases overlapping. +#[tokio::test] +async fn downloads_keep_their_reserved_share_during_a_metadata_flood() { + // 2 permits -> a throughput reserve of 1. + let sem = Arc::new(PrioritySemaphore::new(2)); + let holder_meta = sem.acquire(UNPRIORITIZED).await; + let holder_download = sem.acquire(5).await; + + let order = Arc::new(Mutex::new(Vec::new())); + let meta_one = spawn_waiter(&sem, &order, "meta-1", UNPRIORITIZED).await; + let meta_two = spawn_waiter(&sem, &order, "meta-2", UNPRIORITIZED).await; + let download = spawn_waiter(&sem, &order, "download", 9).await; + + // Freeing the download slot drops throughput below its reserve, so + // the queued download outranks the earlier-queued metadata. + drop(holder_download); + for handle in [download, meta_one, meta_two] { + handle.await.unwrap(); + } + drop(holder_meta); + assert_eq!(*order.lock().unwrap(), vec!["download", "meta-1", "meta-2"]); +} + +/// The reserve is a floor, not free rein: with downloads already at +/// their share, a freed latency slot goes to queued metadata before a +/// higher-priority download — resolution progress can't be starved by +/// a backlog of large archives either. +#[tokio::test] +async fn metadata_outranks_downloads_beyond_the_reserve() { + let sem = Arc::new(PrioritySemaphore::new(2)); + let holder_meta = sem.acquire(UNPRIORITIZED).await; + let holder_download = sem.acquire(5).await; + + let order = Arc::new(Mutex::new(Vec::new())); + let download = spawn_waiter(&sem, &order, "download", 9).await; + let meta = spawn_waiter(&sem, &order, "meta", UNPRIORITIZED).await; + + // Freeing the metadata slot keeps throughput at its reserve (the + // download holder is still running), so queued metadata wins even + // though the download was queued first with a high priority. + drop(holder_meta); + meta.await.unwrap(); + drop(holder_download); + download.await.unwrap(); + assert_eq!(*order.lock().unwrap(), vec!["meta", "download"]); +} + +/// With a single permit the reserve must be zero — a reserve covering +/// the whole pool would invert the starvation guarantee and let queued +/// downloads block metadata outright. +#[tokio::test] +async fn single_permit_pool_still_serves_metadata_first() { + let sem = Arc::new(PrioritySemaphore::new(1)); + let holder = sem.acquire(5).await; + + let order = Arc::new(Mutex::new(Vec::new())); + let download = spawn_waiter(&sem, &order, "download", 9).await; + let meta = spawn_waiter(&sem, &order, "meta", UNPRIORITIZED).await; + + drop(holder); + meta.await.unwrap(); + download.await.unwrap(); + assert_eq!(*order.lock().unwrap(), vec!["meta", "download"]); +} diff --git a/pacquet/crates/package-manager/src/build_resolution_verifiers.rs b/pacquet/crates/package-manager/src/build_resolution_verifiers.rs index 30879b5f60..193ffc90b9 100644 --- a/pacquet/crates/package-manager/src/build_resolution_verifiers.rs +++ b/pacquet/crates/package-manager/src/build_resolution_verifiers.rs @@ -25,7 +25,8 @@ use pacquet_config::{ }; use pacquet_network::{AuthHeaders, ThrottledClient}; use pacquet_resolving_npm_resolver::{ - CreateNpmResolutionVerifierOptions, PackageMetaCache, create_npm_resolution_verifier, + CreateNpmResolutionVerifierOptions, ObservedDistStats, PackageMetaCache, + create_npm_resolution_verifier, }; use pacquet_resolving_resolver_base::ResolutionVerifier; @@ -67,11 +68,16 @@ pub enum BuildVerifiersError { /// during the same install yields the cached document instead of a /// fresh round-trip. Pass `None` from contexts where no resolver /// runs alongside (the frozen-install path, unit tests). +/// +/// `observed_dist_stats` is the optional [`ObservedDistStats`] sink +/// the npm verifier fills with each verified entry's `dist` work +/// statistics; pass `None` when the caller has no use for them. pub fn build_resolution_verifiers( config: &Config, http_client: Arc, meta_cache: Option>, auth_override: Option>, + observed_dist_stats: Option, ) -> Result>, BuildVerifiersError> { let mut verifiers: Vec> = Vec::new(); @@ -125,6 +131,7 @@ pub fn build_resolution_verifiers( meta_cache, retry_opts: retry_opts_from_config(config), now: None, + observed_dist_stats, }; verifiers.push(Arc::new(create_npm_resolution_verifier(opts))); diff --git a/pacquet/crates/package-manager/src/install.rs b/pacquet/crates/package-manager/src/install.rs index d175b7d4cc..6af0f132c7 100644 --- a/pacquet/crates/package-manager/src/install.rs +++ b/pacquet/crates/package-manager/src/install.rs @@ -649,6 +649,7 @@ where Some(Arc::clone(&meta_cache) as Arc), auth_override.clone(), + None, ) .map_err(InstallError::BuildVerifiers)?; diff --git a/pacquet/crates/package-manager/src/install_package_by_snapshot.rs b/pacquet/crates/package-manager/src/install_package_by_snapshot.rs index 936e45c2ce..cfa6988576 100644 --- a/pacquet/crates/package-manager/src/install_package_by_snapshot.rs +++ b/pacquet/crates/package-manager/src/install_package_by_snapshot.rs @@ -302,6 +302,7 @@ impl InstallPackageBySnapshot<'_> { verified_files_cache: Arc::clone(verified_files_cache), package_integrity: integrity, package_unpacked_size: None, + package_file_count: None, package_url: &tarball_url, package_id: &package_id, requester, @@ -578,8 +579,16 @@ impl InstallPackageBySnapshot<'_> { /// Resolve the tarball URL + integrity for tarball- and registry-shaped /// resolutions. Factored out so the per-resolution-type dispatch in /// [`InstallPackageBySnapshot::run`] reads top-down: each variant builds -/// its own `cas_paths`. -fn tarball_url_and_integrity<'a>( +/// its own `cas_paths`. Public because the pnpr server derives the same +/// URLs when it announces a verified frozen lockfile's tarballs to the +/// client — both sides must derive byte-identical URLs so the client's +/// prefetch mem-cache keys line up. +/// +/// # Panics +/// +/// On directory / git / binary / variations resolutions — callers gate +/// on the tarball/registry shapes first. +pub fn tarball_url_and_integrity<'a>( resolution: &'a LockfileResolution, package_key: &PackageKey, config: &'a Config, @@ -766,6 +775,7 @@ async fn fetch_binary_resolution_to_cas( verified_files_cache: Arc::clone(verified_files_cache), package_integrity: &binary.integrity, package_unpacked_size: None, + package_file_count: None, package_url: &binary.url, package_id: &package_id, requester, diff --git a/pacquet/crates/package-manager/src/install_package_from_registry.rs b/pacquet/crates/package-manager/src/install_package_from_registry.rs index e07b0731d0..75a5110851 100644 --- a/pacquet/crates/package-manager/src/install_package_from_registry.rs +++ b/pacquet/crates/package-manager/src/install_package_from_registry.rs @@ -147,6 +147,7 @@ impl InstallPackageFromRegistry<'_> { if first_visit { let (tarball_url, integrity) = extract_tarball(&resolution.resolution)?; let unpacked_size = manifest_unpacked_size(resolution.manifest.as_deref()); + let file_count = manifest_file_count(resolution.manifest.as_deref()); // `pnpm:progress resolved` mirrors pnpm's emit at // : @@ -172,6 +173,7 @@ impl InstallPackageFromRegistry<'_> { verified_files_cache: SharedVerifiedFilesCache::clone(verified_files_cache), package_integrity: &integrity, package_unpacked_size: unpacked_size, + package_file_count: file_count, package_url: tarball_url, package_id: &package_id, requester, @@ -272,14 +274,21 @@ pub(crate) fn extract_tarball( /// `None` when missing or non-numeric — the tarball extractor treats it /// as a hint, not a hard requirement. pub(crate) fn manifest_unpacked_size(manifest: Option<&Value>) -> Option { + manifest_dist_field(manifest, "unpackedSize") +} + +/// Read `dist.fileCount` off the resolver-fetched manifest. Feeds the +/// download priority's per-file pipeline-cost term; `None` when the +/// registry never published one. +pub(crate) fn manifest_file_count(manifest: Option<&Value>) -> Option { + manifest_dist_field(manifest, "fileCount") +} + +fn manifest_dist_field(manifest: Option<&Value>, field: &str) -> Option { // `usize::try_from` so a `u64` value larger than the host's // `usize` (32-bit targets) degrades to "no hint" rather than // truncating silently and producing an undersized pre-allocation. - manifest? - .get("dist")? - .get("unpackedSize")? - .as_u64() - .and_then(|value| usize::try_from(value).ok()) + manifest?.get("dist")?.get(field)?.as_u64().and_then(|value| usize::try_from(value).ok()) } #[cfg(test)] diff --git a/pacquet/crates/package-manager/src/prefetching_resolver.rs b/pacquet/crates/package-manager/src/prefetching_resolver.rs index 1b6481f52b..4be4fabf58 100644 --- a/pacquet/crates/package-manager/src/prefetching_resolver.rs +++ b/pacquet/crates/package-manager/src/prefetching_resolver.rs @@ -24,7 +24,7 @@ //! path as `TarballError::SiblingFetchFailed`. use crate::{ - install_package_from_registry::{extract_tarball, manifest_unpacked_size}, + install_package_from_registry::{extract_tarball, manifest_file_count, manifest_unpacked_size}, retry_config::retry_opts_from_config, }; use dashmap::DashSet; @@ -203,6 +203,7 @@ impl PrefetchingResolver { let package_id = format!("{}@{}", name_ver.name, name_ver.suffix); let package_url = package_url.to_string(); let package_unpacked_size = manifest_unpacked_size(result.manifest.as_deref()); + let package_file_count = manifest_file_count(result.manifest.as_deref()); let http_client = Arc::clone(&self.ctx.http_client); let mem_cache = Arc::clone(&self.ctx.mem_cache); @@ -238,6 +239,7 @@ impl PrefetchingResolver { verified_files_cache, package_integrity: &integrity, package_unpacked_size, + package_file_count, package_url: &package_url, package_id: &package_id, requester: &requester, diff --git a/pacquet/crates/package-manager/src/resolution_observer.rs b/pacquet/crates/package-manager/src/resolution_observer.rs index 25cafe2266..4b141d68c2 100644 --- a/pacquet/crates/package-manager/src/resolution_observer.rs +++ b/pacquet/crates/package-manager/src/resolution_observer.rs @@ -9,7 +9,9 @@ //! while the server is still resolving //! ([pnpm/pnpm#12234](https://github.com/pnpm/pnpm/issues/12234)). -use crate::install_package_from_registry::extract_tarball; +use crate::install_package_from_registry::{ + extract_tarball, manifest_file_count, manifest_unpacked_size, +}; use dashmap::DashSet; use pacquet_resolving_resolver_base::{ LatestQuery, ResolveFuture, ResolveLatestFuture, ResolveOptions, ResolveResult, Resolver, @@ -32,6 +34,15 @@ pub struct ResolvedPackageHint<'a> { /// The resolver's `dist.tarball` URL — the same string the install /// pass looks up in the shared mem cache. pub tarball_url: &'a str, + /// `dist.unpackedSize` from the resolver-fetched manifest, when + /// the registry published one. Lets the fetching side size its + /// decompression buffer exactly and start the largest archives + /// first when the connection pool is saturated. + pub unpacked_size: Option, + /// `dist.fileCount` from the resolver-fetched manifest, when the + /// registry published one. The per-file term of the download + /// priority's pipeline-work estimate. + pub file_count: Option, } /// Sink notified once per resolved tarball package during a resolve. @@ -84,6 +95,8 @@ impl ObservingResolver { version: &version, integrity: &integrity, tarball_url, + unpacked_size: manifest_unpacked_size(result.manifest.as_deref()), + file_count: manifest_file_count(result.manifest.as_deref()), }); } } diff --git a/pacquet/crates/package-manager/src/tarball_prefetch.rs b/pacquet/crates/package-manager/src/tarball_prefetch.rs index 88aef28a65..f456e8287b 100644 --- a/pacquet/crates/package-manager/src/tarball_prefetch.rs +++ b/pacquet/crates/package-manager/src/tarball_prefetch.rs @@ -47,6 +47,7 @@ pub(crate) struct TarballDownload { pub package_url: String, pub integrity: Integrity, pub package_unpacked_size: Option, + pub package_file_count: Option, } /// [`tokio::spawn`] a single tarball download into the shared mem cache @@ -74,6 +75,7 @@ pub(crate) fn spawn_tarball_download(download: TarballDownload) { package_url, integrity, package_unpacked_size, + package_file_count, } = download; tokio::spawn(async move { @@ -86,6 +88,7 @@ pub(crate) fn spawn_tarball_download(download: TarballDownload) { verified_files_cache, package_integrity: &integrity, package_unpacked_size, + package_file_count, package_url: &package_url, package_id: &package_id, requester: &requester, @@ -190,8 +193,18 @@ impl TarballPrefetcher { /// Fire a background download of one resolved tarball. Deduplicated /// by URL; a no-op when the same URL was already prefetched or when /// `integrity` doesn't parse (the materialization install fetches - /// that package the normal way). - pub fn prefetch(&self, package_id: String, package_url: String, integrity: &str) { + /// that package the normal way). `unpacked_size` (the frame's + /// `unpackedSize`, when the registry published one) sizes the + /// decompression buffer and acts as the download's queueing + /// priority — largest pending archives start first. + pub fn prefetch( + &self, + package_id: String, + package_url: String, + integrity: &str, + unpacked_size: Option, + file_count: Option, + ) { let integrity = match integrity.parse::() { Ok(integrity) => integrity, Err(error) => { @@ -222,7 +235,8 @@ impl TarballPrefetcher { package_id, package_url, integrity, - package_unpacked_size: None, + package_unpacked_size: unpacked_size, + package_file_count: file_count, }); } diff --git a/pacquet/crates/pnpr-client/src/lib.rs b/pacquet/crates/pnpr-client/src/lib.rs index 234e3f5911..5f1a7b42b5 100644 --- a/pacquet/crates/pnpr-client/src/lib.rs +++ b/pacquet/crates/pnpr-client/src/lib.rs @@ -111,6 +111,15 @@ pub struct ResolvedPackage { pub integrity: String, /// The resolver's `dist.tarball` URL. pub tarball: String, + /// `dist.unpackedSize` from the server-side resolve, when the + /// registry published one. Sizes the decompression buffer exactly + /// and prioritizes the largest pending downloads when the + /// connection pool is saturated. + pub unpacked_size: Option, + /// `dist.fileCount` from the server-side resolve, when the registry + /// published one. The per-file term of the download priority's + /// pipeline-work estimate. + pub file_count: Option, } #[derive(Debug, Display, Error, From)] @@ -254,8 +263,24 @@ impl PnprClient { continue; } match parse_frame(line)? { - Frame::Package { id, name, version, integrity, tarball } => { - on_package(ResolvedPackage { id, name, version, integrity, tarball }); + Frame::Package { + id, + name, + version, + integrity, + tarball, + unpacked_size, + file_count, + } => { + on_package(ResolvedPackage { + id, + name, + version, + integrity, + tarball, + unpacked_size, + file_count, + }); } Frame::Done { lockfile, stats } => { return Ok(ResolveOutcome { lockfile: *lockfile, stats }); @@ -289,6 +314,13 @@ enum Frame { version: String, integrity: String, tarball: String, + /// Absent from frames sent by servers that predate the field + /// and for packages whose registry never published a + /// `dist.unpackedSize`. + #[serde(rename = "unpackedSize", default)] + unpacked_size: Option, + #[serde(rename = "fileCount", default)] + file_count: Option, }, /// Boxed: the lockfile dwarfs the other variants, so keeping it /// behind a pointer keeps the enum small. diff --git a/pacquet/crates/pnpr-client/src/tests.rs b/pacquet/crates/pnpr-client/src/tests.rs index 0287c02c0a..46a6ae4d5a 100644 --- a/pacquet/crates/pnpr-client/src/tests.rs +++ b/pacquet/crates/pnpr-client/src/tests.rs @@ -36,8 +36,8 @@ fn tarball_mismatch_maps_to_the_generic_envelope() { /// A `package` frame carries the fetch hint fields verbatim. #[test] fn a_package_frame_parses_its_fetch_hint() { - let line = br#"{"type":"package","id":"acme@1.0.0","name":"acme","version":"1.0.0","integrity":"sha512-abc","tarball":"https://r.test/acme/-/acme-1.0.0.tgz"}"#; - let Frame::Package { id, name, version, integrity, tarball } = + let line = br#"{"type":"package","id":"acme@1.0.0","name":"acme","version":"1.0.0","integrity":"sha512-abc","tarball":"https://r.test/acme/-/acme-1.0.0.tgz","unpackedSize":123456,"fileCount":42}"#; + let Frame::Package { id, name, version, integrity, tarball, unpacked_size, file_count } = parse_frame(line).expect("frame parses") else { panic!("expected a package frame"); @@ -47,6 +47,22 @@ fn a_package_frame_parses_its_fetch_hint() { assert_eq!(version, "1.0.0"); assert_eq!(integrity, "sha512-abc"); assert_eq!(tarball, "https://r.test/acme/-/acme-1.0.0.tgz"); + assert_eq!(unpacked_size, Some(123456)); + assert_eq!(file_count, Some(42)); +} + +/// A `package` frame without `unpackedSize` / `fileCount` — an older +/// server, or a registry that never published the fields — still +/// parses. +#[test] +fn a_package_frame_without_dist_stats_parses() { + let line = br#"{"type":"package","id":"acme@1.0.0","name":"acme","version":"1.0.0","integrity":"sha512-abc","tarball":"https://r.test/acme/-/acme-1.0.0.tgz"}"#; + let Frame::Package { unpacked_size, file_count, .. } = parse_frame(line).expect("frame parses") + else { + panic!("expected a package frame"); + }; + assert_eq!(unpacked_size, None); + assert_eq!(file_count, None); } /// A line with no `type` tag is malformed, not a silent success. diff --git a/pacquet/crates/resolving-git-resolver/src/runners.rs b/pacquet/crates/resolving-git-resolver/src/runners.rs index 009ac53700..5c3d89b979 100644 --- a/pacquet/crates/resolving-git-resolver/src/runners.rs +++ b/pacquet/crates/resolving-git-resolver/src/runners.rs @@ -29,6 +29,7 @@ pub struct RealGitProbe { } impl RealGitProbe { + #[must_use] pub fn new(http_client: Arc) -> Self { Self { http_client, git_bin: None } } diff --git a/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier.rs b/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier.rs index fa6de3bd59..93c47e0815 100644 --- a/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier.rs +++ b/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier.rs @@ -23,6 +23,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc}; use chrono::{DateTime, Utc}; +use dashmap::DashMap; use pacquet_config::{TrustPolicy, version_policy::PackageVersionPolicy}; use pacquet_lockfile::{LockfileResolution, PkgName}; use pacquet_network::{AuthHeaders, RetryOpts, ThrottledClient}; @@ -47,6 +48,31 @@ use crate::{ }, }; +/// Per-version `dist` statistics that estimate a tarball's pipeline +/// work: `unpackedSize` (transfer + decompress + hash bytes) and +/// `fileCount` (per-file CAS-write overhead). Either may be absent — +/// registries only publish them for packages uploaded since npm 6. +#[derive(Debug, Default, Clone, Copy)] +pub struct DistStats { + pub unpacked_size: Option, + pub file_count: Option, +} + +/// `(package name, version) → dist` work statistics filled by the +/// verifier as a side product of the tarball-URL binding check. The +/// metadata is already in hand per entry, so collecting costs no extra +/// fetch; consumers (the pnpr server's frozen fast path) use the stats +/// to schedule the most expensive tarball downloads first. Shared as an +/// `Arc` so the caller keeps a handle while the verifier fan-out writes. +pub type ObservedDistStats = Arc>; + +/// Construct a fresh sink for +/// [`CreateNpmResolutionVerifierOptions::observed_dist_stats`]. +#[must_use] +pub fn observed_dist_stats_sink() -> ObservedDistStats { + Arc::new(DashMap::new()) +} + /// Options bundle for [`create_npm_resolution_verifier`]. Mirrors /// upstream's /// [`CreateNpmResolutionVerifierOptions`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/createNpmResolutionVerifier.ts#L28-L84). @@ -114,6 +140,10 @@ pub struct CreateNpmResolutionVerifierOptions { /// the `trustPolicyIgnoreAfter` window. `None` falls back to /// wall-clock at construction time. pub now: Option>, + /// Optional sink the verifier fills with each verified entry's + /// `dist` work statistics (see [`ObservedDistStats`]). `None` + /// skips collection. + pub observed_dist_stats: Option, } /// Verifier returned by [`create_npm_resolution_verifier`]. Stores @@ -143,6 +173,7 @@ pub struct NpmResolutionVerifier { now: Option>, policy_snapshot: serde_json::Map, lookup_context: PublishedAtLookupContext, + observed_dist_stats: Option, } impl std::fmt::Debug for NpmResolutionVerifier { @@ -223,6 +254,7 @@ pub fn create_npm_resolution_verifier( now: opts.now, policy_snapshot, lookup_context: PublishedAtLookupContext::new(), + observed_dist_stats: opts.observed_dist_stats, } } @@ -420,6 +452,12 @@ impl NpmResolutionVerifier { ) -> Option { let registry_tarball = match self.fetch_abbreviated_meta(registry, name).await { Ok(Some(meta)) => { + if let Some(sink) = self.observed_dist_stats.as_ref() + && let Some(stats) = + meta.version_dist_stats.as_ref().and_then(|stats| stats.get(version)) + { + sink.insert((name.to_string(), version.to_string()), *stats); + } meta.version_tarballs.and_then(|tarballs| tarballs.get(version).cloned()) } Ok(None) | Err(_) => None, @@ -1028,9 +1066,22 @@ fn project_abbreviated_meta(meta: &Package) -> crate::lookup_context::Abbreviate .iter() .map(|(version, manifest)| (version.clone(), manifest.dist.tarball.clone())) .collect(); + let version_dist_stats = meta + .versions + .iter() + .filter_map(|(version, manifest)| { + let stats = DistStats { + unpacked_size: manifest.dist.unpacked_size, + file_count: manifest.dist.file_count, + }; + (stats.unpacked_size.is_some() || stats.file_count.is_some()) + .then(|| (version.clone(), stats)) + }) + .collect(); crate::lookup_context::AbbreviatedMetaProjection { modified: meta.modified.clone(), version_tarballs: Some(version_tarballs), + version_dist_stats: Some(version_dist_stats), } } diff --git a/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier/tests.rs b/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier/tests.rs index 3491bf8d90..93b13c81d5 100644 --- a/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier/tests.rs +++ b/pacquet/crates/resolving-npm-resolver/src/create_npm_resolution_verifier/tests.rs @@ -8,7 +8,9 @@ use pacquet_resolving_resolver_base::{ResolutionVerification, ResolutionVerifier use pretty_assertions::assert_eq; use ssri::Integrity; -use super::{CreateNpmResolutionVerifierOptions, create_npm_resolution_verifier}; +use super::{ + CreateNpmResolutionVerifierOptions, create_npm_resolution_verifier, observed_dist_stats_sink, +}; const FAKE_INTEGRITY: &str = "sha512-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=="; @@ -53,6 +55,7 @@ fn default_opts(registry_url: &str) -> CreateNpmResolutionVerifierOptions { // backoff (10 s + 60 s) on every run. retry_opts: RetryOpts { retries: 0, ..RetryOpts::default() }, now: None, + observed_dist_stats: None, } } @@ -1052,3 +1055,57 @@ async fn concurrent_verifications_share_one_fetch() { } abbreviated_mock.assert_async().await; } + +/// The binding check records each verified entry's `dist.unpackedSize` +/// and `dist.fileCount` into the `observed_dist_stats` sink when one is +/// provided. +#[tokio::test] +async fn binding_check_records_dist_stats_into_the_sink() { + let mut server = mockito::Server::new_async().await; + let registry = format!("{}/", server.url()); + let server_url = server.url(); + let tarball_url = format!("{server_url}/acme/-/acme-1.0.0.tgz"); + let packument = serde_json::json!({ + "name": "acme", + "dist-tags": { "latest": "1.0.0" }, + "versions": { + "1.0.0": { + "name": "acme", + "version": "1.0.0", + "dist": { + "integrity": FAKE_INTEGRITY, + "tarball": tarball_url, + "unpackedSize": 123_456, + "fileCount": 42, + } + } + } + }); + let _meta_mock = server + .mock("GET", "/acme") + .with_status(200) + .with_body(packument.to_string()) + .create_async() + .await; + + let sink = observed_dist_stats_sink(); + let mut opts = default_opts(®istry); + opts.observed_dist_stats = Some(Arc::clone(&sink)); + let verifier = create_npm_resolution_verifier(opts); + let resolution = LockfileResolution::Tarball(TarballResolution { + tarball: tarball_url.clone(), + integrity: Some(fake_integrity()), + git_hosted: None, + path: None, + }); + let name: PkgName = "acme".parse().expect("parse"); + let result = verifier.verify(&resolution, ctx(&name, "1.0.0")).await; + + assert_eq!(result, ResolutionVerification::Ok); + let recorded = sink + .get(&("acme".to_string(), "1.0.0".to_string())) + .map(|entry| *entry.value()) + .expect("stats recorded"); + assert_eq!(recorded.unpacked_size, Some(123_456)); + assert_eq!(recorded.file_count, Some(42)); +} diff --git a/pacquet/crates/resolving-npm-resolver/src/lib.rs b/pacquet/crates/resolving-npm-resolver/src/lib.rs index 4fae3b5e66..819f6f8d3c 100644 --- a/pacquet/crates/resolving-npm-resolver/src/lib.rs +++ b/pacquet/crates/resolving-npm-resolver/src/lib.rs @@ -38,7 +38,8 @@ mod violation_codes; mod workspace_pref_to_npm; pub use create_npm_resolution_verifier::{ - CreateNpmResolutionVerifierOptions, NpmResolutionVerifier, create_npm_resolution_verifier, + CreateNpmResolutionVerifierOptions, DistStats, NpmResolutionVerifier, ObservedDistStats, + create_npm_resolution_verifier, observed_dist_stats_sink, }; pub use errors::FetchMetadataError; pub use fetch_attestation_published_at::{FetchAttestationOptions, fetch_attestation_published_at}; diff --git a/pacquet/crates/resolving-npm-resolver/src/lookup_context.rs b/pacquet/crates/resolving-npm-resolver/src/lookup_context.rs index 487111471e..7428f5bf1a 100644 --- a/pacquet/crates/resolving-npm-resolver/src/lookup_context.rs +++ b/pacquet/crates/resolving-npm-resolver/src/lookup_context.rs @@ -45,6 +45,15 @@ pub(crate) struct AbbreviatedMetaProjection { pub modified: Option, /// version → `dist.tarball`; key presence means the version is published. pub version_tarballs: Option>, + /// version → `dist` work statistics (`unpackedSize`, `fileCount`), + /// for the versions whose registry published either. Not part of + /// upstream's projection — carried so the verifier can surface + /// tarball work estimates to fetch scheduling (the verifier's + /// [`ObservedDistStats`] sink) without a second metadata + /// round-trip. + /// + /// [`ObservedDistStats`]: crate::ObservedDistStats + pub version_dist_stats: Option>, } /// Slot map of singleflight cells. Outer mutex guards lookup/insert; diff --git a/pacquet/crates/tarball/src/lib.rs b/pacquet/crates/tarball/src/lib.rs index efe232a3b6..fec657ba1f 100644 --- a/pacquet/crates/tarball/src/lib.rs +++ b/pacquet/crates/tarball/src/lib.rs @@ -11,7 +11,7 @@ use derive_more::{Display, Error, From}; use miette::Diagnostic; use pacquet_fs::file_mode; pub use pacquet_network::RetryOpts; -use pacquet_network::{AuthHeaders, ThrottledClient}; +use pacquet_network::{AuthHeaders, ThrottledClient, UNPRIORITIZED}; use pacquet_reporter::{ FetchingProgressLog, FetchingProgressMessage, LogEvent, LogLevel, ProgressLog, ProgressMessage, Reporter, RequestRetryError, RequestRetryLog, @@ -1314,6 +1314,12 @@ pub struct DownloadTarballToStore<'a> { pub verified_files_cache: SharedVerifiedFilesCache, pub package_integrity: &'a Integrity, pub package_unpacked_size: Option, + /// `dist.fileCount` when the registry published one. Combined with + /// `package_unpacked_size` into the download's queueing priority — + /// per-file pipeline overhead (CAS write syscalls, hashing) makes a + /// many-small-files package as slow to finish as a much larger + /// few-files one. + pub package_file_count: Option, pub package_url: &'a str, /// Stable identifier for the package, e.g. `"{name}@{version}"`. Paired /// with `package_integrity` to form the `SQLite` index key per pnpm v11's @@ -1504,6 +1510,7 @@ async fn fetch_and_extract_once( package_url: &str, expected_integrity: Option<&Integrity>, package_unpacked_size: Option, + download_priority: u64, package_id: &str, attempt: u32, store_dir: &'static StoreDir, @@ -1518,14 +1525,16 @@ async fn fetch_and_extract_once( // batch of futures `connect()` while previous bodies are still // draining, breaking the bound on concurrent open sockets. // - // `acquire_for_url` routes the request through the per-registry - // TLS-configured client when one is set for `package_url`'s - // nerf-darted prefix, falling back to the default client - // otherwise. Tarball hosts that differ from the metadata host - // still pick up the right per-registry client because the + // `acquire_for_url_with_priority` routes the request through the + // per-registry TLS-configured client when one is set for + // `package_url`'s nerf-darted prefix, falling back to the default + // client otherwise. Tarball hosts that differ from the metadata + // host still pick up the right per-registry client because the // 5-step `pickSettingByUrl` lookup also matches on the tarball - // URL. - let client = http_client.acquire_for_url(package_url).await; + // URL. When the pool is saturated, the package with the most + // estimated pipeline work claims the next freed slot, so the + // longest download+extract jobs never start last. + let client = http_client.acquire_for_url_with_priority(package_url, download_priority).await; let mut request = client.get(package_url); // Match pnpm's tarball download path // ([`remoteTarballFetcher.ts`](https://github.com/pnpm/pnpm/blob/601317e7a3/fetching/tarball-fetcher/src/remoteTarballFetcher.ts#L66-L70)): @@ -1789,6 +1798,30 @@ fn progress_already_reported(progress_key: Option<(&SharedReportedProgressKeys, progress_key.is_some_and(|(reported, key)| !reported.insert(key.to_owned())) } +/// Byte-equivalent cost of one file's fixed pipeline overhead (the +/// CAS-write syscalls and hash setup paid per file regardless of its +/// size, ~75 µs against a pipeline that moves a byte through +/// download + decompress + hash + write in ~25 ns). Folding it into +/// the priority makes a many-small-files package rank as the long +/// job it actually is: extraction cost, not just transfer cost, +/// decides when a package's pipeline work finishes. +const PRIORITY_BYTES_PER_FILE: u64 = 3_000; + +/// Queueing priority of a tarball download: the package's estimated +/// total pipeline work (transfer + decompress + hash + CAS writes) in +/// byte-equivalents. Missing hints contribute zero, so a package with +/// no published `dist` stats queues behind every estimated one. +#[must_use] +pub fn download_priority(unpacked_size: Option, file_count: Option) -> u64 { + let size = unpacked_size.map_or(0, |size| size as u64); + let per_file = + file_count.map_or(0, |count| (count as u64).saturating_mul(PRIORITY_BYTES_PER_FILE)); + // `UNPRIORITIZED` (`u64::MAX`) is the latency-class sentinel; a + // hostile registry publishing absurd `dist` stats must not be able + // to saturate a download's priority into that class. + size.saturating_add(per_file).min(UNPRIORITIZED - 1) +} + // 9 arguments — over the default clippy threshold but each is // distinct: client + URL + integrity describe the request, ID + // requester are the reporter dimensions, unpacked-size is allocation @@ -1804,6 +1837,7 @@ async fn fetch_and_extract_with_retry( package_url: &str, expected_integrity: Option<&Integrity>, package_unpacked_size: Option, + download_priority: u64, package_id: &str, requester: &str, store_dir: &'static StoreDir, @@ -1819,6 +1853,7 @@ async fn fetch_and_extract_with_retry( package_url, expected_integrity, package_unpacked_size, + download_priority, package_id, attempt, store_dir, @@ -2070,6 +2105,7 @@ impl<'a> DownloadTarballToStore<'a> { store_dir, package_integrity, package_unpacked_size, + package_file_count, package_url, package_id, requester, @@ -2184,6 +2220,7 @@ impl<'a> DownloadTarballToStore<'a> { package_url, Some(package_integrity), package_unpacked_size, + download_priority(package_unpacked_size, package_file_count), package_id, requester, store_dir, @@ -2269,11 +2306,15 @@ impl FetchTarballForResolution<'_> { // `name@version` is only known once the manifest is read below, // and the resolve-time fetch is silent (the install pass owns // the reporter ordering), so the placeholder never surfaces. + // `UNPRIORITIZED`: this fetch gates the resolver's walk (a + // tarball dep's manifest comes from its archive), so like a + // packument fetch it must not queue behind sized downloads. let (integrity, cas_paths, pkg_files_idx) = fetch_and_extract_with_retry::( http_client, package_url, None, None, + UNPRIORITIZED, package_url, package_url, store_dir, diff --git a/pacquet/crates/tarball/src/tests.rs b/pacquet/crates/tarball/src/tests.rs index b66ae0d29d..a378b948be 100644 --- a/pacquet/crates/tarball/src/tests.rs +++ b/pacquet/crates/tarball/src/tests.rs @@ -1,10 +1,10 @@ use super::{ DownloadTarballToStore, HttpStatusError, MemCache, NetworkError, PrefetchedCasPaths, RetryOpts, SharedReportedProgressKeys, TarballError, VerifyChecksumError, allocate_tarball_buffer, - extract_tarball_entries, extract_zip_entries, fetch_and_extract_with_retry, is_transient_error, - normalize_bundled_manifest, prefetch_cas_paths, + download_priority, extract_tarball_entries, extract_zip_entries, fetch_and_extract_with_retry, + is_transient_error, normalize_bundled_manifest, prefetch_cas_paths, }; -use pacquet_network::{AuthHeaders, ThrottledClient}; +use pacquet_network::{AuthHeaders, ThrottledClient, UNPRIORITIZED}; use pacquet_reporter::SilentReporter; use pacquet_store_dir::{ CafsFileInfo, PackageFilesIndex, SharedVerifiedFilesCache, StoreDir, StoreIndex, @@ -169,6 +169,7 @@ async fn packages_under_orgs_should_work() { verify_store_integrity: true, package_integrity: &integrity("sha512-dj7vjIn1Ar8sVXj2yAXiMNCJDmS9MQ9XMlIecX2dIzzhjSHCyKo4DdXjXMs7wKW2kj6yvVRSpuQjOZ3YLrh56w=="), package_unpacked_size: Some(16697), + package_file_count: None, package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz", package_id: "@fastify/error@3.3.0", requester: "", @@ -232,6 +233,7 @@ async fn network_fetch_records_progress_key() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: Some(16697), + package_file_count: None, package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz", package_id: pkg_id, requester: "", @@ -267,6 +269,7 @@ async fn should_throw_error_on_checksum_mismatch() { verify_store_integrity: true, package_integrity: &integrity("sha512-aaaan1Ar8sVXj2yAXiMNCJDmS9MQ9XMlIecX2dIzzhjSHCyKo4DdXjXMs7wKW2kj6yvVRSpuQjOZ3YLrh56w=="), package_unpacked_size: Some(16697), + package_file_count: None, package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz", package_id: "@fastify/error@3.3.0", requester: "", @@ -345,6 +348,7 @@ async fn reuses_cached_cas_paths_when_index_entry_is_live() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, // Any request that reaches the network here would fail the // test; the cache lookup must short-circuit before we get // near it. `fast_fail_client` caps that at 1 s per side in @@ -415,6 +419,7 @@ async fn reuses_prefetched_cas_paths_when_provided() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: "http://127.0.0.1:1/unreachable.tgz", package_id: pkg_id, requester: "", @@ -645,6 +650,7 @@ async fn falls_through_when_cafs_file_missing() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: "http://127.0.0.1:1/unreachable.tgz", package_id: pkg_id, requester: "", @@ -707,6 +713,7 @@ async fn falls_through_when_digest_is_malformed() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: "http://127.0.0.1:1/unreachable.tgz", package_id: pkg_id, requester: "", @@ -772,6 +779,7 @@ async fn falls_through_when_cafs_path_is_a_directory() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: "http://127.0.0.1:1/unreachable.tgz", package_id: pkg_id, requester: "", @@ -847,6 +855,7 @@ async fn falls_through_when_cafs_path_is_a_symlink() { verify_store_integrity: true, package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: "http://127.0.0.1:1/unreachable.tgz", package_id: pkg_id, requester: "", @@ -1126,6 +1135,7 @@ async fn retries_then_succeeds_on_transient_5xx() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1175,6 +1185,7 @@ async fn retries_integrity_mismatch_until_exhausted() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1208,6 +1219,7 @@ async fn fails_fast_on_404() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1250,6 +1262,7 @@ async fn retries_other_4xx_codes() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1286,6 +1299,7 @@ async fn retry_exhaustion_returns_last_error() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1409,6 +1423,7 @@ fn run_with_mem_cache_does_not_deadlock_on_dashmap_shard_contention() { verify_store_integrity: true, package_integrity: pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: url, package_id: "fastify-error@3.3.0", requester: "", @@ -1482,6 +1497,7 @@ async fn zero_retries_makes_a_single_attempt() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1528,6 +1544,7 @@ async fn fetch_attaches_authorization_header_when_creds_match_tarball_url() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1583,6 +1600,7 @@ async fn retry_re_attaches_authorization_header_on_each_attempt() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -1658,6 +1676,7 @@ async fn mem_cache_hit_emits_found_in_store_against_callers_reporter() { verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: "first@1.0.0", requester: "/proj", @@ -1686,6 +1705,7 @@ async fn mem_cache_hit_emits_found_in_store_against_callers_reporter() { verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: "second@2.0.0", requester: "/proj", @@ -1781,6 +1801,7 @@ async fn mem_cache_hit_skips_package_status_when_progress_already_reported() { verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: pkg_id, requester: "/proj", @@ -1818,6 +1839,7 @@ async fn mem_cache_hit_skips_package_status_when_progress_already_reported() { verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: pkg_id, requester: "/proj", @@ -1901,6 +1923,7 @@ async fn run_with_mem_cache_recovers_from_owning_fetch_error() { verified_files_cache: SharedVerifiedFilesCache::default(), package_integrity: pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: url, package_id: "deadlock@1.0.0", requester: "/proj", @@ -2005,6 +2028,7 @@ async fn fetching_progress_and_fetched_events_fire_during_download() { &url, Some(&pkg_integrity), None, + 0, "@fastify/error@3.3.0", "", store_path, @@ -2096,6 +2120,7 @@ async fn started_fires_for_connection_level_failures() { "http://127.0.0.1:1/pkg.tgz", // port 1 is reserved → connect-refused Some(&pkg_integrity), None, + 0, "test-pkg", "/proj", store_path, @@ -2186,6 +2211,7 @@ async fn found_in_store_event_fires_on_cache_hit() { verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: "@fastify/error@3.3.0", requester: "/proj", @@ -2225,6 +2251,7 @@ async fn found_in_store_event_fires_on_cache_hit() { verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: "@fastify/error@3.3.0", requester: "/proj", @@ -2306,6 +2333,7 @@ async fn request_retry_event_fires_per_retried_attempt() { &url, Some(&pkg_integrity), None, + 0, "test-pkg", "", store_path, @@ -2618,6 +2646,7 @@ async fn offline_mode_skips_network_on_cache_miss() { verified_files_cache: SharedVerifiedFilesCache::default(), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: pkg_id, requester: "", @@ -2689,6 +2718,7 @@ async fn offline_mode_still_uses_prefetched_cache() { verified_files_cache: SharedVerifiedFilesCache::default(), package_integrity: &pkg_integrity, package_unpacked_size: None, + package_file_count: None, package_url: &url, package_id: pkg_id, requester: "", @@ -2892,3 +2922,13 @@ mod normalize_bundled_manifest_tests { ); } } + +/// Saturated `dist` stats must not collide with the latency-class +/// sentinel (`UNPRIORITIZED`) — a hostile registry publishing absurd +/// sizes would otherwise reclassify its downloads as metadata. +#[test] +fn download_priority_never_reaches_the_latency_sentinel() { + let priority = download_priority(Some(usize::MAX), Some(usize::MAX)); + assert!(priority < UNPRIORITIZED); + assert_eq!(priority, UNPRIORITIZED - 1); +} diff --git a/pacquet/tasks/integrated-benchmark/src/cli_args.rs b/pacquet/tasks/integrated-benchmark/src/cli_args.rs index a485312d1f..1bcc44db7e 100644 --- a/pacquet/tasks/integrated-benchmark/src/cli_args.rs +++ b/pacquet/tasks/integrated-benchmark/src/cli_args.rs @@ -83,6 +83,16 @@ pub struct CliArgs { #[clap(long, default_value_t = 0.0)] pub registry_bandwidth_mbps: f64, + /// Model TCP slow start on the client↔registry link: each + /// connection ramps from a ~14.6 KB initial window toward + /// `--registry-bandwidth-mbps`, doubling per round trip, instead + /// of transmitting at the full cap from its first byte. Small and + /// mid-size tarballs then take the several round trips they cost + /// over a real link. Requires both `--registry-latency-ms` and + /// `--registry-bandwidth-mbps` to be set; no effect otherwise. + #[clap(long)] + pub registry_slow_start: bool, + /// Build each target without running the benchmark. #[clap(long)] pub build_only: bool, diff --git a/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs b/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs index e85fdf7356..84ac2cd778 100644 --- a/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs +++ b/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs @@ -43,6 +43,15 @@ pub struct LinkProfile { /// Per-direction throughput cap in bytes per second; `None` leaves /// the direction at loopback speed (unlimited). pub rate_limit: Option, + /// Model TCP slow start: each connection starts at an initial + /// congestion window (~14.6 KB) and its effective rate + /// (`cwnd ÷ RTT`) doubles per delivered window until it reaches + /// `rate_limit`. Without it a transfer runs at the full cap from + /// its first byte, which overstates real-TCP throughput for small + /// and mid-size objects — a transfer under ~10 windows never gets + /// near the link's capacity. Requires both `one_way > 0` (the ramp + /// is per-RTT) and a `rate_limit` (the ceiling); ignored otherwise. + pub slow_start: bool, } /// A running proxy. Drop stops accepting new connections and joins the @@ -128,6 +137,13 @@ fn accept_loop( /// client→server pump on a spawned one, so the connection's threads end /// together when either side closes. fn handle_connection(inbound: TcpStream, upstream: SocketAddr, profile: LinkProfile) { + // The accept listener is non-blocking, and BSD-derived platforms + // (macOS) hand accepted sockets the listener's O_NONBLOCK — the + // pump's blocking reads would then surface spurious `WouldBlock` + // errors and drop the connection before the first byte. + if inbound.set_nonblocking(false).is_err() { + return; + } let Ok(outbound) = TcpStream::connect(upstream) else { return }; let _ = inbound.set_nodelay(true); let _ = outbound.set_nodelay(true); @@ -170,6 +186,7 @@ fn pump(mut src: TcpStream, mut dst: TcpStream, profile: LinkProfile) { // by each chunk's serialization time (`len / rate`) so the cap is a // sustained throughput limit, not just a per-chunk cap. let mut link_free_at = Instant::now(); + let mut slow_start = SlowStart::for_profile(&profile); while let Ok((release, bytes)) = rx.recv() { // A chunk leaves no earlier than its latency release *and* no // earlier than the link finishing the previous chunk. @@ -179,7 +196,11 @@ fn pump(mut src: TcpStream, mut dst: TcpStream, profile: LinkProfile) { thread::sleep(send_at - now); } if let Some(rate) = profile.rate_limit { - link_free_at = send_at + Duration::from_secs_f64(bytes.len() as f64 / rate as f64); + let effective = match slow_start.as_mut() { + Some(ramp) => ramp.effective_rate(rate as f64, bytes.len()), + None => rate as f64, + }; + link_free_at = send_at + Duration::from_secs_f64(bytes.len() as f64 / effective); } if dst.write_all(&bytes).is_err() { break; @@ -195,5 +216,46 @@ fn pump(mut src: TcpStream, mut dst: TcpStream, profile: LinkProfile) { let _ = reader.join(); } +/// Initial congestion window, mirroring RFC 6928's 10 segments of +/// ~1460 bytes. +const INITIAL_CWND_BYTES: f64 = 14_600.0; + +/// Per-connection slow-start state: effective rate is `cwnd ÷ RTT`, +/// and `cwnd` doubles each time a full window has been delivered, +/// until the rate reaches the link's cap. An approximation of real +/// TCP — no loss events, no congestion-avoidance phase — but it +/// reproduces the property that matters for install benchmarks: +/// per-connection throughput is a function of bytes already +/// delivered, so short transfers never reach the cap. +struct SlowStart { + cwnd: f64, + rtt_secs: f64, + bytes_in_round: f64, +} + +impl SlowStart { + /// `Some` ramp when the profile asks for one and has the RTT + + /// cap the model needs; `None` keeps the flat-rate behavior. + fn for_profile(profile: &LinkProfile) -> Option { + let rtt_secs = profile.one_way.as_secs_f64() * 2.0; + (profile.slow_start && rtt_secs > 0.0 && profile.rate_limit.is_some()) + .then_some(SlowStart { cwnd: INITIAL_CWND_BYTES, rtt_secs, bytes_in_round: 0.0 }) + } + + /// The rate (bytes/sec) at which the next `len`-byte chunk + /// serializes, advancing the window-growth bookkeeping. + fn effective_rate(&mut self, cap: f64, len: usize) -> f64 { + let rate = (self.cwnd / self.rtt_secs).min(cap); + self.bytes_in_round += len as f64; + if self.bytes_in_round >= self.cwnd { + self.bytes_in_round = 0.0; + // Stop growing once the window sustains the cap + // (`cap × RTT` is the bandwidth-delay product). + self.cwnd = (self.cwnd * 2.0).min((cap * self.rtt_secs).max(INITIAL_CWND_BYTES)); + } + rate + } +} + #[cfg(test)] mod tests; diff --git a/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs b/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs index 150a038026..cd7bbdf2c1 100644 --- a/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs +++ b/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs @@ -22,7 +22,7 @@ fn binds_to_requested_listen_addr() { let listen = reserved.local_addr().expect("listen addr"); drop(reserved); - let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: None }; + let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: None, slow_start: false }; let proxy = LatencyProxy::spawn_on(listen, upstream_addr, profile).expect("spawn proxy"); assert_eq!(proxy.addr, listen); @@ -56,7 +56,8 @@ fn injects_round_trip_latency() { socket.write_all(b"pong").expect("write reply"); }); - let profile = LinkProfile { one_way: Duration::from_millis(60), rate_limit: None }; + let profile = + LinkProfile { one_way: Duration::from_millis(60), rate_limit: None, slow_start: false }; let proxy = LatencyProxy::spawn(upstream_addr, profile).expect("spawn proxy"); let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy"); @@ -95,7 +96,8 @@ fn caps_throughput_to_the_rate_limit() { // No latency, only a bandwidth cap, so the wall time is the // serialization delay alone. - let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: Some(RATE) }; + let profile = + LinkProfile { one_way: Duration::ZERO, rate_limit: Some(RATE), slow_start: false }; let proxy = LatencyProxy::spawn(upstream_addr, profile).expect("spawn proxy"); let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy"); @@ -121,3 +123,55 @@ fn caps_throughput_to_the_rate_limit() { "transfer {elapsed:?} should reflect the ~0.26s bandwidth cap", ); } + +/// With slow start the same transfer takes the ramp-up rounds a real +/// TCP connection would: the early windows serialize at `cwnd ÷ RTT`, +/// far below the cap, so the wall time exceeds the flat-rate model's +/// `size ÷ cap` by several round trips. +#[test] +fn slow_start_ramps_per_connection_throughput() { + const PAYLOAD: usize = 256 * 1024; + const RATE: u64 = 10_000_000; // 10 MB/s cap + + let timed_transfer = |slow_start: bool| { + let upstream = TcpListener::bind(("127.0.0.1", 0)).expect("bind upstream"); + let upstream_addr = upstream.local_addr().expect("upstream addr"); + thread::spawn(move || { + let (mut socket, _) = upstream.accept().expect("accept"); + let mut buf = [0u8; 64]; + let _ = socket.read(&mut buf).expect("read request"); + socket.write_all(&vec![0u8; PAYLOAD]).expect("write payload"); + }); + + let profile = + LinkProfile { one_way: Duration::from_millis(20), rate_limit: Some(RATE), slow_start }; + let proxy = LatencyProxy::spawn(upstream_addr, profile).expect("spawn proxy"); + + let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy"); + let start = Instant::now(); + client.write_all(b"go").expect("send request"); + let mut received = 0; + let mut buf = [0u8; 16 * 1024]; + loop { + match client.read(&mut buf) { + Ok(0) => break, + Ok(n) => received += n, + Err(_) => break, + } + } + assert_eq!(received, PAYLOAD, "the whole payload should arrive"); + start.elapsed() + }; + + let flat = timed_transfer(false); + let ramped = timed_transfer(true); + dbg!(flat, ramped); + // Flat: ~2×20ms latency + 256KiB/10MB/s ≈ 66 ms. Ramped: the first + // windows (14.6 KB and doubling) each serialize at cwnd/RTT, adding ~3-4 + // window-times before the rate approaches the cap. Require a solid + // margin rather than exact math to stay robust under CI jitter. + assert!( + ramped > flat + Duration::from_millis(60), + "slow start should add ramp-up time: flat {flat:?} vs ramped {ramped:?}", + ); +} diff --git a/pacquet/tasks/integrated-benchmark/src/main.rs b/pacquet/tasks/integrated-benchmark/src/main.rs index 737a599aec..4c0e5fa9d5 100644 --- a/pacquet/tasks/integrated-benchmark/src/main.rs +++ b/pacquet/tasks/integrated-benchmark/src/main.rs @@ -33,6 +33,7 @@ async fn main() { registry_latency_ms, pnpr_server_registry_latency_ms, registry_bandwidth_mbps, + registry_slow_start, reuse_prebuilt_binaries, build_only, targets, @@ -102,6 +103,7 @@ async fn main() { let profile = LinkProfile { one_way: Duration::from_millis(registry_latency_ms) / 2, rate_limit: registry_rate_limit, + slow_start: registry_slow_start, }; let proxy = LatencyProxy::spawn_on(listen, upstream, profile).expect("spawn registry proxy"); @@ -169,6 +171,7 @@ async fn main() { registry_latency_ms, pnpr_server_registry_latency_ms, registry_bandwidth_mbps, + registry_slow_start, registry_port: spawned_registry_port, reuse_prebuilt_binaries, }; diff --git a/pacquet/tasks/integrated-benchmark/src/work_env.rs b/pacquet/tasks/integrated-benchmark/src/work_env.rs index 66b24dca04..9715678bf4 100644 --- a/pacquet/tasks/integrated-benchmark/src/work_env.rs +++ b/pacquet/tasks/integrated-benchmark/src/work_env.rs @@ -68,6 +68,7 @@ pub struct WorkEnv { /// of being free on loopback. `0` leaves the registry at loopback /// speed. Ignored in `--registry=npm` mode (already remote). pub registry_bandwidth_mbps: f64, + pub registry_slow_start: bool, /// Port the local registry listens on, used as the proxy's upstream /// when latency or a bandwidth cap is requested. pub registry_port: u16, @@ -549,6 +550,7 @@ impl WorkEnv { let profile = LinkProfile { one_way: Duration::from_millis(self.pnpr_latency_ms) / 2, rate_limit: None, + slow_start: false, }; let proxy = LatencyProxy::spawn(upstream, profile).expect("spawn pnpr latency proxy"); let proxy_url = format!("http://{}", proxy.addr); @@ -648,6 +650,7 @@ impl WorkEnv { let profile = LinkProfile { one_way: Duration::from_millis(self.registry_latency_ms) / 2, rate_limit, + slow_start: self.registry_slow_start, }; let proxy = LatencyProxy::spawn(upstream, profile).expect("spawn registry proxy"); eprintln!( @@ -675,6 +678,7 @@ impl WorkEnv { let profile = LinkProfile { one_way: Duration::from_millis(self.pnpr_server_registry_latency_ms) / 2, rate_limit: None, + slow_start: false, }; let proxy = LatencyProxy::spawn(upstream, profile).expect("spawn pnpr server registry proxy"); diff --git a/pacquet/tasks/micro-benchmark/src/main.rs b/pacquet/tasks/micro-benchmark/src/main.rs index 96fa972764..0baeefca39 100644 --- a/pacquet/tasks/micro-benchmark/src/main.rs +++ b/pacquet/tasks/micro-benchmark/src/main.rs @@ -45,6 +45,7 @@ fn bench_tarball(criterion: &mut Criterion, server: &mut ServerGuard, fixtures_f verified_files_cache: pacquet_store_dir::SharedVerifiedFilesCache::default(), package_integrity: &package_integrity, package_unpacked_size: Some(16697), + package_file_count: None, package_url: url, package_id: "fast-querystring@1.0.0", requester: "", diff --git a/pnpr/crates/pnpr/src/resolver.rs b/pnpr/crates/pnpr/src/resolver.rs index d1df9898f8..bd3fda4e81 100644 --- a/pnpr/crates/pnpr/src/resolver.rs +++ b/pnpr/crates/pnpr/src/resolver.rs @@ -55,11 +55,15 @@ use axum::{ }; use indexmap::IndexMap; use pacquet_config::Config as PacquetConfig; -use pacquet_lockfile::Lockfile; +use pacquet_lockfile::{Lockfile, LockfileResolution}; use pacquet_lockfile_verification::{collect_resolution_policy_violations, hash_lockfile}; use pacquet_network::{AuthHeaders, ThrottledClient}; -use pacquet_package_manager::build_resolution_verifiers; -use pacquet_resolving_npm_resolver::{InMemoryPackageMetaCache, PackageMetaCache}; +use pacquet_package_manager::{ + ResolvedPackageHint, build_resolution_verifiers, tarball_url_and_integrity, +}; +use pacquet_resolving_npm_resolver::{ + InMemoryPackageMetaCache, ObservedDistStats, PackageMetaCache, observed_dist_stats_sink, +}; use pacquet_resolving_resolver_base::ResolutionVerifier; use pacquet_store_dir::StoreDir; use sha2::{Digest, Sha256}; @@ -224,23 +228,32 @@ pub(crate) async fn handle_resolve(runtime: &Resolver, body: Bytes) -> Response // opt-out (mirrors the local path's `--trust-lockfile`). Freshly- // resolved entries are held to the same policy by the resolver's // pick-time gate (the policy is wired into `config`). + let mut verified_dist_stats = None; if !request.trust_lockfile && let Some(input_lockfile) = request.lockfile.as_ref() - && let Err(failure) = - verify_input_lockfile(runtime, config, &request_auth, input_lockfile).await { - return match failure { - VerifyFailure::Internal(response) => response, - VerifyFailure::Violations(violations) => { - ndjson_single_frame(&violations_frame(&violations)) + match verify_input_lockfile(runtime, config, &request_auth, input_lockfile).await { + Ok(stats) => verified_dist_stats = stats, + Err(VerifyFailure::Internal(response)) => return response, + Err(VerifyFailure::Violations(violations)) => { + return ndjson_single_frame(&violations_frame(&violations)); } - }; + } } // Short-circuit paths that produce the whole lockfile without an - // incremental tree walk: nothing to stream, so emit only `done`. + // incremental tree walk. A verified frozen lockfile still announces + // its tarballs as `package` frames when the verification fan-out + // just fetched their metadata — the sizes let the client start the + // largest downloads first. On a verdict-cache hit no metadata was + // fetched, so there's nothing to add and the response is the bare + // `done` frame. if let Some(lockfile) = resolve::fresh_frozen_input_lockfile(config, &request) { - return ndjson_single_frame(&done_frame(&lockfile)); + let mut frames = verified_dist_stats + .map(|sizes| frozen_package_frames(config, &lockfile, &sizes)) + .unwrap_or_default(); + frames.push(done_frame(&lockfile)); + return ndjson_frames(&frames); } let resolution_cache_key = if request.auth_headers.is_empty() && request.lockfile.is_none() { resolution_cache_key(config, &request) @@ -376,20 +389,92 @@ struct StreamObserver { impl pacquet_package_manager::ResolutionObserver for StreamObserver { fn on_resolved(&self, hint: pacquet_package_manager::ResolvedPackageHint<'_>) { - let frame = serde_json::json!({ - "type": "package", - "id": hint.id, - "name": hint.name, - "version": hint.version, - "integrity": hint.integrity, - "tarball": hint.tarball_url, - }); - if let Ok(line) = ndjson_line(&frame) { + if let Ok(line) = ndjson_line(&package_frame(&hint)) { let _ = self.tx.send(line); } } } +/// One `package` NDJSON frame. `unpackedSize` is omitted (not null) +/// when the registry never published a `dist.unpackedSize`, so older +/// clients parse the frame unchanged. +fn package_frame(hint: &ResolvedPackageHint<'_>) -> serde_json::Value { + let mut frame = serde_json::json!({ + "type": "package", + "id": hint.id, + "name": hint.name, + "version": hint.version, + "integrity": hint.integrity, + "tarball": hint.tarball_url, + }); + if let Some(size) = hint.unpacked_size { + frame["unpackedSize"] = serde_json::Value::from(size); + } + if let Some(count) = hint.file_count { + frame["fileCount"] = serde_json::Value::from(count); + } + frame +} + +/// `package` frames for every tarball-fetchable entry of a verified +/// frozen lockfile, deduplicated by tarball URL. Mirrors what the +/// streaming resolve's [`StreamObserver`] would have announced had the +/// tree walk run: the client prefetches each tarball on arrival, with +/// `unpackedSize` (from the verification fan-out's metadata, when the +/// registry published one) prioritizing the largest downloads. +/// +/// Tarball URLs are derived with the same +/// [`tarball_url_and_integrity`] the client's frozen materialization +/// uses, so the announced URLs match the client's mem-cache keys +/// byte-for-byte. Non-tarball resolutions (git, directory, binary, +/// variations) are skipped — the client fetches those through their +/// own protocol paths. +fn frozen_package_frames( + config: &PacquetConfig, + lockfile: &Lockfile, + dist_stats: &ObservedDistStats, +) -> Vec> { + let Some(packages) = lockfile.packages.as_ref() else { + return Vec::new(); + }; + let mut seen_urls = std::collections::HashSet::new(); + let mut frames = Vec::new(); + for (package_key, snapshot) in packages { + if !matches!( + snapshot.resolution, + LockfileResolution::Registry(_) | LockfileResolution::Tarball(_), + ) { + continue; + } + let Ok((tarball_url, integrity)) = + tarball_url_and_integrity(&snapshot.resolution, package_key, config) + else { + continue; + }; + if !seen_urls.insert(tarball_url.to_string()) { + continue; + } + let name = package_key.name.to_string(); + let version = package_key.suffix.version().to_string(); + let id = format!("{name}@{version}"); + let integrity = integrity.to_string(); + let stats = dist_stats.get(&(name.clone(), version.clone())).map(|entry| *entry.value()); + let frame = package_frame(&ResolvedPackageHint { + id: &id, + name: &name, + version: &version, + integrity: &integrity, + tarball_url: &tarball_url, + unpacked_size: stats.and_then(|stats| stats.unpacked_size), + file_count: stats.and_then(|stats| stats.file_count), + }); + if let Ok(line) = ndjson_line(&frame) { + frames.push(line); + } + } + frames +} + /// Terminal `done` frame: the full resolved lockfile + stats. The client /// writes the lockfile and fetches every tarball itself. fn done_frame(lockfile: &Lockfile) -> Vec { @@ -441,6 +526,17 @@ fn ndjson_single_frame(frame: &[u8]) -> Response { .expect("binary response is always valid") } +/// A 200 NDJSON response carrying several already-serialized frames in +/// one fixed body. Used by the frozen fast path, where every frame is +/// known up front — no channel to stream from. +fn ndjson_frames(frames: &[Vec]) -> Response { + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, NDJSON_CONTENT_TYPE) + .body(Body::from(frames.concat())) + .expect("binary response is always valid") +} + /// A 200 NDJSON response whose body drains the frame channel as the /// detached resolve task produces frames. Closing the channel (the task /// dropped its sender) ends the body. @@ -465,25 +561,29 @@ enum VerifyFailure { } /// Verify the client's input lockfile under the client's policy. On a -/// clean pass returns `Ok(())`; on a policy violation returns the -/// rendered violations so the caller can deliver them to the client. A -/// build-verifiers failure (e.g. an invalid exclude pattern) returns a -/// ready-made 500. +/// clean pass returns the [`ObservedDistStats`] the verifier +/// collected — `None` when the whole-lockfile verdict cache satisfied +/// the check without a fan-out (no metadata was fetched, so no sizes +/// exist). On a policy violation returns the rendered violations so +/// the caller can deliver them to the client. A build-verifiers +/// failure (e.g. an invalid exclude pattern) returns a ready-made 500. async fn verify_input_lockfile( runtime: &Resolver, config: &'static PacquetConfig, auth_headers: &Arc, lockfile: &Lockfile, -) -> Result<(), VerifyFailure> { +) -> Result, VerifyFailure> { // A fresh per-request packument cache shared with the verifier; the // on-disk metadata mirror under `/v11/metadata-full` is // warm across requests and is the real verification cache. let meta_cache = Arc::new(InMemoryPackageMetaCache::default()); + let dist_stats = observed_dist_stats_sink(); let verifiers = build_resolution_verifiers( config, Arc::clone(&runtime.client), Some(meta_cache as Arc), Some(Arc::clone(auth_headers)), + Some(Arc::clone(&dist_stats)), ) .map_err(|err| { VerifyFailure::Internal(json_error(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string())) @@ -499,7 +599,7 @@ async fn verify_input_lockfile( verifiers.iter().all(|verifier| verifier.can_trust_past_check(policy)) }) { - return Ok(()); + return Ok(None); } let violations = collect_resolution_policy_violations(lockfile, &verifiers, None).await; @@ -507,7 +607,7 @@ async fn verify_input_lockfile( if let Some(cache) = runtime.verdict_cache.as_ref() { cache.record(&hash, &merge_policies(&verifiers)); } - return Ok(()); + return Ok(Some(dist_stats)); } let rendered: Vec = violations diff --git a/pnpr/crates/pnpr/src/resolver/tests.rs b/pnpr/crates/pnpr/src/resolver/tests.rs index 5d26d09d59..efd780c153 100644 --- a/pnpr/crates/pnpr/src/resolver/tests.rs +++ b/pnpr/crates/pnpr/src/resolver/tests.rs @@ -60,3 +60,77 @@ fn resolution_cache_key_changes_with_dependencies_and_policy() { assert_ne!(base_key, resolution_cache_key(&config, &different_dep)); assert_ne!(base_key, resolution_cache_key(&config, &different_policy)); } + +#[test] +fn a_package_frame_carries_unpacked_size_and_omits_it_when_unknown() { + use pacquet_package_manager::{ResolutionObserver, ResolvedPackageHint}; + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let observer = super::StreamObserver { tx }; + + let hint = |unpacked_size, file_count| ResolvedPackageHint { + id: "acme@1.0.0", + name: "acme", + version: "1.0.0", + integrity: "sha512-abc", + tarball_url: "https://r.test/acme/-/acme-1.0.0.tgz", + unpacked_size, + file_count, + }; + observer.on_resolved(hint(Some(123_456), Some(42))); + observer.on_resolved(hint(None, None)); + + let sized: serde_json::Value = + serde_json::from_slice(&rx.try_recv().expect("sized frame sent")).unwrap(); + assert_eq!(sized["unpackedSize"], serde_json::json!(123_456)); + assert_eq!(sized["fileCount"], serde_json::json!(42)); + + let unsized_frame: serde_json::Value = + serde_json::from_slice(&rx.try_recv().expect("unsized frame sent")).unwrap(); + dbg!(&unsized_frame); + assert!(unsized_frame.get("unpackedSize").is_none()); + assert!(unsized_frame.get("fileCount").is_none()); + assert_eq!(unsized_frame["tarball"], serde_json::json!("https://r.test/acme/-/acme-1.0.0.tgz")); +} + +#[test] +fn frozen_package_frames_announce_lockfile_tarballs_with_sizes() { + use pacquet_lockfile::Lockfile; + use pacquet_resolving_npm_resolver::{DistStats, observed_dist_stats_sink}; + + let lockfile: Lockfile = serde_json::from_value(serde_json::json!({ + "lockfileVersion": "9.0", + "importers": { + ".": { "dependencies": { "acme": { "specifier": "^1.0.0", "version": "1.0.0" } } } + }, + "packages": { + "acme@1.0.0": { + "resolution": { "integrity": "sha512-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==" } + }, + "linked-dir@1.0.0": { + "resolution": { "type": "directory", "directory": "../linked-dir" } + } + } + })) + .expect("lockfile parses"); + + let stats = observed_dist_stats_sink(); + stats.insert( + ("acme".to_string(), "1.0.0".to_string()), + DistStats { unpacked_size: Some(123_456), file_count: Some(42) }, + ); + + let frames = super::frozen_package_frames(&config(), &lockfile, &stats); + dbg!(frames.len()); + assert_eq!(frames.len(), 1); + + let frame: serde_json::Value = serde_json::from_slice(&frames[0]).unwrap(); + assert_eq!(frame["type"], serde_json::json!("package")); + assert_eq!(frame["id"], serde_json::json!("acme@1.0.0")); + assert_eq!( + frame["tarball"], + serde_json::json!("https://registry.example.test/acme/-/acme-1.0.0.tgz"), + ); + assert_eq!(frame["unpackedSize"], serde_json::json!(123_456)); + assert_eq!(frame["fileCount"], serde_json::json!(42)); +} diff --git a/pnpr/crates/pnpr/src/upstream.rs b/pnpr/crates/pnpr/src/upstream.rs index abf40bff3c..e39358d817 100644 --- a/pnpr/crates/pnpr/src/upstream.rs +++ b/pnpr/crates/pnpr/src/upstream.rs @@ -331,11 +331,6 @@ pub fn abbreviate_packument(packument: &Value, now: DateTime) -> Value { /// * `npm-signature` — the legacy PGP detached signature. npm stopped /// populating it years ago in favour of the ECDSA `signatures`, and /// nothing in pnpm or pacquet reads it. -/// * `fileCount` — read nowhere in pnpm or pacquet. -/// * `unpackedSize` — only `pnpm view` reads it, and that command -/// fetches the *full* metadata document (`fullMetadata: true`) which -/// pnpr serves unstripped, so dropping it from the abbreviated form -/// is safe. /// * `shasum` — the legacy sha1 hash, redundant once `integrity` (SRI) /// is present. "Present" mirrors pnpm's `getIntegrity` truthiness /// check (`if (dist.integrity)`): a non-empty string. An absent, @@ -346,13 +341,17 @@ pub fn abbreviate_packument(packument: &Value, now: DateTime) -> Value { /// preserved: it binds `name@version:integrity` to the upstream /// registry's key and is the input to a potential client-side /// install-time verification on the pnpr path. +/// +/// `unpackedSize` and `fileCount` are preserved: pacquet reads both +/// off the resolver-fetched manifest — `unpackedSize` sizes the +/// decompression buffer, and together they form the download's +/// queueing priority (the estimated pipeline work that starts the +/// most expensive tarballs first). fn trim_dist_fields(version: &mut serde_json::Map) { let Some(dist) = version.get_mut("dist").and_then(Value::as_object_mut) else { return; }; dist.remove("npm-signature"); - dist.remove("fileCount"); - dist.remove("unpackedSize"); if dist.get("integrity").and_then(Value::as_str).is_some_and(|integrity| !integrity.is_empty()) { dist.remove("shasum"); diff --git a/pnpr/crates/pnpr/src/upstream/tests.rs b/pnpr/crates/pnpr/src/upstream/tests.rs index caa8fd4f18..bedd7a72e1 100644 --- a/pnpr/crates/pnpr/src/upstream/tests.rs +++ b/pnpr/crates/pnpr/src/upstream/tests.rs @@ -338,12 +338,13 @@ fn abbreviation_drops_fields_the_resolver_ignores() { // `shasum` dropped because `integrity` is present. assert_eq!(version["dist"]["integrity"], "sha512-abc"); assert!(version["dist"].get("shasum").is_none()); - // Legacy PGP signature and unused size fields dropped; ECDSA - // registry signatures kept. + // Legacy PGP signature dropped; ECDSA registry signatures kept. assert!(version["dist"].get("npm-signature").is_none()); - assert!(version["dist"].get("fileCount").is_none()); - assert!(version["dist"].get("unpackedSize").is_none()); assert_eq!(version["dist"]["signatures"][0]["keyid"], "SHA256:xyz"); + // Size hints kept: pacquet reads both for decompression + // preallocation and download scheduling. + assert_eq!(version["dist"]["fileCount"], 12); + assert_eq!(version["dist"]["unpackedSize"], 34567); } #[test]