feat: Implement instance name validation and refactor daemon handling

- Added a new function `validate_instance_name` to ensure instance names are secure and conform to specified criteria, preventing path traversal attacks.
- Updated the CLI and daemon to utilize the new validation function when processing instance names.
- Refactored the `RpcServer` to replace the `CoreInstanceManager` with a direct `Core` instance, simplifying the architecture.
- Removed the `instance.rs` file as its functionality has been integrated into the main daemon logic.
This commit is contained in:
Jamie Pine
2025-09-19 19:14:55 -07:00
parent 294952bb54
commit b52e307e33
6 changed files with 165 additions and 273 deletions

View File

@@ -3,6 +3,23 @@ use clap::{Parser, Subcommand};
use sd_core::client::CoreClient;
use std::path::Path;
/// Validate instance name to prevent path traversal attacks
///
/// Accepts only non-empty names of at most 64 characters consisting of ASCII
/// alphanumerics, `-`, or `_`. Rejecting every other character (notably `.`,
/// `/`, and `\`) makes `..` segments, absolute paths, and separator injection
/// impossible when the name is later embedded in a socket filename.
///
/// Returns `Ok(())` for a valid name, or `Err` with a human-readable reason.
fn validate_instance_name(instance: &str) -> Result<(), String> {
    if instance.is_empty() {
        return Err("Instance name cannot be empty".to_string());
    }
    // ASCII-only is enforced below, so byte length == character count here
    // and the "max 64 characters" message is accurate.
    if instance.len() > 64 {
        return Err("Instance name too long (max 64 characters)".to_string());
    }
    // `is_ascii_alphanumeric` (rather than the Unicode-aware `is_alphanumeric`)
    // keeps derived filenames portable across filesystems and matches the
    // error message's "alphanumeric, dash, and underscore" contract.
    if !instance
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
    {
        return Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string());
    }
    Ok(())
}
mod context;
mod domains;
mod ui;
@@ -170,7 +187,7 @@ async fn main() -> Result<()> {
// Validate instance name for security
if let Some(ref inst) = instance {
sd_core::infra::daemon::instance::validate_instance_name(inst)
validate_instance_name(inst)
.map_err(|e| anyhow::anyhow!("Invalid instance name: {}", e))?;
}
@@ -208,9 +225,17 @@ async fn main() -> Result<()> {
let daemon_path = current_exe.parent().unwrap().join("daemon");
let mut command = std::process::Command::new(daemon_path);
// Pass networking flag if enabled (if daemon supports it)
// Pass data directory
command.arg("--data-dir").arg(&data_dir);
// Pass instance name if specified
if let Some(ref inst) = instance {
command.arg("--instance").arg(inst);
}
// Pass networking flag if enabled
if enable_networking {
println!("Note: Networking flag passed but daemon may not support it yet");
command.arg("--enable-networking");
}
// Set working directory to current directory

View File

@@ -1,10 +1,64 @@
// use std::path::PathBuf;
use clap::Parser;
use std::path::PathBuf;
/// Validate instance name to prevent path traversal attacks
///
/// A name is valid when it is non-empty, at most 64 bytes, and built solely
/// from alphanumeric characters, `-`, and `_` — anything else (e.g. `.`,
/// `/`, `\`) is rejected so the name cannot escape the socket directory.
fn validate_instance_name(instance: &str) -> Result<(), String> {
    if instance.is_empty() {
        return Err("Instance name cannot be empty".to_string());
    }
    if instance.len() > 64 {
        return Err("Instance name too long (max 64 characters)".to_string());
    }
    // Scan for the first disallowed character instead of testing `all`.
    let is_allowed = |c: char| c.is_alphanumeric() || matches!(c, '-' | '_');
    match instance.chars().find(|&c| !is_allowed(c)) {
        Some(_) => Err(
            "Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string(),
        ),
        None => Ok(()),
    }
}
// Command-line arguments for the daemon binary.
// NOTE: the `///` doc comments on the fields below double as the
// clap-generated `--help` text (runtime output), so they are left untouched;
// reviewer notes are added as regular `//` comments instead.
#[derive(Parser, Debug)]
#[command(name = "spacedrive-daemon", about = "Spacedrive daemon")]
struct Args {
    /// Path to spacedrive data directory
    // Optional; main() falls back to sd_core::config::default_data_dir().
    #[arg(long)]
    data_dir: Option<PathBuf>,
    /// Daemon instance name
    // Optional; when present it is validated and used to derive the socket
    // filename "daemon-{instance}.sock" under {data_dir}/daemon/.
    #[arg(long)]
    instance: Option<String>,
    /// Enable networking
    // Flag; false when absent. Forwarded to start_default_server().
    #[arg(long)]
    enable_networking: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Resolve default data dir and socket path
let data_dir = sd_core::config::default_data_dir()?;
let socket_path = data_dir.join("daemon/daemon.sock");
let args = Args::parse();
sd_core::infra::daemon::bootstrap::start_default_server(socket_path, data_dir, false).await
// Resolve data directory
let data_dir = args
.data_dir
.unwrap_or(sd_core::config::default_data_dir()?);
// Calculate socket path based on instance
let socket_path = if let Some(instance) = args.instance {
// Validate instance name for security
validate_instance_name(&instance)
.map_err(|e| format!("Invalid instance name: {}", e))?;
data_dir
.join("daemon")
.join(format!("daemon-{}.sock", instance))
} else {
data_dir.join("daemon/daemon.sock")
};
sd_core::infra::daemon::bootstrap::start_default_server(
socket_path,
data_dir,
args.enable_networking,
)
.await
}

View File

@@ -2,11 +2,10 @@ use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
use crate::infra::daemon::{
instance::CoreInstanceManager, rpc::RpcServer,
};
use crate::infra::daemon::rpc::RpcServer;
use crate::Core;
/// Start a default daemon server with built-in handlers and default instance
/// Start a daemon server with a single Core instance
pub async fn start_default_server(
socket_path: PathBuf,
data_dir: PathBuf,
@@ -15,28 +14,40 @@ pub async fn start_default_server(
// Initialize basic tracing with file logging first
initialize_tracing_with_file_logging(&data_dir)?;
let instances = Arc::new(CoreInstanceManager::new(
data_dir.clone(),
enable_networking,
));
// Create a single Core instance
let core = Arc::new(
Core::new_with_config(data_dir.clone())
.await
.map_err(|e| format!("Failed to create core: {}", e))?,
);
let core = if enable_networking {
Core::init_networking_shared(core)
.await
.map_err(|e| format!("Failed to initialize networking: {}", e))?
} else {
core
};
info!("Starting Spacedrive daemon");
info!("Data directory: {:?}", data_dir);
info!("Socket path: {:?}", socket_path);
info!("Networking enabled: {}", enable_networking);
let mut server = RpcServer::new(socket_path, instances.clone());
let mut server = RpcServer::new(socket_path, core.clone());
// Start the server, which will initialize the core and set up event streaming
// Start the server, which will initialize event streaming
server.start().await
}
/// Initialize tracing with file logging to {data_dir}/logs/daemon.log
fn initialize_tracing_with_file_logging(data_dir: &PathBuf) -> Result<(), Box<dyn std::error::Error>> {
fn initialize_tracing_with_file_logging(
data_dir: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
use crate::infra::event::log_emitter::LogEventLayer;
use std::sync::Once;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, fmt, EnvFilter};
use crate::infra::event::log_emitter::LogEventLayer;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
static INIT: Once = Once::new();
let mut result: Result<(), Box<dyn std::error::Error>> = Ok(());
@@ -54,29 +65,25 @@ fn initialize_tracing_with_file_logging(data_dir: &PathBuf) -> Result<(), Box<dy
.unwrap_or_else(|_| "sd_core=info,spacedrive=info".to_string());
// Create file appender that rotates daily
let file_appender = RollingFileAppender::new(
Rotation::DAILY,
logs_dir,
"daemon.log"
);
let file_appender = RollingFileAppender::new(Rotation::DAILY, logs_dir, "daemon.log");
// Set up layered subscriber with stdout, file output, and log event streaming layer
if let Err(e) = tracing_subscriber::registry()
// Set up layered subscriber with stdout, file output, and log event streaming layer
if let Err(e) = tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter)))
.with(
fmt::layer()
.with_target(true)
.with_thread_ids(true)
.with_writer(std::io::stdout)
.with_writer(std::io::stdout),
)
.with(
fmt::layer()
.with_target(true)
.with_thread_ids(true)
.with_ansi(false) // No ANSI colors in log files
.with_writer(file_appender)
.with_writer(file_appender),
)
.with(LogEventLayer::new())
.with(LogEventLayer::new())
.try_init()
{
result = Err(format!("Failed to initialize tracing: {}", e).into());
@@ -85,105 +92,3 @@ fn initialize_tracing_with_file_logging(data_dir: &PathBuf) -> Result<(), Box<dy
result
}
/// Set up log event streaming by initializing core and registering log event layer
///
/// Resolves the manager's default `Core` instance (creating it on first use),
/// hands its event bus to the tracing layer, and then emits a batch of test
/// events so the streaming pipeline can be verified end to end.
///
/// Returns an error string if the default core cannot be initialized; the
/// tracing-layer step is currently a no-op stub (see
/// `setup_tracing_log_event_layer`).
async fn setup_log_event_streaming(instances: &Arc<CoreInstanceManager>) -> Result<(), Box<dyn std::error::Error>> {
    info!("Initializing core instance for log streaming...");
    // Initialize the default core instance to get access to the event bus
    let core = instances.get_default().await
        .map_err(|e| format!("Failed to initialize core for log streaming: {}", e))?;
    info!("Core instance initialized, setting up tracing log event layer...");
    // Now set up the log event layer with the core's event bus
    setup_tracing_log_event_layer(core.events.clone())?;
    info!("Starting test event emission...");
    // Emit some test events to verify the system is working
    emit_test_log_events(&core.events).await;
    Ok(())
}
/// Set up the tracing log event layer to forward logs to the event bus
fn setup_tracing_log_event_layer(event_bus: Arc<crate::infra::event::EventBus>) -> Result<(), Box<dyn std::error::Error>> {
use crate::infra::event::log_emitter::LogEventLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Registry};
// We need to rebuild the subscriber with the log event layer
// This is a bit tricky since tracing is already initialized
// For now, let's just emit manual events and improve this later
// TODO: Properly integrate LogEventLayer with existing tracing subscriber
// This requires restructuring the tracing initialization to be done after core creation
Ok(())
}
/// Emit test log events to demonstrate the log streaming functionality
///
/// Pushes three synthetic `LogMessage` events onto the bus with a short delay
/// between each, then spawns a detached background task that emits a
/// "Daemon heartbeat" event every 10 seconds for the lifetime of the process.
async fn emit_test_log_events(event_bus: &Arc<crate::infra::event::EventBus>) {
    use crate::infra::event::Event;
    use chrono::Utc;
    info!("Emitting test log events to event bus");
    // Emit a series of test log events
    let events = vec![
        Event::LogMessage {
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            target: "sd_core::daemon".to_string(),
            message: "Spacedrive daemon started successfully".to_string(),
            job_id: None,
            library_id: None,
        },
        Event::LogMessage {
            timestamp: Utc::now(),
            level: "INFO".to_string(),
            target: "sd_core::event".to_string(),
            message: "Log event streaming initialized".to_string(),
            job_id: None,
            library_id: None,
        },
        Event::LogMessage {
            timestamp: Utc::now(),
            level: "DEBUG".to_string(),
            target: "sd_core::rpc".to_string(),
            message: "RPC server listening for connections".to_string(),
            job_id: None,
            library_id: None,
        },
    ];
    for (i, event) in events.into_iter().enumerate() {
        info!("Emitting test event {}: {:?}", i + 1, event);
        event_bus.emit(event);
        // Small delay between events
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    }
    // Emit periodic heartbeat events every 10 seconds for testing
    // NOTE(review): the spawned task loops forever and its JoinHandle is
    // dropped, so the heartbeat runs detached until process exit — confirm
    // that is intended for a "test" helper.
    tokio::spawn({
        // Clone the Arc so the 'static task owns its own handle to the bus.
        let event_bus = event_bus.clone();
        async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
            loop {
                interval.tick().await;
                let heartbeat_event = Event::LogMessage {
                    timestamp: Utc::now(),
                    level: "DEBUG".to_string(),
                    target: "sd_core::daemon".to_string(),
                    message: "Daemon heartbeat".to_string(),
                    job_id: None,
                    library_id: None,
                };
                info!("Emitting heartbeat event: {:?}", heartbeat_event);
                event_bus.emit(heartbeat_event);
            }
        }
    });
    info!("Test log events setup complete");
}

View File

@@ -1,102 +0,0 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::Core;
/// Validate instance name to prevent path traversal attacks
///
/// Valid names are non-empty, at most 64 bytes, and contain only
/// alphanumeric characters, `-`, or `_`.
pub fn validate_instance_name(instance: &str) -> Result<(), String> {
    // An empty name would collapse to a meaningless socket filename.
    if instance.is_empty() {
        return Err("Instance name cannot be empty".to_string());
    }
    // Cap the length so derived socket paths stay well within OS limits.
    if instance.len() > 64 {
        return Err("Instance name too long (max 64 characters)".to_string());
    }
    // Anything outside alphanumerics, '-', and '_' (notably '.', '/', '\\')
    // is rejected, which rules out `..` segments and path separators.
    let all_valid = instance
        .chars()
        .all(|c| c == '-' || c == '_' || c.is_alphanumeric());
    if all_valid {
        Ok(())
    } else {
        Err("Instance name contains invalid characters. Only alphanumeric, dash, and underscore allowed".to_string())
    }
}
/// Manages lifecycle of Core instances (by name)
pub struct CoreInstanceManager {
    // Named instances, created lazily on first request; the map is shared
    // behind an async RwLock so lookups and creation can happen across tasks.
    instances: Arc<RwLock<HashMap<String, Arc<Core>>>>,
    // Data directory used when a caller does not supply one explicitly.
    default_data_dir: PathBuf,
    // Whether networking is initialized for newly created instances.
    enable_networking: bool,
}
impl CoreInstanceManager {
    /// Create a manager with no running instances.
    ///
    /// `default_data_dir` is used for instances created without an explicit
    /// data dir; `enable_networking` applies to every instance created later.
    pub fn new(default_data_dir: PathBuf, enable_networking: bool) -> Self {
        Self {
            instances: Arc::new(RwLock::new(HashMap::new())),
            default_data_dir,
            enable_networking,
        }
    }
    /// Get or start the default instance
    pub async fn get_default(&self) -> Result<Arc<Core>, String> {
        self.get_or_start("default".to_string(), None).await
    }
    /// Get or start a named instance, optionally with a specific data_dir
    ///
    /// Returns the existing instance registered under `name` if there is one;
    /// otherwise creates a new `Core` (initializing networking when the
    /// manager was configured for it) and registers it. Errors are returned
    /// as strings describing the failed step.
    pub async fn get_or_start(
        &self,
        name: String,
        data_dir: Option<PathBuf>,
    ) -> Result<Arc<Core>, String> {
        // Validate instance name for security
        validate_instance_name(&name)?;
        // Use entry API to avoid race conditions
        use std::collections::hash_map::Entry;
        let mut instances = self.instances.write().await;
        let entry = instances.entry(name.clone());
        match entry {
            Entry::Occupied(existing) => {
                // Instance already exists, return it
                Ok(existing.get().clone())
            }
            Entry::Vacant(vacant) => {
                // Instance doesn't exist, create it
                // NOTE(review): the write lock is held across the awaits
                // below, which serializes ALL instance creation/lookup —
                // presumably intentional so two tasks can't create the same
                // instance twice; confirm the creation latency is acceptable.
                let data_dir = data_dir.unwrap_or_else(|| self.default_data_dir.clone());
                let core = Arc::new(
                    Core::new_with_config(data_dir)
                        .await
                        .map_err(|e| format!("Failed to create core: {}", e))?,
                );
                let core_with_networking = if self.enable_networking {
                    Core::init_networking_shared(core.clone())
                        .await
                        .map_err(|e| format!("Failed to initialize networking: {}", e))?
                } else {
                    core
                };
                // Insert and return the new instance
                vacant.insert(core_with_networking.clone());
                Ok(core_with_networking)
            }
        }
    }
    /// Shutdown a named instance
    ///
    /// Removes the instance from the registry and awaits its shutdown.
    /// Succeeds (no-op) when no instance with `name` exists.
    pub async fn shutdown(&self, name: &str) -> Result<(), String> {
        // Validate instance name for security
        validate_instance_name(name)?;
        if let Some(core) = self.instances.write().await.remove(name) {
            core.shutdown()
                .await
                .map_err(|e| format!("Shutdown failed: {}", e))?;
        }
        Ok(())
    }
}

View File

@@ -4,6 +4,5 @@ pub mod bootstrap;
pub mod client;
pub mod dispatch;
pub mod health;
pub mod instance;
pub mod rpc;
pub mod types;

View File

@@ -1,16 +1,16 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use crate::infra::daemon::instance::CoreInstanceManager;
use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse, EventFilter};
use crate::infra::event::{Event, EventSubscriber};
use crate::infra::event::log_emitter::set_global_log_event_bus;
use crate::infra::event::{Event, EventSubscriber};
use crate::Core;
/// Connection information for event streaming
#[derive(Debug)]
@@ -24,7 +24,7 @@ struct Connection {
/// Minimal JSON-over-UDS RPC server with event streaming support
pub struct RpcServer {
socket_path: PathBuf,
instances: Arc<CoreInstanceManager>,
core: Arc<Core>,
shutdown_tx: mpsc::Sender<()>,
shutdown_rx: mpsc::Receiver<()>,
/// Active connections for event streaming
@@ -32,11 +32,11 @@ pub struct RpcServer {
}
impl RpcServer {
pub fn new(socket_path: PathBuf, instances: Arc<CoreInstanceManager>) -> Self {
pub fn new(socket_path: PathBuf, core: Arc<Core>) -> Self {
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
Self {
socket_path,
instances,
core,
shutdown_tx,
shutdown_rx,
connections: Arc::new(RwLock::new(HashMap::new())),
@@ -63,14 +63,14 @@ impl RpcServer {
result = listener.accept() => {
match result {
Ok((stream, _addr)) => {
let instances = self.instances.clone();
let core = self.core.clone();
let shutdown_tx = self.shutdown_tx.clone();
let connections = self.connections.clone();
// Spawn task for concurrent request handling
tokio::spawn(async move {
// Convert errors to strings to ensure Send
if let Err(e) = Self::handle_connection(stream, instances, shutdown_tx, connections).await {
if let Err(e) = Self::handle_connection(stream, core, shutdown_tx, connections).await {
eprintln!("Connection error: {}", e);
}
});
@@ -95,15 +95,15 @@ impl RpcServer {
/// Start the event broadcaster that forwards core events to subscribed connections
async fn start_event_broadcaster(&self) -> Result<(), Box<dyn std::error::Error>> {
let core = self.instances.get_default().await?;
let core = self.core.clone();
// Make the core's EventBus globally available to the LogEventLayer
set_global_log_event_bus(core.events.clone());
let mut event_subscriber = core.events.subscribe();
let connections = self.connections.clone();
// Optional: can emit a one-off info to prove the pipe works
tracing::info!("Log event bus registered for realtime streaming");
// Optional: can emit a one-off info to prove the pipe works
tracing::info!("Log event bus registered for realtime streaming");
tokio::spawn(async move {
while let Ok(event) = event_subscriber.recv().await {
@@ -111,7 +111,11 @@ impl RpcServer {
// Broadcast event to all subscribed connections
for connection in connections_read.values() {
if Self::should_forward_event(&event, &connection.event_types, &connection.filter) {
if Self::should_forward_event(
&event,
&connection.event_types,
&connection.filter,
) {
// Ignore errors if connection is closed
let _ = connection.event_tx.send(event.clone());
}
@@ -189,7 +193,11 @@ impl RpcServer {
}
/// Check if an event should be forwarded to a connection based on filters
fn should_forward_event(event: &Event, event_types: &[String], filter: &Option<EventFilter>) -> bool {
fn should_forward_event(
event: &Event,
event_types: &[String],
filter: &Option<EventFilter>,
) -> bool {
// If no specific event types requested, forward all events
if event_types.is_empty() {
return true;
@@ -221,23 +229,25 @@ impl RpcServer {
// Apply additional filters if specified
if let Some(filter) = filter {
match event {
Event::JobProgress { job_id, .. } |
Event::JobStarted { job_id, .. } |
Event::JobCompleted { job_id, .. } |
Event::JobFailed { job_id, .. } |
Event::JobCancelled { job_id, .. } => {
Event::JobProgress { job_id, .. }
| Event::JobStarted { job_id, .. }
| Event::JobCompleted { job_id, .. }
| Event::JobFailed { job_id, .. }
| Event::JobCancelled { job_id, .. } => {
if let Some(filter_job_id) = &filter.job_id {
return job_id == filter_job_id;
}
}
Event::LibraryCreated { id, .. } |
Event::LibraryOpened { id, .. } |
Event::LibraryClosed { id, .. } => {
Event::LibraryCreated { id, .. }
| Event::LibraryOpened { id, .. }
| Event::LibraryClosed { id, .. } => {
if let Some(filter_library_id) = &filter.library_id {
return id == filter_library_id;
}
}
Event::LogMessage { job_id, library_id, .. } => {
Event::LogMessage {
job_id, library_id, ..
} => {
// Filter by job ID if specified
if let Some(filter_job_id) = &filter.job_id {
if let Some(log_job_id) = job_id {
@@ -266,7 +276,7 @@ impl RpcServer {
/// Handle individual client connection concurrently
async fn handle_connection(
stream: tokio::net::UnixStream,
instances: Arc<CoreInstanceManager>,
core: Arc<Core>,
shutdown_tx: mpsc::Sender<()>,
connections: Arc<RwLock<HashMap<Uuid, Connection>>>,
) -> Result<(), String> {
@@ -292,7 +302,7 @@ impl RpcServer {
if let Ok(request) = serde_json::from_str::<DaemonRequest>(&line.trim()) {
let response = Self::process_request(
request,
&instances,
&core,
&shutdown_tx,
&connections,
connection_id,
@@ -352,7 +362,7 @@ impl RpcServer {
/// Process a parsed daemon request
async fn process_request(
request: DaemonRequest,
instances: &Arc<CoreInstanceManager>,
core: &Arc<Core>,
shutdown_tx: &mpsc::Sender<()>,
connections: &Arc<RwLock<HashMap<Uuid, Connection>>>,
connection_id: Uuid,
@@ -361,23 +371,24 @@ impl RpcServer {
match request {
DaemonRequest::Ping => DaemonResponse::Pong,
DaemonRequest::Action { method, payload } => match instances.get_default().await {
Ok(core) => match core.execute_operation_by_method(&method, payload).await {
DaemonRequest::Action { method, payload } => {
match core.execute_operation_by_method(&method, payload).await {
Ok(out) => DaemonResponse::Ok(out),
Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)),
},
Err(e) => DaemonResponse::Error(DaemonError::CoreUnavailable(e)),
},
}
}
DaemonRequest::Query { method, payload } => match instances.get_default().await {
Ok(core) => match core.execute_operation_by_method(&method, payload).await {
DaemonRequest::Query { method, payload } => {
match core.execute_operation_by_method(&method, payload).await {
Ok(out) => DaemonResponse::Ok(out),
Err(e) => DaemonResponse::Error(DaemonError::OperationFailed(e)),
},
Err(e) => DaemonResponse::Error(DaemonError::CoreUnavailable(e)),
},
}
}
DaemonRequest::Subscribe { event_types, filter } => {
DaemonRequest::Subscribe {
event_types,
filter,
} => {
// Register connection for event streaming
let connection = Connection {
id: connection_id,