Implement volume initialization tailored for libraries, optimize volume scanning on startup, and enhance state management

This commit is contained in:
Jamie Pine
2024-11-03 01:16:29 -08:00
parent 8db67dc839
commit bd0062301d
3 changed files with 52 additions and 57 deletions

View File

@@ -5,9 +5,12 @@ use super::{
watcher::{VolumeWatcher, WatcherState},
VolumeManagerContext, VolumeManagerState,
};
use crate::library::{Library, LibraryManagerEvent};
use crate::{
library::{Library, LibraryManagerEvent},
volume::MountType,
};
use async_channel as chan;
use sd_prisma::prisma::album::pub_id;
use sd_prisma::prisma::{album::pub_id, volume};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tokio::time::Instant;
@@ -184,6 +187,9 @@ impl VolumeManagerActor {
actor.ctx.library_event_tx.clone()
};
// Scan for volumes on startup
let _ = self_arc.lock().await.scan_volumes().await;
// This is a fire-and-forget subscription
tokio::spawn(async move {
if let Err(e) = rx
@@ -194,7 +200,8 @@ impl VolumeManagerActor {
LibraryManagerEvent::Load(library) => {
if let Err(e) = {
let mut actor = self_arc_inner.lock().await;
actor.initialize(library.clone()).await
// TODO: check if active library somehow, as we don't care to sync volumes for inactive libraries
actor.initialize_for_library(library.clone()).await
} {
error!(?e, "Failed to initialize volume manager for library");
}
@@ -231,68 +238,49 @@ impl VolumeManagerActor {
info!("Volume manager actor initialized");
}
pub async fn initialize(&mut self, library: Arc<Library>) -> Result<(), VolumeError> {
/// Syncs volume memory state with library database
pub async fn initialize_for_library(
&mut self,
library: Arc<Library>,
) -> Result<(), VolumeError> {
use sd_prisma::prisma::device;
// Use device_id from context instead of node
let device_pub_id = self.ctx.device_id.clone();
let current_volumes = self.get_volumes().await;
// Scan for system volumes first
{
let mut state = self.state.write().await;
state.scan_volumes(device_pub_id.clone()).await?;
}
// Get volumes from library database
let db_volumes = library
let db_device = library
.db
.volume()
.find_many(vec![])
.device()
.find_unique(device::pub_id::equals(device_pub_id.clone()))
.exec()
.await?
.into_iter()
.map(Volume::from);
.ok_or(VolumeError::DeviceNotFound(device_pub_id.clone()))?;
// Get current volumes and clone what we need
let current_volumes = {
let state = self.state.read().await;
state.volumes.clone()
};
let db_system_volumes = library
.db
.volume()
.find_many(vec![
volume::device_id::equals(Some(db_device.id)),
volume::mount_type::equals(Some(MountType::System.to_string())),
])
.exec()
.await?;
let mut updates = Vec::new();
let db_system_volumes = db_system_volumes.into_iter().map(Volume::from);
// Prepare updates
for db_volume in db_volumes {
let fingerprint = db_volume.generate_fingerprint(device_pub_id.clone().into());
if let Some(system_volume) = current_volumes
.values()
.find(|v| v.generate_fingerprint(device_pub_id.clone().into()) == fingerprint)
{
let merged = Volume::merge_with_db_volume(system_volume, &db_volume);
if let Some(pub_id) = &merged.pub_id {
updates.push((pub_id.clone(), merged.clone()));
let _ = self.event_tx.send(VolumeEvent::VolumeUpdated {
old: system_volume.clone(),
new: merged,
});
// Register system volumes in the db
if db_system_volumes.len() == 0 {
for v in current_volumes.iter() {
if v.mount_type == MountType::System {
// Create is will always treat the volume as a new volume
// Assigning a new pub_id in the process
v.create(&library.db, device_pub_id.clone()).await?;
}
} else if let Some(pub_id) = &db_volume.pub_id {
updates.push((pub_id.clone(), db_volume.clone()));
let _ = self.event_tx.send(VolumeEvent::VolumeAdded(db_volume));
}
}
// Apply updates and initialize watchers
{
let mut state = self.state.write().await;
// Update volumes
for (pub_id, volume) in updates {
state.volumes.insert(pub_id.clone(), volume);
}
}
info!(
"Volume manager initialized: {:?}",
"Volume manager initialized for library: {:?}",
self.state.read().await.volumes
);

View File

@@ -47,7 +47,6 @@ impl VolumeManagerState {
let detected_volumes = super::os::get_volumes().await?;
debug!("Found {} volumes during scan", detected_volumes.len());
let current_volumes = self.volumes.clone(); // Copy of current state
let mut new_state = HashMap::new(); // New state to build with detected volumes
// Process each detected volume

View File

@@ -7,8 +7,8 @@ use sd_prisma::prisma::{
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use specta::Type;
use std::path::Path;
use std::path::PathBuf;
use std::{path::Path, sync::Arc};
use strum_macros::Display;
use uuid::Uuid;
@@ -79,14 +79,22 @@ pub struct Volume {
pub total_bytes_available: u64,
}
// We can use this to see if a volume has changed
impl PartialEq for Volume {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.disk_type == other.disk_type
&& self.file_system == other.file_system
// Check if any mount points overlap
&& (self.mount_points.iter().any(|mp| other.mount_points.contains(mp))
|| other.mount_points.iter().any(|mp| self.mount_points.contains(mp)))
&& self.mount_type == other.mount_type
&& self.mount_point == other.mount_point
// Check if any mount points overlap
&& (self.mount_points.iter().any(|mp| other.mount_points.contains(mp))
|| other.mount_points.iter().any(|mp| self.mount_points.contains(mp)))
&& self.is_mounted == other.is_mounted
&& self.read_only == other.read_only
&& self.error_status == other.error_status
&& self.total_bytes_capacity == other.total_bytes_capacity
&& self.total_bytes_available == other.total_bytes_available
}
}
@@ -245,7 +253,7 @@ impl Volume {
/// Creates a new volume record in the database
pub async fn create(
&self,
db: &PrismaClient,
db: &Arc<PrismaClient>,
device_pub_id: Vec<u8>,
) -> Result<(), VolumeError> {
let pub_id = Uuid::now_v7().as_bytes().to_vec();