feat: implement send sync request in network and sync service

This commit is contained in:
Jamie Pine
2025-10-19 17:56:29 -07:00
parent 08f73ee67c
commit 4dc99c931b
4 changed files with 345 additions and 0 deletions

View File

@@ -69,6 +69,33 @@ pub trait NetworkTransport: Send + Sync {
/// Consider logging warnings rather than failing the entire operation.
async fn send_sync_message(&self, target_device: Uuid, message: SyncMessage) -> Result<()>;
/// Send a sync request and wait for response (request/response pattern)
///
/// Use for requests that expect responses: StateRequest, SharedChangeRequest, etc.
/// Uses bidirectional streams to receive the response.
///
/// # Arguments
///
/// * `target_device` - UUID of the target device
/// * `request` - The sync request message
///
/// # Returns
///
/// The response message from the peer
///
/// # Errors
///
/// Returns error if:
/// - Device is not reachable
/// - Network transport fails
/// - Response timeout (60s)
/// - Response is malformed
async fn send_sync_request(
&self,
target_device: Uuid,
request: SyncMessage,
) -> Result<SyncMessage>;
/// Get list of currently connected sync partner devices
///
/// Returns UUIDs of devices that are:
@@ -136,6 +163,7 @@ impl MockNetworkTransport {
}
}
// NOTE: This isn't actually used, I think
#[cfg(test)]
#[async_trait::async_trait]
impl NetworkTransport for MockNetworkTransport {
@@ -147,6 +175,39 @@ impl NetworkTransport for MockNetworkTransport {
Ok(())
}
async fn send_sync_request(
&self,
target_device: Uuid,
request: SyncMessage,
) -> Result<SyncMessage> {
// Mock implementation: record the request and return a mock response
self.sent_messages
.lock()
.unwrap()
.push((target_device, request.clone()));
// Return appropriate mock response based on request type
match request {
SyncMessage::StateRequest { library_id, .. } => Ok(SyncMessage::StateResponse {
library_id,
model_type: "device".to_string(),
device_id: target_device,
records: vec![],
checkpoint: None,
has_more: false,
}),
SyncMessage::SharedChangeRequest { library_id, .. } => {
Ok(SyncMessage::SharedChangeResponse {
library_id,
entries: vec![],
current_state: None,
has_more: false,
})
}
_ => Err(anyhow::anyhow!("Mock: unexpected request type")),
}
}
async fn get_connected_sync_partners(&self) -> Result<Vec<Uuid>> {
// For tests, return empty list
Ok(vec![])

View File

@@ -392,6 +392,56 @@ impl MessagingProtocolHandler {
// This is a response, not a request
Ok(Vec::new())
}
LibraryMessage::LibraryStateRequest {
request_id,
library_id,
} => {
tracing::info!("Received LibraryStateRequest for library {}", library_id);
let context = self.context.as_ref().ok_or_else(|| {
NetworkingError::Protocol("Context not available".to_string())
})?;
let library_manager = context.libraries().await;
let library = library_manager.get_library(library_id).await.ok_or_else(|| {
NetworkingError::Protocol(format!("Library {} not found", library_id))
})?;
let db = library.db();
// Query all device slugs from this library
use crate::infra::db::entities;
use sea_orm::EntityTrait;
let devices = entities::device::Entity::find()
.all(db.conn())
.await
.map_err(|e| NetworkingError::Protocol(format!("Database error: {}", e)))?;
let device_slugs: Vec<String> = devices.iter().map(|d| d.slug.clone()).collect();
tracing::info!(
"Returning {} device slugs for library {}",
device_slugs.len(),
library_id
);
let response = Message::Library(LibraryMessage::LibraryStateResponse {
request_id,
library_id,
library_name: library.name().await,
device_slugs,
device_count: devices.len(),
});
serde_json::to_vec(&response).map_err(|e| NetworkingError::Serialization(e))
}
LibraryMessage::LibraryStateResponse { .. } => {
// This is a response, not a request
Ok(Vec::new())
}
}
}

View File

@@ -104,6 +104,121 @@ impl NetworkTransport for NetworkingService {
Ok(())
}
/// Send a sync request and wait for response
///
/// Uses bidirectional streams for proper request/response pattern (Iroh best practice)
async fn send_sync_request(
&self,
target_device: Uuid,
request: SyncMessage,
) -> Result<SyncMessage> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::{timeout, Duration};
// Look up NodeId for device UUID
let node_id = {
let registry = self.device_registry.read().await;
registry
.get_node_id_for_device(target_device)
.ok_or_else(|| {
anyhow::anyhow!(
"Device {} not found in registry (not paired or offline)",
target_device
)
})?
};
debug!(
device_uuid = %target_device,
node_id = %node_id,
message_type = ?std::mem::discriminant(&request),
library_id = %request.library_id(),
"Sending sync request"
);
// Get endpoint
let endpoint = self
.endpoint
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?;
// Connect with SYNC_ALPN
let conn = endpoint.connect(node_id.into(), SYNC_ALPN).await.map_err(|e| {
warn!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync request"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
// Open bidirectional stream
let (mut send, mut recv) = conn
.open_bi()
.await
.map_err(|e| anyhow::anyhow!("Failed to open bidirectional stream: {}", e))?;
// Serialize and send request
let req_bytes = serde_json::to_vec(&request)
.map_err(|e| anyhow::anyhow!("Failed to serialize sync request: {}", e))?;
let len = req_bytes.len() as u32;
send.write_all(&len.to_be_bytes())
.await
.map_err(|e| anyhow::anyhow!("Failed to send length: {}", e))?;
send.write_all(&req_bytes)
.await
.map_err(|e| anyhow::anyhow!("Failed to send request: {}", e))?;
// Properly close send stream
send.finish()
.map_err(|e| anyhow::anyhow!("Failed to finish stream: {}", e))?;
debug!("Sync request sent, waiting for response...");
// Read response with timeout
let result = timeout(Duration::from_secs(60), async {
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await.map_err(|e| {
anyhow::anyhow!("Failed to read response length: {}", e)
})?;
let resp_len = u32::from_be_bytes(len_buf) as usize;
debug!("Receiving sync response of {} bytes", resp_len);
let mut resp_buf = vec![0u8; resp_len];
recv.read_exact(&mut resp_buf)
.await
.map_err(|e| anyhow::anyhow!("Failed to read response: {}", e))?;
Ok::<_, anyhow::Error>(resp_buf)
})
.await;
let resp_buf = match result {
Ok(Ok(buf)) => buf,
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(anyhow::anyhow!(
"Sync request timed out after 60s - peer {} not responding",
target_device
))
}
};
// Deserialize response
let response: SyncMessage = serde_json::from_slice(&resp_buf)
.map_err(|e| anyhow::anyhow!("Failed to deserialize sync response: {}", e))?;
debug!(
device_uuid = %target_device,
response_type = ?std::mem::discriminant(&response),
"Received sync response"
);
Ok(response)
}
/// Get list of currently connected sync partner devices
///
/// Returns device UUIDs that are both:

View File

@@ -101,6 +101,121 @@ impl NetworkTransport for NetworkingService {
Ok(())
}
/// Send a sync request and wait for response
///
/// Uses bidirectional streams for proper request/response pattern (Iroh best practice)
async fn send_sync_request(
&self,
target_device: Uuid,
request: SyncMessage,
) -> Result<SyncMessage> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::{timeout, Duration};
// Look up NodeId for device UUID
let device_registry_arc = self.device_registry();
let node_id = {
let registry = device_registry_arc.read().await;
registry
.get_node_id_for_device(target_device)
.ok_or_else(|| {
anyhow::anyhow!(
"Device {} not found in registry (not paired or offline)",
target_device
)
})?
};
debug!(
device_uuid = %target_device,
node_id = %node_id,
message_type = ?std::mem::discriminant(&request),
library_id = %request.library_id(),
"Sending sync request"
);
// Get endpoint
let endpoint = self
.endpoint()
.ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?;
// Connect with SYNC_ALPN
let conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| {
warn!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync request"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
// Open bidirectional stream
let (mut send, mut recv) = conn
.open_bi()
.await
.map_err(|e| anyhow::anyhow!("Failed to open bidirectional stream: {}", e))?;
// Serialize and send request
let req_bytes = serde_json::to_vec(&request)
.map_err(|e| anyhow::anyhow!("Failed to serialize sync request: {}", e))?;
let len = req_bytes.len() as u32;
send.write_all(&len.to_be_bytes())
.await
.map_err(|e| anyhow::anyhow!("Failed to send length: {}", e))?;
send.write_all(&req_bytes)
.await
.map_err(|e| anyhow::anyhow!("Failed to send request: {}", e))?;
// Properly close send stream
send.finish()
.map_err(|e| anyhow::anyhow!("Failed to finish stream: {}", e))?;
debug!("Sync request sent, waiting for response...");
// Read response with timeout
let result = timeout(Duration::from_secs(60), async {
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await.map_err(|e| {
anyhow::anyhow!("Failed to read response length: {}", e)
})?;
let resp_len = u32::from_be_bytes(len_buf) as usize;
debug!("Receiving sync response of {} bytes", resp_len);
let mut resp_buf = vec![0u8; resp_len];
recv.read_exact(&mut resp_buf)
.await
.map_err(|e| anyhow::anyhow!("Failed to read response: {}", e))?;
Ok::<_, anyhow::Error>(resp_buf)
})
.await;
let resp_buf = match result {
Ok(Ok(buf)) => buf,
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(anyhow::anyhow!(
"Sync request timed out after 60s - peer {} not responding",
target_device
))
}
};
// Deserialize response
let response: SyncMessage = serde_json::from_slice(&resp_buf)
.map_err(|e| anyhow::anyhow!("Failed to deserialize sync response: {}", e))?;
debug!(
device_uuid = %target_device,
response_type = ?std::mem::discriminant(&response),
"Received sync response"
);
Ok(response)
}
/// Get list of currently connected sync partner devices
///
/// Returns device UUIDs that are both:
@@ -149,6 +264,10 @@ impl NetworkTransport for NetworkingService {
false
}
fn transport_name(&self) -> &'static str {
"NetworkingService"
}
}
#[cfg(test)]