Jamie Pine
2025-10-18 10:33:20 -07:00
parent 3ba1102020
commit dcc4dc4fc1
36 changed files with 4293 additions and 156 deletions

100
core/AGENTS.md Normal file

@@ -0,0 +1,100 @@
# AGENTS.md - Spacedrive Core v2
## Build/Test Commands
- `cargo build` - Build the project
- `cargo test` - Run all tests
- `cargo test <test_name>` - Run specific test (e.g., `cargo test library_test`)
- `cargo clippy` - Lint code
- `cargo fmt` - Format code
- `cargo run --bin sd-cli -- <command>` - Run CLI (note: binary is `sd-cli`, not `spacedrive`)
## Code Style
- **Imports**: Group std, external crates, then local modules with blank lines between
- **Formatting**: Use `cargo fmt` - tabs for indentation, snake_case for variables/functions. DO NOT use emojis at all.
- **Types**: Explicit types preferred, use `Result<T, E>` for error handling with `thiserror`
- **Naming**: snake_case for functions/variables, PascalCase for types, SCREAMING_SNAKE_CASE for constants
- **Error Handling**: Use `Result` types, `thiserror` for custom errors, `anyhow` for application errors
- **Async**: Use `async/await`, prefer `tokio` primitives, avoid blocking operations
- **Resumable Jobs**: For long-running jobs that need to be resumable, store the job's state within the job struct itself and use `#[serde(skip)]` for fields that should not be persisted. In a file copy job, for example, the list of already-copied files can be stored so the job resumes where it left off (see the sketch after this list).
- **Documentation**: Use `//!` for module docs, `///` for public items, include examples
- **Architecture**: Follow Command Query Responsibility Segregation (CQRS) and Domain-Driven Design (DDD) patterns.
- **Domain**: Core data structures and business logic are located in `src/domain/`. These are the "nouns" of your system.
- **Operations**: State-changing commands (actions) and data-retrieving queries are located in `src/ops/`. These are the "verbs" of your system.
- **Actions**: Operations that modify the state of the application. They should be self-contained and transactional.
- **Queries**: Operations that retrieve data without modifying state. They should be efficient and optimized for reading.
- **Feature Modules**: Each new feature should be implemented in its own module within the `src/ops/` directory. For example, a new "share" feature would live in `src/ops/files/share`. Each feature module should contain the following files where applicable:
- `action.rs`: The main logic for the state-changing operation.
- `input.rs`: Data structures for the action's input.
- `output.rs`: Data structures for the action's output.
- `job.rs`: If the action is long-running, the job implementation.
- **Database**: Use SeaORM entities, async queries, proper error propagation
- **Comments**: Minimal inline comments, focus on why not what, no TODO comments in production code
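A minimal sketch of the resumable-job pattern from the **Resumable Jobs** rule above; the `FileCopyJob` fields are illustrative, only the `serde` derive and `#[serde(skip)]` usage are the point:
```rust
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

/// State persisted between runs so the job can resume where it left off.
#[derive(Serialize, Deserialize)]
pub struct FileCopyJob {
	/// Files still to be copied (persisted).
	pending: Vec<PathBuf>,
	/// Files already copied (persisted, lets the job skip completed work).
	copied: Vec<PathBuf>,
	/// Open handle for the in-flight copy; rebuilt on resume, never persisted.
	#[serde(skip)]
	current_file: Option<std::fs::File>,
}
```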
## Daemon Architecture
Spacedrive uses a **daemon-client architecture** where a single daemon process manages the core functionality and multiple client applications (CLI, GraphQL server, desktop app) connect to it via Unix domain sockets.
> **For detailed daemon architecture documentation, see [/docs/core/daemon.md](/docs/core/daemon.md)**
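As a rough illustration of the client side of that socket, a minimal sketch; the socket path and the request framing here are assumptions, not the daemon's actual protocol:
```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
	// Hypothetical socket path; the real daemon chooses its own location.
	let mut stream = UnixStream::connect("/tmp/spacedrive.sock").await?;
	// Send a request and read the reply; the real wire framing is daemon-specific.
	stream.write_all(b"query:network.status.v1").await?;
	let mut buf = vec![0u8; 4096];
	let n = stream.read(&mut buf).await?;
	println!("daemon replied with {} bytes", n);
	Ok(())
}
```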
### **The `Wire` Trait**
All actions and queries must implement the `Wire` trait to enable type-safe client-daemon communication:
```rust
pub trait Wire {
	const METHOD: &'static str;
}
```
### **Registration Macros**
Instead of manually implementing `Wire`, use these registration macros that automatically:
1. Implement the `Wire` trait with the correct method string
2. Register the operation in the global registry using the `inventory` crate
**For Queries:**
```rust
crate::register_query!(NetworkStatusQuery, "network.status");
// Generates method: "query:network.status.v1"
```
**For Library Actions:**
```rust
crate::register_library_action!(FileCopyAction, "files.copy");
// Generates method: "action:files.copy.input.v1"
```
**For Core Actions:**
```rust
crate::register_core_action!(LibraryCreateAction, "libraries.create");
// Generates method: "action:libraries.create.input.v1"
```
### **Registry System**
- **Location**: `core/src/ops/registry.rs`
- **Mechanism**: Uses the `inventory` crate for compile-time registration
- **Global Maps**: `QUERIES` and `ACTIONS` hashmaps populated at startup
- **Handler Functions**: Generic handlers that decode payloads, execute operations, and encode results
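Conceptually, the `inventory`-based registration looks like the sketch below; the `QueryRegistration` type is illustrative, not the actual registry struct:
```rust
/// One registered query; the real registry also stores a handler function.
pub struct QueryRegistration {
	pub method: &'static str,
}

// Declare a collection that submissions from any file or crate feed into.
inventory::collect!(QueryRegistration);

// What a macro like `register_query!` expands to, conceptually.
inventory::submit! {
	QueryRegistration { method: "query:network.status.v1" }
}

/// Build the global method list at startup by iterating all submissions.
pub fn collect_methods() -> Vec<&'static str> {
	inventory::iter::<QueryRegistration>
		.into_iter()
		.map(|reg| reg.method)
		.collect()
}
```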
## Logging Standards
- **Setup**: Use `tracing_subscriber::fmt()` with env filter for structured logging
- **Macros**: Use `info!`, `warn!`, `error!`, `debug!` from `tracing` crate, not `println!`
- **Job Context**: Use `ctx.log()` in jobs for job-specific logging with automatic job_id tagging
- **Structured**: Include relevant context fields: `debug!(job_id = %self.id, "message")`
- **Levels**: debug for detailed flow, info for user-relevant events, warn for recoverable issues, error for failures
- **Format**: `tracing_subscriber::fmt().with_env_filter(env_filter).init()` in main/examples
- **Environment**: Respect `RUST_LOG` env var, fallback to module-specific filters like `sd_core=info`
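A sketch of that setup, falling back to `sd_core=info` when `RUST_LOG` is unset:
```rust
use tracing_subscriber::EnvFilter;

fn init_logging() {
	// Respect RUST_LOG if present, otherwise default to module-level info.
	let env_filter = EnvFilter::try_from_default_env()
		.unwrap_or_else(|_| EnvFilter::new("sd_core=info"));
	tracing_subscriber::fmt().with_env_filter(env_filter).init();
}
```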
## Documentation
- **Core level docs**: Live in `/docs/core` - comprehensive architecture and implementation guides
- **Core design docs**: Live in `/docs/core/design` - planning documents, RFCs, and design decisions
- **Application level docs**: Live in `/docs`
- **Code docs**: Use `///` for public APIs, `//!` for module overviews, include examples
## Debug Instructions
- Job logs can be found in the `job_logs` directory at the root of the data folder
- When testing the CLI, run the `restart` command after compiling so the Spacedrive daemon picks up the latest build

563
core/PAIRING_FIX_PLAN.md Normal file

@@ -0,0 +1,563 @@
# Pairing Protocol - Comprehensive Fix Plan
## Overview
This plan addresses both **critical security vulnerabilities** and **protocol correctness issues** that cause test failures.
**Priority**: CRITICAL - Current system is broken and insecure
---
## Phase 1: Fix Critical Protocol Flow (Required for Tests to Pass)
### Issue #1: Bob Completes Pairing Too Early
**File**: `joiner.rs:17-211` (handle_pairing_challenge)
**Current (WRONG)**:
```rust
pub(crate) async fn handle_pairing_challenge(...) -> Result<Vec<u8>> {
	// Sign challenge
	let signature = self.identity.sign(&challenge)?;
	// Generate shared secret TOO EARLY
	let shared_secret = self.generate_shared_secret(session_id).await?;
	let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
	// Complete pairing BEFORE Alice confirms
	registry.complete_pairing(device_id, initiator_device_info.clone(), session_keys).await?;
	// Mark as connected BEFORE Alice confirms
	registry.mark_connected(device_id, simple_connection).await?;
	// Set state to Completed BEFORE receiving confirmation
	session.state = PairingState::Completed;
	// Send response
	let response = PairingMessage::Response { session_id, response: signature, device_info };
	Ok(serde_json::to_vec(&response)?)
}
```
**Fixed**:
```rust
pub(crate) async fn handle_pairing_challenge(...) -> Result<Vec<u8>> {
	// Sign challenge
	let signature = self.identity.sign(&challenge)?;
	// Store initiator info for later (when we receive Complete)
	{
		let mut sessions = self.active_sessions.write().await;
		if let Some(session) = sessions.get_mut(&session_id) {
			session.remote_device_info = Some(initiator_device_info.clone());
			session.state = PairingState::ResponseSent; // NOT Completed!
		}
	}
	// ONLY send response, don't complete anything yet
	let device_info = self.get_device_info().await?;
	let response = PairingMessage::Response {
		session_id,
		response: signature,
		device_info,
	};
	Ok(serde_json::to_vec(&response)?)
}
```
**Changes**:
- Remove all lines 71-178 (shared secret, complete_pairing, mark_connected)
- Only store initiator_device_info for later use
- Transition to `ResponseSent` state (not `Completed`)
- Wait for `Complete` message before doing anything
### Issue #2: handle_completion is Redundant
**File**: `joiner.rs:214-411` (handle_completion)
**Current**: Does everything that handle_pairing_challenge already did
**Fixed**: This becomes the ONLY place Bob completes pairing
```rust
pub(crate) async fn handle_completion(...) -> Result<()> {
	if success {
		// NOW generate shared secret (not before!)
		let shared_secret = self.generate_shared_secret(session_id).await?;
		let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
		// Get initiator info we stored earlier
		let initiator_device_info = {
			let sessions = self.active_sessions.read().await;
			sessions.get(&session_id)
				.and_then(|s| s.remote_device_info.clone())
				.ok_or(NetworkingError::Protocol("No device info stored".to_string()))?
		};
		let device_id = initiator_device_info.device_id;
		// Register initiator in Pairing state
		registry.start_pairing(device_id, node_id, session_id, node_addr)?;
		// NOW complete pairing (Alice already confirmed!)
		registry.complete_pairing(device_id, initiator_device_info.clone(), session_keys).await?;
		// Mark as connected
		registry.mark_connected(device_id, simple_connection).await?;
		// Update session state
		session.state = PairingState::Completed;
		session.shared_secret = Some(shared_secret);
	}
	Ok(())
}
```
### Issue #3: Alice Must Guarantee Complete Send
**File**: `initiator.rs:120-295` (handle_pairing_response)
**Current**: Marks as completed, then sends Complete (might fail silently)
**Fixed**: Send Complete synchronously, fail if it doesn't work
```rust
pub(crate) async fn handle_pairing_response(...) -> Result<Vec<u8>> {
	// ... signature verification ...
	if !signature_valid {
		// Mark as failed
		session.state = PairingState::Failed { reason: "Invalid signature".to_string() };
		// Send failure Complete message
		let response = PairingMessage::Complete {
			session_id,
			success: false,
			reason: Some("Invalid signature".to_string()),
		};
		return serde_json::to_vec(&response).map_err(|e| NetworkingError::Serialization(e));
	}
	// Signature valid - complete pairing
	let shared_secret = self.generate_shared_secret(session_id).await?;
	let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
	registry.start_pairing(device_id, node_id, session_id, node_addr)?;
	registry.complete_pairing(device_id, device_info.clone(), session_keys).await?;
	registry.mark_connected(device_id, simple_connection).await?;
	// Update session BEFORE sending Complete
	session.state = PairingState::Completed;
	session.shared_secret = Some(shared_secret);
	// Send success Complete message
	let response = PairingMessage::Complete {
		session_id,
		success: true,
		reason: None,
	};
	serde_json::to_vec(&response).map_err(|e| NetworkingError::Serialization(e))
}
```
**Key Change**: If Complete message fails to send, the error propagates and Bob never receives confirmation.
---
## Phase 2: Fix Critical Security Issues
### Issue #4: DoS via Unbounded Message Size
**File**: `mod.rs:704`, `mod.rs:647`
**Add constant**:
```rust
// At top of mod.rs
const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB max
```
**Fix both locations**:
```rust
// Line 647 (send_pairing_message_to_node)
let mut len_buf = [0u8; 4];
match recv.read_exact(&mut len_buf).await {
	Ok(_) => {
		let resp_len = u32::from_be_bytes(len_buf) as usize;
		// Validate size
		if resp_len > MAX_MESSAGE_SIZE {
			return Err(NetworkingError::Protocol(
				format!("Message too large: {} bytes", resp_len)
			));
		}
		let mut resp_buf = vec![0u8; resp_len];
		// ... rest of code ...
	}
}

// Line 704 (handle_stream)
let msg_len = u32::from_be_bytes(len_buf) as usize;
// Validate size
if msg_len > MAX_MESSAGE_SIZE {
	self.logger.error(&format!("Rejecting oversized message: {} bytes", msg_len)).await;
	break;
}
let mut msg_buf = vec![0u8; msg_len];
```
### Issue #5: Replay Attack Protection
**File**: `initiator.rs:34`, `mod.rs:516`
**Add challenge tracking**:
```rust
// In PairingProtocolHandler struct
used_challenges: Arc<RwLock<HashMap<Vec<u8>, chrono::DateTime<Utc>>>>,
```
**Generate challenge with timestamp**:
```rust
fn generate_challenge(&self) -> Result<Vec<u8>> {
	use rand::RngCore;
	let mut challenge = vec![0u8; 40]; // 32 random + 8 timestamp
	rand::thread_rng().fill_bytes(&mut challenge[0..32]);
	// Add timestamp
	let timestamp = chrono::Utc::now().timestamp();
	challenge[32..40].copy_from_slice(&timestamp.to_be_bytes());
	Ok(challenge)
}
```
**Verify challenge hasn't been used**:
```rust
// In handle_pairing_response, BEFORE verifying signature
let challenge_timestamp = {
	if challenge.len() != 40 {
		return Err(NetworkingError::Protocol("Invalid challenge format".to_string()));
	}
	let ts_bytes: [u8; 8] = challenge[32..40].try_into().unwrap();
	let timestamp = i64::from_be_bytes(ts_bytes);
	chrono::DateTime::from_timestamp(timestamp, 0)
		.ok_or(NetworkingError::Protocol("Invalid challenge timestamp".to_string()))?
};
// Check if challenge is too old (> 5 minutes)
let now = chrono::Utc::now();
if now.signed_duration_since(challenge_timestamp) > chrono::Duration::minutes(5) {
	return Err(NetworkingError::Protocol("Challenge expired".to_string()));
}
// Check if challenge was already used
{
	let mut used = self.used_challenges.write().await;
	if used.contains_key(&challenge) {
		return Err(NetworkingError::Protocol("Challenge already used (replay attack?)".to_string()));
	}
	used.insert(challenge.clone(), now);
	// Cleanup old challenges (> 10 minutes)
	used.retain(|_, &mut used_at| {
		now.signed_duration_since(used_at) < chrono::Duration::minutes(10)
	});
}
```
### Issue #6: Use Proper KDF for Session Keys
**File**: `mod.rs:524-532`
**Current (WRONG)**:
```rust
async fn generate_shared_secret(&self, session_id: Uuid) -> Result<Vec<u8>> {
	let pairing_codes = self.pairing_codes.read().await;
	let pairing_code = pairing_codes.get(&session_id).ok_or_else(|| {
		NetworkingError::Protocol(format!("No pairing code found for session {}", session_id))
	})?;
	Ok(pairing_code.secret().to_vec()) // Direct use!
}
```
**Fixed (derive with a KDF, here BLAKE3 with a domain-separation context)**:
```rust
async fn generate_shared_secret(&self, session_id: Uuid) -> Result<Vec<u8>> {
	let pairing_codes = self.pairing_codes.read().await;
	let pairing_code = pairing_codes.get(&session_id).ok_or_else(|| {
		NetworkingError::Protocol(format!("No pairing code found for session {}", session_id))
	})?;
	// Derive the session key with BLAKE3, bound to a session-specific context
	use blake3::Hasher;
	let mut hasher = Hasher::new();
	hasher.update(b"spacedrive-pairing-session-key-v1");
	hasher.update(session_id.as_bytes());
	hasher.update(pairing_code.secret());
	Ok(hasher.finalize().as_bytes().to_vec())
}
```
### Issue #7: Fix TOCTOU Race in Session Creation
**File**: `mod.rs:254-279`
**Current (WRONG)**:
```rust
// Line 256: Check (read lock)
let sessions = self.active_sessions.read().await;
if let Some(existing_session) = sessions.get(&session_id) {
	return Err(...);
}
// Lock released - RACE CONDITION HERE!

// Line 277: Insert (write lock)
let mut sessions = self.active_sessions.write().await;
sessions.insert(session_id, session);
```
**Fixed**:
```rust
// Single write lock for atomic check-and-insert
let mut sessions = self.active_sessions.write().await;
if sessions.contains_key(&session_id) {
	return Err(NetworkingError::Protocol(format!(
		"Session {} already exists",
		session_id
	)));
}
// Create new session
let session = PairingSession {
	id: session_id,
	state: PairingState::Scanning,
	remote_device_id: None,
	remote_device_info: None,
	remote_public_key: None,
	shared_secret: None,
	created_at: chrono::Utc::now(),
};
sessions.insert(session_id, session);
```
### Issue #8: Align Timeout Values
**File**: `mod.rs:368`, `types.rs:52`
**Fix**:
```rust
// types.rs:52 - Keep 5 minutes
expires_at: Utc::now() + chrono::Duration::minutes(5),
// mod.rs:368 - Change from 10 to 5 minutes
let timeout_duration = chrono::Duration::minutes(5); // Match code expiry
```
---
## Phase 3: Fix Medium/Low Priority Issues
### Issue #9: Fix get_current_pairing_code
**File**: `mod.rs:233-237`
**Replace HashMap with BTreeMap** or track most recent explicitly:
```rust
// Option A: Use BTreeMap (deterministic, key-ordered iteration)
// In struct: pairing_codes: Arc<RwLock<BTreeMap<Uuid, PairingCode>>>,

// Option B: Track most recent explicitly
// In struct: most_recent_pairing_code: Arc<RwLock<Option<PairingCode>>>,
pub async fn get_current_pairing_code(&self) -> Option<PairingCode> {
	self.most_recent_pairing_code.read().await.clone()
}

// Update when creating a pairing code:
*self.most_recent_pairing_code.write().await = Some(pairing_code.clone());
```
### Issue #10: Sanitize Error Messages
**File**: Throughout
**Pattern**:
```rust
// DON'T: expose internal state to the caller
format!("Session {} already exists in state {:?}", session_id, existing_session.state)

// DO: log the detail internally, return a generic error externally
self.log_error(&format!("Session {} already exists in state {:?}", session_id, existing_session.state)).await;
return Err(NetworkingError::Protocol("Session already exists".to_string()));
```
### Issue #11: Reduce Verbose Logging in Production
**File**: All files
Add log levels:
```rust
// Use debug! for verbose info
self.log_debug(&format!("Session state: {:?}", session.state)).await;
// Only info! for important events
self.log_info("Pairing completed successfully").await;
```
### Issue #12: Encrypt Persisted Sessions
**File**: `persistence.rs:118-161`
**Add encryption**:
```rust
// Use platform keychain to store encryption key
// Encrypt JSON before writing to disk
// Decrypt when reading
// Example using age or similar:
let encrypted_data = encrypt_with_platform_key(&json_data)?;
fs::write(&self.sessions_file, encrypted_data).await?;
```
---
## Implementation Order
### Step 1: Protocol Flow Fixes (REQUIRED FOR TESTS)
1. Fix `handle_pairing_challenge` to NOT complete pairing early
2. Fix `handle_completion` to be the ONLY place Bob completes
3. Fix `handle_pairing_response` to guarantee Complete send
4. Test: `cargo test device_pairing_test` should PASS
### Step 2: Critical Security (DO NOT SHIP WITHOUT)
5. Add message size limits (DoS fix)
6. Add replay attack protection
7. Fix TOCTOU race condition
8. Add proper KDF for session keys
### Step 3: Important Security
9. Encrypt persisted sessions
10. Align timeout values
11. Fix session_id derivation consistency (types.rs)
### Step 4: Polish
12. Fix get_current_pairing_code
13. Sanitize error messages
14. Reduce verbose logging
15. Improve cryptographic validation
---
## Testing Plan
### Unit Tests
```rust
#[tokio::test]
async fn test_challenge_replay_protection() {
	// Generate challenge
	// Use it once ✓
	// Try to reuse it ✗ should fail
}

#[tokio::test]
async fn test_message_size_limit() {
	// Try to send 5GB message
	// Should fail with "Message too large"
}

#[tokio::test]
async fn test_joiner_waits_for_complete() {
	// Bob sends Response
	// Bob session should be ResponseSent (NOT Completed)
	// Alice sends Complete
	// NOW Bob session should be Completed
}
```
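The challenge-expiry check from Issue #5 is self-contained enough to unit test directly; a sketch, where `challenge_age_ok` is a hypothetical helper using the 40-byte layout proposed above:
```rust
fn challenge_age_ok(challenge: &[u8]) -> bool {
	// 32 random bytes + 8-byte big-endian unix timestamp, per Issue #5.
	if challenge.len() != 40 {
		return false;
	}
	let ts_bytes: [u8; 8] = challenge[32..40].try_into().unwrap();
	let Some(issued) = chrono::DateTime::from_timestamp(i64::from_be_bytes(ts_bytes), 0) else {
		return false;
	};
	chrono::Utc::now().signed_duration_since(issued) <= chrono::Duration::minutes(5)
}

#[test]
fn stale_challenge_is_rejected() {
	let mut challenge = vec![0u8; 40];
	let stale = (chrono::Utc::now() - chrono::Duration::minutes(6)).timestamp();
	challenge[32..40].copy_from_slice(&stale.to_be_bytes());
	assert!(!challenge_age_ok(&challenge));
}
```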
### Integration Test
```bash
# Should pass after Phase 1
cargo test device_pairing_test
```
---
## Success Criteria
**Protocol Correctness**:
- Test `device_pairing_test` passes 100% of the time
- No split-brain states possible
- Both Alice and Bob atomically complete pairing
**Security**:
- No DoS via large messages
- No replay attacks possible
- Proper mutual authentication
- Session keys properly derived
- Secrets encrypted at rest
**Code Quality**:
- No redundant logic
- Clear state machine
- Proper error handling
- Reasonable logging
---
## Estimated Effort
- **Step 1**: 4-6 hours (protocol flow fixes)
- **Step 2**: 4-6 hours (critical security)
- **Step 3**: 2-4 hours (important security)
- **Step 4**: 2-3 hours (polish)
- **Total**: ~12-19 hours
---
## Risk Assessment
**High Risk if NOT fixed**:
- Split-brain pairing states
- DoS attacks crash daemon
- Replay attacks compromise security
- Plaintext secrets on disk
**Low Risk when fixed**:
- Well-tested protocol
- Defense in depth
- Cryptographic guarantees
- Production-ready security


@@ -0,0 +1,328 @@
# Pairing Protocol Flow Analysis
## Current Protocol Flow
### 1. Alice (Initiator) Starts Pairing
**File**: `core/mod.rs:428-622`
```
1. Call start_pairing_as_initiator()
2. Generate session_id and pairing code with relay info
3. Start pairing session in WaitingForConnection state
4. Broadcast session_id via mDNS user_data
5. Wait for Bob to connect...
```
### 2. Bob (Joiner) Starts Pairing
**File**: `core/mod.rs:630-920`
```
1. Call start_pairing_as_joiner(code)
2. Parse pairing code to get session_id
3. Join pairing session in Scanning state
4. Discover Alice via:
- mDNS (local network) OR
- Relay (cross-network)
5. Connect to Alice using PAIRING_ALPN
6. Send PairingRequest message to Alice (lines 865-913)
```
### 3. Alice Receives PairingRequest
**File**: `initiator.rs:18-117`
```
1. Validate Bob's public key (line 26)
2. Generate 32-byte random challenge (line 34)
3. Update session to ChallengeReceived state (lines 64 or 85)
4. Store Bob's public key in session (line 69 or 90)
5. Send Challenge message back to Bob (lines 104-116)
```
### 4. Bob Receives Challenge **CRITICAL ISSUE HERE**
**File**: `joiner.rs:17-211`
```
WRONG ORDER:
1. Sign challenge with Bob's private key (line 32)
2. Generate shared secret from pairing code (line 71)
3. Create session keys (line 72)
4. COMPLETE PAIRING IN REGISTRY (line 126) ← TOO EARLY!
5. MARK SESSION AS COMPLETED (line 175) ← TOO EARLY!
6. Send Response message to Alice (line 195)
CORRECT ORDER SHOULD BE:
1. Sign challenge
2. Send Response to Alice
3. Wait for Complete message from Alice
4. ONLY THEN complete pairing and mark as Completed
```
**Split-Brain Scenario**:
```
- Line 989 (mod.rs): command_sender.send(response) could fail
- If send fails: Alice never receives response
- But Bob already completed pairing (line 126)
- Result: Bob thinks paired, Alice doesn't
- Test fails because only Bob sees connection
```
### 5. Alice Receives Response
**File**: `initiator.rs:120-295`
```
1. Get Bob's public key from session (stored in step 3)
2. Verify signature: PairingSecurity::verify_challenge_response() (line 162)
3. If signature INVALID:
- Mark session as Failed
- Return error
4. If signature VALID:
- Generate shared secret (line 191)
- Register Bob in device registry (line 228)
- Complete pairing (line 244)
- Mark session as Completed (line 277)
- Send Complete message to Bob (line 288)
```
### 6. Bob Receives Complete (Redundant!)
**File**: `joiner.rs:214-411`
```
REDUNDANT: Everything already done in step 4!
1. Generate shared secret AGAIN (line 230)
2. Complete pairing AGAIN (line 326)
3. Mark session Completed AGAIN (line 342)
This code only runs if Alice successfully sent Complete
If Alice never sends Complete, Bob still thinks pairing succeeded (from step 4)
```
---
## Critical Protocol Flaws
### 1. No Cryptographic Certainty of Completion
**Issue**: Bob completes pairing without cryptographic proof that Alice verified his signature
**Attack Scenario**:
```
1. Attacker (Alice) sends Challenge to Bob
2. Bob signs challenge and sends Response
3. Bob immediately completes pairing (joiner.rs:126)
4. Attacker never verifies signature, just drops connection
5. Bob thinks pairing succeeded with attacker
6. Bob's device registry now has attacker's keys stored
```
**Fix**: Bob MUST wait for Alice's Complete message before completing pairing
### 2. Split-Brain State
**Issue**: Bob and Alice can have different views of pairing success
**Failure Modes**:
```
Mode A: Response send fails (mod.rs:989)
- Bob: Completed ✓
- Alice: WaitingForConnection or ChallengeReceived
- Result: Test fails, devices don't see each other
Mode B: Alice rejects signature
- Bob: Completed ✓
- Alice: Failed ✗
- Result: Bob thinks paired, Alice knows it failed
Mode C: Complete message send fails
- Bob: Completed ✓
- Alice: Completed ✓
- But Bob's completion handler never runs (joiner.rs:214)
- Result: Actually works, but redundant code confusing
```
### 3. Redundant Completion Logic
**Issue**: `handle_completion()` duplicates all work already done in `handle_pairing_challenge()`
**Code Smell**:
```rust
// joiner.rs:71-178 (in handle_pairing_challenge)
let shared_secret = self.generate_shared_secret(session_id).await?;
let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
registry.complete_pairing(...).await?;
session.state = PairingState::Completed;
// joiner.rs:230-344 (in handle_completion) - EXACT SAME LOGIC
let shared_secret = self.generate_shared_secret(session_id).await?;
let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
registry.complete_pairing(...).await?;
session.state = PairingState::Completed;
```
This suggests the protocol state machine is incorrectly designed.
### 4. No Message Ordering Guarantees
**Issue**: QUIC streams don't guarantee Complete arrives before Bob times out
Even if Alice sends Complete, network delays could cause:
```
1. Bob sends Response at T+0ms
2. Bob completes pairing at T+1ms
3. Alice receives Response at T+500ms
4. Alice sends Complete at T+501ms
5. Bob's test timeout at T+1000ms ← Fails before Complete arrives
```
---
## Why Test Fails
**Test File**: `tests/device_pairing_test.rs:89-134`
Alice waits for:
```rust
let connected_devices = core.services.device.get_connected_devices().await.unwrap();
if !connected_devices.is_empty() { // Line 96
println!("Alice: Pairing completed successfully!");
```
Bob waits for:
```rust
let connected_devices = core.services.device.get_connected_devices().await.unwrap();
if !connected_devices.is_empty() { // Line 215
println!("Bob: Pairing completed successfully!");
```
**Failure Modes**:
1. **Bob completes before Alice receives Response**:
- Bob marks self as Completed (joiner.rs:175)
- Bob calls registry.mark_connected (joiner.rs:151)
- Bob sees Alice as connected ✓
- Alice never receives Response (network loss, send failure)
- Alice stays in ChallengeReceived state
- Alice never calls registry.mark_connected
- Alice doesn't see Bob as connected ✗
- **Test hangs on Alice's wait loop (line 92-134)**
2. **Alice rejects Bob's signature**:
- Alice calls verify_challenge_response (initiator.rs:162)
- Signature verification fails (corrupted data, timing attack, etc.)
- Alice marks session as Failed (initiator.rs:173-176)
- Alice never sends Complete
- Bob completed pairing already (joiner.rs:126)
- Bob sees Alice as connected ✓
- Alice sees Bob as failed ✗
- **Test hangs on Alice's wait loop**
3. **TOCTOU in connection tracking**:
- Bob sends Response
- Alice verifies signature ✓
- Alice completes pairing
- Alice marks Bob as connected (initiator.rs:256)
- Alice sends Complete message
- **Complete send fails** (connection closed, network error)
- Bob's handle_completion never called
- But Bob already completed (joiner.rs:126)
- Both think paired, but Alice's registry might not have correct state
- **Test might pass or fail randomly**
---
## Correct Protocol Design
### Fixed Flow
```
1. Alice → Bob: Challenge(session_id, challenge, alice_device_info)
2. Bob signs challenge
3. Bob → Alice: Response(session_id, signature, bob_device_info)
4. Alice verifies signature
5. Alice generates shared_secret
6. Alice completes pairing
7. Alice → Bob: Complete(session_id, success=true)
8. Bob receives Complete
9. Bob NOW completes pairing (NOT before step 8!)
10. Both sides atomically mark as connected
```
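One way to make this ordering hard to violate is an explicit state enum with a completion guard; a sketch using the state names from this document (the actual variants in `types.rs` may differ):
```rust
/// Joiner-side pairing states; Completed is only reachable from ResponseSent.
#[derive(Debug, Clone, PartialEq)]
pub enum PairingState {
	Scanning,
	WaitingForConnection,
	ChallengeReceived,
	ResponseSent,
	Completed,
	Failed { reason: String },
}

impl PairingState {
	/// Guard used when Bob receives Complete: only ResponseSent may advance.
	pub fn can_complete(&self) -> bool {
		matches!(self, PairingState::ResponseSent)
	}
}
```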
### Required Changes
**File: `joiner.rs:17-211` (handle_pairing_challenge)**
```rust
// REMOVE these lines (66-178):
// - generate_shared_secret()
// - SessionKeys::from_shared_secret()
// - registry.complete_pairing()
// - registry.mark_connected()
// - session.state = PairingState::Completed
// ONLY do:
// 1. Sign challenge
// 2. Send Response
// 3. Transition to ResponseSent state (NOT Completed!)
// 4. Wait for Complete message
```
**File: `joiner.rs:214-411` (handle_completion)**
```rust
// KEEP all the pairing logic HERE (not in handle_pairing_challenge)
// This is the ONLY place Bob should complete pairing
```
**File: `initiator.rs:120-295` (handle_pairing_response)**
```rust
// MUST send Complete message synchronously
// Cannot fail silently
// If send fails, mark session as Failed
```
---
## Security Implications
### Current System is Vulnerable
**Vulnerability**: Bob completes pairing without cryptographic proof Alice accepted him
**Attack**: Rogue Alice
```
1. Attacker runs modified Alice that:
- Sends Challenge to Bob
- Receives Bob's signed Response
- Never verifies signature
- Stores Bob's public key and device info
- Drops connection
2. Bob completes pairing (current code joiner.rs:126)
3. Bob thinks he's paired with legitimate Alice
4. Attacker has Bob's:
- Public key
- Device info
- Session keys (derived from pairing code Bob entered)
```
**Fix**: Bob MUST wait for Complete message before trusting the pairing
### Proper Mutual Authentication
Both sides must cryptographically confirm:
```
Alice verifies: Bob signed the challenge with his claimed public key
Bob verifies: Alice sent Complete message (proves Alice accepted the signature)
```
Only after BOTH verifications should pairing complete on both sides.
---
## Test Requirements
For `device_pairing_test.rs` to pass 100%:
1. Alice must see Bob as connected
2. Bob must see Alice as connected
3. Both must happen atomically (no split-brain)
4. Must handle network failures gracefully
5. Must have timeout if pairing fails
Current code fails because Bob completes before Alice confirms.


@@ -0,0 +1,186 @@
# Pairing Protocol Security Issues
## CRITICAL - Must Fix Immediately
### 1. Memory Exhaustion DoS Vulnerability
**Severity**: CRITICAL
**Location**: `mod.rs:704`, `mod.rs:647`
**Impact**: Attacker can crash the application by claiming 4GB message size
```rust
// VULNERABLE CODE
let msg_len = u32::from_be_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len]; // NO SIZE LIMIT!
```
**Fix**: Add maximum message size constant
```rust
const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB
if msg_len > MAX_MESSAGE_SIZE {
	return Err(NetworkingError::Protocol("Message too large".to_string()));
}
```
---
### 2. Plaintext Storage of Cryptographic Secrets
**Severity**: CRITICAL
**Location**: `persistence.rs:118-161`
**Impact**: Filesystem access = complete security compromise
```rust
// VULNERABLE CODE
let json_data = serde_json::to_string_pretty(&persisted) // Plaintext!
```
**Fix**: Encrypt sessions file or use platform keychain
---
### 3. Session State Split-Brain Condition
**Severity**: CRITICAL
**Location**: `joiner.rs:66-191`
**Impact**: Joiner completes pairing before confirming initiator received response
**Current Flow (BROKEN)**:
```
Joiner:
1. Generates shared secret (line 71)
2. Completes pairing in registry (line 126)
3. Marks session as Completed (line 175)
4. THEN sends response (line 989 in mod.rs)
└─> If this fails, initiator never completes but joiner thinks it did
```
**Fix**: Only complete after receiving confirmation from initiator
---
### 4. Session Fixation via QR Codes
**Severity**: CRITICAL
**Location**: `types.rs:126-218`
**Impact**: Attacker can create QR code with controlled session_id
```rust
// Line 213: session_id from QR is trusted
session_id, // Use the session_id from the QR code
```
**Fix**: Always derive session_id from cryptographic secret
---
## HIGH - Important to Address
### 5. TOCTOU Race Condition in Session Creation
**Severity**: HIGH
**Location**: `mod.rs:254-279`
**Impact**: Concurrent session creation can corrupt state
```rust
// RACE CONDITION
let sessions = self.active_sessions.read().await; // Check
if sessions.get(&session_id).is_some() { return Err(...); }
// Lock released - another thread could insert here!
let mut sessions = self.active_sessions.write().await; // Insert
sessions.insert(session_id, session);
```
**Fix**: Use single write lock with entry API
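A sketch of that fix as a standalone helper; it assumes the caller already holds the single write lock, and uses the `PairingSession`/`NetworkingError` types named elsewhere in this document:
```rust
use std::collections::{hash_map::Entry, HashMap};
use uuid::Uuid;

fn create_session(
	sessions: &mut HashMap<Uuid, PairingSession>,
	session_id: Uuid,
	new_session: PairingSession,
) -> Result<(), NetworkingError> {
	// Check and insert happen under one lock, so no interleaving is possible.
	match sessions.entry(session_id) {
		Entry::Occupied(_) => Err(NetworkingError::Protocol("Session already exists".to_string())),
		Entry::Vacant(slot) => {
			slot.insert(new_session);
			Ok(())
		}
	}
}
```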
---
### 6. No Replay Attack Protection
**Severity**: HIGH
**Location**: `initiator.rs:34`
**Impact**: Captured challenge-response can be replayed
**Fix**: Add timestamp to challenges and track used challenges
---
### 7. No Key Derivation Function
**Severity**: HIGH
**Location**: `mod.rs:524-532`
**Impact**: Pairing code secret used directly as session key
```rust
// WEAK
Ok(pairing_code.secret().to_vec()) // Direct use!
```
**Fix**: Use HKDF with session-specific context
---
### 8. Mismatched Timeout Values
**Severity**: HIGH
**Location**: `mod.rs:368`, `types.rs:52`
```rust
// Session cleanup: 10 minutes
let timeout_duration = chrono::Duration::minutes(10);
// Code expiry: 5 minutes
expires_at: Utc::now() + chrono::Duration::minutes(5),
```
**Fix**: Align timeouts (use 5 minutes for both)
---
## MEDIUM
### 9. Unpredictable Pairing Code Selection
**Severity**: MEDIUM
**Location**: `mod.rs:233-237`
```rust
codes.values().last().cloned() // HashMap order is random!
```
**Fix**: Use BTreeMap or track most recent explicitly
---
### 10. Overly Detailed Error Messages
**Severity**: MEDIUM
**Impact**: Reveals internal state to attackers
**Fix**: Generic errors for external-facing, detailed logs internally
---
### 11. Premature Connection Status
**Severity**: MEDIUM
**Location**: `initiator.rs:248-273`, `joiner.rs:140-163`
```rust
let simple_connection = ConnectionInfo {
	addresses: vec![], // Empty! Not really connected
```
**Fix**: Only mark connected when real connection established
---
## LOW
### 12. Weak Cryptographic Validation
**Location**: `security.rs:50-55`
**Fix**: Check for more weak patterns beyond all-zeros
### 13. Verbose Production Logging
**Fix**: Reduce logging in production builds
### 14. Inconsistent Session ID Derivation
**Location**: `types.rs:59-92`
**Fix**: Clarify why session_id is re-derived in `from_session_id()`
---
## Fix Priority
1. **Phase 1 (Immediate)**: #1, #3, #5, #6
2. **Phase 2 (Important)**: #2, #4, #7, #8
3. **Phase 3 (Nice to have)**: #9, #10, #11, #12, #13, #14

18
core/crush.json Normal file

@@ -0,0 +1,18 @@
{
	"$schema": "https://charm.land/crush.json",
	"providers": {
		"lmstudio": {
			"name": "LM Studio",
			"base_url": "http://localhost:1234/v1/",
			"type": "openai",
			"models": [
				{
					"name": "Qwen3 30B MOE",
					"id": "local-model",
					"context_window": 131072,
					"default_max_tokens": 20000
				}
			]
		}
	}
}


@@ -0,0 +1,181 @@
//! Location Watcher Demo
//!
//! This example demonstrates how to use the location watcher to monitor
//! file system changes in real-time.
use sd_core::{infra::event::Event, Core};
use std::path::PathBuf;
use tokio::time::{sleep, Duration};
use tracing::info;
use uuid::Uuid;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
	// Initialize logging
	tracing_subscriber::fmt::init();
	info!("Starting Location Watcher Demo");

	// Initialize core
	let core = Core::new().await?;
	info!("Core initialized successfully");

	// Create a test library
	let library = core
		.libraries
		.create_library("Watcher Demo Library", None, core.context.clone())
		.await?;
	let library_id = library.id();
	info!("Created demo library: {}", library_id);

	// Add a location to watch
	let watch_dir = PathBuf::from("./data/spacedrive_watcher_demo");
	tokio::fs::create_dir_all(&watch_dir).await?;
	let location_id = Uuid::new_v4();
	core.add_watched_location(location_id, library_id, watch_dir.clone(), true)
		.await?;
	info!("Added watched location: {}", watch_dir.display());

	// Subscribe to events
	let mut event_subscriber = core.events.subscribe();

	// Spawn event listener
	let events_handle = tokio::spawn(async move {
		info!("Event listener started");
		while let Ok(event) = event_subscriber.recv().await {
			match event {
				Event::EntryCreated {
					library_id,
					entry_id,
				} => {
					info!(
						"File created - Library: {}, Entry: {}",
						library_id, entry_id
					);
				}
				Event::EntryModified {
					library_id,
					entry_id,
				} => {
					info!(
						"File modified - Library: {}, Entry: {}",
						library_id, entry_id
					);
				}
				Event::EntryDeleted {
					library_id,
					entry_id,
				} => {
					info!(
						"File deleted - Library: {}, Entry: {}",
						library_id, entry_id
					);
				}
				Event::EntryMoved {
					library_id,
					entry_id,
					old_path,
					new_path,
				} => {
					info!(
						"File moved - Library: {}, Entry: {}, {} -> {}",
						library_id, entry_id, old_path, new_path
					);
				}
				_ => {} // Ignore other events for this demo
			}
		}
	});

	// Simulate file operations
	info!("Starting file operations simulation...");

	// Create a test file
	let test_file = watch_dir.join("test_file.txt");
	tokio::fs::write(&test_file, "Hello, Spacedrive!").await?;
	info!("Created test file: {}", test_file.display());
	sleep(Duration::from_millis(200)).await;

	// Modify the file
	tokio::fs::write(&test_file, "Hello, Spacedrive! Modified content.").await?;
	info!("Modified test file");
	sleep(Duration::from_millis(200)).await;

	// Create a directory
	let test_dir = watch_dir.join("test_directory");
	tokio::fs::create_dir(&test_dir).await?;
	info!("Created test directory: {}", test_dir.display());
	sleep(Duration::from_millis(200)).await;

	// Create a file in the directory
	let nested_file = test_dir.join("nested_file.txt");
	tokio::fs::write(&nested_file, "Nested file content").await?;
	info!("Created nested file: {}", nested_file.display());
	sleep(Duration::from_millis(200)).await;

	// Rename the file
	let renamed_file = test_dir.join("renamed_file.txt");
	tokio::fs::rename(&nested_file, &renamed_file).await?;
	info!(
		"Renamed file: {} -> {}",
		nested_file.display(),
		renamed_file.display()
	);
	sleep(Duration::from_millis(200)).await;

	// Delete the file
	tokio::fs::remove_file(&renamed_file).await?;
	info!("Deleted file: {}", renamed_file.display());
	sleep(Duration::from_millis(200)).await;

	// Delete the directory
	tokio::fs::remove_dir(&test_dir).await?;
	info!("Deleted directory: {}", test_dir.display());
	sleep(Duration::from_millis(200)).await;

	// Delete the original test file
	tokio::fs::remove_file(&test_file).await?;
	info!("Deleted test file: {}", test_file.display());

	// Give some time for all events to be processed
	sleep(Duration::from_secs(2)).await;

	// Display current watched locations
	let watched_locations = core.get_watched_locations().await;
	info!("Currently watching {} locations:", watched_locations.len());
	for location in watched_locations {
		info!(
			" - {} ({}): {} [{}]",
			location.id,
			location.library_id,
			location.path.display(),
			if location.enabled {
				"enabled"
			} else {
				"disabled"
			}
		);
	}

	// Clean up
	core.remove_watched_location(location_id).await?;
	info!("Removed watched location");

	// Clean up directory
	if watch_dir.exists() {
		tokio::fs::remove_dir_all(&watch_dir).await?;
		info!("Cleaned up demo directory");
	}

	// Stop event listener
	events_handle.abort();

	// Shutdown core
	core.shutdown().await?;
	info!("Demo completed successfully");
	Ok(())
}

2
core/rust-toolchain.toml Normal file

@@ -0,0 +1,2 @@
[toolchain]
channel = "stable"

118
core/scripts/combine.sh Executable file

@@ -0,0 +1,118 @@
#!/bin/bash
# A generic script to combine files from a directory into a single file.
# Function to show usage
usage() {
	echo "Usage: $0 <command> [options]"
	echo ""
	echo "Commands:"
	echo "  docs [--with-design]  Combine documentation files (.md) from 'docs/'."
	echo "                        --with-design: Include the 'docs/design' directory."
	echo "  rust [path]           Combine Rust files (.rs) from a given path (default: '.')."
	echo "                        Respects .gitignore."
	echo "  cli                   Combine Rust files (.rs) from 'apps/cli'."
	echo "                        Respects .gitignore."
	echo "  tasks                 Combine task files (.md) from '.tasks/'."
	echo ""
	echo "Options:"
	echo "  -h, --help            Show this help message."
}

# Function to check if a file should be ignored based on .gitignore
is_ignored() {
	git check-ignore -q "$1"
}

# Main combine function
combine_files() {
	local search_path=$1
	local file_pattern=$2
	local output_file=$3
	local title=$4
	local lang_tag=$5
	local respect_gitignore=$6
	shift 6
	local exclude_patterns=("$@")

	echo "Combining files from '$search_path' into '$output_file'..."

	# Clear the output file
	> "$output_file"
	echo "# $title" >> "$output_file"
	echo "" >> "$output_file"

	# Build find args and exclude patterns
	local find_args=("$search_path" -name "$file_pattern" -type f)
	for pattern in "${exclude_patterns[@]}"; do
		find_args+=("!" "-path" "$pattern")
	done

	# Pipe find through sort and process files one by one
	find "${find_args[@]}" | sort | while read -r file; do
		if [ "$respect_gitignore" = "true" ] && git check-ignore -q "$file"; then
			continue
		fi
		echo "Processing: $file"
		echo "## ${file}" >> "$output_file"
		echo "" >> "$output_file"
		echo '```'$lang_tag >> "$output_file"
		if [ -r "$file" ]; then
			cat "$file" >> "$output_file"
		else
			echo "[Could not read file]" >> "$output_file"
		fi
		echo '```' >> "$output_file"
		echo "" >> "$output_file"
	done

	echo "Done! All files have been combined into $output_file"
}

# Main script logic
if [ "$#" -eq 0 ]; then
	usage
	exit 1
fi

COMMAND=$1
shift

case $COMMAND in
	docs)
		include_design=false
		if [ "$1" = "--with-design" ]; then
			include_design=true
		fi
		exclude_patterns=()
		if [ "$include_design" = "false" ]; then
			exclude_patterns+=("../docs/design/*")
		fi
		combine_files "../docs" "*.md" "combined_docs.txt" "Combined Documentation Files" "markdown" "false" "${exclude_patterns[@]}"
		;;
	rust)
		root_path=${1:-.}
		combine_files "$root_path" "*.rs" "combined_rust_files.txt" "Combined Rust Files" "rust" "true"
		;;
	cli)
		combine_files "./apps/cli" "*.rs" "combined_cli_rust_files.txt" "Combined CLI Rust Files" "rust" "true"
		;;
	tasks)
		combine_files "../.tasks" "*.md" "combined_tasks.txt" "Combined Task Files" "markdown" "false"
		;;
	-h|--help)
		usage
		;;
	*)
		echo "Error: Unknown command '$COMMAND'"
		usage
		exit 1
		;;
esac

26
core/scripts/test_daemon.sh Executable file

@@ -0,0 +1,26 @@
#!/bin/bash
# Start daemon in background
# Redirect instead of piping to tee: with a pipeline, $! would capture tee's PID, not the daemon's
./target/release/spacedrive daemon start --foreground > daemon_test.log 2>&1 &
DAEMON_PID=$!
# Wait for daemon to initialize
sleep 3
# Try to create a library
echo "Creating library..."
./target/release/spacedrive library create "TestLib" 2>&1
# Check if daemon is still running
if kill -0 $DAEMON_PID 2>/dev/null; then
	echo "Daemon is still running"
else
	echo "Daemon crashed!"
fi
# Stop daemon
kill $DAEMON_PID 2>/dev/null
# Show any errors from the log
echo "Last 20 lines of daemon log:"
tail -20 daemon_test.log


@@ -0,0 +1,40 @@
#!/bin/bash
# --- Configuration ---
PROJECT_ROOT="/Users/jamespine/Projects/spacedrive/core"
INSTALL_DIR="/usr/local/bin"
BINARY_NAME="spacedrive"
# --- Script Logic ---
echo "Updating Spacedrive CLI..."
# Navigate to the project root
cd "$PROJECT_ROOT" || { echo "Error: Could not navigate to project root: $PROJECT_ROOT"; exit 1; }
# Pull latest changes from Git
echo "Pulling latest changes from Git..."
git pull || { echo "Error: Git pull failed."; exit 1; }
# Build the project in release mode
echo "Building Spacedrive CLI (release mode)..."
cargo build --release || { echo "Error: Cargo build failed."; exit 1; }
# Define source and destination paths
SOURCE_BINARY="$PROJECT_ROOT/target/release/$BINARY_NAME"
DEST_BINARY="$INSTALL_DIR/$BINARY_NAME"
# Ensure the installation directory exists
mkdir -p "$INSTALL_DIR" || { echo "Error: Could not create installation directory: $INSTALL_DIR"; exit 1; }
# Copy the new binary to the installation directory
echo "Copying new binary to $DEST_BINARY..."
cp "$SOURCE_BINARY" "$DEST_BINARY" || { echo "Error: Could not copy binary to $DEST_BINARY. Check permissions."; exit 1; }
echo "Spacedrive CLI updated successfully!"
# Optional: Display the version of the newly installed binary
if [ -x "$DEST_BINARY" ]; then
	echo "Installed version:"
	"$DEST_BINARY" --version
fi

112
core/src/cqrs.rs Normal file

@@ -0,0 +1,112 @@
//! CQRS (Command Query Responsibility Segregation) for Spacedrive Core
//!
//! This module provides a simplified CQRS implementation that leverages existing
//! infrastructure while providing modular outputs (copying the job system pattern).
//! TODO: Remove Query in favor of CoreQuery and LibraryQuery
use crate::context::CoreContext;
use anyhow::Result;
use std::sync::Arc;
use uuid::Uuid;
/// A query that retrieves data without mutating state.
///
/// This trait provides the foundation for read-only operations with
/// consistent infrastructure (validation, permissions, logging).
pub trait Query: Send + 'static {
	/// The data structure returned by the query (owned by the operation module).
	type Output: Send + Sync + 'static;
	type Input: Send + Sync + 'static;

	/// Execute this query with the given context.
	fn execute(
		self,
		context: Arc<CoreContext>,
	) -> impl std::future::Future<Output = Result<Self::Output>> + Send;
}
/// Library-scoped query that operates within a specific library context
pub trait LibraryQuery: Send + 'static {
	/// The input data structure for this query
	type Input: Send + Sync + 'static;
	/// The data structure returned by the query
	type Output: Send + Sync + 'static;

	/// Create query from input
	fn from_input(input: Self::Input) -> Result<Self>
	where
		Self: Sized;

	/// Execute this query with rich session context
	///
	/// The session provides authentication, permissions, audit context,
	/// and library context when needed
	fn execute(
		self,
		context: Arc<CoreContext>,
		session: crate::infra::api::SessionContext,
	) -> impl std::future::Future<Output = Result<Self::Output>> + Send;
}
/// Core-level query that operates at the daemon level
pub trait CoreQuery: Send + 'static {
	/// The input data structure for this query
	type Input: Send + Sync + 'static;
	/// The data structure returned by the query
	type Output: Send + Sync + 'static;

	/// Create query from input
	fn from_input(input: Self::Input) -> Result<Self>
	where
		Self: Sized;

	/// Execute this query with rich session context
	///
	/// The session provides authentication, permissions, and audit context
	fn execute(
		self,
		context: Arc<CoreContext>,
		session: crate::infra::api::SessionContext,
	) -> impl std::future::Future<Output = Result<Self::Output>> + Send;
}
/// QueryManager provides infrastructure for read-only operations.
///
/// This mirrors the ActionManager pattern but for queries, providing
/// validation, permissions checking, and audit logging for read operations.
pub struct QueryManager {
	context: Arc<CoreContext>,
}

impl QueryManager {
	/// Create a new QueryManager
	pub fn new(context: Arc<CoreContext>) -> Self {
		Self { context }
	}

	/// Dispatch a query for execution with full infrastructure support
	pub async fn dispatch<Q: Query>(&self, query: Q) -> Result<Q::Output> {
		query.execute(self.context.clone()).await
	}

	/// Dispatch a core-scoped query for execution with full infrastructure support
	pub async fn dispatch_core<Q: CoreQuery>(&self, query: Q) -> Result<Q::Output> {
		// Create session context for core queries
		let device_id = self.context.device_manager.device_id()?;
		let session =
			crate::infra::api::SessionContext::device_session(device_id, "Core Device".to_string());
		query.execute(self.context.clone(), session).await
	}

	/// Dispatch a library-scoped query for execution with full infrastructure support
	pub async fn dispatch_library<Q: LibraryQuery>(
		&self,
		query: Q,
		library_id: Uuid,
	) -> Result<Q::Output> {
		// Create session context for library queries with library context
		let device_id = self.context.device_manager.device_id()?;
		let mut session =
			crate::infra::api::SessionContext::device_session(device_id, "Core Device".to_string());
		session = session.with_library(library_id);
		query.execute(self.context.clone(), session).await
	}
}
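For illustration, a hypothetical query implementing the plain `Query` trait above; the struct and its output are invented, only the trait shape comes from this file:

```rust
use std::sync::Arc;

pub struct PingQuery;

impl Query for PingQuery {
	type Output = String;
	type Input = ();

	fn execute(
		self,
		_context: Arc<CoreContext>,
	) -> impl std::future::Future<Output = Result<Self::Output>> + Send {
		async move { Ok("pong".to_string()) }
	}
}

// Dispatched like any other query:
// let reply = query_manager.dispatch(PingQuery).await?;
```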

242
core/src/domain/entry.rs Normal file

@@ -0,0 +1,242 @@
//! Entry - the core file/directory representation in Spacedrive
//!
//! An Entry represents any filesystem item (file, directory, symlink) that
//! Spacedrive knows about. It's the foundation of the VDFS.
use crate::domain::addressing::SdPath;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use specta::Type;
use uuid::Uuid;
/// Represents any filesystem entry (file or directory) in the VDFS
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct Entry {
	/// Unique identifier for this entry
	pub id: Uuid,
	/// The virtual path including device context
	pub sd_path: SdPathSerialized,
	/// File/directory name
	pub name: String,
	/// Type of entry
	pub kind: EntryKind,
	/// Size in bytes (None for directories)
	pub size: Option<u64>,
	/// Filesystem timestamps
	pub created_at: Option<DateTime<Utc>>,
	pub modified_at: Option<DateTime<Utc>>,
	pub accessed_at: Option<DateTime<Utc>>,
	/// Platform-specific identifiers
	pub inode: Option<u64>,   // Unix/macOS
	pub file_id: Option<u64>, // Windows
	/// Parent directory entry ID
	pub parent_id: Option<Uuid>,
	/// Location this entry belongs to (if indexed)
	pub location_id: Option<Uuid>,
	/// User metadata (ALWAYS exists - key innovation!)
	pub metadata_id: Uuid,
	/// Content identity for deduplication (optional)
	pub content_id: Option<Uuid>,
	/// Tracking information
	pub first_seen_at: DateTime<Utc>,
	pub last_indexed_at: Option<DateTime<Utc>>,
}
/// Type of filesystem entry
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Type)]
pub enum EntryKind {
	/// Regular file
	File {
		/// File extension (without dot)
		extension: Option<String>,
	},
	/// Directory
	Directory,
	/// Symbolic link
	Symlink {
		/// Target path
		target: String,
	},
}
/// How SdPath is stored in the database
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct SdPathSerialized {
	/// Device where this entry exists
	pub device_id: Uuid,
	/// Normalized path on that device
	pub path: String,
}
impl SdPathSerialized {
	/// Create from an SdPath
	pub fn from_sdpath(sdpath: &SdPath) -> Option<Self> {
		match sdpath {
			SdPath::Physical { device_id, path } => Some(Self {
				device_id: *device_id,
				path: path.to_string_lossy().to_string(),
			}),
			SdPath::Content { .. } => None, // Can't serialize content paths to this format
		}
	}

	/// Convert back to SdPath
	pub fn to_sdpath(&self) -> SdPath {
		SdPath::Physical {
			device_id: self.device_id,
			path: self.path.clone().into(),
		}
	}
}
impl Entry {
	/// Create a new Entry from filesystem metadata
	pub fn new(sd_path: SdPath, metadata: std::fs::Metadata) -> Self {
		let name = sd_path.file_name().unwrap_or("unknown").to_string();
		let kind = if metadata.is_dir() {
			EntryKind::Directory
		} else if metadata.is_symlink() {
			EntryKind::Symlink {
				target: String::new(), // Would need to read link
			}
		} else {
			let extension = sd_path
				.path()
				.and_then(|p| p.extension())
				.and_then(|e| e.to_str())
				.map(|e| e.to_string());
			EntryKind::File { extension }
		};
		let size = if metadata.is_file() {
			Some(metadata.len())
		} else {
			None
		};
		Self {
			id: Uuid::new_v4(),
			sd_path: SdPathSerialized::from_sdpath(&sd_path)
				.expect("Entry requires a physical path"),
			name,
			kind,
			size,
			created_at: metadata.created().ok().map(|t| t.into()),
			modified_at: metadata.modified().ok().map(|t| t.into()),
			accessed_at: metadata.accessed().ok().map(|t| t.into()),
			inode: None, // Platform-specific, would need conditional compilation
			file_id: None,
			parent_id: None,
			location_id: None,
			metadata_id: Uuid::new_v4(), // Will create UserMetadata with this ID
			content_id: None,
			first_seen_at: Utc::now(),
			last_indexed_at: None,
		}
	}

	/// Check if this is a file
	pub fn is_file(&self) -> bool {
		matches!(self.kind, EntryKind::File { .. })
	}

	/// Check if this is a directory
	pub fn is_directory(&self) -> bool {
		matches!(self.kind, EntryKind::Directory)
	}

	/// Get the file extension if this is a file
	pub fn extension(&self) -> Option<&str> {
		match &self.kind {
			EntryKind::File { extension } => extension.as_deref(),
			_ => None,
		}
	}

	/// Get the SdPath for this entry
	pub fn sd_path(&self) -> SdPath {
		self.sd_path.to_sdpath()
	}
}
/// Conversion from database model to domain Entry
impl TryFrom<(crate::infra::db::entities::entry::Model, SdPath)> for Entry {
	type Error = anyhow::Error;

	fn try_from(
		(entry_model, parent_sd_path): (crate::infra::db::entities::entry::Model, SdPath),
	) -> Result<Self, Self::Error> {
		let device_uuid = match &parent_sd_path {
			SdPath::Physical { device_id, .. } => *device_id,
			SdPath::Content { .. } => {
				return Err(anyhow::anyhow!(
					"Content-addressed paths not supported for directory listing"
				))
			}
		};
		// Construct the full path properly to avoid double slashes
		// TODO: validation should happen on SdPath imo
		let full_path = if entry_model.parent_id.is_none() {
			format!("/{}", entry_model.name)
		} else {
			let parent_path = parent_sd_path.display().to_string();
			if parent_path.ends_with('/') {
				format!("{}{}", parent_path, entry_model.name)
			} else {
				format!("{}/{}", parent_path, entry_model.name)
			}
		};
		Ok(Entry {
			id: entry_model.uuid.unwrap_or_else(|| Uuid::new_v4()),
			sd_path: SdPathSerialized {
				device_id: device_uuid,
				path: full_path,
			},
			name: entry_model.name,
			kind: match entry_model.kind {
				0 => EntryKind::File {
					extension: entry_model.extension,
				},
				1 => EntryKind::Directory,
				2 => EntryKind::Symlink {
					target: "".to_string(), // TODO: Get from database
				},
				_ => EntryKind::File {
					extension: entry_model.extension,
				},
			},
			size: Some(entry_model.size as u64),
			created_at: Some(entry_model.created_at),
			modified_at: Some(entry_model.modified_at),
			accessed_at: entry_model.accessed_at,
			inode: entry_model.inode.map(|i| i as u64),
			file_id: None,
			parent_id: entry_model.parent_id.map(|_| Uuid::new_v4()), // TODO: Proper UUID conversion
			location_id: None,
			metadata_id: entry_model
				.metadata_id
				.map(|_| Uuid::new_v4())
				.unwrap_or_else(Uuid::new_v4),
			content_id: entry_model.content_id.map(|_| Uuid::new_v4()), // TODO: Proper UUID conversion
			first_seen_at: entry_model.created_at,
			last_indexed_at: Some(entry_model.created_at),
		})
	}
}
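For illustration, constructing an `Entry` from real filesystem metadata; the path and device id are placeholders:

```rust
use std::path::PathBuf;
use uuid::Uuid;

fn index_one(path: &str, device_id: Uuid) -> std::io::Result<Entry> {
	let metadata = std::fs::metadata(path)?;
	let sd_path = SdPath::Physical {
		device_id,
		path: PathBuf::from(path),
	};
	let entry = Entry::new(sd_path, metadata);
	// Directories get size = None; regular files carry their byte length.
	debug_assert_eq!(entry.is_file(), entry.size.is_some());
	Ok(entry)
}
```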


@@ -0,0 +1,19 @@
//! Label entity
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "labels")]
pub struct Model {
	#[sea_orm(primary_key)]
	pub id: i32,
	pub uuid: Uuid,
	pub name: String,
	pub created_at: DateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}


@@ -0,0 +1,42 @@
//! MetadataLabel junction entity
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "metadata_labels")]
pub struct Model {
	#[sea_orm(primary_key)]
	pub metadata_id: i32,
	#[sea_orm(primary_key)]
	pub label_id: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
	#[sea_orm(
		belongs_to = "super::user_metadata::Entity",
		from = "Column::MetadataId",
		to = "super::user_metadata::Column::Id"
	)]
	UserMetadata,
	#[sea_orm(
		belongs_to = "super::label::Entity",
		from = "Column::LabelId",
		to = "super::label::Column::Id"
	)]
	Label,
}
impl Related<super::user_metadata::Entity> for Entity {
	fn to() -> RelationDef {
		Relation::UserMetadata.def()
	}
}
impl Related<super::label::Entity> for Entity {
	fn to() -> RelationDef {
		Relation::Label.def()
	}
}
impl ActiveModelBehavior for ActiveModel {}


@@ -440,6 +440,58 @@ pub async fn query_device_state(
.map_err(|e| ApplyError::DatabaseError(e.to_string()))
}
/// Query all shared models for backfill (generic registry-based approach)
///
/// This discovers and queries ALL shared models registered in the system,
/// making it completely generic - no need to modify this when adding new shared models.
///
/// # Parameters
/// - `since`: Optional timestamp filter
/// - `batch_size`: Max records per model type
/// - `db`: Database connection
///
/// # Returns
/// HashMap mapping model_type -> Vec of (uuid, data, timestamp) tuples
pub async fn query_all_shared_models(
	since: Option<chrono::DateTime<chrono::Utc>>,
	batch_size: usize,
	db: Arc<DatabaseConnection>,
) -> Result<HashMap<String, Vec<(uuid::Uuid, serde_json::Value, chrono::DateTime<chrono::Utc>)>>, ApplyError> {
	// Collect all shared models with query functions
	let shared_models: Vec<(String, StateQueryFn)> = {
		let registry = SYNCABLE_REGISTRY.read().await;
		registry
			.iter()
			.filter(|(_, reg)| !reg.is_device_owned && reg.state_query_fn.is_some())
			.map(|(model_type, reg)| (model_type.clone(), reg.state_query_fn.unwrap()))
			.collect()
	}; // Lock dropped

	// Query each model in turn
	let mut results = HashMap::new();
	for (model_type, query_fn) in shared_models {
		let records = query_fn(None, since, batch_size, db.clone())
			.await
			.map_err(|e| ApplyError::DatabaseError(format!("Failed to query {}: {}", model_type, e)))?;
		if !records.is_empty() {
			tracing::info!(
				model_type = %model_type,
				count = records.len(),
				"Queried shared model for backfill"
			);
			results.insert(model_type, records);
		}
	}
	Ok(results)
}
/// Errors that can occur when applying sync entries
#[derive(Debug, thiserror::Error)]
pub enum ApplyError {
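A sketch of calling `query_all_shared_models` during backfill; the surrounding setup (database handle, error handling) is assumed:

```rust
// Pull up to 500 records per shared model, with no timestamp filter.
let snapshot = query_all_shared_models(None, 500, db.clone()).await?;
for (model_type, records) in &snapshot {
	tracing::debug!(model_type = %model_type, count = records.len(), "backfill batch");
}
```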


@@ -321,6 +321,34 @@ impl Core {
Err(e) => error!("Failed to start services: {}", e),
}
// Set up networking event bridge and register protocol handlers AFTER networking is started
if service_config.networking_enabled {
	if let Some(networking) = services.networking() {
		// Set up event bridge to integrate network events with core event system
		let event_bridge = NetworkEventBridge::new(
			networking.subscribe_events(),
			events.clone(),
		);
		tokio::spawn(event_bridge.run());
		info!("Network event bridge initialized");

		// Register default protocol handlers (pairing, messaging, file transfer)
		info!("Registering default protocol handlers...");
		let data_dir_for_protocols = config.read().await.data_dir.clone();
		if let Err(e) = register_default_protocol_handlers(
			&networking,
			data_dir_for_protocols,
			context.clone(),
		)
		.await
		{
			error!("Failed to register default protocol handlers: {}", e);
		} else {
			info!("Default protocol handlers registered successfully");
		}
	}
}
// Initialize ActionManager and set it in context
let action_manager = Arc::new(crate::infra::action::manager::ActionManager::new(
context.clone(),
@@ -403,23 +431,14 @@ impl Core {
// Register protocols and set up event bridge
if let Some(networking_service) = self.services.networking() {
// Register default protocol handlers only if networking was just initialized
// (if networking was already initialized during Core::new(), protocols are already registered)
if !already_initialized {
logger.info("Registering protocol handlers...").await;
self.register_default_protocols(&networking_service).await?;
} else {
logger
.info("Protocol handlers already registered during initialization")
.await;
}
@@ -445,79 +464,8 @@ impl Core {
&self,
networking: &service::network::NetworkingService,
) -> Result<(), Box<dyn std::error::Error>> {
let data_dir = self.config.read().await.data_dir.clone();
register_default_protocol_handlers(networking, data_dir, self.context.clone()).await
}
/// Get the networking service (if initialized)
@@ -559,6 +507,83 @@ impl Core {
}
}
/// Standalone helper to register default protocol handlers
/// This is used both during Core::new() and when explicitly calling init_networking()
async fn register_default_protocol_handlers(
networking: &service::network::NetworkingService,
data_dir: PathBuf,
context: Arc<CoreContext>,
) -> Result<(), Box<dyn std::error::Error>> {
let logger = std::sync::Arc::new(service::network::utils::logging::ConsoleLogger);
// Get command sender for the pairing handler's state machine
let command_sender = networking
.command_sender()
.ok_or("NetworkingEventLoop command sender not available")?
.clone();
let pairing_handler = Arc::new(
service::network::protocol::PairingProtocolHandler::new_with_persistence(
networking.identity().clone(),
networking.device_registry(),
logger.clone(),
command_sender,
data_dir,
networking.endpoint().cloned(),
),
);
// Try to load persisted sessions, but don't fail if there's an error
if let Err(e) = pairing_handler.load_persisted_sessions().await {
logger
.warn(&format!(
"Failed to load persisted pairing sessions: {}. Starting with empty sessions.",
e
))
.await;
}
// Start the state machine task for pairing
service::network::protocol::PairingProtocolHandler::start_state_machine_task(
pairing_handler.clone(),
);
// Start cleanup task for expired sessions
service::network::protocol::PairingProtocolHandler::start_cleanup_task(
pairing_handler.clone(),
);
let mut messaging_handler = service::network::protocol::MessagingProtocolHandler::new();
// Inject context for library operations
messaging_handler.set_context(context);
let mut file_transfer_handler =
service::network::protocol::FileTransferProtocolHandler::new_default(logger.clone());
// Inject device registry into file transfer handler for encryption
file_transfer_handler.set_device_registry(networking.device_registry());
let protocol_registry = networking.protocol_registry();
{
let mut registry = protocol_registry.write().await;
registry.register_handler(pairing_handler)?;
registry.register_handler(Arc::new(messaging_handler))?;
registry.register_handler(Arc::new(file_transfer_handler))?;
logger
.info("All protocol handlers registered successfully")
.await;
}
// Brief delay to ensure protocol handlers are fully initialized and background
// tasks have started before accepting connections. This prevents race conditions
// where incoming connections arrive before handlers are ready.
// 50ms is imperceptible to users but sufficient for async task scheduling.
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
Ok(())
}
/// Set up log event emitter to forward tracing events to the event bus
fn setup_log_event_emitter(event_bus: Arc<crate::infra::event::EventBus>) {
use crate::infra::event::log_emitter::LogEventLayer;

core/src/ops/api_types.rs

@@ -0,0 +1,41 @@
//! API-safe type wrappers for Swift export
//!
//! This module provides wrappers that convert internal types to
//! Swift-exportable types automatically.
use crate::infra::job::handle::{JobHandle, JobReceipt};
use serde::{Deserialize, Serialize};
use specta::Type;
/// Wrapper that converts JobHandle to JobReceipt for API export
/// This allows library actions to keep returning JobHandle internally
/// while exporting JobReceipt to Swift
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct ApiJobHandle(pub JobReceipt);
impl From<JobHandle> for ApiJobHandle {
fn from(handle: JobHandle) -> Self {
Self(handle.into())
}
}
impl From<&JobHandle> for ApiJobHandle {
fn from(handle: &JobHandle) -> Self {
Self(handle.into())
}
}
/// Trait to convert internal types to API-safe types
pub trait ToApiType {
type ApiType: Type + Serialize + for<'de> Deserialize<'de>;
fn to_api_type(self) -> Self::ApiType;
}
impl ToApiType for JobHandle {
type ApiType = ApiJobHandle;
fn to_api_type(self) -> Self::ApiType {
self.into()
}
}
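// Illustrative sketch, not part of this commit: an action handler converting
// its internal JobHandle into the Swift-exportable wrapper at the API boundary.
#[allow(dead_code)]
fn into_api_response_example(handle: JobHandle) -> ApiJobHandle {
	// ToApiType delegates to From<JobHandle> for ApiJobHandle, which in turn
	// converts the handle into a serializable JobReceipt
	handle.to_api_type()
}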


@@ -0,0 +1 @@


@@ -9,6 +9,7 @@ use crate::infra::db::entities;
use crate::infra::event::FsRawEventKind;
use crate::ops::indexing::entry::EntryProcessor;
use crate::ops::indexing::path_resolver::PathResolver;
use crate::ops::indexing::rules::{build_default_ruler, RuleToggles, RulerDecision};
use crate::ops::indexing::state::{DirEntry, IndexerState};
use crate::ops::indexing::{ctx::ResponderCtx, IndexingCtx};
use anyhow::Result;
@@ -23,15 +24,23 @@ pub async fn apply(
context: &Arc<CoreContext>,
library_id: Uuid,
kind: FsRawEventKind,
rule_toggles: RuleToggles,
location_root: &Path,
) -> Result<()> {
// Lightweight indexing context for DB access
let ctx = ResponderCtx::new(context, library_id).await?;
match kind {
FsRawEventKind::Create { path } => handle_create(&ctx, &path).await?,
FsRawEventKind::Modify { path } => handle_modify(&ctx, &path).await?,
FsRawEventKind::Create { path } => {
handle_create(&ctx, &path, rule_toggles, location_root).await?
}
FsRawEventKind::Modify { path } => {
handle_modify(&ctx, &path, rule_toggles, location_root).await?
}
FsRawEventKind::Remove { path } => handle_remove(&ctx, &path).await?,
FsRawEventKind::Rename { from, to } => handle_rename(&ctx, &from, &to).await?,
FsRawEventKind::Rename { from, to } => {
handle_rename(&ctx, &from, &to, rule_toggles, location_root).await?
}
}
Ok(())
}
@@ -41,6 +50,8 @@ pub async fn apply_batch(
context: &Arc<CoreContext>,
library_id: Uuid,
events: Vec<FsRawEventKind>,
rule_toggles: RuleToggles,
location_root: &Path,
) -> Result<()> {
if events.is_empty() {
return Ok(());
@@ -76,7 +87,7 @@ pub async fn apply_batch(
// Process renames
for (from, to) in renames {
if let Err(e) = handle_rename(&ctx, &from, &to, rule_toggles, location_root).await {
tracing::error!(
"Failed to handle rename from {} to {}: {}",
from.display(),
@@ -88,14 +99,14 @@ pub async fn apply_batch(
// Process creates
for path in creates {
if let Err(e) = handle_create(&ctx, &path, rule_toggles, location_root).await {
tracing::error!("Failed to handle create for {}: {}", path.display(), e);
}
}
// Process modifies
for path in modifies {
if let Err(e) = handle_modify(&ctx, &path, rule_toggles, location_root).await {
tracing::error!("Failed to handle modify for {}: {}", path.display(), e);
}
}
@@ -103,9 +114,61 @@ pub async fn apply_batch(
Ok(())
}
/// Check if a path should be filtered based on indexing rules
async fn should_filter_path(
path: &Path,
rule_toggles: RuleToggles,
location_root: &Path,
) -> Result<bool> {
// Build ruler for this path using the same logic as the indexer
let ruler = build_default_ruler(rule_toggles, location_root, path).await;
// Get metadata for the path
let metadata = tokio::fs::metadata(path).await?;
// Simple metadata implementation for rule evaluation
struct SimpleMetadata {
is_dir: bool,
}
impl crate::ops::indexing::rules::MetadataForIndexerRules for SimpleMetadata {
fn is_dir(&self) -> bool {
self.is_dir
}
}
let simple_meta = SimpleMetadata {
is_dir: metadata.is_dir(),
};
// Evaluate the path against the ruler
match ruler.evaluate_path(path, &simple_meta).await {
Ok(RulerDecision::Reject) => {
debug!("Filtered path by indexing rules: {}", path.display());
Ok(true)
}
Ok(RulerDecision::Accept) => Ok(false),
Err(e) => {
tracing::warn!("Error evaluating rules for {}: {}", path.display(), e);
Ok(false) // Don't filter on error, let it through
}
}
}
/// Handle create: extract metadata and insert via EntryProcessor
async fn handle_create(
ctx: &impl IndexingCtx,
path: &Path,
rule_toggles: RuleToggles,
location_root: &Path,
) -> Result<()> {
debug!("Create: {}", path.display());
// Check if path should be filtered
if should_filter_path(path, rule_toggles, location_root).await? {
debug!("Skipping filtered path: {}", path.display());
return Ok(());
}
let dir_entry = build_dir_entry(path).await?;
// If inode matches an existing entry at another path, treat this as a move
@@ -127,9 +190,20 @@ async fn handle_create(ctx: &impl IndexingCtx, path: &Path) -> Result<()> {
}
/// Handle modify: resolve entry ID by path, then update
async fn handle_modify(
ctx: &impl IndexingCtx,
path: &Path,
rule_toggles: RuleToggles,
location_root: &Path,
) -> Result<()> {
debug!("Modify: {}", path.display());
// Check if path should be filtered
if should_filter_path(path, rule_toggles, location_root).await? {
debug!("Skipping filtered path: {}", path.display());
return Ok(());
}
// If inode indicates a move, handle as a move and skip update
// Responder uses direct filesystem access (None backend) since it reacts to local FS events
let meta = EntryProcessor::extract_metadata(path, None).await?;
@@ -160,8 +234,22 @@ async fn handle_remove(ctx: &impl IndexingCtx, path: &Path) -> Result<()> {
}
/// Handle rename/move: resolve source entry and move via EntryProcessor
async fn handle_rename(
ctx: &impl IndexingCtx,
from: &Path,
to: &Path,
rule_toggles: RuleToggles,
location_root: &Path,
) -> Result<()> {
debug!("Rename: {} -> {}", from.display(), to.display());
// Check if the destination path should be filtered
// If the file is being moved to a filtered location, we should remove it from the database
if should_filter_path(to, rule_toggles, location_root).await? {
debug!("Destination path is filtered, removing entry: {}", to.display());
// Treat this as a removal of the source file
return handle_remove(ctx, from).await;
}
if let Some(entry_id) = resolve_entry_id_by_path(ctx, from).await? {
debug!("Found entry {} for old path, moving to new path", entry_id);

core/src/ops/registry.rs

@@ -0,0 +1,483 @@
//! Minimal action/query registry (action-centric) using `inventory`.
//!
//! Goals:
//! - Tiny, action-centric API: register Actions, decode their associated Inputs
//! - No conversion traits on inputs; Actions declare `type Input` and `from_input(..)`
//! - Single place that resolves `library_id` and dispatches
use futures::future::{FutureExt, LocalBoxFuture};
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;
/// Registry handler for library queries - thin wrapper calling business logic
pub fn handle_library_query<Q>(
context: Arc<crate::context::CoreContext>,
session: crate::infra::api::SessionContext,
payload: serde_json::Value,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>
where
Q: crate::cqrs::LibraryQuery + 'static,
Q::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
Q::Output: serde::Serialize + std::fmt::Debug + 'static,
{
Box::pin(async move {
// Create dispatcher
let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone());
// Deserialize input
let input: Q::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?;
// Call business logic method
let output = dispatcher
.execute_library_query::<Q>(input, session)
.await
.map_err(|e| e.to_string())?;
// Serialize output
serde_json::to_value(output).map_err(|e| e.to_string())
})
}
/// Registry handler for core queries - thin wrapper calling business logic
pub fn handle_core_query<Q>(
context: Arc<crate::context::CoreContext>,
session: crate::infra::api::SessionContext,
payload: serde_json::Value,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>
where
Q: crate::cqrs::CoreQuery + 'static,
Q::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
Q::Output: serde::Serialize + std::fmt::Debug + 'static,
{
Box::pin(async move {
// Create dispatcher
let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone());
// Deserialize input
let input: Q::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?;
// Call business logic method
let output = dispatcher
.execute_core_query::<Q>(input, session)
.await
.map_err(|e| e.to_string())?;
// Serialize output
serde_json::to_value(output).map_err(|e| e.to_string())
})
}
/// Registry handler for library actions - thin wrapper calling business logic
pub fn handle_library_action<A>(
context: Arc<crate::context::CoreContext>,
session: crate::infra::api::SessionContext,
payload: serde_json::Value,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>
where
A: crate::infra::action::LibraryAction + 'static,
A::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
A::Output: serde::Serialize + std::fmt::Debug + 'static,
{
Box::pin(async move {
// Create dispatcher
let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone());
// Deserialize input
let input: A::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?;
// Call business logic method
let output = dispatcher
.execute_library_action::<A>(input, session)
.await
.map_err(|e| e.to_string())?;
// Serialize output
serde_json::to_value(output).map_err(|e| e.to_string())
})
}
/// Registry handler for core actions - thin wrapper calling business logic
pub fn handle_core_action<A>(
context: Arc<crate::context::CoreContext>,
payload: serde_json::Value,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>
where
A: crate::infra::action::CoreAction + 'static,
A::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static,
A::Output: serde::Serialize + std::fmt::Debug + 'static,
{
Box::pin(async move {
// Create dispatcher
let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone());
// Create base session
let session = dispatcher
.create_base_session()
.map_err(|e| e.to_string())?;
// Deserialize input
let input: A::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?;
// Call business logic method
let output = dispatcher
.execute_core_action::<A>(input, session)
.await
.map_err(|e| e.to_string())?;
// Serialize output
serde_json::to_value(output).map_err(|e| e.to_string())
})
}
/// Handler function signature for library queries.
pub type LibraryQueryHandlerFn = fn(
Arc<crate::context::CoreContext>,
crate::infra::api::SessionContext, // session with library context
serde_json::Value, // payload with Q::Input as JSON
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>;
/// Handler function signature for core queries.
pub type CoreQueryHandlerFn = fn(
Arc<crate::context::CoreContext>,
crate::infra::api::SessionContext, // session context
serde_json::Value, // payload with Q::Input as JSON
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>;
/// Handler function signature for library actions.
pub type LibraryActionHandlerFn = fn(
Arc<crate::context::CoreContext>,
crate::infra::api::SessionContext, // session with library context
serde_json::Value, // payload with A::Input as JSON
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>;
/// Handler function signature for core actions.
pub type CoreActionHandlerFn = fn(
Arc<crate::context::CoreContext>,
serde_json::Value, // payload with A::Input as JSON
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static>,
>;
/// Registry entry for a library query operation.
pub struct LibraryQueryEntry {
pub method: &'static str,
pub handler: LibraryQueryHandlerFn,
}
/// Registry entry for a core query operation.
pub struct CoreQueryEntry {
pub method: &'static str,
pub handler: CoreQueryHandlerFn,
}
/// Registry entry for a library action operation.
pub struct LibraryActionEntry {
pub method: &'static str,
pub handler: LibraryActionHandlerFn,
}
/// Registry entry for a core action operation.
pub struct CoreActionEntry {
pub method: &'static str,
pub handler: CoreActionHandlerFn,
}
inventory::collect!(LibraryQueryEntry);
inventory::collect!(CoreQueryEntry);
inventory::collect!(LibraryActionEntry);
inventory::collect!(CoreActionEntry);
pub static LIBRARY_QUERIES: Lazy<HashMap<&'static str, LibraryQueryHandlerFn>> = Lazy::new(|| {
let mut map = HashMap::new();
for entry in inventory::iter::<LibraryQueryEntry>() {
map.insert(entry.method, entry.handler);
}
map
});
pub static CORE_QUERIES: Lazy<HashMap<&'static str, CoreQueryHandlerFn>> = Lazy::new(|| {
let mut map = HashMap::new();
for entry in inventory::iter::<CoreQueryEntry>() {
map.insert(entry.method, entry.handler);
}
map
});
pub static LIBRARY_ACTIONS: Lazy<HashMap<&'static str, LibraryActionHandlerFn>> = Lazy::new(|| {
let mut map = HashMap::new();
for entry in inventory::iter::<LibraryActionEntry>() {
map.insert(entry.method, entry.handler);
}
map
});
pub static CORE_ACTIONS: Lazy<HashMap<&'static str, CoreActionHandlerFn>> = Lazy::new(|| {
let mut map = HashMap::new();
for entry in inventory::iter::<CoreActionEntry>() {
map.insert(entry.method, entry.handler);
}
map
});
#[cfg(test)]
mod tests {
#[test]
fn list_registered_ops() {
// Collect and display registered actions
let mut action_methods: Vec<&'static str> =
crate::ops::registry::CORE_ACTIONS.keys().cloned().collect();
action_methods.sort();
println!("Registered actions ({}):", action_methods.len());
for method in &action_methods {
println!(" {}", method);
}
let mut library_action_methods: Vec<&'static str> = crate::ops::registry::LIBRARY_ACTIONS
.keys()
.cloned()
.collect();
library_action_methods.sort();
println!(
"Registered library actions ({}):",
library_action_methods.len()
);
for method in &library_action_methods {
println!(" {}", method);
}
// Collect and display registered queries
let mut query_methods: Vec<&'static str> =
crate::ops::registry::CORE_QUERIES.keys().cloned().collect();
query_methods.sort();
println!("Registered queries ({}):", query_methods.len());
for method in &query_methods {
println!(" {}", method);
}
let mut library_query_methods: Vec<&'static str> = crate::ops::registry::LIBRARY_QUERIES
.keys()
.cloned()
.collect();
library_query_methods.sort();
println!(
"Registered library queries ({}):",
library_query_methods.len()
);
for method in &library_query_methods {
println!(" {}", method);
}
// Ensure we have at least one action or query registered
assert!(
!action_methods.is_empty()
|| !query_methods.is_empty()
|| !library_action_methods.is_empty()
|| !library_query_methods.is_empty(),
"No actions or queries registered"
);
}
}
/// Helper: construct action method string from a short name like "files.copy"
#[macro_export]
macro_rules! action_method {
($name:literal) => {
concat!("action:", $name, ".input.v1")
};
}
/// Helper: construct query method string from a short name like "core.status"
#[macro_export]
macro_rules! query_method {
($name:literal) => {
concat!("query:", $name, ".v1")
};
}
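// Sanity check (illustrative): the helper macros expand to the exact wire
// strings that the registration macros below bind handlers to.
#[cfg(test)]
mod method_string_tests {
	#[test]
	fn method_macros_expand_to_wire_strings() {
		assert_eq!(crate::action_method!("files.copy"), "action:files.copy.input.v1");
		assert_eq!(crate::query_method!("core.status"), "query:core.status.v1");
	}
}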
/// Register a library query `Q` by short name; binds method to `Q::Input` and handler to `handle_library_query::<Q>`.
/// Implements QueryTypeInfo trait for automatic type extraction
#[macro_export]
macro_rules! register_library_query {
($query:ty, $name:literal) => {
impl $crate::client::Wire for <$query as $crate::cqrs::LibraryQuery>::Input {
const METHOD: &'static str = $crate::query_method!($name);
}
inventory::submit! {
$crate::ops::registry::LibraryQueryEntry {
method: <<$query as $crate::cqrs::LibraryQuery>::Input as $crate::client::Wire>::METHOD,
handler: $crate::ops::registry::handle_library_query::<$query>,
}
}
// Automatic QueryTypeInfo implementation for type extraction
impl $crate::ops::type_extraction::QueryTypeInfo for $query {
type Input = <$query as $crate::cqrs::LibraryQuery>::Input;
type Output = <$query as $crate::cqrs::LibraryQuery>::Output;
fn identifier() -> &'static str {
$name
}
fn scope() -> $crate::ops::type_extraction::QueryScope {
$crate::ops::type_extraction::QueryScope::Library
}
fn wire_method() -> String {
$crate::query_method!($name).to_string()
}
}
// Submit query type extractor to inventory
inventory::submit! {
$crate::ops::type_extraction::QueryExtractorEntry {
extractor: <$query as $crate::ops::type_extraction::QueryTypeInfo>::extract_types,
identifier: $name,
}
}
};
}
/// Register a core query `Q` by short name; binds method to `Q::Input` and handler to `handle_core_query::<Q>`.
/// Implements QueryTypeInfo trait for automatic type extraction
#[macro_export]
macro_rules! register_core_query {
($query:ty, $name:literal) => {
impl $crate::client::Wire for <$query as $crate::cqrs::CoreQuery>::Input {
const METHOD: &'static str = $crate::query_method!($name);
}
inventory::submit! {
$crate::ops::registry::CoreQueryEntry {
method: <<$query as $crate::cqrs::CoreQuery>::Input as $crate::client::Wire>::METHOD,
handler: $crate::ops::registry::handle_core_query::<$query>,
}
}
// Automatic QueryTypeInfo implementation for type extraction
impl $crate::ops::type_extraction::QueryTypeInfo for $query {
type Input = <$query as $crate::cqrs::CoreQuery>::Input;
type Output = <$query as $crate::cqrs::CoreQuery>::Output;
fn identifier() -> &'static str {
$name
}
fn scope() -> $crate::ops::type_extraction::QueryScope {
$crate::ops::type_extraction::QueryScope::Core
}
fn wire_method() -> String {
$crate::query_method!($name).to_string()
}
}
// Submit query type extractor to inventory
inventory::submit! {
$crate::ops::type_extraction::QueryExtractorEntry {
extractor: <$query as $crate::ops::type_extraction::QueryTypeInfo>::extract_types,
identifier: $name,
}
}
};
}
/// Register a library action `A` by short name; binds method to `A::Input` and handler to `handle_library_action::<A>`.
/// Implements OperationTypeInfo trait for automatic type extraction
#[macro_export]
macro_rules! register_library_action {
($action:ty, $name:literal) => {
impl $crate::client::Wire for <$action as $crate::infra::action::LibraryAction>::Input {
const METHOD: &'static str = $crate::action_method!($name);
}
inventory::submit! {
$crate::ops::registry::LibraryActionEntry {
method: <<$action as $crate::infra::action::LibraryAction>::Input as $crate::client::Wire>::METHOD,
handler: $crate::ops::registry::handle_library_action::<$action>,
}
}
// Automatic OperationTypeInfo implementation for type extraction
impl $crate::ops::type_extraction::OperationTypeInfo for $action {
type Input = <$action as $crate::infra::action::LibraryAction>::Input;
type Output = <$action as $crate::infra::action::LibraryAction>::Output;
fn identifier() -> &'static str {
$name
}
fn scope() -> $crate::ops::type_extraction::OperationScope {
$crate::ops::type_extraction::OperationScope::Library
}
fn wire_method() -> String {
$crate::action_method!($name).to_string()
}
}
// Submit type extractor to inventory for compile-time collection
inventory::submit! {
$crate::ops::type_extraction::TypeExtractorEntry {
extractor: <$action as $crate::ops::type_extraction::OperationTypeInfo>::extract_types,
identifier: $name,
}
}
};
}
/// Register a core action `A` similarly.
/// Implements OperationTypeInfo trait for automatic type extraction
#[macro_export]
macro_rules! register_core_action {
($action:ty, $name:literal) => {
impl $crate::client::Wire for <$action as $crate::infra::action::CoreAction>::Input {
const METHOD: &'static str = $crate::action_method!($name);
}
inventory::submit! {
$crate::ops::registry::CoreActionEntry {
method: <<$action as $crate::infra::action::CoreAction>::Input as $crate::client::Wire>::METHOD,
handler: $crate::ops::registry::handle_core_action::<$action>,
}
}
// Automatic OperationTypeInfo implementation for core actions
impl $crate::ops::type_extraction::OperationTypeInfo for $action {
type Input = <$action as $crate::infra::action::CoreAction>::Input;
type Output = <$action as $crate::infra::action::CoreAction>::Output;
fn identifier() -> &'static str {
$name
}
fn scope() -> $crate::ops::type_extraction::OperationScope {
$crate::ops::type_extraction::OperationScope::Core
}
fn wire_method() -> String {
$crate::action_method!($name).to_string()
}
}
// Submit type extractor to inventory for compile-time collection
inventory::submit! {
$crate::ops::type_extraction::TypeExtractorEntry {
extractor: <$action as $crate::ops::type_extraction::OperationTypeInfo>::extract_types,
identifier: $name,
}
}
};
}
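// Illustrative usage, not part of this commit: registering a hypothetical
// library action. `CopyFilesAction` and the "files.copy" name are assumptions;
// the macro binds the action's Input to the wire method
// "action:files.copy.input.v1" and submits both the handler and a type
// extractor to `inventory` at compile time.
//
//     register_library_action!(CopyFilesAction, "files.copy");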


@@ -0,0 +1,33 @@
//! Tests for the type extraction system, covering only the types that currently have the required derives
use crate::ops::type_extraction::*;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_working_operations() {
let (operations, queries, collection) = generate_spacedrive_api();
println!(
"RSPC Magic: Discovered {} operations and {} queries",
operations.len(),
queries.len()
);
println!("Type collection has {} types", collection.len());
// Show discovered operations
for op in operations.iter() {
println!(" Operation: {} -> {}", op.identifier, op.wire_method);
}
for query in queries.iter() {
println!(" Query: {} -> {}", query.identifier, query.wire_method);
}
if !operations.is_empty() {
println!("RSPC-inspired type extraction is working!");
}
}
}


@@ -0,0 +1,697 @@
//! rspc-inspired trait-based type extraction for automatic API generation
//!
//! This module implements the core trait system that allows automatic discovery
//! and extraction of Input/Output types from registered operations at compile-time.
use serde::{de::DeserializeOwned, Serialize};
use specta::{DataType, Type, TypeCollection};
/// Operation scope - automatically determined by registration macro
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationScope {
Core,
Library,
}
/// Query scope - automatically determined by registration macro
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryScope {
Core,
Library,
}
/// Core trait that provides compile-time type information for operations
///
/// This is inspired by rspc's resolver trait system and enables automatic
/// type extraction without runtime iteration over inventory data.
pub trait OperationTypeInfo {
/// The input type for this operation
type Input: Type + Serialize + DeserializeOwned + 'static;
/// The output type for this operation
type Output: Type + Serialize + DeserializeOwned + 'static;
/// The operation identifier (e.g., "files.copy", "libraries.create")
fn identifier() -> &'static str;
/// The operation scope (Core or Library) - automatically determined by registration macro
fn scope() -> OperationScope;
/// Generate the wire method string for this operation
fn wire_method() -> String;
/// Extract type metadata and register with Specta's TypeCollection
/// This is the key method that enables automatic type discovery
fn extract_types(collection: &mut TypeCollection) -> OperationMetadata {
// Register the types with Specta and get their DataType definitions
let input_type = Self::Input::definition(collection);
let output_type = Self::Output::definition(collection);
let input_type_name = extract_type_name(std::any::type_name::<Self::Input>());
let output_type_name = extract_type_name(std::any::type_name::<Self::Output>());
// Debug output for type names
if Self::identifier() == "jobs.info" {
println!(
"DEBUG: jobs.info input type: {} -> {}",
std::any::type_name::<Self::Input>(),
input_type_name
);
println!(
"DEBUG: jobs.info output type: {} -> {}",
std::any::type_name::<Self::Output>(),
output_type_name
);
}
if Self::identifier() == "jobs.list" {
println!(
"DEBUG: jobs.list input type: {} -> {}",
std::any::type_name::<Self::Input>(),
input_type_name
);
println!(
"DEBUG: jobs.list output type: {} -> {}",
std::any::type_name::<Self::Output>(),
output_type_name
);
}
OperationMetadata {
identifier: Self::identifier(),
wire_method: Self::wire_method(),
input_type,
output_type,
input_type_name,
output_type_name,
scope: Self::scope(),
}
}
}
/// Similar trait for query operations
pub trait QueryTypeInfo {
/// Query input type (often () for queries with no parameters)
type Input: Type + Serialize + DeserializeOwned + 'static;
/// Query output type
type Output: Type + Serialize + DeserializeOwned + 'static;
/// Query identifier (e.g., "jobs.list", "libraries.list")
fn identifier() -> &'static str;
/// The query scope (Core or Library) - automatically determined by registration macro
fn scope() -> QueryScope;
/// Generate wire method for queries
fn wire_method() -> String;
/// Extract query type metadata
fn extract_types(collection: &mut TypeCollection) -> QueryMetadata {
// Register the types with Specta and get their DataType definitions
let input_type = Self::Input::definition(collection);
let output_type = Self::Output::definition(collection);
QueryMetadata {
identifier: Self::identifier(),
wire_method: Self::wire_method(),
input_type,
output_type,
input_type_name: extract_type_name(std::any::type_name::<Self::Input>()),
output_type_name: extract_type_name(std::any::type_name::<Self::Output>()),
scope: Self::scope(),
}
}
}
/// Metadata extracted from an operation
#[derive(Debug, Clone)]
pub struct OperationMetadata {
pub identifier: &'static str,
pub wire_method: String,
pub input_type: DataType,
pub output_type: DataType,
pub input_type_name: String,
pub output_type_name: String,
pub scope: OperationScope,
}
/// Metadata extracted from a query
#[derive(Debug, Clone)]
pub struct QueryMetadata {
pub identifier: &'static str,
pub wire_method: String,
pub input_type: DataType,
pub output_type: DataType,
pub input_type_name: String,
pub output_type_name: String,
pub scope: QueryScope,
}
/// Entry for collecting type extractors via inventory
/// This is the key that makes compile-time collection possible
pub struct TypeExtractorEntry {
/// Function that extracts operation metadata and registers types
pub extractor: fn(&mut TypeCollection) -> OperationMetadata,
pub identifier: &'static str,
}
/// Entry for collecting query type extractors
pub struct QueryExtractorEntry {
/// Function that extracts query metadata and registers types
pub extractor: fn(&mut TypeCollection) -> QueryMetadata,
pub identifier: &'static str,
}
// Collect type extractors via inventory - this enables compile-time discovery
inventory::collect!(TypeExtractorEntry);
inventory::collect!(QueryExtractorEntry);
/// Generate complete API metadata by running all collected type extractors
///
/// This is the rspc-inspired approach: we iterate over extractors registered at
/// compile time rather than runtime data, so type discovery never depends on
/// runtime registration having happened first.
pub fn generate_spacedrive_api() -> (Vec<OperationMetadata>, Vec<QueryMetadata>, TypeCollection) {
let mut collection = TypeCollection::default();
let mut operations = Vec::new();
let mut queries = Vec::new();
// Extract all operations - this works because extractors are registered at compile-time
for entry in inventory::iter::<TypeExtractorEntry>() {
let metadata = (entry.extractor)(&mut collection);
operations.push(metadata);
}
// Extract all queries
for entry in inventory::iter::<QueryExtractorEntry>() {
let metadata = (entry.extractor)(&mut collection);
queries.push(metadata);
}
// Register event types in the same collection to avoid duplicates
collection.register_mut::<crate::infra::event::Event>();
collection.register_mut::<crate::infra::event::FsRawEventKind>();
collection.register_mut::<crate::infra::event::FileOperation>();
(operations, queries, collection)
}
/// Generate the complete Spacedrive API structure as a Specta-compatible type
///
/// This creates a runtime representation of our API structure that Specta can export.
/// Similar to rspc's approach with TypesOrType, but tailored for Spacedrive's needs.
pub fn create_spacedrive_api_structure(
operations: &[OperationMetadata],
queries: &[QueryMetadata],
) -> SpacedriveApiStructure {
let mut core_actions = Vec::new();
let mut library_actions = Vec::new();
let mut core_queries = Vec::new();
let mut library_queries = Vec::new();
// Group operations by scope - preserve the actual DataType objects!
for op in operations {
match op.scope {
OperationScope::Core => {
core_actions.push(ApiOperationType {
identifier: op.identifier.to_string(),
wire_method: op.wire_method.clone(),
input_type: op.input_type.clone(),
output_type: op.output_type.clone(),
input_type_name: op.input_type_name.clone(),
output_type_name: op.output_type_name.clone(),
});
}
OperationScope::Library => {
library_actions.push(ApiOperationType {
identifier: op.identifier.to_string(),
wire_method: op.wire_method.clone(),
input_type: op.input_type.clone(),
output_type: op.output_type.clone(),
input_type_name: op.input_type_name.clone(),
output_type_name: op.output_type_name.clone(),
});
}
}
}
// Group queries by scope - preserve the actual DataType objects!
for query in queries {
match query.scope {
QueryScope::Core => {
core_queries.push(ApiQueryType {
identifier: query.identifier.to_string(),
wire_method: query.wire_method.clone(),
input_type: query.input_type.clone(),
output_type: query.output_type.clone(),
input_type_name: query.input_type_name.clone(),
output_type_name: query.output_type_name.clone(),
});
}
QueryScope::Library => {
library_queries.push(ApiQueryType {
identifier: query.identifier.to_string(),
wire_method: query.wire_method.clone(),
input_type: query.input_type.clone(),
output_type: query.output_type.clone(),
input_type_name: query.input_type_name.clone(),
output_type_name: query.output_type_name.clone(),
});
}
}
}
SpacedriveApiStructure {
core_actions,
library_actions,
core_queries,
library_queries,
}
}
/// Represents the complete Spacedrive API structure for code generation
pub struct SpacedriveApiStructure {
pub core_actions: Vec<ApiOperationType>,
pub library_actions: Vec<ApiOperationType>,
pub core_queries: Vec<ApiQueryType>,
pub library_queries: Vec<ApiQueryType>,
}
/// Represents a single API operation with actual type information
pub struct ApiOperationType {
pub identifier: String,
pub wire_method: String,
pub input_type: specta::datatype::DataType,
pub output_type: specta::datatype::DataType,
pub input_type_name: String,
pub output_type_name: String,
}
/// Represents a single API query with actual type information
pub struct ApiQueryType {
pub identifier: String,
pub wire_method: String,
pub input_type: specta::datatype::DataType,
pub output_type: specta::datatype::DataType,
pub input_type_name: String,
pub output_type_name: String,
}
/// Intermediate struct to hold API function metadata for Swift code generation
/// This is used to organize operations and queries into namespaces and methods
#[derive(Debug, Clone)]
pub struct ApiFunction {
/// The namespace this function belongs to (e.g., "core", "libraries", "jobs")
pub namespace: String,
/// The method name within the namespace (e.g., "create", "list", "start")
pub method_name: String,
/// The full identifier (e.g., "libraries.create", "jobs.list")
pub identifier: String,
/// The wire method string (e.g., "action:libraries.create.input.v1")
pub wire_method: String,
/// Whether this is an action (true) or query (false)
pub is_action: bool,
/// The scope (Core or Library)
pub scope: String,
/// Input type name for Swift generation
pub input_type_name: String,
/// Output type name for Swift generation
pub output_type_name: String,
}
/// Extract API functions from the collected metadata
/// This organizes operations and queries into a flat list of functions with namespace information
pub fn extract_api_functions(
operations: &[OperationMetadata],
queries: &[QueryMetadata],
) -> Vec<ApiFunction> {
let mut functions = Vec::new();
// Process operations (actions)
for op in operations {
let namespace = extract_namespace(&op.identifier);
let method_name = extract_method_name(&op.identifier);
let scope = match op.scope {
OperationScope::Core => "Core",
OperationScope::Library => "Library",
};
functions.push(ApiFunction {
namespace,
method_name,
identifier: op.identifier.to_string(),
wire_method: op.wire_method.clone(),
is_action: true,
scope: scope.to_string(),
input_type_name: op.input_type_name.clone(),
output_type_name: op.output_type_name.clone(),
});
}
// Process queries
for query in queries {
let namespace = extract_namespace(&query.identifier);
let method_name = extract_method_name(&query.identifier);
let scope = match query.scope {
QueryScope::Core => "Core",
QueryScope::Library => "Library",
};
functions.push(ApiFunction {
namespace,
method_name,
identifier: query.identifier.to_string(),
wire_method: query.wire_method.clone(),
is_action: false,
scope: scope.to_string(),
input_type_name: query.input_type_name.clone(),
output_type_name: query.output_type_name.clone(),
});
}
functions
}
/// Extract namespace from identifier (e.g., "libraries.create" -> "libraries")
fn extract_namespace(identifier: &str) -> String {
identifier.split('.').next().unwrap_or("core").to_string()
}
/// Extract method name from identifier (e.g., "libraries.create" -> "create")
fn extract_method_name(identifier: &str) -> String {
identifier.split('.').skip(1).collect::<Vec<_>>().join("_")
}
/// Convert snake_case to PascalCase for Swift type names
fn to_pascal_case(s: &str) -> String {
s.split(&['.', '_'][..])
.map(|word| {
let mut chars = word.chars();
match chars.next() {
None => String::new(),
Some(first) => first.to_uppercase().collect::<String>() + &chars.as_str(),
}
})
.collect::<Vec<_>>()
.join("")
}
/// Extract just the type name from a full Rust type path
fn extract_type_name(full_type_name: &str) -> String {
// Handle unit type () - use Empty struct for Swift
if full_type_name == "()" {
let result = "Empty".to_string();
if full_type_name.contains("()") {
println!(
"DEBUG: Unit case: '{}' -> result: '{}'",
full_type_name, result
);
}
return result;
}
// Handle generic types like Option<T>, Vec<T>, etc.
// For Option<T>, we want just T
if full_type_name.contains("Option<") && full_type_name.ends_with(">") {
// Find the content inside Option<...>
let start = full_type_name.find("Option<").unwrap() + 7; // Skip "Option<"
let end = full_type_name.rfind(">").unwrap();
let inner = &full_type_name[start..end];
let result = extract_type_name(inner); // Recursively extract the inner type
if full_type_name.contains("JobInfo")
|| full_type_name.contains("Vec")
|| full_type_name.contains("()")
{
println!(
"DEBUG: Option case: '{}' -> inner: '{}' -> result: '{}'",
full_type_name, inner, result
);
}
return result;
}
// Handle Vec<T> - we want to keep Vec but with proper Swift syntax
if full_type_name.contains("Vec<") && full_type_name.ends_with(">") {
// Find the content inside Vec<...>
let start = full_type_name.find("Vec<").unwrap() + 4; // Skip "Vec<"
let end = full_type_name.rfind(">").unwrap();
let inner = &full_type_name[start..end];
let inner_type = extract_type_name(inner); // Recursively extract the inner type
let result = format!("[{}]", inner_type); // Convert to Swift array syntax
if full_type_name.contains("JobInfo")
|| full_type_name.contains("Vec")
|| full_type_name.contains("()")
{
println!(
"DEBUG: Vec case: '{}' -> inner: '{}' -> inner_type: '{}' -> result: '{}'",
full_type_name, inner, inner_type, result
);
}
return result;
}
// For other generic types, just return the base name
if full_type_name.contains('<') {
let base_name = full_type_name.split('<').next().unwrap_or(full_type_name);
let result = base_name.to_string();
if full_type_name.contains("JobInfo")
|| full_type_name.contains("Vec")
|| full_type_name.contains("()")
{
println!(
"DEBUG: Generic case: '{}' -> base_name: '{}' -> result: '{}'",
full_type_name, base_name, result
);
}
return result;
}
// For simple types, extract just the type name from the path
let type_name = full_type_name.split("::").last().unwrap_or(full_type_name);
let result = type_name.to_string();
if full_type_name.contains("JobInfo")
|| full_type_name.contains("Vec")
|| full_type_name.contains("()")
{
println!(
"DEBUG: Simple case: '{}' -> type_name: '{}' -> result: '{}'",
full_type_name, type_name, result
);
}
result
}
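// Illustrative expectations for extract_type_name: unit maps to the Swift-side
// Empty struct, Option is unwrapped, Vec becomes Swift array syntax, and module
// paths are trimmed to the bare type name. The JobInfo path is an assumption.
#[cfg(test)]
mod type_name_tests {
	use super::extract_type_name;
	#[test]
	fn maps_rust_type_names_for_swift() {
		assert_eq!(extract_type_name("()"), "Empty");
		assert_eq!(extract_type_name("core::infra::job::JobInfo"), "JobInfo");
		assert_eq!(extract_type_name("Option<core::infra::job::JobInfo>"), "JobInfo");
		assert_eq!(extract_type_name("Vec<core::infra::job::JobInfo>"), "[JobInfo]");
	}
}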
/// Convert snake_case to camelCase for Swift method names
fn to_camel_case(s: &str) -> String {
let mut words = s.split('_');
let first_word = words.next().unwrap_or("");
let rest_words: String = words
.map(|word| {
let mut chars = word.chars();
match chars.next() {
None => String::new(),
Some(first) => first.to_uppercase().collect::<String>() + &chars.as_str(),
}
})
.collect();
first_word.to_lowercase() + &rest_words
}
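// Illustrative expectations for the identifier helpers above, covering both
// namespace splitting and Swift-style casing.
#[cfg(test)]
mod naming_tests {
	use super::{extract_method_name, extract_namespace, to_camel_case, to_pascal_case};
	#[test]
	fn shapes_identifiers_for_swift() {
		assert_eq!(extract_namespace("libraries.create"), "libraries");
		assert_eq!(extract_method_name("libraries.create"), "create");
		assert_eq!(to_pascal_case("file_transfer"), "FileTransfer");
		assert_eq!(to_camel_case("start_watching"), "startWatching");
	}
}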
/// Generate Swift code for API namespace structs and their methods
pub fn generate_swift_api_code(functions: &[ApiFunction]) -> String {
let mut swift_code = String::new();
// Add import statement for Foundation (needed for async/await)
swift_code.push_str("import Foundation\n\n");
// Group functions by namespace
let mut namespaces: std::collections::HashMap<String, Vec<&ApiFunction>> =
std::collections::HashMap::new();
for func in functions {
namespaces
.entry(func.namespace.clone())
.or_default()
.push(func);
}
// Generate code for each namespace
for (namespace, funcs) in namespaces {
let namespace_struct_name = format!("{}API", to_pascal_case(&namespace));
swift_code.push_str(&format!("/// {} operations\n", to_pascal_case(&namespace)));
swift_code.push_str(&format!("public struct {} {{\n", namespace_struct_name));
swift_code.push_str(" private let client: SpacedriveClient\n");
swift_code.push_str("\n");
swift_code.push_str(" init(client: SpacedriveClient) {\n");
swift_code.push_str(" self.client = client\n");
swift_code.push_str(" }\n");
swift_code.push_str("\n");
// Generate methods for each function in this namespace
for func in funcs {
swift_code.push_str(&generate_swift_method(func));
swift_code.push_str("\n");
}
swift_code.push_str("}\n\n");
}
swift_code
}
/// Generate Swift method code for a single API function
fn generate_swift_method(func: &ApiFunction) -> String {
let method_name = to_camel_case(&func.method_name);
let input_type = &func.input_type_name;
let output_type = &func.output_type_name;
let wire_method = &func.wire_method;
// Determine if this is an action or query for documentation
let operation_type = if func.is_action { "action" } else { "query" };
let mut method_code = String::new();
// Add documentation comment
method_code.push_str(&format!(
" /// Execute {}: {}\n",
operation_type, func.identifier
));
// Generate method signature
if input_type == "EmptyInput" {
// For operations with no input, use Empty struct
method_code.push_str(&format!(
" public func {}() async throws -> {} {{\n",
method_name, output_type
));
method_code.push_str(" let input = Empty()\n");
} else {
// For operations with input, take the input as parameter
method_code.push_str(&format!(
" public func {}(_ input: {}) async throws -> {} {{\n",
method_name, input_type, output_type
));
}
// Generate method body
method_code.push_str(&format!(" return try await client.execute(\n"));
method_code.push_str(" input,\n");
method_code.push_str(&format!(" method: \"{}\",\n", wire_method));
method_code.push_str(&format!(" responseType: {}.self\n", output_type));
method_code.push_str(" )\n");
method_code.push_str(" }\n");
method_code
}
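// Example output (illustrative) of generate_swift_method for a query named
// "jobs.list" whose input type name is "EmptyInput" and whose output type name
// is "[JobInfo]":
//
//     /// Execute query: jobs.list
//     public func list() async throws -> [JobInfo] {
//         let input = Empty()
//         return try await client.execute(
//             input,
//             method: "query:jobs.list.v1",
//             responseType: [JobInfo].self
//         )
//     }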
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_type_extraction_system() {
let (operations, queries, collection) = generate_spacedrive_api();
println!(
"Discovered {} operations and {} queries",
operations.len(),
queries.len()
);
println!("Type collection has {} types", collection.len());
// Should have some operations if the system is working
if !operations.is_empty() {
println!("Type extraction system is working!");
// Show some examples with scope information
for op in operations.iter().take(3) {
println!(
" Operation: {} -> wire: {} -> scope: {:?}",
op.identifier, op.wire_method, op.scope
);
}
}
if !queries.is_empty() {
for query in queries.iter().take(3) {
println!(
" Query: {} -> wire: {} -> scope: {:?}",
query.identifier, query.wire_method, query.scope
);
}
}
}
#[test]
fn test_api_functions_extraction() {
let (operations, queries, _collection) = generate_spacedrive_api();
let functions = extract_api_functions(&operations, &queries);
println!("Extracted {} API functions", functions.len());
// Group functions by namespace to show organization
let mut namespaces: std::collections::HashMap<String, Vec<&ApiFunction>> =
std::collections::HashMap::new();
for func in &functions {
namespaces
.entry(func.namespace.clone())
.or_default()
.push(func);
}
for (namespace, funcs) in namespaces {
println!("Namespace '{}': {} functions", namespace, funcs.len());
for func in funcs.iter().take(3) {
println!(
" {}: {} -> {} ({})",
func.method_name,
func.input_type_name,
func.output_type_name,
if func.is_action { "action" } else { "query" }
);
}
}
// Verify some basic properties
assert!(
!functions.is_empty(),
"Should have extracted some API functions"
);
// Check that namespaces are properly extracted
let has_libraries = functions.iter().any(|f| f.namespace == "libraries");
let has_jobs = functions.iter().any(|f| f.namespace == "jobs");
println!("Found libraries namespace: {}", has_libraries);
println!("Found jobs namespace: {}", has_jobs);
}
#[test]
fn test_swift_code_generation() {
let (operations, queries, _collection) = generate_spacedrive_api();
let functions = extract_api_functions(&operations, &queries);
let swift_code = generate_swift_api_code(&functions);
println!("Generated Swift code (first 1000 chars):");
println!("{}", &swift_code[..swift_code.len().min(1000)]);
// Verify basic structure
assert!(swift_code.contains("public struct LibrariesAPI"));
assert!(swift_code.contains("public struct JobsAPI"));
assert!(swift_code.contains("public struct NetworkAPI"));
// Verify method generation
assert!(swift_code.contains("public func create("));
assert!(swift_code.contains("public func list("));
assert!(swift_code.contains("public func start("));
// Verify method calls to client.execute
assert!(swift_code.contains("client.execute("));
assert!(swift_code.contains("responseType:"));
// Verify wire method strings are included
assert!(swift_code.contains("action:libraries.create.input.v1"));
assert!(swift_code.contains("query:jobs.list.v1"));
println!("Swift code generation test passed!");
}
}


@@ -404,9 +404,6 @@ impl NetworkingService {
conn: conn.clone(),
});
logger
.info(&format!("Connection established to device {}", device_id))
.await;
@@ -416,6 +413,35 @@ impl NetworkingService {
device_id,
node_id,
});
// Open a hello stream to keep the connection alive
// This prevents idle timeout and signals to the receiver that the connection is ready
logger
.debug("Opening hello stream to keep connection alive...")
.await;
let conn_for_hello = conn.clone();
let logger_clone = logger.clone();
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
match conn_for_hello.open_bi().await {
Ok((mut send, _recv)) => {
// Send a simple hello message
let hello_msg = b"HELLO";
let _ = send.write_all(hello_msg).await;
let _ = send.finish();
logger_clone.debug("Hello stream sent successfully").await;
}
Err(e) => {
// Log error but don't fail - connection is still tracked
logger_clone
.warn(&format!("Failed to open hello stream: {}", e))
.await;
}
}
});
break;
}
Err(e) => {
@@ -1239,7 +1265,7 @@ impl NetworkingService {
relay_url,
);
// The session_id derived from the pairing code
// This ensures both initiator and joiner derive the same session_id from the BIP39 words
let session_id = pairing_code.session_id();
@@ -1248,16 +1274,7 @@ impl NetworkingService {
.start_pairing_session_with_id(session_id, pairing_code.clone())
.await?;
// Get our node address first (needed for device registry)
let mut node_addr = self.get_node_addr()?;
// If we don't have any direct addresses yet, wait a bit for them to be discovered
@@ -1321,6 +1338,17 @@ impl NetworkingService {
))
.await;
// Register in device registry with the node address
let initiator_device_id = self.device_id();
let initiator_node_id = self.node_id();
let device_registry = self.device_registry();
{
let mut registry = device_registry.write().await;
// Use node_addr or create an empty one if not available
let addr_for_registry = node_addr.clone().unwrap_or(NodeAddr::new(initiator_node_id));
registry.start_pairing(initiator_device_id, initiator_node_id, session_id, addr_for_registry)?;
}
// Publish pairing session via mDNS using user_data field
// The joiner will filter discovered nodes by this session_id
if let Some(endpoint) = &self.endpoint {
@@ -1482,9 +1510,7 @@ impl NetworkingService {
// Text-based pairing code: only use mDNS (local network only)
match self.try_mdns_discovery(session_id, force_relay).await {
Ok(()) => {
self.logger.info("Connected via mDNS (local network)").await;
Ok(())
}
Err(e) => {
@@ -1630,6 +1656,7 @@ impl NetworkingService {
app_version: env!("CARGO_PKG_VERSION").to_string(),
network_fingerprint: self.identity().network_fingerprint(),
last_seen: chrono::Utc::now(),
direct_addresses: vec![],
}
})
};
@@ -1790,6 +1817,7 @@ impl NetworkingService {
app_version: env!("CARGO_PKG_VERSION").to_string(),
network_fingerprint: self.identity().network_fingerprint(),
last_seen: chrono::Utc::now(),
direct_addresses: vec![],
}
})
};


@@ -32,6 +32,7 @@ pub struct DeviceInfo {
pub app_version: String,
pub network_fingerprint: crate::service::network::utils::identity::NetworkFingerprint,
pub last_seen: DateTime<Utc>,
pub direct_addresses: Vec<String>,
}
/// Type of device
@@ -63,6 +64,7 @@ pub enum DeviceState {
Pairing {
node_id: NodeId,
session_id: Uuid,
node_addr: NodeAddr,
started_at: DateTime<Utc>,
},
/// Device successfully paired but not currently connected


@@ -429,6 +429,7 @@ mod tests {
public_key_hash: "test_hash".to_string(),
},
last_seen: Utc::now(),
direct_addresses: vec![],
}
}


@@ -119,10 +119,12 @@ impl DeviceRegistry {
device_id: Uuid,
node_id: NodeId,
session_id: Uuid,
node_addr: NodeAddr,
) -> Result<()> {
let state = DeviceState::Pairing {
node_id,
session_id,
node_addr,
started_at: Utc::now(),
};
@@ -174,6 +176,13 @@ impl DeviceRegistry {
.map(|addr| addr.to_string())
.collect()
}
Some(DeviceState::Pairing { node_addr, .. }) => {
// Extract addresses from the active pairing connection
node_addr
.direct_addresses()
.map(|addr| addr.to_string())
.collect()
}
Some(DeviceState::Connected { connection, .. }) => {
// If somehow already connected, use those addresses
connection.addresses.clone()
@@ -477,6 +486,7 @@ impl DeviceRegistry {
public_key_hash: "placeholder".to_string(),
},
last_seen: Utc::now(),
direct_addresses: vec![],
})
}


@@ -191,15 +191,8 @@ impl PairingProtocolHandler {
let shared_secret = self.generate_shared_secret(session_id).await?;
let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
// Use the actual device ID from device_info to ensure consistency
let actual_device_id = device_info.device_id;
// Get node ID from the device info's network fingerprint
let node_id = match device_info.network_fingerprint.node_id.parse::<NodeId>() {
@@ -211,6 +204,47 @@ impl PairingProtocolHandler {
}
};
// Register the remote device in Pairing state before completing pairing
// This allows complete_pairing to extract addresses from the Pairing state
{
let mut registry = self.device_registry.write().await;
// Create a NodeAddr with the node_id and extract direct addresses from device_info
let mut node_addr = iroh::NodeAddr::new(node_id);
// Add direct addresses from the device_info
for addr_str in &device_info.direct_addresses {
if let Ok(socket_addr) = addr_str.parse() {
node_addr = node_addr.with_direct_addresses([socket_addr]);
}
}
if !device_info.direct_addresses.is_empty() {
self.log_info(&format!(
"Extracted {} direct addresses from joiner device info",
device_info.direct_addresses.len()
))
.await;
}
registry
.start_pairing(actual_device_id, node_id, session_id, node_addr)
.map_err(|e| {
self.log_warn(&format!(
"Warning: Could not register device in Pairing state: {}",
e
));
e
})
.ok(); // Ignore errors - device might already be in pairing state
} // Release write lock here
// Complete pairing in device registry with proper lock scoping
{
let mut registry = self.device_registry.write().await;
registry
.complete_pairing(actual_device_id, device_info.clone(), session_keys.clone())
.await?;
} // Release write lock here
// Mark device as connected since pairing is successful
let simple_connection = crate::service::network::device::ConnectionInfo {
addresses: vec![], // Will be filled in later

View File

@@ -71,13 +71,61 @@ impl PairingProtocolHandler {
let shared_secret = self.generate_shared_secret(session_id).await?;
let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
// Get node ID from the initiator's device info
let device_id = initiator_device_info.device_id;
let node_id = match initiator_device_info
.network_fingerprint
.node_id
.parse::<NodeId>()
{
Ok(id) => id,
Err(_) => {
self.log_warn("Failed to parse node ID from initiator device info, using fallback")
.await;
NodeId::from_bytes(&[0u8; 32]).unwrap()
}
};
// Register the initiator device in Pairing state before completing pairing
// This stores Alice's addresses so Bob can reconnect if needed
{
let mut registry = self.device_registry.write().await;
// Create NodeAddr with addresses extracted from initiator_device_info
let mut node_addr = iroh::NodeAddr::new(node_id);
// Add direct addresses from the device_info
for addr_str in &initiator_device_info.direct_addresses {
if let Ok(socket_addr) = addr_str.parse() {
node_addr = node_addr.with_direct_addresses([socket_addr]);
}
}
if !initiator_device_info.direct_addresses.is_empty() {
self.log_info(&format!(
"Extracted {} direct addresses from initiator device info",
initiator_device_info.direct_addresses.len()
))
.await;
}
registry
.start_pairing(device_id, node_id, session_id, node_addr)
.map_err(|e| {
self.log_warn(&format!(
"Warning: Could not register initiator device in Pairing state: {}",
e
));
e
})
.ok(); // Ignore errors - device might already be in pairing state
} // Release write lock here
// Complete pairing in device registry
let actual_device_id = initiator_device_info.device_id;
{
let mut registry = self.device_registry.write().await;
if let Err(e) = registry
.complete_pairing(
actual_device_id,
device_id,
initiator_device_info.clone(),
session_keys.clone(),
)
@@ -100,19 +148,16 @@ impl PairingProtocolHandler {
};
let mut registry = self.device_registry.write().await;
if let Err(e) = registry
.mark_connected(actual_device_id, simple_connection)
.await
{
if let Err(e) = registry.mark_connected(device_id, simple_connection).await {
self.log_warn(&format!(
"Warning - failed to mark initiator device {} as connected: {}",
actual_device_id, e
device_id, e
))
.await;
} else {
self.log_info(&format!(
"Successfully marked initiator device {} as connected after pairing",
actual_device_id
device_id
))
.await;
}
@@ -217,6 +262,7 @@ impl PairingProtocolHandler {
public_key_hash: "unknown".to_string(),
},
last_seen: chrono::Utc::now(),
direct_addresses: vec![],
}
}
} else {
@@ -226,14 +272,61 @@ impl PairingProtocolHandler {
}
};
// Complete pairing in device registry
// Use the actual device ID from device_info to ensure consistency
let actual_device_id = initiator_device_info.device_id;
let device_id = initiator_device_info.device_id;
let node_id = match initiator_device_info
.network_fingerprint
.node_id
.parse::<NodeId>()
{
Ok(id) => id,
Err(_) => {
self.log_warn("Failed to parse node ID from initiator device info, using from_node fallback")
.await;
from_node
}
};
// Register the initiator device in Pairing state before completing pairing
// This stores Alice's addresses so Bob can reconnect if needed
{
let mut registry = self.device_registry.write().await;
// Create NodeAddr with addresses extracted from initiator_device_info
let mut node_addr = iroh::NodeAddr::new(node_id);
// Add direct addresses from the device_info
for addr_str in &initiator_device_info.direct_addresses {
if let Ok(socket_addr) = addr_str.parse() {
node_addr = node_addr.with_direct_addresses([socket_addr]);
}
}
if !initiator_device_info.direct_addresses.is_empty() {
self.log_info(&format!(
"Extracted {} direct addresses from initiator device info in completion handler",
initiator_device_info.direct_addresses.len()
))
.await;
}
registry
.start_pairing(device_id, node_id, session_id, node_addr)
.map_err(|e| {
self.log_warn(&format!(
"Warning: Could not register initiator device in Pairing state: {}",
e
));
e
})
.ok(); // Ignore errors - device might already be in pairing state
} // Release write lock here
// Complete pairing in device registry
let pairing_result = {
let mut registry = self.device_registry.write().await;
registry
.complete_pairing(
actual_device_id,
device_id,
initiator_device_info.clone(),
session_keys.clone(),
)
@@ -248,7 +341,7 @@ impl PairingProtocolHandler {
if let Some(session) = sessions.get_mut(&session_id) {
session.state = PairingState::Completed;
session.shared_secret = Some(shared_secret.clone());
session.remote_device_id = Some(actual_device_id);
session.remote_device_id = Some(device_id);
}
}
@@ -268,9 +361,7 @@ impl PairingProtocolHandler {
let _mark_result = {
let mut registry = self.device_registry.write().await;
registry
.mark_connected(actual_device_id, simple_connection)
.await
registry.mark_connected(device_id, simple_connection).await
};
}
}

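The joiner and initiator paths now share the same register-then-complete sequence. A condensed sketch against the types these hunks use, with error handling trimmed and the parseable addresses collected into a single `with_direct_addresses` call:

```rust
// Sketch only; Result is the crate's own alias, and node_id is assumed to be
// parsed from info.network_fingerprint.node_id upstream (with a logged
// fallback, as the hunks do).
async fn register_then_complete(
    registry: &tokio::sync::RwLock<DeviceRegistry>,
    info: &DeviceInfo,
    node_id: iroh::NodeId,
    session_id: uuid::Uuid,
    session_keys: SessionKeys,
) -> Result<()> {
    // Rebuild an iroh NodeAddr from the string addresses carried in DeviceInfo
    let addrs: Vec<std::net::SocketAddr> = info
        .direct_addresses
        .iter()
        .filter_map(|s| s.parse().ok())
        .collect();
    let node_addr = iroh::NodeAddr::new(node_id).with_direct_addresses(addrs);

    let mut registry = registry.write().await;
    // Best-effort: the device may already be mid-pairing, so ignore this error
    let _ = registry.start_pairing(info.device_id, node_id, session_id, node_addr);
    // complete_pairing can now read the addresses parked in Pairing state
    registry
        .complete_pairing(info.device_id, info.clone(), session_keys)
        .await
}
```
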
View File

@@ -19,7 +19,7 @@ use crate::service::network::{
};
use async_trait::async_trait;
use blake3;
use iroh::{Endpoint, NodeAddr, NodeId};
use iroh::{Endpoint, NodeAddr, NodeId, Watcher};
use persistence::PairingPersistence;
use security::PairingSecurity;
use std::collections::HashMap;
@@ -55,6 +55,9 @@ pub struct PairingProtocolHandler {
/// Session persistence manager
persistence: Option<Arc<PairingPersistence>>,
/// Endpoint for accessing direct addresses
endpoint: Option<Endpoint>,
}
impl PairingProtocolHandler {
@@ -66,6 +69,7 @@ impl PairingProtocolHandler {
command_sender: tokio::sync::mpsc::UnboundedSender<
crate::service::network::core::event_loop::EventLoopCommand,
>,
endpoint: Option<Endpoint>,
) -> Self {
Self {
identity,
@@ -76,6 +80,7 @@ impl PairingProtocolHandler {
command_sender,
role: None,
persistence: None,
endpoint,
}
}
@@ -88,6 +93,7 @@ impl PairingProtocolHandler {
crate::service::network::core::event_loop::EventLoopCommand,
>,
data_dir: PathBuf,
endpoint: Option<Endpoint>,
) -> Self {
let persistence = Arc::new(PairingPersistence::new(data_dir));
Self {
@@ -99,6 +105,7 @@ impl PairingProtocolHandler {
command_sender,
role: None,
persistence: Some(persistence),
endpoint,
}
}
@@ -321,6 +328,20 @@ impl PairingProtocolHandler {
device_info.network_fingerprint = self.identity.network_fingerprint();
device_info.last_seen = chrono::Utc::now();
// Populate direct addresses from endpoint
device_info.direct_addresses = if let Some(endpoint) = &self.endpoint {
if let Some(node_addr) = endpoint.node_addr().get() {
node_addr
.direct_addresses()
.map(|addr| addr.to_string())
.collect()
} else {
vec![]
}
} else {
vec![]
};
Ok(device_info)
}

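The added `Watcher` import is what makes `endpoint.node_addr().get()` compile: `node_addr()` returns a watcher, and `Watcher::get` yields its current `Option<NodeAddr>` snapshot. The same extraction, flattened into a helper (a sketch mirroring the hunk's API usage):

```rust
use iroh::{Endpoint, Watcher};

// Returns [] until the endpoint has discovered its own direct addresses.
fn current_direct_addresses(endpoint: Option<&Endpoint>) -> Vec<String> {
    endpoint
        .and_then(|ep| ep.node_addr().get()) // Watcher::get -> Option<NodeAddr>
        .map(|node_addr| {
            node_addr
                .direct_addresses()
                .map(|addr| addr.to_string())
                .collect()
        })
        .unwrap_or_default()
}
```
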
View File

@@ -1741,33 +1741,45 @@ impl PeerSync {
///
/// Queries the database for ALL shared resources (tags, albums, etc.) to send
/// to a new device during initial sync. This ensures pre-sync data is included.
///
/// This is fully generic - uses the registry to discover and query all shared models.
pub async fn get_full_shared_state(&self) -> Result<serde_json::Value> {
use crate::infra::sync::Syncable;
debug!("Querying full shared resource state for backfill");
// Query all shared models
// TODO: Add volumes, user_metadata, etc. as they get Syncable impl
let tags = crate::infra::db::entities::tag::Model::query_for_sync(
None,
None,
10000, // Large limit for full state
self.db.as_ref(),
// Query all shared models through the registry (fully generic)
let all_shared_state = crate::infra::sync::registry::query_all_shared_models(
None, // No since filter - get everything
CATCHUP_BATCH_SIZE,
self.db.clone(),
)
.await
.map_err(|e| anyhow::anyhow!("Failed to query tags: {}", e))?;
.map_err(|e| anyhow::anyhow!("Failed to query shared models: {}", e))?;
info!("Queried {} tags for backfill state snapshot", tags.len());
// Build response object dynamically
let mut response = serde_json::Map::new();
Ok(serde_json::json!({
"tags": tags.into_iter().map(|(uuid, data, _ts)| {
serde_json::json!({
"uuid": uuid,
"data": data
for (model_type, records) in all_shared_state {
info!(
model_type = %model_type,
count = records.len(),
"Queried shared model for backfill state snapshot"
);
// Convert records to array of {uuid, data} objects
let records_json: Vec<serde_json::Value> = records
.into_iter()
.map(|(uuid, data, _ts)| {
serde_json::json!({
"uuid": uuid,
"data": data
})
})
}).collect::<Vec<_>>(),
// Add more shared resources here as they're implemented
}))
.collect();
response.insert(model_type, serde_json::Value::Array(records_json));
}
Ok(serde_json::Value::Object(response))
}
/// Transition to ready state (after backfill)

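For reference, the snapshot assembled above is one key per registered shared model, each holding an array of `{uuid, data}` records. A shape-only sketch (model names and fields here are hypothetical):

```rust
fn main() {
    // Shape only - mirrors what get_full_shared_state returns after the change.
    let snapshot = serde_json::json!({
        "tags":   [ { "uuid": "uuid-1", "data": { "name": "screenshots" } } ],
        "albums": [ { "uuid": "uuid-2", "data": { "name": "Vacation" } } ],
    });
    assert!(snapshot["tags"].is_array());
}
```
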
View File

@@ -166,6 +166,8 @@ pub struct WatchedLocation {
pub path: PathBuf,
/// Whether watching is enabled for this location
pub enabled: bool,
/// Indexing rule toggles for filtering events
pub rule_toggles: crate::ops::indexing::rules::RuleToggles,
}
impl LocationWatcher {
@@ -206,6 +208,15 @@ impl LocationWatcher {
}
}
// Get rule toggles and location root from watched locations
let (rule_toggles, location_root) = {
let locations = self.watched_locations.read().await;
locations
.get(&location_id)
.map(|loc| (loc.rule_toggles, loc.path.clone()))
.ok_or_else(|| anyhow::anyhow!("Location {} not found in watched locations", location_id))?
};
// Create metrics for this worker
let worker_metrics = Arc::new(LocationWorkerMetrics::new());
{
@@ -223,6 +234,8 @@ impl LocationWatcher {
self.events.clone(),
self.config.clone(),
worker_metrics.clone(),
rule_toggles,
location_root,
);
// Record worker creation
@@ -498,6 +511,7 @@ impl LocationWatcher {
library_id: library.id(),
path: path.clone(),
enabled: true, // TODO: Add enabled field to database schema
rule_toggles: Default::default(), // Use default rules for existing locations
};
// Add to watched locations
@@ -827,6 +841,7 @@ impl LocationWatcher {
library_id,
path: path.clone(),
enabled: true,
rule_toggles: Default::default(), // Use default rules for new locations
};
// Add location to watcher

View File

@@ -29,6 +29,10 @@ pub struct LocationWorker {
config: LocationWatcherConfig,
/// Metrics for this worker
metrics: Arc<LocationWorkerMetrics>,
/// Indexing rule toggles for filtering events
rule_toggles: crate::ops::indexing::rules::RuleToggles,
/// Location root path for rule evaluation
location_root: PathBuf,
}
impl LocationWorker {
@@ -41,6 +45,8 @@ impl LocationWorker {
events: Arc<EventBus>,
config: LocationWatcherConfig,
metrics: Arc<LocationWorkerMetrics>,
rule_toggles: crate::ops::indexing::rules::RuleToggles,
location_root: PathBuf,
) -> Self {
Self {
location_id,
@@ -50,6 +56,8 @@ impl LocationWorker {
events,
config,
metrics,
rule_toggles,
location_root,
}
}
@@ -422,7 +430,15 @@ impl LocationWorker {
}
}
if let Err(e) = responder::apply_batch(&self.context, self.library_id, raw_events).await {
if let Err(e) = responder::apply_batch(
&self.context,
self.library_id,
raw_events,
self.rule_toggles,
&self.location_root,
)
.await
{
error!(
"Failed to apply batch for location {}: {}",
self.location_id, e
@@ -490,6 +506,8 @@ mod tests {
events: Arc::new(EventBus::default()),
config,
metrics: Arc::new(LocationWorkerMetrics::new()),
rule_toggles: Default::default(),
location_root: PathBuf::from("/test"),
};
let events = vec![
@@ -522,6 +540,8 @@ mod tests {
events: Arc::new(EventBus::default()),
metrics: Arc::new(LocationWorkerMetrics::new()),
config,
rule_toggles: Default::default(),
location_root: PathBuf::from("/test"),
};
let renames = vec![

View File

@@ -170,6 +170,13 @@ impl CargoTestRunner {
.env("TEST_ROLE", &process.name)
.env("TEST_DATA_DIR", process.data_dir.path().to_str().unwrap());
// Forward RUST_LOG from parent or default to debug for subprocess visibility
if let Ok(rust_log) = std::env::var("RUST_LOG") {
command.env("RUST_LOG", rust_log);
} else {
command.env("RUST_LOG", "debug");
}
let child = command
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
@@ -216,6 +223,13 @@ impl CargoTestRunner {
.env("TEST_ROLE", &process.name)
.env("TEST_DATA_DIR", process.data_dir.path().to_str().unwrap());
// Forward RUST_LOG from parent or default to debug for subprocess visibility
if let Ok(rust_log) = std::env::var("RUST_LOG") {
command.env("RUST_LOG", rust_log);
} else {
command.env("RUST_LOG", "debug");
}
let child = command
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())

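With this forwarding in place, a parent invocation such as `RUST_LOG=info cargo test test_sync_backfill_end_to_end` (the orchestrator added later in this commit) should carry the log filter into the spawned subprocesses; when the variable is unset, the children default to `debug` so their output stays visible.
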
View File

@@ -233,6 +233,11 @@ async fn bob_pairing_scenario() {
println!("PAIRING_SUCCESS: Bob's Test Device connected to Alice successfully");
// Wait longer to allow persistent connection to be established via auto-reconnection
// The pairing stream is temporary; we need to wait for Bob to reconnect
println!("Bob: Waiting for persistent connection to be established...");
tokio::time::sleep(Duration::from_secs(10)).await;
// Write success marker for orchestrator to detect
std::fs::write("/tmp/spacedrive-pairing-test/bob_success.txt", "success").unwrap();
break;

View File

@@ -0,0 +1,487 @@
//! End-to-end sync backfill integration test
//!
//! This test validates the full sync stack by:
//! 1. Device A (Alice) indexes the Spacedrive source code
//! 2. Device B (Bob) pairs with Alice and triggers backfill
//! 3. Validates that Bob receives all indexed data via sync
use sd_core::infra::db::entities::{entry, entry_closure};
use sd_core::testing::CargoTestRunner;
use sd_core::Core;
use sea_orm::{EntityTrait, PaginatorTrait};
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::timeout;
const TEST_DIR: &str = "/tmp/spacedrive-sync-backfill-test";
/// Alice indexes the Spacedrive source code
#[tokio::test]
#[ignore]
async fn alice_indexes_scenario() {
if env::var("TEST_ROLE").unwrap_or_default() != "alice" {
return;
}
env::set_var("SPACEDRIVE_TEST_DIR", TEST_DIR);
let data_dir = PathBuf::from(format!("{}/alice", TEST_DIR));
let device_name = "Alice's Device";
println!("Alice: Starting sync backfill test");
println!("Alice: Data dir: {:?}", data_dir);
// Initialize Core
println!("Alice: Initializing Core...");
let mut core = timeout(Duration::from_secs(10), Core::new(data_dir))
.await
.unwrap()
.unwrap();
core.device.set_name(device_name.to_string()).unwrap();
println!("Alice: Core initialized");
// Subscribe to events to monitor job completion
let mut event_subscriber = core.events.subscribe();
// Get the Spacedrive project root (current working directory during test)
let project_root = env::current_dir().expect("Failed to get current directory");
println!("Alice: Project root: {:?}", project_root);
// Create library
println!("Alice: Creating library...");
let library = core
.libraries
.create_library("Test Sync Library", None, core.context.clone())
.await
.unwrap();
// Register device in database
let db = library.db();
let device = core.device.to_device().unwrap();
use sd_core::infra::db::entities;
use sea_orm::{ActiveModelTrait, ColumnTrait, QueryFilter};
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await
.unwrap()
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await.unwrap()
}
};
// Create a location pointing to the Spacedrive source code
println!("Alice: Creating location for source code...");
let location_args = sd_core::location::LocationCreateArgs {
path: project_root.clone(),
name: Some("Spacedrive Source".to_string()),
index_mode: sd_core::location::IndexMode::Shallow,
};
let location_id = sd_core::location::create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await
.expect("Failed to create location");
println!("Alice: Location created with ID: {}", location_id);
// The indexing job is automatically started by add_location, so we just monitor events
println!("Alice: Monitoring indexer job progress...");
let mut job_completed = false;
let mut attempts = 0;
let max_attempts = 300; // 5 minutes max
tokio::spawn(async move {
while let Ok(event) = event_subscriber.recv().await {
match event {
sd_core::infra::event::Event::JobProgress {
job_id,
progress,
message,
..
} => {
println!(
"Alice: Job {} progress: {}% - {}",
job_id,
progress,
message.unwrap_or_else(|| "".to_string())
);
}
sd_core::infra::event::Event::JobCompleted { job_id, .. } => {
println!("Alice: Job {} completed!", job_id);
}
sd_core::infra::event::Event::JobFailed { error, .. } => {
panic!("Alice: Job failed: {}", error);
}
_ => {}
}
}
});
// Poll for indexing completion by checking if files have been indexed
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let entry_count = entry::Entity::find()
.count(library.db().conn())
.await
.unwrap();
if entry_count > 0 && attempts > 10 {
// Give it some time to ensure indexing is done
tokio::time::sleep(Duration::from_secs(3)).await;
let final_entry_count = entry::Entity::find()
.count(library.db().conn())
.await
.unwrap();
if final_entry_count == entry_count {
println!("Alice: Indexing complete! Found {} entry entries", final_entry_count);
job_completed = true;
break;
}
}
attempts += 1;
if attempts >= max_attempts {
panic!("Alice: Indexing timeout - job did not complete");
}
if attempts % 10 == 0 {
println!("Alice: Still waiting for indexing... (current count: {})", entry_count);
}
}
if !job_completed {
panic!("Alice: Failed to complete indexing");
}
// Count total entries via entry_closure table
let entry_count = entry_closure::Entity::find()
.count(library.db().conn())
.await
.unwrap();
println!("Alice: Total entry_closure count: {}", entry_count);
// Write entry count to shared file for Bob to validate against
std::fs::create_dir_all(TEST_DIR).unwrap();
std::fs::write(
format!("{}/alice_entry_count.txt", TEST_DIR),
entry_count.to_string(),
)
.unwrap();
println!("Alice: Entry count written to shared file");
// Initialize networking for device pairing
println!("Alice: Initializing networking...");
timeout(Duration::from_secs(10), core.init_networking())
.await
.unwrap()
.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
println!("Alice: Networking initialized");
// Start pairing as initiator
println!("Alice: Starting pairing as initiator...");
let (pairing_code, expires_in) = if let Some(networking) = core.networking() {
timeout(
Duration::from_secs(15),
networking.start_pairing_as_initiator(false),
)
.await
.unwrap()
.unwrap()
} else {
panic!("Networking not initialized");
};
let short_code = pairing_code
.split_whitespace()
.take(3)
.collect::<Vec<_>>()
.join(" ");
println!(
"Alice: Pairing code: {}... (expires in {}s)",
short_code, expires_in
);
// Write pairing code for Bob
std::fs::write(format!("{}/pairing_code.txt", TEST_DIR), &pairing_code).unwrap();
println!("Alice: Pairing code written");
// Wait for Bob to connect
println!("Alice: Waiting for Bob to pair...");
let mut pair_attempts = 0;
let max_pair_attempts = 60;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let connected_devices = core
.services
.device
.get_connected_devices()
.await
.unwrap();
if !connected_devices.is_empty() {
println!("Alice: Bob paired successfully!");
break;
}
pair_attempts += 1;
if pair_attempts >= max_pair_attempts {
panic!("Alice: Pairing timeout");
}
if pair_attempts % 5 == 0 {
println!("Alice: Still waiting for Bob...");
}
}
// Give time for sync to initialize and begin backfill
println!("Alice: Waiting for sync to begin...");
tokio::time::sleep(Duration::from_secs(10)).await;
// Write success marker
std::fs::write(format!("{}/alice_success.txt", TEST_DIR), "success").unwrap();
println!("Alice: Test completed successfully");
// Keep alive for Bob to complete backfill
tokio::time::sleep(Duration::from_secs(60)).await;
}
/// Bob pairs with Alice and backfills the indexed data
#[tokio::test]
#[ignore]
async fn bob_backfills_scenario() {
if env::var("TEST_ROLE").unwrap_or_default() != "bob" {
return;
}
env::set_var("SPACEDRIVE_TEST_DIR", TEST_DIR);
let data_dir = PathBuf::from(format!("{}/bob", TEST_DIR));
let device_name = "Bob's Device";
println!("Bob: Starting sync backfill test");
println!("Bob: Data dir: {:?}", data_dir);
// Initialize Core
println!("Bob: Initializing Core...");
let mut core = timeout(Duration::from_secs(10), Core::new(data_dir))
.await
.unwrap()
.unwrap();
core.device.set_name(device_name.to_string()).unwrap();
println!("Bob: Core initialized");
// Initialize networking
println!("Bob: Initializing networking...");
timeout(Duration::from_secs(10), core.init_networking())
.await
.unwrap()
.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
println!("Bob: Networking initialized");
// Wait for Alice's pairing code
println!("Bob: Looking for pairing code...");
let pairing_code = loop {
if let Ok(code) = std::fs::read_to_string(format!("{}/pairing_code.txt", TEST_DIR)) {
break code.trim().to_string();
}
tokio::time::sleep(Duration::from_millis(500)).await;
};
println!("Bob: Found pairing code");
// Join pairing
println!("Bob: Joining pairing...");
if let Some(networking) = core.networking() {
timeout(
Duration::from_secs(15),
networking.start_pairing_as_joiner(&pairing_code, false),
)
.await
.unwrap()
.unwrap();
} else {
panic!("Networking not initialized");
}
println!("Bob: Successfully paired with Alice");
// Wait for devices to be connected
println!("Bob: Waiting for connection...");
let mut pair_attempts = 0;
let max_pair_attempts = 30;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let connected_devices = core
.services
.device
.get_connected_devices()
.await
.unwrap();
if !connected_devices.is_empty() {
println!("Bob: Connected to Alice!");
break;
}
pair_attempts += 1;
if pair_attempts >= max_pair_attempts {
panic!("Bob: Connection timeout");
}
}
// Read Alice's entry count for validation
let alice_entry_count: u64 = loop {
if let Ok(content) = std::fs::read_to_string(format!("{}/alice_entry_count.txt", TEST_DIR))
{
if let Ok(count) = content.trim().parse() {
break count;
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
};
println!("Bob: Alice indexed {} entries", alice_entry_count);
// Wait for library to be available after pairing
println!("Bob: Waiting for shared library...");
let library = loop {
let libs = core.libraries.list().await;
if !libs.is_empty() {
break libs[0].clone();
}
tokio::time::sleep(Duration::from_millis(500)).await;
};
println!("Bob: Got shared library: {}", library.id());
let mut backfill_attempts = 0;
let max_backfill_attempts = 120; // 2 minutes
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let bob_entry_count = entry_closure::Entity::find()
.count(library.db().conn())
.await
.unwrap();
if backfill_attempts % 10 == 0 {
println!(
"Bob: Current entry count: {} / {}",
bob_entry_count, alice_entry_count
);
}
// Check if Bob has received most of the data (within 5% tolerance)
let min_expected = (alice_entry_count as f64 * 0.95) as u64;
if bob_entry_count >= min_expected {
println!(
"Bob: Backfill complete! Received {} entries (expected ~{})",
bob_entry_count, alice_entry_count
);
// Validate the count is reasonable
if bob_entry_count < alice_entry_count / 2 {
panic!(
"Bob: Backfill validation failed - received {} but expected ~{}",
bob_entry_count, alice_entry_count
);
}
break;
}
backfill_attempts += 1;
if backfill_attempts >= max_backfill_attempts {
panic!(
"Bob: Backfill timeout - only received {} / {} entries",
bob_entry_count, alice_entry_count
);
}
}
// Write success marker
std::fs::write(format!("{}/bob_success.txt", TEST_DIR), "success").unwrap();
println!("Bob: Test completed successfully");
// Give time for graceful shutdown
tokio::time::sleep(Duration::from_secs(5)).await;
}
/// Main test orchestrator
#[tokio::test]
async fn test_sync_backfill_end_to_end() {
println!("Starting end-to-end sync backfill integration test");
// Clean up from previous runs
let _ = std::fs::remove_dir_all(TEST_DIR);
std::fs::create_dir_all(TEST_DIR).unwrap();
let mut runner = CargoTestRunner::for_test_file("sync_backfill_integration_test")
.with_timeout(Duration::from_secs(600)) // 10 minutes for full test
.add_subprocess("alice", "alice_indexes_scenario")
.add_subprocess("bob", "bob_backfills_scenario");
// Start Alice first to index the source code
println!("Starting Alice (indexer)...");
runner
.spawn_single_process("alice")
.await
.expect("Failed to spawn Alice");
// Wait for Alice to complete indexing and initialize networking
// Indexing can take 30-60 seconds depending on system
println!("Waiting for Alice to complete indexing...");
tokio::time::sleep(Duration::from_secs(90)).await;
// Start Bob to trigger backfill
println!("Starting Bob (backfill receiver)...");
runner
.spawn_single_process("bob")
.await
.expect("Failed to spawn Bob");
// Wait for both devices to complete
let result = runner
.wait_for_success(|_outputs| {
let alice_success = std::fs::read_to_string(format!("{}/alice_success.txt", TEST_DIR))
.map(|content| content.trim() == "success")
.unwrap_or(false);
let bob_success = std::fs::read_to_string(format!("{}/bob_success.txt", TEST_DIR))
.map(|content| content.trim() == "success")
.unwrap_or(false);
alice_success && bob_success
})
.await;
match result {
Ok(_) => {
println!("End-to-end sync backfill test successful!");
}
Err(e) => {
println!("End-to-end sync backfill test failed: {}", e);
for (name, output) in runner.get_all_outputs() {
println!("\n{} output:\n{}", name, output);
}
panic!("Sync backfill test failed - see output above");
}
}
}

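Note that both scenario tests are marked `#[ignore]` and gated on `TEST_ROLE`, so they only do real work when spawned by the orchestrator. Running `cargo test test_sync_backfill_end_to_end -- --nocapture` should drive the full Alice/Bob flow, assuming a writable `/tmp` for the shared marker files under `TEST_DIR`.
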

Submodule docs updated: ffcd1266ec...8f6e226094