Implement shared path tracking in parallel discovery to prevent duplicate processing

- Introduced a shared `seen_paths` structure using `RwLock` to manage paths across all workers, addressing symlink loops and duplicate directory processing.
- Updated the `discovery_worker_rayon` function to utilize the shared `seen_paths`, enhancing efficiency and correctness in the discovery phase.
This commit is contained in:
Jamie Pine
2025-12-07 21:37:28 -08:00
parent 456da8a924
commit aff2398563

View File

@@ -90,6 +90,10 @@ async fn run_parallel_discovery(
let skipped_count = Arc::new(AtomicU64::new(0));
let shutdown = Arc::new(AtomicBool::new(false));
// Shared seen_paths across all workers to prevent duplicate processing
// (handles symlink loops and same directory reached via different paths)
let seen_paths = Arc::new(parking_lot::RwLock::new(std::collections::HashSet::new()));
// Seed initial work
while let Some(dir) = state.dirs_to_walk.pop_front() {
pending_work.fetch_add(1, Ordering::Release);
@@ -108,6 +112,7 @@ async fn run_parallel_discovery(
let pending_work = Arc::clone(&pending_work);
let skipped_count = Arc::clone(&skipped_count);
let shutdown = Arc::clone(&shutdown);
let seen_paths = Arc::clone(&seen_paths);
let root_path = root_path.to_path_buf();
let volume_backend = volume_backend.cloned();
let cloud_url_base = cloud_url_base.clone();
@@ -121,6 +126,7 @@ async fn run_parallel_discovery(
pending_work,
skipped_count,
shutdown,
seen_paths,
root_path,
rule_toggles,
volume_backend,
@@ -265,13 +271,12 @@ async fn discovery_worker_rayon(
pending_work: Arc<AtomicUsize>,
skipped_count: Arc<AtomicU64>,
shutdown: Arc<AtomicBool>,
seen_paths: Arc<parking_lot::RwLock<std::collections::HashSet<PathBuf>>>,
root_path: PathBuf,
rule_toggles: RuleToggles,
volume_backend: Option<Arc<dyn crate::volume::VolumeBackend>>,
cloud_url_base: Option<String>,
) {
let mut seen_paths = std::collections::HashSet::new();
loop {
// Check shutdown signal
if shutdown.load(Ordering::Acquire) {
@@ -290,10 +295,13 @@ async fn discovery_worker_rayon(
Err(_) => continue, // Timeout, check shutdown flag again
};
// Skip if already seen (handles symlink loops)
if !seen_paths.insert(dir_path.clone()) {
pending_work.fetch_sub(1, Ordering::Release);
continue;
// Skip if already seen (handles symlink loops across ALL workers)
{
let mut seen = seen_paths.write();
if !seen.insert(dir_path.clone()) {
pending_work.fetch_sub(1, Ordering::Release);
continue;
}
}
// Build rules for this directory