diff --git a/core-new/examples/indexing_demo.rs b/core-new/examples/indexing_demo.rs index cdd474738..4db16bfae 100644 --- a/core-new/examples/indexing_demo.rs +++ b/core-new/examples/indexing_demo.rs @@ -31,13 +31,34 @@ async fn main() -> Result<(), Box> { println!("šŸš€ === Spacedrive 2 Desktop Indexing Demo ===\n"); - // 1. Initialize Spacedrive Core + // 1. Initialize Spacedrive Core with job logging enabled println!("1. šŸ”§ Initializing Spacedrive Core..."); let data_dir = PathBuf::from("./data/spacedrive-desktop-demo"); + + // Enable job logging by modifying the config before core initialization + { + use sd_core_new::config::{AppConfig, JobLoggingConfig}; + let mut config = AppConfig::load_from(&data_dir).unwrap_or_else(|_| { + AppConfig::default_with_dir(data_dir.clone()) + }); + + // Enable job logging - hardcoded for demo + config.job_logging = JobLoggingConfig { + enabled: true, + log_directory: "job_logs".to_string(), + max_file_size: 10 * 1024 * 1024, // 10MB + include_debug: true, // Include debug logs for full detail + }; + + config.save()?; + println!(" šŸ“ Job logging enabled to: {}", config.job_logs_dir().display()); + } + let core = Core::new_with_config(data_dir.clone()).await?; - println!(" āœ… Core initialized"); + println!(" āœ… Core initialized with job logging"); println!(" šŸ“± Device ID: {}", core.device.device_id()?); - println!(" šŸ’¾ Data directory: {:?}\n", data_dir); + println!(" šŸ’¾ Data directory: {:?}", data_dir); + println!(" šŸ“ Job logs directory: {:?}\n", data_dir.join("job_logs")); // 2. Get or create library println!("2. šŸ“š Setting up library..."); @@ -543,14 +564,60 @@ async fn main() -> Result<(), Box> { // Brief pause to see final status sleep(Duration::from_secs(2)).await; - // 9. Graceful shutdown - println!("\n9. šŸ›‘ Shutting down gracefully..."); + // 9. Show job logs created during the demo + println!("\n9. šŸ“‹ Job Logs Created:"); + let job_logs_dir = data_dir.join("job_logs"); + if let Ok(mut entries) = tokio::fs::read_dir(&job_logs_dir).await { + let mut log_files = Vec::new(); + while let Ok(Some(entry)) = entries.next_entry().await { + if let Some(name) = entry.file_name().to_str() { + if name.ends_with(".log") { + log_files.push(name.to_string()); + } + } + } + + if !log_files.is_empty() { + println!(" šŸ“ Found {} job log file(s):", log_files.len()); + for (i, log_file) in log_files.iter().enumerate() { + let log_path = job_logs_dir.join(log_file); + if let Ok(metadata) = tokio::fs::metadata(&log_path).await { + println!(" {} {} ({} bytes)", + i + 1, + log_file, + metadata.len() + ); + + // Show first few lines of the first log + if i == 0 { + if let Ok(contents) = tokio::fs::read_to_string(&log_path).await { + let lines: Vec<&str> = contents.lines().take(5).collect(); + println!("\n First {} lines of {}:", lines.len(), log_file); + for line in lines { + println!(" > {}", line); + } + if contents.lines().count() > 5 { + println!(" ... and {} more lines", contents.lines().count() - 5); + } + } + } + } + } + println!("\n šŸ’” Full logs available at: {:?}", job_logs_dir); + } else { + println!(" āš ļø No job logs found (jobs may have completed too quickly)"); + } + } + + // 10. Graceful shutdown + println!("\n10. šŸ›‘ Shutting down gracefully..."); core.shutdown().await?; println!("\nāœ… === Desktop Indexing Demo Complete! ==="); println!("šŸŽ‰ Spacedrive 2 Production Job System Working!"); println!(); println!("šŸ“ Demo data stored at: {:?}", data_dir); + println!("šŸ“ Job logs stored at: {:?}", job_logs_dir); println!("šŸ”„ Run again to see library auto-loading and job persistence!"); println!(); println!("šŸš€ Production system achievements:"); diff --git a/core-new/examples/job_logging_test.rs b/core-new/examples/job_logging_test.rs new file mode 100644 index 000000000..b187471e6 --- /dev/null +++ b/core-new/examples/job_logging_test.rs @@ -0,0 +1,164 @@ +//! Simple test for job logging functionality + +use sd_core_new::{ + config::{AppConfig, JobLoggingConfig}, + infrastructure::{database::entities, events::Event}, + location::{create_location, LocationCreateArgs}, + Core, +}; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter}; +use std::path::PathBuf; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + tracing_subscriber::fmt() + .with_env_filter("sd_core_new=debug") + .init(); + + println!("šŸš€ Job Logging Test\n"); + + // 1. Initialize Core with job logging + println!("1. Setting up with job logging enabled..."); + let data_dir = PathBuf::from("./data/job-logging-test"); + + // Configure with job logging + { + let mut config = AppConfig::load_from(&data_dir).unwrap_or_else(|_| { + AppConfig::default_with_dir(data_dir.clone()) + }); + + config.job_logging = JobLoggingConfig { + enabled: true, + log_directory: "job_logs".to_string(), + max_file_size: 10 * 1024 * 1024, + include_debug: true, + }; + + config.save()?; + println!(" āœ… Job logging enabled"); + } + + let core = Core::new_with_config(data_dir.clone()).await?; + let job_logs_dir = data_dir.join("job_logs"); + println!(" šŸ“ Job logs directory: {:?}", job_logs_dir); + + // 2. Create library + println!("\n2. Creating library..."); + let library = if core.libraries.list().await.is_empty() { + core.libraries + .create_library("Test Library", None, core.context.clone()) + .await? + } else { + core.libraries.list().await.into_iter().next().unwrap() + }; + println!(" āœ… Library ready"); + + // 3. Create a small test location + println!("\n3. Creating test location..."); + let test_path = PathBuf::from("./test-data"); + + // Register device + let db = library.db(); + let device = core.device.to_device()?; + let device_record = match entities::device::Entity::find() + .filter(entities::device::Column::Uuid.eq(device.id)) + .one(db.conn()) + .await? + { + Some(existing) => existing, + None => { + let device_model: entities::device::ActiveModel = device.into(); + device_model.insert(db.conn()).await? + } + }; + + // Create location + let location_args = LocationCreateArgs { + path: test_path.clone(), + name: Some("Test Data".to_string()), + index_mode: sd_core_new::location::IndexMode::Deep, + }; + + let location_db_id = create_location( + library.clone(), + &core.events, + location_args, + device_record.id, + ) + .await?; + + println!(" āœ… Location created, job dispatched"); + + // 4. Monitor for a short time + println!("\n4. Monitoring job progress..."); + let mut event_rx = core.events.subscribe(); + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(10); + + while start.elapsed() < timeout { + tokio::select! { + Ok(event) = event_rx.recv() => { + match event { + Event::JobProgress { job_id, message, .. } => { + if let Some(msg) = message { + println!(" šŸ“Š Job {}: {}", job_id, msg); + } + } + Event::IndexingCompleted { .. } => { + println!(" āœ… Indexing completed!"); + break; + } + _ => {} + } + } + _ = sleep(Duration::from_millis(100)) => {} + } + } + + // 5. Check job logs + println!("\n5. Checking job logs..."); + if let Ok(mut entries) = tokio::fs::read_dir(&job_logs_dir).await { + let mut count = 0; + while let Ok(Some(entry)) = entries.next_entry().await { + if let Some(name) = entry.file_name().to_str() { + if name.ends_with(".log") { + count += 1; + let log_path = job_logs_dir.join(name); + if let Ok(contents) = tokio::fs::read_to_string(&log_path).await { + println!("\n šŸ“„ Log file: {}", name); + println!(" Size: {} bytes", contents.len()); + println!(" Lines: {}", contents.lines().count()); + + // Show first few lines + println!("\n First 10 lines:"); + for (i, line) in contents.lines().take(10).enumerate() { + println!(" {}: {}", i + 1, line); + } + + if contents.lines().count() > 10 { + println!(" ... {} more lines", contents.lines().count() - 10); + } + } + } + } + } + + if count == 0 { + println!(" āš ļø No job logs found"); + } else { + println!("\n āœ… Found {} job log file(s)", count); + } + } + + // 6. Shutdown + println!("\n6. Shutting down..."); + core.shutdown().await?; + + println!("\nāœ… Test complete!"); + println!("šŸ“ Data: {:?}", data_dir); + println!("šŸ“ Logs: {:?}", job_logs_dir); + + Ok(()) +} \ No newline at end of file diff --git a/core-new/spacedrive-jobs-derive/src/lib.rs b/core-new/spacedrive-jobs-derive/src/lib.rs index 422ecc605..58a69386b 100644 --- a/core-new/spacedrive-jobs-derive/src/lib.rs +++ b/core-new/spacedrive-jobs-derive/src/lib.rs @@ -72,6 +72,8 @@ pub fn derive_job(input: TokenStream) -> TokenStream { checkpoint_handler: std::sync::Arc, networking: Option>, volume_manager: Option>, + job_logging_config: Option, + job_logs_dir: Option, ) -> Box> { Box::new(crate::infrastructure::jobs::executor::JobExecutor::new( *self, @@ -84,6 +86,8 @@ pub fn derive_job(input: TokenStream) -> TokenStream { checkpoint_handler, networking, volume_manager, + job_logging_config, + job_logs_dir, )) } diff --git a/core-new/src/config/app_config.rs b/core-new/src/config/app_config.rs index f715c13ee..22651c860 100644 --- a/core-new/src/config/app_config.rs +++ b/core-new/src/config/app_config.rs @@ -28,6 +28,37 @@ pub struct AppConfig { /// User preferences pub preferences: Preferences, + + /// Job logging configuration + #[serde(default)] + pub job_logging: JobLoggingConfig, +} + +/// Configuration for job-specific logging +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobLoggingConfig { + /// Whether job logging is enabled + pub enabled: bool, + + /// Directory for job logs (relative to data_dir) + pub log_directory: String, + + /// Maximum log file size in bytes (0 = unlimited) + pub max_file_size: u64, + + /// Whether to include debug logs + pub include_debug: bool, +} + +impl Default for JobLoggingConfig { + fn default() -> Self { + Self { + enabled: false, + log_directory: "job_logs".to_string(), + max_file_size: 10 * 1024 * 1024, // 10MB default + include_debug: false, + } + } } impl AppConfig { @@ -80,6 +111,7 @@ impl AppConfig { telemetry_enabled: true, p2p: P2PConfig::default(), preferences: Preferences::default(), + job_logging: JobLoggingConfig::default(), } } @@ -105,11 +137,19 @@ impl AppConfig { self.data_dir.join("libraries") } + /// Get the path for job logs directory + pub fn job_logs_dir(&self) -> PathBuf { + self.data_dir.join(&self.job_logging.log_directory) + } + /// Ensure all required directories exist pub fn ensure_directories(&self) -> Result<()> { fs::create_dir_all(&self.data_dir)?; fs::create_dir_all(self.logs_dir())?; fs::create_dir_all(self.libraries_dir())?; + if self.job_logging.enabled { + fs::create_dir_all(self.job_logs_dir())?; + } Ok(()) } } @@ -127,7 +167,7 @@ impl Migrate for AppConfig { } fn target_version() -> u32 { - 1 // Current schema version + 2 // Updated schema version for job logging } fn migrate(&mut self) -> Result<()> { @@ -135,9 +175,15 @@ impl Migrate for AppConfig { 0 => { // Future migration from v0 to v1 would go here self.version = 1; + self.migrate() // Continue migration chain + } + 1 => { + // Migration from v1 to v2: Add job logging config + self.job_logging = JobLoggingConfig::default(); + self.version = 2; Ok(()) } - 1 => Ok(()), // Already at target version + 2 => Ok(()), // Already at target version v => Err(anyhow!("Unknown config version: {}", v)), } } diff --git a/core-new/src/config/mod.rs b/core-new/src/config/mod.rs index b2ce55625..d895dfc76 100644 --- a/core-new/src/config/mod.rs +++ b/core-new/src/config/mod.rs @@ -9,7 +9,7 @@ use std::fs; pub mod app_config; pub mod migration; -pub use app_config::AppConfig; +pub use app_config::{AppConfig, JobLoggingConfig}; pub use migration::Migrate; /// Platform-specific data directory resolution diff --git a/core-new/src/context.rs b/core-new/src/context.rs index fd4aeb107..bb656e2a0 100644 --- a/core-new/src/context.rs +++ b/core-new/src/context.rs @@ -3,12 +3,13 @@ //! Shared context providing access to core application components. use crate::{ + config::JobLoggingConfig, device::DeviceManager, infrastructure::events::EventBus, keys::library_key_manager::LibraryKeyManager, library::LibraryManager, infrastructure::actions::manager::ActionManager, services::networking::NetworkingService, volume::VolumeManager, }; -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use tokio::sync::RwLock; /// Shared context providing access to core application components. @@ -22,6 +23,9 @@ pub struct CoreContext { // This is wrapped in an RwLock to allow it to be set after initialization pub action_manager: Arc>>>, pub networking: Arc>>>, + // Job logging configuration + pub job_logging_config: Option, + pub job_logs_dir: Option, } impl CoreContext { @@ -41,8 +45,16 @@ impl CoreContext { library_key_manager, action_manager: Arc::new(RwLock::new(None)), networking: Arc::new(RwLock::new(None)), + job_logging_config: None, + job_logs_dir: None, } } + + /// Set job logging configuration + pub fn set_job_logging(&mut self, config: JobLoggingConfig, logs_dir: PathBuf) { + self.job_logging_config = Some(config); + self.job_logs_dir = Some(logs_dir); + } /// Helper method for services to get the networking service pub async fn get_networking(&self) -> Option> { diff --git a/core-new/src/infrastructure/jobs/context.rs b/core-new/src/infrastructure/jobs/context.rs index b94738213..b9d4c4cd0 100644 --- a/core-new/src/infrastructure/jobs/context.rs +++ b/core-new/src/infrastructure/jobs/context.rs @@ -25,6 +25,7 @@ pub struct JobContext<'a> { pub(crate) child_handles: Arc>>, pub(crate) networking: Option>, pub(crate) volume_manager: Option>, + pub(crate) file_logger: Option>, } impl<'a> JobContext<'a> { @@ -55,6 +56,11 @@ impl<'a> JobContext<'a> { /// Report progress pub fn progress(&self, progress: Progress) { + // Log progress messages to file if enabled + if let Some(logger) = &self.file_logger { + let _ = logger.log("PROGRESS", &progress.to_string()); + } + if let Err(e) = self.progress_tx.send(progress) { warn!("Failed to send progress update: {}", e); } @@ -62,12 +68,25 @@ impl<'a> JobContext<'a> { /// Add a warning message pub fn add_warning(&self, warning: impl Into) { - self.progress(Progress::indeterminate(format!("āš ļø {}", warning.into()))); + let msg = warning.into(); + + // Log to file if enabled + if let Some(logger) = &self.file_logger { + let _ = logger.log("WARN", &msg); + } + + self.progress(Progress::indeterminate(format!("āš ļø {}", msg))); } /// Add a non-critical error pub fn add_non_critical_error(&self, error: impl Into) { let error_msg = error.into().to_string(); + + // Log to file if enabled + if let Some(logger) = &self.file_logger { + let _ = logger.log("ERROR", &error_msg); + } + self.progress(Progress::indeterminate(format!("āŒ {}", error_msg))); // Increment error count @@ -155,7 +174,23 @@ impl<'a> JobContext<'a> { /// Log a message pub fn log(&self, message: impl Into) { - debug!(job_id = %self.id, "{}", message.into()); + let msg = message.into(); + debug!(job_id = %self.id, "{}", msg); + + // Also log to file if enabled + if let Some(logger) = &self.file_logger { + let _ = logger.log("INFO", &msg); + } + } + + /// Log a debug message + pub fn log_debug(&self, message: impl Into) { + let msg = message.into(); + debug!(job_id = %self.id, "{}", msg); + + if let Some(logger) = &self.file_logger { + let _ = logger.log("DEBUG", &msg); + } } } diff --git a/core-new/src/infrastructure/jobs/executor.rs b/core-new/src/infrastructure/jobs/executor.rs index 9453e0c0c..9b1159151 100644 --- a/core-new/src/infrastructure/jobs/executor.rs +++ b/core-new/src/infrastructure/jobs/executor.rs @@ -5,17 +5,18 @@ use super::{ database::{self, JobDb}, error::{JobError, JobResult}, handle::JobHandle, + logger::FileJobLogger, output::JobOutput, progress::Progress, traits::{Job, JobHandler, Resourceful}, types::{ErasedJob, JobId, JobMetrics, JobStatus}, }; -use crate::library::Library; +use crate::{config::JobLoggingConfig, library::Library}; use async_trait::async_trait; use sd_task_system::{ExecStatus, Interrupter, Task, TaskId}; -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use tokio::sync::{broadcast, mpsc, watch, Mutex}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, span, Level}; /// Executor that wraps a job for task system execution pub struct JobExecutor { @@ -36,6 +37,9 @@ pub struct JobExecutorState { pub networking: Option>, pub volume_manager: Option>, pub latest_progress: Arc>>, + pub job_logging_config: Option, + pub job_logs_dir: Option, + pub file_logger: Option>, } impl JobExecutor { @@ -50,7 +54,26 @@ impl JobExecutor { checkpoint_handler: Arc, networking: Option>, volume_manager: Option>, + job_logging_config: Option, + job_logs_dir: Option, ) -> Self { + // Create file logger if job logging is enabled + let file_logger = if let (Some(config), Some(logs_dir)) = (&job_logging_config, &job_logs_dir) { + let log_file = logs_dir.join(format!("{}.log", job_id)); + match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) { + Ok(logger) => { + let _ = logger.log("INFO", &format!("Job {} ({}) starting", job_id, J::NAME)); + Some(Arc::new(logger)) + } + Err(e) => { + error!("Failed to create job logger: {}", e); + None + } + } + } else { + None + }; + Self { job, state: JobExecutorState { @@ -66,6 +89,9 @@ impl JobExecutor { networking, volume_manager, latest_progress: Arc::new(Mutex::new(None)), + job_logging_config, + job_logs_dir, + file_logger, }, } } @@ -115,6 +141,37 @@ impl Task for JobExecutor { } async fn run(&mut self, interrupter: &Interrupter) -> Result { + // Log job start + if let Some(logger) = &self.state.file_logger { + let _ = logger.log("INFO", &format!("Starting job {}: {}", self.state.job_id, J::NAME)); + } + + let result = self.run_inner(interrupter).await; + + // Log job completion + if let Some(logger) = &self.state.file_logger { + match &result { + Ok(ExecStatus::Done(_)) => { + let _ = logger.log("INFO", &format!("Job {} completed successfully", self.state.job_id)); + } + Ok(ExecStatus::Canceled) => { + let _ = logger.log("INFO", &format!("Job {} was cancelled", self.state.job_id)); + } + Ok(ExecStatus::Paused) => { + let _ = logger.log("INFO", &format!("Job {} was paused", self.state.job_id)); + } + Err(e) => { + let _ = logger.log("ERROR", &format!("Job {} failed: {}", self.state.job_id, e)); + } + } + } + + result + } +} + +impl JobExecutor { + async fn run_inner(&mut self, interrupter: &Interrupter) -> Result { info!("Starting job {}: {}", self.state.job_id, J::NAME); // Update status to running @@ -139,6 +196,7 @@ impl Task for JobExecutor { child_handles: Arc::new(Mutex::new(Vec::new())), networking: self.state.networking.clone(), volume_manager: self.state.volume_manager.clone(), + file_logger: self.state.file_logger.clone(), }; // Progress forwarding is handled by JobManager @@ -307,9 +365,28 @@ impl ErasedJob for JobExecutor { checkpoint_handler: std::sync::Arc, networking: Option>, volume_manager: Option>, + job_logging_config: Option, + job_logs_dir: Option, ) -> Box> { // Update the executor's state with the new parameters let mut executor = *self; + // Create file logger if job logging is enabled + let file_logger = if let (Some(config), Some(logs_dir)) = (&job_logging_config, &job_logs_dir) { + let log_file = logs_dir.join(format!("{}.log", job_id)); + match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) { + Ok(logger) => { + let _ = logger.log("INFO", &format!("Job {} starting (via create_executor)", job_id)); + Some(Arc::new(logger)) + } + Err(e) => { + error!("Failed to create job logger: {}", e); + None + } + } + } else { + None + }; + executor.state = JobExecutorState { job_id, library, @@ -323,6 +400,9 @@ impl ErasedJob for JobExecutor { networking, volume_manager, latest_progress: Arc::new(Mutex::new(None)), + job_logging_config, + job_logs_dir, + file_logger, }; Box::new(executor) diff --git a/core-new/src/infrastructure/jobs/logger.rs b/core-new/src/infrastructure/jobs/logger.rs new file mode 100644 index 000000000..d980ad170 --- /dev/null +++ b/core-new/src/infrastructure/jobs/logger.rs @@ -0,0 +1,314 @@ +//! Job-specific logging implementation +//! +//! This module provides a custom tracing subscriber that captures logs +//! for individual jobs and writes them to separate log files. + +use super::types::JobId; +use crate::config::JobLoggingConfig; +use std::{ + fs::{File, OpenOptions}, + io::{Seek, Write}, + path::PathBuf, + sync::{Arc, Mutex}, +}; +use tracing::{ + field::{Field, Visit}, + span::{Attributes, Record}, + Event, Id, Level, Metadata, Subscriber, +}; +use tracing_subscriber::{ + fmt::{ + format::{self, FormatEvent, FormatFields}, + FmtContext, FormattedFields, + }, + registry::LookupSpan, + Layer, +}; + +/// A tracing layer that writes logs to a job-specific file +pub struct JobLogLayer { + job_id: JobId, + file: Arc>, + config: JobLoggingConfig, + max_file_size: u64, + current_size: Arc>, +} + +impl JobLogLayer { + /// Create a new job log layer + pub fn new(job_id: JobId, log_path: PathBuf, config: JobLoggingConfig) -> std::io::Result { + // Create or append to the log file + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path)?; + + // Get current file size + let current_size = file.metadata()?.len(); + + Ok(Self { + job_id, + file: Arc::new(Mutex::new(file)), + max_file_size: config.max_file_size, + current_size: Arc::new(Mutex::new(current_size)), + config, + }) + } + + /// Check if this event should be logged based on job context + fn should_log(&self, metadata: &Metadata<'_>) -> bool { + // Filter by log level + if !self.config.include_debug && metadata.level() > &Level::INFO { + return false; + } + + // Always log ERROR and WARN + if metadata.level() <= &Level::WARN { + return true; + } + + // For other levels, only log if it's from job-related modules + let target = metadata.target(); + target.contains("job") || + target.contains("executor") || + target.contains("infrastructure::jobs") || + target.contains("operations") + } + + /// Write a log entry to the file + fn write_log(&self, message: String) -> std::io::Result<()> { + let mut file = self.file.lock().unwrap(); + let mut size = self.current_size.lock().unwrap(); + + // Check file size limit + if self.max_file_size > 0 && *size + message.len() as u64 > self.max_file_size { + // File too large, truncate and start fresh + file.set_len(0)?; + file.seek(std::io::SeekFrom::Start(0))?; + *size = 0; + + // Write truncation notice + let notice = format!( + "[{}] Log file truncated due to size limit\n", + chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f") + ); + file.write_all(notice.as_bytes())?; + *size += notice.len() as u64; + } + + // Write the log message + file.write_all(message.as_bytes())?; + file.flush()?; + *size += message.len() as u64; + + Ok(()) + } +} + +impl Layer for JobLogLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + // Check if we should log this event + if !self.should_log(event.metadata()) { + return; + } + + // Check if this event is from our job's span + let current_span = ctx.event_span(event); + if let Some(span) = current_span { + // Look for job_id field in the span or its parents + let mut found_job = false; + let mut current = Some(span); + + while let Some(span) = current { + if let Some(fields) = span.extensions().get::>() { + if fields.fields.contains(&format!("job_id={}", self.job_id)) { + found_job = true; + break; + } + } + current = span.parent(); + } + + // If this isn't from our job, skip it + if !found_job { + return; + } + } + + // Format the log message + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); + let level = event.metadata().level(); + let target = event.metadata().target(); + + // Extract the message from the event + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + let message = visitor.message; + + // Format: [timestamp] LEVEL target: message + let formatted = format!("[{}] {:5} {}: {}\n", timestamp, level, target, message); + + // Write to file + if let Err(e) = self.write_log(formatted) { + eprintln!("Failed to write to job log: {}", e); + } + } + + fn on_new_span(&self, _attrs: &Attributes<'_>, _id: &Id, _ctx: tracing_subscriber::layer::Context<'_, S>) { + // We don't need to do anything special for new spans + } +} + +/// Helper to extract message from event fields +#[derive(Default)] +struct MessageVisitor { + message: String, +} + +impl Visit for MessageVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = format!("{:?}", value); + } else { + if !self.message.is_empty() { + self.message.push_str(", "); + } + self.message.push_str(&format!("{}={:?}", field.name(), value)); + } + } + + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" { + self.message = value.to_string(); + } else { + if !self.message.is_empty() { + self.message.push_str(", "); + } + self.message.push_str(&format!("{}=\"{}\"", field.name(), value)); + } + } +} + +/// A simple file-based job logger +pub struct FileJobLogger { + job_id: JobId, + file: Arc>, + config: JobLoggingConfig, +} + +impl FileJobLogger { + pub fn new(job_id: JobId, log_path: PathBuf, config: JobLoggingConfig) -> std::io::Result { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_path)?; + + Ok(Self { + job_id, + file: Arc::new(Mutex::new(file)), + config, + }) + } + + pub fn log(&self, level: &str, message: &str) -> std::io::Result<()> { + if level == "DEBUG" && !self.config.include_debug { + return Ok(()); + } + + let mut file = self.file.lock().unwrap(); + writeln!( + file, + "[{}] {} {}: {}", + chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"), + level, + self.job_id, + message + )?; + file.flush() + } +} + +/// Create a job-specific logger that writes to a file +pub fn create_job_logger( + job_id: JobId, + log_dir: PathBuf, + config: JobLoggingConfig, +) -> std::io::Result { + // Create log file path + let log_file = log_dir.join(format!("{}.log", job_id)); + + // Write initial log entry + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_file)?; + + writeln!( + file, + "[{}] === Job {} started ===", + chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"), + job_id + )?; + file.flush()?; + drop(file); + + // Create the job log layer + JobLogLayer::new(job_id, log_file, config) +} + +/// Setup job logging for async execution +pub fn setup_job_logging( + job_id: JobId, + log_dir: PathBuf, + config: JobLoggingConfig, +) -> std::io::Result> { + // Create log file path + let log_file = log_dir.join(format!("{}.log", job_id)); + + // Write initial log entry directly + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&log_file)?; + + writeln!( + file, + "[{}] === Job {} started ===", + chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"), + job_id + )?; + file.flush()?; + drop(file); + + // For now, we'll write logs directly in the job context + // This avoids conflicts with existing tracing subscribers + + // Return a guard that will write the final log entry + struct JobLoggingGuard { + log_file: PathBuf, + job_id: JobId, + } + + impl Drop for JobLoggingGuard { + fn drop(&mut self) { + // Write final log entry + if let Ok(mut file) = OpenOptions::new().append(true).open(&self.log_file) { + let _ = writeln!( + file, + "[{}] === Job {} finished ===", + chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"), + self.job_id + ); + let _ = file.flush(); + } + } + } + + Ok(Some(JobLoggingGuard { + log_file, + job_id, + })) +} \ No newline at end of file diff --git a/core-new/src/infrastructure/jobs/manager.rs b/core-new/src/infrastructure/jobs/manager.rs index ef25551bc..917fb43b3 100644 --- a/core-new/src/infrastructure/jobs/manager.rs +++ b/core-new/src/infrastructure/jobs/manager.rs @@ -221,6 +221,8 @@ impl JobManager { }), networking, volume_manager, + self.context.job_logging_config.clone(), + self.context.job_logs_dir.clone(), ); // Create handle @@ -458,6 +460,8 @@ impl JobManager { }), networking, volume_manager, + self.context.job_logging_config.clone(), + self.context.job_logs_dir.clone(), ); // Clone status_rx for cleanup task @@ -914,6 +918,8 @@ impl JobManager { }), networking, volume_manager, + self.context.job_logging_config.clone(), + self.context.job_logs_dir.clone(), ); // Create handle @@ -1164,6 +1170,8 @@ impl JobManager { }), networking, volume_manager, + self.context.job_logging_config.clone(), + self.context.job_logs_dir.clone(), ); // Create handle diff --git a/core-new/src/infrastructure/jobs/mod.rs b/core-new/src/infrastructure/jobs/mod.rs index 0135d002b..d0e15866b 100644 --- a/core-new/src/infrastructure/jobs/mod.rs +++ b/core-new/src/infrastructure/jobs/mod.rs @@ -8,6 +8,7 @@ pub mod error; pub mod executor; pub mod generic_progress; pub mod handle; +pub mod logger; pub mod manager; pub mod output; pub mod progress; diff --git a/core-new/src/infrastructure/jobs/types.rs b/core-new/src/infrastructure/jobs/types.rs index 4c8508258..46e52df1b 100644 --- a/core-new/src/infrastructure/jobs/types.rs +++ b/core-new/src/infrastructure/jobs/types.rs @@ -133,6 +133,8 @@ pub trait ErasedJob: Send + Sync + std::fmt::Debug + 'static { >, networking: Option>, volume_manager: Option>, + job_logging_config: Option, + job_logs_dir: Option, ) -> Box>; fn serialize_state(&self) -> Result, crate::infrastructure::jobs::error::JobError>; diff --git a/core-new/src/lib.rs b/core-new/src/lib.rs index 0c8a13fd2..1ad2307bc 100644 --- a/core-new/src/lib.rs +++ b/core-new/src/lib.rs @@ -182,13 +182,25 @@ impl Core { info!("Job types registered"); // 9. Create the context that will be shared with services - let context = Arc::new(CoreContext::new( + let mut context_inner = CoreContext::new( events.clone(), device.clone(), libraries.clone(), volumes.clone(), library_key_manager.clone(), - )); + ); + + // Set job logging configuration if enabled + let app_config = config.read().await; + if app_config.job_logging.enabled { + context_inner.set_job_logging( + app_config.job_logging.clone(), + app_config.job_logs_dir(), + ); + } + drop(app_config); + + let context = Arc::new(context_inner); // 10. Auto-load all libraries with context for job manager initialization info!("Loading existing libraries..."); diff --git a/core-new/whitepaper/spacedrive.pdf b/core-new/whitepaper/spacedrive.pdf index ea3725f7c..b14c9f138 100644 Binary files a/core-new/whitepaper/spacedrive.pdf and b/core-new/whitepaper/spacedrive.pdf differ