feat: implement connection caching and reuse with updated dependencies

This commit is contained in:
Jamie Pine
2025-10-19 14:49:53 -07:00
parent 2563a6dee9
commit 1d49c98e00
5 changed files with 122 additions and 118 deletions

View File

@@ -5,7 +5,7 @@
use crate::{
infra::sync::NetworkTransport,
service::network::{protocol::sync::messages::SyncMessage, NetworkingError},
service::network::{protocol::sync::messages::SyncMessage, utils, NetworkingError},
};
use anyhow::Result;
use std::sync::Arc;
@@ -27,8 +27,9 @@ impl NetworkTransport for NetworkingService {
///
/// 1. Look up NodeId for device UUID via DeviceRegistry
/// 2. Serialize the SyncMessage to JSON bytes
/// 3. Send via Iroh endpoint using the sync protocol ALPN
/// 4. Handle errors gracefully (device may be offline)
/// 3. Get or reuse cached connection (Iroh best practice)
/// 4. Open new stream on existing connection (essentially 0 RTT)
/// 5. Handle errors gracefully (device may be offline)
async fn send_sync_message(&self, target_device: Uuid, message: SyncMessage) -> Result<()> {
// 1. Look up NodeId for device UUID
let node_id = {
@@ -55,25 +56,31 @@ impl NetworkTransport for NetworkingService {
let bytes = serde_json::to_vec(&message)
.map_err(|e| anyhow::anyhow!("Failed to serialize sync message: {}", e))?;
// 3. Send via Iroh endpoint
// 3. Get or reuse cached connection
let endpoint = self
.endpoint
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?;
// Open a connection and send
// Note: Iroh handles connection pooling, so repeated calls are efficient
let conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| {
let conn = utils::get_or_create_connection(
self.active_connections.clone(),
endpoint,
node_id,
SYNC_ALPN,
&self.logger,
)
.await
.map_err(|e| {
warn!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync"
"Failed to get connection to device for sync"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
// Open a unidirectional stream and send the message
// 4. Open a unidirectional stream and send the message
let mut send = conn
.open_uni()
.await

View File

@@ -1,7 +1,7 @@
//! Basic messaging protocol handler
use super::{library_messages::LibraryMessage, ProtocolEvent, ProtocolHandler};
use crate::service::network::{NetworkingError, Result};
use crate::service::network::{utils, NetworkingError, Result};
use async_trait::async_trait;
use iroh::{endpoint::Connection, Endpoint, NodeAddr, NodeId};
use serde::{Deserialize, Serialize};
@@ -395,51 +395,6 @@ impl MessagingProtocolHandler {
}
}
/// Get or create a connection to a specific node
/// Implements Iroh best practice of reusing persistent connections
async fn get_or_create_connection(
&self,
node_id: NodeId,
alpn: &'static [u8],
) -> Result<Connection> {
// Check cache first
{
let connections = self.connections.read().await;
if let Some(conn) = connections.get(&node_id) {
if conn.close_reason().is_none() {
tracing::debug!("Reusing existing connection to node {}", node_id);
return Ok(conn.clone());
} else {
tracing::debug!(
"Cached connection to node {} is closed, creating new one",
node_id
);
}
}
}
// Create new connection
let endpoint = self.endpoint.as_ref().ok_or_else(|| {
NetworkingError::ConnectionFailed("No endpoint available".to_string())
})?;
let node_addr = NodeAddr::new(node_id);
tracing::debug!("Creating new connection to node {}", node_id);
let conn = endpoint
.connect(node_addr, alpn)
.await
.map_err(|e| NetworkingError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
// Cache the connection
{
let mut connections = self.connections.write().await;
connections.insert(node_id, conn.clone());
}
tracing::info!("Created new connection to node {}", node_id);
Ok(conn)
}
/// Send a library message to a remote node and wait for response
/// Uses cached connections and creates new streams (Iroh best practice)
@@ -454,9 +409,19 @@ impl MessagingProtocolHandler {
tracing::debug!("Sending library message to node {}: {:?}", node_id, message);
// Get or create cached connection
let conn = self
.get_or_create_connection(node_id, crate::service::network::core::MESSAGING_ALPN)
.await?;
let endpoint = self.endpoint.as_ref().ok_or_else(|| {
NetworkingError::ConnectionFailed("No endpoint available".to_string())
})?;
let logger: Arc<dyn utils::NetworkLogger> = Arc::new(utils::SilentLogger);
let conn = utils::get_or_create_connection(
self.connections.clone(),
endpoint,
node_id,
crate::service::network::core::MESSAGING_ALPN,
&logger,
)
.await?;
// Create new stream on existing connection
let (mut send, mut recv) = conn.open_bi().await.map_err(|e| {

View File

@@ -18,7 +18,7 @@ pub use types::{PairingAdvertisement, PairingCode, PairingRole, PairingSession,
use super::{ProtocolEvent, ProtocolHandler};
use crate::service::network::{
device::{DeviceInfo, DeviceRegistry, SessionKeys},
utils::{identity::NetworkFingerprint, logging::NetworkLogger, NetworkIdentity},
utils::{self, identity::NetworkFingerprint, logging::NetworkLogger, NetworkIdentity},
NetworkingError, Result,
};
use async_trait::async_trait;
@@ -330,61 +330,6 @@ impl PairingProtocolHandler {
Ok(())
}
/// Get or create a connection to a specific node
/// This method implements the Iroh best practice of reusing connections
/// and creating new streams for each message exchange
async fn get_or_create_connection(
&self,
node_id: NodeId,
alpn: &'static [u8],
) -> Result<Connection> {
{
let connections = self.connections.read().await;
if let Some(conn) = connections.get(&node_id) {
if conn.close_reason().is_none() {
self.log_debug(&format!(
"Reusing existing connection to node {}",
node_id
))
.await;
return Ok(conn.clone());
} else {
self.log_debug(&format!(
"Cached connection to node {} is closed, creating new one",
node_id
))
.await;
}
}
}
let endpoint = self.endpoint.as_ref().ok_or_else(|| {
NetworkingError::ConnectionFailed("No endpoint available".to_string())
})?;
let node_addr = NodeAddr::new(node_id);
self.log_debug(&format!(
"Creating new connection to node {} with ALPN",
node_id
))
.await;
let conn = endpoint.connect(node_addr, alpn).await.map_err(|e| {
NetworkingError::ConnectionFailed(format!("Failed to connect to {}: {}", node_id, e))
})?;
{
let mut connections = self.connections.write().await;
connections.insert(node_id, conn.clone());
self.log_info(&format!(
"Established and cached new connection to node {}",
node_id
))
.await;
}
Ok(conn)
}
/// Get device info for advertising in DHT records
pub async fn get_device_info(&self) -> Result<DeviceInfo> {
@@ -659,15 +604,20 @@ impl PairingProtocolHandler {
/// - Keeps connections alive for future messages
pub async fn send_pairing_message_to_node(
&self,
_endpoint: &Endpoint,
endpoint: &Endpoint,
node_id: NodeId,
message: &PairingMessage,
) -> Result<Option<PairingMessage>> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let conn = self
.get_or_create_connection(node_id, crate::service::network::core::PAIRING_ALPN)
.await?;
let conn = utils::get_or_create_connection(
self.connections.clone(),
endpoint,
node_id,
crate::service::network::core::PAIRING_ALPN,
&self.logger,
)
.await?;
let (mut send, mut recv) = conn.open_bi().await.map_err(|e| {
NetworkingError::ConnectionFailed(format!("Failed to open stream: {}", e))

View File

@@ -0,0 +1,80 @@
//! Shared connection management utilities for Iroh networking
//!
//! Provides connection caching helpers following Iroh best practices:
//! - One persistent connection per device pair
//! - Lightweight streams for individual messages (0 RTT overhead)
//! - Automatic connection reuse across all protocols
use crate::service::network::{NetworkingError, Result};
use iroh::{endpoint::Connection, Endpoint, NodeAddr, NodeId};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::logging::NetworkLogger;
/// Get or create a connection to a specific node
///
/// This implements Iroh's best practice of reusing persistent connections
/// and creating new streams for each message exchange.
///
/// # Arguments
/// * `connections` - Shared connection cache (all protocols use the same cache)
/// * `endpoint` - Iroh endpoint for creating new connections
/// * `node_id` - Target node to connect to
/// * `alpn` - Protocol ALPN identifier
/// * `logger` - Logger for connection events
///
/// # Returns
/// * `Ok(Connection)` - Either cached or newly created connection
/// * `Err(NetworkingError)` - If connection fails
pub async fn get_or_create_connection(
connections: Arc<RwLock<HashMap<NodeId, Connection>>>,
endpoint: &Endpoint,
node_id: NodeId,
alpn: &'static [u8],
logger: &Arc<dyn NetworkLogger>,
) -> Result<Connection> {
// Check cache first
{
let connections_guard = connections.read().await;
if let Some(conn) = connections_guard.get(&node_id) {
if conn.close_reason().is_none() {
logger
.debug(&format!("Reusing existing connection to node {}", node_id))
.await;
return Ok(conn.clone());
} else {
logger
.debug(&format!(
"Cached connection to node {} is closed, creating new one",
node_id
))
.await;
}
}
}
// Create new connection
let node_addr = NodeAddr::new(node_id);
logger
.debug(&format!("Creating new connection to node {}", node_id))
.await;
let conn = endpoint
.connect(node_addr, alpn)
.await
.map_err(|e| NetworkingError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
// Cache the connection
{
let mut connections_guard = connections.write().await;
connections_guard.insert(node_id, conn.clone());
}
logger
.info(&format!("Created new connection to node {}", node_id))
.await;
Ok(conn)
}

View File

@@ -1,7 +1,9 @@
//! Shared utilities for the networking system
pub mod connection;
pub mod identity;
pub mod logging;
pub use connection::get_or_create_connection;
pub use identity::NetworkIdentity;
pub use logging::{ConsoleLogger, NetworkLogger, SilentLogger};