From bd0062301d49c22e55898c60cc09f4b3d606d3fc Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Sun, 3 Nov 2024 01:16:29 -0800 Subject: [PATCH] Implement volume initialization tailored for libraries, optimize volume scanning on startup, and enhance state management --- core/src/volume/actor.rs | 90 +++++++++++++++++--------------------- core/src/volume/manager.rs | 1 - core/src/volume/types.rs | 18 +++++--- 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/core/src/volume/actor.rs b/core/src/volume/actor.rs index 4c2c24f2f..354340ce3 100644 --- a/core/src/volume/actor.rs +++ b/core/src/volume/actor.rs @@ -5,9 +5,12 @@ use super::{ watcher::{VolumeWatcher, WatcherState}, VolumeManagerContext, VolumeManagerState, }; -use crate::library::{Library, LibraryManagerEvent}; +use crate::{ + library::{Library, LibraryManagerEvent}, + volume::MountType, +}; use async_channel as chan; -use sd_prisma::prisma::album::pub_id; +use sd_prisma::prisma::{album::pub_id, volume}; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{broadcast, oneshot, Mutex, RwLock}; use tokio::time::Instant; @@ -184,6 +187,9 @@ impl VolumeManagerActor { actor.ctx.library_event_tx.clone() }; + // Scan for volumes on startup + let _ = self_arc.lock().await.scan_volumes().await; + // This is a fire-and-forget subscription tokio::spawn(async move { if let Err(e) = rx @@ -194,7 +200,8 @@ impl VolumeManagerActor { LibraryManagerEvent::Load(library) => { if let Err(e) = { let mut actor = self_arc_inner.lock().await; - actor.initialize(library.clone()).await + // TODO: check if active library somehow, as we don't care to sync volumes for inactive libraries + actor.initialize_for_library(library.clone()).await } { error!(?e, "Failed to initialize volume manager for library"); } @@ -231,68 +238,49 @@ impl VolumeManagerActor { info!("Volume manager actor initialized"); } - pub async fn initialize(&mut self, library: Arc) -> Result<(), VolumeError> { + /// Syncs volume memory state with library database + pub async fn initialize_for_library( + &mut self, + library: Arc, + ) -> Result<(), VolumeError> { + use sd_prisma::prisma::device; // Use device_id from context instead of node let device_pub_id = self.ctx.device_id.clone(); + let current_volumes = self.get_volumes().await; - // Scan for system volumes first - { - let mut state = self.state.write().await; - state.scan_volumes(device_pub_id.clone()).await?; - } - - // Get volumes from library database - let db_volumes = library + let db_device = library .db - .volume() - .find_many(vec![]) + .device() + .find_unique(device::pub_id::equals(device_pub_id.clone())) .exec() .await? - .into_iter() - .map(Volume::from); + .ok_or(VolumeError::DeviceNotFound(device_pub_id.clone()))?; - // Get current volumes and clone what we need - let current_volumes = { - let state = self.state.read().await; - state.volumes.clone() - }; + let db_system_volumes = library + .db + .volume() + .find_many(vec![ + volume::device_id::equals(Some(db_device.id)), + volume::mount_type::equals(Some(MountType::System.to_string())), + ]) + .exec() + .await?; - let mut updates = Vec::new(); + let db_system_volumes = db_system_volumes.into_iter().map(Volume::from); - // Prepare updates - for db_volume in db_volumes { - let fingerprint = db_volume.generate_fingerprint(device_pub_id.clone().into()); - - if let Some(system_volume) = current_volumes - .values() - .find(|v| v.generate_fingerprint(device_pub_id.clone().into()) == fingerprint) - { - let merged = Volume::merge_with_db_volume(system_volume, &db_volume); - if let Some(pub_id) = &merged.pub_id { - updates.push((pub_id.clone(), merged.clone())); - let _ = self.event_tx.send(VolumeEvent::VolumeUpdated { - old: system_volume.clone(), - new: merged, - }); + // Register system volumes in the db + if db_system_volumes.len() == 0 { + for v in current_volumes.iter() { + if v.mount_type == MountType::System { + // Create is will always treat the volume as a new volume + // Assigning a new pub_id in the process + v.create(&library.db, device_pub_id.clone()).await?; } - } else if let Some(pub_id) = &db_volume.pub_id { - updates.push((pub_id.clone(), db_volume.clone())); - let _ = self.event_tx.send(VolumeEvent::VolumeAdded(db_volume)); - } - } - - // Apply updates and initialize watchers - { - let mut state = self.state.write().await; - - // Update volumes - for (pub_id, volume) in updates { - state.volumes.insert(pub_id.clone(), volume); } } info!( - "Volume manager initialized: {:?}", + "Volume manager initialized for library: {:?}", self.state.read().await.volumes ); diff --git a/core/src/volume/manager.rs b/core/src/volume/manager.rs index e3b2303b0..fbf4aa251 100644 --- a/core/src/volume/manager.rs +++ b/core/src/volume/manager.rs @@ -47,7 +47,6 @@ impl VolumeManagerState { let detected_volumes = super::os::get_volumes().await?; debug!("Found {} volumes during scan", detected_volumes.len()); - let current_volumes = self.volumes.clone(); // Copy of current state let mut new_state = HashMap::new(); // New state to build with detected volumes // Process each detected volume diff --git a/core/src/volume/types.rs b/core/src/volume/types.rs index dddad8928..4ff3b8b01 100644 --- a/core/src/volume/types.rs +++ b/core/src/volume/types.rs @@ -7,8 +7,8 @@ use sd_prisma::prisma::{ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use specta::Type; -use std::path::Path; use std::path::PathBuf; +use std::{path::Path, sync::Arc}; use strum_macros::Display; use uuid::Uuid; @@ -79,14 +79,22 @@ pub struct Volume { pub total_bytes_available: u64, } +// We can use this to see if a volume has changed impl PartialEq for Volume { fn eq(&self, other: &Self) -> bool { self.name == other.name && self.disk_type == other.disk_type && self.file_system == other.file_system - // Check if any mount points overlap - && (self.mount_points.iter().any(|mp| other.mount_points.contains(mp)) - || other.mount_points.iter().any(|mp| self.mount_points.contains(mp))) + && self.mount_type == other.mount_type + && self.mount_point == other.mount_point + // Check if any mount points overlap + && (self.mount_points.iter().any(|mp| other.mount_points.contains(mp)) + || other.mount_points.iter().any(|mp| self.mount_points.contains(mp))) + && self.is_mounted == other.is_mounted + && self.read_only == other.read_only + && self.error_status == other.error_status + && self.total_bytes_capacity == other.total_bytes_capacity + && self.total_bytes_available == other.total_bytes_available } } @@ -245,7 +253,7 @@ impl Volume { /// Creates a new volume record in the database pub async fn create( &self, - db: &PrismaClient, + db: &Arc, device_pub_id: Vec, ) -> Result<(), VolumeError> { let pub_id = Uuid::now_v7().as_bytes().to_vec();