From 6177fc480ea0a4f3cc95d2c6548db3942ae98a2a Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 12 Jan 2026 02:50:11 -0800 Subject: [PATCH] feat(CopyJobDetails): enhance file transfer UI with auto-scrolling and metadata refetching This commit improves the `CopyJobDetails` component by adding an auto-scroll feature that centers the currently copying file in the transfer queue. Additionally, it implements a mechanism to refetch copy metadata when the completed file count changes, ensuring that the displayed information is always up-to-date. The UI is further refined with updated icons and styles for better user experience during file operations. --- apps/cli/src/domains/events/args.rs | 16 +- apps/cli/src/domains/events/mod.rs | 14 +- apps/cli/src/domains/file/mod.rs | 1 + core/src/infra/job/manager.rs | 65 ++ core/src/ops/files/copy/job.rs | 55 +- core/src/ops/files/copy/metadata.rs | 1 + core/src/ops/files/copy/strategy.rs | 19 +- core/tests/copy_progress_test.rs | 841 ++++++++---------- .../JobManager/components/CopyJobDetails.tsx | 49 +- .../JobManager/hooks/useJobManager.ts | 1 + 10 files changed, 582 insertions(+), 480 deletions(-) diff --git a/apps/cli/src/domains/events/args.rs b/apps/cli/src/domains/events/args.rs index e74e28888..84d8dcda6 100644 --- a/apps/cli/src/domains/events/args.rs +++ b/apps/cli/src/domains/events/args.rs @@ -1,4 +1,4 @@ -use clap::{Args, Subcommand}; +use clap::{Args, Subcommand, ValueEnum}; #[derive(Debug, Subcommand)] pub enum EventsCmd { @@ -6,6 +6,16 @@ pub enum EventsCmd { Monitor(EventsMonitorArgs), } +#[derive(Debug, Clone, Copy, ValueEnum)] +pub enum OutputFormat { + /// Human-readable summary (default) + Human, + /// Compact JSON output + Json, + /// Pretty-printed JSON output + JsonPretty, +} + #[derive(Debug, Args)] pub struct EventsMonitorArgs { /// Filter by event type(s) (comma-separated, e.g., "JobProgress,JobCompleted") @@ -29,6 +39,10 @@ pub struct EventsMonitorArgs { #[arg(long)] pub timestamps: bool, + /// Output format + #[arg(short = 'f', long, value_enum, default_value = "human")] + pub format: OutputFormat, + /// Show full event JSON (verbose mode) #[arg(short = 'v', long)] pub verbose: bool, diff --git a/apps/cli/src/domains/events/mod.rs b/apps/cli/src/domains/events/mod.rs index d438c85ed..da5ea7126 100644 --- a/apps/cli/src/domains/events/mod.rs +++ b/apps/cli/src/domains/events/mod.rs @@ -97,16 +97,22 @@ fn display_event(event: &Event, args: &EventsMonitorArgs) { let event_variant = event.variant_name(); - if args.verbose || args.pretty { - // Verbose mode: show full JSON - let json_str = if args.pretty { + // Determine output format (new format flag takes precedence over legacy flags) + let use_json_pretty = matches!(args.format, OutputFormat::JsonPretty) || args.pretty; + let use_json = matches!(args.format, OutputFormat::Json) + || use_json_pretty + || args.verbose; + + if use_json { + // JSON mode: show full JSON + let json_str = if use_json_pretty { serde_json::to_string_pretty(&event).unwrap_or_else(|_| format!("{:?}", event)) } else { serde_json::to_string(&event).unwrap_or_else(|_| format!("{:?}", event)) }; println!("{}{}: {}", timestamp, event_variant, json_str); } else { - // Compact mode: show event type and key fields + // Human-readable mode: show event type and key fields let summary = summarize_event(event); println!("{}{}: {}", timestamp, event_variant, summary); } diff --git a/apps/cli/src/domains/file/mod.rs b/apps/cli/src/domains/file/mod.rs index 64d6380a8..72401f66b 100644 --- a/apps/cli/src/domains/file/mod.rs +++ b/apps/cli/src/domains/file/mod.rs @@ -137,6 +137,7 @@ async fn run_copy_with_confirmation( "Rename the new file(s) (e.g., file.txt -> file (1).txt)".to_string(), "Abort this copy operation".to_string(), ], + metadata: None, }; let choice_index = prompt_for_choice(request)?; diff --git a/core/src/infra/job/manager.rs b/core/src/infra/job/manager.rs index 2ddbc2176..087a8ef81 100644 --- a/core/src/infra/job/manager.rs +++ b/core/src/infra/job/manager.rs @@ -351,6 +351,9 @@ impl JobManager { match task_handle { Ok(handle_result) => { // Track running job + // Clone latest_progress for monitoring task before moving into RunningJob + let latest_progress_for_monitor = latest_progress.clone(); + self.running_jobs.write().await.insert( job_id, RunningJob { @@ -408,6 +411,35 @@ impl JobManager { } }; + // Emit final progress event if one exists (may have been throttled) + if let Some(final_progress) = latest_progress_for_monitor.lock().await.as_ref() { + let generic_progress = match final_progress { + Progress::Structured(value) => { + // Try to deserialize CopyProgress and convert to GenericProgress + if let Ok(copy_progress) = serde_json::from_value::< + crate::ops::files::copy::CopyProgress, + >(value.clone()) + { + use crate::infra::job::generic_progress::ToGenericProgress; + Some(copy_progress.to_generic_progress()) + } else { + None + } + } + Progress::Generic(gp) => Some(gp.clone()), + _ => None, + }; + + event_bus.emit(Event::JobProgress { + job_id: job_id_clone.to_string(), + job_type: job_type_str.to_string(), + device_id, + progress: final_progress.as_percentage().unwrap_or(0.0) as f64, + message: Some(final_progress.to_string()), + generic_progress, + }); + } + // Emit completion event with the job's output event_bus.emit(Event::JobCompleted { job_id: job_id_clone.to_string(), @@ -759,6 +791,7 @@ impl JobManager { let job_type_str = J::NAME; let library_id_clone = self.library_id; let context = self.context.clone(); + let latest_progress_for_monitor = latest_progress.clone(); let device_id = self .context .device_manager @@ -798,6 +831,35 @@ impl JobManager { } }; + // Emit final progress event if one exists (may have been throttled) + if let Some(final_progress) = latest_progress_for_monitor.lock().await.as_ref() { + let generic_progress = match final_progress { + Progress::Structured(value) => { + // Try to deserialize CopyProgress and convert to GenericProgress + if let Ok(copy_progress) = serde_json::from_value::< + crate::ops::files::copy::CopyProgress, + >(value.clone()) + { + use crate::infra::job::generic_progress::ToGenericProgress; + Some(copy_progress.to_generic_progress()) + } else { + None + } + } + Progress::Generic(gp) => Some(gp.clone()), + _ => None, + }; + + event_bus.emit(Event::JobProgress { + job_id: job_id_clone.to_string(), + job_type: job_type_str.to_string(), + device_id, + progress: final_progress.as_percentage().unwrap_or(0.0) as f64, + message: Some(final_progress.to_string()), + generic_progress, + }); + } + // Emit completion event with the job's output event_bus.emit(Event::JobCompleted { job_id: job_id_clone.to_string(), @@ -1412,6 +1474,9 @@ impl JobManager { { Ok(task_handle) => { // Track running job + // Clone latest_progress for monitoring task before moving into RunningJob + let latest_progress_for_monitor = latest_progress.clone(); + self.running_jobs.write().await.insert( job_id, RunningJob { diff --git a/core/src/ops/files/copy/job.rs b/core/src/ops/files/copy/job.rs index ffb9d52a9..e786bae6b 100644 --- a/core/src/ops/files/copy/job.rs +++ b/core/src/ops/files/copy/job.rs @@ -373,6 +373,9 @@ impl JobHandler for FileCopyJob { }; copied_count += files_in_source; // Count actual files as copied for progress tracking + + // Mark as completed in metadata (already done during previous run) + self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Completed); continue; } @@ -418,6 +421,9 @@ impl JobHandler for FileCopyJob { 1 }; + // Mark file as currently copying in metadata + self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Copying); + // Update aggregator with current file info let operation_description = CopyStrategyRouter::describe_strategy( &resolved_source, @@ -495,6 +501,10 @@ impl JobHandler for FileCopyJob { if let Some(dest_path) = final_destination.as_local_path() { if dest_path.exists() { ctx.log(format!("Skipping existing file: {}", dest_path.display())); + + // Mark as skipped in metadata + self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Skipped); + // Skip this file progress_aggregator.complete_source(); copied_count += files_in_source; @@ -558,6 +568,9 @@ impl JobHandler for FileCopyJob { // Track successful completion for resume self.completed_indices.push(index); + // Mark as completed in metadata + self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Completed); + // If this is a move operation and the strategy didn't handle deletion, // we need to delete the source after successful copy if is_move && resolved_source.device_slug() == final_destination.device_slug() { @@ -608,10 +621,16 @@ impl JobHandler for FileCopyJob { resolved_source.display(), e )); + + // Mark as failed in metadata + self.job_metadata.set_error(&resolved_source, e.to_string()); } } - // Checkpoint every 20 files to save completed_indices + // Persist after every file so UI can show real-time checkbox updates + self.persist_job_state_to_db(&ctx).await?; + + // Checkpoint every 20 files to save job state to disk if copied_count % 20 == 0 { ctx.checkpoint().await?; } @@ -642,6 +661,9 @@ impl JobHandler for FileCopyJob { }; ctx.progress(Progress::generic(progress.to_generic_progress())); + // Persist final job state with all file statuses + self.persist_job_state_to_db(&ctx).await?; + ctx.log(format!( "Copy operation completed: {} copied, {} failed", copied_count, @@ -785,21 +807,27 @@ impl<'a> ProgressAggregator<'a> { Box::new(move |bytes_value: u64, signal_value: u64| { // Signal: u64::MAX means a file has finished, bytes_value is its size if signal_value == u64::MAX { + // Update backend state let mut files = files_completed.lock().unwrap(); *files += 1; + drop(files); let mut bytes = bytes_before.lock().unwrap(); *bytes += bytes_value; - ctx.log(format!( - "File completed. Total files: {}/{}, Total bytes: {}", - *files, total_files, *bytes - )); - return; + drop(bytes); } - // Normal byte-level progress update - let bytes_before_snapshot = *bytes_before.lock().unwrap(); - let total_bytes_copied = bytes_before_snapshot + bytes_value; + // Read current backend state for progress event (AFTER mutations above) let files_completed_count = *files_completed.lock().unwrap(); + let bytes_before_snapshot = *bytes_before.lock().unwrap(); + + // Calculate total bytes for this progress event + let total_bytes_copied = if signal_value == u64::MAX { + // File just completed - bytes were already added to bytes_before above + bytes_before_snapshot + } else { + // Byte-level progress - bytes_value is current file progress within current file + bytes_before_snapshot + bytes_value + }; // Update speed tracker and get current rate let mut tracker = speed_tracker.lock().unwrap(); @@ -829,8 +857,13 @@ impl<'a> ProgressAggregator<'a> { strategy_metadata: current_strategy_metadata, }; - // Log progress details every 100MB - if total_bytes_copied % (100 * 1024 * 1024) < bytes_value { + // Log progress details every 100MB or on file completion + if signal_value == u64::MAX { + ctx.log(format!( + "File completed. Total files: {}/{}, Total bytes: {}", + files_completed_count, total_files, total_bytes_copied + )); + } else if total_bytes_copied % (100 * 1024 * 1024) < bytes_value { ctx.log(format!( "Progress: {} / {} bytes ({:.1}%), {}/s, ETA: {}", total_bytes_copied, diff --git a/core/src/ops/files/copy/metadata.rs b/core/src/ops/files/copy/metadata.rs index 07dcd5be1..bda25ce3e 100644 --- a/core/src/ops/files/copy/metadata.rs +++ b/core/src/ops/files/copy/metadata.rs @@ -10,6 +10,7 @@ use specta::Type; /// Status of a file in the copy operation. #[derive(Debug, Clone, Serialize, Deserialize, Type, PartialEq)] +#[serde(rename_all = "lowercase")] pub enum CopyFileStatus { /// File is waiting to be copied Pending, diff --git a/core/src/ops/files/copy/strategy.rs b/core/src/ops/files/copy/strategy.rs index ca803b9c6..f071d9b4f 100644 --- a/core/src/ops/files/copy/strategy.rs +++ b/core/src/ops/files/copy/strategy.rs @@ -111,7 +111,7 @@ impl CopyStrategy for LocalMoveStrategy { fs::rename(source_path, dest_path).await?; if let Some(callback) = progress_callback { - callback(size, size); + callback(size, u64::MAX); } ctx.log(format!( @@ -235,6 +235,11 @@ impl CopyStrategy for FastCopyStrategy { } } + // Signal file completion to aggregator + if let Some(callback) = progress_callback { + callback(bytes_copied, u64::MAX); + } + ctx.log(format!( "Fast copy: {} -> {} ({} bytes)", source_path.display(), @@ -752,6 +757,11 @@ impl RemoteTransferStrategy { final_dest_path.display() )); + // Signal file completion to aggregator + if let Some(callback) = progress_callback { + callback(total_bytes_received, u64::MAX); + } + Ok(total_bytes_received) } } @@ -931,7 +941,7 @@ async fn copy_single_file_with_offset<'a>( dest_file.sync_all().await?; if let Some(callback) = progress_callback { - callback(total_copied, file_size); + callback(file_size, u64::MAX); ctx.log(format!( "Strategy final progress: {} / {} bytes (100%)", total_copied, file_size @@ -1305,5 +1315,10 @@ async fn stream_file_data<'a>( chunk_index, bytes_transferred, destination_device_id )); + // Signal file completion to aggregator + if let Some(callback) = progress_callback { + callback(bytes_transferred, u64::MAX); + } + Ok(()) } diff --git a/core/tests/copy_progress_test.rs b/core/tests/copy_progress_test.rs index 094718b17..44918b8c2 100644 --- a/core/tests/copy_progress_test.rs +++ b/core/tests/copy_progress_test.rs @@ -1,7 +1,11 @@ -//! Test for monitoring copy progress with large files +//! Test for monitoring copy progress with file-level tracking //! -//! This test verifies that copy progress updates smoothly with byte-level -//! granularity rather than jumping in large increments. +//! This test verifies that: +//! 1. File status metadata updates correctly (Pending -> Copying -> Completed) +//! 2. Progress events show accurate file counts +//! 3. Progress bar doesn't reset between files +//! +//! The test captures all events and queries metadata for post-mortem analysis. use sd_core::domain::addressing::{SdPath, SdPathBatch}; use sd_core::{ @@ -9,7 +13,9 @@ use sd_core::{ ops::files::copy::{action::FileCopyAction, input::CopyMethod, job::CopyOptions}, Core, }; +use serde::{Deserialize, Serialize}; use std::{ + collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; @@ -46,16 +52,55 @@ async fn create_large_test_file( Ok(()) } -#[derive(Debug, Clone)] +/// Progress snapshot with full event details +#[derive(Debug, Clone, Serialize, Deserialize)] struct ProgressSnapshot { - timestamp: std::time::Instant, + timestamp_ms: u128, percentage: f32, bytes_copied: u64, + bytes_total: u64, + files_copied: usize, + files_total: usize, message: String, + phase: String, +} + +/// Job metadata snapshot from query +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MetadataSnapshot { + timestamp_ms: u128, + total_files: usize, + file_statuses: HashMap, +} + +/// Complete test snapshot for analysis +#[derive(Debug, Serialize, Deserialize)] +struct TestSnapshot { + test_name: String, + file_count: usize, + file_size_mb: usize, + copy_method: String, + progress_events: Vec, + metadata_queries: Vec, + final_metadata: Option, + summary: TestSummary, +} + +#[derive(Debug, Serialize, Deserialize)] +struct TestSummary { + total_progress_events: usize, + total_metadata_queries: usize, + min_percentage: f32, + max_percentage: f32, + percentage_jumps: Vec, + max_jump: f32, + files_completed_at_end: usize, + test_passed: bool, + failure_reason: Option, } #[tokio::test] -async fn test_copy_progress_monitoring_large_file() { +async fn test_copy_progress_with_metadata_tracking() { // Initialize tracing subscriber for debug logs let _guard = tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer()) @@ -65,6 +110,9 @@ async fn test_copy_progress_monitoring_large_file() { ) .set_default(); + println!("\n=== Copy Progress Metadata Test ==="); + println!("This test captures all progress events and metadata queries for analysis\n"); + // Setup test environment let temp_dir = TempDir::new().unwrap(); let test_root = temp_dir.path(); @@ -75,30 +123,26 @@ async fn test_copy_progress_monitoring_large_file() { fs::create_dir_all(&source_dir).await.unwrap(); fs::create_dir_all(&dest_dir).await.unwrap(); - // Create a large test file (500MB to ensure enough time for progress updates) - let source_file = source_dir.join("large_test_file.bin"); - let file_size_mb = 500; // 500MB + // Create 3 test files of 1GB each + let file_size_mb = 1000; + let file_count = 3; + let mut source_files = Vec::new(); - println!("Creating {}MB test file...", file_size_mb); - create_large_test_file(&source_file, file_size_mb) - .await - .unwrap(); + for i in 1..=file_count { + let file_path = source_dir.join(format!("test_file_{}.bin", i)); + println!("Creating test_file_{}.bin ({}MB)...", i, file_size_mb); + create_large_test_file(&file_path, file_size_mb) + .await + .unwrap(); + source_files.push(file_path); + } - // Verify file size - let metadata = fs::metadata(&source_file).await.unwrap(); - let expected_size = (file_size_mb * 1024 * 1024) as u64; - assert_eq!( - metadata.len(), - expected_size, - "Test file should be exactly {}MB", - file_size_mb - ); - - // Initialize core with custom data directory + println!("Initializing Spacedrive Core..."); let core_data_dir = test_root.join("core_data"); - let core = Core::new(core_data_dir).await.unwrap(); + let core = Core::new(core_data_dir.clone()).await.unwrap(); // Create a test library + println!("Creating test library..."); let library = core .libraries .create_library("Progress Test Library", None, core.context.clone()) @@ -109,326 +153,21 @@ async fn test_copy_progress_monitoring_large_file() { // Create ActionManager let context = core.context.clone(); - let action_manager = ActionManager::new(context); + let action_manager = ActionManager::new(context.clone()); - // Build the copy action with the exact options from the CLI command + // Build the copy action let copy_action = FileCopyAction { - sources: SdPathBatch::new(vec![SdPath::local(source_file.clone())]), + sources: SdPathBatch::new( + source_files + .iter() + .map(|p| SdPath::local(p.clone())) + .collect(), + ), destination: SdPath::local(dest_dir.clone()), options: CopyOptions { conflict_resolution: None, - overwrite: true, // Bypass confirmation workflow in tests - verify_checksum: true, // --verify - preserve_timestamps: true, // --preserve-timestamps - delete_after_copy: false, - move_mode: None, - copy_method: CopyMethod::Streaming, // --method streaming - }, - on_conflict: None, - }; - - // Dispatch the action directly via ActionManager (library-scoped) - - // Setup progress monitoring - let progress_snapshots = Arc::new(Mutex::new(Vec::new())); - let progress_snapshots_clone = progress_snapshots.clone(); - let start_time = std::time::Instant::now(); - - // Subscribe to events BEFORE dispatching to avoid race condition - let mut event_subscriber = core.events.subscribe(); - - // Execute the action - println!("Starting copy operation..."); - let _job_handle = action_manager - .dispatch_library(Some(library_id), copy_action) - .await - .expect("Action dispatch should succeed"); - - // Job ID will be read from first Job* event below - let expected_size_clone = expected_size; - let mut observed_job_id: Option = None; - - // Start monitoring task using EventBus - let monitor_handle = tokio::spawn(async move { - let mut last_progress = 0.0; - let mut has_seen_progress = false; - let mut event_count = 0; - - while let Ok(event) = event_subscriber.recv().await { - event_count += 1; - - match event { - Event::JobProgress { - job_id: event_job_id, - progress, - message, - .. - } => { - if observed_job_id.is_none() { - observed_job_id = Some(event_job_id.clone()); - } - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; - } - } - let current_progress = progress * 100.0; - - // Record snapshot if progress changed - if (current_progress - last_progress).abs() > 0.01 { - has_seen_progress = true; - - let snapshot = ProgressSnapshot { - timestamp: std::time::Instant::now(), - percentage: current_progress as f32, - bytes_copied: (expected_size_clone as f64 * (progress as f64)) as u64, - message: message.unwrap_or_else(|| format!("{:.1}%", current_progress)), - }; - - println!( - "Progress: {:.1}% ({} MB)", - current_progress, - snapshot.bytes_copied / (1024 * 1024) - ); - - progress_snapshots_clone.lock().unwrap().push(snapshot); - last_progress = current_progress; - } - } - Event::JobCompleted { - job_id: event_job_id, - .. - } => { - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; - } - } else { - observed_job_id = Some(event_job_id.clone()); - } - println!("Job completed! (after {} events)", event_count); - println!("Final progress: {:.1}%", last_progress); - - // Record final progress if we haven't seen any updates - if !has_seen_progress && last_progress == 0.0 { - let snapshot = ProgressSnapshot { - timestamp: std::time::Instant::now(), - percentage: 100.0, - bytes_copied: expected_size_clone, - message: "Final".to_string(), - }; - progress_snapshots_clone.lock().unwrap().push(snapshot); - } - break; - } - Event::JobFailed { - job_id: event_job_id, - error, - .. - } => { - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; - } - } else { - observed_job_id = Some(event_job_id.clone()); - } - println!("Job failed after {} events: {}", event_count, error); - panic!("Job failed: {}", error); - } - Event::JobCancelled { - job_id: event_job_id, - .. - } => { - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; - } - } else { - observed_job_id = Some(event_job_id.clone()); - } - println!("Job was cancelled after {} events", event_count); - break; - } - _ => { - // Other events - continue monitoring - } - } - } - - has_seen_progress - }); - - // Wait for job completion with timeout (120s for large file on slower systems) - let completion_result = timeout(Duration::from_secs(120), monitor_handle).await; - - let has_seen_progress = match completion_result { - Ok(Ok(has_progress)) => { - println!("Monitoring completed successfully"); - has_progress - } - Ok(Err(e)) => panic!("Monitoring task failed: {}", e), - Err(_) => panic!("Copy operation timed out after 120 seconds"), - }; - - // Analyze progress snapshots - let snapshots = progress_snapshots.lock().unwrap(); - println!("\n=== Progress Analysis ==="); - println!("Total snapshots captured: {}", snapshots.len()); - println!("Saw progress updates during copy: {}", has_seen_progress); - - // First check if we got ANY progress updates at all - if snapshots.is_empty() { - panic!( - "No progress updates were captured! Progress stayed at 0% throughout the entire copy operation. \ - This indicates the progress reporting is not working correctly." - ); - } - - // If we only got one snapshot at the end, that's also a problem - if snapshots.len() == 1 && !has_seen_progress { - panic!( - "Only captured final progress update. Progress reporting did not work during the copy operation." - ); - } - - // With modern fast SSDs, progress updates happen every 50ms but files copy very quickly - // Expecting at least 3-5 updates for a large file is reasonable - if snapshots.len() < 3 { - panic!( - "Too few progress updates captured! Only {} snapshots for a {}MB file. \ - Expected at least a few progress updates throughout the operation.", - snapshots.len(), - file_size_mb - ); - } - - // Calculate progress increments - let mut increments = Vec::new(); - for i in 1..snapshots.len() { - let increment = snapshots[i].percentage - snapshots[i - 1].percentage; - if increment > 0.0 { - increments.push(increment); - } - } - - // Calculate statistics - let avg_increment = increments.iter().sum::() / increments.len() as f32; - let max_increment = increments.iter().cloned().fold(0.0f32, f32::max); - let min_increment = increments.iter().cloned().fold(100.0f32, f32::min); - - println!("Progress increments:"); - println!(" Average: {:.2}%", avg_increment); - println!(" Maximum: {:.2}%", max_increment); - println!(" Minimum: {:.2}%", min_increment); - println!(" Total updates: {}", increments.len()); - - // Verify smooth progress (no large jumps) - // For fast SSDs, progress updates happen every 50ms but files copy quickly - // Accept up to 25% jumps since the actual granularity depends on I/O speed - assert!( - max_increment < 25.0, - "Progress jumped by {:.1}% - should update reasonably smoothly (max 25%)", - max_increment - ); - - // Verify we got reasonable granularity - // With fast SSDs, we may get fewer updates than expected - assert!( - snapshots.len() > 5, - "Expected at least 5 progress updates for a {}MB file, got {}", - file_size_mb, - snapshots.len() - ); - - // Verify file was copied successfully - let dest_file = dest_dir.join("large_test_file.bin"); - assert!(dest_file.exists(), "Destination file should exist"); - - let dest_metadata = fs::metadata(&dest_file).await.unwrap(); - assert_eq!( - dest_metadata.len(), - expected_size, - "Copied file size should match source" - ); - - // Calculate effective copy speed - let total_time = start_time.elapsed(); - let mb_per_second = (file_size_mb as f64) / total_time.as_secs_f64(); - println!("\nCopy performance: {:.1} MB/s", mb_per_second); - - println!("\nCopy progress monitoring test passed!"); - println!(" - Progress updated smoothly with byte-level granularity"); - println!(" - No large progress jumps detected"); - println!(" - File copied successfully with checksum verification"); - - // Cleanup: shutdown Core to stop background services - core.shutdown().await.unwrap(); - - // Explicitly drop Core to free resources - drop(core); - - // Give time for all async cleanup to complete (increased for slower systems) - tokio::time::sleep(std::time::Duration::from_millis(500)).await; -} - -#[tokio::test] -async fn test_copy_progress_multiple_files() { - // Initialize tracing subscriber for debug logs - let _guard = tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer()) - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), - ) - .try_init(); - - // This test verifies progress tracking across multiple files - let temp_dir = TempDir::new().unwrap(); - let test_root = temp_dir.path(); - - let source_dir = test_root.join("source"); - let dest_dir = test_root.join("destination"); - fs::create_dir_all(&source_dir).await.unwrap(); - fs::create_dir_all(&dest_dir).await.unwrap(); - - // Create 4 files of different sizes (large enough to ensure progress is captured) - let files = vec![ - ("file1.bin", 100), // 100MB - ("file2.bin", 200), // 200MB - ("file3.bin", 150), // 150MB - ("file4.bin", 50), // 50MB - ]; - - let mut source_files = Vec::new(); - for (name, size_mb) in &files { - let path = source_dir.join(name); - println!("Creating {} ({}MB)...", name, size_mb); - create_large_test_file(&path, *size_mb).await.unwrap(); - source_files.push(path); - } - - // Initialize core and library - let core_data_dir = test_root.join("core_data"); - let core = Core::new(core_data_dir).await.unwrap(); - let library = core - .libraries - .create_library("Multi-file Progress Test", None, core.context.clone()) - .await - .unwrap(); - let library_id = library.id(); - - let context = core.context.clone(); - let action_manager = ActionManager::new(context); - - // Build copy action for multiple files - let copy_action = FileCopyAction { - sources: SdPathBatch::new(source_files.iter().cloned().map(SdPath::local).collect()), - destination: SdPath::local(dest_dir.clone()), - options: CopyOptions { - conflict_resolution: None, - overwrite: true, // Bypass confirmation workflow in tests - verify_checksum: true, + overwrite: true, + verify_checksum: false, // Disable for speed preserve_timestamps: true, delete_after_copy: false, move_mode: None, @@ -437,154 +176,346 @@ async fn test_copy_progress_multiple_files() { on_conflict: None, }; - // Dispatch the action directly via ActionManager (library-scoped) - - // Setup progress monitoring + // Setup data collection let progress_snapshots = Arc::new(Mutex::new(Vec::new())); - let progress_snapshots_clone = progress_snapshots.clone(); + let metadata_snapshots = Arc::new(Mutex::new(Vec::new())); + let progress_clone = progress_snapshots.clone(); + let metadata_clone = metadata_snapshots.clone(); + let library_clone = library.clone(); + let core_context_clone = core.context.clone(); + let start_time = std::time::Instant::now(); - // Subscribe to events BEFORE dispatching to avoid race condition + // Subscribe to events BEFORE dispatching let mut event_subscriber = core.events.subscribe(); - // Execute the action - println!("\nStarting multi-file copy operation..."); - let _job_handle = action_manager - .dispatch_library(Some(library_id), copy_action) - .await - .expect("Action dispatch should succeed"); - let mut observed_job_id: Option = None; + // Start monitoring task BEFORE dispatching to avoid missing events + let (job_id_tx, job_id_rx) = tokio::sync::oneshot::channel::(); let monitor_handle = tokio::spawn(async move { - let mut last_progress = 0.0; + // Wait for job ID to be sent + let job_id = job_id_rx.await.expect("Job ID should be sent"); + let mut event_count = 0; + let mut metadata_query_count = 0; - while let Ok(event) = event_subscriber.recv().await { - match event { - Event::JobProgress { - job_id: event_job_id, - progress, - .. - } => { - if observed_job_id.is_none() { - observed_job_id = Some(event_job_id.clone()); - } - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; + println!("Monitor task started, listening for job {}", job_id); + + loop { + // Wait for next event with timeout for metadata queries + match tokio::time::timeout(Duration::from_millis(500), event_subscriber.recv()).await { + Ok(Ok(event)) => { + event_count += 1; + + // Debug ALL events + println!("[DEBUG] Event #{}: {:?}", event_count, event); + + match event { + Event::JobProgress { + job_id: event_job_id, + generic_progress, + progress, + message, + .. + } => { + if event_job_id != job_id.to_string() { + continue; + } + + println!( + "[DEBUG] JobProgress event: progress={}, message={:?}, generic_progress.is_some()={}", + progress, + message, + generic_progress.is_some() + ); + + if let Some(gen_progress) = generic_progress { + let snapshot = ProgressSnapshot { + timestamp_ms: start_time.elapsed().as_millis(), + percentage: gen_progress.percentage, + bytes_copied: gen_progress + .completion + .bytes_completed + .unwrap_or(0), + bytes_total: gen_progress.completion.total_bytes.unwrap_or(0), + files_copied: gen_progress.completion.completed as usize, + files_total: gen_progress.completion.total as usize, + message: gen_progress.message.clone(), + phase: gen_progress.phase.clone(), + }; + + println!( + "[{:>6}ms] Progress: {:.1}% | Files: {}/{} | Bytes: {}/{} {}", + snapshot.timestamp_ms, + snapshot.percentage * 100.0, + snapshot.files_copied, + snapshot.files_total, + snapshot.bytes_copied / (1024 * 1024), + snapshot.bytes_total / (1024 * 1024), + snapshot.phase + ); + + progress_clone.lock().unwrap().push(snapshot); + } + } + Event::JobCompleted { + job_id: event_job_id, + .. + } => { + if event_job_id != job_id.to_string() { + continue; + } + println!("\n✓ Job completed after {} events", event_count); + break; + } + Event::JobFailed { + job_id: event_job_id, + error, + .. + } => { + if event_job_id != job_id.to_string() { + continue; + } + println!("\n✗ Job failed: {}", error); + panic!("Job failed: {}", error); + } + _ => { + // Other events } } - let current_progress = progress * 100.0; + } + Ok(Err(_)) => { + // Channel closed + break; + } + Err(_) => { + // Timeout - query metadata + if metadata_query_count < 20 { + // Limit queries + use sd_core::infra::query::LibraryQuery; - if (current_progress - last_progress).abs() > 0.01 { - let snapshot = ProgressSnapshot { - timestamp: std::time::Instant::now(), - percentage: current_progress as f32, - bytes_copied: 0, // Would need to calculate from percentage - message: format!("{:.1}%", current_progress), + let query_input = sd_core::ops::jobs::copy_metadata::query::CopyMetadataQueryInput { + job_id: job_id.into(), }; - println!("Multi-file progress: {:.1}%", current_progress); - progress_snapshots_clone.lock().unwrap().push(snapshot); - last_progress = current_progress; - } - } - Event::JobCompleted { - job_id: event_job_id, - .. - } => { - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; + let query = + sd_core::ops::jobs::copy_metadata::query::CopyMetadataQuery::from_input( + query_input, + ) + .unwrap(); + + let mut session = sd_core::infra::api::SessionContext::device_session( + uuid::Uuid::new_v4(), + "test-device".to_string(), + ); + session.current_library_id = Some(library_clone.id()); + + match query.execute(core_context_clone.clone(), session).await { + Ok(result) => { + if let Some(metadata) = result.metadata { + let mut file_statuses = HashMap::new(); + for file in &metadata.files { + let status = format!("{:?}", file.status); + let name = file + .source_path + .path() + .and_then(|p| { + p.file_name() + .map(|n| n.to_string_lossy().to_string()) + }) + .unwrap_or_else(|| "unknown".to_string()); + file_statuses.insert(name, status); + } + + let snapshot = MetadataSnapshot { + timestamp_ms: start_time.elapsed().as_millis(), + total_files: metadata.files.len(), + file_statuses, + }; + + metadata_clone.lock().unwrap().push(snapshot); + metadata_query_count += 1; + } + } + Err(e) => { + eprintln!("Metadata query failed: {}", e); + } } - } else { - observed_job_id = Some(event_job_id.clone()); } - println!("Multi-file job completed"); - break; - } - Event::JobFailed { - job_id: event_job_id, - error, - .. - } => { - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; - } - } else { - observed_job_id = Some(event_job_id.clone()); - } - panic!("Multi-file job failed: {}", error); - } - Event::JobCancelled { - job_id: event_job_id, - .. - } => { - if let Some(ref jid) = observed_job_id { - if &event_job_id != jid { - continue; - } - } else { - observed_job_id = Some(event_job_id.clone()); - } - println!("Multi-file job was cancelled"); - break; - } - _ => { - // Other events - continue monitoring } } } + + (event_count, metadata_query_count) }); - timeout(Duration::from_secs(60), monitor_handle) + // NOW dispatch the job (monitor is already listening) + println!("\nStarting copy operation for {} files...\n", file_count); + let job_handle = action_manager + .dispatch_library(Some(library_id), copy_action) .await - .expect("Multi-file copy should complete within 60 seconds") - .expect("Monitor task should succeed"); + .expect("Action dispatch should succeed"); - // Analyze progress - let snapshots = progress_snapshots.lock().unwrap(); - println!("\n=== Multi-file Progress Analysis ==="); - println!("Total snapshots: {}", snapshots.len()); + let job_id = job_handle.id; - // Must have captured at least some progress updates during the copy - assert!( - snapshots.len() >= 3, - "Expected at least 3 progress snapshots for multi-file copy, got {}. \ - Progress tracking may not be working properly.", - snapshots.len() + // Send job ID to monitoring task + job_id_tx.send(job_id).expect("Monitor task should be running"); + + // Wait for job completion with timeout + let (event_count, metadata_query_count) = + match timeout(Duration::from_secs(60), monitor_handle).await { + Ok(Ok(result)) => result, + Ok(Err(e)) => panic!("Monitoring task failed: {}", e), + Err(_) => panic!("Copy operation timed out after 60 seconds"), + }; + + println!("\n=== Data Collection Summary ==="); + println!("Progress events captured: {}", event_count); + println!("Metadata queries executed: {}", metadata_query_count); + + // Query final metadata state + println!("\nQuerying final job metadata..."); + use sd_core::infra::query::LibraryQuery; + let query_input = + sd_core::ops::jobs::copy_metadata::query::CopyMetadataQueryInput { job_id: job_id.into() }; + let query = + sd_core::ops::jobs::copy_metadata::query::CopyMetadataQuery::from_input(query_input) + .unwrap(); + let mut session = sd_core::infra::api::SessionContext::device_session( + uuid::Uuid::new_v4(), + "test-device".to_string(), ); + session.current_library_id = Some(library_id); + let final_ctx = context.clone(); + let final_metadata_result = query.execute(final_ctx, session).await.unwrap(); + let final_metadata_json = serde_json::to_value(&final_metadata_result).unwrap(); - // With 4 files totaling 500MB, we should see smooth progress - // not 4 discrete 25% jumps - let mut increments = Vec::new(); - for i in 1..snapshots.len() { - let increment = snapshots[i].percentage - snapshots[i - 1].percentage; - if increment > 0.0 { - increments.push(increment); + // Analyze collected data + let progress_snapshots = progress_snapshots.lock().unwrap().clone(); + let metadata_snapshots = metadata_snapshots.lock().unwrap().clone(); + + // Calculate statistics + let percentages: Vec = progress_snapshots.iter().map(|s| s.percentage).collect(); + let min_percentage = percentages.iter().cloned().fold(1.0f32, f32::min); + let max_percentage = percentages.iter().cloned().fold(0.0f32, f32::max); + + let mut percentage_jumps = Vec::new(); + for i in 1..percentages.len() { + let jump = (percentages[i] - percentages[i - 1]) * 100.0; + if jump > 0.0 { + percentage_jumps.push(jump); } } + let max_jump = percentage_jumps.iter().cloned().fold(0.0f32, f32::max); - if !increments.is_empty() { - let max_increment = increments.iter().cloned().fold(0.0f32, f32::max); - println!("Maximum progress increment: {:.2}%", max_increment); + let files_completed_at_end = if let Some(last_snapshot) = progress_snapshots.last() { + last_snapshot.files_copied + } else { + 0 + }; - // Should have reasonable progress granularity - // With fast SSDs, individual files copy quickly, so accept larger jumps - assert!( - max_increment < 30.0, - "Progress should update reasonably across files, not jump by {:.1}%", - max_increment + let test_passed = files_completed_at_end == file_count + && max_percentage >= 0.99 + && max_jump < 50.0; + + let failure_reason = if !test_passed { + if files_completed_at_end != file_count { + Some(format!( + "Expected {} files completed, got {}", + file_count, files_completed_at_end + )) + } else if max_percentage < 0.99 { + Some(format!( + "Progress never reached 100% (max: {:.1}%)", + max_percentage * 100.0 + )) + } else { + Some(format!( + "Progress bar reset detected (max jump: {:.1}%)", + max_jump + )) + } + } else { + None + }; + + let snapshot = TestSnapshot { + test_name: "copy_progress_with_metadata_tracking".to_string(), + file_count, + file_size_mb, + copy_method: "Streaming".to_string(), + progress_events: progress_snapshots, + metadata_queries: metadata_snapshots, + final_metadata: Some(final_metadata_json), + summary: TestSummary { + total_progress_events: event_count, + total_metadata_queries: metadata_query_count, + min_percentage, + max_percentage, + percentage_jumps, + max_jump, + files_completed_at_end, + test_passed, + failure_reason, + }, + }; + + // Save snapshot if enabled + if std::env::var("SD_TEST_SNAPSHOTS").is_ok() { + let snapshot_dir = dirs::data_local_dir() + .unwrap() + .join("spacedrive") + .join("test_snapshots") + .join("copy_progress_test") + .join(chrono::Local::now().format("%Y%m%d_%H%M%S").to_string()); + + fs::create_dir_all(&snapshot_dir).await.unwrap(); + + let snapshot_path = snapshot_dir.join("test_snapshot.json"); + let snapshot_json = serde_json::to_string_pretty(&snapshot).unwrap(); + fs::write(&snapshot_path, snapshot_json).await.unwrap(); + + println!("\n📸 Snapshot saved to: {}", snapshot_path.display()); + } else { + // Always write to temp dir for local inspection + let temp_snapshot_path = test_root.join("test_snapshot.json"); + let snapshot_json = serde_json::to_string_pretty(&snapshot).unwrap(); + fs::write(&temp_snapshot_path, snapshot_json) + .await + .unwrap(); + println!( + "\n📄 Snapshot written to temp: {}", + temp_snapshot_path.display() ); + println!(" (Set SD_TEST_SNAPSHOTS=1 to save to permanent location)"); } - println!("\nMulti-file progress monitoring test passed!"); + // Print summary + println!("\n=== Test Summary ==="); + println!("Files completed: {}/{}", files_completed_at_end, file_count); + println!( + "Progress range: {:.1}% - {:.1}%", + min_percentage * 100.0, + max_percentage * 100.0 + ); + println!("Max progress jump: {:.1}%", max_jump); + println!("Test passed: {}", test_passed); + if let Some(reason) = &snapshot.summary.failure_reason { + println!("Failure reason: {}", reason); + } - // Cleanup: shutdown Core to stop background services + // Cleanup core.shutdown().await.unwrap(); - - // Explicitly drop Core to free resources drop(core); + tokio::time::sleep(Duration::from_millis(500)).await; - // Give time for all async cleanup to complete (increased for slower systems) - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + // Assert test passed + if !test_passed { + panic!( + "Test failed: {}", + snapshot + .summary + .failure_reason + .unwrap_or_else(|| "Unknown failure".to_string()) + ); + } } + diff --git a/packages/interface/src/components/JobManager/components/CopyJobDetails.tsx b/packages/interface/src/components/JobManager/components/CopyJobDetails.tsx index c83a8f7b3..7fbd3d251 100644 --- a/packages/interface/src/components/JobManager/components/CopyJobDetails.tsx +++ b/packages/interface/src/components/JobManager/components/CopyJobDetails.tsx @@ -1,5 +1,6 @@ -import { Check, Circle, X, Spinner } from "@phosphor-icons/react"; +import { CheckCircle, Circle, X, Spinner } from "@phosphor-icons/react"; import clsx from "clsx"; +import { useEffect, useRef } from "react"; import { useLibraryQuery } from "../../../contexts/SpacedriveContext"; import type { JobListItem } from "../types"; import type { SpeedSample } from "../hooks/useJobs"; @@ -14,13 +15,24 @@ interface CopyJobDetailsProps { export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) { const generic = job.generic_progress; + const scrollContainerRef = useRef(null); // Fetch copy metadata (file queue with File objects) - const { data: metadata } = useLibraryQuery({ + const { data: metadata, refetch } = useLibraryQuery({ type: "jobs.get_copy_metadata", input: { job_id: job.id }, }); + // Refetch when completed count changes + const prevCompletedRef = useRef(0); + useEffect(() => { + const currentCompleted = generic?.completion?.completed || 0; + if (currentCompleted !== prevCompletedRef.current) { + prevCompletedRef.current = currentCompleted; + refetch(); + } + }, [generic?.completion?.completed, refetch]); + if (!generic) { return (
@@ -38,6 +50,25 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) { fileMap.set(file.id, file); }); + // Auto-scroll to center the currently copying file + useEffect(() => { + const currentFileIndex = files.findIndex(f => f.status === "copying"); + if (currentFileIndex !== -1 && scrollContainerRef.current) { + const container = scrollContainerRef.current; + const fileElements = container.children; + const currentElement = fileElements[currentFileIndex] as HTMLElement; + + if (currentElement) { + const containerHeight = container.clientHeight; + const elementTop = currentElement.offsetTop; + const elementHeight = currentElement.clientHeight; + const scrollTop = elementTop - containerHeight / 2 + elementHeight / 2; + + container.scrollTo({ top: scrollTop, behavior: "smooth" }); + } + } + }, [files]); + return (
{/* Speed graph */} @@ -47,7 +78,10 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) { {files.length > 0 && (
Transfer Queue
-
+
{files.map((file, index) => { const fileObj = file.entry_id ? fileMap.get(file.entry_id) : null; const isEven = index % 2 === 0; @@ -56,15 +90,16 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {
{/* Status icon */} {file.status === "completed" ? ( - + ) : file.status === "copying" ? ( ) : file.status === "failed" ? ( @@ -89,7 +124,7 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {