From e43ea3be67b110ff7fb6c1b2bc0032f9881fd55e Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 22 Jun 2025 17:55:27 -0700 Subject: [PATCH] Enhance DHT-based pairing session discovery and advertisement Key Changes: - Implemented DHT record publishing for pairing sessions in `Core` and `NetworkingCore`. - Added `PairingAdvertisement` struct for serializing pairing session details. - Updated event loop to handle DHT record queries and responses, enabling discovery of pairing sessions. - Enhanced CLI commands to support querying and publishing pairing advertisements. Impact: - Improved device discovery during pairing, allowing devices to find each other more efficiently. - Streamlined the pairing process by integrating DHT functionalities, enhancing user experience. This commit lays the groundwork for a more robust and responsive pairing mechanism in the networking layer. --- core-new/src/infrastructure/cli/commands.rs | 414 +++++++++++++----- core-new/src/infrastructure/cli/mod.rs | 30 +- .../networking/REMAINING_WORK.md | 174 ++++++++ .../networking/core/event_loop.rs | 144 +++++- .../src/infrastructure/networking/core/mod.rs | 64 ++- .../networking/protocols/pairing.rs | 48 ++ .../networking/utils/identity.rs | 25 ++ core-new/src/lib.rs | 35 +- 8 files changed, 801 insertions(+), 133 deletions(-) create mode 100644 core-new/src/infrastructure/networking/REMAINING_WORK.md diff --git a/core-new/src/infrastructure/cli/commands.rs b/core-new/src/infrastructure/cli/commands.rs index e4a27e9bd..e010ed2c7 100644 --- a/core-new/src/infrastructure/cli/commands.rs +++ b/core-new/src/infrastructure/cli/commands.rs @@ -222,22 +222,22 @@ pub enum NetworkCommands { #[arg(short, long)] password: String, }, - + /// Start networking service Start, - + /// Stop networking service Stop, - + /// List connected devices Devices, - + /// Revoke trust from a device Revoke { /// Device ID to revoke device_id: Uuid, }, - + /// Send a file via Spacedrop Spacedrop { /// Target device ID @@ -753,9 +753,9 @@ pub async fn handle_network_command( _state: &mut CliState, ) -> Result<(), Box> { use crate::infrastructure::cli::daemon::{DaemonClient, DaemonCommand}; - + let client = DaemonClient::new(); - + // Check if daemon is running for most commands match &cmd { NetworkCommands::Init { .. } => { @@ -763,17 +763,22 @@ pub async fn handle_network_command( } _ => { if !client.is_running() { - println!("{} Daemon is not running. Start it with: {}", - "✗".red(), - "spacedrive start".bright_blue()); + println!( + "{} Daemon is not running. Start it with: {}", + "✗".red(), + "spacedrive start".bright_blue() + ); return Ok(()); } } } - + match cmd { NetworkCommands::Init { password } => { - match client.send_command(DaemonCommand::InitNetworking { password }).await? { + match client + .send_command(DaemonCommand::InitNetworking { password }) + .await? + { crate::infrastructure::cli::daemon::DaemonResponse::Ok => { println!("{} Networking initialized successfully", "✓".green()); } @@ -785,7 +790,7 @@ pub async fn handle_network_command( } } } - + NetworkCommands::Start => { match client.send_command(DaemonCommand::StartNetworking).await? { crate::infrastructure::cli::daemon::DaemonResponse::Ok => { @@ -799,23 +804,24 @@ pub async fn handle_network_command( } } } - - NetworkCommands::Stop => { - match client.send_command(DaemonCommand::StopNetworking).await? { - crate::infrastructure::cli::daemon::DaemonResponse::Ok => { - println!("{} Networking service stopped", "✓".green()); - } - crate::infrastructure::cli::daemon::DaemonResponse::Error(err) => { - println!("{} {}", "✗".red(), err); - } - _ => { - println!("{} Unexpected response", "✗".red()); - } + + NetworkCommands::Stop => match client.send_command(DaemonCommand::StopNetworking).await? { + crate::infrastructure::cli::daemon::DaemonResponse::Ok => { + println!("{} Networking service stopped", "✓".green()); } - } - + crate::infrastructure::cli::daemon::DaemonResponse::Error(err) => { + println!("{} {}", "✗".red(), err); + } + _ => { + println!("{} Unexpected response", "✗".red()); + } + }, + NetworkCommands::Devices => { - match client.send_command(DaemonCommand::ListConnectedDevices).await? { + match client + .send_command(DaemonCommand::ListConnectedDevices) + .await? + { crate::infrastructure::cli::daemon::DaemonResponse::ConnectedDevices(devices) => { if devices.is_empty() { println!("No devices currently connected"); @@ -824,7 +830,7 @@ pub async fn handle_network_command( let mut table = Table::new(); table.load_preset(UTF8_FULL); table.set_header(vec!["Device ID", "Name", "Status", "Last Seen"]); - + for device in devices { table.add_row(vec![ Cell::new(&device.device_id.to_string()[..8]), @@ -833,7 +839,7 @@ pub async fn handle_network_command( Cell::new(&device.last_seen), ]); } - + println!("{}", table); } } @@ -845,9 +851,12 @@ pub async fn handle_network_command( } } } - + NetworkCommands::Revoke { device_id } => { - match client.send_command(DaemonCommand::RevokeDevice { device_id }).await? { + match client + .send_command(DaemonCommand::RevokeDevice { device_id }) + .await? + { crate::infrastructure::cli::daemon::DaemonResponse::Ok => { println!("{} Device {} revoked", "✓".green(), device_id); } @@ -859,16 +868,30 @@ pub async fn handle_network_command( } } } - - NetworkCommands::Spacedrop { device_id, file_path, sender, message } => { - match client.send_command(DaemonCommand::SendSpacedrop { - device_id, - file_path: file_path.to_string_lossy().to_string(), - sender_name: sender, - message - }).await? { - crate::infrastructure::cli::daemon::DaemonResponse::SpacedropStarted { transfer_id } => { - println!("{} Spacedrop started with transfer ID: {}", "✓".green(), transfer_id); + + NetworkCommands::Spacedrop { + device_id, + file_path, + sender, + message, + } => { + match client + .send_command(DaemonCommand::SendSpacedrop { + device_id, + file_path: file_path.to_string_lossy().to_string(), + sender_name: sender, + message, + }) + .await? + { + crate::infrastructure::cli::daemon::DaemonResponse::SpacedropStarted { + transfer_id, + } => { + println!( + "{} Spacedrop started with transfer ID: {}", + "✓".green(), + transfer_id + ); } crate::infrastructure::cli::daemon::DaemonResponse::Error(err) => { println!("{} {}", "✗".red(), err); @@ -883,7 +906,9 @@ pub async fn handle_network_command( // Convert from legacy PairingCommands to new PairingAction let action = match action { PairingCommands::Generate { auto_accept } => { - crate::infrastructure::cli::networking_commands::PairingAction::Generate { auto_accept } + crate::infrastructure::cli::networking_commands::PairingAction::Generate { + auto_accept, + } } PairingCommands::Join { code } => { let code = match code { @@ -897,19 +922,30 @@ pub async fn handle_network_command( }; crate::infrastructure::cli::networking_commands::PairingAction::Join { code } } - PairingCommands::Status => crate::infrastructure::cli::networking_commands::PairingAction::Status, - PairingCommands::ListPending => crate::infrastructure::cli::networking_commands::PairingAction::List, + PairingCommands::Status => { + crate::infrastructure::cli::networking_commands::PairingAction::Status + } + PairingCommands::ListPending => { + crate::infrastructure::cli::networking_commands::PairingAction::List + } PairingCommands::Accept { request_id } => { - crate::infrastructure::cli::networking_commands::PairingAction::Accept { request_id } + crate::infrastructure::cli::networking_commands::PairingAction::Accept { + request_id, + } } PairingCommands::Reject { request_id } => { - crate::infrastructure::cli::networking_commands::PairingAction::Reject { request_id } + crate::infrastructure::cli::networking_commands::PairingAction::Reject { + request_id, + } } }; - crate::infrastructure::cli::networking_commands::handle_pairing_command(action, &client).await?; + crate::infrastructure::cli::networking_commands::handle_pairing_command( + action, &client, + ) + .await?; } } - + Ok(()) } @@ -919,50 +955,100 @@ pub async fn handle_pairing_command( ) -> Result<(), Box> { use crate::infrastructure::cli::daemon::{DaemonCommand, DaemonResponse}; use colored::Colorize; - + match action { PairingCommands::Generate { auto_accept } => { println!("🔐 Generating pairing code..."); - - match client.send_command(DaemonCommand::StartPairingAsInitiator { auto_accept }).await? { - DaemonResponse::PairingCodeGenerated { code, expires_in_seconds } => { - println!("\n📋 {} {}", "Your Pairing Code:".bright_cyan().bold(), "Share this with the other device".dimmed()); + + match client + .send_command(DaemonCommand::StartPairingAsInitiator { auto_accept }) + .await? + { + DaemonResponse::PairingCodeGenerated { + code, + expires_in_seconds, + } => { + println!( + "\n📋 {} {}", + "Your Pairing Code:".bright_cyan().bold(), + "Share this with the other device".dimmed() + ); println!(); println!(" {}", code.bright_white().bold()); println!(); - println!("⏰ {} {} {}", "Expires in".yellow(), expires_in_seconds.to_string().bright_yellow().bold(), "seconds".yellow()); - println!("💡 {} {}", "Tip:".bright_blue(), "The other device should use 'spacedrive network pair join'".dimmed()); - + println!( + "⏰ {} {} {}", + "Expires in".yellow(), + expires_in_seconds.to_string().bright_yellow().bold(), + "seconds".yellow() + ); + println!( + "💡 {} {}", + "Tip:".bright_blue(), + "BOZO The other device should use 'spacedrive network pair join'".dimmed() + ); + if auto_accept { - println!("🤖 {} {}", "Auto-accept enabled:".bright_green(), "Will automatically accept any pairing request".dimmed()); + println!( + "🤖 {} {}", + "Auto-accept enabled:".bright_green(), + "Will automatically accept any pairing request".dimmed() + ); } else { - println!("👤 {} {}", "Manual approval:".bright_yellow(), "You'll be asked to confirm pairing requests".dimmed()); + println!( + "👤 {} {}", + "Manual approval:".bright_yellow(), + "You'll be asked to confirm pairing requests".dimmed() + ); } - + println!("\n📡 Waiting for devices to connect..."); println!(" Press Ctrl+C to cancel"); - + // Wait for pairing completion or timeout loop { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - + match client.send_command(DaemonCommand::GetPairingStatus).await? { - DaemonResponse::PairingStatus { status, remote_device } => { + DaemonResponse::PairingStatus { + status, + remote_device, + } => { match status.as_str() { "completed" => { if let Some(device) = remote_device { - println!("\n🎉 {} {}", "Pairing completed successfully!".bright_green().bold(), "Device connected".dimmed()); - println!(" Device: {}", device.device_name.bright_cyan()); - println!(" ID: {}", device.device_id.to_string().bright_blue()); + println!( + "\n🎉 {} {}", + "Pairing completed successfully!" + .bright_green() + .bold(), + "Device connected".dimmed() + ); + println!( + " Device: {}", + device.device_name.bright_cyan() + ); + println!( + " ID: {}", + device.device_id.to_string().bright_blue() + ); } break; } "failed" => { - println!("\n❌ {} {}", "Pairing failed".bright_red().bold(), "Please try again".dimmed()); + println!( + "\n❌ {} {}", + "Pairing failed".bright_red().bold(), + "Please try again".dimmed() + ); break; } "expired" => { - println!("\n⏰ {} {}", "Pairing code expired".bright_yellow().bold(), "Generate a new code to try again".dimmed()); + println!( + "\n⏰ {} {}", + "Pairing code expired".bright_yellow().bold(), + "Generate a new code to try again".dimmed() + ); break; } _ => { @@ -984,50 +1070,84 @@ pub async fn handle_pairing_command( } } } - + PairingCommands::Join { code } => { let pairing_code = if let Some(code) = code { code } else { - println!("📥 {} {}", "Enter Pairing Code".bright_cyan().bold(), "From the other device".dimmed()); + println!( + "📥 {} {}", + "Enter Pairing Code".bright_cyan().bold(), + "From the other device".dimmed() + ); println!("Format: word1 word2 word3 word4 word5 word6 word7 word8 word9 word10 word11 word12"); print!("> "); - + use std::io::{self, Write}; io::stdout().flush()?; - + let mut input = String::new(); io::stdin().read_line(&mut input)?; input.trim().to_string() }; - + println!("🔍 Connecting to device..."); - - match client.send_command(DaemonCommand::StartPairingAsJoiner { code: pairing_code }).await? { + + match client + .send_command(DaemonCommand::StartPairingAsJoiner { code: pairing_code }) + .await? + { DaemonResponse::PairingInProgress => { - println!("📡 {} {}", "Searching for device...".bright_blue(), "This may take a moment".dimmed()); - + println!( + "📡 {} {}", + "Searching for device...".bright_blue(), + "This may take a moment".dimmed() + ); + // Wait for pairing completion loop { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - + match client.send_command(DaemonCommand::GetPairingStatus).await? { - DaemonResponse::PairingStatus { status, remote_device } => { + DaemonResponse::PairingStatus { + status, + remote_device, + } => { match status.as_str() { "completed" => { if let Some(device) = remote_device { - println!("\n🎉 {} {}", "Pairing completed successfully!".bright_green().bold(), "Device connected".dimmed()); - println!(" Device: {}", device.device_name.bright_cyan()); - println!(" ID: {}", device.device_id.to_string().bright_blue()); + println!( + "\n🎉 {} {}", + "Pairing completed successfully!" + .bright_green() + .bold(), + "Device connected".dimmed() + ); + println!( + " Device: {}", + device.device_name.bright_cyan() + ); + println!( + " ID: {}", + device.device_id.to_string().bright_blue() + ); } break; } "failed" => { - println!("\n❌ {} {}", "Pairing failed".bright_red().bold(), "Check the pairing code and try again".dimmed()); + println!( + "\n❌ {} {}", + "Pairing failed".bright_red().bold(), + "Check the pairing code and try again".dimmed() + ); break; } "expired" => { - println!("\n⏰ {} {}", "Pairing code expired".bright_yellow().bold(), "Ask for a new pairing code".dimmed()); + println!( + "\n⏰ {} {}", + "Pairing code expired".bright_yellow().bold(), + "Ask for a new pairing code".dimmed() + ); break; } _ => { @@ -1049,19 +1169,33 @@ pub async fn handle_pairing_command( } } } - + PairingCommands::Status => { match client.send_command(DaemonCommand::GetPairingStatus).await? { - DaemonResponse::PairingStatus { status, remote_device } => { - println!("📊 {} {}", "Pairing Status:".bright_cyan().bold(), status.bright_white()); - + DaemonResponse::PairingStatus { + status, + remote_device, + } => { + println!( + "📊 {} {}", + "Pairing Status:".bright_cyan().bold(), + status.bright_white() + ); + if let Some(device) = remote_device { println!(" Remote Device: {}", device.device_name.bright_cyan()); - println!(" Device ID: {}", device.device_id.to_string().bright_blue()); + println!( + " Device ID: {}", + device.device_id.to_string().bright_blue() + ); println!(" Status: {}", device.status.bright_green()); println!(" Last Seen: {}", device.last_seen.bright_yellow()); } else { - println!(" {} {}", "No active pairing session".dimmed(), "Use 'pair generate' or 'pair join' to start pairing".dimmed()); + println!( + " {} {}", + "No active pairing session".dimmed(), + "Use 'pair generate' or 'pair join' to start pairing".dimmed() + ); } } DaemonResponse::Error(err) => { @@ -1072,24 +1206,56 @@ pub async fn handle_pairing_command( } } } - + PairingCommands::ListPending => { - match client.send_command(DaemonCommand::ListPendingPairings).await? { + match client + .send_command(DaemonCommand::ListPendingPairings) + .await? + { DaemonResponse::PendingPairings(requests) => { if requests.is_empty() { - println!("📭 {} {}", "No pending pairing requests".dimmed(), "Devices will appear here when they try to pair".dimmed()); + println!( + "📭 {} {}", + "No pending pairing requests".dimmed(), + "Devices will appear here when they try to pair".dimmed() + ); } else { - println!("📋 {} {}", "Pending Pairing Requests:".bright_cyan().bold(), format!("({} total)", requests.len()).dimmed()); - + println!( + "📋 {} {}", + "Pending Pairing Requests:".bright_cyan().bold(), + format!("({} total)", requests.len()).dimmed() + ); + for request in requests { println!(); - println!(" 🔐 {} {}", "Request ID:".bright_blue(), request.request_id.to_string().bright_white()); - println!(" 📱 {} {}", "Device:".bright_blue(), request.device_name.bright_cyan()); - println!(" 🆔 {} {}", "Device ID:".bright_blue(), request.device_id.to_string().bright_blue()); - println!(" ⏰ {} {}", "Received:".bright_blue(), request.received_at.bright_yellow()); - println!(" 💬 {} {} {}", "Actions:".bright_blue(), - format!("spacedrive network pair accept {}", request.request_id).bright_green(), - format!("spacedrive network pair reject {}", request.request_id).bright_red()); + println!( + " 🔐 {} {}", + "Request ID:".bright_blue(), + request.request_id.to_string().bright_white() + ); + println!( + " 📱 {} {}", + "Device:".bright_blue(), + request.device_name.bright_cyan() + ); + println!( + " 🆔 {} {}", + "Device ID:".bright_blue(), + request.device_id.to_string().bright_blue() + ); + println!( + " ⏰ {} {}", + "Received:".bright_blue(), + request.received_at.bright_yellow() + ); + println!( + " 💬 {} {} {}", + "Actions:".bright_blue(), + format!("spacedrive network pair accept {}", request.request_id) + .bright_green(), + format!("spacedrive network pair reject {}", request.request_id) + .bright_red() + ); } } } @@ -1101,13 +1267,24 @@ pub async fn handle_pairing_command( } } } - + PairingCommands::Accept { request_id } => { - println!("✅ {} {}...", "Accepting pairing request".bright_green(), request_id.to_string().bright_blue()); - - match client.send_command(DaemonCommand::AcceptPairing { request_id }).await? { + println!( + "✅ {} {}...", + "Accepting pairing request".bright_green(), + request_id.to_string().bright_blue() + ); + + match client + .send_command(DaemonCommand::AcceptPairing { request_id }) + .await? + { DaemonResponse::Ok => { - println!("{} {}", "✓".green(), "Pairing request accepted successfully".bright_green()); + println!( + "{} {}", + "✓".green(), + "Pairing request accepted successfully".bright_green() + ); } DaemonResponse::Error(err) => { println!("{} {}", "✗".red(), err); @@ -1117,13 +1294,24 @@ pub async fn handle_pairing_command( } } } - + PairingCommands::Reject { request_id } => { - println!("❌ {} {}...", "Rejecting pairing request".bright_red(), request_id.to_string().bright_blue()); - - match client.send_command(DaemonCommand::RejectPairing { request_id }).await? { + println!( + "❌ {} {}...", + "Rejecting pairing request".bright_red(), + request_id.to_string().bright_blue() + ); + + match client + .send_command(DaemonCommand::RejectPairing { request_id }) + .await? + { DaemonResponse::Ok => { - println!("{} {}", "✓".green(), "Pairing request rejected".bright_yellow()); + println!( + "{} {}", + "✓".green(), + "Pairing request rejected".bright_yellow() + ); } DaemonResponse::Error(err) => { println!("{} {}", "✗".red(), err); @@ -1134,7 +1322,7 @@ pub async fn handle_pairing_command( } } } - + Ok(()) } diff --git a/core-new/src/infrastructure/cli/mod.rs b/core-new/src/infrastructure/cli/mod.rs index 8df091d9b..96cfa1efe 100644 --- a/core-new/src/infrastructure/cli/mod.rs +++ b/core-new/src/infrastructure/cli/mod.rs @@ -110,20 +110,24 @@ pub enum InstanceCommands { pub async fn run() -> Result<(), Box> { let cli = Cli::parse(); - // Set up logging with networking debug support + // Set up logging - respect RUST_LOG environment variable with fallback defaults let log_level = if cli.verbose { "debug" } else { "info" }; - let env_filter = if cli.verbose { - // Enable detailed networking and libp2p logging when verbose - format!( - "sd_core_new={},spacedrive_cli={},libp2p_mdns=debug,libp2p_swarm=debug,libp2p_kad=debug", - log_level, log_level - ) - } else { - format!( - "sd_core_new={},spacedrive_cli={}", - log_level, log_level - ) - }; + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| { + // Fallback to hardcoded filters if RUST_LOG not set + if cli.verbose { + // Enable detailed networking and libp2p logging when verbose + tracing_subscriber::EnvFilter::new(&format!( + "sd_core_new={},spacedrive_cli={},libp2p=debug", + log_level, log_level + )) + } else { + tracing_subscriber::EnvFilter::new(&format!( + "sd_core_new={},spacedrive_cli={}", + log_level, log_level + )) + } + }); tracing_subscriber::fmt() .with_env_filter(env_filter) diff --git a/core-new/src/infrastructure/networking/REMAINING_WORK.md b/core-new/src/infrastructure/networking/REMAINING_WORK.md new file mode 100644 index 000000000..afa1087c5 --- /dev/null +++ b/core-new/src/infrastructure/networking/REMAINING_WORK.md @@ -0,0 +1,174 @@ +# Networking Implementation - Remaining Work + +## Current Status: DHT Discovery ✅ - Pairing Flow ❌ + +The networking system has **successfully implemented DHT-based pairing discovery** but is missing the **actual pairing connection and authentication flow**. + +### What Works ✅ + +1. **DHT Session Advertisement** + - Initiator publishes pairing session to Kademlia DHT + - Session includes peer_id, addresses, device_info, expiration + - BIP39 pairing codes correctly derive session IDs + +2. **DHT Session Discovery** + - Joiner queries DHT using session ID from pairing code + - Successfully finds initiator's advertisement + - Emits `PairingSessionDiscovered` event with peer info + +3. **LibP2P Infrastructure** + - Unified swarm with proper Send/Sync compliance + - Request-response protocols for pairing and messaging + - Comprehensive Kademlia event handling + - Clean command channel architecture + +### What's Missing ❌ + +#### 1. **Peer Connection After Discovery** +**Problem**: After DHT discovery, joiner doesn't connect to initiator + +**Current State**: +```rust +// In event_loop.rs - attempts to dial but doesn't work properly +for address in &addresses { + match swarm.dial(address.clone()) { + Ok(_) => { + println!("Dialing discovered peer {} at {}", peer_id, address); + break; + } + Err(e) => { + println!("Failed to dial {}: {:?}", address, e); + } + } +} +``` + +**Needed**: +- Fix address dialing (current addresses may be invalid: `/ip4/127.0.0.1/tcp/0`) +- Implement proper external address discovery +- Handle connection establishment events properly + +#### 2. **Pairing Request Transmission** +**Problem**: After connection, no pairing request is sent + +**Current State**: Connection established but no pairing message sent + +**Needed**: +```rust +// After connection established, send pairing request +let pairing_request = PairingMessage::Request { + session_id, + device_info: local_device_info, + public_key: identity.public_key_bytes(), +}; + +swarm.behaviour_mut().pairing.send_request(&peer_id, pairing_request); +``` + +#### 3. **Challenge-Response Authentication** +**Problem**: No pairing protocol implementation + +**Current State**: Pairing protocol handler exists but not integrated + +**Needed**: +- Initiator receives pairing request +- Initiator sends cryptographic challenge +- Joiner signs challenge and responds +- Initiator verifies signature and completes pairing +- Both sides store paired device info + +#### 4. **Session State Management** +**Problem**: No tracking of pairing session states + +**Needed**: +- Track pending pairing sessions (who initiated what) +- Map session IDs to active pairing attempts +- Timeout handling for expired sessions +- Proper cleanup of failed pairing attempts + +#### 5. **External Address Discovery** +**Problem**: DHT advertisements contain invalid addresses + +**Current Issue**: +```rust +// Current implementation returns placeholder +pub async fn get_external_addresses(&self) -> Vec { + vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()] // Invalid! +} +``` + +**Needed**: +- Get actual listening addresses from swarm +- Implement STUN/NAT traversal for external addresses +- Filter out non-routable addresses (127.0.0.1, etc.) + +## Implementation Priority + +### High Priority (Required for Basic Pairing) + +1. **Fix External Addresses** (`get_external_addresses()`) + - Extract actual listening addresses from LibP2P swarm + - This will fix the dialing issue + +2. **Connection-Triggered Pairing Request** + - Detect when connection is for pairing vs normal operation + - Send `PairingMessage::Request` after connection established + +3. **Pairing Protocol Integration** + - Wire up existing `PairingProtocolHandler` methods + - Handle request/challenge/response/complete message flow + +### Medium Priority (Production Readiness) + +4. **Session State Tracking** + - Store pending sessions in event loop or core + - Proper timeout and cleanup handling + +5. **Error Recovery** + - Handle network failures during pairing + - Retry logic for failed connections + - User feedback for pairing failures + +### Low Priority (Nice to Have) + +6. **NAT Traversal** + - STUN server integration for external address discovery + - UPnP port mapping for better connectivity + +7. **DHT Record Cleanup** + - Remove expired pairing sessions from DHT + - Garbage collection for old records + +## Code Locations + +**Key files needing changes:** + +- `src/infrastructure/networking/core/mod.rs` - Fix `get_external_addresses()` +- `src/infrastructure/networking/core/event_loop.rs` - Add pairing request after connection +- `src/infrastructure/networking/protocols/pairing.rs` - Complete protocol handler integration +- `src/lib.rs` - Update Core pairing methods to handle async flow + +## Testing Strategy + +Once implemented, test with: + +```bash +# Terminal 1: Start Alice and generate pairing code +RUST_LOG=libp2p=debug ./target/release/spacedrive --instance alice start --enable-networking --foreground +RUST_LOG=libp2p=debug ./target/release/spacedrive network pair generate --instance alice + +# Terminal 2: Start Bob and join Alice's session +RUST_LOG=libp2p=debug ./target/release/spacedrive --instance bob start --enable-networking --foreground +RUST_LOG=libp2p=debug ./target/release/spacedrive network pair join "pairing code words..." --instance bob + +# Expected: Successful pairing with device exchange +``` + +## Architecture Assessment + +The **networking architecture is solid** - the missing pieces are implementation details rather than fundamental design issues: + +✅ **Good**: DHT discovery, event system, protocol framework, error handling +❌ **Missing**: Connection flow, message transmission, address resolution + +Estimated effort: **2-4 hours** to complete basic pairing functionality. \ No newline at end of file diff --git a/core-new/src/infrastructure/networking/core/event_loop.rs b/core-new/src/infrastructure/networking/core/event_loop.rs index 9e679093e..91acddadb 100644 --- a/core-new/src/infrastructure/networking/core/event_loop.rs +++ b/core-new/src/infrastructure/networking/core/event_loop.rs @@ -11,6 +11,7 @@ use crate::infrastructure::networking::{ }; use futures::StreamExt; use libp2p::{ + kad::{self, QueryId, RecordKey}, swarm::{Swarm, SwarmEvent}, Multiaddr, PeerId, }; @@ -26,6 +27,17 @@ pub enum EventLoopCommand { protocol: String, data: Vec, }, + /// Publish a DHT record for pairing session discovery + PublishDhtRecord { + key: RecordKey, + value: Vec, + response_channel: tokio::sync::oneshot::Sender>, + }, + /// Query a DHT record for pairing session discovery + QueryDhtRecord { + key: RecordKey, + response_channel: tokio::sync::oneshot::Sender>, + }, } /// Central event loop for processing all LibP2P events @@ -232,6 +244,38 @@ impl NetworkingEventLoop { println!("Device {} not found or not connected", device_id); } } + EventLoopCommand::PublishDhtRecord { + key, + value, + response_channel, + } => { + // Create a DHT record with the provided key and value + let record = kad::Record::new(key, value); + + // Publish the record to the DHT + match swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) { + Ok(query_id) => { + println!("Publishing DHT record with query ID: {:?}", query_id); + let _ = response_channel.send(Ok(query_id)); + } + Err(e) => { + println!("Failed to publish DHT record: {:?}", e); + let _ = response_channel.send(Err(NetworkingError::Protocol(format!("DHT put failed: {:?}", e)))); + } + } + } + EventLoopCommand::QueryDhtRecord { + key, + response_channel, + } => { + // Query the DHT for the record + let query_id = swarm.behaviour_mut().kademlia.get_record(key); + + println!("Querying DHT record with query ID: {:?}", query_id); + + // Send the query ID back to the caller + let _ = response_channel.send(Ok(query_id)); + } } Ok(()) @@ -302,7 +346,105 @@ impl NetworkingEventLoop { ) -> Result<()> { match event { UnifiedBehaviourEvent::Kademlia(kad_event) => { - println!("Kademlia event: {:?}", kad_event); + use libp2p::kad; + match kad_event { + kad::Event::OutboundQueryProgressed { + id, + result: kad::QueryResult::PutRecord(put_result), + .. + } => { + match put_result { + Ok(kad::PutRecordOk { key }) => { + println!("DHT record published successfully: query_id={:?}, key={:?}", id, key); + } + Err(kad::PutRecordError::QuorumFailed { key, success, quorum }) => { + println!("DHT record publish failed: query_id={:?}, key={:?}, success={:?}, quorum={:?}", id, key, success, quorum); + } + Err(kad::PutRecordError::Timeout { key, .. }) => { + println!("DHT record publish timed out: query_id={:?}, key={:?}", id, key); + } + } + } + kad::Event::OutboundQueryProgressed { + id, + result: kad::QueryResult::GetRecord(get_result), + .. + } => { + match get_result { + Ok(kad::GetRecordOk::FoundRecord(record)) => { + println!("DHT record found: query_id={:?}, key={:?}, {} bytes", id, record.record.key, record.record.value.len()); + + // Try to deserialize as pairing advertisement + if let Ok(advertisement) = serde_json::from_slice::(&record.record.value) { + println!("Found pairing advertisement from peer: {:?}", advertisement.peer_id); + + // Convert strings back to libp2p types + if let (Ok(peer_id), Ok(addresses)) = (advertisement.peer_id(), advertisement.addresses()) { + // Extract session ID from the DHT key + if let Ok(session_id_bytes) = record.record.key.as_ref().try_into() { + let session_id = Uuid::from_bytes(session_id_bytes); + + // Emit pairing discovery event + let _ = event_sender.send(NetworkEvent::PairingSessionDiscovered { + session_id, + peer_id, + addresses: addresses.clone(), + device_info: advertisement.device_info.clone(), + }); + + println!("Emitted pairing session discovery event for session: {}", session_id); + + // Automatically connect to the discovered peer + for address in &addresses { + match swarm.dial(address.clone()) { + Ok(_) => { + println!("Dialing discovered peer {} at {}", peer_id, address); + break; // Try only the first successful dial + } + Err(e) => { + println!("Failed to dial {}: {:?}", address, e); + } + } + } + } + } else { + println!("Failed to parse peer_id or addresses from pairing advertisement"); + } + } + } + Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => { + println!("DHT query finished, no additional records: query_id={:?}", id); + } + Err(kad::GetRecordError::NotFound { key, .. }) => { + println!("DHT record not found: query_id={:?}, key={:?}", id, key); + } + Err(kad::GetRecordError::QuorumFailed { key, .. }) => { + println!("DHT query quorum failed: query_id={:?}, key={:?}", id, key); + } + Err(kad::GetRecordError::Timeout { key }) => { + println!("DHT query timed out: query_id={:?}, key={:?}", id, key); + } + } + } + kad::Event::ModeChanged { new_mode } => { + println!("Kademlia mode changed: {:?}", new_mode); + } + kad::Event::RoutingUpdated { peer, .. } => { + println!("Kademlia routing updated: peer={}", peer); + } + kad::Event::UnroutablePeer { peer } => { + println!("Kademlia unroutable peer: {}", peer); + } + kad::Event::RoutablePeer { peer, .. } => { + println!("Kademlia routable peer: {}", peer); + } + kad::Event::PendingRoutablePeer { peer, .. } => { + println!("Kademlia pending routable peer: {}", peer); + } + _ => { + println!("Other Kademlia event: {:?}", kad_event); + } + } } UnifiedBehaviourEvent::Mdns(mdns_event) => { diff --git a/core-new/src/infrastructure/networking/core/mod.rs b/core-new/src/infrastructure/networking/core/mod.rs index 4b9532cac..3d1926b00 100644 --- a/core-new/src/infrastructure/networking/core/mod.rs +++ b/core-new/src/infrastructure/networking/core/mod.rs @@ -12,7 +12,7 @@ use crate::infrastructure::networking::{ utils::NetworkIdentity, NetworkingError, Result, }; -use libp2p::{Multiaddr, PeerId, Swarm}; +use libp2p::{kad::{QueryId, RecordKey}, Multiaddr, PeerId, Swarm}; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use uuid::Uuid; @@ -38,6 +38,12 @@ pub enum NetworkEvent { device_info: DeviceInfo, peer_id: PeerId, }, + PairingSessionDiscovered { + session_id: Uuid, + peer_id: PeerId, + addresses: Vec, + device_info: DeviceInfo, + }, PairingCompleted { device_id: Uuid, device_info: DeviceInfo, @@ -215,6 +221,62 @@ impl NetworkingCore { pub fn device_registry(&self) -> Arc> { self.device_registry.clone() } + + /// Publish a DHT record for pairing session discovery + pub async fn publish_dht_record(&self, key: RecordKey, value: Vec) -> Result { + if let Some(command_sender) = &self.command_sender { + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + let command = event_loop::EventLoopCommand::PublishDhtRecord { + key, + value, + response_channel: response_tx, + }; + + command_sender.send(command).map_err(|_| { + NetworkingError::ConnectionFailed("Event loop not running".to_string()) + })?; + + response_rx.await.map_err(|_| { + NetworkingError::ConnectionFailed("Failed to receive DHT response".to_string()) + })? + } else { + Err(NetworkingError::ConnectionFailed( + "Networking not started".to_string(), + )) + } + } + + /// Query a DHT record for pairing session discovery + pub async fn query_dht_record(&self, key: RecordKey) -> Result { + if let Some(command_sender) = &self.command_sender { + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + let command = event_loop::EventLoopCommand::QueryDhtRecord { + key, + response_channel: response_tx, + }; + + command_sender.send(command).map_err(|_| { + NetworkingError::ConnectionFailed("Event loop not running".to_string()) + })?; + + response_rx.await.map_err(|_| { + NetworkingError::ConnectionFailed("Failed to receive DHT response".to_string()) + })? + } else { + Err(NetworkingError::ConnectionFailed( + "Networking not started".to_string(), + )) + } + } + + /// Get external addresses for advertising in DHT records + pub async fn get_external_addresses(&self) -> Vec { + // For now, return local addresses that we're listening on + // In production, this should include external addresses discovered via STUN/UPnP + vec![ + "/ip4/127.0.0.1/tcp/0".parse().unwrap(), // Will be replaced with actual port + ] + } } // Ensure NetworkingCore is Send + Sync for proper async usage diff --git a/core-new/src/infrastructure/networking/protocols/pairing.rs b/core-new/src/infrastructure/networking/protocols/pairing.rs index 9e0700c72..5a23c8e4b 100644 --- a/core-new/src/infrastructure/networking/protocols/pairing.rs +++ b/core-new/src/infrastructure/networking/protocols/pairing.rs @@ -7,6 +7,7 @@ use crate::infrastructure::networking::{ NetworkingError, Result, }; use async_trait::async_trait; +use libp2p::{Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -265,6 +266,37 @@ pub enum PairingState { Failed { reason: String }, } +/// DHT advertisement for pairing session discovery +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PairingAdvertisement { + /// The peer ID of the initiator (as string for serialization) + pub peer_id: String, + /// The network addresses where the initiator can be reached (as strings for serialization) + pub addresses: Vec, + /// Device information of the initiator + pub device_info: DeviceInfo, + /// When this advertisement expires + pub expires_at: chrono::DateTime, + /// When this advertisement was created + pub created_at: chrono::DateTime, +} + +impl PairingAdvertisement { + /// Convert peer ID string back to PeerId + pub fn peer_id(&self) -> Result { + self.peer_id.parse() + .map_err(|e| NetworkingError::Protocol(format!("Invalid peer ID: {}", e))) + } + + /// Convert address strings back to Multiaddr + pub fn addresses(&self) -> Result> { + self.addresses.iter() + .map(|addr| addr.parse() + .map_err(|e| NetworkingError::Protocol(format!("Invalid address: {}", e)))) + .collect() + } +} + /// Pairing messages #[derive(Debug, Clone, Serialize, Deserialize)] pub enum PairingMessage { @@ -304,6 +336,7 @@ impl PairingProtocolHandler { } /// Start a new pairing session as initiator + /// Returns the session ID which should be advertised via DHT by the caller pub async fn start_pairing_session(&self) -> Result { let session_id = Uuid::new_v4(); let session = PairingSession { @@ -318,9 +351,24 @@ impl PairingProtocolHandler { .write() .await .insert(session_id, session); + + println!("Started pairing session: {}", session_id); Ok(session_id) } + /// Get device info for advertising in DHT records + pub fn get_device_info(&self) -> DeviceInfo { + DeviceInfo { + device_id: self.identity.device_id(), + device_name: "Spacedrive Device".to_string(), // TODO: Get from device manager + device_type: crate::infrastructure::networking::device::DeviceType::Desktop, // TODO: Get from device manager + os_version: std::env::consts::OS.to_string(), + app_version: "0.1.0".to_string(), // TODO: Get from application version + network_fingerprint: self.identity.network_fingerprint(), + last_seen: chrono::Utc::now(), + } + } + /// Cancel a pairing session pub async fn cancel_session(&self, session_id: Uuid) -> Result<()> { self.active_sessions.write().await.remove(&session_id); diff --git a/core-new/src/infrastructure/networking/utils/identity.rs b/core-new/src/infrastructure/networking/utils/identity.rs index 9b0a49f4e..71adbe045 100644 --- a/core-new/src/infrastructure/networking/utils/identity.rs +++ b/core-new/src/infrastructure/networking/utils/identity.rs @@ -3,6 +3,7 @@ use crate::infrastructure::networking::{NetworkingError, Result}; use libp2p::{identity::Keypair, PeerId}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; /// Network identity containing keypair and peer ID #[derive(Clone)] @@ -53,6 +54,30 @@ impl NetworkIdentity { pub fn verify(&self, data: &[u8], signature: &[u8]) -> bool { self.keypair.public().verify(data, signature) } + + /// Get a deterministic device ID from the network identity + pub fn device_id(&self) -> Uuid { + // Create a deterministic UUID from the peer ID + let peer_id_bytes = self.peer_id.to_bytes(); + + // Use the first 16 bytes of the peer ID hash to create a UUID + let mut uuid_bytes = [0u8; 16]; + let hash = blake3::hash(&peer_id_bytes); + uuid_bytes.copy_from_slice(&hash.as_bytes()[..16]); + + Uuid::from_bytes(uuid_bytes) + } + + /// Get network fingerprint for device identification + pub fn network_fingerprint(&self) -> NetworkFingerprint { + let public_key_bytes = self.public_key_bytes(); + let public_key_hash = blake3::hash(&public_key_bytes); + + NetworkFingerprint { + peer_id: self.peer_id.to_string(), + public_key_hash: hex::encode(&public_key_hash.as_bytes()[..16]), + } + } } impl std::fmt::Debug for NetworkIdentity { diff --git a/core-new/src/lib.rs b/core-new/src/lib.rs index 27b77fcfa..a19cb61e6 100644 --- a/core-new/src/lib.rs +++ b/core-new/src/lib.rs @@ -496,7 +496,25 @@ impl Core { let session_id = pairing_code.session_id(); // Start pairing session with the generated session ID - let _session_id = pairing_handler.start_pairing_session().await?; + let actual_session_id = pairing_handler.start_pairing_session().await?; + + // Create pairing advertisement for DHT + let advertisement = networking::protocols::pairing::PairingAdvertisement { + peer_id: service.peer_id().to_string(), + addresses: service.get_external_addresses().await.into_iter() + .map(|addr| addr.to_string()) + .collect(), + device_info: pairing_handler.get_device_info(), + expires_at: chrono::Utc::now() + chrono::Duration::minutes(5), + created_at: chrono::Utc::now(), + }; + + // Publish DHT record for discovery + let key = libp2p::kad::RecordKey::new(&session_id.as_bytes()); + let value = serde_json::to_vec(&advertisement)?; + + let query_id = service.publish_dht_record(key, value).await?; + println!("Published pairing session to DHT: session={}, query_id={:?}", session_id, query_id); let expires_in = 300; // 5 minutes @@ -517,11 +535,18 @@ impl Core { let pairing_code = networking::protocols::pairing::PairingCode::from_string(code)?; let session_id = pairing_code.session_id(); - // Use networking core to join pairing session let service = networking.read().await; - service - .send_message(session_id, "pairing", b"join_request".to_vec()) - .await?; + + // Query DHT for initiator's pairing advertisement + let key = libp2p::kad::RecordKey::new(&session_id.as_bytes()); + let query_id = service.query_dht_record(key).await?; + println!("Querying DHT for pairing session: session={}, query_id={:?}", session_id, query_id); + + // Note: The actual DHT response will be handled in the event loop + // When the record is found, it will contain the peer_id and addresses + // The application should wait for the DHT query to complete and then connect + // For now, we return success - the pairing will continue asynchronously + println!("DHT query initiated for pairing session: {}", session_id); Ok(()) }