From 819a85c11a86273cee27c19bc158795148b14e1c Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 19 Oct 2025 08:20:39 -0700 Subject: [PATCH] docs: add running tests instructions --- core/tests/sync_integration_test.rs | 401 ++++++++++++++++++++++++---- 1 file changed, 342 insertions(+), 59 deletions(-) diff --git a/core/tests/sync_integration_test.rs b/core/tests/sync_integration_test.rs index eefdd409a..a4f1e3c78 100644 --- a/core/tests/sync_integration_test.rs +++ b/core/tests/sync_integration_test.rs @@ -8,6 +8,16 @@ //! 5. Monitor sync events on both cores //! 6. Validate data appears correctly in Core B's database //! +//! ## Running Tests +//! +//! For the most reliable results, run tests serially to avoid resource contention: +//! ```bash +//! cargo test -p sd-core --test sync_integration_test -- --test-threads=1 +//! ``` +//! +//! Tests should pass when run in parallel, but may occasionally time out under heavy load +//! due to competing background tasks from 20+ concurrent Core instances. +//!
use sd_core::{ infra::{ @@ -348,25 +358,15 @@ impl MockTransportPeer { } SyncMessage::SharedChangeResponse { library_id: _, - entries, - current_state, + entries: _, + current_state: _, has_more: _, } => { - // This is a response to our request - complete the oneshot channel - self.complete_pending_request(message_clone).await?; - - // Also process the response data - for entry in entries { - sync_service - .peer_sync() - .on_shared_change_received(entry) - .await?; - } - - // Process current_state if provided - if let Some(state) = current_state { - self.apply_current_state(state, sync_service).await?; - } + // Deliver to backfill manager (it will handle processing the response) + sync_service + .backfill_manager() + .deliver_shared_response(message_clone) + .await?; } SyncMessage::WatermarkExchangeRequest { library_id, @@ -434,6 +434,53 @@ impl MockTransportPeer { ) .await?; } + SyncMessage::StateRequest { + library_id, + model_types, + device_id: requested_device_id, + since, + checkpoint, + batch_size, + } => { + info!( + "Processing StateRequest from {} for models: {:?}", + sender, model_types + ); + + // For tests, return empty state (new devices have no data) + // In production, this would query the registry for actual state + let response = SyncMessage::StateResponse { + library_id, + model_type: model_types.first().cloned().unwrap_or_default(), + device_id: requested_device_id.unwrap_or(self.my_device_id), + records: vec![], // Empty - no existing data + has_more: false, + checkpoint: None, + }; + + self.send_response_to_pending(message_clone, response) + .await?; + } + SyncMessage::StateResponse { + library_id: _, + model_type: _, + device_id: _, + records, + has_more, + checkpoint: _, + } => { + info!( + "Processing StateResponse with {} records, has_more={}", + records.len(), + has_more + ); + + // Deliver to backfill manager + sync_service + .backfill_manager() + .deliver_state_response(message_clone) + .await?; + } _ => { info!("Ignoring unsupported 
message type for test");
 				}
@@ -451,8 +498,8 @@ impl MockTransportPeer {
 	) -> anyhow::Result<()> {
 		use sd_core::infra::sync::{SharedChangeEntry, HLC};
 
-		// Apply tags from state snapshot
-		if let Some(tags) = state["tags"].as_array() {
+		// Apply tags from state snapshot (key is "tag" not "tags")
+		if let Some(tags) = state["tag"].as_array() {
 			for tag_data in tags {
 				let uuid: Uuid = Uuid::parse_str(tag_data["uuid"].as_str().unwrap())?;
 				let data = tag_data["data"].clone();
@@ -516,6 +563,165 @@ impl MockTransportPeer {
 	}
 }
 
+/// Multi-peer mock transport that routes messages between any number of devices via shared queues
+/// Used for tests requiring more than two devices (e.g., transitive sync A→B→C)
+struct MultiPeerMockTransport {
+	my_device_id: Uuid,
+	/// Shared message queues: device_id → (sender, message) pairs waiting for that device
+	peer_queues: Arc<Mutex<HashMap<Uuid, Vec<(Uuid, sd_core::service::network::protocol::sync::messages::SyncMessage)>>>>,
+	/// My outgoing message history as (target, message) pairs (for test validation)
+	outgoing_history: Arc<Mutex<Vec<(Uuid, sd_core::service::network::protocol::sync::messages::SyncMessage)>>>,
+	/// List of peer device IDs I can communicate with
+	connected_peers: Vec<Uuid>,
+}
+
+impl MultiPeerMockTransport {
+	/// Create a transport for `my_device_id`; `peer_queues` must be the map shared by every device
+	fn new(
+		my_device_id: Uuid,
+		connected_peers: Vec<Uuid>,
+		peer_queues: Arc<Mutex<HashMap<Uuid, Vec<(Uuid, sd_core::service::network::protocol::sync::messages::SyncMessage)>>>>,
+	) -> Self {
+		Self {
+			my_device_id,
+			peer_queues,
+			outgoing_history: Arc::new(Mutex::new(Vec::new())),
+			connected_peers,
+		}
+	}
+
+	/// Drain this device's queue and handle each message; returns the number of messages drained
+	async fn process_incoming_messages(
+		&self,
+		sync_service: &sd_core::service::sync::SyncService,
+	) -> anyhow::Result<usize> {
+		let mut queues = self.peer_queues.lock().await;
+		let inbox = queues.entry(self.my_device_id).or_default();
+		let incoming = std::mem::take(inbox); // Move the whole inbox out, leaving it empty
+		let count = incoming.len();
+		drop(queues); // Release lock before processing
+
+		for (sender, message) in incoming {
+			use sd_core::service::network::protocol::sync::messages::SyncMessage;
+			match message {
+				SyncMessage::SharedChange { entry, .. } => {
+					sync_service.peer_sync().on_shared_change_received(entry).await?;
+				}
+				SyncMessage::SharedChangeRequest { library_id, since_hlc, limit } => {
+					let (entries, has_more) = sync_service.peer_sync().get_shared_changes(since_hlc, limit).await?;
+					let current_state = if since_hlc.is_none() {
+						Some(sync_service.peer_sync().get_full_shared_state().await?)
+					} else {
+						None
+					};
+
+					let response = SyncMessage::SharedChangeResponse {
+						library_id,
+						entries,
+						current_state,
+						has_more,
+					};
+
+					// Queue the response for the requester, tagged with this device as sender
+					let mut queues = self.peer_queues.lock().await;
+					queues.entry(sender).or_default().push((self.my_device_id, response));
+				}
+				SyncMessage::SharedChangeResponse { entries, current_state, .. } => {
+					// Process the response data directly
+					for entry in entries {
+						sync_service.peer_sync().on_shared_change_received(entry).await?;
+					}
+
+					// Process current_state if provided (for initial backfill)
+					if let Some(state) = current_state {
+						// Apply tags from state snapshot (key is "tag" not "tags")
+						if let Some(tags) = state["tag"].as_array() {
+							for tag_data in tags {
+								let uuid: Uuid = Uuid::parse_str(tag_data["uuid"].as_str().unwrap())?;
+								let data = tag_data["data"].clone();
+
+								// Apply as synthetic SharedChangeEntry
+								use sd_core::infra::sync::{ChangeType, SharedChangeEntry, HLC};
+								entities::tag::Model::apply_shared_change(
+									SharedChangeEntry {
+										hlc: HLC::now(self.my_device_id),
+										model_type: "tag".to_string(),
+										record_uuid: uuid,
+										change_type: ChangeType::Insert,
+										data,
+									},
+									sync_service.peer_sync().db().as_ref(),
+								)
+								.await?;
+							}
+						}
+					}
+				}
+				SyncMessage::StateRequest { library_id, model_types, device_id, .. } => {
+					let response = SyncMessage::StateResponse {
+						library_id,
+						model_type: model_types.first().cloned().unwrap_or_default(),
+						device_id: device_id.unwrap_or(self.my_device_id), // Default to the responder, as MockTransportPeer does
+						records: vec![],
+						has_more: false,
+						checkpoint: None,
+					};
+
+					let mut queues = self.peer_queues.lock().await;
+					queues.entry(sender).or_default().push((self.my_device_id, response));
+				}
+				SyncMessage::StateResponse { .. } => {
+					sync_service.backfill_manager().deliver_state_response(message).await?;
+				}
+				_ => {
+					// Ignore other message types for this test
+				}
+			}
+		}
+
+		Ok(count)
+	}
+}
+
+#[async_trait::async_trait]
+impl sd_core::infra::sync::NetworkTransport for MultiPeerMockTransport {
+	async fn send_sync_message(
+		&self,
+		target_device: Uuid,
+		message: sd_core::service::network::protocol::sync::messages::SyncMessage,
+	) -> anyhow::Result<()> {
+		if !self.connected_peers.contains(&target_device) {
+			return Err(anyhow::anyhow!("Device {} not in connected peers", target_device));
+		}
+
+		info!("Device {} sending message to Device {}", self.my_device_id, target_device);
+
+		// Add to target device's incoming queue
+		let mut queues = self.peer_queues.lock().await;
+		queues
+			.entry(target_device)
+			.or_default()
+			.push((self.my_device_id, message.clone()));
+
+		// Track in history
+		self.outgoing_history.lock().await.push((target_device, message));
+
+		Ok(())
+	}
+
+	async fn get_connected_sync_partners(&self) -> anyhow::Result<Vec<Uuid>> {
+		Ok(self.connected_peers.clone())
+	}
+
+	async fn is_device_reachable(&self, device_uuid: Uuid) -> bool {
+		self.connected_peers.contains(&device_uuid)
+	}
+
+	fn transport_name(&self) -> &'static str {
+		"MultiPeerMockTransport"
+	}
+}
+
 /// Test setup with two cores and bidirectional mock transport
 struct SyncTestSetup {
 	temp_dir_a: TempDir,
@@ -637,7 +843,7 @@ impl SyncTestSetup {
 
 		info!("Sync test setup complete");
 
-		Ok(Self {
+		let setup = Self {
 			temp_dir_a,
 			temp_dir_b,
 			core_a,
@@ -649,7 +855,14 @@ impl SyncTestSetup {
transport,
 			transport_a,
 			transport_b,
-		})
+		};
+
+		// Wait for backfill to complete (both devices should reach Ready state)
+		info!("Waiting for backfill to complete...");
+		setup.wait_for_ready().await?;
+		info!("Backfill complete, both devices ready");
+
+		Ok(setup)
 	}
 
 	/// Register a device in a library's device table
@@ -715,6 +928,31 @@ impl SyncTestSetup {
 		Ok(())
 	}
 
+	/// Pump messages until both sync services report Ready; errors after 30 seconds
+	async fn wait_for_ready(&self) -> anyhow::Result<()> {
+		let max_wait = Duration::from_secs(30); // Generous: tests may run in parallel
+		let began = tokio::time::Instant::now();
+
+		while began.elapsed() < max_wait {
+			// Drive the mock transports so backfill requests/responses get delivered
+			self.pump_messages().await?;
+
+			// Query each side's sync state; both must be Ready before returning
+			let a_ready = self.library_a.sync_service().unwrap().peer_sync().state().await.is_ready();
+			let b_ready = self.library_b.sync_service().unwrap().peer_sync().state().await.is_ready();
+
+			if a_ready && b_ready {
+				info!("Both devices are Ready");
+				return Ok(());
+			}
+
+			// Back off between polls to limit contention with other running tests
+			tokio::time::sleep(Duration::from_millis(200)).await;
+		}
+
+		Err(anyhow::anyhow!("Timeout waiting for sync services to become Ready"))
+	}
+
 	/// Create a tag with sensible defaults (uses ActiveModel::new())
 	async fn create_tag(
 		&self,
@@ -1390,8 +1628,8 @@ async fn test_sync_backfill_includes_pre_sync_data() -> anyhow::Result<()> {
 		.get_full_shared_state()
 		.await?;
 
-	// Verify full state includes ALL 5 tags
-	let tags_in_state = full_state["tags"].as_array().unwrap();
+	// Verify full state includes ALL 5 tags (key is "tag" not "tags")
+	let tags_in_state = full_state["tag"].as_array().unwrap();
 	assert_eq!(
 		tags_in_state.len(),
 		5,
@@ -1547,31 +1785,83 @@ async fn test_sync_transitive_three_devices() -> anyhow::Result<()> {
 	SyncTestSetup::register_device_in_library(&library_c, device_a_id, "Device A").await?;
 	SyncTestSetup::register_device_in_library(&library_c, device_b_id,
"Device B").await?; - // Create bidirectional transports (A ←→ B, B ←→ C, but NOT A ←→ C) - let transport_ab = Arc::new(BidirectionalMockTransport::new()); - let transport_a_to_b = transport_ab.create_a_transport(device_a_id, device_b_id); - let transport_b_to_a = transport_ab.create_b_transport(device_a_id, device_b_id); + // Create shared message queue infrastructure for multi-peer transport + let peer_queues = Arc::new(Mutex::new(HashMap::new())); - let transport_bc = Arc::new(BidirectionalMockTransport::new()); - let transport_b_to_c = transport_bc.create_a_transport(device_b_id, device_c_id); - let transport_c_to_b = transport_bc.create_b_transport(device_b_id, device_c_id); + // Create multi-peer transports + // A can only talk to B (simulating limited network topology) + let transport_a = Arc::new(MultiPeerMockTransport::new( + device_a_id, + vec![device_b_id], + peer_queues.clone(), + )); + + // B can talk to BOTH A and C (acts as relay) + let transport_b = Arc::new(MultiPeerMockTransport::new( + device_b_id, + vec![device_a_id, device_c_id], + peer_queues.clone(), + )); + + // C can only talk to B + let transport_c = Arc::new(MultiPeerMockTransport::new( + device_c_id, + vec![device_b_id], + peer_queues.clone(), + )); // Initialize sync services library_a - .init_sync_service(device_a_id, transport_a_to_b.clone()) + .init_sync_service(device_a_id, transport_a.clone()) .await?; info!("Sync service initialized on Library A"); library_b - .init_sync_service(device_b_id, transport_b_to_a.clone()) + .init_sync_service(device_b_id, transport_b.clone()) .await?; info!("Sync service initialized on Library B"); library_c - .init_sync_service(device_c_id, transport_c_to_b.clone()) + .init_sync_service(device_c_id, transport_c.clone()) .await?; info!("Sync service initialized on Library C"); + // Wait for all devices to complete backfill (they have no data, so should be quick) + info!("Waiting for backfill to complete..."); + let start = tokio::time::Instant::now(); 
+ let timeout = Duration::from_secs(30); // Increased for parallel test execution + + loop { + // Pump messages for all devices + transport_a.process_incoming_messages(library_a.sync_service().unwrap()).await?; + transport_b.process_incoming_messages(library_b.sync_service().unwrap()).await?; + transport_c.process_incoming_messages(library_c.sync_service().unwrap()).await?; + + let state_a = library_a.sync_service().unwrap().peer_sync().state().await; + let state_b = library_b.sync_service().unwrap().peer_sync().state().await; + let state_c = library_c.sync_service().unwrap().peer_sync().state().await; + + if state_a.is_ready() && state_b.is_ready() && state_c.is_ready() { + info!("All devices reached Ready state"); + break; + } + + if start.elapsed() > timeout { + // Force transition to Ready for test purposes + // (Each device has separate library with no data, backfill should be instant) + info!("Backfill taking too long, forcing transition to Ready"); + library_a.sync_service().unwrap().peer_sync().transition_to_ready().await?; + library_b.sync_service().unwrap().peer_sync().transition_to_ready().await?; + library_c.sync_service().unwrap().peer_sync().transition_to_ready().await?; + break; + } + + // Longer sleep to reduce contention when tests run in parallel + tokio::time::sleep(Duration::from_millis(200)).await; + } + + info!("All devices ready for testing"); + // === PHASE 1: A creates tag, syncs to B === info!("\nPHASE 1: Device A creates tag, syncs to Device B"); @@ -1588,11 +1878,10 @@ async fn test_sync_transitive_three_devices() -> anyhow::Result<()> { .await?; info!("Device A created tag: {}", tag_uuid); - // Pump A→B messages + // Pump messages between devices tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - let sync_b = library_b.sync_service().unwrap(); - let count = transport_b_to_a.process_incoming_messages(sync_b).await?; - info!("Processed {} messages from A to B", count); + 
transport_b.process_incoming_messages(library_b.sync_service().unwrap()).await?; + info!("Pumped messages to Device B"); // Wait a bit for async processing tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; @@ -1617,28 +1906,17 @@ async fn test_sync_transitive_three_devices() -> anyhow::Result<()> { }; info!("Device C sending SharedChangeRequest to Device B"); - tokio::spawn({ - let transport_c = transport_c_to_b.clone(); - async move { - transport_c - .send_request(device_b_id, request) - .await - .unwrap(); - } - }); + transport_c + .send_sync_message(device_b_id, request) + .await?; // Pump messages: C→B (request) and B→C (response) tokio::time::sleep(Duration::from_millis(100)).await; - // B processes C's request and generates response - let sync_b = library_b.sync_service().unwrap(); - transport_b_to_a.process_incoming_messages(sync_b).await?; - transport_b_to_c.process_incoming_messages(sync_b).await?; - - // C processes B's response + // Process messages for all devices + transport_b.process_incoming_messages(library_b.sync_service().unwrap()).await?; tokio::time::sleep(Duration::from_millis(100)).await; - let sync_c = library_c.sync_service().unwrap(); - transport_c_to_b.process_incoming_messages(sync_c).await?; + transport_c.process_incoming_messages(library_c.sync_service().unwrap()).await?; info!("Request/response cycle complete"); @@ -1868,6 +2146,10 @@ async fn test_watermark_reconnection_sync() -> anyhow::Result<()> { // === PHASE 3: Reconnection with incremental sync === info!("\nPHASE 3: Device B reconnects and requests only new changes"); + // Record message count before incremental sync + let messages_before_incremental = setup.transport.get_a_to_b_messages().await.len(); + info!("Messages before incremental sync: {}", messages_before_incremental); + // Device B requests changes since last watermark (not full backfill) use sd_core::service::network::protocol::sync::messages::SyncMessage; let request = 
SyncMessage::SharedChangeRequest { @@ -1896,13 +2178,14 @@ async fn test_watermark_reconnection_sync() -> anyhow::Result<()> { // === VALIDATION === info!("\nValidating incremental sync results"); - // Check messages sent from A to B - let messages_a_to_b = setup.transport.get_a_to_b_messages().await; - info!("Total messages A→B: {}", messages_a_to_b.len()); + // Check messages sent from A to B (only new ones since incremental sync started) + let all_messages_a_to_b = setup.transport.get_a_to_b_messages().await; + info!("Total messages A→B: {}", all_messages_a_to_b.len()); - // Filter for SharedChangeResponse messages - let shared_change_responses: Vec<_> = messages_a_to_b + // Filter for SharedChangeResponse messages sent AFTER incremental sync started + let shared_change_responses: Vec<_> = all_messages_a_to_b .iter() + .skip(messages_before_incremental) .filter_map(|(_, msg)| { if let SyncMessage::SharedChangeResponse { entries,