[ENG-812] Folder size calculation (#1362)

* Fix warnings

* Working when doing a full location indexing

* Directory size calculation for sub_path indexing
Also for the shallow indexer
The shallow indexer now dispatches sub_path scans when it finds a new folder

* Directory size calculations in the watcher

* Pass directory sizes to sync system
This commit is contained in:
Ericson "Fogo" Soares
2023-09-21 10:11:25 -03:00
committed by GitHub
parent 5005c50e23
commit 3cd55bb551
10 changed files with 670 additions and 109 deletions

1
.gitignore vendored
View File

@@ -77,5 +77,6 @@ dev.db-journal
/core/migration_test
sd_init.json
spacedrive
.cargo/config
.github/scripts/deps

View File

@@ -80,6 +80,7 @@ file_path::select!(file_path_walker {
date_modified
inode
device
size_in_bytes_bytes
});
file_path::select!(file_path_to_handle_custom_uri {
pub_id

View File

@@ -4,6 +4,7 @@ use crate::{
CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
JobStepOutput, StatefulJob, WorkerContext,
},
library::Library,
location::{
file_path_helper::{
ensure_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
@@ -11,11 +12,17 @@ use crate::{
},
location_with_indexer_rules,
},
prisma::{file_path, location},
to_remove_db_fetcher_fn,
util::db::maybe_missing,
};
use sd_prisma::prisma_sync;
use sd_sync::*;
use sd_utils::from_bytes_to_uuid;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
path::{Path, PathBuf},
sync::Arc,
@@ -23,14 +30,15 @@ use std::{
};
use itertools::Itertools;
use prisma_client_rust::operator::or;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::info;
use tracing::{debug, info, warn};
use super::{
execute_indexer_save_step, execute_indexer_update_step, iso_file_path_factory,
remove_non_existing_file_paths,
remove_non_existing_file_paths, reverse_update_directories_sizes,
rules::IndexerRule,
walk::{keep_walking, walk, ToWalkEntry, WalkResult},
IndexerError, IndexerJobSaveStep, IndexerJobUpdateStep,
@@ -61,6 +69,7 @@ impl Hash for IndexerJobInit {
/// contains some metadata for logging purposes.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexerJobData {
location_path: PathBuf,
indexed_path: PathBuf,
indexer_rules: Vec<IndexerRule>,
}
@@ -76,6 +85,7 @@ pub struct IndexerJobRunMetadata {
indexed_count: u64,
updated_count: u64,
removed_count: u64,
paths_and_sizes: HashMap<PathBuf, u64>,
}
impl JobRunMetadata for IndexerJobRunMetadata {
@@ -88,6 +98,10 @@ impl JobRunMetadata for IndexerJobRunMetadata {
self.total_update_steps += new_data.total_update_steps;
self.indexed_count += new_data.indexed_count;
self.removed_count += new_data.removed_count;
for (path, size) in new_data.paths_and_sizes {
*self.paths_and_sizes.entry(path).or_default() += size;
}
}
}
@@ -186,6 +200,7 @@ impl StatefulJob for IndexerJobInit {
to_walk,
to_remove,
errors,
paths_and_sizes,
} = walk(
&to_walk_path,
&indexer_rules,
@@ -199,6 +214,11 @@ impl StatefulJob for IndexerJobInit {
let scan_read_time = scan_start.elapsed();
let to_remove = to_remove.collect::<Vec<_>>();
debug!(
"Walker at indexer job found {} file_paths to be removed",
to_remove.len()
);
ctx.node
.thumbnail_remover
.remove_cas_ids(
@@ -255,6 +275,8 @@ impl StatefulJob for IndexerJobInit {
.chain(to_walk.into_iter().map(IndexerJobStepInput::Walk))
.collect::<Vec<_>>();
debug!("Walker at indexer job found {total_updated_paths} file_paths to be updated");
IndexerJobData::on_scan_progress(
ctx,
vec![
@@ -268,6 +290,7 @@ impl StatefulJob for IndexerJobInit {
);
*data = Some(IndexerJobData {
location_path: location_path.to_path_buf(),
indexed_path: to_walk_path,
indexer_rules,
});
@@ -283,6 +306,7 @@ impl StatefulJob for IndexerJobInit {
removed_count,
total_save_steps: *to_save_chunks as u64,
total_update_steps: *to_update_chunks as u64,
paths_and_sizes,
},
steps,
errors
@@ -362,6 +386,7 @@ impl StatefulJob for IndexerJobInit {
to_walk,
to_remove,
errors,
paths_and_sizes,
} = keep_walking(
to_walk_entry,
&data.indexer_rules,
@@ -372,6 +397,8 @@ impl StatefulJob for IndexerJobInit {
)
.await?;
new_metadata.paths_and_sizes = paths_and_sizes;
new_metadata.scan_read_time = scan_start.elapsed();
let db_delete_time = Instant::now();
@@ -441,14 +468,18 @@ impl StatefulJob for IndexerJobInit {
async fn finalize(
&self,
ctx: &WorkerContext,
_data: &Option<Self::Data>,
data: &Option<Self::Data>,
run_metadata: &Self::RunMetadata,
) -> JobResult {
let init = self;
let indexed_path_str = data
.as_ref()
.map(|data| Ok(data.indexed_path.to_string_lossy().to_string()))
.unwrap_or_else(|| maybe_missing(&init.location.path, "location.path").cloned())?;
info!(
"Scan of {} completed in {:?}. {} new files found, \
"Scan of {indexed_path_str} completed in {:?}. {} new files found, \
indexed {} files in db, updated {} entries. db write completed in {:?}",
maybe_missing(&init.location.path, "location.path")?,
run_metadata.scan_read_time,
run_metadata.total_paths,
run_metadata.indexed_count,
@@ -465,6 +496,27 @@ impl StatefulJob for IndexerJobInit {
ctx.library.orphan_remover.invoke().await;
}
if let Some(data) = data {
update_directories_sizes(
&run_metadata.paths_and_sizes,
init.location.id,
&data.indexed_path,
&ctx.library,
)
.await?;
if data.indexed_path != data.location_path {
reverse_update_directories_sizes(
&data.indexed_path,
init.location.id,
&data.location_path,
&ctx.library,
)
.await
.map_err(IndexerError::from)?;
}
}
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}
@@ -480,3 +532,81 @@ fn update_notifier_fn(ctx: &WorkerContext) -> impl FnMut(&Path, usize) + '_ {
);
}
}
/// Persists the computed sizes for the directories in `paths_and_sizes`,
/// writing each directory's `size_in_bytes_bytes` both to the local database
/// and through the sync system (`sync.shared_update` + `db.file_path().update`).
///
/// `paths_and_sizes` maps a directory's full on-disk path to its total size in
/// bytes; `location_path` is the root path of the location `location_id`.
async fn update_directories_sizes(
paths_and_sizes: &HashMap<PathBuf, u64>,
location_id: location::id::Type,
location_path: impl AsRef<Path>,
library: &Library,
) -> Result<(), IndexerError> {
let location_path = location_path.as_ref();
let Library { db, sync, .. } = library;
// Build the DB lookups in chunks of 200 paths so each `or(...)` query stays
// bounded; each chunk becomes one `find_many` to be executed in a single
// `_batch` below.
let chunked_queries = paths_and_sizes
.keys()
.chunks(200)
.into_iter()
.map(|paths_chunk| {
paths_chunk
.into_iter()
.map(|path| {
// `true` marks the path as a directory for the isolated path data.
IsolatedFilePathData::new(location_id, location_path, path, true)
.map(file_path::WhereParam::from)
})
.collect::<Result<Vec<_>, _>>()
.map(|params| {
db.file_path()
.find_many(vec![or(params)])
.select(file_path::select!({ pub_id materialized_path name }))
})
})
.collect::<Result<Vec<_>, _>>()?;
let to_sync_and_update = db
._batch(chunked_queries)
.await?
.into_iter()
.flatten()
.filter_map(
|file_path| match (file_path.materialized_path, file_path.name) {
(Some(materialized_path), Some(name)) => {
// Rebuild the directory's full path from its DB representation:
// `materialized_path` starts with '/', so `[1..]` strips it before
// joining onto `location_path`, then the directory name is appended.
let mut directory_full_path = location_path.join(&materialized_path[1..]);
directory_full_path.push(name);
if let Some(size) = paths_and_sizes.get(&directory_full_path) {
// Sizes are stored as big-endian u64 byte vectors in the DB.
let size_bytes = size.to_be_bytes().to_vec();
Some((
sync.shared_update(
prisma_sync::file_path::SyncId {
pub_id: file_path.pub_id.clone(),
},
file_path::size_in_bytes_bytes::NAME,
json!(size_bytes.clone()),
),
db.file_path().update(
file_path::pub_id::equals(file_path.pub_id),
vec![file_path::size_in_bytes_bytes::set(Some(size_bytes))],
),
))
} else {
// The DB returned a row we never computed a size for.
warn!("Found a file_path without ancestor in the database, possible corruption");
None
}
}
_ => {
warn!(
"Found a file_path missing its materialized_path or name: <pub_id='{:#?}'>",
from_bytes_to_uuid(&file_path.pub_id)
);
None
}
},
)
.unzip::<_, _, Vec<_>, Vec<_>>();
// Sync ops and DB updates are paired by index; write them together.
sync.write_ops(db, to_sync_and_update).await?;
Ok(())
}

View File

@@ -7,16 +7,20 @@ use crate::{
},
};
use std::path::Path;
use chrono::Utc;
use rspc::ErrorCode;
use sd_prisma::prisma_sync;
use sd_sync::*;
use sd_utils::from_bytes_to_uuid;
use std::{collections::HashMap, path::Path};
use chrono::Utc;
use itertools::Itertools;
use prisma_client_rust::operator::or;
use rspc::ErrorCode;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tracing::trace;
use tracing::{trace, warn};
use super::{
file_path_helper::{file_path_pub_and_cas_ids, FilePathError, IsolatedFilePathData},
@@ -386,3 +390,137 @@ macro_rules! to_remove_db_fetcher_fn {
}
}};
}
/// Recomputes and persists the size of every ancestor directory of `base_path`
/// up to — but not including — `location_path`, by summing the sizes already
/// stored for each ancestor's direct children, then writing the totals to both
/// the local database and the sync system.
pub async fn reverse_update_directories_sizes(
base_path: impl AsRef<Path>,
location_id: location::id::Type,
location_path: impl AsRef<Path>,
library: &Library,
) -> Result<(), FilePathError> {
let base_path = base_path.as_ref();
let location_path = location_path.as_ref();
let Library { sync, db, .. } = library;
// Walk up from `base_path` toward the location root, stopping before the
// root itself; each ancestor becomes an isolated file path (marked as a
// directory) used for DB lookups below.
let ancestors = base_path
.ancestors()
.take_while(|&ancestor| ancestor != location_path)
.map(|ancestor| IsolatedFilePathData::new(location_id, location_path, ancestor, true))
.collect::<Result<Vec<_>, _>>()?;
// Fetch the ancestors' file_path rows in chunks of 200 to keep each
// `or(...)` query bounded; all chunks run in one `_batch`.
let chunked_queries = ancestors
.iter()
.chunks(200)
.into_iter()
.map(|ancestors_iso_file_paths_chunk| {
db.file_path()
.find_many(vec![or(ancestors_iso_file_paths_chunk
.into_iter()
.map(file_path::WhereParam::from)
.collect::<Vec<_>>())])
.select(file_path::select!({ pub_id materialized_path name }))
})
.collect::<Vec<_>>();
// Map each ancestor's "materialized path for children" (i.e. its
// `{materialized_path}{name}/` key) to its pub_id and a running size
// accumulator starting at 0.
let mut pub_id_by_ancestor_materialized_path = db
._batch(chunked_queries)
.await?
.into_iter()
.flatten()
.filter_map(
|file_path| match (file_path.materialized_path, file_path.name) {
(Some(materialized_path), Some(name)) => {
Some((format!("{materialized_path}{name}/"), (file_path.pub_id, 0)))
}
_ => {
warn!(
"Found a file_path missing its materialized_path or name: <pub_id='{:#?}'>",
from_bytes_to_uuid(&file_path.pub_id)
);
None
}
},
)
.collect::<HashMap<_, _>>();
// Fetch every direct child of every ancestor (children share the ancestor's
// materialized_path-for-children key) and accumulate their stored sizes into
// the matching accumulator.
db.file_path()
.find_many(vec![
file_path::location_id::equals(Some(location_id)),
file_path::materialized_path::in_vec(
ancestors
.iter()
.map(|ancestor_iso_file_path| {
ancestor_iso_file_path
.materialized_path_for_children()
.expect("each ancestor is a directory")
})
.collect(),
),
])
.select(file_path::select!({ materialized_path size_in_bytes_bytes }))
.exec()
.await?
.into_iter()
.for_each(|file_path| {
if let Some(materialized_path) = file_path.materialized_path {
if let Some((_, size)) =
pub_id_by_ancestor_materialized_path.get_mut(&materialized_path)
{
// Sizes are stored as big-endian u64 byte vectors; decode and add.
*size += file_path
.size_in_bytes_bytes
.map(|size_in_bytes_bytes| {
u64::from_be_bytes([
size_in_bytes_bytes[0],
size_in_bytes_bytes[1],
size_in_bytes_bytes[2],
size_in_bytes_bytes[3],
size_in_bytes_bytes[4],
size_in_bytes_bytes[5],
size_in_bytes_bytes[6],
size_in_bytes_bytes[7],
])
})
.unwrap_or_else(|| {
// Missing size is treated as 0 rather than failing the walk.
warn!("Got a directory missing its size in bytes");
0
});
}
} else {
warn!("Corrupt database possesing a file_path entry without materialized_path");
}
});
// Turn each accumulated ancestor size into a paired (sync op, DB update);
// `remove` consumes the map entry so every ancestor is written at most once.
let to_sync_and_update = ancestors
.into_iter()
.filter_map(|ancestor_iso_file_path| {
if let Some((pub_id, size)) = pub_id_by_ancestor_materialized_path.remove(
&ancestor_iso_file_path
.materialized_path_for_children()
.expect("each ancestor is a directory"),
) {
// Encode back to big-endian bytes, matching the stored format.
let size_bytes = size.to_be_bytes().to_vec();
Some((
sync.shared_update(
prisma_sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
file_path::size_in_bytes_bytes::NAME,
json!(size_bytes.clone()),
),
db.file_path().update(
file_path::pub_id::equals(pub_id),
vec![file_path::size_in_bytes_bytes::set(Some(size_bytes))],
),
))
} else {
warn!("Got a missing ancestor for a file_path in the database, maybe we have a corruption");
None
}
})
.unzip::<_, _, Vec<_>, Vec<_>>();
// Sync ops and DB updates are paired by index; write them together.
sync.write_ops(db, to_sync_and_update).await?;
Ok(())
}

View File

@@ -7,19 +7,25 @@ use crate::{
check_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
IsolatedFilePathData,
},
indexer::{execute_indexer_update_step, IndexerJobUpdateStep},
LocationError,
indexer::{
execute_indexer_update_step, reverse_update_directories_sizes, IndexerJobUpdateStep,
},
scan_location_sub_path,
},
to_remove_db_fetcher_fn, Node,
to_remove_db_fetcher_fn,
util::db::maybe_missing,
Node,
};
use tracing::error;
use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::Arc,
};
use futures::future::join_all;
use itertools::Itertools;
use tracing::{debug, error};
use super::{
execute_indexer_save_step, iso_file_path_factory, location_with_indexer_rules,
@@ -34,12 +40,10 @@ pub async fn shallow(
location: &location_with_indexer_rules::Data,
sub_path: &PathBuf,
node: &Arc<Node>,
library: &Library,
library: &Arc<Library>,
) -> Result<(), JobError> {
let location_id = location.id;
let Some(location_path) = location.path.as_ref().map(PathBuf::from) else {
return Err(JobError::Location(LocationError::MissingPath(location_id)));
};
let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?;
let db = library.db.clone();
@@ -60,7 +64,7 @@ pub async fn shallow(
(
!check_file_path_exists::<IndexerError>(
&IsolatedFilePathData::new(location_id, &location_path, &full_path, true)
&IsolatedFilePathData::new(location_id, location_path, &full_path, true)
.map_err(IndexerError::from)?,
&db,
)
@@ -71,19 +75,24 @@ pub async fn shallow(
(false, location_path.to_path_buf())
};
let (walked, to_update, to_remove, errors) = {
let (walked, to_update, to_remove, errors, _s) = {
walk_single_dir(
&to_walk_path,
&indexer_rules,
|_, _| {},
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, &db),
iso_file_path_factory(location_id, &location_path),
iso_file_path_factory(location_id, location_path),
add_root,
)
.await?
};
debug!(
"Walker at shallow indexer found {} file_paths to be removed",
to_remove.len()
);
node.thumbnail_remover
.remove_cas_ids(
to_remove
@@ -98,13 +107,28 @@ pub async fn shallow(
// TODO pass these uuids to sync system
remove_non_existing_file_paths(to_remove, &db).await?;
let mut new_directories_to_scan = HashSet::new();
let save_steps = walked
.chunks(BATCH_SIZE)
.into_iter()
.enumerate()
.map(|(i, chunk)| IndexerJobSaveStep {
chunk_idx: i,
walked: chunk.collect::<Vec<_>>(),
.map(|(i, chunk)| {
let walked = chunk.collect::<Vec<_>>();
walked
.iter()
.filter_map(|walked_entry| {
walked_entry.iso_file_path.materialized_path_for_children()
})
.for_each(|new_dir| {
new_directories_to_scan.insert(new_dir);
});
IndexerJobSaveStep {
chunk_idx: i,
walked,
}
})
.collect::<Vec<_>>();
@@ -112,20 +136,47 @@ pub async fn shallow(
execute_indexer_save_step(location, &step, library).await?;
}
for scan in join_all(
new_directories_to_scan
.into_iter()
.map(|sub_path| scan_location_sub_path(node, library, location.clone(), sub_path)),
)
.await
{
if let Err(e) = scan {
error!("{e}");
}
}
let mut to_update_count = 0;
let update_steps = to_update
.chunks(BATCH_SIZE)
.into_iter()
.enumerate()
.map(|(i, chunk)| IndexerJobUpdateStep {
chunk_idx: i,
to_update: chunk.collect::<Vec<_>>(),
.map(|(i, chunk)| {
let to_update = chunk.collect::<Vec<_>>();
to_update_count += to_update.len();
IndexerJobUpdateStep {
chunk_idx: i,
to_update,
}
})
.collect::<Vec<_>>();
debug!("Walker at shallow indexer found {to_update_count} file_paths to be updated");
for step in update_steps {
execute_indexer_update_step(&step, library).await?;
}
if to_walk_path != location_path {
reverse_update_directories_sizes(to_walk_path, location_id, location_path, library)
.await
.map_err(IndexerError::from)?;
}
invalidate_query!(library, "search.paths");
library.orphan_remover.invoke().await;

View File

@@ -44,6 +44,7 @@ pub struct WalkedEntry {
pub struct ToWalkEntry {
path: PathBuf,
parent_dir_accepted_by_its_children: Option<bool>,
maybe_parent: Option<PathBuf>,
}
#[derive(Debug)]
@@ -109,6 +110,7 @@ where
pub to_walk: VecDeque<ToWalkEntry>,
pub to_remove: ToRemove,
pub errors: Vec<IndexerError>,
pub paths_and_sizes: HashMap<PathBuf, u64>,
}
/// This function walks through the filesystem, applying the rules to each entry and then returning
@@ -144,16 +146,18 @@ where
to_walk.push_back(ToWalkEntry {
path: root.to_path_buf(),
parent_dir_accepted_by_its_children: None,
maybe_parent: None,
});
let mut indexed_paths = HashSet::with_capacity(WALKER_PATHS_BUFFER_INITIAL_CAPACITY);
let mut errors = vec![];
let mut paths_buffer = Vec::with_capacity(WALKER_PATHS_BUFFER_INITIAL_CAPACITY);
let mut paths_buffer = HashSet::with_capacity(WALKER_PATHS_BUFFER_INITIAL_CAPACITY);
let mut paths_and_sizes = HashMap::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY);
let mut to_remove = vec![];
while let Some(ref entry) = to_walk.pop_front() {
let current_to_remove = inner_walk_single_dir(
while let Some(entry) = to_walk.pop_front() {
let (entry_size, current_to_remove) = inner_walk_single_dir(
root,
entry,
&entry,
indexer_rules,
&mut update_notifier,
&to_remove_db_fetcher,
@@ -168,6 +172,14 @@ where
.await;
to_remove.push(current_to_remove);
// Saving the size of current entry
paths_and_sizes.insert(entry.path, entry_size);
// Adding the size of current entry to its parent
if let Some(parent) = entry.maybe_parent {
*paths_and_sizes.entry(parent).or_default() += entry_size;
}
if indexed_paths.len() >= limit as usize {
break;
}
@@ -181,6 +193,7 @@ where
to_walk,
to_remove: to_remove.into_iter().flatten(),
errors,
paths_and_sizes,
})
}
@@ -209,10 +222,10 @@ where
{
let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY);
let mut indexed_paths = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
let mut paths_buffer = Vec::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
let mut paths_buffer = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
let mut errors = vec![];
let to_remove = inner_walk_single_dir(
let (to_walk_entry_size, to_remove) = inner_walk_single_dir(
to_walk_entry.path.clone(),
to_walk_entry,
indexer_rules,
@@ -236,6 +249,16 @@ where
to_walk: to_keep_walking,
to_remove: to_remove.into_iter(),
errors,
paths_and_sizes: [
Some((to_walk_entry.path.clone(), to_walk_entry_size)),
to_walk_entry
.maybe_parent
.as_ref()
.map(|parent_path| (parent_path.clone(), to_walk_entry_size)),
]
.into_iter()
.flatten()
.collect(),
})
}
@@ -256,6 +279,7 @@ pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
impl Iterator<Item = WalkedEntry>,
Vec<file_path_pub_and_cas_ids::Data>,
Vec<IndexerError>,
u64,
),
IndexerError,
>
@@ -279,14 +303,15 @@ where
});
}
let mut paths_buffer = Vec::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
let mut paths_buffer = HashSet::with_capacity(WALK_SINGLE_DIR_PATHS_BUFFER_INITIAL_CAPACITY);
let mut errors = vec![];
let to_remove = inner_walk_single_dir(
let (root_size, to_remove) = inner_walk_single_dir(
root,
&ToWalkEntry {
path: root.to_path_buf(),
parent_dir_accepted_by_its_children: None,
maybe_parent: None,
},
indexer_rules,
&mut update_notifier,
@@ -303,7 +328,7 @@ where
let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?;
Ok((walked, to_update, to_remove, errors))
Ok((walked, to_update, to_remove, errors, root_size))
}
async fn filter_existing_paths<F>(
@@ -357,11 +382,30 @@ where
// Datetimes stored in DB loses a bit of precision, so we need to check against a delta
// instead of using != operator
if inode != metadata.inode
if (inode != metadata.inode
|| device != metadata.device || DateTime::<FixedOffset>::from(
metadata.modified_at,
) - *date_modified
> Duration::milliseconds(1)
> Duration::milliseconds(1)) && !(entry.iso_file_path.is_dir
&& metadata.size_in_bytes
!= file_path
.size_in_bytes_bytes
.as_ref()
.map(|size_in_bytes_bytes| {
u64::from_be_bytes([
size_in_bytes_bytes[0],
size_in_bytes_bytes[1],
size_in_bytes_bytes[2],
size_in_bytes_bytes[3],
size_in_bytes_bytes[4],
size_in_bytes_bytes[5],
size_in_bytes_bytes[6],
size_in_bytes_bytes[7],
])
})
.unwrap_or_default())
// We ignore the size of directories because it is not reliable, we need to
// calculate it ourselves later
{
to_update.push(
(sd_utils::from_bytes_to_uuid(&file_path.pub_id), entry).into(),
@@ -382,7 +426,7 @@ where
struct WorkingTable<'a> {
indexed_paths: &'a mut HashSet<WalkingEntry>,
paths_buffer: &'a mut Vec<WalkingEntry>,
paths_buffer: &'a mut HashSet<WalkingEntry>,
maybe_to_walk: Option<&'a mut VecDeque<ToWalkEntry>>,
errors: &'a mut Vec<IndexerError>,
}
@@ -392,6 +436,7 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
ToWalkEntry {
path,
parent_dir_accepted_by_its_children,
..
}: &ToWalkEntry,
indexer_rules: &[IndexerRule],
update_notifier: &mut impl FnMut(&Path, usize),
@@ -406,21 +451,21 @@ async fn inner_walk_single_dir<ToRemoveDbFetcherFut>(
mut maybe_to_walk,
errors,
}: WorkingTable<'_>,
) -> Vec<file_path_pub_and_cas_ids::Data>
) -> (u64, Vec<file_path_pub_and_cas_ids::Data>)
where
ToRemoveDbFetcherFut:
Future<Output = Result<Vec<file_path_pub_and_cas_ids::Data>, IndexerError>>,
{
let Ok(iso_file_path_to_walk) = iso_file_path_factory(path, true).map_err(|e| errors.push(e))
else {
return vec![];
return (0, vec![]);
};
let Ok(mut read_dir) = fs::read_dir(path)
.await
.map_err(|e| errors.push(FileIOError::from((path.clone(), e)).into()))
else {
return vec![];
return (0, vec![]);
};
let root = root.as_ref();
@@ -537,6 +582,7 @@ where
to_walk.push_back(ToWalkEntry {
path: entry.path(),
parent_dir_accepted_by_its_children: accept_by_children_dir,
maybe_parent: Some(path.clone()),
});
}
}
@@ -567,7 +613,7 @@ where
continue;
};
paths_buffer.push(WalkingEntry {
paths_buffer.insert(WalkingEntry {
iso_file_path,
maybe_metadata: Some(metadata),
});
@@ -608,7 +654,7 @@ where
ancestor_iso_walking_entry.maybe_metadata = Some(metadata);
paths_buffer.push(ancestor_iso_walking_entry);
paths_buffer.insert(ancestor_iso_walking_entry);
} else {
// If indexed_paths contains the current ancestors, then it will contain
// also all if its ancestors too, so we can stop here
@@ -635,11 +681,18 @@ where
vec![]
});
let mut to_walk_entry_size = 0;
// Just merging the `found_paths` with `indexed_paths` here in the end to avoid possibly
// multiple rehashes during function execution
indexed_paths.extend(paths_buffer.drain(..));
indexed_paths.extend(paths_buffer.drain().map(|walking_entry| {
if let Some(metadata) = &walking_entry.maybe_metadata {
to_walk_entry_size += metadata.size_in_bytes;
}
walking_entry
}));
to_remove
(to_walk_entry_size, to_remove)
}
#[cfg(test)]

View File

@@ -13,7 +13,7 @@ use crate::{
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
path::{Path, PathBuf},
sync::Arc,
};
@@ -26,7 +26,7 @@ use tokio::{fs, time::Instant};
use tracing::{error, trace};
use super::{
utils::{create_dir, remove, rename, update_file},
utils::{create_dir, recalculate_directories_size, remove, rename, update_file},
EventHandler, HUNDRED_MILLIS, ONE_SECOND,
};
@@ -37,11 +37,11 @@ pub(super) struct LinuxEventHandler<'lib> {
node: &'lib Arc<Node>,
last_events_eviction_check: Instant,
rename_from: HashMap<PathBuf, Instant>,
rename_from_buffer: Vec<(PathBuf, Instant)>,
recently_renamed_from: BTreeMap<PathBuf, Instant>,
files_to_update: HashMap<PathBuf, Instant>,
files_to_update_buffer: Vec<(PathBuf, Instant)>,
reincident_to_update_files: HashMap<PathBuf, Instant>,
to_recalculate_size: HashMap<PathBuf, Instant>,
path_and_instant_buffer: Vec<(PathBuf, Instant)>,
}
#[async_trait]
@@ -57,11 +57,11 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
node,
last_events_eviction_check: Instant::now(),
rename_from: HashMap::new(),
rename_from_buffer: Vec::new(),
recently_renamed_from: BTreeMap::new(),
files_to_update: HashMap::new(),
files_to_update_buffer: Vec::new(),
reincident_to_update_files: HashMap::new(),
to_recalculate_size: HashMap::new(),
path_and_instant_buffer: Vec::new(),
}
}
@@ -97,6 +97,9 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
EventKind::Create(CreateKind::Folder) => {
let path = &paths[0];
// Don't need to dispatch a recalculate directory event as `create_dir` dispatches
// a `scan_location_sub_path` function, which recalculates the size already
create_dir(
self.location_id,
path,
@@ -135,7 +138,15 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
.insert(paths.swap_remove(0), Instant::now());
}
EventKind::Remove(_) => {
remove(self.location_id, &paths[0], self.library).await?;
let path = paths.remove(0);
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
remove(self.location_id, &path, self.library).await?;
}
other_event_kind => {
trace!("Other Linux event that we don't handle for now: {other_event_kind:#?}");
@@ -158,6 +169,19 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
self.recently_renamed_from
.retain(|_, instant| instant.elapsed() < HUNDRED_MILLIS);
if !self.to_recalculate_size.is_empty() {
if let Err(e) = recalculate_directories_size(
&mut self.to_recalculate_size,
&mut self.path_and_instant_buffer,
self.location_id,
self.library,
)
.await
{
error!("Failed to recalculate directories size: {e:#?}");
}
}
self.last_events_eviction_check = Instant::now();
}
}
@@ -165,13 +189,19 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
impl LinuxEventHandler<'_> {
async fn handle_to_update_eviction(&mut self) -> Result<(), LocationManagerError> {
self.files_to_update_buffer.clear();
self.path_and_instant_buffer.clear();
let mut should_invalidate = false;
for (path, created_at) in self.files_to_update.drain() {
if created_at.elapsed() < HUNDRED_MILLIS * 5 {
self.files_to_update_buffer.push((path, created_at));
self.path_and_instant_buffer.push((path, created_at));
} else {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
self.reincident_to_update_files.remove(&path);
update_file(self.location_id, &path, self.node, self.library).await?;
should_invalidate = true;
@@ -179,17 +209,23 @@ impl LinuxEventHandler<'_> {
}
self.files_to_update
.extend(self.files_to_update_buffer.drain(..));
.extend(self.path_and_instant_buffer.drain(..));
self.files_to_update_buffer.clear();
self.path_and_instant_buffer.clear();
// We have to check if we have any reincident files to update and update them after a bigger
// timeout, this way we keep track of files being update frequently enough to bypass our
// eviction check above
for (path, created_at) in self.reincident_to_update_files.drain() {
if created_at.elapsed() < ONE_SECOND * 10 {
self.files_to_update_buffer.push((path, created_at));
self.path_and_instant_buffer.push((path, created_at));
} else {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
self.files_to_update.remove(&path);
update_file(self.location_id, &path, self.node, self.library).await?;
should_invalidate = true;
@@ -201,22 +237,28 @@ impl LinuxEventHandler<'_> {
}
self.reincident_to_update_files
.extend(self.files_to_update_buffer.drain(..));
.extend(self.path_and_instant_buffer.drain(..));
Ok(())
}
async fn handle_rename_from_eviction(&mut self) -> Result<(), LocationManagerError> {
self.rename_from_buffer.clear();
self.path_and_instant_buffer.clear();
let mut should_invalidate = false;
for (path, instant) in self.rename_from.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
remove(self.location_id, &path, self.library).await?;
should_invalidate = true;
trace!("Removed file_path due timeout: {}", path.display());
} else {
self.rename_from_buffer.push((path, instant));
self.path_and_instant_buffer.push((path, instant));
}
}
@@ -224,7 +266,7 @@ impl LinuxEventHandler<'_> {
invalidate_query!(self.library, "search.paths");
}
for (path, instant) in self.rename_from_buffer.drain(..) {
for (path, instant) in self.path_and_instant_buffer.drain(..) {
self.rename_from.insert(path, instant);
}

View File

@@ -23,7 +23,11 @@ use crate::{
Node,
};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use async_trait::async_trait;
use notify::{
@@ -35,8 +39,8 @@ use tracing::{error, trace, warn};
use super::{
utils::{
create_dir, create_dir_or_file, extract_inode_and_device_from_path, extract_location_path,
remove, rename, update_file,
create_dir, create_file, extract_inode_and_device_from_path, extract_location_path,
recalculate_directories_size, remove, rename, update_file,
},
EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND,
};
@@ -47,13 +51,14 @@ pub(super) struct MacOsEventHandler<'lib> {
library: &'lib Arc<Library>,
node: &'lib Arc<Node>,
files_to_update: HashMap<PathBuf, Instant>,
files_to_update_buffer: Vec<(PathBuf, Instant)>,
reincident_to_update_files: HashMap<PathBuf, Instant>,
last_events_eviction_check: Instant,
latest_created_dir: Option<PathBuf>,
old_paths_map: HashMap<INodeAndDevice, InstantAndPath>,
new_paths_map: HashMap<INodeAndDevice, InstantAndPath>,
paths_map_buffer: Vec<(INodeAndDevice, InstantAndPath)>,
to_recalculate_size: HashMap<PathBuf, Instant>,
path_and_instant_buffer: Vec<(PathBuf, Instant)>,
}
#[async_trait]
@@ -71,13 +76,14 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
library,
node,
files_to_update: HashMap::new(),
files_to_update_buffer: Vec::new(),
reincident_to_update_files: HashMap::new(),
last_events_eviction_check: Instant::now(),
latest_created_dir: None,
old_paths_map: HashMap::new(),
new_paths_map: HashMap::new(),
paths_map_buffer: Vec::new(),
to_recalculate_size: HashMap::new(),
path_and_instant_buffer: Vec::new(),
}
}
@@ -101,6 +107,9 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
}
}
// Don't need to dispatch a recalculate directory event as `create_dir` dispatches
// a `scan_location_sub_path` function, which recalculates the size already
create_dir(
self.location_id,
path,
@@ -141,7 +150,14 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
}
EventKind::Remove(_) => {
remove(self.location_id, &paths[0], self.library).await?;
let path = paths.remove(0);
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
remove(self.location_id, &path, self.library).await?;
}
other_event_kind => {
trace!("Other MacOS event that we don't handle for now: {other_event_kind:#?}");
@@ -166,6 +182,19 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
error!("Failed to remove file_path: {e:#?}");
}
if !self.to_recalculate_size.is_empty() {
if let Err(e) = recalculate_directories_size(
&mut self.to_recalculate_size,
&mut self.path_and_instant_buffer,
self.location_id,
self.library,
)
.await
{
error!("Failed to recalculate directories size: {e:#?}");
}
}
self.last_events_eviction_check = Instant::now();
}
}
@@ -173,13 +202,19 @@ impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
impl MacOsEventHandler<'_> {
async fn handle_to_update_eviction(&mut self) -> Result<(), LocationManagerError> {
self.files_to_update_buffer.clear();
self.path_and_instant_buffer.clear();
let mut should_invalidate = false;
for (path, created_at) in self.files_to_update.drain() {
if created_at.elapsed() < HUNDRED_MILLIS * 5 {
self.files_to_update_buffer.push((path, created_at));
self.path_and_instant_buffer.push((path, created_at));
} else {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
self.reincident_to_update_files.remove(&path);
update_file(self.location_id, &path, self.node, self.library).await?;
should_invalidate = true;
@@ -187,17 +222,23 @@ impl MacOsEventHandler<'_> {
}
self.files_to_update
.extend(self.files_to_update_buffer.drain(..));
.extend(self.path_and_instant_buffer.drain(..));
self.files_to_update_buffer.clear();
self.path_and_instant_buffer.clear();
// We have to check if we have any reincident files to update and update them after a bigger
// timeout, this way we keep track of files being update frequently enough to bypass our
// eviction check above
for (path, created_at) in self.reincident_to_update_files.drain() {
if created_at.elapsed() < ONE_SECOND * 10 {
self.files_to_update_buffer.push((path, created_at));
self.path_and_instant_buffer.push((path, created_at));
} else {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
self.files_to_update.remove(&path);
update_file(self.location_id, &path, self.node, self.library).await?;
should_invalidate = true;
@@ -209,7 +250,7 @@ impl MacOsEventHandler<'_> {
}
self.reincident_to_update_files
.extend(self.files_to_update_buffer.drain(..));
.extend(self.path_and_instant_buffer.drain(..));
Ok(())
}
@@ -222,7 +263,26 @@ impl MacOsEventHandler<'_> {
for (inode_and_device, (instant, path)) in self.new_paths_map.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if !self.files_to_update.contains_key(&path) {
create_dir_or_file(self.location_id, &path, self.node, self.library).await?;
let metadata = fs::metadata(&path)
.await
.map_err(|e| FileIOError::from((&path, e)))?;
if metadata.is_dir() {
// Don't need to dispatch a recalculate directory event as `create_dir` dispatches
// a `scan_location_sub_path` function, which recalculates the size already
create_dir(self.location_id, &path, &metadata, self.node, self.library)
.await?;
} else {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
create_file(self.location_id, &path, &metadata, self.node, self.library)
.await?;
}
trace!("Created file_path due timeout: {}", path.display());
should_invalidate = true;
}
@@ -248,6 +308,12 @@ impl MacOsEventHandler<'_> {
for (inode_and_device, (instant, path)) in self.old_paths_map.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
remove(self.location_id, &path, self.library).await?;
trace!("Removed file_path due timeout: {}", path.display());
should_invalidate = true;

View File

@@ -10,7 +10,9 @@ use crate::{
loose_find_existing_file_path_params, FilePathError, FilePathMetadata,
IsolatedFilePathData, MetadataExt,
},
find_location, generate_thumbnail, location_with_indexer_rules,
find_location, generate_thumbnail,
indexer::reverse_update_directories_sizes,
location_with_indexer_rules,
manager::LocationManagerError,
scan_location_sub_path,
},
@@ -38,7 +40,7 @@ use crate::location::file_path_helper::get_inode_and_device;
use crate::location::file_path_helper::get_inode_and_device_from_path;
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
ffi::OsStr,
fs::Metadata,
path::{Path, PathBuf},
@@ -57,11 +59,12 @@ use serde_json::json;
use tokio::{
fs,
io::{self, ErrorKind},
time::Instant,
};
use tracing::{debug, error, trace, warn};
use uuid::Uuid;
use super::INodeAndDevice;
use super::{INodeAndDevice, HUNDRED_MILLIS};
pub(super) fn check_event(event: &Event, ignore_paths: &HashSet<PathBuf>) -> bool {
// if path includes .DS_Store, .spacedrive file creation or is in the `ignore_paths` set, we ignore
@@ -316,25 +319,6 @@ async fn inner_create_file(
Ok(())
}
/// Inspects `path` on disk and registers it with the location as either a
/// directory or a regular file, depending on its metadata.
///
/// Returns the fetched [`Metadata`] so callers can reuse it without a second
/// `fs::metadata` syscall.
///
/// # Errors
/// Fails if the metadata lookup fails (wrapped as a `FileIOError`) or if the
/// underlying `create_dir`/`create_file` call fails.
pub(super) async fn create_dir_or_file(
	location_id: location::id::Type,
	path: impl AsRef<Path>,
	node: &Arc<Node>,
	library: &Arc<Library>,
) -> Result<Metadata, LocationManagerError> {
	let path = path.as_ref();

	let metadata = fs::metadata(path)
		.await
		.map_err(|e| FileIOError::from((path, e)))?;

	// Branch on the filesystem entry kind; both branches share the same
	// error type, so a plain `?` keeps the flow linear.
	if metadata.is_dir() {
		create_dir(location_id, path, &metadata, node, library).await?;
	} else {
		create_file(location_id, path, &metadata, node, library).await?;
	}

	Ok(metadata)
}
pub(super) async fn update_file(
location_id: location::id::Type,
full_path: impl AsRef<Path>,
@@ -825,3 +809,52 @@ pub(super) async fn extract_location_path(
|location| Ok(maybe_missing(location.path, "location.path")?.into()),
)
}
/// Walks the pending `candidates` map and, for every directory whose entry has
/// been sitting longer than the debounce window (5 × `HUNDRED_MILLIS`),
/// recomputes directory sizes from that path up to the location root via
/// [`reverse_update_directories_sizes`].
///
/// Entries that are still within the debounce window are parked in `buffer`
/// and put back into `candidates` at the end, so they get re-checked on the
/// next eviction pass. The location's root path is fetched from the database
/// lazily — at most once per call — and only if at least one candidate is due.
///
/// Invalidates the `search.paths` query when any size was actually updated.
///
/// # Errors
/// Propagates database errors, a missing location
/// (`LocationManagerError::MissingLocation`), and any failure from the
/// reverse size recalculation.
pub(super) async fn recalculate_directories_size(
	candidates: &mut HashMap<PathBuf, Instant>,
	buffer: &mut Vec<(PathBuf, Instant)>,
	location_id: location::id::Type,
	library: &Library,
) -> Result<(), LocationManagerError> {
	buffer.clear();

	// Lazily-fetched location root path; `None` until the first due candidate.
	let mut maybe_location_path = None;
	let mut needs_invalidate = false;

	for (path, instant) in candidates.drain() {
		if instant.elapsed() <= HUNDRED_MILLIS * 5 {
			// Too fresh — keep it around for a later pass.
			buffer.push((path, instant));
			continue;
		}

		if maybe_location_path.is_none() {
			maybe_location_path = Some(maybe_missing(
				find_location(library, location_id)
					.select(location::select!({ path }))
					.exec()
					.await?
					.ok_or(LocationManagerError::MissingLocation(location_id))?
					.path,
				"location.path",
			)?);
		}

		if let Some(location_path) = &maybe_location_path {
			// The location root itself has no parent chain to walk up.
			if path != Path::new(location_path) {
				trace!(
					"Reverse calculating directory sizes starting at {} until {location_path}",
					path.display()
				);
				reverse_update_directories_sizes(path, location_id, location_path, library)
					.await?;
				needs_invalidate = true;
			}
		}
	}

	if needs_invalidate {
		invalidate_query!(library, "search.paths");
	}

	// Return the not-yet-due entries to the candidates map.
	candidates.extend(buffer.drain(..));

	Ok(())
}

View File

@@ -21,7 +21,7 @@ use crate::{
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
path::{Path, PathBuf},
sync::Arc,
};
@@ -34,7 +34,10 @@ use tokio::{fs, time::Instant};
use tracing::{error, trace};
use super::{
utils::{create_dir, extract_inode_and_device_from_path, remove, rename, update_file},
utils::{
create_dir, extract_inode_and_device_from_path, recalculate_directories_size, remove,
rename, update_file,
},
EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND,
};
@@ -50,8 +53,9 @@ pub(super) struct WindowsEventHandler<'lib> {
files_to_remove: HashMap<INodeAndDevice, InstantAndPath>,
files_to_remove_buffer: Vec<(INodeAndDevice, InstantAndPath)>,
files_to_update: HashMap<PathBuf, Instant>,
files_to_update_buffer: Vec<(PathBuf, Instant)>,
reincident_to_update_files: HashMap<PathBuf, Instant>,
to_recalculate_size: HashMap<PathBuf, Instant>,
path_and_instant_buffer: Vec<(PathBuf, Instant)>,
}
#[async_trait]
@@ -74,8 +78,9 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
files_to_remove: HashMap::new(),
files_to_remove_buffer: Vec::new(),
files_to_update: HashMap::new(),
files_to_update_buffer: Vec::new(),
reincident_to_update_files: HashMap::new(),
to_recalculate_size: HashMap::new(),
path_and_instant_buffer: Vec::new(),
}
}
@@ -132,6 +137,8 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
let path = paths.remove(0);
if metadata.is_dir() {
// Don't need to dispatch a recalculate directory event as `create_dir` dispatches
// a `scan_location_sub_path` function, which recalculates the size already
create_dir(self.location_id, path, &metadata, self.node, self.library)
.await?;
} else if self.files_to_update.contains_key(&path) {
@@ -249,6 +256,19 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
error!("Failed to remove file_path: {e:#?}");
}
if !self.to_recalculate_size.is_empty() {
if let Err(e) = recalculate_directories_size(
&mut self.to_recalculate_size,
&mut self.path_and_instant_buffer,
self.location_id,
self.library,
)
.await
{
error!("Failed to recalculate directories size: {e:#?}");
}
}
self.last_events_eviction_check = Instant::now();
}
}
@@ -256,33 +276,47 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
impl WindowsEventHandler<'_> {
async fn handle_to_update_eviction(&mut self) -> Result<(), LocationManagerError> {
self.files_to_update_buffer.clear();
self.path_and_instant_buffer.clear();
let mut should_invalidate = false;
for (path, created_at) in self.files_to_update.drain() {
if created_at.elapsed() < HUNDRED_MILLIS * 5 {
self.files_to_update_buffer.push((path, created_at));
self.path_and_instant_buffer.push((path, created_at));
} else {
self.reincident_to_update_files.remove(&path);
handle_update(self.location_id, &path, self.node, self.library).await?;
handle_update(
self.location_id,
&path,
self.node,
&mut self.to_recalculate_size,
self.library,
)
.await?;
should_invalidate = true;
}
}
self.files_to_update
.extend(self.files_to_update_buffer.drain(..));
.extend(self.path_and_instant_buffer.drain(..));
self.files_to_update_buffer.clear();
self.path_and_instant_buffer.clear();
		// We have to check if we have any reincident files to update and update them after a bigger
		// timeout; this way we keep track of files being updated frequently enough to bypass our
		// eviction check above
for (path, created_at) in self.reincident_to_update_files.drain() {
if created_at.elapsed() < ONE_SECOND * 10 {
self.files_to_update_buffer.push((path, created_at));
self.path_and_instant_buffer.push((path, created_at));
} else {
self.files_to_update.remove(&path);
handle_update(self.location_id, &path, self.node, self.library).await?;
handle_update(
self.location_id,
&path,
self.node,
&mut self.to_recalculate_size,
self.library,
)
.await?;
should_invalidate = true;
}
}
@@ -292,7 +326,7 @@ impl WindowsEventHandler<'_> {
}
self.reincident_to_update_files
.extend(self.files_to_update_buffer.drain(..));
.extend(self.path_and_instant_buffer.drain(..));
Ok(())
}
@@ -303,6 +337,12 @@ impl WindowsEventHandler<'_> {
for (inode_and_device, (instant, path)) in self.files_to_remove.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
self.to_recalculate_size
.insert(parent.to_path_buf(), Instant::now());
}
}
remove(self.location_id, &path, self.library).await?;
should_invalidate = true;
trace!("Removed file_path due timeout: {}", path.display());
@@ -327,12 +367,18 @@ async fn handle_update<'lib>(
location_id: location::id::Type,
path: &PathBuf,
node: &'lib Arc<Node>,
to_recalculate_size: &mut HashMap<PathBuf, Instant>,
library: &'lib Arc<Library>,
) -> Result<(), LocationManagerError> {
let metadata = fs::metadata(&path)
.await
.map_err(|e| FileIOError::from((&path, e)))?;
if metadata.is_file() {
if let Some(parent) = path.parent() {
if parent != Path::new("") {
to_recalculate_size.insert(parent.to_path_buf(), Instant::now());
}
}
update_file(location_id, path, node, library).await?;
}