perf(pacquet/resolving-deps-resolver): swap tokio Mutex for std Mutex on TreeCtx

`TreeCtx`'s state (`packages`, `dependencies_tree`,
`all_peer_dep_names`, `policy_violations`, `applied_patches`) was
guarded by `tokio::sync::Mutex`. Every visit acquired 2-3 of them
behind `.lock().await`, paying the async-mutex per-acquire
overhead (registration, future polling, wake) on the resolve hot
path.

None of the critical sections hold the lock across an `.await` —
they're short `HashMap` / `HashSet` inserts and `Vec::push`es.
That's exactly the shape `std::sync::Mutex` is for. Switch the
fields, remove the `.await`s, and route every acquire through a
new `lock_recoverable` helper that mirrors the rest of the
codebase's poisoning recovery (`build_modules.rs`,
`pick_package.rs`, …): an unrelated panic shouldn't escalate into
a hard install failure since the guarded values are plain
collections with no invariants that survive a panic.

Side effects:

- `TreeCtx::snapshot` and `TreeCtx::resolved_versions` drop the
  `async` qualifier and their callers drop the `.await`. The
  closures they live in are still `async` (they await elsewhere),
  so the change is mechanical.
- `update_preferred_versions_with_ctx` becomes sync along with
  `resolved_versions`.

Tests: 50/50 in `pacquet-resolving-deps-resolver` pass. Clippy
clean.
This commit is contained in:
Zoltan Kochan
2026-05-22 00:36:35 +02:00
parent 387b8721c5
commit a7c94a905d
2 changed files with 60 additions and 32 deletions

View File

@@ -1,5 +1,5 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, Mutex, MutexGuard};
use async_recursion::async_recursion;
use derive_more::{Display, Error};
@@ -15,7 +15,17 @@ use pacquet_patching::{PatchGroupRecord, PatchKeyConflictError, get_patch_info};
use pacquet_resolving_resolver_base::{ResolveError, ResolveOptions, Resolver, WantedDependency};
use pipe_trait::Pipe;
use serde_json::Value;
use tokio::sync::Mutex;
/// Acquire a [`Mutex`] guard, recovering from poisoning the same way
/// the rest of pacquet does (`build_modules.rs`, `pick_package.rs`,
/// …). The mutexes guarded by this helper hold short HashMap /
/// HashSet inserts with no invariants that survive a panic, so the
/// install can keep going after the unrelated panic that poisoned
/// the lock — better than escalating into a hard install-wide
/// failure.
fn lock_recoverable<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
mutex.lock().unwrap_or_else(|err| err.into_inner())
}
use crate::{
node_id::NodeId,
@@ -112,11 +122,15 @@ impl From<PatchKeyConflictError> for ResolveDependencyTreeError {
///
/// Resolves siblings in parallel via `try_join_all` at every level.
/// The per-package dedupe gate is a shared `HashMap` behind a
/// [`tokio::sync::Mutex`]: a sibling already resolving an id `X` makes
/// [`std::sync::Mutex`]: a sibling already resolving an id `X` makes
/// later visitors skip the recursion the in-flight task is running and
/// reuse the eventually-populated `ResolvedPackage`. Per-occurrence
/// tree nodes are still allocated for each visit — only the
/// `ResolvedPackage` envelope is shared.
/// reuse the eventually-populated `ResolvedPackage`. The critical
/// sections are short `HashMap` inserts with no `await` inside, so
/// a sync mutex is the right tool — tokio's async mutex adds
/// per-acquire overhead that the resolve hot path was paying once
/// per visit per ctx field.
/// Per-occurrence tree nodes are still allocated for each visit —
/// only the `ResolvedPackage` envelope is shared.
pub async fn resolve_dependency_tree<DependencyGroupList, Chain>(
resolver: &Chain,
manifest: &PackageManifest,
@@ -205,13 +219,29 @@ impl TreeCtx {
/// cumulative [`DirectDep`] list (initial walk + each hoist
/// iteration's contributions) as `direct`.
pub fn into_resolved_tree(self, direct: Vec<DirectDep>) -> ResolvedTree {
// `std::sync::Mutex::into_inner` returns `Result` to surface
// poisoning; recover from it the same way the per-acquire
// `lock_recoverable` helper does so a panic in an unrelated
// task doesn't escalate into a hard install failure here.
ResolvedTree {
direct,
packages: self.packages.into_inner(),
dependencies_tree: self.dependencies_tree.into_inner(),
all_peer_dep_names: self.all_peer_dep_names.into_inner(),
policy_violations: self.policy_violations.into_inner(),
applied_patches: self.applied_patches.into_inner(),
packages: self.packages.into_inner().unwrap_or_else(|err| err.into_inner()),
dependencies_tree: self
.dependencies_tree
.into_inner()
.unwrap_or_else(|err| err.into_inner()),
all_peer_dep_names: self
.all_peer_dep_names
.into_inner()
.unwrap_or_else(|err| err.into_inner()),
policy_violations: self
.policy_violations
.into_inner()
.unwrap_or_else(|err| err.into_inner()),
applied_patches: self
.applied_patches
.into_inner()
.unwrap_or_else(|err| err.into_inner()),
}
}
@@ -219,14 +249,14 @@ impl TreeCtx {
/// `self`. The orchestrator's hoist loop snapshots after each
/// [`extend_tree`] call to run [`fn@crate::resolve_peers`] over the
/// growing tree and find missing peers to hoist next.
pub async fn snapshot(&self, direct: Vec<DirectDep>) -> ResolvedTree {
pub fn snapshot(&self, direct: Vec<DirectDep>) -> ResolvedTree {
ResolvedTree {
direct,
packages: self.packages.lock().await.clone(),
dependencies_tree: self.dependencies_tree.lock().await.clone(),
all_peer_dep_names: self.all_peer_dep_names.lock().await.clone(),
policy_violations: self.policy_violations.lock().await.clone(),
applied_patches: self.applied_patches.lock().await.clone(),
packages: lock_recoverable(&self.packages).clone(),
dependencies_tree: lock_recoverable(&self.dependencies_tree).clone(),
all_peer_dep_names: lock_recoverable(&self.all_peer_dep_names).clone(),
policy_violations: lock_recoverable(&self.policy_violations).clone(),
applied_patches: lock_recoverable(&self.applied_patches).clone(),
}
}
@@ -234,10 +264,8 @@ impl TreeCtx {
/// so far. Used by the orchestrator to keep `allPreferredVersions`
/// in sync — mirrors upstream's resolveDependency-time push at
/// [`resolveDependencies.ts:1440`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1440).
pub async fn resolved_versions(&self) -> Vec<(String, String)> {
self.packages
.lock()
.await
pub fn resolved_versions(&self) -> Vec<(String, String)> {
lock_recoverable(&self.packages)
.values()
.filter_map(|pkg| {
let name_ver = pkg.result.name_ver.as_ref()?;
@@ -322,7 +350,7 @@ where
};
if let Some(violation) = result.policy_violation.clone() {
ctx.policy_violations.lock().await.push(violation);
lock_recoverable(&ctx.policy_violations).push(violation);
}
if ctx.base_opts.block_exotic_subdeps
@@ -356,7 +384,7 @@ where
// [`resolvedPkgsById[...].optional = ... && currentIsOptional`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1630)
// arm.
{
let mut packages = ctx.packages.lock().await;
let mut packages = lock_recoverable(&ctx.packages);
match packages.get_mut(&id) {
Some(existing) => {
existing.optional = existing.optional && current_is_optional;
@@ -366,7 +394,7 @@ where
// Collect peer names for the peer-resolution stage's
// `parentPkgs` filter (only peers count as parents).
{
let mut all_peers = ctx.all_peer_dep_names.lock().await;
let mut all_peers = lock_recoverable(&ctx.all_peer_dep_names);
for name in peer_dependencies.keys() {
all_peers.insert(name.clone());
}
@@ -420,7 +448,7 @@ where
let children: BTreeMap<String, NodeId> =
child_results.into_iter().flatten().map(|dep| (dep.alias, dep.node_id)).collect();
ctx.dependencies_tree.lock().await.insert(
lock_recoverable(&ctx.dependencies_tree).insert(
node_id,
DependenciesTreeNode {
resolved_package_id: id.clone(),
@@ -507,7 +535,7 @@ async fn build_pkg_id_with_patch_hash(
let Some(patch) = get_patch_info(Some(groups), &name, &version)? else {
return Ok(prefixed);
};
ctx.applied_patches.lock().await.insert(patch.key.clone());
lock_recoverable(&ctx.applied_patches).insert(patch.key.clone());
Ok(format!("{prefixed}(patch_hash={})", patch.hash))
}

View File

@@ -164,7 +164,7 @@ where
.collect();
let initial_wanted = resolve_catalog_specifiers(initial_wanted, &catalogs)?;
let mut direct = extend_tree(&ctx, resolver, initial_wanted).await?;
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await;
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions);
let mut parent_pkg_aliases: HashSet<String> =
direct.iter().map(|dep| dep.alias.clone()).collect();
@@ -172,7 +172,7 @@ where
loop {
loop {
let snapshot = ctx.snapshot(direct.clone()).await;
let snapshot = ctx.snapshot(direct.clone());
let peers_result = resolve_peers(&snapshot, ResolvePeersOptions::default());
let (missing_required, fresh_optional) = partition_missing_peers(
@@ -227,7 +227,7 @@ where
hoisted.into_iter().map(|(name, range)| (name, range, false)).collect();
let new_direct = extend_tree(&ctx, resolver, new_wanted).await?;
direct.extend(new_direct);
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await;
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions);
}
if all_missing_optional_peers.is_empty() {
@@ -249,7 +249,7 @@ where
hoisted_optional.into_iter().map(|(name, range)| (name, range, false)).collect();
let new_direct = extend_tree(&ctx, resolver, new_wanted).await?;
direct.extend(new_direct);
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await;
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions);
all_missing_optional_peers.clear();
}
@@ -368,8 +368,8 @@ fn build_workspace_root_deps(
/// = 'version'` assignment at
/// [`resolveDependencies.ts:1440`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1440).
/// Idempotent: only inserts when no entry exists for `(name, version)`.
async fn update_preferred_versions_with_ctx(ctx: &TreeCtx, preferred: &mut PreferredVersions) {
for (name, version) in ctx.resolved_versions().await {
fn update_preferred_versions_with_ctx(ctx: &TreeCtx, preferred: &mut PreferredVersions) {
for (name, version) in ctx.resolved_versions() {
let bucket = preferred.entry(name).or_default();
bucket.entry(version).or_insert(VersionSelectorEntry::Plain(VersionSelectorType::Version));
}