feat(volume): Enhance volume management with device ID integration

- Updated VolumeManager to accept a device ID during initialization, allowing for better tracking of volumes associated with specific devices.
- Modified volume detection logic to utilize the device ID, improving the accuracy of volume management across different devices.
- Introduced new volume commands in the CLI for managing volumes, including tracking, untracking, and refreshing volume lists.
- Enhanced database schema to include device ID in the volumes table, ensuring proper relationships between volumes and their respective devices.
- Added functionality to save speed test results for tracked volumes, improving performance monitoring capabilities.

This update significantly improves the volume management system, enabling more robust tracking and operations across multiple devices.
This commit is contained in:
Jamie Pine
2025-07-26 14:51:21 -07:00
parent d283297bc6
commit eebd702f7d
17 changed files with 2219 additions and 1425 deletions

View File

@@ -5,375 +5,379 @@
//! - Checking daemon status
//! - Managing multiple daemon instances
use crate::infrastructure::cli::daemon::{Daemon, DaemonClient, DaemonCommand, DaemonConfig, DaemonResponse};
use crate::infrastructure::cli::daemon::{
Daemon, DaemonClient, DaemonCommand, DaemonConfig, DaemonResponse,
};
use crate::infrastructure::cli::output::messages::LibraryInfo as OutputLibraryInfo;
use crate::infrastructure::cli::output::{CliOutput, Message};
use crate::infrastructure::cli::output::messages::{LibraryInfo as OutputLibraryInfo};
use clap::Subcommand;
use comfy_table::Table;
use std::path::PathBuf;
#[derive(Subcommand, Clone, Debug)]
pub enum DaemonCommands {
/// Start the Spacedrive daemon in the background
Start {
/// Run in foreground instead of daemonizing
#[arg(short, long)]
foreground: bool,
/// Enable networking on startup
#[arg(long)]
enable_networking: bool,
},
/// Start the Spacedrive daemon in the background
Start {
/// Run in foreground instead of daemonizing
#[arg(long)]
foreground: bool,
/// Enable networking on startup
#[arg(long)]
enable_networking: bool,
},
/// Stop the Spacedrive daemon
Stop,
/// Stop the Spacedrive daemon
Stop,
/// Check if the daemon is running and show status
Status,
/// Check if the daemon is running and show status
Status,
/// Manage daemon instances
#[command(subcommand)]
Instance(InstanceCommands),
/// Manage daemon instances
#[command(subcommand)]
Instance(InstanceCommands),
}
#[derive(Subcommand, Clone, Debug)]
pub enum InstanceCommands {
/// List all daemon instances
List,
/// Stop a specific daemon instance
Stop {
/// Instance name to stop
name: String,
},
/// Show currently targeted instance
Current,
/// List all daemon instances
List,
/// Stop a specific daemon instance
Stop {
/// Instance name to stop
name: String,
},
/// Show currently targeted instance
Current,
}
pub async fn handle_daemon_command(
cmd: DaemonCommands,
data_dir: PathBuf,
instance_name: Option<String>,
mut output: CliOutput,
cmd: DaemonCommands,
data_dir: PathBuf,
instance_name: Option<String>,
mut output: CliOutput,
) -> Result<(), Box<dyn std::error::Error>> {
match cmd {
DaemonCommands::Start {
foreground,
enable_networking,
} => {
handle_start_daemon(data_dir, foreground, enable_networking, instance_name, output).await
}
DaemonCommands::Stop => {
handle_stop_daemon(instance_name, output).await
}
DaemonCommands::Status => {
handle_daemon_status(instance_name, output).await
}
DaemonCommands::Instance(instance_cmd) => {
handle_instance_command(instance_cmd, output).await
}
}
match cmd {
DaemonCommands::Start {
foreground,
enable_networking,
} => {
handle_start_daemon(
data_dir,
foreground,
enable_networking,
instance_name,
output,
)
.await
}
DaemonCommands::Stop => handle_stop_daemon(instance_name, output).await,
DaemonCommands::Status => handle_daemon_status(instance_name, output).await,
DaemonCommands::Instance(instance_cmd) => {
handle_instance_command(instance_cmd, output).await
}
}
}
async fn handle_start_daemon(
data_dir: PathBuf,
foreground: bool,
enable_networking: bool,
instance_name: Option<String>,
mut output: CliOutput,
data_dir: PathBuf,
foreground: bool,
enable_networking: bool,
instance_name: Option<String>,
mut output: CliOutput,
) -> Result<(), Box<dyn std::error::Error>> {
if Daemon::is_running_instance(instance_name.clone()) {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.warning(&format!(
"Spacedrive daemon instance '{}' is already running",
instance_display
))?;
return Ok(());
}
if Daemon::is_running_instance(instance_name.clone()) {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.warning(&format!(
"Spacedrive daemon instance '{}' is already running",
instance_display
))?;
return Ok(());
}
output.print(Message::DaemonStarting {
instance: instance_name.as_deref().unwrap_or("default").to_string(),
})?;
output.print(Message::DaemonStarting {
instance: instance_name.as_deref().unwrap_or("default").to_string(),
})?;
if foreground {
// Run in foreground
if enable_networking {
// For networking enabled startup, we need a default password
output.info("Starting daemon with networking enabled...")?;
output.info("Using master key for secure device authentication.")?;
if foreground {
// Run in foreground
if enable_networking {
// For networking enabled startup, we need a default password
output.info("Starting daemon with networking enabled...")?;
output.info("Using master key for secure device authentication.")?;
match Daemon::new_with_networking_and_instance(
data_dir.clone(),
instance_name.clone(),
)
.await
{
Ok(daemon) => daemon.start().await?,
Err(e) => {
output.error(Message::Error(format!("Failed to start daemon with networking: {}", e)))?;
output.info("Falling back to daemon without networking...")?;
let daemon =
Daemon::new_with_instance(data_dir, instance_name.clone()).await?;
daemon.start().await?;
}
}
} else {
let daemon = Daemon::new_with_instance(data_dir, instance_name.clone()).await?;
daemon.start().await?;
}
} else {
// Daemonize (simplified version - in production use proper daemonization)
use std::process::Command;
match Daemon::new_with_networking_and_instance(data_dir.clone(), instance_name.clone())
.await
{
Ok(daemon) => daemon.start().await?,
Err(e) => {
output.error(Message::Error(format!(
"Failed to start daemon with networking: {}",
e
)))?;
output.info("Falling back to daemon without networking...")?;
let daemon = Daemon::new_with_instance(data_dir, instance_name.clone()).await?;
daemon.start().await?;
}
}
} else {
let daemon = Daemon::new_with_instance(data_dir, instance_name.clone()).await?;
daemon.start().await?;
}
} else {
// Daemonize (simplified version - in production use proper daemonization)
use std::process::Command;
let exe = std::env::current_exe()?;
let mut cmd = Command::new(exe);
cmd.arg("daemon")
.arg("start")
.arg("--foreground")
.arg("--data-dir")
.arg(data_dir);
let exe = std::env::current_exe()?;
let mut cmd = Command::new(exe);
cmd.arg("daemon")
.arg("start")
.arg("--foreground")
.arg("--data-dir")
.arg(data_dir);
if let Some(ref instance) = instance_name {
cmd.arg("--instance").arg(instance);
}
if let Some(ref instance) = instance_name {
cmd.arg("--instance").arg(instance);
}
if enable_networking {
cmd.arg("--enable-networking");
}
if enable_networking {
cmd.arg("--enable-networking");
}
// Detach from terminal
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
cmd.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());
// Detach from terminal
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
cmd.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());
unsafe {
cmd.pre_exec(|| {
// Create new session
libc::setsid();
Ok(())
});
}
}
unsafe {
cmd.pre_exec(|| {
// Create new session
libc::setsid();
Ok(())
});
}
}
cmd.spawn()?;
cmd.spawn()?;
// Wait a bit to see if it started
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Wait for daemon to be ready (try connecting to socket)
output.info("Waiting for daemon to initialize...")?;
if Daemon::is_running_instance(instance_name.clone()) {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.success(&format!(
"Spacedrive daemon instance '{}' started successfully",
instance_display
))?;
} else {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.error(Message::Error(format!(
"Failed to start Spacedrive daemon instance '{}'",
instance_display
)))?;
}
}
if Daemon::wait_for_ready(instance_name.clone(), 10).await? {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.success(&format!(
"Spacedrive daemon instance '{}' started successfully",
instance_display
))?;
} else {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.error(Message::Error(format!(
"Failed to start Spacedrive daemon instance '{}' (timed out waiting for readiness)",
instance_display
)))?;
}
}
Ok(())
Ok(())
}
async fn handle_stop_daemon(
instance_name: Option<String>,
mut output: CliOutput,
instance_name: Option<String>,
mut output: CliOutput,
) -> Result<(), Box<dyn std::error::Error>> {
if !Daemon::is_running_instance(instance_name.clone()) {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.warning(&format!(
"Spacedrive daemon instance '{}' is not running",
instance_display
))?;
return Ok(());
}
if !Daemon::is_running_instance(instance_name.clone()) {
let instance_display = instance_name.as_deref().unwrap_or("default");
output.warning(&format!(
"Spacedrive daemon instance '{}' is not running",
instance_display
))?;
return Ok(());
}
let instance_display = instance_name.as_deref().unwrap_or("default");
output.print(Message::DaemonStopping {
instance: instance_display.to_string(),
})?;
Daemon::stop_instance(instance_name.clone()).await?;
let instance_display = instance_name.as_deref().unwrap_or("default");
output.print(Message::DaemonStopping {
instance: instance_display.to_string(),
})?;
Daemon::stop_instance(instance_name.clone()).await?;
// Wait a bit to ensure it's stopped
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Wait a bit to ensure it's stopped
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if !Daemon::is_running_instance(instance_name.clone()) {
output.print(Message::DaemonStopped {
instance: instance_display.to_string(),
})?;
} else {
output.error(Message::Error(format!(
"Failed to stop Spacedrive daemon instance '{}'",
instance_display
)))?;
}
if !Daemon::is_running_instance(instance_name.clone()) {
output.print(Message::DaemonStopped {
instance: instance_display.to_string(),
})?;
} else {
output.error(Message::Error(format!(
"Failed to stop Spacedrive daemon instance '{}'",
instance_display
)))?;
}
Ok(())
Ok(())
}
async fn handle_daemon_status(
instance_name: Option<String>,
mut output: CliOutput,
instance_name: Option<String>,
mut output: CliOutput,
) -> Result<(), Box<dyn std::error::Error>> {
let instance_display = instance_name.as_deref().unwrap_or("default");
let instance_display = instance_name.as_deref().unwrap_or("default");
if Daemon::is_running_instance(instance_name.clone()) {
output.success(&format!(
"Spacedrive daemon instance '{}' is running",
instance_display
))?;
if Daemon::is_running_instance(instance_name.clone()) {
output.success(&format!(
"Spacedrive daemon instance '{}' is running",
instance_display
))?;
// Try to get more info from daemon
let client = DaemonClient::new_with_instance(instance_name);
// Try to get more info from daemon
let client = DaemonClient::new_with_instance(instance_name);
// Get status
match client.send_command(DaemonCommand::GetStatus).await {
Ok(DaemonResponse::Status(status)) => {
output.section()
.title("Status")
.status("Version", &status.version)
.status("Uptime", &format!("{} seconds", status.uptime_secs))
.status("Active Jobs", &status.active_jobs.to_string())
.status("Total Locations", &status.total_locations.to_string())
.render()?;
}
Err(e) => {
output.warning(&format!("Could not get status: {}", e))?;
}
_ => {}
}
// Get status
match client.send_command(DaemonCommand::GetStatus).await {
Ok(DaemonResponse::Status(status)) => {
output
.section()
.title("Status")
.status("Version", &status.version)
.status("Uptime", &format!("{} seconds", status.uptime_secs))
.status("Active Jobs", &status.active_jobs.to_string())
.status("Total Locations", &status.total_locations.to_string())
.render()?;
}
Err(e) => {
output.warning(&format!("Could not get status: {}", e))?;
}
_ => {}
}
// Get libraries
match client
.send_command(DaemonCommand::ListLibraries)
.await
{
Ok(DaemonResponse::Libraries(libraries)) => {
if !libraries.is_empty() {
output.section()
.empty_line()
.title("Libraries")
.render()?;
for lib in &libraries {
output.section()
.text(&format!("{} ({})", lib.name, lib.id))
.render()?;
}
}
}
Err(e) => {
output.warning(&format!("Could not get libraries: {}", e))?;
}
_ => {}
}
// Get libraries
match client.send_command(DaemonCommand::ListLibraries).await {
Ok(DaemonResponse::Libraries(libraries)) => {
if !libraries.is_empty() {
output.section().empty_line().title("Libraries").render()?;
// Get current library
match client
.send_command(DaemonCommand::GetCurrentLibrary)
.await
{
Ok(DaemonResponse::CurrentLibrary(lib_opt)) => {
if let Some(lib) = lib_opt {
output.section()
.empty_line()
.title("Current Library")
.item("Name", &lib.name)
.item("ID", &lib.id.to_string())
.item("Path", &lib.path.display().to_string())
.render()?;
} else {
output.section()
.empty_line()
.title("Current Library")
.text("None selected")
.render()?;
}
}
Err(e) => {
output.warning(&format!("Could not get current library: {}", e))?;
}
_ => {}
}
} else {
output.error(Message::DaemonNotRunning {
instance: instance_display.to_string(),
})?;
let start_cmd = if instance_name.is_some() {
format!("spacedrive --instance {} start", instance_display)
} else {
"spacedrive start".to_string()
};
output.section()
.help()
.item(&format!("Start it with: {}", start_cmd))
.render()?;
}
for lib in &libraries {
output
.section()
.text(&format!("{} ({})", lib.name, lib.id))
.render()?;
}
}
}
Err(e) => {
output.warning(&format!("Could not get libraries: {}", e))?;
}
_ => {}
}
Ok(())
// Get current library
match client.send_command(DaemonCommand::GetCurrentLibrary).await {
Ok(DaemonResponse::CurrentLibrary(lib_opt)) => {
if let Some(lib) = lib_opt {
output
.section()
.empty_line()
.title("Current Library")
.item("Name", &lib.name)
.item("ID", &lib.id.to_string())
.item("Path", &lib.path.display().to_string())
.render()?;
} else {
output
.section()
.empty_line()
.title("Current Library")
.text("None selected")
.render()?;
}
}
Err(e) => {
output.warning(&format!("Could not get current library: {}", e))?;
}
_ => {}
}
} else {
output.error(Message::DaemonNotRunning {
instance: instance_display.to_string(),
})?;
let start_cmd = if instance_name.is_some() {
format!("spacedrive --instance {} start", instance_display)
} else {
"spacedrive start".to_string()
};
output
.section()
.help()
.item(&format!("Start it with: {}", start_cmd))
.render()?;
}
Ok(())
}
async fn handle_instance_command(cmd: InstanceCommands, mut output: CliOutput) -> Result<(), Box<dyn std::error::Error>> {
match cmd {
InstanceCommands::List => match Daemon::list_instances() {
Ok(instances) => {
if instances.is_empty() {
output.info("No daemon instances found")?;
} else {
let mut table = Table::new();
table.set_header(vec!["Instance", "Status", "Socket Path"]);
async fn handle_instance_command(
cmd: InstanceCommands,
mut output: CliOutput,
) -> Result<(), Box<dyn std::error::Error>> {
match cmd {
InstanceCommands::List => match Daemon::list_instances() {
Ok(instances) => {
if instances.is_empty() {
output.info("No daemon instances found")?;
} else {
let mut table = Table::new();
table.set_header(vec!["Instance", "Status", "Socket Path"]);
for instance in instances {
let status = if instance.is_running {
"Running"
} else {
"Stopped"
};
for instance in instances {
let status = if instance.is_running {
"Running"
} else {
"Stopped"
};
table.add_row(vec![
instance.display_name().to_string(),
status.to_string(),
instance.socket_path.display().to_string(),
]);
}
table.add_row(vec![
instance.display_name().to_string(),
status.to_string(),
instance.socket_path.display().to_string(),
]);
}
output.section()
.table(table)
.render()?;
}
}
Err(e) => {
output.error(Message::Error(format!("Failed to list instances: {}", e)))?;
}
},
output.section().table(table).render()?;
}
}
Err(e) => {
output.error(Message::Error(format!("Failed to list instances: {}", e)))?;
}
},
InstanceCommands::Stop { name } => {
let instance_name = if name == "default" {
None
} else {
Some(name.clone())
};
match Daemon::stop_instance(instance_name).await {
Ok(_) => {
output.success(&format!("Daemon instance '{}' stopped", name))?;
}
Err(e) => {
output.error(Message::Error(format!("Failed to stop instance '{}': {}", name, e)))?;
}
}
}
InstanceCommands::Stop { name } => {
let instance_name = if name == "default" {
None
} else {
Some(name.clone())
};
match Daemon::stop_instance(instance_name).await {
Ok(_) => {
output.success(&format!("Daemon instance '{}' stopped", name))?;
}
Err(e) => {
output.error(Message::Error(format!(
"Failed to stop instance '{}': {}",
name, e
)))?;
}
}
}
InstanceCommands::Current => {
output.info("Current instance functionality not yet implemented")?;
output.info("Use --instance <name> flag to target specific instances")?;
}
}
InstanceCommands::Current => {
output.info("Current instance functionality not yet implemented")?;
output.info("Use --instance <name> flag to target specific instances")?;
}
}
Ok(())
}
Ok(())
}

View File

@@ -1,49 +1,390 @@
//! Volume CLI commands
use crate::infrastructure::cli::daemon::{DaemonClient, DaemonCommand, DaemonResponse};
use crate::infrastructure::cli::output::{CliOutput, Message};
use clap::Subcommand;
use comfy_table::Table;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Volume-related commands
/// Volume management commands
#[derive(Debug, Clone, Subcommand, Serialize, Deserialize)]
pub enum VolumeCommands {
/// List all volumes
List,
/// Show details for a specific volume
Get {
/// Volume fingerprint
fingerprint: String,
},
/// Track a volume in a library
Track {
/// Library ID
library_id: Uuid,
/// Volume fingerprint
fingerprint: String,
/// Optional name for the tracked volume
#[arg(short, long)]
name: Option<String>,
},
/// Untrack a volume from a library
Untrack {
/// Library ID
library_id: Uuid,
/// Volume fingerprint
fingerprint: String,
},
/// Run speed test on a volume
SpeedTest {
/// Volume fingerprint
fingerprint: String,
},
/// Refresh volume list
Refresh,
}
/// List all volumes
List,
/// Show details for a specific volume
Get {
/// Volume fingerprint
fingerprint: String,
},
/// Track a volume in a library
Track {
/// Volume fingerprint
fingerprint: String,
/// Optional name for the tracked volume
#[arg(short, long)]
name: Option<String>,
},
/// Untrack a volume from a library
Untrack {
/// Volume fingerprint
fingerprint: String,
},
/// Run speed test on a volume
SpeedTest {
/// Volume fingerprint
fingerprint: String,
},
/// Refresh volume list
Refresh,
/// Fix empty display names for tracked volumes
FixNames,
}
pub async fn handle_volume_command(
cmd: VolumeCommands,
instance_name: Option<String>,
mut output: CliOutput,
) -> Result<(), Box<dyn std::error::Error>> {
let mut client = DaemonClient::new_with_instance(instance_name.clone());
match cmd {
VolumeCommands::List => {
output.info("Fetching volumes...")?;
match client
.send_command(DaemonCommand::Volume(VolumeCommands::List))
.await
{
Ok(DaemonResponse::VolumeListWithTracking(volume_infos)) => {
if volume_infos.is_empty() {
output.info("No volumes found")?;
} else {
output.info(&format!("Found {} volume(s):", volume_infos.len()))?;
let mut table = Table::new();
table.set_header(vec![
"Name",
"Mount Point",
"File System",
"Capacity",
"Available",
"Status",
"Tracked",
]);
for volume_info in volume_infos {
let volume = volume_info["volume"].as_object().unwrap();
let is_tracked = volume_info["is_tracked"].as_bool().unwrap_or(false);
let tracked_name = volume_info["tracked_name"].as_str();
let name = volume["name"].as_str().unwrap_or("Unknown");
let mount_point = volume["mount_point"].as_str().unwrap_or("Unknown");
let file_system = volume["file_system"].as_str().unwrap_or("Unknown");
let total_capacity =
volume["total_bytes_capacity"].as_u64().unwrap_or(0);
let available_space =
volume["total_bytes_available"].as_u64().unwrap_or(0);
let is_mounted = volume["is_mounted"].as_bool().unwrap_or(false);
// Format capacity in a human-readable way
let capacity_str = if total_capacity > 0 {
format_bytes(total_capacity)
} else {
"Unknown".to_string()
};
let available_str = if available_space > 0 {
format_bytes(available_space)
} else {
"Unknown".to_string()
};
let status = if is_mounted { "Mounted" } else { "Unmounted" };
let tracked_status = if is_tracked {
if let Some(custom_name) = tracked_name {
format!("Yes ({})", custom_name)
} else {
"Yes".to_string()
}
} else {
"No".to_string()
};
table.add_row(vec![
name.to_string(),
mount_point.to_string(),
file_system.to_string(),
capacity_str,
available_str,
status.to_string(),
tracked_status,
]);
}
output.section().table(table).render()?;
}
}
Ok(DaemonResponse::VolumeList(volumes)) => {
// Fallback for when no current library is set
if volumes.is_empty() {
output.info("No volumes found")?;
} else {
output.info(&format!("Found {} volume(s):", volumes.len()))?;
output.info("💡 Set a current library to see tracking status: spacedrive library switch <id>")?;
let mut table = Table::new();
table.set_header(vec![
"Name",
"Mount Point",
"File System",
"Capacity",
"Available",
"Status",
]);
for volume in volumes {
let capacity_str = if volume.total_bytes_capacity > 0 {
format_bytes(volume.total_bytes_capacity)
} else {
"Unknown".to_string()
};
let available_str = if volume.total_bytes_available > 0 {
format_bytes(volume.total_bytes_available)
} else {
"Unknown".to_string()
};
let status = if volume.is_mounted {
"Mounted"
} else {
"Unmounted"
};
table.add_row(vec![
volume.name,
volume.mount_point.display().to_string(),
volume.file_system.to_string(),
capacity_str,
available_str,
status.to_string(),
]);
}
output.section().table(table).render()?;
}
}
Ok(DaemonResponse::Error(msg)) => {
output.error(Message::Error(msg))?;
}
Ok(_) => {
output.error(Message::Error("Unexpected response".to_string()))?;
}
Err(e) => {
output.error(Message::Error(format!(
"Failed to communicate with daemon: {}",
e
)))?;
}
}
}
VolumeCommands::Get { fingerprint } => {
output.info(&format!("Fetching volume {}...", fingerprint))?;
match client
.send_command(DaemonCommand::Volume(VolumeCommands::Get {
fingerprint: fingerprint.clone(),
}))
.await
{
Ok(DaemonResponse::Volume(volume)) => {
output.info(&format!("Volume: {}", volume.name))?;
output.info(&format!(" Fingerprint: {}", volume.fingerprint.0))?;
output.info(&format!(" Mount Point: {}", volume.mount_point.display()))?;
output.info(&format!(" File System: {}", volume.file_system))?;
output.info(&format!(
" Total Capacity: {} bytes",
volume.total_bytes_capacity
))?;
output.info(&format!(
" Available Space: {} bytes",
volume.total_bytes_available
))?;
output.info(&format!(
" Status: {}",
if volume.is_mounted {
"mounted"
} else {
"unmounted"
}
))?;
}
Ok(DaemonResponse::Error(msg)) => {
output.error(Message::Error(msg))?;
}
Ok(_) => {
output.error(Message::Error("Unexpected response".to_string()))?;
}
Err(e) => {
output.error(Message::Error(format!(
"Failed to communicate with daemon: {}",
e
)))?;
}
}
}
VolumeCommands::Track { fingerprint, name } => {
let name_display = name.as_deref().unwrap_or("(no custom name)");
output.info(&format!(
"Tracking volume {} as '{}'...",
fingerprint, name_display
))?;
match client
.send_command(DaemonCommand::Volume(VolumeCommands::Track {
fingerprint: fingerprint.clone(),
name: name.clone(),
}))
.await
{
Ok(DaemonResponse::ActionOutput(action_output)) => {
output.info(&format!("Successfully tracked volume {}", fingerprint))?;
output.info(&format!("Action completed: {}", action_output))?;
}
Ok(DaemonResponse::Error(msg)) => {
output.error(Message::Error(msg))?;
}
Ok(_) => {
output.error(Message::Error("Unexpected response".to_string()))?;
}
Err(e) => {
output.error(Message::Error(format!(
"Failed to communicate with daemon: {}",
e
)))?;
}
}
}
VolumeCommands::Untrack { fingerprint } => {
output.info(&format!("Untracking volume {}...", fingerprint))?;
match client
.send_command(DaemonCommand::Volume(VolumeCommands::Untrack {
fingerprint: fingerprint.clone(),
}))
.await
{
Ok(DaemonResponse::ActionOutput(action_output)) => {
output.info(&format!("Successfully untracked volume {}", fingerprint))?;
output.info(&format!("Action completed: {}", action_output))?;
}
Ok(DaemonResponse::Error(msg)) => {
output.error(Message::Error(msg))?;
}
Ok(_) => {
output.error(Message::Error("Unexpected response".to_string()))?;
}
Err(e) => {
output.error(Message::Error(format!(
"Failed to communicate with daemon: {}",
e
)))?;
}
}
}
VolumeCommands::SpeedTest { fingerprint } => {
output.info(&format!("Running speed test on volume {}...", fingerprint))?;
output.info("This may take a moment...")?;
match client
.send_command(DaemonCommand::Volume(VolumeCommands::SpeedTest {
fingerprint: fingerprint.clone(),
}))
.await
{
Ok(DaemonResponse::ActionOutput(action_output)) => {
output.info(&format!("Speed test completed for volume {}", fingerprint))?;
output.info(&format!("Action completed: {}", action_output))?;
}
Ok(DaemonResponse::Error(msg)) => {
output.error(Message::Error(msg))?;
}
Ok(_) => {
output.error(Message::Error("Unexpected response".to_string()))?;
}
Err(e) => {
output.error(Message::Error(format!(
"Failed to communicate with daemon: {}",
e
)))?;
}
}
}
VolumeCommands::Refresh => {
let daemon_cmd = DaemonCommand::Volume(VolumeCommands::Refresh);
let response = client.send_command(daemon_cmd).await?;
match response {
DaemonResponse::VolumeList(volumes) => {
println!(" Refreshed {} volume(s)", volumes.len());
}
DaemonResponse::Error(err) => {
return Err(format!("Failed to refresh volumes: {}", err).into());
}
_ => {
return Err("Unexpected response from daemon".into());
}
}
}
VolumeCommands::FixNames => {
let daemon_cmd = DaemonCommand::Volume(VolumeCommands::FixNames);
let response = client.send_command(daemon_cmd).await?;
match response {
DaemonResponse::Ok => {
println!("✅ Fixed display names for tracked volumes");
}
DaemonResponse::Error(err) => {
return Err(format!("Failed to fix display names: {}", err).into());
}
_ => {
return Err("Unexpected response from daemon".into());
}
}
}
}
Ok(())
}
/// Format bytes in a human-readable way
fn format_bytes(bytes: u64) -> String {
const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB", "PB"];
const THRESHOLD: u64 = 1024;
if bytes == 0 {
return "0 B".to_string();
}
let mut size = bytes as f64;
let mut unit_index = 0;
while size >= THRESHOLD as f64 && unit_index < UNITS.len() - 1 {
size /= THRESHOLD as f64;
unit_index += 1;
}
if unit_index == 0 {
format!("{} {}", bytes, UNITS[unit_index])
} else {
format!("{:.1} {}", size, UNITS[unit_index])
}
}

View File

@@ -4,23 +4,22 @@ use async_trait::async_trait;
use std::sync::Arc;
use crate::{
Core,
infrastructure::{
actions::{Action, manager::ActionManager},
cli::{
commands::VolumeCommands,
daemon::{
services::StateService,
types::{DaemonCommand, DaemonResponse},
},
},
},
operations::volumes::{
track::action::VolumeTrackAction,
untrack::action::VolumeUntrackAction,
speed_test::action::VolumeSpeedTestAction,
},
volume::VolumeFingerprint,
infrastructure::{
actions::{manager::ActionManager, Action},
cli::{
commands::VolumeCommands,
daemon::{
services::StateService,
types::{DaemonCommand, DaemonResponse},
},
},
},
operations::volumes::{
speed_test::action::VolumeSpeedTestAction, track::action::VolumeTrackAction,
untrack::action::VolumeUntrackAction,
},
volume::VolumeFingerprint,
Core,
};
use super::CommandHandler;
@@ -30,100 +29,175 @@ pub struct VolumeHandler;
#[async_trait]
impl CommandHandler for VolumeHandler {
async fn handle(
&self,
cmd: DaemonCommand,
core: &Arc<Core>,
_state_service: &Arc<StateService>,
) -> DaemonResponse {
match cmd {
DaemonCommand::Volume(volume_cmd) => match volume_cmd {
VolumeCommands::List => {
// Get all volumes
let volumes = core.volumes.get_all_volumes().await;
DaemonResponse::VolumeList(volumes)
}
VolumeCommands::Get { fingerprint } => {
let fingerprint = VolumeFingerprint(fingerprint);
match core.volumes.get_volume(&fingerprint).await {
Some(volume) => DaemonResponse::Volume(volume),
None => DaemonResponse::Error("Volume not found".to_string()),
}
}
VolumeCommands::Track { library_id, fingerprint, name } => {
let action = Action::VolumeTrack {
action: VolumeTrackAction {
fingerprint: VolumeFingerprint(fingerprint),
library_id,
name,
},
};
match core.context.get_action_manager().await {
Some(action_manager) => {
match action_manager.dispatch(action).await {
Ok(output) => DaemonResponse::ActionOutput(output),
Err(e) => DaemonResponse::Error(format!("Failed to track volume: {}", e)),
}
}
None => DaemonResponse::Error("Action manager not initialized".to_string()),
}
}
VolumeCommands::Untrack { library_id, fingerprint } => {
let action = Action::VolumeUntrack {
action: VolumeUntrackAction {
fingerprint: VolumeFingerprint(fingerprint),
library_id,
},
};
match core.context.get_action_manager().await {
Some(action_manager) => {
match action_manager.dispatch(action).await {
Ok(output) => DaemonResponse::ActionOutput(output),
Err(e) => DaemonResponse::Error(format!("Failed to untrack volume: {}", e)),
}
}
None => DaemonResponse::Error("Action manager not initialized".to_string()),
}
}
VolumeCommands::SpeedTest { fingerprint } => {
let action = Action::VolumeSpeedTest {
action: VolumeSpeedTestAction {
fingerprint: VolumeFingerprint(fingerprint),
},
};
match core.context.get_action_manager().await {
Some(action_manager) => {
match action_manager.dispatch(action).await {
Ok(output) => DaemonResponse::ActionOutput(output),
Err(e) => DaemonResponse::Error(format!("Failed to run speed test: {}", e)),
}
}
None => DaemonResponse::Error("Action manager not initialized".to_string()),
}
}
VolumeCommands::Refresh => {
match core.volumes.refresh_volumes().await {
Ok(_) => {
let volumes = core.volumes.get_all_volumes().await;
DaemonResponse::VolumeList(volumes)
}
Err(e) => DaemonResponse::Error(format!("Failed to refresh volumes: {}", e)),
}
}
},
_ => DaemonResponse::Error("Invalid command for volume handler".to_string()),
}
}
async fn handle(
&self,
cmd: DaemonCommand,
core: &Arc<Core>,
state_service: &Arc<StateService>,
) -> DaemonResponse {
match cmd {
DaemonCommand::Volume(volume_cmd) => match volume_cmd {
VolumeCommands::List => {
// Get all volumes
let volumes = core.volumes.get_all_volumes().await;
fn can_handle(&self, cmd: &DaemonCommand) -> bool {
matches!(cmd, DaemonCommand::Volume(_))
}
}
// Get current library to check track status
if let Some(library) = state_service.get_current_library(core).await {
// Get tracked volumes for this library
let tracked_volumes = match core.volumes.get_tracked_volumes(&library).await
{
Ok(tracked) => tracked,
Err(_) => Vec::new(),
};
// Create a map of fingerprint -> tracked info for quick lookup
let tracked_map: std::collections::HashMap<_, _> = tracked_volumes
.into_iter()
.map(|tv| (tv.fingerprint.clone(), tv))
.collect();
// Combine volume info with track status
let volume_info_list: Vec<_> = volumes
.into_iter()
.map(|volume| {
let is_tracked = tracked_map.contains_key(&volume.fingerprint);
let tracked_name = tracked_map
.get(&volume.fingerprint)
.and_then(|tv| tv.display_name.clone());
serde_json::json!({
"volume": volume,
"is_tracked": is_tracked,
"tracked_name": tracked_name
})
})
.collect();
DaemonResponse::VolumeListWithTracking(volume_info_list)
} else {
// No current library, just return basic volume list
DaemonResponse::VolumeList(volumes)
}
}
VolumeCommands::Get { fingerprint } => {
let fingerprint = VolumeFingerprint(fingerprint);
match core.volumes.get_volume(&fingerprint).await {
Some(volume) => DaemonResponse::Volume(volume),
None => DaemonResponse::Error("Volume not found".to_string()),
}
}
VolumeCommands::Track { fingerprint, name } => {
// Get current library from CLI state
if let Some(library) = state_service.get_current_library(core).await {
let library_id = library.id();
let action = Action::VolumeTrack {
action: VolumeTrackAction {
fingerprint: VolumeFingerprint(fingerprint),
library_id,
name,
},
};
match core.context.get_action_manager().await {
Some(action_manager) => match action_manager.dispatch(action).await {
Ok(output) => DaemonResponse::ActionOutput(output),
Err(e) => {
DaemonResponse::Error(format!("Failed to track volume: {}", e))
}
},
None => {
DaemonResponse::Error("Action manager not initialized".to_string())
}
}
} else {
DaemonResponse::Error("No current library set. Use 'spacedrive library switch <id>' to select a library.".to_string())
}
}
VolumeCommands::Untrack { fingerprint } => {
// Get current library from CLI state
if let Some(library) = state_service.get_current_library(core).await {
let library_id = library.id();
let action = Action::VolumeUntrack {
action: VolumeUntrackAction {
fingerprint: VolumeFingerprint(fingerprint),
library_id,
},
};
match core.context.get_action_manager().await {
Some(action_manager) => match action_manager.dispatch(action).await {
Ok(output) => DaemonResponse::ActionOutput(output),
Err(e) => DaemonResponse::Error(format!(
"Failed to untrack volume: {}",
e
)),
},
None => {
DaemonResponse::Error("Action manager not initialized".to_string())
}
}
} else {
DaemonResponse::Error("No current library set. Use 'spacedrive library switch <id>' to select a library.".to_string())
}
}
VolumeCommands::SpeedTest { fingerprint } => {
let action = Action::VolumeSpeedTest {
action: VolumeSpeedTestAction {
fingerprint: VolumeFingerprint(fingerprint),
},
};
match core.context.get_action_manager().await {
Some(action_manager) => match action_manager.dispatch(action).await {
Ok(output) => DaemonResponse::ActionOutput(output),
Err(e) => {
DaemonResponse::Error(format!("Failed to run speed test: {}", e))
}
},
None => DaemonResponse::Error("Action manager not initialized".to_string()),
}
}
VolumeCommands::Refresh => match core.volumes.refresh_volumes().await {
Ok(_) => {
let volumes = core.volumes.get_all_volumes().await;
DaemonResponse::VolumeList(volumes)
}
Err(e) => DaemonResponse::Error(format!("Failed to refresh volumes: {}", e)),
},
VolumeCommands::FixNames => {
if let Some(library) = state_service.get_current_library(core).await {
match core.volumes.update_empty_display_names(&library).await {
Ok(count) => {
if count > 0 {
tracing::info!("Updated display names for {} volumes", count);
} else {
tracing::info!("No volumes with empty display names found");
}
DaemonResponse::Ok
}
Err(e) => DaemonResponse::Error(format!(
"Failed to update display names: {}",
e
)),
}
} else {
DaemonResponse::Error("No library selected".to_string())
}
}
},
_ => DaemonResponse::Error("Invalid command for volume handler".to_string()),
}
}
fn can_handle(&self, cmd: &DaemonCommand) -> bool {
matches!(cmd, DaemonCommand::Volume(_))
}
}

View File

@@ -156,6 +156,11 @@ impl Daemon {
let listener = UnixListener::bind(&self.config.socket_path)?;
info!("Daemon listening on {:?}", self.config.socket_path);
// Emit CoreStarted event to signal daemon is ready
self.core
.events
.emit(crate::infrastructure::events::Event::CoreStarted);
// Set up shutdown channel
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
*self.shutdown_tx.lock().await = Some(shutdown_tx);
@@ -270,6 +275,49 @@ impl Daemon {
Ok(())
}
/// Wait for daemon to be ready by attempting to connect
pub async fn wait_for_ready(
instance_name: Option<String>,
timeout_secs: u64,
) -> Result<bool, Box<dyn std::error::Error>> {
let config = DaemonConfig::new(instance_name);
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(timeout_secs);
while start.elapsed() < timeout {
// Try to connect to the socket
if let Ok(mut stream) = UnixStream::connect(&config.socket_path).await {
// Try to send a simple ping command to verify it's responsive
let cmd = DaemonCommand::Ping;
let json = serde_json::to_string(&cmd)?;
if stream
.write_all(format!("{}\n", json).as_bytes())
.await
.is_ok()
{
if stream.flush().await.is_ok() {
// Try to read the response
let mut reader = BufReader::new(stream);
let mut line = String::new();
if reader.read_line(&mut line).await.is_ok() {
if let Ok(response) = serde_json::from_str::<DaemonResponse>(&line) {
if matches!(response, DaemonResponse::Pong) {
return Ok(true);
}
}
}
}
}
}
// Wait a bit before retrying
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok(false)
}
/// List all daemon instances
pub fn list_instances() -> Result<Vec<DaemonInstance>, Box<dyn std::error::Error>> {
let runtime_dir = dirs::runtime_dir()

View File

@@ -4,11 +4,10 @@ use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use uuid::Uuid;
use super::common::{BrowseEntry, ConnectedDeviceInfo, JobInfo, LibraryInfo, LocationInfo, PairingRequestInfo};
use crate::{
infrastructure::actions::output::ActionOutput,
volume::Volume,
use super::common::{
BrowseEntry, ConnectedDeviceInfo, JobInfo, LibraryInfo, LocationInfo, PairingRequestInfo,
};
use crate::{infrastructure::actions::output::ActionOutput, volume::Volume};
/// Responses from the daemon
#[derive(Debug, Serialize, Deserialize)]
@@ -63,11 +62,12 @@ pub enum DaemonResponse {
remote_device: Option<ConnectedDeviceInfo>,
},
PendingPairings(Vec<PairingRequestInfo>),
// Volume responses
VolumeList(Vec<Volume>),
VolumeListWithTracking(Vec<serde_json::Value>),
Volume(Volume),
// Action output (generic for all action results)
ActionOutput(ActionOutput),
}
@@ -79,4 +79,4 @@ pub struct DaemonStatus {
pub current_library: Option<Uuid>,
pub active_jobs: usize,
pub total_locations: usize,
}
}

View File

@@ -14,6 +14,7 @@ use crate::infrastructure::cli::commands::{
location::{handle_location_command, LocationCommands},
network::{handle_network_command, NetworkCommands},
system::{handle_system_command, SystemCommands},
volume::{handle_volume_command, VolumeCommands},
};
use crate::infrastructure::cli::output::{CliOutput, Message};
use clap::{Parser, Subcommand};
@@ -62,7 +63,7 @@ pub enum Commands {
/// Start the Spacedrive daemon in the background
Start {
/// Run in foreground instead of daemonizing
#[arg(short, long)]
#[arg(long)]
foreground: bool,
/// Enable networking on startup
#[arg(long)]
@@ -102,6 +103,10 @@ pub enum Commands {
/// System monitoring and information
#[command(subcommand)]
System(SystemCommands),
/// Volume management and operations
#[command(subcommand)]
Volume(VolumeCommands),
}
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
@@ -267,6 +272,14 @@ pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
}
}
}
Commands::Volume(volume_cmd) => {
// Check if daemon is running
if !daemon::Daemon::is_running_instance(cli.instance.clone()) {
print_daemon_not_running(&cli.instance, &mut output)?;
return Ok(());
}
handle_volume_command(volume_cmd.clone(), cli.instance.clone(), output).await
}
}
}

View File

@@ -7,52 +7,67 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "volumes")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub uuid: Uuid,
pub fingerprint: String,
pub display_name: Option<String>,
pub tracked_at: DateTimeUtc,
pub last_seen_at: DateTimeUtc,
pub is_online: bool,
pub total_capacity: Option<i64>,
pub available_capacity: Option<i64>,
pub read_speed_mbps: Option<i32>,
pub write_speed_mbps: Option<i32>,
pub last_speed_test_at: Option<DateTimeUtc>,
pub file_system: Option<String>,
pub mount_point: Option<String>,
pub is_removable: Option<bool>,
pub is_network_drive: Option<bool>,
pub device_model: Option<String>,
#[sea_orm(primary_key)]
pub id: i32,
pub uuid: Uuid,
pub device_id: Uuid, // Foreign key to devices table
pub fingerprint: String,
pub display_name: Option<String>,
pub tracked_at: DateTimeUtc,
pub last_seen_at: DateTimeUtc,
pub is_online: bool,
pub total_capacity: Option<i64>,
pub available_capacity: Option<i64>,
pub read_speed_mbps: Option<i32>,
pub write_speed_mbps: Option<i32>,
pub last_speed_test_at: Option<DateTimeUtc>,
pub file_system: Option<String>,
pub mount_point: Option<String>,
pub is_removable: Option<bool>,
pub is_network_drive: Option<bool>,
pub device_model: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
pub enum Relation {
#[sea_orm(
belongs_to = "super::device::Entity",
from = "Column::DeviceId",
to = "super::device::Column::Uuid"
)]
Device,
}
impl Related<super::device::Entity> for Entity {
fn to() -> RelationDef {
Relation::Device.def()
}
}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
/// Convert database model to tracked volume
pub fn to_tracked_volume(&self) -> TrackedVolume {
TrackedVolume {
id: self.id,
uuid: self.uuid,
fingerprint: crate::volume::VolumeFingerprint(self.fingerprint.clone()),
display_name: self.display_name.clone(),
tracked_at: self.tracked_at,
last_seen_at: self.last_seen_at,
is_online: self.is_online,
total_capacity: self.total_capacity.map(|c| c as u64),
available_capacity: self.available_capacity.map(|c| c as u64),
read_speed_mbps: self.read_speed_mbps.map(|s| s as u32),
write_speed_mbps: self.write_speed_mbps.map(|s| s as u32),
last_speed_test_at: self.last_speed_test_at,
file_system: self.file_system.clone(),
mount_point: self.mount_point.clone(),
is_removable: self.is_removable,
is_network_drive: self.is_network_drive,
device_model: self.device_model.clone(),
}
}
}
/// Convert database model to tracked volume
pub fn to_tracked_volume(&self) -> TrackedVolume {
TrackedVolume {
id: self.id,
uuid: self.uuid,
device_id: self.device_id,
fingerprint: crate::volume::VolumeFingerprint(self.fingerprint.clone()),
display_name: self.display_name.clone(),
tracked_at: self.tracked_at,
last_seen_at: self.last_seen_at,
is_online: self.is_online,
total_capacity: self.total_capacity.map(|c| c as u64),
available_capacity: self.available_capacity.map(|c| c as u64),
read_speed_mbps: self.read_speed_mbps.map(|s| s as u32),
write_speed_mbps: self.write_speed_mbps.map(|s| s as u32),
last_speed_test_at: self.last_speed_test_at,
file_system: self.file_system.clone(),
mount_point: self.mount_point.clone(),
is_removable: self.is_removable,
is_network_drive: self.is_network_drive,
device_model: self.device_model.clone(),
}
}
}

View File

@@ -7,115 +7,135 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Create volumes table
manager
.create_table(
Table::create()
.table(Volumes::Table)
.if_not_exists()
.col(
ColumnDef::new(Volumes::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Volumes::Uuid).text().not_null().unique_key())
.col(ColumnDef::new(Volumes::Fingerprint).text().not_null())
.col(ColumnDef::new(Volumes::DisplayName).text())
.col(
ColumnDef::new(Volumes::TrackedAt)
.timestamp()
.not_null(),
)
.col(
ColumnDef::new(Volumes::LastSeenAt)
.timestamp()
.not_null(),
)
.col(
ColumnDef::new(Volumes::IsOnline)
.boolean()
.not_null()
.default(true),
)
.col(ColumnDef::new(Volumes::TotalCapacity).big_integer())
.col(ColumnDef::new(Volumes::AvailableCapacity).big_integer())
.col(ColumnDef::new(Volumes::ReadSpeedMbps).integer())
.col(ColumnDef::new(Volumes::WriteSpeedMbps).integer())
.col(ColumnDef::new(Volumes::LastSpeedTestAt).timestamp())
.col(ColumnDef::new(Volumes::FileSystem).text())
.col(ColumnDef::new(Volumes::MountPoint).text())
.col(ColumnDef::new(Volumes::IsRemovable).boolean())
.col(ColumnDef::new(Volumes::IsNetworkDrive).boolean())
.col(ColumnDef::new(Volumes::DeviceModel).text())
.to_owned(),
)
.await?;
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Create volumes table
manager
.create_table(
Table::create()
.table(Volumes::Table)
.if_not_exists()
.col(
ColumnDef::new(Volumes::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Volumes::Uuid).text().not_null().unique_key())
.col(ColumnDef::new(Volumes::DeviceId).text().not_null()) // Device this volume belongs to
.col(ColumnDef::new(Volumes::Fingerprint).text().not_null())
.col(ColumnDef::new(Volumes::DisplayName).text())
.col(ColumnDef::new(Volumes::TrackedAt).timestamp().not_null())
.col(ColumnDef::new(Volumes::LastSeenAt).timestamp().not_null())
.col(
ColumnDef::new(Volumes::IsOnline)
.boolean()
.not_null()
.default(true),
)
.col(ColumnDef::new(Volumes::TotalCapacity).big_integer())
.col(ColumnDef::new(Volumes::AvailableCapacity).big_integer())
.col(ColumnDef::new(Volumes::ReadSpeedMbps).integer())
.col(ColumnDef::new(Volumes::WriteSpeedMbps).integer())
.col(ColumnDef::new(Volumes::LastSpeedTestAt).timestamp())
.col(ColumnDef::new(Volumes::FileSystem).text())
.col(ColumnDef::new(Volumes::MountPoint).text())
.col(ColumnDef::new(Volumes::IsRemovable).boolean())
.col(ColumnDef::new(Volumes::IsNetworkDrive).boolean())
.col(ColumnDef::new(Volumes::DeviceModel).text())
.foreign_key(
ForeignKey::create()
.name("fk_volumes_device_id")
.from(Volumes::Table, Volumes::DeviceId)
.to(Devices::Table, Devices::Uuid)
.on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
// Create unique index on fingerprint (since each library tracks volumes independently)
manager
.create_index(
Index::create()
.name("idx_volume_fingerprint_unique")
.table(Volumes::Table)
.col(Volumes::Fingerprint)
.unique()
.to_owned(),
)
.await?;
// Create unique index on device_id + fingerprint (volumes are unique per device)
manager
.create_index(
Index::create()
.name("idx_volume_device_fingerprint_unique")
.table(Volumes::Table)
.col(Volumes::DeviceId)
.col(Volumes::Fingerprint)
.unique()
.to_owned(),
)
.await?;
// Create index on last_seen_at for efficient queries
manager
.create_index(
Index::create()
.name("idx_volume_last_seen_at")
.table(Volumes::Table)
.col(Volumes::LastSeenAt)
.to_owned(),
)
.await?;
// Create index on device_id for efficient device queries
manager
.create_index(
Index::create()
.name("idx_volume_device_id")
.table(Volumes::Table)
.col(Volumes::DeviceId)
.to_owned(),
)
.await?;
// Create index on is_online for filtering
manager
.create_index(
Index::create()
.name("idx_volume_is_online")
.table(Volumes::Table)
.col(Volumes::IsOnline)
.to_owned(),
)
.await?;
// Create index on last_seen_at for efficient queries
manager
.create_index(
Index::create()
.name("idx_volume_last_seen_at")
.table(Volumes::Table)
.col(Volumes::LastSeenAt)
.to_owned(),
)
.await?;
Ok(())
}
// Create index on is_online for filtering
manager
.create_index(
Index::create()
.name("idx_volume_is_online")
.table(Volumes::Table)
.col(Volumes::IsOnline)
.to_owned(),
)
.await?;
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Volumes::Table).to_owned())
.await
}
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Volumes::Table).to_owned())
.await
}
}
#[derive(DeriveIden)]
enum Volumes {
Table,
Id,
Uuid,
Fingerprint,
DisplayName,
TrackedAt,
LastSeenAt,
IsOnline,
TotalCapacity,
AvailableCapacity,
ReadSpeedMbps,
WriteSpeedMbps,
LastSpeedTestAt,
FileSystem,
MountPoint,
IsRemovable,
IsNetworkDrive,
DeviceModel,
}
Table,
Id,
Uuid,
DeviceId,
Fingerprint,
DisplayName,
TrackedAt,
LastSeenAt,
IsOnline,
TotalCapacity,
AvailableCapacity,
ReadSpeedMbps,
WriteSpeedMbps,
LastSpeedTestAt,
FileSystem,
MountPoint,
IsRemovable,
IsNetworkDrive,
DeviceModel,
}
#[derive(DeriveIden)]
enum Devices {
Table,
Uuid,
}

View File

@@ -2,19 +2,19 @@
use sea_orm_migration::prelude::*;
mod m20240101_000001_create_initial_tables;
mod m20240102_000001_add_audit_log;
mod m20240103_000001_create_volumes;
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20240101_000001_create_initial_tables::Migration),
Box::new(m20240102_000001_add_audit_log::Migration),
Box::new(m20240103_000001_create_volumes::Migration),
]
}
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20240101_000001_create_initial_tables::Migration),
Box::new(m20240102_000001_add_audit_log::Migration),
Box::new(m20240103_000001_create_volumes::Migration),
]
}
}
mod m20240101_000001_create_initial_tables;
mod m20240102_000001_add_audit_log;
mod m20240103_000001_create_volumes;

View File

@@ -158,7 +158,8 @@ impl Core {
// 4. Initialize volume manager
let volume_config = VolumeDetectionConfig::default();
let volumes = Arc::new(VolumeManager::new(volume_config, events.clone()));
let device_id = device.device_id()?;
let volumes = Arc::new(VolumeManager::new(device_id, volume_config, events.clone()));
// 5. Initialize volume detection
info!("Initializing volume detection...");

View File

@@ -3,41 +3,40 @@
//! This action tests the read/write performance of a volume.
use crate::{
infrastructure::actions::{error::ActionError, output::ActionOutput},
volume::VolumeFingerprint,
infrastructure::actions::{error::ActionError, output::ActionOutput},
volume::VolumeFingerprint,
};
use serde::{Deserialize, Serialize};
/// Input for volume speed testing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolumeSpeedTestAction {
/// The fingerprint of the volume to test
pub fingerprint: VolumeFingerprint,
/// The fingerprint of the volume to test
pub fingerprint: VolumeFingerprint,
}
impl VolumeSpeedTestAction {
/// Execute the volume speed test action
pub async fn execute(
&self,
core: &crate::Core,
) -> Result<ActionOutput, ActionError> {
// Run the speed test through the volume manager
core.volumes
.run_speed_test(&self.fingerprint)
.await
.map_err(|e| ActionError::Internal(e.to_string()))?;
// Get the updated volume with speed test results
let volume = core
.volumes
.get_volume(&self.fingerprint)
.await
.ok_or_else(|| ActionError::InvalidInput("Volume not found after speed test".to_string()))?;
Ok(ActionOutput::VolumeSpeedTested {
fingerprint: self.fingerprint.clone(),
read_speed_mbps: volume.read_speed_mbps.map(|v| v as u32),
write_speed_mbps: volume.write_speed_mbps.map(|v| v as u32),
})
}
}
/// Execute the volume speed test action
pub async fn execute(&self, core: &crate::Core) -> Result<ActionOutput, ActionError> {
// Run the speed test through the volume manager
core.volumes
.run_speed_test(&self.fingerprint)
.await
.map_err(|e| ActionError::Internal(e.to_string()))?;
// Get the updated volume with speed test results
let volume = core
.volumes
.get_volume(&self.fingerprint)
.await
.ok_or_else(|| {
ActionError::InvalidInput("Volume not found after speed test".to_string())
})?;
Ok(ActionOutput::VolumeSpeedTested {
fingerprint: self.fingerprint.clone(),
read_speed_mbps: volume.read_speed_mbps.map(|v| v as u32),
write_speed_mbps: volume.write_speed_mbps.map(|v| v as u32),
})
}
}

View File

@@ -1,10 +1,13 @@
//! Handler for volume speed test action
use crate::{
context::CoreContext,
infrastructure::actions::{
Action, error::{ActionError, ActionResult}, handler::ActionHandler, output::ActionOutput,
},
context::CoreContext,
infrastructure::actions::{
error::{ActionError, ActionResult},
handler::ActionHandler,
output::ActionOutput,
Action,
},
};
use async_trait::async_trait;
use std::sync::Arc;
@@ -12,55 +15,77 @@ use std::sync::Arc;
pub struct VolumeSpeedTestHandler;
impl VolumeSpeedTestHandler {
pub fn new() -> Self {
Self
}
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl ActionHandler for VolumeSpeedTestHandler {
async fn execute(
&self,
context: Arc<CoreContext>,
action: Action,
) -> ActionResult<ActionOutput> {
match action {
Action::VolumeSpeedTest { action } => {
// Run speed test through volume manager
context
.volume_manager
.run_speed_test(&action.fingerprint)
.await
.map_err(|e| ActionError::Internal(e.to_string()))?;
// Get updated volume with results
let volume = context
.volume_manager
.get_volume(&action.fingerprint)
.await
.ok_or_else(|| ActionError::InvalidInput("Volume not found after speed test".to_string()))?;
Ok(ActionOutput::VolumeSpeedTested {
fingerprint: action.fingerprint,
read_speed_mbps: volume.read_speed_mbps.map(|v| v as u32),
write_speed_mbps: volume.write_speed_mbps.map(|v| v as u32),
})
}
_ => Err(ActionError::InvalidActionType),
}
}
fn can_handle(&self, action: &Action) -> bool {
matches!(action, Action::VolumeSpeedTest { .. })
}
fn supported_actions() -> &'static [&'static str]
where
Self: Sized
{
&["volume.speed_test"]
}
async fn execute(
&self,
context: Arc<CoreContext>,
action: Action,
) -> ActionResult<ActionOutput> {
match action {
Action::VolumeSpeedTest { action } => {
// Run speed test through volume manager
context
.volume_manager
.run_speed_test(&action.fingerprint)
.await
.map_err(|e| ActionError::Internal(e.to_string()))?;
// Get updated volume with results
let volume = context
.volume_manager
.get_volume(&action.fingerprint)
.await
.ok_or_else(|| {
ActionError::InvalidInput("Volume not found after speed test".to_string())
})?;
// Extract speed test results
let read_speed = volume.read_speed_mbps.unwrap_or(0);
let write_speed = volume.write_speed_mbps.unwrap_or(0);
// Save results to database for all libraries where this volume is tracked
let libraries = context.library_manager.get_open_libraries().await;
if let Err(e) = context
.volume_manager
.save_speed_test_results(
&action.fingerprint,
read_speed,
write_speed,
&libraries,
)
.await
{
// Log error but don't fail the action since the speed test itself succeeded
tracing::warn!("Failed to save speed test results to database: {}", e);
}
Ok(ActionOutput::VolumeSpeedTested {
fingerprint: action.fingerprint,
read_speed_mbps: Some(read_speed as u32),
write_speed_mbps: Some(write_speed as u32),
})
}
_ => Err(ActionError::InvalidActionType),
}
}
fn can_handle(&self, action: &Action) -> bool {
matches!(action, Action::VolumeSpeedTest { .. })
}
fn supported_actions() -> &'static [&'static str]
where
Self: Sized,
{
&["volume.speed_test"]
}
}
// Register the handler
crate::register_action_handler!(VolumeSpeedTestHandler, "volume.speed_test");
crate::register_action_handler!(VolumeSpeedTestHandler, "volume.speed_test");

View File

@@ -165,7 +165,6 @@ impl FileSharingService {
"No active library for job dispatch".to_string(),
))?;
// Create and dispatch the FileCopyJob
let job_manager = library.jobs();
let sources = files.into_iter().map(SdPath::local).collect();
@@ -323,9 +322,10 @@ impl FileSharingService {
.library_manager
.get_library(*library_id)
.await
.ok_or(SharingError::JobError(
format!("Library {} not found", library_id),
))?;
.ok_or(SharingError::JobError(format!(
"Library {} not found",
library_id
)))?;
let job_manager = library.jobs();
@@ -400,9 +400,10 @@ impl FileSharingService {
.library_manager
.get_library(*library_id)
.await
.ok_or(SharingError::JobError(
format!("Library {} not found", library_id),
))?;
.ok_or(SharingError::JobError(format!(
"Library {} not found",
library_id
)))?;
let job_manager = library.jobs();
@@ -523,6 +524,7 @@ mod tests {
events.clone(),
));
let volume_manager = Arc::new(crate::volume::VolumeManager::new(
uuid::Uuid::new_v4(), // Test device ID
crate::volume::VolumeDetectionConfig::default(),
events.clone(),
));
@@ -556,6 +558,7 @@ mod tests {
events.clone(),
));
let volume_manager = Arc::new(crate::volume::VolumeManager::new(
uuid::Uuid::new_v4(), // Test device ID
crate::volume::VolumeDetectionConfig::default(),
events.clone(),
));

View File

@@ -19,6 +19,9 @@ use uuid::Uuid;
/// Central manager for volume detection, monitoring, and operations
pub struct VolumeManager {
/// Device ID for this manager
device_id: uuid::Uuid,
/// Currently known volumes, indexed by fingerprint
volumes: Arc<RwLock<HashMap<VolumeFingerprint, Volume>>>,
@@ -39,9 +42,14 @@ pub struct VolumeManager {
}
impl VolumeManager {
/// Create a new volume manager
pub fn new(config: VolumeDetectionConfig, events: Arc<EventBus>) -> Self {
/// Create a new VolumeManager instance
pub fn new(
device_id: uuid::Uuid,
config: VolumeDetectionConfig,
events: Arc<EventBus>,
) -> Self {
Self {
device_id,
volumes: Arc::new(RwLock::new(HashMap::new())),
path_cache: Arc::new(RwLock::new(HashMap::new())),
config,
@@ -91,6 +99,7 @@ impl VolumeManager {
let events = self.events.clone();
let config = self.config.clone();
let is_monitoring = self.is_monitoring.clone();
let device_id = self.device_id;
tokio::spawn(async move {
info!(
@@ -104,8 +113,14 @@ impl VolumeManager {
while *is_monitoring.read().await {
interval.tick().await;
if let Err(e) =
Self::refresh_volumes_internal(&volumes, &path_cache, &events, &config).await
if let Err(e) = Self::refresh_volumes_internal(
device_id,
&volumes,
&path_cache,
&events,
&config,
)
.await
{
error!("Error during volume refresh: {}", e);
}
@@ -124,21 +139,28 @@ impl VolumeManager {
/// Refresh all volumes and detect changes
#[instrument(skip(self))]
pub async fn refresh_volumes(&self) -> VolumeResult<()> {
Self::refresh_volumes_internal(&self.volumes, &self.path_cache, &self.events, &self.config)
.await
Self::refresh_volumes_internal(
self.device_id,
&self.volumes,
&self.path_cache,
&self.events,
&self.config,
)
.await
}
/// Internal implementation of volume refresh
async fn refresh_volumes_internal(
device_id: uuid::Uuid,
volumes: &Arc<RwLock<HashMap<VolumeFingerprint, Volume>>>,
path_cache: &Arc<RwLock<HashMap<PathBuf, VolumeFingerprint>>>,
events: &Arc<EventBus>,
config: &VolumeDetectionConfig,
) -> VolumeResult<()> {
debug!("Refreshing volumes");
debug!("Refreshing volumes for device {}", device_id);
// Detect current volumes
let detected_volumes = os_detection::detect_volumes(config).await?;
let detected_volumes = os_detection::detect_volumes(device_id, config).await?;
let mut current_volumes = volumes.write().await;
let mut cache = path_cache.write().await;
@@ -378,7 +400,7 @@ impl VolumeManager {
display_name: Option<String>,
) -> VolumeResult<entities::volume::Model> {
let db = library.db().conn();
// Check if already tracked
if let Some(existing) = entities::volume::Entity::find()
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
@@ -388,18 +410,22 @@ impl VolumeManager {
{
return Err(VolumeError::AlreadyTracked(fingerprint.to_string()));
}
// Get current volume info
let volume = self.get_volume(fingerprint).await
let volume = self
.get_volume(fingerprint)
.await
.ok_or_else(|| VolumeError::NotFound(fingerprint.to_string()))?;
// Determine removability and network status
let is_removable = matches!(volume.mount_type, crate::volume::types::MountType::External);
let is_network_drive = matches!(volume.mount_type, crate::volume::types::MountType::Network);
let is_network_drive =
matches!(volume.mount_type, crate::volume::types::MountType::Network);
// Create tracking record
let active_model = entities::volume::ActiveModel {
uuid: Set(Uuid::new_v4()),
device_id: Set(volume.device_id), // Use Uuid directly
fingerprint: Set(fingerprint.0.clone()),
display_name: Set(display_name.clone()),
tracked_at: Set(chrono::Utc::now()),
@@ -417,29 +443,28 @@ impl VolumeManager {
device_model: Set(volume.hardware_id.clone()),
..Default::default()
};
let model = active_model
.insert(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
info!(
"Tracked volume '{}' for library '{}'",
display_name.as_ref().unwrap_or(&volume.name),
library.name().await
);
// Emit tracking event
self.events
.emit(Event::Custom {
event_type: "VolumeTracked".to_string(),
data: serde_json::json!({
"library_id": library.id(),
"volume_fingerprint": fingerprint.to_string(),
"display_name": display_name,
}),
});
self.events.emit(Event::Custom {
event_type: "VolumeTracked".to_string(),
data: serde_json::json!({
"library_id": library.id(),
"volume_fingerprint": fingerprint.to_string(),
"display_name": display_name,
}),
});
Ok(model)
}
@@ -450,33 +475,32 @@ impl VolumeManager {
fingerprint: &VolumeFingerprint,
) -> VolumeResult<()> {
let db = library.db().conn();
let result = entities::volume::Entity::delete_many()
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.exec(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
if result.rows_affected == 0 {
return Err(VolumeError::NotTracked(fingerprint.to_string()));
}
info!(
"Untracked volume '{}' from library '{}'",
fingerprint.to_string(),
library.name().await
);
// Emit untracking event
self.events
.emit(Event::Custom {
event_type: "VolumeUntracked".to_string(),
data: serde_json::json!({
"library_id": library.id(),
"volume_fingerprint": fingerprint.to_string(),
}),
});
self.events.emit(Event::Custom {
event_type: "VolumeUntracked".to_string(),
data: serde_json::json!({
"library_id": library.id(),
"volume_fingerprint": fingerprint.to_string(),
}),
});
Ok(())
}
@@ -486,23 +510,23 @@ impl VolumeManager {
library: &crate::library::Library,
) -> VolumeResult<Vec<TrackedVolume>> {
let db = library.db().conn();
let volumes = entities::volume::Entity::find()
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let tracked_volumes: Vec<TrackedVolume> = volumes
.into_iter()
.map(|model| model.to_tracked_volume())
.collect();
debug!(
"Found {} tracked volumes for library '{}'",
tracked_volumes.len(),
library.name().await
);
Ok(tracked_volumes)
}
@@ -513,16 +537,23 @@ impl VolumeManager {
fingerprint: &VolumeFingerprint,
) -> VolumeResult<bool> {
let db = library.db().conn();
// Get the volume to find its device_id
let volume = self
.get_volume(fingerprint)
.await
.ok_or_else(|| VolumeError::NotFound(fingerprint.to_string()))?;
let count = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.count(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
Ok(count > 0)
}
/// Update tracked volume state during refresh
pub async fn update_tracked_volume_state(
&self,
@@ -531,28 +562,81 @@ impl VolumeManager {
volume: &Volume,
) -> VolumeResult<()> {
let db = library.db().conn();
let mut active_model: entities::volume::ActiveModel = entities::volume::Entity::find()
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.one(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?
.ok_or_else(|| VolumeError::NotTracked(fingerprint.to_string()))?
.into();
active_model.last_seen_at = Set(chrono::Utc::now());
active_model.is_online = Set(volume.is_mounted);
active_model.total_capacity = Set(Some(volume.total_bytes_capacity as i64));
active_model.available_capacity = Set(Some(volume.total_bytes_available as i64));
active_model
.update(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
Ok(())
}
/// Update display names for tracked volumes that have empty display names
pub async fn update_empty_display_names(
&self,
library: &crate::library::Library,
) -> VolumeResult<usize> {
let db = library.db().conn();
// Find tracked volumes with empty display names
let volumes_to_update = entities::volume::Entity::find()
.filter(
entities::volume::Column::DisplayName
.is_null()
.or(entities::volume::Column::DisplayName.eq("")),
)
.all(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
let mut updated_count = 0;
for tracked_volume in volumes_to_update {
let fingerprint = VolumeFingerprint(tracked_volume.fingerprint.clone());
// Get the current volume info to get the name
if let Some(volume) = self.get_volume(&fingerprint).await {
let volume_name = volume.name.clone();
let mut active_model: entities::volume::ActiveModel = tracked_volume.into();
active_model.display_name = Set(Some(volume.name));
match active_model.update(db).await {
Ok(_) => {
updated_count += 1;
info!("Updated display name for volume: {}", volume_name);
}
Err(e) => {
warn!(
"Failed to update display name for volume {}: {}",
fingerprint.0, e
);
}
}
}
}
info!(
"Updated display names for {} volumes in library '{}'",
updated_count,
library.name().await
);
Ok(updated_count)
}
/// Get all system volumes (boot/OS volumes)
pub async fn get_system_volumes(&self) -> Vec<Volume> {
self.volumes
@@ -563,7 +647,7 @@ impl VolumeManager {
.cloned()
.collect()
}
/// Automatically track system volumes for a library
pub async fn auto_track_system_volumes(
&self,
@@ -571,25 +655,25 @@ impl VolumeManager {
) -> VolumeResult<Vec<entities::volume::Model>> {
let system_volumes = self.get_system_volumes().await;
let mut tracked_volumes = Vec::new();
info!(
"Auto-tracking {} system volumes for library '{}'",
system_volumes.len(),
library.name().await
);
for volume in system_volumes {
// Skip if already tracked
if self.is_volume_tracked(library, &volume.fingerprint).await? {
debug!(
"System volume '{}' already tracked in library",
volume.name
);
debug!("System volume '{}' already tracked in library", volume.name);
continue;
}
// Track the system volume
match self.track_volume(library, &volume.fingerprint, None).await {
match self
.track_volume(library, &volume.fingerprint, Some(volume.name.clone()))
.await
{
Ok(tracked) => {
info!(
"Auto-tracked system volume '{}' in library '{}'",
@@ -606,9 +690,59 @@ impl VolumeManager {
}
}
}
Ok(tracked_volumes)
}
/// Save speed test results to all libraries where this volume is tracked
pub async fn save_speed_test_results(
&self,
fingerprint: &VolumeFingerprint,
read_speed_mbps: u64,
write_speed_mbps: u64,
libraries: &[Arc<crate::library::Library>],
) -> VolumeResult<()> {
for library in libraries {
// Check if this volume is tracked in this library
if self.is_volume_tracked(library, fingerprint).await? {
let db = library.db().conn();
// Get the volume to find its device_id
let volume = self
.get_volume(fingerprint)
.await
.ok_or_else(|| VolumeError::NotFound(fingerprint.to_string()))?;
// Update the tracked volume record with speed test results
let now = chrono::Utc::now();
let update_result = entities::volume::Entity::update_many()
.filter(entities::volume::Column::DeviceId.eq(volume.device_id))
.filter(entities::volume::Column::Fingerprint.eq(fingerprint.0.clone()))
.set(entities::volume::ActiveModel {
read_speed_mbps: Set(Some(read_speed_mbps as i32)),
write_speed_mbps: Set(Some(write_speed_mbps as i32)),
last_speed_test_at: Set(Some(now)),
..Default::default()
})
.exec(db)
.await
.map_err(|e| VolumeError::Database(e.to_string()))?;
if update_result.rows_affected > 0 {
info!(
"Saved speed test results for volume {} in library {}: {}MB/s read, {}MB/s write",
fingerprint.0,
library.name().await,
read_speed_mbps,
write_speed_mbps
);
}
}
}
Ok(())
}
}
/// Statistics about detected volumes
@@ -644,7 +778,7 @@ mod tests {
async fn test_volume_manager_creation() {
let config = VolumeDetectionConfig::default();
let events = create_test_events();
let manager = VolumeManager::new(config, events);
let manager = VolumeManager::new(Uuid::new_v4(), config, events);
let stats = manager.get_statistics().await;
assert_eq!(stats.total_volumes, 0);
@@ -654,7 +788,7 @@ mod tests {
async fn test_volume_path_lookup() {
let config = VolumeDetectionConfig::default();
let events = create_test_events();
let manager = VolumeManager::new(config, events);
let manager = VolumeManager::new(Uuid::new_v4(), config, events);
// Initially no volumes
let volume = manager
@@ -667,7 +801,7 @@ mod tests {
async fn test_same_volume_check() {
let config = VolumeDetectionConfig::default();
let events = create_test_events();
let manager = VolumeManager::new(config, events);
let manager = VolumeManager::new(Uuid::new_v4(), config, events);
// Both paths don't exist, so should return false
let same = manager

View File

@@ -1,330 +1,432 @@
//! Platform-specific volume detection
use crate::volume::{
error::{VolumeError, VolumeResult},
types::{DiskType, FileSystem, MountType, Volume, VolumeDetectionConfig},
error::{VolumeError, VolumeResult},
types::{DiskType, FileSystem, MountType, Volume, VolumeDetectionConfig},
};
use std::path::PathBuf;
use tokio::task;
use tracing::{debug, warn, instrument};
use tracing::{debug, instrument, warn};
/// Detect all volumes on the current system
#[instrument(skip(config))]
pub async fn detect_volumes(config: &VolumeDetectionConfig) -> VolumeResult<Vec<Volume>> {
debug!("Starting volume detection");
#[cfg(target_os = "macos")]
let volumes = macos::detect_volumes(config).await?;
#[cfg(target_os = "linux")]
let volumes = linux::detect_volumes(config).await?;
#[cfg(target_os = "windows")]
let volumes = windows::detect_volumes(config).await?;
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
let volumes = unsupported::detect_volumes(config).await?;
debug!("Detected {} volumes", volumes.len());
Ok(volumes)
pub async fn detect_volumes(
device_id: uuid::Uuid,
config: &VolumeDetectionConfig,
) -> VolumeResult<Vec<Volume>> {
debug!("Starting volume detection for device {}", device_id);
#[cfg(target_os = "macos")]
let volumes = macos::detect_volumes(device_id, config).await?;
#[cfg(target_os = "linux")]
let volumes = linux::detect_volumes(device_id, config).await?;
#[cfg(target_os = "windows")]
let volumes = windows::detect_volumes(device_id, config).await?;
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
let volumes = unsupported::detect_volumes(device_id, config).await?;
debug!(
"Detected {} volumes for device {}",
volumes.len(),
device_id
);
Ok(volumes)
}
#[cfg(target_os = "macos")]
mod macos {
use super::*;
use std::process::Command;
pub async fn detect_volumes(config: &VolumeDetectionConfig) -> VolumeResult<Vec<Volume>> {
// Clone config for move into task
let config = config.clone();
// Run in blocking task since Command is sync
task::spawn_blocking(move || {
let mut volumes = Vec::new();
// Use diskutil to get volume information
let output = Command::new("diskutil")
.args(["list", "-plist"])
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run diskutil: {}", e)))?;
if !output.status.success() {
return Err(VolumeError::platform("diskutil command failed"));
}
// For now, use simple df command to get basic volume info
let df_output = Command::new("df")
.args(["-h"])
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run df: {}", e)))?;
let df_text = String::from_utf8_lossy(&df_output.stdout);
for line in df_text.lines().skip(1) { // Skip header
if let Some(volume) = parse_df_line(line, &config)? {
volumes.push(volume);
}
}
Ok(volumes)
})
.await
.map_err(|e| VolumeError::platform(format!("Task join error: {}", e)))?
}
fn parse_df_line(line: &str, config: &VolumeDetectionConfig) -> VolumeResult<Option<Volume>> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 9 {
return Ok(None);
}
let filesystem = parts[0];
// Handle special case where autofs filesystem has name and target split across columns
if filesystem == "map" && parts.len() > 1 && parts[1].contains("auto") {
debug!("Skipping autofs filesystem: map {}", parts[1]);
return Ok(None);
}
let size_str = parts[1];
let used_str = parts[2];
let available_str = parts[3];
let mount_point = parts[8];
// Skip autofs and other special filesystems
if filesystem.starts_with("map") || filesystem.contains("auto_") {
debug!("Skipping autofs filesystem: {}", filesystem);
return Ok(None);
}
// Skip system filesystems unless requested
if !config.include_system && is_system_filesystem(filesystem) {
return Ok(None);
}
// Skip virtual filesystems unless requested
if !config.include_virtual && is_virtual_filesystem(filesystem) {
return Ok(None);
}
let mount_path = PathBuf::from(mount_point);
// Parse sizes (df output like "931Gi", "465Gi", etc.)
let total_bytes = parse_size_string(size_str)?;
let available_bytes = parse_size_string(available_str)?;
let name = if mount_point == "/" {
"Macintosh HD".to_string()
} else {
mount_path.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string()
};
let mount_type = if mount_point == "/" {
MountType::System
} else if mount_point.starts_with("/Volumes/") {
MountType::External
} else if filesystem.starts_with("//") {
MountType::Network
} else {
MountType::System
};
let disk_type = detect_disk_type(&mount_path)?;
let file_system = detect_filesystem(&mount_path)?;
let volume = Volume::new(
name,
mount_type,
mount_path,
vec![], // Additional mount points would need diskutil parsing
disk_type,
file_system,
total_bytes,
available_bytes,
false, // Read-only detection would need additional checks
Some(filesystem.to_string()), // Use filesystem as hardware ID
);
Ok(Some(volume))
}
fn detect_disk_type(mount_point: &PathBuf) -> VolumeResult<DiskType> {
// Try to detect SSD vs HDD using system_profiler or diskutil
let output = Command::new("diskutil")
.args(["info", mount_point.to_str().unwrap_or("/")])
.output();
match output {
Ok(output) if output.status.success() => {
let info = String::from_utf8_lossy(&output.stdout);
if info.contains("Solid State") {
Ok(DiskType::SSD)
} else if info.contains("Rotational") {
Ok(DiskType::HDD)
} else {
Ok(DiskType::Unknown)
}
}
_ => Ok(DiskType::Unknown),
}
}
fn detect_filesystem(mount_point: &PathBuf) -> VolumeResult<FileSystem> {
let output = Command::new("diskutil")
.args(["info", mount_point.to_str().unwrap_or("/")])
.output();
match output {
Ok(output) if output.status.success() => {
let info = String::from_utf8_lossy(&output.stdout);
if info.contains("APFS") {
Ok(FileSystem::APFS)
} else if info.contains("HFS+") {
Ok(FileSystem::Other("HFS+".to_string()))
} else if info.contains("ExFAT") {
Ok(FileSystem::ExFAT)
} else if info.contains("FAT32") {
Ok(FileSystem::FAT32)
} else {
Ok(FileSystem::Other("Unknown".to_string()))
}
}
_ => Ok(FileSystem::Other("Unknown".to_string())),
}
}
use super::*;
use std::process::Command;
pub async fn detect_volumes(
device_id: uuid::Uuid,
config: &VolumeDetectionConfig,
) -> VolumeResult<Vec<Volume>> {
// Clone config for move into task
let config = config.clone();
// Run in blocking task since Command is sync
task::spawn_blocking(move || {
let mut volumes = Vec::new();
// Use diskutil to get volume information
let output = Command::new("diskutil")
.args(["list", "-plist"])
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run diskutil: {}", e)))?;
if !output.status.success() {
return Err(VolumeError::platform(format!(
"diskutil failed with status: {}",
output.status
)));
}
// For now, use a simpler approach with df command to get mounted volumes
let df_output = Command::new("df")
.args(["-H"])
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run df: {}", e)))?;
if !df_output.status.success() {
return Err(VolumeError::platform(
"Failed to get volume information".to_string(),
));
}
let df_stdout = String::from_utf8_lossy(&df_output.stdout);
for line in df_stdout.lines().skip(1) {
// Skip header
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() >= 9 {
let filesystem = fields[0];
let total_str = fields[1];
let used_str = fields[2];
let available_str = fields[3];
let mount_point = fields[8];
// Skip certain filesystems
if should_skip_filesystem(filesystem) {
debug!("Skipping {} filesystem: {}", filesystem, mount_point);
continue;
}
// Parse sizes (in bytes)
let total_bytes = parse_size_string(total_str).unwrap_or(0);
let available_bytes = parse_size_string(available_str).unwrap_or(0);
let mount_path = PathBuf::from(mount_point);
let name = extract_volume_name(&mount_path);
let mount_type = if mount_point.starts_with("/Volumes/") {
MountType::External
} else if mount_point.starts_with("/System/") {
MountType::System
} else if filesystem.contains("://") {
MountType::Network
} else {
MountType::System
};
let disk_type = detect_disk_type(&mount_path)?;
let file_system = detect_filesystem(&mount_path)?;
let volume = Volume::new(
device_id,
name,
mount_type,
mount_path,
vec![], // Additional mount points would need diskutil parsing
disk_type,
file_system,
total_bytes,
available_bytes,
false, // Read-only detection would need additional checks
Some(filesystem.to_string()), // Use filesystem as hardware ID
);
volumes.push(volume);
}
}
Ok(volumes)
})
.await
.map_err(|e| VolumeError::platform(format!("Task join error: {}", e)))?
}
// Helper function to check if filesystem should be skipped
fn should_skip_filesystem(filesystem: &str) -> bool {
matches!(
filesystem,
"devfs" | "tmpfs" | "proc" | "sysfs" | "fdescfs" | "kernfs"
)
}
// Helper function to extract volume name from mount path
fn extract_volume_name(mount_path: &PathBuf) -> String {
if let Some(name) = mount_path.file_name() {
name.to_string_lossy().to_string()
} else if mount_path.to_string_lossy() == "/" {
"Macintosh HD".to_string()
} else {
mount_path.to_string_lossy().to_string()
}
}
fn parse_df_line(
line: &str,
device_id: uuid::Uuid,
config: &VolumeDetectionConfig,
) -> VolumeResult<Option<Volume>> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 9 {
return Ok(None);
}
let filesystem = parts[0];
// Handle special case where autofs filesystem has name and target split across columns
if filesystem == "map" && parts.len() > 1 && parts[1].contains("auto") {
debug!("Skipping autofs filesystem: map {}", parts[1]);
return Ok(None);
}
let size_str = parts[1];
let used_str = parts[2];
let available_str = parts[3];
let mount_point = parts[8];
// Skip autofs and other special filesystems
if filesystem.starts_with("map") || filesystem.contains("auto_") {
debug!("Skipping autofs filesystem: {}", filesystem);
return Ok(None);
}
// Skip system filesystems unless requested
if !config.include_system && is_system_filesystem(filesystem) {
return Ok(None);
}
// Skip virtual filesystems unless requested
if !config.include_virtual && is_virtual_filesystem(filesystem) {
return Ok(None);
}
let mount_path = PathBuf::from(mount_point);
// Parse sizes (df output like "931Gi", "465Gi", etc.)
let total_bytes = parse_size_string(size_str)?;
let available_bytes = parse_size_string(available_str)?;
let name = if mount_point == "/" {
"Macintosh HD".to_string()
} else {
mount_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string()
};
let mount_type = if mount_point == "/" {
MountType::System
} else if mount_point.starts_with("/Volumes/") {
MountType::External
} else if filesystem.starts_with("//") {
MountType::Network
} else {
MountType::System
};
let disk_type = detect_disk_type(&mount_path)?;
let file_system = detect_filesystem(&mount_path)?;
let volume = Volume::new(
device_id,
name,
mount_type,
mount_path,
vec![], // Additional mount points would need diskutil parsing
disk_type,
file_system,
total_bytes,
available_bytes,
false, // Read-only detection would need additional checks
Some(filesystem.to_string()), // Use filesystem as hardware ID
);
Ok(Some(volume))
}
fn detect_disk_type(mount_point: &PathBuf) -> VolumeResult<DiskType> {
// Try to detect SSD vs HDD using system_profiler or diskutil
let output = Command::new("diskutil")
.args(["info", mount_point.to_str().unwrap_or("/")])
.output();
match output {
Ok(output) if output.status.success() => {
let info = String::from_utf8_lossy(&output.stdout);
if info.contains("Solid State") {
Ok(DiskType::SSD)
} else if info.contains("Rotational") {
Ok(DiskType::HDD)
} else {
Ok(DiskType::Unknown)
}
}
_ => Ok(DiskType::Unknown),
}
}
fn detect_filesystem(mount_point: &PathBuf) -> VolumeResult<FileSystem> {
let output = Command::new("diskutil")
.args(["info", mount_point.to_str().unwrap_or("/")])
.output();
match output {
Ok(output) if output.status.success() => {
let info = String::from_utf8_lossy(&output.stdout);
if info.contains("APFS") {
Ok(FileSystem::APFS)
} else if info.contains("HFS+") {
Ok(FileSystem::Other("HFS+".to_string()))
} else if info.contains("ExFAT") {
Ok(FileSystem::ExFAT)
} else if info.contains("FAT32") {
Ok(FileSystem::FAT32)
} else {
Ok(FileSystem::Other("Unknown".to_string()))
}
}
_ => Ok(FileSystem::Other("Unknown".to_string())),
}
}
}
#[cfg(target_os = "linux")]
mod linux {
use super::*;
use std::process::Command;
pub async fn detect_volumes(config: &VolumeDetectionConfig) -> VolumeResult<Vec<Volume>> {
task::spawn_blocking(move || {
let mut volumes = Vec::new();
// Use df to get mounted filesystems
let output = Command::new("df")
.args(["-h", "-T"]) // -T shows filesystem type
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run df: {}", e)))?;
if !output.status.success() {
return Err(VolumeError::platform("df command failed"));
}
let df_text = String::from_utf8_lossy(&output.stdout);
for line in df_text.lines().skip(1) { // Skip header
if let Some(volume) = parse_df_line(line, &config)? {
volumes.push(volume);
}
}
Ok(volumes)
})
.await
.map_err(|e| VolumeError::platform(format!("Task join error: {}", e)))?
}
fn parse_df_line(line: &str, config: &VolumeDetectionConfig) -> VolumeResult<Option<Volume>> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 7 {
return Ok(None);
}
let filesystem_device = parts[0];
let filesystem_type = parts[1];
let size_str = parts[2];
let used_str = parts[3];
let available_str = parts[4];
let mount_point = parts[6];
// Skip system filesystems unless requested
if !config.include_system && is_system_filesystem(filesystem_device) {
return Ok(None);
}
// Skip virtual filesystems unless requested
if !config.include_virtual && is_virtual_filesystem(filesystem_type) {
return Ok(None);
}
let mount_path = PathBuf::from(mount_point);
let total_bytes = parse_size_string(size_str)?;
let available_bytes = parse_size_string(available_str)?;
let name = if mount_point == "/" {
"Root".to_string()
} else {
mount_path.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string()
};
let mount_type = determine_mount_type(mount_point, filesystem_device);
let disk_type = detect_disk_type_linux(filesystem_device)?;
let file_system = FileSystem::from_string(filesystem_type);
let volume = Volume::new(
name,
mount_type,
mount_path,
vec![],
disk_type,
file_system,
total_bytes,
available_bytes,
false, // Would need additional check for read-only
Some(filesystem_device.to_string()),
);
Ok(Some(volume))
}
fn detect_disk_type_linux(device: &str) -> VolumeResult<DiskType> {
// Try to detect using /sys/block/*/queue/rotational
if let Some(device_name) = device.strip_prefix("/dev/") {
let base_device = device_name.trim_end_matches(char::is_numeric);
let rotational_path = format!("/sys/block/{}/queue/rotational", base_device);
if let Ok(contents) = std::fs::read_to_string(rotational_path) {
return match contents.trim() {
"0" => Ok(DiskType::SSD),
"1" => Ok(DiskType::HDD),
_ => Ok(DiskType::Unknown),
};
}
}
Ok(DiskType::Unknown)
}
fn determine_mount_type(mount_point: &str, device: &str) -> MountType {
if mount_point == "/" || mount_point.starts_with("/boot") {
MountType::System
} else if device.starts_with("//") || device.contains("nfs") {
MountType::Network
} else if mount_point.starts_with("/media/") || mount_point.starts_with("/mnt/") {
MountType::External
} else {
MountType::System
}
}
use super::*;
use std::process::Command;
pub async fn detect_volumes(
device_id: uuid::Uuid,
config: &VolumeDetectionConfig,
) -> VolumeResult<Vec<Volume>> {
task::spawn_blocking(move || {
let mut volumes = Vec::new();
// Use df to get mounted filesystems
let output = Command::new("df")
.args(["-h", "-T"]) // -T shows filesystem type
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run df: {}", e)))?;
if !output.status.success() {
return Err(VolumeError::platform("df command failed"));
}
let df_text = String::from_utf8_lossy(&output.stdout);
for line in df_text.lines().skip(1) {
// Skip header
if let Some(volume) = parse_df_line(line, device_id, &config)? {
volumes.push(volume);
}
}
Ok(volumes)
})
.await
.map_err(|e| VolumeError::platform(format!("Task join error: {}", e)))?
}
fn parse_df_line(
line: &str,
device_id: uuid::Uuid,
config: &VolumeDetectionConfig,
) -> VolumeResult<Option<Volume>> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 7 {
return Ok(None);
}
let filesystem_device = parts[0];
let filesystem_type = parts[1];
let size_str = parts[2];
let used_str = parts[3];
let available_str = parts[4];
let mount_point = parts[6];
// Skip system filesystems unless requested
if !config.include_system && is_system_filesystem(filesystem_device) {
return Ok(None);
}
// Skip virtual filesystems unless requested
if !config.include_virtual && is_virtual_filesystem(filesystem_type) {
return Ok(None);
}
let mount_path = PathBuf::from(mount_point);
let total_bytes = parse_size_string(size_str)?;
let available_bytes = parse_size_string(available_str)?;
let name = if mount_point == "/" {
"Root".to_string()
} else {
mount_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string()
};
let mount_type = determine_mount_type(mount_point, filesystem_device);
let disk_type = detect_disk_type_linux(filesystem_device)?;
let file_system = FileSystem::from_string(filesystem_type);
let volume = Volume::new(
device_id,
name,
mount_type,
mount_path,
vec![],
disk_type,
file_system,
total_bytes,
available_bytes,
false, // Would need additional check for read-only
Some(filesystem_device.to_string()),
);
Ok(Some(volume))
}
fn detect_disk_type_linux(device: &str) -> VolumeResult<DiskType> {
// Try to detect using /sys/block/*/queue/rotational
if let Some(device_name) = device.strip_prefix("/dev/") {
let base_device = device_name.trim_end_matches(char::is_numeric);
let rotational_path = format!("/sys/block/{}/queue/rotational", base_device);
if let Ok(contents) = std::fs::read_to_string(rotational_path) {
return match contents.trim() {
"0" => Ok(DiskType::SSD),
"1" => Ok(DiskType::HDD),
_ => Ok(DiskType::Unknown),
};
}
}
Ok(DiskType::Unknown)
}
fn determine_mount_type(mount_point: &str, device: &str) -> MountType {
if mount_point == "/" || mount_point.starts_with("/boot") {
MountType::System
} else if device.starts_with("//") || device.contains("nfs") {
MountType::Network
} else if mount_point.starts_with("/media/") || mount_point.starts_with("/mnt/") {
MountType::External
} else {
MountType::System
}
}
}
#[cfg(target_os = "windows")]
mod windows {
use super::*;
use std::process::Command;
pub async fn detect_volumes(_config: &VolumeDetectionConfig) -> VolumeResult<Vec<Volume>> {
task::spawn_blocking(|| {
use super::*;
use std::process::Command;
pub async fn detect_volumes(
device_id: uuid::Uuid,
_config: &VolumeDetectionConfig,
) -> VolumeResult<Vec<Volume>> {
task::spawn_blocking(|| {
// Use PowerShell to get volume information
let output = Command::new("powershell")
.args([
@@ -333,101 +435,112 @@ mod windows {
])
.output()
.map_err(|e| VolumeError::platform(format!("Failed to run PowerShell: {}", e)))?;
if !output.status.success() {
return Err(VolumeError::platform("PowerShell command failed"));
}
// For now, return empty until we implement full Windows support
warn!("Windows volume detection not fully implemented yet");
Ok(Vec::new())
})
.await
.map_err(|e| VolumeError::platform(format!("Task join error: {}", e)))?
}
}
}
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
mod unsupported {
use super::*;
pub async fn detect_volumes(_config: &VolumeDetectionConfig) -> VolumeResult<Vec<Volume>> {
warn!("Volume detection not supported on this platform");
Ok(Vec::new())
}
use super::*;
pub async fn detect_volumes(
device_id: uuid::Uuid,
_config: &VolumeDetectionConfig,
) -> VolumeResult<Vec<Volume>> {
warn!("Volume detection not supported on this platform");
Ok(Vec::new())
}
}
// Common utility functions
fn is_system_filesystem(filesystem: &str) -> bool {
matches!(filesystem, "/" | "/dev" | "/proc" | "/sys" | "/tmp" | "/var/tmp")
matches!(
filesystem,
"/" | "/dev" | "/proc" | "/sys" | "/tmp" | "/var/tmp"
)
}
fn is_virtual_filesystem(filesystem: &str) -> bool {
let fs_lower = filesystem.to_lowercase();
matches!(
fs_lower.as_str(),
"devfs" | "sysfs" | "proc" | "tmpfs" | "ramfs" | "devtmpfs" | "overlay" | "fuse"
) || fs_lower.starts_with("map ") || fs_lower.contains("auto_")
let fs_lower = filesystem.to_lowercase();
matches!(
fs_lower.as_str(),
"devfs" | "sysfs" | "proc" | "tmpfs" | "ramfs" | "devtmpfs" | "overlay" | "fuse"
) || fs_lower.starts_with("map ")
|| fs_lower.contains("auto_")
}
fn parse_size_string(size_str: &str) -> VolumeResult<u64> {
if size_str == "-" {
return Ok(0);
}
// Skip invalid size strings that don't look like numbers
if size_str.is_empty() || size_str.chars().all(char::is_alphabetic) {
return Ok(0);
}
let size_str = size_str.replace(",", ""); // Remove commas
let (number_part, unit) = if let Some(pos) = size_str.find(char::is_alphabetic) {
(&size_str[..pos], &size_str[pos..])
} else {
(size_str.as_str(), "")
};
let number: f64 = number_part.parse()
.map_err(|_| VolumeError::InvalidData(format!("Invalid size: {}", size_str)))?;
let multiplier = match unit.to_uppercase().as_str() {
"" | "B" => 1,
"K" | "KB" | "KI" => 1024,
"M" | "MB" | "MI" => 1024 * 1024,
"G" | "GB" | "GI" => 1024 * 1024 * 1024,
"T" | "TB" | "TI" => 1024u64.pow(4),
"P" | "PB" | "PI" => 1024u64.pow(5),
_ => {
warn!("Unknown size unit: {}", unit);
1
}
};
Ok((number * multiplier as f64) as u64)
if size_str == "-" {
return Ok(0);
}
// Skip invalid size strings that don't look like numbers
if size_str.is_empty() || size_str.chars().all(char::is_alphabetic) {
return Ok(0);
}
let size_str = size_str.replace(",", ""); // Remove commas
let (number_part, unit) = if let Some(pos) = size_str.find(char::is_alphabetic) {
(&size_str[..pos], &size_str[pos..])
} else {
(size_str.as_str(), "")
};
let number: f64 = number_part
.parse()
.map_err(|_| VolumeError::InvalidData(format!("Invalid size: {}", size_str)))?;
let multiplier = match unit.to_uppercase().as_str() {
"" | "B" => 1,
"K" | "KB" | "KI" => 1024,
"M" | "MB" | "MI" => 1024 * 1024,
"G" | "GB" | "GI" => 1024 * 1024 * 1024,
"T" | "TB" | "TI" => 1024u64.pow(4),
"P" | "PB" | "PI" => 1024u64.pow(5),
_ => {
warn!("Unknown size unit: {}", unit);
1
}
};
Ok((number * multiplier as f64) as u64)
}
#[cfg(test)]
mod tests {
use super::*;
use super::*;
#[test]
fn test_parse_size_string() {
assert_eq!(parse_size_string("1024").unwrap(), 1024);
assert_eq!(parse_size_string("1K").unwrap(), 1024);
assert_eq!(parse_size_string("1M").unwrap(), 1024 * 1024);
assert_eq!(parse_size_string("1G").unwrap(), 1024 * 1024 * 1024);
assert_eq!(parse_size_string("1.5G").unwrap(), (1.5 * 1024.0 * 1024.0 * 1024.0) as u64);
assert_eq!(parse_size_string("-").unwrap(), 0);
}
#[test]
fn test_parse_size_string() {
assert_eq!(parse_size_string("1024").unwrap(), 1024);
assert_eq!(parse_size_string("1K").unwrap(), 1024);
assert_eq!(parse_size_string("1M").unwrap(), 1024 * 1024);
assert_eq!(parse_size_string("1G").unwrap(), 1024 * 1024 * 1024);
assert_eq!(
parse_size_string("1.5G").unwrap(),
(1.5 * 1024.0 * 1024.0 * 1024.0) as u64
);
assert_eq!(parse_size_string("-").unwrap(), 0);
}
#[test]
fn test_filesystem_detection() {
assert!(is_virtual_filesystem("tmpfs"));
assert!(is_virtual_filesystem("proc"));
assert!(!is_virtual_filesystem("ext4"));
assert!(is_system_filesystem("/"));
assert!(is_system_filesystem("/proc"));
assert!(!is_system_filesystem("/home"));
}
}
#[test]
fn test_filesystem_detection() {
assert!(is_virtual_filesystem("tmpfs"));
assert!(is_virtual_filesystem("proc"));
assert!(!is_virtual_filesystem("ext4"));
assert!(is_system_filesystem("/"));
assert!(is_system_filesystem("/proc"));
assert!(!is_system_filesystem("/home"));
}
}

View File

@@ -1,365 +1,360 @@
//! Volume speed testing functionality
use crate::volume::{
error::{VolumeError, VolumeResult},
types::{MountType, Volume},
error::{VolumeError, VolumeResult},
types::{MountType, Volume},
};
use std::time::Instant;
use tokio::{
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
time::{timeout, Duration},
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
time::{timeout, Duration},
};
use tracing::{debug, instrument, warn};
/// Configuration for speed tests
#[derive(Debug, Clone)]
pub struct SpeedTestConfig {
/// Size of the test file in megabytes
pub file_size_mb: usize,
/// Timeout for the test in seconds
pub timeout_secs: u64,
/// Number of test iterations for averaging
pub iterations: usize,
/// Size of the test file in megabytes
pub file_size_mb: usize,
/// Timeout for the test in seconds
pub timeout_secs: u64,
/// Number of test iterations for averaging
pub iterations: usize,
}
impl Default for SpeedTestConfig {
fn default() -> Self {
Self {
file_size_mb: 10,
timeout_secs: 30,
iterations: 1,
}
}
fn default() -> Self {
Self {
file_size_mb: 10,
timeout_secs: 30,
iterations: 1,
}
}
}
/// Result of a speed test
#[derive(Debug, Clone)]
pub struct SpeedTestResult {
/// Write speed in MB/s
pub write_speed_mbps: f64,
/// Read speed in MB/s
pub read_speed_mbps: f64,
/// Total time taken for the test
pub duration_secs: f64,
/// Write speed in MB/s
pub write_speed_mbps: f64,
/// Read speed in MB/s
pub read_speed_mbps: f64,
/// Total time taken for the test
pub duration_secs: f64,
}
/// Run a speed test on the given volume
#[instrument(skip(volume), fields(volume_name = %volume.name))]
pub async fn run_speed_test(volume: &Volume) -> VolumeResult<(u64, u64)> {
run_speed_test_with_config(volume, SpeedTestConfig::default()).await
run_speed_test_with_config(volume, SpeedTestConfig::default()).await
}
/// Run a speed test with custom configuration
#[instrument(skip(volume, config), fields(volume_name = %volume.name))]
pub async fn run_speed_test_with_config(
volume: &Volume,
config: SpeedTestConfig,
volume: &Volume,
config: SpeedTestConfig,
) -> VolumeResult<(u64, u64)> {
if !volume.is_mounted {
return Err(VolumeError::NotMounted(volume.name.clone()));
}
if !volume.is_mounted {
return Err(VolumeError::NotMounted(volume.name.clone()));
}
if volume.read_only {
return Err(VolumeError::ReadOnly(volume.name.clone()));
}
if volume.read_only {
return Err(VolumeError::ReadOnly(volume.name.clone()));
}
debug!("Starting speed test with config: {:?}", config);
debug!("Starting speed test with config: {:?}", config);
let test_location = TestLocation::new(&volume.mount_point, &volume.mount_type).await?;
let result = perform_speed_test(&test_location, &config).await?;
let test_location = TestLocation::new(&volume.mount_point, &volume.mount_type).await?;
let result = perform_speed_test(&test_location, &config).await?;
// Cleanup
test_location.cleanup().await?;
// Cleanup
test_location.cleanup().await?;
debug!(
"Speed test completed: {:.2} MB/s write, {:.2} MB/s read",
result.write_speed_mbps, result.read_speed_mbps
);
debug!(
"Speed test completed: {:.2} MB/s write, {:.2} MB/s read",
result.write_speed_mbps, result.read_speed_mbps
);
Ok((
result.read_speed_mbps as u64,
result.write_speed_mbps as u64,
))
Ok((
result.read_speed_mbps as u64,
result.write_speed_mbps as u64,
))
}
/// Helper for managing test files and directories
struct TestLocation {
test_file: std::path::PathBuf,
created_dir: Option<std::path::PathBuf>,
test_file: std::path::PathBuf,
created_dir: Option<std::path::PathBuf>,
}
impl TestLocation {
/// Create a new test location
async fn new(
volume_path: &std::path::Path,
mount_type: &MountType,
) -> VolumeResult<Self> {
let (dir, created_dir) = get_writable_directory(volume_path, mount_type).await?;
let test_file = dir.join("spacedrive_speed_test.tmp");
/// Create a new test location
async fn new(volume_path: &std::path::Path, mount_type: &MountType) -> VolumeResult<Self> {
let (dir, created_dir) = get_writable_directory(volume_path, mount_type).await?;
let test_file = dir.join("spacedrive_speed_test.tmp");
Ok(Self {
test_file,
created_dir,
})
}
Ok(Self {
test_file,
created_dir,
})
}
/// Clean up test files and directories
async fn cleanup(&self) -> VolumeResult<()> {
// Remove test file
if self.test_file.exists() {
if let Err(e) = tokio::fs::remove_file(&self.test_file).await {
warn!("Failed to remove test file: {}", e);
}
}
/// Clean up test files and directories
async fn cleanup(&self) -> VolumeResult<()> {
// Remove test file
if self.test_file.exists() {
if let Err(e) = tokio::fs::remove_file(&self.test_file).await {
warn!("Failed to remove test file: {}", e);
}
}
// Remove created directory if we created it
if let Some(ref dir) = self.created_dir {
if let Err(e) = tokio::fs::remove_dir_all(dir).await {
warn!("Failed to remove test directory: {}", e);
}
}
// Remove created directory if we created it
if let Some(ref dir) = self.created_dir {
if let Err(e) = tokio::fs::remove_dir_all(dir).await {
warn!("Failed to remove test directory: {}", e);
}
}
Ok(())
}
Ok(())
}
}
/// Perform the actual speed test
async fn perform_speed_test(
location: &TestLocation,
config: &SpeedTestConfig,
location: &TestLocation,
config: &SpeedTestConfig,
) -> VolumeResult<SpeedTestResult> {
let test_data = generate_test_data(config.file_size_mb);
let timeout_duration = Duration::from_secs(config.timeout_secs);
let test_data = generate_test_data(config.file_size_mb);
let timeout_duration = Duration::from_secs(config.timeout_secs);
let mut write_speeds = Vec::new();
let mut read_speeds = Vec::new();
let overall_start = Instant::now();
let mut write_speeds = Vec::new();
let mut read_speeds = Vec::new();
let overall_start = Instant::now();
for iteration in 0..config.iterations {
debug!("Speed test iteration {}/{}", iteration + 1, config.iterations);
for iteration in 0..config.iterations {
debug!(
"Speed test iteration {}/{}",
iteration + 1,
config.iterations
);
// Write test
let write_speed = timeout(
timeout_duration,
perform_write_test(&location.test_file, &test_data),
)
.await
.map_err(|_| VolumeError::Timeout)??;
// Write test
let write_speed = timeout(
timeout_duration,
perform_write_test(&location.test_file, &test_data),
)
.await
.map_err(|_| VolumeError::Timeout)??;
write_speeds.push(write_speed);
write_speeds.push(write_speed);
// Read test
let read_speed = timeout(
timeout_duration,
perform_read_test(&location.test_file, test_data.len()),
)
.await
.map_err(|_| VolumeError::Timeout)??;
// Read test
let read_speed = timeout(
timeout_duration,
perform_read_test(&location.test_file, test_data.len()),
)
.await
.map_err(|_| VolumeError::Timeout)??;
read_speeds.push(read_speed);
read_speeds.push(read_speed);
// Clean up test file between iterations
if iteration < config.iterations - 1 {
let _ = tokio::fs::remove_file(&location.test_file).await;
}
}
// Clean up test file between iterations
if iteration < config.iterations - 1 {
let _ = tokio::fs::remove_file(&location.test_file).await;
}
}
let avg_write_speed = write_speeds.iter().sum::<f64>() / write_speeds.len() as f64;
let avg_read_speed = read_speeds.iter().sum::<f64>() / read_speeds.len() as f64;
let avg_write_speed = write_speeds.iter().sum::<f64>() / write_speeds.len() as f64;
let avg_read_speed = read_speeds.iter().sum::<f64>() / read_speeds.len() as f64;
Ok(SpeedTestResult {
write_speed_mbps: avg_write_speed,
read_speed_mbps: avg_read_speed,
duration_secs: overall_start.elapsed().as_secs_f64(),
})
Ok(SpeedTestResult {
write_speed_mbps: avg_write_speed,
read_speed_mbps: avg_read_speed,
duration_secs: overall_start.elapsed().as_secs_f64(),
})
}
/// Generate test data for speed testing
fn generate_test_data(size_mb: usize) -> Vec<u8> {
let size_bytes = size_mb * 1024 * 1024;
// Use a pattern instead of zeros to avoid compression optimizations
let pattern = b"SpacedriveSpeedTest0123456789ABCDEF";
let mut data = Vec::with_capacity(size_bytes);
for i in 0..size_bytes {
data.push(pattern[i % pattern.len()]);
}
data
let size_bytes = size_mb * 1024 * 1024;
// Use a pattern instead of zeros to avoid compression optimizations
let pattern = b"SpacedriveSpeedTest0123456789ABCDEF";
let mut data = Vec::with_capacity(size_bytes);
for i in 0..size_bytes {
data.push(pattern[i % pattern.len()]);
}
data
}
/// Perform write speed test
async fn perform_write_test(
file_path: &std::path::Path,
data: &[u8],
) -> VolumeResult<f64> {
let start = Instant::now();
async fn perform_write_test(file_path: &std::path::Path, data: &[u8]) -> VolumeResult<f64> {
let start = Instant::now();
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(file_path)
.await?;
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(file_path)
.await?;
file.write_all(data).await?;
file.sync_all().await?; // Ensure data is written to disk
file.write_all(data).await?;
file.sync_all().await?; // Ensure data is written to disk
let duration = start.elapsed();
let speed_mbps = (data.len() as f64 / 1024.0 / 1024.0) / duration.as_secs_f64();
let duration = start.elapsed();
let speed_mbps = (data.len() as f64 / 1024.0 / 1024.0) / duration.as_secs_f64();
Ok(speed_mbps)
Ok(speed_mbps)
}
/// Perform read speed test
async fn perform_read_test(
file_path: &std::path::Path,
expected_size: usize,
) -> VolumeResult<f64> {
let start = Instant::now();
async fn perform_read_test(file_path: &std::path::Path, expected_size: usize) -> VolumeResult<f64> {
let start = Instant::now();
let mut file = File::open(file_path).await?;
let mut buffer = Vec::with_capacity(expected_size);
file.read_to_end(&mut buffer).await?;
let mut file = File::open(file_path).await?;
let mut buffer = Vec::with_capacity(expected_size);
file.read_to_end(&mut buffer).await?;
let duration = start.elapsed();
let speed_mbps = (buffer.len() as f64 / 1024.0 / 1024.0) / duration.as_secs_f64();
let duration = start.elapsed();
let speed_mbps = (buffer.len() as f64 / 1024.0 / 1024.0) / duration.as_secs_f64();
Ok(speed_mbps)
Ok(speed_mbps)
}
/// Get a writable directory within the volume
async fn get_writable_directory(
volume_path: &std::path::Path,
mount_type: &MountType,
volume_path: &std::path::Path,
mount_type: &MountType,
) -> VolumeResult<(std::path::PathBuf, Option<std::path::PathBuf>)> {
match mount_type {
MountType::System => {
// For system volumes, prefer using temp directory
let temp_dir = std::env::temp_dir();
Ok((temp_dir, None))
}
_ => {
// For external volumes, try to write in the root or create a temp directory
let candidates = [
volume_path.join("tmp"),
volume_path.join(".spacedrive_temp"),
volume_path.to_path_buf(),
];
match mount_type {
MountType::System => {
// For system volumes, prefer using temp directory
let temp_dir = std::env::temp_dir();
Ok((temp_dir, None))
}
_ => {
// For external volumes, try to write in the root or create a temp directory
let candidates = [
volume_path.join("tmp"),
volume_path.join(".spacedrive_temp"),
volume_path.to_path_buf(),
];
for candidate in &candidates {
// Try to create the directory
if let Ok(()) = tokio::fs::create_dir_all(candidate).await {
// Test if we can write to it
let test_file = candidate.join("test_write_permissions");
if tokio::fs::write(&test_file, b"test").await.is_ok() {
let _ = tokio::fs::remove_file(&test_file).await;
// If we created a directory specifically for this test, mark it for cleanup
let created_dir = if candidate.file_name()
.map_or(false, |name| name == "tmp" || name == ".spacedrive_temp")
{
Some(candidate.clone())
} else {
None
};
return Ok((candidate.clone(), created_dir));
}
}
}
for candidate in &candidates {
// Try to create the directory
if let Ok(()) = tokio::fs::create_dir_all(candidate).await {
// Test if we can write to it
let test_file = candidate.join("test_write_permissions");
if tokio::fs::write(&test_file, b"test").await.is_ok() {
let _ = tokio::fs::remove_file(&test_file).await;
Err(VolumeError::PermissionDenied(format!(
"No writable directory found in volume: {}",
volume_path.display()
)))
}
}
// If we created a directory specifically for this test, mark it for cleanup
let created_dir = if candidate
.file_name()
.map_or(false, |name| name == "tmp" || name == ".spacedrive_temp")
{
Some(candidate.clone())
} else {
None
};
return Ok((candidate.clone(), created_dir));
}
}
}
Err(VolumeError::PermissionDenied(format!(
"No writable directory found in volume: {}",
volume_path.display()
)))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::volume::types::{DiskType, FileSystem};
use tempfile::TempDir;
use super::*;
use crate::volume::types::{DiskType, FileSystem};
use tempfile::TempDir;
#[tokio::test]
async fn test_speed_test_config() {
let config = SpeedTestConfig::default();
assert_eq!(config.file_size_mb, 10);
assert_eq!(config.timeout_secs, 30);
assert_eq!(config.iterations, 1);
}
#[tokio::test]
async fn test_speed_test_config() {
let config = SpeedTestConfig::default();
assert_eq!(config.file_size_mb, 10);
assert_eq!(config.timeout_secs, 30);
assert_eq!(config.iterations, 1);
}
#[tokio::test]
async fn test_generate_test_data() {
let data = generate_test_data(1); // 1MB
assert_eq!(data.len(), 1024 * 1024);
// Verify pattern is not all zeros
assert!(data.iter().any(|&b| b != 0));
}
#[tokio::test]
async fn test_generate_test_data() {
let data = generate_test_data(1); // 1MB
assert_eq!(data.len(), 1024 * 1024);
#[tokio::test]
async fn test_writable_directory_external() {
let temp_dir = TempDir::new().unwrap();
let volume_path = temp_dir.path();
let (writable_dir, created_dir) = get_writable_directory(volume_path, &MountType::External)
.await
.unwrap();
assert!(writable_dir.exists());
// Cleanup if we created a directory
if let Some(dir) = created_dir {
let _ = tokio::fs::remove_dir_all(dir).await;
}
}
// Verify pattern is not all zeros
assert!(data.iter().any(|&b| b != 0));
}
#[tokio::test]
async fn test_writable_directory_system() {
let (writable_dir, created_dir) = get_writable_directory(
&std::path::PathBuf::from("/"),
&MountType::System
)
.await
.unwrap();
assert!(writable_dir.exists());
assert!(created_dir.is_none()); // Should use system temp, not create new dir
}
#[tokio::test]
async fn test_writable_directory_external() {
let temp_dir = TempDir::new().unwrap();
let volume_path = temp_dir.path();
#[tokio::test]
async fn test_full_speed_test() {
let temp_dir = TempDir::new().unwrap();
let volume = Volume::new(
"Test Volume".to_string(),
MountType::External,
temp_dir.path().to_path_buf(),
vec![],
DiskType::Unknown,
FileSystem::Other("test".to_string()),
1000000000, // 1GB capacity
500000000, // 500MB available
false, // Not read-only
None,
);
let (writable_dir, created_dir) = get_writable_directory(volume_path, &MountType::External)
.await
.unwrap();
let config = SpeedTestConfig {
file_size_mb: 1, // Small test file
timeout_secs: 10,
iterations: 1,
};
assert!(writable_dir.exists());
let result = run_speed_test_with_config(&volume, config).await;
assert!(result.is_ok());
let (read_speed, write_speed) = result.unwrap();
assert!(read_speed > 0);
assert!(write_speed > 0);
}
}
// Cleanup if we created a directory
if let Some(dir) = created_dir {
let _ = tokio::fs::remove_dir_all(dir).await;
}
}
#[tokio::test]
async fn test_writable_directory_system() {
let (writable_dir, created_dir) =
get_writable_directory(&std::path::PathBuf::from("/"), &MountType::System)
.await
.unwrap();
assert!(writable_dir.exists());
assert!(created_dir.is_none()); // Should use system temp, not create new dir
}
#[tokio::test]
async fn test_full_speed_test() {
let temp_dir = TempDir::new().unwrap();
let volume = Volume::new(
uuid::Uuid::new_v4(), // Test device ID
"Test Volume".to_string(),
MountType::External,
temp_dir.path().to_path_buf(),
vec![],
DiskType::Unknown,
FileSystem::Other("test".to_string()),
1000000000, // 1GB capacity
500000000, // 500MB available
false, // Not read-only
None,
);
let config = SpeedTestConfig {
file_size_mb: 1, // Small test file
timeout_secs: 10,
iterations: 1,
};
let result = run_speed_test_with_config(&volume, config).await;
assert!(result.is_ok());
let (read_speed, write_speed) = result.unwrap();
assert!(read_speed > 0);
assert!(write_speed > 0);
}
}

View File

@@ -12,6 +12,7 @@ impl VolumeFingerprint {
/// Create a new volume fingerprint from volume properties
pub fn new(volume: &Volume) -> Self {
let mut hasher = blake3::Hasher::new();
hasher.update(volume.device_id.as_bytes());
hasher.update(volume.mount_point.to_string_lossy().as_bytes());
hasher.update(volume.name.as_bytes());
hasher.update(&volume.total_bytes_capacity.to_be_bytes());
@@ -29,7 +30,7 @@ impl VolumeFingerprint {
pub fn from_hex(hex: impl Into<String>) -> Self {
Self(hex.into())
}
/// Create fingerprint from string (alias for from_hex)
pub fn from_string(s: &str) -> Result<Self, crate::volume::VolumeError> {
Ok(Self(s.to_string()))
@@ -79,6 +80,9 @@ pub struct Volume {
/// Unique fingerprint for this volume
pub fingerprint: VolumeFingerprint,
/// Device this volume belongs to
pub device_id: uuid::Uuid,
/// Human-readable volume name
pub name: String,
/// Type of mount (system, external, etc)
@@ -133,6 +137,7 @@ pub struct VolumeInfo {
pub struct TrackedVolume {
pub id: i32,
pub uuid: uuid::Uuid,
pub device_id: uuid::Uuid,
pub fingerprint: VolumeFingerprint,
pub display_name: Option<String>,
pub tracked_at: chrono::DateTime<chrono::Utc>,
@@ -165,6 +170,7 @@ impl From<&Volume> for VolumeInfo {
impl Volume {
/// Create a new Volume instance
pub fn new(
device_id: uuid::Uuid,
name: String,
mount_type: MountType,
mount_point: PathBuf,
@@ -178,6 +184,7 @@ impl Volume {
) -> Self {
let volume = Self {
fingerprint: VolumeFingerprint::from_hex(""), // Will be set after creation
device_id,
name,
mount_type,
mount_point,
@@ -418,6 +425,7 @@ mod tests {
#[test]
fn test_volume_fingerprint() {
let volume = Volume::new(
uuid::Uuid::new_v4(),
"Test Volume".to_string(),
MountType::External,
PathBuf::from("/mnt/test"),
@@ -441,6 +449,7 @@ mod tests {
#[test]
fn test_volume_contains_path() {
let volume = Volume::new(
uuid::Uuid::new_v4(),
"Test".to_string(),
MountType::System,
PathBuf::from("/home"),