mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-18 21:36:56 -04:00
feat: add sync_conduit and sync_generation entities
This commit is contained in:
@@ -25,6 +25,8 @@ pub mod collection_entry;
|
||||
pub mod indexer_rule;
|
||||
pub mod sidecar;
|
||||
pub mod sidecar_availability;
|
||||
pub mod sync_conduit;
|
||||
pub mod sync_generation;
|
||||
pub mod volume;
|
||||
|
||||
// Re-export all entities
|
||||
@@ -40,6 +42,8 @@ pub use indexer_rule::Entity as IndexerRule;
|
||||
pub use location::Entity as Location;
|
||||
pub use sidecar::Entity as Sidecar;
|
||||
pub use sidecar_availability::Entity as SidecarAvailability;
|
||||
pub use sync_conduit::Entity as SyncConduit;
|
||||
pub use sync_generation::Entity as SyncGeneration;
|
||||
pub use user_metadata::Entity as UserMetadata;
|
||||
pub use volume::Entity as Volume;
|
||||
|
||||
@@ -63,6 +67,8 @@ pub use indexer_rule::ActiveModel as IndexerRuleActive;
|
||||
pub use location::ActiveModel as LocationActive;
|
||||
pub use sidecar::ActiveModel as SidecarActive;
|
||||
pub use sidecar_availability::ActiveModel as SidecarAvailabilityActive;
|
||||
pub use sync_conduit::ActiveModel as SyncConduitActive;
|
||||
pub use sync_generation::ActiveModel as SyncGenerationActive;
|
||||
pub use user_metadata::ActiveModel as UserMetadataActive;
|
||||
pub use volume::ActiveModel as VolumeActive;
|
||||
|
||||
|
||||
133
core/src/infra/db/entities/sync_conduit.rs
Normal file
133
core/src/infra/db/entities/sync_conduit.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Represents a sync relationship between two directories
|
||||
///
|
||||
/// A SyncConduit defines a persistent sync configuration between a source and target directory,
|
||||
/// tracking sync state, configuration, and statistics.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "sync_conduit")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
pub id: i32,
|
||||
|
||||
#[sea_orm(unique)]
|
||||
pub uuid: Uuid,
|
||||
|
||||
/// Source directory entry ID (must be a directory)
|
||||
pub source_entry_id: i32,
|
||||
|
||||
/// Target directory entry ID (must be a directory)
|
||||
pub target_entry_id: i32,
|
||||
|
||||
/// Sync mode: "mirror", "bidirectional", or "selective"
|
||||
pub sync_mode: String,
|
||||
|
||||
/// Whether this conduit is active
|
||||
pub enabled: bool,
|
||||
|
||||
/// Sync schedule: "instant", "interval:5m", or "manual"
|
||||
pub schedule: String,
|
||||
|
||||
/// Whether to use indexer rules for filtering
|
||||
pub use_index_rules: bool,
|
||||
|
||||
/// Optional override for index mode (e.g., "shallow", "deep")
|
||||
pub index_mode_override: Option<String>,
|
||||
|
||||
/// Number of parallel file transfers
|
||||
pub parallel_transfers: i32,
|
||||
|
||||
/// Optional bandwidth limit in MB/s
|
||||
pub bandwidth_limit_mbps: Option<i32>,
|
||||
|
||||
/// Timestamp of last successful sync completion
|
||||
pub last_sync_completed_at: Option<DateTime<Utc>>,
|
||||
|
||||
/// Current sync generation number (increments on each sync)
|
||||
pub sync_generation: i64,
|
||||
|
||||
/// Last error message if sync failed
|
||||
pub last_sync_error: Option<String>,
|
||||
|
||||
/// Total number of syncs performed
|
||||
pub total_syncs: i64,
|
||||
|
||||
/// Total number of files synced
|
||||
pub files_synced: i64,
|
||||
|
||||
/// Total bytes transferred
|
||||
pub bytes_transferred: i64,
|
||||
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {
|
||||
/// Foreign key to source entry
|
||||
#[sea_orm(
|
||||
belongs_to = "super::entry::Entity",
|
||||
from = "Column::SourceEntryId",
|
||||
to = "super::entry::Column::Id"
|
||||
)]
|
||||
SourceEntry,
|
||||
|
||||
/// Foreign key to target entry
|
||||
#[sea_orm(
|
||||
belongs_to = "super::entry::Entity",
|
||||
from = "Column::TargetEntryId",
|
||||
to = "super::entry::Column::Id"
|
||||
)]
|
||||
TargetEntry,
|
||||
|
||||
/// One-to-many relationship with sync generations
|
||||
#[sea_orm(has_many = "super::sync_generation::Entity")]
|
||||
SyncGenerations,
|
||||
}
|
||||
|
||||
impl Related<super::sync_generation::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::SyncGenerations.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
/// Sync mode variants
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum SyncMode {
|
||||
/// One-way sync from source to target with automatic cleanup
|
||||
Mirror,
|
||||
/// Two-way sync with conflict detection
|
||||
Bidirectional,
|
||||
/// Intelligent local storage management (future)
|
||||
Selective,
|
||||
}
|
||||
|
||||
impl SyncMode {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Mirror => "mirror",
|
||||
Self::Bidirectional => "bidirectional",
|
||||
Self::Selective => "selective",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_str(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"mirror" => Some(Self::Mirror),
|
||||
"bidirectional" => Some(Self::Bidirectional),
|
||||
"selective" => Some(Self::Selective),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SyncMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
}
|
||||
}
|
||||
111
core/src/infra/db/entities/sync_generation.rs
Normal file
111
core/src/infra/db/entities/sync_generation.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Tracks individual sync operations for history and verification
|
||||
///
|
||||
/// Each sync execution creates a new generation with metrics and verification status.
|
||||
/// Generations enable sync history tracking and verification of sync consistency.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "sync_generation")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
pub id: i32,
|
||||
|
||||
/// Foreign key to sync_conduit
|
||||
pub conduit_id: i32,
|
||||
|
||||
/// Generation number (monotonically increasing)
|
||||
pub generation: i64,
|
||||
|
||||
/// When this sync generation started
|
||||
pub started_at: DateTime<Utc>,
|
||||
|
||||
/// When this sync generation completed (None if still running)
|
||||
pub completed_at: Option<DateTime<Utc>>,
|
||||
|
||||
/// Number of files copied during this sync
|
||||
pub files_copied: i32,
|
||||
|
||||
/// Number of files deleted during this sync
|
||||
pub files_deleted: i32,
|
||||
|
||||
/// Number of conflicts resolved during this sync
|
||||
pub conflicts_resolved: i32,
|
||||
|
||||
/// Total bytes transferred during this sync
|
||||
pub bytes_transferred: i64,
|
||||
|
||||
/// Number of errors encountered during this sync
|
||||
pub errors_encountered: i32,
|
||||
|
||||
/// When verification was performed (None if not yet verified)
|
||||
pub verified_at: Option<DateTime<Utc>>,
|
||||
|
||||
/// Verification status: "unverified", "waiting_watcher", "waiting_library_sync", "verified", "failed:<reason>"
|
||||
pub verification_status: String,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {
|
||||
/// Foreign key to sync_conduit
|
||||
#[sea_orm(
|
||||
belongs_to = "super::sync_conduit::Entity",
|
||||
from = "Column::ConduitId",
|
||||
to = "super::sync_conduit::Column::Id"
|
||||
)]
|
||||
SyncConduit,
|
||||
}
|
||||
|
||||
impl Related<super::sync_conduit::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::SyncConduit.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
/// Verification status variants
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum VerificationStatus {
|
||||
/// Sync completed, not yet verified
|
||||
Unverified,
|
||||
/// Waiting for filesystem watcher to update index
|
||||
WaitingWatcher,
|
||||
/// Waiting for library sync to propagate changes
|
||||
WaitingLibrarySync,
|
||||
/// Verification query confirmed consistency
|
||||
Verified,
|
||||
}
|
||||
|
||||
impl VerificationStatus {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Unverified => "unverified",
|
||||
Self::WaitingWatcher => "waiting_watcher",
|
||||
Self::WaitingLibrarySync => "waiting_library_sync",
|
||||
Self::Verified => "verified",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_str(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"unverified" => Some(Self::Unverified),
|
||||
"waiting_watcher" => Some(Self::WaitingWatcher),
|
||||
"waiting_library_sync" => Some(Self::WaitingLibrarySync),
|
||||
"verified" => Some(Self::Verified),
|
||||
_ if s.starts_with("failed:") => None, // Failed states are dynamic
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn failed(reason: &str) -> String {
|
||||
format!("failed:{}", reason)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for VerificationStatus {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,260 @@
|
||||
use sea_orm_migration::prelude::*;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
// Create sync_conduit table
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(SyncConduit::Table)
|
||||
.if_not_exists()
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::Id)
|
||||
.integer()
|
||||
.not_null()
|
||||
.auto_increment()
|
||||
.primary_key(),
|
||||
)
|
||||
.col(ColumnDef::new(SyncConduit::Uuid).binary().not_null().unique_key())
|
||||
.col(ColumnDef::new(SyncConduit::SourceEntryId).integer().not_null())
|
||||
.col(ColumnDef::new(SyncConduit::TargetEntryId).integer().not_null())
|
||||
.col(ColumnDef::new(SyncConduit::SyncMode).string().not_null())
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::Enabled)
|
||||
.boolean()
|
||||
.not_null()
|
||||
.default(true),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::Schedule)
|
||||
.string()
|
||||
.not_null()
|
||||
.default("manual"),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::UseIndexRules)
|
||||
.boolean()
|
||||
.not_null()
|
||||
.default(true),
|
||||
)
|
||||
.col(ColumnDef::new(SyncConduit::IndexModeOverride).string())
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::ParallelTransfers)
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(3),
|
||||
)
|
||||
.col(ColumnDef::new(SyncConduit::BandwidthLimitMbps).integer())
|
||||
.col(ColumnDef::new(SyncConduit::LastSyncCompletedAt).timestamp_with_time_zone())
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::SyncGeneration)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(ColumnDef::new(SyncConduit::LastSyncError).string())
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::TotalSyncs)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::FilesSynced)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::BytesTransferred)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::CreatedAt)
|
||||
.timestamp_with_time_zone()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncConduit::UpdatedAt)
|
||||
.timestamp_with_time_zone()
|
||||
.not_null(),
|
||||
)
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_sync_conduit_source_entry")
|
||||
.from(SyncConduit::Table, SyncConduit::SourceEntryId)
|
||||
.to(Entry::Table, Entry::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade),
|
||||
)
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_sync_conduit_target_entry")
|
||||
.from(SyncConduit::Table, SyncConduit::TargetEntryId)
|
||||
.to(Entry::Table, Entry::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade),
|
||||
)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Create index on enabled column for active conduit queries
|
||||
manager
|
||||
.create_index(
|
||||
Index::create()
|
||||
.name("idx_sync_conduit_enabled")
|
||||
.table(SyncConduit::Table)
|
||||
.col(SyncConduit::Enabled)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Create sync_generation table
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(SyncGeneration::Table)
|
||||
.if_not_exists()
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::Id)
|
||||
.integer()
|
||||
.not_null()
|
||||
.auto_increment()
|
||||
.primary_key(),
|
||||
)
|
||||
.col(ColumnDef::new(SyncGeneration::ConduitId).integer().not_null())
|
||||
.col(ColumnDef::new(SyncGeneration::Generation).big_integer().not_null())
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::StartedAt)
|
||||
.timestamp_with_time_zone()
|
||||
.not_null(),
|
||||
)
|
||||
.col(ColumnDef::new(SyncGeneration::CompletedAt).timestamp_with_time_zone())
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::FilesCopied)
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::FilesDeleted)
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::ConflictsResolved)
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::BytesTransferred)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::ErrorsEncountered)
|
||||
.integer()
|
||||
.not_null()
|
||||
.default(0),
|
||||
)
|
||||
.col(ColumnDef::new(SyncGeneration::VerifiedAt).timestamp_with_time_zone())
|
||||
.col(
|
||||
ColumnDef::new(SyncGeneration::VerificationStatus)
|
||||
.string()
|
||||
.not_null()
|
||||
.default("unverified"),
|
||||
)
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_sync_generation_conduit")
|
||||
.from(SyncGeneration::Table, SyncGeneration::ConduitId)
|
||||
.to(SyncConduit::Table, SyncConduit::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade),
|
||||
)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Create index on (conduit_id, generation) for efficient generation lookups
|
||||
manager
|
||||
.create_index(
|
||||
Index::create()
|
||||
.name("idx_sync_generation_conduit")
|
||||
.table(SyncGeneration::Table)
|
||||
.col(SyncGeneration::ConduitId)
|
||||
.col(SyncGeneration::Generation)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
// Drop tables in reverse order (child tables first)
|
||||
manager
|
||||
.drop_table(Table::drop().table(SyncGeneration::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_table(Table::drop().table(SyncConduit::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum SyncConduit {
|
||||
Table,
|
||||
Id,
|
||||
Uuid,
|
||||
SourceEntryId,
|
||||
TargetEntryId,
|
||||
SyncMode,
|
||||
Enabled,
|
||||
Schedule,
|
||||
UseIndexRules,
|
||||
IndexModeOverride,
|
||||
ParallelTransfers,
|
||||
BandwidthLimitMbps,
|
||||
LastSyncCompletedAt,
|
||||
SyncGeneration,
|
||||
LastSyncError,
|
||||
TotalSyncs,
|
||||
FilesSynced,
|
||||
BytesTransferred,
|
||||
CreatedAt,
|
||||
UpdatedAt,
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum SyncGeneration {
|
||||
Table,
|
||||
Id,
|
||||
ConduitId,
|
||||
Generation,
|
||||
StartedAt,
|
||||
CompletedAt,
|
||||
FilesCopied,
|
||||
FilesDeleted,
|
||||
ConflictsResolved,
|
||||
BytesTransferred,
|
||||
ErrorsEncountered,
|
||||
VerifiedAt,
|
||||
VerificationStatus,
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Entry {
|
||||
Table,
|
||||
Id,
|
||||
}
|
||||
@@ -60,6 +60,7 @@ use async_trait::async_trait;
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{info, debug, error};
|
||||
|
||||
/// Progress callback for strategy implementations to report granular progress
|
||||
/// Parameters: bytes_copied_for_current_file, total_bytes_for_current_file
|
||||
@@ -257,19 +258,67 @@ impl CopyStrategy for RemoteTransferStrategy {
|
||||
verify_checksum: bool,
|
||||
progress_callback: Option<&ProgressCallback<'a>>,
|
||||
) -> Result<u64> {
|
||||
info!("[REMOTE_STRATEGY] === RemoteTransferStrategy::execute() CALLED ===");
|
||||
info!("[REMOTE_STRATEGY] Source: {:?}", source);
|
||||
info!("[REMOTE_STRATEGY] Destination: {:?}", destination);
|
||||
|
||||
// Get networking service
|
||||
let networking = ctx
|
||||
.networking_service()
|
||||
.ok_or_else(|| anyhow::anyhow!("Networking service not available"))?;
|
||||
debug!("[REMOTE_STRATEGY] Getting networking service...");
|
||||
let networking = match ctx.networking_service() {
|
||||
Some(n) => {
|
||||
debug!("[REMOTE_STRATEGY] Networking service retrieved successfully");
|
||||
n
|
||||
}
|
||||
None => {
|
||||
error!("[REMOTE_STRATEGY] ERROR: Networking service not available");
|
||||
return Err(anyhow::anyhow!("Networking service not available"));
|
||||
}
|
||||
};
|
||||
|
||||
// Get local path
|
||||
let local_path = source
|
||||
.as_local_path()
|
||||
.ok_or_else(|| anyhow::anyhow!("Source must be local path"))?;
|
||||
debug!("[REMOTE_STRATEGY] Converting source to local path...");
|
||||
let local_path = match source.as_local_path() {
|
||||
Some(p) => {
|
||||
info!("[REMOTE_STRATEGY] Local path: {}", p.display());
|
||||
p
|
||||
}
|
||||
None => {
|
||||
error!("[REMOTE_STRATEGY] ERROR: Source is not a local path");
|
||||
return Err(anyhow::anyhow!("Source must be local path"));
|
||||
}
|
||||
};
|
||||
|
||||
// Read file metadata
|
||||
let metadata = tokio::fs::metadata(local_path).await?;
|
||||
debug!("[REMOTE_STRATEGY] Reading file metadata...");
|
||||
let metadata = match tokio::fs::metadata(local_path).await {
|
||||
Ok(m) => {
|
||||
debug!("[REMOTE_STRATEGY] Metadata read successfully");
|
||||
m
|
||||
}
|
||||
Err(e) => {
|
||||
error!("[REMOTE_STRATEGY] ERROR: Failed to read metadata: {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
let file_size = metadata.len();
|
||||
info!("[REMOTE_STRATEGY] File size: {} bytes", file_size);
|
||||
|
||||
info!("[REMOTE_STRATEGY] About to calculate file checksum...");
|
||||
let checksum = match calculate_file_checksum(local_path).await {
|
||||
Ok(c) => {
|
||||
info!("[REMOTE_STRATEGY] Checksum calculated: {}", c);
|
||||
Some(c)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("[REMOTE_STRATEGY] ERROR: Failed to calculate checksum: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
info!("[REMOTE_STRATEGY] Initiating cross-device transfer: {} ({} bytes) -> device:{}",
|
||||
local_path.display(),
|
||||
file_size,
|
||||
destination.device_id().unwrap_or_default());
|
||||
|
||||
ctx.log(format!(
|
||||
"Initiating cross-device transfer: {} ({} bytes) -> device:{}",
|
||||
@@ -288,11 +337,12 @@ impl CopyStrategy for RemoteTransferStrategy {
|
||||
size: file_size,
|
||||
modified: metadata.modified().ok(),
|
||||
is_directory: metadata.is_dir(),
|
||||
checksum: Some(calculate_file_checksum(local_path).await?),
|
||||
checksum,
|
||||
mime_type: None,
|
||||
};
|
||||
|
||||
// Get file transfer protocol handler
|
||||
debug!("[REMOTE_STRATEGY] Getting file transfer protocol handler...");
|
||||
let networking_guard = &*networking;
|
||||
let protocol_registry = networking_guard.protocol_registry();
|
||||
let registry_guard = protocol_registry.read().await;
|
||||
@@ -306,6 +356,8 @@ impl CopyStrategy for RemoteTransferStrategy {
|
||||
.downcast_ref::<crate::service::network::protocol::FileTransferProtocolHandler>()
|
||||
.ok_or_else(|| anyhow::anyhow!("Invalid file transfer protocol handler"))?;
|
||||
|
||||
info!("[REMOTE_STRATEGY] Protocol handler retrieved, initiating transfer...");
|
||||
|
||||
// Initiate transfer
|
||||
let transfer_id = file_transfer_protocol
|
||||
.initiate_transfer(
|
||||
@@ -315,57 +367,44 @@ impl CopyStrategy for RemoteTransferStrategy {
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("[REMOTE_STRATEGY] Transfer initiated with ID: {}", transfer_id);
|
||||
ctx.log(format!("Transfer initiated with ID: {}", transfer_id));
|
||||
|
||||
// Send transfer request to remote device
|
||||
let chunk_size = 64 * 1024u32;
|
||||
let total_chunks = ((file_size + chunk_size as u64 - 1) / chunk_size as u64) as u32;
|
||||
// Don't drop the registry guard yet - we need it for stream_file_data
|
||||
// stream_file_data will handle sending the transfer request AND the file chunks
|
||||
info!("[REMOTE_STRATEGY] About to call stream_file_data for {} bytes", file_size);
|
||||
ctx.log(format!("About to call stream_file_data for {} bytes", file_size));
|
||||
|
||||
let transfer_request = crate::service::network::protocol::file_transfer::FileTransferMessage::TransferRequest {
|
||||
transfer_id,
|
||||
file_metadata: file_metadata.clone(),
|
||||
transfer_mode: crate::service::network::protocol::TransferMode::TrustedCopy,
|
||||
chunk_size,
|
||||
total_chunks,
|
||||
destination_path: destination.path().map(|p| p.to_string_lossy().to_string()).unwrap_or_default(),
|
||||
};
|
||||
|
||||
let request_data = rmp_serde::to_vec(&transfer_request)?;
|
||||
|
||||
// Send transfer request over network
|
||||
networking_guard
|
||||
.send_message(
|
||||
destination.device_id().unwrap_or_default(),
|
||||
"file_transfer",
|
||||
request_data,
|
||||
)
|
||||
.await?;
|
||||
|
||||
ctx.log(format!(
|
||||
"Transfer request sent to device {}",
|
||||
destination.device_id().unwrap_or_default()
|
||||
));
|
||||
|
||||
// Stream file data
|
||||
drop(networking_guard);
|
||||
drop(registry_guard);
|
||||
|
||||
stream_file_data(
|
||||
let result = stream_file_data(
|
||||
local_path,
|
||||
transfer_id,
|
||||
file_transfer_protocol,
|
||||
file_size,
|
||||
destination.device_id().unwrap_or_default(),
|
||||
destination.path().map(|p| p.to_string_lossy().to_string()).unwrap_or_default(),
|
||||
file_metadata,
|
||||
ctx,
|
||||
progress_callback,
|
||||
)
|
||||
.await?;
|
||||
.await;
|
||||
|
||||
ctx.log(format!(
|
||||
"Cross-device transfer completed: {} bytes",
|
||||
file_size
|
||||
));
|
||||
Ok(file_size)
|
||||
info!("[REMOTE_STRATEGY] stream_file_data returned: {:?}", result.is_ok());
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
info!("[REMOTE_STRATEGY] Cross-device transfer completed successfully: {} bytes", file_size);
|
||||
ctx.log(format!(
|
||||
"Cross-device transfer completed successfully: {} bytes",
|
||||
file_size
|
||||
));
|
||||
Ok(file_size)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("[REMOTE_STRATEGY] Cross-device transfer FAILED: {}", e);
|
||||
ctx.log(format!("Cross-device transfer FAILED: {}", e));
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -666,37 +705,126 @@ async fn calculate_file_checksum(path: &Path) -> Result<String> {
|
||||
.map_err(|e| anyhow::anyhow!("Failed to generate content hash: {}", e))
|
||||
}
|
||||
|
||||
/// Stream file data in chunks to the remote device
|
||||
/// Stream file data in chunks to the remote device using a persistent connection
|
||||
async fn stream_file_data<'a>(
|
||||
file_path: &Path,
|
||||
transfer_id: uuid::Uuid,
|
||||
file_transfer_protocol: &crate::service::network::protocol::FileTransferProtocolHandler,
|
||||
total_size: u64,
|
||||
destination_device_id: uuid::Uuid,
|
||||
destination_path: String,
|
||||
file_metadata: crate::service::network::protocol::FileMetadata,
|
||||
ctx: &JobContext<'a>,
|
||||
progress_callback: Option<&ProgressCallback<'a>>,
|
||||
) -> Result<()> {
|
||||
use blake3::Hasher;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
// Get networking service for sending chunks
|
||||
info!("[STREAM_FILE_DATA] === stream_file_data() CALLED ===");
|
||||
info!("[STREAM_FILE_DATA] transfer_id: {}", transfer_id);
|
||||
info!("[STREAM_FILE_DATA] file: {}", file_path.display());
|
||||
info!("[STREAM_FILE_DATA] size: {} bytes", total_size);
|
||||
info!("[STREAM_FILE_DATA] dest device: {}", destination_device_id);
|
||||
|
||||
// Get networking service
|
||||
debug!("[STREAM_FILE_DATA] Getting networking service...");
|
||||
let networking = ctx
|
||||
.networking_service()
|
||||
.ok_or_else(|| anyhow::anyhow!("Networking service not available"))?;
|
||||
|
||||
let networking_guard = &*networking;
|
||||
|
||||
// Get device registry to lookup node_id
|
||||
debug!("[STREAM_FILE_DATA] Getting device registry...");
|
||||
let device_registry = networking_guard.device_registry();
|
||||
let registry = device_registry.read().await;
|
||||
let node_id = registry
|
||||
.get_node_by_device(destination_device_id)
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Could not find node_id for device {}",
|
||||
destination_device_id
|
||||
)
|
||||
})?;
|
||||
drop(registry);
|
||||
info!("[STREAM_FILE_DATA] Found node_id: {}", node_id);
|
||||
|
||||
// Get endpoint for creating connection
|
||||
debug!("[STREAM_FILE_DATA] Getting endpoint...");
|
||||
let endpoint = networking_guard.endpoint().ok_or_else(|| {
|
||||
anyhow::anyhow!("Networking endpoint not available")
|
||||
})?;
|
||||
|
||||
info!("[STREAM_FILE_DATA] Opening persistent connection to node {} (device {})", node_id, destination_device_id);
|
||||
ctx.log(format!(
|
||||
"Opening persistent connection to node {} (device {}) for file transfer",
|
||||
node_id, destination_device_id
|
||||
));
|
||||
|
||||
// Connect to the target device using file_transfer ALPN
|
||||
let node_addr = iroh::NodeAddr::new(node_id);
|
||||
let connection = endpoint
|
||||
.connect(node_addr, b"spacedrive/filetransfer/1")
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to connect to device: {}", e))?;
|
||||
|
||||
info!("[STREAM_FILE_DATA] Connected! Opening bidirectional stream...");
|
||||
ctx.log(format!(
|
||||
"Connected to device {}, opening bidirectional stream",
|
||||
destination_device_id
|
||||
));
|
||||
|
||||
// Open bidirectional stream for the transfer (so we can receive acknowledgment)
|
||||
let (mut send_stream, mut recv_stream) = connection
|
||||
.open_bi()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to open stream: {}", e))?;
|
||||
|
||||
info!("[STREAM_FILE_DATA] Bidirectional stream opened successfully");
|
||||
|
||||
// First, send the TransferRequest message
|
||||
let chunk_size = 64 * 1024u32;
|
||||
let total_chunks = ((total_size + chunk_size as u64 - 1) / chunk_size as u64) as u32;
|
||||
|
||||
let transfer_request = crate::service::network::protocol::file_transfer::FileTransferMessage::TransferRequest {
|
||||
transfer_id,
|
||||
file_metadata,
|
||||
transfer_mode: crate::service::network::protocol::TransferMode::TrustedCopy,
|
||||
chunk_size,
|
||||
total_chunks,
|
||||
destination_path,
|
||||
};
|
||||
|
||||
let request_data = rmp_serde::to_vec(&transfer_request)?;
|
||||
|
||||
ctx.log(format!(
|
||||
"Sending TransferRequest for {} bytes ({} chunks)",
|
||||
total_size, total_chunks
|
||||
));
|
||||
|
||||
// Send transfer request: type (0) + length + data
|
||||
send_stream.write_u8(0).await?;
|
||||
send_stream.write_all(&(request_data.len() as u32).to_be_bytes()).await?;
|
||||
send_stream.write_all(&request_data).await?;
|
||||
send_stream.flush().await?;
|
||||
|
||||
ctx.log("TransferRequest sent, now sending file chunks".to_string());
|
||||
|
||||
// Open file for reading
|
||||
let mut file = tokio::fs::File::open(file_path).await?;
|
||||
|
||||
let chunk_size = 64 * 1024; // 64KB chunks
|
||||
let chunk_size = 64 * 1024u64; // 64KB chunks
|
||||
let total_chunks = (total_size + chunk_size - 1) / chunk_size;
|
||||
let mut buffer = vec![0u8; chunk_size as usize];
|
||||
let mut chunk_index = 0u32;
|
||||
let mut bytes_transferred = 0u64;
|
||||
|
||||
ctx.log(format!(
|
||||
"Starting to stream {} chunks to device {}",
|
||||
total_chunks, destination_device_id
|
||||
"Starting to stream {} chunks ({} bytes) to device {}",
|
||||
total_chunks, total_size, destination_device_id
|
||||
));
|
||||
|
||||
// Send all chunks over the stream
|
||||
loop {
|
||||
ctx.check_interrupt().await?;
|
||||
|
||||
@@ -705,16 +833,10 @@ async fn stream_file_data<'a>(
|
||||
break; // End of file
|
||||
}
|
||||
|
||||
// Calculate chunk checksum
|
||||
// Calculate chunk checksum (of original data)
|
||||
let chunk_data = &buffer[..bytes_read];
|
||||
let chunk_checksum = blake3::hash(chunk_data);
|
||||
|
||||
// Update progress
|
||||
bytes_transferred += bytes_read as u64;
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(bytes_transferred, total_size);
|
||||
}
|
||||
|
||||
// Get session keys for encryption
|
||||
let session_keys = file_transfer_protocol
|
||||
.get_session_keys_for_device(destination_device_id)
|
||||
@@ -735,35 +857,59 @@ async fn stream_file_data<'a>(
|
||||
chunk_index,
|
||||
data: encrypted_data,
|
||||
nonce,
|
||||
chunk_checksum: *chunk_checksum.as_bytes(), // Checksum of original unencrypted data
|
||||
chunk_checksum: *chunk_checksum.as_bytes(),
|
||||
};
|
||||
|
||||
// Serialize and send chunk over network
|
||||
let chunk_data = rmp_serde::to_vec(&chunk_message)?;
|
||||
// Serialize message
|
||||
let message_data = rmp_serde::to_vec(&chunk_message)?;
|
||||
|
||||
let networking_guard = &*networking;
|
||||
networking_guard
|
||||
.send_message(destination_device_id, "file_transfer", chunk_data)
|
||||
.await?;
|
||||
// Send transfer type (0 for messages) + message length + message data
|
||||
send_stream.write_u8(0).await?; // Type 0 = message-based transfer
|
||||
send_stream
|
||||
.write_all(&(message_data.len() as u32).to_be_bytes())
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to write message length: {}", e))?;
|
||||
send_stream
|
||||
.write_all(&message_data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to write chunk data: {}", e))?;
|
||||
send_stream
|
||||
.flush()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to flush stream: {}", e))?;
|
||||
|
||||
// Record chunk in local protocol handler for tracking
|
||||
// Record chunk sent
|
||||
file_transfer_protocol.record_chunk_received(
|
||||
&transfer_id,
|
||||
chunk_index,
|
||||
bytes_read as u64,
|
||||
)?;
|
||||
|
||||
// Update progress
|
||||
bytes_transferred += bytes_read as u64;
|
||||
if let Some(callback) = progress_callback {
|
||||
callback(bytes_transferred, total_size);
|
||||
}
|
||||
|
||||
chunk_index += 1;
|
||||
|
||||
ctx.log(format!(
|
||||
"Sent chunk {}/{} ({} bytes)",
|
||||
chunk_index, total_chunks, bytes_read
|
||||
));
|
||||
// Log every 100 chunks or on first/last chunk
|
||||
if chunk_index == 1 || chunk_index % 100 == 0 || chunk_index == total_chunks as u32 {
|
||||
ctx.log(format!(
|
||||
"Sent chunk {}/{} ({} bytes total)",
|
||||
chunk_index, total_chunks, bytes_transferred
|
||||
));
|
||||
}
|
||||
|
||||
// Small delay to prevent overwhelming the network
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
// Yield to allow other tasks to run
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
ctx.log(format!(
|
||||
"All {} chunks sent, sending completion message",
|
||||
chunk_index
|
||||
));
|
||||
|
||||
// Send transfer completion message
|
||||
let final_checksum = calculate_file_checksum(file_path).await?;
|
||||
let completion_message =
|
||||
@@ -775,20 +921,65 @@ async fn stream_file_data<'a>(
|
||||
|
||||
let completion_data = rmp_serde::to_vec(&completion_message)?;
|
||||
|
||||
let networking_guard = &*networking;
|
||||
networking_guard
|
||||
.send_message(destination_device_id, "file_transfer", completion_data)
|
||||
.await?;
|
||||
// Send completion message
|
||||
send_stream.write_u8(0).await?;
|
||||
send_stream.write_all(&(completion_data.len() as u32).to_be_bytes()).await?;
|
||||
send_stream.write_all(&completion_data).await?;
|
||||
send_stream.flush().await?;
|
||||
|
||||
// Mark transfer as completed locally
|
||||
ctx.log(format!(
|
||||
"Completion message sent, waiting for final acknowledgment from receiver"
|
||||
));
|
||||
|
||||
// Close the send side to signal we're done sending
|
||||
send_stream
|
||||
.finish()
|
||||
.map_err(|e| anyhow::anyhow!("Failed to finish stream: {}", e))?;
|
||||
|
||||
// Wait for TransferFinalAck response from receiver
|
||||
ctx.log("Waiting for TransferFinalAck from receiver...".to_string());
|
||||
|
||||
// Read the response message
|
||||
let mut msg_type = [0u8; 1];
|
||||
recv_stream.read_exact(&mut msg_type).await?;
|
||||
|
||||
if msg_type[0] != 0 {
|
||||
return Err(anyhow::anyhow!("Unexpected response type: {}", msg_type[0]));
|
||||
}
|
||||
|
||||
let mut len_buf = [0u8; 4];
|
||||
recv_stream.read_exact(&mut len_buf).await?;
|
||||
let msg_len = u32::from_be_bytes(len_buf) as usize;
|
||||
|
||||
let mut msg_buf = vec![0u8; msg_len];
|
||||
recv_stream.read_exact(&mut msg_buf).await?;
|
||||
|
||||
let ack_message: crate::service::network::protocol::file_transfer::FileTransferMessage =
|
||||
rmp_serde::from_slice(&msg_buf)?;
|
||||
|
||||
// Verify it's a TransferFinalAck for our transfer
|
||||
match ack_message {
|
||||
crate::service::network::protocol::file_transfer::FileTransferMessage::TransferFinalAck { transfer_id: ack_id } => {
|
||||
if ack_id != transfer_id {
|
||||
return Err(anyhow::anyhow!("Received ack for wrong transfer: expected {}, got {}", transfer_id, ack_id));
|
||||
}
|
||||
ctx.log("Received TransferFinalAck from receiver - transfer confirmed!".to_string());
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!("Expected TransferFinalAck, got different message type"));
|
||||
}
|
||||
}
|
||||
|
||||
// Now mark transfer as completed locally (only after receiving confirmation)
|
||||
file_transfer_protocol.update_session_state(
|
||||
&transfer_id,
|
||||
crate::service::network::protocol::file_transfer::TransferState::Completed,
|
||||
)?;
|
||||
|
||||
ctx.log(format!(
|
||||
"File streaming completed: {} chunks, {} bytes sent to device {}",
|
||||
"File streaming completed and acknowledged: {} chunks, {} bytes sent to device {}",
|
||||
chunk_index, bytes_transferred, destination_device_id
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -437,8 +437,12 @@ impl DeviceRegistry {
|
||||
// Found session keys for connected device
|
||||
Some(session_keys.clone())
|
||||
}
|
||||
Some(DeviceState::Disconnected { session_keys, .. }) => {
|
||||
// Found session keys for disconnected device (session keys are preserved after disconnect)
|
||||
Some(session_keys.clone())
|
||||
}
|
||||
_ => {
|
||||
// Device not found or not paired/connected
|
||||
// Device not found or in a state without session keys (Discovered, Pairing)
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
//! Sync applier (STUB - Being replaced with PeerSync)
|
||||
//!
|
||||
//! This module handled applying sync log entries from the leader.
|
||||
//! In the new leaderless architecture, this logic is in PeerSync.
|
||||
|
||||
use anyhow::Result;
|
||||
use tracing::warn;
|
||||
|
||||
/// Sync applier (DEPRECATED)
|
||||
///
|
||||
/// Stubbed during migration to leaderless architecture.
|
||||
pub struct SyncApplier;
|
||||
|
||||
impl SyncApplier {
|
||||
/// Create a new sync applier (stub)
|
||||
pub fn new() -> Self {
|
||||
warn!("SyncApplier is deprecated - use PeerSync instead");
|
||||
Self
|
||||
}
|
||||
|
||||
/// Apply sync entry (stub)
|
||||
pub async fn apply(&self, _entry: serde_json::Value) -> Result<()> {
|
||||
warn!("SyncApplier::apply called but deprecated");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SyncApplier {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@
|
||||
//! - State-based sync for device-owned data
|
||||
//! - Log-based sync with HLC for shared resources
|
||||
|
||||
pub mod applier;
|
||||
pub mod backfill;
|
||||
pub mod peer;
|
||||
pub mod protocol_handler;
|
||||
@@ -28,7 +27,6 @@ use tokio::sync::{Mutex, RwLock};
|
||||
use tracing::{info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub use applier::SyncApplier;
|
||||
pub use backfill::BackfillManager;
|
||||
pub use protocol_handler::{LogSyncHandler, StateSyncHandler};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user