mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-18 05:15:16 -04:00
Switch to shared sync for spaces and related entities
- Add apply_shared_change handlers for Space, SpaceGroup, and SpaceItem to upsert by UUID and delete by UUID - Update registry to use shared_with_query and call apply_shared_change; remove deletion hooks - Enhance SyncMonitorPopover to display a colored state badge in a header bar
This commit is contained in:
@@ -108,4 +108,100 @@ impl Syncable for Model {
|
||||
|
||||
Ok(sync_results)
|
||||
}
|
||||
|
||||
async fn apply_shared_change(
|
||||
entry: crate::infra::sync::SharedChangeEntry,
|
||||
db: &DatabaseConnection,
|
||||
) -> Result<(), sea_orm::DbErr> {
|
||||
use crate::infra::sync::ChangeType;
|
||||
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, NotSet};
|
||||
|
||||
match entry.change_type {
|
||||
ChangeType::Insert | ChangeType::Update => {
|
||||
let data = entry.data.as_object().ok_or_else(|| {
|
||||
sea_orm::DbErr::Custom("Space data is not an object".to_string())
|
||||
})?;
|
||||
|
||||
let uuid: Uuid = serde_json::from_value(
|
||||
data.get("uuid")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
|
||||
|
||||
let active = ActiveModel {
|
||||
id: NotSet,
|
||||
uuid: Set(uuid),
|
||||
name: Set(serde_json::from_value(
|
||||
data.get("name")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing name".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid name: {}", e)))?),
|
||||
icon: Set(serde_json::from_value(
|
||||
data.get("icon")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing icon".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid icon: {}", e)))?),
|
||||
color: Set(serde_json::from_value(
|
||||
data.get("color")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing color".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid color: {}", e)))?),
|
||||
order: Set(serde_json::from_value(
|
||||
data.get("order")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing order".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid order: {}", e)))?),
|
||||
created_at: Set(serde_json::from_value(
|
||||
data.get("created_at")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing created_at".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid created_at: {}", e)))?),
|
||||
updated_at: Set(serde_json::from_value(
|
||||
data.get("updated_at")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing updated_at".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid updated_at: {}", e)))?),
|
||||
};
|
||||
|
||||
// Upsert by UUID
|
||||
let existing = Entity::find().filter(Column::Uuid.eq(uuid)).one(db).await?;
|
||||
|
||||
if let Some(existing_model) = existing {
|
||||
let mut active = active;
|
||||
active.id = Set(existing_model.id);
|
||||
active.update(db).await?;
|
||||
} else {
|
||||
active.insert(db).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
ChangeType::Delete => {
|
||||
let data = entry.data.as_object().ok_or_else(|| {
|
||||
sea_orm::DbErr::Custom("Space data is not an object".to_string())
|
||||
})?;
|
||||
|
||||
let uuid: Uuid = serde_json::from_value(
|
||||
data.get("uuid")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
|
||||
|
||||
Entity::delete_many()
|
||||
.filter(Column::Uuid.eq(uuid))
|
||||
.exec(db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,4 +118,108 @@ impl Syncable for Model {
|
||||
|
||||
Ok(sync_results)
|
||||
}
|
||||
|
||||
async fn apply_shared_change(
|
||||
entry: crate::infra::sync::SharedChangeEntry,
|
||||
db: &DatabaseConnection,
|
||||
) -> Result<(), sea_orm::DbErr> {
|
||||
use crate::infra::sync::{ChangeType, fk_mapper};
|
||||
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, NotSet};
|
||||
|
||||
match entry.change_type {
|
||||
ChangeType::Insert | ChangeType::Update => {
|
||||
// Map UUIDs to local IDs for FK fields
|
||||
let data =
|
||||
fk_mapper::map_sync_json_to_local(entry.data, Self::foreign_key_mappings(), db)
|
||||
.await
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("FK mapping failed: {}", e)))?;
|
||||
|
||||
let data = data.as_object().ok_or_else(|| {
|
||||
sea_orm::DbErr::Custom("SpaceGroup data is not an object".to_string())
|
||||
})?;
|
||||
|
||||
let uuid: Uuid = serde_json::from_value(
|
||||
data.get("uuid")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
|
||||
|
||||
let space_id: i32 = serde_json::from_value(
|
||||
data.get("space_id")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing space_id".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid space_id: {}", e)))?;
|
||||
|
||||
let active = ActiveModel {
|
||||
id: NotSet,
|
||||
uuid: Set(uuid),
|
||||
space_id: Set(space_id),
|
||||
name: Set(serde_json::from_value(
|
||||
data.get("name")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing name".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid name: {}", e)))?),
|
||||
group_type: Set(serde_json::from_value(
|
||||
data.get("group_type")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing group_type".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid group_type: {}", e)))?),
|
||||
is_collapsed: Set(serde_json::from_value(
|
||||
data.get("is_collapsed")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing is_collapsed".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid is_collapsed: {}", e)))?),
|
||||
order: Set(serde_json::from_value(
|
||||
data.get("order")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing order".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid order: {}", e)))?),
|
||||
created_at: Set(serde_json::from_value(
|
||||
data.get("created_at")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing created_at".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid created_at: {}", e)))?),
|
||||
};
|
||||
|
||||
// Upsert by UUID
|
||||
let existing = Entity::find().filter(Column::Uuid.eq(uuid)).one(db).await?;
|
||||
|
||||
if let Some(existing_model) = existing {
|
||||
let mut active = active;
|
||||
active.id = Set(existing_model.id);
|
||||
active.update(db).await?;
|
||||
} else {
|
||||
active.insert(db).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
ChangeType::Delete => {
|
||||
let data = entry.data.as_object().ok_or_else(|| {
|
||||
sea_orm::DbErr::Custom("SpaceGroup data is not an object".to_string())
|
||||
})?;
|
||||
|
||||
let uuid: Uuid = serde_json::from_value(
|
||||
data.get("uuid")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
|
||||
|
||||
Entity::delete_many()
|
||||
.filter(Column::Uuid.eq(uuid))
|
||||
.exec(db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,4 +124,104 @@ impl Syncable for Model {
|
||||
|
||||
Ok(sync_results)
|
||||
}
|
||||
|
||||
async fn apply_shared_change(
|
||||
entry: crate::infra::sync::SharedChangeEntry,
|
||||
db: &DatabaseConnection,
|
||||
) -> Result<(), sea_orm::DbErr> {
|
||||
use crate::infra::sync::{ChangeType, fk_mapper};
|
||||
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set, NotSet};
|
||||
|
||||
match entry.change_type {
|
||||
ChangeType::Insert | ChangeType::Update => {
|
||||
// Map UUIDs to local IDs for FK fields
|
||||
let data =
|
||||
fk_mapper::map_sync_json_to_local(entry.data, Self::foreign_key_mappings(), db)
|
||||
.await
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("FK mapping failed: {}", e)))?;
|
||||
|
||||
let data = data.as_object().ok_or_else(|| {
|
||||
sea_orm::DbErr::Custom("SpaceItem data is not an object".to_string())
|
||||
})?;
|
||||
|
||||
let uuid: Uuid = serde_json::from_value(
|
||||
data.get("uuid")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
|
||||
|
||||
let space_id: i32 = serde_json::from_value(
|
||||
data.get("space_id")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing space_id".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid space_id: {}", e)))?;
|
||||
|
||||
let group_id: Option<i32> = serde_json::from_value(
|
||||
data.get("group_id")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing group_id".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid group_id: {}", e)))?;
|
||||
|
||||
let active = ActiveModel {
|
||||
id: NotSet,
|
||||
uuid: Set(uuid),
|
||||
space_id: Set(space_id),
|
||||
group_id: Set(group_id),
|
||||
item_type: Set(serde_json::from_value(
|
||||
data.get("item_type")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing item_type".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid item_type: {}", e)))?),
|
||||
order: Set(serde_json::from_value(
|
||||
data.get("order")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing order".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid order: {}", e)))?),
|
||||
created_at: Set(serde_json::from_value(
|
||||
data.get("created_at")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing created_at".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid created_at: {}", e)))?),
|
||||
};
|
||||
|
||||
// Upsert by UUID
|
||||
let existing = Entity::find().filter(Column::Uuid.eq(uuid)).one(db).await?;
|
||||
|
||||
if let Some(existing_model) = existing {
|
||||
let mut active = active;
|
||||
active.id = Set(existing_model.id);
|
||||
active.update(db).await?;
|
||||
} else {
|
||||
active.insert(db).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
ChangeType::Delete => {
|
||||
let data = entry.data.as_object().ok_or_else(|| {
|
||||
sea_orm::DbErr::Custom("SpaceItem data is not an object".to_string())
|
||||
})?;
|
||||
|
||||
let uuid: Uuid = serde_json::from_value(
|
||||
data.get("uuid")
|
||||
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
|
||||
|
||||
Entity::delete_many()
|
||||
.filter(Column::Uuid.eq(uuid))
|
||||
.exec(db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -476,14 +476,14 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
|
||||
),
|
||||
);
|
||||
|
||||
// Space models (device-owned)
|
||||
// Space models (shared)
|
||||
registry.insert(
|
||||
"space".to_string(),
|
||||
SyncableModelRegistration::device_owned(
|
||||
SyncableModelRegistration::shared_with_query(
|
||||
"space",
|
||||
"spaces",
|
||||
|data, db| {
|
||||
Box::pin(async move { space::Model::apply_state_change(data, db.as_ref()).await })
|
||||
|entry, db| {
|
||||
Box::pin(async move { space::Model::apply_shared_change(entry, db.as_ref()).await })
|
||||
},
|
||||
|device_id, since, cursor, batch_size, db| {
|
||||
Box::pin(async move {
|
||||
@@ -491,20 +491,17 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
|
||||
.await
|
||||
})
|
||||
},
|
||||
Some(|uuid, db| {
|
||||
Box::pin(async move { space::Model::apply_deletion(uuid, db.as_ref()).await })
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
registry.insert(
|
||||
"space_group".to_string(),
|
||||
SyncableModelRegistration::device_owned(
|
||||
SyncableModelRegistration::shared_with_query(
|
||||
"space_group",
|
||||
"space_groups",
|
||||
|data, db| {
|
||||
|entry, db| {
|
||||
Box::pin(async move {
|
||||
space_group::Model::apply_state_change(data, db.as_ref()).await
|
||||
space_group::Model::apply_shared_change(entry, db.as_ref()).await
|
||||
})
|
||||
},
|
||||
|device_id, since, cursor, batch_size, db| {
|
||||
@@ -519,20 +516,17 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
|
||||
.await
|
||||
})
|
||||
},
|
||||
Some(|uuid, db| {
|
||||
Box::pin(async move { space_group::Model::apply_deletion(uuid, db.as_ref()).await })
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
registry.insert(
|
||||
"space_item".to_string(),
|
||||
SyncableModelRegistration::device_owned(
|
||||
SyncableModelRegistration::shared_with_query(
|
||||
"space_item",
|
||||
"space_items",
|
||||
|data, db| {
|
||||
|entry, db| {
|
||||
Box::pin(async move {
|
||||
space_item::Model::apply_state_change(data, db.as_ref()).await
|
||||
space_item::Model::apply_shared_change(entry, db.as_ref()).await
|
||||
})
|
||||
},
|
||||
|device_id, since, cursor, batch_size, db| {
|
||||
@@ -547,9 +541,6 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
|
||||
.await
|
||||
})
|
||||
},
|
||||
Some(|uuid, db| {
|
||||
Box::pin(async move { space_item::Model::apply_deletion(uuid, db.as_ref()).await })
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
|
||||
@@ -93,22 +93,47 @@ export function SyncMonitorPopover({ className }: SyncMonitorPopoverProps) {
|
||||
function SyncMonitorContent({ showActivityFeed }: { showActivityFeed: boolean }) {
|
||||
const sync = useSyncMonitor();
|
||||
|
||||
const getStateColor = (state: string) => {
|
||||
switch (state) {
|
||||
case 'Ready':
|
||||
return 'bg-green-500';
|
||||
case 'Backfilling':
|
||||
return 'bg-yellow-500';
|
||||
case 'CatchingUp':
|
||||
return 'bg-blue-500';
|
||||
case 'Uninitialized':
|
||||
return 'bg-ink-faint';
|
||||
case 'Paused':
|
||||
return 'bg-ink-dull';
|
||||
default:
|
||||
return 'bg-ink-faint';
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<motion.div
|
||||
className="overflow-y-auto no-scrollbar"
|
||||
initial={false}
|
||||
animate={{
|
||||
height: showActivityFeed
|
||||
? Math.min(sync.recentActivity.length * 40 + 16, 400)
|
||||
: Math.min(sync.peers.length * 80 + 16, 400),
|
||||
}}
|
||||
transition={{ duration: 0.2, ease: [0.25, 1, 0.5, 1] }}
|
||||
>
|
||||
{showActivityFeed ? (
|
||||
<ActivityFeed activities={sync.recentActivity} />
|
||||
) : (
|
||||
<PeerList peers={sync.peers} currentState={sync.currentState} />
|
||||
)}
|
||||
</motion.div>
|
||||
<>
|
||||
<div className="px-4 py-2 border-b border-app-line bg-app-box/50">
|
||||
<div className="flex items-center gap-2">
|
||||
<div className={`size-2 rounded-full ${getStateColor(sync.currentState)}`} />
|
||||
<span className="text-xs font-medium text-ink-dull">{sync.currentState}</span>
|
||||
</div>
|
||||
</div>
|
||||
<motion.div
|
||||
className="overflow-y-auto no-scrollbar"
|
||||
initial={false}
|
||||
animate={{
|
||||
height: showActivityFeed
|
||||
? Math.min(sync.recentActivity.length * 40 + 16, 400)
|
||||
: Math.min(sync.peers.length * 80 + 16, 400),
|
||||
}}
|
||||
transition={{ duration: 0.2, ease: [0.25, 1, 0.5, 1] }}
|
||||
>
|
||||
{showActivityFeed ? (
|
||||
<ActivityFeed activities={sync.recentActivity} />
|
||||
) : (
|
||||
<PeerList peers={sync.peers} currentState={sync.currentState} />
|
||||
)}
|
||||
</motion.div>
|
||||
</>
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user