From 08f73ee67c5cbf44e7c8f0f3940ae27a2bd45455 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 19 Oct 2025 17:56:29 -0700 Subject: [PATCH] feat: update sync protocol handler and network transport --- core/src/lib.rs | 5 +- .../service/network/protocol/sync/handler.rs | 107 +++++++++++++++++- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index b30090408..2cdd42ebb 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -280,7 +280,10 @@ impl Core { info!("Network event receiver wired to PeerSync for library {}", library.id()); // Create and register sync protocol handler for this library - let mut sync_handler = service::network::protocol::SyncProtocolHandler::new(library.id()); + let mut sync_handler = service::network::protocol::SyncProtocolHandler::new( + library.id(), + networking.device_registry(), + ); sync_handler.set_peer_sync(peer_sync.clone()); sync_handler.set_backfill_manager(sync_service.backfill_manager().clone()); diff --git a/core/src/service/network/protocol/sync/handler.rs b/core/src/service/network/protocol/sync/handler.rs index 584d07181..ca2a42f09 100644 --- a/core/src/service/network/protocol/sync/handler.rs +++ b/core/src/service/network/protocol/sync/handler.rs @@ -20,11 +20,15 @@ pub struct SyncProtocolHandler { library_id: Uuid, peer_sync: Option>, backfill_manager: Option>, + device_registry: Arc>, } impl SyncProtocolHandler { /// Create a new sync protocol handler - pub fn new(library_id: Uuid) -> Self { + pub fn new( + library_id: Uuid, + device_registry: Arc>, + ) -> Self { info!( library_id = %library_id, "Creating SyncProtocolHandler for leaderless hybrid sync" @@ -33,6 +37,7 @@ impl SyncProtocolHandler { library_id, peer_sync: None, backfill_manager: None, + device_registry, } } @@ -445,11 +450,91 @@ impl crate::service::network::protocol::ProtocolHandler for SyncProtocolHandler async fn handle_stream( &self, - _send: Box, - _recv: Box, - _remote_node_id: iroh::NodeId, + mut send: Box, + mut recv: Box, + remote_node_id: iroh::NodeId, ) { - warn!("SyncProtocolHandler::handle_stream called - not used in request/response model"); + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + // Map node_id to device_id using device registry + let from_device = { + let registry = self.device_registry.read().await; + registry.get_device_by_node(remote_node_id) + }; + + let from_device = match from_device { + Some(id) => id, + None => { + tracing::warn!("Received sync stream from unknown node {}", remote_node_id); + return; + } + }; + + // Read request with length prefix + let mut len_buf = [0u8; 4]; + if let Err(e) = recv.read_exact(&mut len_buf).await { + tracing::error!("Failed to read sync request length: {}", e); + return; + } + let req_len = u32::from_be_bytes(len_buf) as usize; + + let mut req_buf = vec![0u8; req_len]; + if let Err(e) = recv.read_exact(&mut req_buf).await { + tracing::error!("Failed to read sync request: {}", e); + return; + } + + // Deserialize request + let request: SyncMessage = match serde_json::from_slice(&req_buf) { + Ok(msg) => msg, + Err(e) => { + tracing::error!("Failed to deserialize sync request: {}", e); + return; + } + }; + + tracing::debug!( + from_device = %from_device, + message_type = ?std::mem::discriminant(&request), + "Processing sync request via bidirectional stream" + ); + + // Handle the request and get response + let response_opt = match self.handle_sync_message(from_device, request).await { + Ok(resp) => resp, + Err(e) => { + tracing::error!("Failed to handle sync message: {}", e); + return; + } + }; + + // Send response if handler returned one + if let Some(response) = response_opt { + let resp_bytes = match serde_json::to_vec(&response) { + Ok(bytes) => bytes, + Err(e) => { + tracing::error!("Failed to serialize sync response: {}", e); + return; + } + }; + + let len = resp_bytes.len() as u32; + if let Err(e) = send.write_all(&len.to_be_bytes()).await { + tracing::error!("Failed to send response length: {}", e); + return; + } + if let Err(e) = send.write_all(&resp_bytes).await { + tracing::error!("Failed to send response: {}", e); + return; + } + let _ = send.flush().await; + + tracing::debug!( + from_device = %from_device, + response_bytes = resp_bytes.len(), + "Sent sync response" + ); + } } async fn handle_request(&self, from_device: Uuid, request: Vec) -> Result> { @@ -509,7 +594,17 @@ mod tests { #[test] fn test_handler_creation() { - let handler = SyncProtocolHandler::new(Uuid::new_v4()); + // Test uses mock registry + use crate::service::network::device::DeviceRegistry; + use crate::device::DeviceManager; + use std::path::PathBuf; + + let device_manager = Arc::new(DeviceManager::new(PathBuf::from("/tmp/test")).unwrap()); + let logger = Arc::new(crate::service::network::utils::SilentLogger); + let registry = DeviceRegistry::new(device_manager, PathBuf::from("/tmp/test"), logger).unwrap(); + let device_registry = Arc::new(tokio::sync::RwLock::new(registry)); + + let handler = SyncProtocolHandler::new(Uuid::new_v4(), device_registry); assert_eq!(handler.protocol_name(), "sync"); } }