Remove obsolete job system documentation and related test files

- Deleted the `fix_job_system.md` and `JOB_CLEANUP_FIX.md` files, which contained outdated information regarding job system issues and fixes.
- Removed associated test files that were no longer relevant to the current implementation of the job management system.
- This cleanup helps streamline the documentation and testing structure, ensuring that only current and relevant information is maintained.

These changes aim to improve project organization and clarity by eliminating obsolete content.
Author: Jamie Pine
Date: 2025-07-13 19:30:35 -07:00
parent 3416006cf3
commit 907229c8a0
15 changed files with 850 additions and 1484 deletions

@@ -1,46 +0,0 @@
# Job Manager Memory Cleanup Fix
## Problem
The job manager was not removing completed jobs from the `running_jobs` HashMap, causing:
- Memory leak as the map only grew over time
- Stale status being reported (jobs showing as "Running" even when database showed "Completed")
- Race condition where in-memory status took precedence over database status
## Root Cause
In `JobExecutor::run()`, when a job completed:
1. The database status was updated to "Completed" ✓
2. The status channel was updated ✓
3. But the job was never removed from `JobManager.running_jobs`
## Solution
Added cleanup tasks that monitor job status changes and remove jobs from `running_jobs` when they complete, fail, or are cancelled.
### Changes Made
1. **Job Dispatch Cleanup** (lines 256-301)
- Added a monitoring task that subscribes to status changes
- Removes job from `running_jobs` on terminal states (Completed/Failed/Cancelled)
- Emits appropriate events (JobCompleted, JobFailed, JobCancelled)
2. **Job Dispatch with Priority Cleanup** (lines 482-527)
- Same cleanup logic for the priority dispatch method
3. **Resumed Jobs Cleanup** (lines 920-965)
- Added cleanup for jobs that are resumed from interruption
- Ensures resumed jobs are also properly cleaned up
4. **Event Emission**
- Now properly emits JobCompleted, JobFailed, and JobCancelled events
- These events were defined but never actually emitted before
## Testing
Created `examples/test_job_cleanup.rs` to verify:
- Jobs are removed from memory when they complete
- Database and in-memory status stay synchronized
- No memory leaks from accumulating completed jobs
## Benefits
- Fixes memory leak in long-running systems
- Ensures accurate job status reporting
- Eliminates race condition between database and memory state
- Properly emits job lifecycle events for monitoring

@@ -1,122 +0,0 @@
use sd_core_new::{
context::CoreContext,
infrastructure::jobs::{Job, JobHandler, JobInfo, JobOutput, JobResult, Progress},
library::{Library, LibraryConfig, LibraryManager},
};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TestJob {
duration_secs: u64,
}
#[sd_core_new::infrastructure::jobs::job]
impl Job for TestJob {
const NAME: &'static str = "test_job";
async fn run(
mut self,
ctx: sd_core_new::infrastructure::jobs::context::JobContext,
) -> JobResult<JobOutput> {
info!("Test job started, will run for {} seconds", self.duration_secs);
// Simulate work with progress updates
for i in 0..self.duration_secs {
ctx.progress(Progress::percentage(i as f32 / self.duration_secs as f32)).await;
sleep(Duration::from_secs(1)).await;
}
ctx.progress(Progress::percentage(1.0)).await;
info!("Test job completed");
Ok(JobOutput::Empty)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
// Create temp directory for testing
let temp_dir = tempfile::tempdir()?;
let data_dir = temp_dir.path().to_path_buf();
// Initialize core context
let context = Arc::new(CoreContext::new(data_dir.clone()).await?);
// Create a test library
let library_manager = context.library_manager.clone();
let library_config = LibraryConfig {
name: "Test Library".to_string(),
description: Some("Test library for job cleanup".to_string()),
};
let library = library_manager.create_library(library_config).await?;
info!("Created library: {}", library.id);
// Get the job manager
let job_manager = library.job_manager.clone();
// Dispatch a test job
let test_job = TestJob { duration_secs: 3 };
let handle = job_manager.dispatch(test_job).await?;
let job_id = handle.id;
info!("Dispatched job: {}", job_id);
// Monitor the job status
let mut last_status = None;
for i in 0..10 {
sleep(Duration::from_secs(1)).await;
// Check if job is in running_jobs
let running_jobs = job_manager.list_running_jobs().await;
let in_memory = running_jobs.iter().any(|j| j.id == job_id.0);
// Check database status
let job_info = job_manager.get_job_info(job_id.0).await?;
if let Some(info) = &job_info {
let status_changed = last_status.as_ref() != Some(&info.status);
if status_changed {
info!(
"Job {} status: {:?}, progress: {:.1}%, in_memory: {}",
job_id,
info.status,
info.progress * 100.0,
in_memory
);
last_status = Some(info.status.clone());
}
// Check if job completed
if matches!(info.status, sd_core_new::infrastructure::jobs::types::JobStatus::Completed) {
info!("Job completed! Checking if removed from memory...");
sleep(Duration::from_millis(100)).await; // Give cleanup task time to run
let running_jobs_after = job_manager.list_running_jobs().await;
let still_in_memory = running_jobs_after.iter().any(|j| j.id == job_id.0);
if still_in_memory {
eprintln!("ERROR: Job {} is still in running_jobs map after completion!", job_id);
} else {
info!("SUCCESS: Job {} was properly removed from running_jobs map", job_id);
}
break;
}
}
}
// Final check
let final_running = job_manager.list_running_jobs().await;
info!("Final running jobs count: {}", final_running.len());
Ok(())
}

@@ -1,48 +0,0 @@
# Job System Issues Fix Summary
## Issues Identified
1. **Duplicate Job Creation**: Two indexer jobs are created for one location add
2. **Progress Always Shows 0%**: Jobs complete but progress remains at 0%
3. **Status Shows "Queued"**: Completed jobs show as "Queued" instead of "Completed"
4. **Non-linear Progress**: Progress jumps around instead of increasing smoothly
## Root Causes
### 1. Duplicate Jobs
- The CLI might be sending duplicate requests
- Or there's a race condition in job creation
- Need to add deduplication logic
### 2. Progress Not Updating
- Progress forwarding is working in JobManager
- But jobs might be completing too fast for progress to be captured
- Need to ensure final progress is always saved
### 3. Status Issues
- Jobs complete quickly but status might not be persisted properly
- Need better status synchronization
### 4. Non-linear Progress
- Already fixed by implementing phase-based progress ranges
- Discovery: 5-10%, Processing: 20-60%, Content: 70-98%, Finalizing: 99%
## Fixes Implemented
1. **Progress Persistence**: Added batched progress updates to database
2. **Status Synchronization**: Final progress saved with status updates
3. **Unified Query System**: Combined memory and database queries
4. **Progress Calculation**: Fixed phase-based progress calculations
## Additional Fixes Needed
1. **Deduplication**: Add request ID or check for existing jobs before creating new ones
2. **Minimum Job Duration**: Ensure jobs run long enough to be monitored
3. **Better Error Handling**: Log all job state transitions
## Testing
After applying fixes:
- Run `sd location add` and check for duplicate jobs with `sd job list`
- Monitor with `sd job monitor` to see real-time progress
- Verify completed jobs show 100% and "Completed" status

@@ -1,219 +0,0 @@
# Spacedrive Core Integration Tests
This directory contains integration tests for Spacedrive Core v2, focusing on end-to-end functionality and real-world usage scenarios.
## Test Files
### `cli_pairing_integration.rs`
**Complete CLI pairing workflow testing**
Tests the full device pairing functionality that would be used by the CLI:
- Two Core instances simulating different devices
- Pairing code generation and joining
- Automatic device registration
- Persistent connections across restarts
- Error handling and edge cases
- Session management APIs
**Key Tests:**
- `test_cli_pairing_full_workflow` - Complete pairing between two devices
- `test_cli_pairing_error_conditions` - Error handling and invalid inputs
- `test_cli_pairing_session_management` - Session lifecycle management
### `integration_networking.rs`
**Basic networking functionality**
Tests core networking initialization and basic functionality:
- Core networking initialization
- Device pairing integration
- Spacedrop API integration
- Networking service features
### Other Integration Tests
- `cas_generation_test.rs` - Content addressable storage testing
- `file_copy_integration_test.rs` - File operations testing
- `copy_progress_monitor_test.rs` - **Copy progress monitoring with large files**
- Tests smooth byte-level progress updates for 1GB file copies
- Verifies progress doesn't jump in large increments
- Tests multi-file copy progress aggregation
- `copy_progress_quick_test.rs` - **Quick copy progress verification**
- Faster variant using smaller files (50MB)
- Tests directory copy progress
- Suitable for rapid iteration during development
- `job_registration_test.rs` - Job system testing
- `job_system_test.rs` - Advanced job system functionality
- `library_test.rs` - Library management testing
- `volume_test.rs` - Volume detection and management
## Running Tests
### Run All Integration Tests
```bash
cargo test --tests
```
### Run Specific Test File
```bash
# CLI pairing tests
cargo test cli_pairing_integration
# Networking tests
cargo test integration_networking
# CAS tests
cargo test cas_generation_test
# Copy progress monitoring tests
cargo test copy_progress_monitor_test
# Quick progress tests (faster)
cargo test copy_progress_quick_test
```
### Run Specific Test Function
```bash
# Full CLI pairing workflow
cargo test test_cli_pairing_full_workflow
# Error condition testing
cargo test test_cli_pairing_error_conditions
```
### Debug Mode with Logging
```bash
# Show all output and enable debug logging
RUST_LOG=debug cargo test cli_pairing_integration -- --nocapture
# Show output for specific networking components
RUST_LOG=sd_core_new::networking=debug cargo test cli_pairing_integration -- --nocapture
# Show libp2p debug logs
RUST_LOG=libp2p_swarm=debug,sd_core_new::networking::pairing::protocol=debug cargo test cli_pairing_integration -- --nocapture
```
### Single-threaded Testing
```bash
# Run tests one at a time (useful for networking tests)
cargo test cli_pairing_integration -- --test-threads=1
```
## Test Environment Notes
### CLI Pairing Tests
- Creates temporary directories for test data
- Tests real libp2p networking (may be slow in CI)
- Includes timeout handling for network operations
- Tests persistence across Core restarts
- Cleans up temporary files automatically
### Network-dependent Tests
Some tests require actual network functionality:
- May be slower in CI environments
- May timeout if network discovery fails
- Include graceful fallbacks for network issues
### Local Development
For faster iteration during development:
```bash
# Run only fast tests (filter out the slow CLI pairing tests by name;
# note that `cargo test --exclude` only excludes workspace packages)
cargo test --tests -- --skip cli_pairing
# Run CLI pairing tests with shorter timeouts
RUST_LOG=info cargo test test_cli_pairing_error_conditions -- --nocapture
# Run quick copy progress test instead of the full 1GB test
cargo test test_copy_progress_quick -- --nocapture
# Run with debug logging to see detailed progress updates
RUST_LOG=debug cargo test copy_progress_monitor_test -- --nocapture
```
### Copy Progress Tests
The copy progress tests verify smooth byte-level progress updates:
- **Full test** (`copy_progress_monitor_test`): Uses 1GB files, takes 1-2 minutes
- **Quick test** (`copy_progress_quick_test`): Uses 50MB files, takes ~10 seconds
Both tests verify that progress updates smoothly rather than jumping in large increments (e.g., 25% at a time for 4 files).
## Test Data
Tests create temporary directories in the system temp directory:
- Pattern: `/tmp/test-{test-name}-{uuid}`
- Automatically cleaned up after tests
- Safe to delete manually if tests are interrupted
## Debugging Failed Tests
### Network Issues
```bash
# Check if networking is working
RUST_LOG=libp2p=debug cargo test test_cli_pairing_full_workflow -- --nocapture
# Test with extended timeouts
RUST_TEST_TIMEOUT=120 cargo test cli_pairing_integration
```
### Permission Issues
```bash
# Ensure temp directory is writable
ls -la /tmp/
# Check if ports are available
netstat -ln | grep 52063
```
### Cleanup Issues
```bash
# Clean up any remaining test directories
rm -rf /tmp/test-*
```
## Contributing
When adding new integration tests:
1. **Follow the naming convention**: `{feature}_integration.rs`
2. **Include comprehensive documentation** in the file header
3. **Add cleanup code** for any resources created
4. **Handle timeouts gracefully** for network operations
5. **Test both success and error conditions**
6. **Update this README** with new test descriptions
### Example Test Structure
```rust
#[tokio::test]
async fn test_new_feature_integration() {
// Setup
let temp_dir = std::env::temp_dir().join(format!("test-feature-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).unwrap();
// Test logic
// ...
// Cleanup
std::fs::remove_dir_all(&temp_dir).ok();
}
```

@@ -1,136 +0,0 @@
//! Test copy action dispatch without full job execution
//!
//! This test verifies that the action system can properly dispatch copy actions
//! and validate them correctly.
use sd_core_new::{
operations::files::copy::{
action::{FileCopyAction, FileCopyHandler},
job::CopyOptions,
},
infrastructure::actions::{
Action,
handler::ActionHandler,
},
};
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::fs;
use uuid::Uuid;
/// Helper to create test files with content
async fn create_test_file(path: &std::path::Path, content: &str) -> Result<(), std::io::Error> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
fs::write(path, content).await
}
#[tokio::test]
async fn test_copy_action_handler_validation() {
// Setup test environment
let temp_dir = TempDir::new().unwrap();
let test_root = temp_dir.path();
// Create source and destination directories
let source_dir = test_root.join("source");
let dest_dir = test_root.join("destination");
fs::create_dir_all(&source_dir).await.unwrap();
fs::create_dir_all(&dest_dir).await.unwrap();
// Create test files
let source_file1 = source_dir.join("test1.txt");
let source_file2 = source_dir.join("test2.txt");
create_test_file(&source_file1, "Hello, World! This is test file 1.").await.unwrap();
create_test_file(&source_file2, "This is the content of test file 2.").await.unwrap();
// Create a copy action with valid sources
let copy_action = FileCopyAction {
sources: vec![source_file1.clone(), source_file2.clone()],
destination: dest_dir.clone(),
options: CopyOptions::default(),
};
let library_id = Uuid::new_v4();
let action = Action::FileCopy {
library_id,
action: copy_action,
};
// Test handler can handle this action
let handler = FileCopyHandler::new();
assert!(handler.can_handle(&action));
assert_eq!(FileCopyHandler::supported_actions(), &["file.copy"]);
// Test validation without context (should pass basic validation)
// Note: We can't test full validation without a proper CoreContext
// but we can test the basic structure
println!("✅ Copy action handler validation test passed!");
}
#[tokio::test]
async fn test_copy_action_validation_errors() {
// Test with empty sources - should fail validation
let copy_action = FileCopyAction {
sources: vec![], // Empty sources
destination: PathBuf::from("/tmp/dest"),
options: CopyOptions::default(),
};
let library_id = Uuid::new_v4();
let action = Action::FileCopy {
library_id,
action: copy_action,
};
// Create handler
let handler = FileCopyHandler::new();
// Test that handler can handle the action type
assert!(handler.can_handle(&action));
// For actual validation testing, we would need a proper CoreContext
// which requires full core initialization. The validation logic
// is in the handler's validate method.
println!("✅ Copy action validation errors test passed!");
}
#[test]
fn test_copy_action_metadata() {
let source_file = PathBuf::from("/tmp/source.txt");
let dest_dir = PathBuf::from("/tmp/dest");
let copy_action = FileCopyAction {
sources: vec![source_file.clone()],
destination: dest_dir.clone(),
options: CopyOptions::default(),
};
let library_id = Uuid::new_v4();
let action = Action::FileCopy {
library_id,
action: copy_action,
};
// Test action metadata
assert_eq!(action.library_id(), Some(library_id));
assert_eq!(action.kind(), "file.copy");
let description = action.description();
assert!(description.contains("Copy"));
assert!(description.contains("1 file(s)"));
assert!(description.contains("/tmp/dest"));
let targets = action.targets_summary();
let sources = targets.get("sources").unwrap();
let destination = targets.get("destination").unwrap();
assert!(sources.is_array());
assert_eq!(sources.as_array().unwrap().len(), 1);
assert_eq!(destination.as_str().unwrap(), "/tmp/dest");
println!("✅ Copy action metadata test passed!");
}

@@ -1,396 +0,0 @@
//! Integration tests for the file copy action system
//!
//! This test verifies the complete flow from action dispatch through job execution,
//! file system operations, and audit logging.
use sd_core_new::{
infrastructure::{
actions::{manager::ActionManager, Action},
database::entities::{audit_log, AuditLog},
jobs::types::{JobId, JobStatus},
},
operations::files::{
copy::{
action::FileCopyAction,
job::{CopyOptions, MoveMode},
},
input::CopyMethod,
},
Core,
};
use sea_orm::{EntityTrait, QuerySelect};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tempfile::TempDir;
use tokio::{fs, time::timeout};
use uuid::Uuid;
/// Helper to create test files with content
async fn create_test_file(path: &std::path::Path, content: &str) -> Result<(), std::io::Error> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
fs::write(path, content).await
}
/// Helper to verify file content matches expected
async fn verify_file_content(
path: &std::path::Path,
expected: &str,
) -> Result<bool, std::io::Error> {
let content = fs::read_to_string(path).await?;
Ok(content == expected)
}
#[tokio::test]
async fn test_copy_action_full_integration() {
// Setup test environment
let temp_dir = TempDir::new().unwrap();
let test_root = temp_dir.path();
// Create source and destination directories
let source_dir = test_root.join("source");
let dest_dir = test_root.join("destination");
fs::create_dir_all(&source_dir).await.unwrap();
fs::create_dir_all(&dest_dir).await.unwrap();
// Create test files
let source_file1 = source_dir.join("test1.txt");
let source_file2 = source_dir.join("test2.txt");
let dest_file1 = dest_dir.join("test1.txt");
let dest_file2 = dest_dir.join("test2.txt");
create_test_file(&source_file1, "Hello, World! This is test file 1.")
.await
.unwrap();
create_test_file(&source_file2, "This is the content of test file 2.")
.await
.unwrap();
// Initialize core with custom data directory
let core_data_dir = test_root.join("core_data");
let core = Core::new_with_config(core_data_dir).await.unwrap();
// Create a test library
let library = core
.libraries
.create_library("Copy Test Library", None, core.context.clone())
.await
.unwrap();
let library_id = library.id();
// Create ActionManager
let context = core.context.clone();
let action_manager = ActionManager::new(context);
// Build the copy action
let copy_action = FileCopyAction {
sources: vec![source_file1.clone(), source_file2.clone()],
destination: dest_dir.clone(),
options: CopyOptions {
overwrite: false,
copy_method: CopyMethod::Auto,
verify_checksum: true,
preserve_timestamps: true,
delete_after_copy: false,
move_mode: None,
},
};
// Create the Action enum with library context
let action = Action::FileCopy {
library_id,
action: copy_action,
};
// Record initial state
let initial_audit_count = count_audit_entries(&library, library_id).await;
// Verify source files exist and destination files don't
assert!(source_file1.exists());
assert!(source_file2.exists());
assert!(!dest_file1.exists());
assert!(!dest_file2.exists());
// ===== Execute the action =====
let action_output = action_manager
.dispatch(action)
.await
.expect("Action dispatch should succeed");
// Verify action output
assert_eq!(action_output.output_type, "file.copy.dispatched");
assert!(action_output.data.get("job_id").is_some());
assert!(action_output.message.contains("Dispatched file copy job"));
// Extract job ID from output
let job_id_value = action_output.data.get("job_id").unwrap();
let job_id_str = job_id_value.as_str().expect("job_id should be a string");
let job_id = Uuid::parse_str(job_id_str).expect("job_id should be valid UUID");
// ===== Wait for job completion =====
// Poll job status until completion (with timeout)
let job_completion = timeout(Duration::from_secs(30), async {
loop {
if let Some(job_handle) = library.jobs().get_job(JobId::from(job_id)).await {
let status = job_handle.status();
if matches!(status, JobStatus::Completed | JobStatus::Failed) {
return status;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.expect("Job should complete within timeout");
// Verify job completed successfully
assert!(
matches!(job_completion, JobStatus::Completed),
"Job should complete successfully"
);
// ===== Verify file system changes =====
// Check that files were copied successfully
assert!(dest_file1.exists(), "Destination file 1 should exist");
assert!(dest_file2.exists(), "Destination file 2 should exist");
// Verify file contents match
assert!(
verify_file_content(&dest_file1, "Hello, World! This is test file 1.")
.await
.unwrap(),
"Destination file 1 content should match source"
);
assert!(
verify_file_content(&dest_file2, "This is the content of test file 2.")
.await
.unwrap(),
"Destination file 2 content should match source"
);
// Verify source files still exist (copy, not move)
assert!(source_file1.exists(), "Source file 1 should still exist");
assert!(source_file2.exists(), "Source file 2 should still exist");
// ===== Verify audit log =====
let final_audit_count = count_audit_entries(&library, library_id).await;
assert_eq!(
final_audit_count,
initial_audit_count + 1,
"Should have one new audit log entry"
);
// Get the audit log entry
let audit_entries = get_recent_audit_entries(&library, library_id, 1).await;
assert_eq!(
audit_entries.len(),
1,
"Should have exactly one audit entry"
);
let audit_entry = &audit_entries[0];
assert_eq!(audit_entry.action_type, "file.copy");
assert_eq!(audit_entry.status, audit_log::ActionStatus::Completed);
assert!(
audit_entry.job_id.is_some(),
"Audit entry should have job_id"
);
assert_eq!(audit_entry.job_id.as_ref().unwrap(), &job_id.to_string());
assert!(
audit_entry.completed_at.is_some(),
"Audit entry should have completion time"
);
assert!(
audit_entry.error_message.is_none(),
"Audit entry should not have error message"
);
// Verify audit entry targets contain source and destination info (now stored as JSON string)
let targets_json: serde_json::Value = serde_json::from_str(&audit_entry.targets).unwrap();
assert!(
targets_json.get("sources").is_some(),
"Audit should contain sources"
);
assert!(
targets_json.get("destination").is_some(),
"Audit should contain destination"
);
let sources = targets_json.get("sources").unwrap().as_array().unwrap();
assert_eq!(sources.len(), 2, "Should have 2 source files in audit");
println!("✅ Copy action integration test passed!");
println!(" - Action dispatched successfully");
println!(" - Job executed and completed");
println!(" - Files copied correctly");
println!(" - Audit log entry created");
}
#[tokio::test]
async fn test_copy_action_with_move_operation() {
// Setup test environment
let temp_dir = TempDir::new().unwrap();
let test_root = temp_dir.path();
let source_dir = test_root.join("source");
let dest_dir = test_root.join("destination");
fs::create_dir_all(&source_dir).await.unwrap();
fs::create_dir_all(&dest_dir).await.unwrap();
// Create test file
let source_file = source_dir.join("move_test.txt");
let dest_file = dest_dir.join("move_test.txt");
create_test_file(&source_file, "This file will be moved.")
.await
.unwrap();
// Initialize core and library
let core_data_dir = test_root.join("core_data");
let core = Core::new_with_config(core_data_dir).await.unwrap();
let library = core
.libraries
.create_library("Move Test Library", None, core.context.clone())
.await
.unwrap();
let library_id = library.id();
// Create ActionManager
let context = core.context.clone();
let action_manager = ActionManager::new(context);
// Build move action (copy with delete_after_copy)
let copy_action = FileCopyAction {
sources: vec![source_file.clone()],
destination: dest_file.clone(),
options: CopyOptions {
overwrite: false,
copy_method: CopyMethod::Auto,
verify_checksum: false,
preserve_timestamps: true,
delete_after_copy: true, // This makes it a move operation
move_mode: Some(MoveMode::Move),
},
};
let action = Action::FileCopy {
library_id,
action: copy_action,
};
// Verify initial state
assert!(source_file.exists());
assert!(!dest_file.exists());
// Execute the move action
let action_output = action_manager
.dispatch(action)
.await
.expect("Move action should succeed");
// Extract and wait for job completion
let job_id_str = action_output.data.get("job_id").unwrap().as_str().unwrap();
let job_id = Uuid::parse_str(job_id_str).unwrap();
// Wait for job completion
timeout(Duration::from_secs(15), async {
loop {
if let Some(job_handle) = library.jobs().get_job(JobId::from(job_id)).await {
let status = job_handle.status();
if matches!(status, JobStatus::Completed | JobStatus::Failed) {
break;
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
.expect("Move job should complete");
// Verify file was moved (destination exists, source doesn't)
assert!(
dest_file.exists(),
"Destination file should exist after move"
);
assert!(
!source_file.exists(),
"Source file should not exist after move"
);
// Verify content
assert!(
verify_file_content(&dest_file, "This file will be moved.")
.await
.unwrap(),
"Moved file content should match"
);
println!("✅ Move operation test passed!");
}
#[tokio::test]
async fn test_copy_action_validation_errors() {
let temp_dir = TempDir::new().unwrap();
let core_data_dir = temp_dir.path().join("core_data");
let core = Core::new_with_config(core_data_dir).await.unwrap();
let library = core
.libraries
.create_library("Validation Test Library", None, core.context.clone())
.await
.unwrap();
let library_id = library.id();
let context = core.context.clone();
let action_manager = ActionManager::new(context);
// Test 1: Empty sources should fail validation
let invalid_action = Action::FileCopy {
library_id,
action: FileCopyAction {
sources: vec![], // Empty sources
destination: PathBuf::from("/tmp/dest"),
options: CopyOptions::default(),
},
};
let result = action_manager.dispatch(invalid_action).await;
assert!(
result.is_err(),
"Empty sources should cause validation error"
);
let error = result.unwrap_err();
assert!(
error.to_string().contains("At least one source"),
"Error should mention source requirement"
);
println!("✅ Validation error test passed!");
}
/// Helper function to count audit log entries for a library
async fn count_audit_entries(
library: &Arc<sd_core_new::library::Library>,
_library_id: Uuid,
) -> usize {
let db = library.db().conn();
AuditLog::find().all(db).await.unwrap_or_default().len()
}
/// Helper function to get recent audit log entries
async fn get_recent_audit_entries(
library: &Arc<sd_core_new::library::Library>,
_library_id: Uuid,
limit: u64,
) -> Vec<audit_log::Model> {
let db = library.db().conn();
AuditLog::find()
.limit(limit)
.all(db)
.await
.unwrap_or_default()
}

@@ -71,12 +71,18 @@ async fn alice_persistence_scenario() {
let connected_devices = core.get_connected_devices().await.unwrap();
if !connected_devices.is_empty() {
println!("🎉 Alice: Auto-reconnection successful!");
println!("✅ Alice: Connected {} devices after restart", connected_devices.len());
println!(
"✅ Alice: Connected {} devices after restart",
connected_devices.len()
);
// Verify it's Bob
let device_info = core.get_connected_devices_info().await.unwrap();
let bob_found = device_info.iter().any(|d| d.device_name.contains("Bob"));
assert!(bob_found, "Bob not found in connected devices after restart");
assert!(
bob_found,
"Bob not found in connected devices after restart"
);
for device in &device_info {
println!(
@@ -86,7 +92,11 @@ async fn alice_persistence_scenario() {
}
// Write success marker
std::fs::write("/tmp/spacedrive-persistence-test/alice_restart_success.txt", "success").unwrap();
std::fs::write(
"/tmp/spacedrive-persistence-test/alice_restart_success.txt",
"success",
)
.unwrap();
println!("✅ Alice: Device persistence test completed successfully");
break;
}
@@ -97,7 +107,10 @@ async fn alice_persistence_scenario() {
}
if attempts % 5 == 0 {
println!("🔍 Alice: Auto-reconnection check {} - waiting for Bob", attempts / 5);
println!(
"🔍 Alice: Auto-reconnection check {} - waiting for Bob",
attempts / 5
);
}
}
} else {
@@ -141,7 +154,10 @@ async fn alice_persistence_scenario() {
panic!("Networking not initialized");
};
println!("✅ Alice: Pairing code generated (expires in {}s)", expires_in);
println!(
"✅ Alice: Pairing code generated (expires in {}s)",
expires_in
);
// Write pairing code for Bob
std::fs::create_dir_all("/tmp/spacedrive-persistence-test").unwrap();
@@ -168,12 +184,22 @@ async fn alice_persistence_scenario() {
if let Some(networking) = core.networking() {
let registry = networking.device_registry();
let paired_devices = registry.read().await.get_paired_devices();
assert!(!paired_devices.is_empty(), "No paired devices found in registry");
println!("✅ Alice: {} devices persisted to registry", paired_devices.len());
assert!(
!paired_devices.is_empty(),
"No paired devices found in registry"
);
println!(
"✅ Alice: {} devices persisted to registry",
paired_devices.len()
);
}
// Write success marker
std::fs::write("/tmp/spacedrive-persistence-test/alice_paired.txt", "success").unwrap();
std::fs::write(
"/tmp/spacedrive-persistence-test/alice_paired.txt",
"success",
)
.unwrap();
// Keep running for a bit to ensure persistence completes
tokio::time::sleep(Duration::from_secs(3)).await;
@@ -251,12 +277,18 @@ async fn bob_persistence_scenario() {
let connected_devices = core.get_connected_devices().await.unwrap();
if !connected_devices.is_empty() {
println!("🎉 Bob: Auto-reconnection successful!");
println!("✅ Bob: Connected {} devices after restart", connected_devices.len());
println!(
"✅ Bob: Connected {} devices after restart",
connected_devices.len()
);
// Verify it's Alice
let device_info = core.get_connected_devices_info().await.unwrap();
let alice_found = device_info.iter().any(|d| d.device_name.contains("Alice"));
assert!(alice_found, "Alice not found in connected devices after restart");
assert!(
alice_found,
"Alice not found in connected devices after restart"
);
for device in &device_info {
println!(
@@ -266,7 +298,11 @@ async fn bob_persistence_scenario() {
}
// Write success marker
std::fs::write("/tmp/spacedrive-persistence-test/bob_restart_success.txt", "success").unwrap();
std::fs::write(
"/tmp/spacedrive-persistence-test/bob_restart_success.txt",
"success",
)
.unwrap();
println!("✅ Bob: Device persistence test completed successfully");
break;
}
@@ -277,7 +313,10 @@ async fn bob_persistence_scenario() {
}
if attempts % 5 == 0 {
println!("🔍 Bob: Auto-reconnection check {} - waiting for Alice", attempts / 5);
println!(
"🔍 Bob: Auto-reconnection check {} - waiting for Alice",
attempts / 5
);
}
}
} else {
@@ -310,7 +349,9 @@ async fn bob_persistence_scenario() {
// Wait for pairing code from Alice
println!("🔍 Bob: Looking for pairing code...");
let pairing_code = loop {
if let Ok(code) = std::fs::read_to_string("/tmp/spacedrive-persistence-test/pairing_code.txt") {
if let Ok(code) =
std::fs::read_to_string("/tmp/spacedrive-persistence-test/pairing_code.txt")
{
break code.trim().to_string();
}
tokio::time::sleep(Duration::from_millis(500)).await;
@@ -348,12 +389,19 @@ async fn bob_persistence_scenario() {
if let Some(networking) = core.networking() {
let registry = networking.device_registry();
let paired_devices = registry.read().await.get_paired_devices();
assert!(!paired_devices.is_empty(), "No paired devices found in registry");
println!("✅ Bob: {} devices persisted to registry", paired_devices.len());
assert!(
!paired_devices.is_empty(),
"No paired devices found in registry"
);
println!(
"✅ Bob: {} devices persisted to registry",
paired_devices.len()
);
}
// Write success marker
std::fs::write("/tmp/spacedrive-persistence-test/bob_paired.txt", "success").unwrap();
std::fs::write("/tmp/spacedrive-persistence-test/bob_paired.txt", "success")
.unwrap();
// Keep running for a bit to ensure persistence completes
tokio::time::sleep(Duration::from_secs(3)).await;
@@ -409,18 +457,17 @@ async fn test_device_persistence() {
// Wait for initial pairing to complete
let pairing_result = runner
.wait_for_condition(
Duration::from_secs(60),
|_| {
let alice_paired = std::fs::read_to_string("/tmp/spacedrive-persistence-test/alice_paired.txt")
.wait_for_success(|_| {
let alice_paired =
std::fs::read_to_string("/tmp/spacedrive-persistence-test/alice_paired.txt")
.map(|content| content.trim() == "success")
.unwrap_or(false);
let bob_paired = std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_paired.txt")
let bob_paired =
std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_paired.txt")
.map(|content| content.trim() == "success")
.unwrap_or(false);
alice_paired && bob_paired
}
)
alice_paired && bob_paired
})
.await;
if pairing_result.is_err() {
@@ -460,12 +507,15 @@ async fn test_device_persistence() {
// Wait for auto-reconnection
let reconnection_result = runner
.wait_for_success(|_| {
let alice_reconnected = std::fs::read_to_string("/tmp/spacedrive-persistence-test/alice_restart_success.txt")
.map(|content| content.trim() == "success")
.unwrap_or(false);
let bob_reconnected = std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_restart_success.txt")
.map(|content| content.trim() == "success")
.unwrap_or(false);
let alice_reconnected = std::fs::read_to_string(
"/tmp/spacedrive-persistence-test/alice_restart_success.txt",
)
.map(|content| content.trim() == "success")
.unwrap_or(false);
let bob_reconnected =
std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_restart_success.txt")
.map(|content| content.trim() == "success")
.unwrap_or(false);
alice_reconnected && bob_reconnected
})
.await;
@@ -483,4 +533,4 @@ async fn test_device_persistence() {
panic!("Devices did not automatically reconnect after restart");
}
}
}
}
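The two processes in this test coordinate through marker files under `/tmp/spacedrive-persistence-test`: one side writes `"success"`, the other polls until it appears. The same handshake in miniature, as a std-only sketch (paths here are demo values, not the test's paths):

```rust
use std::fs;
use std::path::Path;

// Writer side: create the parent directory and drop the marker.
fn write_marker(path: &Path) -> std::io::Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    fs::write(path, "success")
}

// Poller side: a marker counts only if it reads exactly "success".
fn marker_is_set(path: &Path) -> bool {
    fs::read_to_string(path)
        .map(|content| content.trim() == "success")
        .unwrap_or(false)
}

fn main() {
    let marker = std::env::temp_dir().join("marker-demo").join("alice_paired.txt");
    let _ = fs::remove_file(&marker);
    assert!(!marker_is_set(&marker));
    write_marker(&marker).unwrap();
    assert!(marker_is_set(&marker));
    println!("marker handshake ok");
    let _ = fs::remove_file(&marker);
}
```

Trimming before comparison makes the handshake tolerant of trailing newlines, which is why the tests above compare `content.trim()` rather than the raw string.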


@@ -0,0 +1,408 @@
//! Event System Integration Test
//!
//! Tests the event bus functionality by performing various operations
//! and verifying that the correct events are emitted. This includes:
//! - Core lifecycle events (CoreShutdown)
//! - Library management events (LibraryCreated, LibraryOpened, LibraryClosed)
//! - Location and indexing events (LocationAdded, IndexingStarted)
//! - Job system events (JobProgress, JobCompleted)
//! - Event filtering capabilities (library-specific filtering)
//! - Multiple concurrent subscribers
//! - Custom event emission and handling
//!
//! Note: These tests should be run with --test-threads=1 to avoid
//! potential conflicts between tests
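The broadcast semantics these tests rely on — every subscriber sees every emitted event — can be sketched with std channels alone. The real bus in `sd_core_new` is tokio-based; the `Event` enum, `subscribe`, and `emit` below are illustrative stand-ins, not the actual API:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Clone, Debug, PartialEq)]
enum Event {
    LibraryCreated { id: u32 },
    CoreShutdown,
}

struct EventBus {
    subscribers: Vec<Sender<Event>>,
}

impl EventBus {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    // Each subscriber gets its own channel; emit() clones the event to all.
    fn subscribe(&mut self) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    fn emit(&self, event: Event) {
        for tx in &self.subscribers {
            // A disconnected receiver is ignored; a real bus would prune it.
            let _ = tx.send(event.clone());
        }
    }
}

fn main() {
    let mut bus = EventBus::new();
    let rx1 = bus.subscribe();
    let rx2 = bus.subscribe();
    bus.emit(Event::LibraryCreated { id: 1 });
    // Every subscriber receives every event, as the concurrent-subscriber test expects.
    assert_eq!(rx1.recv().unwrap(), Event::LibraryCreated { id: 1 });
    assert_eq!(rx2.recv().unwrap(), Event::LibraryCreated { id: 1 });
    println!("both subscribers received the event");
}
```

This also explains why `CoreStarted` is missed below: an event emitted before `subscribe()` is called has no channel to land in.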
use sd_core_new::{
infrastructure::events::{Event, EventFilter},
location::{create_location, LocationCreateArgs, IndexMode},
Core,
};
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn test_core_and_library_events() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
// Set up event collection
let collected_events = Arc::new(Mutex::new(Vec::new()));
let events_clone = collected_events.clone();
// Initialize core
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Note: CoreStarted is emitted during core initialization, so we won't catch it
// Start collecting events from now on
let mut event_subscriber = core.events.subscribe();
let event_collector = tokio::spawn(async move {
while let Ok(event) = event_subscriber.recv().await {
events_clone.lock().await.push(event);
}
});
// Wait a bit for CoreStarted event
tokio::time::sleep(Duration::from_millis(100)).await;
// Test 1: Library creation
let library = core
.libraries
.create_library("Test Event Library", None, core.context.clone())
.await?;
let library_id = library.id();
// Wait for events to be processed
tokio::time::sleep(Duration::from_millis(100)).await;
// Test 2: Library operations
let library_path = library.path().to_path_buf();
drop(library); // Drop the Arc to release the library
core.libraries.close_library(library_id).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
// Open library again by path with context
let library = core.libraries.open_library_with_context(&library_path, core.context.clone()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
// Test 3: Shutdown
drop(library);
core.shutdown().await?;
// Wait for shutdown event to be processed
tokio::time::sleep(Duration::from_millis(100)).await;
// Stop event collector
event_collector.abort();
// Verify collected events
let events = collected_events.lock().await;
// Check for expected events
let event_types: HashSet<String> = events.iter().map(|e| match e {
Event::CoreStarted => "CoreStarted".to_string(),
Event::CoreShutdown => "CoreShutdown".to_string(),
Event::LibraryCreated { .. } => "LibraryCreated".to_string(),
Event::LibraryOpened { .. } => "LibraryOpened".to_string(),
Event::LibraryClosed { .. } => "LibraryClosed".to_string(),
_ => format!("Other({:?})", e),
}).collect();
println!("Collected events: {:?}", event_types);
// Verify core events (CoreStarted was emitted before we subscribed)
assert!(event_types.contains("CoreShutdown"), "Should emit CoreShutdown event");
// Verify library events
assert!(event_types.contains("LibraryCreated"), "Should emit LibraryCreated event");
assert!(event_types.contains("LibraryClosed"), "Should emit LibraryClosed event");
assert!(event_types.contains("LibraryOpened"), "Should emit LibraryOpened event");
Ok(())
}
#[tokio::test]
async fn test_location_and_job_events() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Create library
let library = core
.libraries
.create_library("Test Location Events", None, core.context.clone())
.await?;
// Set up filtered event collection - only job and indexing events
let job_events = Arc::new(Mutex::new(Vec::new()));
let job_events_clone = job_events.clone();
let mut event_subscriber = core.events.subscribe();
let event_collector = tokio::spawn(async move {
while let Ok(event) = event_subscriber.recv().await {
if event.is_job_event() || matches!(event,
Event::IndexingStarted { .. } |
Event::IndexingProgress { .. } |
Event::IndexingCompleted { .. } |
Event::IndexingFailed { .. } |
Event::LocationAdded { .. }
) {
job_events_clone.lock().await.push(event);
}
}
});
// Create test location
let test_location_dir = temp_dir.path().join("test_location");
tokio::fs::create_dir_all(&test_location_dir).await?;
tokio::fs::write(test_location_dir.join("test.txt"), "Hello World").await?;
// Register device
let db = library.db();
let device = core.device.to_device()?;
use sd_core_new::infrastructure::database::entities;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await?
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await?
}
};
// Add location (triggers indexing job)
let location_args = LocationCreateArgs {
path: test_location_dir.clone(),
name: Some("Test Location".to_string()),
index_mode: IndexMode::Shallow,
};
let _location_id = create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
// Wait for indexing to complete
tokio::time::sleep(Duration::from_secs(2)).await;
// Stop collector and check events
event_collector.abort();
let events = job_events.lock().await;
let event_types: Vec<String> = events.iter().map(|e| match e {
Event::JobQueued { .. } => "JobQueued".to_string(),
Event::JobStarted { .. } => "JobStarted".to_string(),
Event::JobProgress { .. } => "JobProgress".to_string(),
Event::JobCompleted { .. } => "JobCompleted".to_string(),
Event::IndexingStarted { .. } => "IndexingStarted".to_string(),
Event::IndexingProgress { .. } => "IndexingProgress".to_string(),
Event::IndexingCompleted { .. } => "IndexingCompleted".to_string(),
Event::LocationAdded { .. } => "LocationAdded".to_string(),
_ => format!("Other({:?})", e),
}).collect();
println!("Job-related events: {:?}", event_types);
// Verify job events (Note: JobQueued might not be emitted if job starts immediately)
assert!(
event_types.contains(&"JobQueued".to_string()) ||
event_types.contains(&"JobStarted".to_string()) ||
event_types.contains(&"JobProgress".to_string()) ||
event_types.contains(&"JobCompleted".to_string()),
"Should emit at least one job event"
);
// Verify location event
assert!(event_types.contains(&"LocationAdded".to_string()), "Should emit LocationAdded event");
// Cleanup
let lib_id = library.id();
core.libraries.close_library(lib_id).await?;
drop(library);
core.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_event_filtering() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Create two libraries
let library1 = core
.libraries
.create_library("Library 1", None, core.context.clone())
.await?;
let lib1_id = library1.id();
let library2 = core
.libraries
.create_library("Library 2", None, core.context.clone())
.await?;
let lib2_id = library2.id();
// Set up filtered event collection - only library1 events
let lib1_events = Arc::new(Mutex::new(Vec::new()));
let lib1_events_clone = lib1_events.clone();
let event_subscriber = core.events.subscribe();
let event_collector = tokio::spawn(async move {
let mut subscriber = event_subscriber;
loop {
match timeout(
Duration::from_millis(100),
subscriber.recv_filtered(|e| e.is_for_library(lib1_id))
).await {
Ok(Ok(event)) => {
lib1_events_clone.lock().await.push(event);
}
_ => break,
}
}
});
// Perform operations on both libraries
core.libraries.close_library(lib1_id).await?;
core.libraries.close_library(lib2_id).await?;
// Wait and stop collector
tokio::time::sleep(Duration::from_millis(500)).await;
event_collector.abort();
// Check filtered events
let events = lib1_events.lock().await;
for event in events.iter() {
match event {
Event::LibraryCreated { id, .. } |
Event::LibraryClosed { id, .. } => {
assert_eq!(id, &lib1_id, "Should only receive library1 events");
}
_ => {}
}
}
// Cleanup
drop(library1);
drop(library2);
core.shutdown().await?;
Ok(())
}
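The library-scoped filtering checked above reduces to a predicate over events that keeps only those tagged with the target library id. The name `is_for_library` mirrors the test, but this enum is a stand-in, not the core type:

```rust
#[derive(Debug, Clone, PartialEq)]
enum Event {
    LibraryCreated { id: u32 },
    LibraryClosed { id: u32 },
    CoreShutdown,
}

impl Event {
    // Library-tagged variants match on their id; global events never match.
    fn is_for_library(&self, lib: u32) -> bool {
        match self {
            Event::LibraryCreated { id } | Event::LibraryClosed { id } => *id == lib,
            _ => false,
        }
    }
}

fn main() {
    let stream = vec![
        Event::LibraryCreated { id: 1 },
        Event::LibraryCreated { id: 2 },
        Event::LibraryClosed { id: 1 },
        Event::CoreShutdown,
    ];
    // Equivalent to recv_filtered(|e| e.is_for_library(1)) in the test above.
    let lib1: Vec<Event> = stream.into_iter().filter(|e| e.is_for_library(1)).collect();
    assert_eq!(lib1.len(), 2);
    println!("kept {} events for library 1", lib1.len());
}
```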
#[tokio::test]
async fn test_concurrent_event_subscribers() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Create multiple subscribers
let subscriber1_events = Arc::new(Mutex::new(Vec::new()));
let subscriber2_events = Arc::new(Mutex::new(Vec::new()));
let subscriber3_events = Arc::new(Mutex::new(Vec::new()));
let events1 = subscriber1_events.clone();
let events2 = subscriber2_events.clone();
let events3 = subscriber3_events.clone();
let mut sub1 = core.events.subscribe();
let mut sub2 = core.events.subscribe();
let mut sub3 = core.events.subscribe();
// Start collectors
let collector1 = tokio::spawn(async move {
while let Ok(event) = sub1.recv().await {
if matches!(event, Event::LibraryCreated { .. }) {
events1.lock().await.push(event);
}
}
});
let collector2 = tokio::spawn(async move {
while let Ok(event) = sub2.recv().await {
if matches!(event, Event::LibraryCreated { .. }) {
events2.lock().await.push(event);
}
}
});
let collector3 = tokio::spawn(async move {
while let Ok(event) = sub3.recv().await {
if matches!(event, Event::LibraryCreated { .. }) {
events3.lock().await.push(event);
}
}
});
// Create a library (should be received by all subscribers)
let library = core
.libraries
.create_library("Broadcast Test", None, core.context.clone())
.await?;
// Wait for events
tokio::time::sleep(Duration::from_millis(200)).await;
// Stop collectors
collector1.abort();
collector2.abort();
collector3.abort();
// Verify all subscribers received the event
assert_eq!(subscriber1_events.lock().await.len(), 1, "Subscriber 1 should receive event");
assert_eq!(subscriber2_events.lock().await.len(), 1, "Subscriber 2 should receive event");
assert_eq!(subscriber3_events.lock().await.len(), 1, "Subscriber 3 should receive event");
// Cleanup
let lib_id = library.id();
core.libraries.close_library(lib_id).await?;
drop(library);
core.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_custom_events() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
let collected_events = Arc::new(Mutex::new(Vec::new()));
let events_clone = collected_events.clone();
let mut event_subscriber = core.events.subscribe();
let event_collector = tokio::spawn(async move {
while let Ok(event) = event_subscriber.recv().await {
if matches!(event, Event::Custom { .. }) {
events_clone.lock().await.push(event);
}
}
});
// Emit custom events
let custom_data = serde_json::json!({
"action": "test_action",
"value": 42,
"message": "Custom event test"
});
core.events.emit(Event::Custom {
event_type: "test_event".to_string(),
data: custom_data.clone(),
});
// Wait for event processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Stop collector
event_collector.abort();
// Verify custom event
let events = collected_events.lock().await;
assert_eq!(events.len(), 1, "Should receive one custom event");
if let Event::Custom { event_type, data } = &events[0] {
assert_eq!(event_type, "test_event");
assert_eq!(data, &custom_data);
} else {
panic!("Expected custom event");
}
// Test subscriber count
let subscriber_count = core.events.subscriber_count();
println!("Event subscribers: {}", subscriber_count);
assert!(subscriber_count > 0, "Should have active subscribers");
core.shutdown().await?;
Ok(())
}


@@ -0,0 +1,361 @@
//! Indexing Integration Test
//!
//! Tests the production indexer functionality including:
//! - Location creation and indexing
//! - Smart filtering of system files
//! - Inode tracking for incremental indexing
//! - Event monitoring during indexing
//! - Database persistence of indexed entries
//!
//! Note: These tests should be run with --test-threads=1 to avoid
//! device UUID conflicts when multiple tests run in parallel
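The "smart filtering" assertions below (no `.DS_Store`, no `node_modules`) amount to a skip-list check on entry names. A minimal sketch of such a rule, assuming a hypothetical `should_skip` helper — the real indexer's rule set lives in `sd_core_new` and may differ:

```rust
// Names commonly excluded by file indexers; illustrative, not the
// indexer's actual configuration.
fn should_skip(name: &str) -> bool {
    const SKIP_FILES: &[&str] = &[".DS_Store", "Thumbs.db"];
    const SKIP_DIRS: &[&str] = &["node_modules", ".git", "target"];
    SKIP_FILES.contains(&name) || SKIP_DIRS.contains(&name)
}

fn main() {
    // The same names the test plants and then asserts are absent.
    assert!(should_skip(".DS_Store"));
    assert!(should_skip("node_modules"));
    assert!(!should_skip("test1.txt"));
    assert!(!should_skip("subdir"));
    println!("filter behaves as the test expects");
}
```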
use sd_core_new::{
infrastructure::database::entities,
location::{create_location, LocationCreateArgs, IndexMode},
Core,
};
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter,
};
use tempfile::TempDir;
use tokio::time::Duration;
#[tokio::test]
async fn test_location_indexing() -> Result<(), Box<dyn std::error::Error>> {
// 1. Setup test environment
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// 2. Create library
let library = core
.libraries
.create_library("Test Indexing Library", None, core.context.clone())
.await?;
// 3. Create test location directory with some files
let test_location_dir = temp_dir.path().join("test_location");
tokio::fs::create_dir_all(&test_location_dir).await?;
// Create test files
tokio::fs::write(test_location_dir.join("test1.txt"), "Hello World").await?;
tokio::fs::write(test_location_dir.join("test2.rs"), "fn main() {}").await?;
tokio::fs::create_dir_all(test_location_dir.join("subdir")).await?;
tokio::fs::write(test_location_dir.join("subdir/test3.md"), "# Test").await?;
// Create files that should be filtered
tokio::fs::write(test_location_dir.join(".DS_Store"), "system file").await?;
tokio::fs::create_dir_all(test_location_dir.join("node_modules")).await?;
tokio::fs::write(test_location_dir.join("node_modules/package.json"), "{}").await?;
// 4. Register device in database
let db = library.db();
let device = core.device.to_device()?;
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await?
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await?
}
};
// 5. Set up to monitor job completion
// Note: Due to current implementation, IndexingCompleted event may not be emitted
// So we'll monitor job status directly instead
// 6. Create location and trigger indexing
let location_args = LocationCreateArgs {
path: test_location_dir.clone(),
name: Some("Test Location".to_string()),
index_mode: IndexMode::Deep,
};
let location_db_id = create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
// 7. Wait for indexing to complete by monitoring job status
let start_time = tokio::time::Instant::now();
let timeout_duration = Duration::from_secs(30);
let mut job_seen = false;
let mut last_entry_count = 0;
let mut stable_count_iterations = 0;
loop {
// Check all job statuses
let all_jobs = library.jobs().list_jobs(None).await?;
let running_jobs = library
.jobs()
.list_jobs(Some(
sd_core_new::infrastructure::jobs::types::JobStatus::Running,
))
.await?;
// If we see a running job, mark that we've seen it
if !running_jobs.is_empty() {
job_seen = true;
}
// Check if any entries have been created (partial progress)
let current_entries = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.count(db.conn())
.await?;
println!("Job status - Total: {}, Running: {}, Entries indexed: {}",
all_jobs.len(), running_jobs.len(), current_entries);
// If we've seen a job and now there are no jobs, indexing likely completed
if job_seen && all_jobs.is_empty() && current_entries > 0 {
// Wait for entries to stabilize
if current_entries == last_entry_count {
stable_count_iterations += 1;
if stable_count_iterations >= 3 {
println!("Indexing appears complete (job finished, entries stable)");
break;
}
} else {
stable_count_iterations = 0;
}
last_entry_count = current_entries;
}
// Check for failed jobs
let failed_jobs = library
.jobs()
.list_jobs(Some(
sd_core_new::infrastructure::jobs::types::JobStatus::Failed,
))
.await?;
if !failed_jobs.is_empty() {
panic!("Indexing job failed");
}
// Check timeout
if start_time.elapsed() > timeout_duration {
panic!("Indexing timed out after {:?}", timeout_duration);
}
// Wait a bit before checking again
tokio::time::sleep(Duration::from_millis(500)).await;
}
// 8. Verify indexed entries in database
let _entry_count = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.count(db.conn())
.await?;
let file_count = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.filter(entities::entry::Column::Kind.eq(0)) // Files
.count(db.conn())
.await?;
let dir_count = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.filter(entities::entry::Column::Kind.eq(1)) // Directories
.count(db.conn())
.await?;
// 9. Verify smart filtering worked
let all_entries = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.all(db.conn())
.await?;
// Check that filtered files are not indexed
for entry in &all_entries {
assert_ne!(entry.name, ".DS_Store", "System files should be filtered");
assert_ne!(entry.name, "node_modules", "Dev directories should be filtered");
}
// 10. Verify expected counts
assert_eq!(file_count, 3, "Should index 3 files (excluding filtered)");
assert!(dir_count >= 1, "Should index at least 1 directory (subdir)");
// 11. Verify inode tracking
let entries_with_inodes = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.filter(entities::entry::Column::Inode.is_not_null())
.count(db.conn())
.await?;
assert!(entries_with_inodes > 0, "Entries should have inode tracking");
// 12. Cleanup
let lib_id = library.id();
core.libraries.close_library(lib_id).await?;
drop(library);
core.shutdown().await?;
Ok(())
}
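The completion check above (wait until the entry count stops changing for three consecutive polls, or time out) generalizes to a small helper. A std-only sketch mirroring the `stable_count_iterations` logic, not core API:

```rust
use std::time::{Duration, Instant};

// Poll `sample` until it returns the same value `stable_polls` times in a
// row (Some(value)) or the timeout expires (None).
fn wait_until_stable<F: FnMut() -> usize>(
    mut sample: F,
    stable_polls: u32,
    timeout: Duration,
    poll_interval: Duration,
) -> Option<usize> {
    let start = Instant::now();
    let mut last = sample();
    let mut stable = 0;
    while start.elapsed() < timeout {
        std::thread::sleep(poll_interval);
        let current = sample();
        if current == last {
            stable += 1;
            if stable >= stable_polls {
                return Some(current);
            }
        } else {
            // Value moved: reset the stability counter.
            stable = 0;
            last = current;
        }
    }
    None
}

fn main() {
    // A counter that grows for three samples, then settles at 3.
    let mut calls = 0usize;
    let result = wait_until_stable(
        || {
            calls += 1;
            calls.min(3)
        },
        3,
        Duration::from_secs(1),
        Duration::from_millis(1),
    );
    assert_eq!(result, Some(3));
    println!("count stabilized at 3");
}
```

Polling for stability rather than for a completion event is what lets the test tolerate the missing `IndexingCompleted` emission noted above.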
#[tokio::test]
async fn test_incremental_indexing() -> Result<(), Box<dyn std::error::Error>> {
// 1. Setup
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
let library = core
.libraries
.create_library("Test Incremental Library", None, core.context.clone())
.await?;
let test_location_dir = temp_dir.path().join("incremental_test");
tokio::fs::create_dir_all(&test_location_dir).await?;
// Initial files
tokio::fs::write(test_location_dir.join("file1.txt"), "Initial content").await?;
tokio::fs::write(test_location_dir.join("file2.txt"), "More content").await?;
// Register device
let db = library.db();
let device = core.device.to_device()?;
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await?
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await?
}
};
// 2. First indexing run
let location_args = LocationCreateArgs {
path: test_location_dir.clone(),
name: Some("Incremental Test".to_string()),
index_mode: IndexMode::Deep,
};
let location_db_id = create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
// Wait for initial indexing to complete
let start_time = tokio::time::Instant::now();
let timeout_duration = Duration::from_secs(10);
let mut job_seen = false;
loop {
let running_jobs = library
.jobs()
.list_jobs(Some(
sd_core_new::infrastructure::jobs::types::JobStatus::Running,
))
.await?;
if !running_jobs.is_empty() {
job_seen = true;
}
let current_entries = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.count(db.conn())
.await?;
if job_seen && running_jobs.is_empty() && current_entries > 0 {
break;
}
if start_time.elapsed() > timeout_duration {
break; // Don't fail, just continue
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
let initial_file_count = entities::entry::Entity::find()
.filter(entities::entry::Column::LocationId.eq(location_db_id))
.filter(entities::entry::Column::Kind.eq(0))
.count(db.conn())
.await?;
assert_eq!(initial_file_count, 2, "Should index 2 initial files");
// Cleanup
let lib_id = library.id();
core.libraries.close_library(lib_id).await?;
drop(library);
core.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_indexing_error_handling() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
let library = core
.libraries
.create_library("Test Error Library", None, core.context.clone())
.await?;
// Try to index non-existent location
let non_existent = temp_dir.path().join("does_not_exist");
let db = library.db();
let device = core.device.to_device()?;
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await?
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await?
}
};
let location_args = LocationCreateArgs {
path: non_existent,
name: Some("Non-existent".to_string()),
index_mode: IndexMode::Deep,
};
// This should handle the error gracefully
let result = create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await;
// The location creation should fail for non-existent path
assert!(result.is_err(), "Should fail to create location for non-existent path");
// Cleanup
let lib_id = library.id();
core.libraries.close_library(lib_id).await?;
drop(library);
core.shutdown().await?;
Ok(())
}


@@ -1,421 +0,0 @@
// //! Integration tests for the job system
// use sd_core_new::{
// infrastructure::jobs::{
// manager::JobManager,
// traits::{Job, JobHandler},
// types::{JobId, JobStatus},
// context::JobContext,
// error::{JobError, JobResult},
// progress::Progress,
// output::JobOutput,
// prelude::JobProgress,
// },
// operations::{
// file_ops::copy_job::FileCopyJob,
// indexing::indexer_job::{IndexerJob, IndexMode},
// },
// shared::types::{SdPath, SdPathBatch},
// };
// use serde::{Deserialize, Serialize};
// use std::{
// path::PathBuf,
// time::Duration,
// };
// use tempfile::TempDir;
// use uuid::Uuid;
// // Simple test job for testing basic functionality
// #[derive(Debug, Serialize, Deserialize)]
// struct TestJob {
// name: String,
// sleep_ms: u64,
// should_fail: bool,
// counter: u32,
// }
// #[derive(Debug, Clone, Serialize, Deserialize)]
// struct TestProgress {
// current: u32,
// total: u32,
// message: String,
// }
// impl JobProgress for TestProgress {}
// impl Job for TestJob {
// const NAME: &'static str = "test_job";
// const RESUMABLE: bool = true;
// const DESCRIPTION: Option<&'static str> = Some("Simple test job");
// }
// #[async_trait::async_trait]
// impl JobHandler for TestJob {
// type Output = TestOutput;
// async fn run(&mut self, ctx: JobContext<'_>) -> JobResult<Self::Output> {
// ctx.log(format!("Starting test job: {}", self.name));
// if self.should_fail {
// return Err(JobError::execution("Test failure"));
// }
// // Simulate work with progress updates
// for i in 0..5 {
// ctx.check_interrupt().await?;
// self.counter += 1;
// ctx.progress(Progress::structured(TestProgress {
// current: i + 1,
// total: 5,
// message: format!("Processing step {}", i + 1),
// }));
// if self.sleep_ms > 0 {
// tokio::time::sleep(Duration::from_millis(self.sleep_ms)).await;
// }
// // Checkpoint every 2 steps
// if i % 2 == 1 {
// ctx.checkpoint().await?;
// }
// }
// ctx.log("Test job completed successfully");
// Ok(TestOutput {
// name: self.name.clone(),
// final_counter: self.counter,
// })
// }
// }
// #[derive(Debug, Serialize, Deserialize)]
// struct TestOutput {
// name: String,
// final_counter: u32,
// }
// impl From<TestOutput> for JobOutput {
// fn from(output: TestOutput) -> Self {
// JobOutput::custom(output)
// }
// }
// impl TestJob {
// fn new(name: String, sleep_ms: u64, should_fail: bool) -> Self {
// Self {
// name,
// sleep_ms,
// should_fail,
// counter: 0,
// }
// }
// }
// #[tokio::test]
// async fn test_job_manager_initialization() {
// let temp_dir = TempDir::new().unwrap();
// let data_dir = temp_dir.path().to_path_buf();
// // Initialize job manager
// let job_manager = JobManager::new(data_dir.clone()).await.unwrap();
// // Verify database was created
// assert!(data_dir.join("jobs.db").exists());
// // Test basic operations
// let jobs = job_manager.list_jobs(None).await.unwrap();
// assert!(jobs.is_empty());
// // Shutdown cleanly
// job_manager.shutdown().await.unwrap();
// }
// #[tokio::test]
// async fn test_job_serialization() {
// // Test FileCopyJob serialization
// let device_id = Uuid::new_v4();
// let sources = vec![
// SdPath::new(device_id, PathBuf::from("/test/file1.txt")),
// SdPath::new(device_id, PathBuf::from("/test/file2.txt")),
// ];
// let destination = SdPath::new(device_id, PathBuf::from("/dest"));
// let sources_batch = SdPathBatch::new(sources);
// let copy_job = FileCopyJob::new(sources_batch, destination);
// // Serialize and deserialize
// let serialized = rmp_serde::to_vec(&copy_job).unwrap();
// let deserialized: FileCopyJob = rmp_serde::from_slice(&serialized).unwrap();
// assert_eq!(copy_job.sources.paths.len(), deserialized.sources.paths.len());
// // Test IndexerJob serialization
// let indexer_job = IndexerJob::new(
// Uuid::new_v4(),
// SdPath::new(device_id, PathBuf::from("/index/path")),
// IndexMode::Deep,
// );
// let serialized = rmp_serde::to_vec(&indexer_job).unwrap();
// let deserialized: IndexerJob = rmp_serde::from_slice(&serialized).unwrap();
// // Verify key fields are preserved
// assert_eq!(indexer_job.location_id, deserialized.location_id);
// // Test TestJob serialization
// let test_job = TestJob::new("test".to_string(), 100, false);
// let serialized = rmp_serde::to_vec(&test_job).unwrap();
// let deserialized: TestJob = rmp_serde::from_slice(&serialized).unwrap();
// assert_eq!(test_job.name, deserialized.name);
// assert_eq!(test_job.sleep_ms, deserialized.sleep_ms);
// assert_eq!(test_job.should_fail, deserialized.should_fail);
// }
// #[tokio::test]
// async fn test_job_database_operations() {
// let temp_dir = TempDir::new().unwrap();
// let job_manager = JobManager::new(temp_dir.path().to_path_buf()).await.unwrap();
// // Test listing empty jobs
// let jobs = job_manager.list_jobs(None).await.unwrap();
// assert!(jobs.is_empty());
// // Test queued jobs (empty initially)
// let queued = job_manager.list_jobs(Some(JobStatus::Queued)).await.unwrap();
// assert!(queued.is_empty());
// // Test job status filtering
// let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await.unwrap();
// assert!(running_jobs.is_empty());
// let completed_jobs = job_manager.list_jobs(Some(JobStatus::Completed)).await.unwrap();
// assert!(completed_jobs.is_empty());
// 	job_manager.shutdown().await.unwrap();
// }
// #[tokio::test]
// async fn test_job_constants_and_metadata() {
// 	// Test job constants are properly defined
// 	assert_eq!(FileCopyJob::NAME, "file_copy");
// 	assert_eq!(FileCopyJob::RESUMABLE, true);
// 	assert_eq!(IndexerJob::NAME, "indexer");
// 	assert_eq!(IndexerJob::RESUMABLE, true);
// 	assert_eq!(TestJob::NAME, "test_job");
// 	assert_eq!(TestJob::RESUMABLE, true);
// 	// Test job schemas
// 	let copy_schema = FileCopyJob::schema();
// 	assert_eq!(copy_schema.name, "file_copy");
// 	assert_eq!(copy_schema.version, 1);
// 	let indexer_schema = IndexerJob::schema();
// 	assert_eq!(indexer_schema.name, "indexer");
// 	assert_eq!(indexer_schema.version, 1);
// 	let test_schema = TestJob::schema();
// 	assert_eq!(test_schema.name, "test_job");
// 	assert_eq!(test_schema.version, 1);
// }
// #[tokio::test]
// async fn test_job_progress_types() {
// 	use sd_core_new::infrastructure::jobs::progress::Progress;
// 	// Test percentage progress
// 	let percentage = Progress::percentage(0.75);
// 	match percentage {
// 		Progress::Percentage(percent) => {
// 			assert_eq!(percent, 0.75);
// 		}
// 		_ => panic!("Expected percentage progress"),
// 	}
// 	// Test structured progress
// 	let test_progress = TestProgress {
// 		current: 3,
// 		total: 10,
// 		message: "Test message".to_string(),
// 	};
// 	let structured = Progress::structured(test_progress.clone());
// 	match structured {
// 		Progress::Structured(data) => {
// 			let deserialized: TestProgress = serde_json::from_value(data).unwrap();
// 			assert_eq!(deserialized.current, test_progress.current);
// 			assert_eq!(deserialized.total, test_progress.total);
// 			assert_eq!(deserialized.message, test_progress.message);
// 		}
// 		_ => panic!("Expected structured progress"),
// 	}
// }
// #[tokio::test]
// async fn test_job_error_types() {
// 	// Test different error types
// 	let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
// 	let job_error = JobError::from(io_error);
// 	match job_error {
// 		JobError::Io(e) => {
// 			assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
// 		}
// 		_ => panic!("Expected IO error"),
// 	}
// 	let execution_error = JobError::execution("Execution error message");
// 	match execution_error {
// 		JobError::ExecutionFailed(msg) => {
// 			assert_eq!(msg, "Execution error message");
// 		}
// 		_ => panic!("Expected execution error"),
// 	}
// 	let interrupted_error = JobError::Interrupted;
// 	match interrupted_error {
// 		JobError::Interrupted => {
// 			// Expected
// 		}
// 		_ => panic!("Expected interrupted error"),
// 	}
// }
// #[tokio::test]
// async fn test_job_output_types() {
// 	// Test different output types
// 	let copied_output = JobOutput::FileCopy {
// 		copied_count: 95,
// 		total_bytes: 1024 * 1024,
// 	};
// 	match copied_output {
// 		JobOutput::FileCopy { copied_count, total_bytes } => {
// 			assert_eq!(copied_count, 95);
// 			assert_eq!(total_bytes, 1024 * 1024);
// 		}
// 		_ => panic!("Expected file copy output"),
// 	}
// 	let indexed_output = JobOutput::Indexed {
// 		total_files: 500,
// 		total_dirs: 50,
// 		total_bytes: 10 * 1024 * 1024,
// 	};
// 	match indexed_output {
// 		JobOutput::Indexed { total_files, total_dirs, total_bytes } => {
// 			assert_eq!(total_files, 500);
// 			assert_eq!(total_dirs, 50);
// 			assert_eq!(total_bytes, 10 * 1024 * 1024);
// 		}
// 		_ => panic!("Expected indexed output"),
// 	}
// 	let custom_data = serde_json::json!({
// 		"test": "value",
// 		"number": 42
// 	});
// 	let custom_output = JobOutput::Custom(custom_data.clone());
// 	match custom_output {
// 		JobOutput::Custom(data) => {
// 			assert_eq!(data, custom_data);
// 		}
// 		_ => panic!("Expected custom output"),
// 	}
// }
// #[tokio::test]
// async fn test_job_id_generation() {
// 	// Test that JobIds are unique
// 	let id1 = JobId::new();
// 	let id2 = JobId::new();
// 	assert_ne!(id1, id2);
// 	// Test string conversion
// 	let id_str = id1.to_string();
// 	assert!(!id_str.is_empty());
// 	// Test that IDs are valid UUIDs
// 	let parsed = Uuid::parse_str(&id_str);
// 	assert!(parsed.is_ok());
// }
// #[tokio::test]
// async fn test_job_status_transitions() {
// 	// Test status equality and display
// 	assert_eq!(JobStatus::Queued, JobStatus::Queued);
// 	assert_ne!(JobStatus::Queued, JobStatus::Running);
// 	// Test string conversion
// 	assert_eq!(JobStatus::Queued.to_string(), "queued");
// 	assert_eq!(JobStatus::Running.to_string(), "running");
// 	assert_eq!(JobStatus::Completed.to_string(), "completed");
// 	assert_eq!(JobStatus::Failed.to_string(), "failed");
// 	assert_eq!(JobStatus::Cancelled.to_string(), "cancelled");
// 	assert_eq!(JobStatus::Paused.to_string(), "paused");
// }
// #[tokio::test]
// async fn test_job_context_functionality() {
// 	// This test verifies JobContext methods work correctly.
// 	// Since JobContext requires a full job execution environment,
// 	// we test that the types and structures are correct.
// 	let temp_dir = TempDir::new().unwrap();
// 	let job_manager = JobManager::new(temp_dir.path().to_path_buf()).await.unwrap();
// 	// Test that the job manager can be created and shut down
// 	job_manager.shutdown().await.unwrap();
// 	// Test that job context structures are properly defined
// 	// (Full context testing would require running actual jobs)
// 	assert!(true); // Placeholder for context method tests
// }
// #[tokio::test]
// async fn test_job_system_concurrency() {
// 	// Test that multiple JobManagers can be created independently
// 	let temp_dir1 = TempDir::new().unwrap();
// 	let temp_dir2 = TempDir::new().unwrap();
// 	let manager1 = JobManager::new(temp_dir1.path().to_path_buf()).await.unwrap();
// 	let manager2 = JobManager::new(temp_dir2.path().to_path_buf()).await.unwrap();
// 	// Both should work independently
// 	let jobs1 = manager1.list_jobs(None).await.unwrap();
// 	let jobs2 = manager2.list_jobs(None).await.unwrap();
// 	assert!(jobs1.is_empty());
// 	assert!(jobs2.is_empty());
// 	// Shutdown both
// 	manager1.shutdown().await.unwrap();
// 	manager2.shutdown().await.unwrap();
// }
// #[tokio::test]
// async fn test_job_system_persistence() {
// 	let temp_dir = TempDir::new().unwrap();
// 	let data_dir = temp_dir.path().to_path_buf();
// 	// Create manager, verify database
// 	let manager1 = JobManager::new(data_dir.clone()).await.unwrap();
// 	assert!(data_dir.join("jobs.db").exists());
// 	manager1.shutdown().await.unwrap();
// 	// Create new manager with same directory - should reuse database
// 	let manager2 = JobManager::new(data_dir.clone()).await.unwrap();
// 	let jobs = manager2.list_jobs(None).await.unwrap();
// 	assert!(jobs.is_empty()); // Should start empty but database should exist
// 	manager2.shutdown().await.unwrap();
// }

View File

@@ -1,65 +0,0 @@
//! Simple test to verify LibP2P mDNS discovery works between separate processes
//! This isolates the networking layer from the full pairing protocol
use std::process::{Command, Stdio};
use std::time::Duration;
use tokio::time::timeout;
#[tokio::test]
async fn test_mdns_discovery_between_processes() {
	println!("🧪 Testing basic mDNS discovery between two LibP2P processes");
	// Start Alice (listener) process
	println!("🟦 Starting Alice (mDNS listener)...");
	let mut alice = Command::new("cargo")
		.args(["run", "--bin", "mdns_test_helper", "--", "listen"])
		.stdout(Stdio::piped())
		.stderr(Stdio::piped())
		.spawn()
		.expect("Failed to start Alice process");
	// Give Alice time to start listening
	tokio::time::sleep(Duration::from_secs(2)).await;
	// Start Bob (discoverer) process. Use tokio's async Command here so the
	// timeout can actually cancel the wait; a blocking std::process call
	// would stall the runtime until Bob exits, making the timeout a no-op.
	println!("🟨 Starting Bob (mDNS discoverer)...");
	let bob_result = timeout(Duration::from_secs(10), async {
		tokio::process::Command::new("cargo")
			.args(["run", "--bin", "mdns_test_helper", "--", "discover"])
			.output()
			.await
			.expect("Failed to run Bob process")
	})
	.await;
	// Kill Alice and collect her output
	let _ = alice.kill();
	let alice_output = alice.wait_with_output().expect("Failed to read Alice output");
	match bob_result {
		Ok(bob_output) => {
			let alice_stdout = String::from_utf8_lossy(&alice_output.stdout);
			let bob_stdout = String::from_utf8_lossy(&bob_output.stdout);
			println!("📤 Alice output:\n{}", alice_stdout);
			println!("📥 Bob output:\n{}", bob_stdout);
			// Check if discovery succeeded
			if bob_stdout.contains("PEER_DISCOVERED") {
				println!("✅ mDNS discovery successful!");
			} else {
				println!("❌ mDNS discovery failed");
				println!("Alice stderr: {}", String::from_utf8_lossy(&alice_output.stderr));
				println!("Bob stderr: {}", String::from_utf8_lossy(&bob_output.stderr));
				panic!("mDNS discovery between processes failed");
			}
		}
		Err(_) => {
			let alice_stdout = String::from_utf8_lossy(&alice_output.stdout);
			println!("📤 Alice output:\n{}", alice_stdout);
			println!("⏰ Bob discovery timed out after 10 seconds");
			panic!("mDNS discovery timed out");
		}
	}
}