From f60ff48b9dbbb24ccb1ab96960df1f9e288f68e9 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sat, 2 Nov 2024 22:27:31 -0700 Subject: [PATCH] Add volume event logging and improve watcher integration with enhanced state management and maintenance routines --- core/src/api/volumes.rs | 1 + core/src/volume/actor.rs | 162 +++++++++++---- core/src/volume/manager.rs | 96 +++++---- core/src/volume/watcher.rs | 408 +++++++++++++++---------------------- 4 files changed, 332 insertions(+), 335 deletions(-) diff --git a/core/src/api/volumes.rs b/core/src/api/volumes.rs index d91560f9e..4366f5568 100644 --- a/core/src/api/volumes.rs +++ b/core/src/api/volumes.rs @@ -75,6 +75,7 @@ pub(crate) fn mount() -> AlphaRouter { let mut event_bus_rx = node.volumes.subscribe(); while let Ok(event) = event_bus_rx.recv().await { + tracing::debug!("Volume event: {:?}", event); yield event; } }) diff --git a/core/src/volume/actor.rs b/core/src/volume/actor.rs index db0d4fdb8..9ae6dbb54 100644 --- a/core/src/volume/actor.rs +++ b/core/src/volume/actor.rs @@ -2,13 +2,15 @@ use super::{ error::VolumeError, types::{Volume, VolumeEvent, VolumeOptions}, volumes::Volumes, + watcher::{VolumeWatcher, WatcherState}, VolumeManagerContext, VolumeManagerState, }; use crate::library::{Library, LibraryManagerEvent}; use async_channel as chan; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{broadcast, oneshot, Mutex, RwLock}; -use tracing::{error, info, trace}; +use tokio::time::Instant; +use tracing::{debug, error, info, trace, warn}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_CHANNEL_SIZE: usize = 128; @@ -77,10 +79,10 @@ impl VolumeManagerActor { options: VolumeOptions, ) -> Result<(Volumes, Self), VolumeError> { let (message_tx, message_rx) = chan::bounded(DEFAULT_CHANNEL_SIZE); - let (event_tx, _) = broadcast::channel(DEFAULT_CHANNEL_SIZE); + let (event_tx, event_rx) = broadcast::channel(DEFAULT_CHANNEL_SIZE); let manager = Volumes::new(message_tx, event_tx.clone()); - let state = VolumeManagerState::new(options).await?; + let state = VolumeManagerState::new(options, event_tx.clone()).await?; let actor = VolumeManagerActor { state: Arc::new(RwLock::new(state)), @@ -89,9 +91,22 @@ impl VolumeManagerActor { ctx, }; + // Pass event_rx to start monitoring task immediately + actor.clone().start_event_monitoring(event_rx); + Ok((manager, actor)) } + fn start_event_monitoring(self, mut event_rx: broadcast::Receiver) { + tokio::spawn(async move { + debug!("Starting volume event monitoring"); + while let Ok(event) = event_rx.recv().await { + debug!("Volume event processed: {:?}", event); + } + warn!("Volume event monitoring ended"); + }); + } + /// Starts the VolumeManagerActor pub async fn start(self) { info!("Volume manager actor started"); @@ -164,25 +179,36 @@ impl VolumeManagerActor { } }); + // Start the volume watcher + let self_arc_watcher = Arc::clone(&self_arc); + tokio::spawn(async move { + let mut actor = self_arc_watcher.lock().await; + let state = actor.state.write().await; + + // Create and start watcher for each volume + for (volume_id, volume) in &state.volumes { + if let Some(watcher) = state.watchers.get(volume_id) { + if let Err(e) = watcher.watcher.start().await { + error!(?e, "Failed to start watcher for volume {}", volume.name); + } + } + } + }); + info!("Volume manager actor initialized"); } - async fn scan_volumes(&mut self) -> Result<(), VolumeError> { - let mut state = self.state.write().await; - state.scan_volumes().await - } - pub async fn initialize(&mut self, library: Arc) -> Result<(), VolumeError> { + // Use device_id from context instead of node + let device_pub_id = self.ctx.device_id.clone(); + // Scan for system volumes first { let mut state = self.state.write().await; - state.scan_volumes().await?; + state.scan_volumes(device_pub_id.clone()).await?; } - // Use device_id from context instead of node - let device_id = self.ctx.device_id.clone(); - - // Rest of initialize remains the same... + // Get volumes from library database let db_volumes = library .db .volume() @@ -202,11 +228,11 @@ impl VolumeManagerActor { // Prepare updates for db_volume in db_volumes { - let fingerprint = db_volume.generate_fingerprint(device_id.clone().into()); + let fingerprint = db_volume.generate_fingerprint(device_pub_id.clone().into()); if let Some(system_volume) = current_volumes .values() - .find(|v| v.generate_fingerprint(device_id.clone().into()) == fingerprint) + .find(|v| v.generate_fingerprint(device_pub_id.clone().into()) == fingerprint) { let merged = Volume::merge_with_db_volume(system_volume, &db_volume); if let Some(pub_id) = &merged.pub_id { @@ -222,11 +248,49 @@ impl VolumeManagerActor { } } - // Apply updates + // Apply updates and initialize watchers { let mut state = self.state.write().await; + + // Update volumes for (pub_id, volume) in updates { - state.volumes.insert(pub_id, volume); + state.volumes.insert(pub_id.clone(), volume); + + // Create and start watcher if it doesn't exist + if !state.watchers.contains_key(&pub_id) { + let watcher = VolumeWatcher::new(self.event_tx.clone()); + if let Err(e) = watcher.start().await { + error!( + ?e, + "Failed to start watcher for volume {}", + hex::encode(&pub_id) + ); + continue; + } + + state.watchers.insert( + pub_id, + WatcherState { + watcher: Arc::new(watcher), + last_event: Instant::now(), + paused: false, + }, + ); + } + } + + // Remove any watchers for volumes that no longer exist + let stale_watchers: Vec<_> = state + .watchers + .keys() + .filter(|id| !state.volumes.contains_key(*id)) + .cloned() + .collect(); + + for volume_id in stale_watchers { + if let Some(watcher_state) = state.watchers.remove(&volume_id) { + watcher_state.watcher.stop().await; + } } } @@ -237,6 +301,45 @@ impl VolumeManagerActor { Ok(()) } + + async fn perform_maintenance(&mut self) -> Result<(), VolumeError> { + let mut state = self.state.write().await; + + // Pass device_id to maintenance + if let Err(e) = state.maintenance(self.ctx.device_id.clone()).await { + error!(?e, "Volume maintenance error"); + } + + // Rescan volumes periodically + if state.last_scan.elapsed() > Duration::from_secs(300) { + drop(state); + self.scan_volumes().await?; + state = self.state.write().await; + } + + // Clean up stale watchers + let stale_watchers: Vec<_> = state + .watchers + .iter() + .filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600)) + .map(|(id, _)| id.clone()) + .collect(); + + for volume_id in stale_watchers { + if let Some(watcher_state) = state.watchers.get(&volume_id) { + watcher_state.watcher.stop().await; + } + state.watchers.remove(&volume_id); + } + + Ok(()) + } + + async fn scan_volumes(&mut self) -> Result<(), VolumeError> { + let mut state = self.state.write().await; + state.scan_volumes(self.ctx.device_id.clone()).await + } + async fn handle_message(&mut self, msg: VolumeManagerMessage) -> Result<(), VolumeError> { trace!("VolumeManagerActor received message: {:?}", msg); match msg { @@ -356,31 +459,6 @@ impl VolumeManagerActor { Ok(()) } - async fn perform_maintenance(&mut self) -> Result<(), VolumeError> { - let mut state = self.state.write().await; - - // Rescan volumes periodically - if state.last_scan.elapsed() > Duration::from_secs(300) { - drop(state); - self.scan_volumes().await?; - state = self.state.write().await; - } - - // Clean up stale watchers - let stale_watchers: Vec<_> = state - .watchers - .iter() - .filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600)) - .map(|(id, _)| id.clone()) - .collect(); - - for volume_id in stale_watchers { - state.unwatch_volume(&volume_id).await?; - } - - Ok(()) - } - async fn handle_library_deletion(&mut self, library: Arc) -> Result<(), VolumeError> { // Clean up volumes associated with deleted library let _state = self.state.write().await; diff --git a/core/src/volume/manager.rs b/core/src/volume/manager.rs index ddd162837..d30af8c2c 100644 --- a/core/src/volume/manager.rs +++ b/core/src/volume/manager.rs @@ -7,7 +7,7 @@ use super::{ use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::broadcast; use tokio::time::Instant; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, event, instrument}; use uuid::Uuid; // const OPERATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -29,9 +29,11 @@ pub struct VolumeManagerState { impl VolumeManagerState { /// Creates a new volume manager - pub async fn new(options: VolumeOptions) -> Result { - let (event_tx, _) = broadcast::channel(128); - + // Take event_tx as parameter instead of creating new channel + pub async fn new( + options: VolumeOptions, + event_tx: broadcast::Sender, + ) -> Result { Ok(Self { volumes: HashMap::new(), watchers: HashMap::new(), @@ -42,68 +44,72 @@ impl VolumeManagerState { } /// Scans the system for volumes and updates the state - #[instrument(skip(self))] - pub async fn scan_volumes(&mut self) -> Result<(), VolumeError> { + pub async fn scan_volumes(&mut self, device_pub_id: Vec) -> Result<(), VolumeError> { debug!("Scanning for volumes..."); let detected_volumes = super::os::get_volumes().await?; - let mut new_volumes = Vec::new(); - let mut removed_volumes = self.volumes.clone(); - debug!("Found {} volumes during scan", detected_volumes.len()); - // Clear existing volumes and reinsert with new IDs - self.volumes.clear(); + let current_volumes = self.volumes.clone(); + let mut new_state = HashMap::new(); - for volume in detected_volumes { + // Process detected volumes + for mut volume in detected_volumes { // Skip virtual volumes if configured if !self.options.include_virtual && matches!(volume.mount_type, MountType::Virtual) { continue; } - // check if vol + // Generate fingerprint for the new volume + let fingerprint = volume.generate_fingerprint(device_pub_id.clone()); - let volume_id = match &volume.pub_id { - Some(id) => id.clone(), - None => { - // New volume, generate ID - let id = Uuid::now_v7().as_bytes().to_vec(); - let mut volume = volume.clone(); - volume.pub_id = Some(id.clone()); - new_volumes.push(volume); - id - } + // Try to find existing volume by fingerprint + let existing_volume = current_volumes.values().find(|existing| { + existing.generate_fingerprint(device_pub_id.clone()) == fingerprint + }); + + let volume_id = if let Some(existing) = existing_volume { + // Keep existing ID + existing + .pub_id + .clone() + .unwrap_or_else(|| Uuid::now_v7().as_bytes().to_vec()) + } else { + // Generate new ID only for truly new volumes + Uuid::now_v7().as_bytes().to_vec() }; - // Remove from potentially removed volumes - removed_volumes.remove(&volume_id); + volume.pub_id = Some(volume_id.clone()); - // Update existing volume or add new one - match self.volumes.get_mut(&volume_id) { - Some(existing) => { - if existing != &volume { - let old = existing.clone(); - *existing = volume; - let event = VolumeEvent::VolumeUpdated { - old, - new: existing.clone(), - }; - self.emit_event(event).await; - } + match current_volumes.get(&volume_id) { + Some(existing) if existing != &volume => { + new_state.insert(volume_id.clone(), volume.clone()); + self.emit_event(VolumeEvent::VolumeUpdated { + old: existing.clone(), + new: volume, + }) + .await; } None => { - self.volumes.insert(volume_id, volume.clone()); + new_state.insert(volume_id.clone(), volume.clone()); self.emit_event(VolumeEvent::VolumeAdded(volume)).await; } + _ => { + new_state.insert(volume_id.clone(), volume); + } } } - // Handle removed volumes - for (id, volume) in removed_volumes { - self.volumes.remove(&id); - self.watchers.remove(&id); - self.emit_event(VolumeEvent::VolumeRemoved(volume)).await; + // Find removed volumes + for (id, volume) in ¤t_volumes { + if !new_state.contains_key(id) { + self.watchers.remove(id); + self.emit_event(VolumeEvent::VolumeRemoved(volume.clone())) + .await; + } } + // Update state + self.volumes = new_state; self.last_scan = Instant::now(); Ok(()) } @@ -224,10 +230,10 @@ impl VolumeManagerState { } /// Performs maintenance tasks - pub async fn maintenance(&mut self) -> Result<(), VolumeError> { + pub async fn maintenance(&mut self, device_pub_id: Vec) -> Result<(), VolumeError> { // Rescan volumes periodically if self.last_scan.elapsed() > Duration::from_secs(300) { - self.scan_volumes().await?; + self.scan_volumes(device_pub_id).await?; } // Clean up stale watchers diff --git a/core/src/volume/watcher.rs b/core/src/volume/watcher.rs index 6c7dddf3e..e94915227 100644 --- a/core/src/volume/watcher.rs +++ b/core/src/volume/watcher.rs @@ -1,16 +1,14 @@ use super::error::VolumeError; use super::types::VolumeEvent; -use crate::volume::{DiskType, FileSystem, MountType}; use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration}; use tokio::{ sync::{broadcast, mpsc, RwLock}, time::{sleep, Instant}, }; -use tracing::error; +use tracing::{debug, error, warn}; const DEBOUNCE_MS: u64 = 100; -/// State of a volume watcher #[derive(Debug)] pub struct WatcherState { pub watcher: Arc, @@ -35,51 +33,181 @@ impl VolumeWatcher { } pub async fn start(&self) -> Result<(), VolumeError> { - let (platform_tx, mut platform_rx) = mpsc::unbounded_channel(); + debug!("Starting volume watcher"); - // Start platform-specific watcher - self.spawn_platform_watcher(platform_tx).await?; + let (check_tx, mut check_rx) = mpsc::channel(1); + // Start OS-specific watcher + self.spawn_platform_watcher(check_tx.clone()).await?; + + // Handle volume checks when triggered by OS events let event_tx = self.event_tx.clone(); - let ignored_paths = self.ignored_paths.clone(); let running = self.running.clone(); - // Event processor tokio::spawn(async move { - let mut last_event: Option = None; + let mut last_check = Instant::now(); + let mut last_volumes = Vec::new(); - while let Some(event) = platform_rx.recv().await { - if !*running.read().await { - break; - } - - // Skip ignored paths - if let Some(path) = event.path() { - if ignored_paths.read().await.contains(path) { + while *running.read().await { + // Wait for check trigger from OS watcher + if check_rx.recv().await.is_some() { + // Debounce checks + if last_check.elapsed() < Duration::from_millis(DEBOUNCE_MS) { continue; } - } + last_check = Instant::now(); - // Simple debouncing - if let Some(last) = &last_event { - if last.matches(&event) { - sleep(Duration::from_millis(DEBOUNCE_MS)).await; - continue; + match super::os::get_volumes().await { + Ok(current_volumes) => { + // Find new volumes + for volume in ¤t_volumes { + if !last_volumes.iter().any(|v| v == volume) { + debug!("New volume detected: {}", volume.name); + let _ = event_tx.send(VolumeEvent::VolumeAdded(volume.clone())); + } + } + + // Find removed volumes + for volume in &last_volumes { + if !current_volumes.iter().any(|v| v == volume) { + debug!("Volume removed: {}", volume.name); + let _ = + event_tx.send(VolumeEvent::VolumeRemoved(volume.clone())); + } + } + + // Find updated volumes + for old_volume in &last_volumes { + if let Some(new_volume) = + current_volumes.iter().find(|v| *v == old_volume) + { + if new_volume != old_volume { + debug!("Volume updated: {}", new_volume.name); + let _ = event_tx.send(VolumeEvent::VolumeUpdated { + old: old_volume.clone(), + new: new_volume.clone(), + }); + } + } + } + + last_volumes = current_volumes; + } + Err(e) => { + warn!("Failed to get volumes during watch: {}", e); + } } } - - if let Err(e) = event_tx.send(event.clone()) { - error!("Failed to send volume event: {}", e); - } - - last_event = Some(event); } }); Ok(()) } + async fn spawn_platform_watcher(&self, check_tx: mpsc::Sender<()>) -> Result<(), VolumeError> { + let running = self.running.clone(); + + #[cfg(target_os = "linux")] + { + use inotify::{Inotify, WatchMask}; + + let inotify = Inotify::init().map_err(|e| { + VolumeError::Platform(format!("Failed to initialize inotify: {}", e)) + })?; + + // Watch mount points and device changes + for path in ["/dev", "/media", "/mnt", "/run/media"] { + if let Err(e) = inotify.add_watch( + path, + WatchMask::CREATE | WatchMask::DELETE | WatchMask::MODIFY | WatchMask::UNMOUNT, + ) { + warn!("Failed to watch path {}: {}", path, e); + } + } + + let check_tx = check_tx.clone(); + tokio::spawn(async move { + let mut buffer = [0; 4096]; + while *running.read().await { + match inotify.read_events_blocking(&mut buffer) { + Ok(_) => { + if let Err(e) = check_tx.send(()).await { + error!("Failed to trigger volume check: {}", e); + } + } + Err(e) => error!("Inotify error: {}", e), + } + } + }); + } + + #[cfg(target_os = "macos")] + { + use fsevent::{self, StreamFlags}; + + let (fs_tx, fs_rx) = std::sync::mpsc::channel(); + let check_tx = check_tx.clone(); + + // Watch for volume mount events + std::thread::spawn(move || { + let mut stream = fsevent::FsEvent::new(vec![ + "/Volumes".to_string(), + "/System/Volumes".to_string(), + ]); + + stream + .observe_async(fs_tx) + .expect("Failed to start FSEvent stream"); + }); + + tokio::spawn(async move { + while *running.read().await { + if let Ok(events) = fs_rx.try_recv() { + if events.flag.contains(StreamFlags::MOUNT) + || events.flag.contains(StreamFlags::UNMOUNT) + { + if let Err(e) = check_tx.send(()).await { + error!("Failed to trigger volume check: {}", e); + } + } + } + sleep(Duration::from_millis(100)).await; + } + }); + } + + #[cfg(target_os = "windows")] + { + use windows::Win32::Storage::FileSystem::{ + FindFirstVolumeW, FindNextVolumeW, FindVolumeClose, ReadDirectoryChangesW, + FILE_NOTIFY_CHANGE_DIR_NAME, + }; + + let check_tx = check_tx.clone(); + tokio::spawn(async move { + while *running.read().await { + // Watch for volume arrival/removal + unsafe { + let mut volume_name = [0u16; 260]; + let handle = FindFirstVolumeW(volume_name.as_mut_ptr()); + if !handle.is_invalid() { + // Volume change detected + if let Err(e) = check_tx.send(()).await { + error!("Failed to trigger volume check: {}", e); + } + FindVolumeClose(handle); + } + } + sleep(Duration::from_millis(100)).await; + } + }); + } + + Ok(()) + } + pub async fn stop(&self) { + debug!("Stopping volume watcher"); *self.running.write().await = false; } @@ -90,222 +218,6 @@ impl VolumeWatcher { pub async fn unignore_path(&self, path: &PathBuf) { self.ignored_paths.write().await.remove(path); } - - async fn spawn_platform_watcher( - &self, - tx: mpsc::UnboundedSender, - ) -> Result<(), VolumeError> { - #[cfg(target_os = "linux")] - return self.spawn_linux_watcher(tx).await; - - #[cfg(target_os = "macos")] - return self.spawn_macos_watcher(tx).await; - - #[cfg(target_os = "windows")] - return self.spawn_windows_watcher(tx).await; - } - - #[cfg(target_os = "linux")] - async fn spawn_linux_watcher( - &self, - tx: mpsc::UnboundedSender, - ) -> Result<(), VolumeError> { - use inotify::{Inotify, WatchMask}; - use tokio::io::AsyncReadExt; - - let mut inotify = Inotify::init() - .map_err(|e| VolumeError::Watcher(format!("Failed to initialize inotify: {}", e)))?; - - // Watch mount points - for path in ["/dev", "/media", "/mnt"] { - inotify - .add_watch(path, WatchMask::CREATE | WatchMask::DELETE) - .map_err(|e| { - VolumeError::Watcher(format!("Failed to add watch on {}: {}", path, e)) - })?; - } - - let running = self.running.clone(); - - tokio::spawn(async move { - let mut buffer = [0; 1024]; - while *running.read().await { - match inotify.read_events_blocking(&mut buffer) { - Ok(events) => { - for event in events { - let event_type = if event.mask.contains(inotify::EventMask::CREATE) { - VolumeEvent::VolumeAdded - } else { - VolumeEvent::VolumeRemoved - }; - let _ = tx.send(event_type); - } - } - Err(e) => error!("Inotify error: {}", e), - } - } - }); - - Ok(()) - } - #[cfg(target_os = "macos")] - async fn spawn_macos_watcher( - &self, - tx: mpsc::UnboundedSender, - ) -> Result<(), VolumeError> { - use fsevent::{self, StreamFlags}; - - use crate::volume::Volume; - - // Create channels for fsevent - let (fs_event_tx, fs_event_rx) = std::sync::mpsc::channel(); - - // Spawn thread for fsevent - std::thread::spawn(move || { - let mut stream = fsevent::FsEvent::new(vec!["/Volumes".to_string()]); - - stream.observe_async(fs_event_tx).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(5)); - stream.shutdown_observe(); - }); - - // Spawn task to process events - let running = self.running.clone(); - tokio::spawn(async move { - while *running.read().await { - match fs_event_rx.try_recv() { - Ok(event) => { - // Get current volumes after event - let volumes_result = super::os::get_volumes().await; - - match volumes_result { - Ok(current_volumes) => { - if event.flag.contains(StreamFlags::MOUNT) { - // Small delay to let the OS finish mounting - tokio::time::sleep(Duration::from_millis(500)).await; - - // Find newly mounted volume - if let Some(volume) = current_volumes.iter().find(|v| { - v.mount_point.to_string_lossy().contains(&event.path) - }) { - let _ = tx.send(VolumeEvent::VolumeAdded(volume.clone())); - } - } else if event.flag.contains(StreamFlags::UNMOUNT) { - // For unmount, we need to synthesize a basic volume since it's already gone - let basic_volume = Volume::new( - event.path.clone(), - MountType::External, - PathBuf::from(&event.path), - vec![], - DiskType::Unknown, - FileSystem::Other("unknown".to_string()), - 0, - 0, - false, - ); - let _ = tx.send(VolumeEvent::VolumeRemoved(basic_volume)); - } - } - Err(e) => { - error!("Failed to get volumes after event: {}", e); - let _ = tx.send(VolumeEvent::VolumeError { - id: vec![], - error: format!("Failed to get volumes: {}", e), - }); - } - } - } - Err(std::sync::mpsc::TryRecvError::Empty) => { - tokio::time::sleep(Duration::from_millis(100)).await; - } - Err(std::sync::mpsc::TryRecvError::Disconnected) => { - error!("FSEvent channel disconnected"); - break; - } - } - } - }); - - Ok(()) - } - - #[cfg(target_os = "windows")] - async fn spawn_windows_watcher( - &self, - tx: mpsc::UnboundedSender, - ) -> Result<(), VolumeError> { - use windows::Win32::Storage::FileSystem::{ - ReadDirectoryChangesW, FILE_NOTIFY_CHANGE_DIR_NAME, - }; - - let path = std::ffi::OsString::from("C:\\") - .encode_wide() - .chain(std::iter::once(0)) - .collect::>(); - - unsafe { - let handle = windows::Win32::Storage::FileSystem::CreateFileW( - path.as_ptr(), - windows::Win32::Storage::FileSystem::FILE_LIST_DIRECTORY, - windows::Win32::Storage::FileSystem::FILE_SHARE_READ - | windows::Win32::Storage::FileSystem::FILE_SHARE_WRITE - | windows::Win32::Storage::FileSystem::FILE_SHARE_DELETE, - std::ptr::null_mut(), - windows::Win32::Storage::FileSystem::OPEN_EXISTING, - windows::Win32::Storage::FileSystem::FILE_FLAG_BACKUP_SEMANTICS - | windows::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED, - std::ptr::null_mut(), - ); - - if handle.is_invalid() { - return Err(VolumeError::Watcher( - "Failed to open directory for watching".into(), - )); - } - - let running = self.running.clone(); - - tokio::spawn(async move { - let mut buffer = [0u8; 1024]; - - while *running.read().await { - match ReadDirectoryChangesW( - handle, - buffer.as_mut_ptr() as *mut _, - buffer.len() as u32, - true, - FILE_NOTIFY_CHANGE_DIR_NAME as u32, - std::ptr::null_mut(), - std::ptr::null_mut(), - None, - ) { - Ok(_) => { - let _ = tx.send(VolumeEvent::VolumeAdded); - } - Err(e) => error!("ReadDirectoryChangesW error: {}", e), - } - } - }); - } - - Ok(()) - } -} - -impl VolumeEvent { - fn matches(&self, other: &VolumeEvent) -> bool { - std::mem::discriminant(self) == std::mem::discriminant(other) - } - - fn path(&self) -> Option<&PathBuf> { - match self { - VolumeEvent::VolumeAdded(vol) | VolumeEvent::VolumeRemoved(vol) => { - Some(&vol.mount_point) - } - VolumeEvent::VolumeUpdated { new, .. } => Some(&new.mount_point), - _ => None, - } - } } #[cfg(test)] @@ -320,14 +232,14 @@ mod tests { watcher.start().await.expect("Failed to start watcher"); - // Wait for potential events - let result = timeout(Duration::from_secs(1), rx.recv()).await; + // Wait for potential volume events + let result = timeout(Duration::from_secs(2), rx.recv()).await; // Cleanup watcher.stop().await; if let Ok(Ok(event)) = result { - println!("Received event: {:?}", event); + println!("Received volume event: {:?}", event); } } }