diff --git a/core/src/encode/thumb.rs b/core/src/encode/thumb.rs index 6c30786b8..e26aa1e41 100644 --- a/core/src/encode/thumb.rs +++ b/core/src/encode/thumb.rs @@ -1,9 +1,7 @@ use crate::{ api::{locations::LocationExplorerArgs, CoreEvent, LibraryArgs}, invalidate_query, - job::{ - JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, - }, + job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, library::LibraryContext, prisma::{file_path, location}, }; @@ -95,7 +93,7 @@ impl StatefulJob for ThumbnailJob { }); state.steps = image_files.into_iter().collect(); - Ok(()) + Ok(None) } async fn execute_step( @@ -128,12 +126,12 @@ impl StatefulJob for ThumbnailJob { "skipping thumbnail generation for {}", step.materialized_path ); - return Ok(()); + return Ok(None); } } Err(_) => { error!("Error getting cas_id {:?}", step.materialized_path); - return Ok(()); + return Ok(None); } }; @@ -175,14 +173,14 @@ impl StatefulJob for ThumbnailJob { state.step_number + 1, )]); - Ok(()) + Ok(None) } async fn finalize( &self, _ctx: WorkerContext, state: &mut JobState, - ) -> Result { + ) -> JobResult { let data = state .data .as_ref() diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index d7f3c1651..db13406e0 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -1,7 +1,5 @@ use crate::{ - job::{ - JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, - }, + job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, library::LibraryContext, prisma::{file, file_path, location}, }; @@ -126,7 +124,7 @@ impl StatefulJob for FileIdentifierJob { }); state.steps = (0..task_count).map(|_| ()).collect(); - Ok(()) + Ok(None) } async fn execute_step( @@ -290,14 +288,14 @@ impl StatefulJob for FileIdentifierJob { ]); // let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; - Ok(()) + Ok(None) } async fn finalize( &self, _ctx: WorkerContext, state: &mut JobState, - ) -> Result { + ) -> JobResult { let data = state .data .as_ref() diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index 5eec57eb4..c11837c1c 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -42,7 +42,7 @@ pub enum JobError { Paused(Vec), } -pub type JobResult = Result<(), JobError>; +pub type JobResult = Result; pub type JobMetadata = Option>; #[async_trait::async_trait] @@ -68,7 +68,7 @@ pub trait StatefulJob: Send + Sync { &self, ctx: WorkerContext, state: &mut JobState, - ) -> Result; + ) -> JobResult; } #[async_trait::async_trait] @@ -184,12 +184,8 @@ where self.state.step_number += 1; } - // It is ok to unwrap here, a running job will always have a report. - self.report.as_mut().unwrap().metadata = self - .stateful_job + self.stateful_job .finalize(ctx.clone(), &mut self.state) - .await?; - - Ok(()) + .await } } diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index e94bd61d5..99bb540a4 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -1,4 +1,4 @@ -use crate::job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus}; +use crate::job::{DynJob, JobError, JobManager, JobMetadata, JobReportUpdate, JobStatus}; use crate::library::LibraryContext; use crate::{api::LibraryArgs, invalidate_query}; use std::{sync::Arc, time::Duration}; @@ -22,7 +22,7 @@ pub enum WorkerEvent { updates: Vec, debounce: bool, }, - Completed(oneshot::Sender<()>), + Completed(oneshot::Sender<()>, JobMetadata), Failed(oneshot::Sender<()>), Paused(Vec, oneshot::Sender<()>), } @@ -151,25 +151,27 @@ impl Worker { let (done_tx, done_rx) = oneshot::channel(); - if let Err(e) = job.run(worker_ctx.clone()).await { - if let JobError::Paused(state) = e { + match job.run(worker_ctx.clone()).await { + Ok(metadata) => { + // handle completion + worker_ctx + .events_tx + .send(WorkerEvent::Completed(done_tx, metadata)) + .expect("critical error: failed to send worker complete event"); + } + Err(JobError::Paused(state)) => { worker_ctx .events_tx .send(WorkerEvent::Paused(state, done_tx)) .expect("critical error: failed to send worker pause event"); - } else { + } + Err(e) => { error!("job '{}' failed with error: {:#?}", job_id, e); worker_ctx .events_tx .send(WorkerEvent::Failed(done_tx)) .expect("critical error: failed to send worker fail event"); } - } else { - // handle completion - worker_ctx - .events_tx - .send(WorkerEvent::Completed(done_tx)) - .expect("critical error: failed to send worker complete event"); } if let Err(e) = done_rx.await { @@ -228,9 +230,10 @@ impl Worker { LibraryArgs::new(library.id, ()) ); } - WorkerEvent::Completed(done_tx) => { + WorkerEvent::Completed(done_tx, metadata) => { worker.report.status = JobStatus::Completed; worker.report.data = None; + worker.report.metadata = metadata; if let Err(e) = worker.report.update(&library).await { error!("failed to update job report: {:#?}", e); } diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 3618e9d20..7c0615c1b 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -1,7 +1,5 @@ use crate::{ - job::{ - JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, - }, + job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, prisma::{file_path, location}, }; @@ -221,7 +219,7 @@ impl StatefulJob for IndexerJob { }) .collect(); - Ok(()) + Ok(None) } /// Process each chunk of entries in the indexer job, writing to the `file_path` table @@ -284,7 +282,7 @@ impl StatefulJob for IndexerJob { info!("Inserted {count} records"); - Ok(()) + Ok(None) } /// Logs some metadata about the indexer job @@ -292,7 +290,7 @@ impl StatefulJob for IndexerJob { &self, _ctx: WorkerContext, state: &mut JobState, - ) -> Result { + ) -> JobResult { let data = state .data .as_ref()