fix(pacquet/fs): serialize concurrent CAS writes to the same path (#11758)

* fix(pacquet/fs): serialize concurrent CAS writes to the same path

Two install snapshots whose tarballs ship identical file content
(e.g. a shared LICENSE across sibling packages like
`@pnpm.e2e/hello-world-js-bin` and `@pnpm.e2e/hello-world-js-bin-parent`)
hash to the same CAS path and call `ensure_file` concurrently. Without
serialization the second writer's `O_CREAT|O_EXCL` hits `AlreadyExists`
while the first writer is mid-`write_all`, falls into
`verify_or_rewrite`'s `meta.len() != content.len()` arm, and runs
`write_atomic` — which renames a temp file over the live source.
On Linux/ext4 the partial-size observation is fast enough that this
window opens often, surfacing as flaky CI failures of the form:

    failed to import "<store>/v11/files/65/<hash>" to ".../LICENSE":
    No such file or directory (os error 2)

emitted by `link_file` whose `reflink`/`fs::hard_link` raced the
rename. macOS/APFS and Windows tests pass because their `stat` cadence
and CI runner parallelism don't reliably open the window.

Port pnpm v11's `locker: Map<string, number>` semantics — slightly
stronger in pacquet, as a per-path `Mutex` rather than a dedup cache —
so two writers of the same CAS path serialize through it. The second
caller acquires the lock after the first writer's `write_all` has
finished, then takes the byte-match fast path inside `verify_or_rewrite`
and never has to rewrite. The previous docstring at the bottom of the
"Differences from pnpm" list explicitly acknowledged the locker
omission and predicted it would matter; this change closes that gap.

Add a regression test (`concurrent_writers_of_same_path_do_not_swap_the_inode`)
that fires 32 threads at one path with identical content and asserts
the inode never swaps — the observable signal that no writer ever took
the `write_atomic` rename path.

* fix(pacquet/fs): unlink intra-doc references to private items

`ensure_file` is public; references to private `verify_or_rewrite`,
`write_atomic`, and `cas_write_lock` only resolve under
`--document-private-items`, which trips `rustdoc::private-intra-doc-links`
under `-D warnings`. Drop the link form and keep the names as plain
backticked identifiers — the docstring still reads correctly and
`cargo doc` no longer fails.

* style(pacquet/fs): rename single-letter N to WRITER_COUNT in test

Address review feedback on #11758 — single-letter constants are
opaque; `WRITER_COUNT` reads as the loop count of concurrent
writers the test fires at one CAS path.

* docs(pacquet/fs): tighten ensure_file / cas_write_lock / test docs

Address review feedback on #11758: drop body-narrating prose, keep
only the contract and the non-obvious why. The test docstring no
longer references "pre-fix code" so it reads independent of this
PR's history.

* test(pacquet/fs): pre-create + per-thread inode capture in concurrent test

Address CodeRabbit's and Copilot's review feedback on #11758. The
previous assert compared two metadata reads taken at the same moment
after join — tautological. Pre-creating the file gives an
`original_ino` reference taken before the contended run, and each
writer also captures the path's inode immediately after its own
`ensure_file` returns so a mid-run rename swap from another writer
is visible to at least one of those observations.

* style(pacquet/fs): add trailing comma in multi-line assert! macro

Address `perfectionist::macro_trailing_comma` warning surfaced by
the Dylint CI on #11758 — multi-line macro invocations must end with
a trailing comma. (My earlier local `dylint --all` run missed this
because the perfectionist library hadn't fully recompiled against the
new test code.)

* test(pacquet/fs): drop pre-create from concurrent writer test

Address Copilot review feedback on #11758. The pre-create made every
contender take the byte-match fast path against a complete file, so
the lock made no observable difference and the test couldn't even
weakly distinguish lock from no-lock. Removing it leaves the
fresh-dirent shape (`O_CREAT|O_EXCL` race + verify_or_rewrite for
the rest), and the per-thread inode observations can now diverge
under a multi-rename race without the lock.

Honest about the limitation in the docstring: any observation taken
after `ensure_file` returns has already missed the rename window, so
a single-rename race converges on one inode and slips past. The
test catches the multi-rename case and validates the "no deadlock,
all writers see correct content" baseline.

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Khải
2026-05-20 16:57:00 +07:00
committed by GitHub
parent 3a54205178
commit df77f649ee
4 changed files with 96 additions and 7 deletions

1
Cargo.lock generated
View File

@@ -2088,6 +2088,7 @@ dependencies = [
name = "pacquet-fs"
version = "0.0.1"
dependencies = [
"dashmap",
"derive_more",
"dunce",
"junction",

View File

@@ -11,6 +11,7 @@ license.workspace = true
repository.workspace = true
[dependencies]
dashmap = { workspace = true }
derive_more = { workspace = true }
miette = { workspace = true }
pathdiff = { workspace = true }

View File

@@ -1,10 +1,14 @@
use dashmap::DashMap;
use derive_more::{Display, Error};
use miette::Diagnostic;
use std::{
fs::{self, File, OpenOptions},
io::{self, Write},
path::{Path, PathBuf},
sync::atomic::{AtomicU64, Ordering},
sync::{
Arc, Mutex, OnceLock,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant},
};
@@ -148,12 +152,13 @@ pub fn ensure_parent_dir(dir: &Path) -> Result<(), EnsureFileError> {
/// buffer we were about to write, so comparing against it
/// implicitly verifies the sha512 without a second hash pass. Same
/// correctness guarantee, one fewer full-buffer walk.
/// * **No `locker: Map<string, number>` process-local cache**: pnpm's
/// locker skips re-verifying the same file within one install.
/// Pacquet's hot path calls `ensure_file` at most once per CAS file
/// per install (the `StoreIndex` cache decides whether we even get
/// here), so the locker would be mostly empty work. Can revisit if
/// profiling shows repeated AlreadyExists hits on a single path.
/// * **Process-local per-path mutex for serialization**: two
/// snapshots whose tarballs ship identical file content
/// (e.g. a shared `LICENSE`) compute the same CAS path and would
/// race in `verify_or_rewrite`. The mutex makes the second
/// writer wait for the first's `write_all` so the byte-match
/// fast path always applies. Pacquet's stronger form of pnpm
/// v11's [`locker: Map<string, number>`](https://github.com/pnpm/pnpm/blob/4750fd370c/store/cafs/src/writeFile.ts).
///
/// Matches pnpm's guarantee: a successful return means `file_path`
/// exists on disk with contents equal to `content`. A torn mid-write
@@ -163,6 +168,11 @@ pub fn ensure_file(
content: &[u8],
#[cfg_attr(windows, allow(unused))] mode: Option<u32>,
) -> Result<(), EnsureFileError> {
// See the "Process-local per-path mutex" bullet above and
// [`cas_write_lock`] for the rationale.
let lock = cas_write_lock(file_path);
let _guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
let mut options = OpenOptions::new();
options.write(true).create_new(true);
@@ -188,6 +198,21 @@ pub fn ensure_file(
}
}
/// Borrow the process-local write mutex for `file_path`.
///
/// Returning `Arc<Mutex<()>>` (rather than a `Ref` into the map) so
/// the caller `.lock()`s after the shard guard from `entry()` is
/// released — otherwise a recursive lookup on the same shard could
/// deadlock. The map is never pruned: entries are one per unique
/// CAS path the install wrote, bounded by the working set.
fn cas_write_lock(file_path: &Path) -> Arc<Mutex<()>> {
static LOCKS: OnceLock<DashMap<PathBuf, Arc<Mutex<()>>>> = OnceLock::new();
let locks = LOCKS.get_or_init(DashMap::new);
Arc::clone(
locks.entry(file_path.to_path_buf()).or_insert_with(|| Arc::new(Mutex::new(()))).value(),
)
}
/// Re-read an already-present CAS file and byte-compare with `content`.
/// If they match we're done; if not, recover the torn blob by writing a
/// fresh temp file and renaming it over the target.

View File

@@ -301,6 +301,68 @@ fn retry_on_fd_pressure_retries_emfile_and_enfile_until_success() {
}
}
/// Concurrent writers of the same CAS path on a fresh dirent must
/// all return `Ok(())`, produce identical inode observations, and
/// leave a file with the correct content. One writer wins
/// `O_CREAT|O_EXCL`; the rest take `verify_or_rewrite`. With the
/// per-path mutex, the late-comers see the winner's fully-written
/// file and take the byte-match fast path, so the inode never
/// changes. Without it, a late-comer can race into `write_atomic`
/// on a partial size, swap the inode, and the per-writer
/// observations below can diverge under a multi-rename race.
///
/// Note this is a smoke test, not a strict regression test: any
/// observation taken *after* `ensure_file` returns has already
/// missed the rename window, so a single-rename race typically
/// converges on one final inode and slips past. It catches the
/// multi-rename case and validates the "no deadlock, all writers
/// see correct content" baseline.
#[cfg(unix)]
#[test]
fn concurrent_writers_of_same_path_do_not_swap_the_inode() {
use std::os::unix::fs::MetadataExt;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
let tmp = tempdir().unwrap();
let path = Arc::new(tmp.path().join("shared"));
let content: Arc<Vec<u8>> = Arc::new(vec![0xAB; 1024 * 64]);
const WRITER_COUNT: usize = 32;
let barrier = Arc::new(Barrier::new(WRITER_COUNT));
let observed: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::with_capacity(WRITER_COUNT)));
let handles: Vec<_> = (0..WRITER_COUNT)
.map(|_| {
let path = Arc::clone(&path);
let content = Arc::clone(&content);
let barrier = Arc::clone(&barrier);
let observed = Arc::clone(&observed);
thread::spawn(move || {
barrier.wait();
ensure_file(&path, &content, None).expect("each writer should succeed");
let ino = fs::metadata(&*path).unwrap().ino();
observed.lock().unwrap().push(ino);
})
})
.collect();
for handle in handles {
handle.join().expect("writer thread should not panic");
}
let final_meta = fs::metadata(&*path).unwrap();
assert_eq!(fs::read(&*path).unwrap(), *content);
assert_eq!(final_meta.len(), content.len() as u64);
let observed = observed.lock().unwrap();
let first = observed[0];
assert!(
observed.iter().all(|ino| *ino == first),
"inode changed during concurrent writes: {observed:?}",
);
assert_eq!(final_meta.ino(), first);
}
/// Errors that aren't fd-pressure must propagate immediately —
/// retrying would just delay surfacing a real failure (e.g. a
/// genuine `NotFound` on the parent dir).