perf(pacquet/resolving-npm-resolver): move mirror disk reads off the tokio worker

The pick-and-fetch path was calling synchronous `load_meta` and
`load_meta_headers` directly from async tasks. Each call reads the
mirror file (`fs::read_to_string`, a blocking syscall) and parses
the multi-KB to multi-MB packument body (`serde_json::from_str`,
CPU-bound). Neither yields, so the tokio worker stayed pinned for
the duration of every disk hit.

The hot path runs them on every cache-warm pick:
- `fetch_full_metadata_cached` calls `load_meta_headers` to feed
  the conditional GET headers.
- A 304 response then calls `load_meta` to re-materialise the
  packument.
- `pick_package`'s offline / pickLowestVersion, version-spec, and
  publishedBy shortcuts also call `load_meta` directly.

With ~600 unique packuments per install all running through this
sequence concurrently behind `try_join_all`, the blocking time
serializes against the worker pool size and the async scheduler
can't make progress on unrelated resolves / HTTP fetches in the
gaps.

This change adds async siblings — `load_meta_async` and
`load_meta_headers_async` — that dispatch the sync body to
`tokio::task::spawn_blocking`. Callers in `pick_package` and
`fetch_full_metadata_cached` await the async wrappers. The sync
functions stay (tests and the writeback path still call them
directly).

Matches upstream pnpm's posture, which performs the equivalent
work via awaited `fs.readFile` + `JSON.parse` on libuv's worker
pool.

Test: all 194 tests in `pacquet-resolving-npm-resolver` continue
to pass (existing `load_meta` / `load_meta_headers` coverage in
the mirror tests and via the fetcher integration tests is the
behaviour-level proof — the wrappers delegate verbatim).
This commit is contained in:
Zoltan Kochan
2026-05-21 22:42:50 +02:00
parent 58f49c9056
commit 6cb50b4fe5
3 changed files with 54 additions and 13 deletions

View File

@@ -27,8 +27,8 @@ use crate::{
FetchMetadataError,
fetch_full_metadata::{ACCEPT_ABBREVIATED_DOC, ACCEPT_FULL_DOC},
mirror::{
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta, load_meta_headers,
prepare_json_for_disk, save_meta,
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta_async,
load_meta_headers_async, prepare_json_for_disk, save_meta,
},
registry_url::to_registry_url,
};
@@ -116,7 +116,7 @@ pub async fn fetch_full_metadata_cached(
},
None => None,
};
let cache_headers = mirror_path.as_deref().and_then(load_meta_headers);
let cache_headers = load_meta_headers_async(mirror_path.as_deref()).await;
let url = to_registry_url(opts.registry, pkg_name);
let accept = if opts.full_metadata { ACCEPT_FULL_DOC } else { ACCEPT_ABBREVIATED_DOC };
@@ -148,8 +148,8 @@ pub async fn fetch_full_metadata_cached(
pkg_name: pkg_name.to_string(),
});
};
return load_meta(&path).ok_or_else(|| FetchMetadataError::CacheMissingAfter304 {
pkg_name: pkg_name.to_string(),
return load_meta_async(Some(&path)).await.ok_or_else(|| {
FetchMetadataError::CacheMissingAfter304 { pkg_name: pkg_name.to_string() }
});
}

View File

@@ -232,6 +232,45 @@ pub fn load_meta(pkg_mirror: &Path) -> Option<Package> {
Some(meta)
}
/// Async sibling of [`load_meta`]. The body is a blocking
/// `fs::read_to_string` plus a `serde_json::from_str` that can chew
/// through a multi-KB to multi-MB packument body — neither yields, so
/// calling [`load_meta`] directly from an async task on the resolve
/// hot path blocks the tokio worker for the duration of the read +
/// parse. With hundreds of unique packuments per install, that
/// serializes the resolve walk against the size of the runtime's
/// worker pool. This wrapper dispatches the work to
/// [`tokio::task::spawn_blocking`] so the async scheduler keeps
/// progressing other resolves and HTTP fetches while one packument's
/// body parses on the blocking pool. Matches upstream's stance:
/// pnpm's loadMeta is an awaited `fs.readFile` + `JSON.parse` that
/// runs on libuv's worker pool, the same separation tokio gives us
/// via `spawn_blocking`.
///
/// `JoinError` (panic in the blocking task) and `None` from
/// [`load_meta`] (missing / unreadable file) both collapse to
/// `None`. The caller's response to either is the same — fall
/// through to the network fetch — so distinguishing them is not
/// load-bearing.
///
/// Returns `None` immediately when `pkg_mirror` is `None`, skipping
/// the spawn-blocking dispatch entirely on the no-cache-dir branch.
pub async fn load_meta_async(pkg_mirror: Option<&Path>) -> Option<Package> {
let pkg_mirror = pkg_mirror?.to_path_buf();
tokio::task::spawn_blocking(move || load_meta(&pkg_mirror)).await.ok().flatten()
}
/// Async sibling of [`load_meta_headers`]. Same rationale as
/// [`load_meta_async`] — the synchronous body opens a file and
/// parses a short JSON header line, blocking the worker for the
/// duration. The headers-only read is cheap (~100 bytes typically)
/// but is invoked on every cache-warm pick, so the cumulative block
/// time is still meaningful with hundreds of packuments.
pub async fn load_meta_headers_async(pkg_mirror: Option<&Path>) -> Option<MetaHeaders> {
let pkg_mirror = pkg_mirror?.to_path_buf();
tokio::task::spawn_blocking(move || load_meta_headers(&pkg_mirror)).await.ok().flatten()
}
/// Atomic write: serialize to a sibling temp file, then `rename` it
/// over the target. Mirrors pnpm's
/// [`saveMeta`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L667-L676).

View File

@@ -70,8 +70,8 @@ use crate::{
FetchFullMetadataCachedOptions, FetchFullMetadataOptions, FetchFullMetadataOutcome,
FetchMetadataError, fetch_full_metadata, fetch_full_metadata_cached,
mirror::{
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta, prepare_json_for_disk,
save_meta,
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta_async,
prepare_json_for_disk, save_meta,
},
pick_package_from_meta::{
PickPackageFromMetaError, PickPackageFromMetaOptions, RegistryPackageSpec,
@@ -433,7 +433,7 @@ pub async fn pick_package<Cache: PackageMetaCache>(
// 2. Offline / pickLowestVersion / preferOffline disk read.
if ctx.offline || ctx.prefer_offline || opts.pick_lowest_version {
meta_cached_in_store = pkg_mirror.as_deref().and_then(load_meta);
meta_cached_in_store = load_meta_async(pkg_mirror.as_deref()).await;
if ctx.offline {
if let Some(meta) = meta_cached_in_store {
@@ -478,7 +478,7 @@ pub async fn pick_package<Cache: PackageMetaCache>(
// 3. Version-spec fast path.
if !opts.include_latest_tag && matches!(spec.spec_type, RegistryPackageSpecType::Version) {
if meta_cached_in_store.is_none() {
meta_cached_in_store = pkg_mirror.as_deref().and_then(load_meta);
meta_cached_in_store = load_meta_async(pkg_mirror.as_deref()).await;
}
if let Some(ref meta) = meta_cached_in_store
&& meta.versions.contains_key(&spec.fetch_spec)
@@ -503,7 +503,7 @@ pub async fn pick_package<Cache: PackageMetaCache>(
&& mtime >= published_by
{
if meta_cached_in_store.is_none() {
meta_cached_in_store = pkg_mirror.as_deref().and_then(load_meta);
meta_cached_in_store = load_meta_async(pkg_mirror.as_deref()).await;
}
if let Some(ref meta) = meta_cached_in_store
&& let Ok(Some(picked)) = pick_matching_version_fast(&picker_opts, spec, meta)
@@ -533,9 +533,11 @@ pub async fn pick_package<Cache: PackageMetaCache>(
// returned (when it returned Ok). If it returned Err,
// try the disk fallback: an existing mirror is good
// enough to pick from, even if the latest sync failed.
if let Some(disk) =
meta_cached_in_store.or_else(|| pkg_mirror.as_deref().and_then(load_meta))
{
let disk_fallback = match meta_cached_in_store {
Some(meta) => Some(meta),
None => load_meta_async(pkg_mirror.as_deref()).await,
};
if let Some(disk) = disk_fallback {
tracing::debug!(
target: "pacquet_resolving_npm_resolver::pick_package",
?error,