mirror of
https://github.com/pnpm/pnpm.git
synced 2026-05-24 16:46:06 -04:00
perf(pacquet/resolving-npm-resolver): dedup concurrent packument fetches
Pacquet was firing N concurrent HTTP GETs for the same packument
whenever cross-referencing deps reached `pick_package` from
different points in the resolve tree (e.g. every `react-*` dep
racing for `react`). With no per-key serializer between the
in-memory cache miss check and the network fetch, all N concurrent
callers fell through to step 5 in parallel, queued behind the 16-64
slot `ThrottledClient` semaphore, and re-fetched the same packument
N times.
Upstream pnpm avoids this with `runLimited(pkgMirror, …)` — a
`pLimit(1)` keyed on the on-disk mirror path that wraps the entire
post-cache-miss flow. Only the first caller for a given packument
hits the network; the rest wait on the limit, then short-circuit on
the in-memory cache the winner just populated. See
`resolving/npm-resolver/src/pickPackage.ts:42-64`.
This change ports the same shape:
- `PackumentFetchLocker = Arc<DashMap<String, Arc<Semaphore>>>`
threaded through `PickPackageContext::fetch_locker`. Constructed
via `shared_packument_fetch_locker()` once per install and shared
between `NpmResolver` and `NamedRegistryResolver` so concurrent
picks across the two resolvers also coalesce.
- After step 1's in-memory cache miss check, `pick_package`
acquires a single-permit semaphore keyed on the same string the
cache uses (`{registry}\x00{name}[:full]`). After acquiring,
re-checks the in-memory cache so a duplicate caller short-circuits
on the winner's `meta_cache.set` instead of re-fetching.
- `handle_cache_hit` extracts the shared hit-then-upgrade-and-persist
logic so the pre-permit and post-permit paths can't drift.
Test: `concurrent_picks_for_same_key_share_one_network_fetch` spawns
20 concurrent picks against a `mock.expect(1)` server. Without the
dedup, 20 GETs reach the registry; with it, exactly 1.
Closes #11832 (with the prefetch in the previous commit).
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2650,6 +2650,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"dashmap",
|
||||
"derive_more",
|
||||
"futures-util",
|
||||
"indexmap",
|
||||
"miette 7.6.0",
|
||||
"mockito",
|
||||
|
||||
@@ -4,7 +4,9 @@ use pacquet_lockfile::{LockfileResolution, TarballResolution};
|
||||
use pacquet_network::ThrottledClient;
|
||||
use pacquet_registry_mock::AutoMockInstance;
|
||||
use pacquet_reporter::{LogEvent, ProgressMessage, Reporter, SilentReporter};
|
||||
use pacquet_resolving_npm_resolver::{InMemoryPackageMetaCache, NpmResolver};
|
||||
use pacquet_resolving_npm_resolver::{
|
||||
InMemoryPackageMetaCache, NpmResolver, shared_packument_fetch_locker,
|
||||
};
|
||||
use pacquet_resolving_resolver_base::{ResolveOptions, ResolveResult, Resolver, WantedDependency};
|
||||
use pacquet_store_dir::{SharedVerifiedFilesCache, StoreDir};
|
||||
use pipe_trait::Pipe;
|
||||
@@ -97,6 +99,7 @@ async fn resolve_via_mock(
|
||||
http_client,
|
||||
auth_headers: Default::default(),
|
||||
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
|
||||
fetch_locker: shared_packument_fetch_locker(),
|
||||
cache_dir: Some(cache_dir.to_path_buf()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
|
||||
@@ -30,7 +30,7 @@ use pacquet_resolving_local_resolver::{
|
||||
};
|
||||
use pacquet_resolving_npm_resolver::{
|
||||
InMemoryPackageMetaCache, MergeNamedRegistriesError, NamedRegistryResolver, NpmResolver,
|
||||
merge_named_registries,
|
||||
merge_named_registries, shared_packument_fetch_locker,
|
||||
};
|
||||
use pacquet_resolving_resolver_base::{
|
||||
LatestQuery, ResolveFuture, ResolveLatestFuture, ResolveOptions, Resolver, WantedDependency,
|
||||
@@ -289,12 +289,21 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
|
||||
let meta_cache = Arc::new(InMemoryPackageMetaCache::default());
|
||||
|
||||
// One per-cache-key packument fetch serializer shared between
|
||||
// the npm and named-registry resolvers. Ports upstream's
|
||||
// [`metafileOperationLimits`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L42-L44):
|
||||
// concurrent picks for the same `(registry, name)` coalesce
|
||||
// into a single network fetch instead of firing N parallel
|
||||
// HTTP GETs queued behind the `ThrottledClient` semaphore.
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
|
||||
let npm_resolver: Arc<dyn Resolver> = Arc::new(NpmResolver {
|
||||
registries,
|
||||
named_registries: merged_named_registries.clone(),
|
||||
http_client: Arc::clone(&http_client_arc),
|
||||
auth_headers: Arc::clone(&config.auth_headers),
|
||||
meta_cache: Arc::clone(&meta_cache),
|
||||
fetch_locker: Arc::clone(&fetch_locker),
|
||||
cache_dir: Some(config.cache_dir.clone()),
|
||||
offline: config.offline,
|
||||
prefer_offline: config.prefer_offline,
|
||||
@@ -334,6 +343,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
http_client: Arc::clone(&http_client_arc),
|
||||
auth_headers: Arc::clone(&config.auth_headers),
|
||||
meta_cache: Arc::clone(&meta_cache),
|
||||
fetch_locker: Arc::clone(&fetch_locker),
|
||||
cache_dir: Some(config.cache_dir.clone()),
|
||||
offline: config.offline,
|
||||
prefer_offline: config.prefer_offline,
|
||||
|
||||
@@ -38,6 +38,7 @@ tracing = { workspace = true }
|
||||
pacquet-resolving-default-resolver = { workspace = true }
|
||||
pacquet-resolving-local-resolver = { workspace = true }
|
||||
|
||||
futures-util = { workspace = true }
|
||||
mockito = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
ssri = { workspace = true }
|
||||
|
||||
@@ -57,9 +57,9 @@ pub use parse_bare_specifier::{
|
||||
parse_named_registry_specifier_to_registry_package_spec,
|
||||
};
|
||||
pub use pick_package::{
|
||||
InMemoryPackageMetaCache, MirrorPersistError, PackageMetaCache, PickPackageContext,
|
||||
PickPackageError, PickPackageOptions, PickPackageResult, persist_meta_to_mirror, pick_package,
|
||||
shared_in_memory_cache,
|
||||
InMemoryPackageMetaCache, MirrorPersistError, PackageMetaCache, PackumentFetchLocker,
|
||||
PickPackageContext, PickPackageError, PickPackageOptions, PickPackageResult,
|
||||
persist_meta_to_mirror, pick_package, shared_in_memory_cache, shared_packument_fetch_locker,
|
||||
};
|
||||
pub use pick_package_from_meta::{
|
||||
PickPackageFromMetaError, PickPackageFromMetaOptions, PickVersionByVersionRangeOptions,
|
||||
|
||||
@@ -69,6 +69,11 @@ pub struct NamedRegistryResolver<Cache: PackageMetaCache> {
|
||||
pub http_client: Arc<ThrottledClient>,
|
||||
pub auth_headers: Arc<AuthHeaders>,
|
||||
pub meta_cache: Arc<Cache>,
|
||||
/// Shared per-cache-key packument fetch serializer. See
|
||||
/// [`crate::PackumentFetchLocker`]. Same handle as the sibling
|
||||
/// [`crate::NpmResolver`] so concurrent picks for the same
|
||||
/// `(registry, name)` across resolvers coalesce.
|
||||
pub fetch_locker: crate::PackumentFetchLocker,
|
||||
pub cache_dir: Option<PathBuf>,
|
||||
pub offline: bool,
|
||||
pub prefer_offline: bool,
|
||||
@@ -200,6 +205,7 @@ impl<Cache: PackageMetaCache + 'static> NamedRegistryResolver<Cache> {
|
||||
http_client: &self.http_client,
|
||||
auth_headers: &self.auth_headers,
|
||||
meta_cache: self.meta_cache.as_ref(),
|
||||
fetch_locker: &self.fetch_locker,
|
||||
cache_dir: self.cache_dir.as_deref(),
|
||||
offline: self.offline,
|
||||
prefer_offline: self.prefer_offline,
|
||||
|
||||
@@ -10,7 +10,8 @@ use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
use crate::{
|
||||
NamedRegistryResolver, merge_named_registries, pick_package::InMemoryPackageMetaCache,
|
||||
NamedRegistryResolver, merge_named_registries,
|
||||
pick_package::{InMemoryPackageMetaCache, shared_packument_fetch_locker},
|
||||
};
|
||||
|
||||
/// Packument for `@acme/private` served under a named registry —
|
||||
@@ -68,6 +69,7 @@ fn build_resolver(
|
||||
http_client: Arc::new(ThrottledClient::default()),
|
||||
auth_headers: Arc::new(AuthHeaders::default()),
|
||||
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
|
||||
fetch_locker: shared_packument_fetch_locker(),
|
||||
cache_dir: Some(cache_dir.path().to_path_buf()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
|
||||
@@ -93,6 +93,12 @@ pub struct NpmResolver<Cache: PackageMetaCache> {
|
||||
pub http_client: Arc<ThrottledClient>,
|
||||
pub auth_headers: Arc<AuthHeaders>,
|
||||
pub meta_cache: Arc<Cache>,
|
||||
/// Per-cache-key packument fetch serializer. Shared across this
|
||||
/// resolver and the sibling [`crate::NamedRegistryResolver`] so
|
||||
/// concurrent picks for the same `(registry, name)` coalesce
|
||||
/// into one network fetch. Construct via
|
||||
/// [`crate::shared_packument_fetch_locker`] once per install.
|
||||
pub fetch_locker: crate::PackumentFetchLocker,
|
||||
/// Root of the on-disk metadata mirror. `None` disables every
|
||||
/// disk read/write — the picker goes straight to the network on
|
||||
/// each cache miss.
|
||||
@@ -293,6 +299,7 @@ impl<Cache: PackageMetaCache + 'static> NpmResolver<Cache> {
|
||||
http_client: &self.http_client,
|
||||
auth_headers: &self.auth_headers,
|
||||
meta_cache: self.meta_cache.as_ref(),
|
||||
fetch_locker: &self.fetch_locker,
|
||||
cache_dir: self.cache_dir.as_deref(),
|
||||
offline: self.offline,
|
||||
prefer_offline: self.prefer_offline,
|
||||
|
||||
@@ -10,7 +10,8 @@ use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
use crate::{
|
||||
npm_resolver::NpmResolver, pick_package::InMemoryPackageMetaCache,
|
||||
npm_resolver::NpmResolver,
|
||||
pick_package::{InMemoryPackageMetaCache, shared_packument_fetch_locker},
|
||||
violation_codes::MINIMUM_RELEASE_AGE_VIOLATION_CODE,
|
||||
};
|
||||
|
||||
@@ -60,6 +61,7 @@ fn build_resolver_with_registries(
|
||||
http_client: Arc::new(ThrottledClient::default()),
|
||||
auth_headers: Arc::new(AuthHeaders::default()),
|
||||
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
|
||||
fetch_locker: shared_packument_fetch_locker(),
|
||||
cache_dir: Some(cache_dir.path().to_path_buf()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
|
||||
@@ -30,13 +30,25 @@
|
||||
//! silently degrading to its warn-and-skip fallback. Ports upstream's
|
||||
//! [`maybeUpgradeAbbreviatedMetaForReleaseAge`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L450-L501).
|
||||
//!
|
||||
//! Concurrency: upstream uses `p-limit(1)` keyed on the mirror path
|
||||
//! to serialize disk operations. Pacquet relies on the atomic
|
||||
//! rename in [`crate::mirror::save_meta`] for write safety, and on
|
||||
//! [`std::sync::Mutex`]-guarded in-memory caches for reader
|
||||
//! coordination. The per-mirror limiter is omitted; if a future
|
||||
//! issue forces serialization (Windows file-lock contention, e.g.)
|
||||
//! it would land here as a map of `tokio::sync::Mutex` values.
|
||||
//! Concurrency: upstream's
|
||||
//! [`runLimited(pkgMirror, …)`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52)
|
||||
//! wraps the post-cache-miss flow in a `pLimit(1)` keyed by the
|
||||
//! mirror path so concurrent picks for the same package coalesce
|
||||
//! into a single network fetch — the rest wait, then re-check the
|
||||
//! in-memory cache that the winner just populated and short-circuit
|
||||
//! without hitting the registry. Pacquet ports this via
|
||||
//! [`PackumentFetchLocker`], a [`DashMap<String, Arc<Semaphore>>`]
|
||||
//! threaded through [`PickPackageContext::fetch_locker`]: the first
|
||||
//! caller for a given cache key acquires the per-key permit and
|
||||
//! does the disk + network work; subsequent callers wait on the
|
||||
//! permit and (per pnpm's runLimited semantics) re-check
|
||||
//! [`PackageMetaCache`] after acquiring so the winner's
|
||||
//! [`PackageMetaCache::set`] short-circuits the rest. Without this,
|
||||
//! pacquet was firing N concurrent HTTP GETs for the same packument
|
||||
//! per cluster of cross-referencing deps, queued behind the
|
||||
//! `ThrottledClient` semaphore — multiplying packument-fetch
|
||||
//! wall-clock by the dedup factor and putting the resolve walk
|
||||
//! 3-5× behind pnpm on the `alotta-files` benchmark.
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
@@ -45,12 +57,14 @@ use std::{
|
||||
};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use dashmap::DashMap;
|
||||
use derive_more::{Display, Error};
|
||||
use miette::Diagnostic;
|
||||
use pacquet_config::version_policy::PackageVersionPolicy;
|
||||
use pacquet_network::{AuthHeaders, ThrottledClient};
|
||||
use pacquet_registry::{Package, PackageVersion};
|
||||
use pacquet_resolving_resolver_base::VersionSelectors;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::{
|
||||
FetchFullMetadataCachedOptions, FetchFullMetadataOptions, FetchMetadataError,
|
||||
@@ -86,6 +100,36 @@ pub trait PackageMetaCache: Send + Sync {
|
||||
fn set(&self, key: String, meta: Package);
|
||||
}
|
||||
|
||||
/// Per-`(registry, package_name)` fetch serializer. Mirrors
|
||||
/// upstream's [`metafileOperationLimits`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L42-L44)
|
||||
/// — a map of `pLimit(1)` instances keyed on the on-disk mirror
|
||||
/// path, used by [`runLimited`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52-L64)
|
||||
/// to coalesce concurrent picks for the same packument.
|
||||
///
|
||||
/// Pacquet stores one [`tokio::sync::Semaphore`] (single-permit) per
|
||||
/// in-memory cache key; first caller acquires, runs the
|
||||
/// disk-then-network flow, releases. Subsequent callers wait on the
|
||||
/// same semaphore; after acquiring, they re-check
|
||||
/// [`PackageMetaCache`] and short-circuit on a hit so only the first
|
||||
/// caller hits the network.
|
||||
///
|
||||
/// Shared across every [`PickPackageContext`] in a single install so
|
||||
/// the npm and named-registry resolvers coalesce against the same
|
||||
/// in-flight set. Keying is on the same string [`PackageMetaCache`]
|
||||
/// uses (`{registry}\x00{name}` for abbreviated,
|
||||
/// `{registry}\x00{name}:full` for full), so two callers asking for
|
||||
/// different forms of the same packument don't accidentally serialize
|
||||
/// — pnpm scopes its `runLimited` the same way (per `pkgMirror`,
|
||||
/// which embeds the `metaDir` differentiator).
|
||||
pub type PackumentFetchLocker = Arc<DashMap<String, Arc<Semaphore>>>;
|
||||
|
||||
/// Construct a fresh [`PackumentFetchLocker`] for a new install.
|
||||
/// Equivalent to `Default::default()`; named for symmetry with
|
||||
/// [`shared_in_memory_cache`].
|
||||
pub fn shared_packument_fetch_locker() -> PackumentFetchLocker {
|
||||
Arc::new(DashMap::new())
|
||||
}
|
||||
|
||||
/// Default thread-safe [`PackageMetaCache`] backed by a [`Mutex`]
|
||||
/// guarding a [`HashMap`]. A consumer that already has its own
|
||||
/// shared map can implement the trait directly instead of using
|
||||
@@ -118,6 +162,12 @@ pub struct PickPackageContext<'a, Cache: PackageMetaCache> {
|
||||
pub http_client: &'a ThrottledClient,
|
||||
pub auth_headers: &'a AuthHeaders,
|
||||
pub meta_cache: &'a Cache,
|
||||
/// Per-cache-key fetch serializer. See [`PackumentFetchLocker`]
|
||||
/// for the rationale. Construct once per install via
|
||||
/// [`shared_packument_fetch_locker`] and thread the same handle
|
||||
/// through every [`PickPackageContext`] so the npm and named-
|
||||
/// registry resolvers coalesce against the same in-flight set.
|
||||
pub fetch_locker: &'a PackumentFetchLocker,
|
||||
/// Root of the on-disk metadata mirror. `None` disables every
|
||||
/// disk path — the orchestrator goes straight to the network.
|
||||
pub cache_dir: Option<&'a Path>,
|
||||
@@ -324,21 +374,59 @@ pub async fn pick_package<Cache: PackageMetaCache>(
|
||||
|
||||
// 1. In-memory cache.
|
||||
if let Some(cached) = ctx.meta_cache.get(&cache_key) {
|
||||
let upgrade =
|
||||
maybe_upgrade_abbreviated_meta_for_release_age(ctx, spec, opts, full_metadata, cached)
|
||||
.await?;
|
||||
let meta = upgrade.meta;
|
||||
if upgrade.upgraded && !opts.dry_run {
|
||||
// Persist so a fresh process doesn't re-trigger the
|
||||
// upgrade fetch on its next install. Matches upstream's
|
||||
// [`persistUpgradedMeta`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L507-L524).
|
||||
if let Some(path) = pkg_mirror.as_deref() {
|
||||
persist_upgraded_to_mirror(path, &meta);
|
||||
}
|
||||
ctx.meta_cache.set(cache_key.clone(), meta.clone());
|
||||
}
|
||||
let picked = pick_matching_version_final(&picker_opts, spec, &meta)?;
|
||||
return Ok(PickPackageResult { meta, picked_package: picked });
|
||||
return handle_cache_hit(
|
||||
ctx,
|
||||
spec,
|
||||
opts,
|
||||
&picker_opts,
|
||||
full_metadata,
|
||||
&cache_key,
|
||||
pkg_mirror.as_deref(),
|
||||
cached,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Per-cache-key fetch serializer. Mirrors upstream's
|
||||
// [`runLimited(pkgMirror, …)`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52-L64)
|
||||
// pLimit(1): concurrent picks for the same packument coalesce
|
||||
// into a single network fetch. The first caller for `cache_key`
|
||||
// acquires the permit and runs steps 2-5; the rest park here
|
||||
// and, after acquiring, re-check the in-memory cache so the
|
||||
// winner's [`PackageMetaCache::set`] short-circuits them
|
||||
// without re-fetching. Without this, `try_join_all` over the
|
||||
// resolved tree fires N concurrent HTTP GETs per shared
|
||||
// packument (e.g. every `react-*` dep racing for `react`), each
|
||||
// queued behind the [`ThrottledClient`] semaphore — the
|
||||
// 3-5× resolve-walk gap the
|
||||
// [`alotta-files` benchmark]([../../../../../pnpm.io/benchmarks/results/pnpm12])
|
||||
// surfaced.
|
||||
let limit = {
|
||||
let entry = ctx
|
||||
.fetch_locker
|
||||
.entry(cache_key.clone())
|
||||
.or_insert_with(|| Arc::new(Semaphore::new(1)));
|
||||
Arc::clone(entry.value())
|
||||
};
|
||||
let _permit = limit.acquire().await.expect("packument fetch semaphore should not be closed");
|
||||
|
||||
// Re-check in-memory cache after acquiring the permit — the
|
||||
// previous permit holder may have just populated it. Without
|
||||
// this re-check, every duplicate caller would still fall
|
||||
// through to the disk + network path even though they were
|
||||
// waiting precisely for the winner's fetch to complete.
|
||||
if let Some(cached) = ctx.meta_cache.get(&cache_key) {
|
||||
return handle_cache_hit(
|
||||
ctx,
|
||||
spec,
|
||||
opts,
|
||||
&picker_opts,
|
||||
full_metadata,
|
||||
&cache_key,
|
||||
pkg_mirror.as_deref(),
|
||||
cached,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let mut meta_cached_in_store: Option<Package> = None;
|
||||
@@ -494,6 +582,46 @@ pub async fn pick_package<Cache: PackageMetaCache>(
|
||||
Ok(PickPackageResult { meta, picked_package: picked })
|
||||
}
|
||||
|
||||
/// Shared cache-hit path. Invoked once on the optimistic pre-permit
|
||||
/// check and once after the per-key permit is acquired (the re-check
|
||||
/// that lets duplicate concurrent callers short-circuit without
|
||||
/// re-fetching). Extracting it keeps the two call sites identical so
|
||||
/// the upgrade-and-persist side-effects can't drift.
|
||||
///
|
||||
/// The argument list is wide because the helper consumes everything
|
||||
/// the per-call frame already computed (cache key, derived
|
||||
/// `full_metadata`, pre-resolved mirror path, picker options).
|
||||
/// Bundling these into a struct would just shuffle the same fields
|
||||
/// into a wrapper without removing any work; allowing the lint is
|
||||
/// the lower-noise option.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_cache_hit<Cache: PackageMetaCache>(
|
||||
ctx: &PickPackageContext<'_, Cache>,
|
||||
spec: &RegistryPackageSpec,
|
||||
opts: &PickPackageOptions<'_>,
|
||||
picker_opts: &PickerOpts<'_>,
|
||||
full_metadata: bool,
|
||||
cache_key: &str,
|
||||
pkg_mirror: Option<&Path>,
|
||||
cached: Package,
|
||||
) -> Result<PickPackageResult, PickPackageError> {
|
||||
let upgrade =
|
||||
maybe_upgrade_abbreviated_meta_for_release_age(ctx, spec, opts, full_metadata, cached)
|
||||
.await?;
|
||||
let meta = upgrade.meta;
|
||||
if upgrade.upgraded && !opts.dry_run {
|
||||
// Persist so a fresh process doesn't re-trigger the upgrade
|
||||
// fetch on its next install. Matches upstream's
|
||||
// [`persistUpgradedMeta`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L507-L524).
|
||||
if let Some(path) = pkg_mirror {
|
||||
persist_upgraded_to_mirror(path, &meta);
|
||||
}
|
||||
ctx.meta_cache.set(cache_key.to_string(), meta.clone());
|
||||
}
|
||||
let picked = pick_matching_version_final(picker_opts, spec, &meta)?;
|
||||
Ok(PickPackageResult { meta, picked_package: picked })
|
||||
}
|
||||
|
||||
/// Internal mirror of upstream's
|
||||
/// [`PickerOptions`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L75-L79).
|
||||
/// Same fields as [`PickPackageOptions`] minus the dispatcher-only
|
||||
|
||||
@@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
|
||||
|
||||
use super::{
|
||||
InMemoryPackageMetaCache, PackageMetaCache, PickPackageContext, PickPackageError,
|
||||
PickPackageOptions, persist_meta_to_mirror, pick_package,
|
||||
PickPackageOptions, persist_meta_to_mirror, pick_package, shared_packument_fetch_locker,
|
||||
};
|
||||
use crate::{
|
||||
mirror::{ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta},
|
||||
@@ -93,10 +93,12 @@ async fn cold_pick_fetches_and_picks_max_in_range() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -132,6 +134,7 @@ async fn warm_in_memory_cache_skips_network() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
|
||||
let preloaded: pacquet_registry::Package =
|
||||
serde_json::from_str(PACKAGE_BODY).expect("parse packument");
|
||||
@@ -143,6 +146,7 @@ async fn warm_in_memory_cache_skips_network() {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -174,10 +178,12 @@ async fn offline_with_mirror_picks_from_disk() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: true,
|
||||
prefer_offline: false,
|
||||
@@ -202,10 +208,12 @@ async fn offline_without_mirror_errors() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: true,
|
||||
prefer_offline: false,
|
||||
@@ -236,10 +244,12 @@ async fn version_spec_with_mirror_takes_fast_path() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -298,10 +308,12 @@ async fn version_spec_missing_in_mirror_fetches() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -336,10 +348,12 @@ async fn dry_run_skips_in_memory_cache() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -372,10 +386,12 @@ async fn pick_lowest_version_picks_min() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -453,10 +469,12 @@ async fn in_memory_cache_does_not_leak_across_registries() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -493,10 +511,12 @@ async fn invalid_package_name_errors_synchronously() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: None,
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -558,10 +578,12 @@ async fn default_pick_targets_abbreviated_endpoint_and_mirror() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -605,10 +627,12 @@ async fn optional_opt_forces_full_metadata_endpoint() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -659,10 +683,12 @@ async fn cache_key_separates_abbreviated_from_full() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -724,10 +750,12 @@ async fn published_by_triggers_upgrade_when_modified_after_cutoff() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -788,10 +816,12 @@ async fn published_by_skips_upgrade_when_modified_equals_cutoff() {
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
@@ -805,3 +835,62 @@ async fn published_by_skips_upgrade_when_modified_equals_cutoff() {
|
||||
|
||||
abbrev_mock.assert_async().await;
|
||||
}
|
||||
|
||||
/// Concurrent `pick_package` calls for the same `(registry, name)`
|
||||
/// coalesce into a single network fetch. Mirrors pnpm's
|
||||
/// [`runLimited(pkgMirror, …)`](https://github.com/pnpm/pnpm/blob/f657b5cb44/resolving/npm-resolver/src/pickPackage.ts#L52-L64)
|
||||
/// behavior — without dedup, each duplicate caller would race past
|
||||
/// the in-memory cache miss and fire its own GET, exhausting the
|
||||
/// [`ThrottledClient`] semaphore and re-fetching the same packument
|
||||
/// `N` times.
|
||||
///
|
||||
/// The mock asserts `expect(1)` — even though we spawn 20 concurrent
|
||||
/// picks, exactly one GET reaches the registry. The other 19 wait
|
||||
/// on the per-key permit and pick up the cached packument once the
|
||||
/// winner returns.
|
||||
#[tokio::test]
|
||||
async fn concurrent_picks_for_same_key_share_one_network_fetch() {
|
||||
let mut server = mockito::Server::new_async().await;
|
||||
// `expect(1)` is the assertion: at most one GET reaches the
|
||||
// registry for the 20-way concurrent fan-out below. Without the
|
||||
// per-key serializer, all 20 would race past the empty in-memory
|
||||
// cache and each fire its own GET.
|
||||
let mock = server
|
||||
.mock("GET", "/acme")
|
||||
.with_status(200)
|
||||
.with_header("etag", r#"W/"fresh""#)
|
||||
.with_body(PACKAGE_BODY)
|
||||
.expect(1)
|
||||
.create_async()
|
||||
.await;
|
||||
|
||||
let cache_dir = TempDir::new().expect("tempdir");
|
||||
let registry = format!("{}/", server.url());
|
||||
let http_client = ThrottledClient::default();
|
||||
let auth_headers = AuthHeaders::default();
|
||||
let meta_cache = InMemoryPackageMetaCache::default();
|
||||
let fetch_locker = shared_packument_fetch_locker();
|
||||
let ctx = PickPackageContext {
|
||||
http_client: &http_client,
|
||||
auth_headers: &auth_headers,
|
||||
meta_cache: &meta_cache,
|
||||
fetch_locker: &fetch_locker,
|
||||
cache_dir: Some(cache_dir.path()),
|
||||
offline: false,
|
||||
prefer_offline: false,
|
||||
ignore_missing_time_field: false,
|
||||
full_metadata: false,
|
||||
};
|
||||
|
||||
let spec = range_spec("acme", "^1.0.0");
|
||||
let opts = default_opts(®istry);
|
||||
let results =
|
||||
futures_util::future::try_join_all((0..20).map(|_| pick_package(&ctx, &spec, &opts)))
|
||||
.await
|
||||
.expect("all picks succeed");
|
||||
|
||||
for result in results {
|
||||
assert_eq!(result.picked_package.expect("picked").version.to_string(), "1.1.0");
|
||||
}
|
||||
mock.assert_async().await;
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use pacquet_resolving_local_resolver::{
|
||||
};
|
||||
use pacquet_resolving_npm_resolver::{
|
||||
InMemoryPackageMetaCache, NamedRegistryResolver, merge_named_registries,
|
||||
shared_packument_fetch_locker,
|
||||
};
|
||||
use pacquet_resolving_resolver_base::{ResolveOptions, WantedDependency};
|
||||
use tempfile::TempDir;
|
||||
@@ -37,6 +38,7 @@ fn named_registry_resolver(
|
||||
http_client: Arc::new(ThrottledClient::default()),
|
||||
auth_headers: Arc::new(AuthHeaders::default()),
|
||||
meta_cache: Arc::new(InMemoryPackageMetaCache::default()),
|
||||
fetch_locker: shared_packument_fetch_locker(),
|
||||
// No cache_dir means no on-disk mirror — every fetch goes
|
||||
// through the network. The link / workspace / file tests never
|
||||
// hit named-registry, so this is fine without mocks.
|
||||
|
||||
Reference in New Issue
Block a user