From e4d99f1faba775ca875061a656bdeb0a56647580 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 20 Oct 2025 18:32:39 -0700 Subject: [PATCH] feat: add optional library method in IndexingCtx and methods to update watermarks in PeerSync --- core/src/ops/indexing/ctx.rs | 9 +++ core/src/service/sync/peer.rs | 112 ++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/core/src/ops/indexing/ctx.rs b/core/src/ops/indexing/ctx.rs index 172651fb6..ed8aa82ed 100644 --- a/core/src/ops/indexing/ctx.rs +++ b/core/src/ops/indexing/ctx.rs @@ -15,6 +15,11 @@ pub trait IndexingCtx { /// Access to the library database connection fn library_db(&self) -> &DatabaseConnection; + /// Access to the library for sync operations (optional - only available in job context) + fn library(&self) -> Option<&Library> { + None + } + /// Lightweight logging hook fn log(&self, message: impl AsRef) { tracing::debug!(message = %message.as_ref()); @@ -25,6 +30,10 @@ impl<'a> IndexingCtx for JobContext<'a> { fn library_db(&self) -> &DatabaseConnection { self.library_db() } + + fn library(&self) -> Option<&Library> { + Some(self.library()) + } } /// Context for responder paths running outside the job system diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index c8d0445bd..fc1a25c77 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -285,6 +285,112 @@ impl PeerSync { Self::query_device_watermarks(self.device_id, self.db.as_ref()).await } + /// Update state watermark for a device after processing state changes + async fn update_state_watermark(&self, device_id: Uuid, timestamp: chrono::DateTime) -> Result<()> { + use crate::infra::db::entities; + use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter}; + + // Find device and update watermark + let device = entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(device_id)) + .one(self.db.as_ref()) + .await + .map_err(|e| anyhow::anyhow!("Failed to query device: {}", e))? + .ok_or_else(|| anyhow::anyhow!("Device not found: {}", device_id))?; + + let mut device_active: entities::device::ActiveModel = device.into(); + + // Update watermark if this timestamp is newer + match &device_active.last_state_watermark { + Set(Some(current)) | sea_orm::ActiveValue::Unchanged(Some(current)) if timestamp <= *current => { + // Don't update if we already have a newer watermark + return Ok(()); + } + _ => { + device_active.last_state_watermark = Set(Some(timestamp)); + } + } + + device_active.update(self.db.as_ref()).await + .map_err(|e| anyhow::anyhow!("Failed to update state watermark: {}", e))?; + + debug!("Updated state watermark for device {} to {}", device_id, timestamp); + Ok(()) + } + + /// Update shared watermark for this device after processing shared changes + async fn update_shared_watermark(&self, hlc: HLC) -> Result<()> { + use crate::infra::db::entities; + use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter}; + + // Find device and update watermark + let device = entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(self.device_id)) + .one(self.db.as_ref()) + .await + .map_err(|e| anyhow::anyhow!("Failed to query device: {}", e))? + .ok_or_else(|| anyhow::anyhow!("Device not found: {}", self.device_id))?; + + let mut device_active: entities::device::ActiveModel = device.into(); + + // Serialize HLC to JSON + let hlc_json = serde_json::to_string(&hlc) + .map_err(|e| anyhow::anyhow!("Failed to serialize HLC: {}", e))?; + + // Update watermark if this HLC is newer + let should_update = match &device_active.last_shared_watermark { + Set(Some(current_json)) | sea_orm::ActiveValue::Unchanged(Some(current_json)) => { + if let Ok(current_hlc) = serde_json::from_str::(current_json) { + hlc > current_hlc + } else { + true // Invalid JSON, update anyway + } + } + _ => true // No watermark set yet + }; + + if should_update { + device_active.last_shared_watermark = Set(Some(hlc_json)); + device_active.update(self.db.as_ref()).await + .map_err(|e| anyhow::anyhow!("Failed to update shared watermark: {}", e))?; + debug!("Updated shared watermark for device {} to {}", self.device_id, hlc); + } + + Ok(()) + } + + /// Set initial watermarks after backfill (called by backfill manager) + pub async fn set_initial_watermarks(&self) -> Result<()> { + use crate::infra::db::entities; + use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter}; + + // Get current time as initial state watermark + let now = chrono::Utc::now(); + + // Get current HLC from generator as shared watermark + let current_hlc = self.hlc_generator.lock().await.next(); + + // Update this device's watermarks + let device = entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(self.device_id)) + .one(self.db.as_ref()) + .await + .map_err(|e| anyhow::anyhow!("Failed to query device: {}", e))? + .ok_or_else(|| anyhow::anyhow!("Device not found: {}", self.device_id))?; + + let mut device_active: entities::device::ActiveModel = device.into(); + device_active.last_state_watermark = Set(Some(now)); + device_active.last_shared_watermark = Set(Some( + serde_json::to_string(¤t_hlc).unwrap_or_default() + )); + + device_active.update(self.db.as_ref()).await + .map_err(|e| anyhow::anyhow!("Failed to set initial watermarks: {}", e))?; + + info!("Set initial watermarks: state={}, shared={}", now, current_hlc); + Ok(()) + } + /// Exchange watermarks with a peer and trigger catch-up if needed /// /// Sends a WatermarkExchangeRequest to the peer with our watermarks. @@ -1497,6 +1603,9 @@ impl PeerSync { "State change applied successfully" ); + // Update state watermark for the device + self.update_state_watermark(change.device_id, change.timestamp).await?; + // Emit event self.event_bus.emit(Event::Custom { event_type: format!("{}_synced", change.model_type), @@ -1554,6 +1663,9 @@ impl PeerSync { "Shared change applied successfully" ); + // Update shared watermark + self.update_shared_watermark(entry.hlc).await?; + // Send ACK to sender for pruning let sender_device_id = entry.hlc.device_id; let up_to_hlc = entry.hlc;