diff --git a/.cspell/project_words.txt b/.cspell/project_words.txt index d0d6976e8..b516b271b 100644 --- a/.cspell/project_words.txt +++ b/.cspell/project_words.txt @@ -52,6 +52,7 @@ poonen rauch ravikant Recents +reimplementation Renamable richelsen rspc diff --git a/Refactor Progress.md b/Refactor Progress.md new file mode 100644 index 000000000..c096a1f1a --- /dev/null +++ b/Refactor Progress.md @@ -0,0 +1,95 @@ +## Spacedrive Refactor: Finalization Plan + +**Document Version:** 1.0 +**Date:** September 13, 2025 +**Status:** In Progress + +### 1\. Introduction + +The foundational refactor of the Spacedrive `Core` engine is a success. We have established a robust, CQRS-based architecture with separate, modular `Action` and `Query` pathways, managed by the `ActionManager` and `QueryManager`. The `Core` now exposes a clean, type-safe API, and the session state has been correctly centralized within its services. + +This document outlines the remaining tasks required to fully migrate the codebase to this new architecture, clean up legacy code, and realize the full benefits of the new design. + +----- + +### 2\. Phase 1: Complete the "Active Library" Context Migration + +The primary goal of this phase is to make all operations fully session-aware, removing the need for them to manually handle a `library_id`. + +**Objective:** Eliminate the `library_id` field from all `Action` and `Query` structs, and have their handlers source this context directly from the `Core`'s session service. + +**Actionable Tasks:** + +1. **Systematically Refactor Remaining `ops` Modules:** + + * Iterate through the following modules and remove the `library_id` field from every `Action` and `Query` struct within them: + * `ops::location::*` (All location operations) + * `ops::object::copy`, `ops::object::move` + * `ops::tag::*` (All tagging operations) + * `ops::job::*` (All job-related operations) + +2. **Update Handlers to be Session-Aware:** + + * Modify the corresponding `ActionHandler` and `QueryHandler` implementations for the structs above. + * Inside the `validate` or `execute` methods, retrieve the active library ID by calling a method on the core session service, for example: `core.session().get_active_library_id()?`. + +----- + +### 3\. Phase 2: Enhance and Finalize Client Applications + +With a fully context-aware `Core` API, the clients can be simplified and made more powerful. + +**Objective:** Implement the "active library" override mechanism, and reduce boilerplate code in the CLI and GraphQL layers. + +**Actionable Tasks:** + +1. **Implement CLI `--library` Flag:** + + * Add a global `--library ` flag to the CLI using `clap`. + * In the CLI's main execution logic, check for this flag. If present, call a new method on the `Core`'s session service (e.g., `core.session().set_override_library_id(id)`) before executing the command. This will set the context for the duration of that single operation. + +2. **Reduce Client Boilerplate with `From` Trait:** + + * For each `Action` and `Query` struct, implement the `From` trait, where `T` is the corresponding CLI or GraphQL argument struct. + * **Example (`core/src/ops/location/add.rs`):** + ```rust + // Teach the Action how to be created from CLI args + impl From for AddLocationAction { + fn from(args: cli::AddLocationArgs) -> Self { + Self { path: args.path, name: args.name } + } + } + ``` + * Refactor the client code to use `.into()` for clean, one-line conversions, eliminating manual field mapping. + ```rust + // In the CLI command runner + core.execute_action(args.into()).await?; + ``` + +3. **Standardize Client Logic with Macros (Optional):** + + * For highly repetitive client patterns (like a simple CLI command that just creates an action, executes it, and prints the result), consider creating `macro_rules!` to generate the boilerplate code automatically. + +----- + +### 4\. Phase 3: Finalize and Deprecate Legacy Code + +The final step is to remove all traces of the old, tightly-coupled architecture. + +**Objective:** Ensure the old `DaemonCommand` and its related infrastructure are completely removed from the codebase. + +**Actionable Tasks:** + +1. **Delete `DaemonCommand` Enum:** + + * Once all CLI commands have been migrated to the CQRS pattern, the old `DaemonCommand` enum can be safely deleted. + +2. **Clean Up the Daemon:** + + * Remove all logic from the daemon that was responsible for matching on `DaemonCommand` variants. Its role should now be purely to deserialize and route the new `Action`/`Query` structs. + +3. **Code-wide Audit:** + + * Perform a full-text search for `DaemonCommand` and any related types to ensure no remnants are left in the CLI, daemon, or `Core` tests. + +By completing these three phases, the refactor will be complete. The result will be a clean, scalable, and highly maintainable architecture that will serve as a robust foundation for the future of Spacedrive. diff --git a/apps/cli/src/domains/library.rs b/apps/cli/src/domains/library.rs index f66e355c1..b9b59a70d 100644 --- a/apps/cli/src/domains/library.rs +++ b/apps/cli/src/domains/library.rs @@ -21,8 +21,43 @@ pub async fn run(ctx: &Context, cmd: LibraryCmd) -> Result<()> { LibraryCmd::Create { name } => { let input = create::LibraryCreateInput::new(name); let bytes = ctx.core.action(&input).await?; - let out: create::LibraryCreateOutput = bincode::deserialize(&bytes)?; - println!("Created library {} with ID {} at {}", out.name, out.library_id, out.path.display()); + eprintln!("DEBUG: Received {} bytes from daemon", bytes.len()); + // Try to deserialize and provide detailed error information + match bincode::deserialize::(&bytes) { + Ok(out) => { + println!( + "Created library {} with ID {} at {}", + out.name, out.library_id, out.path + ); + } + Err(e) => { + eprintln!("DEBUG: Bincode deserialization failed: {}", e); + eprintln!("DEBUG: Raw bytes length: {}", bytes.len()); + + // Try to decode as much as possible manually to debug + if bytes.len() >= 16 { + let uuid_bytes = &bytes[0..16]; + eprintln!("DEBUG: UUID bytes: {:?}", uuid_bytes); + + if bytes.len() > 16 { + let name_len = bytes[16] as usize; + eprintln!("DEBUG: Name length: {}", name_len); + + if bytes.len() > 17 + name_len { + let name_bytes = &bytes[17..17 + name_len]; + if let Ok(name) = std::str::from_utf8(name_bytes) { + eprintln!("DEBUG: Library name: {}", name); + } + } + } + } + + return Err(anyhow::anyhow!( + "Failed to deserialize library creation response: {}", + e + )); + } + } } LibraryCmd::Switch { id } => { let input = set_current::SetCurrentLibraryInput { library_id: id }; diff --git a/apps/cli/src/domains/location.rs b/apps/cli/src/domains/location.rs index 427ba0226..0461e4047 100644 --- a/apps/cli/src/domains/location.rs +++ b/apps/cli/src/domains/location.rs @@ -4,54 +4,94 @@ use uuid::Uuid; use crate::context::Context; use crate::domains::index::IndexModeArg; +use sd_core::ops::{ + libraries::list::{output::LibraryInfo, query::ListLibrariesQuery}, + locations::add::action::LocationAddInput, +}; #[derive(Subcommand, Debug)] pub enum LocationCmd { - Add { path: std::path::PathBuf, #[arg(long)] name: Option, #[arg(long, value_enum, default_value = "content")] mode: IndexModeArg }, + Add { + path: std::path::PathBuf, + #[arg(long)] + name: Option, + #[arg(long, value_enum, default_value = "content")] + mode: IndexModeArg, + }, List, - Remove { location_id: Uuid }, - Rescan { location_id: Uuid, #[arg(long, default_value_t = false)] force: bool }, + Remove { + location_id: Uuid, + }, + Rescan { + location_id: Uuid, + #[arg(long, default_value_t = false)] + force: bool, + }, } pub async fn run(ctx: &Context, cmd: LocationCmd) -> Result<()> { match cmd { LocationCmd::Add { path, name, mode } => { - let libs: Vec = ctx + let result_bytes = ctx .core - .query(&sd_core::ops::libraries::list::query::ListLibrariesQuery::basic()) + .action(&LocationAddInput { + path, + name, + mode: sd_core::ops::indexing::job::IndexMode::from(mode), + }) .await?; - let library_id = if libs.len() == 1 { libs[0].id } else { anyhow::bail!("Specify --library to add locations when multiple libraries exist") }; - - let result_bytes = ctx.core.action(&sd_core::ops::locations::add::action::LocationAddInput { library_id, path, name, mode: sd_core::ops::indexing::job::IndexMode::from(mode) }).await?; - let out: sd_core::ops::locations::add::output::LocationAddOutput = bincode::deserialize(&result_bytes)?; - println!("Added location {} -> {}", out.location_id, out.path.display()); + let out: sd_core::ops::locations::add::output::LocationAddOutput = + bincode::deserialize(&result_bytes)?; + println!( + "Added location {} -> {}", + out.location_id, + out.path.display() + ); } LocationCmd::List => { - let libs: Vec = ctx + let out: sd_core::ops::locations::list::output::LocationsListOutput = ctx .core - .query(&sd_core::ops::libraries::list::query::ListLibrariesQuery::basic()) + .query(&sd_core::ops::locations::list::query::LocationsListQuery { library_id }) .await?; - let library_id = if libs.len() == 1 { libs[0].id } else { anyhow::bail!("Specify --library to list locations when multiple libraries exist") }; - let out: sd_core::ops::locations::list::output::LocationsListOutput = ctx.core.query(&sd_core::ops::locations::list::query::LocationsListQuery { library_id }).await?; - for loc in out.locations { println!("- {} {}", loc.id, loc.path.display()); } + for loc in out.locations { + println!("- {} {}", loc.id, loc.path.display()); + } } LocationCmd::Remove { location_id } => { - let libs: Vec = ctx - .core - .query(&sd_core::ops::libraries::list::query::ListLibrariesQuery::basic()) - .await?; - let library_id = if libs.len() == 1 { libs[0].id } else { anyhow::bail!("Specify --library to remove locations when multiple libraries exist") }; + let libs: Vec = ctx.core.query(&ListLibrariesQuery::basic()).await?; + let library_id = if libs.len() == 1 { + libs[0].id + } else { + anyhow::bail!("Specify --library to remove locations when multiple libraries exist") + }; - let result_bytes = ctx.core.action(&sd_core::ops::locations::remove::action::LocationRemoveInput { library_id, location_id }).await?; - let _out: sd_core::ops::locations::remove::output::LocationRemoveOutput = bincode::deserialize(&result_bytes)?; + let result_bytes = ctx + .core + .action( + &sd_core::ops::locations::remove::action::LocationRemoveInput { + library_id, + location_id, + }, + ) + .await?; + let _out: sd_core::ops::locations::remove::output::LocationRemoveOutput = + bincode::deserialize(&result_bytes)?; println!("Removed location {}", location_id); } LocationCmd::Rescan { location_id, force } => { - let result_bytes = ctx.core.action(&sd_core::ops::locations::rescan::action::LocationRescanInput { location_id, full_rescan: force }).await?; - let _out: sd_core::ops::locations::rescan::output::LocationRescanOutput = bincode::deserialize(&result_bytes)?; + let result_bytes = ctx + .core + .action( + &sd_core::ops::locations::rescan::action::LocationRescanInput { + location_id, + full_rescan: force, + }, + ) + .await?; + let _out: sd_core::ops::locations::rescan::output::LocationRescanOutput = + bincode::deserialize(&result_bytes)?; println!("Rescan requested for {}", location_id); } } Ok(()) } - diff --git a/apps/cli/src/main.rs b/apps/cli/src/main.rs index 644826e6b..f2ead68b2 100644 --- a/apps/cli/src/main.rs +++ b/apps/cli/src/main.rs @@ -69,7 +69,6 @@ enum Commands { Job(JobCmd), } - #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); @@ -83,7 +82,9 @@ async fn main() -> Result<()> { } let socket_path = if let Some(inst) = &instance { - data_dir.join("daemon").join(format!("daemon-{}.sock", inst)) + data_dir + .join("daemon") + .join(format!("daemon-{}.sock", inst)) } else { data_dir.join("daemon/daemon.sock") }; @@ -91,12 +92,65 @@ async fn main() -> Result<()> { match cli.command { Commands::Start { enable_networking } => { println!("Starting daemon..."); - sd_core::infra::daemon::bootstrap::start_default_server(socket_path, data_dir, enable_networking).await.map_err(|e| anyhow::anyhow!(e.to_string()))?; + + // Check if daemon is already running + let client = CoreClient::new(socket_path.clone()); + match client + .send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Ping) + .await + { + Ok(sd_core::infra::daemon::types::DaemonResponse::Pong) => { + println!("Daemon is already running"); + return Ok(()); + } + _ => {} // Daemon not running, continue + } + + // Start daemon in background using std::process::Command + let current_exe = std::env::current_exe()?; + let daemon_path = current_exe.parent().unwrap().join("daemon"); + let mut command = std::process::Command::new(daemon_path); + + // Pass networking flag if enabled (if daemon supports it) + if enable_networking { + println!("Note: Networking flag passed but daemon may not support it yet"); + } + + // Set working directory to current directory + command.current_dir(std::env::current_dir()?); + + match command.spawn() { + Ok(child) => { + println!("Daemon started (PID: {})", child.id()); + + // Wait a moment for daemon to start up + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Verify daemon is responding + match client + .send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Ping) + .await + { + Ok(sd_core::infra::daemon::types::DaemonResponse::Pong) => { + println!("Daemon is ready and responding"); + } + _ => { + println!("Warning: Daemon may not be fully initialized yet"); + } + } + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to start daemon: {}", e)); + } + } } Commands::Stop => { println!("Stopping daemon..."); - let core = CoreClient::new(socket_path.clone()); - let _ = core.send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Shutdown).await.map_err(|e| anyhow::anyhow!(e.to_string()))?; + let core = CoreClient::new(socket_path.clone()); + let _ = core + .send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Shutdown) + .await + .map_err(|e| anyhow::anyhow!(e.to_string()))?; println!("Daemon stopped."); } _ => { @@ -107,7 +161,12 @@ async fn main() -> Result<()> { Ok(()) } -async fn run_client_command(command: Commands, format: OutputFormat, data_dir: std::path::PathBuf, socket_path: std::path::PathBuf) -> Result<()> { +async fn run_client_command( + command: Commands, + format: OutputFormat, + data_dir: std::path::PathBuf, + socket_path: std::path::PathBuf, +) -> Result<()> { let core = CoreClient::new(socket_path.clone()); let ctx = Context::new(core, format, data_dir, socket_path); match command { diff --git a/core/benchmarks/src/scenarios/content_identification.rs b/core/benchmarks/src/scenarios/content_identification.rs index 0e168eeaf..432a19ad6 100644 --- a/core/benchmarks/src/scenarios/content_identification.rs +++ b/core/benchmarks/src/scenarios/content_identification.rs @@ -27,7 +27,7 @@ impl Scenario for ContentIdentificationScenario { use sd_core_new::infrastructure::actions::handler::ActionHandler; let core = &boot.core; let context = core.context.clone(); - let library = match core.libraries.get_primary_library().await { + let library = match core.libraries.get_active_library().await { Some(lib) => lib, None => core.libraries.create_library("Benchmarks", None, context.clone()).await?, }; diff --git a/core/benchmarks/src/scenarios/core_indexing.rs b/core/benchmarks/src/scenarios/core_indexing.rs index 56533be06..a21e10368 100644 --- a/core/benchmarks/src/scenarios/core_indexing.rs +++ b/core/benchmarks/src/scenarios/core_indexing.rs @@ -27,7 +27,7 @@ impl Scenario for CoreIndexingScenario { use sd_core_new::infrastructure::actions::handler::ActionHandler; let core = &boot.core; let context = core.context.clone(); - let library = match core.libraries.get_primary_library().await { + let library = match core.libraries.get_active_library().await { Some(lib) => lib, None => core.libraries.create_library("Benchmarks", None, context.clone()).await?, }; diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 94405d3c9..ce48feb57 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -6,7 +6,7 @@ use std::marker::PhantomData; use std::path::PathBuf; use crate::infra::daemon::client::DaemonClient; -use crate::infra::daemon::types::{DaemonRequest, DaemonResponse}; +use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse}; pub trait Wire { const METHOD: &'static str; @@ -38,7 +38,7 @@ impl CoreClient { match resp { Ok(r) => match r { DaemonResponse::Ok(bytes) => Ok(bytes), - DaemonResponse::Error(e) => Err(anyhow::anyhow!(e)), + DaemonResponse::Error(e) => Err(anyhow::anyhow!(e.to_string())), other => Err(anyhow::anyhow!(format!("unexpected response: {:?}", other))), }, Err(e) => Err(anyhow::anyhow!(e.to_string())), @@ -61,7 +61,7 @@ impl CoreClient { match resp { Ok(r) => match r { DaemonResponse::Ok(bytes) => Ok(decode_from_slice(&bytes, standard())?.0), - DaemonResponse::Error(e) => Err(anyhow::anyhow!(e)), + DaemonResponse::Error(e) => Err(anyhow::anyhow!(e.to_string())), other => Err(anyhow::anyhow!(format!("unexpected response: {:?}", other))), }, Err(e) => Err(anyhow::anyhow!(e.to_string())), @@ -69,6 +69,9 @@ impl CoreClient { } pub async fn send_raw_request(&self, req: &DaemonRequest) -> Result { - self.daemon.send(req).await.map_err(|e| anyhow::anyhow!(e.to_string())) + self.daemon + .send(req) + .await + .map_err(|e| anyhow::anyhow!(e.to_string())) } } diff --git a/core/src/common/utils.rs b/core/src/common/utils.rs index f294d6bc6..ff900fedb 100644 --- a/core/src/common/utils.rs +++ b/core/src/common/utils.rs @@ -1,24 +1,5 @@ //! Shared utility functions - -use std::sync::RwLock; -use uuid::Uuid; - -/// Global reference to current device ID -/// This is set during Core initialization -pub static CURRENT_DEVICE_ID: once_cell::sync::Lazy> = - once_cell::sync::Lazy::new(|| RwLock::new(Uuid::nil())); - -/// Initialize the current device ID -pub fn set_current_device_id(id: Uuid) { - if let Ok(mut device_id) = CURRENT_DEVICE_ID.write() { - *device_id = id; - } -} - -/// Get the current device ID -pub fn get_current_device_id() -> Uuid { - match CURRENT_DEVICE_ID.read() { - Ok(guard) => *guard, - Err(_) => Uuid::nil(), - } -} \ No newline at end of file +// +// Note: Device ID management has been moved to device::manager for better +// module organization. Import from there instead: +// use crate::device::manager::{get_current_device_id, set_current_device_id}; diff --git a/core/src/context.rs b/core/src/context.rs index 0379ed728..7910ba304 100644 --- a/core/src/context.rs +++ b/core/src/context.rs @@ -1,20 +1,19 @@ //! Shared context providing access to core application components. -//! Shared context providing access to core application components. - -use crate::{config::JobLoggingConfig, device::DeviceManager, infra::action::manager::ActionManager, - infra::event::EventBus, crypto::library_key_manager::LibraryKeyManager, library::LibraryManager, - service::network::NetworkingService, volume::VolumeManager, infra::daemon::state::SessionStateService, +use crate::{ + config::JobLoggingConfig, crypto::library_key_manager::LibraryKeyManager, + device::DeviceManager, infra::action::manager::ActionManager, infra::event::EventBus, + library::LibraryManager, service::network::NetworkingService, + service::session::SessionStateService, volume::VolumeManager, }; use std::{path::PathBuf, sync::Arc}; use tokio::sync::RwLock; -/// Shared context providing access to core application components. #[derive(Clone)] pub struct CoreContext { pub events: Arc, pub device_manager: Arc, - pub library_manager: Arc, + pub library_manager: Arc>>>, pub volume_manager: Arc, pub library_key_manager: Arc, // This is wrapped in an RwLock to allow it to be set after initialization @@ -23,7 +22,7 @@ pub struct CoreContext { // Job logging configuration pub job_logging_config: Option, pub job_logs_dir: Option, - pub session_state: Arc, + pub session_state: Arc, } impl CoreContext { @@ -31,25 +30,45 @@ impl CoreContext { pub fn new( events: Arc, device_manager: Arc, - library_manager: Arc, + library_manager: Option>, volume_manager: Arc, library_key_manager: Arc, - session_state: Arc, + session_state: Arc, ) -> Self { Self { events, device_manager, - library_manager, + library_manager: Arc::new(RwLock::new(library_manager)), volume_manager, library_key_manager, action_manager: Arc::new(RwLock::new(None)), networking: Arc::new(RwLock::new(None)), job_logging_config: None, job_logs_dir: None, - session_state, + session_state, } } + /// Get the library manager + pub async fn libraries(&self) -> Arc { + self.library_manager.read().await.clone().unwrap() + } + + /// Get a library by ID + pub async fn get_library(&self, id: uuid::Uuid) -> Option> { + self.libraries().await.get_library(id).await + } + + /// Get the primary library + pub async fn get_primary_library(&self) -> Option> { + self.libraries().await.get_active_library().await + } + + /// Method for Core to set library manager after it's initialized + pub async fn set_libraries(&self, library_manager: Arc) { + *self.library_manager.write().await = Some(library_manager); + } + /// Set job logging configuration pub fn set_job_logging(&mut self, config: JobLoggingConfig, logs_dir: PathBuf) { self.job_logging_config = Some(config); diff --git a/core/src/device/id.rs b/core/src/device/id.rs new file mode 100644 index 000000000..c8b3a3e50 --- /dev/null +++ b/core/src/device/id.rs @@ -0,0 +1,88 @@ +//! Global Device ID Management +//! +//! This module manages the global device ID that is accessible throughout the application. +//! The device ID is stored globally for performance and convenience reasons. +//! +//! ## Why Global? +//! - **Performance**: Device ID is accessed frequently across the codebase. Global access +//! avoids the overhead of Arc/RwLock on every access. +//! - **Convenience**: No need to pass CoreContext everywhere just to get the device ID. +//! - **Simplicity**: Device ID is immutable once set and doesn't need complex lifecycle management. +//! - **Thread Safety**: Works seamlessly in both sync and async contexts. +//! +//! ## Why Not Part of DeviceManager? +//! While the DeviceManager handles device configuration and lifecycle, the global device ID +//! serves a different purpose - it's a runtime cache for quick access. The DeviceManager +//! deals with device state (keys, config, etc.) while this is just the device identifier. +//! +//! ## Architectural Trade-offs +//! **Pros:** +//! - Fast access without context passing +//! - Simple API (`get_current_device_id()`) +//! - No error handling needed +//! - Works anywhere in the codebase +//! +//! **Cons:** +//! - Not "pure" dependency injection +//! - Global mutable state (though read-only after initialization) +//! - Harder to test in isolation +//! +//! ## Module Organization +//! Originally in `common/utils.rs`, moved to `device/id.rs` for better module organization. +//! Device ID management belongs with other device-related code, even though it's +//! implemented as global functions rather than DeviceManager methods. +//! +//! ## Usage Pattern +//! ```rust +//! // Set once during initialization +//! set_current_device_id(device_manager.device_id()?); +//! +//! // Use anywhere in the codebase +//! let device_id = get_current_device_id(); +//! ``` + +use once_cell::sync::Lazy; +use std::sync::RwLock; +use uuid::Uuid; + +/// Global reference to current device ID +/// This is set during Core initialization +pub static CURRENT_DEVICE_ID: Lazy> = Lazy::new(|| RwLock::new(Uuid::nil())); + +/// Initialize the current device ID +pub fn set_current_device_id(id: Uuid) { + if let Ok(mut device_id) = CURRENT_DEVICE_ID.write() { + *device_id = id; + } +} + +/// Get the current device ID +pub fn get_current_device_id() -> Uuid { + match CURRENT_DEVICE_ID.read() { + Ok(guard) => *guard, + Err(_) => Uuid::nil(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_device_id_management() { + let test_id = Uuid::new_v4(); + + // Initially should be nil + assert!(get_current_device_id().is_nil()); + + // Set device ID + set_current_device_id(test_id); + + // Should return the set ID + assert_eq!(get_current_device_id(), test_id); + + // Reset for other tests + set_current_device_id(Uuid::nil()); + assert!(get_current_device_id().is_nil()); + } +} diff --git a/core/src/device/manager.rs b/core/src/device/manager.rs index 0a4d158de..6e2fcef46 100644 --- a/core/src/device/manager.rs +++ b/core/src/device/manager.rs @@ -1,7 +1,7 @@ //! Device manager for handling device lifecycle use super::config::DeviceConfig; -use crate::crypto::device_key_manager::{DeviceKeyManager, DeviceKeyError}; +use crate::crypto::device_key_manager::{DeviceKeyError, DeviceKeyManager}; use crate::domain::device::{Device, OperatingSystem}; use std::path::PathBuf; use std::sync::{Arc, RwLock}; @@ -11,257 +11,255 @@ use uuid::Uuid; /// Errors that can occur during device management #[derive(Error, Debug)] pub enum DeviceError { - #[error("Device not initialized")] - NotInitialized, - - #[error("Config path not found")] - ConfigPathNotFound, - - #[error("Unsupported platform")] - UnsupportedPlatform, - - #[error("IO error: {0}")] - Io(#[from] std::io::Error), - - #[error("Serialization error: {0}")] - Serialization(#[from] serde_json::Error), - - #[error("Lock poisoned")] - LockPoisoned, - - #[error("Master key error: {0}")] - MasterKey(#[from] DeviceKeyError), + #[error("Device not initialized")] + NotInitialized, + + #[error("Config path not found")] + ConfigPathNotFound, + + #[error("Unsupported platform")] + UnsupportedPlatform, + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("Lock poisoned")] + LockPoisoned, + + #[error("Master key error: {0}")] + MasterKey(#[from] DeviceKeyError), } /// Manages the current device state pub struct DeviceManager { - /// Current device configuration - config: Arc>, - /// Master encryption key manager - device_key_manager: DeviceKeyManager, - /// Custom data directory (if any) - data_dir: Option, + /// Current device configuration + config: Arc>, + /// Master encryption key manager + device_key_manager: DeviceKeyManager, + /// Custom data directory (if any) + data_dir: Option, } impl DeviceManager { - /// Initialize the device manager - /// - /// This will either load existing device configuration or create a new one - pub fn init() -> Result { - let config = match DeviceConfig::load() { - Ok(config) => config, - Err(DeviceError::NotInitialized) => { - // Create new device configuration - let os = detect_os(); - let name = get_device_name(); - let mut config = DeviceConfig::new(name, os); - - // Try to detect hardware model - config.hardware_model = detect_hardware_model(); - - // Save the new configuration - config.save()?; - config - } - Err(e) => return Err(e), - }; - - let device_key_manager = DeviceKeyManager::new()?; - // Initialize master key on first run - device_key_manager.get_or_create_master_key()?; - - Ok(Self { - config: Arc::new(RwLock::new(config)), - device_key_manager, - data_dir: None, - }) - } - - /// Initialize the device manager with a custom data directory - pub fn init_with_path(data_dir: &PathBuf) -> Result { - let config = match DeviceConfig::load_from(data_dir) { - Ok(config) => config, - Err(DeviceError::NotInitialized) => { - // Create new device configuration - let os = detect_os(); - let name = get_device_name(); - let mut config = DeviceConfig::new(name, os); - - // Try to detect hardware model - config.hardware_model = detect_hardware_model(); - - // Save the new configuration - config.save_to(data_dir)?; - config - } - Err(e) => return Err(e), - }; - - // Use fallback file for master key when using custom data directory - let master_key_path = data_dir.join("master_key"); - let device_key_manager = DeviceKeyManager::new_with_fallback(master_key_path)?; - // Initialize master key on first run - device_key_manager.get_or_create_master_key()?; - - Ok(Self { - config: Arc::new(RwLock::new(config)), - device_key_manager, - data_dir: Some(data_dir.clone()), - }) - } - - /// Get the current device ID - pub fn device_id(&self) -> Result { - self.config - .read() - .map(|c| c.id) - .map_err(|_| DeviceError::LockPoisoned) - } - - /// Get the current device as a domain Device object - pub async fn current_device(&self) -> Device { - let config = self.config.read().unwrap(); - Device { - id: config.id, - name: config.name.clone(), - os: parse_os(&config.os), - hardware_model: config.hardware_model.clone(), - network_addresses: vec![], - is_online: true, - sync_leadership: std::collections::HashMap::new(), - last_seen_at: chrono::Utc::now(), - created_at: chrono::Utc::now(), - updated_at: chrono::Utc::now(), - } - } - - /// Get the current device configuration - pub fn config(&self) -> Result { - self.config - .read() - .map(|c| c.clone()) - .map_err(|_| DeviceError::LockPoisoned) - } - - /// Create a Device domain object from current configuration - pub fn to_device(&self) -> Result { - let config = self.config()?; - - // Create device with loaded configuration - let mut device = Device::new(config.name.clone()); - device.id = config.id; - device.os = parse_os(&config.os); - device.hardware_model = config.hardware_model.clone(); - device.created_at = config.created_at; - - Ok(device) - } - - /// Update device name - pub fn set_name(&self, name: String) -> Result<(), DeviceError> { - let mut config = self.config - .write() - .map_err(|_| DeviceError::LockPoisoned)?; - - config.name = name; - - // Save to the appropriate location based on whether we have a custom data dir - if let Some(data_dir) = &self.data_dir { - config.save_to(data_dir)?; - } else { - config.save()?; - } - - Ok(()) - } - - /// Get the master encryption key - pub fn master_key(&self) -> Result<[u8; 32], DeviceError> { - Ok(self.device_key_manager.get_master_key()?) - } - - /// Get the master encryption key as hex string - pub fn master_key_hex(&self) -> Result { - Ok(self.device_key_manager.get_master_key_hex()?) - } - - /// Regenerate the master encryption key (dangerous operation) - pub fn regenerate_device_key(&self) -> Result<[u8; 32], DeviceError> { - Ok(self.device_key_manager.regenerate_master_key()?) - } + /// Initialize the device manager + /// + /// This will either load existing device configuration or create a new one + pub fn init() -> Result { + let config = match DeviceConfig::load() { + Ok(config) => config, + Err(DeviceError::NotInitialized) => { + // Create new device configuration + let os = detect_os(); + let name = get_device_name(); + let mut config = DeviceConfig::new(name, os); + + // Try to detect hardware model + config.hardware_model = detect_hardware_model(); + + // Save the new configuration + config.save()?; + config + } + Err(e) => return Err(e), + }; + + let device_key_manager = DeviceKeyManager::new()?; + // Initialize master key on first run + device_key_manager.get_or_create_master_key()?; + + Ok(Self { + config: Arc::new(RwLock::new(config)), + device_key_manager, + data_dir: None, + }) + } + + /// Initialize the device manager with a custom data directory + pub fn init_with_path(data_dir: &PathBuf) -> Result { + let config = match DeviceConfig::load_from(data_dir) { + Ok(config) => config, + Err(DeviceError::NotInitialized) => { + // Create new device configuration + let os = detect_os(); + let name = get_device_name(); + let mut config = DeviceConfig::new(name, os); + + // Try to detect hardware model + config.hardware_model = detect_hardware_model(); + + // Save the new configuration + config.save_to(data_dir)?; + config + } + Err(e) => return Err(e), + }; + + // Use fallback file for master key when using custom data directory + let master_key_path = data_dir.join("master_key"); + let device_key_manager = DeviceKeyManager::new_with_fallback(master_key_path)?; + // Initialize master key on first run + device_key_manager.get_or_create_master_key()?; + + Ok(Self { + config: Arc::new(RwLock::new(config)), + device_key_manager, + data_dir: Some(data_dir.clone()), + }) + } + + /// Get the current device ID + pub fn device_id(&self) -> Result { + self.config + .read() + .map(|c| c.id) + .map_err(|_| DeviceError::LockPoisoned) + } + + /// Get the current device as a domain Device object + pub async fn current_device(&self) -> Device { + let config = self.config.read().unwrap(); + Device { + id: config.id, + name: config.name.clone(), + os: parse_os(&config.os), + hardware_model: config.hardware_model.clone(), + network_addresses: vec![], + is_online: true, + sync_leadership: std::collections::HashMap::new(), + last_seen_at: chrono::Utc::now(), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + } + } + + /// Get the current device configuration + pub fn config(&self) -> Result { + self.config + .read() + .map(|c| c.clone()) + .map_err(|_| DeviceError::LockPoisoned) + } + + /// Create a Device domain object from current configuration + pub fn to_device(&self) -> Result { + let config = self.config()?; + + // Create device with loaded configuration + let mut device = Device::new(config.name.clone()); + device.id = config.id; + device.os = parse_os(&config.os); + device.hardware_model = config.hardware_model.clone(); + device.created_at = config.created_at; + + Ok(device) + } + + /// Update device name + pub fn set_name(&self, name: String) -> Result<(), DeviceError> { + let mut config = self.config.write().map_err(|_| DeviceError::LockPoisoned)?; + + config.name = name; + + // Save to the appropriate location based on whether we have a custom data dir + if let Some(data_dir) = &self.data_dir { + config.save_to(data_dir)?; + } else { + config.save()?; + } + + Ok(()) + } + + /// Get the master encryption key + pub fn master_key(&self) -> Result<[u8; 32], DeviceError> { + Ok(self.device_key_manager.get_master_key()?) + } + + /// Get the master encryption key as hex string + pub fn master_key_hex(&self) -> Result { + Ok(self.device_key_manager.get_master_key_hex()?) + } + + /// Regenerate the master encryption key (dangerous operation) + pub fn regenerate_device_key(&self) -> Result<[u8; 32], DeviceError> { + Ok(self.device_key_manager.regenerate_master_key()?) + } } /// Get the device name from the system fn get_device_name() -> String { - whoami::devicename() + whoami::devicename() } /// Detect the operating system fn detect_os() -> String { - if cfg!(target_os = "macos") { - "macOS".to_string() - } else if cfg!(target_os = "windows") { - "Windows".to_string() - } else if cfg!(target_os = "linux") { - "Linux".to_string() - } else if cfg!(target_os = "ios") { - "iOS".to_string() - } else if cfg!(target_os = "android") { - "Android".to_string() - } else { - "Unknown".to_string() - } + if cfg!(target_os = "macos") { + "macOS".to_string() + } else if cfg!(target_os = "windows") { + "Windows".to_string() + } else if cfg!(target_os = "linux") { + "Linux".to_string() + } else if cfg!(target_os = "ios") { + "iOS".to_string() + } else if cfg!(target_os = "android") { + "Android".to_string() + } else { + "Unknown".to_string() + } } /// Parse OS string back to enum fn parse_os(os: &str) -> OperatingSystem { - match os { - "macOS" => OperatingSystem::MacOS, - "Windows" => OperatingSystem::Windows, - "Linux" => OperatingSystem::Linux, - "iOS" => OperatingSystem::IOs, - "Android" => OperatingSystem::Android, - _ => OperatingSystem::Other, - } + match os { + "macOS" => OperatingSystem::MacOS, + "Windows" => OperatingSystem::Windows, + "Linux" => OperatingSystem::Linux, + "iOS" => OperatingSystem::IOs, + "Android" => OperatingSystem::Android, + _ => OperatingSystem::Other, + } } /// Try to detect hardware model fn detect_hardware_model() -> Option { - #[cfg(target_os = "macos")] - { - // Try to get model from system_profiler - use std::process::Command; - - let output = Command::new("system_profiler") - .args(&["SPHardwareDataType", "-json"]) - .output() - .ok()?; - - if output.status.success() { - let json_str = String::from_utf8_lossy(&output.stdout); - // Simple extraction - in production would use proper JSON parsing - if let Some(start) = json_str.find("\"machine_model\":") { - let substr = &json_str[start + 17..]; - if let Some(end) = substr.find('"') { - return Some(substr[..end].to_string()); - } - } - } - } - - None + #[cfg(target_os = "macos")] + { + // Try to get model from system_profiler + use std::process::Command; + + let output = Command::new("system_profiler") + .args(&["SPHardwareDataType", "-json"]) + .output() + .ok()?; + + if output.status.success() { + let json_str = String::from_utf8_lossy(&output.stdout); + // Simple extraction - in production would use proper JSON parsing + if let Some(start) = json_str.find("\"machine_model\":") { + let substr = &json_str[start + 17..]; + if let Some(end) = substr.find('"') { + return Some(substr[..end].to_string()); + } + } + } + } + + None } #[cfg(test)] mod tests { - use super::*; - use tempfile::TempDir; - - #[test] - fn test_device_config() { - let config = DeviceConfig::new("Test Device".to_string(), "Linux".to_string()); - assert_eq!(config.name, "Test Device"); - assert_eq!(config.os, "Linux"); - assert!(config.hardware_model.is_none()); - } -} \ No newline at end of file + use super::*; + use tempfile::TempDir; + + #[test] + fn test_device_config() { + let config = DeviceConfig::new("Test Device".to_string(), "Linux".to_string()); + assert_eq!(config.name, "Test Device"); + assert_eq!(config.os, "Linux"); + assert!(config.hardware_model.is_none()); + } +} diff --git a/core/src/device/mod.rs b/core/src/device/mod.rs index 0c2ef85a8..5ad8f025e 100644 --- a/core/src/device/mod.rs +++ b/core/src/device/mod.rs @@ -1,12 +1,15 @@ //! Device management module -//! +//! //! Handles persistent device identification across Spacedrive installations mod config; +mod id; mod manager; + +pub use crate::crypto::device_key_manager::{DeviceKeyError, DeviceKeyManager}; pub use config::DeviceConfig; -pub use manager::{DeviceManager, DeviceError}; -pub use crate::crypto::device_key_manager::{DeviceKeyManager, DeviceKeyError}; +pub use id::{get_current_device_id, set_current_device_id, CURRENT_DEVICE_ID}; +pub use manager::{DeviceError, DeviceManager}; // Re-export domain types -pub use crate::domain::device::{Device, OperatingSystem}; \ No newline at end of file +pub use crate::domain::device::{Device, OperatingSystem}; diff --git a/core/src/domain/addressing.rs b/core/src/domain/addressing.rs index f2017b2ba..f17a48c68 100644 --- a/core/src/domain/addressing.rs +++ b/core/src/domain/addressing.rs @@ -56,7 +56,7 @@ impl SdPath { /// Create an SdPath for a local file on this device pub fn local(path: impl Into) -> Self { Self::Physical { - device_id: crate::common::utils::get_current_device_id(), + device_id: crate::device::get_current_device_id(), path: path.into(), } } @@ -65,7 +65,7 @@ impl SdPath { pub fn is_local(&self) -> bool { match self { Self::Physical { device_id, .. } => { - *device_id == crate::common::utils::get_current_device_id() + *device_id == crate::device::get_current_device_id() } Self::Content { .. } => false, // Content paths are abstract, not inherently local } @@ -75,7 +75,7 @@ impl SdPath { pub fn as_local_path(&self) -> Option<&Path> { match self { Self::Physical { device_id, path } => { - if *device_id == crate::common::utils::get_current_device_id() { + if *device_id == crate::device::get_current_device_id() { Some(path) } else { None @@ -89,7 +89,7 @@ impl SdPath { pub fn display(&self) -> String { match self { Self::Physical { device_id, path } => { - if *device_id == crate::common::utils::get_current_device_id() { + if *device_id == crate::device::get_current_device_id() { path.display().to_string() } else { format!("sd://{}/{}", device_id, path.display()) diff --git a/core/src/domain/device.rs b/core/src/domain/device.rs index 00cdd4404..e1ea27749 100644 --- a/core/src/domain/device.rs +++ b/core/src/domain/device.rs @@ -109,8 +109,8 @@ impl Device { } /// Check if this is the current device - pub fn is_current(&self) -> bool { - self.id == crate::common::utils::get_current_device_id() + pub fn is_current(&self, current_device_id: Uuid) -> bool { + self.id == current_device_id } /// Set sync role for a library diff --git a/core/src/infra/action/manager.rs b/core/src/infra/action/manager.rs index 517accf9e..e2a7f7dbc 100644 --- a/core/src/infra/action/manager.rs +++ b/core/src/infra/action/manager.rs @@ -2,7 +2,6 @@ use super::error::{ActionError, ActionResult}; use crate::{ - common::utils::get_current_device_id, context::CoreContext, infra::db::entities::{audit_log, AuditLog, AuditLogActive}, }; @@ -28,17 +27,17 @@ impl ActionManager { &self, action: A, ) -> Result { - // 1. Validate the action + // Validate the action action.validate(self.context.clone()).await?; - // 2. Log action execution (capture action_kind before move) + // Log action execution (capture action_kind before move) let action_kind = action.action_kind(); tracing::info!("Executing core action: {}", action_kind); - // 3. Execute the action directly + // Execute the action directly let result = action.execute(self.context.clone()).await; - // 4. Log result + // Log result match &result { Ok(_) => tracing::info!("Core action {} completed successfully", action_kind), Err(e) => tracing::error!("Core action {} failed: {}", action_kind, e), @@ -50,35 +49,41 @@ impl ActionManager { /// Dispatch a library-scoped action (library context pre-validated) pub async fn dispatch_library( &self, - library_id: Uuid, + library_id: Option, action: A, ) -> Result { - // 1. Get and validate library exists (eliminates boilerplate!) + // Either use the provided library_id, or the currently active one from core state + // An action created with a given library will always execute on that library + // Actions and queries should not hold their own library id state, rather receive it as context + let effective_library_id = library_id + .or(self.context.session_state.get().await.current_library_id) + .ok_or(ActionError::LibraryNotFound(library_id.unwrap_or_default()))?; + + // Get and validate library exists let library = self .context - .library_manager - .get_library(library_id) + .get_library(effective_library_id) .await - .ok_or_else(|| ActionError::LibraryNotFound(library_id))?; + .ok_or_else(|| ActionError::LibraryNotFound(effective_library_id))?; - // 2. Validate the action with library context + // Validate the action with library context action.validate(&library, self.context.clone()).await?; - // 3. Create audit log entry (capture values before move) + // Create audit log entry (capture values before move) let action_kind = action.action_kind(); let audit_entry = self - .create_action_audit_log(library_id, action_kind) + .create_action_audit_log(effective_library_id, action_kind) .await?; - // 4. Execute the action with validated library + // Execute the action with validated library let result = action.execute(library, self.context.clone()).await; - // 5. Finalize audit log with result + // Finalize audit log with result let audit_result = match &result { Ok(_) => Ok("Action completed successfully".to_string()), Err(e) => Err(ActionError::Internal(e.to_string())), }; - self.finalize_audit_log(audit_entry, &audit_result, library_id) + self.finalize_audit_log(audit_entry, &audit_result, effective_library_id) .await?; result @@ -93,10 +98,11 @@ impl ActionManager { let library = self.get_library(library_id).await?; let db = library.db().conn(); + let device_id = crate::device::get_current_device_id(); let audit_entry = AuditLogActive { uuid: Set(Uuid::new_v4().to_string()), action_type: Set(action_kind.to_string()), - actor_device_id: Set(get_current_device_id().to_string()), + actor_device_id: Set(device_id.to_string()), targets: Set("{}".to_string()), // TODO: Add targets_summary to ActionTrait status: Set(audit_log::ActionStatus::InProgress), job_id: Set(None), @@ -110,16 +116,6 @@ impl ActionManager { audit_entry.insert(db).await.map_err(ActionError::SeaOrm) } - /// Create an initial audit log entry (legacy method - kept for compatibility) - async fn create_audit_log( - &self, - library_id: Uuid, - action_kind: &str, - ) -> ActionResult { - // Delegate to the new method - self.create_action_audit_log(library_id, action_kind).await - } - /// Finalize the audit log entry with the result async fn finalize_audit_log( &self, @@ -151,7 +147,6 @@ impl ActionManager { active_model.status = Set(audit_log::ActionStatus::Completed); active_model.completed_at = Set(Some(chrono::Utc::now())); // Extract job_id if present in certain output types - // TODO: Update this when we have job-based actions active_model.result_payload = Set(Some(output.clone())); } Err(error) => { @@ -172,7 +167,6 @@ impl ActionManager { library_id: Uuid, ) -> ActionResult> { self.context - .library_manager .get_library(library_id) .await .ok_or(ActionError::LibraryNotFound(library_id)) diff --git a/core/src/infra/daemon/bootstrap.rs b/core/src/infra/daemon/bootstrap.rs index beae69c4b..e5908d36b 100644 --- a/core/src/infra/daemon/bootstrap.rs +++ b/core/src/infra/daemon/bootstrap.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use std::sync::Arc; use crate::infra::daemon::{ - instance::CoreInstanceManager, rpc::RpcServer, state::SessionStateService, + instance::CoreInstanceManager, rpc::RpcServer, }; /// Start a default daemon server with built-in handlers and default instance @@ -11,12 +11,10 @@ pub async fn start_default_server( data_dir: PathBuf, enable_networking: bool, ) -> Result<(), Box> { - let session = Arc::new(SessionStateService::new(data_dir.clone())); let instances = Arc::new(CoreInstanceManager::new( data_dir.clone(), enable_networking, - session.clone(), )); - let mut server = RpcServer::new(socket_path, instances, session); + let mut server = RpcServer::new(socket_path, instances); server.start().await } diff --git a/core/src/infra/daemon/client.rs b/core/src/infra/daemon/client.rs index df9812368..0905deb51 100644 --- a/core/src/infra/daemon/client.rs +++ b/core/src/infra/daemon/client.rs @@ -9,17 +9,44 @@ pub struct DaemonClient { } impl DaemonClient { - pub fn new(socket_path: PathBuf) -> Self { Self { socket_path } } + pub fn new(socket_path: PathBuf) -> Self { + Self { socket_path } + } + + pub async fn send( + &self, + req: &DaemonRequest, + ) -> Result> { + let mut stream = UnixStream::connect(&self.socket_path).await.map_err(|e| { + format!( + "Failed to connect to daemon socket at {}: {}", + self.socket_path.display(), + e + ) + })?; + + let payload = + serde_json::to_vec(req).map_err(|e| format!("Failed to serialize request: {}", e))?; + + stream + .write_all(&payload) + .await + .map_err(|e| format!("Failed to send request to daemon: {}", e))?; + + stream + .shutdown() + .await + .map_err(|e| format!("Failed to shutdown write stream: {}", e))?; - pub async fn send(&self, req: &DaemonRequest) -> Result> { - let mut stream = UnixStream::connect(&self.socket_path).await?; - let payload = serde_json::to_vec(req)?; - stream.write_all(&payload).await?; - stream.shutdown().await?; let mut buf = Vec::new(); - stream.read_to_end(&mut buf).await?; - Ok(serde_json::from_slice(&buf)?) + stream + .read_to_end(&mut buf) + .await + .map_err(|e| format!("Failed to read response from daemon: {}", e))?; + + let response: DaemonResponse = serde_json::from_slice(&buf) + .map_err(|e| format!("Failed to deserialize daemon response: {}", e))?; + + Ok(response) } } - - diff --git a/core/src/infra/daemon/dispatch.rs b/core/src/infra/daemon/dispatch.rs index 0c8187ebf..372efca3d 100644 --- a/core/src/infra/daemon/dispatch.rs +++ b/core/src/infra/daemon/dispatch.rs @@ -4,73 +4,42 @@ use std::sync::Arc; use futures::future::BoxFuture; use tokio::sync::RwLock; -use super::state::SessionState; use crate::Core; use bincode::config::standard; use bincode::serde::{decode_from_slice, encode_to_vec}; use serde::{de::DeserializeOwned, Serialize}; -/// Signature for a generic action handler: takes raw bytes, returns raw bytes -pub type ActionHandler = Arc< - dyn Fn(Vec, Arc, SessionState) -> BoxFuture<'static, Result, String>> - + Send - + Sync, ->; - -/// Signature for a generic query handler (placeholder for future) -pub type QueryHandler = Arc< - dyn Fn(Vec, Arc, SessionState) -> BoxFuture<'static, Result, String>> - + Send - + Sync, ->; +/// Unified handler signature for both actions and queries +/// Actions return empty Vec, queries return serialized output +pub type OperationHandler = + Arc, Arc) -> BoxFuture<'static, Result, String>> + Send + Sync>; /// Registry that maps type IDs to handlers. The daemon remains agnostic of concrete types. pub struct DispatchRegistry { - actions: RwLock>, - queries: RwLock>, + handlers: RwLock>, } impl DispatchRegistry { pub fn new() -> Arc { Arc::new(Self { - actions: RwLock::new(HashMap::new()), - queries: RwLock::new(HashMap::new()), + handlers: RwLock::new(HashMap::new()), }) } - pub async fn register_action(&self, type_id: impl Into, handler: ActionHandler) { - self.actions.write().await.insert(type_id.into(), handler); + pub async fn register_handler(&self, type_id: impl Into, handler: OperationHandler) { + self.handlers.write().await.insert(type_id.into(), handler); } - pub async fn register_query(&self, type_id: impl Into, handler: QueryHandler) { - self.queries.write().await.insert(type_id.into(), handler); - } - - pub async fn dispatch_action( + pub async fn dispatch( &self, type_id: &str, payload: Vec, core: Arc, - session: SessionState, ) -> Result, String> { - let map = self.actions.read().await; + let map = self.handlers.read().await; match map.get(type_id) { - Some(handler) => (handler)(payload, core, session).await, - None => Err("Unknown action type".into()), - } - } - - pub async fn dispatch_query( - &self, - type_id: &str, - payload: Vec, - core: Arc, - session: SessionState, - ) -> Result, String> { - let map = self.queries.read().await; - match map.get(type_id) { - Some(handler) => (handler)(payload, core, session).await, - None => Err("Unknown query type".into()), + Some(handler) => (handler)(payload, core).await, + None => Err(format!("Unknown operation type: {}", type_id)), } } } @@ -78,22 +47,22 @@ impl DispatchRegistry { /// Build a generic action handler that decodes T from payload, executes, and returns empty Ok bytes pub fn make_action_handler( exec: std::sync::Arc< - dyn Fn(T, std::sync::Arc, SessionState) -> BoxFuture<'static, Result<(), String>> + dyn Fn(T, std::sync::Arc) -> BoxFuture<'static, Result<(), String>> + Send + Sync + 'static, >, -) -> ActionHandler +) -> OperationHandler where T: DeserializeOwned + Send + 'static, { - std::sync::Arc::new(move |payload, core, session| { + std::sync::Arc::new(move |payload, core| { let exec = exec.clone(); Box::pin(async move { let val: T = decode_from_slice(&payload, standard()) .map_err(|e| format!("deserialize: {}", e))? .0; - (exec)(val, core, session).await.map(|_| Vec::new()) + (exec)(val, core).await.map(|_| Vec::new()) }) }) } @@ -101,23 +70,23 @@ where /// Build a generic query handler that decodes Q, executes to O, and encodes O pub fn make_query_handler( exec: std::sync::Arc< - dyn Fn(Q, std::sync::Arc, SessionState) -> BoxFuture<'static, Result> + dyn Fn(Q, std::sync::Arc) -> BoxFuture<'static, Result> + Send + Sync + 'static, >, -) -> QueryHandler +) -> OperationHandler where Q: DeserializeOwned + Send + 'static, O: Serialize + Send + 'static, { - std::sync::Arc::new(move |payload, core, session| { + std::sync::Arc::new(move |payload, core| { let exec = exec.clone(); Box::pin(async move { let val: Q = decode_from_slice(&payload, standard()) .map_err(|e| format!("deserialize: {}", e))? .0; - let out = (exec)(val, core, session).await?; + let out = (exec)(val, core).await?; encode_to_vec(&out, standard()).map_err(|e| e.to_string()) }) }) diff --git a/core/src/infra/daemon/instance.rs b/core/src/infra/daemon/instance.rs index 484c74196..dee1aa83a 100644 --- a/core/src/infra/daemon/instance.rs +++ b/core/src/infra/daemon/instance.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use tokio::sync::RwLock; use crate::Core; -use crate::infra::daemon::state::SessionStateService; /// Validate instance name to prevent path traversal attacks pub fn validate_instance_name(instance: &str) -> Result<(), String> { @@ -14,7 +13,10 @@ pub fn validate_instance_name(instance: &str) -> Result<(), String> { if instance.len() > 64 { return Err("Instance name too long (max 64 characters)".to_string()); } - if !instance.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_') { + if !instance + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { return Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string()); } Ok(()) @@ -25,16 +27,14 @@ pub struct CoreInstanceManager { instances: Arc>>>, default_data_dir: PathBuf, enable_networking: bool, - session_state: Arc, } impl CoreInstanceManager { - pub fn new(default_data_dir: PathBuf, enable_networking: bool, session_state: Arc) -> Self { + pub fn new(default_data_dir: PathBuf, enable_networking: bool) -> Self { Self { instances: Arc::new(RwLock::new(HashMap::new())), default_data_dir, enable_networking, - session_state, } } @@ -67,13 +67,13 @@ impl CoreInstanceManager { // Instance doesn't exist, create it let data_dir = data_dir.unwrap_or_else(|| self.default_data_dir.clone()); let core = Arc::new( - Core::new_with_config(data_dir, self.session_state.clone()) + Core::new_with_config(data_dir) .await - .map_err(|e| format!("Failed to create core: {}", e))? + .map_err(|e| format!("Failed to create core: {}", e))?, ); let core_with_networking = if self.enable_networking { - Core::init_networking_shared(core.clone(), self.session_state.clone()) + Core::init_networking_shared(core.clone()) .await .map_err(|e| format!("Failed to initialize networking: {}", e))? } else { @@ -93,10 +93,10 @@ impl CoreInstanceManager { validate_instance_name(name)?; if let Some(core) = self.instances.write().await.remove(name) { - core.shutdown().await.map_err(|e| format!("Shutdown failed: {}", e))?; + core.shutdown() + .await + .map_err(|e| format!("Shutdown failed: {}", e))?; } Ok(()) } } - - diff --git a/core/src/infra/daemon/mod.rs b/core/src/infra/daemon/mod.rs index 7a1e61f13..83c00a707 100644 --- a/core/src/infra/daemon/mod.rs +++ b/core/src/infra/daemon/mod.rs @@ -1,12 +1,9 @@ //! Client-agnostic daemon infrastructure +pub mod bootstrap; +pub mod client; +pub mod dispatch; +pub mod health; pub mod instance; pub mod rpc; -pub mod state; pub mod types; -pub mod client; -pub mod health; -pub mod dispatch; -pub mod bootstrap; - - diff --git a/core/src/infra/daemon/rpc.rs b/core/src/infra/daemon/rpc.rs index cb7dd7f6c..4567388f4 100644 --- a/core/src/infra/daemon/rpc.rs +++ b/core/src/infra/daemon/rpc.rs @@ -6,29 +6,22 @@ use tokio::net::UnixListener; use tokio::sync::mpsc; use crate::infra::daemon::instance::CoreInstanceManager; -use crate::infra::daemon::state::SessionStateService; use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse}; /// Minimal JSON-over-UDS RPC server pub struct RpcServer { socket_path: PathBuf, instances: Arc, - session: Arc, shutdown_tx: mpsc::Sender<()>, shutdown_rx: mpsc::Receiver<()>, } impl RpcServer { - pub fn new( - socket_path: PathBuf, - instances: Arc, - session: Arc, - ) -> Self { + pub fn new(socket_path: PathBuf, instances: Arc) -> Self { let (shutdown_tx, shutdown_rx) = mpsc::channel(1); Self { socket_path, instances, - session, shutdown_tx, shutdown_rx, } @@ -48,13 +41,12 @@ impl RpcServer { match result { Ok((stream, _addr)) => { let instances = self.instances.clone(); - let session = self.session.clone(); let shutdown_tx = self.shutdown_tx.clone(); // Spawn task for concurrent request handling tokio::spawn(async move { // Convert errors to strings to ensure Send - if let Err(e) = Self::handle_connection(stream, instances, session, shutdown_tx).await { + if let Err(e) = Self::handle_connection(stream, instances, shutdown_tx).await { eprintln!("Connection error: {}", e); } }); @@ -81,7 +73,6 @@ impl RpcServer { async fn handle_connection( mut stream: tokio::net::UnixStream, instances: Arc, - session: Arc, shutdown_tx: mpsc::Sender<()>, ) -> Result<(), String> { // Request size limit (10MB) @@ -125,7 +116,7 @@ impl RpcServer { // Try to parse JSON - if successful, we have the complete request if let Ok(req) = serde_json::from_slice::(&buf) { - let resp = Self::process_request(req, &instances, &session, &shutdown_tx).await; + let resp = Self::process_request(req, &instances, &shutdown_tx).await; // Send response let response_bytes = serde_json::to_string(&resp) @@ -143,28 +134,21 @@ impl RpcServer { async fn process_request( request: DaemonRequest, instances: &Arc, - session: &Arc, shutdown_tx: &mpsc::Sender<()>, ) -> DaemonResponse { match request { DaemonRequest::Ping => DaemonResponse::Pong, DaemonRequest::Action { method, payload } => match instances.get_default().await { - Ok(core) => { - let session_snapshot = session.get().await; - match core - .execute_action_by_method(&method, payload, session_snapshot) - .await - { - Ok(out) => DaemonResponse::Ok(out), - Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)), - } - } + Ok(core) => match core.execute_operation_by_method(&method, payload).await { + Ok(out) => DaemonResponse::Ok(out), + Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)), + }, Err(e) => DaemonResponse::Error(DaemonError::CoreUnavailable(e)), }, DaemonRequest::Query { method, payload } => match instances.get_default().await { - Ok(core) => match core.execute_query_by_method(&method, payload).await { + Ok(core) => match core.execute_operation_by_method(&method, payload).await { Ok(out) => DaemonResponse::Ok(out), Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)), }, diff --git a/core/src/lib.rs b/core/src/lib.rs index e0c1c4cad..687581176 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,8 +1,9 @@ #![allow(warnings)] //! Spacedrive Core v2 //! -//! A unified, simplified architecture for cross-platform file management. +//! A complete reimplementation of Spacedrive's core with modern Rust patterns, unified file operations, and a foundation built for the Virtual Distributed File System vision. +// Module declarations pub mod client; pub mod common; pub mod config; @@ -20,30 +21,34 @@ pub mod service; pub mod testing; pub mod volume; -use service::network::protocol::pairing::PairingProtocolHandler; -use service::network::utils::logging::NetworkLogger; - // Compatibility module for legacy networking references pub mod networking { pub use crate::service::network::*; } -use crate::config::AppConfig; -use crate::context::CoreContext; -use crate::cqrs::{Query, QueryManager}; -use crate::device::DeviceManager; -use crate::infra::action::builder::ActionBuilder; -use crate::infra::action::manager::ActionManager; -use crate::infra::action::{CoreAction, LibraryAction}; -use crate::infra::event::{Event, EventBus}; -use crate::library::LibraryManager; -use crate::service::Services; -use crate::volume::{VolumeDetectionConfig, VolumeManager}; -use std::path::PathBuf; -use std::sync::Arc; +// Internal crate imports +use crate::{ + config::AppConfig, + context::CoreContext, + cqrs::{Query, QueryManager}, + device::DeviceManager, + infra::{ + action::{builder::ActionBuilder, manager::ActionManager, CoreAction, LibraryAction}, + event::{Event, EventBus}, + }, + library::LibraryManager, + service::session::SessionStateService, + service::{ + network::{protocol::pairing::PairingProtocolHandler, utils::logging::NetworkLogger}, + Services, + }, + volume::{VolumeDetectionConfig, VolumeManager}, +}; + +// External crate imports +use std::{path::PathBuf, sync::Arc}; use tokio::sync::{mpsc, RwLock}; use tracing::{error, info}; -use crate::infra::daemon::state::SessionStateService; /// Pending pairing request information #[derive(Debug, Clone)] @@ -63,11 +68,8 @@ struct SpacedropRequest { message: Option, file_size: u64, } - -// NOTE: SimplePairingUI has been moved to CLI infrastructure -// See: src/infrastructure/cli/pairing_ui.rs for CLI-specific implementations - /// Bridge between networking events and core events +/// TODO: why? - james pub struct NetworkEventBridge { network_events: mpsc::UnboundedReceiver, core_events: Arc, @@ -141,58 +143,51 @@ pub struct Core { } impl Core { - - /// Initialize a new Core instance with custom data directory - pub async fn new_with_config(data_dir: PathBuf, session_state: Arc) -> Result> { + pub async fn new_with_config(data_dir: PathBuf) -> Result> { info!("Initializing Spacedrive Core at {:?}", data_dir); - // 1. Load or create app config + // Load or create app config let config = AppConfig::load_or_create(&data_dir)?; config.ensure_directories()?; let config = Arc::new(RwLock::new(config)); - // 2. Initialize device manager + // Initialize device manager let device = Arc::new(DeviceManager::init_with_path(&data_dir)?); - // Set the global device ID for legacy compatibility - common::utils::set_current_device_id(device.device_id()?); - // 3. Create event bus + // Set a global device ID for convenience + crate::device::set_current_device_id(device.device_id()?); + + // Create event bus let events = Arc::new(EventBus::default()); - // 4. Initialize volume manager + // Initialize volume manager let volume_config = VolumeDetectionConfig::default(); let device_id = device.device_id()?; let volumes = Arc::new(VolumeManager::new(device_id, volume_config, events.clone())); - // 5. Initialize volume detection + // Initialize volume detection // info!("Initializing volume detection..."); // match volumes.initialize().await { // Ok(()) => info!("Volume manager initialized"), // Err(e) => error!("Failed to initialize volume manager: {}", e), // } - // 6. Initialize library manager with libraries directory - let libraries_dir = config.read().await.libraries_dir(); - let libraries = Arc::new(LibraryManager::new_with_dir(libraries_dir, events.clone())); - - // 7. Initialize library key manager + // Initialize library key manager let library_key_manager = Arc::new(crate::crypto::library_key_manager::LibraryKeyManager::new()?); - // 8. Register all job types - info!("Registering job types..."); - crate::ops::register_all_jobs(); - info!("Job types registered"); + // Initialize session state service + let session_state = Arc::new(SessionStateService::new(data_dir)); - // 9. Create the context that will be shared with services + // Create the context that will be shared with services let mut context_inner = CoreContext::new( events.clone(), device.clone(), - libraries.clone(), + None, // Libraries will be set after context creation volumes.clone(), library_key_manager.clone(), - session_state.clone(), // Pass session_state here + session_state.clone(), ); // Set job logging configuration if enabled @@ -203,15 +198,29 @@ impl Core { } drop(app_config); + // Create the shared context let context = Arc::new(context_inner); - // 10. Initialize services first, passing them the context + // Initialize library manager with libraries directory and context + let libraries_dir = config.read().await.libraries_dir(); + let libraries = Arc::new(LibraryManager::new_with_dir( + libraries_dir, + events.clone(), + session_state.clone(), + volumes.clone(), + device.clone(), + )); + + // Update context with libraries + context.set_libraries(libraries.clone()).await; + + // Initialize services first, passing them the context let services = Services::new(context.clone()); - // 11. Auto-load all libraries with context for job manager initialization + // Auto-load all libraries with context for job manager initialization info!("Loading existing libraries..."); let loaded_libraries: Vec> = - match libraries.load_all_with_context(context.clone()).await { + match libraries.load_all(context.clone()).await { Ok(count) => { info!("Loaded {} libraries", count); libraries.list().await @@ -396,13 +405,12 @@ impl Core { /// Initialize networking from Arc - for daemon use pub async fn init_networking_shared( core: Arc, - session_state: Arc, ) -> Result, Box> { info!("Initializing networking for shared core..."); // Create a new Core with networking enabled let mut new_core = - Core::new_with_config(core.config().read().await.data_dir.clone(), session_state).await?; + Core::new_with_config(core.config().read().await.data_dir.clone()).await?; // Initialize networking on the new core new_core.init_networking().await?; @@ -510,7 +518,7 @@ impl Core { &self, method: &str, payload: Vec, - session: crate::infra::daemon::state::SessionState, + session: crate::service::session::SessionState, ) -> Result, String> { if let Some(handler) = crate::ops::registry::ACTIONS.get(method) { return handler(Arc::new((*self).clone()), session, payload).await; @@ -518,6 +526,26 @@ impl Core { Err("Unknown action method".into()) } + /// Unified dispatcher by method string for both actions and queries. + pub async fn execute_operation_by_method( + &self, + method: &str, + payload: Vec, + ) -> Result, String> { + // Try actions first (they return empty bytes on success) + if let Some(handler) = crate::ops::registry::ACTIONS.get(method) { + let default_session = crate::service::session::SessionState::default(); + return handler(Arc::new((*self).clone()), default_session, payload).await; + } + + // Try queries next (they return serialized output) + if let Some(handler) = crate::ops::registry::QUERIES.get(method) { + return handler(Arc::new((*self).clone()), payload).await; + } + + Err(format!("Unknown operation method: {}", method)) + } + /// Shutdown the core gracefully pub async fn shutdown(&self) -> Result<(), Box> { info!("Shutting down Spacedrive Core..."); diff --git a/core/src/library/lock.rs b/core/src/library/lock.rs index dbd4473f8..a57f0b427 100644 --- a/core/src/library/lock.rs +++ b/core/src/library/lock.rs @@ -1,7 +1,7 @@ //! Library lock implementation to prevent concurrent access use super::error::{LibraryError, Result}; -use crate::common::utils::get_current_device_id; +use crate::device::get_current_device_id; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fs::{File, OpenOptions}; @@ -13,192 +13,188 @@ use uuid::Uuid; /// Information stored in the lock file #[derive(Debug, Serialize, Deserialize)] pub struct LockInfo { - /// ID of the device holding the lock - pub device_id: Uuid, + /// ID of the device holding the lock + pub device_id: Uuid, - /// Process ID - pub process_id: u32, + /// Process ID + pub process_id: u32, - /// When the lock was acquired - pub acquired_at: DateTime, + /// When the lock was acquired + pub acquired_at: DateTime, - /// Optional description (e.g., "indexing", "backup") - pub description: Option, + /// Optional description (e.g., "indexing", "backup") + pub description: Option, } /// A lock that prevents concurrent access to a library pub struct LibraryLock { - /// Path to the lock file - path: PathBuf, + /// Path to the lock file + path: PathBuf, - /// The open file handle (keeps the lock active) - _file: File, + /// The open file handle (keeps the lock active) + _file: File, } impl LibraryLock { - /// Attempt to acquire a lock on the library - pub fn acquire(library_path: &Path) -> Result { - let lock_path = library_path.join(".sdlibrary.lock"); + /// Attempt to acquire a lock on the library + pub fn acquire(library_path: &Path) -> Result { + let lock_path = library_path.join(".sdlibrary.lock"); - // Try to create the lock file exclusively - match OpenOptions::new() - .write(true) - .create_new(true) - .open(&lock_path) - { - Ok(mut file) => { - // Write lock information - let lock_info = LockInfo { - device_id: get_current_device_id(), - process_id: std::process::id(), - acquired_at: Utc::now(), - description: None, - }; + // Try to create the lock file exclusively + match OpenOptions::new() + .write(true) + .create_new(true) + .open(&lock_path) + { + Ok(mut file) => { + // Write lock information + let lock_info = LockInfo { + device_id: get_current_device_id(), + process_id: std::process::id(), + acquired_at: Utc::now(), + description: None, + }; - let json = serde_json::to_string_pretty(&lock_info)?; - file.write_all(json.as_bytes())?; - file.sync_all()?; + let json = serde_json::to_string_pretty(&lock_info)?; + file.write_all(json.as_bytes())?; + file.sync_all()?; - Ok(Self { - path: lock_path, - _file: file, - }) - } - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { - // Lock file exists, check if it's stale - if Self::is_lock_stale(&lock_path)? { - // Remove stale lock and try again - std::fs::remove_file(&lock_path)?; + Ok(Self { + path: lock_path, + _file: file, + }) + } + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + // Lock file exists, check if it's stale + if Self::is_lock_stale(&lock_path)? { + // Remove stale lock and try again + std::fs::remove_file(&lock_path)?; - // Recursive call to try again - Self::acquire(library_path) - } else { - Err(LibraryError::AlreadyInUse) - } - } - Err(e) => Err(e.into()), - } - } + // Recursive call to try again + Self::acquire(library_path) + } else { + Err(LibraryError::AlreadyInUse) + } + } + Err(e) => Err(e.into()), + } + } - /// Check if a lock file is stale (older than 1 hour or process no longer running) - pub fn is_lock_stale(lock_path: &Path) -> Result { - let metadata = std::fs::metadata(lock_path)?; - let modified = metadata.modified()?; - let age = SystemTime::now() - .duration_since(modified) - .unwrap_or(Duration::ZERO); + /// Check if a lock file is stale (older than 1 hour or process no longer running) + pub fn is_lock_stale(lock_path: &Path) -> Result { + let metadata = std::fs::metadata(lock_path)?; + let modified = metadata.modified()?; + let age = SystemTime::now() + .duration_since(modified) + .unwrap_or(Duration::ZERO); - // Consider lock stale if older than 1 hour - if age > Duration::from_secs(3600) { - return Ok(true); - } + // Consider lock stale if older than 1 hour + if age > Duration::from_secs(3600) { + return Ok(true); + } - // Also check if the process is still running - if let Ok(contents) = std::fs::read_to_string(lock_path) { - if let Ok(lock_info) = serde_json::from_str::(&contents) { - // Check if process is still running - if !is_process_running(lock_info.process_id) { - return Ok(true); - } - } - } + // Also check if the process is still running + if let Ok(contents) = std::fs::read_to_string(lock_path) { + if let Ok(lock_info) = serde_json::from_str::(&contents) { + // Check if process is still running + if !is_process_running(lock_info.process_id) { + return Ok(true); + } + } + } - Ok(false) - } + Ok(false) + } - /// Try to read lock information (for debugging) - pub fn read_lock_info(library_path: &Path) -> Result> { - let lock_path = library_path.join(".sdlibrary.lock"); + /// Try to read lock information (for debugging) + pub fn read_lock_info(library_path: &Path) -> Result> { + let lock_path = library_path.join(".sdlibrary.lock"); - if !lock_path.exists() { - return Ok(None); - } + if !lock_path.exists() { + return Ok(None); + } - let contents = std::fs::read_to_string(lock_path)?; - let info: LockInfo = serde_json::from_str(&contents)?; + let contents = std::fs::read_to_string(lock_path)?; + let info: LockInfo = serde_json::from_str(&contents)?; - Ok(Some(info)) - } + Ok(Some(info)) + } } /// Check if a process is still running (Unix-specific implementation) #[cfg(unix)] fn is_process_running(pid: u32) -> bool { - use std::process::Command; + use std::process::Command; - match Command::new("ps") - .arg("-p") - .arg(pid.to_string()) - .output() - { - Ok(output) => output.status.success(), - Err(_) => false, - } + match Command::new("ps").arg("-p").arg(pid.to_string()).output() { + Ok(output) => output.status.success(), + Err(_) => false, + } } /// Check if a process is still running (Windows implementation) #[cfg(windows)] fn is_process_running(pid: u32) -> bool { - use std::process::Command; + use std::process::Command; - match Command::new("tasklist") - .arg("/fi") - .arg(&format!("pid eq {}", pid)) - .arg("/fo") - .arg("csv") - .output() - { - Ok(output) => { - let output_str = String::from_utf8_lossy(&output.stdout); - output_str.lines().count() > 1 // Header + process line if exists - } - Err(_) => false, - } + match Command::new("tasklist") + .arg("/fi") + .arg(&format!("pid eq {}", pid)) + .arg("/fo") + .arg("csv") + .output() + { + Ok(output) => { + let output_str = String::from_utf8_lossy(&output.stdout); + output_str.lines().count() > 1 // Header + process line if exists + } + Err(_) => false, + } } impl Drop for LibraryLock { - fn drop(&mut self) { - // Clean up the lock file when the lock is dropped - let _ = std::fs::remove_file(&self.path); - } + fn drop(&mut self) { + // Clean up the lock file when the lock is dropped + let _ = std::fs::remove_file(&self.path); + } } #[cfg(test)] mod tests { - use super::*; - use tempfile::TempDir; + use super::*; + use tempfile::TempDir; - #[test] - fn test_library_lock() { - let temp_dir = TempDir::new().unwrap(); - let library_path = temp_dir.path().join("test.sdlibrary"); - std::fs::create_dir_all(&library_path).unwrap(); + #[test] + fn test_library_lock() { + let temp_dir = TempDir::new().unwrap(); + let library_path = temp_dir.path().join("test.sdlibrary"); + std::fs::create_dir_all(&library_path).unwrap(); - // First lock should succeed - let _lock1 = LibraryLock::acquire(&library_path).unwrap(); + // First lock should succeed + let _lock1 = LibraryLock::acquire(&library_path).unwrap(); - // Second lock should fail - match LibraryLock::acquire(&library_path) { - Err(LibraryError::AlreadyInUse) => {} - _ => panic!("Expected AlreadyInUse error"), - } + // Second lock should fail + match LibraryLock::acquire(&library_path) { + Err(LibraryError::AlreadyInUse) => {} + _ => panic!("Expected AlreadyInUse error"), + } - // Lock file should exist - assert!(library_path.join(".sdlibrary.lock").exists()); - } + // Lock file should exist + assert!(library_path.join(".sdlibrary.lock").exists()); + } - #[test] - fn test_lock_cleanup() { - let temp_dir = TempDir::new().unwrap(); - let library_path = temp_dir.path().join("test.sdlibrary"); - std::fs::create_dir_all(&library_path).unwrap(); + #[test] + fn test_lock_cleanup() { + let temp_dir = TempDir::new().unwrap(); + let library_path = temp_dir.path().join("test.sdlibrary"); + std::fs::create_dir_all(&library_path).unwrap(); - { - let _lock = LibraryLock::acquire(&library_path).unwrap(); - assert!(library_path.join(".sdlibrary.lock").exists()); - } + { + let _lock = LibraryLock::acquire(&library_path).unwrap(); + assert!(library_path.join(".sdlibrary.lock").exists()); + } - // Lock file should be cleaned up after drop - assert!(!library_path.join(".sdlibrary.lock").exists()); - } -} \ No newline at end of file + // Lock file should be cleaned up after drop + assert!(!library_path.join(".sdlibrary.lock").exists()); + } +} diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index cd91f59ef..a8842ce16 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -8,11 +8,14 @@ use super::{ }; use crate::{ context::CoreContext, + device::DeviceManager, infra::{ db::{entities, Database}, event::{Event, EventBus}, job::manager::JobManager, }, + service::session::SessionStateService, + volume::VolumeManager, }; use chrono::Utc; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter}; @@ -46,11 +49,21 @@ pub struct LibraryManager { /// Event bus for library events event_bus: Arc, + + /// Dependencies needed from core + session_state: Arc, + volume_manager: Arc, + device_manager: Arc, } impl LibraryManager { /// Create a new library manager - pub fn new(event_bus: Arc) -> Self { + pub fn new( + event_bus: Arc, + session_state: Arc, + volume_manager: Arc, + device_manager: Arc, + ) -> Self { // Default search paths let mut search_paths = vec![]; @@ -63,42 +76,32 @@ impl LibraryManager { libraries: Arc::new(RwLock::new(HashMap::new())), search_paths, event_bus, + session_state, + volume_manager, + device_manager, } } /// Create a new library manager with a specific libraries directory - pub fn new_with_dir(libraries_dir: PathBuf, event_bus: Arc) -> Self { + pub fn new_with_dir( + libraries_dir: PathBuf, + event_bus: Arc, + session_state: Arc, + volume_manager: Arc, + device_manager: Arc, + ) -> Self { let search_paths = vec![libraries_dir]; Self { libraries: Arc::new(RwLock::new(HashMap::new())), search_paths, event_bus, + session_state, + volume_manager, + device_manager, } } - /// Load all discovered libraries with context for job manager initialization - pub async fn load_all_with_context(&self, context: Arc) -> Result { - let discovered = self.scan_for_libraries().await?; - let mut loaded_count = 0; - - for disc in discovered { - if !disc.is_locked { - match self - .open_library_with_context(&disc.path, context.clone()) - .await - { - Ok(_) => loaded_count += 1, - Err(e) => { - warn!("Failed to load library at {:?}: {}", disc.path, e); - } - } - } - } - - Ok(loaded_count) - } - /// Add a search path for libraries pub fn add_search_path(&mut self, path: PathBuf) { if !self.search_paths.contains(&path) { @@ -147,10 +150,8 @@ impl LibraryManager { // Initialize library self.initialize_library(&library_path, name).await?; - // Open the newly created library with context - let library = self - .open_library_with_context(&library_path, context.clone()) - .await?; + // Open the newly created library + let library = self.open_library(&library_path, context.clone()).await?; // Emit event self.event_bus.emit(Event::LibraryCreated { @@ -163,8 +164,13 @@ impl LibraryManager { } /// Open a library from a path - pub async fn open_library(&self, path: impl AsRef) -> Result> { + pub async fn open_library( + &self, + path: impl AsRef, + context: Arc, + ) -> Result> { let path = path.as_ref(); + info!("Opening library at {:?}", path); // Validate it's a library directory if !is_library_directory(path) { @@ -174,40 +180,6 @@ impl LibraryManager { // Acquire lock let lock = LibraryLock::acquire(path)?; - // Load configuration - let config_path = path.join("library.json"); - let config_data = tokio::fs::read_to_string(&config_path).await?; - let config: LibraryConfig = serde_json::from_str(&config_data)?; - - // Check if already open - { - let libraries = self.libraries.read().await; - if libraries.contains_key(&config.id) { - return Err(LibraryError::AlreadyOpen(config.id)); - } - } - - // Open database - let db_path = path.join("database.db"); - let db = Arc::new(Database::open(&db_path).await?); - - // This method now requires context - redirect to open_library_with_context - return Err(LibraryError::Other( - "Context required for library creation".to_string(), - )); - } - - /// Open a library with CoreContext for job manager initialization - pub async fn open_library_with_context( - &self, - path: &Path, - context: Arc, - ) -> Result> { - info!("Opening library at {:?}", path); - - // Acquire lock - let lock = LibraryLock::acquire(path)?; - // Load config let config_path = path.join("library.json"); let config = LibraryConfig::load(&config_path).await?; @@ -217,6 +189,14 @@ impl LibraryManager { return Err(LibraryError::Other("Library config has nil ID".to_string())); } + // Check if already open + { + let libraries = self.libraries.read().await; + if libraries.contains_key(&config.id) { + return Err(LibraryError::AlreadyOpen(config.id)); + } + } + // Open database let db_path = path.join("database.db"); let db = Arc::new(Database::open(&db_path).await?); @@ -236,7 +216,7 @@ impl LibraryManager { }); // Ensure device is registered in this library - if let Err(e) = self.ensure_device_registered(&library, &context).await { + if let Err(e) = self.ensure_device_registered(&library).await { warn!("Failed to register device in library {}: {}", config.id, e); } @@ -254,11 +234,7 @@ impl LibraryManager { "Auto-tracking user-relevant volumes for library {}", config.name ); - if let Err(e) = context - .volume_manager - .auto_track_user_volumes(&library) - .await - { + if let Err(e) = self.volume_manager.auto_track_user_volumes(&library).await { warn!("Failed to auto-track user-relevant volumes: {}", e); } @@ -310,9 +286,14 @@ impl LibraryManager { self.libraries.read().await.values().cloned().collect() } - /// Get the primary library (first available library) - pub async fn get_primary_library(&self) -> Option> { - self.get_open_libraries().await.into_iter().next() + /// Get the active library + /// This is the library that the session state is set to + pub async fn get_active_library(&self) -> Option> { + let session = self.session_state.get().await; + self.get_open_libraries() + .await + .into_iter() + .find(|lib| Some(lib.id()) == session.current_library_id) } /// List all open libraries @@ -321,7 +302,7 @@ impl LibraryManager { } /// Load all libraries from the search paths - pub async fn load_all(&self) -> Result { + pub async fn load_all(&self, context: Arc) -> Result { let mut loaded_count = 0; for search_path in &self.search_paths.clone() { @@ -336,7 +317,7 @@ impl LibraryManager { let path = entry.path(); if is_library_directory(&path) { - match self.open_library(&path).await { + match self.open_library(&path, context.clone()).await { Ok(_) => { loaded_count += 1; info!("Auto-loaded library from {:?}", path); @@ -469,13 +450,9 @@ impl LibraryManager { } /// Ensure the current device is registered in the library - async fn ensure_device_registered( - &self, - library: &Arc, - context: &Arc, - ) -> Result<()> { + async fn ensure_device_registered(&self, library: &Arc) -> Result<()> { let db = library.db(); - let device = context + let device = self .device_manager .to_device() .map_err(|e| LibraryError::Other(format!("Failed to get device info: {}", e)))?; diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index feac9eedc..e2475349e 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -563,7 +563,7 @@ async fn update_location_stats( /// Get device UUID for current device async fn get_device_uuid(_library: Arc) -> LocationResult { // Get the current device ID from the global state - let device_uuid = crate::common::utils::get_current_device_id(); + let device_uuid = crate::device::get_current_device_id(); if device_uuid.is_nil() { return Err(LocationError::InvalidPath( diff --git a/core/src/ops/addressing.rs b/core/src/ops/addressing.rs index 20943416f..95d9e3dce 100644 --- a/core/src/ops/addressing.rs +++ b/core/src/ops/addressing.rs @@ -106,7 +106,7 @@ impl PathResolver { context: &CoreContext, device_id: Uuid, ) -> Result<(), PathResolutionError> { - let current_device_id = crate::common::utils::get_current_device_id(); + let current_device_id = crate::device::get_current_device_id(); // Local device is always "online" if device_id == current_device_id { @@ -134,7 +134,7 @@ impl PathResolver { /// Get list of currently online devices async fn get_online_devices(&self, context: &CoreContext) -> Vec { - let mut online = vec![crate::common::utils::get_current_device_id()]; + let mut online = vec![crate::device::get_current_device_id()]; if let Some(networking) = context.get_networking().await { for device in networking.get_connected_devices().await { @@ -150,7 +150,7 @@ impl PathResolver { let mut metrics = HashMap::new(); // Local device has zero latency - let current_device_id = crate::common::utils::get_current_device_id(); + let current_device_id = crate::device::get_current_device_id(); metrics.insert( current_device_id, DeviceMetrics { @@ -184,8 +184,9 @@ impl PathResolver { ) -> Result { // Get the current library let library = context - .library_manager - .get_primary_library() + .libraries() + .await + .get_active_library() .await .ok_or(PathResolutionError::NoActiveLibrary)?; @@ -218,7 +219,7 @@ impl PathResolver { let mut results = HashMap::new(); // Get the current library - let library = match context.library_manager.get_primary_library().await { + let library = match context.libraries().await.get_active_library().await { Some(lib) => lib, None => { // Return error for all content IDs @@ -302,10 +303,9 @@ impl PathResolver { .await? { // Build the full path using PathResolver - let path = crate::ops::indexing::path_resolver::PathResolver::get_full_path( - db, entry.id, - ) - .await?; + let path = + crate::ops::indexing::path_resolver::PathResolver::get_full_path(db, entry.id) + .await?; instances.push(ContentInstance { device_id: device.uuid, @@ -397,11 +397,11 @@ impl PathResolver { if let Some(location) = location_map.get(&entry.id) { if let Some(device) = device_map.get(&location.device_id) { // Build the full path - let path = crate::ops::indexing::path_resolver::PathResolver::get_full_path( - db, - entry.id, - ) - .await?; + let path = + crate::ops::indexing::path_resolver::PathResolver::get_full_path( + db, entry.id, + ) + .await?; let instance = ContentInstance { device_id: device.uuid, @@ -428,7 +428,7 @@ impl PathResolver { online_devices: &[Uuid], device_metrics: &HashMap, ) -> Option { - let current_device_id = crate::common::utils::get_current_device_id(); + let current_device_id = crate::device::get_current_device_id(); let mut candidates: Vec<(f64, &ContentInstance)> = instances .iter() diff --git a/core/src/ops/core/status/query.rs b/core/src/ops/core/status/query.rs index ad8b36b27..eddff90a8 100644 --- a/core/src/ops/core/status/query.rs +++ b/core/src/ops/core/status/query.rs @@ -12,7 +12,7 @@ impl Query for CoreStatusQuery { type Output = CoreStatus; async fn execute(self, context: Arc) -> Result { - let libs = context.library_manager.list().await; + let libs = context.libraries().await.list().await; Ok(CoreStatus { version: env!("CARGO_PKG_VERSION").to_string(), library_count: libs.len(), diff --git a/core/src/ops/jobs/info/query.rs b/core/src/ops/jobs/info/query.rs index 02783a1df..bd993040a 100644 --- a/core/src/ops/jobs/info/query.rs +++ b/core/src/ops/jobs/info/query.rs @@ -14,7 +14,8 @@ impl Query for JobInfoQuery { async fn execute(self, context: Arc) -> Result { let library = context - .library_manager + .libraries() + .await .get_library(self.library_id) .await .ok_or_else(|| anyhow::anyhow!("Library not found"))?; @@ -32,4 +33,3 @@ impl Query for JobInfoQuery { } crate::register_query!(JobInfoQuery, "jobs.info"); - diff --git a/core/src/ops/jobs/list/query.rs b/core/src/ops/jobs/list/query.rs index a3b4f25af..79b7671f3 100644 --- a/core/src/ops/jobs/list/query.rs +++ b/core/src/ops/jobs/list/query.rs @@ -14,18 +14,23 @@ impl Query for JobListQuery { async fn execute(self, context: Arc) -> Result { let library = context - .library_manager + .libraries() + .await .get_library(self.library_id) .await .ok_or_else(|| anyhow::anyhow!("Library not found"))?; let jobs = library.jobs().list_jobs(self.status).await?; let items = jobs .into_iter() - .map(|j| JobListItem { id: j.id, name: j.name, status: j.status, progress: j.progress }) + .map(|j| JobListItem { + id: j.id, + name: j.name, + status: j.status, + progress: j.progress, + }) .collect(); Ok(JobListOutput { jobs: items }) } } crate::register_query!(JobListQuery, "jobs.list"); - diff --git a/core/src/ops/libraries/create/action.rs b/core/src/ops/libraries/create/action.rs index 154ffe31c..dac0616e9 100644 --- a/core/src/ops/libraries/create/action.rs +++ b/core/src/ops/libraries/create/action.rs @@ -31,7 +31,7 @@ impl CoreAction for LibraryCreateAction { } async fn execute(self, context: Arc) -> Result { - let library_manager = &context.library_manager; + let library_manager = context.libraries().await; let library = library_manager .create_library( self.input.name.clone(), @@ -40,11 +40,11 @@ impl CoreAction for LibraryCreateAction { ) .await?; - Ok(LibraryCreateOutput::new( - library.id(), - library.name().await, - library.path().to_path_buf(), - )) + // Get the name and path + let name = library.name().await; + let path = library.path().to_path_buf(); + + Ok(LibraryCreateOutput::new(library.id(), name, path)) } fn action_kind(&self) -> &'static str { diff --git a/core/src/ops/libraries/delete/action.rs b/core/src/ops/libraries/delete/action.rs index b802c2751..216473958 100644 --- a/core/src/ops/libraries/delete/action.rs +++ b/core/src/ops/libraries/delete/action.rs @@ -46,7 +46,8 @@ impl CoreAction for LibraryDeleteAction { ) -> Result { // Get the library to get its name before deletion let library = context - .library_manager + .libraries() + .await .get_library(self.input.library_id) .await .ok_or_else(|| ActionError::LibraryNotFound(self.input.library_id))?; @@ -55,7 +56,8 @@ impl CoreAction for LibraryDeleteAction { // Delete the library through the library manager context - .library_manager + .libraries() + .await .delete_library(self.input.library_id, self.input.delete_data) .await?; diff --git a/core/src/ops/libraries/list/query.rs b/core/src/ops/libraries/list/query.rs index 2d2da4dd8..eafff6e2f 100644 --- a/core/src/ops/libraries/list/query.rs +++ b/core/src/ops/libraries/list/query.rs @@ -39,7 +39,7 @@ impl Query for ListLibrariesQuery { async fn execute(self, context: Arc) -> Result { // Get all open libraries from the library manager - let libraries = context.library_manager.list().await; + let libraries = context.libraries().await.list().await; let mut result = Vec::new(); for library in libraries { diff --git a/core/src/ops/locations/add/action.rs b/core/src/ops/locations/add/action.rs index 38ed6ae65..ddb57107a 100644 --- a/core/src/ops/locations/add/action.rs +++ b/core/src/ops/locations/add/action.rs @@ -19,7 +19,6 @@ use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LocationAddInput { - pub library_id: Uuid, pub path: PathBuf, pub name: Option, pub mode: IndexMode, diff --git a/core/src/ops/locations/list/query.rs b/core/src/ops/locations/list/query.rs index 8d41bd4c8..a1c049a37 100644 --- a/core/src/ops/locations/list/query.rs +++ b/core/src/ops/locations/list/query.rs @@ -14,18 +14,26 @@ impl Query for LocationsListQuery { async fn execute(self, context: Arc) -> Result { // Fetch library and query locations table - let library = context.library_manager.get_library(self.library_id).await.ok_or_else(|| anyhow::anyhow!("Library not found"))?; + let library = context + .libraries() + .await + .get_library(self.library_id) + .await + .ok_or_else(|| anyhow::anyhow!("Library not found"))?; let db = library.db().conn(); let rows = crate::infra::db::entities::location::Entity::find() .all(db) .await?; let mut out = Vec::new(); for r in rows { - out.push(LocationInfo { id: r.uuid, path: std::path::PathBuf::from(r.name.clone().unwrap_or_default()), name: r.name.clone() }); + out.push(LocationInfo { + id: r.uuid, + path: std::path::PathBuf::from(r.name.clone().unwrap_or_default()), + name: r.name.clone(), + }); } Ok(LocationsListOutput { locations: out }) } } crate::register_query!(LocationsListQuery, "locations.list"); - diff --git a/core/src/ops/mod.rs b/core/src/ops/mod.rs index c358e8c49..7fe2e7aac 100644 --- a/core/src/ops/mod.rs +++ b/core/src/ops/mod.rs @@ -19,45 +19,8 @@ pub mod libraries; pub mod locations; pub mod media; // pub mod metadata; +pub mod jobs; +pub mod network; pub mod registry; pub mod sidecar; pub mod volumes; -pub mod jobs; -pub mod network; - -/// Register all jobs with the job system -/// -/// This should be called during core initialization to register all available job types -pub fn register_all_jobs() { - // File operation jobs - register_job::(); - register_job::(); - register_job::(); - register_job::(); - register_job::(); - - // Indexing jobs - register_job::(); - - // Media processing jobs - register_job::(); -} - -/// Register a single job type with the job system -/// -/// This function would be called automatically by a derive macro in a real implementation, -/// but for now we call it manually for each job type. -fn register_job() -where - T: crate::infra::job::traits::Job + 'static, -{ - // In a real implementation with inventory, this would automatically register the job - // For now, this serves as documentation of which jobs should be registered - - // The actual registration would happen via: - // inventory::submit! { - // crate::infra::job::registration::JobRegistration::new::() - // } - - // For now we'll just log that the job type exists -} diff --git a/core/src/ops/registry.rs b/core/src/ops/registry.rs index e377fa008..7816d8411 100644 --- a/core/src/ops/registry.rs +++ b/core/src/ops/registry.rs @@ -21,7 +21,7 @@ pub type QueryHandlerFn = fn( /// Handler function signature for actions. pub type ActionHandlerFn = fn( Arc, - crate::infra::daemon::state::SessionState, + crate::service::session::SessionState, Vec, ) -> std::pin::Pin< Box, String>> + Send + 'static>, @@ -111,8 +111,8 @@ where /// Generic library action handler (decode A::Input -> A::from_input -> dispatch) pub fn handle_library_action( core: Arc, - // TODO: Move session state to core, shouldn't be in the daemon - session: crate::infra::daemon::state::SessionState, + // this isn't used, but is required by the interface, maybe fix? + session: crate::service::session::SessionState, payload: Vec, ) -> std::pin::Pin, String>> + Send + 'static>> where @@ -128,9 +128,10 @@ where .0; let action = A::from_input(input)?; let manager = crate::infra::action::manager::ActionManager::new(core.context.clone()); + let session = core.context.session_state.get().await; let library_id = session.current_library_id.ok_or("No library selected")?; let out = manager - .dispatch_library(library_id, action) + .dispatch_library(Some(library_id), action) .await .map_err(|e| e.to_string())?; encode_to_vec(&out, standard()).map_err(|e| e.to_string()) @@ -140,7 +141,7 @@ where /// Generic core action handler (decode A::Input -> A::from_input -> dispatch) pub fn handle_core_action( core: Arc, - session: crate::infra::daemon::state::SessionState, + session: crate::service::session::SessionState, payload: Vec, ) -> std::pin::Pin, String>> + Send + 'static>> where diff --git a/core/src/service/file_sharing.rs b/core/src/service/file_sharing.rs index 2ef9855f3..1acc3bd83 100644 --- a/core/src/service/file_sharing.rs +++ b/core/src/service/file_sharing.rs @@ -158,7 +158,6 @@ impl FileSharingService { // Get the current library to access its job manager let library = self .context - .library_manager .get_primary_library() .await .ok_or(SharingError::JobError( @@ -193,7 +192,6 @@ impl FileSharingService { ) -> Result, SharingError> { let library = self .context - .library_manager .get_primary_library() .await .ok_or(SharingError::JobError( @@ -317,15 +315,14 @@ impl FileSharingService { ) -> Result { match transfer_id { TransferId::JobId { job_id, library_id } => { - let library = self - .context - .library_manager - .get_library(*library_id) - .await - .ok_or(SharingError::JobError(format!( - "Library {} not found", - library_id - )))?; + let library = + self.context + .get_library(*library_id) + .await + .ok_or(SharingError::JobError(format!( + "Library {} not found", + library_id + )))?; let job_manager = library.jobs(); @@ -383,15 +380,14 @@ impl FileSharingService { pub async fn cancel_transfer(&self, transfer_id: &TransferId) -> Result<(), SharingError> { match transfer_id { TransferId::JobId { job_id, library_id } => { - let library = self - .context - .library_manager - .get_library(*library_id) - .await - .ok_or(SharingError::JobError(format!( - "Library {} not found", - library_id - )))?; + let library = + self.context + .get_library(*library_id) + .await + .ok_or(SharingError::JobError(format!( + "Library {} not found", + library_id + )))?; let job_manager = library.jobs(); @@ -414,7 +410,6 @@ impl FileSharingService { pub async fn get_active_transfers(&self) -> Result, SharingError> { let library = self .context - .library_manager .get_primary_library() .await .ok_or(SharingError::JobError( @@ -495,7 +490,7 @@ mod tests { use super::*; use crate::{ crypto::library_key_manager::LibraryKeyManager, device::DeviceManager, - infra::event::EventBus, library::LibraryManager, + infra::event::EventBus, library::LibraryManager, service::session::SessionStateService, }; use tempfile::tempdir; @@ -503,22 +498,27 @@ mod tests { async fn test_file_sharing_service_creation() { let events = Arc::new(EventBus::default()); let device_manager = Arc::new(DeviceManager::init().unwrap()); - let library_manager = Arc::new(LibraryManager::new_with_dir( - std::env::temp_dir().join("test_libraries"), - events.clone(), - )); let volume_manager = Arc::new(crate::volume::VolumeManager::new( uuid::Uuid::new_v4(), // Test device ID crate::volume::VolumeDetectionConfig::default(), events.clone(), )); let library_key_manager = Arc::new(LibraryKeyManager::new().unwrap()); + let session_state = Arc::new(SessionStateService::new(std::env::temp_dir())); + let library_manager = Arc::new(LibraryManager::new_with_dir( + std::env::temp_dir().join("test_libraries"), + events.clone(), + session_state.clone(), + volume_manager.clone(), + device_manager.clone(), + )); let context = Arc::new(CoreContext::new( events, device_manager, library_manager, volume_manager, library_key_manager, + session_state, )); let _file_sharing = FileSharingService::new(context); @@ -537,22 +537,27 @@ mod tests { async fn test_create_file_metadata() { let events = Arc::new(EventBus::default()); let device_manager = Arc::new(DeviceManager::init().unwrap()); - let library_manager = Arc::new(LibraryManager::new_with_dir( - std::env::temp_dir().join("test_libraries"), - events.clone(), - )); let volume_manager = Arc::new(crate::volume::VolumeManager::new( uuid::Uuid::new_v4(), // Test device ID crate::volume::VolumeDetectionConfig::default(), events.clone(), )); let library_key_manager = Arc::new(LibraryKeyManager::new().unwrap()); + let session_state = Arc::new(SessionStateService::new(std::env::temp_dir())); + let library_manager = Arc::new(LibraryManager::new_with_dir( + std::env::temp_dir().join("test_libraries"), + events.clone(), + session_state.clone(), + volume_manager.clone(), + device_manager.clone(), + )); let context = Arc::new(CoreContext::new( events, device_manager, library_manager, volume_manager, library_key_manager, + session_state, )); let file_sharing = FileSharingService::new(context); diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index 59bce8238..107aa4c2f 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -1,8 +1,8 @@ //! Background services management use crate::{ - context::CoreContext, infra::event::EventBus, - crypto::library_key_manager::LibraryKeyManager, + context::CoreContext, crypto::library_key_manager::LibraryKeyManager, infra::event::EventBus, + service::session::SessionStateService, }; use anyhow::Result; use std::sync::Arc; @@ -12,17 +12,18 @@ use tracing::info; pub mod device; pub mod entry_state_service; pub mod file_sharing; -pub mod watcher; pub mod network; +pub mod session; pub mod sidecar_manager; pub mod volume_monitor; +pub mod watcher; use device::DeviceService; use file_sharing::FileSharingService; -use watcher::{LocationWatcher, LocationWatcherConfig}; use network::NetworkingService; use sidecar_manager::SidecarManager; -use volume_monitor::{VolumeMonitorService, VolumeMonitorConfig}; +use volume_monitor::{VolumeMonitorConfig, VolumeMonitorService}; +use watcher::{LocationWatcher, LocationWatcherConfig}; /// Container for all background services #[derive(Clone)] @@ -41,6 +42,8 @@ pub struct Services { pub sidecar_manager: Arc, /// Library key manager pub library_key_manager: Arc, + /// Session state service + pub session_state: Arc, /// Shared context for all services context: Arc, } @@ -59,15 +62,16 @@ impl Services { let device = Arc::new(DeviceService::new(context.clone())); let sidecar_manager = Arc::new(SidecarManager::new(context.clone())); let library_key_manager = context.library_key_manager.clone(); - + let session_state = context.session_state.clone(); Self { location_watcher, file_sharing, device, - networking: None, // Initialized separately when needed + networking: None, // Initialized separately when needed volume_monitor: None, // Initialized after library manager is available sidecar_manager, library_key_manager, + session_state, context, } } @@ -126,7 +130,7 @@ impl Services { library_key_manager: std::sync::Arc, data_dir: impl AsRef, ) -> Result<()> { - use crate::service::network::{NetworkingService, utils::logging::ConsoleLogger}; + use crate::service::network::{utils::logging::ConsoleLogger, NetworkingService}; info!("Initializing networking service"); let logger = std::sync::Arc::new(ConsoleLogger); diff --git a/core/src/infra/daemon/state.rs b/core/src/service/session.rs similarity index 100% rename from core/src/infra/daemon/state.rs rename to core/src/service/session.rs diff --git a/core/tests/library_test.rs b/core/tests/library_test.rs index 770c79a65..d31ba1f38 100644 --- a/core/tests/library_test.rs +++ b/core/tests/library_test.rs @@ -65,7 +65,8 @@ async fn test_library_lifecycle() { assert!(core.libraries.close_library(lib_id).await.is_err()); // Re-open library - let reopened = core.libraries.open_library(&lib_path).await.unwrap(); + let device_id = core.context.device_manager.device_id().unwrap(); + let reopened = core.libraries.open_library(&lib_path, device_id).await.unwrap(); assert_eq!(reopened.id(), lib_id); assert_eq!(reopened.name().await, "Test Library"); @@ -92,7 +93,8 @@ async fn test_library_locking() { let lib_path = library.path().to_path_buf(); // Try to open same library again - should fail - let result = core.libraries.open_library(&lib_path).await; + let device_id = core.context.device_manager.device_id().unwrap(); + let result = core.libraries.open_library(&lib_path, device_id).await; assert!(result.is_err()); // Close library @@ -103,7 +105,8 @@ async fn test_library_locking() { drop(library); // Now should be able to open - let reopened = core.libraries.open_library(&lib_path).await.unwrap(); + let device_id = core.context.device_manager.device_id().unwrap(); + let reopened = core.libraries.open_library(&lib_path, device_id).await.unwrap(); assert_eq!(reopened.name().await, "Lock Test"); } diff --git a/docs/core/design/SDPATH_REFACTOR.md b/docs/core/design/SDPATH_REFACTOR.md index 7e16bf3e5..4edf0d39a 100644 --- a/docs/core/design/SDPATH_REFACTOR.md +++ b/docs/core/design/SDPATH_REFACTOR.md @@ -128,7 +128,7 @@ async fn resolve_optimal_path( content_id: Uuid ) -> Result { // 1. Get the current library's DB connection from context - let library = context.library_manager.get_primary_library().await + let library = context.library_manager.get_active_library().await .ok_or(PathResolutionError::NoActiveLibrary)?; let db = library.db().conn(); @@ -348,4 +348,4 @@ async fn copy_files( ### 7. Conclusion -Refactoring `SdPath` from a simple `struct` to the dual-mode `enum` is a critical step in realizing the full architectural vision of Spacedrive. It replaces a fragile pointer system with a resilient, content-aware abstraction. This change directly enables the promised features of transparent failover and performance optimization, and it provides the necessary foundation for the **Simulation Engine** and other advanced, AI-native capabilities. \ No newline at end of file +Refactoring `SdPath` from a simple `struct` to the dual-mode `enum` is a critical step in realizing the full architectural vision of Spacedrive. It replaces a fragile pointer system with a resilient, content-aware abstraction. This change directly enables the promised features of transparent failover and performance optimization, and it provides the necessary foundation for the **Simulation Engine** and other advanced, AI-native capabilities.