From 874faa4e55af90672b93507bfc4bbd2122c968d6 Mon Sep 17 00:00:00 2001 From: jake <77554505+brxken128@users.noreply.github.com> Date: Tue, 25 Jul 2023 15:21:50 +0100 Subject: [PATCH] [ENG-926] Prevent thumbnail destruction and fix the remover (#1127) * fix(core): thumbnail removal * chore(core): add todo * New actor on steroids * Improving thumbnail remover actor * Ignoring errors from files that doesn't exist --------- Co-authored-by: Ericson Soares --- Cargo.lock | Bin 245126 -> 246748 bytes core/Cargo.toml | 3 + core/src/library/library.rs | 15 +- core/src/library/manager.rs | 23 +- core/src/location/file_path_helper/mod.rs | 17 +- core/src/location/indexer/indexer_job.rs | 12 +- core/src/location/indexer/mod.rs | 47 +++- core/src/location/indexer/shallow.rs | 11 +- core/src/location/indexer/walk.rs | 24 +- core/src/location/mod.rs | 2 +- core/src/object/fs/delete.rs | 1 - core/src/object/preview/thumbnail/mod.rs | 24 +- core/src/object/thumbnail_remover.rs | 314 +++++++++++++++++----- 13 files changed, 364 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46e83631d7c6c8bc1523b1e82e97c175c61b346a..f53f24e31da118b725e66b93095ef3a454b90b9c 100644 GIT binary patch delta 762 zcmX|<&r4KM6vuhLEWL5wNbrZ!y-THV9t-t+yQ&-Xn4bK&dO#l7XMJRDAz+h6X_ zOHiCzP86jghuk|Sob;5-Af<4MOCSPKaVx-pV#o~w1!h9fOt^A=ce*^@9G{pQZDhXd zi}tlg&5gD%e5-=dWU~C#o$C8+H#^g8cfBo#bu(`)k(5}%2vsHr29674#wb7*loL`* z!wn4tIi&!Kpamp0$_4bQN|us(BXo*(FFO%!Y1izmoR)39c6x619+&UcZ2$MIy*Fum zSDx?aMK2uJlt|#((?BWBNt`Y?Yl!uLQjj36;8aj&gNrys&SY*hgt1ho3Gv(*wk!Qn$!SEotyHx9}RO|gjFFH$U z2*fdk)Nznb66!?k7P&^Ol7i<1EOMuLOwUSP#1-N zl+9J&w`)(U!zanem|gup?7+XM@mjStlT25auB79G{>$1Yo2(NN#}0 zNRTlMjBthC3FVZEmr|JMP-<`ikfPAJLMN&CXQT78_If{!!ByXSsb0u=i2Q@iP23VC zXvB#!I4wER;Fb20I3E$CA?UPc21CpbKn6skd))p?T#U1 diff --git a/core/Cargo.toml b/core/Cargo.toml index b6a40d3e6..f737ca3d2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -97,6 +97,9 @@ regex = "1.8.4" hex = "0.4.3" int-enum = "0.5.0" tokio-stream = "0.1.14" +futures-concurrency = "7.3" +async-channel = "1.9" +tokio-util = "0.7" [target.'cfg(target_os = "macos")'.dependencies] plist = "1" diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 63aa7b444..ed61c917d 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -9,9 +9,8 @@ use crate::{ }, node::NodeConfigManager, object::{ - orphan_remover::OrphanRemoverActor, - preview::{get_thumbnail_path, THUMBNAIL_CACHE_DIR_NAME}, - thumbnail_remover::ThumbnailRemoverActor, + orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path, + thumbnail_remover::ThumbnailRemoverActorProxy, }, prisma::{file_path, location, PrismaClient}, util::{db::maybe_missing, error::FileIOError}, @@ -51,7 +50,7 @@ pub struct Library { /// p2p identity pub identity: Arc, pub orphan_remover: OrphanRemoverActor, - pub thumbnail_remover: ThumbnailRemoverActor, + pub thumbnail_remover_proxy: ThumbnailRemoverActorProxy, } impl Debug for Library { @@ -81,13 +80,7 @@ impl Library { let library = Self { orphan_remover: OrphanRemoverActor::spawn(db.clone()), - thumbnail_remover: ThumbnailRemoverActor::spawn( - db.clone(), - node_context - .config - .data_directory() - .join(THUMBNAIL_CACHE_DIR_NAME), - ), + thumbnail_remover_proxy: library_manager.thumbnail_remover_proxy(), id, db, config, diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index 8e906eea5..baac310d5 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -2,7 +2,11 @@ use crate::{ invalidate_query, location::{indexer, LocationManagerError}, node::{NodeConfig, Platform}, - object::tag, + object::{ + preview::get_thumbnails_directory, + tag, + thumbnail_remover::{ThumbnailRemoverActor, ThumbnailRemoverActorProxy}, + }, prisma::location, util::{ db::{self, MissingFieldError}, @@ -37,6 +41,8 @@ pub struct LibraryManager { libraries: RwLock>>, /// node_context holds the context for the node which this library manager is running on. pub node_context: Arc, + /// An actor that removes stale thumbnails from the file system + thumbnail_remover: ThumbnailRemoverActor, } #[derive(Error, Debug)] @@ -101,6 +107,7 @@ impl LibraryManager { let this = Arc::new(Self { libraries_dir: libraries_dir.clone(), libraries: Default::default(), + thumbnail_remover: ThumbnailRemoverActor::new(get_thumbnails_directory(&node_context)), node_context, }); @@ -124,7 +131,11 @@ impl LibraryManager { .file_stem() .and_then(|v| v.to_str().map(Uuid::from_str)) else { - warn!("Attempted to load library from path '{}' but it has an invalid filename. Skipping...", config_path.display()); + warn!( + "Attempted to load library from path '{}' \ + but it has an invalid filename. Skipping...", + config_path.display() + ); continue; }; @@ -149,6 +160,10 @@ impl LibraryManager { Ok(this) } + pub fn thumbnail_remover_proxy(&self) -> ThumbnailRemoverActorProxy { + self.thumbnail_remover.proxy() + } + /// create creates a new library with the given config and mounts it into the running [LibraryManager]. pub(crate) async fn create( self: &Arc, @@ -325,6 +340,7 @@ impl LibraryManager { invalidate_query!(library, "library.list"); + self.thumbnail_remover.remove_library(id).await; self.libraries.write().await.retain(|l| l.id != id); Ok(()) @@ -417,7 +433,8 @@ impl LibraryManager { self.clone(), )); - self.libraries.write().await.push(library.clone()); + self.thumbnail_remover.new_library(&library).await; + self.libraries.write().await.push(Arc::clone(&library)); if should_seed { library.orphan_remover.invoke().await; diff --git a/core/src/location/file_path_helper/mod.rs b/core/src/location/file_path_helper/mod.rs index 66c9f5ab4..8e5775e9b 100644 --- a/core/src/location/file_path_helper/mod.rs +++ b/core/src/location/file_path_helper/mod.rs @@ -5,7 +5,6 @@ use crate::{ use std::{ fs::Metadata, - hash::{Hash, Hasher}, path::{Path, PathBuf, MAIN_SEPARATOR_STR}, time::SystemTime, }; @@ -24,7 +23,7 @@ pub use isolated_file_path_data::{ }; // File Path selectables! -file_path::select!(file_path_just_pub_id { pub_id }); +file_path::select!(file_path_pub_and_cas_ids { pub_id cas_id }); file_path::select!(file_path_just_pub_id_materialized_path { pub_id materialized_path @@ -104,20 +103,6 @@ file_path::select!(file_path_to_full_path { // File Path includes! file_path::include!(file_path_with_object { object }); -impl Hash for file_path_just_pub_id::Data { - fn hash(&self, state: &mut H) { - self.pub_id.hash(state); - } -} - -impl PartialEq for file_path_just_pub_id::Data { - fn eq(&self, other: &Self) -> bool { - self.pub_id == other.pub_id - } -} - -impl Eq for file_path_just_pub_id::Data {} - #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct FilePathMetadata { pub inode: u64, diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index f13be2213..8c094a570 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -196,6 +196,17 @@ impl StatefulJob for IndexerJobInit { ) .await?; let scan_read_time = scan_start.elapsed(); + let to_remove = to_remove.collect::>(); + + ctx.library + .thumbnail_remover_proxy + .remove_cas_ids( + to_remove + .iter() + .filter_map(|file_path| file_path.cas_id.clone()) + .collect::>(), + ) + .await; let db_delete_start = Instant::now(); // TODO pass these uuids to sync system @@ -451,7 +462,6 @@ impl StatefulJob for IndexerJobInit { if run_metadata.total_updated_paths > 0 { // Invoking orphan remover here as we probably have some orphans objects due to updates ctx.library.orphan_remover.invoke().await; - ctx.library.thumbnail_remover.invoke().await; } Ok(Some(json!({"init: ": init, "run_metadata": run_metadata}))) diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 7a0497e7c..2289f1d98 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -19,7 +19,7 @@ use thiserror::Error; use tracing::trace; use super::{ - file_path_helper::{file_path_just_pub_id, FilePathError, IsolatedFilePathData}, + file_path_helper::{file_path_pub_and_cas_ids, FilePathError, IsolatedFilePathData}, location_with_indexer_rules, }; @@ -280,7 +280,7 @@ fn iso_file_path_factory( } async fn remove_non_existing_file_paths( - to_remove: impl IntoIterator, + to_remove: impl IntoIterator, db: &PrismaClient, ) -> Result { db.file_path() @@ -333,6 +333,25 @@ macro_rules! file_paths_db_fetcher_fn { macro_rules! to_remove_db_fetcher_fn { ($location_id:expr, $db:expr) => {{ |iso_file_path, unique_location_id_materialized_path_name_extension_params| async { + struct PubAndCasId { + pub_id: ::uuid::Uuid, + maybe_cas_id: Option, + } + + impl ::std::hash::Hash for PubAndCasId { + fn hash(&self, state: &mut H) { + self.pub_id.hash(state); + } + } + + impl ::std::cmp::PartialEq for PubAndCasId { + fn eq(&self, other: &Self) -> bool { + self.pub_id == other.pub_id + } + } + + impl ::std::cmp::Eq for PubAndCasId {} + let iso_file_path: $crate::location::file_path_helper::IsolatedFilePathData<'static> = iso_file_path; @@ -354,7 +373,9 @@ macro_rules! to_remove_db_fetcher_fn { ::prisma_client_rust::operator::or(unique_params.collect()), ]), ]) - .select($crate::location::file_path_helper::file_path_just_pub_id::select()) + .select( + $crate::location::file_path_helper::file_path_pub_and_cas_ids::select(), + ) }) .collect::<::std::vec::Vec<_>>(); @@ -367,9 +388,10 @@ macro_rules! to_remove_db_fetcher_fn { .map(|fetched_vec| { fetched_vec .into_iter() - .map(|fetched| { - ::uuid::Uuid::from_slice(&fetched.pub_id) - .expect("file_path.pub_id is invalid!") + .map(|fetched| PubAndCasId { + pub_id: ::uuid::Uuid::from_slice(&fetched.pub_id) + .expect("file_path.pub_id is invalid!"), + maybe_cas_id: fetched.cas_id, }) .collect::<::std::collections::HashSet<_>>() }) @@ -377,19 +399,20 @@ macro_rules! to_remove_db_fetcher_fn { let mut intersection = ::std::collections::HashSet::new(); while let Some(set) = sets.pop() { - for pub_id in set { + for pub_and_cas_ids in set { // Remove returns true if the element was present in the set - if sets.iter_mut().all(|set| set.remove(&pub_id)) { - intersection.insert(pub_id); + if sets.iter_mut().all(|set| set.remove(&pub_and_cas_ids)) { + intersection.insert(pub_and_cas_ids); } } } intersection .into_iter() - .map(|pub_id| { - $crate::location::file_path_helper::file_path_just_pub_id::Data { - pub_id: pub_id.as_bytes().to_vec(), + .map(|pub_and_cas_ids| { + $crate::location::file_path_helper::file_path_pub_and_cas_ids::Data { + pub_id: pub_and_cas_ids.pub_id.as_bytes().to_vec(), + cas_id: pub_and_cas_ids.maybe_cas_id, } }) .collect() diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index bfbe0f94a..003766d41 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -80,6 +80,16 @@ pub async fn shallow( .await? }; + library + .thumbnail_remover_proxy + .remove_cas_ids( + to_remove + .iter() + .filter_map(|file_path| file_path.cas_id.clone()) + .collect::>(), + ) + .await; + errors.into_iter().for_each(|e| error!("{e}")); // TODO pass these uuids to sync system @@ -116,7 +126,6 @@ pub async fn shallow( invalidate_query!(library, "search.paths"); library.orphan_remover.invoke().await; - library.thumbnail_remover.invoke().await; Ok(()) } diff --git a/core/src/location/indexer/walk.rs b/core/src/location/indexer/walk.rs index 85b370055..3f44588b1 100644 --- a/core/src/location/indexer/walk.rs +++ b/core/src/location/indexer/walk.rs @@ -1,6 +1,6 @@ use crate::{ location::file_path_helper::{ - file_path_just_pub_id, file_path_walker, FilePathMetadata, IsolatedFilePathData, + file_path_pub_and_cas_ids, file_path_walker, FilePathMetadata, IsolatedFilePathData, MetadataExt, }, prisma::file_path, @@ -109,7 +109,7 @@ pub struct WalkResult where Walked: Iterator, ToUpdate: Iterator, - ToRemove: Iterator, + ToRemove: Iterator, { pub walked: Walked, pub to_update: ToUpdate, @@ -136,13 +136,14 @@ pub(super) async fn walk( WalkResult< impl Iterator, impl Iterator, - impl Iterator, + impl Iterator, >, IndexerError, > where FilePathDBFetcherFut: Future, IndexerError>>, - ToRemoveDbFetcherFut: Future, IndexerError>>, + ToRemoveDbFetcherFut: + Future, IndexerError>>, { let root = root.as_ref(); @@ -204,13 +205,14 @@ pub(super) async fn keep_walking( WalkResult< impl Iterator, impl Iterator, - impl Iterator, + impl Iterator, >, IndexerError, > where FilePathDBFetcherFut: Future, IndexerError>>, - ToRemoveDbFetcherFut: Future, IndexerError>>, + ToRemoveDbFetcherFut: + Future, IndexerError>>, { let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY); let mut indexed_paths = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY); @@ -259,14 +261,15 @@ pub(super) async fn walk_single_dir( ( impl Iterator, impl Iterator, - Vec, + Vec, Vec, ), IndexerError, > where FilePathDBFetcherFut: Future, IndexerError>>, - ToRemoveDbFetcherFut: Future, IndexerError>>, + ToRemoveDbFetcherFut: + Future, IndexerError>>, { let root = root.as_ref(); @@ -428,9 +431,10 @@ async fn inner_walk_single_dir( mut maybe_to_walk, errors, }: WorkingTable<'_>, -) -> Vec +) -> Vec where - ToRemoveDbFetcherFut: Future, IndexerError>>, + ToRemoveDbFetcherFut: + Future, IndexerError>>, { let Ok(iso_file_path_to_walk) = iso_file_path_factory(path, true).map_err(|e| errors.push(e)) else { diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index ef309bb89..965c4e891 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -713,7 +713,7 @@ pub async fn delete_directory( db.file_path().delete_many(children_params).exec().await?; library.orphan_remover.invoke().await; - library.thumbnail_remover.invoke().await; + invalidate_query!(library, "search.paths"); Ok(()) diff --git a/core/src/object/fs/delete.rs b/core/src/object/fs/delete.rs index e35767ea1..2aa0c338c 100644 --- a/core/src/object/fs/delete.rs +++ b/core/src/object/fs/delete.rs @@ -99,7 +99,6 @@ impl StatefulJob for FileDeleterJobInit { invalidate_query!(ctx.library, "search.paths"); ctx.library.orphan_remover.invoke().await; - ctx.library.thumbnail_remover.invoke().await; Ok(Some(json!({ "init": init }))) } diff --git a/core/src/object/preview/thumbnail/mod.rs b/core/src/object/preview/thumbnail/mod.rs index 10e676cc5..3fcb0ba99 100644 --- a/core/src/object/preview/thumbnail/mod.rs +++ b/core/src/object/preview/thumbnail/mod.rs @@ -5,6 +5,7 @@ use crate::{ location::file_path_helper::{file_path_for_thumbnailer, FilePathError, IsolatedFilePathData}, prisma::location, util::{db::maybe_missing, error::FileIOError, version_manager::VersionManagerError}, + NodeContext, }; use std::{ @@ -41,13 +42,22 @@ pub const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails"; /// This does not check if a thumbnail exists, it just returns the path that it would exist at pub fn get_thumbnail_path(library: &Library, cas_id: &str) -> PathBuf { - library - .config() - .data_directory() - .join(THUMBNAIL_CACHE_DIR_NAME) - .join(get_shard_hex(cas_id)) - .join(cas_id) - .with_extension("webp") + let mut thumb_path = library.config().data_directory(); + + thumb_path.push(THUMBNAIL_CACHE_DIR_NAME); + thumb_path.push(get_shard_hex(cas_id)); + thumb_path.push(cas_id); + thumb_path.set_extension("webp"); + + thumb_path +} + +pub fn get_thumbnails_directory(node_ctx: &NodeContext) -> PathBuf { + let mut thumb_path = node_ctx.config.data_directory(); + + thumb_path.push(THUMBNAIL_CACHE_DIR_NAME); + + thumb_path } // this is used to pass the relevant data to the frontend so it can request the thumbnail diff --git a/core/src/object/thumbnail_remover.rs b/core/src/object/thumbnail_remover.rs index e46a47e5e..830a95406 100644 --- a/core/src/object/thumbnail_remover.rs +++ b/core/src/object/thumbnail_remover.rs @@ -1,21 +1,32 @@ use crate::{ + library::Library, prisma::{file_path, PrismaClient}, util::error::{FileIOError, NonUtf8PathError}, }; -use std::{collections::HashSet, ffi::OsStr, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + ffi::OsStr, + path::{Path, PathBuf}, + pin::pin, + sync::Arc, + time::Duration, +}; -use futures::future::try_join_all; +use async_channel as chan; +use futures::{stream::FuturesUnordered, FutureExt}; +use futures_concurrency::{future::TryJoin, stream::Merge}; use thiserror::Error; use tokio::{ - fs, select, - sync::mpsc, - time::{interval_at, Instant, MissedTickBehavior}, + fs, io, + time::{interval, MissedTickBehavior}, }; -use tracing::error; +use tokio_stream::{wrappers::IntervalStream, StreamExt}; +use tokio_util::sync::{CancellationToken, DropGuard}; +use tracing::{debug, error, trace}; +use uuid::Uuid; -const TEN_SECONDS: Duration = Duration::from_secs(10); -const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60); +const HALF_HOUR: Duration = Duration::from_secs(30 * 60); #[derive(Error, Debug)] enum ThumbnailRemoverActorError { @@ -30,53 +41,221 @@ enum ThumbnailRemoverActorError { } #[derive(Clone)] +pub struct ThumbnailRemoverActorProxy { + cas_ids_to_delete_tx: chan::Sender>, + non_indexed_thumbnails_cas_ids_tx: chan::Sender, +} + +impl ThumbnailRemoverActorProxy { + pub async fn new_non_indexed_thumbnail(&self, cas_id: String) { + if self + .non_indexed_thumbnails_cas_ids_tx + .send(cas_id) + .await + .is_err() + { + error!("Thumbnail remover actor is dead"); + } + } + + pub async fn remove_cas_ids(&self, cas_ids: Vec) { + if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() { + error!("Thumbnail remover actor is dead"); + } + } +} + +enum DatabaseMessage { + Add(Uuid, Arc), + Remove(Uuid), +} + pub struct ThumbnailRemoverActor { - tx: mpsc::Sender<()>, + databases_tx: chan::Sender, + cas_ids_to_delete_tx: chan::Sender>, + non_indexed_thumbnails_cas_ids_tx: chan::Sender, + _cancel_loop: DropGuard, } impl ThumbnailRemoverActor { - pub fn spawn(db: Arc, thumbnails_directory: impl AsRef) -> Self { - let (tx, mut rx) = mpsc::channel(4); + pub fn new(thumbnails_directory: impl AsRef) -> Self { let thumbnails_directory = thumbnails_directory.as_ref().to_path_buf(); + let (databases_tx, databases_rx) = chan::bounded(4); + let (non_indexed_thumbnails_cas_ids_tx, non_indexed_thumbnails_cas_ids_rx) = + chan::unbounded(); + let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16); + let cancel_token = CancellationToken::new(); + let inner_cancel_token = cancel_token.child_token(); tokio::spawn(async move { - let mut last_checked = Instant::now(); - - let mut check_interval = interval_at(Instant::now() + FIVE_MINUTES, FIVE_MINUTES); - check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - loop { - // Here we wait for a signal or for the tick interval to be reached - select! { - _ = check_interval.tick() => {} - signal = rx.recv() => { - if signal.is_none() { - break; - } - } + if let Err(e) = tokio::spawn(Self::worker( + thumbnails_directory.clone(), + databases_rx.clone(), + cas_ids_to_delete_rx.clone(), + non_indexed_thumbnails_cas_ids_rx.clone(), + inner_cancel_token.child_token(), + )) + .await + { + error!( + "Error on Thumbnail Remover Actor; \ + Error: {e}; \ + Restarting the worker loop...", + ); } - - // For any of them we process a clean up if a time since the last one already passed - if last_checked.elapsed() > TEN_SECONDS { - if let Err(e) = Self::process_clean_up(&db, &thumbnails_directory).await { - error!("Got an error when trying to clean stale thumbnails: {e:#?}"); - } - last_checked = Instant::now(); + if inner_cancel_token.is_cancelled() { + break; } } }); - Self { tx } + Self { + databases_tx, + cas_ids_to_delete_tx, + non_indexed_thumbnails_cas_ids_tx, + _cancel_loop: cancel_token.drop_guard(), + } } - pub async fn invoke(&self) { - self.tx.send(()).await.ok(); + pub async fn new_library(&self, Library { id, db, .. }: &Library) { + if self + .databases_tx + .send(DatabaseMessage::Add(*id, Arc::clone(db))) + .await + .is_err() + { + error!("Thumbnail remover actor is dead") + } + } + + pub async fn remove_library(&self, library_id: Uuid) { + if self + .databases_tx + .send(DatabaseMessage::Remove(library_id)) + .await + .is_err() + { + error!("Thumbnail remover actor is dead") + } + } + + pub fn proxy(&self) -> ThumbnailRemoverActorProxy { + ThumbnailRemoverActorProxy { + cas_ids_to_delete_tx: self.cas_ids_to_delete_tx.clone(), + non_indexed_thumbnails_cas_ids_tx: self.non_indexed_thumbnails_cas_ids_tx.clone(), + } + } + + async fn worker( + thumbnails_directory: PathBuf, + databases_rx: chan::Receiver, + cas_ids_to_delete_rx: chan::Receiver>, + non_indexed_thumbnails_cas_ids_rx: chan::Receiver, + cancel_token: CancellationToken, + ) { + let mut check_interval = interval(HALF_HOUR); + check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut databases = HashMap::new(); + let mut non_indexed_thumbnails_cas_ids = HashSet::new(); + + enum StreamMessage { + Run, + ToDelete(Vec), + Database(DatabaseMessage), + NonIndexedThumbnail(String), + Stop, + } + + let cancel = pin!(cancel_token.cancelled()); + + let mut msg_stream = ( + databases_rx.map(StreamMessage::Database), + cas_ids_to_delete_rx.map(StreamMessage::ToDelete), + non_indexed_thumbnails_cas_ids_rx.map(StreamMessage::NonIndexedThumbnail), + IntervalStream::new(check_interval).map(|_| StreamMessage::Run), + cancel.into_stream().map(|()| StreamMessage::Stop), + ) + .merge(); + + while let Some(msg) = msg_stream.next().await { + match msg { + StreamMessage::Run => { + // For any of them we process a clean up if a time since the last one already passed + if !databases.is_empty() { + if let Err(e) = Self::process_clean_up( + &thumbnails_directory, + databases.values(), + &non_indexed_thumbnails_cas_ids, + ) + .await + { + error!("Got an error when trying to clean stale thumbnails: {e:#?}"); + } + } + } + StreamMessage::ToDelete(cas_ids) => { + if let Err(e) = Self::remove_by_cas_ids(&thumbnails_directory, cas_ids).await { + error!("Got an error when trying to remove thumbnails: {e:#?}"); + } + } + + StreamMessage::Database(DatabaseMessage::Add(id, db)) => { + databases.insert(id, db); + } + StreamMessage::Database(DatabaseMessage::Remove(id)) => { + databases.remove(&id); + } + StreamMessage::NonIndexedThumbnail(cas_id) => { + non_indexed_thumbnails_cas_ids.insert(cas_id); + } + StreamMessage::Stop => { + debug!("Thumbnail remover actor is stopping"); + break; + } + } + } + } + + async fn remove_by_cas_ids( + thumbnails_directory: &Path, + cas_ids: Vec, + ) -> Result<(), ThumbnailRemoverActorError> { + cas_ids + .into_iter() + .map(|cas_id| async move { + let thumbnail_path = + thumbnails_directory.join(format!("{}/{}.webp", &cas_id[0..2], &cas_id[2..])); + + trace!("Removing thumbnail: {}", thumbnail_path.display()); + + match fs::remove_file(&thumbnail_path).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(FileIOError::from((thumbnail_path, e))), + } + }) + .collect::>() + .try_join() + .await?; + + Ok(()) } async fn process_clean_up( - db: &PrismaClient, thumbnails_directory: &Path, + databases: impl Iterator>, + non_indexed_thumbnails_cas_ids: &HashSet, ) -> Result<(), ThumbnailRemoverActorError> { + let databases = databases.collect::>(); + + // Thumbnails directory have the following structure: + // thumbnails/ + // ├── version.txt + //└── [0..2]/ # sharding + // └── [2..].webp + let mut read_dir = fs::read_dir(thumbnails_directory) .await .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; @@ -104,7 +283,7 @@ impl ThumbnailRemoverActor { .to_str() .ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?; - let mut thumbnails_paths_by_cas_id = Vec::new(); + let mut thumbnails_paths_by_cas_id = HashMap::new(); let mut entry_read_dir = fs::read_dir(&entry_path) .await @@ -134,7 +313,7 @@ impl ThumbnailRemoverActor { .ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?; thumbnails_paths_by_cas_id - .push((format!("{}{}", entry_path_name, thumbnail_name), thumb_path)); + .insert(format!("{}{}", entry_path_name, thumbnail_name), thumb_path); } if thumbnails_paths_by_cas_id.is_empty() { @@ -145,38 +324,41 @@ impl ThumbnailRemoverActor { continue; } - let thumbs_in_db = db - .file_path() - .find_many(vec![file_path::cas_id::in_vec( - thumbnails_paths_by_cas_id - .iter() - .map(|(cas_id, _)| cas_id) - .cloned() - .collect(), - )]) - .select(file_path::select!({ cas_id })) - .exec() - .await? - .into_iter() - .map(|file_path| { - file_path - .cas_id - .expect("only file paths with a cas_id were queried") + let mut thumbs_in_db_futs = databases + .iter() + .map(|db| { + db.file_path() + .find_many(vec![file_path::cas_id::in_vec( + thumbnails_paths_by_cas_id.keys().cloned().collect(), + )]) + .select(file_path::select!({ cas_id })) + .exec() }) - .collect::>(); + .collect::>(); - try_join_all( - thumbnails_paths_by_cas_id + while let Some(maybe_thumbs_in_db) = thumbs_in_db_futs.next().await { + maybe_thumbs_in_db? .into_iter() - .filter_map(|(cas_id, path)| { - (!thumbs_in_db.contains(&cas_id)).then_some(async move { - fs::remove_file(&path) - .await - .map_err(|e| FileIOError::from((path, e))) - }) - }), - ) - .await?; + .filter_map(|file_path| file_path.cas_id) + .for_each(|cas_id| { + thumbnails_paths_by_cas_id.remove(&cas_id); + }); + } + + thumbnails_paths_by_cas_id + .retain(|cas_id, _| !non_indexed_thumbnails_cas_ids.contains(cas_id)); + + thumbnails_paths_by_cas_id + .into_values() + .map(|path| async move { + trace!("Removing stale thumbnail: {}", path.display()); + fs::remove_file(&path) + .await + .map_err(|e| FileIOError::from((path, e))) + }) + .collect::>() + .try_join() + .await?; } Ok(())