From 680585a02fb0e97fa66966ad29b09dee2a2ab7ad Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 16 Nov 2025 06:39:59 -0800 Subject: [PATCH] feat: enhance watermark synchronization with resource count validation - Introduced new functionality to validate resource counts during watermark exchanges, improving synchronization accuracy. - Added methods to retrieve device-owned resource counts for gap detection, ensuring that discrepancies are identified and addressed. - Updated the `SyncMessage` structure to include actual resource counts, facilitating better tracking of synchronization state. - Enhanced the `PeerSync` implementation to clear watermarks for mismatched resources, allowing for surgical recovery in case of count mismatches. - Improved logging for better visibility into synchronization processes and potential issues. --- core/src/infra/sync/watermarks.rs | 48 ++-- .../service/network/protocol/sync/handler.rs | 191 +++++++------ .../service/network/protocol/sync/messages.rs | 8 + core/src/service/sync/peer.rs | 259 +++++++++++++++++- core/tests/helpers/sync_transport.rs | 134 ++++----- workbench | 2 +- 6 files changed, 476 insertions(+), 166 deletions(-) diff --git a/core/src/infra/sync/watermarks.rs b/core/src/infra/sync/watermarks.rs index 9fcab248e..28514be94 100644 --- a/core/src/infra/sync/watermarks.rs +++ b/core/src/infra/sync/watermarks.rs @@ -215,9 +215,7 @@ impl ResourceWatermarkStore { match result { Some(row) => { - let watermark_str: Option = row - .try_get("", "max_watermark") - .ok(); + let watermark_str: Option = row.try_get("", "max_watermark").ok(); if let Some(wm_str) = watermark_str { let dt = DateTime::parse_from_rfc3339(&wm_str) @@ -233,6 +231,34 @@ impl ResourceWatermarkStore { } } + /// Delete watermark for a specific resource type from a peer + /// + /// Used for surgical recovery when count mismatch is detected for a single resource. + pub async fn delete_resource( + &self, + conn: &C, + peer_device_uuid: Uuid, + resource_type: &str, + ) -> Result { + let result = conn + .execute(Statement::from_sql_and_values( + DbBackend::Sqlite, + r#" + DELETE FROM device_resource_watermarks + WHERE device_uuid = ? AND peer_device_uuid = ? AND resource_type = ? + "#, + vec![ + self.device_uuid.to_string().into(), + peer_device_uuid.to_string().into(), + resource_type.into(), + ], + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + Ok(result.rows_affected() > 0) + } + /// Delete all watermarks for a peer (cleanup on peer removal) pub async fn delete_peer( &self, @@ -347,10 +373,7 @@ mod tests { // Verify retrieval let retrieved = store.get(&conn, peer_uuid, "location").await.unwrap(); assert!(retrieved.is_some()); - assert_eq!( - retrieved.unwrap().timestamp(), - timestamp1.timestamp() - ); + assert_eq!(retrieved.unwrap().timestamp(), timestamp1.timestamp()); // Update with newer timestamp let timestamp2 = timestamp1 + chrono::Duration::seconds(10); @@ -361,10 +384,7 @@ mod tests { // Verify update let retrieved = store.get(&conn, peer_uuid, "location").await.unwrap(); - assert_eq!( - retrieved.unwrap().timestamp(), - timestamp2.timestamp() - ); + assert_eq!(retrieved.unwrap().timestamp(), timestamp2.timestamp()); // Attempt update with older timestamp (should be ignored) let timestamp0 = timestamp1 - chrono::Duration::seconds(10); @@ -375,10 +395,7 @@ mod tests { // Verify still has timestamp2 (newer) let retrieved = store.get(&conn, peer_uuid, "location").await.unwrap(); - assert_eq!( - retrieved.unwrap().timestamp(), - timestamp2.timestamp() - ); + assert_eq!(retrieved.unwrap().timestamp(), timestamp2.timestamp()); } #[tokio::test] @@ -468,4 +485,3 @@ mod tests { assert_eq!(all.len(), 0); } } - diff --git a/core/src/service/network/protocol/sync/handler.rs b/core/src/service/network/protocol/sync/handler.rs index 7a716d9c3..86e075a6a 100644 --- a/core/src/service/network/protocol/sync/handler.rs +++ b/core/src/service/network/protocol/sync/handler.rs @@ -394,99 +394,122 @@ impl SyncProtocolHandler { })) } - SyncMessage::WatermarkExchangeRequest { - library_id, - device_id, - my_shared_watermark: peer_shared_watermark, - my_resource_watermarks: peer_resource_watermarks, - } => { - debug!( - from_device = %from_device, - peer_shared_watermark = ?peer_shared_watermark, - peer_resource_count = peer_resource_watermarks.len(), - "Processing WatermarkExchangeRequest with per-resource watermarks" - ); + SyncMessage::WatermarkExchangeRequest { + library_id, + device_id, + my_shared_watermark: peer_shared_watermark, + my_resource_watermarks: peer_resource_watermarks, + my_peer_resource_counts: peer_counts_of_our_data, + } => { + debug!( + from_device = %from_device, + peer_shared_watermark = ?peer_shared_watermark, + peer_resource_count = peer_resource_watermarks.len(), + peer_counts_of_us = ?peer_counts_of_our_data, + "Processing WatermarkExchangeRequest with count validation" + ); - // Get our current watermarks - let (_our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await; - let our_resource_watermarks = - crate::infra::sync::ResourceWatermarkStore::new(peer_sync.device_id()) - .get_our_resource_watermarks(peer_sync.peer_log_conn()) - .await - .unwrap_or_default(); + // Get our current watermarks + let (_our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await; + let our_resource_watermarks = + crate::infra::sync::ResourceWatermarkStore::new(peer_sync.device_id()) + .get_our_resource_watermarks(peer_sync.peer_log_conn()) + .await + .unwrap_or_default(); - // Determine if peer needs catch-up by comparing per-resource watermarks - let mut needs_state_catchup = false; - for (resource_type, our_ts) in &our_resource_watermarks { - match peer_resource_watermarks.get(resource_type) { - Some(peer_ts) if our_ts > peer_ts => { - needs_state_catchup = true; - break; - } - None => { - needs_state_catchup = true; - break; - } - _ => {} + // Get our ACTUAL counts (what we truly own) + let our_actual_resource_counts = crate::service::sync::PeerSync::get_device_owned_counts( + peer_sync.device_id(), + peer_sync.db(), + ) + .await + .unwrap_or_default(); + + // Determine if peer needs catch-up by comparing per-resource watermarks + let mut needs_state_catchup = false; + for (resource_type, our_ts) in &our_resource_watermarks { + match peer_resource_watermarks.get(resource_type) { + Some(peer_ts) if our_ts > peer_ts => { + needs_state_catchup = true; + break; } + None => { + needs_state_catchup = true; + break; + } + _ => {} } - - let needs_shared_catchup = match (peer_shared_watermark, our_shared_watermark) { - (Some(peer_hlc), Some(our_hlc)) => our_hlc > peer_hlc, - (None, Some(_)) => true, - _ => false, - }; - - info!( - from_device = %from_device, - needs_state_catchup = needs_state_catchup, - needs_shared_catchup = needs_shared_catchup, - our_resource_count = our_resource_watermarks.len(), - "Responding to watermark exchange request with per-resource watermarks" - ); - - Ok(Some(SyncMessage::WatermarkExchangeResponse { - library_id: self.library_id, - device_id: peer_sync.device_id(), - shared_watermark: our_shared_watermark, - needs_state_catchup, - needs_shared_catchup, - resource_watermarks: our_resource_watermarks, - })) } - SyncMessage::WatermarkExchangeResponse { - library_id, - device_id, - shared_watermark: peer_shared_watermark, + // Also check if we have resources peer doesn't know about + for resource_type in our_resource_watermarks.keys() { + if !peer_resource_watermarks.contains_key(resource_type) { + needs_state_catchup = true; + break; + } + } + + let needs_shared_catchup = match (peer_shared_watermark, our_shared_watermark) { + (Some(peer_hlc), Some(our_hlc)) => our_hlc > peer_hlc, + (None, Some(_)) => true, + _ => false, + }; + + debug!( + from_device = %from_device, + needs_state_catchup = needs_state_catchup, + needs_shared_catchup = needs_shared_catchup, + our_resource_count = our_resource_watermarks.len(), + our_actual_counts = ?our_actual_resource_counts, + "Responding to watermark exchange with actual counts" + ); + + Ok(Some(SyncMessage::WatermarkExchangeResponse { + library_id: self.library_id, + device_id: peer_sync.device_id(), + shared_watermark: our_shared_watermark, needs_state_catchup, needs_shared_catchup, - resource_watermarks: peer_resource_watermarks, - } => { - debug!( - from_device = %from_device, - peer_shared_watermark = ?peer_shared_watermark, - needs_state_catchup = needs_state_catchup, - needs_shared_catchup = needs_shared_catchup, - peer_resource_count = peer_resource_watermarks.len(), - "Processing WatermarkExchangeResponse with per-resource watermarks" - ); + resource_watermarks: our_resource_watermarks, + my_actual_resource_counts: our_actual_resource_counts, + })) + } - peer_sync - .on_watermark_exchange_response( - from_device, - peer_shared_watermark, - needs_state_catchup, - needs_shared_catchup, - peer_resource_watermarks, - ) - .await - .map_err(|e| { - NetworkingError::Protocol(format!( - "Failed to handle watermark exchange response: {}", - e - )) - })?; + SyncMessage::WatermarkExchangeResponse { + library_id, + device_id, + shared_watermark: peer_shared_watermark, + needs_state_catchup, + needs_shared_catchup, + resource_watermarks: peer_resource_watermarks, + my_actual_resource_counts: peer_actual_counts, + } => { + debug!( + from_device = %from_device, + peer_shared_watermark = ?peer_shared_watermark, + needs_state_catchup = needs_state_catchup, + needs_shared_catchup = needs_shared_catchup, + peer_resource_count = peer_resource_watermarks.len(), + peer_actual_counts = ?peer_actual_counts, + "Processing WatermarkExchangeResponse with counts" + ); + + peer_sync + .on_watermark_exchange_response( + from_device, + peer_shared_watermark, + needs_state_catchup, + needs_shared_catchup, + peer_resource_watermarks, + peer_actual_counts, + ) + .await + .map_err(|e| { + NetworkingError::Protocol(format!( + "Failed to handle watermark exchange response: {}", + e + )) + })?; Ok(None) } diff --git a/core/src/service/network/protocol/sync/messages.rs b/core/src/service/network/protocol/sync/messages.rs index ee6dfee4b..8e7c3cfc5 100644 --- a/core/src/service/network/protocol/sync/messages.rs +++ b/core/src/service/network/protocol/sync/messages.rs @@ -106,6 +106,10 @@ pub enum SyncMessage { my_shared_watermark: Option, /// Per-resource state watermarks (resource_type -> timestamp) my_resource_watermarks: std::collections::HashMap>, + /// Counts of peer's device-owned resources that we have synced (for gap detection) + /// Maps resource_type -> count of records we have from this peer + #[serde(default)] + my_peer_resource_counts: std::collections::HashMap, }, /// Response with peer's watermarks @@ -117,6 +121,10 @@ pub enum SyncMessage { needs_shared_catchup: bool, // If true, peer needs our shared changes /// Per-resource state watermarks (resource_type -> timestamp) resource_watermarks: std::collections::HashMap>, + /// Our actual device-owned resource counts (for gap detection) + /// Maps resource_type -> actual count of records we own + #[serde(default)] + my_actual_resource_counts: std::collections::HashMap, }, /// Proactive notification that sender has new data available diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index b81abcc11..e11460a10 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -261,6 +261,123 @@ impl PeerSync { self.peer_log.conn() } + /// Get counts of device-owned resources owned by a specific device + /// + /// Used for gap detection during watermark exchange. + /// Only counts non-deleted records where device ownership matches. + pub async fn get_device_owned_counts( + owner_device_id: Uuid, + db: &DatabaseConnection, + ) -> Result> { + use crate::infra::db::entities::{device, entry, location, volume}; + use sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect}; + + let mut counts = std::collections::HashMap::new(); + + // Get device's internal ID from UUID + let device = device::Entity::find() + .filter(device::Column::Uuid.eq(owner_device_id)) + .one(db) + .await?; + + let device_internal_id = match device { + Some(d) => d.id, + None => { + debug!(device_id = %owner_device_id, "Device not found for count query"); + return Ok(counts); + } + }; + + // Location count (owned by device) + let location_count = location::Entity::find() + .filter(location::Column::DeviceId.eq(device_internal_id)) + .count(db) + .await + .unwrap_or(0); + counts.insert("location".to_string(), location_count); + + // Entry count (via location ownership chain) + // Query entries where location.device_id matches + let entry_count: u64 = { + use sea_orm::sea_query::{Expr, Query}; + use sea_orm::{FromQueryResult, Statement}; + + #[derive(FromQueryResult)] + struct CountResult { + count: i64, + } + + let stmt = Statement::from_sql_and_values( + sea_orm::DbBackend::Sqlite, + r#" + SELECT COUNT(*) as count + FROM entries e + INNER JOIN locations l ON e.location_id = l.id + WHERE l.device_id = ? + "#, + vec![device_internal_id.into()], + ); + + let result = CountResult::find_by_statement(stmt) + .one(db) + .await + .unwrap_or(None); + + result.map(|r| r.count as u64).unwrap_or(0) + }; + counts.insert("entry".to_string(), entry_count); + + // Volume count (owned by device) + let volume_count = volume::Entity::find() + .filter(volume::Column::DeviceId.eq(device_internal_id)) + .count(db) + .await + .unwrap_or(0); + counts.insert("volume".to_string(), volume_count); + + debug!( + device_id = %owner_device_id, + location_count = location_count, + entry_count = entry_count, + volume_count = volume_count, + "Queried device-owned resource counts" + ); + + Ok(counts) + } + + /// Clear watermarks for specific resources (surgical recovery) + /// + /// Called when count mismatch is detected for specific resources. + /// Only clears the mismatched resources, leaving correct watermarks intact. + async fn clear_resource_watermarks( + &self, + peer_id: Uuid, + resource_types: Vec, + ) -> Result<()> { + let store = crate::infra::sync::ResourceWatermarkStore::new(self.device_id); + + let mut cleared_count = 0; + for resource_type in &resource_types { + if store + .delete_resource(self.peer_log.conn(), peer_id, resource_type) + .await + .map_err(|e| anyhow::anyhow!("Failed to clear resource watermark: {}", e))? + { + cleared_count += 1; + } + } + + info!( + peer = %peer_id, + resources = ?resource_types, + cleared = cleared_count, + "Cleared watermarks for mismatched resources only (surgical recovery)" + ); + + Ok(()) + } + /// Check if real-time sync is currently active for a specific peer /// /// Returns true if real-time broadcasts to this peer succeeded in the last 30 seconds. @@ -530,11 +647,20 @@ impl PeerSync { std::collections::HashMap::new() }); + // Get counts of peer's device-owned resources that we have synced (for gap detection) + let my_peer_resource_counts = Self::get_device_owned_counts(peer_id, &self.db) + .await + .unwrap_or_else(|e| { + warn!(error = %e, peer = %peer_id, "Failed to get peer resource counts"); + std::collections::HashMap::new() + }); + debug!( peer = %peer_id, my_shared_watermark = ?my_shared_watermark, resource_count = my_resource_watermarks.len(), - "Sending watermark exchange request with per-resource watermarks" + peer_owned_counts = ?my_peer_resource_counts, + "Sending watermark exchange request with counts for gap detection" ); // Send request to peer @@ -543,6 +669,7 @@ impl PeerSync { device_id: self.device_id, my_shared_watermark, my_resource_watermarks, + my_peer_resource_counts, }; self.network @@ -552,7 +679,7 @@ impl PeerSync { info!( peer = %peer_id, - "Watermark exchange request sent, waiting for response" + "Watermark exchange request sent with resource counts" ); Ok(()) @@ -569,6 +696,7 @@ impl PeerSync { needs_state_catchup: bool, needs_shared_catchup: bool, peer_resource_watermarks: std::collections::HashMap>, + peer_actual_resource_counts: std::collections::HashMap, ) -> Result<()> { info!( peer = %peer_id, @@ -576,7 +704,7 @@ impl PeerSync { needs_state_catchup = needs_state_catchup, needs_shared_catchup = needs_shared_catchup, peer_resource_count = peer_resource_watermarks.len(), - "Received watermark exchange response with per-resource watermarks" + "Received watermark exchange response with resource counts" ); // Get our watermarks to compare @@ -587,6 +715,124 @@ impl PeerSync { .await .unwrap_or_default(); + // Count-based gap detection (detects watermark leapfrog bugs) + // Only run when NOT in real-time sync or active backfill + let state = self.state().await; + let realtime_active = self.is_realtime_active_for_peer(peer_id).await; + let in_stable_state = state.is_ready() && !realtime_active; + + if in_stable_state && !peer_actual_resource_counts.is_empty() { + // Get what we think peer has + let my_counts_of_peer_data = Self::get_device_owned_counts(peer_id, &self.db) + .await + .unwrap_or_default(); + + let mut mismatched_resource_types = Vec::new(); + let mut mismatch_details = Vec::new(); + + for (resource_type, peer_actual_count) in &peer_actual_resource_counts { + let our_count = my_counts_of_peer_data + .get(resource_type) + .copied() + .unwrap_or(0); + + // Only flag mismatch if watermarks appear synchronized + // If watermarks already show divergence, normal catch-up will handle it + let watermarks_appear_synced = match ( + my_resource_watermarks.get(resource_type), + peer_resource_watermarks.get(resource_type), + ) { + (Some(my_ts), Some(peer_ts)) => { + let diff_seconds = (my_ts.timestamp() - peer_ts.timestamp()).abs(); + diff_seconds < 10 // Within 10 seconds = appear synchronized + } + (None, None) => true, // Both missing watermarks = appear synchronized + _ => false, // One has watermark, one doesn't = watermarks diverge, catch-up will fix it + }; + + if our_count != *peer_actual_count && watermarks_appear_synced { + warn!( + peer = %peer_id, + resource = %resource_type, + our_count = our_count, + peer_actual = peer_actual_count, + gap = i64::abs(our_count as i64 - *peer_actual_count as i64), + our_watermark = ?my_resource_watermarks.get(resource_type), + peer_watermark = ?peer_resource_watermarks.get(resource_type), + "COUNT MISMATCH WITH SYNCED WATERMARKS! Watermark leapfrog bug detected" + ); + mismatched_resource_types.push(resource_type.clone()); + mismatch_details.push(format!( + "{}({}/{}, wm_diff={}s)", + resource_type, + our_count, + peer_actual_count, + my_resource_watermarks + .get(resource_type) + .and_then(|my_ts| { + peer_resource_watermarks + .get(resource_type) + .map(|peer_ts| (my_ts.timestamp() - peer_ts.timestamp()).abs()) + }) + .unwrap_or(0) + )); + } else if our_count != *peer_actual_count { + debug!( + peer = %peer_id, + resource = %resource_type, + our_count = our_count, + peer_actual = peer_actual_count, + "Count mismatch but watermarks diverge - normal catch-up will fix it" + ); + } + } + + if !mismatched_resource_types.is_empty() { + error!( + peer = %peer_id, + mismatches = ?mismatch_details, + resources = ?mismatched_resource_types, + "Count mismatch indicates watermark leapfrog bug, clearing only affected resources" + ); + + // Clear watermarks only for mismatched resources + // This preserves correct watermarks for other resources + self.clear_resource_watermarks(peer_id, mismatched_resource_types.clone()) + .await?; + + // Trigger catch-up for affected resources + let backfill_mgr = self.backfill_manager.read().await; + if let Some(weak_ref) = backfill_mgr.as_ref() { + if let Some(manager) = weak_ref.upgrade() { + info!( + peer = %peer_id, + resources = ?mismatched_resource_types, + "Initiating targeted backfill for resources with count mismatch" + ); + // Force full backfill for affected resources (no watermarks) + manager.catch_up_from_peer(peer_id, None, None).await?; + } + } else { + warn!("BackfillManager not available, cannot trigger recovery backfill"); + } + + return Ok(()); + } else { + debug!( + peer = %peer_id, + "Count validation passed - no gaps detected" + ); + } + } else { + debug!( + peer = %peer_id, + state = ?state, + realtime_active = realtime_active, + has_counts = !peer_actual_resource_counts.is_empty(), + "Skipping count-based gap detection - not in stable state or no counts provided" + ); + } + // Determine if WE need to catch up based on per-resource watermark comparison let mut we_need_state_catchup = false; let mut we_need_shared_catchup = false; @@ -1334,11 +1580,15 @@ impl PeerSync { std::collections::HashMap::new() }); + // Note: Counts unavailable in static context (no db access) + // This is fine - counts only used when called from instance method + let my_peer_resource_counts = std::collections::HashMap::new(); + debug!( peer = %peer_id, my_shared_watermark = ?my_shared_watermark, resource_count = my_resource_watermarks.len(), - "Sending watermark exchange request with per-resource watermarks" + "Sending watermark exchange request (static trigger, counts unavailable)" ); // Send request to peer @@ -1347,6 +1597,7 @@ impl PeerSync { device_id, my_shared_watermark, my_resource_watermarks, + my_peer_resource_counts, }; network diff --git a/core/tests/helpers/sync_transport.rs b/core/tests/helpers/sync_transport.rs index a51c646bb..a85dcbeea 100644 --- a/core/tests/helpers/sync_transport.rs +++ b/core/tests/helpers/sync_transport.rs @@ -198,55 +198,65 @@ impl MockTransport { } } } - SyncMessage::WatermarkExchangeRequest { - library_id, - device_id: requesting_device_id, - my_shared_watermark: peer_shared_watermark, - my_resource_watermarks: peer_resource_watermarks, - } => { - let (_our_state_watermark, our_shared_watermark) = - sync_service.peer_sync().get_watermarks().await; + SyncMessage::WatermarkExchangeRequest { + library_id, + device_id: requesting_device_id, + my_shared_watermark: peer_shared_watermark, + my_resource_watermarks: peer_resource_watermarks, + my_peer_resource_counts: _peer_counts, + } => { + let (_our_state_watermark, our_shared_watermark) = + sync_service.peer_sync().get_watermarks().await; - // Get our per-resource watermarks - let our_resource_watermarks = - sd_core::infra::sync::ResourceWatermarkStore::new(self.my_device_id) - .get_our_resource_watermarks(sync_service.peer_sync().peer_log_conn()) - .await - .unwrap_or_default(); + // Get our per-resource watermarks + let our_resource_watermarks = + sd_core::infra::sync::ResourceWatermarkStore::new(self.my_device_id) + .get_our_resource_watermarks(sync_service.peer_sync().peer_log_conn()) + .await + .unwrap_or_default(); - // Compare per-resource watermarks to determine if peer needs catch-up - let mut needs_state_catchup = false; - for (resource_type, our_ts) in &our_resource_watermarks { - match peer_resource_watermarks.get(resource_type) { - Some(peer_ts) if our_ts > peer_ts => { - needs_state_catchup = true; - break; - } - None => { - needs_state_catchup = true; - break; - } - _ => {} + // Get our actual counts + let our_actual_counts = sd_core::service::sync::PeerSync::get_device_owned_counts( + self.my_device_id, + sync_service.peer_sync().db(), + ) + .await + .unwrap_or_default(); + + // Compare per-resource watermarks to determine if peer needs catch-up + let mut needs_state_catchup = false; + for (resource_type, our_ts) in &our_resource_watermarks { + match peer_resource_watermarks.get(resource_type) { + Some(peer_ts) if our_ts > peer_ts => { + needs_state_catchup = true; + break; } + None => { + needs_state_catchup = true; + break; + } + _ => {} } + } - let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p) - || matches!( - (peer_shared_watermark, our_shared_watermark), - (None, Some(_)) - ); + let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p) + || matches!( + (peer_shared_watermark, our_shared_watermark), + (None, Some(_)) + ); - let response = SyncMessage::WatermarkExchangeResponse { - library_id, - device_id: self.my_device_id, - shared_watermark: our_shared_watermark, - needs_state_catchup, - needs_shared_catchup, - resource_watermarks: our_resource_watermarks, - }; + let response = SyncMessage::WatermarkExchangeResponse { + library_id, + device_id: self.my_device_id, + shared_watermark: our_shared_watermark, + needs_state_catchup, + needs_shared_catchup, + resource_watermarks: our_resource_watermarks, + my_actual_resource_counts: our_actual_counts, + }; - self.send_sync_message(sender, response).await?; - } + self.send_sync_message(sender, response).await?; + } SyncMessage::DataAvailableNotification { device_id, resource_types, @@ -264,25 +274,27 @@ impl MockTransport { warn!(error = %e, "Failed to trigger watermark exchange from notification"); } } - SyncMessage::WatermarkExchangeResponse { - library_id: _, - device_id: peer_device_id, - shared_watermark: peer_shared_watermark, - needs_state_catchup, - needs_shared_catchup, - resource_watermarks: peer_resource_watermarks, - } => { - sync_service - .peer_sync() - .on_watermark_exchange_response( - peer_device_id, - peer_shared_watermark, - needs_state_catchup, - needs_shared_catchup, - peer_resource_watermarks, - ) - .await?; - } + SyncMessage::WatermarkExchangeResponse { + library_id: _, + device_id: peer_device_id, + shared_watermark: peer_shared_watermark, + needs_state_catchup, + needs_shared_catchup, + resource_watermarks: peer_resource_watermarks, + my_actual_resource_counts: peer_actual_counts, + } => { + sync_service + .peer_sync() + .on_watermark_exchange_response( + peer_device_id, + peer_shared_watermark, + needs_state_catchup, + needs_shared_catchup, + peer_resource_watermarks, + peer_actual_counts, + ) + .await?; + } SyncMessage::StateRequest { library_id, model_types, diff --git a/workbench b/workbench index 93727d779..678ef4e29 160000 --- a/workbench +++ b/workbench @@ -1 +1 @@ -Subproject commit 93727d779103a317148545964b9ec35a475b4cba +Subproject commit 678ef4e29577a8a91716795d5e83044d6f824839