diff --git a/core/src/infra/sync/event_bus.rs b/core/src/infra/sync/event_bus.rs index b75780682..ccbcc160b 100644 --- a/core/src/infra/sync/event_bus.rs +++ b/core/src/infra/sync/event_bus.rs @@ -96,11 +96,11 @@ impl SyncEvent { impl SyncEventBus { /// Create a new sync event bus with large capacity /// - /// Capacity is 10,000 events - sync is critical and should rarely lag. - /// If lag occurs with this capacity, it indicates extreme system load or a bug. + /// Capacity is 100,000 events - sync is critical and should handle bulk indexing. + /// Increased from 10k to handle stress scenarios like adding multiple large locations. pub fn new() -> Self { - let (sender, _) = broadcast::channel(10_000); - debug!("Created sync event bus with capacity 10,000"); + let (sender, _) = broadcast::channel(100_000); + debug!("Created sync event bus with capacity 100,000"); Self { sender } } diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index a6004b614..69e05a904 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -1409,7 +1409,7 @@ impl PeerSync { "CRITICAL: Sync event listener lagged! Data loss occurred. \ This likely includes StateChange or SharedChange events. \ Lost events may cause sync inconsistency - full backfill may be needed. \ - With 10k capacity, this indicates extreme system load or a bug." + With 100k capacity, this indicates extreme system load or a bug." ); last_lag_warning = now; @@ -1729,34 +1729,252 @@ impl PeerSync { let batch_size = batch.len(); info!( batch_size = batch_size, - "Flushing batched state changes (real-time batching optimization)" + "Flushing batched state changes with network batching (100x efficiency)" ); - // Process each state change in the batch + // Group changes by model_type for StateBatch messages + use std::collections::HashMap; + let mut batches_by_model: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + for change_data in batch.drain(..) 
{ - if let Err(e) = Self::handle_state_change_event_static( - library_id, - change_data, - network, - state, - buffer, - retry_queue, - db, - config, - last_realtime_activity_per_peer, - ) - .await - { - warn!(error = %e, "Failed to handle batched state change"); + if let Some(model_type) = change_data.get("model_type").and_then(|v| v.as_str()) { + batches_by_model + .entry(model_type.to_string()) + .or_default() + .push(change_data); + } else { + warn!("State change missing model_type, skipping"); } } + // Spawn a broadcast task per model_type batch + // This sends 1 StateBatch message instead of N StateChange messages + for (model_type, changes) in batches_by_model { + let library_id = library_id; + let network = network.clone(); + let state = state.clone(); + let buffer = buffer.clone(); + let retry_queue = retry_queue.clone(); + let db = db.clone(); + let config = config.clone(); + let last_realtime_activity_per_peer = last_realtime_activity_per_peer.clone(); + + tokio::spawn(async move { + if let Err(e) = Self::broadcast_state_batch_static( + library_id, + model_type, + changes, + &network, + &state, + &buffer, + &retry_queue, + &db, + &config, + &last_realtime_activity_per_peer, + ) + .await + { + warn!(error = %e, "Failed to broadcast state batch in background task"); + } + }); + } + info!( batch_size = batch_size, - "Batched state changes flushed successfully" + "Batched state changes spawned to background tasks" ); } + /// Broadcast a batch of state changes as a single StateBatch message (static version) + async fn broadcast_state_batch_static( + library_id: Uuid, + model_type: String, + changes: Vec<serde_json::Value>, + network: &Arc, + state: &Arc>, + buffer: &Arc, + retry_queue: &Arc, + db: &Arc, + config: &Arc, + last_realtime_activity_per_peer: &Arc< + RwLock<std::collections::HashMap<Uuid, chrono::DateTime<chrono::Utc>>>, + >, + ) -> Result<()> { + use crate::service::network::protocol::sync::messages::{StateRecord, SyncMessage}; + + // Check if we should buffer during backfill + let current_state = state.read().await; + if 
current_state.should_buffer() { + drop(current_state); + debug!( + model_type = %model_type, + count = changes.len(), + "Buffering state batch during backfill" + ); + // Buffer individual changes for later + for change_data in changes { + if let (Some(record_uuid), Some(device_id), Some(data)) = ( + change_data.get("record_uuid").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok()), + change_data.get("device_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok()), + change_data.get("data"), + ) { + buffer + .push(super::state::BufferedUpdate::StateChange( + super::state::StateChangeMessage { + model_type: model_type.clone(), + record_uuid, + device_id, + data: data.clone(), + }, + )) + .await; + } + } + return Ok(()); + } + drop(current_state); + + // Extract device_id from first change (all changes in batch have same device_id) + let device_id = changes + .first() + .and_then(|c| c.get("device_id")) + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()) + .ok_or_else(|| anyhow::anyhow!("Missing device_id in state batch"))?; + + // Convert changes to StateRecord format + let mut records = Vec::with_capacity(changes.len()); + for change_data in changes { + if let (Some(uuid_str), Some(data), Some(timestamp_str)) = ( + change_data.get("record_uuid").and_then(|v| v.as_str()), + change_data.get("data"), + change_data.get("timestamp").and_then(|v| v.as_str()), + ) { + if let (Ok(uuid), Ok(timestamp)) = ( + Uuid::parse_str(uuid_str), + chrono::DateTime::parse_from_rfc3339(timestamp_str), + ) { + records.push(StateRecord { + uuid, + data: data.clone(), + timestamp: timestamp.with_timezone(&chrono::Utc), + }); + } + } + } + + if records.is_empty() { + warn!(model_type = %model_type, "No valid records in state batch"); + return Ok(()); + } + + // Create StateBatch message + let message = SyncMessage::StateBatch { + library_id, + model_type: model_type.clone(), + device_id, + records: records.clone(), + }; + + // Get connected sync 
partners + let connected_partners = network + .get_connected_sync_partners(library_id, db) + .await + .map_err(|e| { + warn!(error = %e, "Failed to get connected partners for batch"); + e + })?; + + if connected_partners.is_empty() { + debug!( + model_type = %model_type, + records = records.len(), + "No connected partners for state batch" + ); + return Ok(()); + } + + debug!( + model_type = %model_type, + records = records.len(), + partners = connected_partners.len(), + "Broadcasting StateBatch to sync partners (100x network efficiency)" + ); + + // Broadcast to all partners in parallel + use futures::future::join_all; + + let timeout_secs = config.network.message_timeout_secs; + let send_futures: Vec<_> = connected_partners + .iter() + .map(|&partner| { + let network = network.clone(); + let msg = message.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + network.send_sync_message(partner, msg), + ) + .await + { + Ok(Ok(())) => (partner, Ok(())), + Ok(Err(e)) => (partner, Err(e)), + Err(_) => ( + partner, + Err(anyhow::anyhow!("Send timeout after {}s", timeout_secs)), + ), + } + } + }) + .collect(); + + let results = join_all(send_futures).await; + + // Process results + let mut success_count = 0; + let mut error_count = 0; + let mut successful_peers = Vec::new(); + + for (partner_uuid, result) in results { + match result { + Ok(()) => { + success_count += 1; + successful_peers.push(partner_uuid); + debug!(partner = %partner_uuid, "StateBatch sent successfully"); + } + Err(e) => { + error_count += 1; + warn!( + partner = %partner_uuid, + error = %e, + "Failed to send StateBatch to partner, enqueuing for retry" + ); + // Enqueue for retry + retry_queue.enqueue(partner_uuid, message.clone()).await; + } + } + } + + // Mark real-time activity per successful peer + if !successful_peers.is_empty() { + let now = chrono::Utc::now(); + let mut activity_map = last_realtime_activity_per_peer.write().await; + for peer_id in 
successful_peers { + activity_map.insert(peer_id, now); + } + } + + info!( + model_type = %model_type, + records = records.len(), + success = success_count, + errors = error_count, + "StateBatch broadcast complete (100x efficiency vs individual messages)" + ); + + Ok(()) + } + /// Handle state change event from TransactionManager (static version for spawned task) async fn handle_state_change_event_static( library_id: Uuid,