diff --git a/core-new/src/bin/core_test_alice.rs b/core-new/src/bin/core_test_alice.rs index a6a301e7b..c1627079e 100644 --- a/core-new/src/bin/core_test_alice.rs +++ b/core-new/src/bin/core_test_alice.rs @@ -8,150 +8,180 @@ use tokio::time::timeout; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { - #[arg(long)] - data_dir: String, + #[arg(long)] + data_dir: String, } #[tokio::main] async fn main() -> Result<(), Box> { - let args = Args::parse(); - - println!("๐ŸŸฆ Alice: Starting Core pairing test"); - println!("๐Ÿ“ Alice: Data dir: {}", args.data_dir); - - // Initialize tracing for debug output - tracing_subscriber::fmt() - .with_max_level(tracing::Level::DEBUG) - .init(); - - // Create Core instance - println!("๐Ÿ”ง Alice: Initializing Core..."); - let mut core = match timeout(Duration::from_secs(10), sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir))).await { - Ok(Ok(core)) => { - println!("โœ… Alice: Core initialized successfully"); - core - } - Ok(Err(e)) => { - println!("โŒ Alice: Core initialization failed: {}", e); - return Err(e); - } - Err(_) => { - println!("โŒ Alice: Core initialization timed out"); - return Err("Core initialization timeout".into()); - } - }; - - // Initialize networking - println!("๐ŸŒ Alice: Initializing networking..."); - match timeout(Duration::from_secs(10), core.init_networking("alice-password")).await { - Ok(Ok(_)) => { - println!("โœ… Alice: Networking initialized successfully"); - } - Ok(Err(e)) => { - println!("โŒ Alice: Networking initialization failed: {}", e); - return Err(e); - } - Err(_) => { - println!("โŒ Alice: Networking initialization timed out"); - return Err("Networking initialization timeout".into()); - } - } - - // Start pairing as initiator - println!("๐Ÿ”‘ Alice: Starting pairing as initiator..."); - let (pairing_code, expires_in) = match timeout( - Duration::from_secs(15), - core.start_pairing_as_initiator(true) - ).await { - Ok(Ok((code, expires))) => { - println!("โœ… Alice: Pairing code generated: {}... (expires in {}s)", - code.split_whitespace().take(3).collect::>().join(" "), expires); - (code, expires) - } - Ok(Err(e)) => { - println!("โŒ Alice: Pairing code generation failed: {}", e); - return Err(e); - } - Err(_) => { - println!("โŒ Alice: Pairing code generation timed out"); - return Err("Pairing code generation timeout".into()); - } - }; - - // Write pairing code to shared file for Bob to read - let shared_dir = "/tmp/spacedrive-pairing-test"; - std::fs::create_dir_all(shared_dir).expect("Failed to create shared directory"); - let code_file = format!("{}/pairing_code.txt", shared_dir); - match std::fs::write(&code_file, &pairing_code) { - Ok(_) => { - println!("๐Ÿ“ Alice: Pairing code written to {}", code_file); - } - Err(e) => { - println!("โŒ Alice: Failed to write pairing code: {}", e); - return Err(e.into()); - } - } - - // Wait for pairing to complete (Bob should join) - println!("โณ Alice: Waiting for pairing to complete..."); - let mut attempts = 0; - let max_attempts = 30; // 30 seconds - - loop { - if attempts >= max_attempts { - println!("โŒ Alice: Pairing timed out after {} seconds", max_attempts); - return Err("Pairing timeout".into()); - } - - // Check pairing status - match timeout(Duration::from_secs(3), core.get_pairing_status()).await { - Ok(Ok(status)) => { - println!("๐Ÿ” Alice: Pairing status check {} - {} sessions", attempts + 1, status.len()); - - // Check if we have any completed pairings - if !status.is_empty() { - for session in &status { - println!("๐Ÿ“Š Alice: Session state: {:?}", session); - } - - // Look for successful pairing - if status.iter().any(|s| matches!(s.state, sd_core_new::networking::PairingState::Completed { .. })) { - println!("๐ŸŽ‰ Alice: Pairing completed successfully!"); - break; - } - } - } - Ok(Err(e)) => { - println!("โš ๏ธ Alice: Pairing status check failed: {}", e); - } - Err(_) => { - println!("โš ๏ธ Alice: Pairing status check timed out"); - } - } - - attempts += 1; - tokio::time::sleep(Duration::from_secs(1)).await; - } - - // Check connected devices - println!("๐Ÿ”— Alice: Checking connected devices..."); - match timeout(Duration::from_secs(5), core.get_connected_devices()).await { - Ok(Ok(devices)) => { - println!("โœ… Alice: Connected devices: {:?}", devices); - if !devices.is_empty() { - println!("PAIRING_SUCCESS: Alice has {} connected devices", devices.len()); - } else { - println!("โš ๏ธ Alice: No devices connected after pairing"); - } - } - Ok(Err(e)) => { - println!("โŒ Alice: Failed to get connected devices: {}", e); - } - Err(_) => { - println!("โŒ Alice: Get connected devices timed out"); - } - } - - println!("๐Ÿงน Alice: Test completed"); - Ok(()) -} \ No newline at end of file + let args = Args::parse(); + + println!("๐ŸŸฆ Alice: Starting Core pairing test"); + println!("๐Ÿ“ Alice: Data dir: {}", args.data_dir); + + // Initialize tracing for debug output + // tracing_subscriber::fmt() + // .with_max_level(tracing::Level::DEBUG) + // .init(); + + // Create Core instance + println!("๐Ÿ”ง Alice: Initializing Core..."); + let mut core = match timeout( + Duration::from_secs(10), + sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir)), + ) + .await + { + Ok(Ok(core)) => { + println!("โœ… Alice: Core initialized successfully"); + core + } + Ok(Err(e)) => { + println!("โŒ Alice: Core initialization failed: {}", e); + return Err(e); + } + Err(_) => { + println!("โŒ Alice: Core initialization timed out"); + return Err("Core initialization timeout".into()); + } + }; + + // Initialize networking + println!("๐ŸŒ Alice: Initializing networking..."); + match timeout( + Duration::from_secs(10), + core.init_networking("alice-password"), + ) + .await + { + Ok(Ok(_)) => { + println!("โœ… Alice: Networking initialized successfully"); + } + Ok(Err(e)) => { + println!("โŒ Alice: Networking initialization failed: {}", e); + return Err(e); + } + Err(_) => { + println!("โŒ Alice: Networking initialization timed out"); + return Err("Networking initialization timeout".into()); + } + } + + // Start pairing as initiator + println!("๐Ÿ”‘ Alice: Starting pairing as initiator..."); + let (pairing_code, expires_in) = match timeout( + Duration::from_secs(15), + core.start_pairing_as_initiator(true), + ) + .await + { + Ok(Ok((code, expires))) => { + println!( + "โœ… Alice: Pairing code generated: {}... (expires in {}s)", + code.split_whitespace() + .take(3) + .collect::>() + .join(" "), + expires + ); + (code, expires) + } + Ok(Err(e)) => { + println!("โŒ Alice: Pairing code generation failed: {}", e); + return Err(e); + } + Err(_) => { + println!("โŒ Alice: Pairing code generation timed out"); + return Err("Pairing code generation timeout".into()); + } + }; + + // Write pairing code to shared file for Bob to read + let shared_dir = "/tmp/spacedrive-pairing-test"; + std::fs::create_dir_all(shared_dir).expect("Failed to create shared directory"); + let code_file = format!("{}/pairing_code.txt", shared_dir); + match std::fs::write(&code_file, &pairing_code) { + Ok(_) => { + println!("๐Ÿ“ Alice: Pairing code written to {}", code_file); + } + Err(e) => { + println!("โŒ Alice: Failed to write pairing code: {}", e); + return Err(e.into()); + } + } + + // Wait for pairing to complete (Bob should join) + println!("โณ Alice: Waiting for pairing to complete..."); + let mut attempts = 0; + let max_attempts = 30; // 30 seconds + + loop { + if attempts >= max_attempts { + println!("โŒ Alice: Pairing timed out after {} seconds", max_attempts); + return Err("Pairing timeout".into()); + } + + // Check pairing status + match timeout(Duration::from_secs(3), core.get_pairing_status()).await { + Ok(Ok(status)) => { + println!( + "๐Ÿ” Alice: Pairing status check {} - {} sessions", + attempts + 1, + status.len() + ); + + // Check if we have any completed pairings + if !status.is_empty() { + for session in &status { + println!("๐Ÿ“Š Alice: Session state: {:?}", session); + } + + // Look for successful pairing + if status.iter().any(|s| { + matches!( + s.state, + sd_core_new::networking::PairingState::Completed { .. } + ) + }) { + println!("๐ŸŽ‰ Alice: Pairing completed successfully!"); + break; + } + } + } + Ok(Err(e)) => { + println!("โš ๏ธ Alice: Pairing status check failed: {}", e); + } + Err(_) => { + println!("โš ๏ธ Alice: Pairing status check timed out"); + } + } + + attempts += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + // Check connected devices + println!("๐Ÿ”— Alice: Checking connected devices..."); + match timeout(Duration::from_secs(5), core.get_connected_devices()).await { + Ok(Ok(devices)) => { + println!("โœ… Alice: Connected devices: {:?}", devices); + if !devices.is_empty() { + println!( + "PAIRING_SUCCESS: Alice has {} connected devices", + devices.len() + ); + } else { + println!("โš ๏ธ Alice: No devices connected after pairing"); + } + } + Ok(Err(e)) => { + println!("โŒ Alice: Failed to get connected devices: {}", e); + } + Err(_) => { + println!("โŒ Alice: Get connected devices timed out"); + } + } + + println!("๐Ÿงน Alice: Test completed"); + Ok(()) +} diff --git a/core-new/src/bin/core_test_bob.rs b/core-new/src/bin/core_test_bob.rs index c5bfbbdda..cce6f4eb4 100644 --- a/core-new/src/bin/core_test_bob.rs +++ b/core-new/src/bin/core_test_bob.rs @@ -8,152 +8,185 @@ use tokio::time::timeout; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { - #[arg(long)] - data_dir: String, + #[arg(long)] + data_dir: String, } #[tokio::main] async fn main() -> Result<(), Box> { - let args = Args::parse(); - - println!("๐ŸŸจ Bob: Starting Core pairing test"); - println!("๐Ÿ“ Bob: Data dir: {}", args.data_dir); - - // Initialize tracing for debug output - tracing_subscriber::fmt() - .with_max_level(tracing::Level::DEBUG) - .init(); - - // Create Core instance - println!("๐Ÿ”ง Bob: Initializing Core..."); - let mut core = match timeout(Duration::from_secs(10), sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir))).await { - Ok(Ok(core)) => { - println!("โœ… Bob: Core initialized successfully"); - core - } - Ok(Err(e)) => { - println!("โŒ Bob: Core initialization failed: {}", e); - return Err(e); - } - Err(_) => { - println!("โŒ Bob: Core initialization timed out"); - return Err("Core initialization timeout".into()); - } - }; - - // Initialize networking - println!("๐ŸŒ Bob: Initializing networking..."); - match timeout(Duration::from_secs(10), core.init_networking("bob-password")).await { - Ok(Ok(_)) => { - println!("โœ… Bob: Networking initialized successfully"); - } - Ok(Err(e)) => { - println!("โŒ Bob: Networking initialization failed: {}", e); - return Err(e); - } - Err(_) => { - println!("โŒ Bob: Networking initialization timed out"); - return Err("Networking initialization timeout".into()); - } - } - - // Wait for Alice's pairing code - println!("โณ Bob: Waiting for Alice's pairing code..."); - let shared_dir = "/tmp/spacedrive-pairing-test"; - let code_file = format!("{}/pairing_code.txt", shared_dir); - let pairing_code = loop { - match std::fs::read_to_string(&code_file) { - Ok(code) => { - if !code.trim().is_empty() { - println!("โœ… Bob: Found pairing code: {}...", - code.trim().split_whitespace().take(3).collect::>().join(" ")); - break code.trim().to_string(); - } - } - Err(_) => { - // File doesn't exist yet, keep waiting - } - } - - tokio::time::sleep(Duration::from_millis(100)).await; - }; - - // Join pairing using the code - println!("๐Ÿค Bob: Joining pairing with code..."); - match timeout(Duration::from_secs(15), core.start_pairing_as_joiner(&pairing_code)).await { - Ok(Ok(_)) => { - println!("โœ… Bob: Successfully joined pairing"); - } - Ok(Err(e)) => { - println!("โŒ Bob: Failed to join pairing: {}", e); - return Err(e); - } - Err(_) => { - println!("โŒ Bob: Pairing join timed out"); - return Err("Pairing join timeout".into()); - } - } - - // Wait for pairing to complete - println!("โณ Bob: Waiting for pairing to complete..."); - let mut attempts = 0; - let max_attempts = 20; // 20 seconds - - loop { - if attempts >= max_attempts { - println!("โŒ Bob: Pairing timed out after {} seconds", max_attempts); - return Err("Pairing timeout".into()); - } - - // Check pairing status - match timeout(Duration::from_secs(3), core.get_pairing_status()).await { - Ok(Ok(status)) => { - println!("๐Ÿ” Bob: Pairing status check {} - {} sessions", attempts + 1, status.len()); - - // Check if we have any completed pairings - if !status.is_empty() { - for session in &status { - println!("๐Ÿ“Š Bob: Session state: {:?}", session); - } - - // Look for successful pairing - if status.iter().any(|s| matches!(s.state, sd_core_new::networking::PairingState::Completed { .. })) { - println!("๐ŸŽ‰ Bob: Pairing completed successfully!"); - break; - } - } - } - Ok(Err(e)) => { - println!("โš ๏ธ Bob: Pairing status check failed: {}", e); - } - Err(_) => { - println!("โš ๏ธ Bob: Pairing status check timed out"); - } - } - - attempts += 1; - tokio::time::sleep(Duration::from_secs(1)).await; - } - - // Check connected devices - println!("๐Ÿ”— Bob: Checking connected devices..."); - match timeout(Duration::from_secs(5), core.get_connected_devices()).await { - Ok(Ok(devices)) => { - println!("โœ… Bob: Connected devices: {:?}", devices); - if !devices.is_empty() { - println!("PAIRING_SUCCESS: Bob has {} connected devices", devices.len()); - } else { - println!("โš ๏ธ Bob: No devices connected after pairing"); - } - } - Ok(Err(e)) => { - println!("โŒ Bob: Failed to get connected devices: {}", e); - } - Err(_) => { - println!("โŒ Bob: Get connected devices timed out"); - } - } - - println!("๐Ÿงน Bob: Test completed"); - Ok(()) -} \ No newline at end of file + let args = Args::parse(); + + println!("๐ŸŸจ Bob: Starting Core pairing test"); + println!("๐Ÿ“ Bob: Data dir: {}", args.data_dir); + + // Initialize tracing for debug output + // tracing_subscriber::fmt() + // .with_max_level(tracing::Level::DEBUG) + // .init(); + + // Create Core instance + println!("๐Ÿ”ง Bob: Initializing Core..."); + let mut core = match timeout( + Duration::from_secs(10), + sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir)), + ) + .await + { + Ok(Ok(core)) => { + println!("โœ… Bob: Core initialized successfully"); + core + } + Ok(Err(e)) => { + println!("โŒ Bob: Core initialization failed: {}", e); + return Err(e); + } + Err(_) => { + println!("โŒ Bob: Core initialization timed out"); + return Err("Core initialization timeout".into()); + } + }; + + // Initialize networking + println!("๐ŸŒ Bob: Initializing networking..."); + match timeout( + Duration::from_secs(10), + core.init_networking("bob-password"), + ) + .await + { + Ok(Ok(_)) => { + println!("โœ… Bob: Networking initialized successfully"); + } + Ok(Err(e)) => { + println!("โŒ Bob: Networking initialization failed: {}", e); + return Err(e); + } + Err(_) => { + println!("โŒ Bob: Networking initialization timed out"); + return Err("Networking initialization timeout".into()); + } + } + + // Wait for Alice's pairing code + println!("โณ Bob: Waiting for Alice's pairing code..."); + let shared_dir = "/tmp/spacedrive-pairing-test"; + let code_file = format!("{}/pairing_code.txt", shared_dir); + let pairing_code = loop { + match std::fs::read_to_string(&code_file) { + Ok(code) => { + if !code.trim().is_empty() { + println!( + "โœ… Bob: Found pairing code: {}...", + code.trim() + .split_whitespace() + .take(3) + .collect::>() + .join(" ") + ); + break code.trim().to_string(); + } + } + Err(_) => { + // File doesn't exist yet, keep waiting + } + } + + tokio::time::sleep(Duration::from_millis(100)).await; + }; + + // Join pairing using the code + println!("๐Ÿค Bob: Joining pairing with code..."); + match timeout( + Duration::from_secs(15), + core.start_pairing_as_joiner(&pairing_code), + ) + .await + { + Ok(Ok(_)) => { + println!("โœ… Bob: Successfully joined pairing"); + } + Ok(Err(e)) => { + println!("โŒ Bob: Failed to join pairing: {}", e); + return Err(e); + } + Err(_) => { + println!("โŒ Bob: Pairing join timed out"); + return Err("Pairing join timeout".into()); + } + } + + // Wait for pairing to complete + println!("โณ Bob: Waiting for pairing to complete..."); + let mut attempts = 0; + let max_attempts = 20; // 20 seconds + + loop { + if attempts >= max_attempts { + println!("โŒ Bob: Pairing timed out after {} seconds", max_attempts); + return Err("Pairing timeout".into()); + } + + // Check pairing status + match timeout(Duration::from_secs(3), core.get_pairing_status()).await { + Ok(Ok(status)) => { + println!( + "๐Ÿ” Bob: Pairing status check {} - {} sessions", + attempts + 1, + status.len() + ); + + // Check if we have any completed pairings + if !status.is_empty() { + for session in &status { + println!("๐Ÿ“Š Bob: Session state: {:?}", session); + } + + // Look for successful pairing + if status.iter().any(|s| { + matches!( + s.state, + sd_core_new::networking::PairingState::Completed { .. } + ) + }) { + println!("๐ŸŽ‰ Bob: Pairing completed successfully!"); + break; + } + } + } + Ok(Err(e)) => { + println!("โš ๏ธ Bob: Pairing status check failed: {}", e); + } + Err(_) => { + println!("โš ๏ธ Bob: Pairing status check timed out"); + } + } + + attempts += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + // Check connected devices + println!("๐Ÿ”— Bob: Checking connected devices..."); + match timeout(Duration::from_secs(5), core.get_connected_devices()).await { + Ok(Ok(devices)) => { + println!("โœ… Bob: Connected devices: {:?}", devices); + if !devices.is_empty() { + println!( + "PAIRING_SUCCESS: Bob has {} connected devices", + devices.len() + ); + } else { + println!("โš ๏ธ Bob: No devices connected after pairing"); + } + } + Ok(Err(e)) => { + println!("โŒ Bob: Failed to get connected devices: {}", e); + } + Err(_) => { + println!("โŒ Bob: Get connected devices timed out"); + } + } + + println!("๐Ÿงน Bob: Test completed"); + Ok(()) +} diff --git a/core-new/src/infrastructure/networking/core/event_loop.rs b/core-new/src/infrastructure/networking/core/event_loop.rs index 0f6f1c7b2..013e91ead 100644 --- a/core-new/src/infrastructure/networking/core/event_loop.rs +++ b/core-new/src/infrastructure/networking/core/event_loop.rs @@ -498,15 +498,15 @@ impl NetworkingEventLoop { Ok(()) } - /// Attempt direct pairing when discovering peers via mDNS - /// Returns the number of pairing requests sent - async fn attempt_direct_pairing_on_mdns_discovery( + /// Schedule pairing requests for mDNS discovered peers (wait for connection establishment) + /// Returns the number of pairing sessions scheduled for connection + async fn schedule_pairing_on_mdns_discovery( protocol_registry: &Arc>, identity: &NetworkIdentity, - swarm: &mut Swarm, discovered_peer_id: PeerId, + pending_pairing_connections: &mut std::collections::HashMap)>, ) -> Result { - let mut requests_sent = 0; + let mut sessions_scheduled = 0; // Get pairing handler from protocol registry with proper error handling let registry = protocol_registry.read().await; @@ -531,44 +531,52 @@ impl NetworkingEventLoop { // Process each session that's actively scanning for peers for session in &active_sessions { - // Only send requests for sessions where we're actively scanning (Bob's role) + // Only schedule requests for sessions where we're actively scanning (Bob's role) if matches!(session.state, crate::infrastructure::networking::protocols::pairing::PairingState::Scanning) { - println!("๐Ÿ” Found scanning session {} - sending pairing request to peer {}", session.id, discovered_peer_id); + println!("๐Ÿ” Found scanning session {} - scheduling pairing request for peer {} (waiting for connection)", session.id, discovered_peer_id); - // Create pairing request message - let pairing_request = super::behavior::PairingMessage::PairingRequest { - session_id: session.id, + // Create device info for this session + let device_info = crate::infrastructure::networking::device::DeviceInfo { device_id: identity.device_id(), device_name: Self::get_device_name_for_pairing(), - public_key: identity.public_key_bytes(), + device_type: crate::infrastructure::networking::device::DeviceType::Desktop, + os_version: std::env::consts::OS.to_string(), + app_version: env!("CARGO_PKG_VERSION").to_string(), + network_fingerprint: identity.network_fingerprint(), + last_seen: chrono::Utc::now(), }; - // Send pairing request directly to the discovered peer - let request_id = swarm.behaviour_mut().pairing.send_request(&discovered_peer_id, pairing_request); - println!("โœ… mDNS Direct Pairing: Sent request to peer {} for session {} (request_id: {:?})", - discovered_peer_id, session.id, request_id); + // Add to pending connections - pairing request will be sent after connection establishment + // Using 5-minute timeout (300 seconds) and current time + pending_pairing_connections.insert( + session.id, + (discovered_peer_id, device_info, 300, chrono::Utc::now()) + ); - requests_sent += 1; + println!("โœ… mDNS Discovery: Scheduled pairing request for session {} with peer {} (pending connection)", + session.id, discovered_peer_id); + + sessions_scheduled += 1; } else { // Log other session states for debugging match &session.state { crate::infrastructure::networking::protocols::pairing::PairingState::WaitingForConnection => { - // This is Alice waiting for Bob - don't send requests - println!("๐Ÿ” Found waiting session {} (Alice side) - not sending request", session.id); + // This is Alice waiting for Bob - don't schedule requests + println!("๐Ÿ” Found waiting session {} (Alice side) - not scheduling request", session.id); } _ => { // Other states like Completed, Failed, etc. - println!("๐Ÿ” Found session {} in state {:?} - not sending request", session.id, session.state); + println!("๐Ÿ” Found session {} in state {:?} - not scheduling request", session.id, session.state); } } } } - if requests_sent == 0 && !active_sessions.is_empty() { + if sessions_scheduled == 0 && !active_sessions.is_empty() { println!("๐Ÿ” mDNS Discovery: Found {} active sessions but none in Scanning state", active_sessions.len()); } - Ok(requests_sent) + Ok(sessions_scheduled) } /// Get device name for pairing (production-ready with fallback) @@ -746,21 +754,21 @@ impl NetworkingEventLoop { println!("Bootstrapping Kademlia DHT with query ID: {:?}", query_id); } - // PRODUCTION: Check if we have active pairing sessions and attempt direct pairing + // PRODUCTION: Schedule pairing requests for mDNS discovered peers (wait for connection) // This handles the case where Bob discovers Alice via mDNS during pairing - match Self::attempt_direct_pairing_on_mdns_discovery( + match Self::schedule_pairing_on_mdns_discovery( &protocol_registry, &identity, - swarm, peer_id, + pending_pairing_connections, ).await { - Ok(requests_sent) => { - if requests_sent > 0 { - println!("๐Ÿ” mDNS Discovery: Sent {} direct pairing requests to peer {}", requests_sent, peer_id); + Ok(sessions_scheduled) => { + if sessions_scheduled > 0 { + println!("๐Ÿ” mDNS Discovery: Scheduled {} pairing sessions for peer {} (waiting for connection)", sessions_scheduled, peer_id); } } Err(e) => { - println!("โš ๏ธ mDNS Discovery: Failed to attempt direct pairing with peer {}: {}", peer_id, e); + println!("โš ๏ธ mDNS Discovery: Failed to schedule pairing with peer {}: {}", peer_id, e); } } diff --git a/core-new/src/infrastructure/networking/protocols/pairing.rs b/core-new/src/infrastructure/networking/protocols/pairing.rs index adc18be67..6f1413f1b 100644 --- a/core-new/src/infrastructure/networking/protocols/pairing.rs +++ b/core-new/src/infrastructure/networking/protocols/pairing.rs @@ -207,8 +207,10 @@ impl PairingCode { /// Derive session ID from secret fn derive_session_id(secret: &[u8; 32]) -> Uuid { - // Use BLAKE3 to derive UUID from secret - let hash = blake3::hash(secret); + // For pairing codes, derive session ID from the entropy that survives BIP39 round-trip + // This ensures Alice (who generates) and Bob (who parses) get the same session ID + // This is critical for DHT-based pairing where session IDs must match + let hash = blake3::hash(&secret[..16]); // Use only the first 16 bytes (BIP39 entropy) let bytes = hash.as_bytes(); let uuid_bytes = [ diff --git a/core-new/src/lib.rs b/core-new/src/lib.rs index 3264339d0..c32fa9f22 100644 --- a/core-new/src/lib.rs +++ b/core-new/src/lib.rs @@ -570,17 +570,35 @@ impl Core { println!("โณ Waiting for mDNS discovery to trigger pairing requests..."); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - // Hybrid approach: Try local pairing first, then DHT fallback for remote pairing + // Unified Pairing Flow: Support both mDNS (local) and DHT (remote) simultaneously + // Both methods run in parallel, first successful response completes pairing - // 1. Attempt direct pairing with any currently connected peers (local pairing) - // This handles the common case where Alice and Bob are on the same network + println!("๐Ÿ”„ Starting unified pairing flow for session: {}", session_id); + + // Method 1: mDNS-based local pairing (already handled by event loop) + // The event loop automatically detects mDNS peers and schedules pairing requests + // This handles Alice and Bob on the same network + println!("๐Ÿ“ก mDNS pairing: Listening for local network discoveries..."); + + // Method 2: DHT-based remote pairing (for cross-network scenarios) + // Query DHT for Alice's published session record + println!("๐ŸŒ DHT pairing: Querying distributed hash table..."); + let key = libp2p::kad::RecordKey::new(&session_id.as_bytes()); + match service.query_dht_record(key).await { + Ok(query_id) => { + println!("๐Ÿ” DHT Query initiated: session={}, query_id={:?}", session_id, query_id); + } + Err(e) => { + println!("โš ๏ธ DHT Query failed: {}", e); + } + } + + // Method 3: Direct requests to any currently connected peers (immediate attempt) + // This covers cases where Alice is already connected but not yet paired let connected_peers = service.get_connected_peers().await; if !connected_peers.is_empty() { - println!("Attempting direct pairing with {} connected peers for session: {}", - connected_peers.len(), session_id); + println!("๐Ÿ”— Direct pairing: Sending requests to {} connected peers", connected_peers.len()); - // Send pairing requests to all connected peers - // One of them might be Alice with the matching session for peer_id in connected_peers { let pairing_request = networking::core::behavior::PairingMessage::PairingRequest { session_id, @@ -594,36 +612,32 @@ impl Core { "pairing", serde_json::to_vec(&pairing_request).unwrap_or_default() ).await { - Ok(_) => println!("Sent direct pairing request to peer: {}", peer_id), - Err(e) => println!("Failed to send pairing request to {}: {}", peer_id, e), + Ok(_) => println!("๐Ÿ“ค Sent direct pairing request to peer: {}", peer_id), + Err(e) => println!("โŒ Failed to send pairing request to {}: {}", peer_id, e), } } } - // 2. Query DHT for remote pairing (primary method when mDNS fails) - let key = libp2p::kad::RecordKey::new(&session_id.as_bytes()); - let query_id = service.query_dht_record(key).await?; - println!("๐Ÿ” Querying DHT for pairing session: session={}, query_id={:?}", session_id, query_id); - - // 3. Add periodic DHT queries as backup in case the first query fails - // This is important for test environments where mDNS might not work + // Add periodic DHT retries for reliability in challenging network conditions let networking_ref = networking.clone(); let session_id_clone = session_id; tokio::spawn(async move { for i in 1..=3 { - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; let key = libp2p::kad::RecordKey::new(&session_id_clone.as_bytes()); let service = networking_ref.read().await; match service.query_dht_record(key).await { Ok(query_id) => { - println!("๐Ÿ” DHT Retry {}: Querying for session {} (query_id: {:?})", i, session_id_clone, query_id); + println!("๐Ÿ”„ DHT Retry {}: session={}, query_id={:?}", i, session_id_clone, query_id); } Err(e) => { - println!("โš ๏ธ DHT Retry {}: Failed to query session {}: {}", i, session_id_clone, e); + println!("โš ๏ธ DHT Retry {} failed: {}", i, e); } } } }); + + println!("โœ… Unified pairing flow initiated - waiting for responses from any method..."); Ok(()) } diff --git a/core-new/tests/cli_pairing_integration_test.rs b/core-new/tests/cli_pairing_integration_test.rs deleted file mode 100644 index 05d5ecce6..000000000 --- a/core-new/tests/cli_pairing_integration_test.rs +++ /dev/null @@ -1,281 +0,0 @@ -//! Real CLI command integration test for device pairing -//! This test spawns actual CLI processes using instances and tests the real user workflow - -use std::process::Command; -use std::time::Duration; -use tokio::time::timeout; -use tempfile::TempDir; - -struct CliInstance { - name: String, - data_dir: String, - cli_binary: String, -} - -impl CliInstance { - fn new(name: &str, data_dir: &str) -> Self { - Self { - name: name.to_string(), - data_dir: data_dir.to_string(), - cli_binary: "./target/debug/spacedrive".to_string(), - } - } - - fn run_command(&self, args: &[&str]) -> Result { - let mut full_args = vec![ - "--data-dir", &self.data_dir, - "--instance", &self.name, - ]; - full_args.extend_from_slice(args); - - Command::new(&self.cli_binary) - .args(&full_args) - .output() - } - - async fn run_command_with_timeout(&self, args: &[&str], timeout_secs: u64) -> Result { - let timeout_duration = Duration::from_secs(timeout_secs); - - match timeout(timeout_duration, async { - self.run_command(args) - }).await { - Ok(result) => result.map_err(|e| format!("Command failed: {}", e)), - Err(_) => Err(format!("Command timed out after {} seconds", timeout_secs)) - } - } - - fn start_daemon(&self) -> Result<(), String> { - let output = self.run_command(&["start", "--enable-networking"]) - .map_err(|e| format!("Failed to start daemon: {}", e))?; - - if !output.status.success() { - return Err(format!("Daemon start failed: {}", String::from_utf8_lossy(&output.stderr))); - } - - println!("{} daemon started: {}", self.name, String::from_utf8_lossy(&output.stdout)); - Ok(()) - } - - fn stop_daemon(&self) { - let _ = self.run_command(&["instance", "stop", &self.name]); - } - - fn check_status(&self) -> Result { - let output = self.run_command(&["status"]) - .map_err(|e| format!("Failed to check status: {}", e))?; - - Ok(String::from_utf8_lossy(&output.stdout).to_string()) - } - - fn init_networking(&self, password: &str) -> Result<(), String> { - let output = self.run_command(&["network", "init", "--password", password]) - .map_err(|e| format!("Failed to init networking: {}", e))?; - - if !output.status.success() { - return Err(format!("Networking init failed: {}", String::from_utf8_lossy(&output.stderr))); - } - - println!("{} networking initialized: {}", self.name, String::from_utf8_lossy(&output.stdout)); - Ok(()) - } - - async fn generate_pairing_code(&self) -> Result<(String, tokio::process::Child), String> { - use tokio::process::Command as TokioCommand; - use tokio::io::{AsyncBufReadExt, BufReader}; - - let mut full_args = vec![ - "--data-dir", &self.data_dir, - "--instance", &self.name, - "network", "pair", "generate", "--auto-accept" - ]; - - let mut cmd = TokioCommand::new(&self.cli_binary) - .args(&full_args) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .map_err(|e| format!("Failed to spawn pairing command: {}", e))?; - - let stdout = cmd.stdout.take().unwrap(); - let mut reader = BufReader::new(stdout); - let mut line = String::new(); - - // Read lines until we get the pairing code - let mut pairing_code = None; - let timeout_duration = tokio::time::Duration::from_secs(15); - - match tokio::time::timeout(timeout_duration, async { - while reader.read_line(&mut line).await.unwrap_or(0) > 0 { - println!("{} output: {}", self.name, line.trim()); - - // Look for the pairing code line - if let Some(code) = extract_pairing_code(&line) { - pairing_code = Some(code); - break; - } - line.clear(); - } - }).await { - Ok(_) => {}, - Err(_) => return Err("Timeout waiting for pairing code".to_string()), - } - - match pairing_code { - Some(code) => Ok((code, cmd)), - None => Err("Failed to extract pairing code from output".to_string()), - } - } - - async fn join_pairing(&self, code: &str) -> Result<(), String> { - let output = self.run_command_with_timeout(&["network", "pair", "join", "--code", code], 30).await?; - - let stdout = String::from_utf8_lossy(&output.stdout); - let stderr = String::from_utf8_lossy(&output.stderr); - - println!("{} join output:\n{}", self.name, stdout); - if !stderr.is_empty() { - println!("{} join stderr:\n{}", self.name, stderr); - } - - if !output.status.success() { - return Err(format!("Pairing join failed: {}", stderr)); - } - - // Check for success indicators in output - if stdout.contains("โœ“") || stdout.contains("success") || stdout.contains("paired") { - Ok(()) - } else { - Err("No success indicator found in output".to_string()) - } - } -} - -#[tokio::test] -async fn test_cli_pairing_real_commands() { - println!("๐Ÿงช Testing real CLI pairing commands with instances"); - - // Create temporary directories for Alice and Bob - let alice_dir = TempDir::new().expect("Failed to create Alice temp dir"); - let bob_dir = TempDir::new().expect("Failed to create Bob temp dir"); - - let alice = CliInstance::new("alice", alice_dir.path().to_str().unwrap()); - let bob = CliInstance::new("bob", bob_dir.path().to_str().unwrap()); - - println!("๐Ÿ“ Alice data dir: {:?}", alice_dir.path()); - println!("๐Ÿ“ Bob data dir: {:?}", bob_dir.path()); - - // Build the CLI binary first - println!("๐Ÿ”จ Building CLI binary..."); - let build_result = Command::new("cargo") - .args(&["build", "--bin", "spacedrive"]) - .output() - .expect("Failed to build CLI binary"); - - if !build_result.status.success() { - panic!("Failed to build CLI binary: {}", String::from_utf8_lossy(&build_result.stderr)); - } - - // Cleanup function to stop daemons - let cleanup = || { - alice.stop_daemon(); - bob.stop_daemon(); - }; - - // Start both daemons - println!("๐ŸŸฆ Starting Alice daemon..."); - if let Err(e) = alice.start_daemon() { - cleanup(); - panic!("Failed to start Alice daemon: {}", e); - } - - println!("๐ŸŸจ Starting Bob daemon..."); - if let Err(e) = bob.start_daemon() { - cleanup(); - panic!("Failed to start Bob daemon: {}", e); - } - - // Give daemons time to start - tokio::time::sleep(Duration::from_secs(5)).await; - - // Check daemon status - println!("๐Ÿ” Checking daemon status..."); - match alice.check_status() { - Ok(status) => println!("Alice status: {}", status.lines().take(3).collect::>().join(" ")), - Err(e) => println!("Alice status error: {}", e), - } - - match bob.check_status() { - Ok(status) => println!("Bob status: {}", status.lines().take(3).collect::>().join(" ")), - Err(e) => println!("Bob status error: {}", e), - } - - // Initialize networking for both - println!("๐Ÿ”ง Initializing networking..."); - if let Err(e) = alice.init_networking("alice-password") { - cleanup(); - panic!("Failed to initialize Alice networking: {}", e); - } - - if let Err(e) = bob.init_networking("bob-password") { - cleanup(); - panic!("Failed to initialize Bob networking: {}", e); - } - - // Give networking time to initialize - tokio::time::sleep(Duration::from_secs(2)).await; - - // Alice generates pairing code (starts background process) - println!("๐Ÿ”‘ Alice generating pairing code..."); - let (pairing_code, mut alice_process) = match alice.generate_pairing_code().await { - Ok((code, process)) => (code, process), - Err(e) => { - cleanup(); - panic!("Alice pairing code generation failed: {}", e); - } - }; - - println!("๐Ÿ”— Extracted pairing code: {}...", - pairing_code.split_whitespace().take(3).collect::>().join(" ")); - - // Bob joins using the pairing code - println!("๐Ÿค Bob joining with pairing code..."); - match bob.join_pairing(&pairing_code).await { - Ok(_) => { - println!("โœ… CLI pairing test successful!"); - } - Err(e) => { - // Kill Alice's pairing process before cleanup - let _ = alice_process.kill().await; - cleanup(); - panic!("Bob pairing failed: {}", e); - } - } - - // Wait a moment for pairing to complete on Alice's side - tokio::time::sleep(Duration::from_secs(2)).await; - - // Kill Alice's pairing process - let _ = alice_process.kill().await; - - // Cleanup - cleanup(); - println!("๐Ÿงน Cleaned up daemon instances"); -} - -/// Extract the pairing code from CLI output -fn extract_pairing_code(output: &str) -> Option { - // Look for the pairing code in the CLI output - // The CLI outputs it in a specific format after "Your Pairing Code:" - for line in output.lines() { - let trimmed = line.trim(); - // Look for a line with 12 words (the pairing code format) - let words: Vec<&str> = trimmed.split_whitespace().collect(); - if words.len() == 12 { - // Verify these look like BIP39 words (basic check - all lowercase alphabetic) - if words.iter().all(|w| w.chars().all(|c| c.is_ascii_lowercase())) { - return Some(words.join(" ")); - } - } - } - None -} \ No newline at end of file