From e54208e1ed087722d5d8a8e34225e58ac89b2022 Mon Sep 17 00:00:00 2001 From: Zoltan Kochan Date: Thu, 21 May 2026 23:09:46 +0200 Subject: [PATCH] fix(pacquet/tarball): emit pnpm:progress exactly once per URL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `run_with_mem_cache`'s Available branches were re-emitting `found_in_store` on every duplicate caller for an already-known URL, so the per-package counter in pnpm's default-reporter ticked by the number of resolve occurrences instead of by 1 — `~120×` on the alotta-files benchmark, surfacing as a logged `reused: 164746` for `1362` packages. The stderr write storm also showed up directly as resolve-walk wall-clock. Upstream's [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436) attaches the emit via `.then()` on the first writer's promise; later `await`s of the same promise do not re-trigger the handler. This change matches that — only the first writer's `run_without_mem_cache` call emits. Inverts the existing `mem_cache_hit_emits_found_in_store_for_second_requester` test to `mem_cache_hit_does_not_re_emit_for_second_requester` — it now asserts the second caller emits neither `found_in_store` nor `fetched`. --- pacquet/crates/tarball/src/lib.rs | 28 +++++++++------------------- pacquet/crates/tarball/src/tests.rs | 28 ++++++++++++---------------- 2 files changed, 21 insertions(+), 35 deletions(-) diff --git a/pacquet/crates/tarball/src/lib.rs b/pacquet/crates/tarball/src/lib.rs index 4ca54afcb6..c1fbf7e4a4 100644 --- a/pacquet/crates/tarball/src/lib.rs +++ b/pacquet/crates/tarball/src/lib.rs @@ -1779,7 +1779,7 @@ impl<'a> DownloadTarballToStore<'a> { self, mem_cache: &'a MemCache, ) -> Result>, TarballError> { - let &DownloadTarballToStore { package_url, package_id, requester, .. } = &self; + let &DownloadTarballToStore { package_url, .. } = &self; // QUESTION: I see no copying from existing store_dir, is there such mechanism? // TODO: If it's not implemented yet, implement it @@ -1792,18 +1792,15 @@ impl<'a> DownloadTarballToStore<'a> { // inner `Arc` out and drop the `Ref` immediately. let existing = mem_cache.get(package_url).map(|entry| Arc::clone(entry.value())); if let Some(cache_lock) = existing { + // `pnpm:progress` fires exactly once per URL — only the + // first writer's `run_without_mem_cache` call emits. + // Mirrors pnpm's + // [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436), + // which attaches the emit via `.then()` on the first + // writer's promise; later `await`s of the same promise + // do not re-trigger the handler. let notify = match &*cache_lock.write().await { CacheValue::Available(cas_paths) => { - // The mem cache deduplicates concurrent fetches of the - // same tarball URL. The first requester goes through - // `run_without_mem_cache` and emits `fetched` / - // `found_in_store` for *its* package_id; later - // requesters share the bytes here without reaching - // those emit sites. From this requester's - // perspective the package is already in the store, so - // emit `found_in_store` so the per-package counters - // in pnpm's reporter increment correctly. - emit_progress_found_in_store::(package_id, requester); return Ok(Arc::clone(cas_paths)); } CacheValue::InProgress(notify) => Arc::clone(notify), @@ -1817,14 +1814,7 @@ impl<'a> DownloadTarballToStore<'a> { tracing::info!(target: "pacquet::download", ?package_url, "Wait for cache"); notify.notified().await; match &*cache_lock.read().await { - CacheValue::Available(cas_paths) => { - // Same rationale as the immediate-`Available` - // branch above: this requester didn't drive the - // fetch, but its package_id still needs - // `found_in_store` for the counter to advance. - emit_progress_found_in_store::(package_id, requester); - Ok(Arc::clone(cas_paths)) - } + CacheValue::Available(cas_paths) => Ok(Arc::clone(cas_paths)), CacheValue::Failed => { Err(TarballError::SiblingFetchFailed { url: package_url.to_string() }) } diff --git a/pacquet/crates/tarball/src/tests.rs b/pacquet/crates/tarball/src/tests.rs index 68cf0322b8..0dd08e1e83 100644 --- a/pacquet/crates/tarball/src/tests.rs +++ b/pacquet/crates/tarball/src/tests.rs @@ -1533,20 +1533,20 @@ async fn retry_re_attaches_authorization_header_on_each_attempt() { drop(store_dir_keep); } -/// `run_with_mem_cache`'s in-process dedup must still fire -/// `pnpm:progress found_in_store` for the *second* requester of a -/// shared tarball URL. Without it, the per-package counters in -/// pnpm's reporter would only advance for the first package sharing -/// a URL — every later package would resolve and import successfully -/// but never tick the "fetched" gauge. +/// `run_with_mem_cache`'s in-process dedup emits `pnpm:progress` +/// exactly once per URL — only the first writer's +/// `run_without_mem_cache` call fires the event. Mirrors pnpm's +/// [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436): +/// the emit is attached via `.then()` on the first writer's +/// promise and `await`s from later callers don't re-trigger the +/// handler. /// /// Drives two `run_with_mem_cache` calls for the same URL but /// different `package_id`s. The first goes through /// `run_without_mem_cache` (network fetch + `fetched`); the second -/// hits the immediate-`Available` branch and must emit -/// `found_in_store` for *its* package_id. +/// hits the immediate-`Available` branch and must emit nothing. #[tokio::test] -async fn mem_cache_hit_emits_found_in_store_for_second_requester() { +async fn mem_cache_hit_does_not_re_emit_for_second_requester() { use std::sync::Mutex; use pacquet_reporter::{LogEvent, ProgressMessage}; @@ -1633,16 +1633,12 @@ async fn mem_cache_hit_emits_found_in_store_for_second_requester() { let captured = EVENTS.lock().unwrap(); assert!( - captured.iter().any(|e| matches!( + !captured.iter().any(|e| matches!( e, LogEvent::Progress(log) - if matches!( - &log.message, - ProgressMessage::FoundInStore { package_id, requester } - if package_id == "second@2.0.0" && requester == "/proj", - ) + if matches!(&log.message, ProgressMessage::FoundInStore { .. }) )), - "found_in_store must fire for the second requester's package_id; got {captured:?}", + "found_in_store must NOT re-fire for the second requester; got {captured:?}", ); assert!( !captured.iter().any(|e| matches!(