mirror of
https://github.com/pnpm/pnpm.git
synced 2026-05-24 16:46:06 -04:00
perf(pacquet/resolving-deps-resolver): swap tokio Mutex for std Mutex on TreeCtx
`TreeCtx`'s state (`packages`, `dependencies_tree`, `all_peer_dep_names`, `policy_violations`, `applied_patches`) was guarded by `tokio::sync::Mutex`. Every visit acquired 2-3 of them behind `.lock().await`, paying the async-mutex per-acquire overhead (registration, future polling, wake) on the resolve hot path. None of the critical sections hold the lock across an `.await` — they're short `HashMap` / `HashSet` inserts and `Vec::push`es. That's exactly the shape `std::sync::Mutex` is for. Switch the fields, remove the `.await`s, and route every acquire through a new `lock_recoverable` helper that mirrors the rest of the codebase's poisoning recovery (`build_modules.rs`, `pick_package.rs`, …): an unrelated panic shouldn't escalate into a hard install failure since the guarded values are plain collections with no invariants that survive a panic. Side effects: - `TreeCtx::snapshot` and `TreeCtx::resolved_versions` drop the `async` qualifier and their callers drop the `.await`. The closures they live in are still `async` (they await elsewhere), so the change is mechanical. - `update_preferred_versions_with_ctx` becomes sync along with `resolved_versions`. Tests: 50/50 in `pacquet-resolving-deps-resolver` pass. Clippy clean.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use async_recursion::async_recursion;
|
||||
use derive_more::{Display, Error};
|
||||
@@ -15,7 +15,17 @@ use pacquet_patching::{PatchGroupRecord, PatchKeyConflictError, get_patch_info};
|
||||
use pacquet_resolving_resolver_base::{ResolveError, ResolveOptions, Resolver, WantedDependency};
|
||||
use pipe_trait::Pipe;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// Acquire a [`Mutex`] guard, recovering from poisoning the same way
|
||||
/// the rest of pacquet does (`build_modules.rs`, `pick_package.rs`,
|
||||
/// …). The mutexes guarded by this helper hold short HashMap /
|
||||
/// HashSet inserts with no invariants that survive a panic, so the
|
||||
/// install can keep going after the unrelated panic that poisoned
|
||||
/// the lock — better than escalating into a hard install-wide
|
||||
/// failure.
|
||||
fn lock_recoverable<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
|
||||
mutex.lock().unwrap_or_else(|err| err.into_inner())
|
||||
}
|
||||
|
||||
use crate::{
|
||||
node_id::NodeId,
|
||||
@@ -112,11 +122,15 @@ impl From<PatchKeyConflictError> for ResolveDependencyTreeError {
|
||||
///
|
||||
/// Resolves siblings in parallel via `try_join_all` at every level.
|
||||
/// The per-package dedupe gate is a shared `HashMap` behind a
|
||||
/// [`tokio::sync::Mutex`]: a sibling already resolving an id `X` makes
|
||||
/// [`std::sync::Mutex`]: a sibling already resolving an id `X` makes
|
||||
/// later visitors skip the recursion the in-flight task is running and
|
||||
/// reuse the eventually-populated `ResolvedPackage`. Per-occurrence
|
||||
/// tree nodes are still allocated for each visit — only the
|
||||
/// `ResolvedPackage` envelope is shared.
|
||||
/// reuse the eventually-populated `ResolvedPackage`. The critical
|
||||
/// sections are short `HashMap` inserts with no `await` inside, so
|
||||
/// a sync mutex is the right tool — tokio's async mutex adds
|
||||
/// per-acquire overhead that the resolve hot path was paying once
|
||||
/// per visit per ctx field.
|
||||
/// Per-occurrence tree nodes are still allocated for each visit —
|
||||
/// only the `ResolvedPackage` envelope is shared.
|
||||
pub async fn resolve_dependency_tree<DependencyGroupList, Chain>(
|
||||
resolver: &Chain,
|
||||
manifest: &PackageManifest,
|
||||
@@ -205,13 +219,29 @@ impl TreeCtx {
|
||||
/// cumulative [`DirectDep`] list (initial walk + each hoist
|
||||
/// iteration's contributions) as `direct`.
|
||||
pub fn into_resolved_tree(self, direct: Vec<DirectDep>) -> ResolvedTree {
|
||||
// `std::sync::Mutex::into_inner` returns `Result` to surface
|
||||
// poisoning; recover from it the same way the per-acquire
|
||||
// `lock_recoverable` helper does so a panic in an unrelated
|
||||
// task doesn't escalate into a hard install failure here.
|
||||
ResolvedTree {
|
||||
direct,
|
||||
packages: self.packages.into_inner(),
|
||||
dependencies_tree: self.dependencies_tree.into_inner(),
|
||||
all_peer_dep_names: self.all_peer_dep_names.into_inner(),
|
||||
policy_violations: self.policy_violations.into_inner(),
|
||||
applied_patches: self.applied_patches.into_inner(),
|
||||
packages: self.packages.into_inner().unwrap_or_else(|err| err.into_inner()),
|
||||
dependencies_tree: self
|
||||
.dependencies_tree
|
||||
.into_inner()
|
||||
.unwrap_or_else(|err| err.into_inner()),
|
||||
all_peer_dep_names: self
|
||||
.all_peer_dep_names
|
||||
.into_inner()
|
||||
.unwrap_or_else(|err| err.into_inner()),
|
||||
policy_violations: self
|
||||
.policy_violations
|
||||
.into_inner()
|
||||
.unwrap_or_else(|err| err.into_inner()),
|
||||
applied_patches: self
|
||||
.applied_patches
|
||||
.into_inner()
|
||||
.unwrap_or_else(|err| err.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,14 +249,14 @@ impl TreeCtx {
|
||||
/// `self`. The orchestrator's hoist loop snapshots after each
|
||||
/// [`extend_tree`] call to run [`fn@crate::resolve_peers`] over the
|
||||
/// growing tree and find missing peers to hoist next.
|
||||
pub async fn snapshot(&self, direct: Vec<DirectDep>) -> ResolvedTree {
|
||||
pub fn snapshot(&self, direct: Vec<DirectDep>) -> ResolvedTree {
|
||||
ResolvedTree {
|
||||
direct,
|
||||
packages: self.packages.lock().await.clone(),
|
||||
dependencies_tree: self.dependencies_tree.lock().await.clone(),
|
||||
all_peer_dep_names: self.all_peer_dep_names.lock().await.clone(),
|
||||
policy_violations: self.policy_violations.lock().await.clone(),
|
||||
applied_patches: self.applied_patches.lock().await.clone(),
|
||||
packages: lock_recoverable(&self.packages).clone(),
|
||||
dependencies_tree: lock_recoverable(&self.dependencies_tree).clone(),
|
||||
all_peer_dep_names: lock_recoverable(&self.all_peer_dep_names).clone(),
|
||||
policy_violations: lock_recoverable(&self.policy_violations).clone(),
|
||||
applied_patches: lock_recoverable(&self.applied_patches).clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,10 +264,8 @@ impl TreeCtx {
|
||||
/// so far. Used by the orchestrator to keep `allPreferredVersions`
|
||||
/// in sync — mirrors upstream's resolveDependency-time push at
|
||||
/// [`resolveDependencies.ts:1440`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1440).
|
||||
pub async fn resolved_versions(&self) -> Vec<(String, String)> {
|
||||
self.packages
|
||||
.lock()
|
||||
.await
|
||||
pub fn resolved_versions(&self) -> Vec<(String, String)> {
|
||||
lock_recoverable(&self.packages)
|
||||
.values()
|
||||
.filter_map(|pkg| {
|
||||
let name_ver = pkg.result.name_ver.as_ref()?;
|
||||
@@ -322,7 +350,7 @@ where
|
||||
};
|
||||
|
||||
if let Some(violation) = result.policy_violation.clone() {
|
||||
ctx.policy_violations.lock().await.push(violation);
|
||||
lock_recoverable(&ctx.policy_violations).push(violation);
|
||||
}
|
||||
|
||||
if ctx.base_opts.block_exotic_subdeps
|
||||
@@ -356,7 +384,7 @@ where
|
||||
// [`resolvedPkgsById[...].optional = ... && currentIsOptional`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1630)
|
||||
// arm.
|
||||
{
|
||||
let mut packages = ctx.packages.lock().await;
|
||||
let mut packages = lock_recoverable(&ctx.packages);
|
||||
match packages.get_mut(&id) {
|
||||
Some(existing) => {
|
||||
existing.optional = existing.optional && current_is_optional;
|
||||
@@ -366,7 +394,7 @@ where
|
||||
// Collect peer names for the peer-resolution stage's
|
||||
// `parentPkgs` filter (only peers count as parents).
|
||||
{
|
||||
let mut all_peers = ctx.all_peer_dep_names.lock().await;
|
||||
let mut all_peers = lock_recoverable(&ctx.all_peer_dep_names);
|
||||
for name in peer_dependencies.keys() {
|
||||
all_peers.insert(name.clone());
|
||||
}
|
||||
@@ -420,7 +448,7 @@ where
|
||||
let children: BTreeMap<String, NodeId> =
|
||||
child_results.into_iter().flatten().map(|dep| (dep.alias, dep.node_id)).collect();
|
||||
|
||||
ctx.dependencies_tree.lock().await.insert(
|
||||
lock_recoverable(&ctx.dependencies_tree).insert(
|
||||
node_id,
|
||||
DependenciesTreeNode {
|
||||
resolved_package_id: id.clone(),
|
||||
@@ -507,7 +535,7 @@ async fn build_pkg_id_with_patch_hash(
|
||||
let Some(patch) = get_patch_info(Some(groups), &name, &version)? else {
|
||||
return Ok(prefixed);
|
||||
};
|
||||
ctx.applied_patches.lock().await.insert(patch.key.clone());
|
||||
lock_recoverable(&ctx.applied_patches).insert(patch.key.clone());
|
||||
Ok(format!("{prefixed}(patch_hash={})", patch.hash))
|
||||
}
|
||||
|
||||
|
||||
@@ -164,7 +164,7 @@ where
|
||||
.collect();
|
||||
let initial_wanted = resolve_catalog_specifiers(initial_wanted, &catalogs)?;
|
||||
let mut direct = extend_tree(&ctx, resolver, initial_wanted).await?;
|
||||
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await;
|
||||
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions);
|
||||
|
||||
let mut parent_pkg_aliases: HashSet<String> =
|
||||
direct.iter().map(|dep| dep.alias.clone()).collect();
|
||||
@@ -172,7 +172,7 @@ where
|
||||
|
||||
loop {
|
||||
loop {
|
||||
let snapshot = ctx.snapshot(direct.clone()).await;
|
||||
let snapshot = ctx.snapshot(direct.clone());
|
||||
let peers_result = resolve_peers(&snapshot, ResolvePeersOptions::default());
|
||||
|
||||
let (missing_required, fresh_optional) = partition_missing_peers(
|
||||
@@ -227,7 +227,7 @@ where
|
||||
hoisted.into_iter().map(|(name, range)| (name, range, false)).collect();
|
||||
let new_direct = extend_tree(&ctx, resolver, new_wanted).await?;
|
||||
direct.extend(new_direct);
|
||||
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await;
|
||||
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions);
|
||||
}
|
||||
|
||||
if all_missing_optional_peers.is_empty() {
|
||||
@@ -249,7 +249,7 @@ where
|
||||
hoisted_optional.into_iter().map(|(name, range)| (name, range, false)).collect();
|
||||
let new_direct = extend_tree(&ctx, resolver, new_wanted).await?;
|
||||
direct.extend(new_direct);
|
||||
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions).await;
|
||||
update_preferred_versions_with_ctx(&ctx, &mut all_preferred_versions);
|
||||
all_missing_optional_peers.clear();
|
||||
}
|
||||
|
||||
@@ -368,8 +368,8 @@ fn build_workspace_root_deps(
|
||||
/// = 'version'` assignment at
|
||||
/// [`resolveDependencies.ts:1440`](https://github.com/pnpm/pnpm/blob/097983fbca/installing/deps-resolver/src/resolveDependencies.ts#L1440).
|
||||
/// Idempotent: only inserts when no entry exists for `(name, version)`.
|
||||
async fn update_preferred_versions_with_ctx(ctx: &TreeCtx, preferred: &mut PreferredVersions) {
|
||||
for (name, version) in ctx.resolved_versions().await {
|
||||
fn update_preferred_versions_with_ctx(ctx: &TreeCtx, preferred: &mut PreferredVersions) {
|
||||
for (name, version) in ctx.resolved_versions() {
|
||||
let bucket = preferred.entry(name).or_default();
|
||||
bucket.entry(version).or_insert(VersionSelectorEntry::Plain(VersionSelectorType::Version));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user