mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-18 13:26:00 -04:00
Add volume event logging and improve watcher integration with enhanced state management and maintenance routines
This commit is contained in:
@@ -75,6 +75,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||
let mut event_bus_rx = node.volumes.subscribe();
|
||||
|
||||
while let Ok(event) = event_bus_rx.recv().await {
|
||||
tracing::debug!("Volume event: {:?}", event);
|
||||
yield event;
|
||||
}
|
||||
})
|
||||
|
||||
@@ -2,13 +2,15 @@ use super::{
|
||||
error::VolumeError,
|
||||
types::{Volume, VolumeEvent, VolumeOptions},
|
||||
volumes::Volumes,
|
||||
watcher::{VolumeWatcher, WatcherState},
|
||||
VolumeManagerContext, VolumeManagerState,
|
||||
};
|
||||
use crate::library::{Library, LibraryManagerEvent};
|
||||
use async_channel as chan;
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
|
||||
use tracing::{error, info, trace};
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const DEFAULT_CHANNEL_SIZE: usize = 128;
|
||||
@@ -77,10 +79,10 @@ impl VolumeManagerActor {
|
||||
options: VolumeOptions,
|
||||
) -> Result<(Volumes, Self), VolumeError> {
|
||||
let (message_tx, message_rx) = chan::bounded(DEFAULT_CHANNEL_SIZE);
|
||||
let (event_tx, _) = broadcast::channel(DEFAULT_CHANNEL_SIZE);
|
||||
let (event_tx, event_rx) = broadcast::channel(DEFAULT_CHANNEL_SIZE);
|
||||
|
||||
let manager = Volumes::new(message_tx, event_tx.clone());
|
||||
let state = VolumeManagerState::new(options).await?;
|
||||
let state = VolumeManagerState::new(options, event_tx.clone()).await?;
|
||||
|
||||
let actor = VolumeManagerActor {
|
||||
state: Arc::new(RwLock::new(state)),
|
||||
@@ -89,9 +91,22 @@ impl VolumeManagerActor {
|
||||
ctx,
|
||||
};
|
||||
|
||||
// Pass event_rx to start monitoring task immediately
|
||||
actor.clone().start_event_monitoring(event_rx);
|
||||
|
||||
Ok((manager, actor))
|
||||
}
|
||||
|
||||
fn start_event_monitoring(self, mut event_rx: broadcast::Receiver<VolumeEvent>) {
|
||||
tokio::spawn(async move {
|
||||
debug!("Starting volume event monitoring");
|
||||
while let Ok(event) = event_rx.recv().await {
|
||||
debug!("Volume event processed: {:?}", event);
|
||||
}
|
||||
warn!("Volume event monitoring ended");
|
||||
});
|
||||
}
|
||||
|
||||
/// Starts the VolumeManagerActor
|
||||
pub async fn start(self) {
|
||||
info!("Volume manager actor started");
|
||||
@@ -164,25 +179,36 @@ impl VolumeManagerActor {
|
||||
}
|
||||
});
|
||||
|
||||
// Start the volume watcher
|
||||
let self_arc_watcher = Arc::clone(&self_arc);
|
||||
tokio::spawn(async move {
|
||||
let mut actor = self_arc_watcher.lock().await;
|
||||
let state = actor.state.write().await;
|
||||
|
||||
// Create and start watcher for each volume
|
||||
for (volume_id, volume) in &state.volumes {
|
||||
if let Some(watcher) = state.watchers.get(volume_id) {
|
||||
if let Err(e) = watcher.watcher.start().await {
|
||||
error!(?e, "Failed to start watcher for volume {}", volume.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
info!("Volume manager actor initialized");
|
||||
}
|
||||
|
||||
async fn scan_volumes(&mut self) -> Result<(), VolumeError> {
|
||||
let mut state = self.state.write().await;
|
||||
state.scan_volumes().await
|
||||
}
|
||||
|
||||
pub async fn initialize(&mut self, library: Arc<Library>) -> Result<(), VolumeError> {
|
||||
// Use device_id from context instead of node
|
||||
let device_pub_id = self.ctx.device_id.clone();
|
||||
|
||||
// Scan for system volumes first
|
||||
{
|
||||
let mut state = self.state.write().await;
|
||||
state.scan_volumes().await?;
|
||||
state.scan_volumes(device_pub_id.clone()).await?;
|
||||
}
|
||||
|
||||
// Use device_id from context instead of node
|
||||
let device_id = self.ctx.device_id.clone();
|
||||
|
||||
// Rest of initialize remains the same...
|
||||
// Get volumes from library database
|
||||
let db_volumes = library
|
||||
.db
|
||||
.volume()
|
||||
@@ -202,11 +228,11 @@ impl VolumeManagerActor {
|
||||
|
||||
// Prepare updates
|
||||
for db_volume in db_volumes {
|
||||
let fingerprint = db_volume.generate_fingerprint(device_id.clone().into());
|
||||
let fingerprint = db_volume.generate_fingerprint(device_pub_id.clone().into());
|
||||
|
||||
if let Some(system_volume) = current_volumes
|
||||
.values()
|
||||
.find(|v| v.generate_fingerprint(device_id.clone().into()) == fingerprint)
|
||||
.find(|v| v.generate_fingerprint(device_pub_id.clone().into()) == fingerprint)
|
||||
{
|
||||
let merged = Volume::merge_with_db_volume(system_volume, &db_volume);
|
||||
if let Some(pub_id) = &merged.pub_id {
|
||||
@@ -222,11 +248,49 @@ impl VolumeManagerActor {
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updates
|
||||
// Apply updates and initialize watchers
|
||||
{
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
// Update volumes
|
||||
for (pub_id, volume) in updates {
|
||||
state.volumes.insert(pub_id, volume);
|
||||
state.volumes.insert(pub_id.clone(), volume);
|
||||
|
||||
// Create and start watcher if it doesn't exist
|
||||
if !state.watchers.contains_key(&pub_id) {
|
||||
let watcher = VolumeWatcher::new(self.event_tx.clone());
|
||||
if let Err(e) = watcher.start().await {
|
||||
error!(
|
||||
?e,
|
||||
"Failed to start watcher for volume {}",
|
||||
hex::encode(&pub_id)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
state.watchers.insert(
|
||||
pub_id,
|
||||
WatcherState {
|
||||
watcher: Arc::new(watcher),
|
||||
last_event: Instant::now(),
|
||||
paused: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove any watchers for volumes that no longer exist
|
||||
let stale_watchers: Vec<_> = state
|
||||
.watchers
|
||||
.keys()
|
||||
.filter(|id| !state.volumes.contains_key(*id))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
for volume_id in stale_watchers {
|
||||
if let Some(watcher_state) = state.watchers.remove(&volume_id) {
|
||||
watcher_state.watcher.stop().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -237,6 +301,45 @@ impl VolumeManagerActor {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn perform_maintenance(&mut self) -> Result<(), VolumeError> {
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
// Pass device_id to maintenance
|
||||
if let Err(e) = state.maintenance(self.ctx.device_id.clone()).await {
|
||||
error!(?e, "Volume maintenance error");
|
||||
}
|
||||
|
||||
// Rescan volumes periodically
|
||||
if state.last_scan.elapsed() > Duration::from_secs(300) {
|
||||
drop(state);
|
||||
self.scan_volumes().await?;
|
||||
state = self.state.write().await;
|
||||
}
|
||||
|
||||
// Clean up stale watchers
|
||||
let stale_watchers: Vec<_> = state
|
||||
.watchers
|
||||
.iter()
|
||||
.filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600))
|
||||
.map(|(id, _)| id.clone())
|
||||
.collect();
|
||||
|
||||
for volume_id in stale_watchers {
|
||||
if let Some(watcher_state) = state.watchers.get(&volume_id) {
|
||||
watcher_state.watcher.stop().await;
|
||||
}
|
||||
state.watchers.remove(&volume_id);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn scan_volumes(&mut self) -> Result<(), VolumeError> {
|
||||
let mut state = self.state.write().await;
|
||||
state.scan_volumes(self.ctx.device_id.clone()).await
|
||||
}
|
||||
|
||||
async fn handle_message(&mut self, msg: VolumeManagerMessage) -> Result<(), VolumeError> {
|
||||
trace!("VolumeManagerActor received message: {:?}", msg);
|
||||
match msg {
|
||||
@@ -356,31 +459,6 @@ impl VolumeManagerActor {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn perform_maintenance(&mut self) -> Result<(), VolumeError> {
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
// Rescan volumes periodically
|
||||
if state.last_scan.elapsed() > Duration::from_secs(300) {
|
||||
drop(state);
|
||||
self.scan_volumes().await?;
|
||||
state = self.state.write().await;
|
||||
}
|
||||
|
||||
// Clean up stale watchers
|
||||
let stale_watchers: Vec<_> = state
|
||||
.watchers
|
||||
.iter()
|
||||
.filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600))
|
||||
.map(|(id, _)| id.clone())
|
||||
.collect();
|
||||
|
||||
for volume_id in stale_watchers {
|
||||
state.unwatch_volume(&volume_id).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_library_deletion(&mut self, library: Arc<Library>) -> Result<(), VolumeError> {
|
||||
// Clean up volumes associated with deleted library
|
||||
let _state = self.state.write().await;
|
||||
|
||||
@@ -7,7 +7,7 @@ use super::{
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, error, instrument};
|
||||
use tracing::{debug, error, event, instrument};
|
||||
use uuid::Uuid;
|
||||
|
||||
// const OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
@@ -29,9 +29,11 @@ pub struct VolumeManagerState {
|
||||
|
||||
impl VolumeManagerState {
|
||||
/// Creates a new volume manager
|
||||
pub async fn new(options: VolumeOptions) -> Result<Self, VolumeError> {
|
||||
let (event_tx, _) = broadcast::channel(128);
|
||||
|
||||
// Take event_tx as parameter instead of creating new channel
|
||||
pub async fn new(
|
||||
options: VolumeOptions,
|
||||
event_tx: broadcast::Sender<VolumeEvent>,
|
||||
) -> Result<Self, VolumeError> {
|
||||
Ok(Self {
|
||||
volumes: HashMap::new(),
|
||||
watchers: HashMap::new(),
|
||||
@@ -42,68 +44,72 @@ impl VolumeManagerState {
|
||||
}
|
||||
|
||||
/// Scans the system for volumes and updates the state
|
||||
#[instrument(skip(self))]
|
||||
pub async fn scan_volumes(&mut self) -> Result<(), VolumeError> {
|
||||
pub async fn scan_volumes(&mut self, device_pub_id: Vec<u8>) -> Result<(), VolumeError> {
|
||||
debug!("Scanning for volumes...");
|
||||
let detected_volumes = super::os::get_volumes().await?;
|
||||
let mut new_volumes = Vec::new();
|
||||
let mut removed_volumes = self.volumes.clone();
|
||||
|
||||
debug!("Found {} volumes during scan", detected_volumes.len());
|
||||
|
||||
// Clear existing volumes and reinsert with new IDs
|
||||
self.volumes.clear();
|
||||
let current_volumes = self.volumes.clone();
|
||||
let mut new_state = HashMap::new();
|
||||
|
||||
for volume in detected_volumes {
|
||||
// Process detected volumes
|
||||
for mut volume in detected_volumes {
|
||||
// Skip virtual volumes if configured
|
||||
if !self.options.include_virtual && matches!(volume.mount_type, MountType::Virtual) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// check if vol
|
||||
// Generate fingerprint for the new volume
|
||||
let fingerprint = volume.generate_fingerprint(device_pub_id.clone());
|
||||
|
||||
let volume_id = match &volume.pub_id {
|
||||
Some(id) => id.clone(),
|
||||
None => {
|
||||
// New volume, generate ID
|
||||
let id = Uuid::now_v7().as_bytes().to_vec();
|
||||
let mut volume = volume.clone();
|
||||
volume.pub_id = Some(id.clone());
|
||||
new_volumes.push(volume);
|
||||
id
|
||||
}
|
||||
// Try to find existing volume by fingerprint
|
||||
let existing_volume = current_volumes.values().find(|existing| {
|
||||
existing.generate_fingerprint(device_pub_id.clone()) == fingerprint
|
||||
});
|
||||
|
||||
let volume_id = if let Some(existing) = existing_volume {
|
||||
// Keep existing ID
|
||||
existing
|
||||
.pub_id
|
||||
.clone()
|
||||
.unwrap_or_else(|| Uuid::now_v7().as_bytes().to_vec())
|
||||
} else {
|
||||
// Generate new ID only for truly new volumes
|
||||
Uuid::now_v7().as_bytes().to_vec()
|
||||
};
|
||||
|
||||
// Remove from potentially removed volumes
|
||||
removed_volumes.remove(&volume_id);
|
||||
volume.pub_id = Some(volume_id.clone());
|
||||
|
||||
// Update existing volume or add new one
|
||||
match self.volumes.get_mut(&volume_id) {
|
||||
Some(existing) => {
|
||||
if existing != &volume {
|
||||
let old = existing.clone();
|
||||
*existing = volume;
|
||||
let event = VolumeEvent::VolumeUpdated {
|
||||
old,
|
||||
new: existing.clone(),
|
||||
};
|
||||
self.emit_event(event).await;
|
||||
}
|
||||
match current_volumes.get(&volume_id) {
|
||||
Some(existing) if existing != &volume => {
|
||||
new_state.insert(volume_id.clone(), volume.clone());
|
||||
self.emit_event(VolumeEvent::VolumeUpdated {
|
||||
old: existing.clone(),
|
||||
new: volume,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
None => {
|
||||
self.volumes.insert(volume_id, volume.clone());
|
||||
new_state.insert(volume_id.clone(), volume.clone());
|
||||
self.emit_event(VolumeEvent::VolumeAdded(volume)).await;
|
||||
}
|
||||
_ => {
|
||||
new_state.insert(volume_id.clone(), volume);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle removed volumes
|
||||
for (id, volume) in removed_volumes {
|
||||
self.volumes.remove(&id);
|
||||
self.watchers.remove(&id);
|
||||
self.emit_event(VolumeEvent::VolumeRemoved(volume)).await;
|
||||
// Find removed volumes
|
||||
for (id, volume) in ¤t_volumes {
|
||||
if !new_state.contains_key(id) {
|
||||
self.watchers.remove(id);
|
||||
self.emit_event(VolumeEvent::VolumeRemoved(volume.clone()))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Update state
|
||||
self.volumes = new_state;
|
||||
self.last_scan = Instant::now();
|
||||
Ok(())
|
||||
}
|
||||
@@ -224,10 +230,10 @@ impl VolumeManagerState {
|
||||
}
|
||||
|
||||
/// Performs maintenance tasks
|
||||
pub async fn maintenance(&mut self) -> Result<(), VolumeError> {
|
||||
pub async fn maintenance(&mut self, device_pub_id: Vec<u8>) -> Result<(), VolumeError> {
|
||||
// Rescan volumes periodically
|
||||
if self.last_scan.elapsed() > Duration::from_secs(300) {
|
||||
self.scan_volumes().await?;
|
||||
self.scan_volumes(device_pub_id).await?;
|
||||
}
|
||||
|
||||
// Clean up stale watchers
|
||||
|
||||
@@ -1,16 +1,14 @@
|
||||
use super::error::VolumeError;
|
||||
use super::types::VolumeEvent;
|
||||
use crate::volume::{DiskType, FileSystem, MountType};
|
||||
use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration};
|
||||
use tokio::{
|
||||
sync::{broadcast, mpsc, RwLock},
|
||||
time::{sleep, Instant},
|
||||
};
|
||||
use tracing::error;
|
||||
use tracing::{debug, error, warn};
|
||||
|
||||
const DEBOUNCE_MS: u64 = 100;
|
||||
|
||||
/// State of a volume watcher
|
||||
#[derive(Debug)]
|
||||
pub struct WatcherState {
|
||||
pub watcher: Arc<VolumeWatcher>,
|
||||
@@ -35,51 +33,181 @@ impl VolumeWatcher {
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<(), VolumeError> {
|
||||
let (platform_tx, mut platform_rx) = mpsc::unbounded_channel();
|
||||
debug!("Starting volume watcher");
|
||||
|
||||
// Start platform-specific watcher
|
||||
self.spawn_platform_watcher(platform_tx).await?;
|
||||
let (check_tx, mut check_rx) = mpsc::channel(1);
|
||||
|
||||
// Start OS-specific watcher
|
||||
self.spawn_platform_watcher(check_tx.clone()).await?;
|
||||
|
||||
// Handle volume checks when triggered by OS events
|
||||
let event_tx = self.event_tx.clone();
|
||||
let ignored_paths = self.ignored_paths.clone();
|
||||
let running = self.running.clone();
|
||||
|
||||
// Event processor
|
||||
tokio::spawn(async move {
|
||||
let mut last_event: Option<VolumeEvent> = None;
|
||||
let mut last_check = Instant::now();
|
||||
let mut last_volumes = Vec::new();
|
||||
|
||||
while let Some(event) = platform_rx.recv().await {
|
||||
if !*running.read().await {
|
||||
break;
|
||||
}
|
||||
|
||||
// Skip ignored paths
|
||||
if let Some(path) = event.path() {
|
||||
if ignored_paths.read().await.contains(path) {
|
||||
while *running.read().await {
|
||||
// Wait for check trigger from OS watcher
|
||||
if check_rx.recv().await.is_some() {
|
||||
// Debounce checks
|
||||
if last_check.elapsed() < Duration::from_millis(DEBOUNCE_MS) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
last_check = Instant::now();
|
||||
|
||||
// Simple debouncing
|
||||
if let Some(last) = &last_event {
|
||||
if last.matches(&event) {
|
||||
sleep(Duration::from_millis(DEBOUNCE_MS)).await;
|
||||
continue;
|
||||
match super::os::get_volumes().await {
|
||||
Ok(current_volumes) => {
|
||||
// Find new volumes
|
||||
for volume in ¤t_volumes {
|
||||
if !last_volumes.iter().any(|v| v == volume) {
|
||||
debug!("New volume detected: {}", volume.name);
|
||||
let _ = event_tx.send(VolumeEvent::VolumeAdded(volume.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
// Find removed volumes
|
||||
for volume in &last_volumes {
|
||||
if !current_volumes.iter().any(|v| v == volume) {
|
||||
debug!("Volume removed: {}", volume.name);
|
||||
let _ =
|
||||
event_tx.send(VolumeEvent::VolumeRemoved(volume.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
// Find updated volumes
|
||||
for old_volume in &last_volumes {
|
||||
if let Some(new_volume) =
|
||||
current_volumes.iter().find(|v| *v == old_volume)
|
||||
{
|
||||
if new_volume != old_volume {
|
||||
debug!("Volume updated: {}", new_volume.name);
|
||||
let _ = event_tx.send(VolumeEvent::VolumeUpdated {
|
||||
old: old_volume.clone(),
|
||||
new: new_volume.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
last_volumes = current_volumes;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to get volumes during watch: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = event_tx.send(event.clone()) {
|
||||
error!("Failed to send volume event: {}", e);
|
||||
}
|
||||
|
||||
last_event = Some(event);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn spawn_platform_watcher(&self, check_tx: mpsc::Sender<()>) -> Result<(), VolumeError> {
|
||||
let running = self.running.clone();
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
use inotify::{Inotify, WatchMask};
|
||||
|
||||
let inotify = Inotify::init().map_err(|e| {
|
||||
VolumeError::Platform(format!("Failed to initialize inotify: {}", e))
|
||||
})?;
|
||||
|
||||
// Watch mount points and device changes
|
||||
for path in ["/dev", "/media", "/mnt", "/run/media"] {
|
||||
if let Err(e) = inotify.add_watch(
|
||||
path,
|
||||
WatchMask::CREATE | WatchMask::DELETE | WatchMask::MODIFY | WatchMask::UNMOUNT,
|
||||
) {
|
||||
warn!("Failed to watch path {}: {}", path, e);
|
||||
}
|
||||
}
|
||||
|
||||
let check_tx = check_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut buffer = [0; 4096];
|
||||
while *running.read().await {
|
||||
match inotify.read_events_blocking(&mut buffer) {
|
||||
Ok(_) => {
|
||||
if let Err(e) = check_tx.send(()).await {
|
||||
error!("Failed to trigger volume check: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Inotify error: {}", e),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
use fsevent::{self, StreamFlags};
|
||||
|
||||
let (fs_tx, fs_rx) = std::sync::mpsc::channel();
|
||||
let check_tx = check_tx.clone();
|
||||
|
||||
// Watch for volume mount events
|
||||
std::thread::spawn(move || {
|
||||
let mut stream = fsevent::FsEvent::new(vec![
|
||||
"/Volumes".to_string(),
|
||||
"/System/Volumes".to_string(),
|
||||
]);
|
||||
|
||||
stream
|
||||
.observe_async(fs_tx)
|
||||
.expect("Failed to start FSEvent stream");
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while *running.read().await {
|
||||
if let Ok(events) = fs_rx.try_recv() {
|
||||
if events.flag.contains(StreamFlags::MOUNT)
|
||||
|| events.flag.contains(StreamFlags::UNMOUNT)
|
||||
{
|
||||
if let Err(e) = check_tx.send(()).await {
|
||||
error!("Failed to trigger volume check: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
{
|
||||
use windows::Win32::Storage::FileSystem::{
|
||||
FindFirstVolumeW, FindNextVolumeW, FindVolumeClose, ReadDirectoryChangesW,
|
||||
FILE_NOTIFY_CHANGE_DIR_NAME,
|
||||
};
|
||||
|
||||
let check_tx = check_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
while *running.read().await {
|
||||
// Watch for volume arrival/removal
|
||||
unsafe {
|
||||
let mut volume_name = [0u16; 260];
|
||||
let handle = FindFirstVolumeW(volume_name.as_mut_ptr());
|
||||
if !handle.is_invalid() {
|
||||
// Volume change detected
|
||||
if let Err(e) = check_tx.send(()).await {
|
||||
error!("Failed to trigger volume check: {}", e);
|
||||
}
|
||||
FindVolumeClose(handle);
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&self) {
|
||||
debug!("Stopping volume watcher");
|
||||
*self.running.write().await = false;
|
||||
}
|
||||
|
||||
@@ -90,222 +218,6 @@ impl VolumeWatcher {
|
||||
pub async fn unignore_path(&self, path: &PathBuf) {
|
||||
self.ignored_paths.write().await.remove(path);
|
||||
}
|
||||
|
||||
async fn spawn_platform_watcher(
|
||||
&self,
|
||||
tx: mpsc::UnboundedSender<VolumeEvent>,
|
||||
) -> Result<(), VolumeError> {
|
||||
#[cfg(target_os = "linux")]
|
||||
return self.spawn_linux_watcher(tx).await;
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
return self.spawn_macos_watcher(tx).await;
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
return self.spawn_windows_watcher(tx).await;
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
async fn spawn_linux_watcher(
|
||||
&self,
|
||||
tx: mpsc::UnboundedSender<VolumeEvent>,
|
||||
) -> Result<(), VolumeError> {
|
||||
use inotify::{Inotify, WatchMask};
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
let mut inotify = Inotify::init()
|
||||
.map_err(|e| VolumeError::Watcher(format!("Failed to initialize inotify: {}", e)))?;
|
||||
|
||||
// Watch mount points
|
||||
for path in ["/dev", "/media", "/mnt"] {
|
||||
inotify
|
||||
.add_watch(path, WatchMask::CREATE | WatchMask::DELETE)
|
||||
.map_err(|e| {
|
||||
VolumeError::Watcher(format!("Failed to add watch on {}: {}", path, e))
|
||||
})?;
|
||||
}
|
||||
|
||||
let running = self.running.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut buffer = [0; 1024];
|
||||
while *running.read().await {
|
||||
match inotify.read_events_blocking(&mut buffer) {
|
||||
Ok(events) => {
|
||||
for event in events {
|
||||
let event_type = if event.mask.contains(inotify::EventMask::CREATE) {
|
||||
VolumeEvent::VolumeAdded
|
||||
} else {
|
||||
VolumeEvent::VolumeRemoved
|
||||
};
|
||||
let _ = tx.send(event_type);
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Inotify error: {}", e),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
async fn spawn_macos_watcher(
|
||||
&self,
|
||||
tx: mpsc::UnboundedSender<VolumeEvent>,
|
||||
) -> Result<(), VolumeError> {
|
||||
use fsevent::{self, StreamFlags};
|
||||
|
||||
use crate::volume::Volume;
|
||||
|
||||
// Create channels for fsevent
|
||||
let (fs_event_tx, fs_event_rx) = std::sync::mpsc::channel();
|
||||
|
||||
// Spawn thread for fsevent
|
||||
std::thread::spawn(move || {
|
||||
let mut stream = fsevent::FsEvent::new(vec!["/Volumes".to_string()]);
|
||||
|
||||
stream.observe_async(fs_event_tx).unwrap();
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
stream.shutdown_observe();
|
||||
});
|
||||
|
||||
// Spawn task to process events
|
||||
let running = self.running.clone();
|
||||
tokio::spawn(async move {
|
||||
while *running.read().await {
|
||||
match fs_event_rx.try_recv() {
|
||||
Ok(event) => {
|
||||
// Get current volumes after event
|
||||
let volumes_result = super::os::get_volumes().await;
|
||||
|
||||
match volumes_result {
|
||||
Ok(current_volumes) => {
|
||||
if event.flag.contains(StreamFlags::MOUNT) {
|
||||
// Small delay to let the OS finish mounting
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
// Find newly mounted volume
|
||||
if let Some(volume) = current_volumes.iter().find(|v| {
|
||||
v.mount_point.to_string_lossy().contains(&event.path)
|
||||
}) {
|
||||
let _ = tx.send(VolumeEvent::VolumeAdded(volume.clone()));
|
||||
}
|
||||
} else if event.flag.contains(StreamFlags::UNMOUNT) {
|
||||
// For unmount, we need to synthesize a basic volume since it's already gone
|
||||
let basic_volume = Volume::new(
|
||||
event.path.clone(),
|
||||
MountType::External,
|
||||
PathBuf::from(&event.path),
|
||||
vec![],
|
||||
DiskType::Unknown,
|
||||
FileSystem::Other("unknown".to_string()),
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
);
|
||||
let _ = tx.send(VolumeEvent::VolumeRemoved(basic_volume));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to get volumes after event: {}", e);
|
||||
let _ = tx.send(VolumeEvent::VolumeError {
|
||||
id: vec![],
|
||||
error: format!("Failed to get volumes: {}", e),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(std::sync::mpsc::TryRecvError::Empty) => {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
|
||||
error!("FSEvent channel disconnected");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
async fn spawn_windows_watcher(
|
||||
&self,
|
||||
tx: mpsc::UnboundedSender<VolumeEvent>,
|
||||
) -> Result<(), VolumeError> {
|
||||
use windows::Win32::Storage::FileSystem::{
|
||||
ReadDirectoryChangesW, FILE_NOTIFY_CHANGE_DIR_NAME,
|
||||
};
|
||||
|
||||
let path = std::ffi::OsString::from("C:\\")
|
||||
.encode_wide()
|
||||
.chain(std::iter::once(0))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
unsafe {
|
||||
let handle = windows::Win32::Storage::FileSystem::CreateFileW(
|
||||
path.as_ptr(),
|
||||
windows::Win32::Storage::FileSystem::FILE_LIST_DIRECTORY,
|
||||
windows::Win32::Storage::FileSystem::FILE_SHARE_READ
|
||||
| windows::Win32::Storage::FileSystem::FILE_SHARE_WRITE
|
||||
| windows::Win32::Storage::FileSystem::FILE_SHARE_DELETE,
|
||||
std::ptr::null_mut(),
|
||||
windows::Win32::Storage::FileSystem::OPEN_EXISTING,
|
||||
windows::Win32::Storage::FileSystem::FILE_FLAG_BACKUP_SEMANTICS
|
||||
| windows::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED,
|
||||
std::ptr::null_mut(),
|
||||
);
|
||||
|
||||
if handle.is_invalid() {
|
||||
return Err(VolumeError::Watcher(
|
||||
"Failed to open directory for watching".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let running = self.running.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut buffer = [0u8; 1024];
|
||||
|
||||
while *running.read().await {
|
||||
match ReadDirectoryChangesW(
|
||||
handle,
|
||||
buffer.as_mut_ptr() as *mut _,
|
||||
buffer.len() as u32,
|
||||
true,
|
||||
FILE_NOTIFY_CHANGE_DIR_NAME as u32,
|
||||
std::ptr::null_mut(),
|
||||
std::ptr::null_mut(),
|
||||
None,
|
||||
) {
|
||||
Ok(_) => {
|
||||
let _ = tx.send(VolumeEvent::VolumeAdded);
|
||||
}
|
||||
Err(e) => error!("ReadDirectoryChangesW error: {}", e),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl VolumeEvent {
|
||||
fn matches(&self, other: &VolumeEvent) -> bool {
|
||||
std::mem::discriminant(self) == std::mem::discriminant(other)
|
||||
}
|
||||
|
||||
fn path(&self) -> Option<&PathBuf> {
|
||||
match self {
|
||||
VolumeEvent::VolumeAdded(vol) | VolumeEvent::VolumeRemoved(vol) => {
|
||||
Some(&vol.mount_point)
|
||||
}
|
||||
VolumeEvent::VolumeUpdated { new, .. } => Some(&new.mount_point),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -320,14 +232,14 @@ mod tests {
|
||||
|
||||
watcher.start().await.expect("Failed to start watcher");
|
||||
|
||||
// Wait for potential events
|
||||
let result = timeout(Duration::from_secs(1), rx.recv()).await;
|
||||
// Wait for potential volume events
|
||||
let result = timeout(Duration::from_secs(2), rx.recv()).await;
|
||||
|
||||
// Cleanup
|
||||
watcher.stop().await;
|
||||
|
||||
if let Ok(Ok(event)) = result {
|
||||
println!("Received event: {:?}", event);
|
||||
println!("Received volume event: {:?}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user