perf(network): schedule tarball downloads by estimated pipeline work (#12309)

## Summary

When the download connection pool saturates, freed slots are granted by a two-class scheduling policy instead of FIFO:

- **Latency class** (packument/metadata fetches, which gate resolution progress): served FIFO.
- **Throughput class** (tarball downloads): ranked by **estimated total pipeline work** — `unpackedSize + 3000 × fileCount` — so the most expensive download+extract jobs start first (longest-processing-time-first; a large archive that starts last runs alone at single-connection throughput while every other slot idles, see [pnpm/pnpm#12230](https://github.com/pnpm/pnpm/issues/12230)). The per-file term prices the fixed CAS-write/hash overhead, so a many-small-files package ranks as the long job it actually is.
- **Neither class can starve the other**: downloads are guaranteed a reserved half of the pool (strict metadata-first was measured to serialize cold installs — no tarball got a slot until resolution drained, costing the whole resolve/fetch overlap), and metadata wins beyond that reserve (a download backlog can't stall resolution). Both directions are work-conserving.

### How the size hints travel

- Local fresh installs read `dist.unpackedSize` / `dist.fileCount` off the resolver-fetched manifest (also fixes exact decompression-buffer preallocation on the prefetch path, previously hardcoded `None`).
- The pnpr `/v1/resolve` `package` frame carries both as optional `unpackedSize` / `fileCount` fields (omitted when the registry never published them; old clients and servers interoperate unchanged).
- pnpr frozen restores: the lockfile records no sizes, but the verification fan-out fetches each entry's metadata anyway — the npm verifier records both stats into an optional `ObservedDistStats` sink as a side product of the tarball-URL binding check, and the frozen fast path announces every verified tarball as a sized `package` frame before `done` (URLs derived by the same `tarball_url_and_integrity` the client materialization uses). Verdict-cache hits fetch no metadata and keep the bare `done` frame.
- pnpr's abbreviated metadata now **preserves** `unpackedSize`/`fileCount` instead of stripping them, since pacquet reads both.
- Resolve-time tarball fetches (tarball deps' manifests come from their archives) acquire in the latency class — they gate the resolver's walk.

### Benchmark tooling

- The integrated benchmark's latency proxy gained `--registry-slow-start`: per-connection TCP slow start (RFC 6928 initial window, doubling per delivered window toward the bandwidth cap), so scheduling effects that depend on per-connection ramp-up are measurable.
- Fixed a macOS bug where the proxy's accepted sockets inherited the listener's `O_NONBLOCK` and every proxied connection died on its first read — all shaped benchmark traffic silently failed before this.

## Measurements

Fixture: ~110 direct deps / 1308 packages (~90 MB wire), `isolated-linker.fresh-install.cold-cache.cold-store`, local mirror of real npm behind the shaped proxy (30 ms RTT, 80 Mbit/s per-connection cap, TCP slow start).

**Drift-controlled interleaved comparison** (4 alternating blocks x 4 runs each; sequential multi-target sessions on this machine showed up to +75% session-order drift, so block-paired ABAB is the only design we trust):

| target | mean +/- sd (n=16) |
| --- | --- |
| baseline FIFO | 14.36 s +/- 0.54 |
| this PR | **14.06 s +/- 0.70** |

The PR wins **all 4 paired blocks** (-0.18 s to -0.50 s, mean -0.30 s, ~2%). A scheduler ablation (reserve+FIFO, smallest-first, unpackedSize-only, work with K=3k and K=10k per file) ordered as the pipeline model predicts, but the per-variant deltas sit inside the session-drift noise, so only the FIFO-vs-full-design pairing is claimed. K in [3000, 10000] is indistinguishable.

**The starvation fix is the load-bearing piece, established mechanistically rather than by wall clock:** with strict metadata-first priority (an intermediate design), cold-install event timelines showed 4-7 s windows at install start with zero tarball activity - downloads never won a slot during the resolution burst, serializing the resolve and fetch phases. The reserved share removes those gaps entirely and the worst observed cold-install runs with it are within ~1 s of the median, where unreserved variants showed multi-second stragglers.

Real-registry A/B (15 randomized cold-install pairs against npmjs) is noise-bound on a saturated ~100 Mbit link (+/-3 s registry variance), median -0.17 s in this PR's favor - consistent with "never slower."
This commit is contained in:
Zoltan Kochan
2026-06-11 02:58:36 +02:00
committed by GitHub
parent ac367fce91
commit 657d322b15
32 changed files with 1151 additions and 87 deletions

View File

@@ -545,7 +545,13 @@ async fn install_via_pnpr<Reporter: self::Reporter + 'static>(
|| pkg.tarball.clone(),
|registry| registry.client_tarball_url(&pkg.tarball),
);
prefetcher.prefetch(pkg.id, tarball, &pkg.integrity);
prefetcher.prefetch(
pkg.id,
tarball,
&pkg.integrity,
pkg.unpacked_size,
pkg.file_count,
);
})
.await
}

View File

@@ -96,6 +96,7 @@ pub struct NodeResolver {
}
impl NodeResolver {
#[must_use]
pub fn new(http_client: Arc<ThrottledClient>) -> Self {
Self { http_client, node_download_mirrors: HashMap::new(), offline: false }
}

View File

@@ -183,6 +183,7 @@ async fn materialize<Reporter: self::Reporter>(
verified_files_cache: SharedVerifiedFilesCache::default(),
package_integrity: integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: tarball,
package_id: &package_id,
auth_headers: opts.auth_headers,

View File

@@ -1,4 +1,5 @@
mod auth;
mod priority_semaphore;
mod proxy;
mod retry;
#[cfg(test)]
@@ -10,13 +11,13 @@ pub use proxy::{NoProxySetting, ProxyConfig, ProxyError};
pub use retry::{RetryOpts, send_with_retry, should_retry_status};
pub use tls::{PerRegistryTls, RegistryTls, TlsConfig, TlsError};
use priority_semaphore::{Permit, PrioritySemaphore};
use proxy::{NoProxyMatcher, parse_proxy_url, strip_userinfo};
use reqwest::{
Certificate, Client, Identity, Proxy,
header::{HeaderMap, HeaderValue, USER_AGENT},
};
use std::{collections::HashMap, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration};
use tokio::sync::{Semaphore, SemaphorePermit};
/// Fallback `User-Agent` for the install client's no-config
/// constructors ([`ThrottledClient::new_for_installs`],
@@ -38,6 +39,15 @@ use tokio::sync::{Semaphore, SemaphorePermit};
/// The install client therefore always sends one.
pub const DEFAULT_USER_AGENT: &str = "pnpm";
/// Permit priority used by [`ThrottledClient::acquire`] /
/// [`ThrottledClient::acquire_for_url`] for callers that don't pass an
/// explicit one. Marks the request as latency class — packument and
/// other metadata fetches that gate resolution progress — served FIFO
/// and preferred over size-prioritized downloads beyond the downloads'
/// reserved share of the pool (see the `priority_semaphore` module
/// docs for the two-class grant policy).
pub const UNPRIORITIZED: u64 = u64::MAX;
/// Default per-request timeout in milliseconds, matching pnpm v11's
/// `fetchTimeout` default of `60000`
/// ([`config/reader/src/index.ts:151`](https://github.com/pnpm/pnpm/blob/1819226b51/config/reader/src/index.ts#L151)).
@@ -77,7 +87,8 @@ impl Default for NetworkSettings {
}
}
/// Wrapper around [`Client`] with concurrent request limit enforced by the [`Semaphore`] mechanism.
/// Wrapper around [`Client`] with a concurrent request limit enforced
/// by a priority-ordered semaphore (`priority_semaphore` module).
///
/// Holds a default [`Client`] for the top-level proxy / TLS config
/// plus an optional map of per-registry clients keyed by nerf-darted
@@ -87,9 +98,20 @@ impl Default for NetworkSettings {
/// client. The semaphore is shared across both — bounding the total
/// concurrent socket count regardless of which registry a request
/// targets.
///
/// When the pool saturates, freed slots are granted by a two-class
/// policy (see the `priority_semaphore` module docs): requests
/// acquired without an explicit priority ([`Self::acquire`],
/// [`Self::acquire_for_url`]) form the FIFO latency class (typically
/// metadata fetches gating resolution progress), while downloads pass
/// their estimated pipeline work through
/// [`Self::acquire_for_url_with_priority`] and are guaranteed a
/// reserved share of the pool, granted most-expensive-first — so the
/// longest download jobs start early and neither class starves the
/// other.
#[derive(Debug)]
pub struct ThrottledClient {
semaphore: Semaphore,
semaphore: PrioritySemaphore,
client: Client,
/// Per-registry clients keyed by nerf-darted URI. Empty when no
/// `//host/:cert=…` / `:key=…` / `:ca=…` / `:cafile=…` /
@@ -120,7 +142,7 @@ pub struct ThrottledClient {
/// still draining, and the per-process FD count overruns the
/// platform limit — surfacing as `EMFILE` "too many open files".
pub struct ThrottledClientGuard<'a> {
_permit: SemaphorePermit<'a>,
_permit: Permit,
client: &'a Client,
}
@@ -139,8 +161,7 @@ impl ThrottledClient {
/// against [`default_network_concurrency`] — typically the full
/// `send + body-consume` lifetime, not just `.send()`.
pub async fn acquire(&self) -> ThrottledClientGuard<'_> {
let permit =
self.semaphore.acquire().await.expect("semaphore shouldn't have been closed this soon");
let permit = self.semaphore.acquire(UNPRIORITIZED).await;
ThrottledClientGuard { _permit: permit, client: &self.client }
}
@@ -290,7 +311,7 @@ impl ThrottledClient {
}
Ok(ThrottledClient {
semaphore: Semaphore::new(settings.network_concurrency),
semaphore: PrioritySemaphore::new(settings.network_concurrency),
client: default_client,
per_registry: per_registry_clients,
routing: per_registry.clone(),
@@ -304,7 +325,7 @@ impl ThrottledClient {
/// test-suite budget instead of waiting on TCP retry.
#[must_use]
pub fn from_client(client: Client) -> Self {
let semaphore = Semaphore::new(default_network_concurrency());
let semaphore = PrioritySemaphore::new(default_network_concurrency());
ThrottledClient {
semaphore,
client,
@@ -333,8 +354,22 @@ impl ThrottledClient {
/// just to satisfy the type signature — the lookup itself works
/// on the raw string form.
pub async fn acquire_for_url(&self, url: &str) -> ThrottledClientGuard<'_> {
let permit =
self.semaphore.acquire().await.expect("semaphore shouldn't have been closed this soon");
self.acquire_for_url_with_priority(url, UNPRIORITIZED).await
}
/// [`Self::acquire_for_url`], but queueing behind the saturated
/// pool at an explicit `priority` instead of [`UNPRIORITIZED`] —
/// the throughput class of the two-class grant policy. Tarball
/// downloads pass their estimated pipeline work (0 when unknown)
/// so that freed slots go to the most expensive pending archive
/// first — the longest download+extract jobs start earliest and
/// never end up running alone after the small ones drained.
pub async fn acquire_for_url_with_priority(
&self,
url: &str,
priority: u64,
) -> ThrottledClientGuard<'_> {
let permit = self.semaphore.acquire(priority).await;
let client = self
.routing
.pick_for_url(url)
@@ -502,9 +537,9 @@ pub enum ForInstallsError {
#[diagnostic(transparent)]
Tls(#[error(source)] TlsError),
/// `network_concurrency` resolved to `0`. A zero-permit
/// [`Semaphore`] would make every `acquire` block forever, hanging
/// the install. pnpm rejects the same value — its `p-queue` throws
/// `network_concurrency` resolved to `0`. A zero-permit semaphore
/// would make every `acquire` block forever, hanging the install.
/// pnpm rejects the same value — its `p-queue` throws
/// `Expected concurrency to be a number from 1 and up` — so pacquet
/// fails fast rather than deadlock.
#[display("networkConcurrency must be at least 1")]

View File

@@ -0,0 +1,243 @@
//! Class-aware permit dispenser backing
//! [`ThrottledClient`](crate::ThrottledClient).
//!
//! Two request classes share the pool:
//!
//! * **Latency** ([`crate::UNPRIORITIZED`]) — packument and other
//! metadata fetches that gate resolution progress. Served FIFO.
//! * **Throughput** (any other priority) — tarball downloads, ranked
//! by their estimated pipeline work so the most expensive archives
//! claim freed slots first (the longest-processing-time-first
//! scheduling argument; see
//! [pnpm/pnpm#12230](https://github.com/pnpm/pnpm/issues/12230) for
//! the per-connection bandwidth measurements behind it).
//!
//! Neither class may starve the other. Strictly preferring latency
//! work was measured to *serialize* a cold fresh install — during the
//! resolution burst no tarball ever got a slot, so the download
//! pipeline started only after resolution finished, costing the whole
//! resolve/fetch overlap. Instead the throughput class is guaranteed a
//! reserved share of the pool (half, rounded up — but never all of
//! it): when downloads hold
//! fewer than the reserve, a freed slot goes to the largest pending
//! download even while metadata is queued; beyond the reserve, queued
//! metadata wins. Both directions are work-conserving — either class
//! may use the whole pool while the other has nothing pending.
use std::{
cmp::Ordering,
collections::{BinaryHeap, VecDeque},
sync::{Arc, Mutex},
};
use tokio::sync::oneshot;
use crate::UNPRIORITIZED;
/// Counting semaphore with the two-class grant policy described in the
/// module docs. Cancel-safe: dropping a waiting [`acquire`] future
/// gives up its place in line, and a permit granted to a waiter that
/// was cancelled in the same instant passes on to the next waiter
/// instead of leaking.
///
/// [`acquire`]: Self::acquire
pub(crate) struct PrioritySemaphore {
state: Arc<Mutex<SemState>>,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum Class {
Latency,
Throughput,
}
impl Class {
fn of(priority: u64) -> Class {
if priority == UNPRIORITIZED { Class::Latency } else { Class::Throughput }
}
}
struct SemState {
free: usize,
latency_in_flight: usize,
throughput_in_flight: usize,
/// Minimum number of slots queued throughput work can always
/// grow into, even while latency work is queued.
throughput_reserve: usize,
/// Registration counter used as the FIFO tie-break.
next_seq: u64,
latency_waiters: VecDeque<Waiter>,
throughput_waiters: BinaryHeap<Waiter>,
}
struct Waiter {
priority: u64,
seq: u64,
tx: oneshot::Sender<Permit>,
}
/// `BinaryHeap` is a max-heap, so "greatest" pops first: order by
/// priority, then by *earlier* registration among equals.
impl Ord for Waiter {
fn cmp(&self, other: &Self) -> Ordering {
self.priority.cmp(&other.priority).then_with(|| other.seq.cmp(&self.seq))
}
}
impl PartialOrd for Waiter {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for Waiter {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl Eq for Waiter {}
/// Owned permit returned by [`PrioritySemaphore::acquire`]. Dropping it
/// hands the slot to the next waiter per the class policy (or back to
/// the free-permit count when nobody is waiting).
pub(crate) struct Permit {
state: Arc<Mutex<SemState>>,
class: Class,
/// Cleared when the permit's release has been handed off inside
/// [`release`] (a waiter vanished between pop and send), so this
/// permit's own `Drop` must not release a second slot.
armed: bool,
}
impl Drop for Permit {
fn drop(&mut self) {
if self.armed {
release(&self.state, self.class);
}
}
}
impl PrioritySemaphore {
pub(crate) fn new(permits: usize) -> Self {
PrioritySemaphore {
state: Arc::new(Mutex::new(SemState {
free: permits,
latency_in_flight: 0,
throughput_in_flight: 0,
// Half the pool, but never all of it: a reserve that
// covers every permit (the `permits == 1` case) would
// invert the starvation guarantee — queued downloads
// would block metadata outright instead of sharing.
throughput_reserve: permits.div_ceil(2).min(permits.saturating_sub(1)),
next_seq: 0,
latency_waiters: VecDeque::new(),
throughput_waiters: BinaryHeap::new(),
})),
}
}
/// Wait for a permit. Free permits are claimed immediately; when
/// the pool is saturated the caller queues in its class and is
/// woken per the grant policy in the module docs.
pub(crate) async fn acquire(&self, priority: u64) -> Permit {
let class = Class::of(priority);
let rx = {
let mut state = self.state.lock().expect("priority semaphore lock poisoned");
if state.free > 0 {
state.free -= 1;
*state.count_mut(class) += 1;
return Permit { state: Arc::clone(&self.state), class, armed: true };
}
let (tx, rx) = oneshot::channel();
let seq = state.next_seq;
state.next_seq += 1;
let waiter = Waiter { priority, seq, tx };
match class {
Class::Latency => state.latency_waiters.push_back(waiter),
Class::Throughput => state.throughput_waiters.push(waiter),
}
rx
};
rx.await.expect("priority semaphore state dropped while a permit was awaited")
}
#[cfg(test)]
pub(crate) fn queued_waiters(&self) -> usize {
let state = self.state.lock().expect("priority semaphore lock poisoned");
state.latency_waiters.len() + state.throughput_waiters.len()
}
#[cfg(test)]
pub(crate) fn available_permits(&self) -> usize {
self.state.lock().expect("priority semaphore lock poisoned").free
}
}
impl SemState {
fn count_mut(&mut self, class: Class) -> &mut usize {
match class {
Class::Latency => &mut self.latency_in_flight,
Class::Throughput => &mut self.throughput_in_flight,
}
}
/// Pop the waiter the grant policy picks next, or `None` when both
/// queues are empty: throughput work below its reserve goes first,
/// then queued latency work, then throughput work above the
/// reserve.
fn next_waiter(&mut self) -> Option<(Waiter, Class)> {
if self.throughput_in_flight < self.throughput_reserve
&& let Some(waiter) = self.throughput_waiters.pop()
{
return Some((waiter, Class::Throughput));
}
if let Some(waiter) = self.latency_waiters.pop_front() {
return Some((waiter, Class::Latency));
}
self.throughput_waiters.pop().map(|waiter| (waiter, Class::Throughput))
}
}
impl std::fmt::Debug for PrioritySemaphore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = self.state.lock().expect("priority semaphore lock poisoned");
f.debug_struct("PrioritySemaphore")
.field("free", &state.free)
.field("latency_in_flight", &state.latency_in_flight)
.field("throughput_in_flight", &state.throughput_in_flight)
.field("latency_waiters", &state.latency_waiters.len())
.field("throughput_waiters", &state.throughput_waiters.len())
.finish()
}
}
/// Hand a freed slot to the next waiter per the grant policy, skipping
/// waiters whose `acquire` future was dropped while queued; with no
/// live waiter the slot returns to the free-permit count.
fn release(state_arc: &Arc<Mutex<SemState>>, released: Class) {
let mut state = state_arc.lock().expect("priority semaphore lock poisoned");
*state.count_mut(released) -= 1;
loop {
let Some((waiter, class)) = state.next_waiter() else {
state.free += 1;
return;
};
let permit = Permit { state: Arc::clone(state_arc), class, armed: true };
match waiter.tx.send(permit) {
Ok(()) => {
*state.count_mut(class) += 1;
return;
}
Err(mut returned) => {
// The receiver was dropped before the send, so this
// permit was never observed — disarm it and offer the
// slot to the next waiter. (Dropping it armed would
// re-enter `release` and deadlock on `state_arc`.)
returned.armed = false;
}
}
}
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,166 @@
use pretty_assertions::assert_eq;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use super::PrioritySemaphore;
use crate::UNPRIORITIZED;
/// Spawn a task that acquires with `priority`, records `label` on
/// grant, and releases immediately. Returns once the waiter is queued,
/// so registration order across calls is deterministic.
async fn spawn_waiter(
sem: &Arc<PrioritySemaphore>,
order: &Arc<Mutex<Vec<&'static str>>>,
label: &'static str,
priority: u64,
) -> tokio::task::JoinHandle<()> {
let queued_before = sem.queued_waiters();
let handle = {
let sem = Arc::clone(sem);
let order = Arc::clone(order);
tokio::spawn(async move {
let _permit = sem.acquire(priority).await;
order.lock().unwrap().push(label);
})
};
while sem.queued_waiters() == queued_before {
tokio::task::yield_now().await;
}
handle
}
#[tokio::test]
async fn saturated_semaphore_grants_highest_priority_first() {
let sem = Arc::new(PrioritySemaphore::new(1));
let holder = sem.acquire(0).await;
let order = Arc::new(Mutex::new(Vec::new()));
let mut handles = Vec::new();
for (label, priority) in [("small", 1), ("large", 500), ("medium", 30)] {
handles.push(spawn_waiter(&sem, &order, label, priority).await);
}
drop(holder);
for handle in handles {
handle.await.unwrap();
}
assert_eq!(*order.lock().unwrap(), vec!["large", "medium", "small"]);
}
#[tokio::test]
async fn equal_priorities_are_granted_fifo() {
let sem = Arc::new(PrioritySemaphore::new(1));
let holder = sem.acquire(0).await;
let order = Arc::new(Mutex::new(Vec::new()));
let mut handles = Vec::new();
for label in ["first", "second", "third"] {
handles.push(spawn_waiter(&sem, &order, label, 7).await);
}
drop(holder);
for handle in handles {
handle.await.unwrap();
}
assert_eq!(*order.lock().unwrap(), vec!["first", "second", "third"]);
}
#[tokio::test]
async fn cancelled_waiter_passes_its_grant_to_the_next_waiter() {
let sem = Arc::new(PrioritySemaphore::new(1));
let holder = sem.acquire(0).await;
let order = Arc::new(Mutex::new(Vec::new()));
let cancelled = spawn_waiter(&sem, &order, "cancelled", 100).await;
let survivor = spawn_waiter(&sem, &order, "survivor", 1).await;
cancelled.abort();
let abort_result = cancelled.await;
dbg!(&abort_result);
assert!(abort_result.unwrap_err().is_cancelled());
drop(holder);
tokio::time::timeout(Duration::from_secs(5), survivor)
.await
.expect("the freed permit should reach the surviving waiter")
.unwrap();
assert_eq!(*order.lock().unwrap(), vec!["survivor"]);
}
#[tokio::test]
async fn released_permit_with_no_waiters_returns_to_the_free_pool() {
let sem = PrioritySemaphore::new(1);
drop(sem.acquire(0).await);
drop(sem.acquire(0).await);
assert_eq!(sem.queued_waiters(), 0);
}
/// During a metadata flood, queued downloads still get slots up to
/// their reserved share — strict metadata-first was measured to
/// serialize cold installs (no download starts until resolution
/// drains), so the reserve is what keeps the two phases overlapping.
#[tokio::test]
async fn downloads_keep_their_reserved_share_during_a_metadata_flood() {
// 2 permits -> a throughput reserve of 1.
let sem = Arc::new(PrioritySemaphore::new(2));
let holder_meta = sem.acquire(UNPRIORITIZED).await;
let holder_download = sem.acquire(5).await;
let order = Arc::new(Mutex::new(Vec::new()));
let meta_one = spawn_waiter(&sem, &order, "meta-1", UNPRIORITIZED).await;
let meta_two = spawn_waiter(&sem, &order, "meta-2", UNPRIORITIZED).await;
let download = spawn_waiter(&sem, &order, "download", 9).await;
// Freeing the download slot drops throughput below its reserve, so
// the queued download outranks the earlier-queued metadata.
drop(holder_download);
for handle in [download, meta_one, meta_two] {
handle.await.unwrap();
}
drop(holder_meta);
assert_eq!(*order.lock().unwrap(), vec!["download", "meta-1", "meta-2"]);
}
/// The reserve is a floor, not free rein: with downloads already at
/// their share, a freed latency slot goes to queued metadata before a
/// higher-priority download — resolution progress can't be starved by
/// a backlog of large archives either.
#[tokio::test]
async fn metadata_outranks_downloads_beyond_the_reserve() {
let sem = Arc::new(PrioritySemaphore::new(2));
let holder_meta = sem.acquire(UNPRIORITIZED).await;
let holder_download = sem.acquire(5).await;
let order = Arc::new(Mutex::new(Vec::new()));
let download = spawn_waiter(&sem, &order, "download", 9).await;
let meta = spawn_waiter(&sem, &order, "meta", UNPRIORITIZED).await;
// Freeing the metadata slot keeps throughput at its reserve (the
// download holder is still running), so queued metadata wins even
// though the download was queued first with a high priority.
drop(holder_meta);
meta.await.unwrap();
drop(holder_download);
download.await.unwrap();
assert_eq!(*order.lock().unwrap(), vec!["meta", "download"]);
}
/// With a single permit the reserve must be zero — a reserve covering
/// the whole pool would invert the starvation guarantee and let queued
/// downloads block metadata outright.
#[tokio::test]
async fn single_permit_pool_still_serves_metadata_first() {
let sem = Arc::new(PrioritySemaphore::new(1));
let holder = sem.acquire(5).await;
let order = Arc::new(Mutex::new(Vec::new()));
let download = spawn_waiter(&sem, &order, "download", 9).await;
let meta = spawn_waiter(&sem, &order, "meta", UNPRIORITIZED).await;
drop(holder);
meta.await.unwrap();
download.await.unwrap();
assert_eq!(*order.lock().unwrap(), vec!["meta", "download"]);
}

View File

@@ -25,7 +25,8 @@ use pacquet_config::{
};
use pacquet_network::{AuthHeaders, ThrottledClient};
use pacquet_resolving_npm_resolver::{
CreateNpmResolutionVerifierOptions, PackageMetaCache, create_npm_resolution_verifier,
CreateNpmResolutionVerifierOptions, ObservedDistStats, PackageMetaCache,
create_npm_resolution_verifier,
};
use pacquet_resolving_resolver_base::ResolutionVerifier;
@@ -67,11 +68,16 @@ pub enum BuildVerifiersError {
/// during the same install yields the cached document instead of a
/// fresh round-trip. Pass `None` from contexts where no resolver
/// runs alongside (the frozen-install path, unit tests).
///
/// `observed_dist_stats` is the optional [`ObservedDistStats`] sink
/// the npm verifier fills with each verified entry's `dist` work
/// statistics; pass `None` when the caller has no use for them.
pub fn build_resolution_verifiers(
config: &Config,
http_client: Arc<ThrottledClient>,
meta_cache: Option<Arc<dyn PackageMetaCache>>,
auth_override: Option<Arc<AuthHeaders>>,
observed_dist_stats: Option<ObservedDistStats>,
) -> Result<Vec<Arc<dyn ResolutionVerifier>>, BuildVerifiersError> {
let mut verifiers: Vec<Arc<dyn ResolutionVerifier>> = Vec::new();
@@ -125,6 +131,7 @@ pub fn build_resolution_verifiers(
meta_cache,
retry_opts: retry_opts_from_config(config),
now: None,
observed_dist_stats,
};
verifiers.push(Arc::new(create_npm_resolution_verifier(opts)));

View File

@@ -649,6 +649,7 @@ where
Some(Arc::clone(&meta_cache)
as Arc<dyn pacquet_resolving_npm_resolver::PackageMetaCache>),
auth_override.clone(),
None,
)
.map_err(InstallError::BuildVerifiers)?;

View File

@@ -302,6 +302,7 @@ impl InstallPackageBySnapshot<'_> {
verified_files_cache: Arc::clone(verified_files_cache),
package_integrity: integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &tarball_url,
package_id: &package_id,
requester,
@@ -578,8 +579,16 @@ impl InstallPackageBySnapshot<'_> {
/// Resolve the tarball URL + integrity for tarball- and registry-shaped
/// resolutions. Factored out so the per-resolution-type dispatch in
/// [`InstallPackageBySnapshot::run`] reads top-down: each variant builds
/// its own `cas_paths`.
fn tarball_url_and_integrity<'a>(
/// its own `cas_paths`. Public because the pnpr server derives the same
/// URLs when it announces a verified frozen lockfile's tarballs to the
/// client — both sides must derive byte-identical URLs so the client's
/// prefetch mem-cache keys line up.
///
/// # Panics
///
/// On directory / git / binary / variations resolutions — callers gate
/// on the tarball/registry shapes first.
pub fn tarball_url_and_integrity<'a>(
resolution: &'a LockfileResolution,
package_key: &PackageKey,
config: &'a Config,
@@ -766,6 +775,7 @@ async fn fetch_binary_resolution_to_cas<Reporter: self::Reporter>(
verified_files_cache: Arc::clone(verified_files_cache),
package_integrity: &binary.integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &binary.url,
package_id: &package_id,
requester,

View File

@@ -147,6 +147,7 @@ impl InstallPackageFromRegistry<'_> {
if first_visit {
let (tarball_url, integrity) = extract_tarball(&resolution.resolution)?;
let unpacked_size = manifest_unpacked_size(resolution.manifest.as_deref());
let file_count = manifest_file_count(resolution.manifest.as_deref());
// `pnpm:progress resolved` mirrors pnpm's emit at
// <https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/deps-resolver/src/resolveDependencies.ts#L1586>:
@@ -172,6 +173,7 @@ impl InstallPackageFromRegistry<'_> {
verified_files_cache: SharedVerifiedFilesCache::clone(verified_files_cache),
package_integrity: &integrity,
package_unpacked_size: unpacked_size,
package_file_count: file_count,
package_url: tarball_url,
package_id: &package_id,
requester,
@@ -272,14 +274,21 @@ pub(crate) fn extract_tarball(
/// `None` when missing or non-numeric — the tarball extractor treats it
/// as a hint, not a hard requirement.
pub(crate) fn manifest_unpacked_size(manifest: Option<&Value>) -> Option<usize> {
manifest_dist_field(manifest, "unpackedSize")
}
/// Read `dist.fileCount` off the resolver-fetched manifest. Feeds the
/// download priority's per-file pipeline-cost term; `None` when the
/// registry never published one.
pub(crate) fn manifest_file_count(manifest: Option<&Value>) -> Option<usize> {
manifest_dist_field(manifest, "fileCount")
}
fn manifest_dist_field(manifest: Option<&Value>, field: &str) -> Option<usize> {
// `usize::try_from` so a `u64` value larger than the host's
// `usize` (32-bit targets) degrades to "no hint" rather than
// truncating silently and producing an undersized pre-allocation.
manifest?
.get("dist")?
.get("unpackedSize")?
.as_u64()
.and_then(|value| usize::try_from(value).ok())
manifest?.get("dist")?.get(field)?.as_u64().and_then(|value| usize::try_from(value).ok())
}
#[cfg(test)]

View File

@@ -24,7 +24,7 @@
//! path as `TarballError::SiblingFetchFailed`.
use crate::{
install_package_from_registry::{extract_tarball, manifest_unpacked_size},
install_package_from_registry::{extract_tarball, manifest_file_count, manifest_unpacked_size},
retry_config::retry_opts_from_config,
};
use dashmap::DashSet;
@@ -203,6 +203,7 @@ impl<Reporter: self::Reporter + 'static> PrefetchingResolver<Reporter> {
let package_id = format!("{}@{}", name_ver.name, name_ver.suffix);
let package_url = package_url.to_string();
let package_unpacked_size = manifest_unpacked_size(result.manifest.as_deref());
let package_file_count = manifest_file_count(result.manifest.as_deref());
let http_client = Arc::clone(&self.ctx.http_client);
let mem_cache = Arc::clone(&self.ctx.mem_cache);
@@ -238,6 +239,7 @@ impl<Reporter: self::Reporter + 'static> PrefetchingResolver<Reporter> {
verified_files_cache,
package_integrity: &integrity,
package_unpacked_size,
package_file_count,
package_url: &package_url,
package_id: &package_id,
requester: &requester,

View File

@@ -9,7 +9,9 @@
//! while the server is still resolving
//! ([pnpm/pnpm#12234](https://github.com/pnpm/pnpm/issues/12234)).
use crate::install_package_from_registry::extract_tarball;
use crate::install_package_from_registry::{
extract_tarball, manifest_file_count, manifest_unpacked_size,
};
use dashmap::DashSet;
use pacquet_resolving_resolver_base::{
LatestQuery, ResolveFuture, ResolveLatestFuture, ResolveOptions, ResolveResult, Resolver,
@@ -32,6 +34,15 @@ pub struct ResolvedPackageHint<'a> {
/// The resolver's `dist.tarball` URL — the same string the install
/// pass looks up in the shared mem cache.
pub tarball_url: &'a str,
/// `dist.unpackedSize` from the resolver-fetched manifest, when
/// the registry published one. Lets the fetching side size its
/// decompression buffer exactly and start the largest archives
/// first when the connection pool is saturated.
pub unpacked_size: Option<usize>,
/// `dist.fileCount` from the resolver-fetched manifest, when the
/// registry published one. The per-file term of the download
/// priority's pipeline-work estimate.
pub file_count: Option<usize>,
}
/// Sink notified once per resolved tarball package during a resolve.
@@ -84,6 +95,8 @@ impl ObservingResolver {
version: &version,
integrity: &integrity,
tarball_url,
unpacked_size: manifest_unpacked_size(result.manifest.as_deref()),
file_count: manifest_file_count(result.manifest.as_deref()),
});
}
}

View File

@@ -47,6 +47,7 @@ pub(crate) struct TarballDownload {
pub package_url: String,
pub integrity: Integrity,
pub package_unpacked_size: Option<usize>,
pub package_file_count: Option<usize>,
}
/// [`tokio::spawn`] a single tarball download into the shared mem cache
@@ -74,6 +75,7 @@ pub(crate) fn spawn_tarball_download(download: TarballDownload) {
package_url,
integrity,
package_unpacked_size,
package_file_count,
} = download;
tokio::spawn(async move {
@@ -86,6 +88,7 @@ pub(crate) fn spawn_tarball_download(download: TarballDownload) {
verified_files_cache,
package_integrity: &integrity,
package_unpacked_size,
package_file_count,
package_url: &package_url,
package_id: &package_id,
requester: &requester,
@@ -190,8 +193,18 @@ impl TarballPrefetcher {
/// Fire a background download of one resolved tarball. Deduplicated
/// by URL; a no-op when the same URL was already prefetched or when
/// `integrity` doesn't parse (the materialization install fetches
/// that package the normal way).
pub fn prefetch(&self, package_id: String, package_url: String, integrity: &str) {
/// that package the normal way). `unpacked_size` (the frame's
/// `unpackedSize`, when the registry published one) sizes the
/// decompression buffer and acts as the download's queueing
/// priority — largest pending archives start first.
pub fn prefetch(
&self,
package_id: String,
package_url: String,
integrity: &str,
unpacked_size: Option<usize>,
file_count: Option<usize>,
) {
let integrity = match integrity.parse::<Integrity>() {
Ok(integrity) => integrity,
Err(error) => {
@@ -222,7 +235,8 @@ impl TarballPrefetcher {
package_id,
package_url,
integrity,
package_unpacked_size: None,
package_unpacked_size: unpacked_size,
package_file_count: file_count,
});
}

View File

@@ -111,6 +111,15 @@ pub struct ResolvedPackage {
pub integrity: String,
/// The resolver's `dist.tarball` URL.
pub tarball: String,
/// `dist.unpackedSize` from the server-side resolve, when the
/// registry published one. Sizes the decompression buffer exactly
/// and prioritizes the largest pending downloads when the
/// connection pool is saturated.
pub unpacked_size: Option<usize>,
/// `dist.fileCount` from the server-side resolve, when the registry
/// published one. The per-file term of the download priority's
/// pipeline-work estimate.
pub file_count: Option<usize>,
}
#[derive(Debug, Display, Error, From)]
@@ -254,8 +263,24 @@ impl PnprClient {
continue;
}
match parse_frame(line)? {
Frame::Package { id, name, version, integrity, tarball } => {
on_package(ResolvedPackage { id, name, version, integrity, tarball });
Frame::Package {
id,
name,
version,
integrity,
tarball,
unpacked_size,
file_count,
} => {
on_package(ResolvedPackage {
id,
name,
version,
integrity,
tarball,
unpacked_size,
file_count,
});
}
Frame::Done { lockfile, stats } => {
return Ok(ResolveOutcome { lockfile: *lockfile, stats });
@@ -289,6 +314,13 @@ enum Frame {
version: String,
integrity: String,
tarball: String,
/// Absent from frames sent by servers that predate the field
/// and for packages whose registry never published a
/// `dist.unpackedSize`.
#[serde(rename = "unpackedSize", default)]
unpacked_size: Option<usize>,
#[serde(rename = "fileCount", default)]
file_count: Option<usize>,
},
/// Boxed: the lockfile dwarfs the other variants, so keeping it
/// behind a pointer keeps the enum small.

View File

@@ -36,8 +36,8 @@ fn tarball_mismatch_maps_to_the_generic_envelope() {
/// A `package` frame carries the fetch hint fields verbatim.
#[test]
fn a_package_frame_parses_its_fetch_hint() {
let line = br#"{"type":"package","id":"acme@1.0.0","name":"acme","version":"1.0.0","integrity":"sha512-abc","tarball":"https://r.test/acme/-/acme-1.0.0.tgz"}"#;
let Frame::Package { id, name, version, integrity, tarball } =
let line = br#"{"type":"package","id":"acme@1.0.0","name":"acme","version":"1.0.0","integrity":"sha512-abc","tarball":"https://r.test/acme/-/acme-1.0.0.tgz","unpackedSize":123456,"fileCount":42}"#;
let Frame::Package { id, name, version, integrity, tarball, unpacked_size, file_count } =
parse_frame(line).expect("frame parses")
else {
panic!("expected a package frame");
@@ -47,6 +47,22 @@ fn a_package_frame_parses_its_fetch_hint() {
assert_eq!(version, "1.0.0");
assert_eq!(integrity, "sha512-abc");
assert_eq!(tarball, "https://r.test/acme/-/acme-1.0.0.tgz");
assert_eq!(unpacked_size, Some(123456));
assert_eq!(file_count, Some(42));
}
/// A `package` frame without `unpackedSize` / `fileCount` — an older
/// server, or a registry that never published the fields — still
/// parses.
#[test]
fn a_package_frame_without_dist_stats_parses() {
let line = br#"{"type":"package","id":"acme@1.0.0","name":"acme","version":"1.0.0","integrity":"sha512-abc","tarball":"https://r.test/acme/-/acme-1.0.0.tgz"}"#;
let Frame::Package { unpacked_size, file_count, .. } = parse_frame(line).expect("frame parses")
else {
panic!("expected a package frame");
};
assert_eq!(unpacked_size, None);
assert_eq!(file_count, None);
}
/// A line with no `type` tag is malformed, not a silent success.

View File

@@ -29,6 +29,7 @@ pub struct RealGitProbe {
}
impl RealGitProbe {
#[must_use]
pub fn new(http_client: Arc<ThrottledClient>) -> Self {
Self { http_client, git_bin: None }
}

View File

@@ -23,6 +23,7 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use pacquet_config::{TrustPolicy, version_policy::PackageVersionPolicy};
use pacquet_lockfile::{LockfileResolution, PkgName};
use pacquet_network::{AuthHeaders, RetryOpts, ThrottledClient};
@@ -47,6 +48,31 @@ use crate::{
},
};
/// Per-version `dist` statistics that estimate a tarball's pipeline
/// work: `unpackedSize` (transfer + decompress + hash bytes) and
/// `fileCount` (per-file CAS-write overhead). Either may be absent —
/// registries only publish them for packages uploaded since npm 6.
#[derive(Debug, Default, Clone, Copy)]
pub struct DistStats {
pub unpacked_size: Option<usize>,
pub file_count: Option<usize>,
}
/// `(package name, version) → dist` work statistics filled by the
/// verifier as a side product of the tarball-URL binding check. The
/// metadata is already in hand per entry, so collecting costs no extra
/// fetch; consumers (the pnpr server's frozen fast path) use the stats
/// to schedule the most expensive tarball downloads first. Shared as an
/// `Arc` so the caller keeps a handle while the verifier fan-out writes.
pub type ObservedDistStats = Arc<DashMap<(String, String), DistStats>>;
/// Construct a fresh sink for
/// [`CreateNpmResolutionVerifierOptions::observed_dist_stats`].
#[must_use]
pub fn observed_dist_stats_sink() -> ObservedDistStats {
Arc::new(DashMap::new())
}
/// Options bundle for [`create_npm_resolution_verifier`]. Mirrors
/// upstream's
/// [`CreateNpmResolutionVerifierOptions`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/createNpmResolutionVerifier.ts#L28-L84).
@@ -114,6 +140,10 @@ pub struct CreateNpmResolutionVerifierOptions {
/// the `trustPolicyIgnoreAfter` window. `None` falls back to
/// wall-clock at construction time.
pub now: Option<DateTime<Utc>>,
/// Optional sink the verifier fills with each verified entry's
/// `dist` work statistics (see [`ObservedDistStats`]). `None`
/// skips collection.
pub observed_dist_stats: Option<ObservedDistStats>,
}
/// Verifier returned by [`create_npm_resolution_verifier`]. Stores
@@ -143,6 +173,7 @@ pub struct NpmResolutionVerifier {
now: Option<DateTime<Utc>>,
policy_snapshot: serde_json::Map<String, JsonValue>,
lookup_context: PublishedAtLookupContext,
observed_dist_stats: Option<ObservedDistStats>,
}
impl std::fmt::Debug for NpmResolutionVerifier {
@@ -223,6 +254,7 @@ pub fn create_npm_resolution_verifier(
now: opts.now,
policy_snapshot,
lookup_context: PublishedAtLookupContext::new(),
observed_dist_stats: opts.observed_dist_stats,
}
}
@@ -420,6 +452,12 @@ impl NpmResolutionVerifier {
) -> Option<ResolutionVerification> {
let registry_tarball = match self.fetch_abbreviated_meta(registry, name).await {
Ok(Some(meta)) => {
if let Some(sink) = self.observed_dist_stats.as_ref()
&& let Some(stats) =
meta.version_dist_stats.as_ref().and_then(|stats| stats.get(version))
{
sink.insert((name.to_string(), version.to_string()), *stats);
}
meta.version_tarballs.and_then(|tarballs| tarballs.get(version).cloned())
}
Ok(None) | Err(_) => None,
@@ -1028,9 +1066,22 @@ fn project_abbreviated_meta(meta: &Package) -> crate::lookup_context::Abbreviate
.iter()
.map(|(version, manifest)| (version.clone(), manifest.dist.tarball.clone()))
.collect();
let version_dist_stats = meta
.versions
.iter()
.filter_map(|(version, manifest)| {
let stats = DistStats {
unpacked_size: manifest.dist.unpacked_size,
file_count: manifest.dist.file_count,
};
(stats.unpacked_size.is_some() || stats.file_count.is_some())
.then(|| (version.clone(), stats))
})
.collect();
crate::lookup_context::AbbreviatedMetaProjection {
modified: meta.modified.clone(),
version_tarballs: Some(version_tarballs),
version_dist_stats: Some(version_dist_stats),
}
}

View File

@@ -8,7 +8,9 @@ use pacquet_resolving_resolver_base::{ResolutionVerification, ResolutionVerifier
use pretty_assertions::assert_eq;
use ssri::Integrity;
use super::{CreateNpmResolutionVerifierOptions, create_npm_resolution_verifier};
use super::{
CreateNpmResolutionVerifierOptions, create_npm_resolution_verifier, observed_dist_stats_sink,
};
const FAKE_INTEGRITY: &str = "sha512-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==";
@@ -53,6 +55,7 @@ fn default_opts(registry_url: &str) -> CreateNpmResolutionVerifierOptions {
// backoff (10 s + 60 s) on every run.
retry_opts: RetryOpts { retries: 0, ..RetryOpts::default() },
now: None,
observed_dist_stats: None,
}
}
@@ -1052,3 +1055,57 @@ async fn concurrent_verifications_share_one_fetch() {
}
abbreviated_mock.assert_async().await;
}
/// The binding check records each verified entry's `dist.unpackedSize`
/// and `dist.fileCount` into the `observed_dist_stats` sink when one is
/// provided.
#[tokio::test]
async fn binding_check_records_dist_stats_into_the_sink() {
let mut server = mockito::Server::new_async().await;
let registry = format!("{}/", server.url());
let server_url = server.url();
let tarball_url = format!("{server_url}/acme/-/acme-1.0.0.tgz");
let packument = serde_json::json!({
"name": "acme",
"dist-tags": { "latest": "1.0.0" },
"versions": {
"1.0.0": {
"name": "acme",
"version": "1.0.0",
"dist": {
"integrity": FAKE_INTEGRITY,
"tarball": tarball_url,
"unpackedSize": 123_456,
"fileCount": 42,
}
}
}
});
let _meta_mock = server
.mock("GET", "/acme")
.with_status(200)
.with_body(packument.to_string())
.create_async()
.await;
let sink = observed_dist_stats_sink();
let mut opts = default_opts(&registry);
opts.observed_dist_stats = Some(Arc::clone(&sink));
let verifier = create_npm_resolution_verifier(opts);
let resolution = LockfileResolution::Tarball(TarballResolution {
tarball: tarball_url.clone(),
integrity: Some(fake_integrity()),
git_hosted: None,
path: None,
});
let name: PkgName = "acme".parse().expect("parse");
let result = verifier.verify(&resolution, ctx(&name, "1.0.0")).await;
assert_eq!(result, ResolutionVerification::Ok);
let recorded = sink
.get(&("acme".to_string(), "1.0.0".to_string()))
.map(|entry| *entry.value())
.expect("stats recorded");
assert_eq!(recorded.unpacked_size, Some(123_456));
assert_eq!(recorded.file_count, Some(42));
}

View File

@@ -38,7 +38,8 @@ mod violation_codes;
mod workspace_pref_to_npm;
pub use create_npm_resolution_verifier::{
CreateNpmResolutionVerifierOptions, NpmResolutionVerifier, create_npm_resolution_verifier,
CreateNpmResolutionVerifierOptions, DistStats, NpmResolutionVerifier, ObservedDistStats,
create_npm_resolution_verifier, observed_dist_stats_sink,
};
pub use errors::FetchMetadataError;
pub use fetch_attestation_published_at::{FetchAttestationOptions, fetch_attestation_published_at};

View File

@@ -45,6 +45,15 @@ pub(crate) struct AbbreviatedMetaProjection {
pub modified: Option<String>,
/// version → `dist.tarball`; key presence means the version is published.
pub version_tarballs: Option<HashMap<String, String>>,
/// version → `dist` work statistics (`unpackedSize`, `fileCount`),
/// for the versions whose registry published either. Not part of
/// upstream's projection — carried so the verifier can surface
/// tarball work estimates to fetch scheduling (the verifier's
/// [`ObservedDistStats`] sink) without a second metadata
/// round-trip.
///
/// [`ObservedDistStats`]: crate::ObservedDistStats
pub version_dist_stats: Option<HashMap<String, crate::DistStats>>,
}
/// Slot map of singleflight cells. Outer mutex guards lookup/insert;

View File

@@ -11,7 +11,7 @@ use derive_more::{Display, Error, From};
use miette::Diagnostic;
use pacquet_fs::file_mode;
pub use pacquet_network::RetryOpts;
use pacquet_network::{AuthHeaders, ThrottledClient};
use pacquet_network::{AuthHeaders, ThrottledClient, UNPRIORITIZED};
use pacquet_reporter::{
FetchingProgressLog, FetchingProgressMessage, LogEvent, LogLevel, ProgressLog, ProgressMessage,
Reporter, RequestRetryError, RequestRetryLog,
@@ -1314,6 +1314,12 @@ pub struct DownloadTarballToStore<'a> {
pub verified_files_cache: SharedVerifiedFilesCache,
pub package_integrity: &'a Integrity,
pub package_unpacked_size: Option<usize>,
/// `dist.fileCount` when the registry published one. Combined with
/// `package_unpacked_size` into the download's queueing priority —
/// per-file pipeline overhead (CAS write syscalls, hashing) makes a
/// many-small-files package as slow to finish as a much larger
/// few-files one.
pub package_file_count: Option<usize>,
pub package_url: &'a str,
/// Stable identifier for the package, e.g. `"{name}@{version}"`. Paired
/// with `package_integrity` to form the `SQLite` index key per pnpm v11's
@@ -1504,6 +1510,7 @@ async fn fetch_and_extract_once<Reporter: self::Reporter>(
package_url: &str,
expected_integrity: Option<&Integrity>,
package_unpacked_size: Option<usize>,
download_priority: u64,
package_id: &str,
attempt: u32,
store_dir: &'static StoreDir,
@@ -1518,14 +1525,16 @@ async fn fetch_and_extract_once<Reporter: self::Reporter>(
// batch of futures `connect()` while previous bodies are still
// draining, breaking the bound on concurrent open sockets.
//
// `acquire_for_url` routes the request through the per-registry
// TLS-configured client when one is set for `package_url`'s
// nerf-darted prefix, falling back to the default client
// otherwise. Tarball hosts that differ from the metadata host
// still pick up the right per-registry client because the
// `acquire_for_url_with_priority` routes the request through the
// per-registry TLS-configured client when one is set for
// `package_url`'s nerf-darted prefix, falling back to the default
// client otherwise. Tarball hosts that differ from the metadata
// host still pick up the right per-registry client because the
// 5-step `pickSettingByUrl` lookup also matches on the tarball
// URL.
let client = http_client.acquire_for_url(package_url).await;
// URL. When the pool is saturated, the package with the most
// estimated pipeline work claims the next freed slot, so the
// longest download+extract jobs never start last.
let client = http_client.acquire_for_url_with_priority(package_url, download_priority).await;
let mut request = client.get(package_url);
// Match pnpm's tarball download path
// ([`remoteTarballFetcher.ts`](https://github.com/pnpm/pnpm/blob/601317e7a3/fetching/tarball-fetcher/src/remoteTarballFetcher.ts#L66-L70)):
@@ -1789,6 +1798,30 @@ fn progress_already_reported(progress_key: Option<(&SharedReportedProgressKeys,
progress_key.is_some_and(|(reported, key)| !reported.insert(key.to_owned()))
}
/// Byte-equivalent cost of one file's fixed pipeline overhead (the
/// CAS-write syscalls and hash setup paid per file regardless of its
/// size, ~75 µs against a pipeline that moves a byte through
/// download + decompress + hash + write in ~25 ns). Folding it into
/// the priority makes a many-small-files package rank as the long
/// job it actually is: extraction cost, not just transfer cost,
/// decides when a package's pipeline work finishes.
const PRIORITY_BYTES_PER_FILE: u64 = 3_000;
/// Queueing priority of a tarball download: the package's estimated
/// total pipeline work (transfer + decompress + hash + CAS writes) in
/// byte-equivalents. Missing hints contribute zero, so a package with
/// no published `dist` stats queues behind every estimated one.
#[must_use]
pub fn download_priority(unpacked_size: Option<usize>, file_count: Option<usize>) -> u64 {
let size = unpacked_size.map_or(0, |size| size as u64);
let per_file =
file_count.map_or(0, |count| (count as u64).saturating_mul(PRIORITY_BYTES_PER_FILE));
// `UNPRIORITIZED` (`u64::MAX`) is the latency-class sentinel; a
// hostile registry publishing absurd `dist` stats must not be able
// to saturate a download's priority into that class.
size.saturating_add(per_file).min(UNPRIORITIZED - 1)
}
// 9 arguments — over the default clippy threshold but each is
// distinct: client + URL + integrity describe the request, ID +
// requester are the reporter dimensions, unpacked-size is allocation
@@ -1804,6 +1837,7 @@ async fn fetch_and_extract_with_retry<Reporter: self::Reporter>(
package_url: &str,
expected_integrity: Option<&Integrity>,
package_unpacked_size: Option<usize>,
download_priority: u64,
package_id: &str,
requester: &str,
store_dir: &'static StoreDir,
@@ -1819,6 +1853,7 @@ async fn fetch_and_extract_with_retry<Reporter: self::Reporter>(
package_url,
expected_integrity,
package_unpacked_size,
download_priority,
package_id,
attempt,
store_dir,
@@ -2070,6 +2105,7 @@ impl<'a> DownloadTarballToStore<'a> {
store_dir,
package_integrity,
package_unpacked_size,
package_file_count,
package_url,
package_id,
requester,
@@ -2184,6 +2220,7 @@ impl<'a> DownloadTarballToStore<'a> {
package_url,
Some(package_integrity),
package_unpacked_size,
download_priority(package_unpacked_size, package_file_count),
package_id,
requester,
store_dir,
@@ -2269,11 +2306,15 @@ impl FetchTarballForResolution<'_> {
// `name@version` is only known once the manifest is read below,
// and the resolve-time fetch is silent (the install pass owns
// the reporter ordering), so the placeholder never surfaces.
// `UNPRIORITIZED`: this fetch gates the resolver's walk (a
// tarball dep's manifest comes from its archive), so like a
// packument fetch it must not queue behind sized downloads.
let (integrity, cas_paths, pkg_files_idx) = fetch_and_extract_with_retry::<Reporter>(
http_client,
package_url,
None,
None,
UNPRIORITIZED,
package_url,
package_url,
store_dir,

View File

@@ -1,10 +1,10 @@
use super::{
DownloadTarballToStore, HttpStatusError, MemCache, NetworkError, PrefetchedCasPaths, RetryOpts,
SharedReportedProgressKeys, TarballError, VerifyChecksumError, allocate_tarball_buffer,
extract_tarball_entries, extract_zip_entries, fetch_and_extract_with_retry, is_transient_error,
normalize_bundled_manifest, prefetch_cas_paths,
download_priority, extract_tarball_entries, extract_zip_entries, fetch_and_extract_with_retry,
is_transient_error, normalize_bundled_manifest, prefetch_cas_paths,
};
use pacquet_network::{AuthHeaders, ThrottledClient};
use pacquet_network::{AuthHeaders, ThrottledClient, UNPRIORITIZED};
use pacquet_reporter::SilentReporter;
use pacquet_store_dir::{
CafsFileInfo, PackageFilesIndex, SharedVerifiedFilesCache, StoreDir, StoreIndex,
@@ -169,6 +169,7 @@ async fn packages_under_orgs_should_work() {
verify_store_integrity: true,
package_integrity: &integrity("sha512-dj7vjIn1Ar8sVXj2yAXiMNCJDmS9MQ9XMlIecX2dIzzhjSHCyKo4DdXjXMs7wKW2kj6yvVRSpuQjOZ3YLrh56w=="),
package_unpacked_size: Some(16697),
package_file_count: None,
package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz",
package_id: "@fastify/error@3.3.0",
requester: "",
@@ -232,6 +233,7 @@ async fn network_fetch_records_progress_key() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: Some(16697),
package_file_count: None,
package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz",
package_id: pkg_id,
requester: "",
@@ -267,6 +269,7 @@ async fn should_throw_error_on_checksum_mismatch() {
verify_store_integrity: true,
package_integrity: &integrity("sha512-aaaan1Ar8sVXj2yAXiMNCJDmS9MQ9XMlIecX2dIzzhjSHCyKo4DdXjXMs7wKW2kj6yvVRSpuQjOZ3YLrh56w=="),
package_unpacked_size: Some(16697),
package_file_count: None,
package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz",
package_id: "@fastify/error@3.3.0",
requester: "",
@@ -345,6 +348,7 @@ async fn reuses_cached_cas_paths_when_index_entry_is_live() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
// Any request that reaches the network here would fail the
// test; the cache lookup must short-circuit before we get
// near it. `fast_fail_client` caps that at 1 s per side in
@@ -415,6 +419,7 @@ async fn reuses_prefetched_cas_paths_when_provided() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: "http://127.0.0.1:1/unreachable.tgz",
package_id: pkg_id,
requester: "",
@@ -645,6 +650,7 @@ async fn falls_through_when_cafs_file_missing() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: "http://127.0.0.1:1/unreachable.tgz",
package_id: pkg_id,
requester: "",
@@ -707,6 +713,7 @@ async fn falls_through_when_digest_is_malformed() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: "http://127.0.0.1:1/unreachable.tgz",
package_id: pkg_id,
requester: "",
@@ -772,6 +779,7 @@ async fn falls_through_when_cafs_path_is_a_directory() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: "http://127.0.0.1:1/unreachable.tgz",
package_id: pkg_id,
requester: "",
@@ -847,6 +855,7 @@ async fn falls_through_when_cafs_path_is_a_symlink() {
verify_store_integrity: true,
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: "http://127.0.0.1:1/unreachable.tgz",
package_id: pkg_id,
requester: "",
@@ -1126,6 +1135,7 @@ async fn retries_then_succeeds_on_transient_5xx() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1175,6 +1185,7 @@ async fn retries_integrity_mismatch_until_exhausted() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1208,6 +1219,7 @@ async fn fails_fast_on_404() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1250,6 +1262,7 @@ async fn retries_other_4xx_codes() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1286,6 +1299,7 @@ async fn retry_exhaustion_returns_last_error() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1409,6 +1423,7 @@ fn run_with_mem_cache_does_not_deadlock_on_dashmap_shard_contention() {
verify_store_integrity: true,
package_integrity: pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: url,
package_id: "fastify-error@3.3.0",
requester: "",
@@ -1482,6 +1497,7 @@ async fn zero_retries_makes_a_single_attempt() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1528,6 +1544,7 @@ async fn fetch_attaches_authorization_header_when_creds_match_tarball_url() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1583,6 +1600,7 @@ async fn retry_re_attaches_authorization_header_on_each_attempt() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -1658,6 +1676,7 @@ async fn mem_cache_hit_emits_found_in_store_against_callers_reporter() {
verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: "first@1.0.0",
requester: "/proj",
@@ -1686,6 +1705,7 @@ async fn mem_cache_hit_emits_found_in_store_against_callers_reporter() {
verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: "second@2.0.0",
requester: "/proj",
@@ -1781,6 +1801,7 @@ async fn mem_cache_hit_skips_package_status_when_progress_already_reported() {
verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: pkg_id,
requester: "/proj",
@@ -1818,6 +1839,7 @@ async fn mem_cache_hit_skips_package_status_when_progress_already_reported() {
verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: pkg_id,
requester: "/proj",
@@ -1901,6 +1923,7 @@ async fn run_with_mem_cache_recovers_from_owning_fetch_error() {
verified_files_cache: SharedVerifiedFilesCache::default(),
package_integrity: pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: url,
package_id: "deadlock@1.0.0",
requester: "/proj",
@@ -2005,6 +2028,7 @@ async fn fetching_progress_and_fetched_events_fire_during_download() {
&url,
Some(&pkg_integrity),
None,
0,
"@fastify/error@3.3.0",
"",
store_path,
@@ -2096,6 +2120,7 @@ async fn started_fires_for_connection_level_failures() {
"http://127.0.0.1:1/pkg.tgz", // port 1 is reserved → connect-refused
Some(&pkg_integrity),
None,
0,
"test-pkg",
"/proj",
store_path,
@@ -2186,6 +2211,7 @@ async fn found_in_store_event_fires_on_cache_hit() {
verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: "@fastify/error@3.3.0",
requester: "/proj",
@@ -2225,6 +2251,7 @@ async fn found_in_store_event_fires_on_cache_hit() {
verified_files_cache: SharedVerifiedFilesCache::clone(&verified_files_cache),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: "@fastify/error@3.3.0",
requester: "/proj",
@@ -2306,6 +2333,7 @@ async fn request_retry_event_fires_per_retried_attempt() {
&url,
Some(&pkg_integrity),
None,
0,
"test-pkg",
"",
store_path,
@@ -2618,6 +2646,7 @@ async fn offline_mode_skips_network_on_cache_miss() {
verified_files_cache: SharedVerifiedFilesCache::default(),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: pkg_id,
requester: "",
@@ -2689,6 +2718,7 @@ async fn offline_mode_still_uses_prefetched_cache() {
verified_files_cache: SharedVerifiedFilesCache::default(),
package_integrity: &pkg_integrity,
package_unpacked_size: None,
package_file_count: None,
package_url: &url,
package_id: pkg_id,
requester: "",
@@ -2892,3 +2922,13 @@ mod normalize_bundled_manifest_tests {
);
}
}
/// Saturated `dist` stats must not collide with the latency-class
/// sentinel (`UNPRIORITIZED`) — a hostile registry publishing absurd
/// sizes would otherwise reclassify its downloads as metadata.
#[test]
fn download_priority_never_reaches_the_latency_sentinel() {
let priority = download_priority(Some(usize::MAX), Some(usize::MAX));
assert!(priority < UNPRIORITIZED);
assert_eq!(priority, UNPRIORITIZED - 1);
}

View File

@@ -83,6 +83,16 @@ pub struct CliArgs {
#[clap(long, default_value_t = 0.0)]
pub registry_bandwidth_mbps: f64,
/// Model TCP slow start on the client↔registry link: each
/// connection ramps from a ~14.6 KB initial window toward
/// `--registry-bandwidth-mbps`, doubling per round trip, instead
/// of transmitting at the full cap from its first byte. Small and
/// mid-size tarballs then take the several round trips they cost
/// over a real link. Requires both `--registry-latency-ms` and
/// `--registry-bandwidth-mbps` to be set; no effect otherwise.
#[clap(long)]
pub registry_slow_start: bool,
/// Build each target without running the benchmark.
#[clap(long)]
pub build_only: bool,

View File

@@ -43,6 +43,15 @@ pub struct LinkProfile {
/// Per-direction throughput cap in bytes per second; `None` leaves
/// the direction at loopback speed (unlimited).
pub rate_limit: Option<u64>,
/// Model TCP slow start: each connection starts at an initial
/// congestion window (~14.6 KB) and its effective rate
/// (`cwnd ÷ RTT`) doubles per delivered window until it reaches
/// `rate_limit`. Without it a transfer runs at the full cap from
/// its first byte, which overstates real-TCP throughput for small
/// and mid-size objects — a transfer under ~10 windows never gets
/// near the link's capacity. Requires both `one_way > 0` (the ramp
/// is per-RTT) and a `rate_limit` (the ceiling); ignored otherwise.
pub slow_start: bool,
}
/// A running proxy. Drop stops accepting new connections and joins the
@@ -128,6 +137,13 @@ fn accept_loop(
/// client→server pump on a spawned one, so the connection's threads end
/// together when either side closes.
fn handle_connection(inbound: TcpStream, upstream: SocketAddr, profile: LinkProfile) {
// The accept listener is non-blocking, and BSD-derived platforms
// (macOS) hand accepted sockets the listener's O_NONBLOCK — the
// pump's blocking reads would then surface spurious `WouldBlock`
// errors and drop the connection before the first byte.
if inbound.set_nonblocking(false).is_err() {
return;
}
let Ok(outbound) = TcpStream::connect(upstream) else { return };
let _ = inbound.set_nodelay(true);
let _ = outbound.set_nodelay(true);
@@ -170,6 +186,7 @@ fn pump(mut src: TcpStream, mut dst: TcpStream, profile: LinkProfile) {
// by each chunk's serialization time (`len / rate`) so the cap is a
// sustained throughput limit, not just a per-chunk cap.
let mut link_free_at = Instant::now();
let mut slow_start = SlowStart::for_profile(&profile);
while let Ok((release, bytes)) = rx.recv() {
// A chunk leaves no earlier than its latency release *and* no
// earlier than the link finishing the previous chunk.
@@ -179,7 +196,11 @@ fn pump(mut src: TcpStream, mut dst: TcpStream, profile: LinkProfile) {
thread::sleep(send_at - now);
}
if let Some(rate) = profile.rate_limit {
link_free_at = send_at + Duration::from_secs_f64(bytes.len() as f64 / rate as f64);
let effective = match slow_start.as_mut() {
Some(ramp) => ramp.effective_rate(rate as f64, bytes.len()),
None => rate as f64,
};
link_free_at = send_at + Duration::from_secs_f64(bytes.len() as f64 / effective);
}
if dst.write_all(&bytes).is_err() {
break;
@@ -195,5 +216,46 @@ fn pump(mut src: TcpStream, mut dst: TcpStream, profile: LinkProfile) {
let _ = reader.join();
}
/// Initial congestion window, mirroring RFC 6928's 10 segments of
/// ~1460 bytes.
const INITIAL_CWND_BYTES: f64 = 14_600.0;
/// Per-connection slow-start state: effective rate is `cwnd ÷ RTT`,
/// and `cwnd` doubles each time a full window has been delivered,
/// until the rate reaches the link's cap. An approximation of real
/// TCP — no loss events, no congestion-avoidance phase — but it
/// reproduces the property that matters for install benchmarks:
/// per-connection throughput is a function of bytes already
/// delivered, so short transfers never reach the cap.
struct SlowStart {
cwnd: f64,
rtt_secs: f64,
bytes_in_round: f64,
}
impl SlowStart {
/// `Some` ramp when the profile asks for one and has the RTT +
/// cap the model needs; `None` keeps the flat-rate behavior.
fn for_profile(profile: &LinkProfile) -> Option<SlowStart> {
let rtt_secs = profile.one_way.as_secs_f64() * 2.0;
(profile.slow_start && rtt_secs > 0.0 && profile.rate_limit.is_some())
.then_some(SlowStart { cwnd: INITIAL_CWND_BYTES, rtt_secs, bytes_in_round: 0.0 })
}
/// The rate (bytes/sec) at which the next `len`-byte chunk
/// serializes, advancing the window-growth bookkeeping.
fn effective_rate(&mut self, cap: f64, len: usize) -> f64 {
let rate = (self.cwnd / self.rtt_secs).min(cap);
self.bytes_in_round += len as f64;
if self.bytes_in_round >= self.cwnd {
self.bytes_in_round = 0.0;
// Stop growing once the window sustains the cap
// (`cap × RTT` is the bandwidth-delay product).
self.cwnd = (self.cwnd * 2.0).min((cap * self.rtt_secs).max(INITIAL_CWND_BYTES));
}
rate
}
}
#[cfg(test)]
mod tests;

View File

@@ -22,7 +22,7 @@ fn binds_to_requested_listen_addr() {
let listen = reserved.local_addr().expect("listen addr");
drop(reserved);
let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: None };
let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: None, slow_start: false };
let proxy = LatencyProxy::spawn_on(listen, upstream_addr, profile).expect("spawn proxy");
assert_eq!(proxy.addr, listen);
@@ -56,7 +56,8 @@ fn injects_round_trip_latency() {
socket.write_all(b"pong").expect("write reply");
});
let profile = LinkProfile { one_way: Duration::from_millis(60), rate_limit: None };
let profile =
LinkProfile { one_way: Duration::from_millis(60), rate_limit: None, slow_start: false };
let proxy = LatencyProxy::spawn(upstream_addr, profile).expect("spawn proxy");
let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy");
@@ -95,7 +96,8 @@ fn caps_throughput_to_the_rate_limit() {
// No latency, only a bandwidth cap, so the wall time is the
// serialization delay alone.
let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: Some(RATE) };
let profile =
LinkProfile { one_way: Duration::ZERO, rate_limit: Some(RATE), slow_start: false };
let proxy = LatencyProxy::spawn(upstream_addr, profile).expect("spawn proxy");
let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy");
@@ -121,3 +123,55 @@ fn caps_throughput_to_the_rate_limit() {
"transfer {elapsed:?} should reflect the ~0.26s bandwidth cap",
);
}
/// With slow start the same transfer takes the ramp-up rounds a real
/// TCP connection would: the early windows serialize at `cwnd ÷ RTT`,
/// far below the cap, so the wall time exceeds the flat-rate model's
/// `size ÷ cap` by several round trips.
#[test]
fn slow_start_ramps_per_connection_throughput() {
const PAYLOAD: usize = 256 * 1024;
const RATE: u64 = 10_000_000; // 10 MB/s cap
let timed_transfer = |slow_start: bool| {
let upstream = TcpListener::bind(("127.0.0.1", 0)).expect("bind upstream");
let upstream_addr = upstream.local_addr().expect("upstream addr");
thread::spawn(move || {
let (mut socket, _) = upstream.accept().expect("accept");
let mut buf = [0u8; 64];
let _ = socket.read(&mut buf).expect("read request");
socket.write_all(&vec![0u8; PAYLOAD]).expect("write payload");
});
let profile =
LinkProfile { one_way: Duration::from_millis(20), rate_limit: Some(RATE), slow_start };
let proxy = LatencyProxy::spawn(upstream_addr, profile).expect("spawn proxy");
let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy");
let start = Instant::now();
client.write_all(b"go").expect("send request");
let mut received = 0;
let mut buf = [0u8; 16 * 1024];
loop {
match client.read(&mut buf) {
Ok(0) => break,
Ok(n) => received += n,
Err(_) => break,
}
}
assert_eq!(received, PAYLOAD, "the whole payload should arrive");
start.elapsed()
};
let flat = timed_transfer(false);
let ramped = timed_transfer(true);
dbg!(flat, ramped);
// Flat: ~2×20ms latency + 256KiB/10MB/s ≈ 66 ms. Ramped: the first
// windows (14.6 KB and doubling) each serialize at cwnd/RTT, adding ~3-4
// window-times before the rate approaches the cap. Require a solid
// margin rather than exact math to stay robust under CI jitter.
assert!(
ramped > flat + Duration::from_millis(60),
"slow start should add ramp-up time: flat {flat:?} vs ramped {ramped:?}",
);
}

View File

@@ -33,6 +33,7 @@ async fn main() {
registry_latency_ms,
pnpr_server_registry_latency_ms,
registry_bandwidth_mbps,
registry_slow_start,
reuse_prebuilt_binaries,
build_only,
targets,
@@ -102,6 +103,7 @@ async fn main() {
let profile = LinkProfile {
one_way: Duration::from_millis(registry_latency_ms) / 2,
rate_limit: registry_rate_limit,
slow_start: registry_slow_start,
};
let proxy =
LatencyProxy::spawn_on(listen, upstream, profile).expect("spawn registry proxy");
@@ -169,6 +171,7 @@ async fn main() {
registry_latency_ms,
pnpr_server_registry_latency_ms,
registry_bandwidth_mbps,
registry_slow_start,
registry_port: spawned_registry_port,
reuse_prebuilt_binaries,
};

View File

@@ -68,6 +68,7 @@ pub struct WorkEnv {
/// of being free on loopback. `0` leaves the registry at loopback
/// speed. Ignored in `--registry=npm` mode (already remote).
pub registry_bandwidth_mbps: f64,
pub registry_slow_start: bool,
/// Port the local registry listens on, used as the proxy's upstream
/// when latency or a bandwidth cap is requested.
pub registry_port: u16,
@@ -549,6 +550,7 @@ impl WorkEnv {
let profile = LinkProfile {
one_way: Duration::from_millis(self.pnpr_latency_ms) / 2,
rate_limit: None,
slow_start: false,
};
let proxy = LatencyProxy::spawn(upstream, profile).expect("spawn pnpr latency proxy");
let proxy_url = format!("http://{}", proxy.addr);
@@ -648,6 +650,7 @@ impl WorkEnv {
let profile = LinkProfile {
one_way: Duration::from_millis(self.registry_latency_ms) / 2,
rate_limit,
slow_start: self.registry_slow_start,
};
let proxy = LatencyProxy::spawn(upstream, profile).expect("spawn registry proxy");
eprintln!(
@@ -675,6 +678,7 @@ impl WorkEnv {
let profile = LinkProfile {
one_way: Duration::from_millis(self.pnpr_server_registry_latency_ms) / 2,
rate_limit: None,
slow_start: false,
};
let proxy =
LatencyProxy::spawn(upstream, profile).expect("spawn pnpr server registry proxy");

View File

@@ -45,6 +45,7 @@ fn bench_tarball(criterion: &mut Criterion, server: &mut ServerGuard, fixtures_f
verified_files_cache: pacquet_store_dir::SharedVerifiedFilesCache::default(),
package_integrity: &package_integrity,
package_unpacked_size: Some(16697),
package_file_count: None,
package_url: url,
package_id: "fast-querystring@1.0.0",
requester: "",

View File

@@ -55,11 +55,15 @@ use axum::{
};
use indexmap::IndexMap;
use pacquet_config::Config as PacquetConfig;
use pacquet_lockfile::Lockfile;
use pacquet_lockfile::{Lockfile, LockfileResolution};
use pacquet_lockfile_verification::{collect_resolution_policy_violations, hash_lockfile};
use pacquet_network::{AuthHeaders, ThrottledClient};
use pacquet_package_manager::build_resolution_verifiers;
use pacquet_resolving_npm_resolver::{InMemoryPackageMetaCache, PackageMetaCache};
use pacquet_package_manager::{
ResolvedPackageHint, build_resolution_verifiers, tarball_url_and_integrity,
};
use pacquet_resolving_npm_resolver::{
InMemoryPackageMetaCache, ObservedDistStats, PackageMetaCache, observed_dist_stats_sink,
};
use pacquet_resolving_resolver_base::ResolutionVerifier;
use pacquet_store_dir::StoreDir;
use sha2::{Digest, Sha256};
@@ -224,23 +228,32 @@ pub(crate) async fn handle_resolve(runtime: &Resolver, body: Bytes) -> Response
// opt-out (mirrors the local path's `--trust-lockfile`). Freshly-
// resolved entries are held to the same policy by the resolver's
// pick-time gate (the policy is wired into `config`).
let mut verified_dist_stats = None;
if !request.trust_lockfile
&& let Some(input_lockfile) = request.lockfile.as_ref()
&& let Err(failure) =
verify_input_lockfile(runtime, config, &request_auth, input_lockfile).await
{
return match failure {
VerifyFailure::Internal(response) => response,
VerifyFailure::Violations(violations) => {
ndjson_single_frame(&violations_frame(&violations))
match verify_input_lockfile(runtime, config, &request_auth, input_lockfile).await {
Ok(stats) => verified_dist_stats = stats,
Err(VerifyFailure::Internal(response)) => return response,
Err(VerifyFailure::Violations(violations)) => {
return ndjson_single_frame(&violations_frame(&violations));
}
};
}
}
// Short-circuit paths that produce the whole lockfile without an
// incremental tree walk: nothing to stream, so emit only `done`.
// incremental tree walk. A verified frozen lockfile still announces
// its tarballs as `package` frames when the verification fan-out
// just fetched their metadata — the sizes let the client start the
// largest downloads first. On a verdict-cache hit no metadata was
// fetched, so there's nothing to add and the response is the bare
// `done` frame.
if let Some(lockfile) = resolve::fresh_frozen_input_lockfile(config, &request) {
return ndjson_single_frame(&done_frame(&lockfile));
let mut frames = verified_dist_stats
.map(|sizes| frozen_package_frames(config, &lockfile, &sizes))
.unwrap_or_default();
frames.push(done_frame(&lockfile));
return ndjson_frames(&frames);
}
let resolution_cache_key = if request.auth_headers.is_empty() && request.lockfile.is_none() {
resolution_cache_key(config, &request)
@@ -376,20 +389,92 @@ struct StreamObserver {
impl pacquet_package_manager::ResolutionObserver for StreamObserver {
fn on_resolved(&self, hint: pacquet_package_manager::ResolvedPackageHint<'_>) {
let frame = serde_json::json!({
"type": "package",
"id": hint.id,
"name": hint.name,
"version": hint.version,
"integrity": hint.integrity,
"tarball": hint.tarball_url,
});
if let Ok(line) = ndjson_line(&frame) {
if let Ok(line) = ndjson_line(&package_frame(&hint)) {
let _ = self.tx.send(line);
}
}
}
/// One `package` NDJSON frame. `unpackedSize` is omitted (not null)
/// when the registry never published a `dist.unpackedSize`, so older
/// clients parse the frame unchanged.
fn package_frame(hint: &ResolvedPackageHint<'_>) -> serde_json::Value {
let mut frame = serde_json::json!({
"type": "package",
"id": hint.id,
"name": hint.name,
"version": hint.version,
"integrity": hint.integrity,
"tarball": hint.tarball_url,
});
if let Some(size) = hint.unpacked_size {
frame["unpackedSize"] = serde_json::Value::from(size);
}
if let Some(count) = hint.file_count {
frame["fileCount"] = serde_json::Value::from(count);
}
frame
}
/// `package` frames for every tarball-fetchable entry of a verified
/// frozen lockfile, deduplicated by tarball URL. Mirrors what the
/// streaming resolve's [`StreamObserver`] would have announced had the
/// tree walk run: the client prefetches each tarball on arrival, with
/// `unpackedSize` (from the verification fan-out's metadata, when the
/// registry published one) prioritizing the largest downloads.
///
/// Tarball URLs are derived with the same
/// [`tarball_url_and_integrity`] the client's frozen materialization
/// uses, so the announced URLs match the client's mem-cache keys
/// byte-for-byte. Non-tarball resolutions (git, directory, binary,
/// variations) are skipped — the client fetches those through their
/// own protocol paths.
fn frozen_package_frames(
config: &PacquetConfig,
lockfile: &Lockfile,
dist_stats: &ObservedDistStats,
) -> Vec<Vec<u8>> {
let Some(packages) = lockfile.packages.as_ref() else {
return Vec::new();
};
let mut seen_urls = std::collections::HashSet::new();
let mut frames = Vec::new();
for (package_key, snapshot) in packages {
if !matches!(
snapshot.resolution,
LockfileResolution::Registry(_) | LockfileResolution::Tarball(_),
) {
continue;
}
let Ok((tarball_url, integrity)) =
tarball_url_and_integrity(&snapshot.resolution, package_key, config)
else {
continue;
};
if !seen_urls.insert(tarball_url.to_string()) {
continue;
}
let name = package_key.name.to_string();
let version = package_key.suffix.version().to_string();
let id = format!("{name}@{version}");
let integrity = integrity.to_string();
let stats = dist_stats.get(&(name.clone(), version.clone())).map(|entry| *entry.value());
let frame = package_frame(&ResolvedPackageHint {
id: &id,
name: &name,
version: &version,
integrity: &integrity,
tarball_url: &tarball_url,
unpacked_size: stats.and_then(|stats| stats.unpacked_size),
file_count: stats.and_then(|stats| stats.file_count),
});
if let Ok(line) = ndjson_line(&frame) {
frames.push(line);
}
}
frames
}
/// Terminal `done` frame: the full resolved lockfile + stats. The client
/// writes the lockfile and fetches every tarball itself.
fn done_frame(lockfile: &Lockfile) -> Vec<u8> {
@@ -441,6 +526,17 @@ fn ndjson_single_frame(frame: &[u8]) -> Response {
.expect("binary response is always valid")
}
/// A 200 NDJSON response carrying several already-serialized frames in
/// one fixed body. Used by the frozen fast path, where every frame is
/// known up front — no channel to stream from.
fn ndjson_frames(frames: &[Vec<u8>]) -> Response {
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, NDJSON_CONTENT_TYPE)
.body(Body::from(frames.concat()))
.expect("binary response is always valid")
}
/// A 200 NDJSON response whose body drains the frame channel as the
/// detached resolve task produces frames. Closing the channel (the task
/// dropped its sender) ends the body.
@@ -465,25 +561,29 @@ enum VerifyFailure {
}
/// Verify the client's input lockfile under the client's policy. On a
/// clean pass returns `Ok(())`; on a policy violation returns the
/// rendered violations so the caller can deliver them to the client. A
/// build-verifiers failure (e.g. an invalid exclude pattern) returns a
/// ready-made 500.
/// clean pass returns the [`ObservedDistStats`] the verifier
/// collected — `None` when the whole-lockfile verdict cache satisfied
/// the check without a fan-out (no metadata was fetched, so no sizes
/// exist). On a policy violation returns the rendered violations so
/// the caller can deliver them to the client. A build-verifiers
/// failure (e.g. an invalid exclude pattern) returns a ready-made 500.
async fn verify_input_lockfile(
runtime: &Resolver,
config: &'static PacquetConfig,
auth_headers: &Arc<AuthHeaders>,
lockfile: &Lockfile,
) -> Result<(), VerifyFailure> {
) -> Result<Option<ObservedDistStats>, VerifyFailure> {
// A fresh per-request packument cache shared with the verifier; the
// on-disk metadata mirror under `<cache_dir>/v11/metadata-full` is
// warm across requests and is the real verification cache.
let meta_cache = Arc::new(InMemoryPackageMetaCache::default());
let dist_stats = observed_dist_stats_sink();
let verifiers = build_resolution_verifiers(
config,
Arc::clone(&runtime.client),
Some(meta_cache as Arc<dyn PackageMetaCache>),
Some(Arc::clone(auth_headers)),
Some(Arc::clone(&dist_stats)),
)
.map_err(|err| {
VerifyFailure::Internal(json_error(StatusCode::INTERNAL_SERVER_ERROR, &err.to_string()))
@@ -499,7 +599,7 @@ async fn verify_input_lockfile(
verifiers.iter().all(|verifier| verifier.can_trust_past_check(policy))
})
{
return Ok(());
return Ok(None);
}
let violations = collect_resolution_policy_violations(lockfile, &verifiers, None).await;
@@ -507,7 +607,7 @@ async fn verify_input_lockfile(
if let Some(cache) = runtime.verdict_cache.as_ref() {
cache.record(&hash, &merge_policies(&verifiers));
}
return Ok(());
return Ok(Some(dist_stats));
}
let rendered: Vec<serde_json::Value> = violations

View File

@@ -60,3 +60,77 @@ fn resolution_cache_key_changes_with_dependencies_and_policy() {
assert_ne!(base_key, resolution_cache_key(&config, &different_dep));
assert_ne!(base_key, resolution_cache_key(&config, &different_policy));
}
#[test]
fn a_package_frame_carries_unpacked_size_and_omits_it_when_unknown() {
use pacquet_package_manager::{ResolutionObserver, ResolvedPackageHint};
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let observer = super::StreamObserver { tx };
let hint = |unpacked_size, file_count| ResolvedPackageHint {
id: "acme@1.0.0",
name: "acme",
version: "1.0.0",
integrity: "sha512-abc",
tarball_url: "https://r.test/acme/-/acme-1.0.0.tgz",
unpacked_size,
file_count,
};
observer.on_resolved(hint(Some(123_456), Some(42)));
observer.on_resolved(hint(None, None));
let sized: serde_json::Value =
serde_json::from_slice(&rx.try_recv().expect("sized frame sent")).unwrap();
assert_eq!(sized["unpackedSize"], serde_json::json!(123_456));
assert_eq!(sized["fileCount"], serde_json::json!(42));
let unsized_frame: serde_json::Value =
serde_json::from_slice(&rx.try_recv().expect("unsized frame sent")).unwrap();
dbg!(&unsized_frame);
assert!(unsized_frame.get("unpackedSize").is_none());
assert!(unsized_frame.get("fileCount").is_none());
assert_eq!(unsized_frame["tarball"], serde_json::json!("https://r.test/acme/-/acme-1.0.0.tgz"));
}
#[test]
fn frozen_package_frames_announce_lockfile_tarballs_with_sizes() {
use pacquet_lockfile::Lockfile;
use pacquet_resolving_npm_resolver::{DistStats, observed_dist_stats_sink};
let lockfile: Lockfile = serde_json::from_value(serde_json::json!({
"lockfileVersion": "9.0",
"importers": {
".": { "dependencies": { "acme": { "specifier": "^1.0.0", "version": "1.0.0" } } }
},
"packages": {
"acme@1.0.0": {
"resolution": { "integrity": "sha512-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==" }
},
"linked-dir@1.0.0": {
"resolution": { "type": "directory", "directory": "../linked-dir" }
}
}
}))
.expect("lockfile parses");
let stats = observed_dist_stats_sink();
stats.insert(
("acme".to_string(), "1.0.0".to_string()),
DistStats { unpacked_size: Some(123_456), file_count: Some(42) },
);
let frames = super::frozen_package_frames(&config(), &lockfile, &stats);
dbg!(frames.len());
assert_eq!(frames.len(), 1);
let frame: serde_json::Value = serde_json::from_slice(&frames[0]).unwrap();
assert_eq!(frame["type"], serde_json::json!("package"));
assert_eq!(frame["id"], serde_json::json!("acme@1.0.0"));
assert_eq!(
frame["tarball"],
serde_json::json!("https://registry.example.test/acme/-/acme-1.0.0.tgz"),
);
assert_eq!(frame["unpackedSize"], serde_json::json!(123_456));
assert_eq!(frame["fileCount"], serde_json::json!(42));
}

View File

@@ -331,11 +331,6 @@ pub fn abbreviate_packument(packument: &Value, now: DateTime<Utc>) -> Value {
/// * `npm-signature` — the legacy PGP detached signature. npm stopped
/// populating it years ago in favour of the ECDSA `signatures`, and
/// nothing in pnpm or pacquet reads it.
/// * `fileCount` — read nowhere in pnpm or pacquet.
/// * `unpackedSize` — only `pnpm view` reads it, and that command
/// fetches the *full* metadata document (`fullMetadata: true`) which
/// pnpr serves unstripped, so dropping it from the abbreviated form
/// is safe.
/// * `shasum` — the legacy sha1 hash, redundant once `integrity` (SRI)
/// is present. "Present" mirrors pnpm's `getIntegrity` truthiness
/// check (`if (dist.integrity)`): a non-empty string. An absent,
@@ -346,13 +341,17 @@ pub fn abbreviate_packument(packument: &Value, now: DateTime<Utc>) -> Value {
/// preserved: it binds `name@version:integrity` to the upstream
/// registry's key and is the input to a potential client-side
/// install-time verification on the pnpr path.
///
/// `unpackedSize` and `fileCount` are preserved: pacquet reads both
/// off the resolver-fetched manifest — `unpackedSize` sizes the
/// decompression buffer, and together they form the download's
/// queueing priority (the estimated pipeline work that starts the
/// most expensive tarballs first).
fn trim_dist_fields(version: &mut serde_json::Map<String, Value>) {
let Some(dist) = version.get_mut("dist").and_then(Value::as_object_mut) else {
return;
};
dist.remove("npm-signature");
dist.remove("fileCount");
dist.remove("unpackedSize");
if dist.get("integrity").and_then(Value::as_str).is_some_and(|integrity| !integrity.is_empty())
{
dist.remove("shasum");

View File

@@ -338,12 +338,13 @@ fn abbreviation_drops_fields_the_resolver_ignores() {
// `shasum` dropped because `integrity` is present.
assert_eq!(version["dist"]["integrity"], "sha512-abc");
assert!(version["dist"].get("shasum").is_none());
// Legacy PGP signature and unused size fields dropped; ECDSA
// registry signatures kept.
// Legacy PGP signature dropped; ECDSA registry signatures kept.
assert!(version["dist"].get("npm-signature").is_none());
assert!(version["dist"].get("fileCount").is_none());
assert!(version["dist"].get("unpackedSize").is_none());
assert_eq!(version["dist"]["signatures"][0]["keyid"], "SHA256:xyz");
// Size hints kept: pacquet reads both for decompression
// preallocation and download scheduling.
assert_eq!(version["dist"]["fileCount"], 12);
assert_eq!(version["dist"]["unpackedSize"], 34567);
}
#[test]