Files
spacedrive/core/src/volume/manager.rs
Jamie Pine cf400865f4 Introduce ephemeral index cache and status API
- Add a complete ephemeral indexing subsystem
  - core/src/ops/core/ephemeral_status with input/output and query types
  - core/src/ops/indexing/ephemeral with arena, cache, registry,
    index_cache, types
  - expose EphemeralIndexCache and EphemeralIndex through core modules
  - EphemeralIndexCache supports
    get/insert/create_for_indexing/mark_indexing_complete eviction and
    stats
- Implement EphemeralIndex data structures for memory-efficient storage
  - NodeArena, NameCache, NameRegistry, and related types
- Add EphemeralIndex status API
  - EphemeralCacheStatusInput and EphemeralCacheStatusQuery
  - EphemeralCacheStatus with per-index details
- Wire ephemeral indexing into the indexing flow
  - Change default Ephemeral Indexer behavior to shallow mode
  - Align code to EphemeralIndex usage across the codebase
- Enhance content kind detection in UI
  - Add getContentKind(file) helper (prefers content_identity.kind, then
    content_kind)
  - Use getContentKind in Explorer utilities and UI components
- Invalidate directory listings when location index_mode changes
  - Add useLocationChangeInvalidation to trigger refetches for ephemeral
    vs persistent indexing transitions
- Misc refactors and formatting to accommodate the new modules and APIs
2025-12-07 19:55:34 -08:00

1943 lines
57 KiB
Rust

//! Volume Manager - Central management for all volume operations
use crate::infra::db::entities;
use crate::infra::event::{Event, EventBus};
use crate::library::LibraryManager;
use crate::volume::{
detection,
error::{VolumeError, VolumeResult},
types::{
SpacedriveVolumeId, TrackedVolume, Volume, VolumeDetectionConfig, VolumeFingerprint,
VolumeInfo,
},
VolumeExt,
};
use crate::Core;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use std::time::Duration as StdDuration;
use tokio::{fs, sync::RwLock, time::Duration};
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
/// Filename for Spacedrive volume identifier files
const SPACEDRIVE_VOLUME_ID_FILE: &str = ".spacedrive-volume-id";
/// Get platform-specific directories to watch for volume mount changes
fn get_volume_watch_paths() -> Vec<PathBuf> {
let mut paths = Vec::new();
#[cfg(target_os = "macos")]
{
paths.push(PathBuf::from("/Volumes"));
// Note: System volumes like / and /System/Volumes/Data are typically stable
}
#[cfg(target_os = "linux")]
{
paths.push(PathBuf::from("/media"));
paths.push(PathBuf::from("/mnt"));
// Note: Could also watch /proc/mounts but that's more complex
}
#[cfg(target_os = "windows")]
{
// Windows drive letters are harder to watch - we'll rely on polling for now
// Could potentially use WMI events in the future
}
// Filter to only existing directories
paths.into_iter().filter(|p: &PathBuf| p.exists()).collect()
}
/// Central manager for volume detection, monitoring, and operations
pub struct VolumeManager {
/// Device ID for this manager
pub(crate) device_id: uuid::Uuid,
/// Currently known volumes, indexed by fingerprint
volumes: Arc<RwLock<HashMap<VolumeFingerprint, Volume>>>,
/// Cache mapping paths to volume fingerprints for fast lookup
path_cache: Arc<RwLock<HashMap<PathBuf, VolumeFingerprint>>>,
/// Cache mapping mount points to fingerprints for O(1) cloud volume lookup
/// Format: "s3://bucket" -> fingerprint
mount_point_cache: Arc<RwLock<HashMap<String, VolumeFingerprint>>>,
/// Configuration for volume detection
config: VolumeDetectionConfig,
/// Event bus for emitting volume events
events: Arc<EventBus>,
/// Whether the manager is currently running monitoring
is_monitoring: Arc<RwLock<bool>>,
/// File system watcher for real-time volume change detection
volume_watcher: Arc<RwLock<Option<RecommendedWatcher>>>,
/// Weak reference to library manager for database operations
library_manager: Arc<RwLock<Option<Weak<LibraryManager>>>>,
}
impl VolumeManager {
/// Create a new VolumeManager instance
pub fn new(
device_id: uuid::Uuid,
config: VolumeDetectionConfig,
events: Arc<EventBus>,
) -> Self {
Self {
device_id,
volumes: Arc::new(RwLock::new(HashMap::new())),
path_cache: Arc::new(RwLock::new(HashMap::new())),
mount_point_cache: Arc::new(RwLock::new(HashMap::new())),
config,
events,
is_monitoring: Arc::new(RwLock::new(false)),
volume_watcher: Arc::new(RwLock::new(None)),
library_manager: Arc::new(RwLock::new(None)),
}
}
/// Set the library manager reference
pub async fn set_library_manager(&self, library_manager: Weak<LibraryManager>) {
*self.library_manager.write().await = Some(library_manager);
}
/// Initialize the volume manager and perform initial detection
#[instrument(skip(self))]
pub async fn initialize(&self) -> VolumeResult<()> {
info!("Initializing volume manager");
// Perform initial volume detection (for local volumes)
self.refresh_volumes().await?;
// Start monitoring if configured
if self.config.refresh_interval_secs > 0 {
self.start_monitoring().await;
}
info!(
"Volume manager initialized with {} volumes",
self.volumes.read().await.len()
);
Ok(())
}
/// Load cloud volumes from the database and restore them to the in-memory HashMap
/// This should be called after libraries are loaded
pub async fn load_cloud_volumes_from_db(
&self,
libraries: &[std::sync::Arc<crate::library::Library>],
key_manager: std::sync::Arc<crate::crypto::key_manager::KeyManager>,
) -> VolumeResult<()> {
use crate::crypto::cloud_credentials::CloudCredentialManager;
let mut loaded_count = 0;
for library in libraries {
let db = library.db().conn();
// Query all network volumes (cloud volumes) for this library
let cloud_volumes = entities::volume::Entity::find()
.filter(entities::volume::Column::IsNetworkDrive.eq(true))
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
info!(
"Found {} cloud volumes in database for library {}",
cloud_volumes.len(),
library.id()
);
for db_volume in cloud_volumes {
let fingerprint = VolumeFingerprint(db_volume.fingerprint.clone());
// Skip if already loaded
if self.get_volume(&fingerprint).await.is_some() {
continue;
}
// Try to load credentials and recreate the backend
let credential_manager = CloudCredentialManager::new(
key_manager.clone(),
library.db().clone(),
library.id(),
);
match credential_manager
.get_credential(library.id(), &db_volume.fingerprint)
.await
{
Ok(credential) => {
// Get mount point from database (for display and cache purposes)
let mount_point_str = match &db_volume.mount_point {
Some(mp) => mp,
None => {
warn!("No mount point for cloud volume {}", fingerprint.0);
continue;
}
};
// Get cloud identifier from database (actual bucket/drive/container name)
let cloud_identifier = match &db_volume.cloud_identifier {
Some(id) => id,
None => {
warn!("No cloud identifier for cloud volume {}", fingerprint.0);
continue;
}
};
// Parse cloud_config JSON if available
let cloud_config: Option<serde_json::Value> = db_volume
.cloud_config
.as_ref()
.and_then(|s| serde_json::from_str(s).ok());
let backend_result = match credential.service {
crate::volume::CloudServiceType::S3 => {
if let crate::crypto::cloud_credentials::CredentialData::AccessKey {
access_key_id,
secret_access_key,
..
} = &credential.data
{
// Extract region from cloud_config, or default to us-east-1
let region = cloud_config
.as_ref()
.and_then(|c| c.get("region"))
.and_then(|r| r.as_str())
.unwrap_or("us-east-1");
let endpoint = cloud_config
.as_ref()
.and_then(|c| c.get("endpoint"))
.and_then(|e| e.as_str())
.map(String::from);
crate::volume::CloudBackend::new_s3(
cloud_identifier,
region,
access_key_id,
secret_access_key,
endpoint,
).await
} else {
warn!("Invalid credential type for S3 volume {}", fingerprint.0);
continue;
}
}
crate::volume::CloudServiceType::GoogleDrive => {
if let crate::crypto::cloud_credentials::CredentialData::OAuth {
access_token,
refresh_token,
} = &credential.data
{
crate::volume::CloudBackend::new_google_drive(
access_token,
refresh_token,
"", // client_id not stored yet
"", // client_secret not stored yet
Some(cloud_identifier.clone()),
).await
} else {
warn!("Invalid credential type for Google Drive volume {}", fingerprint.0);
continue;
}
}
crate::volume::CloudServiceType::OneDrive => {
if let crate::crypto::cloud_credentials::CredentialData::OAuth {
access_token,
refresh_token,
} = &credential.data
{
crate::volume::CloudBackend::new_onedrive(
access_token,
refresh_token,
"",
"",
Some(cloud_identifier.clone()),
).await
} else {
warn!("Invalid credential type for OneDrive volume {}", fingerprint.0);
continue;
}
}
crate::volume::CloudServiceType::Dropbox => {
if let crate::crypto::cloud_credentials::CredentialData::OAuth {
access_token,
refresh_token,
} = &credential.data
{
crate::volume::CloudBackend::new_dropbox(
access_token,
refresh_token,
"",
"",
Some(cloud_identifier.clone()),
).await
} else {
warn!("Invalid credential type for Dropbox volume {}", fingerprint.0);
continue;
}
}
crate::volume::CloudServiceType::AzureBlob => {
if let crate::crypto::cloud_credentials::CredentialData::AccessKey {
access_key_id,
secret_access_key,
..
} = &credential.data
{
crate::volume::CloudBackend::new_azure_blob(
cloud_identifier,
access_key_id,
secret_access_key,
None,
).await
} else {
warn!("Invalid credential type for Azure Blob volume {}", fingerprint.0);
continue;
}
}
crate::volume::CloudServiceType::GoogleCloudStorage => {
if let crate::crypto::cloud_credentials::CredentialData::ApiKey(service_account_json) = &credential.data {
crate::volume::CloudBackend::new_google_cloud_storage(
cloud_identifier,
service_account_json,
None,
None,
).await
} else {
warn!("Invalid credential type for GCS volume {}", fingerprint.0);
continue;
}
}
_ => {
warn!("Unsupported cloud service type {:?} for volume {}", credential.service, fingerprint.0);
continue;
}
};
match backend_result {
Ok(backend) => {
let now = chrono::Utc::now();
let volume = Volume {
id: db_volume.uuid,
fingerprint: fingerprint.clone(),
device_id: db_volume.device_id,
name: db_volume
.display_name
.clone()
.unwrap_or_else(|| "Cloud Volume".to_string()),
library_id: None,
is_tracked: true,
mount_point: std::path::PathBuf::from(mount_point_str),
mount_points: vec![std::path::PathBuf::from(mount_point_str)],
volume_type: crate::volume::types::VolumeType::Network,
mount_type: crate::volume::types::MountType::Network,
disk_type: crate::volume::types::DiskType::Unknown,
file_system: crate::volume::types::FileSystem::Other(format!(
"{:?}",
credential.service
)),
total_capacity: db_volume.total_capacity.unwrap_or(0) as u64,
available_space: db_volume.available_capacity.unwrap_or(0)
as u64,
is_read_only: false,
is_mounted: true,
hardware_id: None,
backend: Some(Arc::new(backend)),
cloud_identifier: db_volume.cloud_identifier.clone(),
cloud_config,
apfs_container: None,
container_volume_id: None,
path_mappings: Vec::new(),
is_user_visible: db_volume.is_user_visible.unwrap_or(true),
auto_track_eligible: db_volume
.auto_track_eligible
.unwrap_or(false),
read_speed_mbps: db_volume.read_speed_mbps.map(|s| s as u64),
write_speed_mbps: db_volume.write_speed_mbps.map(|s| s as u64),
created_at: db_volume.tracked_at,
updated_at: now,
last_seen_at: db_volume.last_seen_at,
total_files: None,
total_directories: None,
last_stats_update: None,
display_name: db_volume.display_name.clone(),
is_favorite: false,
color: None,
icon: None,
error_message: None,
};
let mut volumes = self.volumes.write().await;
volumes.insert(fingerprint.clone(), volume.clone());
// Update mount point cache for fast cloud volume lookup using cloud_identifier
if let Some(ref cloud_id) = volume.cloud_identifier {
let cache_key =
format!("{}://{}", credential.service.scheme(), cloud_id);
let mut mount_point_cache =
self.mount_point_cache.write().await;
mount_point_cache.insert(cache_key, fingerprint.clone());
}
loaded_count += 1;
info!(
"Loaded cloud volume {} ({:?}) from database",
db_volume
.display_name
.as_ref()
.unwrap_or(&"Unknown".to_string()),
credential.service
);
}
Err(e) => {
warn!(
"Failed to recreate cloud backend for volume {}: {}",
fingerprint.0, e
);
}
}
}
Err(e) => {
warn!("Failed to load credentials for cloud volume {} ({}): {}. The volume will not be available until credentials are re-entered by removing and re-adding the volume.",
db_volume.display_name.as_ref().unwrap_or(&"Unknown".to_string()),
fingerprint.0,
e
);
}
}
}
}
info!("Loaded {} cloud volumes from database", loaded_count);
Ok(())
}
/// Start background monitoring of volume changes
pub async fn start_monitoring(&self) {
if *self.is_monitoring.read().await {
warn!("Volume monitoring already started");
return;
}
*self.is_monitoring.write().await = true;
// Start file system watcher for real-time detection
self.start_volume_watcher().await;
// Continue with existing timer-based monitoring as fallback
let volumes = self.volumes.clone();
let path_cache = self.path_cache.clone();
let events = self.events.clone();
let config = self.config.clone();
let is_monitoring = self.is_monitoring.clone();
let library_manager = self.library_manager.clone();
let device_id = self.device_id;
tokio::spawn(async move {
info!(
"Starting volume monitoring (refresh every {}s)",
config.refresh_interval_secs
);
let mut interval =
tokio::time::interval(Duration::from_secs(config.refresh_interval_secs));
while *is_monitoring.read().await {
interval.tick().await;
if let Err(e) = Self::refresh_volumes_internal(
device_id,
&volumes,
&path_cache,
&events,
&config,
&library_manager,
)
.await
{
error!("Error during volume refresh: {}", e);
}
}
info!("Volume monitoring stopped");
});
}
/// Start file system watcher for real-time volume change detection
async fn start_volume_watcher(&self) {
let watch_paths = get_volume_watch_paths();
if watch_paths.is_empty() {
debug!("No volume watch paths available on this platform, using timer-based monitoring only");
return;
}
let volumes = self.volumes.clone();
let path_cache = self.path_cache.clone();
let events = self.events.clone();
let config = self.config.clone();
let library_manager = self.library_manager.clone();
let device_id = self.device_id;
let is_monitoring = self.is_monitoring.clone();
// Create the watcher
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let watcher = notify::recommended_watcher(move |result: notify::Result<notify::Event>| {
match result {
Ok(event) => {
// Send the event to our async handler
if let Err(_) = tx.blocking_send(event) {
// Channel closed, watcher is stopping
}
}
Err(e) => {
error!("Volume watcher error: {}", e);
}
}
});
match watcher {
Ok(mut watcher) => {
// Watch the volume directories
for path in &watch_paths {
if let Err(e) = watcher.watch(path, RecursiveMode::NonRecursive) {
warn!("Failed to watch {}: {}", path.display(), e);
} else {
info!("Watching {} for volume changes", path.display());
}
}
// Store the watcher
*self.volume_watcher.write().await = Some(watcher);
// Handle events
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
if !*is_monitoring.read().await {
break;
}
// Check if this is a mount/unmount event
if event.kind.is_create() || event.kind.is_remove() {
debug!("Volume change detected: {:?}", event);
// Debounce rapid events (reduced from 500ms for faster response)
tokio::time::sleep(Duration::from_millis(200)).await;
let start_time = std::time::Instant::now();
// Trigger volume refresh
match Self::refresh_volumes_internal(
device_id,
&volumes,
&path_cache,
&events,
&config,
&library_manager,
)
.await
{
Ok(()) => {
let elapsed = start_time.elapsed();
info!(
"Event-triggered volume refresh completed in {:?}",
elapsed
);
}
Err(e) => {
error!("Error during event-triggered volume refresh: {}", e);
}
}
}
}
debug!("Volume watcher event handler stopped");
});
}
Err(e) => {
warn!("Failed to create volume watcher: {}", e);
}
}
}
/// Stop background monitoring
pub async fn stop_monitoring(&self) {
*self.is_monitoring.write().await = false;
// Stop the file system watcher
if let Some(_watcher) = self.volume_watcher.write().await.take() {
debug!("Volume watcher stopped");
}
info!("Volume monitoring stopped");
}
/// Refresh all volumes and detect changes
#[instrument(skip(self))]
pub async fn refresh_volumes(&self) -> VolumeResult<()> {
Self::refresh_volumes_internal(
self.device_id,
&self.volumes,
&self.path_cache,
&self.events,
&self.config,
&self.library_manager,
)
.await
}
/// Internal implementation of volume refresh
async fn refresh_volumes_internal(
device_id: uuid::Uuid,
volumes: &Arc<RwLock<HashMap<VolumeFingerprint, Volume>>>,
path_cache: &Arc<RwLock<HashMap<PathBuf, VolumeFingerprint>>>,
events: &Arc<EventBus>,
config: &VolumeDetectionConfig,
library_manager: &RwLock<Option<Weak<LibraryManager>>>,
) -> VolumeResult<()> {
debug!("Refreshing volumes for device {}", device_id);
// Detect current volumes
let detected_volumes = detection::detect_volumes(device_id, config).await?;
debug!("VOLUME_DETECT: Detected {} volumes", detected_volumes.len());
for vol in &detected_volumes {
debug!(
"VOLUME_DETECT: Found '{}' at {} - Type: {:?}, Auto-track: {}",
vol.name,
vol.mount_point.display(),
vol.volume_type,
vol.auto_track_eligible
);
}
// Query database for tracked volumes to merge metadata
let mut tracked_volumes_map: HashMap<VolumeFingerprint, (Uuid, Option<String>)> =
HashMap::new();
if let Some(lib_mgr) = library_manager.read().await.as_ref() {
if let Some(lib_mgr) = lib_mgr.upgrade() {
let libraries = lib_mgr.get_open_libraries().await;
debug!("DB_MERGE: Found {} open libraries", libraries.len());
for library in libraries {
debug!(
"DB_MERGE: Querying library {} for tracked volumes on device {}",
library.id(),
device_id
);
if let Ok(tracked_vols) = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(device_id))
.all(library.db().conn())
.await
{
debug!(
"DB_MERGE: Found {} tracked volumes in library {}",
tracked_vols.len(),
library.id()
);
for db_vol in tracked_vols {
let fingerprint = VolumeFingerprint(db_vol.fingerprint.clone());
debug!("DB_MERGE: Found tracked volume - fingerprint: {}, display_name: {:?}",
fingerprint.short_id(), db_vol.display_name);
tracked_volumes_map
.insert(fingerprint, (library.id(), db_vol.display_name));
}
} else {
debug!(
"DB_MERGE: Failed to query tracked volumes for library {}",
library.id()
);
}
}
} else {
debug!("DB_MERGE: Library manager weak reference could not be upgraded");
}
} else {
debug!("DB_MERGE: No library manager reference available");
}
let mut current_volumes = volumes.write().await;
let mut cache = path_cache.write().await;
// Track which volumes we've seen in this refresh
let mut seen_fingerprints = std::collections::HashSet::new();
// Process detected volumes
for mut detected in detected_volumes {
let fingerprint = detected.fingerprint.clone();
seen_fingerprints.insert(fingerprint.clone());
// Merge tracked volume metadata from database
if let Some((library_id, display_name)) = tracked_volumes_map.get(&fingerprint) {
detected.is_tracked = true;
detected.library_id = Some(*library_id);
detected.display_name = display_name.clone();
}
debug!(
"Processing volume '{}' with fingerprint {} (exists in cache: {}, is_tracked: {})",
detected.name,
fingerprint.short_id(),
current_volumes.contains_key(&fingerprint),
detected.is_tracked
);
match current_volumes.get(&fingerprint) {
Some(existing) => {
// Volume exists - check for changes
let old_info = VolumeInfo::from(existing);
let new_info = VolumeInfo::from(&detected);
if old_info.is_mounted != new_info.is_mounted
|| old_info.total_bytes_available != new_info.total_bytes_available
|| old_info.error_status != new_info.error_status
{
// Update the volume
let mut updated_volume = detected.clone();
updated_volume.update_info(new_info.clone());
current_volumes.insert(fingerprint.clone(), updated_volume.clone());
// Emit update event
events.emit(Event::VolumeUpdated {
fingerprint: fingerprint.clone(),
old_info: old_info.clone(),
new_info: new_info.clone(),
});
// Emit mount status change if applicable
if old_info.is_mounted != new_info.is_mounted {
events.emit(Event::VolumeMountChanged {
fingerprint: fingerprint.clone(),
is_mounted: new_info.is_mounted,
});
}
// Emit ResourceChanged event for UI reactivity (only for user-visible volumes)
if updated_volume.is_user_visible {
use crate::domain::{resource::Identifiable, volume::Volume};
if let Ok(resource) = serde_json::to_value(&updated_volume) {
events.emit(Event::ResourceChanged {
resource_type: Volume::resource_type().to_string(),
resource,
metadata: None,
});
}
}
}
}
None => {
// New volume discovered
info!("New volume discovered: {}", detected.name);
// Update cache for all mount points
cache.insert(detected.mount_point.clone(), fingerprint.clone());
for mount_point in &detected.mount_points {
cache.insert(mount_point.clone(), fingerprint.clone());
}
current_volumes.insert(fingerprint.clone(), detected.clone());
// Emit volume added event
events.emit(Event::VolumeAdded(detected.clone()));
// Emit ResourceChanged event for UI reactivity (only for user-visible volumes)
if detected.is_user_visible {
debug!(
"Emitting ResourceChanged for user-visible volume: {} (is_user_visible={})",
detected.name, detected.is_user_visible
);
use crate::domain::{resource::Identifiable, volume::Volume};
if let Ok(resource) = serde_json::to_value(&detected) {
events.emit(Event::ResourceChanged {
resource_type: Volume::resource_type().to_string(),
resource,
metadata: None,
});
}
} else {
debug!(
"Skipping ResourceChanged for non-user-visible volume: {} (is_user_visible={})",
detected.name, detected.is_user_visible
);
}
}
}
}
// Check for removed volumes
let removed_fingerprints: Vec<_> = current_volumes
.keys()
.filter(|fp| !seen_fingerprints.contains(fp))
.cloned()
.collect();
for fingerprint in removed_fingerprints {
if let Some(removed_volume) = current_volumes.remove(&fingerprint) {
info!("Volume removed: {}", removed_volume.name);
// Clean up cache entries
cache.retain(|_, fp| fp != &fingerprint);
// Emit volume removed event
events.emit(Event::VolumeRemoved {
fingerprint: fingerprint.clone(),
});
// Emit ResourceDeleted event for UI reactivity (only for user-visible volumes)
if removed_volume.is_user_visible {
use crate::domain::{resource::Identifiable, volume::Volume};
events.emit(Event::ResourceDeleted {
resource_type: Volume::resource_type().to_string(),
resource_id: removed_volume.id,
});
}
}
}
// Update offline status for tracked volumes that are no longer detected
// Note: This requires library context, so we'll add this to a separate method
// that gets called from places where we have library access
debug!(
"Volume refresh completed. Detected {} volumes",
seen_fingerprints.len()
);
Ok(())
}
/// Mark tracked volumes as offline if they're no longer detected
/// This should be called after refresh_volumes_internal when we have library access
pub async fn update_offline_volumes(
&self,
library: &crate::library::Library,
) -> VolumeResult<()> {
let db = library.db().conn();
let current_volumes = self.volumes.read().await;
let detected_fingerprints: std::collections::HashSet<_> =
current_volumes.keys().cloned().collect();
// Get all tracked volumes for this device
let tracked_volumes = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(self.device_id))
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let mut updated_count = 0;
for tracked_volume in tracked_volumes {
let fingerprint = VolumeFingerprint(tracked_volume.fingerprint.clone());
let is_currently_detected = detected_fingerprints.contains(&fingerprint);
// Update online status if it has changed
if tracked_volume.is_online != is_currently_detected {
let mut active_model: entities::volume::ActiveModel = tracked_volume.into();
active_model.is_online = Set(is_currently_detected);
active_model.last_seen_at = Set(chrono::Utc::now());
active_model
.update(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
updated_count += 1;
if is_currently_detected {
debug!("Marked volume {} as online", fingerprint.0);
} else {
debug!("Marked volume {} as offline", fingerprint.0);
}
}
}
if updated_count > 0 {
debug!(
"Updated online status for {} tracked volumes",
updated_count
);
}
Ok(())
}
/// Get volume information for a specific path
#[instrument(skip(self))]
pub async fn volume_for_path(&self, path: &Path) -> Option<Volume> {
// Canonicalize the path to handle relative paths properly
let canonical_path = match path.canonicalize() {
Ok(p) => p,
Err(e) => {
debug!("Failed to canonicalize path {}: {}", path.display(), e);
// If canonicalization fails, try with the original path
path.to_path_buf()
}
};
// Check cache first (use canonical path for cache key)
{
let cache = self.path_cache.read().await;
if let Some(fingerprint) = cache.get(&canonical_path) {
let volumes = self.volumes.read().await;
if let Some(volume) = volumes.get(fingerprint) {
return Some(volume.clone());
}
}
}
// Search through all volumes using canonical path
let volumes = self.volumes.read().await;
for volume in volumes.values() {
if volume.contains_path(&canonical_path) {
// Cache the result using canonical path
let mut cache = self.path_cache.write().await;
cache.insert(canonical_path.clone(), volume.fingerprint.clone());
return Some(volume.clone());
}
}
debug!("No volume found for path: {}", canonical_path.display());
None
}
/// Get all currently known volumes
pub async fn get_all_volumes(&self) -> Vec<Volume> {
self.volumes.read().await.values().cloned().collect()
}
/// Get a specific volume by fingerprint
pub async fn get_volume(&self, fingerprint: &VolumeFingerprint) -> Option<Volume> {
self.volumes.read().await.get(fingerprint).cloned()
}
/// Resolve a volume for an SdPath (unified method for cloud and local paths)
/// This abstracts away the cloud/local path distinction
pub async fn resolve_volume_for_sdpath(
&self,
sdpath: &crate::domain::addressing::SdPath,
_library: &crate::library::Library,
) -> VolumeResult<Option<Volume>> {
// Check if this is a cloud path
if let Some((service, identifier, _path)) = sdpath.as_cloud() {
// Cloud path - use identity-based lookup
Ok(self.find_cloud_volume(service, identifier).await)
} else {
// Local path - resolve by filesystem path
if let Some(local_path) = sdpath.as_local_path() {
Ok(self.volume_for_path(local_path).await)
} else {
Ok(None)
}
}
}
/// Check if two paths are on the same volume
pub async fn same_volume(&self, path1: &Path, path2: &Path) -> bool {
let vol1 = self.volume_for_path(path1).await;
let vol2 = self.volume_for_path(path2).await;
match (vol1, vol2) {
(Some(v1), Some(v2)) => v1.fingerprint == v2.fingerprint,
_ => false,
}
}
/// Check if two paths are on the same physical storage (filesystem-aware)
/// This is the enhanced version that uses filesystem-specific handlers
pub async fn same_physical_storage(&self, path1: &Path, path2: &Path) -> bool {
// 1. Get volumes for both paths
let vol1 = self.volume_for_path(path1).await;
let vol2 = self.volume_for_path(path2).await;
match (&vol1, &vol2) {
(Some(v1), Some(v2)) => {
// 2. Check if same volume first (fast path)
if v1.fingerprint == v2.fingerprint {
debug!("Paths are on the same volume: {}", v1.fingerprint);
return true;
}
// 3. Use filesystem-specific logic for cross-volume checks
if v1.file_system == v2.file_system {
debug!(
"Using filesystem-specific handler for {} to check paths: {} vs {}",
v1.file_system,
path1.display(),
path2.display()
);
return crate::volume::fs::same_physical_storage(path1, path2, &v1.file_system)
.await;
}
debug!(
"Different filesystems: {} vs {}",
v1.file_system, v2.file_system
);
false
}
_ => {
debug!(
"Could not find volumes for paths: {:?}, {:?}",
vol1.is_some(),
vol2.is_some()
);
false
}
}
}
/// Get or initialize the I/O backend for a volume
///
/// This lazily initializes the backend on first access. Local volumes get a
/// LocalBackend pointing to their mount point. Cloud volumes should have their
/// backend set during creation.
pub(crate) fn backend_for_volume(
&self,
volume: &mut Volume,
) -> Arc<dyn crate::volume::VolumeBackend> {
if let Some(backend) = &volume.backend {
return backend.clone();
}
// Lazy-initialize LocalBackend for local volumes
let backend: Arc<dyn crate::volume::VolumeBackend> =
Arc::new(crate::volume::LocalBackend::new(&volume.mount_point));
volume.backend = Some(backend.clone());
backend
}
/// Find volumes with available space
pub async fn volumes_with_space(&self, required_bytes: u64) -> Vec<Volume> {
self.volumes
.read()
.await
.values()
.filter(|vol| vol.available_space >= required_bytes)
.cloned()
.collect()
}
/// Get volume statistics
pub async fn get_statistics(&self) -> VolumeStatistics {
let volumes = self.volumes.read().await;
let total_volumes = volumes.len();
let mounted_volumes = volumes.values().filter(|v| v.is_mounted).count();
let total_capacity: u64 = volumes.values().map(|v| v.total_capacity).sum();
let total_available: u64 = volumes.values().map(|v| v.available_space).sum();
let mut by_type = HashMap::new();
let mut by_filesystem = HashMap::new();
for volume in volumes.values() {
*by_type.entry(volume.disk_type.clone()).or_insert(0) += 1;
*by_filesystem.entry(volume.file_system.clone()).or_insert(0) += 1;
}
VolumeStatistics {
total_volumes,
mounted_volumes,
total_capacity,
total_available,
by_type,
by_filesystem,
}
}
/// Run speed test on a specific volume
#[instrument(skip(self))]
pub async fn run_speed_test(&self, fingerprint: &VolumeFingerprint) -> VolumeResult<()> {
let mut volumes = self.volumes.write().await;
if let Some(volume) = volumes.get_mut(fingerprint) {
info!("Running speed test on volume: {}", volume.name);
match crate::volume::speed::run_speed_test(volume).await {
Ok((read_speed, write_speed)) => {
volume.read_speed_mbps = Some(read_speed);
volume.write_speed_mbps = Some(write_speed);
// Emit speed test event
self.events.emit(Event::VolumeSpeedTested {
fingerprint: fingerprint.clone(),
read_speed_mbps: read_speed,
write_speed_mbps: write_speed,
});
info!(
"Speed test completed: {}MB/s read, {}MB/s write",
read_speed, write_speed
);
Ok(())
}
Err(e) => {
error!("Speed test failed for volume {}: {}", volume.name, e);
// Emit error event
self.events.emit(Event::VolumeError {
fingerprint: fingerprint.clone(),
error: format!("Speed test failed: {}", e),
});
Err(e)
}
}
} else {
Err(VolumeError::NotFound(fingerprint.to_string()))
}
}
/// Clear the path cache (useful after major volume changes)
pub async fn clear_cache(&self) {
self.path_cache.write().await.clear();
debug!("Volume path cache cleared");
}
/// Find cloud volume by service type and identifier
/// Uses mount point cache for O(1) lookup instead of scanning
///
/// # Examples
/// ```ignore
/// let volume = manager.find_cloud_volume(CloudServiceType::S3, "my-bucket").await;
/// ```
pub async fn find_cloud_volume(
&self,
service: crate::volume::backend::CloudServiceType,
identifier: &str,
) -> Option<Volume> {
// Construct the mount point string (e.g., "s3://my-bucket")
let mount_point_key = format!("{}://{}", service.scheme(), identifier);
// Check cache first for O(1) lookup
{
let mount_point_cache = self.mount_point_cache.read().await;
if let Some(fingerprint) = mount_point_cache.get(&mount_point_key) {
let volumes = self.volumes.read().await;
if let Some(volume) = volumes.get(fingerprint) {
return Some(volume.clone());
}
}
}
// Cache miss - fall back to scanning (for volumes added before cache was implemented)
let volumes = self.volumes.read().await;
volumes.values().find_map(|volume| {
if let Some((vol_service, vol_id)) = volume.parse_cloud_identity() {
if vol_service == service && vol_id == identifier {
// Update cache for next time
let mount_point_key = format!("{}://{}", service.scheme(), identifier);
let mut mount_point_cache = self.mount_point_cache.blocking_write();
mount_point_cache.insert(mount_point_key, volume.fingerprint.clone());
return Some(volume.clone());
}
}
None
})
}
/// Ensure mount point is unique by appending -2, -3, etc. if needed
/// Used during volume creation to prevent collisions
///
/// # Examples
/// ```ignore
/// let mount_point = manager.ensure_unique_mount_point("s3://my-bucket").await;
/// // Returns "s3://my-bucket" or "s3://my-bucket-2" if collision exists
/// ```
pub async fn ensure_unique_mount_point(&self, desired: &str) -> PathBuf {
let volumes = self.volumes.read().await;
let base = desired;
let mut candidate = base.to_string();
let mut counter = 2;
while volumes
.values()
.any(|v| v.mount_point.to_string_lossy() == candidate)
{
candidate = format!("{}-{}", base, counter);
counter += 1;
}
PathBuf::from(candidate)
}
/// Register a cloud volume with the volume manager
/// This adds the volume to the internal volumes map so it can be tracked
pub async fn register_cloud_volume(&self, volume: Volume) {
let fingerprint = volume.fingerprint.clone();
let mut volumes = self.volumes.write().await;
info!(
"Registering cloud volume '{}' with fingerprint {}",
volume.name, fingerprint
);
// Update mount point cache for fast cloud volume lookup using cloud_identifier
if let Some((service, identifier)) = volume.parse_cloud_identity() {
let cache_key = format!("{}://{}", service.scheme(), identifier);
let mut mount_point_cache = self.mount_point_cache.write().await;
mount_point_cache.insert(cache_key, fingerprint.clone());
}
volumes.insert(fingerprint.clone(), volume);
}
/// Track a volume in the specified library
pub async fn track_volume(
&self,
library: &crate::library::Library,
fingerprint: &VolumeFingerprint,
display_name: Option<String>,
) -> VolumeResult<entities::volume::Model> {
// Find the volume in our current detected volumes
let volume = {
let volumes = self.volumes.read().await;
volumes
.get(fingerprint)
.cloned()
.ok_or_else(|| VolumeError::NotFound(fingerprint.to_string()))?
};
// Try to create/read identifier file for this volume
if let Some(spacedrive_id) = self.manage_spacedrive_identifier(&volume).await {
info!(
"Created/found Spacedrive ID {} for manually tracked volume {}",
spacedrive_id, volume.name
);
// Check if we should upgrade to Spacedrive ID-based fingerprint
let spacedrive_fingerprint = VolumeFingerprint::from_spacedrive_id(spacedrive_id);
if spacedrive_fingerprint != volume.fingerprint {
info!(
"Upgrading fingerprint for volume {} from content-based to Spacedrive ID-based",
volume.name
);
// Note: In a full implementation, we'd want to update the volume's fingerprint
// and potentially migrate database records. For now, we'll log this.
}
}
// Check if volume is already tracked
if let Some(existing) = entities::volume::Entity::find()
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.one(library.db().conn())
.await
.map_err(|e| VolumeError::Database(e.to_string()))?
{
warn!(
"Volume {} is already tracked in library {}",
volume.name,
library.name().await
);
return Ok(existing);
}
// Determine removability and network status
let is_removable = matches!(volume.mount_type, crate::volume::types::MountType::External);
let is_network_drive =
matches!(volume.mount_type, crate::volume::types::MountType::Network);
// Create tracking record
let active_model = entities::volume::ActiveModel {
uuid: Set(volume.id), // Use the volume's UUID
device_id: Set(volume.device_id), // Use Uuid directly
fingerprint: Set(fingerprint.0.clone()),
display_name: Set(display_name.clone()),
tracked_at: Set(chrono::Utc::now()),
last_seen_at: Set(chrono::Utc::now()),
is_online: Set(volume.is_mounted),
total_capacity: Set(Some(volume.total_capacity as i64)),
available_capacity: Set(Some(volume.available_space as i64)),
read_speed_mbps: Set(volume.read_speed_mbps.map(|s| s as i32)),
write_speed_mbps: Set(volume.write_speed_mbps.map(|s| s as i32)),
last_speed_test_at: Set(None),
file_system: Set(Some(volume.file_system.to_string())),
mount_point: Set(Some(volume.mount_point.to_string_lossy().to_string())),
is_removable: Set(Some(is_removable)),
is_network_drive: Set(Some(is_network_drive)),
device_model: Set(volume.hardware_id.clone()),
// Save volume classification fields
volume_type: Set(Some(format!("{:?}", volume.volume_type))),
is_user_visible: Set(Some(volume.is_user_visible)),
auto_track_eligible: Set(Some(volume.auto_track_eligible)),
cloud_identifier: Set(volume.cloud_identifier.clone()),
cloud_config: Set(volume.cloud_config.as_ref().map(|c| c.to_string())),
..Default::default()
};
let model = active_model
.insert(library.db().conn())
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
info!(
"Tracked volume '{}' for library '{}'",
display_name.as_ref().unwrap_or(&volume.name),
library.name().await
);
// Emit tracking event
self.events.emit(Event::Custom {
event_type: "VolumeTracked".to_string(),
data: serde_json::json!({
"library_id": library.id(),
"volume_fingerprint": fingerprint.to_string(),
"display_name": display_name,
}),
});
Ok(model)
}
/// Untrack a volume from the database
pub async fn untrack_volume(
&self,
library: &crate::library::Library,
fingerprint: &VolumeFingerprint,
) -> VolumeResult<()> {
let db = library.db().conn();
let result = entities::volume::Entity::delete_many()
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.exec(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
if result.rows_affected == 0 {
return Err(VolumeError::NotTracked(fingerprint.to_string()));
}
info!(
"Untracked volume '{}' from library '{}'",
fingerprint.to_string(),
library.name().await
);
// Emit untracking event
self.events.emit(Event::Custom {
event_type: "VolumeUntracked".to_string(),
data: serde_json::json!({
"library_id": library.id(),
"volume_fingerprint": fingerprint.to_string(),
}),
});
Ok(())
}
/// Get tracked volumes for a library
pub async fn get_tracked_volumes(
&self,
library: &crate::library::Library,
) -> VolumeResult<Vec<TrackedVolume>> {
let db = library.db().conn();
let volumes = entities::volume::Entity::find()
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let tracked_volumes: Vec<TrackedVolume> = volumes
.into_iter()
.map(|model| model.to_tracked_volume())
.collect();
debug!(
"Found {} tracked volumes for library '{}'",
tracked_volumes.len(),
library.name().await
);
Ok(tracked_volumes)
}
/// Check if a volume is tracked in a specific library
pub async fn is_volume_tracked(
&self,
library: &crate::library::Library,
fingerprint: &VolumeFingerprint,
) -> VolumeResult<bool> {
let db = library.db().conn();
// Get the volume to find its device_id
let volume = self
.get_volume(fingerprint)
.await
.ok_or_else(|| VolumeError::NotFound(fingerprint.to_string()))?;
let count = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.count(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
Ok(count > 0)
}
/// Update tracked volume state during refresh
pub async fn update_tracked_volume_state(
&self,
library: &crate::library::Library,
fingerprint: &VolumeFingerprint,
volume: &Volume,
) -> VolumeResult<()> {
let db = library.db().conn();
let mut active_model: entities::volume::ActiveModel = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.one(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?
.ok_or_else(|| VolumeError::NotTracked(fingerprint.to_string()))?
.into();
active_model.last_seen_at = Set(chrono::Utc::now());
active_model.is_online = Set(volume.is_mounted);
active_model.total_capacity = Set(Some(volume.total_capacity as i64));
active_model.available_capacity = Set(Some(volume.available_space as i64));
active_model
.update(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
Ok(())
}
/// Update display names for tracked volumes that have empty display names
pub async fn update_empty_display_names(
&self,
library: &crate::library::Library,
) -> VolumeResult<usize> {
let db = library.db().conn();
// Find tracked volumes with empty display names
let volumes_to_update = entities::volume::Entity::find()
.filter(
entities::volume::Column::DisplayName
.is_null()
.or(entities::volume::Column::DisplayName.eq("")),
)
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let mut updated_count = 0;
for tracked_volume in volumes_to_update {
let fingerprint = VolumeFingerprint(tracked_volume.fingerprint.clone());
// Get the current volume info to get the name
if let Some(volume) = self.get_volume(&fingerprint).await {
let volume_name = volume.name.clone();
let mut active_model: entities::volume::ActiveModel = tracked_volume.into();
active_model.display_name = Set(Some(volume.name));
match active_model.update(db).await {
Ok(_) => {
updated_count += 1;
info!("Updated display name for volume: {}", volume_name);
}
Err(e) => {
warn!(
"Failed to update display name for volume {}: {}",
fingerprint.0, e
);
}
}
}
}
info!(
"Updated display names for {} volumes in library '{}'",
updated_count,
library.name().await
);
Ok(updated_count)
}
/// Get all system volumes (boot/OS volumes)
pub async fn get_system_volumes(&self) -> Vec<Volume> {
self.volumes
.read()
.await
.values()
.filter(|v| matches!(v.mount_type, crate::volume::types::MountType::System))
.cloned()
.collect()
}
/// Auto-track eligible volumes (only Primary system volume)
pub async fn auto_track_user_volumes(
&self,
library: &crate::library::Library,
) -> VolumeResult<Vec<entities::volume::Model>> {
let all_volumes = self.volumes.read().await;
let total_count = all_volumes.len();
debug!("AUTO_TRACK: Total volumes detected: {}", total_count);
for (fp, vol) in all_volumes.iter() {
debug!(
"AUTO_TRACK: Volume '{}' - Type: {:?}, Eligible: {}, Fingerprint: {}",
vol.name, vol.volume_type, vol.auto_track_eligible, fp
);
}
let eligible_volumes: Vec<_> = all_volumes
.values()
.filter(|v| v.auto_track_eligible)
.cloned()
.collect();
drop(all_volumes);
debug!(
"AUTO_TRACK: Eligible volumes for tracking: {}",
eligible_volumes.len()
);
let mut tracked_volumes = Vec::new();
for volume in eligible_volumes {
// Try to create/read identifier file for better fingerprinting
if let Some(spacedrive_id) = self.manage_spacedrive_identifier(&volume).await {
info!(
"Using Spacedrive ID {} for volume {} fingerprinting",
spacedrive_id, volume.name
);
// We could update the fingerprint here, but for now we'll keep using the existing one
// to maintain compatibility with already tracked volumes
}
if !self.is_volume_tracked(library, &volume.fingerprint).await? {
// Use display_name if available, otherwise fall back to name
let display_name = volume
.display_name
.clone()
.unwrap_or_else(|| volume.name.clone());
match self
.track_volume(library, &volume.fingerprint, Some(display_name.clone()))
.await
{
Ok(tracked_volume) => {
info!("Auto-tracked volume: {}", display_name);
tracked_volumes.push(tracked_volume);
}
Err(e) => {
warn!("Failed to auto-track volume {}: {}", display_name, e);
}
}
}
}
Ok(tracked_volumes)
}
/// Automatically track system volumes for a library (legacy - use auto_track_user_volumes instead)
pub async fn auto_track_system_volumes(
&self,
library: &crate::library::Library,
) -> VolumeResult<Vec<entities::volume::Model>> {
// Use the new filtered auto-tracking
self.auto_track_user_volumes(library).await
}
/// Save speed test results to all libraries where this volume is tracked
pub async fn save_speed_test_results(
&self,
fingerprint: &VolumeFingerprint,
read_speed_mbps: u64,
write_speed_mbps: u64,
libraries: &[Arc<crate::library::Library>],
) -> VolumeResult<()> {
for library in libraries {
// Check if this volume is tracked in this library
if self.is_volume_tracked(library, fingerprint).await? {
let db = library.db().conn();
// Get the volume to find its device_id
let volume = self
.get_volume(fingerprint)
.await
.ok_or_else(|| VolumeError::NotFound(fingerprint.to_string()))?;
// Update the tracked volume record with speed test results
let now = chrono::Utc::now();
let update_result = entities::volume::Entity::update_many()
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.set(entities::volume::ActiveModel {
read_speed_mbps: Set(Some(read_speed_mbps as i32)),
write_speed_mbps: Set(Some(write_speed_mbps as i32)),
last_speed_test_at: Set(Some(now)),
..Default::default()
})
.exec(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
if update_result.rows_affected > 0 {
info!(
"Saved speed test results for volume {} in library {}: {}MB/s read, {}MB/s write",
fingerprint.0,
library.name().await,
read_speed_mbps,
write_speed_mbps
);
}
}
}
Ok(())
}
/// Calculate and save unique bytes for a specific volume (owned by this device)
/// This deduplicates content using content_identity hashes
pub async fn calculate_and_save_unique_bytes(
&self,
fingerprint: &VolumeFingerprint,
libraries: &[Arc<crate::library::Library>],
) -> VolumeResult<()> {
use sea_orm::{DbBackend, FromQueryResult, Statement};
for library in libraries {
// Check if this volume is tracked in this library
if self.is_volume_tracked(library, fingerprint).await? {
let db = library.db().conn();
// Get the volume from database to get mount_point and verify ownership
let db_volume = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(self.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.one(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let db_volume = match db_volume {
Some(v) => v,
None => {
debug!(
"Volume {} not found or not owned by this device, skipping unique_bytes calculation",
fingerprint.0
);
continue;
}
};
let mount_point = match &db_volume.mount_point {
Some(mp) => mp,
None => {
debug!(
"Volume {} has no mount point, cannot calculate unique_bytes",
fingerprint.0
);
continue;
}
};
info!(
"Calculating unique bytes for volume {} in library {}",
fingerprint.0,
library.name().await
);
// Calculate unique bytes using content_identity deduplication
let query = r#"
SELECT COALESCE(SUM(unique_size), 0) as unique_bytes
FROM (
SELECT ci.content_hash, ci.total_size as unique_size
FROM entries e
INNER JOIN directory_paths dp ON e.id = dp.entry_id
INNER JOIN content_identities ci ON e.content_id = ci.id
WHERE dp.path LIKE ? || '%'
AND e.kind = 0
GROUP BY ci.content_hash, ci.total_size
)
"#;
#[derive(FromQueryResult)]
struct UniqueResult {
unique_bytes: i64,
}
let result = UniqueResult::find_by_statement(Statement::from_sql_and_values(
DbBackend::Sqlite,
query,
vec![mount_point.clone().into()],
))
.one(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let unique_bytes = result.map(|r| r.unique_bytes).unwrap_or(0);
// Update the volume record with calculated unique_bytes
let update_result = entities::volume::Entity::update_many()
.filter(entities::volume::Column::DeviceId.eq(self.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.set(entities::volume::ActiveModel {
unique_bytes: Set(Some(unique_bytes)),
..Default::default()
})
.exec(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
if update_result.rows_affected > 0 {
info!(
"Saved unique_bytes for volume {} in library {}: {} bytes ({:.2} GB)",
fingerprint.0,
library.name().await,
unique_bytes,
unique_bytes as f64 / 1_073_741_824.0
);
}
}
}
Ok(())
}
/// Calculate and save unique bytes for all volumes owned by this device
pub async fn calculate_unique_bytes_for_owned_volumes(
&self,
libraries: &[Arc<crate::library::Library>],
) -> VolumeResult<()> {
info!(
"Calculating unique bytes for all volumes owned by device {}",
self.device_id
);
for library in libraries {
let db = library.db().conn();
// Get all tracked volumes owned by this device
let owned_volumes = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(self.device_id))
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
info!(
"Found {} volumes owned by this device in library {}",
owned_volumes.len(),
library.name().await
);
for volume in owned_volumes {
let fingerprint = VolumeFingerprint(volume.fingerprint.clone());
// Calculate and save unique bytes for this volume
if let Err(e) = self
.calculate_and_save_unique_bytes(&fingerprint, &[library.clone()])
.await
{
warn!(
"Failed to calculate unique_bytes for volume {}: {}",
fingerprint.0, e
);
// Continue with other volumes even if one fails
}
}
}
Ok(())
}
/// Get volume by short ID
pub async fn get_volume_by_short_id(&self, short_id: &str) -> Option<Volume> {
if !VolumeFingerprint::is_short_id(short_id) && !VolumeFingerprint::is_medium_id(short_id) {
return None;
}
let volumes = self.volumes.read().await;
for volume in volumes.values() {
if volume.fingerprint.matches_short_id(short_id) {
return Some(volume.clone());
}
}
None
}
/// Get volumes that match a partial name (for smart name matching)
pub async fn get_volumes_by_name(&self, name: &str) -> Vec<Volume> {
let volumes = self.volumes.read().await;
let name_lower = name.to_lowercase();
volumes
.values()
.filter(|volume| volume.name.to_lowercase().contains(&name_lower))
.cloned()
.collect()
}
/// Create or read Spacedrive identifier file for a volume
/// Returns the UUID from the identifier file if successfully created/read
async fn manage_spacedrive_identifier(&self, volume: &Volume) -> Option<Uuid> {
// Skip cloud volumes - they don't have filesystem mount points
if matches!(
volume.volume_type,
crate::volume::types::VolumeType::Network
) && matches!(volume.mount_type, crate::volume::types::MountType::Network)
{
debug!(
"Skipping Spacedrive identifier management for cloud volume: {}",
volume.name
);
return None;
}
let id_file_path = volume.mount_point.join(SPACEDRIVE_VOLUME_ID_FILE);
// Try to read existing identifier file
if let Ok(content) = fs::read_to_string(&id_file_path).await {
if let Ok(spacedrive_id) = serde_json::from_str::<SpacedriveVolumeId>(&content) {
debug!(
"Found existing Spacedrive ID {} for volume {}",
spacedrive_id.id, volume.name
);
return Some(spacedrive_id.id);
}
}
// Try to create new identifier file if volume is writable
if !volume.is_read_only && volume.mount_point.exists() {
let spacedrive_id = SpacedriveVolumeId {
id: Uuid::new_v4(),
created: chrono::Utc::now(),
device_name: None, // TODO: Get from DeviceManager when available
volume_name: volume.name.clone(),
device_id: volume.device_id,
library_id: Uuid::nil(), // TODO: Populate from library context when available
};
if let Ok(content) = serde_json::to_string_pretty(&spacedrive_id) {
match fs::write(&id_file_path, content).await {
Ok(()) => {
info!(
"Created Spacedrive ID {} for volume {} at {}",
spacedrive_id.id,
volume.name,
id_file_path.display()
);
return Some(spacedrive_id.id);
}
Err(e) => {
debug!(
"Failed to write Spacedrive ID file to {}: {}",
id_file_path.display(),
e
);
}
}
}
}
debug!(
"Could not create or read Spacedrive identifier for volume {} (read_only: {}, exists: {})",
volume.name,
volume.is_read_only,
volume.mount_point.exists()
);
None
}
/// Read Spacedrive identifier file from a volume if it exists
pub async fn read_spacedrive_identifier(
&self,
mount_point: &Path,
) -> Option<SpacedriveVolumeId> {
let id_file_path = mount_point.join(SPACEDRIVE_VOLUME_ID_FILE);
if let Ok(content) = fs::read_to_string(&id_file_path).await {
if let Ok(spacedrive_id) = serde_json::from_str::<SpacedriveVolumeId>(&content) {
return Some(spacedrive_id);
}
}
None
}
}
/// Statistics about detected volumes
#[derive(Debug, Clone)]
pub struct VolumeStatistics {
pub total_volumes: usize,
pub mounted_volumes: usize,
pub total_capacity: u64,
pub total_available: u64,
pub by_type: HashMap<crate::volume::types::DiskType, usize>,
pub by_filesystem: HashMap<crate::volume::types::FileSystem, usize>,
}
impl Drop for VolumeManager {
fn drop(&mut self) {
// Ensure monitoring is stopped when manager is dropped
let is_monitoring = self.is_monitoring.clone();
tokio::spawn(async move {
*is_monitoring.write().await = false;
});
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_events() -> Arc<EventBus> {
Arc::new(EventBus::default())
}
#[tokio::test]
async fn test_volume_manager_creation() {
let config = VolumeDetectionConfig::default();
let events = create_test_events();
let manager = VolumeManager::new(Uuid::new_v4(), config, events);
let stats = manager.get_statistics().await;
assert_eq!(stats.total_volumes, 0);
}
#[tokio::test]
async fn test_volume_path_lookup() {
let config = VolumeDetectionConfig::default();
let events = create_test_events();
let manager = VolumeManager::new(Uuid::new_v4(), config, events);
// Initially no volumes
let volume = manager
.volume_for_path(&PathBuf::from("/nonexistent"))
.await;
assert!(volume.is_none());
}
#[tokio::test]
async fn test_same_volume_check() {
let config = VolumeDetectionConfig::default();
let events = create_test_events();
let manager = VolumeManager::new(Uuid::new_v4(), config, events);
// Both paths don't exist, so should return false
let same = manager
.same_volume(&PathBuf::from("/path1"), &PathBuf::from("/path2"))
.await;
assert!(!same);
}
}