From 4c866e603dcec36a01cdfadbb81b95cb7552b07d Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 3 Nov 2024 00:29:57 -0700 Subject: [PATCH] Refactor volume management to streamline event handling, state updates, and improve watcher interactions within the system --- core/src/api/volumes.rs | 1 - core/src/volume/actor.rs | 125 ++++++--------- core/src/volume/manager.rs | 142 ++++------------ core/src/volume/mod.rs | 8 +- core/src/volume/types.rs | 9 +- core/src/volume/watcher.rs | 151 +++++++++++------- .../Layout/Sidebar/sections/Local/index.tsx | 1 + packages/client/src/core.ts | 4 + 8 files changed, 192 insertions(+), 249 deletions(-) diff --git a/core/src/api/volumes.rs b/core/src/api/volumes.rs index 4366f5568..d91560f9e 100644 --- a/core/src/api/volumes.rs +++ b/core/src/api/volumes.rs @@ -75,7 +75,6 @@ pub(crate) fn mount() -> AlphaRouter { let mut event_bus_rx = node.volumes.subscribe(); while let Ok(event) = event_bus_rx.recv().await { - tracing::debug!("Volume event: {:?}", event); yield event; } }) diff --git a/core/src/volume/actor.rs b/core/src/volume/actor.rs index 9ae6dbb54..9f2e8ecf6 100644 --- a/core/src/volume/actor.rs +++ b/core/src/volume/actor.rs @@ -7,6 +7,7 @@ use super::{ }; use crate::library::{Library, LibraryManagerEvent}; use async_channel as chan; +use sd_prisma::prisma::album::pub_id; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{broadcast, oneshot, Mutex, RwLock}; use tokio::time::Instant; @@ -31,14 +32,6 @@ pub enum VolumeManagerMessage { volume: Volume, ack: oneshot::Sender>, }, - WatchVolume { - volume_id: Vec, - ack: oneshot::Sender>, - }, - UnwatchVolume { - volume_id: Vec, - ack: oneshot::Sender>, - }, MountVolume { volume_id: Vec, ack: oneshot::Sender>, @@ -101,14 +94,50 @@ impl VolumeManagerActor { tokio::spawn(async move { debug!("Starting volume event monitoring"); while let Ok(event) = event_rx.recv().await { - debug!("Volume event processed: {:?}", event); + debug!("Volume event received: {:?}", event); + + match event { + VolumeEvent::VolumeAdded(mut volume) => { + if let Some(pub_id) = volume.pub_id.take() { + self.state.write().await.volumes.insert(pub_id, volume); + } + } + VolumeEvent::VolumeRemoved(mut volume) => { + if let Some(pub_id) = volume.pub_id.take() { + self.state.write().await.volumes.remove(&pub_id); + } + } + VolumeEvent::VolumeUpdated { old, new } => todo!(), + VolumeEvent::VolumeSpeedTested { + id, + read_speed, + write_speed, + } => { + self.state + .write() + .await + .volumes + .get_mut(&id) + .unwrap() + .read_speed_mbps = Some(read_speed); + self.state + .write() + .await + .volumes + .get_mut(&id) + .unwrap() + .write_speed_mbps = Some(write_speed); + } + VolumeEvent::VolumeMountChanged { id, is_mounted } => todo!(), + VolumeEvent::VolumeError { id, error } => todo!(), + } } warn!("Volume event monitoring ended"); }); } /// Starts the VolumeManagerActor - pub async fn start(self) { + pub async fn start(self, device_pub_id: Vec) { info!("Volume manager actor started"); let self_arc = Arc::new(Mutex::new(self)); @@ -179,19 +208,14 @@ impl VolumeManagerActor { } }); - // Start the volume watcher - let self_arc_watcher = Arc::clone(&self_arc); - tokio::spawn(async move { - let mut actor = self_arc_watcher.lock().await; - let state = actor.state.write().await; + let event_tx = self_arc.lock().await.event_tx.clone(); - // Create and start watcher for each volume - for (volume_id, volume) in &state.volumes { - if let Some(watcher) = state.watchers.get(volume_id) { - if let Err(e) = watcher.watcher.start().await { - error!(?e, "Failed to start watcher for volume {}", volume.name); - } - } + tokio::spawn(async move { + // start one watcher + let watcher = VolumeWatcher::new(event_tx); + if let Err(e) = watcher.start(device_pub_id.clone(), self_arc.clone()).await { + error!(?e, "Failed to start watcher for volumes"); + return; } }); @@ -255,42 +279,6 @@ impl VolumeManagerActor { // Update volumes for (pub_id, volume) in updates { state.volumes.insert(pub_id.clone(), volume); - - // Create and start watcher if it doesn't exist - if !state.watchers.contains_key(&pub_id) { - let watcher = VolumeWatcher::new(self.event_tx.clone()); - if let Err(e) = watcher.start().await { - error!( - ?e, - "Failed to start watcher for volume {}", - hex::encode(&pub_id) - ); - continue; - } - - state.watchers.insert( - pub_id, - WatcherState { - watcher: Arc::new(watcher), - last_event: Instant::now(), - paused: false, - }, - ); - } - } - - // Remove any watchers for volumes that no longer exist - let stale_watchers: Vec<_> = state - .watchers - .keys() - .filter(|id| !state.volumes.contains_key(*id)) - .cloned() - .collect(); - - for volume_id in stale_watchers { - if let Some(watcher_state) = state.watchers.remove(&volume_id) { - watcher_state.watcher.stop().await; - } } } @@ -317,21 +305,6 @@ impl VolumeManagerActor { state = self.state.write().await; } - // Clean up stale watchers - let stale_watchers: Vec<_> = state - .watchers - .iter() - .filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600)) - .map(|(id, _)| id.clone()) - .collect(); - - for volume_id in stale_watchers { - if let Some(watcher_state) = state.watchers.get(&volume_id) { - watcher_state.watcher.stop().await; - } - state.watchers.remove(&volume_id); - } - Ok(()) } @@ -369,8 +342,6 @@ impl VolumeManagerActor { ack, } => todo!(), VolumeManagerMessage::UpdateVolume { volume, ack } => todo!(), - VolumeManagerMessage::WatchVolume { volume_id, ack } => todo!(), - VolumeManagerMessage::UnwatchVolume { volume_id, ack } => todo!(), VolumeManagerMessage::MountVolume { volume_id, ack } => todo!(), VolumeManagerMessage::UnmountVolume { volume_id, ack } => todo!(), VolumeManagerMessage::SpeedTest { volume_id, ack } => todo!(), @@ -387,6 +358,10 @@ impl VolumeManagerActor { Ok(self.state.read().await.volumes.values().cloned().collect()) } + pub async fn get_volumes(&self) -> Vec { + self.state.read().await.volumes.values().cloned().collect() + } + async fn handle_list_library_volumes( &self, library: Arc, diff --git a/core/src/volume/manager.rs b/core/src/volume/manager.rs index d30af8c2c..65ecd002f 100644 --- a/core/src/volume/manager.rs +++ b/core/src/volume/manager.rs @@ -17,8 +17,6 @@ use uuid::Uuid; pub struct VolumeManagerState { /// All tracked volumes pub volumes: HashMap, Volume>, - /// Active watchers - pub watchers: HashMap, WatcherState>, /// Volume manager options pub options: VolumeOptions, /// Event broadcaster @@ -36,7 +34,6 @@ impl VolumeManagerState { ) -> Result { Ok(Self { volumes: HashMap::new(), - watchers: HashMap::new(), options, event_tx, last_scan: Instant::now(), @@ -44,107 +41,60 @@ impl VolumeManagerState { } /// Scans the system for volumes and updates the state + /// This happens on startup, and during the volume manager's maintenance task pub async fn scan_volumes(&mut self, device_pub_id: Vec) -> Result<(), VolumeError> { debug!("Scanning for volumes..."); let detected_volumes = super::os::get_volumes().await?; debug!("Found {} volumes during scan", detected_volumes.len()); - let current_volumes = self.volumes.clone(); - let mut new_state = HashMap::new(); + let current_volumes = self.volumes.clone(); // Copy of current state + let mut new_state = HashMap::new(); // New state to build with detected volumes - // Process detected volumes + // Process each detected volume for mut volume in detected_volumes { - // Skip virtual volumes if configured - if !self.options.include_virtual && matches!(volume.mount_type, MountType::Virtual) { - continue; - } - - // Generate fingerprint for the new volume + // Generate a unique fingerprint to identify the volume let fingerprint = volume.generate_fingerprint(device_pub_id.clone()); - // Try to find existing volume by fingerprint - let existing_volume = current_volumes.values().find(|existing| { + // Check if this volume is already tracked in the current volumes + if let Some(existing) = current_volumes.values().find(|existing| { existing.generate_fingerprint(device_pub_id.clone()) == fingerprint - }); - - let volume_id = if let Some(existing) = existing_volume { - // Keep existing ID - existing - .pub_id - .clone() - .unwrap_or_else(|| Uuid::now_v7().as_bytes().to_vec()) - } else { - // Generate new ID only for truly new volumes - Uuid::now_v7().as_bytes().to_vec() - }; - - volume.pub_id = Some(volume_id.clone()); - - match current_volumes.get(&volume_id) { - Some(existing) if existing != &volume => { - new_state.insert(volume_id.clone(), volume.clone()); + }) { + // Compare current and detected volume properties + if existing == &volume { + // If nothing has changed, just add to the new state and skip VolumeAdded + new_state.insert(existing.pub_id.clone().unwrap(), existing.clone()); + continue; + } else { + // If properties have changed, update with the new properties and emit an update event self.emit_event(VolumeEvent::VolumeUpdated { old: existing.clone(), - new: volume, + new: volume.clone(), }) .await; } - None => { - new_state.insert(volume_id.clone(), volume.clone()); - self.emit_event(VolumeEvent::VolumeAdded(volume)).await; - } - _ => { - new_state.insert(volume_id.clone(), volume); - } + } else { + // If the volume is genuinely new, assign an ID and emit a VolumeAdded event + let volume_id = Uuid::now_v7().as_bytes().to_vec(); + volume.pub_id = Some(volume_id.clone()); + self.emit_event(VolumeEvent::VolumeAdded(volume.clone())) + .await; } + + // Insert volume into new state (whether new or updated) + new_state.insert(volume.pub_id.clone().unwrap(), volume); } - // Find removed volumes + // Identify and handle removed volumes for (id, volume) in ¤t_volumes { if !new_state.contains_key(id) { - self.watchers.remove(id); self.emit_event(VolumeEvent::VolumeRemoved(volume.clone())) .await; } } - // Update state + // Update the volume manager's state with the new volume list self.volumes = new_state; - self.last_scan = Instant::now(); - Ok(()) - } - - /// Starts watching a volume - #[instrument(skip(self))] - pub async fn watch_volume(&mut self, volume_id: Vec) -> Result<(), VolumeError> { - if self.watchers.contains_key(&volume_id) { - debug!("Already watching volume {:?}", hex::encode(&volume_id)); - return Ok(()); - } - - let watcher = Arc::new(VolumeWatcher::new(self.event_tx.clone())); - watcher.start().await?; - - self.watchers.insert( - volume_id.clone(), - WatcherState { - watcher, - last_event: Instant::now(), - paused: false, - }, - ); - - debug!("Started watching volume {}", hex::encode(&volume_id)); - Ok(()) - } - - /// Stops watching a volume - #[instrument(skip(self))] - pub async fn unwatch_volume(&mut self, volume_id: &[u8]) -> Result<(), VolumeError> { - if let Some(state) = self.watchers.remove(volume_id) { - state.watcher.stop().await; - debug!("Stopped watching volume {}", hex::encode(volume_id)); - } + self.last_scan = Instant::now(); // Update the last scan time Ok(()) } @@ -198,30 +148,6 @@ impl VolumeManagerState { Ok(()) } - /// Temporarily pauses a volume watcher - #[instrument(skip(self))] - pub async fn pause_watcher(&mut self, volume_id: &[u8]) -> Result<(), VolumeError> { - if let Some(state) = self.watchers.get_mut(volume_id) { - if !state.paused { - state.paused = true; - debug!("Paused watcher for volume {}", hex::encode(volume_id)); - } - } - Ok(()) - } - - /// Resumes a paused volume watcher - #[instrument(skip(self))] - pub async fn resume_watcher(&mut self, volume_id: &[u8]) -> Result<(), VolumeError> { - if let Some(state) = self.watchers.get_mut(volume_id) { - if state.paused { - state.paused = false; - debug!("Resumed watcher for volume {}", hex::encode(volume_id)); - } - } - Ok(()) - } - /// Helper to emit events async fn emit_event(&self, event: VolumeEvent) { if let Err(e) = self.event_tx.send(event) { @@ -236,18 +162,6 @@ impl VolumeManagerState { self.scan_volumes(device_pub_id).await?; } - // Clean up stale watchers - let stale_watchers: Vec<_> = self - .watchers - .iter() - .filter(|(_, state)| state.last_event.elapsed() > Duration::from_secs(3600)) - .map(|(id, _)| id.clone()) - .collect(); - - for volume_id in stale_watchers { - self.unwatch_volume(&volume_id).await?; - } - Ok(()) } } diff --git a/core/src/volume/mod.rs b/core/src/volume/mod.rs index 24ce520b2..bfe2e514f 100644 --- a/core/src/volume/mod.rs +++ b/core/src/volume/mod.rs @@ -35,8 +35,9 @@ pub struct VolumeManagerContext { pub async fn create_volume_manager( ctx: VolumeManagerContext, ) -> Result<(Volumes, VolumeManagerActor), VolumeError> { + let device_pub_id = ctx.device_id.clone(); let (manager, actor) = VolumeManagerActor::new(Arc::new(ctx)).await?; - actor.clone().start().await; + actor.clone().start(device_pub_id).await; Ok((manager, actor)) } @@ -44,8 +45,9 @@ pub async fn create_volume_manager_with_config( ctx: VolumeManagerContext, options: VolumeOptions, ) -> Result<(Volumes, VolumeManagerActor), VolumeError> { + let device_pub_id = ctx.device_id.clone(); let (manager, actor) = VolumeManagerActor::new_with_config(Arc::new(ctx), options).await?; - actor.clone().start().await; + actor.clone().start(device_pub_id).await; Ok((manager, actor)) } @@ -75,6 +77,8 @@ pub use os::linux::get_volumes; pub use os::macos::get_volumes; #[cfg(target_os = "windows")] pub use os::windows::get_volumes; +use sd_core_prisma_helpers::location_ids_and_path::device; +use sd_crypto::ct; use sd_prisma::prisma::PrismaClient; // Internal utilities diff --git a/core/src/volume/types.rs b/core/src/volume/types.rs index 92119c7e3..dddad8928 100644 --- a/core/src/volume/types.rs +++ b/core/src/volume/types.rs @@ -41,6 +41,8 @@ pub struct Volume { pub id: Option, /// Unique public identifier (None if not yet committed) pub pub_id: Option>, + /// Database ID of the device this volume is attached to, if any + pub device_id: Option, /// Human-readable volume name pub name: String, /// Type of mount (system, external, etc) @@ -95,6 +97,7 @@ impl From for Volume { Volume { id: Some(vol.id), pub_id: Some(vol.pub_id), + device_id: vol.device_id, name: vol.name.unwrap_or_else(|| "Unknown".to_string()), mount_type: vol .mount_type @@ -146,6 +149,7 @@ impl Volume { Self { id: None, pub_id: None, + device_id: None, name, mount_type, mount_point, @@ -175,8 +179,8 @@ impl Volume { hasher.update(mount_point.to_string_lossy().as_bytes()); } - hasher.update(self.name.as_bytes()); - hasher.update(&self.total_bytes_capacity.to_be_bytes()); + // hasher.update(self.name.as_bytes()); + // hasher.update(&self.total_bytes_capacity.to_be_bytes()); hasher.update(self.file_system.to_string().as_bytes()); hasher.finalize().as_bytes().to_vec() @@ -219,6 +223,7 @@ impl Volume { // Keep database-tracked properties and metadata id: db_volume.id, + device_id: db_volume.device_id, pub_id: db_volume.pub_id.clone(), name: db_volume.name.clone(), read_only: db_volume.read_only, diff --git a/core/src/volume/watcher.rs b/core/src/volume/watcher.rs index e94915227..b634c08d4 100644 --- a/core/src/volume/watcher.rs +++ b/core/src/volume/watcher.rs @@ -1,6 +1,12 @@ -use super::error::VolumeError; +use crate::volume::Volume; + use super::types::VolumeEvent; +use super::VolumeManagerActor; +use super::{actor, error::VolumeError}; +use sd_prisma::prisma::cloud_crdt_operation::device_pub_id; +use serde::de; use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration}; +use tokio::sync::Mutex; use tokio::{ sync::{broadcast, mpsc, RwLock}, time::{sleep, Instant}, @@ -32,7 +38,11 @@ impl VolumeWatcher { } } - pub async fn start(&self) -> Result<(), VolumeError> { + pub async fn start( + &self, + device_pub_id: Vec, + actor: Arc>, + ) -> Result<(), VolumeError> { debug!("Starting volume watcher"); let (check_tx, mut check_rx) = mpsc::channel(1); @@ -46,7 +56,6 @@ impl VolumeWatcher { tokio::spawn(async move { let mut last_check = Instant::now(); - let mut last_volumes = Vec::new(); while *running.read().await { // Wait for check trigger from OS watcher @@ -59,39 +68,58 @@ impl VolumeWatcher { match super::os::get_volumes().await { Ok(current_volumes) => { + let actor_state_volumes = actor.lock().await.get_volumes().await; + + debug!("actor_state_volumes: {:?}", actor_state_volumes); + // Need device_id for fingerprinting + let device_id = device_pub_id.clone(); // We'll need to pass this through from the actor + // Find new volumes for volume in ¤t_volumes { - if !last_volumes.iter().any(|v| v == volume) { - debug!("New volume detected: {}", volume.name); + let fingerprint = volume.generate_fingerprint(device_id.clone()); + + if !actor_state_volumes.iter().any(|v: &Volume| { + v.generate_fingerprint(device_id.clone()) == fingerprint + }) { + debug!( + "New volume detected: {} (fingerprint: {})", + volume.name, + hex::encode(&fingerprint) + ); let _ = event_tx.send(VolumeEvent::VolumeAdded(volume.clone())); - } - } - - // Find removed volumes - for volume in &last_volumes { - if !current_volumes.iter().any(|v| v == volume) { - debug!("Volume removed: {}", volume.name); - let _ = - event_tx.send(VolumeEvent::VolumeRemoved(volume.clone())); - } - } - - // Find updated volumes - for old_volume in &last_volumes { - if let Some(new_volume) = - current_volumes.iter().find(|v| *v == old_volume) - { - if new_volume != old_volume { - debug!("Volume updated: {}", new_volume.name); + } else if let Some(old_volume) = + actor_state_volumes.iter().find(|v| { + v.generate_fingerprint(device_id.clone()) == fingerprint + }) { + if old_volume != volume { + debug!( + "Volume updated: {} (fingerprint: {})", + volume.name, + hex::encode(&fingerprint) + ); let _ = event_tx.send(VolumeEvent::VolumeUpdated { old: old_volume.clone(), - new: new_volume.clone(), + new: volume.clone(), }); } } } - last_volumes = current_volumes; + // Find removed volumes + for volume in &actor_state_volumes { + let fingerprint = volume.generate_fingerprint(device_id.clone()); + if !current_volumes.iter().any(|v| { + v.generate_fingerprint(device_id.clone()) == fingerprint + }) { + debug!( + "Volume removed: {} (fingerprint: {})", + volume.name, + hex::encode(&fingerprint) + ); + let _ = + event_tx.send(VolumeEvent::VolumeRemoved(volume.clone())); + } + } } Err(e) => { warn!("Failed to get volumes during watch: {}", e); @@ -148,30 +176,43 @@ impl VolumeWatcher { let (fs_tx, fs_rx) = std::sync::mpsc::channel(); let check_tx = check_tx.clone(); - // Watch for volume mount events + // Keep stream alive in the thread std::thread::spawn(move || { let mut stream = fsevent::FsEvent::new(vec![ "/Volumes".to_string(), "/System/Volumes".to_string(), ]); - stream - .observe_async(fs_tx) - .expect("Failed to start FSEvent stream"); + match stream.observe_async(fs_tx) { + Ok(_) => { + // Block thread to keep stream alive + std::thread::park(); + } + Err(e) => { + error!("Failed to start FSEvent stream: {}", e); + } + } }); tokio::spawn(async move { while *running.read().await { - if let Ok(events) = fs_rx.try_recv() { - if events.flag.contains(StreamFlags::MOUNT) - || events.flag.contains(StreamFlags::UNMOUNT) - { - if let Err(e) = check_tx.send(()).await { - error!("Failed to trigger volume check: {}", e); + match fs_rx.recv() { + Ok(events) => { + // Only care about mount/unmount events + if events.flag.contains(StreamFlags::MOUNT) + || events.flag.contains(StreamFlags::UNMOUNT) + { + debug!("Received volume event: {:?}", events); + if let Err(e) = check_tx.send(()).await { + error!("Failed to trigger volume check: {}", e); + } } } + Err(e) => { + error!("FSEvent receive error: {}", e); + sleep(Duration::from_millis(100)).await; + } } - sleep(Duration::from_millis(100)).await; } }); } @@ -220,26 +261,26 @@ impl VolumeWatcher { } } -#[cfg(test)] -mod tests { - use super::*; - use tokio::time::timeout; +// #[cfg(test)] +// mod tests { +// use super::*; +// use tokio::time::timeout; - #[tokio::test] - async fn test_watcher() { - let (tx, mut rx) = broadcast::channel(16); - let watcher = VolumeWatcher::new(tx); +// #[tokio::test] +// async fn test_watcher() { +// let (tx, mut rx) = broadcast::channel(16); +// let watcher = VolumeWatcher::new(tx); - watcher.start().await.expect("Failed to start watcher"); +// watcher.start().await.expect("Failed to start watcher"); - // Wait for potential volume events - let result = timeout(Duration::from_secs(2), rx.recv()).await; +// // Wait for potential volume events +// let result = timeout(Duration::from_secs(2), rx.recv()).await; - // Cleanup - watcher.stop().await; +// // Cleanup +// watcher.stop().await; - if let Ok(Ok(event)) = result { - println!("Received volume event: {:?}", event); - } - } -} +// if let Ok(Ok(event)) = result { +// println!("Received volume event: {:?}", event); +// } +// } +// } diff --git a/interface/app/$libraryId/Layout/Sidebar/sections/Local/index.tsx b/interface/app/$libraryId/Layout/Sidebar/sections/Local/index.tsx index 1ad3d40c1..d9f63c302 100644 --- a/interface/app/$libraryId/Layout/Sidebar/sections/Local/index.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/sections/Local/index.tsx @@ -66,6 +66,7 @@ export default function LocalSection() { const volumeEvents = useLibrarySubscription(['volumes.events'], { onData: (data) => { console.log('Volume event received:', data); + volumesQuery.refetch(); } }); diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 51efd384c..319913fdb 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -808,6 +808,10 @@ id: number | null; * Unique public identifier (None if not yet committed) */ pub_id: number[] | null; +/** + * Database ID of the device this volume is attached to, if any + */ +device_id: number | null; /** * Human-readable volume name */