Files
spacedrive/core/tests/sync_integration_test.rs
Jamie Pine 94a52dd2df cleanup
2025-10-11 08:48:11 -07:00

1554 lines
45 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Sync Integration Test
//!
//! This test validates the full end-to-end sync flow using mock transport:
//! 1. Set up two Core instances with separate libraries
//! 2. Connect them via mock transport (simulating network layer)
//! 3. Initialize sync services on both cores
//! 4. Create test data on Core A (locations, entries, tags)
//! 5. Monitor sync events on both cores
//! 6. Validate data appears correctly in Core B's database
//!
use sd_core::{
infra::{
db::entities,
event::Event,
sync::{ChangeType, NetworkTransport, Syncable},
},
library::Library,
Core,
};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};
use std::{collections::HashMap, sync::Arc};
use tempfile::TempDir;
use tokio::{
sync::{oneshot, Mutex},
time::{timeout, Duration},
};
use tracing::info;
use uuid::Uuid;
/// Shared mock transport that connects two core instances.
///
/// Holds four message queues: two "live" queues that are drained when a peer
/// processes its incoming messages, and two append-only history queues that
/// test assertions read (so validation is unaffected by pumping).
struct BidirectionalMockTransport {
    /// Messages from A to B (live queue; drained by B's message processing)
    a_to_b: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
    /// Messages from B to A (live queue; drained by A's message processing)
    b_to_a: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
    /// History of all messages sent from A to B (never cleared)
    a_to_b_history: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
    /// History of all messages sent from B to A (never cleared)
    b_to_a_history: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
}
impl BidirectionalMockTransport {
fn new() -> Self {
Self {
a_to_b: Arc::new(Mutex::new(Vec::new())),
b_to_a: Arc::new(Mutex::new(Vec::new())),
a_to_b_history: Arc::new(Mutex::new(Vec::new())),
b_to_a_history: Arc::new(Mutex::new(Vec::new())),
}
}
/// Create transport for device A (sends to A->B, receives from B->A)
fn create_a_transport(&self, device_a_id: Uuid, device_b_id: Uuid) -> Arc<MockTransportPeer> {
Arc::new(MockTransportPeer {
my_device_id: device_a_id,
peer_device_id: device_b_id,
outgoing: self.a_to_b.clone(),
incoming: self.b_to_a.clone(),
outgoing_history: self.a_to_b_history.clone(),
pending_requests: Arc::new(Mutex::new(HashMap::new())),
})
}
/// Create transport for device B (sends to B->A, receives from A->B)
fn create_b_transport(&self, device_a_id: Uuid, device_b_id: Uuid) -> Arc<MockTransportPeer> {
Arc::new(MockTransportPeer {
my_device_id: device_b_id,
peer_device_id: device_a_id,
outgoing: self.b_to_a.clone(),
incoming: self.a_to_b.clone(),
outgoing_history: self.b_to_a_history.clone(),
pending_requests: Arc::new(Mutex::new(HashMap::new())),
})
}
/// Get all messages sent from A to B (from history, not cleared by pump)
async fn get_a_to_b_messages(
&self,
) -> Vec<(
Uuid,
sd_core::service::network::protocol::sync::messages::SyncMessage,
)> {
self.a_to_b_history.lock().await.clone()
}
/// Get all messages sent from B to A (from history, not cleared by pump)
async fn get_b_to_a_messages(
&self,
) -> Vec<(
Uuid,
sd_core::service::network::protocol::sync::messages::SyncMessage,
)> {
self.b_to_a_history.lock().await.clone()
}
}
/// Mock transport peer that can send and receive messages with request/response support.
///
/// One instance per device; the two peers share queues so that one peer's
/// `outgoing` is the other peer's `incoming` (wired by
/// `BidirectionalMockTransport::create_{a,b}_transport`).
struct MockTransportPeer {
    // UUID of the device this endpoint belongs to.
    my_device_id: Uuid,
    // UUID of the only device this mock can reach.
    peer_device_id: Uuid,
    /// Outgoing message queue (messages I send)
    outgoing: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
    /// Incoming message queue (messages sent to me)
    incoming: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
    /// Outgoing message history (never cleared)
    outgoing_history: Arc<
        Mutex<
            Vec<(
                Uuid,
                sd_core::service::network::protocol::sync::messages::SyncMessage,
            )>,
        >,
    >,
    /// Pending requests waiting for responses (request hash → response sender)
    pending_requests: Arc<
        Mutex<
            HashMap<
                u64,
                oneshot::Sender<sd_core::service::network::protocol::sync::messages::SyncMessage>,
            >,
        >,
    >,
}
#[async_trait::async_trait]
impl NetworkTransport for MockTransportPeer {
    /// Push a sync message onto both the live outgoing queue (drained by the
    /// pump) and the append-only history (read by test assertions).
    async fn send_sync_message(
        &self,
        target_device: Uuid,
        message: sd_core::service::network::protocol::sync::messages::SyncMessage,
    ) -> anyhow::Result<()> {
        eprintln!(
            "MockTransportPeer::send_sync_message called! target={}, my_device={}",
            target_device, self.my_device_id
        );
        // The mock knows exactly one peer; anything else is unroutable.
        anyhow::ensure!(
            target_device == self.peer_device_id,
            "Unknown device: {}",
            target_device
        );
        info!(
            "Device {} sending message to Device {}",
            self.my_device_id, target_device
        );
        {
            let mut live = self.outgoing.lock().await;
            live.push((target_device, message.clone()));
        }
        {
            let mut history = self.outgoing_history.lock().await;
            history.push((target_device, message));
        }
        Ok(())
    }
    /// The single configured peer is always reported as connected.
    async fn get_connected_sync_partners(&self) -> anyhow::Result<Vec<Uuid>> {
        eprintln!("MockTransportPeer::get_connected_sync_partners called!");
        eprintln!(" Returning peer: {}", self.peer_device_id);
        // For testing, always return the peer as connected
        info!(
            "Mock transport: returning peer {} as connected",
            self.peer_device_id
        );
        Ok(vec![self.peer_device_id])
    }
    /// Only the configured peer is ever reachable through this mock.
    async fn is_device_reachable(&self, device_uuid: Uuid) -> bool {
        self.peer_device_id == device_uuid
    }
    fn transport_name(&self) -> &'static str {
        "MockTransportPeer"
    }
}
impl MockTransportPeer {
    /// Send a request message and wait for the matching response.
    ///
    /// The mock wire format carries no request id, so the request is keyed by
    /// a hash of its `Debug` output and `complete_pending_request` resolves
    /// the oldest outstanding entry when a response arrives. The mock
    /// exchanges one request at a time, so this is unambiguous.
    ///
    /// # Errors
    /// Fails if the response channel is dropped or no response arrives within
    /// ten seconds.
    async fn send_request(
        &self,
        target_device: Uuid,
        request: sd_core::service::network::protocol::sync::messages::SyncMessage,
    ) -> anyhow::Result<sd_core::service::network::protocol::sync::messages::SyncMessage> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        // Derive a request ID by hashing the message's Debug representation.
        // (Test-only shortcut; production would carry a real request id.)
        let mut hasher = DefaultHasher::new();
        format!("{:?}", request).hash(&mut hasher);
        let request_id = hasher.finish();
        // Park a oneshot channel that the response path will complete.
        let (tx, rx) = oneshot::channel();
        self.pending_requests.lock().await.insert(request_id, tx);
        info!(
            "Device {} sending REQUEST to Device {} (id: {})",
            self.my_device_id, target_device, request_id
        );
        // Send request
        self.outgoing.lock().await.push((target_device, request));
        // Wait for response with timeout
        match tokio::time::timeout(Duration::from_secs(10), rx).await {
            Ok(Ok(response)) => {
                info!(
                    "Device {} received RESPONSE (id: {})",
                    self.my_device_id, request_id
                );
                Ok(response)
            }
            Ok(Err(_)) => Err(anyhow::anyhow!("Response channel closed")),
            Err(_) => Err(anyhow::anyhow!("Request timeout")),
        }
    }
    /// Drain this peer's incoming queue and route each message to the
    /// appropriate handler on `sync_service`.
    ///
    /// Returns the number of messages processed. Unsupported message types
    /// are logged and ignored.
    async fn process_incoming_messages(
        &self,
        sync_service: &sd_core::service::sync::SyncService,
    ) -> anyhow::Result<usize> {
        // Take everything currently queued, releasing the lock before any
        // handler runs so handlers can freely enqueue further traffic
        // (previously the guard was held across all the awaits below).
        let messages: Vec<_> = {
            let mut incoming = self.incoming.lock().await;
            incoming.drain(..).collect()
        };
        let count = messages.len();
        for (sender, message) in messages {
            info!(
                "Device {} received message from Device {}",
                self.my_device_id, sender
            );
            // Clone message for later use if needed (request/response arms
            // need the original for pending-request bookkeeping).
            let message_clone = message.clone();
            // Route message to appropriate handler
            use sd_core::service::network::protocol::sync::messages::SyncMessage;
            match message {
                SyncMessage::StateChange {
                    library_id: _,
                    model_type,
                    record_uuid,
                    device_id,
                    data,
                    timestamp,
                } => {
                    // Device-owned (state-based) change: hand to peer sync.
                    sync_service
                        .peer_sync()
                        .on_state_change_received(
                            sd_core::service::sync::state::StateChangeMessage {
                                model_type,
                                record_uuid,
                                device_id,
                                data,
                                timestamp,
                            },
                        )
                        .await?;
                }
                SyncMessage::SharedChange {
                    library_id: _,
                    entry,
                } => {
                    // Shared (HLC log-based) change.
                    sync_service
                        .peer_sync()
                        .on_shared_change_received(entry)
                        .await?;
                }
                SyncMessage::AckSharedChanges {
                    library_id: _,
                    from_device,
                    up_to_hlc,
                } => {
                    sync_service
                        .peer_sync()
                        .on_ack_received(from_device, up_to_hlc)
                        .await?;
                }
                SyncMessage::SharedChangeRequest {
                    library_id,
                    since_hlc,
                    limit,
                } => {
                    // Process request and generate response
                    let (entries, has_more) = sync_service
                        .peer_sync()
                        .get_shared_changes(since_hlc, limit)
                        .await?;
                    // Get full state for initial backfill (only when the
                    // requester has no HLC watermark yet).
                    let current_state = if since_hlc.is_none() {
                        Some(sync_service.peer_sync().get_full_shared_state().await?)
                    } else {
                        None
                    };
                    // Send response back to requester
                    let response = SyncMessage::SharedChangeResponse {
                        library_id,
                        entries,
                        current_state,
                        has_more,
                    };
                    self.send_response_to_pending(message_clone, response)
                        .await?;
                }
                SyncMessage::SharedChangeResponse {
                    library_id: _,
                    entries,
                    current_state,
                    has_more: _,
                } => {
                    // This is a response to our request - complete the oneshot channel
                    self.complete_pending_request(message_clone).await?;
                    // Also process the response data
                    for entry in entries {
                        sync_service
                            .peer_sync()
                            .on_shared_change_received(entry)
                            .await?;
                    }
                    // Process current_state if provided
                    if let Some(state) = current_state {
                        self.apply_current_state(state, sync_service).await?;
                    }
                }
                _ => {
                    info!("Ignoring unsupported message type for test");
                }
            }
        }
        Ok(count)
    }
    /// Apply a full-state snapshot (initial backfill) to the local database.
    ///
    /// Only the `tags` portion of the snapshot is applied here; each tag is
    /// replayed as a synthetic insert `SharedChangeEntry`.
    async fn apply_current_state(
        &self,
        state: serde_json::Value,
        sync_service: &sd_core::service::sync::SyncService,
    ) -> anyhow::Result<()> {
        use sd_core::infra::sync::{SharedChangeEntry, HLC};
        // Apply tags from state snapshot
        if let Some(tags) = state["tags"].as_array() {
            for tag_data in tags {
                // Reject malformed snapshots with an error instead of
                // panicking mid-test (previously an `unwrap`).
                let uuid_str = tag_data["uuid"]
                    .as_str()
                    .ok_or_else(|| anyhow::anyhow!("snapshot tag missing string `uuid`"))?;
                let uuid: Uuid = Uuid::parse_str(uuid_str)?;
                let data = tag_data["data"].clone();
                // Apply as synthetic SharedChangeEntry
                entities::tag::Model::apply_shared_change(
                    SharedChangeEntry {
                        hlc: HLC::now(self.my_device_id),
                        model_type: "tag".to_string(),
                        record_uuid: uuid,
                        change_type: ChangeType::Insert,
                        data,
                    },
                    sync_service.peer_sync().db().as_ref(),
                )
                .await?;
            }
        }
        Ok(())
    }
    /// Queue a response for delivery back to the peer that made the request.
    ///
    /// The original request is accepted (and intentionally unused) so call
    /// sites read naturally; the mock has only a single peer to route to.
    async fn send_response_to_pending(
        &self,
        _original_request: sd_core::service::network::protocol::sync::messages::SyncMessage,
        response: sd_core::service::network::protocol::sync::messages::SyncMessage,
    ) -> anyhow::Result<()> {
        // Send response through outgoing queue
        self.outgoing
            .lock()
            .await
            .push((self.peer_device_id, response));
        Ok(())
    }
    /// Complete the oldest pending request with the received response.
    ///
    /// FIX: the previous implementation hashed the *response* and looked that
    /// hash up in `pending_requests`, whose keys are hashes of the *request*
    /// (see `send_request`) — the two can never match, so no request was ever
    /// completed and `send_request` always timed out. Because the mock peer
    /// exchanges one request at a time, completing the single outstanding
    /// entry is sufficient; production would carry real request ids.
    async fn complete_pending_request(
        &self,
        response: sd_core::service::network::protocol::sync::messages::SyncMessage,
    ) -> anyhow::Result<()> {
        let mut pending = self.pending_requests.lock().await;
        if let Some(request_id) = pending.keys().next().copied() {
            if let Some(tx) = pending.remove(&request_id) {
                // The requester may have timed out and dropped the receiver;
                // that is fine for the mock.
                let _ = tx.send(response);
            }
        }
        Ok(())
    }
}
/// Test setup with two cores and bidirectional mock transport.
struct SyncTestSetup {
    // TempDirs are stored (not just paths) so each core's on-disk state
    // lives until the setup is dropped.
    temp_dir_a: TempDir,
    temp_dir_b: TempDir,
    core_a: Core,
    core_b: Core,
    library_a: Arc<Library>,
    library_b: Arc<Library>,
    device_a_id: Uuid,
    device_b_id: Uuid,
    // Shared queue hub; tests read message history through it.
    transport: Arc<BidirectionalMockTransport>,
    // Device A's endpoint (outgoing = a_to_b, incoming = b_to_a).
    transport_a: Arc<MockTransportPeer>,
    // Device B's endpoint (outgoing = b_to_a, incoming = a_to_b).
    transport_b: Arc<MockTransportPeer>,
}
impl SyncTestSetup {
    /// Build an `AppConfig` for a test core.
    ///
    /// Networking, volume monitoring, and the location watcher are all
    /// disabled so the test can inject its own mock transport instead of the
    /// real network stack. (Deduplicates the previously copy-pasted config
    /// literals for core A and core B.)
    fn test_config(data_dir: std::path::PathBuf) -> sd_core::config::AppConfig {
        sd_core::config::AppConfig {
            version: 3,
            data_dir,
            log_level: "info".to_string(),
            telemetry_enabled: false,
            preferences: sd_core::config::Preferences::default(),
            job_logging: sd_core::config::JobLoggingConfig::default(),
            services: sd_core::config::ServiceConfig {
                networking_enabled: false, // Disable networking so we can inject mock
                volume_monitoring_enabled: false,
                location_watcher_enabled: false,
            },
        }
    }
    /// Create a new sync test setup with two cores.
    ///
    /// Steps: temp dirs → saved configs (networking off) → two `Core`s →
    /// two libraries created without auto-sync → devices cross-registered →
    /// bidirectional mock transport built → sync explicitly initialized on
    /// both libraries with the mock endpoints.
    async fn new() -> anyhow::Result<Self> {
        // Initialize tracing for test debugging (try_init: ignore the error
        // when a subscriber is already installed by a sibling test).
        let _ = tracing_subscriber::fmt()
            .with_env_filter("sd_core=debug,sync_integration_test=debug")
            .with_test_writer()
            .try_init();
        info!("Setting up sync integration test");
        // Create temporary directories for both cores
        let temp_dir_a = TempDir::new()?;
        let temp_dir_b = TempDir::new()?;
        info!("Core A directory: {:?}", temp_dir_a.path());
        info!("Core B directory: {:?}", temp_dir_b.path());
        // Persist identical test configs for both cores;
        // Core::new_with_config reads them back from disk.
        Self::test_config(temp_dir_a.path().to_path_buf()).save()?;
        Self::test_config(temp_dir_b.path().to_path_buf()).save()?;
        // Initialize Core A (will load config from disk with networking disabled)
        let core_a = Core::new_with_config(temp_dir_a.path().to_path_buf())
            .await
            .map_err(|e| anyhow::anyhow!("{}", e))?;
        let device_a_id = core_a.device.device_id()?;
        info!(" Device A ID: {}", device_a_id);
        // Initialize Core B
        let core_b = Core::new_with_config(temp_dir_b.path().to_path_buf())
            .await
            .map_err(|e| anyhow::anyhow!("{}", e))?;
        let device_b_id = core_b.device.device_id()?;
        info!(" Device B ID: {}", device_b_id);
        // Create libraries WITHOUT auto-sync-init (allows us to inject mock transport)
        let library_a = core_a
            .libraries
            .create_library_no_sync("Test Library A", None, core_a.context.clone())
            .await?;
        info!("Library A created (no auto-sync): {}", library_a.id());
        let library_b = core_b
            .libraries
            .create_library_no_sync("Test Library B", None, core_b.context.clone())
            .await?;
        info!("Library B created (no auto-sync): {}", library_b.id());
        // Register devices in each other's libraries
        // This also implicitly makes them sync partners (sync_enabled=true by default)
        Self::register_device_in_library(&library_a, device_b_id, "Device B").await?;
        Self::register_device_in_library(&library_b, device_a_id, "Device A").await?;
        // Create bidirectional mock transport
        let transport = Arc::new(BidirectionalMockTransport::new());
        let transport_a = transport.create_a_transport(device_a_id, device_b_id);
        let transport_b = transport.create_b_transport(device_a_id, device_b_id);
        // Now explicitly initialize sync with our mock transports
        // This should work since networking was disabled, so sync wasn't auto-initialized
        library_a
            .init_sync_service(
                device_a_id,
                transport_a.clone() as Arc<dyn NetworkTransport>,
            )
            .await?;
        info!("Sync service initialized on Library A with mock transport");
        library_b
            .init_sync_service(
                device_b_id,
                transport_b.clone() as Arc<dyn NetworkTransport>,
            )
            .await?;
        info!("Sync service initialized on Library B with mock transport");
        info!("Sync test setup complete");
        Ok(Self {
            temp_dir_a,
            temp_dir_b,
            core_a,
            core_b,
            library_a,
            library_b,
            device_a_id,
            device_b_id,
            transport,
            transport_a,
            transport_b,
        })
    }
    /// Register a device in a library's device table.
    ///
    /// Inserts a row with `sync_enabled = true`, which (per the setup above)
    /// makes the device a sync partner.
    async fn register_device_in_library(
        library: &Arc<Library>,
        device_id: Uuid,
        device_name: &str,
    ) -> anyhow::Result<()> {
        use chrono::Utc;
        let device_model = entities::device::ActiveModel {
            id: sea_orm::ActiveValue::NotSet,
            uuid: Set(device_id),
            name: Set(device_name.to_string()),
            os: Set("Test OS".to_string()),
            os_version: Set(Some("1.0".to_string())),
            hardware_model: Set(None),
            network_addresses: Set(serde_json::json!([])),
            is_online: Set(false),
            last_seen_at: Set(Utc::now()),
            capabilities: Set(serde_json::json!({})),
            created_at: Set(Utc::now()),
            updated_at: Set(Utc::now()),
            sync_enabled: Set(true), // Enable sync by default
            last_sync_at: Set(None),
        };
        device_model.insert(library.db().conn()).await?;
        info!("Registered {} in library {}", device_name, library.id());
        Ok(())
    }
    /// Process queued messages in both directions (one pump pass).
    async fn pump_messages(&self) -> anyhow::Result<()> {
        let sync_a = self.library_a.sync_service().unwrap();
        let sync_b = self.library_b.sync_service().unwrap();
        // Process A->B messages
        let count_a_to_b = self.transport_b.process_incoming_messages(sync_b).await?;
        if count_a_to_b > 0 {
            info!("Processed {} messages from A to B", count_a_to_b);
        }
        // Process B->A messages
        let count_b_to_a = self.transport_a.process_incoming_messages(sync_a).await?;
        if count_b_to_a > 0 {
            info!("Processed {} messages from B to A", count_b_to_a);
        }
        Ok(())
    }
    /// Pump messages every 100 ms until `duration` has elapsed.
    async fn wait_for_sync(&self, duration: Duration) -> anyhow::Result<()> {
        let start = tokio::time::Instant::now();
        while start.elapsed() < duration {
            self.pump_messages().await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }
}
#[tokio::test]
/// End-to-end test of device-owned (state-based) sync for a `location` row:
/// creates a location on Core A, syncs it through the mock transport, and
/// asserts the row appears in Core B's database with the same UUID.
async fn test_sync_location_device_owned_state_based() -> anyhow::Result<()> {
    info!("TEST: Location Sync (Device-Owned, State-Based)");
    let setup = SyncTestSetup::new().await?;
    // Subscribe to events on both cores
    let mut events_a = setup.core_a.events.subscribe();
    let mut events_b = setup.core_b.events.subscribe();
    // Collect events in background tasks
    let events_a_collected = Arc::new(Mutex::new(Vec::new()));
    let events_b_collected = Arc::new(Mutex::new(Vec::new()));
    let events_a_clone = events_a_collected.clone();
    // Each collector stops once no event arrives for 5 s (outer timeout) or
    // when aborted below.
    let collector_a = tokio::spawn(async move {
        while let Ok(event) = timeout(Duration::from_secs(5), events_a.recv()).await {
            if let Ok(event) = event {
                events_a_clone.lock().await.push(event);
            }
        }
    });
    let events_b_clone = events_b_collected.clone();
    let collector_b = tokio::spawn(async move {
        while let Ok(event) = timeout(Duration::from_secs(5), events_b.recv()).await {
            if let Ok(event) = event {
                events_b_clone.lock().await.push(event);
            }
        }
    });
    // === ACTION: Create a location on Core A (device-owned data) ===
    info!("Creating location on Device A");
    let location_uuid = Uuid::new_v4();
    let entry_uuid = Uuid::new_v4();
    // Get device A's database ID (integer PK, needed for the location FK)
    let device_a_record = entities::device::Entity::find()
        .filter(entities::device::Column::Uuid.eq(setup.device_a_id))
        .one(setup.library_a.db().conn())
        .await?
        .expect("Device A should exist");
    // Create entry for the location directory (manually, since TransactionManager not yet wired)
    let entry_model = entities::entry::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(Some(entry_uuid)),
        name: Set("Test Location".to_string()),
        kind: Set(1), // Directory
        extension: Set(None),
        metadata_id: Set(None),
        content_id: Set(None),
        size: Set(0),
        aggregate_size: Set(0),
        child_count: Set(0),
        file_count: Set(0),
        created_at: Set(chrono::Utc::now()),
        modified_at: Set(chrono::Utc::now()),
        accessed_at: Set(None),
        permissions: Set(None),
        inode: Set(None),
        parent_id: Set(None),
    };
    let entry_record = entry_model.insert(setup.library_a.db().conn()).await?;
    // === MANUALLY CREATE ENTRY ON DEVICE B (to satisfy FK dependency) ===
    // In production, entry would sync first via dependency ordering
    // For this test, we manually create it to test location FK mapping
    info!("Manually creating entry on Device B (simulating prior sync)");
    let entry_model_b = entities::entry::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(Some(entry_uuid)), // Same UUID!
        name: Set("Test Location".to_string()),
        kind: Set(1), // Directory
        extension: Set(None),
        metadata_id: Set(None),
        content_id: Set(None),
        size: Set(0),
        aggregate_size: Set(0),
        child_count: Set(0),
        file_count: Set(0),
        created_at: Set(chrono::Utc::now()),
        modified_at: Set(chrono::Utc::now()),
        accessed_at: Set(None),
        permissions: Set(None),
        inode: Set(None),
        parent_id: Set(None),
    };
    entry_model_b.insert(setup.library_b.db().conn()).await?;
    info!("Entry dependency satisfied on Device B");
    // Create location record
    let location_model = entities::location::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(location_uuid),
        device_id: Set(device_a_record.id),
        entry_id: Set(entry_record.id),
        name: Set(Some("Test Location".to_string())),
        index_mode: Set("shallow".to_string()),
        scan_state: Set("completed".to_string()),
        last_scan_at: Set(Some(chrono::Utc::now())),
        error_message: Set(None),
        total_file_count: Set(0),
        total_byte_size: Set(0),
        created_at: Set(chrono::Utc::now()),
        updated_at: Set(chrono::Utc::now()),
    };
    let location_record = location_model.insert(setup.library_a.db().conn()).await?;
    info!("Created location on Device A: {}", location_uuid);
    // === USE SYNC API to emit sync events ===
    info!("Using new sync API with automatic FK conversion");
    // Sync the location (automatically handles device_id → device_uuid and entry_id → entry_uuid)
    setup
        .library_a
        .sync_model_with_db(
            &location_record,
            ChangeType::Insert,
            setup.library_a.db().conn(),
        )
        .await
        .map_err(|e| anyhow::anyhow!("Sync error: {}", e))?;
    // === PUMP MESSAGES ===
    info!("Pumping messages between devices");
    setup.wait_for_sync(Duration::from_secs(2)).await?;
    // === VALIDATION ===
    info!("Validating sync results");
    // Note: Messages were already pumped and processed during wait_for_sync,
    // so we validate by checking if data appeared on Device B, not message queues
    // Check if location appeared on Device B
    let location_on_b = entities::location::Entity::find()
        .filter(entities::location::Column::Uuid.eq(location_uuid))
        .one(setup.library_b.db().conn())
        .await?;
    // This will fail with FK constraint until apply functions handle dependencies properly
    // That's GOOD - it shows what needs to be fixed!
    assert!(
        location_on_b.is_some(),
        "Location should sync to Device B (FK constraint failures indicate apply function needs dependency handling)"
    );
    if let Some(location) = location_on_b {
        info!("Location successfully synced to Device B!");
        assert_eq!(location.uuid, location_uuid);
    }
    // === MONITOR EVENTS ===
    // Give collectors a moment to drain any trailing events before stopping.
    tokio::time::sleep(Duration::from_millis(500)).await;
    collector_a.abort();
    collector_b.abort();
    let events_a_list = events_a_collected.lock().await;
    let events_b_list = events_b_collected.lock().await;
    info!("Events on Device A: {}", events_a_list.len());
    info!("Events on Device B: {}", events_b_list.len());
    // Log some interesting events
    for event in events_a_list.iter() {
        if matches!(event, Event::Custom { .. }) {
            info!(" A: {:?}", event);
        }
    }
    for event in events_b_list.iter() {
        if matches!(event, Event::Custom { .. }) {
            info!(" B: {:?}", event);
        }
    }
    info!("TEST COMPLETE: Location sync infrastructure validated");
    Ok(())
}
#[tokio::test]
/// End-to-end test of shared-resource (HLC log-based) sync for a `tag` row:
/// creates a tag on Core A, syncs it, then asserts a `SharedChange` message
/// was sent and the tag row exists on Core B with matching fields.
async fn test_sync_tag_shared_hlc_based() -> anyhow::Result<()> {
    info!("TEST: Tag Sync (Shared Resource, HLC-Based)");
    let setup = SyncTestSetup::new().await?;
    // === ACTION: Create a tag on Core A (shared resource) ===
    info!(" Creating tag on Device A");
    // Create tag entity directly (not through manager to avoid domain/entity mismatch)
    let tag_uuid = Uuid::new_v4();
    let tag_model = entities::tag::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(tag_uuid),
        canonical_name: Set("Vacation".to_string()),
        display_name: Set(None),
        formal_name: Set(None),
        abbreviation: Set(None),
        aliases: Set(None),
        namespace: Set(Some("photos".to_string())),
        tag_type: Set("standard".to_string()),
        color: Set(None),
        icon: Set(None),
        description: Set(None),
        is_organizational_anchor: Set(false),
        privacy_level: Set("normal".to_string()),
        search_weight: Set(100),
        attributes: Set(None),
        composition_rules: Set(None),
        created_at: Set(chrono::Utc::now()),
        updated_at: Set(chrono::Utc::now()),
        created_by_device: Set(Some(setup.device_a_id)),
    };
    let tag_record = tag_model.insert(setup.library_a.db().conn()).await?;
    info!(
        "Created tag on Device A: {} ({})",
        tag_record.canonical_name, tag_record.uuid
    );
    // === USE SYNC API to emit sync events ===
    info!("Using new sync API for shared resource sync");
    // Tags have no device-scoped FKs, so the plain sync_model path is used.
    setup
        .library_a
        .sync_model(&tag_record, ChangeType::Insert)
        .await
        .map_err(|e| anyhow::anyhow!("Sync error: {}", e))?;
    // === PUMP MESSAGES ===
    info!("Pumping messages between devices");
    setup.wait_for_sync(Duration::from_secs(2)).await?;
    // === VALIDATION ===
    info!("Validating sync results");
    // History queues survive pumping, so they can be asserted on here.
    let messages_a_to_b = setup.transport.get_a_to_b_messages().await;
    info!("Messages sent from A to B: {}", messages_a_to_b.len());
    assert!(
        !messages_a_to_b.is_empty(),
        "Expected SharedChange messages to be sent"
    );
    // Check for SharedChange message
    let has_shared_change = messages_a_to_b.iter().any(|(_, msg)| {
        matches!(
            msg,
            sd_core::service::network::protocol::sync::messages::SyncMessage::SharedChange { .. }
        )
    });
    assert!(has_shared_change, "Expected SharedChange message with HLC");
    // Check if tag appeared on Device B
    let tag_on_b = entities::tag::Entity::find()
        .filter(entities::tag::Column::Uuid.eq(tag_uuid))
        .one(setup.library_b.db().conn())
        .await?;
    assert!(
        tag_on_b.is_some(),
        "Tag should sync to Device B (failure indicates apply_shared_change needs implementation/fixes)"
    );
    let synced_tag = tag_on_b.unwrap();
    info!("Tag successfully synced to Device B!");
    assert_eq!(synced_tag.uuid, tag_uuid);
    assert_eq!(synced_tag.canonical_name, "Vacation");
    assert_eq!(synced_tag.namespace, Some("photos".to_string()));
    // Check ACK messages (informational only — absence is not a failure)
    let messages_b_to_a = setup.transport.get_b_to_a_messages().await;
    let has_ack = messages_b_to_a.iter().any(|(_, msg)| {
        matches!(
            msg,
            sd_core::service::network::protocol::sync::messages::SyncMessage::AckSharedChanges { .. }
        )
    });
    if has_ack {
        info!("ACK message sent from B to A");
    } else {
        info!(" No ACK message (expected until apply functions complete)");
    }
    info!("TEST COMPLETE: Tag sync infrastructure validated");
    Ok(())
}
#[tokio::test]
/// Tests device-owned entry sync: syncs a parent directory entry first (FK
/// dependency), creates a location over it, then syncs a child file entry and
/// asserts `StateChange` messages (including one for `entry`) were emitted.
async fn test_sync_entry_with_location() -> anyhow::Result<()> {
    info!("TEST: Entry Sync (Device-Owned via Location)");
    let setup = SyncTestSetup::new().await?;
    // Create location first
    let location_uuid = Uuid::new_v4();
    let device_a_record = entities::device::Entity::find()
        .filter(entities::device::Column::Uuid.eq(setup.device_a_id))
        .one(setup.library_a.db().conn())
        .await?
        .expect("Device A should exist");
    // Create location entry (the directory the location points at)
    let location_entry = entities::entry::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(Some(Uuid::new_v4())),
        name: Set("Test Location".to_string()),
        kind: Set(1), // Directory
        extension: Set(None),
        metadata_id: Set(None),
        content_id: Set(None),
        size: Set(0),
        aggregate_size: Set(0),
        child_count: Set(0),
        file_count: Set(0),
        created_at: Set(chrono::Utc::now()),
        modified_at: Set(chrono::Utc::now()),
        accessed_at: Set(None),
        permissions: Set(None),
        inode: Set(None),
        parent_id: Set(None),
    };
    let location_entry_record = location_entry.insert(setup.library_a.db().conn()).await?;
    // Sync the location entry first (parent dependency)
    info!("Syncing location entry to Device B");
    setup
        .library_a
        .sync_model_with_db(
            &location_entry_record,
            ChangeType::Insert,
            setup.library_a.db().conn(),
        )
        .await
        .map_err(|e| anyhow::anyhow!("Sync error: {}", e))?;
    // Pump messages so the location entry reaches Device B
    setup.wait_for_sync(Duration::from_millis(500)).await?;
    let location_model = entities::location::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(location_uuid),
        device_id: Set(device_a_record.id),
        entry_id: Set(location_entry_record.id),
        name: Set(Some("Test Location".to_string())),
        index_mode: Set("shallow".to_string()),
        scan_state: Set("completed".to_string()),
        last_scan_at: Set(Some(chrono::Utc::now())),
        error_message: Set(None),
        total_file_count: Set(0),
        total_byte_size: Set(0),
        created_at: Set(chrono::Utc::now()),
        updated_at: Set(chrono::Utc::now()),
    };
    let _location_record = location_model.insert(setup.library_a.db().conn()).await?;
    // === Create entry in that location ===
    info!("Creating entry in location on Device A");
    let entry_uuid = Uuid::new_v4();
    let entry_model = entities::entry::ActiveModel {
        id: sea_orm::ActiveValue::NotSet,
        uuid: Set(Some(entry_uuid)),
        name: Set("test_file.txt".to_string()),
        kind: Set(0), // File
        extension: Set(Some("txt".to_string())),
        metadata_id: Set(None),
        content_id: Set(None),
        size: Set(1024),
        aggregate_size: Set(1024),
        child_count: Set(0),
        file_count: Set(0),
        created_at: Set(chrono::Utc::now()),
        modified_at: Set(chrono::Utc::now()),
        accessed_at: Set(None),
        permissions: Set(None),
        inode: Set(Some(12345)),
        parent_id: Set(Some(location_entry_record.id)), // child of the location dir
    };
    let entry_record = entry_model.insert(setup.library_a.db().conn()).await?;
    info!("Created entry: {}", entry_uuid);
    // === USE SYNC API ===
    info!("Using new sync API to emit entry sync event");
    setup
        .library_a
        .sync_model_with_db(
            &entry_record,
            ChangeType::Insert,
            setup.library_a.db().conn(),
        )
        .await
        .map_err(|e| anyhow::anyhow!("Sync error: {}", e))?;
    // === PUMP AND VALIDATE ===
    setup.wait_for_sync(Duration::from_secs(2)).await?;
    // Assertions read the never-cleared A→B history, not the live queue.
    let messages = setup.transport.get_a_to_b_messages().await;
    info!("Total messages sent: {}", messages.len());
    let state_changes: Vec<_> = messages
        .iter()
        .filter_map(|(_, msg)| {
            if let sd_core::service::network::protocol::sync::messages::SyncMessage::StateChange {
                model_type,
                record_uuid,
                ..
            } = msg
            {
                Some((model_type.clone(), *record_uuid))
            } else {
                None
            }
        })
        .collect();
    info!("State changes: {:?}", state_changes);
    assert!(
        !state_changes.is_empty(),
        "Should have sent state change messages"
    );
    assert!(
        state_changes.iter().any(|(m, _)| m == "entry"),
        "Should have sent entry state change"
    );
    info!("TEST COMPLETE: Entry sync infrastructure validated");
    Ok(())
}
/// Test summary and expectations
///
/// Mostly a logged status report; the real assertions are that both libraries
/// have a sync service and each device is registered in the other's database.
#[tokio::test]
async fn test_sync_infrastructure_summary() -> anyhow::Result<()> {
    info!("TEST: Sync Infrastructure Summary");
    let setup = SyncTestSetup::new().await?;
    info!("\nSYNC TEST INFRASTRUCTURE:");
    info!(" Two Core instances created");
    info!(" Separate libraries initialized");
    info!(" Devices registered in each other's databases");
    info!(" Sync services initialized with mock transport");
    info!(" Bidirectional message queue functional");
    info!("\nCURRENT STATE:");
    info!(" Clean sync API implemented (library.sync_model())");
    info!(" TagManager wired up with sync");
    info!(" LocationManager wired up with sync");
    info!(" All integration tests passing");
    info!("\nWHAT WORKS NOW:");
    info!(" Mock transport sends/receives messages");
    info!(" Sync service broadcasts automatically");
    info!(" Message routing to peer sync handlers");
    info!(" HLC generation for shared resources");
    info!(" FK conversion (UUID integer ID) automatic");
    info!(" State-based sync (locations, entries)");
    info!(" Log-based sync (tags, albums)");
    info!("\nNEXT STEPS:");
    info!(" 1. Wire remaining managers (Albums, UserMetadata, etc.)");
    info!(" 2. Wire EntryProcessor bulk indexing with batch API");
    info!(" 3. Test CLI sync setup flow");
    info!(" 4. Enable networking in production");
    info!(" 5. Test real device-to-device sync");
    // Verify basic infrastructure: sync services exist on both libraries
    assert!(setup.library_a.sync_service().is_some());
    assert!(setup.library_b.sync_service().is_some());
    // Cross-registration: B's row must exist in A's database…
    let device_on_a = entities::device::Entity::find()
        .filter(entities::device::Column::Uuid.eq(setup.device_b_id))
        .one(setup.library_a.db().conn())
        .await?;
    assert!(device_on_a.is_some(), "Device B should be registered on A");
    // …and A's row in B's database.
    let device_on_b = entities::device::Entity::find()
        .filter(entities::device::Column::Uuid.eq(setup.device_a_id))
        .one(setup.library_b.db().conn())
        .await?;
    assert!(device_on_b.is_some(), "Device A should be registered on B");
    info!("INFRASTRUCTURE VALIDATED: Ready for TransactionManager integration");
    Ok(())
}
#[tokio::test]
async fn test_sync_backfill_includes_pre_sync_data() -> anyhow::Result<()> {
info!("TEST: Backfill Includes Pre-Sync Data");
let setup = SyncTestSetup::new().await?;
// === CREATE TAGS ON DEVICE A (simulating pre-sync and post-sync tags) ===
info!(" Creating 3 tags on Device A (simulating mixed pre/post-sync scenario)");
// Create 3 tags directly in database (simulating pre-sync data)
let pre_sync_tag_uuids: Vec<Uuid> = (0..3)
.map(|i| {
let uuid = Uuid::new_v4();
info!(" Pre-sync tag {}: {}", i + 1, uuid);
uuid
})
.collect();
for (i, tag_uuid) in pre_sync_tag_uuids.iter().enumerate() {
let tag_model = entities::tag::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
uuid: Set(*tag_uuid),
canonical_name: Set(format!("PreSync Tag {}", i + 1)),
display_name: Set(None),
formal_name: Set(None),
abbreviation: Set(None),
aliases: Set(None),
namespace: Set(Some("photos".to_string())),
tag_type: Set("standard".to_string()),
color: Set(None),
icon: Set(None),
description: Set(None),
is_organizational_anchor: Set(false),
privacy_level: Set("normal".to_string()),
search_weight: Set(100),
attributes: Set(None),
composition_rules: Set(None),
created_at: Set(chrono::Utc::now()),
updated_at: Set(chrono::Utc::now()),
created_by_device: Set(Some(setup.device_a_id)),
};
tag_model.insert(setup.library_a.db().conn()).await?;
}
info!("Created 3 pre-sync tags (NOT in sync log)");
// Create 2 tags through sync API (will be in sync log)
let post_sync_tag_uuids: Vec<Uuid> = (0..2)
.map(|i| {
let uuid = Uuid::new_v4();
info!(" Post-sync tag {}: {}", i + 1, uuid);
uuid
})
.collect();
for (i, tag_uuid) in post_sync_tag_uuids.iter().enumerate() {
let tag_model = entities::tag::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
uuid: Set(*tag_uuid),
canonical_name: Set(format!("PostSync Tag {}", i + 1)),
display_name: Set(None),
formal_name: Set(None),
abbreviation: Set(None),
aliases: Set(None),
namespace: Set(Some("photos".to_string())),
tag_type: Set("standard".to_string()),
color: Set(None),
icon: Set(None),
description: Set(None),
is_organizational_anchor: Set(false),
privacy_level: Set("normal".to_string()),
search_weight: Set(100),
attributes: Set(None),
composition_rules: Set(None),
created_at: Set(chrono::Utc::now()),
updated_at: Set(chrono::Utc::now()),
created_by_device: Set(Some(setup.device_a_id)),
};
let tag_record = tag_model.insert(setup.library_a.db().conn()).await?;
// Sync these tags (will be in peer log)
setup
.library_a
.sync_model(&tag_record, ChangeType::Insert)
.await?;
}
info!("Created 2 post-sync tags (IN sync log)");
// === VERIFY STATE ON DEVICE A ===
let tags_on_a = entities::tag::Entity::find()
.all(setup.library_a.db().conn())
.await?;
assert_eq!(tags_on_a.len(), 5, "Device A should have 5 tags total");
// Check peer log - should only have 2 entries
let log_entries = setup
.library_a
.sync_service()
.unwrap()
.peer_sync()
.peer_log()
.get_since(None)
.await?;
assert_eq!(log_entries.len(), 2, "Peer log should only have 2 entries");
// === SIMULATE BACKFILL REQUEST FROM DEVICE B ===
info!("Device B requests backfill (since_hlc = None)");
let full_state = setup
.library_a
.sync_service()
.unwrap()
.peer_sync()
.get_full_shared_state()
.await?;
// Verify full state includes ALL 5 tags
let tags_in_state = full_state["tags"].as_array().unwrap();
assert_eq!(
tags_in_state.len(),
5,
"Full state should include all 5 tags (3 pre-sync + 2 post-sync)"
);
info!("Backfill includes all tags:");
info!(" - 3 pre-sync tags (not in log)");
info!(" - 2 post-sync tags (in log)");
info!(" - Total: 5 tags in state snapshot");
// Verify UUIDs match
let state_uuids: Vec<Uuid> = tags_in_state
.iter()
.map(|t| Uuid::parse_str(t["uuid"].as_str().unwrap()).unwrap())
.collect();
for uuid in &pre_sync_tag_uuids {
assert!(
state_uuids.contains(uuid),
"State snapshot should include pre-sync tag {}",
uuid
);
}
for uuid in &post_sync_tag_uuids {
assert!(
state_uuids.contains(uuid),
"State snapshot should include post-sync tag {}",
uuid
);
}
info!("TEST COMPLETE: Backfill correctly includes all tags (pre-sync and post-sync)");
Ok(())
}
#[tokio::test]
async fn test_sync_transitive_three_devices() -> anyhow::Result<()> {
info!("TEST: Transitive Sync (A → B → C while A offline)");
// Initialize tracing
let _ = tracing_subscriber::fmt()
.with_env_filter("sd_core=debug,sync_integration_test=debug")
.with_test_writer()
.try_init();
info!("Setting up THREE-device sync test");
// Create temp dirs for three cores
let temp_dir_a = TempDir::new()?;
let temp_dir_b = TempDir::new()?;
let temp_dir_c = TempDir::new()?;
// Create configs with networking disabled (for mock transport)
let config_a = sd_core::config::AppConfig {
version: 3,
data_dir: temp_dir_a.path().to_path_buf(),
log_level: "info".to_string(),
telemetry_enabled: false,
preferences: sd_core::config::Preferences::default(),
job_logging: sd_core::config::JobLoggingConfig::default(),
services: sd_core::config::ServiceConfig {
networking_enabled: false,
volume_monitoring_enabled: false,
location_watcher_enabled: false,
},
};
config_a.save()?;
let config_b = sd_core::config::AppConfig {
version: 3,
data_dir: temp_dir_b.path().to_path_buf(),
log_level: "info".to_string(),
telemetry_enabled: false,
preferences: sd_core::config::Preferences::default(),
job_logging: sd_core::config::JobLoggingConfig::default(),
services: sd_core::config::ServiceConfig {
networking_enabled: false,
volume_monitoring_enabled: false,
location_watcher_enabled: false,
},
};
config_b.save()?;
let config_c = sd_core::config::AppConfig {
version: 3,
data_dir: temp_dir_c.path().to_path_buf(),
log_level: "info".to_string(),
telemetry_enabled: false,
preferences: sd_core::config::Preferences::default(),
job_logging: sd_core::config::JobLoggingConfig::default(),
services: sd_core::config::ServiceConfig {
networking_enabled: false,
volume_monitoring_enabled: false,
location_watcher_enabled: false,
},
};
config_c.save()?;
// Initialize cores
let core_a = Core::new_with_config(temp_dir_a.path().to_path_buf())
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
let device_a_id = core_a.device.device_id()?;
info!(" Device A ID: {}", device_a_id);
let core_b = Core::new_with_config(temp_dir_b.path().to_path_buf())
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
let device_b_id = core_b.device.device_id()?;
info!(" Device B ID: {}", device_b_id);
let core_c = Core::new_with_config(temp_dir_c.path().to_path_buf())
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
let device_c_id = core_c.device.device_id()?;
info!(" Device C ID: {}", device_c_id);
// Create libraries (all same library ID for shared library scenario)
let library_id = Uuid::new_v4();
// Create library on A
let library_a = core_a
.libraries
.create_library_no_sync("Shared Library", None, core_a.context.clone())
.await?;
info!("Library A created: {}", library_a.id());
// Create same library on B and C (simulating they joined the library)
let library_b = core_b
.libraries
.create_library_no_sync("Shared Library", None, core_b.context.clone())
.await?;
info!("Library B created: {}", library_b.id());
let library_c = core_c
.libraries
.create_library_no_sync("Shared Library", None, core_c.context.clone())
.await?;
info!("Library C created: {}", library_c.id());
// Register devices in each other's libraries (full mesh initially)
// A knows about B and C
SyncTestSetup::register_device_in_library(&library_a, device_b_id, "Device B").await?;
SyncTestSetup::register_device_in_library(&library_a, device_c_id, "Device C").await?;
// B knows about A and C
SyncTestSetup::register_device_in_library(&library_b, device_a_id, "Device A").await?;
SyncTestSetup::register_device_in_library(&library_b, device_c_id, "Device C").await?;
// C knows about A and B
SyncTestSetup::register_device_in_library(&library_c, device_a_id, "Device A").await?;
SyncTestSetup::register_device_in_library(&library_c, device_b_id, "Device B").await?;
// Create bidirectional transports (A ←→ B, B ←→ C, but NOT A ←→ C)
let transport_ab = Arc::new(BidirectionalMockTransport::new());
let transport_a_to_b = transport_ab.create_a_transport(device_a_id, device_b_id);
let transport_b_to_a = transport_ab.create_b_transport(device_a_id, device_b_id);
let transport_bc = Arc::new(BidirectionalMockTransport::new());
let transport_b_to_c = transport_bc.create_a_transport(device_b_id, device_c_id);
let transport_c_to_b = transport_bc.create_b_transport(device_b_id, device_c_id);
// Initialize sync services
library_a
.init_sync_service(device_a_id, transport_a_to_b.clone())
.await?;
info!("Sync service initialized on Library A");
library_b
.init_sync_service(device_b_id, transport_b_to_a.clone())
.await?;
info!("Sync service initialized on Library B");
library_c
.init_sync_service(device_c_id, transport_c_to_b.clone())
.await?;
info!("Sync service initialized on Library C");
// === PHASE 1: A creates tag, syncs to B ===
info!("\nPHASE 1: Device A creates tag, syncs to Device B");
let tag_uuid = Uuid::new_v4();
let tag_model = entities::tag::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
uuid: Set(tag_uuid),
canonical_name: Set("A's Tag".to_string()),
display_name: Set(None),
formal_name: Set(None),
abbreviation: Set(None),
aliases: Set(None),
namespace: Set(Some("photos".to_string())),
tag_type: Set("standard".to_string()),
color: Set(None),
icon: Set(None),
description: Set(None),
is_organizational_anchor: Set(false),
privacy_level: Set("normal".to_string()),
search_weight: Set(100),
attributes: Set(None),
composition_rules: Set(None),
created_at: Set(chrono::Utc::now()),
updated_at: Set(chrono::Utc::now()),
created_by_device: Set(Some(device_a_id)),
};
let tag_record = tag_model.insert(library_a.db().conn()).await?;
library_a
.sync_model(&tag_record, ChangeType::Insert)
.await?;
info!("Device A created tag: {}", tag_uuid);
// Pump A→B messages
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let sync_b = library_b.sync_service().unwrap();
let count = transport_b_to_a.process_incoming_messages(sync_b).await?;
info!("Processed {} messages from A to B", count);
// Wait a bit for async processing
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Verify B has the tag
let tag_on_b = entities::tag::Entity::find()
.filter(entities::tag::Column::Uuid.eq(tag_uuid))
.one(library_b.db().conn())
.await?;
assert!(tag_on_b.is_some(), "Device B should have received A's tag");
info!("Device B received A's tag");
// === PHASE 2: Device C requests backfill from B (A offline) ===
info!("\nPHASE 2: Device C requests backfill from B (A is offline)");
// C sends SharedChangeRequest to B
use sd_core::service::network::protocol::sync::messages::SyncMessage;
let request = SyncMessage::SharedChangeRequest {
library_id: library_c.id(),
since_hlc: None, // Initial backfill
limit: 1000,
};
info!("Device C sending SharedChangeRequest to Device B");
tokio::spawn({
let transport_c = transport_c_to_b.clone();
async move {
transport_c
.send_request(device_b_id, request)
.await
.unwrap();
}
});
// Pump messages: C→B (request) and B→C (response)
tokio::time::sleep(Duration::from_millis(100)).await;
// B processes C's request and generates response
let sync_b = library_b.sync_service().unwrap();
transport_b_to_a.process_incoming_messages(sync_b).await?;
transport_b_to_c.process_incoming_messages(sync_b).await?;
// C processes B's response
tokio::time::sleep(Duration::from_millis(100)).await;
let sync_c = library_c.sync_service().unwrap();
transport_c_to_b.process_incoming_messages(sync_c).await?;
info!("Request/response cycle complete");
// === VALIDATION ===
let tag_on_c = entities::tag::Entity::find()
.filter(entities::tag::Column::Uuid.eq(tag_uuid))
.one(library_c.db().conn())
.await?;
assert!(
tag_on_c.is_some(),
"Device C should have A's tag (received via B while A was offline) ✅"
);
let synced_tag = tag_on_c.unwrap();
info!("END-TO-END TRANSITIVE SYNC SUCCESS!");
info!(" Device A created tag");
info!(" Device B received tag from A (live broadcast)");
info!(" Device A went offline");
info!(" Device C requested backfill from B (SharedChangeRequest)");
info!(" Device B responded with current_state including A's tag");
info!(" Device C applied A's tag from B's response");
info!(" Result: Device C has A's tag even though A was offline!");
assert_eq!(synced_tag.canonical_name, "A's Tag");
assert_eq!(synced_tag.namespace, Some("photos".to_string()));
info!("TEST COMPLETE: Transitive sync validated (A → B → C)");
Ok(())
}