From a7c94a905dee77a5231b976daddd36d60db352d3 Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Fri, 22 May 2026 00:36:35 +0200 Subject: [PATCH] perf(pacquet/resolving-deps-resolver): swap tokio Mutex for std Mutex on TreeCtx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `TreeCtx`'s state (`packages`, `dependencies_tree`, `all_peer_dep_names`, `policy_violations`, `applied_patches`) was guarded by `tokio::sync::Mutex`. Every visit acquired 2-3 of them behind `.lock().await`, paying the async-mutex per-acquire overhead (registration, future polling, wake) on the resolve hot path. None of the critical sections hold the lock across an `.await` — they're short `HashMap` / `HashSet` inserts and `Vec::push`es. That's exactly the shape `std::sync::Mutex` is for. Switch the fields, remove the `.await`s, and route every acquire through a new `lock_recoverable` helper that mirrors the rest of the codebase's poisoning recovery (`build_modules.rs`, `pick_package.rs`, …): an unrelated panic shouldn't escalate into a hard install failure since the guarded values are plain collections with no invariants that survive a panic. Side effects: - `TreeCtx::snapshot` and `TreeCtx::resolved_versions` drop the `async` qualifier and their callers drop the `.await`. The closures they live in are still `async` (they await elsewhere), so the change is mechanical. - `update_preferred_versions_with_ctx` becomes sync along with `resolved_versions`. Tests: 50/50 in `pacquet-resolving-deps-resolver` pass. Clippy clean. --- .../src/resolve_dependency_tree.rs | 80 +++++++++++++------ .../src/resolve_importer.rs | 12 +-- 2 files changed, 60 insertions(+), 32 deletions(-) diff --git a/pacquet/crates/resolving-deps-resolver/src/resolve_dependency_tree.rs b/pacquet/crates/resolving-deps-resolver/src/resolve_dependency_tree.rs index 0c62f8cfba..c3303413e3 100644 --- a/pacquet/crates/resolving-deps-resolver/src/resolve_dependency_tree.rs +++ b/pacquet/crates/resolving-deps-resolver/src/resolve_dependency_tree.rs @@ -1,5 +1,5 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, Mutex, MutexGuard}; use async_recursion::async_recursion; use derive_more::{Display, Error}; @@ -15,7 +15,17 @@ use pacquet_patching::{PatchGroupRecord, PatchKeyConflictError, get_patch_info}; use pacquet_resolving_resolver_base::{ResolveError, ResolveOptions, Resolver, WantedDependency}; use pipe_trait::Pipe; use serde_json::Value; -use tokio::sync::Mutex; + +/// Acquire a [`Mutex`] guard, recovering from poisoning the same way +/// the rest of pacquet does (`build_modules.rs`, `pick_package.rs`, +/// …). The mutexes guarded by this helper hold short HashMap / +/// HashSet inserts with no invariants that survive a panic, so the +/// install can keep going after the unrelated panic that poisoned +/// the lock — better than escalating into a hard install-wide +/// failure. +fn lock_recoverable(mutex: &Mutex) -> MutexGuard<'_, T> { + mutex.lock().unwrap_or_else(|err| err.into_inner()) +} use crate::{ node_id::NodeId, @@ -112,11 +122,15 @@ impl From for ResolveDependencyTreeError { /// /// Resolves siblings in parallel via `try_join_all` at every level. /// The per-package dedupe gate is a shared `HashMap` behind a -/// [`tokio::sync::Mutex`]: a sibling already resolving an id `X` makes +/// [`std::sync::Mutex`]: a sibling already resolving an id `X` makes /// later visitors skip the recursion the in-flight task is running and -/// reuse the eventually-populated `ResolvedPackage`. Per-occurrence -/// tree nodes are still allocated for each visit — only the -/// `ResolvedPackage` envelope is shared. +/// reuse the eventually-populated `ResolvedPackage`. The critical +/// sections are short `HashMap` inserts with no `await` inside, so +/// a sync mutex is the right tool — tokio's async mutex adds +/// per-acquire overhead that the resolve hot path was paying once +/// per visit per ctx field. +/// Per-occurrence tree nodes are still allocated for each visit — +/// only the `ResolvedPackage` envelope is shared. pub async fn resolve_dependency_tree( resolver: &Chain, manifest: &PackageManifest, @@ -205,13 +219,29 @@ impl TreeCtx { /// cumulative [`DirectDep`] list (initial walk + each hoist /// iteration's contributions) as `direct`. pub fn into_resolved_tree(self, direct: Vec) -> ResolvedTree { + // `std::sync::Mutex::into_inner` returns `Result` to surface + // poisoning; recover from it the same way the per-acquire + // `lock_recoverable` helper does so a panic in an unrelated + // task doesn't escalate into a hard install failure here. ResolvedTree { direct, - packages: self.packages.into_inner(), - dependencies_tree: self.dependencies_tree.into_inner(), - all_peer_dep_names: self.all_peer_dep_names.into_inner(), - policy_violations: self.policy_violations.into_inner(), - applied_patches: self.applied_patches.into_inner(), + packages: self.packages.into_inner().unwrap_or_else(|err| err.into_inner()), + dependencies_tree: self + .dependencies_tree + .into_inner() + .unwrap_or_else(|err| err.into_inner()), + all_peer_dep_names: self + .all_peer_dep_names + .into_inner() + .unwrap_or_else(|err| err.into_inner()), + policy_violations: self + .policy_violations + .into_inner() + .unwrap_or_else(|err| err.into_inner()), + applied_patches: self + .applied_patches + .into_inner() + .unwrap_or_else(|err| err.into_inner()), } } @@ -219,14 +249,14 @@ impl TreeCtx { /// `self`. The orchestrator's hoist loop snapshots after each /// [`extend_tree`] call to run [`fn@crate::resolve_peers`] over the /// growing tree and find missing peers to hoist next. - pub async fn snapshot(&self, direct: Vec) -> ResolvedTree { + pub fn snapshot(&self, direct: Vec) -> ResolvedTree { ResolvedTree { direct, - packages: self.packages.lock().await.clone(), - dependencies_tree: self.dependencies_tree.lock().await.clone(), - all_peer_dep_names: self.all_peer_dep_names.lock().await.clone(), - policy_violations: self.policy_violations.lock().await.clone(), - applied_patches: self.applied_patches.lock().await.clone(), + packages: lock_recoverable(&self.packages).clone(), + dependencies_tree: lock_recoverable(&self.dependencies_tree).clone(), + all_peer_dep_names: lock_recoverable(&self.all_peer_dep_names).clone(), + policy_violations: lock_recoverable(&self.policy_violations).clone(), + applied_patches: lock_recoverable(&self.applied_patches).clone(), } } @@ -234,10 +264,8 @@ impl TreeCtx { /// so far. Used by the orchestrator to keep `allPreferredVersions` /// in sync — mirrors upstream's resolveDependency-time push at /// [`resolveDependencies.ts:1440`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1440). - pub async fn resolved_versions(&self) -> Vec<(String, String)> { - self.packages - .lock() - .await + pub fn resolved_versions(&self) -> Vec<(String, String)> { + lock_recoverable(&self.packages) .values() .filter_map(|pkg| { let name_ver = pkg.result.name_ver.as_ref()?; @@ -322,7 +350,7 @@ where }; if let Some(violation) = result.policy_violation.clone() { - ctx.policy_violations.lock().await.push(violation); + lock_recoverable(&ctx.policy_violations).push(violation); } if ctx.base_opts.block_exotic_subdeps @@ -356,7 +384,7 @@ where // [`resolvedPkgsById[...].optional = ... && currentIsOptional`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1630) // arm. { - let mut packages = ctx.packages.lock().await; + let mut packages = lock_recoverable(&ctx.packages); match packages.get_mut(&id) { Some(existing) => { existing.optional = existing.optional && current_is_optional; @@ -366,7 +394,7 @@ where // Collect peer names for the peer-resolution stage's // `parentPkgs` filter (only peers count as parents). { - let mut all_peers = ctx.all_peer_dep_names.lock().await; + let mut all_peers = lock_recoverable(&ctx.all_peer_dep_names); for name in peer_dependencies.keys() { all_peers.insert(name.clone()); } @@ -420,7 +448,7 @@ where let children: BTreeMap = child_results.into_iter().flatten().map(|dep| (dep.alias, dep.node_id)).collect(); - ctx.dependencies_tree.lock().await.insert( + lock_recoverable(&ctx.dependencies_tree).insert( node_id, DependenciesTreeNode { resolved_package_id: id.clone(), @@ -507,7 +535,7 @@ async fn build_pkg_id_with_patch_hash( let Some(patch) = get_patch_info(Some(groups), &name, &version)? else { return Ok(prefixed); }; - ctx.applied_patches.lock().await.insert(patch.key.clone()); + lock_recoverable(&ctx.applied_patches).insert(patch.key.clone()); Ok(format!("{prefixed}(patch_hash={})", patch.hash)) } diff --git a/pacquet/crates/resolving-deps-resolver/src/resolve_importer.rs b/pacquet/crates/resolving-deps-resolver/src/resolve_importer.rs index 526743bb13..bcdcd97c1a 100644 --- a/pacquet/crates/resolving-deps-resolver/src/resolve_importer.rs +++ b/pacquet/crates/resolving-deps-resolver/src/resolve_importer.rs @@ -164,7 +164,7 @@ where .collect(); let initial_wanted = resolve_catalog_specifiers(initial_wanted, &catalogs)?; let mut direct = extend_tree(&ctx, resolver, initial_wanted).await?; - update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await; + update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions); let mut parent_pkg_aliases: HashSet = direct.iter().map(|dep| dep.alias.clone()).collect(); @@ -172,7 +172,7 @@ where loop { loop { - let snapshot = ctx.snapshot(direct.clone()).await; + let snapshot = ctx.snapshot(direct.clone()); let peers_result = resolve_peers(&snapshot, ResolvePeersOptions::default()); let (missing_required, fresh_optional) = partition_missing_peers( @@ -227,7 +227,7 @@ where hoisted.into_iter().map(|(name, range)| (name, range, false)).collect(); let new_direct = extend_tree(&ctx, resolver, new_wanted).await?; direct.extend(new_direct); - update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await; + update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions); } if all_missing_optional_peers.is_empty() { @@ -249,7 +249,7 @@ where hoisted_optional.into_iter().map(|(name, range)| (name, range, false)).collect(); let new_direct = extend_tree(&ctx, resolver, new_wanted).await?; direct.extend(new_direct); - update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await; + update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions); all_missing_optional_peers.clear(); } @@ -368,8 +368,8 @@ fn build_workspace_root_deps( /// = 'version'` assignment at /// [`resolveDependencies.ts:1440`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1440). /// Idempotent: only inserts when no entry exists for `(name, version)`. -async fn update_preferred_versions_with_ctx(ctx: &TreeCtx, preferred: &mut PreferredVersions) { - for (name, version) in ctx.resolved_versions().await { +fn update_preferred_versions_with_ctx(ctx: &TreeCtx, preferred: &mut PreferredVersions) { + for (name, version) in ctx.resolved_versions() { let bucket = preferred.entry(name).or_default(); bucket.entry(version).or_insert(VersionSelectorEntry::Plain(VersionSelectorType::Version)); }