From 08d5e7badf484f520c765e9655ede2bc7a90ab71 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 16 Nov 2025 05:40:30 -0800 Subject: [PATCH] feat: add DataAvailableNotification for proactive data sync notifications - Introduced a new `DataAvailableNotification` message type to notify peers of new data availability, triggering immediate watermark exchanges for synchronization. - Enhanced the `PeerSync` implementation to handle notifications and manage real-time activity tracking per peer, improving synchronization efficiency. - Updated the `SyncProtocolHandler` to process the new notification type and initiate catch-up logic accordingly. - Implemented periodic watermark checks to ensure timely synchronization and prevent missed events during idle periods. --- .../service/network/protocol/sync/handler.rs | 25 ++ .../service/network/protocol/sync/messages.rs | 15 ++ core/src/service/sync/mod.rs | 84 +++---- core/src/service/sync/peer.rs | 217 +++++++++++++++--- core/tests/helpers/sync_transport.rs | 21 +- 5 files changed, 288 insertions(+), 74 deletions(-) diff --git a/core/src/service/network/protocol/sync/handler.rs b/core/src/service/network/protocol/sync/handler.rs index d55649df1..7a716d9c3 100644 --- a/core/src/service/network/protocol/sync/handler.rs +++ b/core/src/service/network/protocol/sync/handler.rs @@ -488,6 +488,31 @@ impl SyncProtocolHandler { )) })?; + Ok(None) + } + + SyncMessage::DataAvailableNotification { + library_id, + device_id, + resource_types, + approx_count, + } => { + info!( + from_device = %from_device, + resources = ?resource_types, + count = approx_count, + "Peer notified us of new data available, triggering watermark exchange" + ); + + // Trigger immediate watermark exchange to discover and sync new data + if let Err(e) = peer_sync.exchange_watermarks_and_catchup(from_device).await { + warn!( + from_device = %from_device, + error = %e, + "Failed to trigger watermark exchange after data available notification" + ); + } + Ok(None) } diff --git a/core/src/service/network/protocol/sync/messages.rs b/core/src/service/network/protocol/sync/messages.rs index acc303b6b..ee6dfee4b 100644 --- a/core/src/service/network/protocol/sync/messages.rs +++ b/core/src/service/network/protocol/sync/messages.rs @@ -119,6 +119,19 @@ pub enum SyncMessage { resource_watermarks: std::collections::HashMap>, }, + /// Proactive notification that sender has new data available + /// + /// Triggers watermark exchange on receiver to discover and sync new data. + /// Sent after bulk operations complete (indexing, bulk tag application, etc.) + DataAvailableNotification { + library_id: Uuid, + device_id: Uuid, // Sender device + /// Hint about which resource types changed (for logging/metrics) + resource_types: Vec, + /// Approximate count of changes (for logging/metrics) + approx_count: u64, + }, + /// Error response Error { library_id: Uuid, message: String }, } @@ -147,6 +160,7 @@ impl SyncMessage { | SyncMessage::Heartbeat { library_id, .. } | SyncMessage::WatermarkExchangeRequest { library_id, .. } | SyncMessage::WatermarkExchangeResponse { library_id, .. } + | SyncMessage::DataAvailableNotification { library_id, .. } | SyncMessage::Error { library_id, .. } => *library_id, } } @@ -171,6 +185,7 @@ impl SyncMessage { | SyncMessage::SharedChange { .. } | SyncMessage::SharedChangeBatch { .. } | SyncMessage::AckSharedChanges { .. } + | SyncMessage::DataAvailableNotification { .. } ) } } diff --git a/core/src/service/sync/mod.rs b/core/src/service/sync/mod.rs index 25adb303c..443572bd3 100644 --- a/core/src/service/sync/mod.rs +++ b/core/src/service/sync/mod.rs @@ -347,51 +347,55 @@ impl SyncService { } }; - // Check if real-time sync is active (lock mechanism) - // If real-time broadcasts are happening, skip catch-up to prevent duplication - let realtime_active = peer_sync.is_realtime_active().await; + // Pick first partner for catch-up check + let catch_up_peer = partners[0]; - // Trigger catch-up if: - // - Real-time is NOT active (60+ seconds since last broadcast), AND - // - We haven't synced recently (fallback time check) - let should_catch_up = if realtime_active { - debug!("Skipping catch-up - real-time sync is active (lock mechanism)"); - false - } else if let Some(last_sync) = our_device.last_sync_at { - let time_since_sync = chrono::Utc::now().signed_duration_since(last_sync); - time_since_sync.num_seconds() > 60 - } else { - true - }; + // Check if real-time sync is active for this specific peer + // Per-peer tracking prevents one stuck peer from blocking all catch-up + let realtime_active = peer_sync.is_realtime_active_for_peer(catch_up_peer).await; - // Check if we should retry based on exponential backoff - if should_catch_up && retry_state.should_retry() { - // Check if we should escalate to full backfill after repeated failures - if retry_state.should_escalate() { - warn!( - failures = retry_state.consecutive_failures, - "Too many catch-up failures, escalating to full backfill" - ); - retry_state.record_success(); // Reset retry state + // Trigger catch-up if: + // - Real-time is NOT active (30+ seconds since last successful broadcast to this peer), AND + // - We haven't synced recently (fallback time check) + let should_catch_up = if realtime_active { + debug!( + peer = %catch_up_peer, + "Skipping catch-up - real-time sync to this peer is active" + ); + false + } else if let Some(last_sync) = our_device.last_sync_at { + let time_since_sync = chrono::Utc::now().signed_duration_since(last_sync); + time_since_sync.num_seconds() > 60 + } else { + true + }; - // Transition to Uninitialized to trigger full backfill - let mut state = peer_sync.state.write().await; - *state = DeviceSyncState::Uninitialized; - backfill_attempted = false; // Allow backfill to run again - continue; // Skip to next iteration - } - - // Get current watermarks from sync.db - let (state_watermark, shared_watermark) = peer_sync.get_watermarks().await; - - info!( - "Triggering incremental catch-up since watermarks: state={:?}, shared={:?}", - state_watermark, - shared_watermark + // Check if we should retry based on exponential backoff + if should_catch_up && retry_state.should_retry() { + // Check if we should escalate to full backfill after repeated failures + if retry_state.should_escalate() { + warn!( + failures = retry_state.consecutive_failures, + "Too many catch-up failures, escalating to full backfill" ); + retry_state.record_success(); // Reset retry state - // Pick first partner for catch-up - let catch_up_peer = partners[0]; + // Transition to Uninitialized to trigger full backfill + let mut state = peer_sync.state.write().await; + *state = DeviceSyncState::Uninitialized; + backfill_attempted = false; // Allow backfill to run again + continue; // Skip to next iteration + } + + // Get current watermarks from sync.db + let (state_watermark, shared_watermark) = peer_sync.get_watermarks().await; + + info!( + peer = %catch_up_peer, + "Triggering incremental catch-up since watermarks: state={:?}, shared={:?}", + state_watermark, + shared_watermark + ); // Transition to CatchingUp state { diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index dacb1043b..b81abcc11 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -126,9 +126,12 @@ pub struct PeerSync { /// Buffer for updates during backfill/catch-up buffer: Arc, - /// Last real-time broadcast activity (for catch-up lock mechanism) - /// When real-time broadcasts are active, catch-up is skipped to prevent duplication - last_realtime_activity: Arc>>>, + /// Last successful real-time broadcast per peer (for catch-up lock mechanism) + /// When real-time broadcasts to a specific peer are active, catch-up for that peer + /// is skipped to prevent duplication. Per-peer tracking prevents one stuck peer + /// from blocking recovery for all peers. + last_realtime_activity_per_peer: + Arc>>>, /// HLC generator for this device hlc_generator: Arc>, @@ -199,7 +202,9 @@ impl PeerSync { network, state: Arc::new(RwLock::new(DeviceSyncState::Uninitialized)), buffer: Arc::new(BufferQueue::new()), - last_realtime_activity: Arc::new(RwLock::new(None)), + last_realtime_activity_per_peer: Arc::new( + RwLock::new(std::collections::HashMap::new()), + ), hlc_generator: Arc::new(tokio::sync::Mutex::new(HLCGenerator::new(device_id))), peer_log, watermark_store, @@ -256,22 +261,31 @@ impl PeerSync { self.peer_log.conn() } - /// Check if real-time sync is currently active (lock mechanism) + /// Check if real-time sync is currently active for a specific peer /// - /// Returns true if real-time broadcasts happened in the last 60 seconds. + /// Returns true if real-time broadcasts to this peer succeeded in the last 30 seconds. /// Used to prevent catch-up from overlapping with active real-time sync. - pub async fn is_realtime_active(&self) -> bool { - if let Some(last_activity) = *self.last_realtime_activity.read().await { - let elapsed = chrono::Utc::now().signed_duration_since(last_activity); - elapsed.num_seconds() < 60 + /// Per-peer tracking prevents one stuck peer from blocking recovery for others. + pub async fn is_realtime_active_for_peer(&self, peer_id: Uuid) -> bool { + if let Some(last_activity) = self + .last_realtime_activity_per_peer + .read() + .await + .get(&peer_id) + { + let elapsed = chrono::Utc::now().signed_duration_since(*last_activity); + elapsed.num_seconds() < 30 // Reduced from 60s to 30s for faster recovery } else { false } } - /// Update real-time activity timestamp (called after successful broadcast) - async fn mark_realtime_activity(&self) { - *self.last_realtime_activity.write().await = Some(chrono::Utc::now()); + /// Update real-time activity timestamp for a specific peer (called after successful broadcast) + async fn mark_realtime_activity_for_peer(&self, peer_id: Uuid) { + self.last_realtime_activity_per_peer + .write() + .await + .insert(peer_id, chrono::Utc::now()); } /// Get per-resource watermark for a specific peer and resource type @@ -441,6 +455,57 @@ impl PeerSync { Ok(()) } + /// Notify all connected peers that we have new data available + /// + /// This proactively triggers watermark exchange on peers after bulk operations. + /// Prevents the "20-minute idle" bug where events die and peers don't notice. + pub async fn notify_peers_of_new_data( + &self, + resource_types: Vec, + approx_count: u64, + ) -> Result<()> { + let connected_partners = self + .network + .get_connected_sync_partners(self.library_id, &self.db) + .await?; + + if connected_partners.is_empty() { + debug!("No connected partners, skipping data available notification"); + return Ok(()); + } + + info!( + resources = ?resource_types, + count = approx_count, + peer_count = connected_partners.len(), + "Notifying peers of new data available" + ); + + let notification = SyncMessage::DataAvailableNotification { + library_id: self.library_id, + device_id: self.device_id, + resource_types, + approx_count, + }; + + // Broadcast to all connected peers (fire and forget - they'll request via watermark exchange) + for peer_id in connected_partners { + if let Err(e) = self + .network + .send_sync_message(peer_id, notification.clone()) + .await + { + warn!( + peer = %peer_id, + error = %e, + "Failed to send data available notification" + ); + } + } + + Ok(()) + } + /// Exchange watermarks with a peer and trigger catch-up if needed /// /// Sends a WatermarkExchangeRequest to the peer with our watermarks. @@ -726,6 +791,9 @@ impl PeerSync { // Start background task for periodic log pruning self.start_log_pruner(); + // Start periodic watermark check (safety net for missed events) + self.start_periodic_watermark_check(); + Ok(()) } @@ -811,6 +879,69 @@ impl PeerSync { }); } + /// Start periodic watermark check (safety net for missed events) + /// + /// Exchanges watermarks with all connected peers every 5 minutes to ensure + /// sync divergence is detected even if events are dropped or broadcasts fail. + fn start_periodic_watermark_check(&self) { + let library_id = self.library_id; + let device_id = self.device_id; + let network = self.network.clone(); + let db = self.db.clone(); + let peer_log = self.peer_log.clone(); + let is_running = self.is_running.clone(); + + tokio::spawn(async move { + info!("Started periodic watermark check (every 5 minutes)"); + + // Wait 5 minutes before first check (allow initial sync to complete) + tokio::time::sleep(tokio::time::Duration::from_secs(300)).await; + + while is_running.load(Ordering::SeqCst) { + // Check every 5 minutes + tokio::time::sleep(tokio::time::Duration::from_secs(300)).await; + + // Get connected sync partners + match network.get_connected_sync_partners(library_id, &db).await { + Ok(partners) if !partners.is_empty() => { + debug!( + partner_count = partners.len(), + "Running periodic watermark check" + ); + + // Exchange watermarks with all peers + for peer_id in partners { + if let Err(e) = Self::trigger_watermark_exchange( + library_id, device_id, peer_id, &peer_log, &network, + ) + .await + { + debug!( + peer = %peer_id, + error = %e, + "Periodic watermark check failed for peer" + ); + } else { + debug!( + peer = %peer_id, + "Periodic watermark exchange triggered" + ); + } + } + } + Ok(_) => { + debug!("No connected partners for periodic watermark check"); + } + Err(e) => { + warn!(error = %e, "Failed to get connected partners for periodic watermark check"); + } + } + } + + info!("Periodic watermark check stopped"); + }); + } + /// Start event listener for TransactionManager sync events fn start_event_listener(&self) { // Clone necessary fields for the spawned task @@ -822,7 +953,7 @@ impl PeerSync { ); let state = self.state.clone(); let buffer = self.buffer.clone(); - let last_realtime_activity = self.last_realtime_activity.clone(); + let last_realtime_activity_per_peer = self.last_realtime_activity_per_peer.clone(); let db = self.db.clone(); let event_bus_for_emit = self.event_bus.clone(); let retry_queue = self.retry_queue.clone(); @@ -891,7 +1022,7 @@ impl PeerSync { &retry_queue, &db, &config, - &last_realtime_activity, + &last_realtime_activity_per_peer, ).await; } } @@ -977,7 +1108,7 @@ impl PeerSync { &retry_queue, &db, &config, - &last_realtime_activity, + &last_realtime_activity_per_peer, ).await; } } @@ -999,7 +1130,7 @@ impl PeerSync { &retry_queue, &db, &config, - &last_realtime_activity, + &last_realtime_activity_per_peer, ) .await; } @@ -1023,6 +1154,7 @@ impl PeerSync { let library_id = self.library_id; let device_id = self.device_id; let network = self.network.clone(); + let peer_log = self.peer_log.clone(); // CRITICAL FIX: Pass peer_log to spawned task tokio::spawn(async move { info!("PeerSync network event listener started"); @@ -1053,13 +1185,24 @@ impl PeerSync { ); } - // Trigger watermark exchange for reconnection sync - // Note: We need peer_log reference here, which is not available in static context - // This will be triggered via PeerSync::exchange_watermarks_and_catchup() instead - debug!( - peer_id = %peer_id, - "Skipping watermark exchange from network listener (will be handled by PeerSync)" - ); + // CRITICAL FIX: Actually trigger watermark exchange on reconnection + // This fixes the 20-minute idle bug where events die but no recovery happens + if let Err(e) = Self::trigger_watermark_exchange( + library_id, device_id, peer_id, &peer_log, &network, + ) + .await + { + warn!( + peer_id = %peer_id, + error = %e, + "Failed to trigger watermark exchange on reconnection" + ); + } else { + info!( + peer_id = %peer_id, + "Watermark exchange triggered on reconnection" + ); + } } NetworkEvent::ConnectionLost { device_id: peer_id, @@ -1229,7 +1372,9 @@ impl PeerSync { retry_queue: &Arc, db: &Arc, config: &Arc, - last_realtime_activity: &Arc>>>, + last_realtime_activity_per_peer: &Arc< + RwLock>>, + >, ) { if batch.is_empty() { return; @@ -1252,7 +1397,7 @@ impl PeerSync { retry_queue, db, config, - last_realtime_activity, + last_realtime_activity_per_peer, ) .await { @@ -1276,7 +1421,9 @@ impl PeerSync { retry_queue: &Arc, db: &Arc, config: &Arc, - last_realtime_activity: &Arc>>>, + last_realtime_activity_per_peer: &Arc< + RwLock>>, + >, ) -> Result<()> { let model_type: String = data .get("model_type") @@ -1416,14 +1563,16 @@ impl PeerSync { let results = join_all(send_futures).await; - // Process results + // Process results and mark per-peer activity let mut success_count = 0; let mut error_count = 0; + let mut successful_peers = Vec::new(); for (partner_uuid, result) in results { match result { Ok(()) => { success_count += 1; + successful_peers.push(partner_uuid); debug!(partner = %partner_uuid, "State change sent successfully"); } Err(e) => { @@ -1439,9 +1588,13 @@ impl PeerSync { } } - // Mark real-time activity if any broadcasts succeeded (lock mechanism) - if success_count > 0 { - *last_realtime_activity.write().await = Some(chrono::Utc::now()); + // Mark real-time activity per successful peer (prevents one stuck peer from blocking all catch-up) + if !successful_peers.is_empty() { + let now = chrono::Utc::now(); + let mut activity_map = last_realtime_activity_per_peer.write().await; + for peer_id in successful_peers { + activity_map.insert(peer_id, now); + } } info!( @@ -2724,7 +2877,7 @@ impl PeerSync { &self.retry_queue, &self.db, &self.config, - &self.last_realtime_activity, + &self.last_realtime_activity_per_peer, ) .await { diff --git a/core/tests/helpers/sync_transport.rs b/core/tests/helpers/sync_transport.rs index 35d53e975..a51c646bb 100644 --- a/core/tests/helpers/sync_transport.rs +++ b/core/tests/helpers/sync_transport.rs @@ -245,9 +245,26 @@ impl MockTransport { resource_watermarks: our_resource_watermarks, }; - self.send_sync_message(sender, response).await?; + self.send_sync_message(sender, response).await?; + } + SyncMessage::DataAvailableNotification { + device_id, + resource_types, + approx_count, + .. + } => { + info!( + from_device = %device_id, + resources = ?resource_types, + count = approx_count, + "Received data available notification in mock transport" + ); + // Trigger watermark exchange + if let Err(e) = sync_service.peer_sync().exchange_watermarks_and_catchup(device_id).await { + warn!(error = %e, "Failed to trigger watermark exchange from notification"); } - SyncMessage::WatermarkExchangeResponse { + } + SyncMessage::WatermarkExchangeResponse { library_id: _, device_id: peer_device_id, shared_watermark: peer_shared_watermark,