From 19f7413845d0efb4d484d86c441e1a13741fd53d Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Wed, 15 Oct 2025 09:56:35 -0700 Subject: [PATCH] feat: add sync_conduit and sync_generation entities --- core/src/infra/db/entities/mod.rs | 6 + core/src/infra/db/entities/sync_conduit.rs | 133 +++++++ core/src/infra/db/entities/sync_generation.rs | 111 ++++++ .../m20251015_000002_create_sync_tables.rs | 260 +++++++++++++ core/src/ops/files/copy/strategy.rs | 351 ++++++++++++++---- core/src/service/network/device/registry.rs | 6 +- core/src/service/sync/applier.rs | 32 -- core/src/service/sync/mod.rs | 2 - 8 files changed, 786 insertions(+), 115 deletions(-) create mode 100644 core/src/infra/db/entities/sync_conduit.rs create mode 100644 core/src/infra/db/entities/sync_generation.rs create mode 100644 core/src/infra/db/migration/m20251015_000002_create_sync_tables.rs delete mode 100644 core/src/service/sync/applier.rs diff --git a/core/src/infra/db/entities/mod.rs b/core/src/infra/db/entities/mod.rs index a1cd812b7..7dd697f87 100644 --- a/core/src/infra/db/entities/mod.rs +++ b/core/src/infra/db/entities/mod.rs @@ -25,6 +25,8 @@ pub mod collection_entry; pub mod indexer_rule; pub mod sidecar; pub mod sidecar_availability; +pub mod sync_conduit; +pub mod sync_generation; pub mod volume; // Re-export all entities @@ -40,6 +42,8 @@ pub use indexer_rule::Entity as IndexerRule; pub use location::Entity as Location; pub use sidecar::Entity as Sidecar; pub use sidecar_availability::Entity as SidecarAvailability; +pub use sync_conduit::Entity as SyncConduit; +pub use sync_generation::Entity as SyncGeneration; pub use user_metadata::Entity as UserMetadata; pub use volume::Entity as Volume; @@ -63,6 +67,8 @@ pub use indexer_rule::ActiveModel as IndexerRuleActive; pub use location::ActiveModel as LocationActive; pub use sidecar::ActiveModel as SidecarActive; pub use sidecar_availability::ActiveModel as SidecarAvailabilityActive; +pub use sync_conduit::ActiveModel as SyncConduitActive; +pub use sync_generation::ActiveModel as SyncGenerationActive; pub use user_metadata::ActiveModel as UserMetadataActive; pub use volume::ActiveModel as VolumeActive; diff --git a/core/src/infra/db/entities/sync_conduit.rs b/core/src/infra/db/entities/sync_conduit.rs new file mode 100644 index 000000000..42db1fc48 --- /dev/null +++ b/core/src/infra/db/entities/sync_conduit.rs @@ -0,0 +1,133 @@ +use chrono::{DateTime, Utc}; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +/// Represents a sync relationship between two directories +/// +/// A SyncConduit defines a persistent sync configuration between a source and target directory, +/// tracking sync state, configuration, and statistics. +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "sync_conduit")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + + #[sea_orm(unique)] + pub uuid: Uuid, + + /// Source directory entry ID (must be a directory) + pub source_entry_id: i32, + + /// Target directory entry ID (must be a directory) + pub target_entry_id: i32, + + /// Sync mode: "mirror", "bidirectional", or "selective" + pub sync_mode: String, + + /// Whether this conduit is active + pub enabled: bool, + + /// Sync schedule: "instant", "interval:5m", or "manual" + pub schedule: String, + + /// Whether to use indexer rules for filtering + pub use_index_rules: bool, + + /// Optional override for index mode (e.g., "shallow", "deep") + pub index_mode_override: Option, + + /// Number of parallel file transfers + pub parallel_transfers: i32, + + /// Optional bandwidth limit in MB/s + pub bandwidth_limit_mbps: Option, + + /// Timestamp of last successful sync completion + pub last_sync_completed_at: Option>, + + /// Current sync generation number (increments on each sync) + pub sync_generation: i64, + + /// Last error message if sync failed + pub last_sync_error: Option, + + /// Total number of syncs performed + pub total_syncs: i64, + + /// Total number of files synced + pub files_synced: i64, + + /// Total bytes transferred + pub bytes_transferred: i64, + + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + /// Foreign key to source entry + #[sea_orm( + belongs_to = "super::entry::Entity", + from = "Column::SourceEntryId", + to = "super::entry::Column::Id" + )] + SourceEntry, + + /// Foreign key to target entry + #[sea_orm( + belongs_to = "super::entry::Entity", + from = "Column::TargetEntryId", + to = "super::entry::Column::Id" + )] + TargetEntry, + + /// One-to-many relationship with sync generations + #[sea_orm(has_many = "super::sync_generation::Entity")] + SyncGenerations, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::SyncGenerations.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + +/// Sync mode variants +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum SyncMode { + /// One-way sync from source to target with automatic cleanup + Mirror, + /// Two-way sync with conflict detection + Bidirectional, + /// Intelligent local storage management (future) + Selective, +} + +impl SyncMode { + pub fn as_str(&self) -> &'static str { + match self { + Self::Mirror => "mirror", + Self::Bidirectional => "bidirectional", + Self::Selective => "selective", + } + } + + pub fn from_str(s: &str) -> Option { + match s { + "mirror" => Some(Self::Mirror), + "bidirectional" => Some(Self::Bidirectional), + "selective" => Some(Self::Selective), + _ => None, + } + } +} + +impl std::fmt::Display for SyncMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} diff --git a/core/src/infra/db/entities/sync_generation.rs b/core/src/infra/db/entities/sync_generation.rs new file mode 100644 index 000000000..2fc295b13 --- /dev/null +++ b/core/src/infra/db/entities/sync_generation.rs @@ -0,0 +1,111 @@ +use chrono::{DateTime, Utc}; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +/// Tracks individual sync operations for history and verification +/// +/// Each sync execution creates a new generation with metrics and verification status. +/// Generations enable sync history tracking and verification of sync consistency. +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "sync_generation")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + + /// Foreign key to sync_conduit + pub conduit_id: i32, + + /// Generation number (monotonically increasing) + pub generation: i64, + + /// When this sync generation started + pub started_at: DateTime, + + /// When this sync generation completed (None if still running) + pub completed_at: Option>, + + /// Number of files copied during this sync + pub files_copied: i32, + + /// Number of files deleted during this sync + pub files_deleted: i32, + + /// Number of conflicts resolved during this sync + pub conflicts_resolved: i32, + + /// Total bytes transferred during this sync + pub bytes_transferred: i64, + + /// Number of errors encountered during this sync + pub errors_encountered: i32, + + /// When verification was performed (None if not yet verified) + pub verified_at: Option>, + + /// Verification status: "unverified", "waiting_watcher", "waiting_library_sync", "verified", "failed:" + pub verification_status: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + /// Foreign key to sync_conduit + #[sea_orm( + belongs_to = "super::sync_conduit::Entity", + from = "Column::ConduitId", + to = "super::sync_conduit::Column::Id" + )] + SyncConduit, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::SyncConduit.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + +/// Verification status variants +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum VerificationStatus { + /// Sync completed, not yet verified + Unverified, + /// Waiting for filesystem watcher to update index + WaitingWatcher, + /// Waiting for library sync to propagate changes + WaitingLibrarySync, + /// Verification query confirmed consistency + Verified, +} + +impl VerificationStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Unverified => "unverified", + Self::WaitingWatcher => "waiting_watcher", + Self::WaitingLibrarySync => "waiting_library_sync", + Self::Verified => "verified", + } + } + + pub fn from_str(s: &str) -> Option { + match s { + "unverified" => Some(Self::Unverified), + "waiting_watcher" => Some(Self::WaitingWatcher), + "waiting_library_sync" => Some(Self::WaitingLibrarySync), + "verified" => Some(Self::Verified), + _ if s.starts_with("failed:") => None, // Failed states are dynamic + _ => None, + } + } + + pub fn failed(reason: &str) -> String { + format!("failed:{}", reason) + } +} + +impl std::fmt::Display for VerificationStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} diff --git a/core/src/infra/db/migration/m20251015_000002_create_sync_tables.rs b/core/src/infra/db/migration/m20251015_000002_create_sync_tables.rs new file mode 100644 index 000000000..320db2aad --- /dev/null +++ b/core/src/infra/db/migration/m20251015_000002_create_sync_tables.rs @@ -0,0 +1,260 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Create sync_conduit table + manager + .create_table( + Table::create() + .table(SyncConduit::Table) + .if_not_exists() + .col( + ColumnDef::new(SyncConduit::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(SyncConduit::Uuid).binary().not_null().unique_key()) + .col(ColumnDef::new(SyncConduit::SourceEntryId).integer().not_null()) + .col(ColumnDef::new(SyncConduit::TargetEntryId).integer().not_null()) + .col(ColumnDef::new(SyncConduit::SyncMode).string().not_null()) + .col( + ColumnDef::new(SyncConduit::Enabled) + .boolean() + .not_null() + .default(true), + ) + .col( + ColumnDef::new(SyncConduit::Schedule) + .string() + .not_null() + .default("manual"), + ) + .col( + ColumnDef::new(SyncConduit::UseIndexRules) + .boolean() + .not_null() + .default(true), + ) + .col(ColumnDef::new(SyncConduit::IndexModeOverride).string()) + .col( + ColumnDef::new(SyncConduit::ParallelTransfers) + .integer() + .not_null() + .default(3), + ) + .col(ColumnDef::new(SyncConduit::BandwidthLimitMbps).integer()) + .col(ColumnDef::new(SyncConduit::LastSyncCompletedAt).timestamp_with_time_zone()) + .col( + ColumnDef::new(SyncConduit::SyncGeneration) + .big_integer() + .not_null() + .default(0), + ) + .col(ColumnDef::new(SyncConduit::LastSyncError).string()) + .col( + ColumnDef::new(SyncConduit::TotalSyncs) + .big_integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncConduit::FilesSynced) + .big_integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncConduit::BytesTransferred) + .big_integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncConduit::CreatedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .col( + ColumnDef::new(SyncConduit::UpdatedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .foreign_key( + ForeignKey::create() + .name("fk_sync_conduit_source_entry") + .from(SyncConduit::Table, SyncConduit::SourceEntryId) + .to(Entry::Table, Entry::Id) + .on_delete(ForeignKeyAction::Cascade), + ) + .foreign_key( + ForeignKey::create() + .name("fk_sync_conduit_target_entry") + .from(SyncConduit::Table, SyncConduit::TargetEntryId) + .to(Entry::Table, Entry::Id) + .on_delete(ForeignKeyAction::Cascade), + ) + .to_owned(), + ) + .await?; + + // Create index on enabled column for active conduit queries + manager + .create_index( + Index::create() + .name("idx_sync_conduit_enabled") + .table(SyncConduit::Table) + .col(SyncConduit::Enabled) + .to_owned(), + ) + .await?; + + // Create sync_generation table + manager + .create_table( + Table::create() + .table(SyncGeneration::Table) + .if_not_exists() + .col( + ColumnDef::new(SyncGeneration::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(SyncGeneration::ConduitId).integer().not_null()) + .col(ColumnDef::new(SyncGeneration::Generation).big_integer().not_null()) + .col( + ColumnDef::new(SyncGeneration::StartedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .col(ColumnDef::new(SyncGeneration::CompletedAt).timestamp_with_time_zone()) + .col( + ColumnDef::new(SyncGeneration::FilesCopied) + .integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncGeneration::FilesDeleted) + .integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncGeneration::ConflictsResolved) + .integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncGeneration::BytesTransferred) + .big_integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(SyncGeneration::ErrorsEncountered) + .integer() + .not_null() + .default(0), + ) + .col(ColumnDef::new(SyncGeneration::VerifiedAt).timestamp_with_time_zone()) + .col( + ColumnDef::new(SyncGeneration::VerificationStatus) + .string() + .not_null() + .default("unverified"), + ) + .foreign_key( + ForeignKey::create() + .name("fk_sync_generation_conduit") + .from(SyncGeneration::Table, SyncGeneration::ConduitId) + .to(SyncConduit::Table, SyncConduit::Id) + .on_delete(ForeignKeyAction::Cascade), + ) + .to_owned(), + ) + .await?; + + // Create index on (conduit_id, generation) for efficient generation lookups + manager + .create_index( + Index::create() + .name("idx_sync_generation_conduit") + .table(SyncGeneration::Table) + .col(SyncGeneration::ConduitId) + .col(SyncGeneration::Generation) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Drop tables in reverse order (child tables first) + manager + .drop_table(Table::drop().table(SyncGeneration::Table).to_owned()) + .await?; + + manager + .drop_table(Table::drop().table(SyncConduit::Table).to_owned()) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum SyncConduit { + Table, + Id, + Uuid, + SourceEntryId, + TargetEntryId, + SyncMode, + Enabled, + Schedule, + UseIndexRules, + IndexModeOverride, + ParallelTransfers, + BandwidthLimitMbps, + LastSyncCompletedAt, + SyncGeneration, + LastSyncError, + TotalSyncs, + FilesSynced, + BytesTransferred, + CreatedAt, + UpdatedAt, +} + +#[derive(DeriveIden)] +enum SyncGeneration { + Table, + Id, + ConduitId, + Generation, + StartedAt, + CompletedAt, + FilesCopied, + FilesDeleted, + ConflictsResolved, + BytesTransferred, + ErrorsEncountered, + VerifiedAt, + VerificationStatus, +} + +#[derive(DeriveIden)] +enum Entry { + Table, + Id, +} diff --git a/core/src/ops/files/copy/strategy.rs b/core/src/ops/files/copy/strategy.rs index f20baf40e..b5a10579b 100644 --- a/core/src/ops/files/copy/strategy.rs +++ b/core/src/ops/files/copy/strategy.rs @@ -60,6 +60,7 @@ use async_trait::async_trait; use std::path::Path; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tracing::{info, debug, error}; /// Progress callback for strategy implementations to report granular progress /// Parameters: bytes_copied_for_current_file, total_bytes_for_current_file @@ -257,19 +258,67 @@ impl CopyStrategy for RemoteTransferStrategy { verify_checksum: bool, progress_callback: Option<&ProgressCallback<'a>>, ) -> Result { + info!("[REMOTE_STRATEGY] === RemoteTransferStrategy::execute() CALLED ==="); + info!("[REMOTE_STRATEGY] Source: {:?}", source); + info!("[REMOTE_STRATEGY] Destination: {:?}", destination); + // Get networking service - let networking = ctx - .networking_service() - .ok_or_else(|| anyhow::anyhow!("Networking service not available"))?; + debug!("[REMOTE_STRATEGY] Getting networking service..."); + let networking = match ctx.networking_service() { + Some(n) => { + debug!("[REMOTE_STRATEGY] Networking service retrieved successfully"); + n + } + None => { + error!("[REMOTE_STRATEGY] ERROR: Networking service not available"); + return Err(anyhow::anyhow!("Networking service not available")); + } + }; // Get local path - let local_path = source - .as_local_path() - .ok_or_else(|| anyhow::anyhow!("Source must be local path"))?; + debug!("[REMOTE_STRATEGY] Converting source to local path..."); + let local_path = match source.as_local_path() { + Some(p) => { + info!("[REMOTE_STRATEGY] Local path: {}", p.display()); + p + } + None => { + error!("[REMOTE_STRATEGY] ERROR: Source is not a local path"); + return Err(anyhow::anyhow!("Source must be local path")); + } + }; // Read file metadata - let metadata = tokio::fs::metadata(local_path).await?; + debug!("[REMOTE_STRATEGY] Reading file metadata..."); + let metadata = match tokio::fs::metadata(local_path).await { + Ok(m) => { + debug!("[REMOTE_STRATEGY] Metadata read successfully"); + m + } + Err(e) => { + error!("[REMOTE_STRATEGY] ERROR: Failed to read metadata: {}", e); + return Err(e.into()); + } + }; let file_size = metadata.len(); + info!("[REMOTE_STRATEGY] File size: {} bytes", file_size); + + info!("[REMOTE_STRATEGY] About to calculate file checksum..."); + let checksum = match calculate_file_checksum(local_path).await { + Ok(c) => { + info!("[REMOTE_STRATEGY] Checksum calculated: {}", c); + Some(c) + } + Err(e) => { + error!("[REMOTE_STRATEGY] ERROR: Failed to calculate checksum: {}", e); + return Err(e); + } + }; + + info!("[REMOTE_STRATEGY] Initiating cross-device transfer: {} ({} bytes) -> device:{}", + local_path.display(), + file_size, + destination.device_id().unwrap_or_default()); ctx.log(format!( "Initiating cross-device transfer: {} ({} bytes) -> device:{}", @@ -288,11 +337,12 @@ impl CopyStrategy for RemoteTransferStrategy { size: file_size, modified: metadata.modified().ok(), is_directory: metadata.is_dir(), - checksum: Some(calculate_file_checksum(local_path).await?), + checksum, mime_type: None, }; // Get file transfer protocol handler + debug!("[REMOTE_STRATEGY] Getting file transfer protocol handler..."); let networking_guard = &*networking; let protocol_registry = networking_guard.protocol_registry(); let registry_guard = protocol_registry.read().await; @@ -306,6 +356,8 @@ impl CopyStrategy for RemoteTransferStrategy { .downcast_ref::() .ok_or_else(|| anyhow::anyhow!("Invalid file transfer protocol handler"))?; + info!("[REMOTE_STRATEGY] Protocol handler retrieved, initiating transfer..."); + // Initiate transfer let transfer_id = file_transfer_protocol .initiate_transfer( @@ -315,57 +367,44 @@ impl CopyStrategy for RemoteTransferStrategy { ) .await?; + info!("[REMOTE_STRATEGY] Transfer initiated with ID: {}", transfer_id); ctx.log(format!("Transfer initiated with ID: {}", transfer_id)); - // Send transfer request to remote device - let chunk_size = 64 * 1024u32; - let total_chunks = ((file_size + chunk_size as u64 - 1) / chunk_size as u64) as u32; + // Don't drop the registry guard yet - we need it for stream_file_data + // stream_file_data will handle sending the transfer request AND the file chunks + info!("[REMOTE_STRATEGY] About to call stream_file_data for {} bytes", file_size); + ctx.log(format!("About to call stream_file_data for {} bytes", file_size)); - let transfer_request = crate::service::network::protocol::file_transfer::FileTransferMessage::TransferRequest { - transfer_id, - file_metadata: file_metadata.clone(), - transfer_mode: crate::service::network::protocol::TransferMode::TrustedCopy, - chunk_size, - total_chunks, - destination_path: destination.path().map(|p| p.to_string_lossy().to_string()).unwrap_or_default(), - }; - - let request_data = rmp_serde::to_vec(&transfer_request)?; - - // Send transfer request over network - networking_guard - .send_message( - destination.device_id().unwrap_or_default(), - "file_transfer", - request_data, - ) - .await?; - - ctx.log(format!( - "Transfer request sent to device {}", - destination.device_id().unwrap_or_default() - )); - - // Stream file data - drop(networking_guard); - drop(registry_guard); - - stream_file_data( + let result = stream_file_data( local_path, transfer_id, file_transfer_protocol, file_size, destination.device_id().unwrap_or_default(), + destination.path().map(|p| p.to_string_lossy().to_string()).unwrap_or_default(), + file_metadata, ctx, progress_callback, ) - .await?; + .await; - ctx.log(format!( - "Cross-device transfer completed: {} bytes", - file_size - )); - Ok(file_size) + info!("[REMOTE_STRATEGY] stream_file_data returned: {:?}", result.is_ok()); + + match result { + Ok(()) => { + info!("[REMOTE_STRATEGY] Cross-device transfer completed successfully: {} bytes", file_size); + ctx.log(format!( + "Cross-device transfer completed successfully: {} bytes", + file_size + )); + Ok(file_size) + } + Err(e) => { + error!("[REMOTE_STRATEGY] Cross-device transfer FAILED: {}", e); + ctx.log(format!("Cross-device transfer FAILED: {}", e)); + Err(e) + } + } } } @@ -666,37 +705,126 @@ async fn calculate_file_checksum(path: &Path) -> Result { .map_err(|e| anyhow::anyhow!("Failed to generate content hash: {}", e)) } -/// Stream file data in chunks to the remote device +/// Stream file data in chunks to the remote device using a persistent connection async fn stream_file_data<'a>( file_path: &Path, transfer_id: uuid::Uuid, file_transfer_protocol: &crate::service::network::protocol::FileTransferProtocolHandler, total_size: u64, destination_device_id: uuid::Uuid, + destination_path: String, + file_metadata: crate::service::network::protocol::FileMetadata, ctx: &JobContext<'a>, progress_callback: Option<&ProgressCallback<'a>>, ) -> Result<()> { use blake3::Hasher; - use tokio::io::AsyncReadExt; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; - // Get networking service for sending chunks + info!("[STREAM_FILE_DATA] === stream_file_data() CALLED ==="); + info!("[STREAM_FILE_DATA] transfer_id: {}", transfer_id); + info!("[STREAM_FILE_DATA] file: {}", file_path.display()); + info!("[STREAM_FILE_DATA] size: {} bytes", total_size); + info!("[STREAM_FILE_DATA] dest device: {}", destination_device_id); + + // Get networking service + debug!("[STREAM_FILE_DATA] Getting networking service..."); let networking = ctx .networking_service() .ok_or_else(|| anyhow::anyhow!("Networking service not available"))?; + let networking_guard = &*networking; + + // Get device registry to lookup node_id + debug!("[STREAM_FILE_DATA] Getting device registry..."); + let device_registry = networking_guard.device_registry(); + let registry = device_registry.read().await; + let node_id = registry + .get_node_by_device(destination_device_id) + .ok_or_else(|| { + anyhow::anyhow!( + "Could not find node_id for device {}", + destination_device_id + ) + })?; + drop(registry); + info!("[STREAM_FILE_DATA] Found node_id: {}", node_id); + + // Get endpoint for creating connection + debug!("[STREAM_FILE_DATA] Getting endpoint..."); + let endpoint = networking_guard.endpoint().ok_or_else(|| { + anyhow::anyhow!("Networking endpoint not available") + })?; + + info!("[STREAM_FILE_DATA] Opening persistent connection to node {} (device {})", node_id, destination_device_id); + ctx.log(format!( + "Opening persistent connection to node {} (device {}) for file transfer", + node_id, destination_device_id + )); + + // Connect to the target device using file_transfer ALPN + let node_addr = iroh::NodeAddr::new(node_id); + let connection = endpoint + .connect(node_addr, b"spacedrive/filetransfer/1") + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to device: {}", e))?; + + info!("[STREAM_FILE_DATA] Connected! Opening bidirectional stream..."); + ctx.log(format!( + "Connected to device {}, opening bidirectional stream", + destination_device_id + )); + + // Open bidirectional stream for the transfer (so we can receive acknowledgment) + let (mut send_stream, mut recv_stream) = connection + .open_bi() + .await + .map_err(|e| anyhow::anyhow!("Failed to open stream: {}", e))?; + + info!("[STREAM_FILE_DATA] Bidirectional stream opened successfully"); + + // First, send the TransferRequest message + let chunk_size = 64 * 1024u32; + let total_chunks = ((total_size + chunk_size as u64 - 1) / chunk_size as u64) as u32; + + let transfer_request = crate::service::network::protocol::file_transfer::FileTransferMessage::TransferRequest { + transfer_id, + file_metadata, + transfer_mode: crate::service::network::protocol::TransferMode::TrustedCopy, + chunk_size, + total_chunks, + destination_path, + }; + + let request_data = rmp_serde::to_vec(&transfer_request)?; + + ctx.log(format!( + "Sending TransferRequest for {} bytes ({} chunks)", + total_size, total_chunks + )); + + // Send transfer request: type (0) + length + data + send_stream.write_u8(0).await?; + send_stream.write_all(&(request_data.len() as u32).to_be_bytes()).await?; + send_stream.write_all(&request_data).await?; + send_stream.flush().await?; + + ctx.log("TransferRequest sent, now sending file chunks".to_string()); + + // Open file for reading let mut file = tokio::fs::File::open(file_path).await?; - let chunk_size = 64 * 1024; // 64KB chunks + let chunk_size = 64 * 1024u64; // 64KB chunks let total_chunks = (total_size + chunk_size - 1) / chunk_size; let mut buffer = vec![0u8; chunk_size as usize]; let mut chunk_index = 0u32; let mut bytes_transferred = 0u64; ctx.log(format!( - "Starting to stream {} chunks to device {}", - total_chunks, destination_device_id + "Starting to stream {} chunks ({} bytes) to device {}", + total_chunks, total_size, destination_device_id )); + // Send all chunks over the stream loop { ctx.check_interrupt().await?; @@ -705,16 +833,10 @@ async fn stream_file_data<'a>( break; // End of file } - // Calculate chunk checksum + // Calculate chunk checksum (of original data) let chunk_data = &buffer[..bytes_read]; let chunk_checksum = blake3::hash(chunk_data); - // Update progress - bytes_transferred += bytes_read as u64; - if let Some(callback) = progress_callback { - callback(bytes_transferred, total_size); - } - // Get session keys for encryption let session_keys = file_transfer_protocol .get_session_keys_for_device(destination_device_id) @@ -735,35 +857,59 @@ async fn stream_file_data<'a>( chunk_index, data: encrypted_data, nonce, - chunk_checksum: *chunk_checksum.as_bytes(), // Checksum of original unencrypted data + chunk_checksum: *chunk_checksum.as_bytes(), }; - // Serialize and send chunk over network - let chunk_data = rmp_serde::to_vec(&chunk_message)?; + // Serialize message + let message_data = rmp_serde::to_vec(&chunk_message)?; - let networking_guard = &*networking; - networking_guard - .send_message(destination_device_id, "file_transfer", chunk_data) - .await?; + // Send transfer type (0 for messages) + message length + message data + send_stream.write_u8(0).await?; // Type 0 = message-based transfer + send_stream + .write_all(&(message_data.len() as u32).to_be_bytes()) + .await + .map_err(|e| anyhow::anyhow!("Failed to write message length: {}", e))?; + send_stream + .write_all(&message_data) + .await + .map_err(|e| anyhow::anyhow!("Failed to write chunk data: {}", e))?; + send_stream + .flush() + .await + .map_err(|e| anyhow::anyhow!("Failed to flush stream: {}", e))?; - // Record chunk in local protocol handler for tracking + // Record chunk sent file_transfer_protocol.record_chunk_received( &transfer_id, chunk_index, bytes_read as u64, )?; + // Update progress + bytes_transferred += bytes_read as u64; + if let Some(callback) = progress_callback { + callback(bytes_transferred, total_size); + } + chunk_index += 1; - ctx.log(format!( - "Sent chunk {}/{} ({} bytes)", - chunk_index, total_chunks, bytes_read - )); + // Log every 100 chunks or on first/last chunk + if chunk_index == 1 || chunk_index % 100 == 0 || chunk_index == total_chunks as u32 { + ctx.log(format!( + "Sent chunk {}/{} ({} bytes total)", + chunk_index, total_chunks, bytes_transferred + )); + } - // Small delay to prevent overwhelming the network - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + // Yield to allow other tasks to run + tokio::task::yield_now().await; } + ctx.log(format!( + "All {} chunks sent, sending completion message", + chunk_index + )); + // Send transfer completion message let final_checksum = calculate_file_checksum(file_path).await?; let completion_message = @@ -775,20 +921,65 @@ async fn stream_file_data<'a>( let completion_data = rmp_serde::to_vec(&completion_message)?; - let networking_guard = &*networking; - networking_guard - .send_message(destination_device_id, "file_transfer", completion_data) - .await?; + // Send completion message + send_stream.write_u8(0).await?; + send_stream.write_all(&(completion_data.len() as u32).to_be_bytes()).await?; + send_stream.write_all(&completion_data).await?; + send_stream.flush().await?; - // Mark transfer as completed locally + ctx.log(format!( + "Completion message sent, waiting for final acknowledgment from receiver" + )); + + // Close the send side to signal we're done sending + send_stream + .finish() + .map_err(|e| anyhow::anyhow!("Failed to finish stream: {}", e))?; + + // Wait for TransferFinalAck response from receiver + ctx.log("Waiting for TransferFinalAck from receiver...".to_string()); + + // Read the response message + let mut msg_type = [0u8; 1]; + recv_stream.read_exact(&mut msg_type).await?; + + if msg_type[0] != 0 { + return Err(anyhow::anyhow!("Unexpected response type: {}", msg_type[0])); + } + + let mut len_buf = [0u8; 4]; + recv_stream.read_exact(&mut len_buf).await?; + let msg_len = u32::from_be_bytes(len_buf) as usize; + + let mut msg_buf = vec![0u8; msg_len]; + recv_stream.read_exact(&mut msg_buf).await?; + + let ack_message: crate::service::network::protocol::file_transfer::FileTransferMessage = + rmp_serde::from_slice(&msg_buf)?; + + // Verify it's a TransferFinalAck for our transfer + match ack_message { + crate::service::network::protocol::file_transfer::FileTransferMessage::TransferFinalAck { transfer_id: ack_id } => { + if ack_id != transfer_id { + return Err(anyhow::anyhow!("Received ack for wrong transfer: expected {}, got {}", transfer_id, ack_id)); + } + ctx.log("Received TransferFinalAck from receiver - transfer confirmed!".to_string()); + } + _ => { + return Err(anyhow::anyhow!("Expected TransferFinalAck, got different message type")); + } + } + + // Now mark transfer as completed locally (only after receiving confirmation) file_transfer_protocol.update_session_state( &transfer_id, crate::service::network::protocol::file_transfer::TransferState::Completed, )?; ctx.log(format!( - "File streaming completed: {} chunks, {} bytes sent to device {}", + "File streaming completed and acknowledged: {} chunks, {} bytes sent to device {}", chunk_index, bytes_transferred, destination_device_id )); + Ok(()) } diff --git a/core/src/service/network/device/registry.rs b/core/src/service/network/device/registry.rs index f5011149c..ad67415e7 100644 --- a/core/src/service/network/device/registry.rs +++ b/core/src/service/network/device/registry.rs @@ -437,8 +437,12 @@ impl DeviceRegistry { // Found session keys for connected device Some(session_keys.clone()) } + Some(DeviceState::Disconnected { session_keys, .. }) => { + // Found session keys for disconnected device (session keys are preserved after disconnect) + Some(session_keys.clone()) + } _ => { - // Device not found or not paired/connected + // Device not found or in a state without session keys (Discovered, Pairing) None } } diff --git a/core/src/service/sync/applier.rs b/core/src/service/sync/applier.rs deleted file mode 100644 index 2a6d3d6da..000000000 --- a/core/src/service/sync/applier.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Sync applier (STUB - Being replaced with PeerSync) -//! -//! This module handled applying sync log entries from the leader. -//! In the new leaderless architecture, this logic is in PeerSync. - -use anyhow::Result; -use tracing::warn; - -/// Sync applier (DEPRECATED) -/// -/// Stubbed during migration to leaderless architecture. -pub struct SyncApplier; - -impl SyncApplier { - /// Create a new sync applier (stub) - pub fn new() -> Self { - warn!("SyncApplier is deprecated - use PeerSync instead"); - Self - } - - /// Apply sync entry (stub) - pub async fn apply(&self, _entry: serde_json::Value) -> Result<()> { - warn!("SyncApplier::apply called but deprecated"); - Ok(()) - } -} - -impl Default for SyncApplier { - fn default() -> Self { - Self::new() - } -} diff --git a/core/src/service/sync/mod.rs b/core/src/service/sync/mod.rs index 942b17eb9..6fe0f0dae 100644 --- a/core/src/service/sync/mod.rs +++ b/core/src/service/sync/mod.rs @@ -4,7 +4,6 @@ //! - State-based sync for device-owned data //! - Log-based sync with HLC for shared resources -pub mod applier; pub mod backfill; pub mod peer; pub mod protocol_handler; @@ -28,7 +27,6 @@ use tokio::sync::{Mutex, RwLock}; use tracing::{info, warn}; use uuid::Uuid; -pub use applier::SyncApplier; pub use backfill::BackfillManager; pub use protocol_handler::{LogSyncHandler, StateSyncHandler};