refactor: Update sync transport implementation and library creation methods

- Revised documentation in the NetworkTransport trait to clarify the source of target device UUIDs and connected partners, switching references from the obsolete sync_partners table to the devices table.
- Introduced `create_library_no_sync` method in LibraryManager to allow library creation without automatic sync initialization, facilitating testing with mock transports.
- Enhanced logging in PeerSync to include network transport details for better debugging.
- Updated MockNetworkTransport to support transport name retrieval for improved test clarity.
This commit is contained in:
Jamie Pine
2025-10-09 09:30:34 -07:00
parent 28e3ee443d
commit f9687c5ec2
6 changed files with 352 additions and 110 deletions

View File

@@ -44,7 +44,7 @@ pub trait NetworkTransport: Send + Sync {
///
/// # Arguments
///
/// * `target_device` - UUID of the target device (from sync_partners table)
/// * `target_device` - UUID of the target device (from devices table where sync_enabled=true)
/// * `message` - The sync message to send (StateChange, SharedChange, etc.)
///
/// # Returns
@@ -72,7 +72,7 @@ pub trait NetworkTransport: Send + Sync {
/// Get list of currently connected sync partner devices
///
/// Returns UUIDs of devices that are:
/// - Listed in sync_partners table with sync_enabled=true
/// - Listed in devices table with sync_enabled=true
/// - Currently connected (have active network connection)
///
/// This is used to optimize broadcasting - only send to devices that can receive.
@@ -85,7 +85,7 @@ pub trait NetworkTransport: Send + Sync {
/// # Implementation Note
///
/// This should query:
/// 1. `sync_partners` table for enabled partners
/// 1. `devices` table for sync_enabled=true devices
/// 2. `device_registry` for connection status
/// 3. Return intersection of (enabled) AND (connected)
async fn get_connected_sync_partners(&self) -> Result<Vec<Uuid>>;
@@ -107,43 +107,16 @@ pub trait NetworkTransport: Send + Sync {
/// `false` otherwise (device offline, not paired, etc.)
async fn is_device_reachable(&self, device_uuid: Uuid) -> bool {
// Default implementation: always reports the device as unreachable.
// `device_uuid` is not consulted here; the default never claims a device
// is reachable. Implementers should override this with a real check
// (per the original note: DeviceRegistry connection status).
false // Implementer should override this
}
}
/// No-op transport implementation for when networking is unavailable.
///
/// Used as a fallback when `NetworkingService` hasn't been initialized yet.
/// All operations succeed but do nothing (messages are dropped).
///
/// This is a zero-sized unit struct, so it is trivially `Copy` and can be
/// built with either [`NoOpNetworkTransport::new`] or `Default::default()`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NoOpNetworkTransport;

impl NoOpNetworkTransport {
    /// Creates a new no-op transport.
    ///
    /// Equivalent to `NoOpNetworkTransport::default()`; kept as an explicit
    /// constructor so existing `NoOpNetworkTransport::new()` call sites keep
    /// working.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
// `NetworkTransport` implementation in which every operation is a no-op:
// sends are dropped, no partners are reported, and nothing is reachable.
#[async_trait::async_trait]
impl NetworkTransport for NoOpNetworkTransport {
/// Accepts the message and discards it; always returns `Ok(())`.
async fn send_sync_message(&self, _target_device: Uuid, _message: SyncMessage) -> Result<()> {
// Silently drop message - networking not available
Ok(())
}
/// Always returns an empty list of connected partners.
async fn get_connected_sync_partners(&self) -> Result<Vec<Uuid>> {
// No networking = no connected partners
Ok(vec![])
}
/// Always returns `false`, regardless of the UUID passed in.
async fn is_device_reachable(&self, _device_uuid: Uuid) -> bool {
// No networking = nothing is reachable
false
}
/// Get transport name for debugging
// NOTE(review): this reports "UnknownTransport" rather than a NoOp-specific
// name — confirm whether this method belongs to this impl or is the trait's
// default `transport_name` that the diff view placed next to it.
fn transport_name(&self) -> &'static str {
"UnknownTransport"
}
}
/// Mock implementation for testing
/// Mock implementation for testing - collects messages without sending
#[cfg(test)]
pub struct MockNetworkTransport {
/// Track which devices received which messages
@@ -178,4 +151,8 @@ impl NetworkTransport for MockNetworkTransport {
// For tests, return empty list
Ok(vec![])
}
/// Returns the fixed name "MockNetworkTransport" so logs can identify
/// which transport implementation is in use.
fn transport_name(&self) -> &'static str {
"MockNetworkTransport"
}
}

View File

@@ -112,6 +112,29 @@ impl LibraryManager {
name: impl Into<String>,
location: Option<PathBuf>,
context: Arc<CoreContext>,
) -> Result<Arc<Library>> {
self.create_library_internal(name, location, context, true)
.await
}
/// Creates a library while asking the internal creation routine NOT to
/// auto-initialize sync.
///
/// Takes the same arguments as `create_library` and forwards them to
/// `create_library_internal` with `auto_init_sync` set to `false`.
/// Intended for tests that need to install their own transport (e.g. a mock)
/// before the sync service is initialized.
pub async fn create_library_no_sync(
    &self,
    name: impl Into<String>,
    location: Option<PathBuf>,
    context: Arc<CoreContext>,
) -> Result<Arc<Library>> {
    // Named flag so the call site documents itself.
    let auto_init_sync = false;

    self.create_library_internal(name, location, context, auto_init_sync)
        .await
}
/// Internal library creation with optional sync init
async fn create_library_internal(
&self,
name: impl Into<String>,
location: Option<PathBuf>,
context: Arc<CoreContext>,
auto_init_sync: bool,
) -> Result<Arc<Library>> {
let name = name.into();
@@ -254,26 +277,22 @@ impl LibraryManager {
// Note: Sidecar manager initialization should be done by the Core when libraries are loaded
// This allows Core to pass its services reference
// Initialize sync service with network transport
let network_transport: Arc<dyn crate::infra::sync::NetworkTransport> =
match context.networking.read().await.as_ref() {
Some(networking) => networking.clone(),
None => {
warn!(
"NetworkingService not initialized, sync service will use no-op transport"
);
// Use no-op transport (messages dropped, no broadcast)
Arc::new(crate::infra::sync::transport::NoOpNetworkTransport::new())
}
};
if let Err(e) = library
.init_sync_service(device_id, network_transport)
.await
{
warn!(
"Failed to initialize sync service for library {}: {}",
config.id, e
// Initialize sync service if networking is available
// If networking isn't ready, sync simply won't be initialized until caller does it explicitly
// TODO: maybe consider checking if networking is enabled rather than just checking if it's available
if let Some(networking) = context.networking.read().await.as_ref() {
if let Err(e) = library
.init_sync_service(device_id, networking.clone())
.await
{
warn!(
"Failed to initialize sync service for library {}: {}",
config.id, e
);
}
} else {
info!(
"NetworkingService not available, sync service will be initialized later when networking is ready"
);
}

View File

@@ -112,6 +112,11 @@ impl Library {
network: Arc<dyn crate::infra::sync::NetworkTransport>,
) -> Result<()> {
if self.sync_service.get().is_some() {
warn!(
"Sync service already initialized for library {}, cannot replace transport. Transport: {}",
self.id(),
self.sync_service.get().unwrap().peer_sync().transport_name()
);
return Ok(());
}

View File

@@ -225,6 +225,10 @@ impl PeerSync {
// Clone necessary fields for the spawned task
let library_id = self.library_id;
let network = self.network.clone();
info!(
"PeerSync event listener cloning network transport: {:?}",
std::any::type_name_of_val(&*network)
);
let state = self.state.clone();
let buffer = self.buffer.clone();
let db = self.db.clone();
@@ -234,7 +238,10 @@ impl PeerSync {
let is_running = self.is_running.clone();
tokio::spawn(async move {
info!("PeerSync event listener started");
info!(
"PeerSync event listener started with network transport: {}",
network.transport_name()
);
while is_running.load(Ordering::SeqCst) {
match subscriber.recv().await {
@@ -357,13 +364,20 @@ impl PeerSync {
}
// Get all connected sync partners
debug!("About to call network.get_connected_sync_partners() on handle_state_change_event_static");
let connected_partners = network.get_connected_sync_partners().await.map_err(|e| {
warn!(error = %e, "Failed to get connected partners");
e
})?;
debug!(
count = connected_partners.len(),
partners = ?connected_partners,
"[Static Handler] Got connected sync partners from transport"
);
if connected_partners.is_empty() {
debug!("No connected sync partners to broadcast to");
debug!("[Static Handler] No connected sync partners to broadcast to");
return Ok(());
}
@@ -994,6 +1008,11 @@ impl PeerSync {
&self.hlc_generator
}
/// Get network transport name (for debugging)
///
/// Delegates to `transport_name()` on the transport stored in `self.network`,
/// so the returned string identifies the transport this `PeerSync` was built with.
pub fn transport_name(&self) -> &'static str {
self.network.transport_name()
}
/// Get device-owned state for backfill (StateRequest)
///
/// This is completely domain-agnostic - it delegates to the Syncable trait

View File

@@ -0,0 +1,213 @@
# Sync Integration Test - Debug Results
## 🎉 BREAKTHROUGH: Sync is Working!
### The Fix: `create_library_no_sync()`
**Problem**: OnceCell timing issue
```text
// Before:
create_library()
  → open_library()
  → auto-init sync with NoOpTransport (networking disabled)
  → OnceCell::set(NoOpTransport)                          // Cell is now locked!

// Test tries to inject mock:
init_sync_service(mock_transport)
  → if sync_service.get().is_some() { return Ok(()); }    // Rejected!
  → Mock never gets used
```
**Solution**: Skip auto-init for tests
```text
create_library_no_sync()            // New method with auto_init_sync=false
  → open_library()
  → Skips auto-init                 // OnceCell remains empty!

// Test can now inject mock:
init_sync_service(mock_transport)
  → OnceCell is empty               // Succeeds!
  → Mock is used for all sync operations
```
## ✅ What's Working Now
### Full Sync Flow Validated
```
Device A:
1. Create location + entry in database
2. Call transaction_manager.commit_device_owned()
3. TransactionManager emits sync:state_change event
4. Event bus delivers to 3 subscribers
5. PeerSync picks up event
6. Calls network.get_connected_sync_partners()
→ MockTransportPeer returns [Device B UUID]
7. Calls network.send_sync_message(Device B, StateChange{...})
→ Message queued in A→B queue
8. Test pumps messages
9. MockTransportPeer delivers message to Device B's sync handler
10. Device B's PeerSync.on_state_change_received() called
11. Calls apply_state_change()
12. ❌ FK constraint fails (expected - see below)
```
### Proven Components
- **TransactionManager**: Emits events correctly
- **Event Bus**: Routes events to subscribers
- **PeerSync Event Listener**: Picks up sync events
- **Mock Transport**: `get_connected_sync_partners()` returns peers
- **Message Sending**: `send_sync_message()` queues messages
- **Message Delivery**: Bidirectional transport works
- **Message Reception**: `on_state_change_received()` called
- **Apply Routing**: `apply_state_change()` is invoked
## ❌ Expected Failures: FK Constraints
### Why Tests Fail (This is Good!)
```
Error: FOREIGN KEY constraint failed (code: 787)
```
**Root Cause**: Location has dependencies that don't exist on Device B:
```sql
CREATE TABLE locations (
    device_id INTEGER NOT NULL REFERENCES devices(id),  -- ← Device A's local ID
    entry_id  INTEGER NOT NULL REFERENCES entries(id),  -- ← Entry doesn't exist yet
    ...
);
```
When we try to insert on Device B:
- `device_id=1` (Device A's local ID on its own DB)
- But on Device B, Device A might have `device_id=2` or not exist
- `entry_id=1` (location's root entry) doesn't exist on Device B yet
### What Needs to be Fixed
**1. Device ID Mapping** (location::apply_state_change)
```rust
// Current (broken):
location.device_id = data.device_id; // Local DB ID from Device A
// Fixed:
let device_uuid = data.device_uuid; // Sync by UUID
let local_device = devices::Entity::find()
.filter(devices::Column::Uuid.eq(device_uuid))
.one(db)
.await?;
location.device_id = local_device.id; // Map to Device B's local ID
```
**2. Entry Dependency** (must sync entries first, or handle missing FK)
```rust
// Option A: Queue for retry if entry doesn't exist
if entry_exists(entry_id).await {
insert_location();
} else {
queue_for_retry_after_dependency();
}
// Option B: Sync entries before locations (dependency ordering)
// This is why we have compute_sync_order()!
```
**3. Apply Function Needs Full Implementation**
Current `location::Model::apply_state_change()` probably does:
```rust
// Simplified version that fails:
let location: Model = serde_json::from_value(data)?;
location.insert(db).await?; // ← FK fails!
```
Needs:
```rust
pub async fn apply_state_change(data: Value, db: &DatabaseConnection) -> Result<()> {
// 1. Deserialize
let mut location_data: Model = serde_json::from_value(data)?;
// 2. Map device UUID → local device ID
let device_uuid = location_data.get_device_uuid()?; // Need to include in sync data
let local_device = get_or_create_device(device_uuid, db).await?;
location_data.device_id = local_device.id;
// 3. Handle entry FK (create stub or queue for retry)
if !entry_exists(location_data.entry_id, db).await {
// Create placeholder entry or skip entry_id
location_data.entry_id = create_stub_entry_for_location(db).await?;
}
// 4. Upsert by UUID
Entity::insert(location_data.into())
.on_conflict(
OnConflict::column(Column::Uuid)
.update_columns([...])
.to_owned()
)
.exec(db)
.await?;
Ok(())
}
```
## 🎯 Test-Driven Development Working Perfectly
Your tests now show:
1. **What works**: Full sync pipeline from TransactionManager → Events → Transport → Delivery
2. **What needs fixing**: Apply functions need FK dependency handling

The failing tests are **guide rails** showing exactly what to implement next!
## Next Steps
### Immediate: Fix Location Apply Function
File: `core/src/infra/db/entities/location.rs`
```rust
impl Model {
pub async fn apply_state_change(
data: serde_json::Value,
db: &DatabaseConnection,
) -> Result<(), sea_orm::DbErr> {
// TODO:
// 1. Include device_uuid in sync data (not just device_id)
// 2. Map device UUID to local device ID
// 3. Handle missing entry_id (create stub or use null)
// 4. Implement proper upsert by UUID
}
}
```
### Then: Fix Other Models
- `entry::Model::apply_state_change()` - Similar FK mapping needed
- `tag::Model::apply_shared_change()` - Simpler (no FKs to devices)
### Finally: Watch Tests Pass
Once FK handling is implemented:
```bash
cargo test --test sync_integration_test
running 4 tests
test test_sync_entry_with_location ... ok ← Will pass!
test test_sync_infrastructure_summary ... ok
test test_sync_location_device_owned_state_based ... ok ← Will pass!
test test_sync_tag_shared_hlc_based ... ok ← Will pass!
```
## Summary
**The sync pipeline works end-to-end up to the apply step!** The only missing pieces are:
1. UUID↔local ID mapping in apply functions
2. FK dependency handling
3. Proper upsert logic
These are straightforward database operations. The hard architectural work is done! ✅

View File

@@ -164,6 +164,10 @@ impl NetworkTransport for MockTransportPeer {
/// Reports a device as reachable only when `device_uuid` equals this
/// transport's `peer_device_id`; every other UUID yields `false`.
async fn is_device_reachable(&self, device_uuid: Uuid) -> bool {
device_uuid == self.peer_device_id
}
/// Returns the fixed name "MockTransportPeer" so logs can identify
/// which transport implementation is in use.
fn transport_name(&self) -> &'static str {
"MockTransportPeer"
}
}
impl MockTransportPeer {
@@ -313,19 +317,18 @@ impl SyncTestSetup {
let device_b_id = core_b.device.device_id()?;
info!("🖥️ Device B ID: {}", device_b_id);
// Create libraries on both cores
// With networking disabled, they'll use NoOpTransport which we can override
// Create libraries WITHOUT auto-sync-init (allows us to inject mock transport)
let library_a = core_a
.libraries
.create_library("Test Library A", None, core_a.context.clone())
.create_library_no_sync("Test Library A", None, core_a.context.clone())
.await?;
info!("📚 Library A created: {}", library_a.id());
info!("📚 Library A created (no auto-sync): {}", library_a.id());
let library_b = core_b
.libraries
.create_library("Test Library B", None, core_b.context.clone())
.create_library_no_sync("Test Library B", None, core_b.context.clone())
.await?;
info!("📚 Library B created: {}", library_b.id());
info!("📚 Library B created (no auto-sync): {}", library_b.id());
// Register devices in each other's libraries
// This also implicitly makes them sync partners (sync_enabled=true by default)
@@ -548,20 +551,19 @@ async fn test_sync_location_device_owned_state_based() -> anyhow::Result<()> {
let messages_a_to_b = setup.transport.get_a_to_b_messages().await;
info!("📨 Messages sent from A to B: {}", messages_a_to_b.len());
if messages_a_to_b.is_empty() {
info!("⚠️ No messages sent - expected until event system wired to TransactionManager");
info!(" Sync infrastructure is ready, but database operations don't emit events yet");
} else {
info!("✅ Messages are being sent!");
// Check for StateChange message
let has_state_change = messages_a_to_b.iter().any(|(_, msg)| {
matches!(
msg,
sd_core::service::network::protocol::sync::messages::SyncMessage::StateChange { .. }
)
});
assert!(has_state_change, "Expected StateChange message");
}
assert!(
!messages_a_to_b.is_empty(),
"Expected messages to be sent from A to B"
);
// Check for StateChange message
let has_state_change = messages_a_to_b.iter().any(|(_, msg)| {
matches!(
msg,
sd_core::service::network::protocol::sync::messages::SyncMessage::StateChange { .. }
)
});
assert!(has_state_change, "Expected StateChange message");
// Check if location appeared on Device B
let location_on_b = entities::location::Entity::find()
@@ -569,12 +571,16 @@ async fn test_sync_location_device_owned_state_based() -> anyhow::Result<()> {
.one(setup.library_b.db().conn())
.await?;
// This will fail with FK constraint until apply functions handle dependencies properly
// That's GOOD - it shows what needs to be fixed!
assert!(
location_on_b.is_some(),
"Location should sync to Device B (FK constraint failures indicate apply function needs dependency handling)"
);
if let Some(location) = location_on_b {
info!(" Location successfully synced to Device B!");
info!("🎉 Location successfully synced to Device B!");
assert_eq!(location.uuid, location_uuid);
assert_eq!(location.device_id, device_a_record.id); // Owned by Device A
} else {
info!("⚠️ Location NOT synced (expected until TransactionManager wired)");
}
// === MONITOR EVENTS ===
@@ -663,18 +669,19 @@ async fn test_sync_tag_shared_hlc_based() -> anyhow::Result<()> {
let messages_a_to_b = setup.transport.get_a_to_b_messages().await;
info!("📨 Messages sent from A to B: {}", messages_a_to_b.len());
if messages_a_to_b.is_empty() {
info!("⚠️ No messages sent - expected until TransactionManager integration");
} else {
// Check for SharedChange message
let has_shared_change = messages_a_to_b.iter().any(|(_, msg)| {
matches!(
msg,
sd_core::service::network::protocol::sync::messages::SyncMessage::SharedChange { .. }
)
});
assert!(has_shared_change, "Expected SharedChange message with HLC");
}
assert!(
!messages_a_to_b.is_empty(),
"Expected SharedChange messages to be sent"
);
// Check for SharedChange message
let has_shared_change = messages_a_to_b.iter().any(|(_, msg)| {
matches!(
msg,
sd_core::service::network::protocol::sync::messages::SyncMessage::SharedChange { .. }
)
});
assert!(has_shared_change, "Expected SharedChange message with HLC");
// Check if tag appeared on Device B
let tag_on_b = entities::tag::Entity::find()
@@ -682,14 +689,16 @@ async fn test_sync_tag_shared_hlc_based() -> anyhow::Result<()> {
.one(setup.library_b.db().conn())
.await?;
if let Some(synced_tag) = tag_on_b {
info!("✅ Tag successfully synced to Device B!");
assert_eq!(synced_tag.uuid, tag.id);
assert_eq!(synced_tag.canonical_name, "Vacation");
assert_eq!(synced_tag.namespace, Some("photos".to_string()));
} else {
info!("⚠️ Tag NOT synced (expected until TransactionManager wired)");
}
assert!(
tag_on_b.is_some(),
"Tag should sync to Device B (failure indicates apply_shared_change needs implementation/fixes)"
);
let synced_tag = tag_on_b.unwrap();
info!("🎉 Tag successfully synced to Device B!");
assert_eq!(synced_tag.uuid, tag.id);
assert_eq!(synced_tag.canonical_name, "Vacation");
assert_eq!(synced_tag.namespace, Some("photos".to_string()));
// Check ACK messages
let messages_b_to_a = setup.transport.get_b_to_a_messages().await;
@@ -832,14 +841,14 @@ async fn test_sync_entry_with_location() -> anyhow::Result<()> {
info!("📊 State changes: {:?}", state_changes);
if state_changes.is_empty() {
info!("⚠️ No state changes - expected until TransactionManager integration");
} else {
assert!(
state_changes.iter().any(|(m, _)| m == "entry"),
"Should have sent entry state change"
);
}
assert!(
!state_changes.is_empty(),
"Should have sent state change messages"
);
assert!(
state_changes.iter().any(|(m, _)| m == "entry"),
"Should have sent entry state change"
);
info!("✅ TEST COMPLETE: Entry sync infrastructure validated");
Ok(())