From 2cb1611cb1aca720ef8a775333afc6f0b137c9cd Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Tue, 25 Nov 2025 10:03:17 -0800 Subject: [PATCH] Switch to shared sync for spaces and related entities - Add apply_shared_change handlers for Space, SpaceGroup, and SpaceItem to upsert by UUID and delete by UUID - Update registry to use shared_with_query and call apply_shared_change; remove deletion hooks - Enhance SyncMonitorPopover to display a colored state badge in a header bar --- core/src/infra/db/entities/space.rs | 96 ++++++++++++++++ core/src/infra/db/entities/space_group.rs | 104 ++++++++++++++++++ core/src/infra/db/entities/space_item.rs | 100 +++++++++++++++++ core/src/infra/sync/registry.rs | 29 ++--- .../SyncMonitor/SyncMonitorPopover.tsx | 57 +++++++--- 5 files changed, 351 insertions(+), 35 deletions(-) diff --git a/core/src/infra/db/entities/space.rs b/core/src/infra/db/entities/space.rs index 07fb425d2..f54dbaf20 100644 --- a/core/src/infra/db/entities/space.rs +++ b/core/src/infra/db/entities/space.rs @@ -108,4 +108,100 @@ impl Syncable for Model { Ok(sync_results) } + + async fn apply_shared_change( + entry: crate::infra::sync::SharedChangeEntry, + db: &DatabaseConnection, + ) -> Result<(), sea_orm::DbErr> { + use crate::infra::sync::ChangeType; + use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, NotSet}; + + match entry.change_type { + ChangeType::Insert | ChangeType::Update => { + let data = entry.data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("Space data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + let active = ActiveModel { + id: NotSet, + uuid: Set(uuid), + name: Set(serde_json::from_value( + data.get("name") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing name".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid name: {}", e)))?), + icon: Set(serde_json::from_value( + data.get("icon") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing icon".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid icon: {}", e)))?), + color: Set(serde_json::from_value( + data.get("color") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing color".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid color: {}", e)))?), + order: Set(serde_json::from_value( + data.get("order") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing order".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid order: {}", e)))?), + created_at: Set(serde_json::from_value( + data.get("created_at") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing created_at".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid created_at: {}", e)))?), + updated_at: Set(serde_json::from_value( + data.get("updated_at") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing updated_at".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid updated_at: {}", e)))?), + }; + + // Upsert by UUID + let existing = Entity::find().filter(Column::Uuid.eq(uuid)).one(db).await?; + + if let Some(existing_model) = existing { + let mut active = active; + active.id = Set(existing_model.id); + active.update(db).await?; + } else { + active.insert(db).await?; + } + + Ok(()) + } + ChangeType::Delete => { + let data = entry.data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("Space data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + Entity::delete_many() + .filter(Column::Uuid.eq(uuid)) + .exec(db) + .await?; + + Ok(()) + } + } + } } diff --git a/core/src/infra/db/entities/space_group.rs b/core/src/infra/db/entities/space_group.rs index 11b2dfc76..264a54775 100644 --- a/core/src/infra/db/entities/space_group.rs +++ b/core/src/infra/db/entities/space_group.rs @@ -118,4 +118,108 @@ impl Syncable for Model { Ok(sync_results) } + + async fn apply_shared_change( + entry: crate::infra::sync::SharedChangeEntry, + db: &DatabaseConnection, + ) -> Result<(), sea_orm::DbErr> { + use crate::infra::sync::{ChangeType, fk_mapper}; + use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, NotSet}; + + match entry.change_type { + ChangeType::Insert | ChangeType::Update => { + // Map UUIDs to local IDs for FK fields + let data = + fk_mapper::map_sync_json_to_local(entry.data, Self::foreign_key_mappings(), db) + .await + .map_err(|e| sea_orm::DbErr::Custom(format!("FK mapping failed: {}", e)))?; + + let data = data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("SpaceGroup data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + let space_id: i32 = serde_json::from_value( + data.get("space_id") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing space_id".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid space_id: {}", e)))?; + + let active = ActiveModel { + id: NotSet, + uuid: Set(uuid), + space_id: Set(space_id), + name: Set(serde_json::from_value( + data.get("name") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing name".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid name: {}", e)))?), + group_type: Set(serde_json::from_value( + data.get("group_type") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing group_type".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid group_type: {}", e)))?), + is_collapsed: Set(serde_json::from_value( + data.get("is_collapsed") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing is_collapsed".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid is_collapsed: {}", e)))?), + order: Set(serde_json::from_value( + data.get("order") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing order".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid order: {}", e)))?), + created_at: Set(serde_json::from_value( + data.get("created_at") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing created_at".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid created_at: {}", e)))?), + }; + + // Upsert by UUID + let existing = Entity::find().filter(Column::Uuid.eq(uuid)).one(db).await?; + + if let Some(existing_model) = existing { + let mut active = active; + active.id = Set(existing_model.id); + active.update(db).await?; + } else { + active.insert(db).await?; + } + + Ok(()) + } + ChangeType::Delete => { + let data = entry.data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("SpaceGroup data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + Entity::delete_many() + .filter(Column::Uuid.eq(uuid)) + .exec(db) + .await?; + + Ok(()) + } + } + } } diff --git a/core/src/infra/db/entities/space_item.rs b/core/src/infra/db/entities/space_item.rs index 15544efd6..33a9a2dc8 100644 --- a/core/src/infra/db/entities/space_item.rs +++ b/core/src/infra/db/entities/space_item.rs @@ -124,4 +124,104 @@ impl Syncable for Model { Ok(sync_results) } + + async fn apply_shared_change( + entry: crate::infra::sync::SharedChangeEntry, + db: &DatabaseConnection, + ) -> Result<(), sea_orm::DbErr> { + use crate::infra::sync::{ChangeType, fk_mapper}; + use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, NotSet}; + + match entry.change_type { + ChangeType::Insert | ChangeType::Update => { + // Map UUIDs to local IDs for FK fields + let data = + fk_mapper::map_sync_json_to_local(entry.data, Self::foreign_key_mappings(), db) + .await + .map_err(|e| sea_orm::DbErr::Custom(format!("FK mapping failed: {}", e)))?; + + let data = data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("SpaceItem data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + let space_id: i32 = serde_json::from_value( + data.get("space_id") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing space_id".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid space_id: {}", e)))?; + + let group_id: Option = serde_json::from_value( + data.get("group_id") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing group_id".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid group_id: {}", e)))?; + + let active = ActiveModel { + id: NotSet, + uuid: Set(uuid), + space_id: Set(space_id), + group_id: Set(group_id), + item_type: Set(serde_json::from_value( + data.get("item_type") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing item_type".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid item_type: {}", e)))?), + order: Set(serde_json::from_value( + data.get("order") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing order".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid order: {}", e)))?), + created_at: Set(serde_json::from_value( + data.get("created_at") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing created_at".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid created_at: {}", e)))?), + }; + + // Upsert by UUID + let existing = Entity::find().filter(Column::Uuid.eq(uuid)).one(db).await?; + + if let Some(existing_model) = existing { + let mut active = active; + active.id = Set(existing_model.id); + active.update(db).await?; + } else { + active.insert(db).await?; + } + + Ok(()) + } + ChangeType::Delete => { + let data = entry.data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("SpaceItem data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + Entity::delete_many() + .filter(Column::Uuid.eq(uuid)) + .exec(db) + .await?; + + Ok(()) + } + } + } } diff --git a/core/src/infra/sync/registry.rs b/core/src/infra/sync/registry.rs index ea558da3c..a582ccc00 100644 --- a/core/src/infra/sync/registry.rs +++ b/core/src/infra/sync/registry.rs @@ -476,14 +476,14 @@ fn initialize_registry() -> HashMap { ), ); - // Space models (device-owned) + // Space models (shared) registry.insert( "space".to_string(), - SyncableModelRegistration::device_owned( + SyncableModelRegistration::shared_with_query( "space", "spaces", - |data, db| { - Box::pin(async move { space::Model::apply_state_change(data, db.as_ref()).await }) + |entry, db| { + Box::pin(async move { space::Model::apply_shared_change(entry, db.as_ref()).await }) }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { @@ -491,20 +491,17 @@ fn initialize_registry() -> HashMap { .await }) }, - Some(|uuid, db| { - Box::pin(async move { space::Model::apply_deletion(uuid, db.as_ref()).await }) - }), ), ); registry.insert( "space_group".to_string(), - SyncableModelRegistration::device_owned( + SyncableModelRegistration::shared_with_query( "space_group", "space_groups", - |data, db| { + |entry, db| { Box::pin(async move { - space_group::Model::apply_state_change(data, db.as_ref()).await + space_group::Model::apply_shared_change(entry, db.as_ref()).await }) }, |device_id, since, cursor, batch_size, db| { @@ -519,20 +516,17 @@ fn initialize_registry() -> HashMap { .await }) }, - Some(|uuid, db| { - Box::pin(async move { space_group::Model::apply_deletion(uuid, db.as_ref()).await }) - }), ), ); registry.insert( "space_item".to_string(), - SyncableModelRegistration::device_owned( + SyncableModelRegistration::shared_with_query( "space_item", "space_items", - |data, db| { + |entry, db| { Box::pin(async move { - space_item::Model::apply_state_change(data, db.as_ref()).await + space_item::Model::apply_shared_change(entry, db.as_ref()).await }) }, |device_id, since, cursor, batch_size, db| { @@ -547,9 +541,6 @@ fn initialize_registry() -> HashMap { .await }) }, - Some(|uuid, db| { - Box::pin(async move { space_item::Model::apply_deletion(uuid, db.as_ref()).await }) - }), ), ); diff --git a/packages/interface/src/components/SyncMonitor/SyncMonitorPopover.tsx b/packages/interface/src/components/SyncMonitor/SyncMonitorPopover.tsx index 4e915ba72..46953bbef 100644 --- a/packages/interface/src/components/SyncMonitor/SyncMonitorPopover.tsx +++ b/packages/interface/src/components/SyncMonitor/SyncMonitorPopover.tsx @@ -93,22 +93,47 @@ export function SyncMonitorPopover({ className }: SyncMonitorPopoverProps) { function SyncMonitorContent({ showActivityFeed }: { showActivityFeed: boolean }) { const sync = useSyncMonitor(); + const getStateColor = (state: string) => { + switch (state) { + case 'Ready': + return 'bg-green-500'; + case 'Backfilling': + return 'bg-yellow-500'; + case 'CatchingUp': + return 'bg-blue-500'; + case 'Uninitialized': + return 'bg-ink-faint'; + case 'Paused': + return 'bg-ink-dull'; + default: + return 'bg-ink-faint'; + } + }; + return ( - - {showActivityFeed ? ( - - ) : ( - - )} - + <> +
+
+
+ {sync.currentState} +
+
+ + {showActivityFeed ? ( + + ) : ( + + )} + + ); }