mirror of
https://github.com/pnpm/pnpm.git
synced 2026-05-24 16:46:06 -04:00
perf(pacquet/resolving-npm-resolver): move mirror disk reads off the tokio worker
The pick-and-fetch path was calling synchronous `load_meta` and `load_meta_headers` directly from async tasks. Each call reads the mirror file (`fs::read_to_string`, a blocking syscall) and parses the multi-KB to multi-MB packument body (`serde_json::from_str`, CPU-bound). Neither yields, so the tokio worker stayed pinned for the duration of every disk hit.

The hot path runs them on every cache-warm pick:

- `fetch_full_metadata_cached` calls `load_meta_headers` to feed the conditional GET headers.
- A 304 response then calls `load_meta` to re-materialise the packument.
- `pick_package`'s offline / pickLowestVersion, version-spec, and publishedBy shortcuts also call `load_meta` directly.

With ~600 unique packuments per install all running through this sequence concurrently behind `try_join_all`, the blocking time serializes against the worker pool size and the async scheduler can't make progress on unrelated resolves / HTTP fetches in the gaps.

This change adds async siblings — `load_meta_async` and `load_meta_headers_async` — that dispatch the sync body to `tokio::task::spawn_blocking`. Callers in `pick_package` and `fetch_full_metadata_cached` await the async wrappers. The sync functions stay (tests and the writeback path still call them directly).

Matches upstream pnpm's posture, which performs the equivalent work via awaited `fs.readFile` + `JSON.parse` on libuv's worker pool.

Test: all 194 tests in `pacquet-resolving-npm-resolver` continue to pass (existing `load_meta` / `load_meta_headers` coverage in the mirror tests and via the fetcher integration tests is the behaviour-level proof — the wrappers delegate verbatim).
This commit is contained in:
@@ -27,8 +27,8 @@ use crate::{
|
||||
FetchMetadataError,
|
||||
fetch_full_metadata::{ACCEPT_ABBREVIATED_DOC, ACCEPT_FULL_DOC},
|
||||
mirror::{
|
||||
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta, load_meta_headers,
|
||||
prepare_json_for_disk, save_meta,
|
||||
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta_async,
|
||||
load_meta_headers_async, prepare_json_for_disk, save_meta,
|
||||
},
|
||||
registry_url::to_registry_url,
|
||||
};
|
||||
@@ -116,7 +116,7 @@ pub async fn fetch_full_metadata_cached(
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
let cache_headers = mirror_path.as_deref().and_then(load_meta_headers);
|
||||
let cache_headers = load_meta_headers_async(mirror_path.as_deref()).await;
|
||||
|
||||
let url = to_registry_url(opts.registry, pkg_name);
|
||||
let accept = if opts.full_metadata { ACCEPT_FULL_DOC } else { ACCEPT_ABBREVIATED_DOC };
|
||||
@@ -148,8 +148,8 @@ pub async fn fetch_full_metadata_cached(
|
||||
pkg_name: pkg_name.to_string(),
|
||||
});
|
||||
};
|
||||
return load_meta(&path).ok_or_else(|| FetchMetadataError::CacheMissingAfter304 {
|
||||
pkg_name: pkg_name.to_string(),
|
||||
return load_meta_async(Some(&path)).await.ok_or_else(|| {
|
||||
FetchMetadataError::CacheMissingAfter304 { pkg_name: pkg_name.to_string() }
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -232,6 +232,45 @@ pub fn load_meta(pkg_mirror: &Path) -> Option<Package> {
|
||||
Some(meta)
|
||||
}
|
||||
|
||||
/// Async sibling of [`load_meta`]. The body is a blocking
|
||||
/// `fs::read_to_string` plus a `serde_json::from_str` that can chew
|
||||
/// through a multi-KB to multi-MB packument body — neither yields, so
|
||||
/// calling [`load_meta`] directly from an async task on the resolve
|
||||
/// hot path blocks the tokio worker for the duration of the read +
|
||||
/// parse. With hundreds of unique packuments per install, that
|
||||
/// serializes the resolve walk against the size of the runtime's
|
||||
/// worker pool. This wrapper dispatches the work to
|
||||
/// [`tokio::task::spawn_blocking`] so the async scheduler keeps
|
||||
/// progressing other resolves and HTTP fetches while one packument's
|
||||
/// body parses on the blocking pool. Matches upstream's stance:
|
||||
/// pnpm's loadMeta is an awaited `fs.readFile` + `JSON.parse` that
|
||||
/// runs on libuv's worker pool, the same separation tokio gives us
|
||||
/// via `spawn_blocking`.
|
||||
///
|
||||
/// `JoinError` (panic in the blocking task) and `None` from
|
||||
/// [`load_meta`] (missing / unreadable file) both collapse to
|
||||
/// `None`. The caller's response to either is the same — fall
|
||||
/// through to the network fetch — so distinguishing them is not
|
||||
/// load-bearing.
|
||||
///
|
||||
/// Returns `None` immediately when `pkg_mirror` is `None`, skipping
|
||||
/// the spawn-blocking dispatch entirely on the no-cache-dir branch.
|
||||
pub async fn load_meta_async(pkg_mirror: Option<&Path>) -> Option<Package> {
|
||||
let pkg_mirror = pkg_mirror?.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || load_meta(&pkg_mirror)).await.ok().flatten()
|
||||
}
|
||||
|
||||
/// Async sibling of [`load_meta_headers`]. Same rationale as
|
||||
/// [`load_meta_async`] — the synchronous body opens a file and
|
||||
/// parses a short JSON header line, blocking the worker for the
|
||||
/// duration. The headers-only read is cheap (~100 bytes typically)
|
||||
/// but is invoked on every cache-warm pick, so the cumulative block
|
||||
/// time is still meaningful with hundreds of packuments.
|
||||
pub async fn load_meta_headers_async(pkg_mirror: Option<&Path>) -> Option<MetaHeaders> {
|
||||
let pkg_mirror = pkg_mirror?.to_path_buf();
|
||||
tokio::task::spawn_blocking(move || load_meta_headers(&pkg_mirror)).await.ok().flatten()
|
||||
}
|
||||
|
||||
/// Atomic write: serialize to a sibling temp file, then `rename` it
|
||||
/// over the target. Mirrors pnpm's
|
||||
/// [`saveMeta`](https://github.com/pnpm/pnpm/blob/2a9bd897bf/resolving/npm-resolver/src/pickPackage.ts#L667-L676).
|
||||
|
||||
@@ -70,8 +70,8 @@ use crate::{
|
||||
FetchFullMetadataCachedOptions, FetchFullMetadataOptions, FetchFullMetadataOutcome,
|
||||
FetchMetadataError, fetch_full_metadata, fetch_full_metadata_cached,
|
||||
mirror::{
|
||||
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta, prepare_json_for_disk,
|
||||
save_meta,
|
||||
ABBREVIATED_META_DIR, FULL_META_DIR, get_pkg_mirror_path, load_meta_async,
|
||||
prepare_json_for_disk, save_meta,
|
||||
},
|
||||
pick_package_from_meta::{
|
||||
PickPackageFromMetaError, PickPackageFromMetaOptions, RegistryPackageSpec,
|
||||
@@ -433,7 +433,7 @@ pub async fn pick_package<Cache: PackageMetaCache>(
|
||||
|
||||
// 2. Offline / pickLowestVersion / preferOffline disk read.
|
||||
if ctx.offline || ctx.prefer_offline || opts.pick_lowest_version {
|
||||
meta_cached_in_store = pkg_mirror.as_deref().and_then(load_meta);
|
||||
meta_cached_in_store = load_meta_async(pkg_mirror.as_deref()).await;
|
||||
|
||||
if ctx.offline {
|
||||
if let Some(meta) = meta_cached_in_store {
|
||||
@@ -478,7 +478,7 @@ pub async fn pick_package<Cache: PackageMetaCache>(
|
||||
// 3. Version-spec fast path.
|
||||
if !opts.include_latest_tag && matches!(spec.spec_type, RegistryPackageSpecType::Version) {
|
||||
if meta_cached_in_store.is_none() {
|
||||
meta_cached_in_store = pkg_mirror.as_deref().and_then(load_meta);
|
||||
meta_cached_in_store = load_meta_async(pkg_mirror.as_deref()).await;
|
||||
}
|
||||
if let Some(ref meta) = meta_cached_in_store
|
||||
&& meta.versions.contains_key(&spec.fetch_spec)
|
||||
@@ -503,7 +503,7 @@ pub async fn pick_package<Cache: PackageMetaCache>(
|
||||
&& mtime >= published_by
|
||||
{
|
||||
if meta_cached_in_store.is_none() {
|
||||
meta_cached_in_store = pkg_mirror.as_deref().and_then(load_meta);
|
||||
meta_cached_in_store = load_meta_async(pkg_mirror.as_deref()).await;
|
||||
}
|
||||
if let Some(ref meta) = meta_cached_in_store
|
||||
&& let Ok(Some(picked)) = pick_matching_version_fast(&picker_opts, spec, meta)
|
||||
@@ -533,9 +533,11 @@ pub async fn pick_package<Cache: PackageMetaCache>(
|
||||
// returned (when it returned Ok). If it returned Err,
|
||||
// try the disk fallback: an existing mirror is good
|
||||
// enough to pick from, even if the latest sync failed.
|
||||
if let Some(disk) =
|
||||
meta_cached_in_store.or_else(|| pkg_mirror.as_deref().and_then(load_meta))
|
||||
{
|
||||
let disk_fallback = match meta_cached_in_store {
|
||||
Some(meta) => Some(meta),
|
||||
None => load_meta_async(pkg_mirror.as_deref()).await,
|
||||
};
|
||||
if let Some(disk) = disk_fallback {
|
||||
tracing::debug!(
|
||||
target: "pacquet_resolving_npm_resolver::pick_package",
|
||||
?error,
|
||||
|
||||
Reference in New Issue
Block a user