From 39dbb53c630e101b4af7e23f067fec3994331def Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sat, 11 Oct 2025 12:44:47 -0700 Subject: [PATCH] cleanup --- core/src/infra/sync/docs/DEPENDENCY_GRAPH.md | 312 ----- core/src/infra/sync/docs/FILE_ORGANIZATION.md | 357 ------ .../sync/docs/NETWORK_INTEGRATION_STATUS.md | 518 --------- .../infra/sync/docs/SYNC_CODE_REVIEW_GUIDE.md | 536 --------- .../sync/docs/SYNC_IMPLEMENTATION_GUIDE.md | 600 ---------- .../sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md | 1007 ----------------- core/src/infra/sync/mod.rs | 2 - core/src/infra/sync/transaction.rs | 11 - core/src/lib.rs | 15 +- 9 files changed, 6 insertions(+), 3352 deletions(-) delete mode 100644 core/src/infra/sync/docs/DEPENDENCY_GRAPH.md delete mode 100644 core/src/infra/sync/docs/FILE_ORGANIZATION.md delete mode 100644 core/src/infra/sync/docs/NETWORK_INTEGRATION_STATUS.md delete mode 100644 core/src/infra/sync/docs/SYNC_CODE_REVIEW_GUIDE.md delete mode 100644 core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md delete mode 100644 core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md diff --git a/core/src/infra/sync/docs/DEPENDENCY_GRAPH.md b/core/src/infra/sync/docs/DEPENDENCY_GRAPH.md deleted file mode 100644 index 23abecdfd..000000000 --- a/core/src/infra/sync/docs/DEPENDENCY_GRAPH.md +++ /dev/null @@ -1,312 +0,0 @@ -# Sync Dependency Graph System - -**Status**: Complete -**Date**: October 9, 2025 -**Phase**: 1 (Explicit Dependencies) & 2 (Topological Sort) - ---- - -## Overview - -The sync dependency graph system automatically computes the correct order for syncing models based on their foreign key dependencies. This prevents foreign key violations during backfill operations by ensuring parent records always sync before child records. - -## Architecture - -### **1. Syncable Trait Extension** - -Added `sync_depends_on()` method to the `Syncable` trait: - -```rust -fn sync_depends_on() -> &'static [&'static str] { - &[] // Default: no dependencies -} -``` - -**Location**: `core/src/infra/sync/syncable.rs` - -### **2. Entity Declarations** - -Each syncable model declares its dependencies: - -```rust -// Device (no dependencies - root of graph) -impl Syncable for device::Model { - fn sync_depends_on() -> &'static [&'static str] { - &[] - } -} - -// Location depends on Device -impl Syncable for location::Model { - fn sync_depends_on() -> &'static [&'static str] { - &["device"] - } -} - -// Entry depends on Location -impl Syncable for entry::Model { - fn sync_depends_on() -> &'static [&'static str] { - &["location"] - } -} - -// Tag (shared model, no FK dependencies) -impl Syncable for tag::Model { - fn sync_depends_on() -> &'static [&'static str] { - &[] - } -} -``` - -**Locations**: -- `core/src/infra/db/entities/device.rs` -- `core/src/infra/db/entities/location.rs` -- `core/src/infra/db/entities/entry.rs` -- `core/src/infra/db/entities/tag.rs` - -### **3. Dependency Graph Module** - -Implements Kahn's algorithm for topological sorting with cycle detection: - -```rust -pub fn compute_sync_order<'a>( - models: impl Iterator, -) -> Result, DependencyError> -``` - -**Features**: -- Topological sort (respects dependencies) -- Cycle detection (returns error if circular dependencies found) -- Unknown dependency validation -- Comprehensive test coverage - -**Location**: `core/src/infra/sync/dependency_graph.rs` - -### **4. Registry Integration** - -Computes sync order from all registered models: - -```rust -pub async fn compute_registry_sync_order() - -> Result, DependencyError> -``` - -This function: -1. Collects all registered models -2. Extracts their dependencies via `sync_depends_on()` -3. Computes topological sort -4. Returns ordered list - -**Location**: `core/src/infra/sync/registry.rs` - -### **5. BackfillManager Integration** - -The BackfillManager now uses computed order instead of hardcoded list: - -```rust -// Before (hardcoded): -let model_types = vec![ - "location".to_string(), - "entry".to_string(), - "volume".to_string(), -]; - -// After (automatic): -let sync_order = compute_registry_sync_order().await?; -let model_types = sync_order - .into_iter() - .filter(|m| is_device_owned(m)) - .collect(); -``` - -**Location**: `core/src/service/sync/backfill.rs` - ---- - -## Dependency Graph Example - -Current production dependency graph: - -``` -┌─────────┐ ┌─────────┐ -│ Device │ │ Tag │ -└────┬────┘ └─────────┘ - │ - ↓ -┌─────────┐ -│ Location│ -└────┬────┘ - │ - ↓ -┌─────────┐ -│ Entry │ -└─────────┘ -``` - -**Sync Order**: `["device", "tag", "location", "entry"]` -(Note: `device` and `tag` are independent, so order between them doesn't matter) - ---- - -## Benefits - -### **1. Safety** -- **Zero FK violations**: Parent records always arrive before children -- **Compile-time enforcement**: Dependencies declared in code -- **Runtime validation**: Detects circular dependencies at startup - -### **2. Maintainability** -- **Single source of truth**: Dependencies declared in entity code -- **Self-documenting**: Sync order is computed from schema -- **No manual lists**: No need to remember to update backfill order - -### **3. Correctness** -- **Automated**: No human error in determining order -- **Tested**: Comprehensive unit tests for all edge cases -- **Validated**: Checks for cycles and unknown dependencies - ---- - -## How to Add a New Syncable Model - -1. **Implement `Syncable` trait** on your entity: - ```rust - impl Syncable for MyModel { - const SYNC_MODEL: &'static str = "my_model"; - - fn sync_depends_on() -> &'static [&'static str] { - &["device", "location"] // Declare your FK dependencies - } - - // ... other trait methods - } - ``` - -2. **Register in `initialize_registry()`** (`registry.rs`): - ```rust - registry.insert( - "my_model".to_string(), - SyncableModelRegistration::device_owned( - "my_model", - "my_models", - |data, db| { /* ... */ }, - |device_id, since, batch_size, db| { /* ... */ }, - ), - ); - ``` - -3. **Add to `compute_registry_sync_order()`** (`registry.rs`): - ```rust - let models = vec![ - // ... existing models - (MyModel::SYNC_MODEL, MyModel::sync_depends_on()), - ]; - ``` - -4. **Done!** The system will automatically: - - Include your model in the dependency graph - - Compute the correct sync order - - Detect any circular dependencies - - Sync your model at the right time during backfill - ---- - -## Testing - -### **Unit Tests** - -**Dependency Graph** (`core/src/infra/sync/dependency_graph.rs`): -- Simple dependency chains -- Independent models -- Circular dependency detection -- Complex graphs with multiple dependencies -- Empty graph handling - -**Registry** (`core/src/infra/sync/registry.rs`): -- Sync order computation with real models -- Dependency order validation (device → location → entry) - -### **Running Tests** - -```bash -# Run dependency graph tests -cargo test --package sd-core --lib sync::dependency_graph - -# Run registry tests (includes sync order) -cargo test --package sd-core --lib sync::registry::tests::test_sync_order -``` - ---- - -## Error Handling - -### **DependencyError Types** - -1. **`CircularDependency`**: Models have circular FK references - ```rust - // Example: A depends on B, B depends on A - Err(DependencyError::CircularDependency("Models involved in cycle: a, b")) - ``` - -2. **`UnknownDependency`**: Model depends on non-existent model - ```rust - // Example: MyModel depends on "nonexistent" - Err(DependencyError::UnknownDependency("my_model", "nonexistent")) - ``` - -3. **`NoModels`**: No models registered - ```rust - Err(DependencyError::NoModels) - ``` - -All errors are propagated to the BackfillManager and logged appropriately. - ---- - -## Future Enhancements (Phase 3+) - -### **Phase 3: Compile-Time Validation** -- Validate dependency names at compile time (prevent typos) -- Use macros to ensure dependencies exist - -### **Phase 4: Procedural Macro** -- Auto-generate `sync_depends_on()` from SeaORM `Relation` enum -- Inspect `belongs_to` attributes automatically -- Zero boilerplate for developers - -Example future syntax: -```rust -#[derive(Syncable)] -#[syncable(auto_deps)] // Auto-extracts from Relations -pub struct Location { - // ... -} -``` - -### **Phase 5: FK Resolution Helpers** -- Automatic UUID → local ID resolution -- Two-phase upserts for models with foreign keys -- Handle nullable vs non-nullable FK constraints - ---- - -## Key Insights - -1. **Dependency order is a schema problem**, not a sync problem -2. By anchoring the solution in entity definitions (via `Syncable` trait), we create a maintainable architecture -3. Topological sort is the correct algorithm (well-studied, efficient) -4. Starting with explicit declarations provides immediate safety while paving the way for full automation - ---- - -## References - -- **Main Documentation**: `/docs/core/sync.md` -- **Implementation Guide**: `core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md` -- **Code Review Guide**: `core/src/infra/sync/docs/SYNC_CODE_REVIEW_GUIDE.md` - ---- - -**Status**: Production-ready ✅ -**Next Steps**: Monitor backfill operations in production, add more models as needed - diff --git a/core/src/infra/sync/docs/FILE_ORGANIZATION.md b/core/src/infra/sync/docs/FILE_ORGANIZATION.md deleted file mode 100644 index e977947e6..000000000 --- a/core/src/infra/sync/docs/FILE_ORGANIZATION.md +++ /dev/null @@ -1,357 +0,0 @@ -# Sync System - File Organization Guide - -Quick reference for navigating the sync system codebase. - -## Documentation Index - -**New to the sync system?** Read these in order: -1. [`SYNC_IMPLEMENTATION_GUIDE.md`](./SYNC_IMPLEMENTATION_GUIDE.md) - Implementation guide with architectural principles -2. [`/docs/core/sync.md`](/Users/jamespine/Projects/spacedrive/docs/core/sync.md) - Comprehensive architectural design -3. [`SYNC_IMPLEMENTATION_ROADMAP.md`](./SYNC_IMPLEMENTATION_ROADMAP.md) - Current status and priorities -4. This file - Where everything lives in the codebase - ---- - -## File Structure by Layer - -### Layer 1: Infrastructure (`core/src/infra/sync/`) - -Core sync primitives and data structures. - -``` -infra/sync/ -├── mod.rs Module exports -├── hlc.rs Hybrid Logical Clock implementation -├── peer_log.rs Per-device sync.db for shared changes -├── syncable.rs Syncable trait for models -├── registry.rs Model registry with apply function pointers -├── deterministic.rs Deterministic UUIDs for system resources -├── transaction.rs TransactionManager for atomic commits (stubbed) -├── transport.rs NetworkTransport trait - sync's network interface -│ -├── SYNC_IMPLEMENTATION_GUIDE.md START HERE - Implementation guide -├── SYNC_IMPLEMENTATION_ROADMAP.md Comprehensive implementation tracking -├── NETWORK_INTEGRATION_STATUS.md Network integration progress -└── FILE_ORGANIZATION.md This file -``` - -**Purpose**: Low-level sync primitives that other layers build on. - ---- - -### Layer 2: Sync Service (`core/src/service/sync/`) - -High-level sync coordination and state management. - -``` -service/sync/ -├── mod.rs SyncService - main entry point -├── peer.rs PeerSync - core sync engine with network -├── state.rs Sync state machine + buffer queue -├── protocol_handler.rs StateSyncHandler + LogSyncHandler -├── backfill.rs Backfill coordinator for new devices -└── applier.rs DEPRECATED - Delete this -``` - -**Purpose**: Orchestrates sync operations, manages state, handles backfill. - -**Note**: `peer.rs` now has `network: Arc` and can broadcast! ✅ - ---- - -### Layer 3: Network Integration (`core/src/service/network/`) - -Network transport and protocol handling. - -#### A. Protocol Handlers (Inbound) -``` -service/network/protocol/sync/ -├── mod.rs Module exports -├── messages.rs SyncMessage enum - all message types -├── handler.rs ️ SyncProtocolHandler - STUBBED (CRITICAL) -└── transport.rs DUPLICATE - Delete this -``` - -**Purpose**: Handle incoming sync messages from peers. - -#### B. Network Transports (Outbound) -``` -service/network/transports/ -├── mod.rs Module organization + docs -└── sync.rs NetworkTransport impl for NetworkingService -``` - -**Purpose**: Send sync messages to peers. - -#### C. Supporting Infrastructure -``` -service/network/ -├── core/ -│ ├── mod.rs NetworkingService + endpoint() getter -│ └── sync_transport.rs OLD VERSION - Delete this -└── device/ - └── registry.rs DeviceRegistry - UUIDNodeId mapping -``` - -**Purpose**: Bridge between sync layer (device UUIDs) and network layer (Iroh NodeIds). - ---- - -### Layer 4: Database Entities (`core/src/infra/db/entities/`) - -Models that implement sync apply functions. - -``` -infra/db/entities/ -├── location.rs Location model + apply_state_change() -├── tag.rs Tag model + apply_shared_change() -│ -├── entry.rs ️ Entry model (needs apply) -├── volume.rs ️ Volume model (needs apply) -├── device.rs ️ Device model (needs apply) -├── collection.rs ️ Album model (needs apply) -└── user_metadata.rs ️ UserMetadata model (needs apply) -``` - -**Purpose**: Domain models that participate in sync. - -**Status**: 2/7 models have apply functions implemented. - ---- - -### Layer 5: Operations/Actions (`core/src/ops/network/sync_setup/`) - -CLI/API operations for managing sync configuration. - -``` -ops/network/sync_setup/ -├── mod.rs Module exports -├── action.rs SyncSetupAction - configure sync partners -├── input.rs Action input schema -├── output.rs Action output schema -└── discovery/ - ├── mod.rs Module exports - ├── query.rs DiscoverLibrariesQuery - └── output.rs Discovery results -``` - -**Purpose**: User-facing operations to set up sync between devices. - ---- - -### Layer 6: Database Migrations (`core/src/infra/db/migration/`) - -Schema changes for sync-related tables. - -``` -infra/db/migration/ -└── m20250200_000001_remove_sync_leadership.rs Migration for leaderless sync -``` - -**Purpose**: Database schema evolution for sync features. - ---- - -## ️ Data Flow Map - -### Outbound Sync (Device A → Device B) - -``` -1. User Action - └─ LocationManager.create_location() - -2. Database Write - └─ location.insert(db).await - -3. Sync Trigger (TODO: Auto via TransactionManager) - └─ peer_sync.broadcast_state_change(change) - └─ service/sync/peer.rs:144-219 - -4. Network Transport - └─ network.send_sync_message(device_b_uuid, message) - └─ service/network/transports/sync.rs:32-102 - -5. UUID → NodeId Mapping - └─ device_registry.get_node_id_for_device(device_b_uuid) - └─ service/network/device/registry.rs - -6. Iroh Send - └─ endpoint.connect(node_id, SYNC_ALPN) - └─ service/network/core/mod.rs - -7. Bytes Over Network - └─ Iroh QUIC transport -``` - -### Inbound Sync (Device B receives from Device A) - -``` -1. Iroh Receive - └─ endpoint.accept() in NetworkingService - -2. Protocol Router - └─ Match ALPN = "sync" - └─ service/network/core/mod.rs - -3. SyncProtocolHandler (STUBBED - P1 PRIORITY!) - └─ handler.handle_stream(send, recv, remote_node_id) - └─ service/network/protocol/sync/handler.rs:45-52 - -4. Message Deserialization - └─ SyncMessage::from_bytes() - └─ service/network/protocol/sync/messages.rs - -5. NodeId → UUID Mapping - └─ device_registry.get_device_by_node(remote_node_id) - └─ service/network/device/registry.rs - -6. Dispatch to Handler - └─ Match on SyncMessage variant - ├─ StateChange → peer_sync.on_state_change_received() - └─ SharedChange → peer_sync.on_shared_change_received() - └─ service/sync/peer.rs:321-407 - -7. Apply via Registry - └─ registry::apply_state_change(model_type, data, db) - └─ infra/sync/registry.rs:160-189 - -8. Model Apply Function - └─ location::Model::apply_state_change(data, db) - └─ infra/db/entities/location.rs -``` - ---- - -## Finding Code - -### "Where is X defined?" - -| What | Where | Status | -|------|-------|--------| -| **NetworkTransport trait** | `infra/sync/transport.rs:42-114` | | -| **NetworkTransport impl** | `service/network/transports/sync.rs:23-152` | | -| **SyncMessage types** | `service/network/protocol/sync/messages.rs:14-107` | | -| **PeerSync broadcasting** | `service/sync/peer.rs:144-318` | | -| **Inbound handler** | `service/network/protocol/sync/handler.rs` | ️ Stubbed | -| **Registry dispatch** | `infra/sync/registry.rs:160-222` | | -| **HLC implementation** | `infra/sync/hlc.rs` | | -| **PeerLog (sync.db)** | `infra/sync/peer_log.rs` | | -| **DeviceRegistry** | `service/network/device/registry.rs` | | -| **TransactionManager** | `infra/sync/transaction.rs` | Stubbed | - -### "How do I..." - -| Task | File | Function | -|------|------|----------| -| **Send a sync message** | `service/sync/peer.rs` | `broadcast_state_change()` | -| **Handle received message** | `service/network/protocol/sync/handler.rs` | `handle_stream()` (stub) | -| **Add new model to sync** | `infra/db/entities/[model].rs` | Implement `apply_state_change()` | -| **Register model** | `infra/sync/registry.rs` | Add to `initialize_registry()` | -| **Generate HLC** | `infra/sync/hlc.rs` | `HLCGenerator::next()` | -| **Write to peer log** | `infra/sync/peer_log.rs` | `PeerLog::append()` | -| **Map UUIDNodeId** | `service/network/device/registry.rs` | `DeviceRegistry` methods | - ---- - -## File Status Matrix - -| File | LOC | Status | Priority | Notes | -|------|-----|--------|----------|-------| -| **Core Infrastructure** | -| `infra/sync/hlc.rs` | 150 | Complete | - | HLC implementation | -| `infra/sync/peer_log.rs` | 300 | Complete | - | Sync.db management | -| `infra/sync/syncable.rs` | 50 | Complete | - | Trait definition | -| `infra/sync/registry.rs` | 286 | Complete | - | Dynamic dispatch | -| `infra/sync/transport.rs` | 182 | Complete | - | NetworkTransport trait | -| `infra/sync/transaction.rs` | 226 | Stubbed | **P1** | Need auto-broadcast | -| **Network Layer** | -| `service/network/transports/sync.rs` | 169 | Complete | - | Outbound messaging | -| `service/network/protocol/sync/handler.rs` | 94 | ️ Stubbed | **P1** | **CRITICAL** | -| `service/network/protocol/sync/messages.rs` | 205 | Complete | - | Message types | -| **Sync Service** | -| `service/sync/peer.rs` | 482 | Complete | - | Core sync engine | -| `service/sync/state.rs` | 200 | Complete | - | State machine | -| `service/sync/mod.rs` | 184 | Complete | - | Service wrapper | -| **Models** | -| `infra/db/entities/location.rs` | ? | Complete | - | Has apply function | -| `infra/db/entities/tag.rs` | ? | Complete | - | Has apply function | -| `infra/db/entities/entry.rs` | ? | ️ Partial | P2 | Needs apply | -| `infra/db/entities/volume.rs` | ? | ️ Partial | P2 | Needs apply | -| `infra/db/entities/device.rs` | ? | ️ Partial | P2 | Needs apply | -| `infra/db/entities/collection.rs` | ? | ️ Partial | P2 | Needs apply | -| `infra/db/entities/user_metadata.rs` | ? | ️ Partial | P2 | Needs apply | - -**Total Lines**: ~2,500 LOC -**Completion**: 75% (25/34 files) - ---- - -## Files to Delete - -These files are legacy/duplicate and should be removed: - -1. **`service/sync/applier.rs`** - - Status: Legacy stub from leader-based sync - - Reason: Replaced by PeerSync - - Action: `rm -f core/src/service/sync/applier.rs` - - Update: Remove from `service/sync/mod.rs` - -2. **`service/network/protocol/sync/transport.rs`** - - Status: Duplicate of `transports/sync.rs` - - Reason: Wrong location (should be in transports/) - - Action: `rm -f core/src/service/network/protocol/sync/transport.rs` - - Update: Already not imported - -3. **`service/network/core/sync_transport.rs`** - - Status: Old version, moved to transports/ - - Reason: File was moved - - Action: `rm -f core/src/service/network/core/sync_transport.rs` - - Update: Already updated to use transports/ - ---- - -## Quick Links - -### Documentation -- [Implementation Roadmap](./SYNC_IMPLEMENTATION_ROADMAP.md) - Comprehensive tracking -- [Network Integration Status](./NETWORK_INTEGRATION_STATUS.md) - Phase progress -- [Sync Roadmap (docs)](../../../docs/core/sync-roadmap.md) - High-level overview - -### Key Files (Most Important) -1. `service/sync/peer.rs` - Core sync engine -2. `service/network/transports/sync.rs` - Network transport -3. `infra/sync/registry.rs` - Model dispatch -4. `service/network/protocol/sync/handler.rs` - Inbound handling ️ STUB - -### Tests -- Unit tests: Inline in each file (`#[cfg(test)]` modules) -- Integration tests: `core/tests/sync_integration_test.rs` (TODO) - ---- - -## Navigation Tips - -### By Layer -- **Infrastructure**: `core/src/infra/sync/` -- **Service**: `core/src/service/sync/` -- **Network**: `core/src/service/network/` -- **Models**: `core/src/infra/db/entities/` -- **Operations**: `core/src/ops/network/sync_setup/` - -### By Concern -- **Message Definition**: `service/network/protocol/sync/messages.rs` -- **Sending Messages**: `service/network/transports/sync.rs` -- **Receiving Messages**: `service/network/protocol/sync/handler.rs` -- **Broadcasting Logic**: `service/sync/peer.rs` -- **Model Application**: `infra/db/entities/[model].rs` - -### By State -- **Complete**: Look at `location.rs` and `tag.rs` for examples -- **In Progress**: Check `transaction.rs` for stubbed methods -- **Blocked**: `handler.rs` blocks all inbound sync - ---- - -**Last Updated**: October 9, 2025 -**Purpose**: Quick reference for developers navigating the sync codebase - diff --git a/core/src/infra/sync/docs/NETWORK_INTEGRATION_STATUS.md b/core/src/infra/sync/docs/NETWORK_INTEGRATION_STATUS.md deleted file mode 100644 index 807382669..000000000 --- a/core/src/infra/sync/docs/NETWORK_INTEGRATION_STATUS.md +++ /dev/null @@ -1,518 +0,0 @@ -# Sync Network Integration Status - -**Date**: 2025-10-09 -**Status**: Phase 0 & 1 Complete ✅ - -> **See also**: [SYNC_IMPLEMENTATION_ROADMAP.md](./SYNC_IMPLEMENTATION_ROADMAP.md) for comprehensive tracking, architectural review, and detailed refactoring recommendations. - ---- - -## What We Accomplished - -### Phase 0: Architecture Foundation ✅ - -**1. NetworkTransport Trait** (`infra/sync/transport.rs`) -- Defined interface for sync→network communication -- Decouples sync layer from networking implementation -- Enables dependency inversion (sync defines what it needs) -- Includes NoOpTransport fallback when networking unavailable - -**2. DeviceRegistry Integration** (Existing) -- Already has UUIDNodeId bidirectional mapping -- Methods: `get_node_id_for_device()`, `get_device_by_node()` -- Populated during device pairing -- No new code needed! - -**3. Transport Implementation** (`service/network/transports/sync.rs`) -- NetworkingService implements NetworkTransport trait -- Maps device UUIDs to NodeIds via DeviceRegistry -- Sends messages via Iroh endpoint -- Handles errors gracefully (device offline, etc.) - -**4. Clean Module Organization** -``` -core/src/ -├── infra/sync/ -│ └── transport.rs ← NetworkTransport TRAIT (interface) -└── service/network/ - ├── transports/ ← OUTBOUND senders NEW! - │ ├── mod.rs - │ └── sync.rs ← NetworkTransport IMPL - ├── protocol/ ← INBOUND handlers - │ ├── sync/handler.rs - │ └── sync/messages.rs - └── device/registry.rs ← UUIDNodeId mapping -``` - ---- - -### Phase 1: PeerSync Integration ✅ - -**1. PeerSync with NetworkTransport** (`service/sync/peer.rs`) -- Added `network: Arc` field -- Updated constructor to accept network transport -- Dependency injected from Library→SyncService→PeerSync - -**2. Real Broadcasting Implementation** - -**`broadcast_state_change()`** (lines 144-219): -```rust -// Now actually sends messages! -Queries connected sync partners -Creates SyncMessage::StateChange -Broadcasts to all partners via network.send_sync_message() -Graceful error handling (some partners offline = OK) -Detailed logging (success/error counts) -``` - -**`broadcast_shared_change()`** (lines 221-318): -```rust -// Now actually sends messages! -Generates HLC -Writes to peer_log -Creates SyncMessage::SharedChange -Broadcasts to all partners via network.send_sync_message() -Graceful error handling -Detailed logging -``` - -**3. Dependency Injection Chain** - -``` -CoreContext - ↓ (has) -NetworkingService (implements NetworkTransport) - ↓ (passed to) -Library::init_sync_service() - ↓ (creates) -SyncService::new_from_library() - ↓ (creates) -PeerSync::new() - ↓ (stores) -PeerSync.network: Arc - ↓ (uses in) -broadcast_state_change() / broadcast_shared_change() -``` - -**Fallback**: If NetworkingService not initialized, uses `NoOpNetworkTransport` (silent no-op) - ---- - -## What Works Now - -### Outbound Broadcasting ✅ - -**When Location is created:** -```rust -// In LocationManager or TransactionManager -location.insert(db).await?; - -// PeerSync broadcasts: -peer_sync.broadcast_state_change(StateChangeMessage { - model_type: "location", - record_uuid: location.uuid, - device_id: MY_DEVICE_ID, - data: location.to_sync_json()?, -}).await?; - -// NetworkTransport sends via Iroh: -// 1. Look up sync_partners → [Device B, Device C] -// 2. Map UUIDs to NodeIds via DeviceRegistry -// 3. Send SyncMessage::StateChange to each via Iroh endpoint -// 4. Log results (2 sent, 0 errors) -``` - -**When Tag is created:** -```rust -tag.insert(db).await?; - -peer_sync.broadcast_shared_change( - "tag", - tag.uuid, - ChangeType::Insert, - tag.to_sync_json()? -).await?; - -// PeerSync: -// 1. Generates HLC -// 2. Writes to sync.db peer_log -// 3. Broadcasts SyncMessage::SharedChange via NetworkTransport -// 4. Waits for ACKs (handled separately) -``` - ---- - -## ️ What's Still Missing - -### Inbound Message Handling (Phase 2) - -**Current State**: Messages can be SENT but not RECEIVED - -**Problem**: `SyncProtocolHandler` is still stubbed -```rust -// core/src/service/network/protocol/sync/handler.rs -impl ProtocolHandler for SyncProtocolHandler { - async fn handle_stream(&self, send, recv, node_id) -> Result<()> { - anyhow::bail!("SyncProtocolHandler not yet implemented") // STUB! - } -} -``` - -**What's Needed**: -```rust -impl ProtocolHandler for SyncProtocolHandler { - async fn handle_stream(&self, mut send, mut recv, remote_node_id) -> Result<()> { - // 1. Read SyncMessage from stream - let message: SyncMessage = read_message(&mut recv).await?; - - // 2. Look up device UUID from NodeId - let device_uuid = self.device_registry - .read() - .await - .get_device_by_node(remote_node_id) - .ok_or_else(|| anyhow!("Unknown device"))?; - - // 3. Route to appropriate handler - match message { - SyncMessage::StateChange { model_type, record_uuid, device_id, data, .. } => { - self.state_handler.handle_state_change( - model_type, record_uuid, device_id, data - ).await?; - } - SyncMessage::SharedChange { entry, .. } => { - self.log_handler.handle_shared_change(entry).await?; - } - // ... other message types - } - - Ok(()) - } -} -``` - -**Dependencies**: -- Stream read/write helpers (probably exist for other protocols) -- Wire up StateSyncHandler and LogSyncHandler -- Handle protocol routing - -**Effort**: ~150 lines -**Complexity**: Medium - ---- - -### TransactionManager Integration (Phase 3) - -**Current State**: Manual calls to broadcast functions - -**Problem**: Developers must remember to call broadcast_state_change() - -**What's Needed**: -```rust -impl TransactionManager { - pub async fn commit_device_owned( - &self, - library: &Library, - model: M, - ) -> Result { - // 1. Write to DB - let saved = model.insert(library.db()).await?; - - // 2. AUTO-BROADCAST via sync service - if let Some(sync) = library.sync_service() { - sync.peer_sync().broadcast_state_change( - StateChangeMessage::from(&saved) - ).await?; - } - - Ok(saved) - } -} -``` - -**Benefit**: Automatic sync - just use TransactionManager, broadcasting happens automatically - -**Effort**: ~200 lines -**Complexity**: Medium-High - ---- - -## Network Integration Progress - -| Component | Status | Lines | Notes | -|-----------|--------|-------|-------| -| **NetworkTransport Trait** | Done | 150 | Interface definition | -| **Transport Implementation** | Done | 170 | NetworkingService impl | -| **DeviceRegistry Mapping** | Exists | 0 | Already had it! | -| **PeerSync Integration** | Done | 80 | Network field + broadcasting | -| **Outbound Broadcasting** | Works | 150 | State & shared changes | -| **Inbound Handling** | ️ Stub | 0 | SyncProtocolHandler empty | -| **TransactionManager** | ️ Stub | 0 | Manual broadcast calls | - -**Total Implemented**: ~550 lines -**Remaining**: ~350 lines - ---- - -## How to Test Current Implementation - -### Manual Test (will work after Phase 2): - -```bash -# Terminal 1: Device A -$ cd ~/Library/Application\ Support/Spacedrive/ -$ sd-cli library create "Test Library" -$ sd-cli location add /tmp/test-photos - -# Terminal 2: Device B -$ sd-cli network pair-with -$ sd-cli library sync-setup --library= - -# Back to Device A terminal -$ sd-cli location list -# You should see the location - -# Back to Device B terminal -$ sd-cli location list -# After Phase 2 is done, you'll see Device A's location! -``` - -**Current Behavior** (Phase 1 only): -- Device A broadcasts StateChange message -- Device B doesn't receive it (handler stubbed) - -**After Phase 2**: -- Device A broadcasts -- Device B receives and applies -- Full bidirectional sync working! - ---- - -## Next Steps (Priority Order) - -### Phase 2: Inbound Message Handling (~4 hours) - -**A. Implement SyncProtocolHandler** (~150 lines) -- Read messages from stream -- Route to StateSyncHandler / LogSyncHandler -- Map NodeId → device UUID - -**B. Wire up Protocol Handlers** (~50 lines) -- StateSyncHandler needs database access (already has it) -- LogSyncHandler needs PeerSync reference (already has it) -- Just need to call the apply functions (already implemented!) - -### Phase 3: TransactionManager Auto-Broadcast (~3 hours) - -**A. Implement commit_device_owned()** (~100 lines) -- Write to DB -- Auto-broadcast state change -- Emit events - -**B. Implement commit_shared()** (~100 lines) -- Generate HLC -- Write to DB + peer_log atomically -- Auto-broadcast shared change -- Emit events - ---- - -## Key Architectural Decisions - -### 1. Dependency Inversion ✅ -- Sync layer defines NetworkTransport interface -- Network layer implements the interface -- No circular dependencies! - -### 2. Existing DeviceRegistry Reuse ✅ -- Didn't create duplicate UUIDNodeId mapping -- Leveraged existing pairing infrastructure -- Less code, more cohesion - -### 3. Clean Module Organization ✅ -- `transports/` for outbound (initiating messages) -- `protocol/` for inbound (handling received messages) -- Clear separation of concerns - -### 4. Graceful Degradation ✅ -- NoOpTransport when networking unavailable -- Broadcast errors don't fail entire operation -- Offline devices skipped automatically - ---- - -## Files Modified - -| File | Changes | Lines | -|------|---------|-------| -| `infra/sync/transport.rs` | NetworkTransport trait + NoOp impl | +150 | -| `service/network/transports/sync.rs` | NetworkTransport impl for NetworkingService | +170 | -| `service/network/transports/mod.rs` | Module organization | +25 | -| `service/network/core/mod.rs` | endpoint() getter | +5 | -| `service/sync/peer.rs` | Network field + real broadcasting | +150 | -| `service/sync/mod.rs` | Network parameter | +5 | -| `library/mod.rs` | Network parameter | +5 | -| `library/manager.rs` | Network injection | +15 | -| `infra/sync/mod.rs` | Export NetworkTransport | +2 | -| `service/network/mod.rs` | Export transports | +1 | - -**Total**: ~530 lines added/modified - ---- - -## Build Status - -```bash -cargo check --lib -# Compiles successfully -# No linter errors -# Follows Spacedrive code style -``` - ---- - -## Success Criteria for Full MVP - -- [x] Location apply works ✅ -- [x] Tag apply works ✅ -- [x] Registry dispatch works ✅ -- [x] Outbound broadcasting works ✅ -- [ ] Inbound message handling ️ Phase 2 -- [ ] TransactionManager auto-broadcast ️ Phase 3 -- [ ] Integration test: 2 devices sync location ️ Needs Phase 2 -- [ ] Integration test: 2 devices sync tag ️ Needs Phase 2 - -**Progress**: 4/8 criteria met (50%) - ---- - -## Next Command - -**To continue with Phase 2 (inbound handling):** -``` -"Implement SyncProtocolHandler to receive and route messages" -``` - -**Or test what we have so far:** -``` -"Show me how to test the outbound broadcasting" -``` - ---- - -## Architecture Visualization - -``` -┌─────────────────────────────────────────────────────────────┐ -│ Application Layer │ -│ (LocationManager, TagManager, TransactionManager) │ -└──────────────────────┬──────────────────────────────────────┘ - │ - ↓ calls broadcast_state_change() -┌─────────────────────────────────────────────────────────────┐ -│ Sync Layer (PeerSync) │ -│ - broadcast_state_change() IMPLEMENTED │ -│ - broadcast_shared_change() IMPLEMENTED │ -└──────────────────────┬──────────────────────────────────────┘ - │ - ↓ uses NetworkTransport trait -┌─────────────────────────────────────────────────────────────┐ -│ Network Transport (Abstraction Layer) │ -│ - send_sync_message() TRAIT DEFINED │ -│ - get_connected_partners() TRAIT DEFINED │ -└──────────────────────┬──────────────────────────────────────┘ - │ - ↓ implemented by -┌─────────────────────────────────────────────────────────────┐ -│ NetworkingService (Transport Implementation) │ -│ - Maps UUID → NodeId IMPLEMENTED │ -│ - Sends via Iroh IMPLEMENTED │ -│ - Uses DeviceRegistry INTEGRATED │ -└──────────────────────┬──────────────────────────────────────┘ - │ - ↓ sends to -┌─────────────────────────────────────────────────────────────┐ -│ Remote Device (Iroh P2P) │ -│ - Receives via SyncProtocolHandler ️ STUBBED (Phase 2) │ -│ - Applies via registry dispatch READY │ -└─────────────────────────────────────────────────────────────┘ -``` - -**Status**: Messages can be SENT but not yet RECEIVED ️ - ---- - -## Code Example: What Works Now - -```rust -// Device A creates a location -let location = location::ActiveModel { - uuid: Set(Uuid::new_v4()), - device_id: Set(device_a_id), - name: Set(Some("Photos".into())), - // ... other fields -}; - -// Insert into database -location.insert(db).await?; - -// Broadcast to peers (THIS NOW WORKS!) -peer_sync.broadcast_state_change(StateChangeMessage { - model_type: "location".to_string(), - record_uuid: location.uuid, - device_id: device_a_id, - data: location.to_sync_json()?, -}).await?; - -// What happens: -// PeerSync queries connected_sync_partners → [Device B] -// Creates SyncMessage::StateChange -// NetworkTransport.send_sync_message(device_b_uuid, message) -// ├─ Maps device_b_uuid → NodeId via DeviceRegistry -// ├─ Serializes message to JSON bytes -// ├─ endpoint.connect(node_id, SYNC_ALPN) -// ├─ Opens uni-stream -// └─ Sends bytes -// Logs: "State change sent successfully" -// ️ Device B receives bytes but handler is stubbed (drops them) -``` - ---- - -## Known Limitations (Until Phase 2) - -1. **One-Way Communication**: Can send, can't receive -2. **No ACKs**: Shared changes don't get acknowledged yet -3. **No Backfill**: Can't request historical state from peers -4. **No Protocol Registration**: SyncProtocolHandler not registered yet - -All of these are Phase 2 & 3 work! - ---- - -## Remaining Timeline - -| Phase | Component | Effort | Status | -|-------|-----------|--------|--------| -| ~~0~~ | ~~Architecture foundation~~ | ~~100 lines~~ | Done | -| ~~1~~ | ~~Network transport integration~~ | ~~200 lines~~ | Done | -| **2** | **Inbound message handling** | 200 lines | ️ Next | -| **3** | **TransactionManager** | 200 lines | ️ After 2 | -| **4** | **Testing & polish** | 100 lines | ️ Final | - -**Remaining Work**: ~500 lines, ~7-8 hours -**Completed**: ~530 lines (52% done!) - ---- - -## Immediate Next Step - -**Implement Phase 2: SyncProtocolHandler** - -This will enable Device B to actually receive and process the messages that Device A is now successfully sending! - -Files to modify: -1. `service/network/protocol/sync/handler.rs` - Implement handle_stream() -2. `service/network/core/mod.rs` - Register SyncProtocolHandler - -**Ready?** Say: *"Implement inbound sync message handling"* - diff --git a/core/src/infra/sync/docs/SYNC_CODE_REVIEW_GUIDE.md b/core/src/infra/sync/docs/SYNC_CODE_REVIEW_GUIDE.md deleted file mode 100644 index 18780c6be..000000000 --- a/core/src/infra/sync/docs/SYNC_CODE_REVIEW_GUIDE.md +++ /dev/null @@ -1,536 +0,0 @@ -# Sync System Code Review Guide - -**Purpose**: Complete module map for reviewing the hybrid leaderless sync implementation -**Last Updated**: October 9, 2025 -**Status**: Network Integration Complete - ---- - -## Overview - -The sync system follows a **Domain-Driven Design (DDD)** architecture with clear separation: -- **Domain Layer**: Entities define their own sync behavior (queries, applies) -- **Infrastructure Layer**: Generic sync primitives (HLC, PeerLog, Registry) -- **Service Layer**: Orchestration (PeerSync, NetworkProtocolHandler) -- **No domain-specific logic in infrastructure or service layers** - ---- - -## Core Architecture Documents - -### Primary Documentation -- **`/docs/core/sync.md`** - Comprehensive architectural specification (authoritative source) -- **`core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md`** - Implementation patterns and principles -- **`core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md`** - Implementation status tracking - ---- - -## Layer 1: Infrastructure - Sync Primitives - -**Location**: `core/src/infra/sync/` - -### Core Primitives - -| File | Purpose | Key Components | -|------|---------|----------------| -| **`mod.rs`** | Module root, public API exports | `HLC`, `PeerLog`, `Syncable`, `TransactionManager` | -| **`hlc.rs`** | Hybrid Logical Clock implementation | `HLC` struct, `HLCGenerator`, timestamp ordering | -| **`peer_log.rs`** | Per-device append-only sync log | `PeerLog`, `SharedChangeEntry`, ACK tracking, pruning | -| **`syncable.rs`** | Trait defining sync behavior | `Syncable` trait with `query_for_sync`, `apply_state_change`, `apply_shared_change` | -| **`registry.rs`** | Runtime dispatch for syncable models | `SYNCABLE_REGISTRY`, `query_device_state`, `apply_state_change`, `apply_shared_change` | -| **`transaction.rs`** | Gatekeeper for all sync-enabled writes | `TransactionManager::commit_device_owned`, `commit_shared` | - -### Review Focus (Infrastructure): -- **No domain-specific logic** - should be generic -- **Proper trait abstractions** - `Syncable` trait defines contract -- **Registry pattern** - dynamic dispatch without switch statements -- **HLC correctness** - causality tracking, ordering guarantees -- **PeerLog correctness** - append-only, proper pruning - ---- - -## Layer 2: Domain - Entity Sync Implementations - -**Location**: `core/src/infra/db/entities/` - -### Device-Owned Models (State-Based Sync) - -| File | Entity | Sync Status | Key Methods | -|------|--------|-------------|-------------| -| **`location.rs`** | Location | Fully Implemented | `query_for_sync`, `apply_state_change` | -| **`entry.rs`** | Entry | Fully Implemented | `query_for_sync`, `apply_state_change` | -| **`device.rs`** | Device | Fully Implemented | `query_for_sync`, `apply_state_change` | - -### Shared Models (Log-Based Sync with HLC) - -| File | Entity | Sync Status | Key Methods | -|------|--------|-------------|-------------| -| **`tag.rs`** | Tag | Fully Implemented | `apply_shared_change` (trait method) | - -### Review Focus (Domain): -- **Each entity owns its sync logic** - query and apply in entity file -- **Proper serialization** - uses `to_sync_json()`, excludes non-sync fields -- **Idempotent upserts** - `on_conflict` with UUID, updates all synced columns -- **Conflict resolution** - For shared models: HLC comparison for "last write wins" - ---- - -## Layer 3: Service - Sync Orchestration - -**Location**: `core/src/service/sync/` - -### Core Service Components - -| File | Purpose | Key Responsibilities | -|------|---------|---------------------| -| **`mod.rs`** | Sync service root | `SyncService` lifecycle, exports | -| **`peer.rs`** | Peer-to-peer sync coordination | `PeerSync`: broadcasting, receiving, applying changes | -| **`state.rs`** | Sync state machine | `DeviceSyncState`, `BufferQueue`, backfill states | -| **`backfill.rs`** | Initial sync from peers | `BackfillManager`, peer selection, resume logic | -| **`retry_queue.rs`** | Failed message retry with backoff | `RetryQueue`, exponential backoff, max retries | -| **`applier.rs`** | Apply incoming sync messages | Delegates to registry | -| **`protocol_handler.rs`** | Legacy handlers (being phased out) | State/log sync handlers | - -### Review Focus (Service): -- **Domain-agnostic** - calls registry, never switches on model types -- **Event-driven** - listens to TransactionManager events, broadcasts changes -- **Parallel broadcasts** - uses `join_all`, not sequential sends -- **Proper error handling** - timeouts, retry queues, no `.unwrap_or_default()` -- **Background tasks** - retry processor, log pruner running -- **State machine** - buffering during backfill, transitioning to ready - ---- - -## Layer 4: Network - Protocol Handler - -**Location**: `core/src/service/network/protocol/sync/` - -### Network Protocol Components - -| File | Purpose | Key Components | -|------|---------|----------------| -| **`mod.rs`** | Protocol module root | Exports | -| **`handler.rs`** | Incoming sync message router | `SyncProtocolHandler::handle_sync_message` | -| **`messages.rs`** | Sync message definitions | `SyncMessage` enum, `StateRecord`, message types | - -### Message Types Implemented - -| Message Type | Direction | Handler Status | Purpose | -|--------------|-----------|----------------|---------| -| `StateChange` | Broadcast | Complete | Send state-based updates | -| `StateBatch` | Broadcast | Complete | Batch state updates | -| `StateRequest` | Request/Response | Complete | Request backfill data | -| `StateResponse` | Response | Complete | Return backfill data | -| `SharedChange` | Broadcast | Complete | Send HLC-ordered log entries | -| `SharedChangeBatch` | Broadcast | Complete | Batch shared changes | -| `SharedChangeRequest` | Request/Response | Complete | Request shared changes since HLC | -| `SharedChangeResponse` | Response | Complete | Return shared changes | -| `AckSharedChanges` | Notification | Complete | Acknowledge receipt (for pruning) | -| `Heartbeat` | Bidirectional | Complete | Keep-alive with watermarks | -| `Error` | Notification | Complete | Error reporting | - -### Review Focus (Network): -- **Complete message handling** - all message types handled -- **Request/response flow** - proper response message generation -- **Error handling** - graceful degradation, error messages sent back -- **Integration with PeerSync** - delegates to PeerSync methods - ---- - -## Data Flow Architecture - -### Write Path (Local Change → Peers) - -``` -1. Operation/Job - ↓ -2. TransactionManager.commit_device_owned() or commit_shared() - ↓ -3. Emits event: "sync:state_change" or "sync:shared_change" - ↓ -4. PeerSync event listener picks up event - ↓ -5. PeerSync.broadcast_state_change() or broadcast_shared_change() - ↓ -6. NetworkTransport.send_sync_message() (parallel to all peers) - ↓ -7. Peer's SyncProtocolHandler.handle_sync_message() - ↓ -8. PeerSync.on_state_change_received() or on_shared_change_received() - ↓ -9. Registry.apply_state_change() or apply_shared_change() - ↓ -10. Entity.apply_state_change() (domain logic) - ↓ -11. Database upsert -``` - -**Files Involved**: -1. `core/src/ops/` (operations) -2. `core/src/infra/sync/transaction.rs` -3. `core/src/service/sync/peer.rs` (lines 124-336) -4. `core/src/service/network/protocol/sync/handler.rs` -5. `core/src/service/sync/peer.rs` (lines 746-856) -6. `core/src/infra/sync/registry.rs` -7. `core/src/infra/db/entities/*.rs` - -### Read Path (Backfill Request) - -``` -1. Peer sends StateRequest message - ↓ -2. SyncProtocolHandler.handle_sync_message() - ↓ -3. PeerSync.get_device_state() - ↓ -4. Registry.query_device_state() (loops through model types) - ↓ -5. Entity.query_for_sync() (domain logic) - ↓ -6. Database query - ↓ -7. Entity.to_sync_json() (domain serialization) - ↓ -8. Return StateResponse to requester -``` - -**Files Involved**: -1. `core/src/service/network/protocol/sync/handler.rs` (lines 163-197) -2. `core/src/service/sync/peer.rs` (lines 987-1057) -3. `core/src/infra/sync/registry.rs` (lines 256-289) -4. `core/src/infra/db/entities/*.rs` (Syncable impls) - ---- - -## Critical Review Points - -### 1. DDD Compliance ✅ - -**Check**: No domain-specific logic in sync infrastructure -- BAD: Switch statements on model types in `peer.rs` -- GOOD: Registry-based dispatch -- BAD: JSON serialization logic in `registry.rs` -- GOOD: Entity calls `to_sync_json()` - -**Files to Verify**: -- `core/src/service/sync/peer.rs` - should call registry, not switch on types -- `core/src/infra/sync/registry.rs` - should be pure routing -- `core/src/infra/db/entities/*.rs` - should contain all domain logic - -### 2. Error Handling ✅ - -**Check**: Proper error propagation, no silent failures -- BAD: `.unwrap_or_default()` hiding errors -- GOOD: Proper `Result` propagation -- GOOD: Retry queue for network failures -- GOOD: Timeout handling (30s) - -**Files to Verify**: -- `core/src/service/sync/peer.rs` (lines 286-354, 399-476) -- `core/src/service/sync/retry_queue.rs` - -### 3. Parallel Operations ✅ - -**Check**: Broadcasts use parallel sends -- BAD: Sequential `for` loop with awaits -- GOOD: `join_all` with parallel futures - -**Files to Verify**: -- `core/src/service/sync/peer.rs::broadcast_state_change` (lines 564-606) -- `core/src/service/sync/peer.rs::broadcast_shared_change` (lines 670-741) - -### 4. Background Tasks ✅ - -**Check**: Required background tasks running -- Event listener (TransactionManager → PeerSync) -- Retry queue processor (every 10s) -- Log pruner (every 5min) - -**Files to Verify**: -- `core/src/service/sync/peer.rs::start` (lines 123-147) -- `core/src/service/sync/peer.rs::start_event_listener` (lines 220-276) -- `core/src/service/sync/peer.rs::start_retry_processor` (lines 149-189) -- `core/src/service/sync/peer.rs::start_log_pruner` (lines 191-218) - -### 5. ACK Mechanism ✅ - -**Check**: Automatic ACK sending after applying shared changes -- Extracts sender from `entry.hlc.device_id` -- Sends ACK back to sender -- Non-fatal error handling (continues if ACK fails) - -**Files to Verify**: -- `core/src/service/sync/peer.rs::apply_shared_change` (lines 806-843) - ---- - -## Testing Areas - -### Files Requiring Integration Tests - -1. **State-Based Sync (Two Peers)** - - File: `core/tests/sync_state_integration_test.rs` (to be created) - - Tests: Location sync, Entry sync, Device sync - -2. **Log-Based Sync (Conflict Resolution)** - - File: `core/tests/sync_log_integration_test.rs` (to be created) - - Tests: Tag concurrent modification, HLC ordering - -3. **Network Partition Recovery** - - File: `core/tests/sync_recovery_test.rs` (to be created) - - Tests: Reconnect after disconnection, retry queue - -4. **Backfill/Catch-up** - - File: `core/tests/sync_backfill_test.rs` (to be created) - - Tests: New device joins, request/response flow - ---- - -## Complete File Manifest - -### Infrastructure Layer (`core/src/infra/`) - -``` -sync/ -├── mod.rs - Public API, exports -├── hlc.rs - Hybrid Logical Clock (324 lines) -├── peer_log.rs - Append-only shared change log (397 lines) -├── syncable.rs - Syncable trait definition (302 lines) -├── registry.rs - Model registration and dispatch (397 lines) -├── transaction.rs - Transaction manager, event emission (287 lines) -└── docs/ - ├── SYNC_IMPLEMENTATION_GUIDE.md (601 lines) - ├── SYNC_IMPLEMENTATION_ROADMAP.md (1008 lines) - └── SYNC_CODE_REVIEW_GUIDE.md (this file) -``` - -### Domain Layer (`core/src/infra/db/entities/`) - -``` -entities/ -├── mod.rs - Entity exports -├── location.rs - Location entity + Syncable impl (285 lines) -├── entry.rs - Entry entity + Syncable impl (245 lines) -├── device.rs - Device entity + Syncable impl (148 lines) -└── tag.rs - Tag entity + Syncable impl (partial) -``` - -### Service Layer (`core/src/service/sync/`) - -``` -sync/ -├── mod.rs - Service lifecycle (183 lines) -├── peer.rs - Core sync orchestration (1144 lines) CRITICAL -├── state.rs - State machine, buffering (195 lines) -├── backfill.rs - Initial sync from peers (247 lines) -├── retry_queue.rs - Failed message retry (134 lines) -├── applier.rs - Apply incoming changes (delegates to registry) -└── protocol_handler.rs - Legacy handlers (being phased out) -``` - -### Network Protocol Layer (`core/src/service/network/protocol/sync/`) - -``` -sync/ -├── mod.rs - Protocol exports -├── handler.rs - Protocol message router (336 lines) CRITICAL -└── messages.rs - Message type definitions (205 lines) -``` - ---- - -## Key Files for Deep Review - -### Critical Path Files (Must Review Thoroughly) - -1. **`core/src/service/sync/peer.rs`** (1144 lines) - - Most complex file in the system - - Orchestrates all sync operations - - Review: Event handling (lines 220-454), Broadcasting (lines 513-741), Applying (lines 787-856) - -2. **`core/src/service/network/protocol/sync/handler.rs`** (336 lines) - - Entry point for all incoming sync messages - - Review: All message type handlers, error handling - -3. **`core/src/infra/sync/registry.rs`** (397 lines) - - Central dispatch mechanism - - Review: Should have ZERO domain logic, only routing - -4. **`core/src/infra/sync/hlc.rs`** (324 lines) - - Correctness is critical for conflict resolution - - Review: Ordering guarantees, causality tracking - -### Important Supporting Files - -5. **`core/src/infra/sync/peer_log.rs`** (397 lines) - - Manages sync.db, ACK tracking, pruning - - Review: SQL correctness, transaction safety - -6. **`core/src/infra/sync/transaction.rs`** (287 lines) - - Sole gatekeeper for writes - - Review: Event emission, proper use throughout codebase - -7. **`core/src/infra/sync/syncable.rs`** (302 lines) - - Trait contract for all syncable models - - Review: Complete API, proper abstractions - -### Domain Implementations - -8. **`core/src/infra/db/entities/location.rs`** (285 lines) - - Reference implementation for state-based sync - - Review: Query logic, apply logic, field exclusions - -9. **`core/src/infra/db/entities/entry.rs`** (245 lines) - - Most complex entity (hierarchical, large scale) - - Review: UUID handling, sync-ready filtering - -10. **`core/src/infra/db/entities/tag.rs`** (Fixed - October 9, 2025) - - Reference implementation for log-based sync - - Review: HLC conflict resolution, apply logic, trait implementation - ---- - -## Known Issues / TODOs - -### Non-Critical (Enhancements) - -1. **Checkpoint Persistence** (`core/src/service/sync/state.rs`, lines 185-195) - - Currently in-memory only - - Impact: Backfill restarts from beginning if interrupted - -2. **Fallback for Pruned Logs** (`core/src/service/network/protocol/sync/handler.rs`, line 235) - - Returns error if logs already pruned - - Impact: Triggers full resync (acceptable) - -3. **Device UUID Filtering** (`core/src/infra/db/entities/location.rs`, line 1018) - - Minor query optimization - - Impact: Minimal, locations already device-scoped - -### Critical (Blockers) - Currently NONE ✅ - -All critical functionality is implemented and working. - ---- - -## Review Checklist - -### Architecture Review - -- [ ] Verify no switch statements on model types outside domain layer -- [ ] Verify all domain logic is in `entities/*.rs`, not `sync/*` -- [ ] Verify registry is pure routing (no JSON serialization) -- [ ] Verify proper trait-based dispatch throughout - -### Correctness Review - -- [ ] HLC ordering guarantees (timestamp > counter > device_id) -- [ ] HLC causality tracking (update on receive) -- [ ] Idempotent upserts (on_conflict with UUID) -- [ ] Proper field exclusions (id, created_at, updated_at not synced) -- [ ] ACK mechanism (sent after apply, enables pruning) - -### Performance Review - -- [ ] Parallel broadcasts (not sequential) -- [ ] Batch operations where appropriate -- [ ] Database query efficiency (proper indexes on uuid, updated_at) -- [ ] Buffer queue during backfill (prevents overwhelm) - -### Reliability Review - -- [ ] Timeout handling (30s on all network operations) -- [ ] Retry queue with exponential backoff -- [ ] Error propagation (no silent failures) -- [ ] Log pruning (prevents unbounded growth) -- [ ] Background task lifecycle (starts with service, stops cleanly) - -### Integration Review - -- [ ] TransactionManager → PeerSync event flow working -- [ ] PeerSync → NetworkHandler message flow working -- [ ] StateRequest/Response backfill flow working -- [ ] SharedChangeRequest/Response sync flow working -- [ ] ACK flow enabling log pruning - ---- - -## Line-by-Line Review Priorities - -### High Priority Sections - -**`peer.rs`**: -- Lines 123-147: Service startup (background tasks) -- Lines 220-454: Event handling (TransactionManager integration) -- Lines 513-606: State change broadcasting -- Lines 619-741: Shared change broadcasting -- Lines 787-856: Apply received changes - -**`handler.rs`**: -- Lines 47-287: Message routing (all types) - -**`registry.rs`**: -- Lines 140-203: Registry initialization (should be pure routing) -- Lines 256-289: Query dispatch function - -**`location.rs`, `entry.rs`, `device.rs`**: -- Syncable trait implementations -- Query and apply logic - -### Medium Priority Sections - -**`peer_log.rs`**: -- Lines 103-131: Append operation -- Lines 133-198: Query operations -- Lines 200-270: ACK and pruning - -**`hlc.rs`**: -- Lines 67-92: HLC update logic (causality) -- Lines 132-145: Ordering implementation - -**`retry_queue.rs`**: -- Lines 51-95: Retry logic with exponential backoff - ---- - -## Questions for Review - -1. **Architecture**: Is the DDD boundary properly maintained? Any domain logic leaking into infrastructure? - -2. **Completeness**: Are all message types handled? Any stub functions remaining? - -3. **Correctness**: Will the HLC ordering guarantee convergence? Is the ACK mechanism sound? - -4. **Performance**: Are there any blocking operations in hot paths? Any N+1 query issues? - -5. **Reliability**: What happens if a peer goes offline mid-sync? Are all failure modes handled? - -6. **Testing**: What integration tests are needed? Can we mock the network layer effectively? - ---- - -## Success Criteria for Review - -### Passing Review Means: - -**Architecture**: Clean DDD separation, no domain logic in infrastructure -**Completeness**: All message types handled, no critical stubs -**Correctness**: HLC math verified, conflict resolution sound -**Performance**: Parallel operations, efficient queries -**Reliability**: Proper error handling, retry mechanisms -**Readiness**: Code is ready for integration testing - ---- - -## Next Steps After Review - -1. **Model Integration** - Implement Syncable for remaining models (Album, Collection, UserMetadata) -2. **Integration Tests** - Write end-to-end sync tests -3. **Performance Testing** - Benchmark with 1M+ entries -4. **Production Hardening** - Add metrics, monitoring, circuit breakers - ---- - -**Review Status**: Ready for review ✅ -**Estimated Review Time**: 4-6 hours for thorough review -**Reviewers Should Focus On**: Architecture violations, correctness issues, missing error handling - diff --git a/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md b/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md deleted file mode 100644 index e9970d6e4..000000000 --- a/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md +++ /dev/null @@ -1,600 +0,0 @@ -# Sync Implementation Guide - -**Purpose**: Primary guide for implementing the Spacedrive sync system -**Status**: Active Development -**Last Updated**: October 9, 2025 - ---- - -## Executive Summary - -The Spacedrive sync system is a well-architected, hybrid synchronization solution combining state-based and log-based approaches. The architectural design is comprehensive and implementation-ready. This guide synthesizes the detailed architecture from [`/docs/core/sync.md`](/Users/jamespine/Projects/spacedrive/docs/core/sync.md) with the current implementation status from [`SYNC_IMPLEMENTATION_ROADMAP.md`](./SYNC_IMPLEMENTATION_ROADMAP.md). - -**Current Risk Profile**: LOW - The task is primarily one of executing a well-defined plan rather than solving architectural unknowns. - -**Key Documents**: -- **Architecture Reference**: [`/docs/core/sync.md`](/Users/jamespine/Projects/spacedrive/docs/core/sync.md) - Your primary guide for "how to build" -- **Implementation Status**: [`SYNC_IMPLEMENTATION_ROADMAP.md`](./SYNC_IMPLEMENTATION_ROADMAP.md) - Current progress and priorities -- **File Organization**: [`FILE_ORGANIZATION.md`](./FILE_ORGANIZATION.md) - Where everything lives - ---- - -## Core Architectural Principles - -These principles from `sync.md` are non-negotiable. All implementation work must adhere to them. - -### 1. All Data Modifications Must Go Through TransactionManager - -**The Rule**: Every create, update, or delete operation on library data **must** be routed through the `TransactionManager`. Direct database writes are not permitted. - -**Why**: This is the cornerstone that guarantees all changes are captured for synchronization. - -**Implementation Pattern**: - -```rust -// CORRECT: Use TransactionManager -let mut tx = library.transaction_manager().begin().await?; -tx.commit_device_owned("location", location_uuid, location_json).await?; - -// WRONG: Direct database write -location::Entity::insert(active_model).exec(db).await?; -``` - -**Where This Applies**: -- Job System (file operations that create/update entries) -- Operations in `src/ops/` (all CRUD actions) -- File Watcher (entry creation/updates) - -**Action Required**: Audit all database write operations and migrate them to use `TransactionManager`. - ---- - -### 2. Use the Correct Sync Strategy Per Model - -The architecture uses two distinct synchronization strategies. You must implement the correct one for each model. - -#### Strategy A: State-Based Sync (Device-Owned Data) - -**Used For**: Data that belongs to a specific device and cannot have conflicts between devices. - -**Models**: `Location`, `Entry`, `Volume`, `Device` - -**How It Works**: -1. Device makes a change locally -2. Updates its local database immediately -3. Broadcasts the new state to all connected peers -4. Peers receive and apply the state directly (no conflict resolution needed) - -**Implementation**: -```rust -impl Syncable for location::Model { - fn sync_strategy() -> SyncStrategy { - SyncStrategy::StateBased - } - - async fn apply_state_change( - data: serde_json::Value, - db: &DatabaseConnection, - ) -> Result<(), DbErr> { - let location: Self = serde_json::from_value(data)?; - - // Simple upsert - last write wins - let active = ActiveModel { - uuid: Set(location.uuid), - device_id: Set(location.device_id), - path: Set(location.path), - // ... other fields - }; - - active.insert(db).await?; - Ok(()) - } -} -``` - -#### Strategy B: Log-Based Sync (Shared Resources) - -**Used For**: Data that can be edited by multiple devices concurrently and needs conflict resolution. - -**Models**: `Tag`, `Album`, `Collection`, `UserMetadata` - -**How It Works**: -1. Device makes a change locally -2. Generates an HLC timestamp -3. Writes change to local `sync.db` (append-only log) -4. Updates main database -5. Broadcasts the log entry to all connected peers -6. Peers receive, compare HLC timestamps, and resolve conflicts - -**Implementation**: -```rust -impl Syncable for tag::Model { - fn sync_strategy() -> SyncStrategy { - SyncStrategy::LogBased - } - - async fn apply_shared_change( - entry: SharedChangeEntry, - db: &DatabaseConnection, - ) -> Result<(), DbErr> { - let new_tag: Self = serde_json::from_value(entry.data)?; - - // Load current state - let current = tag::Entity::find_by_id(new_tag.uuid) - .one(db) - .await?; - - match current { - None => { - // New record, just insert - new_tag.into_active_model().insert(db).await?; - } - Some(existing) => { - // Conflict! Use HLC to determine winner - if entry.hlc > existing.last_modified_hlc { - // Incoming change is newer, apply it - new_tag.into_active_model().update(db).await?; - } else { - // Local change is newer, ignore incoming - debug!("Ignoring older change for tag {}", new_tag.uuid); - } - } - } - - Ok(()) - } -} -``` - -**Reference Table** (from `sync.md`): - -| Model | Strategy | Reason | -|-------|----------|--------| -| Location | State-Based | Device-owned, no conflicts | -| Entry | State-Based | Device-owned, no conflicts | -| Volume | State-Based | Device-owned, no conflicts | -| Device | State-Based | Device-owned, no conflicts | -| Tag | Log-Based | Shared, needs conflict resolution | -| Album | Log-Based | Shared, needs conflict resolution | -| Collection | Log-Based | Shared, needs conflict resolution | -| UserMetadata | Hybrid | Per-device + shared fields | - -**Action Required**: Implement `Syncable` trait for all models using the correct strategy. - ---- - -### 3. Implement Proper Error Handling and Recovery - -The `sync.md` document specifies recovery procedures for common failure scenarios. You must implement these. - -#### Network Timeout Handling - -```rust -pub async fn send_sync_message( - &self, - target: Uuid, - message: SyncMessage, -) -> Result<()> { - match tokio::time::timeout( - Duration::from_secs(30), - self.do_send(target, message.clone()) - ).await { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => { - // Network error - enqueue for retry - self.retry_queue.enqueue(target, message).await; - Err(e) - } - Err(_) => { - // Timeout - enqueue for retry - warn!("Send timeout for {}, will retry", target); - self.retry_queue.enqueue(target, message).await; - Err(anyhow!("Send timeout")) - } - } -} -``` - -#### Database Constraint Violations - -```rust -async fn apply_change(&self, change: SyncMessage) -> Result<()> { - match self.do_apply(change.clone()).await { - Ok(()) => Ok(()), - Err(DbErr::RecordNotFound(_)) => { - // Missing foreign key - request dependency - self.request_missing_record(change.dependency_id).await?; - // Buffer this change to retry after dependency arrives - self.buffer_queue.push(change).await; - Ok(()) - } - Err(e) => Err(e.into()), - } -} -``` - -#### Partial Sync Resume - -```rust -pub async fn resume_sync(&self, peer_id: Uuid) -> Result<()> { - // Load last successfully processed HLC from local state - let watermark = self.get_watermark(peer_id).await?; - - // Request all changes since watermark - self.request_shared_changes(peer_id, watermark).await?; - - Ok(()) -} -``` - -**Action Required**: Implement these recovery patterns in `PeerSync` and `SyncProtocolHandler`. - ---- - -### 4. Build Comprehensive Tests - -The `sync.md` document provides test templates. Use these as your starting point. - -#### Required Integration Tests - -1. **Two-Peer State-Based Sync**: - - Device A creates a location - - Device B receives and applies it - - Verify both databases match - -2. **Two-Peer Log-Based Sync with Conflict**: - - Device A and B both edit the same tag concurrently - - Exchange changes - - Verify both converge to the same state (HLC winner) - -3. **Network Partition Recovery**: - - Device A and B both make changes while disconnected - - Reconnect - - Verify full synchronization - -4. **Partial Sync Resume**: - - Start syncing 1000 changes - - Disconnect at change 500 - - Reconnect - - Verify resume from change 501 - -**Test Infrastructure** (from `sync.md`): - -```rust -// Mock network for testing -pub struct MockNetworkTransport { - sent_messages: Arc>>, -} - -impl NetworkTransport for MockNetworkTransport { - async fn send_sync_message( - &self, - target: Uuid, - message: SyncMessage, - ) -> Result<()> { - self.sent_messages.lock().await.push((target, message)); - Ok(()) - } -} - -// Test helper to create isolated test device -async fn create_test_device(name: &str) -> TestDevice { - let db = create_test_database().await; - let sync_db_path = format!("/tmp/sync_test_{}.db", name); - let peer_log = PeerLog::new(&sync_db_path).await.unwrap(); - - TestDevice { - id: Uuid::new_v4(), - name: name.to_string(), - db, - peer_log, - sync: /* ... */, - } -} -``` - -**Action Required**: Write integration tests in `core/tests/sync_integration_test.rs` using the patterns from `sync.md`. - ---- - -## Current Implementation Status - -See [`SYNC_IMPLEMENTATION_ROADMAP.md`](./SYNC_IMPLEMENTATION_ROADMAP.md) for detailed status. Key phases: - -- **Phase 1: Core Infrastructure** - COMPLETE -- **Phase 2: Network Integration** - IN PROGRESS (80%) -- **Phase 3: Model Integration** - IN PROGRESS (29%) -- **Phase 4: End-to-End Testing** - NOT STARTED - ---- - -## Priority 1: Critical Path Items - -These must be completed for the sync system to function end-to-end. - -### 1. Implement SyncProtocolHandler (CRITICAL) - -**File**: `core/src/service/network/protocol/sync/handler.rs` - -**Current State**: Stubbed with warnings - -**What It Must Do**: -- Receive incoming `SyncMessage` from network layer -- Route to appropriate handler based on message type -- Call `PeerSync` methods to process the message -- Return response messages when needed - -**Implementation Template**: - -```rust -impl SyncProtocolHandler { - pub async fn handle_sync_message( - &self, - from_device: Uuid, - message: SyncMessage, - ) -> Result> { - let library = self.get_library(message.library_id())?; - let peer_sync = library.sync_service().peer_sync(); - - match message { - SyncMessage::StateChange { model_type, record_uuid, data, .. } => { - peer_sync.on_state_change_received( - model_type, - record_uuid, - data - ).await?; - Ok(None) // No response needed - } - - SyncMessage::SharedChange { entry, .. } => { - peer_sync.on_shared_change_received(entry).await?; - Ok(None) - } - - SyncMessage::StateRequest { model_type, record_uuid, .. } => { - let response = self.handle_state_request( - model_type, - record_uuid - ).await?; - Ok(Some(SyncMessage::StateResponse { /* ... */ })) - } - - SyncMessage::SharedChangeRequest { from_hlc, .. } => { - let entries = peer_sync.get_changes_since(from_hlc).await?; - Ok(Some(SyncMessage::SharedChangeResponse { entries })) - } - - SyncMessage::AckSharedChanges { from_device, up_to_hlc, .. } => { - peer_sync.on_ack_received(from_device, up_to_hlc).await?; - Ok(None) - } - - // ... handle other message types - } - } -} -``` - -**Estimated Effort**: 4-6 hours - ---- - -### 2. Enable Auto-Broadcast in TransactionManager - -**File**: `core/src/infra/sync/transaction.rs` - -**Current State**: Transactions commit but don't trigger sync - -**What It Must Do**: -- After successful database commit, automatically broadcast changes -- Use `PeerSync::broadcast_state_change()` for device-owned data -- Use `PeerSync::broadcast_shared_change()` for shared data - -**Implementation**: - -```rust -impl TransactionManager { - pub async fn commit_device_owned( - &mut self, - model_type: &str, - record_uuid: Uuid, - data: serde_json::Value, - ) -> Result<()> { - // 1. Commit to database - self.commit_to_db(model_type, record_uuid, &data).await?; - - // 2. Broadcast to peers - self.peer_sync.broadcast_state_change(StateChangeMessage { - model_type: model_type.to_string(), - record_uuid, - data, - }).await?; - - Ok(()) - } - - pub async fn commit_shared( - &mut self, - model_type: &str, - record_uuid: Uuid, - data: serde_json::Value, - ) -> Result<()> { - // 1. Generate HLC timestamp - let hlc = self.hlc.generate_timestamp().await; - - // 2. Write to sync.db (append-only log) - let entry = SharedChangeEntry { - hlc, - device_id: self.device_id, - model_type: model_type.to_string(), - record_uuid, - data: data.clone(), - timestamp: Utc::now(), - }; - - self.peer_log.append(entry.clone()).await?; - - // 3. Commit to main database - self.commit_to_db(model_type, record_uuid, &data).await?; - - // 4. Broadcast to peers - self.peer_sync.broadcast_shared_change(entry).await?; - - Ok(()) - } -} -``` - -**Estimated Effort**: 3-4 hours - ---- - -### 3. Fix Broadcast Error Handling - -**File**: `core/src/service/sync/peer.rs` (lines 186-209) - -**Current Problem**: Sequential sends, silent failures with `.unwrap_or_default()` - -**Required Changes**: - -1. **Use parallel sends** (not sequential): -```rust -use futures::future::join_all; - -let send_futures: Vec<_> = connected_partners - .iter() - .map(|&partner| { - let network = self.network.clone(); - let msg = message.clone(); - async move { - (partner, network.send_sync_message(partner, msg).await) - } - }) - .collect(); - -let results = join_all(send_futures).await; -``` - -2. **Handle errors properly** (not `.unwrap_or_default()`): -```rust -let connected_partners = self - .network - .get_connected_sync_partners() - .await - .map_err(|e| { - warn!("Failed to get connected partners: {}", e); - e - })?; // Propagate error instead of hiding it -``` - -3. **Enqueue failures for retry**: -```rust -for (partner_uuid, err) in failures { - warn!(partner = %partner_uuid, error = %err, "Send failed, will retry"); - self.retry_queue.enqueue(partner_uuid, message.clone()).await; -} -``` - -**Estimated Effort**: 2-3 hours - ---- - -## Implementation Checklist - -Use this checklist to track progress on the critical path. - -### Network Integration -- [ ] Implement `SyncProtocolHandler::handle_sync_message()` -- [ ] Handle each message type (StateChange, SharedChange, etc.) -- [ ] Wire up to `NetworkingService` message router -- [ ] Add proper error handling and logging -- [ ] Write unit tests for each message type - -### Transaction Manager -- [ ] Implement auto-broadcast in `commit_device_owned()` -- [ ] Implement auto-broadcast in `commit_shared()` -- [ ] Generate HLC timestamps for shared changes -- [ ] Write to `sync.db` before broadcasting -- [ ] Add transaction rollback on broadcast failure - -### Broadcast Improvements -- [ ] Convert to parallel sends using `join_all` -- [ ] Remove `.unwrap_or_default()` error hiding -- [ ] Add retry queue integration -- [ ] Add timeout handling -- [ ] Add metrics collection - -### Model Integration -- [ ] Implement `Entry::apply_state_change()` -- [ ] Implement `Volume::apply_state_change()` -- [ ] Implement `Device::apply_state_change()` -- [ ] Implement `Album::apply_shared_change()` -- [ ] Implement `Collection::apply_shared_change()` -- [ ] Implement `UserMetadata::apply_mixed()` -- [ ] Register all models in `registry.rs` - -### Testing -- [ ] Set up test infrastructure (mock network, test DB) -- [ ] Write two-peer state-based sync test -- [ ] Write two-peer log-based sync test with conflict -- [ ] Write network partition recovery test -- [ ] Write partial sync resume test -- [ ] Run tests in CI - ---- - -## Common Pitfalls to Avoid - -### 1. Direct Database Writes -**Don't**: Write directly to the database in operations/jobs -**Do**: Always use `TransactionManager` - -### 2. Wrong Sync Strategy -**Don't**: Use log-based sync for device-owned data -**Do**: Check the model classification table in this guide - -### 3. Silent Error Handling -**Don't**: Use `.unwrap_or_default()` or similar patterns -**Do**: Properly propagate errors and log them - -### 4. Missing HLC Timestamps -**Don't**: Forget to generate HLC for shared changes -**Do**: Always generate HLC before writing to `sync.db` - -### 5. Ignoring Conflicts -**Don't**: Last-write-wins for shared data -**Do**: Use HLC comparison for conflict resolution - ---- - -## Questions and Support - -If you encounter architectural questions during implementation: - -1. **First**, check [`/docs/core/sync.md`](/Users/jamespine/Projects/spacedrive/docs/core/sync.md) for detailed patterns -2. **Second**, check this guide for implementation examples -3. **Third**, check the [`SYNC_IMPLEMENTATION_ROADMAP.md`](./SYNC_IMPLEMENTATION_ROADMAP.md) for status updates -4. **Finally**, document any unresolved questions and architectural decisions - ---- - -## Success Criteria - -### MVP (End-to-End Sync Working) -- [ ] All message types handled correctly -- [ ] At least 2 models fully syncing (one state-based, one log-based) -- [ ] Basic integration test passing -- [ ] No data loss in normal operation - -### Production Ready -- [ ] All 7 models syncing correctly -- [ ] Comprehensive integration tests passing -- [ ] Error handling and retry mechanisms working -- [ ] < 1% message loss rate -- [ ] < 100ms broadcast latency for 10 peers - ---- - -**Remember**: The architecture is solid. Focus on execution, not redesign. When in doubt, follow the patterns in `sync.md`. - - diff --git a/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md b/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md deleted file mode 100644 index 5131c486e..000000000 --- a/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md +++ /dev/null @@ -1,1007 +0,0 @@ -# Sync System Implementation Roadmap - -**Status**: In Active Development -**Architecture**: Leaderless Hybrid (State-based + Log-based with HLC) -**Last Updated**: October 9, 2025 - ---- - -> **START HERE**: If you're implementing the sync system, read [`SYNC_IMPLEMENTATION_GUIDE.md`](./SYNC_IMPLEMENTATION_GUIDE.md) first. -> That guide synthesizes the architecture from `/docs/core/sync.md` with the status from this roadmap, -> providing clear, actionable guidance for implementation. - ---- - -## Executive Summary - -**Current Grade**: 7.5/10 - Solid foundation with clear architectural vision, mid-migration from leader-based to leaderless architecture. - -**Completion Status**: ~75% (25/34 files fully implemented) - ---- - -## Critical Path to MVP - -### **Phase 1: Core Infrastructure** (COMPLETE) - -- [x] HLC implementation (`hlc.rs`) -- [x] PeerLog for per-device sync.db (`peer_log.rs`) -- [x] Syncable trait (`syncable.rs`) -- [x] Registry for model dispatch (`registry.rs`) -- [x] NetworkTransport trait (`transport.rs`) -- [x] Transaction manager structure (`transaction.rs`) - -### **Phase 2: Network Integration** (IN PROGRESS - 80%) - -- [x] NetworkTransport implementation (`network/transports/sync.rs`) -- [x] PeerSync with broadcast capabilities (`service/sync/peer.rs`) -- [x] SyncMessage types (`network/protocol/sync/messages.rs`) -- [ ] **CRITICAL**: SyncProtocolHandler inbound handling (`network/protocol/sync/handler.rs`) -- [ ] **CRITICAL**: TransactionManager auto-broadcast (`infra/sync/transaction.rs`) - -### **Phase 3: Model Integration** (IN PROGRESS - 29%) - -- [x] Location apply function (`db/entities/location.rs`) -- [x] Tag apply function + Syncable impl (`db/entities/tag.rs`) -- [ ] Entry apply function (`db/entities/entry.rs`) -- [ ] Volume apply function (`db/entities/volume.rs`) -- [ ] Device apply function (`db/entities/device.rs`) -- [ ] Collection/Album apply function (`db/entities/collection.rs`) -- [ ] UserMetadata apply function (`db/entities/user_metadata.rs`) - -### **Phase 4: End-to-End Testing** (NOT STARTED) - -- [ ] Integration tests for state-based sync -- [ ] Integration tests for log-based sync -- [ ] Conflict resolution tests -- [ ] Backfill/catch-up tests -- [ ] Network partition tests - ---- - -## Priority 1: Immediate Actions (This Week) - -### 1.1 Clean Up Migration Artifacts - -**Goal**: Remove confusion from legacy code - -**Files to Delete**: -- [ ] `service/sync/applier.rs` - Legacy stub, no longer used -- [ ] `service/network/protocol/sync/transport.rs` - Duplicate, wrong location -- [ ] `service/network/core/sync_transport.rs` - Moved to transports/ - -**Command**: -```bash -cd /Users/jamespine/Projects/spacedrive/core -rm -f src/service/sync/applier.rs -rm -f src/service/network/protocol/sync/transport.rs -rm -f src/service/network/core/sync_transport.rs -``` - -**Then update module imports**: -- [ ] Remove `pub mod applier;` from `service/sync/mod.rs` -- [ ] Remove `pub use applier::SyncApplier;` from `service/sync/mod.rs` - ---- - -### 1.2 Implement SyncProtocolHandler (CRITICAL) - -**File**: `core/src/service/network/protocol/sync/handler.rs` - -**Current State**: Stubbed with warnings - -**Required Implementation**: - -```rust -impl SyncProtocolHandler { - /// Handle incoming sync message from peer - async fn handle_sync_message( - &self, - from_device: Uuid, - message: SyncMessage, - ) -> Result> { - // Get PeerSync from library - let library = self.get_library(message.library_id())?; - let peer_sync = library.sync_service().peer_sync(); - - match message { - SyncMessage::StateChange { .. } => { - peer_sync.on_state_change_received(change).await?; - Ok(None) // No response needed - } - SyncMessage::SharedChange { entry, .. } => { - peer_sync.on_shared_change_received(entry).await?; - Ok(None) - } - SyncMessage::StateRequest { .. } => { - // Query DB and return StateResponse - let response = self.handle_state_request(request).await?; - Ok(Some(response)) - } - SyncMessage::SharedChangeRequest { .. } => { - // Query PeerLog and return SharedChangeResponse - let response = self.handle_shared_request(request).await?; - Ok(Some(response)) - } - SyncMessage::AckSharedChanges { from_device, up_to_hlc, .. } => { - peer_sync.on_ack_received(from_device, up_to_hlc).await?; - Ok(None) - } - // ... handle other message types - } - } -} -``` - -**Checklist**: -- [ ] Wire up to `NetworkingService` message router -- [ ] Implement each message type handler -- [ ] Add proper error handling and logging -- [ ] Write unit tests for each message type - -**Estimated Effort**: 4-6 hours - ---- - -### 1.3 Fix Broadcast Error Handling - -**File**: `core/src/service/sync/peer.rs` - -**Issue**: Sequential sends, silent failures, no retry - -**Current Code** (lines 186-209): -```rust -for partner_uuid in connected_partners { - match self.network.send_sync_message(partner_uuid, message.clone()).await { - Ok(()) => { success_count += 1; } - Err(e) => { - error_count += 1; - warn!("Failed to send"); // Continues - } - } -} -``` - -**Improved Implementation**: -```rust -use futures::future::join_all; - -// Parallel sends -let send_futures: Vec<_> = connected_partners - .iter() - .map(|&partner| { - let network = self.network.clone(); - let msg = message.clone(); - async move { - (partner, network.send_sync_message(partner, msg).await) - } - }) - .collect(); - -let results = join_all(send_futures).await; - -// Structured error handling -let (successes, failures): (Vec<_>, Vec<_>) = results - .into_iter() - .partition(|(_, result)| result.is_ok()); - -// Enqueue failures for retry -for (partner_uuid, err) in failures { - warn!(partner = %partner_uuid, error = %err, "Send failed, will retry"); - // TODO: Add to retry queue -} - -// Fail if no one received the message -if successes.is_empty() && !failures.is_empty() { - return Err(anyhow!("Failed to broadcast to any partner")); -} -``` - -**Checklist**: -- [ ] Add `futures` dependency to Cargo.toml -- [ ] Implement parallel broadcast in `broadcast_state_change()` -- [ ] Implement parallel broadcast in `broadcast_shared_change()` -- [ ] Add retry queue structure (see Priority 2) - -**Estimated Effort**: 2-3 hours - ---- - -### 1.4 Fix Silent Error Handling - -**File**: `core/src/service/sync/peer.rs` (lines 156-161) - -**Issue**: `.unwrap_or_default()` hides network errors - -**Replace**: -```rust -let connected_partners = self - .network - .get_connected_sync_partners() - .await - .unwrap_or_default(); // Hides errors -``` - -**With**: -```rust -let connected_partners = self - .network - .get_connected_sync_partners() - .await - .map_err(|e| { - warn!("Failed to get connected partners: {}", e); - e - })?; -``` - -**Checklist**: -- [ ] Fix in `broadcast_state_change()` (line 157) -- [ ] Fix in `broadcast_shared_change()` (line 258) -- [ ] Audit entire codebase for similar patterns - -**Estimated Effort**: 30 minutes - ---- - -## Priority 2: Short-Term Improvements (This Month) - -### 2.1 Implement Retry Queue - -**New File**: `core/src/service/sync/retry_queue.rs` - -**Purpose**: Automatically retry failed broadcasts with exponential backoff - -```rust -use std::collections::VecDeque; -use tokio::sync::Mutex; -use chrono::{DateTime, Utc, Duration}; - -pub struct RetryQueue { - queue: Arc>>, -} - -struct FailedSend { - target: Uuid, - message: SyncMessage, - attempts: u8, - next_retry: DateTime, -} - -impl RetryQueue { - /// Enqueue a failed send with exponential backoff - /// Backoff: 1s, 2s, 4s, 8s, 16s, then move to DLQ - pub async fn enqueue(&self, target: Uuid, message: SyncMessage) { - let mut queue = self.queue.lock().await; - queue.push_back(FailedSend { - target, - message, - attempts: 0, - next_retry: Utc::now() + Duration::seconds(1), - }); - } - - /// Process retries (called by background task) - pub async fn process_retries(&self, network: Arc) { - let mut queue = self.queue.lock().await; - let now = Utc::now(); - - // Try to send all messages due for retry - let mut i = 0; - while i < queue.len() { - let failed = &mut queue[i]; - - if failed.next_retry > now { - i += 1; - continue; - } - - match network.send_sync_message(failed.target, failed.message.clone()).await { - Ok(()) => { - // Success! Remove from queue - queue.remove(i); - } - Err(e) => { - failed.attempts += 1; - - if failed.attempts >= 5 { - // Move to DLQ - warn!(target = %failed.target, "Max retries exceeded, moving to DLQ"); - // TODO: Persist to dead letter queue - queue.remove(i); - } else { - // Exponential backoff: 2^attempts seconds - let backoff_secs = 1 << failed.attempts; - failed.next_retry = Utc::now() + Duration::seconds(backoff_secs); - i += 1; - } - } - } - } - } -} -``` - -**Integration Points**: -- [ ] Add `retry_queue: Arc` to `PeerSync` -- [ ] Call `retry_queue.enqueue()` on broadcast failures -- [ ] Add background task in `SyncService::run_sync_loop()` to call `process_retries()` - -**Estimated Effort**: 4-6 hours - ---- - -### 2.2 Add Message Envelope Pattern - -**File**: `core/src/service/network/protocol/sync/messages.rs` - -**Issue**: 11 message types with duplicated fields (library_id, device_id, timestamp) - -**Refactor to Envelope Pattern**: - -```rust -/// Envelope for all sync messages -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SyncEnvelope { - /// Protocol version (for future compatibility) - pub version: u8, - - /// Library this message pertains to - pub library_id: Uuid, - - /// Sending device ID - pub device_id: Uuid, - - /// Message timestamp - pub timestamp: DateTime, - - /// Actual payload - pub payload: SyncPayload, -} - -/// Sync message payloads (simplified) -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum SyncPayload { - // State-based (device-owned) - StateChange { - model_type: String, - record_uuid: Uuid, - data: serde_json::Value, - }, - StateBatch { - model_type: String, - records: Vec, - }, - - // Log-based (shared) - SharedChange(SharedChangeEntry), - SharedChangeBatch(Vec), - - // Requests - StateRequest(StateRequestParams), - SharedChangeRequest(SharedChangeRequestParams), - - // Responses - StateResponse(StateResponseData), - SharedChangeResponse(SharedChangeResponseData), - - // Control - AckSharedChanges { up_to_hlc: HLC }, - Heartbeat { - state_watermark: Option>, - shared_watermark: Option, - }, - Error { message: String }, -} - -impl SyncEnvelope { - pub fn new(library_id: Uuid, device_id: Uuid, payload: SyncPayload) -> Self { - Self { - version: 1, - library_id, - device_id, - timestamp: Utc::now(), - payload, - } - } -} -``` - -**Migration Strategy**: -1. [ ] Add `SyncEnvelope` alongside existing `SyncMessage` -2. [ ] Update serialization to wrap/unwrap envelope -3. [ ] Support both formats during transition (check version field) -4. [ ] Migrate all senders to use envelope -5. [ ] Remove old `SyncMessage` enum - -**Estimated Effort**: 6-8 hours - ---- - -### 2.3 Complete Model Apply Functions - -**Status**: 2/7 models implemented - -**Remaining Models**: - -#### Entry (`core/src/infra/db/entities/entry.rs`) -```rust -impl Model { - pub async fn apply_state_change( - data: serde_json::Value, - db: &DatabaseConnection, - ) -> Result<(), DbErr> { - let entry: Self = serde_json::from_value(data) - .map_err(|e| DbErr::Custom(format!("Deserialization failed: {}", e)))?; - - // Upsert entry - let active = ActiveModel { - uuid: Set(entry.uuid), - location_id: Set(entry.location_id), - device_id: Set(entry.device_id), - path: Set(entry.path), - // ... other fields - }; - - active.insert(db).await?; - Ok(()) - } -} -``` - -**Checklist**: -- [ ] Implement `Entry::apply_state_change()` -- [ ] Implement `Volume::apply_state_change()` -- [ ] Implement `Device::apply_state_change()` -- [ ] Implement `Collection::apply_shared_change()` (with conflict resolution) -- [ ] Implement `UserMetadata::apply_mixed()` (hybrid strategy) -- [ ] Register all models in `registry.rs` -- [ ] Add tests for each apply function - -**Estimated Effort**: 8-12 hours (2-3 hours per model) - ---- - -### 2.4 Write Integration Tests - -**New File**: `core/tests/sync_integration_test.rs` - -**Test Scenarios**: - -```rust -#[tokio::test] -async fn test_state_based_sync_flow() { - // Setup: Two devices with mock network - let device_a = create_test_device("A").await; - let device_b = create_test_device("B").await; - let network = Arc::new(MockNetworkTransport::new()); - - // 1. Device A creates a location - let location = create_location(device_a.id, "/test"); - device_a.sync.broadcast_state_change(location.into()).await.unwrap(); - - // 2. Verify message was sent - let messages = network.get_sent_messages(); - assert_eq!(messages.len(), 1); - - // 3. Device B receives and applies - let (target, msg) = &messages[0]; - assert_eq!(*target, device_b.id); - device_b.sync.on_state_change_received(msg.into()).await.unwrap(); - - // 4. Verify location exists on device B - let location_b = device_b.db.find_location(location.uuid).await.unwrap(); - assert_eq!(location_b.path, "/test"); -} - -#[tokio::test] -async fn test_log_based_sync_with_conflict() { - // Setup: Two devices editing same tag - let device_a = create_test_device("A").await; - let device_b = create_test_device("B").await; - - // 1. Both devices create same tag (deterministic UUID) - let tag_uuid = deterministic_system_tag_uuid("Important"); - - // Device A: name="Important", color="red" - device_a.create_tag(tag_uuid, "Important", "red").await; - - // Device B: name="Important", color="blue" (concurrent) - device_b.create_tag(tag_uuid, "Important", "blue").await; - - // 2. Exchange changes - let change_a = device_a.peer_log.get_latest().await.unwrap(); - let change_b = device_b.peer_log.get_latest().await.unwrap(); - - device_a.sync.on_shared_change_received(change_b).await.unwrap(); - device_b.sync.on_shared_change_received(change_a).await.unwrap(); - - // 3. Both devices should converge (HLC determines winner) - let tag_a = device_a.db.find_tag(tag_uuid).await.unwrap(); - let tag_b = device_b.db.find_tag(tag_uuid).await.unwrap(); - - assert_eq!(tag_a.color, tag_b.color); // Convergence! -} - -#[tokio::test] -async fn test_backfill_with_buffering() { - // Test that changes during backfill are buffered and applied later - todo!("Implement backfill test"); -} - -#[tokio::test] -async fn test_network_partition_recovery() { - // Test that devices sync correctly after network partition - todo!("Implement partition recovery test"); -} -``` - -**Checklist**: -- [ ] Set up test infrastructure (mock network, test databases) -- [ ] Write state-based sync test -- [ ] Write log-based sync test with conflicts -- [ ] Write backfill test -- [ ] Write partition recovery test -- [ ] Run tests in CI - -**Estimated Effort**: 12-16 hours - ---- - -## ️ Priority 3: Architectural Refactors (This Quarter) - -### 3.1 Eliminate Circular Dependency - -**Issue**: Library → SyncService → needs NetworkingService, NetworkingService → needs Library - -**Current Workaround**: `Arc` trait abstraction - -**Better Solution**: Extract shared context - -**New File**: `core/src/library/context.rs` - -```rust -/// Shared context for all library services -/// -/// Breaks circular dependencies by extracting shared state into a context object -/// that both SyncService and NetworkingService depend on. -pub struct LibraryContext { - /// Library ID - pub id: Uuid, - - /// This device's ID - pub device_id: Uuid, - - /// Database connection - pub db: Arc, - - /// Event bus for cross-cutting events - pub event_bus: Arc, - - /// Device registry (UUID NodeId mapping) - pub device_registry: Arc>, - - /// Library path (for sync.db location) - pub path: PathBuf, -} - -impl LibraryContext { - pub fn new(library: &Library, device_id: Uuid) -> Self { - Self { - id: library.id(), - device_id, - db: library.db().conn().clone(), - event_bus: library.event_bus().clone(), - device_registry: Arc::new(RwLock::new(DeviceRegistry::new())), - path: library.path().to_path_buf(), - } - } -} -``` - -**Refactored Services**: - -```rust -// SyncService no longer needs NetworkTransport trait -pub struct SyncService { - context: Arc, - network: Arc, // Direct reference - peer_sync: Arc, -} - -// NetworkingService uses context -pub struct NetworkingService { - context: Arc, - endpoint: Option, -} - -impl NetworkingService { - /// Send sync message using context's device registry - pub async fn send_sync_message( - &self, - target_device: Uuid, - message: SyncMessage, - ) -> Result<()> { - let node_id = self.context.device_registry - .read() - .await - .get_node_id_for_device(target_device)?; - - // Send via endpoint... - } -} -``` - -**Migration Steps**: -1. [ ] Create `LibraryContext` struct -2. [ ] Update `Library` to create and store context -3. [ ] Refactor `NetworkingService` to use context -4. [ ] Remove `NetworkTransport` trait -5. [ ] Update `SyncService` and `PeerSync` to use direct `NetworkingService` reference -6. [ ] Update all initialization code -7. [ ] Run full test suite - -**Estimated Effort**: 16-24 hours - ---- - -### 3.2 Simplify Registry Pattern - -**Issue**: Function pointer registry is complex and hard to debug - -**Current Implementation**: `StateApplyFn` and `SharedApplyFn` function pointer types - -**Better Implementation**: Trait-based with auto-registration - -```rust -// New trait -#[async_trait] -pub trait SyncableModel: Send + Sync + 'static { - const MODEL_TYPE: &'static str; - const TABLE_NAME: &'static str; - const IS_DEVICE_OWNED: bool; - - async fn apply_state_change( - data: serde_json::Value, - db: &DatabaseConnection, - ) -> Result<(), DbErr> { - Err(DbErr::Custom("Not a device-owned model".to_string())) - } - - async fn apply_shared_change( - entry: SharedChangeEntry, - db: &DatabaseConnection, - ) -> Result<(), DbErr> { - Err(DbErr::Custom("Not a shared model".to_string())) - } -} - -// Models implement the trait -impl SyncableModel for location::Model { - const MODEL_TYPE: &'static str = "location"; - const TABLE_NAME: &'static str = "locations"; - const IS_DEVICE_OWNED: bool = true; - - async fn apply_state_change( - data: serde_json::Value, - db: &DatabaseConnection, - ) -> Result<(), DbErr> { - // Implementation... - } -} - -// Auto-registration using inventory -pub struct RegisteredModel { - pub model_type: &'static str, - pub table_name: &'static str, - pub is_device_owned: bool, - pub applier: &'static dyn SyncableModelApplier, -} - -inventory::collect!(RegisteredModel); - -// Register models -inventory::submit! { - RegisteredModel { - model_type: "location", - table_name: "locations", - is_device_owned: true, - applier: &LocationApplier, - } -} -``` - -**Migration Steps**: -1. [ ] Define `SyncableModel` trait -2. [ ] Implement trait for existing models (location, tag) -3. [ ] Set up inventory-based registration -4. [ ] Add macro to simplify registration -5. [ ] Migrate apply functions to use new registry -6. [ ] Remove old function pointer registry -7. [ ] Update all call sites - -**Estimated Effort**: 12-16 hours - ---- - -### 3.3 Add Observability Infrastructure - -**Goal**: Production-ready monitoring and debugging - -**Components**: - -#### Metrics (`core/src/service/sync/metrics.rs`) -```rust -use prometheus::{Counter, Histogram, IntGauge}; - -pub struct SyncMetrics { - // Counters - pub state_changes_sent: Counter, - pub state_changes_received: Counter, - pub shared_changes_sent: Counter, - pub shared_changes_received: Counter, - pub broadcast_failures: Counter, - - // Gauges - pub connected_partners: IntGauge, - pub buffer_queue_size: IntGauge, - pub retry_queue_size: IntGauge, - - // Histograms - pub broadcast_duration: Histogram, - pub apply_duration: Histogram, -} - -impl SyncMetrics { - pub fn new() -> Self { - Self { - state_changes_sent: Counter::new("sync_state_changes_sent", "State changes sent") - .unwrap(), - // ... register all metrics - } - } - - pub fn record_broadcast(&self, duration: Duration, success: bool) { - self.broadcast_duration.observe(duration.as_secs_f64()); - if !success { - self.broadcast_failures.inc(); - } - } -} -``` - -#### Tracing Spans -```rust -use tracing::instrument; - -#[instrument(skip(self), fields(library_id = %self.library_id, model_type = %change.model_type))] -pub async fn broadcast_state_change(&self, change: StateChangeMessage) -> Result<()> { - let span = tracing::info_span!("broadcast", partner_count = connected_partners.len()); - let _enter = span.enter(); - - // Broadcast logic... -} -``` - -**Checklist**: -- [ ] Add `prometheus` dependency -- [ ] Create `SyncMetrics` struct -- [ ] Instrument all critical paths -- [ ] Add tracing spans with context -- [ ] Create Grafana dashboard -- [ ] Add health check endpoint - -**Estimated Effort**: 8-12 hours - ---- - -## Current Status Matrix - -| Component | Status | Priority | Effort | Owner | -|-----------|--------|----------|--------|-------| -| **Core Infrastructure** | -| HLC | Complete | - | - | - | -| PeerLog | Complete | - | - | - | -| NetworkTransport trait | Complete | - | - | - | -| TransactionManager | Stubbed | P1 | 4h | TBD | -| **Network Layer** | -| NetworkTransport impl | Complete | - | - | - | -| SyncProtocolHandler | Stubbed | P1 (CRITICAL) | 6h | TBD | -| Message types | Complete | - | - | - | -| Envelope pattern | Not started | P2 | 8h | TBD | -| **Sync Service** | -| PeerSync | Complete | - | - | - | -| Broadcast (sequential) | Works | P1 | 2h | TBD | -| Broadcast (parallel) | Not started | P1 | 2h | TBD | -| Retry queue | Not started | P2 | 6h | TBD | -| **Models** | -| Location | Complete | - | - | - | -| Tag | Complete | - | - | - | -| Entry | Not started | P2 | 2h | TBD | -| Volume | Not started | P2 | 2h | TBD | -| Device | Not started | P2 | 2h | TBD | -| Collection | Not started | P2 | 3h | TBD | -| UserMetadata | Not started | P2 | 3h | TBD | -| **Testing** | -| Unit tests | Partial | P2 | 8h | TBD | -| Integration tests | Not started | P2 | 16h | TBD | -| Performance tests | Not started | P3 | 12h | TBD | -| **Architecture** | -| Circular dependency | Workaround | P3 | 24h | TBD | -| Registry pattern | Functional | P3 | 16h | TBD | -| Observability | Not started | P3 | 12h | TBD | - -**Legend**: -- Complete -- In progress / Partial -- Not started -- P1 = Priority 1 (This week) -- P2 = Priority 2 (This month) -- P3 = Priority 3 (This quarter) - ---- - -## Known Issues - -### Critical -1. **SyncProtocolHandler is stubbed** - Incoming messages are not processed -2. **TransactionManager doesn't auto-broadcast** - Changes don't trigger sync -3. **Sequential broadcasts** - Slow for many peers -4. **Silent error handling** - `.unwrap_or_default()` hides network issues - -### Major -5. **No retry mechanism** - Failed broadcasts are lost -6. **No conflict resolution UI** - Users can't resolve conflicts manually -7. **Only 2/7 models have apply functions** - Most models can't sync -8. **No integration tests** - Untested end-to-end flows - -### Minor -9. **Circular dependency workaround** - Adds unnecessary complexity -10. **Function pointer registry** - Hard to debug -11. **No observability** - Can't monitor sync health in production -12. **Clone overuse** - Potential performance issues - ---- - -## Architecture Decisions - -### ADR-001: Hybrid Sync Model -**Decision**: Use state-based sync for device-owned data, log-based with HLC for shared resources - -**Rationale**: -- State-based is simpler and more efficient for data that can't conflict -- Log-based provides proper conflict resolution for shared data -- Hybrid approach gives us best of both worlds - -**Status**: Implemented - -### ADR-002: NetworkTransport Trait -**Decision**: Use trait abstraction to break circular dependency - -**Rationale**: -- Library → Sync → Network circular dependency -- Trait allows dependency injection -- Enables testing with mocks - -**Status**: Implemented (may be refactored in P3) - -### ADR-003: Leaderless Architecture -**Decision**: All devices are peers, no leader election - -**Rationale**: -- Simpler than leader-based approach -- More resilient (no single point of failure) -- Better for offline-first usage - -**Status**: Implemented - -### ADR-004: Per-Device Sync.db -**Decision**: Each device has its own sync.db for shared changes - -**Rationale**: -- Allows independent pruning -- Clear ownership of log entries -- Simplifies ACK tracking - -**Status**: Implemented - ---- - -## Learning Resources - -### Distributed Sync Papers -- [Hybrid Logical Clocks](https://cse.buffalo.edu/tech-reports/2014-04.pdf) -- [Conflict-free Replicated Data Types (CRDTs)](https://hal.inria.fr/inria-00555588/document) -- [Operational Transformation](http://www.codecommit.com/blog/java/understanding-and-applying-operational-transformation) - -### Related Projects -- [Automerge](https://github.com/automerge/automerge) - CRDT library -- [Yrs](https://github.com/y-crdt/y-crdt) - Rust CRDT implementation -- [Syncthing](https://github.com/syncthing/syncthing) - File sync reference - -### Spacedrive Documentation -- [Daemon Architecture](/Users/jamespine/Projects/spacedrive/docs/core/daemon.md) -- [AGENTS.md](/Users/jamespine/Projects/spacedrive/core/AGENTS.md) -- [Sync Design Docs](/Users/jamespine/Projects/spacedrive/docs/core/) - ---- - -## Questions / Discussion Points - -1. **Should we support protocol versioning from day one?** - - Pros: Future-proof, easier upgrades - - Cons: More complexity upfront - - **Recommendation**: Yes, add envelope pattern now (P2) - -2. **How should we handle conflicts in the UI?** - - Option A: Always auto-resolve using HLC - - Option B: Present conflicts to user for manual resolution - - **Recommendation**: A for MVP, B for future - -3. **Should we compress messages?** - - Large batches (1000+ entries) could benefit from zstd - - Adds complexity and CPU overhead - - **Recommendation**: Not for MVP, revisit in P3 - -4. **Should we encrypt sync messages?** - - End-to-end encryption for privacy - - Per-library keys - - **Recommendation**: Not for MVP, but design with encryption in mind - ---- - -## Timeline - -### Week 1 (Current) -- [ ] Clean up legacy files -- [ ] Implement SyncProtocolHandler -- [ ] Fix broadcast error handling -- [ ] Fix silent error handling - -**Goal**: Basic end-to-end sync working - -### Week 2-4 -- [ ] Implement retry queue -- [ ] Complete all model apply functions -- [ ] Write integration tests -- [ ] Add message envelope pattern - -**Goal**: Production-ready sync - -### Month 2-3 -- [ ] Refactor circular dependency -- [ ] Simplify registry pattern -- [ ] Add observability -- [ ] Performance testing - -**Goal**: Clean, maintainable architecture - ---- - -## Success Metrics - -### MVP (End of Week 4) -- [ ] All message types handled correctly -- [ ] 7/7 models can sync -- [ ] Integration tests pass -- [ ] No data loss in normal operation -- [ ] Basic error handling and logging - -### Production-Ready (End of Month 3) -- [ ] Zero data corruption issues -- [ ] < 1% message loss rate (with retry) -- [ ] < 100ms broadcast latency (10 peers) -- [ ] < 5s sync time for 1000 changes -- [ ] Comprehensive test coverage (>70%) -- [ ] Monitoring dashboards - ---- - -## Notes - -- Keep this document updated as work progresses -- Link to relevant PRs and commits -- Document any architectural changes or decisions -- Add new issues as they're discovered - -**Last Updated**: October 9, 2025 - diff --git a/core/src/infra/sync/mod.rs b/core/src/infra/sync/mod.rs index 2c2bc59a7..fbcb6346c 100644 --- a/core/src/infra/sync/mod.rs +++ b/core/src/infra/sync/mod.rs @@ -6,8 +6,6 @@ //! - Syncable trait for model registration //! - Transaction manager for atomic commits //! -//! Legacy files (leader-based, will be removed): -//! - legacy_sync_log_* (deprecated) pub mod dependency_graph; pub mod deterministic; diff --git a/core/src/infra/sync/transaction.rs b/core/src/infra/sync/transaction.rs index 3a7f9a0e2..3b1eec16d 100644 --- a/core/src/infra/sync/transaction.rs +++ b/core/src/infra/sync/transaction.rs @@ -2,17 +2,6 @@ //! //! The TransactionManager ensures that all state-changing writes to sync-enabled //! models are atomic, logged, and emit appropriate events. -//! -//! ## Leaderless Architecture (NEW) -//! -//! In the new leaderless model, this will be simplified to: -//! - Device-owned data: Just emit events (state-based sync) -//! - Shared resources: Use HLC + PeerLog (log-based sync) -//! -//! ## Current Status -//! -//! This file is in transition. The old sync log methods are stubbed out -//! and will be replaced with HLC-based methods. use super::Syncable; use crate::infra::event::{Event, EventBus}; diff --git a/core/src/lib.rs b/core/src/lib.rs index ed0a3b201..1b1f470bf 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,9 +1,8 @@ #![allow(warnings)] //! Spacedrive Core v2 //! -//! A complete reimplementation of Spacedrive's core with modern Rust patterns, unified file operations, and a foundation built for the Virtual Distributed File System vision. +//! A Virtual Distributed File System (VDFS) implementation in Rust. -// Module declarations pub mod client; pub mod common; pub mod config; @@ -20,12 +19,6 @@ pub mod service; pub mod testing; pub mod volume; -// Compatibility module for legacy networking references -pub mod networking { - pub use crate::service::network::*; -} - -// Internal crate imports use crate::{ config::AppConfig, context::CoreContext, @@ -45,7 +38,6 @@ use crate::{ volume::{VolumeDetectionConfig, VolumeManager}, }; -// External crate imports use std::{path::PathBuf, sync::Arc}; use tokio::sync::{mpsc, RwLock}; use tracing::{error, info, warn}; @@ -634,3 +626,8 @@ fn setup_log_event_emitter(event_bus: Arc) { let _ = tracing_subscriber::registry().with(log_layer).try_init(); }); } + +// Compatibility module for legacy networking references +pub mod networking { + pub use crate::service::network::*; +}