From aa354f684ebf1db2edaf71855dd4ac18bc620b82 Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Mon, 16 Oct 2023 23:09:36 -0300 Subject: [PATCH] [ENG-1267] Move thumbnails generation away from media processor (#1585) * New vscode task to start developing * Updating db in case of library updates just in case * Done * Forgot to remove some debug logs * Rust fmt * Saving thumbnails processing state on app shutdown * bruh --- .vscode/tasks.json | 7 + core/src/api/mod.rs | 4 +- core/src/api/{api.rs => web_api.rs} | 0 core/src/lib.rs | 8 +- .../isolated_file_path_data.rs | 4 + core/src/location/manager/watcher/linux.rs | 2 +- core/src/location/manager/watcher/utils.rs | 31 +- core/src/location/non_indexed.rs | 54 +- core/src/object/media/media_data_extractor.rs | 47 +- core/src/object/media/media_processor/job.rs | 167 ++--- core/src/object/media/media_processor/mod.rs | 295 ++++---- .../object/media/media_processor/shallow.rs | 162 ++-- core/src/object/media/thumbnail/actor.rs | 689 ++++++++++++++---- core/src/object/media/thumbnail/directory.rs | 6 +- core/src/object/media/thumbnail/mod.rs | 310 +------- crates/file-ext/src/magic.rs | 2 +- 16 files changed, 961 insertions(+), 827 deletions(-) rename core/src/api/{api.rs => web_api.rs} (100%) diff --git a/.vscode/tasks.json b/.vscode/tasks.json index a895b9fa6..a81b441ce 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -19,6 +19,13 @@ "group": "none", "problemMatcher": ["$rustc"] }, + { + "type": "shell", + "label": "start", + "command": "sh", + "args": ["-c", "'pnpm i && pnpm prep'"], + "problemMatcher": ["$tsc-watch", "$rustc"] + }, { "type": "shell", "label": "ui:dev", diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index 5cdfb86bf..234fc37fe 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -47,7 +47,6 @@ impl BackendFeature { } } -mod api; mod auth; mod backups; mod categories; @@ -65,6 +64,7 @@ mod sync; mod tags; pub mod utils; pub mod volumes; +mod web_api; // A version of [NodeConfig] that is safe to share with the frontend #[derive(Debug, Serialize, Deserialize, Clone, Type)] @@ -167,7 +167,7 @@ pub(crate) fn mount() -> Arc { Ok(()) }) }) - .merge("api.", api::mount()) + .merge("api.", web_api::mount()) .merge("auth.", auth::mount()) .merge("search.", search::mount()) .merge("library.", libraries::mount()) diff --git a/core/src/api/api.rs b/core/src/api/web_api.rs similarity index 100% rename from core/src/api/api.rs rename to core/src/api/web_api.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 20c4f98a3..a3b340e04 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -113,8 +113,13 @@ impl Node { notifications: notifications::Notifications::new(), p2p, config, + thumbnailer: Thumbnailer::new( + data_dir.to_path_buf(), + libraries.clone(), + event_bus.0.clone(), + ) + .await, event_bus, - thumbnailer: Thumbnailer::new(data_dir.to_path_buf(), libraries.clone()), libraries, files_over_p2p_flag: Arc::new(AtomicBool::new(false)), http: reqwest::Client::new(), @@ -210,6 +215,7 @@ impl Node { pub async fn shutdown(&self) { info!("Spacedrive shutting down..."); + self.thumbnailer.shutdown().await; self.jobs.shutdown().await; self.p2p.shutdown().await; info!("Spacedrive Core shutdown successful!"); diff --git a/core/src/location/file_path_helper/isolated_file_path_data.rs b/core/src/location/file_path_helper/isolated_file_path_data.rs index 7ae6612c6..4c02a32c5 100644 --- a/core/src/location/file_path_helper/isolated_file_path_data.rs +++ b/core/src/location/file_path_helper/isolated_file_path_data.rs @@ -84,6 +84,10 @@ impl<'a> IsolatedFilePathData<'a> { self.location_id } + pub fn extension(&self) -> &str { + self.extension.as_ref() + } + pub fn is_root(&self) -> bool { self.is_dir && self.materialized_path == "/" diff --git a/core/src/location/manager/watcher/linux.rs b/core/src/location/manager/watcher/linux.rs index 8c07e83b7..cada73058 100644 --- a/core/src/location/manager/watcher/linux.rs +++ b/core/src/location/manager/watcher/linux.rs @@ -66,7 +66,7 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { } async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> { - tracing::debug!("Received Linux event: {:#?}", event); + trace!("Received Linux event: {:#?}", event); let Event { kind, mut paths, .. diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 7d84b120d..0cc82606f 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -21,7 +21,7 @@ use crate::{ media::{ media_data_extractor::{can_extract_media_data_for_image, extract_media_data}, media_data_image_to_query, - thumbnail::{generate_thumbnail, get_thumbnail_path}, + thumbnail::get_thumbnail_path, }, validation::hash::file_checksum, }, @@ -59,6 +59,7 @@ use serde_json::json; use tokio::{ fs, io::{self, ErrorKind}, + spawn, time::Instant, }; use tracing::{debug, error, trace, warn}; @@ -278,13 +279,16 @@ async fn inner_create_file( if !extension.is_empty() && matches!(kind, ObjectKind::Image | ObjectKind::Video) { // Running in a detached task as thumbnail generation can take a while and we don't want to block the watcher - let inner_path = path.to_path_buf(); - let node = node.clone(); - let inner_extension = extension.clone(); + if let Some(cas_id) = cas_id { - tokio::spawn(async move { - if let Err(e) = - generate_thumbnail(&inner_extension, cas_id, inner_path, node, true).await + let extension = extension.clone(); + let path = path.to_path_buf(); + let node = node.clone(); + spawn(async move { + if let Err(e) = node + .thumbnailer + .generate_single_thumbnail(&extension, cas_id, path) + .await { error!("Failed to generate thumbnail in the watcher: {e:#?}"); } @@ -499,13 +503,14 @@ async fn inner_update_file( if library.thumbnail_exists(node, old_cas_id).await? { if let Some(ext) = file_path.extension.clone() { // Running in a detached task as thumbnail generation can take a while and we don't want to block the watcher - let inner_path = full_path.to_path_buf(); - let inner_node = node.clone(); if let Some(cas_id) = cas_id { - tokio::spawn(async move { - if let Err(e) = - generate_thumbnail(&ext, cas_id, inner_path, inner_node, true) - .await + let node = node.clone(); + let path = full_path.to_path_buf(); + spawn(async move { + if let Err(e) = node + .thumbnailer + .generate_single_thumbnail(&ext, cas_id, path) + .await { error!("Failed to generate thumbnail in the watcher: {e:#?}"); } diff --git a/core/src/location/non_indexed.rs b/core/src/location/non_indexed.rs index d990c634b..e9ad6c7a8 100644 --- a/core/src/location/non_indexed.rs +++ b/core/src/location/non_indexed.rs @@ -3,7 +3,10 @@ use crate::{ library::Library, object::{ cas::generate_cas_id, - media::thumbnail::{get_thumb_key, GenerateThumbnailArgs}, + media::thumbnail::{ + actor::{BatchToProcess, GenerateThumbnailArgs}, + get_thumb_key, + }, }, prisma::location, util::error::FileIOError, @@ -107,6 +110,8 @@ pub async fn walk( ); let mut thumbnails_to_generate = vec![]; + // Generating thumbnails for PDFs is kinda slow, so we're leaving them for last in the batch + let mut document_thumbnails_to_generate = vec![]; while let Some(entry) = read_dir.next_entry().await.map_err(|e| (path, e))? { let Ok((entry_path, name)) = normalize_path(entry.path()) @@ -161,20 +166,39 @@ pub async fn walk( .map(Into::into) .unwrap_or(ObjectKind::Unknown); - let thumbnail_key = if matches!( - kind, - ObjectKind::Image | ObjectKind::Video | ObjectKind::Document - ) { + let should_generate_thumbnail = { + #[cfg(feature = "ffmpeg")] + { + matches!( + kind, + ObjectKind::Image | ObjectKind::Video | ObjectKind::Document + ) + } + + #[cfg(not(feature = "ffmpeg"))] + { + matches!(kind, ObjectKind::Image | ObjectKind::Document) + } + }; + + let thumbnail_key = if should_generate_thumbnail { if let Ok(cas_id) = generate_cas_id(&path, metadata.len()) .await .map_err(|e| errors.push(NonIndexedLocationError::from((path, e)).into())) { - thumbnails_to_generate.push(GenerateThumbnailArgs::new( - extension.clone(), - cas_id.clone(), - path.to_path_buf(), - Arc::clone(&node), - )); + if kind == ObjectKind::Document { + document_thumbnails_to_generate.push(GenerateThumbnailArgs::new( + extension.clone(), + cas_id.clone(), + path.to_path_buf(), + )); + } else { + thumbnails_to_generate.push(GenerateThumbnailArgs::new( + extension.clone(), + cas_id.clone(), + path.to_path_buf(), + )); + } let thumbnail_key = get_thumb_key(&cas_id); @@ -204,8 +228,14 @@ pub async fn walk( } } + thumbnails_to_generate.extend(document_thumbnails_to_generate); + node.thumbnailer - .new_non_indexed_thumbnails_batch(thumbnails_to_generate) + .new_ephemeral_thumbnails_batch(BatchToProcess { + batch: thumbnails_to_generate, + should_regenerate: false, + in_background: false, + }) .await; let mut locations = library diff --git a/core/src/object/media/media_data_extractor.rs b/core/src/object/media/media_data_extractor.rs index 67a65b8f6..47b19071b 100644 --- a/core/src/object/media/media_data_extractor.rs +++ b/core/src/object/media/media_data_extractor.rs @@ -10,7 +10,7 @@ use sd_media_metadata::ImageMetadata; use std::{collections::HashSet, path::Path}; -use futures::future::join_all; +use futures_concurrency::future::Join; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -65,13 +65,13 @@ pub async fn extract_media_data(path: impl AsRef) -> Result, + files_paths: &[file_path_for_media_processor::Data], location_id: location::id::Type, location_path: impl AsRef, db: &PrismaClient, + ctx_update_fn: &impl Fn(usize), ) -> Result<(MediaDataExtractorMetadata, JobRunErrors), MediaDataError> { let mut run_metadata = MediaDataExtractorMetadata::default(); - let files_paths = files_paths.into_iter().collect::>(); if files_paths.is_empty() { return Ok((run_metadata, JobRunErrors::default())); } @@ -104,26 +104,29 @@ pub async fn process( run_metadata.skipped = objects_already_with_media_data.len() as u32; let (media_datas, errors) = { - let maybe_media_data = join_all( - files_paths - .into_iter() - .filter_map(|file_path| { - file_path.object_id.and_then(|object_id| { - (!objects_already_with_media_data.contains(&object_id)) - .then_some((file_path, object_id)) - }) + let maybe_media_data = files_paths + .iter() + .enumerate() + .filter_map(|(idx, file_path)| { + file_path.object_id.and_then(|object_id| { + (!objects_already_with_media_data.contains(&object_id)) + .then_some((idx, file_path, object_id)) }) - .filter_map(|(file_path, object_id)| { - IsolatedFilePathData::try_from((location_id, file_path)) - .map_err(|e| error!("{e:#?}")) - .ok() - .map(|iso_file_path| (location_path.join(iso_file_path), object_id)) - }) - .map(|(path, object_id)| async move { - (extract_media_data(&path).await, path, object_id) - }), - ) - .await; + }) + .filter_map(|(idx, file_path, object_id)| { + IsolatedFilePathData::try_from((location_id, file_path)) + .map_err(|e| error!("{e:#?}")) + .ok() + .map(|iso_file_path| (idx, location_path.join(iso_file_path), object_id)) + }) + .map(|(idx, path, object_id)| async move { + let res = extract_media_data(&path).await; + ctx_update_fn(idx + 1); + (res, path, object_id) + }) + .collect::>() + .join() + .await; let total_media_data = maybe_media_data.len(); diff --git a/core/src/object/media/media_processor/job.rs b/core/src/object/media/media_processor/job.rs index 2c3815c50..a5baacb27 100644 --- a/core/src/object/media/media_processor/job.rs +++ b/core/src/object/media/media_processor/job.rs @@ -9,26 +9,26 @@ use crate::{ ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, file_path_for_media_processor, IsolatedFilePathData, }, - object::media::media_data_extractor, - object::media::thumbnail::{self, init_thumbnail_dir}, prisma::{location, PrismaClient}, util::db::maybe_missing, }; use std::{ - collections::HashMap, + future::Future, hash::Hash, path::{Path, PathBuf}, }; use itertools::Itertools; +use prisma_client_rust::{raw, PrismaValue}; +use sd_file_ext::extensions::Extension; use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::{debug, info}; use super::{ - get_all_children_files_by_extensions, process, MediaProcessorEntry, MediaProcessorEntryKind, - MediaProcessorError, MediaProcessorMetadata, ThumbnailerEntryKind, + dispatch_thumbnails_for_processing, media_data_extractor, process, MediaProcessorError, + MediaProcessorMetadata, }; const BATCH_SIZE: usize = 10; @@ -51,17 +51,14 @@ impl Hash for MediaProcessorJobInit { #[derive(Debug, Serialize, Deserialize)] pub struct MediaProcessorJobData { - thumbnails_base_dir: PathBuf, location_path: PathBuf, to_process_path: PathBuf, } -type MediaProcessorJobStep = Vec; - #[async_trait::async_trait] impl StatefulJob for MediaProcessorJobInit { type Data = MediaProcessorJobData; - type Step = MediaProcessorJobStep; + type Step = Vec; type RunMetadata = MediaProcessorMetadata; const NAME: &'static str = "media_processor"; @@ -78,10 +75,6 @@ impl StatefulJob for MediaProcessorJobInit { ) -> Result, JobError> { let Library { db, .. } = ctx.library.as_ref(); - let thumbnails_base_dir = init_thumbnail_dir(ctx.node.config.data_directory()) - .await - .map_err(MediaProcessorError::from)?; - let location_id = self.location.id; let location_path = maybe_missing(&self.location.path, "location.path").map(PathBuf::from)?; @@ -120,53 +113,37 @@ impl StatefulJob for MediaProcessorJobInit { "Searching for media files in location {location_id} at directory \"{iso_file_path}\"" ); - let thumbnailer_files = get_files_for_thumbnailer(db, &iso_file_path).await?; + dispatch_thumbnails_for_processing( + location_id, + &location_path, + &iso_file_path, + &ctx.library, + &ctx.node, + false, + get_all_children_files_by_extensions, + ) + .await?; - let mut media_data_files_map = get_files_for_media_data_extraction(db, &iso_file_path) - .await? - .map(|file_path| (file_path.id, file_path)) - .collect::>(); + let file_paths = get_files_for_media_data_extraction(db, &iso_file_path).await?; - let mut total_files_for_thumbnailer = 0; + let total_files = file_paths.len(); - let chunked_files = thumbnailer_files + let chunked_files = file_paths .into_iter() - .map(|(file_path, thumb_kind)| { - total_files_for_thumbnailer += 1; - MediaProcessorEntry { - operation_kind: if media_data_files_map.remove(&file_path.id).is_some() { - MediaProcessorEntryKind::MediaDataAndThumbnailer(thumb_kind) - } else { - MediaProcessorEntryKind::Thumbnailer(thumb_kind) - }, - file_path, - } - }) - .collect::>() - .into_iter() - .chain( - media_data_files_map - .into_values() - .map(|file_path| MediaProcessorEntry { - operation_kind: MediaProcessorEntryKind::MediaData, - file_path, - }), - ) .chunks(BATCH_SIZE) .into_iter() .map(|chunk| chunk.collect::>()) .collect::>(); ctx.progress(vec![ - JobReportUpdate::TaskCount(total_files_for_thumbnailer), + JobReportUpdate::TaskCount(total_files), JobReportUpdate::Message(format!( - "Preparing to process {total_files_for_thumbnailer} files in {} chunks", + "Preparing to process {total_files} files in {} chunks", chunked_files.len() )), ]); *data = Some(MediaProcessorJobData { - thumbnails_base_dir, location_path, to_process_path, }); @@ -177,18 +154,19 @@ impl StatefulJob for MediaProcessorJobInit { async fn execute_step( &self, ctx: &WorkerContext, - CurrentStep { step, step_number }: CurrentStep<'_, Self::Step>, + CurrentStep { + step: file_paths, + step_number, + }: CurrentStep<'_, Self::Step>, data: &Self::Data, _: &Self::RunMetadata, ) -> Result, JobError> { process( - step, + file_paths, self.location.id, &data.location_path, - &data.thumbnails_base_dir, - self.regenerate_thumbnails, - &ctx.library, - |completed_count| { + &ctx.library.db, + &|completed_count| { ctx.progress(vec![JobReportUpdate::CompletedTaskCount( step_number * BATCH_SIZE + completed_count, )]); @@ -214,7 +192,7 @@ impl StatefulJob for MediaProcessorJobInit { .display() ); - if run_metadata.thumbnailer.created > 0 || run_metadata.media_data.extracted > 0 { + if run_metadata.media_data.extracted > 0 { invalidate_query!(ctx.library, "search.paths"); } @@ -222,55 +200,60 @@ impl StatefulJob for MediaProcessorJobInit { } } -async fn get_files_for_thumbnailer( - db: &PrismaClient, - parent_iso_file_path: &IsolatedFilePathData<'_>, -) -> Result< - impl Iterator, - MediaProcessorError, -> { - // query database for all image files in this location that need thumbnails - let image_thumb_files = get_all_children_files_by_extensions( - db, - parent_iso_file_path, - &thumbnail::THUMBNAILABLE_EXTENSIONS, - ) - .await? - .into_iter() - .map(|file_path| (file_path, ThumbnailerEntryKind::Image)); - - #[cfg(feature = "ffmpeg")] - let all_files = { - // query database for all video files in this location that need thumbnails - let video_files = get_all_children_files_by_extensions( - db, - parent_iso_file_path, - &thumbnail::THUMBNAILABLE_VIDEO_EXTENSIONS, - ) - .await?; - - image_thumb_files.chain( - video_files - .into_iter() - .map(|file_path| (file_path, ThumbnailerEntryKind::Video)), - ) - }; - #[cfg(not(feature = "ffmpeg"))] - let all_files = { image_thumb_files }; - - Ok(all_files) -} - async fn get_files_for_media_data_extraction( db: &PrismaClient, parent_iso_file_path: &IsolatedFilePathData<'_>, -) -> Result, MediaProcessorError> { +) -> Result, MediaProcessorError> { get_all_children_files_by_extensions( db, parent_iso_file_path, &media_data_extractor::FILTERED_IMAGE_EXTENSIONS, ) .await - .map(|file_paths| file_paths.into_iter()) .map_err(Into::into) } + +fn get_all_children_files_by_extensions<'d, 'p, 'e, 'ret>( + db: &'d PrismaClient, + parent_iso_file_path: &'p IsolatedFilePathData<'_>, + extensions: &'e [Extension], +) -> impl Future, MediaProcessorError>> + 'ret +where + 'd: 'ret, + 'p: 'ret, + 'e: 'ret, +{ + async move { + // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite + // We have no data coming from the user, so this is sql injection safe + db._query_raw(raw!( + &format!( + "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id + FROM file_path + WHERE + location_id={{}} + AND cas_id IS NOT NULL + AND LOWER(extension) IN ({}) + AND materialized_path LIKE {{}} + ORDER BY materialized_path ASC", + // Orderind by materialized_path so we can prioritize processing the first files + // in the above part of the directories tree + extensions + .iter() + .map(|ext| format!("LOWER('{ext}')")) + .collect::>() + .join(",") + ), + PrismaValue::Int(parent_iso_file_path.location_id() as i64), + PrismaValue::String(format!( + "{}%", + parent_iso_file_path + .materialized_path_for_children() + .expect("sub path iso_file_path must be a directory") + )) + )) + .exec() + .await + .map_err(Into::into) + } +} diff --git a/core/src/object/media/media_processor/mod.rs b/core/src/object/media/media_processor/mod.rs index 489260cf6..2cfd998d1 100644 --- a/core/src/object/media/media_processor/mod.rs +++ b/core/src/object/media/media_processor/mod.rs @@ -4,21 +4,26 @@ use crate::{ location::file_path_helper::{ file_path_for_media_processor, FilePathError, IsolatedFilePathData, }, + util::db::{maybe_missing, MissingFieldError}, + Node, }; use sd_file_ext::extensions::Extension; use sd_prisma::prisma::{location, PrismaClient}; -use std::path::Path; +use std::{future::Future, path::Path}; -use futures::try_join; -use prisma_client_rust::{raw, PrismaValue}; use serde::{Deserialize, Serialize}; use thiserror::Error; +use tracing::error; use super::{ media_data_extractor::{self, MediaDataError, MediaDataExtractorMetadata}, - thumbnail::{self, ThumbnailerEntryKind, ThumbnailerError, ThumbnailerMetadata}, + thumbnail::{ + self, + actor::{BatchToProcess, GenerateThumbnailArgs}, + ThumbnailerError, + }, }; mod job; @@ -43,174 +48,152 @@ pub enum MediaProcessorError { MediaDataExtractor(#[from] MediaDataError), } -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] -pub enum MediaProcessorEntryKind { - MediaData, - Thumbnailer(ThumbnailerEntryKind), - MediaDataAndThumbnailer(ThumbnailerEntryKind), -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct MediaProcessorEntry { - file_path: file_path_for_media_processor::Data, - operation_kind: MediaProcessorEntryKind, -} - #[derive(Debug, Serialize, Deserialize, Default)] pub struct MediaProcessorMetadata { media_data: MediaDataExtractorMetadata, - thumbnailer: ThumbnailerMetadata, +} + +impl From for MediaProcessorMetadata { + fn from(media_data: MediaDataExtractorMetadata) -> Self { + Self { media_data } + } } impl JobRunMetadata for MediaProcessorMetadata { fn update(&mut self, new_data: Self) { self.media_data.extracted += new_data.media_data.extracted; self.media_data.skipped += new_data.media_data.skipped; - - self.thumbnailer.created += new_data.thumbnailer.created; - self.thumbnailer.skipped += new_data.thumbnailer.skipped; } } -async fn get_all_children_files_by_extensions( - db: &PrismaClient, - parent_iso_file_path: &IsolatedFilePathData<'_>, - extensions: &[Extension], -) -> Result, MediaProcessorError> { - // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite - // We have no data coming from the user, so this is sql injection safe - db._query_raw(raw!( - &format!( - "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id - FROM file_path - WHERE - location_id={{}} - AND cas_id IS NOT NULL - AND LOWER(extension) IN ({}) - AND materialized_path LIKE {{}}", - extensions - .iter() - .map(|ext| format!("LOWER('{ext}')")) - .collect::>() - .join(",") - ), - PrismaValue::Int(parent_iso_file_path.location_id() as i64), - PrismaValue::String(format!( - "{}%", - parent_iso_file_path - .materialized_path_for_children() - .expect("sub path iso_file_path must be a directory") - )) - )) - .exec() - .await - .map_err(Into::into) -} - -async fn get_files_by_extensions( - db: &PrismaClient, - parent_iso_file_path: &IsolatedFilePathData<'_>, - extensions: &[Extension], -) -> Result, MediaDataError> { - // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite - // We have no data coming from the user, so this is sql injection safe - db._query_raw(raw!( - &format!( - "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id - FROM file_path - WHERE - location_id={{}} - AND cas_id IS NOT NULL - AND LOWER(extension) IN ({}) - AND materialized_path = {{}}", - extensions - .iter() - .map(|ext| format!("LOWER('{ext}')")) - .collect::>() - .join(",") - ), - PrismaValue::Int(parent_iso_file_path.location_id() as i64), - PrismaValue::String( - parent_iso_file_path - .materialized_path_for_children() - .expect("sub path iso_file_path must be a directory") - ) - )) - .exec() - .await - .map_err(Into::into) -} - -async fn process( - entries: &[MediaProcessorEntry], +// `thumbs_fetcher_fn` MUST return file_paths ordered by `materialized_path` for optimal results +async fn dispatch_thumbnails_for_processing<'d, 'p, 'e, 'ret, F>( location_id: location::id::Type, location_path: impl AsRef, - thumbnails_base_dir: impl AsRef, - regenerate_thumbnails: bool, - library: &Library, - ctx_update_fn: impl Fn(usize), -) -> Result<(MediaProcessorMetadata, JobRunErrors), MediaProcessorError> { + parent_iso_file_path: &'p IsolatedFilePathData<'_>, + library: &'d Library, + node: &Node, + should_regenerate: bool, + thumbs_fetcher_fn: impl Fn(&'d PrismaClient, &'p IsolatedFilePathData<'_>, &'e [Extension]) -> F, +) -> Result<(), MediaProcessorError> +where + 'd: 'ret, + 'p: 'ret, + 'e: 'ret, + F: Future, MediaProcessorError>> + + 'ret, +{ + let Library { db, .. } = library; + let location_path = location_path.as_ref(); - let ((media_data_metadata, mut media_data_errors), (thumbnailer_metadata, thumbnailer_errors)) = - try_join!( - async { - media_data_extractor::process( - entries.iter().filter_map( - |MediaProcessorEntry { - file_path, - operation_kind, - }| { - matches!( - operation_kind, - MediaProcessorEntryKind::MediaDataAndThumbnailer(_) - | MediaProcessorEntryKind::MediaData - ) - .then_some(file_path) - }, - ), - location_id, - location_path, - &library.db, - ) - .await - .map_err(MediaProcessorError::from) - }, - async { - thumbnail::process( - entries.iter().filter_map( - |MediaProcessorEntry { - file_path, - operation_kind, - }| { - if let MediaProcessorEntryKind::Thumbnailer(thumb_kind) - | MediaProcessorEntryKind::MediaDataAndThumbnailer(thumb_kind) = operation_kind - { - Some((file_path, *thumb_kind)) - } else { - None - } - }, - ), - location_id, - location_path, - thumbnails_base_dir, - regenerate_thumbnails, - library, - ctx_update_fn, - ) - .await - .map_err(MediaProcessorError::from) - }, - )?; + let file_paths = thumbs_fetcher_fn( + db, + parent_iso_file_path, + &thumbnail::ALL_THUMBNAILABLE_EXTENSIONS, + ) + .await?; - media_data_errors.0.extend(thumbnailer_errors.0.into_iter()); + let mut current_batch = Vec::with_capacity(16); - Ok(( - MediaProcessorMetadata { - media_data: media_data_metadata, - thumbnailer: thumbnailer_metadata, - }, - media_data_errors, - )) + // PDF thumbnails are currently way slower so we process them by last + let mut pdf_thumbs = Vec::with_capacity(16); + + let mut current_materialized_path = None; + + let mut in_background = false; + + for file_path in file_paths { + // Initializing current_materialized_path with the first file_path materialized_path + if current_materialized_path.is_none() { + current_materialized_path = file_path.materialized_path.clone(); + } + + if file_path.materialized_path != current_materialized_path + && (!current_batch.is_empty() || !pdf_thumbs.is_empty()) + { + // Now we found a different materialized_path so we dispatch the current batch and start a new one + + // We starting by appending all pdfs and leaving the vec clean to be reused + current_batch.append(&mut pdf_thumbs); + + node.thumbnailer + .new_indexed_thumbnails_batch(BatchToProcess { + batch: current_batch, + should_regenerate, + in_background, + }) + .await; + + // We moved our vec so we need a new + current_batch = Vec::with_capacity(16); + in_background = true; // Only the first batch should be processed in foreground + + // Exchaging for the first different materialized_path + current_materialized_path = file_path.materialized_path.clone(); + } + + let file_path_id = file_path.id; + if let Err(e) = add_to_batch( + location_id, + location_path, + file_path, + &mut current_batch, + &mut pdf_thumbs, + ) { + error!("Error adding file_path to thumbnail batch: {e:#?}"); + } + } + + // Dispatching the last batch + if !current_batch.is_empty() { + node.thumbnailer + .new_indexed_thumbnails_batch(BatchToProcess { + batch: current_batch, + should_regenerate, + in_background, + }) + .await; + } + + Ok(()) +} + +fn add_to_batch( + location_id: location::id::Type, + location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter + file_path: file_path_for_media_processor::Data, + current_batch: &mut Vec, + pdf_thumbs: &mut Vec, +) -> Result<(), MissingFieldError> { + let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone(); + + let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?; + let full_path = location_path.join(&iso_file_path); + + let extension = iso_file_path.extension(); + let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path); + + if extension != "pdf" { + current_batch.push(args); + } else { + pdf_thumbs.push(args); + } + + Ok(()) +} + +pub async fn process( + files_paths: &[file_path_for_media_processor::Data], + location_id: location::id::Type, + location_path: impl AsRef, + db: &PrismaClient, + ctx_update_fn: &impl Fn(usize), +) -> Result<(MediaProcessorMetadata, JobRunErrors), MediaProcessorError> { + // Add here new kinds of media processing if necessary in the future + + media_data_extractor::process(files_paths, location_id, location_path, db, ctx_update_fn) + .await + .map(|(media_data, errors)| (media_data.into(), errors)) + .map_err(Into::into) } diff --git a/core/src/object/media/media_processor/shallow.rs b/core/src/object/media/media_processor/shallow.rs index 8463f56c8..4312a04d4 100644 --- a/core/src/object/media/media_processor/shallow.rs +++ b/core/src/object/media/media_processor/shallow.rs @@ -6,25 +6,24 @@ use crate::{ ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, file_path_for_media_processor, IsolatedFilePathData, }, - object::media::{ - media_data_extractor, - thumbnail::{self, init_thumbnail_dir, ThumbnailerEntryKind}, - }, prisma::{location, PrismaClient}, util::db::maybe_missing, Node, }; use std::{ - collections::HashMap, + future::Future, path::{Path, PathBuf}, }; use itertools::Itertools; +use prisma_client_rust::{raw, PrismaValue}; +use sd_file_ext::extensions::Extension; use tracing::{debug, error}; use super::{ - get_files_by_extensions, process, MediaProcessorEntry, MediaProcessorEntryKind, + dispatch_thumbnails_for_processing, + media_data_extractor::{self, process}, MediaProcessorError, MediaProcessorMetadata, }; @@ -38,10 +37,6 @@ pub async fn shallow( ) -> Result<(), JobError> { let Library { db, .. } = library; - let thumbnails_base_dir = init_thumbnail_dir(node.config.data_directory()) - .await - .map_err(MediaProcessorError::from)?; - let location_id = location.id; let location_path = maybe_missing(&location.path, "location.path").map(PathBuf::from)?; @@ -73,43 +68,27 @@ pub async fn shallow( debug!("Searching for images in location {location_id} at path {iso_file_path}"); - let thumbnailer_files = get_files_for_thumbnailer(db, &iso_file_path).await?; + dispatch_thumbnails_for_processing( + location.id, + &location_path, + &iso_file_path, + library, + node, + false, + get_files_by_extensions, + ) + .await?; - let mut media_data_files_map = get_files_for_media_data_extraction(db, &iso_file_path) - .await? - .map(|file_path| (file_path.id, file_path)) - .collect::>(); + let file_paths = get_files_for_media_data_extraction(db, &iso_file_path).await?; - let mut total_files = 0; + let total_files = file_paths.len(); - let chunked_files = thumbnailer_files + let chunked_files = file_paths .into_iter() - .map(|(file_path, thumb_kind)| MediaProcessorEntry { - operation_kind: if media_data_files_map.remove(&file_path.id).is_some() { - MediaProcessorEntryKind::MediaDataAndThumbnailer(thumb_kind) - } else { - MediaProcessorEntryKind::Thumbnailer(thumb_kind) - }, - file_path, - }) - .collect::>() - .into_iter() - .chain( - media_data_files_map - .into_values() - .map(|file_path| MediaProcessorEntry { - operation_kind: MediaProcessorEntryKind::MediaData, - file_path, - }), - ) .chunks(BATCH_SIZE) .into_iter() - .map(|chunk| { - let chunk = chunk.collect::>(); - total_files += chunk.len(); - chunk - }) - .collect::>(); + .map(Iterator::collect) + .collect::>>(); debug!( "Preparing to process {total_files} files in {} chunks", @@ -119,17 +98,11 @@ pub async fn shallow( let mut run_metadata = MediaProcessorMetadata::default(); for files in chunked_files { - let (more_run_metadata, errors) = process( - &files, - location.id, - &location_path, - &thumbnails_base_dir, - false, - library, - |_| {}, - ) - .await?; - run_metadata.update(more_run_metadata); + let (more_run_metadata, errors) = process(&files, location.id, &location_path, db, &|_| {}) + .await + .map_err(MediaProcessorError::from)?; + + run_metadata.update(more_run_metadata.into()); if !errors.is_empty() { error!("Errors processing chunk of media data shallow extraction:\n{errors}"); @@ -138,62 +111,63 @@ pub async fn shallow( debug!("Media shallow processor run metadata: {run_metadata:?}"); - if run_metadata.media_data.extracted > 0 || run_metadata.thumbnailer.created > 0 { + if run_metadata.media_data.extracted > 0 { invalidate_query!(library, "search.paths"); } Ok(()) } -async fn get_files_for_thumbnailer( - db: &PrismaClient, - parent_iso_file_path: &IsolatedFilePathData<'_>, -) -> Result< - impl Iterator, - MediaProcessorError, -> { - // query database for all image files in this location that need thumbnails - let image_thumb_files = get_files_by_extensions( - db, - parent_iso_file_path, - &thumbnail::THUMBNAILABLE_EXTENSIONS, - ) - .await? - .into_iter() - .map(|file_path| (file_path, ThumbnailerEntryKind::Image)); - - #[cfg(feature = "ffmpeg")] - let all_files = { - // query database for all video files in this location that need thumbnails - let video_files = get_files_by_extensions( - db, - parent_iso_file_path, - &thumbnail::THUMBNAILABLE_VIDEO_EXTENSIONS, - ) - .await?; - - image_thumb_files.chain( - video_files - .into_iter() - .map(|file_path| (file_path, ThumbnailerEntryKind::Video)), - ) - }; - #[cfg(not(feature = "ffmpeg"))] - let all_files = { image_thumb_files }; - - Ok(all_files) -} - async fn get_files_for_media_data_extraction( db: &PrismaClient, parent_iso_file_path: &IsolatedFilePathData<'_>, -) -> Result, MediaProcessorError> { +) -> Result, MediaProcessorError> { get_files_by_extensions( db, parent_iso_file_path, &media_data_extractor::FILTERED_IMAGE_EXTENSIONS, ) .await - .map(|file_paths| file_paths.into_iter()) .map_err(Into::into) } + +fn get_files_by_extensions<'d, 'p, 'e, 'ret>( + db: &'d PrismaClient, + parent_iso_file_path: &'p IsolatedFilePathData<'_>, + extensions: &'e [Extension], +) -> impl Future, MediaProcessorError>> + 'ret +where + 'd: 'ret, + 'p: 'ret, + 'e: 'ret, +{ + async move { + // FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite + // We have no data coming from the user, so this is sql injection safe + db._query_raw(raw!( + &format!( + "SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id + FROM file_path + WHERE + location_id={{}} + AND cas_id IS NOT NULL + AND LOWER(extension) IN ({}) + AND materialized_path = {{}}", + extensions + .iter() + .map(|ext| format!("LOWER('{ext}')")) + .collect::>() + .join(",") + ), + PrismaValue::Int(parent_iso_file_path.location_id() as i64), + PrismaValue::String( + parent_iso_file_path + .materialized_path_for_children() + .expect("sub path iso_file_path must be a directory") + ) + )) + .exec() + .await + .map_err(Into::into) + } +} diff --git a/core/src/object/media/thumbnail/actor.rs b/core/src/object/media/thumbnail/actor.rs index cca9cb6bb..4185adeed 100644 --- a/core/src/object/media/thumbnail/actor.rs +++ b/core/src/object/media/thumbnail/actor.rs @@ -1,42 +1,69 @@ use crate::{ + api::CoreEvent, library::{Libraries, LibraryManagerEvent}, - object::media::thumbnail::ThumbnailerError, - prisma::{file_path, PrismaClient}, + object::media::thumbnail::{ + can_generate_thumbnail_for_document, can_generate_thumbnail_for_image, + generate_image_thumbnail, get_shard_hex, get_thumb_key, ThumbnailerError, + }, util::error::{FileIOError, NonUtf8PathError}, }; +use sd_file_ext::extensions::{DocumentExtension, ImageExtension}; +use sd_prisma::prisma::{file_path, PrismaClient}; +use serde::{Deserialize, Serialize}; + use std::{ collections::{HashMap, HashSet, VecDeque}, ffi::OsStr, path::{Path, PathBuf}, - pin::pin, + str::FromStr, sync::Arc, time::{Duration, SystemTime}, }; use async_channel as chan; -use futures::{future::try_join_all, stream::FuturesUnordered, FutureExt}; +use futures::stream::FuturesUnordered; use futures_concurrency::{ - future::{Join, Race}, + future::{Join, Race, TryJoin}, stream::Merge, }; +use once_cell::sync::OnceCell; use thiserror::Error; use tokio::{ fs, io, spawn, - sync::oneshot, - time::{interval, interval_at, timeout, Instant, MissedTickBehavior}, + sync::{broadcast, oneshot, Mutex}, + time::{interval, interval_at, sleep, timeout, Instant, MissedTickBehavior}, }; use tokio_stream::{wrappers::IntervalStream, StreamExt}; -use tokio_util::sync::{CancellationToken, DropGuard}; -use tracing::{debug, error, trace}; +use tracing::{debug, error, trace, warn}; use uuid::Uuid; -use super::{generate_thumbnail, GenerateThumbnailArgs, THUMBNAIL_CACHE_DIR_NAME}; +use super::{init_thumbnail_dir, THUMBNAIL_CACHE_DIR_NAME}; const ONE_SEC: Duration = Duration::from_secs(1); const THIRTY_SECS: Duration = Duration::from_secs(30); const HALF_HOUR: Duration = Duration::from_secs(30 * 60); const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60); +const SAVE_STATE_FILE: &str = "thumbs_to_process.bin"; + +static BATCH_SIZE: OnceCell = OnceCell::new(); + +#[derive(Debug, Serialize, Deserialize)] +pub struct GenerateThumbnailArgs { + pub extension: String, + pub cas_id: String, + pub path: PathBuf, +} + +impl GenerateThumbnailArgs { + pub fn new(extension: String, cas_id: String, path: PathBuf) -> Self { + Self { + extension, + cas_id, + path, + } + } +} #[derive(Error, Debug)] enum Error { @@ -53,56 +80,166 @@ enum Error { #[derive(Debug)] enum DatabaseMessage { Add(Uuid, Arc), + Update(Uuid, Arc), Remove(Uuid), } +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchToProcess { + pub batch: Vec, + pub should_regenerate: bool, + pub in_background: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +enum ProcessingKind { + Indexed, + Ephemeral, +} + // Thumbnails directory have the following structure: // thumbnails/ // ├── version.txt // └── [0..2]/ # sharding // └── .webp pub struct Thumbnailer { + thumbnails_directory: PathBuf, cas_ids_to_delete_tx: chan::Sender>, - ephemeral_thumbnails_to_generate_tx: chan::Sender>, - _cancel_loop: DropGuard, + thumbnails_to_generate_tx: chan::Sender<(BatchToProcess, ProcessingKind)>, + last_single_thumb_generated: Mutex, + reporter: broadcast::Sender, + cancel_tx: chan::Sender>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ThumbsProcessingSaveState { + ephemeral_cas_ids: HashSet, + // This queues doubles as LIFO and FIFO, assuming LIFO in case of users asking for a new batch + // by entering a new directory in the explorer, otherwise processing as FIFO + queue: VecDeque<(BatchToProcess, ProcessingKind)>, + // These below are FIFO queues, so we can process leftovers from the previous batch first + indexed_leftovers_queue: VecDeque, + ephemeral_leftovers_queue: VecDeque, +} + +impl Default for ThumbsProcessingSaveState { + fn default() -> Self { + Self { + ephemeral_cas_ids: HashSet::with_capacity(128), + queue: VecDeque::with_capacity(32), + indexed_leftovers_queue: VecDeque::with_capacity(8), + ephemeral_leftovers_queue: VecDeque::with_capacity(8), + } + } +} + +impl ThumbsProcessingSaveState { + async fn load(thumbnails_directory: impl AsRef) -> Self { + let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE); + + match fs::read(&resume_file).await { + Ok(bytes) => { + let this = rmp_serde::from_slice::(&bytes).unwrap_or_else(|e| { + error!("Failed to deserialize save state at thumbnailer actor: {e:#?}"); + Self::default() + }); + + if let Err(e) = fs::remove_file(&resume_file).await { + error!( + "Failed to remove save state file at thumbnailer actor: {:#?}", + FileIOError::from((resume_file, e)) + ); + } + + this + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + trace!("No save state found at thumbnailer actor"); + Self::default() + } + Err(e) => { + error!( + "Failed to read save state at thumbnailer actor: {:#?}", + FileIOError::from((resume_file, e)) + ); + Self::default() + } + } + } + + async fn store(self, thumbnails_directory: impl AsRef) { + let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE); + + let Ok(bytes) = rmp_serde::to_vec_named(&self).map_err(|e| { + error!("Failed to serialize save state at thumbnailer actor: {e:#?}"); + }) else { + return; + }; + + if let Err(e) = fs::write(&resume_file, bytes).await { + error!( + "Failed to write save state at thumbnailer actor: {:#?}", + FileIOError::from((resume_file, e)) + ); + } + } } impl Thumbnailer { - pub fn new(data_dir: PathBuf, lm: Arc) -> Self { - let mut thumbnails_directory = data_dir; - thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME); + pub async fn new( + data_dir: PathBuf, + lm: Arc, + reporter: broadcast::Sender, + ) -> Self { + let thumbnails_directory = init_thumbnail_dir(&data_dir).await.unwrap_or_else(|e| { + error!("Failed to initialize thumbnail directory: {e:#?}"); + let mut thumbnails_directory = data_dir; + thumbnails_directory.push(THUMBNAIL_CACHE_DIR_NAME); + thumbnails_directory + }); let (databases_tx, databases_rx) = chan::bounded(4); - let (ephemeral_thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) = - chan::unbounded(); + let (thumbnails_to_generate_tx, ephemeral_thumbnails_to_generate_rx) = chan::unbounded(); let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16); - let cancel_token = CancellationToken::new(); + let (cancel_tx, cancel_rx) = chan::bounded(1); - let inner_cancel_token = cancel_token.child_token(); - tokio::spawn(async move { - loop { - if let Err(e) = tokio::spawn(Self::worker( - thumbnails_directory.clone(), - databases_rx.clone(), - cas_ids_to_delete_rx.clone(), - ephemeral_thumbnails_to_generate_rx.clone(), - inner_cancel_token.child_token(), - )) - .await - { - error!( - "Error on Thumbnail Remover Actor; \ + BATCH_SIZE + .set(std::thread::available_parallelism().map_or_else( + |e| { + error!("Failed to get available parallelism: {e:#?}"); + 4 + }, + |non_zero| { + let count = non_zero.get(); + debug!("Thumbnailer will process batches of {count} thumbnails in parallel."); + count + }, + )) + .ok(); + + let inner_cancel_rx = cancel_rx.clone(); + let inner_thumbnails_directory = thumbnails_directory.clone(); + let inner_reporter = reporter.clone(); + spawn(async move { + while let Err(e) = spawn(Self::worker( + inner_reporter.clone(), + inner_thumbnails_directory.clone(), + databases_rx.clone(), + cas_ids_to_delete_rx.clone(), + ephemeral_thumbnails_to_generate_rx.clone(), + inner_cancel_rx.clone(), + )) + .await + { + error!( + "Error on Thumbnail Remover Actor; \ Error: {e}; \ Restarting the worker loop...", - ); - } - if inner_cancel_token.is_cancelled() { - break; - } + ); } }); - tokio::spawn({ + spawn({ let rx = lm.rx.clone(); async move { if let Err(err) = rx @@ -113,15 +250,29 @@ impl Thumbnailer { match event { LibraryManagerEvent::Load(library) => { if databases_tx - .send(DatabaseMessage::Add(library.id, library.db.clone())) + .send(DatabaseMessage::Add( + library.id, + Arc::clone(&library.db), + )) + .await + .is_err() + { + error!("Thumbnail remover actor is dead") + } + } + LibraryManagerEvent::Edit(library) + | LibraryManagerEvent::InstancesModified(library) => { + if databases_tx + .send(DatabaseMessage::Update( + library.id, + Arc::clone(&library.db), + )) .await .is_err() { error!("Thumbnail remover actor is dead") } } - LibraryManagerEvent::Edit(_) => {} - LibraryManagerEvent::InstancesModified(_) => {} LibraryManagerEvent::Delete(library) => { if databases_tx .send(DatabaseMessage::Remove(library.id)) @@ -142,18 +293,22 @@ impl Thumbnailer { }); Self { + thumbnails_directory, cas_ids_to_delete_tx, - ephemeral_thumbnails_to_generate_tx, - _cancel_loop: cancel_token.drop_guard(), + thumbnails_to_generate_tx, + last_single_thumb_generated: Mutex::new(Instant::now()), + reporter, + cancel_tx, } } async fn worker( + reporter: broadcast::Sender, thumbnails_directory: PathBuf, databases_rx: chan::Receiver, cas_ids_to_delete_rx: chan::Receiver>, - ephemeral_thumbnails_to_generate_rx: chan::Receiver>, - cancel_token: CancellationToken, + thumbnails_to_generate_rx: chan::Receiver<(BatchToProcess, ProcessingKind)>, + cancel_rx: chan::Receiver>, ) { let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR); to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -162,31 +317,31 @@ impl Thumbnailer { idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut databases = HashMap::new(); - let mut ephemeral_thumbnails_cas_ids = HashSet::new(); #[derive(Debug)] enum StreamMessage { RemovalTick, ToDelete(Vec), Database(DatabaseMessage), - EphemeralThumbnailNewBatch(Vec), - Leftovers(Vec), + NewBatch((BatchToProcess, ProcessingKind)), + Leftovers((BatchToProcess, ProcessingKind)), NewEphemeralThumbnailCasIds(Vec), - Stop, + Shutdown(oneshot::Sender<()>), IdleTick, } - let cancel = pin!(cancel_token.cancelled()); - - // This is a LIFO queue, so we can process the most recent thumbnails first - let mut ephemeral_thumbnails_queue = Vec::with_capacity(8); - - // This one is a FIFO queue, so we can process leftovers from the previous batch first - let mut ephemeral_thumbnails_leftovers_queue = VecDeque::with_capacity(8); + let ThumbsProcessingSaveState { + mut ephemeral_cas_ids, + mut queue, + mut indexed_leftovers_queue, + mut ephemeral_leftovers_queue, + } = ThumbsProcessingSaveState::load(&thumbnails_directory).await; let (ephemeral_thumbnails_cas_ids_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32); let (leftovers_tx, leftovers_rx) = chan::bounded(8); + let mut shutdown_leftovers_rx = leftovers_rx.clone(); + let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1); let mut current_batch_processing_rx: Option> = None; @@ -194,12 +349,12 @@ impl Thumbnailer { let mut msg_stream = ( databases_rx.map(StreamMessage::Database), cas_ids_to_delete_rx.map(StreamMessage::ToDelete), - ephemeral_thumbnails_to_generate_rx.map(StreamMessage::EphemeralThumbnailNewBatch), + thumbnails_to_generate_rx.map(StreamMessage::NewBatch), leftovers_rx.map(StreamMessage::Leftovers), ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailCasIds), IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick), IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick), - cancel.into_stream().map(|()| StreamMessage::Stop), + cancel_rx.map(StreamMessage::Shutdown), ) .merge(); @@ -217,32 +372,35 @@ impl Thumbnailer { } if current_batch_processing_rx.is_none() - && (!ephemeral_thumbnails_queue.is_empty() - || !ephemeral_thumbnails_leftovers_queue.is_empty()) + && (!queue.is_empty() + || !indexed_leftovers_queue.is_empty() + || !ephemeral_leftovers_queue.is_empty()) { let (done_tx, done_rx) = oneshot::channel(); current_batch_processing_rx = Some(done_rx); - if let Some(batch) = ephemeral_thumbnails_queue.pop() { - spawn(batch_processor( - batch, - ephemeral_thumbnails_cas_ids_tx.clone(), - stop_older_processing_rx.clone(), + let batch_and_kind = if let Some(batch_and_kind) = queue.pop_front() { + batch_and_kind + } else if let Some(batch) = indexed_leftovers_queue.pop_front() { + // indexed leftovers have bigger priority + (batch, ProcessingKind::Indexed) + } else if let Some(batch) = ephemeral_leftovers_queue.pop_front() { + (batch, ProcessingKind::Ephemeral) + } else { + continue; + }; + + spawn(batch_processor( + thumbnails_directory.clone(), + batch_and_kind, + ephemeral_thumbnails_cas_ids_tx.clone(), + ProcessorControlChannels { + stop_rx: stop_older_processing_rx.clone(), done_tx, - leftovers_tx.clone(), - false, - )); - } else if let Some(batch) = ephemeral_thumbnails_leftovers_queue.pop_front() - { - spawn(batch_processor( - batch, - ephemeral_thumbnails_cas_ids_tx.clone(), - stop_older_processing_rx.clone(), - done_tx, - leftovers_tx.clone(), - true, - )); - } + }, + leftovers_tx.clone(), + reporter.clone(), + )); } } @@ -252,7 +410,7 @@ impl Thumbnailer { if let Err(e) = Self::process_clean_up( &thumbnails_directory, databases.values(), - &ephemeral_thumbnails_cas_ids, + &ephemeral_cas_ids, ) .await { @@ -270,31 +428,115 @@ impl Thumbnailer { } } - StreamMessage::EphemeralThumbnailNewBatch(batch) => { - ephemeral_thumbnails_queue.push(batch); - if current_batch_processing_rx.is_some() // Only sends stop signal if there is a batch being processed - && stop_older_processing_tx.send(()).await.is_err() - { - error!("Thumbnail remover actor died when trying to stop older processing"); + StreamMessage::NewBatch((batch, kind)) => { + let in_background = batch.in_background; + + trace!( + "New {kind:?} batch to process in {}, size: {}", + if in_background { + "background" + } else { + "foreground" + }, + batch.batch.len() + ); + + if in_background { + queue.push_back((batch, kind)); + } else { + // If a processing must be in foreground, then it takes maximum priority + queue.push_front((batch, kind)); + } + + // Only sends stop signal if there is a batch being processed + if !in_background && current_batch_processing_rx.is_some() { + trace!("Sending stop signal to older processing"); + let (tx, rx) = oneshot::channel(); + + match stop_older_processing_tx.try_send(tx) { + Ok(()) => { + // We put a timeout here to avoid a deadlock in case the older processing already + // finished its batch + if timeout(ONE_SEC, rx).await.is_err() { + stop_older_processing_rx.recv().await.ok(); + } + } + Err(e) if e.is_full() => { + // The last signal we sent happened after a batch was already processed + // So we clean the channel and we're good to go. + stop_older_processing_rx.recv().await.ok(); + } + Err(_) => { + error!("Thumbnail remover actor died when trying to stop older processing"); + } + } } } - StreamMessage::Leftovers(batch) => { - ephemeral_thumbnails_leftovers_queue.push_back(batch); - } + StreamMessage::Leftovers((batch, kind)) => match kind { + ProcessingKind::Indexed => indexed_leftovers_queue.push_back(batch), + ProcessingKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch), + }, - StreamMessage::Database(DatabaseMessage::Add(id, db)) => { + StreamMessage::Database(DatabaseMessage::Add(id, db)) + | StreamMessage::Database(DatabaseMessage::Update(id, db)) => { databases.insert(id, db); } StreamMessage::Database(DatabaseMessage::Remove(id)) => { databases.remove(&id); } StreamMessage::NewEphemeralThumbnailCasIds(cas_ids) => { - ephemeral_thumbnails_cas_ids.extend(cas_ids); + trace!("New ephemeral thumbnail cas ids: {}", cas_ids.len()); + ephemeral_cas_ids.extend(cas_ids); } - StreamMessage::Stop => { - debug!("Thumbnail remover actor is stopping"); - break; + StreamMessage::Shutdown(cancel_tx) => { + debug!("Thumbnail actor is shutting down..."); + + // First stopping the current batch processing + let (tx, rx) = oneshot::channel(); + match stop_older_processing_tx.try_send(tx) { + Ok(()) => { + // We put a timeout here to avoid a deadlock in case the older processing already + // finished its batch + if timeout(ONE_SEC, rx).await.is_err() { + stop_older_processing_rx.recv().await.ok(); + } + } + Err(e) if e.is_full() => { + // The last signal we sent happened after a batch was already processed + // So we clean the channel and we're good to go. + stop_older_processing_rx.recv().await.ok(); + } + Err(_) => { + error!( + "Thumbnail remover actor died when trying to stop older processing" + ); + } + } + + // Closing the leftovers channel to stop the batch processor as we already sent + // an stop signal + leftovers_tx.close(); + while let Some((batch, kind)) = shutdown_leftovers_rx.next().await { + match kind { + ProcessingKind::Indexed => indexed_leftovers_queue.push_back(batch), + ProcessingKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch), + } + } + + // Saving state + ThumbsProcessingSaveState { + ephemeral_cas_ids, + queue, + indexed_leftovers_queue, + ephemeral_leftovers_queue, + } + .store(thumbnails_directory) + .await; + + // Signaling that we're done shutting down + cancel_tx.send(()).ok(); + return; } } } @@ -304,19 +546,23 @@ impl Thumbnailer { thumbnails_directory: &Path, cas_ids: Vec, ) -> Result<(), Error> { - try_join_all(cas_ids.into_iter().map(|cas_id| async move { - let thumbnail_path = - thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2])); + cas_ids + .into_iter() + .map(|cas_id| async move { + let thumbnail_path = + thumbnails_directory.join(format!("{}/{cas_id}.webp", &cas_id[0..2])); - trace!("Removing thumbnail: {}", thumbnail_path.display()); + trace!("Removing thumbnail: {}", thumbnail_path.display()); - match fs::remove_file(&thumbnail_path).await { - Ok(()) => Ok(()), - Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), - Err(e) => Err(FileIOError::from((thumbnail_path, e))), - } - })) - .await?; + match fs::remove_file(&thumbnail_path).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(FileIOError::from((thumbnail_path, e))), + } + }) + .collect::>() + .try_join() + .await?; Ok(()) } @@ -324,7 +570,7 @@ impl Thumbnailer { async fn process_clean_up( thumbnails_directory: &Path, databases: impl Iterator>, - non_indexed_thumbnails_cas_ids: &HashSet, + ephemeral_cas_ids: &HashSet, ) -> Result<(), Error> { let databases = databases.collect::>(); @@ -415,13 +661,13 @@ impl Thumbnailer { }); } - thumbnails_paths_by_cas_id - .retain(|cas_id, _| !non_indexed_thumbnails_cas_ids.contains(cas_id)); + thumbnails_paths_by_cas_id.retain(|cas_id, _| !ephemeral_cas_ids.contains(cas_id)); let now = SystemTime::now(); - let removed_count = try_join_all(thumbnails_paths_by_cas_id.into_values().map( - |path| async move { + let removed_count = thumbnails_paths_by_cas_id + .into_values() + .map(|path| async move { if let Ok(metadata) = fs::metadata(&path).await { if metadata .accessed() @@ -438,17 +684,18 @@ impl Thumbnailer { } } - tracing::warn!("Removing stale thumbnail: {}", path.display()); + trace!("Removing stale thumbnail: {}", path.display()); fs::remove_file(&path) .await .map(|()| true) .map_err(|e| FileIOError::from((path, e))) - }, - )) - .await? - .into_iter() - .filter(|r| *r) - .count(); + }) + .collect::>() + .try_join() + .await? + .into_iter() + .filter(|r| *r) + .count(); if thumbs_found == removed_count { // if we removed all the thumnails we found, it means that the directory is empty @@ -466,10 +713,11 @@ impl Thumbnailer { Ok(()) } - pub async fn new_non_indexed_thumbnails_batch(&self, batch: Vec) { + #[inline] + async fn new_batch(&self, batch: BatchToProcess, kind: ProcessingKind) { if self - .ephemeral_thumbnails_to_generate_tx - .send(batch) + .thumbnails_to_generate_tx + .send((batch, kind)) .await .is_err() { @@ -477,45 +725,130 @@ impl Thumbnailer { } } + pub async fn new_ephemeral_thumbnails_batch(&self, batch: BatchToProcess) { + self.new_batch(batch, ProcessingKind::Ephemeral).await; + } + + pub async fn new_indexed_thumbnails_batch(&self, batch: BatchToProcess) { + self.new_batch(batch, ProcessingKind::Indexed).await; + } + pub async fn remove_cas_ids(&self, cas_ids: Vec) { if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() { error!("Thumbnail remover actor is dead: Failed to send cas ids to delete"); } } + + pub async fn shutdown(&self) { + let (tx, rx) = oneshot::channel(); + if self.cancel_tx.send(tx).await.is_err() { + error!("Thumbnail remover actor is dead: Failed to send shutdown signal"); + } else { + rx.await.ok(); + } + } + + /// WARNING!!!! DON'T USE THIS METHOD IN A LOOP!!!!!!!!!!!!! It will be pretty slow on purpose! + pub async fn generate_single_thumbnail( + &self, + extension: &str, + cas_id: String, + path: impl AsRef, + ) -> Result<(), ThumbnailerError> { + let mut last_single_thumb_generated_guard = self.last_single_thumb_generated.lock().await; + + let elapsed = Instant::now() - *last_single_thumb_generated_guard; + if elapsed < ONE_SEC { + // This will choke up in case someone try to use this method in a loop, otherwise + // it will consume all the machine resources like a gluton monster from hell + sleep(ONE_SEC - elapsed).await; + } + + let res = generate_thumbnail( + self.thumbnails_directory.clone(), + extension, + cas_id, + path, + false, + false, + self.reporter.clone(), + ) + .await + .map(|_| ()); + + *last_single_thumb_generated_guard = Instant::now(); + + res + } +} + +struct ProcessorControlChannels { + stop_rx: chan::Receiver>, + done_tx: oneshot::Sender<()>, } async fn batch_processor( - batch: Vec, + thumbnails_directory: PathBuf, + ( + BatchToProcess { + batch, + should_regenerate, + in_background, + }, + kind, + ): (BatchToProcess, ProcessingKind), generated_cas_ids_tx: chan::Sender>, - stop_rx: chan::Receiver<()>, - done_tx: oneshot::Sender<()>, - leftovers_tx: chan::Sender>, - in_background: bool, + ProcessorControlChannels { stop_rx, done_tx }: ProcessorControlChannels, + leftovers_tx: chan::Sender<(BatchToProcess, ProcessingKind)>, + reporter: broadcast::Sender, ) { + trace!( + "Processing thumbnails batch of kind {kind:?} with size {} in {}", + batch.len(), + if in_background { + "background" + } else { + "foreground" + }, + ); + + // Tranforming to `VecDeque` so we don't need to move anything as we consume from the beginning + // This from is guaranteed to be O(1) let mut queue = VecDeque::from(batch); enum RaceOutputs { Processed, - Stop, + Stop(oneshot::Sender<()>), } // Need this borrow here to satisfy the async move below let generated_cas_ids_tx = &generated_cas_ids_tx; while !queue.is_empty() { - let chunk = (0..4) + let chunk = (0..*BATCH_SIZE + .get() + .expect("BATCH_SIZE is set at thumbnailer new method")) .filter_map(|_| queue.pop_front()) .map( |GenerateThumbnailArgs { extension, cas_id, path, - node, }| { + let reporter = reporter.clone(); + let thumbnails_directory = thumbnails_directory.clone(); spawn(async move { timeout( THIRTY_SECS, - generate_thumbnail(&extension, cas_id, &path, node, in_background), + generate_thumbnail( + thumbnails_directory, + &extension, + cas_id, + &path, + in_background, + should_regenerate, + reporter, + ), ) .await .unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path()))) @@ -524,7 +857,7 @@ async fn batch_processor( ) .collect::>(); - if let RaceOutputs::Stop = ( + if let RaceOutputs::Stop(tx) = ( async move { let cas_ids = chunk .join() @@ -554,33 +887,121 @@ async fn batch_processor( RaceOutputs::Processed }, async { - stop_rx + let tx = stop_rx .recv() .await .expect("Critical error on thumbnails actor"); trace!("Received a stop signal"); - RaceOutputs::Stop + RaceOutputs::Stop(tx) }, ) .race() .await { - // Our queue is always contiguous, so this `from`` is free + // Our queue is always contiguous, so this `from` is free let leftovers = Vec::from(queue); trace!( "Stopped with {} thumbnails left to process", leftovers.len() ); - if !leftovers.is_empty() && leftovers_tx.send(leftovers).await.is_err() { + if !leftovers.is_empty() + && leftovers_tx + .send(( + BatchToProcess { + batch: leftovers, + should_regenerate, + in_background: true, // Leftovers should always be in background + }, + kind, + )) + .await + .is_err() + { error!("Thumbnail remover actor is dead: Failed to send leftovers") } done_tx.send(()).ok(); + tx.send(()).ok(); return; } } + trace!("Finished batch!"); + done_tx.send(()).ok(); } + +async fn generate_thumbnail( + thumbnails_directory: PathBuf, + extension: &str, + cas_id: String, + path: impl AsRef, + in_background: bool, + should_regenerate: bool, + reporter: broadcast::Sender, +) -> Result { + let path = path.as_ref(); + trace!("Generating thumbnail for {}", path.display()); + + let mut output_path = thumbnails_directory; + output_path.push(get_shard_hex(&cas_id)); + output_path.push(&cas_id); + output_path.set_extension("webp"); + + if let Err(e) = fs::metadata(&output_path).await { + if e.kind() != io::ErrorKind::NotFound { + error!( + "Failed to check if thumbnail exists, but we will try to generate it anyway: {e:#?}" + ); + } + // Otherwise we good, thumbnail doesn't exist so we can generate it + } else if !should_regenerate { + trace!( + "Skipping thumbnail generation for {} because it already exists", + path.display() + ); + return Ok(cas_id); + } + + if let Ok(extension) = ImageExtension::from_str(extension) { + if can_generate_thumbnail_for_image(&extension) { + generate_image_thumbnail(&path, &output_path).await?; + } + } else if let Ok(extension) = DocumentExtension::from_str(extension) { + if can_generate_thumbnail_for_document(&extension) { + generate_image_thumbnail(&path, &output_path).await?; + } + } + + #[cfg(feature = "ffmpeg")] + { + use crate::object::media::thumbnail::{ + can_generate_thumbnail_for_video, generate_video_thumbnail, + }; + use sd_file_ext::extensions::VideoExtension; + + if let Ok(extension) = VideoExtension::from_str(extension) { + if can_generate_thumbnail_for_video(&extension) { + generate_video_thumbnail(&path, &output_path).await?; + } + } + } + + if !in_background { + trace!("Emitting new thumbnail event"); + if reporter + .send(CoreEvent::NewThumbnail { + thumb_key: get_thumb_key(&cas_id), + }) + .is_err() + { + warn!("Error sending event to Node's event bus"); + } + } + + trace!("Generated thumbnail for {}", path.display()); + + Ok(cas_id) +} diff --git a/core/src/object/media/thumbnail/directory.rs b/core/src/object/media/thumbnail/directory.rs index 0de9cf416..a923258eb 100644 --- a/core/src/object/media/thumbnail/directory.rs +++ b/core/src/object/media/thumbnail/directory.rs @@ -1,6 +1,6 @@ use crate::util::{error::FileIOError, version_manager::VersionManager}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use int_enum::IntEnum; use tokio::fs; @@ -16,9 +16,9 @@ enum ThumbnailVersion { Unknown = 0, } -pub async fn init_thumbnail_dir(data_dir: PathBuf) -> Result { +pub async fn init_thumbnail_dir(data_dir: impl AsRef) -> Result { debug!("Initializing thumbnail directory"); - let thumbnail_dir = data_dir.join(THUMBNAIL_CACHE_DIR_NAME); + let thumbnail_dir = data_dir.as_ref().join(THUMBNAIL_CACHE_DIR_NAME); let version_file = thumbnail_dir.join("version.txt"); let version_manager = diff --git a/core/src/object/media/thumbnail/mod.rs b/core/src/object/media/thumbnail/mod.rs index c52cd611f..3d9aa2e44 100644 --- a/core/src/object/media/thumbnail/mod.rs +++ b/core/src/object/media/thumbnail/mod.rs @@ -1,9 +1,4 @@ use crate::{ - api::CoreEvent, - job::JobRunErrors, - library::Library, - location::file_path_helper::{file_path_for_media_processor, IsolatedFilePathData}, - prisma::location, util::{error::FileIOError, version_manager::VersionManagerError}, Node, }; @@ -18,20 +13,16 @@ use sd_media_metadata::image::Orientation; use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS}; use std::{ - collections::HashMap, ops::Deref, path::{Path, PathBuf}, - str::FromStr, - sync::Arc, }; -use futures::future::{join_all, try_join_all}; use image::{self, imageops, DynamicImage, GenericImageView}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::{fs, io, task}; -use tracing::{error, trace, warn}; +use tokio::{fs, task}; +use tracing::error; use webp::Encoder; pub mod actor; @@ -87,6 +78,18 @@ pub(super) static THUMBNAILABLE_EXTENSIONS: Lazy> = Lazy::new(|| .collect() }); +pub static ALL_THUMBNAILABLE_EXTENSIONS: Lazy> = Lazy::new(|| { + if cfg!(feature = "ffmpeg") { + THUMBNAILABLE_EXTENSIONS + .iter() + .cloned() + .chain(THUMBNAILABLE_VIDEO_EXTENSIONS.iter().cloned()) + .collect() + } else { + THUMBNAILABLE_EXTENSIONS.clone() + } +}); + #[derive(Error, Debug)] pub enum ThumbnailerError { // Internal errors @@ -217,288 +220,3 @@ pub const fn can_generate_thumbnail_for_document(document_extension: &DocumentEx matches!(document_extension, Pdf) } - -pub(super) async fn process( - entries: impl IntoIterator, - location_id: location::id::Type, - location_path: impl AsRef, - thumbnails_base_dir: impl AsRef, - regenerate: bool, - library: &Library, - ctx_update_fn: impl Fn(usize), -) -> Result<(ThumbnailerMetadata, JobRunErrors), ThumbnailerError> { - let mut run_metadata = ThumbnailerMetadata::default(); - - let location_path = location_path.as_ref(); - let thumbnails_base_dir = thumbnails_base_dir.as_ref(); - let mut errors = vec![]; - - let mut to_create_dirs = HashMap::new(); - - struct WorkTable<'a> { - kind: ThumbnailerEntryKind, - input_path: PathBuf, - cas_id: &'a str, - output_path: PathBuf, - metadata_res: io::Result<()>, - } - - let entries = entries - .into_iter() - .filter_map(|(file_path, kind)| { - IsolatedFilePathData::try_from((location_id, file_path)) - .map(|iso_file_path| (file_path, kind, location_path.join(iso_file_path))) - .map_err(|e| { - errors.push(format!( - "Failed to build path for file with id {}: {e}", - file_path.id - )) - }) - .ok() - }) - .filter_map(|(file_path, kind, path)| { - if let Some(cas_id) = &file_path.cas_id { - Some((kind, path, cas_id)) - } else { - warn!( - "Skipping thumbnail generation for {} due to missing cas_id", - path.display() - ); - run_metadata.skipped += 1; - None - } - }) - .map(|(kind, input_path, cas_id)| { - let thumbnails_shard_dir = thumbnails_base_dir.join(get_shard_hex(cas_id)); - let output_path = thumbnails_shard_dir.join(format!("{cas_id}.webp")); - - // Putting all sharding directories in a map to avoid trying to create repeteaded ones - to_create_dirs - .entry(thumbnails_shard_dir.clone()) - .or_insert_with(|| async move { - fs::create_dir_all(&thumbnails_shard_dir) - .await - .map_err(|e| FileIOError::from((thumbnails_shard_dir, e))) - }); - - async move { - WorkTable { - kind, - input_path, - cas_id, - // Discarding the ok part as we don't actually care about metadata here, maybe avoiding extra space - metadata_res: fs::metadata(&output_path).await.map(|_| ()), - output_path, - } - } - }) - .collect::>(); - if entries.is_empty() { - return Ok((run_metadata, errors.into())); - } - - // Resolving these futures first, as we want to fail early if we can't create the directories - try_join_all(to_create_dirs.into_values()).await?; - - // Running thumbs generation sequentially to don't overload the system, if we're wasting too much time on I/O we can - // try to run them in parallel - for ( - idx, - WorkTable { - kind, - input_path, - cas_id, - output_path, - metadata_res, - }, - ) in join_all(entries).await.into_iter().enumerate() - { - ctx_update_fn(idx + 1); - match metadata_res { - Ok(_) => { - if !regenerate { - trace!( - "Thumbnail already exists, skipping generation for {}", - input_path.display() - ); - run_metadata.skipped += 1; - } else { - tracing::debug!( - "Renegerating thumbnail {} to {}", - input_path.display(), - output_path.display() - ); - process_single_thumbnail( - cas_id, - kind, - &input_path, - &output_path, - &mut errors, - &mut run_metadata, - library, - ) - .await; - } - } - - Err(e) if e.kind() == io::ErrorKind::NotFound => { - trace!( - "Writing {} to {}", - input_path.display(), - output_path.display() - ); - - process_single_thumbnail( - cas_id, - kind, - &input_path, - &output_path, - &mut errors, - &mut run_metadata, - library, - ) - .await; - } - Err(e) => { - error!( - "Error getting metadata for thumb: {:#?}", - FileIOError::from((output_path, e)) - ); - errors.push(format!( - "Had an error generating thumbnail for \"{}\"", - input_path.display() - )); - } - } - } - - Ok((run_metadata, errors.into())) -} - -// Using &Path as this function if private only to this module, always being used with a &Path, so we -// don't pay the compile price for generics -async fn process_single_thumbnail( - cas_id: &str, - kind: ThumbnailerEntryKind, - input_path: &Path, - output_path: &Path, - errors: &mut Vec, - run_metadata: &mut ThumbnailerMetadata, - library: &Library, -) { - match kind { - ThumbnailerEntryKind::Image => { - if let Err(e) = generate_image_thumbnail(&input_path, &output_path).await { - error!( - "Error generating thumb for image \"{}\": {e:#?}", - input_path.display() - ); - errors.push(format!( - "Had an error generating thumbnail for \"{}\"", - input_path.display() - )); - - return; - } - } - #[cfg(feature = "ffmpeg")] - ThumbnailerEntryKind::Video => { - if let Err(e) = generate_video_thumbnail(&input_path, &output_path).await { - error!( - "Error generating thumb for video \"{}\": {e:#?}", - input_path.display() - ); - errors.push(format!( - "Had an error generating thumbnail for \"{}\"", - input_path.display() - )); - - return; - } - } - } - - trace!("Emitting new thumbnail event"); - library.emit(CoreEvent::NewThumbnail { - thumb_key: get_thumb_key(cas_id), - }); - run_metadata.created += 1; -} - -// TODO(fogodev): Unify how we generate thumbnails - -#[derive(Debug)] -pub struct GenerateThumbnailArgs { - pub extension: String, - pub cas_id: String, - pub path: PathBuf, - pub node: Arc, -} - -impl GenerateThumbnailArgs { - pub fn new(extension: String, cas_id: String, path: PathBuf, node: Arc) -> Self { - Self { - extension, - cas_id, - path, - node, - } - } -} - -pub async fn generate_thumbnail( - extension: &str, - cas_id: String, - path: impl AsRef, - node: Arc, - in_background: bool, -) -> Result { - let path = path.as_ref(); - trace!("Generating thumbnail for {}", path.display()); - let output_path = get_thumbnail_path(&node, &cas_id); - - if let Err(e) = fs::metadata(&output_path).await { - if e.kind() != io::ErrorKind::NotFound { - error!( - "Failed to check if thumbnail exists, but we will try to generate it anyway: {e}" - ); - } - // Otherwise we good, thumbnail doesn't exist so we can generate it - } else { - trace!( - "Skipping thumbnail generation for {} because it already exists", - path.display() - ); - return Ok(cas_id); - } - - if let Ok(extension) = ImageExtension::from_str(extension) { - if can_generate_thumbnail_for_image(&extension) { - generate_image_thumbnail(&path, &output_path).await?; - } - } else if let Ok(extension) = DocumentExtension::from_str(extension) { - if can_generate_thumbnail_for_document(&extension) { - generate_image_thumbnail(&path, &output_path).await?; - } - } - - #[cfg(feature = "ffmpeg")] - { - if let Ok(extension) = VideoExtension::from_str(extension) { - if can_generate_thumbnail_for_video(&extension) { - generate_video_thumbnail(&path, &output_path).await?; - } - } - } - - if !in_background { - trace!("Emitting new thumbnail event"); - node.emit(CoreEvent::NewThumbnail { - thumb_key: get_thumb_key(&cas_id), - }); - } - - trace!("Generated thumbnail for {}", path.display()); - - Ok(cas_id) -} diff --git a/crates/file-ext/src/magic.rs b/crates/file-ext/src/magic.rs index 19b8704f4..6f424cffa 100644 --- a/crates/file-ext/src/magic.rs +++ b/crates/file-ext/src/magic.rs @@ -54,7 +54,7 @@ macro_rules! extension_enum { } ) => { // construct enum - #[derive(Debug, ::serde::Serialize, ::serde::Deserialize, PartialEq, Eq)] + #[derive(Debug, ::serde::Serialize, ::serde::Deserialize, PartialEq, Eq, Clone)] pub enum Extension { $( $variant($type), )* }