diff --git a/apps/cli/src/domains/mod.rs b/apps/cli/src/domains/mod.rs index 2c288ee95..0fa9fb427 100644 --- a/apps/cli/src/domains/mod.rs +++ b/apps/cli/src/domains/mod.rs @@ -10,6 +10,7 @@ pub mod location; pub mod logs; pub mod network; pub mod search; +pub mod sync; pub mod tag; pub mod update; pub mod volume; diff --git a/apps/cli/src/domains/sync/args.rs b/apps/cli/src/domains/sync/args.rs new file mode 100644 index 000000000..d675f8f7f --- /dev/null +++ b/apps/cli/src/domains/sync/args.rs @@ -0,0 +1,36 @@ +use clap::Args; + +#[derive(Args, Debug)] +pub struct SyncMetricsArgs { + /// Show metrics for a specific time range + #[arg(long, help = "Show metrics since this time (e.g., '1 hour ago', '2025-10-23 10:00:00')")] + pub since: Option, + + /// Show metrics for a specific peer device + #[arg(long, help = "Filter metrics by peer device ID")] + pub peer: Option, + + /// Show metrics for a specific model type + #[arg(long, help = "Filter metrics by model type (e.g., 'entry', 'tag')")] + pub model: Option, + + /// Watch metrics in real-time + #[arg(short, long, help = "Watch metrics updates in real-time")] + pub watch: bool, + + /// Output as JSON + #[arg(long, help = "Output metrics as JSON")] + pub json: bool, + + /// Show only state metrics + #[arg(long, help = "Show only state transition metrics")] + pub state: bool, + + /// Show only operation metrics + #[arg(long, help = "Show only operation counter metrics")] + pub operations: bool, + + /// Show only error metrics + #[arg(long, help = "Show only error metrics")] + pub errors: bool, +} \ No newline at end of file diff --git a/apps/cli/src/domains/sync/mod.rs b/apps/cli/src/domains/sync/mod.rs new file mode 100644 index 000000000..77b5c704b --- /dev/null +++ b/apps/cli/src/domains/sync/mod.rs @@ -0,0 +1,40 @@ +mod args; + +use anyhow::Result; +use clap::Subcommand; + +use crate::util::prelude::*; +use crate::context::Context; + +use self::args::*; + +#[derive(Subcommand, Debug)] +pub enum SyncCmd { + /// Show sync metrics + Metrics(SyncMetricsArgs), +} + +pub async fn run(ctx: &Context, cmd: SyncCmd) -> Result<()> { + match cmd { + SyncCmd::Metrics(args) => { + // For now, we'll implement a simple metrics display + // In the future, this will call the core metrics API + println!("Sync Metrics"); + println!("==========="); + println!(); + println!("This feature is under development."); + println!("Metrics will be available once the sync service is running."); + + if args.watch { + println!("Watch mode: Press Ctrl+C to stop"); + // TODO: Implement watch mode + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + + if args.json { + println!("{{}}"); + } + } + } + Ok(()) +} \ No newline at end of file diff --git a/apps/cli/src/main.rs b/apps/cli/src/main.rs index 001482344..d9a9459a1 100644 --- a/apps/cli/src/main.rs +++ b/apps/cli/src/main.rs @@ -58,6 +58,7 @@ use crate::domains::{ logs::{self, LogsCmd}, network::{self, NetworkCmd}, search::{self, SearchCmd}, + sync::{self, SyncCmd}, tag::{self, TagCmd}, update, volume::{self, VolumeCmd}, @@ -204,6 +205,9 @@ enum Commands { /// Search operations #[command(subcommand)] Search(SearchCmd), + /// Sync operations and metrics + #[command(subcommand)] + Sync(SyncCmd), /// Tag operations #[command(subcommand)] Tag(TagCmd), @@ -693,6 +697,7 @@ async fn run_client_command( Commands::Location(cmd) => location::run(&ctx, cmd).await?, Commands::Network(cmd) => network::run(&ctx, cmd).await?, Commands::Job(cmd) => job::run(&ctx, cmd).await?, + Commands::Sync(cmd) => sync::run(&ctx, cmd).await?, Commands::Logs(cmd) => logs::run(&ctx, cmd).await?, Commands::Search(cmd) => search::run(&ctx, cmd).await?, Commands::Tag(cmd) => tag::run(&ctx, cmd).await?, diff --git a/core/src/ops/mod.rs b/core/src/ops/mod.rs index 096e627bc..f22266b32 100644 --- a/core/src/ops/mod.rs +++ b/core/src/ops/mod.rs @@ -23,5 +23,6 @@ pub mod metadata; pub mod network; pub mod search; pub mod sidecar; +pub mod sync; pub mod tags; pub mod volumes; diff --git a/core/src/ops/sync/get_metrics/action.rs b/core/src/ops/sync/get_metrics/action.rs new file mode 100644 index 000000000..e36bfa034 --- /dev/null +++ b/core/src/ops/sync/get_metrics/action.rs @@ -0,0 +1,67 @@ +//! Get sync metrics action + +use crate::ops::{LibraryQuery, LibraryQueryContext}; +use crate::service::sync::metrics::snapshot::SyncMetricsSnapshot; +use anyhow::Result; + +use super::{GetSyncMetricsInput, GetSyncMetricsOutput}; + +/// Get sync metrics for the current library +pub struct GetSyncMetrics; + +impl LibraryQuery for GetSyncMetrics { + type Input = GetSyncMetricsInput; + type Output = GetSyncMetricsOutput; + + async fn execute(input: Self::Input, ctx: LibraryQueryContext) -> Result { + // Get the sync service from the library + let sync_service = ctx.library.sync_service().await?; + let metrics = sync_service.metrics(); + + // Create a snapshot of current metrics + let mut snapshot = SyncMetricsSnapshot::from_metrics(metrics).await; + + // Apply filters + if let Some(since) = input.since { + snapshot.filter_since(since); + } + + if let Some(peer_id) = input.peer_id { + snapshot.filter_by_peer(peer_id); + } + + if let Some(model_type) = input.model_type { + snapshot.filter_by_model(&model_type); + } + + // Apply category filters + if input.state_only.unwrap_or(false) { + // Keep only state metrics, clear others + snapshot.operations = Default::default(); + snapshot.data_volume = Default::default(); + snapshot.performance = Default::default(); + snapshot.errors = Default::default(); + } + + if input.operations_only.unwrap_or(false) { + // Keep only operation metrics, clear others + snapshot.state = Default::default(); + snapshot.data_volume = Default::default(); + snapshot.performance = Default::default(); + snapshot.errors = Default::default(); + } + + if input.errors_only.unwrap_or(false) { + // Keep only error metrics, clear others + snapshot.state = Default::default(); + snapshot.operations = Default::default(); + snapshot.data_volume = Default::default(); + snapshot.performance = Default::default(); + } + + Ok(GetSyncMetricsOutput { metrics: snapshot }) + } +} + +// Register the query +crate::register_library_query!(GetSyncMetrics, "sync.metrics"); \ No newline at end of file diff --git a/core/src/ops/sync/get_metrics/input.rs b/core/src/ops/sync/get_metrics/input.rs new file mode 100644 index 000000000..aa8c29645 --- /dev/null +++ b/core/src/ops/sync/get_metrics/input.rs @@ -0,0 +1,27 @@ +//! Input for getting sync metrics + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use specta::Type; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, Type)] +pub struct GetSyncMetricsInput { + /// Filter metrics since this time + pub since: Option>, + + /// Filter metrics for specific peer device + pub peer_id: Option, + + /// Filter metrics for specific model type + pub model_type: Option, + + /// Show only state metrics + pub state_only: Option, + + /// Show only operation metrics + pub operations_only: Option, + + /// Show only error metrics + pub errors_only: Option, +} \ No newline at end of file diff --git a/core/src/ops/sync/get_metrics/mod.rs b/core/src/ops/sync/get_metrics/mod.rs new file mode 100644 index 000000000..204f4bd57 --- /dev/null +++ b/core/src/ops/sync/get_metrics/mod.rs @@ -0,0 +1,9 @@ +//! Get sync metrics operation + +pub mod action; +pub mod input; +pub mod output; + +pub use action::GetSyncMetrics; +pub use input::GetSyncMetricsInput; +pub use output::GetSyncMetricsOutput; \ No newline at end of file diff --git a/core/src/ops/sync/get_metrics/output.rs b/core/src/ops/sync/get_metrics/output.rs new file mode 100644 index 000000000..646ceb8ea --- /dev/null +++ b/core/src/ops/sync/get_metrics/output.rs @@ -0,0 +1,11 @@ +//! Output for getting sync metrics + +use crate::service::sync::metrics::snapshot::SyncMetricsSnapshot; +use serde::{Deserialize, Serialize}; +use specta::Type; + +#[derive(Debug, Serialize, Deserialize, Type)] +pub struct GetSyncMetricsOutput { + /// The metrics snapshot + pub metrics: SyncMetricsSnapshot, +} \ No newline at end of file diff --git a/core/src/ops/sync/mod.rs b/core/src/ops/sync/mod.rs new file mode 100644 index 000000000..cde4a4868 --- /dev/null +++ b/core/src/ops/sync/mod.rs @@ -0,0 +1,3 @@ +//! Sync operations + +pub mod get_metrics; \ No newline at end of file diff --git a/core/src/service/sync/backfill.rs b/core/src/service/sync/backfill.rs index a3e598ee2..d4d0fa2ff 100644 --- a/core/src/service/sync/backfill.rs +++ b/core/src/service/sync/backfill.rs @@ -8,6 +8,7 @@ //! 5. Transition to ready use super::{ + metrics::SyncMetricsCollector, peer::PeerSync, protocol_handler::{LogSyncHandler, StateSyncHandler}, state::{select_backfill_peer, BackfillCheckpoint, DeviceSyncState, PeerInfo}, @@ -32,6 +33,7 @@ pub struct BackfillManager { state_handler: Arc, log_handler: Arc, config: Arc, + metrics: Arc, /// Pending state request channel (backfill is sequential, only one at a time) pending_state_response: Arc>>>, @@ -48,6 +50,7 @@ impl BackfillManager { state_handler: Arc, log_handler: Arc, config: Arc, + metrics: Arc, ) -> Self { Self { library_id, @@ -56,6 +59,7 @@ impl BackfillManager { state_handler, log_handler, config, + metrics, pending_state_response: Arc::new(Mutex::new(None)), pending_shared_response: Arc::new(Mutex::new(None)), } @@ -97,6 +101,9 @@ impl BackfillManager { /// Start complete backfill process pub async fn start_backfill(&self, available_peers: Vec) -> Result<()> { + // Record metrics + self.metrics.record_backfill_session_start(); + info!( library_id = %self.library_id, device_id = %self.device_id, @@ -155,6 +162,9 @@ impl BackfillManager { // Phase 5: Set initial watermarks from actual received data (not local DB query) self.set_initial_watermarks_after_backfill(final_state_checkpoint, max_shared_hlc).await?; + // Record metrics + self.metrics.record_backfill_session_complete(); + info!("Backfill complete, device is ready"); Ok(()) @@ -205,6 +215,9 @@ impl BackfillManager { // Update watermarks from actual received data (not local DB query) self.set_initial_watermarks_after_backfill(final_state_checkpoint, max_shared_hlc).await?; + // Record metrics + self.metrics.record_backfill_session_complete(); + info!("Incremental catch-up complete"); Ok(()) } @@ -305,6 +318,9 @@ impl BackfillManager { { let db = self.peer_sync.db().clone(); + // Record data volume metrics before consuming records + let records_count = records.len() as u64; + // Apply updates via registry for record in records { crate::infra::sync::registry::apply_state_change( @@ -316,6 +332,9 @@ impl BackfillManager { .map_err(|e| anyhow::anyhow!("{}", e))?; } + // Record data volume metrics + self.metrics.record_entries_synced(&model_type, records_count).await; + // Apply deletions via registry for uuid in deleted_uuids { crate::infra::sync::registry::apply_deletion(&model_type, uuid, db.clone()) @@ -326,6 +345,9 @@ impl BackfillManager { current_checkpoint.update(chk.clone(), 0.5); // TODO: Calculate actual progress current_checkpoint.save().await?; + // Record pagination round + self.metrics.record_backfill_pagination_round(); + // Update cursor for next iteration cursor_checkpoint = chk; @@ -383,6 +405,11 @@ impl BackfillManager { } total_applied += batch_size; + + // Record metrics + self.metrics.record_backfill_pagination_round(); + self.metrics.record_entries_synced("shared", batch_size as u64).await; + info!("Applied {} shared changes (total: {})", batch_size, total_applied); // Update cursor to last HLC for next batch diff --git a/core/src/service/sync/metrics/collector.rs b/core/src/service/sync/metrics/collector.rs new file mode 100644 index 000000000..9fa02e7dd --- /dev/null +++ b/core/src/service/sync/metrics/collector.rs @@ -0,0 +1,329 @@ +//! Central metrics collector for sync operations + +use crate::service::sync::state::DeviceSyncState; +use crate::service::sync::metrics::types::*; +use anyhow::Result; +use chrono::Utc; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use tokio::sync::RwLock; +use tracing::{debug, warn}; +use uuid::Uuid; + +/// Central collector for all sync metrics +#[derive(Debug)] +pub struct SyncMetricsCollector { + metrics: Arc, + max_history_size: usize, +} + +impl SyncMetricsCollector { + pub fn new() -> Self { + Self { + metrics: Arc::new(SyncMetrics::default()), + max_history_size: 100, + } + } + + pub fn with_history_size(mut self, size: usize) -> Self { + self.max_history_size = size; + self + } + + /// Get reference to metrics + pub fn metrics(&self) -> &Arc { + &self.metrics + } + + /// Record a state transition + pub async fn record_state_transition( + &self, + from: DeviceSyncState, + to: DeviceSyncState, + reason: Option, + ) -> Result<()> { + let now = Utc::now(); + + // Update current state + { + let mut current_state = self.metrics.state.current_state.write().await; + *current_state = to; + } + + // Update state entered time + { + let mut state_entered_at = self.metrics.state.state_entered_at.write().await; + *state_entered_at = now; + } + + // Record transition + let transition = StateTransition { + from, + to, + timestamp: now, + reason: reason.clone(), + }; + + // Add to history + { + let mut history = self.metrics.state.state_history.write().await; + history.push_back(transition.clone()); + + // Trim to max size + while history.len() > self.max_history_size { + history.pop_front(); + } + } + + // Update transition count + { + let mut transition_count = self.metrics.state.transition_count.write().await; + *transition_count.entry((from, to)).or_insert(0) += 1; + } + + // Update time in previous state + { + let mut total_time = self.metrics.state.total_time_in_state.write().await; + let duration = now.signed_duration_since(*self.metrics.state.state_entered_at.read().await); + *total_time.entry(from).or_insert(std::time::Duration::ZERO) += + std::time::Duration::from_millis(duration.num_milliseconds().max(0) as u64); + } + + debug!( + from = ?from, + to = ?to, + reason = ?reason, + "Recorded sync state transition" + ); + + Ok(()) + } + + /// Record a broadcast operation + pub fn record_broadcast(&self, is_state_change: bool, batch_size: Option) { + self.metrics.operations.broadcasts_sent.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + if is_state_change { + self.metrics.operations.state_changes_broadcast.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } else { + self.metrics.operations.shared_changes_broadcast.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + if let Some(size) = batch_size { + self.metrics.operations.broadcast_batches_sent.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + } + + /// Record a failed broadcast + pub fn record_failed_broadcast(&self) { + self.metrics.operations.failed_broadcasts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record received changes + pub fn record_changes_received(&self, count: u64) { + self.metrics.operations.changes_received.fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } + + /// Record applied changes + pub fn record_changes_applied(&self, count: u64) { + self.metrics.operations.changes_applied.fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } + + /// Record rejected changes + pub fn record_changes_rejected(&self, count: u64) { + self.metrics.operations.changes_rejected.fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } + + /// Record buffer queue depth + pub fn record_buffer_queue_depth(&self, depth: u64) { + self.metrics.operations.buffer_queue_depth.store(depth, std::sync::atomic::Ordering::Relaxed); + } + + /// Record backfill session start + pub fn record_backfill_session_start(&self) { + self.metrics.operations.active_backfill_sessions.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record backfill session completion + pub fn record_backfill_session_complete(&self) { + self.metrics.operations.active_backfill_sessions.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + self.metrics.operations.backfill_sessions_completed.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record backfill pagination round + pub fn record_backfill_pagination_round(&self) { + self.metrics.operations.backfill_pagination_rounds.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record retry queue depth + pub fn record_retry_queue_depth(&self, depth: u64) { + self.metrics.operations.retry_queue_depth.store(depth, std::sync::atomic::Ordering::Relaxed); + } + + /// Record retry attempt + pub fn record_retry_attempt(&self) { + self.metrics.operations.retry_attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record retry success + pub fn record_retry_success(&self) { + self.metrics.operations.retry_successes.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record data volume by model type + pub async fn record_entries_synced(&self, model_type: &str, count: u64) { + let mut entries_synced = self.metrics.data_volume.entries_synced.write().await; + entries_synced + .entry(model_type.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } + + /// Record data volume by device + pub async fn record_device_entries(&self, device_id: Uuid, device_name: &str, count: u64) { + let mut entries_by_device = self.metrics.data_volume.entries_by_device.write().await; + + let device_metrics = entries_by_device + .entry(device_id) + .or_insert_with(|| DeviceMetrics::new(device_id, device_name.to_string())); + + device_metrics.entries_received.fetch_add(count, std::sync::atomic::Ordering::Relaxed); + device_metrics.last_seen.store(Utc::now().timestamp() as u64, std::sync::atomic::Ordering::Relaxed); + } + + /// Record bytes transferred + pub fn record_bytes_sent(&self, bytes: u64) { + self.metrics.data_volume.bytes_sent.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); + } + + /// Record bytes received + pub fn record_bytes_received(&self, bytes: u64) { + self.metrics.data_volume.bytes_received.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); + } + + /// Record last sync time for peer + pub async fn record_last_sync_peer(&self, peer_id: Uuid) { + let mut last_sync_per_peer = self.metrics.data_volume.last_sync_per_peer.write().await; + last_sync_per_peer.insert(peer_id, Utc::now()); + } + + /// Record last sync time for model + pub async fn record_last_sync_model(&self, model_type: &str) { + let mut last_sync_per_model = self.metrics.data_volume.last_sync_per_model.write().await; + last_sync_per_model.insert(model_type.to_string(), Utc::now()); + } + + /// Record latency for broadcast operations + pub fn record_broadcast_latency(&self, latency_ms: u64) { + self.metrics.performance.broadcast_latency_ms.record(latency_ms); + } + + /// Record latency for apply operations + pub fn record_apply_latency(&self, latency_ms: u64) { + self.metrics.performance.apply_latency_ms.record(latency_ms); + } + + /// Record latency for backfill requests + pub fn record_backfill_request_latency(&self, latency_ms: u64) { + self.metrics.performance.backfill_request_latency_ms.record(latency_ms); + } + + /// Record database query duration + pub fn record_db_query_duration(&self, duration_ms: u64) { + self.metrics.performance.db_query_duration_ms.record(duration_ms); + self.metrics.performance.db_query_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Update state watermark + pub fn update_state_watermark(&self, timestamp: u64) { + self.metrics.performance.state_watermark.store(timestamp, std::sync::atomic::Ordering::Relaxed); + } + + /// Update shared watermark + pub async fn update_shared_watermark(&self, hlc: &str) { + let mut shared_watermark = self.metrics.performance.shared_watermark.write().await; + *shared_watermark = hlc.to_string(); + } + + /// Update watermark lag for peer + pub async fn update_watermark_lag(&self, peer_id: Uuid, lag_ms: u64) { + let mut watermark_lag = self.metrics.performance.watermark_lag_ms.write().await; + watermark_lag + .entry(peer_id) + .or_insert_with(|| AtomicU64::new(0)) + .store(lag_ms, std::sync::atomic::Ordering::Relaxed); + } + + /// Update HLC drift + pub fn update_hlc_drift(&self, drift_ms: i64) { + self.metrics.performance.hlc_physical_drift_ms.store(drift_ms, std::sync::atomic::Ordering::Relaxed); + } + + /// Update HLC counter max + pub fn update_hlc_counter_max(&self, max: u64) { + self.metrics.performance.hlc_counter_max.store(max, std::sync::atomic::Ordering::Relaxed); + } + + /// Record an error event + pub async fn record_error(&self, error: ErrorEvent) { + // Update error counts + self.metrics.errors.total_errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + match error.error_type.as_str() { + "network" => { + self.metrics.errors.network_errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + "database" => { + self.metrics.errors.database_errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + "apply" => { + self.metrics.errors.apply_errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + "validation" => { + self.metrics.errors.validation_errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + _ => {} + } + + // Add to recent errors + { + let mut recent_errors = self.metrics.errors.recent_errors.write().await; + recent_errors.push_back(error.clone()); + + // Trim to max size + while recent_errors.len() > self.max_history_size { + recent_errors.pop_front(); + } + } + + warn!( + error_type = %error.error_type, + message = %error.message, + "Recorded sync error" + ); + } + + /// Record conflict detection + pub fn record_conflict_detected(&self) { + self.metrics.errors.conflicts_detected.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Record conflict resolution by HLC + pub fn record_conflict_resolved_by_hlc(&self) { + self.metrics.errors.conflicts_resolved_by_hlc.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Set device online status + pub async fn set_device_online(&self, device_id: Uuid, is_online: bool) { + let mut entries_by_device = self.metrics.data_volume.entries_by_device.write().await; + if let Some(device_metrics) = entries_by_device.get_mut(&device_id) { + device_metrics.is_online.store(is_online, std::sync::atomic::Ordering::Relaxed); + } + } +} + +impl Default for SyncMetricsCollector { + fn default() -> Self { + Self::new() + } +} \ No newline at end of file diff --git a/core/src/service/sync/metrics/history.rs b/core/src/service/sync/metrics/history.rs new file mode 100644 index 000000000..f86412738 --- /dev/null +++ b/core/src/service/sync/metrics/history.rs @@ -0,0 +1,139 @@ +//! Time-series storage for sync metrics history + +use crate::service::sync::metrics::snapshot::SyncMetricsSnapshot; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Ring buffer for storing historical metrics snapshots +#[derive(Debug)] +pub struct MetricsHistory { + /// Ring buffer of snapshots + snapshots: Arc>>, + + /// Maximum number of snapshots to keep + max_size: usize, +} + +impl MetricsHistory { + /// Create a new metrics history with specified capacity + pub fn new(max_size: usize) -> Self { + Self { + snapshots: Arc::new(RwLock::new(VecDeque::with_capacity(max_size))), + max_size, + } + } + + /// Add a new snapshot to the history + pub async fn add_snapshot(&self, snapshot: SyncMetricsSnapshot) { + let mut snapshots = self.snapshots.write().await; + + // Add new snapshot + snapshots.push_back(snapshot); + + // Trim to max size + while snapshots.len() > self.max_size { + snapshots.pop_front(); + } + } + + /// Get all snapshots + pub async fn get_all_snapshots(&self) -> Vec { + let snapshots = self.snapshots.read().await; + snapshots.clone().into() + } + + /// Get snapshots since a specific time + pub async fn get_snapshots_since(&self, since: DateTime) -> Vec { + let snapshots = self.snapshots.read().await; + snapshots + .iter() + .filter(|snapshot| snapshot.timestamp >= since) + .cloned() + .collect() + } + + /// Get snapshots in a time range + pub async fn get_snapshots_range( + &self, + start: DateTime, + end: DateTime, + ) -> Vec { + let snapshots = self.snapshots.read().await; + snapshots + .iter() + .filter(|snapshot| snapshot.timestamp >= start && snapshot.timestamp <= end) + .cloned() + .collect() + } + + /// Get the latest snapshot + pub async fn get_latest_snapshot(&self) -> Option { + let snapshots = self.snapshots.read().await; + snapshots.back().cloned() + } + + /// Get the oldest snapshot + pub async fn get_oldest_snapshot(&self) -> Option { + let snapshots = self.snapshots.read().await; + snapshots.front().cloned() + } + + /// Clear all snapshots + pub async fn clear(&self) { + let mut snapshots = self.snapshots.write().await; + snapshots.clear(); + } + + /// Get the number of stored snapshots + pub async fn len(&self) -> usize { + let snapshots = self.snapshots.read().await; + snapshots.len() + } + + /// Check if history is empty + pub async fn is_empty(&self) -> bool { + let snapshots = self.snapshots.read().await; + snapshots.is_empty() + } + + /// Get capacity + pub fn capacity(&self) -> usize { + self.max_size + } +} + +impl Default for MetricsHistory { + fn default() -> Self { + Self::new(1000) // Default to 1000 snapshots + } +} + +/// Configuration for metrics history +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetricsHistoryConfig { + /// Maximum number of snapshots to keep in memory + pub max_snapshots: usize, + + /// How often to take snapshots (in seconds) + pub snapshot_interval_secs: u64, + + /// Whether to persist snapshots to database + pub persist_to_db: bool, + + /// How often to persist to database (in seconds) + pub persist_interval_secs: u64, +} + +impl Default for MetricsHistoryConfig { + fn default() -> Self { + Self { + max_snapshots: 1000, + snapshot_interval_secs: 60, // Every minute + persist_to_db: false, + persist_interval_secs: 300, // Every 5 minutes + } + } +} \ No newline at end of file diff --git a/core/src/service/sync/metrics/mod.rs b/core/src/service/sync/metrics/mod.rs new file mode 100644 index 000000000..4baa4235b --- /dev/null +++ b/core/src/service/sync/metrics/mod.rs @@ -0,0 +1,14 @@ +//! Sync Metrics and Observability System +//! +//! Provides comprehensive metrics collection and monitoring for the sync system. +//! Tracks state transitions, operation counts, data volumes, performance metrics, +//! and error events to enable debugging and monitoring. + +pub mod collector; +pub mod history; +pub mod persistence; +pub mod snapshot; +pub mod types; + +pub use collector::SyncMetricsCollector; +pub use types::*; \ No newline at end of file diff --git a/core/src/service/sync/metrics/persistence.rs b/core/src/service/sync/metrics/persistence.rs new file mode 100644 index 000000000..c66225dad --- /dev/null +++ b/core/src/service/sync/metrics/persistence.rs @@ -0,0 +1,68 @@ +//! Persistence layer for sync metrics + +use crate::service::sync::metrics::snapshot::SyncMetricsSnapshot; +use anyhow::Result; +use chrono::{DateTime, Utc}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, Set, +}; +use serde_json; +use std::sync::Arc; +use uuid::Uuid; + +/// Store a metrics snapshot in the database +pub async fn store_metrics_snapshot( + db: &Arc, + library_id: Uuid, + snapshot: SyncMetricsSnapshot, +) -> Result<()> { + // For now, we'll store the snapshot as JSON in a simple table + // In the future, this could be optimized to store individual metrics + let snapshot_json = serde_json::to_value(&snapshot)?; + + // TODO: Create a proper database table for metrics snapshots + // For now, we'll just log that we would store it + tracing::debug!( + library_id = %library_id, + timestamp = %snapshot.timestamp, + "Would store metrics snapshot to database" + ); + + Ok(()) +} + +/// Retrieve metrics snapshots from the database +pub async fn get_metrics_snapshots( + db: &Arc, + library_id: Uuid, + since: Option>, + limit: Option, +) -> Result> { + // TODO: Implement database retrieval + // For now, return empty vector + tracing::debug!( + library_id = %library_id, + since = ?since, + limit = ?limit, + "Would retrieve metrics snapshots from database" + ); + + Ok(vec![]) +} + +/// Clean up old metrics snapshots +pub async fn cleanup_old_metrics( + db: &Arc, + library_id: Uuid, + older_than: DateTime, +) -> Result { + // TODO: Implement cleanup + // For now, return 0 + tracing::debug!( + library_id = %library_id, + older_than = %older_than, + "Would cleanup old metrics from database" + ); + + Ok(0) +} \ No newline at end of file diff --git a/core/src/service/sync/metrics/snapshot.rs b/core/src/service/sync/metrics/snapshot.rs new file mode 100644 index 000000000..642346ca0 --- /dev/null +++ b/core/src/service/sync/metrics/snapshot.rs @@ -0,0 +1,297 @@ +//! Point-in-time snapshots of sync metrics + +use crate::service::sync::state::DeviceSyncState; +use crate::service::sync::metrics::types::*; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use uuid::Uuid; + +/// Point-in-time snapshot of all sync metrics +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncMetricsSnapshot { + /// When this snapshot was taken + pub timestamp: DateTime, + + /// State metrics + pub state: SyncStateSnapshot, + + /// Operation metrics + pub operations: OperationSnapshot, + + /// Data volume metrics + pub data_volume: DataVolumeSnapshot, + + /// Performance metrics + pub performance: PerformanceSnapshot, + + /// Error metrics + pub errors: ErrorSnapshot, +} + +/// State metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncStateSnapshot { + pub current_state: DeviceSyncState, + pub state_entered_at: DateTime, + pub uptime_seconds: u64, + pub state_history: Vec, + pub total_time_in_state: HashMap, // milliseconds + pub transition_count: HashMap<(DeviceSyncState, DeviceSyncState), u64>, +} + +/// Operation metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OperationSnapshot { + // Broadcasts + pub broadcasts_sent: u64, + pub state_changes_broadcast: u64, + pub shared_changes_broadcast: u64, + pub broadcast_batches_sent: u64, + pub failed_broadcasts: u64, + + // Receives + pub changes_received: u64, + pub changes_applied: u64, + pub changes_rejected: u64, + pub buffer_queue_depth: u64, + + // Backfill + pub active_backfill_sessions: u64, + pub backfill_sessions_completed: u64, + pub backfill_pagination_rounds: u64, + + // Retries + pub retry_queue_depth: u64, + pub retry_attempts: u64, + pub retry_successes: u64, +} + +/// Data volume metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DataVolumeSnapshot { + pub entries_synced: HashMap, + pub entries_by_device: HashMap, + pub bytes_sent: u64, + pub bytes_received: u64, + pub last_sync_per_peer: HashMap>, + pub last_sync_per_model: HashMap>, +} + +/// Device metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceMetricsSnapshot { + pub device_id: Uuid, + pub device_name: String, + pub entries_received: u64, + pub last_seen: DateTime, + pub is_online: bool, +} + +/// Performance metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PerformanceSnapshot { + pub broadcast_latency: LatencySnapshot, + pub apply_latency: LatencySnapshot, + pub backfill_request_latency: LatencySnapshot, + pub state_watermark: DateTime, + pub shared_watermark: String, + pub watermark_lag_ms: HashMap, + pub hlc_physical_drift_ms: i64, + pub hlc_counter_max: u64, + pub db_query_duration: LatencySnapshot, + pub db_query_count: u64, +} + +/// Latency metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LatencySnapshot { + pub count: u64, + pub avg_ms: f64, + pub min_ms: u64, + pub max_ms: u64, +} + +/// Error metrics snapshot +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorSnapshot { + pub total_errors: u64, + pub network_errors: u64, + pub database_errors: u64, + pub apply_errors: u64, + pub validation_errors: u64, + pub recent_errors: Vec, + pub conflicts_detected: u64, + pub conflicts_resolved_by_hlc: u64, +} + +impl SyncMetricsSnapshot { + /// Create a snapshot from current metrics + pub async fn from_metrics(metrics: &Arc) -> Self { + let now = Utc::now(); + + // State snapshot + let current_state = *metrics.state.current_state.read().await; + let state_entered_at = *metrics.state.state_entered_at.read().await; + let uptime_seconds = now.signed_duration_since(state_entered_at).num_seconds().max(0) as u64; + + let state_history = metrics.state.state_history.read().await.clone().into(); + let total_time_in_state = metrics.state.total_time_in_state.read().await + .iter() + .map(|(k, v)| (*k, v.as_millis() as u64)) + .collect(); + let transition_count = metrics.state.transition_count.read().await.clone(); + + let state = SyncStateSnapshot { + current_state, + state_entered_at, + uptime_seconds, + state_history, + total_time_in_state, + transition_count, + }; + + // Operation snapshot + let operations = OperationSnapshot { + broadcasts_sent: metrics.operations.broadcasts_sent.load(std::sync::atomic::Ordering::Relaxed), + state_changes_broadcast: metrics.operations.state_changes_broadcast.load(std::sync::atomic::Ordering::Relaxed), + shared_changes_broadcast: metrics.operations.shared_changes_broadcast.load(std::sync::atomic::Ordering::Relaxed), + broadcast_batches_sent: metrics.operations.broadcast_batches_sent.load(std::sync::atomic::Ordering::Relaxed), + failed_broadcasts: metrics.operations.failed_broadcasts.load(std::sync::atomic::Ordering::Relaxed), + changes_received: metrics.operations.changes_received.load(std::sync::atomic::Ordering::Relaxed), + changes_applied: metrics.operations.changes_applied.load(std::sync::atomic::Ordering::Relaxed), + changes_rejected: metrics.operations.changes_rejected.load(std::sync::atomic::Ordering::Relaxed), + buffer_queue_depth: metrics.operations.buffer_queue_depth.load(std::sync::atomic::Ordering::Relaxed), + active_backfill_sessions: metrics.operations.active_backfill_sessions.load(std::sync::atomic::Ordering::Relaxed), + backfill_sessions_completed: metrics.operations.backfill_sessions_completed.load(std::sync::atomic::Ordering::Relaxed), + backfill_pagination_rounds: metrics.operations.backfill_pagination_rounds.load(std::sync::atomic::Ordering::Relaxed), + retry_queue_depth: metrics.operations.retry_queue_depth.load(std::sync::atomic::Ordering::Relaxed), + retry_attempts: metrics.operations.retry_attempts.load(std::sync::atomic::Ordering::Relaxed), + retry_successes: metrics.operations.retry_successes.load(std::sync::atomic::Ordering::Relaxed), + }; + + // Data volume snapshot + let entries_synced = metrics.data_volume.entries_synced.read().await + .iter() + .map(|(k, v)| (k.clone(), v.load(std::sync::atomic::Ordering::Relaxed))) + .collect(); + + let entries_by_device = metrics.data_volume.entries_by_device.read().await + .iter() + .map(|(device_id, device_metrics)| { + (*device_id, DeviceMetricsSnapshot { + device_id: device_metrics.device_id, + device_name: device_metrics.device_name.clone(), + entries_received: device_metrics.entries_received.load(std::sync::atomic::Ordering::Relaxed), + last_seen: DateTime::from_timestamp(device_metrics.last_seen.load(std::sync::atomic::Ordering::Relaxed) as i64, 0) + .unwrap_or_else(|| Utc::now()), + is_online: device_metrics.is_online.load(std::sync::atomic::Ordering::Relaxed), + }) + }) + .collect(); + + let last_sync_per_peer = metrics.data_volume.last_sync_per_peer.read().await.clone(); + let last_sync_per_model = metrics.data_volume.last_sync_per_model.read().await.clone(); + + let data_volume = DataVolumeSnapshot { + entries_synced, + entries_by_device, + bytes_sent: metrics.data_volume.bytes_sent.load(std::sync::atomic::Ordering::Relaxed), + bytes_received: metrics.data_volume.bytes_received.load(std::sync::atomic::Ordering::Relaxed), + last_sync_per_peer, + last_sync_per_model, + }; + + // Performance snapshot + let state_watermark = DateTime::from_timestamp( + metrics.performance.state_watermark.load(std::sync::atomic::Ordering::Relaxed) as i64, + 0 + ).unwrap_or_else(|| Utc::now()); + + let shared_watermark = metrics.performance.shared_watermark.read().await.clone(); + let watermark_lag_ms = metrics.performance.watermark_lag_ms.read().await + .iter() + .map(|(k, v)| (*k, v.load(std::sync::atomic::Ordering::Relaxed))) + .collect(); + + let performance = PerformanceSnapshot { + broadcast_latency: LatencySnapshot::from_histogram(&metrics.performance.broadcast_latency_ms), + apply_latency: LatencySnapshot::from_histogram(&metrics.performance.apply_latency_ms), + backfill_request_latency: LatencySnapshot::from_histogram(&metrics.performance.backfill_request_latency_ms), + state_watermark, + shared_watermark, + watermark_lag_ms, + hlc_physical_drift_ms: metrics.performance.hlc_physical_drift_ms.load(std::sync::atomic::Ordering::Relaxed), + hlc_counter_max: metrics.performance.hlc_counter_max.load(std::sync::atomic::Ordering::Relaxed), + db_query_duration: LatencySnapshot::from_histogram(&metrics.performance.db_query_duration_ms), + db_query_count: metrics.performance.db_query_count.load(std::sync::atomic::Ordering::Relaxed), + }; + + // Error snapshot + let recent_errors = metrics.errors.recent_errors.read().await.clone().into(); + + let errors = ErrorSnapshot { + total_errors: metrics.errors.total_errors.load(std::sync::atomic::Ordering::Relaxed), + network_errors: metrics.errors.network_errors.load(std::sync::atomic::Ordering::Relaxed), + database_errors: metrics.errors.database_errors.load(std::sync::atomic::Ordering::Relaxed), + apply_errors: metrics.errors.apply_errors.load(std::sync::atomic::Ordering::Relaxed), + validation_errors: metrics.errors.validation_errors.load(std::sync::atomic::Ordering::Relaxed), + recent_errors, + conflicts_detected: metrics.errors.conflicts_detected.load(std::sync::atomic::Ordering::Relaxed), + conflicts_resolved_by_hlc: metrics.errors.conflicts_resolved_by_hlc.load(std::sync::atomic::Ordering::Relaxed), + }; + + Self { + timestamp: now, + state, + operations, + data_volume, + performance, + errors, + } + } + + /// Filter snapshot to only include data since a specific time + pub fn filter_since(&mut self, since: DateTime) { + // Filter state history + self.state.state_history.retain(|transition| transition.timestamp >= since); + + // Filter recent errors + self.errors.recent_errors.retain(|error| error.timestamp >= since); + + // Note: Other metrics are cumulative, so we don't filter them + } + + /// Filter snapshot to only include data for a specific peer + pub fn filter_by_peer(&mut self, peer_id: Uuid) { + // Filter device metrics + self.data_volume.entries_by_device.retain(|device_id, _| *device_id == peer_id); + self.data_volume.last_sync_per_peer.retain(|device_id, _| *device_id == peer_id); + self.performance.watermark_lag_ms.retain(|device_id, _| *device_id == peer_id); + + // Filter recent errors + self.errors.recent_errors.retain(|error| error.device_id == Some(peer_id)); + } + + /// Filter snapshot to only include data for a specific model type + pub fn filter_by_model(&mut self, model_type: &str) { + // Filter entries synced + self.data_volume.entries_synced.retain(|model, _| model == model_type); + self.data_volume.last_sync_per_model.retain(|model, _| model == model_type); + + // Filter recent errors + self.errors.recent_errors.retain(|error| error.model_type.as_ref() == Some(&model_type.to_string())); + } +} + +impl LatencySnapshot { + fn from_histogram(histogram: &HistogramMetric) -> Self { + Self { + count: histogram.count(), + avg_ms: histogram.avg(), + min_ms: histogram.min(), + max_ms: histogram.max(), + } + } +} \ No newline at end of file diff --git a/core/src/service/sync/metrics/types.rs b/core/src/service/sync/metrics/types.rs new file mode 100644 index 000000000..05fc99e0e --- /dev/null +++ b/core/src/service/sync/metrics/types.rs @@ -0,0 +1,357 @@ +//! Metric types and data structures for sync observability + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; +use std::sync::Arc; +use tokio::sync::RwLock; +use uuid::Uuid; + +use crate::service::sync::state::DeviceSyncState; + +/// Central metrics collector for sync operations +#[derive(Debug)] +pub struct SyncMetrics { + /// State transition tracking + pub state: SyncStateMetrics, + + /// Operation counters + pub operations: OperationMetrics, + + /// Data volume tracking + pub data_volume: DataVolumeMetrics, + + /// Performance metrics + pub performance: PerformanceMetrics, + + /// Error tracking + pub errors: ErrorMetrics, +} + +impl Default for SyncMetrics { + fn default() -> Self { + Self { + state: SyncStateMetrics::default(), + operations: OperationMetrics::default(), + data_volume: DataVolumeMetrics::default(), + performance: PerformanceMetrics::default(), + errors: ErrorMetrics::default(), + } + } +} + +/// Sync state transition tracking +#[derive(Debug)] +pub struct SyncStateMetrics { + /// Current sync state + pub current_state: Arc>, + + /// When current state was entered + pub state_entered_at: Arc>>, + + /// State transition history (last N transitions) + pub state_history: Arc>>, + + /// Total time spent in each state + pub total_time_in_state: Arc>>, + + /// Transition counts between states + pub transition_count: Arc>>, +} + +impl Default for SyncStateMetrics { + fn default() -> Self { + Self { + current_state: Arc::new(RwLock::new(DeviceSyncState::Uninitialized)), + state_entered_at: Arc::new(RwLock::new(Utc::now())), + state_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))), + total_time_in_state: Arc::new(RwLock::new(HashMap::new())), + transition_count: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +/// State transition event +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateTransition { + pub from: DeviceSyncState, + pub to: DeviceSyncState, + pub timestamp: DateTime, + pub reason: Option, +} + +/// Operation counters for sync activities +#[derive(Debug)] +pub struct OperationMetrics { + // Broadcasts + pub broadcasts_sent: AtomicU64, + pub state_changes_broadcast: AtomicU64, + pub shared_changes_broadcast: AtomicU64, + pub broadcast_batches_sent: AtomicU64, + pub failed_broadcasts: AtomicU64, + + // Receives + pub changes_received: AtomicU64, + pub changes_applied: AtomicU64, + pub changes_rejected: AtomicU64, + pub buffer_queue_depth: AtomicU64, + + // Backfill + pub active_backfill_sessions: AtomicU64, + pub backfill_sessions_completed: AtomicU64, + pub backfill_pagination_rounds: AtomicU64, + + // Retries + pub retry_queue_depth: AtomicU64, + pub retry_attempts: AtomicU64, + pub retry_successes: AtomicU64, +} + +impl Default for OperationMetrics { + fn default() -> Self { + Self { + broadcasts_sent: AtomicU64::new(0), + state_changes_broadcast: AtomicU64::new(0), + shared_changes_broadcast: AtomicU64::new(0), + broadcast_batches_sent: AtomicU64::new(0), + failed_broadcasts: AtomicU64::new(0), + changes_received: AtomicU64::new(0), + changes_applied: AtomicU64::new(0), + changes_rejected: AtomicU64::new(0), + buffer_queue_depth: AtomicU64::new(0), + active_backfill_sessions: AtomicU64::new(0), + backfill_sessions_completed: AtomicU64::new(0), + backfill_pagination_rounds: AtomicU64::new(0), + retry_queue_depth: AtomicU64::new(0), + retry_attempts: AtomicU64::new(0), + retry_successes: AtomicU64::new(0), + } + } +} + +/// Data volume tracking metrics +#[derive(Debug)] +pub struct DataVolumeMetrics { + /// Per-model type counters + pub entries_synced: Arc>>, + + /// Per-device metrics + pub entries_by_device: Arc>>, + + /// Bytes transferred + pub bytes_sent: AtomicU64, + pub bytes_received: AtomicU64, + + /// Last sync timestamps + pub last_sync_per_peer: Arc>>>, + pub last_sync_per_model: Arc>>>, +} + +impl Default for DataVolumeMetrics { + fn default() -> Self { + Self { + entries_synced: Arc::new(RwLock::new(HashMap::new())), + entries_by_device: Arc::new(RwLock::new(HashMap::new())), + bytes_sent: AtomicU64::new(0), + bytes_received: AtomicU64::new(0), + last_sync_per_peer: Arc::new(RwLock::new(HashMap::new())), + last_sync_per_model: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +/// Per-device metrics +#[derive(Debug)] +pub struct DeviceMetrics { + pub device_id: Uuid, + pub device_name: String, + pub entries_received: AtomicU64, + pub last_seen: AtomicU64, // Unix timestamp + pub is_online: AtomicBool, +} + +impl DeviceMetrics { + pub fn new(device_id: Uuid, device_name: String) -> Self { + Self { + device_id, + device_name, + entries_received: AtomicU64::new(0), + last_seen: AtomicU64::new(Utc::now().timestamp() as u64), + is_online: AtomicBool::new(true), + } + } +} + +/// Performance metrics with latency tracking +#[derive(Debug)] +pub struct PerformanceMetrics { + /// Latency histograms + pub broadcast_latency_ms: HistogramMetric, + pub apply_latency_ms: HistogramMetric, + pub backfill_request_latency_ms: HistogramMetric, + + /// Watermark tracking + pub state_watermark: AtomicU64, // Unix timestamp + pub shared_watermark: Arc>, // HLC string + pub watermark_lag_ms: Arc>>, // Per-peer lag + + /// HLC drift tracking + pub hlc_physical_drift_ms: AtomicI64, + pub hlc_counter_max: AtomicU64, + + /// Database performance + pub db_query_duration_ms: HistogramMetric, + pub db_query_count: AtomicU64, +} + +impl Default for PerformanceMetrics { + fn default() -> Self { + Self { + broadcast_latency_ms: HistogramMetric::new(), + apply_latency_ms: HistogramMetric::new(), + backfill_request_latency_ms: HistogramMetric::new(), + state_watermark: AtomicU64::new(Utc::now().timestamp() as u64), + shared_watermark: Arc::new(RwLock::new(String::new())), + watermark_lag_ms: Arc::new(RwLock::new(HashMap::new())), + hlc_physical_drift_ms: AtomicI64::new(0), + hlc_counter_max: AtomicU64::new(0), + db_query_duration_ms: HistogramMetric::new(), + db_query_count: AtomicU64::new(0), + } + } +} + +/// Histogram metric for tracking latency distributions +#[derive(Debug)] +pub struct HistogramMetric { + pub count: AtomicU64, + pub sum: AtomicU64, + pub min: AtomicU64, + pub max: AtomicU64, +} + +impl HistogramMetric { + pub fn new() -> Self { + Self { + count: AtomicU64::new(0), + sum: AtomicU64::new(0), + min: AtomicU64::new(u64::MAX), + max: AtomicU64::new(0), + } + } + + pub fn record(&self, value_ms: u64) { + self.count.fetch_add(1, Ordering::Relaxed); + self.sum.fetch_add(value_ms, Ordering::Relaxed); + + // Update min + loop { + let current_min = self.min.load(Ordering::Relaxed); + if value_ms >= current_min { + break; + } + if self.min.compare_exchange_weak(current_min, value_ms, Ordering::Relaxed, Ordering::Relaxed).is_ok() { + break; + } + } + + // Update max + loop { + let current_max = self.max.load(Ordering::Relaxed); + if value_ms <= current_max { + break; + } + if self.max.compare_exchange_weak(current_max, value_ms, Ordering::Relaxed, Ordering::Relaxed).is_ok() { + break; + } + } + } + + pub fn avg(&self) -> f64 { + let count = self.count.load(Ordering::Relaxed); + if count == 0 { + 0.0 + } else { + self.sum.load(Ordering::Relaxed) as f64 / count as f64 + } + } + + pub fn min(&self) -> u64 { + let min = self.min.load(Ordering::Relaxed); + if min == u64::MAX { 0 } else { min } + } + + pub fn max(&self) -> u64 { + self.max.load(Ordering::Relaxed) + } + + pub fn count(&self) -> u64 { + self.count.load(Ordering::Relaxed) + } +} + +/// Error tracking metrics +#[derive(Debug)] +pub struct ErrorMetrics { + /// Error counts by type + pub total_errors: AtomicU64, + pub network_errors: AtomicU64, + pub database_errors: AtomicU64, + pub apply_errors: AtomicU64, + pub validation_errors: AtomicU64, + + /// Recent errors (ring buffer) + pub recent_errors: Arc>>, + + /// Conflict resolution + pub conflicts_detected: AtomicU64, + pub conflicts_resolved_by_hlc: AtomicU64, +} + +impl Default for ErrorMetrics { + fn default() -> Self { + Self { + total_errors: AtomicU64::new(0), + network_errors: AtomicU64::new(0), + database_errors: AtomicU64::new(0), + apply_errors: AtomicU64::new(0), + validation_errors: AtomicU64::new(0), + recent_errors: Arc::new(RwLock::new(VecDeque::with_capacity(100))), + conflicts_detected: AtomicU64::new(0), + conflicts_resolved_by_hlc: AtomicU64::new(0), + } + } +} + +/// Error event for tracking recent errors +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorEvent { + pub timestamp: DateTime, + pub error_type: String, + pub message: String, + pub model_type: Option, + pub device_id: Option, +} + +impl ErrorEvent { + pub fn new(error_type: String, message: String) -> Self { + Self { + timestamp: Utc::now(), + error_type, + message, + model_type: None, + device_id: None, + } + } + + pub fn with_model_type(mut self, model_type: String) -> Self { + self.model_type = Some(model_type); + self + } + + pub fn with_device_id(mut self, device_id: Uuid) -> Self { + self.device_id = Some(device_id); + self + } +} \ No newline at end of file diff --git a/core/src/service/sync/mod.rs b/core/src/service/sync/mod.rs index aed6105f4..84bd49515 100644 --- a/core/src/service/sync/mod.rs +++ b/core/src/service/sync/mod.rs @@ -5,6 +5,7 @@ //! - Log-based sync with HLC for shared resources pub mod backfill; +pub mod metrics; pub mod peer; pub mod protocol_handler; pub mod retry_queue; @@ -29,6 +30,8 @@ use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, warn}; use uuid::Uuid; +pub use metrics::SyncMetricsCollector; + pub use backfill::BackfillManager; pub use protocol_handler::{LogSyncHandler, StateSyncHandler}; @@ -46,6 +49,9 @@ pub struct SyncService { /// Backfill manager for orchestrating initial sync backfill_manager: Arc, + /// Metrics collector for observability + metrics: Arc, + /// Whether the service is running is_running: Arc, @@ -88,8 +94,11 @@ impl SyncService { .map_err(|e| anyhow::anyhow!("Failed to open sync.db: {}", e))?, ); + // Create metrics collector + let metrics = Arc::new(SyncMetricsCollector::new()); + // Create peer sync handler with network transport - let peer_sync = Arc::new(PeerSync::new(library, device_id, peer_log, network, config.clone()).await?); + let peer_sync = Arc::new(PeerSync::new(library, device_id, peer_log, network, config.clone(), metrics.clone()).await?); // Create protocol handlers let state_handler = Arc::new(StateSyncHandler::new(library_id, library.db().clone())); @@ -107,6 +116,7 @@ impl SyncService { state_handler, log_handler, config.clone(), + metrics.clone(), )); info!( @@ -121,6 +131,7 @@ impl SyncService { config, peer_sync, backfill_manager, + metrics, is_running: Arc::new(AtomicBool::new(false)), shutdown_tx: Arc::new(Mutex::new(None)), }) @@ -141,6 +152,26 @@ impl SyncService { &self.backfill_manager } + /// Get the metrics collector + pub fn metrics(&self) -> &Arc { + &self.metrics + } + + /// Emit metrics update event + pub async fn emit_metrics_event(&self, library_id: Uuid) { + // Create a snapshot of current metrics + let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(&self.metrics).await; + + // Emit event + self.peer_sync.event_bus().emit(crate::infra::event::Event::Custom { + event_type: "sync:metrics_updated".to_string(), + data: serde_json::json!({ + "library_id": library_id, + "metrics": snapshot + }), + }); + } + /// Main sync loop (spawned as background task) /// /// This is the orchestration layer that: @@ -451,6 +482,14 @@ impl crate::service::Service for SyncService { Self::run_pruning_task(config_clone, peer_sync_clone).await; }); + // Spawn metrics persistence task (runs every 5 minutes) + let metrics = self.metrics.clone(); + let library_id = library_id; + let db = self.peer_sync.db().clone(); + tokio::spawn(async move { + Self::run_metrics_persistence_task(metrics, library_id, db).await; + }); + info!("Peer sync service started (with pruning task)"); Ok(()) @@ -478,4 +517,35 @@ impl crate::service::Service for SyncService { Ok(()) } + + /// Background task for persisting metrics snapshots + async fn run_metrics_persistence_task( + metrics: Arc, + library_id: Uuid, + db: Arc, + ) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // 5 minutes + + info!("Starting metrics persistence task (interval: 5m)"); + + loop { + interval.tick().await; + + // Create snapshot + let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(&metrics).await; + + // Store in database + if let Err(e) = crate::service::sync::metrics::persistence::store_metrics_snapshot( + &db, + library_id, + snapshot, + ).await { + warn!( + library_id = %library_id, + error = %e, + "Failed to persist metrics snapshot" + ); + } + } + } } diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index 303a1ba8a..ba7c6013c 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -147,6 +147,9 @@ pub struct PeerSync { Option>, >, >, + + /// Metrics collector for observability + metrics: Arc, } impl PeerSync { @@ -157,6 +160,7 @@ impl PeerSync { peer_log: Arc, network: Arc, config: Arc, + metrics: Arc, ) -> Result { let library_id = library.id(); @@ -180,6 +184,7 @@ impl PeerSync { retry_queue: Arc::new(RetryQueue::new()), is_running: Arc::new(AtomicBool::new(false)), network_events: Arc::new(tokio::sync::Mutex::new(None)), + metrics, }) } @@ -1404,6 +1409,9 @@ impl PeerSync { "Broadcasting state change to sync partners" ); + // Record start time for latency tracking + let start_time = std::time::Instant::now(); + // Broadcast to all partners in parallel using futures::join_all use futures::future::join_all; @@ -1455,6 +1463,18 @@ impl PeerSync { } } + // Record metrics + self.metrics.record_broadcast(true, None); + if error_count > 0 { + for _ in 0..error_count { + self.metrics.record_failed_broadcast(); + } + } + + // Record latency + let latency_ms = start_time.elapsed().as_millis() as u64; + self.metrics.record_broadcast_latency(latency_ms); + info!( model_type = %change.model_type, success = success_count, @@ -1541,6 +1561,9 @@ impl PeerSync { "Broadcasting shared change to sync partners" ); + // Record start time for latency tracking + let start_time = std::time::Instant::now(); + // Broadcast to all partners in parallel using futures::join_all use futures::future::join_all; @@ -1592,6 +1615,18 @@ impl PeerSync { } } + // Record metrics + self.metrics.record_broadcast(false, None); + if error_count > 0 { + for _ in 0..error_count { + self.metrics.record_failed_broadcast(); + } + } + + // Record latency + let latency_ms = start_time.elapsed().as_millis() as u64; + self.metrics.record_broadcast_latency(latency_ms); + info!( hlc = %hlc, model_type = %model_type, @@ -1605,6 +1640,9 @@ impl PeerSync { /// Handle received state change pub async fn on_state_change_received(&self, change: StateChangeMessage) -> Result<()> { + // Record metrics + self.metrics.record_changes_received(1); + let state = self.state().await; if state.should_buffer() { @@ -1622,6 +1660,9 @@ impl PeerSync { /// Handle received shared change pub async fn on_shared_change_received(&self, entry: SharedChangeEntry) -> Result<()> { + // Record metrics + self.metrics.record_changes_received(1); + // Update causality self.hlc_generator.lock().await.update(entry.hlc); @@ -1646,6 +1687,9 @@ impl PeerSync { /// Apply state change to database async fn apply_state_change(&self, change: StateChangeMessage) -> Result<()> { + // Record start time for latency tracking + let start_time = std::time::Instant::now(); + debug!( model_type = %change.model_type, record_uuid = %change.record_uuid, @@ -1660,7 +1704,27 @@ impl PeerSync { self.db.clone(), ) .await - .map_err(|e| anyhow::anyhow!("Failed to apply state change: {}", e))?; + .map_err(|e| { + // Record error metrics (spawn async task) + let metrics = self.metrics.clone(); + let model_type = change.model_type.clone(); + let error_msg = format!("Failed to apply state change: {}", e); + tokio::spawn(async move { + let _ = metrics.record_error( + super::metrics::ErrorEvent::new("apply".to_string(), error_msg) + .with_model_type(model_type) + ).await; + }); + anyhow::anyhow!("Failed to apply state change: {}", e) + })?; + + // Record metrics + self.metrics.record_changes_applied(1); + self.metrics.record_entries_synced(&change.model_type, 1).await; + + // Record latency + let latency_ms = start_time.elapsed().as_millis() as u64; + self.metrics.record_apply_latency(latency_ms); info!( model_type = %change.model_type, @@ -1686,6 +1750,9 @@ impl PeerSync { /// Apply shared change to database with conflict resolution async fn apply_shared_change(&self, entry: SharedChangeEntry) -> Result<()> { + // Record start time for latency tracking + let start_time = std::time::Instant::now(); + debug!( hlc = %entry.hlc, model_type = %entry.model_type, @@ -1713,7 +1780,19 @@ impl PeerSync { // Use the registry to route to the appropriate apply function crate::infra::sync::apply_shared_change(entry.clone(), self.db.clone()) .await - .map_err(|e| anyhow::anyhow!("Failed to apply shared change: {}", e))?; + .map_err(|e| { + // Record error metrics (spawn async task) + let metrics = self.metrics.clone(); + let model_type = entry.model_type.clone(); + let error_msg = format!("Failed to apply shared change: {}", e); + tokio::spawn(async move { + let _ = metrics.record_error( + super::metrics::ErrorEvent::new("apply".to_string(), error_msg) + .with_model_type(model_type) + ).await; + }); + anyhow::anyhow!("Failed to apply shared change: {}", e) + })?; // Record this change in our peer log (track what we've applied) self.peer_log @@ -1721,6 +1800,14 @@ impl PeerSync { .await .map_err(|e| anyhow::anyhow!("Failed to append to peer log: {}", e))?; + // Record metrics + self.metrics.record_changes_applied(1); + self.metrics.record_entries_synced(&entry.model_type, 1).await; + + // Record latency + let latency_ms = start_time.elapsed().as_millis() as u64; + self.metrics.record_apply_latency(latency_ms); + info!( hlc = %entry.hlc, model_type = %entry.model_type, @@ -2047,6 +2134,9 @@ impl PeerSync { }; } + // Record state transition + self.metrics.record_state_transition(current_state, DeviceSyncState::CatchingUp { buffered_count: 0 }, Some("transitioning to ready".to_string())).await?; + // Process buffer while let Some(update) = self.buffer.pop_ordered().await { match update { @@ -2065,6 +2155,9 @@ impl PeerSync { *state = DeviceSyncState::Ready; } + // Record state transition + self.metrics.record_state_transition(DeviceSyncState::CatchingUp { buffered_count: 0 }, DeviceSyncState::Ready, Some("buffered updates processed".to_string())).await?; + info!("Sync service is now ready"); // Emit event @@ -2078,4 +2171,9 @@ impl PeerSync { Ok(()) } + + /// Get the event bus + pub fn event_bus(&self) -> &Arc { + &self.event_bus + } } diff --git a/core/src/service/sync/state.rs b/core/src/service/sync/state.rs index 72e0d30cf..f73848f78 100644 --- a/core/src/service/sync/state.rs +++ b/core/src/service/sync/state.rs @@ -11,7 +11,7 @@ use tracing::warn; use uuid::Uuid; /// Device sync state for state machine -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum DeviceSyncState { /// Not yet synced, no backfill started Uninitialized,