feat: implement connection caching for improved network performance

- Enhanced the `NetworkingService` to cache active connections, allowing for reuse of existing connections and reducing the overhead of establishing new ones.
- Implemented logic to check the status of cached connections before creating new ones, improving efficiency during synchronization and request handling.
- Added detailed logging for connection reuse and creation, aiding in monitoring and debugging network interactions.
This commit is contained in:
Jamie Pine
2025-11-15 07:42:29 -08:00
parent 15821e581a
commit eb4e1db688

View File

@@ -56,31 +56,69 @@ impl NetworkTransport for NetworkingService {
let bytes = serde_json::to_vec(&message)
.map_err(|e| anyhow::anyhow!("Failed to serialize sync message: {}", e))?;
// 3. Get Iroh endpoint via public getter
// 3. Get or create connection (with caching for massive performance improvement)
let endpoint = self
.endpoint()
.ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?;
// Open a connection and send
// Note: Iroh handles connection pooling, so repeated calls are efficient
let conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| {
warn!(
let active_connections = self.active_connections();
let cache_key = (node_id, SYNC_ALPN.to_vec());
// Check cache first - reuse existing connection if alive
let conn = {
let connections = active_connections.read().await;
if let Some(cached_conn) = connections.get(&cache_key) {
if cached_conn.close_reason().is_none() {
tracing::debug!(
device_uuid = %target_device,
"Reusing cached connection (avoids TLS handshake)"
);
Some(cached_conn.clone())
} else {
None // Connection closed, need new one
}
} else {
None
}
};
// Create new connection only if cache miss
let conn = if let Some(conn) = conn {
conn
} else {
tracing::debug!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync"
"Creating new connection (cache miss)"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
// Track outbound connection so we can receive incoming streams on it
if let Some(cmd_sender) = self.command_sender() {
use crate::service::network::core::event_loop::EventLoopCommand;
let _ = cmd_sender.send(EventLoopCommand::TrackOutboundConnection {
node_id,
conn: conn.clone(),
});
}
let new_conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| {
warn!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
// Add to cache
{
let mut connections = active_connections.write().await;
connections.insert(cache_key, new_conn.clone());
}
// Track outbound connection so we can receive incoming streams on it
if let Some(cmd_sender) = self.command_sender() {
use crate::service::network::core::event_loop::EventLoopCommand;
let _ = cmd_sender.send(EventLoopCommand::TrackOutboundConnection {
node_id,
conn: new_conn.clone(),
});
}
new_conn
};
// Open a unidirectional stream and send the message
let mut send = conn
@@ -149,30 +187,68 @@ impl NetworkTransport for NetworkingService {
"Sending sync request"
);
// Get endpoint
// Get or create connection (with caching)
let endpoint = self
.endpoint()
.ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?;
// Connect with SYNC_ALPN
let conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| {
warn!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync request"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
let active_connections = self.active_connections();
let cache_key = (node_id, SYNC_ALPN.to_vec());
// Track outbound connection so we can receive incoming streams on it
if let Some(cmd_sender) = self.command_sender() {
use crate::service::network::core::event_loop::EventLoopCommand;
let _ = cmd_sender.send(EventLoopCommand::TrackOutboundConnection {
node_id,
conn: conn.clone(),
});
}
// Check cache first
let conn = {
let connections = active_connections.read().await;
if let Some(cached_conn) = connections.get(&cache_key) {
if cached_conn.close_reason().is_none() {
tracing::debug!(
device_uuid = %target_device,
"Reusing cached connection for request"
);
Some(cached_conn.clone())
} else {
None
}
} else {
None
}
};
// Create if needed
let conn = if let Some(conn) = conn {
conn
} else {
tracing::debug!(
device_uuid = %target_device,
"Creating new connection for request"
);
let new_conn = endpoint.connect(node_id, SYNC_ALPN).await.map_err(|e| {
warn!(
device_uuid = %target_device,
node_id = %node_id,
error = %e,
"Failed to connect to device for sync request"
);
anyhow::anyhow!("Failed to connect to {}: {}", target_device, e)
})?;
// Cache it
{
let mut connections = active_connections.write().await;
connections.insert(cache_key, new_conn.clone());
}
// Track it
if let Some(cmd_sender) = self.command_sender() {
use crate::service::network::core::event_loop::EventLoopCommand;
let _ = cmd_sender.send(EventLoopCommand::TrackOutboundConnection {
node_id,
conn: new_conn.clone(),
});
}
new_conn
};
// Open bidirectional stream
let (mut send, mut recv) = conn