fix(pacquet/tarball): emit pnpm:progress exactly once per URL

`run_with_mem_cache`'s Available branches were re-emitting
`found_in_store` on every duplicate caller for an already-known
URL, so the per-package counter in pnpm's default-reporter ticked
by the number of resolve occurrences instead of by 1 — `~120×` on
the alotta-files benchmark, surfacing as a logged
`reused: 164746` for `1362` packages. The stderr write storm also
showed up directly as resolve-walk wall-clock.

Upstream's
[`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436)
attaches the emit via `.then()` on the first writer's promise;
later `await`s of the same promise do not re-trigger the handler.
This change matches that — only the first writer's
`run_without_mem_cache` call emits.

Inverts the existing
`mem_cache_hit_emits_found_in_store_for_second_requester` test to
`mem_cache_hit_does_not_re_emit_for_second_requester` — it now
asserts the second caller emits neither `found_in_store` nor
`fetched`.
This commit is contained in:
Zoltan Kochan
2026-05-21 23:09:46 +02:00
parent 6cb50b4fe5
commit e54208e1ed
2 changed files with 21 additions and 35 deletions

View File

@@ -1779,7 +1779,7 @@ impl<'a> DownloadTarballToStore<'a> {
self,
mem_cache: &'a MemCache,
) -> Result<Arc<HashMap<String, PathBuf>>, TarballError> {
let &DownloadTarballToStore { package_url, package_id, requester, .. } = &self;
let &DownloadTarballToStore { package_url, .. } = &self;
// QUESTION: I see no copying from existing store_dir, is there such mechanism?
// TODO: If it's not implemented yet, implement it
@@ -1792,18 +1792,15 @@ impl<'a> DownloadTarballToStore<'a> {
// inner `Arc` out and drop the `Ref` immediately.
let existing = mem_cache.get(package_url).map(|entry| Arc::clone(entry.value()));
if let Some(cache_lock) = existing {
// `pnpm:progress` fires exactly once per URL — only the
// first writer's `run_without_mem_cache` call emits.
// Mirrors pnpm's
// [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436),
// which attaches the emit via `.then()` on the first
// writer's promise; later `await`s of the same promise
// do not re-trigger the handler.
let notify = match &*cache_lock.write().await {
CacheValue::Available(cas_paths) => {
// The mem cache deduplicates concurrent fetches of the
// same tarball URL. The first requester goes through
// `run_without_mem_cache` and emits `fetched` /
// `found_in_store` for *its* package_id; later
// requesters share the bytes here without reaching
// those emit sites. From this requester's
// perspective the package is already in the store, so
// emit `found_in_store` so the per-package counters
// in pnpm's reporter increment correctly.
emit_progress_found_in_store::<Reporter>(package_id, requester);
return Ok(Arc::clone(cas_paths));
}
CacheValue::InProgress(notify) => Arc::clone(notify),
@@ -1817,14 +1814,7 @@ impl<'a> DownloadTarballToStore<'a> {
tracing::info!(target: "pacquet::download", ?package_url, "Wait for cache");
notify.notified().await;
match &*cache_lock.read().await {
CacheValue::Available(cas_paths) => {
// Same rationale as the immediate-`Available`
// branch above: this requester didn't drive the
// fetch, but its package_id still needs
// `found_in_store` for the counter to advance.
emit_progress_found_in_store::<Reporter>(package_id, requester);
Ok(Arc::clone(cas_paths))
}
CacheValue::Available(cas_paths) => Ok(Arc::clone(cas_paths)),
CacheValue::Failed => {
Err(TarballError::SiblingFetchFailed { url: package_url.to_string() })
}

View File

@@ -1533,20 +1533,20 @@ async fn retry_re_attaches_authorization_header_on_each_attempt() {
drop(store_dir_keep);
}
/// `run_with_mem_cache`'s in-process dedup must still fire
/// `pnpm:progress found_in_store` for the *second* requester of a
/// shared tarball URL. Without it, the per-package counters in
/// pnpm's reporter would only advance for the first package sharing
/// a URL — every later package would resolve and import successfully
/// but never tick the "fetched" gauge.
/// `run_with_mem_cache`'s in-process dedup emits `pnpm:progress`
/// exactly once per URL — only the first writer's
/// `run_without_mem_cache` call fires the event. Mirrors pnpm's
/// [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436):
/// the emit is attached via `.then()` on the first writer's
/// promise and `await`s from later callers don't re-trigger the
/// handler.
///
/// Drives two `run_with_mem_cache` calls for the same URL but
/// different `package_id`s. The first goes through
/// `run_without_mem_cache` (network fetch + `fetched`); the second
/// hits the immediate-`Available` branch and must emit
/// `found_in_store` for *its* package_id.
/// hits the immediate-`Available` branch and must emit nothing.
#[tokio::test]
async fn mem_cache_hit_emits_found_in_store_for_second_requester() {
async fn mem_cache_hit_does_not_re_emit_for_second_requester() {
use std::sync::Mutex;
use pacquet_reporter::{LogEvent, ProgressMessage};
@@ -1633,16 +1633,12 @@ async fn mem_cache_hit_emits_found_in_store_for_second_requester() {
let captured = EVENTS.lock().unwrap();
assert!(
captured.iter().any(|e| matches!(
!captured.iter().any(|e| matches!(
e,
LogEvent::Progress(log)
if matches!(
&log.message,
ProgressMessage::FoundInStore { package_id, requester }
if package_id == "second@2.0.0" && requester == "/proj",
)
if matches!(&log.message, ProgressMessage::FoundInStore { .. })
)),
"found_in_store must fire for the second requester's package_id; got {captured:?}",
"found_in_store must NOT re-fire for the second requester; got {captured:?}",
);
assert!(
!captured.iter().any(|e| matches!(