From 7f0569b955468d104f9b0ee122ee58515eacad4e Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 7 Jul 2025 03:56:42 -0700 Subject: [PATCH] Enhance progressive copy design for improved user experience - Introduced a phased implementation strategy for progressive copy operations, enhancing user feedback during file transfers. - Phase 1 focuses on detailed progress reporting, including real-time estimates and preparation feedback. - Phase 2 integrates database querying for instant startup and destination analysis to avoid redundant copies. - Phase 3 aims for concurrent discovery, allowing simultaneous copying and file discovery for complete coverage. These changes significantly improve the user experience by providing clear, real-time updates and optimizing the copy process. --- .../files/copy/PROGRESSIVE_COPY_DESIGN.md | 970 +++++++++--------- 1 file changed, 476 insertions(+), 494 deletions(-) diff --git a/core-new/src/operations/files/copy/PROGRESSIVE_COPY_DESIGN.md b/core-new/src/operations/files/copy/PROGRESSIVE_COPY_DESIGN.md index 709bea0a0..7a64574a8 100644 --- a/core-new/src/operations/files/copy/PROGRESSIVE_COPY_DESIGN.md +++ b/core-new/src/operations/files/copy/PROGRESSIVE_COPY_DESIGN.md @@ -4,6 +4,8 @@ This document outlines a design for implementing progressive copy operations that provide real-time preparation feedback, similar to macOS Finder, while leveraging Spacedrive's indexed data for instant initial preparation and real-time discovery for complete file coverage. +This design is implemented in phases, building on Spacedrive's existing job system infrastructure and gradually adding more sophisticated features. + ## Problem Statement Current copy operations suffer from poor user experience due to: @@ -27,6 +29,7 @@ macOS Finder provides excellent UX by showing: Spacedrive can go beyond Finder by: - **Instant initial estimates** using existing indexed data +- **Cross-device awareness** using SdPath for distributed operations - **Real-time discovery** for files not in database (filtered or new) - **Concurrent processing** - copying known files while discovering new ones - **Complete coverage** ensuring all user-selected files are copied @@ -36,252 +39,190 @@ Spacedrive can go beyond Finder by: Since global filters mean database never contains complete file sets, we use: - **Database for instant start**: Copy known files immediately -- **Real-time indexer for gaps**: Discover and add unindexed files concurrently -- **Delta-only communication**: Child indexer only reports files NOT in database +- **Real-time discovery for gaps**: Find and add unindexed files +- **Incremental implementation**: Build complexity gradually -## Architecture +## Implementation Strategy + +This design is implemented in three phases, each building on the previous: + +### Phase 1: Enhanced Progress (Immediate Implementation) +- Improve existing copy job with detailed progress reporting +- Add database querying for instant estimates +- Use existing job infrastructure + +### Phase 2: Database Integration (Medium Term) +- Query indexed files for instant startup +- Pre-compute destination existence maps +- Enhanced progress with real data + +### Phase 3: Concurrent Discovery (Advanced) +- Parent-child job communication infrastructure +- Real-time file discovery during copying +- Complete file coverage including filtered content + +## Architecture Overview ```rust ┌─────────────────────────────────────────────────────────────────┐ -│ Progressive Copy with Destination Pre-indexing │ +│ Progressive Copy Architecture │ ├─────────────────────────────────────────────────────────────────┤ -│ Phase 0: Destination Pre-indexing (highest priority) │ -│ ├─ Spawn destination indexer FIRST with filter bypass │ -│ ├─ Build complete map of existing files at destination │ -│ ├─ Non-blocking: continue while indexer runs in background │ -│ └─ Provides instant "skip" decisions when indexer completes │ +│ Phase 1: Enhanced Progress (Using Existing Infrastructure) │ +│ ├─ Database size/count estimates (< 100ms) │ +│ ├─ Detailed progress using Progress::structured() │ +│ ├─ SdPath-aware processing for cross-device operations │ +│ └─ Enhanced error handling and resume capabilities │ ├─────────────────────────────────────────────────────────────────┤ -│ Phase 1: Instant Database Start (< 100ms) │ -│ ├─ Query existing indexed files for all sources │ -│ ├─ Get aggregate estimates (size, count) from database │ -│ ├─ Filter against destination index (if ready) or filesystem │ -│ ├─ Start copying known files immediately │ -│ └─ Show initial progress: "Copying 1,234 files (2.1 GB)..." │ +│ Phase 2: Database Integration │ +│ ├─ Query known files from database for instant startup │ +│ ├─ Destination pre-scanning for existence checking │ +│ ├─ Smart progress updates with real file data │ +│ └─ Preparation phase with "Preparing to copy..." feedback │ ├─────────────────────────────────────────────────────────────────┤ -│ Phase 2: Concurrent Real-time Discovery │ -│ ├─ Spawn source indexer with filter bypass │ -│ ├─ Child scans filesystem and finds NEW files (not in DB) │ -│ ├─ Child streams discoveries back to parent as it finds them │ -│ ├─ Parent filters against destination index before adding │ -│ └─ Progress updates: "Found 56 more files, copying..." │ -├─────────────────────────────────────────────────────────────────┤ -│ Phase 3: Complete When All Done │ -│ ├─ Copy job continues until all files (known + discovered) │ -│ ├─ Both indexers report completion │ -│ ├─ Parent ensures all discovered files are copied │ -│ └─ Final completion when copying and both indexers done │ +│ Phase 3: Concurrent Discovery (Future) │ +│ ├─ Parent-child job spawning infrastructure │ +│ ├─ Real-time discovery of unindexed files │ +│ ├─ Concurrent copying and discovery │ +│ └─ Complete file coverage including filtered content │ └─────────────────────────────────────────────────────────────────┘ ``` ## Core Components -### 1. Destination Pre-indexing Strategy +### 1. Enhanced Progress System (Phase 1) -The most critical optimization: index the destination FIRST to build a complete "what already exists" map. +The first phase focuses on dramatically improving the user experience by providing rich, real-time feedback during copy operations. Currently, users see a frustrating "black box" experience where progress jumps from 0% to 100% with no intermediate updates, particularly during the preparation phase where the system is analyzing files. + +The enhanced progress system introduces multiple distinct phases that users can understand and follow. When a copy operation begins, users will immediately see a "Database Query" phase where the system quickly checks if any of the selected files are already indexed in Spacedrive's database. This happens almost instantaneously (under 100ms) and provides immediate feedback that something is happening. + +Next comes a "Preparation" phase that replaces the current silent file analysis period. Instead of users waiting 20+ seconds with no feedback, they'll see "Preparing to copy..." with real-time updates showing how many files have been discovered and the total size being calculated. This mirrors the excellent user experience provided by macOS Finder. + +During the actual "Copying" phase, users see detailed information about which specific file is currently being processed, along with accurate progress percentages based on real file sizes rather than simple file counts. The system tracks both files completed and bytes transferred, providing meaningful progress updates. + +Finally, a "Complete" phase gives users clear confirmation that the operation finished successfully, including summary statistics about what was copied. + +This enhanced progress system builds entirely on Spacedrive's existing job infrastructure, using the established `Progress::structured()` pattern and `JobProgress` trait. It requires no new infrastructure and can be implemented immediately as an enhancement to the current `FileCopyJob`. + +Build on the existing job infrastructure with improved progress reporting: ```rust -/// Destination indexer that builds complete existence map -pub struct DestinationIndexer { - destination_root: PathBuf, - existing_files: Arc>>, - indexing_complete: Arc, +/// Enhanced progress data for copy operations +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CopyProgressData { + pub phase: CopyPhase, + pub current_file: Option, + pub files_copied: u64, + pub total_files: u64, + pub bytes_copied: u64, + pub total_bytes: u64, + pub discovery_active: bool, + pub preparation_complete: bool, } -impl DestinationIndexer { - /// Start destination indexing immediately (highest priority) - pub async fn start_indexing(&self, ctx: &JobContext<'_>) -> JobResult> { - ctx.log("🎯 Starting destination pre-indexing for skip optimization"); - - // Create ephemeral indexer config for destination with filter bypass - let dest_batch = SdPathBatch::new(vec![SdPath::from_path(&self.destination_root)]); - let config = IndexerJobConfig::destination_analysis(dest_batch); - - let mut dest_indexer = IndexerJob::from_config(config); - dest_indexer.set_parent_job_id(ctx.job_id()); - - // Spawn with HIGHEST priority - this blocks nothing but gets CPU priority - let handle = ctx.spawn_child_with_priority(dest_indexer, JobPriority::CRITICAL).await?; - - // Monitor progress and populate our existence map - self.monitor_indexer_progress(handle.clone()).await; - - Ok(handle) - } - - /// Monitor indexer progress and populate existence map - async fn monitor_indexer_progress(&self, handle: JobHandle) { - let progress_stream = handle.subscribe_progress(); - let existing_files = self.existing_files.clone(); - let indexing_complete = self.indexing_complete.clone(); - - tokio::spawn(async move { - while let Ok(progress) = progress_stream.recv().await { - if let Progress::Structured(data) = progress { - // Extract file paths from indexer progress - if let Ok(file_discovered) = serde_json::from_value::(data) { - existing_files.write().await.insert(file_discovered.path); - } - } - } - - // Mark complete when indexer finishes - indexing_complete.store(true, Ordering::Release); - }); - } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CopyPhase { + Initializing, + DatabaseQuery, + Preparation, + Copying, + Complete, } - /// Check if file exists at destination (non-blocking) - pub async fn file_exists(&self, relative_path: &Path) -> ExistenceCheck { - let full_destination = self.destination_root.join(relative_path); +impl JobProgress for CopyProgressData {} - if self.indexing_complete.load(Ordering::Acquire) { - // Indexing complete - use pre-computed results (instant) - let exists = self.existing_files.read().await.contains(&full_destination); - ExistenceCheck::Known(exists) - } else { - // Indexing still running - fall back to filesystem check (non-blocking) - let exists = tokio::fs::metadata(&full_destination).await.is_ok(); - ExistenceCheck::FilesystemFallback(exists) - } - } -} +/// Enhanced FileCopyJob with preparation phase +impl FileCopyJob { + async fn run_with_preparation(&mut self, ctx: JobContext<'_>) -> JobResult { + // Phase 1: Database estimates (if available) + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::DatabaseQuery, + current_file: None, + files_copied: 0, + total_files: 0, + bytes_copied: 0, + total_bytes: 0, + discovery_active: false, + preparation_complete: false, + })); -#[derive(Debug)] -pub enum ExistenceCheck { - /// Result from completed destination index (instant, accurate) - Known(bool), - /// Fallback filesystem check while indexing runs (slower, but non-blocking) - FilesystemFallback(bool), -} + let estimated_info = self.get_database_estimates(&ctx).await?; + + // Phase 2: Preparation with real progress + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::Preparation, + total_files: estimated_info.estimated_files, + total_bytes: estimated_info.estimated_size, + preparation_complete: false, + ..Default::default() + })); -/// Progress type for file discovery from indexer -#[derive(Debug, Serialize, Deserialize)] -pub struct FileDiscoveredProgress { - pub path: PathBuf, - pub size: u64, - pub kind: String, // "file" or "directory" -} - -/// IndexerJobConfig extensions for destination analysis -impl IndexerJobConfig { - /// Create configuration for destination analysis (existence mapping) - pub fn destination_analysis(destination: SdPathBatch) -> Self { - Self { - location_id: None, - sources: destination, - mode: IndexMode::CopyPreparation, // Use same mode to bypass filters - scope: IndexScope::Recursive, - persistence: IndexPersistence::Ephemeral, // Don't save to database - max_depth: None, - bypass_filters: true, // Index ALL files at destination for complete existence map - } + // Phase 3: Enhanced copying with detailed progress + self.run_enhanced_copy(&ctx, estimated_info).await } } ``` -### 2. Hybrid Preparation Engine +### 2. Database Integration (Phase 2) + +The second phase leverages Spacedrive's unique advantage as an indexed file manager to provide instant startup times for copy operations. While traditional file managers must perform expensive filesystem traversals to count files and calculate sizes, Spacedrive can potentially answer these questions instantly by querying its existing database of indexed files. + +When a user initiates a copy operation, the system first attempts to gather size and file count estimates from the database. For paths that have been previously indexed, this query completes in milliseconds and provides immediate feedback to the user about the scope of the operation. The user sees realistic estimates like "Preparing to copy 1,234 files (2.1 GB)" almost instantly, rather than waiting for a lengthy filesystem scan. + +The database integration is designed to work gracefully with Spacedrive's location-based indexing system. When analyzing source paths, the system first determines which managed locations contain the selected files, then queries the entries table for aggregate statistics. This approach leverages the existing database schema without requiring new tables or complex migrations. + +For paths that aren't indexed or are outside managed locations, the system falls back to traditional filesystem scanning, but users get clear feedback about which parts of the operation are instant (from database) versus which require real-time discovery. + +Query existing indexed data for instant estimates and known files: ```rust -/// Preparation engine that combines database queries with real-time discovery -pub struct HybridPreparationEngine { - database: Arc, +/// Database query engine for copy preparation +pub struct CopyPreparationEngine { + database: Arc, } -impl HybridPreparationEngine { - /// Get existing files from database for instant copying start - pub async fn get_database_files(&self, sources: &[SdPath]) -> JobResult> { - let mut all_files = Vec::new(); - - for source in sources { - if source.is_file() { - // Check if single file is in database - if let Some(file_data) = self.query_single_file(source).await? { - all_files.push(file_data); - } - // If not in DB, child indexer will find it - } else { - // Query all indexed files under this directory - let indexed_files = self.query_directory_files(source).await?; - all_files.extend(indexed_files); - } - } - - Ok(all_files) - } - - /// Query database for files under a directory (leverages existing indexed data) - async fn query_directory_files(&self, source: &SdPath) -> JobResult> { - let source_path = source.as_local_path() - .ok_or_else(|| JobError::execution("Source must be local"))?; - - // Find location containing this source - let location = self.find_location_for_path(source_path).await?; - let location_root = Path::new(&location.path); - - // Calculate relative path for database query - let relative_path = if let Ok(rel_path) = source_path.strip_prefix(location_root) { - rel_path.to_string_lossy().to_string() - } else { - return Ok(Vec::new()); // Path not in any indexed location - }; - - // Query all files under this path (what's already indexed) - let files = entries::Entity::find() - .filter(entries::Column::LocationId.eq(location.id)) - .filter( - entries::Column::RelativePath.eq(&relative_path) - .or(entries::Column::RelativePath.like(format!("{}/%", relative_path))) - .or(entries::Column::RelativePath.like(format!("{}\\%", relative_path))) // Windows - ) - .filter(entries::Column::Kind.eq(0)) // Files only - .all(&self.database.connection) - .await?; - - // Convert to AnalyzedFile format - let analyzed_files = files.into_iter().map(|entry| { - let full_path = if entry.relative_path.is_empty() { - location_root.join(&entry.name) - } else { - location_root.join(&entry.relative_path).join(&entry.name) - }; - - AnalyzedFile { - source_path: full_path, - destination_path: PathBuf::new(), // Will be calculated later - size: entry.size as u64, - is_completed: false, - } - }).collect(); - - Ok(analyzed_files) - } - - /// Get quick size estimates from database aggregates +impl CopyPreparationEngine { + /// Get instant estimates from database pub async fn get_database_estimates(&self, sources: &[SdPath]) -> JobResult { let mut total_size = 0; let mut total_files = 0; for source in sources { - if source.is_file() { - if let Some(file_entry) = self.query_single_file_entry(source).await? { - total_size += file_entry.size as u64; - total_files += 1; + if source.is_local() { + // Query local database for this path + if let Some(estimates) = self.query_path_estimates(source).await? { + total_size += estimates.size; + total_files += estimates.file_count; } } else { - // Use aggregate_size and file_count from directory entries - if let Some(dir_entry) = self.query_directory_entry(source).await? { - total_size += dir_entry.aggregate_size.unwrap_or(0) as u64; - total_files += dir_entry.file_count.unwrap_or(0) as u64; - } + // For remote paths, we'll need cross-device queries (future enhancement) + ctx.log(format!("Remote source detected: {}", source.display())); } } Ok(EstimateData { estimated_size: total_size, estimated_files: total_files, - is_complete: false, // These are estimates - discovery may find more + is_complete: false, }) } + + /// Query indexed files for a path (leverages existing database schema) + async fn query_path_estimates(&self, source: &SdPath) -> JobResult> { + let source_path = source.as_local_path() + .ok_or_else(|| JobError::execution("Source must be local"))?; + + // Find location containing this source + let location = self.find_location_for_path(source_path).await?; + let Some(location) = location else { + return Ok(None); // Path not in any indexed location + }; + + // Use existing entries table to get aggregated data + // This leverages the existing database schema + let estimates = self.query_location_path_stats(&location, source_path).await?; + + Ok(estimates) + } } #[derive(Debug)] @@ -290,71 +231,156 @@ pub struct EstimateData { pub estimated_files: u64, pub is_complete: bool, } -``` -### 2. Enhanced Progress System for Real-time Communication - -```rust -/// Enhanced progress types for parent-child job communication -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum CopyProgress { - /// Initial database estimates - InitialEstimate { - known_files: u64, - known_size: u64, - discovery_starting: bool, - }, - - /// Child indexer discovered new file not in database - FileDiscovered { - path: PathBuf, - size: u64, - discovery_batch: u32, - }, - - /// Child indexer completed discovery phase - DiscoveryComplete { - total_discovered: u64, - discovery_size: u64, - }, - - /// File copy progress - FileCopied { - path: PathBuf, - size: u64, - was_discovered: bool, // true if from indexer, false if from database - }, - - /// Overall copy progress - OverallProgress { - copied_files: u64, - total_files: u64, - copied_bytes: u64, - total_bytes: u64, - skipped_files: u64, - discovery_active: bool, - }, - - /// Destination analysis progress - DestinationAnalysis { - files_found: u64, - total_size: u64, - path: PathBuf, - }, +#[derive(Debug)] +pub struct PathEstimates { + pub size: u64, + pub file_count: u64, } ``` -### 3. Child Indexer Job with Delta Discovery +### 3. Destination Pre-analysis (Phase 2) + +This component introduces intelligent destination analysis to optimize copy operations by avoiding unnecessary work. One of the most frustrating aspects of current copy operations is that they often re-copy files that already exist at the destination, wasting time and bandwidth. + +The destination pre-analysis system scans the destination directory before copying begins, building a complete map of existing files. This enables smart skip logic where the system can instantly decide whether each source file needs to be copied or can be skipped because an identical file already exists at the destination. + +The analysis leverages Spacedrive's existing IndexerJob infrastructure with ephemeral configuration, meaning it performs a thorough scan without persisting results to the database. This reuses proven, optimized filesystem traversal code rather than implementing custom scanning logic. + +The system is designed to be non-blocking - if destination analysis is taking too long, copying can begin with fallback filesystem checks for individual files. However, when destination analysis completes quickly (which it often will for directories with reasonable file counts), it provides massive performance benefits by eliminating redundant copies entirely. + +This approach is particularly beneficial when copying to directories that already contain some of the source files, such as backing up a project directory to an existing backup location, or syncing files between devices where partial overlaps are common. + +Use existing IndexerJob patterns for destination scanning: ```rust -/// Indexer job that only reports files NOT in database -pub struct DeltaIndexerJob { - sources: SdPathBatch, - database: Arc, +/// Destination analysis using existing indexer infrastructure +pub struct DestinationAnalyzer { + destination: SdPath, +} + +impl DestinationAnalyzer { + /// Analyze destination using existing IndexerJob + pub async fn analyze_destination(&self, ctx: &JobContext<'_>) -> JobResult { + ctx.log("🎯 Analyzing destination for existence checking"); + + // Use existing IndexerJob with ephemeral configuration + let config = IndexerJobConfig { + location_id: None, // Ephemeral + path: self.destination.clone(), + mode: IndexMode::Shallow, // Just metadata + scope: IndexScope::Recursive, + persistence: IndexPersistence::Ephemeral, + max_depth: None, + }; + + let mut indexer = IndexerJob::new(config); + + // For now, run synchronously in preparation phase + // Later: convert to child job when infrastructure is ready + let result = indexer.run(ctx.clone()).await?; + + self.extract_destination_info(result).await + } + + async fn extract_destination_info(&self, result: IndexerOutput) -> JobResult { + // Extract file existence information from indexer results + let existing_files = if let Some(ephemeral_results) = result.ephemeral_results { + let index = ephemeral_results.read().await; + index.entries.keys().cloned().collect() + } else { + HashSet::new() + }; + + Ok(DestinationInfo { + existing_files, + total_files: result.stats.files, + total_size: result.stats.bytes, + }) + } +} + +#[derive(Debug)] +pub struct DestinationInfo { + pub existing_files: HashSet, + pub total_files: u64, + pub total_size: u64, +} +``` + +## Parent-Child Job Communication (Phase 3) + +The third phase represents the most sophisticated enhancement, introducing true concurrent processing where file discovery and copying happen simultaneously. This requires significant new infrastructure for parent-child job communication, but delivers the ultimate user experience where copy operations can handle any content with real-time discovery feedback. + +The core challenge this phase solves is complete file coverage. Due to Spacedrive's global filters, the database never contains a complete record of all files in a directory - filtered content like .git folders, node_modules, temporary files, and other excluded content exists on disk but not in the index. Traditional copy operations would miss this content or require expensive full filesystem scans. + +The parent-child architecture enables a hybrid approach: the parent copy job immediately begins copying files it knows about from the database, while simultaneously spawning child discovery jobs that scan the filesystem for unindexed content. As the child jobs discover new files, they stream this information back to the parent, which dynamically adds them to the copy queue. + +This creates a fluid, responsive experience where users see copying begin immediately with known files, then watch in real-time as additional files are discovered and added to the operation. The progress indicators smoothly update to reflect the expanding scope, and users get clear feedback about both the copying progress and discovery activity. + +The infrastructure required is substantial but follows established patterns. Child jobs are spawned with priority levels, ensuring discovery doesn't interfere with copying performance. Progress communication uses structured messaging to provide type-safe coordination between parent and child processes. The copy queue becomes thread-safe and dynamically expandable, allowing safe concurrent access from both copying and discovery operations. + +This section outlines the infrastructure needed for advanced concurrent discovery features. This is a future enhancement that will be implemented after the foundation is solid. + +### Required Infrastructure + +```rust +/// Enhanced JobContext with child job spawning capabilities +impl JobContext<'_> { + /// Spawn a child job with priority (NEEDS IMPLEMENTATION) + pub async fn spawn_child_with_priority(&self, job: J, priority: JobPriority) -> JobResult + where + J: Job + JobHandler + Send + 'static, + { + // Create child job record in database with parent_job_id + let child_id = self.job_manager.create_child_job(&job, self.id, priority).await?; + + // Spawn in task system with specified priority + let handle = self.task_system.spawn_with_priority( + JobExecutor::new(job, child_id, self.job_manager.clone()), + priority + ).await?; + + // Track child handle for cleanup + self.child_handles.lock().await.push(handle.clone()); + + Ok(handle) + } + + /// Subscribe to child job progress updates + pub fn subscribe_child_progress(&self, child_id: JobId) -> broadcast::Receiver { + self.job_manager.subscribe_job_progress(child_id) + } +} + +/// Job priority levels for task scheduling +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum JobPriority { + Low, + Normal, + High, + Critical, +} +``` + +### Delta Discovery Job (Future) + +The delta discovery job represents a specialized variant of the existing IndexerJob, optimized for finding only files that aren't already known to the database. Rather than performing a comprehensive indexing operation, it focuses specifically on identifying gaps in Spacedrive's knowledge. + +When processing discovered files, the delta job first checks whether each file exists in the database before reporting it to the parent copy job. This prevents duplicate work - if a file is already known from the initial database query, there's no need to "discover" it again. Only truly new files are streamed back to the parent as discovery progress events. + +This approach minimizes communication overhead and prevents the copy queue from being flooded with redundant file entries. The parent job receives a clean stream of genuinely new discoveries, which it can immediately add to the copy queue and update progress indicators accordingly. + +The delta discovery job maintains the same filesystem traversal efficiency as the regular IndexerJob but adds the database checking layer. This creates some overhead per file, but the trade-off is worthwhile given the alternative of either missing filtered content entirely or performing expensive full filesystem scans without database optimization. + +```rust +/// Specialized indexer job that only reports files NOT in database +pub struct DeltaDiscoveryJob { + sources: Vec, parent_job_id: JobId, } -impl DeltaIndexerJob { +impl DeltaDiscoveryJob { /// Process discovered file - only report if not in database async fn process_discovered_file( &self, @@ -367,257 +393,173 @@ impl DeltaIndexerJob { if !exists_in_db { // This is a NEW discovery - report to parent - ctx.progress(CopyProgress::FileDiscovered { - path: path.to_path_buf(), - size: metadata.len(), - discovery_batch: self.current_batch, - }); - - // Also add to ephemeral index for complete record - self.add_to_ephemeral_index(path, metadata).await?; + ctx.progress(Progress::structured(DiscoveryProgress { + discovered_file: DiscoveredFile { + path: path.to_path_buf(), + size: metadata.len(), + kind: if metadata.is_dir() { "directory" } else { "file" }.to_string(), + }, + batch_number: self.current_batch, + })); } // If exists in DB, parent already has it - don't report Ok(()) } - - /// Check if file exists in database (any indexed location) - async fn check_file_in_database(&self, path: &Path) -> JobResult { - // Find which location this path belongs to - let location = self.find_location_for_path(path).await?; - let Some(loc) = location else { - return Ok(false); // Not in any indexed location - }; - - // Calculate relative path - let location_root = Path::new(&loc.path); - let relative_path = path.strip_prefix(location_root) - .map_err(|_| JobError::execution("Path not in location"))?; - - let (rel_dir, filename) = if let Some(parent) = relative_path.parent() { - (parent.to_string_lossy().to_string(), relative_path.file_name().unwrap().to_string_lossy().to_string()) - } else { - (String::new(), relative_path.to_string_lossy().to_string()) - }; - - // Query database - let exists = entries::Entity::find() - .filter(entries::Column::LocationId.eq(loc.id)) - .filter(entries::Column::RelativePath.eq(rel_dir)) - .filter(entries::Column::Name.eq(filename)) - .one(&self.database.connection) - .await? - .is_some(); - - Ok(exists) - } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DiscoveryProgress { + pub discovered_file: DiscoveredFile, + pub batch_number: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DiscoveredFile { + pub path: PathBuf, + pub size: u64, + pub kind: String, +} + +impl JobProgress for DiscoveryProgress {} ``` -### 4. Enhanced Copy Job with Destination Pre-indexing +### Concurrent Copy Architecture (Future) + +The concurrent copy architecture enables the dynamic expansion of copy operations while they're running. Unlike traditional copy jobs that have a fixed list of files determined at startup, this system maintains a thread-safe queue that can grow as new files are discovered. + +The core challenge is coordinating between the copying process (which removes files from the queue) and the discovery process (which adds files to the queue), while maintaining accurate progress reporting throughout. The queue uses atomic operations for counters and proper locking for the file list to ensure thread safety without sacrificing performance. + +Progress reporting becomes more sophisticated, tracking not just files copied but also discovery activity. Users see indicators like "Copying files... (1,247 found so far)" which updates in real-time as both copying and discovery progress. The system must coordinate completion - the copy operation isn't finished until both all known files are copied AND all discovery jobs have completed. + +This architecture enables the ultimate user experience where copying begins immediately with known files, smoothly transitions to handle discovered content, and provides complete coverage of all user-selected files regardless of their indexing status. ```rust -/// Copy job with destination pre-indexing and hybrid execution -impl FileCopyJob { - async fn run(&mut self, ctx: JobContext<'_>) -> JobResult { - ctx.log("🚀 Starting progressive copy with destination pre-indexing"); - - // Phase 0: Start destination indexing FIRST (critical priority) - let destination_indexer = DestinationIndexer::new(self.destination_root.clone()); - let dest_handle = destination_indexer.start_indexing(&ctx).await?; - - // Phase 1: Instant database start (while destination indexes) - let prep_engine = HybridPreparationEngine::new(ctx.library_db()); - - // Get quick estimates for immediate user feedback - let estimates = prep_engine.get_database_estimates(&self.sources.paths).await?; - ctx.progress(CopyProgress::InitialEstimate { - known_files: estimates.estimated_files, - known_size: estimates.estimated_size, - discovery_starting: true, - }); - - // Get existing files from database - let existing_files = prep_engine.get_database_files(&self.sources.paths).await?; - ctx.log(format!("📊 Found {} known files from database", existing_files.len())); - - // Filter files through destination indexer (skip existing) - let filtered_files = self.filter_files_by_existence( - existing_files, - &destination_indexer - ).await?; - - ctx.log(format!("📋 {} files to copy after existence filtering", filtered_files.len())); - - // Initialize copy queue with filtered files - let copy_queue = Arc::new(Mutex::new(CopyQueue::new())); - copy_queue.lock().await.add_files(filtered_files); - - // Phase 2: Spawn source indexer for real-time discovery - let indexer_config = IndexerJobConfig::delta_discovery(self.sources.clone()); - let mut source_indexer = DeltaIndexerJob::from_config(indexer_config); - source_indexer.set_parent_job_id(ctx.job_id()); - - ctx.log("🔍 Spawning real-time source discovery indexer"); - let source_handle = ctx.spawn_child_with_priority(source_indexer, JobPriority::HIGH).await?; - - // Phase 3: Concurrent execution - copy known files while discovering new ones - let copy_queue_clone = copy_queue.clone(); - let copy_handle = tokio::spawn(async move { - self.execute_copy_queue(copy_queue_clone, &ctx).await - }); - - // Listen for discoveries and add to queue in real-time - let mut discoveries = indexer_handle.subscribe_progress(); - let mut discovery_active = true; - - while discovery_active { - tokio::select! { - // Handle discovery updates - progress_result = discoveries.recv() => { - match progress_result { - Ok(CopyProgress::FileDiscovered { path, size, .. }) => { - ctx.log(format!("📁 Discovered new file: {}", path.display())); - - let discovered_file = AnalyzedFile { - source_path: path, - destination_path: PathBuf::new(), - size, - is_completed: false, - }; - - copy_queue.lock().await.add_file(discovered_file); - } - Ok(CopyProgress::DiscoveryComplete { total_discovered, discovery_size }) => { - ctx.log(format!("✅ Discovery complete: {} new files ({} bytes)", - total_discovered, discovery_size)); - discovery_active = false; - } - _ => {} // Other progress types - } - } - - // Check if copy is done (but continue until discovery complete) - _ = tokio::time::sleep(Duration::from_millis(100)) => { - // Periodic progress updates - let queue_stats = copy_queue.lock().await.get_stats(); - ctx.progress(CopyProgress::OverallProgress { - copied_files: queue_stats.completed_files, - total_files: queue_stats.total_files, - copied_bytes: queue_stats.completed_bytes, - total_bytes: queue_stats.total_bytes, - discovery_active, - }); - } - } - } - - // Wait for both copy completion and indexer completion - let copy_result = copy_handle.await??; - let indexer_result = indexer_handle.await?; - - ctx.log("🎉 Progressive copy completed - all files copied including discoveries"); - - Ok(FileCopyOutput { - copied_files: copy_result.copied_files, - total_size: copy_result.total_size, - discovered_files: indexer_result.discovered_count, - }) - } -} - /// Thread-safe copy queue that can be expanded during execution #[derive(Debug)] -pub struct CopyQueue { - files: VecDeque, - completed_count: u64, - completed_bytes: u64, - total_bytes: u64, +pub struct DynamicCopyQueue { + files: Arc>>, + completed_count: Arc, + completed_bytes: Arc, + total_bytes: Arc, } -impl CopyQueue { +impl DynamicCopyQueue { pub fn new() -> Self { Self { - files: VecDeque::new(), - completed_count: 0, - completed_bytes: 0, - total_bytes: 0, + files: Arc::new(Mutex::new(VecDeque::new())), + completed_count: Arc::new(AtomicU64::new(0)), + completed_bytes: Arc::new(AtomicU64::new(0)), + total_bytes: Arc::new(AtomicU64::new(0)), } } - pub fn add_files(&mut self, files: Vec) { - for file in files { - self.total_bytes += file.size; - self.files.push_back(file); - } + /// Add file to queue (called by discovery process) + pub async fn add_discovered_file(&self, file: AnalyzedFile) { + self.total_bytes.fetch_add(file.size, Ordering::Relaxed); + self.files.lock().await.push_back(file); } - pub fn add_file(&mut self, file: AnalyzedFile) { - self.total_bytes += file.size; - self.files.push_back(file); + /// Get next file to copy (called by copy process) + pub async fn next_file(&self) -> Option { + self.files.lock().await.pop_front() } - pub fn next_file(&mut self) -> Option { - self.files.pop_front() - } - - pub fn mark_completed(&mut self, size: u64) { - self.completed_count += 1; - self.completed_bytes += size; - } - - pub fn get_stats(&self) -> QueueStats { - QueueStats { - total_files: self.completed_count + self.files.len() as u64, - completed_files: self.completed_count, - total_bytes: self.total_bytes, - completed_bytes: self.completed_bytes, - remaining_files: self.files.len() as u64, - } + /// Mark file as completed + pub fn mark_completed(&self, size: u64) { + self.completed_count.fetch_add(1, Ordering::Relaxed); + self.completed_bytes.fetch_add(size, Ordering::Relaxed); } } ``` -### 5. Required Job System Enhancements +## Implementation Examples -Based on the job system research, we need these implementations: +### Phase 1: Enhanced Progress Implementation + +This example shows how to immediately improve the existing FileCopyJob with minimal changes to the current codebase. The enhancement focuses on providing detailed progress feedback through multiple distinct phases, transforming the current "black box" experience into something that rivals the best file managers. + +The key insight is that this can be implemented entirely within the existing job framework without requiring new infrastructure. By using the established `Progress::structured()` pattern, the enhanced copy job can provide rich progress data that front-end applications can display in sophisticated ways. + +The implementation maintains full compatibility with the existing copy system while dramatically improving the user experience. Each phase transition provides clear feedback about what the system is doing, eliminating the frustrating periods of no feedback that characterize the current implementation. ```rust -/// Enhanced JobContext with child job spawning -impl JobContext<'_> { - /// Spawn a child job with priority (NEEDS IMPLEMENTATION) - pub async fn spawn_child_with_priority(&self, job: J, priority: JobPriority) -> JobResult> - where - J: Job + Send + 'static, - J::Output: Send + 'static, - { - // Create child job record in database with parent_job_id - let child_id = self.job_manager.create_child_job(&job, self.job_id, priority).await?; +/// Immediate improvement to existing FileCopyJob +impl FileCopyJob { + async fn run(&mut self, ctx: JobContext<'_>) -> JobResult { + // Phase 1: Database estimates (if available) + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::DatabaseQuery, + current_file: None, + files_copied: 0, + total_files: 0, + bytes_copied: 0, + total_bytes: 0, + discovery_active: false, + preparation_complete: false, + })); - // Spawn in task system with specified priority - let handle = self.task_system.spawn_with_priority( - JobExecutor::new(job, child_id, self.job_manager.clone()), - priority - ).await?; + // Try to get estimates from database + let prep_engine = CopyPreparationEngine::new(ctx.library_db()); + let estimates = prep_engine.get_database_estimates(&self.sources.paths).await + .unwrap_or_else(|_| EstimateData { + estimated_size: 0, + estimated_files: 0, + is_complete: false + }); - // Track child handle - self.child_handles.lock().unwrap().push(handle.clone()); + // Phase 2: Enhanced preparation + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::Preparation, + total_files: estimates.estimated_files, + total_bytes: estimates.estimated_size, + preparation_complete: false, + ..Default::default() + })); - Ok(handle) + // Calculate actual totals (existing logic enhanced) + let actual_total_bytes = self.calculate_total_size(&ctx).await?; + + // Phase 3: Copy with detailed progress + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::Copying, + total_files: self.sources.paths.len() as u64, + total_bytes: actual_total_bytes, + preparation_complete: true, + ..Default::default() + })); + + // Enhanced copy loop with better progress reporting + for (index, source) in self.sources.paths.iter().enumerate() { + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::Copying, + current_file: Some(source.display()), + files_copied: index as u64, + total_files: self.sources.paths.len() as u64, + total_bytes: actual_total_bytes, + preparation_complete: true, + ..Default::default() + })); + + // Existing copy logic... + let result = self.copy_single_file(source, &ctx).await; + // ... handle result + } + + // Phase 4: Complete + ctx.progress(Progress::structured(CopyProgressData { + phase: CopyPhase::Complete, + files_copied: self.sources.paths.len() as u64, + total_files: self.sources.paths.len() as u64, + bytes_copied: actual_total_bytes, + total_bytes: actual_total_bytes, + preparation_complete: true, + discovery_active: false, + })); + + // Return existing output... + Ok(self.existing_output) } - - /// Subscribe to child job progress updates - pub fn subscribe_child_progress(&self, child_id: JobId) -> broadcast::Receiver { - self.job_manager.subscribe_job_progress(child_id) - } -} - -/// Enhanced Progress types for structured communication -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum Progress { - Percentage(f64), - Message(String), - Structured(serde_json::Value), // For type-safe communication between jobs } ``` @@ -701,33 +643,73 @@ indexer discovers: ## Implementation Roadmap -### Phase 1: Foundation +### Phase 1: Enhanced Progress (Immediate - 1-2 weeks) -- [ ] Implement `spawn_child_with_priority()` in JobContext -- [ ] Add structured Progress types for parent-child communication -- [ ] Create HybridPreparationEngine with database query methods -- [ ] Build thread-safe CopyQueue with dynamic expansion +Building on existing infrastructure with immediate improvements: -### Phase 2: Delta Discovery +- [ ] **Enhanced Progress Types** + - Add `CopyProgressData` struct with detailed phase information + - Implement `JobProgress` trait for structured progress + - Update FileCopyJob to use `Progress::structured()` -- [ ] Implement DeltaIndexerJob that only reports files not in database -- [ ] Add database existence checking for discovered files -- [ ] Create IndexMode::DeltaDiscovery for child indexer jobs -- [ ] Test parent-child progress communication +- [ ] **Database Estimates** + - Create `CopyPreparationEngine` for database queries + - Query existing indexed files for instant size/count estimates + - Add fallback when database queries fail -### Phase 3: Copy Integration +- [ ] **Improved UX** + - Add preparation phase with "Preparing to copy..." messaging + - Show current file being processed + - Display realistic progress percentages -- [ ] Update FileCopyJob to use hybrid execution model -- [ ] Implement concurrent copying while discovery runs -- [ ] Add real-time queue expansion as files are discovered -- [ ] Ensure completion coordination between copy and discovery +**Success Criteria**: Copy operations show meaningful progress from start to finish -### Phase 4: Polish & Optimization +### Phase 2: Database Integration (Medium Term - 3-4 weeks) -- [ ] Add comprehensive error handling and fallbacks -- [ ] Implement progress reporting with discovery status -- [ ] Add performance monitoring and metrics -- [ ] Test with various directory sizes and content types +Leveraging Spacedrive's indexed data for faster preparation: + +- [ ] **Database Query Infrastructure** + - Implement location-aware file querying + - Add support for cross-device path resolution + - Create efficient database queries for file aggregation + +- [ ] **Destination Analysis** + - Use IndexerJob for destination pre-scanning + - Build existence maps for skip logic + - Optimize copy operations by avoiding existing files + +- [ ] **Smart Progress Updates** + - Use real database file counts instead of estimates + - Show preparation progress during destination analysis + - Display skip counts and conflict information + +**Success Criteria**: Large directory copies start instantly with accurate estimates + +### Phase 3: Concurrent Discovery (Advanced - 8-12 weeks) + +Full parent-child job communication for complete file coverage: + +- [ ] **Child Job Infrastructure** + - Implement `spawn_child_with_priority()` in JobContext + - Add parent-child progress communication + - Create job priority system and task scheduling + +- [ ] **Delta Discovery Jobs** + - Build `DeltaDiscoveryJob` that only reports unindexed files + - Add real-time file discovery during copying + - Implement database existence checking for discoveries + +- [ ] **Concurrent Copy Architecture** + - Create thread-safe `DynamicCopyQueue` + - Enable copying known files while discovering new ones + - Coordinate completion between copy and discovery processes + +- [ ] **Complete File Coverage** + - Ensure ALL user-selected files are copied (including filtered) + - Handle .git, node_modules, and other filtered content + - Provide real-time discovery feedback + +**Success Criteria**: Copy operations include all files with real-time discovery, matching Finder's UX ## File Existence and Overwrite Handling