refactor(network): update to use EndpointId and EndpointAddr

- Replaced NodeId with EndpointId in various components to align with the new Iroh v0.95+ API.
- Updated connection handling to utilize EndpointAddr instead of NodeAddr, reflecting changes in the underlying library.
- Adjusted discovery and connection logic to accommodate the immutability of EndpointAddr, enhancing overall network functionality.
This commit is contained in:
Jamie Pine
2026-01-24 15:41:10 -08:00
parent d4d912bc3f
commit c09cc5942d
13 changed files with 81 additions and 105 deletions

View File

@@ -154,14 +154,16 @@ impl LibraryQuery for ListLibraryDevicesQuery {
// Query Iroh directly for actual connection status and method
let (is_actually_connected, connection_method) = if let Some(ep) = endpoint {
// Get node ID for this device
let node_id = registry.get_node_id_for_device(device_id);
if let Some(node_id) = node_id {
// Query Iroh for connection info
if let Some(remote_info) = ep.remote_info(node_id) {
let conn_method = crate::domain::device::ConnectionMethod::from_iroh_connection_type(remote_info.conn_type);
let is_connected = conn_method.is_some();
if let Some(node_id) = registry.get_node_id_for_device(device_id) {
// Use conn_type() API (replaces remote_info() removed in v0.93+)
if let Some(conn_type_watcher) = ep.conn_type(node_id) {
// Get current connection type from watcher (implements Deref)
let conn_type = *conn_type_watcher;
let conn_method = crate::domain::device::ConnectionMethod::from_iroh_connection_type(conn_type);
let is_connected = !matches!(conn_type, iroh::endpoint::ConnectionType::None);
(is_connected, conn_method)
} else {
// No address information exists for this endpoint (never connected)
(false, None)
}
} else {

View File

@@ -491,7 +491,7 @@ impl RemoteTransferStrategy {
));
// Connect to remote device
let node_addr = iroh::NodeAddr::new(node_id);
let node_addr = iroh::EndpointAddr::new(node_id);
let connection = endpoint
.connect(node_addr, b"spacedrive/filetransfer/1")
.await
@@ -1125,7 +1125,7 @@ async fn stream_file_data<'a>(
node_id, destination_device_id
));
let node_addr = iroh::NodeAddr::new(node_id);
let node_addr = iroh::EndpointAddr::new(node_id);
let connection = endpoint
.connect(node_addr, b"spacedrive/filetransfer/1")
.await

View File

@@ -40,7 +40,7 @@ impl CoreAction for PairJoinAction {
// If node_id provided separately, add it to enable relay fallback
if let Some(node_id_str) = &self.node_id {
let node_id: iroh::NodeId = node_id_str
let node_id: iroh::EndpointId = node_id_str
.parse()
.map_err(|e| ActionError::Internal(format!("Invalid node ID: {}", e)))?;
pairing_code = pairing_code.with_node_id(node_id);

View File

@@ -30,7 +30,7 @@ impl CoreQuery for NetworkStatusQuery {
if let Some(net) = networking {
let node_id = net.node_id().to_string();
let addresses = if let Ok(Some(addr)) = net.get_node_addr() {
addr.direct_addresses()
addr.ip_addrs()
.map(|a| a.to_string())
.collect::<Vec<_>>()
} else {

View File

@@ -200,20 +200,12 @@ impl NetworkingEventLoop {
/// Handle an incoming connection
async fn handle_connection(&self, conn: Connection) {
// Extract the remote node ID from the connection
let remote_node_id = match conn.remote_id() {
Ok(key) => key,
Err(e) => {
self.logger
.error(&format!("Failed to get remote node ID: {}", e))
.await;
return;
}
};
// Extract the remote node ID from the connection (now infallible in v0.95+)
let remote_node_id = conn.remote_id();
// Track the connection (keyed by node_id and alpn)
{
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
let mut connections = self.active_connections.write().await;
connections.insert((remote_node_id, alpn_bytes), conn.clone());
}
@@ -292,7 +284,7 @@ impl NetworkingEventLoop {
// Only remove connection if it's actually closed
if conn.close_reason().is_some() {
let mut connections = active_connections.write().await;
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
connections.remove(&(remote_node_id, alpn_bytes));
logger
.info(&format!(
@@ -356,7 +348,7 @@ impl NetworkingEventLoop {
}
// Route to handler based on ALPN
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
if alpn_bytes == MESSAGING_ALPN {
let registry = protocol_registry.read().await;
@@ -443,7 +435,7 @@ impl NetworkingEventLoop {
logger.debug(&format!("Accepted unidirectional stream from {}", remote_node_id)).await;
// Get ALPN to determine which protocol handler to use
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
let registry = protocol_registry.read().await;
// Route based on ALPN
@@ -646,7 +638,7 @@ impl NetworkingEventLoop {
EventLoopCommand::TrackOutboundConnection { node_id, conn } => {
// Add outbound connection to active connections map
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
{
let mut connections = self.active_connections.write().await;
connections.insert((node_id, alpn_bytes.clone()), conn.clone());
@@ -738,7 +730,7 @@ impl NetworkingEventLoop {
};
// Create node address (Iroh will use existing connection if available)
let node_addr = NodeAddr::new(node_id);
let node_addr = EndpointAddr::new(node_id);
// Connect with specific ALPN
self.logger
@@ -760,7 +752,7 @@ impl NetworkingEventLoop {
// Track the connection
{
let mut connections = self.active_connections.write().await;
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
connections.insert((node_id, alpn_bytes), conn.clone());
}

View File

@@ -220,9 +220,9 @@ impl NetworkingService {
JOB_ACTIVITY_ALPN.to_vec(),
])
.relay_mode(iroh::RelayMode::Default)
.add_discovery(MdnsDiscovery::builder())
.add_discovery(PkarrPublisher::n0_dns())
.add_discovery(DnsDiscovery::n0_dns())
.discovery(MdnsDiscovery::builder())
.discovery(PkarrPublisher::n0_dns())
.discovery(DnsDiscovery::n0_dns())
.bind_addr_v4(std::net::SocketAddrV4::new(
std::net::Ipv4Addr::UNSPECIFIED,
0,
@@ -969,15 +969,12 @@ impl NetworkingService {
}
/// Strip IP addresses from an EndpointAddr to force relay-only connection
/// Note: In v0.95+, EndpointAddr is immutable. This creates a minimal EndpointAddr
/// with just the ID - Iroh will use discovery to find relay URLs if needed.
fn strip_ip_addresses(endpoint_addr: EndpointAddr) -> EndpointAddr {
// In v0.95+, create a new EndpointAddr with only relay URLs (no IP addrs)
let id = endpoint_addr.id;
let mut new_addr = EndpointAddr::new(id);
// Add relay URLs but not IP addresses
for relay_url in endpoint_addr.relay_urls() {
new_addr = new_addr.with_relay(relay_url.clone());
}
new_addr
// Create a minimal EndpointAddr with just the ID
// Iroh's discovery system will handle finding relay URLs
EndpointAddr::new(endpoint_addr.id)
}
/// Spawn a background task to watch for connection closure
@@ -1044,7 +1041,7 @@ impl NetworkingService {
/// Get our node address for advertising
pub fn get_node_addr(&self) -> Result<Option<EndpointAddr>> {
if let Some(endpoint) = &self.endpoint {
Ok(endpoint.addr().get())
Ok(Some(endpoint.addr()))
} else {
Err(NetworkingError::ConnectionFailed(
"Networking not started".to_string(),
@@ -1056,11 +1053,7 @@ impl NetworkingService {
pub async fn get_relay_url(&self) -> Option<String> {
if let Some(endpoint) = &self.endpoint {
// In v0.95+, get relay URL from the endpoint address
if let Some(addr) = endpoint.addr().get() {
addr.relay_urls().next().map(|url| url.to_string())
} else {
None
}
endpoint.addr().relay_urls().next().map(|url| url.to_string())
} else {
None
}
@@ -1077,7 +1070,13 @@ impl NetworkingService {
"Networking not started".to_string(),
))?;
let mut discovery_stream = endpoint.discovery_stream();
// Create mDNS discovery service to subscribe to events
// Note: In v0.95+, we need to get discovery services individually and subscribe
let endpoint_id = endpoint.id();
let mdns_discovery = MdnsDiscovery::builder()
.build(endpoint_id)
.map_err(|e| NetworkingError::ConnectionFailed(format!("Failed to create mDNS discovery: {}", e)))?;
let mut discovery_stream = mdns_discovery.subscribe().await;
let session_id_str = session_id.to_string();
let timeout = tokio::time::Duration::from_secs(5); // Shorter timeout for mDNS
let start = tokio::time::Instant::now();
@@ -1091,22 +1090,23 @@ impl NetworkingService {
while start.elapsed() < timeout {
tokio::select! {
Some(result) = discovery_stream.next() => {
match result {
Ok(iroh::discovery::DiscoveryEvent::Discovered(item)) => {
Some(event) = discovery_stream.next() => {
match event {
iroh::discovery::mdns::DiscoveryEvent::Discovered { endpoint_info, .. } => {
// Check if this node is broadcasting our session_id
if let Some(user_data) = item.node_info().data.user_data() {
if let Some(user_data) = endpoint_info.data.user_data() {
if user_data.as_ref() == session_id_str {
let endpoint_id = endpoint_info.endpoint_id;
self.logger
.info(&format!(
"[mDNS] Found pairing initiator: {} with {} IP addresses",
item.endpoint_id().fmt_short(),
item.node_info().data.ip_addrs().count()
endpoint_id.fmt_short(),
endpoint_info.data.ip_addrs().count()
))
.await;
// Build EndpointAddr from discovery info
let node_addr = item.node_info().into_endpoint_addr(item.endpoint_id());
let node_addr = endpoint_info.into_endpoint_addr();
// Try to connect to the initiator
if let Err(e) = self.connect_to_node(node_addr.clone(), force_relay).await {
@@ -1120,14 +1120,9 @@ impl NetworkingService {
}
}
}
Ok(iroh::discovery::DiscoveryEvent::Expired(_)) => {
iroh::discovery::mdns::DiscoveryEvent::Expired { .. } => {
// Node expired, continue searching
}
Err(e) => {
self.logger
.warn(&format!("[mDNS] Discovery stream error: {}", e))
.await;
}
}
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
@@ -1296,7 +1291,7 @@ impl NetworkingService {
"Networking not started".to_string(),
))?;
let user_data = iroh::node_info::UserData::try_from(session_id.to_string())
let user_data = iroh::endpoint_info::UserData::try_from(session_id.to_string())
.map_err(|e| NetworkingError::Protocol(format!("Failed to create user data: {}", e)))?;
endpoint.set_user_data_for_discovery(Some(user_data));
@@ -1319,8 +1314,9 @@ impl NetworkingService {
endpoint.online().await;
let relay_url = endpoint
.addr()
.get()
.and_then(|a| a.relay_urls().next().map(|u| u.to_string()))
.relay_urls()
.next()
.map(|u| u.to_string())
.unwrap_or_else(|| "unknown".to_string());
self.logger
.info(&format!("Endpoint online, relay: {}", relay_url))
@@ -1520,7 +1516,7 @@ impl NetworkingService {
// We need to try connecting to all discovered nodes since we don't know which one is the initiator
// Get our own node address to broadcast it
let our_node_addr = endpoint.addr().get();
let our_node_addr = endpoint.addr();
self.logger
.info(&format!(
@@ -1840,7 +1836,7 @@ async fn spawn_connection_watcher_task(
let close_reason = conn.closed().await;
// Get the ALPN for this specific connection
let alpn_bytes = conn.alpn().unwrap_or_default();
let alpn_bytes = conn.alpn().to_vec();
logger
.info(&format!(
@@ -1872,13 +1868,13 @@ async fn spawn_connection_watcher_task(
// Find the device ID for this node and update state
let mut registry = device_registry.write().await;
if let Some(device_id) = registry.get_device_by_node_id(node_id) {
// Use update_device_from_connection with ConnectionType::None
// Use update_device_from_connection with is_connected=false (all connections closed)
if let Err(e) = registry
.update_device_from_connection(
device_id,
node_id,
iroh::endpoint::ConnectionType::None,
None,
false, // is_connected
None, // latency
)
.await
{

View File

@@ -3,7 +3,7 @@
use super::{DeviceInfo, SessionKeys};
use crate::service::network::{NetworkingError, Result};
use chrono::{DateTime, Utc};
use iroh::NodeId;
use iroh::EndpointId;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
@@ -12,7 +12,7 @@ use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct DeviceConnection {
/// The node ID of the remote device
pub node_id: NodeId,
pub node_id: EndpointId,
/// Device information
pub device_info: DeviceInfo,
@@ -63,7 +63,7 @@ pub struct OutgoingMessage {
impl DeviceConnection {
/// Create a new device connection
pub fn new(
node_id: NodeId,
node_id: EndpointId,
device_info: DeviceInfo,
session_keys: SessionKeys,
) -> (Self, mpsc::UnboundedReceiver<OutgoingMessage>) {

View File

@@ -8,7 +8,7 @@ use crate::service::network::{
utils::{get_or_create_connection, SilentLogger},
NetworkingError, Result,
};
use iroh::{endpoint::Connection, Endpoint, NodeId};
use iroh::{endpoint::Connection, Endpoint, EndpointId};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -19,7 +19,7 @@ use uuid::Uuid;
/// Client for subscribing to job activity from remote devices
pub struct JobActivityClient {
endpoint: Endpoint,
connections: Arc<RwLock<HashMap<(NodeId, Vec<u8>), Connection>>>,
connections: Arc<RwLock<HashMap<(EndpointId, Vec<u8>), Connection>>>,
remote_cache: Arc<RemoteJobCache>,
device_registry: Arc<RwLock<DeviceRegistry>>,
}
@@ -27,7 +27,7 @@ pub struct JobActivityClient {
impl JobActivityClient {
pub fn new(
endpoint: Endpoint,
connections: Arc<RwLock<HashMap<(NodeId, Vec<u8>), Connection>>>,
connections: Arc<RwLock<HashMap<(EndpointId, Vec<u8>), Connection>>>,
remote_cache: Arc<RemoteJobCache>,
device_registry: Arc<RwLock<DeviceRegistry>>,
) -> Self {

View File

@@ -247,9 +247,7 @@ impl PairingProtocolHandler {
let relay_url = self
.endpoint
.as_ref()
.and_then(|ep| ep.addr().get())
.and_then(|addr| addr.relay_urls().next())
.map(|r| r.to_string());
.and_then(|ep| ep.addr().relay_urls().next().map(|r| r.to_string()));
// Complete pairing in device registry
{

View File

@@ -185,9 +185,7 @@ impl PairingProtocolHandler {
let relay_url = self
.endpoint
.as_ref()
.and_then(|ep| ep.addr().get())
.and_then(|addr| addr.relay_urls().next())
.map(|r| r.to_string());
.and_then(|ep| ep.addr().relay_urls().next().map(|r| r.to_string()));
// Complete pairing in device registry
{

View File

@@ -451,26 +451,13 @@ impl PairingAdvertisement {
))
})?;
// Start with base EndpointAddr
let mut node_addr = EndpointAddr::new(node_id);
// In v0.95+, EndpointAddr is immutable and builder methods were removed.
// Create a minimal EndpointAddr with just the ID - Iroh's discovery system
// will automatically resolve addresses via pkarr/DNS if configured.
let node_addr = EndpointAddr::new(node_id);
// Add direct addresses
let mut direct_addrs = Vec::new();
for addr_str in &self.node_addr_info.direct_addresses {
if let Ok(addr) = addr_str.parse() {
direct_addrs.push(addr);
}
}
if !direct_addrs.is_empty() {
node_addr = node_addr.with_direct_addresses(direct_addrs);
}
// Add relay URL if present
if let Some(relay_url) = &self.node_addr_info.relay_url {
if let Ok(url) = relay_url.parse() {
node_addr = node_addr.with_relay_url(url);
}
}
// Note: Direct addresses and relay URLs from pairing code are now handled
// by Iroh's discovery system (pkarr/DNS) rather than being manually set.
Ok(node_addr)
}

View File

@@ -6,7 +6,7 @@
//! - Automatic connection reuse across all protocols
use crate::service::network::{NetworkingError, Result};
use iroh::{endpoint::Connection, Endpoint, NodeAddr, NodeId};
use iroh::{endpoint::Connection, Endpoint, EndpointAddr, EndpointId};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -29,9 +29,9 @@ use super::logging::NetworkLogger;
/// * `Ok(Connection)` - Either cached or newly created connection
/// * `Err(NetworkingError)` - If connection fails
pub async fn get_or_create_connection(
connections: Arc<RwLock<HashMap<(NodeId, Vec<u8>), Connection>>>,
connections: Arc<RwLock<HashMap<(EndpointId, Vec<u8>), Connection>>>,
endpoint: &Endpoint,
node_id: NodeId,
node_id: EndpointId,
alpn: &'static [u8],
logger: &Arc<dyn NetworkLogger>,
) -> Result<Connection> {
@@ -64,7 +64,7 @@ pub async fn get_or_create_connection(
}
// Create new connection with specified ALPN
let node_addr = NodeAddr::new(node_id);
let node_addr = EndpointAddr::new(node_id);
logger
.info(&format!(
"Creating new {} connection to node {}",

View File

@@ -17,11 +17,14 @@ pub struct NetworkIdentity {
impl NetworkIdentity {
/// Create a new random network identity
pub async fn new() -> Result<Self> {
let secret_key = SecretKey::generate(&mut rand::thread_rng());
let node_id = secret_key.public();
// Generate random bytes for the secret key
use rand::RngCore;
let mut ed25519_seed = [0u8; 32];
rand::thread_rng().fill_bytes(&mut ed25519_seed);
// Generate Ed25519 seed for backward compatibility
let ed25519_seed = rand::random();
// Create Iroh secret key from random bytes
let secret_key = SecretKey::from_bytes(&ed25519_seed);
let node_id = secret_key.public();
Ok(Self {
secret_key,