mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-21 15:07:54 -04:00
feat: Finalize Spacedrive refactor and enhance session management
- Added a new documentation file outlining the finalization plan for the Spacedrive refactor, detailing remaining tasks and objectives for migrating to a CQRS architecture.
- Implemented session state management to track the current library context across operations, improving the architecture's modularity and maintainability.
- Refactored CLI commands and core components to use the new session state service, ensuring a more consistent and error-free user experience.
- Enhanced error handling and logging in various modules to facilitate debugging and improve reliability.
This commit is contained in:
@@ -52,6 +52,7 @@ poonen
rauch
ravikant
Recents
reimplementation
Renamable
richelsen
rspc
95
Refactor Progress.md
Normal file
@@ -0,0 +1,95 @@

## Spacedrive Refactor: Finalization Plan

**Document Version:** 1.0
**Date:** September 13, 2025
**Status:** In Progress

### 1. Introduction

The foundational refactor of the Spacedrive `Core` engine is a success. We have established a robust, CQRS-based architecture with separate, modular `Action` and `Query` pathways, managed by the `ActionManager` and `QueryManager`. The `Core` now exposes a clean, type-safe API, and the session state has been correctly centralized within its services.

This document outlines the remaining tasks required to fully migrate the codebase to this new architecture, clean up legacy code, and realize the full benefits of the new design.

-----

### 2. Phase 1: Complete the "Active Library" Context Migration

The primary goal of this phase is to make all operations fully session-aware, removing the need for them to manually handle a `library_id`.

**Objective:** Eliminate the `library_id` field from all `Action` and `Query` structs, and have their handlers source this context directly from the `Core`'s session service.

**Actionable Tasks:**

1. **Systematically Refactor Remaining `ops` Modules:**
    * Iterate through the following modules and remove the `library_id` field from every `Action` and `Query` struct within them:
        * `ops::location::*` (all location operations)
        * `ops::object::copy`, `ops::object::move`
        * `ops::tag::*` (all tagging operations)
        * `ops::job::*` (all job-related operations)

2. **Update Handlers to be Session-Aware:**
    * Modify the corresponding `ActionHandler` and `QueryHandler` implementations for the structs above.
    * Inside the `validate` or `execute` methods, retrieve the active library ID by calling a method on the core session service, for example: `core.session().get_active_library_id()?`.
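The handler change in task 2 can be sketched with stand-in types. All names here (`SessionState`, `RenameTagAction`, the plain `u64` library ID) are illustrative stand-ins, not the real Spacedrive API; the point is only that the action struct carries no `library_id` and the handler sources it from the session:

```rust
use std::cell::Cell;

// Hypothetical stand-in for the core session service.
struct SessionState {
    active: Cell<Option<u64>>,
}

impl SessionState {
    // Mirrors the planned `core.session().get_active_library_id()?` call.
    fn get_active_library_id(&self) -> Result<u64, String> {
        self.active.get().ok_or_else(|| "no active library".to_string())
    }
}

// Note: no `library_id` field -- the handler sources it from the session.
struct RenameTagAction {
    tag: String,
    new_name: String,
}

// Stand-in for a handler's `execute` method.
fn execute(session: &SessionState, action: &RenameTagAction) -> Result<String, String> {
    let library_id = session.get_active_library_id()?;
    Ok(format!(
        "library {}: rename tag {} -> {}",
        library_id, action.tag, action.new_name
    ))
}

fn main() {
    let session = SessionState { active: Cell::new(Some(42)) };
    let action = RenameTagAction { tag: "old".into(), new_name: "new".into() };
    println!("{}", execute(&session, &action).unwrap());
}
```

If no library is active, the handler fails early with a clear error instead of each action struct carrying (and validating) its own `library_id`.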
-----

### 3. Phase 2: Enhance and Finalize Client Applications

With a fully context-aware `Core` API, the clients can be simplified and made more powerful.

**Objective:** Implement the "active library" override mechanism, and reduce boilerplate code in the CLI and GraphQL layers.

**Actionable Tasks:**

1. **Implement CLI `--library` Flag:**
    * Add a global `--library <LIBRARY_ID>` flag to the CLI using `clap`.
    * In the CLI's main execution logic, check for this flag. If present, call a new method on the `Core`'s session service (e.g., `core.session().set_override_library_id(id)`) before executing the command. This will set the context for the duration of that single operation.
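The override resolution described above can be sketched without the real CLI. In the actual implementation the flag would be a global `clap` argument (e.g. `#[arg(long, global = true)] library: Option<Uuid>`); here `Session`, string IDs, and `run_command` are dependency-free stand-ins that show only the precedence rule:

```rust
// Hypothetical session state: a persistent "active" library plus a
// per-invocation override set from the `--library` flag.
struct Session {
    active: Option<String>,
    override_id: Option<String>,
}

impl Session {
    // Mirrors the planned `core.session().set_override_library_id(id)`.
    fn set_override_library_id(&mut self, id: String) {
        self.override_id = Some(id);
    }

    // The override, if set, wins; otherwise fall back to the active library.
    fn effective_library_id(&self) -> Option<&String> {
        self.override_id.as_ref().or(self.active.as_ref())
    }
}

// Stand-in for the CLI's main execution logic.
fn run_command(session: &mut Session, library_flag: Option<String>) -> Option<String> {
    if let Some(id) = library_flag {
        session.set_override_library_id(id);
    }
    session.effective_library_id().cloned()
}

fn main() {
    let mut s = Session { active: Some("lib-a".into()), override_id: None };
    // Without the flag, the session's active library is used.
    assert_eq!(run_command(&mut s, None).as_deref(), Some("lib-a"));
    // With `--library lib-b`, the override takes precedence for this operation.
    assert_eq!(run_command(&mut s, Some("lib-b".into())).as_deref(), Some("lib-b"));
    println!("override resolution ok");
}
```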
2. **Reduce Client Boilerplate with `From` Trait:**
    * For each `Action` and `Query` struct, implement the `From<T>` trait, where `T` is the corresponding CLI or GraphQL argument struct.
    * **Example (`core/src/ops/location/add.rs`):**

    ```rust
    // Teach the Action how to be created from CLI args
    impl From<cli::AddLocationArgs> for AddLocationAction {
        fn from(args: cli::AddLocationArgs) -> Self {
            Self { path: args.path, name: args.name }
        }
    }
    ```

    * Refactor the client code to use `.into()` for clean, one-line conversions, eliminating manual field mapping.

    ```rust
    // In the CLI command runner
    core.execute_action(args.into()).await?;
    ```

3. **Standardize Client Logic with Macros (Optional):**
    * For highly repetitive client patterns (like a simple CLI command that just creates an action, executes it, and prints the result), consider creating `macro_rules!` to generate the boilerplate code automatically.
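The "create action, execute it, print the result" pattern from task 3 can be sketched as a `macro_rules!` macro. The `execute` function and the command names below are illustrative stand-ins for `core.execute_action(...)`, not the real API:

```rust
// Stand-in for `core.execute_action(...)`: takes an action, returns a result.
fn execute(action_name: &str) -> Result<String, String> {
    Ok(format!("{} done", action_name))
}

// Hypothetical macro expanding the repetitive command pattern:
// build the action, execute it, print the outcome.
macro_rules! simple_command {
    ($fn_name:ident, $action:expr) => {
        fn $fn_name() -> Result<(), String> {
            let result = execute($action)?;
            println!("{}", result);
            Ok(())
        }
    };
}

// One line per command instead of a hand-written function each time.
simple_command!(run_add_location, "AddLocation");
simple_command!(run_remove_location, "RemoveLocation");

fn main() {
    run_add_location().unwrap();
    run_remove_location().unwrap();
}
```

The trade-off is the usual one for declarative macros: less boilerplate per command, at the cost of slightly harder-to-read expansion sites, so it is reasonable that the plan marks this step optional.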
-----

### 4. Phase 3: Finalize and Deprecate Legacy Code

The final step is to remove all traces of the old, tightly coupled architecture.

**Objective:** Ensure the old `DaemonCommand` enum and its related infrastructure are completely removed from the codebase.

**Actionable Tasks:**

1. **Delete `DaemonCommand` Enum:**
    * Once all CLI commands have been migrated to the CQRS pattern, the old `DaemonCommand` enum can be safely deleted.

2. **Clean Up the Daemon:**
    * Remove all logic from the daemon that was responsible for matching on `DaemonCommand` variants. Its role should now be purely to deserialize and route the new `Action`/`Query` structs.

3. **Code-wide Audit:**
    * Perform a full-text search for `DaemonCommand` and any related types to ensure no remnants are left in the CLI, daemon, or `Core` tests.

By completing these three phases, the refactor will be complete. The result will be a clean, scalable, and highly maintainable architecture that will serve as a robust foundation for the future of Spacedrive.
@@ -21,8 +21,43 @@ pub async fn run(ctx: &Context, cmd: LibraryCmd) -> Result<()> {
        LibraryCmd::Create { name } => {
            let input = create::LibraryCreateInput::new(name);
            let bytes = ctx.core.action(&input).await?;
            let out: create::LibraryCreateOutput = bincode::deserialize(&bytes)?;
            println!("Created library {} with ID {} at {}", out.name, out.library_id, out.path.display());
            eprintln!("DEBUG: Received {} bytes from daemon", bytes.len());
            // Try to deserialize and provide detailed error information
            match bincode::deserialize::<create::LibraryCreateOutput>(&bytes) {
                Ok(out) => {
                    println!(
                        "Created library {} with ID {} at {}",
                        out.name, out.library_id, out.path
                    );
                }
                Err(e) => {
                    eprintln!("DEBUG: Bincode deserialization failed: {}", e);
                    eprintln!("DEBUG: Raw bytes length: {}", bytes.len());

                    // Try to decode as much as possible manually to debug
                    if bytes.len() >= 16 {
                        let uuid_bytes = &bytes[0..16];
                        eprintln!("DEBUG: UUID bytes: {:?}", uuid_bytes);

                        if bytes.len() > 16 {
                            let name_len = bytes[16] as usize;
                            eprintln!("DEBUG: Name length: {}", name_len);

                            if bytes.len() > 17 + name_len {
                                let name_bytes = &bytes[17..17 + name_len];
                                if let Ok(name) = std::str::from_utf8(name_bytes) {
                                    eprintln!("DEBUG: Library name: {}", name);
                                }
                            }
                        }
                    }

                    return Err(anyhow::anyhow!(
                        "Failed to deserialize library creation response: {}",
                        e
                    ));
                }
            }
        }
        LibraryCmd::Switch { id } => {
            let input = set_current::SetCurrentLibraryInput { library_id: id };
@@ -4,54 +4,94 @@ use uuid::Uuid;

use crate::context::Context;
use crate::domains::index::IndexModeArg;
use sd_core::ops::{
    libraries::list::{output::LibraryInfo, query::ListLibrariesQuery},
    locations::add::action::LocationAddInput,
};

#[derive(Subcommand, Debug)]
pub enum LocationCmd {
    Add { path: std::path::PathBuf, #[arg(long)] name: Option<String>, #[arg(long, value_enum, default_value = "content")] mode: IndexModeArg },
    Add {
        path: std::path::PathBuf,
        #[arg(long)]
        name: Option<String>,
        #[arg(long, value_enum, default_value = "content")]
        mode: IndexModeArg,
    },
    List,
    Remove { location_id: Uuid },
    Rescan { location_id: Uuid, #[arg(long, default_value_t = false)] force: bool },
    Remove {
        location_id: Uuid,
    },
    Rescan {
        location_id: Uuid,
        #[arg(long, default_value_t = false)]
        force: bool,
    },
}

pub async fn run(ctx: &Context, cmd: LocationCmd) -> Result<()> {
    match cmd {
        LocationCmd::Add { path, name, mode } => {
            let libs: Vec<sd_core::ops::libraries::list::output::LibraryInfo> = ctx
            let result_bytes = ctx
                .core
                .query(&sd_core::ops::libraries::list::query::ListLibrariesQuery::basic())
                .action(&LocationAddInput {
                    path,
                    name,
                    mode: sd_core::ops::indexing::job::IndexMode::from(mode),
                })
                .await?;
            let library_id = if libs.len() == 1 { libs[0].id } else { anyhow::bail!("Specify --library to add locations when multiple libraries exist") };

            let result_bytes = ctx.core.action(&sd_core::ops::locations::add::action::LocationAddInput { library_id, path, name, mode: sd_core::ops::indexing::job::IndexMode::from(mode) }).await?;
            let out: sd_core::ops::locations::add::output::LocationAddOutput = bincode::deserialize(&result_bytes)?;
            println!("Added location {} -> {}", out.location_id, out.path.display());
            let out: sd_core::ops::locations::add::output::LocationAddOutput =
                bincode::deserialize(&result_bytes)?;
            println!(
                "Added location {} -> {}",
                out.location_id,
                out.path.display()
            );
        }
        LocationCmd::List => {
            let libs: Vec<sd_core::ops::libraries::list::output::LibraryInfo> = ctx
            let out: sd_core::ops::locations::list::output::LocationsListOutput = ctx
                .core
                .query(&sd_core::ops::libraries::list::query::ListLibrariesQuery::basic())
                .query(&sd_core::ops::locations::list::query::LocationsListQuery { library_id })
                .await?;
            let library_id = if libs.len() == 1 { libs[0].id } else { anyhow::bail!("Specify --library to list locations when multiple libraries exist") };
            let out: sd_core::ops::locations::list::output::LocationsListOutput = ctx.core.query(&sd_core::ops::locations::list::query::LocationsListQuery { library_id }).await?;
            for loc in out.locations { println!("- {} {}", loc.id, loc.path.display()); }
            for loc in out.locations {
                println!("- {} {}", loc.id, loc.path.display());
            }
        }
        LocationCmd::Remove { location_id } => {
            let libs: Vec<sd_core::ops::libraries::list::output::LibraryInfo> = ctx
                .core
                .query(&sd_core::ops::libraries::list::query::ListLibrariesQuery::basic())
                .await?;
            let library_id = if libs.len() == 1 { libs[0].id } else { anyhow::bail!("Specify --library to remove locations when multiple libraries exist") };
            let libs: Vec<LibraryInfo> = ctx.core.query(&ListLibrariesQuery::basic()).await?;
            let library_id = if libs.len() == 1 {
                libs[0].id
            } else {
                anyhow::bail!("Specify --library to remove locations when multiple libraries exist")
            };

            let result_bytes = ctx.core.action(&sd_core::ops::locations::remove::action::LocationRemoveInput { library_id, location_id }).await?;
            let _out: sd_core::ops::locations::remove::output::LocationRemoveOutput = bincode::deserialize(&result_bytes)?;
            let result_bytes = ctx
                .core
                .action(
                    &sd_core::ops::locations::remove::action::LocationRemoveInput {
                        library_id,
                        location_id,
                    },
                )
                .await?;
            let _out: sd_core::ops::locations::remove::output::LocationRemoveOutput =
                bincode::deserialize(&result_bytes)?;
            println!("Removed location {}", location_id);
        }
        LocationCmd::Rescan { location_id, force } => {
            let result_bytes = ctx.core.action(&sd_core::ops::locations::rescan::action::LocationRescanInput { location_id, full_rescan: force }).await?;
            let _out: sd_core::ops::locations::rescan::output::LocationRescanOutput = bincode::deserialize(&result_bytes)?;
            let result_bytes = ctx
                .core
                .action(
                    &sd_core::ops::locations::rescan::action::LocationRescanInput {
                        location_id,
                        full_rescan: force,
                    },
                )
                .await?;
            let _out: sd_core::ops::locations::rescan::output::LocationRescanOutput =
                bincode::deserialize(&result_bytes)?;
            println!("Rescan requested for {}", location_id);
        }
    }
    Ok(())
}
@@ -69,7 +69,6 @@ enum Commands {
    Job(JobCmd),
}

#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();
@@ -83,7 +82,9 @@ async fn main() -> Result<()> {
    }

    let socket_path = if let Some(inst) = &instance {
        data_dir.join("daemon").join(format!("daemon-{}.sock", inst))
        data_dir
            .join("daemon")
            .join(format!("daemon-{}.sock", inst))
    } else {
        data_dir.join("daemon/daemon.sock")
    };
@@ -91,12 +92,65 @@ async fn main() -> Result<()> {
    match cli.command {
        Commands::Start { enable_networking } => {
            println!("Starting daemon...");
            sd_core::infra::daemon::bootstrap::start_default_server(socket_path, data_dir, enable_networking).await.map_err(|e| anyhow::anyhow!(e.to_string()))?;

            // Check if daemon is already running
            let client = CoreClient::new(socket_path.clone());
            match client
                .send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Ping)
                .await
            {
                Ok(sd_core::infra::daemon::types::DaemonResponse::Pong) => {
                    println!("Daemon is already running");
                    return Ok(());
                }
                _ => {} // Daemon not running, continue
            }

            // Start daemon in background using std::process::Command
            let current_exe = std::env::current_exe()?;
            let daemon_path = current_exe.parent().unwrap().join("daemon");
            let mut command = std::process::Command::new(daemon_path);

            // Pass networking flag if enabled (if daemon supports it)
            if enable_networking {
                println!("Note: Networking flag passed but daemon may not support it yet");
            }

            // Set working directory to current directory
            command.current_dir(std::env::current_dir()?);

            match command.spawn() {
                Ok(child) => {
                    println!("Daemon started (PID: {})", child.id());

                    // Wait a moment for daemon to start up
                    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

                    // Verify daemon is responding
                    match client
                        .send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Ping)
                        .await
                    {
                        Ok(sd_core::infra::daemon::types::DaemonResponse::Pong) => {
                            println!("Daemon is ready and responding");
                        }
                        _ => {
                            println!("Warning: Daemon may not be fully initialized yet");
                        }
                    }
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("Failed to start daemon: {}", e));
                }
            }
        }
        Commands::Stop => {
            println!("Stopping daemon...");
            let core = CoreClient::new(socket_path.clone());
            let _ = core.send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Shutdown).await.map_err(|e| anyhow::anyhow!(e.to_string()))?;
            let core = CoreClient::new(socket_path.clone());
            let _ = core
                .send_raw_request(&sd_core::infra::daemon::types::DaemonRequest::Shutdown)
                .await
                .map_err(|e| anyhow::anyhow!(e.to_string()))?;
            println!("Daemon stopped.");
        }
        _ => {
@@ -107,7 +161,12 @@ async fn main() -> Result<()> {
    Ok(())
}

async fn run_client_command(command: Commands, format: OutputFormat, data_dir: std::path::PathBuf, socket_path: std::path::PathBuf) -> Result<()> {
async fn run_client_command(
    command: Commands,
    format: OutputFormat,
    data_dir: std::path::PathBuf,
    socket_path: std::path::PathBuf,
) -> Result<()> {
    let core = CoreClient::new(socket_path.clone());
    let ctx = Context::new(core, format, data_dir, socket_path);
    match command {
@@ -27,7 +27,7 @@ impl Scenario for ContentIdentificationScenario {
    use sd_core_new::infrastructure::actions::handler::ActionHandler;
    let core = &boot.core;
    let context = core.context.clone();
    let library = match core.libraries.get_primary_library().await {
    let library = match core.libraries.get_active_library().await {
        Some(lib) => lib,
        None => core.libraries.create_library("Benchmarks", None, context.clone()).await?,
    };

@@ -27,7 +27,7 @@ impl Scenario for CoreIndexingScenario {
    use sd_core_new::infrastructure::actions::handler::ActionHandler;
    let core = &boot.core;
    let context = core.context.clone();
    let library = match core.libraries.get_primary_library().await {
    let library = match core.libraries.get_active_library().await {
        Some(lib) => lib,
        None => core.libraries.create_library("Benchmarks", None, context.clone()).await?,
    };
@@ -6,7 +6,7 @@ use std::marker::PhantomData;
use std::path::PathBuf;

use crate::infra::daemon::client::DaemonClient;
use crate::infra::daemon::types::{DaemonRequest, DaemonResponse};
use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse};

pub trait Wire {
    const METHOD: &'static str;
@@ -38,7 +38,7 @@ impl CoreClient {
        match resp {
            Ok(r) => match r {
                DaemonResponse::Ok(bytes) => Ok(bytes),
                DaemonResponse::Error(e) => Err(anyhow::anyhow!(e)),
                DaemonResponse::Error(e) => Err(anyhow::anyhow!(e.to_string())),
                other => Err(anyhow::anyhow!(format!("unexpected response: {:?}", other))),
            },
            Err(e) => Err(anyhow::anyhow!(e.to_string())),
@@ -61,7 +61,7 @@ impl CoreClient {
        match resp {
            Ok(r) => match r {
                DaemonResponse::Ok(bytes) => Ok(decode_from_slice(&bytes, standard())?.0),
                DaemonResponse::Error(e) => Err(anyhow::anyhow!(e)),
                DaemonResponse::Error(e) => Err(anyhow::anyhow!(e.to_string())),
                other => Err(anyhow::anyhow!(format!("unexpected response: {:?}", other))),
            },
            Err(e) => Err(anyhow::anyhow!(e.to_string())),
@@ -69,6 +69,9 @@ impl CoreClient {
    }

    pub async fn send_raw_request(&self, req: &DaemonRequest) -> Result<DaemonResponse> {
        self.daemon.send(req).await.map_err(|e| anyhow::anyhow!(e.to_string()))
        self.daemon
            .send(req)
            .await
            .map_err(|e| anyhow::anyhow!(e.to_string()))
    }
}
@@ -1,24 +1,5 @@
//! Shared utility functions

use std::sync::RwLock;
use uuid::Uuid;

/// Global reference to current device ID
/// This is set during Core initialization
pub static CURRENT_DEVICE_ID: once_cell::sync::Lazy<RwLock<Uuid>> =
    once_cell::sync::Lazy::new(|| RwLock::new(Uuid::nil()));

/// Initialize the current device ID
pub fn set_current_device_id(id: Uuid) {
    if let Ok(mut device_id) = CURRENT_DEVICE_ID.write() {
        *device_id = id;
    }
}

/// Get the current device ID
pub fn get_current_device_id() -> Uuid {
    match CURRENT_DEVICE_ID.read() {
        Ok(guard) => *guard,
        Err(_) => Uuid::nil(),
    }
}
//
// Note: Device ID management has been moved to device::manager for better
// module organization. Import from there instead:
// use crate::device::manager::{get_current_device_id, set_current_device_id};
@@ -1,20 +1,19 @@
//! Shared context providing access to core application components.

//! Shared context providing access to core application components.

use crate::{config::JobLoggingConfig, device::DeviceManager, infra::action::manager::ActionManager,
    infra::event::EventBus, crypto::library_key_manager::LibraryKeyManager, library::LibraryManager,
    service::network::NetworkingService, volume::VolumeManager, infra::daemon::state::SessionStateService,
use crate::{
    config::JobLoggingConfig, crypto::library_key_manager::LibraryKeyManager,
    device::DeviceManager, infra::action::manager::ActionManager, infra::event::EventBus,
    library::LibraryManager, service::network::NetworkingService,
    service::session::SessionStateService, volume::VolumeManager,
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;

/// Shared context providing access to core application components.
#[derive(Clone)]
pub struct CoreContext {
    pub events: Arc<EventBus>,
    pub device_manager: Arc<DeviceManager>,
    pub library_manager: Arc<LibraryManager>,
    pub library_manager: Arc<RwLock<Option<Arc<LibraryManager>>>>,
    pub volume_manager: Arc<VolumeManager>,
    pub library_key_manager: Arc<LibraryKeyManager>,
    // This is wrapped in an RwLock to allow it to be set after initialization
@@ -23,7 +22,7 @@ pub struct CoreContext {
    // Job logging configuration
    pub job_logging_config: Option<JobLoggingConfig>,
    pub job_logs_dir: Option<PathBuf>,
    pub session_state: Arc<SessionStateService>,
    pub session_state: Arc<SessionStateService>,
}

impl CoreContext {
@@ -31,25 +30,45 @@ impl CoreContext {
    pub fn new(
        events: Arc<EventBus>,
        device_manager: Arc<DeviceManager>,
        library_manager: Arc<LibraryManager>,
        library_manager: Option<Arc<LibraryManager>>,
        volume_manager: Arc<VolumeManager>,
        library_key_manager: Arc<LibraryKeyManager>,
        session_state: Arc<SessionStateService>,
        session_state: Arc<SessionStateService>,
    ) -> Self {
        Self {
            events,
            device_manager,
            library_manager,
            library_manager: Arc::new(RwLock::new(library_manager)),
            volume_manager,
            library_key_manager,
            action_manager: Arc::new(RwLock::new(None)),
            networking: Arc::new(RwLock::new(None)),
            job_logging_config: None,
            job_logs_dir: None,
            session_state,
            session_state,
        }
    }

    /// Get the library manager
    pub async fn libraries(&self) -> Arc<LibraryManager> {
        self.library_manager.read().await.clone().unwrap()
    }

    /// Get a library by ID
    pub async fn get_library(&self, id: uuid::Uuid) -> Option<Arc<crate::library::Library>> {
        self.libraries().await.get_library(id).await
    }

    /// Get the primary library
    pub async fn get_primary_library(&self) -> Option<Arc<crate::library::Library>> {
        self.libraries().await.get_active_library().await
    }

    /// Method for Core to set library manager after it's initialized
    pub async fn set_libraries(&self, library_manager: Arc<LibraryManager>) {
        *self.library_manager.write().await = Some(library_manager);
    }

    /// Set job logging configuration
    pub fn set_job_logging(&mut self, config: JobLoggingConfig, logs_dir: PathBuf) {
        self.job_logging_config = Some(config);
88
core/src/device/id.rs
Normal file
@@ -0,0 +1,88 @@
//! Global Device ID Management
//!
//! This module manages the global device ID that is accessible throughout the application.
//! The device ID is stored globally for performance and convenience reasons.
//!
//! ## Why Global?
//! - **Performance**: Device ID is accessed frequently across the codebase. Global access
//!   avoids the overhead of Arc/RwLock on every access.
//! - **Convenience**: No need to pass CoreContext everywhere just to get the device ID.
//! - **Simplicity**: Device ID is immutable once set and doesn't need complex lifecycle management.
//! - **Thread Safety**: Works seamlessly in both sync and async contexts.
//!
//! ## Why Not Part of DeviceManager?
//! While the DeviceManager handles device configuration and lifecycle, the global device ID
//! serves a different purpose - it's a runtime cache for quick access. The DeviceManager
//! deals with device state (keys, config, etc.) while this is just the device identifier.
//!
//! ## Architectural Trade-offs
//! **Pros:**
//! - Fast access without context passing
//! - Simple API (`get_current_device_id()`)
//! - No error handling needed
//! - Works anywhere in the codebase
//!
//! **Cons:**
//! - Not "pure" dependency injection
//! - Global mutable state (though read-only after initialization)
//! - Harder to test in isolation
//!
//! ## Module Organization
//! Originally in `common/utils.rs`, moved to `device/id.rs` for better module organization.
//! Device ID management belongs with other device-related code, even though it's
//! implemented as global functions rather than DeviceManager methods.
//!
//! ## Usage Pattern
//! ```rust
//! // Set once during initialization
//! set_current_device_id(device_manager.device_id()?);
//!
//! // Use anywhere in the codebase
//! let device_id = get_current_device_id();
//! ```

use once_cell::sync::Lazy;
use std::sync::RwLock;
use uuid::Uuid;

/// Global reference to current device ID
/// This is set during Core initialization
pub static CURRENT_DEVICE_ID: Lazy<RwLock<Uuid>> = Lazy::new(|| RwLock::new(Uuid::nil()));

/// Initialize the current device ID
pub fn set_current_device_id(id: Uuid) {
    if let Ok(mut device_id) = CURRENT_DEVICE_ID.write() {
        *device_id = id;
    }
}

/// Get the current device ID
pub fn get_current_device_id() -> Uuid {
    match CURRENT_DEVICE_ID.read() {
        Ok(guard) => *guard,
        Err(_) => Uuid::nil(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_device_id_management() {
        let test_id = Uuid::new_v4();

        // Initially should be nil
        assert!(get_current_device_id().is_nil());

        // Set device ID
        set_current_device_id(test_id);

        // Should return the set ID
        assert_eq!(get_current_device_id(), test_id);

        // Reset for other tests
        set_current_device_id(Uuid::nil());
        assert!(get_current_device_id().is_nil());
    }
}
@@ -1,7 +1,7 @@
//! Device manager for handling device lifecycle

use super::config::DeviceConfig;
use crate::crypto::device_key_manager::{DeviceKeyManager, DeviceKeyError};
use crate::crypto::device_key_manager::{DeviceKeyError, DeviceKeyManager};
use crate::domain::device::{Device, OperatingSystem};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
@@ -11,257 +11,255 @@ use uuid::Uuid;
/// Errors that can occur during device management
#[derive(Error, Debug)]
pub enum DeviceError {
    #[error("Device not initialized")]
    NotInitialized,

    #[error("Config path not found")]
    ConfigPathNotFound,

    #[error("Unsupported platform")]
    UnsupportedPlatform,

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    #[error("Lock poisoned")]
    LockPoisoned,

    #[error("Master key error: {0}")]
    MasterKey(#[from] DeviceKeyError),
    #[error("Device not initialized")]
    NotInitialized,

    #[error("Config path not found")]
    ConfigPathNotFound,

    #[error("Unsupported platform")]
    UnsupportedPlatform,

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    #[error("Lock poisoned")]
    LockPoisoned,

    #[error("Master key error: {0}")]
    MasterKey(#[from] DeviceKeyError),
}

/// Manages the current device state
pub struct DeviceManager {
    /// Current device configuration
    config: Arc<RwLock<DeviceConfig>>,
    /// Master encryption key manager
    device_key_manager: DeviceKeyManager,
    /// Custom data directory (if any)
    data_dir: Option<PathBuf>,
    /// Current device configuration
    config: Arc<RwLock<DeviceConfig>>,
    /// Master encryption key manager
    device_key_manager: DeviceKeyManager,
    /// Custom data directory (if any)
    data_dir: Option<PathBuf>,
}

impl DeviceManager {
    /// Initialize the device manager
    ///
    /// This will either load existing device configuration or create a new one
    pub fn init() -> Result<Self, DeviceError> {
        let config = match DeviceConfig::load() {
            Ok(config) => config,
            Err(DeviceError::NotInitialized) => {
                // Create new device configuration
                let os = detect_os();
                let name = get_device_name();
                let mut config = DeviceConfig::new(name, os);

                // Try to detect hardware model
                config.hardware_model = detect_hardware_model();

                // Save the new configuration
                config.save()?;
                config
            }
            Err(e) => return Err(e),
        };

        let device_key_manager = DeviceKeyManager::new()?;
        // Initialize master key on first run
        device_key_manager.get_or_create_master_key()?;

        Ok(Self {
            config: Arc::new(RwLock::new(config)),
            device_key_manager,
            data_dir: None,
        })
    }

    /// Initialize the device manager with a custom data directory
    pub fn init_with_path(data_dir: &PathBuf) -> Result<Self, DeviceError> {
        let config = match DeviceConfig::load_from(data_dir) {
            Ok(config) => config,
            Err(DeviceError::NotInitialized) => {
                // Create new device configuration
                let os = detect_os();
                let name = get_device_name();
                let mut config = DeviceConfig::new(name, os);

                // Try to detect hardware model
                config.hardware_model = detect_hardware_model();

                // Save the new configuration
                config.save_to(data_dir)?;
                config
            }
            Err(e) => return Err(e),
        };

        // Use fallback file for master key when using custom data directory
        let master_key_path = data_dir.join("master_key");
        let device_key_manager = DeviceKeyManager::new_with_fallback(master_key_path)?;
        // Initialize master key on first run
        device_key_manager.get_or_create_master_key()?;

        Ok(Self {
            config: Arc::new(RwLock::new(config)),
            device_key_manager,
            data_dir: Some(data_dir.clone()),
        })
    }

    /// Get the current device ID
    pub fn device_id(&self) -> Result<Uuid, DeviceError> {
        self.config
            .read()
            .map(|c| c.id)
            .map_err(|_| DeviceError::LockPoisoned)
    }

    /// Get the current device as a domain Device object
    pub async fn current_device(&self) -> Device {
        let config = self.config.read().unwrap();
        Device {
            id: config.id,
            name: config.name.clone(),
            os: parse_os(&config.os),
            hardware_model: config.hardware_model.clone(),
            network_addresses: vec![],
            is_online: true,
            sync_leadership: std::collections::HashMap::new(),
            last_seen_at: chrono::Utc::now(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
        }
    }

    /// Get the current device configuration
    pub fn config(&self) -> Result<DeviceConfig, DeviceError> {
        self.config
            .read()
            .map(|c| c.clone())
            .map_err(|_| DeviceError::LockPoisoned)
    }

    /// Create a Device domain object from current configuration
    pub fn to_device(&self) -> Result<Device, DeviceError> {
        let config = self.config()?;

        // Create device with loaded configuration
        let mut device = Device::new(config.name.clone());
        device.id = config.id;
        device.os = parse_os(&config.os);
        device.hardware_model = config.hardware_model.clone();
        device.created_at = config.created_at;

        Ok(device)
    }

    /// Update device name
    pub fn set_name(&self, name: String) -> Result<(), DeviceError> {
        let mut config = self.config
            .write()
            .map_err(|_| DeviceError::LockPoisoned)?;

        config.name = name;

        // Save to the appropriate location based on whether we have a custom data dir
        if let Some(data_dir) = &self.data_dir {
|
||||
config.save_to(data_dir)?;
|
||||
} else {
|
||||
config.save()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the master encryption key
|
||||
pub fn master_key(&self) -> Result<[u8; 32], DeviceError> {
|
||||
Ok(self.device_key_manager.get_master_key()?)
|
||||
}
|
||||
|
||||
/// Get the master encryption key as hex string
|
||||
pub fn master_key_hex(&self) -> Result<String, DeviceError> {
|
||||
Ok(self.device_key_manager.get_master_key_hex()?)
|
||||
}
|
||||
|
||||
/// Regenerate the master encryption key (dangerous operation)
|
||||
pub fn regenerate_device_key(&self) -> Result<[u8; 32], DeviceError> {
|
||||
Ok(self.device_key_manager.regenerate_master_key()?)
|
||||
}
}

/// Get the device name from the system
fn get_device_name() -> String {
	whoami::devicename()
}

/// Detect the operating system
fn detect_os() -> String {
	if cfg!(target_os = "macos") {
		"macOS".to_string()
	} else if cfg!(target_os = "windows") {
		"Windows".to_string()
	} else if cfg!(target_os = "linux") {
		"Linux".to_string()
	} else if cfg!(target_os = "ios") {
		"iOS".to_string()
	} else if cfg!(target_os = "android") {
		"Android".to_string()
	} else {
		"Unknown".to_string()
	}
}

/// Parse OS string back to enum
fn parse_os(os: &str) -> OperatingSystem {
	match os {
		"macOS" => OperatingSystem::MacOS,
		"Windows" => OperatingSystem::Windows,
		"Linux" => OperatingSystem::Linux,
		"iOS" => OperatingSystem::IOs,
		"Android" => OperatingSystem::Android,
		_ => OperatingSystem::Other,
	}
}

/// Try to detect hardware model
fn detect_hardware_model() -> Option<String> {
	#[cfg(target_os = "macos")]
	{
		// Try to get model from system_profiler
		use std::process::Command;

		let output = Command::new("system_profiler")
			.args(&["SPHardwareDataType", "-json"])
			.output()
			.ok()?;

		if output.status.success() {
			let json_str = String::from_utf8_lossy(&output.stdout);
			// Simple extraction - in production would use proper JSON parsing
			if let Some(start) = json_str.find("\"machine_model\":") {
				let substr = &json_str[start + 17..];
				if let Some(end) = substr.find('"') {
					return Some(substr[..end].to_string());
				}
			}
		}
	}

	None
}

#[cfg(test)]
mod tests {
	use super::*;
	use tempfile::TempDir;

	#[test]
	fn test_device_config() {
		let config = DeviceConfig::new("Test Device".to_string(), "Linux".to_string());
		assert_eq!(config.name, "Test Device");
		assert_eq!(config.os, "Linux");
		assert!(config.hardware_model.is_none());
	}
}

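// Illustrative sketch (not part of the original source): a fallback check for
// the detect_os/parse_os pair above. Variant names follow parse_os; `matches!`
// is used so the test does not assume the enum derives PartialEq.
#[cfg(test)]
mod os_parsing_tests {
	use super::*;

	#[test]
	fn test_parse_os_fallback() {
		// Strings produced by detect_os map to concrete variants; anything
		// unrecognized falls back to OperatingSystem::Other.
		assert!(matches!(parse_os("macOS"), OperatingSystem::MacOS));
		assert!(matches!(parse_os("Android"), OperatingSystem::Android));
		assert!(matches!(parse_os("FreeBSD"), OperatingSystem::Other));
	}
}
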
@@ -1,12 +1,15 @@
 //! Device management module
 //!
 //! Handles persistent device identification across Spacedrive installations
 
 mod config;
+mod id;
 mod manager;
 
+pub use crate::crypto::device_key_manager::{DeviceKeyError, DeviceKeyManager};
 pub use config::DeviceConfig;
-pub use manager::{DeviceManager, DeviceError};
-pub use crate::crypto::device_key_manager::{DeviceKeyManager, DeviceKeyError};
+pub use id::{get_current_device_id, set_current_device_id, CURRENT_DEVICE_ID};
+pub use manager::{DeviceError, DeviceManager};
 
 // Re-export domain types
 pub use crate::domain::device::{Device, OperatingSystem};

@@ -56,7 +56,7 @@ impl SdPath {
 	/// Create an SdPath for a local file on this device
 	pub fn local(path: impl Into<PathBuf>) -> Self {
 		Self::Physical {
-			device_id: crate::common::utils::get_current_device_id(),
+			device_id: crate::device::get_current_device_id(),
 			path: path.into(),
 		}
 	}
@@ -65,7 +65,7 @@ impl SdPath {
 	pub fn is_local(&self) -> bool {
 		match self {
 			Self::Physical { device_id, .. } => {
-				*device_id == crate::common::utils::get_current_device_id()
+				*device_id == crate::device::get_current_device_id()
 			}
 			Self::Content { .. } => false, // Content paths are abstract, not inherently local
 		}
@@ -75,7 +75,7 @@ impl SdPath {
 	pub fn as_local_path(&self) -> Option<&Path> {
 		match self {
 			Self::Physical { device_id, path } => {
-				if *device_id == crate::common::utils::get_current_device_id() {
+				if *device_id == crate::device::get_current_device_id() {
 					Some(path)
 				} else {
 					None
@@ -89,7 +89,7 @@ impl SdPath {
 	pub fn display(&self) -> String {
 		match self {
 			Self::Physical { device_id, path } => {
-				if *device_id == crate::common::utils::get_current_device_id() {
+				if *device_id == crate::device::get_current_device_id() {
 					path.display().to_string()
 				} else {
 					format!("sd://{}/{}", device_id, path.display())

@@ -109,8 +109,8 @@ impl Device {
 	}
 
 	/// Check if this is the current device
-	pub fn is_current(&self) -> bool {
-		self.id == crate::common::utils::get_current_device_id()
+	pub fn is_current(&self, current_device_id: Uuid) -> bool {
+		self.id == current_device_id
 	}
 
 	/// Set sync role for a library

@@ -2,7 +2,6 @@
 
 use super::error::{ActionError, ActionResult};
 use crate::{
-	common::utils::get_current_device_id,
 	context::CoreContext,
 	infra::db::entities::{audit_log, AuditLog, AuditLogActive},
 };
@@ -28,17 +27,17 @@ impl ActionManager {
 		&self,
 		action: A,
 	) -> Result<A::Output, super::error::ActionError> {
-		// 1. Validate the action
+		// Validate the action
 		action.validate(self.context.clone()).await?;
 
-		// 2. Log action execution (capture action_kind before move)
+		// Log action execution (capture action_kind before move)
 		let action_kind = action.action_kind();
 		tracing::info!("Executing core action: {}", action_kind);
 
-		// 3. Execute the action directly
+		// Execute the action directly
 		let result = action.execute(self.context.clone()).await;
 
-		// 4. Log result
+		// Log result
 		match &result {
 			Ok(_) => tracing::info!("Core action {} completed successfully", action_kind),
 			Err(e) => tracing::error!("Core action {} failed: {}", action_kind, e),
@@ -50,35 +49,41 @@ impl ActionManager {
 	/// Dispatch a library-scoped action (library context pre-validated)
 	pub async fn dispatch_library<A: super::LibraryAction>(
 		&self,
-		library_id: Uuid,
+		library_id: Option<Uuid>,
 		action: A,
 	) -> Result<A::Output, super::error::ActionError> {
-		// 1. Get and validate library exists (eliminates boilerplate!)
+		// Either use the provided library_id, or the currently active one from core state
+		// An action created with a given library will always execute on that library
+		// Actions and queries should not hold their own library id state, rather receive it as context
+		let effective_library_id = library_id
+			.or(self.context.session_state.get().await.current_library_id)
+			.ok_or(ActionError::LibraryNotFound(library_id.unwrap_or_default()))?;
+
+		// Get and validate library exists
 		let library = self
 			.context
 			.library_manager
-			.get_library(library_id)
+			.get_library(effective_library_id)
 			.await
-			.ok_or_else(|| ActionError::LibraryNotFound(library_id))?;
+			.ok_or_else(|| ActionError::LibraryNotFound(effective_library_id))?;
 
-		// 2. Validate the action with library context
+		// Validate the action with library context
 		action.validate(&library, self.context.clone()).await?;
 
-		// 3. Create audit log entry (capture values before move)
+		// Create audit log entry (capture values before move)
 		let action_kind = action.action_kind();
 		let audit_entry = self
-			.create_action_audit_log(library_id, action_kind)
+			.create_action_audit_log(effective_library_id, action_kind)
 			.await?;
 
-		// 4. Execute the action with validated library
+		// Execute the action with validated library
 		let result = action.execute(library, self.context.clone()).await;
 
-		// 5. Finalize audit log with result
+		// Finalize audit log with result
 		let audit_result = match &result {
 			Ok(_) => Ok("Action completed successfully".to_string()),
 			Err(e) => Err(ActionError::Internal(e.to_string())),
 		};
-		self.finalize_audit_log(audit_entry, &audit_result, library_id)
+		self.finalize_audit_log(audit_entry, &audit_result, effective_library_id)
 			.await?;
 
 		result
@@ -93,10 +98,11 @@ impl ActionManager {
 		let library = self.get_library(library_id).await?;
 		let db = library.db().conn();
 
+		let device_id = crate::device::get_current_device_id();
 		let audit_entry = AuditLogActive {
 			uuid: Set(Uuid::new_v4().to_string()),
 			action_type: Set(action_kind.to_string()),
-			actor_device_id: Set(get_current_device_id().to_string()),
+			actor_device_id: Set(device_id.to_string()),
 			targets: Set("{}".to_string()), // TODO: Add targets_summary to ActionTrait
 			status: Set(audit_log::ActionStatus::InProgress),
 			job_id: Set(None),
@@ -110,16 +116,6 @@ impl ActionManager {
 		audit_entry.insert(db).await.map_err(ActionError::SeaOrm)
 	}
 
-	/// Create an initial audit log entry (legacy method - kept for compatibility)
-	async fn create_audit_log(
-		&self,
-		library_id: Uuid,
-		action_kind: &str,
-	) -> ActionResult<audit_log::Model> {
-		// Delegate to the new method
-		self.create_action_audit_log(library_id, action_kind).await
-	}
-
 	/// Finalize the audit log entry with the result
 	async fn finalize_audit_log(
 		&self,
@@ -151,7 +147,6 @@ impl ActionManager {
 				active_model.status = Set(audit_log::ActionStatus::Completed);
 				active_model.completed_at = Set(Some(chrono::Utc::now()));
-				// Extract job_id if present in certain output types
 				// TODO: Update this when we have job-based actions
 				active_model.result_payload = Set(Some(output.clone()));
 			}
 			Err(error) => {
@@ -172,7 +167,6 @@ impl ActionManager {
 		library_id: Uuid,
 	) -> ActionResult<std::sync::Arc<crate::library::Library>> {
 		self.context
 			.library_manager
 			.get_library(library_id)
 			.await
 			.ok_or(ActionError::LibraryNotFound(library_id))

@@ -2,7 +2,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use crate::infra::daemon::{
-	instance::CoreInstanceManager, rpc::RpcServer, state::SessionStateService,
+	instance::CoreInstanceManager, rpc::RpcServer,
 };
 
 /// Start a default daemon server with built-in handlers and default instance
@@ -11,12 +11,10 @@ pub async fn start_default_server(
 	data_dir: PathBuf,
 	enable_networking: bool,
 ) -> Result<(), Box<dyn std::error::Error>> {
-	let session = Arc::new(SessionStateService::new(data_dir.clone()));
 	let instances = Arc::new(CoreInstanceManager::new(
 		data_dir.clone(),
 		enable_networking,
-		session.clone(),
 	));
-	let mut server = RpcServer::new(socket_path, instances, session);
+	let mut server = RpcServer::new(socket_path, instances);
 	server.start().await
 }

@@ -9,17 +9,44 @@ pub struct DaemonClient {
 }
 
 impl DaemonClient {
-	pub fn new(socket_path: PathBuf) -> Self { Self { socket_path } }
+	pub fn new(socket_path: PathBuf) -> Self {
+		Self { socket_path }
+	}
 
-	pub async fn send(&self, req: &DaemonRequest) -> Result<DaemonResponse, Box<dyn std::error::Error>> {
-		let mut stream = UnixStream::connect(&self.socket_path).await?;
-		let payload = serde_json::to_vec(req)?;
-		stream.write_all(&payload).await?;
-		stream.shutdown().await?;
-		let mut buf = Vec::new();
-		stream.read_to_end(&mut buf).await?;
-		Ok(serde_json::from_slice(&buf)?)
-	}
+	pub async fn send(
+		&self,
+		req: &DaemonRequest,
+	) -> Result<DaemonResponse, Box<dyn std::error::Error + Send + Sync>> {
+		let mut stream = UnixStream::connect(&self.socket_path).await.map_err(|e| {
+			format!(
+				"Failed to connect to daemon socket at {}: {}",
+				self.socket_path.display(),
+				e
+			)
+		})?;
+
+		let payload =
+			serde_json::to_vec(req).map_err(|e| format!("Failed to serialize request: {}", e))?;
+
+		stream
+			.write_all(&payload)
+			.await
+			.map_err(|e| format!("Failed to send request to daemon: {}", e))?;
+
+		stream
+			.shutdown()
+			.await
+			.map_err(|e| format!("Failed to shutdown write stream: {}", e))?;
+
+		let mut buf = Vec::new();
+		stream
+			.read_to_end(&mut buf)
+			.await
+			.map_err(|e| format!("Failed to read response from daemon: {}", e))?;
+
+		let response: DaemonResponse = serde_json::from_slice(&buf)
+			.map_err(|e| format!("Failed to deserialize daemon response: {}", e))?;
+
+		Ok(response)
+	}
 }

@@ -4,73 +4,42 @@ use std::sync::Arc;
 use futures::future::BoxFuture;
 use tokio::sync::RwLock;
 
-use super::state::SessionState;
 use crate::Core;
 use bincode::config::standard;
 use bincode::serde::{decode_from_slice, encode_to_vec};
 use serde::{de::DeserializeOwned, Serialize};
 
-/// Signature for a generic action handler: takes raw bytes, returns raw bytes
-pub type ActionHandler = Arc<
-	dyn Fn(Vec<u8>, Arc<Core>, SessionState) -> BoxFuture<'static, Result<Vec<u8>, String>>
-		+ Send
-		+ Sync,
->;
-
-/// Signature for a generic query handler (placeholder for future)
-pub type QueryHandler = Arc<
-	dyn Fn(Vec<u8>, Arc<Core>, SessionState) -> BoxFuture<'static, Result<Vec<u8>, String>>
-		+ Send
-		+ Sync,
->;
+/// Unified handler signature for both actions and queries
+/// Actions return empty Vec<u8>, queries return serialized output
+pub type OperationHandler =
+	Arc<dyn Fn(Vec<u8>, Arc<Core>) -> BoxFuture<'static, Result<Vec<u8>, String>> + Send + Sync>;
 
 /// Registry that maps type IDs to handlers. The daemon remains agnostic of concrete types.
 pub struct DispatchRegistry {
-	actions: RwLock<HashMap<String, ActionHandler>>,
-	queries: RwLock<HashMap<String, QueryHandler>>,
+	handlers: RwLock<HashMap<String, OperationHandler>>,
 }
 
 impl DispatchRegistry {
 	pub fn new() -> Arc<Self> {
 		Arc::new(Self {
-			actions: RwLock::new(HashMap::new()),
-			queries: RwLock::new(HashMap::new()),
+			handlers: RwLock::new(HashMap::new()),
 		})
 	}
 
-	pub async fn register_action(&self, type_id: impl Into<String>, handler: ActionHandler) {
-		self.actions.write().await.insert(type_id.into(), handler);
-	}
-
-	pub async fn register_query(&self, type_id: impl Into<String>, handler: QueryHandler) {
-		self.queries.write().await.insert(type_id.into(), handler);
+	pub async fn register_handler(&self, type_id: impl Into<String>, handler: OperationHandler) {
+		self.handlers.write().await.insert(type_id.into(), handler);
 	}
 
-	pub async fn dispatch_action(
+	pub async fn dispatch(
 		&self,
 		type_id: &str,
 		payload: Vec<u8>,
 		core: Arc<Core>,
-		session: SessionState,
 	) -> Result<Vec<u8>, String> {
-		let map = self.actions.read().await;
+		let map = self.handlers.read().await;
 		match map.get(type_id) {
-			Some(handler) => (handler)(payload, core, session).await,
-			None => Err("Unknown action type".into()),
-		}
-	}
-
-	pub async fn dispatch_query(
-		&self,
-		type_id: &str,
-		payload: Vec<u8>,
-		core: Arc<Core>,
-		session: SessionState,
-	) -> Result<Vec<u8>, String> {
-		let map = self.queries.read().await;
-		match map.get(type_id) {
-			Some(handler) => (handler)(payload, core, session).await,
-			None => Err("Unknown query type".into()),
+			Some(handler) => (handler)(payload, core).await,
+			None => Err(format!("Unknown operation type: {}", type_id)),
 		}
 	}
 }
@@ -78,22 +47,22 @@ impl DispatchRegistry {
 /// Build a generic action handler that decodes T from payload, executes, and returns empty Ok bytes
 pub fn make_action_handler<T>(
 	exec: std::sync::Arc<
-		dyn Fn(T, std::sync::Arc<Core>, SessionState) -> BoxFuture<'static, Result<(), String>>
+		dyn Fn(T, std::sync::Arc<Core>) -> BoxFuture<'static, Result<(), String>>
 			+ Send
 			+ Sync
 			+ 'static,
 	>,
-) -> ActionHandler
+) -> OperationHandler
 where
 	T: DeserializeOwned + Send + 'static,
 {
-	std::sync::Arc::new(move |payload, core, session| {
+	std::sync::Arc::new(move |payload, core| {
 		let exec = exec.clone();
 		Box::pin(async move {
 			let val: T = decode_from_slice(&payload, standard())
 				.map_err(|e| format!("deserialize: {}", e))?
 				.0;
-			(exec)(val, core, session).await.map(|_| Vec::new())
+			(exec)(val, core).await.map(|_| Vec::new())
 		})
 	})
 }
@@ -101,23 +70,23 @@ where
 /// Build a generic query handler that decodes Q, executes to O, and encodes O
 pub fn make_query_handler<Q, O>(
 	exec: std::sync::Arc<
-		dyn Fn(Q, std::sync::Arc<Core>, SessionState) -> BoxFuture<'static, Result<O, String>>
+		dyn Fn(Q, std::sync::Arc<Core>) -> BoxFuture<'static, Result<O, String>>
 			+ Send
 			+ Sync
 			+ 'static,
 	>,
-) -> QueryHandler
+) -> OperationHandler
 where
 	Q: DeserializeOwned + Send + 'static,
 	O: Serialize + Send + 'static,
 {
-	std::sync::Arc::new(move |payload, core, session| {
+	std::sync::Arc::new(move |payload, core| {
 		let exec = exec.clone();
 		Box::pin(async move {
 			let val: Q = decode_from_slice(&payload, standard())
 				.map_err(|e| format!("deserialize: {}", e))?
 				.0;
-			let out = (exec)(val, core, session).await?;
+			let out = (exec)(val, core).await?;
 			encode_to_vec(&out, standard()).map_err(|e| e.to_string())
 		})
 	})

@@ -4,7 +4,6 @@ use std::sync::Arc;
 use tokio::sync::RwLock;
 
 use crate::Core;
-use crate::infra::daemon::state::SessionStateService;
 
 /// Validate instance name to prevent path traversal attacks
 pub fn validate_instance_name(instance: &str) -> Result<(), String> {
@@ -14,7 +13,10 @@ pub fn validate_instance_name(instance: &str) -> Result<(), String> {
 	if instance.len() > 64 {
 		return Err("Instance name too long (max 64 characters)".to_string());
 	}
-	if !instance.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_') {
+	if !instance
+		.chars()
+		.all(|c| c.is_alphanumeric() || c == '-' || c == '_')
+	{
 		return Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string());
 	}
 	Ok(())
@@ -25,16 +27,14 @@ pub struct CoreInstanceManager {
 	instances: Arc<RwLock<HashMap<String, Arc<Core>>>>,
 	default_data_dir: PathBuf,
 	enable_networking: bool,
-	session_state: Arc<SessionStateService>,
 }
 
 impl CoreInstanceManager {
-	pub fn new(default_data_dir: PathBuf, enable_networking: bool, session_state: Arc<SessionStateService>) -> Self {
+	pub fn new(default_data_dir: PathBuf, enable_networking: bool) -> Self {
 		Self {
 			instances: Arc::new(RwLock::new(HashMap::new())),
 			default_data_dir,
 			enable_networking,
-			session_state,
 		}
 	}
@@ -67,13 +67,13 @@ impl CoreInstanceManager {
 		// Instance doesn't exist, create it
 		let data_dir = data_dir.unwrap_or_else(|| self.default_data_dir.clone());
 		let core = Arc::new(
-			Core::new_with_config(data_dir, self.session_state.clone())
+			Core::new_with_config(data_dir)
 				.await
-				.map_err(|e| format!("Failed to create core: {}", e))?
+				.map_err(|e| format!("Failed to create core: {}", e))?,
 		);
 
 		let core_with_networking = if self.enable_networking {
-			Core::init_networking_shared(core.clone(), self.session_state.clone())
+			Core::init_networking_shared(core.clone())
 				.await
 				.map_err(|e| format!("Failed to initialize networking: {}", e))?
 		} else {
@@ -93,10 +93,10 @@ impl CoreInstanceManager {
 		validate_instance_name(name)?;
 
 		if let Some(core) = self.instances.write().await.remove(name) {
-			core.shutdown().await.map_err(|e| format!("Shutdown failed: {}", e))?;
+			core.shutdown()
+				.await
+				.map_err(|e| format!("Shutdown failed: {}", e))?;
 		}
 		Ok(())
 	}
 }

@@ -1,12 +1,9 @@
 //! Client-agnostic daemon infrastructure
 
 pub mod bootstrap;
 pub mod client;
 pub mod dispatch;
 pub mod health;
 pub mod instance;
 pub mod rpc;
-pub mod state;
 pub mod types;

@@ -6,29 +6,22 @@ use tokio::net::UnixListener;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::infra::daemon::instance::CoreInstanceManager;
|
||||
use crate::infra::daemon::state::SessionStateService;
|
||||
use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse};
|
||||
|
||||
/// Minimal JSON-over-UDS RPC server
|
||||
pub struct RpcServer {
|
||||
socket_path: PathBuf,
|
||||
instances: Arc<CoreInstanceManager>,
|
||||
session: Arc<SessionStateService>,
|
||||
shutdown_tx: mpsc::Sender<()>,
|
||||
shutdown_rx: mpsc::Receiver<()>,
|
||||
}
|
||||
|
||||
impl RpcServer {
|
||||
pub fn new(
|
||||
socket_path: PathBuf,
|
||||
instances: Arc<CoreInstanceManager>,
|
||||
session: Arc<SessionStateService>,
|
||||
) -> Self {
|
||||
pub fn new(socket_path: PathBuf, instances: Arc<CoreInstanceManager>) -> Self {
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
Self {
|
||||
socket_path,
|
||||
instances,
|
||||
session,
|
||||
shutdown_tx,
|
||||
shutdown_rx,
|
||||
}
|
||||
@@ -48,13 +41,12 @@ impl RpcServer {
        match result {
            Ok((stream, _addr)) => {
                let instances = self.instances.clone();
                let session = self.session.clone();
                let shutdown_tx = self.shutdown_tx.clone();

                // Spawn task for concurrent request handling
                tokio::spawn(async move {
                    // Convert errors to strings to ensure Send
                    if let Err(e) = Self::handle_connection(stream, instances, session, shutdown_tx).await {
                    if let Err(e) = Self::handle_connection(stream, instances, shutdown_tx).await {
                        eprintln!("Connection error: {}", e);
                    }
                });
@@ -81,7 +73,6 @@ impl RpcServer {
    async fn handle_connection(
        mut stream: tokio::net::UnixStream,
        instances: Arc<CoreInstanceManager>,
        session: Arc<SessionStateService>,
        shutdown_tx: mpsc::Sender<()>,
    ) -> Result<(), String> {
        // Request size limit (10MB)
@@ -125,7 +116,7 @@ impl RpcServer {

        // Try to parse JSON - if successful, we have the complete request
        if let Ok(req) = serde_json::from_slice::<DaemonRequest>(&buf) {
            let resp = Self::process_request(req, &instances, &session, &shutdown_tx).await;
            let resp = Self::process_request(req, &instances, &shutdown_tx).await;

            // Send response
            let response_bytes = serde_json::to_string(&resp)
@@ -143,28 +134,21 @@ impl RpcServer {
    async fn process_request(
        request: DaemonRequest,
        instances: &Arc<CoreInstanceManager>,
        session: &Arc<SessionStateService>,
        shutdown_tx: &mpsc::Sender<()>,
    ) -> DaemonResponse {
        match request {
            DaemonRequest::Ping => DaemonResponse::Pong,

            DaemonRequest::Action { method, payload } => match instances.get_default().await {
                Ok(core) => {
                    let session_snapshot = session.get().await;
                    match core
                        .execute_action_by_method(&method, payload, session_snapshot)
                        .await
                    {
                        Ok(out) => DaemonResponse::Ok(out),
                        Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)),
                    }
                }
                Ok(core) => match core.execute_operation_by_method(&method, payload).await {
                    Ok(out) => DaemonResponse::Ok(out),
                    Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)),
                },
                Err(e) => DaemonResponse::Error(DaemonError::CoreUnavailable(e)),
            },

            DaemonRequest::Query { method, payload } => match instances.get_default().await {
                Ok(core) => match core.execute_query_by_method(&method, payload).await {
                Ok(core) => match core.execute_operation_by_method(&method, payload).await {
                    Ok(out) => DaemonResponse::Ok(out),
                    Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)),
                },
core/src/lib.rs
@@ -1,8 +1,9 @@
#![allow(warnings)]
//! Spacedrive Core v2
//!
//! A unified, simplified architecture for cross-platform file management.
//! A complete reimplementation of Spacedrive's core with modern Rust patterns, unified file operations, and a foundation built for the Virtual Distributed File System vision.

// Module declarations
pub mod client;
pub mod common;
pub mod config;
@@ -20,30 +21,34 @@ pub mod service;
pub mod testing;
pub mod volume;

use service::network::protocol::pairing::PairingProtocolHandler;
use service::network::utils::logging::NetworkLogger;

// Compatibility module for legacy networking references
pub mod networking {
    pub use crate::service::network::*;
}

use crate::config::AppConfig;
use crate::context::CoreContext;
use crate::cqrs::{Query, QueryManager};
use crate::device::DeviceManager;
use crate::infra::action::builder::ActionBuilder;
use crate::infra::action::manager::ActionManager;
use crate::infra::action::{CoreAction, LibraryAction};
use crate::infra::event::{Event, EventBus};
use crate::library::LibraryManager;
use crate::service::Services;
use crate::volume::{VolumeDetectionConfig, VolumeManager};
use std::path::PathBuf;
use std::sync::Arc;
// Internal crate imports
use crate::{
    config::AppConfig,
    context::CoreContext,
    cqrs::{Query, QueryManager},
    device::DeviceManager,
    infra::{
        action::{builder::ActionBuilder, manager::ActionManager, CoreAction, LibraryAction},
        event::{Event, EventBus},
    },
    library::LibraryManager,
    service::session::SessionStateService,
    service::{
        network::{protocol::pairing::PairingProtocolHandler, utils::logging::NetworkLogger},
        Services,
    },
    volume::{VolumeDetectionConfig, VolumeManager},
};

// External crate imports
use std::{path::PathBuf, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info};
use crate::infra::daemon::state::SessionStateService;
/// Pending pairing request information
#[derive(Debug, Clone)]
@@ -63,11 +68,8 @@ struct SpacedropRequest {
    message: Option<String>,
    file_size: u64,
}

// NOTE: SimplePairingUI has been moved to CLI infrastructure
// See: src/infrastructure/cli/pairing_ui.rs for CLI-specific implementations

/// Bridge between networking events and core events
/// TODO: why? - james
pub struct NetworkEventBridge {
    network_events: mpsc::UnboundedReceiver<service::network::NetworkEvent>,
    core_events: Arc<EventBus>,
@@ -141,58 +143,51 @@ pub struct Core {
}
impl Core {

    /// Initialize a new Core instance with custom data directory
    pub async fn new_with_config(data_dir: PathBuf, session_state: Arc<SessionStateService>) -> Result<Self, Box<dyn std::error::Error>> {
    pub async fn new_with_config(data_dir: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
        info!("Initializing Spacedrive Core at {:?}", data_dir);

        // 1. Load or create app config
        // Load or create app config
        let config = AppConfig::load_or_create(&data_dir)?;
        config.ensure_directories()?;
        let config = Arc::new(RwLock::new(config));

        // 2. Initialize device manager
        // Initialize device manager
        let device = Arc::new(DeviceManager::init_with_path(&data_dir)?);
        // Set the global device ID for legacy compatibility
        common::utils::set_current_device_id(device.device_id()?);

        // 3. Create event bus
        // Set a global device ID for convenience
        crate::device::set_current_device_id(device.device_id()?);

        // Create event bus
        let events = Arc::new(EventBus::default());

        // 4. Initialize volume manager
        // Initialize volume manager
        let volume_config = VolumeDetectionConfig::default();
        let device_id = device.device_id()?;
        let volumes = Arc::new(VolumeManager::new(device_id, volume_config, events.clone()));

        // 5. Initialize volume detection
        // Initialize volume detection
        // info!("Initializing volume detection...");
        // match volumes.initialize().await {
        //     Ok(()) => info!("Volume manager initialized"),
        //     Err(e) => error!("Failed to initialize volume manager: {}", e),
        // }

        // 6. Initialize library manager with libraries directory
        let libraries_dir = config.read().await.libraries_dir();
        let libraries = Arc::new(LibraryManager::new_with_dir(libraries_dir, events.clone()));

        // 7. Initialize library key manager
        // Initialize library key manager
        let library_key_manager =
            Arc::new(crate::crypto::library_key_manager::LibraryKeyManager::new()?);

        // 8. Register all job types
        info!("Registering job types...");
        crate::ops::register_all_jobs();
        info!("Job types registered");
        // Initialize session state service
        let session_state = Arc::new(SessionStateService::new(data_dir));

        // 9. Create the context that will be shared with services
        // Create the context that will be shared with services
        let mut context_inner = CoreContext::new(
            events.clone(),
            device.clone(),
            libraries.clone(),
            None, // Libraries will be set after context creation
            volumes.clone(),
            library_key_manager.clone(),
            session_state.clone(), // Pass session_state here
            session_state.clone(),
        );

        // Set job logging configuration if enabled
@@ -203,15 +198,29 @@ impl Core {
        }
        drop(app_config);

        // Create the shared context
        let context = Arc::new(context_inner);

        // 10. Initialize services first, passing them the context
        // Initialize library manager with libraries directory and context
        let libraries_dir = config.read().await.libraries_dir();
        let libraries = Arc::new(LibraryManager::new_with_dir(
            libraries_dir,
            events.clone(),
            session_state.clone(),
            volumes.clone(),
            device.clone(),
        ));

        // Update context with libraries
        context.set_libraries(libraries.clone()).await;

        // Initialize services first, passing them the context
        let services = Services::new(context.clone());

        // 11. Auto-load all libraries with context for job manager initialization
        // Auto-load all libraries with context for job manager initialization
        info!("Loading existing libraries...");
        let loaded_libraries: Vec<Arc<crate::library::Library>> =
            match libraries.load_all_with_context(context.clone()).await {
            match libraries.load_all(context.clone()).await {
                Ok(count) => {
                    info!("Loaded {} libraries", count);
                    libraries.list().await
@@ -396,13 +405,12 @@ impl Core {
    /// Initialize networking from Arc<Core> - for daemon use
    pub async fn init_networking_shared(
        core: Arc<Core>,
        session_state: Arc<SessionStateService>,
    ) -> Result<Arc<Core>, Box<dyn std::error::Error>> {
        info!("Initializing networking for shared core...");

        // Create a new Core with networking enabled
        let mut new_core =
            Core::new_with_config(core.config().read().await.data_dir.clone(), session_state).await?;
            Core::new_with_config(core.config().read().await.data_dir.clone()).await?;

        // Initialize networking on the new core
        new_core.init_networking().await?;
@@ -510,7 +518,7 @@ impl Core {
        &self,
        method: &str,
        payload: Vec<u8>,
        session: crate::infra::daemon::state::SessionState,
        session: crate::service::session::SessionState,
    ) -> Result<Vec<u8>, String> {
        if let Some(handler) = crate::ops::registry::ACTIONS.get(method) {
            return handler(Arc::new((*self).clone()), session, payload).await;
@@ -518,6 +526,26 @@ impl Core {
        Err("Unknown action method".into())
    }

    /// Unified dispatcher by method string for both actions and queries.
    pub async fn execute_operation_by_method(
        &self,
        method: &str,
        payload: Vec<u8>,
    ) -> Result<Vec<u8>, String> {
        // Try actions first (they return empty bytes on success)
        if let Some(handler) = crate::ops::registry::ACTIONS.get(method) {
            let default_session = crate::service::session::SessionState::default();
            return handler(Arc::new((*self).clone()), default_session, payload).await;
        }

        // Try queries next (they return serialized output)
        if let Some(handler) = crate::ops::registry::QUERIES.get(method) {
            return handler(Arc::new((*self).clone()), payload).await;
        }

        Err(format!("Unknown operation method: {}", method))
    }
    /// Shutdown the core gracefully
    pub async fn shutdown(&self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Shutting down Spacedrive Core...");
@@ -1,7 +1,7 @@
//! Library lock implementation to prevent concurrent access

use super::error::{LibraryError, Result};
use crate::common::utils::get_current_device_id;
use crate::device::get_current_device_id;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
@@ -13,192 +13,188 @@ use uuid::Uuid;
/// Information stored in the lock file
#[derive(Debug, Serialize, Deserialize)]
pub struct LockInfo {
    /// ID of the device holding the lock
    pub device_id: Uuid,

    /// Process ID
    pub process_id: u32,

    /// When the lock was acquired
    pub acquired_at: DateTime<Utc>,

    /// Optional description (e.g., "indexing", "backup")
    pub description: Option<String>,
}

/// A lock that prevents concurrent access to a library
pub struct LibraryLock {
    /// Path to the lock file
    path: PathBuf,

    /// The open file handle (keeps the lock active)
    _file: File,
}

impl LibraryLock {
    /// Attempt to acquire a lock on the library
    pub fn acquire(library_path: &Path) -> Result<Self> {
        let lock_path = library_path.join(".sdlibrary.lock");

        // Try to create the lock file exclusively
        match OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&lock_path)
        {
            Ok(mut file) => {
                // Write lock information
                let lock_info = LockInfo {
                    device_id: get_current_device_id(),
                    process_id: std::process::id(),
                    acquired_at: Utc::now(),
                    description: None,
                };

                let json = serde_json::to_string_pretty(&lock_info)?;
                file.write_all(json.as_bytes())?;
                file.sync_all()?;

                Ok(Self {
                    path: lock_path,
                    _file: file,
                })
            }
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
                // Lock file exists, check if it's stale
                if Self::is_lock_stale(&lock_path)? {
                    // Remove stale lock and try again
                    std::fs::remove_file(&lock_path)?;

                    // Recursive call to try again
                    Self::acquire(library_path)
                } else {
                    Err(LibraryError::AlreadyInUse)
                }
            }
            Err(e) => Err(e.into()),
        }
    }
    /// Check if a lock file is stale (older than 1 hour or process no longer running)
    pub fn is_lock_stale(lock_path: &Path) -> Result<bool> {
        let metadata = std::fs::metadata(lock_path)?;
        let modified = metadata.modified()?;
        let age = SystemTime::now()
            .duration_since(modified)
            .unwrap_or(Duration::ZERO);

        // Consider lock stale if older than 1 hour
        if age > Duration::from_secs(3600) {
            return Ok(true);
        }

        // Also check if the process is still running
        if let Ok(contents) = std::fs::read_to_string(lock_path) {
            if let Ok(lock_info) = serde_json::from_str::<LockInfo>(&contents) {
                // Check if process is still running
                if !is_process_running(lock_info.process_id) {
                    return Ok(true);
                }
            }
        }

        Ok(false)
    }
    /// Try to read lock information (for debugging)
    pub fn read_lock_info(library_path: &Path) -> Result<Option<LockInfo>> {
        let lock_path = library_path.join(".sdlibrary.lock");

        if !lock_path.exists() {
            return Ok(None);
        }

        let contents = std::fs::read_to_string(lock_path)?;
        let info: LockInfo = serde_json::from_str(&contents)?;

        Ok(Some(info))
    }
}
/// Check if a process is still running (Unix-specific implementation)
#[cfg(unix)]
fn is_process_running(pid: u32) -> bool {
    use std::process::Command;

    match Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .output()
    {
        Ok(output) => output.status.success(),
        Err(_) => false,
    }
}

/// Check if a process is still running (Windows implementation)
#[cfg(windows)]
fn is_process_running(pid: u32) -> bool {
    use std::process::Command;

    match Command::new("tasklist")
        .arg("/fi")
        .arg(&format!("pid eq {}", pid))
        .arg("/fo")
        .arg("csv")
        .output()
    {
        Ok(output) => {
            let output_str = String::from_utf8_lossy(&output.stdout);
            output_str.lines().count() > 1 // Header + process line if exists
        }
        Err(_) => false,
    }
}
impl Drop for LibraryLock {
    fn drop(&mut self) {
        // Clean up the lock file when the lock is dropped
        let _ = std::fs::remove_file(&self.path);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_library_lock() {
        let temp_dir = TempDir::new().unwrap();
        let library_path = temp_dir.path().join("test.sdlibrary");
        std::fs::create_dir_all(&library_path).unwrap();

        // First lock should succeed
        let _lock1 = LibraryLock::acquire(&library_path).unwrap();

        // Second lock should fail
        match LibraryLock::acquire(&library_path) {
            Err(LibraryError::AlreadyInUse) => {}
            _ => panic!("Expected AlreadyInUse error"),
        }

        // Lock file should exist
        assert!(library_path.join(".sdlibrary.lock").exists());
    }

    #[test]
    fn test_lock_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let library_path = temp_dir.path().join("test.sdlibrary");
        std::fs::create_dir_all(&library_path).unwrap();

        {
            let _lock = LibraryLock::acquire(&library_path).unwrap();
            assert!(library_path.join(".sdlibrary.lock").exists());
        }

        // Lock file should be cleaned up after drop
        assert!(!library_path.join(".sdlibrary.lock").exists());
    }
}
@@ -8,11 +8,14 @@ use super::{
};
use crate::{
    context::CoreContext,
    device::DeviceManager,
    infra::{
        db::{entities, Database},
        event::{Event, EventBus},
        job::manager::JobManager,
    },
    service::session::SessionStateService,
    volume::VolumeManager,
};
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
@@ -46,11 +49,21 @@ pub struct LibraryManager {

    /// Event bus for library events
    event_bus: Arc<EventBus>,

    /// Dependencies needed from core
    session_state: Arc<SessionStateService>,
    volume_manager: Arc<VolumeManager>,
    device_manager: Arc<DeviceManager>,
}
impl LibraryManager {
    /// Create a new library manager
    pub fn new(event_bus: Arc<EventBus>) -> Self {
    pub fn new(
        event_bus: Arc<EventBus>,
        session_state: Arc<SessionStateService>,
        volume_manager: Arc<VolumeManager>,
        device_manager: Arc<DeviceManager>,
    ) -> Self {
        // Default search paths
        let mut search_paths = vec![];

@@ -63,42 +76,32 @@ impl LibraryManager {
            libraries: Arc::new(RwLock::new(HashMap::new())),
            search_paths,
            event_bus,
            session_state,
            volume_manager,
            device_manager,
        }
    }

    /// Create a new library manager with a specific libraries directory
    pub fn new_with_dir(libraries_dir: PathBuf, event_bus: Arc<EventBus>) -> Self {
    pub fn new_with_dir(
        libraries_dir: PathBuf,
        event_bus: Arc<EventBus>,
        session_state: Arc<SessionStateService>,
        volume_manager: Arc<VolumeManager>,
        device_manager: Arc<DeviceManager>,
    ) -> Self {
        let search_paths = vec![libraries_dir];

        Self {
            libraries: Arc::new(RwLock::new(HashMap::new())),
            search_paths,
            event_bus,
            session_state,
            volume_manager,
            device_manager,
        }
    }

    /// Load all discovered libraries with context for job manager initialization
    pub async fn load_all_with_context(&self, context: Arc<CoreContext>) -> Result<usize> {
        let discovered = self.scan_for_libraries().await?;
        let mut loaded_count = 0;

        for disc in discovered {
            if !disc.is_locked {
                match self
                    .open_library_with_context(&disc.path, context.clone())
                    .await
                {
                    Ok(_) => loaded_count += 1,
                    Err(e) => {
                        warn!("Failed to load library at {:?}: {}", disc.path, e);
                    }
                }
            }
        }

        Ok(loaded_count)
    }

    /// Add a search path for libraries
    pub fn add_search_path(&mut self, path: PathBuf) {
        if !self.search_paths.contains(&path) {
@@ -147,10 +150,8 @@ impl LibraryManager {
        // Initialize library
        self.initialize_library(&library_path, name).await?;

        // Open the newly created library with context
        let library = self
            .open_library_with_context(&library_path, context.clone())
            .await?;
        // Open the newly created library
        let library = self.open_library(&library_path, context.clone()).await?;

        // Emit event
        self.event_bus.emit(Event::LibraryCreated {
@@ -163,8 +164,13 @@ impl LibraryManager {
    }

    /// Open a library from a path
    pub async fn open_library(&self, path: impl AsRef<Path>) -> Result<Arc<Library>> {
    pub async fn open_library(
        &self,
        path: impl AsRef<Path>,
        context: Arc<CoreContext>,
    ) -> Result<Arc<Library>> {
        let path = path.as_ref();
        info!("Opening library at {:?}", path);

        // Validate it's a library directory
        if !is_library_directory(path) {
@@ -174,40 +180,6 @@ impl LibraryManager {
        // Acquire lock
        let lock = LibraryLock::acquire(path)?;

        // Load configuration
        let config_path = path.join("library.json");
        let config_data = tokio::fs::read_to_string(&config_path).await?;
        let config: LibraryConfig = serde_json::from_str(&config_data)?;

        // Check if already open
        {
            let libraries = self.libraries.read().await;
            if libraries.contains_key(&config.id) {
                return Err(LibraryError::AlreadyOpen(config.id));
            }
        }

        // Open database
        let db_path = path.join("database.db");
        let db = Arc::new(Database::open(&db_path).await?);

        // This method now requires context - redirect to open_library_with_context
        return Err(LibraryError::Other(
            "Context required for library creation".to_string(),
        ));
    }
    /// Open a library with CoreContext for job manager initialization
    pub async fn open_library_with_context(
        &self,
        path: &Path,
        context: Arc<CoreContext>,
    ) -> Result<Arc<Library>> {
        info!("Opening library at {:?}", path);

        // Acquire lock
        let lock = LibraryLock::acquire(path)?;

        // Load config
        let config_path = path.join("library.json");
        let config = LibraryConfig::load(&config_path).await?;
@@ -217,6 +189,14 @@ impl LibraryManager {
            return Err(LibraryError::Other("Library config has nil ID".to_string()));
        }

        // Check if already open
        {
            let libraries = self.libraries.read().await;
            if libraries.contains_key(&config.id) {
                return Err(LibraryError::AlreadyOpen(config.id));
            }
        }

        // Open database
        let db_path = path.join("database.db");
        let db = Arc::new(Database::open(&db_path).await?);
@@ -236,7 +216,7 @@ impl LibraryManager {
        });

        // Ensure device is registered in this library
        if let Err(e) = self.ensure_device_registered(&library, &context).await {
        if let Err(e) = self.ensure_device_registered(&library).await {
            warn!("Failed to register device in library {}: {}", config.id, e);
        }

@@ -254,11 +234,7 @@ impl LibraryManager {
            "Auto-tracking user-relevant volumes for library {}",
            config.name
        );
        if let Err(e) = context
            .volume_manager
            .auto_track_user_volumes(&library)
            .await
        {
        if let Err(e) = self.volume_manager.auto_track_user_volumes(&library).await {
            warn!("Failed to auto-track user-relevant volumes: {}", e);
        }

@@ -310,9 +286,14 @@ impl LibraryManager {
        self.libraries.read().await.values().cloned().collect()
    }

    /// Get the primary library (first available library)
    pub async fn get_primary_library(&self) -> Option<Arc<Library>> {
        self.get_open_libraries().await.into_iter().next()
    /// Get the active library
    /// This is the library that the session state is set to
    pub async fn get_active_library(&self) -> Option<Arc<Library>> {
        let session = self.session_state.get().await;
        self.get_open_libraries()
            .await
            .into_iter()
            .find(|lib| Some(lib.id()) == session.current_library_id)
    }
    /// List all open libraries
@@ -321,7 +302,7 @@ impl LibraryManager {
    }

    /// Load all libraries from the search paths
    pub async fn load_all(&self) -> Result<usize> {
    pub async fn load_all(&self, context: Arc<CoreContext>) -> Result<usize> {
        let mut loaded_count = 0;

        for search_path in &self.search_paths.clone() {
@@ -336,7 +317,7 @@ impl LibraryManager {
            let path = entry.path();

            if is_library_directory(&path) {
                match self.open_library(&path).await {
                match self.open_library(&path, context.clone()).await {
                    Ok(_) => {
                        loaded_count += 1;
                        info!("Auto-loaded library from {:?}", path);
@@ -469,13 +450,9 @@ impl LibraryManager {
    }

    /// Ensure the current device is registered in the library
    async fn ensure_device_registered(
        &self,
        library: &Arc<Library>,
        context: &Arc<CoreContext>,
    ) -> Result<()> {
    async fn ensure_device_registered(&self, library: &Arc<Library>) -> Result<()> {
        let db = library.db();
        let device = context
        let device = self
            .device_manager
            .to_device()
            .map_err(|e| LibraryError::Other(format!("Failed to get device info: {}", e)))?;
@@ -563,7 +563,7 @@ async fn update_location_stats(
/// Get device UUID for current device
async fn get_device_uuid(_library: Arc<Library>) -> LocationResult<Uuid> {
	// Get the current device ID from the global state
-	let device_uuid = crate::common::utils::get_current_device_id();
+	let device_uuid = crate::device::get_current_device_id();

	if device_uuid.is_nil() {
		return Err(LocationError::InvalidPath(
@@ -106,7 +106,7 @@ impl PathResolver {
		context: &CoreContext,
		device_id: Uuid,
	) -> Result<(), PathResolutionError> {
-		let current_device_id = crate::common::utils::get_current_device_id();
+		let current_device_id = crate::device::get_current_device_id();

		// Local device is always "online"
		if device_id == current_device_id {
@@ -134,7 +134,7 @@ impl PathResolver {

	/// Get list of currently online devices
	async fn get_online_devices(&self, context: &CoreContext) -> Vec<Uuid> {
-		let mut online = vec![crate::common::utils::get_current_device_id()];
+		let mut online = vec![crate::device::get_current_device_id()];

		if let Some(networking) = context.get_networking().await {
			for device in networking.get_connected_devices().await {
@@ -150,7 +150,7 @@ impl PathResolver {
		let mut metrics = HashMap::new();

		// Local device has zero latency
-		let current_device_id = crate::common::utils::get_current_device_id();
+		let current_device_id = crate::device::get_current_device_id();
		metrics.insert(
			current_device_id,
			DeviceMetrics {
@@ -184,8 +184,9 @@ impl PathResolver {
	) -> Result<SdPath, PathResolutionError> {
		// Get the current library
		let library = context
-			.library_manager
-			.get_primary_library()
+			.libraries()
+			.await
+			.get_active_library()
			.await
			.ok_or(PathResolutionError::NoActiveLibrary)?;

@@ -218,7 +219,7 @@ impl PathResolver {
		let mut results = HashMap::new();

		// Get the current library
-		let library = match context.library_manager.get_primary_library().await {
+		let library = match context.libraries().await.get_active_library().await {
			Some(lib) => lib,
			None => {
				// Return error for all content IDs
@@ -302,10 +303,9 @@ impl PathResolver {
			.await?
		{
			// Build the full path using PathResolver
-			let path = crate::ops::indexing::path_resolver::PathResolver::get_full_path(
-				db, entry.id,
-			)
-			.await?;
+			let path =
+				crate::ops::indexing::path_resolver::PathResolver::get_full_path(db, entry.id)
+					.await?;

			instances.push(ContentInstance {
				device_id: device.uuid,
@@ -397,11 +397,11 @@ impl PathResolver {
			if let Some(location) = location_map.get(&entry.id) {
				if let Some(device) = device_map.get(&location.device_id) {
					// Build the full path
-					let path = crate::ops::indexing::path_resolver::PathResolver::get_full_path(
-						db,
-						entry.id,
-					)
-					.await?;
+					let path =
+						crate::ops::indexing::path_resolver::PathResolver::get_full_path(
+							db, entry.id,
+						)
+						.await?;

					let instance = ContentInstance {
						device_id: device.uuid,
@@ -428,7 +428,7 @@ impl PathResolver {
		online_devices: &[Uuid],
		device_metrics: &HashMap<Uuid, DeviceMetrics>,
	) -> Option<SdPath> {
-		let current_device_id = crate::common::utils::get_current_device_id();
+		let current_device_id = crate::device::get_current_device_id();

		let mut candidates: Vec<(f64, &ContentInstance)> = instances
			.iter()

@@ -12,7 +12,7 @@ impl Query for CoreStatusQuery {
	type Output = CoreStatus;

	async fn execute(self, context: Arc<CoreContext>) -> Result<Self::Output> {
-		let libs = context.library_manager.list().await;
+		let libs = context.libraries().await.list().await;
		Ok(CoreStatus {
			version: env!("CARGO_PKG_VERSION").to_string(),
			library_count: libs.len(),

@@ -14,7 +14,8 @@ impl Query for JobInfoQuery {

	async fn execute(self, context: Arc<CoreContext>) -> Result<Self::Output> {
		let library = context
-			.library_manager
+			.libraries()
+			.await
			.get_library(self.library_id)
			.await
			.ok_or_else(|| anyhow::anyhow!("Library not found"))?;
@@ -32,4 +33,3 @@ impl Query for JobInfoQuery {
	}

crate::register_query!(JobInfoQuery, "jobs.info");

@@ -14,18 +14,23 @@ impl Query for JobListQuery {

	async fn execute(self, context: Arc<CoreContext>) -> Result<Self::Output> {
		let library = context
-			.library_manager
+			.libraries()
+			.await
			.get_library(self.library_id)
			.await
			.ok_or_else(|| anyhow::anyhow!("Library not found"))?;
		let jobs = library.jobs().list_jobs(self.status).await?;
		let items = jobs
			.into_iter()
-			.map(|j| JobListItem { id: j.id, name: j.name, status: j.status, progress: j.progress })
+			.map(|j| JobListItem {
+				id: j.id,
+				name: j.name,
+				status: j.status,
+				progress: j.progress,
+			})
			.collect();
		Ok(JobListOutput { jobs: items })
	}
}

crate::register_query!(JobListQuery, "jobs.list");

@@ -31,7 +31,7 @@ impl CoreAction for LibraryCreateAction {
	}

	async fn execute(self, context: Arc<CoreContext>) -> Result<Self::Output, ActionError> {
-		let library_manager = &context.library_manager;
+		let library_manager = context.libraries().await;
		let library = library_manager
			.create_library(
				self.input.name.clone(),
@@ -40,11 +40,11 @@ impl CoreAction for LibraryCreateAction {
			)
			.await?;

-		Ok(LibraryCreateOutput::new(
-			library.id(),
-			library.name().await,
-			library.path().to_path_buf(),
-		))
+		// Get the name and path
+		let name = library.name().await;
+		let path = library.path().to_path_buf();
+
+		Ok(LibraryCreateOutput::new(library.id(), name, path))
	}

	fn action_kind(&self) -> &'static str {

@@ -46,7 +46,8 @@ impl CoreAction for LibraryDeleteAction {
	) -> Result<Self::Output, ActionError> {
		// Get the library to get its name before deletion
		let library = context
-			.library_manager
+			.libraries()
+			.await
			.get_library(self.input.library_id)
			.await
			.ok_or_else(|| ActionError::LibraryNotFound(self.input.library_id))?;
@@ -55,7 +56,8 @@ impl CoreAction for LibraryDeleteAction {

		// Delete the library through the library manager
		context
-			.library_manager
+			.libraries()
+			.await
			.delete_library(self.input.library_id, self.input.delete_data)
			.await?;

@@ -39,7 +39,7 @@ impl Query for ListLibrariesQuery {

	async fn execute(self, context: Arc<CoreContext>) -> Result<Self::Output> {
		// Get all open libraries from the library manager
-		let libraries = context.library_manager.list().await;
+		let libraries = context.libraries().await.list().await;
		let mut result = Vec::new();

		for library in libraries {

@@ -19,7 +19,6 @@ use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocationAddInput {
-	pub library_id: Uuid,
	pub path: PathBuf,
	pub name: Option<String>,
	pub mode: IndexMode,

@@ -14,18 +14,26 @@ impl Query for LocationsListQuery {

	async fn execute(self, context: Arc<CoreContext>) -> Result<Self::Output> {
		// Fetch library and query locations table
-		let library = context.library_manager.get_library(self.library_id).await.ok_or_else(|| anyhow::anyhow!("Library not found"))?;
+		let library = context
+			.libraries()
+			.await
+			.get_library(self.library_id)
+			.await
+			.ok_or_else(|| anyhow::anyhow!("Library not found"))?;
		let db = library.db().conn();
		let rows = crate::infra::db::entities::location::Entity::find()
			.all(db)
			.await?;
		let mut out = Vec::new();
		for r in rows {
-			out.push(LocationInfo { id: r.uuid, path: std::path::PathBuf::from(r.name.clone().unwrap_or_default()), name: r.name.clone() });
+			out.push(LocationInfo {
+				id: r.uuid,
+				path: std::path::PathBuf::from(r.name.clone().unwrap_or_default()),
+				name: r.name.clone(),
+			});
		}
		Ok(LocationsListOutput { locations: out })
	}
}

crate::register_query!(LocationsListQuery, "locations.list");

@@ -19,45 +19,8 @@ pub mod libraries;
pub mod locations;
pub mod media;
// pub mod metadata;
+pub mod jobs;
+pub mod network;
pub mod registry;
pub mod sidecar;
pub mod volumes;
-pub mod jobs;
-pub mod network;
-
-/// Register all jobs with the job system
-///
-/// This should be called during core initialization to register all available job types
-pub fn register_all_jobs() {
-	// File operation jobs
-	register_job::<files::copy::FileCopyJob>();
-	register_job::<files::copy::MoveJob>();
-	register_job::<files::delete::DeleteJob>();
-	register_job::<files::duplicate_detection::DuplicateDetectionJob>();
-	register_job::<files::validation::ValidationJob>();
-
-	// Indexing jobs
-	register_job::<indexing::IndexerJob>();
-
-	// Media processing jobs
-	register_job::<media::ThumbnailJob>();
-}
-
-/// Register a single job type with the job system
-///
-/// This function would be called automatically by a derive macro in a real implementation,
-/// but for now we call it manually for each job type.
-fn register_job<T>()
-where
-	T: crate::infra::job::traits::Job + 'static,
-{
-	// In a real implementation with inventory, this would automatically register the job
-	// For now, this serves as documentation of which jobs should be registered
-
-	// The actual registration would happen via:
-	// inventory::submit! {
-	//     crate::infra::job::registration::JobRegistration::new::<T>()
-	// }
-
-	// For now we'll just log that the job type exists
-}

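The deleted `register_job` stub above only gestured at registration. A hand-rolled sketch of the same idea, without the `inventory` crate, is a map from job names to constructors (everything here is illustrative: `JobRegistry`, `make_thumbnail_job`, and the string-returning constructor are hypothetical stand-ins for the real job system):

```rust
use std::collections::HashMap;

// A job "constructor": the real system would build a boxed Job trait object;
// here a String stands in for the constructed job.
type JobCtor = fn() -> String;

#[derive(Default)]
struct JobRegistry {
	ctors: HashMap<&'static str, JobCtor>,
}

impl JobRegistry {
	// Register a constructor under a stable job name.
	fn register(&mut self, name: &'static str, ctor: JobCtor) {
		self.ctors.insert(name, ctor);
	}

	// Look a constructor up at runtime and invoke it; None for unknown jobs.
	fn spawn(&self, name: &str) -> Option<String> {
		self.ctors.get(name).map(|c| c())
	}
}

fn make_thumbnail_job() -> String {
	"ThumbnailJob".to_string()
}
```

Crates like `inventory` automate exactly this: each job type submits its registration at link time, so the central `register_all_jobs` list (and the risk of forgetting an entry) disappears.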
@@ -21,7 +21,7 @@ pub type QueryHandlerFn = fn(
/// Handler function signature for actions.
pub type ActionHandlerFn = fn(
	Arc<crate::Core>,
-	crate::infra::daemon::state::SessionState,
+	crate::service::session::SessionState,
	Vec<u8>,
) -> std::pin::Pin<
	Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + 'static>,
@@ -111,8 +111,8 @@ where
/// Generic library action handler (decode A::Input -> A::from_input -> dispatch)
pub fn handle_library_action<A>(
	core: Arc<crate::Core>,
-	// TODO: Move session state to core, shouldn't be in the daemon
-	session: crate::infra::daemon::state::SessionState,
+	// this isn't used, but is required by the interface, maybe fix?
+	session: crate::service::session::SessionState,
	payload: Vec<u8>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + 'static>>
where
@@ -128,9 +128,10 @@ where
		.0;
		let action = A::from_input(input)?;
		let manager = crate::infra::action::manager::ActionManager::new(core.context.clone());
+		let session = core.context.session_state.get().await;
		let library_id = session.current_library_id.ok_or("No library selected")?;
		let out = manager
-			.dispatch_library(library_id, action)
+			.dispatch_library(Some(library_id), action)
			.await
			.map_err(|e| e.to_string())?;
		encode_to_vec(&out, standard()).map_err(|e| e.to_string())
@@ -140,7 +141,7 @@ where
/// Generic core action handler (decode A::Input -> A::from_input -> dispatch)
pub fn handle_core_action<A>(
	core: Arc<crate::Core>,
-	session: crate::infra::daemon::state::SessionState,
+	session: crate::service::session::SessionState,
	payload: Vec<u8>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + 'static>>
where

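The essential control flow of `handle_library_action` above is: resolve the library from session state rather than from a caller-supplied id, and turn a missing selection into a plain error before dispatching. A minimal sketch of that step, with illustrative stand-in types (the real handler decodes a payload and dispatches through the `ActionManager`):

```rust
struct SessionState {
	current_library_id: Option<u64>,
}

// Resolve the session's library, then "dispatch". Returning Result<String, String>
// mirrors the handler's string-error convention.
fn dispatch_library_action(session: &SessionState, action: &str) -> Result<String, String> {
	// The same ok_or("No library selected")? guard as in the handler:
	// the &str error converts into String via the ? operator's From impl.
	let library_id = session.current_library_id.ok_or("No library selected")?;
	Ok(format!("dispatched {} to library {}", action, library_id))
}
```

This is why CLI commands no longer need a `--library` flag for every call: selecting a library once in the session is enough, and library-scoped actions fail fast with "No library selected" otherwise.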
@@ -158,7 +158,6 @@ impl FileSharingService {
		// Get the current library to access its job manager
		let library = self
			.context
-			.library_manager
			.get_primary_library()
			.await
			.ok_or(SharingError::JobError(
@@ -193,7 +192,6 @@ impl FileSharingService {
	) -> Result<Vec<TransferId>, SharingError> {
		let library = self
			.context
-			.library_manager
			.get_primary_library()
			.await
			.ok_or(SharingError::JobError(
@@ -317,15 +315,14 @@ impl FileSharingService {
	) -> Result<TransferStatus, SharingError> {
		match transfer_id {
			TransferId::JobId { job_id, library_id } => {
-				let library = self
-					.context
-					.library_manager
-					.get_library(*library_id)
-					.await
-					.ok_or(SharingError::JobError(format!(
-						"Library {} not found",
-						library_id
-					)))?;
+				let library =
+					self.context
+						.get_library(*library_id)
+						.await
+						.ok_or(SharingError::JobError(format!(
+							"Library {} not found",
+							library_id
+						)))?;

				let job_manager = library.jobs();

@@ -383,15 +380,14 @@ impl FileSharingService {
	pub async fn cancel_transfer(&self, transfer_id: &TransferId) -> Result<(), SharingError> {
		match transfer_id {
			TransferId::JobId { job_id, library_id } => {
-				let library = self
-					.context
-					.library_manager
-					.get_library(*library_id)
-					.await
-					.ok_or(SharingError::JobError(format!(
-						"Library {} not found",
-						library_id
-					)))?;
+				let library =
+					self.context
+						.get_library(*library_id)
+						.await
+						.ok_or(SharingError::JobError(format!(
+							"Library {} not found",
+							library_id
+						)))?;

				let job_manager = library.jobs();
@@ -414,7 +410,6 @@ impl FileSharingService {
	pub async fn get_active_transfers(&self) -> Result<Vec<TransferStatus>, SharingError> {
		let library = self
			.context
-			.library_manager
			.get_primary_library()
			.await
			.ok_or(SharingError::JobError(
@@ -495,7 +490,7 @@ mod tests {
	use super::*;
	use crate::{
		crypto::library_key_manager::LibraryKeyManager, device::DeviceManager,
-		infra::event::EventBus, library::LibraryManager,
+		infra::event::EventBus, library::LibraryManager, service::session::SessionStateService,
	};
	use tempfile::tempdir;

@@ -503,22 +498,27 @@ mod tests {
	async fn test_file_sharing_service_creation() {
		let events = Arc::new(EventBus::default());
		let device_manager = Arc::new(DeviceManager::init().unwrap());
-		let library_manager = Arc::new(LibraryManager::new_with_dir(
-			std::env::temp_dir().join("test_libraries"),
-			events.clone(),
-		));
		let volume_manager = Arc::new(crate::volume::VolumeManager::new(
			uuid::Uuid::new_v4(), // Test device ID
			crate::volume::VolumeDetectionConfig::default(),
			events.clone(),
		));
		let library_key_manager = Arc::new(LibraryKeyManager::new().unwrap());
+		let session_state = Arc::new(SessionStateService::new(std::env::temp_dir()));
+		let library_manager = Arc::new(LibraryManager::new_with_dir(
+			std::env::temp_dir().join("test_libraries"),
+			events.clone(),
+			session_state.clone(),
+			volume_manager.clone(),
+			device_manager.clone(),
+		));
		let context = Arc::new(CoreContext::new(
			events,
			device_manager,
			library_manager,
			volume_manager,
			library_key_manager,
+			session_state,
		));

		let _file_sharing = FileSharingService::new(context);
@@ -537,22 +537,27 @@ mod tests {
	async fn test_create_file_metadata() {
		let events = Arc::new(EventBus::default());
		let device_manager = Arc::new(DeviceManager::init().unwrap());
-		let library_manager = Arc::new(LibraryManager::new_with_dir(
-			std::env::temp_dir().join("test_libraries"),
-			events.clone(),
-		));
		let volume_manager = Arc::new(crate::volume::VolumeManager::new(
			uuid::Uuid::new_v4(), // Test device ID
			crate::volume::VolumeDetectionConfig::default(),
			events.clone(),
		));
		let library_key_manager = Arc::new(LibraryKeyManager::new().unwrap());
+		let session_state = Arc::new(SessionStateService::new(std::env::temp_dir()));
+		let library_manager = Arc::new(LibraryManager::new_with_dir(
+			std::env::temp_dir().join("test_libraries"),
+			events.clone(),
+			session_state.clone(),
+			volume_manager.clone(),
+			device_manager.clone(),
+		));
		let context = Arc::new(CoreContext::new(
			events,
			device_manager,
			library_manager,
			volume_manager,
			library_key_manager,
+			session_state,
		));
		let file_sharing = FileSharingService::new(context);

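In the test setups above the same `session_state` handle is cloned into both the `LibraryManager` and the `CoreContext`. The point of that wiring is that every component shares one reference-counted state, so a selection made through one handle is visible through the others. A self-contained sketch of the pattern (the `SessionStateService` here is a simplified stand-in for the real service, which persists to disk):

```rust
use std::sync::{Arc, Mutex};

// Shared, interior-mutable session state; the real service also persists
// the selection under a data directory.
#[derive(Default)]
struct SessionStateService {
	current_library_id: Mutex<Option<u64>>,
}

impl SessionStateService {
	fn set(&self, id: u64) {
		*self.current_library_id.lock().unwrap() = Some(id);
	}

	fn get(&self) -> Option<u64> {
		*self.current_library_id.lock().unwrap()
	}
}
```

Cloning the `Arc` is cheap and does not copy the state, which is what makes "set the active library once, read it everywhere" work.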
@@ -1,8 +1,8 @@
//! Background services management

use crate::{
-	context::CoreContext, infra::event::EventBus,
-	crypto::library_key_manager::LibraryKeyManager,
+	context::CoreContext, crypto::library_key_manager::LibraryKeyManager, infra::event::EventBus,
+	service::session::SessionStateService,
};
use anyhow::Result;
use std::sync::Arc;
@@ -12,17 +12,18 @@ use tracing::info;
pub mod device;
pub mod entry_state_service;
pub mod file_sharing;
-pub mod watcher;
pub mod network;
+pub mod session;
pub mod sidecar_manager;
pub mod volume_monitor;
+pub mod watcher;

use device::DeviceService;
use file_sharing::FileSharingService;
-use watcher::{LocationWatcher, LocationWatcherConfig};
use network::NetworkingService;
use sidecar_manager::SidecarManager;
-use volume_monitor::{VolumeMonitorService, VolumeMonitorConfig};
+use volume_monitor::{VolumeMonitorConfig, VolumeMonitorService};
+use watcher::{LocationWatcher, LocationWatcherConfig};

/// Container for all background services
#[derive(Clone)]
@@ -41,6 +42,8 @@ pub struct Services {
	pub sidecar_manager: Arc<SidecarManager>,
	/// Library key manager
	pub library_key_manager: Arc<LibraryKeyManager>,
+	/// Session state service
+	pub session_state: Arc<SessionStateService>,
	/// Shared context for all services
	context: Arc<CoreContext>,
}
@@ -59,15 +62,16 @@ impl Services {
		let device = Arc::new(DeviceService::new(context.clone()));
		let sidecar_manager = Arc::new(SidecarManager::new(context.clone()));
		let library_key_manager = context.library_key_manager.clone();
+		let session_state = context.session_state.clone();
		Self {
			location_watcher,
			file_sharing,
			device,
-			networking: None, // Initialized separately when needed
+			networking: None,     // Initialized separately when needed
			volume_monitor: None, // Initialized after library manager is available
			sidecar_manager,
			library_key_manager,
+			session_state,
			context,
		}
	}
@@ -126,7 +130,7 @@ impl Services {
		library_key_manager: std::sync::Arc<crate::crypto::library_key_manager::LibraryKeyManager>,
		data_dir: impl AsRef<std::path::Path>,
	) -> Result<()> {
-		use crate::service::network::{NetworkingService, utils::logging::ConsoleLogger};
+		use crate::service::network::{utils::logging::ConsoleLogger, NetworkingService};

		info!("Initializing networking service");
		let logger = std::sync::Arc::new(ConsoleLogger);

@@ -65,7 +65,8 @@ async fn test_library_lifecycle() {
	assert!(core.libraries.close_library(lib_id).await.is_err());

	// Re-open library
-	let reopened = core.libraries.open_library(&lib_path).await.unwrap();
+	let device_id = core.context.device_manager.device_id().unwrap();
+	let reopened = core.libraries.open_library(&lib_path, device_id).await.unwrap();
	assert_eq!(reopened.id(), lib_id);
	assert_eq!(reopened.name().await, "Test Library");

@@ -92,7 +93,8 @@ async fn test_library_locking() {
	let lib_path = library.path().to_path_buf();

	// Try to open same library again - should fail
-	let result = core.libraries.open_library(&lib_path).await;
+	let device_id = core.context.device_manager.device_id().unwrap();
+	let result = core.libraries.open_library(&lib_path, device_id).await;
	assert!(result.is_err());

	// Close library
@@ -103,7 +105,8 @@ async fn test_library_locking() {
	drop(library);

	// Now should be able to open
-	let reopened = core.libraries.open_library(&lib_path).await.unwrap();
+	let device_id = core.context.device_manager.device_id().unwrap();
+	let reopened = core.libraries.open_library(&lib_path, device_id).await.unwrap();
	assert_eq!(reopened.name().await, "Lock Test");
}

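The locking behaviour those tests assert (a library path can be held by at most one handle; closing it frees the lock) reduces to a simple invariant. An illustrative sketch with stand-in types, not the real lock-file mechanism:

```rust
use std::collections::HashSet;

// Tracks which library paths are currently held open.
#[derive(Default)]
struct Locks {
	held: HashSet<String>,
}

impl Locks {
	// Opening fails if the path is already held (second open in the test).
	fn open(&mut self, path: &str) -> Result<(), String> {
		if !self.held.insert(path.to_string()) {
			return Err(format!("{} is already open", path));
		}
		Ok(())
	}

	// Closing releases the lock so a later open succeeds.
	fn close(&mut self, path: &str) {
		self.held.remove(path);
	}
}
```

The real implementation enforces this across processes (a lock file on disk rather than an in-memory set), which is why the test also drops the `library` handle before reopening.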
@@ -128,7 +128,7 @@ async fn resolve_optimal_path(
	content_id: Uuid
) -> Result<SdPath, PathResolutionError> {
	// 1. Get the current library's DB connection from context
-	let library = context.library_manager.get_primary_library().await
+	let library = context.library_manager.get_active_library().await
		.ok_or(PathResolutionError::NoActiveLibrary)?;
	let db = library.db().conn();

@@ -348,4 +348,4 @@ async fn copy_files(

### 7. Conclusion

Refactoring `SdPath` from a simple `struct` to the dual-mode `enum` is a critical step in realizing the full architectural vision of Spacedrive. It replaces a fragile pointer system with a resilient, content-aware abstraction. This change directly enables the promised features of transparent failover and performance optimization, and it provides the necessary foundation for the **Simulation Engine** and other advanced, AI-native capabilities.