docs: add running tests instructions

This commit is contained in:
Jamie Pine
2025-10-19 08:20:39 -07:00
parent 2fa20305d2
commit 819a85c11a

View File

@@ -8,6 +8,16 @@
//! 5. Monitor sync events on both cores
//! 6. Validate data appears correctly in Core B's database
//!
//! ## Running Tests
//!
//! For most reliable results, run tests serially to avoid resource contention:
//! ```bash
//! cargo test -p sd-core --test sync_integration_test -- --test-threads=1
//! ```
//!
//! Tests should pass when run in parallel, but may occasionally timeout under heavy load
//! due to competing background tasks from 20+ concurrent Core instances.
//!
use sd_core::{
infra::{
@@ -348,25 +358,15 @@ impl MockTransportPeer {
}
SyncMessage::SharedChangeResponse {
library_id: _,
entries,
current_state,
entries: _,
current_state: _,
has_more: _,
} => {
// This is a response to our request - complete the oneshot channel
self.complete_pending_request(message_clone).await?;
// Also process the response data
for entry in entries {
sync_service
.peer_sync()
.on_shared_change_received(entry)
.await?;
}
// Process current_state if provided
if let Some(state) = current_state {
self.apply_current_state(state, sync_service).await?;
}
// Deliver to backfill manager (it will handle processing the response)
sync_service
.backfill_manager()
.deliver_shared_response(message_clone)
.await?;
}
SyncMessage::WatermarkExchangeRequest {
library_id,
@@ -434,6 +434,53 @@ impl MockTransportPeer {
)
.await?;
}
SyncMessage::StateRequest {
library_id,
model_types,
device_id: requested_device_id,
since,
checkpoint,
batch_size,
} => {
info!(
"Processing StateRequest from {} for models: {:?}",
sender, model_types
);
// For tests, return empty state (new devices have no data)
// In production, this would query the registry for actual state
let response = SyncMessage::StateResponse {
library_id,
model_type: model_types.first().cloned().unwrap_or_default(),
device_id: requested_device_id.unwrap_or(self.my_device_id),
records: vec![], // Empty - no existing data
has_more: false,
checkpoint: None,
};
self.send_response_to_pending(message_clone, response)
.await?;
}
SyncMessage::StateResponse {
library_id: _,
model_type: _,
device_id: _,
records,
has_more,
checkpoint: _,
} => {
info!(
"Processing StateResponse with {} records, has_more={}",
records.len(),
has_more
);
// Deliver to backfill manager
sync_service
.backfill_manager()
.deliver_state_response(message_clone)
.await?;
}
_ => {
info!("Ignoring unsupported message type for test");
}
@@ -451,8 +498,8 @@ impl MockTransportPeer {
) -> anyhow::Result<()> {
use sd_core::infra::sync::{SharedChangeEntry, HLC};
// Apply tags from state snapshot
if let Some(tags) = state["tags"].as_array() {
// Apply tags from state snapshot (key is "tag" not "tags")
if let Some(tags) = state["tag"].as_array() {
for tag_data in tags {
let uuid: Uuid = Uuid::parse_str(tag_data["uuid"].as_str().unwrap())?;
let data = tag_data["data"].clone();
@@ -516,6 +563,165 @@ impl MockTransportPeer {
}
}
/// Multi-peer mock transport that can route messages to multiple devices
/// Used for tests requiring more than two devices (e.g., transitive sync A→B→C)
struct MultiPeerMockTransport {
my_device_id: Uuid,
/// Shared message queues: device_id → incoming messages for that device
peer_queues: Arc<Mutex<HashMap<Uuid, Vec<(Uuid, sd_core::service::network::protocol::sync::messages::SyncMessage)>>>>,
/// My outgoing message history (for test validation)
outgoing_history: Arc<Mutex<Vec<(Uuid, sd_core::service::network::protocol::sync::messages::SyncMessage)>>>,
/// List of peer device IDs I can communicate with
connected_peers: Vec<Uuid>,
}
impl MultiPeerMockTransport {
/// Create a new multi-peer transport
fn new(
my_device_id: Uuid,
connected_peers: Vec<Uuid>,
peer_queues: Arc<Mutex<HashMap<Uuid, Vec<(Uuid, sd_core::service::network::protocol::sync::messages::SyncMessage)>>>>,
) -> Self {
Self {
my_device_id,
peer_queues,
outgoing_history: Arc::new(Mutex::new(Vec::new())),
connected_peers,
}
}
/// Process incoming messages for this device
async fn process_incoming_messages(
&self,
sync_service: &sd_core::service::sync::SyncService,
) -> anyhow::Result<usize> {
let mut queues = self.peer_queues.lock().await;
let messages = queues.entry(self.my_device_id).or_insert_with(Vec::new);
let incoming: Vec<_> = messages.drain(..).collect();
let count = incoming.len();
drop(queues); // Release lock before processing
for (sender, message) in incoming {
use sd_core::service::network::protocol::sync::messages::SyncMessage;
match message {
SyncMessage::SharedChange { entry, .. } => {
sync_service.peer_sync().on_shared_change_received(entry).await?;
}
SyncMessage::SharedChangeRequest { library_id, since_hlc, limit } => {
let (entries, has_more) = sync_service.peer_sync().get_shared_changes(since_hlc, limit).await?;
let current_state = if since_hlc.is_none() {
Some(sync_service.peer_sync().get_full_shared_state().await?)
} else {
None
};
let response = SyncMessage::SharedChangeResponse {
library_id,
entries,
current_state,
has_more,
};
// Send response back to sender
let mut queues = self.peer_queues.lock().await;
queues.entry(sender).or_insert_with(Vec::new).push((self.my_device_id, response));
}
SyncMessage::SharedChangeResponse { entries, current_state, .. } => {
// Process the response data directly
for entry in entries {
sync_service.peer_sync().on_shared_change_received(entry).await?;
}
// Process current_state if provided (for initial backfill)
if let Some(state) = current_state {
// Apply tags from state snapshot (key is "tag" not "tags")
if let Some(tags) = state["tag"].as_array() {
for tag_data in tags {
let uuid: Uuid = Uuid::parse_str(tag_data["uuid"].as_str().unwrap())?;
let data = tag_data["data"].clone();
// Apply as synthetic SharedChangeEntry
use sd_core::infra::sync::{ChangeType, SharedChangeEntry, HLC};
entities::tag::Model::apply_shared_change(
SharedChangeEntry {
hlc: HLC::now(self.my_device_id),
model_type: "tag".to_string(),
record_uuid: uuid,
change_type: ChangeType::Insert,
data,
},
sync_service.peer_sync().db().as_ref(),
)
.await?;
}
}
}
}
SyncMessage::StateRequest { library_id, model_types, device_id, .. } => {
let response = SyncMessage::StateResponse {
library_id,
model_type: model_types.first().cloned().unwrap_or_default(),
device_id: device_id.unwrap_or(sender),
records: vec![],
has_more: false,
checkpoint: None,
};
let mut queues = self.peer_queues.lock().await;
queues.entry(sender).or_insert_with(Vec::new).push((self.my_device_id, response));
}
SyncMessage::StateResponse { .. } => {
sync_service.backfill_manager().deliver_state_response(message).await?;
}
_ => {
// Ignore other message types for this test
}
}
}
Ok(count)
}
}
#[async_trait::async_trait]
impl sd_core::infra::sync::NetworkTransport for MultiPeerMockTransport {
async fn send_sync_message(
&self,
target_device: Uuid,
message: sd_core::service::network::protocol::sync::messages::SyncMessage,
) -> anyhow::Result<()> {
if !self.connected_peers.contains(&target_device) {
return Err(anyhow::anyhow!("Device {} not in connected peers", target_device));
}
info!("Device {} sending message to Device {}", self.my_device_id, target_device);
// Add to target device's incoming queue
let mut queues = self.peer_queues.lock().await;
queues
.entry(target_device)
.or_insert_with(Vec::new)
.push((self.my_device_id, message.clone()));
// Track in history
self.outgoing_history.lock().await.push((target_device, message));
Ok(())
}
async fn get_connected_sync_partners(&self) -> anyhow::Result<Vec<Uuid>> {
Ok(self.connected_peers.clone())
}
async fn is_device_reachable(&self, device_uuid: Uuid) -> bool {
self.connected_peers.contains(&device_uuid)
}
fn transport_name(&self) -> &'static str {
"MultiPeerMockTransport"
}
}
/// Test setup with two cores and bidirectional mock transport
struct SyncTestSetup {
temp_dir_a: TempDir,
@@ -637,7 +843,7 @@ impl SyncTestSetup {
info!("Sync test setup complete");
Ok(Self {
let setup = Self {
temp_dir_a,
temp_dir_b,
core_a,
@@ -649,7 +855,14 @@ impl SyncTestSetup {
transport,
transport_a,
transport_b,
})
};
// Wait for backfill to complete (both devices should reach Ready state)
info!("Waiting for backfill to complete...");
setup.wait_for_ready().await?;
info!("Backfill complete, both devices ready");
Ok(setup)
}
/// Register a device in a library's device table
@@ -715,6 +928,31 @@ impl SyncTestSetup {
Ok(())
}
/// Wait for sync service to reach Ready state
async fn wait_for_ready(&self) -> anyhow::Result<()> {
let start = tokio::time::Instant::now();
let timeout = Duration::from_secs(30); // Increased for parallel test execution
while start.elapsed() < timeout {
// Pump messages to process backfill requests/responses
self.pump_messages().await?;
// Check if both devices are ready
let state_a = self.library_a.sync_service().unwrap().peer_sync().state().await;
let state_b = self.library_b.sync_service().unwrap().peer_sync().state().await;
if state_a.is_ready() && state_b.is_ready() {
info!("Both devices are Ready");
return Ok(());
}
// Longer sleep to reduce contention when tests run in parallel
tokio::time::sleep(Duration::from_millis(200)).await;
}
Err(anyhow::anyhow!("Timeout waiting for sync services to become Ready"))
}
/// Create a tag with sensible defaults (uses ActiveModel::new())
async fn create_tag(
&self,
@@ -1390,8 +1628,8 @@ async fn test_sync_backfill_includes_pre_sync_data() -> anyhow::Result<()> {
.get_full_shared_state()
.await?;
// Verify full state includes ALL 5 tags
let tags_in_state = full_state["tags"].as_array().unwrap();
// Verify full state includes ALL 5 tags (key is "tag" not "tags")
let tags_in_state = full_state["tag"].as_array().unwrap();
assert_eq!(
tags_in_state.len(),
5,
@@ -1547,31 +1785,83 @@ async fn test_sync_transitive_three_devices() -> anyhow::Result<()> {
SyncTestSetup::register_device_in_library(&library_c, device_a_id, "Device A").await?;
SyncTestSetup::register_device_in_library(&library_c, device_b_id, "Device B").await?;
// Create bidirectional transports (A ←→ B, B ←→ C, but NOT A ←→ C)
let transport_ab = Arc::new(BidirectionalMockTransport::new());
let transport_a_to_b = transport_ab.create_a_transport(device_a_id, device_b_id);
let transport_b_to_a = transport_ab.create_b_transport(device_a_id, device_b_id);
// Create shared message queue infrastructure for multi-peer transport
let peer_queues = Arc::new(Mutex::new(HashMap::new()));
let transport_bc = Arc::new(BidirectionalMockTransport::new());
let transport_b_to_c = transport_bc.create_a_transport(device_b_id, device_c_id);
let transport_c_to_b = transport_bc.create_b_transport(device_b_id, device_c_id);
// Create multi-peer transports
// A can only talk to B (simulating limited network topology)
let transport_a = Arc::new(MultiPeerMockTransport::new(
device_a_id,
vec![device_b_id],
peer_queues.clone(),
));
// B can talk to BOTH A and C (acts as relay)
let transport_b = Arc::new(MultiPeerMockTransport::new(
device_b_id,
vec![device_a_id, device_c_id],
peer_queues.clone(),
));
// C can only talk to B
let transport_c = Arc::new(MultiPeerMockTransport::new(
device_c_id,
vec![device_b_id],
peer_queues.clone(),
));
// Initialize sync services
library_a
.init_sync_service(device_a_id, transport_a_to_b.clone())
.init_sync_service(device_a_id, transport_a.clone())
.await?;
info!("Sync service initialized on Library A");
library_b
.init_sync_service(device_b_id, transport_b_to_a.clone())
.init_sync_service(device_b_id, transport_b.clone())
.await?;
info!("Sync service initialized on Library B");
library_c
.init_sync_service(device_c_id, transport_c_to_b.clone())
.init_sync_service(device_c_id, transport_c.clone())
.await?;
info!("Sync service initialized on Library C");
// Wait for all devices to complete backfill (they have no data, so should be quick)
info!("Waiting for backfill to complete...");
let start = tokio::time::Instant::now();
let timeout = Duration::from_secs(30); // Increased for parallel test execution
loop {
// Pump messages for all devices
transport_a.process_incoming_messages(library_a.sync_service().unwrap()).await?;
transport_b.process_incoming_messages(library_b.sync_service().unwrap()).await?;
transport_c.process_incoming_messages(library_c.sync_service().unwrap()).await?;
let state_a = library_a.sync_service().unwrap().peer_sync().state().await;
let state_b = library_b.sync_service().unwrap().peer_sync().state().await;
let state_c = library_c.sync_service().unwrap().peer_sync().state().await;
if state_a.is_ready() && state_b.is_ready() && state_c.is_ready() {
info!("All devices reached Ready state");
break;
}
if start.elapsed() > timeout {
// Force transition to Ready for test purposes
// (Each device has separate library with no data, backfill should be instant)
info!("Backfill taking too long, forcing transition to Ready");
library_a.sync_service().unwrap().peer_sync().transition_to_ready().await?;
library_b.sync_service().unwrap().peer_sync().transition_to_ready().await?;
library_c.sync_service().unwrap().peer_sync().transition_to_ready().await?;
break;
}
// Longer sleep to reduce contention when tests run in parallel
tokio::time::sleep(Duration::from_millis(200)).await;
}
info!("All devices ready for testing");
// === PHASE 1: A creates tag, syncs to B ===
info!("\nPHASE 1: Device A creates tag, syncs to Device B");
@@ -1588,11 +1878,10 @@ async fn test_sync_transitive_three_devices() -> anyhow::Result<()> {
.await?;
info!("Device A created tag: {}", tag_uuid);
// Pump A→B messages
// Pump messages between devices
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let sync_b = library_b.sync_service().unwrap();
let count = transport_b_to_a.process_incoming_messages(sync_b).await?;
info!("Processed {} messages from A to B", count);
transport_b.process_incoming_messages(library_b.sync_service().unwrap()).await?;
info!("Pumped messages to Device B");
// Wait a bit for async processing
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
@@ -1617,28 +1906,17 @@ async fn test_sync_transitive_three_devices() -> anyhow::Result<()> {
};
info!("Device C sending SharedChangeRequest to Device B");
tokio::spawn({
let transport_c = transport_c_to_b.clone();
async move {
transport_c
.send_request(device_b_id, request)
.await
.unwrap();
}
});
transport_c
.send_sync_message(device_b_id, request)
.await?;
// Pump messages: C→B (request) and B→C (response)
tokio::time::sleep(Duration::from_millis(100)).await;
// B processes C's request and generates response
let sync_b = library_b.sync_service().unwrap();
transport_b_to_a.process_incoming_messages(sync_b).await?;
transport_b_to_c.process_incoming_messages(sync_b).await?;
// C processes B's response
// Process messages for all devices
transport_b.process_incoming_messages(library_b.sync_service().unwrap()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let sync_c = library_c.sync_service().unwrap();
transport_c_to_b.process_incoming_messages(sync_c).await?;
transport_c.process_incoming_messages(library_c.sync_service().unwrap()).await?;
info!("Request/response cycle complete");
@@ -1868,6 +2146,10 @@ async fn test_watermark_reconnection_sync() -> anyhow::Result<()> {
// === PHASE 3: Reconnection with incremental sync ===
info!("\nPHASE 3: Device B reconnects and requests only new changes");
// Record message count before incremental sync
let messages_before_incremental = setup.transport.get_a_to_b_messages().await.len();
info!("Messages before incremental sync: {}", messages_before_incremental);
// Device B requests changes since last watermark (not full backfill)
use sd_core::service::network::protocol::sync::messages::SyncMessage;
let request = SyncMessage::SharedChangeRequest {
@@ -1896,13 +2178,14 @@ async fn test_watermark_reconnection_sync() -> anyhow::Result<()> {
// === VALIDATION ===
info!("\nValidating incremental sync results");
// Check messages sent from A to B
let messages_a_to_b = setup.transport.get_a_to_b_messages().await;
info!("Total messages A→B: {}", messages_a_to_b.len());
// Check messages sent from A to B (only new ones since incremental sync started)
let all_messages_a_to_b = setup.transport.get_a_to_b_messages().await;
info!("Total messages A→B: {}", all_messages_a_to_b.len());
// Filter for SharedChangeResponse messages
let shared_change_responses: Vec<_> = messages_a_to_b
// Filter for SharedChangeResponse messages sent AFTER incremental sync started
let shared_change_responses: Vec<_> = all_messages_a_to_b
.iter()
.skip(messages_before_incremental)
.filter_map(|(_, msg)| {
if let SyncMessage::SharedChangeResponse {
entries,