Refactor sync metrics collection and persistence

Co-authored-by: ijamespine <ijamespine@me.com>
This commit is contained in:
Cursor Agent
2025-10-24 04:26:14 +00:00
parent facf10b299
commit d6941aaa14
5 changed files with 82 additions and 44 deletions

View File

@@ -1,25 +1,50 @@
//! Get sync metrics action
use crate::ops::{LibraryQuery, LibraryQueryContext};
use crate::infra::query::{LibraryQuery, QueryError, QueryResult};
use crate::service::sync::metrics::snapshot::SyncMetricsSnapshot;
use crate::context::CoreContext;
use anyhow::Result;
use std::sync::Arc;
use super::{GetSyncMetricsInput, GetSyncMetricsOutput};
/// Get sync metrics for the current library
pub struct GetSyncMetrics;
pub struct GetSyncMetrics {
pub input: GetSyncMetricsInput,
}
impl LibraryQuery for GetSyncMetrics {
type Input = GetSyncMetricsInput;
type Output = GetSyncMetricsOutput;
async fn execute(input: Self::Input, ctx: LibraryQueryContext) -> Result<Self::Output> {
// Get the sync service from the library
let sync_service = ctx.library.sync_service().await?;
fn from_input(input: Self::Input) -> QueryResult<Self> {
Ok(Self { input })
}
async fn execute(
self,
context: Arc<CoreContext>,
session: crate::infra::api::SessionContext,
) -> QueryResult<Self::Output> {
// Use the input from the query
let input = self.input;
// Get the specific library from the library manager
let library_id = session
.current_library_id
.ok_or_else(|| QueryError::Internal("No library in session".to_string()))?;
let library = context
.libraries()
.await
.get_library(library_id)
.await
.ok_or_else(|| QueryError::LibraryNotFound(library_id))?;
let sync_service = library.sync_service()
.ok_or_else(|| QueryError::Internal("Sync service not available".to_string()))?;
let metrics = sync_service.metrics();
// Create a snapshot of current metrics
let mut snapshot = SyncMetricsSnapshot::from_metrics(metrics).await;
let mut snapshot = SyncMetricsSnapshot::from_metrics(metrics.metrics()).await;
// Apply filters
if let Some(since) = input.since {

View File

@@ -29,6 +29,11 @@ impl SyncMetricsCollector {
self.max_history_size = size;
self
}
/// Get the underlying metrics
pub fn metrics(&self) -> &Arc<SyncMetrics> {
&self.metrics
}
/// Get reference to metrics
pub fn metrics(&self) -> &Arc<SyncMetrics> {
@@ -83,10 +88,16 @@ impl SyncMetricsCollector {
// Update time in previous state
{
let mut state_entered_at = self.metrics.state.state_entered_at.write().await;
let mut total_time = self.metrics.state.total_time_in_state.write().await;
let duration = now.signed_duration_since(*self.metrics.state.state_entered_at.read().await);
// Calculate duration BEFORE updating the entry time
let duration = now.signed_duration_since(*state_entered_at);
*total_time.entry(from).or_insert(std::time::Duration::ZERO) +=
std::time::Duration::from_millis(duration.num_milliseconds().max(0) as u64);
// Update entry time for new state AFTER calculating duration
*state_entered_at = now;
}
debug!(

View File

@@ -31,7 +31,7 @@ pub struct SyncMetricsSnapshot {
}
/// State metrics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncStateSnapshot {
pub current_state: DeviceSyncState,
pub state_entered_at: DateTime<Utc>,
@@ -42,7 +42,7 @@ pub struct SyncStateSnapshot {
}
/// Operation metrics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OperationSnapshot {
// Broadcasts
pub broadcasts_sent: u64,
@@ -69,7 +69,7 @@ pub struct OperationSnapshot {
}
/// Data volume metrics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DataVolumeSnapshot {
pub entries_synced: HashMap<String, u64>,
pub entries_by_device: HashMap<Uuid, DeviceMetricsSnapshot>,
@@ -90,7 +90,7 @@ pub struct DeviceMetricsSnapshot {
}
/// Performance metrics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PerformanceSnapshot {
pub broadcast_latency: LatencySnapshot,
pub apply_latency: LatencySnapshot,
@@ -114,7 +114,7 @@ pub struct LatencySnapshot {
}
/// Error metrics snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ErrorSnapshot {
pub total_errors: u64,
pub network_errors: u64,
@@ -281,7 +281,7 @@ impl SyncMetricsSnapshot {
self.data_volume.last_sync_per_model.retain(|model, _| model == model_type);
// Filter recent errors
self.errors.recent_errors.retain(|error| error.model_type.as_ref() == Some(&model_type.to_string()));
self.errors.recent_errors.retain(|error| error.model_type.as_ref() == Some(model_type));
}
}

View File

@@ -160,7 +160,7 @@ impl SyncService {
/// Emit metrics update event
pub async fn emit_metrics_event(&self, library_id: Uuid) {
// Create a snapshot of current metrics
let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(&self.metrics).await;
let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(self.metrics.metrics()).await;
// Emit event
self.peer_sync.event_bus().emit(crate::infra::event::Event::Custom {
@@ -484,10 +484,10 @@ impl crate::service::Service for SyncService {
// Spawn metrics persistence task (runs every 5 minutes)
let metrics = self.metrics.clone();
let library_id = library_id;
let library_id = self.library_id;
let db = self.peer_sync.db().clone();
tokio::spawn(async move {
Self::run_metrics_persistence_task(metrics, library_id, db).await;
run_metrics_persistence_task(metrics, library_id, db).await;
});
info!("Peer sync service started (with pruning task)");
@@ -517,35 +517,35 @@ impl crate::service::Service for SyncService {
Ok(())
}
}
/// Background task for persisting metrics snapshots
async fn run_metrics_persistence_task(
metrics: Arc<SyncMetricsCollector>,
library_id: Uuid,
db: Arc<sea_orm::DatabaseConnection>,
) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // 5 minutes
/// Background task for persisting metrics snapshots
async fn run_metrics_persistence_task(
metrics: Arc<SyncMetricsCollector>,
library_id: Uuid,
db: Arc<sea_orm::DatabaseConnection>,
) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // 5 minutes
info!("Starting metrics persistence task (interval: 5m)");
loop {
interval.tick().await;
info!("Starting metrics persistence task (interval: 5m)");
// Create snapshot
let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(metrics.metrics()).await;
loop {
interval.tick().await;
// Create snapshot
let snapshot = crate::service::sync::metrics::snapshot::SyncMetricsSnapshot::from_metrics(&metrics).await;
// Store in database
if let Err(e) = crate::service::sync::metrics::persistence::store_metrics_snapshot(
&db,
library_id,
snapshot,
).await {
warn!(
library_id = %library_id,
error = %e,
"Failed to persist metrics snapshot"
);
}
// Store in database
if let Err(e) = crate::service::sync::metrics::persistence::store_metrics_snapshot(
&db,
library_id,
snapshot,
).await {
warn!(
library_id = %library_id,
error = %e,
"Failed to persist metrics snapshot"
);
}
}
}

View File

@@ -2135,7 +2135,8 @@ impl PeerSync {
}
// Record state transition
self.metrics.record_state_transition(current_state, DeviceSyncState::CatchingUp { buffered_count: 0 }, Some("transitioning to ready".to_string())).await?;
let buffered_count = self.buffer.len().await;
self.metrics.record_state_transition(current_state, DeviceSyncState::CatchingUp { buffered_count }, Some("transitioning to ready".to_string())).await?;
// Process buffer
while let Some(update) = self.buffer.pop_ordered().await {
@@ -2156,7 +2157,8 @@ impl PeerSync {
}
// Record state transition
self.metrics.record_state_transition(DeviceSyncState::CatchingUp { buffered_count: 0 }, DeviceSyncState::Ready, Some("buffered updates processed".to_string())).await?;
let buffered_count = self.buffer.len().await;
self.metrics.record_state_transition(DeviceSyncState::CatchingUp { buffered_count }, DeviceSyncState::Ready, Some("buffered updates processed".to_string())).await?;
info!("Sync service is now ready");