Enhance unified pairing flow with improved logging and DHT integration

- Implemented a unified pairing flow that supports both mDNS (local) and DHT (remote) pairing methods running in parallel.
- Added detailed logging for each pairing method, including success and failure messages.
- Updated the event loop to schedule pairing requests for mDNS discovered peers, ensuring requests are sent after connection establishment.
- Enhanced DHT query handling with retries for improved reliability in challenging network conditions.
- Refactored pairing code generation to derive session IDs from BIP39 entropy for consistency across devices.

This update significantly improves the robustness and user experience of the pairing process, ensuring better connectivity and clearer feedback during pairing operations.
This commit is contained in:
Jamie Pine
2025-06-22 23:22:07 -07:00
parent 78e119eef6
commit 6048e655ae
6 changed files with 424 additions and 618 deletions

View File

@@ -8,150 +8,180 @@ use tokio::time::timeout;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(long)]
data_dir: String,
#[arg(long)]
data_dir: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
println!("🟦 Alice: Starting Core pairing test");
println!("📁 Alice: Data dir: {}", args.data_dir);
// Initialize tracing for debug output
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
// Create Core instance
println!("🔧 Alice: Initializing Core...");
let mut core = match timeout(Duration::from_secs(10), sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir))).await {
Ok(Ok(core)) => {
println!("✅ Alice: Core initialized successfully");
core
}
Ok(Err(e)) => {
println!("❌ Alice: Core initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Alice: Core initialization timed out");
return Err("Core initialization timeout".into());
}
};
// Initialize networking
println!("🌐 Alice: Initializing networking...");
match timeout(Duration::from_secs(10), core.init_networking("alice-password")).await {
Ok(Ok(_)) => {
println!("✅ Alice: Networking initialized successfully");
}
Ok(Err(e)) => {
println!("❌ Alice: Networking initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Alice: Networking initialization timed out");
return Err("Networking initialization timeout".into());
}
}
// Start pairing as initiator
println!("🔑 Alice: Starting pairing as initiator...");
let (pairing_code, expires_in) = match timeout(
Duration::from_secs(15),
core.start_pairing_as_initiator(true)
).await {
Ok(Ok((code, expires))) => {
println!("✅ Alice: Pairing code generated: {}... (expires in {}s)",
code.split_whitespace().take(3).collect::<Vec<_>>().join(" "), expires);
(code, expires)
}
Ok(Err(e)) => {
println!("❌ Alice: Pairing code generation failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Alice: Pairing code generation timed out");
return Err("Pairing code generation timeout".into());
}
};
// Write pairing code to shared file for Bob to read
let shared_dir = "/tmp/spacedrive-pairing-test";
std::fs::create_dir_all(shared_dir).expect("Failed to create shared directory");
let code_file = format!("{}/pairing_code.txt", shared_dir);
match std::fs::write(&code_file, &pairing_code) {
Ok(_) => {
println!("📝 Alice: Pairing code written to {}", code_file);
}
Err(e) => {
println!("❌ Alice: Failed to write pairing code: {}", e);
return Err(e.into());
}
}
// Wait for pairing to complete (Bob should join)
println!("⏳ Alice: Waiting for pairing to complete...");
let mut attempts = 0;
let max_attempts = 30; // 30 seconds
loop {
if attempts >= max_attempts {
println!("❌ Alice: Pairing timed out after {} seconds", max_attempts);
return Err("Pairing timeout".into());
}
// Check pairing status
match timeout(Duration::from_secs(3), core.get_pairing_status()).await {
Ok(Ok(status)) => {
println!("🔍 Alice: Pairing status check {} - {} sessions", attempts + 1, status.len());
// Check if we have any completed pairings
if !status.is_empty() {
for session in &status {
println!("📊 Alice: Session state: {:?}", session);
}
// Look for successful pairing
if status.iter().any(|s| matches!(s.state, sd_core_new::networking::PairingState::Completed { .. })) {
println!("🎉 Alice: Pairing completed successfully!");
break;
}
}
}
Ok(Err(e)) => {
println!("⚠️ Alice: Pairing status check failed: {}", e);
}
Err(_) => {
println!("⚠️ Alice: Pairing status check timed out");
}
}
attempts += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Check connected devices
println!("🔗 Alice: Checking connected devices...");
match timeout(Duration::from_secs(5), core.get_connected_devices()).await {
Ok(Ok(devices)) => {
println!("✅ Alice: Connected devices: {:?}", devices);
if !devices.is_empty() {
println!("PAIRING_SUCCESS: Alice has {} connected devices", devices.len());
} else {
println!("⚠️ Alice: No devices connected after pairing");
}
}
Ok(Err(e)) => {
println!("❌ Alice: Failed to get connected devices: {}", e);
}
Err(_) => {
println!("❌ Alice: Get connected devices timed out");
}
}
println!("🧹 Alice: Test completed");
Ok(())
}
let args = Args::parse();
println!("🟦 Alice: Starting Core pairing test");
println!("📁 Alice: Data dir: {}", args.data_dir);
// Initialize tracing for debug output
// tracing_subscriber::fmt()
// .with_max_level(tracing::Level::DEBUG)
// .init();
// Create Core instance
println!("🔧 Alice: Initializing Core...");
let mut core = match timeout(
Duration::from_secs(10),
sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir)),
)
.await
{
Ok(Ok(core)) => {
println!("✅ Alice: Core initialized successfully");
core
}
Ok(Err(e)) => {
println!("❌ Alice: Core initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Alice: Core initialization timed out");
return Err("Core initialization timeout".into());
}
};
// Initialize networking
println!("🌐 Alice: Initializing networking...");
match timeout(
Duration::from_secs(10),
core.init_networking("alice-password"),
)
.await
{
Ok(Ok(_)) => {
println!("✅ Alice: Networking initialized successfully");
}
Ok(Err(e)) => {
println!(" Alice: Networking initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Alice: Networking initialization timed out");
return Err("Networking initialization timeout".into());
}
}
// Start pairing as initiator
println!("🔑 Alice: Starting pairing as initiator...");
let (pairing_code, expires_in) = match timeout(
Duration::from_secs(15),
core.start_pairing_as_initiator(true),
)
.await
{
Ok(Ok((code, expires))) => {
println!(
"✅ Alice: Pairing code generated: {}... (expires in {}s)",
code.split_whitespace()
.take(3)
.collect::<Vec<_>>()
.join(" "),
expires
);
(code, expires)
}
Ok(Err(e)) => {
println!("❌ Alice: Pairing code generation failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Alice: Pairing code generation timed out");
return Err("Pairing code generation timeout".into());
}
};
// Write pairing code to shared file for Bob to read
let shared_dir = "/tmp/spacedrive-pairing-test";
std::fs::create_dir_all(shared_dir).expect("Failed to create shared directory");
let code_file = format!("{}/pairing_code.txt", shared_dir);
match std::fs::write(&code_file, &pairing_code) {
Ok(_) => {
println!("📝 Alice: Pairing code written to {}", code_file);
}
Err(e) => {
println!("❌ Alice: Failed to write pairing code: {}", e);
return Err(e.into());
}
}
// Wait for pairing to complete (Bob should join)
println!(" Alice: Waiting for pairing to complete...");
let mut attempts = 0;
let max_attempts = 30; // 30 seconds
loop {
if attempts >= max_attempts {
println!("❌ Alice: Pairing timed out after {} seconds", max_attempts);
return Err("Pairing timeout".into());
}
// Check pairing status
match timeout(Duration::from_secs(3), core.get_pairing_status()).await {
Ok(Ok(status)) => {
println!(
"🔍 Alice: Pairing status check {} - {} sessions",
attempts + 1,
status.len()
);
// Check if we have any completed pairings
if !status.is_empty() {
for session in &status {
println!("📊 Alice: Session state: {:?}", session);
}
// Look for successful pairing
if status.iter().any(|s| {
matches!(
s.state,
sd_core_new::networking::PairingState::Completed { .. }
)
}) {
println!("🎉 Alice: Pairing completed successfully!");
break;
}
}
}
Ok(Err(e)) => {
println!("⚠️ Alice: Pairing status check failed: {}", e);
}
Err(_) => {
println!("⚠️ Alice: Pairing status check timed out");
}
}
attempts += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Check connected devices
println!("🔗 Alice: Checking connected devices...");
match timeout(Duration::from_secs(5), core.get_connected_devices()).await {
Ok(Ok(devices)) => {
println!("✅ Alice: Connected devices: {:?}", devices);
if !devices.is_empty() {
println!(
"PAIRING_SUCCESS: Alice has {} connected devices",
devices.len()
);
} else {
println!("⚠️ Alice: No devices connected after pairing");
}
}
Ok(Err(e)) => {
println!("❌ Alice: Failed to get connected devices: {}", e);
}
Err(_) => {
println!("❌ Alice: Get connected devices timed out");
}
}
println!("🧹 Alice: Test completed");
Ok(())
}

View File

@@ -8,152 +8,185 @@ use tokio::time::timeout;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(long)]
data_dir: String,
#[arg(long)]
data_dir: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
println!("🟨 Bob: Starting Core pairing test");
println!("📁 Bob: Data dir: {}", args.data_dir);
// Initialize tracing for debug output
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
// Create Core instance
println!("🔧 Bob: Initializing Core...");
let mut core = match timeout(Duration::from_secs(10), sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir))).await {
Ok(Ok(core)) => {
println!("✅ Bob: Core initialized successfully");
core
}
Ok(Err(e)) => {
println!("❌ Bob: Core initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Bob: Core initialization timed out");
return Err("Core initialization timeout".into());
}
};
// Initialize networking
println!("🌐 Bob: Initializing networking...");
match timeout(Duration::from_secs(10), core.init_networking("bob-password")).await {
Ok(Ok(_)) => {
println!("✅ Bob: Networking initialized successfully");
}
Ok(Err(e)) => {
println!("❌ Bob: Networking initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Bob: Networking initialization timed out");
return Err("Networking initialization timeout".into());
}
}
// Wait for Alice's pairing code
println!(" Bob: Waiting for Alice's pairing code...");
let shared_dir = "/tmp/spacedrive-pairing-test";
let code_file = format!("{}/pairing_code.txt", shared_dir);
let pairing_code = loop {
match std::fs::read_to_string(&code_file) {
Ok(code) => {
if !code.trim().is_empty() {
println!("✅ Bob: Found pairing code: {}...",
code.trim().split_whitespace().take(3).collect::<Vec<_>>().join(" "));
break code.trim().to_string();
}
}
Err(_) => {
// File doesn't exist yet, keep waiting
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
};
// Join pairing using the code
println!("🤝 Bob: Joining pairing with code...");
match timeout(Duration::from_secs(15), core.start_pairing_as_joiner(&pairing_code)).await {
Ok(Ok(_)) => {
println!("✅ Bob: Successfully joined pairing");
}
Ok(Err(e)) => {
println!("❌ Bob: Failed to join pairing: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Bob: Pairing join timed out");
return Err("Pairing join timeout".into());
}
}
// Wait for pairing to complete
println!(" Bob: Waiting for pairing to complete...");
let mut attempts = 0;
let max_attempts = 20; // 20 seconds
loop {
if attempts >= max_attempts {
println!("❌ Bob: Pairing timed out after {} seconds", max_attempts);
return Err("Pairing timeout".into());
}
// Check pairing status
match timeout(Duration::from_secs(3), core.get_pairing_status()).await {
Ok(Ok(status)) => {
println!("🔍 Bob: Pairing status check {} - {} sessions", attempts + 1, status.len());
// Check if we have any completed pairings
if !status.is_empty() {
for session in &status {
println!("📊 Bob: Session state: {:?}", session);
}
// Look for successful pairing
if status.iter().any(|s| matches!(s.state, sd_core_new::networking::PairingState::Completed { .. })) {
println!("🎉 Bob: Pairing completed successfully!");
break;
}
}
}
Ok(Err(e)) => {
println!("⚠️ Bob: Pairing status check failed: {}", e);
}
Err(_) => {
println!("⚠️ Bob: Pairing status check timed out");
}
}
attempts += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Check connected devices
println!("🔗 Bob: Checking connected devices...");
match timeout(Duration::from_secs(5), core.get_connected_devices()).await {
Ok(Ok(devices)) => {
println!("✅ Bob: Connected devices: {:?}", devices);
if !devices.is_empty() {
println!("PAIRING_SUCCESS: Bob has {} connected devices", devices.len());
} else {
println!("⚠️ Bob: No devices connected after pairing");
}
}
Ok(Err(e)) => {
println!("❌ Bob: Failed to get connected devices: {}", e);
}
Err(_) => {
println!("❌ Bob: Get connected devices timed out");
}
}
println!("🧹 Bob: Test completed");
Ok(())
}
let args = Args::parse();
println!("🟨 Bob: Starting Core pairing test");
println!("📁 Bob: Data dir: {}", args.data_dir);
// Initialize tracing for debug output
// tracing_subscriber::fmt()
// .with_max_level(tracing::Level::DEBUG)
// .init();
// Create Core instance
println!("🔧 Bob: Initializing Core...");
let mut core = match timeout(
Duration::from_secs(10),
sd_core_new::Core::new_with_config(std::path::PathBuf::from(&args.data_dir)),
)
.await
{
Ok(Ok(core)) => {
println!("✅ Bob: Core initialized successfully");
core
}
Ok(Err(e)) => {
println!("❌ Bob: Core initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Bob: Core initialization timed out");
return Err("Core initialization timeout".into());
}
};
// Initialize networking
println!("🌐 Bob: Initializing networking...");
match timeout(
Duration::from_secs(10),
core.init_networking("bob-password"),
)
.await
{
Ok(Ok(_)) => {
println!("✅ Bob: Networking initialized successfully");
}
Ok(Err(e)) => {
println!(" Bob: Networking initialization failed: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Bob: Networking initialization timed out");
return Err("Networking initialization timeout".into());
}
}
// Wait for Alice's pairing code
println!("⏳ Bob: Waiting for Alice's pairing code...");
let shared_dir = "/tmp/spacedrive-pairing-test";
let code_file = format!("{}/pairing_code.txt", shared_dir);
let pairing_code = loop {
match std::fs::read_to_string(&code_file) {
Ok(code) => {
if !code.trim().is_empty() {
println!(
"✅ Bob: Found pairing code: {}...",
code.trim()
.split_whitespace()
.take(3)
.collect::<Vec<_>>()
.join(" ")
);
break code.trim().to_string();
}
}
Err(_) => {
// File doesn't exist yet, keep waiting
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
};
// Join pairing using the code
println!("🤝 Bob: Joining pairing with code...");
match timeout(
Duration::from_secs(15),
core.start_pairing_as_joiner(&pairing_code),
)
.await
{
Ok(Ok(_)) => {
println!("✅ Bob: Successfully joined pairing");
}
Ok(Err(e)) => {
println!("❌ Bob: Failed to join pairing: {}", e);
return Err(e);
}
Err(_) => {
println!("❌ Bob: Pairing join timed out");
return Err("Pairing join timeout".into());
}
}
// Wait for pairing to complete
println!("⏳ Bob: Waiting for pairing to complete...");
let mut attempts = 0;
let max_attempts = 20; // 20 seconds
loop {
if attempts >= max_attempts {
println!("❌ Bob: Pairing timed out after {} seconds", max_attempts);
return Err("Pairing timeout".into());
}
// Check pairing status
match timeout(Duration::from_secs(3), core.get_pairing_status()).await {
Ok(Ok(status)) => {
println!(
"🔍 Bob: Pairing status check {} - {} sessions",
attempts + 1,
status.len()
);
// Check if we have any completed pairings
if !status.is_empty() {
for session in &status {
println!("📊 Bob: Session state: {:?}", session);
}
// Look for successful pairing
if status.iter().any(|s| {
matches!(
s.state,
sd_core_new::networking::PairingState::Completed { .. }
)
}) {
println!("🎉 Bob: Pairing completed successfully!");
break;
}
}
}
Ok(Err(e)) => {
println!("⚠️ Bob: Pairing status check failed: {}", e);
}
Err(_) => {
println!("⚠️ Bob: Pairing status check timed out");
}
}
attempts += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Check connected devices
println!("🔗 Bob: Checking connected devices...");
match timeout(Duration::from_secs(5), core.get_connected_devices()).await {
Ok(Ok(devices)) => {
println!("✅ Bob: Connected devices: {:?}", devices);
if !devices.is_empty() {
println!(
"PAIRING_SUCCESS: Bob has {} connected devices",
devices.len()
);
} else {
println!("⚠️ Bob: No devices connected after pairing");
}
}
Ok(Err(e)) => {
println!("❌ Bob: Failed to get connected devices: {}", e);
}
Err(_) => {
println!("❌ Bob: Get connected devices timed out");
}
}
println!("🧹 Bob: Test completed");
Ok(())
}

View File

@@ -498,15 +498,15 @@ impl NetworkingEventLoop {
Ok(())
}
/// Attempt direct pairing when discovering peers via mDNS
/// Returns the number of pairing requests sent
async fn attempt_direct_pairing_on_mdns_discovery(
/// Schedule pairing requests for mDNS discovered peers (wait for connection establishment)
/// Returns the number of pairing sessions scheduled for connection
async fn schedule_pairing_on_mdns_discovery(
protocol_registry: &Arc<RwLock<ProtocolRegistry>>,
identity: &NetworkIdentity,
swarm: &mut Swarm<UnifiedBehaviour>,
discovered_peer_id: PeerId,
pending_pairing_connections: &mut std::collections::HashMap<uuid::Uuid, (PeerId, crate::infrastructure::networking::device::DeviceInfo, u32, chrono::DateTime<chrono::Utc>)>,
) -> Result<u32> {
let mut requests_sent = 0;
let mut sessions_scheduled = 0;
// Get pairing handler from protocol registry with proper error handling
let registry = protocol_registry.read().await;
@@ -531,44 +531,52 @@ impl NetworkingEventLoop {
// Process each session that's actively scanning for peers
for session in &active_sessions {
// Only send requests for sessions where we're actively scanning (Bob's role)
// Only schedule requests for sessions where we're actively scanning (Bob's role)
if matches!(session.state, crate::infrastructure::networking::protocols::pairing::PairingState::Scanning) {
println!("🔍 Found scanning session {} - sending pairing request to peer {}", session.id, discovered_peer_id);
println!("🔍 Found scanning session {} - scheduling pairing request for peer {} (waiting for connection)", session.id, discovered_peer_id);
// Create pairing request message
let pairing_request = super::behavior::PairingMessage::PairingRequest {
session_id: session.id,
// Create device info for this session
let device_info = crate::infrastructure::networking::device::DeviceInfo {
device_id: identity.device_id(),
device_name: Self::get_device_name_for_pairing(),
public_key: identity.public_key_bytes(),
device_type: crate::infrastructure::networking::device::DeviceType::Desktop,
os_version: std::env::consts::OS.to_string(),
app_version: env!("CARGO_PKG_VERSION").to_string(),
network_fingerprint: identity.network_fingerprint(),
last_seen: chrono::Utc::now(),
};
// Send pairing request directly to the discovered peer
let request_id = swarm.behaviour_mut().pairing.send_request(&discovered_peer_id, pairing_request);
println!("✅ mDNS Direct Pairing: Sent request to peer {} for session {} (request_id: {:?})",
discovered_peer_id, session.id, request_id);
// Add to pending connections - pairing request will be sent after connection establishment
// Using 5-minute timeout (300 seconds) and current time
pending_pairing_connections.insert(
session.id,
(discovered_peer_id, device_info, 300, chrono::Utc::now())
);
requests_sent += 1;
println!("✅ mDNS Discovery: Scheduled pairing request for session {} with peer {} (pending connection)",
session.id, discovered_peer_id);
sessions_scheduled += 1;
} else {
// Log other session states for debugging
match &session.state {
crate::infrastructure::networking::protocols::pairing::PairingState::WaitingForConnection => {
// This is Alice waiting for Bob - don't send requests
println!("🔍 Found waiting session {} (Alice side) - not sending request", session.id);
// This is Alice waiting for Bob - don't schedule requests
println!("🔍 Found waiting session {} (Alice side) - not scheduling request", session.id);
}
_ => {
// Other states like Completed, Failed, etc.
println!("🔍 Found session {} in state {:?} - not sending request", session.id, session.state);
println!("🔍 Found session {} in state {:?} - not scheduling request", session.id, session.state);
}
}
}
}
if requests_sent == 0 && !active_sessions.is_empty() {
if sessions_scheduled == 0 && !active_sessions.is_empty() {
println!("🔍 mDNS Discovery: Found {} active sessions but none in Scanning state", active_sessions.len());
}
Ok(requests_sent)
Ok(sessions_scheduled)
}
/// Get device name for pairing (production-ready with fallback)
@@ -746,21 +754,21 @@ impl NetworkingEventLoop {
println!("Bootstrapping Kademlia DHT with query ID: {:?}", query_id);
}
// PRODUCTION: Check if we have active pairing sessions and attempt direct pairing
// PRODUCTION: Schedule pairing requests for mDNS discovered peers (wait for connection)
// This handles the case where Bob discovers Alice via mDNS during pairing
match Self::attempt_direct_pairing_on_mdns_discovery(
match Self::schedule_pairing_on_mdns_discovery(
&protocol_registry,
&identity,
swarm,
peer_id,
pending_pairing_connections,
).await {
Ok(requests_sent) => {
if requests_sent > 0 {
println!("🔍 mDNS Discovery: Sent {} direct pairing requests to peer {}", requests_sent, peer_id);
Ok(sessions_scheduled) => {
if sessions_scheduled > 0 {
println!("🔍 mDNS Discovery: Scheduled {} pairing sessions for peer {} (waiting for connection)", sessions_scheduled, peer_id);
}
}
Err(e) => {
println!("⚠️ mDNS Discovery: Failed to attempt direct pairing with peer {}: {}", peer_id, e);
println!("⚠️ mDNS Discovery: Failed to schedule pairing with peer {}: {}", peer_id, e);
}
}

View File

@@ -207,8 +207,10 @@ impl PairingCode {
/// Derive session ID from secret
fn derive_session_id(secret: &[u8; 32]) -> Uuid {
// Use BLAKE3 to derive UUID from secret
let hash = blake3::hash(secret);
// For pairing codes, derive session ID from the entropy that survives BIP39 round-trip
// This ensures Alice (who generates) and Bob (who parses) get the same session ID
// This is critical for DHT-based pairing where session IDs must match
let hash = blake3::hash(&secret[..16]); // Use only the first 16 bytes (BIP39 entropy)
let bytes = hash.as_bytes();
let uuid_bytes = [

View File

@@ -570,17 +570,35 @@ impl Core {
println!("⏳ Waiting for mDNS discovery to trigger pairing requests...");
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Hybrid approach: Try local pairing first, then DHT fallback for remote pairing
// Unified Pairing Flow: Support both mDNS (local) and DHT (remote) simultaneously
// Both methods run in parallel, first successful response completes pairing
// 1. Attempt direct pairing with any currently connected peers (local pairing)
// This handles the common case where Alice and Bob are on the same network
println!("🔄 Starting unified pairing flow for session: {}", session_id);
// Method 1: mDNS-based local pairing (already handled by event loop)
// The event loop automatically detects mDNS peers and schedules pairing requests
// This handles Alice and Bob on the same network
println!("📡 mDNS pairing: Listening for local network discoveries...");
// Method 2: DHT-based remote pairing (for cross-network scenarios)
// Query DHT for Alice's published session record
println!("🌐 DHT pairing: Querying distributed hash table...");
let key = libp2p::kad::RecordKey::new(&session_id.as_bytes());
match service.query_dht_record(key).await {
Ok(query_id) => {
println!("🔍 DHT Query initiated: session={}, query_id={:?}", session_id, query_id);
}
Err(e) => {
println!("⚠️ DHT Query failed: {}", e);
}
}
// Method 3: Direct requests to any currently connected peers (immediate attempt)
// This covers cases where Alice is already connected but not yet paired
let connected_peers = service.get_connected_peers().await;
if !connected_peers.is_empty() {
println!("Attempting direct pairing with {} connected peers for session: {}",
connected_peers.len(), session_id);
println!("🔗 Direct pairing: Sending requests to {} connected peers", connected_peers.len());
// Send pairing requests to all connected peers
// One of them might be Alice with the matching session
for peer_id in connected_peers {
let pairing_request = networking::core::behavior::PairingMessage::PairingRequest {
session_id,
@@ -594,36 +612,32 @@ impl Core {
"pairing",
serde_json::to_vec(&pairing_request).unwrap_or_default()
).await {
Ok(_) => println!("Sent direct pairing request to peer: {}", peer_id),
Err(e) => println!("Failed to send pairing request to {}: {}", peer_id, e),
Ok(_) => println!("📤 Sent direct pairing request to peer: {}", peer_id),
Err(e) => println!("Failed to send pairing request to {}: {}", peer_id, e),
}
}
}
// 2. Query DHT for remote pairing (primary method when mDNS fails)
let key = libp2p::kad::RecordKey::new(&session_id.as_bytes());
let query_id = service.query_dht_record(key).await?;
println!("🔍 Querying DHT for pairing session: session={}, query_id={:?}", session_id, query_id);
// 3. Add periodic DHT queries as backup in case the first query fails
// This is important for test environments where mDNS might not work
// Add periodic DHT retries for reliability in challenging network conditions
let networking_ref = networking.clone();
let session_id_clone = session_id;
tokio::spawn(async move {
for i in 1..=3 {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let key = libp2p::kad::RecordKey::new(&session_id_clone.as_bytes());
let service = networking_ref.read().await;
match service.query_dht_record(key).await {
Ok(query_id) => {
println!("🔍 DHT Retry {}: Querying for session {} (query_id: {:?})", i, session_id_clone, query_id);
println!("🔄 DHT Retry {}: session={}, query_id={:?}", i, session_id_clone, query_id);
}
Err(e) => {
println!("⚠️ DHT Retry {}: Failed to query session {}: {}", i, session_id_clone, e);
println!("⚠️ DHT Retry {} failed: {}", i, e);
}
}
}
});
println!("✅ Unified pairing flow initiated - waiting for responses from any method...");
Ok(())
}

View File

@@ -1,281 +0,0 @@
//! Real CLI command integration test for device pairing
//! This test spawns actual CLI processes using instances and tests the real user workflow
use std::process::Command;
use std::time::Duration;
use tokio::time::timeout;
use tempfile::TempDir;
struct CliInstance {
name: String,
data_dir: String,
cli_binary: String,
}
impl CliInstance {
fn new(name: &str, data_dir: &str) -> Self {
Self {
name: name.to_string(),
data_dir: data_dir.to_string(),
cli_binary: "./target/debug/spacedrive".to_string(),
}
}
fn run_command(&self, args: &[&str]) -> Result<std::process::Output, std::io::Error> {
let mut full_args = vec![
"--data-dir", &self.data_dir,
"--instance", &self.name,
];
full_args.extend_from_slice(args);
Command::new(&self.cli_binary)
.args(&full_args)
.output()
}
async fn run_command_with_timeout(&self, args: &[&str], timeout_secs: u64) -> Result<std::process::Output, String> {
let timeout_duration = Duration::from_secs(timeout_secs);
match timeout(timeout_duration, async {
self.run_command(args)
}).await {
Ok(result) => result.map_err(|e| format!("Command failed: {}", e)),
Err(_) => Err(format!("Command timed out after {} seconds", timeout_secs))
}
}
fn start_daemon(&self) -> Result<(), String> {
let output = self.run_command(&["start", "--enable-networking"])
.map_err(|e| format!("Failed to start daemon: {}", e))?;
if !output.status.success() {
return Err(format!("Daemon start failed: {}", String::from_utf8_lossy(&output.stderr)));
}
println!("{} daemon started: {}", self.name, String::from_utf8_lossy(&output.stdout));
Ok(())
}
fn stop_daemon(&self) {
let _ = self.run_command(&["instance", "stop", &self.name]);
}
fn check_status(&self) -> Result<String, String> {
let output = self.run_command(&["status"])
.map_err(|e| format!("Failed to check status: {}", e))?;
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
fn init_networking(&self, password: &str) -> Result<(), String> {
let output = self.run_command(&["network", "init", "--password", password])
.map_err(|e| format!("Failed to init networking: {}", e))?;
if !output.status.success() {
return Err(format!("Networking init failed: {}", String::from_utf8_lossy(&output.stderr)));
}
println!("{} networking initialized: {}", self.name, String::from_utf8_lossy(&output.stdout));
Ok(())
}
async fn generate_pairing_code(&self) -> Result<(String, tokio::process::Child), String> {
use tokio::process::Command as TokioCommand;
use tokio::io::{AsyncBufReadExt, BufReader};
let mut full_args = vec![
"--data-dir", &self.data_dir,
"--instance", &self.name,
"network", "pair", "generate", "--auto-accept"
];
let mut cmd = TokioCommand::new(&self.cli_binary)
.args(&full_args)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.map_err(|e| format!("Failed to spawn pairing command: {}", e))?;
let stdout = cmd.stdout.take().unwrap();
let mut reader = BufReader::new(stdout);
let mut line = String::new();
// Read lines until we get the pairing code
let mut pairing_code = None;
let timeout_duration = tokio::time::Duration::from_secs(15);
match tokio::time::timeout(timeout_duration, async {
while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
println!("{} output: {}", self.name, line.trim());
// Look for the pairing code line
if let Some(code) = extract_pairing_code(&line) {
pairing_code = Some(code);
break;
}
line.clear();
}
}).await {
Ok(_) => {},
Err(_) => return Err("Timeout waiting for pairing code".to_string()),
}
match pairing_code {
Some(code) => Ok((code, cmd)),
None => Err("Failed to extract pairing code from output".to_string()),
}
}
async fn join_pairing(&self, code: &str) -> Result<(), String> {
let output = self.run_command_with_timeout(&["network", "pair", "join", "--code", code], 30).await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
println!("{} join output:\n{}", self.name, stdout);
if !stderr.is_empty() {
println!("{} join stderr:\n{}", self.name, stderr);
}
if !output.status.success() {
return Err(format!("Pairing join failed: {}", stderr));
}
// Check for success indicators in output
if stdout.contains("") || stdout.contains("success") || stdout.contains("paired") {
Ok(())
} else {
Err("No success indicator found in output".to_string())
}
}
}
#[tokio::test]
async fn test_cli_pairing_real_commands() {
println!("🧪 Testing real CLI pairing commands with instances");
// Create temporary directories for Alice and Bob
let alice_dir = TempDir::new().expect("Failed to create Alice temp dir");
let bob_dir = TempDir::new().expect("Failed to create Bob temp dir");
let alice = CliInstance::new("alice", alice_dir.path().to_str().unwrap());
let bob = CliInstance::new("bob", bob_dir.path().to_str().unwrap());
println!("📁 Alice data dir: {:?}", alice_dir.path());
println!("📁 Bob data dir: {:?}", bob_dir.path());
// Build the CLI binary first
println!("🔨 Building CLI binary...");
let build_result = Command::new("cargo")
.args(&["build", "--bin", "spacedrive"])
.output()
.expect("Failed to build CLI binary");
if !build_result.status.success() {
panic!("Failed to build CLI binary: {}", String::from_utf8_lossy(&build_result.stderr));
}
// Cleanup function to stop daemons
let cleanup = || {
alice.stop_daemon();
bob.stop_daemon();
};
// Start both daemons
println!("🟦 Starting Alice daemon...");
if let Err(e) = alice.start_daemon() {
cleanup();
panic!("Failed to start Alice daemon: {}", e);
}
println!("🟨 Starting Bob daemon...");
if let Err(e) = bob.start_daemon() {
cleanup();
panic!("Failed to start Bob daemon: {}", e);
}
// Give daemons time to start
tokio::time::sleep(Duration::from_secs(5)).await;
// Check daemon status
println!("🔍 Checking daemon status...");
match alice.check_status() {
Ok(status) => println!("Alice status: {}", status.lines().take(3).collect::<Vec<_>>().join(" ")),
Err(e) => println!("Alice status error: {}", e),
}
match bob.check_status() {
Ok(status) => println!("Bob status: {}", status.lines().take(3).collect::<Vec<_>>().join(" ")),
Err(e) => println!("Bob status error: {}", e),
}
// Initialize networking for both
println!("🔧 Initializing networking...");
if let Err(e) = alice.init_networking("alice-password") {
cleanup();
panic!("Failed to initialize Alice networking: {}", e);
}
if let Err(e) = bob.init_networking("bob-password") {
cleanup();
panic!("Failed to initialize Bob networking: {}", e);
}
// Give networking time to initialize
tokio::time::sleep(Duration::from_secs(2)).await;
// Alice generates pairing code (starts background process)
println!("🔑 Alice generating pairing code...");
let (pairing_code, mut alice_process) = match alice.generate_pairing_code().await {
Ok((code, process)) => (code, process),
Err(e) => {
cleanup();
panic!("Alice pairing code generation failed: {}", e);
}
};
println!("🔗 Extracted pairing code: {}...",
pairing_code.split_whitespace().take(3).collect::<Vec<_>>().join(" "));
// Bob joins using the pairing code
println!("🤝 Bob joining with pairing code...");
match bob.join_pairing(&pairing_code).await {
Ok(_) => {
println!("✅ CLI pairing test successful!");
}
Err(e) => {
// Kill Alice's pairing process before cleanup
let _ = alice_process.kill().await;
cleanup();
panic!("Bob pairing failed: {}", e);
}
}
// Wait a moment for pairing to complete on Alice's side
tokio::time::sleep(Duration::from_secs(2)).await;
// Kill Alice's pairing process
let _ = alice_process.kill().await;
// Cleanup
cleanup();
println!("🧹 Cleaned up daemon instances");
}
/// Extract the pairing code from CLI output
fn extract_pairing_code(output: &str) -> Option<String> {
// Look for the pairing code in the CLI output
// The CLI outputs it in a specific format after "Your Pairing Code:"
for line in output.lines() {
let trimmed = line.trim();
// Look for a line with 12 words (the pairing code format)
let words: Vec<&str> = trimmed.split_whitespace().collect();
if words.len() == 12 {
// Verify these look like BIP39 words (basic check - all lowercase alphabetic)
if words.iter().all(|w| w.chars().all(|c| c.is_ascii_lowercase())) {
return Some(words.join(" "));
}
}
}
None
}