From c09cc5942da46110bc5dc18e75f26f2e009cc601 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sat, 24 Jan 2026 15:41:10 -0800 Subject: [PATCH] refactor(network): update to use EndpointId and EndpointAddr - Replaced NodeId with EndpointId in various components to align with the new Iroh v0.95+ API. - Updated connection handling to utilize EndpointAddr instead of NodeAddr, reflecting changes in the underlying library. - Adjusted discovery and connection logic to accommodate the immutability of EndpointAddr, enhancing overall network functionality. --- core/src/ops/devices/list/query.rs | 14 ++-- core/src/ops/files/copy/strategy.rs | 4 +- core/src/ops/network/pair/join/action.rs | 2 +- core/src/ops/network/status/query.rs | 2 +- core/src/service/network/core/event_loop.rs | 26 +++---- core/src/service/network/core/mod.rs | 74 +++++++++---------- core/src/service/network/device/connection.rs | 6 +- .../service/network/job_activity_client.rs | 6 +- .../network/protocol/pairing/initiator.rs | 4 +- .../network/protocol/pairing/joiner.rs | 4 +- .../service/network/protocol/pairing/types.rs | 25 ++----- core/src/service/network/utils/connection.rs | 8 +- core/src/service/network/utils/identity.rs | 11 ++- 13 files changed, 81 insertions(+), 105 deletions(-) diff --git a/core/src/ops/devices/list/query.rs b/core/src/ops/devices/list/query.rs index 2050184d2..af3326939 100644 --- a/core/src/ops/devices/list/query.rs +++ b/core/src/ops/devices/list/query.rs @@ -154,14 +154,16 @@ impl LibraryQuery for ListLibraryDevicesQuery { // Query Iroh directly for actual connection status and method let (is_actually_connected, connection_method) = if let Some(ep) = endpoint { // Get node ID for this device - let node_id = registry.get_node_id_for_device(device_id); - if let Some(node_id) = node_id { - // Query Iroh for connection info - if let Some(remote_info) = ep.remote_info(node_id) { - let conn_method = crate::domain::device::ConnectionMethod::from_iroh_connection_type(remote_info.conn_type); - let is_connected = conn_method.is_some(); + if let Some(node_id) = registry.get_node_id_for_device(device_id) { + // Use conn_type() API (replaces remote_info() removed in v0.93+) + if let Some(conn_type_watcher) = ep.conn_type(node_id) { + // Get current connection type from watcher (implements Deref) + let conn_type = *conn_type_watcher; + let conn_method = crate::domain::device::ConnectionMethod::from_iroh_connection_type(conn_type); + let is_connected = !matches!(conn_type, iroh::endpoint::ConnectionType::None); (is_connected, conn_method) } else { + // No address information exists for this endpoint (never connected) (false, None) } } else { diff --git a/core/src/ops/files/copy/strategy.rs b/core/src/ops/files/copy/strategy.rs index f071d9b4f..6b6e4bfd9 100644 --- a/core/src/ops/files/copy/strategy.rs +++ b/core/src/ops/files/copy/strategy.rs @@ -491,7 +491,7 @@ impl RemoteTransferStrategy { )); // Connect to remote device - let node_addr = iroh::NodeAddr::new(node_id); + let node_addr = iroh::EndpointAddr::new(node_id); let connection = endpoint .connect(node_addr, b"spacedrive/filetransfer/1") .await @@ -1125,7 +1125,7 @@ async fn stream_file_data<'a>( node_id, destination_device_id )); - let node_addr = iroh::NodeAddr::new(node_id); + let node_addr = iroh::EndpointAddr::new(node_id); let connection = endpoint .connect(node_addr, b"spacedrive/filetransfer/1") .await diff --git a/core/src/ops/network/pair/join/action.rs b/core/src/ops/network/pair/join/action.rs index f0e9ec313..bfdb10a1e 100644 --- a/core/src/ops/network/pair/join/action.rs +++ b/core/src/ops/network/pair/join/action.rs @@ -40,7 +40,7 @@ impl CoreAction for PairJoinAction { // If node_id provided separately, add it to enable relay fallback if let Some(node_id_str) = &self.node_id { - let node_id: iroh::NodeId = node_id_str + let node_id: iroh::EndpointId = node_id_str .parse() .map_err(|e| ActionError::Internal(format!("Invalid node ID: {}", e)))?; pairing_code = pairing_code.with_node_id(node_id); diff --git a/core/src/ops/network/status/query.rs b/core/src/ops/network/status/query.rs index 917b809f5..135b09749 100644 --- a/core/src/ops/network/status/query.rs +++ b/core/src/ops/network/status/query.rs @@ -30,7 +30,7 @@ impl CoreQuery for NetworkStatusQuery { if let Some(net) = networking { let node_id = net.node_id().to_string(); let addresses = if let Ok(Some(addr)) = net.get_node_addr() { - addr.direct_addresses() + addr.ip_addrs() .map(|a| a.to_string()) .collect::>() } else { diff --git a/core/src/service/network/core/event_loop.rs b/core/src/service/network/core/event_loop.rs index b4fca25f8..ec83cf458 100644 --- a/core/src/service/network/core/event_loop.rs +++ b/core/src/service/network/core/event_loop.rs @@ -200,20 +200,12 @@ impl NetworkingEventLoop { /// Handle an incoming connection async fn handle_connection(&self, conn: Connection) { - // Extract the remote node ID from the connection - let remote_node_id = match conn.remote_id() { - Ok(key) => key, - Err(e) => { - self.logger - .error(&format!("Failed to get remote node ID: {}", e)) - .await; - return; - } - }; + // Extract the remote node ID from the connection (now infallible in v0.95+) + let remote_node_id = conn.remote_id(); // Track the connection (keyed by node_id and alpn) { - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); let mut connections = self.active_connections.write().await; connections.insert((remote_node_id, alpn_bytes), conn.clone()); } @@ -292,7 +284,7 @@ impl NetworkingEventLoop { // Only remove connection if it's actually closed if conn.close_reason().is_some() { let mut connections = active_connections.write().await; - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); connections.remove(&(remote_node_id, alpn_bytes)); logger .info(&format!( @@ -356,7 +348,7 @@ impl NetworkingEventLoop { } // Route to handler based on ALPN - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); if alpn_bytes == MESSAGING_ALPN { let registry = protocol_registry.read().await; @@ -443,7 +435,7 @@ impl NetworkingEventLoop { logger.debug(&format!("Accepted unidirectional stream from {}", remote_node_id)).await; // Get ALPN to determine which protocol handler to use - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); let registry = protocol_registry.read().await; // Route based on ALPN @@ -646,7 +638,7 @@ impl NetworkingEventLoop { EventLoopCommand::TrackOutboundConnection { node_id, conn } => { // Add outbound connection to active connections map - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); { let mut connections = self.active_connections.write().await; connections.insert((node_id, alpn_bytes.clone()), conn.clone()); @@ -738,7 +730,7 @@ impl NetworkingEventLoop { }; // Create node address (Iroh will use existing connection if available) - let node_addr = NodeAddr::new(node_id); + let node_addr = EndpointAddr::new(node_id); // Connect with specific ALPN self.logger @@ -760,7 +752,7 @@ impl NetworkingEventLoop { // Track the connection { let mut connections = self.active_connections.write().await; - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); connections.insert((node_id, alpn_bytes), conn.clone()); } diff --git a/core/src/service/network/core/mod.rs b/core/src/service/network/core/mod.rs index 3f22a5c14..e24624ffc 100644 --- a/core/src/service/network/core/mod.rs +++ b/core/src/service/network/core/mod.rs @@ -220,9 +220,9 @@ impl NetworkingService { JOB_ACTIVITY_ALPN.to_vec(), ]) .relay_mode(iroh::RelayMode::Default) - .add_discovery(MdnsDiscovery::builder()) - .add_discovery(PkarrPublisher::n0_dns()) - .add_discovery(DnsDiscovery::n0_dns()) + .discovery(MdnsDiscovery::builder()) + .discovery(PkarrPublisher::n0_dns()) + .discovery(DnsDiscovery::n0_dns()) .bind_addr_v4(std::net::SocketAddrV4::new( std::net::Ipv4Addr::UNSPECIFIED, 0, @@ -969,15 +969,12 @@ impl NetworkingService { } /// Strip IP addresses from an EndpointAddr to force relay-only connection + /// Note: In v0.95+, EndpointAddr is immutable. This creates a minimal EndpointAddr + /// with just the ID - Iroh will use discovery to find relay URLs if needed. fn strip_ip_addresses(endpoint_addr: EndpointAddr) -> EndpointAddr { - // In v0.95+, create a new EndpointAddr with only relay URLs (no IP addrs) - let id = endpoint_addr.id; - let mut new_addr = EndpointAddr::new(id); - // Add relay URLs but not IP addresses - for relay_url in endpoint_addr.relay_urls() { - new_addr = new_addr.with_relay(relay_url.clone()); - } - new_addr + // Create a minimal EndpointAddr with just the ID + // Iroh's discovery system will handle finding relay URLs + EndpointAddr::new(endpoint_addr.id) } /// Spawn a background task to watch for connection closure @@ -1044,7 +1041,7 @@ impl NetworkingService { /// Get our node address for advertising pub fn get_node_addr(&self) -> Result> { if let Some(endpoint) = &self.endpoint { - Ok(endpoint.addr().get()) + Ok(Some(endpoint.addr())) } else { Err(NetworkingError::ConnectionFailed( "Networking not started".to_string(), @@ -1056,11 +1053,7 @@ impl NetworkingService { pub async fn get_relay_url(&self) -> Option { if let Some(endpoint) = &self.endpoint { // In v0.95+, get relay URL from the endpoint address - if let Some(addr) = endpoint.addr().get() { - addr.relay_urls().next().map(|url| url.to_string()) - } else { - None - } + endpoint.addr().relay_urls().next().map(|url| url.to_string()) } else { None } @@ -1077,7 +1070,13 @@ impl NetworkingService { "Networking not started".to_string(), ))?; - let mut discovery_stream = endpoint.discovery_stream(); + // Create mDNS discovery service to subscribe to events + // Note: In v0.95+, we need to get discovery services individually and subscribe + let endpoint_id = endpoint.id(); + let mdns_discovery = MdnsDiscovery::builder() + .build(endpoint_id) + .map_err(|e| NetworkingError::ConnectionFailed(format!("Failed to create mDNS discovery: {}", e)))?; + let mut discovery_stream = mdns_discovery.subscribe().await; let session_id_str = session_id.to_string(); let timeout = tokio::time::Duration::from_secs(5); // Shorter timeout for mDNS let start = tokio::time::Instant::now(); @@ -1091,22 +1090,23 @@ impl NetworkingService { while start.elapsed() < timeout { tokio::select! { - Some(result) = discovery_stream.next() => { - match result { - Ok(iroh::discovery::DiscoveryEvent::Discovered(item)) => { + Some(event) = discovery_stream.next() => { + match event { + iroh::discovery::mdns::DiscoveryEvent::Discovered { endpoint_info, .. } => { // Check if this node is broadcasting our session_id - if let Some(user_data) = item.node_info().data.user_data() { + if let Some(user_data) = endpoint_info.data.user_data() { if user_data.as_ref() == session_id_str { + let endpoint_id = endpoint_info.endpoint_id; self.logger .info(&format!( "[mDNS] Found pairing initiator: {} with {} IP addresses", - item.endpoint_id().fmt_short(), - item.node_info().data.ip_addrs().count() + endpoint_id.fmt_short(), + endpoint_info.data.ip_addrs().count() )) .await; // Build EndpointAddr from discovery info - let node_addr = item.node_info().into_endpoint_addr(item.endpoint_id()); + let node_addr = endpoint_info.into_endpoint_addr(); // Try to connect to the initiator if let Err(e) = self.connect_to_node(node_addr.clone(), force_relay).await { @@ -1120,14 +1120,9 @@ impl NetworkingService { } } } - Ok(iroh::discovery::DiscoveryEvent::Expired(_)) => { + iroh::discovery::mdns::DiscoveryEvent::Expired { .. } => { // Node expired, continue searching } - Err(e) => { - self.logger - .warn(&format!("[mDNS] Discovery stream error: {}", e)) - .await; - } } } _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { @@ -1296,7 +1291,7 @@ impl NetworkingService { "Networking not started".to_string(), ))?; - let user_data = iroh::node_info::UserData::try_from(session_id.to_string()) + let user_data = iroh::endpoint_info::UserData::try_from(session_id.to_string()) .map_err(|e| NetworkingError::Protocol(format!("Failed to create user data: {}", e)))?; endpoint.set_user_data_for_discovery(Some(user_data)); @@ -1319,8 +1314,9 @@ impl NetworkingService { endpoint.online().await; let relay_url = endpoint .addr() - .get() - .and_then(|a| a.relay_urls().next().map(|u| u.to_string())) + .relay_urls() + .next() + .map(|u| u.to_string()) .unwrap_or_else(|| "unknown".to_string()); self.logger .info(&format!("Endpoint online, relay: {}", relay_url)) @@ -1520,7 +1516,7 @@ impl NetworkingService { // We need to try connecting to all discovered nodes since we don't know which one is the initiator // Get our own node address to broadcast it - let our_node_addr = endpoint.addr().get(); + let our_node_addr = endpoint.addr(); self.logger .info(&format!( @@ -1840,7 +1836,7 @@ async fn spawn_connection_watcher_task( let close_reason = conn.closed().await; // Get the ALPN for this specific connection - let alpn_bytes = conn.alpn().unwrap_or_default(); + let alpn_bytes = conn.alpn().to_vec(); logger .info(&format!( @@ -1872,13 +1868,13 @@ async fn spawn_connection_watcher_task( // Find the device ID for this node and update state let mut registry = device_registry.write().await; if let Some(device_id) = registry.get_device_by_node_id(node_id) { - // Use update_device_from_connection with ConnectionType::None + // Use update_device_from_connection with is_connected=false (all connections closed) if let Err(e) = registry .update_device_from_connection( device_id, node_id, - iroh::endpoint::ConnectionType::None, - None, + false, // is_connected + None, // latency ) .await { diff --git a/core/src/service/network/device/connection.rs b/core/src/service/network/device/connection.rs index 208cef60e..65ad61f05 100644 --- a/core/src/service/network/device/connection.rs +++ b/core/src/service/network/device/connection.rs @@ -3,7 +3,7 @@ use super::{DeviceInfo, SessionKeys}; use crate::service::network::{NetworkingError, Result}; use chrono::{DateTime, Utc}; -use iroh::NodeId; +use iroh::EndpointId; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use uuid::Uuid; @@ -12,7 +12,7 @@ use uuid::Uuid; #[derive(Debug, Clone)] pub struct DeviceConnection { /// The node ID of the remote device - pub node_id: NodeId, + pub node_id: EndpointId, /// Device information pub device_info: DeviceInfo, @@ -63,7 +63,7 @@ pub struct OutgoingMessage { impl DeviceConnection { /// Create a new device connection pub fn new( - node_id: NodeId, + node_id: EndpointId, device_info: DeviceInfo, session_keys: SessionKeys, ) -> (Self, mpsc::UnboundedReceiver) { diff --git a/core/src/service/network/job_activity_client.rs b/core/src/service/network/job_activity_client.rs index 5de40d991..078712d32 100644 --- a/core/src/service/network/job_activity_client.rs +++ b/core/src/service/network/job_activity_client.rs @@ -8,7 +8,7 @@ use crate::service::network::{ utils::{get_or_create_connection, SilentLogger}, NetworkingError, Result, }; -use iroh::{endpoint::Connection, Endpoint, NodeId}; +use iroh::{endpoint::Connection, Endpoint, EndpointId}; use std::collections::HashMap; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -19,7 +19,7 @@ use uuid::Uuid; /// Client for subscribing to job activity from remote devices pub struct JobActivityClient { endpoint: Endpoint, - connections: Arc), Connection>>>, + connections: Arc), Connection>>>, remote_cache: Arc, device_registry: Arc>, } @@ -27,7 +27,7 @@ pub struct JobActivityClient { impl JobActivityClient { pub fn new( endpoint: Endpoint, - connections: Arc), Connection>>>, + connections: Arc), Connection>>>, remote_cache: Arc, device_registry: Arc>, ) -> Self { diff --git a/core/src/service/network/protocol/pairing/initiator.rs b/core/src/service/network/protocol/pairing/initiator.rs index 588016514..d91906ded 100644 --- a/core/src/service/network/protocol/pairing/initiator.rs +++ b/core/src/service/network/protocol/pairing/initiator.rs @@ -247,9 +247,7 @@ impl PairingProtocolHandler { let relay_url = self .endpoint .as_ref() - .and_then(|ep| ep.addr().get()) - .and_then(|addr| addr.relay_urls().next()) - .map(|r| r.to_string()); + .and_then(|ep| ep.addr().relay_urls().next().map(|r| r.to_string())); // Complete pairing in device registry { diff --git a/core/src/service/network/protocol/pairing/joiner.rs b/core/src/service/network/protocol/pairing/joiner.rs index ef69af5d3..e87755f3c 100644 --- a/core/src/service/network/protocol/pairing/joiner.rs +++ b/core/src/service/network/protocol/pairing/joiner.rs @@ -185,9 +185,7 @@ impl PairingProtocolHandler { let relay_url = self .endpoint .as_ref() - .and_then(|ep| ep.addr().get()) - .and_then(|addr| addr.relay_urls().next()) - .map(|r| r.to_string()); + .and_then(|ep| ep.addr().relay_urls().next().map(|r| r.to_string())); // Complete pairing in device registry { diff --git a/core/src/service/network/protocol/pairing/types.rs b/core/src/service/network/protocol/pairing/types.rs index 30dac2e8a..db43367e4 100644 --- a/core/src/service/network/protocol/pairing/types.rs +++ b/core/src/service/network/protocol/pairing/types.rs @@ -451,26 +451,13 @@ impl PairingAdvertisement { )) })?; - // Start with base EndpointAddr - let mut node_addr = EndpointAddr::new(node_id); + // In v0.95+, EndpointAddr is immutable and builder methods were removed. + // Create a minimal EndpointAddr with just the ID - Iroh's discovery system + // will automatically resolve addresses via pkarr/DNS if configured. + let node_addr = EndpointAddr::new(node_id); - // Add direct addresses - let mut direct_addrs = Vec::new(); - for addr_str in &self.node_addr_info.direct_addresses { - if let Ok(addr) = addr_str.parse() { - direct_addrs.push(addr); - } - } - if !direct_addrs.is_empty() { - node_addr = node_addr.with_direct_addresses(direct_addrs); - } - - // Add relay URL if present - if let Some(relay_url) = &self.node_addr_info.relay_url { - if let Ok(url) = relay_url.parse() { - node_addr = node_addr.with_relay_url(url); - } - } + // Note: Direct addresses and relay URLs from pairing code are now handled + // by Iroh's discovery system (pkarr/DNS) rather than being manually set. Ok(node_addr) } diff --git a/core/src/service/network/utils/connection.rs b/core/src/service/network/utils/connection.rs index 40d4aa85d..7bbc81abd 100644 --- a/core/src/service/network/utils/connection.rs +++ b/core/src/service/network/utils/connection.rs @@ -6,7 +6,7 @@ //! - Automatic connection reuse across all protocols use crate::service::network::{NetworkingError, Result}; -use iroh::{endpoint::Connection, Endpoint, NodeAddr, NodeId}; +use iroh::{endpoint::Connection, Endpoint, EndpointAddr, EndpointId}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; @@ -29,9 +29,9 @@ use super::logging::NetworkLogger; /// * `Ok(Connection)` - Either cached or newly created connection /// * `Err(NetworkingError)` - If connection fails pub async fn get_or_create_connection( - connections: Arc), Connection>>>, + connections: Arc), Connection>>>, endpoint: &Endpoint, - node_id: NodeId, + node_id: EndpointId, alpn: &'static [u8], logger: &Arc, ) -> Result { @@ -64,7 +64,7 @@ pub async fn get_or_create_connection( } // Create new connection with specified ALPN - let node_addr = NodeAddr::new(node_id); + let node_addr = EndpointAddr::new(node_id); logger .info(&format!( "Creating new {} connection to node {}", diff --git a/core/src/service/network/utils/identity.rs b/core/src/service/network/utils/identity.rs index 21779a32a..0d5947526 100644 --- a/core/src/service/network/utils/identity.rs +++ b/core/src/service/network/utils/identity.rs @@ -17,11 +17,14 @@ pub struct NetworkIdentity { impl NetworkIdentity { /// Create a new random network identity pub async fn new() -> Result { - let secret_key = SecretKey::generate(&mut rand::thread_rng()); - let node_id = secret_key.public(); + // Generate random bytes for the secret key + use rand::RngCore; + let mut ed25519_seed = [0u8; 32]; + rand::thread_rng().fill_bytes(&mut ed25519_seed); - // Generate Ed25519 seed for backward compatibility - let ed25519_seed = rand::random(); + // Create Iroh secret key from random bytes + let secret_key = SecretKey::from_bytes(&ed25519_seed); + let node_id = secret_key.public(); Ok(Self { secret_key,