From e26ab63e08cd21665575cd5b031eba547f359250 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 19 Oct 2025 12:14:29 -0700 Subject: [PATCH] fix: fix table names and add fields for collection and collection entry --- core/src/infra/db/entities/collection.rs | 2 +- .../src/infra/db/entities/collection_entry.rs | 177 +++++++++++++++++- core/src/infra/db/migration/mod.rs | 2 + core/src/infra/sync/registry.rs | 128 ++++++++++++- 4 files changed, 301 insertions(+), 8 deletions(-) diff --git a/core/src/infra/db/entities/collection.rs b/core/src/infra/db/entities/collection.rs index 21894f5da..c7acf301b 100644 --- a/core/src/infra/db/entities/collection.rs +++ b/core/src/infra/db/entities/collection.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] -#[sea_orm(table_name = "collections")] +#[sea_orm(table_name = "collection")] pub struct Model { #[sea_orm(primary_key)] pub id: i32, diff --git a/core/src/infra/db/entities/collection_entry.rs b/core/src/infra/db/entities/collection_entry.rs index bba26e28f..d4cbbbe7b 100644 --- a/core/src/infra/db/entities/collection_entry.rs +++ b/core/src/infra/db/entities/collection_entry.rs @@ -1,9 +1,12 @@ +use crate::infra::sync::{ChangeType, FKMapping, SharedChangeEntry, Syncable}; use chrono::{DateTime, Utc}; use sea_orm::entity::prelude::*; +use sea_orm::{ActiveValue::NotSet, Set}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] -#[sea_orm(table_name = "collection_entries")] +#[sea_orm(table_name = "collection_entry")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub collection_id: i32, @@ -12,6 +15,11 @@ pub struct Model { pub entry_id: i32, pub added_at: DateTime, + + // Sync fields + pub uuid: Uuid, + pub version: i64, + pub updated_at: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -46,3 +54,170 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +// Syncable Implementation +// +// CollectionEntry is a SHARED M2M junction table linking collections to entries. +// Collections are shared resources, and entries are device-owned, but the relationships +// themselves are shared across devices using HLC-based replication. +impl Syncable for Model { + const SYNC_MODEL: &'static str = "collection_entry"; + + fn sync_id(&self) -> Uuid { + self.uuid + } + + fn version(&self) -> i64 { + self.version + } + + fn exclude_fields() -> Option<&'static [&'static str]> { + None // FK fields need to be present for UUID conversion + } + + fn sync_depends_on() -> &'static [&'static str] { + &["collection", "entry"] + } + + fn foreign_key_mappings() -> Vec { + vec![ + FKMapping::new("collection_id", "collection"), + FKMapping::new("entry_id", "entries"), + ] + } + + async fn query_for_sync( + _device_id: Option, + since: Option>, + batch_size: usize, + db: &DatabaseConnection, + ) -> Result)>, sea_orm::DbErr> { + use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; + + let mut query = Entity::find(); + + if let Some(since_time) = since { + query = query.filter(Column::UpdatedAt.gte(since_time)); + } + + query = query.limit(batch_size as u64); + + let results = query.all(db).await?; + + let mut sync_results = Vec::new(); + for ce in results { + let mut json = match ce.to_sync_json() { + Ok(j) => j, + Err(e) => { + tracing::warn!(error = %e, uuid = %ce.uuid, "Failed to serialize collection_entry for sync"); + continue; + } + }; + + // Convert FK integer IDs to UUIDs + for fk in ::foreign_key_mappings() { + if let Err(e) = + crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await + { + tracing::warn!( + error = %e, + uuid = %ce.uuid, + "Failed to convert FK to UUID for collection_entry" + ); + continue; + } + } + + sync_results.push((ce.uuid, json, ce.updated_at)); + } + + Ok(sync_results) + } + + async fn apply_shared_change( + entry: SharedChangeEntry, + db: &DatabaseConnection, + ) -> Result<(), sea_orm::DbErr> { + match entry.change_type { + ChangeType::Insert | ChangeType::Update => { + // Map UUIDs to local IDs for FK fields + use crate::infra::sync::fk_mapper; + let data = fk_mapper::map_sync_json_to_local(entry.data, Self::foreign_key_mappings(), db) + .await + .map_err(|e| sea_orm::DbErr::Custom(format!("FK mapping failed: {}", e)))?; + + let data = data.as_object().ok_or_else(|| { + sea_orm::DbErr::Custom("CollectionEntry data is not an object".to_string()) + })?; + + let uuid: Uuid = serde_json::from_value( + data.get("uuid") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?; + + let collection_id: i32 = serde_json::from_value( + data.get("collection_id") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing collection_id".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid collection_id: {}", e)))?; + + let entry_id: i32 = serde_json::from_value( + data.get("entry_id") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing entry_id".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid entry_id: {}", e)))?; + + let added_at: DateTime = serde_json::from_value( + data.get("added_at") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing added_at".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid added_at: {}", e)))?; + + let version: i64 = serde_json::from_value( + data.get("version") + .ok_or_else(|| sea_orm::DbErr::Custom("Missing version".to_string()))? + .clone(), + ) + .map_err(|e| sea_orm::DbErr::Custom(format!("Invalid version: {}", e)))?; + + let active = ActiveModel { + collection_id: Set(collection_id), + entry_id: Set(entry_id), + added_at: Set(added_at), + uuid: Set(uuid), + version: Set(version), + updated_at: Set(Utc::now()), + }; + + Entity::insert(active) + .on_conflict( + sea_orm::sea_query::OnConflict::column(Column::Uuid) + .update_columns([ + Column::CollectionId, + Column::EntryId, + Column::AddedAt, + Column::Version, + Column::UpdatedAt, + ]) + .to_owned(), + ) + .exec(db) + .await?; + } + + ChangeType::Delete => { + Entity::delete_many() + .filter(Column::Uuid.eq(entry.record_uuid)) + .exec(db) + .await?; + } + } + + Ok(()) + } +} diff --git a/core/src/infra/db/migration/mod.rs b/core/src/infra/db/migration/mod.rs index 785a2d2e0..ca5227d08 100644 --- a/core/src/infra/db/migration/mod.rs +++ b/core/src/infra/db/migration/mod.rs @@ -14,6 +14,7 @@ mod m20251009_000001_add_sync_to_devices; mod m20251015_000001_add_device_slug; mod m20251015_000002_create_sync_tables; mod m20251016_000001_add_cloud_identifier; +mod m20251019_000001_add_sync_to_m2m_tables; pub struct Migrator; @@ -33,6 +34,7 @@ impl MigratorTrait for Migrator { Box::new(m20251015_000001_add_device_slug::Migration), Box::new(m20251015_000002_create_sync_tables::Migration), Box::new(m20251016_000001_add_cloud_identifier::Migration), + Box::new(m20251019_000001_add_sync_to_m2m_tables::Migration), ] } } diff --git a/core/src/infra/sync/registry.rs b/core/src/infra/sync/registry.rs index c55054a96..2819ceab9 100644 --- a/core/src/infra/sync/registry.rs +++ b/core/src/infra/sync/registry.rs @@ -161,7 +161,8 @@ pub async fn register_shared( /// All domain-specific logic lives in the entity implementations, not here. fn initialize_registry() -> HashMap { use crate::infra::db::entities::{ - collection, content_identity, device, entry, location, tag, user_metadata, volume, + collection, collection_entry, content_identity, device, entry, location, tag, + tag_relationship, user_metadata, user_metadata_tag, volume, }; let mut registry = HashMap::new(); @@ -255,7 +256,7 @@ fn initialize_registry() -> HashMap { "collection".to_string(), SyncableModelRegistration::shared_with_query( "collection", - "collections", + "collection", |entry, db| { Box::pin(async move { collection::Model::apply_shared_change(entry, db.as_ref()).await @@ -318,6 +319,79 @@ fn initialize_registry() -> HashMap { ), ); + // Many-to-many junction tables (shared models) + registry.insert( + "collection_entry".to_string(), + SyncableModelRegistration::shared_with_query( + "collection_entry", + "collection_entry", + |entry, db| { + Box::pin(async move { + collection_entry::Model::apply_shared_change(entry, db.as_ref()).await + }) + }, + |device_id, since, batch_size, db| { + Box::pin(async move { + collection_entry::Model::query_for_sync( + device_id, + since, + batch_size, + db.as_ref(), + ) + .await + }) + }, + ), + ); + + registry.insert( + "user_metadata_tag".to_string(), + SyncableModelRegistration::shared_with_query( + "user_metadata_tag", + "user_metadata_tag", + |entry, db| { + Box::pin(async move { + user_metadata_tag::Model::apply_shared_change(entry, db.as_ref()).await + }) + }, + |device_id, since, batch_size, db| { + Box::pin(async move { + user_metadata_tag::Model::query_for_sync( + device_id, + since, + batch_size, + db.as_ref(), + ) + .await + }) + }, + ), + ); + + registry.insert( + "tag_relationship".to_string(), + SyncableModelRegistration::shared_with_query( + "tag_relationship", + "tag_relationship", + |entry, db| { + Box::pin(async move { + tag_relationship::Model::apply_shared_change(entry, db.as_ref()).await + }) + }, + |device_id, since, batch_size, db| { + Box::pin(async move { + tag_relationship::Model::query_for_sync( + device_id, + since, + batch_size, + db.as_ref(), + ) + .await + }) + }, + ), + ); + registry } @@ -539,7 +613,8 @@ pub enum ApplyError { /// ``` pub async fn compute_registry_sync_order() -> Result, super::DependencyError> { use crate::infra::db::entities::{ - collection, content_identity, device, entry, location, tag, user_metadata, volume, + collection, collection_entry, content_identity, device, entry, location, tag, + tag_relationship, user_metadata, user_metadata_tag, volume, }; // Build iterator of (model_name, dependencies) @@ -564,6 +639,18 @@ pub async fn compute_registry_sync_order() -> Result, super::Depende user_metadata::Model::SYNC_MODEL, user_metadata::Model::sync_depends_on(), ), + ( + collection_entry::Model::SYNC_MODEL, + collection_entry::Model::sync_depends_on(), + ), + ( + user_metadata_tag::Model::SYNC_MODEL, + user_metadata_tag::Model::sync_depends_on(), + ), + ( + tag_relationship::Model::SYNC_MODEL, + tag_relationship::Model::sync_depends_on(), + ), ]; super::dependency_graph::compute_sync_order(models.into_iter()) @@ -615,8 +702,8 @@ mod tests { async fn test_sync_order_computation() { let order = compute_registry_sync_order().await.unwrap(); - // Verify all models are present - assert_eq!(order.len(), 8); + // Verify all models are present (8 base + 3 M2M) + assert_eq!(order.len(), 11); assert!(order.contains(&"device".to_string())); assert!(order.contains(&"location".to_string())); assert!(order.contains(&"volume".to_string())); @@ -625,12 +712,21 @@ mod tests { assert!(order.contains(&"collection".to_string())); assert!(order.contains(&"content_identity".to_string())); assert!(order.contains(&"user_metadata".to_string())); + assert!(order.contains(&"collection_entry".to_string())); + assert!(order.contains(&"user_metadata_tag".to_string())); + assert!(order.contains(&"tag_relationship".to_string())); // Verify dependency ordering let device_idx = order.iter().position(|m| m == "device").unwrap(); let location_idx = order.iter().position(|m| m == "location").unwrap(); let volume_idx = order.iter().position(|m| m == "volume").unwrap(); let entry_idx = order.iter().position(|m| m == "entry").unwrap(); + let collection_idx = order.iter().position(|m| m == "collection").unwrap(); + let collection_entry_idx = order.iter().position(|m| m == "collection_entry").unwrap(); + let tag_idx = order.iter().position(|m| m == "tag").unwrap(); + let user_metadata_idx = order.iter().position(|m| m == "user_metadata").unwrap(); + let user_metadata_tag_idx = order.iter().position(|m| m == "user_metadata_tag").unwrap(); + let tag_relationship_idx = order.iter().position(|m| m == "tag_relationship").unwrap(); // Device must come before location assert!( @@ -644,6 +740,26 @@ mod tests { // Location must come before entry assert!(location_idx < entry_idx, "location must sync before entry"); - // Tag, collection, content_identity, and user_metadata have no dependencies + // M2M dependencies + assert!( + collection_idx < collection_entry_idx, + "collection must sync before collection_entry" + ); + assert!( + entry_idx < collection_entry_idx, + "entry must sync before collection_entry" + ); + assert!( + user_metadata_idx < user_metadata_tag_idx, + "user_metadata must sync before user_metadata_tag" + ); + assert!( + tag_idx < user_metadata_tag_idx, + "tag must sync before user_metadata_tag" + ); + assert!( + tag_idx < tag_relationship_idx, + "tag must sync before tag_relationship" + ); } }