perf(pacquet): pipeline tarball fetches with resolution

Pacquet currently resolves the entire dependency tree before any
tarball is fetched: `resolve_importer` walks every transitive dep
through the resolver chain and returns; only then does
`install_subtree` start calling `DownloadTarballToStore`. On the
`alotta-files` benchmark this puts pacquet 3-5x behind the
TypeScript pnpm CLI whenever resolution runs — pnpm
pipelines the two stages.

This change wraps the resolver chain in a `PrefetchingResolver`
that, after each successful resolve returning a tarball-shaped
result, `tokio::spawn`s a `DownloadTarballToStore::run_with_mem_cache`
into the shared `MemCache`. Resolution of children, siblings, and
the rest of the tree continues in parallel with that download.
When `install_subtree` later calls `run_with_mem_cache` for the
same URL, the per-URL cache either returns `CacheValue::Available`
immediately or briefly blocks on the existing `Notify`. Errors
surface to the install path as `TarballError::SiblingFetchFailed`,
unchanged.

Mirrors pnpm's `packageRequester.requestPackage` shape, which
returns a `pkgResponse` whose `fetching` field is a Promise that
is already running by the time the resolver returns
(`installing/package-requester/src/packageRequester.ts#L266`,
`installing/deps-resolver/src/resolveDependencies.ts#L1631`).

Mechanical changes to support the prefetch capture:
- `State::tarball_mem_cache` is now `Arc<MemCache>` so the spawned
  task can own a clone without leaking lifetimes.
- `store_index`, `store_index_writer`, and `verified_files_cache`
  are opened before the resolver chain instead of after
  `resolve_importer` so the prefetcher shares them with the
  install pass.
- `Reporter` bounds tightened to `+ 'static` along the path that
  feeds the spawn. All existing impls (`SilentReporter`,
  `NdjsonReporter`) already satisfy it.

Closes #11832.
This commit is contained in:
Zoltan Kochan
2026-05-21 21:17:38 +02:00
parent fecaee0b35
commit f375c9161e
10 changed files with 377 additions and 110 deletions

View File

@@ -90,7 +90,10 @@ pub struct AddArgs {
impl AddArgs {
/// Execute the subcommand.
pub async fn run<Reporter: self::Reporter>(self, mut state: State) -> miette::Result<()> {
pub async fn run<Reporter: self::Reporter + 'static>(
self,
mut state: State,
) -> miette::Result<()> {
// TODO: if a package already exists in another dependency group, don't remove the existing entry.
let State { tarball_mem_cache, http_client, config, manifest, lockfile, resolved_packages } =
@@ -109,7 +112,7 @@ impl AddArgs {
.parent()
.map(|parent| parent.join(pacquet_lockfile::Lockfile::FILE_NAME));
Add {
tarball_mem_cache,
tarball_mem_cache: std::sync::Arc::clone(tarball_mem_cache),
http_client,
http_client_arc: std::sync::Arc::clone(http_client),
config,

View File

@@ -169,7 +169,7 @@ pub struct InstallArgs {
}
impl InstallArgs {
pub async fn run<Reporter: self::Reporter>(self, state: State) -> miette::Result<()> {
pub async fn run<Reporter: self::Reporter + 'static>(self, state: State) -> miette::Result<()> {
let State { tarball_mem_cache, http_client, config, manifest, lockfile, resolved_packages } =
&state;
let InstallArgs {
@@ -226,7 +226,7 @@ impl InstallArgs {
.parent()
.map(|parent| parent.join(pacquet_lockfile::Lockfile::FILE_NAME));
Install {
tarball_mem_cache,
tarball_mem_cache: std::sync::Arc::clone(tarball_mem_cache),
http_client,
http_client_arc: std::sync::Arc::clone(http_client),
config,

View File

@@ -7,12 +7,17 @@ use pacquet_package_manager::ResolvedPackages;
use pacquet_package_manifest::{PackageManifest, PackageManifestError};
use pacquet_tarball::MemCache;
use pipe_trait::Pipe;
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};
/// Application state when running `pacquet run` or `pacquet install`.
pub struct State {
/// Shared cache that store downloaded tarballs.
pub tarball_mem_cache: MemCache,
/// Shared cache that store downloaded tarballs. Held behind
/// [`Arc`] so the resolve-time prefetch (see
/// [`pacquet_package_manager::PrefetchingResolver`]) can capture
/// an owned clone into the `tokio::spawn`ed background download
/// while every install sub-pipeline still takes a borrowed
/// `&MemCache` via deref.
pub tarball_mem_cache: Arc<MemCache>,
/// HTTP client to make HTTP requests. Held behind [`std::sync::Arc`] so
/// the lockfile-verification gate can own a clone for the
/// `NpmResolutionVerifier`'s lifetime while every install
@@ -69,7 +74,7 @@ impl State {
ThrottledClient::for_installs(&config.proxy, &config.tls, &config.tls_by_uri)
.map_err(InitStateError::Network)?,
),
tarball_mem_cache: MemCache::new(),
tarball_mem_cache: Arc::new(MemCache::new()),
resolved_packages: ResolvedPackages::new(),
})
}

View File

@@ -17,7 +17,7 @@ where
ListDependencyGroups: Fn() -> DependencyGroupList,
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
pub tarball_mem_cache: &'a MemCache,
pub tarball_mem_cache: std::sync::Arc<MemCache>,
pub resolved_packages: &'a ResolvedPackages,
pub http_client: &'a ThrottledClient,
pub http_client_arc: std::sync::Arc<ThrottledClient>,
@@ -51,7 +51,7 @@ where
ListDependencyGroups: Fn() -> DependencyGroupList,
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
pub async fn run<Reporter: self::Reporter>(self) -> Result<(), AddError> {
pub async fn run<Reporter: self::Reporter + 'static>(self) -> Result<(), AddError> {
let Add {
tarball_mem_cache,
http_client,

View File

@@ -40,7 +40,11 @@ pub struct Install<'a, DependencyGroupList>
where
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
pub tarball_mem_cache: &'a MemCache,
/// Shared in-memory tarball cache. Held behind [`Arc`] so the
/// prefetcher constructed in [`InstallWithFreshLockfile::run`]
/// can capture an owned clone into the background download task
/// while the install-side calls still take `&MemCache` via deref.
pub tarball_mem_cache: Arc<MemCache>,
pub resolved_packages: &'a ResolvedPackages,
pub http_client: &'a ThrottledClient,
/// Same client behind an [`Arc`] for the lockfile-verification
@@ -268,7 +272,7 @@ where
DependencyGroupList: IntoIterator<Item = DependencyGroup>,
{
/// Execute the subroutine.
pub async fn run<Reporter: self::Reporter>(self) -> Result<(), InstallError> {
pub async fn run<Reporter: self::Reporter + 'static>(self) -> Result<(), InstallError> {
let Install {
tarball_mem_cache,
resolved_packages,

View File

@@ -49,7 +49,7 @@ async fn should_install_dependencies() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -116,7 +116,7 @@ async fn should_error_when_frozen_lockfile_is_requested_but_none_exists() {
let config = config.leak();
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -187,7 +187,7 @@ async fn frozen_lockfile_flag_overrides_config_lockfile_false() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -251,7 +251,7 @@ async fn npm_alias_dependency_installs_under_alias_key() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -332,7 +332,7 @@ async fn unversioned_npm_alias_defaults_to_latest() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -398,7 +398,7 @@ async fn frozen_lockfile_flag_with_no_lockfile_errors() {
let config = config.leak();
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -484,7 +484,7 @@ async fn install_emits_pnpm_event_sequence() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -624,7 +624,7 @@ async fn install_writes_modules_yaml() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -723,7 +723,7 @@ async fn install_writes_workspace_state() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -842,7 +842,7 @@ async fn install_optional_failing_postinstall_dep_via_registry_mock_succeeds() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -963,7 +963,7 @@ async fn warm_reinstall_skips_snapshot_when_current_lockfile_matches() {
seed_placeholder_virtual_store_slot(&virtual_store_dir);
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1059,7 +1059,7 @@ async fn warm_reinstall_emits_broken_modules_when_dir_is_missing() {
// event in the captured set regardless of the final install
// result.
let _ = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1163,7 +1163,7 @@ async fn context_log_reflects_current_lockfile_after_first_install() {
// First install: `lock.yaml` does not exist yet.
EVENTS.lock().unwrap().clear();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1211,7 +1211,7 @@ async fn context_log_reflects_current_lockfile_after_first_install() {
// install's `lock.yaml` is now on disk.
EVENTS.lock().unwrap().clear();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1301,7 +1301,7 @@ async fn warm_reinstall_reports_added_zero_and_emits_no_imported_events() {
EVENTS.lock().unwrap().clear();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1393,7 +1393,7 @@ async fn frozen_lockfile_errors_when_manifest_drifts_from_lockfile() {
.expect("parse partial-install fixture lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1455,7 +1455,7 @@ async fn ignore_manifest_check_bypasses_manifest_freshness_gate() {
.expect("parse partial-install fixture lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1518,7 +1518,7 @@ async fn frozen_lockfile_errors_when_overrides_drift_from_lockfile() {
.expect("parse minimal lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1607,7 +1607,7 @@ async fn frozen_lockfile_applies_overrides_to_manifest_before_freshness_check()
.expect("parse fixture lockfile with overrides");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1712,7 +1712,7 @@ async fn frozen_lockfile_resolves_catalog_protocol_in_overrides_before_freshness
.expect("parse fixture lockfile with overrides");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1771,7 +1771,7 @@ async fn frozen_lockfile_errors_when_lockfile_has_no_root_importer() {
serde_saphyr::from_str("lockfileVersion: '9.0'\n").expect("parse minimal lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1857,7 +1857,7 @@ async fn frozen_lockfile_under_gvs_registers_project_and_runs_clean() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -1933,7 +1933,7 @@ async fn frozen_lockfile_with_gvs_off_skips_project_registry() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2015,7 +2015,7 @@ async fn frozen_lockfile_under_gvs_registers_each_workspace_importer() {
.expect("parse minimal v9 workspace lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2215,7 +2215,7 @@ async fn frozen_install_preserves_seeded_skipped_across_reinstall() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2337,7 +2337,7 @@ async fn frozen_install_silently_swallows_unreachable_optional_tarball() {
.expect("parse broken-optional fixture lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2435,7 +2435,7 @@ async fn frozen_install_propagates_non_optional_fetch_failure() {
.expect("parse non-optional broken-fixture lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2539,7 +2539,7 @@ async fn frozen_install_no_optional_drops_optional_only_snapshots() {
// `InstallDependencyOptions::dependency_groups()` in
// `crates/cli/src/cli_args/install.rs`.
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2628,7 +2628,7 @@ async fn frozen_install_optional_included_surfaces_missing_metadata() {
.expect("parse optional-no-metadata fixture lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2719,7 +2719,7 @@ async fn frozen_install_no_optional_keeps_shared_non_optional_snapshot() {
.expect("parse shared-non-optional fixture lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2809,7 +2809,7 @@ async fn hoisted_node_linker_empty_lockfile_writes_modules_yaml() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2895,7 +2895,7 @@ async fn hoisted_node_linker_does_not_create_virtual_store_root() {
.expect("parse minimal v9 lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -2987,7 +2987,7 @@ async fn frozen_lockfile_install_errors_when_no_variant_matches_host() {
.expect("parse variant-mismatch fixture lockfile");
let err = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3077,7 +3077,7 @@ async fn frozen_lockfile_install_skips_runtime_when_skip_runtimes_set() {
.expect("parse --no-runtime fixture lockfile");
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3173,7 +3173,7 @@ async fn install_rejects_invalid_minimum_release_age_exclude_pattern() {
.expect("parse minimal v9 lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3271,7 +3271,7 @@ async fn frozen_lockfile_gate_rejects_under_huge_minimum_release_age() {
.expect("parse lockfile fixture");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3355,7 +3355,7 @@ async fn fresh_install_writes_pnpm_lock_yaml_with_expected_shape() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3437,7 +3437,7 @@ async fn fresh_install_splits_dev_and_prod_dependency_sections() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3505,7 +3505,7 @@ async fn fresh_install_records_user_written_specifier() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3569,7 +3569,7 @@ async fn fresh_install_lockfile_round_trips_through_load_save_load() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3632,7 +3632,7 @@ async fn fresh_install_with_lockfile_disabled_does_not_write_a_lockfile() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3698,7 +3698,7 @@ async fn fresh_install_also_writes_current_lockfile_under_virtual_store() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3780,7 +3780,7 @@ async fn fresh_install_with_lockfile_disabled_skips_current_lockfile_too() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3840,7 +3840,7 @@ async fn fresh_install_marks_optional_snapshots_in_pnpm_lock_yaml() {
let config = config.leak();
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3923,7 +3923,7 @@ async fn fresh_install_refuses_hoisted_node_linker_before_writing_state() {
let config = config.leak();
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -3972,7 +3972,7 @@ async fn fresh_install_refuses_skip_runtimes_before_writing_state() {
let config = config.leak();
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -4039,7 +4039,7 @@ async fn prefer_frozen_lockfile_takes_frozen_path_when_lockfile_is_fresh() {
seed_placeholder_virtual_store_slot(&virtual_store_dir);
Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -4109,7 +4109,7 @@ async fn no_prefer_frozen_lockfile_flag_forces_fresh_resolve() {
seed_placeholder_virtual_store_slot(&virtual_store_dir);
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,
@@ -4175,7 +4175,7 @@ async fn stale_lockfile_under_no_flag_falls_through_to_fresh_resolve() {
.expect("parse partial-install fixture lockfile");
let result = Install {
tarball_mem_cache: &Default::default(),
tarball_mem_cache: Default::default(),
http_client: &Default::default(),
http_client_arc: std::sync::Arc::new(Default::default()),
config,

View File

@@ -216,7 +216,7 @@ impl<'a> InstallPackageFromRegistry<'a> {
/// Pull the tarball URL + integrity hash out of the resolver-produced
/// resolution. Refuses any shape the npm install path can't fetch.
fn extract_tarball(
pub(crate) fn extract_tarball(
resolution: &LockfileResolution,
) -> Result<(&str, Integrity), InstallPackageFromRegistryError> {
match resolution {
@@ -243,7 +243,7 @@ fn extract_tarball(
/// Read `dist.unpackedSize` off the resolver-fetched manifest. Returns
/// `None` when missing or non-numeric — the tarball extractor treats it
/// as a hint, not a hard requirement.
fn manifest_unpacked_size(manifest: Option<&Value>) -> Option<usize> {
pub(crate) fn manifest_unpacked_size(manifest: Option<&Value>) -> Option<usize> {
// `usize::try_from` so a `u64` value larger than the host's
// `usize` (32-bit targets) degrades to "no hint" rather than
// truncating silently and producing an undersized pre-allocation.

View File

@@ -1,8 +1,8 @@
use crate::{
AllowBuildPolicy, GraphToLockfileOptions, HoistedDependencies, InstallPackageFromRegistry,
InstallPackageFromRegistryError, LinkVirtualStoreBins, LinkVirtualStoreBinsError,
VersionPolicyError, VirtualStoreLayout, dependencies_graph_to_lockfile,
store_init::init_store_dir_best_effort,
PrefetchContext, PrefetchingResolver, VersionPolicyError, VirtualStoreLayout,
dependencies_graph_to_lockfile, store_init::init_store_dir_best_effort,
};
use async_recursion::async_recursion;
use dashmap::{DashMap, mapref::entry::Entry};
@@ -81,7 +81,12 @@ pub type ResolvedPackages = DashMap<String, watch::Sender<bool>>;
/// `pnpm-lock.yaml`; the caller writes it to `<lockfile_dir>/pnpm-lock.yaml`.
#[must_use]
pub struct InstallWithFreshLockfile<'a, DependencyGroupList> {
pub tarball_mem_cache: &'a MemCache,
/// Shared in-memory tarball cache. Held behind [`Arc`] so the
/// resolve-time prefetcher ([`PrefetchingResolver`]) can capture
/// an owned clone into the background download task spawned for
/// each fresh resolution while the install-side per-package call
/// in [`install_subtree`] still takes `&MemCache` via deref.
pub tarball_mem_cache: Arc<MemCache>,
pub resolved_packages: &'a ResolvedPackages,
pub http_client: &'a ThrottledClient,
/// Same client behind an [`Arc`] for the [`NpmResolver`], whose
@@ -228,7 +233,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
/// frozen-lockfile install ([`crate::InstallFrozenLockfile::run`]).
/// The signature symmetry keeps `Install::run` from branching on
/// which sub-path produced the result.
pub async fn run<Reporter: self::Reporter>(
pub async fn run<Reporter: self::Reporter + 'static>(
self,
) -> Result<InstallWithFreshLockfileResult, InstallWithFreshLockfileError>
where
@@ -346,7 +351,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
// (`contains_path_sep` in `parse_bare_specifier.rs`) would
// otherwise claim and prevent the named-registry resolver
// from running.
let resolver: Box<dyn Resolver> = Box::new(DefaultResolver::new(vec![
let inner_resolver: Box<dyn Resolver> = Box::new(DefaultResolver::new(vec![
Box::new(ArcResolver(Arc::clone(&npm_resolver))),
Box::new(git_resolver),
Box::new(tarball_resolver),
@@ -358,6 +363,60 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
Box::new(local_path_resolver),
]));
// Open the read-only SQLite index, spawn the batched writer,
// and allocate the install-scoped `verifiedFilesCache` *before*
// the resolver chain runs. These were originally opened after
// `resolve_importer` returned, but the
// [`PrefetchingResolver`] needs them at construction time so
// the per-resolve `tokio::spawn`ed [`DownloadTarballToStore`]
// shares the same store-index / writer / verify cache as the
// install pass that runs once resolution is done. Mirrors
// pnpm's `packageRequester` shape: the fetch begins as soon as
// the resolver returns, before any further tree walk.
let store_index =
match tokio::task::spawn_blocking(move || StoreIndex::shared_readonly_in(store_dir))
.await
{
Ok(store_index) => store_index,
Err(error) => {
tracing::warn!(
target: "pacquet::install",
?error,
"store-index open task failed; continuing without a shared cache index",
);
None
}
};
let store_index_ref = store_index.as_ref();
let (store_index_writer, writer_task) = StoreIndexWriter::spawn(store_dir);
let store_index_writer_ref = Some(&store_index_writer);
let verified_files_cache = SharedVerifiedFilesCache::default();
// Wrap the resolver chain so each tarball-shaped result fires a
// background download into `tarball_mem_cache` while the
// deps-resolver continues to walk the tree. The install pass
// later calls `DownloadTarballToStore::run_with_mem_cache` for
// the same URLs and either picks up `CacheValue::Available`
// immediately or briefly blocks on the per-URL `Notify`. The
// wrapper is generic over `R: Reporter` so the spawned
// download's `pnpm:progress` emits route through the same
// reporter the install pass uses. See
// `prefetching_resolver.rs` for the full design rationale.
let resolver: Box<dyn Resolver> = Box::new(PrefetchingResolver::<Reporter>::new(
inner_resolver,
PrefetchContext {
http_client: &http_client_arc,
mem_cache: &tarball_mem_cache,
store_index: store_index_ref,
store_index_writer: Some(&store_index_writer),
verified_files_cache: &verified_files_cache,
config,
requester,
},
));
// Compile `minimumReleaseAge` (and its exclude pattern set)
// for the resolve pass. Mirrors the verifier wiring in
// `build_resolution_verifiers` so the resolver-time pick and
@@ -440,50 +499,20 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
// Drop the resolver (and its meta cache) before the install
// pass: the tree captures every `ResolveResult` we need.
// Dropping `resolver` releases the `ArcResolver` wrapper's
// strong reference to `npm_resolver`; the standalone
// `npm_resolver` binding holds a second strong reference
// because the deno- and bun-resolvers were handed a clone of
// the same `Arc` for their version-selection delegate. Drop
// both so the `NpmResolver` (and its packument cache) can be
// freed before the install pass starts allocating tarballs.
// Dropping `resolver` releases the inner resolver chain's
// strong reference to `npm_resolver` (held by the
// `ArcResolver` wrapper inside the inner chain); the
// standalone `npm_resolver` binding holds a second strong
// reference because the deno- and bun-resolvers were handed a
// clone of the same `Arc` for their version-selection
// delegate. Drop both so the `NpmResolver` (and its packument
// cache) can be freed before the install pass keeps
// allocating tarballs. The store_index / store_index_writer /
// verified_files_cache are opened above the resolver chain so
// the prefetching wrapper can share them with the install pass.
drop(resolver);
drop(npm_resolver);
// Open the read-only SQLite index once per install, shared across
// every `DownloadTarballToStore`. See the matching comment in
// `create_virtual_store.rs` for the full rationale, including the
// `JoinError`-to-cache-miss degradation (with a `warn!` so it
// stays diagnosable).
let store_index =
match tokio::task::spawn_blocking(move || StoreIndex::shared_readonly_in(store_dir))
.await
{
Ok(store_index) => store_index,
Err(error) => {
tracing::warn!(
target: "pacquet::install",
?error,
"store-index open task failed; continuing without a shared cache index",
);
None
}
};
let store_index_ref = store_index.as_ref();
// Batched store-index writer. See `create_virtual_store.rs` for
// the full rationale — we spawn once, every tarball just queues a
// row, and one writer task flushes them in batched transactions.
let (store_index_writer, writer_task) = StoreIndexWriter::spawn(store_dir);
let store_index_writer_ref = Some(&store_index_writer);
// Install-scoped `verifiedFilesCache`. See the matching block
// in `create_virtual_store.rs` for the full rationale — pnpm
// threads one `Set<string>` through every package's verify
// pass so a CAFS path stat'd for one package skips the stat
// for any later package referencing the same blob.
let verified_files_cache = SharedVerifiedFilesCache::default();
// Peer-resolution result (collected by `resolve_importer` after
// the hoist loop converged). Peer issues collected here are not
// fatal — they are reported (TODO: wire into the reporter once
@@ -546,7 +575,7 @@ impl<'a, DependencyGroupList> InstallWithFreshLockfile<'a, DependencyGroupList>
);
let install_ctx = InstallCtx {
tarball_mem_cache,
tarball_mem_cache: tarball_mem_cache.as_ref(),
http_client,
config,
graph: &peers_result.graph,

View File

@@ -23,6 +23,7 @@ mod link_bins;
mod link_file;
mod link_hoisted_modules;
mod overrides;
mod prefetching_resolver;
mod retry_config;
mod store_init;
mod symlink_direct_dependencies;
@@ -55,6 +56,7 @@ pub use link_bins::*;
pub use link_file::*;
pub use link_hoisted_modules::*;
pub use overrides::*;
pub use prefetching_resolver::*;
pub use symlink_direct_dependencies::*;
pub use symlink_package::*;
pub use version_policy::*;

View File

@@ -0,0 +1,224 @@
//! Resolver wrapper that pipelines tarball downloads with resolution.
//!
//! Upstream pnpm's [`packageRequester.requestPackage`](https://github.com/pnpm/pnpm/blob/fecaee0b35/installing/package-requester/src/packageRequester.ts#L266)
//! returns a `pkgResponse` whose `fetching` field is a Promise that is
//! already running by the time the resolver returns. Resolution of
//! children continues in parallel with that download; the install pass
//! later `await`s each `fetching` promise (which is usually already
//! resolved by then).
//!
//! Pacquet's deps-resolver crate stays pure: it walks the manifest tree
//! and returns a [`ResolveResult`] without doing any tarball I/O. To
//! match pnpm's pipelined shape, the install orchestrator wraps the
//! resolver chain with [`PrefetchingResolver`]. After the inner
//! resolver claims a wanted dep, the wrapper inspects the result and,
//! for tarball-shaped resolutions, [`tokio::spawn`]s a
//! [`DownloadTarballToStore`] in the background.
//!
//! The download lands its result in the shared [`MemCache`]. Later, when
//! [`crate::InstallPackageFromRegistry`] calls
//! [`DownloadTarballToStore::run_with_mem_cache`] for the same URL, the
//! `MemCache` either returns `CacheValue::Available` immediately (the
//! prefetch is already done) or briefly blocks on the `Notify` (the
//! prefetch is still in flight). Errors are surfaced to the install
//! path as [`TarballError::SiblingFetchFailed`].
use crate::install_package_from_registry::{extract_tarball, manifest_unpacked_size};
use crate::retry_config::retry_opts_from_config;
use pacquet_config::Config;
use pacquet_network::{AuthHeaders, ThrottledClient};
use pacquet_reporter::Reporter;
use pacquet_resolving_resolver_base::{
LatestQuery, ResolveFuture, ResolveLatestFuture, ResolveOptions, ResolveResult, Resolver,
WantedDependency,
};
use pacquet_store_dir::{
SharedReadonlyStoreIndex, SharedVerifiedFilesCache, StoreDir, StoreIndexWriter,
};
use pacquet_tarball::{DownloadTarballToStore, MemCache, RetryOpts};
use std::{marker::PhantomData, sync::Arc};
/// Borrowed-data bag handed to [`PrefetchingResolver::new`]. Everything
/// the wrapper needs to drive a background tarball download:
/// network/store handles, the shared mem cache, the
/// `verifiedFilesCache`, retry/offline knobs, and the install's
/// `requester` prefix for reporter events. The wrapper clones each
/// field into the form a `tokio::spawn`ed task can capture (`Arc` for
/// shared refs, `&'static` passes through, primitive copies).
pub struct PrefetchContext<'a> {
pub http_client: &'a Arc<ThrottledClient>,
pub mem_cache: &'a Arc<MemCache>,
pub store_index: Option<&'a SharedReadonlyStoreIndex>,
pub store_index_writer: Option<&'a Arc<StoreIndexWriter>>,
pub verified_files_cache: &'a SharedVerifiedFilesCache,
pub config: &'static Config,
pub requester: &'a str,
}
/// Owned, `'static`-friendly clones of [`PrefetchContext`] stored on
/// the wrapper. Every field is either an `Arc` clone or a `Copy`
/// scalar so each [`tokio::spawn`]ed download task can capture an
/// independent set without leaking lifetimes back into the resolver's
/// type.
struct OwnedFetchCtx {
http_client: Arc<ThrottledClient>,
mem_cache: Arc<MemCache>,
store_dir: &'static StoreDir,
store_index: Option<SharedReadonlyStoreIndex>,
store_index_writer: Option<Arc<StoreIndexWriter>>,
verified_files_cache: SharedVerifiedFilesCache,
auth_headers: Arc<AuthHeaders>,
retry_opts: RetryOpts,
requester: Arc<str>,
offline: bool,
verify_store_integrity: bool,
}
/// Wraps an inner [`Resolver`] and, after each successful resolve that
/// produces a tarball-shaped result, fires the tarball download into
/// the shared [`MemCache`] via [`tokio::spawn`]. The resolver returns
/// to the deps-resolver immediately; the download runs concurrently
/// with the rest of the tree walk.
///
/// Generic over `R: Reporter` so [`DownloadTarballToStore`]'s
/// `pnpm:progress` emits route through the same reporter the install
/// pass uses. The wrapper itself doesn't hold an `R` value (Reporter is
/// a static trait); `PhantomData` carries the type through.
pub struct PrefetchingResolver<R: Reporter> {
inner: Box<dyn Resolver>,
ctx: OwnedFetchCtx,
_phantom: PhantomData<fn() -> R>,
}
impl<R: Reporter + 'static> PrefetchingResolver<R> {
/// Build a wrapper from the install orchestrator's borrowed
/// references. Clones the necessary `Arc`s up front so the
/// per-`resolve` spawn has all the data it needs without
/// re-borrowing the install scope.
pub fn new(inner: Box<dyn Resolver>, prefetch_ctx: PrefetchContext<'_>) -> Self {
let PrefetchContext {
http_client,
mem_cache,
store_index,
store_index_writer,
verified_files_cache,
config,
requester,
} = prefetch_ctx;
let ctx = OwnedFetchCtx {
http_client: Arc::clone(http_client),
mem_cache: Arc::clone(mem_cache),
store_dir: &config.store_dir,
store_index: store_index.cloned(),
store_index_writer: store_index_writer.cloned(),
verified_files_cache: SharedVerifiedFilesCache::clone(verified_files_cache),
auth_headers: Arc::clone(&config.auth_headers),
retry_opts: retry_opts_from_config(config),
requester: Arc::<str>::from(requester),
offline: config.offline,
verify_store_integrity: config.verify_store_integrity,
};
PrefetchingResolver { inner, ctx, _phantom: PhantomData }
}
/// Inspect a fresh `ResolveResult` and, if it carries a tarball
/// URL + integrity, kick off the download as a detached
/// [`tokio::spawn`] task.
///
/// Non-tarball resolutions (git, directory, registry-shape,
/// binary, variations) and resolutions missing a structured
/// `name@version` fall through to a no-op — the install path's
/// per-protocol code path handles them.
///
/// The spawned task's result is dropped: the per-URL `MemCache`
/// stores `CacheValue::Available` (on success) or
/// `CacheValue::Failed` (on error) and any later
/// `run_with_mem_cache` call observes the right value. Surfacing
/// the error from inside the resolver would force the resolve
/// pass to abort before the rest of the tree walk completes,
/// which is the opposite of what we want for a prefetch.
fn maybe_kickoff_download(&self, result: &ResolveResult) {
// Only spawn for tarball-shaped resolutions with both URL and
// integrity. Mirrors the gate in
// `install_package_from_registry::extract_tarball`; other
// resolution shapes are not fetched through
// `DownloadTarballToStore` at all.
let Ok((package_url, integrity)) = extract_tarball(&result.resolution) else {
return;
};
// The npm picker's `dist.tarball` is the canonical URL the
// install path will look up in `MemCache`. Tarball-resolver
// and git-resolver paths can leave `name_ver` unset (they
// learn the name from the manifest only after the fetch); in
// those cases the install path's `InstallPackageFromRegistry`
// also fails, so skipping here matches the install-side
// behaviour without adding a divergence.
let Some(name_ver) = result.name_ver.as_ref() else { return };
let package_id = format!("{}@{}", name_ver.name, name_ver.suffix);
let package_url = package_url.to_string();
let package_unpacked_size = manifest_unpacked_size(result.manifest.as_ref());
let http_client = Arc::clone(&self.ctx.http_client);
let mem_cache = Arc::clone(&self.ctx.mem_cache);
let store_dir = self.ctx.store_dir;
let store_index = self.ctx.store_index.clone();
let store_index_writer = self.ctx.store_index_writer.clone();
let verified_files_cache = SharedVerifiedFilesCache::clone(&self.ctx.verified_files_cache);
let auth_headers = Arc::clone(&self.ctx.auth_headers);
let retry_opts = self.ctx.retry_opts;
let requester = Arc::clone(&self.ctx.requester);
let offline = self.ctx.offline;
let verify_store_integrity = self.ctx.verify_store_integrity;
tokio::spawn(async move {
// Result is intentionally discarded — see the doc comment
// above. The `MemCache` carries success / failure state to
// the install path.
let _ = DownloadTarballToStore {
http_client: &http_client,
store_dir,
store_index,
store_index_writer,
verify_store_integrity,
verified_files_cache,
package_integrity: &integrity,
package_unpacked_size,
package_url: &package_url,
package_id: &package_id,
requester: &requester,
prefetched_cas_paths: None,
retry_opts,
auth_headers: &auth_headers,
ignore_file_pattern: None,
offline,
}
.run_with_mem_cache::<R>(&mem_cache)
.await;
});
}
}
impl<R: Reporter + 'static> Resolver for PrefetchingResolver<R> {
fn resolve<'a>(
&'a self,
wanted_dependency: &'a WantedDependency,
opts: &'a ResolveOptions,
) -> ResolveFuture<'a> {
Box::pin(async move {
let result = self.inner.resolve(wanted_dependency, opts).await?;
if let Some(result_ref) = result.as_ref() {
self.maybe_kickoff_download(result_ref);
}
Ok(result)
})
}
fn resolve_latest<'a>(
&'a self,
query: &'a LatestQuery,
opts: &'a ResolveOptions,
) -> ResolveLatestFuture<'a> {
self.inner.resolve_latest(query, opts)
}
}