feat: enhance synchronization with per-resource watermark tracking and batching configuration

- Added support for per-resource watermarks in the synchronization process, allowing for fine-grained comparison and catch-up logic.
- Implemented new configuration options for real-time batching, including maximum entries before flush and flush interval in milliseconds.
- Enhanced the `apply_state_change` function to check for tombstoned records, preventing the resurrection of deleted entries.
- Updated various components to utilize the new watermark and batching features, improving synchronization efficiency and robustness.
This commit is contained in:
Jamie Pine
2025-11-16 04:56:41 -08:00
parent d93d1efaaa
commit a33ca1e884
12 changed files with 420 additions and 123 deletions

View File

@@ -75,6 +75,7 @@ Syncable
thumbstrips
tobiaslutke
tokio
tombstoned
typecheck
unwatch
uuid

View File

@@ -48,6 +48,8 @@ impl SyncConfig {
state_broadcast_batch_size: 500,
shared_broadcast_batch_size: 50,
max_snapshot_size: 50_000,
realtime_batch_max_entries: 50,
realtime_batch_flush_interval_ms: 25,
},
retention: RetentionConfig {
strategy: PruningStrategy::AcknowledgmentBased,
@@ -83,6 +85,8 @@ impl SyncConfig {
state_broadcast_batch_size: 2_000,
shared_broadcast_batch_size: 200,
max_snapshot_size: 200_000,
realtime_batch_max_entries: 200,
realtime_batch_flush_interval_ms: 100,
},
retention: RetentionConfig {
strategy: PruningStrategy::Conservative {
@@ -120,6 +124,8 @@ impl SyncConfig {
state_broadcast_batch_size: 500,
shared_broadcast_batch_size: 50,
max_snapshot_size: 50_000,
realtime_batch_max_entries: 50,
realtime_batch_flush_interval_ms: 100,
},
retention: RetentionConfig {
strategy: PruningStrategy::TimeBased { retention_days: 14 },
@@ -168,6 +174,18 @@ pub struct BatchingConfig {
/// Used for: SharedChangeResponse.current_state limit
/// Default: 100,000
pub max_snapshot_size: usize,
/// Real-time batching: maximum entries before flush
///
/// Used for: Event listener batching in peer.rs
/// Default: 100
pub realtime_batch_max_entries: usize,
/// Real-time batching: flush interval in milliseconds
///
/// Used for: Event listener batching in peer.rs
/// Default: 50ms
pub realtime_batch_flush_interval_ms: u64,
}
impl Default for BatchingConfig {
@@ -177,6 +195,8 @@ impl Default for BatchingConfig {
state_broadcast_batch_size: 1_000,
shared_broadcast_batch_size: 100,
max_snapshot_size: 100_000,
realtime_batch_max_entries: 100,
realtime_batch_flush_interval_ms: 50,
}
}
}

View File

@@ -260,9 +260,32 @@ impl PeerLog {
/// Get the minimum HLC that all peers have acknowledged
///
/// Excludes self-ACKs (where peer_device_id == our device_id) from calculation.
/// Self-ACKs should never exist, but filtering them defensively prevents stale
/// self-ACKs from blocking pruning.
/// Self-ACKs should never exist - if detected, a warning is logged.
async fn get_min_acked_hlc(&self) -> Result<Option<HLC>, PeerLogError> {
// Check for self-ACKs (should never exist)
let self_ack_check = self
.conn
.query_one(Statement::from_sql_and_values(
DbBackend::Sqlite,
"SELECT COUNT(*) as count FROM peer_acks WHERE peer_device_id = ?",
vec![self.device_id.to_string().into()],
))
.await
.map_err(|e| PeerLogError::QueryError(e.to_string()))?;
if let Some(row) = self_ack_check {
let count: i64 = row
.try_get("", "count")
.map_err(|e| PeerLogError::QueryError(e.to_string()))?;
if count > 0 {
tracing::warn!(
device_id = %self.device_id,
self_ack_count = count,
"Self-ACKs detected in peer_acks table (should never exist). This indicates a bug in ACK handling."
);
}
}
let result = self
.conn
.query_one(Statement::from_sql_and_values(

View File

@@ -31,10 +31,10 @@ pub type SharedApplyFn = fn(
/// Parameters: device_id, since, batch_size, db
/// Returns: Vec of (uuid, data, timestamp)
pub type StateQueryFn = fn(
Option<uuid::Uuid>, // device_id filter
Option<chrono::DateTime<chrono::Utc>>, // since watermark
Option<(chrono::DateTime<chrono::Utc>, uuid::Uuid)>, // cursor for pagination
usize, // batch_size
Option<uuid::Uuid>, // device_id filter
Option<chrono::DateTime<chrono::Utc>>, // since watermark
Option<(chrono::DateTime<chrono::Utc>, uuid::Uuid)>, // cursor for pagination
usize, // batch_size
Arc<DatabaseConnection>,
) -> Pin<
Box<
@@ -156,7 +156,9 @@ pub async fn register_device_owned(
let mut registry = SYNCABLE_REGISTRY.write().await;
registry.insert(
model_type.to_string(),
SyncableModelRegistration::device_owned(model_type, table_name, apply_fn, query_fn, delete_fn),
SyncableModelRegistration::device_owned(
model_type, table_name, apply_fn, query_fn, delete_fn,
),
);
}
@@ -179,8 +181,8 @@ pub async fn register_shared(
/// All domain-specific logic lives in the entity implementations, not here.
fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
use crate::infra::db::entities::{
audit_log, collection, collection_entry, content_identity, device, entry, location, sidecar,
tag, tag_relationship, user_metadata, user_metadata_tag, volume,
audit_log, collection, collection_entry, content_identity, device, entry, location,
sidecar, tag, tag_relationship, user_metadata, user_metadata_tag, volume,
};
let mut registry = HashMap::new();
@@ -199,7 +201,14 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
location::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await
location::Model::query_for_sync(
device_id,
since,
cursor,
batch_size,
db.as_ref(),
)
.await
})
},
Some(|uuid, db| {
@@ -218,7 +227,8 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
volume::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await
volume::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref())
.await
})
},
Some(|uuid, db| {
@@ -237,7 +247,8 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
entry::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await
entry::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref())
.await
})
},
Some(|uuid, db| {
@@ -256,7 +267,8 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
device::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await
device::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref())
.await
})
},
None, // Devices don't support deletion sync
@@ -274,7 +286,8 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
tag::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref()).await
tag::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref())
.await
})
},
),
@@ -286,14 +299,20 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
"collection",
"collection",
|entry, db| {
Box::pin(async move {
collection::Model::apply_shared_change(entry, db.as_ref()).await
})
Box::pin(
async move { collection::Model::apply_shared_change(entry, db.as_ref()).await },
)
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
collection::Model::query_for_sync(device_id, since, cursor, batch_size, db.as_ref())
.await
collection::Model::query_for_sync(
device_id,
since,
cursor,
batch_size,
db.as_ref(),
)
.await
})
},
),
@@ -431,9 +450,9 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
"audit_log",
"audit_log",
|entry, db| {
Box::pin(async move {
audit_log::Model::apply_shared_change(entry, db.as_ref()).await
})
Box::pin(
async move { audit_log::Model::apply_shared_change(entry, db.as_ref()).await },
)
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
@@ -456,9 +475,9 @@ fn initialize_registry() -> HashMap<String, SyncableModelRegistration> {
"sidecar",
"sidecars",
|entry, db| {
Box::pin(async move {
sidecar::Model::apply_shared_change(entry, db.as_ref()).await
})
Box::pin(
async move { sidecar::Model::apply_shared_change(entry, db.as_ref()).await },
)
},
|device_id, since, cursor, batch_size, db| {
Box::pin(async move {
@@ -534,11 +553,48 @@ pub fn get_fk_mappings(model_type: &str) -> Option<Vec<super::FKMapping>> {
/// Apply a state-based sync entry (device-owned model)
///
/// Routes to the appropriate model's apply_state_change function via registry.
/// Enforces tombstone check to prevent resurrection of deleted records.
pub async fn apply_state_change(
model_type: &str,
data: serde_json::Value,
db: Arc<DatabaseConnection>,
) -> Result<(), ApplyError> {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
// Extract UUID from data for tombstone check
let record_uuid = data
.get("uuid")
.and_then(|v| v.as_str())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
.ok_or_else(|| {
ApplyError::DatabaseError(
"Missing or invalid UUID in state change data".to_string(),
)
})?;
// Check if record is tombstoned (prevents resurrection of deleted records)
let is_tombstoned = crate::infra::db::entities::device_state_tombstone::Entity::find()
.filter(
crate::infra::db::entities::device_state_tombstone::Column::ModelType.eq(model_type),
)
.filter(
crate::infra::db::entities::device_state_tombstone::Column::RecordUuid
.eq(record_uuid),
)
.one(db.as_ref())
.await
.map_err(|e| ApplyError::DatabaseError(e.to_string()))?
.is_some();
if is_tombstoned {
tracing::debug!(
model_type = %model_type,
uuid = %record_uuid,
"Skipping state change for tombstoned record (prevents resurrection)"
);
return Ok(());
}
let apply_fn = {
let registry = SYNCABLE_REGISTRY.read().await;
let registration = registry
@@ -681,7 +737,10 @@ pub async fn query_all_shared_models(
since: Option<chrono::DateTime<chrono::Utc>>,
batch_size: usize,
db: Arc<DatabaseConnection>,
) -> Result<HashMap<String, Vec<(uuid::Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)>>, ApplyError> {
) -> Result<
HashMap<String, Vec<(uuid::Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)>>,
ApplyError,
> {
// Collect all shared models with query functions
let shared_models: Vec<(String, StateQueryFn)> = {
let registry = SYNCABLE_REGISTRY.read().await;

View File

@@ -70,7 +70,6 @@ pub struct TransactionManager {
event_bus: Arc<EventBus>,
/// Current sequence number per library (library_id -> sequence)
/// TODO: Replace with HLC in leaderless architecture
sync_sequence: Arc<Mutex<std::collections::HashMap<Uuid, u64>>>,
}
@@ -254,7 +253,6 @@ impl TransactionManager {
}
/// Get the next sequence number for a library
/// TODO: Replace with HLC in leaderless architecture
async fn next_sequence(&self, library_id: Uuid) -> Result<u64> {
let mut sequences = self.sync_sequence.lock().await;
let seq = sequences.entry(library_id).or_insert(0);

View File

@@ -256,6 +256,49 @@ impl ResourceWatermarkStore {
Ok(result.rows_affected() as usize)
}
/// Get our latest watermark for each resource type (aggregated across all peers)
///
/// Returns a HashMap mapping resource_type -> max(last_watermark) across all peers.
/// This represents what we've successfully received from our peers.
///
/// # Errors
/// Returns `WatermarkError::QueryError` if the SQL query or a column read fails,
/// and `WatermarkError::ParseError` if any stored watermark is not valid RFC3339.
/// A single malformed row therefore fails the whole call rather than being skipped.
pub async fn get_our_resource_watermarks<C: ConnectionTrait>(
&self,
conn: &C,
) -> Result<std::collections::HashMap<String, DateTime<Utc>>, WatermarkError> {
// One row per resource_type: MAX(last_watermark) over rows recorded under our
// device_uuid.
// NOTE(review): SQLite's MAX() on a TEXT column compares lexicographically.
// That only equals chronological max if every stored watermark uses the same
// RFC3339 shape (same timezone offset and fractional-second precision) —
// confirm the write path normalizes this (e.g. always UTC, fixed precision).
// NOTE(review): the ORDER BY has no effect on the returned HashMap (unordered);
// presumably kept for deterministic row iteration/debugging — confirm.
let rows = conn
.query_all(Statement::from_sql_and_values(
DbBackend::Sqlite,
r#"
SELECT resource_type, MAX(last_watermark) as max_watermark
FROM device_resource_watermarks
WHERE device_uuid = ?
GROUP BY resource_type
ORDER BY resource_type
"#,
vec![self.device_uuid.to_string().into()],
))
.await
.map_err(|e| WatermarkError::QueryError(e.to_string()))?;
let mut results = std::collections::HashMap::new();
for row in rows {
// Column reads are by alias; any type mismatch surfaces as QueryError.
let resource_type: String = row
.try_get("", "resource_type")
.map_err(|e| WatermarkError::QueryError(e.to_string()))?;
let watermark_str: String = row
.try_get("", "max_watermark")
.map_err(|e| WatermarkError::QueryError(e.to_string()))?;
// Watermarks are stored as RFC3339 text; normalize to UTC for comparison
// against peers' watermarks.
let dt = DateTime::parse_from_rfc3339(&watermark_str)
.map_err(|e| WatermarkError::ParseError(e.to_string()))?
.with_timezone(&Utc);
results.insert(resource_type, dt);
}
Ok(results)
}
}
/// Watermark errors

View File

@@ -246,9 +246,9 @@ impl SyncProtocolHandler {
// Create checkpoint: "timestamp|uuid" format
let next_checkpoint = if has_more {
records.last().map(|r| {
format!("{}|{}", r.timestamp.to_rfc3339(), r.uuid)
})
records
.last()
.map(|r| format!("{}|{}", r.timestamp.to_rfc3339(), r.uuid))
} else {
None
};
@@ -397,25 +397,39 @@ impl SyncProtocolHandler {
SyncMessage::WatermarkExchangeRequest {
library_id,
device_id,
my_state_watermark: peer_state_watermark,
my_shared_watermark: peer_shared_watermark,
my_resource_watermarks: peer_resource_watermarks,
} => {
debug!(
from_device = %from_device,
peer_state_watermark = ?peer_state_watermark,
peer_shared_watermark = ?peer_shared_watermark,
"Processing WatermarkExchangeRequest"
peer_resource_count = peer_resource_watermarks.len(),
"Processing WatermarkExchangeRequest with per-resource watermarks"
);
// Get our current watermarks
let (our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await;
let (_our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await;
let our_resource_watermarks =
crate::infra::sync::ResourceWatermarkStore::new(peer_sync.device_id())
.get_our_resource_watermarks(peer_sync.peer_log_conn())
.await
.unwrap_or_default();
// Determine if peer needs catch-up by comparing watermarks
let needs_state_catchup = match (peer_state_watermark, our_state_watermark) {
(Some(peer_ts), Some(our_ts)) => our_ts > peer_ts,
(None, Some(_)) => true,
_ => false,
};
// Determine if peer needs catch-up by comparing per-resource watermarks
let mut needs_state_catchup = false;
for (resource_type, our_ts) in &our_resource_watermarks {
match peer_resource_watermarks.get(resource_type) {
Some(peer_ts) if our_ts > peer_ts => {
needs_state_catchup = true;
break;
}
None => {
needs_state_catchup = true;
break;
}
_ => {}
}
}
let needs_shared_catchup = match (peer_shared_watermark, our_shared_watermark) {
(Some(peer_hlc), Some(our_hlc)) => our_hlc > peer_hlc,
@@ -427,43 +441,44 @@ impl SyncProtocolHandler {
from_device = %from_device,
needs_state_catchup = needs_state_catchup,
needs_shared_catchup = needs_shared_catchup,
"Responding to watermark exchange request"
our_resource_count = our_resource_watermarks.len(),
"Responding to watermark exchange request with per-resource watermarks"
);
Ok(Some(SyncMessage::WatermarkExchangeResponse {
library_id: self.library_id,
device_id: peer_sync.device_id(),
state_watermark: our_state_watermark,
shared_watermark: our_shared_watermark,
needs_state_catchup,
needs_shared_catchup,
resource_watermarks: our_resource_watermarks,
}))
}
SyncMessage::WatermarkExchangeResponse {
library_id,
device_id,
state_watermark: peer_state_watermark,
shared_watermark: peer_shared_watermark,
needs_state_catchup,
needs_shared_catchup,
resource_watermarks: peer_resource_watermarks,
} => {
debug!(
from_device = %from_device,
peer_state_watermark = ?peer_state_watermark,
peer_shared_watermark = ?peer_shared_watermark,
needs_state_catchup = needs_state_catchup,
needs_shared_catchup = needs_shared_catchup,
"Processing WatermarkExchangeResponse"
peer_resource_count = peer_resource_watermarks.len(),
"Processing WatermarkExchangeResponse with per-resource watermarks"
);
peer_sync
.on_watermark_exchange_response(
from_device,
peer_state_watermark,
peer_shared_watermark,
needs_state_catchup,
needs_shared_catchup,
peer_resource_watermarks,
)
.await
.map_err(|e| {
@@ -509,7 +524,10 @@ impl crate::service::network::protocol::ProtocolHandler for SyncProtocolHandler
) {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
tracing::info!("SyncProtocolHandler: Stream accepted from node {}", remote_node_id);
tracing::info!(
"SyncProtocolHandler: Stream accepted from node {}",
remote_node_id
);
// Map node_id to device_id using device registry
let from_device = {
@@ -532,7 +550,10 @@ impl crate::service::network::protocol::ProtocolHandler for SyncProtocolHandler
};
// Read request with length prefix
tracing::info!("SyncProtocolHandler: Reading request from device {}...", from_device);
tracing::info!(
"SyncProtocolHandler: Reading request from device {}...",
from_device
);
let mut len_buf = [0u8; 4];
if let Err(e) = recv.read_exact(&mut len_buf).await {
// This is normal if peer just opened connection to test connectivity
@@ -661,13 +682,14 @@ mod tests {
#[test]
fn test_handler_creation() {
// Test uses mock registry
use crate::service::network::device::DeviceRegistry;
use crate::device::DeviceManager;
use crate::service::network::device::DeviceRegistry;
use std::path::PathBuf;
let device_manager = Arc::new(DeviceManager::new().unwrap());
let logger = Arc::new(crate::service::network::utils::SilentLogger);
let registry = DeviceRegistry::new(device_manager, PathBuf::from("/tmp/test"), logger).unwrap();
let registry =
DeviceRegistry::new(device_manager, PathBuf::from("/tmp/test"), logger).unwrap();
let device_registry = Arc::new(tokio::sync::RwLock::new(registry));
let handler = SyncProtocolHandler::new(Uuid::new_v4(), device_registry);

View File

@@ -103,18 +103,20 @@ pub enum SyncMessage {
WatermarkExchangeRequest {
library_id: Uuid,
device_id: Uuid, // Requesting device
my_state_watermark: Option<DateTime<Utc>>,
my_shared_watermark: Option<HLC>,
/// Per-resource state watermarks (resource_type -> timestamp)
my_resource_watermarks: std::collections::HashMap<String, DateTime<Utc>>,
},
/// Response with peer's watermarks
WatermarkExchangeResponse {
library_id: Uuid,
device_id: Uuid, // Responding device
state_watermark: Option<DateTime<Utc>>,
shared_watermark: Option<HLC>,
needs_state_catchup: bool, // If true, peer needs our state
needs_shared_catchup: bool, // If true, peer needs our shared changes
/// Per-resource state watermarks (resource_type -> timestamp)
resource_watermarks: std::collections::HashMap<String, DateTime<Utc>>,
},
/// Error response

View File

@@ -108,6 +108,26 @@ impl DependencyTracker {
pub async fn dependency_count(&self) -> usize {
self.waiting_for.read().await.len()
}
/// Get all pending dependency UUIDs (for requesting from peers)
///
/// Returns a list of UUIDs that are currently blocking updates.
/// These can be requested from peers to resolve stuck dependencies.
pub async fn get_pending_dependency_uuids(&self) -> Vec<Uuid> {
    // Snapshot the blocking UUIDs under a read lock; the guard is released
    // as soon as the expression finishes evaluating.
    self.waiting_for
        .read()
        .await
        .keys()
        .copied()
        .collect()
}
/// Clear all pending dependencies (timeout/force sync fallback)
///
/// Call this as a last resort when dependencies cannot be resolved.
/// Returns the number of dependencies cleared.
pub async fn clear_all(&self) -> usize {
    let mut guard = self.waiting_for.write().await;
    // Swap the map out for an empty one in a single operation; the drained
    // map's length is exactly the number of dependencies discarded.
    std::mem::take(&mut *guard).len()
}
}
impl Default for DependencyTracker {

View File

@@ -251,6 +251,11 @@ impl PeerSync {
self.library_id
}
/// Get peer log connection for watermark queries
///
/// Exposes the database connection held by the peer log so callers
/// (e.g. `ResourceWatermarkStore::get_our_resource_watermarks`) can run
/// watermark queries against the same sync database. Borrowed accessor
/// with no side effects.
pub fn peer_log_conn(&self) -> &sea_orm::DatabaseConnection {
self.peer_log.conn()
}
/// Check if real-time sync is currently active (lock mechanism)
///
/// Returns true if real-time broadcasts happened in the last 60 seconds.
@@ -448,21 +453,31 @@ impl PeerSync {
);
// Get our watermarks
let (my_state_watermark, my_shared_watermark) = self.get_watermarks().await;
let (_my_state_watermark, my_shared_watermark) = self.get_watermarks().await;
// Get per-resource watermarks for fine-grained comparison
let my_resource_watermarks =
crate::infra::sync::ResourceWatermarkStore::new(self.device_id)
.get_our_resource_watermarks(self.peer_log.conn())
.await
.unwrap_or_else(|e| {
warn!(error = %e, "Failed to get per-resource watermarks");
std::collections::HashMap::new()
});
debug!(
peer = %peer_id,
my_state_watermark = ?my_state_watermark,
my_shared_watermark = ?my_shared_watermark,
"Sending watermark exchange request"
resource_count = my_resource_watermarks.len(),
"Sending watermark exchange request with per-resource watermarks"
);
// Send request to peer
let request = SyncMessage::WatermarkExchangeRequest {
library_id: self.library_id,
device_id: self.device_id,
my_state_watermark,
my_shared_watermark,
my_resource_watermarks,
};
self.network
@@ -485,47 +500,75 @@ impl PeerSync {
pub async fn on_watermark_exchange_response(
&self,
peer_id: Uuid,
peer_state_watermark: Option<chrono::DateTime<chrono::Utc>>,
peer_shared_watermark: Option<HLC>,
needs_state_catchup: bool,
needs_shared_catchup: bool,
peer_resource_watermarks: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
) -> Result<()> {
info!(
peer = %peer_id,
peer_state_watermark = ?peer_state_watermark,
peer_shared_watermark = ?peer_shared_watermark,
needs_state_catchup = needs_state_catchup,
needs_shared_catchup = needs_shared_catchup,
"Received watermark exchange response"
peer_resource_count = peer_resource_watermarks.len(),
"Received watermark exchange response with per-resource watermarks"
);
// Get our watermarks to compare
let (my_state_watermark, my_shared_watermark) = self.get_watermarks().await;
let (_my_state_watermark, my_shared_watermark) = self.get_watermarks().await;
let my_resource_watermarks =
crate::infra::sync::ResourceWatermarkStore::new(self.device_id)
.get_our_resource_watermarks(self.peer_log.conn())
.await
.unwrap_or_default();
// Determine if WE need to catch up based on watermark comparison
// Determine if WE need to catch up based on per-resource watermark comparison
let mut we_need_state_catchup = false;
let mut we_need_shared_catchup = false;
let mut resources_needing_catchup = Vec::new();
// Compare state watermarks (timestamps)
match (my_state_watermark, peer_state_watermark) {
(Some(my_ts), Some(peer_ts)) if peer_ts > my_ts => {
info!(
peer = %peer_id,
my_timestamp = %my_ts,
peer_timestamp = %peer_ts,
"Peer has newer state, need to catch up"
);
we_need_state_catchup = true;
}
(None, Some(_)) => {
info!(peer = %peer_id, "We have no state watermark, need full state catch-up");
we_need_state_catchup = true;
}
_ => {
debug!(peer = %peer_id, "State watermarks in sync");
// Compare per-resource watermarks (CRITICAL FIX: Issue #10)
// This fixes the bug where global watermark comparison missed per-resource divergence
for (resource_type, peer_ts) in &peer_resource_watermarks {
match my_resource_watermarks.get(resource_type) {
Some(my_ts) if peer_ts > my_ts => {
info!(
peer = %peer_id,
resource_type = %resource_type,
my_timestamp = %my_ts,
peer_timestamp = %peer_ts,
"Peer has newer data for this resource"
);
resources_needing_catchup.push(resource_type.clone());
we_need_state_catchup = true;
}
None => {
info!(
peer = %peer_id,
resource_type = %resource_type,
peer_timestamp = %peer_ts,
"We have no watermark for this resource, need catch-up"
);
resources_needing_catchup.push(resource_type.clone());
we_need_state_catchup = true;
}
_ => {
debug!(
resource_type = %resource_type,
"Resource in sync with peer"
);
}
}
}
if we_need_state_catchup {
info!(
peer = %peer_id,
resources = ?resources_needing_catchup,
"Need state catch-up for specific resources"
);
}
// Compare shared watermarks (HLC)
match (my_shared_watermark, peer_shared_watermark) {
(Some(my_hlc), Some(peer_hlc)) if peer_hlc > my_hlc => {
@@ -555,10 +598,11 @@ impl PeerSync {
if let Some(weak_ref) = backfill_mgr.as_ref() {
if let Some(manager) = weak_ref.upgrade() {
// Trigger incremental catch-up from this peer
// Note: We now use per-resource watermarks instead of global state watermark
if let Err(e) = manager
.catch_up_from_peer(
peer_id,
my_state_watermark,
None, // Per-resource watermarks used instead of global
my_shared_watermark.map(|hlc| hlc.to_string()),
)
.await
@@ -615,7 +659,8 @@ impl PeerSync {
}
// Update devices table with peer's watermarks for future comparisons
self.update_peer_watermarks(peer_id, peer_state_watermark, peer_shared_watermark)
// Note: We now track per-resource watermarks, not global state watermark
self.update_peer_watermarks(peer_id, None, peer_shared_watermark)
.await?;
info!(peer = %peer_id, "Watermark exchange complete");
@@ -798,10 +843,11 @@ impl PeerSync {
let mut last_lag_warning = std::time::Instant::now();
let lag_warning_cooldown = std::time::Duration::from_secs(5);
// Real-time batching mechanism (accumulate up to 100 entries or 50ms)
// Real-time batching mechanism (configurable via SyncConfig)
let mut state_change_batch: Vec<serde_json::Value> = Vec::new();
let mut batch_flush_interval =
tokio::time::interval(std::time::Duration::from_millis(50));
let mut batch_flush_interval = tokio::time::interval(std::time::Duration::from_millis(
config.batching.realtime_batch_flush_interval_ms,
));
batch_flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
while is_running.load(Ordering::SeqCst) {
@@ -824,18 +870,18 @@ impl PeerSync {
state_change_count += 1;
last_event_type = Some(format!("StateChange({})", model_type));
// Add to batch instead of processing immediately
state_change_batch.push(serde_json::json!({
"library_id": event_library_id,
"model_type": model_type,
"record_uuid": record_uuid,
"device_id": device_id,
"data": data,
"timestamp": timestamp,
}));
// Add to batch instead of processing immediately
state_change_batch.push(serde_json::json!({
"library_id": event_library_id,
"model_type": model_type,
"record_uuid": record_uuid,
"device_id": device_id,
"data": data,
"timestamp": timestamp,
}));
// Flush if batch reaches 100 entries
if state_change_batch.len() >= 100 {
// Flush if batch reaches configured max entries
if state_change_batch.len() >= config.batching.realtime_batch_max_entries {
Self::flush_state_change_batch(
library_id,
&mut state_change_batch,
@@ -919,7 +965,7 @@ impl PeerSync {
}
}
// Flush batch on timer (every 50ms)
// Flush batch on timer (configurable interval)
_ = batch_flush_interval.tick() => {
if !state_change_batch.is_empty() {
Self::flush_state_change_batch(
@@ -1133,22 +1179,31 @@ impl PeerSync {
);
// Query our watermarks from sync.db
let (my_state_watermark, my_shared_watermark) =
let (_my_state_watermark, my_shared_watermark) =
Self::query_device_watermarks(device_id, peer_log).await;
// Get per-resource watermarks for fine-grained comparison
let my_resource_watermarks = crate::infra::sync::ResourceWatermarkStore::new(device_id)
.get_our_resource_watermarks(peer_log.conn())
.await
.unwrap_or_else(|e| {
warn!(error = %e, "Failed to get per-resource watermarks in static exchange");
std::collections::HashMap::new()
});
debug!(
peer = %peer_id,
my_state_watermark = ?my_state_watermark,
my_shared_watermark = ?my_shared_watermark,
"Sending watermark exchange request"
resource_count = my_resource_watermarks.len(),
"Sending watermark exchange request with per-resource watermarks"
);
// Send request to peer
let request = SyncMessage::WatermarkExchangeRequest {
library_id,
device_id,
my_state_watermark,
my_shared_watermark,
my_resource_watermarks,
};
network
@@ -1249,9 +1304,17 @@ impl PeerSync {
let timestamp = data
.get("timestamp")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(Utc::now);
.ok_or_else(|| anyhow::anyhow!("Missing or invalid timestamp in state_change event"))?;
let timestamp = chrono::DateTime::parse_from_rfc3339(timestamp)
.map_err(|e| {
anyhow::anyhow!(
"Failed to parse timestamp '{}': {}. This may indicate clock skew.",
timestamp,
e
)
})?
.with_timezone(&chrono::Utc);
let change = StateChangeMessage {
model_type,
@@ -2357,9 +2420,6 @@ impl PeerSync {
}
/// Get device-owned state for backfill (StateRequest)
///
/// This is completely domain-agnostic - it delegates to the Syncable trait
/// implementations in each entity. No switch statements, no domain logic.
pub async fn get_device_state(
&self,
model_types: Vec<String>,
@@ -2601,15 +2661,43 @@ impl PeerSync {
}
}
// Process dependency tracker (NEW: flush all tracked dependencies)
// Note: We can't easily extract from HashMap during iteration, so we log the issue
// In practice, dependencies should have been resolved during normal operation
// This is a fallback for stuck dependencies
// Process dependency tracker - handle unresolved dependencies
// This is a fallback for dependencies that couldn't be resolved during normal operation
if !dep_stats.is_empty() {
warn!(
"Dependency tracker still has {} unresolved dependencies - these entries may have circular or missing parent references",
dep_stats.total_dependencies
dependencies = dep_stats.total_dependencies,
waiting_updates = dep_stats.total_waiting_updates,
"Dependency tracker has unresolved dependencies at Ready transition"
);
// Get list of missing UUIDs for diagnostic purposes
let missing_uuids = self.dependency_tracker.get_pending_dependency_uuids().await;
if missing_uuids.len() <= 10 {
// Log specific UUIDs if count is manageable
warn!(
?missing_uuids,
"Missing dependency UUIDs (may be circular references or orphaned data)"
);
} else {
warn!(
missing_count = missing_uuids.len(),
sample_uuids = ?&missing_uuids[..10],
"Many missing dependencies (showing first 10)"
);
}
// Strategy: Clear dependencies after logging to prevent blocking sync indefinitely
// These entries either have:
// - Circular dependencies (impossible to resolve)
// - References to deleted records
// - Incomplete sync data from peer
// They will be resynced on next full backfill if the data becomes available
let cleared_count = self.dependency_tracker.clear_all().await;
warn!(
cleared_count,
"Cleared unresolved dependencies to prevent sync deadlock. These updates will be retried on next full sync."
);
}
info!(

View File

@@ -201,14 +201,35 @@ impl MockTransport {
SyncMessage::WatermarkExchangeRequest {
library_id,
device_id: requesting_device_id,
my_state_watermark: peer_state_watermark,
my_shared_watermark: peer_shared_watermark,
my_resource_watermarks: peer_resource_watermarks,
} => {
let (our_state_watermark, our_shared_watermark) =
let (_our_state_watermark, our_shared_watermark) =
sync_service.peer_sync().get_watermarks().await;
let needs_state_catchup = matches!((peer_state_watermark, our_state_watermark), (Some(p), Some(o)) if o > p)
|| matches!((peer_state_watermark, our_state_watermark), (None, Some(_)));
// Get our per-resource watermarks
let our_resource_watermarks =
sd_core::infra::sync::ResourceWatermarkStore::new(self.my_device_id)
.get_our_resource_watermarks(sync_service.peer_sync().peer_log_conn())
.await
.unwrap_or_default();
// Compare per-resource watermarks to determine if peer needs catch-up
let mut needs_state_catchup = false;
for (resource_type, our_ts) in &our_resource_watermarks {
match peer_resource_watermarks.get(resource_type) {
Some(peer_ts) if our_ts > peer_ts => {
needs_state_catchup = true;
break;
}
None => {
needs_state_catchup = true;
break;
}
_ => {}
}
}
let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p)
|| matches!(
(peer_shared_watermark, our_shared_watermark),
@@ -218,10 +239,10 @@ impl MockTransport {
let response = SyncMessage::WatermarkExchangeResponse {
library_id,
device_id: self.my_device_id,
state_watermark: our_state_watermark,
shared_watermark: our_shared_watermark,
needs_state_catchup,
needs_shared_catchup,
resource_watermarks: our_resource_watermarks,
};
self.send_sync_message(sender, response).await?;
@@ -229,19 +250,19 @@ impl MockTransport {
SyncMessage::WatermarkExchangeResponse {
library_id: _,
device_id: peer_device_id,
state_watermark: peer_state_watermark,
shared_watermark: peer_shared_watermark,
needs_state_catchup,
needs_shared_catchup,
resource_watermarks: peer_resource_watermarks,
} => {
sync_service
.peer_sync()
.on_watermark_exchange_response(
peer_device_id,
peer_state_watermark,
peer_shared_watermark,
needs_state_catchup,
needs_shared_catchup,
peer_resource_watermarks,
)
.await?;
}

View File

@@ -1066,7 +1066,7 @@ impl Drop for SyncTestHarness {
//
/// Test: Location 1 indexed on Alice, syncs to Bob in real-time
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_realtime_sync_alice_to_bob() -> anyhow::Result<()> {
let harness = SyncTestHarness::new("realtime_alice_to_bob").await?;
@@ -1206,7 +1206,7 @@ async fn test_realtime_sync_alice_to_bob() -> anyhow::Result<()> {
}
/// Test: Location indexed on Bob, syncs to Alice (reverse direction)
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_realtime_sync_bob_to_alice() -> anyhow::Result<()> {
let harness = SyncTestHarness::new("realtime_bob_to_alice").await?;
@@ -1241,7 +1241,7 @@ async fn test_realtime_sync_bob_to_alice() -> anyhow::Result<()> {
}
/// Test: Concurrent indexing on both devices
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_indexing() -> anyhow::Result<()> {
let harness = SyncTestHarness::new("concurrent_indexing").await?;
@@ -1287,7 +1287,7 @@ async fn test_concurrent_indexing() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_content_identity_linkage() -> anyhow::Result<()> {
let harness = SyncTestHarness::new("content_identity_linkage").await?;