diff --git a/core/Cargo.toml b/core/Cargo.toml
index 4a66f6b5b..209103542 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -19,9 +19,10 @@ cli = []

 [dependencies]
 # Async runtime
-async-trait = "0.1"
-futures = "0.3"
-tokio = { version = "1.40", features = ["full"] }
+async-channel = { workspace = true }
+async-trait = "0.1"
+futures = "0.3"
+tokio = { version = "1.40", features = ["full"] }

 # Database
 sea-orm = { version = "1.1", features = [
diff --git a/core/src/ops/indexing/phases/discovery.rs b/core/src/ops/indexing/phases/discovery.rs
index 549537ac8..b1d2fb08a 100644
--- a/core/src/ops/indexing/phases/discovery.rs
+++ b/core/src/ops/indexing/phases/discovery.rs
@@ -9,7 +9,9 @@ use crate::{
         state::{DirEntry, EntryKind, IndexError, IndexPhase, IndexerProgress, IndexerState},
     },
 };

+use async_channel as chan;
 use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
 use std::time::Instant;
 use std::{path::Path, sync::Arc};
@@ -22,7 +24,7 @@ impl crate::ops::indexing::rules::MetadataForIndexerRules for SimpleMetadata {
     }
 }

-/// Run the discovery phase of indexing
+/// Run the discovery phase of indexing with parallel directory walking
 pub async fn run_discovery_phase(
     state: &mut IndexerState,
     ctx: &JobContext<'_>,
@@ -31,33 +33,382 @@ pub async fn run_discovery_phase(
     volume_backend: Option<&Arc>,
     cloud_url_base: Option,
 ) -> Result<(), JobError> {
+    let concurrency = state.discovery_concurrency;
+
+    if concurrency <= 1 {
+        // Fall back to sequential discovery for concurrency = 1
+        return run_discovery_phase_sequential(
+            state,
+            ctx,
+            root_path,
+            rule_toggles,
+            volume_backend,
+            cloud_url_base,
+        )
+        .await;
+    }
+
     ctx.log(format!(
-        "Discovery phase starting from: {}",
-        root_path.display()
+        "Discovery phase starting from: {} (concurrency: {})",
+        root_path.display(),
+        concurrency
     ));
     ctx.log(format!(
         "Initial directories to walk: {}",
         state.dirs_to_walk.len()
     ));

-    let mut skipped_count = 0u64;
+    run_parallel_discovery(state, ctx, root_path, rule_toggles, volume_backend, cloud_url_base)
+        .await
+}

-    let toggles = rule_toggles;
+/// Parallel discovery implementation using Rayon-style work-stealing
+async fn run_parallel_discovery(
+    state: &mut IndexerState,
+    ctx: &JobContext<'_>,
+    root_path: &Path,
+    rule_toggles: RuleToggles,
+    volume_backend: Option<&Arc>,
+    cloud_url_base: Option,
+) -> Result<(), JobError> {
+    let concurrency = state.discovery_concurrency;
+
+    // Use unbounded channels to avoid backpressure/deadlock issues
+    let (work_tx, work_rx) = chan::unbounded::<PathBuf>();
+    let (result_tx, result_rx) = chan::unbounded::<DiscoveryResult>();
+
+    // Atomic counter tracking work in progress + shutdown signal
+    // INVARIANT: incremented BEFORE sending to work channel, decremented AFTER processing
+    let pending_work = Arc::new(AtomicUsize::new(0));
+    let skipped_count = Arc::new(AtomicU64::new(0));
+    let shutdown = Arc::new(AtomicBool::new(false));
+
+    // Seed initial work
+    while let Some(dir) = state.dirs_to_walk.pop_front() {
+        pending_work.fetch_add(1, Ordering::Release);
+        work_tx
+            .send(dir)
+            .await
+            .map_err(|_| JobError::execution("Work channel closed"))?;
+    }
+
+    // Spawn worker tasks
+    let mut workers = Vec::new();
+    for worker_id in 0..concurrency {
+        let work_rx = work_rx.clone();
+        let work_tx = work_tx.clone();
+        let result_tx = result_tx.clone();
+        let pending_work = Arc::clone(&pending_work);
+        let skipped_count = Arc::clone(&skipped_count);
+        let shutdown = Arc::clone(&shutdown);
+        let root_path = root_path.to_path_buf();
+        let volume_backend = volume_backend.cloned();
+        let cloud_url_base = cloud_url_base.clone();
+
+        let worker = tokio::spawn(async move {
+            discovery_worker_rayon(
+                worker_id,
+                work_rx,
+                work_tx,
+                result_tx,
+                pending_work,
+                skipped_count,
+                shutdown,
+                root_path,
+                rule_toggles,
+                volume_backend,
+                cloud_url_base,
+            )
+            .await
+        });
+
+        workers.push(worker);
+    }
+
+    // Monitor task: signals shutdown when all work is done
+    let monitor = tokio::spawn({
+        let shutdown = Arc::clone(&shutdown);
+        let pending_work = Arc::clone(&pending_work);
+        async move {
+            loop {
+                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+                if pending_work.load(Ordering::Acquire) == 0 {
+                    shutdown.store(true, Ordering::Release);
+                    break;
+                }
+            }
+        }
+    });
+
+    // Drop our copies so channels close when workers are done
+    drop(work_tx);
+    drop(result_tx);
+
+    // Collect results
+    let mut total_processed = 0u64;
+    while let Ok(result) = result_rx.recv().await {
+        match result {
+            DiscoveryResult::Entry(entry) => {
+                state.pending_entries.push(entry);
+                total_processed += 1;
+
+                if state.should_create_batch() {
+                    let batch = state.create_batch();
+                    state.entry_batches.push(batch);
+                }
+            }
+            DiscoveryResult::Stats {
+                files,
+                dirs,
+                symlinks,
+                bytes,
+            } => {
+                state.stats.files += files;
+                state.stats.dirs += dirs;
+                state.stats.symlinks += symlinks;
+                state.stats.bytes += bytes;
+            }
+            DiscoveryResult::Error(error) => {
+                state.add_error(error);
+            }
+            DiscoveryResult::Progress { dirs_queued } => {
+                let indexer_progress = IndexerProgress {
+                    phase: IndexPhase::Discovery { dirs_queued },
+                    current_path: root_path.display().to_string(),
+                    total_found: state.stats,
+                    processing_rate: state.calculate_rate(),
+                    estimated_remaining: state.estimate_remaining(),
+                    scope: None,
+                    persistence: None,
+                    is_ephemeral: false,
+                    action_context: None,
+                };
+                ctx.progress(Progress::generic(indexer_progress.to_generic_progress()));
+                state.items_since_last_update += 1;
+            }
+            DiscoveryResult::QueueDirectories(_) => {
+                // Workers queue directly, this shouldn't happen
+                unreachable!("Workers should not send QueueDirectories in Rayon-style mode");
+            }
+        }
+
+        ctx.check_interrupt().await?;
+    }
+
+    // Wait for monitor and workers
+    monitor
+        .await
+        .map_err(|e| JobError::execution(format!("Monitor task failed: {}", e)))?;
+
+    for worker in workers {
+        worker
+            .await
+            .map_err(|e| JobError::execution(format!("Worker task failed: {}", e)))?;
+    }
+
+    // Final batch
+    if !state.pending_entries.is_empty() {
+        let final_batch_size = state.pending_entries.len();
+        ctx.log(format!(
+            "Creating final batch with {} entries",
+            final_batch_size
+        ));
+        let batch = state.create_batch();
+        state.entry_batches.push(batch);
+    }
+
+    let skipped = skipped_count.load(Ordering::SeqCst);
+    state.stats.skipped = skipped;
+
+    ctx.log(format!(
+        "Parallel discovery complete: {} files, {} dirs, {} symlinks, {} skipped, {} batches created",
+        state.stats.files,
+        state.stats.dirs,
+        state.stats.symlinks,
+        skipped,
+        state.entry_batches.len()
+    ));
+
+    state.phase = crate::ops::indexing::state::Phase::Processing;
+    Ok(())
+}
+
+/// Result types sent from workers back to coordinator
+enum DiscoveryResult {
+    Entry(DirEntry),
+    QueueDirectories(Vec<PathBuf>),
+    Stats {
+        files: u64,
+        dirs: u64,
+        symlinks: u64,
+        bytes: u64,
+    },
+    Error(IndexError),
+    Progress { dirs_queued: usize },
+}
+
+/// Rayon-style worker: processes directories and directly enqueues new work
+async fn discovery_worker_rayon(
+    _worker_id: usize,
+    work_rx: chan::Receiver<PathBuf>,
+    work_tx: chan::Sender<PathBuf>,
+    result_tx: chan::Sender<DiscoveryResult>,
+    pending_work: Arc<AtomicUsize>,
+    skipped_count: Arc<AtomicU64>,
+    shutdown: Arc<AtomicBool>,
+    root_path: PathBuf,
+    rule_toggles: RuleToggles,
+    volume_backend: Option<Arc>,
+    cloud_url_base: Option,
+) {
+    let mut seen_paths = std::collections::HashSet::new();
+
+    loop {
+        // Check shutdown signal
+        if shutdown.load(Ordering::Acquire) {
+            break;
+        }
+
+        // Try to get work with a timeout to periodically check shutdown
+        let dir_path = match tokio::time::timeout(
+            tokio::time::Duration::from_millis(50),
+            work_rx.recv(),
+        )
+        .await
+        {
+            Ok(Ok(path)) => path,
+            Ok(Err(_)) => break, // Channel closed
+            Err(_) => continue,  // Timeout, check shutdown flag again
+        };
+
+        // Skip if already seen (handles symlink loops)
+        if !seen_paths.insert(dir_path.clone()) {
+            pending_work.fetch_sub(1, Ordering::Release);
+            continue;
+        }
+
+        // Build rules for this directory
+        let dir_ruler = build_default_ruler(rule_toggles, &root_path, &dir_path).await;
+
+        // Read directory
+        match read_directory(&dir_path, volume_backend.as_ref(), cloud_url_base.as_deref()).await
+        {
+            Ok(entries) => {
+                let mut local_stats = LocalStats::default();
+
+                for entry in entries {
+                    // Apply rules
+                    let decision = dir_ruler
+                        .evaluate_path(
+                            &entry.path,
+                            &SimpleMetadata {
+                                is_dir: matches!(entry.kind, EntryKind::Directory),
+                            },
+                        )
+                        .await;
+
+                    if matches!(decision, Ok(RulerDecision::Reject)) {
+                        skipped_count.fetch_add(1, Ordering::Relaxed);
+                        continue;
+                    }
+
+                    if let Err(err) = decision {
+                        let _ = result_tx
+                            .send(DiscoveryResult::Error(IndexError::FilterCheck {
+                                path: entry.path.to_string_lossy().to_string(),
+                                error: err.to_string(),
+                            }))
+                            .await;
+                        continue;
+                    }
+
+                    match entry.kind {
+                        EntryKind::Directory => {
+                            local_stats.dirs += 1;
+                            // Rayon-style: increment BEFORE queueing, worker directly enqueues
+                            pending_work.fetch_add(1, Ordering::Release);
+                            if work_tx.send(entry.path.clone()).await.is_err() {
+                                // Channel closed, decrement and continue
+                                pending_work.fetch_sub(1, Ordering::Release);
+                            }
+                            let _ = result_tx.send(DiscoveryResult::Entry(entry)).await;
+                        }
+                        EntryKind::File => {
+                            local_stats.files += 1;
+                            local_stats.bytes += entry.size;
+                            let _ = result_tx.send(DiscoveryResult::Entry(entry)).await;
+                        }
+                        EntryKind::Symlink => {
+                            local_stats.symlinks += 1;
+                            let _ = result_tx.send(DiscoveryResult::Entry(entry)).await;
+                        }
+                    }
+                }
+
+                // Send stats update
+                let _ = result_tx
+                    .send(DiscoveryResult::Stats {
+                        files: local_stats.files,
+                        dirs: local_stats.dirs,
+                        symlinks: local_stats.symlinks,
+                        bytes: local_stats.bytes,
+                    })
+                    .await;
+
+                // Send progress update
+                let dirs_queued = pending_work.load(Ordering::Acquire);
+                let _ = result_tx
+                    .send(DiscoveryResult::Progress { dirs_queued })
+                    .await;
+            }
+            Err(e) => {
+                let _ = result_tx
+                    .send(DiscoveryResult::Error(IndexError::ReadDir {
+                        path: dir_path.to_string_lossy().to_string(),
+                        error: e.to_string(),
+                    }))
+                    .await;
+            }
+        }
+
+        // Decrement AFTER processing complete
+        pending_work.fetch_sub(1, Ordering::Release);
+    }
+}
+
+#[derive(Default)]
+struct LocalStats {
+    files: u64,
+    dirs: u64,
+    symlinks: u64,
+    bytes: u64,
+}
+
+/// Sequential discovery fallback (original implementation)
+async fn run_discovery_phase_sequential(
+    state: &mut IndexerState,
+    ctx: &JobContext<'_>,
+    root_path: &Path,
+    rule_toggles: RuleToggles,
+    volume_backend: Option<&Arc>,
+    cloud_url_base: Option,
+) -> Result<(), JobError> {
+    ctx.log(format!(
+        "Discovery phase starting from: {} (sequential mode)",
+        root_path.display()
+    ));
+
+    let mut skipped_count = 0u64;

     while let Some(dir_path) = state.dirs_to_walk.pop_front() {
         ctx.check_interrupt().await?;

-        // Skip if already seen (handles symlink loops)
         if !state.seen_paths.insert(dir_path.clone()) {
             continue;
         }

-        // Build rules in the context of the current directory for gitignore behavior
-        let dir_ruler = build_default_ruler(toggles, root_path, &dir_path).await;
+        let dir_ruler = build_default_ruler(rule_toggles, root_path, &dir_path).await;

-        // Do not skip the directory itself by rules; only apply rules to its entries
-
-        // Update progress
         let indexer_progress = IndexerProgress {
             phase: IndexPhase::Discovery {
                 dirs_queued: state.dirs_to_walk.len(),
@@ -69,21 +420,18 @@ pub async fn run_discovery_phase(
             scope: None,
             persistence: None,
             is_ephemeral: false,
-            action_context: None, // TODO: Pass action context from job state
+            action_context: None,
         };
         ctx.progress(Progress::generic(indexer_progress.to_generic_progress()));

-        // Read directory entries with per-dir FS timing
         match read_directory(&dir_path, volume_backend, cloud_url_base.as_deref()).await {
             Ok(entries) => {
                 let entry_count = entries.len();
                 let mut added_count = 0;

                 for entry in entries {
-                    // Check for interruption during entry processing
                     ctx.check_interrupt().await?;

-                    // Skip filtered entries via rules engine
                     let decision = dir_ruler
                         .evaluate_path(
                             &entry.path,
@@ -95,7 +443,6 @@
                     if matches!(decision, Ok(RulerDecision::Reject)) {
                         state.stats.skipped += 1;
                         skipped_count += 1;
-                        eprintln!("[discovery] Filtered entry: {}", entry.path.display());
                         continue;
                     }
                     if let Err(err) = decision {
@@ -135,7 +482,6 @@
                 ));
             }

-            // Batch entries
             if state.should_create_batch() {
                 let batch = state.create_batch();
                 state.entry_batches.push(batch);
@@ -151,13 +497,9 @@
             }
         }

-        // Update rate tracking
         state.items_since_last_update += 1;
-
-        // State is automatically saved during job serialization on shutdown
     }

-    // Final batch
     if !state.pending_entries.is_empty() {
         let final_batch_size = state.pending_entries.len();
         ctx.log(format!(
diff --git a/core/src/ops/indexing/state.rs b/core/src/ops/indexing/state.rs
index 93a8913ad..4e769f8ee 100644
--- a/core/src/ops/indexing/state.rs
+++ b/core/src/ops/indexing/state.rs
@@ -135,6 +135,11 @@ impl IndexerState {
             dirs_to_walk.push_back(path.to_path_buf());
         }

+        // Use half of available CPU cores for parallel discovery (Rayon-style)
+        let discovery_concurrency = std::thread::available_parallelism()
+            .map(|n| usize::max(n.get() / 2, 1))
+            .unwrap_or(4);
+
         Self {
             phase: Phase::Discovery,
             started_at: Instant::now(),
@@ -150,7 +155,7 @@ impl IndexerState {
             last_progress_time: Instant::now(),
             items_since_last_update: 0,
             batch_size: 1000,
-            discovery_concurrency: 1,
+            discovery_concurrency,
             dirs_channel_capacity: 4096,
             entries_channel_capacity: 16384,
         }
diff --git a/core/tests/indexing_test.rs b/core/tests/indexing_test.rs
index d403a9c37..2a525c215 100644
--- a/core/tests/indexing_test.rs
+++ b/core/tests/indexing_test.rs
@@ -173,15 +173,16 @@ async fn test_location_indexing() -> Result<(), Box> {
     // 8. Verify indexed entries in database
     // Helper to get all entry IDs under the location
     let get_location_entry_ids = || async {
-        let descendant_ids = entry_closure::Entity::find()
-            .filter(entry_closure::Column::AncestorId.eq(location_entry_id))
+        let location_id = location_entry_id.expect("Location should have entry_id");
+        let descendant_ids: Vec<_> = entry_closure::Entity::find()
+            .filter(entry_closure::Column::AncestorId.eq(location_id))
             .all(db.conn())
             .await?
             .into_iter()
             .map(|ec| ec.descendant_id)
-            .collect::<Vec<_>>();
+            .collect();

-        let mut all_ids = vec![location_entry_id];
+        let mut all_ids = vec![location_id];
         all_ids.extend(descendant_ids);
         Ok::<Vec<_>, anyhow::Error>(all_ids)
     };
@@ -337,15 +338,16 @@ async fn test_incremental_indexing() -> Result<(), Box> {
     }

     // Get all entry IDs under this location
-    let descendant_ids = entry_closure::Entity::find()
-        .filter(entry_closure::Column::AncestorId.eq(location_entry_id))
+    let location_id = location_entry_id.expect("Location should have entry_id");
+    let descendant_ids: Vec<_> = entry_closure::Entity::find()
+        .filter(entry_closure::Column::AncestorId.eq(location_id))
         .all(db.conn())
         .await?
         .into_iter()
         .map(|ec| ec.descendant_id)
-        .collect::<Vec<_>>();
+        .collect();

-    let mut all_entry_ids = vec![location_entry_id];
+    let mut all_entry_ids = vec![location_id];
     all_entry_ids.extend(descendant_ids);

     let initial_file_count = entities::entry::Entity::find()
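The heart of the patch is the termination protocol around `pending_work`: the counter is incremented before a directory is put on the work channel and decremented only after that directory has been fully processed (including enqueueing its children), so it can never read zero while reachable work remains, and the monitor task can safely flip the shutdown flag once it does. The sketch below is a minimal standalone illustration of that pattern, not code from this repository: it assumes only `tokio` and `async-channel` (the dependency added in `Cargo.toml`), and names such as `WORKERS` and the plain `std::fs::read_dir` walk are illustrative stand-ins for the indexer's own directory reader and rules engine.

```rust
// Standalone sketch of the pending-work counter termination pattern (illustrative only).
use std::path::PathBuf;
use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering},
    Arc,
};

const WORKERS: usize = 4; // illustrative; the patch derives this from available_parallelism()

#[tokio::main]
async fn main() {
    let (work_tx, work_rx) = async_channel::unbounded::<PathBuf>();
    let pending = Arc::new(AtomicUsize::new(0));
    let shutdown = Arc::new(AtomicBool::new(false));

    // Seed: increment BEFORE sending, mirroring the invariant stated in the patch.
    pending.fetch_add(1, Ordering::Release);
    work_tx.send(PathBuf::from(".")).await.unwrap();

    let mut handles = Vec::new();
    for _ in 0..WORKERS {
        let (work_rx, work_tx) = (work_rx.clone(), work_tx.clone());
        let (pending, shutdown) = (Arc::clone(&pending), Arc::clone(&shutdown));
        handles.push(tokio::spawn(async move {
            loop {
                if shutdown.load(Ordering::Acquire) {
                    break;
                }
                // Poll with a timeout so the shutdown flag is re-checked periodically.
                let dir = match tokio::time::timeout(
                    std::time::Duration::from_millis(50),
                    work_rx.recv(),
                )
                .await
                {
                    Ok(Ok(dir)) => dir,
                    Ok(Err(_)) => break, // channel closed
                    Err(_) => continue,  // timeout
                };

                // "Process" the directory: enqueue each subdirectory, incrementing the
                // counter before the send so the count can never dip to zero too early.
                if let Ok(children) = std::fs::read_dir(&dir) {
                    for child in children.flatten() {
                        if child.file_type().map(|t| t.is_dir()).unwrap_or(false) {
                            pending.fetch_add(1, Ordering::Release);
                            if work_tx.send(child.path()).await.is_err() {
                                pending.fetch_sub(1, Ordering::Release);
                            }
                        }
                    }
                }

                // Decrement only AFTER the directory has been fully processed.
                pending.fetch_sub(1, Ordering::Release);
            }
        }));
    }

    // Monitor: once nothing is pending, no task can produce new work, so stop the workers.
    while pending.load(Ordering::Acquire) != 0 {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
    shutdown.store(true, Ordering::Release);

    for handle in handles {
        handle.await.unwrap();
    }
    println!("walk finished");
}
```

Polling on a timeout mirrors the worker loop in the patch: because every worker holds clones of both ends of the work channel, the channel never closes on its own, so an explicit shutdown signal (rather than channel closure) is what ends the loop.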