feat: add optional library method in IndexingCtx and methods to update watermarks in PeerSync

This commit is contained in:
Jamie Pine
2025-10-20 18:32:39 -07:00
parent 123a81db80
commit e4d99f1fab
2 changed files with 121 additions and 0 deletions

View File

@@ -15,6 +15,11 @@ pub trait IndexingCtx {
/// Access to the library database connection
fn library_db(&self) -> &DatabaseConnection;
/// Access to the library for sync operations (optional - only available in job context)
fn library(&self) -> Option<&Library> {
None
}
/// Lightweight logging hook
fn log(&self, message: impl AsRef<str>) {
tracing::debug!(message = %message.as_ref());
@@ -25,6 +30,10 @@ impl<'a> IndexingCtx for JobContext<'a> {
fn library_db(&self) -> &DatabaseConnection {
self.library_db()
}
fn library(&self) -> Option<&Library> {
Some(self.library())
}
}
/// Context for responder paths running outside the job system

View File

@@ -285,6 +285,112 @@ impl PeerSync {
Self::query_device_watermarks(self.device_id, self.db.as_ref()).await
}
/// Update state watermark for a device after processing state changes
async fn update_state_watermark(&self, device_id: Uuid, timestamp: chrono::DateTime<chrono::Utc>) -> Result<()> {
use crate::infra::db::entities;
use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter};
// Find device and update watermark
let device = entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device_id))
.one(self.db.as_ref())
.await
.map_err(|e| anyhow::anyhow!("Failed to query device: {}", e))?
.ok_or_else(|| anyhow::anyhow!("Device not found: {}", device_id))?;
let mut device_active: entities::device::ActiveModel = device.into();
// Update watermark if this timestamp is newer
match &device_active.last_state_watermark {
Set(Some(current)) | sea_orm::ActiveValue::Unchanged(Some(current)) if timestamp <= *current => {
// Don't update if we already have a newer watermark
return Ok(());
}
_ => {
device_active.last_state_watermark = Set(Some(timestamp));
}
}
device_active.update(self.db.as_ref()).await
.map_err(|e| anyhow::anyhow!("Failed to update state watermark: {}", e))?;
debug!("Updated state watermark for device {} to {}", device_id, timestamp);
Ok(())
}
/// Update shared watermark for this device after processing shared changes
async fn update_shared_watermark(&self, hlc: HLC) -> Result<()> {
use crate::infra::db::entities;
use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter};
// Find device and update watermark
let device = entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(self.device_id))
.one(self.db.as_ref())
.await
.map_err(|e| anyhow::anyhow!("Failed to query device: {}", e))?
.ok_or_else(|| anyhow::anyhow!("Device not found: {}", self.device_id))?;
let mut device_active: entities::device::ActiveModel = device.into();
// Serialize HLC to JSON
let hlc_json = serde_json::to_string(&hlc)
.map_err(|e| anyhow::anyhow!("Failed to serialize HLC: {}", e))?;
// Update watermark if this HLC is newer
let should_update = match &device_active.last_shared_watermark {
Set(Some(current_json)) | sea_orm::ActiveValue::Unchanged(Some(current_json)) => {
if let Ok(current_hlc) = serde_json::from_str::<HLC>(current_json) {
hlc > current_hlc
} else {
true // Invalid JSON, update anyway
}
}
_ => true // No watermark set yet
};
if should_update {
device_active.last_shared_watermark = Set(Some(hlc_json));
device_active.update(self.db.as_ref()).await
.map_err(|e| anyhow::anyhow!("Failed to update shared watermark: {}", e))?;
debug!("Updated shared watermark for device {} to {}", self.device_id, hlc);
}
Ok(())
}
/// Set initial watermarks after backfill (called by backfill manager)
pub async fn set_initial_watermarks(&self) -> Result<()> {
use crate::infra::db::entities;
use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter};
// Get current time as initial state watermark
let now = chrono::Utc::now();
// Get current HLC from generator as shared watermark
let current_hlc = self.hlc_generator.lock().await.next();
// Update this device's watermarks
let device = entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(self.device_id))
.one(self.db.as_ref())
.await
.map_err(|e| anyhow::anyhow!("Failed to query device: {}", e))?
.ok_or_else(|| anyhow::anyhow!("Device not found: {}", self.device_id))?;
let mut device_active: entities::device::ActiveModel = device.into();
device_active.last_state_watermark = Set(Some(now));
device_active.last_shared_watermark = Set(Some(
serde_json::to_string(&current_hlc).unwrap_or_default()
));
device_active.update(self.db.as_ref()).await
.map_err(|e| anyhow::anyhow!("Failed to set initial watermarks: {}", e))?;
info!("Set initial watermarks: state={}, shared={}", now, current_hlc);
Ok(())
}
/// Exchange watermarks with a peer and trigger catch-up if needed
///
/// Sends a WatermarkExchangeRequest to the peer with our watermarks.
@@ -1497,6 +1603,9 @@ impl PeerSync {
"State change applied successfully"
);
// Update state watermark for the device
self.update_state_watermark(change.device_id, change.timestamp).await?;
// Emit event
self.event_bus.emit(Event::Custom {
event_type: format!("{}_synced", change.model_type),
@@ -1554,6 +1663,9 @@ impl PeerSync {
"Shared change applied successfully"
);
// Update shared watermark
self.update_shared_watermark(entry.hlc).await?;
// Send ACK to sender for pruning
let sender_device_id = entry.hlc.device_id;
let up_to_hlc = entry.hlc;