feat: Add benchmarks module and refactor job handling for resumable jobs

- Introduced a new `core/benchmarks` module to facilitate performance testing and benchmarking.
- Updated `Cargo.toml` to include the new benchmarks module in the workspace.
- Refactored job handling in the `JobManager` and `JobExecutor` to support job resumption, enhancing the ability to recover from interruptions.
- Improved logging throughout the job lifecycle to provide better visibility into job states and transitions.
- Added integration tests for job pause/resume functionality, ensuring robustness in job management.
This commit is contained in:
Jamie Pine
2025-09-20 00:51:08 -07:00
parent bc7091768d
commit e6e1a3b252
17 changed files with 672 additions and 327 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -9,6 +9,7 @@ members = [
# "apps/server",
"apps/cli",
"core",
"core/benchmarks",
"crates/*"
]
resolver = "2"

View File

@@ -25,7 +25,7 @@ fn write_magic_header_if_needed(
if !enable_magic {
return Ok(0);
}
let registry = sd_core::file_type::FileTypeRegistry::new();
let registry = sd_core::filetype::FileTypeRegistry::new();
let mut candidates = registry.get_by_extension(extension);
if candidates.is_empty() {
return Ok(0);
@@ -52,9 +52,9 @@ fn write_magic_header_if_needed(
.bytes
.iter()
.map(|b| match b {
sd_core::file_type::MagicByte::Exact(v) => *v,
sd_core::file_type::MagicByte::Any => 0u8,
sd_core::file_type::MagicByte::Range { min, .. } => *min,
sd_core::filetype::MagicByte::Exact(v) => *v,
sd_core::filetype::MagicByte::Any => 0u8,
sd_core::filetype::MagicByte::Range { min, .. } => *min,
})
.collect();
file.write_all(&bytes)?;

View File

@@ -1,7 +1,7 @@
//! Common utilities and structures for benchmark scenarios
use anyhow::{anyhow, Result};
use sd_core::infrastructure::events::{Event, EventSubscriber};
use sd_core::infrastructure::jobs::output::JobOutput;
use sd_core::infra::event::{Event, EventSubscriber};
use sd_core::infra::job::output::JobOutput;
use sd_core::library::Library;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;

View File

@@ -6,7 +6,7 @@ use super::{hardware_hint_to_label, infer_hardware_label, Scenario};
use crate::core_boot::CoreBoot;
use crate::metrics::{collect_host_info, BenchmarkRun, Durations, RunMeta};
use crate::recipe::Recipe;
use sd_core::infrastructure::jobs::output::JobOutput;
use sd_core::infra::job::output::JobOutput;
#[derive(Default)]
pub struct ContentIdentificationScenario {
@@ -24,7 +24,7 @@ impl Scenario for ContentIdentificationScenario {
}
async fn prepare(&mut self, boot: &CoreBoot, recipe: &Recipe) -> Result<()> {
use sd_core::infrastructure::actions::handler::ActionHandler;
use sd_core::infra::action::LibraryAction;
let core = &boot.core;
let context = core.context.clone();
let library = match core.libraries.get_active_library().await {
@@ -34,22 +34,15 @@ impl Scenario for ContentIdentificationScenario {
self.base.library = Some(library.clone());
for loc in &recipe.locations {
let action = sd_core::infrastructure::actions::Action::LocationAdd {
library_id: library.id(),
action: sd_core::operations::locations::add::action::LocationAddAction {
path: loc.path.clone(),
name: Some(format!("bench:{}", recipe.name)),
mode: sd_core::operations::indexing::IndexMode::Content,
},
let input = sd_core::ops::locations::add::action::LocationAddInput {
path: loc.path.clone(),
name: Some(format!("bench:{}", recipe.name)),
mode: sd_core::ops::indexing::IndexMode::Content,
};
let handler = sd_core::operations::locations::add::action::LocationAddHandler::new();
let out = handler.execute(context.clone(), action).await?;
if let sd_core::infrastructure::actions::output::ActionOutput::Custom { data, .. } = &out {
if let Some(j) = data.get("job_id").and_then(|v| v.as_str()) {
if let Ok(id) = uuid::Uuid::parse_str(j) {
self.base.job_ids.push(id);
}
}
let action = sd_core::ops::locations::add::action::LocationAddAction::from_input(input).map_err(|e| anyhow::anyhow!(e))?;
let out = action.execute(library.clone(), context.clone()).await.map_err(|e| anyhow::anyhow!(e.to_string()))?;
if let Some(job_id) = out.job_id {
self.base.job_ids.push(job_id);
}
}
Ok(())

View File

@@ -6,7 +6,7 @@ use super::{hardware_hint_to_label, infer_hardware_label, Scenario};
use crate::core_boot::CoreBoot;
use crate::metrics::{collect_host_info, BenchmarkRun, Durations, RunMeta};
use crate::recipe::Recipe;
use sd_core::infrastructure::jobs::output::JobOutput;
use sd_core::infra::job::output::JobOutput;
#[derive(Default)]
pub struct CoreIndexingScenario {
@@ -24,7 +24,7 @@ impl Scenario for CoreIndexingScenario {
}
async fn prepare(&mut self, boot: &CoreBoot, recipe: &Recipe) -> Result<()> {
use sd_core::infrastructure::actions::handler::ActionHandler;
use sd_core::infra::action::LibraryAction;
let core = &boot.core;
let context = core.context.clone();
let library = match core.libraries.get_active_library().await {
@@ -38,24 +38,15 @@ impl Scenario for CoreIndexingScenario {
self.base.library = Some(library.clone());
for loc in &recipe.locations {
let action = sd_core::infrastructure::actions::Action::LocationAdd {
library_id: library.id(),
action: sd_core::operations::locations::add::action::LocationAddAction {
path: loc.path.clone(),
name: Some(format!("bench:{}", recipe.name)),
mode: sd_core::operations::indexing::IndexMode::Shallow,
},
let input = sd_core::ops::locations::add::action::LocationAddInput {
path: loc.path.clone(),
name: Some(format!("bench:{}", recipe.name)),
mode: sd_core::ops::indexing::IndexMode::Shallow,
};
let handler = sd_core::operations::locations::add::action::LocationAddHandler::new();
let out = handler.execute(context.clone(), action).await?;
if let sd_core::infrastructure::actions::output::ActionOutput::Custom { data, .. } =
&out
{
if let Some(j) = data.get("job_id").and_then(|v| v.as_str()) {
if let Ok(id) = uuid::Uuid::parse_str(j) {
self.base.job_ids.push(id);
}
}
let action = sd_core::ops::locations::add::action::LocationAddAction::from_input(input).map_err(|e| anyhow::anyhow!(e))?;
let out = action.execute(library.clone(), context.clone()).await.map_err(|e| anyhow::anyhow!(e.to_string()))?;
if let Some(job_id) = out.job_id {
self.base.job_ids.push(job_id);
}
}
Ok(())

View File

@@ -16,7 +16,7 @@ use async_trait::async_trait;
use sd_task_system::{ExecStatus, Interrupter, Task, TaskId};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::{broadcast, mpsc, watch, Mutex};
use tracing::{debug, error, info, span, Level};
use tracing::{debug, error, info, span, warn, Level};
/// Executor that wraps a job for task system execution
pub struct JobExecutor<J: JobHandler> {
@@ -176,14 +176,18 @@ impl<J: JobHandler> JobExecutor<J> {
info!("Starting job {}: {}", self.state.job_id, J::NAME);
// Update status to running
warn!("DEBUG: JobExecutor setting status to Running for job {}", self.state.job_id);
let _ = self.state.status_tx.send(super::types::JobStatus::Running);
// Also persist status to database
warn!("DEBUG: JobExecutor updating database status to Running for job {}", self.state.job_id);
if let Err(e) = self
.update_job_status_in_db(super::types::JobStatus::Running)
.await
{
error!("Failed to update job status in database: {}", e);
} else {
warn!("DEBUG: JobExecutor successfully updated database status to Running for job {}", self.state.job_id);
}
// Create job context
@@ -202,8 +206,19 @@ impl<J: JobHandler> JobExecutor<J> {
// Progress forwarding is handled by JobManager
// Check if we're resuming
// TODO: Implement proper resume detection
// Check if we're resuming by checking if the job has existing state
// This is a heuristic - if the job implements resumable logic, it should have state
let is_resuming = self.job.is_resuming();
warn!("DEBUG: Job {} is_resuming: {}", self.state.job_id, is_resuming);
if is_resuming {
warn!("DEBUG: Calling on_resume for job {}", self.state.job_id);
if let Err(e) = self.job.on_resume(&ctx).await {
error!("Job {} on_resume failed: {}", self.state.job_id, e);
return Err(e);
}
}
debug!("Starting job {}", self.state.job_id);
// Store metrics reference for later update

View File

@@ -71,8 +71,15 @@ impl JobManager {
Ok(manager)
}
/// Initialize job manager (resume interrupted jobs)
/// Initialize job manager (without resuming jobs)
pub async fn initialize(&self) -> JobResult<()> {
info!("Job manager initialized for library {}", self.library_id);
Ok(())
}
/// Resume interrupted jobs - should be called after library is fully loaded
pub async fn resume_interrupted_jobs_after_load(&self) -> JobResult<()> {
info!("Resuming interrupted jobs for library {}", self.library_id);
if let Err(e) = self.resume_interrupted_jobs().await {
error!("Failed to resume interrupted jobs: {}", e);
}
@@ -891,6 +898,7 @@ impl JobManager {
/// Resume interrupted jobs from the last run
async fn resume_interrupted_jobs(&self) -> JobResult<()> {
warn!("DEBUG: resume_interrupted_jobs called for library {}", self.library_id);
info!("Checking for interrupted jobs to resume");
use sea_orm::{ColumnTrait, QueryFilter};
@@ -902,13 +910,17 @@ impl JobManager {
.all(self.db.conn())
.await?;
warn!("DEBUG: Found {} interrupted jobs to resume", interrupted.len());
for job_record in interrupted {
if let Ok(job_id) = job_record.id.parse::<Uuid>().map(JobId) {
warn!("DEBUG: Processing interrupted job {}: {} with status {}", job_id, job_record.name, job_record.status);
info!("Resuming job {}: {}", job_id, job_record.name);
// Deserialize job from binary data
warn!("DEBUG: Attempting to deserialize job {} of type {}", job_id, job_record.name);
match REGISTRY.deserialize_job(&job_record.name, &job_record.state) {
Ok(erased_job) => {
warn!("DEBUG: Successfully deserialized job {}", job_id);
// Create channels for the resumed job
let (status_tx, status_rx) = watch::channel(JobStatus::Paused);
let (progress_tx, progress_rx) = mpsc::unbounded_channel();
@@ -1005,14 +1017,16 @@ impl JobManager {
// Get the final output from the handle
let output = {
let jobs = running_jobs.read().await;
jobs.get(&job_id_clone)
.and_then(|job| {
job.handle
.output
.blocking_lock()
.clone()
})
.unwrap_or(Ok(JobOutput::Success))
if let Some(job) = jobs.get(&job_id_clone) {
job.handle
.output
.lock()
.await
.clone()
.unwrap_or(Ok(JobOutput::Success))
} else {
Ok(JobOutput::Success)
}
};
// Emit completion event
@@ -1054,6 +1068,33 @@ impl JobManager {
}
});
// Update status to Running after successful dispatch
warn!("DEBUG: Attempting to update resumed job {} status to Running", job_id);
if let Some(running_job) = self.running_jobs.read().await.get(&job_id) {
if let Err(e) = running_job.status_tx.send(JobStatus::Running) {
warn!("Failed to update resumed job status: {}", e);
} else {
warn!("DEBUG: Successfully sent Running status to job {}", job_id);
}
} else {
warn!("DEBUG: Job {} not found in running_jobs when trying to update status", job_id);
}
// Update database status
warn!("DEBUG: Attempting to update database status for job {} to Running", job_id);
use sea_orm::{ActiveModelTrait, ActiveValue::Set};
let mut job_model = database::jobs::ActiveModel {
id: Set(job_id.to_string()),
status: Set(JobStatus::Running.to_string()),
paused_at: Set(None),
..Default::default()
};
if let Err(e) = job_model.update(self.db.conn()).await {
warn!("Failed to update resumed job status in database: {}", e);
} else {
warn!("DEBUG: Successfully updated database status for job {} to Running", job_id);
}
info!("Successfully resumed job {}: {}", job_id, job_record.name);
}
Err(e) => {
@@ -1062,6 +1103,7 @@ impl JobManager {
}
}
Err(e) => {
warn!("DEBUG: Failed to deserialize job {}: {:?}", job_id, e);
error!("Failed to create job {} for resumption: {}", job_id, e);
}
}
@@ -1303,9 +1345,16 @@ impl JobManager {
JobStatus::Completed => {
let output = {
let jobs = running_jobs.read().await;
jobs.get(&job_id_clone)
.and_then(|job| job.handle.output.blocking_lock().clone())
.unwrap_or(Ok(JobOutput::Success))
if let Some(job) = jobs.get(&job_id_clone) {
job.handle
.output
.lock()
.await
.clone()
.unwrap_or(Ok(JobOutput::Success))
} else {
Ok(JobOutput::Success)
}
};
event_bus.emit(Event::JobCompleted {
job_id: job_id_clone.to_string(),
@@ -1455,6 +1504,26 @@ impl JobManager {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
// Close database connection properly
info!("Closing job database connection");
// First, checkpoint the WAL file to merge it back into the main database
use sea_orm::{ConnectionTrait, Statement};
if let Err(e) = self.db.conn().execute(Statement::from_string(
sea_orm::DatabaseBackend::Sqlite,
"PRAGMA wal_checkpoint(TRUNCATE)",
)).await {
warn!("Failed to checkpoint job database WAL file: {}", e);
} else {
info!("Job database WAL file checkpointed successfully");
}
if let Err(e) = self.db.conn().clone().close().await {
warn!("Failed to close job database connection: {}", e);
} else {
info!("Job database connection closed successfully");
}
Ok(())
}
}

View File

@@ -58,6 +58,11 @@ pub trait JobHandler: Job {
async fn on_cancel(&mut self, _ctx: &JobContext<'_>) -> JobResult {
Ok(())
}
/// Check if this job is resuming from a previous state (optional)
fn is_resuming(&self) -> bool {
false // Default implementation for non-resumable jobs
}
}
/// Trait for jobs that can be serialized

View File

@@ -231,6 +231,11 @@ impl LibraryManager {
libraries.insert(config.id, library.clone());
}
// Now that the library is registered in the context, resume interrupted jobs
if let Err(e) = library.jobs.resume_interrupted_jobs_after_load().await {
warn!("Failed to resume interrupted jobs for library {}: {}", config.id, e);
}
// Note: Sidecar manager initialization should be done by the Core when libraries are loaded
// This allows Core to pass its services reference

View File

@@ -158,6 +158,27 @@ impl Library {
let config = self.config.read().await;
self.save_config(&*config).await?;
// Close library database connection properly
use tracing::{info, warn};
info!("Closing library database connection");
// First, checkpoint the WAL file to merge it back into the main database
use sea_orm::{ConnectionTrait, Statement};
if let Err(e) = self.db.as_ref().conn().execute(Statement::from_string(
sea_orm::DatabaseBackend::Sqlite,
"PRAGMA wal_checkpoint(TRUNCATE)",
)).await {
warn!("Failed to checkpoint WAL file: {}", e);
} else {
info!("WAL file checkpointed successfully");
}
if let Err(e) = self.db.as_ref().conn().clone().close().await {
warn!("Failed to close library database connection: {}", e);
} else {
info!("Library database connection closed successfully");
}
Ok(())
}

View File

@@ -134,6 +134,14 @@ impl ChangeDetector {
self.path_to_entry.len()
));
// DEBUG: Log if we failed to load entries
use tracing::warn;
if self.path_to_entry.is_empty() {
warn!("DEBUG: ChangeDetector loaded 0 entries - database may be locked or empty");
} else {
warn!("DEBUG: ChangeDetector loaded {} entries successfully", self.path_to_entry.len());
}
Ok(())
}
@@ -166,6 +174,9 @@ impl ChangeDetector {
if old_path != path {
// Same inode, different path - it's a move
if let Some(db_entry) = self.path_to_entry.get(old_path) {
// DEBUG: Log false move detection
use tracing::warn;
warn!("DEBUG: Detected move - old: {:?}, new: {:?}, inode: {}", old_path, path, inode_val);
return Some(Change::Moved {
old_path: old_path.clone(),
new_path: path.to_path_buf(),

View File

@@ -12,6 +12,7 @@ use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tracing::warn;
use uuid::Uuid;
use super::{
@@ -213,7 +214,7 @@ pub struct IndexerJob {
pub config: IndexerJobConfig,
// Resumable state
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none", default)]
state: Option<IndexerState>,
// Ephemeral storage for non-persistent jobs
@@ -273,6 +274,8 @@ impl JobHandler for IndexerJob {
let state = match &mut self.state {
Some(state) => {
ctx.log("Resuming indexer from saved state");
warn!("DEBUG: Resumed state - phase: {:?}, entry_batches: {}, entries_for_content: {}",
state.phase, state.entry_batches.len(), state.entries_for_content.len());
state
}
None => {
@@ -321,6 +324,7 @@ impl JobHandler for IndexerJob {
ctx.check_interrupt().await?;
let current_phase = state.phase.clone();
warn!("DEBUG: IndexerJob entering phase: {:?}", current_phase);
match current_phase {
Phase::Discovery => {
// Use scope-aware discovery
@@ -347,6 +351,7 @@ impl JobHandler for IndexerJob {
}
Phase::Processing => {
warn!("DEBUG: IndexerJob starting Processing phase");
if self.config.is_ephemeral() {
let ephemeral_index = self.ephemeral_index.clone().ok_or_else(|| {
JobError::execution("Ephemeral index not initialized".to_string())
@@ -419,8 +424,10 @@ impl JobHandler for IndexerJob {
// Checkpoint after each phase (only for persistent jobs)
if !self.config.is_ephemeral() {
warn!("DEBUG: IndexerJob checkpointing after phase: {:?}", state.phase);
ctx.checkpoint().await?;
}
warn!("DEBUG: IndexerJob completed phase: {:?}, next phase will be: {:?}", current_phase, state.phase);
}
// Send final progress update
@@ -463,7 +470,9 @@ impl JobHandler for IndexerJob {
async fn on_resume(&mut self, ctx: &JobContext<'_>) -> JobResult {
// State is already loaded from serialization
warn!("DEBUG: IndexerJob on_resume called");
if let Some(state) = &self.state {
warn!("DEBUG: IndexerJob has state, resuming in {:?} phase", state.phase);
ctx.log(format!("Resuming indexer in {:?} phase", state.phase));
ctx.log(format!(
"Progress: {} files, {} dirs, {} errors so far",
@@ -472,6 +481,8 @@ impl JobHandler for IndexerJob {
// Reinitialize timer for resumed job
self.timer = Some(PhaseTimer::new());
} else {
warn!("DEBUG: IndexerJob has no state during resume!");
}
Ok(())
}
@@ -491,6 +502,11 @@ impl JobHandler for IndexerJob {
}
Ok(())
}
fn is_resuming(&self) -> bool {
// If we have existing state, we're resuming
self.state.is_some()
}
}
impl IndexerJob {

View File

@@ -15,6 +15,7 @@ use crate::{
};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, TransactionTrait};
use std::path::Path;
use tracing::warn;
use uuid::Uuid;
/// Run the processing phase of indexing
@@ -31,6 +32,12 @@ pub async fn run_processing_phase(
total_batches
));
if total_batches == 0 {
ctx.log("No batches to process - transitioning to Aggregation phase");
state.phase = crate::ops::indexing::state::Phase::Aggregation;
return Ok(());
}
// Get the actual location record from database
let location_record = entities::location::Entity::find()
.filter(entities::location::Column::Uuid.eq(location_id))
@@ -165,6 +172,9 @@ pub async fn run_processing_phase(
for entry in batch {
// Check for interruption during batch processing
ctx.check_interrupt().await?;
// Add to seen_paths for delete detection (important for resumed jobs)
state.seen_paths.insert(entry.path.clone());
// Get metadata for change detection
let metadata = match std::fs::metadata(&entry.path) {
Ok(m) => m,

View File

@@ -94,11 +94,17 @@ impl LibraryAction for LocationAddAction {
None
};
Ok(LocationAddOutput::new(
let mut output = LocationAddOutput::new(
location_id,
self.input.path,
self.input.name,
))
);
if let Some(job_id) = job_id {
output = output.with_job_id(job_id);
}
Ok(output)
}
fn action_kind(&self) -> &'static str {

View File

@@ -1,267 +0,0 @@
//! Integration test for job pause/resume functionality
use sd_core::{
infra::db::entities,
infra::job::types::{JobId, JobStatus},
location::{create_location, IndexMode, LocationCreateArgs},
Core,
};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::sleep;
#[tokio::test]
async fn test_pause_and_resume_indexing_job() -> Result<(), Box<dyn std::error::Error>> {
// Setup test environment
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Create library
let library = core
.libraries
.create_library("Test Pause Resume Library", None, core.context.clone())
.await?;
// Create test location directory with many files
let test_location_dir = temp_dir.path().join("test_location");
tokio::fs::create_dir_all(&test_location_dir).await?;
// Create many test files to ensure job runs long enough
for i in 0..100 {
let file_path = test_location_dir.join(format!("test_file_{}.txt", i));
tokio::fs::write(&file_path, format!("Test content {}", i)).await?;
}
// Register device
let db = library.db();
let device = core.device.to_device()?;
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await?
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await?
}
};
// Create location to trigger indexing
let location_args = LocationCreateArgs {
path: test_location_dir.clone(),
name: Some("Test Location".to_string()),
index_mode: IndexMode::Deep,
};
let _location_db_id = create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
// Get the indexing job that was created
let job_manager = library.jobs();
// Wait a bit for job to be created and start
sleep(Duration::from_millis(200)).await;
// Get running jobs
let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await?;
assert!(
!running_jobs.is_empty(),
"Should have a running indexing job"
);
let job_info = &running_jobs[0];
let job_id = JobId(job_info.id);
// Wait a bit for job to start processing
sleep(Duration::from_millis(500)).await;
// Pause the job
job_manager.pause_job(job_id).await?;
// Wait for pause to take effect
sleep(Duration::from_millis(200)).await;
// Check job is paused
let job_info = job_manager.get_job_info(job_id.0).await?.unwrap();
assert_eq!(job_info.status, JobStatus::Paused, "Job should be paused");
// Record progress when paused
let paused_progress = job_info.progress;
assert!(paused_progress > 0.0, "Should have made some progress");
assert!(paused_progress < 100.0, "Should not be complete");
// Wait a bit to ensure no progress is made while paused
sleep(Duration::from_millis(500)).await;
// Check progress hasn't changed
let job_info = job_manager.get_job_info(job_id.0).await?.unwrap();
assert_eq!(
job_info.progress, paused_progress,
"Progress should not change while paused"
);
// Resume the job
job_manager.resume_job(job_id).await?;
// Wait for job to complete
let mut retries = 0;
loop {
let job_info = job_manager.get_job_info(job_id.0).await?.unwrap();
match job_info.status {
JobStatus::Completed => {
assert!(job_info.progress >= 99.0, "Job should be complete");
break;
}
JobStatus::Failed => {
panic!("Job failed: {:?}", job_info.error_message);
}
_ => {
if retries > 100 {
panic!("Job did not complete in time");
}
retries += 1;
sleep(Duration::from_millis(100)).await;
}
}
}
// Verify files were indexed
use sea_orm::PaginatorTrait;
let indexed_count = entities::entry::Entity::find().count(db.conn()).await?;
assert!(indexed_count > 0, "Files should be indexed");
Ok(())
}
#[tokio::test]
async fn test_pause_paused_job_error() -> Result<(), Box<dyn std::error::Error>> {
// Setup test environment
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Create library
let library = core
.libraries
.create_library("Test Pause Error Library", None, core.context.clone())
.await?;
// Create test location
let test_location_dir = temp_dir.path().join("test_location");
tokio::fs::create_dir_all(&test_location_dir).await?;
tokio::fs::write(test_location_dir.join("test.txt"), "content").await?;
// Register device
let db = library.db();
let device = core.device.to_device()?;
let device_model: entities::device::ActiveModel = device.into();
let device_record = device_model.insert(db.conn()).await?;
// Create location
let location_args = LocationCreateArgs {
path: test_location_dir.clone(),
name: Some("Test Location".to_string()),
index_mode: IndexMode::Deep,
};
create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
// Get the job
let job_manager = library.jobs();
sleep(Duration::from_millis(200)).await;
let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await?;
let job_id = JobId(running_jobs[0].id);
// Pause the job
job_manager.pause_job(job_id).await?;
sleep(Duration::from_millis(100)).await;
// Try to pause again - should fail
let result = job_manager.pause_job(job_id).await;
assert!(
result.is_err(),
"Should not be able to pause an already paused job"
);
assert!(result
.unwrap_err()
.to_string()
.contains("Cannot pause job in Paused state"));
Ok(())
}
#[tokio::test]
async fn test_resume_running_job_error() -> Result<(), Box<dyn std::error::Error>> {
// Setup test environment
let temp_dir = TempDir::new()?;
let core = Core::new_with_config(temp_dir.path().to_path_buf()).await?;
// Create library
let library = core
.libraries
.create_library("Test Resume Error Library", None, core.context.clone())
.await?;
// Create test location with multiple files
let test_location_dir = temp_dir.path().join("test_location");
tokio::fs::create_dir_all(&test_location_dir).await?;
for i in 0..10 {
let file_path = test_location_dir.join(format!("test_file_{}.txt", i));
tokio::fs::write(&file_path, format!("Test content {}", i)).await?;
}
// Register device
let db = library.db();
let device = core.device.to_device()?;
let device_model: entities::device::ActiveModel = device.into();
let device_record = device_model.insert(db.conn()).await?;
// Create location
let location_args = LocationCreateArgs {
path: test_location_dir.clone(),
name: Some("Test Location".to_string()),
index_mode: IndexMode::Deep,
};
create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
// Get the running job
let job_manager = library.jobs();
sleep(Duration::from_millis(200)).await;
let running_jobs = job_manager.list_jobs(Some(JobStatus::Running)).await?;
let job_id = JobId(running_jobs[0].id);
// Try to resume a running job - should fail
let result = job_manager.resume_job(job_id).await;
assert!(
result.is_err(),
"Should not be able to resume a running job"
);
assert!(result
.unwrap_err()
.to_string()
.contains("Cannot resume job in Running state"));
Ok(())
}

View File

@@ -0,0 +1,469 @@
//! Integration test for job resumption at various interruption points
//!
//! This test generates benchmark data and tests job resumption by interrupting
//! indexing jobs at different phases and progress points, then verifying they
//! can resume and complete successfully.
use sd_core::{
infra::action::LibraryAction,
ops::{
indexing::IndexMode,
locations::add::{action::LocationAddAction, action::LocationAddInput},
},
};
use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
sync::mpsc,
time::{sleep, timeout},
};
use tracing::{info, warn};
use uuid::Uuid;
/// Test data directory in the repo for inspection
const TEST_DATA_DIR: &str = "data";
/// Benchmark recipe name to use for test data generation
const TEST_RECIPE_NAME: &str = "shape_medium";
/// Path where the benchmark data will be generated (relative to workspace root)
const TEST_INDEXING_DATA_PATH: &str = "benchdata/shape_medium";
/// Different interruption points to test
#[derive(Debug, Clone)]
enum InterruptionPoint {
/// Interrupt during discovery phase at X% progress
Discovery(u8),
/// Interrupt during processing phase at X% progress
Processing(u8),
/// Interrupt during content identification at X% progress
ContentIdentification(u8),
/// Interrupt during aggregation phase
Aggregation,
}
/// Test result for a single interruption scenario
#[derive(Debug)]
struct TestResult {
interruption_point: InterruptionPoint,
success: bool,
error: Option<String>,
job_log_path: Option<PathBuf>,
daemon_log_path: Option<PathBuf>,
}
/// Main integration test
#[tokio::test]
async fn test_job_resumption_at_various_points() {
// Initialize tracing for test debugging
let _ = tracing_subscriber::fmt()
.with_env_filter("warn,sd_core=info")
.try_init();
info!("Starting job resumption integration test");
// Generate benchmark data (or use existing data)
info!("Preparing test data");
let indexing_data_path = generate_test_data().await.expect("Failed to prepare test data");
// Define interruption points to test
// For quick testing, comment out all but one interruption point
let interruption_points = vec![
InterruptionPoint::ContentIdentification(30),
// InterruptionPoint::Discovery(25),
// InterruptionPoint::Discovery(75),
// InterruptionPoint::Processing(10),
// InterruptionPoint::Processing(50),
// InterruptionPoint::Processing(90),
// InterruptionPoint::ContentIdentification(80),
// InterruptionPoint::Aggregation,
];
let mut results = Vec::new();
let total_tests = interruption_points.len();
// Test each interruption point
for (i, interruption_point) in interruption_points.into_iter().enumerate() {
info!("Testing interruption point {:?} ({}/{})", interruption_point, i + 1, total_tests);
let result = test_single_interruption_point(
&indexing_data_path,
interruption_point.clone(),
i,
).await;
results.push(result);
// Brief pause between tests
sleep(Duration::from_secs(2)).await;
}
// Analyze results
analyze_test_results(&results);
// Assert all tests passed
let failed_tests: Vec<_> = results.iter().filter(|r| !r.success).collect();
if !failed_tests.is_empty() {
panic!(
"Job resumption test failed at {} interruption points: {:#?}",
failed_tests.len(),
failed_tests
);
}
info!("All job resumption tests passed! 🎉");
}
/// Generate test data using benchmark data generation
async fn generate_test_data() -> Result<PathBuf, Box<dyn std::error::Error>> {
use std::process::Command;
let current_dir = std::env::current_dir()?;
info!("Current directory: {}", current_dir.display());
// Use relative path from workspace root (tests run from core/ directory)
let indexing_data_path = if current_dir.ends_with("core") {
current_dir.parent().unwrap().join(TEST_INDEXING_DATA_PATH)
} else {
current_dir.join(TEST_INDEXING_DATA_PATH)
};
// Check if data already exists
if indexing_data_path.exists() && indexing_data_path.is_dir() {
// Check if directory has files
let entries: Vec<_> = std::fs::read_dir(&indexing_data_path)?
.collect::<Result<Vec<_>, _>>()?;
if !entries.is_empty() {
info!("Test data already exists at: {}, skipping generation", indexing_data_path.display());
return Ok(indexing_data_path);
}
}
// Run benchmark data generation using existing recipe
info!("Generating test data using recipe: {}", TEST_RECIPE_NAME);
let recipe_path = current_dir.join("benchmarks/recipes").join(format!("{}.yaml", TEST_RECIPE_NAME));
info!("Recipe path: {}", recipe_path.display());
let output = Command::new("cargo")
.args([
"run", "-p", "sd-bench", "--bin", "sd-bench", "--",
"mkdata",
"--recipe", recipe_path.to_str().unwrap(),
])
.current_dir(&current_dir)
.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
return Err(format!(
"Benchmark data generation failed:\nSTDOUT: {}\nSTDERR: {}",
stdout, stderr
).into());
}
info!("Generated test data at: {}", indexing_data_path.display());
Ok(indexing_data_path)
}
/// Test a single interruption point scenario
async fn test_single_interruption_point(
indexing_data_path: &PathBuf,
interruption_point: InterruptionPoint,
test_index: usize,
) -> TestResult {
let test_name = format!("test_{:02}_{:?}", test_index, interruption_point);
let test_data_path = PathBuf::from(TEST_DATA_DIR);
let core_data_path = test_data_path.join(&test_name);
// Clean core data directory for this test
if core_data_path.exists() {
let _ = std::fs::remove_dir_all(&core_data_path);
}
std::fs::create_dir_all(&core_data_path).expect("Failed to create core data directory");
info!("Testing {} with data at {}", test_name, indexing_data_path.display());
// Phase 1: Start indexing and interrupt at specified point
let interrupt_result = start_and_interrupt_job(
&core_data_path,
indexing_data_path,
&interruption_point,
).await;
let job_id = match interrupt_result {
Ok(job_id) => job_id,
Err(error) => {
return TestResult {
interruption_point,
success: false,
error: Some(format!("Failed to interrupt job: {}", error)),
job_log_path: None,
daemon_log_path: None,
};
}
};
// Brief pause to ensure clean shutdown
sleep(Duration::from_secs(1)).await;
// Phase 2: Resume and complete the job
let resume_result = resume_and_complete_job(
&core_data_path,
indexing_data_path,
job_id,
).await;
match resume_result {
Ok((job_log_path, daemon_log_path)) => TestResult {
interruption_point,
success: true,
error: None,
job_log_path: Some(job_log_path),
daemon_log_path: Some(daemon_log_path),
},
Err(error) => TestResult {
interruption_point,
success: false,
error: Some(format!("Failed to resume job: {}", error)),
job_log_path: None,
daemon_log_path: None,
},
}
}
/// Start indexing job and interrupt at specified point
async fn start_and_interrupt_job(
core_data_path: &PathBuf,
indexing_data_path: &PathBuf,
interruption_point: &InterruptionPoint,
) -> Result<Uuid, Box<dyn std::error::Error>> {
info!("Starting job and waiting for interruption point: {:?}", interruption_point);
// Create core with isolated data directory
let core = sd_core::Core::new_with_config(core_data_path.clone()).await?;
let core_context = core.context.clone();
// Create library
let library = core_context.libraries().await
.create_library("Test Library".to_string(), None, core_context.clone())
.await?;
// Create location add action to automatically trigger indexing
let location_input = LocationAddInput {
path: indexing_data_path.clone(),
name: Some("Test Location".to_string()),
mode: IndexMode::Content,
};
let location_action = LocationAddAction::from_input(location_input)
.map_err(|e| format!("Failed to create location action: {}", e))?;
// Dispatch the location add action through the action manager
let action_manager = core_context.action_manager.read().await;
let action_manager = action_manager.as_ref()
.ok_or("Action manager not initialized")?;
let location_output = action_manager
.dispatch_library(Some(library.id()), location_action)
.await
.map_err(|e| format!("Failed to dispatch location add action: {}", e))?;
// The location add action automatically creates an indexing job
let job_id = location_output.job_id
.ok_or("Location add action did not return a job ID")?;
// Set up event monitoring
let (interrupt_tx, mut interrupt_rx) = mpsc::channel(1);
let should_interrupt = Arc::new(AtomicBool::new(false));
let should_interrupt_clone = should_interrupt.clone();
// Monitor events for interruption point
let mut event_rx = core_context.events.subscribe();
let interruption_point_clone = interruption_point.clone();
tokio::spawn(async move {
while let Ok(event) = event_rx.recv().await {
if let sd_core::infra::event::Event::JobProgress { job_id: event_job_id, progress, message, .. } = event {
if event_job_id == job_id.to_string() {
let message_str = message.as_deref().unwrap_or("");
info!("Job progress: {}% - {}", progress * 100.0, message_str);
let should_interrupt_now = match &interruption_point_clone {
InterruptionPoint::Discovery(target_percent) => {
// Interrupt during discovery phase if we're at or past target percentage
message_str.contains("Discovery") && progress >= (*target_percent as f64 * 0.01)
},
InterruptionPoint::Processing(target_percent) => {
// Interrupt during processing phase if we're at or past target percentage
message_str.contains("Processing") && progress >= (*target_percent as f64 * 0.01)
},
InterruptionPoint::ContentIdentification(target_percent) => {
// Interrupt during content identification if we're at or past target percentage
(message_str.contains("Content") || message_str.contains("content identities")) &&
progress >= (*target_percent as f64 * 0.01)
},
InterruptionPoint::Aggregation => {
// Interrupt as soon as we enter aggregation phase
message_str.contains("Aggregation")
},
};
if should_interrupt_now && !should_interrupt_clone.load(Ordering::Relaxed) {
info!("Triggering interrupt at {}% in phase: {}",
progress, message_str);
should_interrupt_clone.store(true, Ordering::Relaxed);
let _ = interrupt_tx.send(()).await;
}
}
}
}
});
// Wait for interruption point or timeout
let interrupt_timeout = timeout(Duration::from_secs(120), interrupt_rx.recv()).await;
match interrupt_timeout {
Ok(Some(())) => {
info!("Interruption point reached, shutting down core");
// Shutdown core gracefully
core.shutdown().await?;
Ok(job_id)
},
Ok(None) => Err("Interrupt channel closed unexpectedly".into()),
Err(_) => Err("Timeout waiting for interruption point".into()),
}
}
/// Resume and complete the interrupted job
async fn resume_and_complete_job(
core_data_path: &PathBuf,
_indexing_data_path: &PathBuf,
job_id: Uuid,
) -> Result<(PathBuf, PathBuf), Box<dyn std::error::Error>> {
info!("Resuming job {} and waiting for completion", job_id);
// Create core again (simulating daemon restart)
let core = sd_core::Core::new_with_config(core_data_path.clone()).await?;
let core_context = core.context.clone();
// Get the library (should auto-load)
let libraries = core_context.libraries().await.list().await;
let _library = libraries.first()
.ok_or("No library found after restart")?;
// Set up completion monitoring
let (completion_tx, mut completion_rx) = mpsc::channel(1);
let job_completed = Arc::new(AtomicBool::new(false));
let job_completed_clone = job_completed.clone();
// Monitor for job completion
let mut event_rx = core_context.events.subscribe();
tokio::spawn(async move {
while let Ok(event) = event_rx.recv().await {
match event {
sd_core::infra::event::Event::JobCompleted { job_id: event_job_id, .. } => {
if event_job_id == job_id.to_string() {
info!("Job {} completed successfully", job_id);
job_completed_clone.store(true, Ordering::Relaxed);
let _ = completion_tx.send(Ok(())).await;
break;
}
},
sd_core::infra::event::Event::JobFailed { job_id: event_job_id, error, .. } => {
if event_job_id == job_id.to_string() {
warn!("Job {} failed: {}", job_id, error);
let _ = completion_tx.send(Err(error)).await;
break;
}
},
_ => {}
}
}
});
// Wait for completion or timeout
let completion_timeout = timeout(Duration::from_secs(300), completion_rx.recv()).await;
match completion_timeout {
Ok(Some(Ok(()))) => {
info!("Job completed successfully");
// Collect log paths for inspection
let job_log_path = core_data_path.join("job_logs").join(format!("{}.log", job_id));
let daemon_log_path = core_data_path.join("daemon.log");
// Shutdown core
core.shutdown().await?;
Ok((job_log_path, daemon_log_path))
},
Ok(Some(Err(error))) => {
core.shutdown().await?;
Err(format!("Job failed during resumption: {}", error).into())
},
Ok(None) => {
core.shutdown().await?;
Err("Completion channel closed unexpectedly".into())
},
Err(_) => {
core.shutdown().await?;
Err("Timeout waiting for job completion".into())
},
}
}
/// Analyze and report test results
fn analyze_test_results(results: &[TestResult]) {
info!("=== Job Resumption Test Results ===");
let total_tests = results.len();
let passed_tests = results.iter().filter(|r| r.success).count();
let failed_tests = total_tests - passed_tests;
info!("Total tests: {}", total_tests);
info!("Passed: {}", passed_tests);
info!("Failed: {}", failed_tests);
if failed_tests > 0 {
warn!("Failed test details:");
for result in results.iter().filter(|r| !r.success) {
warn!(" {:?}: {}", result.interruption_point,
result.error.as_ref().unwrap_or(&"Unknown error".to_string()));
if let Some(job_log) = &result.job_log_path {
warn!(" Job log: {}", job_log.display());
}
if let Some(daemon_log) = &result.daemon_log_path {
warn!(" Daemon log: {}", daemon_log.display());
}
}
}
// Group results by interruption type
let mut by_phase = std::collections::HashMap::new();
for result in results {
let phase = match &result.interruption_point {
InterruptionPoint::Discovery(_) => "Discovery",
InterruptionPoint::Processing(_) => "Processing",
InterruptionPoint::ContentIdentification(_) => "Content Identification",
InterruptionPoint::Aggregation => "Aggregation",
};
by_phase.entry(phase).or_insert_with(Vec::new).push(result);
}
info!("Results by phase:");
for (phase, phase_results) in by_phase {
let phase_passed = phase_results.iter().filter(|r| r.success).count();
let phase_total = phase_results.len();
info!(" {}: {}/{} passed", phase, phase_passed, phase_total);
}
info!("Test data and logs available in: {}", TEST_DATA_DIR);
}