diff --git a/Cargo.lock b/Cargo.lock index 326b3edc72..12f4694c44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2088,6 +2088,7 @@ dependencies = [ name = "pacquet-fs" version = "0.0.1" dependencies = [ + "dashmap", "derive_more", "dunce", "junction", diff --git a/pacquet/crates/fs/Cargo.toml b/pacquet/crates/fs/Cargo.toml index 3cccc01008..4e3600f39a 100644 --- a/pacquet/crates/fs/Cargo.toml +++ b/pacquet/crates/fs/Cargo.toml @@ -11,6 +11,7 @@ license.workspace = true repository.workspace = true [dependencies] +dashmap = { workspace = true } derive_more = { workspace = true } miette = { workspace = true } pathdiff = { workspace = true } diff --git a/pacquet/crates/fs/src/ensure_file.rs b/pacquet/crates/fs/src/ensure_file.rs index d8afcdbe9e..8ffcdfb94a 100644 --- a/pacquet/crates/fs/src/ensure_file.rs +++ b/pacquet/crates/fs/src/ensure_file.rs @@ -1,10 +1,14 @@ +use dashmap::DashMap; use derive_more::{Display, Error}; use miette::Diagnostic; use std::{ fs::{self, File, OpenOptions}, io::{self, Write}, path::{Path, PathBuf}, - sync::atomic::{AtomicU64, Ordering}, + sync::{ + Arc, Mutex, OnceLock, + atomic::{AtomicU64, Ordering}, + }, time::{Duration, Instant}, }; @@ -148,12 +152,13 @@ pub fn ensure_parent_dir(dir: &Path) -> Result<(), EnsureFileError> { /// buffer we were about to write, so comparing against it /// implicitly verifies the sha512 without a second hash pass. Same /// correctness guarantee, one fewer full-buffer walk. -/// * **No `locker: Map` process-local cache**: pnpm's -/// locker skips re-verifying the same file within one install. -/// Pacquet's hot path calls `ensure_file` at most once per CAS file -/// per install (the `StoreIndex` cache decides whether we even get -/// here), so the locker would be mostly empty work. Can revisit if -/// profiling shows repeated AlreadyExists hits on a single path. +/// * **Process-local per-path mutex for serialization**: two +/// snapshots whose tarballs ship identical file content +/// (e.g. a shared `LICENSE`) compute the same CAS path and would +/// race in `verify_or_rewrite`. The mutex makes the second +/// writer wait for the first's `write_all` so the byte-match +/// fast path always applies. Pacquet's stronger form of pnpm +/// v11's [`locker: Map`](https://github.com/pnpm/pnpm/blob/4750fd370c/store/cafs/src/writeFile.ts). /// /// Matches pnpm's guarantee: a successful return means `file_path` /// exists on disk with contents equal to `content`. A torn mid-write @@ -163,6 +168,11 @@ pub fn ensure_file( content: &[u8], #[cfg_attr(windows, allow(unused))] mode: Option, ) -> Result<(), EnsureFileError> { + // See the "Process-local per-path mutex" bullet above and + // [`cas_write_lock`] for the rationale. + let lock = cas_write_lock(file_path); + let _guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner()); + let mut options = OpenOptions::new(); options.write(true).create_new(true); @@ -188,6 +198,21 @@ pub fn ensure_file( } } +/// Borrow the process-local write mutex for `file_path`. +/// +/// Returning `Arc>` (rather than a `Ref` into the map) so +/// the caller `.lock()`s after the shard guard from `entry()` is +/// released — otherwise a recursive lookup on the same shard could +/// deadlock. The map is never pruned: entries are one per unique +/// CAS path the install wrote, bounded by the working set. +fn cas_write_lock(file_path: &Path) -> Arc> { + static LOCKS: OnceLock>>> = OnceLock::new(); + let locks = LOCKS.get_or_init(DashMap::new); + Arc::clone( + locks.entry(file_path.to_path_buf()).or_insert_with(|| Arc::new(Mutex::new(()))).value(), + ) +} + /// Re-read an already-present CAS file and byte-compare with `content`. /// If they match we're done; if not, recover the torn blob by writing a /// fresh temp file and renaming it over the target. diff --git a/pacquet/crates/fs/src/ensure_file/tests.rs b/pacquet/crates/fs/src/ensure_file/tests.rs index 15a6ff7e91..1f3c34f039 100644 --- a/pacquet/crates/fs/src/ensure_file/tests.rs +++ b/pacquet/crates/fs/src/ensure_file/tests.rs @@ -301,6 +301,68 @@ fn retry_on_fd_pressure_retries_emfile_and_enfile_until_success() { } } +/// Concurrent writers of the same CAS path on a fresh dirent must +/// all return `Ok(())`, produce identical inode observations, and +/// leave a file with the correct content. One writer wins +/// `O_CREAT|O_EXCL`; the rest take `verify_or_rewrite`. With the +/// per-path mutex, the late-comers see the winner's fully-written +/// file and take the byte-match fast path, so the inode never +/// changes. Without it, a late-comer can race into `write_atomic` +/// on a partial size, swap the inode, and the per-writer +/// observations below can diverge under a multi-rename race. +/// +/// Note this is a smoke test, not a strict regression test: any +/// observation taken *after* `ensure_file` returns has already +/// missed the rename window, so a single-rename race typically +/// converges on one final inode and slips past. It catches the +/// multi-rename case and validates the "no deadlock, all writers +/// see correct content" baseline. +#[cfg(unix)] +#[test] +fn concurrent_writers_of_same_path_do_not_swap_the_inode() { + use std::os::unix::fs::MetadataExt; + use std::sync::{Arc, Barrier, Mutex}; + use std::thread; + + let tmp = tempdir().unwrap(); + let path = Arc::new(tmp.path().join("shared")); + let content: Arc> = Arc::new(vec![0xAB; 1024 * 64]); + + const WRITER_COUNT: usize = 32; + let barrier = Arc::new(Barrier::new(WRITER_COUNT)); + let observed: Arc>> = Arc::new(Mutex::new(Vec::with_capacity(WRITER_COUNT))); + + let handles: Vec<_> = (0..WRITER_COUNT) + .map(|_| { + let path = Arc::clone(&path); + let content = Arc::clone(&content); + let barrier = Arc::clone(&barrier); + let observed = Arc::clone(&observed); + thread::spawn(move || { + barrier.wait(); + ensure_file(&path, &content, None).expect("each writer should succeed"); + let ino = fs::metadata(&*path).unwrap().ino(); + observed.lock().unwrap().push(ino); + }) + }) + .collect(); + + for handle in handles { + handle.join().expect("writer thread should not panic"); + } + + let final_meta = fs::metadata(&*path).unwrap(); + assert_eq!(fs::read(&*path).unwrap(), *content); + assert_eq!(final_meta.len(), content.len() as u64); + let observed = observed.lock().unwrap(); + let first = observed[0]; + assert!( + observed.iter().all(|ino| *ino == first), + "inode changed during concurrent writes: {observed:?}", + ); + assert_eq!(final_meta.ino(), first); +} + /// Errors that aren't fd-pressure must propagate immediately — /// retrying would just delay surfacing a real failure (e.g. a /// genuine `NotFound` on the parent dir).