From 420aa75da53b37197bc101d8a1d0dc49db28b5b0 Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Fri, 3 Mar 2023 13:23:10 +0800 Subject: [PATCH] More sync impls (#586) tags and stuff --- core/prisma/schema.prisma | 2 +- core/src/api/locations.rs | 12 +-- core/src/api/tags.rs | 73 ++++++++++--- core/src/location/indexer/indexer_job.rs | 9 +- core/src/location/mod.rs | 85 ++++++++++----- core/src/sync/manager.rs | 128 ++++++++++++++++------- 6 files changed, 213 insertions(+), 96 deletions(-) diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 625b6f577..2b52b4188 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -155,7 +155,7 @@ model Object { @@map("object") } -/// @shared(id: [location, id]) +/// @owned(id: [location, id]) model FilePath { id Int is_dir Boolean @default(false) diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index 58ca0d094..5038f76d1 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -1,4 +1,5 @@ use crate::{ + library::LibraryContext, location::{ delete_location, fetch_location, indexer::{indexer_job::indexer_job_location, rules::IndexerRuleCreateArgs}, @@ -79,8 +80,9 @@ pub(crate) fn mount() -> impl RouterBuilderLike { } t(|_, mut args: LocationExplorerArgs, library| async move { - let location = library - .db + let LibraryContext { db, .. } = &library; + + let location = db .location() .find_unique(location::id::equals(args.location_id)) .exec() @@ -93,8 +95,7 @@ pub(crate) fn mount() -> impl RouterBuilderLike { args.path += "/"; } - let directory = library - .db + let directory = db .file_path() .find_first(vec![ file_path::location_id::equals(location.id), @@ -107,8 +108,7 @@ pub(crate) fn mount() -> impl RouterBuilderLike { rspc::Error::new(ErrorCode::NotFound, "Directory not found".into()) })?; - let file_paths = library - .db + let file_paths = db .file_path() .find_many(vec![ file_path::location_id::equals(location.id), diff --git a/core/src/api/tags.rs b/core/src/api/tags.rs index 5e54c407e..dfcf500b9 100644 --- a/core/src/api/tags.rs +++ b/core/src/api/tags.rs @@ -1,6 +1,7 @@ use rspc::{ErrorCode, Type}; use serde::Deserialize; +use serde_json::json; use tracing::info; use uuid::Uuid; @@ -9,6 +10,7 @@ use crate::{ invalidate_query, library::LibraryContext, prisma::{object, tag, tag_on_object}, + sync, }; use super::{utils::LibraryRequest, RouterBuilder}; @@ -138,17 +140,27 @@ pub(crate) fn mount() -> RouterBuilder { } t(|_, args: TagCreateArgs, library| async move { - let created_tag = library - .db - .tag() - .create( - Uuid::new_v4().as_bytes().to_vec(), - vec![ - tag::name::set(Some(args.name)), - tag::color::set(Some(args.color)), - ], + let LibraryContext { db, sync, .. } = &library; + + let pub_id = Uuid::new_v4().as_bytes().to_vec(); + + let created_tag = sync + .write_op( + db, + sync.unique_shared_create( + sync::tag::SyncId { + pub_id: pub_id.clone(), + }, + [("name", json!(args.name)), ("color", json!(args.color))], + ), + db.tag().create( + pub_id, + vec![ + tag::name::set(Some(args.name)), + tag::color::set(Some(args.color)), + ], + ), ) - .exec() .await?; invalidate_query!(library, "tags.list"); @@ -199,15 +211,42 @@ pub(crate) fn mount() -> RouterBuilder { } t(|_, args: TagUpdateArgs, library| async move { - library - .db + let LibraryContext { sync, db, .. } = &library; + + let tag = db .tag() - .update( - tag::id::equals(args.id), - vec![tag::name::set(args.name), tag::color::set(args.color)], - ) + .find_unique(tag::id::equals(args.id)) + .select(tag::select!({ pub_id })) .exec() - .await?; + .await? + .unwrap(); + + sync.write_ops( + db, + ( + [ + args.name.as_ref().map(|v| ("name", json!(v))), + args.color.as_ref().map(|v| ("color", json!(v))), + ] + .into_iter() + .flatten() + .map(|(k, v)| { + sync.shared_update( + sync::tag::SyncId { + pub_id: tag.pub_id.clone(), + }, + k, + v, + ) + }) + .collect(), + db.tag().update( + tag::id::equals(args.id), + vec![tag::name::set(args.name), tag::color::set(args.color)], + ), + ), + ) + .await?; invalidate_query!(library, "tags.list"); diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 268456a07..dc02b96e4 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -1,5 +1,6 @@ use crate::{ job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + library::LibraryContext, location::indexer::rules::RuleKind, prisma::{file_path, location}, sync, @@ -217,7 +218,7 @@ impl StatefulJob for IndexerJob { ctx: WorkerContext, state: &mut JobState, ) -> Result<(), JobError> { - let db = &ctx.library_ctx.db; + let LibraryContext { sync, db, .. } = &ctx.library_ctx; let location = &state.init.location; @@ -284,12 +285,10 @@ impl StatefulJob for IndexerJob { }) .unzip(); - let count = ctx - .library_ctx - .sync + let count = sync .write_op( db, - ctx.library_ctx.sync.owned_create_many(sync_stuff, true), + sync.owned_create_many(sync_stuff, true), db.file_path().create_many(paths).skip_duplicates(), ) .await?; diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index dffd65796..8a310a027 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -164,33 +164,51 @@ pub struct LocationUpdateArgs { impl LocationUpdateArgs { pub async fn update(self, ctx: &LibraryContext) -> Result<(), LocationError> { + let LibraryContext { sync, db, .. } = &ctx; + let location = fetch_location(ctx, self.id) .include(location::include!({ indexer_rules })) .exec() .await? .ok_or(LocationError::IdNotFound(self.id))?; - let params = [ + let (sync_params, db_params): (Vec<_>, Vec<_>) = [ self.name .clone() .filter(|name| &location.name != name) - .map(location::name::set), - self.generate_preview_media - .map(location::generate_preview_media::set), - self.sync_preview_media - .map(location::sync_preview_media::set), - self.hidden.map(location::hidden::set), + .map(|v| (("name", json!(v)), location::name::set(v))), + self.generate_preview_media.map(|v| { + ( + ("generate_preview_media", json!(v)), + location::generate_preview_media::set(v), + ) + }), + self.sync_preview_media.map(|v| { + ( + ("sync_preview_media", json!(v)), + location::sync_preview_media::set(v), + ) + }), + self.hidden + .map(|v| (("hidden", json!(v)), location::hidden::set(v))), ] .into_iter() .flatten() - .collect::>(); + .unzip(); - if !params.is_empty() { - ctx.db - .location() - .update(location::id::equals(self.id), params) - .exec() - .await?; + if !sync_params.is_empty() { + sync.write_op( + db, + sync.owned_update( + sync::location::SyncId { + pub_id: location.pub_id, + }, + sync_params, + ), + db.location() + .update(location::id::equals(self.id), db_params), + ) + .await?; if location.node_id == ctx.node_local_id { if let Some(mut metadata) = @@ -301,26 +319,35 @@ pub async fn relink_location( ctx: &LibraryContext, location_path: impl AsRef, ) -> Result<(), LocationError> { + let LibraryContext { db, id, sync, .. } = &ctx; + let mut metadata = SpacedriveLocationMetadataFile::try_load(&location_path) .await? .ok_or_else(|| LocationError::MissingMetadataFile(location_path.as_ref().to_path_buf()))?; - metadata.relink(ctx.id, &location_path).await?; + metadata.relink(*id, &location_path).await?; - ctx.db - .location() - .update( - location::pub_id::equals(metadata.location_pub_id(ctx.id)?.as_ref().to_vec()), - vec![location::path::set( - location_path - .as_ref() - .to_str() - .expect("Found non-UTF-8 path") - .to_string(), - )], - ) - .exec() - .await?; + let pub_id = metadata.location_pub_id(ctx.id)?.as_ref().to_vec(); + let path = location_path + .as_ref() + .to_str() + .expect("Found non-UTF-8 path") + .to_string(); + + sync.write_op( + db, + sync.owned_update( + sync::location::SyncId { + pub_id: pub_id.clone(), + }, + [("path", json!(path))], + ), + db.location().update( + location::pub_id::equals(pub_id), + vec![location::path::set(path)], + ), + ) + .await?; Ok(()) } diff --git a/core/src/sync/manager.rs b/core/src/sync/manager.rs index cae46427f..b475fa88a 100644 --- a/core/src/sync/manager.rs +++ b/core/src/sync/manager.rs @@ -1,8 +1,10 @@ use crate::{ - prisma::{file_path, location, node, object, owned_operation, shared_operation, PrismaClient}, - prisma_sync, + prisma::{ + file_path, location, node, object, owned_operation, shared_operation, tag, PrismaClient, + }, + sync, }; - +use prisma_client_rust::ModelTypes; use sd_sync::*; use futures::future::join_all; @@ -196,15 +198,15 @@ impl SyncManager { } pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> { + let db = &self.db; + match op.typ { CRDTOperationType::Owned(owned_op) => match owned_op.model.as_str() { - "FilePath" => { + file_path::Types::MODEL => { for item in owned_op.items { - let id: prisma_sync::file_path::SyncId = - serde_json::from_value(item.id).unwrap(); + let id: sync::file_path::SyncId = serde_json::from_value(item.id).unwrap(); - let location = self - .db + let location = db .location() .find_unique(location::pub_id::equals(id.location.pub_id)) .select(location::select!({ id })) @@ -214,8 +216,7 @@ impl SyncManager { match item.data { OwnedOperationData::Create(mut data) => { - self.db - .file_path() + db.file_path() .create( id.id, location::id::equals(location.id), @@ -244,21 +245,21 @@ impl SyncManager { values, skip_duplicates, } => { - let location_ids = - values - .iter() - .map(|(id, _)| { - serde_json::from_value::(id.clone()) - .unwrap() - .location - .pub_id - }) - .collect::>(); + let location_ids = values + .iter() + .map(|(id, _)| { + serde_json::from_value::( + id.clone(), + ) + .unwrap() + .location + .pub_id + }) + .collect::>(); let location_id_mappings = join_all(location_ids.iter().map(|id| async move { - self.db - .location() + db.location() .find_unique(location::pub_id::equals(id.clone())) .exec() .await @@ -270,11 +271,11 @@ impl SyncManager { .flatten() .collect::>(); - let mut q = self.db.file_path().create_many( + let mut q = db.file_path().create_many( values .into_iter() .map(|(id, mut data)| { - let id: prisma_sync::file_path::SyncId = + let id: sync::file_path::SyncId = serde_json::from_value(id).unwrap(); file_path::create_unchecked( @@ -330,14 +331,13 @@ impl SyncManager { } } } - "Location" => { + location::Types::MODEL => { for item in owned_op.items { - let id: prisma_sync::location::SyncId = from_value(item.id).unwrap(); + let id: sync::location::SyncId = from_value(item.id).unwrap(); match item.data { OwnedOperationData::Create(mut data) => { - self.db - .location() + db.location() .create( id.pub_id, serde_json::from_value(data.remove("name").unwrap()) @@ -368,13 +368,12 @@ impl SyncManager { _ => {} }, CRDTOperationType::Shared(shared_op) => match shared_op.model.as_str() { - "Object" => { - let id: prisma_sync::object::SyncId = from_value(shared_op.record_id).unwrap(); + object::Types::MODEL => { + let id: sync::object::SyncId = from_value(shared_op.record_id).unwrap(); match shared_op.data { SharedOperationData::Create(_) => { - self.db - .object() + db.object() .upsert( object::pub_id::equals(id.pub_id.clone()), (id.pub_id, vec![]), @@ -385,8 +384,7 @@ impl SyncManager { .ok(); } SharedOperationData::Update { field, value } => { - self.db - .object() + db.object() .update( object::pub_id::equals(id.pub_id), vec![object::SetParam::deserialize(&field, value).unwrap()], @@ -397,6 +395,41 @@ impl SyncManager { _ => todo!(), } } + tag::Types::MODEL => { + let sync::tag::SyncId { pub_id } = from_value(shared_op.record_id).unwrap(); + + match shared_op.data { + SharedOperationData::Create(create_data) => match create_data { + SharedOperationCreateData::Unique(create_data) => { + db.tag() + .create( + pub_id, + create_data + .into_iter() + .flat_map(|(field, value)| { + tag::SetParam::deserialize(&field, value) + }) + .collect(), + ) + .exec() + .await?; + } + _ => unreachable!(), + }, + SharedOperationData::Update { field, value } => { + db.tag() + .update( + tag::pub_id::equals(pub_id), + vec![tag::SetParam::deserialize(&field, value).unwrap()], + ) + .exec() + .await?; + } + SharedOperationData::Delete => { + db.tag().delete(tag::pub_id::equals(pub_id)).exec().await?; + } + } + } _ => todo!(), }, _ => {} @@ -441,7 +474,7 @@ impl SyncManager { pub fn owned_create_many< const SIZE: usize, TSyncId: SyncId, - TModel: SyncType, + TModel: SyncType, >( &self, data: impl IntoIterator, @@ -467,13 +500,12 @@ impl SyncManager { })) } pub fn owned_update< - const SIZE: usize, TSyncId: SyncId, - TModel: SyncType, + TModel: SyncType, >( &self, id: TSyncId, - values: [(&'static str, Value); SIZE], + values: impl IntoIterator, ) -> CRDTOperation { self.new_op(CRDTOperationType::Owned(OwnedOperation { model: TModel::MODEL.to_string(), @@ -502,6 +534,26 @@ impl SyncManager { data: SharedOperationData::Create(SharedOperationCreateData::Atomic), })) } + pub fn unique_shared_create< + const SIZE: usize, + TSyncId: SyncId, + TModel: SyncType, + >( + &self, + id: TSyncId, + values: [(&'static str, Value); SIZE], + ) -> CRDTOperation { + self.new_op(CRDTOperationType::Shared(SharedOperation { + model: TModel::MODEL.to_string(), + record_id: json!(id), + data: SharedOperationData::Create(SharedOperationCreateData::Unique( + values + .into_iter() + .map(|(name, value)| (name.to_string(), value)) + .collect(), + )), + })) + } pub fn shared_update< TSyncId: SyncId, TModel: SyncType,