fix: fix table names and add fields for collection and collection entry

This commit is contained in:
Jamie Pine
2025-10-19 12:14:29 -07:00
parent 8f4bf51639
commit e26ab63e08
4 changed files with 301 additions and 8 deletions

View File

@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "collections")]
#[sea_orm(table_name = "collection")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,

View File

@@ -1,9 +1,12 @@
use crate::infra::sync::{ChangeType, FKMapping, SharedChangeEntry, Syncable};
use chrono::{DateTime, Utc};
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::NotSet, Set};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "collection_entries")]
#[sea_orm(table_name = "collection_entry")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub collection_id: i32,
@@ -12,6 +15,11 @@ pub struct Model {
pub entry_id: i32,
pub added_at: DateTime<Utc>,
// Sync fields
pub uuid: Uuid,
pub version: i64,
pub updated_at: DateTime<Utc>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -46,3 +54,170 @@ impl Related<super::entry::Entity> for Entity {
}
impl ActiveModelBehavior for ActiveModel {}
// Syncable Implementation
//
// CollectionEntry is a SHARED M2M junction table linking collections to entries.
// Collections are shared resources, and entries are device-owned, but the relationships
// themselves are shared across devices using HLC-based replication.
impl Syncable for Model {
const SYNC_MODEL: &'static str = "collection_entry";
fn sync_id(&self) -> Uuid {
self.uuid
}
fn version(&self) -> i64 {
self.version
}
fn exclude_fields() -> Option<&'static [&'static str]> {
None // FK fields need to be present for UUID conversion
}
fn sync_depends_on() -> &'static [&'static str] {
&["collection", "entry"]
}
fn foreign_key_mappings() -> Vec<FKMapping> {
vec![
FKMapping::new("collection_id", "collection"),
FKMapping::new("entry_id", "entries"),
]
}
async fn query_for_sync(
_device_id: Option<Uuid>,
since: Option<DateTime<Utc>>,
batch_size: usize,
db: &DatabaseConnection,
) -> Result<Vec<(Uuid, serde_json::Value, DateTime<Utc>)>, sea_orm::DbErr> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
let mut query = Entity::find();
if let Some(since_time) = since {
query = query.filter(Column::UpdatedAt.gte(since_time));
}
query = query.limit(batch_size as u64);
let results = query.all(db).await?;
let mut sync_results = Vec::new();
for ce in results {
let mut json = match ce.to_sync_json() {
Ok(j) => j,
Err(e) => {
tracing::warn!(error = %e, uuid = %ce.uuid, "Failed to serialize collection_entry for sync");
continue;
}
};
// Convert FK integer IDs to UUIDs
for fk in <Model as Syncable>::foreign_key_mappings() {
if let Err(e) =
crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut json, &fk, db).await
{
tracing::warn!(
error = %e,
uuid = %ce.uuid,
"Failed to convert FK to UUID for collection_entry"
);
continue;
}
}
sync_results.push((ce.uuid, json, ce.updated_at));
}
Ok(sync_results)
}
async fn apply_shared_change(
entry: SharedChangeEntry,
db: &DatabaseConnection,
) -> Result<(), sea_orm::DbErr> {
match entry.change_type {
ChangeType::Insert | ChangeType::Update => {
// Map UUIDs to local IDs for FK fields
use crate::infra::sync::fk_mapper;
let data = fk_mapper::map_sync_json_to_local(entry.data, Self::foreign_key_mappings(), db)
.await
.map_err(|e| sea_orm::DbErr::Custom(format!("FK mapping failed: {}", e)))?;
let data = data.as_object().ok_or_else(|| {
sea_orm::DbErr::Custom("CollectionEntry data is not an object".to_string())
})?;
let uuid: Uuid = serde_json::from_value(
data.get("uuid")
.ok_or_else(|| sea_orm::DbErr::Custom("Missing uuid".to_string()))?
.clone(),
)
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid uuid: {}", e)))?;
let collection_id: i32 = serde_json::from_value(
data.get("collection_id")
.ok_or_else(|| sea_orm::DbErr::Custom("Missing collection_id".to_string()))?
.clone(),
)
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid collection_id: {}", e)))?;
let entry_id: i32 = serde_json::from_value(
data.get("entry_id")
.ok_or_else(|| sea_orm::DbErr::Custom("Missing entry_id".to_string()))?
.clone(),
)
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid entry_id: {}", e)))?;
let added_at: DateTime<Utc> = serde_json::from_value(
data.get("added_at")
.ok_or_else(|| sea_orm::DbErr::Custom("Missing added_at".to_string()))?
.clone(),
)
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid added_at: {}", e)))?;
let version: i64 = serde_json::from_value(
data.get("version")
.ok_or_else(|| sea_orm::DbErr::Custom("Missing version".to_string()))?
.clone(),
)
.map_err(|e| sea_orm::DbErr::Custom(format!("Invalid version: {}", e)))?;
let active = ActiveModel {
collection_id: Set(collection_id),
entry_id: Set(entry_id),
added_at: Set(added_at),
uuid: Set(uuid),
version: Set(version),
updated_at: Set(Utc::now()),
};
Entity::insert(active)
.on_conflict(
sea_orm::sea_query::OnConflict::column(Column::Uuid)
.update_columns([
Column::CollectionId,
Column::EntryId,
Column::AddedAt,
Column::Version,
Column::UpdatedAt,
])
.to_owned(),
)
.exec(db)
.await?;
}
ChangeType::Delete => {
Entity::delete_many()
.filter(Column::Uuid.eq(entry.record_uuid))
.exec(db)
.await?;
}
}
Ok(())
}
}

View File

@@ -14,6 +14,7 @@ mod m20251009_000001_add_sync_to_devices;
mod m20251015_000001_add_device_slug;
mod m20251015_000002_create_sync_tables;
mod m20251016_000001_add_cloud_identifier;
mod m20251019_000001_add_sync_to_m2m_tables;
pub struct Migrator;
@@ -33,6 +34,7 @@ impl MigratorTrait for Migrator {
Box::new(m20251015_000001_add_device_slug::Migration),
Box::new(m20251015_000002_create_sync_tables::Migration),
Box::new(m20251016_000001_add_cloud_identifier::Migration),
Box::new(m20251019_000001_add_sync_to_m2m_tables::Migration),
]
}
}

View File

@@ -161,7 +161,8 @@ pub async fn register_shared(
/// All domain-specific logic lives in the entity implementations, not here.
fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
use crate::infra::db::entities::{
collection, content_identity, device, entry, location, tag, user_metadata, volume,
collection, collection_entry, content_identity, device, entry, location, tag,
tag_relationship, user_metadata, user_metadata_tag, volume,
};
let mut registry = HashMap::new();
@@ -255,7 +256,7 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
"collection".to_string(),
SyncableModelRegistration::shared_with_query(
"collection",
"collections",
"collection",
|entry, db| {
Box::pin(async move {
collection::Model::apply_shared_change(entry, db.as_ref()).await
@@ -318,6 +319,79 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
),
);
// Many-to-many junction tables (shared models)
registry.insert(
"collection_entry".to_string(),
SyncableModelRegistration::shared_with_query(
"collection_entry",
"collection_entry",
|entry, db| {
Box::pin(async move {
collection_entry::Model::apply_shared_change(entry, db.as_ref()).await
})
},
|device_id, since, batch_size, db| {
Box::pin(async move {
collection_entry::Model::query_for_sync(
device_id,
since,
batch_size,
db.as_ref(),
)
.await
})
},
),
);
registry.insert(
"user_metadata_tag".to_string(),
SyncableModelRegistration::shared_with_query(
"user_metadata_tag",
"user_metadata_tag",
|entry, db| {
Box::pin(async move {
user_metadata_tag::Model::apply_shared_change(entry, db.as_ref()).await
})
},
|device_id, since, batch_size, db| {
Box::pin(async move {
user_metadata_tag::Model::query_for_sync(
device_id,
since,
batch_size,
db.as_ref(),
)
.await
})
},
),
);
registry.insert(
"tag_relationship".to_string(),
SyncableModelRegistration::shared_with_query(
"tag_relationship",
"tag_relationship",
|entry, db| {
Box::pin(async move {
tag_relationship::Model::apply_shared_change(entry, db.as_ref()).await
})
},
|device_id, since, batch_size, db| {
Box::pin(async move {
tag_relationship::Model::query_for_sync(
device_id,
since,
batch_size,
db.as_ref(),
)
.await
})
},
),
);
registry
}
@@ -539,7 +613,8 @@ pub enum ApplyError {
/// ```
pub async fn compute_registry_sync_order() -> Result<Vec<String>, super::DependencyError> {
use crate::infra::db::entities::{
collection, content_identity, device, entry, location, tag, user_metadata, volume,
collection, collection_entry, content_identity, device, entry, location, tag,
tag_relationship, user_metadata, user_metadata_tag, volume,
};
// Build iterator of (model_name, dependencies)
@@ -564,6 +639,18 @@ pub async fn compute_registry_sync_order() -> Result<Vec<String>, super::Depende
user_metadata::Model::SYNC_MODEL,
user_metadata::Model::sync_depends_on(),
),
(
collection_entry::Model::SYNC_MODEL,
collection_entry::Model::sync_depends_on(),
),
(
user_metadata_tag::Model::SYNC_MODEL,
user_metadata_tag::Model::sync_depends_on(),
),
(
tag_relationship::Model::SYNC_MODEL,
tag_relationship::Model::sync_depends_on(),
),
];
super::dependency_graph::compute_sync_order(models.into_iter())
@@ -615,8 +702,8 @@ mod tests {
async fn test_sync_order_computation() {
let order = compute_registry_sync_order().await.unwrap();
// Verify all models are present
assert_eq!(order.len(), 8);
// Verify all models are present (8 base + 3 M2M)
assert_eq!(order.len(), 11);
assert!(order.contains(&"device".to_string()));
assert!(order.contains(&"location".to_string()));
assert!(order.contains(&"volume".to_string()));
@@ -625,12 +712,21 @@ mod tests {
assert!(order.contains(&"collection".to_string()));
assert!(order.contains(&"content_identity".to_string()));
assert!(order.contains(&"user_metadata".to_string()));
assert!(order.contains(&"collection_entry".to_string()));
assert!(order.contains(&"user_metadata_tag".to_string()));
assert!(order.contains(&"tag_relationship".to_string()));
// Verify dependency ordering
let device_idx = order.iter().position(|m| m == "device").unwrap();
let location_idx = order.iter().position(|m| m == "location").unwrap();
let volume_idx = order.iter().position(|m| m == "volume").unwrap();
let entry_idx = order.iter().position(|m| m == "entry").unwrap();
let collection_idx = order.iter().position(|m| m == "collection").unwrap();
let collection_entry_idx = order.iter().position(|m| m == "collection_entry").unwrap();
let tag_idx = order.iter().position(|m| m == "tag").unwrap();
let user_metadata_idx = order.iter().position(|m| m == "user_metadata").unwrap();
let user_metadata_tag_idx = order.iter().position(|m| m == "user_metadata_tag").unwrap();
let tag_relationship_idx = order.iter().position(|m| m == "tag_relationship").unwrap();
// Device must come before location
assert!(
@@ -644,6 +740,26 @@ mod tests {
// Location must come before entry
assert!(location_idx < entry_idx, "location must sync before entry");
// Tag, collection, content_identity, and user_metadata have no dependencies
// M2M dependencies
assert!(
collection_idx < collection_entry_idx,
"collection must sync before collection_entry"
);
assert!(
entry_idx < collection_entry_idx,
"entry must sync before collection_entry"
);
assert!(
user_metadata_idx < user_metadata_tag_idx,
"user_metadata must sync before user_metadata_tag"
);
assert!(
tag_idx < user_metadata_tag_idx,
"tag must sync before user_metadata_tag"
);
assert!(
tag_idx < tag_relationship_idx,
"tag must sync before tag_relationship"
);
}
}