perf(pacquet/resolving-npm-resolver): dedup concurrent packument fetches

Pacquet was firing N concurrent HTTP GETs for the same packument
whenever cross-referencing deps reached `pick_package` from
different points in the resolve tree (e.g. every `react-*` dep
racing for `react`). With no per-key serializer between the
in-memory cache miss check and the network fetch, all N concurrent
callers fell through to step 5 in parallel, queued behind the 16-64
slot `ThrottledClient` semaphore, and re-fetched the same packument
N times.

Upstream pnpm avoids this with `runLimited(pkgMirror, …)` — a
`pLimit(1)` keyed on the on-disk mirror path that wraps the entire
post-cache-miss flow. Only the first caller for a given packument
hits the network; the rest wait on the limit, then short-circuit on
the in-memory cache the winner just populated. See
`resolving/npm-resolver/src/pickPackage.ts:42-64`.

This change ports the same shape:

- `PackumentFetchLocker = Arc<DashMap<String, Arc<Semaphore>>>`
  threaded through `PickPackageContext::fetch_locker`. Constructed
  via `shared_packument_fetch_locker()` once per install and shared
  between `NpmResolver` and `NamedRegistryResolver` so concurrent
  picks across the two resolvers also coalesce.
- After step 1's in-memory cache miss check, `pick_package`
  acquires a single-permit semaphore keyed on the same string the
  cache uses (`{registry}\x00{name}[:full]`). After acquiring,
  re-checks the in-memory cache so a duplicate caller short-circuits
  on the winner's `meta_cache.set` instead of re-fetching.
- `handle_cache_hit` extracts the shared hit-then-upgrade-and-persist
  logic so the pre-permit and post-permit paths can't drift.

Test: `concurrent_picks_for_same_key_share_one_network_fetch` spawns
20 concurrent picks against a `mock.expect(1)` server. Without the
dedup, 20 GETs reach the registry; with it, exactly 1.

Closes #11832 (with the prefetch in the previous commit).
This commit is contained in:
Zoltan Kochan
2026-05-21 21:48:59 +02:00
parent f375c9161e
commit 386a90b5c1
12 changed files with 281 additions and 30 deletions

1
Cargo.lock generated
View File

@@ -2650,6 +2650,7 @@ dependencies = [
"chrono",
"dashmap",
"derive_more",
"futures-util",
"indexmap",
"miette 7.6.0",
"mockito",

View File

@@ -4,7 +4,9 @@ use pacquet_lockfile::{LockfileResolution, TarballResolution};
use pacquet_network::ThrottledClient;
use pacquet_registry_mock::AutoMockInstance;
use pacquet_reporter::{LogEvent, ProgressMessage, Reporter, SilentReporter};
use pacquet_resolving_npm_resolver::{InMemoryPackageMetaCache, NpmResolver};
use pacquet_resolving_npm_resolver::{
InMemoryPackageMetaCache, NpmResolver, shared_packument_fetch_locker,
};
use pacquet_resolving_resolver_base::{ResolveOptions, ResolveResult, Resolver, WantedDependency};
use pacquet_store_dir::{SharedVerifiedFilesCache, StoreDir};
use pipe_trait::Pipe;
@@ -97,6 +99,7 @@ async fn resolve_via_mock(
http_client,
auth_headers: Default::default(),
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
fetch_locker: shared_packument_fetch_locker(),
cache_dir: Some(cache_dir.to_path_buf()),
offline: false,
prefer_offline: false,

View File

@@ -30,7 +30,7 @@ use pacquet_resolving_local_resolver::{
};
use pacquet_resolving_npm_resolver::{
InMemoryPackageMetaCache, MergeNamedRegistriesError, NamedRegistryResolver, NpmResolver,
merge_named_registries,
merge_named_registries, shared_packument_fetch_locker,
};
use pacquet_resolving_resolver_base::{
LatestQuery, ResolveFuture, ResolveLatestFuture, ResolveOptions, Resolver, WantedDependency,
@@ -289,12 +289,21 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
let meta_cache = Arc::new(InMemoryPackageMetaCache::default());
// One per-cache-key packument fetch serializer shared between
// the npm and named-registry resolvers. Ports upstream's
// [`metafileOperationLimits`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L42-L44):
// concurrent picks for the same `(registry, name)` coalesce
// into a single network fetch instead of firing N parallel
// HTTP GETs queued behind the `ThrottledClient` semaphore.
let fetch_locker = shared_packument_fetch_locker();
let npm_resolver: Arc<dyn Resolver> = Arc::new(NpmResolver {
registries,
named_registries: merged_named_registries.clone(),
http_client: Arc::clone(&http_client_arc),
auth_headers: Arc::clone(&config.auth_headers),
meta_cache: Arc::clone(&meta_cache),
fetch_locker: Arc::clone(&fetch_locker),
cache_dir: Some(config.cache_dir.clone()),
offline: config.offline,
prefer_offline: config.prefer_offline,
@@ -334,6 +343,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
http_client: Arc::clone(&http_client_arc),
auth_headers: Arc::clone(&config.auth_headers),
meta_cache: Arc::clone(&meta_cache),
fetch_locker: Arc::clone(&fetch_locker),
cache_dir: Some(config.cache_dir.clone()),
offline: config.offline,
prefer_offline: config.prefer_offline,

View File

@@ -38,6 +38,7 @@ tracing = { workspace = true }
pacquet-resolving-default-resolver = { workspace = true }
pacquet-resolving-local-resolver = { workspace = true }
futures-util = { workspace = true }
mockito = { workspace = true }
pretty_assertions = { workspace = true }
ssri = { workspace = true }

View File

@@ -57,9 +57,9 @@ pub use parse_bare_specifier::{
parse_named_registry_specifier_to_registry_package_spec,
};
pub use pick_package::{
InMemoryPackageMetaCache, MirrorPersistError, PackageMetaCache, PickPackageContext,
PickPackageError, PickPackageOptions, PickPackageResult, persist_meta_to_mirror, pick_package,
shared_in_memory_cache,
InMemoryPackageMetaCache, MirrorPersistError, PackageMetaCache, PackumentFetchLocker,
PickPackageContext, PickPackageError, PickPackageOptions, PickPackageResult,
persist_meta_to_mirror, pick_package, shared_in_memory_cache, shared_packument_fetch_locker,
};
pub use pick_package_from_meta::{
PickPackageFromMetaError, PickPackageFromMetaOptions, PickVersionByVersionRangeOptions,

View File

@@ -69,6 +69,11 @@ pub struct NamedRegistryResolver<Cache: PackageMetaCache> {
pub http_client: Arc<ThrottledClient>,
pub auth_headers: Arc<AuthHeaders>,
pub meta_cache: Arc<Cache>,
/// Shared per-cache-key packument fetch serializer. See
/// [`crate::PackumentFetchLocker`]. Same handle as the sibling
/// [`crate::NpmResolver`] so concurrent picks for the same
/// `(registry, name)` across resolvers coalesce.
pub fetch_locker: crate::PackumentFetchLocker,
pub cache_dir: Option<PathBuf>,
pub offline: bool,
pub prefer_offline: bool,
@@ -200,6 +205,7 @@ impl<Cache: PackageMetaCache + 'static> NamedRegistryResolver<Cache> {
http_client: &self.http_client,
auth_headers: &self.auth_headers,
meta_cache: self.meta_cache.as_ref(),
fetch_locker: &self.fetch_locker,
cache_dir: self.cache_dir.as_deref(),
offline: self.offline,
prefer_offline: self.prefer_offline,

View File

@@ -10,7 +10,8 @@ use pretty_assertions::assert_eq;
use tempfile::TempDir;
use crate::{
NamedRegistryResolver, merge_named_registries, pick_package::InMemoryPackageMetaCache,
NamedRegistryResolver, merge_named_registries,
pick_package::{InMemoryPackageMetaCache, shared_packument_fetch_locker},
};
/// Packument for `@acme/private` served under a named registry —
@@ -68,6 +69,7 @@ fn build_resolver(
http_client: Arc::new(ThrottledClient::default()),
auth_headers: Arc::new(AuthHeaders::default()),
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
fetch_locker: shared_packument_fetch_locker(),
cache_dir: Some(cache_dir.path().to_path_buf()),
offline: false,
prefer_offline: false,

View File

@@ -93,6 +93,12 @@ pub struct NpmResolver<Cache: PackageMetaCache> {
pub http_client: Arc<ThrottledClient>,
pub auth_headers: Arc<AuthHeaders>,
pub meta_cache: Arc<Cache>,
/// Per-cache-key packument fetch serializer. Shared across this
/// resolver and the sibling [`crate::NamedRegistryResolver`] so
/// concurrent picks for the same `(registry, name)` coalesce
/// into one network fetch. Construct via
/// [`crate::shared_packument_fetch_locker`] once per install.
pub fetch_locker: crate::PackumentFetchLocker,
/// Root of the on-disk metadata mirror. `None` disables every
/// disk read/write — the picker goes straight to the network on
/// each cache miss.
@@ -293,6 +299,7 @@ impl<Cache: PackageMetaCache + 'static> NpmResolver<Cache> {
http_client: &self.http_client,
auth_headers: &self.auth_headers,
meta_cache: self.meta_cache.as_ref(),
fetch_locker: &self.fetch_locker,
cache_dir: self.cache_dir.as_deref(),
offline: self.offline,
prefer_offline: self.prefer_offline,

View File

@@ -10,7 +10,8 @@ use pretty_assertions::assert_eq;
use tempfile::TempDir;
use crate::{
npm_resolver::NpmResolver, pick_package::InMemoryPackageMetaCache,
npm_resolver::NpmResolver,
pick_package::{InMemoryPackageMetaCache, shared_packument_fetch_locker},
violation_codes::MINIMUM_RELEASE_AGE_VIOLATION_CODE,
};
@@ -60,6 +61,7 @@ fn build_resolver_with_registries(
http_client: Arc::new(ThrottledClient::default()),
auth_headers: Arc::new(AuthHeaders::default()),
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
fetch_locker: shared_packument_fetch_locker(),
cache_dir: Some(cache_dir.path().to_path_buf()),
offline: false,
prefer_offline: false,

View File

@@ -30,13 +30,25 @@
//! silently degrading to its warn-and-skip fallback. Ports upstream's
//! [`maybeUpgradeAbbreviatedMetaForReleaseAge`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L450-L501).
//!
//! Concurrency: upstream uses `p-limit(1)` keyed on the mirror path
//! to serialize disk operations. Pacquet relies on the atomic
//! rename in [`crate::mirror::save_meta`] for write safety, and on
//! [`std::sync::Mutex`]-guarded in-memory caches for reader
//! coordination. The per-mirror limiter is omitted; if a future
//! issue forces serialization (Windows file-lock contention, e.g.)
//! it would land here as a map of `tokio::sync::Mutex` values.
//! Concurrency: upstream's
//! [`runLimited(pkgMirror, …)`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52)
//! wraps the post-cache-miss flow in a `pLimit(1)` keyed by the
//! mirror path so concurrent picks for the same package coalesce
//! into a single network fetch — the rest wait, then re-check the
//! in-memory cache that the winner just populated and short-circuit
//! without hitting the registry. Pacquet ports this via
//! [`PackumentFetchLocker`], a [`DashMap<String, Arc<Semaphore>>`]
//! threaded through [`PickPackageContext::fetch_locker`]: the first
//! caller for a given cache key acquires the per-key permit and
//! does the disk + network work; subsequent callers wait on the
//! permit and (per pnpm's runLimited semantics) re-check
//! [`PackageMetaCache`] after acquiring so the winner's
//! [`PackageMetaCache::set`] short-circuits the rest. Without this,
//! pacquet was firing N concurrent HTTP GETs for the same packument
//! per cluster of cross-referencing deps, queued behind the
//! `ThrottledClient` semaphore — multiplying packument-fetch
//! wall-clock by the dedup factor and putting the resolve walk
//! 3-5× behind pnpm on the `alotta-files` benchmark.
use std::{
collections::HashMap,
@@ -45,12 +57,14 @@ use std::{
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use derive_more::{Display, Error};
use miette::Diagnostic;
use pacquet_config::version_policy::PackageVersionPolicy;
use pacquet_network::{AuthHeaders, ThrottledClient};
use pacquet_registry::{Package, PackageVersion};
use pacquet_resolving_resolver_base::VersionSelectors;
use tokio::sync::Semaphore;
use crate::{
FetchFullMetadataCachedOptions, FetchFullMetadataOptions, FetchMetadataError,
@@ -86,6 +100,36 @@ pub trait PackageMetaCache: Send + Sync {
fn set(&self, key: String, meta: Package);
}
/// Per-`(registry, package_name)` fetch serializer. Mirrors
/// upstream's [`metafileOperationLimits`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L42-L44)
/// — a map of `pLimit(1)` instances keyed on the on-disk mirror
/// path, used by [`runLimited`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52-L64)
/// to coalesce concurrent picks for the same packument.
///
/// Pacquet stores one [`tokio::sync::Semaphore`] (single-permit) per
/// in-memory cache key; first caller acquires, runs the
/// disk-then-network flow, releases. Subsequent callers wait on the
/// same semaphore; after acquiring, they re-check
/// [`PackageMetaCache`] and short-circuit on a hit so only the first
/// caller hits the network.
///
/// Shared across every [`PickPackageContext`] in a single install so
/// the npm and named-registry resolvers coalesce against the same
/// in-flight set. Keying is on the same string [`PackageMetaCache`]
/// uses (`{registry}\x00{name}` for abbreviated,
/// `{registry}\x00{name}:full` for full), so two callers asking for
/// different forms of the same packument don't accidentally serialize
/// — pnpm scopes its `runLimited` the same way (per `pkgMirror`,
/// which embeds the `metaDir` differentiator).
pub type PackumentFetchLocker = Arc<DashMap<String, Arc<Semaphore>>>;
/// Construct a fresh [`PackumentFetchLocker`] for a new install.
/// Equivalent to `Default::default()`; named for symmetry with
/// [`shared_in_memory_cache`].
pub fn shared_packument_fetch_locker() -> PackumentFetchLocker {
Arc::new(DashMap::new())
}
/// Default thread-safe [`PackageMetaCache`] backed by a [`Mutex`]
/// guarding a [`HashMap`]. A consumer that already has its own
/// shared map can implement the trait directly instead of using
@@ -118,6 +162,12 @@ pub struct PickPackageContext<'a, Cache: PackageMetaCache> {
pub http_client: &'a ThrottledClient,
pub auth_headers: &'a AuthHeaders,
pub meta_cache: &'a Cache,
/// Per-cache-key fetch serializer. See [`PackumentFetchLocker`]
/// for the rationale. Construct once per install via
/// [`shared_packument_fetch_locker`] and thread the same handle
/// through every [`PickPackageContext`] so the npm and named-
/// registry resolvers coalesce against the same in-flight set.
pub fetch_locker: &'a PackumentFetchLocker,
/// Root of the on-disk metadata mirror. `None` disables every
/// disk path — the orchestrator goes straight to the network.
pub cache_dir: Option<&'a Path>,
@@ -324,21 +374,59 @@ pub async fn pick_package<Cache: PackageMetaCache>(
// 1. In-memory cache.
if let Some(cached) = ctx.meta_cache.get(&cache_key) {
let upgrade =
maybe_upgrade_abbreviated_meta_for_release_age(ctx, spec, opts, full_metadata, cached)
.await?;
let meta = upgrade.meta;
if upgrade.upgraded && !opts.dry_run {
// Persist so a fresh process doesn't re-trigger the
// upgrade fetch on its next install. Matches upstream's
// [`persistUpgradedMeta`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L507-L524).
if let Some(path) = pkg_mirror.as_deref() {
persist_upgraded_to_mirror(path, &meta);
}
ctx.meta_cache.set(cache_key.clone(), meta.clone());
}
let picked = pick_matching_version_final(&picker_opts, spec, &meta)?;
return Ok(PickPackageResult { meta, picked_package: picked });
return handle_cache_hit(
ctx,
spec,
opts,
&picker_opts,
full_metadata,
&cache_key,
pkg_mirror.as_deref(),
cached,
)
.await;
}
// Per-cache-key fetch serializer. Mirrors upstream's
// [`runLimited(pkgMirror, …)`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52-L64)
// pLimit(1): concurrent picks for the same packument coalesce
// into a single network fetch. The first caller for `cache_key`
// acquires the permit and runs steps 2-5; the rest park here
// and, after acquiring, re-check the in-memory cache so the
// winner's [`PackageMetaCache::set`] short-circuits them
// without re-fetching. Without this, `try_join_all` over the
// resolved tree fires N concurrent HTTP GETs per shared
// packument (e.g. every `react-*` dep racing for `react`), each
// queued behind the [`ThrottledClient`] semaphore — the
// 3-5× resolve-walk gap the
// [`alotta-files` benchmark]([../../../../../pnpm.io/benchmarks/results/pnpm12])
// surfaced.
let limit = {
let entry = ctx
.fetch_locker
.entry(cache_key.clone())
.or_insert_with(|| Arc::new(Semaphore::new(1)));
Arc::clone(entry.value())
};
let _permit = limit.acquire().await.expect("packument fetch semaphore should not be closed");
// Re-check in-memory cache after acquiring the permit — the
// previous permit holder may have just populated it. Without
// this re-check, every duplicate caller would still fall
// through to the disk + network path even though they were
// waiting precisely for the winner's fetch to complete.
if let Some(cached) = ctx.meta_cache.get(&cache_key) {
return handle_cache_hit(
ctx,
spec,
opts,
&picker_opts,
full_metadata,
&cache_key,
pkg_mirror.as_deref(),
cached,
)
.await;
}
let mut meta_cached_in_store: Option<Package> = None;
@@ -494,6 +582,46 @@ pub async fn pick_package<Cache: PackageMetaCache>(
Ok(PickPackageResult { meta, picked_package: picked })
}
/// Shared cache-hit path. Invoked once on the optimistic pre-permit
/// check and once after the per-key permit is acquired (the re-check
/// that lets duplicate concurrent callers short-circuit without
/// re-fetching). Extracting it keeps the two call sites identical so
/// the upgrade-and-persist side-effects can't drift.
///
/// The argument list is wide because the helper consumes everything
/// the per-call frame already computed (cache key, derived
/// `full_metadata`, pre-resolved mirror path, picker options).
/// Bundling these into a struct would just shuffle the same fields
/// into a wrapper without removing any work; allowing the lint is
/// the lower-noise option.
#[allow(clippy::too_many_arguments)]
async fn handle_cache_hit<Cache: PackageMetaCache>(
ctx: &PickPackageContext<'_, Cache>,
spec: &RegistryPackageSpec,
opts: &PickPackageOptions<'_>,
picker_opts: &PickerOpts<'_>,
full_metadata: bool,
cache_key: &str,
pkg_mirror: Option<&Path>,
cached: Package,
) -> Result<PickPackageResult, PickPackageError> {
let upgrade =
maybe_upgrade_abbreviated_meta_for_release_age(ctx, spec, opts, full_metadata, cached)
.await?;
let meta = upgrade.meta;
if upgrade.upgraded && !opts.dry_run {
// Persist so a fresh process doesn't re-trigger the upgrade
// fetch on its next install. Matches upstream's
// [`persistUpgradedMeta`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L507-L524).
if let Some(path) = pkg_mirror {
persist_upgraded_to_mirror(path, &meta);
}
ctx.meta_cache.set(cache_key.to_string(), meta.clone());
}
let picked = pick_matching_version_final(picker_opts, spec, &meta)?;
Ok(PickPackageResult { meta, picked_package: picked })
}
/// Internal mirror of upstream's
/// [`PickerOptions`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L75-L79).
/// Same fields as [`PickPackageOptions`] minus the dispatcher-only

View File

@@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
use super::{
InMemoryPackageMetaCache, PackageMetaCache, PickPackageContext, PickPackageError,
PickPackageOptions, persist_meta_to_mirror, pick_package,
PickPackageOptions, persist_meta_to_mirror, pick_package, shared_packument_fetch_locker,
};
use crate::{
mirror::{ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta},
@@ -93,10 +93,12 @@ async fn cold_pick_fetches_and_picks_max_in_range() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -132,6 +134,7 @@ async fn warm_in_memory_cache_skips_network() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let preloaded: pacquet_registry::Package =
serde_json::from_str(PACKAGE_BODY).expect("parse packument");
@@ -143,6 +146,7 @@ async fn warm_in_memory_cache_skips_network() {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -174,10 +178,12 @@ async fn offline_with_mirror_picks_from_disk() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: true,
prefer_offline: false,
@@ -202,10 +208,12 @@ async fn offline_without_mirror_errors() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: true,
prefer_offline: false,
@@ -236,10 +244,12 @@ async fn version_spec_with_mirror_takes_fast_path() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -298,10 +308,12 @@ async fn version_spec_missing_in_mirror_fetches() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -336,10 +348,12 @@ async fn dry_run_skips_in_memory_cache() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -372,10 +386,12 @@ async fn pick_lowest_version_picks_min() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -453,10 +469,12 @@ async fn in_memory_cache_does_not_leak_across_registries() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -493,10 +511,12 @@ async fn invalid_package_name_errors_synchronously() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: None,
offline: false,
prefer_offline: false,
@@ -558,10 +578,12 @@ async fn default_pick_targets_abbreviated_endpoint_and_mirror() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -605,10 +627,12 @@ async fn optional_opt_forces_full_metadata_endpoint() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -659,10 +683,12 @@ async fn cache_key_separates_abbreviated_from_full() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -724,10 +750,12 @@ async fn published_by_triggers_upgrade_when_modified_after_cutoff() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -788,10 +816,12 @@ async fn published_by_skips_upgrade_when_modified_equals_cutoff() {
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
@@ -805,3 +835,62 @@ async fn published_by_skips_upgrade_when_modified_equals_cutoff() {
abbrev_mock.assert_async().await;
}
/// Concurrent `pick_package` calls for the same `(registry, name)`
/// coalesce into a single network fetch. Mirrors pnpm's
/// [`runLimited(pkgMirror, …)`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52-L64)
/// behavior — without dedup, each duplicate caller would race past
/// the in-memory cache miss and fire its own GET, exhausting the
/// [`ThrottledClient`] semaphore and re-fetching the same packument
/// `N` times.
///
/// The mock asserts `expect(1)` — even though we spawn 20 concurrent
/// picks, exactly one GET reaches the registry. The other 19 wait
/// on the per-key permit and pick up the cached packument once the
/// winner returns.
#[tokio::test]
async fn concurrent_picks_for_same_key_share_one_network_fetch() {
let mut server = mockito::Server::new_async().await;
// `expect(1)` is the assertion: at most one GET reaches the
// registry for the 20-way concurrent fan-out below. Without the
// per-key serializer, all 20 would race past the empty in-memory
// cache and each fire its own GET.
let mock = server
.mock("GET", "/acme")
.with_status(200)
.with_header("etag", r#"W/"fresh""#)
.with_body(PACKAGE_BODY)
.expect(1)
.create_async()
.await;
let cache_dir = TempDir::new().expect("tempdir");
let registry = format!("{}/", server.url());
let http_client = ThrottledClient::default();
let auth_headers = AuthHeaders::default();
let meta_cache = InMemoryPackageMetaCache::default();
let fetch_locker = shared_packument_fetch_locker();
let ctx = PickPackageContext {
http_client: &http_client,
auth_headers: &auth_headers,
meta_cache: &meta_cache,
fetch_locker: &fetch_locker,
cache_dir: Some(cache_dir.path()),
offline: false,
prefer_offline: false,
ignore_missing_time_field: false,
full_metadata: false,
};
let spec = range_spec("acme", "^1.0.0");
let opts = default_opts(&registry);
let results =
futures_util::future::try_join_all((0..20).map(|_| pick_package(&ctx, &spec, &opts)))
.await
.expect("all picks succeed");
for result in results {
assert_eq!(result.picked_package.expect("picked").version.to_string(), "1.1.0");
}
mock.assert_async().await;
}

View File

@@ -22,6 +22,7 @@ use pacquet_resolving_local_resolver::{
};
use pacquet_resolving_npm_resolver::{
InMemoryPackageMetaCache, NamedRegistryResolver, merge_named_registries,
shared_packument_fetch_locker,
};
use pacquet_resolving_resolver_base::{ResolveOptions, WantedDependency};
use tempfile::TempDir;
@@ -37,6 +38,7 @@ fn named_registry_resolver(
http_client: Arc::new(ThrottledClient::default()),
auth_headers: Arc::new(AuthHeaders::default()),
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
fetch_locker: shared_packument_fetch_locker(),
// No cache_dir means no on-disk mirror — every fetch goes
// through the network. The link / workspace / file tests never
// hit named-registry, so this is fine without mocks.