mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-14 02:04:32 -04:00
feat: enhance watermark synchronization with resource count validation
- Introduced new functionality to validate resource counts during watermark exchanges, improving synchronization accuracy. - Added methods to retrieve device-owned resource counts for gap detection, ensuring that discrepancies are identified and addressed. - Updated the `SyncMessage` structure to include actual resource counts, facilitating better tracking of synchronization state. - Enhanced the `PeerSync` implementation to clear watermarks for mismatched resources, allowing for surgical recovery in case of count mismatches. - Improved logging for better visibility into synchronization processes and potential issues.
This commit is contained in:
@@ -215,9 +215,7 @@ impl ResourceWatermarkStore {
|
||||
|
||||
match result {
|
||||
Some(row) => {
|
||||
let watermark_str: Option<String> = row
|
||||
.try_get("", "max_watermark")
|
||||
.ok();
|
||||
let watermark_str: Option<String> = row.try_get("", "max_watermark").ok();
|
||||
|
||||
if let Some(wm_str) = watermark_str {
|
||||
let dt = DateTime::parse_from_rfc3339(&wm_str)
|
||||
@@ -233,6 +231,34 @@ impl ResourceWatermarkStore {
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete watermark for a specific resource type from a peer
|
||||
///
|
||||
/// Used for surgical recovery when count mismatch is detected for a single resource.
|
||||
pub async fn delete_resource<C: ConnectionTrait>(
|
||||
&self,
|
||||
conn: &C,
|
||||
peer_device_uuid: Uuid,
|
||||
resource_type: &str,
|
||||
) -> Result<bool, WatermarkError> {
|
||||
let result = conn
|
||||
.execute(Statement::from_sql_and_values(
|
||||
DbBackend::Sqlite,
|
||||
r#"
|
||||
DELETE FROM device_resource_watermarks
|
||||
WHERE device_uuid = ? AND peer_device_uuid = ? AND resource_type = ?
|
||||
"#,
|
||||
vec![
|
||||
self.device_uuid.to_string().into(),
|
||||
peer_device_uuid.to_string().into(),
|
||||
resource_type.into(),
|
||||
],
|
||||
))
|
||||
.await
|
||||
.map_err(|e| WatermarkError::QueryError(e.to_string()))?;
|
||||
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
/// Delete all watermarks for a peer (cleanup on peer removal)
|
||||
pub async fn delete_peer<C: ConnectionTrait>(
|
||||
&self,
|
||||
@@ -347,10 +373,7 @@ mod tests {
|
||||
// Verify retrieval
|
||||
let retrieved = store.get(&conn, peer_uuid, "location").await.unwrap();
|
||||
assert!(retrieved.is_some());
|
||||
assert_eq!(
|
||||
retrieved.unwrap().timestamp(),
|
||||
timestamp1.timestamp()
|
||||
);
|
||||
assert_eq!(retrieved.unwrap().timestamp(), timestamp1.timestamp());
|
||||
|
||||
// Update with newer timestamp
|
||||
let timestamp2 = timestamp1 + chrono::Duration::seconds(10);
|
||||
@@ -361,10 +384,7 @@ mod tests {
|
||||
|
||||
// Verify update
|
||||
let retrieved = store.get(&conn, peer_uuid, "location").await.unwrap();
|
||||
assert_eq!(
|
||||
retrieved.unwrap().timestamp(),
|
||||
timestamp2.timestamp()
|
||||
);
|
||||
assert_eq!(retrieved.unwrap().timestamp(), timestamp2.timestamp());
|
||||
|
||||
// Attempt update with older timestamp (should be ignored)
|
||||
let timestamp0 = timestamp1 - chrono::Duration::seconds(10);
|
||||
@@ -375,10 +395,7 @@ mod tests {
|
||||
|
||||
// Verify still has timestamp2 (newer)
|
||||
let retrieved = store.get(&conn, peer_uuid, "location").await.unwrap();
|
||||
assert_eq!(
|
||||
retrieved.unwrap().timestamp(),
|
||||
timestamp2.timestamp()
|
||||
);
|
||||
assert_eq!(retrieved.unwrap().timestamp(), timestamp2.timestamp());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -468,4 +485,3 @@ mod tests {
|
||||
assert_eq!(all.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -394,99 +394,122 @@ impl SyncProtocolHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
SyncMessage::WatermarkExchangeRequest {
|
||||
library_id,
|
||||
device_id,
|
||||
my_shared_watermark: peer_shared_watermark,
|
||||
my_resource_watermarks: peer_resource_watermarks,
|
||||
} => {
|
||||
debug!(
|
||||
from_device = %from_device,
|
||||
peer_shared_watermark = ?peer_shared_watermark,
|
||||
peer_resource_count = peer_resource_watermarks.len(),
|
||||
"Processing WatermarkExchangeRequest with per-resource watermarks"
|
||||
);
|
||||
SyncMessage::WatermarkExchangeRequest {
|
||||
library_id,
|
||||
device_id,
|
||||
my_shared_watermark: peer_shared_watermark,
|
||||
my_resource_watermarks: peer_resource_watermarks,
|
||||
my_peer_resource_counts: peer_counts_of_our_data,
|
||||
} => {
|
||||
debug!(
|
||||
from_device = %from_device,
|
||||
peer_shared_watermark = ?peer_shared_watermark,
|
||||
peer_resource_count = peer_resource_watermarks.len(),
|
||||
peer_counts_of_us = ?peer_counts_of_our_data,
|
||||
"Processing WatermarkExchangeRequest with count validation"
|
||||
);
|
||||
|
||||
// Get our current watermarks
|
||||
let (_our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await;
|
||||
let our_resource_watermarks =
|
||||
crate::infra::sync::ResourceWatermarkStore::new(peer_sync.device_id())
|
||||
.get_our_resource_watermarks(peer_sync.peer_log_conn())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
// Get our current watermarks
|
||||
let (_our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await;
|
||||
let our_resource_watermarks =
|
||||
crate::infra::sync::ResourceWatermarkStore::new(peer_sync.device_id())
|
||||
.get_our_resource_watermarks(peer_sync.peer_log_conn())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
// Determine if peer needs catch-up by comparing per-resource watermarks
|
||||
let mut needs_state_catchup = false;
|
||||
for (resource_type, our_ts) in &our_resource_watermarks {
|
||||
match peer_resource_watermarks.get(resource_type) {
|
||||
Some(peer_ts) if our_ts > peer_ts => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
// Get our ACTUAL counts (what we truly own)
|
||||
let our_actual_resource_counts = crate::service::sync::PeerSync::get_device_owned_counts(
|
||||
peer_sync.device_id(),
|
||||
peer_sync.db(),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
// Determine if peer needs catch-up by comparing per-resource watermarks
|
||||
let mut needs_state_catchup = false;
|
||||
for (resource_type, our_ts) in &our_resource_watermarks {
|
||||
match peer_resource_watermarks.get(resource_type) {
|
||||
Some(peer_ts) if our_ts > peer_ts => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let needs_shared_catchup = match (peer_shared_watermark, our_shared_watermark) {
|
||||
(Some(peer_hlc), Some(our_hlc)) => our_hlc > peer_hlc,
|
||||
(None, Some(_)) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
info!(
|
||||
from_device = %from_device,
|
||||
needs_state_catchup = needs_state_catchup,
|
||||
needs_shared_catchup = needs_shared_catchup,
|
||||
our_resource_count = our_resource_watermarks.len(),
|
||||
"Responding to watermark exchange request with per-resource watermarks"
|
||||
);
|
||||
|
||||
Ok(Some(SyncMessage::WatermarkExchangeResponse {
|
||||
library_id: self.library_id,
|
||||
device_id: peer_sync.device_id(),
|
||||
shared_watermark: our_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: our_resource_watermarks,
|
||||
}))
|
||||
}
|
||||
|
||||
SyncMessage::WatermarkExchangeResponse {
|
||||
library_id,
|
||||
device_id,
|
||||
shared_watermark: peer_shared_watermark,
|
||||
// Also check if we have resources peer doesn't know about
|
||||
for resource_type in our_resource_watermarks.keys() {
|
||||
if !peer_resource_watermarks.contains_key(resource_type) {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let needs_shared_catchup = match (peer_shared_watermark, our_shared_watermark) {
|
||||
(Some(peer_hlc), Some(our_hlc)) => our_hlc > peer_hlc,
|
||||
(None, Some(_)) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
debug!(
|
||||
from_device = %from_device,
|
||||
needs_state_catchup = needs_state_catchup,
|
||||
needs_shared_catchup = needs_shared_catchup,
|
||||
our_resource_count = our_resource_watermarks.len(),
|
||||
our_actual_counts = ?our_actual_resource_counts,
|
||||
"Responding to watermark exchange with actual counts"
|
||||
);
|
||||
|
||||
Ok(Some(SyncMessage::WatermarkExchangeResponse {
|
||||
library_id: self.library_id,
|
||||
device_id: peer_sync.device_id(),
|
||||
shared_watermark: our_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: peer_resource_watermarks,
|
||||
} => {
|
||||
debug!(
|
||||
from_device = %from_device,
|
||||
peer_shared_watermark = ?peer_shared_watermark,
|
||||
needs_state_catchup = needs_state_catchup,
|
||||
needs_shared_catchup = needs_shared_catchup,
|
||||
peer_resource_count = peer_resource_watermarks.len(),
|
||||
"Processing WatermarkExchangeResponse with per-resource watermarks"
|
||||
);
|
||||
resource_watermarks: our_resource_watermarks,
|
||||
my_actual_resource_counts: our_actual_resource_counts,
|
||||
}))
|
||||
}
|
||||
|
||||
peer_sync
|
||||
.on_watermark_exchange_response(
|
||||
from_device,
|
||||
peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
peer_resource_watermarks,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
NetworkingError::Protocol(format!(
|
||||
"Failed to handle watermark exchange response: {}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
SyncMessage::WatermarkExchangeResponse {
|
||||
library_id,
|
||||
device_id,
|
||||
shared_watermark: peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: peer_resource_watermarks,
|
||||
my_actual_resource_counts: peer_actual_counts,
|
||||
} => {
|
||||
debug!(
|
||||
from_device = %from_device,
|
||||
peer_shared_watermark = ?peer_shared_watermark,
|
||||
needs_state_catchup = needs_state_catchup,
|
||||
needs_shared_catchup = needs_shared_catchup,
|
||||
peer_resource_count = peer_resource_watermarks.len(),
|
||||
peer_actual_counts = ?peer_actual_counts,
|
||||
"Processing WatermarkExchangeResponse with counts"
|
||||
);
|
||||
|
||||
peer_sync
|
||||
.on_watermark_exchange_response(
|
||||
from_device,
|
||||
peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
peer_resource_watermarks,
|
||||
peer_actual_counts,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
NetworkingError::Protocol(format!(
|
||||
"Failed to handle watermark exchange response: {}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -106,6 +106,10 @@ pub enum SyncMessage {
|
||||
my_shared_watermark: Option<HLC>,
|
||||
/// Per-resource state watermarks (resource_type -> timestamp)
|
||||
my_resource_watermarks: std::collections::HashMap<String, DateTime<Utc>>,
|
||||
/// Counts of peer's device-owned resources that we have synced (for gap detection)
|
||||
/// Maps resource_type -> count of records we have from this peer
|
||||
#[serde(default)]
|
||||
my_peer_resource_counts: std::collections::HashMap<String, u64>,
|
||||
},
|
||||
|
||||
/// Response with peer's watermarks
|
||||
@@ -117,6 +121,10 @@ pub enum SyncMessage {
|
||||
needs_shared_catchup: bool, // If true, peer needs our shared changes
|
||||
/// Per-resource state watermarks (resource_type -> timestamp)
|
||||
resource_watermarks: std::collections::HashMap<String, DateTime<Utc>>,
|
||||
/// Our actual device-owned resource counts (for gap detection)
|
||||
/// Maps resource_type -> actual count of records we own
|
||||
#[serde(default)]
|
||||
my_actual_resource_counts: std::collections::HashMap<String, u64>,
|
||||
},
|
||||
|
||||
/// Proactive notification that sender has new data available
|
||||
|
||||
@@ -261,6 +261,123 @@ impl PeerSync {
|
||||
self.peer_log.conn()
|
||||
}
|
||||
|
||||
/// Get counts of device-owned resources owned by a specific device
|
||||
///
|
||||
/// Used for gap detection during watermark exchange.
|
||||
/// Only counts non-deleted records where device ownership matches.
|
||||
pub async fn get_device_owned_counts(
|
||||
owner_device_id: Uuid,
|
||||
db: &DatabaseConnection,
|
||||
) -> Result<std::collections::HashMap<String, u64>> {
|
||||
use crate::infra::db::entities::{device, entry, location, volume};
|
||||
use sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect};
|
||||
|
||||
let mut counts = std::collections::HashMap::new();
|
||||
|
||||
// Get device's internal ID from UUID
|
||||
let device = device::Entity::find()
|
||||
.filter(device::Column::Uuid.eq(owner_device_id))
|
||||
.one(db)
|
||||
.await?;
|
||||
|
||||
let device_internal_id = match device {
|
||||
Some(d) => d.id,
|
||||
None => {
|
||||
debug!(device_id = %owner_device_id, "Device not found for count query");
|
||||
return Ok(counts);
|
||||
}
|
||||
};
|
||||
|
||||
// Location count (owned by device)
|
||||
let location_count = location::Entity::find()
|
||||
.filter(location::Column::DeviceId.eq(device_internal_id))
|
||||
.count(db)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
counts.insert("location".to_string(), location_count);
|
||||
|
||||
// Entry count (via location ownership chain)
|
||||
// Query entries where location.device_id matches
|
||||
let entry_count: u64 = {
|
||||
use sea_orm::sea_query::{Expr, Query};
|
||||
use sea_orm::{FromQueryResult, Statement};
|
||||
|
||||
#[derive(FromQueryResult)]
|
||||
struct CountResult {
|
||||
count: i64,
|
||||
}
|
||||
|
||||
let stmt = Statement::from_sql_and_values(
|
||||
sea_orm::DbBackend::Sqlite,
|
||||
r#"
|
||||
SELECT COUNT(*) as count
|
||||
FROM entries e
|
||||
INNER JOIN locations l ON e.location_id = l.id
|
||||
WHERE l.device_id = ?
|
||||
"#,
|
||||
vec![device_internal_id.into()],
|
||||
);
|
||||
|
||||
let result = CountResult::find_by_statement(stmt)
|
||||
.one(db)
|
||||
.await
|
||||
.unwrap_or(None);
|
||||
|
||||
result.map(|r| r.count as u64).unwrap_or(0)
|
||||
};
|
||||
counts.insert("entry".to_string(), entry_count);
|
||||
|
||||
// Volume count (owned by device)
|
||||
let volume_count = volume::Entity::find()
|
||||
.filter(volume::Column::DeviceId.eq(device_internal_id))
|
||||
.count(db)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
counts.insert("volume".to_string(), volume_count);
|
||||
|
||||
debug!(
|
||||
device_id = %owner_device_id,
|
||||
location_count = location_count,
|
||||
entry_count = entry_count,
|
||||
volume_count = volume_count,
|
||||
"Queried device-owned resource counts"
|
||||
);
|
||||
|
||||
Ok(counts)
|
||||
}
|
||||
|
||||
/// Clear watermarks for specific resources (surgical recovery)
|
||||
///
|
||||
/// Called when count mismatch is detected for specific resources.
|
||||
/// Only clears the mismatched resources, leaving correct watermarks intact.
|
||||
async fn clear_resource_watermarks(
|
||||
&self,
|
||||
peer_id: Uuid,
|
||||
resource_types: Vec<String>,
|
||||
) -> Result<()> {
|
||||
let store = crate::infra::sync::ResourceWatermarkStore::new(self.device_id);
|
||||
|
||||
let mut cleared_count = 0;
|
||||
for resource_type in &resource_types {
|
||||
if store
|
||||
.delete_resource(self.peer_log.conn(), peer_id, resource_type)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to clear resource watermark: {}", e))?
|
||||
{
|
||||
cleared_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
peer = %peer_id,
|
||||
resources = ?resource_types,
|
||||
cleared = cleared_count,
|
||||
"Cleared watermarks for mismatched resources only (surgical recovery)"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if real-time sync is currently active for a specific peer
|
||||
///
|
||||
/// Returns true if real-time broadcasts to this peer succeeded in the last 30 seconds.
|
||||
@@ -530,11 +647,20 @@ impl PeerSync {
|
||||
std::collections::HashMap::new()
|
||||
});
|
||||
|
||||
// Get counts of peer's device-owned resources that we have synced (for gap detection)
|
||||
let my_peer_resource_counts = Self::get_device_owned_counts(peer_id, &self.db)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(error = %e, peer = %peer_id, "Failed to get peer resource counts");
|
||||
std::collections::HashMap::new()
|
||||
});
|
||||
|
||||
debug!(
|
||||
peer = %peer_id,
|
||||
my_shared_watermark = ?my_shared_watermark,
|
||||
resource_count = my_resource_watermarks.len(),
|
||||
"Sending watermark exchange request with per-resource watermarks"
|
||||
peer_owned_counts = ?my_peer_resource_counts,
|
||||
"Sending watermark exchange request with counts for gap detection"
|
||||
);
|
||||
|
||||
// Send request to peer
|
||||
@@ -543,6 +669,7 @@ impl PeerSync {
|
||||
device_id: self.device_id,
|
||||
my_shared_watermark,
|
||||
my_resource_watermarks,
|
||||
my_peer_resource_counts,
|
||||
};
|
||||
|
||||
self.network
|
||||
@@ -552,7 +679,7 @@ impl PeerSync {
|
||||
|
||||
info!(
|
||||
peer = %peer_id,
|
||||
"Watermark exchange request sent, waiting for response"
|
||||
"Watermark exchange request sent with resource counts"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
@@ -569,6 +696,7 @@ impl PeerSync {
|
||||
needs_state_catchup: bool,
|
||||
needs_shared_catchup: bool,
|
||||
peer_resource_watermarks: std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
|
||||
peer_actual_resource_counts: std::collections::HashMap<String, u64>,
|
||||
) -> Result<()> {
|
||||
info!(
|
||||
peer = %peer_id,
|
||||
@@ -576,7 +704,7 @@ impl PeerSync {
|
||||
needs_state_catchup = needs_state_catchup,
|
||||
needs_shared_catchup = needs_shared_catchup,
|
||||
peer_resource_count = peer_resource_watermarks.len(),
|
||||
"Received watermark exchange response with per-resource watermarks"
|
||||
"Received watermark exchange response with resource counts"
|
||||
);
|
||||
|
||||
// Get our watermarks to compare
|
||||
@@ -587,6 +715,124 @@ impl PeerSync {
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
// Count-based gap detection (detects watermark leapfrog bugs)
|
||||
// Only run when NOT in real-time sync or active backfill
|
||||
let state = self.state().await;
|
||||
let realtime_active = self.is_realtime_active_for_peer(peer_id).await;
|
||||
let in_stable_state = state.is_ready() && !realtime_active;
|
||||
|
||||
if in_stable_state && !peer_actual_resource_counts.is_empty() {
|
||||
// Get what we think peer has
|
||||
let my_counts_of_peer_data = Self::get_device_owned_counts(peer_id, &self.db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut mismatched_resource_types = Vec::new();
|
||||
let mut mismatch_details = Vec::new();
|
||||
|
||||
for (resource_type, peer_actual_count) in &peer_actual_resource_counts {
|
||||
let our_count = my_counts_of_peer_data
|
||||
.get(resource_type)
|
||||
.copied()
|
||||
.unwrap_or(0);
|
||||
|
||||
// Only flag mismatch if watermarks appear synchronized
|
||||
// If watermarks already show divergence, normal catch-up will handle it
|
||||
let watermarks_appear_synced = match (
|
||||
my_resource_watermarks.get(resource_type),
|
||||
peer_resource_watermarks.get(resource_type),
|
||||
) {
|
||||
(Some(my_ts), Some(peer_ts)) => {
|
||||
let diff_seconds = (my_ts.timestamp() - peer_ts.timestamp()).abs();
|
||||
diff_seconds < 10 // Within 10 seconds = appear synchronized
|
||||
}
|
||||
(None, None) => true, // Both missing watermarks = appear synchronized
|
||||
_ => false, // One has watermark, one doesn't = watermarks diverge, catch-up will fix it
|
||||
};
|
||||
|
||||
if our_count != *peer_actual_count && watermarks_appear_synced {
|
||||
warn!(
|
||||
peer = %peer_id,
|
||||
resource = %resource_type,
|
||||
our_count = our_count,
|
||||
peer_actual = peer_actual_count,
|
||||
gap = i64::abs(our_count as i64 - *peer_actual_count as i64),
|
||||
our_watermark = ?my_resource_watermarks.get(resource_type),
|
||||
peer_watermark = ?peer_resource_watermarks.get(resource_type),
|
||||
"COUNT MISMATCH WITH SYNCED WATERMARKS! Watermark leapfrog bug detected"
|
||||
);
|
||||
mismatched_resource_types.push(resource_type.clone());
|
||||
mismatch_details.push(format!(
|
||||
"{}({}/{}, wm_diff={}s)",
|
||||
resource_type,
|
||||
our_count,
|
||||
peer_actual_count,
|
||||
my_resource_watermarks
|
||||
.get(resource_type)
|
||||
.and_then(|my_ts| {
|
||||
peer_resource_watermarks
|
||||
.get(resource_type)
|
||||
.map(|peer_ts| (my_ts.timestamp() - peer_ts.timestamp()).abs())
|
||||
})
|
||||
.unwrap_or(0)
|
||||
));
|
||||
} else if our_count != *peer_actual_count {
|
||||
debug!(
|
||||
peer = %peer_id,
|
||||
resource = %resource_type,
|
||||
our_count = our_count,
|
||||
peer_actual = peer_actual_count,
|
||||
"Count mismatch but watermarks diverge - normal catch-up will fix it"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if !mismatched_resource_types.is_empty() {
|
||||
error!(
|
||||
peer = %peer_id,
|
||||
mismatches = ?mismatch_details,
|
||||
resources = ?mismatched_resource_types,
|
||||
"Count mismatch indicates watermark leapfrog bug, clearing only affected resources"
|
||||
);
|
||||
|
||||
// Clear watermarks only for mismatched resources
|
||||
// This preserves correct watermarks for other resources
|
||||
self.clear_resource_watermarks(peer_id, mismatched_resource_types.clone())
|
||||
.await?;
|
||||
|
||||
// Trigger catch-up for affected resources
|
||||
let backfill_mgr = self.backfill_manager.read().await;
|
||||
if let Some(weak_ref) = backfill_mgr.as_ref() {
|
||||
if let Some(manager) = weak_ref.upgrade() {
|
||||
info!(
|
||||
peer = %peer_id,
|
||||
resources = ?mismatched_resource_types,
|
||||
"Initiating targeted backfill for resources with count mismatch"
|
||||
);
|
||||
// Force full backfill for affected resources (no watermarks)
|
||||
manager.catch_up_from_peer(peer_id, None, None).await?;
|
||||
}
|
||||
} else {
|
||||
warn!("BackfillManager not available, cannot trigger recovery backfill");
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
} else {
|
||||
debug!(
|
||||
peer = %peer_id,
|
||||
"Count validation passed - no gaps detected"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
peer = %peer_id,
|
||||
state = ?state,
|
||||
realtime_active = realtime_active,
|
||||
has_counts = !peer_actual_resource_counts.is_empty(),
|
||||
"Skipping count-based gap detection - not in stable state or no counts provided"
|
||||
);
|
||||
}
|
||||
|
||||
// Determine if WE need to catch up based on per-resource watermark comparison
|
||||
let mut we_need_state_catchup = false;
|
||||
let mut we_need_shared_catchup = false;
|
||||
@@ -1334,11 +1580,15 @@ impl PeerSync {
|
||||
std::collections::HashMap::new()
|
||||
});
|
||||
|
||||
// Note: Counts unavailable in static context (no db access)
|
||||
// This is fine - counts only used when called from instance method
|
||||
let my_peer_resource_counts = std::collections::HashMap::new();
|
||||
|
||||
debug!(
|
||||
peer = %peer_id,
|
||||
my_shared_watermark = ?my_shared_watermark,
|
||||
resource_count = my_resource_watermarks.len(),
|
||||
"Sending watermark exchange request with per-resource watermarks"
|
||||
"Sending watermark exchange request (static trigger, counts unavailable)"
|
||||
);
|
||||
|
||||
// Send request to peer
|
||||
@@ -1347,6 +1597,7 @@ impl PeerSync {
|
||||
device_id,
|
||||
my_shared_watermark,
|
||||
my_resource_watermarks,
|
||||
my_peer_resource_counts,
|
||||
};
|
||||
|
||||
network
|
||||
|
||||
@@ -198,55 +198,65 @@ impl MockTransport {
|
||||
}
|
||||
}
|
||||
}
|
||||
SyncMessage::WatermarkExchangeRequest {
|
||||
library_id,
|
||||
device_id: requesting_device_id,
|
||||
my_shared_watermark: peer_shared_watermark,
|
||||
my_resource_watermarks: peer_resource_watermarks,
|
||||
} => {
|
||||
let (_our_state_watermark, our_shared_watermark) =
|
||||
sync_service.peer_sync().get_watermarks().await;
|
||||
SyncMessage::WatermarkExchangeRequest {
|
||||
library_id,
|
||||
device_id: requesting_device_id,
|
||||
my_shared_watermark: peer_shared_watermark,
|
||||
my_resource_watermarks: peer_resource_watermarks,
|
||||
my_peer_resource_counts: _peer_counts,
|
||||
} => {
|
||||
let (_our_state_watermark, our_shared_watermark) =
|
||||
sync_service.peer_sync().get_watermarks().await;
|
||||
|
||||
// Get our per-resource watermarks
|
||||
let our_resource_watermarks =
|
||||
sd_core::infra::sync::ResourceWatermarkStore::new(self.my_device_id)
|
||||
.get_our_resource_watermarks(sync_service.peer_sync().peer_log_conn())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
// Get our per-resource watermarks
|
||||
let our_resource_watermarks =
|
||||
sd_core::infra::sync::ResourceWatermarkStore::new(self.my_device_id)
|
||||
.get_our_resource_watermarks(sync_service.peer_sync().peer_log_conn())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
// Compare per-resource watermarks to determine if peer needs catch-up
|
||||
let mut needs_state_catchup = false;
|
||||
for (resource_type, our_ts) in &our_resource_watermarks {
|
||||
match peer_resource_watermarks.get(resource_type) {
|
||||
Some(peer_ts) if our_ts > peer_ts => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
// Get our actual counts
|
||||
let our_actual_counts = sd_core::service::sync::PeerSync::get_device_owned_counts(
|
||||
self.my_device_id,
|
||||
sync_service.peer_sync().db(),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
// Compare per-resource watermarks to determine if peer needs catch-up
|
||||
let mut needs_state_catchup = false;
|
||||
for (resource_type, our_ts) in &our_resource_watermarks {
|
||||
match peer_resource_watermarks.get(resource_type) {
|
||||
Some(peer_ts) if our_ts > peer_ts => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
needs_state_catchup = true;
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p)
|
||||
|| matches!(
|
||||
(peer_shared_watermark, our_shared_watermark),
|
||||
(None, Some(_))
|
||||
);
|
||||
let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p)
|
||||
|| matches!(
|
||||
(peer_shared_watermark, our_shared_watermark),
|
||||
(None, Some(_))
|
||||
);
|
||||
|
||||
let response = SyncMessage::WatermarkExchangeResponse {
|
||||
library_id,
|
||||
device_id: self.my_device_id,
|
||||
shared_watermark: our_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: our_resource_watermarks,
|
||||
};
|
||||
let response = SyncMessage::WatermarkExchangeResponse {
|
||||
library_id,
|
||||
device_id: self.my_device_id,
|
||||
shared_watermark: our_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: our_resource_watermarks,
|
||||
my_actual_resource_counts: our_actual_counts,
|
||||
};
|
||||
|
||||
self.send_sync_message(sender, response).await?;
|
||||
}
|
||||
self.send_sync_message(sender, response).await?;
|
||||
}
|
||||
SyncMessage::DataAvailableNotification {
|
||||
device_id,
|
||||
resource_types,
|
||||
@@ -264,25 +274,27 @@ impl MockTransport {
|
||||
warn!(error = %e, "Failed to trigger watermark exchange from notification");
|
||||
}
|
||||
}
|
||||
SyncMessage::WatermarkExchangeResponse {
|
||||
library_id: _,
|
||||
device_id: peer_device_id,
|
||||
shared_watermark: peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: peer_resource_watermarks,
|
||||
} => {
|
||||
sync_service
|
||||
.peer_sync()
|
||||
.on_watermark_exchange_response(
|
||||
peer_device_id,
|
||||
peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
peer_resource_watermarks,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
SyncMessage::WatermarkExchangeResponse {
|
||||
library_id: _,
|
||||
device_id: peer_device_id,
|
||||
shared_watermark: peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
resource_watermarks: peer_resource_watermarks,
|
||||
my_actual_resource_counts: peer_actual_counts,
|
||||
} => {
|
||||
sync_service
|
||||
.peer_sync()
|
||||
.on_watermark_exchange_response(
|
||||
peer_device_id,
|
||||
peer_shared_watermark,
|
||||
needs_state_catchup,
|
||||
needs_shared_catchup,
|
||||
peer_resource_watermarks,
|
||||
peer_actual_counts,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
SyncMessage::StateRequest {
|
||||
library_id,
|
||||
model_types,
|
||||
|
||||
Submodule workbench updated: 93727d7791...678ef4e295
Reference in New Issue
Block a user