feat: update sync protocol handler and network transport

This commit is contained in:
Jamie Pine
2025-10-19 17:56:29 -07:00
parent 1d49c98e00
commit 08f73ee67c
2 changed files with 105 additions and 7 deletions

View File

@@ -280,7 +280,10 @@ impl Core {
info!("Network event receiver wired to PeerSync for library {}", library.id());
// Create and register sync protocol handler for this library
let mut sync_handler = service::network::protocol::SyncProtocolHandler::new(library.id());
let mut sync_handler = service::network::protocol::SyncProtocolHandler::new(
library.id(),
networking.device_registry(),
);
sync_handler.set_peer_sync(peer_sync.clone());
sync_handler.set_backfill_manager(sync_service.backfill_manager().clone());

View File

@@ -20,11 +20,15 @@ pub struct SyncProtocolHandler {
library_id: Uuid,
peer_sync: Option<Arc<crate::service::sync::peer::PeerSync>>,
backfill_manager: Option<Arc<crate::service::sync::BackfillManager>>,
device_registry: Arc<tokio::sync::RwLock<crate::service::network::device::DeviceRegistry>>,
}
impl SyncProtocolHandler {
/// Create a new sync protocol handler
pub fn new(library_id: Uuid) -> Self {
pub fn new(
library_id: Uuid,
device_registry: Arc<tokio::sync::RwLock<crate::service::network::device::DeviceRegistry>>,
) -> Self {
info!(
library_id = %library_id,
"Creating SyncProtocolHandler for leaderless hybrid sync"
@@ -33,6 +37,7 @@ impl SyncProtocolHandler {
library_id,
peer_sync: None,
backfill_manager: None,
device_registry,
}
}
@@ -445,11 +450,91 @@ impl crate::service::network::protocol::ProtocolHandler for SyncProtocolHandler
async fn handle_stream(
&self,
_send: Box<dyn tokio::io::AsyncWrite + Send + Unpin>,
_recv: Box<dyn tokio::io::AsyncRead + Send + Unpin>,
_remote_node_id: iroh::NodeId,
mut send: Box<dyn tokio::io::AsyncWrite + Send + Unpin>,
mut recv: Box<dyn tokio::io::AsyncRead + Send + Unpin>,
remote_node_id: iroh::NodeId,
) {
warn!("SyncProtocolHandler::handle_stream called - not used in request/response model");
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Map node_id to device_id using device registry
let from_device = {
let registry = self.device_registry.read().await;
registry.get_device_by_node(remote_node_id)
};
let from_device = match from_device {
Some(id) => id,
None => {
tracing::warn!("Received sync stream from unknown node {}", remote_node_id);
return;
}
};
// Read request with length prefix
let mut len_buf = [0u8; 4];
if let Err(e) = recv.read_exact(&mut len_buf).await {
tracing::error!("Failed to read sync request length: {}", e);
return;
}
let req_len = u32::from_be_bytes(len_buf) as usize;
let mut req_buf = vec![0u8; req_len];
if let Err(e) = recv.read_exact(&mut req_buf).await {
tracing::error!("Failed to read sync request: {}", e);
return;
}
// Deserialize request
let request: SyncMessage = match serde_json::from_slice(&req_buf) {
Ok(msg) => msg,
Err(e) => {
tracing::error!("Failed to deserialize sync request: {}", e);
return;
}
};
tracing::debug!(
from_device = %from_device,
message_type = ?std::mem::discriminant(&request),
"Processing sync request via bidirectional stream"
);
// Handle the request and get response
let response_opt = match self.handle_sync_message(from_device, request).await {
Ok(resp) => resp,
Err(e) => {
tracing::error!("Failed to handle sync message: {}", e);
return;
}
};
// Send response if handler returned one
if let Some(response) = response_opt {
let resp_bytes = match serde_json::to_vec(&response) {
Ok(bytes) => bytes,
Err(e) => {
tracing::error!("Failed to serialize sync response: {}", e);
return;
}
};
let len = resp_bytes.len() as u32;
if let Err(e) = send.write_all(&len.to_be_bytes()).await {
tracing::error!("Failed to send response length: {}", e);
return;
}
if let Err(e) = send.write_all(&resp_bytes).await {
tracing::error!("Failed to send response: {}", e);
return;
}
let _ = send.flush().await;
tracing::debug!(
from_device = %from_device,
response_bytes = resp_bytes.len(),
"Sent sync response"
);
}
}
async fn handle_request(&self, from_device: Uuid, request: Vec<u8>) -> Result<Vec<u8>> {
@@ -509,7 +594,17 @@ mod tests {
#[test]
fn test_handler_creation() {
let handler = SyncProtocolHandler::new(Uuid::new_v4());
// Test uses mock registry
use crate::service::network::device::DeviceRegistry;
use crate::device::DeviceManager;
use std::path::PathBuf;
let device_manager = Arc::new(DeviceManager::new(PathBuf::from("/tmp/test")).unwrap());
let logger = Arc::new(crate::service::network::utils::SilentLogger);
let registry = DeviceRegistry::new(device_manager, PathBuf::from("/tmp/test"), logger).unwrap();
let device_registry = Arc::new(tokio::sync::RwLock::new(registry));
let handler = SyncProtocolHandler::new(Uuid::new_v4(), device_registry);
assert_eq!(handler.protocol_name(), "sync");
}
}