feat: add DataAvailableNotification for proactive data sync notifications

- Introduced a new `DataAvailableNotification` message type to notify peers of new data availability, triggering immediate watermark exchanges for synchronization.
- Enhanced the `PeerSync` implementation to handle notifications and manage real-time activity tracking per peer, improving synchronization efficiency.
- Updated the `SyncProtocolHandler` to process the new notification type and initiate catch-up logic accordingly.
- Implemented periodic watermark checks to ensure timely synchronization and prevent missed events during idle periods.
This commit is contained in:
Jamie Pine
2025-11-16 05:40:30 -08:00
parent a33ca1e884
commit 08d5e7badf
5 changed files with 288 additions and 74 deletions

View File

@@ -488,6 +488,31 @@ impl SyncProtocolHandler {
))
})?;
Ok(None)
}
SyncMessage::DataAvailableNotification {
library_id,
device_id,
resource_types,
approx_count,
} => {
info!(
from_device = %from_device,
resources = ?resource_types,
count = approx_count,
"Peer notified us of new data available, triggering watermark exchange"
);
// Trigger immediate watermark exchange to discover and sync new data
if let Err(e) = peer_sync.exchange_watermarks_and_catchup(from_device).await {
warn!(
from_device = %from_device,
error = %e,
"Failed to trigger watermark exchange after data available notification"
);
}
Ok(None)
}

View File

@@ -119,6 +119,19 @@ pub enum SyncMessage {
resource_watermarks: std::collections::HashMap<String, DateTime<Utc>>,
},
/// Proactive notification that sender has new data available
///
/// Triggers watermark exchange on receiver to discover and sync new data.
/// Sent after bulk operations complete (indexing, bulk tag application, etc.)
DataAvailableNotification {
library_id: Uuid,
device_id: Uuid, // Sender device
/// Hint about which resource types changed (for logging/metrics)
resource_types: Vec<String>,
/// Approximate count of changes (for logging/metrics)
approx_count: u64,
},
/// Error response
Error { library_id: Uuid, message: String },
}
@@ -147,6 +160,7 @@ impl SyncMessage {
| SyncMessage::Heartbeat { library_id, .. }
| SyncMessage::WatermarkExchangeRequest { library_id, .. }
| SyncMessage::WatermarkExchangeResponse { library_id, .. }
| SyncMessage::DataAvailableNotification { library_id, .. }
| SyncMessage::Error { library_id, .. } => *library_id,
}
}
@@ -171,6 +185,7 @@ impl SyncMessage {
| SyncMessage::SharedChange { .. }
| SyncMessage::SharedChangeBatch { .. }
| SyncMessage::AckSharedChanges { .. }
| SyncMessage::DataAvailableNotification { .. }
)
}
}

View File

@@ -347,51 +347,55 @@ impl SyncService {
}
};
// Check if real-time sync is active (lock mechanism)
// If real-time broadcasts are happening, skip catch-up to prevent duplication
let realtime_active = peer_sync.is_realtime_active().await;
// Pick first partner for catch-up check
let catch_up_peer = partners[0];
// Trigger catch-up if:
// - Real-time is NOT active (60+ seconds since last broadcast), AND
// - We haven't synced recently (fallback time check)
let should_catch_up = if realtime_active {
debug!("Skipping catch-up - real-time sync is active (lock mechanism)");
false
} else if let Some(last_sync) = our_device.last_sync_at {
let time_since_sync = chrono::Utc::now().signed_duration_since(last_sync);
time_since_sync.num_seconds() > 60
} else {
true
};
// Check if real-time sync is active for this specific peer
// Per-peer tracking prevents one stuck peer from blocking all catch-up
let realtime_active = peer_sync.is_realtime_active_for_peer(catch_up_peer).await;
// Check if we should retry based on exponential backoff
if should_catch_up && retry_state.should_retry() {
// Check if we should escalate to full backfill after repeated failures
if retry_state.should_escalate() {
warn!(
failures = retry_state.consecutive_failures,
"Too many catch-up failures, escalating to full backfill"
);
retry_state.record_success(); // Reset retry state
// Trigger catch-up if:
// - Real-time is NOT active (30+ seconds since last successful broadcast to this peer), AND
// - We haven't synced recently (fallback time check)
let should_catch_up = if realtime_active {
debug!(
peer = %catch_up_peer,
"Skipping catch-up - real-time sync to this peer is active"
);
false
} else if let Some(last_sync) = our_device.last_sync_at {
let time_since_sync = chrono::Utc::now().signed_duration_since(last_sync);
time_since_sync.num_seconds() > 60
} else {
true
};
// Transition to Uninitialized to trigger full backfill
let mut state = peer_sync.state.write().await;
*state = DeviceSyncState::Uninitialized;
backfill_attempted = false; // Allow backfill to run again
continue; // Skip to next iteration
}
// Get current watermarks from sync.db
let (state_watermark, shared_watermark) = peer_sync.get_watermarks().await;
info!(
"Triggering incremental catch-up since watermarks: state={:?}, shared={:?}",
state_watermark,
shared_watermark
// Check if we should retry based on exponential backoff
if should_catch_up && retry_state.should_retry() {
// Check if we should escalate to full backfill after repeated failures
if retry_state.should_escalate() {
warn!(
failures = retry_state.consecutive_failures,
"Too many catch-up failures, escalating to full backfill"
);
retry_state.record_success(); // Reset retry state
// Pick first partner for catch-up
let catch_up_peer = partners[0];
// Transition to Uninitialized to trigger full backfill
let mut state = peer_sync.state.write().await;
*state = DeviceSyncState::Uninitialized;
backfill_attempted = false; // Allow backfill to run again
continue; // Skip to next iteration
}
// Get current watermarks from sync.db
let (state_watermark, shared_watermark) = peer_sync.get_watermarks().await;
info!(
peer = %catch_up_peer,
"Triggering incremental catch-up since watermarks: state={:?}, shared={:?}",
state_watermark,
shared_watermark
);
// Transition to CatchingUp state
{

View File

@@ -126,9 +126,12 @@ pub struct PeerSync {
/// Buffer for updates during backfill/catch-up
buffer: Arc<BufferQueue>,
/// Last real-time broadcast activity (for catch-up lock mechanism)
/// When real-time broadcasts are active, catch-up is skipped to prevent duplication
last_realtime_activity: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
/// Last successful real-time broadcast per peer (for catch-up lock mechanism)
/// When real-time broadcasts to a specific peer are active, catch-up for that peer
/// is skipped to prevent duplication. Per-peer tracking prevents one stuck peer
/// from blocking recovery for all peers.
last_realtime_activity_per_peer:
Arc<RwLock<std::collections::HashMap<Uuid, chrono::DateTime<chrono::Utc>>>>,
/// HLC generator for this device
hlc_generator: Arc<tokio::sync::Mutex<HLCGenerator>>,
@@ -199,7 +202,9 @@ impl PeerSync {
network,
state: Arc::new(RwLock::new(DeviceSyncState::Uninitialized)),
buffer: Arc::new(BufferQueue::new()),
last_realtime_activity: Arc::new(RwLock::new(None)),
last_realtime_activity_per_peer: Arc::new(
RwLock::new(std::collections::HashMap::new()),
),
hlc_generator: Arc::new(tokio::sync::Mutex::new(HLCGenerator::new(device_id))),
peer_log,
watermark_store,
@@ -256,22 +261,31 @@ impl PeerSync {
self.peer_log.conn()
}
/// Check if real-time sync is currently active (lock mechanism)
/// Check if real-time sync is currently active for a specific peer
///
/// Returns true if real-time broadcasts happened in the last 60 seconds.
/// Returns true if real-time broadcasts to this peer succeeded in the last 30 seconds.
/// Used to prevent catch-up from overlapping with active real-time sync.
pub async fn is_realtime_active(&self) -> bool {
if let Some(last_activity) = *self.last_realtime_activity.read().await {
let elapsed = chrono::Utc::now().signed_duration_since(last_activity);
elapsed.num_seconds() < 60
/// Per-peer tracking prevents one stuck peer from blocking recovery for others.
pub async fn is_realtime_active_for_peer(&self, peer_id: Uuid) -> bool {
if let Some(last_activity) = self
.last_realtime_activity_per_peer
.read()
.await
.get(&peer_id)
{
let elapsed = chrono::Utc::now().signed_duration_since(*last_activity);
elapsed.num_seconds() < 30 // Reduced from 60s to 30s for faster recovery
} else {
false
}
}
/// Update real-time activity timestamp (called after successful broadcast)
async fn mark_realtime_activity(&self) {
*self.last_realtime_activity.write().await = Some(chrono::Utc::now());
/// Update real-time activity timestamp for a specific peer (called after successful broadcast)
async fn mark_realtime_activity_for_peer(&self, peer_id: Uuid) {
self.last_realtime_activity_per_peer
.write()
.await
.insert(peer_id, chrono::Utc::now());
}
/// Get per-resource watermark for a specific peer and resource type
@@ -441,6 +455,57 @@ impl PeerSync {
Ok(())
}
/// Notify all connected peers that we have new data available
///
/// This proactively triggers watermark exchange on peers after bulk operations.
/// Prevents the "20-minute idle" bug where events die and peers don't notice.
pub async fn notify_peers_of_new_data(
&self,
resource_types: Vec<String>,
approx_count: u64,
) -> Result<()> {
let connected_partners = self
.network
.get_connected_sync_partners(self.library_id, &self.db)
.await?;
if connected_partners.is_empty() {
debug!("No connected partners, skipping data available notification");
return Ok(());
}
info!(
resources = ?resource_types,
count = approx_count,
peer_count = connected_partners.len(),
"Notifying peers of new data available"
);
let notification = SyncMessage::DataAvailableNotification {
library_id: self.library_id,
device_id: self.device_id,
resource_types,
approx_count,
};
// Broadcast to all connected peers (fire and forget - they'll request via watermark exchange)
for peer_id in connected_partners {
if let Err(e) = self
.network
.send_sync_message(peer_id, notification.clone())
.await
{
warn!(
peer = %peer_id,
error = %e,
"Failed to send data available notification"
);
}
}
Ok(())
}
/// Exchange watermarks with a peer and trigger catch-up if needed
///
/// Sends a WatermarkExchangeRequest to the peer with our watermarks.
@@ -726,6 +791,9 @@ impl PeerSync {
// Start background task for periodic log pruning
self.start_log_pruner();
// Start periodic watermark check (safety net for missed events)
self.start_periodic_watermark_check();
Ok(())
}
@@ -811,6 +879,69 @@ impl PeerSync {
});
}
/// Start periodic watermark check (safety net for missed events)
///
/// Exchanges watermarks with all connected peers every 5 minutes to ensure
/// sync divergence is detected even if events are dropped or broadcasts fail.
fn start_periodic_watermark_check(&self) {
let library_id = self.library_id;
let device_id = self.device_id;
let network = self.network.clone();
let db = self.db.clone();
let peer_log = self.peer_log.clone();
let is_running = self.is_running.clone();
tokio::spawn(async move {
info!("Started periodic watermark check (every 5 minutes)");
// Wait 5 minutes before first check (allow initial sync to complete)
tokio::time::sleep(tokio::time::Duration::from_secs(300)).await;
while is_running.load(Ordering::SeqCst) {
// Check every 5 minutes
tokio::time::sleep(tokio::time::Duration::from_secs(300)).await;
// Get connected sync partners
match network.get_connected_sync_partners(library_id, &db).await {
Ok(partners) if !partners.is_empty() => {
debug!(
partner_count = partners.len(),
"Running periodic watermark check"
);
// Exchange watermarks with all peers
for peer_id in partners {
if let Err(e) = Self::trigger_watermark_exchange(
library_id, device_id, peer_id, &peer_log, &network,
)
.await
{
debug!(
peer = %peer_id,
error = %e,
"Periodic watermark check failed for peer"
);
} else {
debug!(
peer = %peer_id,
"Periodic watermark exchange triggered"
);
}
}
}
Ok(_) => {
debug!("No connected partners for periodic watermark check");
}
Err(e) => {
warn!(error = %e, "Failed to get connected partners for periodic watermark check");
}
}
}
info!("Periodic watermark check stopped");
});
}
/// Start event listener for TransactionManager sync events
fn start_event_listener(&self) {
// Clone necessary fields for the spawned task
@@ -822,7 +953,7 @@ impl PeerSync {
);
let state = self.state.clone();
let buffer = self.buffer.clone();
let last_realtime_activity = self.last_realtime_activity.clone();
let last_realtime_activity_per_peer = self.last_realtime_activity_per_peer.clone();
let db = self.db.clone();
let event_bus_for_emit = self.event_bus.clone();
let retry_queue = self.retry_queue.clone();
@@ -891,7 +1022,7 @@ impl PeerSync {
&retry_queue,
&db,
&config,
&last_realtime_activity,
&last_realtime_activity_per_peer,
).await;
}
}
@@ -977,7 +1108,7 @@ impl PeerSync {
&retry_queue,
&db,
&config,
&last_realtime_activity,
&last_realtime_activity_per_peer,
).await;
}
}
@@ -999,7 +1130,7 @@ impl PeerSync {
&retry_queue,
&db,
&config,
&last_realtime_activity,
&last_realtime_activity_per_peer,
)
.await;
}
@@ -1023,6 +1154,7 @@ impl PeerSync {
let library_id = self.library_id;
let device_id = self.device_id;
let network = self.network.clone();
let peer_log = self.peer_log.clone(); // CRITICAL FIX: Pass peer_log to spawned task
tokio::spawn(async move {
info!("PeerSync network event listener started");
@@ -1053,13 +1185,24 @@ impl PeerSync {
);
}
// Trigger watermark exchange for reconnection sync
// Note: We need peer_log reference here, which is not available in static context
// This will be triggered via PeerSync::exchange_watermarks_and_catchup() instead
debug!(
peer_id = %peer_id,
"Skipping watermark exchange from network listener (will be handled by PeerSync)"
);
// CRITICAL FIX: Actually trigger watermark exchange on reconnection
// This fixes the 20-minute idle bug where events die but no recovery happens
if let Err(e) = Self::trigger_watermark_exchange(
library_id, device_id, peer_id, &peer_log, &network,
)
.await
{
warn!(
peer_id = %peer_id,
error = %e,
"Failed to trigger watermark exchange on reconnection"
);
} else {
info!(
peer_id = %peer_id,
"Watermark exchange triggered on reconnection"
);
}
}
NetworkEvent::ConnectionLost {
device_id: peer_id,
@@ -1229,7 +1372,9 @@ impl PeerSync {
retry_queue: &Arc<RetryQueue>,
db: &Arc<sea_orm::DatabaseConnection>,
config: &Arc<crate::infra::sync::SyncConfig>,
last_realtime_activity: &Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
last_realtime_activity_per_peer: &Arc<
RwLock<std::collections::HashMap<Uuid, chrono::DateTime<chrono::Utc>>>,
>,
) {
if batch.is_empty() {
return;
@@ -1252,7 +1397,7 @@ impl PeerSync {
retry_queue,
db,
config,
last_realtime_activity,
last_realtime_activity_per_peer,
)
.await
{
@@ -1276,7 +1421,9 @@ impl PeerSync {
retry_queue: &Arc<RetryQueue>,
db: &Arc<sea_orm::DatabaseConnection>,
config: &Arc<crate::infra::sync::SyncConfig>,
last_realtime_activity: &Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
last_realtime_activity_per_peer: &Arc<
RwLock<std::collections::HashMap<Uuid, chrono::DateTime<chrono::Utc>>>,
>,
) -> Result<()> {
let model_type: String = data
.get("model_type")
@@ -1416,14 +1563,16 @@ impl PeerSync {
let results = join_all(send_futures).await;
// Process results
// Process results and mark per-peer activity
let mut success_count = 0;
let mut error_count = 0;
let mut successful_peers = Vec::new();
for (partner_uuid, result) in results {
match result {
Ok(()) => {
success_count += 1;
successful_peers.push(partner_uuid);
debug!(partner = %partner_uuid, "State change sent successfully");
}
Err(e) => {
@@ -1439,9 +1588,13 @@ impl PeerSync {
}
}
// Mark real-time activity if any broadcasts succeeded (lock mechanism)
if success_count > 0 {
*last_realtime_activity.write().await = Some(chrono::Utc::now());
// Mark real-time activity per successful peer (prevents one stuck peer from blocking all catch-up)
if !successful_peers.is_empty() {
let now = chrono::Utc::now();
let mut activity_map = last_realtime_activity_per_peer.write().await;
for peer_id in successful_peers {
activity_map.insert(peer_id, now);
}
}
info!(
@@ -2724,7 +2877,7 @@ impl PeerSync {
&self.retry_queue,
&self.db,
&self.config,
&self.last_realtime_activity,
&self.last_realtime_activity_per_peer,
)
.await
{

View File

@@ -245,9 +245,26 @@ impl MockTransport {
resource_watermarks: our_resource_watermarks,
};
self.send_sync_message(sender, response).await?;
self.send_sync_message(sender, response).await?;
}
SyncMessage::DataAvailableNotification {
device_id,
resource_types,
approx_count,
..
} => {
info!(
from_device = %device_id,
resources = ?resource_types,
count = approx_count,
"Received data available notification in mock transport"
);
// Trigger watermark exchange
if let Err(e) = sync_service.peer_sync().exchange_watermarks_and_catchup(device_id).await {
warn!(error = %e, "Failed to trigger watermark exchange from notification");
}
SyncMessage::WatermarkExchangeResponse {
}
SyncMessage::WatermarkExchangeResponse {
library_id: _,
device_id: peer_device_id,
shared_watermark: peer_shared_watermark,