From b52e307e33803bda6f76c824da15e2daa97a63fa Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Fri, 19 Sep 2025 19:14:55 -0700 Subject: [PATCH] feat: Implement instance name validation and refactor daemon handling - Added a new function `validate_instance_name` to ensure instance names are secure and conform to specified criteria, preventing path traversal attacks. - Updated the CLI and daemon to utilize the new validation function when processing instance names. - Refactored the `RpcServer` to replace the `CoreInstanceManager` with a direct `Core` instance, simplifying the architecture. - Removed the `instance.rs` file as its functionality has been integrated into the main daemon logic. --- apps/cli/src/main.rs | 31 +++++- core/src/bin/daemon.rs | 64 +++++++++++- core/src/infra/daemon/bootstrap.rs | 155 ++++++----------------------- core/src/infra/daemon/instance.rs | 102 ------------------- core/src/infra/daemon/mod.rs | 1 - core/src/infra/daemon/rpc.rs | 85 +++++++++------- 6 files changed, 165 insertions(+), 273 deletions(-) delete mode 100644 core/src/infra/daemon/instance.rs diff --git a/apps/cli/src/main.rs b/apps/cli/src/main.rs index e46fb7ad7..ebefaf245 100644 --- a/apps/cli/src/main.rs +++ b/apps/cli/src/main.rs @@ -3,6 +3,23 @@ use clap::{Parser, Subcommand}; use sd_core::client::CoreClient; use std::path::Path; +/// Validate instance name to prevent path traversal attacks +fn validate_instance_name(instance: &str) -> Result<(), String> { + if instance.is_empty() { + return Err("Instance name cannot be empty".to_string()); + } + if instance.len() > 64 { + return Err("Instance name too long (max 64 characters)".to_string()); + } + if !instance + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { + return Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string()); + } + Ok(()) +} + mod context; mod domains; mod ui; @@ -170,7 +187,7 @@ async fn main() -> Result<()> { // Validate instance name for security if let Some(ref inst) = instance { - sd_core::infra::daemon::instance::validate_instance_name(inst) + validate_instance_name(inst) .map_err(|e| anyhow::anyhow!("Invalid instance name: {}", e))?; } @@ -208,9 +225,17 @@ async fn main() -> Result<()> { let daemon_path = current_exe.parent().unwrap().join("daemon"); let mut command = std::process::Command::new(daemon_path); - // Pass networking flag if enabled (if daemon supports it) + // Pass data directory + command.arg("--data-dir").arg(&data_dir); + + // Pass instance name if specified + if let Some(ref inst) = instance { + command.arg("--instance").arg(inst); + } + + // Pass networking flag if enabled if enable_networking { - println!("Note: Networking flag passed but daemon may not support it yet"); + command.arg("--enable-networking"); } // Set working directory to current directory diff --git a/core/src/bin/daemon.rs b/core/src/bin/daemon.rs index cbcc7483c..8a1c54ba7 100644 --- a/core/src/bin/daemon.rs +++ b/core/src/bin/daemon.rs @@ -1,10 +1,64 @@ -// use std::path::PathBuf; +use clap::Parser; +use std::path::PathBuf; + +/// Validate instance name to prevent path traversal attacks +fn validate_instance_name(instance: &str) -> Result<(), String> { + if instance.is_empty() { + return Err("Instance name cannot be empty".to_string()); + } + if instance.len() > 64 { + return Err("Instance name too long (max 64 characters)".to_string()); + } + if !instance + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { + return Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string()); + } + Ok(()) +} + +#[derive(Parser, Debug)] +#[command(name = "spacedrive-daemon", about = "Spacedrive daemon")] +struct Args { + /// Path to spacedrive data directory + #[arg(long)] + data_dir: Option, + + /// Daemon instance name + #[arg(long)] + instance: Option, + + /// Enable networking + #[arg(long)] + enable_networking: bool, +} #[tokio::main] async fn main() -> Result<(), Box> { - // Resolve default data dir and socket path - let data_dir = sd_core::config::default_data_dir()?; - let socket_path = data_dir.join("daemon/daemon.sock"); + let args = Args::parse(); - sd_core::infra::daemon::bootstrap::start_default_server(socket_path, data_dir, false).await + // Resolve data directory + let data_dir = args + .data_dir + .unwrap_or(sd_core::config::default_data_dir()?); + + // Calculate socket path based on instance + let socket_path = if let Some(instance) = args.instance { + // Validate instance name for security + validate_instance_name(&instance) + .map_err(|e| format!("Invalid instance name: {}", e))?; + data_dir + .join("daemon") + .join(format!("daemon-{}.sock", instance)) + } else { + data_dir.join("daemon/daemon.sock") + }; + + sd_core::infra::daemon::bootstrap::start_default_server( + socket_path, + data_dir, + args.enable_networking, + ) + .await } diff --git a/core/src/infra/daemon/bootstrap.rs b/core/src/infra/daemon/bootstrap.rs index f804afb70..f8eebe946 100644 --- a/core/src/infra/daemon/bootstrap.rs +++ b/core/src/infra/daemon/bootstrap.rs @@ -2,11 +2,10 @@ use std::path::PathBuf; use std::sync::Arc; use tracing::info; -use crate::infra::daemon::{ - instance::CoreInstanceManager, rpc::RpcServer, -}; +use crate::infra::daemon::rpc::RpcServer; +use crate::Core; -/// Start a default daemon server with built-in handlers and default instance +/// Start a daemon server with a single Core instance pub async fn start_default_server( socket_path: PathBuf, data_dir: PathBuf, @@ -15,28 +14,40 @@ pub async fn start_default_server( // Initialize basic tracing with file logging first initialize_tracing_with_file_logging(&data_dir)?; - let instances = Arc::new(CoreInstanceManager::new( - data_dir.clone(), - enable_networking, - )); + // Create a single Core instance + let core = Arc::new( + Core::new_with_config(data_dir.clone()) + .await + .map_err(|e| format!("Failed to create core: {}", e))?, + ); + + let core = if enable_networking { + Core::init_networking_shared(core) + .await + .map_err(|e| format!("Failed to initialize networking: {}", e))? + } else { + core + }; info!("Starting Spacedrive daemon"); info!("Data directory: {:?}", data_dir); info!("Socket path: {:?}", socket_path); info!("Networking enabled: {}", enable_networking); - let mut server = RpcServer::new(socket_path, instances.clone()); + let mut server = RpcServer::new(socket_path, core.clone()); - // Start the server, which will initialize the core and set up event streaming + // Start the server, which will initialize event streaming server.start().await } /// Initialize tracing with file logging to {data_dir}/logs/daemon.log -fn initialize_tracing_with_file_logging(data_dir: &PathBuf) -> Result<(), Box> { +fn initialize_tracing_with_file_logging( + data_dir: &PathBuf, +) -> Result<(), Box> { + use crate::infra::event::log_emitter::LogEventLayer; use std::sync::Once; - use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, fmt, EnvFilter}; - use crate::infra::event::log_emitter::LogEventLayer; use tracing_appender::rolling::{RollingFileAppender, Rotation}; + use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; static INIT: Once = Once::new(); let mut result: Result<(), Box> = Ok(()); @@ -54,29 +65,25 @@ fn initialize_tracing_with_file_logging(data_dir: &PathBuf) -> Result<(), Box Result<(), Box) -> Result<(), Box> { - info!("Initializing core instance for log streaming..."); - - // Initialize the default core instance to get access to the event bus - let core = instances.get_default().await - .map_err(|e| format!("Failed to initialize core for log streaming: {}", e))?; - - info!("Core instance initialized, setting up tracing log event layer..."); - - // Now set up the log event layer with the core's event bus - setup_tracing_log_event_layer(core.events.clone())?; - - info!("Starting test event emission..."); - - // Emit some test events to verify the system is working - emit_test_log_events(&core.events).await; - - Ok(()) -} - -/// Set up the tracing log event layer to forward logs to the event bus -fn setup_tracing_log_event_layer(event_bus: Arc) -> Result<(), Box> { - use crate::infra::event::log_emitter::LogEventLayer; - use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Registry}; - - // We need to rebuild the subscriber with the log event layer - // This is a bit tricky since tracing is already initialized - // For now, let's just emit manual events and improve this later - - // TODO: Properly integrate LogEventLayer with existing tracing subscriber - // This requires restructuring the tracing initialization to be done after core creation - - Ok(()) -} - -/// Emit test log events to demonstrate the log streaming functionality -async fn emit_test_log_events(event_bus: &Arc) { - use crate::infra::event::Event; - use chrono::Utc; - - info!("Emitting test log events to event bus"); - - // Emit a series of test log events - let events = vec![ - Event::LogMessage { - timestamp: Utc::now(), - level: "INFO".to_string(), - target: "sd_core::daemon".to_string(), - message: "Spacedrive daemon started successfully".to_string(), - job_id: None, - library_id: None, - }, - Event::LogMessage { - timestamp: Utc::now(), - level: "INFO".to_string(), - target: "sd_core::event".to_string(), - message: "Log event streaming initialized".to_string(), - job_id: None, - library_id: None, - }, - Event::LogMessage { - timestamp: Utc::now(), - level: "DEBUG".to_string(), - target: "sd_core::rpc".to_string(), - message: "RPC server listening for connections".to_string(), - job_id: None, - library_id: None, - }, - ]; - - for (i, event) in events.into_iter().enumerate() { - info!("Emitting test event {}: {:?}", i + 1, event); - event_bus.emit(event); - // Small delay between events - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - } - - // Emit periodic heartbeat events every 10 seconds for testing - tokio::spawn({ - let event_bus = event_bus.clone(); - async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); - loop { - interval.tick().await; - let heartbeat_event = Event::LogMessage { - timestamp: Utc::now(), - level: "DEBUG".to_string(), - target: "sd_core::daemon".to_string(), - message: "Daemon heartbeat".to_string(), - job_id: None, - library_id: None, - }; - info!("Emitting heartbeat event: {:?}", heartbeat_event); - event_bus.emit(heartbeat_event); - } - } - }); - - info!("Test log events setup complete"); -} diff --git a/core/src/infra/daemon/instance.rs b/core/src/infra/daemon/instance.rs deleted file mode 100644 index dee1aa83a..000000000 --- a/core/src/infra/daemon/instance.rs +++ /dev/null @@ -1,102 +0,0 @@ -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::RwLock; - -use crate::Core; - -/// Validate instance name to prevent path traversal attacks -pub fn validate_instance_name(instance: &str) -> Result<(), String> { - if instance.is_empty() { - return Err("Instance name cannot be empty".to_string()); - } - if instance.len() > 64 { - return Err("Instance name too long (max 64 characters)".to_string()); - } - if !instance - .chars() - .all(|c| c.is_alphanumeric() || c == '-' || c == '_') - { - return Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string()); - } - Ok(()) -} - -/// Manages lifecycle of Core instances (by name) -pub struct CoreInstanceManager { - instances: Arc>>>, - default_data_dir: PathBuf, - enable_networking: bool, -} - -impl CoreInstanceManager { - pub fn new(default_data_dir: PathBuf, enable_networking: bool) -> Self { - Self { - instances: Arc::new(RwLock::new(HashMap::new())), - default_data_dir, - enable_networking, - } - } - - /// Get or start the default instance - pub async fn get_default(&self) -> Result, String> { - self.get_or_start("default".to_string(), None).await - } - - /// Get or start a named instance, optionally with a specific data_dir - pub async fn get_or_start( - &self, - name: String, - data_dir: Option, - ) -> Result, String> { - // Validate instance name for security - validate_instance_name(&name)?; - - // Use entry API to avoid race conditions - use std::collections::hash_map::Entry; - - let mut instances = self.instances.write().await; - let entry = instances.entry(name.clone()); - - match entry { - Entry::Occupied(existing) => { - // Instance already exists, return it - Ok(existing.get().clone()) - } - Entry::Vacant(vacant) => { - // Instance doesn't exist, create it - let data_dir = data_dir.unwrap_or_else(|| self.default_data_dir.clone()); - let core = Arc::new( - Core::new_with_config(data_dir) - .await - .map_err(|e| format!("Failed to create core: {}", e))?, - ); - - let core_with_networking = if self.enable_networking { - Core::init_networking_shared(core.clone()) - .await - .map_err(|e| format!("Failed to initialize networking: {}", e))? - } else { - core - }; - - // Insert and return the new instance - vacant.insert(core_with_networking.clone()); - Ok(core_with_networking) - } - } - } - - /// Shutdown a named instance - pub async fn shutdown(&self, name: &str) -> Result<(), String> { - // Validate instance name for security - validate_instance_name(name)?; - - if let Some(core) = self.instances.write().await.remove(name) { - core.shutdown() - .await - .map_err(|e| format!("Shutdown failed: {}", e))?; - } - Ok(()) - } -} diff --git a/core/src/infra/daemon/mod.rs b/core/src/infra/daemon/mod.rs index 83c00a707..37b78d7c1 100644 --- a/core/src/infra/daemon/mod.rs +++ b/core/src/infra/daemon/mod.rs @@ -4,6 +4,5 @@ pub mod bootstrap; pub mod client; pub mod dispatch; pub mod health; -pub mod instance; pub mod rpc; pub mod types; diff --git a/core/src/infra/daemon/rpc.rs b/core/src/infra/daemon/rpc.rs index 118136ffb..0c45d747e 100644 --- a/core/src/infra/daemon/rpc.rs +++ b/core/src/infra/daemon/rpc.rs @@ -1,16 +1,16 @@ +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; -use std::collections::HashMap; -use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixListener; use tokio::sync::{mpsc, RwLock}; use uuid::Uuid; -use crate::infra::daemon::instance::CoreInstanceManager; use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse, EventFilter}; -use crate::infra::event::{Event, EventSubscriber}; use crate::infra::event::log_emitter::set_global_log_event_bus; +use crate::infra::event::{Event, EventSubscriber}; +use crate::Core; /// Connection information for event streaming #[derive(Debug)] @@ -24,7 +24,7 @@ struct Connection { /// Minimal JSON-over-UDS RPC server with event streaming support pub struct RpcServer { socket_path: PathBuf, - instances: Arc, + core: Arc, shutdown_tx: mpsc::Sender<()>, shutdown_rx: mpsc::Receiver<()>, /// Active connections for event streaming @@ -32,11 +32,11 @@ pub struct RpcServer { } impl RpcServer { - pub fn new(socket_path: PathBuf, instances: Arc) -> Self { + pub fn new(socket_path: PathBuf, core: Arc) -> Self { let (shutdown_tx, shutdown_rx) = mpsc::channel(1); Self { socket_path, - instances, + core, shutdown_tx, shutdown_rx, connections: Arc::new(RwLock::new(HashMap::new())), @@ -63,14 +63,14 @@ impl RpcServer { result = listener.accept() => { match result { Ok((stream, _addr)) => { - let instances = self.instances.clone(); + let core = self.core.clone(); let shutdown_tx = self.shutdown_tx.clone(); let connections = self.connections.clone(); // Spawn task for concurrent request handling tokio::spawn(async move { // Convert errors to strings to ensure Send - if let Err(e) = Self::handle_connection(stream, instances, shutdown_tx, connections).await { + if let Err(e) = Self::handle_connection(stream, core, shutdown_tx, connections).await { eprintln!("Connection error: {}", e); } }); @@ -95,15 +95,15 @@ impl RpcServer { /// Start the event broadcaster that forwards core events to subscribed connections async fn start_event_broadcaster(&self) -> Result<(), Box> { - let core = self.instances.get_default().await?; + let core = self.core.clone(); // Make the core's EventBus globally available to the LogEventLayer set_global_log_event_bus(core.events.clone()); let mut event_subscriber = core.events.subscribe(); let connections = self.connections.clone(); - // Optional: can emit a one-off info to prove the pipe works - tracing::info!("Log event bus registered for realtime streaming"); + // Optional: can emit a one-off info to prove the pipe works + tracing::info!("Log event bus registered for realtime streaming"); tokio::spawn(async move { while let Ok(event) = event_subscriber.recv().await { @@ -111,7 +111,11 @@ impl RpcServer { // Broadcast event to all subscribed connections for connection in connections_read.values() { - if Self::should_forward_event(&event, &connection.event_types, &connection.filter) { + if Self::should_forward_event( + &event, + &connection.event_types, + &connection.filter, + ) { // Ignore errors if connection is closed let _ = connection.event_tx.send(event.clone()); } @@ -189,7 +193,11 @@ impl RpcServer { } /// Check if an event should be forwarded to a connection based on filters - fn should_forward_event(event: &Event, event_types: &[String], filter: &Option) -> bool { + fn should_forward_event( + event: &Event, + event_types: &[String], + filter: &Option, + ) -> bool { // If no specific event types requested, forward all events if event_types.is_empty() { return true; @@ -221,23 +229,25 @@ impl RpcServer { // Apply additional filters if specified if let Some(filter) = filter { match event { - Event::JobProgress { job_id, .. } | - Event::JobStarted { job_id, .. } | - Event::JobCompleted { job_id, .. } | - Event::JobFailed { job_id, .. } | - Event::JobCancelled { job_id, .. } => { + Event::JobProgress { job_id, .. } + | Event::JobStarted { job_id, .. } + | Event::JobCompleted { job_id, .. } + | Event::JobFailed { job_id, .. } + | Event::JobCancelled { job_id, .. } => { if let Some(filter_job_id) = &filter.job_id { return job_id == filter_job_id; } } - Event::LibraryCreated { id, .. } | - Event::LibraryOpened { id, .. } | - Event::LibraryClosed { id, .. } => { + Event::LibraryCreated { id, .. } + | Event::LibraryOpened { id, .. } + | Event::LibraryClosed { id, .. } => { if let Some(filter_library_id) = &filter.library_id { return id == filter_library_id; } } - Event::LogMessage { job_id, library_id, .. } => { + Event::LogMessage { + job_id, library_id, .. + } => { // Filter by job ID if specified if let Some(filter_job_id) = &filter.job_id { if let Some(log_job_id) = job_id { @@ -266,7 +276,7 @@ impl RpcServer { /// Handle individual client connection concurrently async fn handle_connection( stream: tokio::net::UnixStream, - instances: Arc, + core: Arc, shutdown_tx: mpsc::Sender<()>, connections: Arc>>, ) -> Result<(), String> { @@ -292,7 +302,7 @@ impl RpcServer { if let Ok(request) = serde_json::from_str::(&line.trim()) { let response = Self::process_request( request, - &instances, + &core, &shutdown_tx, &connections, connection_id, @@ -352,7 +362,7 @@ impl RpcServer { /// Process a parsed daemon request async fn process_request( request: DaemonRequest, - instances: &Arc, + core: &Arc, shutdown_tx: &mpsc::Sender<()>, connections: &Arc>>, connection_id: Uuid, @@ -361,23 +371,24 @@ impl RpcServer { match request { DaemonRequest::Ping => DaemonResponse::Pong, - DaemonRequest::Action { method, payload } => match instances.get_default().await { - Ok(core) => match core.execute_operation_by_method(&method, payload).await { + DaemonRequest::Action { method, payload } => { + match core.execute_operation_by_method(&method, payload).await { Ok(out) => DaemonResponse::Ok(out), Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)), - }, - Err(e) => DaemonResponse::Error(DaemonError::CoreUnavailable(e)), - }, + } + } - DaemonRequest::Query { method, payload } => match instances.get_default().await { - Ok(core) => match core.execute_operation_by_method(&method, payload).await { + DaemonRequest::Query { method, payload } => { + match core.execute_operation_by_method(&method, payload).await { Ok(out) => DaemonResponse::Ok(out), Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)), - }, - Err(e) => DaemonResponse::Error(DaemonError::CoreUnavailable(e)), - }, + } + } - DaemonRequest::Subscribe { event_types, filter } => { + DaemonRequest::Subscribe { + event_types, + filter, + } => { // Register connection for event streaming let connection = Connection { id: connection_id,