mirror of
https://github.com/pnpm/pnpm.git
synced 2026-05-24 16:46:06 -04:00
fix(pacquet/tarball): emit pnpm:progress exactly once per URL
`run_with_mem_cache`'s Available branches were re-emitting `found_in_store` on every duplicate caller for an already-known URL, so the per-package counter in pnpm's default-reporter ticked by the number of resolve occurrences instead of by 1 — `~120×` on the alotta-files benchmark, surfacing as a logged `reused: 164746` for `1362` packages. The stderr write storm also showed up directly as resolve-walk wall-clock. Upstream's [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436) attaches the emit via `.then()` on the first writer's promise; later `await`s of the same promise do not re-trigger the handler. This change matches that — only the first writer's `run_without_mem_cache` call emits. Inverts the existing `mem_cache_hit_emits_found_in_store_for_second_requester` test to `mem_cache_hit_does_not_re_emit_for_second_requester` — it now asserts the second caller emits neither `found_in_store` nor `fetched`.
This commit is contained in:
@@ -1779,7 +1779,7 @@ impl<'a> DownloadTarballToStore<'a> {
|
||||
self,
|
||||
mem_cache: &'a MemCache,
|
||||
) -> Result<Arc<HashMap<String, PathBuf>>, TarballError> {
|
||||
let &DownloadTarballToStore { package_url, package_id, requester, .. } = &self;
|
||||
let &DownloadTarballToStore { package_url, .. } = &self;
|
||||
|
||||
// QUESTION: I see no copying from existing store_dir, is there such mechanism?
|
||||
// TODO: If it's not implemented yet, implement it
|
||||
@@ -1792,18 +1792,15 @@ impl<'a> DownloadTarballToStore<'a> {
|
||||
// inner `Arc` out and drop the `Ref` immediately.
|
||||
let existing = mem_cache.get(package_url).map(|entry| Arc::clone(entry.value()));
|
||||
if let Some(cache_lock) = existing {
|
||||
// `pnpm:progress` fires exactly once per URL — only the
|
||||
// first writer's `run_without_mem_cache` call emits.
|
||||
// Mirrors pnpm's
|
||||
// [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436),
|
||||
// which attaches the emit via `.then()` on the first
|
||||
// writer's promise; later `await`s of the same promise
|
||||
// do not re-trigger the handler.
|
||||
let notify = match &*cache_lock.write().await {
|
||||
CacheValue::Available(cas_paths) => {
|
||||
// The mem cache deduplicates concurrent fetches of the
|
||||
// same tarball URL. The first requester goes through
|
||||
// `run_without_mem_cache` and emits `fetched` /
|
||||
// `found_in_store` for *its* package_id; later
|
||||
// requesters share the bytes here without reaching
|
||||
// those emit sites. From this requester's
|
||||
// perspective the package is already in the store, so
|
||||
// emit `found_in_store` so the per-package counters
|
||||
// in pnpm's reporter increment correctly.
|
||||
emit_progress_found_in_store::<Reporter>(package_id, requester);
|
||||
return Ok(Arc::clone(cas_paths));
|
||||
}
|
||||
CacheValue::InProgress(notify) => Arc::clone(notify),
|
||||
@@ -1817,14 +1814,7 @@ impl<'a> DownloadTarballToStore<'a> {
|
||||
tracing::info!(target: "pacquet::download", ?package_url, "Wait for cache");
|
||||
notify.notified().await;
|
||||
match &*cache_lock.read().await {
|
||||
CacheValue::Available(cas_paths) => {
|
||||
// Same rationale as the immediate-`Available`
|
||||
// branch above: this requester didn't drive the
|
||||
// fetch, but its package_id still needs
|
||||
// `found_in_store` for the counter to advance.
|
||||
emit_progress_found_in_store::<Reporter>(package_id, requester);
|
||||
Ok(Arc::clone(cas_paths))
|
||||
}
|
||||
CacheValue::Available(cas_paths) => Ok(Arc::clone(cas_paths)),
|
||||
CacheValue::Failed => {
|
||||
Err(TarballError::SiblingFetchFailed { url: package_url.to_string() })
|
||||
}
|
||||
|
||||
@@ -1533,20 +1533,20 @@ async fn retry_re_attaches_authorization_header_on_each_attempt() {
|
||||
drop(store_dir_keep);
|
||||
}
|
||||
|
||||
/// `run_with_mem_cache`'s in-process dedup must still fire
|
||||
/// `pnpm:progress found_in_store` for the *second* requester of a
|
||||
/// shared tarball URL. Without it, the per-package counters in
|
||||
/// pnpm's reporter would only advance for the first package sharing
|
||||
/// a URL — every later package would resolve and import successfully
|
||||
/// but never tick the "fetched" gauge.
|
||||
/// `run_with_mem_cache`'s in-process dedup emits `pnpm:progress`
|
||||
/// exactly once per URL — only the first writer's
|
||||
/// `run_without_mem_cache` call fires the event. Mirrors pnpm's
|
||||
/// [`packageRequester`](https://github.com/pnpm/pnpm/blob/086c5e91e8/installing/package-requester/src/packageRequester.ts#L410-L436):
|
||||
/// the emit is attached via `.then()` on the first writer's
|
||||
/// promise and `await`s from later callers don't re-trigger the
|
||||
/// handler.
|
||||
///
|
||||
/// Drives two `run_with_mem_cache` calls for the same URL but
|
||||
/// different `package_id`s. The first goes through
|
||||
/// `run_without_mem_cache` (network fetch + `fetched`); the second
|
||||
/// hits the immediate-`Available` branch and must emit
|
||||
/// `found_in_store` for *its* package_id.
|
||||
/// hits the immediate-`Available` branch and must emit nothing.
|
||||
#[tokio::test]
|
||||
async fn mem_cache_hit_emits_found_in_store_for_second_requester() {
|
||||
async fn mem_cache_hit_does_not_re_emit_for_second_requester() {
|
||||
use std::sync::Mutex;
|
||||
|
||||
use pacquet_reporter::{LogEvent, ProgressMessage};
|
||||
@@ -1633,16 +1633,12 @@ async fn mem_cache_hit_emits_found_in_store_for_second_requester() {
|
||||
|
||||
let captured = EVENTS.lock().unwrap();
|
||||
assert!(
|
||||
captured.iter().any(|e| matches!(
|
||||
!captured.iter().any(|e| matches!(
|
||||
e,
|
||||
LogEvent::Progress(log)
|
||||
if matches!(
|
||||
&log.message,
|
||||
ProgressMessage::FoundInStore { package_id, requester }
|
||||
if package_id == "second@2.0.0" && requester == "/proj",
|
||||
)
|
||||
if matches!(&log.message, ProgressMessage::FoundInStore { .. })
|
||||
)),
|
||||
"found_in_store must fire for the second requester's package_id; got {captured:?}",
|
||||
"found_in_store must NOT re-fire for the second requester; got {captured:?}",
|
||||
);
|
||||
assert!(
|
||||
!captured.iter().any(|e| matches!(
|
||||
|
||||
Reference in New Issue
Block a user