From 697dfffe3731e8e995639bbdb164bc2cabc22514 Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Sat, 24 Jun 2023 04:23:30 -0300 Subject: [PATCH] [ENG-670 / ENG-774] Resumable and Pausable jobs / Queued jobs don't show in the job manager (#961) * Fixing some warnings * Optimizing job workers * Removing the delay from getting job reports * remove some commented out stuff * Trying to optimize job report route * fix thread 'tokio-runtime-worker' panicked at 'attempt to subtract with overflow' * fix progress bar * Now pause works from the UI * Fix * Now the worker set job report to paused * show errors for job pause/resume * bunch 'o ui fixes * order location in the sidebar * fix some text * fix clear all jobs * fix clear jobs & job group ui * show queued jobs * object validator job text * make cancel button work * better executable logo * Now cancel button works instantly * disable resume button * remove disabled props from pause/resume buttons * remove large comments * show paused progress --------- Co-authored-by: James Pine Co-authored-by: Jamie Pine <32987599+jamiepine@users.noreply.github.com> --- core/src/api/jobs.rs | 91 ++- core/src/api/locations.rs | 3 +- core/src/api/search.rs | 4 +- core/src/job/error.rs | 17 +- core/src/job/manager.rs | 198 +++-- core/src/job/mod.rs | 739 ++++++++++++++--- core/src/job/report.rs | 83 +- core/src/job/worker.rs | 755 ++++++++++-------- core/src/lib.rs | 14 +- core/src/library/library.rs | 2 +- core/src/library/manager.rs | 2 +- core/src/location/indexer/indexer_job.rs | 322 +++++--- core/src/location/indexer/mod.rs | 112 +-- core/src/location/manager/mod.rs | 2 +- core/src/location/mod.rs | 5 + .../file_identifier/file_identifier_job.rs | 148 ++-- core/src/object/file_identifier/mod.rs | 34 +- core/src/object/file_identifier/shallow.rs | 5 +- core/src/object/fs/copy.rs | 242 +++--- core/src/object/fs/cut.rs | 78 +- core/src/object/fs/decrypt.rs | 2 +- core/src/object/fs/delete.rs | 51 +- 
core/src/object/fs/encrypt.rs | 2 +- core/src/object/fs/erase.rs | 99 ++- core/src/object/fs/mod.rs | 2 +- core/src/object/preview/thumbnail/mod.rs | 80 +- .../preview/thumbnail/thumbnailer_job.rs | 136 +++- core/src/object/validation/validator_job.rs | 101 +-- .../$libraryId/Explorer/File/ContextMenu.tsx | 31 +- .../Layout/Sidebar/JobManager/Job.tsx | 41 +- .../Sidebar/JobManager/JobContainer.tsx | 16 +- .../Layout/Sidebar/JobManager/JobGroup.tsx | 124 ++- .../JobManager/useGroupJobTimeText.tsx | 2 +- .../Sidebar/JobManager/useGroupedJobs.ts | 129 --- .../Layout/Sidebar/JobManager/useJobInfo.tsx | 90 +-- .../settings/node/libraries/DeleteDialog.tsx | 4 +- interface/app/style.scss | 36 + interface/hooks/useExplorerStore.tsx | 4 +- packages/assets/icons/Executable.png | Bin 90409 -> 52094 bytes packages/client/src/core.ts | 1 + packages/ui/src/ProgressBar.tsx | 15 +- 41 files changed, 2240 insertions(+), 1582 deletions(-) delete mode 100644 interface/app/$libraryId/Layout/Sidebar/JobManager/useGroupedJobs.ts diff --git a/core/src/api/jobs.rs b/core/src/api/jobs.rs index 20a3e9ae3..b5b08e9ef 100644 --- a/core/src/api/jobs.rs +++ b/core/src/api/jobs.rs @@ -11,16 +11,17 @@ use crate::{ }; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, VecDeque}, path::PathBuf, }; use chrono::{DateTime, Utc}; +use prisma_client_rust::or; use rspc::alpha::AlphaRouter; use serde::{Deserialize, Serialize}; use specta::Type; use tokio::time::{interval, Duration}; -use tracing::trace; +use tracing::{info, trace}; use uuid::Uuid; use super::{utils::library, CoreEvent, Ctx, R}; @@ -70,16 +71,16 @@ pub(crate) fn mount() -> AlphaRouter { // - TODO: refactor grouping system to a many-to-many table #[derive(Debug, Clone, Serialize, Deserialize, Type)] pub struct JobGroup { - id: String, + id: Uuid, action: Option, status: JobStatus, created_at: DateTime, - jobs: Vec, + jobs: VecDeque, } #[derive(Debug, Clone, Serialize, Deserialize, Type)] pub 
struct JobGroups { groups: Vec, - index: HashMap, // maps job ids to their group index + index: HashMap, // maps job ids to their group index } R.with2(library()) .query(|(ctx, library), _: ()| async move { @@ -98,7 +99,7 @@ pub(crate) fn mount() -> AlphaRouter { .flat_map(JobReport::try_from) .collect(); - let active_reports = ctx.jobs.get_active_reports().await; + let active_reports_by_id = ctx.job_manager.get_active_reports_with_id().await; for job in job_reports { // action name and group key are computed from the job data @@ -112,30 +113,33 @@ pub(crate) fn mount() -> AlphaRouter { ); // if the job is running, use the in-memory report - let memory_job = active_reports.values().find(|j| j.id == job.id); - let report = match memory_job { - Some(j) => j, - None => &job, - }; + let report = active_reports_by_id.get(&job.id).unwrap_or(&job); + // if we have a group key, handle grouping if let Some(group_key) = group_key { match groups.entry(group_key) { // Create new job group with metadata - Entry::Vacant(e) => { - let id = job.parent_id.unwrap_or(job.id); - let group = JobGroup { - id: id.to_string(), + Entry::Vacant(entry) => { + entry.insert(JobGroup { + id: job.parent_id.unwrap_or(job.id), action: Some(action_name.clone()), status: job.status, - jobs: vec![report.clone()], + jobs: [report.clone()].into_iter().collect(), created_at: job.created_at.unwrap_or(Utc::now()), - }; - e.insert(group); + }); } // Add to existing job group - Entry::Occupied(mut e) => { - let group = e.get_mut(); - group.jobs.insert(0, report.clone()); // inserts at the beginning + Entry::Occupied(mut entry) => { + let group = entry.get_mut(); + + // protect paused status from being overwritten + if report.status != JobStatus::Paused { + group.status = report.status; + } + + // if group.status.is_finished() && !report.status.is_finished() { + // } + group.jobs.push_front(report.clone()); } } } else { @@ -143,24 +147,24 @@ pub(crate) fn mount() -> AlphaRouter { groups.insert( 
job.id.to_string(), JobGroup { - id: job.id.to_string(), + id: job.id, action: None, status: job.status, - jobs: vec![report.clone()], + jobs: [report.clone()].into_iter().collect(), created_at: job.created_at.unwrap_or(Utc::now()), }, ); } } - let mut groups_vec: Vec = groups.into_values().collect(); + let mut groups_vec = groups.into_values().collect::>(); groups_vec.sort_by(|a, b| b.created_at.cmp(&a.created_at)); // Update the index after sorting the groups - let mut index: HashMap = HashMap::new(); + let mut index: HashMap = HashMap::new(); for (i, group) in groups_vec.iter().enumerate() { for job in &group.jobs { - index.insert(job.id.clone().to_string(), i as i32); + index.insert(job.id, i as i32); } } @@ -172,7 +176,7 @@ pub(crate) fn mount() -> AlphaRouter { }) .procedure("isActive", { R.with2(library()).query(|(ctx, _), _: ()| async move { - Ok(!ctx.jobs.get_running_reports().await.is_empty()) + Ok(ctx.job_manager.has_active_workers().await) }) }) .procedure("clear", { @@ -192,13 +196,16 @@ pub(crate) fn mount() -> AlphaRouter { .procedure("clearAll", { R.with2(library()) .mutation(|(_, library), _: ()| async move { + info!("Clearing all jobs"); library .db .job() - .delete_many(vec![ + .delete_many(vec![or![ + job::status::equals(Some(JobStatus::Canceled as i32)), + job::status::equals(Some(JobStatus::Failed as i32)), job::status::equals(Some(JobStatus::Completed as i32)), job::status::equals(Some(JobStatus::CompletedWithErrors as i32)), - ]) + ]]) .exec() .await?; @@ -209,14 +216,32 @@ pub(crate) fn mount() -> AlphaRouter { // pause job .procedure("pause", { R.with2(library()) - .mutation(|(ctx, _), id: Uuid| async move { - JobManager::pause(&ctx.jobs, id).await.map_err(Into::into) + .mutation(|(ctx, library), id: Uuid| async move { + let ret = JobManager::pause(&ctx.job_manager, id) + .await + .map_err(Into::into); + invalidate_query!(library, "jobs.reports"); + ret }) }) .procedure("resume", { R.with2(library()) - .mutation(|(ctx, _), id: Uuid| async 
move { - JobManager::resume(&ctx.jobs, id).await.map_err(Into::into) + .mutation(|(ctx, library), id: Uuid| async move { + let ret = JobManager::resume(&ctx.job_manager, id) + .await + .map_err(Into::into); + invalidate_query!(library, "jobs.reports"); + ret + }) + }) + .procedure("cancel", { + R.with2(library()) + .mutation(|(ctx, library), id: Uuid| async move { + let ret = JobManager::cancel(&ctx.job_manager, id) + .await + .map_err(Into::into); + invalidate_query!(library, "jobs.reports"); + ret }) }) .procedure("generateThumbsForLocation", { diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index 2ad8dc2c1..9a9309b31 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -5,7 +5,7 @@ use crate::{ location_with_indexer_rules, relink_location, scan_location, LocationCreateArgs, LocationError, LocationUpdateArgs, }, - prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object}, + prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object, SortOrder}, util::AbortOnDrop, }; @@ -51,6 +51,7 @@ pub(crate) fn mount() -> AlphaRouter { .db .location() .find_many(vec![]) + .order_by(location::date_created::order(SortOrder::Desc)) .include(location::include!({ node })) .exec() .await?) 
diff --git a/core/src/api/search.rs b/core/src/api/search.rs index 859b1835d..9750309a7 100644 --- a/core/src/api/search.rs +++ b/core/src/api/search.rs @@ -158,7 +158,7 @@ impl ObjectSearchOrdering { } } -#[derive(Deserialize, Type, Debug, Default)] +#[derive(Deserialize, Type, Debug, Default, Clone, Copy)] #[serde(rename_all = "camelCase")] enum ObjectHiddenFilter { #[default] @@ -167,7 +167,7 @@ enum ObjectHiddenFilter { } impl ObjectHiddenFilter { - fn to_param(&self) -> Option { + fn to_param(self) -> Option { match self { ObjectHiddenFilter::Exclude => Some(or![ object::hidden::equals(None), diff --git a/core/src/job/error.rs b/core/src/job/error.rs index 3f7d4e9fb..e296b7c57 100644 --- a/core/src/job/error.rs +++ b/core/src/job/error.rs @@ -7,16 +7,13 @@ use crate::{ util::{db::MissingFieldError, error::FileIOError}, }; -use std::fmt::Debug; - use prisma_client_rust::QueryError; use rmp_serde::{decode::Error as DecodeError, encode::Error as EncodeError}; use sd_crypto::Error as CryptoError; use thiserror::Error; +use tokio::sync::oneshot; use uuid::Uuid; -use super::JobRunErrors; - #[derive(Error, Debug)] pub enum JobError { // General errors @@ -40,18 +37,12 @@ pub enum JobError { MissingReport { id: Uuid, name: String }, #[error("missing some job data: '{value}'")] MissingData { value: String }, - #[error("error converting/handling OS strings")] - OsStr, #[error("invalid job status integer: {0}")] InvalidJobStatusInt(i32), #[error(transparent)] FileIO(#[from] FileIOError), #[error("Location error: {0}")] Location(#[from] LocationError), - #[error("job failed to pause: {0}")] - PauseFailed(String), - #[error("failed to send command to worker")] - WorkerCommandSendFailed, // Specific job errors #[error(transparent)] @@ -74,16 +65,14 @@ pub enum JobError { ThumbnailSkipped, // Not errors - #[error("step completed with errors: {0:?}")] - StepCompletedWithErrors(JobRunErrors), #[error("job had a early finish: ")] EarlyFinish { name: String, reason: String }, 
#[error("data needed for job execution not found: job ")] JobDataNotFound(String), #[error("job paused")] - Paused(Vec), + Paused(Vec, oneshot::Sender<()>), #[error("job canceled")] - Canceled(Vec), + Canceled(oneshot::Sender<()>), } #[derive(Error, Debug)] diff --git a/core/src/job/manager.rs b/core/src/job/manager.rs index f7fbe66a5..63de60e70 100644 --- a/core/src/job/manager.rs +++ b/core/src/job/manager.rs @@ -12,28 +12,26 @@ use crate::{ }, prisma::job, }; -use prisma_client_rust::operator::or; use std::{ collections::{HashMap, HashSet, VecDeque}, sync::Arc, }; -use tokio::sync::{ - mpsc::{self, UnboundedSender}, - Mutex, RwLock, -}; +use futures::future::join_all; +use prisma_client_rust::operator::or; +use tokio::sync::{mpsc, oneshot, RwLock}; use tracing::{debug, error, info, warn}; use uuid::Uuid; -use super::{JobManagerError, JobReport, JobStatus, WorkerCommand}; +use super::{JobManagerError, JobReport, JobStatus}; // db is single threaded, nerd const MAX_WORKERS: usize = 1; pub enum JobManagerEvent { IngestJob(Library, Box), - Shutdown, + Shutdown(oneshot::Sender<()>), } /// JobManager handles queueing and executing jobs using the `DynJob` /// Handling persisting JobReports to the database, pause/resuming, and @@ -41,10 +39,8 @@ pub enum JobManagerEvent { pub struct JobManager { current_jobs_hashes: RwLock>, job_queue: RwLock>>, - running_workers: RwLock>>>, - internal_sender: UnboundedSender, - // pub external_receiver: UnboundedReceiver, - // external_sender: UnboundedSender, + running_workers: RwLock>, + internal_sender: mpsc::UnboundedSender, } impl JobManager { @@ -52,16 +48,12 @@ impl JobManager { pub fn new() -> Arc { // allow the job manager to control its workers let (internal_sender, mut internal_receiver) = mpsc::unbounded_channel(); - // // emit realtime events to the rest of the application - // let (external_sender, external_receiver) = mpsc::unbounded_channel(); let this = Arc::new(Self { current_jobs_hashes: RwLock::new(HashSet::new()), 
job_queue: RwLock::new(VecDeque::new()), running_workers: RwLock::new(HashMap::new()), internal_sender, - // external_receiver, - // external_sender, }); let this2 = this.clone(); @@ -74,16 +66,12 @@ impl JobManager { } // When the app shuts down, we need to gracefully shutdown all // active workers and preserve their state - JobManagerEvent::Shutdown => { + JobManagerEvent::Shutdown(signal_tx) => { info!("Shutting down job manager"); - let mut running_workers = this2.running_workers.write().await; - for (_, worker) in running_workers.iter_mut() { - worker - .lock() - .await - .command(WorkerCommand::Shutdown) - .expect("Failed to send shutdown command to worker"); - } + let running_workers = this2.running_workers.read().await; + join_all(running_workers.values().map(|worker| worker.shutdown())).await; + + signal_tx.send(()).ok(); } } } @@ -122,46 +110,61 @@ impl JobManager { /// Dispatches a job to a worker if under MAX_WORKERS limit, queues it otherwise. async fn dispatch(self: Arc, library: &Library, mut job: Box) { let mut running_workers = self.running_workers.write().await; + let mut job_report = job + .report_mut() + .take() + .expect("critical error: missing job on worker"); if running_workers.len() < MAX_WORKERS { info!("Running job: {:?}", job.name()); - let job_report = job - .report_mut() - .take() - .expect("critical error: missing job on worker"); + let worker_id = job_report.parent_id.unwrap_or(job_report.id); - let job_id = job_report.id; - - let worker = Worker::new( - job, job_report, // , self.external_sender.clone() - ); - - let wrapped_worker = Arc::new(Mutex::new(worker)); - - if let Err(e) = - Worker::spawn(self.clone(), Arc::clone(&wrapped_worker), library.clone()).await - { - error!("Error spawning worker: {:?}", e); - } else { - running_workers.insert(job_id, wrapped_worker); - } + Worker::new(worker_id, job, job_report, library.clone(), self.clone()) + .await + .map_or_else( + |e| { + error!("Error spawning worker: {:#?}", e); + }, + 
|worker| { + running_workers.insert(worker_id, worker); + }, + ); } else { debug!( "Queueing job: ", job.name(), job.hash() ); + if let Err(e) = job_report.create(library).await { + // It's alright to just log here, as will try to create the report on run if it wasn't created before + error!("Error creating job report: {:#?}", e); + } + + // Put the report back, or it will be lost forever + *job.report_mut() = Some(job_report); + self.job_queue.write().await.push_back(job); } } - pub async fn complete(self: Arc, library: &Library, job_id: Uuid, job_hash: u64) { + pub async fn complete( + self: Arc, + library: &Library, + worker_id: Uuid, + job_hash: u64, + next_job: Option>, + ) { // remove worker from running workers and from current jobs hashes self.current_jobs_hashes.write().await.remove(&job_hash); - self.running_workers.write().await.remove(&job_id); + self.running_workers.write().await.remove(&worker_id); // continue queue - let job = self.job_queue.write().await.pop_front(); + let job = if next_job.is_some() { + next_job + } else { + self.job_queue.write().await.pop_front() + }; + if let Some(job) = job { // We can't directly execute `self.ingest` here because it would cause an async cycle. self.internal_sender @@ -173,48 +176,56 @@ impl JobManager { } /// Shutdown the job manager, signaled by core on shutdown. - pub async fn shutdown(self: Arc) { + pub async fn shutdown(&self) { + let (tx, rx) = oneshot::channel(); self.internal_sender - .send(JobManagerEvent::Shutdown) + .send(JobManagerEvent::Shutdown(tx)) .unwrap_or_else(|_| { error!("Failed to send shutdown event to job manager!"); }); + + rx.await.unwrap_or_else(|_| { + error!("Failed to receive shutdown event response from job manager!"); + }); } - // Pause a specific job. + /// Pause a specific job. pub async fn pause(&self, job_id: Uuid) -> Result<(), JobManagerError> { - // Get a read lock on the running workers. 
- let workers_guard = self.running_workers.read().await; - // Look up the worker for the given job ID. - if let Some(worker_mutex) = workers_guard.get(&job_id) { - // Lock the worker. - let worker = worker_mutex.lock().await; - - info!("Pausing job: {:?}", worker.report()); + if let Some(worker) = self.running_workers.read().await.get(&job_id) { + debug!("Pausing job: {:#?}", worker.report()); // Set the pause signal in the worker. - worker.pause(); + worker.pause().await; Ok(()) } else { Err(JobManagerError::NotFound(job_id)) } } - // Resume a specific job. + /// Resume a specific job. pub async fn resume(&self, job_id: Uuid) -> Result<(), JobManagerError> { - // Get a read lock on the running workers. - let workers_guard = self.running_workers.read().await; - // Look up the worker for the given job ID. - if let Some(worker_mutex) = workers_guard.get(&job_id) { - // Lock the worker. - let worker = worker_mutex.lock().await; - - info!("Resuming job: {:?}", worker.report()); + if let Some(worker) = self.running_workers.read().await.get(&job_id) { + debug!("Resuming job: {:?}", worker.report()); // Set the pause signal in the worker. - worker.resume(); + worker.resume().await; + + Ok(()) + } else { + Err(JobManagerError::NotFound(job_id)) + } + } + + /// Cancel a specific job. + pub async fn cancel(&self, job_id: Uuid) -> Result<(), JobManagerError> { + // Look up the worker for the given job ID. + if let Some(worker) = self.running_workers.read().await.get(&job_id) { + debug!("Canceling job: {:#?}", worker.report()); + + // Set the cancel signal in the worker. 
+ worker.cancel().await; Ok(()) } else { @@ -272,26 +283,43 @@ impl JobManager { Ok(()) } - // get all active jobs, including paused jobs - pub async fn get_active_reports(&self) -> HashMap { - let mut active_reports = HashMap::new(); - for worker in self.running_workers.read().await.values() { - let report = worker.lock().await.report(); - active_reports.insert(report.get_meta().0, report); - } - active_reports - } - // get all running jobs, excluding paused jobs - pub async fn get_running_reports(&self) -> HashMap { - let mut active_reports = HashMap::new(); - for worker in self.running_workers.read().await.values() { - let worker = worker.lock().await; - if !worker.is_paused() { + // get all active jobs, including paused jobs organized by job id + pub async fn get_active_reports_with_id(&self) -> HashMap { + self.running_workers + .read() + .await + .values() + .map(|worker| { let report = worker.report(); - active_reports.insert(report.get_meta().0, report); + (report.id, report) + }) + .collect() + } + + // get all running jobs, excluding paused jobs organized by action + pub async fn get_running_reports(&self) -> HashMap { + self.running_workers + .read() + .await + .values() + .filter_map(|worker| { + (!worker.is_paused()).then(|| { + let report = worker.report(); + (report.get_meta().0, report) + }) + }) + .collect() + } + + /// Check if the manager currently has some active workers. 
+ pub async fn has_active_workers(&self) -> bool { + for worker in self.running_workers.read().await.values() { + if !worker.is_paused() { + return true; } } - active_reports + + false } } diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index 235ab8ad4..366e119eb 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -2,15 +2,16 @@ use crate::library::Library; use std::{ collections::{hash_map::DefaultHasher, VecDeque}, + fmt, hash::{Hash, Hasher}, mem, - sync::{atomic::Ordering, Arc}, - time::Duration, + sync::Arc, + time::Instant, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; - -use tracing::{debug, error, info, warn}; +use tokio::{select, sync::mpsc}; +use tracing::{debug, info, warn}; use uuid::Uuid; mod error; @@ -25,9 +26,24 @@ pub use worker::*; pub type JobResult = Result; pub type JobMetadata = Option; -pub type JobRunErrors = Vec; + +#[derive(Debug, Default)] +pub struct JobRunErrors(pub Vec); + +impl From> for JobRunErrors { + fn from(errors: Vec) -> Self { + Self(errors) + } +} + +pub struct JobRunOutput { + pub metadata: JobMetadata, + pub errors: JobRunErrors, + pub next_job: Option>, +} + /// `JobInitData` is a trait to represent the data being passed to initialize a `Job` -pub trait JobInitData: Serialize + DeserializeOwned + Send + Sync + Hash { +pub trait JobInitData: Serialize + DeserializeOwned + Send + Sync + Hash + fmt::Debug { type Job: StatefulJob; fn hash(&self) -> u64 { @@ -38,11 +54,22 @@ pub trait JobInitData: Serialize + DeserializeOwned + Send + Sync + Hash { } } +pub trait JobRunMetadata: + Default + Serialize + DeserializeOwned + Send + Sync + fmt::Debug +{ + fn update(&mut self, new_data: Self); +} + +impl JobRunMetadata for () { + fn update(&mut self, _new_data: Self) {} +} + #[async_trait::async_trait] -pub trait StatefulJob: Send + Sync + Sized { +pub trait StatefulJob: Send + Sync + Sized + 'static { type Init: JobInitData; - type Data: Serialize + DeserializeOwned + Send + Sync; - type Step: 
Serialize + DeserializeOwned + Send + Sync; + type Data: Serialize + DeserializeOwned + Send + Sync + fmt::Debug; + type Step: Serialize + DeserializeOwned + Send + Sync + fmt::Debug; + type RunMetadata: JobRunMetadata; /// The name of the job is a unique human readable identifier for the job. const NAME: &'static str; @@ -55,19 +82,23 @@ pub trait StatefulJob: Send + Sync + Sized { /// initialize the steps for the job async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError>; + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError>; /// is called for each step in the job. These steps are created in the `Self::init` method. async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError>; + ctx: &WorkerContext, + init: &Self::Init, + step: CurrentStep<'_, Self::Step>, + data: &Self::Data, + run_metadata: &Self::RunMetadata, + ) -> Result, JobError>; /// is called after all steps have been executed - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult; + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult; } #[async_trait::async_trait] @@ -79,9 +110,9 @@ pub trait DynJob: Send + Sync { fn name(&self) -> &'static str; async fn run( &mut self, - job_manager: Arc, - ctx: &mut WorkerContext, - ) -> Result<(JobMetadata, JobRunErrors), JobError>; + mut ctx: WorkerContext, + mut commands_rx: mpsc::Receiver, + ) -> Result; fn hash(&self) -> u64; fn set_next_jobs(&mut self, next_jobs: VecDeque>); fn serialize_state(&self) -> Result, JobError>; @@ -92,9 +123,10 @@ pub trait DynJob: Send + Sync { pub struct Job { id: Uuid, + hash: u64, report: Option, - state: JobState, - stateful_job: SJob, + state: Option>, + stateful_job: Option, next_jobs: VecDeque>, } @@ -131,14 +163,16 @@ where let id = Uuid::new_v4(); Box::new(Self { id, + hash: ::hash(&init), report: Some(JobReport::new(id, 
SJob::NAME.to_string())), - state: JobState { + state: Some(JobState { init, data: None, steps: VecDeque::new(), step_number: 0, - }, - stateful_job: SJob::new(), + run_metadata: Default::default(), + }), + stateful_job: Some(SJob::new()), next_jobs: VecDeque::new(), }) } @@ -147,18 +181,20 @@ where let id = Uuid::new_v4(); Box::new(Self { id, + hash: ::hash(&init), report: Some(JobReport::new_with_action( id, SJob::NAME.to_string(), action, )), - state: JobState { + state: Some(JobState { init, data: None, steps: VecDeque::new(), step_number: 0, - }, - stateful_job: SJob::new(), + run_metadata: Default::default(), + }), + stateful_job: Some(SJob::new()), next_jobs: VecDeque::new(), }) } @@ -190,16 +226,19 @@ where stateful_job: SJob, // whichever type of job this should be is passed here next_jobs: Option>>, ) -> Result, JobError> { + let state = rmp_serde::from_slice::>( + &report + .data + .take() + .ok_or_else(|| JobError::MissingJobDataState(report.id, report.name.clone()))?, + )?; + Ok(Box::new(Self { id: report.id, - state: rmp_serde::from_slice( - &report - .data - .take() - .ok_or_else(|| JobError::MissingJobDataState(report.id, report.name.clone()))?, - )?, + hash: ::hash(&state.init), + state: Some(state), report: Some(report), - stateful_job, + stateful_job: Some(stateful_job), next_jobs: next_jobs.unwrap_or_default(), })) } @@ -208,19 +247,21 @@ where let id = Uuid::new_v4(); Box::new(Self { id, + hash: ::hash(&init), report: Some(JobReport::new_with_parent( id, SJob::NAME.to_string(), parent_id, parent_action, )), - state: JobState { + state: Some(JobState { init, data: None, steps: VecDeque::new(), step_number: 0, - }, - stateful_job: SJob::new(), + run_metadata: Default::default(), + }), + stateful_job: Some(SJob::new()), next_jobs: VecDeque::new(), }) } @@ -232,6 +273,121 @@ pub struct JobState { pub data: Option, pub steps: VecDeque, pub step_number: usize, + pub run_metadata: Job::RunMetadata, +} + +pub struct JobInitOutput { + run_metadata: 
RunMetadata, + steps: VecDeque, + errors: JobRunErrors, +} + +impl From<(RunMetadata, Vec)> for JobInitOutput { + fn from((run_metadata, steps): (RunMetadata, Vec)) -> Self { + Self { + run_metadata, + steps: VecDeque::from(steps), + errors: Default::default(), + } + } +} + +impl From> for JobInitOutput<(), Step> { + fn from(steps: Vec) -> Self { + Self { + run_metadata: (), + steps: VecDeque::from(steps), + errors: Default::default(), + } + } +} + +impl From<(RunMetadata, Vec, JobRunErrors)> + for JobInitOutput +{ + fn from((run_metadata, steps, errors): (RunMetadata, Vec, JobRunErrors)) -> Self { + Self { + run_metadata, + steps: VecDeque::from(steps), + errors, + } + } +} + +pub struct CurrentStep<'step, Step> { + pub step: &'step Step, + pub step_number: usize, + pub total_steps: usize, +} + +pub struct JobStepOutput { + maybe_more_steps: Option>, + maybe_more_metadata: Option, + errors: JobRunErrors, +} + +impl From> for JobStepOutput { + fn from(more_steps: Vec) -> Self { + Self { + maybe_more_steps: Some(more_steps), + maybe_more_metadata: None, + errors: Default::default(), + } + } +} + +impl From for JobStepOutput { + fn from(more_metadata: RunMetadata) -> Self { + Self { + maybe_more_steps: None, + maybe_more_metadata: Some(more_metadata), + errors: Default::default(), + } + } +} + +impl From for JobStepOutput { + fn from(errors: JobRunErrors) -> Self { + Self { + maybe_more_steps: None, + maybe_more_metadata: None, + errors, + } + } +} + +impl From<(Vec, RunMetadata)> + for JobStepOutput +{ + fn from((more_steps, more_metadata): (Vec, RunMetadata)) -> Self { + Self { + maybe_more_steps: Some(more_steps), + maybe_more_metadata: Some(more_metadata), + errors: Default::default(), + } + } +} + +impl From<(Vec, RunMetadata, JobRunErrors)> + for JobStepOutput +{ + fn from((more_steps, more_metadata, errors): (Vec, RunMetadata, JobRunErrors)) -> Self { + Self { + maybe_more_steps: Some(more_steps), + maybe_more_metadata: Some(more_metadata), + errors, + } + } 
+} + +impl From> for JobStepOutput { + fn from(_: Option<()>) -> Self { + Self { + maybe_more_steps: None, + maybe_more_metadata: None, + errors: Vec::new().into(), + } + } } #[async_trait::async_trait] @@ -262,104 +418,441 @@ impl DynJob for Job { async fn run( &mut self, - job_manager: Arc, - ctx: &mut WorkerContext, - ) -> Result<(JobMetadata, JobRunErrors), JobError> { - let mut job_should_run = true; + ctx: WorkerContext, + mut commands_rx: mpsc::Receiver, + ) -> Result { + let job_name = self.name(); + let job_id = self.id; let mut errors = vec![]; - info!( - "Starting job {id} ({name})", - id = self.id, - name = self.name() + info!("Starting Job "); + + let JobState { + init, + data, + mut steps, + mut step_number, + mut run_metadata, + } = self + .state + .take() + .expect("critical error: missing job state"); + + let stateful_job = Arc::new( + self.stateful_job + .take() + .expect("critical error: missing stateful job"), ); + let ctx = Arc::new(ctx); + let init_arc = Arc::new(init); + + let mut job_should_run = true; + let job_time = Instant::now(); + // Checking if we have a brand new job, or if we are resuming an old one. - if self.state.data.is_none() { - if let Err(e) = self.stateful_job.init(ctx, &mut self.state).await { - match e { - JobError::EarlyFinish { .. } => { - info!("{e}"); - job_should_run = false; + let working_data = if let Some(data) = data { + Some(data) + } else { + // Job init phase + let inner_ctx = Arc::clone(&ctx); + let inner_init = Arc::clone(&init_arc); + let inner_stateful_job = Arc::clone(&stateful_job); + + let init_time = Instant::now(); + + let mut init_handle = tokio::spawn(async move { + let mut new_data = None; + let res = inner_stateful_job + .init(&inner_ctx, &inner_init, &mut new_data) + .await; + + (new_data, res) + }); + + loop { + select! 
{ + Some(command) = commands_rx.recv() => { + match command { + WorkerCommand::Pause(when) => { + debug!( + "Pausing Job at init phase took {:?}", + when.elapsed() + ); + + // In case of a Pause command, we keep waiting for the next command + let paused_time = Instant::now(); + while let Some(command) = commands_rx.recv().await { + match command { + WorkerCommand::Resume(when) => { + debug!( + "Resuming Job at init phase took {:?}", + when.elapsed() + ); + debug!( + "Total paused time {:?} Job ", + paused_time.elapsed() + ); + break; + } + // The job can also be shutdown or canceled while paused + WorkerCommand::Shutdown(when, signal_tx) => { + init_handle.abort(); + + debug!( + "Shuting down Job at init phase \ + took {:?} after running for {:?}", + when.elapsed(), + init_time.elapsed(), + ); + debug!("Total paused time {:?}", paused_time.elapsed()); + + // Shutting down at init phase will abort the job + return Err( + JobError::Canceled(signal_tx) + ); + } + WorkerCommand::Cancel(when, signal_tx) => { + init_handle.abort(); + debug!( + "Canceling Job at init phase \ + took {:?} after running for {:?}", + when.elapsed(), + init_time.elapsed(), + ); + debug!( + "Total paused time {:?} Job ", + paused_time.elapsed() + ); + return Err(JobError::Canceled(signal_tx)); + } + WorkerCommand::Pause(_) => { + // We continue paused lol + } + } + } + } + + WorkerCommand::Resume(_) => { + // We're already running so we just ignore this command + } + + WorkerCommand::Shutdown(when, signal_tx) => { + init_handle.abort(); + + debug!( + "Shuting down Job at init phase took {:?} \ + after running for {:?}", + when.elapsed(), + init_time.elapsed(), + ); + + // Shutting down at init phase will abort the job + return Err( + JobError::Canceled(signal_tx) + ); + } + WorkerCommand::Cancel(when, signal_tx) => { + init_handle.abort(); + debug!( + "Canceling Job at init phase took {:?} \ + after running for {:?}", + when.elapsed(), + init_time.elapsed() + ); + return 
Err(JobError::Canceled(signal_tx)); + } + } + } + init_res = &mut init_handle => { + let (new_data, res) = init_res?; + debug!("Init phase took {:?} Job ", init_time.elapsed()); + + match res { + Ok(JobInitOutput { + run_metadata: new_run_metadata, + steps: new_steps, + errors: JobRunErrors(new_errors), + }) => { + steps = new_steps; + errors.extend(new_errors); + run_metadata.update(new_run_metadata); + } + + Err(e) if matches!(e, JobError::EarlyFinish { .. }) => { + job_should_run = false; + info!("{e}"); + } + Err(other) => return Err(other), + } + + break new_data; } - JobError::StepCompletedWithErrors(errors_text) => errors.extend(errors_text), - other => return Err(other), } } - } - - let command_rx = ctx.command_rx.clone(); - let mut command_rx = command_rx.lock().await; + }; // Run the job until it's done or we get a command - while job_should_run && !self.state.steps.is_empty() { - // Check for commands every iteration - if let Ok(command) = command_rx.try_recv() { - match command { - WorkerCommand::Shutdown => { - return Err(JobError::Paused(rmp_serde::to_vec_named(&self.state)?)); - } - WorkerCommand::Cancel => { - return Err(JobError::Canceled(rmp_serde::to_vec_named(&self.state)?)); + let data = if let Some(working_data) = working_data { + let working_data_arc = Arc::new(working_data); + + // Job run phase + while job_should_run && !steps.is_empty() { + let steps_len = steps.len(); + + let run_metadata_arc = Arc::new(run_metadata); + let step_arc = + Arc::new(steps.pop_front().expect("just checked that we have steps")); + + // Need these bunch of Arcs to be able to move them into the async block of tokio::spawn + let inner_ctx = Arc::clone(&ctx); + let inner_init = Arc::clone(&init_arc); + let inner_run_metadata = Arc::clone(&run_metadata_arc); + let inner_working_data = Arc::clone(&working_data_arc); + let inner_step = Arc::clone(&step_arc); + let inner_stateful_job = Arc::clone(&stateful_job); + + let step_time = Instant::now(); + + let mut 
job_step_handle = tokio::spawn(async move { + inner_stateful_job + .execute_step( + &inner_ctx, + &inner_init, + CurrentStep { + step: &inner_step, + step_number, + total_steps: steps_len, + }, + &inner_working_data, + &inner_run_metadata, + ) + .await + }); + + loop { + select! { + // Here we have a channel that we use to receive commands from the worker + Some(command) = commands_rx.recv() => { + match command { + WorkerCommand::Pause(when) => { + debug!( + "Pausing Job took {:?}", + when.elapsed() + ); + + // In case of a Pause command, we keep waiting for the next command + let paused_time = Instant::now(); + while let Some(command) = commands_rx.recv().await { + match command { + WorkerCommand::Resume(when) => { + debug!( + "Resuming Job took {:?}", + when.elapsed(), + ); + debug!( + "Total paused time {:?} Job ", + paused_time.elapsed(), + ); + break; + } + // The job can also be shutdown or canceled while paused + WorkerCommand::Shutdown(when, signal_tx) => { + job_step_handle.abort(); + + debug!( + "Shuting down Job took {:?} \ + after running for {:?}", + when.elapsed(), + job_time.elapsed(), + ); + debug!( + "Total paused time {:?} Job ", + paused_time.elapsed(), + ); + + // Taking back the last step, so it can run to completion later + steps.push_front( + Arc::try_unwrap(step_arc) + .expect("step already ran, no more refs"), + ); + + return Err( + JobError::Paused( + rmp_serde::to_vec_named( + &JobState:: { + init: Arc::try_unwrap(init_arc) + .expect("handle abort already ran, no more refs"), + data: Some( + Arc::try_unwrap(working_data_arc) + .expect("handle abort already ran, no more refs"), + ), + steps, + step_number, + run_metadata: Arc::try_unwrap(run_metadata_arc) + .expect("handle abort already ran, no more refs"), + } + )?, + signal_tx + ) + ); + } + WorkerCommand::Cancel(when, signal_tx) => { + job_step_handle.abort(); + debug!( + "Canceling Job \ + took {:?} after running for {:?}", + when.elapsed(), + job_time.elapsed(), + ); + debug!( + 
"Total paused time {:?} Job ", + paused_time.elapsed(), + ); + return Err(JobError::Canceled(signal_tx)); + } + WorkerCommand::Pause(_) => { + // We continue paused lol + } + } + } + } + WorkerCommand::Resume(_) => { + // We're already running so we just ignore this command + } + + WorkerCommand::Shutdown(when, signal_tx) => { + job_step_handle.abort(); + + debug!( + "Shuting down Job took {:?} \ + after running for {:?}", + when.elapsed(), + job_time.elapsed(), + ); + + // Taking back the last step, so it can run to completion later + steps.push_front( + Arc::try_unwrap(step_arc) + .expect("handle abort already ran, no more refs"), + ); + + return Err( + JobError::Paused( + rmp_serde::to_vec_named( + &JobState:: { + init: Arc::try_unwrap(init_arc) + .expect("handle abort already ran, no more refs"), + data: Some( + Arc::try_unwrap(working_data_arc) + .expect("handle abort already ran, no more refs"), + ), + steps, + step_number, + run_metadata: Arc::try_unwrap(run_metadata_arc) + .expect("step already ran, no more refs"), + } + )?, + signal_tx + ) + ); + } + WorkerCommand::Cancel(when, signal_tx) => { + job_step_handle.abort(); + debug!( + "Canceling Job took {:?} \ + after running for {:?}", + when.elapsed(), + job_time.elapsed(), + ); + return Err(JobError::Canceled(signal_tx)); + } + } + } + + // Here we actually run the job, step by step + step_result = &mut job_step_handle => { + debug!( + "Step finished in {:?} Job ", + step_time.elapsed(), + ); + + run_metadata = Arc::try_unwrap(run_metadata_arc) + .expect("step already ran, no more refs"); + + match step_result? 
{ + Ok(JobStepOutput { + maybe_more_steps, + maybe_more_metadata, + errors: JobRunErrors(new_errors) + }) => { + if let Some(more_steps) = maybe_more_steps { + steps.extend(more_steps); + } + + if let Some(more_metadata) = maybe_more_metadata { + run_metadata.update(more_metadata); + } + + if !new_errors.is_empty() { + warn!("Job had a step with errors"); + errors.extend(new_errors); + } + } + Err(e) if matches!(e, JobError::EarlyFinish { .. }) => { + info!("{e}"); + break; + } + Err(e) => return Err(e), + } + // remove the step from the queue + step_number += 1; + + break; + } } } } - let mut state_preserved = false; - // Every X milliseconds, check the AtomicBool if we should pause or stay paused - while ctx.paused.load(Ordering::Relaxed) { - if !state_preserved { - // Save the state of the job - println!("Saving state {:?}", &self.report); - // ctx.preserve_state(rmp_serde::to_vec_named(&self.state)?); - } - state_preserved = true; - tokio::time::sleep(Duration::from_millis(500)).await; - } + debug!( + "Total job run time {:?} Job ", + job_time.elapsed() + ); - // process job step and handle errors if any - let step_result = self.stateful_job.execute_step(ctx, &mut self.state).await; - match step_result { - Err(JobError::EarlyFinish { .. 
}) => { - step_result - .map_err(|err| { - warn!("{}", err); - }) - .ok(); - break; - } - Err(JobError::StepCompletedWithErrors(errors_text)) => { - warn!("Job had a step with errors", self.id); - errors.extend(errors_text); - } - maybe_err => maybe_err?, - } - // remove the step from the queue - self.state.steps.pop_front(); - self.state.step_number += 1; - } + Some(Arc::try_unwrap(working_data_arc).expect("job already ran, no more refs")) + } else { + warn!("Tried to run a job without data Job "); + None + }; - let metadata = self.stateful_job.finalize(ctx, &mut self.state).await?; + let state = JobState:: { + init: Arc::try_unwrap(init_arc).expect("job already ran, no more refs"), + data, + steps, + step_number, + run_metadata, + }; + + let metadata = stateful_job.finalize(&ctx, &state).await?; let mut next_jobs = mem::take(&mut self.next_jobs); - if let Some(mut next_job) = next_jobs.pop_front() { - debug!( - "Job '{}' requested to spawn '{}' now that it's complete!", - self.name(), - next_job.name() - ); - next_job.set_next_jobs(next_jobs); + Ok(JobRunOutput { + metadata, + errors: errors.into(), + next_job: next_jobs.pop_front().map(|mut next_job| { + debug!( + "Job requesting to spawn '{}' now that it's complete!", + next_job.name() + ); + next_job.set_next_jobs(next_jobs); - if let Err(e) = job_manager.clone().ingest(&ctx.library, next_job).await { - error!("Failed to ingest next job: {e}"); - } - } - - Ok((metadata, errors)) + next_job + }), + }) } fn hash(&self) -> u64 { - ::hash(&self.state.init) + self.hash } fn set_next_jobs(&mut self, next_jobs: VecDeque>) { @@ -423,23 +916,3 @@ impl DynJob for Job { Ok(()) } } - -#[macro_export] -macro_rules! extract_job_data { - ($state:ident) => {{ - $state - .data - .as_ref() - .expect("critical error: missing data on job state") - }}; -} - -#[macro_export] -macro_rules! 
extract_job_data_mut { - ($state:ident) => {{ - $state - .data - .as_mut() - .expect("critical error: missing data on job state") - }}; -} diff --git a/core/src/job/report.rs b/core/src/job/report.rs index cf3626699..4a924ea81 100644 --- a/core/src/job/report.rs +++ b/core/src/job/report.rs @@ -1,18 +1,14 @@ use crate::{ library::Library, prisma::{job, node}, - util::{ - self, - db::{maybe_missing, MissingFieldError}, - }, + util::db::{chain_optional_iter, maybe_missing, MissingFieldError}, }; + +use std::fmt::{Display, Formatter}; + use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use specta::Type; -use std::{ - fmt::Debug, - fmt::{Display, Formatter}, -}; use tracing::error; use uuid::Uuid; @@ -25,7 +21,21 @@ pub enum JobReportUpdate { Message(String), } -job::select!(job_without_data { id name action status parent_id errors_text metadata date_created date_started date_completed task_count completed_task_count date_estimated_completion }); +job::select!(job_without_data { + id + name + action + status + parent_id + errors_text + metadata + date_created + date_started + date_completed + task_count + completed_task_count + date_estimated_completion +}); #[derive(Debug, Serialize, Deserialize, Type, Clone)] pub struct JobReport { @@ -90,11 +100,8 @@ impl TryFrom for JobReport { .map(|id| Uuid::from_slice(&id).expect("corrupted database")), status: JobStatus::try_from(maybe_missing(data.status, "job.status")?) 
.expect("corrupted database"), - task_count: maybe_missing(data.task_count, "job.task_count")?, - completed_task_count: maybe_missing( - data.completed_task_count, - "job.completed_task_count", - )?, + task_count: data.task_count.unwrap_or(0), + completed_task_count: data.completed_task_count.unwrap_or(0), message: String::new(), estimated_completion: data .date_estimated_completion @@ -134,11 +141,9 @@ impl TryFrom for JobReport { .map(|id| Uuid::from_slice(&id).expect("corrupted database")), status: JobStatus::try_from(maybe_missing(data.status, "job.status")?) .expect("corrupted database"), - task_count: maybe_missing(data.task_count, "job.task_count")?, - completed_task_count: maybe_missing( - data.completed_task_count, - "job.completed_task_count", - )?, + task_count: data.task_count.unwrap_or(0), + completed_task_count: data.completed_task_count.unwrap_or(0), + message: String::new(), estimated_completion: data .date_estimated_completion @@ -189,16 +194,21 @@ impl JobReport { pub fn get_meta(&self) -> (String, Option) { // actions are formatted like "added_location" or "added_location-1" - let action_name = match self.action { - Some(ref action) => action.split('-').next().unwrap_or("").to_string(), - None => return (self.id.to_string(), None), + let Some(action_name) = self.action + .as_ref() + .map( + |action| action.split('-') + .next() + .map(str::to_string) + .unwrap_or_default() + ) else { + return (self.id.to_string(), None); }; // create a unique group_key, EG: "added_location-" - let group_key = if let Some(parent_id) = &self.parent_id { - format!("{}-{}", action_name, parent_id) - } else { - format!("{}-{}", action_name, &self.id) - }; + let group_key = self.parent_id.map_or_else( + || format!("{}-{}", action_name, &self.id), + |parent_id| format!("{}-{}", action_name, parent_id), + ); (action_name, Some(group_key)) } @@ -206,14 +216,12 @@ impl JobReport { pub async fn create(&mut self, library: &Library) -> Result<(), JobError> { let now = 
Utc::now(); - self.created_at = Some(now); - library .db .job() .create( self.id.as_bytes().to_vec(), - util::db::chain_optional_iter( + chain_optional_iter( [ job::node::connect(node::id::equals(library.node_local_id)), job::name::set(Some(self.name.clone())), @@ -232,6 +240,10 @@ impl JobReport { ) .exec() .await?; + + // Only setting created_at after we successfully created the job in DB + self.created_at = Some(now); + Ok(()) } @@ -272,6 +284,17 @@ pub enum JobStatus { CompletedWithErrors = 6, } +impl JobStatus { + pub fn is_finished(self) -> bool { + matches!( + self, + Self::Completed + | Self::Canceled | Self::Paused + | Self::Failed | Self::CompletedWithErrors + ) + } +} + impl TryFrom for JobStatus { type Error = JobError; diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index ffc6a6128..d69254ddb 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -1,20 +1,29 @@ -use super::JobReport; -use crate::api::CoreEvent; -use crate::invalidate_query; -use crate::job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus}; -use crate::library::Library; +use crate::{api::CoreEvent, invalidate_query, library::Library}; + +use std::{ + fmt, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + use chrono::{DateTime, Utc}; use serde::Serialize; use specta::Type; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Mutex, +use tokio::{ + select, + sync::{mpsc, oneshot, watch}, + time::Instant, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; +use super::{ + DynJob, JobError, JobManager, JobReport, JobReportUpdate, JobRunErrors, JobRunOutput, JobStatus, +}; + #[derive(Debug, Clone, Serialize, Type)] pub struct JobProgressEvent { pub id: Uuid, @@ -28,21 +37,35 @@ pub struct JobProgressEvent { #[derive(Debug)] pub enum WorkerEvent { Progressed(Vec), - 
Paused(Option>), + Stop, } // used to send commands to the worker thread from the manager #[derive(Debug)] pub enum WorkerCommand { - Cancel, - Shutdown, + Pause(Instant), + Resume(Instant), + Cancel(Instant, oneshot::Sender<()>), + Shutdown(Instant, oneshot::Sender<()>), } pub struct WorkerContext { pub library: Library, - events_tx: UnboundedSender, - pub command_rx: Arc>>, - pub paused: Arc, + events_tx: mpsc::UnboundedSender, +} + +impl fmt::Debug for WorkerContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkerContext").finish() + } +} + +impl Drop for WorkerContext { + fn drop(&mut self) { + self.events_tx + .send(WorkerEvent::Stop) + .expect("critical error: failed to send worker stop event"); + } } impl WorkerContext { @@ -51,371 +74,425 @@ impl WorkerContext { .send(WorkerEvent::Progressed(updates)) .expect("critical error: failed to send worker worker progress event updates"); } - pub fn preserve_state(&self, state: Vec) { - self.events_tx - .send(WorkerEvent::Paused(Some(state))) - .expect("critical error: failed to send worker worker progress event updates"); - } } // a worker is a dedicated thread that runs a single job // once the job is complete the worker will exit pub struct Worker { - job: Option>, - report: JobReport, - events_tx: UnboundedSender, - events_rx: Option>, - command_tx: Option>, - // external_event_tx: UnboundedSender, - start_time: Option>, - paused: Arc, + commands_tx: mpsc::Sender, + report_watch_tx: Arc>, + report_watch_rx: watch::Receiver, + paused: AtomicBool, } impl Worker { - pub fn new( - job: Box, - report: JobReport, - // external_event_tx: UnboundedSender, - ) -> Self { - let (events_tx, events_rx) = unbounded_channel(); + pub async fn new( + id: Uuid, + mut job: Box, + mut report: JobReport, + library: Library, + job_manager: Arc, + ) -> Result { + let (commands_tx, commands_rx) = mpsc::channel(8); - Self { - job: Some(job), - report, - events_tx, - events_rx: Some(events_rx), - 
command_tx: None, - // external_event_tx, - start_time: None, - paused: Arc::new(AtomicBool::new(false)), + let job_hash = job.hash(); + + let start_time = Utc::now(); + + report.status = JobStatus::Running; + if report.started_at.is_none() { + report.started_at = Some(start_time); + } + + // If the report doesn't have a created_at date, it's a new report + if report.created_at.is_none() { + report.create(&library).await?; + } else { + // Otherwise it can be a job being resumed or a children job that was already been created + report.update(&library).await?; + } + + job.register_children(&library).await?; + + invalidate_queries(&library); + + let (report_watch_tx, report_watch_rx) = watch::channel(report.clone()); + let report_watch_tx = Arc::new(report_watch_tx); + + // spawn task to handle running the job + tokio::spawn(Self::do_work( + id, + JobWorkTable { + job, + manager: job_manager, + hash: job_hash, + report, + }, + Arc::clone(&report_watch_tx), + start_time, + commands_rx, + library, + )); + + Ok(Self { + commands_tx, + report_watch_tx, + report_watch_rx, + paused: AtomicBool::new(false), + }) + } + + pub async fn pause(&self) { + if self.report_watch_rx.borrow().status == JobStatus::Running { + self.paused.store(true, Ordering::Relaxed); + if self + .commands_tx + .send(WorkerCommand::Pause(Instant::now())) + .await + .is_ok() + { + self.report_watch_tx + .send_modify(|report| report.status = JobStatus::Paused); + } } } - pub fn pause(&self) { - self.paused.store(true, Ordering::Relaxed); + pub async fn resume(&self) { + if self.report_watch_rx.borrow().status == JobStatus::Paused { + self.paused.store(false, Ordering::Relaxed); + if self + .commands_tx + .send(WorkerCommand::Resume(Instant::now())) + .await + .is_ok() + { + self.report_watch_tx + .send_modify(|report| report.status = JobStatus::Running); + } + } } - pub fn resume(&self) { - self.paused.store(false, Ordering::Relaxed); + pub async fn cancel(&self) { + if 
self.report_watch_rx.borrow().status != JobStatus::Canceled { + let (tx, rx) = oneshot::channel(); + if self + .commands_tx + .send(WorkerCommand::Cancel(Instant::now(), tx)) + .await + .is_ok() + { + self.report_watch_tx + .send_modify(|report| report.status = JobStatus::Canceled); + rx.await.ok(); + } + } + } + + pub async fn shutdown(&self) { + let (tx, rx) = oneshot::channel(); + if self + .commands_tx + .send(WorkerCommand::Shutdown(Instant::now(), tx)) + .await + .is_ok() + { + rx.await.ok(); + } } pub fn report(&self) -> JobReport { - self.report.clone() + self.report_watch_rx.borrow().clone() } pub fn is_paused(&self) -> bool { self.paused.load(Ordering::Relaxed) } - // spawns a thread and extracts channel sender to communicate with it - pub async fn spawn( - job_manager: Arc, - worker_mutex: Arc>, - library: Library, - ) -> Result<(), JobError> { - let mut worker = worker_mutex.lock().await; - // we capture the worker receiver channel so state can be updated from inside the worker - let events_tx = worker.events_tx.clone(); - let events_rx = worker - .events_rx - .take() - .expect("critical error: missing worker events rx"); + fn track_progress( + report: &mut JobReport, + last_report_watch_update: &mut Instant, + report_watch_tx: &watch::Sender, + start_time: DateTime, + updates: Vec, + library: &Library, + ) { + // protect against updates if job is not running + if report.status != JobStatus::Running { + return; + }; - // create command channel to send commands to the worker - let (command_tx, command_rx) = unbounded_channel(); - let command_rx = Arc::new(Mutex::new(command_rx)); - worker.command_tx = Some(command_tx); - - let mut job = worker - .job - .take() - .expect("critical error: missing job on worker"); - - let job_hash = job.hash(); - let job_id = worker.report.id; - - worker.report.status = JobStatus::Running; - if worker.report.started_at.is_none() { - worker.report.started_at = Some(Utc::now()); - } - - worker.start_time = Some(Utc::now()); - 
- // If the report doesn't have a created_at date, it's a new report - if worker.report.created_at.is_none() { - worker.report.create(&library).await?; - } else { - // Otherwise it can be a job being resumed or a children job that was already been created - worker.report.update(&library).await?; - } - - drop(worker); - - job.register_children(&library).await?; - - invalidate_queries(&library); - - // spawn task to handle receiving events from the worker - tokio::spawn(Worker::track_progress( - Arc::clone(&worker_mutex), - events_rx, - library.clone(), - )); - - let paused = Arc::clone(&worker_mutex.lock().await.paused); - - let worker = Arc::clone(&worker_mutex); - - // spawn task to handle running the job - tokio::spawn(async move { - let mut worker_ctx = WorkerContext { - library: library.clone(), - events_tx, - command_rx, - paused, - }; - - // This oneshot is used to signal job completion, whether successful, failed, or paused, - // back to the task that's monitoring job execution. - // let (done_tx, done_rx) = oneshot::channel::<()>(); - - // Run the job and handle the result - match job.run(job_manager.clone(), &mut worker_ctx).await { - // -> Job completed successfully - Ok((metadata, errors)) if errors.is_empty() => { - // worker_ctx - // .events_tx - // .send(WorkerEvent::Completed(done_tx, metadata)) - // .expect("critical error: failed to send worker complete event"); - - let mut worker = worker.lock().await; - - worker.report.status = JobStatus::Completed; - worker.report.data = None; - worker.report.metadata = metadata; - worker.report.completed_at = Some(Utc::now()); - if let Err(e) = worker.report.update(&library).await { - error!("failed to update job report: {:#?}", e); - } - - invalidate_queries(&library); - info!("{}", worker.report); + for update in updates { + match update { + JobReportUpdate::TaskCount(task_count) => { + report.task_count = task_count as i32; } - // -> Job completed with errors - Ok((metadata, errors)) => { - warn!("Job 
completed with errors"); - // worker_ctx - // .events_tx - // .send(WorkerEvent::CompletedWithErrors(done_tx, metadata, errors)) - // .expect("critical error: failed to send worker complete event"); - - let mut worker = worker.lock().await; - - worker.report.status = JobStatus::CompletedWithErrors; - worker.report.errors_text = errors; - worker.report.data = None; - worker.report.metadata = metadata; - worker.report.completed_at = Some(Utc::now()); - if let Err(e) = worker.report.update(&library).await { - error!("failed to update job report: {:#?}", e); - } - - invalidate_queries(&library); - info!("{}", worker.report); + JobReportUpdate::CompletedTaskCount(completed_task_count) => { + report.completed_task_count = completed_task_count as i32; } - // -> Job paused - Err(JobError::Paused(state)) => { - info!("Job paused, we will pause all children jobs"); - // if let Err(e) = job.pause_children(&library).await { - // error!("Failed to pause children jobs: {e:#?}"); - // } - debug!("Setting worker status to paused"); - - let mut worker = worker.lock().await; - - worker.report.status = JobStatus::Paused; - worker.report.data = Some(state); - - if let Err(e) = worker.report.update(&library).await { - error!("failed to update job report: {:#?}", e); - } - - info!("{}", worker.report); - - invalidate_queries(&library); - } - // -> Job failed - Err(e) => { - error!("Job failed with error: {e:#?};"); - // if let Err(e) = job.cancel_children(&library).await { - // error!("Failed to cancel children jobs: {e:#?}"); - // } - - let mut worker = worker.lock().await; - - worker.report.status = JobStatus::Failed; - worker.report.data = None; - if let Err(e) = worker.report.update(&library).await { - error!("failed to update job report: {:#?}", e); - } - - invalidate_queries(&library); - warn!("{}", worker.report); + JobReportUpdate::Message(message) => { + trace!("job {} message: {}", report.id, message); + report.message = message; } } - - println!("Worker completed job: {:?}", 
job_hash); - - job_manager.complete(&library, job_id, job_hash).await; - }); - - Ok(()) - } - - // send command to worker from job manager - pub fn command(&self, command: WorkerCommand) -> Result<(), JobError> { - info!("Sending command to worker: {:#?}", command); - if let Some(tx) = &self.command_tx { - let tx = tx.clone(); - tx.send(command) - .map_err(|_| JobError::WorkerCommandSendFailed) - } else { - Err(JobError::WorkerCommandSendFailed) } + + // Calculate elapsed time + let elapsed = Utc::now() - start_time; + + // Calculate remaining time + let task_count = report.task_count as usize; + let completed_task_count = report.completed_task_count as usize; + let remaining_task_count = task_count.saturating_sub(completed_task_count); + let remaining_time_per_task = elapsed / (completed_task_count + 1) as i32; // Adding 1 to avoid division by zero + let remaining_time = remaining_time_per_task * remaining_task_count as i32; + + // Update the report with estimated remaining time + report.estimated_completion = Utc::now() + .checked_add_signed(remaining_time) + .unwrap_or(Utc::now()); + + // updated the report watcher + if last_report_watch_update.elapsed() > Duration::from_millis(500) { + report_watch_tx.send_modify(|old| { + old.task_count = report.task_count; + old.completed_task_count = report.completed_task_count; + old.estimated_completion = report.estimated_completion; + old.message = report.message.clone(); + }); + *last_report_watch_update = Instant::now(); + } + + // emit a CoreEvent + library.emit(CoreEvent::JobProgress(JobProgressEvent { + id: report.id, + task_count: report.task_count, + completed_task_count: report.completed_task_count, + estimated_completion: report.estimated_completion, + message: report.message.clone(), + })); } - async fn track_progress( - worker: Arc>, - mut events_rx: UnboundedReceiver, + async fn do_work( + worker_id: Uuid, + JobWorkTable { + mut job, + manager, + hash, + mut report, + }: JobWorkTable, + report_watch_tx: Arc>, 
+ start_time: DateTime, + commands_rx: mpsc::Receiver, library: Library, ) { - while let Some(event) = events_rx.recv().await { - let mut worker = worker.lock().await; + let (events_tx, mut events_rx) = mpsc::unbounded_channel(); - match event { - WorkerEvent::Progressed(updates) => { - // protect against updates if job is not running - if worker.report.status != JobStatus::Running { - continue; - }; - for update in updates { - match update { - JobReportUpdate::TaskCount(task_count) => { - worker.report.task_count = task_count as i32; - } - JobReportUpdate::CompletedTaskCount(completed_task_count) => { - worker.report.completed_task_count = completed_task_count as i32; - } + let mut job_future = job.run( + WorkerContext { + library: library.clone(), + events_tx, + }, + commands_rx, + ); - JobReportUpdate::Message(message) => { - worker.report.message = message; + let mut last_reporter_watch_update = Instant::now(); + invalidate_query!(library, "jobs.reports"); + + let mut events_ended = false; + let job_result = 'job: loop { + select! 
{ + job_result = &mut job_future => { + if !events_ended { + // There are still some progress events to be processed so we postpone the job result + while let Some(event) = events_rx.recv().await { + match event { + WorkerEvent::Progressed(updates) => { + Self::track_progress( + &mut report, + &mut last_reporter_watch_update, + &report_watch_tx, + start_time, + updates, + &library + ); + } + WorkerEvent::Stop => { + break 'job job_result; + }, } } + } else { + break 'job job_result; } - // Calculate elapsed time - if let Some(start_time) = worker.start_time { - let elapsed = Utc::now() - start_time; - - // Calculate remaining time - let task_count = worker.report.task_count as usize; - let completed_task_count = worker.report.completed_task_count as usize; - let remaining_task_count = task_count.saturating_sub(completed_task_count); - let remaining_time_per_task = elapsed / (completed_task_count + 1) as i32; // Adding 1 to avoid division by zero - let remaining_time = remaining_time_per_task * remaining_task_count as i32; - - // Update the report with estimated remaining time - worker.report.estimated_completion = Utc::now() - .checked_add_signed(remaining_time) - .unwrap_or(Utc::now()); - - let report = worker.report.clone(); - // emit a CoreEvent - library.emit(CoreEvent::JobProgress(JobProgressEvent { - id: report.id, - task_count: report.task_count, - completed_task_count: report.completed_task_count, - estimated_completion: report.estimated_completion, - message: report.message, - })); + }, + Some(event) = events_rx.recv() => { + match event { + WorkerEvent::Progressed(updates) => { + Self::track_progress( + &mut report, + &mut last_reporter_watch_update, + &report_watch_tx, + start_time, + updates, + &library + ) + } + WorkerEvent::Stop => {events_ended = true;}, } } - // WorkerEvent::Completed(done_tx, metadata) => { - // worker.report.status = JobStatus::Completed; - // worker.report.data = None; - // worker.report.metadata = metadata; - // 
worker.report.completed_at = Some(Utc::now()); - // if let Err(e) = worker.report.update(&library).await { - // error!("failed to update job report: {:#?}", e); - // } - - // invalidate_query!(library, "jobs.reports"); - - // info!("{}", worker.report); - - // done_tx - // .send(()) - // .expect("critical error: failed to send worker completion"); - - // break; - // } - // WorkerEvent::CompletedWithErrors(done_tx, metadata, errors) => { - // worker.report.status = JobStatus::CompletedWithErrors; - // worker.report.errors_text = errors; - // worker.report.data = None; - // worker.report.metadata = metadata; - // worker.report.completed_at = Some(Utc::now()); - // if let Err(e) = worker.report.update(&library).await { - // error!("failed to update job report: {:#?}", e); - // } - - // invalidate_query!(library, "jobs.reports"); - - // info!("{}", worker.report); - - // done_tx - // .send(()) - // .expect("critical error: failed to send worker completion"); - - // break; - // } - // WorkerEvent::Failed(done_tx) => { - // worker.report.status = JobStatus::Failed; - // worker.report.data = None; - // if let Err(e) = worker.report.update(&library).await { - // error!("failed to update job report: {:#?}", e); - // } - - // invalidate_query!(library, "library.list"); - // invalidate_query!(library, "jobs.reports"); - - // warn!("{}", worker.report); - - // done_tx - // .send(()) - // .expect("critical error: failed to send worker completion"); - - // break; - // } - WorkerEvent::Paused(state) => { - debug!("Setting worker status to paused"); - - worker.report.status = JobStatus::Paused; - worker.report.data = state; - - if let Err(e) = worker.report.update(&library).await { - error!("failed to update job report: {:#?}", e); - } - - info!("{}", worker.report); - - invalidate_query!(library, "jobs.reports"); - - break; - } } - } + }; + + // Need this drop here to sinalize to borrowchecker that we're done with our `&mut job` borrow for `run` method + drop(job_future); + + let 
next_job = Self::process_job_output(job, job_result, &mut report, &library).await; + + report_watch_tx.send(report.clone()).ok(); + + debug!( + "Worker completed Job", + report.id, report.name + ); + + manager.complete(&library, worker_id, hash, next_job).await; } + + async fn process_job_output( + mut job: Box, + job_result: Result, + report: &mut JobReport, + library: &Library, + ) -> Option> { + // Run the job and handle the result + match job_result { + // -> Job completed successfully + Ok(JobRunOutput { + metadata, + errors: JobRunErrors(errors), + next_job, + }) if errors.is_empty() => { + report.status = JobStatus::Completed; + report.data = None; + report.metadata = metadata; + report.completed_at = Some(Utc::now()); + if let Err(e) = report.update(library).await { + error!("failed to update job report: {:#?}", e); + } + + debug!("{report}"); + + invalidate_queries(library); + + return next_job; + } + // -> Job completed with errors + Ok(JobRunOutput { + metadata, + errors: JobRunErrors(errors), + next_job, + }) => { + warn!( + "Job completed with errors", + report.id, report.name + ); + report.status = JobStatus::CompletedWithErrors; + report.errors_text = errors; + report.data = None; + report.metadata = metadata; + report.completed_at = Some(Utc::now()); + if let Err(e) = report.update(library).await { + error!("failed to update job report: {:#?}", e); + } + + debug!("{report}"); + + invalidate_queries(library); + + return next_job; + } + // -> Job paused + Err(JobError::Paused(state, signal_tx)) => { + info!( + "Job paused, we will pause all children jobs", + report.id, report.name + ); + if let Err(e) = job.pause_children(library).await { + error!("Failed to pause children jobs: {e:#?}"); + } + + debug!("Setting worker status to paused"); + + report.status = JobStatus::Paused; + report.data = Some(state); + + if let Err(e) = report.update(library).await { + error!("failed to update job report: {:#?}", e); + } + + debug!("{report}"); + + 
invalidate_queries(library); + + signal_tx.send(()).ok(); + } + // -> Job paused + Err(JobError::Canceled(signal_tx)) => { + info!( + "Job canceled, we will cancel all children jobs", + report.id, report.name + ); + if let Err(e) = job.cancel_children(library).await { + error!("Failed to pause children jobs: {e:#?}"); + } + + debug!("Setting worker status to paused"); + + report.status = JobStatus::Canceled; + report.data = None; + + if let Err(e) = report.update(library).await { + error!("failed to update job report: {:#?}", e); + } + + debug!("{report}"); + + invalidate_queries(library); + + signal_tx.send(()).ok(); + } + // -> Job failed + Err(e) => { + error!( + "Job failed with error: {e:#?};", + report.id, report.name + ); + if let Err(e) = job.cancel_children(library).await { + error!("Failed to cancel children jobs: {e:#?}"); + } + + report.status = JobStatus::Failed; + report.data = None; + if let Err(e) = report.update(library).await { + error!("failed to update job report: {:#?}", e); + } + + warn!("{report}"); + + invalidate_queries(library); + } + } + + None + } +} + +struct JobWorkTable { + job: Box, + manager: Arc, + hash: u64, + report: JobReport, } fn invalidate_queries(library: &Library) { diff --git a/core/src/lib.rs b/core/src/lib.rs index eef0c3fc7..434b73160 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -15,6 +15,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; + use thiserror::Error; use tokio::{fs, sync::broadcast}; use tracing::{debug, error, info, warn}; @@ -39,7 +40,7 @@ pub(crate) mod volume; #[derive(Clone)] pub struct NodeContext { pub config: Arc, - pub jobs: Arc, + pub job_manager: Arc, pub location_manager: Arc, pub event_bus_tx: broadcast::Sender, } @@ -49,7 +50,7 @@ pub struct Node { config: Arc, pub library_manager: Arc, location_manager: Arc, - jobs: Arc, + job_manager: Arc, p2p: Arc, event_bus: (broadcast::Sender, broadcast::Receiver), // peer_request: tokio::sync::Mutex>, @@ -73,7 +74,8 @@ impl Node { 
.map_err(NodeError::FailedToInitializeConfig)?; debug!("Initialised 'NodeConfigManager'..."); - let jobs = JobManager::new(); + let job_manager = JobManager::new(); + debug!("Initialised 'JobManager'..."); let location_manager = LocationManager::new(); @@ -82,7 +84,7 @@ impl Node { data_dir.join("libraries"), NodeContext { config: config.clone(), - jobs: jobs.clone(), + job_manager: job_manager.clone(), location_manager: location_manager.clone(), // p2p: p2p.clone(), event_bus_tx: event_bus.0.clone(), @@ -106,7 +108,7 @@ impl Node { config, library_manager, location_manager, - jobs, + job_manager, p2p, event_bus, // peer_request: tokio::sync::Mutex::new(None), @@ -190,7 +192,7 @@ impl Node { pub async fn shutdown(&self) { info!("Spacedrive shutting down..."); - self.jobs.clone().shutdown().await; + self.job_manager.shutdown().await; self.p2p.shutdown().await; info!("Spacedrive Core shutdown successful!"); } diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 01c190c1c..1b839faef 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -73,7 +73,7 @@ impl Library { Init: JobInitData + 'static, { self.node_context - .jobs + .job_manager .clone() .ingest(self, jobable.into_job()) .await diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index 57702c8de..44d300c84 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -499,7 +499,7 @@ impl LibraryManager { if let Err(e) = library .node_context - .jobs + .job_manager .clone() .cold_resume(&library) .await diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 52b4fec77..79b79c6b8 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -1,27 +1,37 @@ use crate::{ - extract_job_data_mut, file_paths_db_fetcher_fn, - job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext}, - location::file_path_helper::{ - 
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - IsolatedFilePathData, + file_paths_db_fetcher_fn, invalidate_query, + job::{ + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, + JobRunMetadata, JobState, JobStepOutput, StatefulJob, WorkerContext, + }, + location::{ + file_path_helper::{ + ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, + IsolatedFilePathData, + }, + location_with_indexer_rules, }, to_remove_db_fetcher_fn, util::db::maybe_missing, }; -use std::{path::Path, sync::Arc}; +use std::{ + hash::{Hash, Hasher}, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use itertools::Itertools; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use tracing::info; use super::{ - execute_indexer_save_step, finalize_indexer, iso_file_path_factory, - remove_non_existing_file_paths, + execute_indexer_save_step, iso_file_path_factory, remove_non_existing_file_paths, rules::IndexerRule, - update_notifier_fn, walk::{keep_walking, walk, ToWalkEntry, WalkResult}, - IndexerError, IndexerJobData, IndexerJobInit, IndexerJobSaveStep, ScanProgress, + IndexerError, IndexerJobSaveStep, }; /// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database. @@ -32,6 +42,75 @@ const BATCH_SIZE: usize = 1000; /// batches of [`BATCH_SIZE`]. Then for each chunk it write the file metadata to the database. pub struct IndexerJob; +/// `IndexerJobInit` receives a `location::Data` object to be indexed +/// and possibly a `sub_path` to be indexed. The `sub_path` is used when +/// we want to index just a part of a location.
+#[derive(Serialize, Deserialize, Debug)] +pub struct IndexerJobInit { + pub location: location_with_indexer_rules::Data, + pub sub_path: Option, +} + +impl Hash for IndexerJobInit { + fn hash(&self, state: &mut H) { + self.location.id.hash(state); + if let Some(ref sub_path) = self.sub_path { + sub_path.hash(state); + } + } +} +/// `IndexerJobData` contains the state of the indexer job: the `indexed_path` where the walk +/// starts and the `indexer_rules` applied while walking. Timing and counter metadata for logging +/// purposes lives in `IndexerJobRunMetadata`. +#[derive(Serialize, Deserialize, Debug)] +pub struct IndexerJobData { + indexed_path: PathBuf, + indexer_rules: Vec, +} + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct IndexerJobRunMetadata { + db_write_time: Duration, + scan_read_time: Duration, + total_paths: u64, + total_save_steps: u64, + indexed_count: u64, + removed_count: u64, +} + +impl JobRunMetadata for IndexerJobRunMetadata { + fn update(&mut self, new_data: Self) { + self.db_write_time += new_data.db_write_time; + self.scan_read_time += new_data.scan_read_time; + self.total_paths += new_data.total_paths; + self.total_save_steps += new_data.total_save_steps; + self.indexed_count += new_data.indexed_count; + self.removed_count += new_data.removed_count; + } +} + +#[derive(Clone)] +pub enum ScanProgress { + ChunkCount(usize), + SavedChunks(usize), + Message(String), +} + +impl IndexerJobData { + fn on_scan_progress(ctx: &WorkerContext, progress: Vec) { + ctx.progress( + progress + .into_iter() + .map(|p| match p { + ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c), + ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p), + ScanProgress::Message(m) => JobReportUpdate::Message(m), + }) + .collect(), + ) + } +} + impl JobInitData for IndexerJobInit { type Job = IndexerJob; } @@ -49,6 +128,7 @@ impl StatefulJob for IndexerJob { type Init = IndexerJobInit; type Data = IndexerJobData; type Step =
IndexerJobStepInput; + type RunMetadata = IndexerJobRunMetadata; const NAME: &'static str = "indexer"; @@ -59,17 +139,16 @@ impl StatefulJob for IndexerJob { /// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`. async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let location_id = state.init.location.id; - let location_path = - maybe_missing(&state.init.location.path, "location.path").map(Path::new)?; + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { + let location_id = init.location.id; + let location_path = maybe_missing(&init.location.path, "location.path").map(Path::new)?; let db = Arc::clone(&ctx.library.db); - let indexer_rules = state - .init + let indexer_rules = init .location .indexer_rules .iter() @@ -77,7 +156,7 @@ impl StatefulJob for IndexerJob { .collect::, _>>() .map_err(IndexerError::from)?; - let to_walk_path = match &state.init.sub_path { + let to_walk_path = match &init.sub_path { Some(sub_path) if sub_path != Path::new("") => { let full_path = ensure_sub_path_is_in_location(location_path, sub_path) .await @@ -110,7 +189,7 @@ impl StatefulJob for IndexerJob { walk( &to_walk_path, &indexer_rules, - update_notifier_fn(BATCH_SIZE, ctx), + update_notifier_fn(ctx), file_paths_db_fetcher_fn!(&db), to_remove_db_fetcher_fn!(location_id, location_path, &db), iso_file_path_factory(location_id, location_path), @@ -128,86 +207,95 @@ impl StatefulJob for IndexerJob { let total_paths = &mut 0; let to_walk_count = to_walk.len(); - state.steps.extend( - walked - .chunks(BATCH_SIZE) - .into_iter() - .enumerate() - .map(|(i, chunk)| { - let chunk_steps = chunk.collect::>(); + let steps = walked + .chunks(BATCH_SIZE) + .into_iter() + .enumerate() + .map(|(i, chunk)| { + let chunk_steps = chunk.collect::>(); - *total_paths += chunk_steps.len() as u64; + *total_paths += chunk_steps.len() as u64; - 
IndexerJobStepInput::Save(IndexerJobSaveStep { - chunk_idx: i, - walked: chunk_steps, - }) + IndexerJobStepInput::Save(IndexerJobSaveStep { + chunk_idx: i, + walked: chunk_steps, }) - .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)), - ); + }) + .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)) + .collect::>(); IndexerJobData::on_scan_progress( ctx, - vec![ScanProgress::Message(format!( - "Starting saving {total_paths} files or directories, \ + vec![ + ScanProgress::ChunkCount(steps.len() - to_walk_count), + ScanProgress::Message(format!( + "Starting saving {total_paths} files or directories, \ there still {to_walk_count} directories to index", - ))], + )), + ], ); - state.data = Some(IndexerJobData { + *data = Some(IndexerJobData { indexed_path: to_walk_path, indexer_rules, - db_write_time: db_delete_time, - scan_read_time, - total_paths: *total_paths, - indexed_count: 0, - removed_count, - total_save_steps: state.steps.len() as u64 - to_walk_count as u64, }); - if !errors.is_empty() { - Err(JobError::StepCompletedWithErrors( - errors.into_iter().map(|e| format!("{e}")).collect(), - )) - } else { - Ok(()) - } + Ok(( + IndexerJobRunMetadata { + db_write_time: db_delete_time, + scan_read_time, + total_paths: *total_paths, + indexed_count: 0, + removed_count, + total_save_steps: steps.len() as u64 - to_walk_count as u64, + }, + steps, + errors + .into_iter() + .map(|e| format!("{e}")) + .collect::>() + .into(), + ) + .into()) } /// Process each chunk of entries in the indexer job, writing to the `file_path` table async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let data = extract_job_data_mut!(state); - - match &state.steps[0] { + ctx: &WorkerContext, + init: &Self::Init, + CurrentStep { step, .. 
}: CurrentStep<'_, Self::Step>, + data: &Self::Data, + run_metadata: &Self::RunMetadata, + ) -> Result, JobError> { + let mut new_metadata = Self::RunMetadata::default(); + match step { IndexerJobStepInput::Save(step) => { let start_time = Instant::now(); IndexerJobData::on_scan_progress( ctx, vec![ - ScanProgress::SavedChunks(step.chunk_idx), + ScanProgress::SavedChunks(step.chunk_idx + 1), ScanProgress::Message(format!( "Writing chunk {} of {} to database", - step.chunk_idx, data.total_save_steps + step.chunk_idx, run_metadata.total_save_steps )), ], ); let count = - execute_indexer_save_step(&state.init.location, step, &ctx.library.clone()) - .await?; + execute_indexer_save_step(&init.location, step, &ctx.library.clone()).await?; - data.indexed_count += count as u64; - data.db_write_time += start_time.elapsed(); + new_metadata.indexed_count = count as u64; + new_metadata.db_write_time = start_time.elapsed(); + + Ok(new_metadata.into()) } IndexerJobStepInput::Walk(to_walk_entry) => { - let location_id = state.init.location.id; + let location_id = init.location.id; let location_path = - maybe_missing(&state.init.location.path, "location.path").map(Path::new)?; + maybe_missing(&init.location.path, "location.path").map(Path::new)?; let db = Arc::clone(&ctx.library.db); @@ -222,7 +310,7 @@ impl StatefulJob for IndexerJob { keep_walking( to_walk_entry, &data.indexer_rules, - update_notifier_fn(BATCH_SIZE, ctx), + update_notifier_fn(ctx), file_paths_db_fetcher_fn!(&db), to_remove_db_fetcher_fn!(location_id, location_path, &db), iso_file_path_factory(location_id, location_path), @@ -230,57 +318,83 @@ impl StatefulJob for IndexerJob { .await? 
}; - data.scan_read_time += scan_start.elapsed(); + new_metadata.scan_read_time = scan_start.elapsed(); let db_delete_time = Instant::now(); // TODO pass these uuids to sync system - data.removed_count += remove_non_existing_file_paths(to_remove, &db).await?; - data.db_write_time += db_delete_time.elapsed(); + new_metadata.removed_count = remove_non_existing_file_paths(to_remove, &db).await?; + new_metadata.db_write_time = db_delete_time.elapsed(); - let _old_total = data.total_paths; - let _old_steps_count = state.steps.len() as u64; + let to_walk_count = to_walk.len(); - state.steps.extend( - walked - .chunks(BATCH_SIZE) - .into_iter() - .enumerate() - .map(|(i, chunk)| { - let chunk_steps = chunk.collect::>(); - data.total_paths += chunk_steps.len() as u64; + let more_steps = walked + .chunks(BATCH_SIZE) + .into_iter() + .enumerate() + .map(|(i, chunk)| { + let chunk_steps = chunk.collect::>(); + new_metadata.total_paths += chunk_steps.len() as u64; - IndexerJobStepInput::Save(IndexerJobSaveStep { - chunk_idx: i, - walked: chunk_steps, - }) + IndexerJobStepInput::Save(IndexerJobSaveStep { + chunk_idx: i, + walked: chunk_steps, }) - .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)), + }) + .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)) + .collect::>(); + + IndexerJobData::on_scan_progress( + ctx, + vec![ + ScanProgress::ChunkCount(more_steps.len() - to_walk_count), + ScanProgress::Message(format!( + "Scanned more {} files or directories; {} more directories to scan", + new_metadata.total_paths, to_walk_count + )), + ], ); - // IndexerJobData::on_scan_progress( - // &mut ctx, - // vec![ScanProgress::Message(format!( - // "Scanned more {} files or directories; {} more directories to scan", - // data.total_paths - old_total, - // state.steps.len() as u64 - old_steps_count - data.total_paths - // ))], - // ); - - if !errors.is_empty() { - return Err(JobError::StepCompletedWithErrors( - errors.into_iter().map(|e| format!("{e}")).collect(), - )); - 
} + Ok(( + more_steps, + new_metadata, + errors + .into_iter() + .map(|e| format!("{e}")) + .collect::>() + .into(), + ) + .into()) } } - - Ok(()) } - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult { - let location_path = - maybe_missing(&state.init.location.path, "location.path").map(Path::new)?; + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult { + info!( + "scan of {} completed in {:?}. {} new files found, \ + indexed {} files in db. db write completed in {:?}", + maybe_missing(&state.init.location.path, "location.path")?, + state.run_metadata.scan_read_time, + state.run_metadata.total_paths, + state.run_metadata.indexed_count, + state.run_metadata.db_write_time, + ); - finalize_indexer(location_path, state, ctx) + if state.run_metadata.indexed_count > 0 || state.run_metadata.removed_count > 0 { + invalidate_query!(ctx.library, "search.paths"); + } + + Ok(Some(serde_json::to_value(state)?)) + } +} + +fn update_notifier_fn(ctx: &WorkerContext) -> impl FnMut(&Path, usize) + '_ { + move |path, total_entries| { + IndexerJobData::on_scan_progress( + ctx, + vec![ScanProgress::Message(format!( + "Scanning: {:?}; Found: {total_entries} entries", + path.file_name().unwrap_or(path.as_os_str()) + ))], + ); } } diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 3e49d4113..e8b382f11 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -1,22 +1,16 @@ use crate::{ - extract_job_data, invalidate_query, - job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, library::Library, prisma::{file_path, location, PrismaClient}, sync, util::{db::uuid_to_bytes, error::FileIOError}, }; -use std::{ - hash::{Hash, Hasher}, - path::{Path, PathBuf}, - time::Duration, -}; +use std::path::Path; use chrono::Utc; use rspc::ErrorCode; use sd_prisma::prisma_sync; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use 
serde::{Deserialize, Serialize}; use serde_json::json; use thiserror::Error; use tracing::info; @@ -34,71 +28,15 @@ mod walk; use rules::IndexerRuleError; use walk::WalkedEntry; +pub use indexer_job::IndexerJobInit; pub use shallow::*; -/// `IndexerJobInit` receives a `location::Data` object to be indexed -/// and possibly a `sub_path` to be indexed. The `sub_path` is used when -/// we want do index just a part of a location. -#[derive(Serialize, Deserialize)] -pub struct IndexerJobInit { - pub location: location_with_indexer_rules::Data, - pub sub_path: Option, -} - -impl Hash for IndexerJobInit { - fn hash(&self, state: &mut H) { - self.location.id.hash(state); - if let Some(ref sub_path) = self.sub_path { - sub_path.hash(state); - } - } -} -/// `IndexerJobData` contains the state of the indexer job, which includes a `location_path` that -/// is cached and casted on `PathBuf` from `local_path` column in the `location` table. It also -/// contains some metadata for logging purposes. -#[derive(Serialize, Deserialize)] -pub struct IndexerJobData { - indexed_path: PathBuf, - indexer_rules: Vec, - db_write_time: Duration, - scan_read_time: Duration, - total_paths: u64, - total_save_steps: u64, - indexed_count: u64, - removed_count: u64, -} - -impl IndexerJobData { - fn on_scan_progress(ctx: &mut WorkerContext, progress: Vec) { - ctx.progress( - progress - .iter() - .map(|p| match p.clone() { - ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c), - ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p), - ScanProgress::Message(m) => { - // println!("MESSAGE: {:?}", m); - JobReportUpdate::Message(m) - } - }) - .collect(), - ) - } -} - #[derive(Serialize, Deserialize, Debug)] pub struct IndexerJobSaveStep { chunk_idx: usize, walked: Vec, } -#[derive(Clone)] -pub enum ScanProgress { - ChunkCount(usize), - SavedChunks(usize), - Message(String), -} - /// Error type for the indexer module #[derive(Error, Debug)] pub enum IndexerError { @@ -238,50 
+176,6 @@ async fn execute_indexer_save_step( Ok(count) } -fn finalize_indexer( - location_path: impl AsRef, - state: &JobState, - ctx: &mut WorkerContext, -) -> JobResult -where - SJob: StatefulJob, - Init: Serialize + DeserializeOwned + Send + Sync + Hash, - Step: Serialize + DeserializeOwned + Send + Sync, -{ - let data = extract_job_data!(state); - - info!( - "scan of {} completed in {:?}. {} new files found, \ - indexed {} files in db. db write completed in {:?}", - location_path.as_ref().display(), - data.scan_read_time, - data.total_paths, - data.indexed_count, - data.db_write_time, - ); - - if data.indexed_count > 0 || data.removed_count > 0 { - invalidate_query!(ctx.library, "search.paths"); - } - - Ok(Some(serde_json::to_value(state)?)) -} - -fn update_notifier_fn(batch_size: usize, ctx: &mut WorkerContext) -> impl FnMut(&Path, usize) + '_ { - move |path, total_entries| { - IndexerJobData::on_scan_progress( - ctx, - vec![ - ScanProgress::Message(format!( - "Scanning: {:?}", - path.file_name().unwrap_or(path.as_os_str()) - )), - ScanProgress::ChunkCount(total_entries / batch_size), - ], - ); - } -} - fn iso_file_path_factory( location_id: location::id::Type, location_path: &Path, diff --git a/core/src/location/manager/mod.rs b/core/src/location/manager/mod.rs index db50ca649..2ace800e7 100644 --- a/core/src/location/manager/mod.rs +++ b/core/src/location/manager/mod.rs @@ -189,7 +189,7 @@ impl LocationManager { }) .await?; - return rx.await?; + rx.await? 
} #[cfg(not(feature = "location-watcher"))] diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index 6c2308531..ff4967f14 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -20,6 +20,7 @@ use std::{ path::{Component, Path, PathBuf}, }; +use chrono::Utc; use futures::future::TryFutureExt; use normpath::PathExt; use prisma_client_rust::{operator::and, or, QueryError}; @@ -575,6 +576,8 @@ async fn create_location( name = "Unknown".to_string() } + let date_created = Utc::now(); + let location = sync .write_op( db, @@ -585,6 +588,7 @@ async fn create_location( [ (location::name::NAME, json!(&name)), (location::path::NAME, json!(&location_path)), + (location::date_created::NAME, json!(date_created)), ( location::node::NAME, json!(sync::node::SyncId { @@ -599,6 +603,7 @@ async fn create_location( vec![ location::name::set(Some(name.clone())), location::path::set(Some(location_path)), + location::date_created::set(Some(date_created.into())), location::node::connect(node::id::equals(library.node_local_id)), ], ) diff --git a/core/src/object/file_identifier/file_identifier_job.rs b/core/src/object/file_identifier/file_identifier_job.rs index 04856589a..e309060be 100644 --- a/core/src/object/file_identifier/file_identifier_job.rs +++ b/core/src/object/file_identifier/file_identifier_job.rs @@ -1,7 +1,7 @@ use crate::{ - extract_job_data, extract_job_data_mut, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, + JobRunMetadata, JobState, JobStepOutput, StatefulJob, WorkerContext, }, library::Library, location::file_path_helper::{ @@ -20,9 +20,7 @@ use std::{ use serde::{Deserialize, Serialize}; use tracing::info; -use super::{ - process_identifier_file_paths, FileIdentifierJobError, FileIdentifierReport, CHUNK_SIZE, -}; +use super::{process_identifier_file_paths, FileIdentifierJobError, CHUNK_SIZE}; pub struct 
FileIdentifierJob {} @@ -31,7 +29,7 @@ pub struct FileIdentifierJob {} /// and uniquely identifies them: /// - first: generating the cas_id and extracting metadata /// - finally: creating unique object records, and linking them to their file_paths -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct FileIdentifierJobInit { pub location: location::Data, pub sub_path: Option, // subpath to start from @@ -46,13 +44,36 @@ impl Hash for FileIdentifierJobInit { } } -#[derive(Serialize, Deserialize)] -pub struct FileIdentifierJobState { - cursor: file_path::id::Type, - report: FileIdentifierReport, +#[derive(Serialize, Deserialize, Debug)] +pub struct FileIdentifierJobData { + location_path: PathBuf, maybe_sub_iso_file_path: Option>, } +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct FileIdentifierJobRunMetadata { + report: FileIdentifierReport, + cursor: file_path::id::Type, +} + +impl JobRunMetadata for FileIdentifierJobRunMetadata { + fn update(&mut self, new_data: Self) { + self.report.total_orphan_paths += new_data.report.total_orphan_paths; + self.report.total_objects_created += new_data.report.total_objects_created; + self.report.total_objects_linked += new_data.report.total_objects_linked; + self.report.total_objects_ignored += new_data.report.total_objects_ignored; + self.cursor = new_data.cursor; + } +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct FileIdentifierReport { + total_orphan_paths: usize, + total_objects_created: usize, + total_objects_linked: usize, + total_objects_ignored: usize, +} + impl JobInitData for FileIdentifierJobInit { type Job = FileIdentifierJob; } @@ -60,8 +81,9 @@ impl JobInitData for FileIdentifierJobInit { #[async_trait::async_trait] impl StatefulJob for FileIdentifierJob { type Init = FileIdentifierJobInit; - type Data = FileIdentifierJobState; + type Data = FileIdentifierJobData; type Step = (); + type RunMetadata = FileIdentifierJobRunMetadata; const 
NAME: &'static str = "file_identifier"; @@ -71,19 +93,19 @@ impl StatefulJob for FileIdentifierJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. } = &ctx.library; info!("Identifying orphan File Paths..."); - let location_id = state.init.location.id; + let location_id = init.location.id; - let location_path = - maybe_missing(&state.init.location.path, "location.path").map(Path::new)?; + let location_path = maybe_missing(&init.location.path, "location.path").map(Path::new)?; - let maybe_sub_iso_file_path = match &state.init.sub_path { + let maybe_sub_iso_file_path = match &init.sub_path { Some(sub_path) if sub_path != Path::new("") => { let full_path = ensure_sub_path_is_in_location(location_path, sub_path) .await @@ -113,17 +135,12 @@ impl StatefulJob for FileIdentifierJob { count_orphan_file_paths(db, location_id, &maybe_sub_iso_file_path).await?; // Initializing `state.data` here because we need a complete state in case of early finish - state.data = Some(FileIdentifierJobState { - report: FileIdentifierReport { - location_path: location_path.to_path_buf(), - total_orphan_paths: orphan_count, - ..Default::default() - }, - cursor: 0, + *data = Some(FileIdentifierJobData { + location_path: location_path.to_path_buf(), maybe_sub_iso_file_path, }); - let data = extract_job_data_mut!(state); + let data = data.as_ref().expect("we just set it"); if orphan_count == 0 { return Err(JobError::EarlyFinish { @@ -155,33 +172,37 @@ impl StatefulJob for FileIdentifierJob { .await? 
.expect("We already validated before that there are orphans `file_path`s"); // SAFETY: We already validated before that there are orphans `file_path`s - data.cursor = first_path.id; - - state.steps.extend((0..task_count).map(|_| ())); - - Ok(()) + Ok(( + FileIdentifierJobRunMetadata { + report: FileIdentifierReport { + total_orphan_paths: orphan_count, + ..Default::default() + }, + cursor: first_path.id, + }, + vec![(); task_count], + ) + .into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let FileIdentifierJobState { - ref mut cursor, - ref mut report, - ref maybe_sub_iso_file_path, - } = extract_job_data_mut!(state); + ctx: &WorkerContext, + init: &Self::Init, + CurrentStep { step_number, .. }: CurrentStep<'_, Self::Step>, + data: &Self::Data, + run_metadata: &Self::RunMetadata, + ) -> Result, JobError> { + let location = &init.location; - let step_number = state.step_number; - let location = &state.init.location; + let mut new_metadata = Self::RunMetadata::default(); // get chunk of orphans to process let file_paths = get_orphan_file_paths( &ctx.library.db, location.id, - *cursor, - maybe_sub_iso_file_path, + run_metadata.cursor, + &data.maybe_sub_iso_file_path, ) .await?; @@ -195,37 +216,40 @@ impl StatefulJob for FileIdentifierJob { }); } - let (total_objects_created, total_objects_linked) = process_identifier_file_paths( - location, - &file_paths, - step_number, - cursor, - &ctx.library, - report.total_orphan_paths, - ) - .await?; + let (total_objects_created, total_objects_linked, new_cursor) = + process_identifier_file_paths( + location, + &file_paths, + step_number, + run_metadata.cursor, + &ctx.library, + run_metadata.report.total_orphan_paths, + ) + .await?; - report.total_objects_created += total_objects_created; - report.total_objects_linked += total_objects_linked; + new_metadata.report.total_objects_created = total_objects_created; + new_metadata.report.total_objects_linked = 
total_objects_linked; + new_metadata.cursor = new_cursor; ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(step_number), + JobReportUpdate::CompletedTaskCount(step_number + 1), JobReportUpdate::Message(format!( "Processed {} of {} orphan Paths", step_number * CHUNK_SIZE, - report.total_orphan_paths + run_metadata.report.total_orphan_paths )), ]); - Ok(()) + Ok(new_metadata.into()) } - async fn finalize(&mut self, _: &mut WorkerContext, state: &mut JobState) -> JobResult { - let report = &extract_job_data!(state).report; + async fn finalize(&self, _: &WorkerContext, state: &JobState) -> JobResult { + info!( + "Finalizing identifier job: {:?}", + &state.run_metadata.report + ); - info!("Finalizing identifier job: {report:?}"); - - Ok(Some(serde_json::to_value(report)?)) + Ok(Some(serde_json::to_value(state)?)) } } diff --git a/core/src/object/file_identifier/mod.rs b/core/src/object/file_identifier/mod.rs index a445d503a..cd6fa3fdb 100644 --- a/core/src/object/file_identifier/mod.rs +++ b/core/src/object/file_identifier/mod.rs @@ -19,11 +19,10 @@ use sd_sync::CRDTOperation; use std::{ collections::{HashMap, HashSet}, - path::{Path, PathBuf}, + path::Path, }; use futures::future::join_all; -use serde::{Deserialize, Serialize}; use serde_json::json; use thiserror::Error; use tokio::fs; @@ -94,15 +93,6 @@ impl FileMetadata { } } -#[derive(Serialize, Deserialize, Debug, Default)] -pub struct FileIdentifierReport { - location_path: PathBuf, - total_orphan_paths: usize, - total_objects_created: usize, - total_objects_linked: usize, - total_objects_ignored: usize, -} - async fn identifier_job_step( Library { db, sync, .. 
}: &Library, location: &location::Data, @@ -346,10 +336,10 @@ async fn process_identifier_file_paths( location: &location::Data, file_paths: &[file_path_for_file_identifier::Data], step_number: usize, - cursor: &mut file_path::id::Type, + cursor: file_path::id::Type, library: &Library, orphan_count: usize, -) -> Result<(usize, usize), JobError> { +) -> Result<(usize, usize, file_path::id::Type), JobError> { info!( "Processing {:?} orphan Paths. ({} completed of {})", file_paths.len(), @@ -357,12 +347,16 @@ async fn process_identifier_file_paths( orphan_count ); - let counts = identifier_job_step(library, location, file_paths).await?; + let (total_objects_created, total_objects_linked) = + identifier_job_step(library, location, file_paths).await?; - // set the step data cursor to the last row of this chunk - if let Some(last_row) = file_paths.last() { - *cursor = last_row.id; - } - - Ok(counts) + Ok(( + total_objects_created, + total_objects_linked, + // returns a new cursor to the last row of this chunk or the current one + file_paths + .last() + .map(|last_row| last_row.id) + .unwrap_or(cursor), + )) } diff --git a/core/src/object/file_identifier/shallow.rs b/core/src/object/file_identifier/shallow.rs index 3d4547f75..8fef7b3e7 100644 --- a/core/src/object/file_identifier/shallow.rs +++ b/core/src/object/file_identifier/shallow.rs @@ -98,15 +98,16 @@ pub async fn shallow( let file_paths = get_orphan_file_paths(&library.db, location.id, *cursor, sub_iso_file_path).await?; - process_identifier_file_paths( + let (_, _, new_cursor) = process_identifier_file_paths( location, &file_paths, step_number, - cursor, + *cursor, library, orphan_count, ) .await?; + *cursor = new_cursor; } invalidate_query!(library, "search.paths"); diff --git a/core/src/object/fs/copy.rs b/core/src/object/fs/copy.rs index 5e64f1416..59a82c26b 100644 --- a/core/src/object/fs/copy.rs +++ b/core/src/object/fs/copy.rs @@ -1,7 +1,8 @@ use crate::{ - extract_job_data, invalidate_query, + 
invalidate_query, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, + JobRunErrors, JobState, JobStepOutput, StatefulJob, WorkerContext, }, library::Library, location::file_path_helper::{join_location_relative_path, IsolatedFilePathData}, @@ -27,11 +28,11 @@ use super::{ pub struct FileCopierJob {} #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FileCopierJobState { +pub struct FileCopierJobData { sources_location_path: PathBuf, } -#[derive(Serialize, Deserialize, Hash, Type)] +#[derive(Serialize, Deserialize, Hash, Type, Debug)] pub struct FileCopierJobInit { pub source_location_id: location::id::Type, pub target_location_id: location::id::Type, @@ -40,7 +41,7 @@ pub struct FileCopierJobInit { pub target_file_name_suffix: Option, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug)] pub struct FileCopierJobStep { pub source_file_data: FileData, pub target_full_path: PathBuf, @@ -53,8 +54,9 @@ impl JobInitData for FileCopierJobInit { #[async_trait::async_trait] impl StatefulJob for FileCopierJob { type Init = FileCopierJobInit; - type Data = FileCopierJobState; + type Data = FileCopierJobData; type Step = FileCopierJobStep; + type RunMetadata = (); const NAME: &'static str = "file_copier"; @@ -64,158 +66,160 @@ impl StatefulJob for FileCopierJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. 
} = &ctx.library; let (sources_location_path, targets_location_path) = fetch_source_and_target_location_paths( db, - state.init.source_location_id, - state.init.target_location_id, + init.source_location_id, + init.target_location_id, ) .await?; - state.steps = get_many_files_datas( - db, - &sources_location_path, - &state.init.sources_file_path_ids, - ) - .await? - .into_iter() - .flat_map(|file_data| { - // add the currently viewed subdirectory to the location root - let mut full_target_path = join_location_relative_path( - &targets_location_path, - &state.init.target_location_relative_directory_path, - ); + let steps = get_many_files_datas(db, &sources_location_path, &init.sources_file_path_ids) + .await? + .into_iter() + .flat_map(|file_data| { + // add the currently viewed subdirectory to the location root + let mut full_target_path = join_location_relative_path( + &targets_location_path, + &init.target_location_relative_directory_path, + ); - full_target_path.push(construct_target_filename( - &file_data, - &state.init.target_file_name_suffix, - )?); + full_target_path.push(construct_target_filename( + &file_data, + &init.target_file_name_suffix, + )?); - Ok::<_, MissingFieldError>(FileCopierJobStep { - source_file_data: file_data, - target_full_path: full_target_path, + Ok::<_, MissingFieldError>(FileCopierJobStep { + source_file_data: file_data, + target_full_path: full_target_path, + }) }) - }) - .collect(); + .collect::>(); - state.data = Some(FileCopierJobState { + *data = Some(FileCopierJobData { sources_location_path, }); - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount(steps.len())]); - Ok(()) + Ok(steps.into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let FileCopierJobStep { - source_file_data, - target_full_path, - } = &state.steps[0]; + ctx: &WorkerContext, + init: &Self::Init, + CurrentStep { + step: 
FileCopierJobStep { + source_file_data, + target_full_path, + }, + step_number, + total_steps, + }: CurrentStep<'_, Self::Step>, + data: &Self::Data, + _: &Self::RunMetadata, + ) -> Result, JobError> { + let res = if maybe_missing(source_file_data.file_path.is_dir, "file_path.is_dir")? { + let mut more_steps = Vec::new(); - let data = extract_job_data!(state); + fs::create_dir_all(target_full_path) + .await + .map_err(|e| FileIOError::from((target_full_path, e)))?; - let res = - if maybe_missing(source_file_data.file_path.is_dir, "file_path.is_dir")? { - fs::create_dir_all(target_full_path) - .await - .map_err(|e| FileIOError::from((target_full_path, e)))?; + let mut read_dir = fs::read_dir(&source_file_data.full_path) + .await + .map_err(|e| FileIOError::from((&source_file_data.full_path, e)))?; - let mut read_dir = fs::read_dir(&source_file_data.full_path) - .await - .map_err(|e| FileIOError::from((&source_file_data.full_path, e)))?; - - // Can't use the `steps` borrow from here ownwards, or you feel the wrath of the borrow checker - while let Some(children_entry) = read_dir.next_entry().await.map_err(|e| { - FileIOError::from((&state.steps[0].source_file_data.full_path, e)) - })? { - let children_path = children_entry.path(); - let target_children_full_path = state.steps[0].target_full_path.join( + // Can't use the `steps` borrow from here ownwards, or you feel the wrath of the borrow checker + while let Some(children_entry) = read_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&source_file_data.full_path, e)))? 
+ { + let children_path = children_entry.path(); + let target_children_full_path = target_full_path.join( children_path - .strip_prefix(&state.steps[0].source_file_data.full_path) + .strip_prefix(&source_file_data.full_path) .expect("We got the children path from the read_dir, so it should be a child of the source path"), ); - // Currently not supporting file_name suffixes children files in a directory being copied - state.steps.push_back(FileCopierJobStep { - target_full_path: target_children_full_path, - source_file_data: get_file_data_from_isolated_file_path( - &ctx.library.db, + // Currently not supporting file_name suffixes children files in a directory being copied + more_steps.push(FileCopierJobStep { + target_full_path: target_children_full_path, + source_file_data: get_file_data_from_isolated_file_path( + &ctx.library.db, + &data.sources_location_path, + &IsolatedFilePathData::new( + init.source_location_id, &data.sources_location_path, - &IsolatedFilePathData::new( - state.init.source_location_id, - &data.sources_location_path, - &children_path, - children_entry - .metadata() - .await - .map_err(|e| FileIOError::from((&children_path, e)))? - .is_dir(), - ) - .map_err(FileSystemJobsError::from)?, + &children_path, + children_entry + .metadata() + .await + .map_err(|e| FileIOError::from((&children_path, e)))? 
+ .is_dir(), ) - .await?, - }); + .map_err(FileSystemJobsError::from)?, + ) + .await?, + }); - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount( + total_steps + more_steps.len(), + )]); + } + + Ok(more_steps.into()) + } else if &source_file_data.full_path == target_full_path { + // File is already here, do nothing + Ok(().into()) + } else { + match fs::metadata(target_full_path).await { + Ok(_) => { + // only skip as it could be half way through a huge directory copy and run into an issue + warn!( + "Skipping {} as it would be overwritten", + target_full_path.display() + ); + + Ok(JobRunErrors(vec![FileSystemJobsError::WouldOverwrite( + target_full_path.clone().into_boxed_path(), + ) + .to_string()]) + .into()) } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + trace!( + "Copying from {} to {}", + source_file_data.full_path.display(), + target_full_path.display() + ); - Ok(()) - } else if &source_file_data.full_path == target_full_path { - // File is already here, do nothing - Ok(()) - } else { - match fs::metadata(target_full_path).await { - Ok(_) => { - // only skip as it could be half way through a huge directory copy and run into an issue - warn!( - "Skipping {} as it would be overwritten", - target_full_path.display() - ); + fs::copy(&source_file_data.full_path, &target_full_path) + .await + // Using the ? here because we don't want to increase the completed task + // count in case of file system errors + .map_err(|e| FileIOError::from((target_full_path, e)))?; - Err(JobError::StepCompletedWithErrors(vec![ - FileSystemJobsError::WouldOverwrite( - target_full_path.clone().into_boxed_path(), - ) - .to_string(), - ])) - } - Err(e) if e.kind() == io::ErrorKind::NotFound => { - trace!( - "Copying from {} to {}", - source_file_data.full_path.display(), - target_full_path.display() - ); - - fs::copy(&source_file_data.full_path, &target_full_path) - .await - // Using the ? 
here because we don't want to increase the completed task - // count in case of file system errors - .map_err(|e| FileIOError::from((target_full_path, e)))?; - - Ok(()) - } - Err(e) => return Err(FileIOError::from((target_full_path, e)).into()), + Ok(().into()) } - }; + Err(e) => return Err(FileIOError::from((target_full_path, e)).into()), + } + }; - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - state.step_number + 1, - )]); + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step_number + 1)]); res } - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult { invalidate_query!(ctx.library, "search.paths"); Ok(Some(serde_json::to_value(&state.init)?)) diff --git a/core/src/object/fs/cut.rs b/core/src/object/fs/cut.rs index 981d0818a..dba62c7f1 100644 --- a/core/src/object/fs/cut.rs +++ b/core/src/object/fs/cut.rs @@ -1,7 +1,8 @@ use crate::{ - extract_job_data, invalidate_query, + invalidate_query, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, + JobRunErrors, JobState, JobStepOutput, StatefulJob, WorkerContext, }, library::Library, location::file_path_helper::push_location_relative_path, @@ -21,7 +22,7 @@ use super::{fetch_source_and_target_location_paths, get_many_files_datas, FileDa pub struct FileCutterJob {} -#[derive(Serialize, Deserialize, Hash, Type)] +#[derive(Serialize, Deserialize, Hash, Type, Debug)] pub struct FileCutterJobInit { pub source_location_id: location::id::Type, pub target_location_id: location::id::Type, @@ -29,8 +30,8 @@ pub struct FileCutterJobInit { pub target_location_relative_directory_path: PathBuf, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FileCutterJobState { +#[derive(Serialize, Deserialize, Debug)] +pub struct FileCutterJobData { 
full_target_directory_path: PathBuf, } @@ -41,8 +42,9 @@ impl JobInitData for FileCutterJobInit { #[async_trait::async_trait] impl StatefulJob for FileCutterJob { type Init = FileCutterJobInit; - type Data = FileCutterJobState; + type Data = FileCutterJobData; type Step = FileData; + type RunMetadata = (); const NAME: &'static str = "file_cutter"; @@ -52,55 +54,56 @@ impl StatefulJob for FileCutterJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. } = &ctx.library; let (sources_location_path, targets_location_path) = fetch_source_and_target_location_paths( db, - state.init.source_location_id, - state.init.target_location_id, + init.source_location_id, + init.target_location_id, ) .await?; let full_target_directory_path = push_location_relative_path( targets_location_path, - &state.init.target_location_relative_directory_path, + &init.target_location_relative_directory_path, ); - state.data = Some(FileCutterJobState { + *data = Some(FileCutterJobData { full_target_directory_path, }); - state.steps = get_many_files_datas( - db, - &sources_location_path, - &state.init.sources_file_path_ids, - ) - .await? - .into(); + let steps = + get_many_files_datas(db, &sources_location_path, &init.sources_file_path_ids).await?; - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount(steps.len())]); - Ok(()) + Ok(steps.into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let file_data = &state.steps[0]; - - let full_output = extract_job_data!(state) + ctx: &WorkerContext, + _: &Self::Init, + CurrentStep { + step: file_data, + step_number, + .. 
+ }: CurrentStep<'_, Self::Step>, + data: &Self::Data, + _: &Self::RunMetadata, + ) -> Result, JobError> { + let full_output = data .full_target_directory_path .join(construct_target_filename(file_data, &None)?); let res = if file_data.full_path == full_output { // File is already here, do nothing - Ok(()) + Ok(().into()) } else { match fs::metadata(&full_output).await { Ok(_) => { @@ -109,10 +112,11 @@ impl StatefulJob for FileCutterJob { full_output.display() ); - return Err(JobError::StepCompletedWithErrors(vec![ - FileSystemJobsError::WouldOverwrite(full_output.into_boxed_path()) - .to_string(), - ])); + Ok(JobRunErrors(vec![FileSystemJobsError::WouldOverwrite( + full_output.into_boxed_path(), + ) + .to_string()]) + .into()) } Err(e) if e.kind() == io::ErrorKind::NotFound => { trace!( @@ -125,21 +129,19 @@ impl StatefulJob for FileCutterJob { .await .map_err(|e| FileIOError::from((&file_data.full_path, e)))?; - Ok(()) + Ok(().into()) } Err(e) => return Err(FileIOError::from((&full_output, e)).into()), } }; - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - state.step_number + 1, - )]); + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step_number + 1)]); res } - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult { invalidate_query!(ctx.library, "search.paths"); Ok(Some(serde_json::to_value(&state.init)?)) diff --git a/core/src/object/fs/decrypt.rs b/core/src/object/fs/decrypt.rs index 6e9a30f73..871c7ba1b 100644 --- a/core/src/object/fs/decrypt.rs +++ b/core/src/object/fs/decrypt.rs @@ -150,7 +150,7 @@ // Ok(()) // } -// async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { +// async fn finalize(&self, ctx: WorkerContext, state: &mut JobState) -> JobResult { // invalidate_query!(ctx.library, "search.paths"); // // mark job as successful diff --git a/core/src/object/fs/delete.rs 
b/core/src/object/fs/delete.rs index 446ce637e..0709b5484 100644 --- a/core/src/object/fs/delete.rs +++ b/core/src/object/fs/delete.rs @@ -1,7 +1,8 @@ use crate::{ invalidate_query, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, JobState, + JobStepOutput, StatefulJob, WorkerContext, }, library::Library, prisma::{file_path, location}, @@ -19,7 +20,7 @@ use super::{get_location_path_from_location_id, get_many_files_datas, FileData}; pub struct FileDeleterJob {} -#[derive(Serialize, Deserialize, Hash, Type)] +#[derive(Serialize, Deserialize, Hash, Type, Debug)] pub struct FileDeleterJobInit { pub location_id: location::id::Type, pub file_path_ids: Vec, @@ -34,6 +35,7 @@ impl StatefulJob for FileDeleterJob { type Init = FileDeleterJobInit; type Data = (); type Step = FileData; + type RunMetadata = (); const NAME: &'static str = "file_deleter"; @@ -43,32 +45,37 @@ impl StatefulJob for FileDeleterJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. } = &ctx.library; - state.steps = get_many_files_datas( + let steps = get_many_files_datas( db, - get_location_path_from_location_id(db, state.init.location_id).await?, - &state.init.file_path_ids, + get_location_path_from_location_id(db, init.location_id).await?, + &init.file_path_ids, ) - .await? 
- .into_iter() - .collect(); + .await?; - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount(steps.len())]); - Ok(()) + // Must fill in the data, otherwise the job will not run + *data = Some(()); + + Ok(steps.into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - let step = &state.steps[0]; - + ctx: &WorkerContext, + _: &Self::Init, + CurrentStep { + step, step_number, .. + }: CurrentStep<'_, Self::Step>, + _: &Self::Data, + _: &Self::RunMetadata, + ) -> Result, JobError> { // need to handle stuff such as querying prisma for all paths of a file, and deleting all of those if requested (with a checkbox in the ui) // maybe a files.countOccurances/and or files.getPath(location_id, path_id) to show how many of these files would be deleted (and where?) @@ -95,14 +102,12 @@ impl StatefulJob for FileDeleterJob { } } - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - state.step_number + 1, - )]); + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step_number + 1)]); - Ok(()) + Ok(().into()) } - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult { invalidate_query!(ctx.library, "search.paths"); Ok(Some(serde_json::to_value(&state.init)?)) diff --git a/core/src/object/fs/encrypt.rs b/core/src/object/fs/encrypt.rs index 7ecb1aeb8..d1cfd251a 100644 --- a/core/src/object/fs/encrypt.rs +++ b/core/src/object/fs/encrypt.rs @@ -252,7 +252,7 @@ // Ok(()) // } -// async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { +// async fn finalize(&self, ctx: WorkerContext, state: &mut JobState) -> JobResult { // invalidate_query!(ctx.library, "search.paths"); // // mark job as successful diff --git a/core/src/object/fs/erase.rs b/core/src/object/fs/erase.rs index 5b01a96fd..b131bdc7f 100644 --- 
a/core/src/object/fs/erase.rs +++ b/core/src/object/fs/erase.rs @@ -1,7 +1,8 @@ use crate::{ - extract_job_data_mut, invalidate_query, + invalidate_query, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, + JobRunMetadata, JobState, JobStepOutput, StatefulJob, WorkerContext, }, library::Library, location::file_path_helper::IsolatedFilePathData, @@ -29,7 +30,7 @@ use super::{ pub struct FileEraserJob {} #[serde_as] -#[derive(Serialize, Deserialize, Hash, Type)] +#[derive(Serialize, Deserialize, Hash, Type, Debug)] pub struct FileEraserJobInit { pub location_id: location::id::Type, pub file_path_ids: Vec, @@ -42,17 +43,29 @@ impl JobInitData for FileEraserJobInit { type Job = FileEraserJob; } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct FileEraserJobData { location_path: PathBuf, +} + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct FileEraserJobRunMetadata { diretories_to_remove: Vec, } +impl JobRunMetadata for FileEraserJobRunMetadata { + fn update(&mut self, new_data: Self) { + self.diretories_to_remove + .extend(new_data.diretories_to_remove); + } +} + #[async_trait::async_trait] impl StatefulJob for FileEraserJob { type Init = FileEraserJobInit; type Data = FileEraserJobData; type Step = FileData; + type RunMetadata = FileEraserJobRunMetadata; const NAME: &'static str = "file_eraser"; @@ -62,40 +75,43 @@ impl StatefulJob for FileEraserJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. 
} = &ctx.library; - let location_path = get_location_path_from_location_id(db, state.init.location_id).await?; + let location_path = get_location_path_from_location_id(db, init.location_id).await?; - state.steps = get_many_files_datas(db, &location_path, &state.init.file_path_ids) - .await? - .into(); + let steps = get_many_files_datas(db, &location_path, &init.file_path_ids).await?; - state.data = Some(FileEraserJobData { - location_path, - diretories_to_remove: vec![], - }); + *data = Some(FileEraserJobData { location_path }); - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount(steps.len())]); - Ok(()) + Ok((Default::default(), steps).into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + CurrentStep { + step, + step_number, + total_steps, + }: CurrentStep<'_, Self::Step>, + data: &Self::Data, + _: &Self::RunMetadata, + ) -> Result, JobError> { // need to handle stuff such as querying prisma for all paths of a file, and deleting all of those if requested (with a checkbox in the ui) // maybe a files.countOccurances/and or files.getPath(location_id, path_id) to show how many of these files would be erased (and where?) - let step = &state.steps[0]; + let mut new_metadata = Self::RunMetadata::default(); // Had to use `state.steps[0]` all over the place to appease the borrow checker - if maybe_missing(step.file_path.is_dir, "file_path.is_dir")? { - let data = extract_job_data_mut!(state); + let res = if maybe_missing(step.file_path.is_dir, "file_path.is_dir")? { + let mut more_steps = Vec::new(); let mut dir = tokio::fs::read_dir(&step.full_path) .await @@ -105,16 +121,16 @@ impl StatefulJob for FileEraserJob { while let Some(children_entry) = dir .next_entry() .await - .map_err(|e| FileIOError::from((&state.steps[0].full_path, e)))? + .map_err(|e| FileIOError::from((&step.full_path, e)))? 
{ let children_path = children_entry.path(); - state.steps.push_back( + more_steps.push( get_file_data_from_isolated_file_path( &ctx.library.db, &data.location_path, &IsolatedFilePathData::new( - state.init.location_id, + init.location_id, &data.location_path, &children_path, children_entry @@ -128,10 +144,15 @@ impl StatefulJob for FileEraserJob { .await?, ); - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount( + total_steps + more_steps.len(), + )]); } - data.diretories_to_remove - .push(state.steps[0].full_path.clone()); + new_metadata + .diretories_to_remove + .push(step.full_path.clone()); + + Ok((more_steps, new_metadata).into()) } else { let mut file = OpenOptions::new() .read(true) @@ -145,7 +166,7 @@ impl StatefulJob for FileEraserJob { .map_err(|e| FileIOError::from((&step.full_path, e)))? .len(); - sd_crypto::fs::erase::erase(&mut file, file_len as usize, state.init.passes).await?; + sd_crypto::fs::erase::erase(&mut file, file_len as usize, init.passes).await?; file.set_len(0) .await @@ -160,20 +181,22 @@ impl StatefulJob for FileEraserJob { fs::remove_file(&step.full_path) .await .map_err(|e| FileIOError::from((&step.full_path, e)))?; - } - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - state.step_number + 1, - )]); + Ok(None.into()) + }; - Ok(()) + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step_number + 1)]); + + res } - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult { try_join_all( - extract_job_data_mut!(state) + state + .run_metadata .diretories_to_remove - .drain(..) 
+ .iter() + .cloned() .map(|data| async { fs::remove_dir_all(&data) .await diff --git a/core/src/object/fs/mod.rs b/core/src/object/fs/mod.rs index 5a97fd66e..5b8ad93d3 100644 --- a/core/src/object/fs/mod.rs +++ b/core/src/object/fs/mod.rs @@ -33,7 +33,7 @@ pub enum ObjectType { Directory, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug)] pub struct FileData { pub file_path: file_path_with_object::Data, pub full_path: PathBuf, diff --git a/core/src/object/preview/thumbnail/mod.rs b/core/src/object/preview/thumbnail/mod.rs index 9d54f7550..25a0eb9b7 100644 --- a/core/src/object/preview/thumbnail/mod.rs +++ b/core/src/object/preview/thumbnail/mod.rs @@ -1,7 +1,6 @@ use crate::{ api::CoreEvent, - invalidate_query, - job::{JobError, JobReportUpdate, JobResult, JobState, WorkerContext}, + job::JobError, library::Library, location::file_path_helper::{file_path_for_thumbnailer, FilePathError, IsolatedFilePathData}, prisma::location, @@ -27,8 +26,6 @@ use tokio::{fs, io, task::block_in_place}; use tracing::{error, info, trace, warn}; use webp::Encoder; -use self::thumbnailer_job::ThumbnailerJob; - mod directory; mod shallow; mod shard; @@ -78,13 +75,6 @@ static FILTERED_IMAGE_EXTENSIONS: Lazy> = Lazy::new(|| { .collect() }); -#[derive(Debug, Serialize, Deserialize)] -pub struct ThumbnailerJobState { - thumbnail_dir: PathBuf, - location_path: PathBuf, - report: ThumbnailerJobReport, -} - #[derive(Error, Debug)] pub enum ThumbnailerError { #[error("sub path not found: ", .0.display())] @@ -100,15 +90,6 @@ pub enum ThumbnailerError { #[error(transparent)] VersionManager(#[from] VersionManagerError), } - -#[derive(Debug, Serialize, Deserialize)] -pub struct ThumbnailerJobReport { - location_id: location::id::Type, - path: PathBuf, - thumbnails_created: u32, - thumbnails_skipped: u32, -} - #[derive(Debug, Serialize, Deserialize, Clone, Copy)] enum ThumbnailerJobStepKind { Image, @@ -209,65 +190,6 @@ pub const fn 
can_generate_thumbnail_for_image(image_extension: &ImageExtension) res } -fn finalize_thumbnailer(data: &ThumbnailerJobState, ctx: &mut WorkerContext) -> JobResult { - info!( - "Finished thumbnail generation for location {} at {}", - data.report.location_id, - data.report.path.display() - ); - - if data.report.thumbnails_created > 0 { - invalidate_query!(ctx.library, "search.paths"); - } - - Ok(Some(serde_json::to_value(&data.report)?)) -} - -async fn process_step( - state: &mut JobState, - ctx: &mut WorkerContext, -) -> Result<(), JobError> { - let step = &state.steps[0]; - - ctx.progress(vec![JobReportUpdate::Message(format!( - "Processing {}", - maybe_missing( - &step.file_path.materialized_path, - "file_path.materialized_path" - )? - ))]); - - let data = state - .data - .as_mut() - .expect("critical error: missing data on job state"); - - let step_result = inner_process_step( - step, - &data.location_path, - &data.thumbnail_dir, - &state.init.location, - &ctx.library, - ) - .await; - - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - state.step_number + 1, - )]); - - match step_result { - Ok(thumbnail_was_created) => { - if thumbnail_was_created { - data.report.thumbnails_created += 1; - } else { - data.report.thumbnails_skipped += 1; - } - Ok(()) - } - Err(e) => Err(e), - } -} - pub async fn inner_process_step( step: &ThumbnailerJobStep, location_path: impl AsRef, diff --git a/core/src/object/preview/thumbnail/thumbnailer_job.rs b/core/src/object/preview/thumbnail/thumbnailer_job.rs index f39dc154c..6e2e14f53 100644 --- a/core/src/object/preview/thumbnail/thumbnailer_job.rs +++ b/core/src/object/preview/thumbnail/thumbnailer_job.rs @@ -1,7 +1,8 @@ use crate::{ - extract_job_data, + invalidate_query, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, + JobRunMetadata, JobState, JobStepOutput, StatefulJob, WorkerContext, }, 
library::Library, location::file_path_helper::{ @@ -10,10 +11,10 @@ use crate::{ }, object::preview::thumbnail::directory::init_thumbnail_dir, prisma::{file_path, location, PrismaClient}, + util::db::maybe_missing, }; use std::{ - collections::VecDeque, hash::Hash, path::{Path, PathBuf}, }; @@ -25,8 +26,8 @@ use serde::{Deserialize, Serialize}; use tracing::info; use super::{ - finalize_thumbnailer, process_step, ThumbnailerError, ThumbnailerJobReport, - ThumbnailerJobState, ThumbnailerJobStep, ThumbnailerJobStepKind, FILTERED_IMAGE_EXTENSIONS, + inner_process_step, ThumbnailerError, ThumbnailerJobStep, ThumbnailerJobStepKind, + FILTERED_IMAGE_EXTENSIONS, }; #[cfg(feature = "ffmpeg")] @@ -34,7 +35,7 @@ use super::FILTERED_VIDEO_EXTENSIONS; pub struct ThumbnailerJob {} -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Debug)] pub struct ThumbnailerJobInit { pub location: location::Data, pub sub_path: Option, @@ -53,11 +54,32 @@ impl JobInitData for ThumbnailerJobInit { type Job = ThumbnailerJob; } +#[derive(Debug, Serialize, Deserialize)] +pub struct ThumbnailerJobData { + thumbnail_dir: PathBuf, + location_path: PathBuf, + path: PathBuf, +} + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct ThumbnailerJobRunMetadata { + thumbnails_created: u32, + thumbnails_skipped: u32, +} + +impl JobRunMetadata for ThumbnailerJobRunMetadata { + fn update(&mut self, new_data: Self) { + self.thumbnails_created += new_data.thumbnails_created; + self.thumbnails_skipped += new_data.thumbnails_skipped; + } +} + #[async_trait::async_trait] impl StatefulJob for ThumbnailerJob { type Init = ThumbnailerJobInit; - type Data = ThumbnailerJobState; + type Data = ThumbnailerJobData; type Step = ThumbnailerJobStep; + type RunMetadata = ThumbnailerJobRunMetadata; const NAME: &'static str = "thumbnailer"; @@ -67,21 +89,19 @@ impl StatefulJob for ThumbnailerJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), 
JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. } = &ctx.library; let thumbnail_dir = init_thumbnail_dir(ctx.library.config().data_directory()).await?; - // .join(THUMBNAIL_CACHE_DIR_NAME); - let location_id = state.init.location.id; - let location_path = match &state.init.location.path { - Some(v) => PathBuf::from(v), - None => return Ok(()), - }; + let location_id = init.location.id; + let location_path = + maybe_missing(&init.location.path, "location.path").map(PathBuf::from)?; - let (path, iso_file_path) = match &state.init.sub_path { + let (path, iso_file_path) = match &init.sub_path { Some(sub_path) if sub_path != Path::new("") => { let full_path = ensure_sub_path_is_in_location(&location_path, sub_path) .await @@ -138,41 +158,91 @@ impl StatefulJob for ThumbnailerJob { image_files .into_iter() .chain(video_files.into_iter()) - .collect::>() + .collect::>() }; #[cfg(not(feature = "ffmpeg"))] - let all_files = { image_files.into_iter().collect::>() }; + let all_files = { image_files.into_iter().collect::>() }; ctx.progress(vec![ JobReportUpdate::TaskCount(all_files.len()), JobReportUpdate::Message(format!("Preparing to process {} files", all_files.len())), ]); - state.data = Some(ThumbnailerJobState { + *data = Some(ThumbnailerJobData { thumbnail_dir, location_path, - report: ThumbnailerJobReport { - location_id, - path, + path, + }); + + Ok(( + ThumbnailerJobRunMetadata { thumbnails_created: 0, thumbnails_skipped: 0, }, - }); - state.steps.extend(all_files); - - Ok(()) + all_files, + ) + .into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { - process_step(state, ctx).await + ctx: &WorkerContext, + init: &Self::Init, + CurrentStep { + step, step_number, .. 
+ }: CurrentStep<'_, Self::Step>, + data: &Self::Data, + _: &Self::RunMetadata, + ) -> Result, JobError> { + ctx.progress(vec![JobReportUpdate::Message(format!( + "Processing {}", + maybe_missing( + &step.file_path.materialized_path, + "file_path.materialized_path" + )? + ))]); + + let mut new_metadata = Self::RunMetadata::default(); + + let step_result = inner_process_step( + step, + &data.location_path, + &data.thumbnail_dir, + &init.location, + &ctx.library, + ) + .await; + + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step_number + 1)]); + + step_result.map(|thumbnail_was_created| { + if thumbnail_was_created { + new_metadata.thumbnails_created += 1; + } else { + new_metadata.thumbnails_skipped += 1; + } + })?; + + Ok(new_metadata.into()) } - async fn finalize(&mut self, ctx: &mut WorkerContext, state: &mut JobState) -> JobResult { - finalize_thumbnailer(extract_job_data!(state), ctx) + async fn finalize(&self, ctx: &WorkerContext, state: &JobState) -> JobResult { + info!( + "Finished thumbnail generation for location {} at {}", + state.init.location.id, + state + .data + .as_ref() + .expect("critical error: missing data on job state") + .path + .display() + ); + + if state.run_metadata.thumbnails_created > 0 { + invalidate_query!(ctx.library, "search.paths"); + } + + Ok(Some(serde_json::to_value(&state.run_metadata)?)) } } diff --git a/core/src/object/validation/validator_job.rs b/core/src/object/validation/validator_job.rs index 69d4a048e..ec4742204 100644 --- a/core/src/object/validation/validator_job.rs +++ b/core/src/object/validation/validator_job.rs @@ -1,7 +1,7 @@ use crate::{ - extract_job_data, job::{ - JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + CurrentStep, JobError, JobInitData, JobInitOutput, JobReportUpdate, JobResult, JobState, + JobStepOutput, StatefulJob, WorkerContext, }, library::Library, location::file_path_helper::{ @@ -34,7 +34,7 @@ use super::{hash::file_checksum, ValidatorError}; 
pub struct ObjectValidatorJob {} #[derive(Serialize, Deserialize, Debug)] -pub struct ObjectValidatorJobState { +pub struct ObjectValidatorJobData { pub location_path: PathBuf, pub task_count: usize, } @@ -62,8 +62,9 @@ impl JobInitData for ObjectValidatorJobInit { #[async_trait::async_trait] impl StatefulJob for ObjectValidatorJob { type Init = ObjectValidatorJobInit; - type Data = ObjectValidatorJobState; + type Data = ObjectValidatorJobData; type Step = file_path_for_object_validator::Data; + type RunMetadata = (); const NAME: &'static str = "object_validator"; @@ -73,17 +74,18 @@ impl StatefulJob for ObjectValidatorJob { async fn init( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + data: &mut Option, + ) -> Result, JobError> { let Library { db, .. } = &ctx.library; - let location_id = state.init.location.id; + let location_id = init.location.id; let location_path = - maybe_missing(&state.init.location.path, "location.path").map(PathBuf::from)?; + maybe_missing(&init.location.path, "location.path").map(PathBuf::from)?; - let maybe_sub_iso_file_path = match &state.init.sub_path { + let maybe_sub_iso_file_path = match &init.sub_path { Some(sub_path) if sub_path != Path::new("") => { let full_path = ensure_sub_path_is_in_location(&location_path, sub_path) .await @@ -109,52 +111,55 @@ impl StatefulJob for ObjectValidatorJob { _ => None, }; - state.steps.extend( - db.file_path() - .find_many(chain_optional_iter( - [ - file_path::location_id::equals(Some(state.init.location.id)), - file_path::is_dir::equals(Some(false)), - file_path::integrity_checksum::equals(None), - ], - [maybe_sub_iso_file_path.and_then(|iso_sub_path| { - iso_sub_path - .materialized_path_for_children() - .map(file_path::materialized_path::starts_with) - })], - )) - .select(file_path_for_object_validator::select()) - .exec() - .await?, - ); + let steps = db + .file_path() + .find_many(chain_optional_iter( + [ + 
file_path::location_id::equals(Some(init.location.id)), + file_path::is_dir::equals(Some(false)), + file_path::integrity_checksum::equals(None), + ], + [maybe_sub_iso_file_path.and_then(|iso_sub_path| { + iso_sub_path + .materialized_path_for_children() + .map(file_path::materialized_path::starts_with) + })], + )) + .select(file_path_for_object_validator::select()) + .exec() + .await?; - state.data = Some(ObjectValidatorJobState { + *data = Some(ObjectValidatorJobData { location_path, - task_count: state.steps.len(), + task_count: steps.len(), }); - ctx.progress(vec![JobReportUpdate::TaskCount(state.steps.len())]); + ctx.progress(vec![JobReportUpdate::TaskCount(steps.len())]); - Ok(()) + Ok(steps.into()) } async fn execute_step( &self, - ctx: &mut WorkerContext, - state: &mut JobState, - ) -> Result<(), JobError> { + ctx: &WorkerContext, + init: &Self::Init, + CurrentStep { + step: file_path, + step_number, + .. + }: CurrentStep<'_, Self::Step>, + data: &Self::Data, + _: &Self::RunMetadata, + ) -> Result, JobError> { let Library { db, sync, .. 
} = &ctx.library; - let file_path = &state.steps[0]; - let data = extract_job_data!(state); - // this is to skip files that already have checksums // i'm unsure what the desired behaviour is in this case // we can also compare old and new checksums here // This if is just to make sure, we already queried objects where integrity_checksum is null if file_path.integrity_checksum.is_none() { let full_path = data.location_path.join(IsolatedFilePathData::try_from(( - state.init.location.id, + init.location.id, file_path, ))?); let checksum = file_checksum(&full_path) @@ -178,19 +183,17 @@ impl StatefulJob for ObjectValidatorJob { .await?; } - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - state.step_number + 1, - )]); + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(step_number + 1)]); - Ok(()) + Ok(().into()) } - async fn finalize( - &mut self, - _ctx: &mut WorkerContext, - state: &mut JobState, - ) -> JobResult { - let data = extract_job_data!(state); + async fn finalize(&self, _: &WorkerContext, state: &JobState) -> JobResult { + let data = state + .data + .as_ref() + .expect("critical error: missing data on job state"); + info!( "finalizing validator job at {}{}: {} tasks", data.location_path.display(), diff --git a/interface/app/$libraryId/Explorer/File/ContextMenu.tsx b/interface/app/$libraryId/Explorer/File/ContextMenu.tsx index 96be1dfed..7b44b6ece 100644 --- a/interface/app/$libraryId/Explorer/File/ContextMenu.tsx +++ b/interface/app/$libraryId/Explorer/File/ContextMenu.tsx @@ -14,6 +14,7 @@ import { import { useLocation } from 'react-router-dom'; import { ExplorerItem, + ObjectKind, getItemFilePath, getItemObject, useLibraryContext, @@ -260,10 +261,30 @@ export default ({ data }: Props) => { keybind={keybind([ModifierKeys.Control], ['B'])} disabled /> - - - - + {[ObjectKind.Image, ObjectKind.Video].includes(objectData?.kind as ObjectKind) && ( + + {(() => { + switch (objectData?.kind) { + case ObjectKind.Image: + return ( + <> + + + + + ); + 
case ObjectKind.Video: + return ( + <> + + + + + ); + } + })()} + + )} {locationId != null && ( <> @@ -292,7 +313,7 @@ export default ({ data }: Props) => { } catch (error) { showAlertDialog({ title: 'Error', - value: `Failed to generate thumbanails, due to an error: ${error}` + value: `Failed to generate thumbnails, due to an error: ${error}` }); } }} diff --git a/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx b/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx index a04b21fd1..781252672 100644 --- a/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx @@ -34,35 +34,19 @@ function Job({ job, className, isChild }: JobProps) { textItems: [[{ text: job.status.replace(/([A-Z])/g, ' $1').trim() }]] }; const isRunning = job.status === 'Running'; + const isPaused = job.status === 'Paused'; + + const task_count = realtimeUpdate?.task_count || job.task_count; + const completed_task_count = realtimeUpdate?.completed_task_count || job.completed_task_count; // clear stale realtime state when job is done useEffect(() => { - if (job.status !== 'Running') { - setRealtimeUpdate(null); - } - }, [job.status]); + if (isRunning) setRealtimeUpdate(null); + }, [isRunning]); // dayjs from seconds to time // const timeText = isRunning ? formatEstimatedRemainingTime(job.estimated_completion) : undefined; - const clearJob = useLibraryMutation(['jobs.clear'], { - onError: () => { - showAlertDialog({ - title: 'Error', - value: 'There was an error clearing the job. Please try again.' 
- }); - }, - onSuccess: () => { - queryClient.invalidateQueries(['jobs.reports']); - } - }); - - // const clearJobHandler = useCallback( - // (id: string) => { - // clearJob.mutate(id); - // }, - // [clearJob] - // ); // I don't like sending TSX as a prop due to lack of hot-reload, but it's the only way to get the error log to show up if (job.status === 'CompletedWithErrors') { @@ -100,16 +84,19 @@ function Job({ job, className, isChild }: JobProps) { name={niceData.name} circleIcon={niceData.icon} textItems={ - ['Queued'].includes(job.status) ? [[{ text: job.status }]] : niceData.textItems + ['Queued'].includes(job.status) + ? [[{ text: job.status }]] + : niceData.textItems } // textItems={[[{ text: job.status }, { text: job.id, }]]} - isChild={job.action !== null} + isChild={isChild} > - {isRunning && ( + {isRunning || isPaused && (
)} diff --git a/interface/app/$libraryId/Layout/Sidebar/JobManager/JobContainer.tsx b/interface/app/$libraryId/Layout/Sidebar/JobManager/JobContainer.tsx index c79c160df..ebc1dedc3 100644 --- a/interface/app/$libraryId/Layout/Sidebar/JobManager/JobContainer.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/JobManager/JobContainer.tsx @@ -26,7 +26,7 @@ interface JobContainerProps extends HTMLAttributes { const CIRCLE_ICON_CLASS = `relative flex-shrink-0 top-1 z-20 mr-3 h-7 w-7 rounded-full bg-app-button p-[5.5px]`; const IMG_ICON_CLASS = `relative left-[-2px] top-1 z-10 mr-2 h-8 w-8`; -const MetaContainer = tw.div`flex w-full flex-col`; +const MetaContainer = tw.div`flex w-full overflow-hidden flex-col`; const TextLine = tw.div`mt-[2px] gap-1 text-ink-faint truncate mr-8 pl-1.5`; const TextItem = tw.span`truncate`; @@ -59,7 +59,9 @@ const JobContainer = forwardRef((props, ref) = )} {iconImg && } - {name} + + {name} + {textItems?.map((textItems, lineIndex) => { // filter out undefined text so we don't render empty TextItems const filteredItems = textItems.filter((i) => i?.text); @@ -67,7 +69,11 @@ const JobContainer = forwardRef((props, ref) = const popoverText = filteredItems.map((i) => i?.text).join(' • '); return ( - + {filteredItems.map((textItem, index) => { const Icon = textItem?.icon; @@ -76,7 +82,7 @@ const JobContainer = forwardRef((props, ref) = 0 && 'px-1.5 py-0.5 italic', + // lineIndex > 0 && 'px-1.5 py-0.5 italic', textItem?.onClick && '-ml-1.5 rounded-md hover:bg-app-button/50' )} @@ -84,7 +90,7 @@ const JobContainer = forwardRef((props, ref) = {Icon && ( )} {textItem?.text} diff --git a/interface/app/$libraryId/Layout/Sidebar/JobManager/JobGroup.tsx b/interface/app/$libraryId/Layout/Sidebar/JobManager/JobGroup.tsx index 69ba4b3ee..ae8c33b94 100644 --- a/interface/app/$libraryId/Layout/Sidebar/JobManager/JobGroup.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/JobManager/JobGroup.tsx @@ -3,13 +3,18 @@ import { Folder } from '@sd/assets/icons'; import 
clsx from 'clsx'; import dayjs from 'dayjs'; import { DotsThreeVertical, Pause, Play, Stop } from 'phosphor-react'; -import { Fragment, useState } from 'react'; -import { JobReport, useLibraryMutation } from '@sd/client'; +import { Fragment, useEffect, useState } from 'react'; +import { + JobGroup as IJobGroup, + JobProgressEvent, + JobReport, + useLibraryMutation, + useLibrarySubscription +} from '@sd/client'; import { Button, ProgressBar, Tooltip } from '@sd/ui'; import Job from './Job'; import JobContainer from './JobContainer'; import { useTotalElapsedTimeText } from './useGroupJobTimeText'; -import { IJobGroup } from './useGroupedJobs'; interface JobGroupProps { data: IJobGroup; @@ -18,11 +23,32 @@ interface JobGroupProps { function JobGroup({ data: { jobs, ...data }, clearJob }: JobGroupProps) { const [showChildJobs, setShowChildJobs] = useState(false); + const [realtimeUpdate, setRealtimeUpdate] = useState(null); - const pauseJob = useLibraryMutation(['jobs.pause']); - const resumeJob = useLibraryMutation(['jobs.resume']); + const pauseJob = useLibraryMutation(['jobs.pause'], { + onError: alert + }); + const resumeJob = useLibraryMutation(['jobs.resume'], { + onError: alert + }); + const cancelJob = useLibraryMutation(['jobs.cancel'], { + onError: alert + }); const isJobsRunning = jobs.some((job) => job.status === 'Running'); + const isJobPaused = jobs.some((job) => job.status === 'Paused'); + const activeJobId = jobs.find((job) => job.status === 'Running')?.id; + + useLibrarySubscription(['jobs.progress', activeJobId as string], { + onData: setRealtimeUpdate, + enabled: !!activeJobId || !showChildJobs + }); + + useEffect(() => { + if (data.status !== 'Running') { + setRealtimeUpdate(null); + } + }, [data.status]); const tasks = totalTasks(jobs); const totalGroupTime = useTotalElapsedTimeText(jobs); @@ -35,7 +61,7 @@ function JobGroup({ data: { jobs, ...data }, clearJob }: JobGroupProps) { return (