From 4dc99c931b409fd326e64c350129135749d00a23 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 19 Oct 2025 17:56:29 -0700 Subject: [PATCH] feat: implement send sync request in network and sync service --- core/src/infra/sync/transport.rs | 61 +++++++++ .../src/service/network/protocol/messaging.rs | 50 ++++++++ .../network/protocol/sync/transport.rs | 115 +++++++++++++++++ core/src/service/network/transports/sync.rs | 119 ++++++++++++++++++ 4 files changed, 345 insertions(+) diff --git a/core/src/infra/sync/transport.rs b/core/src/infra/sync/transport.rs index 063a1f416..ad711bf4f 100644 --- a/core/src/infra/sync/transport.rs +++ b/core/src/infra/sync/transport.rs @@ -69,6 +69,33 @@ pub trait NetworkTransport: Send + Sync { /// Consider logging warnings rather than failing the entire operation. async fn send_sync_message(&self, target_device: Uuid, message: SyncMessage) -> Result<()>; + /// Send a sync request and wait for response (request/response pattern) + /// + /// Use for requests that expect responses: StateRequest, SharedChangeRequest, etc. + /// Uses bidirectional streams to receive the response. + /// + /// # Arguments + /// + /// * `target_device` - UUID of the target device + /// * `request` - The sync request message + /// + /// # Returns + /// + /// The response message from the peer + /// + /// # Errors + /// + /// Returns error if: + /// - Device is not reachable + /// - Network transport fails + /// - Response timeout (60s) + /// - Response is malformed + async fn send_sync_request( + &self, + target_device: Uuid, + request: SyncMessage, + ) -> Result; + /// Get list of currently connected sync partner devices /// /// Returns UUIDs of devices that are: @@ -136,6 +163,7 @@ impl MockNetworkTransport { } } +// NOTE: This isn't actually used, I think #[cfg(test)] #[async_trait::async_trait] impl NetworkTransport for MockNetworkTransport { @@ -147,6 +175,39 @@ impl NetworkTransport for MockNetworkTransport { Ok(()) } + async fn send_sync_request( + &self, + target_device: Uuid, + request: SyncMessage, + ) -> Result { + // Mock implementation: record the request and return a mock response + self.sent_messages + .lock() + .unwrap() + .push((target_device, request.clone())); + + // Return appropriate mock response based on request type + match request { + SyncMessage::StateRequest { library_id, .. } => Ok(SyncMessage::StateResponse { + library_id, + model_type: "device".to_string(), + device_id: target_device, + records: vec![], + checkpoint: None, + has_more: false, + }), + SyncMessage::SharedChangeRequest { library_id, .. } => { + Ok(SyncMessage::SharedChangeResponse { + library_id, + entries: vec![], + current_state: None, + has_more: false, + }) + } + _ => Err(anyhow::anyhow!("Mock: unexpected request type")), + } + } + async fn get_connected_sync_partners(&self) -> Result> { // For tests, return empty list Ok(vec![]) diff --git a/core/src/service/network/protocol/messaging.rs b/core/src/service/network/protocol/messaging.rs index d27c5e92f..cf7f82631 100644 --- a/core/src/service/network/protocol/messaging.rs +++ b/core/src/service/network/protocol/messaging.rs @@ -392,6 +392,56 @@ impl MessagingProtocolHandler { // This is a response, not a request Ok(Vec::new()) } + + LibraryMessage::LibraryStateRequest { + request_id, + library_id, + } => { + tracing::info!("Received LibraryStateRequest for library {}", library_id); + + let context = self.context.as_ref().ok_or_else(|| { + NetworkingError::Protocol("Context not available".to_string()) + })?; + + let library_manager = context.libraries().await; + let library = library_manager.get_library(library_id).await.ok_or_else(|| { + NetworkingError::Protocol(format!("Library {} not found", library_id)) + })?; + + let db = library.db(); + + // Query all device slugs from this library + use crate::infra::db::entities; + use sea_orm::EntityTrait; + + let devices = entities::device::Entity::find() + .all(db.conn()) + .await + .map_err(|e| NetworkingError::Protocol(format!("Database error: {}", e)))?; + + let device_slugs: Vec = devices.iter().map(|d| d.slug.clone()).collect(); + + tracing::info!( + "Returning {} device slugs for library {}", + device_slugs.len(), + library_id + ); + + let response = Message::Library(LibraryMessage::LibraryStateResponse { + request_id, + library_id, + library_name: library.name().await, + device_slugs, + device_count: devices.len(), + }); + + serde_json::to_vec(&response).map_err(|e| NetworkingError::Serialization(e)) + } + + LibraryMessage::LibraryStateResponse { .. } => { + // This is a response, not a request + Ok(Vec::new()) + } } } diff --git a/core/src/service/network/protocol/sync/transport.rs b/core/src/service/network/protocol/sync/transport.rs index 73526f642..538e695b7 100644 --- a/core/src/service/network/protocol/sync/transport.rs +++ b/core/src/service/network/protocol/sync/transport.rs @@ -104,6 +104,121 @@ impl NetworkTransport for NetworkingService { Ok(()) } + /// Send a sync request and wait for response + /// + /// Uses bidirectional streams for proper request/response pattern (Iroh best practice) + async fn send_sync_request( + &self, + target_device: Uuid, + request: SyncMessage, + ) -> Result { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::time::{timeout, Duration}; + + // Look up NodeId for device UUID + let node_id = { + let registry = self.device_registry.read().await; + registry + .get_node_id_for_device(target_device) + .ok_or_else(|| { + anyhow::anyhow!( + "Device {} not found in registry (not paired or offline)", + target_device + ) + })? + }; + + debug!( + device_uuid = %target_device, + node_id = %node_id, + message_type = ?std::mem::discriminant(&request), + library_id = %request.library_id(), + "Sending sync request" + ); + + // Get endpoint + let endpoint = self + .endpoint + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?; + + // Connect with SYNC_ALPN + let conn = endpoint.connect(node_id.into(), SYNC_ALPN).await.map_err(|e| { + warn!( + device_uuid = %target_device, + node_id = %node_id, + error = %e, + "Failed to connect to device for sync request" + ); + anyhow::anyhow!("Failed to connect to {}: {}", target_device, e) + })?; + + // Open bidirectional stream + let (mut send, mut recv) = conn + .open_bi() + .await + .map_err(|e| anyhow::anyhow!("Failed to open bidirectional stream: {}", e))?; + + // Serialize and send request + let req_bytes = serde_json::to_vec(&request) + .map_err(|e| anyhow::anyhow!("Failed to serialize sync request: {}", e))?; + + let len = req_bytes.len() as u32; + send.write_all(&len.to_be_bytes()) + .await + .map_err(|e| anyhow::anyhow!("Failed to send length: {}", e))?; + send.write_all(&req_bytes) + .await + .map_err(|e| anyhow::anyhow!("Failed to send request: {}", e))?; + + // Properly close send stream + send.finish() + .map_err(|e| anyhow::anyhow!("Failed to finish stream: {}", e))?; + + debug!("Sync request sent, waiting for response..."); + + // Read response with timeout + let result = timeout(Duration::from_secs(60), async { + let mut len_buf = [0u8; 4]; + recv.read_exact(&mut len_buf).await.map_err(|e| { + anyhow::anyhow!("Failed to read response length: {}", e) + })?; + let resp_len = u32::from_be_bytes(len_buf) as usize; + + debug!("Receiving sync response of {} bytes", resp_len); + + let mut resp_buf = vec![0u8; resp_len]; + recv.read_exact(&mut resp_buf) + .await + .map_err(|e| anyhow::anyhow!("Failed to read response: {}", e))?; + Ok::<_, anyhow::Error>(resp_buf) + }) + .await; + + let resp_buf = match result { + Ok(Ok(buf)) => buf, + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(anyhow::anyhow!( + "Sync request timed out after 60s - peer {} not responding", + target_device + )) + } + }; + + // Deserialize response + let response: SyncMessage = serde_json::from_slice(&resp_buf) + .map_err(|e| anyhow::anyhow!("Failed to deserialize sync response: {}", e))?; + + debug!( + device_uuid = %target_device, + response_type = ?std::mem::discriminant(&response), + "Received sync response" + ); + + Ok(response) + } + /// Get list of currently connected sync partner devices /// /// Returns device UUIDs that are both: diff --git a/core/src/service/network/transports/sync.rs b/core/src/service/network/transports/sync.rs index 4a60add4c..35bd88fed 100644 --- a/core/src/service/network/transports/sync.rs +++ b/core/src/service/network/transports/sync.rs @@ -101,6 +101,121 @@ impl NetworkTransport for NetworkingService { Ok(()) } + /// Send a sync request and wait for response + /// + /// Uses bidirectional streams for proper request/response pattern (Iroh best practice) + async fn send_sync_request( + &self, + target_device: Uuid, + request: SyncMessage, + ) -> Result { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::time::{timeout, Duration}; + + // Look up NodeId for device UUID + let device_registry_arc = self.device_registry(); + let node_id = { + let registry = device_registry_arc.read().await; + registry + .get_node_id_for_device(target_device) + .ok_or_else(|| { + anyhow::anyhow!( + "Device {} not found in registry (not paired or offline)", + target_device + ) + })? + }; + + debug!( + device_uuid = %target_device, + node_id = %node_id, + message_type = ?std::mem::discriminant(&request), + library_id = %request.library_id(), + "Sending sync request" + ); + + // Get endpoint + let endpoint = self + .endpoint() + .ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?; + + // Connect with SYNC_ALPN + let conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| { + warn!( + device_uuid = %target_device, + node_id = %node_id, + error = %e, + "Failed to connect to device for sync request" + ); + anyhow::anyhow!("Failed to connect to {}: {}", target_device, e) + })?; + + // Open bidirectional stream + let (mut send, mut recv) = conn + .open_bi() + .await + .map_err(|e| anyhow::anyhow!("Failed to open bidirectional stream: {}", e))?; + + // Serialize and send request + let req_bytes = serde_json::to_vec(&request) + .map_err(|e| anyhow::anyhow!("Failed to serialize sync request: {}", e))?; + + let len = req_bytes.len() as u32; + send.write_all(&len.to_be_bytes()) + .await + .map_err(|e| anyhow::anyhow!("Failed to send length: {}", e))?; + send.write_all(&req_bytes) + .await + .map_err(|e| anyhow::anyhow!("Failed to send request: {}", e))?; + + // Properly close send stream + send.finish() + .map_err(|e| anyhow::anyhow!("Failed to finish stream: {}", e))?; + + debug!("Sync request sent, waiting for response..."); + + // Read response with timeout + let result = timeout(Duration::from_secs(60), async { + let mut len_buf = [0u8; 4]; + recv.read_exact(&mut len_buf).await.map_err(|e| { + anyhow::anyhow!("Failed to read response length: {}", e) + })?; + let resp_len = u32::from_be_bytes(len_buf) as usize; + + debug!("Receiving sync response of {} bytes", resp_len); + + let mut resp_buf = vec![0u8; resp_len]; + recv.read_exact(&mut resp_buf) + .await + .map_err(|e| anyhow::anyhow!("Failed to read response: {}", e))?; + Ok::<_, anyhow::Error>(resp_buf) + }) + .await; + + let resp_buf = match result { + Ok(Ok(buf)) => buf, + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(anyhow::anyhow!( + "Sync request timed out after 60s - peer {} not responding", + target_device + )) + } + }; + + // Deserialize response + let response: SyncMessage = serde_json::from_slice(&resp_buf) + .map_err(|e| anyhow::anyhow!("Failed to deserialize sync response: {}", e))?; + + debug!( + device_uuid = %target_device, + response_type = ?std::mem::discriminant(&response), + "Received sync response" + ); + + Ok(response) + } + /// Get list of currently connected sync partner devices /// /// Returns device UUIDs that are both: @@ -149,6 +264,10 @@ impl NetworkTransport for NetworkingService { false } + + fn transport_name(&self) -> &'static str { + "NetworkingService" + } } #[cfg(test)]