diff --git a/.tasks/FSYNC-000-file-sync-system.md b/.tasks/FSYNC-000-file-sync-system.md index a92d21e8f..71756fed3 100644 --- a/.tasks/FSYNC-000-file-sync-system.md +++ b/.tasks/FSYNC-000-file-sync-system.md @@ -55,7 +55,7 @@ Intelligent local storage management with access pattern tracking. ## Child Tasks - **FSYNC-001**: DeleteJob Strategy Pattern & Remote Deletion (Phase 1) - Done -- **FSYNC-002**: Database Schema & Entities (Phase 2) +- **FSYNC-002**: Database Schema & Entities (Phase 2) - Done - **FSYNC-003**: FileSyncService Core Implementation (Phase 3) - **FSYNC-004**: Service Integration & API (Phase 4) - **FSYNC-005**: Advanced Features (Phase 5) diff --git a/core/src/infra/job/logger.rs b/core/src/infra/job/logger.rs index 45982fc69..0c4ebd357 100644 --- a/core/src/infra/job/logger.rs +++ b/core/src/infra/job/logger.rs @@ -236,16 +236,23 @@ impl FileJobLogger { return Ok(()); } - let mut file = self.file.lock().unwrap(); - writeln!( - file, + let formatted = format!( "[{}] {} {}: {}", chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"), level, self.job_id, message - )?; - file.flush() + ); + + // Write to file + let mut file = self.file.lock().unwrap(); + writeln!(file, "{}", formatted)?; + file.flush()?; + + // ALSO output to stdout for test visibility + println!("{}", formatted); + + Ok(()) } } diff --git a/core/src/ops/files/copy/job.rs b/core/src/ops/files/copy/job.rs index a639bf2e3..274192d30 100644 --- a/core/src/ops/files/copy/job.rs +++ b/core/src/ops/files/copy/job.rs @@ -13,6 +13,7 @@ use std::{ sync::{Arc, Mutex}, time::{Duration, Instant}, }; +use tracing::info; use uuid::Uuid; /// Move operation modes for UI context @@ -329,6 +330,9 @@ impl JobHandler for FileCopyJob { ) .await; + info!("[JOB] About to execute strategy for {} -> {}", + resolved_source.display(), final_destination.display()); + // 2. Execute the strategy with progress callback match strategy .execute( diff --git a/core/src/service/network/core/event_loop.rs b/core/src/service/network/core/event_loop.rs index 08024eb3c..e89f52430 100644 --- a/core/src/service/network/core/event_loop.rs +++ b/core/src/service/network/core/event_loop.rs @@ -342,21 +342,131 @@ impl NetworkingEventLoop { } } - // Route to appropriate handler based on pairing status - let handler_name = if is_paired { "messaging" } else { "pairing" }; + // For paired devices, check if this is a file transfer stream + // File transfer streams start with type byte 0, followed by length and message + if is_paired { + // Try to peek at the first byte to detect file transfer protocol + // File transfer protocol starts with transfer_type byte + use tokio::io::AsyncReadExt; + let mut peek_buf = [0u8; 1]; + let mut recv_peekable = recv; - let registry = protocol_registry.read().await; - if let Some(handler) = registry.get_handler(handler_name) { - logger.info(&format!("Directing bidirectional stream to {} handler", handler_name)).await; - handler.handle_stream( - Box::new(send), - Box::new(recv), - remote_node_id, - ).await; - logger.info(&format!("{} handler completed for stream from {}", handler_name, remote_node_id)).await; - } else { - logger.error(&format!("No {} handler registered!", handler_name)).await; + // Try to peek the first byte + let bytes_read = recv_peekable.read(&mut peek_buf).await; + match bytes_read { + Ok(Some(n)) if n > 0 => { + // Check if this looks like a file transfer message (type 0 or 1) + if peek_buf[0] <= 1 { + // Likely file transfer - but we need to put the byte back + // Wrap in a custom reader that replays this byte + struct PrependReader { + byte: Option, + inner: R, + } + + impl tokio::io::AsyncRead for PrependReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + if let Some(byte) = self.byte.take() { + buf.put_slice(&[byte]); + std::task::Poll::Ready(Ok(())) + } else { + std::pin::Pin::new(&mut self.inner).poll_read(cx, buf) + } + } + } + + let recv_with_byte = PrependReader { + byte: Some(peek_buf[0]), + inner: recv_peekable, + }; + + let registry = protocol_registry.read().await; + if let Some(handler) = registry.get_handler("file_transfer") { + logger.info("Directing bidirectional stream to file_transfer handler").await; + handler.handle_stream( + Box::new(send), + Box::new(recv_with_byte), + remote_node_id, + ).await; + logger.info("file_transfer handler completed for stream").await; + } + continue; // Skip the default handler logic below + } else { + // Not file transfer, use messaging + // Need to wrap to replay the byte + struct PrependReader { + byte: Option, + inner: R, + } + + impl tokio::io::AsyncRead for PrependReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + if let Some(byte) = self.byte.take() { + buf.put_slice(&[byte]); + std::task::Poll::Ready(Ok(())) + } else { + std::pin::Pin::new(&mut self.inner).poll_read(cx, buf) + } + } + } + + let recv_with_byte = PrependReader { + byte: Some(peek_buf[0]), + inner: recv_peekable, + }; + + let registry = protocol_registry.read().await; + if let Some(handler) = registry.get_handler("messaging") { + logger.info("Directing bidirectional stream to messaging handler").await; + handler.handle_stream( + Box::new(send), + Box::new(recv_with_byte), + remote_node_id, + ).await; + logger.info("messaging handler completed for stream").await; + } + continue; // Skip the default handler logic below + } } + _ => { + // Default to messaging if we can't peek, no bytes, or error + // Use recv_peekable since recv was already moved + let registry = protocol_registry.read().await; + if let Some(handler) = registry.get_handler("messaging") { + logger.info("Directing bidirectional stream to messaging handler (fallback)").await; + handler.handle_stream( + Box::new(send), + Box::new(recv_peekable), + remote_node_id, + ).await; + logger.info("messaging handler completed for stream").await; + } + continue; + } + } + } else { + // Unpaired device, use pairing handler + let registry = protocol_registry.read().await; + if let Some(handler) = registry.get_handler("pairing") { + logger.info("Directing bidirectional stream to pairing handler").await; + handler.handle_stream( + Box::new(send), + Box::new(recv), + remote_node_id, + ).await; + logger.info("pairing handler completed for stream").await; + } else { + logger.error("No pairing handler registered!").await; + } + } } Err(e) => { // Check if the QUIC connection itself is closed diff --git a/core/src/service/network/protocol/file_transfer.rs b/core/src/service/network/protocol/file_transfer.rs index a7fe8b2ad..5e9100593 100644 --- a/core/src/service/network/protocol/file_transfer.rs +++ b/core/src/service/network/protocol/file_transfer.rs @@ -998,28 +998,58 @@ impl super::ProtocolHandler for FileTransferProtocolHandler { match transfer_type[0] { 0 => { - // File metadata request - // Read message length - let mut len_buf = [0u8; 4]; - if let Err(e) = recv.read_exact(&mut len_buf).await { - self.logger - .error(&format!("Failed to read message length: {}", e)) - .await; - return; - } - let msg_len = u32::from_be_bytes(len_buf) as usize; + // File metadata request - this is now a message stream + // Keep reading messages until stream closes or TransferComplete received + // Note: The first type byte (0) was already read above + let mut first_message = true; - // Read message - let mut msg_buf = vec![0u8; msg_len]; - if let Err(e) = recv.read_exact(&mut msg_buf).await { - self.logger - .error(&format!("Failed to read message: {}", e)) - .await; - return; - } + loop { + // For messages after the first, read the type byte + if !first_message { + let mut msg_type = [0u8; 1]; + match recv.read_exact(&mut msg_type).await { + Ok(_) => { + if msg_type[0] != 0 { + self.logger + .error(&format!("Unexpected message type in stream: {}", msg_type[0])) + .await; + break; + } + }, + Err(e) => { + self.logger + .debug(&format!("Stream ended or error reading type: {}", e)) + .await; + break; + } + } + } + first_message = false; - // Deserialize and handle - if let Ok(message) = rmp_serde::from_slice::(&msg_buf) { + // Read message length + let mut len_buf = [0u8; 4]; + match recv.read_exact(&mut len_buf).await { + Ok(_) => {}, + Err(e) => { + self.logger + .error(&format!("Failed to read message length: {}", e)) + .await; + break; + } + } + let msg_len = u32::from_be_bytes(len_buf) as usize; + + // Read message + let mut msg_buf = vec![0u8; msg_len]; + if let Err(e) = recv.read_exact(&mut msg_buf).await { + self.logger + .error(&format!("Failed to read message: {}", e)) + .await; + break; + } + + // Deserialize and handle + if let Ok(message) = rmp_serde::from_slice::(&msg_buf) { self.logger .debug(&format!( "Received file transfer message: {}", @@ -1098,7 +1128,7 @@ impl super::ProtocolHandler for FileTransferProtocolHandler { if let Err(e) = self .handle_incoming_transfer_complete( transfer_id, - final_checksum, + final_checksum.clone(), total_bytes, ) .await @@ -1106,6 +1136,24 @@ impl super::ProtocolHandler for FileTransferProtocolHandler { self.logger .error(&format!("Failed to handle transfer completion: {}", e)) .await; + } else { + // Send TransferFinalAck response back to sender + self.logger + .info(&format!("Sending TransferFinalAck for transfer {}", transfer_id)) + .await; + + let ack_message = FileTransferMessage::TransferFinalAck { transfer_id }; + if let Ok(ack_data) = rmp_serde::to_vec(&ack_message) { + // Send type (0) + length + data + let _ = send.write_u8(0).await; + let _ = send.write_all(&(ack_data.len() as u32).to_be_bytes()).await; + let _ = send.write_all(&ack_data).await; + let _ = send.flush().await; + + self.logger + .info(&format!("TransferFinalAck sent for transfer {}", transfer_id)) + .await; + } } } _ => { @@ -1114,7 +1162,8 @@ impl super::ProtocolHandler for FileTransferProtocolHandler { .await; } } - } + } // Close the if let Ok(message) + } // Close the loop } 1 => { // File data stream diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index 23a22fe89..728782dba 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -1,8 +1,81 @@ -//! Peer sync service - Leaderless architecture -//! -//! All devices are peers, using hybrid sync: +//! Peer sync service //! - State-based for device-owned data //! - Log-based with HLC for shared resources +//! +//! ## Architecture +//! +//! This service implements a leaderless peer-to-peer synchronization system where all devices +//! are equal participants. It uses a hybrid approach: +//! +//! - **State-based sync**: For device-owned data (locations, file entries). Each device owns +//! its data and broadcasts changes via timestamps. +//! - **Log-based sync with HLC**: For shared resources (tags, albums). Uses Hybrid Logical Clocks +//! for causal ordering and conflict resolution. +//! +//! ## Core Responsibilities +//! +//! ### 1. Connection Management +//! - Monitors network connection/disconnection events +//! - Updates device online status in the database +//! - Automatically triggers watermark exchange on reconnection for incremental catch-up +//! +//! ### 2. Watermark Exchange +//! - Compares sync progress between devices using two types of watermarks: +//! - `state_watermark`: timestamp tracking device-owned data progress +//! - `shared_watermark`: HLC tracking shared resource progress +//! - Determines which device is behind and needs catch-up +//! - Initiates incremental sync requests for missing changes +//! +//! ### 3. Change Broadcasting +//! - `broadcast_state_change()`: Sends device-owned changes to all connected peers +//! - `broadcast_shared_change()`: Sends shared resource changes with HLC for ordering +//! - Broadcasts in parallel to all connected sync partners (30s timeout per peer) +//! - Failed sends are queued for retry with exponential backoff +//! +//! ### 4. Change Application +//! - `on_state_change_received()`: Applies incoming device state changes +//! - `on_shared_change_received()`: Applies shared changes with HLC-based conflict resolution +//! - Buffers changes during backfill/catch-up to maintain consistency +//! - Sends ACKs back to originators for distributed log pruning +//! +//! ### 5. Backfill Support +//! - `get_device_state()`: Queries device-owned data for initial sync (domain-agnostic via registry) +//! - `get_shared_changes()`: Retrieves shared changes from peer log since a given HLC +//! - `get_full_shared_state()`: Gets complete snapshot of shared resources for new devices +//! +//! ### 6. Background Tasks +//! - **Retry processor**: Retries failed message sends every 10 seconds +//! - **Log pruner**: Prunes acknowledged log entries every 5 minutes +//! - **Event listener**: Listens for TransactionManager events and broadcasts changes +//! - **Network listener**: Tracks peer connections and triggers watermark exchange +//! +//! ### 7. State Machine +//! The service manages sync lifecycle states: +//! - `Uninitialized`: Service created but not started +//! - `Backfilling`: Receiving initial state from peers +//! - `CatchingUp`: Processing buffered changes after backfill +//! - `Ready`: Fully synchronized, applying changes in real-time +//! +//! During non-ready states, incoming changes are buffered and applied during transition to ready. +//! +//! ## Example Flow: Peer Reconnection +//! +//! 1. Network detects peer connection → `handle_peer_connected()` +//! 2. Watermark exchange initiated → `trigger_watermark_exchange()` +//! 3. Peer responds with its watermarks → `on_watermark_exchange_response()` +//! 4. Watermarks compared to determine who needs catch-up +//! 5. If behind, send catch-up requests (`SharedChangeRequest`) +//! 6. Peer responds with missing changes +//! 7. Changes applied with conflict resolution (HLC comparison) +//! 8. ACKs sent back to peer +//! 9. Both devices prune acknowledged log entries +//! +//! ## Conflict Resolution +//! +//! For shared resources, conflicts are resolved using HLC timestamps: +//! - Incoming change with older/equal HLC → ignored +//! - Incoming change with newer HLC → applied (last-write-wins) +//! - HLC ensures causal consistency across all peers use crate::{ infra::{ @@ -591,7 +664,10 @@ impl PeerSync { Ok(event) => { use crate::service::network::core::NetworkEvent; match event { - NetworkEvent::ConnectionEstablished { device_id: peer_id, node_id } => { + NetworkEvent::ConnectionEstablished { + device_id: peer_id, + node_id, + } => { info!( peer_id = %peer_id, node_id = %node_id, @@ -609,11 +685,7 @@ impl PeerSync { // Trigger watermark exchange for reconnection sync if let Err(e) = Self::trigger_watermark_exchange( - library_id, - device_id, - peer_id, - &db, - &network, + library_id, device_id, peer_id, &db, &network, ) .await { @@ -624,15 +696,17 @@ impl PeerSync { ); } } - NetworkEvent::ConnectionLost { device_id: peer_id, node_id } => { + NetworkEvent::ConnectionLost { + device_id: peer_id, + node_id, + } => { info!( peer_id = %peer_id, node_id = %node_id, "Device disconnected - updating devices table" ); - if let Err(e) = Self::handle_peer_disconnected(peer_id, &db).await - { + if let Err(e) = Self::handle_peer_disconnected(peer_id, &db).await { warn!( peer_id = %peer_id, error = %e, @@ -1416,7 +1490,11 @@ impl PeerSync { ); // HLC conflict resolution: check if we already have a more recent change for this record - if let Ok(Some(existing_hlc)) = self.peer_log.get_latest_hlc_for_record(entry.record_uuid).await { + if let Ok(Some(existing_hlc)) = self + .peer_log + .get_latest_hlc_for_record(entry.record_uuid) + .await + { if entry.hlc <= existing_hlc { debug!( incoming_hlc = %entry.hlc,