From 03e71e98a49369819b511f79182bbff0fb59b5fd Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Thu, 20 Jul 2023 12:17:54 -0300 Subject: [PATCH] [ENG-778] Location clean up job (#1123) * Introducing thumbnail remover actor Also tweaking the orphan remover actor * Rust fmt --- core/src/library/library.rs | 6 +- core/src/library/manager.rs | 12 +- core/src/location/file_path_helper/mod.rs | 40 +++-- core/src/location/indexer/indexer_job.rs | 1 + core/src/location/indexer/shallow.rs | 1 + core/src/location/manager/watcher/utils.rs | 28 ++-- core/src/location/mod.rs | 5 +- core/src/object/fs/delete.rs | 3 + core/src/object/mod.rs | 1 + core/src/object/orphan_remover.rs | 106 +++++++----- core/src/object/thumbnail_remover.rs | 184 +++++++++++++++++++++ 11 files changed, 316 insertions(+), 71 deletions(-) create mode 100644 core/src/object/thumbnail_remover.rs diff --git a/core/src/library/library.rs b/core/src/library/library.rs index e56adfec5..d17b78fc9 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -8,7 +8,10 @@ use crate::{ LocationManager, }, node::NodeConfigManager, - object::{orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path}, + object::{ + orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path, + thumbnail_remover::ThumbnailRemoverActor, + }, prisma::{file_path, location, PrismaClient}, sync::SyncManager, util::{db::maybe_missing, error::FileIOError}, @@ -48,6 +51,7 @@ pub struct Library { /// p2p identity pub identity: Arc, pub orphan_remover: OrphanRemoverActor, + pub thumbnail_remover: ThumbnailRemoverActor, } impl Debug for Library { diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index 02f5fadb2..e13d062f0 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -2,7 +2,10 @@ use crate::{ invalidate_query, location::{indexer, LocationManagerError}, node::{NodeConfig, Platform}, - object::{orphan_remover::OrphanRemoverActor, tag}, + object::{ + orphan_remover::OrphanRemoverActor, preview::THUMBNAIL_CACHE_DIR_NAME, tag, + thumbnail_remover::ThumbnailRemoverActor, + }, prisma::location, sync::{SyncManager, SyncMessage}, util::{ @@ -475,6 +478,13 @@ impl LibraryManager { // key_manager, sync: Arc::new(sync_manager), orphan_remover: OrphanRemoverActor::spawn(db.clone()), + thumbnail_remover: ThumbnailRemoverActor::spawn( + db.clone(), + node_context + .config + .data_directory() + .join(THUMBNAIL_CACHE_DIR_NAME), + ), db, node_context, identity, diff --git a/core/src/location/file_path_helper/mod.rs b/core/src/location/file_path_helper/mod.rs index 868b3f4ec..082aa2925 100644 --- a/core/src/location/file_path_helper/mod.rs +++ b/core/src/location/file_path_helper/mod.rs @@ -284,20 +284,32 @@ pub fn filter_existing_file_path_params( /// the materialized path #[allow(unused)] pub fn loose_find_existing_file_path_params( - IsolatedFilePathData { - materialized_path, - location_id, - name, - extension, - .. - }: &IsolatedFilePathData, -) -> Vec { - vec![ - file_path::location_id::equals(Some(*location_id)), - file_path::materialized_path::equals(Some(materialized_path.to_string())), - file_path::name::equals(Some(name.to_string())), - file_path::extension::equals(Some(extension.to_string())), - ] + location_id: location::id::Type, + location_path: impl AsRef, + full_path: impl AsRef, +) -> Result, FilePathError> { + let location_path = location_path.as_ref(); + let full_path = full_path.as_ref(); + + let file_iso_file_path = + IsolatedFilePathData::new(location_id, location_path, full_path, false)?; + + let dir_iso_file_path = IsolatedFilePathData::new(location_id, location_path, full_path, true)?; + + Ok(vec![ + file_path::location_id::equals(Some(location_id)), + file_path::materialized_path::equals(Some( + file_iso_file_path.materialized_path.to_string(), + )), + file_path::name::in_vec(vec![ + file_iso_file_path.name.to_string(), + dir_iso_file_path.name.to_string(), + ]), + file_path::extension::in_vec(vec![ + file_iso_file_path.extension.to_string(), + dir_iso_file_path.extension.to_string(), + ]), + ]) } pub async fn ensure_sub_path_is_in_location( diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 308e01781..f13be2213 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -451,6 +451,7 @@ impl StatefulJob for IndexerJobInit { if run_metadata.total_updated_paths > 0 { // Invoking orphan remover here as we probably have some orphans objects due to updates ctx.library.orphan_remover.invoke().await; + ctx.library.thumbnail_remover.invoke().await; } Ok(Some(json!({"init: ": init, "run_metadata": run_metadata}))) diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index 98ecbb560..bfbe0f94a 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -116,6 +116,7 @@ pub async fn shallow( invalidate_query!(library, "search.paths"); library.orphan_remover.invoke().await; + library.thumbnail_remover.invoke().await; Ok(()) } diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index bca1648ba..e1a3b3855 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -35,6 +35,7 @@ use crate::location::file_path_helper::get_inode_and_device_from_path; use std::{ collections::HashSet, + ffi::OsStr, fs::Metadata, path::{Path, PathBuf}, str::FromStr, @@ -43,7 +44,7 @@ use std::{ use sd_file_ext::extensions::ImageExtension; use chrono::{DateTime, Local, Utc}; -use notify::{Event, EventKind}; +use notify::Event; use prisma_client_rust::{raw, PrismaValue}; use serde_json::json; use tokio::{fs, io::ErrorKind}; @@ -55,10 +56,9 @@ use super::INodeAndDevice; pub(super) fn check_event(event: &Event, ignore_paths: &HashSet) -> bool { // if path includes .DS_Store, .spacedrive file creation or is in the `ignore_paths` set, we ignore !event.paths.iter().any(|p| { - let path_str = p.to_str().expect("Found non-UTF-8 path"); - - path_str.contains(".DS_Store") - || (path_str.contains(".spacedrive") && matches!(event.kind, EventKind::Create(_))) + p.file_name() + .and_then(OsStr::to_str) + .map_or(false, |name| name == ".DS_Store" || name == ".spacedrive") || ignore_paths.contains(p) }) } @@ -600,8 +600,10 @@ pub(super) async fn rename( if let Some(file_path) = db .file_path() .find_first(loose_find_existing_file_path_params( - &IsolatedFilePathData::new(location_id, &location_path, old_path, false)?, - )) + location_id, + &location_path, + old_path, + )?) .exec() .await? { @@ -664,8 +666,8 @@ pub(super) async fn remove( let Some(file_path) = library.db .file_path() .find_first(loose_find_existing_file_path_params( - &IsolatedFilePathData::new(location_id, &location_path, full_path, false)?, - )) + location_id, &location_path, full_path, + )?) .exec() .await? else { return Ok(()); @@ -715,8 +717,6 @@ pub(super) async fn remove_by_file_path( .await?; } } - - library.orphan_remover.invoke().await; } Err(e) => return Err(FileIOError::from((path, e)).into()), } @@ -791,8 +791,10 @@ pub(super) async fn extract_inode_and_device_from_path( .db .file_path() .find_first(loose_find_existing_file_path_params( - &IsolatedFilePathData::new(location_id, location_path, path, false)?, - )) + location_id, + location_path, + path, + )?) .select(file_path::select!({ inode device })) .exec() .await? diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index 2bf818188..c31641c47 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -706,11 +706,10 @@ pub async fn delete_directory( })], ); - for params in children_params.chunks(512) { - db.file_path().delete_many(params.to_vec()).exec().await?; - } + db.file_path().delete_many(children_params).exec().await?; library.orphan_remover.invoke().await; + library.thumbnail_remover.invoke().await; invalidate_query!(library, "search.paths"); Ok(()) diff --git a/core/src/object/fs/delete.rs b/core/src/object/fs/delete.rs index 00fb581cb..762397056 100644 --- a/core/src/object/fs/delete.rs +++ b/core/src/object/fs/delete.rs @@ -98,6 +98,9 @@ impl StatefulJob for FileDeleterJobInit { let init = self; invalidate_query!(ctx.library, "search.paths"); + ctx.library.orphan_remover.invoke().await; + ctx.library.thumbnail_remover.invoke().await; + Ok(Some(json!({ "init": init }))) } } diff --git a/core/src/object/mod.rs b/core/src/object/mod.rs index 524f39773..b02ee2331 100644 --- a/core/src/object/mod.rs +++ b/core/src/object/mod.rs @@ -9,6 +9,7 @@ pub mod fs; pub mod orphan_remover; pub mod preview; pub mod tag; +pub mod thumbnail_remover; pub mod validation; // Objects are primarily created by the identifier from Paths diff --git a/core/src/object/orphan_remover.rs b/core/src/object/orphan_remover.rs index 71df0eefe..ea920df1a 100644 --- a/core/src/object/orphan_remover.rs +++ b/core/src/object/orphan_remover.rs @@ -1,58 +1,48 @@ -use std::{sync::Arc, time::Duration}; -use tokio::sync::mpsc::*; -use tracing::{debug, error}; +use crate::prisma::{object, tag_on_object, PrismaClient}; -use crate::prisma::*; +use std::{sync::Arc, time::Duration}; + +use tokio::{ + select, + sync::mpsc, + time::{interval_at, Instant, MissedTickBehavior}, +}; +use tracing::{error, trace}; + +const TEN_SECONDS: Duration = Duration::from_secs(10); +const ONE_MINUTE: Duration = Duration::from_secs(60); // Actor that can be invoked to find and delete objects with no matching file paths #[derive(Clone)] pub struct OrphanRemoverActor { - tx: Sender<()>, + tx: mpsc::Sender<()>, } impl OrphanRemoverActor { pub fn spawn(db: Arc) -> Self { - let (tx, mut rx) = channel(4); + let (tx, mut rx) = mpsc::channel(4); tokio::spawn(async move { - while let Some(()) = rx.recv().await { - // prevents timeouts - tokio::time::sleep(Duration::from_millis(10)).await; + let mut last_checked = Instant::now(); - loop { - let objs = match db - .object() - .find_many(vec![object::file_paths::none(vec![])]) - .take(512) - .select(object::select!({ id pub_id })) - .exec() - .await - { - Ok(objs) => objs, - Err(e) => { - error!("Failed to fetch orphaned objects: {e}"); + let mut check_interval = interval_at(Instant::now() + ONE_MINUTE, ONE_MINUTE); + check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + // Here we wait for a signal or for the tick interval to be reached + select! { + _ = check_interval.tick() => {} + signal = rx.recv() => { + if signal.is_none() { break; } - }; - - if objs.is_empty() { - break; } + } - debug!("Removing {} orphaned objects", objs.len()); - - let ids: Vec<_> = objs.iter().map(|o| o.id).collect(); - - if let Err(e) = db - ._batch(( - db.tag_on_object() - .delete_many(vec![tag_on_object::object_id::in_vec(ids.clone())]), - db.object().delete_many(vec![object::id::in_vec(ids)]), - )) - .await - { - error!("Failed to remove orphaned objects: {e}"); - } + // For any of them we process a clean up if a time since the last one already passed + if last_checked.elapsed() > TEN_SECONDS { + Self::process_clean_up(&db).await; + last_checked = Instant::now(); } } }); @@ -63,4 +53,42 @@ impl OrphanRemoverActor { pub async fn invoke(&self) { self.tx.send(()).await.ok(); } + + async fn process_clean_up(db: &PrismaClient) { + loop { + let Ok(objects_ids) = db + .object() + .find_many(vec![object::file_paths::none(vec![])]) + .take(512) + .select(object::select!({ id })) + .exec() + .await + .map(|objects| objects.into_iter() + .map(|object| object.id) + .collect::>() + ) + .map_err(|e| error!("Failed to fetch orphaned objects: {e:#?}")) + else { + break; + }; + + if objects_ids.is_empty() { + break; + } + + trace!("Removing {} orphaned objects", objects_ids.len()); + + if let Err(e) = db + ._batch(( + db.tag_on_object() + .delete_many(vec![tag_on_object::object_id::in_vec(objects_ids.clone())]), + db.object() + .delete_many(vec![object::id::in_vec(objects_ids)]), + )) + .await + { + error!("Failed to remove orphaned objects: {e:#?}"); + } + } + } } diff --git a/core/src/object/thumbnail_remover.rs b/core/src/object/thumbnail_remover.rs new file mode 100644 index 000000000..e46a47e5e --- /dev/null +++ b/core/src/object/thumbnail_remover.rs @@ -0,0 +1,184 @@ +use crate::{ + prisma::{file_path, PrismaClient}, + util::error::{FileIOError, NonUtf8PathError}, +}; + +use std::{collections::HashSet, ffi::OsStr, path::Path, sync::Arc, time::Duration}; + +use futures::future::try_join_all; +use thiserror::Error; +use tokio::{ + fs, select, + sync::mpsc, + time::{interval_at, Instant, MissedTickBehavior}, +}; +use tracing::error; + +const TEN_SECONDS: Duration = Duration::from_secs(10); +const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60); + +#[derive(Error, Debug)] +enum ThumbnailRemoverActorError { + #[error("database error")] + Database(#[from] prisma_client_rust::QueryError), + #[error("missing file name: {}", .0.display())] + MissingFileName(Box), + #[error(transparent)] + FileIO(#[from] FileIOError), + #[error(transparent)] + NonUtf8Path(#[from] NonUtf8PathError), +} + +#[derive(Clone)] +pub struct ThumbnailRemoverActor { + tx: mpsc::Sender<()>, +} + +impl ThumbnailRemoverActor { + pub fn spawn(db: Arc, thumbnails_directory: impl AsRef) -> Self { + let (tx, mut rx) = mpsc::channel(4); + let thumbnails_directory = thumbnails_directory.as_ref().to_path_buf(); + + tokio::spawn(async move { + let mut last_checked = Instant::now(); + + let mut check_interval = interval_at(Instant::now() + FIVE_MINUTES, FIVE_MINUTES); + check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + // Here we wait for a signal or for the tick interval to be reached + select! { + _ = check_interval.tick() => {} + signal = rx.recv() => { + if signal.is_none() { + break; + } + } + } + + // For any of them we process a clean up if a time since the last one already passed + if last_checked.elapsed() > TEN_SECONDS { + if let Err(e) = Self::process_clean_up(&db, &thumbnails_directory).await { + error!("Got an error when trying to clean stale thumbnails: {e:#?}"); + } + last_checked = Instant::now(); + } + } + }); + + Self { tx } + } + + pub async fn invoke(&self) { + self.tx.send(()).await.ok(); + } + + async fn process_clean_up( + db: &PrismaClient, + thumbnails_directory: &Path, + ) -> Result<(), ThumbnailRemoverActorError> { + let mut read_dir = fs::read_dir(thumbnails_directory) + .await + .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; + + while let Some(entry) = read_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((thumbnails_directory, e)))? + { + let entry_path = entry.path(); + if !entry + .metadata() + .await + .map_err(|e| FileIOError::from((thumbnails_directory, e)))? + .is_dir() + { + continue; + } + + let entry_path_name = entry_path + .file_name() + .ok_or_else(|| { + ThumbnailRemoverActorError::MissingFileName(entry.path().into_boxed_path()) + })? + .to_str() + .ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?; + + let mut thumbnails_paths_by_cas_id = Vec::new(); + + let mut entry_read_dir = fs::read_dir(&entry_path) + .await + .map_err(|e| FileIOError::from((&entry_path, e)))?; + + while let Some(thumb_entry) = entry_read_dir + .next_entry() + .await + .map_err(|e| FileIOError::from((&entry_path, e)))? + { + let thumb_path = thumb_entry.path(); + + if thumb_path + .extension() + .and_then(OsStr::to_str) + .map_or(true, |ext| ext != "webp") + { + continue; + } + + let thumbnail_name = thumb_path + .file_stem() + .ok_or_else(|| { + ThumbnailRemoverActorError::MissingFileName(entry.path().into_boxed_path()) + })? + .to_str() + .ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?; + + thumbnails_paths_by_cas_id + .push((format!("{}{}", entry_path_name, thumbnail_name), thumb_path)); + } + + if thumbnails_paths_by_cas_id.is_empty() { + fs::remove_dir(&entry_path) + .await + .map_err(|e| FileIOError::from((entry_path, e)))?; + + continue; + } + + let thumbs_in_db = db + .file_path() + .find_many(vec![file_path::cas_id::in_vec( + thumbnails_paths_by_cas_id + .iter() + .map(|(cas_id, _)| cas_id) + .cloned() + .collect(), + )]) + .select(file_path::select!({ cas_id })) + .exec() + .await? + .into_iter() + .map(|file_path| { + file_path + .cas_id + .expect("only file paths with a cas_id were queried") + }) + .collect::>(); + + try_join_all( + thumbnails_paths_by_cas_id + .into_iter() + .filter_map(|(cas_id, path)| { + (!thumbs_in_db.contains(&cas_id)).then_some(async move { + fs::remove_file(&path) + .await + .map_err(|e| FileIOError::from((path, e))) + }) + }), + ) + .await?; + } + + Ok(()) + } +}