From a33ca1e884f03abdee626d9df5b5d2fc48c80e90 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 16 Nov 2025 04:56:41 -0800 Subject: [PATCH] feat: enhance synchronization with per-resource watermark tracking and batching configuration - Added support for per-resource watermarks in the synchronization process, allowing for fine-grained comparison and catch-up logic. - Implemented new configuration options for real-time batching, including maximum entries before flush and flush interval in milliseconds. - Enhanced the `apply_state_change` function to check for tombstoned records, preventing the resurrection of deleted entries. - Updated various components to utilize the new watermark and batching features, improving synchronization efficiency and robustness. --- .cspell/project_words.txt | 1 + core/src/infra/sync/config.rs | 20 ++ core/src/infra/sync/peer_log.rs | 27 ++- core/src/infra/sync/registry.rs | 107 +++++++-- core/src/infra/sync/transaction.rs | 2 - core/src/infra/sync/watermarks.rs | 43 ++++ .../service/network/protocol/sync/handler.rs | 68 ++++-- .../service/network/protocol/sync/messages.rs | 6 +- core/src/service/sync/dependency.rs | 20 ++ core/src/service/sync/peer.rs | 206 +++++++++++++----- core/tests/helpers/sync_transport.rs | 35 ++- core/tests/sync_realtime_integration_test.rs | 8 +- 12 files changed, 420 insertions(+), 123 deletions(-) diff --git a/.cspell/project_words.txt b/.cspell/project_words.txt index 6502a3706..3e9dcae85 100644 --- a/.cspell/project_words.txt +++ b/.cspell/project_words.txt @@ -75,6 +75,7 @@ Syncable thumbstrips tobiaslutke tokio +tombstoned typecheck unwatch uuid diff --git a/core/src/infra/sync/config.rs b/core/src/infra/sync/config.rs index bb6d7c66b..2d3020607 100644 --- a/core/src/infra/sync/config.rs +++ b/core/src/infra/sync/config.rs @@ -48,6 +48,8 @@ impl SyncConfig { state_broadcast_batch_size: 500, shared_broadcast_batch_size: 50, max_snapshot_size: 50_000, + realtime_batch_max_entries: 50, + realtime_batch_flush_interval_ms: 25, }, retention: RetentionConfig { strategy: PruningStrategy::AcknowledgmentBased, @@ -83,6 +85,8 @@ impl SyncConfig { state_broadcast_batch_size: 2_000, shared_broadcast_batch_size: 200, max_snapshot_size: 200_000, + realtime_batch_max_entries: 200, + realtime_batch_flush_interval_ms: 100, }, retention: RetentionConfig { strategy: PruningStrategy::Conservative { @@ -120,6 +124,8 @@ impl SyncConfig { state_broadcast_batch_size: 500, shared_broadcast_batch_size: 50, max_snapshot_size: 50_000, + realtime_batch_max_entries: 50, + realtime_batch_flush_interval_ms: 100, }, retention: RetentionConfig { strategy: PruningStrategy::TimeBased { retention_days: 14 }, @@ -168,6 +174,18 @@ pub struct BatchingConfig { /// Used for: SharedChangeResponse.current_state limit /// Default: 100,000 pub max_snapshot_size: usize, + + /// Real-time batching: maximum entries before flush + /// + /// Used for: Event listener batching in peer.rs + /// Default: 100 + pub realtime_batch_max_entries: usize, + + /// Real-time batching: flush interval in milliseconds + /// + /// Used for: Event listener batching in peer.rs + /// Default: 50ms + pub realtime_batch_flush_interval_ms: u64, } impl Default for BatchingConfig { @@ -177,6 +195,8 @@ impl Default for BatchingConfig { state_broadcast_batch_size: 1_000, shared_broadcast_batch_size: 100, max_snapshot_size: 100_000, + realtime_batch_max_entries: 100, + realtime_batch_flush_interval_ms: 50, } } } diff --git a/core/src/infra/sync/peer_log.rs b/core/src/infra/sync/peer_log.rs index d0f304501..35a8a761f 100644 --- a/core/src/infra/sync/peer_log.rs +++ b/core/src/infra/sync/peer_log.rs @@ -260,9 +260,32 @@ impl PeerLog { /// Get the minimum HLC that all peers have acknowledged /// /// Excludes self-ACKs (where peer_device_id == our device_id) from calculation. - /// Self-ACKs should never exist, but filtering them defensively prevents stale - /// self-ACKs from blocking pruning. + /// Self-ACKs should never exist - if detected, a warning is logged. async fn get_min_acked_hlc(&self) -> Result, PeerLogError> { + // Check for self-ACKs (should never exist) + let self_ack_check = self + .conn + .query_one(Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT COUNT(*) as count FROM peer_acks WHERE peer_device_id = ?", + vec![self.device_id.to_string().into()], + )) + .await + .map_err(|e| PeerLogError::QueryError(e.to_string()))?; + + if let Some(row) = self_ack_check { + let count: i64 = row + .try_get("", "count") + .map_err(|e| PeerLogError::QueryError(e.to_string()))?; + if count > 0 { + tracing::warn!( + device_id = %self.device_id, + self_ack_count = count, + "Self-ACKs detected in peer_acks table (should never exist). This indicates a bug in ACK handling." + ); + } + } + let result = self .conn .query_one(Statement::from_sql_and_values( diff --git a/core/src/infra/sync/registry.rs b/core/src/infra/sync/registry.rs index 0dcd62b96..3746a8ec3 100644 --- a/core/src/infra/sync/registry.rs +++ b/core/src/infra/sync/registry.rs @@ -31,10 +31,10 @@ pub type SharedApplyFn = fn( /// Parameters: device_id, since, batch_size, db /// Returns: Vec of (uuid, data, timestamp) pub type StateQueryFn = fn( - Option, // device_id filter - Option>, // since watermark - Option<(chrono::DateTime, uuid::Uuid)>, // cursor for pagination - usize, // batch_size + Option, // device_id filter + Option>, // since watermark + Option<(chrono::DateTime, uuid::Uuid)>, // cursor for pagination + usize, // batch_size Arc, ) -> Pin< Box< @@ -156,7 +156,9 @@ pub async fn register_device_owned( let mut registry = SYNCABLE_REGISTRY.write().await; registry.insert( model_type.to_string(), - SyncableModelRegistration::device_owned(model_type, table_name, apply_fn, query_fn, delete_fn), + SyncableModelRegistration::device_owned( + model_type, table_name, apply_fn, query_fn, delete_fn, + ), ); } @@ -179,8 +181,8 @@ pub async fn register_shared( /// All domain-specific logic lives in the entity implementations, not here. fn initialize_registry() -> HashMap { use crate::infra::db::entities::{ - audit_log, collection, collection_entry, content_identity, device, entry, location, sidecar, - tag, tag_relationship, user_metadata, user_metadata_tag, volume, + audit_log, collection, collection_entry, content_identity, device, entry, location, + sidecar, tag, tag_relationship, user_metadata, user_metadata_tag, volume, }; let mut registry = HashMap::new(); @@ -199,7 +201,14 @@ fn initialize_registry() -> HashMap { }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { - location::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await + location::Model::query_for_sync( + device_id, + since, + cursor, + batch_size, + db.as_ref(), + ) + .await }) }, Some(|uuid, db| { @@ -218,7 +227,8 @@ fn initialize_registry() -> HashMap { }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { - volume::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await + volume::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()) + .await }) }, Some(|uuid, db| { @@ -237,7 +247,8 @@ fn initialize_registry() -> HashMap { }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { - entry::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await + entry::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()) + .await }) }, Some(|uuid, db| { @@ -256,7 +267,8 @@ fn initialize_registry() -> HashMap { }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { - device::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await + device::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()) + .await }) }, None, // Devices don't support deletion sync @@ -274,7 +286,8 @@ fn initialize_registry() -> HashMap { }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { - tag::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await + tag::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()) + .await }) }, ), @@ -286,14 +299,20 @@ fn initialize_registry() -> HashMap { "collection", "collection", |entry, db| { - Box::pin(async move { - collection::Model::apply_shared_change(entry, db.as_ref()).await - }) + Box::pin( + async move { collection::Model::apply_shared_change(entry, db.as_ref()).await }, + ) }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { - collection::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()) - .await + collection::Model::query_for_sync( + device_id, + since, + cursor, + batch_size, + db.as_ref(), + ) + .await }) }, ), @@ -431,9 +450,9 @@ fn initialize_registry() -> HashMap { "audit_log", "audit_log", |entry, db| { - Box::pin(async move { - audit_log::Model::apply_shared_change(entry, db.as_ref()).await - }) + Box::pin( + async move { audit_log::Model::apply_shared_change(entry, db.as_ref()).await }, + ) }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { @@ -456,9 +475,9 @@ fn initialize_registry() -> HashMap { "sidecar", "sidecars", |entry, db| { - Box::pin(async move { - sidecar::Model::apply_shared_change(entry, db.as_ref()).await - }) + Box::pin( + async move { sidecar::Model::apply_shared_change(entry, db.as_ref()).await }, + ) }, |device_id, since, cursor, batch_size, db| { Box::pin(async move { @@ -534,11 +553,48 @@ pub fn get_fk_mappings(model_type: &str) -> Option> { /// Apply a state-based sync entry (device-owned model) /// /// Routes to the appropriate model's apply_state_change function via registry. +/// Enforces tombstone check to prevent resurrection of deleted records. pub async fn apply_state_change( model_type: &str, data: serde_json::Value, db: Arc, ) -> Result<(), ApplyError> { + use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; + + // Extract UUID from data for tombstone check + let record_uuid = data + .get("uuid") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + .ok_or_else(|| { + ApplyError::DatabaseError( + "Missing or invalid UUID in state change data".to_string(), + ) + })?; + + // Check if record is tombstoned (prevents resurrection of deleted records) + let is_tombstoned = crate::infra::db::entities::device_state_tombstone::Entity::find() + .filter( + crate::infra::db::entities::device_state_tombstone::Column::ModelType.eq(model_type), + ) + .filter( + crate::infra::db::entities::device_state_tombstone::Column::RecordUuid + .eq(record_uuid), + ) + .one(db.as_ref()) + .await + .map_err(|e| ApplyError::DatabaseError(e.to_string()))? + .is_some(); + + if is_tombstoned { + tracing::debug!( + model_type = %model_type, + uuid = %record_uuid, + "Skipping state change for tombstoned record (prevents resurrection)" + ); + return Ok(()); + } + let apply_fn = { let registry = SYNCABLE_REGISTRY.read().await; let registration = registry @@ -681,7 +737,10 @@ pub async fn query_all_shared_models( since: Option>, batch_size: usize, db: Arc, -) -> Result)>>, ApplyError> { +) -> Result< + HashMap)>>, + ApplyError, +> { // Collect all shared models with query functions let shared_models: Vec<(String, StateQueryFn)> = { let registry = SYNCABLE_REGISTRY.read().await; diff --git a/core/src/infra/sync/transaction.rs b/core/src/infra/sync/transaction.rs index 6af9abcf7..ae8e073d7 100644 --- a/core/src/infra/sync/transaction.rs +++ b/core/src/infra/sync/transaction.rs @@ -70,7 +70,6 @@ pub struct TransactionManager { event_bus: Arc, /// Current sequence number per library (library_id -> sequence) - /// TODO: Replace with HLC in leaderless architecture sync_sequence: Arc>>, } @@ -254,7 +253,6 @@ impl TransactionManager { } /// Get the next sequence number for a library - /// TODO: Replace with HLC in leaderless architecture async fn next_sequence(&self, library_id: Uuid) -> Result { let mut sequences = self.sync_sequence.lock().await; let seq = sequences.entry(library_id).or_insert(0); diff --git a/core/src/infra/sync/watermarks.rs b/core/src/infra/sync/watermarks.rs index 12a3803db..9fcab248e 100644 --- a/core/src/infra/sync/watermarks.rs +++ b/core/src/infra/sync/watermarks.rs @@ -256,6 +256,49 @@ impl ResourceWatermarkStore { Ok(result.rows_affected() as usize) } + + /// Get our latest watermark for each resource type (aggregated across all peers) + /// + /// Returns a HashMap mapping resource_type -> max(last_watermark) across all peers. + /// This represents what we've successfully received from our peers. + pub async fn get_our_resource_watermarks( + &self, + conn: &C, + ) -> Result>, WatermarkError> { + let rows = conn + .query_all(Statement::from_sql_and_values( + DbBackend::Sqlite, + r#" + SELECT resource_type, MAX(last_watermark) as max_watermark + FROM device_resource_watermarks + WHERE device_uuid = ? + GROUP BY resource_type + ORDER BY resource_type + "#, + vec![self.device_uuid.to_string().into()], + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + let mut results = std::collections::HashMap::new(); + for row in rows { + let resource_type: String = row + .try_get("", "resource_type") + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + let watermark_str: String = row + .try_get("", "max_watermark") + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + let dt = DateTime::parse_from_rfc3339(&watermark_str) + .map_err(|e| WatermarkError::ParseError(e.to_string()))? + .with_timezone(&Utc); + + results.insert(resource_type, dt); + } + + Ok(results) + } } /// Watermark errors diff --git a/core/src/service/network/protocol/sync/handler.rs b/core/src/service/network/protocol/sync/handler.rs index 43e8d16b8..d55649df1 100644 --- a/core/src/service/network/protocol/sync/handler.rs +++ b/core/src/service/network/protocol/sync/handler.rs @@ -246,9 +246,9 @@ impl SyncProtocolHandler { // Create checkpoint: "timestamp|uuid" format let next_checkpoint = if has_more { - records.last().map(|r| { - format!("{}|{}", r.timestamp.to_rfc3339(), r.uuid) - }) + records + .last() + .map(|r| format!("{}|{}", r.timestamp.to_rfc3339(), r.uuid)) } else { None }; @@ -397,25 +397,39 @@ impl SyncProtocolHandler { SyncMessage::WatermarkExchangeRequest { library_id, device_id, - my_state_watermark: peer_state_watermark, my_shared_watermark: peer_shared_watermark, + my_resource_watermarks: peer_resource_watermarks, } => { debug!( from_device = %from_device, - peer_state_watermark = ?peer_state_watermark, peer_shared_watermark = ?peer_shared_watermark, - "Processing WatermarkExchangeRequest" + peer_resource_count = peer_resource_watermarks.len(), + "Processing WatermarkExchangeRequest with per-resource watermarks" ); // Get our current watermarks - let (our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await; + let (_our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await; + let our_resource_watermarks = + crate::infra::sync::ResourceWatermarkStore::new(peer_sync.device_id()) + .get_our_resource_watermarks(peer_sync.peer_log_conn()) + .await + .unwrap_or_default(); - // Determine if peer needs catch-up by comparing watermarks - let needs_state_catchup = match (peer_state_watermark, our_state_watermark) { - (Some(peer_ts), Some(our_ts)) => our_ts > peer_ts, - (None, Some(_)) => true, - _ => false, - }; + // Determine if peer needs catch-up by comparing per-resource watermarks + let mut needs_state_catchup = false; + for (resource_type, our_ts) in &our_resource_watermarks { + match peer_resource_watermarks.get(resource_type) { + Some(peer_ts) if our_ts > peer_ts => { + needs_state_catchup = true; + break; + } + None => { + needs_state_catchup = true; + break; + } + _ => {} + } + } let needs_shared_catchup = match (peer_shared_watermark, our_shared_watermark) { (Some(peer_hlc), Some(our_hlc)) => our_hlc > peer_hlc, @@ -427,43 +441,44 @@ impl SyncProtocolHandler { from_device = %from_device, needs_state_catchup = needs_state_catchup, needs_shared_catchup = needs_shared_catchup, - "Responding to watermark exchange request" + our_resource_count = our_resource_watermarks.len(), + "Responding to watermark exchange request with per-resource watermarks" ); Ok(Some(SyncMessage::WatermarkExchangeResponse { library_id: self.library_id, device_id: peer_sync.device_id(), - state_watermark: our_state_watermark, shared_watermark: our_shared_watermark, needs_state_catchup, needs_shared_catchup, + resource_watermarks: our_resource_watermarks, })) } SyncMessage::WatermarkExchangeResponse { library_id, device_id, - state_watermark: peer_state_watermark, shared_watermark: peer_shared_watermark, needs_state_catchup, needs_shared_catchup, + resource_watermarks: peer_resource_watermarks, } => { debug!( from_device = %from_device, - peer_state_watermark = ?peer_state_watermark, peer_shared_watermark = ?peer_shared_watermark, needs_state_catchup = needs_state_catchup, needs_shared_catchup = needs_shared_catchup, - "Processing WatermarkExchangeResponse" + peer_resource_count = peer_resource_watermarks.len(), + "Processing WatermarkExchangeResponse with per-resource watermarks" ); peer_sync .on_watermark_exchange_response( from_device, - peer_state_watermark, peer_shared_watermark, needs_state_catchup, needs_shared_catchup, + peer_resource_watermarks, ) .await .map_err(|e| { @@ -509,7 +524,10 @@ impl crate::service::network::protocol::ProtocolHandler for SyncProtocolHandler ) { use tokio::io::{AsyncReadExt, AsyncWriteExt}; - tracing::info!("SyncProtocolHandler: Stream accepted from node {}", remote_node_id); + tracing::info!( + "SyncProtocolHandler: Stream accepted from node {}", + remote_node_id + ); // Map node_id to device_id using device registry let from_device = { @@ -532,7 +550,10 @@ impl crate::service::network::protocol::ProtocolHandler for SyncProtocolHandler }; // Read request with length prefix - tracing::info!("SyncProtocolHandler: Reading request from device {}...", from_device); + tracing::info!( + "SyncProtocolHandler: Reading request from device {}...", + from_device + ); let mut len_buf = [0u8; 4]; if let Err(e) = recv.read_exact(&mut len_buf).await { // This is normal if peer just opened connection to test connectivity @@ -661,13 +682,14 @@ mod tests { #[test] fn test_handler_creation() { // Test uses mock registry - use crate::service::network::device::DeviceRegistry; use crate::device::DeviceManager; + use crate::service::network::device::DeviceRegistry; use std::path::PathBuf; let device_manager = Arc::new(DeviceManager::new().unwrap()); let logger = Arc::new(crate::service::network::utils::SilentLogger); - let registry = DeviceRegistry::new(device_manager, PathBuf::from("/tmp/test"), logger).unwrap(); + let registry = + DeviceRegistry::new(device_manager, PathBuf::from("/tmp/test"), logger).unwrap(); let device_registry = Arc::new(tokio::sync::RwLock::new(registry)); let handler = SyncProtocolHandler::new(Uuid::new_v4(), device_registry); diff --git a/core/src/service/network/protocol/sync/messages.rs b/core/src/service/network/protocol/sync/messages.rs index fa6ea8ba7..acc303b6b 100644 --- a/core/src/service/network/protocol/sync/messages.rs +++ b/core/src/service/network/protocol/sync/messages.rs @@ -103,18 +103,20 @@ pub enum SyncMessage { WatermarkExchangeRequest { library_id: Uuid, device_id: Uuid, // Requesting device - my_state_watermark: Option>, my_shared_watermark: Option, + /// Per-resource state watermarks (resource_type -> timestamp) + my_resource_watermarks: std::collections::HashMap>, }, /// Response with peer's watermarks WatermarkExchangeResponse { library_id: Uuid, device_id: Uuid, // Responding device - state_watermark: Option>, shared_watermark: Option, needs_state_catchup: bool, // If true, peer needs our state needs_shared_catchup: bool, // If true, peer needs our shared changes + /// Per-resource state watermarks (resource_type -> timestamp) + resource_watermarks: std::collections::HashMap>, }, /// Error response diff --git a/core/src/service/sync/dependency.rs b/core/src/service/sync/dependency.rs index 272e33437..dfec47369 100644 --- a/core/src/service/sync/dependency.rs +++ b/core/src/service/sync/dependency.rs @@ -108,6 +108,26 @@ impl DependencyTracker { pub async fn dependency_count(&self) -> usize { self.waiting_for.read().await.len() } + + /// Get all pending dependency UUIDs (for requesting from peers) + /// + /// Returns a list of UUIDs that are currently blocking updates. + /// These can be requested from peers to resolve stuck dependencies. + pub async fn get_pending_dependency_uuids(&self) -> Vec { + let waiting = self.waiting_for.read().await; + waiting.keys().copied().collect() + } + + /// Clear all pending dependencies (timeout/force sync fallback) + /// + /// Call this as a last resort when dependencies cannot be resolved. + /// Returns the number of dependencies cleared. + pub async fn clear_all(&self) -> usize { + let mut waiting = self.waiting_for.write().await; + let count = waiting.len(); + waiting.clear(); + count + } } impl Default for DependencyTracker { diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index 87f729e59..dacb1043b 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -251,6 +251,11 @@ impl PeerSync { self.library_id } + /// Get peer log connection for watermark queries + pub fn peer_log_conn(&self) -> &sea_orm::DatabaseConnection { + self.peer_log.conn() + } + /// Check if real-time sync is currently active (lock mechanism) /// /// Returns true if real-time broadcasts happened in the last 60 seconds. @@ -448,21 +453,31 @@ impl PeerSync { ); // Get our watermarks - let (my_state_watermark, my_shared_watermark) = self.get_watermarks().await; + let (_my_state_watermark, my_shared_watermark) = self.get_watermarks().await; + + // Get per-resource watermarks for fine-grained comparison + let my_resource_watermarks = + crate::infra::sync::ResourceWatermarkStore::new(self.device_id) + .get_our_resource_watermarks(self.peer_log.conn()) + .await + .unwrap_or_else(|e| { + warn!(error = %e, "Failed to get per-resource watermarks"); + std::collections::HashMap::new() + }); debug!( peer = %peer_id, - my_state_watermark = ?my_state_watermark, my_shared_watermark = ?my_shared_watermark, - "Sending watermark exchange request" + resource_count = my_resource_watermarks.len(), + "Sending watermark exchange request with per-resource watermarks" ); // Send request to peer let request = SyncMessage::WatermarkExchangeRequest { library_id: self.library_id, device_id: self.device_id, - my_state_watermark, my_shared_watermark, + my_resource_watermarks, }; self.network @@ -485,47 +500,75 @@ impl PeerSync { pub async fn on_watermark_exchange_response( &self, peer_id: Uuid, - peer_state_watermark: Option>, peer_shared_watermark: Option, needs_state_catchup: bool, needs_shared_catchup: bool, + peer_resource_watermarks: std::collections::HashMap>, ) -> Result<()> { info!( peer = %peer_id, - peer_state_watermark = ?peer_state_watermark, peer_shared_watermark = ?peer_shared_watermark, needs_state_catchup = needs_state_catchup, needs_shared_catchup = needs_shared_catchup, - "Received watermark exchange response" + peer_resource_count = peer_resource_watermarks.len(), + "Received watermark exchange response with per-resource watermarks" ); // Get our watermarks to compare - let (my_state_watermark, my_shared_watermark) = self.get_watermarks().await; + let (_my_state_watermark, my_shared_watermark) = self.get_watermarks().await; + let my_resource_watermarks = + crate::infra::sync::ResourceWatermarkStore::new(self.device_id) + .get_our_resource_watermarks(self.peer_log.conn()) + .await + .unwrap_or_default(); - // Determine if WE need to catch up based on watermark comparison + // Determine if WE need to catch up based on per-resource watermark comparison let mut we_need_state_catchup = false; let mut we_need_shared_catchup = false; + let mut resources_needing_catchup = Vec::new(); - // Compare state watermarks (timestamps) - match (my_state_watermark, peer_state_watermark) { - (Some(my_ts), Some(peer_ts)) if peer_ts > my_ts => { - info!( - peer = %peer_id, - my_timestamp = %my_ts, - peer_timestamp = %peer_ts, - "Peer has newer state, need to catch up" - ); - we_need_state_catchup = true; - } - (None, Some(_)) => { - info!(peer = %peer_id, "We have no state watermark, need full state catch-up"); - we_need_state_catchup = true; - } - _ => { - debug!(peer = %peer_id, "State watermarks in sync"); + // Compare per-resource watermarks (CRITICAL FIX: Issue #10) + // This fixes the bug where global watermark comparison missed per-resource divergence + for (resource_type, peer_ts) in &peer_resource_watermarks { + match my_resource_watermarks.get(resource_type) { + Some(my_ts) if peer_ts > my_ts => { + info!( + peer = %peer_id, + resource_type = %resource_type, + my_timestamp = %my_ts, + peer_timestamp = %peer_ts, + "Peer has newer data for this resource" + ); + resources_needing_catchup.push(resource_type.clone()); + we_need_state_catchup = true; + } + None => { + info!( + peer = %peer_id, + resource_type = %resource_type, + peer_timestamp = %peer_ts, + "We have no watermark for this resource, need catch-up" + ); + resources_needing_catchup.push(resource_type.clone()); + we_need_state_catchup = true; + } + _ => { + debug!( + resource_type = %resource_type, + "Resource in sync with peer" + ); + } } } + if we_need_state_catchup { + info!( + peer = %peer_id, + resources = ?resources_needing_catchup, + "Need state catch-up for specific resources" + ); + } + // Compare shared watermarks (HLC) match (my_shared_watermark, peer_shared_watermark) { (Some(my_hlc), Some(peer_hlc)) if peer_hlc > my_hlc => { @@ -555,10 +598,11 @@ impl PeerSync { if let Some(weak_ref) = backfill_mgr.as_ref() { if let Some(manager) = weak_ref.upgrade() { // Trigger incremental catch-up from this peer + // Note: We now use per-resource watermarks instead of global state watermark if let Err(e) = manager .catch_up_from_peer( peer_id, - my_state_watermark, + None, // Per-resource watermarks used instead of global my_shared_watermark.map(|hlc| hlc.to_string()), ) .await @@ -615,7 +659,8 @@ impl PeerSync { } // Update devices table with peer's watermarks for future comparisons - self.update_peer_watermarks(peer_id, peer_state_watermark, peer_shared_watermark) + // Note: We now track per-resource watermarks, not global state watermark + self.update_peer_watermarks(peer_id, None, peer_shared_watermark) .await?; info!(peer = %peer_id, "Watermark exchange complete"); @@ -798,10 +843,11 @@ impl PeerSync { let mut last_lag_warning = std::time::Instant::now(); let lag_warning_cooldown = std::time::Duration::from_secs(5); - // Real-time batching mechanism (accumulate up to 100 entries or 50ms) + // Real-time batching mechanism (configurable via SyncConfig) let mut state_change_batch: Vec = Vec::new(); - let mut batch_flush_interval = - tokio::time::interval(std::time::Duration::from_millis(50)); + let mut batch_flush_interval = tokio::time::interval(std::time::Duration::from_millis( + config.batching.realtime_batch_flush_interval_ms, + )); batch_flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); while is_running.load(Ordering::SeqCst) { @@ -824,18 +870,18 @@ impl PeerSync { state_change_count += 1; last_event_type = Some(format!("StateChange({})", model_type)); - // Add to batch instead of processing immediately - state_change_batch.push(serde_json::json!({ - "library_id": event_library_id, - "model_type": model_type, - "record_uuid": record_uuid, - "device_id": device_id, - "data": data, - "timestamp": timestamp, - })); + // Add to batch instead of processing immediately + state_change_batch.push(serde_json::json!({ + "library_id": event_library_id, + "model_type": model_type, + "record_uuid": record_uuid, + "device_id": device_id, + "data": data, + "timestamp": timestamp, + })); - // Flush if batch reaches 100 entries - if state_change_batch.len() >= 100 { + // Flush if batch reaches configured max entries + if state_change_batch.len() >= config.batching.realtime_batch_max_entries { Self::flush_state_change_batch( library_id, &mut state_change_batch, @@ -919,7 +965,7 @@ impl PeerSync { } } - // Flush batch on timer (every 50ms) + // Flush batch on timer (configurable interval) _ = batch_flush_interval.tick() => { if !state_change_batch.is_empty() { Self::flush_state_change_batch( @@ -1133,22 +1179,31 @@ impl PeerSync { ); // Query our watermarks from sync.db - let (my_state_watermark, my_shared_watermark) = + let (_my_state_watermark, my_shared_watermark) = Self::query_device_watermarks(device_id, peer_log).await; + // Get per-resource watermarks for fine-grained comparison + let my_resource_watermarks = crate::infra::sync::ResourceWatermarkStore::new(device_id) + .get_our_resource_watermarks(peer_log.conn()) + .await + .unwrap_or_else(|e| { + warn!(error = %e, "Failed to get per-resource watermarks in static exchange"); + std::collections::HashMap::new() + }); + debug!( peer = %peer_id, - my_state_watermark = ?my_state_watermark, my_shared_watermark = ?my_shared_watermark, - "Sending watermark exchange request" + resource_count = my_resource_watermarks.len(), + "Sending watermark exchange request with per-resource watermarks" ); // Send request to peer let request = SyncMessage::WatermarkExchangeRequest { library_id, device_id, - my_state_watermark, my_shared_watermark, + my_resource_watermarks, }; network @@ -1249,9 +1304,17 @@ impl PeerSync { let timestamp = data .get("timestamp") .and_then(|v| v.as_str()) - .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.with_timezone(&chrono::Utc)) - .unwrap_or_else(Utc::now); + .ok_or_else(|| anyhow::anyhow!("Missing or invalid timestamp in state_change event"))?; + + let timestamp = chrono::DateTime::parse_from_rfc3339(timestamp) + .map_err(|e| { + anyhow::anyhow!( + "Failed to parse timestamp '{}': {}. This may indicate clock skew.", + timestamp, + e + ) + })? + .with_timezone(&chrono::Utc); let change = StateChangeMessage { model_type, @@ -2357,9 +2420,6 @@ impl PeerSync { } /// Get device-owned state for backfill (StateRequest) - /// - /// This is completely domain-agnostic - it delegates to the Syncable trait - /// implementations in each entity. No switch statements, no domain logic. pub async fn get_device_state( &self, model_types: Vec, @@ -2601,15 +2661,43 @@ impl PeerSync { } } - // Process dependency tracker (NEW: flush all tracked dependencies) - // Note: We can't easily extract from HashMap during iteration, so we log the issue - // In practice, dependencies should have been resolved during normal operation - // This is a fallback for stuck dependencies + // Process dependency tracker - handle unresolved dependencies + // This is a fallback for dependencies that couldn't be resolved during normal operation if !dep_stats.is_empty() { warn!( - "Dependency tracker still has {} unresolved dependencies - these entries may have circular or missing parent references", - dep_stats.total_dependencies + dependencies = dep_stats.total_dependencies, + waiting_updates = dep_stats.total_waiting_updates, + "Dependency tracker has unresolved dependencies at Ready transition" ); + + // Get list of missing UUIDs for diagnostic purposes + let missing_uuids = self.dependency_tracker.get_pending_dependency_uuids().await; + + if missing_uuids.len() <= 10 { + // Log specific UUIDs if count is manageable + warn!( + ?missing_uuids, + "Missing dependency UUIDs (may be circular references or orphaned data)" + ); + } else { + warn!( + missing_count = missing_uuids.len(), + sample_uuids = ?&missing_uuids[..10], + "Many missing dependencies (showing first 10)" + ); + } + + // Strategy: Clear dependencies after logging to prevent blocking sync indefinitely + // These entries either have: + // - Circular dependencies (impossible to resolve) + // - References to deleted records + // - Incomplete sync data from peer + // They will be resynced on next full backfill if the data becomes available + let cleared_count = self.dependency_tracker.clear_all().await; + warn!( + cleared_count, + "Cleared unresolved dependencies to prevent sync deadlock. These updates will be retried on next full sync." + ); } info!( diff --git a/core/tests/helpers/sync_transport.rs b/core/tests/helpers/sync_transport.rs index 8dd4ade0a..35d53e975 100644 --- a/core/tests/helpers/sync_transport.rs +++ b/core/tests/helpers/sync_transport.rs @@ -201,14 +201,35 @@ impl MockTransport { SyncMessage::WatermarkExchangeRequest { library_id, device_id: requesting_device_id, - my_state_watermark: peer_state_watermark, my_shared_watermark: peer_shared_watermark, + my_resource_watermarks: peer_resource_watermarks, } => { - let (our_state_watermark, our_shared_watermark) = + let (_our_state_watermark, our_shared_watermark) = sync_service.peer_sync().get_watermarks().await; - let needs_state_catchup = matches!((peer_state_watermark, our_state_watermark), (Some(p), Some(o)) if o > p) - || matches!((peer_state_watermark, our_state_watermark), (None, Some(_))); + // Get our per-resource watermarks + let our_resource_watermarks = + sd_core::infra::sync::ResourceWatermarkStore::new(self.my_device_id) + .get_our_resource_watermarks(sync_service.peer_sync().peer_log_conn()) + .await + .unwrap_or_default(); + + // Compare per-resource watermarks to determine if peer needs catch-up + let mut needs_state_catchup = false; + for (resource_type, our_ts) in &our_resource_watermarks { + match peer_resource_watermarks.get(resource_type) { + Some(peer_ts) if our_ts > peer_ts => { + needs_state_catchup = true; + break; + } + None => { + needs_state_catchup = true; + break; + } + _ => {} + } + } + let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p) || matches!( (peer_shared_watermark, our_shared_watermark), @@ -218,10 +239,10 @@ impl MockTransport { let response = SyncMessage::WatermarkExchangeResponse { library_id, device_id: self.my_device_id, - state_watermark: our_state_watermark, shared_watermark: our_shared_watermark, needs_state_catchup, needs_shared_catchup, + resource_watermarks: our_resource_watermarks, }; self.send_sync_message(sender, response).await?; @@ -229,19 +250,19 @@ impl MockTransport { SyncMessage::WatermarkExchangeResponse { library_id: _, device_id: peer_device_id, - state_watermark: peer_state_watermark, shared_watermark: peer_shared_watermark, needs_state_catchup, needs_shared_catchup, + resource_watermarks: peer_resource_watermarks, } => { sync_service .peer_sync() .on_watermark_exchange_response( peer_device_id, - peer_state_watermark, peer_shared_watermark, needs_state_catchup, needs_shared_catchup, + peer_resource_watermarks, ) .await?; } diff --git a/core/tests/sync_realtime_integration_test.rs b/core/tests/sync_realtime_integration_test.rs index 8fbb810ef..8e782b018 100644 --- a/core/tests/sync_realtime_integration_test.rs +++ b/core/tests/sync_realtime_integration_test.rs @@ -1066,7 +1066,7 @@ impl Drop for SyncTestHarness { // /// Test: Location 1 indexed on Alice, syncs to Bob in real-time -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_realtime_sync_alice_to_bob() -> anyhow::Result<()> { let harness = SyncTestHarness::new("realtime_alice_to_bob").await?; @@ -1206,7 +1206,7 @@ async fn test_realtime_sync_alice_to_bob() -> anyhow::Result<()> { } /// Test: Location indexed on Bob, syncs to Alice (reverse direction) -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_realtime_sync_bob_to_alice() -> anyhow::Result<()> { let harness = SyncTestHarness::new("realtime_bob_to_alice").await?; @@ -1241,7 +1241,7 @@ async fn test_realtime_sync_bob_to_alice() -> anyhow::Result<()> { } /// Test: Concurrent indexing on both devices -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_concurrent_indexing() -> anyhow::Result<()> { let harness = SyncTestHarness::new("concurrent_indexing").await?; @@ -1287,7 +1287,7 @@ async fn test_concurrent_indexing() -> anyhow::Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_content_identity_linkage() -> anyhow::Result<()> { let harness = SyncTestHarness::new("content_identity_linkage").await?;