From 033e61ac3329de46d471cefdb1d76f8a1652d61d Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Thu, 26 Oct 2023 01:48:29 -0300 Subject: [PATCH] [ENG-1320 | ENG-1340] Separate thumbnails FS location by library | Make the thumbnails actor report back its progress (#1656) * Renaming error type on ffmpeg subcrate * Version manager overhaul * Reworked thumbnail actor * Updating sharding scheme * New migration system for thumbnails * Updating search to new actor * Updating custom_uri to new thumbnail improvements * Updating library to new thumbnail stuff * LibraryId type alias * Updating indexer to new thumbnail actor * Updating watcher to new thumbnail actor * Update location manager to use LibraryId type alias * Updating location metadata to new LibraryId type alias * New LocationPubId type alias * Updating ephemeral walker to new thumbnail stuff * Updating media processor to new thumbnail actor * New thumbnailer actor state manager * Introducing the concept of job phases * Segregating the thumbnailer actor worker fn * Fixes on job pausing * Processing batches with progress reporting * Updated actor * Updated media processor * Small tweaks * Updating non indexed walker * Changing a UI string --- core/src/api/search.rs | 9 +- core/src/custom_uri/mod.rs | 3 +- core/src/job/mod.rs | 58 +- core/src/job/report.rs | 6 + core/src/job/worker.rs | 10 + core/src/library/library.rs | 4 +- core/src/library/mod.rs | 2 + core/src/location/indexer/indexer_job.rs | 3 +- core/src/location/indexer/shallow.rs | 3 +- core/src/location/manager/helpers.rs | 8 +- core/src/location/manager/watcher/utils.rs | 12 +- core/src/location/metadata.rs | 7 +- core/src/location/mod.rs | 2 + core/src/location/non_indexed.rs | 15 +- core/src/object/media/media_processor/job.rs | 340 ++++-- core/src/object/media/media_processor/mod.rs | 140 +-- .../object/media/media_processor/shallow.rs | 138 ++- core/src/object/media/thumbnail/actor.rs | 1046 +++-------------- core/src/object/media/thumbnail/clean_up.rs | 178 +++ core/src/object/media/thumbnail/directory.rs | 414 ++++++- core/src/object/media/thumbnail/mod.rs | 160 +-- core/src/object/media/thumbnail/process.rs | 420 +++++++ core/src/object/media/thumbnail/shard.rs | 17 +- core/src/object/media/thumbnail/state.rs | 222 ++++ core/src/object/media/thumbnail/worker.rs | 323 +++++ core/src/util/version_manager.rs | 128 +- crates/ffmpeg/src/error.rs | 2 +- crates/ffmpeg/src/lib.rs | 6 +- crates/ffmpeg/src/movie_decoder.rs | 43 +- crates/ffmpeg/src/thumbnailer.rs | 16 +- crates/ffmpeg/src/utils.rs | 9 +- packages/client/src/core.ts | 4 +- packages/client/src/utils/jobs/useJobInfo.tsx | 87 +- 33 files changed, 2426 insertions(+), 1409 deletions(-) create mode 100644 core/src/object/media/thumbnail/clean_up.rs create mode 100644 core/src/object/media/thumbnail/process.rs create mode 100644 core/src/object/media/thumbnail/state.rs create mode 100644 core/src/object/media/thumbnail/worker.rs diff --git a/core/src/api/search.rs b/core/src/api/search.rs index 4391a227e..bac086283 100644 --- a/core/src/api/search.rs +++ b/core/src/api/search.rs @@ -8,7 +8,7 @@ use crate::{ file_path_helper::{check_file_path_exists, IsolatedFilePathData}, non_indexed, LocationError, }, - object::media::thumbnail::get_thumb_key, + object::media::thumbnail::get_indexed_thumb_key, prisma::{self, file_path, location, object, tag, tag_on_object, PrismaClient}, }; @@ -589,7 +589,10 @@ pub fn mount() -> AlphaRouter { items.push(ExplorerItem::Path { has_local_thumbnail: thumbnail_exists_locally, - thumbnail_key: file_path.cas_id.as_ref().map(|i| get_thumb_key(i)), + thumbnail_key: file_path + .cas_id + .as_ref() + .map(|i| get_indexed_thumb_key(i, library.id)), item: file_path, }) } @@ -738,7 +741,7 @@ pub fn mount() -> AlphaRouter { items.push(ExplorerItem::Object { has_local_thumbnail: thumbnail_exists_locally, - thumbnail_key: cas_id.map(|i| get_thumb_key(i)), + thumbnail_key: cas_id.map(|i| get_indexed_thumb_key(i, library.id)), item: object, }); } diff --git a/core/src/custom_uri/mod.rs b/core/src/custom_uri/mod.rs index 605ec9a36..3b90366a8 100644 --- a/core/src/custom_uri/mod.rs +++ b/core/src/custom_uri/mod.rs @@ -2,6 +2,7 @@ use crate::{ api::{utils::InvalidateOperationEvent, CoreEvent}, library::Library, location::file_path_helper::{file_path_to_handle_custom_uri, IsolatedFilePathData}, + object::media::thumbnail::WEBP_EXTENSION, p2p::{sync::InstanceState, IdentityOrRemoteIdentity}, prisma::{file_path, location}, util::{db::*, InfallibleResponse}, @@ -160,7 +161,7 @@ pub fn router(node: Arc) -> Router<()> { // Prevent directory traversal attacks (Eg. requesting `../../../etc/passwd`) // For now we only support `webp` thumbnails. (path.starts_with(&thumbnail_path) - && path.extension() == Some(OsStr::new("webp"))) + && path.extension() == Some(WEBP_EXTENSION.as_ref())) .then_some(()) .ok_or_else(|| not_found(()))?; diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index 75c39e40a..6b4d21a31 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -482,9 +482,9 @@ impl DynJob for Job { let target_location = init.target_location(); - let stateful_job = Arc::new(init); + let mut stateful_job = Arc::new(init); - let ctx = Arc::new(ctx); + let mut ctx = Arc::new(ctx); let mut job_should_run = true; let job_init_time = Instant::now(); @@ -497,7 +497,6 @@ impl DynJob for Job { let init_time = Instant::now(); let init_task = { let ctx = Arc::clone(&ctx); - let stateful_job = Arc::clone(&stateful_job); spawn(async move { let mut new_data = None; let res = stateful_job.init(&ctx, &mut new_data).await; @@ -508,11 +507,15 @@ impl DynJob for Job { } } - (new_data, res) + (stateful_job, new_data, res) }) }; - let InitPhaseOutput { maybe_data, output } = handle_init_phase::( + let InitPhaseOutput { + stateful_job: returned_stateful_job, + maybe_data, + output, + } = handle_init_phase::( JobRunWorkTable { id: job_id, name: job_name, @@ -525,6 +528,8 @@ impl DynJob for Job { ) .await?; + stateful_job = returned_stateful_job; + match output { Ok(JobInitOutput { run_metadata: new_run_metadata, @@ -547,13 +552,13 @@ impl DynJob for Job { // Run the job until it's done or we get a command let data = if let Some(working_data) = working_data { - let working_data_arc = Arc::new(working_data); + let mut working_data_arc = Arc::new(working_data); // Job run phase while job_should_run && !steps.is_empty() { - let steps_len = steps.len(); + let steps_len: usize = steps.len(); - let run_metadata_arc = Arc::new(run_metadata); + let mut run_metadata_arc = Arc::new(run_metadata); let step = Arc::new(steps.pop_front().expect("just checked that we have steps")); let init_time = Instant::now(); @@ -584,6 +589,13 @@ impl DynJob for Job { let JobStepsPhaseOutput { steps: returned_steps, output, + step_arcs: + ( + returned_ctx, + returned_run_metadata_arc, + returned_working_data_arc, + returned_stateful_job, + ), } = handle_single_step::( JobRunWorkTable { id: job_id, @@ -593,10 +605,11 @@ impl DynJob for Job { }, &job_init_time, ( - Arc::clone(&ctx), - Arc::clone(&run_metadata_arc), - Arc::clone(&working_data_arc), - Arc::clone(&stateful_job), + // Must not hold extra references here; moving and getting back on function completion + ctx, + run_metadata_arc, + working_data_arc, + stateful_job, ), JobStepDataWorkTable { step_number, @@ -609,6 +622,10 @@ impl DynJob for Job { .await?; steps = returned_steps; + ctx = returned_ctx; + run_metadata_arc = returned_run_metadata_arc; + working_data_arc = returned_working_data_arc; + stateful_job = returned_stateful_job; run_metadata = Arc::try_unwrap(run_metadata_arc).expect("step already ran, no more refs"); @@ -751,6 +768,7 @@ impl DynJob for Job { } struct InitPhaseOutput { + stateful_job: Arc, maybe_data: Option, output: Result, JobError>, } @@ -763,6 +781,7 @@ struct JobRunWorkTable { } type InitTaskOutput = ( + Arc, Option<::Data>, Result< JobInitOutput<::RunMetadata, ::Step>, @@ -806,13 +825,17 @@ async fn handle_init_phase( ); return Err(join_error.into()); } - StreamMessage::InitResult(Ok((maybe_data, output))) => { + StreamMessage::InitResult(Ok((stateful_job, maybe_data, output))) => { debug!( "Init phase took {:?} Job ", init_time.elapsed() ); - return Ok(InitPhaseOutput { maybe_data, output }); + return Ok(InitPhaseOutput { + stateful_job, + maybe_data, + output, + }); } StreamMessage::NewCommand(WorkerCommand::IdentifyYourself(tx)) => { if tx @@ -978,6 +1001,7 @@ struct JobStepDataWorkTable { struct JobStepsPhaseOutput { steps: VecDeque, output: StepTaskOutput, + step_arcs: StepArcs, } type StepArcs = ( @@ -1033,7 +1057,11 @@ async fn handle_single_step( init_time.elapsed(), ); - return Ok(JobStepsPhaseOutput { steps, output }); + return Ok(JobStepsPhaseOutput { + steps, + output, + step_arcs: (worker_ctx, run_metadata, working_data, stateful_job), + }); } StreamMessage::NewCommand(WorkerCommand::IdentifyYourself(tx)) => { if tx diff --git a/core/src/job/report.rs b/core/src/job/report.rs index 7f671ef6f..db99e6df0 100644 --- a/core/src/job/report.rs +++ b/core/src/job/report.rs @@ -19,6 +19,7 @@ pub enum JobReportUpdate { TaskCount(usize), CompletedTaskCount(usize), Message(String), + Phase(String), } job::select!(job_without_data { @@ -57,6 +58,7 @@ pub struct JobReport { pub task_count: i32, pub completed_task_count: i32, + pub phase: String, pub message: String, pub estimated_completion: DateTime, } @@ -102,6 +104,7 @@ impl TryFrom for JobReport { .expect("corrupted database"), task_count: data.task_count.unwrap_or(0), completed_task_count: data.completed_task_count.unwrap_or(0), + phase: String::new(), message: String::new(), estimated_completion: data .date_estimated_completion @@ -144,6 +147,7 @@ impl TryFrom for JobReport { task_count: data.task_count.unwrap_or(0), completed_task_count: data.completed_task_count.unwrap_or(0), + phase: String::new(), message: String::new(), estimated_completion: data .date_estimated_completion @@ -169,6 +173,7 @@ impl JobReport { metadata: None, parent_id: None, completed_task_count: 0, + phase: String::new(), message: String::new(), estimated_completion: Utc::now(), } @@ -319,6 +324,7 @@ impl JobReportBuilder { metadata: self.metadata, parent_id: self.parent_id, completed_task_count: 0, + phase: String::new(), message: String::new(), estimated_completion: Utc::now(), } diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index fef095b64..2de4b7a4c 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -40,6 +40,7 @@ pub struct JobProgressEvent { pub library_id: Uuid, pub task_count: i32, pub completed_task_count: i32, + pub phase: String, pub message: String, pub estimated_completion: DateTime, } @@ -287,6 +288,14 @@ impl Worker { trace!("job {} message: {}", report.id, message); report.message = message; } + JobReportUpdate::Phase(phase) => { + trace!( + "changing Job phase: {} -> {phase}", + report.id, + report.phase + ); + report.phase = phase; + } } } @@ -323,6 +332,7 @@ impl Worker { task_count: report.task_count, completed_task_count: report.completed_task_count, estimated_completion: report.estimated_completion, + phase: report.phase.clone(), message: report.message.clone(), })); } diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 0a3c6209b..c6def1513 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -5,7 +5,7 @@ use crate::{ }, location::file_path_helper::{file_path_to_full_path, IsolatedFilePathData}, notifications, - object::{media::thumbnail::get_thumbnail_path, orphan_remover::OrphanRemoverActor}, + object::{media::thumbnail::get_indexed_thumbnail_path, orphan_remover::OrphanRemoverActor}, prisma::{file_path, location, PrismaClient}, sync, util::{db::maybe_missing, error::FileIOError}, @@ -120,7 +120,7 @@ impl Library { } pub async fn thumbnail_exists(&self, node: &Node, cas_id: &str) -> Result { - let thumb_path = get_thumbnail_path(node, cas_id); + let thumb_path = get_indexed_thumbnail_path(node, cas_id, self.id); match fs::metadata(&thumb_path).await { Ok(_) => Ok(true), diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index 61e937429..f599da21c 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -10,3 +10,5 @@ pub use config::*; pub use library::*; pub use manager::*; pub use name::*; + +pub type LibraryId = uuid::Uuid; diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 006f7aa37..5975146e8 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -225,11 +225,12 @@ impl StatefulJob for IndexerJobInit { ctx.node .thumbnailer - .remove_cas_ids( + .remove_indexed_cas_ids( to_remove .iter() .filter_map(|file_path| file_path.cas_id.clone()) .collect::>(), + ctx.library.id, ) .await; diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index 110a0a9b2..07ae1ec25 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -91,11 +91,12 @@ pub async fn shallow( let to_remove_count = to_remove.len(); node.thumbnailer - .remove_cas_ids( + .remove_indexed_cas_ids( to_remove .iter() .filter_map(|file_path| file_path.cas_id.clone()) .collect::>(), + library.id, ) .await; diff --git a/core/src/location/manager/helpers.rs b/core/src/location/manager/helpers.rs index e37b894e7..651b45d81 100644 --- a/core/src/location/manager/helpers.rs +++ b/core/src/location/manager/helpers.rs @@ -1,4 +1,9 @@ -use crate::{library::Library, prisma::location, util::db::maybe_missing, Node}; +use crate::{ + library::{Library, LibraryId}, + prisma::location, + util::db::maybe_missing, + Node, +}; use std::{ collections::{HashMap, HashSet}, @@ -13,7 +18,6 @@ use uuid::Uuid; use super::{watcher::LocationWatcher, LocationManagerError}; -type LibraryId = Uuid; type LocationAndLibraryKey = (location::id::Type, LibraryId); const LOCATION_CHECK_INTERVAL: Duration = Duration::from_secs(5); diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index ffbde062d..618be45e2 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -21,7 +21,7 @@ use crate::{ media::{ media_data_extractor::{can_extract_media_data_for_image, extract_media_data}, media_data_image_to_query, - thumbnail::get_thumbnail_path, + thumbnail::get_indexed_thumbnail_path, }, validation::hash::file_checksum, }, @@ -285,10 +285,11 @@ async fn inner_create_file( let extension = extension.clone(); let path = path.to_path_buf(); let node = node.clone(); + let library_id = library.id; spawn(async move { if let Err(e) = node .thumbnailer - .generate_single_thumbnail(&extension, cas_id, path) + .generate_single_indexed_thumbnail(&extension, cas_id, path, library_id) .await { error!("Failed to generate thumbnail in the watcher: {e:#?}"); @@ -522,10 +523,13 @@ async fn inner_update_file( if let Some(cas_id) = cas_id { let node = node.clone(); let path = full_path.to_path_buf(); + let library_id = library.id; spawn(async move { if let Err(e) = node .thumbnailer - .generate_single_thumbnail(&ext, cas_id, path) + .generate_single_indexed_thumbnail( + &ext, cas_id, path, library_id, + ) .await { error!("Failed to generate thumbnail in the watcher: {e:#?}"); @@ -534,7 +538,7 @@ async fn inner_update_file( } // remove the old thumbnail as we're generating a new one - let thumb_path = get_thumbnail_path(node, old_cas_id); + let thumb_path = get_indexed_thumbnail_path(node, old_cas_id, library.id); fs::remove_file(&thumb_path) .await .map_err(|e| FileIOError::from((thumb_path, e)))?; diff --git a/core/src/location/metadata.rs b/core/src/location/metadata.rs index 151003cfc..048685f01 100644 --- a/core/src/location/metadata.rs +++ b/core/src/location/metadata.rs @@ -1,3 +1,5 @@ +use crate::library::LibraryId; + use std::{ collections::{HashMap, HashSet}, path::{Path, PathBuf}, @@ -10,10 +12,9 @@ use tokio::{fs, io}; use tracing::error; use uuid::Uuid; -static SPACEDRIVE_LOCATION_METADATA_FILE: &str = ".spacedrive"; +use super::LocationPubId; -pub(super) type LibraryId = Uuid; -pub(super) type LocationPubId = Uuid; +static SPACEDRIVE_LOCATION_METADATA_FILE: &str = ".spacedrive"; #[derive(Serialize, Deserialize, Default, Debug)] struct LocationMetadata { diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index cd7fa9dc8..687e211e9 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -49,6 +49,8 @@ use metadata::SpacedriveLocationMetadataFile; use file_path_helper::IsolatedFilePathData; +pub type LocationPubId = Uuid; + // Location includes! location::include!(location_with_indexer_rules { indexer_rules: select { indexer_rule } diff --git a/core/src/location/non_indexed.rs b/core/src/location/non_indexed.rs index e9ad6c7a8..140e11144 100644 --- a/core/src/location/non_indexed.rs +++ b/core/src/location/non_indexed.rs @@ -3,10 +3,7 @@ use crate::{ library::Library, object::{ cas::generate_cas_id, - media::thumbnail::{ - actor::{BatchToProcess, GenerateThumbnailArgs}, - get_thumb_key, - }, + media::thumbnail::{get_ephemeral_thumb_key, BatchToProcess, GenerateThumbnailArgs}, }, prisma::location, util::error::FileIOError, @@ -200,9 +197,7 @@ pub async fn walk( )); } - let thumbnail_key = get_thumb_key(&cas_id); - - Some(thumbnail_key) + Some(get_ephemeral_thumb_key(&cas_id)) } else { None } @@ -231,11 +226,7 @@ pub async fn walk( thumbnails_to_generate.extend(document_thumbnails_to_generate); node.thumbnailer - .new_ephemeral_thumbnails_batch(BatchToProcess { - batch: thumbnails_to_generate, - should_regenerate: false, - in_background: false, - }) + .new_ephemeral_thumbnails_batch(BatchToProcess::new(thumbnails_to_generate, false, false)) .await; let mut locations = library diff --git a/core/src/object/media/media_processor/job.rs b/core/src/object/media/media_processor/job.rs index a5baacb27..f84237831 100644 --- a/core/src/object/media/media_processor/job.rs +++ b/core/src/object/media/media_processor/job.rs @@ -10,25 +10,30 @@ use crate::{ file_path_for_media_processor, IsolatedFilePathData, }, prisma::{location, PrismaClient}, - util::db::maybe_missing, + util::db::{maybe_missing, MissingFieldError}, + Node, }; use std::{ - future::Future, hash::Hash, path::{Path, PathBuf}, + time::Duration, }; +use async_channel as chan; +use futures::StreamExt; use itertools::Itertools; use prisma_client_rust::{raw, PrismaValue}; use sd_file_ext::extensions::Extension; use serde::{Deserialize, Serialize}; use serde_json::json; -use tracing::{debug, info}; +use tokio::time::sleep; +use tracing::{debug, error, info, trace, warn}; use super::{ - dispatch_thumbnails_for_processing, media_data_extractor, process, MediaProcessorError, - MediaProcessorMetadata, + media_data_extractor, process, + thumbnail::{self, GenerateThumbnailArgs}, + BatchToProcess, MediaProcessorError, MediaProcessorMetadata, }; const BATCH_SIZE: usize = 10; @@ -53,12 +58,20 @@ impl Hash for MediaProcessorJobInit { pub struct MediaProcessorJobData { location_path: PathBuf, to_process_path: PathBuf, + #[serde(skip, default)] + maybe_thumbnailer_progress_rx: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum MediaProcessorJobStep { + ExtractMediaData(Vec), + WaitThumbnails(usize), } #[async_trait::async_trait] impl StatefulJob for MediaProcessorJobInit { type Data = MediaProcessorJobData; - type Step = Vec; + type Step = MediaProcessorJobStep; type RunMetadata = MediaProcessorMetadata; const NAME: &'static str = "media_processor"; @@ -113,30 +126,52 @@ impl StatefulJob for MediaProcessorJobInit { "Searching for media files in location {location_id} at directory \"{iso_file_path}\"" ); - dispatch_thumbnails_for_processing( + let thumbs_to_process_count = dispatch_thumbnails_for_processing( location_id, &location_path, &iso_file_path, &ctx.library, &ctx.node, false, - get_all_children_files_by_extensions, ) .await?; + let maybe_thumbnailer_progress_rx = if thumbs_to_process_count > 0 { + let (progress_tx, progress_rx) = chan::unbounded(); + + ctx.node + .thumbnailer + .register_reporter(location_id, progress_tx) + .await; + + Some(progress_rx) + } else { + None + }; + let file_paths = get_files_for_media_data_extraction(db, &iso_file_path).await?; let total_files = file_paths.len(); - let chunked_files = file_paths - .into_iter() - .chunks(BATCH_SIZE) - .into_iter() - .map(|chunk| chunk.collect::>()) - .collect::>(); + let chunked_files = + file_paths + .into_iter() + .chunks(BATCH_SIZE) + .into_iter() + .map(|chunk| chunk.collect::>()) + .map(MediaProcessorJobStep::ExtractMediaData) + .chain( + [(thumbs_to_process_count > 0).then_some( + MediaProcessorJobStep::WaitThumbnails(thumbs_to_process_count as usize), + )] + .into_iter() + .flatten(), + ) + .collect::>(); ctx.progress(vec![ JobReportUpdate::TaskCount(total_files), + JobReportUpdate::Phase("media_data".to_string()), JobReportUpdate::Message(format!( "Preparing to process {total_files} files in {} chunks", chunked_files.len() @@ -146,35 +181,85 @@ impl StatefulJob for MediaProcessorJobInit { *data = Some(MediaProcessorJobData { location_path, to_process_path, + maybe_thumbnailer_progress_rx, }); - Ok(chunked_files.into()) + Ok(( + Self::RunMetadata { + thumbs_processed: thumbs_to_process_count, + ..Default::default() + }, + chunked_files, + ) + .into()) } async fn execute_step( &self, ctx: &WorkerContext, - CurrentStep { - step: file_paths, - step_number, - }: CurrentStep<'_, Self::Step>, + CurrentStep { step, step_number }: CurrentStep<'_, Self::Step>, data: &Self::Data, _: &Self::RunMetadata, ) -> Result, JobError> { - process( - file_paths, - self.location.id, - &data.location_path, - &ctx.library.db, - &|completed_count| { - ctx.progress(vec![JobReportUpdate::CompletedTaskCount( - step_number * BATCH_SIZE + completed_count, - )]); - }, - ) - .await - .map(Into::into) - .map_err(Into::into) + match step { + MediaProcessorJobStep::ExtractMediaData(file_paths) => process( + file_paths, + self.location.id, + &data.location_path, + &ctx.library.db, + &|completed_count| { + ctx.progress(vec![JobReportUpdate::CompletedTaskCount( + step_number * BATCH_SIZE + completed_count, + )]); + }, + ) + .await + .map(Into::into) + .map_err(Into::into), + MediaProcessorJobStep::WaitThumbnails(total_thumbs) => { + ctx.progress(vec![ + JobReportUpdate::TaskCount(*total_thumbs), + JobReportUpdate::Phase("thumbnails".to_string()), + JobReportUpdate::Message(format!( + "Waiting for processing of {total_thumbs} thumbnails", + )), + ]); + + let mut progress_rx = + if let Some(progress_rx) = data.maybe_thumbnailer_progress_rx.clone() { + progress_rx + } else { + let (progress_tx, progress_rx) = chan::unbounded(); + + ctx.node + .thumbnailer + .register_reporter(self.location.id, progress_tx) + .await; + + progress_rx + }; + + let mut total_completed = 0; + + while let Some((completed, total)) = progress_rx.next().await { + trace!("Received progress update from thumbnailer: {completed}/{total}",); + ctx.progress(vec![JobReportUpdate::CompletedTaskCount( + completed as usize, + )]); + total_completed = completed; + } + + if progress_rx.is_closed() && total_completed < *total_thumbs as u32 { + warn!( + "Thumbnailer progress reporter channel closed before all thumbnails were + processed, job will wait a bit waiting for a shutdown signal from manager" + ); + sleep(Duration::from_secs(5)).await; + } + + Ok(None.into()) + } + } } async fn finalize( @@ -200,6 +285,108 @@ impl StatefulJob for MediaProcessorJobInit { } } +async fn dispatch_thumbnails_for_processing( + location_id: location::id::Type, + location_path: impl AsRef, + parent_iso_file_path: &IsolatedFilePathData<'_>, + library: &Library, + node: &Node, + should_regenerate: bool, +) -> Result { + let Library { db, .. } = library; + + let location_path = location_path.as_ref(); + + let file_paths = get_all_children_files_by_extensions( + db, + parent_iso_file_path, + &thumbnail::ALL_THUMBNAILABLE_EXTENSIONS, + ) + .await?; + + if file_paths.is_empty() { + return Ok(0); + } + + let mut current_batch = Vec::with_capacity(16); + + // PDF thumbnails are currently way slower so we process them by last + let mut pdf_thumbs = Vec::with_capacity(16); + + let mut current_materialized_path = None; + + let mut in_background = false; + + let mut thumbs_count = 0; + + for file_path in file_paths { + // Initializing current_materialized_path with the first file_path materialized_path + if current_materialized_path.is_none() { + current_materialized_path = file_path.materialized_path.clone(); + } + + if file_path.materialized_path != current_materialized_path + && (!current_batch.is_empty() || !pdf_thumbs.is_empty()) + { + // Now we found a different materialized_path so we dispatch the current batch and start a new one + + thumbs_count += current_batch.len() as u32; + + node.thumbnailer + .new_indexed_thumbnails_batch_with_ticket( + BatchToProcess::new(current_batch, should_regenerate, in_background), + library.id, + location_id, + ) + .await; + + // We moved our vec so we need a new + current_batch = Vec::with_capacity(16); + in_background = true; // Only the first batch should be processed in foreground + + // Exchaging for the first different materialized_path + current_materialized_path = file_path.materialized_path.clone(); + } + + let file_path_id = file_path.id; + if let Err(e) = add_to_batch( + location_id, + location_path, + file_path, + &mut current_batch, + &mut pdf_thumbs, + ) { + error!("Error adding file_path to thumbnail batch: {e:#?}"); + } + } + + // Dispatching the last batch + if !current_batch.is_empty() { + thumbs_count += current_batch.len() as u32; + node.thumbnailer + .new_indexed_thumbnails_batch_with_ticket( + BatchToProcess::new(current_batch, should_regenerate, in_background), + library.id, + location_id, + ) + .await; + } + + // We now put the pdf_thumbs to be processed by last + if !pdf_thumbs.is_empty() { + thumbs_count += pdf_thumbs.len() as u32; + node.thumbnailer + .new_indexed_thumbnails_batch_with_ticket( + BatchToProcess::new(pdf_thumbs, should_regenerate, in_background), + library.id, + location_id, + ) + .await; + } + + Ok(thumbs_count) +} + async fn get_files_for_media_data_extraction( db: &PrismaClient, parent_iso_file_path: &IsolatedFilePathData<'_>, @@ -213,22 +400,16 @@ async fn get_files_for_media_data_extraction( .map_err(Into::into) } -fn get_all_children_files_by_extensions<'d, 'p, 'e, 'ret>( - db: &'d PrismaClient, - parent_iso_file_path: &'p IsolatedFilePathData<'_>, - extensions: &'e [Extension], -) -> impl Future, MediaProcessorError>> + 'ret -where - 'd: 'ret, - 'p: 'ret, - 'e: 'ret, -{ - async move { - // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite - // We have no data coming from the user, so this is sql injection safe - db._query_raw(raw!( - &format!( - "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id +async fn get_all_children_files_by_extensions( + db: &PrismaClient, + parent_iso_file_path: &IsolatedFilePathData<'_>, + extensions: &[Extension], +) -> Result, MediaProcessorError> { + // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite + // We have no data coming from the user, so this is sql injection safe + db._query_raw(raw!( + &format!( + "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id FROM file_path WHERE location_id={{}} @@ -236,24 +417,47 @@ where AND LOWER(extension) IN ({}) AND materialized_path LIKE {{}} ORDER BY materialized_path ASC", - // Orderind by materialized_path so we can prioritize processing the first files - // in the above part of the directories tree - extensions - .iter() - .map(|ext| format!("LOWER('{ext}')")) - .collect::>() - .join(",") - ), - PrismaValue::Int(parent_iso_file_path.location_id() as i64), - PrismaValue::String(format!( - "{}%", - parent_iso_file_path - .materialized_path_for_children() - .expect("sub path iso_file_path must be a directory") - )) + // Orderind by materialized_path so we can prioritize processing the first files + // in the above part of the directories tree + extensions + .iter() + .map(|ext| format!("LOWER('{ext}')")) + .collect::>() + .join(",") + ), + PrismaValue::Int(parent_iso_file_path.location_id() as i64), + PrismaValue::String(format!( + "{}%", + parent_iso_file_path + .materialized_path_for_children() + .expect("sub path iso_file_path must be a directory") )) - .exec() - .await - .map_err(Into::into) - } + )) + .exec() + .await + .map_err(Into::into) +} + +fn add_to_batch( + location_id: location::id::Type, + location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter + file_path: file_path_for_media_processor::Data, + current_batch: &mut Vec, + pdf_thumbs: &mut Vec, +) -> Result<(), MissingFieldError> { + let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone(); + + let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?; + let full_path = location_path.join(&iso_file_path); + + let extension = iso_file_path.extension(); + let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path); + + if extension != "pdf" { + current_batch.push(args); + } else { + pdf_thumbs.push(args); + } + + Ok(()) } diff --git a/core/src/object/media/media_processor/mod.rs b/core/src/object/media/media_processor/mod.rs index 2cfd998d1..ce5ee2a2d 100644 --- a/core/src/object/media/media_processor/mod.rs +++ b/core/src/object/media/media_processor/mod.rs @@ -1,17 +1,11 @@ use crate::{ job::{JobRunErrors, JobRunMetadata}, - library::Library, - location::file_path_helper::{ - file_path_for_media_processor, FilePathError, IsolatedFilePathData, - }, - util::db::{maybe_missing, MissingFieldError}, - Node, + location::file_path_helper::{file_path_for_media_processor, FilePathError}, }; -use sd_file_ext::extensions::Extension; use sd_prisma::prisma::{location, PrismaClient}; -use std::{future::Future, path::Path}; +use std::path::Path; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -19,11 +13,7 @@ use tracing::error; use super::{ media_data_extractor::{self, MediaDataError, MediaDataExtractorMetadata}, - thumbnail::{ - self, - actor::{BatchToProcess, GenerateThumbnailArgs}, - ThumbnailerError, - }, + thumbnail::{self, BatchToProcess, ThumbnailerError}, }; mod job; @@ -51,11 +41,15 @@ pub enum MediaProcessorError { #[derive(Debug, Serialize, Deserialize, Default)] pub struct MediaProcessorMetadata { media_data: MediaDataExtractorMetadata, + thumbs_processed: u32, } impl From for MediaProcessorMetadata { fn from(media_data: MediaDataExtractorMetadata) -> Self { - Self { media_data } + Self { + media_data, + thumbs_processed: 0, + } } } @@ -63,126 +57,10 @@ impl JobRunMetadata for MediaProcessorMetadata { fn update(&mut self, new_data: Self) { self.media_data.extracted += new_data.media_data.extracted; self.media_data.skipped += new_data.media_data.skipped; + self.thumbs_processed += new_data.thumbs_processed; } } -// `thumbs_fetcher_fn` MUST return file_paths ordered by `materialized_path` for optimal results -async fn dispatch_thumbnails_for_processing<'d, 'p, 'e, 'ret, F>( - location_id: location::id::Type, - location_path: impl AsRef, - parent_iso_file_path: &'p IsolatedFilePathData<'_>, - library: &'d Library, - node: &Node, - should_regenerate: bool, - thumbs_fetcher_fn: impl Fn(&'d PrismaClient, &'p IsolatedFilePathData<'_>, &'e [Extension]) -> F, -) -> Result<(), MediaProcessorError> -where - 'd: 'ret, - 'p: 'ret, - 'e: 'ret, - F: Future, MediaProcessorError>> - + 'ret, -{ - let Library { db, .. } = library; - - let location_path = location_path.as_ref(); - - let file_paths = thumbs_fetcher_fn( - db, - parent_iso_file_path, - &thumbnail::ALL_THUMBNAILABLE_EXTENSIONS, - ) - .await?; - - let mut current_batch = Vec::with_capacity(16); - - // PDF thumbnails are currently way slower so we process them by last - let mut pdf_thumbs = Vec::with_capacity(16); - - let mut current_materialized_path = None; - - let mut in_background = false; - - for file_path in file_paths { - // Initializing current_materialized_path with the first file_path materialized_path - if current_materialized_path.is_none() { - current_materialized_path = file_path.materialized_path.clone(); - } - - if file_path.materialized_path != current_materialized_path - && (!current_batch.is_empty() || !pdf_thumbs.is_empty()) - { - // Now we found a different materialized_path so we dispatch the current batch and start a new one - - // We starting by appending all pdfs and leaving the vec clean to be reused - current_batch.append(&mut pdf_thumbs); - - node.thumbnailer - .new_indexed_thumbnails_batch(BatchToProcess { - batch: current_batch, - should_regenerate, - in_background, - }) - .await; - - // We moved our vec so we need a new - current_batch = Vec::with_capacity(16); - in_background = true; // Only the first batch should be processed in foreground - - // Exchaging for the first different materialized_path - current_materialized_path = file_path.materialized_path.clone(); - } - - let file_path_id = file_path.id; - if let Err(e) = add_to_batch( - location_id, - location_path, - file_path, - &mut current_batch, - &mut pdf_thumbs, - ) { - error!("Error adding file_path to thumbnail batch: {e:#?}"); - } - } - - // Dispatching the last batch - if !current_batch.is_empty() { - node.thumbnailer - .new_indexed_thumbnails_batch(BatchToProcess { - batch: current_batch, - should_regenerate, - in_background, - }) - .await; - } - - Ok(()) -} - -fn add_to_batch( - location_id: location::id::Type, - location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter - file_path: file_path_for_media_processor::Data, - current_batch: &mut Vec, - pdf_thumbs: &mut Vec, -) -> Result<(), MissingFieldError> { - let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone(); - - let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?; - let full_path = location_path.join(&iso_file_path); - - let extension = iso_file_path.extension(); - let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path); - - if extension != "pdf" { - current_batch.push(args); - } else { - pdf_thumbs.push(args); - } - - Ok(()) -} - pub async fn process( files_paths: &[file_path_for_media_processor::Data], location_id: location::id::Type, diff --git a/core/src/object/media/media_processor/shallow.rs b/core/src/object/media/media_processor/shallow.rs index 19ebc98ef..258aea154 100644 --- a/core/src/object/media/media_processor/shallow.rs +++ b/core/src/object/media/media_processor/shallow.rs @@ -6,13 +6,14 @@ use crate::{ ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, file_path_for_media_processor, IsolatedFilePathData, }, + object::media::thumbnail::GenerateThumbnailArgs, prisma::{location, PrismaClient}, util::db::maybe_missing, Node, }; use std::{ - future::Future, + cmp::Ordering, path::{Path, PathBuf}, }; @@ -22,8 +23,8 @@ use sd_file_ext::extensions::Extension; use tracing::{debug, error}; use super::{ - dispatch_thumbnails_for_processing, media_data_extractor::{self, process}, + thumbnail::{self, BatchToProcess}, MediaProcessorError, MediaProcessorMetadata, }; @@ -75,7 +76,6 @@ pub async fn shallow( library, node, false, - get_files_by_extensions, ) .await?; @@ -132,43 +132,109 @@ async fn get_files_for_media_data_extraction( .map_err(Into::into) } -fn get_files_by_extensions<'d, 'p, 'e, 'ret>( - db: &'d PrismaClient, - parent_iso_file_path: &'p IsolatedFilePathData<'_>, - extensions: &'e [Extension], -) -> impl Future, MediaProcessorError>> + 'ret -where - 'd: 'ret, - 'p: 'ret, - 'e: 'ret, -{ - async move { - // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite - // We have no data coming from the user, so this is sql injection safe - db._query_raw(raw!( - &format!( - "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id +async fn dispatch_thumbnails_for_processing( + location_id: location::id::Type, + location_path: impl AsRef, + parent_iso_file_path: &IsolatedFilePathData<'_>, + library: &Library, + node: &Node, + should_regenerate: bool, +) -> Result<(), MediaProcessorError> { + let Library { db, .. } = library; + + let location_path = location_path.as_ref(); + + let file_paths = get_files_by_extensions( + db, + parent_iso_file_path, + &thumbnail::ALL_THUMBNAILABLE_EXTENSIONS, + ) + .await?; + + let current_batch = file_paths + .into_iter() + .filter_map(|file_path| { + if let Some(cas_id) = file_path.cas_id.as_ref() { + Some((cas_id.clone(), file_path)) + } else { + error!("File path has no cas_id, skipping", file_path.id); + None + } + }) + .filter_map(|(cas_id, file_path)| { + let file_path_id = file_path.id; + IsolatedFilePathData::try_from((location_id, file_path)) + .map_err(|e| { + error!("Failed to extract isolated file path data from file path : {e:#?}"); + }) + .ok() + .map(|iso_file_path| (cas_id, iso_file_path)) + }) + .map(|(cas_id, iso_file_path)| { + let full_path = location_path.join(&iso_file_path); + + let extension = iso_file_path.extension().to_string(); + + ( + GenerateThumbnailArgs::new(extension.clone(), cas_id, full_path), + extension, + ) + }) + .sorted_by(|(_, ext_a), (_, ext_b)| + // This will put PDF files by last as they're currently way slower to be processed + // FIXME(fogodev): Remove this sort when no longer needed + match (*ext_a == "pdf", *ext_b == "pdf") { + (true, true) => Ordering::Equal, + (false, true) => Ordering::Less, + (true, false) => Ordering::Greater, + (false, false) => Ordering::Equal, + }) + .map(|(args, _)| args) + .collect::>(); + + // Let's not send an empty batch lol + if !current_batch.is_empty() { + node.thumbnailer + .new_indexed_thumbnails_batch( + BatchToProcess::new(current_batch, should_regenerate, true), + library.id, + ) + .await; + } + + Ok(()) +} + +async fn get_files_by_extensions( + db: &PrismaClient, + parent_iso_file_path: &IsolatedFilePathData<'_>, + extensions: &[Extension], +) -> Result, MediaProcessorError> { + // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite + // We have no data coming from the user, so this is sql injection safe + db._query_raw(raw!( + &format!( + "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id FROM file_path WHERE location_id={{}} AND cas_id IS NOT NULL AND LOWER(extension) IN ({}) AND materialized_path = {{}}", - extensions - .iter() - .map(|ext| format!("LOWER('{ext}')")) - .collect::>() - .join(",") - ), - PrismaValue::Int(parent_iso_file_path.location_id() as i64), - PrismaValue::String( - parent_iso_file_path - .materialized_path_for_children() - .expect("sub path iso_file_path must be a directory") - ) - )) - .exec() - .await - .map_err(Into::into) - } + extensions + .iter() + .map(|ext| format!("LOWER('{ext}')")) + .collect::>() + .join(",") + ), + PrismaValue::Int(parent_iso_file_path.location_id() as i64), + PrismaValue::String( + parent_iso_file_path + .materialized_path_for_children() + .expect("sub path iso_file_path must be a directory") + ) + )) + .exec() + .await + .map_err(Into::into) } diff --git a/core/src/object/media/thumbnail/actor.rs b/core/src/object/media/thumbnail/actor.rs index 4185adeed..4d25c3097 100644 --- a/core/src/object/media/thumbnail/actor.rs +++ b/core/src/object/media/thumbnail/actor.rs @@ -1,76 +1,41 @@ use crate::{ api::CoreEvent, - library::{Libraries, LibraryManagerEvent}, - object::media::thumbnail::{ - can_generate_thumbnail_for_document, can_generate_thumbnail_for_image, - generate_image_thumbnail, get_shard_hex, get_thumb_key, ThumbnailerError, - }, + library::{Libraries, LibraryId, LibraryManagerEvent}, util::error::{FileIOError, NonUtf8PathError}, }; -use sd_file_ext::extensions::{DocumentExtension, ImageExtension}; -use sd_prisma::prisma::{file_path, PrismaClient}; -use serde::{Deserialize, Serialize}; +use sd_prisma::prisma::{location, PrismaClient}; use std::{ - collections::{HashMap, HashSet, VecDeque}, - ffi::OsStr, path::{Path, PathBuf}, - str::FromStr, sync::Arc, - time::{Duration, SystemTime}, }; use async_channel as chan; -use futures::stream::FuturesUnordered; -use futures_concurrency::{ - future::{Join, Race, TryJoin}, - stream::Merge, -}; use once_cell::sync::OnceCell; use thiserror::Error; use tokio::{ - fs, io, spawn, + fs, spawn, sync::{broadcast, oneshot, Mutex}, - time::{interval, interval_at, sleep, timeout, Instant, MissedTickBehavior}, + time::{sleep, Instant}, }; -use tokio_stream::{wrappers::IntervalStream, StreamExt}; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error}; use uuid::Uuid; -use super::{init_thumbnail_dir, THUMBNAIL_CACHE_DIR_NAME}; - -const ONE_SEC: Duration = Duration::from_secs(1); -const THIRTY_SECS: Duration = Duration::from_secs(30); -const HALF_HOUR: Duration = Duration::from_secs(30 * 60); -const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60); -const SAVE_STATE_FILE: &str = "thumbs_to_process.bin"; +use super::{ + directory::init_thumbnail_dir, + process::{generate_thumbnail, ThumbData}, + state::RegisterReporter, + worker::{worker, WorkerChannels}, + BatchToProcess, ThumbnailKind, ThumbnailerError, ONE_SEC, THUMBNAIL_CACHE_DIR_NAME, +}; static BATCH_SIZE: OnceCell = OnceCell::new(); -#[derive(Debug, Serialize, Deserialize)] -pub struct GenerateThumbnailArgs { - pub extension: String, - pub cas_id: String, - pub path: PathBuf, -} - -impl GenerateThumbnailArgs { - pub fn new(extension: String, cas_id: String, path: PathBuf) -> Self { - Self { - extension, - cas_id, - path, - } - } -} - #[derive(Error, Debug)] -enum Error { +pub(super) enum ActorError { #[error("database error")] Database(#[from] prisma_client_rust::QueryError), - #[error("missing file name: {}", .0.display())] - MissingFileName(Box), #[error(transparent)] FileIO(#[from] FileIOError), #[error(transparent)] @@ -78,125 +43,50 @@ enum Error { } #[derive(Debug)] -enum DatabaseMessage { +pub(super) enum DatabaseMessage { Add(Uuid, Arc), Update(Uuid, Arc), Remove(Uuid), } -#[derive(Debug, Serialize, Deserialize)] -pub struct BatchToProcess { - pub batch: Vec, - pub should_regenerate: bool, - pub in_background: bool, -} - -#[derive(Debug, Serialize, Deserialize)] -enum ProcessingKind { - Indexed, - Ephemeral, -} - // Thumbnails directory have the following structure: // thumbnails/ // ├── version.txt -// └── [0..2]/ # sharding -// └── .webp +// ├── thumbs_to_process.bin # processing save state +// ├── ephemeral/ # ephemeral ones have it's own directory +// │ └── [0..3]/ # sharding +// │ └── .webp +// └── / # we segregate thumbnails by library +// └── [0..3]/ # sharding +// └── .webp pub struct Thumbnailer { - thumbnails_directory: PathBuf, - cas_ids_to_delete_tx: chan::Sender>, - thumbnails_to_generate_tx: chan::Sender<(BatchToProcess, ProcessingKind)>, + thumbnails_directory: Arc, + cas_ids_to_delete_tx: chan::Sender<(Vec, ThumbnailKind)>, + thumbnails_to_generate_tx: chan::Sender<(BatchToProcess, ThumbnailKind)>, + progress_reporter_tx: chan::Sender, last_single_thumb_generated: Mutex, reporter: broadcast::Sender, cancel_tx: chan::Sender>, } -#[derive(Debug, Serialize, Deserialize)] -struct ThumbsProcessingSaveState { - ephemeral_cas_ids: HashSet, - // This queues doubles as LIFO and FIFO, assuming LIFO in case of users asking for a new batch - // by entering a new directory in the explorer, otherwise processing as FIFO - queue: VecDeque<(BatchToProcess, ProcessingKind)>, - // These below are FIFO queues, so we can process leftovers from the previous batch first - indexed_leftovers_queue: VecDeque, - ephemeral_leftovers_queue: VecDeque, -} - -impl Default for ThumbsProcessingSaveState { - fn default() -> Self { - Self { - ephemeral_cas_ids: HashSet::with_capacity(128), - queue: VecDeque::with_capacity(32), - indexed_leftovers_queue: VecDeque::with_capacity(8), - ephemeral_leftovers_queue: VecDeque::with_capacity(8), - } - } -} - -impl ThumbsProcessingSaveState { - async fn load(thumbnails_directory: impl AsRef) -> Self { - let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE); - - match fs::read(&resume_file).await { - Ok(bytes) => { - let this = rmp_serde::from_slice::(&bytes).unwrap_or_else(|e| { - error!("Failed to deserialize save state at thumbnailer actor: {e:#?}"); - Self::default() - }); - - if let Err(e) = fs::remove_file(&resume_file).await { - error!( - "Failed to remove save state file at thumbnailer actor: {:#?}", - FileIOError::from((resume_file, e)) - ); - } - - this - } - Err(e) if e.kind() == io::ErrorKind::NotFound => { - trace!("No save state found at thumbnailer actor"); - Self::default() - } - Err(e) => { - error!( - "Failed to read save state at thumbnailer actor: {:#?}", - FileIOError::from((resume_file, e)) - ); - Self::default() - } - } - } - - async fn store(self, thumbnails_directory: impl AsRef) { - let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE); - - let Ok(bytes) = rmp_serde::to_vec_named(&self).map_err(|e| { - error!("Failed to serialize save state at thumbnailer actor: {e:#?}"); - }) else { - return; - }; - - if let Err(e) = fs::write(&resume_file, bytes).await { - error!( - "Failed to write save state at thumbnailer actor: {:#?}", - FileIOError::from((resume_file, e)) - ); - } - } -} - impl Thumbnailer { pub async fn new( data_dir: PathBuf, - lm: Arc, + libraries_manager: Arc, reporter: broadcast::Sender, ) -> Self { - let thumbnails_directory = init_thumbnail_dir(&data_dir).await.unwrap_or_else(|e| { - error!("Failed to initialize thumbnail directory: {e:#?}"); - let mut thumbnails_directory = data_dir; - thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME); - thumbnails_directory - }); + let thumbnails_directory = Arc::new( + init_thumbnail_dir(&data_dir, Arc::clone(&libraries_manager)) + .await + .unwrap_or_else(|e| { + error!("Failed to initialize thumbnail directory: {e:#?}"); + let mut thumbnails_directory = data_dir; + thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME); + thumbnails_directory + }), + ); + + let (progress_management_tx, progress_management_rx) = chan::bounded(16); let (databases_tx, databases_rx) = chan::bounded(4); let (thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) = chan::unbounded(); @@ -217,77 +107,91 @@ impl Thumbnailer { )) .ok(); - let inner_cancel_rx = cancel_rx.clone(); - let inner_thumbnails_directory = thumbnails_directory.clone(); - let inner_reporter = reporter.clone(); - spawn(async move { - while let Err(e) = spawn(Self::worker( - inner_reporter.clone(), - inner_thumbnails_directory.clone(), - databases_rx.clone(), - cas_ids_to_delete_rx.clone(), - ephemeral_thumbnails_to_generate_rx.clone(), - inner_cancel_rx.clone(), - )) - .await - { - error!( - "Error on Thumbnail Remover Actor; \ + spawn({ + let progress_management_rx = progress_management_rx.clone(); + let cancel_rx = cancel_rx.clone(); + let thumbnails_directory = Arc::clone(&thumbnails_directory); + let reporter = reporter.clone(); + + async move { + while let Err(e) = spawn(worker( + *BATCH_SIZE + .get() + .expect("BATCH_SIZE is set at thumbnailer new method"), + reporter.clone(), + thumbnails_directory.clone(), + WorkerChannels { + progress_management_rx: progress_management_rx.clone(), + databases_rx: databases_rx.clone(), + cas_ids_to_delete_rx: cas_ids_to_delete_rx.clone(), + thumbnails_to_generate_rx: ephemeral_thumbnails_to_generate_rx.clone(), + cancel_rx: cancel_rx.clone(), + }, + )) + .await + { + error!( + "Error on Thumbnail Remover Actor; \ Error: {e}; \ Restarting the worker loop...", - ); + ); + } } }); spawn({ - let rx = lm.rx.clone(); + let rx = libraries_manager.rx.clone(); + let thumbnails_directory = Arc::clone(&thumbnails_directory); + async move { - if let Err(err) = rx - .subscribe(move |event| { + let subscribe_res = rx + .subscribe(|event| { let databases_tx = databases_tx.clone(); + let thumbnails_directory = &thumbnails_directory; + async move { match event { LibraryManagerEvent::Load(library) => { - if databases_tx + let library_dir = + thumbnails_directory.join(library.id.to_string()); + + if let Err(e) = fs::create_dir_all(&library_dir).await { + error!( + "Failed to create library dir for thumbnails: {:#?}", + FileIOError::from((library_dir, e)) + ); + } + + databases_tx .send(DatabaseMessage::Add( library.id, Arc::clone(&library.db), )) .await - .is_err() - { - error!("Thumbnail remover actor is dead") - } + .expect("critical thumbnailer error: databases channel closed on send add") } + LibraryManagerEvent::Edit(library) - | LibraryManagerEvent::InstancesModified(library) => { - if databases_tx - .send(DatabaseMessage::Update( - library.id, - Arc::clone(&library.db), - )) - .await - .is_err() - { - error!("Thumbnail remover actor is dead") - } - } - LibraryManagerEvent::Delete(library) => { - if databases_tx - .send(DatabaseMessage::Remove(library.id)) - .await - .is_err() - { - error!("Thumbnail remover actor is dead") - } - } + | LibraryManagerEvent::InstancesModified(library) => databases_tx + .send(DatabaseMessage::Update( + library.id, + Arc::clone(&library.db), + )) + .await + .expect("critical thumbnailer error: databases channel closed on send update"), + + LibraryManagerEvent::Delete(library) => databases_tx + .send(DatabaseMessage::Remove(library.id)) + .await + .expect("critical thumbnailer error: databases channel closed on send delete"), } } }) - .await - { - error!("Thumbnail remover actor has crashed with error: {err:?}") + .await; + + if subscribe_res.is_err() { + error!("Thumbnailer actor has crashed...") } } }); @@ -296,464 +200,109 @@ impl Thumbnailer { thumbnails_directory, cas_ids_to_delete_tx, thumbnails_to_generate_tx, + progress_reporter_tx: progress_management_tx, last_single_thumb_generated: Mutex::new(Instant::now()), reporter, cancel_tx, } } - async fn worker( - reporter: broadcast::Sender, - thumbnails_directory: PathBuf, - databases_rx: chan::Receiver, - cas_ids_to_delete_rx: chan::Receiver>, - thumbnails_to_generate_rx: chan::Receiver<(BatchToProcess, ProcessingKind)>, - cancel_rx: chan::Receiver>, - ) { - let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR); - to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut idle_interval = interval(ONE_SEC); - idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut databases = HashMap::new(); - - #[derive(Debug)] - enum StreamMessage { - RemovalTick, - ToDelete(Vec), - Database(DatabaseMessage), - NewBatch((BatchToProcess, ProcessingKind)), - Leftovers((BatchToProcess, ProcessingKind)), - NewEphemeralThumbnailCasIds(Vec), - Shutdown(oneshot::Sender<()>), - IdleTick, - } - - let ThumbsProcessingSaveState { - mut ephemeral_cas_ids, - mut queue, - mut indexed_leftovers_queue, - mut ephemeral_leftovers_queue, - } = ThumbsProcessingSaveState::load(&thumbnails_directory).await; - - let (ephemeral_thumbnails_cas_ids_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32); - let (leftovers_tx, leftovers_rx) = chan::bounded(8); - - let mut shutdown_leftovers_rx = leftovers_rx.clone(); - - let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1); - - let mut current_batch_processing_rx: Option> = None; - - let mut msg_stream = ( - databases_rx.map(StreamMessage::Database), - cas_ids_to_delete_rx.map(StreamMessage::ToDelete), - thumbnails_to_generate_rx.map(StreamMessage::NewBatch), - leftovers_rx.map(StreamMessage::Leftovers), - ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailCasIds), - IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick), - IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick), - cancel_rx.map(StreamMessage::Shutdown), - ) - .merge(); - - while let Some(msg) = msg_stream.next().await { - match msg { - StreamMessage::IdleTick => { - if let Some(done_rx) = current_batch_processing_rx.as_mut() { - // Checking if the previous run finished or was aborted to clean state - if matches!( - done_rx.try_recv(), - Ok(()) | Err(oneshot::error::TryRecvError::Closed) - ) { - current_batch_processing_rx = None; - } - } - - if current_batch_processing_rx.is_none() - && (!queue.is_empty() - || !indexed_leftovers_queue.is_empty() - || !ephemeral_leftovers_queue.is_empty()) - { - let (done_tx, done_rx) = oneshot::channel(); - current_batch_processing_rx = Some(done_rx); - - let batch_and_kind = if let Some(batch_and_kind) = queue.pop_front() { - batch_and_kind - } else if let Some(batch) = indexed_leftovers_queue.pop_front() { - // indexed leftovers have bigger priority - (batch, ProcessingKind::Indexed) - } else if let Some(batch) = ephemeral_leftovers_queue.pop_front() { - (batch, ProcessingKind::Ephemeral) - } else { - continue; - }; - - spawn(batch_processor( - thumbnails_directory.clone(), - batch_and_kind, - ephemeral_thumbnails_cas_ids_tx.clone(), - ProcessorControlChannels { - stop_rx: stop_older_processing_rx.clone(), - done_tx, - }, - leftovers_tx.clone(), - reporter.clone(), - )); - } - } - - StreamMessage::RemovalTick => { - // For any of them we process a clean up if a time since the last one already passed - if !databases.is_empty() { - if let Err(e) = Self::process_clean_up( - &thumbnails_directory, - databases.values(), - &ephemeral_cas_ids, - ) - .await - { - error!("Got an error when trying to clean stale thumbnails: {e:#?}"); - } - } - } - StreamMessage::ToDelete(cas_ids) => { - if !cas_ids.is_empty() { - if let Err(e) = - Self::remove_by_cas_ids(&thumbnails_directory, cas_ids).await - { - error!("Got an error when trying to remove thumbnails: {e:#?}"); - } - } - } - - StreamMessage::NewBatch((batch, kind)) => { - let in_background = batch.in_background; - - trace!( - "New {kind:?} batch to process in {}, size: {}", - if in_background { - "background" - } else { - "foreground" - }, - batch.batch.len() - ); - - if in_background { - queue.push_back((batch, kind)); - } else { - // If a processing must be in foreground, then it takes maximum priority - queue.push_front((batch, kind)); - } - - // Only sends stop signal if there is a batch being processed - if !in_background && current_batch_processing_rx.is_some() { - trace!("Sending stop signal to older processing"); - let (tx, rx) = oneshot::channel(); - - match stop_older_processing_tx.try_send(tx) { - Ok(()) => { - // We put a timeout here to avoid a deadlock in case the older processing already - // finished its batch - if timeout(ONE_SEC, rx).await.is_err() { - stop_older_processing_rx.recv().await.ok(); - } - } - Err(e) if e.is_full() => { - // The last signal we sent happened after a batch was already processed - // So we clean the channel and we're good to go. - stop_older_processing_rx.recv().await.ok(); - } - Err(_) => { - error!("Thumbnail remover actor died when trying to stop older processing"); - } - } - } - } - - StreamMessage::Leftovers((batch, kind)) => match kind { - ProcessingKind::Indexed => indexed_leftovers_queue.push_back(batch), - ProcessingKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch), - }, - - StreamMessage::Database(DatabaseMessage::Add(id, db)) - | StreamMessage::Database(DatabaseMessage::Update(id, db)) => { - databases.insert(id, db); - } - StreamMessage::Database(DatabaseMessage::Remove(id)) => { - databases.remove(&id); - } - StreamMessage::NewEphemeralThumbnailCasIds(cas_ids) => { - trace!("New ephemeral thumbnail cas ids: {}", cas_ids.len()); - ephemeral_cas_ids.extend(cas_ids); - } - StreamMessage::Shutdown(cancel_tx) => { - debug!("Thumbnail actor is shutting down..."); - - // First stopping the current batch processing - let (tx, rx) = oneshot::channel(); - match stop_older_processing_tx.try_send(tx) { - Ok(()) => { - // We put a timeout here to avoid a deadlock in case the older processing already - // finished its batch - if timeout(ONE_SEC, rx).await.is_err() { - stop_older_processing_rx.recv().await.ok(); - } - } - Err(e) if e.is_full() => { - // The last signal we sent happened after a batch was already processed - // So we clean the channel and we're good to go. - stop_older_processing_rx.recv().await.ok(); - } - Err(_) => { - error!( - "Thumbnail remover actor died when trying to stop older processing" - ); - } - } - - // Closing the leftovers channel to stop the batch processor as we already sent - // an stop signal - leftovers_tx.close(); - while let Some((batch, kind)) = shutdown_leftovers_rx.next().await { - match kind { - ProcessingKind::Indexed => indexed_leftovers_queue.push_back(batch), - ProcessingKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch), - } - } - - // Saving state - ThumbsProcessingSaveState { - ephemeral_cas_ids, - queue, - indexed_leftovers_queue, - ephemeral_leftovers_queue, - } - .store(thumbnails_directory) - .await; - - // Signaling that we're done shutting down - cancel_tx.send(()).ok(); - return; - } - } - } - } - - async fn remove_by_cas_ids( - thumbnails_directory: &Path, - cas_ids: Vec, - ) -> Result<(), Error> { - cas_ids - .into_iter() - .map(|cas_id| async move { - let thumbnail_path = - thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2])); - - trace!("Removing thumbnail: {}", thumbnail_path.display()); - - match fs::remove_file(&thumbnail_path).await { - Ok(()) => Ok(()), - Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), - Err(e) => Err(FileIOError::from((thumbnail_path, e))), - } - }) - .collect::>() - .try_join() - .await?; - - Ok(()) - } - - async fn process_clean_up( - thumbnails_directory: &Path, - databases: impl Iterator>, - ephemeral_cas_ids: &HashSet, - ) -> Result<(), Error> { - let databases = databases.collect::>(); - - fs::create_dir_all(&thumbnails_directory) + #[inline] + async fn new_batch(&self, batch: BatchToProcess, kind: ThumbnailKind) { + self.thumbnails_to_generate_tx + .send((batch, kind)) .await - .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; - let mut read_dir = fs::read_dir(thumbnails_directory) - .await - .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; - - while let Some(entry) = read_dir - .next_entry() - .await - .map_err(|e| FileIOError::from((thumbnails_directory, e)))? - { - let entry_path = entry.path(); - if !entry - .metadata() - .await - .map_err(|e| FileIOError::from((thumbnails_directory, e)))? - .is_dir() - { - continue; - } - - let mut thumbnails_paths_by_cas_id = HashMap::new(); - - let mut entry_read_dir = fs::read_dir(&entry_path) - .await - .map_err(|e| FileIOError::from((&entry_path, e)))?; - - while let Some(thumb_entry) = entry_read_dir - .next_entry() - .await - .map_err(|e| FileIOError::from((&entry_path, e)))? - { - let thumb_path = thumb_entry.path(); - - if thumb_path - .extension() - .and_then(OsStr::to_str) - .map_or(true, |ext| ext != "webp") - { - continue; - } - - let thumbnail_name = thumb_path - .file_stem() - .ok_or_else(|| Error::MissingFileName(entry.path().into_boxed_path()))? - .to_str() - .ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?; - - thumbnails_paths_by_cas_id.insert(thumbnail_name.to_string(), thumb_path); - } - - if thumbnails_paths_by_cas_id.is_empty() { - trace!( - "Removing empty thumbnails sharding directory: {}", - entry_path.display() - ); - fs::remove_dir(&entry_path) - .await - .map_err(|e| FileIOError::from((entry_path, e)))?; - - continue; - } - - let thumbs_found = thumbnails_paths_by_cas_id.len(); - - let mut thumbs_in_db_futs = databases - .iter() - .map(|db| { - db.file_path() - .find_many(vec![file_path::cas_id::in_vec( - thumbnails_paths_by_cas_id.keys().cloned().collect(), - )]) - .select(file_path::select!({ cas_id })) - .exec() - }) - .collect::>(); - - while let Some(maybe_thumbs_in_db) = thumbs_in_db_futs.next().await { - maybe_thumbs_in_db? - .into_iter() - .filter_map(|file_path| file_path.cas_id) - .for_each(|cas_id| { - thumbnails_paths_by_cas_id.remove(&cas_id); - }); - } - - thumbnails_paths_by_cas_id.retain(|cas_id, _| !ephemeral_cas_ids.contains(cas_id)); - - let now = SystemTime::now(); - - let removed_count = thumbnails_paths_by_cas_id - .into_values() - .map(|path| async move { - if let Ok(metadata) = fs::metadata(&path).await { - if metadata - .accessed() - .map(|when| { - now.duration_since(when) - .map(|duration| duration < ONE_WEEK) - .unwrap_or(false) - }) - .unwrap_or(false) - { - // If the thumbnail was accessed in the last week, we don't remove it yet - // as the file is probably still in use - return Ok(false); - } - } - - trace!("Removing stale thumbnail: {}", path.display()); - fs::remove_file(&path) - .await - .map(|()| true) - .map_err(|e| FileIOError::from((path, e))) - }) - .collect::>() - .try_join() - .await? - .into_iter() - .filter(|r| *r) - .count(); - - if thumbs_found == removed_count { - // if we removed all the thumnails we found, it means that the directory is empty - // and can be removed... - trace!( - "Removing empty thumbnails sharding directory: {}", - entry_path.display() - ); - fs::remove_dir(&entry_path) - .await - .map_err(|e| FileIOError::from((entry_path, e)))?; - } - } - - Ok(()) + .expect("critical thumbnailer error: failed to send new batch"); } #[inline] - async fn new_batch(&self, batch: BatchToProcess, kind: ProcessingKind) { - if self - .thumbnails_to_generate_tx - .send((batch, kind)) - .await - .is_err() - { - error!("Thumbnail remover actor is dead: Failed to send new batch"); - } - } - pub async fn new_ephemeral_thumbnails_batch(&self, batch: BatchToProcess) { - self.new_batch(batch, ProcessingKind::Ephemeral).await; + self.new_batch(batch, ThumbnailKind::Ephemeral).await } - pub async fn new_indexed_thumbnails_batch(&self, batch: BatchToProcess) { - self.new_batch(batch, ProcessingKind::Indexed).await; + #[inline] + pub async fn new_indexed_thumbnails_batch(&self, batch: BatchToProcess, library_id: LibraryId) { + self.new_batch(batch, ThumbnailKind::Indexed(library_id)) + .await } - pub async fn remove_cas_ids(&self, cas_ids: Vec) { - if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() { - error!("Thumbnail remover actor is dead: Failed to send cas ids to delete"); - } + #[inline] + pub async fn new_indexed_thumbnails_batch_with_ticket( + &self, + mut batch: BatchToProcess, + library_id: LibraryId, + location_id: location::id::Type, + ) { + batch.location_id = Some(location_id); + + self.new_batch(batch, ThumbnailKind::Indexed(library_id)) + .await; } + #[inline] + pub async fn register_reporter( + &self, + location_id: location::id::Type, + progress_tx: chan::Sender<(u32, u32)>, + ) { + self.progress_reporter_tx + .send((location_id, progress_tx)) + .await + .expect("critical thumbnailer error: failed to send register reporter fn"); + } + + #[inline] + async fn remove_cas_ids(&self, cas_ids: Vec, kind: ThumbnailKind) { + self.cas_ids_to_delete_tx + .send((cas_ids, kind)) + .await + .expect("critical thumbnailer error: failed to send cas ids to delete"); + } + + #[inline] + pub async fn remove_ephemeral_cas_ids(&self, cas_ids: Vec) { + self.remove_cas_ids(cas_ids, ThumbnailKind::Ephemeral).await + } + + #[inline] + pub async fn remove_indexed_cas_ids(&self, cas_ids: Vec, library_id: LibraryId) { + self.remove_cas_ids(cas_ids, ThumbnailKind::Indexed(library_id)) + .await + } + + #[inline] pub async fn shutdown(&self) { + let start = Instant::now(); let (tx, rx) = oneshot::channel(); - if self.cancel_tx.send(tx).await.is_err() { - error!("Thumbnail remover actor is dead: Failed to send shutdown signal"); - } else { - rx.await.ok(); - } + self.cancel_tx + .send(tx) + .await + .expect("critical thumbnailer error: failed to send shutdown signal"); + + rx.await + .expect("critical thumbnailer error: failed to receive shutdown signal response"); + + debug!("Thumbnailer has been shutdown in {:?}", start.elapsed()); } /// WARNING!!!! DON'T USE THIS METHOD IN A LOOP!!!!!!!!!!!!! It will be pretty slow on purpose! - pub async fn generate_single_thumbnail( + pub async fn generate_single_indexed_thumbnail( &self, extension: &str, cas_id: String, path: impl AsRef, + library_id: LibraryId, + ) -> Result<(), ThumbnailerError> { + self.generate_single_thumbnail(extension, cas_id, path, ThumbnailKind::Indexed(library_id)) + .await + } + + async fn generate_single_thumbnail( + &self, + extension: &str, + cas_id: String, + path: impl AsRef, + kind: ThumbnailKind, ) -> Result<(), ThumbnailerError> { let mut last_single_thumb_generated_guard = self.last_single_thumb_generated.lock().await; @@ -765,12 +314,15 @@ impl Thumbnailer { } let res = generate_thumbnail( - self.thumbnails_directory.clone(), - extension, - cas_id, - path, - false, - false, + self.thumbnails_directory.as_ref().clone(), + ThumbData { + extension, + cas_id, + path, + in_background: false, + should_regenerate: false, + kind, + }, self.reporter.clone(), ) .await @@ -781,227 +333,3 @@ impl Thumbnailer { res } } - -struct ProcessorControlChannels { - stop_rx: chan::Receiver>, - done_tx: oneshot::Sender<()>, -} - -async fn batch_processor( - thumbnails_directory: PathBuf, - ( - BatchToProcess { - batch, - should_regenerate, - in_background, - }, - kind, - ): (BatchToProcess, ProcessingKind), - generated_cas_ids_tx: chan::Sender>, - ProcessorControlChannels { stop_rx, done_tx }: ProcessorControlChannels, - leftovers_tx: chan::Sender<(BatchToProcess, ProcessingKind)>, - reporter: broadcast::Sender, -) { - trace!( - "Processing thumbnails batch of kind {kind:?} with size {} in {}", - batch.len(), - if in_background { - "background" - } else { - "foreground" - }, - ); - - // Tranforming to `VecDeque` so we don't need to move anything as we consume from the beginning - // This from is guaranteed to be O(1) - let mut queue = VecDeque::from(batch); - - enum RaceOutputs { - Processed, - Stop(oneshot::Sender<()>), - } - - // Need this borrow here to satisfy the async move below - let generated_cas_ids_tx = &generated_cas_ids_tx; - - while !queue.is_empty() { - let chunk = (0..*BATCH_SIZE - .get() - .expect("BATCH_SIZE is set at thumbnailer new method")) - .filter_map(|_| queue.pop_front()) - .map( - |GenerateThumbnailArgs { - extension, - cas_id, - path, - }| { - let reporter = reporter.clone(); - let thumbnails_directory = thumbnails_directory.clone(); - spawn(async move { - timeout( - THIRTY_SECS, - generate_thumbnail( - thumbnails_directory, - &extension, - cas_id, - &path, - in_background, - should_regenerate, - reporter, - ), - ) - .await - .unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path()))) - }) - }, - ) - .collect::>(); - - if let RaceOutputs::Stop(tx) = ( - async move { - let cas_ids = chunk - .join() - .await - .into_iter() - .filter_map(|join_result| { - join_result - .map_err(|e| error!("Failed to join thumbnail generation task: {e:#?}")) - .ok() - }) - .filter_map(|result| { - result - .map_err(|e| { - error!( - "Failed to generate thumbnail for ephemeral location: {e:#?}" - ) - }) - .ok() - }) - .collect(); - - if generated_cas_ids_tx.send(cas_ids).await.is_err() { - error!("Thumbnail remover actor is dead: Failed to send generated cas ids") - } - - trace!("Processed chunk of thumbnails"); - RaceOutputs::Processed - }, - async { - let tx = stop_rx - .recv() - .await - .expect("Critical error on thumbnails actor"); - trace!("Received a stop signal"); - RaceOutputs::Stop(tx) - }, - ) - .race() - .await - { - // Our queue is always contiguous, so this `from` is free - let leftovers = Vec::from(queue); - - trace!( - "Stopped with {} thumbnails left to process", - leftovers.len() - ); - if !leftovers.is_empty() - && leftovers_tx - .send(( - BatchToProcess { - batch: leftovers, - should_regenerate, - in_background: true, // Leftovers should always be in background - }, - kind, - )) - .await - .is_err() - { - error!("Thumbnail remover actor is dead: Failed to send leftovers") - } - - done_tx.send(()).ok(); - tx.send(()).ok(); - - return; - } - } - - trace!("Finished batch!"); - - done_tx.send(()).ok(); -} - -async fn generate_thumbnail( - thumbnails_directory: PathBuf, - extension: &str, - cas_id: String, - path: impl AsRef, - in_background: bool, - should_regenerate: bool, - reporter: broadcast::Sender, -) -> Result { - let path = path.as_ref(); - trace!("Generating thumbnail for {}", path.display()); - - let mut output_path = thumbnails_directory; - output_path.push(get_shard_hex(&cas_id)); - output_path.push(&cas_id); - output_path.set_extension("webp"); - - if let Err(e) = fs::metadata(&output_path).await { - if e.kind() != io::ErrorKind::NotFound { - error!( - "Failed to check if thumbnail exists, but we will try to generate it anyway: {e:#?}" - ); - } - // Otherwise we good, thumbnail doesn't exist so we can generate it - } else if !should_regenerate { - trace!( - "Skipping thumbnail generation for {} because it already exists", - path.display() - ); - return Ok(cas_id); - } - - if let Ok(extension) = ImageExtension::from_str(extension) { - if can_generate_thumbnail_for_image(&extension) { - generate_image_thumbnail(&path, &output_path).await?; - } - } else if let Ok(extension) = DocumentExtension::from_str(extension) { - if can_generate_thumbnail_for_document(&extension) { - generate_image_thumbnail(&path, &output_path).await?; - } - } - - #[cfg(feature = "ffmpeg")] - { - use crate::object::media::thumbnail::{ - can_generate_thumbnail_for_video, generate_video_thumbnail, - }; - use sd_file_ext::extensions::VideoExtension; - - if let Ok(extension) = VideoExtension::from_str(extension) { - if can_generate_thumbnail_for_video(&extension) { - generate_video_thumbnail(&path, &output_path).await?; - } - } - } - - if !in_background { - trace!("Emitting new thumbnail event"); - if reporter - .send(CoreEvent::NewThumbnail { - thumb_key: get_thumb_key(&cas_id), - }) - .is_err() - { - warn!("Error sending event to Node's event bus"); - } - } - - trace!("Generated thumbnail for {}", path.display()); - - Ok(cas_id) -} diff --git a/core/src/object/media/thumbnail/clean_up.rs b/core/src/object/media/thumbnail/clean_up.rs new file mode 100644 index 000000000..f63924b8a --- /dev/null +++ b/core/src/object/media/thumbnail/clean_up.rs @@ -0,0 +1,178 @@ +use crate::{library::LibraryId, util::error::FileIOError}; + +use sd_prisma::prisma::{file_path, PrismaClient}; + +use std::{collections::HashSet, ffi::OsString, path::PathBuf, sync::Arc}; + +use futures_concurrency::future::Join; +use tokio::{fs, spawn}; +use tracing::{debug, error}; + +use super::{ThumbnailerError, EPHEMERAL_DIR, WEBP_EXTENSION}; + +pub(super) async fn process_ephemeral_clean_up( + thumbnails_directory: Arc, + existing_ephemeral_thumbs: HashSet, +) { + let ephemeral_thumbs_dir = thumbnails_directory.join(EPHEMERAL_DIR); + + spawn(async move { + let mut to_remove = vec![]; + + let mut read_ephemeral_thumbs_dir = fs::read_dir(&ephemeral_thumbs_dir) + .await + .map_err(|e| FileIOError::from((&ephemeral_thumbs_dir, e)))?; + + while let Some(shard_entry) = read_ephemeral_thumbs_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&ephemeral_thumbs_dir, e)))? + { + let shard_path = shard_entry.path(); + if shard_entry + .file_type() + .await + .map_err(|e| FileIOError::from((&shard_path, e)))? + .is_dir() + { + let mut read_shard_dir = fs::read_dir(&shard_path) + .await + .map_err(|e| FileIOError::from((&shard_path, e)))?; + + while let Some(thumb_entry) = read_shard_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&shard_path, e)))? + { + let thumb_path = thumb_entry.path(); + if thumb_path.extension() == Some(WEBP_EXTENSION.as_ref()) + && !existing_ephemeral_thumbs.contains(&thumb_entry.file_name()) + { + to_remove.push(async move { + debug!( + "Removing stale ephemeral thumbnail: {}", + thumb_path.display() + ); + fs::remove_file(&thumb_path).await.map_err(|e| { + ThumbnailerError::FileIO(FileIOError::from((thumb_path, e))) + }) + }); + } + } + } + } + + Ok::<_, ThumbnailerError>(to_remove.join().await) + }) + .await + .map_or_else( + |e| error!("Join error on ephemeral clean up: {e:#?}",), + |fetching_res| { + fetching_res.map_or_else( + |e| error!("Error fetching ephemeral thumbs to be removed: {e:#?}"), + |remove_results| { + remove_results.into_iter().for_each(|remove_res| { + if let Err(e) = remove_res { + error!("Error on ephemeral clean up: {e:#?}"); + } + }) + }, + ) + }, + ) +} + +pub(super) async fn process_indexed_clean_up( + thumbnails_directory: Arc, + libraries_ids_and_databases: Vec<(LibraryId, Arc)>, +) { + libraries_ids_and_databases + .into_iter() + .map(|(library_id, db)| { + let library_thumbs_dir = thumbnails_directory.join(library_id.to_string()); + spawn(async move { + let existing_thumbs = db + .file_path() + .find_many(vec![file_path::cas_id::not(None)]) + .select(file_path::select!({ cas_id })) + .exec() + .await? + .into_iter() + .map(|file_path| { + OsString::from(format!( + "{}.webp", + file_path.cas_id.expect("we filtered right") + )) + }) + .collect::>(); + + let mut read_library_thumbs_dir = fs::read_dir(&library_thumbs_dir) + .await + .map_err(|e| FileIOError::from((&library_thumbs_dir, e)))?; + + let mut to_remove = vec![]; + + while let Some(shard_entry) = read_library_thumbs_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&library_thumbs_dir, e)))? + { + let shard_path = shard_entry.path(); + if shard_entry + .file_type() + .await + .map_err(|e| FileIOError::from((&shard_path, e)))? + .is_dir() + { + let mut read_shard_dir = fs::read_dir(&shard_path) + .await + .map_err(|e| FileIOError::from((&shard_path, e)))?; + + while let Some(thumb_entry) = read_shard_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&shard_path, e)))? + { + let thumb_path = thumb_entry.path(); + if thumb_path.extension() == Some(WEBP_EXTENSION.as_ref()) + && !existing_thumbs.contains(&thumb_entry.file_name()) + { + to_remove.push(async move { + debug!( + "Removing stale indexed thumbnail: {}", + thumb_path.display() + ); + fs::remove_file(&thumb_path).await.map_err(|e| { + ThumbnailerError::FileIO(FileIOError::from((thumb_path, e))) + }) + }); + } + } + } + } + + Ok::<_, ThumbnailerError>(to_remove.join().await) + }) + }) + .collect::>() + .join() + .await + .into_iter() + .filter_map(|join_res| { + join_res + .map_err(|e| error!("Join error on indexed clean up: {e:#?}")) + .ok() + }) + .filter_map(|fetching_res| { + fetching_res + .map_err(|e| error!("Error fetching indexed thumbs to be removed: {e:#?}")) + .ok() + }) + .for_each(|remove_results| { + remove_results.into_iter().for_each(|remove_res| { + if let Err(e) = remove_res { + error!("Error on indexed clean up: {e:#?}"); + } + }) + }) +} diff --git a/core/src/object/media/thumbnail/directory.rs b/core/src/object/media/thumbnail/directory.rs index a923258eb..493dbca88 100644 --- a/core/src/object/media/thumbnail/directory.rs +++ b/core/src/object/media/thumbnail/directory.rs @@ -1,102 +1,398 @@ -use crate::util::{error::FileIOError, version_manager::VersionManager}; +use crate::{ + library::{Libraries, LibraryId}, + object::media::thumbnail::ONE_SEC, + util::{ + error::FileIOError, + version_manager::{ManagedVersion, VersionManager, VersionManagerError}, + }, +}; -use std::path::{Path, PathBuf}; +use sd_prisma::prisma::{file_path, PrismaClient}; +use std::{ + collections::{HashMap, HashSet}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use futures_concurrency::future::{Join, TryJoin}; use int_enum::IntEnum; -use tokio::fs; -use tracing::{debug, error, trace}; +use tokio::{ + fs, io, spawn, + time::{sleep, timeout}, +}; +use tracing::{debug, error, info, trace, warn}; -use super::{get_shard_hex, ThumbnailerError, THUMBNAIL_CACHE_DIR_NAME}; +use super::{ + get_shard_hex, ThumbnailerError, EPHEMERAL_DIR, THIRTY_SECS, THUMBNAIL_CACHE_DIR_NAME, + VERSION_FILE, WEBP_EXTENSION, +}; -#[derive(IntEnum, Debug, Clone, Copy, Eq, PartialEq)] +#[derive(IntEnum, Debug, Clone, Copy, Eq, PartialEq, strum::Display)] #[repr(i32)] enum ThumbnailVersion { V1 = 1, V2 = 2, + V3 = 3, Unknown = 0, } -pub async fn init_thumbnail_dir(data_dir: impl AsRef) -> Result { +impl ManagedVersion for ThumbnailVersion { + const LATEST_VERSION: Self = Self::V3; + type MigrationError = ThumbnailerError; +} + +pub(super) async fn init_thumbnail_dir( + data_dir: impl AsRef, + libraries_manager: Arc, +) -> Result { debug!("Initializing thumbnail directory"); - let thumbnail_dir = data_dir.as_ref().join(THUMBNAIL_CACHE_DIR_NAME); + let thumbnails_directory = data_dir.as_ref().join(THUMBNAIL_CACHE_DIR_NAME); - let version_file = thumbnail_dir.join("version.txt"); - let version_manager = - VersionManager::::new(version_file.to_str().expect("Invalid path")); + debug!("Thumbnail directory: {:?}", thumbnails_directory); - debug!("Thumbnail directory: {:?}", thumbnail_dir); - - // create all necessary directories if they don't exist - fs::create_dir_all(&thumbnail_dir) + // create thumbnails base directory + fs::create_dir_all(&thumbnails_directory) .await - .map_err(|e| FileIOError::from((&thumbnail_dir, e)))?; + .map_err(|e| FileIOError::from((&thumbnails_directory, e)))?; - let mut current_version = match version_manager.get_version() { + spawn({ + let thumbnails_directory = thumbnails_directory.clone(); + async move { + let Ok(databases) = timeout(THIRTY_SECS, async move { + loop { + let libraries = libraries_manager.get_all().await; + if !libraries.is_empty() { + break libraries + .into_iter() + .map(|library| (library.id, Arc::clone(&library.db))) + .collect::>(); + } + + sleep(ONE_SEC).await; + } + }) + .await + else { + warn!( + "Failed to get libraries after 30 seconds, thumbnailer migration will not work; \ + Ignore this warning if you don't created libraries yet." + ); + return; + }; + + if let Err(e) = process_migration(thumbnails_directory, databases).await { + error!("Failed to migrate thumbnails: {e:#?}"); + } + } + }); + + Ok(thumbnails_directory) +} + +async fn process_migration( + thumbnails_directory: impl AsRef, + databases: HashMap>, +) -> Result<(), ThumbnailerError> { + let thumbnails_directory = thumbnails_directory.as_ref(); + + let version_manager = + VersionManager::::new(thumbnails_directory.join(VERSION_FILE)); + + // create all other directories, for each library and for ephemeral thumbnails + databases + .keys() + .map(|library_id| thumbnails_directory.join(library_id.to_string())) + .chain([thumbnails_directory.join(EPHEMERAL_DIR)]) + .map(|path| async move { + fs::create_dir_all(&path) + .await + .map_err(|e| FileIOError::from((&path, e))) + }) + .collect::>() + .join() + .await + .into_iter() + .collect::, _>>()?; + + let current_version = match version_manager.get_version().await { Ok(version) => version, - Err(_) => { - debug!("Thumbnail version file does not exist, starting fresh"); + Err(e) => { + debug!("Thumbnail version file does not exist, starting fresh: {e:#?}"); // Version file does not exist, start fresh - version_manager.set_version(ThumbnailVersion::V1)?; + version_manager.set_version(ThumbnailVersion::V1).await?; ThumbnailVersion::V1 } }; - while current_version != ThumbnailVersion::V2 { - match current_version { - ThumbnailVersion::V1 => { - let thumbnail_dir_for_task = thumbnail_dir.clone(); - // If the migration fails, it will return the error and exit the function - move_webp_files(&thumbnail_dir_for_task).await?; - version_manager.set_version(ThumbnailVersion::V2)?; - current_version = ThumbnailVersion::V2; - } - // If the current version is not handled explicitly, break the loop or return an error. - _ => { - error!("Thumbnail version is not handled: {:?}", current_version); - } - } + if current_version != ThumbnailVersion::LATEST_VERSION { + info!( + "Migrating thumbnail directory from {:?} to V3", + current_version + ); + + // Taking a reference to databases so we can move it into the closure and comply with the borrowck + let databases = &databases; + + version_manager + .migrate(current_version, |current, next| { + let thumbnail_dir = &thumbnails_directory; + async move { + match (current, next) { + (ThumbnailVersion::V1, ThumbnailVersion::V2) => { + move_to_shards(thumbnail_dir).await + } + (ThumbnailVersion::V2, ThumbnailVersion::V3) => { + segregate_thumbnails_by_library(thumbnail_dir, databases).await + } + + _ => { + error!("Thumbnail version is not handled: {:?}", current); + Err(VersionManagerError::UnexpectedMigration { + current_version: current.int_value(), + next_version: next.int_value(), + } + .into()) + } + } + } + }) + .await?; } - Ok(thumbnail_dir) + Ok(()) } /// This function moves all webp files in the thumbnail directory to their respective shard folders. /// It is used to migrate from V1 to V2. -async fn move_webp_files(dir: &PathBuf) -> Result<(), ThumbnailerError> { - let mut dir_entries = fs::read_dir(dir) +async fn move_to_shards(thumbnails_directory: impl AsRef) -> Result<(), ThumbnailerError> { + let thumbnails_directory = thumbnails_directory.as_ref(); + + let mut dir_entries = fs::read_dir(thumbnails_directory) .await - .map_err(|source| FileIOError::from((dir, source)))?; + .map_err(|source| FileIOError::from((thumbnails_directory, source)))?; + let mut count = 0; while let Ok(Some(entry)) = dir_entries.next_entry().await { - let path = entry.path(); - if path.is_file() { - if let Some(extension) = path.extension() { - if extension == "webp" { - let filename = path - .file_name() - .expect("Missing file name") - .to_str() - .expect("Failed to parse UTF8"); // we know they're cas_id's, so they're valid utf8 - let shard_folder = get_shard_hex(filename); + if entry + .file_type() + .await + .map_err(|e| FileIOError::from((entry.path(), e)))? + .is_file() + { + let path = entry.path(); + if path.extension() == Some(WEBP_EXTENSION.as_ref()) { + let file_name = entry.file_name(); - let new_dir = dir.join(shard_folder); - fs::create_dir_all(&new_dir) - .await - .map_err(|source| FileIOError::from((new_dir.clone(), source)))?; + // we know they're cas_id's, so they're valid utf8 + let shard_folder = get_shard_hex(file_name.to_str().expect("Failed to parse UTF8")); - let new_path = new_dir.join(filename); - fs::rename(&path, &new_path) - .await - .map_err(|source| FileIOError::from((path.clone(), source)))?; - count += 1; - } + let new_dir = thumbnails_directory.join(shard_folder); + fs::create_dir_all(&new_dir) + .await + .map_err(|source| FileIOError::from((new_dir.clone(), source)))?; + + let new_path = new_dir.join(file_name); + fs::rename(&path, &new_path) + .await + .map_err(|source| FileIOError::from((path.clone(), source)))?; + count += 1; } } } - trace!( + + info!( "Moved {} webp files to their respective shard folders.", count ); + + Ok(()) +} + +async fn segregate_thumbnails_by_library( + thumbnails_directory: impl AsRef, + databases: &HashMap>, +) -> Result<(), ThumbnailerError> { + // We already created the library folders in init_thumbnail_dir, so we can just move the files + // to their respective folders + + let thumbnails_directory = thumbnails_directory.as_ref(); + + databases + .iter() + .map(|(library_id, db)| (*library_id, Arc::clone(db))) + .map(|(library_id, db)| { + let library_thumbs_dir = thumbnails_directory.join(library_id.to_string()); + let old_thumbs_dir = thumbnails_directory.to_path_buf(); + spawn(async move { + let mut shards_to_create = HashSet::new(); + + let to_move = db + .file_path() + .find_many(vec![file_path::cas_id::not(None)]) + .select(file_path::select!({ cas_id })) + .exec() + .await? + .into_iter() + .filter_map(|file_path| file_path.cas_id) + .map(|cas_id| { + let new_shard = get_shard_hex(&cas_id).to_string(); + let new_sharded_filename = format!("{new_shard}/{cas_id}.webp"); + let old_sharded_filename = format!("{}/{cas_id}.webp", &cas_id[0..2]); + + (new_shard, new_sharded_filename, old_sharded_filename) + }) + .map(|(new_shard, new_sharded_filename, old_sharded_filename)| { + let old = old_thumbs_dir.join(old_sharded_filename); + let new = library_thumbs_dir.join(new_sharded_filename); + let new_shard_dir = library_thumbs_dir.join(new_shard); + + shards_to_create.insert(new_shard_dir); + + async move { + trace!( + "Moving thumbnail from old location to new location: {} -> {}", + old.display(), + new.display() + ); + + match fs::rename(&old, new).await { + Ok(_) => Ok(1), + Err(e) if e.kind() == io::ErrorKind::NotFound => { + // Thumbnail not found, it probably wasn't processed yet + Ok(0) + } + Err(e) => { + Err(ThumbnailerError::FileIO(FileIOError::from((old, e)))) + } + } + } + }) + .collect::>(); + + let shards_created_count = shards_to_create + .into_iter() + .map(|path| async move { + fs::create_dir_all(&path) + .await + .map_err(|e| FileIOError::from((path, e))) + }) + .collect::>() + .try_join() + .await? + .len(); + + let moved_count = to_move.try_join().await?.into_iter().sum::(); + + info!( + "Created {shards_created_count} shards and moved {moved_count} \ + thumbnails to library folder {library_id}" + ); + + Ok::<_, ThumbnailerError>(()) + }) + }) + .collect::>() + .try_join() + .await? + .into_iter() + .collect::>()?; + + // Now that we moved all files from all databases, everything else should be ephemeral thumbnails + // so we can just move all of them to the ephemeral directory + let ephemeral_thumbs_dir = thumbnails_directory.join(EPHEMERAL_DIR); + + let mut shards_to_create = HashSet::new(); + let mut to_move = vec![]; + + let mut read_thumbs_dir = fs::read_dir(thumbnails_directory) + .await + .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; + + let mut empty_shards = vec![]; + + while let Some(shard_entry) = read_thumbs_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((thumbnails_directory, e)))? + { + let old_shard_path = shard_entry.path(); + if shard_entry + .file_type() + .await + .map_err(|e| FileIOError::from((&old_shard_path, e)))? + .is_dir() + { + let mut read_shard_dir = fs::read_dir(&old_shard_path) + .await + .map_err(|e| FileIOError::from((&old_shard_path, e)))?; + + while let Some(thumb_entry) = read_shard_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&old_shard_path, e)))? + { + let thumb_path = thumb_entry.path(); + if thumb_path.extension() == Some(WEBP_EXTENSION.as_ref()) { + let thumb_filename = thumb_entry.file_name(); + + let mut new_ephemeral_shard = ephemeral_thumbs_dir.join(get_shard_hex( + thumb_filename.to_str().expect("cas_ids are utf-8"), + )); + + shards_to_create.insert(new_ephemeral_shard.clone()); + + new_ephemeral_shard.push(thumb_filename); + + to_move.push(async move { + trace!( + "Moving thumbnail from old location to new location: {} -> {}", + thumb_path.display(), + new_ephemeral_shard.display() + ); + + fs::rename(&thumb_path, &new_ephemeral_shard) + .await + .map_err(|e| FileIOError::from((thumb_path, e))) + }); + } + } + + empty_shards.push(old_shard_path); + } + } + + shards_to_create + .into_iter() + .map(|path| async move { + fs::create_dir_all(&path) + .await + .map_err(|e| FileIOError::from((path, e))) + }) + .collect::>() + .try_join() + .await?; + + let moved_shard = to_move.try_join().await?.len(); + + info!("Moved {moved_shard} shards to the ephemeral directory"); + + empty_shards + .into_iter() + .filter_map(|path| { + path.file_name() + .map_or(false, |name| name.len() == 2) + .then_some(async move { + trace!("Removing empty shard directory: {}", path.display()); + fs::remove_dir(&path) + .await + .map_err(|e| FileIOError::from((path, e))) + }) + }) + .collect::>() + .try_join() + .await?; + Ok(()) } diff --git a/core/src/object/media/thumbnail/mod.rs b/core/src/object/media/thumbnail/mod.rs index ab459b968..ea0cc7e59 100644 --- a/core/src/object/media/thumbnail/mod.rs +++ b/core/src/object/media/thumbnail/mod.rs @@ -1,4 +1,5 @@ use crate::{ + library::LibraryId, util::{error::FileIOError, version_manager::VersionManagerError}, Node, }; @@ -6,50 +7,99 @@ use crate::{ use sd_file_ext::extensions::{ DocumentExtension, Extension, ImageExtension, ALL_DOCUMENT_EXTENSIONS, ALL_IMAGE_EXTENSIONS, }; -use sd_images::{format_image, scale_dimensions}; -use sd_media_metadata::image::Orientation; #[cfg(feature = "ffmpeg")] use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS}; use std::{ - ops::Deref, path::{Path, PathBuf}, + time::Duration, }; -use image::{self, imageops, DynamicImage, GenericImageView}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::{fs, task}; +use tokio::task; use tracing::error; -use webp::Encoder; pub mod actor; +mod clean_up; mod directory; +mod process; mod shard; +mod state; +mod worker; -pub use directory::init_thumbnail_dir; +pub use process::{BatchToProcess, GenerateThumbnailArgs}; pub use shard::get_shard_hex; -pub const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails"; +// Files names constants +const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails"; +const SAVE_STATE_FILE: &str = "thumbs_to_process.bin"; +const VERSION_FILE: &str = "version.txt"; +pub const WEBP_EXTENSION: &str = "webp"; +const EPHEMERAL_DIR: &str = "ephemeral"; + +/// This is the target pixel count for all thumbnails to be resized to, and it is eventually downscaled +/// to [`TARGET_QUALITY`]. +const TARGET_PX: f32 = 262144_f32; + +/// This is the target quality that we render thumbnails at, it is a float between 0-100 +/// and is treated as a percentage (so 30% in this case, or it's the same as multiplying by `0.3`). +const TARGET_QUALITY: f32 = 30_f32; + +// Some time constants +const ONE_SEC: Duration = Duration::from_secs(1); +const THIRTY_SECS: Duration = Duration::from_secs(30); +const HALF_HOUR: Duration = Duration::from_secs(30 * 60); + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum ThumbnailKind { + Ephemeral, + Indexed(LibraryId), +} + +pub fn get_indexed_thumbnail_path(node: &Node, cas_id: &str, library_id: LibraryId) -> PathBuf { + get_thumbnail_path(node, cas_id, ThumbnailKind::Indexed(library_id)) +} /// This does not check if a thumbnail exists, it just returns the path that it would exist at -pub fn get_thumbnail_path(node: &Node, cas_id: &str) -> PathBuf { +fn get_thumbnail_path(node: &Node, cas_id: &str, kind: ThumbnailKind) -> PathBuf { let mut thumb_path = node.config.data_directory(); thumb_path.push(THUMBNAIL_CACHE_DIR_NAME); + match kind { + ThumbnailKind::Ephemeral => thumb_path.push(EPHEMERAL_DIR), + ThumbnailKind::Indexed(library_id) => { + thumb_path.push(library_id.to_string()); + } + } thumb_path.push(get_shard_hex(cas_id)); thumb_path.push(cas_id); - thumb_path.set_extension("webp"); + thumb_path.set_extension(WEBP_EXTENSION); thumb_path } +pub fn get_indexed_thumb_key(cas_id: &str, library_id: LibraryId) -> Vec { + get_thumb_key(cas_id, ThumbnailKind::Indexed(library_id)) +} + +pub fn get_ephemeral_thumb_key(cas_id: &str) -> Vec { + get_thumb_key(cas_id, ThumbnailKind::Ephemeral) +} + // this is used to pass the relevant data to the frontend so it can request the thumbnail // it supports extending the shard hex to support deeper directory structures in the future -pub fn get_thumb_key(cas_id: &str) -> Vec { - vec![get_shard_hex(cas_id), cas_id.to_string()] +fn get_thumb_key(cas_id: &str, kind: ThumbnailKind) -> Vec { + vec![ + match kind { + ThumbnailKind::Ephemeral => String::from(EPHEMERAL_DIR), + ThumbnailKind::Indexed(library_id) => library_id.to_string(), + }, + get_shard_hex(cas_id).to_string(), + cas_id.to_string(), + ] } #[cfg(feature = "ffmpeg")] @@ -78,7 +128,7 @@ pub(super) static THUMBNAILABLE_EXTENSIONS: Lazy> = Lazy::new(|| .collect() }); -pub static ALL_THUMBNAILABLE_EXTENSIONS: Lazy> = Lazy::new(|| { +pub(super) static ALL_THUMBNAILABLE_EXTENSIONS: Lazy> = Lazy::new(|| { #[cfg(feature = "ffmpeg")] return THUMBNAILABLE_EXTENSIONS .iter() @@ -110,19 +160,11 @@ pub enum ThumbnailerError { Task(#[from] task::JoinError), #[cfg(feature = "ffmpeg")] #[error(transparent)] - FFmpeg(#[from] sd_ffmpeg::ThumbnailerError), + FFmpeg(#[from] sd_ffmpeg::Error), #[error("thumbnail generation timed out for {}", .0.display())] TimedOut(Box), } -/// This is the target pixel count for all thumbnails to be resized to, and it is eventually downscaled -/// to [`TARGET_QUALITY`]. -const TARGET_PX: f32 = 262144_f32; - -/// This is the target quality that we render thumbnails at, it is a float between 0-100 -/// and is treated as a percentage (so 30% in this case, or it's the same as multiplying by `0.3`). -const TARGET_QUALITY: f32 = 30_f32; - #[derive(Debug, Serialize, Deserialize, Clone, Copy)] pub enum ThumbnailerEntryKind { Image, @@ -136,80 +178,6 @@ pub struct ThumbnailerMetadata { pub skipped: u32, } -pub async fn generate_image_thumbnail( - file_path: impl AsRef, - output_path: impl AsRef, -) -> Result<(), ThumbnailerError> { - let file_path = file_path.as_ref().to_path_buf(); - - let webp = task::spawn_blocking(move || -> Result<_, ThumbnailerError> { - let img = format_image(&file_path).map_err(|e| ThumbnailerError::SdImages { - path: file_path.clone().into_boxed_path(), - error: e, - })?; - - let (w, h) = img.dimensions(); - let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TARGET_PX); - - // Optionally, resize the existing photo and convert back into DynamicImage - let mut img = DynamicImage::ImageRgba8(imageops::resize( - &img, - w_scaled as u32, - h_scaled as u32, - imageops::FilterType::Triangle, - )); - - // this corrects the rotation/flip of the image based on the *available* exif data - // not all images have exif data, so we don't error - if let Some(orientation) = Orientation::from_path(&file_path) { - img = orientation.correct_thumbnail(img); - } - - // Create the WebP encoder for the above image - let encoder = - Encoder::from_image(&img).map_err(|reason| ThumbnailerError::WebPEncoding { - path: file_path.into_boxed_path(), - reason: reason.to_string(), - })?; - - // Type WebPMemory is !Send, which makes the Future in this function !Send, - // this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec - // which implies on a unwanted clone... - Ok(encoder.encode(TARGET_QUALITY).deref().to_owned()) - }) - .await??; - - let output_path = output_path.as_ref(); - - if let Some(shard_dir) = output_path.parent() { - fs::create_dir_all(shard_dir) - .await - .map_err(|e| FileIOError::from((shard_dir, e)))?; - } else { - error!( - "Failed to get parent directory of '{}' for sharding parent directory", - output_path.display() - ); - } - - fs::write(output_path, &webp) - .await - .map_err(|e| FileIOError::from((output_path, e))) - .map_err(Into::into) -} - -#[cfg(feature = "ffmpeg")] -pub async fn generate_video_thumbnail( - file_path: impl AsRef, - output_path: impl AsRef, -) -> Result<(), ThumbnailerError> { - use sd_ffmpeg::to_thumbnail; - - to_thumbnail(file_path, output_path, 256, TARGET_QUALITY) - .await - .map_err(Into::into) -} - #[cfg(feature = "ffmpeg")] pub const fn can_generate_thumbnail_for_video(video_extension: &VideoExtension) -> bool { use VideoExtension::*; diff --git a/core/src/object/media/thumbnail/process.rs b/core/src/object/media/thumbnail/process.rs new file mode 100644 index 000000000..00d366b4b --- /dev/null +++ b/core/src/object/media/thumbnail/process.rs @@ -0,0 +1,420 @@ +use crate::{api::CoreEvent, util::error::FileIOError}; + +use sd_file_ext::extensions::{DocumentExtension, ImageExtension}; +use sd_images::{format_image, scale_dimensions}; +use sd_media_metadata::image::Orientation; +use sd_prisma::prisma::location; + +use std::{ + collections::VecDeque, + ffi::OsString, + ops::Deref, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, +}; + +use async_channel as chan; +use futures_concurrency::future::{Join, Race}; +use image::{self, imageops, DynamicImage, GenericImageView}; +use serde::{Deserialize, Serialize}; +use tokio::{ + fs, io, + sync::{broadcast, oneshot}, + task::{spawn, spawn_blocking}, + time::timeout, +}; +use tracing::{error, trace, warn}; +use webp::Encoder; + +use super::{ + can_generate_thumbnail_for_document, can_generate_thumbnail_for_image, get_thumb_key, + shard::get_shard_hex, ThumbnailKind, ThumbnailerError, EPHEMERAL_DIR, TARGET_PX, + TARGET_QUALITY, THIRTY_SECS, WEBP_EXTENSION, +}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct GenerateThumbnailArgs { + pub extension: String, + pub cas_id: String, + pub path: PathBuf, +} + +impl GenerateThumbnailArgs { + pub fn new(extension: String, cas_id: String, path: PathBuf) -> Self { + Self { + extension, + cas_id, + path, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchToProcess { + pub(super) batch: Vec, + pub(super) should_regenerate: bool, + pub(super) in_background: bool, + pub(super) location_id: Option, +} + +impl BatchToProcess { + pub fn new( + batch: Vec, + should_regenerate: bool, + in_background: bool, + ) -> Self { + Self { + batch, + should_regenerate, + in_background, + location_id: None, + } + } +} + +pub(super) struct ProcessorControlChannels { + pub stop_rx: chan::Receiver>, + pub done_tx: oneshot::Sender<()>, + pub batch_report_progress_tx: chan::Sender<(location::id::Type, u32)>, +} + +pub(super) async fn batch_processor( + thumbnails_directory: Arc, + ( + BatchToProcess { + batch, + should_regenerate, + in_background, + location_id, + }, + kind, + ): (BatchToProcess, ThumbnailKind), + generated_ephemeral_thumbs_file_names_tx: chan::Sender>, + ProcessorControlChannels { + stop_rx, + done_tx, + batch_report_progress_tx, + }: ProcessorControlChannels, + leftovers_tx: chan::Sender<(BatchToProcess, ThumbnailKind)>, + reporter: broadcast::Sender, + batch_size: usize, +) { + trace!( + "Processing thumbnails batch of kind {kind:?} with size {} in {}", + batch.len(), + if in_background { + "background" + } else { + "foreground" + }, + ); + + // Tranforming to `VecDeque` so we don't need to move anything as we consume from the beginning + // This from is guaranteed to be O(1) + let mut queue = VecDeque::from(batch); + + enum RaceOutputs { + Processed, + Stop(oneshot::Sender<()>), + } + + while !queue.is_empty() { + let chunk = (0..batch_size) + .filter_map(|_| queue.pop_front()) + .map( + |GenerateThumbnailArgs { + extension, + cas_id, + path, + }| { + let reporter = reporter.clone(); + let thumbnails_directory = thumbnails_directory.as_ref().clone(); + spawn(async move { + timeout( + THIRTY_SECS, + generate_thumbnail( + thumbnails_directory, + ThumbData { + extension: &extension, + cas_id, + path: &path, + in_background, + should_regenerate, + kind, + }, + reporter, + ), + ) + .await + .unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path()))) + }) + }, + ) + .collect::>(); + + let generated_ephemeral_thumbs_file_names_tx = + generated_ephemeral_thumbs_file_names_tx.clone(); + let report_progress_tx = batch_report_progress_tx.clone(); + let chunk_len = chunk.len() as u32; + + if let RaceOutputs::Stop(stopped_tx) = ( + async move { + if let Err(e) = spawn(async move { + let cas_ids = chunk + .join() + .await + .into_iter() + .filter_map(|join_result| { + join_result + .map_err(|e| { + error!("Failed to join thumbnail generation task: {e:#?}") + }) + .ok() + }) + .filter_map(|result| { + result + .map_err(|e| { + error!( + "Failed to generate thumbnail for {} location: {e:#?}", + if let ThumbnailKind::Ephemeral = kind { + "ephemeral" + } else { + "indexed" + } + ) + }) + .ok() + }) + .map(|cas_id| OsString::from(format!("{}.webp", cas_id))) + .collect(); + + if kind == ThumbnailKind::Ephemeral + && generated_ephemeral_thumbs_file_names_tx + .send(cas_ids) + .await + .is_err() + { + error!("Thumbnail actor is dead: Failed to send generated cas ids") + } + + if let Some(location_id) = location_id { + report_progress_tx.send((location_id, chunk_len)).await.ok(); + } + }) + .await + { + error!("Failed to join spawned task to process thumbnails chunk on a batch: {e:#?}"); + } + + trace!("Processed chunk with {chunk_len} thumbnails"); + RaceOutputs::Processed + }, + async { + let tx = stop_rx + .recv() + .await + .expect("Critical error on thumbnails actor"); + trace!("Received a stop signal"); + RaceOutputs::Stop(tx) + }, + ) + .race() + .await + { + // Our queue is always contiguous, so this `from` is free + let leftovers = Vec::from(queue); + + trace!( + "Stopped with {} thumbnails left to process", + leftovers.len() + ); + if !leftovers.is_empty() + && leftovers_tx + .send(( + BatchToProcess { + batch: leftovers, + should_regenerate, + in_background: true, // Leftovers should always be in background + location_id, + }, + kind, + )) + .await + .is_err() + { + error!("Thumbnail actor is dead: Failed to send leftovers") + } + + done_tx.send(()).ok(); + stopped_tx.send(()).ok(); + + return; + } + } + + trace!("Finished batch!"); + + done_tx.send(()).ok(); +} + +pub(super) struct ThumbData<'ext, P: AsRef> { + pub extension: &'ext str, + pub cas_id: String, + pub path: P, + pub in_background: bool, + pub should_regenerate: bool, + pub kind: ThumbnailKind, +} + +pub(super) async fn generate_thumbnail( + thumbnails_directory: PathBuf, + ThumbData { + extension, + cas_id, + path, + in_background, + should_regenerate, + kind, + }: ThumbData<'_, impl AsRef>, + reporter: broadcast::Sender, +) -> Result { + let path = path.as_ref(); + trace!("Generating thumbnail for {}", path.display()); + + let mut output_path = thumbnails_directory; + match kind { + ThumbnailKind::Ephemeral => output_path.push(EPHEMERAL_DIR), + ThumbnailKind::Indexed(library_id) => output_path.push(library_id.to_string()), + }; + output_path.push(get_shard_hex(&cas_id)); + output_path.push(&cas_id); + output_path.set_extension(WEBP_EXTENSION); + + if let Err(e) = fs::metadata(&output_path).await { + if e.kind() != io::ErrorKind::NotFound { + error!( + "Failed to check if thumbnail exists, but we will try to generate it anyway: {e:#?}" + ); + } + // Otherwise we good, thumbnail doesn't exist so we can generate it + } else if !should_regenerate { + trace!( + "Skipping thumbnail generation for {} because it already exists", + path.display() + ); + return Ok(cas_id); + } + + if let Ok(extension) = ImageExtension::from_str(extension) { + if can_generate_thumbnail_for_image(&extension) { + generate_image_thumbnail(&path, &output_path).await?; + } + } else if let Ok(extension) = DocumentExtension::from_str(extension) { + if can_generate_thumbnail_for_document(&extension) { + generate_image_thumbnail(&path, &output_path).await?; + } + } + + #[cfg(feature = "ffmpeg")] + { + use crate::object::media::thumbnail::can_generate_thumbnail_for_video; + use sd_file_ext::extensions::VideoExtension; + + if let Ok(extension) = VideoExtension::from_str(extension) { + if can_generate_thumbnail_for_video(&extension) { + generate_video_thumbnail(&path, &output_path).await?; + } + } + } + + if !in_background { + trace!("Emitting new thumbnail event"); + if reporter + .send(CoreEvent::NewThumbnail { + thumb_key: get_thumb_key(&cas_id, kind), + }) + .is_err() + { + warn!("Error sending event to Node's event bus"); + } + } + + trace!("Generated thumbnail for {}", path.display()); + + Ok(cas_id) +} + +async fn generate_image_thumbnail( + file_path: impl AsRef, + output_path: impl AsRef, +) -> Result<(), ThumbnailerError> { + let file_path = file_path.as_ref().to_path_buf(); + + let webp = spawn_blocking(move || -> Result<_, ThumbnailerError> { + let img = format_image(&file_path).map_err(|e| ThumbnailerError::SdImages { + path: file_path.clone().into_boxed_path(), + error: e, + })?; + + let (w, h) = img.dimensions(); + let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TARGET_PX); + + // Optionally, resize the existing photo and convert back into DynamicImage + let mut img = DynamicImage::ImageRgba8(imageops::resize( + &img, + w_scaled as u32, + h_scaled as u32, + imageops::FilterType::Triangle, + )); + + // this corrects the rotation/flip of the image based on the *available* exif data + // not all images have exif data, so we don't error + if let Some(orientation) = Orientation::from_path(&file_path) { + img = orientation.correct_thumbnail(img); + } + + // Create the WebP encoder for the above image + let encoder = + Encoder::from_image(&img).map_err(|reason| ThumbnailerError::WebPEncoding { + path: file_path.into_boxed_path(), + reason: reason.to_string(), + })?; + + // Type WebPMemory is !Send, which makes the Future in this function !Send, + // this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec + // which implies on a unwanted clone... + Ok(encoder.encode(TARGET_QUALITY).deref().to_owned()) + }) + .await??; + + let output_path = output_path.as_ref(); + + if let Some(shard_dir) = output_path.parent() { + fs::create_dir_all(shard_dir) + .await + .map_err(|e| FileIOError::from((shard_dir, e)))?; + } else { + error!( + "Failed to get parent directory of '{}' for sharding parent directory", + output_path.display() + ); + } + + fs::write(output_path, &webp) + .await + .map_err(|e| FileIOError::from((output_path, e))) + .map_err(Into::into) +} + +#[cfg(feature = "ffmpeg")] +async fn generate_video_thumbnail( + file_path: impl AsRef, + output_path: impl AsRef, +) -> Result<(), ThumbnailerError> { + use sd_ffmpeg::to_thumbnail; + + to_thumbnail(file_path, output_path, 256, TARGET_QUALITY) + .await + .map_err(Into::into) +} diff --git a/core/src/object/media/thumbnail/shard.rs b/core/src/object/media/thumbnail/shard.rs index 81fd417e8..be61e2034 100644 --- a/core/src/object/media/thumbnail/shard.rs +++ b/core/src/object/media/thumbnail/shard.rs @@ -1,8 +1,13 @@ -/// The practice of dividing files into hex coded folders, often called "sharding," is mainly used to optimize file system performance. File systems can start to slow down as the number of files in a directory increases. Thus, it's often beneficial to split files into multiple directories to avoid this performance degradation. +/// The practice of dividing files into hex coded folders, often called "sharding," +/// is mainly used to optimize file system performance. File systems can start to slow down +/// as the number of files in a directory increases. Thus, it's often beneficial to split +/// files into multiple directories to avoid this performance degradation. -/// `get_shard_hex` takes a cas_id (a hexadecimal hash) as input and returns the first two characters of the hash as the directory name. Because we're using the first two characters of a the hash, this will give us 256 (16*16) possible directories, named 00 to ff. -pub fn get_shard_hex(cas_id: &str) -> String { - // Use the first two characters of the hash as the directory name - let directory_name = &cas_id[0..2]; - directory_name.to_string() +/// `get_shard_hex` takes a cas_id (a hexadecimal hash) as input and returns the first +/// three characters of the hash as the directory name. Because we're using these first +/// three characters of a the hash, this will give us 4096 (16^3) possible directories, +/// named 000 to fff. +pub fn get_shard_hex(cas_id: &str) -> &str { + // Use the first three characters of the hash as the directory name + &cas_id[0..3] } diff --git a/core/src/object/media/thumbnail/state.rs b/core/src/object/media/thumbnail/state.rs new file mode 100644 index 000000000..400302725 --- /dev/null +++ b/core/src/object/media/thumbnail/state.rs @@ -0,0 +1,222 @@ +use crate::{library::LibraryId, util::error::FileIOError}; + +use std::{ + collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + ffi::OsString, + path::Path, +}; + +use async_channel as chan; +use futures_concurrency::future::TryJoin; +use sd_prisma::prisma::location; +use serde::{Deserialize, Serialize}; +use tokio::{fs, io}; +use tracing::{error, info, trace}; + +use super::{ + actor::ActorError, get_shard_hex, BatchToProcess, ThumbnailKind, EPHEMERAL_DIR, SAVE_STATE_FILE, +}; + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct ThumbsProcessingSaveState { + pub(super) bookkeeper: BookKeeper, + pub(super) ephemeral_file_names: HashSet, + // This queues doubles as LIFO and FIFO, assuming LIFO in case of users asking for a new batch + // by entering a new directory in the explorer, otherwise processing as FIFO + pub(super) queue: VecDeque<(BatchToProcess, ThumbnailKind)>, + // These below are FIFO queues, so we can process leftovers from the previous batch first + pub(super) indexed_leftovers_queue: VecDeque<(BatchToProcess, LibraryId)>, + pub(super) ephemeral_leftovers_queue: VecDeque, +} + +impl Default for ThumbsProcessingSaveState { + fn default() -> Self { + Self { + bookkeeper: BookKeeper::default(), + ephemeral_file_names: HashSet::with_capacity(128), + queue: VecDeque::with_capacity(32), + indexed_leftovers_queue: VecDeque::with_capacity(8), + ephemeral_leftovers_queue: VecDeque::with_capacity(8), + } + } +} + +impl ThumbsProcessingSaveState { + pub(super) async fn load(thumbnails_directory: impl AsRef) -> Self { + let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE); + + match fs::read(&resume_file).await { + Ok(bytes) => { + let this = rmp_serde::from_slice::(&bytes).unwrap_or_else(|e| { + error!("Failed to deserialize save state at thumbnailer actor: {e:#?}"); + Self::default() + }); + + if let Err(e) = fs::remove_file(&resume_file).await { + error!( + "Failed to remove save state file at thumbnailer actor: {:#?}", + FileIOError::from((resume_file, e)) + ); + } + + info!( + "Resuming thumbnailer actor state: Existing ephemeral thumbs: {}; \ + Queued batches waiting processing: {}", + this.ephemeral_file_names.len(), + this.queue.len() + + this.indexed_leftovers_queue.len() + + this.ephemeral_leftovers_queue.len() + ); + + this + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + trace!("No save state found at thumbnailer actor"); + Self::default() + } + Err(e) => { + error!( + "Failed to read save state at thumbnailer actor: {:#?}", + FileIOError::from((resume_file, e)) + ); + Self::default() + } + } + } + + pub(super) async fn store(self, thumbnails_directory: impl AsRef) { + let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE); + + info!( + "Saving thumbnailer actor state: Existing ephemeral thumbs: {}; \ + Queued batches waiting processing: {}", + self.ephemeral_file_names.len(), + self.queue.len() + + self.indexed_leftovers_queue.len() + + self.ephemeral_leftovers_queue.len() + ); + + let Ok(bytes) = rmp_serde::to_vec_named(&self).map_err(|e| { + error!("Failed to serialize save state at thumbnailer actor: {e:#?}"); + }) else { + return; + }; + + if let Err(e) = fs::write(&resume_file, bytes).await { + error!( + "Failed to write save state at thumbnailer actor: {:#?}", + FileIOError::from((resume_file, e)) + ); + } + } +} + +pub(super) async fn remove_by_cas_ids( + thumbnails_directory: &Path, + cas_ids: Vec, + kind: ThumbnailKind, +) -> Result<(), ActorError> { + let base_dir = match kind { + ThumbnailKind::Ephemeral => thumbnails_directory.join(EPHEMERAL_DIR), + ThumbnailKind::Indexed(library_id) => thumbnails_directory.join(library_id.to_string()), + }; + + cas_ids + .into_iter() + .map(|cas_id| { + let thumbnail_path = base_dir.join(format!("{}/{cas_id}.webp", get_shard_hex(&cas_id))); + + trace!("Removing thumbnail: {}", thumbnail_path.display()); + + async move { + match fs::remove_file(&thumbnail_path).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(FileIOError::from((thumbnail_path, e))), + } + } + }) + .collect::>() + .try_join() + .await?; + + Ok(()) +} + +pub(super) type RegisterReporter = (location::id::Type, chan::Sender<(u32, u32)>); + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct BookKeeper { + work_progress: HashMap, // (pending, total) + + // We can't save reporter function or a channel to disk, the job must ask again to be registered + #[serde(skip, default)] + reporter_by_location: HashMap>, +} +impl Default for BookKeeper { + fn default() -> Self { + Self { + work_progress: HashMap::with_capacity(8), + reporter_by_location: HashMap::with_capacity(8), + } + } +} + +impl BookKeeper { + pub(super) async fn add_work(&mut self, location_id: location::id::Type, thumbs_count: u32) { + let (in_progress, total) = match self.work_progress.entry(location_id) { + Entry::Occupied(mut entry) => { + let (in_progress, total) = entry.get_mut(); + + *total += thumbs_count; + + (*in_progress, *total) + } + Entry::Vacant(entry) => { + entry.insert((0, thumbs_count)); + + (0, thumbs_count) + } + }; + + if let Some(progress_tx) = self.reporter_by_location.get(&location_id) { + if progress_tx.send((in_progress, total)).await.is_err() { + error!( + "Failed to send progress update to reporter on location " + ); + } + } + } + + pub(super) fn register_reporter( + &mut self, + location_id: location::id::Type, + reporter_tx: chan::Sender<(u32, u32)>, + ) { + self.reporter_by_location.insert(location_id, reporter_tx); + } + + pub(super) async fn add_progress(&mut self, location_id: location::id::Type, progress: u32) { + if let Some((current_progress, total)) = self.work_progress.get_mut(&location_id) { + *current_progress += progress; + + if *current_progress == *total { + if let Some(progress_tx) = self.reporter_by_location.remove(&location_id) { + if progress_tx.send((*current_progress, *total)).await.is_err() { + error!( + "Failed to send progress update to reporter on location " + ); + } + } + + self.work_progress.remove(&location_id); + } else if let Some(progress_tx) = self.reporter_by_location.get(&location_id) { + if progress_tx.send((*current_progress, *total)).await.is_err() { + error!( + "Failed to send progress update to reporter on location " + ); + } + } + } + } +} diff --git a/core/src/object/media/thumbnail/worker.rs b/core/src/object/media/thumbnail/worker.rs new file mode 100644 index 000000000..23c2a4c45 --- /dev/null +++ b/core/src/object/media/thumbnail/worker.rs @@ -0,0 +1,323 @@ +use crate::api::CoreEvent; + +use std::{collections::HashMap, ffi::OsString, path::PathBuf, sync::Arc}; + +use sd_prisma::prisma::location; + +use async_channel as chan; +use futures_concurrency::stream::Merge; +use tokio::{ + spawn, + sync::{broadcast, oneshot}, + time::{interval, interval_at, timeout, Instant, MissedTickBehavior}, +}; +use tokio_stream::{wrappers::IntervalStream, StreamExt}; +use tracing::{debug, error, trace}; + +use super::{ + actor::DatabaseMessage, + clean_up::{process_ephemeral_clean_up, process_indexed_clean_up}, + process::{batch_processor, ProcessorControlChannels}, + state::{remove_by_cas_ids, RegisterReporter, ThumbsProcessingSaveState}, + BatchToProcess, ThumbnailKind, HALF_HOUR, ONE_SEC, THIRTY_SECS, +}; + +#[derive(Debug, Clone)] +pub(super) struct WorkerChannels { + pub(super) progress_management_rx: chan::Receiver, + pub(super) databases_rx: chan::Receiver, + pub(super) cas_ids_to_delete_rx: chan::Receiver<(Vec, ThumbnailKind)>, + pub(super) thumbnails_to_generate_rx: chan::Receiver<(BatchToProcess, ThumbnailKind)>, + pub(super) cancel_rx: chan::Receiver>, +} + +pub(super) async fn worker( + batch_size: usize, + reporter: broadcast::Sender, + thumbnails_directory: Arc, + WorkerChannels { + progress_management_rx, + databases_rx, + cas_ids_to_delete_rx, + thumbnails_to_generate_rx, + cancel_rx, + }: WorkerChannels, +) { + let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR); + to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut idle_interval = interval(ONE_SEC); + idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut databases = HashMap::new(); + + #[derive(Debug)] + enum StreamMessage { + RemovalTick, + ToDelete((Vec, ThumbnailKind)), + Database(DatabaseMessage), + NewBatch((BatchToProcess, ThumbnailKind)), + Leftovers((BatchToProcess, ThumbnailKind)), + NewEphemeralThumbnailsFilenames(Vec), + ProgressManagement(RegisterReporter), + BatchProgress((location::id::Type, u32)), + Shutdown(oneshot::Sender<()>), + IdleTick, + } + + let ThumbsProcessingSaveState { + mut bookkeeper, + mut ephemeral_file_names, + mut queue, + mut indexed_leftovers_queue, + mut ephemeral_leftovers_queue, + } = ThumbsProcessingSaveState::load(thumbnails_directory.as_ref()).await; + + let (generated_ephemeral_thumbnails_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32); + let (leftovers_tx, leftovers_rx) = chan::bounded(8); + let (batch_report_progress_tx, batch_report_progress_rx) = chan::bounded(8); + let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1); + + let mut shutdown_leftovers_rx = leftovers_rx.clone(); + let mut shutdowm_batch_report_progress_rx = batch_report_progress_rx.clone(); + + let mut current_batch_processing_rx: Option> = None; + + let mut msg_stream = ( + IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick), + cas_ids_to_delete_rx.map(StreamMessage::ToDelete), + databases_rx.map(StreamMessage::Database), + thumbnails_to_generate_rx.map(StreamMessage::NewBatch), + leftovers_rx.map(StreamMessage::Leftovers), + ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailsFilenames), + progress_management_rx.map(StreamMessage::ProgressManagement), + batch_report_progress_rx.map(StreamMessage::BatchProgress), + cancel_rx.map(StreamMessage::Shutdown), + IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick), + ) + .merge(); + + while let Some(msg) = msg_stream.next().await { + match msg { + StreamMessage::IdleTick => { + if let Some(done_rx) = current_batch_processing_rx.as_mut() { + // Checking if the previous run finished or was aborted to clean state + match done_rx.try_recv() { + Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { + current_batch_processing_rx = None; + } + + Err(oneshot::error::TryRecvError::Empty) => { + // The previous run is still running + continue; + } + } + } + + if current_batch_processing_rx.is_none() + && (!queue.is_empty() + || !indexed_leftovers_queue.is_empty() + || !ephemeral_leftovers_queue.is_empty()) + { + let (done_tx, done_rx) = oneshot::channel(); + current_batch_processing_rx = Some(done_rx); + + let batch_and_kind = if let Some(batch_and_kind) = queue.pop_front() { + batch_and_kind + } else if let Some((batch, library_id)) = indexed_leftovers_queue.pop_front() { + // indexed leftovers have bigger priority + (batch, ThumbnailKind::Indexed(library_id)) + } else if let Some(batch) = ephemeral_leftovers_queue.pop_front() { + (batch, ThumbnailKind::Ephemeral) + } else { + continue; + }; + + spawn(batch_processor( + thumbnails_directory.clone(), + batch_and_kind, + generated_ephemeral_thumbnails_tx.clone(), + ProcessorControlChannels { + stop_rx: stop_older_processing_rx.clone(), + done_tx, + batch_report_progress_tx: batch_report_progress_tx.clone(), + }, + leftovers_tx.clone(), + reporter.clone(), + batch_size, + )); + } + } + + StreamMessage::RemovalTick => { + // For any of them we process a clean up if a time since the last one already passed + if !databases.is_empty() { + spawn(process_indexed_clean_up( + thumbnails_directory.clone(), + databases + .iter() + .map(|(id, db)| (*id, Arc::clone(db))) + .collect::>(), + )); + } + + if !ephemeral_file_names.is_empty() { + spawn(process_ephemeral_clean_up( + thumbnails_directory.clone(), + ephemeral_file_names.clone(), + )); + } + } + + StreamMessage::ToDelete((cas_ids, kind)) => { + if !cas_ids.is_empty() { + if let Err(e) = remove_by_cas_ids(&thumbnails_directory, cas_ids, kind).await { + error!("Got an error when trying to remove thumbnails: {e:#?}"); + } + } + } + + StreamMessage::NewBatch((batch, kind)) => { + let in_background = batch.in_background; + + if let Some(location_id) = batch.location_id { + bookkeeper + .add_work(location_id, batch.batch.len() as u32) + .await; + } + + trace!( + "New {kind:?} batch to process in {}, size: {}", + if in_background { + "background" + } else { + "foreground" + }, + batch.batch.len() + ); + + if in_background { + queue.push_back((batch, kind)); + } else { + // If a processing must be in foreground, then it takes maximum priority + queue.push_front((batch, kind)); + } + + // Only sends stop signal if there is a batch being processed + if !in_background && current_batch_processing_rx.is_some() { + trace!("Sending stop signal to older processing"); + let (tx, rx) = oneshot::channel(); + + match stop_older_processing_tx.try_send(tx) { + Ok(()) => { + // We put a timeout here to avoid a deadlock in case the older processing already + // finished its batch + if timeout(ONE_SEC, rx).await.is_err() { + stop_older_processing_rx.recv().await.ok(); + } + } + Err(e) if e.is_full() => { + // The last signal we sent happened after a batch was already processed + // So we clean the channel and we're good to go. + stop_older_processing_rx.recv().await.ok(); + } + Err(_) => { + error!( + "Thumbnail remover actor died when trying to stop older processing" + ); + } + } + } + } + + StreamMessage::Leftovers((batch, ThumbnailKind::Indexed(library_id))) => { + indexed_leftovers_queue.push_back((batch, library_id)) + } + + StreamMessage::Leftovers((batch, ThumbnailKind::Ephemeral)) => { + ephemeral_leftovers_queue.push_back(batch) + } + + StreamMessage::Database(DatabaseMessage::Add(id, db)) + | StreamMessage::Database(DatabaseMessage::Update(id, db)) => { + databases.insert(id, db); + } + + StreamMessage::Database(DatabaseMessage::Remove(id)) => { + databases.remove(&id); + } + + StreamMessage::NewEphemeralThumbnailsFilenames(new_ephemeral_thumbs) => { + trace!("New ephemeral thumbnails: {}", new_ephemeral_thumbs.len()); + ephemeral_file_names.extend(new_ephemeral_thumbs); + } + + StreamMessage::BatchProgress((location_id, progressed)) => { + bookkeeper.add_progress(location_id, progressed).await; + } + + StreamMessage::Shutdown(cancel_tx) => { + debug!("Thumbnail actor is shutting down..."); + + // First stopping the current batch processing + let (tx, rx) = oneshot::channel(); + match stop_older_processing_tx.try_send(tx) { + Ok(()) => { + // We put a timeout here to avoid a deadlock in case the older processing already + // finished its batch + if timeout(ONE_SEC, rx).await.is_err() { + stop_older_processing_rx.recv().await.ok(); + } + } + Err(e) if e.is_full() => { + // The last signal we sent happened after a batch was already processed + // So we clean the channel and we're good to go. + stop_older_processing_rx.recv().await.ok(); + } + Err(_) => { + error!("Thumbnail actor died when trying to stop older processing"); + } + } + + // Closing the leftovers channel to stop the batch processor as we already sent + // an stop signal + leftovers_tx.close(); + while let Some((batch, kind)) = shutdown_leftovers_rx.next().await { + match kind { + ThumbnailKind::Indexed(library_id) => { + indexed_leftovers_queue.push_back((batch, library_id)) + } + ThumbnailKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch), + } + } + + // Consuming the last progress reports to keep everything up to date + shutdowm_batch_report_progress_rx.close(); + while let Some((location_id, progressed)) = + shutdowm_batch_report_progress_rx.next().await + { + bookkeeper.add_progress(location_id, progressed).await; + } + + // Saving state + ThumbsProcessingSaveState { + bookkeeper, + ephemeral_file_names, + queue, + indexed_leftovers_queue, + ephemeral_leftovers_queue, + } + .store(thumbnails_directory.as_ref()) + .await; + + // Signaling that we're done shutting down + cancel_tx.send(()).ok(); + return; + } + + StreamMessage::ProgressManagement((location_id, progress_tx)) => { + bookkeeper.register_reporter(location_id, progress_tx); + } + } + } +} diff --git a/core/src/util/version_manager.rs b/core/src/util/version_manager.rs index a982fb1f2..89e12d896 100644 --- a/core/src/util/version_manager.rs +++ b/core/src/util/version_manager.rs @@ -1,76 +1,114 @@ +use std::{ + any::type_name, + fmt::Display, + future::Future, + num::ParseIntError, + path::{Path, PathBuf}, + str::FromStr, +}; + use int_enum::IntEnum; -use std::fs; -use std::io::prelude::*; -use std::path::Path; -use std::str::FromStr; +use itertools::Itertools; use thiserror::Error; +use tokio::{fs, io}; +use tracing::info; + +use super::error::FileIOError; #[derive(Error, Debug)] pub enum VersionManagerError { - #[error("Invalid version")] + #[error("invalid version")] InvalidVersion, - #[error("Version file does not exist")] + #[error("version file does not exist")] VersionFileDoesNotExist, - #[error("Error while converting integer to enum")] + #[error("error while converting integer to enum")] IntConversionError, - #[error("Malformed version file")] + #[error("malformed version file")] MalformedVersionFile, + #[error("unexpected migration: {current_version} -> {next_version}")] + UnexpectedMigration { + current_version: i32, + next_version: i32, + }, + #[error(transparent)] - IO(#[from] std::io::Error), + FileIO(#[from] FileIOError), #[error(transparent)] - ParseIntError(#[from] std::num::ParseIntError), + ParseIntError(#[from] ParseIntError), +} + +pub trait ManagedVersion: IntEnum + Display + 'static { + const LATEST_VERSION: Self; + type MigrationError: std::error::Error + Display + From + 'static; } -/// /// An abstract system for saving a text file containing a version number. /// The version number is an integer that can be converted to and from an enum. /// The enum must implement the IntEnum trait. -/// -pub struct VersionManager> { - version_file_path: String, +pub struct VersionManager { + version_file_path: PathBuf, _marker: std::marker::PhantomData, } -impl> VersionManager { - pub fn new(version_file_path: &str) -> Self { +impl VersionManager { + pub fn new(version_file_path: impl AsRef) -> Self { VersionManager { - version_file_path: version_file_path.to_string(), + version_file_path: version_file_path.as_ref().into(), _marker: std::marker::PhantomData, } } - pub fn get_version(&self) -> Result { - if Path::new(&self.version_file_path).exists() { - let contents = fs::read_to_string(&self.version_file_path)?; - let version = i32::from_str(contents.trim())?; - T::from_int(version).map_err(|_| VersionManagerError::IntConversionError) - } else { - Err(VersionManagerError::VersionFileDoesNotExist) + pub async fn get_version(&self) -> Result { + match fs::read_to_string(&self.version_file_path).await { + Ok(contents) => { + let version = i32::from_str(contents.trim())?; + T::from_int(version).map_err(|_| VersionManagerError::IntConversionError) + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + Err(VersionManagerError::VersionFileDoesNotExist) + } + Err(e) => Err(FileIOError::from((&self.version_file_path, e)).into()), } } - pub fn set_version(&self, version: T) -> Result<(), VersionManagerError> { - let mut file = fs::File::create(&self.version_file_path)?; - file.write_all(version.int_value().to_string().as_bytes())?; - Ok(()) + pub async fn set_version(&self, version: T) -> Result<(), VersionManagerError> { + fs::write( + &self.version_file_path, + version.int_value().to_string().as_bytes(), + ) + .await + .map_err(|e| FileIOError::from((&self.version_file_path, e)).into()) } - // pub async fn migrate Result<(), VersionManagerError>>( - // &self, - // current: T, - // latest: T, - // mut migrate_fn: F, - // ) -> Result<(), VersionManagerError> { - // for version_int in (current.int_value() + 1)..=latest.int_value() { - // let version = match T::from_int(version_int) { - // Ok(version) => version, - // Err(_) => return Err(VersionManagerError::IntConversionError), - // }; - // migrate_fn(version)?; - // } + pub async fn migrate( + &self, + current: T, + migrate_fn: impl Fn(T, T) -> Fut, + ) -> Result<(), T::MigrationError> + where + Fut: Future>, + { + for (current_version, next_version) in + (current.int_value()..=T::LATEST_VERSION.int_value()).tuple_windows() + { + match (T::from_int(current_version), T::from_int(next_version)) { + (Ok(current), Ok(next)) => { + info!( + "Running {} migrator: {} -> {}", + type_name::(), + current, + next + ); + migrate_fn(current, next).await? + } + (Err(_), _) | (_, Err(_)) => { + return Err(VersionManagerError::IntConversionError.into()) + } + }; + } - // self.set_version(latest)?; - - // Ok(()) - // } + self.set_version(T::LATEST_VERSION) + .await + .map_err(Into::into) + } } diff --git a/crates/ffmpeg/src/error.rs b/crates/ffmpeg/src/error.rs index 910001d68..f399d3e92 100644 --- a/crates/ffmpeg/src/error.rs +++ b/crates/ffmpeg/src/error.rs @@ -15,7 +15,7 @@ use ffmpeg_sys_next::{ /// Error type for the library. #[derive(Error, Debug)] -pub enum ThumbnailerError { +pub enum Error { #[error("I/O Error: {0}")] Io(#[from] std::io::Error), #[error("Path conversion error: Path: {0:#?}")] diff --git a/crates/ffmpeg/src/lib.rs b/crates/ffmpeg/src/lib.rs index e1b6db295..c1170afe0 100644 --- a/crates/ffmpeg/src/lib.rs +++ b/crates/ffmpeg/src/lib.rs @@ -13,7 +13,7 @@ mod thumbnailer; mod utils; mod video_frame; -pub use error::ThumbnailerError; +pub use error::Error; pub use thumbnailer::{Thumbnailer, ThumbnailerBuilder}; /// Helper function to generate a thumbnail file from a video file with reasonable defaults @@ -22,7 +22,7 @@ pub async fn to_thumbnail( output_thumbnail_path: impl AsRef, size: u32, quality: f32, -) -> Result<(), ThumbnailerError> { +) -> Result<(), Error> { ThumbnailerBuilder::new() .with_film_strip(false) .size(size) @@ -37,7 +37,7 @@ pub async fn to_webp_bytes( video_file_path: impl AsRef, size: u32, quality: f32, -) -> Result, ThumbnailerError> { +) -> Result, Error> { ThumbnailerBuilder::new() .size(size) .quality(quality)? diff --git a/crates/ffmpeg/src/movie_decoder.rs b/crates/ffmpeg/src/movie_decoder.rs index 5b260857c..d0c59c145 100644 --- a/crates/ffmpeg/src/movie_decoder.rs +++ b/crates/ffmpeg/src/movie_decoder.rs @@ -1,5 +1,5 @@ use crate::{ - error::{FfmpegError, ThumbnailerError}, + error::{Error, FfmpegError}, utils::from_path, video_frame::{FfmpegFrame, FrameSource, VideoFrame}, }; @@ -48,7 +48,7 @@ impl MovieDecoder { pub(crate) fn new( filename: impl AsRef, prefer_embedded_metadata: bool, - ) -> Result { + ) -> Result { let filename = filename.as_ref(); let input_file = if filename == Path::new("-") { @@ -90,7 +90,7 @@ impl MovieDecoder { )?; } e => { - return Err(ThumbnailerError::FfmpegWithReason( + return Err(Error::FfmpegWithReason( FfmpegError::from(e), "Failed to open input".to_string(), )) @@ -102,7 +102,7 @@ impl MovieDecoder { // This needs to remain at 100 or the app will force crash if it comes // across a video with subtitles or any type of corruption. if (*decoder.format_context).probe_score != AVPROBE_SCORE_MAX { - return Err(ThumbnailerError::CorruptVideo); + return Err(Error::CorruptVideo); } } @@ -116,7 +116,7 @@ impl MovieDecoder { Ok(decoder) } - pub(crate) fn decode_video_frame(&mut self) -> Result<(), ThumbnailerError> { + pub(crate) fn decode_video_frame(&mut self) -> Result<(), Error> { let mut frame_finished = false; while !frame_finished && self.get_video_packet() { @@ -124,7 +124,7 @@ impl MovieDecoder { } if !frame_finished { - return Err(ThumbnailerError::FrameDecodeError); + return Err(Error::FrameDecodeError); } Ok(()) @@ -134,7 +134,7 @@ impl MovieDecoder { self.use_embedded_data } - pub(crate) fn seek(&mut self, seconds: i64) -> Result<(), ThumbnailerError> { + pub(crate) fn seek(&mut self, seconds: i64) -> Result<(), Error> { if !self.allow_seek { return Ok(()); } @@ -170,7 +170,7 @@ impl MovieDecoder { } if !got_frame { - return Err(ThumbnailerError::SeekError); + return Err(Error::SeekError); } Ok(()) @@ -181,7 +181,7 @@ impl MovieDecoder { scaled_size: Option, maintain_aspect_ratio: bool, video_frame: &mut VideoFrame, - ) -> Result<(), ThumbnailerError> { + ) -> Result<(), Error> { self.initialize_filter_graph( unsafe { &(*(*(*self.format_context) @@ -211,7 +211,7 @@ impl MovieDecoder { attempts += 1; } if ret < 0 { - return Err(ThumbnailerError::FfmpegWithReason( + return Err(Error::FfmpegWithReason( FfmpegError::from(ret), "Failed to get buffer from filter".to_string(), )); @@ -266,7 +266,7 @@ impl MovieDecoder { Duration::from_secs(unsafe { (*self.format_context).duration as u64 / AV_TIME_BASE as u64 }) } - fn initialize_video(&mut self, prefer_embedded_metadata: bool) -> Result<(), ThumbnailerError> { + fn initialize_video(&mut self, prefer_embedded_metadata: bool) -> Result<(), Error> { self.find_preferred_video_stream(prefer_embedded_metadata)?; self.video_stream = unsafe { @@ -309,10 +309,7 @@ impl MovieDecoder { ) } - fn find_preferred_video_stream( - &mut self, - prefer_embedded_metadata: bool, - ) -> Result<(), ThumbnailerError> { + fn find_preferred_video_stream(&mut self, prefer_embedded_metadata: bool) -> Result<(), Error> { let mut video_streams = vec![]; let mut embedded_data_streams = vec![]; let empty_cstring = CString::new("").unwrap(); @@ -404,7 +401,7 @@ impl MovieDecoder { frame_decoded } - fn decode_video_packet(&self) -> Result { + fn decode_video_packet(&self) -> Result { if unsafe { (*self.packet).stream_index } != self.video_stream_index { return Ok(false); } @@ -414,7 +411,7 @@ impl MovieDecoder { if ret == AVERROR_EOF { return Ok(false); } else if ret < 0 { - return Err(ThumbnailerError::FfmpegWithReason( + return Err(Error::FfmpegWithReason( FfmpegError::from(ret), "Failed to send packet to decoder".to_string(), )); @@ -423,7 +420,7 @@ impl MovieDecoder { match unsafe { avcodec_receive_frame(self.video_codec_context, self.frame) } { 0 => Ok(true), - e if e != AVERROR(EAGAIN) => Err(ThumbnailerError::FfmpegWithReason( + e if e != AVERROR(EAGAIN) => Err(Error::FfmpegWithReason( FfmpegError::from(e), "Failed to receive frame from decoder".to_string(), )), @@ -437,7 +434,7 @@ impl MovieDecoder { timebase: &AVRational, scaled_size: Option, maintain_aspect_ratio: bool, - ) -> Result<(), ThumbnailerError> { + ) -> Result<(), Error> { unsafe { self.filter_graph = avfilter_graph_alloc() }; if self.filter_graph.is_null() { return Err(FfmpegError::FilterGraphAllocation.into()); @@ -689,9 +686,9 @@ impl Drop for MovieDecoder { } } -fn check_error(return_code: i32, error_message: &str) -> Result<(), ThumbnailerError> { +fn check_error(return_code: i32, error_message: &str) -> Result<(), Error> { if return_code < 0 { - Err(ThumbnailerError::FfmpegWithReason( + Err(Error::FfmpegWithReason( FfmpegError::from(return_code), error_message.to_string(), )) @@ -707,7 +704,7 @@ fn setup_filter( args: &str, graph_ctx: *mut AVFilterGraph, error_message: &str, -) -> Result<(), ThumbnailerError> { +) -> Result<(), Error> { let filter_name_cstr = CString::new(filter_name).expect("CString from str"); let filter_setup_name_cstr = CString::new(filter_setup_name).expect("CString from str"); let args_cstr = CString::new(args).expect("CString from str"); @@ -733,7 +730,7 @@ fn setup_filter_without_args( filter_setup_name: &str, graph_ctx: *mut AVFilterGraph, error_message: &str, -) -> Result<(), ThumbnailerError> { +) -> Result<(), Error> { let filter_name_cstr = CString::new(filter_name).unwrap(); let filter_setup_name_cstr = CString::new(filter_setup_name).unwrap(); diff --git a/crates/ffmpeg/src/thumbnailer.rs b/crates/ffmpeg/src/thumbnailer.rs index aa54a625c..820c5a6ee 100644 --- a/crates/ffmpeg/src/thumbnailer.rs +++ b/crates/ffmpeg/src/thumbnailer.rs @@ -1,4 +1,4 @@ -use crate::{film_strip_filter, MovieDecoder, ThumbnailSize, ThumbnailerError, VideoFrame}; +use crate::{film_strip_filter, Error, MovieDecoder, ThumbnailSize, VideoFrame}; use std::{io, ops::Deref, path::Path}; use tokio::{fs, task::spawn_blocking}; @@ -18,7 +18,7 @@ impl Thumbnailer { &self, video_file_path: impl AsRef, output_thumbnail_path: impl AsRef, - ) -> Result<(), ThumbnailerError> { + ) -> Result<(), Error> { let path = output_thumbnail_path.as_ref().parent().ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, @@ -40,7 +40,7 @@ impl Thumbnailer { pub async fn process_to_webp_bytes( &self, video_file_path: impl AsRef, - ) -> Result, ThumbnailerError> { + ) -> Result, Error> { let video_file_path = video_file_path.as_ref().to_path_buf(); let prefer_embedded_metadata = self.builder.prefer_embedded_metadata; let seek_percentage = self.builder.seek_percentage; @@ -49,7 +49,7 @@ impl Thumbnailer { let with_film_strip = self.builder.with_film_strip; let quality = self.builder.quality; - spawn_blocking(move || -> Result, ThumbnailerError> { + spawn_blocking(move || -> Result, Error> { let mut decoder = MovieDecoder::new(video_file_path.clone(), prefer_embedded_metadata)?; // We actually have to decode a frame to get some metadata before we can start decoding for real decoder.decode_video_frame()?; @@ -149,18 +149,18 @@ impl ThumbnailerBuilder { } /// Seek percentage must be a value between 0.0 and 1.0 - pub fn seek_percentage(mut self, seek_percentage: f32) -> Result { + pub fn seek_percentage(mut self, seek_percentage: f32) -> Result { if !(0.0..=1.0).contains(&seek_percentage) { - return Err(ThumbnailerError::InvalidSeekPercentage(seek_percentage)); + return Err(Error::InvalidSeekPercentage(seek_percentage)); } self.seek_percentage = seek_percentage; Ok(self) } /// Quality must be a value between 0.0 and 100.0 - pub fn quality(mut self, quality: f32) -> Result { + pub fn quality(mut self, quality: f32) -> Result { if !(0.0..=100.0).contains(&quality) { - return Err(ThumbnailerError::InvalidQuality(quality)); + return Err(Error::InvalidQuality(quality)); } self.quality = quality; Ok(self) diff --git a/crates/ffmpeg/src/utils.rs b/crates/ffmpeg/src/utils.rs index 2ae136213..ea202f5d8 100644 --- a/crates/ffmpeg/src/utils.rs +++ b/crates/ffmpeg/src/utils.rs @@ -1,22 +1,21 @@ -use crate::error::ThumbnailerError; +use crate::error::Error; use std::ffi::CString; use std::path::Path; -pub fn from_path(path: impl AsRef) -> Result { +pub fn from_path(path: impl AsRef) -> Result { let path = path.as_ref(); let path_str = path.as_os_str(); #[cfg(unix)] { use std::os::unix::ffi::OsStrExt; - CString::new(path_str.as_bytes()) - .map_err(|_| ThumbnailerError::PathConversion(path.to_path_buf())) + CString::new(path_str.as_bytes()).map_err(|_| Error::PathConversion(path.to_path_buf())) } #[cfg(not(unix))] { path_str .to_str() .and_then(|str| CString::new(str.as_bytes()).ok()) - .ok_or(ThumbnailerError::PathConversion(path.to_path_buf())) + .ok_or(Error::PathConversion(path.to_path_buf())) } } diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 941f66394..edd5316b4 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -241,9 +241,9 @@ export type InvalidateOperationEvent = { type: "single"; data: SingleInvalidateO export type JobGroup = { id: string; action: string | null; status: JobStatus; created_at: string; jobs: JobReport[] } -export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; message: string; estimated_completion: string } +export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; phase: string; message: string; estimated_completion: string } -export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; message: string; estimated_completion: string } +export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; phase: string; message: string; estimated_completion: string } export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused" | "CompletedWithErrors" diff --git a/packages/client/src/utils/jobs/useJobInfo.tsx b/packages/client/src/utils/jobs/useJobInfo.tsx index 2ae4d2c7e..4a716c9c2 100644 --- a/packages/client/src/utils/jobs/useJobInfo.tsx +++ b/packages/client/src/utils/jobs/useJobInfo.tsx @@ -23,6 +23,7 @@ export function useJobInfo(job: JobReport, realtimeUpdate: JobProgressEvent | nu indexedPath = job.metadata?.data?.location.path, taskCount = realtimeUpdate?.task_count || job.task_count, completedTaskCount = realtimeUpdate?.completed_task_count || job.completed_task_count, + phase = realtimeUpdate?.phase, meta = job.metadata, output = meta?.output?.run_metadata; @@ -59,35 +60,75 @@ export function useJobInfo(job: JobReport, realtimeUpdate: JobProgressEvent | nu ] ] }; - case 'media_processor': + case 'media_processor': { + const generateTexts = () => { + switch (phase) { + case 'media_data': { + return [ + { + text: `${ + completedTaskCount + ? formatNumber(completedTaskCount || 0) + : formatNumber(output?.media_data?.extracted) + } of ${formatNumber(taskCount)} ${plural( + taskCount, + 'media file' + )} processed` + } + ]; + } + + case 'thumbnails': { + return [ + { + text: `${ + completedTaskCount + ? formatNumber(completedTaskCount || 0) + : formatNumber(output?.thumbs_processed) + } of ${formatNumber(taskCount)} ${plural( + taskCount, + 'thumbnail' + )} generated` + } + ]; + } + + default: { + // If we don't have a phase set, then we're done + + const totalThumbs = output?.thumbs_processed || 0; + const totalMediaFiles = + output?.media_data?.extracted || 0 + output?.media_data?.skipped || 0; + + return totalThumbs === 0 && totalMediaFiles === 0 + ? [{ text: 'None processed' }] + : [ + { + text: `Extracted ${formatNumber(totalMediaFiles)} ${plural( + totalMediaFiles, + 'media file' + )}` + }, + { + text: `Generated ${formatNumber(totalThumbs)} ${plural( + totalThumbs, + 'thumb' + )}` + } + ]; + } + } + }; + return { ...data, name: `${ isQueued ? 'Process' : isRunning ? 'Processing' : 'Processed' } media files`, - textItems: [ - [ - { - text: - output?.thumbnails_created === 0 - ? 'None processed' - : `${ - completedTaskCount - ? formatNumber(completedTaskCount || 0) - : formatNumber(output?.thumbnails_created) - } of ${formatNumber(taskCount)} ${plural( - taskCount, - 'media file' - )} processed` - }, - { - text: - output?.thumbnails_skipped && - `${output?.thumbnails_skipped} already exist` - } - ] - ] + textItems: [generateTexts()] }; + } + case 'file_identifier': return { ...data,