Implement volume tracking, unmounting, and event subscriptions; enhance volume API with new types and improve UI handling

This commit is contained in:
Jamie Pine
2024-10-31 23:54:44 -07:00
parent e57086257c
commit 8a5befc020
12 changed files with 1290 additions and 142 deletions

View File

@@ -1,25 +1,83 @@
use super::{utils::library, Ctx, R};
use crate::library::Library;
use crate::volume::VolumeEvent;
use rspc::alpha::AlphaRouter;
use serde::Deserialize;
use specta::Type;
use std::path::PathBuf;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
#[derive(Type, Deserialize)]
pub struct TrackVolumeInput {
pub volume_id: Vec<u8>,
}
pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.router().procedure(
"list",
R.query(|node, _: ()| async move {
tracing::debug!("Handling volumes list request");
// Add a map_err to properly convert the error
match node.volumes.list_system_volumes().await {
Ok(volumes) => {
tracing::debug!("Returning {} volumes", volumes.len());
Ok(volumes)
R.router()
.procedure(
"list",
R.query(|node, _: ()| async move {
tracing::debug!("Handling volumes list request");
match node.volumes.list_system_volumes().await {
Ok(volumes) => {
tracing::debug!("Returning {} volumes", volumes.len());
Ok(volumes)
}
Err(e) => {
tracing::error!("Error listing volumes: {:?}", e);
Err(e.into())
}
}
Err(e) => {
tracing::error!("Error listing volumes: {:?}", e);
Err(e.into())
}
}
}),
)
}),
)
.procedure(
"track",
R.with2(library())
.mutation(|(node, library), input: TrackVolumeInput| async move {
tracing::debug!(
"Handling track volume request for volume_id={:?}",
input.volume_id
);
node.volumes
.track_volume(input.volume_id, library)
.await
.map_err(|e| {
tracing::error!("Failed to track volume: {:?}", e);
e.into()
})
}),
)
.procedure(
"listForLibrary",
R.with2(library())
.query(|(node, library), _: ()| async move {
node.volumes
.list_library_volumes(library)
.await
.map_err(Into::into)
}),
)
.procedure(
"unmount",
R.with2(library())
.mutation(|(node, _), volume_id: Vec<u8>| async move {
node.volumes
.unmount_volume(volume_id)
.await
.map_err(Into::into)
}),
)
.procedure("events", {
R.with2(library()).subscription(|(node, library), _: ()| {
Ok(async_stream::stream! {
let mut event_bus_rx = node.volumes.subscribe();
while let Ok(event) = event_bus_rx.recv().await {
yield event;
}
})
})
})
}

View File

@@ -0,0 +1,160 @@
use sd_prisma::prisma::{device, volume, PrismaClient};
use super::{os::get_volumes, MountType, Volume, VolumeError};
use crate::{invalidate_query, library::Library, volume::speed::SpeedTest};
use std::sync::Arc;
use tracing::{error, info};
pub struct VolumeManager {
library: Arc<Library>,
current_device_id: i32, // the db id of the current device
library_volumes: Vec<Volume>, // Volumes committed to the DB
untracked_volumes: Vec<Volume>, // Uncommitted volumes
}
// The volume manager must be conservative with when to trigger a sync event.
// It should only trigger a sync event when a volume is added or removed from the library.
// not when a volume is mounted or unmounted.
impl VolumeManager {
/// Initializes the VolumeManager by detecting system, external, and network volumes
pub async fn new(lib: Arc<Library>) -> Result<Self, VolumeError> {
// Detect volumes present in the system
let detected_volumes = VolumeManager::detect_system_volumes().await;
// Query database for volumes already committed to the library
let library_volumes = VolumeManager::query_library_volumes(&lib.db).await;
// Merge detected volumes with library volumes to find untracked volumes
let untracked_volumes =
VolumeManager::derive_untracked_volumes(&detected_volumes, &library_volumes);
let current_device = lib
.db
.device()
.find_unique(device::pub_id::equals(lib.sync.device_pub_id.to_db()))
.select(device::select!({ id }))
.exec()
.await?
.ok_or(VolumeError::NoDeviceFound)?;
info!("VOLUME MANAGER: Current device: {:?}", current_device);
Ok(VolumeManager {
current_device_id: current_device.id,
library: lib.clone(),
library_volumes,
untracked_volumes,
})
}
/// Detects system volumes (external, system, etc.)
async fn detect_system_volumes() -> Vec<Volume> {
get_volumes().await
}
/// Queries volumes already in the library (database)
async fn query_library_volumes(db: &PrismaClient) -> Vec<Volume> {
match db.volume().find_many(vec![]).exec().await {
Ok(volumes) => volumes.into_iter().map(Volume::from).collect(),
Err(e) => {
error!(?e, "Failed to query library volumes;");
vec![]
}
}
}
pub async fn track_volume(&mut self, volume_pub_id: Vec<u8>) {
// if volume is already tracked, do nothing
if self
.library_volumes
.iter()
.any(|vol| vol.pub_id == Some(volume_pub_id.clone()))
{
return;
}
let volume_index = self
.untracked_volumes
.iter()
.position(|vol| vol.pub_id == Some(volume_pub_id.clone()));
if let Some(index) = volume_index {
// remove the volume from the untracked volumes
let volume = self.untracked_volumes.swap_remove(index);
// update the volume in the library
self.untracked_volumes
.retain(|vol| vol.mount_point != volume.mount_point);
volume
.create(&self.library.db, self.current_device_id)
.await
.unwrap_or(());
self.library_volumes.push(volume.clone());
info!("Volume tracked: {:?}", volume);
let _lib = self.library.clone();
// spawn a task to test the speed of the volume
tokio::spawn(async move {
let mut volume = volume;
let speed = volume.speed_test().await.unwrap_or((0.0, 0.0));
info!("Volume speed test: {:?}", speed);
});
}
}
// triggered by the watcher when a volume is added or removed
pub async fn evaluate_system_volumes(&self) -> Result<(), VolumeError> {
let volumes = get_volumes().await;
// get the current volumes on the system
let detected_volumes = VolumeManager::detect_system_volumes().await;
println!("Volumes: {:?}", volumes);
println!("Detected Volumes: {:?}", detected_volumes);
Ok(())
}
// this function will commit the system volumes to the database
async fn init_system_volumes(&self) -> Result<(), VolumeError> {
// for each volume, if system volume, commit to db
for vol in self.untracked_volumes.clone() {
if vol.mount_type == MountType::System {
println!("ADDING SYSTEM VOLUME");
let mut volume_clone = vol.clone();
// run speed test but fail silently
volume_clone.speed_test().await.unwrap_or((0.0, 0.0));
volume_clone
.create(&self.library.db, self.current_device_id)
.await
.unwrap_or(());
println!("Volume created: {:?}", volume_clone);
}
}
Ok(())
}
/// Finds untracked volumes by comparing detected and library volumes
fn derive_untracked_volumes(
detected_volumes: &Vec<Volume>,
library_volumes: &Vec<Volume>,
) -> Vec<Volume> {
// Filter out volumes that are already in the library
detected_volumes
.iter()
.filter(|detected_vol| {
!library_volumes
.iter()
.any(|lib_vol| lib_vol.mount_point == detected_vol.mount_point)
})
.cloned()
.collect()
}
// async fn register_non_system_volumes(db: &PrismaClient, volumes: &Vec<Volume>) {}
// pub async fn get_local_volumes(db: &PrismaClient) {}
// pub fn get_volume_for_location(&self, location: &Location) -> Option<Volume>
}

284
core/src/util/throw/mod.rs Normal file
View File

@@ -0,0 +1,284 @@
use sd_prisma::prisma::exif_data::device_id;
use sd_prisma::prisma::volume;
use sd_prisma::prisma::PrismaClient;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use specta::Type;
use std::{
hash::{Hash, Hasher},
path::PathBuf,
};
use strum_macros::Display;
use tracing::error;
use uuid::Uuid;
pub mod manager;
pub mod os;
pub mod speed;
pub mod statistics;
pub mod watcher;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum VolumeError {
#[error("I/O error: {0}")]
Io(#[from] tokio::io::Error),
#[error("Timeout error: {0}")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("No mount point found for volume")]
NoMountPoint,
#[error("Directory error: {0}")]
DirectoryError(String),
#[error("Database error: {0}")]
Database(#[from] prisma_client_rust::QueryError),
#[error("No device found")]
NoDeviceFound,
}
// Conversion to rspc::Error
impl From<VolumeError> for rspc::Error {
fn from(e: VolumeError) -> Self {
rspc::Error::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e)
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug, Clone, Type)]
pub struct Volume {
// ids will be None if the volume is not committed to the database
pub id: Option<i32>,
pub pub_id: Option<Vec<u8>>,
pub name: String, // Volume name
pub mount_type: MountType,
pub mount_point: PathBuf, // List of mount points
pub is_mounted: bool,
pub disk_type: DiskType,
pub file_system: FileSystem,
pub read_only: bool, // True if read-only
pub error_status: Option<String>, // SMART error status or similar
// Statistics
// I/O speed in Mbps
pub read_speed_mbps: Option<u64>,
pub write_speed_mbps: Option<u64>,
#[specta(type = String)]
#[serde_as(as = "DisplayFromStr")]
pub total_bytes_capacity: u64, // Total bytes capacity
#[specta(type = String)]
#[serde_as(as = "DisplayFromStr")]
pub total_bytes_available: u64, // Total bytes available
}
// impl Hash for Volume {
// fn hash<H: Hasher>(&self, state: &mut H) {
// self.name.hash(state);
// self.mount_point.hash(state);
// self.disk_type.hash(state);
// self.file_system.hash(state);
// }
// }
impl PartialEq for Volume {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.disk_type == other.disk_type
&& self.file_system == other.file_system
// Leaving mount points for last because O(n * m)
&& self.mount_point == other.mount_point
}
}
impl Eq for Volume {}
impl From<volume::Data> for Volume {
fn from(vol: volume::Data) -> Self {
Volume {
id: Some(vol.id),
pub_id: Some(vol.pub_id),
name: vol.name.unwrap_or_else(|| "Unknown".to_string()),
mount_type: vol
.mount_type
.and_then(|mt| Some(MountType::from_string(&mt)))
.unwrap_or(MountType::System),
mount_point: PathBuf::from(vol.mount_point.unwrap_or_else(|| "/".to_string())),
is_mounted: vol.is_mounted.unwrap_or(false),
disk_type: vol
.disk_type
.and_then(|dt| Some(DiskType::from_string(&dt)))
.unwrap_or(DiskType::Unknown),
file_system: vol
.file_system
.and_then(|fs| Some(FileSystem::from_string(&fs)))
.unwrap_or(FileSystem::Other("Unknown".to_string())),
read_only: vol.read_only.unwrap_or(false),
error_status: vol.error_status,
// Statistics
total_bytes_capacity: vol
.total_bytes_capacity
.and_then(|t| t.parse().ok())
.unwrap_or(0),
total_bytes_available: vol
.total_bytes_available
.and_then(|a| a.parse().ok())
.unwrap_or(0),
read_speed_mbps: vol.read_speed_mbps.map(|s| s as u64),
write_speed_mbps: vol.write_speed_mbps.map(|s| s as u64),
}
}
}
impl Volume {
pub async fn create(&self, db: &PrismaClient, device_id: i32) -> Result<(), VolumeError> {
let pub_id = Uuid::now_v7().as_bytes().to_vec();
db.volume()
.create(
pub_id,
vec![
volume::name::set(Some(self.name.clone())),
volume::mount_type::set(Some(self.mount_type.to_string())),
volume::mount_point::set(Some(self.mount_point.to_str().unwrap().to_string())),
volume::is_mounted::set(Some(self.is_mounted)),
volume::disk_type::set(Some(self.disk_type.to_string())),
volume::file_system::set(Some(self.file_system.to_string())),
volume::read_only::set(Some(self.read_only)),
volume::error_status::set(self.error_status.clone()),
volume::total_bytes_capacity::set(Some(self.total_bytes_capacity.to_string())),
volume::total_bytes_available::set(Some(
self.total_bytes_available.to_string(),
)),
volume::read_speed_mbps::set(self.read_speed_mbps.and_then(|v| {
if v == 0 {
None
} else {
Some(v as i64)
}
})),
volume::write_speed_mbps::set(self.write_speed_mbps.and_then(|v| {
if v == 0 {
None
} else {
Some(v as i64)
}
})),
volume::device_id::set(Some(device_id)),
],
)
.exec()
.await?;
Ok(())
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Type, Hash, PartialEq, Eq, Display)]
#[allow(clippy::upper_case_acronyms)]
pub enum DiskType {
SSD,
HDD,
Unknown,
}
impl DiskType {
pub fn from_string(disk_type: &str) -> Self {
match disk_type {
"SSD" => Self::SSD,
"HDD" => Self::HDD,
_ => Self::Unknown,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Type, Hash, PartialEq, Eq, Display)]
pub enum FileSystem {
NTFS,
FAT32,
EXT4,
APFS,
ExFAT,
Other(String),
}
impl FileSystem {
// Create a function to convert a String into a FileSystem enum
pub fn from_string(fs: &str) -> Self {
match fs.to_uppercase().as_str() {
"NTFS" => FileSystem::NTFS,
"FAT32" => FileSystem::FAT32,
"EXT4" => FileSystem::EXT4,
"APFS" => FileSystem::APFS,
"EXFAT" => FileSystem::ExFAT,
// If the string does not match known variants, store it in the Other variant
_ => FileSystem::Other(fs.to_string()),
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Type, Hash, PartialEq, Eq, Display)]
pub enum MountType {
System,
External,
Network,
Virtual,
}
impl MountType {
pub fn from_string(mount_type: &str) -> Self {
match mount_type {
"System" => Self::System,
"External" => Self::External,
"Network" => Self::Network,
"Virtual" => Self::Virtual,
_ => Self::System,
}
}
}
// pub async fn save_volume(library: &Library) -> Result<(), VolumeError> {
// // enter all volumes associate with this client add to db
// for volume in get_volumes() {
// let params = vec![
// disk_type::set(volume.disk_type.map(|t| t.to_string())),
// filesystem::set(volume.file_system.clone()),
// total_bytes_capacity::set(volume.total_capacity.to_string()),
// total_bytes_available::set(volume.available_capacity.to_string()),
// ];
// library
// .db
// .volume()
// .upsert(
// node_id_mount_point_name(
// library.node_local_id,
// volume.mount_point,
// volume.name,
// ),
// volume::create(
// library.node_local_id,
// volume.name,
// volume.mount_point,
// params.clone(),
// ),
// params,
// )
// .exec()
// .await?;
// }
// // cleanup: remove all unmodified volumes associate with this client
// Ok(())
// }
// #[test]
// fn test_get_volumes() {
// let volumes = get_volumes()?;
// dbg!(&volumes);
// assert!(volumes.len() > 0);
// }

View File

@@ -0,0 +1,147 @@
// use crate::{
// library::Library,
// volume::{Volume, VolumeError},
// Node,
// };
// use futures_concurrency::future::Join;
// use sd_core_sync::SyncManager;
// use sd_prisma::{
// prisma::{device, PrismaClient},
// prisma_sync,
// };
// use sd_sync::*;
// use sd_utils::uuid_to_bytes;
// use std::sync::Arc;
// use tracing::error;
// use uuid::Uuid;
// use super::os::get_volumes;
// async fn update_storage_statistics(
// db: &PrismaClient,
// sync: &SyncManager,
// total_capacity: u64,
// available_capacity: u64,
// ) -> Result<(), VolumeError> {
// let device_pub_id = sync.device_pub_id.to_db();
// let storage_statistics_pub_id = db
// .storage_statistics()
// .find_first(vec![storage_statistics::device::is(vec![
// device::pub_id::equals(device_pub_id.clone()),
// ])])
// .select(storage_statistics::select!({ pub_id }))
// .exec()
// .await?
// .map(|s| s.pub_id);
// if let Some(storage_statistics_pub_id) = storage_statistics_pub_id {
// sync.write_ops(
// db,
// (
// [
// sync_entry!(total_capacity, storage_statistics::total_capacity),
// sync_entry!(available_capacity, storage_statistics::available_capacity),
// ]
// .into_iter()
// .map(|(field, value)| {
// sync.shared_update(
// prisma_sync::storage_statistics::SyncId {
// pub_id: storage_statistics_pub_id.clone(),
// },
// field,
// value,
// )
// })
// .collect(),
// db.storage_statistics()
// .update(
// storage_statistics::pub_id::equals(storage_statistics_pub_id),
// vec![
// storage_statistics::total_capacity::set(total_capacity as i64),
// storage_statistics::available_capacity::set(available_capacity as i64),
// ],
// )
// // We don't need any data here, just the id avoids receiving the entire object
// // as we can't pass an empty select macro call
// .select(storage_statistics::select!({ id })),
// ),
// )
// .await?;
// } else {
// let new_storage_statistics_id = uuid_to_bytes(&Uuid::now_v7());
// sync.write_op(
// db,
// sync.shared_create(
// prisma_sync::storage_statistics::SyncId {
// pub_id: new_storage_statistics_id.clone(),
// },
// [
// sync_entry!(total_capacity, storage_statistics::total_capacity),
// sync_entry!(available_capacity, storage_statistics::available_capacity),
// sync_entry!(
// prisma_sync::device::SyncId {
// pub_id: device_pub_id.clone()
// },
// storage_statistics::device
// ),
// ],
// ),
// db.storage_statistics()
// .create(
// new_storage_statistics_id,
// vec![
// storage_statistics::total_capacity::set(total_capacity as i64),
// storage_statistics::available_capacity::set(available_capacity as i64),
// storage_statistics::device::connect(device::pub_id::equals(device_pub_id)),
// ],
// )
// // We don't need any data here, just the id avoids receiving the entire object
// // as we can't pass an empty select macro call
// .select(storage_statistics::select!({ id })),
// )
// .await?;
// }
// Ok(())
// }
// pub fn save_storage_statistics(node: &Node) {
// // tokio::spawn({
// // let libraries = Arc::clone(&node.libraries);
// // async move {
// // let (total_capacity, available_capacity) = compute_stats(&get_volumes().await);
// // libraries
// // .get_all()
// // .await
// // .into_iter()
// // .map(move |library: Arc<Library>| async move {
// // let Library { db, sync, .. } = &*library;
// // update_storage_statistics(db, sync, total_capacity, available_capacity).await
// // })
// // .collect::<Vec<_>>()
// // .join()
// // .await
// // .into_iter()
// // .for_each(|res| {
// // if let Err(e) = res {
// // error!(?e, "Failed to save storage statistics;");
// // }
// // });
// // }
// // });
// }
// fn compute_stats<'v>(volumes: impl IntoIterator<Item = &'v Volume>) -> (u64, u64) {
// volumes
// .into_iter()
// .fold((0, 0), |(mut total, mut available), volume| {
// total += volume.total_bytes_capacity;
// available += volume.total_bytes_available;
// (total, available)
// })
// }

View File

@@ -0,0 +1,372 @@
use std::path::PathBuf;
use tokio::task;
use tracing::{debug, error, warn};
use super::error::VolumeError;
use super::types::{DiskType, FileSystem, MountType, Volume};
// Re-export platform-specific get_volumes function
#[cfg(target_os = "linux")]
pub use self::linux::get_volumes;
#[cfg(target_os = "macos")]
pub use self::macos::get_volumes;
#[cfg(target_os = "windows")]
pub use self::windows::get_volumes;
/// Common utilities for volume detection across platforms
mod common {
use super::*;
pub fn parse_size(size_str: &str) -> u64 {
size_str
.chars()
.filter(|c| c.is_digit(10))
.collect::<String>()
.parse()
.unwrap_or(0)
}
pub fn is_virtual_filesystem(fs: &str) -> bool {
matches!(
fs.to_lowercase().as_str(),
"devfs" | "sysfs" | "proc" | "tmpfs" | "ramfs" | "devtmpfs"
)
}
}
#[cfg(target_os = "macos")]
mod macos {
use super::*;
use std::process::Command;
use sysinfo::{DiskExt, System, SystemExt};
pub async fn get_volumes() -> Vec<Volume> {
task::spawn_blocking(|| {
let mut volumes = Vec::new();
let mut sys = System::new_all();
sys.refresh_disks_list();
for disk in sys.disks() {
// Skip virtual filesystems
if common::is_virtual_filesystem(
disk.file_system().to_string_lossy().to_string().as_str(),
) {
continue;
}
let mut volume = Volume::new(
disk.name().to_string_lossy().to_string(),
if disk.is_removable() {
MountType::External
} else {
MountType::System
},
disk.mount_point().to_path_buf(),
detect_disk_type(disk.name().to_string_lossy().as_ref()),
FileSystem::from_string(&disk.file_system().to_string_lossy()),
disk.total_space(),
disk.available_space(),
);
volume.read_only = is_volume_readonly(disk.mount_point());
volumes.push(volume);
}
volumes
})
.await
.unwrap_or_default()
}
fn detect_disk_type(device_name: &str) -> DiskType {
let output = Command::new("diskutil")
.args(["info", device_name])
.output();
match output {
Ok(output) => {
let info = String::from_utf8_lossy(&output.stdout);
if info.contains("Solid State") {
DiskType::SSD
} else if info.contains("Rotational") {
DiskType::HDD
} else {
DiskType::Unknown
}
}
Err(_) => DiskType::Unknown,
}
}
fn is_volume_readonly(mount_point: &std::path::Path) -> bool {
let output = Command::new("mount")
.output()
.ok()
.map(|o| String::from_utf8_lossy(&o.stdout).to_string());
match output {
Some(mount_output) => mount_output
.lines()
.find(|line| line.contains(&mount_point.to_string_lossy()))
.map(|line| line.contains("read-only"))
.unwrap_or(false),
None => false,
}
}
}
#[cfg(target_os = "linux")]
mod linux {
use super::*;
use std::{fs, process::Command};
use sysinfo::{DiskExt, System, SystemExt};
pub async fn get_volumes() -> Vec<Volume> {
task::spawn_blocking(|| {
let mut volumes = Vec::new();
let mut sys = System::new_all();
sys.refresh_disks_list();
// Read /proc/mounts for additional mount information
let mounts = fs::read_to_string("/proc/mounts").unwrap_or_default();
let mount_points: Vec<_> = mounts
.lines()
.filter(|line| !line.starts_with("none"))
.collect();
for disk in sys.disks() {
// Skip virtual filesystems
if common::is_virtual_filesystem(
disk.file_system().to_string_lossy().to_string().as_str(),
) {
continue;
}
let mount_point = disk.mount_point().to_path_buf();
let mount_info = mount_points
.iter()
.find(|&line| line.contains(&mount_point.to_string_lossy()));
let is_network = mount_info
.map(|info| info.starts_with("//") || info.starts_with("nfs"))
.unwrap_or(false);
let mount_type = if is_network {
MountType::Network
} else if disk.is_removable() {
MountType::External
} else {
MountType::System
};
let mut volume = Volume::new(
disk.name().to_string_lossy().to_string(),
mount_type,
mount_point.clone(),
detect_disk_type(&disk.name().to_string_lossy()),
FileSystem::from_string(&disk.file_system().to_string_lossy()),
disk.total_space(),
disk.available_space(),
);
volume.read_only = mount_info
.map(|info| info.contains("ro,") || info.contains(",ro"))
.unwrap_or(false);
volumes.push(volume);
}
volumes
})
.await
.unwrap_or_default()
}
fn detect_disk_type(device_name: &str) -> DiskType {
// Try reading rotational flag from sys
if let Ok(rotational) =
fs::read_to_string(format!("/sys/block/{}/queue/rotational", device_name))
{
match rotational.trim() {
"0" => return DiskType::SSD,
"1" => return DiskType::HDD,
_ => {}
}
}
// Fallback to lsblk
let output = Command::new("lsblk")
.args(["-d", "-o", "name,rota", device_name])
.output();
match output {
Ok(output) => {
let info = String::from_utf8_lossy(&output.stdout);
if info.contains(" 0") {
DiskType::SSD
} else if info.contains(" 1") {
DiskType::HDD
} else {
DiskType::Unknown
}
}
Err(_) => DiskType::Unknown,
}
}
}
#[cfg(target_os = "windows")]
mod windows {
use super::*;
use std::ffi::OsString;
use std::os::windows::ffi::OsStringExt;
use windows::Win32::Storage::FileSystem::{
GetDiskFreeSpaceExW, GetDriveTypeW, GetVolumeInformationW, DRIVE_FIXED, DRIVE_REMOTE,
DRIVE_REMOVABLE,
};
use windows::Win32::System::Ioctl::STORAGE_PROPERTY_QUERY;
pub async fn get_volumes() -> Vec<Volume> {
task::spawn_blocking(|| {
let mut volumes = Vec::new();
// Get available drives
let drives = unsafe { windows::Win32::Storage::FileSystem::GetLogicalDrives() };
for i in 0..26 {
if (drives & (1 << i)) != 0 {
let drive_letter = (b'A' + i as u8) as char;
let path = format!("{}:\\", drive_letter);
let wide_path: Vec<u16> = OsString::from(&path)
.encode_wide()
.chain(std::iter::once(0))
.collect();
let drive_type = unsafe { GetDriveTypeW(wide_path.as_ptr()) };
// Skip CD-ROM drives and other unsupported types
if drive_type == DRIVE_FIXED
|| drive_type == DRIVE_REMOVABLE
|| drive_type == DRIVE_REMOTE
{
if let Some(volume) = get_volume_info(&path, drive_type) {
volumes.push(volume);
}
}
}
}
volumes
})
.await
.unwrap_or_default()
}
fn get_volume_info(path: &str, drive_type: u32) -> Option<Volume> {
let wide_path: Vec<u16> = OsString::from(path)
.encode_wide()
.chain(std::iter::once(0))
.collect();
let mut name_buf = [0u16; 256];
let mut fs_name_buf = [0u16; 256];
let mut serial_number = 0;
let mut max_component_length = 0;
let mut flags = 0;
unsafe {
let success = GetVolumeInformationW(
wide_path.as_ptr(),
name_buf.as_mut_ptr(),
name_buf.len() as u32,
&mut serial_number,
&mut max_component_length,
&mut flags,
fs_name_buf.as_mut_ptr(),
fs_name_buf.len() as u32,
);
if success.as_bool() {
let mut total_bytes = 0;
let mut free_bytes = 0;
let mut available_bytes = 0;
if GetDiskFreeSpaceExW(
wide_path.as_ptr(),
&mut available_bytes,
&mut total_bytes,
&mut free_bytes,
)
.as_bool()
{
let mount_type = match drive_type {
DRIVE_FIXED => MountType::System,
DRIVE_REMOVABLE => MountType::External,
DRIVE_REMOTE => MountType::Network,
_ => MountType::System,
};
let volume_name = String::from_utf16_lossy(&name_buf)
.trim_matches(char::from(0))
.to_string();
let fs_name = String::from_utf16_lossy(&fs_name_buf)
.trim_matches(char::from(0))
.to_string();
Some(Volume::new(
if volume_name.is_empty() {
path.to_string()
} else {
volume_name
},
mount_type,
PathBuf::from(path),
detect_disk_type(path),
FileSystem::from_string(&fs_name),
total_bytes as u64,
available_bytes as u64,
))
} else {
None
}
} else {
None
}
}
}
fn detect_disk_type(path: &str) -> DiskType {
// We would need to use DeviceIoControl to get this information
// For brevity, returning Unknown, but you could implement the full detection
// using IOCTL_STORAGE_QUERY_PROPERTY
DiskType::Unknown
}
}
// Re-export the platform-specific get_volumes function
pub use self::get_volumes_impl::get_volumes;
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_get_volumes() {
let volumes = get_volumes().await;
assert!(!volumes.is_empty(), "Should detect at least one volume");
for volume in volumes {
println!("Detected volume: {:?}", volume);
assert!(
!volume.mount_point.as_os_str().is_empty(),
"Mount point should not be empty"
);
assert!(!volume.name.is_empty(), "Volume name should not be empty");
assert!(
volume.total_bytes_capacity > 0,
"Volume should have non-zero capacity"
);
}
}
}

View File

@@ -60,6 +60,8 @@ impl VolumeManagerState {
continue;
}
// check if vol
let volume_id = match &volume.pub_id {
Some(id) => id.clone(),
None => {

View File

@@ -21,7 +21,7 @@ pub use {
actor::VolumeManagerActor,
error::VolumeError,
manager::VolumeManagerState,
types::{DiskType, FileSystem, MountType, Volume, VolumeOptions},
types::{DiskType, FileSystem, MountType, Volume, VolumeEvent, VolumeOptions},
volumes::Volumes,
};

View File

@@ -28,108 +28,83 @@ mod common {
)
}
}
#[cfg(target_os = "macos")]
pub mod macos {
use super::*;
use std::{collections::HashMap, path::PathBuf, process::Command};
use std::{path::PathBuf, process::Command};
use sysinfo::{DiskExt, System, SystemExt};
pub async fn get_volumes() -> Result<Vec<Volume>, VolumeError> {
task::spawn_blocking(|| {
let mut volumes = Vec::new();
let mut sys = System::new_all();
sys.refresh_disks_list();
// First collect disk info in blocking context
let disk_info: Vec<(String, bool, PathBuf, Vec<u8>, u64, u64)> =
task::spawn_blocking(|| {
let mut sys = System::new_all();
sys.refresh_disks_list();
let mut temp_volumes: HashMap<String, Volume> = HashMap::new();
sys.disks()
.iter()
.filter(|disk| {
!common::is_virtual_filesystem(
std::str::from_utf8(disk.file_system()).unwrap_or(""),
)
})
.map(|disk| {
(
disk.name().to_string_lossy().to_string(),
disk.is_removable(),
disk.mount_point().to_path_buf(),
disk.file_system().to_vec(),
disk.total_space(),
disk.available_space(),
)
})
.collect::<Vec<_>>() // Specify that the collection should be a Vec
})
.await
.map_err(|e| VolumeError::Platform(format!("Task join error: {}", e)))?;
for disk in sys.disks() {
// Skip virtual filesystems
if common::is_virtual_filesystem(
std::str::from_utf8(disk.file_system()).unwrap_or(""),
) {
continue;
// Then create volumes with the collected info
let mut volumes = Vec::new();
for (name, is_removable, mount_point, file_system, total_space, available_space) in
disk_info
{
if !mount_point.exists() {
continue;
}
let read_only = is_volume_readonly(&mount_point)?;
// Skip adding the `/` mount point if it's both read-only and a system volume
if mount_point == PathBuf::from("/") && read_only {
continue;
}
let disk_type = detect_disk_type(&name)?;
let mut mount_points = vec![mount_point.clone()];
// For macOS APFS system volumes
if mount_point == PathBuf::from("/") {
let data_path = PathBuf::from("/System/Volumes/Data");
if data_path.exists() {
mount_points.push(data_path);
}
let disk_type = detect_disk_type(disk.name().to_string_lossy().as_ref())?;
let mount_point = disk.mount_point().to_path_buf();
// For APFS volumes, use the disk name as the key
let key = disk.name().to_string_lossy().to_string();
let mount_points = vec![mount_point.clone()];
if let Some(existing) = temp_volumes.get_mut(&key) {
// If we already have this volume, add the mount point
existing.mount_points.push(mount_point);
continue;
}
let volume = Volume::new(
disk.name().to_string_lossy().to_string(),
if disk.is_removable() {
MountType::External
} else {
MountType::System
},
mount_point.clone(),
mount_points,
disk_type,
FileSystem::from_string(&String::from_utf8_lossy(&disk.file_system())),
disk.total_space(),
disk.available_space(),
is_volume_readonly(&mount_point)?,
);
temp_volumes.insert(key, volume);
}
// Move volumes from HashMap to Vec
volumes.extend(temp_volumes.into_values());
Ok(volumes)
})
.await
.map_err(|e| VolumeError::Platform(format!("Task join error: {}", e)))?
}
fn create_volume_from_disk(disk: &sysinfo::Disk) -> Result<Volume, VolumeError> {
let disk_type = detect_disk_type(disk.name().to_string_lossy().as_ref())?;
let primary_mount_point = disk.mount_point().to_path_buf();
if !primary_mount_point.exists() {
return Err(VolumeError::NoMountPoint);
volumes.push(Volume::new(
name,
if is_removable {
MountType::External
} else {
MountType::System
},
mount_point,
mount_points,
disk_type,
FileSystem::from_string(&String::from_utf8_lossy(&file_system)),
total_space,
available_space,
read_only,
));
}
// Get all mount points for this volume
let mut mount_points = Vec::new();
mount_points.push(primary_mount_point.clone());
// For macOS APFS system volumes
if primary_mount_point == PathBuf::from("/") {
let data_path = PathBuf::from("/System/Volumes/Data");
if data_path.exists() {
mount_points.push(data_path);
}
}
let read_only = is_volume_readonly(&primary_mount_point)?;
Ok(Volume::new(
disk.name().to_string_lossy().to_string(),
if disk.is_removable() {
MountType::External
} else {
MountType::System
},
primary_mount_point,
mount_points,
disk_type,
FileSystem::from_string(&String::from_utf8_lossy(&disk.file_system())),
disk.total_space(),
disk.available_space(),
read_only,
))
Ok(volumes)
}
fn detect_disk_type(device_name: &str) -> Result<DiskType, VolumeError> {

View File

@@ -13,7 +13,7 @@ use strum_macros::Display;
use uuid::Uuid;
/// Events emitted by the Volume Manager when volume state changes
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Type, Deserialize, Serialize)]
pub enum VolumeEvent {
/// Emitted when a new volume is discovered and added
VolumeAdded(Volume),

View File

@@ -108,5 +108,18 @@ impl Volumes {
rx.await.map_err(|_| VolumeError::Cancelled)?
}
pub async fn unmount_volume(&self, volume_id: Vec<u8>) -> Result<(), VolumeError> {
let (tx, rx) = oneshot::channel();
let msg = VolumeManagerMessage::UnmountVolume { volume_id, ack: tx };
self.message_tx
.send(msg)
.await
.map_err(|_| VolumeError::Cancelled)?;
rx.await.map_err(|_| VolumeError::Cancelled)?;
Ok(())
}
// Other public methods...
}

View File

@@ -1,7 +1,14 @@
import { ArrowRight, EjectSimple } from '@phosphor-icons/react';
import { useQueryClient } from '@tanstack/react-query';
import clsx from 'clsx';
import { PropsWithChildren, useMemo } from 'react';
import { useBridgeQuery, useLibraryQuery } from '@sd/client';
import { MouseEvent, PropsWithChildren, useMemo } from 'react';
import {
useBridgeQuery,
useLibraryMutation,
useLibraryQuery,
useLibrarySubscription,
Volume
} from '@sd/client';
import { Button, toast, tw } from '@sd/ui';
import { Icon, IconName } from '~/components';
import { useLocale } from '~/hooks';
@@ -16,16 +23,30 @@ import { SeeMore } from '../../SidebarLayout/SeeMore';
const Name = tw.span`truncate`;
// TODO: This eject button does nothing!
const EjectButton = ({ className }: { className?: string }) => (
<Button
className={clsx('absolute right-[2px] !p-[5px]', className)}
variant="subtle"
onClick={() => toast.info('Eject button coming soon')}
>
<EjectSimple weight="fill" size={18} className="size-3 opacity-70" />
</Button>
);
// Improved eject button that actually unmounts the volume
const EjectButton = ({ volumeId, className }: { volumeId: Uint8Array; className?: string }) => {
const unmountMutation = useLibraryMutation('volumes.track');
return (
<Button
className={clsx('absolute right-[2px] !p-[5px]', className)}
variant="subtle"
onClick={async (e: MouseEvent) => {
e.preventDefault(); // Prevent navigation
try {
await unmountMutation.mutateAsync({
volume_id: Array.from(volumeId) // Convert Uint8Array to number[]
});
toast.success('Volume ejected successfully');
} catch (error) {
toast.error('Failed to eject volume');
}
}}
>
<EjectSimple weight="fill" size={18} className="size-3 opacity-70" />
</Button>
);
};
const SidebarIcon = ({ name }: { name: IconName }) => {
return <Icon name={name} size={20} className="mr-1" />;
@@ -33,39 +54,63 @@ const SidebarIcon = ({ name }: { name: IconName }) => {
export default function LocalSection() {
const platform = usePlatform();
const queryClient = useQueryClient();
const locationsQuery = useLibraryQuery(['locations.list']);
const locations = locationsQuery.data;
const homeDir = useHomeDir();
const result = useBridgeQuery(['volumes.list']);
const volumes = result.data;
const volumesQuery = useBridgeQuery(['volumes.list']);
const volumes = volumesQuery.data;
const volumeEvents = useLibrarySubscription(['volumes.events'], {
onData: (data) => {
console.log('Volume event received:', data);
}
});
// Replace subscription with manual invalidation
const handleVolumeChange = () => {};
const { t } = useLocale();
// this will return an array of location ids that are also volumes
// { "/Mount/Point": 1, "/Mount/Point2": 2"}
// Improved volume tracking
const trackVolumeMutation = useLibraryMutation('volumes.track');
// Mapping of volume paths to location IDs
const locationIdsForVolumes = useMemo(() => {
if (!locations || !volumes) return {};
const matchedLocations = locations.filter((location) =>
volumes.some((volume) => volume.mount_point === location.path)
);
const locationIdsMap = matchedLocations.reduce(
return locations.reduce(
(acc, location) => {
if (location.path) {
acc[location.path] = location.id;
const matchingVolume = volumes.find((v) =>
v.mount_points.some((mp) => mp === location.path)
);
if (matchingVolume && matchingVolume.pub_id && location.path) {
acc[location.path] = {
locationId: location.id,
volumeId: new Uint8Array(matchingVolume.pub_id)
};
}
return acc;
},
{} as {
[key: string]: number;
}
{} as Record<string, { locationId: number; volumeId: Uint8Array }>
);
return locationIdsMap;
}, [locations, volumes]);
// Filter out non-unique volumes and handle mount points
const uniqueVolumes = useMemo(() => {
if (!volumes) return [];
return volumes.filter((volume) => {
if (volume.mount_type === 'System' && volume.name === 'System Reserved') return false;
if (volume.mount_points.some((mp) => mp === homeDir.data)) return false;
return true;
});
}, [volumes, homeDir.data]);
return (
<Section name={t('local')}>
<SeeMore>
@@ -76,23 +121,79 @@ export default function LocalSection() {
{homeDir.data && (
<EphemeralLocation
navigateTo={`ephemeral/0?path=${homeDir.data}`}
path={homeDir.data ?? ''}
navigateTo={`ephemeral/home?path=${homeDir.data}`}
path={homeDir.data}
>
<SidebarIcon name="Home" />
<Name>{t('home')}</Name>
</EphemeralLocation>
)}
{uniqueVolumes.map((volume) => {
const mountPoint = volume.mount_points[0];
if (!mountPoint) return null;
const key = `${volume.pub_id}-${mountPoint}`;
const locationInfo = locationIdsForVolumes[mountPoint];
const isTracked = locationInfo !== undefined;
const toPath = isTracked
? `location/${locationInfo.locationId}`
: `ephemeral/${key}?path=${volume.mount_point}`;
const displayName = mountPoint === '/' ? 'Root' : volume.name || mountPoint;
return (
<EphemeralLocation
key={key}
navigateTo={toPath}
path={mountPoint}
onTrack={async () => {
if (!isTracked && volume.pub_id) {
try {
await trackVolumeMutation.mutateAsync({
volume_id: Array.from(volume.pub_id) // Convert Uint8Array to number[]
});
toast.success('Volume tracked successfully');
} catch (error) {
toast.error('Failed to track volume');
}
}
}}
>
<SidebarIcon name={getVolumeIcon(volume)} />
<Name>{displayName}</Name>
{volume.mount_type === 'External' && volume.pub_id && (
<EjectButton volumeId={new Uint8Array(volume.pub_id)} />
)}
</EphemeralLocation>
);
})}
</SeeMore>
</Section>
);
}
function getVolumeIcon(volume: Volume): IconName {
if (volume.file_system === 'ExFAT') return 'SD';
if (volume.name === 'Macintosh HD') return 'HDD';
if (volume.disk_type === 'SSD') return 'HDD';
if (volume.mount_type === 'Network') return 'Globe';
if (volume.mount_type === 'External') return 'SD';
return 'Drive';
}
// Updated EphemeralLocation component to handle tracking separately
const EphemeralLocation = ({
children,
path,
navigateTo
}: PropsWithChildren<{ path: string; navigateTo: string }>) => {
navigateTo,
onTrack
}: PropsWithChildren<{
path: string;
navigateTo: string;
onTrack?: () => Promise<void>;
}>) => {
const [{ path: ephemeralPath }] = useExplorerSearchParams();
const { isDroppable, className, setDroppableRef } = useExplorerDroppable({
@@ -101,6 +202,7 @@ const EphemeralLocation = ({
data: { type: 'location', path },
disabled: navigateTo.startsWith('location/') || ephemeralPath === path,
navigateTo: navigateTo
// onNavigate: onTrack
});
return (

View File

@@ -60,7 +60,8 @@ export type Procedures = {
{ key: "tags.getForObject", input: LibraryArgs<number>, result: Tag[] } |
{ key: "tags.getWithObjects", input: LibraryArgs<number[]>, result: { [key in number]: ({ object: { id: number }; date_created: string | null })[] } } |
{ key: "tags.list", input: LibraryArgs<null>, result: Tag[] } |
{ key: "volumes.list", input: never, result: Volume[] },
{ key: "volumes.list", input: never, result: Volume[] } |
{ key: "volumes.listForLibrary", input: LibraryArgs<null>, result: Volume[] },
mutations:
{ key: "api.sendFeedback", input: Feedback, result: null } |
{ key: "backups.backup", input: LibraryArgs<null>, result: string } |
@@ -136,7 +137,9 @@ export type Procedures = {
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
{ key: "tags.update", input: LibraryArgs<TagUpdateArgs>, result: null } |
{ key: "toggleFeatureFlag", input: BackendFeature, result: null },
{ key: "toggleFeatureFlag", input: BackendFeature, result: null } |
{ key: "volumes.track", input: LibraryArgs<TrackVolumeInput>, result: null } |
{ key: "volumes.unmount", input: LibraryArgs<number[]>, result: null },
subscriptions:
{ key: "cloud.listenCloudServicesNotifications", input: never, result: CloudP2PNotifyUser } |
{ key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } |
@@ -150,7 +153,8 @@ export type Procedures = {
{ key: "notifications.listen", input: never, result: Notification } |
{ key: "p2p.events", input: never, result: P2PEvent } |
{ key: "search.ephemeralPaths", input: LibraryArgs<EphemeralPathSearchArgs>, result: { entries: ExplorerItem[]; errors: Error[] } } |
{ key: "sync.active", input: LibraryArgs<null>, result: SyncStatus }
{ key: "sync.active", input: LibraryArgs<null>, result: SyncStatus } |
{ key: "volumes.events", input: LibraryArgs<null>, result: VolumeEvent }
};
/**
@@ -786,6 +790,8 @@ export type TextMatch = { contains: string } | { startsWith: string } | { endsWi
*/
export type ThumbKey = { shard_hex: string; cas_id: CasId; base_directory_str: string }
export type TrackVolumeInput = { volume_id: number[] }
export type UpdateThumbnailerPreferences = Record<string, never>
export type VideoProps = { pixel_format: string | null; color_range: string | null; bits_per_channel: number | null; color_space: string | null; color_primaries: string | null; color_transfer: string | null; field_order: string | null; chroma_location: string | null; width: number; height: number; aspect_ratio_num: number | null; aspect_ratio_den: number | null; properties: string[] }
@@ -854,3 +860,32 @@ total_bytes_capacity: string;
* Available storage space in bytes
*/
total_bytes_available: string }
/**
* Events emitted by the Volume Manager when volume state changes
*/
export type VolumeEvent =
/**
* Emitted when a new volume is discovered and added
*/
{ VolumeAdded: Volume } |
/**
* Emitted when a volume is removed from the system
*/
{ VolumeRemoved: Volume } |
/**
* Emitted when a volume's properties are updated
*/
{ VolumeUpdated: { old: Volume; new: Volume } } |
/**
* Emitted when a volume's speed test completes
*/
{ VolumeSpeedTested: { id: number[]; read_speed: bigint; write_speed: bigint } } |
/**
* Emitted when a volume's mount status changes
*/
{ VolumeMountChanged: { id: number[]; is_mounted: boolean } } |
/**
* Emitted when a volume encounters an error
*/
{ VolumeError: { id: number[]; error: string } }