From 1d49c98e004e046eca9b16af30e5e5aed6cebbcd Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 19 Oct 2025 14:49:53 -0700 Subject: [PATCH] feat: implement connection caching and reuse with updated dependencies --- .../service/network/core/sync_transport.rs | 25 +++--- .../src/service/network/protocol/messaging.rs | 63 ++++----------- .../service/network/protocol/pairing/mod.rs | 70 +++------------- core/src/service/network/utils/connection.rs | 80 +++++++++++++++++++ core/src/service/network/utils/mod.rs | 2 + 5 files changed, 122 insertions(+), 118 deletions(-) create mode 100644 core/src/service/network/utils/connection.rs diff --git a/core/src/service/network/core/sync_transport.rs b/core/src/service/network/core/sync_transport.rs index 701dfa0e5..1d0502850 100644 --- a/core/src/service/network/core/sync_transport.rs +++ b/core/src/service/network/core/sync_transport.rs @@ -5,7 +5,7 @@ use crate::{ infra::sync::NetworkTransport, - service::network::{protocol::sync::messages::SyncMessage, NetworkingError}, + service::network::{protocol::sync::messages::SyncMessage, utils, NetworkingError}, }; use anyhow::Result; use std::sync::Arc; @@ -27,8 +27,9 @@ impl NetworkTransport for NetworkingService { /// /// 1. Look up NodeId for device UUID via DeviceRegistry /// 2. Serialize the SyncMessage to JSON bytes - /// 3. Send via Iroh endpoint using the sync protocol ALPN - /// 4. Handle errors gracefully (device may be offline) + /// 3. Get or reuse cached connection (Iroh best practice) + /// 4. Open new stream on existing connection (essentially 0 RTT) + /// 5. Handle errors gracefully (device may be offline) async fn send_sync_message(&self, target_device: Uuid, message: SyncMessage) -> Result<()> { // 1. Look up NodeId for device UUID let node_id = { @@ -55,25 +56,31 @@ impl NetworkTransport for NetworkingService { let bytes = serde_json::to_vec(&message) .map_err(|e| anyhow::anyhow!("Failed to serialize sync message: {}", e))?; - // 3. Send via Iroh endpoint + // 3. Get or reuse cached connection let endpoint = self .endpoint .as_ref() .ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?; - // Open a connection and send - // Note: Iroh handles connection pooling, so repeated calls are efficient - let conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| { + let conn = utils::get_or_create_connection( + self.active_connections.clone(), + endpoint, + node_id, + SYNC_ALPN, + &self.logger, + ) + .await + .map_err(|e| { warn!( device_uuid = %target_device, node_id = %node_id, error = %e, - "Failed to connect to device for sync" + "Failed to get connection to device for sync" ); anyhow::anyhow!("Failed to connect to {}: {}", target_device, e) })?; - // Open a unidirectional stream and send the message + // 4. Open a unidirectional stream and send the message let mut send = conn .open_uni() .await diff --git a/core/src/service/network/protocol/messaging.rs b/core/src/service/network/protocol/messaging.rs index 550597213..d27c5e92f 100644 --- a/core/src/service/network/protocol/messaging.rs +++ b/core/src/service/network/protocol/messaging.rs @@ -1,7 +1,7 @@ //! Basic messaging protocol handler use super::{library_messages::LibraryMessage, ProtocolEvent, ProtocolHandler}; -use crate::service::network::{NetworkingError, Result}; +use crate::service::network::{utils, NetworkingError, Result}; use async_trait::async_trait; use iroh::{endpoint::Connection, Endpoint, NodeAddr, NodeId}; use serde::{Deserialize, Serialize}; @@ -395,51 +395,6 @@ impl MessagingProtocolHandler { } } - /// Get or create a connection to a specific node - /// Implements Iroh best practice of reusing persistent connections - async fn get_or_create_connection( - &self, - node_id: NodeId, - alpn: &'static [u8], - ) -> Result { - // Check cache first - { - let connections = self.connections.read().await; - if let Some(conn) = connections.get(&node_id) { - if conn.close_reason().is_none() { - tracing::debug!("Reusing existing connection to node {}", node_id); - return Ok(conn.clone()); - } else { - tracing::debug!( - "Cached connection to node {} is closed, creating new one", - node_id - ); - } - } - } - - // Create new connection - let endpoint = self.endpoint.as_ref().ok_or_else(|| { - NetworkingError::ConnectionFailed("No endpoint available".to_string()) - })?; - - let node_addr = NodeAddr::new(node_id); - tracing::debug!("Creating new connection to node {}", node_id); - - let conn = endpoint - .connect(node_addr, alpn) - .await - .map_err(|e| NetworkingError::ConnectionFailed(format!("Failed to connect: {}", e)))?; - - // Cache the connection - { - let mut connections = self.connections.write().await; - connections.insert(node_id, conn.clone()); - } - - tracing::info!("Created new connection to node {}", node_id); - Ok(conn) - } /// Send a library message to a remote node and wait for response /// Uses cached connections and creates new streams (Iroh best practice) @@ -454,9 +409,19 @@ impl MessagingProtocolHandler { tracing::debug!("Sending library message to node {}: {:?}", node_id, message); // Get or create cached connection - let conn = self - .get_or_create_connection(node_id, crate::service::network::core::MESSAGING_ALPN) - .await?; + let endpoint = self.endpoint.as_ref().ok_or_else(|| { + NetworkingError::ConnectionFailed("No endpoint available".to_string()) + })?; + + let logger: Arc = Arc::new(utils::SilentLogger); + let conn = utils::get_or_create_connection( + self.connections.clone(), + endpoint, + node_id, + crate::service::network::core::MESSAGING_ALPN, + &logger, + ) + .await?; // Create new stream on existing connection let (mut send, mut recv) = conn.open_bi().await.map_err(|e| { diff --git a/core/src/service/network/protocol/pairing/mod.rs b/core/src/service/network/protocol/pairing/mod.rs index 845711ded..b7bebc44f 100644 --- a/core/src/service/network/protocol/pairing/mod.rs +++ b/core/src/service/network/protocol/pairing/mod.rs @@ -18,7 +18,7 @@ pub use types::{PairingAdvertisement, PairingCode, PairingRole, PairingSession, use super::{ProtocolEvent, ProtocolHandler}; use crate::service::network::{ device::{DeviceInfo, DeviceRegistry, SessionKeys}, - utils::{identity::NetworkFingerprint, logging::NetworkLogger, NetworkIdentity}, + utils::{self, identity::NetworkFingerprint, logging::NetworkLogger, NetworkIdentity}, NetworkingError, Result, }; use async_trait::async_trait; @@ -330,61 +330,6 @@ impl PairingProtocolHandler { Ok(()) } - /// Get or create a connection to a specific node - /// This method implements the Iroh best practice of reusing connections - /// and creating new streams for each message exchange - async fn get_or_create_connection( - &self, - node_id: NodeId, - alpn: &'static [u8], - ) -> Result { - { - let connections = self.connections.read().await; - if let Some(conn) = connections.get(&node_id) { - if conn.close_reason().is_none() { - self.log_debug(&format!( - "Reusing existing connection to node {}", - node_id - )) - .await; - return Ok(conn.clone()); - } else { - self.log_debug(&format!( - "Cached connection to node {} is closed, creating new one", - node_id - )) - .await; - } - } - } - - let endpoint = self.endpoint.as_ref().ok_or_else(|| { - NetworkingError::ConnectionFailed("No endpoint available".to_string()) - })?; - - let node_addr = NodeAddr::new(node_id); - self.log_debug(&format!( - "Creating new connection to node {} with ALPN", - node_id - )) - .await; - - let conn = endpoint.connect(node_addr, alpn).await.map_err(|e| { - NetworkingError::ConnectionFailed(format!("Failed to connect to {}: {}", node_id, e)) - })?; - - { - let mut connections = self.connections.write().await; - connections.insert(node_id, conn.clone()); - self.log_info(&format!( - "Established and cached new connection to node {}", - node_id - )) - .await; - } - - Ok(conn) - } /// Get device info for advertising in DHT records pub async fn get_device_info(&self) -> Result { @@ -659,15 +604,20 @@ impl PairingProtocolHandler { /// - Keeps connections alive for future messages pub async fn send_pairing_message_to_node( &self, - _endpoint: &Endpoint, + endpoint: &Endpoint, node_id: NodeId, message: &PairingMessage, ) -> Result> { use tokio::io::{AsyncReadExt, AsyncWriteExt}; - let conn = self - .get_or_create_connection(node_id, crate::service::network::core::PAIRING_ALPN) - .await?; + let conn = utils::get_or_create_connection( + self.connections.clone(), + endpoint, + node_id, + crate::service::network::core::PAIRING_ALPN, + &self.logger, + ) + .await?; let (mut send, mut recv) = conn.open_bi().await.map_err(|e| { NetworkingError::ConnectionFailed(format!("Failed to open stream: {}", e)) diff --git a/core/src/service/network/utils/connection.rs b/core/src/service/network/utils/connection.rs new file mode 100644 index 000000000..a555f4bff --- /dev/null +++ b/core/src/service/network/utils/connection.rs @@ -0,0 +1,80 @@ +//! Shared connection management utilities for Iroh networking +//! +//! Provides connection caching helpers following Iroh best practices: +//! - One persistent connection per device pair +//! - Lightweight streams for individual messages (0 RTT overhead) +//! - Automatic connection reuse across all protocols + +use crate::service::network::{NetworkingError, Result}; +use iroh::{endpoint::Connection, Endpoint, NodeAddr, NodeId}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +use super::logging::NetworkLogger; + +/// Get or create a connection to a specific node +/// +/// This implements Iroh's best practice of reusing persistent connections +/// and creating new streams for each message exchange. +/// +/// # Arguments +/// * `connections` - Shared connection cache (all protocols use the same cache) +/// * `endpoint` - Iroh endpoint for creating new connections +/// * `node_id` - Target node to connect to +/// * `alpn` - Protocol ALPN identifier +/// * `logger` - Logger for connection events +/// +/// # Returns +/// * `Ok(Connection)` - Either cached or newly created connection +/// * `Err(NetworkingError)` - If connection fails +pub async fn get_or_create_connection( + connections: Arc>>, + endpoint: &Endpoint, + node_id: NodeId, + alpn: &'static [u8], + logger: &Arc, +) -> Result { + // Check cache first + { + let connections_guard = connections.read().await; + if let Some(conn) = connections_guard.get(&node_id) { + if conn.close_reason().is_none() { + logger + .debug(&format!("Reusing existing connection to node {}", node_id)) + .await; + return Ok(conn.clone()); + } else { + logger + .debug(&format!( + "Cached connection to node {} is closed, creating new one", + node_id + )) + .await; + } + } + } + + // Create new connection + let node_addr = NodeAddr::new(node_id); + logger + .debug(&format!("Creating new connection to node {}", node_id)) + .await; + + let conn = endpoint + .connect(node_addr, alpn) + .await + .map_err(|e| NetworkingError::ConnectionFailed(format!("Failed to connect: {}", e)))?; + + // Cache the connection + { + let mut connections_guard = connections.write().await; + connections_guard.insert(node_id, conn.clone()); + } + + logger + .info(&format!("Created new connection to node {}", node_id)) + .await; + + Ok(conn) +} diff --git a/core/src/service/network/utils/mod.rs b/core/src/service/network/utils/mod.rs index 4fe92ea7e..2df57a25c 100644 --- a/core/src/service/network/utils/mod.rs +++ b/core/src/service/network/utils/mod.rs @@ -1,7 +1,9 @@ //! Shared utilities for the networking system +pub mod connection; pub mod identity; pub mod logging; +pub use connection::get_or_create_connection; pub use identity::NetworkIdentity; pub use logging::{ConsoleLogger, NetworkLogger, SilentLogger};