diff --git a/.gitignore b/.gitignore index 271efa353..aaf9028d0 100644 --- a/.gitignore +++ b/.gitignore @@ -77,5 +77,6 @@ dev.db-journal /core/migration_test sd_init.json +spacedrive .cargo/config .github/scripts/deps diff --git a/core/src/location/file_path_helper/mod.rs b/core/src/location/file_path_helper/mod.rs index 9ed27b807..e158769a7 100644 --- a/core/src/location/file_path_helper/mod.rs +++ b/core/src/location/file_path_helper/mod.rs @@ -80,6 +80,7 @@ file_path::select!(file_path_walker { date_modified inode device + size_in_bytes_bytes }); file_path::select!(file_path_to_handle_custom_uri { pub_id diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index d4c7273bd..5f1e8f444 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -4,6 +4,7 @@ use crate::{ CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata, JobStepOutput, StatefulJob, WorkerContext, }, + library::Library, location::{ file_path_helper::{ ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, @@ -11,11 +12,17 @@ use crate::{ }, location_with_indexer_rules, }, + prisma::{file_path, location}, to_remove_db_fetcher_fn, util::db::maybe_missing, }; +use sd_prisma::prisma_sync; +use sd_sync::*; +use sd_utils::from_bytes_to_uuid; + use std::{ + collections::HashMap, hash::{Hash, Hasher}, path::{Path, PathBuf}, sync::Arc, @@ -23,14 +30,15 @@ use std::{ }; use itertools::Itertools; +use prisma_client_rust::operator::or; use serde::{Deserialize, Serialize}; use serde_json::json; use tokio::time::Instant; -use tracing::info; +use tracing::{debug, info, warn}; use super::{ execute_indexer_save_step, execute_indexer_update_step, iso_file_path_factory, - remove_non_existing_file_paths, + remove_non_existing_file_paths, reverse_update_directories_sizes, rules::IndexerRule, walk::{keep_walking, walk, ToWalkEntry, WalkResult}, IndexerError, IndexerJobSaveStep, IndexerJobUpdateStep, @@ -61,6 +69,7 @@ impl Hash for IndexerJobInit { /// contains some metadata for logging purposes. #[derive(Serialize, Deserialize, Debug)] pub struct IndexerJobData { + location_path: PathBuf, indexed_path: PathBuf, indexer_rules: Vec, } @@ -76,6 +85,7 @@ pub struct IndexerJobRunMetadata { indexed_count: u64, updated_count: u64, removed_count: u64, + paths_and_sizes: HashMap, } impl JobRunMetadata for IndexerJobRunMetadata { @@ -88,6 +98,10 @@ impl JobRunMetadata for IndexerJobRunMetadata { self.total_update_steps += new_data.total_update_steps; self.indexed_count += new_data.indexed_count; self.removed_count += new_data.removed_count; + + for (path, size) in new_data.paths_and_sizes { + *self.paths_and_sizes.entry(path).or_default() += size; + } } } @@ -186,6 +200,7 @@ impl StatefulJob for IndexerJobInit { to_walk, to_remove, errors, + paths_and_sizes, } = walk( &to_walk_path, &indexer_rules, @@ -199,6 +214,11 @@ impl StatefulJob for IndexerJobInit { let scan_read_time = scan_start.elapsed(); let to_remove = to_remove.collect::>(); + debug!( + "Walker at indexer job found {} file_paths to be removed", + to_remove.len() + ); + ctx.node .thumbnail_remover .remove_cas_ids( @@ -255,6 +275,8 @@ impl StatefulJob for IndexerJobInit { .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)) .collect::>(); + debug!("Walker at indexer job found {total_updated_paths} file_paths to be updated"); + IndexerJobData::on_scan_progress( ctx, vec![ @@ -268,6 +290,7 @@ impl StatefulJob for IndexerJobInit { ); *data = Some(IndexerJobData { + location_path: location_path.to_path_buf(), indexed_path: to_walk_path, indexer_rules, }); @@ -283,6 +306,7 @@ impl StatefulJob for IndexerJobInit { removed_count, total_save_steps: *to_save_chunks as u64, total_update_steps: *to_update_chunks as u64, + paths_and_sizes, }, steps, errors @@ -362,6 +386,7 @@ impl StatefulJob for IndexerJobInit { to_walk, to_remove, errors, + paths_and_sizes, } = keep_walking( to_walk_entry, &data.indexer_rules, @@ -372,6 +397,8 @@ impl StatefulJob for IndexerJobInit { ) .await?; + new_metadata.paths_and_sizes = paths_and_sizes; + new_metadata.scan_read_time = scan_start.elapsed(); let db_delete_time = Instant::now(); @@ -441,14 +468,18 @@ impl StatefulJob for IndexerJobInit { async fn finalize( &self, ctx: &WorkerContext, - _data: &Option, + data: &Option, run_metadata: &Self::RunMetadata, ) -> JobResult { let init = self; + let indexed_path_str = data + .as_ref() + .map(|data| Ok(data.indexed_path.to_string_lossy().to_string())) + .unwrap_or_else(|| maybe_missing(&init.location.path, "location.path").cloned())?; + info!( - "Scan of {} completed in {:?}. {} new files found, \ + "Scan of {indexed_path_str} completed in {:?}. {} new files found, \ indexed {} files in db, updated {} entries. db write completed in {:?}", - maybe_missing(&init.location.path, "location.path")?, run_metadata.scan_read_time, run_metadata.total_paths, run_metadata.indexed_count, @@ -465,6 +496,27 @@ impl StatefulJob for IndexerJobInit { ctx.library.orphan_remover.invoke().await; } + if let Some(data) = data { + update_directories_sizes( + &run_metadata.paths_and_sizes, + init.location.id, + &data.indexed_path, + &ctx.library, + ) + .await?; + + if data.indexed_path != data.location_path { + reverse_update_directories_sizes( + &data.indexed_path, + init.location.id, + &data.location_path, + &ctx.library, + ) + .await + .map_err(IndexerError::from)?; + } + } + Ok(Some(json!({"init: ": init, "run_metadata": run_metadata}))) } } @@ -480,3 +532,81 @@ fn update_notifier_fn(ctx: &WorkerContext) -> impl FnMut(&Path, usize) + '_ { ); } } + +async fn update_directories_sizes( + paths_and_sizes: &HashMap, + location_id: location::id::Type, + location_path: impl AsRef, + library: &Library, +) -> Result<(), IndexerError> { + let location_path = location_path.as_ref(); + + let Library { db, sync, .. } = library; + + let chunked_queries = paths_and_sizes + .keys() + .chunks(200) + .into_iter() + .map(|paths_chunk| { + paths_chunk + .into_iter() + .map(|path| { + IsolatedFilePathData::new(location_id, location_path, path, true) + .map(file_path::WhereParam::from) + }) + .collect::, _>>() + .map(|params| { + db.file_path() + .find_many(vec![or(params)]) + .select(file_path::select!({ pub_id materialized_path name })) + }) + }) + .collect::, _>>()?; + + let to_sync_and_update = db + ._batch(chunked_queries) + .await? + .into_iter() + .flatten() + .filter_map( + |file_path| match (file_path.materialized_path, file_path.name) { + (Some(materialized_path), Some(name)) => { + let mut directory_full_path = location_path.join(&materialized_path[1..]); + directory_full_path.push(name); + + if let Some(size) = paths_and_sizes.get(&directory_full_path) { + let size_bytes = size.to_be_bytes().to_vec(); + + Some(( + sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: file_path.pub_id.clone(), + }, + file_path::size_in_bytes_bytes::NAME, + json!(size_bytes.clone()), + ), + db.file_path().update( + file_path::pub_id::equals(file_path.pub_id), + vec![file_path::size_in_bytes_bytes::set(Some(size_bytes))], + ), + )) + } else { + warn!("Found a file_path without ancestor in the database, possible corruption"); + None + } + } + _ => { + warn!( + "Found a file_path missing its materialized_path or name: ", + from_bytes_to_uuid(&file_path.pub_id) + ); + None + } + }, + ) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + sync.write_ops(db, to_sync_and_update).await?; + + Ok(()) +} diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 229061cb7..fe93d19a4 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -7,16 +7,20 @@ use crate::{ }, }; -use std::path::Path; - -use chrono::Utc; -use rspc::ErrorCode; use sd_prisma::prisma_sync; use sd_sync::*; +use sd_utils::from_bytes_to_uuid; + +use std::{collections::HashMap, path::Path}; + +use chrono::Utc; +use itertools::Itertools; +use prisma_client_rust::operator::or; +use rspc::ErrorCode; use serde::{Deserialize, Serialize}; use serde_json::json; use thiserror::Error; -use tracing::trace; +use tracing::{trace, warn}; use super::{ file_path_helper::{file_path_pub_and_cas_ids, FilePathError, IsolatedFilePathData}, @@ -386,3 +390,137 @@ macro_rules! to_remove_db_fetcher_fn { } }}; } + +pub async fn reverse_update_directories_sizes( + base_path: impl AsRef, + location_id: location::id::Type, + location_path: impl AsRef, + library: &Library, +) -> Result<(), FilePathError> { + let base_path = base_path.as_ref(); + let location_path = location_path.as_ref(); + + let Library { sync, db, .. } = library; + + let ancestors = base_path + .ancestors() + .take_while(|&ancestor| ancestor != location_path) + .map(|ancestor| IsolatedFilePathData::new(location_id, location_path, ancestor, true)) + .collect::, _>>()?; + + let chunked_queries = ancestors + .iter() + .chunks(200) + .into_iter() + .map(|ancestors_iso_file_paths_chunk| { + db.file_path() + .find_many(vec![or(ancestors_iso_file_paths_chunk + .into_iter() + .map(file_path::WhereParam::from) + .collect::>())]) + .select(file_path::select!({ pub_id materialized_path name })) + }) + .collect::>(); + + let mut pub_id_by_ancestor_materialized_path = db + ._batch(chunked_queries) + .await? + .into_iter() + .flatten() + .filter_map( + |file_path| match (file_path.materialized_path, file_path.name) { + (Some(materialized_path), Some(name)) => { + Some((format!("{materialized_path}{name}/"), (file_path.pub_id, 0))) + } + _ => { + warn!( + "Found a file_path missing its materialized_path or name: ", + from_bytes_to_uuid(&file_path.pub_id) + ); + None + } + }, + ) + .collect::>(); + + db.file_path() + .find_many(vec![ + file_path::location_id::equals(Some(location_id)), + file_path::materialized_path::in_vec( + ancestors + .iter() + .map(|ancestor_iso_file_path| { + ancestor_iso_file_path + .materialized_path_for_children() + .expect("each ancestor is a directory") + }) + .collect(), + ), + ]) + .select(file_path::select!({ materialized_path size_in_bytes_bytes })) + .exec() + .await? + .into_iter() + .for_each(|file_path| { + if let Some(materialized_path) = file_path.materialized_path { + if let Some((_, size)) = + pub_id_by_ancestor_materialized_path.get_mut(&materialized_path) + { + *size += file_path + .size_in_bytes_bytes + .map(|size_in_bytes_bytes| { + u64::from_be_bytes([ + size_in_bytes_bytes[0], + size_in_bytes_bytes[1], + size_in_bytes_bytes[2], + size_in_bytes_bytes[3], + size_in_bytes_bytes[4], + size_in_bytes_bytes[5], + size_in_bytes_bytes[6], + size_in_bytes_bytes[7], + ]) + }) + .unwrap_or_else(|| { + warn!("Got a directory missing its size in bytes"); + 0 + }); + } + } else { + warn!("Corrupt database possesing a file_path entry without materialized_path"); + } + }); + + let to_sync_and_update = ancestors + .into_iter() + .filter_map(|ancestor_iso_file_path| { + if let Some((pub_id, size)) = pub_id_by_ancestor_materialized_path.remove( + &ancestor_iso_file_path + .materialized_path_for_children() + .expect("each ancestor is a directory"), + ) { + let size_bytes = size.to_be_bytes().to_vec(); + + Some(( + sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: pub_id.clone(), + }, + file_path::size_in_bytes_bytes::NAME, + json!(size_bytes.clone()), + ), + db.file_path().update( + file_path::pub_id::equals(pub_id), + vec![file_path::size_in_bytes_bytes::set(Some(size_bytes))], + ), + )) + } else { + warn!("Got a missing ancestor for a file_path in the database, maybe we have a corruption"); + None + } + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + sync.write_ops(db, to_sync_and_update).await?; + + Ok(()) +} diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index cbf225db2..96242d216 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -7,19 +7,25 @@ use crate::{ check_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, IsolatedFilePathData, }, - indexer::{execute_indexer_update_step, IndexerJobUpdateStep}, - LocationError, + indexer::{ + execute_indexer_update_step, reverse_update_directories_sizes, IndexerJobUpdateStep, + }, + scan_location_sub_path, }, - to_remove_db_fetcher_fn, Node, + to_remove_db_fetcher_fn, + util::db::maybe_missing, + Node, }; -use tracing::error; use std::{ + collections::HashSet, path::{Path, PathBuf}, sync::Arc, }; +use futures::future::join_all; use itertools::Itertools; +use tracing::{debug, error}; use super::{ execute_indexer_save_step, iso_file_path_factory, location_with_indexer_rules, @@ -34,12 +40,10 @@ pub async fn shallow( location: &location_with_indexer_rules::Data, sub_path: &PathBuf, node: &Arc, - library: &Library, + library: &Arc, ) -> Result<(), JobError> { let location_id = location.id; - let Some(location_path) = location.path.as_ref().map(PathBuf::from) else { - return Err(JobError::Location(LocationError::MissingPath(location_id))); - }; + let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?; let db = library.db.clone(); @@ -60,7 +64,7 @@ pub async fn shallow( ( !check_file_path_exists::( - &IsolatedFilePathData::new(location_id, &location_path, &full_path, true) + &IsolatedFilePathData::new(location_id, location_path, &full_path, true) .map_err(IndexerError::from)?, &db, ) @@ -71,19 +75,24 @@ pub async fn shallow( (false, location_path.to_path_buf()) }; - let (walked, to_update, to_remove, errors) = { + let (walked, to_update, to_remove, errors, _s) = { walk_single_dir( &to_walk_path, &indexer_rules, |_, _| {}, file_paths_db_fetcher_fn!(&db), to_remove_db_fetcher_fn!(location_id, &db), - iso_file_path_factory(location_id, &location_path), + iso_file_path_factory(location_id, location_path), add_root, ) .await? }; + debug!( + "Walker at shallow indexer found {} file_paths to be removed", + to_remove.len() + ); + node.thumbnail_remover .remove_cas_ids( to_remove @@ -98,13 +107,28 @@ pub async fn shallow( // TODO pass these uuids to sync system remove_non_existing_file_paths(to_remove, &db).await?; + let mut new_directories_to_scan = HashSet::new(); + let save_steps = walked .chunks(BATCH_SIZE) .into_iter() .enumerate() - .map(|(i, chunk)| IndexerJobSaveStep { - chunk_idx: i, - walked: chunk.collect::>(), + .map(|(i, chunk)| { + let walked = chunk.collect::>(); + + walked + .iter() + .filter_map(|walked_entry| { + walked_entry.iso_file_path.materialized_path_for_children() + }) + .for_each(|new_dir| { + new_directories_to_scan.insert(new_dir); + }); + + IndexerJobSaveStep { + chunk_idx: i, + walked, + } }) .collect::>(); @@ -112,20 +136,47 @@ pub async fn shallow( execute_indexer_save_step(location, &step, library).await?; } + for scan in join_all( + new_directories_to_scan + .into_iter() + .map(|sub_path| scan_location_sub_path(node, library, location.clone(), sub_path)), + ) + .await + { + if let Err(e) = scan { + error!("{e}"); + } + } + + let mut to_update_count = 0; + let update_steps = to_update .chunks(BATCH_SIZE) .into_iter() .enumerate() - .map(|(i, chunk)| IndexerJobUpdateStep { - chunk_idx: i, - to_update: chunk.collect::>(), + .map(|(i, chunk)| { + let to_update = chunk.collect::>(); + to_update_count += to_update.len(); + + IndexerJobUpdateStep { + chunk_idx: i, + to_update, + } }) .collect::>(); + debug!("Walker at shallow indexer found {to_update_count} file_paths to be updated"); + for step in update_steps { execute_indexer_update_step(&step, library).await?; } + if to_walk_path != location_path { + reverse_update_directories_sizes(to_walk_path, location_id, location_path, library) + .await + .map_err(IndexerError::from)?; + } + invalidate_query!(library, "search.paths"); library.orphan_remover.invoke().await; diff --git a/core/src/location/indexer/walk.rs b/core/src/location/indexer/walk.rs index dee8df6c9..824e9f356 100644 --- a/core/src/location/indexer/walk.rs +++ b/core/src/location/indexer/walk.rs @@ -44,6 +44,7 @@ pub struct WalkedEntry { pub struct ToWalkEntry { path: PathBuf, parent_dir_accepted_by_its_children: Option, + maybe_parent: Option, } #[derive(Debug)] @@ -109,6 +110,7 @@ where pub to_walk: VecDeque, pub to_remove: ToRemove, pub errors: Vec, + pub paths_and_sizes: HashMap, } /// This function walks through the filesystem, applying the rules to each entry and then returning @@ -144,16 +146,18 @@ where to_walk.push_back(ToWalkEntry { path: root.to_path_buf(), parent_dir_accepted_by_its_children: None, + maybe_parent: None, }); let mut indexed_paths = HashSet::with_capacity(WALKER_PATHS_BUFFER_INITIAL_CAPACITY); let mut errors = vec![]; - let mut paths_buffer = Vec::with_capacity(WALKER_PATHS_BUFFER_INITIAL_CAPACITY); + let mut paths_buffer = HashSet::with_capacity(WALKER_PATHS_BUFFER_INITIAL_CAPACITY); + let mut paths_and_sizes = HashMap::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY); let mut to_remove = vec![]; - while let Some(ref entry) = to_walk.pop_front() { - let current_to_remove = inner_walk_single_dir( + while let Some(entry) = to_walk.pop_front() { + let (entry_size, current_to_remove) = inner_walk_single_dir( root, - entry, + &entry, indexer_rules, &mut update_notifier, &to_remove_db_fetcher, @@ -168,6 +172,14 @@ where .await; to_remove.push(current_to_remove); + // Saving the size of current entry + paths_and_sizes.insert(entry.path, entry_size); + + // Adding the size of current entry to its parent + if let Some(parent) = entry.maybe_parent { + *paths_and_sizes.entry(parent).or_default() += entry_size; + } + if indexed_paths.len() >= limit as usize { break; } @@ -181,6 +193,7 @@ where to_walk, to_remove: to_remove.into_iter().flatten(), errors, + paths_and_sizes, }) } @@ -209,10 +222,10 @@ where { let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY); let mut indexed_paths = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY); - let mut paths_buffer = Vec::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY); + let mut paths_buffer = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY); let mut errors = vec![]; - let to_remove = inner_walk_single_dir( + let (to_walk_entry_size, to_remove) = inner_walk_single_dir( to_walk_entry.path.clone(), to_walk_entry, indexer_rules, @@ -236,6 +249,16 @@ where to_walk: to_keep_walking, to_remove: to_remove.into_iter(), errors, + paths_and_sizes: [ + Some((to_walk_entry.path.clone(), to_walk_entry_size)), + to_walk_entry + .maybe_parent + .as_ref() + .map(|parent_path| (parent_path.clone(), to_walk_entry_size)), + ] + .into_iter() + .flatten() + .collect(), }) } @@ -256,6 +279,7 @@ pub(super) async fn walk_single_dir( impl Iterator, Vec, Vec, + u64, ), IndexerError, > @@ -279,14 +303,15 @@ where }); } - let mut paths_buffer = Vec::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY); + let mut paths_buffer = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY); let mut errors = vec![]; - let to_remove = inner_walk_single_dir( + let (root_size, to_remove) = inner_walk_single_dir( root, &ToWalkEntry { path: root.to_path_buf(), parent_dir_accepted_by_its_children: None, + maybe_parent: None, }, indexer_rules, &mut update_notifier, @@ -303,7 +328,7 @@ where let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?; - Ok((walked, to_update, to_remove, errors)) + Ok((walked, to_update, to_remove, errors, root_size)) } async fn filter_existing_paths( @@ -357,11 +382,30 @@ where // Datetimes stored in DB loses a bit of precision, so we need to check against a delta // instead of using != operator - if inode != metadata.inode + if (inode != metadata.inode || device != metadata.device || DateTime::::from( metadata.modified_at, ) - *date_modified - > Duration::milliseconds(1) + > Duration::milliseconds(1)) && !(entry.iso_file_path.is_dir + && metadata.size_in_bytes + != file_path + .size_in_bytes_bytes + .as_ref() + .map(|size_in_bytes_bytes| { + u64::from_be_bytes([ + size_in_bytes_bytes[0], + size_in_bytes_bytes[1], + size_in_bytes_bytes[2], + size_in_bytes_bytes[3], + size_in_bytes_bytes[4], + size_in_bytes_bytes[5], + size_in_bytes_bytes[6], + size_in_bytes_bytes[7], + ]) + }) + .unwrap_or_default()) + // We ignore the size of directories because it is not reliable, we need to + // calculate it ourselves later { to_update.push( (sd_utils::from_bytes_to_uuid(&file_path.pub_id), entry).into(), @@ -382,7 +426,7 @@ where struct WorkingTable<'a> { indexed_paths: &'a mut HashSet, - paths_buffer: &'a mut Vec, + paths_buffer: &'a mut HashSet, maybe_to_walk: Option<&'a mut VecDeque>, errors: &'a mut Vec, } @@ -392,6 +436,7 @@ async fn inner_walk_single_dir( ToWalkEntry { path, parent_dir_accepted_by_its_children, + .. }: &ToWalkEntry, indexer_rules: &[IndexerRule], update_notifier: &mut impl FnMut(&Path, usize), @@ -406,21 +451,21 @@ async fn inner_walk_single_dir( mut maybe_to_walk, errors, }: WorkingTable<'_>, -) -> Vec +) -> (u64, Vec) where ToRemoveDbFetcherFut: Future, IndexerError>>, { let Ok(iso_file_path_to_walk) = iso_file_path_factory(path, true).map_err(|e| errors.push(e)) else { - return vec![]; + return (0, vec![]); }; let Ok(mut read_dir) = fs::read_dir(path) .await .map_err(|e| errors.push(FileIOError::from((path.clone(), e)).into())) else { - return vec![]; + return (0, vec![]); }; let root = root.as_ref(); @@ -537,6 +582,7 @@ where to_walk.push_back(ToWalkEntry { path: entry.path(), parent_dir_accepted_by_its_children: accept_by_children_dir, + maybe_parent: Some(path.clone()), }); } } @@ -567,7 +613,7 @@ where continue; }; - paths_buffer.push(WalkingEntry { + paths_buffer.insert(WalkingEntry { iso_file_path, maybe_metadata: Some(metadata), }); @@ -608,7 +654,7 @@ where ancestor_iso_walking_entry.maybe_metadata = Some(metadata); - paths_buffer.push(ancestor_iso_walking_entry); + paths_buffer.insert(ancestor_iso_walking_entry); } else { // If indexed_paths contains the current ancestors, then it will contain // also all if its ancestors too, so we can stop here @@ -635,11 +681,18 @@ where vec![] }); + let mut to_walk_entry_size = 0; + // Just merging the `found_paths` with `indexed_paths` here in the end to avoid possibly // multiple rehashes during function execution - indexed_paths.extend(paths_buffer.drain(..)); + indexed_paths.extend(paths_buffer.drain().map(|walking_entry| { + if let Some(metadata) = &walking_entry.maybe_metadata { + to_walk_entry_size += metadata.size_in_bytes; + } + walking_entry + })); - to_remove + (to_walk_entry_size, to_remove) } #[cfg(test)] diff --git a/core/src/location/manager/watcher/linux.rs b/core/src/location/manager/watcher/linux.rs index 3694f8148..8c07e83b7 100644 --- a/core/src/location/manager/watcher/linux.rs +++ b/core/src/location/manager/watcher/linux.rs @@ -13,7 +13,7 @@ use crate::{ use std::{ collections::{BTreeMap, HashMap}, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, }; @@ -26,7 +26,7 @@ use tokio::{fs, time::Instant}; use tracing::{error, trace}; use super::{ - utils::{create_dir, remove, rename, update_file}, + utils::{create_dir, recalculate_directories_size, remove, rename, update_file}, EventHandler, HUNDRED_MILLIS, ONE_SECOND, }; @@ -37,11 +37,11 @@ pub(super) struct LinuxEventHandler<'lib> { node: &'lib Arc, last_events_eviction_check: Instant, rename_from: HashMap, - rename_from_buffer: Vec<(PathBuf, Instant)>, recently_renamed_from: BTreeMap, files_to_update: HashMap, - files_to_update_buffer: Vec<(PathBuf, Instant)>, reincident_to_update_files: HashMap, + to_recalculate_size: HashMap, + path_and_instant_buffer: Vec<(PathBuf, Instant)>, } #[async_trait] @@ -57,11 +57,11 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { node, last_events_eviction_check: Instant::now(), rename_from: HashMap::new(), - rename_from_buffer: Vec::new(), recently_renamed_from: BTreeMap::new(), files_to_update: HashMap::new(), - files_to_update_buffer: Vec::new(), reincident_to_update_files: HashMap::new(), + to_recalculate_size: HashMap::new(), + path_and_instant_buffer: Vec::new(), } } @@ -97,6 +97,9 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { EventKind::Create(CreateKind::Folder) => { let path = &paths[0]; + // Don't need to dispatch a recalculate directory event as `create_dir` dispatches + // a `scan_location_sub_path` function, which recalculates the size already + create_dir( self.location_id, path, @@ -135,7 +138,15 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { .insert(paths.swap_remove(0), Instant::now()); } EventKind::Remove(_) => { - remove(self.location_id, &paths[0], self.library).await?; + let path = paths.remove(0); + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } + + remove(self.location_id, &path, self.library).await?; } other_event_kind => { trace!("Other Linux event that we don't handle for now: {other_event_kind:#?}"); @@ -158,6 +169,19 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { self.recently_renamed_from .retain(|_, instant| instant.elapsed() < HUNDRED_MILLIS); + if !self.to_recalculate_size.is_empty() { + if let Err(e) = recalculate_directories_size( + &mut self.to_recalculate_size, + &mut self.path_and_instant_buffer, + self.location_id, + self.library, + ) + .await + { + error!("Failed to recalculate directories size: {e:#?}"); + } + } + self.last_events_eviction_check = Instant::now(); } } @@ -165,13 +189,19 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { impl LinuxEventHandler<'_> { async fn handle_to_update_eviction(&mut self) -> Result<(), LocationManagerError> { - self.files_to_update_buffer.clear(); + self.path_and_instant_buffer.clear(); let mut should_invalidate = false; for (path, created_at) in self.files_to_update.drain() { if created_at.elapsed() < HUNDRED_MILLIS * 5 { - self.files_to_update_buffer.push((path, created_at)); + self.path_and_instant_buffer.push((path, created_at)); } else { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } self.reincident_to_update_files.remove(&path); update_file(self.location_id, &path, self.node, self.library).await?; should_invalidate = true; @@ -179,17 +209,23 @@ impl LinuxEventHandler<'_> { } self.files_to_update - .extend(self.files_to_update_buffer.drain(..)); + .extend(self.path_and_instant_buffer.drain(..)); - self.files_to_update_buffer.clear(); + self.path_and_instant_buffer.clear(); // We have to check if we have any reincident files to update and update them after a bigger // timeout, this way we keep track of files being update frequently enough to bypass our // eviction check above for (path, created_at) in self.reincident_to_update_files.drain() { if created_at.elapsed() < ONE_SECOND * 10 { - self.files_to_update_buffer.push((path, created_at)); + self.path_and_instant_buffer.push((path, created_at)); } else { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } self.files_to_update.remove(&path); update_file(self.location_id, &path, self.node, self.library).await?; should_invalidate = true; @@ -201,22 +237,28 @@ impl LinuxEventHandler<'_> { } self.reincident_to_update_files - .extend(self.files_to_update_buffer.drain(..)); + .extend(self.path_and_instant_buffer.drain(..)); Ok(()) } async fn handle_rename_from_eviction(&mut self) -> Result<(), LocationManagerError> { - self.rename_from_buffer.clear(); + self.path_and_instant_buffer.clear(); let mut should_invalidate = false; for (path, instant) in self.rename_from.drain() { if instant.elapsed() > HUNDRED_MILLIS { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } remove(self.location_id, &path, self.library).await?; should_invalidate = true; trace!("Removed file_path due timeout: {}", path.display()); } else { - self.rename_from_buffer.push((path, instant)); + self.path_and_instant_buffer.push((path, instant)); } } @@ -224,7 +266,7 @@ impl LinuxEventHandler<'_> { invalidate_query!(self.library, "search.paths"); } - for (path, instant) in self.rename_from_buffer.drain(..) { + for (path, instant) in self.path_and_instant_buffer.drain(..) { self.rename_from.insert(path, instant); } diff --git a/core/src/location/manager/watcher/macos.rs b/core/src/location/manager/watcher/macos.rs index a9e229546..1230dcea1 100644 --- a/core/src/location/manager/watcher/macos.rs +++ b/core/src/location/manager/watcher/macos.rs @@ -23,7 +23,11 @@ use crate::{ Node, }; -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; use async_trait::async_trait; use notify::{ @@ -35,8 +39,8 @@ use tracing::{error, trace, warn}; use super::{ utils::{ - create_dir, create_dir_or_file, extract_inode_and_device_from_path, extract_location_path, - remove, rename, update_file, + create_dir, create_file, extract_inode_and_device_from_path, extract_location_path, + recalculate_directories_size, remove, rename, update_file, }, EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND, }; @@ -47,13 +51,14 @@ pub(super) struct MacOsEventHandler<'lib> { library: &'lib Arc, node: &'lib Arc, files_to_update: HashMap, - files_to_update_buffer: Vec<(PathBuf, Instant)>, reincident_to_update_files: HashMap, last_events_eviction_check: Instant, latest_created_dir: Option, old_paths_map: HashMap, new_paths_map: HashMap, paths_map_buffer: Vec<(INodeAndDevice, InstantAndPath)>, + to_recalculate_size: HashMap, + path_and_instant_buffer: Vec<(PathBuf, Instant)>, } #[async_trait] @@ -71,13 +76,14 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> { library, node, files_to_update: HashMap::new(), - files_to_update_buffer: Vec::new(), reincident_to_update_files: HashMap::new(), last_events_eviction_check: Instant::now(), latest_created_dir: None, old_paths_map: HashMap::new(), new_paths_map: HashMap::new(), paths_map_buffer: Vec::new(), + to_recalculate_size: HashMap::new(), + path_and_instant_buffer: Vec::new(), } } @@ -101,6 +107,9 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> { } } + // Don't need to dispatch a recalculate directory event as `create_dir` dispatches + // a `scan_location_sub_path` function, which recalculates the size already + create_dir( self.location_id, path, @@ -141,7 +150,14 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> { } EventKind::Remove(_) => { - remove(self.location_id, &paths[0], self.library).await?; + let path = paths.remove(0); + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } + remove(self.location_id, &path, self.library).await?; } other_event_kind => { trace!("Other MacOS event that we don't handle for now: {other_event_kind:#?}"); @@ -166,6 +182,19 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> { error!("Failed to remove file_path: {e:#?}"); } + if !self.to_recalculate_size.is_empty() { + if let Err(e) = recalculate_directories_size( + &mut self.to_recalculate_size, + &mut self.path_and_instant_buffer, + self.location_id, + self.library, + ) + .await + { + error!("Failed to recalculate directories size: {e:#?}"); + } + } + self.last_events_eviction_check = Instant::now(); } } @@ -173,13 +202,19 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> { impl MacOsEventHandler<'_> { async fn handle_to_update_eviction(&mut self) -> Result<(), LocationManagerError> { - self.files_to_update_buffer.clear(); + self.path_and_instant_buffer.clear(); let mut should_invalidate = false; for (path, created_at) in self.files_to_update.drain() { if created_at.elapsed() < HUNDRED_MILLIS * 5 { - self.files_to_update_buffer.push((path, created_at)); + self.path_and_instant_buffer.push((path, created_at)); } else { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } self.reincident_to_update_files.remove(&path); update_file(self.location_id, &path, self.node, self.library).await?; should_invalidate = true; @@ -187,17 +222,23 @@ impl MacOsEventHandler<'_> { } self.files_to_update - .extend(self.files_to_update_buffer.drain(..)); + .extend(self.path_and_instant_buffer.drain(..)); - self.files_to_update_buffer.clear(); + self.path_and_instant_buffer.clear(); // We have to check if we have any reincident files to update and update them after a bigger // timeout, this way we keep track of files being update frequently enough to bypass our // eviction check above for (path, created_at) in self.reincident_to_update_files.drain() { if created_at.elapsed() < ONE_SECOND * 10 { - self.files_to_update_buffer.push((path, created_at)); + self.path_and_instant_buffer.push((path, created_at)); } else { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } self.files_to_update.remove(&path); update_file(self.location_id, &path, self.node, self.library).await?; should_invalidate = true; @@ -209,7 +250,7 @@ impl MacOsEventHandler<'_> { } self.reincident_to_update_files - .extend(self.files_to_update_buffer.drain(..)); + .extend(self.path_and_instant_buffer.drain(..)); Ok(()) } @@ -222,7 +263,26 @@ impl MacOsEventHandler<'_> { for (inode_and_device, (instant, path)) in self.new_paths_map.drain() { if instant.elapsed() > HUNDRED_MILLIS { if !self.files_to_update.contains_key(&path) { - create_dir_or_file(self.location_id, &path, self.node, self.library).await?; + let metadata = fs::metadata(&path) + .await + .map_err(|e| FileIOError::from((&path, e)))?; + + if metadata.is_dir() { + // Don't need to dispatch a recalculate directory event as `create_dir` dispatches + // a `scan_location_sub_path` function, which recalculates the size already + create_dir(self.location_id, &path, &metadata, self.node, self.library) + .await?; + } else { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } + create_file(self.location_id, &path, &metadata, self.node, self.library) + .await?; + } + trace!("Created file_path due timeout: {}", path.display()); should_invalidate = true; } @@ -248,6 +308,12 @@ impl MacOsEventHandler<'_> { for (inode_and_device, (instant, path)) in self.old_paths_map.drain() { if instant.elapsed() > HUNDRED_MILLIS { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } remove(self.location_id, &path, self.library).await?; trace!("Removed file_path due timeout: {}", path.display()); should_invalidate = true; diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index e5b34612e..287e7f357 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -10,7 +10,9 @@ use crate::{ loose_find_existing_file_path_params, FilePathError, FilePathMetadata, IsolatedFilePathData, MetadataExt, }, - find_location, generate_thumbnail, location_with_indexer_rules, + find_location, generate_thumbnail, + indexer::reverse_update_directories_sizes, + location_with_indexer_rules, manager::LocationManagerError, scan_location_sub_path, }, @@ -38,7 +40,7 @@ use crate::location::file_path_helper::get_inode_and_device; use crate::location::file_path_helper::get_inode_and_device_from_path; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, ffi::OsStr, fs::Metadata, path::{Path, PathBuf}, @@ -57,11 +59,12 @@ use serde_json::json; use tokio::{ fs, io::{self, ErrorKind}, + time::Instant, }; use tracing::{debug, error, trace, warn}; use uuid::Uuid; -use super::INodeAndDevice; +use super::{INodeAndDevice, HUNDRED_MILLIS}; pub(super) fn check_event(event: &Event, ignore_paths: &HashSet) -> bool { // if path includes .DS_Store, .spacedrive file creation or is in the `ignore_paths` set, we ignore @@ -316,25 +319,6 @@ async fn inner_create_file( Ok(()) } -pub(super) async fn create_dir_or_file( - location_id: location::id::Type, - path: impl AsRef, - node: &Arc, - library: &Arc, -) -> Result { - let path = path.as_ref(); - let metadata = fs::metadata(path) - .await - .map_err(|e| FileIOError::from((path, e)))?; - - if metadata.is_dir() { - create_dir(location_id, path, &metadata, node, library).await - } else { - create_file(location_id, path, &metadata, node, library).await - } - .map(|_| metadata) -} - pub(super) async fn update_file( location_id: location::id::Type, full_path: impl AsRef, @@ -825,3 +809,52 @@ pub(super) async fn extract_location_path( |location| Ok(maybe_missing(location.path, "location.path")?.into()), ) } + +pub(super) async fn recalculate_directories_size( + candidates: &mut HashMap, + buffer: &mut Vec<(PathBuf, Instant)>, + location_id: location::id::Type, + library: &Library, +) -> Result<(), LocationManagerError> { + let mut location_path_cache = None; + let mut should_invalidate = false; + buffer.clear(); + + for (path, instant) in candidates.drain() { + if instant.elapsed() > HUNDRED_MILLIS * 5 { + if location_path_cache.is_none() { + location_path_cache = Some(maybe_missing( + find_location(library, location_id) + .select(location::select!({ path })) + .exec() + .await? + .ok_or(LocationManagerError::MissingLocation(location_id))? + .path, + "location.path", + )?) + } + + if let Some(location_path) = &location_path_cache { + if path != Path::new(location_path) { + trace!( + "Reverse calculating directory sizes starting at {} until {location_path}", + path.display() + ); + reverse_update_directories_sizes(path, location_id, location_path, library) + .await?; + should_invalidate = true; + } + } + } else { + buffer.push((path, instant)); + } + } + + if should_invalidate { + invalidate_query!(library, "search.paths"); + } + + candidates.extend(buffer.drain(..)); + + Ok(()) +} diff --git a/core/src/location/manager/watcher/windows.rs b/core/src/location/manager/watcher/windows.rs index 43ca2c21d..6a0bc5dec 100644 --- a/core/src/location/manager/watcher/windows.rs +++ b/core/src/location/manager/watcher/windows.rs @@ -21,7 +21,7 @@ use crate::{ use std::{ collections::{BTreeMap, HashMap}, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, }; @@ -34,7 +34,10 @@ use tokio::{fs, time::Instant}; use tracing::{error, trace}; use super::{ - utils::{create_dir, extract_inode_and_device_from_path, remove, rename, update_file}, + utils::{ + create_dir, extract_inode_and_device_from_path, recalculate_directories_size, remove, + rename, update_file, + }, EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND, }; @@ -50,8 +53,9 @@ pub(super) struct WindowsEventHandler<'lib> { files_to_remove: HashMap, files_to_remove_buffer: Vec<(INodeAndDevice, InstantAndPath)>, files_to_update: HashMap, - files_to_update_buffer: Vec<(PathBuf, Instant)>, reincident_to_update_files: HashMap, + to_recalculate_size: HashMap, + path_and_instant_buffer: Vec<(PathBuf, Instant)>, } #[async_trait] @@ -74,8 +78,9 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { files_to_remove: HashMap::new(), files_to_remove_buffer: Vec::new(), files_to_update: HashMap::new(), - files_to_update_buffer: Vec::new(), reincident_to_update_files: HashMap::new(), + to_recalculate_size: HashMap::new(), + path_and_instant_buffer: Vec::new(), } } @@ -132,6 +137,8 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { let path = paths.remove(0); if metadata.is_dir() { + // Don't need to dispatch a recalculate directory event as `create_dir` dispatches + // a `scan_location_sub_path` function, which recalculates the size already create_dir(self.location_id, path, &metadata, self.node, self.library) .await?; } else if self.files_to_update.contains_key(&path) { @@ -249,6 +256,19 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { error!("Failed to remove file_path: {e:#?}"); } + if !self.to_recalculate_size.is_empty() { + if let Err(e) = recalculate_directories_size( + &mut self.to_recalculate_size, + &mut self.path_and_instant_buffer, + self.location_id, + self.library, + ) + .await + { + error!("Failed to recalculate directories size: {e:#?}"); + } + } + self.last_events_eviction_check = Instant::now(); } } @@ -256,33 +276,47 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { impl WindowsEventHandler<'_> { async fn handle_to_update_eviction(&mut self) -> Result<(), LocationManagerError> { - self.files_to_update_buffer.clear(); + self.path_and_instant_buffer.clear(); let mut should_invalidate = false; for (path, created_at) in self.files_to_update.drain() { if created_at.elapsed() < HUNDRED_MILLIS * 5 { - self.files_to_update_buffer.push((path, created_at)); + self.path_and_instant_buffer.push((path, created_at)); } else { self.reincident_to_update_files.remove(&path); - handle_update(self.location_id, &path, self.node, self.library).await?; + handle_update( + self.location_id, + &path, + self.node, + &mut self.to_recalculate_size, + self.library, + ) + .await?; should_invalidate = true; } } self.files_to_update - .extend(self.files_to_update_buffer.drain(..)); + .extend(self.path_and_instant_buffer.drain(..)); - self.files_to_update_buffer.clear(); + self.path_and_instant_buffer.clear(); // We have to check if we have any reincident files to update and update them after a bigger // timeout, this way we keep track of files being update frequently enough to bypass our // eviction check above for (path, created_at) in self.reincident_to_update_files.drain() { if created_at.elapsed() < ONE_SECOND * 10 { - self.files_to_update_buffer.push((path, created_at)); + self.path_and_instant_buffer.push((path, created_at)); } else { self.files_to_update.remove(&path); - handle_update(self.location_id, &path, self.node, self.library).await?; + handle_update( + self.location_id, + &path, + self.node, + &mut self.to_recalculate_size, + self.library, + ) + .await?; should_invalidate = true; } } @@ -292,7 +326,7 @@ impl WindowsEventHandler<'_> { } self.reincident_to_update_files - .extend(self.files_to_update_buffer.drain(..)); + .extend(self.path_and_instant_buffer.drain(..)); Ok(()) } @@ -303,6 +337,12 @@ impl WindowsEventHandler<'_> { for (inode_and_device, (instant, path)) in self.files_to_remove.drain() { if instant.elapsed() > HUNDRED_MILLIS { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + self.to_recalculate_size + .insert(parent.to_path_buf(), Instant::now()); + } + } remove(self.location_id, &path, self.library).await?; should_invalidate = true; trace!("Removed file_path due timeout: {}", path.display()); @@ -327,12 +367,18 @@ async fn handle_update<'lib>( location_id: location::id::Type, path: &PathBuf, node: &'lib Arc, + to_recalculate_size: &mut HashMap, library: &'lib Arc, ) -> Result<(), LocationManagerError> { let metadata = fs::metadata(&path) .await .map_err(|e| FileIOError::from((&path, e)))?; if metadata.is_file() { + if let Some(parent) = path.parent() { + if parent != Path::new("") { + to_recalculate_size.insert(parent.to_path_buf(), Instant::now()); + } + } update_file(location_id, path, node, library).await?; }