diff --git a/core/AGENTS.md b/core/AGENTS.md
new file mode 100644
index 000000000..5260dad07
--- /dev/null
+++ b/core/AGENTS.md
@@ -0,0 +1,100 @@
+# AGENTS.md - Spacedrive Core v2
+
+## Build/Test Commands
+
+- `cargo build` - Build the project
+- `cargo test` - Run all tests
+- `cargo test <test_name>` - Run specific test (e.g., `cargo test library_test`)
+- `cargo clippy` - Lint code
+- `cargo fmt` - Format code
+- `cargo run --bin sd-cli -- <command>` - Run CLI (note: binary is `sd-cli`, not `spacedrive`)
+
+## Code Style
+
+- **Imports**: Group std, external crates, then local modules with blank lines between
+- **Formatting**: Use `cargo fmt` - tabs for indentation, snake_case for variables/functions. DO NOT use emojis at all.
+- **Types**: Explicit types preferred, use `Result` for error handling with `thiserror`
+- **Naming**: snake_case for functions/variables, PascalCase for types, SCREAMING_SNAKE_CASE for constants
+- **Error Handling**: Use `Result` types, `thiserror` for custom errors, `anyhow` for application errors
+- **Async**: Use `async/await`, prefer `tokio` primitives, avoid blocking operations
+- **Resumable Jobs**: For long-running jobs that need to be resumable, store the job's state within the job's struct itself. Use `#[serde(skip)]` for fields that should not be persisted. For example, in a file copy job, the list of already copied files can be stored to allow the job to resume from where it left off (see the sketch after this list).
+- **Documentation**: Use `//!` for module docs, `///` for public items, include examples
+- **Architecture**: Follow a Command Query Responsibility Segregation (CQRS) and Domain-Driven Design (DDD) pattern.
+  - **Domain**: Core data structures and business logic are located in `src/domain/`. These are the "nouns" of your system.
+  - **Operations**: State-changing commands (actions) and data-retrieving queries are located in `src/ops/`. These are the "verbs" of your system.
+    - **Actions**: Operations that modify the state of the application. They should be self-contained and transactional.
+    - **Queries**: Operations that retrieve data without modifying state. They should be efficient and optimized for reading.
+- **Feature Modules**: Each new feature should be implemented in its own module within the `src/ops/` directory. For example, a new "share" feature would live in `src/ops/files/share`. Each feature module should contain the following files where applicable:
+  - `action.rs`: The main logic for the state-changing operation.
+  - `input.rs`: Data structures for the action's input.
+  - `output.rs`: Data structures for the action's output.
+  - `job.rs`: If the action is long-running, the job implementation.
+- **Database**: Use SeaORM entities, async queries, proper error propagation
+- **Comments**: Minimal inline comments, focus on why not what, no TODO comments in production code
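+
+A minimal sketch of the resumable-job pattern above. The `FileCopyJob` name and fields are illustrative, not the actual job types in this codebase:
+
+```rust
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+
+#[derive(Serialize, Deserialize)]
+pub struct FileCopyJob {
+	/// Persisted, so a restarted job can skip files it already copied.
+	pub completed_files: Vec<PathBuf>,
+	pub remaining_files: Vec<PathBuf>,
+	/// Runtime-only handle, rebuilt on resume; never serialized.
+	#[serde(skip)]
+	progress_tx: Option<tokio::sync::mpsc::Sender<u64>>,
+}
+```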
+
+## Daemon Architecture
+
+Spacedrive uses a **daemon-client architecture** where a single daemon process manages the core functionality and multiple client applications (CLI, GraphQL server, desktop app) connect to it via Unix domain sockets.
+
+> **For detailed daemon architecture documentation, see [/docs/core/daemon.md](/docs/core/daemon.md)**
+
+### **The `Wire` Trait**
+All actions and queries must implement the `Wire` trait to enable type-safe client-daemon communication:
+
+```rust
+pub trait Wire {
+	const METHOD: &'static str;
+}
+```
+
+### **Registration Macros**
+Instead of manually implementing `Wire`, use these registration macros that automatically:
+1. Implement the `Wire` trait with the correct method string
+2. Register the operation in the global registry using the `inventory` crate
+
+**For Queries:**
+```rust
+crate::register_query!(NetworkStatusQuery, "network.status");
+// Generates method: "query:network.status.v1"
+```
+
+**For Library Actions:**
+```rust
+crate::register_library_action!(FileCopyAction, "files.copy");
+// Generates method: "action:files.copy.input.v1"
+```
+
+**For Core Actions:**
+```rust
+crate::register_core_action!(LibraryCreateAction, "libraries.create");
+// Generates method: "action:libraries.create.input.v1"
+```
+
+### **Registry System**
+- **Location**: `core/src/ops/registry.rs`
+- **Mechanism**: Uses the `inventory` crate for compile-time registration (see the sketch below)
+- **Global Maps**: `QUERIES` and `ACTIONS` hashmaps populated at startup
+- **Handler Functions**: Generic handlers that decode payloads, execute operations, and encode results
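+
+A sketch of how this compile-time registration pattern typically works with the `inventory` crate. The `QueryRegistration` type and handler signature here are assumptions for illustration, not the actual registry types:
+
+```rust
+use std::collections::HashMap;
+
+// Each `register_query!` expansion submits one of these at link time,
+// e.g. via `inventory::submit! { QueryRegistration { .. } }`.
+pub struct QueryRegistration {
+	pub method: &'static str,
+	pub handler: fn(&[u8]) -> Vec<u8>,
+}
+
+inventory::collect!(QueryRegistration);
+
+// At startup, collect every registration into a lookup map.
+pub fn build_query_map() -> HashMap<&'static str, fn(&[u8]) -> Vec<u8>> {
+	inventory::iter::<QueryRegistration>
+		.into_iter()
+		.map(|reg| (reg.method, reg.handler))
+		.collect()
+}
+```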
+
+## Logging Standards
+
+- **Setup**: Use `tracing_subscriber::fmt()` with env filter for structured logging
+- **Macros**: Use `info!`, `warn!`, `error!`, `debug!` from `tracing` crate, not `println!`
+- **Job Context**: Use `ctx.log()` in jobs for job-specific logging with automatic job_id tagging
+- **Structured**: Include relevant context fields: `debug!(job_id = %self.id, "message")`
+- **Levels**: debug for detailed flow, info for user-relevant events, warn for recoverable issues, error for failures
+- **Format**: `tracing_subscriber::fmt().with_env_filter(env_filter).init()` in main/examples
+- **Environment**: Respect `RUST_LOG` env var, fallback to module-specific filters like `sd_core=info`
+
+## Documentation
+
+- **Core level docs**: Live in `/docs/core` - comprehensive architecture and implementation guides
+- **Core design docs**: Live in `/docs/core/design` - planning documents, RFCs, and design decisions
+- **Application level docs**: Live in `/docs`
+- **Code docs**: Use `///` for public APIs, `//!` for module overviews, include examples
+
+
+## Debug Instructions
+
+- You can view the logs of a job in the job_logs directory in the root of the data folder
+- When testing the CLI, after compiling you must first use the `restart` command to ensure the Spacedrive daemon is using the latest build.
diff --git a/core/PAIRING_FIX_PLAN.md b/core/PAIRING_FIX_PLAN.md
new file mode 100644
index 000000000..40f8d48e3
--- /dev/null
+++ b/core/PAIRING_FIX_PLAN.md
@@ -0,0 +1,563 @@
+# Pairing Protocol - Comprehensive Fix Plan
+
+## Overview
+
+This plan addresses both **critical security vulnerabilities** and **protocol correctness issues** that cause test failures.
+
+**Priority**: CRITICAL - Current system is broken and insecure
+
+---
+
+## Phase 1: Fix Critical Protocol Flow (Required for Tests to Pass)
+
+### Issue #1: Bob Completes Pairing Too Early
+
+**File**: `joiner.rs:17-211` (handle_pairing_challenge)
+
+**Current (WRONG)**:
+
+```rust
+pub(crate) async fn handle_pairing_challenge(...) -> Result<Vec<u8>> {
+	// Sign challenge
+	let signature = self.identity.sign(&challenge)?;
+
+	// Generate shared secret TOO EARLY
+	let shared_secret = self.generate_shared_secret(session_id).await?;
+	let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
+
+	// Complete pairing BEFORE Alice confirms
+	registry.complete_pairing(device_id, initiator_device_info.clone(), session_keys).await?;
+
+	// Mark as connected BEFORE Alice confirms
+	registry.mark_connected(device_id, simple_connection).await?;
+
+	// Set state to Completed BEFORE receiving confirmation
+	session.state = PairingState::Completed;
+
+	// Send response
+	let response = PairingMessage::Response { session_id, response: signature, device_info };
+	Ok(serde_json::to_vec(&response)?)
+}
+```
+
+**Fixed**:
+
+```rust
+pub(crate) async fn handle_pairing_challenge(...) -> Result<Vec<u8>> {
+	// Sign challenge
+	let signature = self.identity.sign(&challenge)?;
+
+	// Store initiator info for later (when we receive Complete)
+	{
+		let mut sessions = self.active_sessions.write().await;
+		if let Some(session) = sessions.get_mut(&session_id) {
+			session.remote_device_info = Some(initiator_device_info.clone());
+			session.state = PairingState::ResponseSent; // NOT Completed!
+		}
+	}
+
+	// ONLY send response, don't complete anything yet
+	let device_info = self.get_device_info().await?;
+	let response = PairingMessage::Response {
+		session_id,
+		response: signature,
+		device_info
+	};
+
+	Ok(serde_json::to_vec(&response)?)
+}
+```
+
+**Changes**:
+
+- Remove all lines 71-178 (shared secret, complete_pairing, mark_connected)
+- Only store initiator_device_info for later use
+- Transition to `ResponseSent` state (not `Completed`)
+- Wait for `Complete` message before doing anything
+
+### Issue #2: handle_completion is Redundant
+
+**File**: `joiner.rs:214-411` (handle_completion)
+
+**Current**: Does everything that handle_pairing_challenge already did
+
+**Fixed**: This becomes the ONLY place Bob completes pairing
+
+```rust
+pub(crate) async fn handle_completion(...) -> Result<()> {
+	if success {
+		// NOW generate shared secret (not before!)
+		let shared_secret = self.generate_shared_secret(session_id).await?;
+		let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
+
+		// Get initiator info we stored earlier
+		let initiator_device_info = {
+			let sessions = self.active_sessions.read().await;
+			sessions.get(&session_id)
+				.and_then(|s| s.remote_device_info.clone())
+				.ok_or(NetworkingError::Protocol("No device info stored".to_string()))?
+		};
+
+		let device_id = initiator_device_info.device_id;
+
+		// Register initiator in Pairing state
+		registry.start_pairing(device_id, node_id, session_id, node_addr)?;
+
+		// NOW complete pairing (Alice already confirmed!)
+		registry.complete_pairing(device_id, initiator_device_info.clone(), session_keys).await?;
+
+		// Mark as connected
+		registry.mark_connected(device_id, simple_connection).await?;
+
+		// Update session state
+		session.state = PairingState::Completed;
+		session.shared_secret = Some(shared_secret);
+	}
+	Ok(())
+}
+```
+
+### Issue #3: Alice Must Guarantee Complete Send
+
+**File**: `initiator.rs:120-295` (handle_pairing_response)
+
+**Current**: Marks as completed, then sends Complete (might fail silently)
+
+**Fixed**: Send Complete synchronously, fail if it doesn't work
+
+```rust
+pub(crate) async fn handle_pairing_response(...) -> Result<Vec<u8>> {
+	// ... signature verification ...
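+	// Illustrative shape of the elided verification step; the analysis doc
+	// notes the initiator calls PairingSecurity::verify_challenge_response()
+	// here (the exact arguments are assumptions):
+	// let signature_valid = PairingSecurity::verify_challenge_response(
+	//     &challenge, &response, &remote_public_key);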
+
+	if !signature_valid {
+		// Mark as failed
+		session.state = PairingState::Failed { reason: "Invalid signature".to_string() };
+
+		// Send failure Complete message
+		let response = PairingMessage::Complete {
+			session_id,
+			success: false,
+			reason: Some("Invalid signature".to_string()),
+		};
+		return serde_json::to_vec(&response).map_err(|e| NetworkingError::Serialization(e));
+	}
+
+	// Signature valid - complete pairing
+	let shared_secret = self.generate_shared_secret(session_id).await?;
+	let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
+
+	registry.start_pairing(device_id, node_id, session_id, node_addr)?;
+	registry.complete_pairing(device_id, device_info.clone(), session_keys).await?;
+	registry.mark_connected(device_id, simple_connection).await?;
+
+	// Update session BEFORE sending Complete
+	session.state = PairingState::Completed;
+	session.shared_secret = Some(shared_secret);
+
+	// Send success Complete message
+	let response = PairingMessage::Complete {
+		session_id,
+		success: true,
+		reason: None,
+	};
+
+	serde_json::to_vec(&response).map_err(|e| NetworkingError::Serialization(e))
+}
+```
+
+**Key Change**: If Complete message fails to send, the error propagates and Bob never receives confirmation.
+
+---
+
+## Phase 2: Fix Critical Security Issues
+
+### Issue #4: DoS via Unbounded Message Size
+
+**File**: `mod.rs:704`, `mod.rs:647`
+
+**Add constant**:
+
+```rust
+// At top of mod.rs
+const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB max
+```
+
+**Fix both locations**:
+
+```rust
+// Line 647 (send_pairing_message_to_node)
+let mut len_buf = [0u8; 4];
+match recv.read_exact(&mut len_buf).await {
+	Ok(_) => {
+		let resp_len = u32::from_be_bytes(len_buf) as usize;
+
+		// Validate size
+		if resp_len > MAX_MESSAGE_SIZE {
+			return Err(NetworkingError::Protocol(
+				format!("Message too large: {} bytes", resp_len)
+			));
+		}
+
+		let mut resp_buf = vec![0u8; resp_len];
+		// ... rest of code ...
+	}
+}
+
+// Line 704 (handle_stream)
+let msg_len = u32::from_be_bytes(len_buf) as usize;
+
+// Validate size
+if msg_len > MAX_MESSAGE_SIZE {
+	self.logger.error(&format!("Rejecting oversized message: {} bytes", msg_len)).await;
+	break;
+}
+
+let mut msg_buf = vec![0u8; msg_len];
+```
+
+### Issue #5: Replay Attack Protection
+
+**File**: `initiator.rs:34`, `mod.rs:516`
+
+**Add challenge tracking**:
+
+```rust
+// In PairingProtocolHandler struct
+used_challenges: Arc<RwLock<HashMap<Vec<u8>, chrono::DateTime<chrono::Utc>>>>,
+```
+
+**Generate challenge with timestamp**:
+
+```rust
+fn generate_challenge(&self) -> Result<Vec<u8>> {
+	use rand::RngCore;
+	let mut challenge = vec![0u8; 40]; // 32 random + 8 timestamp
+	rand::thread_rng().fill_bytes(&mut challenge[0..32]);
+
+	// Add timestamp
+	let timestamp = chrono::Utc::now().timestamp();
+	challenge[32..40].copy_from_slice(&timestamp.to_be_bytes());
+
+	Ok(challenge)
+}
+```
+
+**Verify challenge hasn't been used**:
+
+```rust
+// In handle_pairing_response, BEFORE verifying signature
+let challenge_timestamp = {
+	if challenge.len() != 40 {
+		return Err(NetworkingError::Protocol("Invalid challenge format".to_string()));
+	}
+	let ts_bytes: [u8; 8] = challenge[32..40].try_into().unwrap();
+	let timestamp = i64::from_be_bytes(ts_bytes);
+	chrono::DateTime::from_timestamp(timestamp, 0)
+		.ok_or(NetworkingError::Protocol("Invalid challenge timestamp".to_string()))?
+};
+
+// Check if challenge is too old (> 5 minutes)
+let now = chrono::Utc::now();
+if now.signed_duration_since(challenge_timestamp) > chrono::Duration::minutes(5) {
+	return Err(NetworkingError::Protocol("Challenge expired".to_string()));
+}
+
+// Check if challenge was already used
+{
+	let mut used = self.used_challenges.write().await;
+	if used.contains_key(&challenge) {
+		return Err(NetworkingError::Protocol("Challenge already used (replay attack?)".to_string()));
+	}
+	used.insert(challenge.clone(), now);
+
+	// Cleanup old challenges (> 10 minutes)
+	used.retain(|_, &mut used_at| {
+		now.signed_duration_since(used_at) < chrono::Duration::minutes(10)
+	});
+}
+```
+
+### Issue #6: Use Proper KDF for Session Keys
+
+**File**: `mod.rs:524-532`
+
+**Current (WRONG)**:
+
+```rust
+async fn generate_shared_secret(&self, session_id: Uuid) -> Result<Vec<u8>> {
+	let pairing_codes = self.pairing_codes.read().await;
+	let pairing_code = pairing_codes.get(&session_id).ok_or_else(|| {
+		NetworkingError::Protocol(format!("No pairing code found for session {}", session_id))
+	})?;
+	Ok(pairing_code.secret().to_vec()) // Direct use!
+}
+```
+
+**Fixed (use HKDF)**:
+
+```rust
+async fn generate_shared_secret(&self, session_id: Uuid) -> Result<Vec<u8>> {
+	let pairing_codes = self.pairing_codes.read().await;
+	let pairing_code = pairing_codes.get(&session_id).ok_or_else(|| {
+		NetworkingError::Protocol(format!("No pairing code found for session {}", session_id))
+	})?;
+
+	// Use HKDF with session-specific context
+	use blake3::Hasher;
+	let mut hasher = Hasher::new();
+	hasher.update(b"spacedrive-pairing-session-key-v1");
+	hasher.update(session_id.as_bytes());
+	hasher.update(pairing_code.secret());
+
+	Ok(hasher.finalize().as_bytes().to_vec())
+}
+```
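+
+A small test sketch for the derivation above. The standalone `derive_session_key` helper repeats the same hashing steps purely for illustration; the helper name is not existing code:
+
+```rust
+fn derive_session_key(session_id: uuid::Uuid, secret: &[u8]) -> Vec<u8> {
+	let mut hasher = blake3::Hasher::new();
+	hasher.update(b"spacedrive-pairing-session-key-v1");
+	hasher.update(session_id.as_bytes());
+	hasher.update(secret);
+	hasher.finalize().as_bytes().to_vec()
+}
+
+#[test]
+fn different_sessions_derive_different_keys() {
+	let secret = b"same-pairing-code-secret";
+	let a = derive_session_key(uuid::Uuid::new_v4(), secret);
+	let b = derive_session_key(uuid::Uuid::new_v4(), secret);
+	assert_ne!(a, b); // session context separates keys
+	assert_eq!(a.len(), 32); // blake3 output is 32 bytes
+}
+```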
+
+### Issue #7: Fix TOCTOU Race in Session Creation
+
+**File**: `mod.rs:254-279`
+
+**Current (WRONG)**:
+
+```rust
+// Line 256: Check (read lock)
+let sessions = self.active_sessions.read().await;
+if let Some(existing_session) = sessions.get(&session_id) {
+	return Err(...);
+}
+// Lock released - RACE CONDITION HERE!
+
+// Line 277: Insert (write lock)
+let mut sessions = self.active_sessions.write().await;
+sessions.insert(session_id, session);
+```
+
+**Fixed**:
+
+```rust
+// Single write lock for atomic check-and-insert
+let mut sessions = self.active_sessions.write().await;
+if sessions.contains_key(&session_id) {
+	return Err(NetworkingError::Protocol(format!(
+		"Session {} already exists",
+		session_id
+	)));
+}
+
+// Create new session
+let session = PairingSession {
+	id: session_id,
+	state: PairingState::Scanning,
+	remote_device_id: None,
+	remote_device_info: None,
+	remote_public_key: None,
+	shared_secret: None,
+	created_at: chrono::Utc::now(),
+};
+
+sessions.insert(session_id, session);
+```
+
+### Issue #8: Align Timeout Values
+
+**File**: `mod.rs:368`, `types.rs:52`
+
+**Fix**:
+
+```rust
+// types.rs:52 - Keep 5 minutes
+expires_at: Utc::now() + chrono::Duration::minutes(5),
+
+// mod.rs:368 - Change from 10 to 5 minutes
+let timeout_duration = chrono::Duration::minutes(5); // Match code expiry
+```
+
+---
+
+## Phase 3: Fix Medium/Low Priority Issues
+
+### Issue #9: Fix get_current_pairing_code
+
+**File**: `mod.rs:233-237`
+
+**Replace HashMap with BTreeMap** or track most recent explicitly:
+
+```rust
+// Option A: Use BTreeMap (deterministic order, unlike HashMap)
+// In struct: pairing_codes: Arc<RwLock<BTreeMap<Uuid, PairingCode>>>,
+
+// Option B: Track most recent explicitly
+// In struct: most_recent_pairing_code: Arc<RwLock<Option<PairingCode>>>,
+
+pub async fn get_current_pairing_code(&self) -> Option<PairingCode> {
+	self.most_recent_pairing_code.read().await.clone()
+}
+
+// Update when creating pairing code:
+*self.most_recent_pairing_code.write().await = Some(pairing_code.clone());
+```
+
+### Issue #10: Sanitize Error Messages
+
+**File**: Throughout
+
+**Pattern**:
+
+```rust
+// Don't expose internal state
+format!("Session {} already exists in state {:?}", session_id, existing_session.state)
+
+// Generic error for external, detailed log for internal
+self.log_error(&format!("Session {} already exists in state {:?}", session_id, existing_session.state)).await;
+return Err(NetworkingError::Protocol("Session already exists".to_string()));
+```
+
+### Issue #11: Reduce Verbose Logging in Production
+
+**File**: All files
+
+Add log levels:
+
+```rust
+// Use debug! for verbose info
+self.log_debug(&format!("Session state: {:?}", session.state)).await;
+
+// Only info! for important events
+self.log_info("Pairing completed successfully").await;
+```
+
+### Issue #12: Encrypt Persisted Sessions
+
+**File**: `persistence.rs:118-161`
+
+**Add encryption**:
+
+```rust
+// Use platform keychain to store encryption key
+// Encrypt JSON before writing to disk
+// Decrypt when reading
+
+// Example using age or similar:
+let encrypted_data = encrypt_with_platform_key(&json_data)?;
+fs::write(&self.sessions_file, encrypted_data).await?;
+```
+
+---
+
+## Implementation Order
+
+### Step 1: Protocol Flow Fixes (REQUIRED FOR TESTS)
+
+1. Fix `handle_pairing_challenge` to NOT complete pairing early
+2. Fix `handle_completion` to be the ONLY place Bob completes
+3. Fix `handle_pairing_response` to guarantee Complete send
+4. Test: `cargo test device_pairing_test` should PASS
+
+### Step 2: Critical Security (DO NOT SHIP WITHOUT)
+
+5. Add message size limits (DoS fix)
+6. Add replay attack protection
+7. Fix TOCTOU race condition
+8. Add proper KDF for session keys
+
+### Step 3: Important Security
+
+9. Encrypt persisted sessions
+10. Align timeout values
+11. Fix session_id derivation consistency (types.rs)
+
+### Step 4: Polish
+
+12. Fix get_current_pairing_code
+13. Sanitize error messages
+14. Reduce verbose logging
+15. Improve cryptographic validation
+
+---
+
+## Testing Plan
+
+### Unit Tests
+
+```rust
+#[tokio::test]
+async fn test_challenge_replay_protection() {
+	// Generate challenge
+	// Use it once ✓
+	// Try to reuse it ✗ should fail
+}
+
+#[tokio::test]
+async fn test_message_size_limit() {
+	// Try to send 5GB message
+	// Should fail with "Message too large"
+}
+
+#[tokio::test]
+async fn test_joiner_waits_for_complete() {
+	// Bob sends Response
+	// Bob session should be ResponseSent (NOT Completed)
+	// Alice sends Complete
+	// NOW Bob session should be Completed
+}
+```
+
+### Integration Test
+
+```bash
+# Should pass after Phase 1
+cargo test device_pairing_test
+```
+
+---
+
+## Success Criteria
+
+**Protocol Correctness**:
+
+- Test `device_pairing_test` passes 100% of the time
+- No split-brain states possible
+- Both Alice and Bob atomically complete pairing
+
+**Security**:
+
+- No DoS via large messages
+- No replay attacks possible
+- Proper mutual authentication
+- Session keys properly derived
+- Secrets encrypted at rest
+
+**Code Quality**:
+
+- No redundant logic
+- Clear state machine
+- Proper error handling
+- Reasonable logging
+
+---
+
+## Estimated Effort
+
+- **Step 1**: 4-6 hours (protocol flow fixes)
+- **Step 2**: 4-6 hours (critical security)
+- **Step 3**: 2-4 hours (important security)
+- **Step 4**: 2-3 hours (polish)
+- **Total**: ~12-19 hours
+
+---
+
+## Risk Assessment
+
+**High Risk if NOT fixed**:
+
+- Split-brain pairing states
+- DoS attacks crash daemon
+- Replay attacks compromise security
+- Plaintext secrets on disk
+
+**Low Risk when fixed**:
+
+- Well-tested protocol
+- Defense in depth
+- Cryptographic guarantees
+- Production-ready security
diff --git a/core/PAIRING_PROTOCOL_ANALYSIS.md b/core/PAIRING_PROTOCOL_ANALYSIS.md
new file mode 100644
index 000000000..074cd6154
--- /dev/null
+++ b/core/PAIRING_PROTOCOL_ANALYSIS.md
@@ -0,0 +1,328 @@
+# Pairing Protocol Flow Analysis
+
+## Current Protocol Flow
+
+### 1. Alice (Initiator) Starts Pairing
+**File**: `core/mod.rs:428-622`
+
+```
+1. Call start_pairing_as_initiator()
+2. Generate session_id and pairing code with relay info
+3. Start pairing session in WaitingForConnection state
+4. Broadcast session_id via mDNS user_data
+5. Wait for Bob to connect...
+```
+
+### 2. Bob (Joiner) Starts Pairing
+**File**: `core/mod.rs:630-920`
+
+```
+1. Call start_pairing_as_joiner(code)
+2. Parse pairing code to get session_id
+3. Join pairing session in Scanning state
+4. Discover Alice via:
+   - mDNS (local network) OR
+   - Relay (cross-network)
+5. Connect to Alice using PAIRING_ALPN
+6. Send PairingRequest message to Alice (lines 865-913)
+```
+
+### 3. Alice Receives PairingRequest
+**File**: `initiator.rs:18-117`
+
+```
+1. Validate Bob's public key (line 26)
+2. Generate 32-byte random challenge (line 34)
+3. Update session to ChallengeReceived state (lines 64 or 85)
+4. Store Bob's public key in session (line 69 or 90)
+5. Send Challenge message back to Bob (lines 104-116)
+```
+
+### 4. Bob Receives Challenge - **CRITICAL ISSUE HERE**
+**File**: `joiner.rs:17-211`
+
+```
+WRONG ORDER:
+1. Sign challenge with Bob's private key (line 32)
+2. Generate shared secret from pairing code (line 71)
+3. Create session keys (line 72)
+4. COMPLETE PAIRING IN REGISTRY (line 126) ← TOO EARLY!
+5. MARK SESSION AS COMPLETED (line 175) ← TOO EARLY!
+6. Send Response message to Alice (line 195)
+
+CORRECT ORDER SHOULD BE:
+1. Sign challenge
+2. Send Response to Alice
+3. Wait for Complete message from Alice
+4. ONLY THEN complete pairing and mark as Completed
+```
+
+**Split-Brain Scenario**:
+```
+- Line 989 (mod.rs): command_sender.send(response) could fail
+- If send fails: Alice never receives response
+- But Bob already completed pairing (line 126)
+- Result: Bob thinks paired, Alice doesn't
+- Test fails because only Bob sees connection
+```
+
+### 5. Alice Receives Response
+**File**: `initiator.rs:120-295`
+
+```
+1. Get Bob's public key from session (stored in step 3)
+2. Verify signature: PairingSecurity::verify_challenge_response() (line 162)
+3. If signature INVALID:
+   - Mark session as Failed
+   - Return error
+4. If signature VALID:
+   - Generate shared secret (line 191)
+   - Register Bob in device registry (line 228)
+   - Complete pairing (line 244)
+   - Mark session as Completed (line 277)
+   - Send Complete message to Bob (line 288)
+```
+
+### 6. Bob Receives Complete (Redundant!)
+**File**: `joiner.rs:214-411`
+
+```
+REDUNDANT: Everything already done in step 4!
+1. Generate shared secret AGAIN (line 230)
+2. Complete pairing AGAIN (line 326)
+3. Mark session Completed AGAIN (line 342)
+
+This code only runs if Alice successfully sent Complete
+If Alice never sends Complete, Bob still thinks pairing succeeded (from step 4)
+```
+
+---
+
+## Critical Protocol Flaws
+
+### 1. No Cryptographic Certainty of Completion
+**Issue**: Bob completes pairing without cryptographic proof that Alice verified his signature
+
+**Attack Scenario**:
+```
+1. Attacker (Alice) sends Challenge to Bob
+2. Bob signs challenge and sends Response
+3. Bob immediately completes pairing (joiner.rs:126)
+4. Attacker never verifies signature, just drops connection
+5. Bob thinks pairing succeeded with attacker
+6. Bob's device registry now has attacker's keys stored
+```
+
+**Fix**: Bob MUST wait for Alice's Complete message before completing pairing
+
+### 2. Split-Brain State
+**Issue**: Bob and Alice can have different views of pairing success
+
+**Failure Modes**:
+```
+Mode A: Response send fails (mod.rs:989)
+- Bob: Completed ✓
+- Alice: WaitingForConnection or ChallengeReceived
+- Result: Test fails, devices don't see each other
+
+Mode B: Alice rejects signature
+- Bob: Completed ✓
+- Alice: Failed ✗
+- Result: Bob thinks paired, Alice knows it failed
+
+Mode C: Complete message send fails
+- Bob: Completed ✓
+- Alice: Completed ✓
+- But Bob's completion handler never runs (joiner.rs:214)
+- Result: Actually works, but redundant code confusing
+```
+
+### 3. Redundant Completion Logic
+**Issue**: `handle_completion()` duplicates all work already done in `handle_pairing_challenge()`
+
+**Code Smell**:
+```rust
+// joiner.rs:71-178 (in handle_pairing_challenge)
+let shared_secret = self.generate_shared_secret(session_id).await?;
+let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
+registry.complete_pairing(...).await?;
+session.state = PairingState::Completed;
+
+// joiner.rs:230-344 (in handle_completion) - EXACT SAME LOGIC
+let shared_secret = self.generate_shared_secret(session_id).await?;
+let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
+registry.complete_pairing(...).await?;
+session.state = PairingState::Completed;
+```
+
+This suggests the protocol state machine is incorrectly designed.
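+
+A sketch of what an explicit joiner-side state machine could look like. The state names come from this codebase; the transition table itself is this document's proposal, not existing code:
+
+```rust
+enum PairingState {
+	Scanning,
+	ResponseSent,
+	Completed,
+	Failed { reason: String },
+}
+
+// The joiner only advances on the two messages it is allowed to act on;
+// anything else is a protocol violation.
+fn joiner_transition(state: &PairingState, msg: &str) -> Option<PairingState> {
+	match (state, msg) {
+		(PairingState::Scanning, "Challenge") => Some(PairingState::ResponseSent),
+		(PairingState::ResponseSent, "Complete") => Some(PairingState::Completed),
+		_ => None,
+	}
+}
+```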
+
+### 4. No Message Ordering Guarantees
+**Issue**: QUIC streams don't guarantee Complete arrives before Bob times out
+
+Even if Alice sends Complete, network delays could cause:
+```
+1. Bob sends Response at T+0ms
+2. Bob completes pairing at T+1ms
+3. Alice receives Response at T+500ms
+4. Alice sends Complete at T+501ms
+5. Bob's test timeout at T+1000ms ← Fails before Complete arrives
+```
+
+---
+
+## Why Test Fails
+
+**Test File**: `tests/device_pairing_test.rs:89-134`
+
+Alice waits for:
+```rust
+let connected_devices = core.services.device.get_connected_devices().await.unwrap();
+if !connected_devices.is_empty() { // Line 96
+	println!("Alice: Pairing completed successfully!");
+```
+
+Bob waits for:
+```rust
+let connected_devices = core.services.device.get_connected_devices().await.unwrap();
+if !connected_devices.is_empty() { // Line 215
+	println!("Bob: Pairing completed successfully!");
+```
+
+**Failure Modes**:
+
+1. **Bob completes before Alice receives Response**:
+   - Bob marks self as Completed (joiner.rs:175)
+   - Bob calls registry.mark_connected (joiner.rs:151)
+   - Bob sees Alice as connected ✓
+   - Alice never receives Response (network loss, send failure)
+   - Alice stays in ChallengeReceived state
+   - Alice never calls registry.mark_connected
+   - Alice doesn't see Bob as connected ✗
+   - **Test hangs on Alice's wait loop (line 92-134)**
+
+2. **Alice rejects Bob's signature**:
+   - Alice calls verify_challenge_response (initiator.rs:162)
+   - Signature verification fails (corrupted data, timing attack, etc.)
+   - Alice marks session as Failed (initiator.rs:173-176)
+   - Alice never sends Complete
+   - Bob completed pairing already (joiner.rs:126)
+   - Bob sees Alice as connected ✓
+   - Alice sees Bob as failed ✗
+   - **Test hangs on Alice's wait loop**
+
+3. **TOCTOU in connection tracking**:
+   - Bob sends Response
+   - Alice verifies signature ✓
+   - Alice completes pairing
+   - Alice marks Bob as connected (initiator.rs:256)
+   - Alice sends Complete message
+   - **Complete send fails** (connection closed, network error)
+   - Bob's handle_completion never called
+   - But Bob already completed (joiner.rs:126)
+   - Both think paired, but Alice's registry might not have correct state
+   - **Test might pass or fail randomly**
+
+---
+
+## Correct Protocol Design
+
+### Fixed Flow
+
+```
+1. Alice → Bob: Challenge(session_id, challenge, alice_device_info)
+2. Bob signs challenge
+3. Bob → Alice: Response(session_id, signature, bob_device_info)
+4. Alice verifies signature
+5. Alice generates shared_secret
+6. Alice completes pairing
+7. Alice → Bob: Complete(session_id, success=true)
+8. Bob receives Complete
+9. Bob NOW completes pairing (NOT before step 8!)
+10. Both sides atomically mark as connected
+```
+
+### Required Changes
+
+**File: `joiner.rs:17-211` (handle_pairing_challenge)**
+
+```rust
+// REMOVE these lines (66-178):
+// - generate_shared_secret()
+// - SessionKeys::from_shared_secret()
+// - registry.complete_pairing()
+// - registry.mark_connected()
+// - session.state = PairingState::Completed
+
+// ONLY do:
+1. Sign challenge
+2. Send Response
+3. Transition to ResponseSent state (NOT Completed!)
+4. Wait for Complete message
+```
+
+**File: `joiner.rs:214-411` (handle_completion)**
+
+```rust
+// KEEP all the pairing logic HERE (not in handle_pairing_challenge)
+// This is the ONLY place Bob should complete pairing
+```
+
+**File: `initiator.rs:120-295` (handle_pairing_response)**
+
+```rust
+// MUST send Complete message synchronously
+// Cannot fail silently
+// If send fails, mark session as Failed
+```
+
+---
+
+## Security Implications
+
+### Current System is Vulnerable
+
+**Vulnerability**: Bob completes pairing without cryptographic proof Alice accepted him
+
+**Attack**: Rogue Alice
+```
+1. Attacker runs modified Alice that:
+   - Sends Challenge to Bob
+   - Receives Bob's signed Response
+   - Never verifies signature
+   - Stores Bob's public key and device info
+   - Drops connection
+
+2. Bob completes pairing (current code joiner.rs:126)
+3. Bob thinks he's paired with legitimate Alice
+4. Attacker has Bob's:
+   - Public key
+   - Device info
+   - Session keys (derived from pairing code Bob entered)
+```
+
+**Fix**: Bob MUST wait for Complete message before trusting the pairing
+
+### Proper Mutual Authentication
+
+Both sides must cryptographically confirm:
+```
+Alice verifies: Bob signed the challenge with his claimed public key
+Bob verifies: Alice sent Complete message (proves Alice accepted the signature)
+```
+
+Only after BOTH verifications should pairing complete on both sides.
+
+---
+
+## Test Requirements
+
+For `device_pairing_test.rs` to pass 100%:
+
+1. Alice must see Bob as connected
+2. Bob must see Alice as connected
+3. Both must happen atomically (no split-brain)
+4. Must handle network failures gracefully
+5. Must have timeout if pairing fails
+
+Current code fails because Bob completes before Alice confirms.
diff --git a/core/PAIRING_SECURITY_ISSUES.md b/core/PAIRING_SECURITY_ISSUES.md
new file mode 100644
index 000000000..fe18ad6cb
--- /dev/null
+++ b/core/PAIRING_SECURITY_ISSUES.md
@@ -0,0 +1,186 @@
+# Pairing Protocol Security Issues
+
+## CRITICAL - Must Fix Immediately
+
+### 1. Memory Exhaustion DoS Vulnerability
+**Severity**: CRITICAL
+**Location**: `mod.rs:704`, `mod.rs:647`
+**Impact**: Attacker can crash the application by claiming 4GB message size
+
+```rust
+// VULNERABLE CODE
+let msg_len = u32::from_be_bytes(len_buf) as usize;
+let mut msg_buf = vec![0u8; msg_len]; // NO SIZE LIMIT!
+```
+
+**Fix**: Add maximum message size constant
+```rust
+const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB
+if msg_len > MAX_MESSAGE_SIZE {
+	return Err(NetworkingError::Protocol("Message too large".to_string()));
+}
+```
+
+---
+
+### 2. Plaintext Storage of Cryptographic Secrets
+**Severity**: CRITICAL
+**Location**: `persistence.rs:118-161`
+**Impact**: Filesystem access = complete security compromise
+
+```rust
+// VULNERABLE CODE
+let json_data = serde_json::to_string_pretty(&persisted) // Plaintext!
+```
+
+**Fix**: Encrypt sessions file or use platform keychain
+
+---
+
+### 3. Session State Split-Brain Condition
+**Severity**: CRITICAL
+**Location**: `joiner.rs:66-191`
+**Impact**: Joiner completes pairing before confirming initiator received response
+
+**Current Flow (BROKEN)**:
+```
+Joiner:
+1. Generates shared secret (line 71)
+2. Completes pairing in registry (line 126)
+3. Marks session as Completed (line 175)
+4. THEN sends response (line 989 in mod.rs)
+   └─> If this fails, initiator never completes but joiner thinks it did
+```
+
+**Fix**: Only complete after receiving confirmation from initiator
+
+---
+
+### 4. Session Fixation via QR Codes
+**Severity**: CRITICAL
+**Location**: `types.rs:126-218`
+**Impact**: Attacker can create QR code with controlled session_id
+
+```rust
+// Line 213: session_id from QR is trusted
+session_id, // Use the session_id from the QR code
+```
+
+**Fix**: Always derive session_id from cryptographic secret
+
+---
+
+## HIGH - Important to Address
+
+### 5. TOCTOU Race Condition in Session Creation
+**Severity**: HIGH
+**Location**: `mod.rs:254-279`
+**Impact**: Concurrent session creation can corrupt state
+
+```rust
+// RACE CONDITION
+let sessions = self.active_sessions.read().await; // Check
+if sessions.get(&session_id).is_some() { return Err(...); }
+// Lock released - another thread could insert here!
+let mut sessions = self.active_sessions.write().await; // Insert
+sessions.insert(session_id, session);
+```
+
+**Fix**: Use single write lock with entry API
+
+---
+
+### 6. No Replay Attack Protection
+**Severity**: HIGH
+**Location**: `initiator.rs:34`
+**Impact**: Captured challenge-response can be replayed
+
+**Fix**: Add timestamp to challenges and track used challenges
+
+---
+
+### 7. No Key Derivation Function
+**Severity**: HIGH
+**Location**: `mod.rs:524-532`
+**Impact**: Pairing code secret used directly as session key
+
+```rust
+// WEAK
+Ok(pairing_code.secret().to_vec()) // Direct use!
+```
+
+**Fix**: Use HKDF with session-specific context
+
+---
+
+### 8. Mismatched Timeout Values
+**Severity**: HIGH
+**Location**: `mod.rs:368`, `types.rs:52`
+
+```rust
+// Session cleanup: 10 minutes
+let timeout_duration = chrono::Duration::minutes(10);
+
+// Code expiry: 5 minutes
+expires_at: Utc::now() + chrono::Duration::minutes(5),
+```
+
+**Fix**: Align timeouts (use 5 minutes for both)
+
+---
+
+## MEDIUM
+
+### 9. Unpredictable Pairing Code Selection
+**Severity**: MEDIUM
+**Location**: `mod.rs:233-237`
+
+```rust
+codes.values().last().cloned() // HashMap order is random!
+```
+
+**Fix**: Use BTreeMap or track most recent explicitly
+
+---
+
+### 10. Overly Detailed Error Messages
+**Severity**: MEDIUM
+**Impact**: Reveals internal state to attackers
+
+**Fix**: Generic errors for external-facing, detailed logs internally
+
+---
+
+### 11. Premature Connection Status
+**Severity**: MEDIUM
+**Location**: `initiator.rs:248-273`, `joiner.rs:140-163`
+
+```rust
+let simple_connection = ConnectionInfo {
+	addresses: vec![], // Empty! Not really connected
+```
+
+**Fix**: Only mark connected when real connection established
+
+---
+
+## LOW
+
+### 12. Weak Cryptographic Validation
+**Location**: `security.rs:50-55`
+**Fix**: Check for more weak patterns beyond all-zeros
+
+### 13. Verbose Production Logging
+**Fix**: Reduce logging in production builds
+
+### 14. Inconsistent Session ID Derivation
+**Location**: `types.rs:59-92`
+**Fix**: Clarify why session_id is re-derived in `from_session_id()`
+
+---
+
+## Fix Priority
+
+1. **Phase 1 (Immediate)**: #1, #3, #5, #6
+2. **Phase 2 (Important)**: #2, #4, #7, #8
+3. **Phase 3 (Nice to have)**: #9, #10, #11, #12, #13, #14
diff --git a/core/crush.json b/core/crush.json
new file mode 100644
index 000000000..170f0e769
--- /dev/null
+++ b/core/crush.json
@@ -0,0 +1,18 @@
+{
+	"$schema": "https://charm.land/crush.json",
+	"providers": {
+		"lmstudio": {
+			"name": "LM Studio",
+			"base_url": "http://localhost:1234/v1/",
+			"type": "openai",
+			"models": [
+				{
+					"name": "Qwen3 30B MOE",
+					"id": "local-model",
+					"context_window": 131072,
+					"default_max_tokens": 20000
+				}
+			]
+		}
+	}
+}
\ No newline at end of file
diff --git a/core/examples/location_watcher_demo.rs b/core/examples/location_watcher_demo.rs
new file mode 100644
index 000000000..6c38f5152
--- /dev/null
+++ b/core/examples/location_watcher_demo.rs
@@ -0,0 +1,181 @@
+//! Location Watcher Demo
+//!
+//! This example demonstrates how to use the location watcher to monitor
+//! file system changes in real-time.
+
+use sd_core::{infra::event::Event, Core};
+use std::path::PathBuf;
+use tokio::time::{sleep, Duration};
+use tracing::info;
+use uuid::Uuid;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+	// Initialize logging
+	tracing_subscriber::fmt::init();
+
+	info!("Starting Location Watcher Demo");
+
+	// Initialize core
+	let core = Core::new().await?;
+	info!("Core initialized successfully");
+
+	// Create a test library
+	let library = core
+		.libraries
+		.create_library("Watcher Demo Library", None, core.context.clone())
+		.await?;
+
+	let library_id = library.id();
+	info!("Created demo library: {}", library_id);
+
+	// Add a location to watch
+	let watch_dir = PathBuf::from("./data/spacedrive_watcher_demo");
+	tokio::fs::create_dir_all(&watch_dir).await?;
+
+	let location_id = Uuid::new_v4();
+	core.add_watched_location(location_id, library_id, watch_dir.clone(), true)
+		.await?;
+	info!("Added watched location: {}", watch_dir.display());
+
+	// Subscribe to events
+	let mut event_subscriber = core.events.subscribe();
+
+	// Spawn event listener
+	let events_handle = tokio::spawn(async move {
+		info!("Event listener started");
+
+		while let Ok(event) = event_subscriber.recv().await {
+			match event {
+				Event::EntryCreated {
+					library_id,
+					entry_id,
+				} => {
+					info!(
+						"File created - Library: {}, Entry: {}",
+						library_id, entry_id
+					);
+				}
+				Event::EntryModified {
+					library_id,
+					entry_id,
+				} => {
+					info!(
+						"File modified - Library: {}, Entry: {}",
+						library_id, entry_id
+					);
+				}
+				Event::EntryDeleted {
+					library_id,
+					entry_id,
+				} => {
+					info!(
+						"File deleted - Library: {}, Entry: {}",
+						library_id, entry_id
+					);
+				}
+				Event::EntryMoved {
+					library_id,
+					entry_id,
+					old_path,
+					new_path,
+				} => {
+					info!(
+						"File moved - Library: {}, Entry: {}, {} -> {}",
+						library_id, entry_id, old_path, new_path
+					);
+				}
+				_ => {} // Ignore other events for this demo
+			}
+		}
+	});
+
+	// Simulate file operations
+	info!("Starting file operations simulation...");
+
+	// Create a test file
+	let test_file = watch_dir.join("test_file.txt");
+	tokio::fs::write(&test_file, "Hello, Spacedrive!").await?;
+	info!("Created test file: {}", test_file.display());
+	sleep(Duration::from_millis(200)).await;
+
+	// Modify the file
+	tokio::fs::write(&test_file, "Hello, Spacedrive! Modified content.").await?;
+	info!("Modified test file");
+	sleep(Duration::from_millis(200)).await;
+
+	// Create a directory
+	let test_dir = watch_dir.join("test_directory");
+	tokio::fs::create_dir(&test_dir).await?;
+	info!("Created test directory: {}", test_dir.display());
+	sleep(Duration::from_millis(200)).await;
+
+	// Create a file in the directory
+	let nested_file = test_dir.join("nested_file.txt");
+	tokio::fs::write(&nested_file, "Nested file content").await?;
+	info!("Created nested file: {}", nested_file.display());
+	sleep(Duration::from_millis(200)).await;
+
+	// Rename the file
+	let renamed_file = test_dir.join("renamed_file.txt");
+	tokio::fs::rename(&nested_file, &renamed_file).await?;
+	info!(
+		"Renamed file: {} -> {}",
+		nested_file.display(),
+		renamed_file.display()
+	);
+	sleep(Duration::from_millis(200)).await;
+
+	// Delete the file
+	tokio::fs::remove_file(&renamed_file).await?;
+	info!("Deleted file: {}", renamed_file.display());
+	sleep(Duration::from_millis(200)).await;
+
+	// Delete the directory
+	tokio::fs::remove_dir(&test_dir).await?;
+	info!("Deleted directory: {}", test_dir.display());
+	sleep(Duration::from_millis(200)).await;
+
+	// Delete the original test file
+	tokio::fs::remove_file(&test_file).await?;
+	info!("Deleted test file: {}", test_file.display());
+
+	// Give some time for all events to be processed
+	sleep(Duration::from_secs(2)).await;
+
+	// Display current watched locations
+	let watched_locations = core.get_watched_locations().await;
+	info!("Currently watching {} locations:", watched_locations.len());
+	for location in watched_locations {
+		info!(
+			"  - {} ({}): {} [{}]",
+			location.id,
+			location.library_id,
+			location.path.display(),
+			if location.enabled {
+				"enabled"
+			} else {
+				"disabled"
+			}
+		);
+	}
+
+	// Clean up
+	core.remove_watched_location(location_id).await?;
+	info!("Removed watched location");
+
+	// Clean up directory
+	if watch_dir.exists() {
+		tokio::fs::remove_dir_all(&watch_dir).await?;
+		info!("Cleaned up demo directory");
+	}
+
+	// Stop event listener
+	events_handle.abort();
+
+	// Shutdown core
+	core.shutdown().await?;
+	info!("Demo completed successfully");
+
+	Ok(())
+}
diff --git a/core/rust-toolchain.toml b/core/rust-toolchain.toml
new file mode 100644
index 000000000..31578d3bf
--- /dev/null
+++ b/core/rust-toolchain.toml
@@ -0,0 +1,2 @@
+[toolchain]
+channel = "stable"
\ No newline at end of file
diff --git a/core/scripts/combine.sh b/core/scripts/combine.sh
new file mode 100755
index 000000000..a2f32ff01
--- /dev/null
+++ b/core/scripts/combine.sh
@@ -0,0 +1,118 @@
+#!/bin/bash
+
+# A generic script to combine files from a directory into a single file.
+
+# Function to show usage
+usage() {
+	echo "Usage: $0 <command> [options]"
+	echo ""
+	echo "Commands:"
+	echo "  docs [--with-design]    Combine documentation files (.md) from 'docs/'."
+	echo "                          --with-design: Include the 'docs/design' directory."
+	echo "  rust [path]             Combine Rust files (.rs) from a given path (default: '.')."
+	echo "                          Respects .gitignore."
+	echo "  cli                     Combine Rust files (.rs) from 'apps/cli'."
+	echo "                          Respects .gitignore."
+	echo "  tasks                   Combine task files (.md) from '.tasks/'."
+
+	echo ""
+	echo "Options:"
+	echo "  -h, --help              Show this help message."
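+	# Illustrative example invocations (any path works for the rust command)
+	echo ""
+	echo "Examples:"
+	echo "  $0 rust ./src"
+	echo "  $0 docs --with-design"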
+}
+
+# Function to check if a file should be ignored based on .gitignore
+is_ignored() {
+	git check-ignore -q "$1"
+}
+
+# Main combine function
+combine_files() {
+	local search_path=$1
+	local file_pattern=$2
+	local output_file=$3
+	local title=$4
+	local lang_tag=$5
+	local respect_gitignore=$6
+	shift 6
+	local exclude_patterns=("$@")
+
+	echo "Combining files from '$search_path' into '$output_file'..."
+
+	# Clear the output file
+	> "$output_file"
+
+	echo "# $title" >> "$output_file"
+	echo "" >> "$output_file"
+
+	# Build find args and exclude patterns
+	local find_args=("$search_path" -name "$file_pattern" -type f)
+	for pattern in "${exclude_patterns[@]}"; do
+		find_args+=("!" "-path" "$pattern")
+	done
+
+	# Run find with exec to safely process files one-by-one
+	find "${find_args[@]}" | sort | while read -r file; do
+		if [ "$respect_gitignore" = "true" ] && git check-ignore -q "$file"; then
+			continue
+		fi
+
+		echo "Processing: $file"
+		echo "## ${file}" >> "$output_file"
+		echo "" >> "$output_file"
+		echo '```'$lang_tag >> "$output_file"
+
+		if [ -r "$file" ]; then
+			cat "$file" >> "$output_file"
+		else
+			echo "[Could not read file]" >> "$output_file"
+		fi
+
+		echo '```' >> "$output_file"
+		echo "" >> "$output_file"
+	done
+
+	echo "Done! All files have been combined into $output_file"
+}
+
+# Main script logic
+if [ "$#" -eq 0 ]; then
+	usage
+	exit 1
+fi
+
+COMMAND=$1
+shift
+
+case $COMMAND in
+	docs)
+		include_design=false
+		if [ "$1" = "--with-design" ]; then
+			include_design=true
+		fi
+
+		exclude_patterns=()
+		if [ "$include_design" = "false" ]; then
+			exclude_patterns+=("../docs/design/*")
+		fi
+
+		combine_files "../docs" "*.md" "combined_docs.txt" "Combined Documentation Files" "markdown" "false" "${exclude_patterns[@]}"
+		;;
+	rust)
+		root_path=${1:-.}
+		combine_files "$root_path" "*.rs" "combined_rust_files.txt" "Combined Rust Files" "rust" "true"
+		;;
+	cli)
+		combine_files "./apps/cli" "*.rs" "combined_cli_rust_files.txt" "Combined CLI Rust Files" "rust" "true"
+		;;
+	tasks)
+		combine_files "../.tasks" "*.md" "combined_tasks.txt" "Combined Task Files" "markdown" "false"
+		;;
+	-h|--help)
+		usage
+		;;
+	*)
+		echo "Error: Unknown command '$COMMAND'"
+		usage
+		exit 1
+		;;
+esac
diff --git a/core/scripts/test_daemon.sh b/core/scripts/test_daemon.sh
new file mode 100755
index 000000000..fab9d9fff
--- /dev/null
+++ b/core/scripts/test_daemon.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# Start daemon in background
+# (redirect instead of piping to tee, so $! captures the daemon's PID, not tee's)
+./target/release/spacedrive daemon start --foreground > daemon_test.log 2>&1 &
+DAEMON_PID=$!
+
+# Wait for daemon to initialize
+sleep 3
+
+# Try to create a library
+echo "Creating library..."
+./target/release/spacedrive library create "TestLib" 2>&1
+
+# Check if daemon is still running
+if kill -0 $DAEMON_PID 2>/dev/null; then
+	echo "Daemon is still running"
+else
+	echo "Daemon crashed!"
+fi
+
+# Stop daemon
+kill $DAEMON_PID 2>/dev/null
+
+# Show any errors from the log
+echo "Last 20 lines of daemon log:"
+tail -20 daemon_test.log
\ No newline at end of file
diff --git a/core/scripts/update_spacedrive.sh b/core/scripts/update_spacedrive.sh
new file mode 100644
index 000000000..fe97a921e
--- /dev/null
+++ b/core/scripts/update_spacedrive.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# --- Configuration ---
+PROJECT_ROOT="/Users/jamespine/Projects/spacedrive/core"
+INSTALL_DIR="/usr/local/bin"
+BINARY_NAME="spacedrive"
+
+# --- Script Logic ---
+
+echo "Updating Spacedrive CLI..."
+
+# Navigate to the project root
+cd "$PROJECT_ROOT" || { echo "Error: Could not navigate to project root: $PROJECT_ROOT"; exit 1; }
+
+# Pull latest changes from Git
+echo "Pulling latest changes from Git..."
+git pull || { echo "Error: Git pull failed."; exit 1; }
+
+# Build the project in release mode
+echo "Building Spacedrive CLI (release mode)..."
+cargo build --release || { echo "Error: Cargo build failed."; exit 1; }
+
+# Define source and destination paths
+SOURCE_BINARY="$PROJECT_ROOT/target/release/$BINARY_NAME"
+DEST_BINARY="$INSTALL_DIR/$BINARY_NAME"
+
+# Ensure the installation directory exists
+mkdir -p "$INSTALL_DIR" || { echo "Error: Could not create installation directory: $INSTALL_DIR"; exit 1; }
+
+# Copy the new binary to the installation directory
+echo "Copying new binary to $DEST_BINARY..."
+cp "$SOURCE_BINARY" "$DEST_BINARY" || { echo "Error: Could not copy binary to $DEST_BINARY. Check permissions."; exit 1; }
+
+echo "Spacedrive CLI updated successfully!"
+
+# Optional: Display the version of the newly installed binary
+if [ -x "$DEST_BINARY" ]; then
+	echo "Installed version:"
+	"$DEST_BINARY" --version
+fi
diff --git a/core/src/cqrs.rs b/core/src/cqrs.rs
new file mode 100644
index 000000000..c4ff6d08a
--- /dev/null
+++ b/core/src/cqrs.rs
@@ -0,0 +1,112 @@
+//! CQRS (Command Query Responsibility Segregation) for Spacedrive Core
+//!
+//! This module provides a simplified CQRS implementation that leverages existing
+//! infrastructure while providing modular outputs (copying the job system pattern).
+//! TODO: Remove Query in favor of CoreQuery and LibraryQuery
+
+use crate::context::CoreContext;
+use anyhow::Result;
+use std::sync::Arc;
+use uuid::Uuid;
+
+/// A query that retrieves data without mutating state.
+///
+/// This trait provides the foundation for read-only operations with
+/// consistent infrastructure (validation, permissions, logging).
+pub trait Query: Send + 'static {
+	/// The data structure returned by the query (owned by the operation module).
+	type Output: Send + Sync + 'static;
+	type Input: Send + Sync + 'static;
+
+	/// Execute this query with the given context.
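+	///
+	/// Illustrative dispatch through `QueryManager` below; `NetworkStatusQuery`
+	/// is the example query named in AGENTS.md, used here only as a placeholder:
+	///
+	/// ```ignore
+	/// let status = query_manager.dispatch(NetworkStatusQuery).await?;
+	/// ```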
+	fn execute(
+		self,
+		context: Arc<CoreContext>,
+	) -> impl std::future::Future<Output = Result<Self::Output>> + Send;
+}
+
+/// Library-scoped query that operates within a specific library context
+pub trait LibraryQuery: Send + 'static {
+	/// The input data structure for this query
+	type Input: Send + Sync + 'static;
+	/// The data structure returned by the query
+	type Output: Send + Sync + 'static;
+
+	/// Create query from input
+	fn from_input(input: Self::Input) -> Result<Self>
+	where
+		Self: Sized;
+
+	/// Execute this query with rich session context
+	///
+	/// The session provides authentication, permissions, audit context,
+	/// and library context when needed
+	fn execute(
+		self,
+		context: Arc<CoreContext>,
+		session: crate::infra::api::SessionContext,
+	) -> impl std::future::Future<Output = Result<Self::Output>> + Send;
+}
+
+/// Core-level query that operates at the daemon level
+pub trait CoreQuery: Send + 'static {
+	/// The input data structure for this query
+	type Input: Send + Sync + 'static;
+	/// The data structure returned by the query
+	type Output: Send + Sync + 'static;
+
+	/// Create query from input
+	fn from_input(input: Self::Input) -> Result<Self>
+	where
+		Self: Sized;
+
+	/// Execute this query with rich session context
+	///
+	/// The session provides authentication, permissions, and audit context
+	fn execute(
+		self,
+		context: Arc<CoreContext>,
+		session: crate::infra::api::SessionContext,
+	) -> impl std::future::Future<Output = Result<Self::Output>> + Send;
+}
+
+/// QueryManager provides infrastructure for read-only operations.
+///
+/// This mirrors the ActionManager pattern but for queries, providing
+/// validation, permissions checking, and audit logging for read operations.
+pub struct QueryManager {
+	context: Arc<CoreContext>,
+}
+
+impl QueryManager {
+	/// Create a new QueryManager
+	pub fn new(context: Arc<CoreContext>) -> Self {
+		Self { context }
+	}
+
+	/// Dispatch a query for execution with full infrastructure support
+	pub async fn dispatch<Q: Query>(&self, query: Q) -> Result<Q::Output> {
+		query.execute(self.context.clone()).await
+	}
+
+	/// Dispatch a core-scoped query for execution with full infrastructure support
+	pub async fn dispatch_core<Q: CoreQuery>(&self, query: Q) -> Result<Q::Output> {
+		// Create session context for core queries
+		let device_id = self.context.device_manager.device_id()?;
+		let session = crate::infra::api::SessionContext::device_session(device_id, "Core Device".to_string());
+		query.execute(self.context.clone(), session).await
+	}
+
+	/// Dispatch a library-scoped query for execution with full infrastructure support
+	pub async fn dispatch_library<Q: LibraryQuery>(
+		&self,
+		query: Q,
+		library_id: Uuid,
+	) -> Result<Q::Output> {
+		// Create session context for library queries with library context
+		let device_id = self.context.device_manager.device_id()?;
+		let mut session = crate::infra::api::SessionContext::device_session(device_id, "Core Device".to_string());
+		session = session.with_library(library_id);
+		query.execute(self.context.clone(), session).await
+	}
+}
diff --git a/core/src/domain/entry.rs b/core/src/domain/entry.rs
new file mode 100644
index 000000000..74df9076e
--- /dev/null
+++ b/core/src/domain/entry.rs
@@ -0,0 +1,242 @@
+//! Entry - the core file/directory representation in Spacedrive
+//!
+//! An Entry represents any filesystem item (file, directory, symlink) that
+//! Spacedrive knows about. It's the foundation of the VDFS.
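+//!
+//! Illustrative usage, assuming an already-constructed `entry: Entry`:
+//!
+//! ```ignore
+//! if entry.is_file() {
+//!     println!("{} ({:?} bytes)", entry.name, entry.size);
+//! }
+//! ```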
+
+use crate::domain::addressing::SdPath;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use specta::Type;
+use uuid::Uuid;
+
+/// Represents any filesystem entry (file or directory) in the VDFS
+#[derive(Debug, Clone, Serialize, Deserialize, Type)]
+pub struct Entry {
+	/// Unique identifier for this entry
+	pub id: Uuid,
+
+	/// The virtual path including device context
+	pub sd_path: SdPathSerialized,
+
+	/// File/directory name
+	pub name: String,
+
+	/// Type of entry
+	pub kind: EntryKind,
+
+	/// Size in bytes (None for directories)
+	pub size: Option<u64>,
+
+	/// Filesystem timestamps
+	pub created_at: Option<DateTime<Utc>>,
+	pub modified_at: Option<DateTime<Utc>>,
+	pub accessed_at: Option<DateTime<Utc>>,
+
+	/// Platform-specific identifiers
+	pub inode: Option<u64>,   // Unix/macOS
+	pub file_id: Option<u64>, // Windows
+
+	/// Parent directory entry ID
+	pub parent_id: Option<Uuid>,
+
+	/// Location this entry belongs to (if indexed)
+	pub location_id: Option<Uuid>,
+
+	/// User metadata (ALWAYS exists - key innovation!)
+	pub metadata_id: Uuid,
+
+	/// Content identity for deduplication (optional)
+	pub content_id: Option<Uuid>,
+
+	/// Tracking information
+	pub first_seen_at: DateTime<Utc>,
+	pub last_indexed_at: Option<DateTime<Utc>>,
+}
+
+/// Type of filesystem entry
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Type)]
+pub enum EntryKind {
+	/// Regular file
+	File {
+		/// File extension (without dot)
+		extension: Option<String>,
+	},
+
+	/// Directory
+	Directory,
+
+	/// Symbolic link
+	Symlink {
+		/// Target path
+		target: String,
+	},
+}
+
+/// How SdPath is stored in the database
+#[derive(Debug, Clone, Serialize, Deserialize, Type)]
+pub struct SdPathSerialized {
+	/// Device where this entry exists
+	pub device_id: Uuid,
+
+	/// Normalized path on that device
+	pub path: String,
+}
+
+impl SdPathSerialized {
+	/// Create from an SdPath
+	pub fn from_sdpath(sdpath: &SdPath) -> Option<Self> {
+		match sdpath {
+			SdPath::Physical { device_id, path } => Some(Self {
+				device_id: *device_id,
+				path: path.to_string_lossy().to_string(),
+			}),
+			SdPath::Content { .. } => None, // Can't serialize content paths to this format
+		}
+	}
+
+	/// Convert back to SdPath
+	pub fn to_sdpath(&self) -> SdPath {
+		SdPath::Physical {
+			device_id: self.device_id,
+			path: self.path.clone().into(),
+		}
+	}
+}
+
+impl Entry {
+	/// Create a new Entry from filesystem metadata
+	pub fn new(sd_path: SdPath, metadata: std::fs::Metadata) -> Self {
+		let name = sd_path.file_name().unwrap_or("unknown").to_string();
+
+		let kind = if metadata.is_dir() {
+			EntryKind::Directory
+		} else if metadata.is_symlink() {
+			EntryKind::Symlink {
+				target: String::new(), // Would need to read link
+			}
+		} else {
+			let extension = sd_path
+				.path()
+				.and_then(|p| p.extension())
+				.and_then(|e| e.to_str())
+				.map(|e| e.to_string());
+			EntryKind::File { extension }
+		};
+
+		let size = if metadata.is_file() {
+			Some(metadata.len())
+		} else {
+			None
+		};
+
+		Self {
+			id: Uuid::new_v4(),
+			sd_path: SdPathSerialized::from_sdpath(&sd_path)
+				.expect("Entry requires a physical path"),
+			name,
+			kind,
+			size,
+			created_at: metadata.created().ok().map(|t| t.into()),
+			modified_at: metadata.modified().ok().map(|t| t.into()),
+			accessed_at: metadata.accessed().ok().map(|t| t.into()),
+			inode: None, // Platform-specific, would need conditional compilation
+			file_id: None,
+			parent_id: None,
+			location_id: None,
+			metadata_id: Uuid::new_v4(), // Will create UserMetadata with this ID
+			content_id: None,
+			first_seen_at: Utc::now(),
+			last_indexed_at: None,
+		}
+	}
+
+	/// Check if this is a file
+	pub fn is_file(&self) -> bool {
+		matches!(self.kind, EntryKind::File { .. })
+	}
+
+	/// Check if this is a directory
+	pub fn is_directory(&self) -> bool {
+		matches!(self.kind, EntryKind::Directory)
+	}
+
+	/// Get the file extension if this is a file
+	pub fn extension(&self) -> Option<&str> {
+		match &self.kind {
+			EntryKind::File { extension } => extension.as_deref(),
+			_ => None,
+		}
+	}
+
+	/// Get the SdPath for this entry
+	pub fn sd_path(&self) -> SdPath {
+		self.sd_path.to_sdpath()
+	}
+}
+
+/// Conversion from database model to domain Entry
+impl TryFrom<(crate::infra::db::entities::entry::Model, SdPath)> for Entry {
+	type Error = anyhow::Error;
+
+	fn try_from(
+		(entry_model, parent_sd_path): (crate::infra::db::entities::entry::Model, SdPath),
+	) -> Result<Self, Self::Error> {
+		let device_uuid = match &parent_sd_path {
+			SdPath::Physical { device_id, .. } => *device_id,
+			SdPath::Content { .. } => {
+				return Err(anyhow::anyhow!(
+					"Content-addressed paths not supported for directory listing"
+				))
+			}
+		};
+
+		// Construct the full path properly to avoid double slashes
+		// TODO: validation should happen on SdPath imo
+		let full_path = if entry_model.parent_id.is_none() {
+			format!("/{}", entry_model.name)
+		} else {
+			let parent_path = parent_sd_path.display().to_string();
+			if parent_path.ends_with('/') {
+				format!("{}{}", parent_path, entry_model.name)
+			} else {
+				format!("{}/{}", parent_path, entry_model.name)
+			}
+		};
+
+		Ok(Entry {
+			id: entry_model.uuid.unwrap_or_else(|| Uuid::new_v4()),
+			sd_path: SdPathSerialized {
+				device_id: device_uuid,
+				path: full_path,
+			},
+			name: entry_model.name,
+			kind: match entry_model.kind {
+				0 => EntryKind::File {
+					extension: entry_model.extension,
+				},
+				1 => EntryKind::Directory,
+				2 => EntryKind::Symlink {
+					target: "".to_string(), // TODO: Get from database
+				},
+				_ => EntryKind::File {
+					extension: entry_model.extension,
+				},
+			},
+			size: Some(entry_model.size as u64),
+			created_at: Some(entry_model.created_at),
+			modified_at: Some(entry_model.modified_at),
+			accessed_at: entry_model.accessed_at,
+			inode: entry_model.inode.map(|i| i as u64),
+			file_id: None,
+			parent_id: entry_model.parent_id.map(|_| Uuid::new_v4()), // TODO: Proper UUID conversion
+			location_id: None,
+			metadata_id: entry_model
+				.metadata_id
+				.map(|_| Uuid::new_v4())
+				.unwrap_or_else(Uuid::new_v4),
+			content_id: entry_model.content_id.map(|_| Uuid::new_v4()), // TODO: Proper UUID conversion
+			first_seen_at: entry_model.created_at,
+			last_indexed_at: Some(entry_model.created_at),
+		})
+	}
+}
diff --git a/core/src/infra/db/entities/label.rs b/core/src/infra/db/entities/label.rs
new file mode 100644
index 000000000..37ea2dc9c
--- /dev/null
+++ b/core/src/infra/db/entities/label.rs
@@ -0,0 +1,19 @@
+//! Label entity
+
+use sea_orm::entity::prelude::*;
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
+#[sea_orm(table_name = "labels")]
+pub struct Model {
+	#[sea_orm(primary_key)]
+	pub id: i32,
+	pub uuid: Uuid,
+	pub name: String,
+	pub created_at: DateTimeUtc,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
\ No newline at end of file
diff --git a/core/src/infra/db/entities/metadata_label.rs b/core/src/infra/db/entities/metadata_label.rs
new file mode 100644
index 000000000..3b7e2f2ec
--- /dev/null
+++ b/core/src/infra/db/entities/metadata_label.rs
@@ -0,0 +1,42 @@
MetadataLabel junction entity + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "metadata_labels")] +pub struct Model { + #[sea_orm(primary_key)] + pub metadata_id: i32, + #[sea_orm(primary_key)] + pub label_id: i32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::user_metadata::Entity", + from = "Column::MetadataId", + to = "super::user_metadata::Column::Id" + )] + UserMetadata, + #[sea_orm( + belongs_to = "super::label::Entity", + from = "Column::LabelId", + to = "super::label::Column::Id" + )] + Label, +} + +impl Related<super::user_metadata::Entity> for Entity { + fn to() -> RelationDef { + Relation::UserMetadata.def() + } +} + +impl Related<super::label::Entity> for Entity { + fn to() -> RelationDef { + Relation::Label.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file diff --git a/core/src/infra/sync/registry.rs b/core/src/infra/sync/registry.rs index 5a085f4ae..e89b2e8e5 100644 --- a/core/src/infra/sync/registry.rs +++ b/core/src/infra/sync/registry.rs @@ -440,6 +440,58 @@ pub async fn query_device_state( .map_err(|e| ApplyError::DatabaseError(e.to_string())) } +/// Query all shared models for backfill (generic registry-based approach) +/// +/// This discovers and queries ALL shared models registered in the system, +/// making it completely generic - no need to modify this when adding new shared models. +/// +/// # Parameters +/// - `since`: Optional timestamp filter +/// - `batch_size`: Max records per model type +/// - `db`: Database connection +/// +/// # Returns +/// HashMap mapping model_type -> Vec of (uuid, data, timestamp) tuples +pub async fn query_all_shared_models( + since: Option<DateTime<Utc>>, + batch_size: usize, + db: Arc<DatabaseConnection>, +) -> Result<HashMap<String, Vec<(Uuid, serde_json::Value, DateTime<Utc>)>>, ApplyError> { + // Collect all shared models with query functions + let shared_models: Vec<(String, StateQueryFn)> = { + let registry = SYNCABLE_REGISTRY.read().await; + registry + .iter() + .filter(|(_, reg)| !reg.is_device_owned && reg.state_query_fn.is_some()) + .map(|(model_type, reg)| (model_type.clone(), reg.state_query_fn.unwrap())) + .collect() + }; // Lock dropped + + // Query each shared model in turn (sequentially; the registry lock is already released) + let mut results = HashMap::new(); + + for (model_type, query_fn) in shared_models { + let records = query_fn(None, since, batch_size, db.clone()) + .await + .map_err(|e| ApplyError::DatabaseError(format!( + "Failed to query {}: {}", + model_type, + e + )))?; + + if !records.is_empty() { + tracing::info!( + model_type = %model_type, + count = records.len(), + "Queried shared model for backfill" + ); + results.insert(model_type, records); + } + } + + Ok(results) +} + /// Errors that can occur when applying sync entries #[derive(Debug, thiserror::Error)] pub enum ApplyError { diff --git a/core/src/lib.rs b/core/src/lib.rs index 62cdd273c..aff3460ae 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -321,6 +321,34 @@ impl Core { Err(e) => error!("Failed to start services: {}", e), } + // Set up networking event bridge and register protocol handlers AFTER networking is started + if service_config.networking_enabled { + if let Some(networking) = services.networking() { + // Set up event bridge to integrate network events with core event system + let event_bridge = NetworkEventBridge::new( + networking.subscribe_events(), + events.clone(), + ); + tokio::spawn(event_bridge.run()); + info!("Network event bridge initialized"); + + // Register default protocol handlers (pairing, messaging, file transfer) +
info!("Registering default protocol handlers..."); + let data_dir_for_protocols = config.read().await.data_dir.clone(); + if let Err(e) = register_default_protocol_handlers( + &networking, + data_dir_for_protocols, + context.clone(), + ) + .await + { + error!("Failed to register default protocol handlers: {}", e); + } else { + info!("Default protocol handlers registered successfully"); + } + } + } + //Initialize ActionManager and set it in context let action_manager = Arc::new(crate::infra::action::manager::ActionManager::new( context.clone(), @@ -403,23 +431,14 @@ impl Core { // Register protocols and set up event bridge if let Some(networking_service) = self.services.networking() { - // Check if protocols are already registered by checking handler count - let handler_count = { - let registry = networking_service.protocol_registry(); - let registry_guard = registry.read().await; - registry_guard.handler_count() - }; - - // Register default protocol handlers if not already registered - if handler_count == 0 { + // Register default protocol handlers only if networking was just initialized + // (if networking was already initialized during Core::new(), protocols are already registered) + if !already_initialized { logger.info("Registering protocol handlers...").await; self.register_default_protocols(&networking_service).await?; } else { logger - .info(&format!( - "Protocol handlers already registered ({} handlers)", - handler_count - )) + .info("Protocol handlers already registered during initialization") .await; } @@ -445,79 +464,8 @@ impl Core { &self, networking: &service::network::NetworkingService, ) -> Result<(), Box> { - let logger = std::sync::Arc::new(service::network::utils::logging::ConsoleLogger); - - // Get command sender for the pairing handler's state machine - let command_sender = networking - .command_sender() - .ok_or("NetworkingEventLoop command sender not available")? - .clone(); - - // Get data directory from config - let data_dir = { - let config = self.config.read().await; - config.data_dir.clone() - }; - - let pairing_handler = Arc::new( - service::network::protocol::PairingProtocolHandler::new_with_persistence( - networking.identity().clone(), - networking.device_registry(), - logger.clone(), - command_sender, - data_dir, - ), - ); - - // Try to load persisted sessions, but don't fail if there's an error - if let Err(e) = pairing_handler.load_persisted_sessions().await { - logger - .warn(&format!( - "Failed to load persisted pairing sessions: {}. 
Starting with empty sessions.", - e - )) - .await; - } - - // Start the state machine task for pairing - service::network::protocol::PairingProtocolHandler::start_state_machine_task( - pairing_handler.clone(), - ); - - // Start cleanup task for expired sessions - service::network::protocol::PairingProtocolHandler::start_cleanup_task( - pairing_handler.clone(), - ); - - let mut messaging_handler = service::network::protocol::MessagingProtocolHandler::new(); - - // Inject context for library operations - messaging_handler.set_context(self.context.clone()); - - let mut file_transfer_handler = - service::network::protocol::FileTransferProtocolHandler::new_default(logger.clone()); - - // Inject device registry into file transfer handler for encryption - file_transfer_handler.set_device_registry(networking.device_registry()); - - let protocol_registry = networking.protocol_registry(); - { - let mut registry = protocol_registry.write().await; - registry.register_handler(pairing_handler)?; - registry.register_handler(Arc::new(messaging_handler))?; - registry.register_handler(Arc::new(file_transfer_handler))?; - logger - .info("All protocol handlers registered successfully") - .await; - } - - // Brief delay to ensure protocol handlers are fully initialized and background - // tasks have started before accepting connections. This prevents race conditions - // where incoming connections arrive before handlers are ready. - // 50ms is imperceptible to users but sufficient for async task scheduling. - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - - Ok(()) + let data_dir = self.config.read().await.data_dir.clone(); + register_default_protocol_handlers(networking, data_dir, self.context.clone()).await } /// Get the networking service (if initialized) @@ -559,6 +507,83 @@ impl Core { } } +/// Standalone helper to register default protocol handlers +/// This is used both during Core::new() and when explicitly calling init_networking() +async fn register_default_protocol_handlers( + networking: &service::network::NetworkingService, + data_dir: PathBuf, + context: Arc, +) -> Result<(), Box> { + let logger = std::sync::Arc::new(service::network::utils::logging::ConsoleLogger); + + // Get command sender for the pairing handler's state machine + let command_sender = networking + .command_sender() + .ok_or("NetworkingEventLoop command sender not available")? + .clone(); + + let pairing_handler = Arc::new( + service::network::protocol::PairingProtocolHandler::new_with_persistence( + networking.identity().clone(), + networking.device_registry(), + logger.clone(), + command_sender, + data_dir, + networking.endpoint().cloned(), + ), + ); + + // Try to load persisted sessions, but don't fail if there's an error + if let Err(e) = pairing_handler.load_persisted_sessions().await { + logger + .warn(&format!( + "Failed to load persisted pairing sessions: {}. 
Starting with empty sessions.", + e + )) + .await; + } + + // Start the state machine task for pairing + service::network::protocol::PairingProtocolHandler::start_state_machine_task( + pairing_handler.clone(), + ); + + // Start cleanup task for expired sessions + service::network::protocol::PairingProtocolHandler::start_cleanup_task( + pairing_handler.clone(), + ); + + let mut messaging_handler = service::network::protocol::MessagingProtocolHandler::new(); + + // Inject context for library operations + messaging_handler.set_context(context); + + let mut file_transfer_handler = + service::network::protocol::FileTransferProtocolHandler::new_default(logger.clone()); + + // Inject device registry into file transfer handler for encryption + file_transfer_handler.set_device_registry(networking.device_registry()); + + let protocol_registry = networking.protocol_registry(); + { + let mut registry = protocol_registry.write().await; + registry.register_handler(pairing_handler)?; + registry.register_handler(Arc::new(messaging_handler))?; + registry.register_handler(Arc::new(file_transfer_handler))?; + logger + .info("All protocol handlers registered successfully") + .await; + } + + // Brief delay to ensure protocol handlers are fully initialized and background + // tasks have started before accepting connections. This prevents race conditions + // where incoming connections arrive before handlers are ready. + // 50ms is imperceptible to users but sufficient for async task scheduling. + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + Ok(()) +} + /// Set up log event emitter to forward tracing events to the event bus fn setup_log_event_emitter(event_bus: Arc) { use crate::infra::event::log_emitter::LogEventLayer; diff --git a/core/src/ops/api_types.rs b/core/src/ops/api_types.rs new file mode 100644 index 000000000..466e3ebff --- /dev/null +++ b/core/src/ops/api_types.rs @@ -0,0 +1,41 @@ +//! API-safe type wrappers for Swift export +//! +//! This module provides wrappers that convert internal types to +//! Swift-exportable types automatically. 
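+//!
+//! Illustrative sketch of the intended use (not a doctest; `handle` is
+//! assumed to be a `JobHandle` obtained from a job submission elsewhere):
+//!
+//! ```ignore
+//! use crate::ops::api_types::{ApiJobHandle, ToApiType};
+//!
+//! // At the API boundary, convert the internal JobHandle into the
+//! // Swift-exportable JobReceipt wrapper before serializing.
+//! let receipt: ApiJobHandle = handle.to_api_type();
+//! ```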
+ +use crate::infra::job::handle::{JobHandle, JobReceipt}; +use serde::{Deserialize, Serialize}; +use specta::Type; + +/// Wrapper that converts JobHandle to JobReceipt for API export +/// This allows library actions to keep returning JobHandle internally +/// while exporting JobReceipt to Swift +#[derive(Debug, Clone, Serialize, Deserialize, Type)] +pub struct ApiJobHandle(pub JobReceipt); + +impl From for ApiJobHandle { + fn from(handle: JobHandle) -> Self { + Self(handle.into()) + } +} + +impl From<&JobHandle> for ApiJobHandle { + fn from(handle: &JobHandle) -> Self { + Self(handle.into()) + } +} + +/// Trait to convert internal types to API-safe types +pub trait ToApiType { + type ApiType: Type + Serialize + for<'de> Deserialize<'de>; + + fn to_api_type(self) -> Self::ApiType; +} + +impl ToApiType for JobHandle { + type ApiType = ApiJobHandle; + + fn to_api_type(self) -> Self::ApiType { + self.into() + } +} diff --git a/core/src/ops/entries/mod.rs b/core/src/ops/entries/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/core/src/ops/entries/mod.rs @@ -0,0 +1 @@ + diff --git a/core/src/ops/indexing/responder.rs b/core/src/ops/indexing/responder.rs index bd06192e7..d128fe5d2 100644 --- a/core/src/ops/indexing/responder.rs +++ b/core/src/ops/indexing/responder.rs @@ -9,6 +9,7 @@ use crate::infra::db::entities; use crate::infra::event::FsRawEventKind; use crate::ops::indexing::entry::EntryProcessor; use crate::ops::indexing::path_resolver::PathResolver; +use crate::ops::indexing::rules::{build_default_ruler, RuleToggles, RulerDecision}; use crate::ops::indexing::state::{DirEntry, IndexerState}; use crate::ops::indexing::{ctx::ResponderCtx, IndexingCtx}; use anyhow::Result; @@ -23,15 +24,23 @@ pub async fn apply( context: &Arc, library_id: Uuid, kind: FsRawEventKind, + rule_toggles: RuleToggles, + location_root: &Path, ) -> Result<()> { // Lightweight indexing context for DB access let ctx = ResponderCtx::new(context, library_id).await?; match kind { - FsRawEventKind::Create { path } => handle_create(&ctx, &path).await?, - FsRawEventKind::Modify { path } => handle_modify(&ctx, &path).await?, + FsRawEventKind::Create { path } => { + handle_create(&ctx, &path, rule_toggles, location_root).await? + } + FsRawEventKind::Modify { path } => { + handle_modify(&ctx, &path, rule_toggles, location_root).await? + } FsRawEventKind::Remove { path } => handle_remove(&ctx, &path).await?, - FsRawEventKind::Rename { from, to } => handle_rename(&ctx, &from, &to).await?, + FsRawEventKind::Rename { from, to } => { + handle_rename(&ctx, &from, &to, rule_toggles, location_root).await? 
+ } } Ok(()) } @@ -41,6 +50,8 @@ pub async fn apply_batch( context: &Arc, library_id: Uuid, events: Vec, + rule_toggles: RuleToggles, + location_root: &Path, ) -> Result<()> { if events.is_empty() { return Ok(()); @@ -76,7 +87,7 @@ pub async fn apply_batch( // Process renames for (from, to) in renames { - if let Err(e) = handle_rename(&ctx, &from, &to).await { + if let Err(e) = handle_rename(&ctx, &from, &to, rule_toggles, location_root).await { tracing::error!( "Failed to handle rename from {} to {}: {}", from.display(), @@ -88,14 +99,14 @@ pub async fn apply_batch( // Process creates for path in creates { - if let Err(e) = handle_create(&ctx, &path).await { + if let Err(e) = handle_create(&ctx, &path, rule_toggles, location_root).await { tracing::error!("Failed to handle create for {}: {}", path.display(), e); } } // Process modifies for path in modifies { - if let Err(e) = handle_modify(&ctx, &path).await { + if let Err(e) = handle_modify(&ctx, &path, rule_toggles, location_root).await { tracing::error!("Failed to handle modify for {}: {}", path.display(), e); } } @@ -103,9 +114,61 @@ pub async fn apply_batch( Ok(()) } +/// Check if a path should be filtered based on indexing rules +async fn should_filter_path( + path: &Path, + rule_toggles: RuleToggles, + location_root: &Path, +) -> Result { + // Build ruler for this path using the same logic as the indexer + let ruler = build_default_ruler(rule_toggles, location_root, path).await; + + // Get metadata for the path + let metadata = tokio::fs::metadata(path).await?; + + // Simple metadata implementation for rule evaluation + struct SimpleMetadata { + is_dir: bool, + } + impl crate::ops::indexing::rules::MetadataForIndexerRules for SimpleMetadata { + fn is_dir(&self) -> bool { + self.is_dir + } + } + + let simple_meta = SimpleMetadata { + is_dir: metadata.is_dir(), + }; + + // Evaluate the path against the ruler + match ruler.evaluate_path(path, &simple_meta).await { + Ok(RulerDecision::Reject) => { + debug!("Filtered path by indexing rules: {}", path.display()); + Ok(true) + } + Ok(RulerDecision::Accept) => Ok(false), + Err(e) => { + tracing::warn!("Error evaluating rules for {}: {}", path.display(), e); + Ok(false) // Don't filter on error, let it through + } + } +} + /// Handle create: extract metadata and insert via EntryProcessor -async fn handle_create(ctx: &impl IndexingCtx, path: &Path) -> Result<()> { +async fn handle_create( + ctx: &impl IndexingCtx, + path: &Path, + rule_toggles: RuleToggles, + location_root: &Path, +) -> Result<()> { debug!("Create: {}", path.display()); + + // Check if path should be filtered + if should_filter_path(path, rule_toggles, location_root).await? { + debug!("Skipping filtered path: {}", path.display()); + return Ok(()); + } + let dir_entry = build_dir_entry(path).await?; // If inode matches an existing entry at another path, treat this as a move @@ -127,9 +190,20 @@ async fn handle_create(ctx: &impl IndexingCtx, path: &Path) -> Result<()> { } /// Handle modify: resolve entry ID by path, then update -async fn handle_modify(ctx: &impl IndexingCtx, path: &Path) -> Result<()> { +async fn handle_modify( + ctx: &impl IndexingCtx, + path: &Path, + rule_toggles: RuleToggles, + location_root: &Path, +) -> Result<()> { debug!("Modify: {}", path.display()); + // Check if path should be filtered + if should_filter_path(path, rule_toggles, location_root).await? 
{ + debug!("Skipping filtered path: {}", path.display()); + return Ok(()); + } + // If inode indicates a move, handle as a move and skip update // Responder uses direct filesystem access (None backend) since it reacts to local FS events let meta = EntryProcessor::extract_metadata(path, None).await?; @@ -160,8 +234,22 @@ async fn handle_remove(ctx: &impl IndexingCtx, path: &Path) -> Result<()> { } /// Handle rename/move: resolve source entry and move via EntryProcessor -async fn handle_rename(ctx: &impl IndexingCtx, from: &Path, to: &Path) -> Result<()> { +async fn handle_rename( + ctx: &impl IndexingCtx, + from: &Path, + to: &Path, + rule_toggles: RuleToggles, + location_root: &Path, +) -> Result<()> { debug!("Rename: {} -> {}", from.display(), to.display()); + + // Check if the destination path should be filtered + // If the file is being moved to a filtered location, we should remove it from the database + if should_filter_path(to, rule_toggles, location_root).await? { + debug!("Destination path is filtered, removing entry: {}", to.display()); + // Treat this as a removal of the source file + return handle_remove(ctx, from).await; + } if let Some(entry_id) = resolve_entry_id_by_path(ctx, from).await? { debug!("Found entry {} for old path, moving to new path", entry_id); diff --git a/core/src/ops/registry.rs b/core/src/ops/registry.rs new file mode 100644 index 000000000..d36ea29a9 --- /dev/null +++ b/core/src/ops/registry.rs @@ -0,0 +1,483 @@ +//! Minimal action/query registry (action-centric) using `inventory`. +//! +//! Goals: +//! - Tiny, action-centric API: register Actions, decode their associated Inputs +//! - No conversion traits on inputs; Actions declare `type Input` and `from_input(..)` +//! - Single place that resolves `library_id` and dispatches + +use futures::future::{FutureExt, LocalBoxFuture}; +use once_cell::sync::Lazy; +use serde::de::DeserializeOwned; +use std::{collections::HashMap, sync::Arc}; +use uuid::Uuid; + +/// Registry handler for library queries - thin wrapper calling business logic +pub fn handle_library_query( + context: Arc, + session: crate::infra::api::SessionContext, + payload: serde_json::Value, +) -> std::pin::Pin< + Box> + Send + 'static>, +> +where + Q: crate::cqrs::LibraryQuery + 'static, + Q::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static, + Q::Output: serde::Serialize + std::fmt::Debug + 'static, +{ + Box::pin(async move { + // Create dispatcher + let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone()); + + // Deserialize input + let input: Q::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?; + + // Call business logic method + let output = dispatcher + .execute_library_query::(input, session) + .await + .map_err(|e| e.to_string())?; + + // Serialize output + serde_json::to_value(output).map_err(|e| e.to_string()) + }) +} + +/// Registry handler for core queries - thin wrapper calling business logic +pub fn handle_core_query( + context: Arc, + session: crate::infra::api::SessionContext, + payload: serde_json::Value, +) -> std::pin::Pin< + Box> + Send + 'static>, +> +where + Q: crate::cqrs::CoreQuery + 'static, + Q::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static, + Q::Output: serde::Serialize + std::fmt::Debug + 'static, +{ + Box::pin(async move { + // Create dispatcher + let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone()); + + // Deserialize input + let input: Q::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?; + + // 
Call business logic method + let output = dispatcher + .execute_core_query::(input, session) + .await + .map_err(|e| e.to_string())?; + + // Serialize output + serde_json::to_value(output).map_err(|e| e.to_string()) + }) +} + +/// Registry handler for library actions - thin wrapper calling business logic +pub fn handle_library_action( + context: Arc, + session: crate::infra::api::SessionContext, + payload: serde_json::Value, +) -> std::pin::Pin< + Box> + Send + 'static>, +> +where + A: crate::infra::action::LibraryAction + 'static, + A::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static, + A::Output: serde::Serialize + std::fmt::Debug + 'static, +{ + Box::pin(async move { + // Create dispatcher + let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone()); + + // Deserialize input + let input: A::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?; + + // Call business logic method + let output = dispatcher + .execute_library_action::(input, session) + .await + .map_err(|e| e.to_string())?; + + // Serialize output + serde_json::to_value(output).map_err(|e| e.to_string()) + }) +} + +/// Registry handler for core actions - thin wrapper calling business logic +pub fn handle_core_action( + context: Arc, + payload: serde_json::Value, +) -> std::pin::Pin< + Box> + Send + 'static>, +> +where + A: crate::infra::action::CoreAction + 'static, + A::Input: serde::de::DeserializeOwned + std::fmt::Debug + 'static, + A::Output: serde::Serialize + std::fmt::Debug + 'static, +{ + Box::pin(async move { + // Create dispatcher + let dispatcher = crate::infra::api::dispatcher::ApiDispatcher::new(context.clone()); + + // Create base session + let session = dispatcher + .create_base_session() + .map_err(|e| e.to_string())?; + + // Deserialize input + let input: A::Input = serde_json::from_value(payload).map_err(|e| e.to_string())?; + + // Call business logic method + let output = dispatcher + .execute_core_action::(input, session) + .await + .map_err(|e| e.to_string())?; + + // Serialize output + serde_json::to_value(output).map_err(|e| e.to_string()) + }) +} + +/// Handler function signature for library queries. +pub type LibraryQueryHandlerFn = fn( + Arc, + crate::infra::api::SessionContext, // session with library context + serde_json::Value, // payload with Q::Input as JSON +) -> std::pin::Pin< + Box> + Send + 'static>, +>; + +/// Handler function signature for core queries. +pub type CoreQueryHandlerFn = fn( + Arc, + crate::infra::api::SessionContext, // session context + serde_json::Value, // payload with Q::Input as JSON +) -> std::pin::Pin< + Box> + Send + 'static>, +>; + +/// Handler function signature for library actions. +pub type LibraryActionHandlerFn = fn( + Arc, + crate::infra::api::SessionContext, // session with library context + serde_json::Value, // payload with A::Input as JSON +) -> std::pin::Pin< + Box> + Send + 'static>, +>; + +/// Handler function signature for core actions. +pub type CoreActionHandlerFn = fn( + Arc, + serde_json::Value, // payload with A::Input as JSON +) -> std::pin::Pin< + Box> + Send + 'static>, +>; + +/// Registry entry for a library query operation. +pub struct LibraryQueryEntry { + pub method: &'static str, + pub handler: LibraryQueryHandlerFn, +} + +/// Registry entry for a core query operation. +pub struct CoreQueryEntry { + pub method: &'static str, + pub handler: CoreQueryHandlerFn, +} + +/// Registry entry for a library action operation. 
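+///
+/// Entries are normally produced by the `register_library_action!` macro
+/// below; a hand-written submission would look roughly like this
+/// (illustrative only - `MyAction` is a hypothetical `LibraryAction`):
+///
+/// ```ignore
+/// inventory::submit! {
+///     crate::ops::registry::LibraryActionEntry {
+///         method: crate::action_method!("files.my_action"),
+///         handler: crate::ops::registry::handle_library_action::<MyAction>,
+///     }
+/// }
+/// ```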
+pub struct LibraryActionEntry { + pub method: &'static str, + pub handler: LibraryActionHandlerFn, +} + +/// Registry entry for a core action operation. +pub struct CoreActionEntry { + pub method: &'static str, + pub handler: CoreActionHandlerFn, +} + +inventory::collect!(LibraryQueryEntry); +inventory::collect!(CoreQueryEntry); +inventory::collect!(LibraryActionEntry); +inventory::collect!(CoreActionEntry); + +pub static LIBRARY_QUERIES: Lazy> = Lazy::new(|| { + let mut map = HashMap::new(); + for entry in inventory::iter::() { + map.insert(entry.method, entry.handler); + } + map +}); + +pub static CORE_QUERIES: Lazy> = Lazy::new(|| { + let mut map = HashMap::new(); + for entry in inventory::iter::() { + map.insert(entry.method, entry.handler); + } + map +}); + +pub static LIBRARY_ACTIONS: Lazy> = Lazy::new(|| { + let mut map = HashMap::new(); + for entry in inventory::iter::() { + map.insert(entry.method, entry.handler); + } + map +}); + +pub static CORE_ACTIONS: Lazy> = Lazy::new(|| { + let mut map = HashMap::new(); + for entry in inventory::iter::() { + map.insert(entry.method, entry.handler); + } + map +}); + +#[cfg(test)] +mod tests { + #[test] + fn list_registered_ops() { + // Collect and display registered actions + let mut action_methods: Vec<&'static str> = + crate::ops::registry::CORE_ACTIONS.keys().cloned().collect(); + action_methods.sort(); + println!("Registered actions ({}):", action_methods.len()); + for method in &action_methods { + println!(" {}", method); + } + + let mut library_action_methods: Vec<&'static str> = crate::ops::registry::LIBRARY_ACTIONS + .keys() + .cloned() + .collect(); + library_action_methods.sort(); + println!( + "Registered library actions ({}):", + library_action_methods.len() + ); + for method in &library_action_methods { + println!(" {}", method); + } + + // Collect and display registered queries + let mut query_methods: Vec<&'static str> = + crate::ops::registry::CORE_QUERIES.keys().cloned().collect(); + query_methods.sort(); + println!("Registered queries ({}):", query_methods.len()); + for method in &query_methods { + println!(" {}", method); + } + + let mut library_query_methods: Vec<&'static str> = crate::ops::registry::LIBRARY_QUERIES + .keys() + .cloned() + .collect(); + library_query_methods.sort(); + println!( + "Registered library queries ({}):", + library_query_methods.len() + ); + for method in &library_query_methods { + println!(" {}", method); + } + + // Ensure we have at least one action or query registered + assert!( + !action_methods.is_empty() + || !query_methods.is_empty() + || !library_action_methods.is_empty() + || !library_query_methods.is_empty(), + "No actions or queries registered" + ); + } +} + +/// Helper: construct action method string from a short name like "files.copy" +#[macro_export] +macro_rules! action_method { + ($name:literal) => { + concat!("action:", $name, ".input.v1") + }; +} + +/// Helper: construct query method string from a short name like "core.status" +#[macro_export] +macro_rules! query_method { + ($name:literal) => { + concat!("query:", $name, ".v1") + }; +} + +/// Register a library query `Q` by short name; binds method to `Q::Input` and handler to `handle_library_query::`. +/// Implements QueryTypeInfo trait for automatic type extraction +#[macro_export] +macro_rules! register_library_query { + ($query:ty, $name:literal) => { + impl $crate::client::Wire for <$query as $crate::cqrs::LibraryQuery>::Input { + const METHOD: &'static str = $crate::query_method!($name); + } + inventory::submit! 
{ + $crate::ops::registry::LibraryQueryEntry { + method: <<$query as $crate::cqrs::LibraryQuery>::Input as $crate::client::Wire>::METHOD, + handler: $crate::ops::registry::handle_library_query::<$query>, + } + } + + // Automatic QueryTypeInfo implementation for type extraction + impl $crate::ops::type_extraction::QueryTypeInfo for $query { + type Input = <$query as $crate::cqrs::LibraryQuery>::Input; + type Output = <$query as $crate::cqrs::LibraryQuery>::Output; + + fn identifier() -> &'static str { + $name + } + + fn scope() -> $crate::ops::type_extraction::QueryScope { + $crate::ops::type_extraction::QueryScope::Library + } + + fn wire_method() -> String { + $crate::query_method!($name).to_string() + } + } + + // Submit query type extractor to inventory + inventory::submit! { + $crate::ops::type_extraction::QueryExtractorEntry { + extractor: <$query as $crate::ops::type_extraction::QueryTypeInfo>::extract_types, + identifier: $name, + } + } + }; +} + +/// Register a core query `Q` by short name; binds method to `Q::Input` and handler to `handle_core_query::`. +/// Implements QueryTypeInfo trait for automatic type extraction +#[macro_export] +macro_rules! register_core_query { + ($query:ty, $name:literal) => { + impl $crate::client::Wire for <$query as $crate::cqrs::CoreQuery>::Input { + const METHOD: &'static str = $crate::query_method!($name); + } + inventory::submit! { + $crate::ops::registry::CoreQueryEntry { + method: <<$query as $crate::cqrs::CoreQuery>::Input as $crate::client::Wire>::METHOD, + handler: $crate::ops::registry::handle_core_query::<$query>, + } + } + + // Automatic QueryTypeInfo implementation for type extraction + impl $crate::ops::type_extraction::QueryTypeInfo for $query { + type Input = <$query as $crate::cqrs::CoreQuery>::Input; + type Output = <$query as $crate::cqrs::CoreQuery>::Output; + + fn identifier() -> &'static str { + $name + } + + fn scope() -> $crate::ops::type_extraction::QueryScope { + $crate::ops::type_extraction::QueryScope::Core + } + + fn wire_method() -> String { + $crate::query_method!($name).to_string() + } + } + + // Submit query type extractor to inventory + inventory::submit! { + $crate::ops::type_extraction::QueryExtractorEntry { + extractor: <$query as $crate::ops::type_extraction::QueryTypeInfo>::extract_types, + identifier: $name, + } + } + }; +} + +/// Register a library action `A` by short name; binds method to `A::Input` and handler to `handle_library_action::`. +/// Implements OperationTypeInfo trait for automatic type extraction +#[macro_export] +macro_rules! register_library_action { + ($action:ty, $name:literal) => { + impl $crate::client::Wire for <$action as $crate::infra::action::LibraryAction>::Input { + const METHOD: &'static str = $crate::action_method!($name); + } + inventory::submit! 
{ + $crate::ops::registry::LibraryActionEntry { + method: <<$action as $crate::infra::action::LibraryAction>::Input as $crate::client::Wire>::METHOD, + handler: $crate::ops::registry::handle_library_action::<$action>, + } + } + + // Automatic OperationTypeInfo implementation for type extraction + impl $crate::ops::type_extraction::OperationTypeInfo for $action { + type Input = <$action as $crate::infra::action::LibraryAction>::Input; + type Output = <$action as $crate::infra::action::LibraryAction>::Output; + + fn identifier() -> &'static str { + $name + } + + fn scope() -> $crate::ops::type_extraction::OperationScope { + $crate::ops::type_extraction::OperationScope::Library + } + + fn wire_method() -> String { + $crate::action_method!($name).to_string() + } + } + + // Submit type extractor to inventory for compile-time collection + inventory::submit! { + $crate::ops::type_extraction::TypeExtractorEntry { + extractor: <$action as $crate::ops::type_extraction::OperationTypeInfo>::extract_types, + identifier: $name, + } + } + }; +} + +/// Register a core action `A` similarly. +/// Implements OperationTypeInfo trait for automatic type extraction +#[macro_export] +macro_rules! register_core_action { + ($action:ty, $name:literal) => { + impl $crate::client::Wire for <$action as $crate::infra::action::CoreAction>::Input { + const METHOD: &'static str = $crate::action_method!($name); + } + inventory::submit! { + $crate::ops::registry::CoreActionEntry { + method: <<$action as $crate::infra::action::CoreAction>::Input as $crate::client::Wire>::METHOD, + handler: $crate::ops::registry::handle_core_action::<$action>, + } + } + + // Automatic OperationTypeInfo implementation for core actions + impl $crate::ops::type_extraction::OperationTypeInfo for $action { + type Input = <$action as $crate::infra::action::CoreAction>::Input; + type Output = <$action as $crate::infra::action::CoreAction>::Output; + + fn identifier() -> &'static str { + $name + } + + fn scope() -> $crate::ops::type_extraction::OperationScope { + $crate::ops::type_extraction::OperationScope::Core + } + + fn wire_method() -> String { + $crate::action_method!($name).to_string() + } + } + + // Submit type extractor to inventory for compile-time collection + inventory::submit! { + $crate::ops::type_extraction::TypeExtractorEntry { + extractor: <$action as $crate::ops::type_extraction::OperationTypeInfo>::extract_types, + identifier: $name, + } + } + }; +} diff --git a/core/src/ops/test_type_extraction.rs b/core/src/ops/test_type_extraction.rs new file mode 100644 index 000000000..9069793b4 --- /dev/null +++ b/core/src/ops/test_type_extraction.rs @@ -0,0 +1,33 @@ +//! 
Test the type extraction system with just the types that have derives + +use crate::ops::type_extraction::*; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_working_operations() { + let (operations, queries, collection) = generate_spacedrive_api(); + + println!( + "RSPC Magic: Discovered {} operations and {} queries", + operations.len(), + queries.len() + ); + println!("Type collection has {} types", collection.len()); + + // Show discovered operations + for op in operations.iter() { + println!(" Operation: {} -> {}", op.identifier, op.wire_method); + } + + for query in queries.iter() { + println!(" Query: {} -> {}", query.identifier, query.wire_method); + } + + if !operations.is_empty() { + println!("RSPC-inspired type extraction is working!"); + } + } +} diff --git a/core/src/ops/type_extraction.rs b/core/src/ops/type_extraction.rs new file mode 100644 index 000000000..a4e0fd3c9 --- /dev/null +++ b/core/src/ops/type_extraction.rs @@ -0,0 +1,697 @@ +//! rspc-inspired trait-based type extraction for automatic API generation +//! +//! This module implements the core trait system that allows automatic discovery +//! and extraction of Input/Output types from registered operations at compile-time. + +use serde::{de::DeserializeOwned, Serialize}; +use specta::{DataType, Type, TypeCollection}; + +/// Operation scope - automatically determined by registration macro +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OperationScope { + Core, + Library, +} + +/// Query scope - automatically determined by registration macro +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum QueryScope { + Core, + Library, +} + +/// Core trait that provides compile-time type information for operations +/// +/// This is inspired by rspc's resolver trait system and enables automatic +/// type extraction without runtime iteration over inventory data. 
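+///
+/// A hand-written implementation would look roughly like the sketch below;
+/// in practice the registration macros in `ops/registry.rs` generate this
+/// automatically (`MyAction` and its input/output types are hypothetical):
+///
+/// ```ignore
+/// impl OperationTypeInfo for MyAction {
+///     type Input = MyActionInput;
+///     type Output = MyActionOutput;
+///
+///     fn identifier() -> &'static str {
+///         "files.my_action"
+///     }
+///
+///     fn scope() -> OperationScope {
+///         OperationScope::Library
+///     }
+///
+///     fn wire_method() -> String {
+///         "action:files.my_action.input.v1".to_string()
+///     }
+/// }
+/// ```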
+pub trait OperationTypeInfo { + /// The input type for this operation + type Input: Type + Serialize + DeserializeOwned + 'static; + + /// The output type for this operation + type Output: Type + Serialize + DeserializeOwned + 'static; + + /// The operation identifier (e.g., "files.copy", "libraries.create") + fn identifier() -> &'static str; + + /// The operation scope (Core or Library) - automatically determined by registration macro + fn scope() -> OperationScope; + + /// Generate the wire method string for this operation + fn wire_method() -> String; + + /// Extract type metadata and register with Specta's TypeCollection + /// This is the key method that enables automatic type discovery + fn extract_types(collection: &mut TypeCollection) -> OperationMetadata { + // Register the types with Specta and get their DataType definitions + let input_type = Self::Input::definition(collection); + let output_type = Self::Output::definition(collection); + + let input_type_name = extract_type_name(std::any::type_name::()); + let output_type_name = extract_type_name(std::any::type_name::()); + + // Debug output for type names + if Self::identifier() == "jobs.info" { + println!( + "DEBUG: jobs.info input type: {} -> {}", + std::any::type_name::(), + input_type_name + ); + println!( + "DEBUG: jobs.info output type: {} -> {}", + std::any::type_name::(), + output_type_name + ); + } + if Self::identifier() == "jobs.list" { + println!( + "DEBUG: jobs.list input type: {} -> {}", + std::any::type_name::(), + input_type_name + ); + println!( + "DEBUG: jobs.list output type: {} -> {}", + std::any::type_name::(), + output_type_name + ); + } + + OperationMetadata { + identifier: Self::identifier(), + wire_method: Self::wire_method(), + input_type, + output_type, + input_type_name, + output_type_name, + scope: Self::scope(), + } + } +} + +/// Similar trait for query operations +pub trait QueryTypeInfo { + /// Query input type (often () for queries with no parameters) + type Input: Type + Serialize + DeserializeOwned + 'static; + + /// Query output type + type Output: Type + Serialize + DeserializeOwned + 'static; + + /// Query identifier (e.g., "jobs.list", "libraries.list") + fn identifier() -> &'static str; + + /// The query scope (Core or Library) - automatically determined by registration macro + fn scope() -> QueryScope; + + /// Generate wire method for queries + fn wire_method() -> String; + + /// Extract query type metadata + fn extract_types(collection: &mut TypeCollection) -> QueryMetadata { + // Register the types with Specta and get their DataType definitions + let input_type = Self::Input::definition(collection); + let output_type = Self::Output::definition(collection); + + QueryMetadata { + identifier: Self::identifier(), + wire_method: Self::wire_method(), + input_type, + output_type, + input_type_name: extract_type_name(std::any::type_name::()), + output_type_name: extract_type_name(std::any::type_name::()), + scope: Self::scope(), + } + } +} + +/// Metadata extracted from an operation +#[derive(Debug, Clone)] +pub struct OperationMetadata { + pub identifier: &'static str, + pub wire_method: String, + pub input_type: DataType, + pub output_type: DataType, + pub input_type_name: String, + pub output_type_name: String, + pub scope: OperationScope, +} + +/// Metadata extracted from a query +#[derive(Debug, Clone)] +pub struct QueryMetadata { + pub identifier: &'static str, + pub wire_method: String, + pub input_type: DataType, + pub output_type: DataType, + pub input_type_name: String, + pub 
output_type_name: String, + pub scope: QueryScope, +} + +/// Entry for collecting type extractors via inventory +/// This is the key that makes compile-time collection possible +pub struct TypeExtractorEntry { + /// Function that extracts operation metadata and registers types + pub extractor: fn(&mut TypeCollection) -> OperationMetadata, + pub identifier: &'static str, +} + +/// Entry for collecting query type extractors +pub struct QueryExtractorEntry { + /// Function that extracts query metadata and registers types + pub extractor: fn(&mut TypeCollection) -> QueryMetadata, + pub identifier: &'static str, +} + +// Collect type extractors via inventory - this enables compile-time discovery +inventory::collect!(TypeExtractorEntry); +inventory::collect!(QueryExtractorEntry); + +/// Generate complete API metadata by running all collected type extractors +/// +/// This is the rspc-inspired magic: we iterate over compile-time registered +/// extractors rather than runtime data, solving the timeline problem. +pub fn generate_spacedrive_api() -> (Vec, Vec, TypeCollection) { + let mut collection = TypeCollection::default(); + let mut operations = Vec::new(); + let mut queries = Vec::new(); + + // Extract all operations - this works because extractors are registered at compile-time + for entry in inventory::iter::() { + let metadata = (entry.extractor)(&mut collection); + operations.push(metadata); + } + + // Extract all queries + for entry in inventory::iter::() { + let metadata = (entry.extractor)(&mut collection); + queries.push(metadata); + } + + // Register event types in the same collection to avoid duplicates + collection.register_mut::(); + collection.register_mut::(); + collection.register_mut::(); + + (operations, queries, collection) +} + +/// Generate the complete Spacedrive API structure as a Specta-compatible type +/// +/// This creates a runtime representation of our API structure that Specta can export. +/// Similar to rspc's approach with TypesOrType, but tailored for Spacedrive's needs. +pub fn create_spacedrive_api_structure( + operations: &[OperationMetadata], + queries: &[QueryMetadata], +) -> SpacedriveApiStructure { + let mut core_actions = Vec::new(); + let mut library_actions = Vec::new(); + let mut core_queries = Vec::new(); + let mut library_queries = Vec::new(); + + // Group operations by scope - preserve the actual DataType objects! + for op in operations { + match op.scope { + OperationScope::Core => { + core_actions.push(ApiOperationType { + identifier: op.identifier.to_string(), + wire_method: op.wire_method.clone(), + input_type: op.input_type.clone(), + output_type: op.output_type.clone(), + input_type_name: op.input_type_name.clone(), + output_type_name: op.output_type_name.clone(), + }); + } + OperationScope::Library => { + library_actions.push(ApiOperationType { + identifier: op.identifier.to_string(), + wire_method: op.wire_method.clone(), + input_type: op.input_type.clone(), + output_type: op.output_type.clone(), + input_type_name: op.input_type_name.clone(), + output_type_name: op.output_type_name.clone(), + }); + } + } + } + + // Group queries by scope - preserve the actual DataType objects! 
+ for query in queries { + match query.scope { + QueryScope::Core => { + core_queries.push(ApiQueryType { + identifier: query.identifier.to_string(), + wire_method: query.wire_method.clone(), + input_type: query.input_type.clone(), + output_type: query.output_type.clone(), + input_type_name: query.input_type_name.clone(), + output_type_name: query.output_type_name.clone(), + }); + } + QueryScope::Library => { + library_queries.push(ApiQueryType { + identifier: query.identifier.to_string(), + wire_method: query.wire_method.clone(), + input_type: query.input_type.clone(), + output_type: query.output_type.clone(), + input_type_name: query.input_type_name.clone(), + output_type_name: query.output_type_name.clone(), + }); + } + } + } + + SpacedriveApiStructure { + core_actions, + library_actions, + core_queries, + library_queries, + } +} + +/// Represents the complete Spacedrive API structure for code generation +pub struct SpacedriveApiStructure { + pub core_actions: Vec, + pub library_actions: Vec, + pub core_queries: Vec, + pub library_queries: Vec, +} + +/// Represents a single API operation with actual type information +pub struct ApiOperationType { + pub identifier: String, + pub wire_method: String, + pub input_type: specta::datatype::DataType, + pub output_type: specta::datatype::DataType, + pub input_type_name: String, + pub output_type_name: String, +} + +/// Represents a single API query with actual type information +pub struct ApiQueryType { + pub identifier: String, + pub wire_method: String, + pub input_type: specta::datatype::DataType, + pub output_type: specta::datatype::DataType, + pub input_type_name: String, + pub output_type_name: String, +} + +/// Intermediate struct to hold API function metadata for Swift code generation +/// This is used to organize operations and queries into namespaces and methods +#[derive(Debug, Clone)] +pub struct ApiFunction { + /// The namespace this function belongs to (e.g., "core", "libraries", "jobs") + pub namespace: String, + /// The method name within the namespace (e.g., "create", "list", "start") + pub method_name: String, + /// The full identifier (e.g., "libraries.create", "jobs.list") + pub identifier: String, + /// The wire method string (e.g., "action:libraries.create.input.v1") + pub wire_method: String, + /// Whether this is an action (true) or query (false) + pub is_action: bool, + /// The scope (Core or Library) + pub scope: String, + /// Input type name for Swift generation + pub input_type_name: String, + /// Output type name for Swift generation + pub output_type_name: String, +} + +/// Extract API functions from the collected metadata +/// This organizes operations and queries into a flat list of functions with namespace information +pub fn extract_api_functions( + operations: &[OperationMetadata], + queries: &[QueryMetadata], +) -> Vec { + let mut functions = Vec::new(); + + // Process operations (actions) + for op in operations { + let namespace = extract_namespace(&op.identifier); + let method_name = extract_method_name(&op.identifier); + let scope = match op.scope { + OperationScope::Core => "Core", + OperationScope::Library => "Library", + }; + + functions.push(ApiFunction { + namespace, + method_name, + identifier: op.identifier.to_string(), + wire_method: op.wire_method.clone(), + is_action: true, + scope: scope.to_string(), + input_type_name: op.input_type_name.clone(), + output_type_name: op.output_type_name.clone(), + }); + } + + // Process queries + for query in queries { + let namespace = 
extract_namespace(&query.identifier); + let method_name = extract_method_name(&query.identifier); + let scope = match query.scope { + QueryScope::Core => "Core", + QueryScope::Library => "Library", + }; + + functions.push(ApiFunction { + namespace, + method_name, + identifier: query.identifier.to_string(), + wire_method: query.wire_method.clone(), + is_action: false, + scope: scope.to_string(), + input_type_name: query.input_type_name.clone(), + output_type_name: query.output_type_name.clone(), + }); + } + + functions +} + +/// Extract namespace from identifier (e.g., "libraries.create" -> "libraries") +fn extract_namespace(identifier: &str) -> String { + identifier.split('.').next().unwrap_or("core").to_string() +} + +/// Extract method name from identifier (e.g., "libraries.create" -> "create") +fn extract_method_name(identifier: &str) -> String { + identifier.split('.').skip(1).collect::>().join("_") +} + +/// Convert snake_case to PascalCase for Swift type names +fn to_pascal_case(s: &str) -> String { + s.split(&['.', '_'][..]) + .map(|word| { + let mut chars = word.chars(); + match chars.next() { + None => String::new(), + Some(first) => first.to_uppercase().collect::() + &chars.as_str(), + } + }) + .collect::>() + .join("") +} + +/// Extract just the type name from a full Rust type path +fn extract_type_name(full_type_name: &str) -> String { + // Handle unit type () - use Empty struct for Swift + if full_type_name == "()" { + let result = "Empty".to_string(); + if full_type_name.contains("()") { + println!( + "DEBUG: Unit case: '{}' -> result: '{}'", + full_type_name, result + ); + } + return result; + } + + // Handle generic types like Option, Vec, etc. + // For Option, we want just T + if full_type_name.contains("Option<") && full_type_name.ends_with(">") { + // Find the content inside Option<...> + let start = full_type_name.find("Option<").unwrap() + 7; // Skip "Option<" + let end = full_type_name.rfind(">").unwrap(); + let inner = &full_type_name[start..end]; + let result = extract_type_name(inner); // Recursively extract the inner type + if full_type_name.contains("JobInfo") + || full_type_name.contains("Vec") + || full_type_name.contains("()") + { + println!( + "DEBUG: Option case: '{}' -> inner: '{}' -> result: '{}'", + full_type_name, inner, result + ); + } + return result; + } + + // Handle Vec - we want to keep Vec but with proper Swift syntax + if full_type_name.contains("Vec<") && full_type_name.ends_with(">") { + // Find the content inside Vec<...> + let start = full_type_name.find("Vec<").unwrap() + 4; // Skip "Vec<" + let end = full_type_name.rfind(">").unwrap(); + let inner = &full_type_name[start..end]; + let inner_type = extract_type_name(inner); // Recursively extract the inner type + let result = format!("[{}]", inner_type); // Convert to Swift array syntax + if full_type_name.contains("JobInfo") + || full_type_name.contains("Vec") + || full_type_name.contains("()") + { + println!( + "DEBUG: Vec case: '{}' -> inner: '{}' -> inner_type: '{}' -> result: '{}'", + full_type_name, inner, inner_type, result + ); + } + return result; + } + + // For other generic types, just return the base name + if full_type_name.contains('<') { + let base_name = full_type_name.split('<').next().unwrap_or(full_type_name); + let result = base_name.to_string(); + if full_type_name.contains("JobInfo") + || full_type_name.contains("Vec") + || full_type_name.contains("()") + { + println!( + "DEBUG: Generic case: '{}' -> base_name: '{}' -> result: '{}'", + full_type_name, base_name, result + 
); + } + return result; + } + + // For simple types, extract just the type name from the path + let type_name = full_type_name.split("::").last().unwrap_or(full_type_name); + + let result = type_name.to_string(); + if full_type_name.contains("JobInfo") + || full_type_name.contains("Vec") + || full_type_name.contains("()") + { + println!( + "DEBUG: Simple case: '{}' -> type_name: '{}' -> result: '{}'", + full_type_name, type_name, result + ); + } + result +} + +/// Convert snake_case to camelCase for Swift method names +fn to_camel_case(s: &str) -> String { + let mut words = s.split('_'); + let first_word = words.next().unwrap_or(""); + let rest_words: String = words + .map(|word| { + let mut chars = word.chars(); + match chars.next() { + None => String::new(), + Some(first) => first.to_uppercase().collect::() + &chars.as_str(), + } + }) + .collect(); + first_word.to_lowercase() + &rest_words +} + +/// Generate Swift code for API namespace structs and their methods +pub fn generate_swift_api_code(functions: &[ApiFunction]) -> String { + let mut swift_code = String::new(); + + // Add import statement for Foundation (needed for async/await) + swift_code.push_str("import Foundation\n\n"); + + // Group functions by namespace + let mut namespaces: std::collections::HashMap> = + std::collections::HashMap::new(); + for func in functions { + namespaces + .entry(func.namespace.clone()) + .or_default() + .push(func); + } + + // Generate code for each namespace + for (namespace, funcs) in namespaces { + let namespace_struct_name = format!("{}API", to_pascal_case(&namespace)); + + swift_code.push_str(&format!("/// {} operations\n", to_pascal_case(&namespace))); + swift_code.push_str(&format!("public struct {} {{\n", namespace_struct_name)); + swift_code.push_str(" private let client: SpacedriveClient\n"); + swift_code.push_str("\n"); + swift_code.push_str(" init(client: SpacedriveClient) {\n"); + swift_code.push_str(" self.client = client\n"); + swift_code.push_str(" }\n"); + swift_code.push_str("\n"); + + // Generate methods for each function in this namespace + for func in funcs { + swift_code.push_str(&generate_swift_method(func)); + swift_code.push_str("\n"); + } + + swift_code.push_str("}\n\n"); + } + + swift_code +} + +/// Generate Swift method code for a single API function +fn generate_swift_method(func: &ApiFunction) -> String { + let method_name = to_camel_case(&func.method_name); + let input_type = &func.input_type_name; + let output_type = &func.output_type_name; + let wire_method = &func.wire_method; + + // Determine if this is an action or query for documentation + let operation_type = if func.is_action { "action" } else { "query" }; + + let mut method_code = String::new(); + + // Add documentation comment + method_code.push_str(&format!( + " /// Execute {}: {}\n", + operation_type, func.identifier + )); + + // Generate method signature + if input_type == "EmptyInput" { + // For operations with no input, use Empty struct + method_code.push_str(&format!( + " public func {}() async throws -> {} {{\n", + method_name, output_type + )); + method_code.push_str(" let input = Empty()\n"); + } else { + // For operations with input, take the input as parameter + method_code.push_str(&format!( + " public func {}(_ input: {}) async throws -> {} {{\n", + method_name, input_type, output_type + )); + } + + // Generate method body + method_code.push_str(&format!(" return try await client.execute(\n")); + method_code.push_str(" input,\n"); + method_code.push_str(&format!(" method: \"{}\",\n", 
wire_method)); + method_code.push_str(&format!(" responseType: {}.self\n", output_type)); + method_code.push_str(" )\n"); + method_code.push_str(" }\n"); + + method_code +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_type_extraction_system() { + let (operations, queries, collection) = generate_spacedrive_api(); + + println!( + "Discovered {} operations and {} queries", + operations.len(), + queries.len() + ); + println!("Type collection has {} types", collection.len()); + + // Should have some operations if the system is working + if !operations.is_empty() { + println!("Type extraction system is working!"); + + // Show some examples with scope information + for op in operations.iter().take(3) { + println!( + " Operation: {} -> wire: {} -> scope: {:?}", + op.identifier, op.wire_method, op.scope + ); + } + } + + if !queries.is_empty() { + for query in queries.iter().take(3) { + println!( + " Query: {} -> wire: {} -> scope: {:?}", + query.identifier, query.wire_method, query.scope + ); + } + } + } + + #[test] + fn test_api_functions_extraction() { + let (operations, queries, _collection) = generate_spacedrive_api(); + let functions = extract_api_functions(&operations, &queries); + + println!("Extracted {} API functions", functions.len()); + + // Group functions by namespace to show organization + let mut namespaces: std::collections::HashMap> = + std::collections::HashMap::new(); + for func in &functions { + namespaces + .entry(func.namespace.clone()) + .or_default() + .push(func); + } + + for (namespace, funcs) in namespaces { + println!("Namespace '{}': {} functions", namespace, funcs.len()); + for func in funcs.iter().take(3) { + println!( + " {}: {} -> {} ({})", + func.method_name, + func.input_type_name, + func.output_type_name, + if func.is_action { "action" } else { "query" } + ); + } + } + + // Verify some basic properties + assert!( + !functions.is_empty(), + "Should have extracted some API functions" + ); + + // Check that namespaces are properly extracted + let has_libraries = functions.iter().any(|f| f.namespace == "libraries"); + let has_jobs = functions.iter().any(|f| f.namespace == "jobs"); + println!("Found libraries namespace: {}", has_libraries); + println!("Found jobs namespace: {}", has_jobs); + } + + #[test] + fn test_swift_code_generation() { + let (operations, queries, _collection) = generate_spacedrive_api(); + let functions = extract_api_functions(&operations, &queries); + let swift_code = generate_swift_api_code(&functions); + + println!("Generated Swift code (first 1000 chars):"); + println!("{}", &swift_code[..swift_code.len().min(1000)]); + + // Verify basic structure + assert!(swift_code.contains("public struct LibrariesAPI")); + assert!(swift_code.contains("public struct JobsAPI")); + assert!(swift_code.contains("public struct NetworkAPI")); + + // Verify method generation + assert!(swift_code.contains("public func create(")); + assert!(swift_code.contains("public func list(")); + assert!(swift_code.contains("public func start(")); + + // Verify method calls to client.execute + assert!(swift_code.contains("client.execute(")); + assert!(swift_code.contains("responseType:")); + + // Verify wire method strings are included + assert!(swift_code.contains("action:libraries.create.input.v1")); + assert!(swift_code.contains("query:jobs.list.v1")); + + println!("Swift code generation test passed!"); + } +} diff --git a/core/src/service/network/core/mod.rs b/core/src/service/network/core/mod.rs index 425fbea09..34531ab24 100644 --- 
a/core/src/service/network/core/mod.rs +++ b/core/src/service/network/core/mod.rs @@ -404,9 +404,6 @@ impl NetworkingService { conn: conn.clone(), }); - // Connection established - don't open streams immediately - // Let connections be idle until actually needed for data transfer - // This prevents connection closure after initial ping/pong logger .info(&format!("Connection established to device {}", device_id)) .await; @@ -416,6 +413,35 @@ impl NetworkingService { device_id, node_id, }); + + // Open a hello stream to keep the connection alive + // This prevents idle timeout and signals to the receiver that the connection is ready + logger + .debug("Opening hello stream to keep connection alive...") + .await; + + let conn_for_hello = conn.clone(); + let logger_clone = logger.clone(); + tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + + match conn_for_hello.open_bi().await { + Ok((mut send, _recv)) => { + // Send a simple hello message + let hello_msg = b"HELLO"; + let _ = send.write_all(hello_msg).await; + let _ = send.finish(); + logger_clone.debug("Hello stream sent successfully").await; + } + Err(e) => { + // Log error but don't fail - connection is still tracked + logger_clone + .warn(&format!("Failed to open hello stream: {}", e)) + .await; + } + } + }); + break; } Err(e) => { @@ -1239,7 +1265,7 @@ impl NetworkingService { relay_url, ); - // CRITICAL: Use the session_id derived from the pairing code, not the random seed + // The session_id derived from the pairing code // This ensures both initiator and joiner derive the same session_id from the BIP39 words let session_id = pairing_code.session_id(); @@ -1248,16 +1274,7 @@ impl NetworkingService { .start_pairing_session_with_id(session_id, pairing_code.clone()) .await?; - // Register in device registry - let initiator_device_id = self.device_id(); - let initiator_node_id = self.node_id(); - let device_registry = self.device_registry(); - { - let mut registry = device_registry.write().await; - registry.start_pairing(initiator_device_id, initiator_node_id, session_id)?; - } - - // Get our node address for advertising + // Get our node address first (needed for device registry) let mut node_addr = self.get_node_addr()?; // If we don't have any direct addresses yet, wait a bit for them to be discovered @@ -1321,6 +1338,17 @@ impl NetworkingService { )) .await; + // Register in device registry with the node address + let initiator_device_id = self.device_id(); + let initiator_node_id = self.node_id(); + let device_registry = self.device_registry(); + { + let mut registry = device_registry.write().await; + // Use node_addr or create an empty one if not available + let addr_for_registry = node_addr.clone().unwrap_or(NodeAddr::new(initiator_node_id)); + registry.start_pairing(initiator_device_id, initiator_node_id, session_id, addr_for_registry)?; + } + // Publish pairing session via mDNS using user_data field // The joiner will filter discovered nodes by this session_id if let Some(endpoint) = &self.endpoint { @@ -1482,9 +1510,7 @@ impl NetworkingService { // Text-based pairing code: only use mDNS (local network only) match self.try_mdns_discovery(session_id, force_relay).await { Ok(()) => { - self.logger - .info("Connected via mDNS (local network)") - .await; + self.logger.info("Connected via mDNS (local network)").await; Ok(()) } Err(e) => { @@ -1630,6 +1656,7 @@ impl NetworkingService { app_version: env!("CARGO_PKG_VERSION").to_string(), network_fingerprint: self.identity().network_fingerprint(), last_seen: chrono::Utc::now(), 
@@ -1239,7 +1265,7 @@ impl NetworkingService {
 			relay_url,
 		);
 
-		// CRITICAL: Use the session_id derived from the pairing code, not the random seed
+		// Use the session_id derived from the pairing code
 		// This ensures both initiator and joiner derive the same session_id from the BIP39 words
 		let session_id = pairing_code.session_id();
@@ -1248,16 +1274,7 @@ impl NetworkingService {
 			.start_pairing_session_with_id(session_id, pairing_code.clone())
 			.await?;
 
-		// Register in device registry
-		let initiator_device_id = self.device_id();
-		let initiator_node_id = self.node_id();
-		let device_registry = self.device_registry();
-		{
-			let mut registry = device_registry.write().await;
-			registry.start_pairing(initiator_device_id, initiator_node_id, session_id)?;
-		}
-
-		// Get our node address for advertising
+		// Get our node address first (needed for device registry)
 		let mut node_addr = self.get_node_addr()?;
 
 		// If we don't have any direct addresses yet, wait a bit for them to be discovered
@@ -1321,6 +1338,17 @@ impl NetworkingService {
 			))
 			.await;
 
+		// Register in device registry with the node address
+		let initiator_device_id = self.device_id();
+		let initiator_node_id = self.node_id();
+		let device_registry = self.device_registry();
+		{
+			let mut registry = device_registry.write().await;
+			// Use node_addr or create an empty one if not available
+			let addr_for_registry = node_addr.clone().unwrap_or(NodeAddr::new(initiator_node_id));
+			registry.start_pairing(initiator_device_id, initiator_node_id, session_id, addr_for_registry)?;
+		}
+
 		// Publish pairing session via mDNS using user_data field
 		// The joiner will filter discovered nodes by this session_id
 		if let Some(endpoint) = &self.endpoint {
@@ -1482,9 +1510,7 @@ impl NetworkingService {
 		// Text-based pairing code: only use mDNS (local network only)
 		match self.try_mdns_discovery(session_id, force_relay).await {
 			Ok(()) => {
-				self.logger
-					.info("Connected via mDNS (local network)")
-					.await;
+				self.logger.info("Connected via mDNS (local network)").await;
 				Ok(())
 			}
 			Err(e) => {
@@ -1630,6 +1656,7 @@ impl NetworkingService {
 					app_version: env!("CARGO_PKG_VERSION").to_string(),
 					network_fingerprint: self.identity().network_fingerprint(),
 					last_seen: chrono::Utc::now(),
+					direct_addresses: vec![],
 				}
 			})
 		};
@@ -1790,6 +1817,7 @@ impl NetworkingService {
 					app_version: env!("CARGO_PKG_VERSION").to_string(),
 					network_fingerprint: self.identity().network_fingerprint(),
 					last_seen: chrono::Utc::now(),
+					direct_addresses: vec![],
 				}
 			})
 		};
diff --git a/core/src/service/network/device/mod.rs b/core/src/service/network/device/mod.rs
index 8c721cc12..a0da7576f 100644
--- a/core/src/service/network/device/mod.rs
+++ b/core/src/service/network/device/mod.rs
@@ -32,6 +32,7 @@ pub struct DeviceInfo {
 	pub app_version: String,
 	pub network_fingerprint: crate::service::network::utils::identity::NetworkFingerprint,
 	pub last_seen: DateTime<Utc>,
+	pub direct_addresses: Vec<String>,
 }
 
 /// Type of device
@@ -63,6 +64,7 @@ pub enum DeviceState {
 	Pairing {
 		node_id: NodeId,
 		session_id: Uuid,
+		node_addr: NodeAddr,
 		started_at: DateTime<Utc>,
 	},
 	/// Device successfully paired but not currently connected
diff --git a/core/src/service/network/device/persistence.rs b/core/src/service/network/device/persistence.rs
index 31f732bbd..890abb1ea 100644
--- a/core/src/service/network/device/persistence.rs
+++ b/core/src/service/network/device/persistence.rs
@@ -429,6 +429,7 @@ mod tests {
 				public_key_hash: "test_hash".to_string(),
 			},
 			last_seen: Utc::now(),
+			direct_addresses: vec![],
 		}
 	}
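`DeviceInfo` carries direct addresses in wire form (`Vec<String>`), while `NodeAddr` needs typed `SocketAddr` values. The conversion used throughout this patch skips unparseable entries rather than failing the whole set. A self-contained sketch of that round trip (the helper names are illustrative):

```rust
use std::net::SocketAddr;

// Wire form (Vec<String>) <-> typed form (Vec<SocketAddr>); invalid entries
// are dropped instead of aborting the conversion.
fn parse_direct_addresses(addrs: &[String]) -> Vec<SocketAddr> {
	addrs.iter().filter_map(|a| a.parse().ok()).collect()
}

fn format_direct_addresses(addrs: &[SocketAddr]) -> Vec<String> {
	addrs.iter().map(|a| a.to_string()).collect()
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn skips_unparseable_entries() {
		let wire = vec!["127.0.0.1:4000".to_string(), "not-an-addr".to_string()];
		assert_eq!(parse_direct_addresses(&wire).len(), 1);
	}
}
```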
diff --git a/core/src/service/network/device/registry.rs b/core/src/service/network/device/registry.rs
index ad67415e7..6697412f4 100644
--- a/core/src/service/network/device/registry.rs
+++ b/core/src/service/network/device/registry.rs
@@ -119,10 +119,12 @@ impl DeviceRegistry {
 		device_id: Uuid,
 		node_id: NodeId,
 		session_id: Uuid,
+		node_addr: NodeAddr,
 	) -> Result<()> {
 		let state = DeviceState::Pairing {
 			node_id,
 			session_id,
+			node_addr,
 			started_at: Utc::now(),
 		};
 
@@ -174,6 +176,13 @@ impl DeviceRegistry {
 					.map(|addr| addr.to_string())
 					.collect()
 			}
+			Some(DeviceState::Pairing { node_addr, .. }) => {
+				// Extract addresses from the active pairing connection
+				node_addr
+					.direct_addresses()
+					.map(|addr| addr.to_string())
+					.collect()
+			}
 			Some(DeviceState::Connected { connection, .. }) => {
 				// If somehow already connected, use those addresses
 				connection.addresses.clone()
@@ -477,6 +486,7 @@ impl DeviceRegistry {
 				public_key_hash: "placeholder".to_string(),
 			},
 			last_seen: Utc::now(),
+			direct_addresses: vec![],
 		})
 	}
diff --git a/core/src/service/network/protocol/pairing/initiator.rs b/core/src/service/network/protocol/pairing/initiator.rs
index f61defbdf..d9bd7280f 100644
--- a/core/src/service/network/protocol/pairing/initiator.rs
+++ b/core/src/service/network/protocol/pairing/initiator.rs
@@ -191,15 +191,8 @@ impl PairingProtocolHandler {
 		let shared_secret = self.generate_shared_secret(session_id).await?;
 		let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
 
-		// Complete pairing in device registry with proper lock scoping
 		// Use the actual device ID from device_info to ensure consistency
 		let actual_device_id = device_info.device_id;
-		{
-			let mut registry = self.device_registry.write().await;
-			registry
-				.complete_pairing(actual_device_id, device_info.clone(), session_keys.clone())
-				.await?;
-		} // Release write lock here
 
 		// Get node ID from the device info's network fingerprint
 		let node_id = match device_info.network_fingerprint.node_id.parse::<NodeId>() {
@@ -211,6 +204,47 @@ impl PairingProtocolHandler {
 			}
 		};
 
+		// Register the remote device in Pairing state before completing pairing
+		// This allows complete_pairing to extract addresses from the Pairing state
+		{
+			let mut registry = self.device_registry.write().await;
+			// Collect the parseable direct addresses from device_info and attach
+			// them to the NodeAddr in a single call
+			let direct_addrs: Vec<std::net::SocketAddr> = device_info
+				.direct_addresses
+				.iter()
+				.filter_map(|addr_str| addr_str.parse().ok())
+				.collect();
+			let node_addr = iroh::NodeAddr::new(node_id).with_direct_addresses(direct_addrs);
+
+			if !device_info.direct_addresses.is_empty() {
+				self.log_info(&format!(
+					"Extracted {} direct addresses from joiner device info",
+					device_info.direct_addresses.len()
+				))
+				.await;
+			}
+
+			// Ignore registration errors - the device might already be in the
+			// Pairing state. Log explicitly: `map_err` closures cannot await.
+			if let Err(e) = registry.start_pairing(actual_device_id, node_id, session_id, node_addr) {
+				self.log_warn(&format!(
+					"Warning: Could not register device in Pairing state: {}",
+					e
+				))
+				.await;
+			}
+		} // Release write lock here
+
+		// Complete pairing in device registry with proper lock scoping
+		{
+			let mut registry = self.device_registry.write().await;
+			registry
+				.complete_pairing(actual_device_id, device_info.clone(), session_keys.clone())
+				.await?;
+		} // Release write lock here
+
 		// Mark device as connected since pairing is successful
 		let simple_connection = crate::service::network::device::ConnectionInfo {
 			addresses: vec![], // Will be filled in later
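The registration above logs failures in an `if let Err` branch rather than inside `map_err` because `log_warn` is async: a future created inside a `map_err` closure is dropped unawaited and never runs, so the warning silently disappears. A standalone demonstration of the pitfall (`log_warn` here is a stand-in, not the project's logger):

```rust
// Stand-in async logger for illustration only.
async fn log_warn(msg: String) {
	eprintln!("WARN: {msg}");
}

#[tokio::main]
async fn main() {
	let res: Result<(), &str> = Err("boom");

	// BUG: the future returned by `log_warn` is dropped unawaited inside
	// `map_err`, so this prints nothing.
	let _ = res.map_err(|e| log_warn(format!("failed: {e}"))).ok();

	// FIX: destructure and await the logger explicitly.
	if let Err(e) = res {
		log_warn(format!("failed: {e}")).await;
	}
}
```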
diff --git a/core/src/service/network/protocol/pairing/joiner.rs b/core/src/service/network/protocol/pairing/joiner.rs
index fb1e3896d..3ee2a2777 100644
--- a/core/src/service/network/protocol/pairing/joiner.rs
+++ b/core/src/service/network/protocol/pairing/joiner.rs
@@ -71,13 +71,61 @@ impl PairingProtocolHandler {
 		let shared_secret = self.generate_shared_secret(session_id).await?;
 		let session_keys = SessionKeys::from_shared_secret(shared_secret.clone());
 
+		// Get node ID from the initiator's device info
+		let device_id = initiator_device_info.device_id;
+		let node_id = match initiator_device_info
+			.network_fingerprint
+			.node_id
+			.parse::<NodeId>()
+		{
+			Ok(id) => id,
+			Err(_) => {
+				self.log_warn("Failed to parse node ID from initiator device info, using fallback")
+					.await;
+				NodeId::from_bytes(&[0u8; 32]).unwrap()
+			}
+		};
+
+		// Register the initiator device in Pairing state before completing pairing
+		// This stores Alice's addresses so Bob can reconnect if needed
+		{
+			let mut registry = self.device_registry.write().await;
+			// Collect the parseable direct addresses and attach them in one call
+			let direct_addrs: Vec<std::net::SocketAddr> = initiator_device_info
+				.direct_addresses
+				.iter()
+				.filter_map(|addr_str| addr_str.parse().ok())
+				.collect();
+			let node_addr = iroh::NodeAddr::new(node_id).with_direct_addresses(direct_addrs);
+
+			if !initiator_device_info.direct_addresses.is_empty() {
+				self.log_info(&format!(
+					"Extracted {} direct addresses from initiator device info",
+					initiator_device_info.direct_addresses.len()
+				))
+				.await;
+			}
+
+			// Ignore registration errors - device might already be in pairing state
+			if let Err(e) = registry.start_pairing(device_id, node_id, session_id, node_addr) {
+				self.log_warn(&format!(
+					"Warning: Could not register initiator device in Pairing state: {}",
+					e
+				))
+				.await;
+			}
+		} // Release write lock here
+
 		// Complete pairing in device registry
-		let actual_device_id = initiator_device_info.device_id;
 		{
 			let mut registry = self.device_registry.write().await;
 			if let Err(e) = registry
 				.complete_pairing(
-					actual_device_id,
+					device_id,
 					initiator_device_info.clone(),
 					session_keys.clone(),
 				)
@@ -100,19 +148,16 @@ impl PairingProtocolHandler {
 		};
 
 		let mut registry = self.device_registry.write().await;
-		if let Err(e) = registry
-			.mark_connected(actual_device_id, simple_connection)
-			.await
-		{
+		if let Err(e) = registry.mark_connected(device_id, simple_connection).await {
 			self.log_warn(&format!(
 				"Warning - failed to mark initiator device {} as connected: {}",
-				actual_device_id, e
+				device_id, e
 			))
 			.await;
 		} else {
 			self.log_info(&format!(
 				"Successfully marked initiator device {} as connected after pairing",
-				actual_device_id
+				device_id
 			))
 			.await;
 		}
@@ -217,6 +262,7 @@ impl PairingProtocolHandler {
 					public_key_hash: "unknown".to_string(),
 				},
 				last_seen: chrono::Utc::now(),
+				direct_addresses: vec![],
 			}
 		}
 	} else {
@@ -226,14 +272,61 @@ impl PairingProtocolHandler {
 			}
 		};
 
-		// Complete pairing in device registry
 		// Use the actual device ID from device_info to ensure consistency
-		let actual_device_id = initiator_device_info.device_id;
+		let device_id = initiator_device_info.device_id;
+		let node_id = match initiator_device_info
+			.network_fingerprint
+			.node_id
+			.parse::<NodeId>()
+		{
+			Ok(id) => id,
+			Err(_) => {
+				self.log_warn("Failed to parse node ID from initiator device info, using from_node fallback")
+					.await;
+				from_node
+			}
+		};
+
+		// Register the initiator device in Pairing state before completing pairing
+		// This stores Alice's addresses so Bob can reconnect if needed
+		{
+			let mut registry = self.device_registry.write().await;
+			// Collect the parseable direct addresses and attach them in one call
+			let direct_addrs: Vec<std::net::SocketAddr> = initiator_device_info
+				.direct_addresses
+				.iter()
+				.filter_map(|addr_str| addr_str.parse().ok())
+				.collect();
+			let node_addr = iroh::NodeAddr::new(node_id).with_direct_addresses(direct_addrs);
+
+			if !initiator_device_info.direct_addresses.is_empty() {
+				self.log_info(&format!(
+					"Extracted {} direct addresses from initiator device info in completion handler",
+					initiator_device_info.direct_addresses.len()
+				))
+				.await;
+			}
+
+			// Ignore registration errors - device might already be in pairing state
+			if let Err(e) = registry.start_pairing(device_id, node_id, session_id, node_addr) {
+				self.log_warn(&format!(
+					"Warning: Could not register initiator device in Pairing state: {}",
+					e
+				))
+				.await;
+			}
+		} // Release write lock here
+
+		// Complete pairing in device registry
 		let pairing_result = {
 			let mut registry = self.device_registry.write().await;
 			registry
 				.complete_pairing(
-					actual_device_id,
+					device_id,
 					initiator_device_info.clone(),
 					session_keys.clone(),
 				)
@@ -248,7 +341,7 @@ impl PairingProtocolHandler {
 			if let Some(session) = sessions.get_mut(&session_id) {
 				session.state = PairingState::Completed;
 				session.shared_secret = Some(shared_secret.clone());
-				session.remote_device_id = Some(actual_device_id);
+				session.remote_device_id = Some(device_id);
 			}
 		}
 
@@ -268,9 +361,7 @@ impl PairingProtocolHandler {
 
 		let _mark_result = {
 			let mut registry = self.device_registry.write().await;
-			registry
-				.mark_connected(actual_device_id, simple_connection)
-				.await
+			registry.mark_connected(device_id, simple_connection).await
 		};
 	}
 }
diff --git a/core/src/service/network/protocol/pairing/mod.rs b/core/src/service/network/protocol/pairing/mod.rs
index 16e3f442a..caf196484 100644
--- a/core/src/service/network/protocol/pairing/mod.rs
+++ b/core/src/service/network/protocol/pairing/mod.rs
@@ -19,7 +19,7 @@ use crate::service::network::{
 };
 use async_trait::async_trait;
 use blake3;
-use iroh::{Endpoint, NodeAddr, NodeId};
+use iroh::{Endpoint, NodeAddr, NodeId, Watcher};
 use persistence::PairingPersistence;
 use security::PairingSecurity;
 use std::collections::HashMap;
@@ -55,6 +55,9 @@ pub struct PairingProtocolHandler {
 
 	/// Session persistence manager
 	persistence: Option<Arc<PairingPersistence>>,
+
+	/// Endpoint for accessing direct addresses
+	endpoint: Option<Endpoint>,
 }
 
 impl PairingProtocolHandler {
@@ -66,6 +69,7 @@ impl PairingProtocolHandler {
 		command_sender: tokio::sync::mpsc::UnboundedSender<
 			crate::service::network::core::event_loop::EventLoopCommand,
 		>,
+		endpoint: Option<Endpoint>,
 	) -> Self {
 		Self {
 			identity,
@@ -76,6 +80,7 @@ impl PairingProtocolHandler {
 			command_sender,
 			role: None,
 			persistence: None,
+			endpoint,
 		}
 	}
 
@@ -88,6 +93,7 @@ impl PairingProtocolHandler {
 			crate::service::network::core::event_loop::EventLoopCommand,
 		>,
 		data_dir: PathBuf,
+		endpoint: Option<Endpoint>,
 	) -> Self {
 		let persistence = Arc::new(PairingPersistence::new(data_dir));
 		Self {
@@ -99,6 +105,7 @@ impl PairingProtocolHandler {
 			command_sender,
 			role: None,
 			persistence: Some(persistence),
+			endpoint,
 		}
 	}
 
@@ -321,6 +328,20 @@ impl PairingProtocolHandler {
 		device_info.network_fingerprint = self.identity.network_fingerprint();
 		device_info.last_seen = chrono::Utc::now();
 
+		// Populate direct addresses from endpoint
+		device_info.direct_addresses = if let Some(endpoint) = &self.endpoint {
+			if let Some(node_addr) = endpoint.node_addr().get() {
+				node_addr
+					.direct_addresses()
+					.map(|addr| addr.to_string())
+					.collect()
+			} else {
+				vec![]
+			}
+		} else {
+			vec![]
+		};
+
 		Ok(device_info)
 	}
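The same "parse the peer's advertised addresses into a `NodeAddr`" block now appears once in `initiator.rs` and twice in `joiner.rs`. A shared helper along these lines could collapse the three sites; the function below is a sketch of that possible refactor, not an existing API in the codebase:

```rust
use std::net::SocketAddr;

// Hypothetical shared helper; mirrors the three registration sites above.
fn node_addr_from_device_info(
	node_id: iroh::NodeId,
	direct_addresses: &[String],
) -> iroh::NodeAddr {
	// Collect the parseable addresses and attach them in a single call.
	let addrs: Vec<SocketAddr> = direct_addresses
		.iter()
		.filter_map(|a| a.parse().ok())
		.collect();
	iroh::NodeAddr::new(node_id).with_direct_addresses(addrs)
}
```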
diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs
index ef11f8624..884a2685f 100644
--- a/core/src/service/sync/peer.rs
+++ b/core/src/service/sync/peer.rs
@@ -1741,33 +1741,45 @@ impl PeerSync {
 	///
 	/// Queries the database for ALL shared resources (tags, albums, etc.) to send
 	/// to a new device during initial sync. This ensures pre-sync data is included.
+	///
+	/// This is fully generic - uses the registry to discover and query all shared models.
 	pub async fn get_full_shared_state(&self) -> Result<serde_json::Value> {
-		use crate::infra::sync::Syncable;
-
 		debug!("Querying full shared resource state for backfill");
 
-		// Query all shared models
-		// TODO: Add volumes, user_metadata, etc. as they get Syncable impl
-		let tags = crate::infra::db::entities::tag::Model::query_for_sync(
-			None,
-			None,
-			10000, // Large limit for full state
-			self.db.as_ref(),
+		// Query all shared models through the registry (fully generic)
+		let all_shared_state = crate::infra::sync::registry::query_all_shared_models(
+			None, // No since filter - get everything
+			CATCHUP_BATCH_SIZE,
+			self.db.clone(),
 		)
 		.await
-		.map_err(|e| anyhow::anyhow!("Failed to query tags: {}", e))?;
+		.map_err(|e| anyhow::anyhow!("Failed to query shared models: {}", e))?;
 
-		info!("Queried {} tags for backfill state snapshot", tags.len());
+		// Build response object dynamically
+		let mut response = serde_json::Map::new();
 
-		Ok(serde_json::json!({
-			"tags": tags.into_iter().map(|(uuid, data, _ts)| {
-				serde_json::json!({
-					"uuid": uuid,
-					"data": data
+		for (model_type, records) in all_shared_state {
+			info!(
+				model_type = %model_type,
+				count = records.len(),
+				"Queried shared model for backfill state snapshot"
+			);
+
+			// Convert records to array of {uuid, data} objects
+			let records_json: Vec<serde_json::Value> = records
+				.into_iter()
+				.map(|(uuid, data, _ts)| {
+					serde_json::json!({
+						"uuid": uuid,
+						"data": data
+					})
 				})
-			}).collect::<Vec<_>>(),
-			// Add more shared resources here as they're implemented
-		}))
+				.collect();
+
+			response.insert(model_type, serde_json::Value::Array(records_json));
+		}
+
+		Ok(serde_json::Value::Object(response))
 	}
 
 	/// Transition to ready state (after backfill)
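For reference, the rewritten `get_full_shared_state` produces a JSON object keyed by model type, with one `{uuid, data}` array per model. A sketch of the resulting payload shape; the model names and truncated UUIDs are illustrative, not taken from a real library:

```rust
use serde_json::json;

// Illustrative shape of the backfill payload built above.
fn example_backfill_payload() -> serde_json::Value {
	json!({
		"tag": [
			{ "uuid": "0b9d1c2e-...", "data": { "name": "Screenshots" } }
		],
		"album": [
			{ "uuid": "7f3a9e41-...", "data": { "title": "Trips" } }
		]
	})
}
```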
diff --git a/core/src/service/watcher/mod.rs b/core/src/service/watcher/mod.rs
index 94c07e89f..0896cf3ea 100644
--- a/core/src/service/watcher/mod.rs
+++ b/core/src/service/watcher/mod.rs
@@ -166,6 +166,8 @@ pub struct WatchedLocation {
 	pub path: PathBuf,
 	/// Whether watching is enabled for this location
 	pub enabled: bool,
+	/// Indexing rule toggles for filtering events
+	pub rule_toggles: crate::ops::indexing::rules::RuleToggles,
 }
 
 impl LocationWatcher {
@@ -206,6 +208,15 @@ impl LocationWatcher {
 			}
 		}
 
+		// Get rule toggles and location root from watched locations
+		let (rule_toggles, location_root) = {
+			let locations = self.watched_locations.read().await;
+			locations
+				.get(&location_id)
+				.map(|loc| (loc.rule_toggles, loc.path.clone()))
+				.ok_or_else(|| anyhow::anyhow!("Location {} not found in watched locations", location_id))?
+		};
+
 		// Create metrics for this worker
 		let worker_metrics = Arc::new(LocationWorkerMetrics::new());
 		{
@@ -223,6 +234,8 @@ impl LocationWatcher {
 			self.events.clone(),
 			self.config.clone(),
 			worker_metrics.clone(),
+			rule_toggles,
+			location_root,
 		);
 
 		// Record worker creation
@@ -498,6 +511,7 @@ impl LocationWatcher {
 			library_id: library.id(),
 			path: path.clone(),
 			enabled: true, // TODO: Add enabled field to database schema
+			rule_toggles: Default::default(), // Use default rules for existing locations
 		};
 
 		// Add to watched locations
@@ -827,6 +841,7 @@ impl LocationWatcher {
 			library_id,
 			path: path.clone(),
 			enabled: true,
+			rule_toggles: Default::default(), // Use default rules for new locations
 		};
 
 		// Add location to watcher
diff --git a/core/src/service/watcher/worker.rs b/core/src/service/watcher/worker.rs
index 0a50ebbd0..433a41536 100644
--- a/core/src/service/watcher/worker.rs
+++ b/core/src/service/watcher/worker.rs
@@ -29,6 +29,10 @@ pub struct LocationWorker {
 	config: LocationWatcherConfig,
 	/// Metrics for this worker
 	metrics: Arc<LocationWorkerMetrics>,
+	/// Indexing rule toggles for filtering events
+	rule_toggles: crate::ops::indexing::rules::RuleToggles,
+	/// Location root path for rule evaluation
+	location_root: PathBuf,
 }
 
 impl LocationWorker {
@@ -41,6 +45,8 @@ impl LocationWorker {
 		events: Arc<EventBus>,
 		config: LocationWatcherConfig,
 		metrics: Arc<LocationWorkerMetrics>,
+		rule_toggles: crate::ops::indexing::rules::RuleToggles,
+		location_root: PathBuf,
 	) -> Self {
 		Self {
 			location_id,
@@ -50,6 +56,8 @@ impl LocationWorker {
 			events,
 			config,
 			metrics,
+			rule_toggles,
+			location_root,
 		}
 	}
 
@@ -422,7 +430,15 @@ impl LocationWorker {
 		}
 	}
 
-	if let Err(e) = responder::apply_batch(&self.context, self.library_id, raw_events).await {
+	if let Err(e) = responder::apply_batch(
+		&self.context,
+		self.library_id,
+		raw_events,
+		self.rule_toggles,
+		&self.location_root,
+	)
+	.await
+	{
 		error!(
 			"Failed to apply batch for location {}: {}",
 			self.location_id, e
@@ -490,6 +506,8 @@ mod tests {
 		events: Arc::new(EventBus::default()),
 		config,
 		metrics: Arc::new(LocationWorkerMetrics::new()),
+		rule_toggles: Default::default(),
+		location_root: PathBuf::from("/test"),
 	};
 
 	let events = vec![
@@ -522,6 +540,8 @@ mod tests {
 		events: Arc::new(EventBus::default()),
 		metrics: Arc::new(LocationWorkerMetrics::new()),
 		config,
+		rule_toggles: Default::default(),
+		location_root: PathBuf::from("/test"),
 	};
 
 	let renames = vec![
diff --git a/core/src/testing/runner.rs b/core/src/testing/runner.rs
index e000440ae..64b1663d5 100644
--- a/core/src/testing/runner.rs
+++ b/core/src/testing/runner.rs
@@ -170,6 +170,13 @@ impl CargoTestRunner {
 		.env("TEST_ROLE", &process.name)
 		.env("TEST_DATA_DIR", process.data_dir.path().to_str().unwrap());
 
+	// Forward RUST_LOG from parent or default to debug for subprocess visibility
+	if let Ok(rust_log) = std::env::var("RUST_LOG") {
+		command.env("RUST_LOG", rust_log);
+	} else {
+		command.env("RUST_LOG", "debug");
+	}
+
 	let child = command
 		.stdout(Stdio::inherit())
 		.stderr(Stdio::inherit())
@@ -216,6 +223,13 @@ impl CargoTestRunner {
 		.env("TEST_ROLE", &process.name)
 		.env("TEST_DATA_DIR", process.data_dir.path().to_str().unwrap());
 
+	// Forward RUST_LOG from parent or default to debug for subprocess visibility
+	if let Ok(rust_log) = std::env::var("RUST_LOG") {
+		command.env("RUST_LOG", rust_log);
+	} else {
+		command.env("RUST_LOG", "debug");
+	}
+
 	let child = command
 		.stdout(Stdio::inherit())
 		.stderr(Stdio::inherit())
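The same `RUST_LOG` forwarding block appears in both spawn paths of `runner.rs`. A possible follow-up refactor, not part of this patch, is a small helper that forwards an env var from the parent process or falls back to a default; the function name is hypothetical:

```rust
use std::process::Command;

// Forward `key` from the parent environment, or set `default` when unset.
fn forward_env_or_default(command: &mut Command, key: &str, default: &str) {
	match std::env::var(key) {
		Ok(value) => command.env(key, value),
		Err(_) => command.env(key, default),
	};
}

// Usage at both call sites:
// forward_env_or_default(&mut command, "RUST_LOG", "debug");
```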
a/core/tests/device_pairing_test.rs
+++ b/core/tests/device_pairing_test.rs
@@ -233,6 +233,11 @@ async fn bob_pairing_scenario() {
 
 	println!("PAIRING_SUCCESS: Bob's Test Device connected to Alice successfully");
 
+	// Wait longer to allow the persistent connection to be established via auto-reconnection
+	// The pairing stream is temporary; we need to wait for Bob to reconnect
+	println!("Bob: Waiting for persistent connection to be established...");
+	tokio::time::sleep(Duration::from_secs(10)).await;
+
 	// Write success marker for orchestrator to detect
 	std::fs::write("/tmp/spacedrive-pairing-test/bob_success.txt", "success").unwrap();
 	break;
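A fixed 10-second sleep is the simplest fix, but it either wastes time or can still race on slow machines. A bounded polling loop, in the style already used elsewhere in these tests, would be sturdier. A sketch; `is_connected` is a placeholder for whatever probe the test can make (for example the connected-device list):

```rust
use std::time::Duration;

// Poll a readiness probe until it succeeds or the deadline expires.
async fn wait_for<F>(mut is_connected: F, deadline: Duration) -> bool
where
	F: FnMut() -> bool,
{
	let start = tokio::time::Instant::now();
	while start.elapsed() < deadline {
		if is_connected() {
			return true;
		}
		tokio::time::sleep(Duration::from_millis(500)).await;
	}
	false
}
```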
diff --git a/core/tests/sync_backfill_integration_test.rs b/core/tests/sync_backfill_integration_test.rs
new file mode 100644
index 000000000..64b653d53
--- /dev/null
+++ b/core/tests/sync_backfill_integration_test.rs
@@ -0,0 +1,487 @@
+//! End-to-end sync backfill integration test
+//!
+//! This test validates the full sync stack by:
+//! 1. Device A (Alice) indexes the Spacedrive source code
+//! 2. Device B (Bob) pairs with Alice and triggers backfill
+//! 3. Validating that Bob receives all indexed data via sync
+
+use sd_core::infra::db::entities::{entry, entry_closure};
+use sd_core::testing::CargoTestRunner;
+use sd_core::Core;
+use sea_orm::{EntityTrait, PaginatorTrait};
+use std::env;
+use std::path::PathBuf;
+use std::time::Duration;
+use tokio::time::timeout;
+
+const TEST_DIR: &str = "/tmp/spacedrive-sync-backfill-test";
+
+/// Alice indexes the Spacedrive source code
+#[tokio::test]
+#[ignore]
+async fn alice_indexes_scenario() {
+	if env::var("TEST_ROLE").unwrap_or_default() != "alice" {
+		return;
+	}
+
+	env::set_var("SPACEDRIVE_TEST_DIR", TEST_DIR);
+
+	let data_dir = PathBuf::from(format!("{}/alice", TEST_DIR));
+	let device_name = "Alice's Device";
+
+	println!("Alice: Starting sync backfill test");
+	println!("Alice: Data dir: {:?}", data_dir);
+
+	// Initialize Core
+	println!("Alice: Initializing Core...");
+	let mut core = timeout(Duration::from_secs(10), Core::new(data_dir))
+		.await
+		.unwrap()
+		.unwrap();
+	core.device.set_name(device_name.to_string()).unwrap();
+	println!("Alice: Core initialized");
+
+	// Subscribe to events to monitor job completion
+	let mut event_subscriber = core.events.subscribe();
+
+	// Get the Spacedrive project root (current working directory during test)
+	let project_root = env::current_dir().expect("Failed to get current directory");
+	println!("Alice: Project root: {:?}", project_root);
+
+	// Create library
+	println!("Alice: Creating library...");
+	let library = core
+		.libraries
+		.create_library("Test Sync Library", None, core.context.clone())
+		.await
+		.unwrap();
+
+	// Register device in database
+	let db = library.db();
+	let device = core.device.to_device().unwrap();
+
+	use sd_core::infra::db::entities;
+	use sea_orm::{ActiveModelTrait, ColumnTrait, QueryFilter};
+
+	let device_record = match entities::device::Entity::find()
+		.filter(entities::device::Column::Uuid.eq(device.id))
+		.one(db.conn())
+		.await
+		.unwrap()
+	{
+		Some(existing) => existing,
+		None => {
+			let device_model: entities::device::ActiveModel = device.into();
+			device_model.insert(db.conn()).await.unwrap()
+		}
+	};
+
+	// Create a location pointing to the Spacedrive source code
+	println!("Alice: Creating location for source code...");
+
+	let location_args = sd_core::location::LocationCreateArgs {
+		path: project_root.clone(),
+		name: Some("Spacedrive Source".to_string()),
+		index_mode: sd_core::location::IndexMode::Shallow,
+	};
+
+	let location_id = sd_core::location::create_location(
+		library.clone(),
+		&core.events,
+		location_args,
+		device_record.id,
+	)
+	.await
+	.expect("Failed to create location");
+
+	println!("Alice: Location created with ID: {}", location_id);
+
+	// The indexing job is automatically started by add_location, so we just monitor events
+	println!("Alice: Monitoring indexer job progress...");
+
+	let mut job_completed = false;
+	let mut attempts = 0;
+	let max_attempts = 300; // 5 minutes max
+
+	tokio::spawn(async move {
+		while let Ok(event) = event_subscriber.recv().await {
+			match event {
+				sd_core::infra::event::Event::JobProgress {
+					job_id,
+					progress,
+					message,
+					..
+				} => {
+					println!(
+						"Alice: Job {} progress: {}% - {}",
+						job_id,
+						progress,
+						message.unwrap_or_else(|| "".to_string())
+					);
+				}
+				sd_core::infra::event::Event::JobCompleted { job_id, .. } => {
+					println!("Alice: Job {} completed!", job_id);
+				}
+				sd_core::infra::event::Event::JobFailed { error, .. } => {
+					panic!("Alice: Job failed: {}", error);
+				}
+				_ => {}
+			}
+		}
+	});
+
+	// Poll for indexing completion by checking if files have been indexed
+	loop {
+		tokio::time::sleep(Duration::from_secs(1)).await;
+
+		let entry_count = entry::Entity::find()
+			.count(library.db().conn())
+			.await
+			.unwrap();
+
+		if entry_count > 0 && attempts > 10 {
+			// Give it some time to ensure indexing is done
+			tokio::time::sleep(Duration::from_secs(3)).await;
+
+			let final_entry_count = entry::Entity::find()
+				.count(library.db().conn())
+				.await
+				.unwrap();
+
+			if final_entry_count == entry_count {
+				println!("Alice: Indexing complete! Found {} entries", final_entry_count);
+				job_completed = true;
+				break;
+			}
+		}
+
+		attempts += 1;
+		if attempts >= max_attempts {
+			panic!("Alice: Indexing timeout - job did not complete");
+		}
+
+		if attempts % 10 == 0 {
+			println!("Alice: Still waiting for indexing... (current count: {})", entry_count);
+		}
+	}
+
+	if !job_completed {
+		panic!("Alice: Failed to complete indexing");
+	}
+
+	// Count total entries via entry_closure table
+	let entry_count = entry_closure::Entity::find()
+		.count(library.db().conn())
+		.await
+		.unwrap();
+
+	println!("Alice: Total entry_closure count: {}", entry_count);
+
+	// Write entry count to shared file for Bob to validate against
+	std::fs::create_dir_all(TEST_DIR).unwrap();
+	std::fs::write(
+		format!("{}/alice_entry_count.txt", TEST_DIR),
+		entry_count.to_string(),
+	)
+	.unwrap();
+	println!("Alice: Entry count written to shared file");
+
+	// Initialize networking for device pairing
+	println!("Alice: Initializing networking...");
+	timeout(Duration::from_secs(10), core.init_networking())
+		.await
+		.unwrap()
+		.unwrap();
+	tokio::time::sleep(Duration::from_secs(3)).await;
+	println!("Alice: Networking initialized");
+
+	// Start pairing as initiator
+	println!("Alice: Starting pairing as initiator...");
+	let (pairing_code, expires_in) = if let Some(networking) = core.networking() {
+		timeout(
+			Duration::from_secs(15),
+			networking.start_pairing_as_initiator(false),
+		)
+		.await
+		.unwrap()
+		.unwrap()
+	} else {
+		panic!("Networking not initialized");
+	};
+
+	let short_code = pairing_code
+		.split_whitespace()
+		.take(3)
+		.collect::<Vec<_>>()
+		.join(" ");
(expires in {}s)", + short_code, expires_in + ); + + // Write pairing code for Bob + std::fs::write(format!("{}/pairing_code.txt", TEST_DIR), &pairing_code).unwrap(); + println!("Alice: Pairing code written"); + + // Wait for Bob to connect + println!("Alice: Waiting for Bob to pair..."); + let mut pair_attempts = 0; + let max_pair_attempts = 60; + + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let connected_devices = core + .services + .device + .get_connected_devices() + .await + .unwrap(); + if !connected_devices.is_empty() { + println!("Alice: Bob paired successfully!"); + break; + } + + pair_attempts += 1; + if pair_attempts >= max_pair_attempts { + panic!("Alice: Pairing timeout"); + } + + if pair_attempts % 5 == 0 { + println!("Alice: Still waiting for Bob..."); + } + } + + // Give time for sync to initialize and begin backfill + println!("Alice: Waiting for sync to begin..."); + tokio::time::sleep(Duration::from_secs(10)).await; + + // Write success marker + std::fs::write(format!("{}/alice_success.txt", TEST_DIR), "success").unwrap(); + println!("Alice: Test completed successfully"); + + // Keep alive for Bob to complete backfill + tokio::time::sleep(Duration::from_secs(60)).await; +} + +/// Bob pairs with Alice and backfills the indexed data +#[tokio::test] +#[ignore] +async fn bob_backfills_scenario() { + if env::var("TEST_ROLE").unwrap_or_default() != "bob" { + return; + } + + env::set_var("SPACEDRIVE_TEST_DIR", TEST_DIR); + + let data_dir = PathBuf::from(format!("{}/bob", TEST_DIR)); + let device_name = "Bob's Device"; + + println!("Bob: Starting sync backfill test"); + println!("Bob: Data dir: {:?}", data_dir); + + // Initialize Core + println!("Bob: Initializing Core..."); + let mut core = timeout(Duration::from_secs(10), Core::new(data_dir)) + .await + .unwrap() + .unwrap(); + core.device.set_name(device_name.to_string()).unwrap(); + println!("Bob: Core initialized"); + + // Initialize networking + println!("Bob: Initializing networking..."); + timeout(Duration::from_secs(10), core.init_networking()) + .await + .unwrap() + .unwrap(); + tokio::time::sleep(Duration::from_secs(3)).await; + println!("Bob: Networking initialized"); + + // Wait for Alice's pairing code + println!("Bob: Looking for pairing code..."); + let pairing_code = loop { + if let Ok(code) = std::fs::read_to_string(format!("{}/pairing_code.txt", TEST_DIR)) { + break code.trim().to_string(); + } + tokio::time::sleep(Duration::from_millis(500)).await; + }; + println!("Bob: Found pairing code"); + + // Join pairing + println!("Bob: Joining pairing..."); + if let Some(networking) = core.networking() { + timeout( + Duration::from_secs(15), + networking.start_pairing_as_joiner(&pairing_code, false), + ) + .await + .unwrap() + .unwrap(); + } else { + panic!("Networking not initialized"); + } + println!("Bob: Successfully paired with Alice"); + + // Wait for devices to be connected + println!("Bob: Waiting for connection..."); + let mut pair_attempts = 0; + let max_pair_attempts = 30; + + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let connected_devices = core + .services + .device + .get_connected_devices() + .await + .unwrap(); + if !connected_devices.is_empty() { + println!("Bob: Connected to Alice!"); + break; + } + + pair_attempts += 1; + if pair_attempts >= max_pair_attempts { + panic!("Bob: Connection timeout"); + } + } + + // Read Alice's entry count for validation + let alice_entry_count: u64 = loop { + if let Ok(content) = 
+			std::fs::read_to_string(format!("{}/alice_entry_count.txt", TEST_DIR))
+		{
+			if let Ok(count) = content.trim().parse() {
+				break count;
+			}
+		}
+		tokio::time::sleep(Duration::from_millis(500)).await;
+	};
+	println!("Bob: Alice indexed {} entries", alice_entry_count);
+
+	// Wait for library to be available after pairing
+	println!("Bob: Waiting for shared library...");
+	let library = loop {
+		let libs = core.libraries.list().await;
+		if !libs.is_empty() {
+			break libs[0].clone();
+		}
+		tokio::time::sleep(Duration::from_millis(500)).await;
+	};
+	println!("Bob: Got shared library: {}", library.id());
+
+	let mut backfill_attempts = 0;
+	let max_backfill_attempts = 120; // 2 minutes
+
+	loop {
+		tokio::time::sleep(Duration::from_secs(1)).await;
+
+		let bob_entry_count = entry_closure::Entity::find()
+			.count(library.db().conn())
+			.await
+			.unwrap();
+
+		if backfill_attempts % 10 == 0 {
+			println!(
+				"Bob: Current entry count: {} / {}",
+				bob_entry_count, alice_entry_count
+			);
+		}
+
+		// Check if Bob has received most of the data (within 5% tolerance)
+		let min_expected = (alice_entry_count as f64 * 0.95) as u64;
+		if bob_entry_count >= min_expected {
+			println!(
+				"Bob: Backfill complete! Received {} entries (expected ~{})",
+				bob_entry_count, alice_entry_count
+			);
+			break;
+		}
+
+		backfill_attempts += 1;
+		if backfill_attempts >= max_backfill_attempts {
+			panic!(
+				"Bob: Backfill timeout - only received {} / {} entries",
+				bob_entry_count, alice_entry_count
+			);
+		}
+	}
+
+	// Write success marker
+	std::fs::write(format!("{}/bob_success.txt", TEST_DIR), "success").unwrap();
+	println!("Bob: Test completed successfully");
+
+	// Give time for graceful shutdown
+	tokio::time::sleep(Duration::from_secs(5)).await;
+}
+
+/// Main test orchestrator
+#[tokio::test]
+async fn test_sync_backfill_end_to_end() {
+	println!("Starting end-to-end sync backfill integration test");
+
+	// Clean up from previous runs
+	let _ = std::fs::remove_dir_all(TEST_DIR);
+	std::fs::create_dir_all(TEST_DIR).unwrap();
+
+	let mut runner = CargoTestRunner::for_test_file("sync_backfill_integration_test")
+		.with_timeout(Duration::from_secs(600)) // 10 minutes for full test
+		.add_subprocess("alice", "alice_indexes_scenario")
+		.add_subprocess("bob", "bob_backfills_scenario");
+
+	// Start Alice first to index the source code
+	println!("Starting Alice (indexer)...");
+	runner
+		.spawn_single_process("alice")
+		.await
+		.expect("Failed to spawn Alice");
+
+	// Wait for Alice to complete indexing and initialize networking
+	// Indexing can take 30-60 seconds depending on the system
+	println!("Waiting for Alice to complete indexing...");
+	tokio::time::sleep(Duration::from_secs(90)).await;
+
+	// Start Bob to trigger backfill
+	println!("Starting Bob (backfill receiver)...");
+	runner
+		.spawn_single_process("bob")
+		.await
+		.expect("Failed to spawn Bob");
+
+	// Wait for both devices to complete
+	let result = runner
+		.wait_for_success(|_outputs| {
+			let alice_success = std::fs::read_to_string(format!("{}/alice_success.txt", TEST_DIR))
+				.map(|content| content.trim() == "success")
+				.unwrap_or(false);
+			let bob_success = std::fs::read_to_string(format!("{}/bob_success.txt", TEST_DIR))
+				.map(|content| content.trim() == "success")
+				.unwrap_or(false);
+
+			alice_success && bob_success
+		})
+		.await;
+
+	match result {
+		Ok(_) => {
+			println!("End-to-end sync backfill test successful!");
+		}
+		Err(e) => {
+			println!("End-to-end sync backfill test failed: {}", e);
+			for (name, output) in runner.get_all_outputs() {
+				println!("\n{} output:\n{}", name, output);
+			}
+			panic!("Sync backfill test failed - see output above");
+		}
+	}
+}
diff --git a/docs b/docs
index ffcd1266e..8f6e22609 160000
--- a/docs
+++ b/docs
@@ -1 +1 @@
-Subproject commit ffcd1266ecd36c0efd7535e31d3432688c7fc80b
+Subproject commit 8f6e226094c0edc54c2ddb6ddf97681bfb633349
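The indexing-completion loop in `alice_indexes_scenario` above waits for the entry count to stop growing between two samples. If more tests need the same pattern, it could be factored into a helper like the following sketch; the closure stands in for the entry-count query, and this helper does not exist in the codebase:

```rust
use std::future::Future;
use std::time::Duration;

// Treat a growing counter as settled once two samples a few seconds apart
// agree; returns None if it never stabilizes within max_attempts.
async fn wait_until_settled<F, Fut>(mut sample: F, max_attempts: u32) -> Option<u64>
where
	F: FnMut() -> Fut,
	Fut: Future<Output = u64>,
{
	let mut previous = sample().await;
	for _ in 0..max_attempts {
		tokio::time::sleep(Duration::from_secs(3)).await;
		let current = sample().await;
		if current > 0 && current == previous {
			return Some(current);
		}
		previous = current;
	}
	None
}
```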