[ENG-926] Prevent thumbnail destruction and fix the remover (#1127)

* fix(core): thumbnail removal

* chore(core): add todo

* New actor on steroids

* Improving thumbnail remover actor

* Ignoring errors from files that doesn't exist

---------

Co-authored-by: Ericson Soares <ericson.ds999@gmail.com>
This commit is contained in:
jake
2023-07-25 15:21:50 +01:00
committed by GitHub
parent 9e65a22e74
commit 874faa4e55
13 changed files with 364 additions and 129 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -97,6 +97,9 @@ regex = "1.8.4"
hex = "0.4.3"
int-enum = "0.5.0"
tokio-stream = "0.1.14"
futures-concurrency = "7.3"
async-channel = "1.9"
tokio-util = "0.7"
[target.'cfg(target_os = "macos")'.dependencies]
plist = "1"

View File

@@ -9,9 +9,8 @@ use crate::{
},
node::NodeConfigManager,
object::{
orphan_remover::OrphanRemoverActor,
preview::{get_thumbnail_path, THUMBNAIL_CACHE_DIR_NAME},
thumbnail_remover::ThumbnailRemoverActor,
orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path,
thumbnail_remover::ThumbnailRemoverActorProxy,
},
prisma::{file_path, location, PrismaClient},
util::{db::maybe_missing, error::FileIOError},
@@ -51,7 +50,7 @@ pub struct Library {
/// p2p identity
pub identity: Arc<Identity>,
pub orphan_remover: OrphanRemoverActor,
pub thumbnail_remover: ThumbnailRemoverActor,
pub thumbnail_remover_proxy: ThumbnailRemoverActorProxy,
}
impl Debug for Library {
@@ -81,13 +80,7 @@ impl Library {
let library = Self {
orphan_remover: OrphanRemoverActor::spawn(db.clone()),
thumbnail_remover: ThumbnailRemoverActor::spawn(
db.clone(),
node_context
.config
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME),
),
thumbnail_remover_proxy: library_manager.thumbnail_remover_proxy(),
id,
db,
config,

View File

@@ -2,7 +2,11 @@ use crate::{
invalidate_query,
location::{indexer, LocationManagerError},
node::{NodeConfig, Platform},
object::tag,
object::{
preview::get_thumbnails_directory,
tag,
thumbnail_remover::{ThumbnailRemoverActor, ThumbnailRemoverActorProxy},
},
prisma::location,
util::{
db::{self, MissingFieldError},
@@ -37,6 +41,8 @@ pub struct LibraryManager {
libraries: RwLock<Vec<Arc<Library>>>,
/// node_context holds the context for the node which this library manager is running on.
pub node_context: Arc<NodeContext>,
/// An actor that removes stale thumbnails from the file system
thumbnail_remover: ThumbnailRemoverActor,
}
#[derive(Error, Debug)]
@@ -101,6 +107,7 @@ impl LibraryManager {
let this = Arc::new(Self {
libraries_dir: libraries_dir.clone(),
libraries: Default::default(),
thumbnail_remover: ThumbnailRemoverActor::new(get_thumbnails_directory(&node_context)),
node_context,
});
@@ -124,7 +131,11 @@ impl LibraryManager {
.file_stem()
.and_then(|v| v.to_str().map(Uuid::from_str))
else {
warn!("Attempted to load library from path '{}' but it has an invalid filename. Skipping...", config_path.display());
warn!(
"Attempted to load library from path '{}' \
but it has an invalid filename. Skipping...",
config_path.display()
);
continue;
};
@@ -149,6 +160,10 @@ impl LibraryManager {
Ok(this)
}
pub fn thumbnail_remover_proxy(&self) -> ThumbnailRemoverActorProxy {
self.thumbnail_remover.proxy()
}
/// create creates a new library with the given config and mounts it into the running [LibraryManager].
pub(crate) async fn create(
self: &Arc<Self>,
@@ -325,6 +340,7 @@ impl LibraryManager {
invalidate_query!(library, "library.list");
self.thumbnail_remover.remove_library(id).await;
self.libraries.write().await.retain(|l| l.id != id);
Ok(())
@@ -417,7 +433,8 @@ impl LibraryManager {
self.clone(),
));
self.libraries.write().await.push(library.clone());
self.thumbnail_remover.new_library(&library).await;
self.libraries.write().await.push(Arc::clone(&library));
if should_seed {
library.orphan_remover.invoke().await;

View File

@@ -5,7 +5,6 @@ use crate::{
use std::{
fs::Metadata,
hash::{Hash, Hasher},
path::{Path, PathBuf, MAIN_SEPARATOR_STR},
time::SystemTime,
};
@@ -24,7 +23,7 @@ pub use isolated_file_path_data::{
};
// File Path selectables!
file_path::select!(file_path_just_pub_id { pub_id });
file_path::select!(file_path_pub_and_cas_ids { pub_id cas_id });
file_path::select!(file_path_just_pub_id_materialized_path {
pub_id
materialized_path
@@ -104,20 +103,6 @@ file_path::select!(file_path_to_full_path {
// File Path includes!
file_path::include!(file_path_with_object { object });
impl Hash for file_path_just_pub_id::Data {
fn hash<H: Hasher>(&self, state: &mut H) {
self.pub_id.hash(state);
}
}
impl PartialEq for file_path_just_pub_id::Data {
fn eq(&self, other: &Self) -> bool {
self.pub_id == other.pub_id
}
}
impl Eq for file_path_just_pub_id::Data {}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct FilePathMetadata {
pub inode: u64,

View File

@@ -196,6 +196,17 @@ impl StatefulJob for IndexerJobInit {
)
.await?;
let scan_read_time = scan_start.elapsed();
let to_remove = to_remove.collect::<Vec<_>>();
ctx.library
.thumbnail_remover_proxy
.remove_cas_ids(
to_remove
.iter()
.filter_map(|file_path| file_path.cas_id.clone())
.collect::<Vec<_>>(),
)
.await;
let db_delete_start = Instant::now();
// TODO pass these uuids to sync system
@@ -451,7 +462,6 @@ impl StatefulJob for IndexerJobInit {
if run_metadata.total_updated_paths > 0 {
// Invoking orphan remover here as we probably have some orphans objects due to updates
ctx.library.orphan_remover.invoke().await;
ctx.library.thumbnail_remover.invoke().await;
}
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))

View File

@@ -19,7 +19,7 @@ use thiserror::Error;
use tracing::trace;
use super::{
file_path_helper::{file_path_just_pub_id, FilePathError, IsolatedFilePathData},
file_path_helper::{file_path_pub_and_cas_ids, FilePathError, IsolatedFilePathData},
location_with_indexer_rules,
};
@@ -280,7 +280,7 @@ fn iso_file_path_factory(
}
async fn remove_non_existing_file_paths(
to_remove: impl IntoIterator<Item = file_path_just_pub_id::Data>,
to_remove: impl IntoIterator<Item = file_path_pub_and_cas_ids::Data>,
db: &PrismaClient,
) -> Result<u64, IndexerError> {
db.file_path()
@@ -333,6 +333,25 @@ macro_rules! file_paths_db_fetcher_fn {
macro_rules! to_remove_db_fetcher_fn {
($location_id:expr, $db:expr) => {{
|iso_file_path, unique_location_id_materialized_path_name_extension_params| async {
struct PubAndCasId {
pub_id: ::uuid::Uuid,
maybe_cas_id: Option<String>,
}
impl ::std::hash::Hash for PubAndCasId {
fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
self.pub_id.hash(state);
}
}
impl ::std::cmp::PartialEq for PubAndCasId {
fn eq(&self, other: &Self) -> bool {
self.pub_id == other.pub_id
}
}
impl ::std::cmp::Eq for PubAndCasId {}
let iso_file_path: $crate::location::file_path_helper::IsolatedFilePathData<'static> =
iso_file_path;
@@ -354,7 +373,9 @@ macro_rules! to_remove_db_fetcher_fn {
::prisma_client_rust::operator::or(unique_params.collect()),
]),
])
.select($crate::location::file_path_helper::file_path_just_pub_id::select())
.select(
$crate::location::file_path_helper::file_path_pub_and_cas_ids::select(),
)
})
.collect::<::std::vec::Vec<_>>();
@@ -367,9 +388,10 @@ macro_rules! to_remove_db_fetcher_fn {
.map(|fetched_vec| {
fetched_vec
.into_iter()
.map(|fetched| {
::uuid::Uuid::from_slice(&fetched.pub_id)
.expect("file_path.pub_id is invalid!")
.map(|fetched| PubAndCasId {
pub_id: ::uuid::Uuid::from_slice(&fetched.pub_id)
.expect("file_path.pub_id is invalid!"),
maybe_cas_id: fetched.cas_id,
})
.collect::<::std::collections::HashSet<_>>()
})
@@ -377,19 +399,20 @@ macro_rules! to_remove_db_fetcher_fn {
let mut intersection = ::std::collections::HashSet::new();
while let Some(set) = sets.pop() {
for pub_id in set {
for pub_and_cas_ids in set {
// Remove returns true if the element was present in the set
if sets.iter_mut().all(|set| set.remove(&pub_id)) {
intersection.insert(pub_id);
if sets.iter_mut().all(|set| set.remove(&pub_and_cas_ids)) {
intersection.insert(pub_and_cas_ids);
}
}
}
intersection
.into_iter()
.map(|pub_id| {
$crate::location::file_path_helper::file_path_just_pub_id::Data {
pub_id: pub_id.as_bytes().to_vec(),
.map(|pub_and_cas_ids| {
$crate::location::file_path_helper::file_path_pub_and_cas_ids::Data {
pub_id: pub_and_cas_ids.pub_id.as_bytes().to_vec(),
cas_id: pub_and_cas_ids.maybe_cas_id,
}
})
.collect()

View File

@@ -80,6 +80,16 @@ pub async fn shallow(
.await?
};
library
.thumbnail_remover_proxy
.remove_cas_ids(
to_remove
.iter()
.filter_map(|file_path| file_path.cas_id.clone())
.collect::<Vec<_>>(),
)
.await;
errors.into_iter().for_each(|e| error!("{e}"));
// TODO pass these uuids to sync system
@@ -116,7 +126,6 @@ pub async fn shallow(
invalidate_query!(library, "search.paths");
library.orphan_remover.invoke().await;
library.thumbnail_remover.invoke().await;
Ok(())
}

View File

@@ -1,6 +1,6 @@
use crate::{
location::file_path_helper::{
file_path_just_pub_id, file_path_walker, FilePathMetadata, IsolatedFilePathData,
file_path_pub_and_cas_ids, file_path_walker, FilePathMetadata, IsolatedFilePathData,
MetadataExt,
},
prisma::file_path,
@@ -109,7 +109,7 @@ pub struct WalkResult<Walked, ToUpdate, ToRemove>
where
Walked: Iterator<Item = WalkedEntry>,
ToUpdate: Iterator<Item = WalkedEntry>,
ToRemove: Iterator<Item = file_path_just_pub_id::Data>,
ToRemove: Iterator<Item = file_path_pub_and_cas_ids::Data>,
{
pub walked: Walked,
pub to_update: ToUpdate,
@@ -136,13 +136,14 @@ pub(super) async fn walk<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
WalkResult<
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = file_path_just_pub_id::Data>,
impl Iterator<Item = file_path_pub_and_cas_ids::Data>,
>,
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
ToRemoveDbFetcherFut:
Future<Output = Result<Vec<file_path_pub_and_cas_ids::Data>, IndexerError>>,
{
let root = root.as_ref();
@@ -204,13 +205,14 @@ pub(super) async fn keep_walking<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
WalkResult<
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = file_path_just_pub_id::Data>,
impl Iterator<Item = file_path_pub_and_cas_ids::Data>,
>,
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
ToRemoveDbFetcherFut:
Future<Output = Result<Vec<file_path_pub_and_cas_ids::Data>, IndexerError>>,
{
let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY);
let mut indexed_paths = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
@@ -259,14 +261,15 @@ pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
(
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
Vec<file_path_just_pub_id::Data>,
Vec<file_path_pub_and_cas_ids::Data>,
Vec<IndexerError>,
),
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
ToRemoveDbFetcherFut:
Future<Output = Result<Vec<file_path_pub_and_cas_ids::Data>, IndexerError>>,
{
let root = root.as_ref();
@@ -428,9 +431,10 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
mut maybe_to_walk,
errors,
}: WorkingTable<'_>,
) -> Vec<file_path_just_pub_id::Data>
) -> Vec<file_path_pub_and_cas_ids::Data>
where
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
ToRemoveDbFetcherFut:
Future<Output = Result<Vec<file_path_pub_and_cas_ids::Data>, IndexerError>>,
{
let Ok(iso_file_path_to_walk) = iso_file_path_factory(path, true).map_err(|e| errors.push(e))
else {

View File

@@ -713,7 +713,7 @@ pub async fn delete_directory(
db.file_path().delete_many(children_params).exec().await?;
library.orphan_remover.invoke().await;
library.thumbnail_remover.invoke().await;
invalidate_query!(library, "search.paths");
Ok(())

View File

@@ -99,7 +99,6 @@ impl StatefulJob for FileDeleterJobInit {
invalidate_query!(ctx.library, "search.paths");
ctx.library.orphan_remover.invoke().await;
ctx.library.thumbnail_remover.invoke().await;
Ok(Some(json!({ "init": init })))
}

View File

@@ -5,6 +5,7 @@ use crate::{
location::file_path_helper::{file_path_for_thumbnailer, FilePathError, IsolatedFilePathData},
prisma::location,
util::{db::maybe_missing, error::FileIOError, version_manager::VersionManagerError},
NodeContext,
};
use std::{
@@ -41,13 +42,22 @@ pub const THUMBNAIL_CACHE_DIR_NAME: &str = "thumbnails";
/// This does not check if a thumbnail exists, it just returns the path that it would exist at
pub fn get_thumbnail_path(library: &Library, cas_id: &str) -> PathBuf {
library
.config()
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(get_shard_hex(cas_id))
.join(cas_id)
.with_extension("webp")
let mut thumb_path = library.config().data_directory();
thumb_path.push(THUMBNAIL_CACHE_DIR_NAME);
thumb_path.push(get_shard_hex(cas_id));
thumb_path.push(cas_id);
thumb_path.set_extension("webp");
thumb_path
}
pub fn get_thumbnails_directory(node_ctx: &NodeContext) -> PathBuf {
let mut thumb_path = node_ctx.config.data_directory();
thumb_path.push(THUMBNAIL_CACHE_DIR_NAME);
thumb_path
}
// this is used to pass the relevant data to the frontend so it can request the thumbnail

View File

@@ -1,21 +1,32 @@
use crate::{
library::Library,
prisma::{file_path, PrismaClient},
util::error::{FileIOError, NonUtf8PathError},
};
use std::{collections::HashSet, ffi::OsStr, path::Path, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
ffi::OsStr,
path::{Path, PathBuf},
pin::pin,
sync::Arc,
time::Duration,
};
use futures::future::try_join_all;
use async_channel as chan;
use futures::{stream::FuturesUnordered, FutureExt};
use futures_concurrency::{future::TryJoin, stream::Merge};
use thiserror::Error;
use tokio::{
fs, select,
sync::mpsc,
time::{interval_at, Instant, MissedTickBehavior},
fs, io,
time::{interval, MissedTickBehavior},
};
use tracing::error;
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, trace};
use uuid::Uuid;
const TEN_SECONDS: Duration = Duration::from_secs(10);
const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
const HALF_HOUR: Duration = Duration::from_secs(30 * 60);
#[derive(Error, Debug)]
enum ThumbnailRemoverActorError {
@@ -30,53 +41,221 @@ enum ThumbnailRemoverActorError {
}
#[derive(Clone)]
pub struct ThumbnailRemoverActorProxy {
cas_ids_to_delete_tx: chan::Sender<Vec<String>>,
non_indexed_thumbnails_cas_ids_tx: chan::Sender<String>,
}
impl ThumbnailRemoverActorProxy {
pub async fn new_non_indexed_thumbnail(&self, cas_id: String) {
if self
.non_indexed_thumbnails_cas_ids_tx
.send(cas_id)
.await
.is_err()
{
error!("Thumbnail remover actor is dead");
}
}
pub async fn remove_cas_ids(&self, cas_ids: Vec<String>) {
if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() {
error!("Thumbnail remover actor is dead");
}
}
}
enum DatabaseMessage {
Add(Uuid, Arc<PrismaClient>),
Remove(Uuid),
}
pub struct ThumbnailRemoverActor {
tx: mpsc::Sender<()>,
databases_tx: chan::Sender<DatabaseMessage>,
cas_ids_to_delete_tx: chan::Sender<Vec<String>>,
non_indexed_thumbnails_cas_ids_tx: chan::Sender<String>,
_cancel_loop: DropGuard,
}
impl ThumbnailRemoverActor {
pub fn spawn(db: Arc<PrismaClient>, thumbnails_directory: impl AsRef<Path>) -> Self {
let (tx, mut rx) = mpsc::channel(4);
pub fn new(thumbnails_directory: impl AsRef<Path>) -> Self {
let thumbnails_directory = thumbnails_directory.as_ref().to_path_buf();
let (databases_tx, databases_rx) = chan::bounded(4);
let (non_indexed_thumbnails_cas_ids_tx, non_indexed_thumbnails_cas_ids_rx) =
chan::unbounded();
let (cas_ids_to_delete_tx, cas_ids_to_delete_rx) = chan::bounded(16);
let cancel_token = CancellationToken::new();
let inner_cancel_token = cancel_token.child_token();
tokio::spawn(async move {
let mut last_checked = Instant::now();
let mut check_interval = interval_at(Instant::now() + FIVE_MINUTES, FIVE_MINUTES);
check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
// Here we wait for a signal or for the tick interval to be reached
select! {
_ = check_interval.tick() => {}
signal = rx.recv() => {
if signal.is_none() {
break;
}
}
if let Err(e) = tokio::spawn(Self::worker(
thumbnails_directory.clone(),
databases_rx.clone(),
cas_ids_to_delete_rx.clone(),
non_indexed_thumbnails_cas_ids_rx.clone(),
inner_cancel_token.child_token(),
))
.await
{
error!(
"Error on Thumbnail Remover Actor; \
Error: {e}; \
Restarting the worker loop...",
);
}
// For any of them we process a clean up if a time since the last one already passed
if last_checked.elapsed() > TEN_SECONDS {
if let Err(e) = Self::process_clean_up(&db, &thumbnails_directory).await {
error!("Got an error when trying to clean stale thumbnails: {e:#?}");
}
last_checked = Instant::now();
if inner_cancel_token.is_cancelled() {
break;
}
}
});
Self { tx }
Self {
databases_tx,
cas_ids_to_delete_tx,
non_indexed_thumbnails_cas_ids_tx,
_cancel_loop: cancel_token.drop_guard(),
}
}
pub async fn invoke(&self) {
self.tx.send(()).await.ok();
pub async fn new_library(&self, Library { id, db, .. }: &Library) {
if self
.databases_tx
.send(DatabaseMessage::Add(*id, Arc::clone(db)))
.await
.is_err()
{
error!("Thumbnail remover actor is dead")
}
}
pub async fn remove_library(&self, library_id: Uuid) {
if self
.databases_tx
.send(DatabaseMessage::Remove(library_id))
.await
.is_err()
{
error!("Thumbnail remover actor is dead")
}
}
pub fn proxy(&self) -> ThumbnailRemoverActorProxy {
ThumbnailRemoverActorProxy {
cas_ids_to_delete_tx: self.cas_ids_to_delete_tx.clone(),
non_indexed_thumbnails_cas_ids_tx: self.non_indexed_thumbnails_cas_ids_tx.clone(),
}
}
async fn worker(
thumbnails_directory: PathBuf,
databases_rx: chan::Receiver<DatabaseMessage>,
cas_ids_to_delete_rx: chan::Receiver<Vec<String>>,
non_indexed_thumbnails_cas_ids_rx: chan::Receiver<String>,
cancel_token: CancellationToken,
) {
let mut check_interval = interval(HALF_HOUR);
check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut databases = HashMap::new();
let mut non_indexed_thumbnails_cas_ids = HashSet::new();
enum StreamMessage {
Run,
ToDelete(Vec<String>),
Database(DatabaseMessage),
NonIndexedThumbnail(String),
Stop,
}
let cancel = pin!(cancel_token.cancelled());
let mut msg_stream = (
databases_rx.map(StreamMessage::Database),
cas_ids_to_delete_rx.map(StreamMessage::ToDelete),
non_indexed_thumbnails_cas_ids_rx.map(StreamMessage::NonIndexedThumbnail),
IntervalStream::new(check_interval).map(|_| StreamMessage::Run),
cancel.into_stream().map(|()| StreamMessage::Stop),
)
.merge();
while let Some(msg) = msg_stream.next().await {
match msg {
StreamMessage::Run => {
// For any of them we process a clean up if a time since the last one already passed
if !databases.is_empty() {
if let Err(e) = Self::process_clean_up(
&thumbnails_directory,
databases.values(),
&non_indexed_thumbnails_cas_ids,
)
.await
{
error!("Got an error when trying to clean stale thumbnails: {e:#?}");
}
}
}
StreamMessage::ToDelete(cas_ids) => {
if let Err(e) = Self::remove_by_cas_ids(&thumbnails_directory, cas_ids).await {
error!("Got an error when trying to remove thumbnails: {e:#?}");
}
}
StreamMessage::Database(DatabaseMessage::Add(id, db)) => {
databases.insert(id, db);
}
StreamMessage::Database(DatabaseMessage::Remove(id)) => {
databases.remove(&id);
}
StreamMessage::NonIndexedThumbnail(cas_id) => {
non_indexed_thumbnails_cas_ids.insert(cas_id);
}
StreamMessage::Stop => {
debug!("Thumbnail remover actor is stopping");
break;
}
}
}
}
async fn remove_by_cas_ids(
thumbnails_directory: &Path,
cas_ids: Vec<String>,
) -> Result<(), ThumbnailRemoverActorError> {
cas_ids
.into_iter()
.map(|cas_id| async move {
let thumbnail_path =
thumbnails_directory.join(format!("{}/{}.webp", &cas_id[0..2], &cas_id[2..]));
trace!("Removing thumbnail: {}", thumbnail_path.display());
match fs::remove_file(&thumbnail_path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(FileIOError::from((thumbnail_path, e))),
}
})
.collect::<Vec<_>>()
.try_join()
.await?;
Ok(())
}
async fn process_clean_up(
db: &PrismaClient,
thumbnails_directory: &Path,
databases: impl Iterator<Item = &Arc<PrismaClient>>,
non_indexed_thumbnails_cas_ids: &HashSet<String>,
) -> Result<(), ThumbnailRemoverActorError> {
let databases = databases.collect::<Vec<_>>();
// Thumbnails directory have the following structure:
// thumbnails/
// ├── version.txt
//└── <cas_id>[0..2]/ # sharding
// └── <cas_id>[2..].webp
let mut read_dir = fs::read_dir(thumbnails_directory)
.await
.map_err(|e| FileIOError::from((thumbnails_directory, e)))?;
@@ -104,7 +283,7 @@ impl ThumbnailRemoverActor {
.to_str()
.ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?;
let mut thumbnails_paths_by_cas_id = Vec::new();
let mut thumbnails_paths_by_cas_id = HashMap::new();
let mut entry_read_dir = fs::read_dir(&entry_path)
.await
@@ -134,7 +313,7 @@ impl ThumbnailRemoverActor {
.ok_or_else(|| NonUtf8PathError(entry.path().into_boxed_path()))?;
thumbnails_paths_by_cas_id
.push((format!("{}{}", entry_path_name, thumbnail_name), thumb_path));
.insert(format!("{}{}", entry_path_name, thumbnail_name), thumb_path);
}
if thumbnails_paths_by_cas_id.is_empty() {
@@ -145,38 +324,41 @@ impl ThumbnailRemoverActor {
continue;
}
let thumbs_in_db = db
.file_path()
.find_many(vec![file_path::cas_id::in_vec(
thumbnails_paths_by_cas_id
.iter()
.map(|(cas_id, _)| cas_id)
.cloned()
.collect(),
)])
.select(file_path::select!({ cas_id }))
.exec()
.await?
.into_iter()
.map(|file_path| {
file_path
.cas_id
.expect("only file paths with a cas_id were queried")
let mut thumbs_in_db_futs = databases
.iter()
.map(|db| {
db.file_path()
.find_many(vec![file_path::cas_id::in_vec(
thumbnails_paths_by_cas_id.keys().cloned().collect(),
)])
.select(file_path::select!({ cas_id }))
.exec()
})
.collect::<HashSet<_>>();
.collect::<FuturesUnordered<_>>();
try_join_all(
thumbnails_paths_by_cas_id
while let Some(maybe_thumbs_in_db) = thumbs_in_db_futs.next().await {
maybe_thumbs_in_db?
.into_iter()
.filter_map(|(cas_id, path)| {
(!thumbs_in_db.contains(&cas_id)).then_some(async move {
fs::remove_file(&path)
.await
.map_err(|e| FileIOError::from((path, e)))
})
}),
)
.await?;
.filter_map(|file_path| file_path.cas_id)
.for_each(|cas_id| {
thumbnails_paths_by_cas_id.remove(&cas_id);
});
}
thumbnails_paths_by_cas_id
.retain(|cas_id, _| !non_indexed_thumbnails_cas_ids.contains(cas_id));
thumbnails_paths_by_cas_id
.into_values()
.map(|path| async move {
trace!("Removing stale thumbnail: {}", path.display());
fs::remove_file(&path)
.await
.map_err(|e| FileIOError::from((path, e)))
})
.collect::<Vec<_>>()
.try_join()
.await?;
}
Ok(())