a truck load of docs

Jamie Pine
2025-10-07 20:31:12 -07:00
parent 4494c2d070
commit 03cb298683
38 changed files with 11635 additions and 169 deletions

@@ -0,0 +1,57 @@
---
id: CORE-011
title: Unified Resource Event System
status: To Do
assignee: unassigned
priority: High
tags: [core, events, architecture, refactor]
---
## Description
Refactor the event system from 40+ specialized event variants to a unified, generic resource event architecture. This eliminates boilerplate and lets the system grow across resource types: adding a new resource requires no changes to event-handling code.
## Current Problem
- 40+ event variants in `core/src/infra/event/mod.rs`
- Manual event emission scattered across codebase (easy to forget)
- Adding new resource = new event variant + client code changes
- No type safety between events and resources
## Solution
- Generic `ResourceChanged`, `ResourceBatchChanged`, `ResourceDeleted` events
- TransactionManager emits automatically (no manual emission)
- Client type registries handle deserialization generically
- Infrastructure events remain specific (CoreStarted, Job, etc.)
## Implementation Steps
1. Define new `Event` struct with `EventEnvelope` and `EventKind`
2. Add `ResourceChanged` and related variants to `EventKind`
3. Update TransactionManager to emit resource events automatically
4. Keep infrastructure events as specific variants
5. Mark old event variants as `#[deprecated]`
6. Migrate Albums/Tags/Locations to new events (parallel systems)
7. Remove old variants after full migration
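
As a rough illustration of steps 1 to 4, the unified event could look like this from a TypeScript client's point of view. The envelope fields come from the acceptance criteria below; the `CoreEvent` name, the payload field names (`resource_type`, `resource`, `resource_id`) and the `describeEvent` helper are assumptions made for this sketch, not the final wire format.

```typescript
// Hypothetical client-side view of the unified event. Envelope fields follow
// the acceptance criteria; payload field names are assumed for illustration.
interface EventEnvelope {
  id: string;
  timestamp: string;
  library_id: string | null; // assumed null for events not tied to a library
  sequence: number | null;
}

type EventKind =
  // Generic resource events: one set of variants for every resource type
  | { type: 'ResourceChanged'; resource_type: string; resource: unknown }
  | { type: 'ResourceBatchChanged'; resource_type: string; resources: unknown[] }
  | { type: 'ResourceDeleted'; resource_type: string; resource_id: string }
  // Infrastructure events stay specific
  | { type: 'CoreStarted' };

interface CoreEvent {
  envelope: EventEnvelope;
  kind: EventKind;
}

// One handler covers every resource type; a new resource needs no new branch.
function describeEvent(event: CoreEvent): string {
  const kind = event.kind;
  switch (kind.type) {
    case 'ResourceChanged':
      return `${kind.resource_type} changed`;
    case 'ResourceBatchChanged':
      return `${kind.resources.length} ${kind.resource_type} resources changed`;
    case 'ResourceDeleted':
      return `${kind.resource_type} ${kind.resource_id} deleted`;
    case 'CoreStarted':
      return 'core started';
  }
}
```

The point of the discriminated union is that `describeEvent` never mentions albums, tags, or locations by name, which is what makes adding a resource free on the event-handling side.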
## Acceptance Criteria
- [ ] New event structure defined
- [ ] EventEnvelope includes id, timestamp, library_id, sequence
- [ ] ResourceChanged auto-emitted by TransactionManager
- [ ] Infrastructure events (Job, CoreStarted) preserved
- [ ] No breaking changes (parallel systems initially)
- [ ] Documentation updated
## Migration Strategy
- Phase 1: Additive (both old and new events)
- Phase 2: Parallel (new resources use unified events only)
- Phase 3: Deprecation (mark old events deprecated)
- Phase 4: Cleanup (remove old events)
## References
- `docs/core/events.md` - Complete specification
- Current: `core/src/infra/event/mod.rs`

@@ -0,0 +1,72 @@
---
id: CORE-012
title: Resource Type Registry (Swift)
status: To Do
assignee: unassigned
parent: CORE-011
priority: High
tags: [client, swift, codegen, cache]
depends_on: [CORE-011]
---
## Description
Create the Swift ResourceTypeRegistry that enables generic deserialization of resource events. This is the key component that makes unified events zero-friction on the client side.
## Implementation Steps
1. Define `CacheableResource` protocol
2. Create `ResourceTypeRegistry` class with decoder map
3. Implement `register<T>()` method
4. Implement `decode(resourceType:from:)` method
5. Generate registry entries from specta codegen
6. Add auto-registration on app startup
7. Integrate with event handler
## Technical Details
- Location: `packages/client-swift/Sources/SpacedriveCore/Cache/ResourceTypeRegistry.swift`
- Protocol: `CacheableResource: Identifiable, Codable`
- Registry: `[String: (Data) throws -> any CacheableResource]`
- Auto-generated via specta codegen
## Example
```swift
import Foundation

// Protocol
protocol CacheableResource: Identifiable, Codable {
    static var resourceType: String { get }
}

// Error thrown for unregistered resource types (assumed definition)
enum CacheError: Error {
    case unknownResourceType(String)
}

// Registry
class ResourceTypeRegistry {
    // Maps a resource type name to a decoder for that type
    private static var decoders: [String: (Data) throws -> any CacheableResource] = [:]

    static func register<T: CacheableResource>(_ type: T.Type) {
        decoders[T.resourceType] = { data in
            try JSONDecoder().decode(T.self, from: data)
        }
    }

    static func decode(resourceType: String, from data: Data) throws -> any CacheableResource {
        guard let decoder = decoders[resourceType] else {
            throw CacheError.unknownResourceType(resourceType)
        }
        return try decoder(data)
    }
}
```
## Acceptance Criteria
- [ ] ResourceTypeRegistry implemented
- [ ] All domain resources conform to CacheableResource
- [ ] Auto-registration on app init
- [ ] Error handling for unknown types
- [ ] Unit tests for registration and decoding
- [ ] Integration with EventCacheUpdater
## References
- `docs/core/events.md` lines 213-298
- `docs/core/normalized_cache.md` - Cache integration

@@ -0,0 +1,78 @@
---
id: CORE-013
title: Resource Type Registry (TypeScript)
status: To Do
assignee: unassigned
parent: CORE-011
priority: High
tags: [client, typescript, codegen, cache]
depends_on: [CORE-011]
---
## Description
Create the TypeScript ResourceTypeRegistry for web/desktop clients. Enables generic deserialization of resource events with type safety maintained through generated bindings.
## Implementation Steps
1. Create `ResourceTypeRegistry` class
2. Implement `register()` method with validators
3. Implement `decode()` method
4. Auto-generate `resourceTypeMap` via specta
5. Add auto-registration from generated map
6. Integrate with EventCacheUpdater
7. Add TypeScript type safety
## Technical Details
- Location: `packages/client/src/core/ResourceTypeRegistry.ts`
- Auto-generated: `packages/client/src/bindings/resourceRegistry.ts`
- Type-safe: TypeScript types generated from Rust
- Validation: Runtime type checking (optional)
## Example
```typescript
class ResourceTypeRegistry {
  private static validators = new Map<string, (data: unknown) => any>();

  static register<T>(resourceType: string, validator: (data: unknown) => T) {
    this.validators.set(resourceType, validator);
  }

  static decode(resourceType: string, data: unknown): any {
    const validator = this.validators.get(resourceType);
    if (!validator) {
      throw new Error(`Unknown resource type: ${resourceType}`);
    }
    return validator(data);
  }
}

// Auto-generated from specta.
// Note: this map assumes the generated bindings export runtime values
// (classes); type-only exports cannot be used as object values.
export const resourceTypeMap = {
  'file': File,
  'album': Album,
  'tag': Tag,
  'location': Location,
} as const;

// Auto-registration (the cast performs no runtime validation)
Object.entries(resourceTypeMap).forEach(([type, TypeClass]) => {
  ResourceTypeRegistry.register(type, (data) => data as InstanceType<typeof TypeClass>);
});
```
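
The optional runtime type checking listed under Technical Details could be done by registering a validating function instead of a cast. The sketch below is self-contained and rests on assumptions: the `TagResource` shape, the `isTagResource` guard, and the `registerValidated` / `decodeValidated` helpers are hypothetical, and the small map stands in for the registry so the block runs on its own.

```typescript
// Hypothetical resource shape; real types would come from the generated bindings.
interface TagResource {
  id: string;
  name: string;
  color: string;
}

// Type guard that checks the payload at runtime instead of trusting a cast.
function isTagResource(data: unknown): data is TagResource {
  if (typeof data !== 'object' || data === null) return false;
  const record = data as Record<string, unknown>;
  return (
    typeof record.id === 'string' &&
    typeof record.name === 'string' &&
    typeof record.color === 'string'
  );
}

// Minimal stand-in for the registry so this block runs on its own.
const validators = new Map<string, (data: unknown) => unknown>();

function registerValidated<T>(
  resourceType: string,
  guard: (data: unknown) => data is T,
): void {
  validators.set(resourceType, (data) => {
    if (!guard(data)) {
      throw new Error(`Invalid payload for resource type: ${resourceType}`);
    }
    return data;
  });
}

function decodeValidated(resourceType: string, data: unknown): unknown {
  const validator = validators.get(resourceType);
  if (!validator) {
    throw new Error(`Unknown resource type: ${resourceType}`);
  }
  return validator(data);
}

registerValidated('tag', isTagResource);
```

With a guard in place, a malformed event payload fails at decode time rather than surfacing later as an undefined field in the UI.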
## Acceptance Criteria
- [ ] ResourceTypeRegistry implemented
- [ ] Auto-generated resourceTypeMap from specta
- [ ] Type safety preserved through generics
- [ ] Error handling for unknown types
- [ ] Unit tests for registration and decoding
- [ ] Integration with EventCacheUpdater
## References
- `docs/core/events.md` lines 302-389
- Specta codegen: `xtask/src/specta_gen.rs`

@@ -0,0 +1,75 @@
---
id: CORE-014
title: Specta Codegen for Resource Events
status: To Do
assignee: unassigned
parent: CORE-011
priority: High
tags: [codegen, specta, typescript, swift]
depends_on: [CORE-011]
---
## Description
Extend the existing specta codegen system to auto-generate resource type registries for TypeScript and Swift. This ensures client-side type registries stay in sync with Rust domain models.
## Implementation Steps
1. Update `xtask/src/specta_gen.rs` to collect all `Identifiable` types
2. Generate TypeScript `resourceTypeMap` with all resource types
3. Generate Swift `ResourceTypeRegistry+Generated.swift` with registrations
4. Add build verification that all Identifiable types are registered
5. Update CI to regenerate on every commit
6. Document regeneration process for developers
## Generated Output
### TypeScript
```typescript
// packages/client/src/bindings/resourceRegistry.ts
export const resourceTypeMap = {
'file': File,
'album': Album,
'tag': Tag,
'location': Location,
'device': Device,
'volume': Volume,
'content_identity': ContentIdentity,
// ... all Identifiable types
} as const;
```
### Swift (Future)
```swift
// SpacedriveCore/Generated/ResourceTypeRegistry+Generated.swift
extension ResourceTypeRegistry {
static func registerAllTypes() {
register(File.self)
register(Album.self)
register(Tag.self)
// ... all Identifiable types
}
}
```
## Technical Details
- Location: `xtask/src/specta_gen.rs`
- Trait marker: Check for `impl Identifiable`
- Output: `packages/client/src/bindings/resourceRegistry.ts`
- Build step: `cargo xtask specta-gen`
- CI: Auto-run on pre-commit or CI build
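
The build verification in step 4 amounts to a set comparison between the resource types declared on the Rust side and the keys of the generated map. A minimal sketch follows, assuming the list of expected type names is available to the check; `verifyRegistry` and both of its inputs are hypothetical names, not part of the existing xtask.

```typescript
interface RegistryCheck {
  missing: string[]; // declared in Rust but absent from the generated map
  stale: string[]; // present in the generated map but no longer declared
}

// Compare the expected resource type names against the generated registry keys.
function verifyRegistry(expected: string[], generated: string[]): RegistryCheck {
  const expectedSet = new Set(expected);
  const generatedSet = new Set(generated);
  return {
    missing: expected.filter((name) => !generatedSet.has(name)).sort(),
    stale: generated.filter((name) => !expectedSet.has(name)).sort(),
  };
}
```

A CI job could fail the build whenever either list is non-empty, which covers both forgotten registrations and leftover entries.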
## Acceptance Criteria
- [ ] Specta codegen extended for resource types
- [ ] TypeScript resourceTypeMap auto-generated
- [ ] Build verification ensures all types registered
- [ ] CI/CD regenerates on every commit
- [ ] Developer documentation updated
- [ ] Diff checking prevents manual edits
## References
- `docs/core/events.md` lines 391-434
- Existing: `xtask/src/specta_gen.rs`

@@ -0,0 +1,68 @@
---
id: CORE-015
title: Normalized Client Cache (Swift)
status: To Do
assignee: unassigned
priority: High
tags: [client, swift, cache, performance]
depends_on: [CORE-012]
---
## Description
Implement the normalized client cache for iOS/macOS apps. It provides instant UI updates, offline support, and reduced bandwidth use by storing each resource once, keyed by ID, and updating it atomically when events arrive.
## Implementation Steps
1. Create `NormalizedCache` actor with two-level structure:
   - Level 1: Entity store (normalized by ID)
   - Level 2: Query index (maps queries to entity IDs)
2. Implement `updateEntity<T>()` - updates entity and notifies observers
3. Implement `query<T>()` - caches queries and results
4. Implement `deleteEntity()` - removes entity and updates indices
5. Implement `invalidateQueriesForResource()` - bulk operation handling
6. Add LRU eviction (max 10K entities)
7. Add SQLite persistence for offline support
8. Create `EventCacheUpdater` for event integration
## Cache Architecture
```
┌─────────────────────────────────────────┐
│ Entity Store (Level 1) │
│ "file:uuid-1" → File { ... } │
│ "album:uuid-2" → Album { ... } │
└─────────────────────────────────────────┘
│ Atomic updates
┌─────────────────────────────────────────┐
│ Query Index (Level 2) │
│ "search:photos" → ["file:uuid-1", ...] │
│ "albums.list" → ["album:uuid-2"] │
└─────────────────────────────────────────┘
```
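
The two-level structure can be sketched in a few lines. The example is written in TypeScript for brevity, although this task targets Swift; `TwoLevelCache` and its method names are assumptions for illustration, and TTL, eviction, persistence, and observers are left out.

```typescript
// Level 1: entities normalized by "type:id". Level 2: queries mapped to entity keys.
class TwoLevelCache {
  private entities = new Map<string, unknown>();
  private queries = new Map<string, string[]>();

  static key(resourceType: string, id: string): string {
    return `${resourceType}:${id}`;
  }

  // Store a query result: entities go to level 1, only their keys go to level 2.
  setQuery<T extends { id: string }>(queryKey: string, resourceType: string, items: T[]): void {
    const keys = items.map((item) => {
      const key = TwoLevelCache.key(resourceType, item.id);
      this.entities.set(key, item);
      return key;
    });
    this.queries.set(queryKey, keys);
  }

  // A single write updates the entity for every query that references it.
  updateEntity<T extends { id: string }>(resourceType: string, entity: T): void {
    this.entities.set(TwoLevelCache.key(resourceType, entity.id), entity);
  }

  // Remove the entity and drop its key from every cached query.
  deleteEntity(resourceType: string, id: string): void {
    const key = TwoLevelCache.key(resourceType, id);
    this.entities.delete(key);
    this.queries.forEach((keys, queryKey) => {
      this.queries.set(queryKey, keys.filter((k) => k !== key));
    });
  }

  // Resolve a cached query by looking its keys up in the entity store.
  getQuery(queryKey: string): unknown[] | null {
    const keys = this.queries.get(queryKey);
    if (!keys) return null;
    return keys.map((key) => this.entities.get(key));
  }
}
```

Because queries hold keys rather than copies, one `ResourceChanged` event is enough to bring every list that shows the resource up to date.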
## Technical Details
- Location: `packages/client-swift/Sources/SpacedriveCore/Cache/NormalizedCache.swift`
- Actor for thread-safety
- Max entities: 10,000 (configurable)
- TTL: 5 minutes default (query-specific)
- Persistence: SQLite in app cache directory
## Acceptance Criteria
- [ ] NormalizedCache actor implemented
- [ ] Entity store with LRU eviction
- [ ] Query index with TTL
- [ ] SQLite persistence
- [ ] EventCacheUpdater integration
- [ ] ObservableObject wrapper for SwiftUI
- [ ] Memory stays under 15MB with 10K entities
- [ ] Unit tests for cache operations
- [ ] Integration tests with events
## References
- `docs/core/normalized_cache.md` - Complete specification

@@ -0,0 +1,77 @@
---
id: CORE-016
title: Normalized Client Cache (TypeScript)
status: To Do
assignee: unassigned
priority: High
tags: [client, typescript, react, cache, performance]
depends_on: [CORE-013]
---
## Description
Implement the normalized client cache for web/desktop (Electron) apps. Same architecture as Swift version but with React integration via hooks.
## Implementation Steps
1. Create `NormalizedCache` class with entity store + query index
2. Implement `updateEntity()` with subscription notifications
3. Implement `query()` with caching
4. Implement `deleteEntity()` and query invalidation
5. Add LRU eviction
6. Add IndexedDB persistence for offline support
7. Create `useCachedQuery` React hook
8. Create `EventCacheUpdater` for event integration
## React Integration
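```typescript
import { useContext, useEffect, useState } from 'react';
// CacheContext is assumed to be in scope, provided by the cache module.

function useCachedQuery<T>(
  method: string,
  input: any,
): { data: T[] | null; loading: boolean; error: Error | null } {
  const cache = useContext(CacheContext);
  const [data, setData] = useState<T[] | null>(null);
  const [error, setError] = useState<Error | null>(null);

  useEffect(() => {
    let cancelled = false;
    const queryKey = cache.generateQueryKey(method, input);

    // Subscribe to cache changes
    const unsubscribe = cache.subscribe(queryKey, () => {
      setData(cache.getQueryResult<T>(queryKey));
    });

    // Initial fetch; ignore the result if the component unmounted meanwhile
    cache
      .query<T>(method, input)
      .then((result) => { if (!cancelled) setData(result); })
      .catch((err) => { if (!cancelled) setError(err as Error); });

    return () => {
      cancelled = true;
      unsubscribe();
    };
  }, [cache, method, JSON.stringify(input)]);

  return { data, loading: data === null && error === null, error };
}
```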
## Technical Details
- Location: `packages/client/src/core/NormalizedCache.ts`
- React hook: `packages/client/src/hooks/useCachedQuery.ts`
- Max entities: 10,000
- TTL: 5 minutes default
- Persistence: IndexedDB
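
LRU eviction with a fixed entity cap can be built on `Map`, which iterates keys in insertion order. The sketch below is one possible approach under that assumption; `LruEntityStore` is a hypothetical name, and TTL handling and persistence are out of scope here.

```typescript
// LRU store built on Map, which iterates keys in insertion order.
class LruEntityStore<V> {
  private entries = new Map<string, V>();
  private maxEntities: number;

  constructor(maxEntities: number = 10000) {
    this.maxEntities = maxEntities;
  }

  get(key: string): V | undefined {
    const value = this.entries.get(key);
    if (value === undefined) return undefined;
    // Re-insert so this key becomes the most recently used.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: string, value: V): void {
    // Delete first so an overwrite also refreshes the key's position.
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.maxEntities) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) this.entries.delete(oldest);
    }
  }

  has(key: string): boolean {
    return this.entries.has(key);
  }

  get size(): number {
    return this.entries.size;
  }
}
```

Both reads and writes count as use, so an entity that is on screen and repeatedly read is the last to be evicted.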
## Acceptance Criteria
- [ ] NormalizedCache class implemented
- [ ] Entity store with LRU eviction
- [ ] Query index with TTL
- [ ] IndexedDB persistence
- [ ] useCachedQuery hook
- [ ] EventCacheUpdater integration
- [ ] Memory stays under 15MB
- [ ] Unit tests for cache operations
- [ ] Integration tests with React components
## References
- `docs/core/normalized_cache.md` lines 188-279

@@ -0,0 +1,67 @@
---
id: CORE-017
title: Optimistic Updates for Client Cache
status: To Do
assignee: unassigned
parent: CORE-015
priority: Medium
tags: [client, cache, ux, optimistic]
depends_on: [CORE-015, CORE-016]
---
## Description
Implement optimistic updates in the normalized cache, allowing instant UI feedback before server confirmation. If the action fails, the update is rolled back automatically.
## Implementation Steps
1. Add `optimisticUpdates` map to cache (pending_id → resource)
2. Implement `updateOptimistically()` - applies change immediately
3. Implement `commitOptimisticUpdate()` - replaces with confirmed data
4. Implement `rollbackOptimisticUpdate()` - reverts on error
5. Integrate with action execution flow
6. Add visual indicators for pending changes (optional)
## Flow Example
```typescript
// 1. Optimistic update (instant UI)
const pendingId = uuid();
await cache.updateOptimistically(pendingId, {
  // Spread the existing album first so the new name is not overwritten
  ...optimisticAlbum,
  id: albumId,
  name: newName,
});

try {
  // 2. Send action to server
  const confirmed = await client.action('albums.rename.v1', { id: albumId, name: newName });

  // 3. Commit (replace optimistic with confirmed)
  await cache.commitOptimisticUpdate(pendingId, confirmed);
} catch (error) {
  // 4. Rollback on error
  await cache.rollbackOptimisticUpdate(pendingId);
  throw error;
}
```
## Technical Details
- Optimistic updates stored separately from confirmed entities
- UI sees merged view (optimistic + confirmed)
- Pending changes visually indicated (future)
- Automatic rollback on action failure
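
Keeping optimistic updates in a separate map makes rollback a plain deletion, since the confirmed entity was never overwritten. The following sketch shows that idea in isolation; `OptimisticCache` is a hypothetical class, its methods are synchronous for simplicity, and the rule that the latest pending update wins in the merged view is an assumption.

```typescript
interface Entity {
  id: string;
}

class OptimisticCache<T extends Entity> {
  private confirmed = new Map<string, T>();
  private optimisticUpdates = new Map<string, T>(); // pending_id -> resource

  setConfirmed(entity: T): void {
    this.confirmed.set(entity.id, entity);
  }

  // Record the pending change without touching confirmed data.
  updateOptimistically(pendingId: string, entity: T): void {
    this.optimisticUpdates.set(pendingId, entity);
  }

  // Drop the pending entry and store the server-confirmed entity.
  commitOptimisticUpdate(pendingId: string, confirmed: T): void {
    this.optimisticUpdates.delete(pendingId);
    this.confirmed.set(confirmed.id, confirmed);
  }

  // Rollback only removes the pending entry; confirmed data is still intact.
  rollbackOptimisticUpdate(pendingId: string): void {
    this.optimisticUpdates.delete(pendingId);
  }

  // Merged view: the latest pending update for the id wins over confirmed data.
  get(id: string): T | undefined {
    let result = this.confirmed.get(id);
    this.optimisticUpdates.forEach((entity) => {
      if (entity.id === id) result = entity;
    });
    return result;
  }
}
```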
## Acceptance Criteria
- [ ] Optimistic update API implemented
- [ ] UI updates instantly before server response
- [ ] Rollback works on errors
- [ ] No flickering during commit
- [ ] Unit tests for optimistic flow
- [ ] Integration tests validate error scenarios
## References
- `docs/core/normalized_cache.md` lines 685-741

@@ -1,8 +1,8 @@
---
id: LSYNC-000
title: "Epic: Library-based Synchronization"
status: To Do
assignee: unassigned
status: In Progress
assignee: james
priority: High
tags: [epic, sync, networking, library-sync]
whitepaper: Section 4.5.1
@@ -10,4 +10,57 @@ whitepaper: Section 4.5.1
## Description
This epic covers the implementation of the "Library Sync" model, a novel approach to synchronization that leverages the VDFS index to avoid the complexities of CRDTs. It treats metadata (index, audit log) and file operations as separate, coordinated streams, enabling efficient and robust syncing between peers.
This epic covers the implementation of the "Library Sync" system, enabling real-time, multi-device synchronization of library metadata. The architecture consists of three pillars: TransactionManager (write gatekeeper), Sync Log (append-only change log), and Sync Service (pull-based replication).
## Current Status
**Completed (Phase 1)**:
- NET-001: Iroh P2P stack
- NET-002: Device pairing protocol
- LSYNC-004: SyncRelationship schema
- LSYNC-005: Library sync setup (device discovery & registration)
- LSYNC-001: Protocol design (documented in `docs/core/`)
**In Progress (Phase 2)**:
- LSYNC-006: TransactionManager core
- LSYNC-007: Syncable trait & derives
- LSYNC-008: Sync log schema
- LSYNC-009: Leader election
**Upcoming (Phases 3 and 4)**:
- LSYNC-013: Sync protocol handler (message-based)
- LSYNC-010: Sync service (leader & follower)
- LSYNC-011: Conflict resolution
- LSYNC-002: Metadata sync (albums/tags)
- LSYNC-012: Entry sync (bulk optimization)
## Architecture
**Message-based Sync**: Changes are pushed over a dedicated sync protocol rather than polled, which gives lower latency, better performance, and better battery efficiency.
See `docs/core/sync.md` for complete specification.
## Subtasks
### Phase 1: Foundation (Completed)
- LSYNC-001: Protocol design ✅
- LSYNC-004: Database schema ✅
- LSYNC-005: Sync setup ✅
### Phase 2: Core Infrastructure (In Progress)
- LSYNC-006: TransactionManager
- LSYNC-007: Syncable trait
- LSYNC-008: Sync log schema (separate DB)
- LSYNC-009: Leader election
### Phase 3: Sync Services (Next)
- LSYNC-013: Sync protocol handler (push-based)
- LSYNC-010: Sync service (leader & follower)
- LSYNC-011: Conflict resolution
### Phase 4: Application (After Phase 3)
- LSYNC-002: Metadata sync (albums/tags)
- LSYNC-012: Entry sync (bulk optimization)
### Future
- LSYNC-003: File operations (sync conduits)

@@ -1,8 +1,8 @@
---
id: LSYNC-001
title: Design Library Sync Protocol
status: To Do
assignee: unassigned
status: Done
assignee: james
parent: LSYNC-000
priority: High
tags: [sync, networking, protocol, design]
@@ -13,14 +13,30 @@ whitepaper: Section 4.5.1
Design the detailed protocol for Library Sync. This includes defining the communication flow between peers, the format of the messages, and the logic for initiating and managing a sync session.
## Implementation Steps
**Update**: The sync protocol has been fully designed and documented as the "Three Pillars" architecture:
1. TransactionManager (sole gatekeeper for writes)
2. Sync Log (append-only, sequentially-ordered)
3. Sync Service (pull-based replication)
1. Define the state machine for a sync session (e.g., negotiation, metadata exchange, file operations, completion).
2. Specify the protocol messages for each stage of the sync process.
3. Design the error handling and recovery mechanisms.
4. Create a sequence diagram to illustrate the protocol flow.
## Design Documents
- `docs/core/sync.md` - Complete sync system specification
- `docs/core/sync-setup.md` - Library sync setup flow
- `docs/core/events.md` - Unified event system
- `docs/core/normalized_cache.md` - Client-side cache
- `docs/core/devices.md` - Device system and leadership
## Architecture Decisions
- **Pull-based sync**: Followers poll leader every 5 seconds
- **Bulk optimization**: 1K+ items create metadata-only sync logs
- **Leader election**: Single leader per library assigns sequences
- **Conflict resolution**: Last-Write-Wins via version field
- **No CRDTs**: Simpler approach, sufficient for metadata
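
Last-Write-Wins via a version field reduces conflict resolution to a single comparison. A minimal sketch, assuming each record carries a numeric `version`; the `resolveLastWriteWins` name and the tie rule (keep the local copy when versions are equal) are assumptions, not part of the design documents.

```typescript
interface Versioned {
  version: number;
}

// Last-Write-Wins: keep whichever copy carries the higher version.
// Assumption: on equal versions the local copy is kept.
function resolveLastWriteWins<T extends Versioned>(local: T | undefined, incoming: T): T {
  if (local === undefined) return incoming;
  return incoming.version > local.version ? incoming : local;
}
```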
## Acceptance Criteria
- [ ] A detailed protocol design document is created.
- [ ] The protocol addresses all aspects of the Library Sync model.
- [ ] The design is approved and ready for implementation.
- [x] Detailed protocol design document created
- [x] Protocol addresses all aspects of Library Sync
- [x] Design reviewed and approved
- [x] Implementation tasks created (LSYNC-006 through LSYNC-011)

@@ -1,26 +1,46 @@
---
id: LSYNC-002
title: Metadata Sync (Index & Audit Log)
title: Metadata Sync (Albums, Tags, Locations)
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, networking, database, metadata]
whitepaper: Section 4.5.1
tags: [sync, metadata, albums, tags]
depends_on: [LSYNC-006, LSYNC-010]
---
## Description
Implement the metadata synchronization part of the Library Sync protocol. This involves efficiently syncing the VDFS index and the audit log between two peers.
Implement metadata synchronization for user-created resources (Albums, Tags, Locations) using the sync system. This is the first practical application of the TransactionManager and sync follower service.
**Note**: This task focuses on rich metadata (albums, tags), NOT file entries. File entry sync is handled separately with bulk optimization (see LSYNC-012-entry-sync.md).
## Implementation Steps
1. Develop a mechanism to efficiently diff the SQLite databases of the two peers.
2. Implement the logic to transfer the missing index and audit log entries.
3. Ensure that the metadata sync is atomic and consistent.
4. Optimize the data transfer to minimize network usage.
1. Implement `Syncable` trait for `albums::Model`
2. Implement `Syncable` trait for `tags::Model`
3. Implement `Syncable` trait for `locations::Model`
4. Update album/tag/location actions to use TransactionManager
5. Verify sync logs created on leader device
6. Verify follower service applies changes correctly
7. Test cross-device album/tag creation and updates
## Technical Details
- Albums: Sync name, cover, description
- Tags: Sync name, color
- Locations: Sync metadata only (path is device-specific)
- Entry relationships: Sync via junction tables
## Acceptance Criteria
- [ ] Two peers can successfully sync their VDFS index and audit log.
- [ ] The metadata sync is efficient and scalable.
- [ ] The synced data is consistent and correct on both peers.
- [ ] Albums sync between devices
- [ ] Tags sync between devices
- [ ] Location metadata syncs
- [ ] Changes appear instantly in client cache
- [ ] Conflict resolution works for concurrent edits
- [ ] Integration tests validate cross-device sync
## References
- See `docs/core/sync.md` for domain sync strategy

@@ -1,26 +1,45 @@
---
id: LSYNC-003
title: File Operation Sync (via Action System)
title: Cross-Device File Operations (Future Phase)
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, networking, actions, jobs]
whitepaper: Section 4.5.1
priority: Low
tags: [sync, file-ops, future]
---
## Description
Implement the file operation synchronization part of the Library Sync protocol. This involves using the Action System to replicate file operations (copy, move, delete) between peers based on the synced metadata.
Enable file operations (copy, move, delete) to be executed across devices. This is a **future phase** feature - not part of the initial sync implementation.
## Implementation Steps
**Current Architecture**: File operations are device-local. If you delete a file on Device A, only the metadata syncs (the Entry is marked deleted). Device B sees the metadata change but does NOT delete its local file copy.
1. Develop a mechanism to translate the diff of the audit logs into a series of `Action`s.
2. Implement the logic to dispatch these `Action`s to the `ActionManager` on the target peer.
3. Ensure that the file operations are executed in the correct order and with the correct context.
4. Integrate this with the overall Library Sync protocol.
**Future Goal**: User can optionally enable "sync conduits" where file operations replicate across devices. Example: Delete on Device A → Device B also deletes local file.
## Implementation Steps (Future)
1. Design "sync conduit" configuration (which locations participate)
2. File operation actions emit special sync log entries
3. Follower service recognizes file-op entries
4. Follower executes corresponding local file operation
5. Handle conflicts (file already deleted, moved, etc.)
6. Add user controls for sync conduit policies
## Why Not Phase 1?
- Metadata sync is complex enough initially
- File operations need robust conflict resolution
- Users may not want all devices to mirror operations
- Bandwidth/storage considerations (mobile devices)
## Acceptance Criteria
- [ ] File operations are correctly replicated on the target peer.
- [ ] The system can handle conflicts and errors during file operation sync.
- [ ] The file operation sync is integrated seamlessly into the Library Sync protocol.
- [ ] Sync conduit configuration schema
- [ ] File operations create special sync log type
- [ ] Follower can execute file operations
- [ ] Conflict resolution for file ops
- [ ] User can enable/disable per location pair
## References
- `docs/core/sync.md` - Sync domains (Content future phase)

@@ -0,0 +1,31 @@
---
id: LSYNC-005
title: Library Sync Setup (Device Registration & Discovery)
status: Done
assignee: james
parent: LSYNC-000
priority: High
tags: [sync, networking, library-setup, device-pairing]
---
## Description
Implement the library sync setup flow that enables paired devices to discover each other's libraries and register for synchronization. This is Phase 1 of the sync system (RegisterOnly mode).
## Implementation Notes
- Complete implementation detailed in `docs/core/sync-setup.md`
- Devices must be paired (NET-002) before library sync setup
- Two-phase process: Discovery → Registration
- Bidirectional device registration in each library's database
- Leader election during setup
- No actual sync replication in Phase 1 (just registration)
## Acceptance Criteria
- [x] Paired devices can discover each other's libraries
- [x] Devices can be registered in remote library databases
- [x] Sync leadership assigned during setup
- [x] `sync_setup.discover.v1` query implemented
- [x] `sync_setup.input.v1` action implemented
- [x] Integration tests validate cross-device setup

@@ -0,0 +1,46 @@
---
id: LSYNC-006
title: TransactionManager Core Implementation
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: Critical
tags: [sync, database, transaction, architecture]
---
## Description
Implement the TransactionManager, the sole gatekeeper for all syncable database writes. It guarantees atomic DB commits + sync log creation, ensuring that state changes are always logged for synchronization.
## Implementation Steps
1. Create `TransactionManager` struct with event bus and sync sequence tracking
2. Implement `commit<M, R>()` for single resource changes
3. Implement `commit_batch<M, R>()` for 10-1K item batches
4. Implement `commit_bulk<M>()` for 1K+ items (metadata-only sync logs)
5. Add sequence number generation (only on leader devices)
6. Integrate with event emission (automatic ResourceChanged events)
7. Add `with_tx()` for raw SQL compatibility
## Technical Details
- Location: `core/src/infra/transaction/manager.rs`
- Must check leader status before assigning sequence numbers
- Atomic transaction: DB write + sync log entry creation
- Auto-emit events via EventBus after commit
- Bulk operations create single metadata sync log (not per-item)
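
The commit path described above (leader check, data write plus sync log entry, then event emission) can be modelled in memory. The sketch below is written in TypeScript for illustration only, while the real implementation lives in Rust; `TransactionManagerModel` and all field names are hypothetical. In this model the two writes cannot fail independently, whereas the real implementation would wrap them in one database transaction.

```typescript
interface LoggedChange {
  sequence: number;
  modelType: string;
  recordId: string;
  changeType: 'insert' | 'update';
}

interface ResourceChangedEvent {
  type: 'ResourceChanged';
  resourceType: string;
  resourceId: string;
}

class TransactionManagerModel {
  readonly records = new Map<string, unknown>();
  readonly syncLog: LoggedChange[] = [];
  private nextSequence = 1;
  private isLeader: boolean;
  private emit: (event: ResourceChangedEvent) => void;

  constructor(isLeader: boolean, emit: (event: ResourceChangedEvent) => void) {
    this.isLeader = isLeader;
    this.emit = emit;
  }

  commit<T extends { id: string }>(modelType: string, record: T): number {
    // Only the leader may assign sequence numbers.
    if (!this.isLeader) {
      throw new Error('Only the leader may assign sync sequence numbers');
    }
    const key = `${modelType}:${record.id}`;
    const changeType = this.records.has(key) ? 'update' : 'insert';
    const sequence = this.nextSequence;
    // Data write and sync log entry belong to the same unit of work.
    this.records.set(key, record);
    this.syncLog.push({ sequence, modelType, recordId: record.id, changeType });
    this.nextSequence += 1;
    // The event is emitted only after the commit has succeeded.
    this.emit({ type: 'ResourceChanged', resourceType: modelType, resourceId: record.id });
    return sequence;
  }
}
```

Since the caller never emits events itself, a forgotten emission is no longer possible, which is the property CORE-011 relies on.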
## Acceptance Criteria
- [ ] TransactionManager can commit single resources with sync logs
- [ ] Batch operations create per-item sync logs
- [ ] Bulk operations create metadata-only sync logs
- [ ] Events emitted automatically after commits
- [ ] Leader check prevents non-leaders from creating sync logs
- [ ] Unit tests verify atomicity
- [ ] Integration tests validate sync log creation
## References
- `docs/core/sync.md` - Complete specification
- Phase 2 (Core Infrastructure) of LSYNC-000; prerequisite for the sync service (LSYNC-010)

@@ -0,0 +1,61 @@
---
id: LSYNC-007
title: Syncable Trait & Derive Macros
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, trait, codegen, macro]
---
## Description
Create the `Syncable` trait that database models implement to enable automatic sync log creation. Includes derive macro for ergonomic implementation.
## Implementation Steps
1. Define `Syncable` trait with the following members:
   - `SYNC_MODEL: &'static str` - Model identifier
   - `sync_id() -> Uuid` - Global resource ID
   - `version() -> i64` - For conflict resolution
   - `exclude_fields()` - Optional field exclusion
   - `to_sync_json()` - Optional custom serialization
2. Create `#[derive(Syncable)]` macro
3. Implement for initial models: Album, Tag, Location
4. Add validation that `sync_id` is unique within each model
5. Document field exclusion patterns (db IDs, timestamps)
## Technical Details
- Location: `core/src/infra/sync/syncable.rs`
- Macro location: `crates/sync-derive/src/lib.rs`
- Must integrate with SeaORM models
- `exclude_fields()` keeps device-local data (database IDs, timestamps) out of the sync payload
## Example Usage
```rust
use uuid::Uuid;

impl Syncable for albums::Model {
    const SYNC_MODEL: &'static str = "album";

    fn sync_id(&self) -> Uuid { self.uuid }

    fn version(&self) -> i64 { self.version }

    // Local database ID and timestamps are not synced
    fn exclude_fields() -> Option<&'static [&'static str]> {
        Some(&["id", "created_at", "updated_at"])
    }
}
```
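
To make the effect of field exclusion concrete, here is a small TypeScript sketch of what a default `to_sync_json()` might do with the list returned by `exclude_fields()`. The `toSyncJson` function is hypothetical and only mirrors the idea; sorting the keys is an added assumption so the output is deterministic.

```typescript
// Serialize a record for the sync log, leaving out excluded fields.
// Keys are sorted so the same record always produces the same JSON.
function toSyncJson(record: Record<string, unknown>, excludeFields: readonly string[]): string {
  const payload: Record<string, unknown> = {};
  for (const key of Object.keys(record).sort()) {
    if (excludeFields.indexOf(key) === -1) {
      payload[key] = record[key];
    }
  }
  return JSON.stringify(payload);
}

const albumRow = {
  id: 7,
  uuid: 'u-1',
  name: 'Trip',
  version: 3,
  created_at: '2025-10-01T00:00:00Z',
  updated_at: '2025-10-07T00:00:00Z',
};

// Produces {"name":"Trip","uuid":"u-1","version":3}
const albumPayload = toSyncJson(albumRow, ['id', 'created_at', 'updated_at']);
```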
## Acceptance Criteria
- [ ] `Syncable` trait defined
- [ ] Derive macro implemented
- [ ] Works with SeaORM models
- [ ] Field exclusion functional
- [ ] Documentation with examples
- [ ] Unit tests for derive macro
## References
- `docs/core/sync.md` lines 60-118

@@ -0,0 +1,107 @@
---
id: LSYNC-008
title: Sync Log Database Schema & Entity (Separate DB)
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, database, schema, migration]
---
## Description
Create the sync log database schema and SeaORM entity. The sync log is an append-only, sequentially-ordered log of all state changes per library, maintained by the leader device.
**Architecture Decision**: The sync log lives in its own separate database (`sync.db`) in the Library data folder rather than in the main library database. This provides:
- Better performance (no query contention)
- Easier maintenance (vacuum, archive old entries)
- Cleaner separation (infrastructure vs domain data)
- Simpler backup/restore (library can be backed up without sync log)
## Implementation Steps
1. Create sync log database connection per library
2. Create migration for `sync_log` table (in sync DB)
3. Create SeaORM entity `core/src/infra/db/entities/sync_log.rs`
4. Add indexes for efficient querying:
   - `(sequence)` - Primary lookup for sync (library_id not needed since it's per-library DB)
   - `(device_id)` - Filter by originating device
   - `(model_type, record_id)` - Find changes to specific records
5. Add unique constraint on `(sequence)`
6. Create `SyncLogDb` wrapper for database lifecycle
7. Create helper methods for querying sync entries
## Schema
```sql
CREATE TABLE sync_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sequence INTEGER NOT NULL UNIQUE, -- Monotonic per library (unique since this DB is per-library)
    device_id TEXT NOT NULL, -- Device that created this entry
    timestamp TEXT NOT NULL,

    -- Change details
    model_type TEXT NOT NULL, -- "album", "tag", "entry", "bulk_operation"
    record_id TEXT NOT NULL, -- UUID of changed record
    change_type TEXT NOT NULL, -- "insert", "update", "delete", "bulk_insert"
    version INTEGER NOT NULL DEFAULT 1, -- Optimistic concurrency

    -- Data payload (JSON)
    data TEXT NOT NULL
);

-- The UNIQUE constraint on sequence already creates an index in SQLite,
-- so a separate index on sequence is not needed.
CREATE INDEX idx_sync_log_device ON sync_log(device_id);
CREATE INDEX idx_sync_log_model_record ON sync_log(model_type, record_id);
CREATE INDEX idx_sync_log_timestamp ON sync_log(timestamp);
```
**Note**: `library_id` field removed since each library has its own sync log database.
## Database Location
- Path: `~/.spacedrive/libraries/{library_uuid}/sync.db`
- One sync log DB per library
- Created automatically when library is opened
- Managed by `SyncLogDb` wrapper
## SyncLogDb Wrapper
```rust
pub struct SyncLogDb {
    library_id: Uuid,
    conn: DatabaseConnection,
}

// API sketch: signatures only, method bodies omitted.
impl SyncLogDb {
    /// Open or create sync log DB for library
    pub async fn open(library_id: Uuid, data_dir: &Path) -> Result<Self, DbError>;

    /// Append entry to sync log (leader only)
    pub async fn append(&self, entry: SyncLogEntry) -> Result<u64, DbError>;

    /// Fetch entries since sequence
    pub async fn fetch_since(&self, sequence: u64, limit: usize) -> Result<Vec<SyncLogEntry>, DbError>;

    /// Get latest sequence number
    pub async fn latest_sequence(&self) -> Result<u64, DbError>;

    /// Delete entries older than `before` (for example, 30 days ago)
    pub async fn vacuum_old_entries(&self, before: DateTime<Utc>) -> Result<usize, DbError>;
}
```
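
The append and fetch semantics of the wrapper can be modelled in memory to pin down the intended behaviour. This TypeScript sketch is illustrative only: `InMemorySyncLog` and its field names are hypothetical, sequences are assumed to start at 1, and `fetchSince` is assumed to be exclusive (it returns entries with a sequence strictly greater than the one given).

```typescript
interface SyncLogRecord {
  sequence: number;
  deviceId: string;
  modelType: string;
  recordId: string;
  changeType: string;
  data: string;
}

type NewSyncLogRecord = Omit<SyncLogRecord, 'sequence'>;

class InMemorySyncLog {
  private entries: SyncLogRecord[] = [];
  private latest = 0;

  // Append-only: each entry receives the next sequence number.
  append(entry: NewSyncLogRecord): number {
    this.latest += 1;
    this.entries.push({ ...entry, sequence: this.latest });
    return this.latest;
  }

  // Entries after `sequence`, oldest first, capped at `limit`.
  fetchSince(sequence: number, limit: number): SyncLogRecord[] {
    return this.entries.filter((e) => e.sequence > sequence).slice(0, limit);
  }

  latestSequence(): number {
    return this.latest;
  }
}
```

A follower that remembers the last sequence it applied can call `fetchSince` with that value repeatedly until the result is empty.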
## Acceptance Criteria
- [ ] Per-library sync log database created
- [ ] Migration created and tested
- [ ] SeaORM entity implemented
- [ ] Indexes created for performance
- [ ] SyncLogDb wrapper implemented
- [ ] Helper methods for common queries
- [ ] Database lifecycle managed correctly
- [ ] Documentation of schema design
## References
- `docs/core/sync.md` lines 211-236

@@ -0,0 +1,58 @@
---
id: LSYNC-009
title: Sync Leader Election & Lease Management
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, leadership, distributed-systems]
---
## Description
Implement the leader election mechanism that ensures each library has a single leader device responsible for assigning sync log sequence numbers. This prevents sequence collisions and ensures consistent ordering.
## Implementation Steps
1. Create `SyncLeader` struct with lease tracking
2. Implement `request_leadership()` method
3. Implement `is_leader()` check
4. Add heartbeat mechanism (leader sends every 30s)
5. Implement re-election on leader timeout (>60s)
6. Use highest device_id as tiebreaker
7. Integrate with TransactionManager
## Election Strategy
- **Initial leader**: Device that creates the library
- **Heartbeat**: Leader sends heartbeat every 30 seconds
- **Re-election**: If leader offline >60s, devices elect new leader
- **Tiebreaker**: Highest device_id wins
- **Lease**: Leader holds exclusive write lease
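The timeout and tiebreaker rules above can be sketched as two pure functions. The `Candidate` type and the integer `device_id` are assumptions for illustration (the real system uses UUIDs and tracks leases per library).

```rust
/// Leader is treated as offline after more than this many seconds without a heartbeat.
const LEADER_TIMEOUT_SECS: u64 = 60;

/// A device taking part in the election (hypothetical type).
/// `device_id` is a plain integer here for brevity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Candidate {
    pub device_id: u128,
    pub online: bool,
}

/// True when the leader's last heartbeat is older than the timeout.
pub fn leader_timed_out(last_heartbeat_secs: u64, now_secs: u64) -> bool {
    now_secs.saturating_sub(last_heartbeat_secs) > LEADER_TIMEOUT_SECS
}

/// Pick the new leader: the online device with the highest `device_id`.
/// Returns `None` when no device is online.
pub fn elect_leader(candidates: &[Candidate]) -> Option<u128> {
    candidates
        .iter()
        .filter(|c| c.online)
        .map(|c| c.device_id)
        .max()
}
```

Because every device applies the same deterministic rule to the same candidate set, they should agree on the winner without an extra voting round, provided they share the same view of who is online.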
## Technical Details
- Location: `core/src/infra/sync/leader.rs`
- Leadership state stored in device's `sync_leadership` field
- Lease expires_at tracked per library
- TransactionManager checks leadership before assigning sequences
## Acceptance Criteria
- [ ] Leader election on library creation
- [ ] Heartbeat mechanism prevents false timeouts
- [ ] Re-election works when leader goes offline
- [ ] Only leader can create sync log entries
- [ ] Follower devices reject write attempts with clear error
- [ ] Integration tests validate failover scenarios
## Future Enhancements
- Multi-leader support with sequence partitioning
- Manual leader reassignment via admin action
- Leader election metrics and monitoring
## References
- `docs/core/sync.md` lines 238-280
- `docs/core/devices.md` - Sync leadership model

@@ -0,0 +1,162 @@
---
id: LSYNC-010
title: Sync Service (Leader & Follower)
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, replication, service, leader, follower]
depends_on: [LSYNC-006, LSYNC-008, LSYNC-009, LSYNC-013]
---
## Description
Implement the complete sync service with both leader and follower functionality. The leader pushes notifications to followers when new sync log entries are created, and followers receive these notifications and apply changes locally.
**Architecture**: Message-based (push) instead of polling for better performance, lower latency, and battery efficiency.
## Implementation Steps
### Core Service
1. Create `SyncService` struct with role-specific behavior
2. Initialize service when library opens (determines leader/follower role)
3. Integrate with SyncProtocolHandler for messaging
4. Handle role transitions (leader election changes)
### Leader Functionality
5. Subscribe to TransactionManager commit events
6. Implement `on_commit()` - called after sync log entry created
7. Implement `notify_followers()` - sends NewEntries to all followers
8. Implement batch notification logic (debounce within 100ms)
9. Implement message handler for `SyncMessage::FetchEntries`
10. Implement message handler for `SyncMessage::Heartbeat`
11. Track connected followers per library
### Follower Functionality
12. Implement message handler for `SyncMessage::NewEntries`
13. Implement `request_entries()` - uses SyncProtocolHandler to fetch
14. Implement `apply_sync_entry()` - deserializes and applies changes
15. Track `last_synced_sequence` per library
16. Handle bulk operation metadata (trigger local indexing)
17. Handle connection loss with reconnect logic
18. Send heartbeats to leader
## Technical Details
- Location: `core/src/service/sync/`
  - `mod.rs` - SyncService orchestrator
  - `leader.rs` - Leader-specific logic
  - `follower.rs` - Follower-specific logic
- Push-based: Leader notifies when changes happen
- Batch size: Max 100 entries per request
- Error handling: Retry with exponential backoff
- Gap detection: Detect missed entries and reconcile
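A minimal sketch of the retry policy mentioned above, assuming a doubling delay with an upper cap. The function name and any concrete base or cap values are illustrative; the spec does not fix them.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`.
/// Overflow in either the power or the multiplication falls back to the cap.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 500ms base and a 30s cap, successive retries would wait 500ms, 1s, 2s, 4s and so on until the cap is reached.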
## Complete Flow
### Leader Side
```
1. TransactionManager commits change + creates sync log entry
2. Leader receives commit event
3. Leader groups entries (if multiple commits in <100ms)
4. Leader sends SyncMessage::NewEntries to all followers:
   - library_id
   - from_sequence (start of batch)
   - to_sequence (end of batch)
   - entry_count
5. Follower requests entries with FetchEntries
6. Leader responds with EntriesResponse (up to 100 entries)
7. Follower sends Acknowledge
```
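Step 3 of the leader flow (grouping rapid commits) could look roughly like the following. It assumes a fixed 100ms window measured from the first commit in a batch, which is one possible reading of "debounce"; `Notification` and `coalesce` are hypothetical names.

```rust
/// Commits landing less than this many milliseconds after the first commit
/// of a batch are folded into that batch.
const BATCH_WINDOW_MS: u64 = 100;

/// The fields a `NewEntries` notification would carry (minus `library_id`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Notification {
    pub from_sequence: u64,
    pub to_sequence: u64,
    pub entry_count: usize,
}

/// Group commits into notifications.
/// `commits` holds `(sequence, commit_time_ms)` pairs in ascending sequence order.
pub fn coalesce(commits: &[(u64, u64)]) -> Vec<Notification> {
    let mut batches: Vec<Notification> = Vec::new();
    let mut window_start_ms = 0u64;
    for &(sequence, at_ms) in commits {
        let in_window = !batches.is_empty()
            && at_ms.saturating_sub(window_start_ms) < BATCH_WINDOW_MS;
        if in_window {
            // Extend the batch that is currently open.
            if let Some(batch) = batches.last_mut() {
                batch.to_sequence = sequence;
                batch.entry_count += 1;
            }
        } else {
            // Start a new batch and a new window.
            window_start_ms = at_ms;
            batches.push(Notification {
                from_sequence: sequence,
                to_sequence: sequence,
                entry_count: 1,
            });
        }
    }
    batches
}
```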
### Follower Side
```
1. Listen for SyncMessage::NewEntries from leader
2. On notification:
   - Send FetchEntries request (since last_synced_sequence)
   - Receive EntriesResponse
   - For each entry:
     - Deserialize model
     - Apply change to local DB
     - Update last_synced_sequence
   - Send Acknowledge
3. On connection loss:
   - Reconnect
   - Send Heartbeat with current sequence
   - Leader sends SyncRequired if needed
```
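The follower's sequence tracking can be sketched as a small state machine that applies entries strictly in order, ignores duplicates, and reports gaps so the caller can reconcile. `FollowerState` and `ApplyOutcome` are hypothetical names, and the local DB write is replaced by a vector push.

```rust
/// Result of trying to apply one sync entry on a follower.
#[derive(Debug, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// Entry was the next expected one and has been applied.
    Applied,
    /// Entry was already applied earlier; nothing to do.
    Duplicate,
    /// One or more entries are missing before this one.
    Gap { expected: u64, got: u64 },
}

/// Follower-side bookkeeping (stand-in for the real service state).
#[derive(Default)]
pub struct FollowerState {
    pub last_synced_sequence: u64,
    pub applied: Vec<u64>,
}

impl FollowerState {
    /// Apply the entry with the given sequence if it is the next one in order.
    pub fn apply(&mut self, sequence: u64) -> ApplyOutcome {
        let expected = self.last_synced_sequence + 1;
        if sequence < expected {
            return ApplyOutcome::Duplicate;
        }
        if sequence > expected {
            return ApplyOutcome::Gap { expected, got: sequence };
        }
        // Stand-in for "deserialize model and apply change to local DB".
        self.applied.push(sequence);
        self.last_synced_sequence = sequence;
        ApplyOutcome::Applied
    }
}
```

On `Gap`, a follower would typically re-request entries from `expected` onward or fall back to full reconciliation, as the acceptance criteria below describe.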
## Service Structure
```rust
pub struct SyncService {
    library_id: Uuid,
    role: SyncRole,
    sync_log_db: Arc<SyncLogDb>,
    protocol_handler: Arc<SyncProtocolHandler>,
    event_bus: Arc<EventBus>,
    // Leader-specific
    pending_batches: Arc<Mutex<HashMap<Uuid, NotificationBatch>>>,
    followers: Arc<RwLock<HashSet<Uuid>>>,
    // Follower-specific
    last_synced_sequence: Arc<Mutex<u64>>,
}

impl SyncService {
    /// Create and start sync service for library
    pub async fn start(
        library_id: Uuid,
        role: SyncRole,
        sync_log_db: Arc<SyncLogDb>,
        protocol_handler: Arc<SyncProtocolHandler>,
    ) -> Result<Self, SyncError>;
    /// Leader: Notify followers of new entries
    async fn notify_followers(&self, from_seq: u64, to_seq: u64);
    /// Follower: Apply sync entry locally
    async fn apply_sync_entry(&self, entry: SyncLogEntry);
    /// Handle role transition (election change)
    pub async fn transition_role(&mut self, new_role: SyncRole);
}
```
## Acceptance Criteria
### Leader
- [ ] Leader service receives commit events
- [ ] Notifications sent to all followers instantly
- [ ] Batch notifications for rapid commits (100ms window)
- [ ] Handles FetchEntries requests
- [ ] Responds with EntriesResponse
- [ ] Tracks connected followers
- [ ] Handles disconnections gracefully
### Follower
- [ ] Follower receives NewEntries push notifications
- [ ] Entries fetched and applied correctly
- [ ] Sequence tracking prevents duplicate application
- [ ] Bulk operations trigger local jobs (not replication)
- [ ] Connection loss handled with reconnect
- [ ] Gap detection triggers full reconciliation
### Integration
- [ ] Service starts correctly based on device role
- [ ] Role transitions handled (leader election)
- [ ] Integration tests validate device-to-device sync
- [ ] Multi-follower scenario tested
## Performance Benefits vs Polling
- **Latency**: Near-instant (push) vs up to 5s, roughly 2.5s on average (polling every 5s)
- **Bandwidth**: Only when changes occur vs constant polls
- **Battery**: Idle until notification vs wake every 5s
## References
- `docs/core/sync.md` - Complete sync specification
- Protocol: LSYNC-013 (Sync protocol handler)
- Leader election: LSYNC-009
- TransactionManager: LSYNC-006

@@ -0,0 +1,68 @@
---
id: LSYNC-011
title: Sync Conflict Resolution (Optimistic Concurrency)
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: Medium
tags: [sync, conflict-resolution, versioning]
depends_on: [LSYNC-010]
---
## Description
Implement conflict resolution for sync entries using optimistic concurrency control. All Syncable models have a version field; when applying updates, the system compares versions to determine which change wins.
## Implementation Steps
1. Implement `apply_model_change()` with version checking
2. Add Last-Write-Wins (LWW) strategy
3. Handle Insert/Update/Delete operations
4. Skip updates when local version is newer
5. Log conflicts for debugging/monitoring
6. Add optional conflict resolution UI hooks (future)
## Conflict Strategy
- **Last-Write-Wins (LWW)**: Use version field to determine winner
- **Insert**: Always apply (no conflict possible)
- **Update**: Compare versions, skip if local >= remote
- **Delete**: Always apply (tombstone)
- **User Metadata** (tags, albums): Union merge (future)
## Technical Details
- Location: `core/src/service/sync/conflict.rs`
- Version field: Monotonically increasing integer
- Timestamp-based versioning for some models
- No CRDTs in Phase 1 (simpler, sufficient for metadata)
## Example Logic
```rust
if remote_model.version > local_model.version {
    // Remote is newer - apply update
    remote_model.update(db).await?;
} else {
    // Local is newer or same - skip
    tracing::debug!("Skipping sync entry: local version is newer or equal");
}
```
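A slightly fuller sketch of the strategy table, covering all three change types. `ChangeType`, `Decision` and `resolve` are hypothetical names; treating an update for a record that does not exist locally as "apply" is an assumption, not something the spec states.

```rust
/// Kind of change carried by a sync entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeType {
    Insert,
    Update,
    Delete,
}

/// What the follower should do with the incoming change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    Apply,
    Skip,
}

/// Decide whether to apply a remote change.
/// `local_version` is `None` when the record does not exist locally.
pub fn resolve(change: ChangeType, local_version: Option<i64>, remote_version: i64) -> Decision {
    match change {
        // Inserts and deletes (tombstones) are always applied.
        ChangeType::Insert | ChangeType::Delete => Decision::Apply,
        // Updates are skipped when the local copy is at least as new.
        ChangeType::Update => match local_version {
            Some(local) if local >= remote_version => Decision::Skip,
            _ => Decision::Apply,
        },
    }
}
```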
## Acceptance Criteria
- [ ] Version comparison logic implemented
- [ ] Conflicts resolved automatically
- [ ] Conflicts logged for monitoring
- [ ] Unit tests cover all conflict scenarios
- [ ] Integration tests validate cross-device conflicts
## Future Enhancements
- CRDT-based merge for rich text fields
- User-facing conflict resolution UI
- Conflict metrics and alerting
## References
- `docs/core/sync.md` lines 403-443

@@ -0,0 +1,73 @@
---
id: LSYNC-012
title: Entry Sync with Bulk Optimization
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, indexing, bulk, performance]
depends_on: [LSYNC-006, LSYNC-010]
---
## Description
Implement entry (file/folder) synchronization with bulk optimization. When a device indexes 1M files, it creates ONE metadata sync log instead of 1M individual entries. Other devices trigger their own local indexing when they see this notification.
## The Problem
Naive approach: Index 1M files → Create 1M sync log entries → 500MB sync log → 10 minutes to replicate
**This doesn't scale.**
## The Solution
Bulk operations create metadata-only sync logs:
```json
{
"sequence": 1234,
"model_type": "bulk_operation",
"operation": "InitialIndex",
"location_id": "uuid-...",
"affected_count": 1000000,
"hints": { "location_path": "/Users/alice/Photos" }
}
```
Other devices see this, check if they have a matching location, and trigger their own indexing job.
## Implementation Steps
1. Create `BulkOperation` enum (InitialIndex, WatcherBatch)
2. Update `commit_bulk()` in TransactionManager
3. Create bulk operation sync log entries
4. Implement `handle_bulk_operation()` in sync follower
5. Match location by path/fingerprint on remote device
6. Queue local IndexerJob when match found
7. Handle watcher batches (10-1K items) with per-item logs
## Performance Impact
- **Before**: 1M entries, 500MB, 10 minutes, 3M operations
- **After**: 1 entry, 500 bytes, <1 second, 1M operations
- **Result**: At least 10x faster, with a sync log about one million times smaller (500MB → 500 bytes)
## Technical Details
- Initial indexing: Always bulk (1K+ items)
- Watcher events: Batch if 10-1K, per-item if <10
- User operations: Always per-item (instant sync)
- Location matching: By path or volume fingerprint
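The thresholds above amount to a small decision function. This sketch uses hypothetical names (`ChangeSource`, `SyncStrategy`, `choose_strategy`) and makes two assumptions the spec leaves open: a watcher batch of exactly 1,000 items is still a batch, and anything larger falls through to bulk.

```rust
/// Where a set of changes came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeSource {
    InitialIndex,
    Watcher,
    User,
}

/// How the changes should be recorded in the sync log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStrategy {
    /// One sync log entry per item, committed individually.
    PerItem,
    /// One transaction, still one sync log entry per item.
    Batch,
    /// A single metadata-only sync log entry.
    Bulk,
}

/// Pick a strategy from the change source and the number of affected items.
pub fn choose_strategy(source: ChangeSource, count: usize) -> SyncStrategy {
    match source {
        ChangeSource::User => SyncStrategy::PerItem,
        ChangeSource::InitialIndex => SyncStrategy::Bulk,
        ChangeSource::Watcher if count < 10 => SyncStrategy::PerItem,
        ChangeSource::Watcher if count <= 1_000 => SyncStrategy::Batch,
        // Assumption: very large watcher bursts are treated like bulk indexing.
        ChangeSource::Watcher => SyncStrategy::Bulk,
    }
}
```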
## Acceptance Criteria
- [ ] Bulk operations create metadata-only sync logs
- [ ] Follower triggers local indexing on bulk notification
- [ ] 1M file indexing creates <10KB sync log
- [ ] Watcher batches use appropriate strategy
- [ ] Location matching across devices works
- [ ] Performance tests validate 10x improvement
## References
- `docs/core/sync.md` lines 172-196 (bulk operations)
- `docs/core/sync.md` lines 495-502 (performance metrics)

@@ -0,0 +1,154 @@
---
id: LSYNC-013
title: Sync Protocol Handler (Message-based)
status: To Do
assignee: unassigned
parent: LSYNC-000
priority: High
tags: [sync, networking, protocol, push]
depends_on: [LSYNC-008, LSYNC-009]
---
## Description
Create a dedicated sync protocol handler for the networking layer that enables push-based sync via `SyncMessage` enum. This replaces polling with efficient message-passing between leader and follower devices.
## Architecture Decision
**Before**: Follower polls leader every 5 seconds (`sync_iteration()`)
- High latency (up to 5s)
- Wasted bandwidth (empty polls)
- Battery drain on mobile
**After**: Push-based messaging via dedicated protocol
- Instant updates (pushed when changes happen)
- No empty polls
- Bi-directional: Leader pushes, follower can request
## SyncMessage Enum
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncMessage {
    // Leader → Follower: New entries available
    NewEntries {
        library_id: Uuid,
        from_sequence: u64,
        to_sequence: u64,
        entry_count: usize,
    },
    // Follower → Leader: Request entries
    FetchEntries {
        library_id: Uuid,
        since_sequence: u64,
        limit: usize,
    },
    // Leader → Follower: Response with entries
    EntriesResponse {
        library_id: Uuid,
        entries: Vec<SyncLogEntry>,
        has_more: bool,
    },
    // Follower → Leader: Acknowledge received
    Acknowledge {
        library_id: Uuid,
        up_to_sequence: u64,
    },
    // Bi-directional: Heartbeat
    Heartbeat {
        library_id: Uuid,
        current_sequence: u64,
        role: SyncRole,
    },
    // Leader → Follower: You're behind, full sync needed
    SyncRequired {
        library_id: Uuid,
        reason: String,
    },
}
```
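The `FetchEntries` / `EntriesResponse` exchange implies pagination via `has_more`. A minimal sketch, assuming the leader caps each response at 100 entries and the follower keeps requesting until `has_more` is false. Entries are reduced to bare sequence numbers here, and `handle_fetch` and `fetch_all` are hypothetical helpers.

```rust
/// Maximum number of entries the leader returns per response.
const MAX_BATCH: usize = 100;

/// Reduced `EntriesResponse`: entries are represented by their sequence numbers.
pub struct EntriesResponse {
    pub sequences: Vec<u64>,
    pub has_more: bool,
}

/// Leader side: answer a fetch request.
/// `log` holds sequence numbers in ascending order (stand-in for the sync log).
pub fn handle_fetch(log: &[u64], since_sequence: u64, limit: usize) -> EntriesResponse {
    let limit = limit.min(MAX_BATCH);
    let mut newer = log.iter().copied().filter(|&s| s > since_sequence);
    let sequences: Vec<u64> = newer.by_ref().take(limit).collect();
    // Anything left in the iterator means another page is available.
    let has_more = newer.next().is_some();
    EntriesResponse { sequences, has_more }
}

/// Follower side: keep fetching until the leader reports no more entries.
pub fn fetch_all(log: &[u64], mut since_sequence: u64) -> Vec<u64> {
    let mut all = Vec::new();
    loop {
        let response = handle_fetch(log, since_sequence, MAX_BATCH);
        if let Some(&last) = response.sequences.last() {
            since_sequence = last;
        }
        all.extend(response.sequences);
        if !response.has_more {
            break;
        }
    }
    all
}
```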
## Implementation Steps
1. Create `core/src/service/network/protocol/sync/` directory
2. Create `sync/mod.rs` - Main protocol handler
3. Create `sync/messages.rs` - SyncMessage enum
4. Create `sync/leader.rs` - Leader-side message handling
5. Create `sync/follower.rs` - Follower-side message handling
6. Register protocol with ALPN: `/spacedrive/sync/1.0.0`
7. Integrate with DeviceRegistry for connection lookup
8. Add connection lifecycle management
## Protocol Handler Structure
```rust
// core/src/service/network/protocol/sync/mod.rs
pub struct SyncProtocolHandler {
    library_id: Uuid,
    sync_log_db: Arc<SyncLogDb>,
    device_registry: Arc<RwLock<DeviceRegistry>>,
    event_bus: Arc<EventBus>,
    role: SyncRole,
}

impl ProtocolHandler for SyncProtocolHandler {
    const ALPN: &'static [u8] = b"/spacedrive/sync/1.0.0";
    async fn handle_connection(
        &self,
        stream: BiStream,
        peer_device_id: Uuid,
    ) -> Result<(), NetworkingError>;
}

impl SyncProtocolHandler {
    // Leader: Push notification when new entries created
    pub async fn notify_new_entries(
        &self,
        from_seq: u64,
        to_seq: u64,
    ) -> Result<(), SyncError>;
    // Follower: Request entries from leader
    pub async fn request_entries(
        &self,
        since_seq: u64,
    ) -> Result<Vec<SyncLogEntry>, SyncError>;
    // Handle incoming message
    async fn handle_message(
        &self,
        msg: SyncMessage,
        stream: &mut BiStream,
    ) -> Result<(), SyncError>;
}
```
## Connection Management
- Protocol uses Iroh BiStreams for bi-directional communication
- Leader maintains open connections to all follower devices
- Follower connects to leader on library open
- Heartbeat every 30 seconds to detect disconnections
- Auto-reconnect on connection loss
## Acceptance Criteria
- [ ] SyncProtocolHandler implemented
- [ ] SyncMessage enum defined
- [ ] Leader can push NewEntries notifications
- [ ] Follower can request entries
- [ ] BiStream communication working
- [ ] Protocol registered with correct ALPN
- [ ] Connection lifecycle managed
- [ ] Integration tests validate message flow
## References
- Existing protocols: `core/src/service/network/protocol/pairing/`
- Protocol registry: `core/src/service/network/protocol/registry.rs`

@@ -329,3 +329,4 @@ Week 4: Observability + metrics + documentation
Week 5-6: Beta testing with various network configs
Week 7: Production rollout
```

@@ -188,3 +188,4 @@ These are production-grade, handling 200k+ concurrent connections.
---
**See detailed plan**: [IROH_RELAY_INTEGRATION.md](./IROH_RELAY_INTEGRATION.md)

@@ -1,132 +0,0 @@
// Example: How the frontend would use the GraphQL API with full type safety
import { useQuery, useMutation, gql } from '@apollo/client';
import type {
Library,
CreateLibraryInput,
UpdateLibrarySettingsInput
} from './generated/graphql';
// GraphQL queries and mutations
const GET_LIBRARIES = gql`
query GetLibraries {
libraries {
id
name
path
description
totalFiles
totalSize
createdAt
updatedAt
}
}
`;
const CREATE_LIBRARY = gql`
mutation CreateLibrary($input: CreateLibraryInput!) {
createLibrary(input: $input) {
id
name
path
description
}
}
`;
const UPDATE_LIBRARY_SETTINGS = gql`
mutation UpdateLibrarySettings($input: UpdateLibrarySettingsInput!) {
updateLibrarySettings(input: $input) {
id
name
}
}
`;
// React component with full type safety
export function LibraryManager() {
// Fully typed query - TypeScript knows the shape of data
const { data, loading, error } = useQuery<{ libraries: Library[] }>(GET_LIBRARIES);
// Fully typed mutation
const [createLibrary] = useMutation<
{ createLibrary: Library },
{ input: CreateLibraryInput }
>(CREATE_LIBRARY);
const [updateSettings] = useMutation<
{ updateLibrarySettings: Library },
{ input: UpdateLibrarySettingsInput }
>(UPDATE_LIBRARY_SETTINGS);
const handleCreateLibrary = async () => {
// TypeScript enforces correct input shape
const result = await createLibrary({
variables: {
input: {
name: "My Photos",
description: "Personal photo collection",
location: "/Users/me/Pictures"
}
}
});
// result.data.createLibrary is fully typed as Library
console.log("Created library:", result.data?.createLibrary.name);
};
const handleUpdateSettings = async (libraryId: string) => {
// TypeScript ensures we pass valid settings
await updateSettings({
variables: {
input: {
id: libraryId,
generateThumbnails: true,
thumbnailQuality: 90,
enableAiTagging: false
}
}
});
};
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return (
<div>
<h1>Libraries</h1>
{data?.libraries.map(library => (
<div key={library.id}>
<h2>{library.name}</h2>
<p>Files: {library.totalFiles}</p>
<p>Size: {library.totalSize}</p>
{/* TypeScript knows all available fields */}
<button onClick={() => handleUpdateSettings(library.id)}>
Update Settings
</button>
</div>
))}
<button onClick={handleCreateLibrary}>Create Library</button>
</div>
);
}
// Example: Using with React hooks for even better DX
import { useGetLibrariesQuery, useCreateLibraryMutation } from './generated/graphql';
export function LibraryManagerWithHooks() {
// Even simpler with generated hooks!
const { data, loading } = useGetLibrariesQuery();
const [createLibrary] = useCreateLibraryMutation();
// Full intellisense and type checking
const libraries = data?.libraries ?? [];
return (
<div>
{libraries.map(lib => (
<div key={lib.id}>{lib.name}</div>
))}
</div>
);
}

File diff suppressed because it is too large

@@ -0,0 +1,180 @@
# Sync System Design Documentation
This directory contains **detailed design documents** for Spacedrive's multi-device synchronization and client-side caching architecture.
## 🎯 Implementation Guides (Start Here!)
For implementation, read these **root-level guides**:
1. **[../../sync.md](../../sync.md)** ⭐ **Sync System Implementation Guide**
   - TransactionManager API and usage
   - Syncable trait specification
   - Leader election protocol
   - Sync service implementation
   - Production-ready reference
2. **[../../events.md](../../events.md)** ⭐ **Unified Event System**
   - Generic resource events
   - Type registry pattern (zero switch statements!)
   - Client integration (Swift + TypeScript)
   - Migration strategy
3. **[../../normalized_cache.md](../../normalized_cache.md)** ⭐ **Client-Side Normalized Cache**
   - Cache architecture and implementation
   - Memory management (LRU, TTL, ref counting)
   - React and SwiftUI integration
   - Optimistic updates and offline support
---
## 📚 Design Documents (Deep Dives)
The documents in this directory provide comprehensive design rationale and detailed exploration. Read these for context and decision history:
### 1. Foundation & Context
- **[SYNC_DESIGN.md](./SYNC_DESIGN.md)** - The original comprehensive sync architecture
  - Covers: Sync domains (Index, Metadata, Content, State), conflict resolution, leader election
  - Start here for foundational understanding
### 2. Core Implementation Specs
- **[SYNC_TX_CACHE_MINI_SPEC.md](./SYNC_TX_CACHE_MINI_SPEC.md)** ⭐ **START HERE FOR IMPLEMENTATION**
  - Concise, actionable spec for `Syncable`/`Identifiable` traits
  - TransactionManager API and semantics
  - BulkChangeSet mechanism for efficient bulk operations
  - Albums example with minimal boilerplate
  - Raw SQL compatibility notes
- **[UNIFIED_RESOURCE_EVENTS.md](./UNIFIED_RESOURCE_EVENTS.md)** ⭐ **CRITICAL FOR EVENT SYSTEM**
  - Generic resource event design (eliminates ~40 specialized event variants)
  - Type registry pattern for zero-friction horizontal scaling
  - Swift and TypeScript examples with auto-generation via specta
  - **Key insight**: Zero switch statements when adding new resources
### 3. Unified Architecture
- **[UNIFIED_TRANSACTIONAL_SYNC_AND_CACHE.md](./UNIFIED_TRANSACTIONAL_SYNC_AND_CACHE.md)**
  - Complete end-to-end architecture integrating sync + cache
  - Context-aware commits: `transactional` vs `bulk` vs `silent`
  - **Critical**: Bulk operations create ONE metadata sync entry (not millions)
  - Performance analysis and decision rationale
  - 2295 lines of comprehensive design (reference doc, not reading material)
### 4. Client-Side Caching
- **[NORMALIZED_CACHE_DESIGN.md](./NORMALIZED_CACHE_DESIGN.md)**
  - Client-side normalized entity cache (similar to Apollo Client)
  - Event-driven invalidation and atomic updates
  - Memory management (LRU, TTL, reference counting)
  - Swift and TypeScript implementation patterns
  - 2674 lines covering edge cases and advanced scenarios
### 5. Implementation Analysis
- **[TRANSACTION_MANAGER_COMPATIBILITY.md](./TRANSACTION_MANAGER_COMPATIBILITY.md)**
  - Compatibility analysis with existing codebase
  - Current write patterns (SeaORM, transactions, raw SQL)
  - Migration strategy with code examples
  - Risk analysis and mitigation
  - **Verdict**: Fully compatible, ready to implement ✅
### 6. Historical & Supplementary
- **[SYNC_DESIGN_2025_08_19.md](./SYNC_DESIGN_2025_08_19.md)** - Updated sync design iteration
- **[SYNC_FIRST_DRAFT_DESIGN.md](./SYNC_FIRST_DRAFT_DESIGN.md)** - Early draft (historical context)
- **[SYNC_INTEGRATION_NOTES.md](./SYNC_INTEGRATION_NOTES.md)** - Integration notes and considerations
- **[SYNC_CONDUIT_DESIGN.md](./SYNC_CONDUIT_DESIGN.md)** - Sync conduit specific design
---
## 🎯 Quick Reference
### Key Concepts
**Syncable** (Rust persistence models)
```rust
pub trait Syncable {
const SYNC_MODEL: &'static str;
fn sync_id(&self) -> Uuid;
fn version(&self) -> i64;
}
```
**Identifiable** (Client-facing resources)
```rust
pub trait Identifiable {
type Id;
fn resource_id(&self) -> Self::Id;
fn resource_type() -> &'static str;
}
```
**TransactionManager** (Sole write gateway)
- `commit()` - Single resource, per-entry sync log
- `commit_batch()` - Micro-batch (10-1K), per-entry sync logs
- `commit_bulk()` - Bulk (1K+), ONE metadata sync entry
**Event System** (Generic, horizontally scalable)
- `ResourceChanged { resource_type, resource }`
- `ResourceBatchChanged { resource_type, resources }`
- `BulkOperationCompleted { resource_type, affected_count, hints }`
### Critical Design Decisions
1. **Indexing ≠ Sync**: Each device indexes its own filesystem. Bulk operations create metadata notifications, not individual entry replications.
2. **Leader Election**: One device per library assigns sync log sequence numbers. Prevents collisions.
3. **Zero Manual Sync Logging**: TransactionManager automatically creates sync logs. Application code never touches sync infrastructure.
4. **Type Registry Pattern**: Clients use type registries (auto-generated via specta) to handle all resource events generically. No switch statements per resource type.
5. **Client-Side Cache**: Normalized entity store + query index. Events trigger atomic updates. Cache persistence for offline mode.
---
## 📋 Implementation Status
- [x] Design documentation complete
- [ ] Phase 1: Core infrastructure (TM, traits, events)
- [ ] Phase 2: Client prototype (Swift cache + event handler)
- [ ] Phase 3: Expansion (migrate all ops to TM)
- [ ] Phase 4: TypeScript port + advanced features
---
## 🔗 Related Documentation
**Implementation Guides** (Root Level):
- `../../sync.md` - Sync system implementation
- `../../events.md` - Unified event system
- `../../normalized_cache.md` - Client cache implementation
- `../../sync-setup.md` - Library sync setup (Phase 1)
**Infrastructure**:
- `../INFRA_LAYER_SEPARATION.md` - Infrastructure layer architecture
- `../JOB_SYSTEM_DESIGN.md` - Job system (indexing jobs integrate with TM)
- `../DEVICE_PAIRING_PROTOCOL.md` - Device pairing (prerequisite for sync)
---
## 📖 Documentation Philosophy
**Root-level docs** (`docs/core/*.md`):
- Implementation-ready guides
- Concise, actionable specifications
- Code examples and usage patterns
- Reference during development
**Design docs** (`docs/core/design/sync/*.md`):
- Comprehensive exploration
- Decision rationale and alternatives
- Edge cases and advanced scenarios
- Historical context
---
## 💡 Contributing
**Adding implementation guidance**: Update root-level docs (`sync.md`, `events.md`, `normalized_cache.md`)
**Adding design exploration**: Create new document in this directory:
1. Follow naming: `SYNC_<TOPIC>_DESIGN.md`
2. Update this README
3. Reference related documents
4. Include comprehensive examples

@@ -0,0 +1,271 @@
# Sync + Transaction Manager + Normalized Cache: Mini Spec
## Scope
A concise specification aligning TransactionManager, Syncable/Identifiable traits, bulk change handling, raw query compatibility, and leader election. Includes a concrete Albums example with minimal boilerplate.
## Goals
- Zero manual sync-log creation in application code
- Keep raw SQL for complex reads; writes go through TransactionManager
- Bulk = mechanism (generic changeset), not hard-coded enum cases
- Clear trait-based configuration, minimal boilerplate
- Compatible with existing SeaORM patterns
## Core Traits
```rust
// client-facing identity for cache normalization
pub trait Identifiable {
    type Id: Into<Uuid> + Copy + Eq + std::hash::Hash + Serialize + for<'de> Deserialize<'de>;
    fn id(&self) -> Self::Id;
    fn resource_type() -> &'static str;
}

// persistence-facing for sync logging
pub trait Syncable {
    // stable model type name used in sync log
    const SYNC_MODEL: &'static str;
    // globally unique logical id for sync (Uuid recommended)
    fn sync_id(&self) -> Uuid;
    // optimistic concurrency
    fn version(&self) -> i64;
    // minimal payload for replication (defaults to full serde)
    fn to_sync_json(&self) -> serde_json::Value where Self: Serialize {
        serde_json::to_value(self).unwrap_or(serde_json::json!({}))
    }
    // optional field allow/deny (minimize boilerplate: both optional)
    fn include_fields() -> Option<&'static [&'static str]> { None }
    fn exclude_fields() -> Option<&'static [&'static str]> { None }
}
```
Notes:
- App code should not construct sync logs; TransactionManager derives them from `Syncable`.
- `include_fields`/`exclude_fields` are optional knobs. If both None, default to `to_sync_json()`.
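One way the TransactionManager might apply these knobs to a serialized payload is sketched below. The payload is modeled as a `BTreeMap<String, String>` instead of `serde_json::Value` to stay dependency-free, `filter_fields` is a hypothetical helper, and applying the allow list before the deny list when both are set is an assumption.

```rust
use std::collections::BTreeMap;

/// Keep only the fields that should be replicated.
/// - `include = Some(..)`: only listed keys survive.
/// - `exclude = Some(..)`: listed keys are dropped.
/// - Both `None`: the payload is returned unchanged.
pub fn filter_fields(
    payload: BTreeMap<String, String>,
    include: Option<&[&str]>,
    exclude: Option<&[&str]>,
) -> BTreeMap<String, String> {
    payload
        .into_iter()
        .filter(|(key, _)| include.map_or(true, |allow| allow.contains(&key.as_str())))
        .filter(|(key, _)| exclude.map_or(true, |deny| !deny.contains(&key.as_str())))
        .collect()
}
```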
## TransactionManager Responsibilities
- Enforce atomic DB write + sync log creation
- Emit rich events post-commit for client cache
- Support single, batch, and bulk change sets
- Provide a transaction-bound context for raw SQL when needed
### API (sketch)
```rust
pub struct TransactionManager { /* event bus, seq allocator, leader state */ }
pub struct ChangeSet<T> { pub items: Vec<T> } // generic mechanism for bulk

impl TransactionManager {
    // single model
    pub async fn commit<M: Syncable + IntoActiveModel>(
        &self,
        library: Arc<Library>,
        model: M,
    ) -> Result<M, TxError>;
    // micro-batch (10-1k), produces per-item sync entries
    pub async fn commit_batch<M: Syncable + IntoActiveModel>(
        &self,
        library: Arc<Library>,
        models: Vec<M>,
    ) -> Result<Vec<M>, TxError>;
    // bulk (1k+), produces ONE metadata sync entry with ChangeSet descriptor
    pub async fn commit_bulk<M: Syncable + IntoActiveModel>(
        &self,
        library: Arc<Library>,
        changes: ChangeSet<M>,
    ) -> Result<BulkAck, TxError>;
}

pub struct BulkAck { pub affected: usize, pub token: Uuid }
```
### Sync Log Semantics
- commit: one sync entry per item
- commit_batch: one per item (same txn), event may be batched
- commit_bulk: ONE metadata sync entry:
```json
{
"sequence": 1234,
"model_type": "bulk_changeset",
"token": "uuid-token",
"affected": 1000000,
"model": "entry", // derived from Syncable::SYNC_MODEL
"mode": "insert|update|delete",
"hints": { "location_id": "..." }
}
```
Followers treat this as a notification; they DO NOT pull all items. They trigger local indexing where applicable.
## Raw Query Compatibility
- Reads: unrestricted (SeaORM query builder or raw SQL)
- Writes: perform inside TM-provided transaction handle
- TM exposes `with_tx(|txn| async { /* raw SQL writes */ })` that auto sync-logs via `Syncable` wrappers or explicit `commit_*` calls.
## Leader Election (Minimum)
- Single leader per library for assigning sync sequences
- Election strategy per SYNC_DESIGN.md (initial leader = creator; re-elect via heartbeat timeout)
- TM refuses sync-log creation if not leader (or buffers and requests lease)
## Albums Example (Concrete)
Schema (SeaORM model):
```rust
#[derive(Clone, Debug, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "albums")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub uuid: Uuid,
pub name: String,
pub cover_entry_uuid: Option<Uuid>,
pub version: i64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
```
Implement traits:
```rust
impl Syncable for albums::Model {
const SYNC_MODEL: &'static str = "album";
fn sync_id(&self) -> Uuid { self.uuid }
fn version(&self) -> i64 { self.version }
fn exclude_fields() -> Option<&'static [&'static str]> {
// example: exclude timestamps from replication
Some(&["created_at", "updated_at", "id"])
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Album { pub id: Uuid, pub name: String, pub cover: Option<Uuid> }
impl Identifiable for Album {
type Id = Uuid;
fn id(&self) -> Self::Id { self.id }
fn resource_type() -> &'static str { "album" }
}
```
Create action (no manual sync logging):
```rust
pub async fn create_album(
tm: &TransactionManager,
library: Arc<Library>,
name: String,
) -> Result<Album, TxError> {
let model = albums::Model {
id: 0,
uuid: Uuid::new_v4(),
name,
cover_entry_uuid: None,
version: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
};
// TM writes + sync logs atomically
let saved = tm.commit(library.clone(), model).await?;
// Build client model (query layer)
let album = Album { id: saved.uuid, name: saved.name, cover: saved.cover_entry_uuid };
// TM (post-commit) emits ResourceChanged { resource_type: "album", .. } automatically
Ok(album)
}
```
Bulk import albums:
```rust
pub async fn import_albums(
tm: &TransactionManager,
library: Arc<Library>,
names: Vec<String>,
) -> Result<usize, TxError> {
let models: Vec<albums::Model> = names.into_iter().map(|n| albums::Model {
id: 0,
uuid: Uuid::new_v4(),
name: n,
cover_entry_uuid: None,
version: 1,
created_at: Utc::now(),
updated_at: Utc::now(),
}).collect();
let ack = tm.commit_bulk(library, ChangeSet { items: models }).await?;
Ok(ack.affected)
}
```
## Boilerplate Minimization
- Derive macros can implement `Syncable` and `Identifiable` from annotations:
```rust
#[derive(Syncable)]
#[syncable(model="album", id="uuid", version="version", exclude=["created_at","updated_at","id"])]
pub struct Model { /* ... */ } // i.e. albums::Model
#[derive(Identifiable)]
#[identifiable(resource="album", id="id")]
struct Album { /* ... */ }
```
## Event Emission (Unified System)
See `UNIFIED_RESOURCE_EVENTS.md` for complete design.
**Key Points**:
- TM emits generic `ResourceChanged` events automatically
- No manual `event_bus.emit()` in application code
- Clients handle resources generically via `resource_type` field
- Event structure:
```rust
Event {
envelope: { id, timestamp, library_id, sequence },
kind: ResourceChanged { resource_type, resource }
| ResourceBatchChanged { resource_type, resources, operation }
| BulkOperationCompleted { resource_type, affected_count, token, hints }
| ResourceDeleted { resource_type, resource_id }
}
```
**Example**:
```rust
// Rust: Automatic emission
let album = tm.commit::<albums::Model, Album>(library, model).await?;
// → Emits: ResourceChanged { resource_type: "album", resource: album }
```
```swift
// Swift: Generic handling
case .ResourceChanged(let type, let json):
    switch type {
    case "album": cache.updateEntity(try decode(Album.self, json))
    case "file": cache.updateEntity(try decode(File.self, json))
    // Add new resources without changing event code!
    }
```
Benefits:
- Zero boilerplate for new resources
- Type-safe on both ends
- Cache integration automatic
- ~35 specialized event variants eliminated
## Consistency Rules
- All sync-worthy writes go through TM
- Reads, including raw SQL, remain unrestricted
- Followers treat bulk metadata as notification; they re-index locally if applicable
## Appendix: Raw SQL inside TM
```rust
tm.with_tx(library, |txn| async move {
// raw SQL writes
txn.execute(Statement::from_sql_and_values(DbBackend::Sqlite, "UPDATE albums SET name=? WHERE uuid=?", vec![name.into(), uuid.into()])).await?;
// tell TM to record sync for this model change
tm.sync_log_for::<albums::Model>(txn, uuid).await?;
Ok(())
}).await?;
```


@@ -0,0 +1,604 @@
# TransactionManager Compatibility Analysis
## Executive Summary
**Status**: ✅ **FULLY COMPATIBLE** with existing codebase patterns
The `TransactionManager` design is **fully compatible** with the current database write patterns. The codebase uses **SeaORM exclusively** with well-structured transaction patterns that the TransactionManager can enhance without requiring major refactoring.
**Key Finding**: No sync log infrastructure exists yet - the TransactionManager will be the **first implementation** of transactional sync.
---
## Current Database Write Patterns
### 1. SeaORM-Only Architecture ✅
**Good news**: All database writes go through SeaORM. The only raw SQL writes are optimized bulk operations, and those are issued through SeaORM's `Statement` API inside a transaction (see Raw SQL Usage Analysis).
```rust
// Pattern 1: Single insert with ActiveModel
let new_entry = entry::ActiveModel {
uuid: Set(Uuid::new_v4()),
name: Set(entry_name),
size: Set(file_size),
// ...
};
let result = new_entry.insert(db).await?;
```
```rust
// Pattern 2: Batch insert
let entries: Vec<entry::ActiveModel> = /* ... */;
entry::Entity::insert_many(entries)
.exec(db)
.await?;
```
```rust
// Pattern 3: Transaction-wrapped operations
let txn = db.begin().await?;
// Multiple operations
let result1 = model1.insert(&txn).await?;
let result2 = model2.insert(&txn).await?;
txn.commit().await?;
```
**TransactionManager Compatibility**: ✅ **Perfect fit**
- Can wrap existing ActiveModel operations
- Can use SeaORM's transaction support
- No need to change ORM layer
---
## Where Writes Currently Happen
### 1. **Indexer** (Bulk Operations)
**Location**: `core/src/ops/indexing/`
**Pattern**: Batch transactions with bulk inserts
```rust
// Current indexer pattern
let txn = ctx.library_db().begin().await?;
// Accumulate entries in memory
let mut bulk_self_closures: Vec<entry_closure::ActiveModel> = Vec::new();
let mut bulk_dir_paths: Vec<directory_paths::ActiveModel> = Vec::new();
// Process batch
for entry in batch {
EntryProcessor::create_entry_in_conn(
state, ctx, &entry, device_id, location_root_path,
&txn, // ← Single transaction for whole batch
&mut bulk_self_closures,
&mut bulk_dir_paths,
).await?;
}
// Bulk insert related tables
entry_closure::Entity::insert_many(bulk_self_closures)
.exec(&txn).await?;
directory_paths::Entity::insert_many(bulk_dir_paths)
.exec(&txn).await?;
txn.commit().await?;
```
**TransactionManager Integration**:
```rust
// New pattern with TransactionManager
let entries: Vec<entry::ActiveModel> = /* collect in memory */;
tx_manager.commit_bulk(
library,
entries,
BulkOperation::InitialIndex { location_id }
).await?;
// ✅ ONE sync log entry created automatically
// ✅ Event emitted automatically
```
**Refactoring Required**: ⚠️ **Moderate**
- Replace batch transaction with `commit_bulk` call
- Remove manual transaction management
- Add BulkOperation context
- **Benefit**: Projected ~10x performance improvement (not yet benchmarked; see Step 2 of the Migration Path) + sync log integration
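The "ONE sync log entry" property can be illustrated with an in-memory model. This is only a sketch under stated assumptions: `Store`, `SyncLogEntry`, and the `u64` record ids are hypothetical stand-ins for the database, the `sync_log` table, and entry UUIDs.

```rust
/// One row in the (in-memory) sync log. Hypothetical type for illustration.
#[derive(Debug, PartialEq)]
pub enum SyncLogEntry {
    /// Per-record change, as written by a single-item commit.
    Record { record_id: u64 },
    /// Metadata-only entry describing a whole bulk operation.
    Bulk { operation: String, affected: usize },
}

/// Stand-in for the library database plus its sync log.
#[derive(Default)]
pub struct Store {
    pub rows: Vec<u64>,
    pub sync_log: Vec<SyncLogEntry>,
}

impl Store {
    /// Per-item commit: one row and one sync log entry each.
    pub fn commit(&mut self, record_id: u64) {
        self.rows.push(record_id);
        self.sync_log.push(SyncLogEntry::Record { record_id });
    }

    /// Bulk commit: N rows but a single metadata sync log entry.
    pub fn commit_bulk(&mut self, record_ids: Vec<u64>, operation: &str) -> usize {
        let affected = record_ids.len();
        self.rows.extend(record_ids);
        self.sync_log.push(SyncLogEntry::Bulk {
            operation: operation.to_string(),
            affected,
        });
        affected
    }
}

fn main() {
    let mut store = Store::default();
    let affected = store.commit_bulk((0..10_000).collect(), "initial_index");
    println!("{} rows, {} sync log entries", affected, store.sync_log.len());
}
```

The sync log grows with the number of operations rather than the number of indexed entries, which is the reason for routing the indexer through `commit_bulk` instead of per-entry commits.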
---
### 2. **User Actions** (Single Operations)
**Location**: `core/src/ops/tags/apply/action.rs`, `core/src/ops/locations/add/action.rs`
**Pattern**: Direct inserts via managers/services
```rust
// Current action pattern
impl LibraryAction for ApplyTagsAction {
async fn execute(
self,
library: Arc<Library>,
_context: Arc<CoreContext>,
) -> Result<Self::Output, ActionError> {
let db = library.db();
let metadata_manager = UserMetadataManager::new(db.conn().clone());
// Apply tags (internally does inserts)
metadata_manager.apply_semantic_tags(
entry_uuid,
tag_applications,
device_id
).await?;
Ok(output)
}
}
```
**TransactionManager Integration**:
```rust
// New pattern with TransactionManager
impl LibraryAction for ApplyTagsAction {
async fn execute(
self,
library: Arc<Library>,
context: Arc<CoreContext>,
) -> Result<Self::Output, ActionError> {
let tx_manager = context.transaction_manager();
// Prepare models
let entry_model = /* ... */;
let tag_link_model = /* ... */;
// Commit transactionally (creates sync log + event)
let file = tx_manager.commit_tag_addition(
library,
entry_model,
tag_link_model,
).await?;
Ok(output)
}
}
```
**Refactoring Required**: ⚠️ **Moderate**
- Inject TransactionManager from CoreContext
- Replace direct DB writes with tx_manager calls
- **Benefit**: Automatic sync log + event emission + audit trail
---
### 3. **TagManager** (Service Layer)
**Location**: `core/src/ops/tags/manager.rs`
**Pattern**: Direct ActiveModel inserts
```rust
// Current tag manager pattern
pub async fn create_tag(&self, canonical_name: String, ...) -> Result<Tag> {
let db = &*self.db;
let active_model = tag::ActiveModel {
uuid: Set(tag.id),
canonical_name: Set(canonical_name),
// ...
};
let result = active_model.insert(db).await?;
Ok(tag)
}
```
**TransactionManager Integration**:
```rust
// New pattern with TransactionManager
pub async fn create_tag(&self, canonical_name: String, ...) -> Result<Tag> {
let tx_manager = self.tx_manager.clone();
let active_model = tag::ActiveModel {
uuid: Set(tag.id),
canonical_name: Set(canonical_name),
// ...
};
// If sync-worthy:
let tag = tx_manager.commit_transactional(
self.library,
active_model,
).await?;
// If not sync-worthy (internal operation):
let tag = tx_manager.commit_silent(
self.library,
active_model,
).await?;
Ok(tag)
}
```
**Refactoring Required**: ⚠️ **Minor**
- Inject TransactionManager into service constructors
- Replace `.insert(db)` with the appropriate commit method
- **Benefit**: Sync-aware services
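The difference between the two commit methods can be sketched without a database. In this minimal model, `CommitMode` and `Recorder` are hypothetical names: a transactional commit saves, logs, and emits, while a silent commit only saves.

```rust
/// How a write should be recorded (mirrors `commit_transactional` vs `commit_silent`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CommitMode {
    Transactional,
    Silent,
}

/// In-memory stand-in for the database, sync log, and event bus.
#[derive(Default)]
pub struct Recorder {
    pub saved: Vec<String>,
    pub sync_log: Vec<String>,
    pub events: Vec<String>,
}

impl Recorder {
    /// Always saves the tag; only a transactional commit also logs and emits.
    pub fn commit_tag(&mut self, name: &str, mode: CommitMode) {
        self.saved.push(name.to_string());
        if mode == CommitMode::Transactional {
            self.sync_log.push(format!("upsert tag {}", name));
            self.events.push(format!("ResourceChanged tag {}", name));
        }
    }
}

fn main() {
    let mut rec = Recorder::default();
    rec.commit_tag("vacation", CommitMode::Transactional);
    rec.commit_tag("thumbnail-cache", CommitMode::Silent);
    println!(
        "saved={} logged={} events={}",
        rec.saved.len(),
        rec.sync_log.len(),
        rec.events.len()
    );
}
```

Which writes count as sync-worthy is a per-call decision in this sketch; the real design may instead decide it per model type.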
---
## Raw SQL Usage Analysis
### Current Raw SQL Patterns
**Pattern 1**: Optimized bulk operations (closure table population)
```rust
// core/src/ops/indexing/persistence.rs
txn.execute(Statement::from_sql_and_values(
DbBackend::Sqlite,
"INSERT INTO entry_closure (ancestor_id, descendant_id, depth) \
SELECT ancestor_id, ?, depth + 1 \
FROM entry_closure \
WHERE descendant_id = ?",
vec![result.id.into(), parent_id.into()],
)).await?;
```
**TransactionManager Compatibility**: ✅ **Fully compatible**
- Raw SQL operations happen **inside the transaction**
- TransactionManager provides the transaction context
- No changes needed to these optimizations
**Pattern 2**: FTS5 search queries (read-only)
```rust
// core/src/ops/search/query.rs
db.query_all(
Statement::from_string(
DatabaseBackend::Sqlite,
format!("SELECT rowid FROM search_index WHERE search_index MATCH '{}'", query)
)
).await?;
```
**TransactionManager Compatibility**: ✅ **No conflict**
- Read-only operations don't need TransactionManager
- Queries remain unchanged
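Independent of the TransactionManager, Pattern 2 as quoted splices `query` into the SQL text with `format!`, so a quote character in user input can break or alter the statement. Binding the value to a `?` placeholder (as Pattern 1 does with `Statement::from_sql_and_values`) avoids that. If the input should also be treated as a literal FTS5 phrase, it can be quoted first. The helper below is a hypothetical sketch of that quoting, based on FTS5's string syntax (double quotes, with embedded double quotes doubled); `fts5_phrase` is not an existing function in the codebase.

```rust
/// Quote user input as a single FTS5 phrase: wrap it in double quotes and
/// double any embedded double quote, per FTS5 string syntax.
pub fn fts5_phrase(input: &str) -> String {
    format!("\"{}\"", input.replace('"', "\"\""))
}

fn main() {
    let user_input = "foo\" OR bar";
    // The quoted phrase should still be bound to a `?` placeholder
    // rather than formatted into the SQL text.
    println!("{}", fts5_phrase(user_input));
}
```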
---
## Sync Log Infrastructure
### Current State: ❌ **Does Not Exist**
**Finding**: No `sync_log` table or entity exists in the current database schema.
**Files Checked**:
- `core/src/infra/db/entities/`: No sync_log.rs
- No SyncLog ActiveModel
- No sync log creation in any write operations
**Existing Related Infrastructure**:
1. **Audit Log** (`core/src/infra/db/entities/audit_log.rs`): Tracks user actions
- Used by ActionManager
- Tracks action status, errors, results
- NOT used for sync (library-local only)
2. **Job Database** (`core/src/infra/job/database.rs`): Tracks job execution
- Separate database from library DB
- NOT synced between devices
- Used for resumable jobs
3. **Sync Log**: Not implemented yet
---
## TransactionManager Implementation Strategy
### Phase 1: Create Sync Infrastructure
**Step 1**: Create sync_log entity
```rust
// core/src/infra/db/entities/sync_log.rs
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "sync_log")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
// Core fields
pub sequence: i64, // Monotonically increasing per library
pub library_id: Uuid,
pub device_id: Uuid,
pub timestamp: chrono::DateTime<chrono::Utc>,
// Change tracking
pub model_type: String, // "entry", "tag", "bulk_operation"
pub record_id: String, // UUID of changed record
pub change_type: String, // "insert", "update", "delete", "bulk_insert"
pub version: i32, // Optimistic concurrency version
// Data payload
pub data: serde_json::Value, // Full model data or metadata
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
// Add to core/src/infra/db/entities/mod.rs
pub mod sync_log;
pub use sync_log::Entity as SyncLog;
```
**Step 2**: Create migration
```sql
-- Add to database migrations
CREATE TABLE sync_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sequence INTEGER NOT NULL,
library_id TEXT NOT NULL,
device_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
model_type TEXT NOT NULL,
record_id TEXT NOT NULL,
change_type TEXT NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
data TEXT NOT NULL, -- JSON
UNIQUE(library_id, sequence)
);
CREATE INDEX idx_sync_log_library_sequence ON sync_log(library_id, sequence);
CREATE INDEX idx_sync_log_device ON sync_log(device_id);
CREATE INDEX idx_sync_log_model_record ON sync_log(model_type, record_id);
```
---
### Phase 2: Implement TransactionManager
**File**: `core/src/infra/transaction/manager.rs`
```rust
pub struct TransactionManager {
    event_bus: Arc<EventBus>,
    sync_sequence: Arc<Mutex<HashMap<Uuid, i64>>>,
}

impl TransactionManager {
    /// Transactional commit: DB + Sync Log + Event
    pub async fn commit_transactional<M: Syncable>(
        &self,
        library: Arc<Library>,
        model: M::ActiveModel,
    ) -> Result<M::Model, TransactionError> {
        let library_id = library.id();
        let db = library.db().conn();
        // Atomic transaction (SeaORM's `transaction` expects a pinned, boxed future)
        let saved_model = db.transaction(|txn| Box::pin(async move {
            // 1. Save main model
            let saved = model.save(txn).await?;
            // 2. Create sync log entry
            let sync_entry = self.create_sync_log_entry(
                library_id,
                &saved,
                ChangeType::Upsert,
            )?;
            sync_entry.insert(txn).await?;
            Ok::<_, TransactionError>(saved)
        })).await?;
        // 3. Emit event (outside transaction, so it only fires after a successful commit)
        let event = self.build_event(&library_id, &saved_model);
        self.event_bus.emit(event);
        Ok(saved_model)
    }

    /// Bulk commit: DB + ONE metadata sync log
    pub async fn commit_bulk<M: Syncable>(
        &self,
        library: Arc<Library>,
        models: Vec<M::ActiveModel>,
        operation: BulkOperation,
    ) -> Result<BulkResult, TransactionError> {
        let library_id = library.id();
        let db = library.db().conn();
        // Capture the count first: `insert_many` consumes `models`
        let affected_count = models.len();
        // Assumes `BulkOperation: Clone`; the clone moves into the transaction,
        // the original goes into the summary event below
        let logged_operation = operation.clone();
        db.transaction(|txn| Box::pin(async move {
            // 1. Bulk insert models
            M::Entity::insert_many(models)
                .exec(txn)
                .await?;
            // 2. ONE sync log with metadata
            let bulk_sync = self.create_bulk_sync_entry(
                library_id,
                &logged_operation,
                affected_count,
            )?;
            bulk_sync.insert(txn).await?;
            Ok::<_, TransactionError>(())
        })).await?;
        // 3. Summary event
        self.event_bus.emit(Event::BulkOperationCompleted {
            library_id,
            operation,
            affected_count,
        });
        Ok(BulkResult { count: affected_count })
    }
}
```
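The `sync_sequence` field above suggests a per-library counter behind a mutex. A minimal sketch of such an allocator follows, assuming `u128` as a stand-in for `Uuid` and a hypothetical `SequenceAllocator` type. A real implementation would presumably seed each counter from the highest `sequence` already stored in `sync_log` and allocate inside the same transaction as the write, so that a rollback does not leave a gap; neither is shown here.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Stand-in for `Uuid` so the sketch needs only the standard library.
pub type LibraryId = u128;

/// Hands out the next sync log sequence number for each library.
#[derive(Default)]
pub struct SequenceAllocator {
    next: Mutex<HashMap<LibraryId, i64>>,
}

impl SequenceAllocator {
    /// Returns 1 for a library's first entry, then 2, 3, and so on.
    pub fn next_sequence(&self, library_id: LibraryId) -> i64 {
        let mut map = self.next.lock().expect("sequence mutex poisoned");
        let counter = map.entry(library_id).or_insert(0);
        *counter += 1;
        *counter
    }
}

fn main() {
    let allocator = SequenceAllocator::default();
    let library_a: LibraryId = 1;
    let library_b: LibraryId = 2;
    println!("a -> {}", allocator.next_sequence(library_a));
    println!("a -> {}", allocator.next_sequence(library_a));
    println!("b -> {}", allocator.next_sequence(library_b));
}
```

Independent counters per library line up with the `UNIQUE(library_id, sequence)` constraint in the migration.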
---
### Phase 3: Refactor Existing Code
**Priority 1: Indexer** (Highest impact)
```rust
// Before
let txn = db.begin().await?;
for entry in entries {
entry.insert(&txn).await?;
}
txn.commit().await?;
// After
tx_manager.commit_bulk(
library,
entries,
BulkOperation::InitialIndex { location_id }
).await?;
```
**Priority 2: User Actions** (Highest value)
```rust
// Before
let model = entry::ActiveModel { /* ... */ };
model.insert(db).await?;
// After
tx_manager.commit_transactional(library, model).await?;
```
**Priority 3: Services** (TagManager, etc.)
```rust
// Inject tx_manager into constructors
impl TagManager {
pub fn new(
db: Arc<DatabaseConnection>,
tx_manager: Arc<TransactionManager>, // ← NEW
) -> Self {
// ...
}
}
```
---
## Compatibility Matrix
| Component | Current Pattern | TransactionManager Method | Refactor Effort | Benefit |
|-----------|----------------|---------------------------|-----------------|---------|
| **Indexer** | Batch txn + bulk insert | `commit_bulk` | Moderate | Projected ~10x faster (unbenchmarked), sync aware |
| **Actions** | Direct insert via services | `commit_transactional` | Moderate | Auto sync + event |
| **TagManager** | Direct ActiveModel insert | `commit_transactional` or `commit_silent` | Minor | Sync aware |
| **LocationManager** | Spawns indexer job | Use indexer's commit_bulk | None | Inherits benefits |
| **Watcher** | Individual inserts | `commit_transactional_batch` | Minor | Batch optimization |
| **Raw SQL optimizations** | Inside transactions | Unchanged (use txn from manager) | None | Fully compatible |
| **Queries** | Read-only | Unchanged | None | No conflict |
---
## Migration Path
### Step 1: Foundation (Week 1)
- [ ] Create `sync_log` entity and migration
- [ ] Implement `TransactionManager` core
- [ ] Add to `CoreContext`
- [ ] Write unit tests
### Step 2: Indexer (Week 2)
- [ ] Refactor indexer to use `commit_bulk`
- [ ] Benchmark before/after
- [ ] Integration tests
- [ ] Deploy to test library
### Step 3: User Actions (Week 3)
- [ ] Refactor file operations (rename, tag, move)
- [ ] Refactor location operations
- [ ] Test sync log creation
- [ ] Test event emission
### Step 4: Services (Week 4)
- [ ] Inject TransactionManager into TagManager
- [ ] Inject into other services
- [ ] Update all write operations
- [ ] Comprehensive integration tests
### Step 5: Client Integration (Week 5+)
- [ ] Implement sync follower service
- [ ] Implement client cache
- [ ] Test end-to-end sync
- [ ] Performance testing
---
## Risk Analysis
### Low Risk ✅
1. **SeaORM Compatibility**: ✅ Perfect fit
- TransactionManager uses SeaORM's native transaction support
- No ORM layer changes needed
2. **Raw SQL Compatibility**: ✅ No issues
- Raw SQL stays inside transactions
- TransactionManager provides transaction context
3. **Backward Compatibility**: ✅ Non-breaking
- Existing code continues to work
- Gradual migration possible
- No API changes for external callers
### Medium Risk ⚠️
1. **Refactoring Effort**: ⚠️ Moderate work required
- ~50 write locations across codebase
- Need to inject TransactionManager into services
- Testing effort substantial but manageable
2. **Performance Impact**: ⚠️ Need validation
- Sync log writes add overhead
- Mitigated by bulk operations
- Need benchmarks before/after
### Mitigation Strategies
1. **Gradual Migration**: Start with indexer, then actions, then services
2. **Feature Flag**: Gate sync log creation behind config flag during rollout
3. **Performance Testing**: Benchmark each phase before moving to next
4. **Rollback Plan**: Keep old code paths until validated
---
## Conclusion
**Verdict**: ✅ **FULLY COMPATIBLE AND READY TO IMPLEMENT**
The TransactionManager design is **architecturally sound** and **fully compatible** with the existing codebase:
1. **No conflicts** with existing patterns
2. **Enhances** rather than replaces current code
3. **Gradual migration** path available
4. **Significant benefits**: Sync support, event emission, audit trail
5. **Performance improvements** for bulk operations
**Recommendation**: **Proceed with implementation using the phased approach outlined above.**
The TransactionManager will be the **foundation** for Spacedrive's sync architecture, and the current codebase is **well-structured** to integrate it cleanly.


@@ -0,0 +1,740 @@
# Unified Resource Event System
## Problem Statement
Current event system has ~40 specialized variants (`EntryCreated`, `VolumeAdded`, `JobStarted`, etc.), leading to:
- Manual event emission scattered across codebase
- No type safety between events and resources
- Clients must handle each variant specifically
- Adding new resources requires new event variants
- TransactionManager cannot automatically emit events
**Observation from code**: Line 353 has a TODO: "events should have an envelope that contains the library_id instead of this"
## Solution: Generic Resource Events
All resources implementing `Identifiable` can use a unified event structure. TransactionManager emits these automatically.
### Design
```rust
// core/src/infra/event/mod.rs
/// Unified event envelope wrapping all resource events
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct Event {
/// Event metadata
pub envelope: EventEnvelope,
/// The actual event payload
pub kind: EventKind,
}
/// Standard envelope for all events
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct EventEnvelope {
/// Event ID for deduplication/tracking
pub id: Uuid,
/// When this event was created
pub timestamp: DateTime<Utc>,
/// Library context (if applicable)
pub library_id: Option<Uuid>,
/// Sequence number for ordering (optional)
pub sequence: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[serde(tag = "type", content = "data")]
pub enum EventKind {
// ========================================
// GENERIC RESOURCE EVENTS
// ========================================
/// A resource was created/updated (single)
ResourceChanged {
/// Resource type identifier (from Identifiable::resource_type)
resource_type: String,
/// The full resource data (must implement Identifiable)
#[specta(skip)] // Clients reconstruct from JSON
resource: serde_json::Value,
},
/// Multiple resources changed in a batch
ResourceBatchChanged {
resource_type: String,
resources: Vec<serde_json::Value>,
operation: BatchOperation,
},
/// A resource was deleted
ResourceDeleted {
resource_type: String,
resource_id: Uuid,
},
/// Bulk operation completed (notification only, no data transfer)
BulkOperationCompleted {
/// Type of resource affected
resource_type: String,
/// Summary info
affected_count: usize,
operation_token: Uuid,
hints: serde_json::Value, // location_id, etc.
},
// ========================================
// LIFECYCLE EVENTS (no resources)
// ========================================
CoreStarted,
CoreShutdown,
LibraryOpened { id: Uuid, name: String },
LibraryClosed { id: Uuid },
// ========================================
// INFRASTRUCTURE EVENTS
// ========================================
/// Job lifecycle (not a domain resource)
Job {
job_id: String,
status: JobStatus,
progress: Option<f64>,
message: Option<String>,
},
/// Raw filesystem changes (before DB resolution)
FsRawChange {
kind: FsRawEventKind,
},
/// Log streaming
LogMessage {
timestamp: DateTime<Utc>,
level: String,
target: String,
message: String,
job_id: Option<String>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub enum BatchOperation {
Index,
Search,
Update,
WatcherBatch,
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub enum JobStatus {
Queued,
Started,
Progress,
Completed { output: JobOutput },
Failed { error: String },
Cancelled,
Paused,
Resumed,
}
```
### TransactionManager Integration
```rust
impl TransactionManager {
/// Emit a resource changed event (automatic)
fn emit_resource_changed<R: Identifiable + Serialize>(
&self,
library_id: Uuid,
resource: &R,
) {
let event = Event {
envelope: EventEnvelope {
id: Uuid::new_v4(),
timestamp: Utc::now(),
library_id: Some(library_id),
sequence: None,
},
kind: EventKind::ResourceChanged {
resource_type: R::resource_type().to_string(),
resource: serde_json::to_value(resource).unwrap(),
},
};
self.event_bus.emit(event);
}
/// Commit single resource (emits ResourceChanged)
pub async fn commit<M: Syncable + IntoActiveModel, R: Identifiable + Serialize + From<M>>(
&self,
library: Arc<Library>,
model: M,
) -> Result<R, TxError> {
let library_id = library.id();
// Atomic: DB + sync log
let saved = /* transaction logic */;
// Build client resource
let resource = R::from(saved);
// Auto-emit
self.emit_resource_changed(library_id, &resource);
Ok(resource)
}
/// Commit batch (emits ResourceBatchChanged)
pub async fn commit_batch<M, R>(
&self,
library: Arc<Library>,
models: Vec<M>,
) -> Result<Vec<R>, TxError>
where
M: Syncable + IntoActiveModel,
R: Identifiable + Serialize + From<M>,
{
let library_id = library.id();
// Atomic batch transaction
let saved_models = /* batch transaction */;
// Build resources
let resources: Vec<R> = saved_models.into_iter().map(R::from).collect();
// Emit batch event
let event = Event {
envelope: EventEnvelope {
id: Uuid::new_v4(),
timestamp: Utc::now(),
library_id: Some(library_id),
sequence: None,
},
kind: EventKind::ResourceBatchChanged {
resource_type: R::resource_type().to_string(),
resources: resources.iter()
.map(|r| serde_json::to_value(r).unwrap())
.collect(),
operation: BatchOperation::Update,
},
};
self.event_bus.emit(event);
Ok(resources)
}
/// Bulk operation (emits BulkOperationCompleted)
pub async fn commit_bulk<M: Syncable>(
&self,
library: Arc<Library>,
changes: ChangeSet<M>,
) -> Result<BulkAck, TxError> {
let library_id = library.id();
// Atomic bulk insert + metadata sync log
let token = /* bulk transaction */;
// Emit summary event (no resource data!)
let event = Event {
envelope: EventEnvelope {
id: Uuid::new_v4(),
timestamp: Utc::now(),
library_id: Some(library_id),
sequence: None,
},
kind: EventKind::BulkOperationCompleted {
resource_type: M::SYNC_MODEL.to_string(),
affected_count: changes.items.len(),
operation_token: token,
hints: changes.hints,
},
};
self.event_bus.emit(event);
Ok(BulkAck { affected: changes.items.len(), token })
}
}
```
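The core idea, that committing is the only way to emit, can be shown with a standard-library channel standing in for the event bus. This is a sketch under stated assumptions: the `to_json` method is a hypothetical stand-in for `serde_json::to_value`, the event is reduced to a single variant, and the database and sync log steps are omitted.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Reduced event type: only the generic resource variant.
#[derive(Debug, Clone, PartialEq)]
pub enum EventKind {
    ResourceChanged { resource_type: String, resource: String },
}

/// Hypothetical trait shape; `to_json` stands in for `serde_json::to_value`.
pub trait Identifiable {
    fn resource_type() -> &'static str;
    fn to_json(&self) -> String;
}

pub struct Album {
    pub id: u32,
    pub name: String,
}

impl Identifiable for Album {
    fn resource_type() -> &'static str {
        "album"
    }

    fn to_json(&self) -> String {
        // Toy serializer: does not escape special characters in `name`.
        format!("{{\"id\":{},\"name\":\"{}\"}}", self.id, self.name)
    }
}

pub struct TransactionManager {
    events: Sender<EventKind>,
}

impl TransactionManager {
    pub fn new() -> (Self, Receiver<EventKind>) {
        let (tx, rx) = channel();
        (TransactionManager { events: tx }, rx)
    }

    /// Commit a resource; the event is emitted here, never by the caller.
    pub fn commit<R: Identifiable>(&self, resource: R) -> R {
        // (The database write and sync log entry would happen here.)
        let event = EventKind::ResourceChanged {
            resource_type: R::resource_type().to_string(),
            resource: resource.to_json(),
        };
        // A send error only means no subscriber is listening; ignore it.
        let _ = self.events.send(event);
        resource
    }
}

fn main() {
    let (tm, events) = TransactionManager::new();
    tm.commit(Album { id: 1, name: "Trip".to_string() });
    println!("{:?}", events.try_recv());
}
```

Because application code never touches the sender, a forgotten emission is no longer possible for writes that go through `commit`.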
### Client Handling (Swift Example)
```swift
// ZERO-FRICTION: Type registry (auto-generated from Rust via specta)
protocol CacheableResource: Identifiable, Codable {
static var resourceType: String { get }
}
// Auto-generated registry (no manual maintenance!)
class ResourceTypeRegistry {
private static var decoders: [String: (Data) throws -> any CacheableResource] = [:]
// Called automatically when types are loaded
static func register<T: CacheableResource>(_ type: T.Type) {
decoders[T.resourceType] = { data in
try JSONDecoder().decode(T.self, from: data)
}
}
static func decode(resourceType: String, from data: Data) throws -> any CacheableResource {
guard let decoder = decoders[resourceType] else {
throw CacheError.unknownResourceType(resourceType)
}
return try decoder(data)
}
}
// Types auto-register via property wrapper or extension
extension File: CacheableResource {
static let resourceType = "file"
}
extension Album: CacheableResource {
static let resourceType = "album"
}
extension Tag: CacheableResource {
static let resourceType = "tag"
}
// Add new resources without touching ANY event handling code!
extension Location: CacheableResource {
static let resourceType = "location"
}
// GENERIC event handler (no per-resource switch cases; one case per event kind)
actor ResourceCache {
func handleEvent(_ event: Event) async {
switch event.kind {
case .ResourceChanged(let resourceType, let resourceJSON):
do {
// Generic decode - works for ALL resources!
let resource = try ResourceTypeRegistry.decode(
resourceType: resourceType,
from: resourceJSON
)
updateEntity(resource)
} catch {
print("Failed to decode \(resourceType): \(error)")
}
case .ResourceBatchChanged(let resourceType, let resourcesJSON, let operation):
// Generic batch decode
let resources = resourcesJSON.compactMap { json in
try? ResourceTypeRegistry.decode(resourceType: resourceType, from: json)
}
resources.forEach { updateEntity($0) }
case .BulkOperationCompleted(let resourceType, let count, let token, let hints):
// Invalidate queries
print("Bulk op on \(resourceType): \(count) items")
invalidateQueriesForResource(resourceType, hints: hints)
case .ResourceDeleted(let resourceType, let resourceId):
// Generic deletion
deleteEntity(resourceType: resourceType, id: resourceId)
// Infrastructure events
case .Job(let jobId, let status, _, _):
updateJobStatus(jobId: jobId, status: status)
default:
break
}
}
// Generic entity update (works for all Identifiable resources)
func updateEntity(_ resource: any CacheableResource) {
let cacheKey = type(of: resource).resourceType + ":" + resource.id.uuidString
entityStore[cacheKey] = resource
// Update all queries that reference this resource
invalidateQueriesContaining(cacheKey)
}
// Generic deletion
func deleteEntity(resourceType: String, id: UUID) {
let cacheKey = resourceType + ":" + id.uuidString
entityStore.removeValue(forKey: cacheKey)
invalidateQueriesContaining(cacheKey)
}
}
```
**Key Innovation**: The type registry eliminates all per-resource switch statements; the handler only switches on event kind.
**Adding a new resource**:
```swift
// 1. Define type (auto-generated from Rust via specta)
struct Photo: CacheableResource {
let id: UUID
let albumId: UUID
let path: String
static let resourceType = "photo"
}
// 2. That's it! Event handling automatically works.
// No changes to ResourceCache, no switch cases, nothing!
```
### TypeScript Client Example
```typescript
// ZERO-FRICTION: Type registry (auto-generated from Rust via specta)
interface CacheableResource {
id: string;
}
// Auto-generated type map (from Rust types via specta)
type ResourceTypeMap = {
file: File;
album: Album;
tag: Tag;
location: Location;
// New types added automatically by codegen!
};
// Generic decoder with type safety
class ResourceTypeRegistry {
private static validators: Map<string, (data: unknown) => CacheableResource> = new Map();
// Auto-register types (called during module init)
static register<T extends CacheableResource>(
resourceType: string,
validator: (data: unknown) => T
) {
this.validators.set(resourceType, validator);
}
static decode(resourceType: string, data: unknown): CacheableResource {
const validator = this.validators.get(resourceType);
if (!validator) {
throw new Error(`Unknown resource type: ${resourceType}`);
}
return validator(data);
}
}
// Types auto-register (could use decorators or explicit calls)
ResourceTypeRegistry.register('file', (data) => data as File);
ResourceTypeRegistry.register('album', (data) => data as Album);
ResourceTypeRegistry.register('tag', (data) => data as Tag);
// Add new types without touching event handler!
// GENERIC event handler (no per-resource switch cases; one case per event kind)
export class NormalizedCache {
handleEvent(event: Event) {
switch (event.kind.type) {
case 'ResourceChanged': {
const { resource_type, resource } = event.kind.data;
// ✅ Generic decode - works for ALL resources!
const decoded = ResourceTypeRegistry.decode(resource_type, resource);
this.updateEntity(resource_type, decoded);
break;
}
case 'ResourceBatchChanged': {
const { resource_type, resources } = event.kind.data;
// ✅ Generic batch
resources.forEach(r => {
const decoded = ResourceTypeRegistry.decode(resource_type, r);
this.updateEntity(resource_type, decoded);
});
break;
}
case 'BulkOperationCompleted': {
const { resource_type, hints } = event.kind.data;
this.invalidateQueries(resource_type, hints);
break;
}
case 'ResourceDeleted': {
const { resource_type, resource_id } = event.kind.data;
this.deleteEntity(resource_type, resource_id);
break;
}
}
}
// ✅ Automatic cache update for ANY resource
private updateEntity(resourceType: string, resource: CacheableResource) {
const cacheKey = `${resourceType}:${resource.id}`;
this.entities.set(cacheKey, resource);
// Trigger UI updates for queries using this resource
this.notifyQueries(cacheKey);
}
// ✅ Generic deletion
private deleteEntity(resourceType: string, resourceId: string) {
const cacheKey = `${resourceType}:${resourceId}`;
this.entities.delete(cacheKey);
this.notifyQueries(cacheKey);
}
}
// Adding a new resource (Photo):
// 1. Rust: impl Identifiable for Photo { resource_type() = "photo" }
// 2. Run: cargo run --bin specta-gen (regenerates TypeScript types)
// 3. TypeScript: import { Photo } from './bindings/Photo.ts'
// 4. ResourceTypeRegistry.register('photo', (data) => data as Photo);
// 5. Done! No changes to event handling, cache logic, nothing!
```
**With Build Script Automation** (fully automatic):
```typescript
// Auto-generated file: src/bindings/resourceRegistry.ts
// This file is generated by: cargo run --bin specta-gen
// DO NOT EDIT MANUALLY
import type { File } from './File';
import type { Album } from './Album';
import type { Tag } from './Tag';
import type { Location } from './Location';
// ... all other Identifiable types
// Registry is populated at module load time.
// TypeScript types are erased at runtime, so the map holds decoder
// functions rather than the types themselves.
export const resourceTypeMap = {
'file': (data: unknown) => data as File,
'album': (data: unknown) => data as Album,
'tag': (data: unknown) => data as Tag,
'location': (data: unknown) => data as Location,
// ... all other types
} as const;
// Zero-config setup
Object.entries(resourceTypeMap).forEach(([type, validator]) => {
ResourceTypeRegistry.register(type, validator as any);
});
```
**Result**: Adding a new Identifiable resource in Rust automatically:
1. Generates TypeScript type
2. Registers in type map
3. Works with event handling
4. **Zero manual client changes!**
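The same registry idea carries over to any client language. As an illustration, here is how it might look for a hypothetical Rust client, using only the standard library. The `CacheableResource` trait, the `Tag` type, and the toy `decode_tag` function (which treats the payload as the id instead of parsing JSON) are assumptions made for the sketch.

```rust
use std::collections::HashMap;

/// Anything the cache can store; `id` is kept as a string for simplicity.
pub trait CacheableResource {
    fn id(&self) -> String;
}

pub struct Tag {
    pub id: String,
}

impl CacheableResource for Tag {
    fn id(&self) -> String {
        self.id.clone()
    }
}

/// A decoder turns a raw payload into a boxed resource.
pub type Decoder = fn(&str) -> Result<Box<dyn CacheableResource>, String>;

/// Maps `resource_type` strings to decoders.
#[derive(Default)]
pub struct ResourceTypeRegistry {
    decoders: HashMap<&'static str, Decoder>,
}

impl ResourceTypeRegistry {
    pub fn register(&mut self, resource_type: &'static str, decoder: Decoder) {
        self.decoders.insert(resource_type, decoder);
    }

    /// Generic decode: the caller never matches on the resource type.
    pub fn decode(
        &self,
        resource_type: &str,
        payload: &str,
    ) -> Result<Box<dyn CacheableResource>, String> {
        match self.decoders.get(resource_type) {
            Some(decoder) => decoder(payload),
            None => Err(format!("unknown resource type: {}", resource_type)),
        }
    }
}

/// Toy decoder: the payload is just the id. A real client would parse JSON here.
pub fn decode_tag(payload: &str) -> Result<Box<dyn CacheableResource>, String> {
    Ok(Box::new(Tag { id: payload.to_string() }))
}

fn main() {
    let mut registry = ResourceTypeRegistry::default();
    registry.register("tag", decode_tag);
    match registry.decode("tag", "abc") {
        Ok(resource) => println!("decoded tag {}", resource.id()),
        Err(error) => println!("decode failed: {}", error),
    }
}
```

An unknown `resource_type` is reported as an error rather than silently dropped, so a client that is behind the core on codegen can surface the mismatch.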
## Migration Strategy
### Phase 1: Add Unified Events (Additive)
- Keep existing Event variants
- Add new `ResourceChanged`, `ResourceBatchChanged`, etc.
- TransactionManager emits new events
- Clients can start consuming new events
### Phase 2: Migrate Resources One-by-One
For each resource (File, Album, Tag, Location, etc.):
1. Implement `Identifiable` trait
2. Switch from manual `event_bus.emit(Event::EntryCreated)` to TM
3. Update client to consume `ResourceChanged` for that type
4. Mark old event variant as deprecated
### Phase 3: Remove Old Events
Once all resources migrated:
- Remove `EntryCreated`, `VolumeAdded`, etc.
- Keep infrastructure events (Job, Log, FsRawChange)
- Remove manual event emission from ops code
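During Phases 1 and 2 the old and new variants coexist. A minimal sketch of that overlap is below, assuming a reduced `Event` enum and a hypothetical `events_for_new_album` helper. The `#[deprecated]` attribute on the old variant makes remaining call sites show up as compiler warnings, which gives a rough checklist for Phase 3.

```rust
#[derive(Debug, PartialEq)]
pub enum Event {
    /// Old specialized variant, kept during Phases 1 and 2.
    #[deprecated(note = "use Event::ResourceChanged instead")]
    AlbumCreated { album_id: u32 },
    /// New generic variant.
    ResourceChanged { resource_type: String, resource_id: u32 },
}

/// Phase 1 behaviour: emit the old and the new event side by side.
#[allow(deprecated)]
pub fn events_for_new_album(album_id: u32) -> Vec<Event> {
    vec![
        Event::AlbumCreated { album_id },
        Event::ResourceChanged {
            resource_type: "album".to_string(),
            resource_id: album_id,
        },
    ]
}

fn main() {
    for event in events_for_new_album(42) {
        println!("{:?}", event);
    }
}
```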
## Benefits
### For Rust Core
- **Zero boilerplate**: No manual event emission
- **Type safety**: TM ensures events match resources
- **Automatic**: Emit on every commit
- **Uniform**: All resources handled the same way
### For Clients
- **No per-resource switch statements**: Type registry handles all resources
- **Type-safe deserialization**: JSON → typed resource
- **Zero-friction scaling**: Add 100 resources, no changes to event handling
- **Auto-generated**: specta codegen creates registry automatically
- **Cache-friendly**: Direct integration with normalized cache
### Horizontal Scaling
- **Rust**: Add `impl Identifiable` → automatic events
- **TypeScript**: Run codegen → automatic type + registry
- **Swift**: Add `CacheableResource` conformance → automatic handling
- **New platforms**: Implement type registry once, scales infinitely
### For Maintenance
- **Less code**: ~40 variants → ~5 generic variants
- **No manual updates**: Adding File → Album → Tag reuses same code
- **Clear semantics**: Resource events vs infrastructure events
- **Centralized**: All emission in TransactionManager
## Examples by Resource Type
### Files (Entry → File)
```rust
// Rust
let file = tm.commit::<entry::Model, File>(library, entry_model).await?;
// → Emits: ResourceChanged { resource_type: "file", resource: file }
// Swift
case .ResourceChanged("file", let json):
let file = try decode(File.self, json)
cache.updateEntity(file)
```
### Albums
```rust
// Rust
let album = tm.commit::<albums::Model, Album>(library, album_model).await?;
// → Emits: ResourceChanged { resource_type: "album", resource: album }
// Swift
case .ResourceChanged("album", let json):
let album = try decode(Album.self, json)
cache.updateEntity(album)
```
### Tags
```rust
// Rust
let tag = tm.commit::<tags::Model, Tag>(library, tag_model).await?;
// → Emits: ResourceChanged { resource_type: "tag", resource: tag }
// Swift
case .ResourceChanged("tag", let json):
let tag = try decode(Tag.self, json)
cache.updateEntity(tag)
```
### Locations
```rust
// Rust
let location = tm.commit::<locations::Model, Location>(library, location_model).await?;
// → Emits: ResourceChanged { resource_type: "location", resource: location }
// Swift
case .ResourceChanged("location", let json):
let location = try decode(Location.self, json)
cache.updateEntity(location)
```
## Infrastructure Events (Not Resources)
Some events are not domain resources:
- **Jobs**: Ephemeral, not cached, different lifecycle
- **Logs**: Streaming, not state
- **FsRawChange**: Pre-database, becomes Entry later
- **Core lifecycle**: System-level
These keep specialized variants under `EventKind`.
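The split between resource events and infrastructure events can be made explicit on the client by classifying each event kind before it reaches the cache. The following is a minimal TypeScript sketch, assuming each kind carries a `type` tag on the wire; the trimmed-down `Kind` union and the names `isResourceKind` and `route` are illustrative and not part of the codebase.

```typescript
// Hypothetical, trimmed-down mirror of EventKind for illustration only.
type ResourceKind =
  | { type: 'ResourceChanged'; data: { resource_type: string; resource: unknown } }
  | { type: 'ResourceDeleted'; data: { resource_type: string; resource_id: string } };

type InfraKind =
  | { type: 'CoreStarted' }
  | { type: 'Job'; data: { job_id: string; progress: number | null } }
  | { type: 'LogMessage'; data: { level: string; message: string } };

type Kind = ResourceKind | InfraKind;

// Resource events feed the normalized cache; infrastructure events do not.
function isResourceKind(kind: Kind): kind is ResourceKind {
  return kind.type === 'ResourceChanged' || kind.type === 'ResourceDeleted';
}

// Decide which subsystem should see the event.
function route(kind: Kind): 'cache' | 'infrastructure' {
  return isResourceKind(kind) ? 'cache' : 'infrastructure';
}
```

Keeping the check in one type guard means a new infrastructure variant only touches the union, not the cache path.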
## Comparison: Before vs After
### Before (Current)
```rust
// Scattered manual emission
pub async fn create_album(library: Arc<Library>, name: String) -> Result<Album> {
let model = albums::ActiveModel { /* ... */ };
let saved = model.insert(db).await?;
// Manual event emission
event_bus.emit(Event::AlbumCreated {
library_id: library.id(),
album_id: saved.uuid,
});
Ok(Album::from(saved)) // convert the persisted model into the client-facing resource
}
// Client must handle specific variant + switch case
case .AlbumCreated(let libraryId, let albumId):
// Fetch album data separately
let album = await client.query("albums.get", albumId)
cache.updateEntity(album)
```
### After (Unified + Type Registry)
```rust
// Automatic emission via TransactionManager
pub async fn create_album(
tm: &TransactionManager,
library: Arc<Library>,
name: String,
) -> Result<Album> {
let model = albums::ActiveModel { /* ... */ };
// TM emits ResourceChanged automatically
let album = tm.commit::<albums::Model, Album>(library, model).await?;
Ok(album)
}
// Client: ZERO resource-specific code!
case .ResourceChanged(let resourceType, let json):
// ✅ Works for Album, File, Tag, Location, everything!
let resource = try ResourceTypeRegistry.decode(resourceType, json)
cache.updateEntity(resource)
// Add 100 new resources: this code never changes!
```
**Adding a 101st resource**:
- Rust: `impl Identifiable for NewResource` (3 lines)
- Client: Nothing! (codegen handles it)
**Horizontal scaling achieved!** 🎉
## Event Size Considerations
**Concern**: Sending full resources in events increases bandwidth
**Mitigations**:
1. **Gzip compression**: Event bus can compress large payloads
2. **Client caching**: Only send if resource changed
3. **Delta events** (future): Send only changed fields
4. **Bulk events**: Don't send individual resources (just metadata)
**Measurement**:
- File resource: ~500 bytes JSON
- Album resource: ~200 bytes JSON
- Tag resource: ~150 bytes JSON
Even with 100 concurrent updates: 500 bytes × 100 = 50KB (negligible)
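The arithmetic above can be reproduced, and checked against real payloads, with a few lines of TypeScript. This is a rough sketch: the per-resource sizes are the estimates listed above, the optional envelope overhead is an assumed parameter, and `burstBytes` and `jsonBytes` are hypothetical helper names.

```typescript
// Approximate payload sizes in bytes, taken from the estimates above.
const APPROX_BYTES: Record<string, number> = { file: 500, album: 200, tag: 150 };

// Total bytes for a burst of `count` events of one resource type.
// `envelopeBytes` is an assumed per-event overhead (0 reproduces the figure above).
function burstBytes(resourceType: string, count: number, envelopeBytes = 0): number {
  const perResource = APPROX_BYTES[resourceType];
  if (perResource === undefined) {
    throw new Error(`No size estimate for resource type: ${resourceType}`);
  }
  return (perResource + envelopeBytes) * count;
}

// Measure a concrete payload instead of relying on the estimate.
// For ASCII-only JSON, string length equals the UTF-8 byte length.
function jsonBytes(value: unknown): number {
  return JSON.stringify(value).length;
}

const worstCase = burstBytes('file', 100);         // 500 × 100 = 50,000 bytes
const withEnvelope = burstBytes('file', 100, 100); // (500 + 100) × 100 = 60,000 bytes
```

Running `jsonBytes` over real serialized resources is the quickest way to confirm or revise the estimates.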
## Alternative: Lightweight Events
If bandwidth becomes an issue, use two-tier system:
```rust
pub enum EventKind {
// Lightweight: just ID
ResourceChanged {
resource_type: String,
resource_id: Uuid,
// Client fetches if needed
},
// Rich: full data (opt-in)
ResourceChangedRich {
resource_type: String,
resource: serde_json::Value,
},
}
```
But start with rich events (simpler, better cache consistency).
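If the two-tier variant were ever adopted, the client would need to treat the two kinds differently: rich events update the cache directly, while lightweight events only mark an entity as needing a refetch. A minimal TypeScript sketch of that behavior follows; `TwoTierCache`, `CachedEntity`, and the choice to ignore lightweight events for entities that are not cached are assumptions made for illustration.

```typescript
// Hypothetical client-side view of the two-tier kinds; UUIDs travel as strings.
interface CachedEntity { id: string }

type TwoTierKind =
  | { type: 'ResourceChanged'; data: { resource_type: string; resource_id: string } }
  | { type: 'ResourceChangedRich'; data: { resource_type: string; resource: CachedEntity } };

class TwoTierCache {
  private entities = new Map<string, CachedEntity>();
  private stale = new Set<string>();

  apply(kind: TwoTierKind): void {
    if (kind.type === 'ResourceChangedRich') {
      // Rich event: the payload is authoritative, so store it and clear staleness.
      const richKey = `${kind.data.resource_type}:${kind.data.resource.id}`;
      this.entities.set(richKey, kind.data.resource);
      this.stale.delete(richKey);
      return;
    }
    // Lightweight event: only entities already in the cache need a refetch.
    const key = `${kind.data.resource_type}:${kind.data.resource_id}`;
    if (this.entities.has(key)) {
      this.stale.add(key);
    }
  }

  // Keys the UI layer should refetch on demand.
  staleKeys(): string[] {
    return Array.from(this.stale);
  }
}
```

The extra bookkeeping for stale keys is the cost that the rich-events-first recommendation avoids.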
## Conclusion
This unified event system:
- ✅ Eliminates ~35 specialized event variants
- ✅ Makes TransactionManager sole event emitter
- ✅ Enables generic client handling
- ✅ Reduces boilerplate to zero
- ✅ Scales to infinite resource types
- ✅ Aligns perfectly with Identifiable/Syncable design
**Next Step**: Implement `Event` refactor alongside TransactionManager in mini-spec.

1290
docs/core/devices.md Normal file
View File

File diff suppressed because it is too large Load Diff

688
docs/core/events.md Normal file
View File

@@ -0,0 +1,688 @@
# Unified Resource Event System
**Status**: Implementation Ready
**Version**: 2.0
**Last Updated**: 2025-10-08
## Overview
Spacedrive's event system broadcasts real-time updates to all connected clients. This document specifies the **unified resource event architecture** that eliminates ~40 specialized event variants in favor of generic, horizontally-scalable events.
## The Problem
**Current system** (`core/src/infra/event/mod.rs` has 40+ variants):
```rust
Event::EntryCreated { library_id, entry_id }
Event::EntryModified { library_id, entry_id }
Event::VolumeAdded(Volume)
Event::LibraryCreated { id, name, path }
Event::JobStarted { job_id, job_type }
// ... 35 more variants
```
**Issues**:
- ❌ Manual emission scattered across codebase (easy to forget)
- ❌ Adding new resource = new event variant + client code changes
- ❌ No type safety between events and resources
- ❌ Clients must handle each variant specifically
- ❌ Horizontal scaling requires per-resource boilerplate
## The Solution: Generic Resource Events
All resources implementing `Identifiable` use a unified event structure. The TransactionManager emits these automatically.
### Event Structure
```rust
/// Unified event wrapper
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct Event {
/// Standard metadata for all events
pub envelope: EventEnvelope,
/// The actual event payload
pub kind: EventKind,
}
/// Standard envelope (addresses TODO on line 353 of current event/mod.rs)
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct EventEnvelope {
/// Event ID for deduplication
pub id: Uuid,
/// When this event occurred
pub timestamp: DateTime<Utc>,
/// Library context (if applicable)
pub library_id: Option<Uuid>,
/// Sequence number for ordering and gap detection
pub sequence: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[serde(tag = "type", content = "data")]
pub enum EventKind {
// ============================================
// GENERIC RESOURCE EVENTS (for Identifiable)
// ============================================
/// A resource was created or updated
ResourceChanged {
/// Resource type from Identifiable::resource_type (e.g., "file", "album")
resource_type: String,
/// The full resource as JSON (client deserializes generically)
#[specta(skip)]
resource: serde_json::Value,
},
/// Multiple resources changed in a batch
ResourceBatchChanged {
resource_type: String,
resources: Vec<serde_json::Value>,
operation: BatchOperation,
},
/// A resource was deleted
ResourceDeleted {
resource_type: String,
resource_id: Uuid,
},
/// Bulk operation completed (notification only, no individual resources)
BulkOperationCompleted {
resource_type: String,
affected_count: usize,
operation_token: Uuid,
hints: serde_json::Value, // location_id, etc.
},
// ============================================
// INFRASTRUCTURE EVENTS (not resources)
// ============================================
/// Core lifecycle
CoreStarted,
CoreShutdown,
/// Library lifecycle
LibraryOpened { id: Uuid, name: String },
LibraryClosed { id: Uuid },
/// Job status (not a domain resource)
Job {
job_id: String,
status: JobStatus,
progress: Option<f64>,
message: Option<String>,
generic_progress: Option<GenericProgress>,
},
/// Raw filesystem changes (before DB resolution)
FsRawChange { kind: FsRawEventKind },
/// Log streaming
LogMessage {
timestamp: DateTime<Utc>,
level: String,
target: String,
message: String,
job_id: Option<String>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub enum BatchOperation {
Index,
Search,
Update,
WatcherBatch,
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub enum JobStatus {
Queued,
Started,
Progress,
Completed { output: JobOutput },
Failed { error: String },
Cancelled,
Paused,
Resumed,
}
```
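The envelope's `id` field is described as being for deduplication. One way a client might use it is sketched below in TypeScript, assuming the UUID arrives as a string; `EventDeduplicator`, `EnvelopeLike`, and the bounded window of remembered IDs are illustrative choices, not part of the specification.

```typescript
// Assumed client-side shape of EventEnvelope (UUIDs and timestamps as strings).
interface EnvelopeLike {
  id: string;
  timestamp: string;
  library_id: string | null;
  sequence: number | null;
}

class EventDeduplicator {
  private seen = new Set<string>();
  private order: string[] = [];
  private maxRemembered: number;

  constructor(maxRemembered = 1024) {
    this.maxRemembered = maxRemembered;
  }

  // Returns true the first time an event ID is seen, false for repeats.
  accept(envelope: EnvelopeLike): boolean {
    if (this.seen.has(envelope.id)) {
      return false;
    }
    this.seen.add(envelope.id);
    this.order.push(envelope.id);
    // Forget the oldest ID once the window is full, to bound memory use.
    if (this.order.length > this.maxRemembered) {
      const oldest = this.order.shift();
      if (oldest !== undefined) {
        this.seen.delete(oldest);
      }
    }
    return true;
  }
}
```

A handler would call `accept(event.envelope)` first and skip the event when it returns `false`.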
## TransactionManager Integration
The TM automatically emits events after successful commits:
```rust
impl TransactionManager {
/// Emit resource changed event (automatic)
fn emit_resource_changed<R: Identifiable + Serialize>(
&self,
library_id: Uuid,
resource: &R,
) {
let event = Event {
envelope: EventEnvelope {
id: Uuid::new_v4(),
timestamp: Utc::now(),
library_id: Some(library_id),
sequence: None,
},
kind: EventKind::ResourceChanged {
resource_type: R::resource_type().to_string(),
resource: serde_json::to_value(resource).unwrap(),
},
};
self.event_bus.emit(event);
}
/// Commit automatically emits
pub async fn commit<M, R>(
&self,
library: Arc<Library>,
model: M,
) -> Result<R, TxError>
where
M: Syncable + IntoActiveModel,
R: Identifiable + Serialize + From<M>, // Serialize is required by emit_resource_changed
{
// 1. Atomic transaction (DB + sync log)
let saved_model = /* ... */;
// 2. Convert to client resource
let resource = R::from(saved_model);
// 3. Emit automatically
self.emit_resource_changed(library.id(), &resource);
Ok(resource)
}
}
```
**Result**: Application code never calls `event_bus.emit()` manually!
## Client-Side: Zero-Friction Event Handling
The true power is realized on the client through **type registries**.
### Swift Implementation
```swift
// ===================================================
// ONE-TIME SETUP: Type registry (auto-maintained)
// ===================================================
protocol CacheableResource: Identifiable, Codable {
static var resourceType: String { get }
}
class ResourceTypeRegistry {
private static var decoders: [String: (Data) throws -> any CacheableResource] = [:]
static func register<T: CacheableResource>(_ type: T.Type) {
decoders[T.resourceType] = { data in
try JSONDecoder().decode(T.self, from: data)
}
}
static func decode(resourceType: String, from data: Data) throws -> any CacheableResource {
guard let decoder = decoders[resourceType] else {
throw CacheError.unknownResourceType(resourceType)
}
return try decoder(data)
}
}
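// Declare conformance (auto-generated via specta codegen).
// Each type must also be passed to ResourceTypeRegistry.register(_:) once at startup,
// otherwise decode(resourceType:from:) throws unknownResourceType.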
extension File: CacheableResource { static let resourceType = "file" }
extension Album: CacheableResource { static let resourceType = "album" }
extension Tag: CacheableResource { static let resourceType = "tag" }
extension Location: CacheableResource { static let resourceType = "location" }
// Add 100 more: event handler NEVER changes!
// ===================================================
// PERMANENT: Generic event handler
// THIS CODE NEVER CHANGES WHEN ADDING NEW RESOURCES!
// ===================================================
actor EventCacheUpdater {
let cache: NormalizedCache
func handleEvent(_ event: Event) async {
switch event.kind {
case .ResourceChanged(let resourceType, let resourceJSON):
do {
// Works for ALL current and future resources!
let resource = try ResourceTypeRegistry.decode(
resourceType: resourceType,
from: resourceJSON
)
await cache.updateEntity(resource)
} catch {
print("Failed to decode \(resourceType): \(error)")
}
case .ResourceBatchChanged(let resourceType, let resourcesJSON, _):
// Generic batch handling
let resources = resourcesJSON.compactMap { json in
try? ResourceTypeRegistry.decode(resourceType: resourceType, from: json)
}
for resource in resources {
await cache.updateEntity(resource)
}
case .BulkOperationCompleted(let resourceType, let count, _, let hints):
// Invalidate affected queries
print("📦 Bulk operation: \(count) \(resourceType) items")
await cache.invalidateQueriesForResource(resourceType, hints: hints)
case .ResourceDeleted(let resourceType, let resourceId):
// Generic deletion
await cache.deleteEntity(resourceType: resourceType, id: resourceId)
// Infrastructure events
case .Job(let jobId, let status, let progress, _, _): // message and generic_progress ignored
await cache.updateJobStatus(jobId: jobId, status: status, progress: progress)
default:
break
}
}
}
```
**Key Achievement**: Adding a 101st resource requires **zero changes** to event handling!
### TypeScript Implementation
```typescript
// ===================================================
// Auto-generated by: cargo xtask specta-gen
// ===================================================
// src/bindings/resourceRegistry.ts
import { File } from './File';
import { Album } from './Album';
import { Tag } from './Tag';
import { Location } from './Location';
export const resourceTypeMap = {
'file': File,
'album': Album,
'tag': Tag,
'location': Location,
// ... all Identifiable types
} as const;
// ===================================================
// PERMANENT: Generic event handler (never changes!)
// ===================================================
export class EventCacheUpdater {
constructor(private cache: NormalizedCache) {}
handleEvent(event: Event) {
switch (event.kind.type) {
case 'ResourceChanged': {
const { resource_type, resource } = event.kind.data;
// ✅ Generic decode via auto-generated registry!
const decoded = ResourceTypeRegistry.decode(resource_type, resource);
this.cache.updateEntity(resource_type, decoded);
break;
}
case 'ResourceBatchChanged': {
const { resource_type, resources } = event.kind.data;
resources.forEach(r => {
const decoded = ResourceTypeRegistry.decode(resource_type, r);
this.cache.updateEntity(resource_type, decoded);
});
break;
}
case 'BulkOperationCompleted': {
const { resource_type, hints } = event.kind.data;
this.cache.invalidateQueries(resource_type, hints);
break;
}
case 'ResourceDeleted': {
const { resource_type, resource_id } = event.kind.data;
this.cache.deleteEntity(resource_type, resource_id);
break;
}
}
}
}
// ===================================================
// Generic type registry
// ===================================================
class ResourceTypeRegistry {
private static validators = new Map<string, (data: unknown) => any>();
static register<T>(resourceType: string, validator: (data: unknown) => T) {
this.validators.set(resourceType, validator);
}
static decode(resourceType: string, data: unknown): any {
const validator = this.validators.get(resourceType);
if (!validator) {
throw new Error(`Unknown resource type: ${resourceType}`);
}
return validator(data);
}
}
// Auto-registration from generated map
Object.entries(resourceTypeMap).forEach(([type, TypeClass]) => {
ResourceTypeRegistry.register(type, (data) => data as InstanceType<typeof TypeClass>);
});
```
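The auto-registration above only casts the payload, so a malformed resource would enter the cache unchecked. If runtime validation is wanted, a registered validator could look like the following TypeScript sketch. The `AlbumShape` fields mirror the Rust `Album` struct in this document, with UUIDs assumed to arrive as strings; `parseAlbum` and `isRecord` are hypothetical helpers.

```typescript
// Assumed wire shape of the Album resource (UUIDs serialized as strings).
interface AlbumShape {
  id: string;
  name: string;
  cover_entry_uuid: string | null;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Validate an unknown payload and return a typed album, or throw with a reason.
function parseAlbum(data: unknown): AlbumShape {
  if (!isRecord(data)) {
    throw new Error('album: expected an object');
  }
  const { id, name, cover_entry_uuid } = data;
  if (typeof id !== 'string') {
    throw new Error('album: id must be a string');
  }
  if (typeof name !== 'string') {
    throw new Error('album: name must be a string');
  }
  // A missing or null cover is allowed; anything else must be a string.
  const cover = typeof cover_entry_uuid === 'string' ? cover_entry_uuid : null;
  if (cover === null && cover_entry_uuid != null) {
    throw new Error('album: cover_entry_uuid must be a string or null');
  }
  return { id, name, cover_entry_uuid: cover };
}
```

Such a function has the `(data: unknown) => T` shape that `ResourceTypeRegistry.register` expects, so it could replace the cast for any type where validation matters.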
## Specta Codegen Integration
Spacedrive uses `specta` to generate TypeScript types from Rust. This extends to the event system:
### Rust: Mark Types for Export
```rust
#[derive(Debug, Clone, Serialize, Deserialize, Type)] // Type = specta
pub struct Album {
pub id: Uuid,
pub name: String,
pub cover_entry_uuid: Option<Uuid>,
}
impl Identifiable for Album {
type Id = Uuid;
fn resource_id(&self) -> Self::Id { self.id }
fn resource_type() -> &'static str { "album" }
}
```
### Build Script
```rust
// xtask/src/specta_gen.rs
fn main() {
let mut builder = TypeCollection::default();
// Export all Identifiable types
builder.register::<File>();
builder.register::<Album>();
builder.register::<Tag>();
builder.register::<Location>();
// Auto-generate resourceRegistry.ts
generate_resource_registry(&builder);
// Generate types
builder.export_ts("../packages/client/src/bindings/").unwrap();
}
```
**Result**: Run `cargo xtask specta-gen` → TypeScript types + registry auto-update!
## Migration Strategy
### Phase 1: Additive (No Breaking Changes)
- Keep all existing Event variants
- Add new `ResourceChanged`, `ResourceBatchChanged`, etc.
- TransactionManager emits new events
- Old manual emissions still work
- Clients can gradually adopt new events
### Phase 2: Parallel Systems
- New resources (Albums) use unified events only
- Existing resources (Files, Tags) emit both old and new
- Clients migrate to new event handlers one resource at a time
- Measure: No regressions in UI responsiveness
### Phase 3: Deprecation
- Mark old variants as `#[deprecated]`
- Remove manual `event_bus.emit()` calls from ops
- Clients fully migrated to generic handlers
### Phase 4: Cleanup
- Remove old event variants
- Codegen verification: All Identifiable types have registry entries
- Performance testing: Ensure no regression
## Event Emission Guidelines
### ✅ DO: Let TransactionManager Handle It
```rust
// ✅ CORRECT: TM emits automatically
pub async fn create_album(tm: &TransactionManager, library: Arc<Library>, name: String) -> Result<Album> {
let model = albums::ActiveModel { /* ... */ };
let album = tm.commit::<albums::Model, Album>(library, model).await?;
Ok(album) // Event emitted automatically!
}
```
### ❌ DON'T: Manual Event Emission
```rust
// ❌ WRONG: Manual emission (error-prone, bypasses sync)
pub async fn create_album(library: Arc<Library>, name: String) -> Result<Album> {
let model = albums::ActiveModel { /* ... */ };
let saved = model.insert(db).await?; // No sync log!
event_bus.emit(Event::AlbumCreated { /* ... */ }); // Can forget this!
Ok(Album::from(saved))
}
```
## Event Types by Category
### Resource Events (Automatic via TM)
- `ResourceChanged` - Single create/update
- `ResourceBatchChanged` - Batch updates (10-1K items)
- `ResourceDeleted` - Single deletion
- `BulkOperationCompleted` - Bulk notification (1K+ items)
**Resources**:
- File (from Entry persistence model)
- Album
- Tag
- Location
- Device
- ContentIdentity
- Sidecar
- Collection
- Label
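The size ranges listed for resource events imply a selection rule based on how many items an operation touched. A small TypeScript sketch of that rule follows. The handling of counts below 10 (emit individual events) and of the exact boundary at 1,000 (treated as a batch) are assumptions, since the ranges above do not specify them; `chooseEmission` is a hypothetical name.

```typescript
type EmissionStrategy = 'individual' | 'batch' | 'bulk';

// Pick an emission strategy from the number of affected resources.
// Assumption: fewer than `batchMin` items are emitted one by one,
// and exactly `bulkMin` items still count as a batch.
function chooseEmission(count: number, batchMin = 10, bulkMin = 1000): EmissionStrategy {
  if (count > bulkMin) {
    return 'bulk'; // BulkOperationCompleted: metadata only
  }
  if (count >= batchMin) {
    return 'batch'; // ResourceBatchChanged: full resources in one event
  }
  return 'individual'; // ResourceChanged per item
}
```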
### Infrastructure Events (Manual, Specific)
- `CoreStarted`, `CoreShutdown` - Daemon lifecycle
- `LibraryOpened`, `LibraryClosed` - Library state
- `Job { status, progress }` - Job execution
- `FsRawChange` - Watcher events (pre-database)
- `LogMessage` - Log streaming
## Complete Example: Album Creation
### Rust Core
```rust
// ops/albums/create/action.rs
impl LibraryAction for CreateAlbumAction {
async fn execute(
self,
library: Arc<Library>,
context: Arc<CoreContext>,
) -> Result<Self::Output, ActionError> {
let tm = context.transaction_manager();
let model = albums::ActiveModel {
id: NotSet,
uuid: Set(Uuid::new_v4()),
name: Set(self.input.name),
version: Set(1),
created_at: Set(Utc::now()),
updated_at: Set(Utc::now()),
cover_entry_uuid: Set(None), // read back by From<albums::Model> below
};
// TM handles: DB write + sync log + event emission
let album = tm.commit::<albums::Model, Album>(library, model)
.await
.map_err(|e| ActionError::Internal(e.to_string()))?;
Ok(CreateAlbumOutput { album })
}
}
// domain/album.rs
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct Album {
pub id: Uuid,
pub name: String,
pub cover_entry_uuid: Option<Uuid>,
}
impl Identifiable for Album {
type Id = Uuid;
fn resource_id(&self) -> Self::Id { self.id }
fn resource_type() -> &'static str { "album" }
}
impl From<albums::Model> for Album {
fn from(model: albums::Model) -> Self {
Self {
id: model.uuid,
name: model.name,
cover_entry_uuid: model.cover_entry_uuid,
}
}
}
```
### Swift Client
```swift
// NO RESOURCE-SPECIFIC CODE NEEDED!
// The generic handler already works:
// User taps "Create Album"
try await client.action("albums.create.v1", input: ["name": "Vacation 2025"])
// Event arrives:
// Event {
// envelope: { id, timestamp, library_id, sequence },
// kind: ResourceChanged {
// resource_type: "album",
// resource: { "id": "uuid-...", "name": "Vacation 2025" }
// }
// }
// Generic handler processes it:
let resource = try ResourceTypeRegistry.decode(resourceType: "album", from: resourceJSON)
await cache.updateEntity(resource)
// SwiftUI view updates automatically!
```
## Performance
### Bandwidth
**Single update**:
- Album resource: ~200 bytes JSON
- File resource: ~500 bytes JSON
- Overhead: ~100 bytes (envelope)
- Total: <1KB per event
**Bulk operation**:
- Metadata only: ~500 bytes
- NO individual resources sent
- Client invalidates queries, refetches on demand
### Event Bus Capacity
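- Tokio broadcast channel with a 1024-event buffer (configurable)
- Lag detection for slow subscribers: a receiver that falls more than the buffer size behind gets `RecvError::Lagged(n)` and resumes from the oldest retained event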
- No blocking on emit (fire-and-forget)
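The interaction between a bounded buffer, non-blocking emit, and lag detection can be illustrated with a small model. The TypeScript sketch below imitates the behavior described above: sending never blocks, old events are dropped once capacity is exceeded, and a slow receiver is told how many events it missed. It is a simplified illustration, not the Tokio implementation; `BoundedBroadcast` and `RecvResult` are hypothetical names.

```typescript
type RecvResult<T> =
  | { kind: 'value'; value: T }
  | { kind: 'lagged'; missed: number }
  | { kind: 'empty' };

class BoundedBroadcast<T> {
  private buffer: T[] = [];
  private nextSeq = 0; // sequence number the next sent value will get
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Fire-and-forget: never waits for receivers, drops the oldest value when full.
  send(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.capacity) {
      this.buffer.shift();
    }
    this.nextSeq += 1;
  }

  // Returns a receive function that starts at the current end of the stream.
  subscribe(): () => RecvResult<T> {
    let cursor = this.nextSeq;
    return () => {
      const oldest = this.nextSeq - this.buffer.length;
      if (cursor < oldest) {
        // Receiver fell behind: report the gap and jump to the oldest retained value.
        const missed = oldest - cursor;
        cursor = oldest;
        return { kind: 'lagged', missed };
      }
      if (cursor === this.nextSeq) {
        return { kind: 'empty' };
      }
      const value = this.buffer[cursor - oldest];
      cursor += 1;
      return { kind: 'value', value };
    };
  }
}
```

A client that receives a `lagged` result has missed events and would typically invalidate its cache or refetch rather than continue as if nothing happened.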
## Testing
### Unit Tests
```rust
#[tokio::test]
async fn test_resource_changed_emission() {
let event_bus = EventBus::new(100);
let mut subscriber = event_bus.subscribe();
let tm = TransactionManager::new(event_bus.clone());
// Commit should emit ResourceChanged (`library` and `model` are test fixtures; setup omitted)
let album = tm.commit::<albums::Model, Album>(library, model).await.unwrap();
let event = subscriber.recv().await.unwrap();
match event.kind {
EventKind::ResourceChanged { resource_type, resource } => {
assert_eq!(resource_type, "album");
let decoded: Album = serde_json::from_value(resource).unwrap();
assert_eq!(decoded.id, album.id);
}
_ => panic!("Wrong event type"),
}
}
```
### Integration Tests
- Create resource on Device A → Event emitted
- Device B receives sync entry → Event emitted locally
- Client cache updates → UI reflects change
## Migration Checklist
- [ ] Implement new Event struct with EventEnvelope and EventKind
- [ ] Update TransactionManager to emit ResourceChanged
- [ ] Create ResourceTypeRegistry for Swift
- [ ] Create ResourceTypeRegistry for TypeScript
- [ ] Update specta codegen to generate registry
- [ ] Migrate Albums to unified events
- [ ] Migrate Tags to unified events
- [ ] Migrate Locations to unified events
- [ ] Migrate Files to unified events
- [ ] Remove old event variants
- [ ] Remove manual event_bus.emit() calls
## Benefits Summary
### Rust Core
- ✅ Zero manual event emission
- ✅ 40 variants → 5 generic variants
- ✅ Type-safe: Events always match resources
- ✅ Centralized: All emission in TransactionManager
### Clients
- ✅ Zero switch statements per resource
- ✅ Type registry handles all deserialization
- ✅ Auto-generated via specta
- ✅ Add 100 resources: zero event handling changes
### Maintenance
- ✅ Less code: ~2000 lines eliminated
- ✅ No forgetting: TM always emits
- ✅ Consistent: Same pattern everywhere
- ✅ Scalable: Horizontal scaling achieved
## References
- **Sync System**: `docs/core/sync.md`
- **Client Cache**: `docs/core/normalized_cache.md`
- **Current Implementation**: `core/src/infra/event/mod.rs`
- **Design Details**: `docs/core/design/sync/UNIFIED_RESOURCE_EVENTS.md`

View File

@@ -0,0 +1,933 @@
# Normalized Client Cache
**Status**: Implementation Ready
**Version**: 2.0
**Last Updated**: 2025-10-08
## Overview
The Normalized Client Cache is a client-side entity store that provides instant UI updates, offline support, and lower bandwidth use by avoiding refetches. Inspired by Apollo Client, it normalizes all resources by unique ID and updates atomically when events arrive.
## The Problem
**Traditional approach**:
```swift
// Query returns files
let files = try await client.query("files.search", input: searchParams)
// User renames file on Device B
// ...
// UI doesn't update! Must manually refetch:
let files = try await client.query("files.search", input: searchParams) // Network call
```
**Issues**:
- ❌ Stale data in UI
- ❌ Manual refetch required (slow, bandwidth-heavy)
- ❌ No offline support
- ❌ Duplicate data (same file in multiple queries)
## The Solution
**Normalized cache** + **event-driven updates**:
```swift
// Query uses cache
let files = cache.query("files.search", input: searchParams) // Instant!
// Device B renames file → Event arrives
// Event: ResourceChanged { resource_type: "file", resource: File { id, name: "new.jpg" } }
// Cache updates automatically
cache.updateEntity(file)
// UI updates instantly (ObservableObject/StateFlow)
// No refetch, no network, no user action!
```
## Cache Architecture
### Two-Level Structure
```
┌─────────────────────────────────────────────────────────────┐
│ LEVEL 1: Entity Store (normalized by ID) │
│ │
│ "file:uuid-1" → File { id: uuid-1, name: "photo.jpg" } │
│ "file:uuid-2" → File { id: uuid-2, name: "doc.pdf" } │
│ "album:uuid-3" → Album { id: uuid-3, name: "Vacation" } │
│ "tag:uuid-4" → Tag { id: uuid-4, name: "Important" } │
│ │
└─────────────────────────────────────────────────────────────┘
│ Atomic updates
┌─────────────────────────────────────────────────────────────┐
│ LEVEL 2: Query Index (maps queries to entity IDs) │
│ │
│ "search:photos" → ["file:uuid-1", "file:uuid-2"] │
│ "directory:/vacation" → ["file:uuid-1"] │
│ "albums.list" → ["album:uuid-3"] │
│ │
└─────────────────────────────────────────────────────────────┘
```
**Key Insight**: When `file:uuid-1` updates, we find all queries referencing it and trigger UI updates for those views.
### Swift Implementation
```swift
/// Normalized entity cache with event-driven updates
actor NormalizedCache {
// LEVEL 1: Entity store
private var entityStore: [String: any Identifiable] = [:]
// LEVEL 2: Query index
private var queryIndex: [String: QueryCacheEntry] = [:]
// Observers for reactive UI updates
private var queryObservers: [String: Set<UUID>] = [:]
/// Update a single entity (called by event handler)
func updateEntity<T: CacheableResource>(_ resource: T) where T.ID == UUID {
let cacheKey = "\(T.resourceType):\(resource.id.uuidString)"
// 1. Update entity store
entityStore[cacheKey] = resource
// 2. Find all queries containing this entity
let affectedQueries = queryIndex.filter { _, entry in
entry.entityKeys.contains(cacheKey)
}
// 3. Notify observers (SwiftUI views re-render)
for (queryKey, _) in affectedQueries {
notifyObservers(for: queryKey)
}
}
/// Execute a query (with caching)
func query<T: CacheableResource>(
_ method: String,
input: Encodable
) async throws -> [T] where T.ID == UUID {
let queryKey = generateQueryKey(method, input)
// Check cache
if let cached = queryIndex[queryKey], !cached.isExpired {
// Cache hit! Return from entity store
return cached.entityKeys.compactMap { key in
entityStore[key] as? T
}
}
// Cache miss - fetch from server
let results: [T] = try await client.query(method, input: input)
// Store entities
for resource in results {
let cacheKey = "\(T.resourceType):\(resource.id.uuidString)"
entityStore[cacheKey] = resource
}
// Store query index
let entityKeys = results.map { "\(T.resourceType):\($0.id.uuidString)" }
queryIndex[queryKey] = QueryCacheEntry(
entityKeys: Set(entityKeys),
fetchedAt: Date(),
ttl: 300 // 5 minutes
)
return results
}
/// Delete entity (called by event handler)
func deleteEntity(resourceType: String, id: UUID) {
let cacheKey = "\(resourceType):\(id.uuidString)"
// Remove from store
entityStore.removeValue(forKey: cacheKey)
// Remove from query indices
for (queryKey, var entry) in queryIndex {
if entry.entityKeys.remove(cacheKey) != nil {
queryIndex[queryKey] = entry
notifyObservers(for: queryKey)
}
}
}
/// Invalidate queries (called by bulk operation events)
func invalidateQueriesForResource(_ resourceType: String, hints: [String: Any]) {
// Invalidate all queries matching hints (e.g., location_id)
let keysToInvalidate = queryIndex.keys.filter { queryKey in
if let locationId = hints["location_id"] as? String {
return queryKey.contains(locationId)
}
return queryKey.contains(resourceType)
}
for key in keysToInvalidate {
queryIndex.removeValue(forKey: key)
notifyObservers(for: key)
}
}
}
struct QueryCacheEntry {
var entityKeys: Set<String> // References to entity store
let fetchedAt: Date
let ttl: TimeInterval // Time to live
var isExpired: Bool {
Date().timeIntervalSince(fetchedAt) > ttl
}
}
```
### TypeScript Implementation
```typescript
/**
* Normalized entity cache with reactive updates
*/
export class NormalizedCache {
// LEVEL 1: Entity store
private entityStore = new Map<string, any>();
// LEVEL 2: Query index
private queryIndex = new Map<string, QueryCacheEntry>();
// Reactive subscriptions (for React hooks)
private querySubscriptions = new Map<string, Set<() => void>>();
/**
* Update entity (called by event handler)
*/
updateEntity(resourceType: string, resource: any) {
const cacheKey = `${resourceType}:${resource.id}`;
// 1. Update entity
this.entityStore.set(cacheKey, resource);
// 2. Find affected queries
for (const [queryKey, entry] of this.queryIndex.entries()) {
if (entry.entityKeys.has(cacheKey)) {
this.notifySubscribers(queryKey);
}
}
}
/**
* Query with caching
*/
async query<T>(method: string, input: any): Promise<T[]> {
const queryKey = this.generateQueryKey(method, input);
// Check cache
const cached = this.queryIndex.get(queryKey);
if (cached && Date.now() - cached.fetchedAt <= cached.ttl) {
// Cache hit!
return Array.from(cached.entityKeys)
.map(key => this.entityStore.get(key))
.filter(Boolean) as T[];
}
// Cache miss - fetch
const results: T[] = await this.client.query(method, input);
// Store entities
const entityKeys = new Set<string>();
for (const resource of results) {
const cacheKey = `${(resource as any).__resourceType}:${(resource as any).id}`;
this.entityStore.set(cacheKey, resource);
entityKeys.add(cacheKey);
}
// Store query
this.queryIndex.set(queryKey, {
entityKeys,
fetchedAt: Date.now(),
ttl: 300000, // 5 minutes
});
return results;
}
/**
* Subscribe to query changes (for React hooks)
*/
subscribe(queryKey: string, callback: () => void): () => void {
if (!this.querySubscriptions.has(queryKey)) {
this.querySubscriptions.set(queryKey, new Set());
}
this.querySubscriptions.get(queryKey)!.add(callback);
// Return unsubscribe function
return () => {
this.querySubscriptions.get(queryKey)?.delete(callback);
};
}
private notifySubscribers(queryKey: string) {
const subscribers = this.querySubscriptions.get(queryKey);
if (subscribers) {
subscribers.forEach(callback => callback());
}
}
}
```
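The cache relies on `generateQueryKey`, which is not shown above. A stable key needs to be independent of property order in the input, otherwise logically identical queries would miss the cache. One possible approach is sketched below in TypeScript; `canonicalize` is a hypothetical helper, and the `method:json` key layout is an assumption.

```typescript
// Recursively sort object keys so that logically equal inputs serialize identically.
function canonicalize(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(canonicalize);
  }
  if (typeof value === 'object' && value !== null) {
    const source = value as Record<string, unknown>;
    const sorted: Record<string, unknown> = {};
    Object.keys(source).sort().forEach((key) => {
      sorted[key] = canonicalize(source[key]);
    });
    return sorted;
  }
  return value;
}

// Build a cache key from the query method and its canonicalized input.
function generateQueryKey(method: string, input: unknown): string {
  return `${method}:${JSON.stringify(canonicalize(input))}`;
}
```

Because the serialized input is part of the key, identifiers such as a location ID remain visible in the key string, which is what hint-based invalidation by substring depends on.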
## React Integration
### useCachedQuery Hook
```typescript
/**
* React hook for cached queries with automatic updates
*/
export function useCachedQuery<T>(
method: string,
input: any,
options?: { enabled?: boolean }
): { data: T[] | null; loading: boolean; error: Error | null } {
const cache = useContext(CacheContext);
const [data, setData] = useState<T[] | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<Error | null>(null);
useEffect(() => {
if (options?.enabled === false) return;
const queryKey = cache.generateQueryKey(method, input);
// Subscribe to cache changes
const unsubscribe = cache.subscribe(queryKey, () => {
// Query result changed - re-read from cache
const result = cache.getQueryResult<T>(queryKey);
setData(result);
});
// Initial fetch
(async () => {
try {
const result = await cache.query<T>(method, input);
setData(result);
} catch (e) {
setError(e as Error);
} finally {
setLoading(false);
}
})();
return unsubscribe;
}, [method, JSON.stringify(input), options?.enabled]);
return { data, loading, error };
}
// Usage in component
function AlbumList() {
const { data: albums, loading } = useCachedQuery<Album>('albums.list', {});
if (loading) return <Spinner />;
// When ResourceChanged event arrives for an album:
// 1. Cache updates
// 2. This component re-renders
// 3. User sees new data instantly!
return (
<div>
{albums?.map(album => <AlbumCard key={album.id} album={album} />)}
</div>
);
}
```
## SwiftUI Integration
### ObservableObject Pattern
```swift
/// Observable cache for SwiftUI
@MainActor
class CachedQueryClient: ObservableObject {
private let cache: NormalizedCache
@Published private var queryResults: [String: Any] = [:]
init(cache: NormalizedCache) {
self.cache = cache
// Subscribe to cache changes
Task {
for await notification in cache.changeStream {
// Update published results
queryResults[notification.queryKey] = notification.newValue
}
}
}
func query<T: CacheableResource>(_ method: String, input: Encodable) async throws -> [T] where T.ID == UUID {
let results = try await cache.query(method, input: input)
// Store in published results for observation
let queryKey = cache.generateQueryKey(method, input)
queryResults[queryKey] = results
return results
}
func getQueryResult<T>(_ queryKey: String) -> [T]? {
queryResults[queryKey] as? [T]
}
}
// Usage in SwiftUI view
struct AlbumListView: View {
@ObservedObject var client: CachedQueryClient
@State private var albums: [Album] = []
var body: some View {
List(albums, id: \.id) { album in
Text(album.name)
}
.task {
// .task closures cannot throw, so fall back to an empty list on failure
albums = (try? await client.query("albums.list", input: EmptyInput())) ?? []
}
// When ResourceChanged event arrives:
// 1. Cache updates
// 2. client publishes change
// 3. SwiftUI re-renders
// 4. User sees update instantly!
}
}
```
## Memory Management
### LRU Eviction
```swift
actor NormalizedCache {
private let maxEntities: Int = 10_000
private var accessOrder: [String] = [] // LRU tracking
func updateEntity<T: CacheableResource>(_ resource: T) where T.ID == UUID {
let cacheKey = "\(T.resourceType):\(resource.id.uuidString)"
// Update store
entityStore[cacheKey] = resource
// Update access order (LRU)
if let index = accessOrder.firstIndex(of: cacheKey) {
accessOrder.remove(at: index)
}
accessOrder.append(cacheKey)
// Evict if over limit
if entityStore.count > maxEntities {
evictLRU()
}
}
private func evictLRU() {
// Evict oldest unreferenced entities
let referencedKeys = Set(queryIndex.values.flatMap { $0.entityKeys })
for key in accessOrder {
if !referencedKeys.contains(key) {
// Not in any active query - safe to evict
entityStore.removeValue(forKey: key)
accessOrder.removeAll { $0 == key }
if entityStore.count <= maxEntities * 9 / 10 {
break // Evicted 10% - done
}
}
}
}
}
```
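For the TypeScript client, the same eviction policy can be expressed more cheaply because a JavaScript `Map` iterates in insertion order. The sketch below is one possible shape, not the project's implementation: `LruEntityStore` is a hypothetical class, and the `isReferenced` callback stands in for the query-index lookup that protects entities still used by a cached query.

```typescript
class LruEntityStore<V> {
  private entries = new Map<string, V>();
  private maxEntries: number;

  constructor(maxEntries: number) {
    this.maxEntries = maxEntries;
  }

  // Insert or update; deleting first moves the key to the most-recent position.
  set(key: string, value: V, isReferenced: (key: string) => boolean): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.maxEntries) {
      this.evict(isReferenced);
    }
  }

  // Reading an entry also marks it as recently used.
  get(key: string): V | undefined {
    const value = this.entries.get(key);
    if (value !== undefined) {
      this.entries.delete(key);
      this.entries.set(key, value);
    }
    return value;
  }

  has(key: string): boolean {
    return this.entries.has(key);
  }

  size(): number {
    return this.entries.size;
  }

  // Evict oldest unreferenced entries until the store is at 90% of capacity.
  private evict(isReferenced: (key: string) => boolean): void {
    const target = Math.floor((this.maxEntries * 9) / 10);
    const keys = Array.from(this.entries.keys()); // oldest first
    for (let i = 0; i < keys.length && this.entries.size > target; i++) {
      if (!isReferenced(keys[i])) {
        this.entries.delete(keys[i]);
      }
    }
  }
}
```

Moving a key to the most-recent position is a delete followed by a set, which avoids the linear scan of a separate access-order array.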
### TTL (Time-To-Live)
```swift
struct QueryCacheEntry {
var entityKeys: Set<String>
let fetchedAt: Date
var ttl: TimeInterval = 300 // 5 minutes default; `var` keeps it settable via the memberwise initializer
var isExpired: Bool {
Date().timeIntervalSince(fetchedAt) > ttl
}
}
// Different TTLs per query type
func getTTL(for method: String) -> TimeInterval {
switch method {
case "files.search": return 60 // 1 minute (changes frequently)
case "albums.list": return 300 // 5 minutes (changes rarely)
case "core.status": return 10 // 10 seconds (real-time)
default: return 300
}
}
```
### Reference Counting
```swift
// Track how many cached queries reference each entity
private var entityRefCounts: [String: Int] = [:]
func removeQuery(_ queryKey: String) {
    guard let entry = queryIndex[queryKey] else { return }
    // Decrement ref counts
    for entityKey in entry.entityKeys {
        let remaining = (entityRefCounts[entityKey] ?? 0) - 1
        if remaining <= 0 {
            // No longer referenced by any query: evict.
            // `<= 0` also covers entities that were never counted.
            entityStore.removeValue(forKey: entityKey)
            entityRefCounts.removeValue(forKey: entityKey)
        } else {
            entityRefCounts[entityKey] = remaining
        }
    }
    queryIndex.removeValue(forKey: queryKey)
}
```
## Event-Driven Updates
### Integration with Event System
```swift
actor EventCacheUpdater {
let cache: NormalizedCache
func start(eventStream: AsyncStream<Event>) async {
for await event in eventStream {
await handleEvent(event)
}
}
func handleEvent(_ event: Event) async {
switch event.kind {
case .ResourceChanged(let resourceType, let resourceJSON):
// Decode resource
guard let resource = try? ResourceTypeRegistry.decode(
resourceType: resourceType,
from: resourceJSON
) else {
print("Failed to decode \(resourceType)")
return
}
// Update cache (triggers UI updates)
await cache.updateEntity(resource)
case .ResourceBatchChanged(let resourceType, let resourcesJSON, _):
// Batch update
for json in resourcesJSON {
if let resource = try? ResourceTypeRegistry.decode(resourceType: resourceType, from: json) {
await cache.updateEntity(resource)
}
}
case .ResourceDeleted(let resourceType, let resourceId):
// Remove from cache
await cache.deleteEntity(resourceType: resourceType, id: resourceId)
case .BulkOperationCompleted(let resourceType, _, _, let hints):
// Invalidate affected queries
await cache.invalidateQueriesMatching { queryKey in
// Match by location_id or other hints
if let locationId = hints["location_id"] as? String {
return queryKey.contains(locationId)
}
return queryKey.contains(resourceType)
}
default:
break
}
}
}
```
### Gap Detection
When events have sequence numbers, detect gaps caused by network issues:
```swift
actor NormalizedCache {
    private var lastEventSequence: [UUID: UInt64] = [:] // library_id → sequence
    func processEvent(_ event: Event) async {
        guard let libraryId = event.envelope.library_id,
              let sequence = event.envelope.sequence else {
            return
        }
        let lastSeq = lastEventSequence[libraryId] ?? 0
        // Ignore duplicates and replays of events that were already applied
        guard sequence > lastSeq else { return }
        if sequence > lastSeq + 1 {
            // Gap detected! Missed events
            print("⚠️ Gap detected: expected \(lastSeq + 1), got \(sequence)")
            await reconcileState(libraryId: libraryId, fromSequence: lastSeq + 1)
            // The replay may already have applied this event
            guard sequence > (lastEventSequence[libraryId] ?? 0) else { return }
        }
        // Update sequence tracker
        lastEventSequence[libraryId] = sequence
        // Process event normally
        await handleEvent(event)
    }
    /// Reconcile state after detecting missed events
    func reconcileState(libraryId: UUID, fromSequence: UInt64) async {
        print("🔄 Reconciling state from sequence \(fromSequence)")
        // Option 1: Fetch and replay missed events
        if let missedEvents: [Event] = try? await client.query(
            "events.since.v1",
            input: ["library_id": libraryId, "sequence": fromSequence]
        ) {
            for event in missedEvents {
                await processEvent(event)
            }
        } else {
            // Option 2: Full cache invalidation (fallback, only when the replay fails)
            invalidateLibrary(libraryId)
        }
    }
}
```
## Cache Persistence (Offline Support)
### SQLite Storage
```swift
import SQLite
actor NormalizedCache {
private let db: Connection
init() {
// SQLite database for cache persistence
let path = FileManager.default
.urls(for: .cachesDirectory, in: .userDomainMask)[0]
.appendingPathComponent("spacedrive_cache.db")
db = try! Connection(path.path)
createTables()
}
func createTables() {
try! db.run("""
CREATE TABLE IF NOT EXISTS entities (
cache_key TEXT PRIMARY KEY,
resource_type TEXT NOT NULL,
resource_data TEXT NOT NULL,
updated_at INTEGER NOT NULL
)
""")
try! db.run("""
CREATE TABLE IF NOT EXISTS queries (
query_key TEXT PRIMARY KEY,
entity_keys TEXT NOT NULL,
fetched_at INTEGER NOT NULL,
ttl INTEGER NOT NULL
)
""")
}
func updateEntity<T: Identifiable>(_ resource: T) async {
let cacheKey = "\(T.resourceType):\(resource.id.uuidString)"
let json = try! JSONEncoder().encode(resource)
// Update memory
entityStore[cacheKey] = resource
// Persist to disk
let stmt = try! db.prepare("""
INSERT OR REPLACE INTO entities (cache_key, resource_type, resource_data, updated_at)
VALUES (?, ?, ?, ?)
""")
try! stmt.run(cacheKey, T.resourceType, String(data: json, encoding: .utf8)!, Date().timeIntervalSince1970)
}
/// Load cache from disk on startup
func loadFromDisk() async {
let stmt = try! db.prepare("SELECT cache_key, resource_data FROM entities")
for row in stmt {
let cacheKey = row[0] as! String
let jsonString = row[1] as! String
// Deserialize using type registry
let parts = cacheKey.split(separator: ":")
let resourceType = String(parts[0])
if let data = jsonString.data(using: .utf8),
let resource = try? ResourceTypeRegistry.decode(resourceType: resourceType, from: data) {
entityStore[cacheKey] = resource
}
}
print("✅ Loaded \(entityStore.count) entities from disk cache")
}
}
```
## Optimistic Updates
```swift
actor NormalizedCache {
private var optimisticUpdates: [UUID: any Identifiable] = [:] // pending_id → resource
/// Apply optimistic update immediately
func updateOptimistically<T: Identifiable>(pendingId: UUID, resource: T) {
let cacheKey = "\(T.resourceType):\(resource.id.uuidString)"
// Store in both places
entityStore[cacheKey] = resource
optimisticUpdates[pendingId] = resource
// Notify observers (UI updates instantly!)
notifyAffectedQueries(cacheKey)
}
/// Commit optimistic update when server confirms
func commitOptimisticUpdate(pendingId: UUID, confirmedResource: any Identifiable) {
optimisticUpdates.removeValue(forKey: pendingId)
updateEntity(confirmedResource) // Final update
}
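/// Rollback optimistic update on error.
/// Note: this drops the entity from the cache. For an update to an existing
/// entity (such as a rename), restore the pre-update value or refetch it instead.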
func rollbackOptimisticUpdate(pendingId: UUID) {
guard let resource = optimisticUpdates.removeValue(forKey: pendingId) else {
return
}
let cacheKey = "\(type(of: resource).resourceType):\(resource.id.uuidString)"
entityStore.removeValue(forKey: cacheKey)
notifyAffectedQueries(cacheKey)
}
}
// Usage example
func renameAlbum(id: UUID, newName: String) async throws {
let pendingId = UUID()
// 1. Optimistic update (instant UI)
let optimisticAlbum = Album(id: id, name: newName, cover: nil)
await cache.updateOptimistically(pendingId: pendingId, resource: optimisticAlbum)
do {
// 2. Send action to server
let confirmed = try await client.action("albums.rename.v1", input: ["id": id, "name": newName])
// 3. Commit (replace optimistic with confirmed)
await cache.commitOptimisticUpdate(pendingId: pendingId, confirmedResource: confirmed)
} catch {
// 4. Rollback on error
await cache.rollbackOptimisticUpdate(pendingId: pendingId)
throw error
}
}
```
## Query Invalidation
### Manual Invalidation
```swift
// After bulk operations (the cache is an actor, so calls from outside need `await`)
await cache.invalidateQuery("files.search", input: searchParams)
// After mutations
await cache.invalidateQueriesMatching { queryKey in
    queryKey.contains("albums.list")
}
// Clear entire library
await cache.invalidateLibrary(libraryId)
```
### Automatic Invalidation
```swift
// ResourceBatchChanged with hints
case .ResourceBatchChanged(_, _, let operation):
switch operation {
case .Index:
// Invalidate directory listings
await cache.invalidateQueriesMatching { $0.contains("directory:") }
case .WatcherBatch:
// Keep cache (events contain full data)
break
}
```
## Memory Budget
```swift
struct CacheConfig {
// Entity store limits
let maxEntities: Int = 10_000 // ~10MB at 1KB/entity
let evictionThreshold: Int = 9_000 // Evict down to 90% once maxEntities is exceeded
// Query limits
let maxQueries: Int = 100
let defaultTTL: TimeInterval = 300 // 5 minutes
// Persistence
let persistToDisk: Bool = true
let maxDiskSize: Int64 = 50_000_000 // 50MB
}
```
## Testing
### Unit Tests
```swift
func testCacheUpdate() async {
let cache = NormalizedCache()
// Store entity
let album = Album(id: UUID(), name: "Test", cover: nil)
await cache.updateEntity(album)
// Verify stored
let retrieved = await cache.getEntity(Album.self, id: album.id)
XCTAssertEqual(retrieved?.name, "Test")
}
func testQueryInvalidation() async throws {
let cache = NormalizedCache()
// Query and cache
let albums = try await cache.query("albums.list", input: EmptyInput())
XCTAssertEqual(albums.count, 5)
// Invalidate
await cache.invalidateQuery("albums.list", input: EmptyInput())
// Verify cache miss
let cached = await cache.getQueryResult("albums.list", input: EmptyInput())
XCTAssertNil(cached)
}
```
### Integration Tests
1. **Real-time update**: Create album on Device A → Event → Device B cache updates
2. **Offline resilience**: Disconnect → Queue writes → Reconnect → Sync
3. **Memory limits**: Load 20K entities → Verify LRU eviction
4. **Gap detection**: Miss events → Detect gap → Reconcile
## Performance Metrics
### Cache Hit Rates (Target)
- File queries: >90% hit rate
- Album/Tag queries: >95% hit rate
- Search queries: >70% hit rate (more volatile)
### Memory Usage (Typical)
- Entity store: 5-10MB (5K-10K entities)
- Query index: 1-2MB (100 queries)
- Total: <15MB
### Update Latency
- Event received → Cache updated: <1ms
- Cache updated → UI re-renders: <16ms (1 frame)
- Total: <20ms from event receipt to rendered UI (network transit not included)
## Implementation Checklist
### Swift
- [ ] Create `NormalizedCache` actor
- [ ] Implement entity store + query index
- [ ] Implement `EventCacheUpdater`
- [ ] Create `ResourceTypeRegistry`
- [ ] Add LRU eviction
- [ ] Add SQLite persistence
- [ ] Create `CachedQueryClient` (ObservableObject)
- [ ] Create SwiftUI view integration
- [ ] Unit tests
- [ ] Integration tests
### TypeScript/React
- [ ] Create `NormalizedCache` class
- [ ] Implement entity store + query index
- [ ] Create `EventCacheUpdater`
- [ ] Create `ResourceTypeRegistry`
- [ ] Add LRU eviction
- [ ] Add IndexedDB persistence
- [ ] Create `useCachedQuery` hook
- [ ] Create React integration examples
- [ ] Unit tests
- [ ] Integration tests
## Migration Strategy
### Phase 1: Parallel Systems
- New cache runs alongside existing query system
- No breaking changes
- Opt-in per view/component
### Phase 2: Gradual Adoption
- Migrate high-traffic views first (file browser, search)
- Measure: Cache hit rate, UI responsiveness
- Iterate on memory management
### Phase 3: Full Migration
- All queries use cache
- Remove old query caching logic
- Cleanup legacy code
## Edge Cases
### Circular References
```swift
// File references Album, Album references Files (cover)
// Solution: Store by ID, resolve lazily
struct Album {
let id: UUID
let name: String
let coverFileId: UUID? // Just ID, not full File object
}
// UI resolves when needed (coverFileId is optional, so unwrap it first):
let coverFile = album.coverFileId.flatMap { cache.getEntity(File.self, id: $0) }
```
### Large Resources
```swift
// File with 1000 tags (rare but possible)
// Solution: Paginate relationships or use lazy loading
struct File {
let id: UUID
let name: String
let tagIds: [UUID] // Just IDs
// NOT: tags: [Tag] // Would explode memory
}
// Load tags on demand:
let tags = file.tagIds.compactMap { cache.getEntity(Tag.self, id: $0) }
```
## References
- **Sync System**: `docs/core/sync.md`
- **Event System**: `docs/core/events.md`
- **Design Details**: `docs/core/design/sync/NORMALIZED_CACHE_DESIGN.md` (2674 lines, comprehensive)
- **Client Architecture**: `docs/core/design/sync/SYNC_TX_CACHE_MINI_SPEC.md`

561
docs/core/sync.md Normal file

@@ -0,0 +1,561 @@
# Spacedrive Sync System
**Status**: Implementation Ready
**Version**: 2.0
**Last Updated**: 2025-10-08
## Overview
Spacedrive's sync system enables real-time, multi-device synchronization of library metadata, ensuring that changes made on one device are reflected across all paired devices. This document provides the definitive specification for implementing sync.
## Core Architecture
### The Three Pillars
1. **TransactionManager (TM)**: Sole gatekeeper for all syncable database writes, ensuring atomic DB commits + sync log creation
2. **Sync Log**: Append-only, sequentially-ordered log of all state changes per library, maintained only by the leader device
3. **Sync Service**: Replicates sync log entries between paired devices using pull-based synchronization
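
How the three pillars fit together can be sketched with an in-memory stand-in. This is a minimal illustration under stated assumptions, not the Spacedrive implementation: `MiniTransactionManager`, `State`, and the trimmed `SyncLogEntry` shape are hypothetical names, and a single mutex stands in for the database transaction that makes the record write and the log append atomic.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Trimmed, hypothetical shape of one sync log row.
#[derive(Debug, Clone, PartialEq)]
pub struct SyncLogEntry {
    pub sequence: u64,
    pub model_type: String,
    pub record_id: String,
    pub data: String,
}

/// Records and log live behind one lock, so a reader never sees a record
/// without its log entry. A real TM would use a database transaction instead.
#[derive(Default)]
struct State {
    records: HashMap<String, String>, // record_id -> serialized row
    log: Vec<SyncLogEntry>,           // append-only, ordered by sequence
}

/// Hypothetical stand-in for the TransactionManager.
#[derive(Default)]
pub struct MiniTransactionManager {
    state: Mutex<State>,
}

impl MiniTransactionManager {
    /// Write the record and append its log entry in one step.
    /// Returns the sequence number assigned to the change.
    pub fn commit(&self, model_type: &str, record_id: &str, data: &str) -> u64 {
        let mut state = self.state.lock().unwrap();
        let sequence = state.log.len() as u64 + 1;
        state.records.insert(record_id.to_string(), data.to_string());
        state.log.push(SyncLogEntry {
            sequence,
            model_type: model_type.to_string(),
            record_id: record_id.to_string(),
            data: data.to_string(),
        });
        sequence
    }

    /// Read back the current serialized row for a record, if any.
    pub fn record(&self, record_id: &str) -> Option<String> {
        self.state.lock().unwrap().records.get(record_id).cloned()
    }

    /// What a follower pulls: every entry after `last_seq`, in order.
    pub fn entries_since(&self, last_seq: u64) -> Vec<SyncLogEntry> {
        let state = self.state.lock().unwrap();
        state.log.iter().filter(|e| e.sequence > last_seq).cloned().collect()
    }
}
```

A follower that has already applied sequence 1 would call `entries_since(1)` and receive only the later entries, which is the pull-based replication described in pillar 3.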
### Data Flow
```
┌─────────────────────────────────────────────────────────────────────┐
│ Device A │
│ │
│ User Action (e.g., create album) │
│ ↓ │
│ [ Action Layer ] │
│ ↓ │
│ [ TransactionManager ] │
│ ↓ │
│ ┌─────────────────────────────┐ │
│ │ ATOMIC TRANSACTION │ │
│ │ 1. Write to database │ │
│ │ 2. Create sync log entry │ │
│ │ COMMIT │ │
│ └─────────────────────────────┘ │
│ ↓ │
│ [ Event Bus ] → Client cache updates │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓ Sync replication
┌─────────────────────────────────────────────────────────────────────┐
│ Device B │
│ │
│ [ Sync Service ] │
│ ↓ (polls for new entries) │
│ Fetch sync log from Device A │
│ ↓ │
│ [ Apply Sync Entry ] │
│ ↓ │
│ [ TransactionManager ] (applies change) │
│ ↓ │
│ Database updated + Event emitted │
│ ↓ │
│ Client cache updates → UI reflects change │
│ │
└─────────────────────────────────────────────────────────────────────┘
```
## Syncable Trait
All database models that need to sync implement the `Syncable` trait:
```rust
/// Enables automatic sync log creation for database models
pub trait Syncable {
/// Stable model identifier used in sync logs (e.g., "album", "tag", "entry")
const SYNC_MODEL: &'static str;
/// Globally unique ID for this resource across all devices
fn sync_id(&self) -> Uuid;
/// Version number for optimistic concurrency control
fn version(&self) -> i64;
/// Optional: Exclude platform-specific or derived fields from sync
fn exclude_fields() -> Option<&'static [&'static str]> {
None
}
/// Optional: Convert to sync-safe JSON (default: full serialization)
fn to_sync_json(&self) -> serde_json::Value where Self: Serialize {
serde_json::to_value(self).unwrap_or(serde_json::json!({}))
}
}
```
**Example Implementation**:
```rust
// Database model
#[derive(Clone, Debug, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "albums")]
pub struct Model {
pub id: i32, // Database primary key
pub uuid: Uuid, // Sync identifier
pub name: String,
pub version: i64, // For conflict resolution
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl Syncable for albums::Model {
const SYNC_MODEL: &'static str = "album";
fn sync_id(&self) -> Uuid {
self.uuid
}
fn version(&self) -> i64 {
self.version
}
fn exclude_fields() -> Option<&'static [&'static str]> {
// Don't sync database IDs or timestamps (platform-specific)
Some(&["id", "created_at", "updated_at"])
}
}
```
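
The trait's default `to_sync_json` serializes every field, so something still has to apply `exclude_fields()` before the payload reaches the sync log. The sketch below shows one way that filtering could look. It is an assumption-laden illustration: `strip_excluded` is a hypothetical helper, and a flat `BTreeMap` stands in for the serialized JSON object so the example needs only the standard library.

```rust
use std::collections::BTreeMap;

/// Drop excluded fields from a flattened field map before it is written
/// to the sync log. `exclude` mirrors the return type of `exclude_fields()`.
pub fn strip_excluded(
    fields: &BTreeMap<String, String>,
    exclude: Option<&[&str]>,
) -> BTreeMap<String, String> {
    // `None` means "sync everything"
    let excluded = exclude.unwrap_or(&[]);
    fields
        .iter()
        .filter(|(name, _)| !excluded.contains(&name.as_str()))
        .map(|(name, value)| (name.clone(), value.clone()))
        .collect()
}
```

With the album exclusions above (`id`, `created_at`, `updated_at`), only `uuid`, `name`, and `version` would remain in the payload.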
## TransactionManager
The TM is the **only** component that performs state-changing writes. It guarantees atomicity and automatic sync log creation.
### Core API
```rust
pub struct TransactionManager {
event_bus: Arc<EventBus>,
sync_sequence: Arc<Mutex<HashMap<Uuid, u64>>>, // library_id → sequence
}
impl TransactionManager {
/// Commit single resource change (creates sync log)
pub async fn commit<M, R>(
&self,
library: Arc<Library>,
model: M,
) -> Result<R, TxError>
where
M: Syncable + IntoActiveModel,
R: Identifiable + From<M>;
/// Commit batch of changes (10-1K items, creates per-item sync logs)
pub async fn commit_batch<M, R>(
&self,
library: Arc<Library>,
models: Vec<M>,
) -> Result<Vec<R>, TxError>
where
M: Syncable + IntoActiveModel,
R: Identifiable + From<M>;
/// Commit bulk operation (1K+ items, creates ONE metadata sync log)
pub async fn commit_bulk<M>(
&self,
library: Arc<Library>,
changes: ChangeSet<M>,
) -> Result<BulkAck, TxError>
where
M: Syncable + IntoActiveModel;
}
```
### Commit Strategies
| Method | Use Case | Sync Log | Event | Example |
|--------|----------|----------|-------|---------|
| `commit()` | Single user action | 1 per item | Rich resource | User renames file |
| `commit_batch()` | Watcher events (10-1K) | 1 per item | Batch | User copies folder |
| `commit_bulk()` | Initial indexing (1K+) | 1 metadata only | Summary | Index 1M files |
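
The table can be read as a size-based dispatch rule. The sketch below is a hedged illustration of that reading: `CommitStrategy`, `choose_strategy`, and `BULK_THRESHOLD` are hypothetical names, and treating 2 to 9 items as a batch is an assumption, since the table only gives the 10-1K and 1K+ ranges.

```rust
/// Hypothetical mirror of the three commit methods.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CommitStrategy {
    Single, // commit()
    Batch,  // commit_batch()
    Bulk,   // commit_bulk()
}

/// Assumed cut-over point between batch and bulk (the table says "1K+").
pub const BULK_THRESHOLD: usize = 1_000;

/// Pick a commit method from the number of changed items.
pub fn choose_strategy(item_count: usize) -> CommitStrategy {
    match item_count {
        0 | 1 => CommitStrategy::Single,
        n if n < BULK_THRESHOLD => CommitStrategy::Batch,
        _ => CommitStrategy::Bulk,
    }
}

/// Number of sync log entries each strategy produces, per the table:
/// one per item for single and batch commits, one metadata entry for bulk.
pub fn sync_log_entries(item_count: usize) -> usize {
    match choose_strategy(item_count) {
        CommitStrategy::Single | CommitStrategy::Batch => item_count,
        CommitStrategy::Bulk => 1,
    }
}
```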
### Critical: Bulk Operations
**Problem**: Indexing 1M files shouldn't create 1M sync log entries.
**Solution**: Bulk operations create **ONE** metadata sync log:
```json
{
"sequence": 1234,
"model_type": "bulk_operation",
"operation": "InitialIndex",
"location_id": "uuid-...",
"affected_count": 1000000,
"hints": {
"location_path": "/Users/alice/Photos"
}
}
```
**Why**: Each device indexes its own filesystem independently. The sync log just says "I indexed location X" — it does NOT replicate 1M entries. Other devices trigger their own local indexing jobs when they see this notification.
**Performance Impact**:
- With per-entry sync logs: ~500MB, 10 minutes, 3M operations
- With bulk metadata: ~500 bytes, 1 minute, 1M operations (10x faster!)
### Usage Example
```rust
// Before: Manual DB write + event emission (error-prone)
let model = albums::ActiveModel { /* ... */ };
model.insert(db).await?;
event_bus.emit(Event::AlbumCreated { /* ... */ }); // Can forget this!
// After: TransactionManager (atomic, automatic)
let model = albums::Model { /* ... */ }; // `Syncable` is implemented for `Model`
let album = tm.commit::<albums::Model, Album>(library, model).await?;
// ✅ DB write + sync log commit atomically; the event is emitted automatically afterwards
```
## Sync Log Schema
```sql
CREATE TABLE sync_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sequence INTEGER NOT NULL, -- Monotonic per library
library_id TEXT NOT NULL,
device_id TEXT NOT NULL, -- Device that created this entry
timestamp TEXT NOT NULL,
-- Change details
model_type TEXT NOT NULL, -- "album", "tag", "entry", "bulk_operation"
record_id TEXT NOT NULL, -- UUID of changed record
change_type TEXT NOT NULL, -- "insert", "update", "delete", "bulk_insert"
version INTEGER NOT NULL DEFAULT 1, -- Optimistic concurrency
-- Data payload (JSON)
data TEXT NOT NULL,
UNIQUE(library_id, sequence)
);
CREATE INDEX idx_sync_log_library_sequence ON sync_log(library_id, sequence);
CREATE INDEX idx_sync_log_device ON sync_log(device_id);
CREATE INDEX idx_sync_log_model_record ON sync_log(model_type, record_id);
```
## Leader Election
Each library requires a **single leader device** responsible for assigning sync log sequence numbers. This prevents sequence collisions.
### Election Strategy
1. **Initial Leader**: Device that creates the library
2. **Heartbeat**: Leader sends heartbeat every 30 seconds
3. **Re-election**: If leader offline >60s, devices elect new leader (highest device_id wins)
4. **Lease**: Leader holds exclusive write lease
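
The election rule above can be sketched as a pure function. This is an illustration only: `elect_leader` and `LEADER_TIMEOUT` are hypothetical names, `u128` stands in for a device UUID, and how devices learn which peers are online is left out.

```rust
use std::time::Duration;

/// A leader that has been silent for longer than this is considered offline.
pub const LEADER_TIMEOUT: Duration = Duration::from_secs(60);

/// Decide who should lead a library.
/// Device IDs are `u128` here as a stand-in for UUIDs.
pub fn elect_leader(
    current_leader: u128,
    since_last_heartbeat: Duration,
    online_devices: &[u128],
) -> Option<u128> {
    if since_last_heartbeat <= LEADER_TIMEOUT {
        // Lease still valid: keep the current leader
        return Some(current_leader);
    }
    // Leader timed out: the highest device ID among online devices wins.
    // Returns None when no device is online to take over.
    online_devices.iter().copied().max()
}
```

Because every device applies the same deterministic rule to the same inputs, devices that agree on the set of online peers would pick the same leader without further negotiation.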
### Implementation
```rust
pub struct SyncLeader {
    library_id: Uuid,
    leader_device_id: Uuid,
    lease_expires_at: DateTime<Utc>,
}
impl TransactionManager {
    pub async fn request_leadership(&self, library_id: Uuid) -> Result<bool, TxError> {
        // Check if current leader is still valid
        // If not, attempt to become leader
        // Update leadership table with lease
        todo!("lease acquisition is not specified yet")
    }
    pub async fn is_leader(&self, library_id: Uuid) -> bool {
        // Check if this device holds valid lease
        todo!("compare the stored lease against this device and the current time")
    }
    async fn next_sequence(&self, library_id: Uuid) -> Result<u64, TxError> {
        if !self.is_leader(library_id).await {
            return Err(TxError::NotLeader);
        }
        let mut sequences = self.sync_sequence.lock().unwrap();
        // Note: this counter is in-memory only. A new or restarted leader must
        // seed it from the highest sequence already persisted in sync_log,
        // otherwise sequence numbers would repeat.
        let seq = sequences.entry(library_id).or_insert(0);
        *seq += 1;
        Ok(*seq)
    }
}
## Sync Service (Follower)
Devices that are not the leader pull sync log entries and apply them locally.
```rust
pub struct SyncFollowerService {
    library_id: Uuid,
    leader_device_id: Uuid,
    last_synced_sequence: Arc<Mutex<u64>>,
    tx_manager: Arc<TransactionManager>,
    job_manager: Arc<JobManager>, // queues local indexing jobs (type name assumed)
}
impl SyncFollowerService {
    /// Poll for new sync entries (called every 5 seconds)
    pub async fn sync_iteration(&mut self) -> Result<SyncResult, SyncError> {
        let last_seq = *self.last_synced_sequence.lock().unwrap();
        // Fetch entries from leader since last_seq
        let entries = self.fetch_entries_from_leader(last_seq).await?;
        if entries.is_empty() {
            return Ok(SyncResult::NoChanges);
        }
        // Capture the count first: the loop below consumes `entries`
        let count = entries.len();
        // Apply each entry
        for entry in entries {
            self.apply_sync_entry(entry).await?;
        }
        Ok(SyncResult::Applied { count })
    }
    async fn apply_sync_entry(&mut self, entry: SyncLogEntry) -> Result<(), SyncError> {
        // Read the sequence up front, before fields of `entry` are moved out
        let sequence = entry.sequence;
        match entry.model_type.as_str() {
            "bulk_operation" => {
                // Parse metadata
                let metadata: BulkOperationMetadata = serde_json::from_value(entry.data)?;
                self.handle_bulk_operation(metadata).await?;
            }
            _ => {
                // Regular sync entry - deserialize and apply
                let model = self.deserialize_model(&entry)?;
                self.apply_model_change(model, entry.change_type).await?;
            }
        }
        // Update last synced sequence
        *self.last_synced_sequence.lock().unwrap() = sequence;
        Ok(())
    }
    async fn handle_bulk_operation(&mut self, metadata: BulkOperationMetadata) -> Result<(), SyncError> {
        match metadata.operation {
            BulkOperation::InitialIndex { location_id, location_path } => {
                tracing::info!(
                    "Peer indexed location {} with {} entries",
                    location_id, metadata.affected_count
                );
                // Check if we have this location locally
                if let Some(local_location) = self.find_matching_location(&location_path).await? {
                    // Trigger our own indexing job
                    self.job_manager.queue(IndexerJob {
                        location_id: local_location.id,
                        mode: IndexMode::Full,
                    }).await?;
                }
            }
            _ => {}
        }
        Ok(())
    }
}
```
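
Because the log is sequentially ordered, a follower can also verify that each pulled batch continues exactly where it left off. The sketch below illustrates that check in isolation. It is not part of the service above: `Follower`, `Entry`, and `ApplyError` are hypothetical names, and "applying" an entry is reduced to recording its payload.

```rust
/// Minimal stand-in for a pulled sync log entry.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub sequence: u64,
    pub payload: String,
}

#[derive(Debug, PartialEq)]
pub enum ApplyError {
    /// The batch skipped one or more sequence numbers.
    Gap { expected: u64, got: u64 },
}

#[derive(Default)]
pub struct Follower {
    pub last_synced_sequence: u64,
    pub applied: Vec<String>,
}

impl Follower {
    /// Apply a pulled batch in order. Entries that were already applied are
    /// skipped, so re-fetching the same batch is harmless. Returns the number
    /// of newly applied entries.
    pub fn apply_batch(&mut self, entries: &[Entry]) -> Result<usize, ApplyError> {
        let mut count = 0;
        for entry in entries {
            if entry.sequence <= self.last_synced_sequence {
                continue; // already applied
            }
            let expected = self.last_synced_sequence + 1;
            if entry.sequence != expected {
                // Stop before applying anything out of order
                return Err(ApplyError::Gap { expected, got: entry.sequence });
            }
            self.applied.push(entry.payload.clone());
            self.last_synced_sequence = entry.sequence;
            count += 1;
        }
        Ok(count)
    }
}
```

On a `Gap` error the follower's `last_synced_sequence` is unchanged past the last good entry, so the next poll simply asks the leader for the missing range again.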
## Library Sync Setup (Phase 1)
Before devices can sync, they must:
1. **Pair** (cryptographic authentication)
2. **Discover** libraries on remote device
3. **Register** devices in each other's libraries
See `sync-setup.md` for complete implementation details.
### Setup Flow
```rust
// 1. Discover remote libraries (after pairing)
let discovery = client.query(
"query:network.sync_setup.discover.v1",
DiscoverRemoteLibrariesInput { device_id: paired_device.id }
).await?;
// 2. Setup library sync (RegisterOnly in Phase 1)
let setup_result = client.action(
"action:network.sync_setup.input.v1",
LibrarySyncSetupInput {
local_device_id: my_device_id,
remote_device_id: paired_device.id,
local_library_id: my_library.id,
remote_library_id: discovery.libraries[0].id,
action: LibrarySyncAction::RegisterOnly,
leader_device_id: my_device_id, // This device becomes leader
}
).await?;
// 3. Ready for sync!
// Sync service starts polling for changes
```
## Sync Domains
Spacedrive syncs different types of data with different strategies:
| Domain | What Syncs | Strategy |
|--------|-----------|----------|
| **Index** | File/folder entries | Metadata only (each device indexes own filesystem) |
| **Metadata** | Tags, albums, collections | Full replication across devices |
| **Content** | File content (future) | User-configured sync conduits |
| **State** | UI state, preferences | Device-specific, no sync |
## Conflict Resolution
### Optimistic Concurrency
All `Syncable` models have a `version` field. When applying a sync entry:
```rust
async fn apply_model_change(&self, remote_model: Model, change_type: ChangeType) -> Result<()> {
match change_type {
ChangeType::Update => {
// Fetch current local version
let local_model = Model::find_by_uuid(remote_model.sync_id(), db).await?;
if let Some(local) = local_model {
if local.version >= remote_model.version {
// Local is newer or same - skip update
tracing::debug!("Skipping sync entry: local version is newer or equal");
return Ok(());
}
}
// Remote is newer - apply update
remote_model.update(db).await?;
}
ChangeType::Insert => {
remote_model.insert(db).await?;
}
ChangeType::Delete => {
Model::delete_by_uuid(remote_model.sync_id(), db).await?;
}
}
Ok(())
}
```
### Conflict Strategy
- **Last-Write-Wins (LWW)**: The copy with the higher `version` wins; on a tie the local copy is kept
- **No CRDTs**: Simpler, sufficient for metadata sync
- **User Metadata**: Tags, albums use union merge (both versions kept)
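
Both rules fit in a few lines. The sketch below is an illustration under assumptions: `Versioned`, `resolve`, and `union_merge` are hypothetical names, and tag membership is modeled as a set of strings rather than real tag records.

```rust
use std::collections::BTreeSet;

/// A value paired with its optimistic-concurrency version.
#[derive(Debug, Clone, PartialEq)]
pub struct Versioned<T> {
    pub version: i64,
    pub value: T,
}

/// Version-based last-write-wins: the higher version is kept,
/// and a tie keeps the local copy (matching `local.version >= remote.version`).
pub fn resolve<T>(local: Versioned<T>, remote: Versioned<T>) -> Versioned<T> {
    if remote.version > local.version {
        remote
    } else {
        local
    }
}

/// Union merge for set-like user metadata: nothing from either side is lost.
pub fn union_merge(local: &BTreeSet<String>, remote: &BTreeSet<String>) -> BTreeSet<String> {
    local.union(remote).cloned().collect()
}
```

The tie-breaking choice matters: keeping the local copy on equal versions makes re-applying an already-seen entry a no-op.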
## Raw SQL Compatibility
**Reads**: Unrestricted. Use SeaORM query builder or raw SQL freely.
**Writes**: Must go through TransactionManager. For advanced cases:
```rust
tm.with_tx(library, |txn| async move {
// Raw SQL writes inside TM transaction
txn.execute(Statement::from_sql_and_values(
DbBackend::Sqlite,
"UPDATE albums SET name = ? WHERE uuid = ?",
vec![name.into(), uuid.into()],
)).await?;
// Tell TM to log this change
tm.sync_log_for::<albums::Model>(txn, uuid).await?;
Ok(())
}).await?;
```
## Implementation Roadmap
### Phase 1: Foundation (Current)
- [x] Device pairing protocol
- [x] Library sync setup (RegisterOnly)
- [ ] TransactionManager core
- [ ] Syncable trait + derives
- [ ] Sync log schema
- [ ] Leader election
### Phase 2: Basic Sync
- [ ] Sync follower service (pull-based)
- [ ] Apply sync entries
- [ ] Handle bulk operations
- [ ] Conflict resolution
- [ ] Album/Tag/Location sync
### Phase 3: File Sync
- [ ] Entry sync (metadata only)
- [ ] Watcher integration
- [ ] Bulk indexing with metadata logs
- [ ] Cross-device file operations
### Phase 4: Advanced Features
- [ ] Content sync (via sync conduits)
- [ ] Push-based sync (optional optimization)
- [ ] Multi-leader support
- [ ] Conflict resolution UI
## Performance Considerations
### Indexing Performance
- **1M files, per-entry logs**: 10 minutes, 500MB sync log
- **1M files, bulk metadata**: 1 minute, 500 bytes sync log
- **Result**: 10x faster, 1 million times smaller sync log
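
The size comparison follows from simple arithmetic. The sketch below reproduces it, assuming roughly 500 bytes per sync log entry, which is the figure implied by 500MB for 1M files; the function names are hypothetical.

```rust
/// Per-entry logging: one sync log row for every indexed file.
pub fn per_entry_log_bytes(files: u64, bytes_per_entry: u64) -> u64 {
    files * bytes_per_entry
}

/// How many times smaller the bulk metadata log is than the per-entry log.
pub fn size_ratio(per_entry_total: u64, bulk_total: u64) -> u64 {
    per_entry_total / bulk_total
}
```

With 1,000,000 files at about 500 bytes each, the per-entry log is 500,000,000 bytes (500MB); dividing by the single ~500-byte bulk entry gives the factor of one million quoted above.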
### Network Efficiency
- Pull-based sync: Batch fetch (max 100 entries per request)
- Compression: Gzip sync log JSON (typically 5x reduction)
- Delta sync: Only fetch entries since last sequence
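
Batch fetching and delta sync combine into a simple paging loop. The sketch below is illustrative only: the log is reduced to a sorted slice of sequence numbers, and `fetch_page`, `catch_up`, and `MAX_ENTRIES_PER_REQUEST` are hypothetical names.

```rust
/// Upper bound on entries returned per request.
pub const MAX_ENTRIES_PER_REQUEST: usize = 100;

/// Leader side: return at most one page of sequences after `last_seq`.
/// `log` must be sorted ascending, which holds for an append-only log.
pub fn fetch_page(log: &[u64], last_seq: u64) -> &[u64] {
    // Index of the first entry the follower has not seen yet
    let start = log.partition_point(|&seq| seq <= last_seq);
    let end = (start + MAX_ENTRIES_PER_REQUEST).min(log.len());
    &log[start..end]
}

/// Follower side: pull pages until an empty page signals "caught up".
/// Returns the final sequence and the number of requests made.
pub fn catch_up(log: &[u64], mut last_seq: u64) -> (u64, usize) {
    let mut requests = 0;
    loop {
        let page = fetch_page(log, last_seq);
        requests += 1;
        match page.last() {
            Some(&seq) => last_seq = seq,
            None => return (last_seq, requests),
        }
    }
}
```

A follower that is 250 entries behind would need three data-carrying requests plus one empty response to confirm it has caught up.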
### Database Optimization
- Sync log: Append-only, no updates (fast writes)
- Indexes on (library_id, sequence) for efficient polling
- Prune entries older than 30 days once they have been synced successfully
## Security
### Encryption
- All sync data transmitted over encrypted Iroh streams
- Sync log contains full model data (no encryption at rest in Phase 1)
- Future: Library-level encryption (see `AT_REST_LIBRARY_ENCRYPTION.md`)
### Access Control
- Only paired devices can sync
- Device pairing uses cryptographic challenge/response
- Only the current leader can append to the sync log, which prevents conflicting writes from other devices
## Testing Strategy
### Unit Tests
```rust
#[tokio::test]
async fn test_sync_log_creation() {
let tm = TransactionManager::new(event_bus);
let model = albums::Model { /* ... */ };
let album = tm.commit::<albums::Model, Album>(library, model).await.unwrap();
// Verify sync log entry created
let entry = sync_log::Entity::find()
.filter(sync_log::Column::RecordId.eq(album.id))
.one(db)
.await
.unwrap()
.unwrap();
assert_eq!(entry.model_type, "album");
}
```
### Integration Tests
- Two-device sync simulation
- Leader failover scenarios
- Bulk operation handling
- Conflict resolution
## References
- **Sync Setup**: `docs/core/sync-setup.md`
- **Event System**: `docs/core/events.md`
- **Client Cache**: `docs/core/normalized_cache.md`
- **Design Details**: `docs/core/design/sync/`