From e6e1a3b252ef5235145465fd2e239355cb44aed9 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sat, 20 Sep 2025 00:51:08 -0700 Subject: [PATCH] feat: Add benchmarks module and refactor job handling for resumable jobs - Introduced a new `core/benchmarks` module to facilitate performance testing and benchmarking. - Updated `Cargo.toml` to include the new benchmarks module in the workspace. - Refactored job handling in the `JobManager` and `JobExecutor` to support job resumption, enhancing the ability to recover from interruptions. - Improved logging throughout the job lifecycle to provide better visibility into job states and transitions. - Added integration tests for job pause/resume functionality, ensuring robustness in job management. --- Cargo.lock | Bin 283152 -> 284481 bytes Cargo.toml | 1 + core/benchmarks/src/generator/filesystem.rs | 8 +- core/benchmarks/src/scenarios/common.rs | 4 +- .../src/scenarios/content_identification.rs | 27 +- .../benchmarks/src/scenarios/core_indexing.rs | 29 +- core/src/infra/job/executor.rs | 21 +- core/src/infra/job/manager.rs | 93 +++- core/src/infra/job/traits.rs | 5 + core/src/library/manager.rs | 5 + core/src/library/mod.rs | 21 + core/src/ops/indexing/change_detection/mod.rs | 11 + core/src/ops/indexing/job.rs | 18 +- core/src/ops/indexing/phases/processing.rs | 10 + core/src/ops/locations/add/action.rs | 10 +- core/tests/job_pause_resume_test.rs | 267 ---------- core/tests/job_resumption_integration_test.rs | 469 ++++++++++++++++++ 17 files changed, 672 insertions(+), 327 deletions(-) delete mode 100644 core/tests/job_pause_resume_test.rs create mode 100644 core/tests/job_resumption_integration_test.rs diff --git a/Cargo.lock b/Cargo.lock index b22ea53b7e37e1c91b20c54f38852f1e0a92bcd8..5e6b6744da354d746563a6db1895a54206ad562a 100644 GIT binary patch delta 619 zcmY+BJ!n%=7>2p`Ms16TrJDFR#2_eQ!uh@Do}np%o0F2goS*Ns7n7Uc2D2%y9h3}b zQ;?40q-1cAY@#5DI0=&OR;hzd24ke6+xxw|Jm1Ir-QQUrKV81_EVxuFY&BHYOB+yH 
z>O-fSHnZYtu|z6!eNX{H!kd!Cjh;WiaA_b0} zHk|o))!@v+!GhuLhZXf2w#)6Axw?%e!);r~;)3mpq{FU@RfR4;XY_kDOW2ODtPuYU~)66yJx(AJxOY7htUJK6p$FbyH;(##CpzX^i_yxT*44-GL{1HwnSTR8*-y8kBcn&{s6F`w;Y!Xu delta 40 ycmV+@0N4M)t`U%=5rDJ-C{(xGRRPVcw}S%$$?Uhf@dDk0mo9n(5x3oT17zTa#S=6D diff --git a/Cargo.toml b/Cargo.toml index e41fcc482..fc78e74ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ # "apps/server", "apps/cli", "core", + "core/benchmarks", "crates/*" ] resolver = "2" diff --git a/core/benchmarks/src/generator/filesystem.rs b/core/benchmarks/src/generator/filesystem.rs index eeadb7ea1..bf64bade2 100644 --- a/core/benchmarks/src/generator/filesystem.rs +++ b/core/benchmarks/src/generator/filesystem.rs @@ -25,7 +25,7 @@ fn write_magic_header_if_needed( if !enable_magic { return Ok(0); } - let registry = sd_core::file_type::FileTypeRegistry::new(); + let registry = sd_core::filetype::FileTypeRegistry::new(); let mut candidates = registry.get_by_extension(extension); if candidates.is_empty() { return Ok(0); @@ -52,9 +52,9 @@ fn write_magic_header_if_needed( .bytes .iter() .map(|b| match b { - sd_core::file_type::MagicByte::Exact(v) => *v, - sd_core::file_type::MagicByte::Any => 0u8, - sd_core::file_type::MagicByte::Range { min, .. } => *min, + sd_core::filetype::MagicByte::Exact(v) => *v, + sd_core::filetype::MagicByte::Any => 0u8, + sd_core::filetype::MagicByte::Range { min, .. } => *min, }) .collect(); file.write_all(&bytes)?; diff --git a/core/benchmarks/src/scenarios/common.rs b/core/benchmarks/src/scenarios/common.rs index f269e09d9..1e60739da 100644 --- a/core/benchmarks/src/scenarios/common.rs +++ b/core/benchmarks/src/scenarios/common.rs @@ -1,7 +1,7 @@ //! 
Common utilities and structures for benchmark scenarios use anyhow::{anyhow, Result}; -use sd_core::infrastructure::events::{Event, EventSubscriber}; -use sd_core::infrastructure::jobs::output::JobOutput; +use sd_core::infra::event::{Event, EventSubscriber}; +use sd_core::infra::job::output::JobOutput; use sd_core::library::Library; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; diff --git a/core/benchmarks/src/scenarios/content_identification.rs b/core/benchmarks/src/scenarios/content_identification.rs index 6fe9ecbbe..a089c6ab2 100644 --- a/core/benchmarks/src/scenarios/content_identification.rs +++ b/core/benchmarks/src/scenarios/content_identification.rs @@ -6,7 +6,7 @@ use super::{hardware_hint_to_label, infer_hardware_label, Scenario}; use crate::core_boot::CoreBoot; use crate::metrics::{collect_host_info, BenchmarkRun, Durations, RunMeta}; use crate::recipe::Recipe; -use sd_core::infrastructure::jobs::output::JobOutput; +use sd_core::infra::job::output::JobOutput; #[derive(Default)] pub struct ContentIdentificationScenario { @@ -24,7 +24,7 @@ impl Scenario for ContentIdentificationScenario { } async fn prepare(&mut self, boot: &CoreBoot, recipe: &Recipe) -> Result<()> { - use sd_core::infrastructure::actions::handler::ActionHandler; + use sd_core::infra::action::LibraryAction; let core = &boot.core; let context = core.context.clone(); let library = match core.libraries.get_active_library().await { @@ -34,22 +34,15 @@ impl Scenario for ContentIdentificationScenario { self.base.library = Some(library.clone()); for loc in &recipe.locations { - let action = sd_core::infrastructure::actions::Action::LocationAdd { - library_id: library.id(), - action: sd_core::operations::locations::add::action::LocationAddAction { - path: loc.path.clone(), - name: Some(format!("bench:{}", recipe.name)), - mode: sd_core::operations::indexing::IndexMode::Content, - }, + let input = sd_core::ops::locations::add::action::LocationAddInput { + path: loc.path.clone(), 
+ name: Some(format!("bench:{}", recipe.name)), + mode: sd_core::ops::indexing::IndexMode::Content, }; - let handler = sd_core::operations::locations::add::action::LocationAddHandler::new(); - let out = handler.execute(context.clone(), action).await?; - if let sd_core::infrastructure::actions::output::ActionOutput::Custom { data, .. } = &out { - if let Some(j) = data.get("job_id").and_then(|v| v.as_str()) { - if let Ok(id) = uuid::Uuid::parse_str(j) { - self.base.job_ids.push(id); - } - } + let action = sd_core::ops::locations::add::action::LocationAddAction::from_input(input).map_err(|e| anyhow::anyhow!(e))?; + let out = action.execute(library.clone(), context.clone()).await.map_err(|e| anyhow::anyhow!(e.to_string()))?; + if let Some(job_id) = out.job_id { + self.base.job_ids.push(job_id); } } Ok(()) diff --git a/core/benchmarks/src/scenarios/core_indexing.rs b/core/benchmarks/src/scenarios/core_indexing.rs index 8d6994f16..d577451d4 100644 --- a/core/benchmarks/src/scenarios/core_indexing.rs +++ b/core/benchmarks/src/scenarios/core_indexing.rs @@ -6,7 +6,7 @@ use super::{hardware_hint_to_label, infer_hardware_label, Scenario}; use crate::core_boot::CoreBoot; use crate::metrics::{collect_host_info, BenchmarkRun, Durations, RunMeta}; use crate::recipe::Recipe; -use sd_core::infrastructure::jobs::output::JobOutput; +use sd_core::infra::job::output::JobOutput; #[derive(Default)] pub struct CoreIndexingScenario { @@ -24,7 +24,7 @@ impl Scenario for CoreIndexingScenario { } async fn prepare(&mut self, boot: &CoreBoot, recipe: &Recipe) -> Result<()> { - use sd_core::infrastructure::actions::handler::ActionHandler; + use sd_core::infra::action::LibraryAction; let core = &boot.core; let context = core.context.clone(); let library = match core.libraries.get_active_library().await { @@ -38,24 +38,15 @@ impl Scenario for CoreIndexingScenario { self.base.library = Some(library.clone()); for loc in &recipe.locations { - let action = 
sd_core::infrastructure::actions::Action::LocationAdd { - library_id: library.id(), - action: sd_core::operations::locations::add::action::LocationAddAction { - path: loc.path.clone(), - name: Some(format!("bench:{}", recipe.name)), - mode: sd_core::operations::indexing::IndexMode::Shallow, - }, + let input = sd_core::ops::locations::add::action::LocationAddInput { + path: loc.path.clone(), + name: Some(format!("bench:{}", recipe.name)), + mode: sd_core::ops::indexing::IndexMode::Shallow, }; - let handler = sd_core::operations::locations::add::action::LocationAddHandler::new(); - let out = handler.execute(context.clone(), action).await?; - if let sd_core::infrastructure::actions::output::ActionOutput::Custom { data, .. } = - &out - { - if let Some(j) = data.get("job_id").and_then(|v| v.as_str()) { - if let Ok(id) = uuid::Uuid::parse_str(j) { - self.base.job_ids.push(id); - } - } + let action = sd_core::ops::locations::add::action::LocationAddAction::from_input(input).map_err(|e| anyhow::anyhow!(e))?; + let out = action.execute(library.clone(), context.clone()).await.map_err(|e| anyhow::anyhow!(e.to_string()))?; + if let Some(job_id) = out.job_id { + self.base.job_ids.push(job_id); } } Ok(()) diff --git a/core/src/infra/job/executor.rs b/core/src/infra/job/executor.rs index 86df55735..081ce7ecb 100644 --- a/core/src/infra/job/executor.rs +++ b/core/src/infra/job/executor.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use sd_task_system::{ExecStatus, Interrupter, Task, TaskId}; use std::{path::PathBuf, sync::Arc}; use tokio::sync::{broadcast, mpsc, watch, Mutex}; -use tracing::{debug, error, info, span, Level}; +use tracing::{debug, error, info, span, warn, Level}; /// Executor that wraps a job for task system execution pub struct JobExecutor { @@ -176,14 +176,18 @@ impl JobExecutor { info!("Starting job {}: {}", self.state.job_id, J::NAME); // Update status to running + warn!("DEBUG: JobExecutor setting status to Running for job {}", self.state.job_id); let _ = 
self.state.status_tx.send(super::types::JobStatus::Running); // Also persist status to database + warn!("DEBUG: JobExecutor updating database status to Running for job {}", self.state.job_id); if let Err(e) = self .update_job_status_in_db(super::types::JobStatus::Running) .await { error!("Failed to update job status in database: {}", e); + } else { + warn!("DEBUG: JobExecutor successfully updated database status to Running for job {}", self.state.job_id); } // Create job context @@ -202,8 +206,19 @@ impl JobExecutor { // Progress forwarding is handled by JobManager - // Check if we're resuming - // TODO: Implement proper resume detection + // Check if we're resuming by checking if the job has existing state + // This is a heuristic - if the job implements resumable logic, it should have state + let is_resuming = self.job.is_resuming(); + warn!("DEBUG: Job {} is_resuming: {}", self.state.job_id, is_resuming); + + if is_resuming { + warn!("DEBUG: Calling on_resume for job {}", self.state.job_id); + if let Err(e) = self.job.on_resume(&ctx).await { + error!("Job {} on_resume failed: {}", self.state.job_id, e); + return Err(e); + } + } + debug!("Starting job {}", self.state.job_id); // Store metrics reference for later update diff --git a/core/src/infra/job/manager.rs b/core/src/infra/job/manager.rs index 78cfd39b5..09b67c8ef 100644 --- a/core/src/infra/job/manager.rs +++ b/core/src/infra/job/manager.rs @@ -71,8 +71,15 @@ impl JobManager { Ok(manager) } - /// Initialize job manager (resume interrupted jobs) + /// Initialize job manager (without resuming jobs) pub async fn initialize(&self) -> JobResult<()> { + info!("Job manager initialized for library {}", self.library_id); + Ok(()) + } + + /// Resume interrupted jobs - should be called after library is fully loaded + pub async fn resume_interrupted_jobs_after_load(&self) -> JobResult<()> { + info!("Resuming interrupted jobs for library {}", self.library_id); if let Err(e) = self.resume_interrupted_jobs().await { 
error!("Failed to resume interrupted jobs: {}", e); } @@ -891,6 +898,7 @@ impl JobManager { /// Resume interrupted jobs from the last run async fn resume_interrupted_jobs(&self) -> JobResult<()> { + warn!("DEBUG: resume_interrupted_jobs called for library {}", self.library_id); info!("Checking for interrupted jobs to resume"); use sea_orm::{ColumnTrait, QueryFilter}; @@ -902,13 +910,17 @@ impl JobManager { .all(self.db.conn()) .await?; + warn!("DEBUG: Found {} interrupted jobs to resume", interrupted.len()); for job_record in interrupted { if let Ok(job_id) = job_record.id.parse::().map(JobId) { + warn!("DEBUG: Processing interrupted job {}: {} with status {}", job_id, job_record.name, job_record.status); info!("Resuming job {}: {}", job_id, job_record.name); // Deserialize job from binary data + warn!("DEBUG: Attempting to deserialize job {} of type {}", job_id, job_record.name); match REGISTRY.deserialize_job(&job_record.name, &job_record.state) { Ok(erased_job) => { + warn!("DEBUG: Successfully deserialized job {}", job_id); // Create channels for the resumed job let (status_tx, status_rx) = watch::channel(JobStatus::Paused); let (progress_tx, progress_rx) = mpsc::unbounded_channel(); @@ -1005,14 +1017,16 @@ impl JobManager { // Get the final output from the handle let output = { let jobs = running_jobs.read().await; - jobs.get(&job_id_clone) - .and_then(|job| { - job.handle - .output - .blocking_lock() - .clone() - }) - .unwrap_or(Ok(JobOutput::Success)) + if let Some(job) = jobs.get(&job_id_clone) { + job.handle + .output + .lock() + .await + .clone() + .unwrap_or(Ok(JobOutput::Success)) + } else { + Ok(JobOutput::Success) + } }; // Emit completion event @@ -1054,6 +1068,33 @@ impl JobManager { } }); + // Update status to Running after successful dispatch + warn!("DEBUG: Attempting to update resumed job {} status to Running", job_id); + if let Some(running_job) = self.running_jobs.read().await.get(&job_id) { + if let Err(e) = 
running_job.status_tx.send(JobStatus::Running) { + warn!("Failed to update resumed job status: {}", e); + } else { + warn!("DEBUG: Successfully sent Running status to job {}", job_id); + } + } else { + warn!("DEBUG: Job {} not found in running_jobs when trying to update status", job_id); + } + + // Update database status + warn!("DEBUG: Attempting to update database status for job {} to Running", job_id); + use sea_orm::{ActiveModelTrait, ActiveValue::Set}; + let mut job_model = database::jobs::ActiveModel { + id: Set(job_id.to_string()), + status: Set(JobStatus::Running.to_string()), + paused_at: Set(None), + ..Default::default() + }; + if let Err(e) = job_model.update(self.db.conn()).await { + warn!("Failed to update resumed job status in database: {}", e); + } else { + warn!("DEBUG: Successfully updated database status for job {} to Running", job_id); + } + info!("Successfully resumed job {}: {}", job_id, job_record.name); } Err(e) => { @@ -1062,6 +1103,7 @@ impl JobManager { } } Err(e) => { + warn!("DEBUG: Failed to deserialize job {}: {:?}", job_id, e); error!("Failed to create job {} for resumption: {}", job_id, e); } } @@ -1303,9 +1345,16 @@ impl JobManager { JobStatus::Completed => { let output = { let jobs = running_jobs.read().await; - jobs.get(&job_id_clone) - .and_then(|job| job.handle.output.blocking_lock().clone()) - .unwrap_or(Ok(JobOutput::Success)) + if let Some(job) = jobs.get(&job_id_clone) { + job.handle + .output + .lock() + .await + .clone() + .unwrap_or(Ok(JobOutput::Success)) + } else { + Ok(JobOutput::Success) + } }; event_bus.emit(Event::JobCompleted { job_id: job_id_clone.to_string(), @@ -1455,6 +1504,26 @@ impl JobManager { tokio::time::sleep(std::time::Duration::from_millis(500)).await; } + // Close database connection properly + info!("Closing job database connection"); + + // First, checkpoint the WAL file to merge it back into the main database + use sea_orm::{ConnectionTrait, Statement}; + if let Err(e) = 
self.db.conn().execute(Statement::from_string( + sea_orm::DatabaseBackend::Sqlite, + "PRAGMA wal_checkpoint(TRUNCATE)", + )).await { + warn!("Failed to checkpoint job database WAL file: {}", e); + } else { + info!("Job database WAL file checkpointed successfully"); + } + + if let Err(e) = self.db.conn().clone().close().await { + warn!("Failed to close job database connection: {}", e); + } else { + info!("Job database connection closed successfully"); + } + Ok(()) } } diff --git a/core/src/infra/job/traits.rs b/core/src/infra/job/traits.rs index 770d8a239..646e7ad97 100644 --- a/core/src/infra/job/traits.rs +++ b/core/src/infra/job/traits.rs @@ -58,6 +58,11 @@ pub trait JobHandler: Job { async fn on_cancel(&mut self, _ctx: &JobContext<'_>) -> JobResult { Ok(()) } + + /// Check if this job is resuming from a previous state (optional) + fn is_resuming(&self) -> bool { + false // Default implementation for non-resumable jobs + } } /// Trait for jobs that can be serialized diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index 339a139b0..5a2249676 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -231,6 +231,11 @@ impl LibraryManager { libraries.insert(config.id, library.clone()); } + // Now that the library is registered in the context, resume interrupted jobs + if let Err(e) = library.jobs.resume_interrupted_jobs_after_load().await { + warn!("Failed to resume interrupted jobs for library {}: {}", config.id, e); + } + // Note: Sidecar manager initialization should be done by the Core when libraries are loaded // This allows Core to pass its services reference diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index 01c95a1e1..859e0d1cb 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -158,6 +158,27 @@ impl Library { let config = self.config.read().await; self.save_config(&*config).await?; + // Close library database connection properly + use tracing::{info, warn}; + info!("Closing 
library database connection"); + + // First, checkpoint the WAL file to merge it back into the main database + use sea_orm::{ConnectionTrait, Statement}; + if let Err(e) = self.db.as_ref().conn().execute(Statement::from_string( + sea_orm::DatabaseBackend::Sqlite, + "PRAGMA wal_checkpoint(TRUNCATE)", + )).await { + warn!("Failed to checkpoint WAL file: {}", e); + } else { + info!("WAL file checkpointed successfully"); + } + + if let Err(e) = self.db.as_ref().conn().clone().close().await { + warn!("Failed to close library database connection: {}", e); + } else { + info!("Library database connection closed successfully"); + } + Ok(()) } diff --git a/core/src/ops/indexing/change_detection/mod.rs b/core/src/ops/indexing/change_detection/mod.rs index 6dbf00b62..9e51e9e04 100644 --- a/core/src/ops/indexing/change_detection/mod.rs +++ b/core/src/ops/indexing/change_detection/mod.rs @@ -134,6 +134,14 @@ impl ChangeDetector { self.path_to_entry.len() )); + // DEBUG: Log if we failed to load entries + use tracing::warn; + if self.path_to_entry.is_empty() { + warn!("DEBUG: ChangeDetector loaded 0 entries - database may be locked or empty"); + } else { + warn!("DEBUG: ChangeDetector loaded {} entries successfully", self.path_to_entry.len()); + } + Ok(()) } @@ -166,6 +174,9 @@ impl ChangeDetector { if old_path != path { // Same inode, different path - it's a move if let Some(db_entry) = self.path_to_entry.get(old_path) { + // DEBUG: Log false move detection + use tracing::warn; + warn!("DEBUG: Detected move - old: {:?}, new: {:?}, inode: {}", old_path, path, inode_val); return Some(Change::Moved { old_path: old_path.clone(), new_path: path.to_path_buf(), diff --git a/core/src/ops/indexing/job.rs b/core/src/ops/indexing/job.rs index b7d483bf9..4a42c2e05 100644 --- a/core/src/ops/indexing/job.rs +++ b/core/src/ops/indexing/job.rs @@ -12,6 +12,7 @@ use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, 
path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::RwLock; +use tracing::warn; use uuid::Uuid; use super::{ @@ -213,7 +214,7 @@ pub struct IndexerJob { pub config: IndexerJobConfig, // Resumable state - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none", default)] state: Option, // Ephemeral storage for non-persistent jobs @@ -273,6 +274,8 @@ impl JobHandler for IndexerJob { let state = match &mut self.state { Some(state) => { ctx.log("Resuming indexer from saved state"); + warn!("DEBUG: Resumed state - phase: {:?}, entry_batches: {}, entries_for_content: {}", + state.phase, state.entry_batches.len(), state.entries_for_content.len()); state } None => { @@ -321,6 +324,7 @@ impl JobHandler for IndexerJob { ctx.check_interrupt().await?; let current_phase = state.phase.clone(); + warn!("DEBUG: IndexerJob entering phase: {:?}", current_phase); match current_phase { Phase::Discovery => { // Use scope-aware discovery @@ -347,6 +351,7 @@ impl JobHandler for IndexerJob { } Phase::Processing => { + warn!("DEBUG: IndexerJob starting Processing phase"); if self.config.is_ephemeral() { let ephemeral_index = self.ephemeral_index.clone().ok_or_else(|| { JobError::execution("Ephemeral index not initialized".to_string()) @@ -419,8 +424,10 @@ impl JobHandler for IndexerJob { // Checkpoint after each phase (only for persistent jobs) if !self.config.is_ephemeral() { + warn!("DEBUG: IndexerJob checkpointing after phase: {:?}", state.phase); ctx.checkpoint().await?; } + warn!("DEBUG: IndexerJob completed phase: {:?}, next phase will be: {:?}", current_phase, state.phase); } // Send final progress update @@ -463,7 +470,9 @@ impl JobHandler for IndexerJob { async fn on_resume(&mut self, ctx: &JobContext<'_>) -> JobResult { // State is already loaded from serialization + warn!("DEBUG: IndexerJob on_resume called"); if let Some(state) = &self.state { + warn!("DEBUG: IndexerJob has state, resuming in {:?} phase", state.phase); 
ctx.log(format!("Resuming indexer in {:?} phase", state.phase)); ctx.log(format!( "Progress: {} files, {} dirs, {} errors so far", @@ -472,6 +481,8 @@ impl JobHandler for IndexerJob { // Reinitialize timer for resumed job self.timer = Some(PhaseTimer::new()); + } else { + warn!("DEBUG: IndexerJob has no state during resume!"); } Ok(()) } @@ -491,6 +502,11 @@ impl JobHandler for IndexerJob { } Ok(()) } + + fn is_resuming(&self) -> bool { + // If we have existing state, we're resuming + self.state.is_some() + } } impl IndexerJob { diff --git a/core/src/ops/indexing/phases/processing.rs b/core/src/ops/indexing/phases/processing.rs index 18059502f..04f759b52 100644 --- a/core/src/ops/indexing/phases/processing.rs +++ b/core/src/ops/indexing/phases/processing.rs @@ -15,6 +15,7 @@ use crate::{ }; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, TransactionTrait}; use std::path::Path; +use tracing::warn; use uuid::Uuid; /// Run the processing phase of indexing @@ -31,6 +32,12 @@ pub async fn run_processing_phase( total_batches )); + if total_batches == 0 { + ctx.log("No batches to process - transitioning to Aggregation phase"); + state.phase = crate::ops::indexing::state::Phase::Aggregation; + return Ok(()); + } + // Get the actual location record from database let location_record = entities::location::Entity::find() .filter(entities::location::Column::Uuid.eq(location_id)) @@ -165,6 +172,9 @@ pub async fn run_processing_phase( for entry in batch { // Check for interruption during batch processing ctx.check_interrupt().await?; + + // Add to seen_paths for delete detection (important for resumed jobs) + state.seen_paths.insert(entry.path.clone()); // Get metadata for change detection let metadata = match std::fs::metadata(&entry.path) { Ok(m) => m, diff --git a/core/src/ops/locations/add/action.rs b/core/src/ops/locations/add/action.rs index 5cc7a3225..9c9b2e401 100644 --- a/core/src/ops/locations/add/action.rs +++ b/core/src/ops/locations/add/action.rs @@ -94,11 
+94,17 @@ impl LibraryAction for LocationAddAction { None }; - Ok(LocationAddOutput::new( + let mut output = LocationAddOutput::new( location_id, self.input.path, self.input.name, - )) + ); + + if let Some(job_id) = job_id { + output = output.with_job_id(job_id); + } + + Ok(output) } fn action_kind(&self) -> &'static str { diff --git a/core/tests/job_pause_resume_test.rs b/core/tests/job_pause_resume_test.rs deleted file mode 100644 index 63cadb335..000000000 --- a/core/tests/job_pause_resume_test.rs +++ /dev/null @@ -1,267 +0,0 @@ -//! Integration test for job pause/resume functionality - -use sd_core::{ - infra::db::entities, - infra::job::types::{JobId, JobStatus}, - location::{create_location, IndexMode, LocationCreateArgs}, - Core, -}; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter}; -use std::time::Duration; -use tempfile::TempDir; -use tokio::time::sleep; - -#[tokio::test] -async fn test_pause_and_resume_indexing_job() -> Result<(), Box> { - // Setup test environment - let temp_dir = TempDir::new()?; - let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; - - // Create library - let library = core - .libraries - .create_library("Test Pause Resume Library", None, core.context.clone()) - .await?; - - // Create test location directory with many files - let test_location_dir = temp_dir.path().join("test_location"); - tokio::fs::create_dir_all(&test_location_dir).await?; - - // Create many test files to ensure job runs long enough - for i in 0..100 { - let file_path = test_location_dir.join(format!("test_file_{}.txt", i)); - tokio::fs::write(&file_path, format!("Test content {}", i)).await?; - } - - // Register device - let db = library.db(); - let device = core.device.to_device()?; - - let device_record = match entities::device::Entity::find() - .filter(entities::device::Column::Uuid.eq(device.id)) - .one(db.conn()) - .await? 
- { - Some(existing) => existing, - None => { - let device_model: entities::device::ActiveModel = device.into(); - device_model.insert(db.conn()).await? - } - }; - - // Create location to trigger indexing - let location_args = LocationCreateArgs { - path: test_location_dir.clone(), - name: Some("Test Location".to_string()), - index_mode: IndexMode::Deep, - }; - - let _location_db_id = create_location( - library.clone(), - &core.events, - location_args, - device_record.id, - ) - .await?; - - // Get the indexing job that was created - let job_manager = library.jobs(); - - // Wait a bit for job to be created and start - sleep(Duration::from_millis(200)).await; - - // Get running jobs - let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await?; - assert!( - !running_jobs.is_empty(), - "Should have a running indexing job" - ); - - let job_info = &running_jobs[0]; - let job_id = JobId(job_info.id); - - // Wait a bit for job to start processing - sleep(Duration::from_millis(500)).await; - - // Pause the job - job_manager.pause_job(job_id).await?; - - // Wait for pause to take effect - sleep(Duration::from_millis(200)).await; - - // Check job is paused - let job_info = job_manager.get_job_info(job_id.0).await?.unwrap(); - assert_eq!(job_info.status, JobStatus::Paused, "Job should be paused"); - - // Record progress when paused - let paused_progress = job_info.progress; - assert!(paused_progress > 0.0, "Should have made some progress"); - assert!(paused_progress < 100.0, "Should not be complete"); - - // Wait a bit to ensure no progress is made while paused - sleep(Duration::from_millis(500)).await; - - // Check progress hasn't changed - let job_info = job_manager.get_job_info(job_id.0).await?.unwrap(); - assert_eq!( - job_info.progress, paused_progress, - "Progress should not change while paused" - ); - - // Resume the job - job_manager.resume_job(job_id).await?; - - // Wait for job to complete - let mut retries = 0; - loop { - let job_info = 
job_manager.get_job_info(job_id.0).await?.unwrap(); - match job_info.status { - JobStatus::Completed => { - assert!(job_info.progress >= 99.0, "Job should be complete"); - break; - } - JobStatus::Failed => { - panic!("Job failed: {:?}", job_info.error_message); - } - _ => { - if retries > 100 { - panic!("Job did not complete in time"); - } - retries += 1; - sleep(Duration::from_millis(100)).await; - } - } - } - - // Verify files were indexed - use sea_orm::PaginatorTrait; - let indexed_count = entities::entry::Entity::find().count(db.conn()).await?; - - assert!(indexed_count > 0, "Files should be indexed"); - - Ok(()) -} - -#[tokio::test] -async fn test_pause_paused_job_error() -> Result<(), Box> { - // Setup test environment - let temp_dir = TempDir::new()?; - let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; - - // Create library - let library = core - .libraries - .create_library("Test Pause Error Library", None, core.context.clone()) - .await?; - - // Create test location - let test_location_dir = temp_dir.path().join("test_location"); - tokio::fs::create_dir_all(&test_location_dir).await?; - tokio::fs::write(test_location_dir.join("test.txt"), "content").await?; - - // Register device - let db = library.db(); - let device = core.device.to_device()?; - let device_model: entities::device::ActiveModel = device.into(); - let device_record = device_model.insert(db.conn()).await?; - - // Create location - let location_args = LocationCreateArgs { - path: test_location_dir.clone(), - name: Some("Test Location".to_string()), - index_mode: IndexMode::Deep, - }; - - create_location( - library.clone(), - &core.events, - location_args, - device_record.id, - ) - .await?; - - // Get the job - let job_manager = library.jobs(); - sleep(Duration::from_millis(200)).await; - let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await?; - let job_id = JobId(running_jobs[0].id); - - // Pause the job - job_manager.pause_job(job_id).await?; - 
sleep(Duration::from_millis(100)).await; - - // Try to pause again - should fail - let result = job_manager.pause_job(job_id).await; - assert!( - result.is_err(), - "Should not be able to pause an already paused job" - ); - assert!(result - .unwrap_err() - .to_string() - .contains("Cannot pause job in Paused state")); - - Ok(()) -} - -#[tokio::test] -async fn test_resume_running_job_error() -> Result<(), Box> { - // Setup test environment - let temp_dir = TempDir::new()?; - let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?; - - // Create library - let library = core - .libraries - .create_library("Test Resume Error Library", None, core.context.clone()) - .await?; - - // Create test location with multiple files - let test_location_dir = temp_dir.path().join("test_location"); - tokio::fs::create_dir_all(&test_location_dir).await?; - - for i in 0..10 { - let file_path = test_location_dir.join(format!("test_file_{}.txt", i)); - tokio::fs::write(&file_path, format!("Test content {}", i)).await?; - } - - // Register device - let db = library.db(); - let device = core.device.to_device()?; - let device_model: entities::device::ActiveModel = device.into(); - let device_record = device_model.insert(db.conn()).await?; - - // Create location - let location_args = LocationCreateArgs { - path: test_location_dir.clone(), - name: Some("Test Location".to_string()), - index_mode: IndexMode::Deep, - }; - - create_location( - library.clone(), - &core.events, - location_args, - device_record.id, - ) - .await?; - - // Get the running job - let job_manager = library.jobs(); - sleep(Duration::from_millis(200)).await; - let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await?; - let job_id = JobId(running_jobs[0].id); - - // Try to resume a running job - should fail - let result = job_manager.resume_job(job_id).await; - assert!( - result.is_err(), - "Should not be able to resume a running job" - ); - assert!(result - .unwrap_err() - .to_string() - 
.contains("Cannot resume job in Running state")); - - Ok(()) -} diff --git a/core/tests/job_resumption_integration_test.rs b/core/tests/job_resumption_integration_test.rs new file mode 100644 index 000000000..31eaad69e --- /dev/null +++ b/core/tests/job_resumption_integration_test.rs @@ -0,0 +1,469 @@ +//! Integration test for job resumption at various interruption points +//! +//! This test generates benchmark data and tests job resumption by interrupting +//! indexing jobs at different phases and progress points, then verifying they +//! can resume and complete successfully. + +use sd_core::{ + infra::action::LibraryAction, + ops::{ + indexing::IndexMode, + locations::add::{action::LocationAddAction, action::LocationAddInput}, + }, +}; +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::{ + sync::mpsc, + time::{sleep, timeout}, +}; +use tracing::{info, warn}; +use uuid::Uuid; + +/// Test data directory in the repo for inspection +const TEST_DATA_DIR: &str = "data"; + +/// Benchmark recipe name to use for test data generation +const TEST_RECIPE_NAME: &str = "shape_medium"; + +/// Path where the benchmark data will be generated (relative to workspace root) +const TEST_INDEXING_DATA_PATH: &str = "benchdata/shape_medium"; + +/// Different interruption points to test +#[derive(Debug, Clone)] +enum InterruptionPoint { + /// Interrupt during discovery phase at X% progress + Discovery(u8), + /// Interrupt during processing phase at X% progress + Processing(u8), + /// Interrupt during content identification at X% progress + ContentIdentification(u8), + /// Interrupt during aggregation phase + Aggregation, +} + +/// Test result for a single interruption scenario +#[derive(Debug)] +struct TestResult { + interruption_point: InterruptionPoint, + success: bool, + error: Option, + job_log_path: Option, + daemon_log_path: Option, +} + +/// Main integration test +#[tokio::test] +async fn 
test_job_resumption_at_various_points() { + // Initialize tracing for test debugging + let _ = tracing_subscriber::fmt() + .with_env_filter("warn,sd_core=info") + .try_init(); + + info!("Starting job resumption integration test"); + + // Generate benchmark data (or use existing data) + info!("Preparing test data"); + let indexing_data_path = generate_test_data().await.expect("Failed to prepare test data"); + + // Define interruption points to test + // For quick testing, comment out all but one interruption point + let interruption_points = vec![ + InterruptionPoint::ContentIdentification(30), + // InterruptionPoint::Discovery(25), + // InterruptionPoint::Discovery(75), + // InterruptionPoint::Processing(10), + // InterruptionPoint::Processing(50), + // InterruptionPoint::Processing(90), + // InterruptionPoint::ContentIdentification(80), + // InterruptionPoint::Aggregation, + ]; + + let mut results = Vec::new(); + let total_tests = interruption_points.len(); + + // Test each interruption point + for (i, interruption_point) in interruption_points.into_iter().enumerate() { + info!("Testing interruption point {:?} ({}/{})", interruption_point, i + 1, total_tests); + + let result = test_single_interruption_point( + &indexing_data_path, + interruption_point.clone(), + i, + ).await; + + results.push(result); + + // Brief pause between tests + sleep(Duration::from_secs(2)).await; + } + + // Analyze results + analyze_test_results(&results); + + // Assert all tests passed + let failed_tests: Vec<_> = results.iter().filter(|r| !r.success).collect(); + if !failed_tests.is_empty() { + panic!( + "Job resumption test failed at {} interruption points: {:#?}", + failed_tests.len(), + failed_tests + ); + } + + info!("All job resumption tests passed! 
🎉"); +} + +/// Generate test data using benchmark data generation +async fn generate_test_data() -> Result<PathBuf, Box<dyn std::error::Error>> { + use std::process::Command; + + let current_dir = std::env::current_dir()?; + info!("Current directory: {}", current_dir.display()); + + // Use relative path from workspace root (tests run from core/ directory) + let indexing_data_path = if current_dir.ends_with("core") { + current_dir.parent().unwrap().join(TEST_INDEXING_DATA_PATH) + } else { + current_dir.join(TEST_INDEXING_DATA_PATH) + }; + + // Check if data already exists + if indexing_data_path.exists() && indexing_data_path.is_dir() { + // Check if directory has files + let entries: Vec<_> = std::fs::read_dir(&indexing_data_path)? + .collect::<Result<Vec<_>, _>>()?; + + if !entries.is_empty() { + info!("Test data already exists at: {}, skipping generation", indexing_data_path.display()); + return Ok(indexing_data_path); + } + } + + // Run benchmark data generation using existing recipe + info!("Generating test data using recipe: {}", TEST_RECIPE_NAME); + let recipe_path = current_dir.join("benchmarks/recipes").join(format!("{}.yaml", TEST_RECIPE_NAME)); + info!("Recipe path: {}", recipe_path.display()); + + let output = Command::new("cargo") + .args([ + "run", "-p", "sd-bench", "--bin", "sd-bench", "--", + "mkdata", + "--recipe", recipe_path.to_str().unwrap(), + ]) + .current_dir(&current_dir) + .output()?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + let stdout = String::from_utf8_lossy(&output.stdout); + return Err(format!( + "Benchmark data generation failed:\nSTDOUT: {}\nSTDERR: {}", + stdout, stderr + ).into()); + } + + info!("Generated test data at: {}", indexing_data_path.display()); + Ok(indexing_data_path) +} + +/// Test a single interruption point scenario +async fn test_single_interruption_point( + indexing_data_path: &PathBuf, + interruption_point: InterruptionPoint, + test_index: usize, +) -> TestResult { + let test_name = format!("test_{:02}_{:?}", 
test_index, interruption_point); + let test_data_path = PathBuf::from(TEST_DATA_DIR); + let core_data_path = test_data_path.join(&test_name); + + // Clean core data directory for this test + if core_data_path.exists() { + let _ = std::fs::remove_dir_all(&core_data_path); + } + std::fs::create_dir_all(&core_data_path).expect("Failed to create core data directory"); + + info!("Testing {} with data at {}", test_name, indexing_data_path.display()); + + // Phase 1: Start indexing and interrupt at specified point + let interrupt_result = start_and_interrupt_job( + &core_data_path, + indexing_data_path, + &interruption_point, + ).await; + + let job_id = match interrupt_result { + Ok(job_id) => job_id, + Err(error) => { + return TestResult { + interruption_point, + success: false, + error: Some(format!("Failed to interrupt job: {}", error)), + job_log_path: None, + daemon_log_path: None, + }; + } + }; + + // Brief pause to ensure clean shutdown + sleep(Duration::from_secs(1)).await; + + // Phase 2: Resume and complete the job + let resume_result = resume_and_complete_job( + &core_data_path, + indexing_data_path, + job_id, + ).await; + + match resume_result { + Ok((job_log_path, daemon_log_path)) => TestResult { + interruption_point, + success: true, + error: None, + job_log_path: Some(job_log_path), + daemon_log_path: Some(daemon_log_path), + }, + Err(error) => TestResult { + interruption_point, + success: false, + error: Some(format!("Failed to resume job: {}", error)), + job_log_path: None, + daemon_log_path: None, + }, + } +} + +/// Start indexing job and interrupt at specified point +async fn start_and_interrupt_job( + core_data_path: &PathBuf, + indexing_data_path: &PathBuf, + interruption_point: &InterruptionPoint, +) -> Result<Uuid, Box<dyn std::error::Error>> { + info!("Starting job and waiting for interruption point: {:?}", interruption_point); + + // Create core with isolated data directory + let core = sd_core::Core::new_with_config(core_data_path.clone()).await?; + let core_context = 
core.context.clone(); + + // Create library + let library = core_context.libraries().await + .create_library("Test Library".to_string(), None, core_context.clone()) + .await?; + + // Create location add action to automatically trigger indexing + let location_input = LocationAddInput { + path: indexing_data_path.clone(), + name: Some("Test Location".to_string()), + mode: IndexMode::Content, + }; + + let location_action = LocationAddAction::from_input(location_input) + .map_err(|e| format!("Failed to create location action: {}", e))?; + + // Dispatch the location add action through the action manager + let action_manager = core_context.action_manager.read().await; + let action_manager = action_manager.as_ref() + .ok_or("Action manager not initialized")?; + + let location_output = action_manager + .dispatch_library(Some(library.id()), location_action) + .await + .map_err(|e| format!("Failed to dispatch location add action: {}", e))?; + + // The location add action automatically creates an indexing job + let job_id = location_output.job_id + .ok_or("Location add action did not return a job ID")?; + + // Set up event monitoring + let (interrupt_tx, mut interrupt_rx) = mpsc::channel(1); + let should_interrupt = Arc::new(AtomicBool::new(false)); + let should_interrupt_clone = should_interrupt.clone(); + + // Monitor events for interruption point + let mut event_rx = core_context.events.subscribe(); + let interruption_point_clone = interruption_point.clone(); + tokio::spawn(async move { + while let Ok(event) = event_rx.recv().await { + if let sd_core::infra::event::Event::JobProgress { job_id: event_job_id, progress, message, .. 
} = event { + if event_job_id == job_id.to_string() { + let message_str = message.as_deref().unwrap_or(""); + info!("Job progress: {}% - {}", progress * 100.0, message_str); + let should_interrupt_now = match &interruption_point_clone { + InterruptionPoint::Discovery(target_percent) => { + // Interrupt during discovery phase if we're at or past target percentage + message_str.contains("Discovery") && progress >= (*target_percent as f64 * 0.01) + }, + InterruptionPoint::Processing(target_percent) => { + // Interrupt during processing phase if we're at or past target percentage + message_str.contains("Processing") && progress >= (*target_percent as f64 * 0.01) + }, + InterruptionPoint::ContentIdentification(target_percent) => { + // Interrupt during content identification if we're at or past target percentage + (message_str.contains("Content") || message_str.contains("content identities")) && + progress >= (*target_percent as f64 * 0.01) + }, + InterruptionPoint::Aggregation => { + // Interrupt as soon as we enter aggregation phase + message_str.contains("Aggregation") + }, + }; + + if should_interrupt_now && !should_interrupt_clone.load(Ordering::Relaxed) { + info!("Triggering interrupt at {}% in phase: {}", + progress, message_str); + should_interrupt_clone.store(true, Ordering::Relaxed); + let _ = interrupt_tx.send(()).await; + } + } + } + } + }); + + // Wait for interruption point or timeout + let interrupt_timeout = timeout(Duration::from_secs(120), interrupt_rx.recv()).await; + + match interrupt_timeout { + Ok(Some(())) => { + info!("Interruption point reached, shutting down core"); + // Shutdown core gracefully + core.shutdown().await?; + Ok(job_id) + }, + Ok(None) => Err("Interrupt channel closed unexpectedly".into()), + Err(_) => Err("Timeout waiting for interruption point".into()), + } +} + +/// Resume and complete the interrupted job +async fn resume_and_complete_job( + core_data_path: &PathBuf, + _indexing_data_path: &PathBuf, + job_id: Uuid, +) -> 
Result<(PathBuf, PathBuf), Box<dyn std::error::Error>> { + info!("Resuming job {} and waiting for completion", job_id); + + // Create core again (simulating daemon restart) + let core = sd_core::Core::new_with_config(core_data_path.clone()).await?; + let core_context = core.context.clone(); + + // Get the library (should auto-load) + let libraries = core_context.libraries().await.list().await; + let _library = libraries.first() + .ok_or("No library found after restart")?; + + // Set up completion monitoring + let (completion_tx, mut completion_rx) = mpsc::channel(1); + let job_completed = Arc::new(AtomicBool::new(false)); + let job_completed_clone = job_completed.clone(); + + // Monitor for job completion + let mut event_rx = core_context.events.subscribe(); + tokio::spawn(async move { + while let Ok(event) = event_rx.recv().await { + match event { + sd_core::infra::event::Event::JobCompleted { job_id: event_job_id, .. } => { + if event_job_id == job_id.to_string() { + info!("Job {} completed successfully", job_id); + job_completed_clone.store(true, Ordering::Relaxed); + let _ = completion_tx.send(Ok(())).await; + break; + } + }, + sd_core::infra::event::Event::JobFailed { job_id: event_job_id, error, .. 
} => { + if event_job_id == job_id.to_string() { + warn!("Job {} failed: {}", job_id, error); + let _ = completion_tx.send(Err(error)).await; + break; + } + }, + _ => {} + } + } + }); + + // Wait for completion or timeout + let completion_timeout = timeout(Duration::from_secs(300), completion_rx.recv()).await; + + match completion_timeout { + Ok(Some(Ok(()))) => { + info!("Job completed successfully"); + + // Collect log paths for inspection + let job_log_path = core_data_path.join("job_logs").join(format!("{}.log", job_id)); + let daemon_log_path = core_data_path.join("daemon.log"); + + // Shutdown core + core.shutdown().await?; + + Ok((job_log_path, daemon_log_path)) + }, + Ok(Some(Err(error))) => { + core.shutdown().await?; + Err(format!("Job failed during resumption: {}", error).into()) + }, + Ok(None) => { + core.shutdown().await?; + Err("Completion channel closed unexpectedly".into()) + }, + Err(_) => { + core.shutdown().await?; + Err("Timeout waiting for job completion".into()) + }, + } +} + +/// Analyze and report test results +fn analyze_test_results(results: &[TestResult]) { + info!("=== Job Resumption Test Results ==="); + + let total_tests = results.len(); + let passed_tests = results.iter().filter(|r| r.success).count(); + let failed_tests = total_tests - passed_tests; + + info!("Total tests: {}", total_tests); + info!("Passed: {}", passed_tests); + info!("Failed: {}", failed_tests); + + if failed_tests > 0 { + warn!("Failed test details:"); + for result in results.iter().filter(|r| !r.success) { + warn!(" {:?}: {}", result.interruption_point, + result.error.as_ref().unwrap_or(&"Unknown error".to_string())); + + if let Some(job_log) = &result.job_log_path { + warn!(" Job log: {}", job_log.display()); + } + if let Some(daemon_log) = &result.daemon_log_path { + warn!(" Daemon log: {}", daemon_log.display()); + } + } + } + + // Group results by interruption type + let mut by_phase = std::collections::HashMap::new(); + for result in results { + let phase = 
match &result.interruption_point { + InterruptionPoint::Discovery(_) => "Discovery", + InterruptionPoint::Processing(_) => "Processing", + InterruptionPoint::ContentIdentification(_) => "Content Identification", + InterruptionPoint::Aggregation => "Aggregation", + }; + by_phase.entry(phase).or_insert_with(Vec::new).push(result); + } + + info!("Results by phase:"); + for (phase, phase_results) in by_phase { + let phase_passed = phase_results.iter().filter(|r| r.success).count(); + let phase_total = phase_results.len(); + info!(" {}: {}/{} passed", phase, phase_passed, phase_total); + } + + info!("Test data and logs available in: {}", TEST_DATA_DIR); +} +