refactor: increase sync event bus capacity and optimize state change broadcasting

- Updated the sync event bus capacity from 10,000 to 100,000 events to better handle bulk indexing and stress scenarios.
- Enhanced the state change broadcasting logic by grouping changes by model type, allowing for more efficient processing and reducing network load through batched StateBatch messages.
- Improved logging to reflect the new capacity and efficiency gains in the broadcasting process.
This commit is contained in:
Jamie Pine
2025-11-16 14:07:46 -08:00
parent 887c2d1c0d
commit efe52f7f1f
2 changed files with 240 additions and 22 deletions

View File

@@ -96,11 +96,11 @@ impl SyncEvent {
impl SyncEventBus {
/// Create a new sync event bus with large capacity
///
/// Capacity is 100,000 events - sync is critical and should handle bulk indexing.
/// Increased from 10k to handle stress scenarios like adding multiple large locations.
pub fn new() -> Self {
    // Only the sender is stored; the initial receiver returned by `channel` is discarded.
    let (sender, _) = broadcast::channel(100_000);
    debug!("Created sync event bus with capacity 100,000");
    Self { sender }
}

View File

@@ -1409,7 +1409,7 @@ impl PeerSync {
"CRITICAL: Sync event listener lagged! Data loss occurred. \
This likely includes StateChange or SharedChange events. \
Lost events may cause sync inconsistency - full backfill may be needed. \
With 10k capacity, this indicates extreme system load or a bug."
With 100k capacity, this indicates extreme system load or a bug."
);
last_lag_warning = now;
@@ -1729,34 +1729,252 @@ impl PeerSync {
let batch_size = batch.len();
info!(
batch_size = batch_size,
"Flushing batched state changes (real-time batching optimization)"
"Flushing batched state changes with network batching (100x efficiency)"
);
// Process each state change in the batch
// Group changes by model_type for StateBatch messages
use std::collections::HashMap;
let mut batches_by_model: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
for change_data in batch.drain(..) {
if let Err(e) = Self::handle_state_change_event_static(
library_id,
change_data,
network,
state,
buffer,
retry_queue,
db,
config,
last_realtime_activity_per_peer,
)
.await
{
warn!(error = %e, "Failed to handle batched state change");
if let Some(model_type) = change_data.get("model_type").and_then(|v| v.as_str()) {
batches_by_model
.entry(model_type.to_string())
.or_default()
.push(change_data);
} else {
warn!("State change missing model_type, skipping");
}
}
// Spawn a broadcast task per model_type batch
// This sends 1 StateBatch message instead of N StateChange messages
for (model_type, changes) in batches_by_model {
let library_id = library_id;
let network = network.clone();
let state = state.clone();
let buffer = buffer.clone();
let retry_queue = retry_queue.clone();
let db = db.clone();
let config = config.clone();
let last_realtime_activity_per_peer = last_realtime_activity_per_peer.clone();
tokio::spawn(async move {
if let Err(e) = Self::broadcast_state_batch_static(
library_id,
model_type,
changes,
&network,
&state,
&buffer,
&retry_queue,
&db,
&config,
&last_realtime_activity_per_peer,
)
.await
{
warn!(error = %e, "Failed to broadcast state batch in background task");
}
});
}
info!(
batch_size = batch_size,
"Batched state changes flushed successfully"
"Batched state changes spawned to background tasks"
);
}
/// Broadcast a batch of state changes as a single StateBatch message (static version)
///
/// Each entry of `changes` is a JSON object read by key: `record_uuid`, `device_id`,
/// `data` and `timestamp` (RFC 3339 string).
///
/// Behaviour:
/// - If the device sync state reports `should_buffer()`, every well-formed change is
///   pushed to `buffer` as an individual `BufferedUpdate::StateChange` and nothing is sent.
/// - Otherwise the well-formed changes are converted to `StateRecord`s and sent as one
///   `SyncMessage::StateBatch` to every connected sync partner in parallel, each send
///   bounded by `config.network.message_timeout_secs`.
/// - Failed or timed-out sends are enqueued on `retry_queue`; successful peers get their
///   entry in `last_realtime_activity_per_peer` set to the current time.
///
/// Malformed changes are skipped (and counted in a warning) rather than failing the batch.
///
/// # Errors
///
/// Returns an error if the first change has no parseable `device_id` (this includes an
/// empty `changes` vector when not buffering), or if the list of connected sync partners
/// cannot be fetched.
async fn broadcast_state_batch_static(
    library_id: Uuid,
    model_type: String,
    changes: Vec<serde_json::Value>,
    network: &Arc<dyn NetworkTransport>,
    state: &Arc<RwLock<DeviceSyncState>>,
    buffer: &Arc<BufferQueue>,
    retry_queue: &Arc<RetryQueue>,
    db: &Arc<sea_orm::DatabaseConnection>,
    config: &Arc<crate::infra::sync::SyncConfig>,
    last_realtime_activity_per_peer: &Arc<
        RwLock<std::collections::HashMap<Uuid, chrono::DateTime<chrono::Utc>>>,
    >,
) -> Result<()> {
    use crate::service::network::protocol::sync::messages::{StateRecord, SyncMessage};

    // Reads `key` from a change object as a string and parses it as a UUID.
    let parse_uuid = |value: &serde_json::Value, key: &str| -> Option<Uuid> {
        value
            .get(key)
            .and_then(|v| v.as_str())
            .and_then(|s| Uuid::parse_str(s).ok())
    };

    // Check if we should buffer during backfill.
    // The read guard is a temporary of this statement, so the lock is released
    // before any of the awaits below.
    let should_buffer = state.read().await.should_buffer();

    if should_buffer {
        debug!(
            model_type = %model_type,
            count = changes.len(),
            "Buffering state batch during backfill"
        );

        // Buffer individual changes for later
        let mut skipped = 0usize;
        for mut change_data in changes {
            let record_uuid = parse_uuid(&change_data, "record_uuid");
            let device_id = parse_uuid(&change_data, "device_id");
            // The change is owned here, so move the payload out instead of cloning it.
            let data = change_data.get_mut("data").map(serde_json::Value::take);

            if let (Some(record_uuid), Some(device_id), Some(data)) =
                (record_uuid, device_id, data)
            {
                buffer
                    .push(super::state::BufferedUpdate::StateChange(
                        super::state::StateChangeMessage {
                            model_type: model_type.clone(),
                            record_uuid,
                            device_id,
                            data,
                        },
                    ))
                    .await;
            } else {
                skipped += 1;
            }
        }

        if skipped > 0 {
            warn!(
                model_type = %model_type,
                skipped = skipped,
                "Skipped malformed state changes while buffering batch"
            );
        }

        return Ok(());
    }

    // Extract device_id from first change (all changes in batch have same device_id)
    // NOTE(review): the visible caller groups changes by model_type only, so the
    // single-device assumption is not enforced here - confirm upstream guarantees it.
    let device_id = changes
        .first()
        .and_then(|c| parse_uuid(c, "device_id"))
        .ok_or_else(|| anyhow::anyhow!("Missing device_id in state batch"))?;

    // Convert changes to StateRecord format
    let total_changes = changes.len();
    let mut records = Vec::with_capacity(total_changes);
    for mut change_data in changes {
        let uuid = parse_uuid(&change_data, "record_uuid");
        let timestamp = change_data
            .get("timestamp")
            .and_then(|v| v.as_str())
            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok());
        // Move the payload out of the owned change rather than deep-copying it.
        let data = change_data.get_mut("data").map(serde_json::Value::take);

        if let (Some(uuid), Some(data), Some(timestamp)) = (uuid, data, timestamp) {
            records.push(StateRecord {
                uuid,
                data,
                timestamp: timestamp.with_timezone(&chrono::Utc),
            });
        }
    }

    let skipped = total_changes - records.len();
    if skipped > 0 {
        warn!(
            model_type = %model_type,
            skipped = skipped,
            "Skipped malformed state changes while building state batch"
        );
    }

    if records.is_empty() {
        warn!(model_type = %model_type, "No valid records in state batch");
        return Ok(());
    }

    // Remember the count for logging so the records can be moved into the message
    // instead of cloning the whole vector.
    let record_count = records.len();

    // Create StateBatch message
    let message = SyncMessage::StateBatch {
        library_id,
        model_type: model_type.clone(),
        device_id,
        records,
    };

    // Get connected sync partners
    let connected_partners = network
        .get_connected_sync_partners(library_id, db)
        .await
        .map_err(|e| {
            warn!(error = %e, "Failed to get connected partners for batch");
            e
        })?;

    if connected_partners.is_empty() {
        debug!(
            model_type = %model_type,
            records = record_count,
            "No connected partners for state batch"
        );
        return Ok(());
    }

    debug!(
        model_type = %model_type,
        records = record_count,
        partners = connected_partners.len(),
        "Broadcasting StateBatch to sync partners (100x network efficiency)"
    );

    // Broadcast to all partners in parallel
    use futures::future::join_all;

    let timeout_secs = config.network.message_timeout_secs;
    let send_futures: Vec<_> = connected_partners
        .iter()
        .map(|&partner| {
            let network = network.clone();
            let msg = message.clone();
            async move {
                // Each send gets its own timeout so one slow peer cannot stall the rest.
                match tokio::time::timeout(
                    std::time::Duration::from_secs(timeout_secs),
                    network.send_sync_message(partner, msg),
                )
                .await
                {
                    Ok(Ok(())) => (partner, Ok(())),
                    Ok(Err(e)) => (partner, Err(e)),
                    Err(_) => (
                        partner,
                        Err(anyhow::anyhow!("Send timeout after {}s", timeout_secs)),
                    ),
                }
            }
        })
        .collect();

    let results = join_all(send_futures).await;

    // Process results
    let mut success_count = 0;
    let mut error_count = 0;
    let mut successful_peers = Vec::new();

    for (partner_uuid, result) in results {
        match result {
            Ok(()) => {
                success_count += 1;
                successful_peers.push(partner_uuid);
                debug!(partner = %partner_uuid, "StateBatch sent successfully");
            }
            Err(e) => {
                error_count += 1;
                warn!(
                    partner = %partner_uuid,
                    error = %e,
                    "Failed to send StateBatch to partner, enqueuing for retry"
                );
                // Enqueue for retry
                retry_queue.enqueue(partner_uuid, message.clone()).await;
            }
        }
    }

    // Mark real-time activity per successful peer
    if !successful_peers.is_empty() {
        let now = chrono::Utc::now();
        let mut activity_map = last_realtime_activity_per_peer.write().await;
        for peer_id in successful_peers {
            activity_map.insert(peer_id, now);
        }
    }

    info!(
        model_type = %model_type,
        records = record_count,
        success = success_count,
        errors = error_count,
        "StateBatch broadcast complete (100x efficiency vs individual messages)"
    );

    Ok(())
}
/// Handle state change event from TransactionManager (static version for spawned task)
async fn handle_state_change_event_static(
library_id: Uuid,