From d3236ae735ecc8e97ff2637dccfe51c0d6157139 Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Fri, 9 Feb 2024 16:17:04 +0800 Subject: [PATCH] More sync support for file paths + saved searches (#2067) more sync support for file paths + saved searches --- core/prisma/schema.prisma | 4 + core/src/api/files.rs | 148 ++++++++++++--- core/src/api/search/saved.rs | 205 ++++++++++++++++----- core/src/api/tags.rs | 61 +++--- core/src/library/config.rs | 6 +- core/src/location/indexer/indexer_job.rs | 7 +- core/src/location/indexer/mod.rs | 31 +++- core/src/location/indexer/shallow.rs | 3 +- core/src/location/manager/watcher/utils.rs | 87 ++++++--- core/src/location/mod.rs | 104 ++++++----- core/src/object/fs/delete.rs | 23 ++- crates/file-path-helper/src/lib.rs | 2 +- 12 files changed, 493 insertions(+), 188 deletions(-) diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 3858e5dea..d753bc58f 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -124,6 +124,8 @@ model Location { hidden Boolean? date_created DateTime? + /// @local + // this is just a client side cache which is annoying but oh well (@brendan) instance_id Int? instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull) @@ -514,6 +516,7 @@ model Notification { @@map("notification") } +/// @shared(id: pub_id) model SavedSearch { id Int @id @default(autoincrement()) pub_id Bytes @unique @@ -532,6 +535,7 @@ model SavedSearch { @@map("saved_search") } +/// @local(id: id) model CloudCRDTOperation { id Bytes @id timestamp BigInt diff --git a/core/src/api/files.rs b/core/src/api/files.rs index 642d9ea6c..412b56748 100644 --- a/core/src/api/files.rs +++ b/core/src/api/files.rs @@ -21,8 +21,13 @@ use sd_file_path_helper::{ }; use sd_images::ConvertableExtension; use sd_media_metadata::MediaMetadata; -use sd_prisma::prisma::{file_path, location, object}; +use sd_prisma::{ + prisma::{file_path, location, object}, + prisma_sync, +}; +use sd_sync::OperationFactory; use sd_utils::{db::maybe_missing, error::FileIOError}; +use serde_json::json; use std::{ ffi::OsString, @@ -177,15 +182,36 @@ pub(crate) fn mount() -> AlphaRouter { R.with2(library()) .mutation(|(_, library), args: SetNoteArgs| async move { - library - .db + let Library { db, sync, .. } = library.as_ref(); + + let object = db .object() - .update( + .find_unique(object::id::equals(args.id)) + .select(object::select!({ pub_id })) + .exec() + .await? + .ok_or_else(|| { + rspc::Error::new( + rspc::ErrorCode::NotFound, + "Object not found".to_string(), + ) + })?; + + sync.write_op( + &db, + sync.shared_update( + prisma_sync::object::SyncId { + pub_id: object.pub_id, + }, + object::note::NAME, + json!(&args.note), + ), + db.object().update( object::id::equals(args.id), vec![object::note::set(args.note)], - ) - .exec() - .await?; + ), + ) + .await?; invalidate_query!(library, "search.paths"); invalidate_query!(library, "search.objects"); @@ -202,15 +228,36 @@ pub(crate) fn mount() -> AlphaRouter { R.with2(library()) .mutation(|(_, library), args: SetFavoriteArgs| async move { - library - .db + let Library { sync, db, .. } = library.as_ref(); + + let object = db .object() - .update( + .find_unique(object::id::equals(args.id)) + .select(object::select!({ pub_id })) + .exec() + .await? + .ok_or_else(|| { + rspc::Error::new( + rspc::ErrorCode::NotFound, + "Object not found".to_string(), + ) + })?; + + sync.write_op( + &db, + sync.shared_update( + prisma_sync::object::SyncId { + pub_id: object.pub_id, + }, + object::favorite::NAME, + json!(&args.favorite), + ), + db.object().update( object::id::equals(args.id), vec![object::favorite::set(Some(args.favorite))], - ) - .exec() - .await?; + ), + ) + .await?; invalidate_query!(library, "search.paths"); invalidate_query!(library, "search.objects"); @@ -251,16 +298,43 @@ pub(crate) fn mount() -> AlphaRouter { .procedure("updateAccessTime", { R.with2(library()) .mutation(|(_, library), ids: Vec| async move { - library - .db + let Library { sync, db, .. } = library.as_ref(); + + let objects = db .object() - .update_many( - vec![object::id::in_vec(ids)], - vec![object::date_accessed::set(Some(Utc::now().into()))], - ) + .find_many(vec![object::id::in_vec(ids)]) + .select(object::select!({ id pub_id })) .exec() .await?; + let date_accessed = Utc::now().into(); + + let (sync_params, db_params): (Vec<_>, Vec<_>) = objects + .into_iter() + .map(|d| { + ( + sync.shared_update( + prisma_sync::object::SyncId { pub_id: d.pub_id }, + object::date_accessed::NAME, + json!(date_accessed), + ), + d.id, + ) + }) + .unzip(); + + sync.write_ops( + &db, + ( + sync_params, + db.object().update_many( + vec![object::id::in_vec(db_params)], + vec![object::date_accessed::set(Some(date_accessed))], + ), + ), + ) + .await?; + invalidate_query!(library, "search.paths"); invalidate_query!(library, "search.objects"); Ok(()) @@ -269,16 +343,40 @@ pub(crate) fn mount() -> AlphaRouter { .procedure("removeAccessTime", { R.with2(library()) .mutation(|(_, library), object_ids: Vec| async move { - library - .db + let Library { db, sync, .. } = library.as_ref(); + + let objects = db .object() - .update_many( - vec![object::id::in_vec(object_ids)], - vec![object::date_accessed::set(None)], - ) + .find_many(vec![object::id::in_vec(object_ids)]) + .select(object::select!({ id pub_id })) .exec() .await?; + let (sync_params, db_params): (Vec<_>, Vec<_>) = objects + .into_iter() + .map(|d| { + ( + sync.shared_update( + prisma_sync::object::SyncId { pub_id: d.pub_id }, + object::date_accessed::NAME, + json!(null), + ), + d.id, + ) + }) + .unzip(); + sync.write_ops( + &db, + ( + sync_params, + db.object().update_many( + vec![object::id::in_vec(db_params)], + vec![object::date_accessed::set(None)], + ), + ), + ) + .await?; + invalidate_query!(library, "search.objects"); invalidate_query!(library, "search.paths"); Ok(()) diff --git a/core/src/api/search/saved.rs b/core/src/api/search/saved.rs index d858d5287..e9ac626a6 100644 --- a/core/src/api/search/saved.rs +++ b/core/src/api/search/saved.rs @@ -1,11 +1,13 @@ -use crate::{api::utils::library, invalidate_query}; +use crate::{api::utils::library, invalidate_query, library::Library}; -use sd_prisma::prisma::saved_search; +use sd_prisma::{prisma::saved_search, prisma_sync}; +use sd_sync::OperationFactory; use sd_utils::chain_optional_iter; use chrono::{DateTime, FixedOffset, Utc}; use rspc::alpha::AlphaRouter; use serde::{de::IgnoredAny, Deserialize, Serialize}; +use serde_json::json; use specta::Type; use tracing::error; use uuid::Uuid; @@ -31,42 +33,76 @@ pub(crate) fn mount() -> AlphaRouter { } |(_, library), args: Args| async move { + let Library { db, sync, .. } = library.as_ref(); let pub_id = Uuid::new_v4().as_bytes().to_vec(); let date_created: DateTime = Utc::now().into(); - library - .db - .saved_search() - .create( - pub_id, - chain_optional_iter( - [ - saved_search::date_created::set(Some(date_created)), - saved_search::name::set(Some(args.name)), - ], - [ - args.filters - .map(|s| { - // https://github.com/serde-rs/json/issues/579 - // https://docs.rs/serde/latest/serde/de/struct.IgnoredAny.html - if let Err(e) = serde_json::from_str::(&s) { - error!("failed to parse filters: {e:#?}"); - None - } else { - Some(s) - } - }) - .map(saved_search::filters::set), - args.search.map(Some).map(saved_search::search::set), - args.description - .map(Some) - .map(saved_search::description::set), - args.icon.map(Some).map(saved_search::icon::set), - ], + let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter( + [ + ( + (saved_search::date_created::NAME, json!(date_created)), + saved_search::date_created::set(Some(date_created)), ), - ) - .exec() - .await?; + ( + (saved_search::name::NAME, json!(&args.name)), + saved_search::name::set(Some(args.name)), + ), + ], + [ + args.filters + .and_then(|s| { + // https://github.com/serde-rs/json/issues/579 + // https://docs.rs/serde/latest/serde/de/struct.IgnoredAny.html + + if let Err(e) = serde_json::from_str::(&s) { + error!("failed to parse filters: {e:#?}"); + None + } else { + Some(s) + } + }) + .map(|v| { + ( + (saved_search::filters::NAME, json!(&v)), + saved_search::filters::set(Some(v)), + ) + }), + args.search.map(|v| { + ( + (saved_search::search::NAME, json!(&v)), + saved_search::search::set(Some(v)), + ) + }), + args.description.map(|v| { + ( + (saved_search::description::NAME, json!(&v)), + saved_search::description::set(Some(v)), + ) + }), + args.icon.map(|v| { + ( + (saved_search::icon::NAME, json!(&v)), + saved_search::icon::set(Some(v)), + ) + }), + ], + ) + .into_iter() + .unzip(); + + sync.write_ops( + db, + ( + sync.shared_create( + prisma_sync::saved_search::SyncId { + pub_id: pub_id.clone(), + }, + sync_params, + ), + db.saved_search().create(pub_id, db_params), + ), + ) + .await?; invalidate_query!(library, "search.saved.list"); @@ -107,15 +143,81 @@ pub(crate) fn mount() -> AlphaRouter { }); |(_, library), (id, args): (saved_search::id::Type, Args)| async move { - let mut params = args.to_params(); - params.push(saved_search::date_modified::set(Some(Utc::now().into()))); + let Library { db, sync, .. } = library.as_ref(); + let updated_at = Utc::now().into(); - library - .db + let search = db .saved_search() - .update_unchecked(saved_search::id::equals(id), params) + .find_unique(saved_search::id::equals(id)) + .select(saved_search::select!({ pub_id })) .exec() - .await?; + .await? + .ok_or_else(|| { + rspc::Error::new(rspc::ErrorCode::NotFound, "search not found".into()) + })?; + + let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter( + [( + (saved_search::date_modified::NAME, json!(updated_at)), + saved_search::date_modified::set(Some(updated_at)), + )], + [ + args.name.map(|v| { + ( + (saved_search::name::NAME, json!(&v)), + saved_search::name::set(v), + ) + }), + args.description.map(|v| { + ( + (saved_search::name::NAME, json!(&v)), + saved_search::name::set(v), + ) + }), + args.icon.map(|v| { + ( + (saved_search::icon::NAME, json!(&v)), + saved_search::icon::set(v), + ) + }), + args.search.map(|v| { + ( + (saved_search::search::NAME, json!(&v)), + saved_search::search::set(v), + ) + }), + args.filters.map(|v| { + ( + (saved_search::filters::NAME, json!(&v)), + saved_search::filters::set(v), + ) + }), + ], + ) + .into_iter() + .map(|((k, v), p)| { + ( + sync.shared_update( + prisma_sync::saved_search::SyncId { + pub_id: search.pub_id.clone(), + }, + k, + v, + ), + p, + ) + }) + .unzip(); + + sync.write_ops( + &db, + ( + sync_params, + db.saved_search() + .update_unchecked(saved_search::id::equals(id), db_params), + ), + ) + .await?; invalidate_query!(library, "search.saved.list"); invalidate_query!(library, "search.saved.get"); @@ -127,12 +229,27 @@ pub(crate) fn mount() -> AlphaRouter { .procedure("delete", { R.with2(library()) .mutation(|(_, library), search_id: i32| async move { - library - .db + let Library { db, sync, .. } = library.as_ref(); + + let search = db .saved_search() - .delete(saved_search::id::equals(search_id)) + .find_unique(saved_search::id::equals(search_id)) + .select(saved_search::select!({ pub_id })) .exec() - .await?; + .await? + .ok_or_else(|| { + rspc::Error::new(rspc::ErrorCode::NotFound, "search not found".into()) + })?; + + sync.write_op( + &db, + sync.shared_delete(prisma_sync::saved_search::SyncId { + pub_id: search.pub_id, + }), + db.saved_search() + .delete(saved_search::id::equals(search_id)), + ) + .await?; invalidate_query!(library, "search.saved.list"); // disabled as it's messing with pre-delete navigation diff --git a/core/src/api/tags.rs b/core/src/api/tags.rs index 3406dafba..762f9c198 100644 --- a/core/src/api/tags.rs +++ b/core/src/api/tags.rs @@ -1,7 +1,6 @@ use crate::{invalidate_query, library::Library, object::tag::TagCreateArgs}; use sd_cache::{CacheNode, Normalise, NormalisedResult, NormalisedResults, Reference}; -use sd_file_ext::kind::ObjectKind; use sd_prisma::{ prisma::{file_path, object, tag, tag_on_object}, prisma_sync, @@ -165,6 +164,7 @@ pub(crate) fn mount() -> AlphaRouter { .select(file_path::select!({ id pub_id + is_dir object: select { id pub_id } })), ) @@ -216,37 +216,40 @@ pub(crate) fn mount() -> AlphaRouter { ) .await?; } else { - let (new_objects, _) = db - ._batch({ - let (left, right): (Vec<_>, Vec<_>) = file_paths - .iter() - .filter(|fp| fp.object.is_none()) - .map(|fp| { - let id = uuid_to_bytes(Uuid::new_v4()); + let mut sync_params = vec![]; - ( - db.object().create( - id.clone(), - vec![ - object::date_created::set(None), - object::kind::set(Some( - ObjectKind::Folder as i32, - )), - ], - ), - db.file_path().update( - file_path::id::equals(fp.id), - vec![file_path::object::connect( - object::pub_id::equals(id), - )], - ), - ) - }) - .unzip(); + let db_params: (Vec<_>, Vec<_>) = file_paths + .iter() + .filter(|fp| fp.is_dir.unwrap_or_default() && fp.object.is_none()) + .map(|fp| { + let id = uuid_to_bytes(Uuid::new_v4()); - (left, right) + sync_params.extend(sync.shared_create( + prisma_sync::object::SyncId { pub_id: id.clone() }, + [], + )); + + sync_params.push(sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: fp.pub_id.clone(), + }, + file_path::object::NAME, + json!(id), + )); + + ( + db.object().create(id.clone(), vec![]), + db.file_path().update( + file_path::id::equals(fp.id), + vec![file_path::object::connect(object::pub_id::equals( + id, + ))], + ), + ) }) - .await?; + .unzip(); + + let (new_objects, _) = sync.write_ops(db, (sync_params, db_params)).await?; let (sync_ops, db_creates) = objects .into_iter() diff --git a/core/src/library/config.rs b/core/src/library/config.rs index 1441b94aa..8c267495e 100644 --- a/core/src/library/config.rs +++ b/core/src/library/config.rs @@ -225,9 +225,9 @@ impl LibraryConfig { Some(size.to_be_bytes().to_vec()) } else { error!( - "File path had invalid size: '{}'", - path.id, size_in_bytes - ); + "File path had invalid size: '{}'", + path.id, size_in_bytes + ); None }; diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 5ba06656b..5de6901e4 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -164,6 +164,7 @@ impl StatefulJob for IndexerJobInit { let location_path = maybe_missing(&init.location.path, "location.path").map(Path::new)?; let db = Arc::clone(&ctx.library.db); + let sync = &ctx.library.sync; let indexer_rules = init .location @@ -235,7 +236,7 @@ impl StatefulJob for IndexerJobInit { let db_delete_start = Instant::now(); // TODO pass these uuids to sync system - let removed_count = remove_non_existing_file_paths(to_remove, &db).await?; + let removed_count = remove_non_existing_file_paths(to_remove, &db, sync).await?; let db_delete_time = db_delete_start.elapsed(); let total_new_paths = &mut 0; @@ -381,6 +382,7 @@ impl StatefulJob for IndexerJobInit { maybe_missing(&init.location.path, "location.path").map(Path::new)?; let db = Arc::clone(&ctx.library.db); + let sync = &ctx.library.sync; let scan_start = Instant::now(); @@ -407,7 +409,8 @@ impl StatefulJob for IndexerJobInit { let db_delete_time = Instant::now(); // TODO pass these uuids to sync system - new_metadata.removed_count = remove_non_existing_file_paths(to_remove, &db).await?; + new_metadata.removed_count = + remove_non_existing_file_paths(to_remove, &db, sync).await?; new_metadata.db_write_time = db_delete_time.elapsed(); let to_walk_count = to_walk.len(); diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 365019be7..057c4aecd 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -304,15 +304,29 @@ fn iso_file_path_factory( async fn remove_non_existing_file_paths( to_remove: impl IntoIterator, db: &PrismaClient, + sync: &sd_core_sync::Manager, ) -> Result { - db.file_path() - .delete_many(vec![file_path::pub_id::in_vec( - to_remove.into_iter().map(|data| data.pub_id).collect(), - )]) - .exec() - .await - .map(|count| count as u64) - .map_err(Into::into) + let (sync_params, db_params): (Vec<_>, Vec<_>) = to_remove + .into_iter() + .map(|d| { + ( + sync.shared_delete(prisma_sync::file_path::SyncId { pub_id: d.pub_id }), + d.id, + ) + }) + .unzip(); + + sync.write_ops( + db, + ( + sync_params, + db.file_path() + .delete_many(vec![file_path::id::in_vec(db_params)]), + ), + ) + .await?; + + Ok(0) } // TODO: Change this macro to a fn when we're able to return @@ -422,6 +436,7 @@ macro_rules! to_remove_db_fetcher_fn { .into_iter() .filter(|file_path| !founds_ids.contains(&file_path.id)) .map(|file_path| ::sd_file_path_helper::file_path_pub_and_cas_ids::Data { + id: file_path.id, pub_id: file_path.pub_id, cas_id: file_path.cas_id, }), diff --git a/core/src/location/indexer/shallow.rs b/core/src/location/indexer/shallow.rs index 8b29d6657..4440d982b 100644 --- a/core/src/location/indexer/shallow.rs +++ b/core/src/location/indexer/shallow.rs @@ -46,6 +46,7 @@ pub async fn shallow( let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?; let db = library.db.clone(); + let sync = &library.sync; let indexer_rules = location .indexer_rules @@ -103,7 +104,7 @@ pub async fn shallow( errors.into_iter().for_each(|e| error!("{e}")); // TODO pass these uuids to sync system - remove_non_existing_file_paths(to_remove, &db).await?; + remove_non_existing_file_paths(to_remove, &db, sync).await?; let mut new_directories_to_scan = HashSet::new(); diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 0b556f10a..2a9d6a4ef 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -741,7 +741,7 @@ pub(super) async fn rename( let location_path = extract_location_path(location_id, library).await?; let old_path = old_path.as_ref(); let new_path = new_path.as_ref(); - let Library { db, .. } = library; + let Library { db, sync, .. } = library; let old_path_materialized_str = extract_normalized_materialized_path_str(location_id, &location_path, old_path)?; @@ -784,8 +784,8 @@ pub(super) async fn rename( let old_parts = old.to_parts(); // TODO: Fetch all file_paths that will be updated and dispatch sync events - let updated = library - .db + // This is NOT sync compatible! @brendan + let updated = db ._execute_raw(raw!( "UPDATE file_path \ SET materialized_path = REPLACE(materialized_path, {}, {}) \ @@ -807,23 +807,56 @@ pub(super) async fn rename( let is_hidden = path_is_hidden(new_path, &new_path_metadata); - library - .db - .file_path() - .update( - file_path::pub_id::equals(file_path.pub_id), - vec![ - file_path::materialized_path::set(Some(new_path_materialized_str)), - file_path::name::set(Some(new_parts.name.to_string())), - file_path::extension::set(Some(new_parts.extension.to_string())), - file_path::date_modified::set(Some( - DateTime::::from(new_path_metadata.modified_or_now()).into(), - )), - file_path::hidden::set(Some(is_hidden)), - ], - ) - .exec() - .await?; + let date_modified = DateTime::::from(new_path_metadata.modified_or_now()).into(); + + let (sync_params, db_params): (Vec<_>, Vec<_>) = [ + ( + ( + file_path::materialized_path::NAME, + json!(new_path_materialized_str), + ), + file_path::materialized_path::set(Some(new_path_materialized_str)), + ), + ( + (file_path::name::NAME, json!(new_parts.name)), + file_path::name::set(Some(new_parts.name.to_string())), + ), + ( + (file_path::extension::NAME, json!(new_parts.extension)), + file_path::extension::set(Some(new_parts.extension.to_string())), + ), + ( + (file_path::date_modified::NAME, json!(&date_modified)), + file_path::date_modified::set(Some(date_modified)), + ), + ( + (file_path::hidden::NAME, json!(is_hidden)), + file_path::hidden::set(Some(is_hidden)), + ), + ] + .into_iter() + .unzip(); + + sync.write_ops( + db, + ( + sync_params + .into_iter() + .map(|(k, v)| { + sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: file_path.pub_id.clone(), + }, + k, + v, + ) + }) + .collect(), + db.file_path() + .update(file_path::pub_id::equals(file_path.pub_id), db_params), + ), + ) + .await?; invalidate_query!(library, "search.paths"); invalidate_query!(library, "search.objects"); @@ -870,7 +903,7 @@ pub(super) async fn remove_by_file_path( todo!("file has changed in some way, re-identify it") } Err(e) if e.kind() == ErrorKind::NotFound => { - let db = &library.db; + let Library { sync, db, .. } = library; let is_dir = maybe_missing(file_path.is_dir, "file_path.is_dir")?; @@ -883,10 +916,14 @@ pub(super) async fn remove_by_file_path( ) .await?; } else { - db.file_path() - .delete(file_path::pub_id::equals(file_path.pub_id.clone())) - .exec() - .await?; + sync.write_op( + &db, + sync.shared_delete(prisma_sync::file_path::SyncId { + pub_id: file_path.pub_id.clone(), + }), + db.file_path().delete(file_path::id::equals(file_path.id)), + ) + .await?; if let Some(object_id) = file_path.object_id { db.object() diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index c11bb6006..92246cf4f 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -741,6 +741,8 @@ pub async fn delete_location( library: &Arc, location_id: location::id::Type, ) -> Result<(), LocationError> { + let Library { db, sync, .. } = library.as_ref(); + let start = Instant::now(); node.locations.remove(location_id, library.clone()).await?; debug!( @@ -808,12 +810,14 @@ pub async fn delete_location( let start = Instant::now(); - library - .db - .location() - .delete(location::id::equals(location_id)) - .exec() - .await?; + sync.write_op( + db, + sync.shared_delete(prisma_sync::location::SyncId { + pub_id: location.pub_id, + }), + db.location().delete(location::id::equals(location_id)), + ) + .await?; debug!( "Elapsed time to delete location from db: {:?}", @@ -836,6 +840,8 @@ pub async fn delete_directory( ) -> Result<(), QueryError> { let Library { db, .. } = library; + // This is NOT sync-compatible! + // Sync requires having sync ids available. let children_params = sd_utils::chain_optional_iter( [file_path::location_id::equals(Some(location_id))], [parent_iso_file_path.and_then(|parent| { @@ -1061,30 +1067,60 @@ pub async fn create_file_path( location_id, ))?; - let params = { + let (sync_params, db_params): (Vec<_>, Vec<_>) = { use file_path::*; - vec![ + [ ( - location::NAME, - json!(prisma_sync::location::SyncId { - pub_id: location.pub_id - }), + ( + location::NAME, + json!(prisma_sync::location::SyncId { + pub_id: location.pub_id + }), + ), + location::connect(prisma::location::id::equals(location.id)), ), - (cas_id::NAME, json!(cas_id)), - (materialized_path::NAME, json!(materialized_path)), - (name::NAME, json!(name)), - (extension::NAME, json!(extension)), + ((cas_id::NAME, json!(cas_id)), cas_id::set(cas_id)), ( - size_in_bytes_bytes::NAME, - json!(metadata.size_in_bytes.to_be_bytes().to_vec()), + (materialized_path::NAME, json!(materialized_path)), + materialized_path::set(Some(materialized_path.into())), + ), + ((name::NAME, json!(name)), name::set(Some(name.into()))), + ( + (extension::NAME, json!(extension)), + extension::set(Some(extension.into())), + ), + ( + ( + size_in_bytes_bytes::NAME, + json!(metadata.size_in_bytes.to_be_bytes().to_vec()), + ), + size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())), + ), + ( + (inode::NAME, json!(metadata.inode.to_le_bytes())), + inode::set(Some(inode_to_db(metadata.inode))), + ), + ((is_dir::NAME, json!(is_dir)), is_dir::set(Some(is_dir))), + ( + (date_created::NAME, json!(metadata.created_at)), + date_created::set(Some(metadata.created_at.into())), + ), + ( + (date_modified::NAME, json!(metadata.modified_at)), + date_modified::set(Some(metadata.modified_at.into())), + ), + ( + (date_indexed::NAME, json!(indexed_at)), + date_indexed::set(Some(indexed_at.into())), + ), + ( + (hidden::NAME, json!(metadata.hidden)), + hidden::set(Some(metadata.hidden)), ), - (inode::NAME, json!(metadata.inode.to_le_bytes())), - (is_dir::NAME, json!(is_dir)), - (date_created::NAME, json!(metadata.created_at)), - (date_modified::NAME, json!(metadata.modified_at)), - (date_indexed::NAME, json!(indexed_at)), ] + .into_iter() + .unzip() }; let pub_id = sd_utils::uuid_to_bytes(Uuid::new_v4()); @@ -1097,27 +1133,9 @@ pub async fn create_file_path( prisma_sync::file_path::SyncId { pub_id: pub_id.clone(), }, - params, + sync_params, ), - db.file_path().create(pub_id, { - use file_path::*; - vec![ - location::connect(prisma::location::id::equals(location.id)), - materialized_path::set(Some(materialized_path.into())), - name::set(Some(name.into())), - extension::set(Some(extension.into())), - inode::set(Some(inode_to_db(metadata.inode))), - cas_id::set(cas_id), - is_dir::set(Some(is_dir)), - size_in_bytes_bytes::set(Some( - metadata.size_in_bytes.to_be_bytes().to_vec(), - )), - date_created::set(Some(metadata.created_at.into())), - date_modified::set(Some(metadata.modified_at.into())), - date_indexed::set(Some(indexed_at.into())), - hidden::set(Some(metadata.hidden)), - ] - }), + db.file_path().create(pub_id, db_params), ), ) .await?; diff --git a/core/src/object/fs/delete.rs b/core/src/object/fs/delete.rs index 23ba73ed2..50a1c53d1 100644 --- a/core/src/object/fs/delete.rs +++ b/core/src/object/fs/delete.rs @@ -7,7 +7,11 @@ use crate::{ location::get_location_path_from_location_id, }; -use sd_prisma::prisma::{file_path, location}; +use sd_prisma::{ + prisma::{file_path, location}, + prisma_sync, +}; +use sd_sync::OperationFactory; use sd_utils::{db::maybe_missing, error::FileIOError}; use std::hash::Hash; @@ -70,6 +74,8 @@ impl StatefulJob for FileDeleterJobInit { // need to handle stuff such as querying prisma for all paths of a file, and deleting all of those if requested (with a checkbox in the ui) // maybe a files.countOccurances/and or files.getPath(location_id, path_id) to show how many of these files would be deleted (and where?) + let Library { db, sync, .. } = ctx.library.as_ref(); + match if maybe_missing(step.file_path.is_dir, "file_path.is_dir")? { fs::remove_dir_all(&step.full_path).await } else { @@ -81,12 +87,15 @@ impl StatefulJob for FileDeleterJobInit { "File not found in the file system, will remove from database: {}", step.full_path.display() ); - ctx.library - .db - .file_path() - .delete(file_path::id::equals(step.file_path.id)) - .exec() - .await?; + sync.write_op( + &db, + sync.shared_delete(prisma_sync::file_path::SyncId { + pub_id: step.file_path.pub_id.clone(), + }), + db.file_path() + .delete(file_path::id::equals(step.file_path.id)), + ) + .await?; } Err(e) => { return Err(JobError::from(FileIOError::from((&step.full_path, e)))); diff --git a/crates/file-path-helper/src/lib.rs b/crates/file-path-helper/src/lib.rs index 555b54b9a..f6dbe9542 100644 --- a/crates/file-path-helper/src/lib.rs +++ b/crates/file-path-helper/src/lib.rs @@ -22,7 +22,7 @@ pub use isolated_file_path_data::{ }; // File Path selectables! -file_path::select!(file_path_pub_and_cas_ids { pub_id cas_id }); +file_path::select!(file_path_pub_and_cas_ids { id pub_id cas_id }); file_path::select!(file_path_just_pub_id_materialized_path { pub_id materialized_path