mirror of
https://github.com/pnpm/pnpm.git
synced 2026-05-25 00:57:38 -04:00
perf(pacquet/package-manager): batched store-index prefetch on fresh-lockfile install
Under global virtual store the install side is essentially just symlinking, so the resolve-walk wall-clock dominates — and almost all of it ends up serialized on the `Arc<Mutex<StoreIndex>>`. `run_with_mem_cache` → `run_without_mem_cache` calls `load_cached_cas_paths` per package, which under the hood `spawn_blocking`s a SQL `SELECT` against the shared store-index mutex. When `PrefetchingResolver` fires that path once per resolve occurrence (~1k spawns on the alotta-files fixture), the mutex contention dominates wall-clock. `create_virtual_store::run` already avoids this for the frozen-lockfile path via `prefetch_cas_paths` — one batched `SELECT … WHERE key IN (…)` followed by rayon-parallel per-row verify. This change ports that shape to the fresh-lockfile path: - After `resolve_importer`, walk the graph, collect every `(integrity, pkg_id)` cache key via the new `collect_prefetch_cache_keys` helper (mirrors the equivalent loop in `create_virtual_store::run`), run `prefetch_cas_paths`, and thread the resulting `PrefetchedCasPaths` through `InstallCtx` → `InstallPackageFromRegistry::run` → `DownloadTarballToStore` via the existing `prefetched_cas_paths` field. - Drop the `PrefetchingResolver` wrapper. With the batched prefetch in place, the resolver chain is the plain `DefaultResolver` again — same setup the frozen-lockfile path uses. Side effects: - The `Resolved` → `found_in_store` → `Imported` emit ordering is fixed by construction: all three now fire from `InstallPackageFromRegistry::run` in that order. Previously the `PrefetchingResolver` spawn fired `found_in_store` from the resolve phase, before `Resolved` from the install phase. - `prefetching_resolver.rs` is deleted. Tests: 298/298 pass in `pacquet-package-manager`. Clippy clean.
This commit is contained in:
@@ -47,6 +47,14 @@ pub struct InstallPackageFromRegistry<'a> {
|
||||
/// per-package fetch. See `DownloadTarballToStore::verified_files_cache`
|
||||
/// for the rationale.
|
||||
pub verified_files_cache: &'a SharedVerifiedFilesCache,
|
||||
/// Warm-cache prefetch result built once per install via
|
||||
/// [`pacquet_tarball::prefetch_cas_paths`] — `cache_key →
|
||||
/// Arc<cas_paths>`. When `Some`, the
|
||||
/// `DownloadTarballToStore::run_without_mem_cache` cache-lookup
|
||||
/// branch reads from here before falling back to the per-snapshot
|
||||
/// SQLite lookup, avoiding `Arc<Mutex<StoreIndex>>` contention on
|
||||
/// the resolve hot path.
|
||||
pub prefetched_cas_paths: Option<&'a pacquet_tarball::PrefetchedCasPaths>,
|
||||
/// Install-scoped dedupe state for `pnpm:package-import-method`.
|
||||
/// See `link_file::log_method_once`.
|
||||
pub logged_methods: &'a AtomicU8,
|
||||
@@ -107,6 +115,7 @@ impl<'a> InstallPackageFromRegistry<'a> {
|
||||
store_index,
|
||||
store_index_writer,
|
||||
verified_files_cache,
|
||||
prefetched_cas_paths,
|
||||
logged_methods,
|
||||
requester,
|
||||
node_modules_dir,
|
||||
@@ -168,7 +177,7 @@ impl<'a> InstallPackageFromRegistry<'a> {
|
||||
package_url: tarball_url,
|
||||
package_id: &package_id,
|
||||
requester,
|
||||
prefetched_cas_paths: None,
|
||||
prefetched_cas_paths,
|
||||
retry_opts: retry_opts_from_config(config),
|
||||
auth_headers: &config.auth_headers,
|
||||
ignore_file_pattern: None,
|
||||
|
||||
@@ -155,6 +155,7 @@ pub async fn should_install_package_from_pre_resolved_result() {
|
||||
store_index: None,
|
||||
store_index_writer: None,
|
||||
verified_files_cache: &verified_files_cache,
|
||||
prefetched_cas_paths: None,
|
||||
logged_methods: &logged_methods,
|
||||
requester: "",
|
||||
alias: "@pnpm.e2e/hello-world-js-bin",
|
||||
@@ -240,6 +241,7 @@ async fn second_visit_skips_progress_emits_but_still_links() {
|
||||
store_index: None,
|
||||
store_index_writer: None,
|
||||
verified_files_cache: &verified_files_cache,
|
||||
prefetched_cas_paths: None,
|
||||
logged_methods: &logged_methods,
|
||||
requester: "/proj",
|
||||
alias: "first-alias",
|
||||
@@ -261,6 +263,7 @@ async fn second_visit_skips_progress_emits_but_still_links() {
|
||||
store_index: None,
|
||||
store_index_writer: None,
|
||||
verified_files_cache: &verified_files_cache,
|
||||
prefetched_cas_paths: None,
|
||||
logged_methods: &logged_methods,
|
||||
requester: "/proj",
|
||||
alias: "second-alias",
|
||||
@@ -352,6 +355,7 @@ async fn install_emits_progress_sequence() {
|
||||
store_index: None,
|
||||
store_index_writer: None,
|
||||
verified_files_cache: &verified_files_cache,
|
||||
prefetched_cas_paths: None,
|
||||
logged_methods: &logged_methods,
|
||||
requester: "/proj",
|
||||
alias: "@pnpm.e2e/hello-world-js-bin",
|
||||
@@ -453,6 +457,7 @@ async fn install_returns_unsupported_resolution_when_name_ver_missing() {
|
||||
store_index: None,
|
||||
store_index_writer: None,
|
||||
verified_files_cache: &verified_files_cache,
|
||||
prefetched_cas_paths: None,
|
||||
logged_methods: &logged_methods,
|
||||
requester: "",
|
||||
alias: "bar",
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::{
|
||||
AllowBuildPolicy, GraphToLockfileOptions, HoistedDependencies, InstallPackageFromRegistry,
|
||||
InstallPackageFromRegistryError, LinkVirtualStoreBins, LinkVirtualStoreBinsError,
|
||||
PrefetchContext, PrefetchingResolver, VersionPolicyError, VirtualStoreLayout,
|
||||
dependencies_graph_to_lockfile, store_init::init_store_dir_best_effort,
|
||||
VersionPolicyError, VirtualStoreLayout, dependencies_graph_to_lockfile,
|
||||
store_init::init_store_dir_best_effort,
|
||||
};
|
||||
use async_recursion::async_recursion;
|
||||
use dashmap::{DashMap, mapref::entry::Entry};
|
||||
@@ -81,11 +81,9 @@ pub type ResolvedPackages = DashMap<String, watch::Sender<bool>>;
|
||||
/// `pnpm-lock.yaml`; the caller writes it to `<lockfile_dir>/pnpm-lock.yaml`.
|
||||
#[must_use]
|
||||
pub struct InstallWithFreshLockfile<'a, DependencyGroupList> {
|
||||
/// Shared in-memory tarball cache. Held behind [`Arc`] so the
|
||||
/// resolve-time prefetcher ([`PrefetchingResolver`]) can capture
|
||||
/// an owned clone into the background download task spawned for
|
||||
/// each fresh resolution while the install-side per-package call
|
||||
/// in [`install_subtree`] still takes `&MemCache` via deref.
|
||||
/// Shared in-memory tarball cache. Held behind [`Arc`] for parity
|
||||
/// with the [`crate::Install`] surface; the install-side calls
|
||||
/// take `&MemCache` via deref.
|
||||
pub tarball_mem_cache: Arc<MemCache>,
|
||||
pub resolved_packages: &'a ResolvedPackages,
|
||||
pub http_client: &'a ThrottledClient,
|
||||
@@ -361,7 +359,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
// (`contains_path_sep` in `parse_bare_specifier.rs`) would
|
||||
// otherwise claim and prevent the named-registry resolver
|
||||
// from running.
|
||||
let inner_resolver: Box<dyn Resolver> = Box::new(DefaultResolver::new(vec![
|
||||
let resolver: Box<dyn Resolver> = Box::new(DefaultResolver::new(vec![
|
||||
Box::new(ArcResolver(Arc::clone(&npm_resolver))),
|
||||
Box::new(git_resolver),
|
||||
Box::new(tarball_resolver),
|
||||
@@ -373,60 +371,6 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
Box::new(local_path_resolver),
|
||||
]));
|
||||
|
||||
// Open the read-only SQLite index, spawn the batched writer,
|
||||
// and allocate the install-scoped `verifiedFilesCache` *before*
|
||||
// the resolver chain runs. These were originally opened after
|
||||
// `resolve_importer` returned, but the
|
||||
// [`PrefetchingResolver`] needs them at construction time so
|
||||
// the per-resolve `tokio::spawn`ed [`DownloadTarballToStore`]
|
||||
// shares the same store-index / writer / verify cache as the
|
||||
// install pass that runs once resolution is done. Mirrors
|
||||
// pnpm's `packageRequester` shape: the fetch begins as soon as
|
||||
// the resolver returns, before any further tree walk.
|
||||
let store_index =
|
||||
match tokio::task::spawn_blocking(move || StoreIndex::shared_readonly_in(store_dir))
|
||||
.await
|
||||
{
|
||||
Ok(store_index) => store_index,
|
||||
Err(error) => {
|
||||
tracing::warn!(
|
||||
target: "pacquet::install",
|
||||
?error,
|
||||
"store-index open task failed; continuing without a shared cache index",
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
let store_index_ref = store_index.as_ref();
|
||||
|
||||
let (store_index_writer, writer_task) = StoreIndexWriter::spawn(store_dir);
|
||||
let store_index_writer_ref = Some(&store_index_writer);
|
||||
|
||||
let verified_files_cache = SharedVerifiedFilesCache::default();
|
||||
|
||||
// Wrap the resolver chain so each tarball-shaped result fires a
|
||||
// background download into `tarball_mem_cache` while the
|
||||
// deps-resolver continues to walk the tree. The install pass
|
||||
// later calls `DownloadTarballToStore::run_with_mem_cache` for
|
||||
// the same URLs and either picks up `CacheValue::Available`
|
||||
// immediately or briefly blocks on the per-URL `Notify`. The
|
||||
// wrapper is generic over `R: Reporter` so the spawned
|
||||
// download's `pnpm:progress` emits route through the same
|
||||
// reporter the install pass uses. See
|
||||
// `prefetching_resolver.rs` for the full design rationale.
|
||||
let resolver: Box<dyn Resolver> = Box::new(PrefetchingResolver::<Reporter>::new(
|
||||
inner_resolver,
|
||||
PrefetchContext {
|
||||
http_client: &http_client_arc,
|
||||
mem_cache: &tarball_mem_cache,
|
||||
store_index: store_index_ref,
|
||||
store_index_writer: Some(&store_index_writer),
|
||||
verified_files_cache: &verified_files_cache,
|
||||
config,
|
||||
requester,
|
||||
},
|
||||
));
|
||||
|
||||
// Compile `minimumReleaseAge` (and its exclude pattern set)
|
||||
// for the resolve pass. Mirrors the verifier wiring in
|
||||
// `build_resolution_verifiers` so the resolver-time pick and
|
||||
@@ -507,22 +451,65 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
.await
|
||||
.map_err(InstallWithFreshLockfileError::ResolveImporter)?;
|
||||
|
||||
// Drop the resolver (and its meta cache) before the install
|
||||
// pass: the tree captures every `ResolveResult` we need.
|
||||
// Dropping `resolver` releases the inner resolver chain's
|
||||
// strong reference to `npm_resolver` (held by the
|
||||
// `ArcResolver` wrapper inside the inner chain); the
|
||||
// standalone `npm_resolver` binding holds a second strong
|
||||
// reference because the deno- and bun-resolvers were handed a
|
||||
// clone of the same `Arc` for their version-selection
|
||||
// delegate. Drop both so the `NpmResolver` (and its packument
|
||||
// cache) can be freed before the install pass keeps
|
||||
// allocating tarballs. The store_index / store_index_writer /
|
||||
// verified_files_cache are opened above the resolver chain so
|
||||
// the prefetching wrapper can share them with the install pass.
|
||||
// Drop the resolver (and its packument cache) before the
|
||||
// install pass. Dropping `resolver` releases the strong
|
||||
// reference held by the `ArcResolver` wrapper; the standalone
|
||||
// `npm_resolver` binding holds a second strong reference
|
||||
// because the deno- and bun-resolvers were handed a clone of
|
||||
// the same `Arc` for their version-selection delegate. Drop
|
||||
// both so the `NpmResolver`'s meta cache is freed before the
|
||||
// install pass starts pulling tarballs into the CAFS.
|
||||
drop(resolver);
|
||||
drop(npm_resolver);
|
||||
|
||||
// Open the read-only SQLite index, spawn the batched writer,
|
||||
// and allocate the install-scoped `verifiedFilesCache`. Same
|
||||
// shape `create_virtual_store::run` opens for the frozen-
|
||||
// lockfile install path — the warm-cache prefetch below shares
|
||||
// them with the per-package install routines.
|
||||
let store_index =
|
||||
match tokio::task::spawn_blocking(move || StoreIndex::shared_readonly_in(store_dir))
|
||||
.await
|
||||
{
|
||||
Ok(store_index) => store_index,
|
||||
Err(error) => {
|
||||
tracing::warn!(
|
||||
target: "pacquet::install",
|
||||
?error,
|
||||
"store-index open task failed; continuing without a shared cache index",
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
let store_index_ref = store_index.as_ref();
|
||||
|
||||
let (store_index_writer, writer_task) = StoreIndexWriter::spawn(store_dir);
|
||||
let store_index_writer_ref = Some(&store_index_writer);
|
||||
|
||||
let verified_files_cache = SharedVerifiedFilesCache::default();
|
||||
|
||||
// Warm-cache batched prefetch: collect every `(integrity,
|
||||
// pkg_id)` pair the resolver produced, run one batched SQL
|
||||
// `SELECT ... WHERE key IN (...)` against the store index,
|
||||
// then verify each row's files on rayon. Mirrors what
|
||||
// `create_virtual_store::run` already does for the frozen-
|
||||
// lockfile path. Without this, the per-package `run_with_mem_cache`
|
||||
// → `load_cached_cas_paths` flow fires N individual `spawn_blocking`
|
||||
// tasks that all serialize on `Arc<Mutex<StoreIndex>>` — at ~1k
|
||||
// resolved packages the Mutex contention dominates the resolve-
|
||||
// walk wall-clock under global virtual store, where the install
|
||||
// side is otherwise just symlinking.
|
||||
let cache_keys: Vec<String> = collect_prefetch_cache_keys(&importer_result.peers_result);
|
||||
let prefetch = pacquet_tarball::prefetch_cas_paths(
|
||||
store_index_ref.cloned(),
|
||||
store_dir,
|
||||
cache_keys,
|
||||
config.verify_store_integrity,
|
||||
SharedVerifiedFilesCache::clone(&verified_files_cache),
|
||||
)
|
||||
.await;
|
||||
let prefetched_cas_paths = prefetch.cas_paths;
|
||||
|
||||
// Peer-resolution result (collected by `resolve_importer` after
|
||||
// the hoist loop converged). Peer issues collected here are not
|
||||
// fatal — they are reported (TODO: wire into the reporter once
|
||||
@@ -593,6 +580,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
store_index: store_index_ref,
|
||||
store_index_writer: store_index_writer_ref,
|
||||
verified_files_cache: &verified_files_cache,
|
||||
prefetched_cas_paths: &prefetched_cas_paths,
|
||||
logged_methods,
|
||||
resolved_packages,
|
||||
requester,
|
||||
@@ -714,6 +702,48 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk the resolver-produced graph and emit the
|
||||
/// `{integrity}\t{pkg_id}` cache keys
|
||||
/// [`pacquet_tarball::prefetch_cas_paths`] uses for its batched
|
||||
/// `SELECT ... WHERE key IN (...)` against the store index. Mirrors
|
||||
/// the equivalent collection loop in
|
||||
/// [`crate::CreateVirtualStore::run`] for the frozen-lockfile path —
|
||||
/// same key shape, same dedup, so the fresh-lockfile path's warm
|
||||
/// batch hits the same rows pnpm or pacquet wrote on the prior
|
||||
/// install.
|
||||
///
|
||||
/// Skips nodes whose resolver result isn't a tarball with both
|
||||
/// `integrity` and a structured `name@version`: git-hosted tarballs
|
||||
/// and directory / git / binary resolutions use a different key
|
||||
/// shape (`pkg_id`-only) and route through the cold path. Today's
|
||||
/// `install_subtree` only handles tarball+integrity anyway, so the
|
||||
/// skipped entries can't be served from the prefetch either way.
|
||||
fn collect_prefetch_cache_keys(
|
||||
peers_result: &pacquet_resolving_deps_resolver::ResolvePeersResult,
|
||||
) -> Vec<String> {
|
||||
let mut keys: Vec<String> = peers_result
|
||||
.graph
|
||||
.values()
|
||||
.filter_map(|node| {
|
||||
let pacquet_lockfile::LockfileResolution::Tarball(tarball) =
|
||||
&node.resolve_result.resolution
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
if tarball.git_hosted == Some(true) {
|
||||
return None;
|
||||
}
|
||||
let integrity = tarball.integrity.as_ref()?.to_string();
|
||||
let name_ver = node.resolve_result.name_ver.as_ref()?;
|
||||
let pkg_id = format!("{}@{}", name_ver.name, name_ver.suffix);
|
||||
Some(pacquet_store_dir::store_index_key(&integrity, &pkg_id))
|
||||
})
|
||||
.collect();
|
||||
keys.sort_unstable();
|
||||
keys.dedup();
|
||||
keys
|
||||
}
|
||||
|
||||
/// Build the [`Lockfile`] for `<lockfile_dir>/pnpm-lock.yaml` from the
|
||||
/// resolver's output.
|
||||
///
|
||||
@@ -763,6 +793,13 @@ struct InstallCtx<'a> {
|
||||
store_index: Option<&'a pacquet_store_dir::SharedReadonlyStoreIndex>,
|
||||
store_index_writer: Option<&'a Arc<StoreIndexWriter>>,
|
||||
verified_files_cache: &'a SharedVerifiedFilesCache,
|
||||
/// Warm-cache lookup table built once at install start via
|
||||
/// [`pacquet_tarball::prefetch_cas_paths`]. Threaded through to
|
||||
/// [`InstallPackageFromRegistry`] so the per-package
|
||||
/// `run_with_mem_cache` short-circuits the per-snapshot SQLite
|
||||
/// round-trip + per-file `fs::metadata` work when the
|
||||
/// `(integrity, pkg_id)` is already in the CAFS.
|
||||
prefetched_cas_paths: &'a pacquet_tarball::PrefetchedCasPaths,
|
||||
logged_methods: &'a AtomicU8,
|
||||
resolved_packages: &'a ResolvedPackages,
|
||||
requester: &'a str,
|
||||
@@ -856,6 +893,7 @@ where
|
||||
store_index: ctx.store_index,
|
||||
store_index_writer: ctx.store_index_writer,
|
||||
verified_files_cache: ctx.verified_files_cache,
|
||||
prefetched_cas_paths: Some(ctx.prefetched_cas_paths),
|
||||
logged_methods: ctx.logged_methods,
|
||||
requester: ctx.requester,
|
||||
node_modules_dir,
|
||||
|
||||
@@ -23,7 +23,6 @@ mod link_bins;
|
||||
mod link_file;
|
||||
mod link_hoisted_modules;
|
||||
mod overrides;
|
||||
mod prefetching_resolver;
|
||||
mod retry_config;
|
||||
mod store_init;
|
||||
mod symlink_direct_dependencies;
|
||||
@@ -56,7 +55,6 @@ pub use link_bins::*;
|
||||
pub use link_file::*;
|
||||
pub use link_hoisted_modules::*;
|
||||
pub use overrides::*;
|
||||
pub use prefetching_resolver::*;
|
||||
pub use symlink_direct_dependencies::*;
|
||||
pub use symlink_package::*;
|
||||
pub use version_policy::*;
|
||||
|
||||
@@ -1,224 +0,0 @@
|
||||
//! Resolver wrapper that pipelines tarball downloads with resolution.
|
||||
//!
|
||||
//! Upstream pnpm's [`packageRequester.requestPackage`](https://github.com/pnpm/pnpm/blob/fecaee0b35/installing/package-requester/src/packageRequester.ts#L266)
|
||||
//! returns a `pkgResponse` whose `fetching` field is a Promise that is
|
||||
//! already running by the time the resolver returns. Resolution of
|
||||
//! children continues in parallel with that download; the install pass
|
||||
//! later `await`s each `fetching` promise (which is usually already
|
||||
//! resolved by then).
|
||||
//!
|
||||
//! Pacquet's deps-resolver crate stays pure: it walks the manifest tree
|
||||
//! and returns a [`ResolveResult`] without doing any tarball I/O. To
|
||||
//! match pnpm's pipelined shape, the install orchestrator wraps the
|
||||
//! resolver chain with [`PrefetchingResolver`]. After the inner
|
||||
//! resolver claims a wanted dep, the wrapper inspects the result and,
|
||||
//! for tarball-shaped resolutions, [`tokio::spawn`]s a
|
||||
//! [`DownloadTarballToStore`] in the background.
|
||||
//!
|
||||
//! The download lands its result in the shared [`MemCache`]. Later, when
|
||||
//! [`crate::InstallPackageFromRegistry`] calls
|
||||
//! [`DownloadTarballToStore::run_with_mem_cache`] for the same URL, the
|
||||
//! `MemCache` either returns `CacheValue::Available` immediately (the
|
||||
//! prefetch is already done) or briefly blocks on the `Notify` (the
|
||||
//! prefetch is still in flight). Errors are surfaced to the install
|
||||
//! path as [`TarballError::SiblingFetchFailed`].
|
||||
|
||||
use crate::install_package_from_registry::{extract_tarball, manifest_unpacked_size};
|
||||
use crate::retry_config::retry_opts_from_config;
|
||||
use pacquet_config::Config;
|
||||
use pacquet_network::{AuthHeaders, ThrottledClient};
|
||||
use pacquet_reporter::Reporter;
|
||||
use pacquet_resolving_resolver_base::{
|
||||
LatestQuery, ResolveFuture, ResolveLatestFuture, ResolveOptions, ResolveResult, Resolver,
|
||||
WantedDependency,
|
||||
};
|
||||
use pacquet_store_dir::{
|
||||
SharedReadonlyStoreIndex, SharedVerifiedFilesCache, StoreDir, StoreIndexWriter,
|
||||
};
|
||||
use pacquet_tarball::{DownloadTarballToStore, MemCache, RetryOpts};
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
/// Borrowed-data bag handed to [`PrefetchingResolver::new`]. Everything
|
||||
/// the wrapper needs to drive a background tarball download:
|
||||
/// network/store handles, the shared mem cache, the
|
||||
/// `verifiedFilesCache`, retry/offline knobs, and the install's
|
||||
/// `requester` prefix for reporter events. The wrapper clones each
|
||||
/// field into the form a `tokio::spawn`ed task can capture (`Arc` for
|
||||
/// shared refs, `&'static` passes through, primitive copies).
|
||||
pub struct PrefetchContext<'a> {
|
||||
pub http_client: &'a Arc<ThrottledClient>,
|
||||
pub mem_cache: &'a Arc<MemCache>,
|
||||
pub store_index: Option<&'a SharedReadonlyStoreIndex>,
|
||||
pub store_index_writer: Option<&'a Arc<StoreIndexWriter>>,
|
||||
pub verified_files_cache: &'a SharedVerifiedFilesCache,
|
||||
pub config: &'static Config,
|
||||
pub requester: &'a str,
|
||||
}
|
||||
|
||||
/// Owned, `'static`-friendly clones of [`PrefetchContext`] stored on
|
||||
/// the wrapper. Every field is either an `Arc` clone or a `Copy`
|
||||
/// scalar so each [`tokio::spawn`]ed download task can capture an
|
||||
/// independent set without leaking lifetimes back into the resolver's
|
||||
/// type.
|
||||
struct OwnedFetchCtx {
|
||||
http_client: Arc<ThrottledClient>,
|
||||
mem_cache: Arc<MemCache>,
|
||||
store_dir: &'static StoreDir,
|
||||
store_index: Option<SharedReadonlyStoreIndex>,
|
||||
store_index_writer: Option<Arc<StoreIndexWriter>>,
|
||||
verified_files_cache: SharedVerifiedFilesCache,
|
||||
auth_headers: Arc<AuthHeaders>,
|
||||
retry_opts: RetryOpts,
|
||||
requester: Arc<str>,
|
||||
offline: bool,
|
||||
verify_store_integrity: bool,
|
||||
}
|
||||
|
||||
/// Wraps an inner [`Resolver`] and, after each successful resolve that
|
||||
/// produces a tarball-shaped result, fires the tarball download into
|
||||
/// the shared [`MemCache`] via [`tokio::spawn`]. The resolver returns
|
||||
/// to the deps-resolver immediately; the download runs concurrently
|
||||
/// with the rest of the tree walk.
|
||||
///
|
||||
/// Generic over `R: Reporter` so [`DownloadTarballToStore`]'s
|
||||
/// `pnpm:progress` emits route through the same reporter the install
|
||||
/// pass uses. The wrapper itself doesn't hold an `R` value (Reporter is
|
||||
/// a static trait); `PhantomData` carries the type through.
|
||||
pub struct PrefetchingResolver<R: Reporter> {
|
||||
inner: Box<dyn Resolver>,
|
||||
ctx: OwnedFetchCtx,
|
||||
_phantom: PhantomData<fn() -> R>,
|
||||
}
|
||||
|
||||
impl<R: Reporter + 'static> PrefetchingResolver<R> {
|
||||
/// Build a wrapper from the install orchestrator's borrowed
|
||||
/// references. Clones the necessary `Arc`s up front so the
|
||||
/// per-`resolve` spawn has all the data it needs without
|
||||
/// re-borrowing the install scope.
|
||||
pub fn new(inner: Box<dyn Resolver>, prefetch_ctx: PrefetchContext<'_>) -> Self {
|
||||
let PrefetchContext {
|
||||
http_client,
|
||||
mem_cache,
|
||||
store_index,
|
||||
store_index_writer,
|
||||
verified_files_cache,
|
||||
config,
|
||||
requester,
|
||||
} = prefetch_ctx;
|
||||
let ctx = OwnedFetchCtx {
|
||||
http_client: Arc::clone(http_client),
|
||||
mem_cache: Arc::clone(mem_cache),
|
||||
store_dir: &config.store_dir,
|
||||
store_index: store_index.cloned(),
|
||||
store_index_writer: store_index_writer.cloned(),
|
||||
verified_files_cache: SharedVerifiedFilesCache::clone(verified_files_cache),
|
||||
auth_headers: Arc::clone(&config.auth_headers),
|
||||
retry_opts: retry_opts_from_config(config),
|
||||
requester: Arc::<str>::from(requester),
|
||||
offline: config.offline,
|
||||
verify_store_integrity: config.verify_store_integrity,
|
||||
};
|
||||
PrefetchingResolver { inner, ctx, _phantom: PhantomData }
|
||||
}
|
||||
|
||||
/// Inspect a fresh `ResolveResult` and, if it carries a tarball
|
||||
/// URL + integrity, kick off the download as a detached
|
||||
/// [`tokio::spawn`] task.
|
||||
///
|
||||
/// Non-tarball resolutions (git, directory, registry-shape,
|
||||
/// binary, variations) and resolutions missing a structured
|
||||
/// `name@version` fall through to a no-op — the install path's
|
||||
/// per-protocol code path handles them.
|
||||
///
|
||||
/// The spawned task's result is dropped: the per-URL `MemCache`
|
||||
/// stores `CacheValue::Available` (on success) or
|
||||
/// `CacheValue::Failed` (on error) and any later
|
||||
/// `run_with_mem_cache` call observes the right value. Surfacing
|
||||
/// the error from inside the resolver would force the resolve
|
||||
/// pass to abort before the rest of the tree walk completes,
|
||||
/// which is the opposite of what we want for a prefetch.
|
||||
fn maybe_kickoff_download(&self, result: &ResolveResult) {
|
||||
// Only spawn for tarball-shaped resolutions with both URL and
|
||||
// integrity. Mirrors the gate in
|
||||
// `install_package_from_registry::extract_tarball`; other
|
||||
// resolution shapes are not fetched through
|
||||
// `DownloadTarballToStore` at all.
|
||||
let Ok((package_url, integrity)) = extract_tarball(&result.resolution) else {
|
||||
return;
|
||||
};
|
||||
// The npm picker's `dist.tarball` is the canonical URL the
|
||||
// install path will look up in `MemCache`. Tarball-resolver
|
||||
// and git-resolver paths can leave `name_ver` unset (they
|
||||
// learn the name from the manifest only after the fetch); in
|
||||
// those cases the install path's `InstallPackageFromRegistry`
|
||||
// also fails, so skipping here matches the install-side
|
||||
// behaviour without adding a divergence.
|
||||
let Some(name_ver) = result.name_ver.as_ref() else { return };
|
||||
|
||||
let package_id = format!("{}@{}", name_ver.name, name_ver.suffix);
|
||||
let package_url = package_url.to_string();
|
||||
let package_unpacked_size = manifest_unpacked_size(result.manifest.as_ref());
|
||||
|
||||
let http_client = Arc::clone(&self.ctx.http_client);
|
||||
let mem_cache = Arc::clone(&self.ctx.mem_cache);
|
||||
let store_dir = self.ctx.store_dir;
|
||||
let store_index = self.ctx.store_index.clone();
|
||||
let store_index_writer = self.ctx.store_index_writer.clone();
|
||||
let verified_files_cache = SharedVerifiedFilesCache::clone(&self.ctx.verified_files_cache);
|
||||
let auth_headers = Arc::clone(&self.ctx.auth_headers);
|
||||
let retry_opts = self.ctx.retry_opts;
|
||||
let requester = Arc::clone(&self.ctx.requester);
|
||||
let offline = self.ctx.offline;
|
||||
let verify_store_integrity = self.ctx.verify_store_integrity;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Result is intentionally discarded — see the doc comment
|
||||
// above. The `MemCache` carries success / failure state to
|
||||
// the install path.
|
||||
let _ = DownloadTarballToStore {
|
||||
http_client: &http_client,
|
||||
store_dir,
|
||||
store_index,
|
||||
store_index_writer,
|
||||
verify_store_integrity,
|
||||
verified_files_cache,
|
||||
package_integrity: &integrity,
|
||||
package_unpacked_size,
|
||||
package_url: &package_url,
|
||||
package_id: &package_id,
|
||||
requester: &requester,
|
||||
prefetched_cas_paths: None,
|
||||
retry_opts,
|
||||
auth_headers: &auth_headers,
|
||||
ignore_file_pattern: None,
|
||||
offline,
|
||||
}
|
||||
.run_with_mem_cache::<R>(&mem_cache)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Reporter + 'static> Resolver for PrefetchingResolver<R> {
|
||||
fn resolve<'a>(
|
||||
&'a self,
|
||||
wanted_dependency: &'a WantedDependency,
|
||||
opts: &'a ResolveOptions,
|
||||
) -> ResolveFuture<'a> {
|
||||
Box::pin(async move {
|
||||
let result = self.inner.resolve(wanted_dependency, opts).await?;
|
||||
if let Some(result_ref) = result.as_ref() {
|
||||
self.maybe_kickoff_download(result_ref);
|
||||
}
|
||||
Ok(result)
|
||||
})
|
||||
}
|
||||
|
||||
fn resolve_latest<'a>(
|
||||
&'a self,
|
||||
query: &'a LatestQuery,
|
||||
opts: &'a ResolveOptions,
|
||||
) -> ResolveLatestFuture<'a> {
|
||||
self.inner.resolve_latest(query, opts)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user