mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-19 05:45:01 -04:00
feat(CopyJobDetails): enhance file transfer UI with auto-scrolling and metadata refetching
This commit improves the `CopyJobDetails` component by adding an auto-scroll feature that centers the currently copying file in the transfer queue. Additionally, it implements a mechanism to refetch copy metadata when the completed file count changes, ensuring that the displayed information is always up-to-date. The UI is further refined with updated icons and styles for better user experience during file operations.
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use clap::{Args, Subcommand};
|
||||
use clap::{Args, Subcommand, ValueEnum};
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum EventsCmd {
|
||||
@@ -6,6 +6,16 @@ pub enum EventsCmd {
|
||||
Monitor(EventsMonitorArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, ValueEnum)]
|
||||
pub enum OutputFormat {
|
||||
/// Human-readable summary (default)
|
||||
Human,
|
||||
/// Compact JSON output
|
||||
Json,
|
||||
/// Pretty-printed JSON output
|
||||
JsonPretty,
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct EventsMonitorArgs {
|
||||
/// Filter by event type(s) (comma-separated, e.g., "JobProgress,JobCompleted")
|
||||
@@ -29,6 +39,10 @@ pub struct EventsMonitorArgs {
|
||||
#[arg(long)]
|
||||
pub timestamps: bool,
|
||||
|
||||
/// Output format
|
||||
#[arg(short = 'f', long, value_enum, default_value = "human")]
|
||||
pub format: OutputFormat,
|
||||
|
||||
/// Show full event JSON (verbose mode)
|
||||
#[arg(short = 'v', long)]
|
||||
pub verbose: bool,
|
||||
|
||||
@@ -97,16 +97,22 @@ fn display_event(event: &Event, args: &EventsMonitorArgs) {
|
||||
|
||||
let event_variant = event.variant_name();
|
||||
|
||||
if args.verbose || args.pretty {
|
||||
// Verbose mode: show full JSON
|
||||
let json_str = if args.pretty {
|
||||
// Determine output format (new format flag takes precedence over legacy flags)
|
||||
let use_json_pretty = matches!(args.format, OutputFormat::JsonPretty) || args.pretty;
|
||||
let use_json = matches!(args.format, OutputFormat::Json)
|
||||
|| use_json_pretty
|
||||
|| args.verbose;
|
||||
|
||||
if use_json {
|
||||
// JSON mode: show full JSON
|
||||
let json_str = if use_json_pretty {
|
||||
serde_json::to_string_pretty(&event).unwrap_or_else(|_| format!("{:?}", event))
|
||||
} else {
|
||||
serde_json::to_string(&event).unwrap_or_else(|_| format!("{:?}", event))
|
||||
};
|
||||
println!("{}{}: {}", timestamp, event_variant, json_str);
|
||||
} else {
|
||||
// Compact mode: show event type and key fields
|
||||
// Human-readable mode: show event type and key fields
|
||||
let summary = summarize_event(event);
|
||||
println!("{}{}: {}", timestamp, event_variant, summary);
|
||||
}
|
||||
|
||||
@@ -137,6 +137,7 @@ async fn run_copy_with_confirmation(
|
||||
"Rename the new file(s) (e.g., file.txt -> file (1).txt)".to_string(),
|
||||
"Abort this copy operation".to_string(),
|
||||
],
|
||||
metadata: None,
|
||||
};
|
||||
|
||||
let choice_index = prompt_for_choice(request)?;
|
||||
|
||||
@@ -351,6 +351,9 @@ impl JobManager {
|
||||
match task_handle {
|
||||
Ok(handle_result) => {
|
||||
// Track running job
|
||||
// Clone latest_progress for monitoring task before moving into RunningJob
|
||||
let latest_progress_for_monitor = latest_progress.clone();
|
||||
|
||||
self.running_jobs.write().await.insert(
|
||||
job_id,
|
||||
RunningJob {
|
||||
@@ -408,6 +411,35 @@ impl JobManager {
|
||||
}
|
||||
};
|
||||
|
||||
// Emit final progress event if one exists (may have been throttled)
|
||||
if let Some(final_progress) = latest_progress_for_monitor.lock().await.as_ref() {
|
||||
let generic_progress = match final_progress {
|
||||
Progress::Structured(value) => {
|
||||
// Try to deserialize CopyProgress and convert to GenericProgress
|
||||
if let Ok(copy_progress) = serde_json::from_value::<
|
||||
crate::ops::files::copy::CopyProgress,
|
||||
>(value.clone())
|
||||
{
|
||||
use crate::infra::job::generic_progress::ToGenericProgress;
|
||||
Some(copy_progress.to_generic_progress())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Progress::Generic(gp) => Some(gp.clone()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
event_bus.emit(Event::JobProgress {
|
||||
job_id: job_id_clone.to_string(),
|
||||
job_type: job_type_str.to_string(),
|
||||
device_id,
|
||||
progress: final_progress.as_percentage().unwrap_or(0.0) as f64,
|
||||
message: Some(final_progress.to_string()),
|
||||
generic_progress,
|
||||
});
|
||||
}
|
||||
|
||||
// Emit completion event with the job's output
|
||||
event_bus.emit(Event::JobCompleted {
|
||||
job_id: job_id_clone.to_string(),
|
||||
@@ -759,6 +791,7 @@ impl JobManager {
|
||||
let job_type_str = J::NAME;
|
||||
let library_id_clone = self.library_id;
|
||||
let context = self.context.clone();
|
||||
let latest_progress_for_monitor = latest_progress.clone();
|
||||
let device_id = self
|
||||
.context
|
||||
.device_manager
|
||||
@@ -798,6 +831,35 @@ impl JobManager {
|
||||
}
|
||||
};
|
||||
|
||||
// Emit final progress event if one exists (may have been throttled)
|
||||
if let Some(final_progress) = latest_progress_for_monitor.lock().await.as_ref() {
|
||||
let generic_progress = match final_progress {
|
||||
Progress::Structured(value) => {
|
||||
// Try to deserialize CopyProgress and convert to GenericProgress
|
||||
if let Ok(copy_progress) = serde_json::from_value::<
|
||||
crate::ops::files::copy::CopyProgress,
|
||||
>(value.clone())
|
||||
{
|
||||
use crate::infra::job::generic_progress::ToGenericProgress;
|
||||
Some(copy_progress.to_generic_progress())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Progress::Generic(gp) => Some(gp.clone()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
event_bus.emit(Event::JobProgress {
|
||||
job_id: job_id_clone.to_string(),
|
||||
job_type: job_type_str.to_string(),
|
||||
device_id,
|
||||
progress: final_progress.as_percentage().unwrap_or(0.0) as f64,
|
||||
message: Some(final_progress.to_string()),
|
||||
generic_progress,
|
||||
});
|
||||
}
|
||||
|
||||
// Emit completion event with the job's output
|
||||
event_bus.emit(Event::JobCompleted {
|
||||
job_id: job_id_clone.to_string(),
|
||||
@@ -1412,6 +1474,9 @@ impl JobManager {
|
||||
{
|
||||
Ok(task_handle) => {
|
||||
// Track running job
|
||||
// Clone latest_progress for monitoring task before moving into RunningJob
|
||||
let latest_progress_for_monitor = latest_progress.clone();
|
||||
|
||||
self.running_jobs.write().await.insert(
|
||||
job_id,
|
||||
RunningJob {
|
||||
|
||||
@@ -373,6 +373,9 @@ impl JobHandler for FileCopyJob {
|
||||
};
|
||||
|
||||
copied_count += files_in_source; // Count actual files as copied for progress tracking
|
||||
|
||||
// Mark as completed in metadata (already done during previous run)
|
||||
self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Completed);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -418,6 +421,9 @@ impl JobHandler for FileCopyJob {
|
||||
1
|
||||
};
|
||||
|
||||
// Mark file as currently copying in metadata
|
||||
self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Copying);
|
||||
|
||||
// Update aggregator with current file info
|
||||
let operation_description = CopyStrategyRouter::describe_strategy(
|
||||
&resolved_source,
|
||||
@@ -495,6 +501,10 @@ impl JobHandler for FileCopyJob {
|
||||
if let Some(dest_path) = final_destination.as_local_path() {
|
||||
if dest_path.exists() {
|
||||
ctx.log(format!("Skipping existing file: {}", dest_path.display()));
|
||||
|
||||
// Mark as skipped in metadata
|
||||
self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Skipped);
|
||||
|
||||
// Skip this file
|
||||
progress_aggregator.complete_source();
|
||||
copied_count += files_in_source;
|
||||
@@ -558,6 +568,9 @@ impl JobHandler for FileCopyJob {
|
||||
// Track successful completion for resume
|
||||
self.completed_indices.push(index);
|
||||
|
||||
// Mark as completed in metadata
|
||||
self.job_metadata.update_status(&resolved_source, super::metadata::CopyFileStatus::Completed);
|
||||
|
||||
// If this is a move operation and the strategy didn't handle deletion,
|
||||
// we need to delete the source after successful copy
|
||||
if is_move && resolved_source.device_slug() == final_destination.device_slug() {
|
||||
@@ -608,10 +621,16 @@ impl JobHandler for FileCopyJob {
|
||||
resolved_source.display(),
|
||||
e
|
||||
));
|
||||
|
||||
// Mark as failed in metadata
|
||||
self.job_metadata.set_error(&resolved_source, e.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// Checkpoint every 20 files to save completed_indices
|
||||
// Persist after every file so UI can show real-time checkbox updates
|
||||
self.persist_job_state_to_db(&ctx).await?;
|
||||
|
||||
// Checkpoint every 20 files to save job state to disk
|
||||
if copied_count % 20 == 0 {
|
||||
ctx.checkpoint().await?;
|
||||
}
|
||||
@@ -642,6 +661,9 @@ impl JobHandler for FileCopyJob {
|
||||
};
|
||||
ctx.progress(Progress::generic(progress.to_generic_progress()));
|
||||
|
||||
// Persist final job state with all file statuses
|
||||
self.persist_job_state_to_db(&ctx).await?;
|
||||
|
||||
ctx.log(format!(
|
||||
"Copy operation completed: {} copied, {} failed",
|
||||
copied_count,
|
||||
@@ -785,21 +807,27 @@ impl<'a> ProgressAggregator<'a> {
|
||||
Box::new(move |bytes_value: u64, signal_value: u64| {
|
||||
// Signal: u64::MAX means a file has finished, bytes_value is its size
|
||||
if signal_value == u64::MAX {
|
||||
// Update backend state
|
||||
let mut files = files_completed.lock().unwrap();
|
||||
*files += 1;
|
||||
drop(files);
|
||||
let mut bytes = bytes_before.lock().unwrap();
|
||||
*bytes += bytes_value;
|
||||
ctx.log(format!(
|
||||
"File completed. Total files: {}/{}, Total bytes: {}",
|
||||
*files, total_files, *bytes
|
||||
));
|
||||
return;
|
||||
drop(bytes);
|
||||
}
|
||||
|
||||
// Normal byte-level progress update
|
||||
let bytes_before_snapshot = *bytes_before.lock().unwrap();
|
||||
let total_bytes_copied = bytes_before_snapshot + bytes_value;
|
||||
// Read current backend state for progress event (AFTER mutations above)
|
||||
let files_completed_count = *files_completed.lock().unwrap();
|
||||
let bytes_before_snapshot = *bytes_before.lock().unwrap();
|
||||
|
||||
// Calculate total bytes for this progress event
|
||||
let total_bytes_copied = if signal_value == u64::MAX {
|
||||
// File just completed - bytes were already added to bytes_before above
|
||||
bytes_before_snapshot
|
||||
} else {
|
||||
// Byte-level progress - bytes_value is current file progress within current file
|
||||
bytes_before_snapshot + bytes_value
|
||||
};
|
||||
|
||||
// Update speed tracker and get current rate
|
||||
let mut tracker = speed_tracker.lock().unwrap();
|
||||
@@ -829,8 +857,13 @@ impl<'a> ProgressAggregator<'a> {
|
||||
strategy_metadata: current_strategy_metadata,
|
||||
};
|
||||
|
||||
// Log progress details every 100MB
|
||||
if total_bytes_copied % (100 * 1024 * 1024) < bytes_value {
|
||||
// Log progress details every 100MB or on file completion
|
||||
if signal_value == u64::MAX {
|
||||
ctx.log(format!(
|
||||
"File completed. Total files: {}/{}, Total bytes: {}",
|
||||
files_completed_count, total_files, total_bytes_copied
|
||||
));
|
||||
} else if total_bytes_copied % (100 * 1024 * 1024) < bytes_value {
|
||||
ctx.log(format!(
|
||||
"Progress: {} / {} bytes ({:.1}%), {}/s, ETA: {}",
|
||||
total_bytes_copied,
|
||||
|
||||
@@ -10,6 +10,7 @@ use specta::Type;
|
||||
|
||||
/// Status of a file in the copy operation.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type, PartialEq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum CopyFileStatus {
|
||||
/// File is waiting to be copied
|
||||
Pending,
|
||||
|
||||
@@ -111,7 +111,7 @@ impl CopyStrategy for LocalMoveStrategy {
|
||||
fs::rename(source_path, dest_path).await?;
|
||||
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(size, size);
|
||||
callback(size, u64::MAX);
|
||||
}
|
||||
|
||||
ctx.log(format!(
|
||||
@@ -235,6 +235,11 @@ impl CopyStrategy for FastCopyStrategy {
|
||||
}
|
||||
}
|
||||
|
||||
// Signal file completion to aggregator
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(bytes_copied, u64::MAX);
|
||||
}
|
||||
|
||||
ctx.log(format!(
|
||||
"Fast copy: {} -> {} ({} bytes)",
|
||||
source_path.display(),
|
||||
@@ -752,6 +757,11 @@ impl RemoteTransferStrategy {
|
||||
final_dest_path.display()
|
||||
));
|
||||
|
||||
// Signal file completion to aggregator
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(total_bytes_received, u64::MAX);
|
||||
}
|
||||
|
||||
Ok(total_bytes_received)
|
||||
}
|
||||
}
|
||||
@@ -931,7 +941,7 @@ async fn copy_single_file_with_offset<'a>(
|
||||
dest_file.sync_all().await?;
|
||||
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(total_copied, file_size);
|
||||
callback(file_size, u64::MAX);
|
||||
ctx.log(format!(
|
||||
"Strategy final progress: {} / {} bytes (100%)",
|
||||
total_copied, file_size
|
||||
@@ -1305,5 +1315,10 @@ async fn stream_file_data<'a>(
|
||||
chunk_index, bytes_transferred, destination_device_id
|
||||
));
|
||||
|
||||
// Signal file completion to aggregator
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(bytes_transferred, u64::MAX);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
//! Test for monitoring copy progress with large files
|
||||
//! Test for monitoring copy progress with file-level tracking
|
||||
//!
|
||||
//! This test verifies that copy progress updates smoothly with byte-level
|
||||
//! granularity rather than jumping in large increments.
|
||||
//! This test verifies that:
|
||||
//! 1. File status metadata updates correctly (Pending -> Copying -> Completed)
|
||||
//! 2. Progress events show accurate file counts
|
||||
//! 3. Progress bar doesn't reset between files
|
||||
//!
|
||||
//! The test captures all events and queries metadata for post-mortem analysis.
|
||||
|
||||
use sd_core::domain::addressing::{SdPath, SdPathBatch};
|
||||
use sd_core::{
|
||||
@@ -9,7 +13,9 @@ use sd_core::{
|
||||
ops::files::copy::{action::FileCopyAction, input::CopyMethod, job::CopyOptions},
|
||||
Core,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
@@ -46,16 +52,55 @@ async fn create_large_test_file(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
/// Progress snapshot with full event details
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct ProgressSnapshot {
|
||||
timestamp: std::time::Instant,
|
||||
timestamp_ms: u128,
|
||||
percentage: f32,
|
||||
bytes_copied: u64,
|
||||
bytes_total: u64,
|
||||
files_copied: usize,
|
||||
files_total: usize,
|
||||
message: String,
|
||||
phase: String,
|
||||
}
|
||||
|
||||
/// Job metadata snapshot from query
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct MetadataSnapshot {
|
||||
timestamp_ms: u128,
|
||||
total_files: usize,
|
||||
file_statuses: HashMap<String, String>,
|
||||
}
|
||||
|
||||
/// Complete test snapshot for analysis
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct TestSnapshot {
|
||||
test_name: String,
|
||||
file_count: usize,
|
||||
file_size_mb: usize,
|
||||
copy_method: String,
|
||||
progress_events: Vec<ProgressSnapshot>,
|
||||
metadata_queries: Vec<MetadataSnapshot>,
|
||||
final_metadata: Option<serde_json::Value>,
|
||||
summary: TestSummary,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct TestSummary {
|
||||
total_progress_events: usize,
|
||||
total_metadata_queries: usize,
|
||||
min_percentage: f32,
|
||||
max_percentage: f32,
|
||||
percentage_jumps: Vec<f32>,
|
||||
max_jump: f32,
|
||||
files_completed_at_end: usize,
|
||||
test_passed: bool,
|
||||
failure_reason: Option<String>,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_copy_progress_monitoring_large_file() {
|
||||
async fn test_copy_progress_with_metadata_tracking() {
|
||||
// Initialize tracing subscriber for debug logs
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
@@ -65,6 +110,9 @@ async fn test_copy_progress_monitoring_large_file() {
|
||||
)
|
||||
.set_default();
|
||||
|
||||
println!("\n=== Copy Progress Metadata Test ===");
|
||||
println!("This test captures all progress events and metadata queries for analysis\n");
|
||||
|
||||
// Setup test environment
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let test_root = temp_dir.path();
|
||||
@@ -75,30 +123,26 @@ async fn test_copy_progress_monitoring_large_file() {
|
||||
fs::create_dir_all(&source_dir).await.unwrap();
|
||||
fs::create_dir_all(&dest_dir).await.unwrap();
|
||||
|
||||
// Create a large test file (500MB to ensure enough time for progress updates)
|
||||
let source_file = source_dir.join("large_test_file.bin");
|
||||
let file_size_mb = 500; // 500MB
|
||||
// Create 3 test files of 1GB each
|
||||
let file_size_mb = 1000;
|
||||
let file_count = 3;
|
||||
let mut source_files = Vec::new();
|
||||
|
||||
println!("Creating {}MB test file...", file_size_mb);
|
||||
create_large_test_file(&source_file, file_size_mb)
|
||||
.await
|
||||
.unwrap();
|
||||
for i in 1..=file_count {
|
||||
let file_path = source_dir.join(format!("test_file_{}.bin", i));
|
||||
println!("Creating test_file_{}.bin ({}MB)...", i, file_size_mb);
|
||||
create_large_test_file(&file_path, file_size_mb)
|
||||
.await
|
||||
.unwrap();
|
||||
source_files.push(file_path);
|
||||
}
|
||||
|
||||
// Verify file size
|
||||
let metadata = fs::metadata(&source_file).await.unwrap();
|
||||
let expected_size = (file_size_mb * 1024 * 1024) as u64;
|
||||
assert_eq!(
|
||||
metadata.len(),
|
||||
expected_size,
|
||||
"Test file should be exactly {}MB",
|
||||
file_size_mb
|
||||
);
|
||||
|
||||
// Initialize core with custom data directory
|
||||
println!("Initializing Spacedrive Core...");
|
||||
let core_data_dir = test_root.join("core_data");
|
||||
let core = Core::new(core_data_dir).await.unwrap();
|
||||
let core = Core::new(core_data_dir.clone()).await.unwrap();
|
||||
|
||||
// Create a test library
|
||||
println!("Creating test library...");
|
||||
let library = core
|
||||
.libraries
|
||||
.create_library("Progress Test Library", None, core.context.clone())
|
||||
@@ -109,326 +153,21 @@ async fn test_copy_progress_monitoring_large_file() {
|
||||
|
||||
// Create ActionManager
|
||||
let context = core.context.clone();
|
||||
let action_manager = ActionManager::new(context);
|
||||
let action_manager = ActionManager::new(context.clone());
|
||||
|
||||
// Build the copy action with the exact options from the CLI command
|
||||
// Build the copy action
|
||||
let copy_action = FileCopyAction {
|
||||
sources: SdPathBatch::new(vec![SdPath::local(source_file.clone())]),
|
||||
sources: SdPathBatch::new(
|
||||
source_files
|
||||
.iter()
|
||||
.map(|p| SdPath::local(p.clone()))
|
||||
.collect(),
|
||||
),
|
||||
destination: SdPath::local(dest_dir.clone()),
|
||||
options: CopyOptions {
|
||||
conflict_resolution: None,
|
||||
overwrite: true, // Bypass confirmation workflow in tests
|
||||
verify_checksum: true, // --verify
|
||||
preserve_timestamps: true, // --preserve-timestamps
|
||||
delete_after_copy: false,
|
||||
move_mode: None,
|
||||
copy_method: CopyMethod::Streaming, // --method streaming
|
||||
},
|
||||
on_conflict: None,
|
||||
};
|
||||
|
||||
// Dispatch the action directly via ActionManager (library-scoped)
|
||||
|
||||
// Setup progress monitoring
|
||||
let progress_snapshots = Arc::new(Mutex::new(Vec::new()));
|
||||
let progress_snapshots_clone = progress_snapshots.clone();
|
||||
let start_time = std::time::Instant::now();
|
||||
|
||||
// Subscribe to events BEFORE dispatching to avoid race condition
|
||||
let mut event_subscriber = core.events.subscribe();
|
||||
|
||||
// Execute the action
|
||||
println!("Starting copy operation...");
|
||||
let _job_handle = action_manager
|
||||
.dispatch_library(Some(library_id), copy_action)
|
||||
.await
|
||||
.expect("Action dispatch should succeed");
|
||||
|
||||
// Job ID will be read from first Job* event below
|
||||
let expected_size_clone = expected_size;
|
||||
let mut observed_job_id: Option<String> = None;
|
||||
|
||||
// Start monitoring task using EventBus
|
||||
let monitor_handle = tokio::spawn(async move {
|
||||
let mut last_progress = 0.0;
|
||||
let mut has_seen_progress = false;
|
||||
let mut event_count = 0;
|
||||
|
||||
while let Ok(event) = event_subscriber.recv().await {
|
||||
event_count += 1;
|
||||
|
||||
match event {
|
||||
Event::JobProgress {
|
||||
job_id: event_job_id,
|
||||
progress,
|
||||
message,
|
||||
..
|
||||
} => {
|
||||
if observed_job_id.is_none() {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let current_progress = progress * 100.0;
|
||||
|
||||
// Record snapshot if progress changed
|
||||
if (current_progress - last_progress).abs() > 0.01 {
|
||||
has_seen_progress = true;
|
||||
|
||||
let snapshot = ProgressSnapshot {
|
||||
timestamp: std::time::Instant::now(),
|
||||
percentage: current_progress as f32,
|
||||
bytes_copied: (expected_size_clone as f64 * (progress as f64)) as u64,
|
||||
message: message.unwrap_or_else(|| format!("{:.1}%", current_progress)),
|
||||
};
|
||||
|
||||
println!(
|
||||
"Progress: {:.1}% ({} MB)",
|
||||
current_progress,
|
||||
snapshot.bytes_copied / (1024 * 1024)
|
||||
);
|
||||
|
||||
progress_snapshots_clone.lock().unwrap().push(snapshot);
|
||||
last_progress = current_progress;
|
||||
}
|
||||
}
|
||||
Event::JobCompleted {
|
||||
job_id: event_job_id,
|
||||
..
|
||||
} => {
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
println!("Job completed! (after {} events)", event_count);
|
||||
println!("Final progress: {:.1}%", last_progress);
|
||||
|
||||
// Record final progress if we haven't seen any updates
|
||||
if !has_seen_progress && last_progress == 0.0 {
|
||||
let snapshot = ProgressSnapshot {
|
||||
timestamp: std::time::Instant::now(),
|
||||
percentage: 100.0,
|
||||
bytes_copied: expected_size_clone,
|
||||
message: "Final".to_string(),
|
||||
};
|
||||
progress_snapshots_clone.lock().unwrap().push(snapshot);
|
||||
}
|
||||
break;
|
||||
}
|
||||
Event::JobFailed {
|
||||
job_id: event_job_id,
|
||||
error,
|
||||
..
|
||||
} => {
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
println!("Job failed after {} events: {}", event_count, error);
|
||||
panic!("Job failed: {}", error);
|
||||
}
|
||||
Event::JobCancelled {
|
||||
job_id: event_job_id,
|
||||
..
|
||||
} => {
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
println!("Job was cancelled after {} events", event_count);
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
// Other events - continue monitoring
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
has_seen_progress
|
||||
});
|
||||
|
||||
// Wait for job completion with timeout (120s for large file on slower systems)
|
||||
let completion_result = timeout(Duration::from_secs(120), monitor_handle).await;
|
||||
|
||||
let has_seen_progress = match completion_result {
|
||||
Ok(Ok(has_progress)) => {
|
||||
println!("Monitoring completed successfully");
|
||||
has_progress
|
||||
}
|
||||
Ok(Err(e)) => panic!("Monitoring task failed: {}", e),
|
||||
Err(_) => panic!("Copy operation timed out after 120 seconds"),
|
||||
};
|
||||
|
||||
// Analyze progress snapshots
|
||||
let snapshots = progress_snapshots.lock().unwrap();
|
||||
println!("\n=== Progress Analysis ===");
|
||||
println!("Total snapshots captured: {}", snapshots.len());
|
||||
println!("Saw progress updates during copy: {}", has_seen_progress);
|
||||
|
||||
// First check if we got ANY progress updates at all
|
||||
if snapshots.is_empty() {
|
||||
panic!(
|
||||
"No progress updates were captured! Progress stayed at 0% throughout the entire copy operation. \
|
||||
This indicates the progress reporting is not working correctly."
|
||||
);
|
||||
}
|
||||
|
||||
// If we only got one snapshot at the end, that's also a problem
|
||||
if snapshots.len() == 1 && !has_seen_progress {
|
||||
panic!(
|
||||
"Only captured final progress update. Progress reporting did not work during the copy operation."
|
||||
);
|
||||
}
|
||||
|
||||
// With modern fast SSDs, progress updates happen every 50ms but files copy very quickly
|
||||
// Expecting at least 3-5 updates for a large file is reasonable
|
||||
if snapshots.len() < 3 {
|
||||
panic!(
|
||||
"Too few progress updates captured! Only {} snapshots for a {}MB file. \
|
||||
Expected at least a few progress updates throughout the operation.",
|
||||
snapshots.len(),
|
||||
file_size_mb
|
||||
);
|
||||
}
|
||||
|
||||
// Calculate progress increments
|
||||
let mut increments = Vec::new();
|
||||
for i in 1..snapshots.len() {
|
||||
let increment = snapshots[i].percentage - snapshots[i - 1].percentage;
|
||||
if increment > 0.0 {
|
||||
increments.push(increment);
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate statistics
|
||||
let avg_increment = increments.iter().sum::<f32>() / increments.len() as f32;
|
||||
let max_increment = increments.iter().cloned().fold(0.0f32, f32::max);
|
||||
let min_increment = increments.iter().cloned().fold(100.0f32, f32::min);
|
||||
|
||||
println!("Progress increments:");
|
||||
println!(" Average: {:.2}%", avg_increment);
|
||||
println!(" Maximum: {:.2}%", max_increment);
|
||||
println!(" Minimum: {:.2}%", min_increment);
|
||||
println!(" Total updates: {}", increments.len());
|
||||
|
||||
// Verify smooth progress (no large jumps)
|
||||
// For fast SSDs, progress updates happen every 50ms but files copy quickly
|
||||
// Accept up to 25% jumps since the actual granularity depends on I/O speed
|
||||
assert!(
|
||||
max_increment < 25.0,
|
||||
"Progress jumped by {:.1}% - should update reasonably smoothly (max 25%)",
|
||||
max_increment
|
||||
);
|
||||
|
||||
// Verify we got reasonable granularity
|
||||
// With fast SSDs, we may get fewer updates than expected
|
||||
assert!(
|
||||
snapshots.len() > 5,
|
||||
"Expected at least 5 progress updates for a {}MB file, got {}",
|
||||
file_size_mb,
|
||||
snapshots.len()
|
||||
);
|
||||
|
||||
// Verify file was copied successfully
|
||||
let dest_file = dest_dir.join("large_test_file.bin");
|
||||
assert!(dest_file.exists(), "Destination file should exist");
|
||||
|
||||
let dest_metadata = fs::metadata(&dest_file).await.unwrap();
|
||||
assert_eq!(
|
||||
dest_metadata.len(),
|
||||
expected_size,
|
||||
"Copied file size should match source"
|
||||
);
|
||||
|
||||
// Calculate effective copy speed
|
||||
let total_time = start_time.elapsed();
|
||||
let mb_per_second = (file_size_mb as f64) / total_time.as_secs_f64();
|
||||
println!("\nCopy performance: {:.1} MB/s", mb_per_second);
|
||||
|
||||
println!("\nCopy progress monitoring test passed!");
|
||||
println!(" - Progress updated smoothly with byte-level granularity");
|
||||
println!(" - No large progress jumps detected");
|
||||
println!(" - File copied successfully with checksum verification");
|
||||
|
||||
// Cleanup: shutdown Core to stop background services
|
||||
core.shutdown().await.unwrap();
|
||||
|
||||
// Explicitly drop Core to free resources
|
||||
drop(core);
|
||||
|
||||
// Give time for all async cleanup to complete (increased for slower systems)
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_copy_progress_multiple_files() {
|
||||
// Initialize tracing subscriber for debug logs
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.try_init();
|
||||
|
||||
// This test verifies progress tracking across multiple files
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let test_root = temp_dir.path();
|
||||
|
||||
let source_dir = test_root.join("source");
|
||||
let dest_dir = test_root.join("destination");
|
||||
fs::create_dir_all(&source_dir).await.unwrap();
|
||||
fs::create_dir_all(&dest_dir).await.unwrap();
|
||||
|
||||
// Create 4 files of different sizes (large enough to ensure progress is captured)
|
||||
let files = vec![
|
||||
("file1.bin", 100), // 100MB
|
||||
("file2.bin", 200), // 200MB
|
||||
("file3.bin", 150), // 150MB
|
||||
("file4.bin", 50), // 50MB
|
||||
];
|
||||
|
||||
let mut source_files = Vec::new();
|
||||
for (name, size_mb) in &files {
|
||||
let path = source_dir.join(name);
|
||||
println!("Creating {} ({}MB)...", name, size_mb);
|
||||
create_large_test_file(&path, *size_mb).await.unwrap();
|
||||
source_files.push(path);
|
||||
}
|
||||
|
||||
// Initialize core and library
|
||||
let core_data_dir = test_root.join("core_data");
|
||||
let core = Core::new(core_data_dir).await.unwrap();
|
||||
let library = core
|
||||
.libraries
|
||||
.create_library("Multi-file Progress Test", None, core.context.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let library_id = library.id();
|
||||
|
||||
let context = core.context.clone();
|
||||
let action_manager = ActionManager::new(context);
|
||||
|
||||
// Build copy action for multiple files
|
||||
let copy_action = FileCopyAction {
|
||||
sources: SdPathBatch::new(source_files.iter().cloned().map(SdPath::local).collect()),
|
||||
destination: SdPath::local(dest_dir.clone()),
|
||||
options: CopyOptions {
|
||||
conflict_resolution: None,
|
||||
overwrite: true, // Bypass confirmation workflow in tests
|
||||
verify_checksum: true,
|
||||
overwrite: true,
|
||||
verify_checksum: false, // Disable for speed
|
||||
preserve_timestamps: true,
|
||||
delete_after_copy: false,
|
||||
move_mode: None,
|
||||
@@ -437,154 +176,346 @@ async fn test_copy_progress_multiple_files() {
|
||||
on_conflict: None,
|
||||
};
|
||||
|
||||
// Dispatch the action directly via ActionManager (library-scoped)
|
||||
|
||||
// Setup progress monitoring
|
||||
// Setup data collection
|
||||
let progress_snapshots = Arc::new(Mutex::new(Vec::new()));
|
||||
let progress_snapshots_clone = progress_snapshots.clone();
|
||||
let metadata_snapshots = Arc::new(Mutex::new(Vec::new()));
|
||||
let progress_clone = progress_snapshots.clone();
|
||||
let metadata_clone = metadata_snapshots.clone();
|
||||
let library_clone = library.clone();
|
||||
let core_context_clone = core.context.clone();
|
||||
let start_time = std::time::Instant::now();
|
||||
|
||||
// Subscribe to events BEFORE dispatching to avoid race condition
|
||||
// Subscribe to events BEFORE dispatching
|
||||
let mut event_subscriber = core.events.subscribe();
|
||||
|
||||
// Execute the action
|
||||
println!("\nStarting multi-file copy operation...");
|
||||
let _job_handle = action_manager
|
||||
.dispatch_library(Some(library_id), copy_action)
|
||||
.await
|
||||
.expect("Action dispatch should succeed");
|
||||
let mut observed_job_id: Option<String> = None;
|
||||
// Start monitoring task BEFORE dispatching to avoid missing events
|
||||
let (job_id_tx, job_id_rx) = tokio::sync::oneshot::channel::<sd_core::infra::job::types::JobId>();
|
||||
|
||||
let monitor_handle = tokio::spawn(async move {
|
||||
let mut last_progress = 0.0;
|
||||
// Wait for job ID to be sent
|
||||
let job_id = job_id_rx.await.expect("Job ID should be sent");
|
||||
let mut event_count = 0;
|
||||
let mut metadata_query_count = 0;
|
||||
|
||||
while let Ok(event) = event_subscriber.recv().await {
|
||||
match event {
|
||||
Event::JobProgress {
|
||||
job_id: event_job_id,
|
||||
progress,
|
||||
..
|
||||
} => {
|
||||
if observed_job_id.is_none() {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
println!("Monitor task started, listening for job {}", job_id);
|
||||
|
||||
loop {
|
||||
// Wait for next event with timeout for metadata queries
|
||||
match tokio::time::timeout(Duration::from_millis(500), event_subscriber.recv()).await {
|
||||
Ok(Ok(event)) => {
|
||||
event_count += 1;
|
||||
|
||||
// Debug ALL events
|
||||
println!("[DEBUG] Event #{}: {:?}", event_count, event);
|
||||
|
||||
match event {
|
||||
Event::JobProgress {
|
||||
job_id: event_job_id,
|
||||
generic_progress,
|
||||
progress,
|
||||
message,
|
||||
..
|
||||
} => {
|
||||
if event_job_id != job_id.to_string() {
|
||||
continue;
|
||||
}
|
||||
|
||||
println!(
|
||||
"[DEBUG] JobProgress event: progress={}, message={:?}, generic_progress.is_some()={}",
|
||||
progress,
|
||||
message,
|
||||
generic_progress.is_some()
|
||||
);
|
||||
|
||||
if let Some(gen_progress) = generic_progress {
|
||||
let snapshot = ProgressSnapshot {
|
||||
timestamp_ms: start_time.elapsed().as_millis(),
|
||||
percentage: gen_progress.percentage,
|
||||
bytes_copied: gen_progress
|
||||
.completion
|
||||
.bytes_completed
|
||||
.unwrap_or(0),
|
||||
bytes_total: gen_progress.completion.total_bytes.unwrap_or(0),
|
||||
files_copied: gen_progress.completion.completed as usize,
|
||||
files_total: gen_progress.completion.total as usize,
|
||||
message: gen_progress.message.clone(),
|
||||
phase: gen_progress.phase.clone(),
|
||||
};
|
||||
|
||||
println!(
|
||||
"[{:>6}ms] Progress: {:.1}% | Files: {}/{} | Bytes: {}/{} {}",
|
||||
snapshot.timestamp_ms,
|
||||
snapshot.percentage * 100.0,
|
||||
snapshot.files_copied,
|
||||
snapshot.files_total,
|
||||
snapshot.bytes_copied / (1024 * 1024),
|
||||
snapshot.bytes_total / (1024 * 1024),
|
||||
snapshot.phase
|
||||
);
|
||||
|
||||
progress_clone.lock().unwrap().push(snapshot);
|
||||
}
|
||||
}
|
||||
Event::JobCompleted {
|
||||
job_id: event_job_id,
|
||||
..
|
||||
} => {
|
||||
if event_job_id != job_id.to_string() {
|
||||
continue;
|
||||
}
|
||||
println!("\n✓ Job completed after {} events", event_count);
|
||||
break;
|
||||
}
|
||||
Event::JobFailed {
|
||||
job_id: event_job_id,
|
||||
error,
|
||||
..
|
||||
} => {
|
||||
if event_job_id != job_id.to_string() {
|
||||
continue;
|
||||
}
|
||||
println!("\n✗ Job failed: {}", error);
|
||||
panic!("Job failed: {}", error);
|
||||
}
|
||||
_ => {
|
||||
// Other events
|
||||
}
|
||||
}
|
||||
let current_progress = progress * 100.0;
|
||||
}
|
||||
Ok(Err(_)) => {
|
||||
// Channel closed
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout - query metadata
|
||||
if metadata_query_count < 20 {
|
||||
// Limit queries
|
||||
use sd_core::infra::query::LibraryQuery;
|
||||
|
||||
if (current_progress - last_progress).abs() > 0.01 {
|
||||
let snapshot = ProgressSnapshot {
|
||||
timestamp: std::time::Instant::now(),
|
||||
percentage: current_progress as f32,
|
||||
bytes_copied: 0, // Would need to calculate from percentage
|
||||
message: format!("{:.1}%", current_progress),
|
||||
let query_input = sd_core::ops::jobs::copy_metadata::query::CopyMetadataQueryInput {
|
||||
job_id: job_id.into(),
|
||||
};
|
||||
|
||||
println!("Multi-file progress: {:.1}%", current_progress);
|
||||
progress_snapshots_clone.lock().unwrap().push(snapshot);
|
||||
last_progress = current_progress;
|
||||
}
|
||||
}
|
||||
Event::JobCompleted {
|
||||
job_id: event_job_id,
|
||||
..
|
||||
} => {
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
let query =
|
||||
sd_core::ops::jobs::copy_metadata::query::CopyMetadataQuery::from_input(
|
||||
query_input,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut session = sd_core::infra::api::SessionContext::device_session(
|
||||
uuid::Uuid::new_v4(),
|
||||
"test-device".to_string(),
|
||||
);
|
||||
session.current_library_id = Some(library_clone.id());
|
||||
|
||||
match query.execute(core_context_clone.clone(), session).await {
|
||||
Ok(result) => {
|
||||
if let Some(metadata) = result.metadata {
|
||||
let mut file_statuses = HashMap::new();
|
||||
for file in &metadata.files {
|
||||
let status = format!("{:?}", file.status);
|
||||
let name = file
|
||||
.source_path
|
||||
.path()
|
||||
.and_then(|p| {
|
||||
p.file_name()
|
||||
.map(|n| n.to_string_lossy().to_string())
|
||||
})
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
file_statuses.insert(name, status);
|
||||
}
|
||||
|
||||
let snapshot = MetadataSnapshot {
|
||||
timestamp_ms: start_time.elapsed().as_millis(),
|
||||
total_files: metadata.files.len(),
|
||||
file_statuses,
|
||||
};
|
||||
|
||||
metadata_clone.lock().unwrap().push(snapshot);
|
||||
metadata_query_count += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Metadata query failed: {}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
println!("Multi-file job completed");
|
||||
break;
|
||||
}
|
||||
Event::JobFailed {
|
||||
job_id: event_job_id,
|
||||
error,
|
||||
..
|
||||
} => {
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
panic!("Multi-file job failed: {}", error);
|
||||
}
|
||||
Event::JobCancelled {
|
||||
job_id: event_job_id,
|
||||
..
|
||||
} => {
|
||||
if let Some(ref jid) = observed_job_id {
|
||||
if &event_job_id != jid {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
observed_job_id = Some(event_job_id.clone());
|
||||
}
|
||||
println!("Multi-file job was cancelled");
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
// Other events - continue monitoring
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(event_count, metadata_query_count)
|
||||
});
|
||||
|
||||
timeout(Duration::from_secs(60), monitor_handle)
|
||||
// NOW dispatch the job (monitor is already listening)
|
||||
println!("\nStarting copy operation for {} files...\n", file_count);
|
||||
let job_handle = action_manager
|
||||
.dispatch_library(Some(library_id), copy_action)
|
||||
.await
|
||||
.expect("Multi-file copy should complete within 60 seconds")
|
||||
.expect("Monitor task should succeed");
|
||||
.expect("Action dispatch should succeed");
|
||||
|
||||
// Analyze progress
|
||||
let snapshots = progress_snapshots.lock().unwrap();
|
||||
println!("\n=== Multi-file Progress Analysis ===");
|
||||
println!("Total snapshots: {}", snapshots.len());
|
||||
let job_id = job_handle.id;
|
||||
|
||||
// Must have captured at least some progress updates during the copy
|
||||
assert!(
|
||||
snapshots.len() >= 3,
|
||||
"Expected at least 3 progress snapshots for multi-file copy, got {}. \
|
||||
Progress tracking may not be working properly.",
|
||||
snapshots.len()
|
||||
// Send job ID to monitoring task
|
||||
job_id_tx.send(job_id).expect("Monitor task should be running");
|
||||
|
||||
// Wait for job completion with timeout
|
||||
let (event_count, metadata_query_count) =
|
||||
match timeout(Duration::from_secs(60), monitor_handle).await {
|
||||
Ok(Ok(result)) => result,
|
||||
Ok(Err(e)) => panic!("Monitoring task failed: {}", e),
|
||||
Err(_) => panic!("Copy operation timed out after 60 seconds"),
|
||||
};
|
||||
|
||||
println!("\n=== Data Collection Summary ===");
|
||||
println!("Progress events captured: {}", event_count);
|
||||
println!("Metadata queries executed: {}", metadata_query_count);
|
||||
|
||||
// Query final metadata state
|
||||
println!("\nQuerying final job metadata...");
|
||||
use sd_core::infra::query::LibraryQuery;
|
||||
let query_input =
|
||||
sd_core::ops::jobs::copy_metadata::query::CopyMetadataQueryInput { job_id: job_id.into() };
|
||||
let query =
|
||||
sd_core::ops::jobs::copy_metadata::query::CopyMetadataQuery::from_input(query_input)
|
||||
.unwrap();
|
||||
let mut session = sd_core::infra::api::SessionContext::device_session(
|
||||
uuid::Uuid::new_v4(),
|
||||
"test-device".to_string(),
|
||||
);
|
||||
session.current_library_id = Some(library_id);
|
||||
let final_ctx = context.clone();
|
||||
let final_metadata_result = query.execute(final_ctx, session).await.unwrap();
|
||||
let final_metadata_json = serde_json::to_value(&final_metadata_result).unwrap();
|
||||
|
||||
// With 4 files totaling 500MB, we should see smooth progress
|
||||
// not 4 discrete 25% jumps
|
||||
let mut increments = Vec::new();
|
||||
for i in 1..snapshots.len() {
|
||||
let increment = snapshots[i].percentage - snapshots[i - 1].percentage;
|
||||
if increment > 0.0 {
|
||||
increments.push(increment);
|
||||
// Analyze collected data
|
||||
let progress_snapshots = progress_snapshots.lock().unwrap().clone();
|
||||
let metadata_snapshots = metadata_snapshots.lock().unwrap().clone();
|
||||
|
||||
// Calculate statistics
|
||||
let percentages: Vec<f32> = progress_snapshots.iter().map(|s| s.percentage).collect();
|
||||
let min_percentage = percentages.iter().cloned().fold(1.0f32, f32::min);
|
||||
let max_percentage = percentages.iter().cloned().fold(0.0f32, f32::max);
|
||||
|
||||
let mut percentage_jumps = Vec::new();
|
||||
for i in 1..percentages.len() {
|
||||
let jump = (percentages[i] - percentages[i - 1]) * 100.0;
|
||||
if jump > 0.0 {
|
||||
percentage_jumps.push(jump);
|
||||
}
|
||||
}
|
||||
let max_jump = percentage_jumps.iter().cloned().fold(0.0f32, f32::max);
|
||||
|
||||
if !increments.is_empty() {
|
||||
let max_increment = increments.iter().cloned().fold(0.0f32, f32::max);
|
||||
println!("Maximum progress increment: {:.2}%", max_increment);
|
||||
let files_completed_at_end = if let Some(last_snapshot) = progress_snapshots.last() {
|
||||
last_snapshot.files_copied
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
// Should have reasonable progress granularity
|
||||
// With fast SSDs, individual files copy quickly, so accept larger jumps
|
||||
assert!(
|
||||
max_increment < 30.0,
|
||||
"Progress should update reasonably across files, not jump by {:.1}%",
|
||||
max_increment
|
||||
let test_passed = files_completed_at_end == file_count
|
||||
&& max_percentage >= 0.99
|
||||
&& max_jump < 50.0;
|
||||
|
||||
let failure_reason = if !test_passed {
|
||||
if files_completed_at_end != file_count {
|
||||
Some(format!(
|
||||
"Expected {} files completed, got {}",
|
||||
file_count, files_completed_at_end
|
||||
))
|
||||
} else if max_percentage < 0.99 {
|
||||
Some(format!(
|
||||
"Progress never reached 100% (max: {:.1}%)",
|
||||
max_percentage * 100.0
|
||||
))
|
||||
} else {
|
||||
Some(format!(
|
||||
"Progress bar reset detected (max jump: {:.1}%)",
|
||||
max_jump
|
||||
))
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let snapshot = TestSnapshot {
|
||||
test_name: "copy_progress_with_metadata_tracking".to_string(),
|
||||
file_count,
|
||||
file_size_mb,
|
||||
copy_method: "Streaming".to_string(),
|
||||
progress_events: progress_snapshots,
|
||||
metadata_queries: metadata_snapshots,
|
||||
final_metadata: Some(final_metadata_json),
|
||||
summary: TestSummary {
|
||||
total_progress_events: event_count,
|
||||
total_metadata_queries: metadata_query_count,
|
||||
min_percentage,
|
||||
max_percentage,
|
||||
percentage_jumps,
|
||||
max_jump,
|
||||
files_completed_at_end,
|
||||
test_passed,
|
||||
failure_reason,
|
||||
},
|
||||
};
|
||||
|
||||
// Save snapshot if enabled
|
||||
if std::env::var("SD_TEST_SNAPSHOTS").is_ok() {
|
||||
let snapshot_dir = dirs::data_local_dir()
|
||||
.unwrap()
|
||||
.join("spacedrive")
|
||||
.join("test_snapshots")
|
||||
.join("copy_progress_test")
|
||||
.join(chrono::Local::now().format("%Y%m%d_%H%M%S").to_string());
|
||||
|
||||
fs::create_dir_all(&snapshot_dir).await.unwrap();
|
||||
|
||||
let snapshot_path = snapshot_dir.join("test_snapshot.json");
|
||||
let snapshot_json = serde_json::to_string_pretty(&snapshot).unwrap();
|
||||
fs::write(&snapshot_path, snapshot_json).await.unwrap();
|
||||
|
||||
println!("\n📸 Snapshot saved to: {}", snapshot_path.display());
|
||||
} else {
|
||||
// Always write to temp dir for local inspection
|
||||
let temp_snapshot_path = test_root.join("test_snapshot.json");
|
||||
let snapshot_json = serde_json::to_string_pretty(&snapshot).unwrap();
|
||||
fs::write(&temp_snapshot_path, snapshot_json)
|
||||
.await
|
||||
.unwrap();
|
||||
println!(
|
||||
"\n📄 Snapshot written to temp: {}",
|
||||
temp_snapshot_path.display()
|
||||
);
|
||||
println!(" (Set SD_TEST_SNAPSHOTS=1 to save to permanent location)");
|
||||
}
|
||||
|
||||
println!("\nMulti-file progress monitoring test passed!");
|
||||
// Print summary
|
||||
println!("\n=== Test Summary ===");
|
||||
println!("Files completed: {}/{}", files_completed_at_end, file_count);
|
||||
println!(
|
||||
"Progress range: {:.1}% - {:.1}%",
|
||||
min_percentage * 100.0,
|
||||
max_percentage * 100.0
|
||||
);
|
||||
println!("Max progress jump: {:.1}%", max_jump);
|
||||
println!("Test passed: {}", test_passed);
|
||||
if let Some(reason) = &snapshot.summary.failure_reason {
|
||||
println!("Failure reason: {}", reason);
|
||||
}
|
||||
|
||||
// Cleanup: shutdown Core to stop background services
|
||||
// Cleanup
|
||||
core.shutdown().await.unwrap();
|
||||
|
||||
// Explicitly drop Core to free resources
|
||||
drop(core);
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
// Give time for all async cleanup to complete (increased for slower systems)
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||
// Assert test passed
|
||||
if !test_passed {
|
||||
panic!(
|
||||
"Test failed: {}",
|
||||
snapshot
|
||||
.summary
|
||||
.failure_reason
|
||||
.unwrap_or_else(|| "Unknown failure".to_string())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { Check, Circle, X, Spinner } from "@phosphor-icons/react";
|
||||
import { CheckCircle, Circle, X, Spinner } from "@phosphor-icons/react";
|
||||
import clsx from "clsx";
|
||||
import { useEffect, useRef } from "react";
|
||||
import { useLibraryQuery } from "../../../contexts/SpacedriveContext";
|
||||
import type { JobListItem } from "../types";
|
||||
import type { SpeedSample } from "../hooks/useJobs";
|
||||
@@ -14,13 +15,24 @@ interface CopyJobDetailsProps {
|
||||
|
||||
export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {
|
||||
const generic = job.generic_progress;
|
||||
const scrollContainerRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
// Fetch copy metadata (file queue with File objects)
|
||||
const { data: metadata } = useLibraryQuery({
|
||||
const { data: metadata, refetch } = useLibraryQuery({
|
||||
type: "jobs.get_copy_metadata",
|
||||
input: { job_id: job.id },
|
||||
});
|
||||
|
||||
// Refetch when completed count changes
|
||||
const prevCompletedRef = useRef<number>(0);
|
||||
useEffect(() => {
|
||||
const currentCompleted = generic?.completion?.completed || 0;
|
||||
if (currentCompleted !== prevCompletedRef.current) {
|
||||
prevCompletedRef.current = currentCompleted;
|
||||
refetch();
|
||||
}
|
||||
}, [generic?.completion?.completed, refetch]);
|
||||
|
||||
if (!generic) {
|
||||
return (
|
||||
<div className="p-4 text-xs text-ink-faint">
|
||||
@@ -38,6 +50,25 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {
|
||||
fileMap.set(file.id, file);
|
||||
});
|
||||
|
||||
// Auto-scroll to center the currently copying file
|
||||
useEffect(() => {
|
||||
const currentFileIndex = files.findIndex(f => f.status === "copying");
|
||||
if (currentFileIndex !== -1 && scrollContainerRef.current) {
|
||||
const container = scrollContainerRef.current;
|
||||
const fileElements = container.children;
|
||||
const currentElement = fileElements[currentFileIndex] as HTMLElement;
|
||||
|
||||
if (currentElement) {
|
||||
const containerHeight = container.clientHeight;
|
||||
const elementTop = currentElement.offsetTop;
|
||||
const elementHeight = currentElement.clientHeight;
|
||||
const scrollTop = elementTop - containerHeight / 2 + elementHeight / 2;
|
||||
|
||||
container.scrollTo({ top: scrollTop, behavior: "smooth" });
|
||||
}
|
||||
}
|
||||
}, [files]);
|
||||
|
||||
return (
|
||||
<div className="p-4 space-y-4">
|
||||
{/* Speed graph */}
|
||||
@@ -47,7 +78,10 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {
|
||||
{files.length > 0 && (
|
||||
<div>
|
||||
<div className="text-xs font-medium text-ink mb-2">Transfer Queue</div>
|
||||
<div className="space-y-0 max-h-[200px] overflow-y-auto border border-app-line rounded-lg">
|
||||
<div
|
||||
ref={scrollContainerRef}
|
||||
className="space-y-0 max-h-[200px] overflow-y-auto border border-app-line rounded-lg"
|
||||
>
|
||||
{files.map((file, index) => {
|
||||
const fileObj = file.entry_id ? fileMap.get(file.entry_id) : null;
|
||||
const isEven = index % 2 === 0;
|
||||
@@ -56,15 +90,16 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {
|
||||
<div
|
||||
key={index}
|
||||
className={clsx(
|
||||
"flex items-center gap-3 px-3 py-2",
|
||||
"flex items-center gap-3 px-3 py-2 transition-opacity",
|
||||
isEven ? "bg-app/30" : "bg-app/10",
|
||||
index === 0 && "rounded-t-lg",
|
||||
index === files.length - 1 && "rounded-b-lg"
|
||||
index === files.length - 1 && "rounded-b-lg",
|
||||
file.status === "completed" && "opacity-50"
|
||||
)}
|
||||
>
|
||||
{/* Status icon */}
|
||||
{file.status === "completed" ? (
|
||||
<Check size={14} weight="bold" className="text-green-500 flex-shrink-0" />
|
||||
<CheckCircle size={14} weight="fill" className="text-ink-dull flex-shrink-0" />
|
||||
) : file.status === "copying" ? (
|
||||
<Spinner size={14} className="text-accent animate-spin flex-shrink-0" />
|
||||
) : file.status === "failed" ? (
|
||||
@@ -89,7 +124,7 @@ export function CopyJobDetails({ job, speedHistory }: CopyJobDetailsProps) {
|
||||
<div className="flex-1 min-w-0">
|
||||
<div className={clsx(
|
||||
"text-xs font-medium truncate",
|
||||
file.status === "completed" && "text-ink-dull line-through",
|
||||
file.status === "completed" && "text-ink-dull",
|
||||
file.status === "copying" && "text-ink",
|
||||
file.status === "failed" && "text-red-400",
|
||||
file.status === "skipped" && "text-ink-dull",
|
||||
|
||||
@@ -51,6 +51,7 @@ export function useJobManager() {
|
||||
return {
|
||||
...job,
|
||||
progress: progressData.progress,
|
||||
generic_progress: generic,
|
||||
...(generic && {
|
||||
current_phase: generic.phase,
|
||||
current_path: generic.current_path,
|
||||
|
||||
Reference in New Issue
Block a user