[ENG-884] Update file_path metadata (#1077)

* Partitioned paths to be updated on walker

* Updating file_paths in indexer

* Properly tracking modified date on renames

---------

Co-authored-by: Vítor Vasconcellos <vasconcellos.dev@gmail.com>
This commit is contained in:
Ericson "Fogo" Soares
2023-07-11 12:14:11 -03:00
committed by GitHub
parent 121b5b4bfe
commit 7cd00dab49
13 changed files with 441 additions and 118 deletions

View File

@@ -14,7 +14,6 @@ use std::path::PathBuf;
use rspc::{self, alpha::AlphaRouter, ErrorCode};
use serde::{Deserialize, Serialize};
use specta::Type;
use tracing::info;
use super::{utils::library, Ctx, R};
@@ -141,28 +140,20 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
reidentify_objects,
}| async move {
if reidentify_objects {
let object_ids = library
library
.db
.file_path()
.find_many(vec![
file_path::location_id::equals(Some(location_id)),
file_path::object_id::not(None),
])
.select(file_path::select!({ object_id }))
.exec()
.await?
.into_iter()
.filter_map(|file_path| file_path.object_id)
.collect::<Vec<_>>();
let count = library
.db
.object()
.delete_many(vec![object::id::in_vec(object_ids)])
.update_many(
vec![
file_path::location_id::equals(Some(location_id)),
file_path::object_id::not(None),
],
vec![file_path::object::disconnect()],
)
.exec()
.await?;
info!("Deleted {count} objects, to be reidentified");
library.orphan_remover.invoke().await;
}
// rescan location

View File

@@ -16,12 +16,12 @@ use serde::{Deserialize, Serialize};
use super::{
file_path_for_file_identifier, file_path_for_object_validator, file_path_for_thumbnailer,
file_path_to_full_path, file_path_to_handle_custom_uri, file_path_to_isolate,
file_path_to_isolate_with_id, file_path_with_object, FilePathError,
file_path_to_isolate_with_id, file_path_walker, file_path_with_object, FilePathError,
};
static FORBIDDEN_FILE_NAMES: OnceLock<RegexSet> = OnceLock::new();
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Hash, Eq, PartialEq)]
#[non_exhaustive]
pub struct IsolatedFilePathData<'a> {
// WARN! These fields MUST NOT be changed outside the location module, that's why they have this visibility
@@ -446,6 +446,7 @@ mod macros {
impl_from_db!(
file_path,
file_path_to_isolate,
file_path_walker,
file_path_to_isolate_with_id,
file_path_with_object
);

View File

@@ -68,6 +68,17 @@ file_path::select!(file_path_to_isolate_with_id {
name
extension
});
file_path::select!(file_path_walker {
pub_id
location_id
materialized_path
is_dir
name
extension
date_modified
inode
device
});
file_path::select!(file_path_to_handle_custom_uri {
materialized_path
is_dir
@@ -170,6 +181,7 @@ pub async fn create_file_path(
cas_id: Option<String>,
metadata: FilePathMetadata,
) -> Result<file_path::Data, FilePathError> {
use crate::util::db::{device_to_db, inode_to_db};
use crate::{sync, util::db::uuid_to_bytes};
use sd_prisma::prisma;
@@ -228,8 +240,8 @@ pub async fn create_file_path(
materialized_path::set(Some(materialized_path.into_owned())),
name::set(Some(name.into_owned())),
extension::set(Some(extension.into_owned())),
inode::set(Some(metadata.inode.to_le_bytes().into())),
device::set(Some(metadata.device.to_le_bytes().into())),
inode::set(Some(inode_to_db(metadata.inode))),
device::set(Some(device_to_db(metadata.device))),
cas_id::set(cas_id),
is_dir::set(Some(is_dir)),
size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())),

View File

@@ -29,10 +29,11 @@ use tokio::time::Instant;
use tracing::info;
use super::{
execute_indexer_save_step, iso_file_path_factory, remove_non_existing_file_paths,
execute_indexer_save_step, execute_indexer_update_step, iso_file_path_factory,
remove_non_existing_file_paths,
rules::IndexerRule,
walk::{keep_walking, walk, ToWalkEntry, WalkResult},
IndexerError, IndexerJobSaveStep,
IndexerError, IndexerJobSaveStep, IndexerJobUpdateStep,
};
/// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database.
@@ -69,8 +70,11 @@ pub struct IndexerJobRunMetadata {
db_write_time: Duration,
scan_read_time: Duration,
total_paths: u64,
total_updated_paths: u64,
total_save_steps: u64,
total_update_steps: u64,
indexed_count: u64,
updated_count: u64,
removed_count: u64,
}
@@ -79,7 +83,9 @@ impl JobRunMetadata for IndexerJobRunMetadata {
self.db_write_time += new_data.db_write_time;
self.scan_read_time += new_data.scan_read_time;
self.total_paths += new_data.total_paths;
self.total_updated_paths += new_data.total_updated_paths;
self.total_save_steps += new_data.total_save_steps;
self.total_update_steps += new_data.total_update_steps;
self.indexed_count += new_data.indexed_count;
self.removed_count += new_data.removed_count;
}
@@ -89,6 +95,7 @@ impl JobRunMetadata for IndexerJobRunMetadata {
pub enum ScanProgress {
ChunkCount(usize),
SavedChunks(usize),
UpdatedChunks(usize),
Message(String),
}
@@ -99,7 +106,9 @@ impl IndexerJobData {
.into_iter()
.map(|p| match p {
ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c),
ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p),
ScanProgress::SavedChunks(p) | ScanProgress::UpdatedChunks(p) => {
JobReportUpdate::CompletedTaskCount(p)
}
ScanProgress::Message(m) => JobReportUpdate::Message(m),
})
.collect(),
@@ -110,9 +119,9 @@ impl IndexerJobData {
/// `IndexerJobStepInput` defines the action that should be executed in the current step
#[derive(Serialize, Deserialize, Debug)]
pub enum IndexerJobStepInput {
/// `IndexerJobStepEntry`. The size of this vector is given by the [`BATCH_SIZE`] constant.
Save(IndexerJobSaveStep),
Walk(ToWalkEntry),
Update(IndexerJobUpdateStep),
}
/// A `IndexerJob` is a stateful job that walks a directory and indexes all files.
@@ -172,6 +181,7 @@ impl StatefulJob for IndexerJobInit {
let scan_start = Instant::now();
let WalkResult {
walked,
to_update,
to_walk,
to_remove,
errors,
@@ -192,8 +202,11 @@ impl StatefulJob for IndexerJobInit {
let removed_count = remove_non_existing_file_paths(to_remove, &db).await?;
let db_delete_time = db_delete_start.elapsed();
let total_paths = &mut 0;
let total_new_paths = &mut 0;
let total_updated_paths = &mut 0;
let to_walk_count = to_walk.len();
let to_save_chunks = &mut 0;
let to_update_chunks = &mut 0;
let steps = walked
.chunks(BATCH_SIZE)
@@ -202,22 +215,41 @@ impl StatefulJob for IndexerJobInit {
.map(|(i, chunk)| {
let chunk_steps = chunk.collect::<Vec<_>>();
*total_paths += chunk_steps.len() as u64;
*total_new_paths += chunk_steps.len() as u64;
*to_save_chunks += 1;
IndexerJobStepInput::Save(IndexerJobSaveStep {
chunk_idx: i,
walked: chunk_steps,
})
})
.chain(
to_update
.chunks(BATCH_SIZE)
.into_iter()
.enumerate()
.map(|(i, chunk)| {
let chunk_updates = chunk.collect::<Vec<_>>();
*total_updated_paths += chunk_updates.len() as u64;
*to_update_chunks += 1;
IndexerJobStepInput::Update(IndexerJobUpdateStep {
chunk_idx: i,
to_update: chunk_updates,
})
}),
)
.chain(to_walk.into_iter().map(IndexerJobStepInput::Walk))
.collect::<Vec<_>>();
IndexerJobData::on_scan_progress(
ctx,
vec![
ScanProgress::ChunkCount(steps.len() - to_walk_count),
ScanProgress::ChunkCount(*to_save_chunks + *to_update_chunks),
ScanProgress::Message(format!(
"Starting saving {total_paths} files or directories, \
"Starting saving {total_new_paths} files or directories, \
{total_updated_paths} files or directories to update, \
there still {to_walk_count} directories to index",
)),
],
@@ -232,10 +264,13 @@ impl StatefulJob for IndexerJobInit {
IndexerJobRunMetadata {
db_write_time: db_delete_time,
scan_read_time,
total_paths: *total_paths,
total_paths: *total_new_paths,
total_updated_paths: *total_updated_paths,
indexed_count: 0,
updated_count: 0,
removed_count,
total_save_steps: steps.len() as u64 - to_walk_count as u64,
total_save_steps: *to_save_chunks as u64,
total_update_steps: *to_update_chunks as u64,
},
steps,
errors
@@ -272,14 +307,34 @@ impl StatefulJob for IndexerJobInit {
],
);
let count =
execute_indexer_save_step(&init.location, step, &ctx.library.clone()).await?;
let count = execute_indexer_save_step(&init.location, step, &ctx.library).await?;
new_metadata.indexed_count = count as u64;
new_metadata.db_write_time = start_time.elapsed();
Ok(new_metadata.into())
}
IndexerJobStepInput::Update(to_update) => {
let start_time = Instant::now();
IndexerJobData::on_scan_progress(
ctx,
vec![
ScanProgress::UpdatedChunks(to_update.chunk_idx + 1),
ScanProgress::Message(format!(
"Updating chunk {} of {} to database",
to_update.chunk_idx, run_metadata.total_save_steps
)),
],
);
let count = execute_indexer_update_step(to_update, &ctx.library).await?;
new_metadata.updated_count = count as u64;
new_metadata.db_write_time = start_time.elapsed();
Ok(new_metadata.into())
}
IndexerJobStepInput::Walk(to_walk_entry) => {
let location_id = init.location.id;
let location_path =
@@ -291,6 +346,7 @@ impl StatefulJob for IndexerJobInit {
let WalkResult {
walked,
to_update,
to_walk,
to_remove,
errors,
@@ -320,12 +376,25 @@ impl StatefulJob for IndexerJobInit {
.map(|(i, chunk)| {
let chunk_steps = chunk.collect::<Vec<_>>();
new_metadata.total_paths += chunk_steps.len() as u64;
new_metadata.total_save_steps += 1;
IndexerJobStepInput::Save(IndexerJobSaveStep {
chunk_idx: i,
walked: chunk_steps,
})
})
.chain(to_update.chunks(BATCH_SIZE).into_iter().enumerate().map(
|(i, chunk)| {
let chunk_updates = chunk.collect::<Vec<_>>();
new_metadata.total_updated_paths += chunk_updates.len() as u64;
new_metadata.total_update_steps += 1;
IndexerJobStepInput::Update(IndexerJobUpdateStep {
chunk_idx: i,
to_update: chunk_updates,
})
},
))
.chain(to_walk.into_iter().map(IndexerJobStepInput::Walk))
.collect::<Vec<_>>();
@@ -334,8 +403,11 @@ impl StatefulJob for IndexerJobInit {
vec![
ScanProgress::ChunkCount(more_steps.len() - to_walk_count),
ScanProgress::Message(format!(
"Scanned more {} files or directories; {} more directories to scan",
new_metadata.total_paths, to_walk_count
"Scanned more {} files or directories; \
{} more directories to scan and more {} entries to update",
new_metadata.total_paths,
to_walk_count,
new_metadata.total_updated_paths
)),
],
);
@@ -363,11 +435,12 @@ impl StatefulJob for IndexerJobInit {
let init = self;
info!(
"Scan of {} completed in {:?}. {} new files found, \
indexed {} files in db. db write completed in {:?}",
indexed {} files in db, updated {} entries. db write completed in {:?}",
maybe_missing(&init.location.path, "location.path")?,
run_metadata.scan_read_time,
run_metadata.total_paths,
run_metadata.indexed_count,
run_metadata.total_updated_paths,
run_metadata.db_write_time,
);
@@ -375,6 +448,11 @@ impl StatefulJob for IndexerJobInit {
invalidate_query!(ctx.library, "search.paths");
}
if run_metadata.total_updated_paths > 0 {
// Invoking orphan remover here as we probably have some orphaned objects due to updates
ctx.library.orphan_remover.invoke().await;
}
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))
}
}

View File

@@ -2,7 +2,10 @@ use crate::{
library::Library,
prisma::{file_path, location, PrismaClient},
sync,
util::{db::uuid_to_bytes, error::FileIOError},
util::{
db::{device_to_db, inode_to_db, uuid_to_bytes},
error::FileIOError,
},
};
use std::path::Path;
@@ -37,6 +40,12 @@ pub struct IndexerJobSaveStep {
walked: Vec<WalkedEntry>,
}
/// A batch of already-indexed entries whose on-disk metadata changed and must be
/// re-written to the database. The size of `to_update` is bounded by `BATCH_SIZE`.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexerJobUpdateStep {
// Index of this chunk within the job, used for progress reporting.
chunk_idx: usize,
// Entries to update; each carries the existing `pub_id` of its database row.
to_update: Vec<WalkedEntry>,
}
/// Error type for the indexer module
#[derive(Error, Debug)]
pub enum IndexerError {
@@ -127,11 +136,11 @@ async fn execute_indexer_save_step(
),
(
(inode::NAME, json!(entry.metadata.inode.to_le_bytes())),
inode::set(Some(entry.metadata.inode.to_le_bytes().into())),
inode::set(Some(inode_to_db(entry.metadata.inode))),
),
(
(device::NAME, json!(entry.metadata.device.to_le_bytes())),
device::set(Some(entry.metadata.device.to_le_bytes().into())),
device::set(Some(device_to_db(entry.metadata.device))),
),
(
(date_created::NAME, json!(entry.metadata.created_at)),
@@ -176,6 +185,91 @@ async fn execute_indexer_save_step(
Ok(count)
}
/// Applies a chunk of metadata updates to `file_path` records that already exist in the
/// database but whose on-disk entries changed (e.g. while Spacedrive was offline).
///
/// For each entry this builds an index-aligned pair of CRDT sync parameters and Prisma
/// update parameters, then commits them together through `sync.write_ops` so the local
/// database and the sync log stay consistent. Returns the number of updated rows.
async fn execute_indexer_update_step(
update_step: &IndexerJobUpdateStep,
library: &Library,
) -> Result<i64, IndexerError> {
let Library { sync, db, .. } = &library;
// Split every entry into (sync operations, db update query); the two vectors stay
// index-aligned so they can be written atomically below.
let (sync_stuff, paths_to_update): (Vec<_>, Vec<_>) = update_step
.to_update
.iter()
.map(|entry| {
let IsolatedFilePathData { is_dir, .. } = &entry.iso_file_path;
use file_path::*;
let pub_id = uuid_to_bytes(entry.pub_id);
// Each tuple pairs the sync-side (field name, JSON value) with the matching
// Prisma set-param, so a single `unzip` yields both parameter lists.
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
// As this file was updated while Spacedrive was offline, we mark the object_id as null
// So this file_path will be updated at file identifier job
(
(object_id::NAME, serde_json::Value::Null),
object::disconnect(),
),
((is_dir::NAME, json!(*is_dir)), is_dir::set(Some(*is_dir))),
(
(
size_in_bytes_bytes::NAME,
json!(entry.metadata.size_in_bytes.to_be_bytes().to_vec()),
),
size_in_bytes_bytes::set(Some(
entry.metadata.size_in_bytes.to_be_bytes().to_vec(),
)),
),
(
(inode::NAME, json!(entry.metadata.inode.to_le_bytes())),
inode::set(Some(inode_to_db(entry.metadata.inode))),
),
(
(device::NAME, json!(entry.metadata.device.to_le_bytes())),
device::set(Some(device_to_db(entry.metadata.device))),
),
(
(date_created::NAME, json!(entry.metadata.created_at)),
date_created::set(Some(entry.metadata.created_at.into())),
),
(
(date_modified::NAME, json!(entry.metadata.modified_at)),
date_modified::set(Some(entry.metadata.modified_at.into())),
),
]
.into_iter()
.unzip();
(
// One shared sync operation per changed field, all keyed by this
// file_path's pub_id.
sync_params
.into_iter()
.map(|(field, value)| {
sync.shared_update(
sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
field,
value,
)
})
.collect::<Vec<_>>(),
db.file_path()
.update(file_path::pub_id::equals(pub_id), db_params)
.select(file_path::select!({ id })),
)
})
.unzip();
// Persist db updates and sync operations together so peers observe the same
// changes that were written locally.
let updated = sync
.write_ops(
db,
(sync_stuff.into_iter().flatten().collect(), paths_to_update),
)
.await?;
trace!("Updated {updated:?} records");
Ok(updated.len() as i64)
}
fn iso_file_path_factory(
location_id: location::id::Type,
location_path: &Path,
@@ -200,7 +294,7 @@ async fn remove_non_existing_file_paths(
}
// TODO: Change this macro to a fn when we're able to return
// `impl Fn(Vec<file_path::WhereParam>) -> impl Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>`
// `impl Fn(Vec<file_path::WhereParam>) -> impl Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>`
// Maybe when TAITs arrive
#[macro_export]
macro_rules! file_paths_db_fetcher_fn {
@@ -216,8 +310,10 @@ macro_rules! file_paths_db_fetcher_fn {
.into_iter()
.map(|founds| {
$db.file_path()
.find_many(founds.collect::<Vec<_>>())
.select($crate::location::file_path_helper::file_path_to_isolate::select())
.find_many(vec![::prisma_client_rust::operator::or(
founds.collect::<Vec<_>>(),
)])
.select($crate::location::file_path_helper::file_path_walker::select())
})
.collect::<Vec<_>>();

View File

@@ -7,6 +7,7 @@ use crate::{
check_file_path_exists, ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
IsolatedFilePathData,
},
indexer::{execute_indexer_update_step, IndexerJobUpdateStep},
LocationError,
},
to_remove_db_fetcher_fn,
@@ -66,7 +67,7 @@ pub async fn shallow(
(false, location_path.to_path_buf())
};
let (walked, to_remove, errors) = {
let (walked, to_update, to_remove, errors) = {
walk_single_dir(
&to_walk_path,
&indexer_rules,
@@ -84,28 +85,34 @@ pub async fn shallow(
// TODO pass these uuids to sync system
remove_non_existing_file_paths(to_remove, &db).await?;
let total_paths = &mut 0;
let steps = walked
let save_steps = walked
.chunks(BATCH_SIZE)
.into_iter()
.enumerate()
.map(|(i, chunk)| {
let chunk_steps = chunk.collect::<Vec<_>>();
*total_paths += chunk_steps.len() as u64;
IndexerJobSaveStep {
chunk_idx: i,
walked: chunk_steps,
}
.map(|(i, chunk)| IndexerJobSaveStep {
chunk_idx: i,
walked: chunk.collect::<Vec<_>>(),
})
.collect::<Vec<_>>();
for step in steps {
for step in save_steps {
execute_indexer_save_step(location, &step, library).await?;
}
let update_steps = to_update
.chunks(BATCH_SIZE)
.into_iter()
.enumerate()
.map(|(i, chunk)| IndexerJobUpdateStep {
chunk_idx: i,
to_update: chunk.collect::<Vec<_>>(),
})
.collect::<Vec<_>>();
for step in update_steps {
execute_indexer_update_step(&step, library).await?;
}
invalidate_query!(library, "search.paths");
library.orphan_remover.invoke().await;

View File

@@ -1,10 +1,13 @@
use crate::{
location::file_path_helper::{
file_path_just_pub_id, file_path_to_isolate, FilePathMetadata, IsolatedFilePathData,
file_path_just_pub_id, file_path_walker, FilePathMetadata, IsolatedFilePathData,
MetadataExt,
},
prisma::file_path,
util::error::FileIOError,
util::{
db::{device_from_db, from_bytes_to_uuid, inode_from_db},
error::FileIOError,
},
};
#[cfg(target_family = "unix")]
@@ -14,12 +17,13 @@ use crate::location::file_path_helper::get_inode_and_device;
use crate::location::file_path_helper::get_inode_and_device_from_path;
use std::{
collections::{HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
future::Future,
hash::{Hash, Hasher},
path::{Path, PathBuf},
};
use chrono::{DateTime, Duration, FixedOffset};
use serde::{Deserialize, Serialize};
use tokio::fs;
use tracing::trace;
@@ -49,11 +53,44 @@ pub struct ToWalkEntry {
parent_dir_accepted_by_its_children: Option<bool>,
}
/// An entry observed during a filesystem walk, before it has been decided whether it
/// will be created in, or used to update, the database.
#[derive(Debug)]
struct WalkingEntry {
iso_file_path: IsolatedFilePathData<'static>,
// Always populated with `Some` by the walker; unwrapped when converting into a
// `WalkedEntry` (see the `From` impls below).
maybe_metadata: Option<FilePathMetadata>,
}
/// Conversion for entries that are NOT yet in the database: a fresh `pub_id` is minted.
impl From<WalkingEntry> for WalkedEntry {
fn from(walking_entry: WalkingEntry) -> Self {
let WalkingEntry {
iso_file_path,
maybe_metadata,
} = walking_entry;
Self {
// New row, so generate a brand-new public id.
pub_id: Uuid::new_v4(),
iso_file_path,
metadata: maybe_metadata
.expect("we always use Some in `the inner_walk_single_dir` function"),
}
}
}
/// Conversion for entries that already exist in the database: the caller supplies the
/// row's existing `pub_id` so the update targets the right record.
impl From<(Uuid, WalkingEntry)> for WalkedEntry {
fn from((pub_id, walking_entry): (Uuid, WalkingEntry)) -> Self {
let WalkingEntry {
iso_file_path,
maybe_metadata,
} = walking_entry;
Self {
pub_id,
iso_file_path,
metadata: maybe_metadata
.expect("we always use Some in `the inner_walk_single_dir` function"),
}
}
}
impl PartialEq for WalkingEntry {
fn eq(&self, other: &Self) -> bool {
self.iso_file_path == other.iso_file_path
@@ -68,12 +105,14 @@ impl Hash for WalkingEntry {
}
}
pub struct WalkResult<Walked, ToRemove>
pub struct WalkResult<Walked, ToUpdate, ToRemove>
where
Walked: Iterator<Item = WalkedEntry>,
ToUpdate: Iterator<Item = WalkedEntry>,
ToRemove: Iterator<Item = file_path_just_pub_id::Data>,
{
pub walked: Walked,
pub to_update: ToUpdate,
pub to_walk: VecDeque<ToWalkEntry>,
pub to_remove: ToRemove,
pub errors: Vec<IndexerError>,
@@ -95,13 +134,14 @@ pub(super) async fn walk<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
limit: u64,
) -> Result<
WalkResult<
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = file_path_just_pub_id::Data>,
>,
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let root = root.as_ref();
@@ -139,8 +179,11 @@ where
}
}
let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?;
Ok(WalkResult {
walked: filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?,
walked,
to_update,
to_walk,
to_remove: to_remove.into_iter().flatten(),
errors,
@@ -159,13 +202,14 @@ pub(super) async fn keep_walking<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
iso_file_path_factory: impl Fn(&Path, bool) -> Result<IsolatedFilePathData<'static>, IndexerError>,
) -> Result<
WalkResult<
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = file_path_just_pub_id::Data>,
>,
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let mut to_keep_walking = VecDeque::with_capacity(TO_WALK_QUEUE_INITIAL_CAPACITY);
@@ -189,8 +233,11 @@ where
)
.await;
let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?;
Ok(WalkResult {
walked: filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?,
walked,
to_update,
to_walk: to_keep_walking,
to_remove: to_remove.into_iter(),
errors,
@@ -210,6 +257,7 @@ pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
add_root: bool,
) -> Result<
(
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
Vec<file_path_just_pub_id::Data>,
Vec<IndexerError>,
@@ -217,7 +265,7 @@ pub(super) async fn walk_single_dir<FilePathDBFetcherFut, ToRemoveDbFetcherFut>(
IndexerError,
>
where
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
FilePathDBFetcherFut: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
ToRemoveDbFetcherFut: Future<Output = Result<Vec<file_path_just_pub_id::Data>, IndexerError>>,
{
let root = root.as_ref();
@@ -275,19 +323,23 @@ where
)
.await;
Ok((
filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?,
to_remove,
errors,
))
let (walked, to_update) = filter_existing_paths(indexed_paths, file_paths_db_fetcher).await?;
Ok((walked, to_update, to_remove, errors))
}
async fn filter_existing_paths<F>(
indexed_paths: HashSet<WalkingEntry>,
file_paths_db_fetcher: impl Fn(Vec<file_path::WhereParam>) -> F,
) -> Result<impl Iterator<Item = WalkedEntry>, IndexerError>
) -> Result<
(
impl Iterator<Item = WalkedEntry>,
impl Iterator<Item = WalkedEntry>,
),
IndexerError,
>
where
F: Future<Output = Result<Vec<file_path_to_isolate::Data>, IndexerError>>,
F: Future<Output = Result<Vec<file_path_walker::Data>, IndexerError>>,
{
if !indexed_paths.is_empty() {
file_paths_db_fetcher(
@@ -304,18 +356,47 @@ where
.map(move |file_paths| {
let isolated_paths_already_in_db = file_paths
.into_iter()
.flat_map(IsolatedFilePathData::try_from)
.collect::<HashSet<_>>();
indexed_paths.into_iter().filter_map(move |entry| {
(!isolated_paths_already_in_db.contains(&entry.iso_file_path)).then(|| WalkedEntry {
pub_id: Uuid::new_v4(),
iso_file_path: entry.iso_file_path,
metadata: entry
.maybe_metadata
.expect("we always use Some in `the inner_walk_single_dir` function"),
.flat_map(|file_path| {
IsolatedFilePathData::try_from(file_path.clone())
.map(|iso_file_path| (iso_file_path, file_path))
})
})
.collect::<HashMap<_, _>>();
let mut to_update = vec![];
let to_create = indexed_paths
.into_iter()
.filter_map(|entry| {
if let Some(file_path) = isolated_paths_already_in_db.get(&entry.iso_file_path) {
if let (Some(metadata), Some(inode), Some(device), Some(date_modified)) = (
&entry.maybe_metadata,
&file_path.inode,
&file_path.device,
&file_path.date_modified,
) {
let (inode, device) =
(inode_from_db(&inode[0..8]), device_from_db(&device[0..8]));
// Datetimes stored in DB lose a bit of precision, so we need to check against a delta
// instead of using != operator
if inode != metadata.inode
|| device != metadata.device || DateTime::<FixedOffset>::from(
metadata.modified_at,
) - *date_modified
> Duration::milliseconds(1)
{
to_update.push((from_bytes_to_uuid(&file_path.pub_id), entry).into());
}
}
None
} else {
Some(entry.into())
}
})
.collect::<Vec<_>>();
(to_create.into_iter(), to_update.into_iter())
})
}

View File

@@ -110,11 +110,6 @@ pub enum LocationManagerError {
#[error("missing-field")]
MissingField(#[from] MissingFieldError),
#[error("invalid inode")]
InvalidInode,
#[error("invalid device")]
InvalidDevice,
#[error(transparent)]
FileIO(#[from] FileIOError),
}

View File

@@ -106,8 +106,19 @@ impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => {
let from_path = &paths[0];
let to_path = &paths[1];
self.rename_from.remove(from_path);
rename(self.location_id, &paths[1], from_path, self.library).await?;
rename(
self.location_id,
to_path,
from_path,
fs::metadata(to_path)
.await
.map_err(|e| FileIOError::from((to_path, e)))?,
self.library,
)
.await?;
self.recently_renamed_from
.insert(paths.swap_remove(0), Instant::now());
}

View File

@@ -268,7 +268,7 @@ impl MacOsEventHandler<'_> {
);
// We found a new path for this old path, so we can rename it
rename(self.location_id, &path, &old_path, self.library).await?;
rename(self.location_id, &path, &old_path, meta, self.library).await?;
} else {
trace!("No match for new path yet: {}", path.display());
self.new_paths_map
@@ -299,7 +299,16 @@ impl MacOsEventHandler<'_> {
);
// We found a new path for this old path, so we can rename it
rename(self.location_id, &new_path, &path, self.library).await?;
rename(
self.location_id,
&new_path,
&path,
fs::metadata(&new_path)
.await
.map_err(|e| FileIOError::from((&new_path, e)))?,
self.library,
)
.await?;
} else {
trace!("No match for old path yet: {}", path.display());
// We didn't find a new path for this old path, so we store it for later

View File

@@ -21,7 +21,10 @@ use crate::{
},
prisma::{file_path, location, object},
sync,
util::{db::maybe_missing, error::FileIOError},
util::{
db::{device_from_db, device_to_db, inode_from_db, inode_to_db, maybe_missing},
error::FileIOError,
},
};
#[cfg(target_family = "unix")]
@@ -39,7 +42,7 @@ use std::{
use sd_file_ext::extensions::ImageExtension;
use chrono::{DateTime, Local};
use chrono::{DateTime, Local, Utc};
use notify::{Event, EventKind};
use prisma_client_rust::{raw, PrismaValue};
use serde_json::json;
@@ -382,16 +385,8 @@ async fn inner_update_file(
let location_path = location_path.as_ref();
let (current_inode, current_device) = (
u64::from_le_bytes(
maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8]
.try_into()
.map_err(|_| LocationManagerError::InvalidInode)?,
),
u64::from_le_bytes(
maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8]
.try_into()
.map_err(|_| LocationManagerError::InvalidDevice)?,
),
inode_from_db(&maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8]),
device_from_db(&maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8]),
);
trace!(
@@ -451,7 +446,7 @@ async fn inner_update_file(
))),
),
{
let date = DateTime::<Local>::from(fs_metadata.modified_or_now()).into();
let date = DateTime::<Utc>::from(fs_metadata.modified_or_now()).into();
(
(date_modified::NAME, json!(date)),
@@ -480,7 +475,7 @@ async fn inner_update_file(
if let Some(new_inode) = maybe_new_inode {
(
(inode::NAME, json!(new_inode)),
Some(inode::set(Some(new_inode.to_le_bytes().to_vec()))),
Some(inode::set(Some(inode_to_db(new_inode)))),
)
} else {
((inode::NAME, serde_json::Value::Null), None)
@@ -490,7 +485,7 @@ async fn inner_update_file(
if let Some(new_device) = maybe_new_device {
(
(device::NAME, json!(new_device)),
Some(device::set(Some(new_device.to_le_bytes().to_vec()))),
Some(device::set(Some(device_to_db(new_device)))),
)
} else {
((device::NAME, serde_json::Value::Null), None)
@@ -574,6 +569,7 @@ pub(super) async fn rename(
location_id: location::id::Type,
new_path: impl AsRef<Path>,
old_path: impl AsRef<Path>,
new_path_metadata: Metadata,
library: &Library,
) -> Result<(), LocationManagerError> {
let location_path = extract_location_path(location_id, library).await?;
@@ -642,6 +638,9 @@ pub(super) async fn rename(
file_path::materialized_path::set(Some(new_path_materialized_str)),
file_path::name::set(Some(new.name.to_string())),
file_path::extension::set(Some(new.extension.to_string())),
file_path::date_modified::set(Some(
DateTime::<Utc>::from(new_path_metadata.modified_or_now()).into(),
)),
],
)
.exec()
@@ -801,15 +800,11 @@ pub(super) async fn extract_inode_and_device_from_path(
Err(FilePathError::NotFound(path.into()).into()),
|file_path| {
Ok((
u64::from_le_bytes(
maybe_missing(file_path.inode, "file_path.inode")?[0..8]
.try_into()
.map_err(|_| LocationManagerError::InvalidInode)?,
inode_from_db(
&maybe_missing(file_path.inode.as_ref(), "file_path.inode")?[0..8],
),
u64::from_le_bytes(
maybe_missing(file_path.device, "file_path.device")?[0..8]
.try_into()
.map_err(|_| LocationManagerError::InvalidDevice)?,
device_from_db(
&maybe_missing(file_path.device.as_ref(), "file_path.device")?[0..8],
),
))
},

View File

@@ -88,7 +88,16 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
);
// We found a new path for this old path, so we can rename it instead of removing and creating it
rename(self.location_id, &paths[0], &old_path, self.library).await?;
rename(
self.location_id,
&paths[0],
&old_path,
fs::metadata(&paths[0])
.await
.map_err(|e| FileIOError::from((&paths[0], e)))?,
self.library,
)
.await?;
} else {
let metadata =
create_dir_or_file(self.location_id, &paths[0], self.library).await?;
@@ -120,7 +129,16 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
if let Some((_, new_path)) = self.rename_to_map.remove(&inode_and_device) {
// We found a new path for this old path, so we can rename it
rename(self.location_id, &new_path, &path, self.library).await?;
rename(
self.location_id,
&new_path,
&path,
fs::metadata(&new_path)
.await
.map_err(|e| FileIOError::from((&new_path, e)))?,
self.library,
)
.await?;
} else {
self.rename_from_map
.insert(inode_and_device, (Instant::now(), path));
@@ -135,7 +153,16 @@ impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
if let Some((_, old_path)) = self.rename_to_map.remove(&inode_and_device) {
// We found an old path for this new path, so we can rename it
rename(self.location_id, &path, &old_path, self.library).await?;
rename(
self.location_id,
&path,
&old_path,
fs::metadata(&path)
.await
.map_err(|e| FileIOError::from((&path, e)))?,
self.library,
)
.await?;
} else {
self.rename_from_map
.insert(inode_and_device, (Instant::now(), path));

View File

@@ -76,6 +76,26 @@ pub fn uuid_to_bytes(uuid: Uuid) -> Vec<u8> {
uuid.as_bytes().to_vec()
}
/// Deserializes a `Uuid` from the raw bytes stored in a database column.
///
/// # Panics
/// Panics if `bytes` is not a valid UUID byte slice, which indicates a corrupted row.
pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid {
Uuid::from_slice(bytes).expect("corrupted uuid in database")
}
/// Decodes an inode number from its little-endian database representation.
///
/// # Panics
/// Panics if `db_inode` is not exactly 8 bytes long (corrupted row).
pub fn inode_from_db(db_inode: &[u8]) -> u64 {
    let raw: [u8; 8] = db_inode
        .try_into()
        .expect("corrupted inode in database");
    u64::from_le_bytes(raw)
}
/// Decodes a device id from its little-endian database representation.
///
/// # Panics
/// Panics if `db_device` is not exactly 8 bytes long (corrupted row).
pub fn device_from_db(db_device: &[u8]) -> u64 {
    let raw: [u8; 8] = db_device
        .try_into()
        .expect("corrupted device in database");
    u64::from_le_bytes(raw)
}
/// Encodes an inode number as the little-endian byte vector stored in the database.
pub fn inode_to_db(inode: u64) -> Vec<u8> {
    Vec::from(inode.to_le_bytes())
}
/// Encodes a device id as the little-endian byte vector stored in the database.
pub fn device_to_db(device: u64) -> Vec<u8> {
    Vec::from(device.to_le_bytes())
}
#[derive(Error, Debug)]
#[error("Missing field {0}")]
pub struct MissingFieldError(&'static str);