From 7cd00dab49ef405fb5679bd8bb472ff4abdb53cb Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Tue, 11 Jul 2023 12:14:11 -0300 Subject: [PATCH] [ENG-884] Update file_path metadata (#1077) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Partitioned paths to be updated on walker * Updating file_paths in indexer * Properly tracking modified date on renames --------- Co-authored-by: VĂ­tor Vasconcellos --- core/src/api/locations.rs | 27 ++-- .../isolated_file_path_data.rs | 5 +- core/src/location/file_path_helper/mod.rs | 16 ++- core/src/location/indexer/indexer_job.rs | 108 ++++++++++++-- core/src/location/indexer/mod.rs | 108 +++++++++++++- core/src/location/indexer/shallow.rs | 35 +++-- core/src/location/indexer/walk.rs | 135 ++++++++++++++---- core/src/location/manager/mod.rs | 5 - core/src/location/manager/watcher/linux.rs | 13 +- core/src/location/manager/watcher/macos.rs | 13 +- core/src/location/manager/watcher/utils.rs | 41 +++--- core/src/location/manager/watcher/windows.rs | 33 ++++- core/src/util/db.rs | 20 +++ 13 files changed, 441 insertions(+), 118 deletions(-) diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index 47da8048c..e1b73978b 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -14,7 +14,6 @@ use std::path::PathBuf; use rspc::{self, alpha::AlphaRouter, ErrorCode}; use serde::{Deserialize, Serialize}; use specta::Type; -use tracing::info; use super::{utils::library, Ctx, R}; @@ -141,28 +140,20 @@ pub(crate) fn mount() -> AlphaRouter { reidentify_objects, }| async move { if reidentify_objects { - let object_ids = library + library .db .file_path() - .find_many(vec![ - file_path::location_id::equals(Some(location_id)), - file_path::object_id::not(None), - ]) - .select(file_path::select!({ object_id })) - .exec() - .await? - .into_iter() - .filter_map(|file_path| file_path.object_id) - .collect::>(); - - let count = library - .db - .object() - .delete_many(vec![object::id::in_vec(object_ids)]) + .update_many( + vec![ + file_path::location_id::equals(Some(location_id)), + file_path::object_id::not(None), + ], + vec![file_path::object::disconnect()], + ) .exec() .await?; - info!("Deleted {count} objects, to be reidentified"); + library.orphan_remover.invoke().await; } // rescan location diff --git a/core/src/location/file_path_helper/isolated_file_path_data.rs b/core/src/location/file_path_helper/isolated_file_path_data.rs index 95b0141a6..b14c39e9c 100644 --- a/core/src/location/file_path_helper/isolated_file_path_data.rs +++ b/core/src/location/file_path_helper/isolated_file_path_data.rs @@ -16,12 +16,12 @@ use serde::{Deserialize, Serialize}; use super::{ file_path_for_file_identifier, file_path_for_object_validator, file_path_for_thumbnailer, file_path_to_full_path, file_path_to_handle_custom_uri, file_path_to_isolate, - file_path_to_isolate_with_id, file_path_with_object, FilePathError, + file_path_to_isolate_with_id, file_path_walker, file_path_with_object, FilePathError, }; static FORBIDDEN_FILE_NAMES: OnceLock = OnceLock::new(); -#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Hash, Eq, PartialEq)] #[non_exhaustive] pub struct IsolatedFilePathData<'a> { // WARN! These fields MUST NOT be changed outside the location module, that's why they have this visibility @@ -446,6 +446,7 @@ mod macros { impl_from_db!( file_path, file_path_to_isolate, + file_path_walker, file_path_to_isolate_with_id, file_path_with_object ); diff --git a/core/src/location/file_path_helper/mod.rs b/core/src/location/file_path_helper/mod.rs index a59b420ae..376f7e871 100644 --- a/core/src/location/file_path_helper/mod.rs +++ b/core/src/location/file_path_helper/mod.rs @@ -68,6 +68,17 @@ file_path::select!(file_path_to_isolate_with_id { name extension }); +file_path::select!(file_path_walker { + pub_id + location_id + materialized_path + is_dir + name + extension + date_modified + inode + device +}); file_path::select!(file_path_to_handle_custom_uri { materialized_path is_dir @@ -170,6 +181,7 @@ pub async fn create_file_path( cas_id: Option, metadata: FilePathMetadata, ) -> Result { + use crate::util::db::{device_to_db, inode_to_db}; use crate::{sync, util::db::uuid_to_bytes}; use sd_prisma::prisma; @@ -228,8 +240,8 @@ pub async fn create_file_path( materialized_path::set(Some(materialized_path.into_owned())), name::set(Some(name.into_owned())), extension::set(Some(extension.into_owned())), - inode::set(Some(metadata.inode.to_le_bytes().into())), - device::set(Some(metadata.device.to_le_bytes().into())), + inode::set(Some(inode_to_db(metadata.inode))), + device::set(Some(device_to_db(metadata.device))), cas_id::set(cas_id), is_dir::set(Some(is_dir)), size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())), diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 30717e47d..308e01781 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -29,10 +29,11 @@ use tokio::time::Instant; use tracing::info; use super::{ - execute_indexer_save_step, iso_file_path_factory, remove_non_existing_file_paths, + execute_indexer_save_step, execute_indexer_update_step, iso_file_path_factory, + remove_non_existing_file_paths, rules::IndexerRule, walk::{keep_walking, walk, ToWalkEntry, WalkResult}, - IndexerError, IndexerJobSaveStep, + IndexerError, IndexerJobSaveStep, IndexerJobUpdateStep, }; /// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database. @@ -69,8 +70,11 @@ pub struct IndexerJobRunMetadata { db_write_time: Duration, scan_read_time: Duration, total_paths: u64, + total_updated_paths: u64, total_save_steps: u64, + total_update_steps: u64, indexed_count: u64, + updated_count: u64, removed_count: u64, } @@ -79,7 +83,9 @@ impl JobRunMetadata for IndexerJobRunMetadata { self.db_write_time += new_data.db_write_time; self.scan_read_time += new_data.scan_read_time; self.total_paths += new_data.total_paths; + self.total_updated_paths += new_data.total_updated_paths; self.total_save_steps += new_data.total_save_steps; + self.total_update_steps += new_data.total_update_steps; self.indexed_count += new_data.indexed_count; self.removed_count += new_data.removed_count; } @@ -89,6 +95,7 @@ impl JobRunMetadata for IndexerJobRunMetadata { pub enum ScanProgress { ChunkCount(usize), SavedChunks(usize), + UpdatedChunks(usize), Message(String), } @@ -99,7 +106,9 @@ impl IndexerJobData { .into_iter() .map(|p| match p { ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c), - ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p), + ScanProgress::SavedChunks(p) | ScanProgress::UpdatedChunks(p) => { + JobReportUpdate::CompletedTaskCount(p) + } ScanProgress::Message(m) => JobReportUpdate::Message(m), }) .collect(), @@ -110,9 +119,9 @@ impl IndexerJobData { /// `IndexerJobStepInput` defines the action that should be executed in the current step #[derive(Serialize, Deserialize, Debug)] pub enum IndexerJobStepInput { - /// `IndexerJobStepEntry`. The size of this vector is given by the [`BATCH_SIZE`] constant. Save(IndexerJobSaveStep), Walk(ToWalkEntry), + Update(IndexerJobUpdateStep), } /// A `IndexerJob` is a stateful job that walks a directory and indexes all files. @@ -172,6 +181,7 @@ impl StatefulJob for IndexerJobInit { let scan_start = Instant::now(); let WalkResult { walked, + to_update, to_walk, to_remove, errors, @@ -192,8 +202,11 @@ impl StatefulJob for IndexerJobInit { let removed_count = remove_non_existing_file_paths(to_remove, &db).await?; let db_delete_time = db_delete_start.elapsed(); - let total_paths = &mut 0; + let total_new_paths = &mut 0; + let total_updated_paths = &mut 0; let to_walk_count = to_walk.len(); + let to_save_chunks = &mut 0; + let to_update_chunks = &mut 0; let steps = walked .chunks(BATCH_SIZE) @@ -202,22 +215,41 @@ impl StatefulJob for IndexerJobInit { .map(|(i, chunk)| { let chunk_steps = chunk.collect::>(); - *total_paths += chunk_steps.len() as u64; + *total_new_paths += chunk_steps.len() as u64; + *to_save_chunks += 1; IndexerJobStepInput::Save(IndexerJobSaveStep { chunk_idx: i, walked: chunk_steps, }) }) + .chain( + to_update + .chunks(BATCH_SIZE) + .into_iter() + .enumerate() + .map(|(i, chunk)| { + let chunk_updates = chunk.collect::>(); + + *total_updated_paths += chunk_updates.len() as u64; + *to_update_chunks += 1; + + IndexerJobStepInput::Update(IndexerJobUpdateStep { + chunk_idx: i, + to_update: chunk_updates, + }) + }), + ) .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)) .collect::>(); IndexerJobData::on_scan_progress( ctx, vec![ - ScanProgress::ChunkCount(steps.len() - to_walk_count), + ScanProgress::ChunkCount(*to_save_chunks + *to_update_chunks), ScanProgress::Message(format!( - "Starting saving {total_paths} files or directories, \ + "Starting saving {total_new_paths} files or directories, \ + {total_updated_paths} files or directories to update, \ there still {to_walk_count} directories to index", )), ], @@ -232,10 +264,13 @@ impl StatefulJob for IndexerJobInit { IndexerJobRunMetadata { db_write_time: db_delete_time, scan_read_time, - total_paths: *total_paths, + total_paths: *total_new_paths, + total_updated_paths: *total_updated_paths, indexed_count: 0, + updated_count: 0, removed_count, - total_save_steps: steps.len() as u64 - to_walk_count as u64, + total_save_steps: *to_save_chunks as u64, + total_update_steps: *to_update_chunks as u64, }, steps, errors @@ -272,14 +307,34 @@ impl StatefulJob for IndexerJobInit { ], ); - let count = - execute_indexer_save_step(&init.location, step, &ctx.library.clone()).await?; + let count = execute_indexer_save_step(&init.location, step, &ctx.library).await?; new_metadata.indexed_count = count as u64; new_metadata.db_write_time = start_time.elapsed(); Ok(new_metadata.into()) } + IndexerJobStepInput::Update(to_update) => { + let start_time = Instant::now(); + IndexerJobData::on_scan_progress( + ctx, + vec![ + ScanProgress::UpdatedChunks(to_update.chunk_idx + 1), + ScanProgress::Message(format!( + "Updating chunk {} of {} to database", + to_update.chunk_idx, run_metadata.total_save_steps + )), + ], + ); + + let count = execute_indexer_update_step(to_update, &ctx.library).await?; + + new_metadata.updated_count = count as u64; + new_metadata.db_write_time = start_time.elapsed(); + + Ok(new_metadata.into()) + } + IndexerJobStepInput::Walk(to_walk_entry) => { let location_id = init.location.id; let location_path = @@ -291,6 +346,7 @@ impl StatefulJob for IndexerJobInit { let WalkResult { walked, + to_update, to_walk, to_remove, errors, @@ -320,12 +376,25 @@ impl StatefulJob for IndexerJobInit { .map(|(i, chunk)| { let chunk_steps = chunk.collect::>(); new_metadata.total_paths += chunk_steps.len() as u64; + new_metadata.total_save_steps += 1; IndexerJobStepInput::Save(IndexerJobSaveStep { chunk_idx: i, walked: chunk_steps, }) }) + .chain(to_update.chunks(BATCH_SIZE).into_iter().enumerate().map( + |(i, chunk)| { + let chunk_updates = chunk.collect::>(); + new_metadata.total_updated_paths += chunk_updates.len() as u64; + new_metadata.total_update_steps += 1; + + IndexerJobStepInput::Update(IndexerJobUpdateStep { + chunk_idx: i, + to_update: chunk_updates, + }) + }, + )) .chain(to_walk.into_iter().map(IndexerJobStepInput::Walk)) .collect::>(); @@ -334,8 +403,11 @@ impl StatefulJob for IndexerJobInit { vec![ ScanProgress::ChunkCount(more_steps.len() - to_walk_count), ScanProgress::Message(format!( - "Scanned more {} files or directories; {} more directories to scan", - new_metadata.total_paths, to_walk_count + "Scanned more {} files or directories; \ + {} more directories to scan and more {} entries to update", + new_metadata.total_paths, + to_walk_count, + new_metadata.total_updated_paths )), ], ); @@ -363,11 +435,12 @@ impl StatefulJob for IndexerJobInit { let init = self; info!( "Scan of {} completed in {:?}. {} new files found, \ - indexed {} files in db. db write completed in {:?}", + indexed {} files in db, updated {} entries. db write completed in {:?}", maybe_missing(&init.location.path, "location.path")?, run_metadata.scan_read_time, run_metadata.total_paths, run_metadata.indexed_count, + run_metadata.total_updated_paths, run_metadata.db_write_time, ); @@ -375,6 +448,11 @@ impl StatefulJob for IndexerJobInit { invalidate_query!(ctx.library, "search.paths"); } + if run_metadata.total_updated_paths > 0 { + // Invoking orphan remover here as we probably have some orphans objects due to updates + ctx.library.orphan_remover.invoke().await; + } + Ok(Some(json!({"init: ": init, "run_metadata": run_metadata}))) } } diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 588b0ba46..c41225c46 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -2,7 +2,10 @@ use crate::{ library::Library, prisma::{file_path, location, PrismaClient}, sync, - util::{db::uuid_to_bytes, error::FileIOError}, + util::{ + db::{device_to_db, inode_to_db, uuid_to_bytes}, + error::FileIOError, + }, }; use std::path::Path; @@ -37,6 +40,12 @@ pub struct IndexerJobSaveStep { walked: Vec, } +#[derive(Serialize, Deserialize, Debug)] +pub struct IndexerJobUpdateStep { + chunk_idx: usize, + to_update: Vec, +} + /// Error type for the indexer module #[derive(Error, Debug)] pub enum IndexerError { @@ -127,11 +136,11 @@ async fn execute_indexer_save_step( ), ( (inode::NAME, json!(entry.metadata.inode.to_le_bytes())), - inode::set(Some(entry.metadata.inode.to_le_bytes().into())), + inode::set(Some(inode_to_db(entry.metadata.inode))), ), ( (device::NAME, json!(entry.metadata.device.to_le_bytes())), - device::set(Some(entry.metadata.device.to_le_bytes().into())), + device::set(Some(device_to_db(entry.metadata.device))), ), ( (date_created::NAME, json!(entry.metadata.created_at)), @@ -176,6 +185,91 @@ async fn execute_indexer_save_step( Ok(count) } +async fn execute_indexer_update_step( + update_step: &IndexerJobUpdateStep, + library: &Library, +) -> Result { + let Library { sync, db, .. } = &library; + + let (sync_stuff, paths_to_update): (Vec<_>, Vec<_>) = update_step + .to_update + .iter() + .map(|entry| { + let IsolatedFilePathData { is_dir, .. } = &entry.iso_file_path; + + use file_path::*; + + let pub_id = uuid_to_bytes(entry.pub_id); + + let (sync_params, db_params): (Vec<_>, Vec<_>) = [ + // As this file was updated while Spacedrive was offline, we mark the object_id as null + // So this file_path will be updated at file identifier job + ( + (object_id::NAME, serde_json::Value::Null), + object::disconnect(), + ), + ((is_dir::NAME, json!(*is_dir)), is_dir::set(Some(*is_dir))), + ( + ( + size_in_bytes_bytes::NAME, + json!(entry.metadata.size_in_bytes.to_be_bytes().to_vec()), + ), + size_in_bytes_bytes::set(Some( + entry.metadata.size_in_bytes.to_be_bytes().to_vec(), + )), + ), + ( + (inode::NAME, json!(entry.metadata.inode.to_le_bytes())), + inode::set(Some(inode_to_db(entry.metadata.inode))), + ), + ( + (device::NAME, json!(entry.metadata.device.to_le_bytes())), + device::set(Some(device_to_db(entry.metadata.device))), + ), + ( + (date_created::NAME, json!(entry.metadata.created_at)), + date_created::set(Some(entry.metadata.created_at.into())), + ), + ( + (date_modified::NAME, json!(entry.metadata.modified_at)), + date_modified::set(Some(entry.metadata.modified_at.into())), + ), + ] + .into_iter() + .unzip(); + + ( + sync_params + .into_iter() + .map(|(field, value)| { + sync.shared_update( + sync::file_path::SyncId { + pub_id: pub_id.clone(), + }, + field, + value, + ) + }) + .collect::>(), + db.file_path() + .update(file_path::pub_id::equals(pub_id), db_params) + .select(file_path::select!({ id })), + ) + }) + .unzip(); + + let updated = sync + .write_ops( + db, + (sync_stuff.into_iter().flatten().collect(), paths_to_update), + ) + .await?; + + trace!("Updated {updated:?} records"); + + Ok(updated.len() as i64) +} + fn iso_file_path_factory( location_id: location::id::Type, location_path: &Path, @@ -200,7 +294,7 @@ async fn remove_non_existing_file_paths( } // TODO: Change this macro to a fn when we're able to return -// `impl Fn(Vec) -> impl Future, IndexerError>>` +// `impl Fn(Vec) -> impl Future, IndexerError>>` // Maybe when TAITs arrive #[macro_export] macro_rules! file_paths_db_fetcher_fn { @@ -216,8 +310,10 @@ macro_rules! file_paths_db_fetcher_fn { .into_iter() .map(|founds| { $db.file_path() - .find_many(founds.collect::>()) - .select($crate::location::file_path_helper::file_path_to_isolate::select()) + .find_many(vec![::prisma_client_rust::operator::or( + founds.collect::>(), + )]) + .select($crate::location::file_path_helper::file_path_walker::select()) }) .collect::>(); diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index df3e86537..98ecbb560 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -7,6 +7,7 @@ use crate::{ check_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location, IsolatedFilePathData, }, + indexer::{execute_indexer_update_step, IndexerJobUpdateStep}, LocationError, }, to_remove_db_fetcher_fn, @@ -66,7 +67,7 @@ pub async fn shallow( (false, location_path.to_path_buf()) }; - let (walked, to_remove, errors) = { + let (walked, to_update, to_remove, errors) = { walk_single_dir( &to_walk_path, &indexer_rules, @@ -84,28 +85,34 @@ pub async fn shallow( // TODO pass these uuids to sync system remove_non_existing_file_paths(to_remove, &db).await?; - let total_paths = &mut 0; - - let steps = walked + let save_steps = walked .chunks(BATCH_SIZE) .into_iter() .enumerate() - .map(|(i, chunk)| { - let chunk_steps = chunk.collect::>(); - - *total_paths += chunk_steps.len() as u64; - - IndexerJobSaveStep { - chunk_idx: i, - walked: chunk_steps, - } + .map(|(i, chunk)| IndexerJobSaveStep { + chunk_idx: i, + walked: chunk.collect::>(), }) .collect::>(); - for step in steps { + for step in save_steps { execute_indexer_save_step(location, &step, library).await?; } + let update_steps = to_update + .chunks(BATCH_SIZE) + .into_iter() + .enumerate() + .map(|(i, chunk)| IndexerJobUpdateStep { + chunk_idx: i, + to_update: chunk.collect::>(), + }) + .collect::>(); + + for step in update_steps { + execute_indexer_update_step(&step, library).await?; + } + invalidate_query!(library, "search.paths"); library.orphan_remover.invoke().await; diff --git a/core/src/location/indexer/walk.rs b/core/src/location/indexer/walk.rs index 753cb73a6..cd64a98c9 100644 --- a/core/src/location/indexer/walk.rs +++ b/core/src/location/indexer/walk.rs @@ -1,10 +1,13 @@ use crate::{ location::file_path_helper::{ - file_path_just_pub_id, file_path_to_isolate, FilePathMetadata, IsolatedFilePathData, + file_path_just_pub_id, file_path_walker, FilePathMetadata, IsolatedFilePathData, MetadataExt, }, prisma::file_path, - util::error::FileIOError, + util::{ + db::{device_from_db, from_bytes_to_uuid, inode_from_db}, + error::FileIOError, + }, }; #[cfg(target_family = "unix")] @@ -14,12 +17,13 @@ use crate::location::file_path_helper::get_inode_and_device; use crate::location::file_path_helper::get_inode_and_device_from_path; use std::{ - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, future::Future, hash::{Hash, Hasher}, path::{Path, PathBuf}, }; +use chrono::{DateTime, Duration, FixedOffset}; use serde::{Deserialize, Serialize}; use tokio::fs; use tracing::trace; @@ -49,11 +53,44 @@ pub struct ToWalkEntry { parent_dir_accepted_by_its_children: Option, } +#[derive(Debug)] struct WalkingEntry { iso_file_path: IsolatedFilePathData<'static>, maybe_metadata: Option, } +impl From for WalkedEntry { + fn from(walking_entry: WalkingEntry) -> Self { + let WalkingEntry { + iso_file_path, + maybe_metadata, + } = walking_entry; + + Self { + pub_id: Uuid::new_v4(), + iso_file_path, + metadata: maybe_metadata + .expect("we always use Some in `the inner_walk_single_dir` function"), + } + } +} + +impl From<(Uuid, WalkingEntry)> for WalkedEntry { + fn from((pub_id, walking_entry): (Uuid, WalkingEntry)) -> Self { + let WalkingEntry { + iso_file_path, + maybe_metadata, + } = walking_entry; + + Self { + pub_id, + iso_file_path, + metadata: maybe_metadata + .expect("we always use Some in `the inner_walk_single_dir` function"), + } + } +} + impl PartialEq for WalkingEntry { fn eq(&self, other: &Self) -> bool { self.iso_file_path == other.iso_file_path @@ -68,12 +105,14 @@ impl Hash for WalkingEntry { } } -pub struct WalkResult +pub struct WalkResult where Walked: Iterator, + ToUpdate: Iterator, ToRemove: Iterator, { pub walked: Walked, + pub to_update: ToUpdate, pub to_walk: VecDeque, pub to_remove: ToRemove, pub errors: Vec, @@ -95,13 +134,14 @@ pub(super) async fn walk( limit: u64, ) -> Result< WalkResult< + impl Iterator, impl Iterator, impl Iterator, >, IndexerError, > where - FilePathDBFetcherFut: Future, IndexerError>>, + FilePathDBFetcherFut: Future, IndexerError>>, ToRemoveDbFetcherFut: Future, IndexerError>>, { let root = root.as_ref(); @@ -139,8 +179,11 @@ where } } + let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?; + Ok(WalkResult { - walked: filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?, + walked, + to_update, to_walk, to_remove: to_remove.into_iter().flatten(), errors, @@ -159,13 +202,14 @@ pub(super) async fn keep_walking( iso_file_path_factory: impl Fn(&Path, bool) -> Result, IndexerError>, ) -> Result< WalkResult< + impl Iterator, impl Iterator, impl Iterator, >, IndexerError, > where - FilePathDBFetcherFut: Future, IndexerError>>, + FilePathDBFetcherFut: Future, IndexerError>>, ToRemoveDbFetcherFut: Future, IndexerError>>, { let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY); @@ -189,8 +233,11 @@ where ) .await; + let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?; + Ok(WalkResult { - walked: filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?, + walked, + to_update, to_walk: to_keep_walking, to_remove: to_remove.into_iter(), errors, @@ -210,6 +257,7 @@ pub(super) async fn walk_single_dir( add_root: bool, ) -> Result< ( + impl Iterator, impl Iterator, Vec, Vec, @@ -217,7 +265,7 @@ pub(super) async fn walk_single_dir( IndexerError, > where - FilePathDBFetcherFut: Future, IndexerError>>, + FilePathDBFetcherFut: Future, IndexerError>>, ToRemoveDbFetcherFut: Future, IndexerError>>, { let root = root.as_ref(); @@ -275,19 +323,23 @@ where ) .await; - Ok(( - filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?, - to_remove, - errors, - )) + let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?; + + Ok((walked, to_update, to_remove, errors)) } async fn filter_existing_paths( indexed_paths: HashSet, file_paths_db_fetcher: impl Fn(Vec) -> F, -) -> Result, IndexerError> +) -> Result< + ( + impl Iterator, + impl Iterator, + ), + IndexerError, +> where - F: Future, IndexerError>>, + F: Future, IndexerError>>, { if !indexed_paths.is_empty() { file_paths_db_fetcher( @@ -304,18 +356,47 @@ where .map(move |file_paths| { let isolated_paths_already_in_db = file_paths .into_iter() - .flat_map(IsolatedFilePathData::try_from) - .collect::>(); - - indexed_paths.into_iter().filter_map(move |entry| { - (!isolated_paths_already_in_db.contains(&entry.iso_file_path)).then(|| WalkedEntry { - pub_id: Uuid::new_v4(), - iso_file_path: entry.iso_file_path, - metadata: entry - .maybe_metadata - .expect("we always use Some in `the inner_walk_single_dir` function"), + .flat_map(|file_path| { + IsolatedFilePathData::try_from(file_path.clone()) + .map(|iso_file_path| (iso_file_path, file_path)) }) - }) + .collect::>(); + + let mut to_update = vec![]; + + let to_create = indexed_paths + .into_iter() + .filter_map(|entry| { + if let Some(file_path) = isolated_paths_already_in_db.get(&entry.iso_file_path) { + if let (Some(metadata), Some(inode), Some(device), Some(date_modified)) = ( + &entry.maybe_metadata, + &file_path.inode, + &file_path.device, + &file_path.date_modified, + ) { + let (inode, device) = + (inode_from_db(&inode[0..8]), device_from_db(&device[0..8])); + + // Datetimes stored in DB loses a bit of precision, so we need to check against a delta + // instead of using != operator + if inode != metadata.inode + || device != metadata.device || DateTime::::from( + metadata.modified_at, + ) - *date_modified + > Duration::milliseconds(1) + { + to_update.push((from_bytes_to_uuid(&file_path.pub_id), entry).into()); + } + } + + None + } else { + Some(entry.into()) + } + }) + .collect::>(); + + (to_create.into_iter(), to_update.into_iter()) }) } diff --git a/core/src/location/manager/mod.rs b/core/src/location/manager/mod.rs index 2ace800e7..e0927368d 100644 --- a/core/src/location/manager/mod.rs +++ b/core/src/location/manager/mod.rs @@ -110,11 +110,6 @@ pub enum LocationManagerError { #[error("missing-field")] MissingField(#[from] MissingFieldError), - #[error("invalid inode")] - InvalidInode, - #[error("invalid device")] - InvalidDevice, - #[error(transparent)] FileIO(#[from] FileIOError), } diff --git a/core/src/location/manager/watcher/linux.rs b/core/src/location/manager/watcher/linux.rs index 8d69c56b8..2ffa438e2 100644 --- a/core/src/location/manager/watcher/linux.rs +++ b/core/src/location/manager/watcher/linux.rs @@ -106,8 +106,19 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => { let from_path = &paths[0]; + let to_path = &paths[1]; + self.rename_from.remove(from_path); - rename(self.location_id, &paths[1], from_path, self.library).await?; + rename( + self.location_id, + to_path, + from_path, + fs::metadata(to_path) + .await + .map_err(|e| FileIOError::from((to_path, e)))?, + self.library, + ) + .await?; self.recently_renamed_from .insert(paths.swap_remove(0), Instant::now()); } diff --git a/core/src/location/manager/watcher/macos.rs b/core/src/location/manager/watcher/macos.rs index d19ccdd47..a1ef6f41b 100644 --- a/core/src/location/manager/watcher/macos.rs +++ b/core/src/location/manager/watcher/macos.rs @@ -268,7 +268,7 @@ impl MacOsEventHandler<'_> { ); // We found a new path for this old path, so we can rename it - rename(self.location_id, &path, &old_path, self.library).await?; + rename(self.location_id, &path, &old_path, meta, self.library).await?; } else { trace!("No match for new path yet: {}", path.display()); self.new_paths_map @@ -299,7 +299,16 @@ impl MacOsEventHandler<'_> { ); // We found a new path for this old path, so we can rename it - rename(self.location_id, &new_path, &path, self.library).await?; + rename( + self.location_id, + &new_path, + &path, + fs::metadata(&new_path) + .await + .map_err(|e| FileIOError::from((&new_path, e)))?, + self.library, + ) + .await?; } else { trace!("No match for old path yet: {}", path.display()); // We didn't find a new path for this old path, so we store ir for later diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 952b1d4af..a1a0bf355 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -21,7 +21,10 @@ use crate::{ }, prisma::{file_path, location, object}, sync, - util::{db::maybe_missing, error::FileIOError}, + util::{ + db::{device_from_db, device_to_db, inode_from_db, inode_to_db, maybe_missing}, + error::FileIOError, + }, }; #[cfg(target_family = "unix")] @@ -39,7 +42,7 @@ use std::{ use sd_file_ext::extensions::ImageExtension; -use chrono::{DateTime, Local}; +use chrono::{DateTime, Local, Utc}; use notify::{Event, EventKind}; use prisma_client_rust::{raw, PrismaValue}; use serde_json::json; @@ -382,16 +385,8 @@ async fn inner_update_file( let location_path = location_path.as_ref(); let (current_inode, current_device) = ( - u64::from_le_bytes( - maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8] - .try_into() - .map_err(|_| LocationManagerError::InvalidInode)?, - ), - u64::from_le_bytes( - maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8] - .try_into() - .map_err(|_| LocationManagerError::InvalidDevice)?, - ), + inode_from_db(&maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8]), + device_from_db(&maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8]), ); trace!( @@ -451,7 +446,7 @@ async fn inner_update_file( ))), ), { - let date = DateTime::::from(fs_metadata.modified_or_now()).into(); + let date = DateTime::::from(fs_metadata.modified_or_now()).into(); ( (date_modified::NAME, json!(date)), @@ -480,7 +475,7 @@ async fn inner_update_file( if let Some(new_inode) = maybe_new_inode { ( (inode::NAME, json!(new_inode)), - Some(inode::set(Some(new_inode.to_le_bytes().to_vec()))), + Some(inode::set(Some(inode_to_db(new_inode)))), ) } else { ((inode::NAME, serde_json::Value::Null), None) @@ -490,7 +485,7 @@ async fn inner_update_file( if let Some(new_device) = maybe_new_device { ( (device::NAME, json!(new_device)), - Some(device::set(Some(new_device.to_le_bytes().to_vec()))), + Some(device::set(Some(device_to_db(new_device)))), ) } else { ((device::NAME, serde_json::Value::Null), None) @@ -574,6 +569,7 @@ pub(super) async fn rename( location_id: location::id::Type, new_path: impl AsRef, old_path: impl AsRef, + new_path_metadata: Metadata, library: &Library, ) -> Result<(), LocationManagerError> { let location_path = extract_location_path(location_id, library).await?; @@ -642,6 +638,9 @@ pub(super) async fn rename( file_path::materialized_path::set(Some(new_path_materialized_str)), file_path::name::set(Some(new.name.to_string())), file_path::extension::set(Some(new.extension.to_string())), + file_path::date_modified::set(Some( + DateTime::::from(new_path_metadata.modified_or_now()).into(), + )), ], ) .exec() @@ -801,15 +800,11 @@ pub(super) async fn extract_inode_and_device_from_path( Err(FilePathError::NotFound(path.into()).into()), |file_path| { Ok(( - u64::from_le_bytes( - maybe_missing(file_path.inode, "file_path.inode")?[0..8] - .try_into() - .map_err(|_| LocationManagerError::InvalidInode)?, + inode_from_db( + &maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8], ), - u64::from_le_bytes( - maybe_missing(file_path.device, "file_path.device")?[0..8] - .try_into() - .map_err(|_| LocationManagerError::InvalidDevice)?, + device_from_db( + &maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8], ), )) }, diff --git a/core/src/location/manager/watcher/windows.rs b/core/src/location/manager/watcher/windows.rs index bcf982c76..7c5c2fd43 100644 --- a/core/src/location/manager/watcher/windows.rs +++ b/core/src/location/manager/watcher/windows.rs @@ -88,7 +88,16 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { ); // We found a new path for this old path, so we can rename it instead of removing and creating it - rename(self.location_id, &paths[0], &old_path, self.library).await?; + rename( + self.location_id, + &paths[0], + &old_path, + fs::metadata(&paths[0]) + .await + .map_err(|e| FileIOError::from((&paths[0], e)))?, + self.library, + ) + .await?; } else { let metadata = create_dir_or_file(self.location_id, &paths[0], self.library).await?; @@ -120,7 +129,16 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { if let Some((_, new_path)) = self.rename_to_map.remove(&inode_and_device) { // We found a new path for this old path, so we can rename it - rename(self.location_id, &new_path, &path, self.library).await?; + rename( + self.location_id, + &new_path, + &path, + fs::metadata(&new_path) + .await + .map_err(|e| FileIOError::from((&new_path, e)))?, + self.library, + ) + .await?; } else { self.rename_from_map .insert(inode_and_device, (Instant::now(), path)); @@ -135,7 +153,16 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { if let Some((_, old_path)) = self.rename_to_map.remove(&inode_and_device) { // We found a old path for this new path, so we can rename it - rename(self.location_id, &path, &old_path, self.library).await?; + rename( + self.location_id, + &path, + &old_path, + fs::metadata(&path) + .await + .map_err(|e| FileIOError::from((&path, e)))?, + self.library, + ) + .await?; } else { self.rename_from_map .insert(inode_and_device, (Instant::now(), path)); diff --git a/core/src/util/db.rs b/core/src/util/db.rs index ab5889a7d..df43a24e0 100644 --- a/core/src/util/db.rs +++ b/core/src/util/db.rs @@ -76,6 +76,26 @@ pub fn uuid_to_bytes(uuid: Uuid) -> Vec { uuid.as_bytes().to_vec() } +pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid { + Uuid::from_slice(bytes).expect("corrupted uuid in database") +} + +pub fn inode_from_db(db_inode: &[u8]) -> u64 { + u64::from_le_bytes(db_inode.try_into().expect("corrupted inode in database")) +} + +pub fn device_from_db(db_device: &[u8]) -> u64 { + u64::from_le_bytes(db_device.try_into().expect("corrupted device in database")) +} + +pub fn inode_to_db(inode: u64) -> Vec { + inode.to_le_bytes().to_vec() +} + +pub fn device_to_db(device: u64) -> Vec { + device.to_le_bytes().to_vec() +} + #[derive(Error, Debug)] #[error("Missing field {0}")] pub struct MissingFieldError(&'static str);