[ENG-1320 | ENG-1340] Separate thumbnails FS location by library | Make the thumbnails actor report back its progress (#1656)

* Renaming error type on ffmpeg subcrate

* Version manager overhaul

* Reworked thumbnail actor

* Updating sharding scheme

* New migration system for thumbnails

* Updating search to new actor

* Updating custom_uri to new thumbnail improvements

* Updating library to new thumbnail stuff

* LibraryId type alias

* Updating indexer to new thumbnail actor

* Updating watcher to new thumbnail actor

* Update location manager to use LibraryId type alias

* Updating location metadata to new LibraryId type alias

* New LocationPubId type alias

* Updating ephemeral walker to new thumbnail stuff

* Updating media processor to new thumbnail actor

* New thumbnailer actor state manager

* Introducing the concept of job phases

* Segregating the thumbnailer actor worker fn

* Fixes on job pausing

* Processing batches with progress reporting

* Updated actor

* Updated media processor

* Small tweaks

* Updating non indexed walker

* Changing a UI string
This commit is contained in:
Ericson "Fogo" Soares
2023-10-26 01:48:29 -03:00
committed by GitHub
parent ed6c76fe90
commit 033e61ac33
33 changed files with 2426 additions and 1409 deletions

View File

@@ -8,7 +8,7 @@ use crate::{
file_path_helper::{check_file_path_exists, IsolatedFilePathData},
non_indexed, LocationError,
},
object::media::thumbnail::get_thumb_key,
object::media::thumbnail::get_indexed_thumb_key,
prisma::{self, file_path, location, object, tag, tag_on_object, PrismaClient},
};
@@ -589,7 +589,10 @@ pub fn mount() -> AlphaRouter<Ctx> {
items.push(ExplorerItem::Path {
has_local_thumbnail: thumbnail_exists_locally,
thumbnail_key: file_path.cas_id.as_ref().map(|i| get_thumb_key(i)),
thumbnail_key: file_path
.cas_id
.as_ref()
.map(|i| get_indexed_thumb_key(i, library.id)),
item: file_path,
})
}
@@ -738,7 +741,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
items.push(ExplorerItem::Object {
has_local_thumbnail: thumbnail_exists_locally,
thumbnail_key: cas_id.map(|i| get_thumb_key(i)),
thumbnail_key: cas_id.map(|i| get_indexed_thumb_key(i, library.id)),
item: object,
});
}

View File

@@ -2,6 +2,7 @@ use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
library::Library,
location::file_path_helper::{file_path_to_handle_custom_uri, IsolatedFilePathData},
object::media::thumbnail::WEBP_EXTENSION,
p2p::{sync::InstanceState, IdentityOrRemoteIdentity},
prisma::{file_path, location},
util::{db::*, InfallibleResponse},
@@ -160,7 +161,7 @@ pub fn router(node: Arc<Node>) -> Router<()> {
// Prevent directory traversal attacks (Eg. requesting `../../../etc/passwd`)
// For now we only support `webp` thumbnails.
(path.starts_with(&thumbnail_path)
&& path.extension() == Some(OsStr::new("webp")))
&& path.extension() == Some(WEBP_EXTENSION.as_ref()))
.then_some(())
.ok_or_else(|| not_found(()))?;

View File

@@ -482,9 +482,9 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
let target_location = init.target_location();
let stateful_job = Arc::new(init);
let mut stateful_job = Arc::new(init);
let ctx = Arc::new(ctx);
let mut ctx = Arc::new(ctx);
let mut job_should_run = true;
let job_init_time = Instant::now();
@@ -497,7 +497,6 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
let init_time = Instant::now();
let init_task = {
let ctx = Arc::clone(&ctx);
let stateful_job = Arc::clone(&stateful_job);
spawn(async move {
let mut new_data = None;
let res = stateful_job.init(&ctx, &mut new_data).await;
@@ -508,11 +507,15 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
}
}
(new_data, res)
(stateful_job, new_data, res)
})
};
let InitPhaseOutput { maybe_data, output } = handle_init_phase::<SJob>(
let InitPhaseOutput {
stateful_job: returned_stateful_job,
maybe_data,
output,
} = handle_init_phase::<SJob>(
JobRunWorkTable {
id: job_id,
name: job_name,
@@ -525,6 +528,8 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
)
.await?;
stateful_job = returned_stateful_job;
match output {
Ok(JobInitOutput {
run_metadata: new_run_metadata,
@@ -547,13 +552,13 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
// Run the job until it's done or we get a command
let data = if let Some(working_data) = working_data {
let working_data_arc = Arc::new(working_data);
let mut working_data_arc = Arc::new(working_data);
// Job run phase
while job_should_run && !steps.is_empty() {
let steps_len = steps.len();
let steps_len: usize = steps.len();
let run_metadata_arc = Arc::new(run_metadata);
let mut run_metadata_arc = Arc::new(run_metadata);
let step = Arc::new(steps.pop_front().expect("just checked that we have steps"));
let init_time = Instant::now();
@@ -584,6 +589,13 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
let JobStepsPhaseOutput {
steps: returned_steps,
output,
step_arcs:
(
returned_ctx,
returned_run_metadata_arc,
returned_working_data_arc,
returned_stateful_job,
),
} = handle_single_step::<SJob>(
JobRunWorkTable {
id: job_id,
@@ -593,10 +605,11 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
},
&job_init_time,
(
Arc::clone(&ctx),
Arc::clone(&run_metadata_arc),
Arc::clone(&working_data_arc),
Arc::clone(&stateful_job),
// Must not hold extra references here; moving and getting back on function completion
ctx,
run_metadata_arc,
working_data_arc,
stateful_job,
),
JobStepDataWorkTable {
step_number,
@@ -609,6 +622,10 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
.await?;
steps = returned_steps;
ctx = returned_ctx;
run_metadata_arc = returned_run_metadata_arc;
working_data_arc = returned_working_data_arc;
stateful_job = returned_stateful_job;
run_metadata =
Arc::try_unwrap(run_metadata_arc).expect("step already ran, no more refs");
@@ -751,6 +768,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
}
struct InitPhaseOutput<SJob: StatefulJob> {
stateful_job: Arc<SJob>,
maybe_data: Option<SJob::Data>,
output: Result<JobInitOutput<SJob::RunMetadata, SJob::Step>, JobError>,
}
@@ -763,6 +781,7 @@ struct JobRunWorkTable {
}
type InitTaskOutput<SJob> = (
Arc<SJob>,
Option<<SJob as StatefulJob>::Data>,
Result<
JobInitOutput<<SJob as StatefulJob>::RunMetadata, <SJob as StatefulJob>::Step>,
@@ -806,13 +825,17 @@ async fn handle_init_phase<SJob: StatefulJob>(
);
return Err(join_error.into());
}
StreamMessage::InitResult(Ok((maybe_data, output))) => {
StreamMessage::InitResult(Ok((stateful_job, maybe_data, output))) => {
debug!(
"Init phase took {:?} Job <id='{id}', name='{name}'>",
init_time.elapsed()
);
return Ok(InitPhaseOutput { maybe_data, output });
return Ok(InitPhaseOutput {
stateful_job,
maybe_data,
output,
});
}
StreamMessage::NewCommand(WorkerCommand::IdentifyYourself(tx)) => {
if tx
@@ -978,6 +1001,7 @@ struct JobStepDataWorkTable<SJob: StatefulJob> {
struct JobStepsPhaseOutput<SJob: StatefulJob> {
steps: VecDeque<SJob::Step>,
output: StepTaskOutput<SJob>,
step_arcs: StepArcs<SJob>,
}
type StepArcs<SJob> = (
@@ -1033,7 +1057,11 @@ async fn handle_single_step<SJob: StatefulJob>(
init_time.elapsed(),
);
return Ok(JobStepsPhaseOutput { steps, output });
return Ok(JobStepsPhaseOutput {
steps,
output,
step_arcs: (worker_ctx, run_metadata, working_data, stateful_job),
});
}
StreamMessage::NewCommand(WorkerCommand::IdentifyYourself(tx)) => {
if tx

View File

@@ -19,6 +19,7 @@ pub enum JobReportUpdate {
TaskCount(usize),
CompletedTaskCount(usize),
Message(String),
Phase(String),
}
job::select!(job_without_data {
@@ -57,6 +58,7 @@ pub struct JobReport {
pub task_count: i32,
pub completed_task_count: i32,
pub phase: String,
pub message: String,
pub estimated_completion: DateTime<Utc>,
}
@@ -102,6 +104,7 @@ impl TryFrom<job::Data> for JobReport {
.expect("corrupted database"),
task_count: data.task_count.unwrap_or(0),
completed_task_count: data.completed_task_count.unwrap_or(0),
phase: String::new(),
message: String::new(),
estimated_completion: data
.date_estimated_completion
@@ -144,6 +147,7 @@ impl TryFrom<job_without_data::Data> for JobReport {
task_count: data.task_count.unwrap_or(0),
completed_task_count: data.completed_task_count.unwrap_or(0),
phase: String::new(),
message: String::new(),
estimated_completion: data
.date_estimated_completion
@@ -169,6 +173,7 @@ impl JobReport {
metadata: None,
parent_id: None,
completed_task_count: 0,
phase: String::new(),
message: String::new(),
estimated_completion: Utc::now(),
}
@@ -319,6 +324,7 @@ impl JobReportBuilder {
metadata: self.metadata,
parent_id: self.parent_id,
completed_task_count: 0,
phase: String::new(),
message: String::new(),
estimated_completion: Utc::now(),
}

View File

@@ -40,6 +40,7 @@ pub struct JobProgressEvent {
pub library_id: Uuid,
pub task_count: i32,
pub completed_task_count: i32,
pub phase: String,
pub message: String,
pub estimated_completion: DateTime<Utc>,
}
@@ -287,6 +288,14 @@ impl Worker {
trace!("job {} message: {}", report.id, message);
report.message = message;
}
JobReportUpdate::Phase(phase) => {
trace!(
"changing Job <id='{}'> phase: {} -> {phase}",
report.id,
report.phase
);
report.phase = phase;
}
}
}
@@ -323,6 +332,7 @@ impl Worker {
task_count: report.task_count,
completed_task_count: report.completed_task_count,
estimated_completion: report.estimated_completion,
phase: report.phase.clone(),
message: report.message.clone(),
}));
}

View File

@@ -5,7 +5,7 @@ use crate::{
},
location::file_path_helper::{file_path_to_full_path, IsolatedFilePathData},
notifications,
object::{media::thumbnail::get_thumbnail_path, orphan_remover::OrphanRemoverActor},
object::{media::thumbnail::get_indexed_thumbnail_path, orphan_remover::OrphanRemoverActor},
prisma::{file_path, location, PrismaClient},
sync,
util::{db::maybe_missing, error::FileIOError},
@@ -120,7 +120,7 @@ impl Library {
}
pub async fn thumbnail_exists(&self, node: &Node, cas_id: &str) -> Result<bool, FileIOError> {
let thumb_path = get_thumbnail_path(node, cas_id);
let thumb_path = get_indexed_thumbnail_path(node, cas_id, self.id);
match fs::metadata(&thumb_path).await {
Ok(_) => Ok(true),

View File

@@ -10,3 +10,5 @@ pub use config::*;
pub use library::*;
pub use manager::*;
pub use name::*;
pub type LibraryId = uuid::Uuid;

View File

@@ -225,11 +225,12 @@ impl StatefulJob for IndexerJobInit {
ctx.node
.thumbnailer
.remove_cas_ids(
.remove_indexed_cas_ids(
to_remove
.iter()
.filter_map(|file_path| file_path.cas_id.clone())
.collect::<Vec<_>>(),
ctx.library.id,
)
.await;

View File

@@ -91,11 +91,12 @@ pub async fn shallow(
let to_remove_count = to_remove.len();
node.thumbnailer
.remove_cas_ids(
.remove_indexed_cas_ids(
to_remove
.iter()
.filter_map(|file_path| file_path.cas_id.clone())
.collect::<Vec<_>>(),
library.id,
)
.await;

View File

@@ -1,4 +1,9 @@
use crate::{library::Library, prisma::location, util::db::maybe_missing, Node};
use crate::{
library::{Library, LibraryId},
prisma::location,
util::db::maybe_missing,
Node,
};
use std::{
collections::{HashMap, HashSet},
@@ -13,7 +18,6 @@ use uuid::Uuid;
use super::{watcher::LocationWatcher, LocationManagerError};
type LibraryId = Uuid;
type LocationAndLibraryKey = (location::id::Type, LibraryId);
const LOCATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);

View File

@@ -21,7 +21,7 @@ use crate::{
media::{
media_data_extractor::{can_extract_media_data_for_image, extract_media_data},
media_data_image_to_query,
thumbnail::get_thumbnail_path,
thumbnail::get_indexed_thumbnail_path,
},
validation::hash::file_checksum,
},
@@ -285,10 +285,11 @@ async fn inner_create_file(
let extension = extension.clone();
let path = path.to_path_buf();
let node = node.clone();
let library_id = library.id;
spawn(async move {
if let Err(e) = node
.thumbnailer
.generate_single_thumbnail(&extension, cas_id, path)
.generate_single_indexed_thumbnail(&extension, cas_id, path, library_id)
.await
{
error!("Failed to generate thumbnail in the watcher: {e:#?}");
@@ -522,10 +523,13 @@ async fn inner_update_file(
if let Some(cas_id) = cas_id {
let node = node.clone();
let path = full_path.to_path_buf();
let library_id = library.id;
spawn(async move {
if let Err(e) = node
.thumbnailer
.generate_single_thumbnail(&ext, cas_id, path)
.generate_single_indexed_thumbnail(
&ext, cas_id, path, library_id,
)
.await
{
error!("Failed to generate thumbnail in the watcher: {e:#?}");
@@ -534,7 +538,7 @@ async fn inner_update_file(
}
// remove the old thumbnail as we're generating a new one
let thumb_path = get_thumbnail_path(node, old_cas_id);
let thumb_path = get_indexed_thumbnail_path(node, old_cas_id, library.id);
fs::remove_file(&thumb_path)
.await
.map_err(|e| FileIOError::from((thumb_path, e)))?;

View File

@@ -1,3 +1,5 @@
use crate::library::LibraryId;
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
@@ -10,10 +12,9 @@ use tokio::{fs, io};
use tracing::error;
use uuid::Uuid;
static SPACEDRIVE_LOCATION_METADATA_FILE: &str = ".spacedrive";
use super::LocationPubId;
pub(super) type LibraryId = Uuid;
pub(super) type LocationPubId = Uuid;
static SPACEDRIVE_LOCATION_METADATA_FILE: &str = ".spacedrive";
#[derive(Serialize, Deserialize, Default, Debug)]
struct LocationMetadata {

View File

@@ -49,6 +49,8 @@ use metadata::SpacedriveLocationMetadataFile;
use file_path_helper::IsolatedFilePathData;
pub type LocationPubId = Uuid;
// Location includes!
location::include!(location_with_indexer_rules {
indexer_rules: select { indexer_rule }

View File

@@ -3,10 +3,7 @@ use crate::{
library::Library,
object::{
cas::generate_cas_id,
media::thumbnail::{
actor::{BatchToProcess, GenerateThumbnailArgs},
get_thumb_key,
},
media::thumbnail::{get_ephemeral_thumb_key, BatchToProcess, GenerateThumbnailArgs},
},
prisma::location,
util::error::FileIOError,
@@ -200,9 +197,7 @@ pub async fn walk(
));
}
let thumbnail_key = get_thumb_key(&cas_id);
Some(thumbnail_key)
Some(get_ephemeral_thumb_key(&cas_id))
} else {
None
}
@@ -231,11 +226,7 @@ pub async fn walk(
thumbnails_to_generate.extend(document_thumbnails_to_generate);
node.thumbnailer
.new_ephemeral_thumbnails_batch(BatchToProcess {
batch: thumbnails_to_generate,
should_regenerate: false,
in_background: false,
})
.new_ephemeral_thumbnails_batch(BatchToProcess::new(thumbnails_to_generate, false, false))
.await;
let mut locations = library

View File

@@ -10,25 +10,30 @@ use crate::{
file_path_for_media_processor, IsolatedFilePathData,
},
prisma::{location, PrismaClient},
util::db::maybe_missing,
util::db::{maybe_missing, MissingFieldError},
Node,
};
use std::{
future::Future,
hash::Hash,
path::{Path, PathBuf},
time::Duration,
};
use async_channel as chan;
use futures::StreamExt;
use itertools::Itertools;
use prisma_client_rust::{raw, PrismaValue};
use sd_file_ext::extensions::Extension;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{debug, info};
use tokio::time::sleep;
use tracing::{debug, error, info, trace, warn};
use super::{
dispatch_thumbnails_for_processing, media_data_extractor, process, MediaProcessorError,
MediaProcessorMetadata,
media_data_extractor, process,
thumbnail::{self, GenerateThumbnailArgs},
BatchToProcess, MediaProcessorError, MediaProcessorMetadata,
};
const BATCH_SIZE: usize = 10;
@@ -53,12 +58,20 @@ impl Hash for MediaProcessorJobInit {
pub struct MediaProcessorJobData {
location_path: PathBuf,
to_process_path: PathBuf,
#[serde(skip, default)]
maybe_thumbnailer_progress_rx: Option<chan::Receiver<(u32, u32)>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum MediaProcessorJobStep {
ExtractMediaData(Vec<file_path_for_media_processor::Data>),
WaitThumbnails(usize),
}
#[async_trait::async_trait]
impl StatefulJob for MediaProcessorJobInit {
type Data = MediaProcessorJobData;
type Step = Vec<file_path_for_media_processor::Data>;
type Step = MediaProcessorJobStep;
type RunMetadata = MediaProcessorMetadata;
const NAME: &'static str = "media_processor";
@@ -113,30 +126,52 @@ impl StatefulJob for MediaProcessorJobInit {
"Searching for media files in location {location_id} at directory \"{iso_file_path}\""
);
dispatch_thumbnails_for_processing(
let thumbs_to_process_count = dispatch_thumbnails_for_processing(
location_id,
&location_path,
&iso_file_path,
&ctx.library,
&ctx.node,
false,
get_all_children_files_by_extensions,
)
.await?;
let maybe_thumbnailer_progress_rx = if thumbs_to_process_count > 0 {
let (progress_tx, progress_rx) = chan::unbounded();
ctx.node
.thumbnailer
.register_reporter(location_id, progress_tx)
.await;
Some(progress_rx)
} else {
None
};
let file_paths = get_files_for_media_data_extraction(db, &iso_file_path).await?;
let total_files = file_paths.len();
let chunked_files = file_paths
.into_iter()
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
let chunked_files =
file_paths
.into_iter()
.chunks(BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.map(MediaProcessorJobStep::ExtractMediaData)
.chain(
[(thumbs_to_process_count > 0).then_some(
MediaProcessorJobStep::WaitThumbnails(thumbs_to_process_count as usize),
)]
.into_iter()
.flatten(),
)
.collect::<Vec<_>>();
ctx.progress(vec![
JobReportUpdate::TaskCount(total_files),
JobReportUpdate::Phase("media_data".to_string()),
JobReportUpdate::Message(format!(
"Preparing to process {total_files} files in {} chunks",
chunked_files.len()
@@ -146,35 +181,85 @@ impl StatefulJob for MediaProcessorJobInit {
*data = Some(MediaProcessorJobData {
location_path,
to_process_path,
maybe_thumbnailer_progress_rx,
});
Ok(chunked_files.into())
Ok((
Self::RunMetadata {
thumbs_processed: thumbs_to_process_count,
..Default::default()
},
chunked_files,
)
.into())
}
async fn execute_step(
&self,
ctx: &WorkerContext,
CurrentStep {
step: file_paths,
step_number,
}: CurrentStep<'_, Self::Step>,
CurrentStep { step, step_number }: CurrentStep<'_, Self::Step>,
data: &Self::Data,
_: &Self::RunMetadata,
) -> Result<JobStepOutput<Self::Step, Self::RunMetadata>, JobError> {
process(
file_paths,
self.location.id,
&data.location_path,
&ctx.library.db,
&|completed_count| {
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
step_number * BATCH_SIZE + completed_count,
)]);
},
)
.await
.map(Into::into)
.map_err(Into::into)
match step {
MediaProcessorJobStep::ExtractMediaData(file_paths) => process(
file_paths,
self.location.id,
&data.location_path,
&ctx.library.db,
&|completed_count| {
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
step_number * BATCH_SIZE + completed_count,
)]);
},
)
.await
.map(Into::into)
.map_err(Into::into),
MediaProcessorJobStep::WaitThumbnails(total_thumbs) => {
ctx.progress(vec![
JobReportUpdate::TaskCount(*total_thumbs),
JobReportUpdate::Phase("thumbnails".to_string()),
JobReportUpdate::Message(format!(
"Waiting for processing of {total_thumbs} thumbnails",
)),
]);
let mut progress_rx =
if let Some(progress_rx) = data.maybe_thumbnailer_progress_rx.clone() {
progress_rx
} else {
let (progress_tx, progress_rx) = chan::unbounded();
ctx.node
.thumbnailer
.register_reporter(self.location.id, progress_tx)
.await;
progress_rx
};
let mut total_completed = 0;
while let Some((completed, total)) = progress_rx.next().await {
trace!("Received progress update from thumbnailer: {completed}/{total}",);
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
completed as usize,
)]);
total_completed = completed;
}
if progress_rx.is_closed() && total_completed < *total_thumbs as u32 {
warn!(
"Thumbnailer progress reporter channel closed before all thumbnails were
processed, job will wait a bit waiting for a shutdown signal from manager"
);
sleep(Duration::from_secs(5)).await;
}
Ok(None.into())
}
}
}
async fn finalize(
@@ -200,6 +285,108 @@ impl StatefulJob for MediaProcessorJobInit {
}
}
async fn dispatch_thumbnails_for_processing(
location_id: location::id::Type,
location_path: impl AsRef<Path>,
parent_iso_file_path: &IsolatedFilePathData<'_>,
library: &Library,
node: &Node,
should_regenerate: bool,
) -> Result<u32, MediaProcessorError> {
let Library { db, .. } = library;
let location_path = location_path.as_ref();
let file_paths = get_all_children_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
)
.await?;
if file_paths.is_empty() {
return Ok(0);
}
let mut current_batch = Vec::with_capacity(16);
// PDF thumbnails are currently way slower so we process them by last
let mut pdf_thumbs = Vec::with_capacity(16);
let mut current_materialized_path = None;
let mut in_background = false;
let mut thumbs_count = 0;
for file_path in file_paths {
// Initializing current_materialized_path with the first file_path materialized_path
if current_materialized_path.is_none() {
current_materialized_path = file_path.materialized_path.clone();
}
if file_path.materialized_path != current_materialized_path
&& (!current_batch.is_empty() || !pdf_thumbs.is_empty())
{
// Now we found a different materialized_path so we dispatch the current batch and start a new one
thumbs_count += current_batch.len() as u32;
node.thumbnailer
.new_indexed_thumbnails_batch_with_ticket(
BatchToProcess::new(current_batch, should_regenerate, in_background),
library.id,
location_id,
)
.await;
// We moved our vec so we need a new
current_batch = Vec::with_capacity(16);
in_background = true; // Only the first batch should be processed in foreground
// Exchaging for the first different materialized_path
current_materialized_path = file_path.materialized_path.clone();
}
let file_path_id = file_path.id;
if let Err(e) = add_to_batch(
location_id,
location_path,
file_path,
&mut current_batch,
&mut pdf_thumbs,
) {
error!("Error adding file_path <id='{file_path_id}'> to thumbnail batch: {e:#?}");
}
}
// Dispatching the last batch
if !current_batch.is_empty() {
thumbs_count += current_batch.len() as u32;
node.thumbnailer
.new_indexed_thumbnails_batch_with_ticket(
BatchToProcess::new(current_batch, should_regenerate, in_background),
library.id,
location_id,
)
.await;
}
// We now put the pdf_thumbs to be processed by last
if !pdf_thumbs.is_empty() {
thumbs_count += pdf_thumbs.len() as u32;
node.thumbnailer
.new_indexed_thumbnails_batch_with_ticket(
BatchToProcess::new(pdf_thumbs, should_regenerate, in_background),
library.id,
location_id,
)
.await;
}
Ok(thumbs_count)
}
async fn get_files_for_media_data_extraction(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
@@ -213,22 +400,16 @@ async fn get_files_for_media_data_extraction(
.map_err(Into::into)
}
fn get_all_children_files_by_extensions<'d, 'p, 'e, 'ret>(
db: &'d PrismaClient,
parent_iso_file_path: &'p IsolatedFilePathData<'_>,
extensions: &'e [Extension],
) -> impl Future<Output = Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError>> + 'ret
where
'd: 'ret,
'p: 'ret,
'e: 'ret,
{
async move {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
async fn get_all_children_files_by_extensions(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
extensions: &[Extension],
) -> Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError> {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
FROM file_path
WHERE
location_id={{}}
@@ -236,24 +417,47 @@ where
AND LOWER(extension) IN ({})
AND materialized_path LIKE {{}}
ORDER BY materialized_path ASC",
// Orderind by materialized_path so we can prioritize processing the first files
// in the above part of the directories tree
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(format!(
"{}%",
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
))
// Orderind by materialized_path so we can prioritize processing the first files
// in the above part of the directories tree
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(format!(
"{}%",
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
))
.exec()
.await
.map_err(Into::into)
}
))
.exec()
.await
.map_err(Into::into)
}
fn add_to_batch(
location_id: location::id::Type,
location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter
file_path: file_path_for_media_processor::Data,
current_batch: &mut Vec<GenerateThumbnailArgs>,
pdf_thumbs: &mut Vec<GenerateThumbnailArgs>,
) -> Result<(), MissingFieldError> {
let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone();
let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?;
let full_path = location_path.join(&iso_file_path);
let extension = iso_file_path.extension();
let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path);
if extension != "pdf" {
current_batch.push(args);
} else {
pdf_thumbs.push(args);
}
Ok(())
}

View File

@@ -1,17 +1,11 @@
use crate::{
job::{JobRunErrors, JobRunMetadata},
library::Library,
location::file_path_helper::{
file_path_for_media_processor, FilePathError, IsolatedFilePathData,
},
util::db::{maybe_missing, MissingFieldError},
Node,
location::file_path_helper::{file_path_for_media_processor, FilePathError},
};
use sd_file_ext::extensions::Extension;
use sd_prisma::prisma::{location, PrismaClient};
use std::{future::Future, path::Path};
use std::path::Path;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -19,11 +13,7 @@ use tracing::error;
use super::{
media_data_extractor::{self, MediaDataError, MediaDataExtractorMetadata},
thumbnail::{
self,
actor::{BatchToProcess, GenerateThumbnailArgs},
ThumbnailerError,
},
thumbnail::{self, BatchToProcess, ThumbnailerError},
};
mod job;
@@ -51,11 +41,15 @@ pub enum MediaProcessorError {
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct MediaProcessorMetadata {
media_data: MediaDataExtractorMetadata,
thumbs_processed: u32,
}
impl From<MediaDataExtractorMetadata> for MediaProcessorMetadata {
fn from(media_data: MediaDataExtractorMetadata) -> Self {
Self { media_data }
Self {
media_data,
thumbs_processed: 0,
}
}
}
@@ -63,126 +57,10 @@ impl JobRunMetadata for MediaProcessorMetadata {
fn update(&mut self, new_data: Self) {
self.media_data.extracted += new_data.media_data.extracted;
self.media_data.skipped += new_data.media_data.skipped;
self.thumbs_processed += new_data.thumbs_processed;
}
}
// `thumbs_fetcher_fn` MUST return file_paths ordered by `materialized_path` for optimal results
async fn dispatch_thumbnails_for_processing<'d, 'p, 'e, 'ret, F>(
location_id: location::id::Type,
location_path: impl AsRef<Path>,
parent_iso_file_path: &'p IsolatedFilePathData<'_>,
library: &'d Library,
node: &Node,
should_regenerate: bool,
thumbs_fetcher_fn: impl Fn(&'d PrismaClient, &'p IsolatedFilePathData<'_>, &'e [Extension]) -> F,
) -> Result<(), MediaProcessorError>
where
'd: 'ret,
'p: 'ret,
'e: 'ret,
F: Future<Output = Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError>>
+ 'ret,
{
let Library { db, .. } = library;
let location_path = location_path.as_ref();
let file_paths = thumbs_fetcher_fn(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
)
.await?;
let mut current_batch = Vec::with_capacity(16);
// PDF thumbnails are currently way slower so we process them by last
let mut pdf_thumbs = Vec::with_capacity(16);
let mut current_materialized_path = None;
let mut in_background = false;
for file_path in file_paths {
// Initializing current_materialized_path with the first file_path materialized_path
if current_materialized_path.is_none() {
current_materialized_path = file_path.materialized_path.clone();
}
if file_path.materialized_path != current_materialized_path
&& (!current_batch.is_empty() || !pdf_thumbs.is_empty())
{
// Now we found a different materialized_path so we dispatch the current batch and start a new one
// We starting by appending all pdfs and leaving the vec clean to be reused
current_batch.append(&mut pdf_thumbs);
node.thumbnailer
.new_indexed_thumbnails_batch(BatchToProcess {
batch: current_batch,
should_regenerate,
in_background,
})
.await;
// We moved our vec so we need a new
current_batch = Vec::with_capacity(16);
in_background = true; // Only the first batch should be processed in foreground
// Exchaging for the first different materialized_path
current_materialized_path = file_path.materialized_path.clone();
}
let file_path_id = file_path.id;
if let Err(e) = add_to_batch(
location_id,
location_path,
file_path,
&mut current_batch,
&mut pdf_thumbs,
) {
error!("Error adding file_path <id='{file_path_id}'> to thumbnail batch: {e:#?}");
}
}
// Dispatching the last batch
if !current_batch.is_empty() {
node.thumbnailer
.new_indexed_thumbnails_batch(BatchToProcess {
batch: current_batch,
should_regenerate,
in_background,
})
.await;
}
Ok(())
}
fn add_to_batch(
location_id: location::id::Type,
location_path: &Path, // This function is only used internally once, so we can pass &Path as a parameter
file_path: file_path_for_media_processor::Data,
current_batch: &mut Vec<GenerateThumbnailArgs>,
pdf_thumbs: &mut Vec<GenerateThumbnailArgs>,
) -> Result<(), MissingFieldError> {
let cas_id = maybe_missing(&file_path.cas_id, "file_path.cas_id")?.clone();
let iso_file_path = IsolatedFilePathData::try_from((location_id, file_path))?;
let full_path = location_path.join(&iso_file_path);
let extension = iso_file_path.extension();
let args = GenerateThumbnailArgs::new(extension.to_string(), cas_id, full_path);
if extension != "pdf" {
current_batch.push(args);
} else {
pdf_thumbs.push(args);
}
Ok(())
}
pub async fn process(
files_paths: &[file_path_for_media_processor::Data],
location_id: location::id::Type,

View File

@@ -6,13 +6,14 @@ use crate::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_media_processor, IsolatedFilePathData,
},
object::media::thumbnail::GenerateThumbnailArgs,
prisma::{location, PrismaClient},
util::db::maybe_missing,
Node,
};
use std::{
future::Future,
cmp::Ordering,
path::{Path, PathBuf},
};
@@ -22,8 +23,8 @@ use sd_file_ext::extensions::Extension;
use tracing::{debug, error};
use super::{
dispatch_thumbnails_for_processing,
media_data_extractor::{self, process},
thumbnail::{self, BatchToProcess},
MediaProcessorError, MediaProcessorMetadata,
};
@@ -75,7 +76,6 @@ pub async fn shallow(
library,
node,
false,
get_files_by_extensions,
)
.await?;
@@ -132,43 +132,109 @@ async fn get_files_for_media_data_extraction(
.map_err(Into::into)
}
fn get_files_by_extensions<'d, 'p, 'e, 'ret>(
db: &'d PrismaClient,
parent_iso_file_path: &'p IsolatedFilePathData<'_>,
extensions: &'e [Extension],
) -> impl Future<Output = Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError>> + 'ret
where
'd: 'ret,
'p: 'ret,
'e: 'ret,
{
async move {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
async fn dispatch_thumbnails_for_processing(
location_id: location::id::Type,
location_path: impl AsRef<Path>,
parent_iso_file_path: &IsolatedFilePathData<'_>,
library: &Library,
node: &Node,
should_regenerate: bool,
) -> Result<(), MediaProcessorError> {
let Library { db, .. } = library;
let location_path = location_path.as_ref();
let file_paths = get_files_by_extensions(
db,
parent_iso_file_path,
&thumbnail::ALL_THUMBNAILABLE_EXTENSIONS,
)
.await?;
let current_batch = file_paths
.into_iter()
.filter_map(|file_path| {
if let Some(cas_id) = file_path.cas_id.as_ref() {
Some((cas_id.clone(), file_path))
} else {
error!("File path <id='{}'> has no cas_id, skipping", file_path.id);
None
}
})
.filter_map(|(cas_id, file_path)| {
let file_path_id = file_path.id;
IsolatedFilePathData::try_from((location_id, file_path))
.map_err(|e| {
error!("Failed to extract isolated file path data from file path <id='{file_path_id}'>: {e:#?}");
})
.ok()
.map(|iso_file_path| (cas_id, iso_file_path))
})
.map(|(cas_id, iso_file_path)| {
let full_path = location_path.join(&iso_file_path);
let extension = iso_file_path.extension().to_string();
(
GenerateThumbnailArgs::new(extension.clone(), cas_id, full_path),
extension,
)
})
.sorted_by(|(_, ext_a), (_, ext_b)|
// This will put PDF files by last as they're currently way slower to be processed
// FIXME(fogodev): Remove this sort when no longer needed
match (*ext_a == "pdf", *ext_b == "pdf") {
(true, true) => Ordering::Equal,
(false, true) => Ordering::Less,
(true, false) => Ordering::Greater,
(false, false) => Ordering::Equal,
})
.map(|(args, _)| args)
.collect::<Vec<_>>();
// Let's not send an empty batch lol
if !current_batch.is_empty() {
node.thumbnailer
.new_indexed_thumbnails_batch(
BatchToProcess::new(current_batch, should_regenerate, true),
library.id,
)
.await;
}
Ok(())
}
async fn get_files_by_extensions(
db: &PrismaClient,
parent_iso_file_path: &IsolatedFilePathData<'_>,
extensions: &[Extension],
) -> Result<Vec<file_path_for_media_processor::Data>, MediaProcessorError> {
// FIXME: Had to use format! macro because PCR doesn't support IN with Vec for SQLite
// We have no data coming from the user, so this is sql injection safe
db._query_raw(raw!(
&format!(
"SELECT id, materialized_path, is_dir, name, extension, cas_id, object_id
FROM file_path
WHERE
location_id={{}}
AND cas_id IS NOT NULL
AND LOWER(extension) IN ({})
AND materialized_path = {{}}",
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
)
))
.exec()
.await
.map_err(Into::into)
}
extensions
.iter()
.map(|ext| format!("LOWER('{ext}')"))
.collect::<Vec<_>>()
.join(",")
),
PrismaValue::Int(parent_iso_file_path.location_id() as i64),
PrismaValue::String(
parent_iso_file_path
.materialized_path_for_children()
.expect("sub path iso_file_path must be a directory")
)
))
.exec()
.await
.map_err(Into::into)
}

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,178 @@
use crate::{library::LibraryId, util::error::FileIOError};
use sd_prisma::prisma::{file_path, PrismaClient};
use std::{collections::HashSet, ffi::OsString, path::PathBuf, sync::Arc};
use futures_concurrency::future::Join;
use tokio::{fs, spawn};
use tracing::{debug, error};
use super::{ThumbnailerError, EPHEMERAL_DIR, WEBP_EXTENSION};
pub(super) async fn process_ephemeral_clean_up(
thumbnails_directory: Arc<PathBuf>,
existing_ephemeral_thumbs: HashSet<OsString>,
) {
let ephemeral_thumbs_dir = thumbnails_directory.join(EPHEMERAL_DIR);
spawn(async move {
let mut to_remove = vec![];
let mut read_ephemeral_thumbs_dir = fs::read_dir(&ephemeral_thumbs_dir)
.await
.map_err(|e| FileIOError::from((&ephemeral_thumbs_dir, e)))?;
while let Some(shard_entry) = read_ephemeral_thumbs_dir
.next_entry()
.await
.map_err(|e| FileIOError::from((&ephemeral_thumbs_dir, e)))?
{
let shard_path = shard_entry.path();
if shard_entry
.file_type()
.await
.map_err(|e| FileIOError::from((&shard_path, e)))?
.is_dir()
{
let mut read_shard_dir = fs::read_dir(&shard_path)
.await
.map_err(|e| FileIOError::from((&shard_path, e)))?;
while let Some(thumb_entry) = read_shard_dir
.next_entry()
.await
.map_err(|e| FileIOError::from((&shard_path, e)))?
{
let thumb_path = thumb_entry.path();
if thumb_path.extension() == Some(WEBP_EXTENSION.as_ref())
&& !existing_ephemeral_thumbs.contains(&thumb_entry.file_name())
{
to_remove.push(async move {
debug!(
"Removing stale ephemeral thumbnail: {}",
thumb_path.display()
);
fs::remove_file(&thumb_path).await.map_err(|e| {
ThumbnailerError::FileIO(FileIOError::from((thumb_path, e)))
})
});
}
}
}
}
Ok::<_, ThumbnailerError>(to_remove.join().await)
})
.await
.map_or_else(
|e| error!("Join error on ephemeral clean up: {e:#?}",),
|fetching_res| {
fetching_res.map_or_else(
|e| error!("Error fetching ephemeral thumbs to be removed: {e:#?}"),
|remove_results| {
remove_results.into_iter().for_each(|remove_res| {
if let Err(e) = remove_res {
error!("Error on ephemeral clean up: {e:#?}");
}
})
},
)
},
)
}
pub(super) async fn process_indexed_clean_up(
thumbnails_directory: Arc<PathBuf>,
libraries_ids_and_databases: Vec<(LibraryId, Arc<PrismaClient>)>,
) {
libraries_ids_and_databases
.into_iter()
.map(|(library_id, db)| {
let library_thumbs_dir = thumbnails_directory.join(library_id.to_string());
spawn(async move {
let existing_thumbs = db
.file_path()
.find_many(vec![file_path::cas_id::not(None)])
.select(file_path::select!({ cas_id }))
.exec()
.await?
.into_iter()
.map(|file_path| {
OsString::from(format!(
"{}.webp",
file_path.cas_id.expect("we filtered right")
))
})
.collect::<HashSet<_>>();
let mut read_library_thumbs_dir = fs::read_dir(&library_thumbs_dir)
.await
.map_err(|e| FileIOError::from((&library_thumbs_dir, e)))?;
let mut to_remove = vec![];
while let Some(shard_entry) = read_library_thumbs_dir
.next_entry()
.await
.map_err(|e| FileIOError::from((&library_thumbs_dir, e)))?
{
let shard_path = shard_entry.path();
if shard_entry
.file_type()
.await
.map_err(|e| FileIOError::from((&shard_path, e)))?
.is_dir()
{
let mut read_shard_dir = fs::read_dir(&shard_path)
.await
.map_err(|e| FileIOError::from((&shard_path, e)))?;
while let Some(thumb_entry) = read_shard_dir
.next_entry()
.await
.map_err(|e| FileIOError::from((&shard_path, e)))?
{
let thumb_path = thumb_entry.path();
if thumb_path.extension() == Some(WEBP_EXTENSION.as_ref())
&& !existing_thumbs.contains(&thumb_entry.file_name())
{
to_remove.push(async move {
debug!(
"Removing stale indexed thumbnail: {}",
thumb_path.display()
);
fs::remove_file(&thumb_path).await.map_err(|e| {
ThumbnailerError::FileIO(FileIOError::from((thumb_path, e)))
})
});
}
}
}
}
Ok::<_, ThumbnailerError>(to_remove.join().await)
})
})
.collect::<Vec<_>>()
.join()
.await
.into_iter()
.filter_map(|join_res| {
join_res
.map_err(|e| error!("Join error on indexed clean up: {e:#?}"))
.ok()
})
.filter_map(|fetching_res| {
fetching_res
.map_err(|e| error!("Error fetching indexed thumbs to be removed: {e:#?}"))
.ok()
})
.for_each(|remove_results| {
remove_results.into_iter().for_each(|remove_res| {
if let Err(e) = remove_res {
error!("Error on indexed clean up: {e:#?}");
}
})
})
}

View File

@@ -1,102 +1,398 @@
use crate::util::{error::FileIOError, version_manager::VersionManager};
use crate::{
library::{Libraries, LibraryId},
object::media::thumbnail::ONE_SEC,
util::{
error::FileIOError,
version_manager::{ManagedVersion, VersionManager, VersionManagerError},
},
};
use std::path::{Path, PathBuf};
use sd_prisma::prisma::{file_path, PrismaClient};
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
sync::Arc,
};
use futures_concurrency::future::{Join, TryJoin};
use int_enum::IntEnum;
use tokio::fs;
use tracing::{debug, error, trace};
use tokio::{
fs, io, spawn,
time::{sleep, timeout},
};
use tracing::{debug, error, info, trace, warn};
use super::{get_shard_hex, ThumbnailerError, THUMBNAIL_CACHE_DIR_NAME};
use super::{
get_shard_hex, ThumbnailerError, EPHEMERAL_DIR, THIRTY_SECS, THUMBNAIL_CACHE_DIR_NAME,
VERSION_FILE, WEBP_EXTENSION,
};
#[derive(IntEnum, Debug, Clone, Copy, Eq, PartialEq)]
#[derive(IntEnum, Debug, Clone, Copy, Eq, PartialEq, strum::Display)]
#[repr(i32)]
enum ThumbnailVersion {
V1 = 1,
V2 = 2,
V3 = 3,
Unknown = 0,
}
pub async fn init_thumbnail_dir(data_dir: impl AsRef<Path>) -> Result<PathBuf, ThumbnailerError> {
impl ManagedVersion for ThumbnailVersion {
const LATEST_VERSION: Self = Self::V3;
type MigrationError = ThumbnailerError;
}
pub(super) async fn init_thumbnail_dir(
data_dir: impl AsRef<Path>,
libraries_manager: Arc<Libraries>,
) -> Result<PathBuf, ThumbnailerError> {
debug!("Initializing thumbnail directory");
let thumbnail_dir = data_dir.as_ref().join(THUMBNAIL_CACHE_DIR_NAME);
let thumbnails_directory = data_dir.as_ref().join(THUMBNAIL_CACHE_DIR_NAME);
let version_file = thumbnail_dir.join("version.txt");
let version_manager =
VersionManager::<ThumbnailVersion>::new(version_file.to_str().expect("Invalid path"));
debug!("Thumbnail directory: {:?}", thumbnails_directory);
debug!("Thumbnail directory: {:?}", thumbnail_dir);
// create all necessary directories if they don't exist
fs::create_dir_all(&thumbnail_dir)
// create thumbnails base directory
fs::create_dir_all(&thumbnails_directory)
.await
.map_err(|e| FileIOError::from((&thumbnail_dir, e)))?;
.map_err(|e| FileIOError::from((&thumbnails_directory, e)))?;
let mut current_version = match version_manager.get_version() {
spawn({
let thumbnails_directory = thumbnails_directory.clone();
async move {
let Ok(databases) = timeout(THIRTY_SECS, async move {
loop {
let libraries = libraries_manager.get_all().await;
if !libraries.is_empty() {
break libraries
.into_iter()
.map(|library| (library.id, Arc::clone(&library.db)))
.collect::<HashMap<_, _>>();
}
sleep(ONE_SEC).await;
}
})
.await
else {
warn!(
"Failed to get libraries after 30 seconds, thumbnailer migration will not work; \
Ignore this warning if you don't created libraries yet."
);
return;
};
if let Err(e) = process_migration(thumbnails_directory, databases).await {
error!("Failed to migrate thumbnails: {e:#?}");
}
}
});
Ok(thumbnails_directory)
}
async fn process_migration(
thumbnails_directory: impl AsRef<Path>,
databases: HashMap<LibraryId, Arc<PrismaClient>>,
) -> Result<(), ThumbnailerError> {
let thumbnails_directory = thumbnails_directory.as_ref();
let version_manager =
VersionManager::<ThumbnailVersion>::new(thumbnails_directory.join(VERSION_FILE));
// create all other directories, for each library and for ephemeral thumbnails
databases
.keys()
.map(|library_id| thumbnails_directory.join(library_id.to_string()))
.chain([thumbnails_directory.join(EPHEMERAL_DIR)])
.map(|path| async move {
fs::create_dir_all(&path)
.await
.map_err(|e| FileIOError::from((&path, e)))
})
.collect::<Vec<_>>()
.join()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let current_version = match version_manager.get_version().await {
Ok(version) => version,
Err(_) => {
debug!("Thumbnail version file does not exist, starting fresh");
Err(e) => {
debug!("Thumbnail version file does not exist, starting fresh: {e:#?}");
// Version file does not exist, start fresh
version_manager.set_version(ThumbnailVersion::V1)?;
version_manager.set_version(ThumbnailVersion::V1).await?;
ThumbnailVersion::V1
}
};
while current_version != ThumbnailVersion::V2 {
match current_version {
ThumbnailVersion::V1 => {
let thumbnail_dir_for_task = thumbnail_dir.clone();
// If the migration fails, it will return the error and exit the function
move_webp_files(&thumbnail_dir_for_task).await?;
version_manager.set_version(ThumbnailVersion::V2)?;
current_version = ThumbnailVersion::V2;
}
// If the current version is not handled explicitly, break the loop or return an error.
_ => {
error!("Thumbnail version is not handled: {:?}", current_version);
}
}
if current_version != ThumbnailVersion::LATEST_VERSION {
info!(
"Migrating thumbnail directory from {:?} to V3",
current_version
);
// Taking a reference to databases so we can move it into the closure and comply with the borrowck
let databases = &databases;
version_manager
.migrate(current_version, |current, next| {
let thumbnail_dir = &thumbnails_directory;
async move {
match (current, next) {
(ThumbnailVersion::V1, ThumbnailVersion::V2) => {
move_to_shards(thumbnail_dir).await
}
(ThumbnailVersion::V2, ThumbnailVersion::V3) => {
segregate_thumbnails_by_library(thumbnail_dir, databases).await
}
_ => {
error!("Thumbnail version is not handled: {:?}", current);
Err(VersionManagerError::UnexpectedMigration {
current_version: current.int_value(),
next_version: next.int_value(),
}
.into())
}
}
}
})
.await?;
}
Ok(thumbnail_dir)
Ok(())
}
/// This function moves all webp files in the thumbnail directory to their respective shard folders.
/// It is used to migrate from V1 to V2.
async fn move_webp_files(dir: &PathBuf) -> Result<(), ThumbnailerError> {
let mut dir_entries = fs::read_dir(dir)
async fn move_to_shards(thumbnails_directory: impl AsRef<Path>) -> Result<(), ThumbnailerError> {
let thumbnails_directory = thumbnails_directory.as_ref();
let mut dir_entries = fs::read_dir(thumbnails_directory)
.await
.map_err(|source| FileIOError::from((dir, source)))?;
.map_err(|source| FileIOError::from((thumbnails_directory, source)))?;
let mut count = 0;
while let Ok(Some(entry)) = dir_entries.next_entry().await {
let path = entry.path();
if path.is_file() {
if let Some(extension) = path.extension() {
if extension == "webp" {
let filename = path
.file_name()
.expect("Missing file name")
.to_str()
.expect("Failed to parse UTF8"); // we know they're cas_id's, so they're valid utf8
let shard_folder = get_shard_hex(filename);
if entry
.file_type()
.await
.map_err(|e| FileIOError::from((entry.path(), e)))?
.is_file()
{
let path = entry.path();
if path.extension() == Some(WEBP_EXTENSION.as_ref()) {
let file_name = entry.file_name();
let new_dir = dir.join(shard_folder);
fs::create_dir_all(&new_dir)
.await
.map_err(|source| FileIOError::from((new_dir.clone(), source)))?;
// we know they're cas_id's, so they're valid utf8
let shard_folder = get_shard_hex(file_name.to_str().expect("Failed to parse UTF8"));
let new_path = new_dir.join(filename);
fs::rename(&path, &new_path)
.await
.map_err(|source| FileIOError::from((path.clone(), source)))?;
count += 1;
}
let new_dir = thumbnails_directory.join(shard_folder);
fs::create_dir_all(&new_dir)
.await
.map_err(|source| FileIOError::from((new_dir.clone(), source)))?;
let new_path = new_dir.join(file_name);
fs::rename(&path, &new_path)
.await
.map_err(|source| FileIOError::from((path.clone(), source)))?;
count += 1;
}
}
}
trace!(
info!(
"Moved {} webp files to their respective shard folders.",
count
);
Ok(())
}
async fn segregate_thumbnails_by_library(
thumbnails_directory: impl AsRef<Path>,
databases: &HashMap<LibraryId, Arc<PrismaClient>>,
) -> Result<(), ThumbnailerError> {
// We already created the library folders in init_thumbnail_dir, so we can just move the files
// to their respective folders
let thumbnails_directory = thumbnails_directory.as_ref();
databases
.iter()
.map(|(library_id, db)| (*library_id, Arc::clone(db)))
.map(|(library_id, db)| {
let library_thumbs_dir = thumbnails_directory.join(library_id.to_string());
let old_thumbs_dir = thumbnails_directory.to_path_buf();
spawn(async move {
let mut shards_to_create = HashSet::new();
let to_move = db
.file_path()
.find_many(vec![file_path::cas_id::not(None)])
.select(file_path::select!({ cas_id }))
.exec()
.await?
.into_iter()
.filter_map(|file_path| file_path.cas_id)
.map(|cas_id| {
let new_shard = get_shard_hex(&cas_id).to_string();
let new_sharded_filename = format!("{new_shard}/{cas_id}.webp");
let old_sharded_filename = format!("{}/{cas_id}.webp", &cas_id[0..2]);
(new_shard, new_sharded_filename, old_sharded_filename)
})
.map(|(new_shard, new_sharded_filename, old_sharded_filename)| {
let old = old_thumbs_dir.join(old_sharded_filename);
let new = library_thumbs_dir.join(new_sharded_filename);
let new_shard_dir = library_thumbs_dir.join(new_shard);
shards_to_create.insert(new_shard_dir);
async move {
trace!(
"Moving thumbnail from old location to new location: {} -> {}",
old.display(),
new.display()
);
match fs::rename(&old, new).await {
Ok(_) => Ok(1),
Err(e) if e.kind() == io::ErrorKind::NotFound => {
// Thumbnail not found, it probably wasn't processed yet
Ok(0)
}
Err(e) => {
Err(ThumbnailerError::FileIO(FileIOError::from((old, e))))
}
}
}
})
.collect::<Vec<_>>();
let shards_created_count = shards_to_create
.into_iter()
.map(|path| async move {
fs::create_dir_all(&path)
.await
.map_err(|e| FileIOError::from((path, e)))
})
.collect::<Vec<_>>()
.try_join()
.await?
.len();
let moved_count = to_move.try_join().await?.into_iter().sum::<u64>();
info!(
"Created {shards_created_count} shards and moved {moved_count} \
thumbnails to library folder {library_id}"
);
Ok::<_, ThumbnailerError>(())
})
})
.collect::<Vec<_>>()
.try_join()
.await?
.into_iter()
.collect::<Result<_, _>>()?;
// Now that we moved all files from all databases, everything else should be ephemeral thumbnails
// so we can just move all of them to the ephemeral directory
let ephemeral_thumbs_dir = thumbnails_directory.join(EPHEMERAL_DIR);
let mut shards_to_create = HashSet::new();
let mut to_move = vec![];
let mut read_thumbs_dir = fs::read_dir(thumbnails_directory)
.await
.map_err(|e| FileIOError::from((thumbnails_directory, e)))?;
let mut empty_shards = vec![];
while let Some(shard_entry) = read_thumbs_dir
.next_entry()
.await
.map_err(|e| FileIOError::from((thumbnails_directory, e)))?
{
let old_shard_path = shard_entry.path();
if shard_entry
.file_type()
.await
.map_err(|e| FileIOError::from((&old_shard_path, e)))?
.is_dir()
{
let mut read_shard_dir = fs::read_dir(&old_shard_path)
.await
.map_err(|e| FileIOError::from((&old_shard_path, e)))?;
while let Some(thumb_entry) = read_shard_dir
.next_entry()
.await
.map_err(|e| FileIOError::from((&old_shard_path, e)))?
{
let thumb_path = thumb_entry.path();
if thumb_path.extension() == Some(WEBP_EXTENSION.as_ref()) {
let thumb_filename = thumb_entry.file_name();
let mut new_ephemeral_shard = ephemeral_thumbs_dir.join(get_shard_hex(
thumb_filename.to_str().expect("cas_ids are utf-8"),
));
shards_to_create.insert(new_ephemeral_shard.clone());
new_ephemeral_shard.push(thumb_filename);
to_move.push(async move {
trace!(
"Moving thumbnail from old location to new location: {} -> {}",
thumb_path.display(),
new_ephemeral_shard.display()
);
fs::rename(&thumb_path, &new_ephemeral_shard)
.await
.map_err(|e| FileIOError::from((thumb_path, e)))
});
}
}
empty_shards.push(old_shard_path);
}
}
shards_to_create
.into_iter()
.map(|path| async move {
fs::create_dir_all(&path)
.await
.map_err(|e| FileIOError::from((path, e)))
})
.collect::<Vec<_>>()
.try_join()
.await?;
let moved_shard = to_move.try_join().await?.len();
info!("Moved {moved_shard} shards to the ephemeral directory");
empty_shards
.into_iter()
.filter_map(|path| {
path.file_name()
.map_or(false, |name| name.len() == 2)
.then_some(async move {
trace!("Removing empty shard directory: {}", path.display());
fs::remove_dir(&path)
.await
.map_err(|e| FileIOError::from((path, e)))
})
})
.collect::<Vec<_>>()
.try_join()
.await?;
Ok(())
}

View File

@@ -1,4 +1,5 @@
use crate::{
library::LibraryId,
util::{error::FileIOError, version_manager::VersionManagerError},
Node,
};
@@ -6,50 +7,99 @@ use crate::{
use sd_file_ext::extensions::{
DocumentExtension, Extension, ImageExtension, ALL_DOCUMENT_EXTENSIONS, ALL_IMAGE_EXTENSIONS,
};
use sd_images::{format_image, scale_dimensions};
use sd_media_metadata::image::Orientation;
#[cfg(feature = "ffmpeg")]
use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS};
use std::{
ops::Deref,
path::{Path, PathBuf},
time::Duration,
};
use image::{self, imageops, DynamicImage, GenericImageView};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{fs, task};
use tokio::task;
use tracing::error;
use webp::Encoder;
pub mod actor;
mod clean_up;
mod directory;
mod process;
mod shard;
mod state;
mod worker;
pub use directory::init_thumbnail_dir;
pub use process::{BatchToProcess, GenerateThumbnailArgs};
pub use shard::get_shard_hex;
pub const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails";
// Files names constants
const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails";
const SAVE_STATE_FILE: &str = "thumbs_to_process.bin";
const VERSION_FILE: &str = "version.txt";
pub const WEBP_EXTENSION: &str = "webp";
const EPHEMERAL_DIR: &str = "ephemeral";
/// This is the target pixel count for all thumbnails to be resized to, and it is eventually downscaled
/// to [`TARGET_QUALITY`].
const TARGET_PX: f32 = 262144_f32;
/// This is the target quality that we render thumbnails at, it is a float between 0-100
/// and is treated as a percentage (so 30% in this case, or it's the same as multiplying by `0.3`).
const TARGET_QUALITY: f32 = 30_f32;
// Some time constants
const ONE_SEC: Duration = Duration::from_secs(1);
const THIRTY_SECS: Duration = Duration::from_secs(30);
const HALF_HOUR: Duration = Duration::from_secs(30 * 60);
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ThumbnailKind {
Ephemeral,
Indexed(LibraryId),
}
pub fn get_indexed_thumbnail_path(node: &Node, cas_id: &str, library_id: LibraryId) -> PathBuf {
get_thumbnail_path(node, cas_id, ThumbnailKind::Indexed(library_id))
}
/// This does not check if a thumbnail exists, it just returns the path that it would exist at
pub fn get_thumbnail_path(node: &Node, cas_id: &str) -> PathBuf {
fn get_thumbnail_path(node: &Node, cas_id: &str, kind: ThumbnailKind) -> PathBuf {
let mut thumb_path = node.config.data_directory();
thumb_path.push(THUMBNAIL_CACHE_DIR_NAME);
match kind {
ThumbnailKind::Ephemeral => thumb_path.push(EPHEMERAL_DIR),
ThumbnailKind::Indexed(library_id) => {
thumb_path.push(library_id.to_string());
}
}
thumb_path.push(get_shard_hex(cas_id));
thumb_path.push(cas_id);
thumb_path.set_extension("webp");
thumb_path.set_extension(WEBP_EXTENSION);
thumb_path
}
pub fn get_indexed_thumb_key(cas_id: &str, library_id: LibraryId) -> Vec<String> {
get_thumb_key(cas_id, ThumbnailKind::Indexed(library_id))
}
pub fn get_ephemeral_thumb_key(cas_id: &str) -> Vec<String> {
get_thumb_key(cas_id, ThumbnailKind::Ephemeral)
}
// this is used to pass the relevant data to the frontend so it can request the thumbnail
// it supports extending the shard hex to support deeper directory structures in the future
pub fn get_thumb_key(cas_id: &str) -> Vec<String> {
vec![get_shard_hex(cas_id), cas_id.to_string()]
fn get_thumb_key(cas_id: &str, kind: ThumbnailKind) -> Vec<String> {
vec![
match kind {
ThumbnailKind::Ephemeral => String::from(EPHEMERAL_DIR),
ThumbnailKind::Indexed(library_id) => library_id.to_string(),
},
get_shard_hex(cas_id).to_string(),
cas_id.to_string(),
]
}
#[cfg(feature = "ffmpeg")]
@@ -78,7 +128,7 @@ pub(super) static THUMBNAILABLE_EXTENSIONS: Lazy<Vec<Extension>> = Lazy::new(||
.collect()
});
pub static ALL_THUMBNAILABLE_EXTENSIONS: Lazy<Vec<Extension>> = Lazy::new(|| {
pub(super) static ALL_THUMBNAILABLE_EXTENSIONS: Lazy<Vec<Extension>> = Lazy::new(|| {
#[cfg(feature = "ffmpeg")]
return THUMBNAILABLE_EXTENSIONS
.iter()
@@ -110,19 +160,11 @@ pub enum ThumbnailerError {
Task(#[from] task::JoinError),
#[cfg(feature = "ffmpeg")]
#[error(transparent)]
FFmpeg(#[from] sd_ffmpeg::ThumbnailerError),
FFmpeg(#[from] sd_ffmpeg::Error),
#[error("thumbnail generation timed out for {}", .0.display())]
TimedOut(Box<Path>),
}
/// This is the target pixel count for all thumbnails to be resized to, and it is eventually downscaled
/// to [`TARGET_QUALITY`].
const TARGET_PX: f32 = 262144_f32;
/// This is the target quality that we render thumbnails at, it is a float between 0-100
/// and is treated as a percentage (so 30% in this case, or it's the same as multiplying by `0.3`).
const TARGET_QUALITY: f32 = 30_f32;
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub enum ThumbnailerEntryKind {
Image,
@@ -136,80 +178,6 @@ pub struct ThumbnailerMetadata {
pub skipped: u32,
}
pub async fn generate_image_thumbnail(
file_path: impl AsRef<Path>,
output_path: impl AsRef<Path>,
) -> Result<(), ThumbnailerError> {
let file_path = file_path.as_ref().to_path_buf();
let webp = task::spawn_blocking(move || -> Result<_, ThumbnailerError> {
let img = format_image(&file_path).map_err(|e| ThumbnailerError::SdImages {
path: file_path.clone().into_boxed_path(),
error: e,
})?;
let (w, h) = img.dimensions();
let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TARGET_PX);
// Optionally, resize the existing photo and convert back into DynamicImage
let mut img = DynamicImage::ImageRgba8(imageops::resize(
&img,
w_scaled as u32,
h_scaled as u32,
imageops::FilterType::Triangle,
));
// this corrects the rotation/flip of the image based on the *available* exif data
// not all images have exif data, so we don't error
if let Some(orientation) = Orientation::from_path(&file_path) {
img = orientation.correct_thumbnail(img);
}
// Create the WebP encoder for the above image
let encoder =
Encoder::from_image(&img).map_err(|reason| ThumbnailerError::WebPEncoding {
path: file_path.into_boxed_path(),
reason: reason.to_string(),
})?;
// Type WebPMemory is !Send, which makes the Future in this function !Send,
// this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec<u8>
// which implies on a unwanted clone...
Ok(encoder.encode(TARGET_QUALITY).deref().to_owned())
})
.await??;
let output_path = output_path.as_ref();
if let Some(shard_dir) = output_path.parent() {
fs::create_dir_all(shard_dir)
.await
.map_err(|e| FileIOError::from((shard_dir, e)))?;
} else {
error!(
"Failed to get parent directory of '{}' for sharding parent directory",
output_path.display()
);
}
fs::write(output_path, &webp)
.await
.map_err(|e| FileIOError::from((output_path, e)))
.map_err(Into::into)
}
#[cfg(feature = "ffmpeg")]
pub async fn generate_video_thumbnail(
file_path: impl AsRef<Path>,
output_path: impl AsRef<Path>,
) -> Result<(), ThumbnailerError> {
use sd_ffmpeg::to_thumbnail;
to_thumbnail(file_path, output_path, 256, TARGET_QUALITY)
.await
.map_err(Into::into)
}
#[cfg(feature = "ffmpeg")]
pub const fn can_generate_thumbnail_for_video(video_extension: &VideoExtension) -> bool {
use VideoExtension::*;

View File

@@ -0,0 +1,420 @@
use crate::{api::CoreEvent, util::error::FileIOError};
use sd_file_ext::extensions::{DocumentExtension, ImageExtension};
use sd_images::{format_image, scale_dimensions};
use sd_media_metadata::image::Orientation;
use sd_prisma::prisma::location;
use std::{
collections::VecDeque,
ffi::OsString,
ops::Deref,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use async_channel as chan;
use futures_concurrency::future::{Join, Race};
use image::{self, imageops, DynamicImage, GenericImageView};
use serde::{Deserialize, Serialize};
use tokio::{
fs, io,
sync::{broadcast, oneshot},
task::{spawn, spawn_blocking},
time::timeout,
};
use tracing::{error, trace, warn};
use webp::Encoder;
use super::{
can_generate_thumbnail_for_document, can_generate_thumbnail_for_image, get_thumb_key,
shard::get_shard_hex, ThumbnailKind, ThumbnailerError, EPHEMERAL_DIR, TARGET_PX,
TARGET_QUALITY, THIRTY_SECS, WEBP_EXTENSION,
};
#[derive(Debug, Serialize, Deserialize)]
pub struct GenerateThumbnailArgs {
pub extension: String,
pub cas_id: String,
pub path: PathBuf,
}
impl GenerateThumbnailArgs {
pub fn new(extension: String, cas_id: String, path: PathBuf) -> Self {
Self {
extension,
cas_id,
path,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchToProcess {
pub(super) batch: Vec<GenerateThumbnailArgs>,
pub(super) should_regenerate: bool,
pub(super) in_background: bool,
pub(super) location_id: Option<location::id::Type>,
}
impl BatchToProcess {
pub fn new(
batch: Vec<GenerateThumbnailArgs>,
should_regenerate: bool,
in_background: bool,
) -> Self {
Self {
batch,
should_regenerate,
in_background,
location_id: None,
}
}
}
pub(super) struct ProcessorControlChannels {
pub stop_rx: chan::Receiver<oneshot::Sender<()>>,
pub done_tx: oneshot::Sender<()>,
pub batch_report_progress_tx: chan::Sender<(location::id::Type, u32)>,
}
pub(super) async fn batch_processor(
thumbnails_directory: Arc<PathBuf>,
(
BatchToProcess {
batch,
should_regenerate,
in_background,
location_id,
},
kind,
): (BatchToProcess, ThumbnailKind),
generated_ephemeral_thumbs_file_names_tx: chan::Sender<Vec<OsString>>,
ProcessorControlChannels {
stop_rx,
done_tx,
batch_report_progress_tx,
}: ProcessorControlChannels,
leftovers_tx: chan::Sender<(BatchToProcess, ThumbnailKind)>,
reporter: broadcast::Sender<CoreEvent>,
batch_size: usize,
) {
trace!(
"Processing thumbnails batch of kind {kind:?} with size {} in {}",
batch.len(),
if in_background {
"background"
} else {
"foreground"
},
);
// Tranforming to `VecDeque` so we don't need to move anything as we consume from the beginning
// This from is guaranteed to be O(1)
let mut queue = VecDeque::from(batch);
enum RaceOutputs {
Processed,
Stop(oneshot::Sender<()>),
}
while !queue.is_empty() {
let chunk = (0..batch_size)
.filter_map(|_| queue.pop_front())
.map(
|GenerateThumbnailArgs {
extension,
cas_id,
path,
}| {
let reporter = reporter.clone();
let thumbnails_directory = thumbnails_directory.as_ref().clone();
spawn(async move {
timeout(
THIRTY_SECS,
generate_thumbnail(
thumbnails_directory,
ThumbData {
extension: &extension,
cas_id,
path: &path,
in_background,
should_regenerate,
kind,
},
reporter,
),
)
.await
.unwrap_or_else(|_| Err(ThumbnailerError::TimedOut(path.into_boxed_path())))
})
},
)
.collect::<Vec<_>>();
let generated_ephemeral_thumbs_file_names_tx =
generated_ephemeral_thumbs_file_names_tx.clone();
let report_progress_tx = batch_report_progress_tx.clone();
let chunk_len = chunk.len() as u32;
if let RaceOutputs::Stop(stopped_tx) = (
async move {
if let Err(e) = spawn(async move {
let cas_ids = chunk
.join()
.await
.into_iter()
.filter_map(|join_result| {
join_result
.map_err(|e| {
error!("Failed to join thumbnail generation task: {e:#?}")
})
.ok()
})
.filter_map(|result| {
result
.map_err(|e| {
error!(
"Failed to generate thumbnail for {} location: {e:#?}",
if let ThumbnailKind::Ephemeral = kind {
"ephemeral"
} else {
"indexed"
}
)
})
.ok()
})
.map(|cas_id| OsString::from(format!("{}.webp", cas_id)))
.collect();
if kind == ThumbnailKind::Ephemeral
&& generated_ephemeral_thumbs_file_names_tx
.send(cas_ids)
.await
.is_err()
{
error!("Thumbnail actor is dead: Failed to send generated cas ids")
}
if let Some(location_id) = location_id {
report_progress_tx.send((location_id, chunk_len)).await.ok();
}
})
.await
{
error!("Failed to join spawned task to process thumbnails chunk on a batch: {e:#?}");
}
trace!("Processed chunk with {chunk_len} thumbnails");
RaceOutputs::Processed
},
async {
let tx = stop_rx
.recv()
.await
.expect("Critical error on thumbnails actor");
trace!("Received a stop signal");
RaceOutputs::Stop(tx)
},
)
.race()
.await
{
// Our queue is always contiguous, so this `from` is free
let leftovers = Vec::from(queue);
trace!(
"Stopped with {} thumbnails left to process",
leftovers.len()
);
if !leftovers.is_empty()
&& leftovers_tx
.send((
BatchToProcess {
batch: leftovers,
should_regenerate,
in_background: true, // Leftovers should always be in background
location_id,
},
kind,
))
.await
.is_err()
{
error!("Thumbnail actor is dead: Failed to send leftovers")
}
done_tx.send(()).ok();
stopped_tx.send(()).ok();
return;
}
}
trace!("Finished batch!");
done_tx.send(()).ok();
}
pub(super) struct ThumbData<'ext, P: AsRef<Path>> {
pub extension: &'ext str,
pub cas_id: String,
pub path: P,
pub in_background: bool,
pub should_regenerate: bool,
pub kind: ThumbnailKind,
}
pub(super) async fn generate_thumbnail(
thumbnails_directory: PathBuf,
ThumbData {
extension,
cas_id,
path,
in_background,
should_regenerate,
kind,
}: ThumbData<'_, impl AsRef<Path>>,
reporter: broadcast::Sender<CoreEvent>,
) -> Result<String, ThumbnailerError> {
let path = path.as_ref();
trace!("Generating thumbnail for {}", path.display());
let mut output_path = thumbnails_directory;
match kind {
ThumbnailKind::Ephemeral => output_path.push(EPHEMERAL_DIR),
ThumbnailKind::Indexed(library_id) => output_path.push(library_id.to_string()),
};
output_path.push(get_shard_hex(&cas_id));
output_path.push(&cas_id);
output_path.set_extension(WEBP_EXTENSION);
if let Err(e) = fs::metadata(&output_path).await {
if e.kind() != io::ErrorKind::NotFound {
error!(
"Failed to check if thumbnail exists, but we will try to generate it anyway: {e:#?}"
);
}
// Otherwise we good, thumbnail doesn't exist so we can generate it
} else if !should_regenerate {
trace!(
"Skipping thumbnail generation for {} because it already exists",
path.display()
);
return Ok(cas_id);
}
if let Ok(extension) = ImageExtension::from_str(extension) {
if can_generate_thumbnail_for_image(&extension) {
generate_image_thumbnail(&path, &output_path).await?;
}
} else if let Ok(extension) = DocumentExtension::from_str(extension) {
if can_generate_thumbnail_for_document(&extension) {
generate_image_thumbnail(&path, &output_path).await?;
}
}
#[cfg(feature = "ffmpeg")]
{
use crate::object::media::thumbnail::can_generate_thumbnail_for_video;
use sd_file_ext::extensions::VideoExtension;
if let Ok(extension) = VideoExtension::from_str(extension) {
if can_generate_thumbnail_for_video(&extension) {
generate_video_thumbnail(&path, &output_path).await?;
}
}
}
if !in_background {
trace!("Emitting new thumbnail event");
if reporter
.send(CoreEvent::NewThumbnail {
thumb_key: get_thumb_key(&cas_id, kind),
})
.is_err()
{
warn!("Error sending event to Node's event bus");
}
}
trace!("Generated thumbnail for {}", path.display());
Ok(cas_id)
}
async fn generate_image_thumbnail(
file_path: impl AsRef<Path>,
output_path: impl AsRef<Path>,
) -> Result<(), ThumbnailerError> {
let file_path = file_path.as_ref().to_path_buf();
let webp = spawn_blocking(move || -> Result<_, ThumbnailerError> {
let img = format_image(&file_path).map_err(|e| ThumbnailerError::SdImages {
path: file_path.clone().into_boxed_path(),
error: e,
})?;
let (w, h) = img.dimensions();
let (w_scaled, h_scaled) = scale_dimensions(w as f32, h as f32, TARGET_PX);
// Optionally, resize the existing photo and convert back into DynamicImage
let mut img = DynamicImage::ImageRgba8(imageops::resize(
&img,
w_scaled as u32,
h_scaled as u32,
imageops::FilterType::Triangle,
));
// this corrects the rotation/flip of the image based on the *available* exif data
// not all images have exif data, so we don't error
if let Some(orientation) = Orientation::from_path(&file_path) {
img = orientation.correct_thumbnail(img);
}
// Create the WebP encoder for the above image
let encoder =
Encoder::from_image(&img).map_err(|reason| ThumbnailerError::WebPEncoding {
path: file_path.into_boxed_path(),
reason: reason.to_string(),
})?;
// Type WebPMemory is !Send, which makes the Future in this function !Send,
// this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec<u8>
// which implies on a unwanted clone...
Ok(encoder.encode(TARGET_QUALITY).deref().to_owned())
})
.await??;
let output_path = output_path.as_ref();
if let Some(shard_dir) = output_path.parent() {
fs::create_dir_all(shard_dir)
.await
.map_err(|e| FileIOError::from((shard_dir, e)))?;
} else {
error!(
"Failed to get parent directory of '{}' for sharding parent directory",
output_path.display()
);
}
fs::write(output_path, &webp)
.await
.map_err(|e| FileIOError::from((output_path, e)))
.map_err(Into::into)
}
#[cfg(feature = "ffmpeg")]
async fn generate_video_thumbnail(
file_path: impl AsRef<Path>,
output_path: impl AsRef<Path>,
) -> Result<(), ThumbnailerError> {
use sd_ffmpeg::to_thumbnail;
to_thumbnail(file_path, output_path, 256, TARGET_QUALITY)
.await
.map_err(Into::into)
}

View File

@@ -1,8 +1,13 @@
/// The practice of dividing files into hex coded folders, often called "sharding," is mainly used to optimize file system performance. File systems can start to slow down as the number of files in a directory increases. Thus, it's often beneficial to split files into multiple directories to avoid this performance degradation.
/// The practice of dividing files into hex coded folders, often called "sharding,"
/// is mainly used to optimize file system performance. File systems can start to slow down
/// as the number of files in a directory increases. Thus, it's often beneficial to split
/// files into multiple directories to avoid this performance degradation.
/// `get_shard_hex` takes a cas_id (a hexadecimal hash) as input and returns the first two characters of the hash as the directory name. Because we're using the first two characters of a the hash, this will give us 256 (16*16) possible directories, named 00 to ff.
pub fn get_shard_hex(cas_id: &str) -> String {
// Use the first two characters of the hash as the directory name
let directory_name = &cas_id[0..2];
directory_name.to_string()
/// `get_shard_hex` takes a cas_id (a hexadecimal hash) as input and returns the first
/// three characters of the hash as the directory name. Because we're using these first
/// three characters of a the hash, this will give us 4096 (16^3) possible directories,
/// named 000 to fff.
pub fn get_shard_hex(cas_id: &str) -> &str {
// Use the first three characters of the hash as the directory name
&cas_id[0..3]
}

View File

@@ -0,0 +1,222 @@
use crate::{library::LibraryId, util::error::FileIOError};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
ffi::OsString,
path::Path,
};
use async_channel as chan;
use futures_concurrency::future::TryJoin;
use sd_prisma::prisma::location;
use serde::{Deserialize, Serialize};
use tokio::{fs, io};
use tracing::{error, info, trace};
use super::{
actor::ActorError, get_shard_hex, BatchToProcess, ThumbnailKind, EPHEMERAL_DIR, SAVE_STATE_FILE,
};
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct ThumbsProcessingSaveState {
pub(super) bookkeeper: BookKeeper,
pub(super) ephemeral_file_names: HashSet<OsString>,
// This queues doubles as LIFO and FIFO, assuming LIFO in case of users asking for a new batch
// by entering a new directory in the explorer, otherwise processing as FIFO
pub(super) queue: VecDeque<(BatchToProcess, ThumbnailKind)>,
// These below are FIFO queues, so we can process leftovers from the previous batch first
pub(super) indexed_leftovers_queue: VecDeque<(BatchToProcess, LibraryId)>,
pub(super) ephemeral_leftovers_queue: VecDeque<BatchToProcess>,
}
impl Default for ThumbsProcessingSaveState {
fn default() -> Self {
Self {
bookkeeper: BookKeeper::default(),
ephemeral_file_names: HashSet::with_capacity(128),
queue: VecDeque::with_capacity(32),
indexed_leftovers_queue: VecDeque::with_capacity(8),
ephemeral_leftovers_queue: VecDeque::with_capacity(8),
}
}
}
impl ThumbsProcessingSaveState {
pub(super) async fn load(thumbnails_directory: impl AsRef<Path>) -> Self {
let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE);
match fs::read(&resume_file).await {
Ok(bytes) => {
let this = rmp_serde::from_slice::<Self>(&bytes).unwrap_or_else(|e| {
error!("Failed to deserialize save state at thumbnailer actor: {e:#?}");
Self::default()
});
if let Err(e) = fs::remove_file(&resume_file).await {
error!(
"Failed to remove save state file at thumbnailer actor: {:#?}",
FileIOError::from((resume_file, e))
);
}
info!(
"Resuming thumbnailer actor state: Existing ephemeral thumbs: {}; \
Queued batches waiting processing: {}",
this.ephemeral_file_names.len(),
this.queue.len()
+ this.indexed_leftovers_queue.len()
+ this.ephemeral_leftovers_queue.len()
);
this
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
trace!("No save state found at thumbnailer actor");
Self::default()
}
Err(e) => {
error!(
"Failed to read save state at thumbnailer actor: {:#?}",
FileIOError::from((resume_file, e))
);
Self::default()
}
}
}
pub(super) async fn store(self, thumbnails_directory: impl AsRef<Path>) {
let resume_file = thumbnails_directory.as_ref().join(SAVE_STATE_FILE);
info!(
"Saving thumbnailer actor state: Existing ephemeral thumbs: {}; \
Queued batches waiting processing: {}",
self.ephemeral_file_names.len(),
self.queue.len()
+ self.indexed_leftovers_queue.len()
+ self.ephemeral_leftovers_queue.len()
);
let Ok(bytes) = rmp_serde::to_vec_named(&self).map_err(|e| {
error!("Failed to serialize save state at thumbnailer actor: {e:#?}");
}) else {
return;
};
if let Err(e) = fs::write(&resume_file, bytes).await {
error!(
"Failed to write save state at thumbnailer actor: {:#?}",
FileIOError::from((resume_file, e))
);
}
}
}
pub(super) async fn remove_by_cas_ids(
thumbnails_directory: &Path,
cas_ids: Vec<String>,
kind: ThumbnailKind,
) -> Result<(), ActorError> {
let base_dir = match kind {
ThumbnailKind::Ephemeral => thumbnails_directory.join(EPHEMERAL_DIR),
ThumbnailKind::Indexed(library_id) => thumbnails_directory.join(library_id.to_string()),
};
cas_ids
.into_iter()
.map(|cas_id| {
let thumbnail_path = base_dir.join(format!("{}/{cas_id}.webp", get_shard_hex(&cas_id)));
trace!("Removing thumbnail: {}", thumbnail_path.display());
async move {
match fs::remove_file(&thumbnail_path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(FileIOError::from((thumbnail_path, e))),
}
}
})
.collect::<Vec<_>>()
.try_join()
.await?;
Ok(())
}
pub(super) type RegisterReporter = (location::id::Type, chan::Sender<(u32, u32)>);
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct BookKeeper {
work_progress: HashMap<location::id::Type, (u32, u32)>, // (pending, total)
// We can't save reporter function or a channel to disk, the job must ask again to be registered
#[serde(skip, default)]
reporter_by_location: HashMap<location::id::Type, chan::Sender<(u32, u32)>>,
}
impl Default for BookKeeper {
fn default() -> Self {
Self {
work_progress: HashMap::with_capacity(8),
reporter_by_location: HashMap::with_capacity(8),
}
}
}
impl BookKeeper {
pub(super) async fn add_work(&mut self, location_id: location::id::Type, thumbs_count: u32) {
let (in_progress, total) = match self.work_progress.entry(location_id) {
Entry::Occupied(mut entry) => {
let (in_progress, total) = entry.get_mut();
*total += thumbs_count;
(*in_progress, *total)
}
Entry::Vacant(entry) => {
entry.insert((0, thumbs_count));
(0, thumbs_count)
}
};
if let Some(progress_tx) = self.reporter_by_location.get(&location_id) {
if progress_tx.send((in_progress, total)).await.is_err() {
error!(
"Failed to send progress update to reporter on location <id='{location_id}'>"
);
}
}
}
pub(super) fn register_reporter(
&mut self,
location_id: location::id::Type,
reporter_tx: chan::Sender<(u32, u32)>,
) {
self.reporter_by_location.insert(location_id, reporter_tx);
}
pub(super) async fn add_progress(&mut self, location_id: location::id::Type, progress: u32) {
if let Some((current_progress, total)) = self.work_progress.get_mut(&location_id) {
*current_progress += progress;
if *current_progress == *total {
if let Some(progress_tx) = self.reporter_by_location.remove(&location_id) {
if progress_tx.send((*current_progress, *total)).await.is_err() {
error!(
"Failed to send progress update to reporter on location <id='{location_id}'>"
);
}
}
self.work_progress.remove(&location_id);
} else if let Some(progress_tx) = self.reporter_by_location.get(&location_id) {
if progress_tx.send((*current_progress, *total)).await.is_err() {
error!(
"Failed to send progress update to reporter on location <id='{location_id}'>"
);
}
}
}
}
}

View File

@@ -0,0 +1,323 @@
use crate::api::CoreEvent;
use std::{collections::HashMap, ffi::OsString, path::PathBuf, sync::Arc};
use sd_prisma::prisma::location;
use async_channel as chan;
use futures_concurrency::stream::Merge;
use tokio::{
spawn,
sync::{broadcast, oneshot},
time::{interval, interval_at, timeout, Instant, MissedTickBehavior},
};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tracing::{debug, error, trace};
use super::{
actor::DatabaseMessage,
clean_up::{process_ephemeral_clean_up, process_indexed_clean_up},
process::{batch_processor, ProcessorControlChannels},
state::{remove_by_cas_ids, RegisterReporter, ThumbsProcessingSaveState},
BatchToProcess, ThumbnailKind, HALF_HOUR, ONE_SEC, THIRTY_SECS,
};
#[derive(Debug, Clone)]
pub(super) struct WorkerChannels {
pub(super) progress_management_rx: chan::Receiver<RegisterReporter>,
pub(super) databases_rx: chan::Receiver<DatabaseMessage>,
pub(super) cas_ids_to_delete_rx: chan::Receiver<(Vec<String>, ThumbnailKind)>,
pub(super) thumbnails_to_generate_rx: chan::Receiver<(BatchToProcess, ThumbnailKind)>,
pub(super) cancel_rx: chan::Receiver<oneshot::Sender<()>>,
}
pub(super) async fn worker(
batch_size: usize,
reporter: broadcast::Sender<CoreEvent>,
thumbnails_directory: Arc<PathBuf>,
WorkerChannels {
progress_management_rx,
databases_rx,
cas_ids_to_delete_rx,
thumbnails_to_generate_rx,
cancel_rx,
}: WorkerChannels,
) {
let mut to_remove_interval = interval_at(Instant::now() + THIRTY_SECS, HALF_HOUR);
to_remove_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut idle_interval = interval(ONE_SEC);
idle_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut databases = HashMap::new();
#[derive(Debug)]
enum StreamMessage {
RemovalTick,
ToDelete((Vec<String>, ThumbnailKind)),
Database(DatabaseMessage),
NewBatch((BatchToProcess, ThumbnailKind)),
Leftovers((BatchToProcess, ThumbnailKind)),
NewEphemeralThumbnailsFilenames(Vec<OsString>),
ProgressManagement(RegisterReporter),
BatchProgress((location::id::Type, u32)),
Shutdown(oneshot::Sender<()>),
IdleTick,
}
let ThumbsProcessingSaveState {
mut bookkeeper,
mut ephemeral_file_names,
mut queue,
mut indexed_leftovers_queue,
mut ephemeral_leftovers_queue,
} = ThumbsProcessingSaveState::load(thumbnails_directory.as_ref()).await;
let (generated_ephemeral_thumbnails_tx, ephemeral_thumbnails_cas_ids_rx) = chan::bounded(32);
let (leftovers_tx, leftovers_rx) = chan::bounded(8);
let (batch_report_progress_tx, batch_report_progress_rx) = chan::bounded(8);
let (stop_older_processing_tx, stop_older_processing_rx) = chan::bounded(1);
let mut shutdown_leftovers_rx = leftovers_rx.clone();
let mut shutdowm_batch_report_progress_rx = batch_report_progress_rx.clone();
let mut current_batch_processing_rx: Option<oneshot::Receiver<()>> = None;
let mut msg_stream = (
IntervalStream::new(to_remove_interval).map(|_| StreamMessage::RemovalTick),
cas_ids_to_delete_rx.map(StreamMessage::ToDelete),
databases_rx.map(StreamMessage::Database),
thumbnails_to_generate_rx.map(StreamMessage::NewBatch),
leftovers_rx.map(StreamMessage::Leftovers),
ephemeral_thumbnails_cas_ids_rx.map(StreamMessage::NewEphemeralThumbnailsFilenames),
progress_management_rx.map(StreamMessage::ProgressManagement),
batch_report_progress_rx.map(StreamMessage::BatchProgress),
cancel_rx.map(StreamMessage::Shutdown),
IntervalStream::new(idle_interval).map(|_| StreamMessage::IdleTick),
)
.merge();
while let Some(msg) = msg_stream.next().await {
match msg {
StreamMessage::IdleTick => {
if let Some(done_rx) = current_batch_processing_rx.as_mut() {
// Checking if the previous run finished or was aborted to clean state
match done_rx.try_recv() {
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
current_batch_processing_rx = None;
}
Err(oneshot::error::TryRecvError::Empty) => {
// The previous run is still running
continue;
}
}
}
if current_batch_processing_rx.is_none()
&& (!queue.is_empty()
|| !indexed_leftovers_queue.is_empty()
|| !ephemeral_leftovers_queue.is_empty())
{
let (done_tx, done_rx) = oneshot::channel();
current_batch_processing_rx = Some(done_rx);
let batch_and_kind = if let Some(batch_and_kind) = queue.pop_front() {
batch_and_kind
} else if let Some((batch, library_id)) = indexed_leftovers_queue.pop_front() {
// indexed leftovers have bigger priority
(batch, ThumbnailKind::Indexed(library_id))
} else if let Some(batch) = ephemeral_leftovers_queue.pop_front() {
(batch, ThumbnailKind::Ephemeral)
} else {
continue;
};
spawn(batch_processor(
thumbnails_directory.clone(),
batch_and_kind,
generated_ephemeral_thumbnails_tx.clone(),
ProcessorControlChannels {
stop_rx: stop_older_processing_rx.clone(),
done_tx,
batch_report_progress_tx: batch_report_progress_tx.clone(),
},
leftovers_tx.clone(),
reporter.clone(),
batch_size,
));
}
}
StreamMessage::RemovalTick => {
// For any of them we process a clean up if a time since the last one already passed
if !databases.is_empty() {
spawn(process_indexed_clean_up(
thumbnails_directory.clone(),
databases
.iter()
.map(|(id, db)| (*id, Arc::clone(db)))
.collect::<Vec<_>>(),
));
}
if !ephemeral_file_names.is_empty() {
spawn(process_ephemeral_clean_up(
thumbnails_directory.clone(),
ephemeral_file_names.clone(),
));
}
}
StreamMessage::ToDelete((cas_ids, kind)) => {
if !cas_ids.is_empty() {
if let Err(e) = remove_by_cas_ids(&thumbnails_directory, cas_ids, kind).await {
error!("Got an error when trying to remove thumbnails: {e:#?}");
}
}
}
StreamMessage::NewBatch((batch, kind)) => {
let in_background = batch.in_background;
if let Some(location_id) = batch.location_id {
bookkeeper
.add_work(location_id, batch.batch.len() as u32)
.await;
}
trace!(
"New {kind:?} batch to process in {}, size: {}",
if in_background {
"background"
} else {
"foreground"
},
batch.batch.len()
);
if in_background {
queue.push_back((batch, kind));
} else {
// If a processing must be in foreground, then it takes maximum priority
queue.push_front((batch, kind));
}
// Only sends stop signal if there is a batch being processed
if !in_background && current_batch_processing_rx.is_some() {
trace!("Sending stop signal to older processing");
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!(
"Thumbnail remover actor died when trying to stop older processing"
);
}
}
}
}
StreamMessage::Leftovers((batch, ThumbnailKind::Indexed(library_id))) => {
indexed_leftovers_queue.push_back((batch, library_id))
}
StreamMessage::Leftovers((batch, ThumbnailKind::Ephemeral)) => {
ephemeral_leftovers_queue.push_back(batch)
}
StreamMessage::Database(DatabaseMessage::Add(id, db))
| StreamMessage::Database(DatabaseMessage::Update(id, db)) => {
databases.insert(id, db);
}
StreamMessage::Database(DatabaseMessage::Remove(id)) => {
databases.remove(&id);
}
StreamMessage::NewEphemeralThumbnailsFilenames(new_ephemeral_thumbs) => {
trace!("New ephemeral thumbnails: {}", new_ephemeral_thumbs.len());
ephemeral_file_names.extend(new_ephemeral_thumbs);
}
StreamMessage::BatchProgress((location_id, progressed)) => {
bookkeeper.add_progress(location_id, progressed).await;
}
StreamMessage::Shutdown(cancel_tx) => {
debug!("Thumbnail actor is shutting down...");
// First stopping the current batch processing
let (tx, rx) = oneshot::channel();
match stop_older_processing_tx.try_send(tx) {
Ok(()) => {
// We put a timeout here to avoid a deadlock in case the older processing already
// finished its batch
if timeout(ONE_SEC, rx).await.is_err() {
stop_older_processing_rx.recv().await.ok();
}
}
Err(e) if e.is_full() => {
// The last signal we sent happened after a batch was already processed
// So we clean the channel and we're good to go.
stop_older_processing_rx.recv().await.ok();
}
Err(_) => {
error!("Thumbnail actor died when trying to stop older processing");
}
}
// Closing the leftovers channel to stop the batch processor as we already sent
// an stop signal
leftovers_tx.close();
while let Some((batch, kind)) = shutdown_leftovers_rx.next().await {
match kind {
ThumbnailKind::Indexed(library_id) => {
indexed_leftovers_queue.push_back((batch, library_id))
}
ThumbnailKind::Ephemeral => ephemeral_leftovers_queue.push_back(batch),
}
}
// Consuming the last progress reports to keep everything up to date
shutdowm_batch_report_progress_rx.close();
while let Some((location_id, progressed)) =
shutdowm_batch_report_progress_rx.next().await
{
bookkeeper.add_progress(location_id, progressed).await;
}
// Saving state
ThumbsProcessingSaveState {
bookkeeper,
ephemeral_file_names,
queue,
indexed_leftovers_queue,
ephemeral_leftovers_queue,
}
.store(thumbnails_directory.as_ref())
.await;
// Signaling that we're done shutting down
cancel_tx.send(()).ok();
return;
}
StreamMessage::ProgressManagement((location_id, progress_tx)) => {
bookkeeper.register_reporter(location_id, progress_tx);
}
}
}
}

View File

@@ -1,76 +1,114 @@
use std::{
any::type_name,
fmt::Display,
future::Future,
num::ParseIntError,
path::{Path, PathBuf},
str::FromStr,
};
use int_enum::IntEnum;
use std::fs;
use std::io::prelude::*;
use std::path::Path;
use std::str::FromStr;
use itertools::Itertools;
use thiserror::Error;
use tokio::{fs, io};
use tracing::info;
use super::error::FileIOError;
#[derive(Error, Debug)]
pub enum VersionManagerError {
#[error("Invalid version")]
#[error("invalid version")]
InvalidVersion,
#[error("Version file does not exist")]
#[error("version file does not exist")]
VersionFileDoesNotExist,
#[error("Error while converting integer to enum")]
#[error("error while converting integer to enum")]
IntConversionError,
#[error("Malformed version file")]
#[error("malformed version file")]
MalformedVersionFile,
#[error("unexpected migration: {current_version} -> {next_version}")]
UnexpectedMigration {
current_version: i32,
next_version: i32,
},
#[error(transparent)]
IO(#[from] std::io::Error),
FileIO(#[from] FileIOError),
#[error(transparent)]
ParseIntError(#[from] std::num::ParseIntError),
ParseIntError(#[from] ParseIntError),
}
pub trait ManagedVersion: IntEnum<Int = i32> + Display + 'static {
const LATEST_VERSION: Self;
type MigrationError: std::error::Error + Display + From<VersionManagerError> + 'static;
}
///
/// An abstract system for saving a text file containing a version number.
/// The version number is an integer that can be converted to and from an enum.
/// The enum must implement the IntEnum trait.
///
pub struct VersionManager<T: IntEnum<Int = i32>> {
version_file_path: String,
pub struct VersionManager<T: ManagedVersion> {
version_file_path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
impl<T: IntEnum<Int = i32>> VersionManager<T> {
pub fn new(version_file_path: &str) -> Self {
impl<T: ManagedVersion> VersionManager<T> {
pub fn new(version_file_path: impl AsRef<Path>) -> Self {
VersionManager {
version_file_path: version_file_path.to_string(),
version_file_path: version_file_path.as_ref().into(),
_marker: std::marker::PhantomData,
}
}
pub fn get_version(&self) -> Result<T, VersionManagerError> {
if Path::new(&self.version_file_path).exists() {
let contents = fs::read_to_string(&self.version_file_path)?;
let version = i32::from_str(contents.trim())?;
T::from_int(version).map_err(|_| VersionManagerError::IntConversionError)
} else {
Err(VersionManagerError::VersionFileDoesNotExist)
pub async fn get_version(&self) -> Result<T, VersionManagerError> {
match fs::read_to_string(&self.version_file_path).await {
Ok(contents) => {
let version = i32::from_str(contents.trim())?;
T::from_int(version).map_err(|_| VersionManagerError::IntConversionError)
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
Err(VersionManagerError::VersionFileDoesNotExist)
}
Err(e) => Err(FileIOError::from((&self.version_file_path, e)).into()),
}
}
pub fn set_version(&self, version: T) -> Result<(), VersionManagerError> {
let mut file = fs::File::create(&self.version_file_path)?;
file.write_all(version.int_value().to_string().as_bytes())?;
Ok(())
pub async fn set_version(&self, version: T) -> Result<(), VersionManagerError> {
fs::write(
&self.version_file_path,
version.int_value().to_string().as_bytes(),
)
.await
.map_err(|e| FileIOError::from((&self.version_file_path, e)).into())
}
// pub async fn migrate<F: FnMut(T) -> Result<(), VersionManagerError>>(
// &self,
// current: T,
// latest: T,
// mut migrate_fn: F,
// ) -> Result<(), VersionManagerError> {
// for version_int in (current.int_value() + 1)..=latest.int_value() {
// let version = match T::from_int(version_int) {
// Ok(version) => version,
// Err(_) => return Err(VersionManagerError::IntConversionError),
// };
// migrate_fn(version)?;
// }
pub async fn migrate<Fut>(
&self,
current: T,
migrate_fn: impl Fn(T, T) -> Fut,
) -> Result<(), T::MigrationError>
where
Fut: Future<Output = Result<(), T::MigrationError>>,
{
for (current_version, next_version) in
(current.int_value()..=T::LATEST_VERSION.int_value()).tuple_windows()
{
match (T::from_int(current_version), T::from_int(next_version)) {
(Ok(current), Ok(next)) => {
info!(
"Running {} migrator: {} -> {}",
type_name::<T>(),
current,
next
);
migrate_fn(current, next).await?
}
(Err(_), _) | (_, Err(_)) => {
return Err(VersionManagerError::IntConversionError.into())
}
};
}
// self.set_version(latest)?;
// Ok(())
// }
self.set_version(T::LATEST_VERSION)
.await
.map_err(Into::into)
}
}

View File

@@ -15,7 +15,7 @@ use ffmpeg_sys_next::{
/// Error type for the library.
#[derive(Error, Debug)]
pub enum ThumbnailerError {
pub enum Error {
#[error("I/O Error: {0}")]
Io(#[from] std::io::Error),
#[error("Path conversion error: Path: {0:#?}")]

View File

@@ -13,7 +13,7 @@ mod thumbnailer;
mod utils;
mod video_frame;
pub use error::ThumbnailerError;
pub use error::Error;
pub use thumbnailer::{Thumbnailer, ThumbnailerBuilder};
/// Helper function to generate a thumbnail file from a video file with reasonable defaults
@@ -22,7 +22,7 @@ pub async fn to_thumbnail(
output_thumbnail_path: impl AsRef<Path>,
size: u32,
quality: f32,
) -> Result<(), ThumbnailerError> {
) -> Result<(), Error> {
ThumbnailerBuilder::new()
.with_film_strip(false)
.size(size)
@@ -37,7 +37,7 @@ pub async fn to_webp_bytes(
video_file_path: impl AsRef<Path>,
size: u32,
quality: f32,
) -> Result<Vec<u8>, ThumbnailerError> {
) -> Result<Vec<u8>, Error> {
ThumbnailerBuilder::new()
.size(size)
.quality(quality)?

View File

@@ -1,5 +1,5 @@
use crate::{
error::{FfmpegError, ThumbnailerError},
error::{Error, FfmpegError},
utils::from_path,
video_frame::{FfmpegFrame, FrameSource, VideoFrame},
};
@@ -48,7 +48,7 @@ impl MovieDecoder {
pub(crate) fn new(
filename: impl AsRef<Path>,
prefer_embedded_metadata: bool,
) -> Result<Self, ThumbnailerError> {
) -> Result<Self, Error> {
let filename = filename.as_ref();
let input_file = if filename == Path::new("-") {
@@ -90,7 +90,7 @@ impl MovieDecoder {
)?;
}
e => {
return Err(ThumbnailerError::FfmpegWithReason(
return Err(Error::FfmpegWithReason(
FfmpegError::from(e),
"Failed to open input".to_string(),
))
@@ -102,7 +102,7 @@ impl MovieDecoder {
// This needs to remain at 100 or the app will force crash if it comes
// across a video with subtitles or any type of corruption.
if (*decoder.format_context).probe_score != AVPROBE_SCORE_MAX {
return Err(ThumbnailerError::CorruptVideo);
return Err(Error::CorruptVideo);
}
}
@@ -116,7 +116,7 @@ impl MovieDecoder {
Ok(decoder)
}
pub(crate) fn decode_video_frame(&mut self) -> Result<(), ThumbnailerError> {
pub(crate) fn decode_video_frame(&mut self) -> Result<(), Error> {
let mut frame_finished = false;
while !frame_finished && self.get_video_packet() {
@@ -124,7 +124,7 @@ impl MovieDecoder {
}
if !frame_finished {
return Err(ThumbnailerError::FrameDecodeError);
return Err(Error::FrameDecodeError);
}
Ok(())
@@ -134,7 +134,7 @@ impl MovieDecoder {
self.use_embedded_data
}
pub(crate) fn seek(&mut self, seconds: i64) -> Result<(), ThumbnailerError> {
pub(crate) fn seek(&mut self, seconds: i64) -> Result<(), Error> {
if !self.allow_seek {
return Ok(());
}
@@ -170,7 +170,7 @@ impl MovieDecoder {
}
if !got_frame {
return Err(ThumbnailerError::SeekError);
return Err(Error::SeekError);
}
Ok(())
@@ -181,7 +181,7 @@ impl MovieDecoder {
scaled_size: Option<ThumbnailSize>,
maintain_aspect_ratio: bool,
video_frame: &mut VideoFrame,
) -> Result<(), ThumbnailerError> {
) -> Result<(), Error> {
self.initialize_filter_graph(
unsafe {
&(*(*(*self.format_context)
@@ -211,7 +211,7 @@ impl MovieDecoder {
attempts += 1;
}
if ret < 0 {
return Err(ThumbnailerError::FfmpegWithReason(
return Err(Error::FfmpegWithReason(
FfmpegError::from(ret),
"Failed to get buffer from filter".to_string(),
));
@@ -266,7 +266,7 @@ impl MovieDecoder {
Duration::from_secs(unsafe { (*self.format_context).duration as u64 / AV_TIME_BASE as u64 })
}
fn initialize_video(&mut self, prefer_embedded_metadata: bool) -> Result<(), ThumbnailerError> {
fn initialize_video(&mut self, prefer_embedded_metadata: bool) -> Result<(), Error> {
self.find_preferred_video_stream(prefer_embedded_metadata)?;
self.video_stream = unsafe {
@@ -309,10 +309,7 @@ impl MovieDecoder {
)
}
fn find_preferred_video_stream(
&mut self,
prefer_embedded_metadata: bool,
) -> Result<(), ThumbnailerError> {
fn find_preferred_video_stream(&mut self, prefer_embedded_metadata: bool) -> Result<(), Error> {
let mut video_streams = vec![];
let mut embedded_data_streams = vec![];
let empty_cstring = CString::new("").unwrap();
@@ -404,7 +401,7 @@ impl MovieDecoder {
frame_decoded
}
fn decode_video_packet(&self) -> Result<bool, ThumbnailerError> {
fn decode_video_packet(&self) -> Result<bool, Error> {
if unsafe { (*self.packet).stream_index } != self.video_stream_index {
return Ok(false);
}
@@ -414,7 +411,7 @@ impl MovieDecoder {
if ret == AVERROR_EOF {
return Ok(false);
} else if ret < 0 {
return Err(ThumbnailerError::FfmpegWithReason(
return Err(Error::FfmpegWithReason(
FfmpegError::from(ret),
"Failed to send packet to decoder".to_string(),
));
@@ -423,7 +420,7 @@ impl MovieDecoder {
match unsafe { avcodec_receive_frame(self.video_codec_context, self.frame) } {
0 => Ok(true),
e if e != AVERROR(EAGAIN) => Err(ThumbnailerError::FfmpegWithReason(
e if e != AVERROR(EAGAIN) => Err(Error::FfmpegWithReason(
FfmpegError::from(e),
"Failed to receive frame from decoder".to_string(),
)),
@@ -437,7 +434,7 @@ impl MovieDecoder {
timebase: &AVRational,
scaled_size: Option<ThumbnailSize>,
maintain_aspect_ratio: bool,
) -> Result<(), ThumbnailerError> {
) -> Result<(), Error> {
unsafe { self.filter_graph = avfilter_graph_alloc() };
if self.filter_graph.is_null() {
return Err(FfmpegError::FilterGraphAllocation.into());
@@ -689,9 +686,9 @@ impl Drop for MovieDecoder {
}
}
fn check_error(return_code: i32, error_message: &str) -> Result<(), ThumbnailerError> {
fn check_error(return_code: i32, error_message: &str) -> Result<(), Error> {
if return_code < 0 {
Err(ThumbnailerError::FfmpegWithReason(
Err(Error::FfmpegWithReason(
FfmpegError::from(return_code),
error_message.to_string(),
))
@@ -707,7 +704,7 @@ fn setup_filter(
args: &str,
graph_ctx: *mut AVFilterGraph,
error_message: &str,
) -> Result<(), ThumbnailerError> {
) -> Result<(), Error> {
let filter_name_cstr = CString::new(filter_name).expect("CString from str");
let filter_setup_name_cstr = CString::new(filter_setup_name).expect("CString from str");
let args_cstr = CString::new(args).expect("CString from str");
@@ -733,7 +730,7 @@ fn setup_filter_without_args(
filter_setup_name: &str,
graph_ctx: *mut AVFilterGraph,
error_message: &str,
) -> Result<(), ThumbnailerError> {
) -> Result<(), Error> {
let filter_name_cstr = CString::new(filter_name).unwrap();
let filter_setup_name_cstr = CString::new(filter_setup_name).unwrap();

View File

@@ -1,4 +1,4 @@
use crate::{film_strip_filter, MovieDecoder, ThumbnailSize, ThumbnailerError, VideoFrame};
use crate::{film_strip_filter, Error, MovieDecoder, ThumbnailSize, VideoFrame};
use std::{io, ops::Deref, path::Path};
use tokio::{fs, task::spawn_blocking};
@@ -18,7 +18,7 @@ impl Thumbnailer {
&self,
video_file_path: impl AsRef<Path>,
output_thumbnail_path: impl AsRef<Path>,
) -> Result<(), ThumbnailerError> {
) -> Result<(), Error> {
let path = output_thumbnail_path.as_ref().parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
@@ -40,7 +40,7 @@ impl Thumbnailer {
pub async fn process_to_webp_bytes(
&self,
video_file_path: impl AsRef<Path>,
) -> Result<Vec<u8>, ThumbnailerError> {
) -> Result<Vec<u8>, Error> {
let video_file_path = video_file_path.as_ref().to_path_buf();
let prefer_embedded_metadata = self.builder.prefer_embedded_metadata;
let seek_percentage = self.builder.seek_percentage;
@@ -49,7 +49,7 @@ impl Thumbnailer {
let with_film_strip = self.builder.with_film_strip;
let quality = self.builder.quality;
spawn_blocking(move || -> Result<Vec<u8>, ThumbnailerError> {
spawn_blocking(move || -> Result<Vec<u8>, Error> {
let mut decoder = MovieDecoder::new(video_file_path.clone(), prefer_embedded_metadata)?;
// We actually have to decode a frame to get some metadata before we can start decoding for real
decoder.decode_video_frame()?;
@@ -149,18 +149,18 @@ impl ThumbnailerBuilder {
}
/// Seek percentage must be a value between 0.0 and 1.0
pub fn seek_percentage(mut self, seek_percentage: f32) -> Result<Self, ThumbnailerError> {
pub fn seek_percentage(mut self, seek_percentage: f32) -> Result<Self, Error> {
if !(0.0..=1.0).contains(&seek_percentage) {
return Err(ThumbnailerError::InvalidSeekPercentage(seek_percentage));
return Err(Error::InvalidSeekPercentage(seek_percentage));
}
self.seek_percentage = seek_percentage;
Ok(self)
}
/// Quality must be a value between 0.0 and 100.0
pub fn quality(mut self, quality: f32) -> Result<Self, ThumbnailerError> {
pub fn quality(mut self, quality: f32) -> Result<Self, Error> {
if !(0.0..=100.0).contains(&quality) {
return Err(ThumbnailerError::InvalidQuality(quality));
return Err(Error::InvalidQuality(quality));
}
self.quality = quality;
Ok(self)

View File

@@ -1,22 +1,21 @@
use crate::error::ThumbnailerError;
use crate::error::Error;
use std::ffi::CString;
use std::path::Path;
pub fn from_path(path: impl AsRef<Path>) -> Result<CString, ThumbnailerError> {
pub fn from_path(path: impl AsRef<Path>) -> Result<CString, Error> {
let path = path.as_ref();
let path_str = path.as_os_str();
#[cfg(unix)]
{
use std::os::unix::ffi::OsStrExt;
CString::new(path_str.as_bytes())
.map_err(|_| ThumbnailerError::PathConversion(path.to_path_buf()))
CString::new(path_str.as_bytes()).map_err(|_| Error::PathConversion(path.to_path_buf()))
}
#[cfg(not(unix))]
{
path_str
.to_str()
.and_then(|str| CString::new(str.as_bytes()).ok())
.ok_or(ThumbnailerError::PathConversion(path.to_path_buf()))
.ok_or(Error::PathConversion(path.to_path_buf()))
}
}

View File

@@ -241,9 +241,9 @@ export type InvalidateOperationEvent = { type: "single"; data: SingleInvalidateO
export type JobGroup = { id: string; action: string | null; status: JobStatus; created_at: string; jobs: JobReport[] }
export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; message: string; estimated_completion: string }
export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; phase: string; message: string; estimated_completion: string }
export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; message: string; estimated_completion: string }
export type JobReport = { id: string; name: string; action: string | null; data: number[] | null; metadata: any | null; is_background: boolean; errors_text: string[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: JobStatus; task_count: number; completed_task_count: number; phase: string; message: string; estimated_completion: string }
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused" | "CompletedWithErrors"

View File

@@ -23,6 +23,7 @@ export function useJobInfo(job: JobReport, realtimeUpdate: JobProgressEvent | nu
indexedPath = job.metadata?.data?.location.path,
taskCount = realtimeUpdate?.task_count || job.task_count,
completedTaskCount = realtimeUpdate?.completed_task_count || job.completed_task_count,
phase = realtimeUpdate?.phase,
meta = job.metadata,
output = meta?.output?.run_metadata;
@@ -59,35 +60,75 @@ export function useJobInfo(job: JobReport, realtimeUpdate: JobProgressEvent | nu
]
]
};
case 'media_processor':
case 'media_processor': {
const generateTexts = () => {
switch (phase) {
case 'media_data': {
return [
{
text: `${
completedTaskCount
? formatNumber(completedTaskCount || 0)
: formatNumber(output?.media_data?.extracted)
} of ${formatNumber(taskCount)} ${plural(
taskCount,
'media file'
)} processed`
}
];
}
case 'thumbnails': {
return [
{
text: `${
completedTaskCount
? formatNumber(completedTaskCount || 0)
: formatNumber(output?.thumbs_processed)
} of ${formatNumber(taskCount)} ${plural(
taskCount,
'thumbnail'
)} generated`
}
];
}
default: {
// If we don't have a phase set, then we're done
const totalThumbs = output?.thumbs_processed || 0;
const totalMediaFiles =
output?.media_data?.extracted || 0 + output?.media_data?.skipped || 0;
return totalThumbs === 0 && totalMediaFiles === 0
? [{ text: 'None processed' }]
: [
{
text: `Extracted ${formatNumber(totalMediaFiles)} ${plural(
totalMediaFiles,
'media file'
)}`
},
{
text: `Generated ${formatNumber(totalThumbs)} ${plural(
totalThumbs,
'thumb'
)}`
}
];
}
}
};
return {
...data,
name: `${
isQueued ? 'Process' : isRunning ? 'Processing' : 'Processed'
} media files`,
textItems: [
[
{
text:
output?.thumbnails_created === 0
? 'None processed'
: `${
completedTaskCount
? formatNumber(completedTaskCount || 0)
: formatNumber(output?.thumbnails_created)
} of ${formatNumber(taskCount)} ${plural(
taskCount,
'media file'
)} processed`
},
{
text:
output?.thumbnails_skipped &&
`${output?.thumbnails_skipped} already exist`
}
]
]
textItems: [generateTexts()]
};
}
case 'file_identifier':
return {
...data,