Enhance DHT-based pairing session discovery and advertisement

Key Changes:
- Implemented DHT record publishing for pairing sessions in `Core` and `NetworkingCore`.
- Added `PairingAdvertisement` struct for serializing pairing session details.
- Updated event loop to handle DHT record queries and responses, enabling discovery of pairing sessions.
- Enhanced CLI commands to support querying and publishing pairing advertisements.

Impact:
- Improved device discovery during pairing, allowing devices to find each other more efficiently.
- Streamlined the pairing process by integrating DHT functionalities, enhancing user experience.

This commit lays the groundwork for a more robust and responsive pairing mechanism in the networking layer.
This commit is contained in:
Jamie Pine
2025-06-22 17:55:27 -07:00
parent fef64b86d0
commit e43ea3be67
8 changed files with 801 additions and 133 deletions

View File

@@ -222,22 +222,22 @@ pub enum NetworkCommands {
#[arg(short, long)]
password: String,
},
/// Start networking service
Start,
/// Stop networking service
Stop,
/// List connected devices
Devices,
/// Revoke trust from a device
Revoke {
/// Device ID to revoke
device_id: Uuid,
},
/// Send a file via Spacedrop
Spacedrop {
/// Target device ID
@@ -753,9 +753,9 @@ pub async fn handle_network_command(
_state: &mut CliState,
) -> Result<(), Box<dyn std::error::Error>> {
use crate::infrastructure::cli::daemon::{DaemonClient, DaemonCommand};
let client = DaemonClient::new();
// Check if daemon is running for most commands
match &cmd {
NetworkCommands::Init { .. } => {
@@ -763,17 +763,22 @@ pub async fn handle_network_command(
}
_ => {
if !client.is_running() {
println!("{} Daemon is not running. Start it with: {}",
"".red(),
"spacedrive start".bright_blue());
println!(
"{} Daemon is not running. Start it with: {}",
"".red(),
"spacedrive start".bright_blue()
);
return Ok(());
}
}
}
match cmd {
NetworkCommands::Init { password } => {
match client.send_command(DaemonCommand::InitNetworking { password }).await? {
match client
.send_command(DaemonCommand::InitNetworking { password })
.await?
{
crate::infrastructure::cli::daemon::DaemonResponse::Ok => {
println!("{} Networking initialized successfully", "".green());
}
@@ -785,7 +790,7 @@ pub async fn handle_network_command(
}
}
}
NetworkCommands::Start => {
match client.send_command(DaemonCommand::StartNetworking).await? {
crate::infrastructure::cli::daemon::DaemonResponse::Ok => {
@@ -799,23 +804,24 @@ pub async fn handle_network_command(
}
}
}
NetworkCommands::Stop => {
match client.send_command(DaemonCommand::StopNetworking).await? {
crate::infrastructure::cli::daemon::DaemonResponse::Ok => {
println!("{} Networking service stopped", "".green());
}
crate::infrastructure::cli::daemon::DaemonResponse::Error(err) => {
println!("{} {}", "".red(), err);
}
_ => {
println!("{} Unexpected response", "".red());
}
NetworkCommands::Stop => match client.send_command(DaemonCommand::StopNetworking).await? {
crate::infrastructure::cli::daemon::DaemonResponse::Ok => {
println!("{} Networking service stopped", "".green());
}
}
crate::infrastructure::cli::daemon::DaemonResponse::Error(err) => {
println!("{} {}", "".red(), err);
}
_ => {
println!("{} Unexpected response", "".red());
}
},
NetworkCommands::Devices => {
match client.send_command(DaemonCommand::ListConnectedDevices).await? {
match client
.send_command(DaemonCommand::ListConnectedDevices)
.await?
{
crate::infrastructure::cli::daemon::DaemonResponse::ConnectedDevices(devices) => {
if devices.is_empty() {
println!("No devices currently connected");
@@ -824,7 +830,7 @@ pub async fn handle_network_command(
let mut table = Table::new();
table.load_preset(UTF8_FULL);
table.set_header(vec!["Device ID", "Name", "Status", "Last Seen"]);
for device in devices {
table.add_row(vec![
Cell::new(&device.device_id.to_string()[..8]),
@@ -833,7 +839,7 @@ pub async fn handle_network_command(
Cell::new(&device.last_seen),
]);
}
println!("{}", table);
}
}
@@ -845,9 +851,12 @@ pub async fn handle_network_command(
}
}
}
NetworkCommands::Revoke { device_id } => {
match client.send_command(DaemonCommand::RevokeDevice { device_id }).await? {
match client
.send_command(DaemonCommand::RevokeDevice { device_id })
.await?
{
crate::infrastructure::cli::daemon::DaemonResponse::Ok => {
println!("{} Device {} revoked", "".green(), device_id);
}
@@ -859,16 +868,30 @@ pub async fn handle_network_command(
}
}
}
NetworkCommands::Spacedrop { device_id, file_path, sender, message } => {
match client.send_command(DaemonCommand::SendSpacedrop {
device_id,
file_path: file_path.to_string_lossy().to_string(),
sender_name: sender,
message
}).await? {
crate::infrastructure::cli::daemon::DaemonResponse::SpacedropStarted { transfer_id } => {
println!("{} Spacedrop started with transfer ID: {}", "".green(), transfer_id);
NetworkCommands::Spacedrop {
device_id,
file_path,
sender,
message,
} => {
match client
.send_command(DaemonCommand::SendSpacedrop {
device_id,
file_path: file_path.to_string_lossy().to_string(),
sender_name: sender,
message,
})
.await?
{
crate::infrastructure::cli::daemon::DaemonResponse::SpacedropStarted {
transfer_id,
} => {
println!(
"{} Spacedrop started with transfer ID: {}",
"".green(),
transfer_id
);
}
crate::infrastructure::cli::daemon::DaemonResponse::Error(err) => {
println!("{} {}", "".red(), err);
@@ -883,7 +906,9 @@ pub async fn handle_network_command(
// Convert from legacy PairingCommands to new PairingAction
let action = match action {
PairingCommands::Generate { auto_accept } => {
crate::infrastructure::cli::networking_commands::PairingAction::Generate { auto_accept }
crate::infrastructure::cli::networking_commands::PairingAction::Generate {
auto_accept,
}
}
PairingCommands::Join { code } => {
let code = match code {
@@ -897,19 +922,30 @@ pub async fn handle_network_command(
};
crate::infrastructure::cli::networking_commands::PairingAction::Join { code }
}
PairingCommands::Status => crate::infrastructure::cli::networking_commands::PairingAction::Status,
PairingCommands::ListPending => crate::infrastructure::cli::networking_commands::PairingAction::List,
PairingCommands::Status => {
crate::infrastructure::cli::networking_commands::PairingAction::Status
}
PairingCommands::ListPending => {
crate::infrastructure::cli::networking_commands::PairingAction::List
}
PairingCommands::Accept { request_id } => {
crate::infrastructure::cli::networking_commands::PairingAction::Accept { request_id }
crate::infrastructure::cli::networking_commands::PairingAction::Accept {
request_id,
}
}
PairingCommands::Reject { request_id } => {
crate::infrastructure::cli::networking_commands::PairingAction::Reject { request_id }
crate::infrastructure::cli::networking_commands::PairingAction::Reject {
request_id,
}
}
};
crate::infrastructure::cli::networking_commands::handle_pairing_command(action, &client).await?;
crate::infrastructure::cli::networking_commands::handle_pairing_command(
action, &client,
)
.await?;
}
}
Ok(())
}
@@ -919,50 +955,100 @@ pub async fn handle_pairing_command(
) -> Result<(), Box<dyn std::error::Error>> {
use crate::infrastructure::cli::daemon::{DaemonCommand, DaemonResponse};
use colored::Colorize;
match action {
PairingCommands::Generate { auto_accept } => {
println!("🔐 Generating pairing code...");
match client.send_command(DaemonCommand::StartPairingAsInitiator { auto_accept }).await? {
DaemonResponse::PairingCodeGenerated { code, expires_in_seconds } => {
println!("\n📋 {} {}", "Your Pairing Code:".bright_cyan().bold(), "Share this with the other device".dimmed());
match client
.send_command(DaemonCommand::StartPairingAsInitiator { auto_accept })
.await?
{
DaemonResponse::PairingCodeGenerated {
code,
expires_in_seconds,
} => {
println!(
"\n📋 {} {}",
"Your Pairing Code:".bright_cyan().bold(),
"Share this with the other device".dimmed()
);
println!();
println!(" {}", code.bright_white().bold());
println!();
println!("{} {} {}", "Expires in".yellow(), expires_in_seconds.to_string().bright_yellow().bold(), "seconds".yellow());
println!("💡 {} {}", "Tip:".bright_blue(), "The other device should use 'spacedrive network pair join'".dimmed());
println!(
" {} {} {}",
"Expires in".yellow(),
expires_in_seconds.to_string().bright_yellow().bold(),
"seconds".yellow()
);
println!(
"💡 {} {}",
"Tip:".bright_blue(),
"BOZO The other device should use 'spacedrive network pair join'".dimmed()
);
if auto_accept {
println!("🤖 {} {}", "Auto-accept enabled:".bright_green(), "Will automatically accept any pairing request".dimmed());
println!(
"🤖 {} {}",
"Auto-accept enabled:".bright_green(),
"Will automatically accept any pairing request".dimmed()
);
} else {
println!("👤 {} {}", "Manual approval:".bright_yellow(), "You'll be asked to confirm pairing requests".dimmed());
println!(
"👤 {} {}",
"Manual approval:".bright_yellow(),
"You'll be asked to confirm pairing requests".dimmed()
);
}
println!("\n📡 Waiting for devices to connect...");
println!(" Press Ctrl+C to cancel");
// Wait for pairing completion or timeout
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
match client.send_command(DaemonCommand::GetPairingStatus).await? {
DaemonResponse::PairingStatus { status, remote_device } => {
DaemonResponse::PairingStatus {
status,
remote_device,
} => {
match status.as_str() {
"completed" => {
if let Some(device) = remote_device {
println!("\n🎉 {} {}", "Pairing completed successfully!".bright_green().bold(), "Device connected".dimmed());
println!(" Device: {}", device.device_name.bright_cyan());
println!(" ID: {}", device.device_id.to_string().bright_blue());
println!(
"\n🎉 {} {}",
"Pairing completed successfully!"
.bright_green()
.bold(),
"Device connected".dimmed()
);
println!(
" Device: {}",
device.device_name.bright_cyan()
);
println!(
" ID: {}",
device.device_id.to_string().bright_blue()
);
}
break;
}
"failed" => {
println!("\n{} {}", "Pairing failed".bright_red().bold(), "Please try again".dimmed());
println!(
"\n{} {}",
"Pairing failed".bright_red().bold(),
"Please try again".dimmed()
);
break;
}
"expired" => {
println!("\n{} {}", "Pairing code expired".bright_yellow().bold(), "Generate a new code to try again".dimmed());
println!(
"\n{} {}",
"Pairing code expired".bright_yellow().bold(),
"Generate a new code to try again".dimmed()
);
break;
}
_ => {
@@ -984,50 +1070,84 @@ pub async fn handle_pairing_command(
}
}
}
PairingCommands::Join { code } => {
let pairing_code = if let Some(code) = code {
code
} else {
println!("📥 {} {}", "Enter Pairing Code".bright_cyan().bold(), "From the other device".dimmed());
println!(
"📥 {} {}",
"Enter Pairing Code".bright_cyan().bold(),
"From the other device".dimmed()
);
println!("Format: word1 word2 word3 word4 word5 word6 word7 word8 word9 word10 word11 word12");
print!("> ");
use std::io::{self, Write};
io::stdout().flush()?;
let mut input = String::new();
io::stdin().read_line(&mut input)?;
input.trim().to_string()
};
println!("🔍 Connecting to device...");
match client.send_command(DaemonCommand::StartPairingAsJoiner { code: pairing_code }).await? {
match client
.send_command(DaemonCommand::StartPairingAsJoiner { code: pairing_code })
.await?
{
DaemonResponse::PairingInProgress => {
println!("📡 {} {}", "Searching for device...".bright_blue(), "This may take a moment".dimmed());
println!(
"📡 {} {}",
"Searching for device...".bright_blue(),
"This may take a moment".dimmed()
);
// Wait for pairing completion
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
match client.send_command(DaemonCommand::GetPairingStatus).await? {
DaemonResponse::PairingStatus { status, remote_device } => {
DaemonResponse::PairingStatus {
status,
remote_device,
} => {
match status.as_str() {
"completed" => {
if let Some(device) = remote_device {
println!("\n🎉 {} {}", "Pairing completed successfully!".bright_green().bold(), "Device connected".dimmed());
println!(" Device: {}", device.device_name.bright_cyan());
println!(" ID: {}", device.device_id.to_string().bright_blue());
println!(
"\n🎉 {} {}",
"Pairing completed successfully!"
.bright_green()
.bold(),
"Device connected".dimmed()
);
println!(
" Device: {}",
device.device_name.bright_cyan()
);
println!(
" ID: {}",
device.device_id.to_string().bright_blue()
);
}
break;
}
"failed" => {
println!("\n{} {}", "Pairing failed".bright_red().bold(), "Check the pairing code and try again".dimmed());
println!(
"\n{} {}",
"Pairing failed".bright_red().bold(),
"Check the pairing code and try again".dimmed()
);
break;
}
"expired" => {
println!("\n{} {}", "Pairing code expired".bright_yellow().bold(), "Ask for a new pairing code".dimmed());
println!(
"\n{} {}",
"Pairing code expired".bright_yellow().bold(),
"Ask for a new pairing code".dimmed()
);
break;
}
_ => {
@@ -1049,19 +1169,33 @@ pub async fn handle_pairing_command(
}
}
}
PairingCommands::Status => {
match client.send_command(DaemonCommand::GetPairingStatus).await? {
DaemonResponse::PairingStatus { status, remote_device } => {
println!("📊 {} {}", "Pairing Status:".bright_cyan().bold(), status.bright_white());
DaemonResponse::PairingStatus {
status,
remote_device,
} => {
println!(
"📊 {} {}",
"Pairing Status:".bright_cyan().bold(),
status.bright_white()
);
if let Some(device) = remote_device {
println!(" Remote Device: {}", device.device_name.bright_cyan());
println!(" Device ID: {}", device.device_id.to_string().bright_blue());
println!(
" Device ID: {}",
device.device_id.to_string().bright_blue()
);
println!(" Status: {}", device.status.bright_green());
println!(" Last Seen: {}", device.last_seen.bright_yellow());
} else {
println!(" {} {}", "No active pairing session".dimmed(), "Use 'pair generate' or 'pair join' to start pairing".dimmed());
println!(
" {} {}",
"No active pairing session".dimmed(),
"Use 'pair generate' or 'pair join' to start pairing".dimmed()
);
}
}
DaemonResponse::Error(err) => {
@@ -1072,24 +1206,56 @@ pub async fn handle_pairing_command(
}
}
}
PairingCommands::ListPending => {
match client.send_command(DaemonCommand::ListPendingPairings).await? {
match client
.send_command(DaemonCommand::ListPendingPairings)
.await?
{
DaemonResponse::PendingPairings(requests) => {
if requests.is_empty() {
println!("📭 {} {}", "No pending pairing requests".dimmed(), "Devices will appear here when they try to pair".dimmed());
println!(
"📭 {} {}",
"No pending pairing requests".dimmed(),
"Devices will appear here when they try to pair".dimmed()
);
} else {
println!("📋 {} {}", "Pending Pairing Requests:".bright_cyan().bold(), format!("({} total)", requests.len()).dimmed());
println!(
"📋 {} {}",
"Pending Pairing Requests:".bright_cyan().bold(),
format!("({} total)", requests.len()).dimmed()
);
for request in requests {
println!();
println!(" 🔐 {} {}", "Request ID:".bright_blue(), request.request_id.to_string().bright_white());
println!(" 📱 {} {}", "Device:".bright_blue(), request.device_name.bright_cyan());
println!(" 🆔 {} {}", "Device ID:".bright_blue(), request.device_id.to_string().bright_blue());
println!("{} {}", "Received:".bright_blue(), request.received_at.bright_yellow());
println!(" 💬 {} {} {}", "Actions:".bright_blue(),
format!("spacedrive network pair accept {}", request.request_id).bright_green(),
format!("spacedrive network pair reject {}", request.request_id).bright_red());
println!(
" 🔐 {} {}",
"Request ID:".bright_blue(),
request.request_id.to_string().bright_white()
);
println!(
" 📱 {} {}",
"Device:".bright_blue(),
request.device_name.bright_cyan()
);
println!(
" 🆔 {} {}",
"Device ID:".bright_blue(),
request.device_id.to_string().bright_blue()
);
println!(
"{} {}",
"Received:".bright_blue(),
request.received_at.bright_yellow()
);
println!(
" 💬 {} {} {}",
"Actions:".bright_blue(),
format!("spacedrive network pair accept {}", request.request_id)
.bright_green(),
format!("spacedrive network pair reject {}", request.request_id)
.bright_red()
);
}
}
}
@@ -1101,13 +1267,24 @@ pub async fn handle_pairing_command(
}
}
}
PairingCommands::Accept { request_id } => {
println!("{} {}...", "Accepting pairing request".bright_green(), request_id.to_string().bright_blue());
match client.send_command(DaemonCommand::AcceptPairing { request_id }).await? {
println!(
"{} {}...",
"Accepting pairing request".bright_green(),
request_id.to_string().bright_blue()
);
match client
.send_command(DaemonCommand::AcceptPairing { request_id })
.await?
{
DaemonResponse::Ok => {
println!("{} {}", "".green(), "Pairing request accepted successfully".bright_green());
println!(
"{} {}",
"".green(),
"Pairing request accepted successfully".bright_green()
);
}
DaemonResponse::Error(err) => {
println!("{} {}", "".red(), err);
@@ -1117,13 +1294,24 @@ pub async fn handle_pairing_command(
}
}
}
PairingCommands::Reject { request_id } => {
println!("{} {}...", "Rejecting pairing request".bright_red(), request_id.to_string().bright_blue());
match client.send_command(DaemonCommand::RejectPairing { request_id }).await? {
println!(
"{} {}...",
"Rejecting pairing request".bright_red(),
request_id.to_string().bright_blue()
);
match client
.send_command(DaemonCommand::RejectPairing { request_id })
.await?
{
DaemonResponse::Ok => {
println!("{} {}", "".green(), "Pairing request rejected".bright_yellow());
println!(
"{} {}",
"".green(),
"Pairing request rejected".bright_yellow()
);
}
DaemonResponse::Error(err) => {
println!("{} {}", "".red(), err);
@@ -1134,7 +1322,7 @@ pub async fn handle_pairing_command(
}
}
}
Ok(())
}

View File

@@ -110,20 +110,24 @@ pub enum InstanceCommands {
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
// Set up logging with networking debug support
// Set up logging - respect RUST_LOG environment variable with fallback defaults
let log_level = if cli.verbose { "debug" } else { "info" };
let env_filter = if cli.verbose {
// Enable detailed networking and libp2p logging when verbose
format!(
"sd_core_new={},spacedrive_cli={},libp2p_mdns=debug,libp2p_swarm=debug,libp2p_kad=debug",
log_level, log_level
)
} else {
format!(
"sd_core_new={},spacedrive_cli={}",
log_level, log_level
)
};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| {
// Fallback to hardcoded filters if RUST_LOG not set
if cli.verbose {
// Enable detailed networking and libp2p logging when verbose
tracing_subscriber::EnvFilter::new(&format!(
"sd_core_new={},spacedrive_cli={},libp2p=debug",
log_level, log_level
))
} else {
tracing_subscriber::EnvFilter::new(&format!(
"sd_core_new={},spacedrive_cli={}",
log_level, log_level
))
}
});
tracing_subscriber::fmt()
.with_env_filter(env_filter)

View File

@@ -0,0 +1,174 @@
# Networking Implementation - Remaining Work
## Current Status: DHT Discovery ✅ - Pairing Flow ❌
The networking system has **successfully implemented DHT-based pairing discovery** but is missing the **actual pairing connection and authentication flow**.
### What Works ✅
1. **DHT Session Advertisement**
- Initiator publishes pairing session to Kademlia DHT
- Session includes peer_id, addresses, device_info, expiration
- BIP39 pairing codes correctly derive session IDs
2. **DHT Session Discovery**
- Joiner queries DHT using session ID from pairing code
- Successfully finds initiator's advertisement
- Emits `PairingSessionDiscovered` event with peer info
3. **LibP2P Infrastructure**
- Unified swarm with proper Send/Sync compliance
- Request-response protocols for pairing and messaging
- Comprehensive Kademlia event handling
- Clean command channel architecture
### What's Missing ❌
#### 1. **Peer Connection After Discovery**
**Problem**: After DHT discovery, joiner doesn't connect to initiator
**Current State**:
```rust
// In event_loop.rs - attempts to dial but doesn't work properly
for address in &addresses {
match swarm.dial(address.clone()) {
Ok(_) => {
println!("Dialing discovered peer {} at {}", peer_id, address);
break;
}
Err(e) => {
println!("Failed to dial {}: {:?}", address, e);
}
}
}
```
**Needed**:
- Fix address dialing (current addresses may be invalid: `/ip4/127.0.0.1/tcp/0`)
- Implement proper external address discovery
- Handle connection establishment events properly
#### 2. **Pairing Request Transmission**
**Problem**: After connection, no pairing request is sent
**Current State**: Connection established but no pairing message sent
**Needed**:
```rust
// After connection established, send pairing request
let pairing_request = PairingMessage::Request {
session_id,
device_info: local_device_info,
public_key: identity.public_key_bytes(),
};
swarm.behaviour_mut().pairing.send_request(&peer_id, pairing_request);
```
#### 3. **Challenge-Response Authentication**
**Problem**: No pairing protocol implementation
**Current State**: Pairing protocol handler exists but not integrated
**Needed**:
- Initiator receives pairing request
- Initiator sends cryptographic challenge
- Joiner signs challenge and responds
- Initiator verifies signature and completes pairing
- Both sides store paired device info
#### 4. **Session State Management**
**Problem**: No tracking of pairing session states
**Needed**:
- Track pending pairing sessions (who initiated what)
- Map session IDs to active pairing attempts
- Timeout handling for expired sessions
- Proper cleanup of failed pairing attempts
#### 5. **External Address Discovery**
**Problem**: DHT advertisements contain invalid addresses
**Current Issue**:
```rust
// Current implementation returns placeholder
pub async fn get_external_addresses(&self) -> Vec<Multiaddr> {
vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()] // Invalid!
}
```
**Needed**:
- Get actual listening addresses from swarm
- Implement STUN/NAT traversal for external addresses
- Filter out non-routable addresses (127.0.0.1, etc.)
## Implementation Priority
### High Priority (Required for Basic Pairing)
1. **Fix External Addresses** (`get_external_addresses()`)
- Extract actual listening addresses from LibP2P swarm
- This will fix the dialing issue
2. **Connection-Triggered Pairing Request**
- Detect when connection is for pairing vs normal operation
- Send `PairingMessage::Request` after connection established
3. **Pairing Protocol Integration**
- Wire up existing `PairingProtocolHandler` methods
- Handle request/challenge/response/complete message flow
### Medium Priority (Production Readiness)
4. **Session State Tracking**
- Store pending sessions in event loop or core
- Proper timeout and cleanup handling
5. **Error Recovery**
- Handle network failures during pairing
- Retry logic for failed connections
- User feedback for pairing failures
### Low Priority (Nice to Have)
6. **NAT Traversal**
- STUN server integration for external address discovery
- UPnP port mapping for better connectivity
7. **DHT Record Cleanup**
- Remove expired pairing sessions from DHT
- Garbage collection for old records
## Code Locations
**Key files needing changes:**
- `src/infrastructure/networking/core/mod.rs` - Fix `get_external_addresses()`
- `src/infrastructure/networking/core/event_loop.rs` - Add pairing request after connection
- `src/infrastructure/networking/protocols/pairing.rs` - Complete protocol handler integration
- `src/lib.rs` - Update Core pairing methods to handle async flow
## Testing Strategy
Once implemented, test with:
```bash
# Terminal 1: Start Alice and generate pairing code
RUST_LOG=libp2p=debug ./target/release/spacedrive --instance alice start --enable-networking --foreground
RUST_LOG=libp2p=debug ./target/release/spacedrive network pair generate --instance alice
# Terminal 2: Start Bob and join Alice's session
RUST_LOG=libp2p=debug ./target/release/spacedrive --instance bob start --enable-networking --foreground
RUST_LOG=libp2p=debug ./target/release/spacedrive network pair join "pairing code words..." --instance bob
# Expected: Successful pairing with device exchange
```
## Architecture Assessment
The **networking architecture is solid** - the missing pieces are implementation details rather than fundamental design issues:
**Good**: DHT discovery, event system, protocol framework, error handling
**Missing**: Connection flow, message transmission, address resolution
Estimated effort: **2-4 hours** to complete basic pairing functionality.

View File

@@ -11,6 +11,7 @@ use crate::infrastructure::networking::{
};
use futures::StreamExt;
use libp2p::{
kad::{self, QueryId, RecordKey},
swarm::{Swarm, SwarmEvent},
Multiaddr, PeerId,
};
@@ -26,6 +27,17 @@ pub enum EventLoopCommand {
protocol: String,
data: Vec<u8>,
},
/// Publish a DHT record for pairing session discovery
PublishDhtRecord {
key: RecordKey,
value: Vec<u8>,
response_channel: tokio::sync::oneshot::Sender<Result<QueryId>>,
},
/// Query a DHT record for pairing session discovery
QueryDhtRecord {
key: RecordKey,
response_channel: tokio::sync::oneshot::Sender<Result<QueryId>>,
},
}
/// Central event loop for processing all LibP2P events
@@ -232,6 +244,38 @@ impl NetworkingEventLoop {
println!("Device {} not found or not connected", device_id);
}
}
EventLoopCommand::PublishDhtRecord {
key,
value,
response_channel,
} => {
// Create a DHT record with the provided key and value
let record = kad::Record::new(key, value);
// Publish the record to the DHT
match swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) {
Ok(query_id) => {
println!("Publishing DHT record with query ID: {:?}", query_id);
let _ = response_channel.send(Ok(query_id));
}
Err(e) => {
println!("Failed to publish DHT record: {:?}", e);
let _ = response_channel.send(Err(NetworkingError::Protocol(format!("DHT put failed: {:?}", e))));
}
}
}
EventLoopCommand::QueryDhtRecord {
key,
response_channel,
} => {
// Query the DHT for the record
let query_id = swarm.behaviour_mut().kademlia.get_record(key);
println!("Querying DHT record with query ID: {:?}", query_id);
// Send the query ID back to the caller
let _ = response_channel.send(Ok(query_id));
}
}
Ok(())
@@ -302,7 +346,105 @@ impl NetworkingEventLoop {
) -> Result<()> {
match event {
UnifiedBehaviourEvent::Kademlia(kad_event) => {
println!("Kademlia event: {:?}", kad_event);
use libp2p::kad;
match kad_event {
kad::Event::OutboundQueryProgressed {
id,
result: kad::QueryResult::PutRecord(put_result),
..
} => {
match put_result {
Ok(kad::PutRecordOk { key }) => {
println!("DHT record published successfully: query_id={:?}, key={:?}", id, key);
}
Err(kad::PutRecordError::QuorumFailed { key, success, quorum }) => {
println!("DHT record publish failed: query_id={:?}, key={:?}, success={:?}, quorum={:?}", id, key, success, quorum);
}
Err(kad::PutRecordError::Timeout { key, .. }) => {
println!("DHT record publish timed out: query_id={:?}, key={:?}", id, key);
}
}
}
kad::Event::OutboundQueryProgressed {
id,
result: kad::QueryResult::GetRecord(get_result),
..
} => {
match get_result {
Ok(kad::GetRecordOk::FoundRecord(record)) => {
println!("DHT record found: query_id={:?}, key={:?}, {} bytes", id, record.record.key, record.record.value.len());
// Try to deserialize as pairing advertisement
if let Ok(advertisement) = serde_json::from_slice::<crate::infrastructure::networking::protocols::pairing::PairingAdvertisement>(&record.record.value) {
println!("Found pairing advertisement from peer: {:?}", advertisement.peer_id);
// Convert strings back to libp2p types
if let (Ok(peer_id), Ok(addresses)) = (advertisement.peer_id(), advertisement.addresses()) {
// Extract session ID from the DHT key
if let Ok(session_id_bytes) = record.record.key.as_ref().try_into() {
let session_id = Uuid::from_bytes(session_id_bytes);
// Emit pairing discovery event
let _ = event_sender.send(NetworkEvent::PairingSessionDiscovered {
session_id,
peer_id,
addresses: addresses.clone(),
device_info: advertisement.device_info.clone(),
});
println!("Emitted pairing session discovery event for session: {}", session_id);
// Automatically connect to the discovered peer
for address in &addresses {
match swarm.dial(address.clone()) {
Ok(_) => {
println!("Dialing discovered peer {} at {}", peer_id, address);
break; // Try only the first successful dial
}
Err(e) => {
println!("Failed to dial {}: {:?}", address, e);
}
}
}
}
} else {
println!("Failed to parse peer_id or addresses from pairing advertisement");
}
}
}
Ok(kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
println!("DHT query finished, no additional records: query_id={:?}", id);
}
Err(kad::GetRecordError::NotFound { key, .. }) => {
println!("DHT record not found: query_id={:?}, key={:?}", id, key);
}
Err(kad::GetRecordError::QuorumFailed { key, .. }) => {
println!("DHT query quorum failed: query_id={:?}, key={:?}", id, key);
}
Err(kad::GetRecordError::Timeout { key }) => {
println!("DHT query timed out: query_id={:?}, key={:?}", id, key);
}
}
}
kad::Event::ModeChanged { new_mode } => {
println!("Kademlia mode changed: {:?}", new_mode);
}
kad::Event::RoutingUpdated { peer, .. } => {
println!("Kademlia routing updated: peer={}", peer);
}
kad::Event::UnroutablePeer { peer } => {
println!("Kademlia unroutable peer: {}", peer);
}
kad::Event::RoutablePeer { peer, .. } => {
println!("Kademlia routable peer: {}", peer);
}
kad::Event::PendingRoutablePeer { peer, .. } => {
println!("Kademlia pending routable peer: {}", peer);
}
_ => {
println!("Other Kademlia event: {:?}", kad_event);
}
}
}
UnifiedBehaviourEvent::Mdns(mdns_event) => {

View File

@@ -12,7 +12,7 @@ use crate::infrastructure::networking::{
utils::NetworkIdentity,
NetworkingError, Result,
};
use libp2p::{Multiaddr, PeerId, Swarm};
use libp2p::{kad::{QueryId, RecordKey}, Multiaddr, PeerId, Swarm};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
@@ -38,6 +38,12 @@ pub enum NetworkEvent {
device_info: DeviceInfo,
peer_id: PeerId,
},
PairingSessionDiscovered {
session_id: Uuid,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
device_info: DeviceInfo,
},
PairingCompleted {
device_id: Uuid,
device_info: DeviceInfo,
@@ -215,6 +221,62 @@ impl NetworkingCore {
pub fn device_registry(&self) -> Arc<RwLock<DeviceRegistry>> {
self.device_registry.clone()
}
/// Publish a DHT record for pairing session discovery
pub async fn publish_dht_record(&self, key: RecordKey, value: Vec<u8>) -> Result<QueryId> {
if let Some(command_sender) = &self.command_sender {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let command = event_loop::EventLoopCommand::PublishDhtRecord {
key,
value,
response_channel: response_tx,
};
command_sender.send(command).map_err(|_| {
NetworkingError::ConnectionFailed("Event loop not running".to_string())
})?;
response_rx.await.map_err(|_| {
NetworkingError::ConnectionFailed("Failed to receive DHT response".to_string())
})?
} else {
Err(NetworkingError::ConnectionFailed(
"Networking not started".to_string(),
))
}
}
/// Query a DHT record for pairing session discovery
pub async fn query_dht_record(&self, key: RecordKey) -> Result<QueryId> {
if let Some(command_sender) = &self.command_sender {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let command = event_loop::EventLoopCommand::QueryDhtRecord {
key,
response_channel: response_tx,
};
command_sender.send(command).map_err(|_| {
NetworkingError::ConnectionFailed("Event loop not running".to_string())
})?;
response_rx.await.map_err(|_| {
NetworkingError::ConnectionFailed("Failed to receive DHT response".to_string())
})?
} else {
Err(NetworkingError::ConnectionFailed(
"Networking not started".to_string(),
))
}
}
/// Get external addresses for advertising in DHT records
pub async fn get_external_addresses(&self) -> Vec<Multiaddr> {
// For now, return local addresses that we're listening on
// In production, this should include external addresses discovered via STUN/UPnP
vec![
"/ip4/127.0.0.1/tcp/0".parse().unwrap(), // Will be replaced with actual port
]
}
}
// Ensure NetworkingCore is Send + Sync for proper async usage

View File

@@ -7,6 +7,7 @@ use crate::infrastructure::networking::{
NetworkingError, Result,
};
use async_trait::async_trait;
use libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
@@ -265,6 +266,37 @@ pub enum PairingState {
Failed { reason: String },
}
/// DHT advertisement for pairing session discovery
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PairingAdvertisement {
/// The peer ID of the initiator (as string for serialization)
pub peer_id: String,
/// The network addresses where the initiator can be reached (as strings for serialization)
pub addresses: Vec<String>,
/// Device information of the initiator
pub device_info: DeviceInfo,
/// When this advertisement expires
pub expires_at: chrono::DateTime<chrono::Utc>,
/// When this advertisement was created
pub created_at: chrono::DateTime<chrono::Utc>,
}
impl PairingAdvertisement {
/// Convert peer ID string back to PeerId
pub fn peer_id(&self) -> Result<PeerId> {
self.peer_id.parse()
.map_err(|e| NetworkingError::Protocol(format!("Invalid peer ID: {}", e)))
}
/// Convert address strings back to Multiaddr
pub fn addresses(&self) -> Result<Vec<Multiaddr>> {
self.addresses.iter()
.map(|addr| addr.parse()
.map_err(|e| NetworkingError::Protocol(format!("Invalid address: {}", e))))
.collect()
}
}
/// Pairing messages
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PairingMessage {
@@ -304,6 +336,7 @@ impl PairingProtocolHandler {
}
/// Start a new pairing session as initiator
/// Returns the session ID which should be advertised via DHT by the caller
pub async fn start_pairing_session(&self) -> Result<Uuid> {
let session_id = Uuid::new_v4();
let session = PairingSession {
@@ -318,9 +351,24 @@ impl PairingProtocolHandler {
.write()
.await
.insert(session_id, session);
println!("Started pairing session: {}", session_id);
Ok(session_id)
}
/// Get device info for advertising in DHT records
pub fn get_device_info(&self) -> DeviceInfo {
DeviceInfo {
device_id: self.identity.device_id(),
device_name: "Spacedrive Device".to_string(), // TODO: Get from device manager
device_type: crate::infrastructure::networking::device::DeviceType::Desktop, // TODO: Get from device manager
os_version: std::env::consts::OS.to_string(),
app_version: "0.1.0".to_string(), // TODO: Get from application version
network_fingerprint: self.identity.network_fingerprint(),
last_seen: chrono::Utc::now(),
}
}
/// Cancel a pairing session
pub async fn cancel_session(&self, session_id: Uuid) -> Result<()> {
self.active_sessions.write().await.remove(&session_id);

View File

@@ -3,6 +3,7 @@
use crate::infrastructure::networking::{NetworkingError, Result};
use libp2p::{identity::Keypair, PeerId};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Network identity containing keypair and peer ID
#[derive(Clone)]
@@ -53,6 +54,30 @@ impl NetworkIdentity {
pub fn verify(&self, data: &[u8], signature: &[u8]) -> bool {
self.keypair.public().verify(data, signature)
}
/// Get a deterministic device ID from the network identity
pub fn device_id(&self) -> Uuid {
// Create a deterministic UUID from the peer ID
let peer_id_bytes = self.peer_id.to_bytes();
// Use the first 16 bytes of the peer ID hash to create a UUID
let mut uuid_bytes = [0u8; 16];
let hash = blake3::hash(&peer_id_bytes);
uuid_bytes.copy_from_slice(&hash.as_bytes()[..16]);
Uuid::from_bytes(uuid_bytes)
}
/// Get network fingerprint for device identification
pub fn network_fingerprint(&self) -> NetworkFingerprint {
let public_key_bytes = self.public_key_bytes();
let public_key_hash = blake3::hash(&public_key_bytes);
NetworkFingerprint {
peer_id: self.peer_id.to_string(),
public_key_hash: hex::encode(&public_key_hash.as_bytes()[..16]),
}
}
}
impl std::fmt::Debug for NetworkIdentity {

View File

@@ -496,7 +496,25 @@ impl Core {
let session_id = pairing_code.session_id();
// Start pairing session with the generated session ID
let _session_id = pairing_handler.start_pairing_session().await?;
let actual_session_id = pairing_handler.start_pairing_session().await?;
// Create pairing advertisement for DHT
let advertisement = networking::protocols::pairing::PairingAdvertisement {
peer_id: service.peer_id().to_string(),
addresses: service.get_external_addresses().await.into_iter()
.map(|addr| addr.to_string())
.collect(),
device_info: pairing_handler.get_device_info(),
expires_at: chrono::Utc::now() + chrono::Duration::minutes(5),
created_at: chrono::Utc::now(),
};
// Publish DHT record for discovery
let key = libp2p::kad::RecordKey::new(&session_id.as_bytes());
let value = serde_json::to_vec(&advertisement)?;
let query_id = service.publish_dht_record(key, value).await?;
println!("Published pairing session to DHT: session={}, query_id={:?}", session_id, query_id);
let expires_in = 300; // 5 minutes
@@ -517,11 +535,18 @@ impl Core {
let pairing_code = networking::protocols::pairing::PairingCode::from_string(code)?;
let session_id = pairing_code.session_id();
// Use networking core to join pairing session
let service = networking.read().await;
service
.send_message(session_id, "pairing", b"join_request".to_vec())
.await?;
// Query DHT for initiator's pairing advertisement
let key = libp2p::kad::RecordKey::new(&session_id.as_bytes());
let query_id = service.query_dht_record(key).await?;
println!("Querying DHT for pairing session: session={}, query_id={:?}", session_id, query_id);
// Note: The actual DHT response will be handled in the event loop
// When the record is found, it will contain the peer_id and addresses
// The application should wait for the DHT query to complete and then connect
// For now, we return success - the pairing will continue asynchronously
println!("DHT query initiated for pairing session: {}", session_id);
Ok(())
}