Sync API Implementation Plan

Status: Ready for Implementation
Created: 2025-10-09
Target: Spacedrive Core v2 - Leaderless Sync


Executive Summary

The sync infrastructure is complete and all tests pass. However, the current API for emitting sync events is verbose (9 lines of boilerplate per sync call). This plan introduces a clean, ergonomic API that reduces sync calls to 1 line while maintaining full functionality.

Current API (9 lines):

let sync_service = library.sync_service().ok_or(...)?;
let peer_log = sync_service.peer_sync().peer_log();
let mut hlc_gen = sync_service.peer_sync().hlc_generator().lock().await;

library.transaction_manager()
    .commit_shared(library.id(), "tag", result.uuid, ChangeType::Insert,
                   serde_json::to_value(&result)?, peer_log, &mut *hlc_gen)
    .await?;

Proposed API (1 line):

library.sync_model(&result, ChangeType::Insert).await?;

Goals

  1. Simplicity: Reduce sync calls from 9 lines to 1 line
  2. Type Safety: Leverage Syncable trait for automatic dispatch
  3. Performance: Support batch operations for bulk indexing (10K+ entries)
  4. Consistency: Single API for all models (tags, locations, entries, etc.)
  5. Maintainability: Centralize sync logic, making it easy to evolve

Architecture

Core Principle

Add extension methods to Library that handle:

  • Dependency fetching (sync service, peer log, HLC generator)
  • FK conversion (integer ID → UUID mapping)
  • Sync strategy selection (device-owned vs shared)
  • Batch optimization for bulk operations
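
All of these dispatch through the existing Syncable trait (core/src/infra/sync/syncable.rs). For orientation, here is a plausible shape of that trait as this plan uses it — ForeignKeyMapping is a stand-in name, and the authoritative definition lives in the source file:

pub trait Syncable: serde::Serialize {
    /// Stable model-type identifier, e.g. "tag" or "location"
    const SYNC_MODEL: &'static str;

    /// UUID identifying this record across devices
    fn sync_id(&self) -> uuid::Uuid;

    /// True for device-owned models (locations, entries);
    /// false for shared models (tags, albums)
    fn is_device_owned(&self) -> bool;

    /// Serialize the record for the sync log
    fn to_sync_json(&self) -> anyhow::Result<serde_json::Value>;

    /// FK fields needing integer ID → UUID conversion
    fn foreign_key_mappings() -> Vec<ForeignKeyMapping>;
}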

Three-Tier API

User-Facing Methods (in Library)
├─ sync_model() ────────────────► Simple models (no FKs)
├─ sync_model_with_db() ────────► Models with FK relationships
└─ sync_models_batch() ─────────► Bulk operations (1000+ records)

Internal Helpers (private)
├─ sync_device_owned_internal()
├─ sync_shared_internal()
├─ sync_device_owned_batch_internal()
└─ sync_shared_batch_internal()

Foundation (existing)
└─ TransactionManager.commit_*()

API Design

Method 1: sync_model() - Simple Case

Use when: Model has no FK relationships or FKs are already UUIDs

pub async fn sync_model<M: Syncable>(
    &self,
    model: &M,
    change_type: ChangeType,
) -> Result<()>

Examples:

  • Tags (shared, no FKs)
  • Devices (device-owned, self-referential)
  • Albums (shared, no direct FKs)

Usage:

// Create tag
let tag = tag::ActiveModel { ... }.insert(db).await?;
library.sync_model(&tag, ChangeType::Insert).await?;

Method 2: sync_model_with_db() - FK Conversion

Use when: Model has FK relationships that need UUID conversion

pub async fn sync_model_with_db<M: Syncable>(
    &self,
    model: &M,
    change_type: ChangeType,
    db: &DatabaseConnection,
) -> Result<()>

Examples:

  • Locations (device-owned, has device_id + entry_id FKs)
  • Entries (device-owned, has parent_id, metadata_id, content_id FKs)
  • User Metadata (mixed, has various FKs)

Usage:

// Create location
let location = location::ActiveModel { ... }.insert(db).await?;
library.sync_model_with_db(&location, ChangeType::Insert, db).await?;

// Create entry
let entry = entry::ActiveModel { ... }.insert(db).await?;
library.sync_model_with_db(&entry, ChangeType::Insert, db).await?;

Under the Hood:

  1. Serializes model to JSON
  2. Iterates through M::foreign_key_mappings()
  3. Converts each FK integer ID to UUID via database lookup
  4. Emits sync event with UUID-based data

Method 3: sync_models_batch() - Bulk Operations

Use when: Syncing 100+ records at once (indexing, imports, migrations)

pub async fn sync_models_batch<M: Syncable>(
    &self,
    models: &[M],
    change_type: ChangeType,
    db: &DatabaseConnection,
) -> Result<()>

Examples:

  • Indexing 10,000 files in a location
  • Importing photo library (5,000 images)
  • Bulk tag application (1,000 entries)

Usage:

// Indexing job - process in batches of 1000
let mut batch = Vec::new();

for file in discovered_files {
    let entry = entry::ActiveModel { ... }.insert(db).await?;
    batch.push(entry);

    if batch.len() >= 1000 {
        library.sync_models_batch(&batch, ChangeType::Insert, db).await?;
        batch.clear();
    }
}

// Sync remaining
if !batch.is_empty() {
    library.sync_models_batch(&batch, ChangeType::Insert, db).await?;
}

Performance:

  • Without batching: 10,000 individual network messages (~10 minutes)
  • With batching: 10 batched messages (~5 seconds)
  • ~120x speedup for bulk operations

Implementation Details

File Structure

core/src/library/
├── mod.rs ───────────────► Add public API methods
└── sync_helpers.rs ──────► Internal implementation (NEW FILE)

core/src/infra/sync/
└── transaction.rs ───────► Add batch event emission support

Code Changes

1. Create core/src/library/sync_helpers.rs

//! Sync helper methods for Library
//!
//! Provides ergonomic API for emitting sync events after database writes.

use crate::{
    infra::{
        db::DatabaseConnection,
        sync::{ChangeType, Syncable},
    },
    library::Library,
};
use crate::infra::event::Event; // assumed path; adjust to wherever the event bus `Event` type lives
use anyhow::Result;
use uuid::Uuid;

impl Library {
    /// Sync a model without FK conversion
    pub async fn sync_model<M: Syncable>(
        &self,
        model: &M,
        change_type: ChangeType,
    ) -> Result<()> {
        let data = model.to_sync_json()?;

        if model.is_device_owned() {
            self.sync_device_owned_internal(M::SYNC_MODEL, model.sync_id(), data).await
        } else {
            self.sync_shared_internal(M::SYNC_MODEL, model.sync_id(), change_type, data).await
        }
    }

    /// Sync a model with FK conversion
    pub async fn sync_model_with_db<M: Syncable>(
        &self,
        model: &M,
        change_type: ChangeType,
        db: &DatabaseConnection,
    ) -> Result<()> {
        let mut data = model.to_sync_json()?;

        // Convert FK integer IDs to UUIDs
        for fk in M::foreign_key_mappings() {
            crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut data, &fk, db)
                .await
                .map_err(|e| anyhow::anyhow!("FK conversion failed: {}", e))?;
        }

        if model.is_device_owned() {
            self.sync_device_owned_internal(M::SYNC_MODEL, model.sync_id(), data).await
        } else {
            self.sync_shared_internal(M::SYNC_MODEL, model.sync_id(), change_type, data).await
        }
    }

    /// Batch sync multiple models
    pub async fn sync_models_batch<M: Syncable>(
        &self,
        models: &[M],
        change_type: ChangeType,
        db: &DatabaseConnection,
    ) -> Result<()> {
        if models.is_empty() {
            return Ok(());
        }

        // Convert all models to sync JSON with FK mapping
        let mut sync_data = Vec::new();
        for model in models {
            let mut data = model.to_sync_json()?;

            for fk in M::foreign_key_mappings() {
                crate::infra::sync::fk_mapper::convert_fk_to_uuid(&mut data, &fk, db)
                    .await
                    .map_err(|e| anyhow::anyhow!("FK conversion failed: {}", e))?;
            }

            sync_data.push((model.sync_id(), data));
        }

        // Assumes a homogeneous batch: every model of type M uses the
        // same ownership strategy, so the first record decides for all.
        let is_device_owned = models[0].is_device_owned();

        if is_device_owned {
            self.sync_device_owned_batch_internal(M::SYNC_MODEL, sync_data).await
        } else {
            self.sync_shared_batch_internal(M::SYNC_MODEL, change_type, sync_data).await
        }
    }

    // ============ Internal Helpers ============

    async fn sync_device_owned_internal(
        &self,
        model_type: &str,
        record_uuid: Uuid,
        data: serde_json::Value,
    ) -> Result<()> {
        let device_id = self.device_id()?;
        self.transaction_manager()
            .commit_device_owned(self.id(), model_type, record_uuid, device_id, data)
            .await
    }

    async fn sync_shared_internal(
        &self,
        model_type: &str,
        record_uuid: Uuid,
        change_type: ChangeType,
        data: serde_json::Value,
    ) -> Result<()> {
        let sync_service = self.sync_service()
            .ok_or_else(|| anyhow::anyhow!("Sync service not initialized"))?;

        let peer_log = sync_service.peer_sync().peer_log();
        let mut hlc_gen = sync_service.peer_sync().hlc_generator().lock().await;

        self.transaction_manager()
            .commit_shared(
                self.id(),
                model_type,
                record_uuid,
                change_type,
                data,
                peer_log,
                &mut *hlc_gen,
            )
            .await
    }

    async fn sync_device_owned_batch_internal(
        &self,
        model_type: &str,
        records: Vec<(Uuid, serde_json::Value)>,
    ) -> Result<()> {
        let device_id = self.device_id()?;

        self.transaction_manager().event_bus().emit(Event::Custom {
            event_type: "sync:state_change_batch".to_string(),
            data: serde_json::json!({
                "library_id": self.id(),
                "model_type": model_type,
                "device_id": device_id,
                "records": records,
                "timestamp": chrono::Utc::now(),
            }),
        });

        Ok(())
    }

    async fn sync_shared_batch_internal(
        &self,
        model_type: &str,
        change_type: ChangeType,
        records: Vec<(Uuid, serde_json::Value)>,
    ) -> Result<()> {
        let sync_service = self.sync_service()
            .ok_or_else(|| anyhow::anyhow!("Sync service not initialized"))?;

        let peer_log = sync_service.peer_sync().peer_log();
        let mut hlc_gen = sync_service.peer_sync().hlc_generator().lock().await;

        // Shared changes keep per-record HLC timestamps, so entries are
        // appended individually; the batch amortizes the generator lock.
        for (record_uuid, data) in records {
            let hlc = hlc_gen.next();

            let entry = crate::infra::sync::SharedChangeEntry {
                hlc,
                model_type: model_type.to_string(),
                record_uuid,
                change_type,
                data,
            };

            peer_log.append(entry.clone()).await?;

            self.transaction_manager().event_bus().emit(Event::Custom {
                event_type: "sync:shared_change".to_string(),
                data: serde_json::json!({
                    "library_id": self.id(),
                    "entry": entry,
                }),
            });
        }

        Ok(())
    }
}

2. Update core/src/library/mod.rs

// Add module declaration
mod sync_helpers;

// Existing code remains unchanged

3. Update core/src/service/sync/peer.rs

Add handler for batch events:

// Handle batch state changes
Event::Custom { event_type, data } if event_type == "sync:state_change_batch" => {
    let library_id: Uuid = serde_json::from_value(data["library_id"].clone())?;
    let model_type: String = serde_json::from_value(data["model_type"].clone())?;
    let device_id: Uuid = serde_json::from_value(data["device_id"].clone())?;
    let records: Vec<(Uuid, Value)> = serde_json::from_value(data["records"].clone())?;

    // Broadcast batch to all peers
    self.broadcast_state_batch(library_id, model_type, device_id, records).await?;
}
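
broadcast_state_batch is referenced above but not specified in this plan. A minimal sketch of what it might look like, assuming a hypothetical SyncMessage::StateBatch wire variant and connected_peers()/send_message() transport helpers (none of which are confirmed here; the real wire format must match the existing peer protocol):

async fn broadcast_state_batch(
    &self,
    library_id: Uuid,
    model_type: String,
    device_id: Uuid,
    records: Vec<(Uuid, serde_json::Value)>,
) -> Result<()> {
    // Hypothetical message variant carrying the whole batch at once
    let msg = SyncMessage::StateBatch { library_id, model_type, device_id, records };

    // Fan the batch out to every connected peer; per-peer failures are
    // logged rather than aborting the whole broadcast.
    for peer in self.connected_peers().await {
        if let Err(e) = self.send_message(&peer, &msg).await {
            tracing::warn!(?peer, error = %e, "failed to send state batch");
        }
    }

    Ok(())
}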

Rollout Plan

Phase 1: Infrastructure (Week 1)

Goal: Implement clean API without breaking existing code

Tasks:

  1. Create sync_helpers.rs with three public methods
  2. Add batch event handling to PeerSyncService
  3. Update integration tests to use new API
  4. Run full test suite - verify no regressions

Success Criteria: All existing tests pass with new API available


Phase 2: Proof of Concept (Week 1)

Goal: Wire up 2-3 managers to validate API

Tasks:

  1. Wire TagManager.create_tag() - Uses sync_model()
  2. Wire LocationManager.add_location() - Uses sync_model_with_db()
  3. Test end-to-end sync between two real Core instances

Success Criteria:

  • Create tag on Device A → appears on Device B
  • Add location on Device A → visible on Device B

Phase 3: Indexing Integration (Week 2)

Goal: Wire up bulk entry creation with batching

Tasks:

  1. Update EntryPersistence to use sync_models_batch()
  2. Add batch size configuration (default: 1000; see the sketch below)
  3. Test indexing 10K+ files with sync enabled
  4. Measure performance (should be <5 seconds for 10K entries)
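
For task 2, a minimal configuration sketch — the constant name and where it lives are assumptions:

/// Default records per sync batch: large enough to amortize
/// per-message overhead, small enough to keep messages modest.
pub const DEFAULT_SYNC_BATCH_SIZE: usize = 1000;

// Callers can then chunk bulk work without hand-rolling counters:
for chunk in entries.chunks(DEFAULT_SYNC_BATCH_SIZE) {
    library.sync_models_batch(chunk, ChangeType::Insert, db).await?;
}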

Success Criteria:

  • Index 10,000 files on Device A
  • All entry metadata syncs to Device B
  • Sync overhead < 20% of indexing time

Phase 4: Complete Migration (Week 3)

Goal: Wire all remaining managers

Managers to Update:

  • LocationManager (add, update, remove)
  • TagManager (create, update, delete)
  • AlbumManager (when implemented)
  • UserMetadataManager (when implemented)
  • EntryProcessor (all entry operations)

Success Criteria: All database writes emit sync events


Phase 5: CLI Setup Flow (Week 4)

Goal: Enable device pairing and sync setup via CLI

Tasks:

  1. Verify network pair command works
  2. Verify network sync-setup registers devices correctly
  3. Test full flow: pair → setup → sync data
  4. Document CLI workflow in user docs

Success Criteria: Users can pair devices and sync via CLI


Usage Guidelines

Decision Tree: Which Method to Use?

Starting Point: You just inserted/updated a model in the database
│
├─ Does your model have FK fields?
│  │
│  ├─ NO ──────────────────────► Use sync_model(model, change_type)
│  │                               Examples: Tag, Device, Album
│  │
│  └─ YES
│     │
│     ├─ Single operation? ─────► Use sync_model_with_db(model, change_type, db)
│     │                           Examples: Add one location, update one entry
│     │
│     └─ Bulk operation (>100)?─► Use sync_models_batch(vec, change_type, db)
│                                 Examples: Indexing, imports, migrations
│
└─ Result: Sync event emitted, data replicates to peers

Code Examples by Model

Tags (Shared, No FKs)

let tag = tag::ActiveModel { ... }.insert(db).await?;
library.sync_model(&tag, ChangeType::Insert).await?;

Locations (Device-Owned, Has FKs)

let location = location::ActiveModel { ... }.insert(db).await?;
library.sync_model_with_db(&location, ChangeType::Insert, db).await?;

Entries (Device-Owned, Has FKs, Bulk)

let mut batch = Vec::new();
for file in files {
    let entry = entry::ActiveModel { ... }.insert(db).await?;
    batch.push(entry);

    if batch.len() >= 1000 {
        library.sync_models_batch(&batch, ChangeType::Insert, db).await?;
        batch.clear();
    }
}
if !batch.is_empty() {
    library.sync_models_batch(&batch, ChangeType::Insert, db).await?;
}

Performance Expectations

Single Operations

| Operation | Method | Overhead | Total Time |
|---|---|---|---|
| Create tag | sync_model() | ~50ms | ~70ms |
| Add location | sync_model_with_db() | ~60ms | ~100ms |
| Update entry | sync_model_with_db() | ~60ms | ~80ms |

Bulk Operations

| Scale | Without Batching | With Batching | Speedup |
|---|---|---|---|
| 100 entries | ~6 seconds | ~200ms | 30x |
| 1,000 entries | ~60 seconds | ~500ms | 120x |
| 10,000 entries | ~10 minutes | ~5 seconds | 120x |

Note: Batching is critical for acceptable performance during indexing.


Testing Strategy

Unit Tests

#[tokio::test]
async fn test_sync_model_tag() {
    let library = setup_test_library().await;
    let tag = create_test_tag().await;

    library.sync_model(&tag, ChangeType::Insert).await.unwrap();

    // Verify event emitted
    assert_event_emitted("sync:shared_change");
}

#[tokio::test]
async fn test_sync_model_with_db_location() {
    let library = setup_test_library().await;
    let db = library.db().conn();
    let location = create_test_location(db).await;

    library.sync_model_with_db(&location, ChangeType::Insert, db).await.unwrap();

    // Verify FK conversion happened
    let event_data = get_last_event_data();
    assert!(event_data["device_uuid"].is_string()); // Not device_id integer!
}

#[tokio::test]
async fn test_sync_models_batch() {
    let library = setup_test_library().await;
    let db = library.db().conn();
    let entries = create_test_entries(1000, db).await;

    let start = Instant::now();
    library.sync_models_batch(&entries, ChangeType::Insert, db).await.unwrap();
    let elapsed = start.elapsed();

    // Should be fast (< 1 second for 1000 entries)
    assert!(elapsed < Duration::from_secs(1));
}

Integration Tests

Update existing tests in core/tests/sync_integration_test.rs:

// BEFORE (verbose)
let sync_service = setup.library_a.sync_service().unwrap();
let peer_sync = sync_service.peer_sync();
let peer_log = peer_sync.peer_log();
let mut hlc_gen = peer_sync.hlc_generator().lock().await;
setup.library_a.transaction_manager().commit_shared(...).await?;

// AFTER (clean)
setup.library_a.sync_model(&tag, ChangeType::Insert).await?;

Migration Path

For Existing Code

If any code already uses commit_device_owned() or commit_shared() directly:

  1. Don't break it - old API continues to work
  2. Gradually migrate - update as you touch files
  3. Eventual deprecation - mark the old API as #[deprecated] per the timeline below

Deprecation Timeline

  • Month 1-3: New API available, old API works
  • Month 4-6: Add #[deprecated] warnings to old API
  • Month 7+: Remove old low-level API, force migration
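
When month 4 arrives, the standard Rust #[deprecated] attribute carries the warning. A minimal fragment, applied to the existing TransactionManager methods (the since version is a placeholder):

// On TransactionManager, e.g.:
#[deprecated(
    since = "2.0.0", // placeholder; use the actual release version
    note = "use Library::sync_model / sync_model_with_db / sync_models_batch instead"
)]
pub async fn commit_shared( /* existing signature and body unchanged */ )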

Success Metrics

Code Quality

  • Sync calls reduced from 9 lines → 1 line
  • Zero breaking changes to existing tests
  • Consistent API across all models

Performance

  • Single operations: <100ms overhead
  • Bulk operations: 30-120x speedup with batching
  • Indexing 10K files: <5 seconds sync overhead

Developer Experience

  • New contributors can add sync in <5 minutes
  • API is self-documenting (method names explain intent)
  • IDE autocomplete suggests correct method

Risks and Mitigations

Risk 1: Breaking Changes

Likelihood: Low
Impact: High
Mitigation: Keep the old API working; add the new API alongside it.

Risk 2: Performance Regression

Likelihood: Medium (FK conversion on the hot path)
Impact: Medium
Mitigation:

  • Cache FK lookups where possible
  • Use batch API for bulk operations
  • Profile before/after with 10K entry test

Risk 3: Incorrect Sync Strategy Selection

Likelihood: Low (strategy is determined by the Syncable trait)
Impact: High (data corruption)
Mitigation:

  • Add debug logging for strategy selection (see the sketch below)
  • Integration tests cover all model types
  • Fail-safe: default to the safer shared-resource sync
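
For the first mitigation, a single logging call inside sync_model (and its siblings) makes the chosen strategy visible — this assumes the tracing crate is available:

// Inside sync_model, before dispatching:
let strategy = if model.is_device_owned() { "device_owned" } else { "shared" };
tracing::debug!(
    model = M::SYNC_MODEL,
    record = %model.sync_id(),
    strategy,
    "selected sync strategy"
);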

Open Questions

  1. Batch Size Configuration: Should batch size be configurable per operation or global?
     • Proposal: Global default (1000), override via parameter if needed
  2. FK Conversion Caching: Should we cache device_id → device_uuid lookups?
     • Proposal: Yes, cache in Library with a 60-second TTL (see the sketch below)
  3. Error Handling: If FK conversion fails, should we skip the record or fail the entire batch?
     • Proposal: Skip the record with a warning log and continue the batch
  4. Sync During Indexing: Should the initial scan sync incrementally or wait until complete?
     • Proposal: Incremental batches (shows progress, allows resume)
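
For question 2, a minimal sketch of the proposed cache, assuming it would sit behind a Mutex on Library — FkUuidCache and its fields are hypothetical names:

use std::collections::HashMap;
use std::time::{Duration, Instant};
use uuid::Uuid;

/// Minimal TTL cache for device_id → device_uuid lookups.
pub struct FkUuidCache {
    ttl: Duration,
    entries: HashMap<i64, (Uuid, Instant)>,
}

impl FkUuidCache {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Returns a cached UUID if it was inserted within the TTL window.
    pub fn get(&self, id: i64) -> Option<Uuid> {
        self.entries
            .get(&id)
            .filter(|(_, at)| at.elapsed() < self.ttl)
            .map(|(uuid, _)| *uuid)
    }

    pub fn insert(&mut self, id: i64, uuid: Uuid) {
        self.entries.insert(id, (uuid, Instant::now()));
    }
}

convert_fk_to_uuid would consult the cache before hitting the database and populate it on a miss; expired entries are simply overwritten on the next insert.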

References

  • Sync Architecture: docs/core/sync.md
  • Implementation Guide: core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md
  • Syncable Trait: core/src/infra/sync/syncable.rs
  • FK Mapper: core/src/infra/sync/fk_mapper.rs
  • Integration Tests: core/tests/sync_integration_test.rs

Approval

  • Architecture Review - @jamespine
  • Performance Review - TBD
  • Security Review - TBD (ensure FK conversion doesn't leak data)

Changelog

  • 2025-10-09: Initial plan created
  • TBD: Implementation started
  • TBD: Rollout complete