diff --git a/core/src/ops/sync/get_metrics/action.rs b/core/src/ops/sync/get_metrics/action.rs index e36bfa034..b2039d56e 100644 --- a/core/src/ops/sync/get_metrics/action.rs +++ b/core/src/ops/sync/get_metrics/action.rs @@ -1,25 +1,50 @@ //! Get sync metrics action -use crate::ops::{LibraryQuery, LibraryQueryContext}; +use crate::infra::query::{LibraryQuery, QueryError, QueryResult}; use crate::service::sync::metrics::snapshot::SyncMetricsSnapshot; +use crate::context::CoreContext; use anyhow::Result; +use std::sync::Arc; use super::{GetSyncMetricsInput, GetSyncMetricsOutput}; /// Get sync metrics for the current library -pub struct GetSyncMetrics; +pub struct GetSyncMetrics { + pub input: GetSyncMetricsInput, +} impl LibraryQuery for GetSyncMetrics { type Input = GetSyncMetricsInput; type Output = GetSyncMetricsOutput; - async fn execute(input: Self::Input, ctx: LibraryQueryContext) -> Result { - // Get the sync service from the library - let sync_service = ctx.library.sync_service().await?; + fn from_input(input: Self::Input) -> QueryResult { + Ok(Self { input }) + } + + async fn execute( + self, + context: Arc, + session: crate::infra::api::SessionContext, + ) -> QueryResult { + // Use the input from the query + let input = self.input; + // Get the specific library from the library manager + let library_id = session + .current_library_id + .ok_or_else(|| QueryError::Internal("No library in session".to_string()))?; + let library = context + .libraries() + .await + .get_library(library_id) + .await + .ok_or_else(|| QueryError::LibraryNotFound(library_id))?; + + let sync_service = library.sync_service() + .ok_or_else(|| QueryError::Internal("Sync service not available".to_string()))?; let metrics = sync_service.metrics(); // Create a snapshot of current metrics - let mut snapshot = SyncMetricsSnapshot::from_metrics(metrics).await; + let mut snapshot = SyncMetricsSnapshot::from_metrics(metrics.metrics()).await; // Apply filters if let Some(since) = input.since { diff --git a/core/src/service/sync/metrics/collector.rs b/core/src/service/sync/metrics/collector.rs index 9fa02e7dd..3a6fd4d7c 100644 --- a/core/src/service/sync/metrics/collector.rs +++ b/core/src/service/sync/metrics/collector.rs @@ -29,6 +29,11 @@ impl SyncMetricsCollector { self.max_history_size = size; self } + + /// Get the underlying metrics + pub fn metrics(&self) -> &Arc { + &self.metrics + } /// Get reference to metrics pub fn metrics(&self) -> &Arc { @@ -83,10 +88,16 @@ impl SyncMetricsCollector { // Update time in previous state { + let mut state_entered_at = self.metrics.state.state_entered_at.write().await; let mut total_time = self.metrics.state.total_time_in_state.write().await; - let duration = now.signed_duration_since(*self.metrics.state.state_entered_at.read().await); + + // Calculate duration BEFORE updating the entry time + let duration = now.signed_duration_since(*state_entered_at); *total_time.entry(from).or_insert(std::time::Duration::ZERO) += std::time::Duration::from_millis(duration.num_milliseconds().max(0) as u64); + + // Update entry time for new state AFTER calculating duration + *state_entered_at = now; } debug!( diff --git a/core/src/service/sync/metrics/snapshot.rs b/core/src/service/sync/metrics/snapshot.rs index 642346ca0..9cd651b41 100644 --- a/core/src/service/sync/metrics/snapshot.rs +++ b/core/src/service/sync/metrics/snapshot.rs @@ -31,7 +31,7 @@ pub struct SyncMetricsSnapshot { } /// State metrics snapshot -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct SyncStateSnapshot { pub current_state: DeviceSyncState, pub state_entered_at: DateTime, @@ -42,7 +42,7 @@ pub struct SyncStateSnapshot { } /// Operation metrics snapshot -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct OperationSnapshot { // Broadcasts pub broadcasts_sent: u64, @@ -69,7 +69,7 @@ pub struct OperationSnapshot { } /// Data volume metrics snapshot -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct DataVolumeSnapshot { pub entries_synced: HashMap, pub entries_by_device: HashMap, @@ -90,7 +90,7 @@ pub struct DeviceMetricsSnapshot { } /// Performance metrics snapshot -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct PerformanceSnapshot { pub broadcast_latency: LatencySnapshot, pub apply_latency: LatencySnapshot, @@ -114,7 +114,7 @@ pub struct LatencySnapshot { } /// Error metrics snapshot -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ErrorSnapshot { pub total_errors: u64, pub network_errors: u64, @@ -281,7 +281,7 @@ impl SyncMetricsSnapshot { self.data_volume.last_sync_per_model.retain(|model, _| model == model_type); // Filter recent errors - self.errors.recent_errors.retain(|error| error.model_type.as_ref() == Some(&model_type.to_string())); + self.errors.recent_errors.retain(|error| error.model_type.as_ref() == Some(model_type)); } } diff --git a/core/src/service/sync/mod.rs b/core/src/service/sync/mod.rs index 84bd49515..804ef3c10 100644 --- a/core/src/service/sync/mod.rs +++ b/core/src/service/sync/mod.rs @@ -160,7 +160,7 @@ impl SyncService { /// Emit metrics update event pub async fn emit_metrics_event(&self, library_id: Uuid) { // Create a snapshot of current metrics - let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(&self.metrics).await; + let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(self.metrics.metrics()).await; // Emit event self.peer_sync.event_bus().emit(crate::infra::event::Event::Custom { @@ -484,10 +484,10 @@ impl crate::service::Service for SyncService { // Spawn metrics persistence task (runs every 5 minutes) let metrics = self.metrics.clone(); - let library_id = library_id; + let library_id = self.library_id; let db = self.peer_sync.db().clone(); tokio::spawn(async move { - Self::run_metrics_persistence_task(metrics, library_id, db).await; + run_metrics_persistence_task(metrics, library_id, db).await; }); info!("Peer sync service started (with pruning task)"); @@ -517,35 +517,35 @@ impl crate::service::Service for SyncService { Ok(()) } +} - /// Background task for persisting metrics snapshots - async fn run_metrics_persistence_task( - metrics: Arc, - library_id: Uuid, - db: Arc, - ) { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // 5 minutes +/// Background task for persisting metrics snapshots +async fn run_metrics_persistence_task( + metrics: Arc, + library_id: Uuid, + db: Arc, +) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // 5 minutes + + info!("Starting metrics persistence task (interval: 5m)"); + + loop { + interval.tick().await; - info!("Starting metrics persistence task (interval: 5m)"); + // Create snapshot + let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(metrics.metrics()).await; - loop { - interval.tick().await; - - // Create snapshot - let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(&metrics).await; - - // Store in database - if let Err(e) = crate::service::sync::metrics::persistence::store_metrics_snapshot( - &db, - library_id, - snapshot, - ).await { - warn!( - library_id = %library_id, - error = %e, - "Failed to persist metrics snapshot" - ); - } + // Store in database + if let Err(e) = crate::service::sync::metrics::persistence::store_metrics_snapshot( + &db, + library_id, + snapshot, + ).await { + warn!( + library_id = %library_id, + error = %e, + "Failed to persist metrics snapshot" + ); } } } diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index ba7c6013c..a07c1f791 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -2135,7 +2135,8 @@ impl PeerSync { } // Record state transition - self.metrics.record_state_transition(current_state, DeviceSyncState::CatchingUp { buffered_count: 0 }, Some("transitioning to ready".to_string())).await?; + let buffered_count = self.buffer.len().await; + self.metrics.record_state_transition(current_state, DeviceSyncState::CatchingUp { buffered_count }, Some("transitioning to ready".to_string())).await?; // Process buffer while let Some(update) = self.buffer.pop_ordered().await { @@ -2156,7 +2157,8 @@ impl PeerSync { } // Record state transition - self.metrics.record_state_transition(DeviceSyncState::CatchingUp { buffered_count: 0 }, DeviceSyncState::Ready, Some("buffered updates processed".to_string())).await?; + let buffered_count = self.buffer.len().await; + self.metrics.record_state_transition(DeviceSyncState::CatchingUp { buffered_count }, DeviceSyncState::Ready, Some("buffered updates processed".to_string())).await?; info!("Sync service is now ready");