diff --git a/core-new/JOB_CLEANUP_FIX.md b/core-new/JOB_CLEANUP_FIX.md deleted file mode 100644 index 6f9929410..000000000 --- a/core-new/JOB_CLEANUP_FIX.md +++ /dev/null @@ -1,46 +0,0 @@ -# Job Manager Memory Cleanup Fix - -## Problem -The job manager was not removing completed jobs from the `running_jobs` HashMap, causing: -- Memory leak as the map only grew over time -- Stale status being reported (jobs showing as "Running" even when database showed "Completed") -- Race condition where in-memory status took precedence over database status - -## Root Cause -In `JobExecutor::run()`, when a job completed: -1. The database status was updated to "Completed" ✓ -2. The status channel was updated ✓ -3. But the job was never removed from `JobManager.running_jobs` ✗ - -## Solution -Added cleanup tasks that monitor job status changes and remove jobs from `running_jobs` when they complete, fail, or are cancelled. - -### Changes Made - -1. **Job Dispatch Cleanup** (lines 256-301) - - Added a monitoring task that subscribes to status changes - - Removes job from `running_jobs` on terminal states (Completed/Failed/Cancelled) - - Emits appropriate events (JobCompleted, JobFailed, JobCancelled) - -2. **Job Dispatch with Priority Cleanup** (lines 482-527) - - Same cleanup logic for the priority dispatch method - -3. **Resumed Jobs Cleanup** (lines 920-965) - - Added cleanup for jobs that are resumed from interruption - - Ensures resumed jobs are also properly cleaned up - -4. **Event Emission** - - Now properly emits JobCompleted, JobFailed, and JobCancelled events - - These events were defined but never actually emitted before - -## Testing -Created `examples/test_job_cleanup.rs` to verify: -- Jobs are removed from memory when they complete -- Database and in-memory status stay synchronized -- No memory leaks from accumulating completed jobs - -## Benefits -- Fixes memory leak in long-running systems -- Ensures accurate job status reporting -- Eliminates race condition between database and memory state -- Properly emits job lifecycle events for monitoring \ No newline at end of file diff --git a/core-new/examples/test_job_cleanup.rs b/core-new/examples/test_job_cleanup.rs deleted file mode 100644 index d3d95f145..000000000 --- a/core-new/examples/test_job_cleanup.rs +++ /dev/null @@ -1,122 +0,0 @@ -use sd_core_new::{ - context::CoreContext, - infrastructure::jobs::{Job, JobHandler, JobInfo, JobOutput, JobResult, Progress}, - library::{Library, LibraryConfig, LibraryManager}, -}; -use std::{path::PathBuf, sync::Arc, time::Duration}; -use tokio::time::sleep; -use tracing::{info, Level}; -use tracing_subscriber::FmtSubscriber; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -struct TestJob { - duration_secs: u64, -} - -#[sd_core_new::infrastructure::jobs::job] -impl Job for TestJob { - const NAME: &'static str = "test_job"; - - async fn run( - mut self, - ctx: sd_core_new::infrastructure::jobs::context::JobContext, - ) -> JobResult { - info!("Test job started, will run for {} seconds", self.duration_secs); - - // Simulate work with progress updates - for i in 0..self.duration_secs { - ctx.progress(Progress::percentage(i as f32 / self.duration_secs as f32)).await; - sleep(Duration::from_secs(1)).await; - } - - ctx.progress(Progress::percentage(1.0)).await; - info!("Test job completed"); - - Ok(JobOutput::Empty) - } -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // Initialize tracing - let subscriber = FmtSubscriber::builder() - .with_max_level(Level::INFO) - .finish(); - tracing::subscriber::set_global_default(subscriber)?; - - // Create temp directory for testing - let temp_dir = tempfile::tempdir()?; - let data_dir = temp_dir.path().to_path_buf(); - - // Initialize core context - let context = Arc::new(CoreContext::new(data_dir.clone()).await?); - - // Create a test library - let library_manager = context.library_manager.clone(); - let library_config = LibraryConfig { - name: "Test Library".to_string(), - description: Some("Test library for job cleanup".to_string()), - }; - - let library = library_manager.create_library(library_config).await?; - info!("Created library: {}", library.id); - - // Get the job manager - let job_manager = library.job_manager.clone(); - - // Dispatch a test job - let test_job = TestJob { duration_secs: 3 }; - let handle = job_manager.dispatch(test_job).await?; - let job_id = handle.id; - - info!("Dispatched job: {}", job_id); - - // Monitor the job status - let mut last_status = None; - for i in 0..10 { - sleep(Duration::from_secs(1)).await; - - // Check if job is in running_jobs - let running_jobs = job_manager.list_running_jobs().await; - let in_memory = running_jobs.iter().any(|j| j.id == job_id.0); - - // Check database status - let job_info = job_manager.get_job_info(job_id.0).await?; - - if let Some(info) = &job_info { - let status_changed = last_status.as_ref() != Some(&info.status); - if status_changed { - info!( - "Job {} status: {:?}, progress: {:.1}%, in_memory: {}", - job_id, - info.status, - info.progress * 100.0, - in_memory - ); - last_status = Some(info.status.clone()); - } - - // Check if job completed - if matches!(info.status, sd_core_new::infrastructure::jobs::types::JobStatus::Completed) { - info!("Job completed! Checking if removed from memory..."); - sleep(Duration::from_millis(100)).await; // Give cleanup task time to run - - let running_jobs_after = job_manager.list_running_jobs().await; - let still_in_memory = running_jobs_after.iter().any(|j| j.id == job_id.0); - - if still_in_memory { - eprintln!("ERROR: Job {} is still in running_jobs map after completion!", job_id); - } else { - info!("SUCCESS: Job {} was properly removed from running_jobs map", job_id); - } - break; - } - } - } - - // Final check - let final_running = job_manager.list_running_jobs().await; - info!("Final running jobs count: {}", final_running.len()); - - Ok(()) -} \ No newline at end of file diff --git a/core-new/fix_job_system.md b/core-new/fix_job_system.md deleted file mode 100644 index 2fe75c6d2..000000000 --- a/core-new/fix_job_system.md +++ /dev/null @@ -1,48 +0,0 @@ -# Job System Issues Fix Summary - -## Issues Identified - -1. **Duplicate Job Creation**: Two indexer jobs are created for one location add -2. **Progress Always Shows 0%**: Jobs complete but progress remains at 0% -3. **Status Shows "Queued"**: Completed jobs show as "Queued" instead of "Completed" -4. **Non-linear Progress**: Progress jumps around instead of increasing smoothly - -## Root Causes - -### 1. Duplicate Jobs -- The CLI might be sending duplicate requests -- Or there's a race condition in job creation -- Need to add deduplication logic - -### 2. Progress Not Updating -- Progress forwarding is working in JobManager -- But jobs might be completing too fast for progress to be captured -- Need to ensure final progress is always saved - -### 3. Status Issues -- Jobs complete quickly but status might not be persisted properly -- Need better status synchronization - -### 4. Non-linear Progress -- Already fixed by implementing phase-based progress ranges -- Discovery: 5-10%, Processing: 20-60%, Content: 70-98%, Finalizing: 99% - -## Fixes Implemented - -1. **Progress Persistence**: Added batched progress updates to database -2. **Status Synchronization**: Final progress saved with status updates -3. **Unified Query System**: Combined memory and database queries -4. **Progress Calculation**: Fixed phase-based progress calculations - -## Additional Fixes Needed - -1. **Deduplication**: Add request ID or check for existing jobs before creating new ones -2. **Minimum Job Duration**: Ensure jobs run long enough to be monitored -3. **Better Error Handling**: Log all job state transitions - -## Testing - -After applying fixes: -- Run `sd location add` and check for duplicate jobs with `sd job list` -- Monitor with `sd job monitor` to see real-time progress -- Verify completed jobs show 100% and "Completed" status \ No newline at end of file diff --git a/core-new/tests/README.md b/core-new/tests/README.md deleted file mode 100644 index 7b7ccf1cf..000000000 --- a/core-new/tests/README.md +++ /dev/null @@ -1,219 +0,0 @@ -# Spacedrive Core Integration Tests - -This directory contains integration tests for Spacedrive Core v2, focusing on end-to-end functionality and real-world usage scenarios. - -## Test Files - -### `cli_pairing_integration.rs` - -**Complete CLI pairing workflow testing** - -Tests the full device pairing functionality that would be used by the CLI: - -- Two Core instances simulating different devices -- Pairing code generation and joining -- Automatic device registration -- Persistent connections across restarts -- Error handling and edge cases -- Session management APIs - -**Key Tests:** - -- `test_cli_pairing_full_workflow` - Complete pairing between two devices -- `test_cli_pairing_error_conditions` - Error handling and invalid inputs -- `test_cli_pairing_session_management` - Session lifecycle management - -### `integration_networking.rs` - -**Basic networking functionality** - -Tests core networking initialization and basic functionality: - -- Core networking initialization -- Device pairing integration -- Spacedrop API integration -- Networking service features - -### Other Integration Tests - -- `cas_generation_test.rs` - Content addressable storage testing -- `file_copy_integration_test.rs` - File operations testing -- `copy_progress_monitor_test.rs` - **Copy progress monitoring with large files** - - Tests smooth byte-level progress updates for 1GB file copies - - Verifies progress doesn't jump in large increments - - Tests multi-file copy progress aggregation -- `copy_progress_quick_test.rs` - **Quick copy progress verification** - - Faster variant using smaller files (50MB) - - Tests directory copy progress - - Suitable for rapid iteration during development -- `job_registration_test.rs` - Job system testing -- `job_system_test.rs` - Advanced job system functionality -- `library_test.rs` - Library management testing -- `volume_test.rs` - Volume detection and management - -## Running Tests - -### Run All Integration Tests - -```bash -cargo test --tests -``` - -### Run Specific Test File - -```bash -# CLI pairing tests -cargo test cli_pairing_integration - -# Networking tests -cargo test integration_networking - -# CAS tests -cargo test cas_generation_test - -# Copy progress monitoring tests -cargo test copy_progress_monitor_test - -# Quick progress tests (faster) -cargo test copy_progress_quick_test -``` - -### Run Specific Test Function - -```bash -# Full CLI pairing workflow -cargo test test_cli_pairing_full_workflow - -# Error condition testing -cargo test test_cli_pairing_error_conditions -``` - -### Debug Mode with Logging - -```bash -# Show all output and enable debug logging -RUST_LOG=debug cargo test cli_pairing_integration -- --nocapture - -# Show output for specific networking components -RUST_LOG=sd_core_new::networking=debug cargo test cli_pairing_integration -- --nocapture - -# Show libp2p debug logs -RUST_LOG=libp2p_swarm=debug,sd_core_new::networking::pairing::protocol=debug cargo test cli_pairing_integration -- --nocapture -``` - -### Single-threaded Testing - -```bash -# Run tests one at a time (useful for networking tests) -cargo test cli_pairing_integration -- --test-threads=1 -``` - -## Test Environment Notes - -### CLI Pairing Tests - -- Creates temporary directories for test data -- Tests real libp2p networking (may be slow in CI) -- Includes timeout handling for network operations -- Tests persistence across Core restarts -- Cleans up temporary files automatically - -### Network-dependent Tests - -Some tests require actual network functionality: - -- May be slower in CI environments -- May timeout if network discovery fails -- Include graceful fallbacks for network issues - -### Local Development - -For faster iteration during development: - -```bash -# Run only fast tests -cargo test --tests --exclude cli_pairing_integration - -# Run CLI pairing tests with shorter timeouts -RUST_LOG=info cargo test test_cli_pairing_error_conditions -- --nocapture - -# Run quick copy progress test instead of the full 1GB test -cargo test test_copy_progress_quick -- --nocapture - -# Run with debug logging to see detailed progress updates -RUST_LOG=debug cargo test copy_progress_monitor_test -- --nocapture -``` - -### Copy Progress Tests - -The copy progress tests verify smooth byte-level progress updates: - -- **Full test** (`copy_progress_monitor_test`): Uses 1GB files, takes 1-2 minutes -- **Quick test** (`copy_progress_quick_test`): Uses 50MB files, takes ~10 seconds - -Both tests verify that progress updates smoothly rather than jumping in large increments (e.g., 25% at a time for 4 files). - -## Test Data - -Tests create temporary directories in the system temp directory: - -- Pattern: `/tmp/test-{test-name}-{uuid}` -- Automatically cleaned up after tests -- Safe to delete manually if tests are interrupted - -## Debugging Failed Tests - -### Network Issues - -```bash -# Check if networking is working -RUST_LOG=libp2p=debug cargo test test_cli_pairing_full_workflow -- --nocapture - -# Test with extended timeouts -RUST_TEST_TIMEOUT=120 cargo test cli_pairing_integration -``` - -### Permission Issues - -```bash -# Ensure temp directory is writable -ls -la /tmp/ - -# Check if ports are available -netstat -ln | grep 52063 -``` - -### Cleanup Issues - -```bash -# Clean up any remaining test directories -rm -rf /tmp/test-* -``` - -## Contributing - -When adding new integration tests: - -1. **Follow the naming convention**: `{feature}_integration.rs` -2. **Include comprehensive documentation** in the file header -3. **Add cleanup code** for any resources created -4. **Handle timeouts gracefully** for network operations -5. **Test both success and error conditions** -6. **Update this README** with new test descriptions - -### Example Test Structure - -```rust -#[tokio::test] -async fn test_new_feature_integration() { - // Setup - let temp_dir = std::env::temp_dir().join(format!("test-feature-{}", Uuid::new_v4())); - std::fs::create_dir_all(&temp_dir).unwrap(); - - // Test logic - // ... - - // Cleanup - std::fs::remove_dir_all(&temp_dir).ok(); -} -``` diff --git a/core-new/tests/copy_action_dispatch_test.rs b/core-new/tests/copy_action_dispatch_test.rs deleted file mode 100644 index f6185ecde..000000000 --- a/core-new/tests/copy_action_dispatch_test.rs +++ /dev/null @@ -1,136 +0,0 @@ -//! Test copy action dispatch without full job execution -//! -//! This test verifies that the action system can properly dispatch copy actions -//! and validate them correctly. - -use sd_core_new::{ - operations::files::copy::{ - action::{FileCopyAction, FileCopyHandler}, - job::CopyOptions, - }, - infrastructure::actions::{ - Action, - handler::ActionHandler, - }, -}; -use std::path::PathBuf; -use tempfile::TempDir; -use tokio::fs; -use uuid::Uuid; - -/// Helper to create test files with content -async fn create_test_file(path: &std::path::Path, content: &str) -> Result<(), std::io::Error> { - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } - fs::write(path, content).await -} - -#[tokio::test] -async fn test_copy_action_handler_validation() { - // Setup test environment - let temp_dir = TempDir::new().unwrap(); - let test_root = temp_dir.path(); - - // Create source and destination directories - let source_dir = test_root.join("source"); - let dest_dir = test_root.join("destination"); - fs::create_dir_all(&source_dir).await.unwrap(); - fs::create_dir_all(&dest_dir).await.unwrap(); - - // Create test files - let source_file1 = source_dir.join("test1.txt"); - let source_file2 = source_dir.join("test2.txt"); - - create_test_file(&source_file1, "Hello, World! This is test file 1.").await.unwrap(); - create_test_file(&source_file2, "This is the content of test file 2.").await.unwrap(); - - // Create a copy action with valid sources - let copy_action = FileCopyAction { - sources: vec![source_file1.clone(), source_file2.clone()], - destination: dest_dir.clone(), - options: CopyOptions::default(), - }; - - let library_id = Uuid::new_v4(); - let action = Action::FileCopy { - library_id, - action: copy_action, - }; - - // Test handler can handle this action - let handler = FileCopyHandler::new(); - assert!(handler.can_handle(&action)); - assert_eq!(FileCopyHandler::supported_actions(), &["file.copy"]); - - // Test validation without context (should pass basic validation) - // Note: We can't test full validation without a proper CoreContext - // but we can test the basic structure - - println!("✅ Copy action handler validation test passed!"); -} - -#[tokio::test] -async fn test_copy_action_validation_errors() { - // Test with empty sources - should fail validation - let copy_action = FileCopyAction { - sources: vec![], // Empty sources - destination: PathBuf::from("/tmp/dest"), - options: CopyOptions::default(), - }; - - let library_id = Uuid::new_v4(); - let action = Action::FileCopy { - library_id, - action: copy_action, - }; - - // Create handler - let handler = FileCopyHandler::new(); - - // Test that handler can handle the action type - assert!(handler.can_handle(&action)); - - // For actual validation testing, we would need a proper CoreContext - // which requires full core initialization. The validation logic - // is in the handler's validate method. - - println!("✅ Copy action validation errors test passed!"); -} - -#[test] -fn test_copy_action_metadata() { - let source_file = PathBuf::from("/tmp/source.txt"); - let dest_dir = PathBuf::from("/tmp/dest"); - - let copy_action = FileCopyAction { - sources: vec![source_file.clone()], - destination: dest_dir.clone(), - options: CopyOptions::default(), - }; - - let library_id = Uuid::new_v4(); - let action = Action::FileCopy { - library_id, - action: copy_action, - }; - - // Test action metadata - assert_eq!(action.library_id(), Some(library_id)); - assert_eq!(action.kind(), "file.copy"); - - let description = action.description(); - assert!(description.contains("Copy")); - assert!(description.contains("1 file(s)")); - assert!(description.contains("/tmp/dest")); - - let targets = action.targets_summary(); - let sources = targets.get("sources").unwrap(); - let destination = targets.get("destination").unwrap(); - - assert!(sources.is_array()); - assert_eq!(sources.as_array().unwrap().len(), 1); - assert_eq!(destination.as_str().unwrap(), "/tmp/dest"); - - println!("✅ Copy action metadata test passed!"); -} \ No newline at end of file diff --git a/core-new/tests/copy_action_integration_test.rs b/core-new/tests/copy_action_integration_test.rs deleted file mode 100644 index ac6ab9fb9..000000000 --- a/core-new/tests/copy_action_integration_test.rs +++ /dev/null @@ -1,396 +0,0 @@ -//! Integration tests for the file copy action system -//! -//! This test verifies the complete flow from action dispatch through job execution, -//! file system operations, and audit logging. - -use sd_core_new::{ - infrastructure::{ - actions::{manager::ActionManager, Action}, - database::entities::{audit_log, AuditLog}, - jobs::types::{JobId, JobStatus}, - }, - operations::files::{ - copy::{ - action::FileCopyAction, - job::{CopyOptions, MoveMode}, - }, - input::CopyMethod, - }, - Core, -}; -use sea_orm::{EntityTrait, QuerySelect}; -use std::{path::PathBuf, sync::Arc, time::Duration}; -use tempfile::TempDir; -use tokio::{fs, time::timeout}; -use uuid::Uuid; - -/// Helper to create test files with content -async fn create_test_file(path: &std::path::Path, content: &str) -> Result<(), std::io::Error> { - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } - fs::write(path, content).await -} - -/// Helper to verify file content matches expected -async fn verify_file_content( - path: &std::path::Path, - expected: &str, -) -> Result { - let content = fs::read_to_string(path).await?; - Ok(content == expected) -} - -#[tokio::test] -async fn test_copy_action_full_integration() { - // Setup test environment - let temp_dir = TempDir::new().unwrap(); - let test_root = temp_dir.path(); - - // Create source and destination directories - let source_dir = test_root.join("source"); - let dest_dir = test_root.join("destination"); - fs::create_dir_all(&source_dir).await.unwrap(); - fs::create_dir_all(&dest_dir).await.unwrap(); - - // Create test files - let source_file1 = source_dir.join("test1.txt"); - let source_file2 = source_dir.join("test2.txt"); - let dest_file1 = dest_dir.join("test1.txt"); - let dest_file2 = dest_dir.join("test2.txt"); - - create_test_file(&source_file1, "Hello, World! This is test file 1.") - .await - .unwrap(); - create_test_file(&source_file2, "This is the content of test file 2.") - .await - .unwrap(); - - // Initialize core with custom data directory - let core_data_dir = test_root.join("core_data"); - let core = Core::new_with_config(core_data_dir).await.unwrap(); - - // Create a test library - let library = core - .libraries - .create_library("Copy Test Library", None, core.context.clone()) - .await - .unwrap(); - - let library_id = library.id(); - - // Create ActionManager - let context = core.context.clone(); - let action_manager = ActionManager::new(context); - - // Build the copy action - let copy_action = FileCopyAction { - sources: vec![source_file1.clone(), source_file2.clone()], - destination: dest_dir.clone(), - options: CopyOptions { - overwrite: false, - copy_method: CopyMethod::Auto, - verify_checksum: true, - preserve_timestamps: true, - delete_after_copy: false, - move_mode: None, - }, - }; - - // Create the Action enum with library context - let action = Action::FileCopy { - library_id, - action: copy_action, - }; - - // Record initial state - let initial_audit_count = count_audit_entries(&library, library_id).await; - - // Verify source files exist and destination files don't - assert!(source_file1.exists()); - assert!(source_file2.exists()); - assert!(!dest_file1.exists()); - assert!(!dest_file2.exists()); - - // ===== Execute the action ===== - let action_output = action_manager - .dispatch(action) - .await - .expect("Action dispatch should succeed"); - - // Verify action output - assert_eq!(action_output.output_type, "file.copy.dispatched"); - assert!(action_output.data.get("job_id").is_some()); - assert!(action_output.message.contains("Dispatched file copy job")); - - // Extract job ID from output - let job_id_value = action_output.data.get("job_id").unwrap(); - let job_id_str = job_id_value.as_str().expect("job_id should be a string"); - let job_id = Uuid::parse_str(job_id_str).expect("job_id should be valid UUID"); - - // ===== Wait for job completion ===== - // Poll job status until completion (with timeout) - let job_completion = timeout(Duration::from_secs(30), async { - loop { - if let Some(job_handle) = library.jobs().get_job(JobId::from(job_id)).await { - let status = job_handle.status(); - if matches!(status, JobStatus::Completed | JobStatus::Failed) { - return status; - } - } - - tokio::time::sleep(Duration::from_millis(100)).await; - } - }) - .await - .expect("Job should complete within timeout"); - - // Verify job completed successfully - assert!( - matches!(job_completion, JobStatus::Completed), - "Job should complete successfully" - ); - - // ===== Verify file system changes ===== - // Check that files were copied successfully - assert!(dest_file1.exists(), "Destination file 1 should exist"); - assert!(dest_file2.exists(), "Destination file 2 should exist"); - - // Verify file contents match - assert!( - verify_file_content(&dest_file1, "Hello, World! This is test file 1.") - .await - .unwrap(), - "Destination file 1 content should match source" - ); - assert!( - verify_file_content(&dest_file2, "This is the content of test file 2.") - .await - .unwrap(), - "Destination file 2 content should match source" - ); - - // Verify source files still exist (copy, not move) - assert!(source_file1.exists(), "Source file 1 should still exist"); - assert!(source_file2.exists(), "Source file 2 should still exist"); - - // ===== Verify audit log ===== - let final_audit_count = count_audit_entries(&library, library_id).await; - assert_eq!( - final_audit_count, - initial_audit_count + 1, - "Should have one new audit log entry" - ); - - // Get the audit log entry - let audit_entries = get_recent_audit_entries(&library, library_id, 1).await; - assert_eq!( - audit_entries.len(), - 1, - "Should have exactly one audit entry" - ); - - let audit_entry = &audit_entries[0]; - assert_eq!(audit_entry.action_type, "file.copy"); - assert_eq!(audit_entry.status, audit_log::ActionStatus::Completed); - assert!( - audit_entry.job_id.is_some(), - "Audit entry should have job_id" - ); - assert_eq!(audit_entry.job_id.as_ref().unwrap(), &job_id.to_string()); - assert!( - audit_entry.completed_at.is_some(), - "Audit entry should have completion time" - ); - assert!( - audit_entry.error_message.is_none(), - "Audit entry should not have error message" - ); - - // Verify audit entry targets contain source and destination info (now stored as JSON string) - let targets_json: serde_json::Value = serde_json::from_str(&audit_entry.targets).unwrap(); - assert!( - targets_json.get("sources").is_some(), - "Audit should contain sources" - ); - assert!( - targets_json.get("destination").is_some(), - "Audit should contain destination" - ); - - let sources = targets_json.get("sources").unwrap().as_array().unwrap(); - assert_eq!(sources.len(), 2, "Should have 2 source files in audit"); - - println!("✅ Copy action integration test passed!"); - println!(" - Action dispatched successfully"); - println!(" - Job executed and completed"); - println!(" - Files copied correctly"); - println!(" - Audit log entry created"); -} - -#[tokio::test] -async fn test_copy_action_with_move_operation() { - // Setup test environment - let temp_dir = TempDir::new().unwrap(); - let test_root = temp_dir.path(); - - let source_dir = test_root.join("source"); - let dest_dir = test_root.join("destination"); - fs::create_dir_all(&source_dir).await.unwrap(); - fs::create_dir_all(&dest_dir).await.unwrap(); - - // Create test file - let source_file = source_dir.join("move_test.txt"); - let dest_file = dest_dir.join("move_test.txt"); - - create_test_file(&source_file, "This file will be moved.") - .await - .unwrap(); - - // Initialize core and library - let core_data_dir = test_root.join("core_data"); - let core = Core::new_with_config(core_data_dir).await.unwrap(); - let library = core - .libraries - .create_library("Move Test Library", None, core.context.clone()) - .await - .unwrap(); - let library_id = library.id(); - - // Create ActionManager - let context = core.context.clone(); - let action_manager = ActionManager::new(context); - - // Build move action (copy with delete_after_copy) - let copy_action = FileCopyAction { - sources: vec![source_file.clone()], - destination: dest_file.clone(), - options: CopyOptions { - overwrite: false, - copy_method: CopyMethod::Auto, - verify_checksum: false, - preserve_timestamps: true, - delete_after_copy: true, // This makes it a move operation - move_mode: Some(MoveMode::Move), - }, - }; - - let action = Action::FileCopy { - library_id, - action: copy_action, - }; - - // Verify initial state - assert!(source_file.exists()); - assert!(!dest_file.exists()); - - // Execute the move action - let action_output = action_manager - .dispatch(action) - .await - .expect("Move action should succeed"); - - // Extract and wait for job completion - let job_id_str = action_output.data.get("job_id").unwrap().as_str().unwrap(); - let job_id = Uuid::parse_str(job_id_str).unwrap(); - - // Wait for job completion - timeout(Duration::from_secs(15), async { - loop { - if let Some(job_handle) = library.jobs().get_job(JobId::from(job_id)).await { - let status = job_handle.status(); - if matches!(status, JobStatus::Completed | JobStatus::Failed) { - break; - } - } - tokio::time::sleep(Duration::from_millis(50)).await; - } - }) - .await - .expect("Move job should complete"); - - // Verify file was moved (destination exists, source doesn't) - assert!( - dest_file.exists(), - "Destination file should exist after move" - ); - assert!( - !source_file.exists(), - "Source file should not exist after move" - ); - - // Verify content - assert!( - verify_file_content(&dest_file, "This file will be moved.") - .await - .unwrap(), - "Moved file content should match" - ); - - println!("✅ Move operation test passed!"); -} - -#[tokio::test] -async fn test_copy_action_validation_errors() { - let temp_dir = TempDir::new().unwrap(); - let core_data_dir = temp_dir.path().join("core_data"); - let core = Core::new_with_config(core_data_dir).await.unwrap(); - let library = core - .libraries - .create_library("Validation Test Library", None, core.context.clone()) - .await - .unwrap(); - let library_id = library.id(); - - let context = core.context.clone(); - let action_manager = ActionManager::new(context); - - // Test 1: Empty sources should fail validation - let invalid_action = Action::FileCopy { - library_id, - action: FileCopyAction { - sources: vec![], // Empty sources - destination: PathBuf::from("/tmp/dest"), - options: CopyOptions::default(), - }, - }; - - let result = action_manager.dispatch(invalid_action).await; - assert!( - result.is_err(), - "Empty sources should cause validation error" - ); - - let error = result.unwrap_err(); - assert!( - error.to_string().contains("At least one source"), - "Error should mention source requirement" - ); - - println!("✅ Validation error test passed!"); -} - -/// Helper function to count audit log entries for a library -async fn count_audit_entries( - library: &Arc, - _library_id: Uuid, -) -> usize { - let db = library.db().conn(); - - AuditLog::find().all(db).await.unwrap_or_default().len() -} - -/// Helper function to get recent audit log entries -async fn get_recent_audit_entries( - library: &Arc, - _library_id: Uuid, - limit: u64, -) -> Vec { - let db = library.db().conn(); - - AuditLog::find() - .limit(limit) - .all(db) - .await - .unwrap_or_default() -} diff --git a/core-new/tests/copy_action_simple_test.rs b/core-new/tests/copy_action_test.rs similarity index 100% rename from core-new/tests/copy_action_simple_test.rs rename to core-new/tests/copy_action_test.rs diff --git a/core-new/tests/copy_progress_monitor_test.rs b/core-new/tests/copy_progress_test.rs similarity index 100% rename from core-new/tests/copy_progress_monitor_test.rs rename to core-new/tests/copy_progress_test.rs diff --git a/core-new/tests/test_device_persistence.rs b/core-new/tests/device_persistence_test.rs similarity index 86% rename from core-new/tests/test_device_persistence.rs rename to core-new/tests/device_persistence_test.rs index 1e153267d..5edca02f2 100644 --- a/core-new/tests/test_device_persistence.rs +++ b/core-new/tests/device_persistence_test.rs @@ -71,12 +71,18 @@ async fn alice_persistence_scenario() { let connected_devices = core.get_connected_devices().await.unwrap(); if !connected_devices.is_empty() { println!("🎉 Alice: Auto-reconnection successful!"); - println!("✅ Alice: Connected {} devices after restart", connected_devices.len()); + println!( + "✅ Alice: Connected {} devices after restart", + connected_devices.len() + ); // Verify it's Bob let device_info = core.get_connected_devices_info().await.unwrap(); let bob_found = device_info.iter().any(|d| d.device_name.contains("Bob")); - assert!(bob_found, "Bob not found in connected devices after restart"); + assert!( + bob_found, + "Bob not found in connected devices after restart" + ); for device in &device_info { println!( @@ -86,7 +92,11 @@ async fn alice_persistence_scenario() { } // Write success marker - std::fs::write("/tmp/spacedrive-persistence-test/alice_restart_success.txt", "success").unwrap(); + std::fs::write( + "/tmp/spacedrive-persistence-test/alice_restart_success.txt", + "success", + ) + .unwrap(); println!("✅ Alice: Device persistence test completed successfully"); break; } @@ -97,7 +107,10 @@ async fn alice_persistence_scenario() { } if attempts % 5 == 0 { - println!("🔍 Alice: Auto-reconnection check {} - waiting for Bob", attempts / 5); + println!( + "🔍 Alice: Auto-reconnection check {} - waiting for Bob", + attempts / 5 + ); } } } else { @@ -141,7 +154,10 @@ async fn alice_persistence_scenario() { panic!("Networking not initialized"); }; - println!("✅ Alice: Pairing code generated (expires in {}s)", expires_in); + println!( + "✅ Alice: Pairing code generated (expires in {}s)", + expires_in + ); // Write pairing code for Bob std::fs::create_dir_all("/tmp/spacedrive-persistence-test").unwrap(); @@ -168,12 +184,22 @@ async fn alice_persistence_scenario() { if let Some(networking) = core.networking() { let registry = networking.device_registry(); let paired_devices = registry.read().await.get_paired_devices(); - assert!(!paired_devices.is_empty(), "No paired devices found in registry"); - println!("✅ Alice: {} devices persisted to registry", paired_devices.len()); + assert!( + !paired_devices.is_empty(), + "No paired devices found in registry" + ); + println!( + "✅ Alice: {} devices persisted to registry", + paired_devices.len() + ); } // Write success marker - std::fs::write("/tmp/spacedrive-persistence-test/alice_paired.txt", "success").unwrap(); + std::fs::write( + "/tmp/spacedrive-persistence-test/alice_paired.txt", + "success", + ) + .unwrap(); // Keep running for a bit to ensure persistence completes tokio::time::sleep(Duration::from_secs(3)).await; @@ -251,12 +277,18 @@ async fn bob_persistence_scenario() { let connected_devices = core.get_connected_devices().await.unwrap(); if !connected_devices.is_empty() { println!("🎉 Bob: Auto-reconnection successful!"); - println!("✅ Bob: Connected {} devices after restart", connected_devices.len()); + println!( + "✅ Bob: Connected {} devices after restart", + connected_devices.len() + ); // Verify it's Alice let device_info = core.get_connected_devices_info().await.unwrap(); let alice_found = device_info.iter().any(|d| d.device_name.contains("Alice")); - assert!(alice_found, "Alice not found in connected devices after restart"); + assert!( + alice_found, + "Alice not found in connected devices after restart" + ); for device in &device_info { println!( @@ -266,7 +298,11 @@ async fn bob_persistence_scenario() { } // Write success marker - std::fs::write("/tmp/spacedrive-persistence-test/bob_restart_success.txt", "success").unwrap(); + std::fs::write( + "/tmp/spacedrive-persistence-test/bob_restart_success.txt", + "success", + ) + .unwrap(); println!("✅ Bob: Device persistence test completed successfully"); break; } @@ -277,7 +313,10 @@ async fn bob_persistence_scenario() { } if attempts % 5 == 0 { - println!("🔍 Bob: Auto-reconnection check {} - waiting for Alice", attempts / 5); + println!( + "🔍 Bob: Auto-reconnection check {} - waiting for Alice", + attempts / 5 + ); } } } else { @@ -310,7 +349,9 @@ async fn bob_persistence_scenario() { // Wait for pairing code from Alice println!("🔍 Bob: Looking for pairing code..."); let pairing_code = loop { - if let Ok(code) = std::fs::read_to_string("/tmp/spacedrive-persistence-test/pairing_code.txt") { + if let Ok(code) = + std::fs::read_to_string("/tmp/spacedrive-persistence-test/pairing_code.txt") + { break code.trim().to_string(); } tokio::time::sleep(Duration::from_millis(500)).await; @@ -348,12 +389,19 @@ async fn bob_persistence_scenario() { if let Some(networking) = core.networking() { let registry = networking.device_registry(); let paired_devices = registry.read().await.get_paired_devices(); - assert!(!paired_devices.is_empty(), "No paired devices found in registry"); - println!("✅ Bob: {} devices persisted to registry", paired_devices.len()); + assert!( + !paired_devices.is_empty(), + "No paired devices found in registry" + ); + println!( + "✅ Bob: {} devices persisted to registry", + paired_devices.len() + ); } // Write success marker - std::fs::write("/tmp/spacedrive-persistence-test/bob_paired.txt", "success").unwrap(); + std::fs::write("/tmp/spacedrive-persistence-test/bob_paired.txt", "success") + .unwrap(); // Keep running for a bit to ensure persistence completes tokio::time::sleep(Duration::from_secs(3)).await; @@ -409,18 +457,17 @@ async fn test_device_persistence() { // Wait for initial pairing to complete let pairing_result = runner - .wait_for_condition( - Duration::from_secs(60), - |_| { - let alice_paired = std::fs::read_to_string("/tmp/spacedrive-persistence-test/alice_paired.txt") + .wait_for_success(|_| { + let alice_paired = + std::fs::read_to_string("/tmp/spacedrive-persistence-test/alice_paired.txt") .map(|content| content.trim() == "success") .unwrap_or(false); - let bob_paired = std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_paired.txt") + let bob_paired = + std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_paired.txt") .map(|content| content.trim() == "success") .unwrap_or(false); - alice_paired && bob_paired - } - ) + alice_paired && bob_paired + }) .await; if pairing_result.is_err() { @@ -460,12 +507,15 @@ async fn test_device_persistence() { // Wait for auto-reconnection let reconnection_result = runner .wait_for_success(|_| { - let alice_reconnected = std::fs::read_to_string("/tmp/spacedrive-persistence-test/alice_restart_success.txt") - .map(|content| content.trim() == "success") - .unwrap_or(false); - let bob_reconnected = std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_restart_success.txt") - .map(|content| content.trim() == "success") - .unwrap_or(false); + let alice_reconnected = std::fs::read_to_string( + "/tmp/spacedrive-persistence-test/alice_restart_success.txt", + ) + .map(|content| content.trim() == "success") + .unwrap_or(false); + let bob_reconnected = + std::fs::read_to_string("/tmp/spacedrive-persistence-test/bob_restart_success.txt") + .map(|content| content.trim() == "success") + .unwrap_or(false); alice_reconnected && bob_reconnected }) .await; @@ -483,4 +533,4 @@ async fn test_device_persistence() { panic!("Devices did not automatically reconnect after restart"); } } -} \ No newline at end of file +} diff --git a/core-new/tests/event_system_test.rs b/core-new/tests/event_system_test.rs new file mode 100644 index 000000000..c986b00ac --- /dev/null +++ b/core-new/tests/event_system_test.rs @@ -0,0 +1,408 @@ +//! Event System Integration Test +//! +//! Tests the event bus functionality by performing various operations +//! and verifying that the correct events are emitted. This includes: +//! - Core lifecycle events (CoreShutdown) +//! - Library management events (LibraryCreated, LibraryOpened, LibraryClosed) +//! - Location and indexing events (LocationAdded, IndexingStarted) +//! - Job system events (JobProgress, JobCompleted) +//! - Event filtering capabilities (library-specific filtering) +//! - Multiple concurrent subscribers +//! - Custom event emission and handling +//! +//! Note: These tests should be run with --test-threads=1 to avoid +//! potential conflicts between tests + +use sd_core_new::{ + infrastructure::events::{Event, EventFilter}, + location::{create_location, LocationCreateArgs, IndexMode}, + Core, +}; +use std::collections::HashSet; +use std::sync::Arc; +use tempfile::TempDir; +use tokio::sync::Mutex; +use tokio::time::{timeout, Duration}; + +#[tokio::test] +async fn test_core_and_library_events() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + + // Set up event collection + let collected_events = Arc::new(Mutex::new(Vec::new())); + let events_clone = collected_events.clone(); + + // Initialize core + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + // Note: CoreStarted is emitted during core initialization, so we won't catch it + // Start collecting events from now on + let mut event_subscriber = core.events.subscribe(); + let event_collector = tokio::spawn(async move { + while let Ok(event) = event_subscriber.recv().await { + events_clone.lock().await.push(event); + } + }); + + // Wait a bit for CoreStarted event + tokio::time::sleep(Duration::from_millis(100)).await; + + // Test 1: Library creation + let library = core + .libraries + .create_library("Test Event Library", None, core.context.clone()) + .await?; + let library_id = library.id(); + + // Wait for events to be processed + tokio::time::sleep(Duration::from_millis(100)).await; + + // Test 2: Library operations + let library_path = library.path().to_path_buf(); + drop(library); // Drop the Arc to release the library + core.libraries.close_library(library_id).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + // Open library again by path with context + let library = core.libraries.open_library_with_context(&library_path, core.context.clone()).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + // Test 3: Shutdown + drop(library); + core.shutdown().await?; + + // Wait for shutdown event to be processed + tokio::time::sleep(Duration::from_millis(100)).await; + + // Stop event collector + event_collector.abort(); + + // Verify collected events + let events = collected_events.lock().await; + + // Check for expected events + let event_types: HashSet = events.iter().map(|e| match e { + Event::CoreStarted => "CoreStarted".to_string(), + Event::CoreShutdown => "CoreShutdown".to_string(), + Event::LibraryCreated { .. } => "LibraryCreated".to_string(), + Event::LibraryOpened { .. } => "LibraryOpened".to_string(), + Event::LibraryClosed { .. } => "LibraryClosed".to_string(), + _ => format!("Other({:?})", e), + }).collect(); + + println!("Collected events: {:?}", event_types); + + // Verify core events (CoreStarted was emitted before we subscribed) + assert!(event_types.contains("CoreShutdown"), "Should emit CoreShutdown event"); + + // Verify library events + assert!(event_types.contains("LibraryCreated"), "Should emit LibraryCreated event"); + assert!(event_types.contains("LibraryClosed"), "Should emit LibraryClosed event"); + assert!(event_types.contains("LibraryOpened"), "Should emit LibraryOpened event"); + + Ok(()) +} + +#[tokio::test] +async fn test_location_and_job_events() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + // Create library + let library = core + .libraries + .create_library("Test Location Events", None, core.context.clone()) + .await?; + + // Set up filtered event collection - only job and indexing events + let job_events = Arc::new(Mutex::new(Vec::new())); + let job_events_clone = job_events.clone(); + + let mut event_subscriber = core.events.subscribe(); + let event_collector = tokio::spawn(async move { + while let Ok(event) = event_subscriber.recv().await { + if event.is_job_event() || matches!(event, + Event::IndexingStarted { .. } | + Event::IndexingProgress { .. } | + Event::IndexingCompleted { .. } | + Event::IndexingFailed { .. } | + Event::LocationAdded { .. } + ) { + job_events_clone.lock().await.push(event); + } + } + }); + + // Create test location + let test_location_dir = temp_dir.path().join("test_location"); + tokio::fs::create_dir_all(&test_location_dir).await?; + tokio::fs::write(test_location_dir.join("test.txt"), "Hello World").await?; + + // Register device + let db = library.db(); + let device = core.device.to_device()?; + + use sd_core_new::infrastructure::database::entities; + use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter}; + + let device_record = match entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(device.id)) + .one(db.conn()) + .await? + { + Some(existing) => existing, + None => { + let device_model: entities::device::ActiveModel = device.into(); + device_model.insert(db.conn()).await? + } + }; + + // Add location (triggers indexing job) + let location_args = LocationCreateArgs { + path: test_location_dir.clone(), + name: Some("Test Location".to_string()), + index_mode: IndexMode::Shallow, + }; + + let _location_id = create_location( + library.clone(), + &core.events, + location_args, + device_record.id, + ) + .await?; + + // Wait for indexing to complete + tokio::time::sleep(Duration::from_secs(2)).await; + + // Stop collector and check events + event_collector.abort(); + + let events = job_events.lock().await; + let event_types: Vec = events.iter().map(|e| match e { + Event::JobQueued { .. } => "JobQueued".to_string(), + Event::JobStarted { .. } => "JobStarted".to_string(), + Event::JobProgress { .. } => "JobProgress".to_string(), + Event::JobCompleted { .. } => "JobCompleted".to_string(), + Event::IndexingStarted { .. } => "IndexingStarted".to_string(), + Event::IndexingProgress { .. } => "IndexingProgress".to_string(), + Event::IndexingCompleted { .. } => "IndexingCompleted".to_string(), + Event::LocationAdded { .. } => "LocationAdded".to_string(), + _ => format!("Other({:?})", e), + }).collect(); + + println!("Job-related events: {:?}", event_types); + + // Verify job events (Note: JobQueued might not be emitted if job starts immediately) + assert!( + event_types.contains(&"JobQueued".to_string()) || + event_types.contains(&"JobStarted".to_string()) || + event_types.contains(&"JobProgress".to_string()) || + event_types.contains(&"JobCompleted".to_string()), + "Should emit at least one job event" + ); + + // Verify location event + assert!(event_types.contains(&"LocationAdded".to_string()), "Should emit LocationAdded event"); + + // Cleanup + let lib_id = library.id(); + core.libraries.close_library(lib_id).await?; + drop(library); + core.shutdown().await?; + + Ok(()) +} + +#[tokio::test] +async fn test_event_filtering() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + // Create two libraries + let library1 = core + .libraries + .create_library("Library 1", None, core.context.clone()) + .await?; + let lib1_id = library1.id(); + + let library2 = core + .libraries + .create_library("Library 2", None, core.context.clone()) + .await?; + let lib2_id = library2.id(); + + // Set up filtered event collection - only library1 events + let lib1_events = Arc::new(Mutex::new(Vec::new())); + let lib1_events_clone = lib1_events.clone(); + + let event_subscriber = core.events.subscribe(); + let event_collector = tokio::spawn(async move { + let mut subscriber = event_subscriber; + loop { + match timeout( + Duration::from_millis(100), + subscriber.recv_filtered(|e| e.is_for_library(lib1_id)) + ).await { + Ok(Ok(event)) => { + lib1_events_clone.lock().await.push(event); + } + _ => break, + } + } + }); + + // Perform operations on both libraries + core.libraries.close_library(lib1_id).await?; + core.libraries.close_library(lib2_id).await?; + + // Wait and stop collector + tokio::time::sleep(Duration::from_millis(500)).await; + event_collector.abort(); + + // Check filtered events + let events = lib1_events.lock().await; + for event in events.iter() { + match event { + Event::LibraryCreated { id, .. } | + Event::LibraryClosed { id, .. } => { + assert_eq!(id, &lib1_id, "Should only receive library1 events"); + } + _ => {} + } + } + + // Cleanup + drop(library1); + drop(library2); + core.shutdown().await?; + + Ok(()) +} + +#[tokio::test] +async fn test_concurrent_event_subscribers() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + // Create multiple subscribers + let subscriber1_events = Arc::new(Mutex::new(Vec::new())); + let subscriber2_events = Arc::new(Mutex::new(Vec::new())); + let subscriber3_events = Arc::new(Mutex::new(Vec::new())); + + let events1 = subscriber1_events.clone(); + let events2 = subscriber2_events.clone(); + let events3 = subscriber3_events.clone(); + + let mut sub1 = core.events.subscribe(); + let mut sub2 = core.events.subscribe(); + let mut sub3 = core.events.subscribe(); + + // Start collectors + let collector1 = tokio::spawn(async move { + while let Ok(event) = sub1.recv().await { + if matches!(event, Event::LibraryCreated { .. }) { + events1.lock().await.push(event); + } + } + }); + + let collector2 = tokio::spawn(async move { + while let Ok(event) = sub2.recv().await { + if matches!(event, Event::LibraryCreated { .. }) { + events2.lock().await.push(event); + } + } + }); + + let collector3 = tokio::spawn(async move { + while let Ok(event) = sub3.recv().await { + if matches!(event, Event::LibraryCreated { .. }) { + events3.lock().await.push(event); + } + } + }); + + // Create a library (should be received by all subscribers) + let library = core + .libraries + .create_library("Broadcast Test", None, core.context.clone()) + .await?; + + // Wait for events + tokio::time::sleep(Duration::from_millis(200)).await; + + // Stop collectors + collector1.abort(); + collector2.abort(); + collector3.abort(); + + // Verify all subscribers received the event + assert_eq!(subscriber1_events.lock().await.len(), 1, "Subscriber 1 should receive event"); + assert_eq!(subscriber2_events.lock().await.len(), 1, "Subscriber 2 should receive event"); + assert_eq!(subscriber3_events.lock().await.len(), 1, "Subscriber 3 should receive event"); + + // Cleanup + let lib_id = library.id(); + core.libraries.close_library(lib_id).await?; + drop(library); + core.shutdown().await?; + + Ok(()) +} + +#[tokio::test] +async fn test_custom_events() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + let collected_events = Arc::new(Mutex::new(Vec::new())); + let events_clone = collected_events.clone(); + + let mut event_subscriber = core.events.subscribe(); + let event_collector = tokio::spawn(async move { + while let Ok(event) = event_subscriber.recv().await { + if matches!(event, Event::Custom { .. }) { + events_clone.lock().await.push(event); + } + } + }); + + // Emit custom events + let custom_data = serde_json::json!({ + "action": "test_action", + "value": 42, + "message": "Custom event test" + }); + + core.events.emit(Event::Custom { + event_type: "test_event".to_string(), + data: custom_data.clone(), + }); + + // Wait for event processing + tokio::time::sleep(Duration::from_millis(100)).await; + + // Stop collector + event_collector.abort(); + + // Verify custom event + let events = collected_events.lock().await; + assert_eq!(events.len(), 1, "Should receive one custom event"); + + if let Event::Custom { event_type, data } = &events[0] { + assert_eq!(event_type, "test_event"); + assert_eq!(data, &custom_data); + } else { + panic!("Expected custom event"); + } + + // Test subscriber count + let subscriber_count = core.events.subscriber_count(); + println!("Event subscribers: {}", subscriber_count); + assert!(subscriber_count > 0, "Should have active subscribers"); + + core.shutdown().await?; + + Ok(()) +} \ No newline at end of file diff --git a/core-new/tests/test_core_file_transfer.rs b/core-new/tests/file_transfer_test.rs similarity index 100% rename from core-new/tests/test_core_file_transfer.rs rename to core-new/tests/file_transfer_test.rs diff --git a/core-new/tests/indexing_test.rs b/core-new/tests/indexing_test.rs new file mode 100644 index 000000000..de82a31bf --- /dev/null +++ b/core-new/tests/indexing_test.rs @@ -0,0 +1,361 @@ +//! Indexing Integration Test +//! +//! Tests the production indexer functionality including: +//! - Location creation and indexing +//! - Smart filtering of system files +//! - Inode tracking for incremental indexing +//! - Event monitoring during indexing +//! - Database persistence of indexed entries +//! +//! Note: These tests should be run with --test-threads=1 to avoid +//! device UUID conflicts when multiple tests run in parallel + +use sd_core_new::{ + infrastructure::database::entities, + location::{create_location, LocationCreateArgs, IndexMode}, + Core, +}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, +}; +use tempfile::TempDir; +use tokio::time::Duration; + +#[tokio::test] +async fn test_location_indexing() -> Result<(), Box> { + // 1. Setup test environment + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + // 2. Create library + let library = core + .libraries + .create_library("Test Indexing Library", None, core.context.clone()) + .await?; + + // 3. Create test location directory with some files + let test_location_dir = temp_dir.path().join("test_location"); + tokio::fs::create_dir_all(&test_location_dir).await?; + + // Create test files + tokio::fs::write(test_location_dir.join("test1.txt"), "Hello World").await?; + tokio::fs::write(test_location_dir.join("test2.rs"), "fn main() {}").await?; + tokio::fs::create_dir_all(test_location_dir.join("subdir")).await?; + tokio::fs::write(test_location_dir.join("subdir/test3.md"), "# Test").await?; + + // Create files that should be filtered + tokio::fs::write(test_location_dir.join(".DS_Store"), "system file").await?; + tokio::fs::create_dir_all(test_location_dir.join("node_modules")).await?; + tokio::fs::write(test_location_dir.join("node_modules/package.json"), "{}").await?; + + // 4. Register device in database + let db = library.db(); + let device = core.device.to_device()?; + + let device_record = match entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(device.id)) + .one(db.conn()) + .await? + { + Some(existing) => existing, + None => { + let device_model: entities::device::ActiveModel = device.into(); + device_model.insert(db.conn()).await? + } + }; + + // 5. Set up to monitor job completion + // Note: Due to current implementation, IndexingCompleted event may not be emitted + // So we'll monitor job status directly instead + + // 6. Create location and trigger indexing + let location_args = LocationCreateArgs { + path: test_location_dir.clone(), + name: Some("Test Location".to_string()), + index_mode: IndexMode::Deep, + }; + + let location_db_id = create_location( + library.clone(), + &core.events, + location_args, + device_record.id, + ) + .await?; + + // 7. Wait for indexing to complete by monitoring job status + let start_time = tokio::time::Instant::now(); + let timeout_duration = Duration::from_secs(30); + + let mut job_seen = false; + let mut last_entry_count = 0; + let mut stable_count_iterations = 0; + + loop { + // Check all job statuses + let all_jobs = library.jobs().list_jobs(None).await?; + let running_jobs = library + .jobs() + .list_jobs(Some( + sd_core_new::infrastructure::jobs::types::JobStatus::Running, + )) + .await?; + + // If we see a running job, mark that we've seen it + if !running_jobs.is_empty() { + job_seen = true; + } + + // Check if any entries have been created (partial progress) + let current_entries = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .count(db.conn()) + .await?; + + println!("Job status - Total: {}, Running: {}, Entries indexed: {}", + all_jobs.len(), running_jobs.len(), current_entries); + + // If we've seen a job and now there are no jobs, indexing likely completed + if job_seen && all_jobs.is_empty() && current_entries > 0 { + // Wait for entries to stabilize + if current_entries == last_entry_count { + stable_count_iterations += 1; + if stable_count_iterations >= 3 { + println!("Indexing appears complete (job finished, entries stable)"); + break; + } + } else { + stable_count_iterations = 0; + } + last_entry_count = current_entries; + } + + // Check for failed jobs + let failed_jobs = library + .jobs() + .list_jobs(Some( + sd_core_new::infrastructure::jobs::types::JobStatus::Failed, + )) + .await?; + + if !failed_jobs.is_empty() { + panic!("Indexing job failed"); + } + + // Check timeout + if start_time.elapsed() > timeout_duration { + panic!("Indexing timed out after {:?}", timeout_duration); + } + + // Wait a bit before checking again + tokio::time::sleep(Duration::from_millis(500)).await; + } + + // 8. Verify indexed entries in database + let _entry_count = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .count(db.conn()) + .await?; + + let file_count = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .filter(entities::entry::Column::Kind.eq(0)) // Files + .count(db.conn()) + .await?; + + let dir_count = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .filter(entities::entry::Column::Kind.eq(1)) // Directories + .count(db.conn()) + .await?; + + // 9. Verify smart filtering worked + let all_entries = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .all(db.conn()) + .await?; + + // Check that filtered files are not indexed + for entry in &all_entries { + assert_ne!(entry.name, ".DS_Store", "System files should be filtered"); + assert_ne!(entry.name, "node_modules", "Dev directories should be filtered"); + } + + // 10. Verify expected counts + assert_eq!(file_count, 3, "Should index 3 files (excluding filtered)"); + assert!(dir_count >= 1, "Should index at least 1 directory (subdir)"); + + // 11. Verify inode tracking + let entries_with_inodes = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .filter(entities::entry::Column::Inode.is_not_null()) + .count(db.conn()) + .await?; + + assert!(entries_with_inodes > 0, "Entries should have inode tracking"); + + // 12. Cleanup + let lib_id = library.id(); + core.libraries.close_library(lib_id).await?; + drop(library); + + core.shutdown().await?; + + Ok(()) +} + +#[tokio::test] +async fn test_incremental_indexing() -> Result<(), Box> { + // 1. Setup + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + let library = core + .libraries + .create_library("Test Incremental Library", None, core.context.clone()) + .await?; + + let test_location_dir = temp_dir.path().join("incremental_test"); + tokio::fs::create_dir_all(&test_location_dir).await?; + + // Initial files + tokio::fs::write(test_location_dir.join("file1.txt"), "Initial content").await?; + tokio::fs::write(test_location_dir.join("file2.txt"), "More content").await?; + + // Register device + let db = library.db(); + let device = core.device.to_device()?; + + let device_record = match entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(device.id)) + .one(db.conn()) + .await? + { + Some(existing) => existing, + None => { + let device_model: entities::device::ActiveModel = device.into(); + device_model.insert(db.conn()).await? + } + }; + + // 2. First indexing run + let location_args = LocationCreateArgs { + path: test_location_dir.clone(), + name: Some("Incremental Test".to_string()), + index_mode: IndexMode::Deep, + }; + + let location_db_id = create_location( + library.clone(), + &core.events, + location_args, + device_record.id, + ) + .await?; + + // Wait for initial indexing to complete + let start_time = tokio::time::Instant::now(); + let timeout_duration = Duration::from_secs(10); + let mut job_seen = false; + + loop { + let running_jobs = library + .jobs() + .list_jobs(Some( + sd_core_new::infrastructure::jobs::types::JobStatus::Running, + )) + .await?; + + if !running_jobs.is_empty() { + job_seen = true; + } + + let current_entries = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .count(db.conn()) + .await?; + + if job_seen && running_jobs.is_empty() && current_entries > 0 { + break; + } + + if start_time.elapsed() > timeout_duration { + break; // Don't fail, just continue + } + + tokio::time::sleep(Duration::from_millis(200)).await; + } + + let initial_file_count = entities::entry::Entity::find() + .filter(entities::entry::Column::LocationId.eq(location_db_id)) + .filter(entities::entry::Column::Kind.eq(0)) + .count(db.conn()) + .await?; + + assert_eq!(initial_file_count, 2, "Should index 2 initial files"); + + // Cleanup + let lib_id = library.id(); + core.libraries.close_library(lib_id).await?; + drop(library); + + core.shutdown().await?; + + Ok(()) +} + +#[tokio::test] +async fn test_indexing_error_handling() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; + + let library = core + .libraries + .create_library("Test Error Library", None, core.context.clone()) + .await?; + + // Try to index non-existent location + let non_existent = temp_dir.path().join("does_not_exist"); + + let db = library.db(); + let device = core.device.to_device()?; + + let device_record = match entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(device.id)) + .one(db.conn()) + .await? + { + Some(existing) => existing, + None => { + let device_model: entities::device::ActiveModel = device.into(); + device_model.insert(db.conn()).await? + } + }; + + let location_args = LocationCreateArgs { + path: non_existent, + name: Some("Non-existent".to_string()), + index_mode: IndexMode::Deep, + }; + + // This should handle the error gracefully + let result = create_location( + library.clone(), + &core.events, + location_args, + device_record.id, + ) + .await; + + // The location creation should fail for non-existent path + assert!(result.is_err(), "Should fail to create location for non-existent path"); + + // Cleanup + let lib_id = library.id(); + core.libraries.close_library(lib_id).await?; + drop(library); + + core.shutdown().await?; + + Ok(()) +} \ No newline at end of file diff --git a/core-new/tests/job_system_test.rs b/core-new/tests/job_system_test.rs deleted file mode 100644 index c6f62f5a6..000000000 --- a/core-new/tests/job_system_test.rs +++ /dev/null @@ -1,421 +0,0 @@ -// //! Integration tests for the job system - -// use sd_core_new::{ -// infrastructure::jobs::{ -// manager::JobManager, -// traits::{Job, JobHandler}, -// types::{JobId, JobStatus}, -// context::JobContext, -// error::{JobError, JobResult}, -// progress::Progress, -// output::JobOutput, -// prelude::JobProgress, -// }, -// operations::{ -// file_ops::copy_job::FileCopyJob, -// indexing::indexer_job::{IndexerJob, IndexMode}, -// }, -// shared::types::{SdPath, SdPathBatch}, -// }; -// use serde::{Deserialize, Serialize}; -// use std::{ -// path::PathBuf, -// time::Duration, -// }; -// use tempfile::TempDir; -// use uuid::Uuid; - -// // Simple test job for testing basic functionality -// #[derive(Debug, Serialize, Deserialize)] -// struct TestJob { -// name: String, -// sleep_ms: u64, -// should_fail: bool, -// counter: u32, -// } - -// #[derive(Debug, Clone, Serialize, Deserialize)] -// struct TestProgress { -// current: u32, -// total: u32, -// message: String, -// } - -// impl JobProgress for TestProgress {} - -// impl Job for TestJob { -// const NAME: &'static str = "test_job"; -// const RESUMABLE: bool = true; -// const DESCRIPTION: Option<&'static str> = Some("Simple test job"); -// } - -// #[async_trait::async_trait] -// impl JobHandler for TestJob { -// type Output = TestOutput; - -// async fn run(&mut self, ctx: JobContext<'_>) -> JobResult { -// ctx.log(format!("Starting test job: {}", self.name)); - -// if self.should_fail { -// return Err(JobError::execution("Test failure")); -// } - -// // Simulate work with progress updates -// for i in 0..5 { -// ctx.check_interrupt().await?; - -// self.counter += 1; - -// ctx.progress(Progress::structured(TestProgress { -// current: i + 1, -// total: 5, -// message: format!("Processing step {}", i + 1), -// })); - -// if self.sleep_ms > 0 { -// tokio::time::sleep(Duration::from_millis(self.sleep_ms)).await; -// } - -// // Checkpoint every 2 steps -// if i % 2 == 1 { -// ctx.checkpoint().await?; -// } -// } - -// ctx.log("Test job completed successfully"); - -// Ok(TestOutput { -// name: self.name.clone(), -// final_counter: self.counter, -// }) -// } -// } - -// #[derive(Debug, Serialize, Deserialize)] -// struct TestOutput { -// name: String, -// final_counter: u32, -// } - -// impl From for JobOutput { -// fn from(output: TestOutput) -> Self { -// JobOutput::custom(output) -// } -// } - -// impl TestJob { -// fn new(name: String, sleep_ms: u64, should_fail: bool) -> Self { -// Self { -// name, -// sleep_ms, -// should_fail, -// counter: 0, -// } -// } -// } - -// #[tokio::test] -// async fn test_job_manager_initialization() { -// let temp_dir = TempDir::new().unwrap(); -// let data_dir = temp_dir.path().to_path_buf(); - -// // Initialize job manager -// let job_manager = JobManager::new(data_dir.clone()).await.unwrap(); - -// // Verify database was created -// assert!(data_dir.join("jobs.db").exists()); - -// // Test basic operations -// let jobs = job_manager.list_jobs(None).await.unwrap(); -// assert!(jobs.is_empty()); - -// // Shutdown cleanly -// job_manager.shutdown().await.unwrap(); -// } - -// #[tokio::test] -// async fn test_job_serialization() { -// // Test FileCopyJob serialization -// let device_id = Uuid::new_v4(); -// let sources = vec![ -// SdPath::new(device_id, PathBuf::from("/test/file1.txt")), -// SdPath::new(device_id, PathBuf::from("/test/file2.txt")), -// ]; -// let destination = SdPath::new(device_id, PathBuf::from("/dest")); - -// let sources_batch = SdPathBatch::new(sources); -// let copy_job = FileCopyJob::new(sources_batch, destination); - -// // Serialize and deserialize -// let serialized = rmp_serde::to_vec(©_job).unwrap(); -// let deserialized: FileCopyJob = rmp_serde::from_slice(&serialized).unwrap(); - -// assert_eq!(copy_job.sources.paths.len(), deserialized.sources.paths.len()); - -// // Test IndexerJob serialization -// let indexer_job = IndexerJob::new( -// Uuid::new_v4(), -// SdPath::new(device_id, PathBuf::from("/index/path")), -// IndexMode::Deep, -// ); - -// let serialized = rmp_serde::to_vec(&indexer_job).unwrap(); -// let deserialized: IndexerJob = rmp_serde::from_slice(&serialized).unwrap(); - -// // Verify key fields are preserved -// assert_eq!(indexer_job.location_id, deserialized.location_id); - -// // Test TestJob serialization -// let test_job = TestJob::new("test".to_string(), 100, false); -// let serialized = rmp_serde::to_vec(&test_job).unwrap(); -// let deserialized: TestJob = rmp_serde::from_slice(&serialized).unwrap(); - -// assert_eq!(test_job.name, deserialized.name); -// assert_eq!(test_job.sleep_ms, deserialized.sleep_ms); -// assert_eq!(test_job.should_fail, deserialized.should_fail); -// } - -// #[tokio::test] -// async fn test_job_database_operations() { -// let temp_dir = TempDir::new().unwrap(); -// let job_manager = JobManager::new(temp_dir.path().to_path_buf()).await.unwrap(); - -// // Test listing empty jobs -// let jobs = job_manager.list_jobs(None).await.unwrap(); -// assert!(jobs.is_empty()); - -// // Test queued jobs (empty initially) -// let queued = job_manager.list_jobs(Some(JobStatus::Queued)).await.unwrap(); -// assert!(queued.is_empty()); - -// // Test job status filtering -// let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await.unwrap(); -// assert!(running_jobs.is_empty()); - -// let completed_jobs = job_manager.list_jobs(Some(JobStatus::Completed)).await.unwrap(); -// assert!(completed_jobs.is_empty()); - -// job_manager.shutdown().await.unwrap(); -// } - -// #[tokio::test] -// async fn test_job_constants_and_metadata() { -// // Test job constants are properly defined -// assert_eq!(FileCopyJob::NAME, "file_copy"); -// assert_eq!(FileCopyJob::RESUMABLE, true); - -// assert_eq!(IndexerJob::NAME, "indexer"); -// assert_eq!(IndexerJob::RESUMABLE, true); - -// assert_eq!(TestJob::NAME, "test_job"); -// assert_eq!(TestJob::RESUMABLE, true); - -// // Test job schemas -// let copy_schema = FileCopyJob::schema(); -// assert_eq!(copy_schema.name, "file_copy"); -// assert_eq!(copy_schema.version, 1); - -// let indexer_schema = IndexerJob::schema(); -// assert_eq!(indexer_schema.name, "indexer"); -// assert_eq!(indexer_schema.version, 1); - -// let test_schema = TestJob::schema(); -// assert_eq!(test_schema.name, "test_job"); -// assert_eq!(test_schema.version, 1); -// } - -// #[tokio::test] -// async fn test_job_progress_types() { -// use sd_core_new::infrastructure::jobs::progress::Progress; - -// // Test percentage progress -// let percentage = Progress::percentage(0.75); -// match percentage { -// Progress::Percentage(percent) => { -// assert_eq!(percent, 0.75); -// } -// _ => panic!("Expected percentage progress"), -// } - -// // Test structured progress -// let test_progress = TestProgress { -// current: 3, -// total: 10, -// message: "Test message".to_string(), -// }; - -// let structured = Progress::structured(test_progress.clone()); -// match structured { -// Progress::Structured(data) => { -// let deserialized: TestProgress = serde_json::from_value(data).unwrap(); -// assert_eq!(deserialized.current, test_progress.current); -// assert_eq!(deserialized.total, test_progress.total); -// assert_eq!(deserialized.message, test_progress.message); -// } -// _ => panic!("Expected structured progress"), -// } -// } - -// #[tokio::test] -// async fn test_job_error_types() { -// // Test different error types -// let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found"); -// let job_error = JobError::from(io_error); - -// match job_error { -// JobError::Io(e) => { -// assert_eq!(e.kind(), std::io::ErrorKind::NotFound); -// } -// _ => panic!("Expected IO error"), -// } - -// let execution_error = JobError::execution("Execution error message"); -// match execution_error { -// JobError::ExecutionFailed(msg) => { -// assert_eq!(msg, "Execution error message"); -// } -// _ => panic!("Expected execution error"), -// } - -// let interrupted_error = JobError::Interrupted; -// match interrupted_error { -// JobError::Interrupted => { -// // Expected -// } -// _ => panic!("Expected interrupted error"), -// } -// } - -// #[tokio::test] -// async fn test_job_output_types() { -// // Test different output types -// let copied_output = JobOutput::FileCopy { -// copied_count: 95, -// total_bytes: 1024 * 1024, -// }; - -// match copied_output { -// JobOutput::FileCopy { copied_count, total_bytes } => { -// assert_eq!(copied_count, 95); -// assert_eq!(total_bytes, 1024 * 1024); -// } -// _ => panic!("Expected file copy output"), -// } - -// let indexed_output = JobOutput::Indexed { -// total_files: 500, -// total_dirs: 50, -// total_bytes: 10 * 1024 * 1024, -// }; - -// match indexed_output { -// JobOutput::Indexed { total_files, total_dirs, total_bytes } => { -// assert_eq!(total_files, 500); -// assert_eq!(total_dirs, 50); -// assert_eq!(total_bytes, 10 * 1024 * 1024); -// } -// _ => panic!("Expected indexed output"), -// } - -// let custom_data = serde_json::json!({ -// "test": "value", -// "number": 42 -// }); - -// let custom_output = JobOutput::Custom(custom_data.clone()); - -// match custom_output { -// JobOutput::Custom(data) => { -// assert_eq!(data, custom_data); -// } -// _ => panic!("Expected custom output"), -// } -// } - -// #[tokio::test] -// async fn test_job_id_generation() { -// // Test that JobIds are unique -// let id1 = JobId::new(); -// let id2 = JobId::new(); - -// assert_ne!(id1, id2); - -// // Test string conversion -// let id_str = id1.to_string(); -// assert!(!id_str.is_empty()); - -// // Test that IDs are valid UUIDs -// let parsed = Uuid::parse_str(&id_str); -// assert!(parsed.is_ok()); -// } - -// #[tokio::test] -// async fn test_job_status_transitions() { -// // Test status equality and display -// assert_eq!(JobStatus::Queued, JobStatus::Queued); -// assert_ne!(JobStatus::Queued, JobStatus::Running); - -// // Test string conversion -// assert_eq!(JobStatus::Queued.to_string(), "queued"); -// assert_eq!(JobStatus::Running.to_string(), "running"); -// assert_eq!(JobStatus::Completed.to_string(), "completed"); -// assert_eq!(JobStatus::Failed.to_string(), "failed"); -// assert_eq!(JobStatus::Cancelled.to_string(), "cancelled"); -// assert_eq!(JobStatus::Paused.to_string(), "paused"); -// } - -// #[tokio::test] -// async fn test_job_context_functionality() { -// // This test verifies JobContext methods work correctly -// // Since JobContext requires a full job execution environment, -// // we test that the types and structures are correct - -// let temp_dir = TempDir::new().unwrap(); -// let job_manager = JobManager::new(temp_dir.path().to_path_buf()).await.unwrap(); - -// // Test that the job manager can be created and shut down -// job_manager.shutdown().await.unwrap(); - -// // Test that job context structures are properly defined -// // (Full context testing would require running actual jobs) -// assert!(true); // Placeholder for context method tests -// } - -// #[tokio::test] -// async fn test_job_system_concurrency() { -// // Test that multiple JobManagers can be created independently -// let temp_dir1 = TempDir::new().unwrap(); -// let temp_dir2 = TempDir::new().unwrap(); - -// let manager1 = JobManager::new(temp_dir1.path().to_path_buf()).await.unwrap(); -// let manager2 = JobManager::new(temp_dir2.path().to_path_buf()).await.unwrap(); - -// // Both should work independently -// let jobs1 = manager1.list_jobs(None).await.unwrap(); -// let jobs2 = manager2.list_jobs(None).await.unwrap(); - -// assert!(jobs1.is_empty()); -// assert!(jobs2.is_empty()); - -// // Shutdown both -// manager1.shutdown().await.unwrap(); -// manager2.shutdown().await.unwrap(); -// } - -// #[tokio::test] -// async fn test_job_system_persistence() { -// let temp_dir = TempDir::new().unwrap(); -// let data_dir = temp_dir.path().to_path_buf(); - -// // Create manager, verify database -// let manager1 = JobManager::new(data_dir.clone()).await.unwrap(); -// assert!(data_dir.join("jobs.db").exists()); -// manager1.shutdown().await.unwrap(); - -// // Create new manager with same directory - should reuse database -// let manager2 = JobManager::new(data_dir.clone()).await.unwrap(); -// let jobs = manager2.list_jobs(None).await.unwrap(); -// assert!(jobs.is_empty()); // Should start empty but database should exist - -// manager2.shutdown().await.unwrap(); -// } diff --git a/core-new/tests/mdns_discovery_test.rs b/core-new/tests/mdns_discovery_test.rs deleted file mode 100644 index 31386c6ce..000000000 --- a/core-new/tests/mdns_discovery_test.rs +++ /dev/null @@ -1,65 +0,0 @@ -//! Simple test to verify LibP2P mDNS discovery works between separate processes -//! This isolates the networking layer from the full pairing protocol - -use std::process::{Command, Stdio}; -use std::time::Duration; -use tokio::time::timeout; - -#[tokio::test] -async fn test_mdns_discovery_between_processes() { - println!("🧪 Testing basic mDNS discovery between two LibP2P processes"); - - // Start Alice (listener) process - println!("🟦 Starting Alice (mDNS listener)..."); - let mut alice = Command::new("cargo") - .args(&["run", "--bin", "mdns_test_helper", "--", "listen"]) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .expect("Failed to start Alice process"); - - // Give Alice time to start listening - tokio::time::sleep(Duration::from_secs(2)).await; - - // Start Bob (discoverer) process - println!("🟨 Starting Bob (mDNS discoverer)..."); - let bob_result = timeout( - Duration::from_secs(10), - async { - Command::new("cargo") - .args(&["run", "--bin", "mdns_test_helper", "--", "discover"]) - .output() - .expect("Failed to start Bob process") - } - ).await; - - // Kill Alice process - let _ = alice.kill(); - let alice_output = alice.wait_with_output().expect("Failed to read Alice output"); - - match bob_result { - Ok(bob_output) => { - let alice_stdout = String::from_utf8_lossy(&alice_output.stdout); - let bob_stdout = String::from_utf8_lossy(&bob_output.stdout); - - println!("📤 Alice output:\n{}", alice_stdout); - println!("📥 Bob output:\n{}", bob_stdout); - - // Check if discovery succeeded - if bob_stdout.contains("PEER_DISCOVERED") { - println!("✅ mDNS discovery successful!"); - } else { - println!("❌ mDNS discovery failed"); - println!("Alice stderr: {}", String::from_utf8_lossy(&alice_output.stderr)); - println!("Bob stderr: {}", String::from_utf8_lossy(&bob_output.stderr)); - panic!("mDNS discovery between processes failed"); - } - } - Err(_) => { - let alice_stdout = String::from_utf8_lossy(&alice_output.stdout); - println!("📤 Alice output:\n{}", alice_stdout); - println!("⏰ Bob discovery timed out after 10 seconds"); - panic!("mDNS discovery timed out"); - } - } -} \ No newline at end of file diff --git a/core-new/tests/test_core_pairing.rs b/core-new/tests/pairing_test.rs similarity index 100% rename from core-new/tests/test_core_pairing.rs rename to core-new/tests/pairing_test.rs