feat(jobs): Implement job logging functionality

- Introduced job logging capabilities, allowing detailed tracking of job execution and progress.
- Added configuration options for enabling job logging, including log directory, maximum file size, and debug log inclusion.
- Enhanced job context to support file logging, ensuring that job-related events are recorded in separate log files.
- Updated core initialization and job management to utilize the new logging features, improving observability and debugging capabilities.

These changes aim to provide better insights into job execution, facilitating easier troubleshooting and performance monitoring within the Spacedrive system.
This commit is contained in:
Jamie Pine
2025-08-07 23:02:32 -07:00
parent 77ed9dd703
commit e8b6cbdfdd
14 changed files with 761 additions and 16 deletions

View File

@@ -31,13 +31,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 === Spacedrive 2 Desktop Indexing Demo ===\n");
// 1. Initialize Spacedrive Core
// 1. Initialize Spacedrive Core with job logging enabled
println!("1. 🔧 Initializing Spacedrive Core...");
let data_dir = PathBuf::from("./data/spacedrive-desktop-demo");
// Enable job logging by modifying the config before core initialization
{
use sd_core_new::config::{AppConfig, JobLoggingConfig};
let mut config = AppConfig::load_from(&data_dir).unwrap_or_else(|_| {
AppConfig::default_with_dir(data_dir.clone())
});
// Enable job logging - hardcoded for demo
config.job_logging = JobLoggingConfig {
enabled: true,
log_directory: "job_logs".to_string(),
max_file_size: 10 * 1024 * 1024, // 10MB
include_debug: true, // Include debug logs for full detail
};
config.save()?;
println!(" 📝 Job logging enabled to: {}", config.job_logs_dir().display());
}
let core = Core::new_with_config(data_dir.clone()).await?;
println!(" ✅ Core initialized");
println!(" ✅ Core initialized with job logging");
println!(" 📱 Device ID: {}", core.device.device_id()?);
println!(" 💾 Data directory: {:?}\n", data_dir);
println!(" 💾 Data directory: {:?}", data_dir);
println!(" 📝 Job logs directory: {:?}\n", data_dir.join("job_logs"));
// 2. Get or create library
println!("2. 📚 Setting up library...");
@@ -543,14 +564,60 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Brief pause to see final status
sleep(Duration::from_secs(2)).await;
// 9. Graceful shutdown
println!("\n9. 🛑 Shutting down gracefully...");
// 9. Show job logs created during the demo
println!("\n9. 📋 Job Logs Created:");
let job_logs_dir = data_dir.join("job_logs");
if let Ok(mut entries) = tokio::fs::read_dir(&job_logs_dir).await {
let mut log_files = Vec::new();
while let Ok(Some(entry)) = entries.next_entry().await {
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".log") {
log_files.push(name.to_string());
}
}
}
if !log_files.is_empty() {
println!(" 📝 Found {} job log file(s):", log_files.len());
for (i, log_file) in log_files.iter().enumerate() {
let log_path = job_logs_dir.join(log_file);
if let Ok(metadata) = tokio::fs::metadata(&log_path).await {
println!(" {} {} ({} bytes)",
i + 1,
log_file,
metadata.len()
);
// Show first few lines of the first log
if i == 0 {
if let Ok(contents) = tokio::fs::read_to_string(&log_path).await {
let lines: Vec<&str> = contents.lines().take(5).collect();
println!("\n First {} lines of {}:", lines.len(), log_file);
for line in lines {
println!(" > {}", line);
}
if contents.lines().count() > 5 {
println!(" ... and {} more lines", contents.lines().count() - 5);
}
}
}
}
}
println!("\n 💡 Full logs available at: {:?}", job_logs_dir);
} else {
println!(" ⚠️ No job logs found (jobs may have completed too quickly)");
}
}
// 10. Graceful shutdown
println!("\n10. 🛑 Shutting down gracefully...");
core.shutdown().await?;
println!("\n✅ === Desktop Indexing Demo Complete! ===");
println!("🎉 Spacedrive 2 Production Job System Working!");
println!();
println!("📁 Demo data stored at: {:?}", data_dir);
println!("📝 Job logs stored at: {:?}", job_logs_dir);
println!("🔄 Run again to see library auto-loading and job persistence!");
println!();
println!("🚀 Production system achievements:");

View File

@@ -0,0 +1,164 @@
//! Simple test for job logging functionality
use sd_core_new::{
config::{AppConfig, JobLoggingConfig},
infrastructure::{database::entities, events::Event},
location::{create_location, LocationCreateArgs},
Core,
};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
use std::path::PathBuf;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Smoke test for per-job file logging: enables job logging in the app
    // config, runs an indexing job, then inspects the generated log files.

    // Initialize logging (console tracing for the core crate only; per-job
    // file logging is configured separately via JobLoggingConfig below).
    tracing_subscriber::fmt()
        .with_env_filter("sd_core_new=debug")
        .init();
    println!("🚀 Job Logging Test\n");

    // 1. Initialize Core with job logging
    println!("1. Setting up with job logging enabled...");
    let data_dir = PathBuf::from("./data/job-logging-test");

    // Configure with job logging.
    // The config is saved to disk *before* Core::new_with_config so the core
    // reads the enabled job-logging settings at startup; the block scope
    // drops the temporary AppConfig handle afterwards.
    {
        let mut config = AppConfig::load_from(&data_dir).unwrap_or_else(|_| {
            AppConfig::default_with_dir(data_dir.clone())
        });
        config.job_logging = JobLoggingConfig {
            enabled: true,
            log_directory: "job_logs".to_string(),
            max_file_size: 10 * 1024 * 1024, // 10 MB cap per job log file
            include_debug: true,             // capture DEBUG lines too
        };
        config.save()?;
        println!(" ✅ Job logging enabled");
    }
    let core = Core::new_with_config(data_dir.clone()).await?;
    // Must match JobLoggingConfig.log_directory set above.
    let job_logs_dir = data_dir.join("job_logs");
    println!(" 📝 Job logs directory: {:?}", job_logs_dir);

    // 2. Create library (reuses the first existing library on repeat runs).
    println!("\n2. Creating library...");
    let library = if core.libraries.list().await.is_empty() {
        core.libraries
            .create_library("Test Library", None, core.context.clone())
            .await?
    } else {
        core.libraries.list().await.into_iter().next().unwrap()
    };
    println!(" ✅ Library ready");

    // 3. Create a small test location
    println!("\n3. Creating test location...");
    let test_path = PathBuf::from("./test-data");

    // Register device — locations are keyed by device, so ensure this device
    // has a row in the library database (insert only if missing).
    let db = library.db();
    let device = core.device.to_device()?;
    let device_record = match entities::device::Entity::find()
        .filter(entities::device::Column::Uuid.eq(device.id))
        .one(db.conn())
        .await?
    {
        Some(existing) => existing,
        None => {
            let device_model: entities::device::ActiveModel = device.into();
            device_model.insert(db.conn()).await?
        }
    };

    // Create location — dispatches the indexing job whose output is expected
    // to show up in the job log directory.
    let location_args = LocationCreateArgs {
        path: test_path.clone(),
        name: Some("Test Data".to_string()),
        index_mode: sd_core_new::location::IndexMode::Deep,
    };
    // NOTE(review): location_db_id is never read below — consider `let _ =`
    // or removing the binding to silence the unused-variable warning.
    let location_db_id = create_location(
        library.clone(),
        &core.events,
        location_args,
        device_record.id,
    )
    .await?;
    println!(" ✅ Location created, job dispatched");

    // 4. Monitor for a short time.
    // Poll the event bus for up to 10 s; exit early once indexing completes.
    println!("\n4. Monitoring job progress...");
    let mut event_rx = core.events.subscribe();
    let start = std::time::Instant::now();
    let timeout = Duration::from_secs(10);
    while start.elapsed() < timeout {
        tokio::select! {
            Ok(event) = event_rx.recv() => {
                match event {
                    Event::JobProgress { job_id, message, .. } => {
                        if let Some(msg) = message {
                            println!(" 📊 Job {}: {}", job_id, msg);
                        }
                    }
                    Event::IndexingCompleted { .. } => {
                        println!(" ✅ Indexing completed!");
                        break;
                    }
                    _ => {}
                }
            }
            // Fallback tick so the elapsed-time check runs even when no
            // events arrive (or the channel errors).
            _ = sleep(Duration::from_millis(100)) => {}
        }
    }

    // 5. Check job logs — list every *.log file and print a short preview.
    println!("\n5. Checking job logs...");
    if let Ok(mut entries) = tokio::fs::read_dir(&job_logs_dir).await {
        let mut count = 0;
        while let Ok(Some(entry)) = entries.next_entry().await {
            if let Some(name) = entry.file_name().to_str() {
                if name.ends_with(".log") {
                    count += 1;
                    let log_path = job_logs_dir.join(name);
                    if let Ok(contents) = tokio::fs::read_to_string(&log_path).await {
                        println!("\n 📄 Log file: {}", name);
                        println!(" Size: {} bytes", contents.len());
                        println!(" Lines: {}", contents.lines().count());
                        // Show first few lines
                        println!("\n First 10 lines:");
                        for (i, line) in contents.lines().take(10).enumerate() {
                            println!(" {}: {}", i + 1, line);
                        }
                        if contents.lines().count() > 10 {
                            println!(" ... {} more lines", contents.lines().count() - 10);
                        }
                    }
                }
            }
        }
        if count == 0 {
            println!(" ⚠️ No job logs found");
        } else {
            println!("\n ✅ Found {} job log file(s)", count);
        }
    }

    // 6. Shutdown
    println!("\n6. Shutting down...");
    core.shutdown().await?;

    println!("\n✅ Test complete!");
    println!("📁 Data: {:?}", data_dir);
    println!("📝 Logs: {:?}", job_logs_dir);
    Ok(())
}

View File

@@ -72,6 +72,8 @@ pub fn derive_job(input: TokenStream) -> TokenStream {
checkpoint_handler: std::sync::Arc<dyn crate::infrastructure::jobs::context::CheckpointHandler>,
networking: Option<std::sync::Arc<crate::services::networking::NetworkingService>>,
volume_manager: Option<std::sync::Arc<crate::volume::VolumeManager>>,
job_logging_config: Option<crate::config::JobLoggingConfig>,
job_logs_dir: Option<std::path::PathBuf>,
) -> Box<dyn sd_task_system::Task<crate::infrastructure::jobs::error::JobError>> {
Box::new(crate::infrastructure::jobs::executor::JobExecutor::new(
*self,
@@ -84,6 +86,8 @@ pub fn derive_job(input: TokenStream) -> TokenStream {
checkpoint_handler,
networking,
volume_manager,
job_logging_config,
job_logs_dir,
))
}

View File

@@ -28,6 +28,37 @@ pub struct AppConfig {
/// User preferences
pub preferences: Preferences,
/// Job logging configuration
#[serde(default)]
pub job_logging: JobLoggingConfig,
}
/// Configuration for job-specific logging
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobLoggingConfig {
/// Whether job logging is enabled
pub enabled: bool,
/// Directory for job logs (relative to data_dir)
pub log_directory: String,
/// Maximum log file size in bytes (0 = unlimited)
pub max_file_size: u64,
/// Whether to include debug logs
pub include_debug: bool,
}
impl Default for JobLoggingConfig {
fn default() -> Self {
Self {
enabled: false,
log_directory: "job_logs".to_string(),
max_file_size: 10 * 1024 * 1024, // 10MB default
include_debug: false,
}
}
}
impl AppConfig {
@@ -80,6 +111,7 @@ impl AppConfig {
telemetry_enabled: true,
p2p: P2PConfig::default(),
preferences: Preferences::default(),
job_logging: JobLoggingConfig::default(),
}
}
@@ -105,11 +137,19 @@ impl AppConfig {
self.data_dir.join("libraries")
}
/// Get the path for job logs directory
pub fn job_logs_dir(&self) -> PathBuf {
self.data_dir.join(&self.job_logging.log_directory)
}
/// Ensure all required directories exist
pub fn ensure_directories(&self) -> Result<()> {
fs::create_dir_all(&self.data_dir)?;
fs::create_dir_all(self.logs_dir())?;
fs::create_dir_all(self.libraries_dir())?;
if self.job_logging.enabled {
fs::create_dir_all(self.job_logs_dir())?;
}
Ok(())
}
}
@@ -127,7 +167,7 @@ impl Migrate for AppConfig {
}
fn target_version() -> u32 {
1 // Current schema version
2 // Updated schema version for job logging
}
fn migrate(&mut self) -> Result<()> {
@@ -135,9 +175,15 @@ impl Migrate for AppConfig {
0 => {
// Future migration from v0 to v1 would go here
self.version = 1;
self.migrate() // Continue migration chain
}
1 => {
// Migration from v1 to v2: Add job logging config
self.job_logging = JobLoggingConfig::default();
self.version = 2;
Ok(())
}
1 => Ok(()), // Already at target version
2 => Ok(()), // Already at target version
v => Err(anyhow!("Unknown config version: {}", v)),
}
}

View File

@@ -9,7 +9,7 @@ use std::fs;
pub mod app_config;
pub mod migration;
pub use app_config::AppConfig;
pub use app_config::{AppConfig, JobLoggingConfig};
pub use migration::Migrate;
/// Platform-specific data directory resolution

View File

@@ -3,12 +3,13 @@
//! Shared context providing access to core application components.
use crate::{
config::JobLoggingConfig,
device::DeviceManager, infrastructure::events::EventBus,
keys::library_key_manager::LibraryKeyManager, library::LibraryManager,
infrastructure::actions::manager::ActionManager,
services::networking::NetworkingService, volume::VolumeManager,
};
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;
/// Shared context providing access to core application components.
@@ -22,6 +23,9 @@ pub struct CoreContext {
// This is wrapped in an RwLock to allow it to be set after initialization
pub action_manager: Arc<RwLock<Option<Arc<ActionManager>>>>,
pub networking: Arc<RwLock<Option<Arc<NetworkingService>>>>,
// Job logging configuration
pub job_logging_config: Option<JobLoggingConfig>,
pub job_logs_dir: Option<PathBuf>,
}
impl CoreContext {
@@ -41,8 +45,16 @@ impl CoreContext {
library_key_manager,
action_manager: Arc::new(RwLock::new(None)),
networking: Arc::new(RwLock::new(None)),
job_logging_config: None,
job_logs_dir: None,
}
}
/// Set job logging configuration
pub fn set_job_logging(&mut self, config: JobLoggingConfig, logs_dir: PathBuf) {
self.job_logging_config = Some(config);
self.job_logs_dir = Some(logs_dir);
}
/// Helper method for services to get the networking service
pub async fn get_networking(&self) -> Option<Arc<NetworkingService>> {

View File

@@ -25,6 +25,7 @@ pub struct JobContext<'a> {
pub(crate) child_handles: Arc<Mutex<Vec<JobHandle>>>,
pub(crate) networking: Option<Arc<NetworkingService>>,
pub(crate) volume_manager: Option<Arc<crate::volume::VolumeManager>>,
pub(crate) file_logger: Option<Arc<super::logger::FileJobLogger>>,
}
impl<'a> JobContext<'a> {
@@ -55,6 +56,11 @@ impl<'a> JobContext<'a> {
/// Report progress
pub fn progress(&self, progress: Progress) {
// Log progress messages to file if enabled
if let Some(logger) = &self.file_logger {
let _ = logger.log("PROGRESS", &progress.to_string());
}
if let Err(e) = self.progress_tx.send(progress) {
warn!("Failed to send progress update: {}", e);
}
@@ -62,12 +68,25 @@ impl<'a> JobContext<'a> {
/// Add a warning message
pub fn add_warning(&self, warning: impl Into<String>) {
self.progress(Progress::indeterminate(format!("⚠️ {}", warning.into())));
let msg = warning.into();
// Log to file if enabled
if let Some(logger) = &self.file_logger {
let _ = logger.log("WARN", &msg);
}
self.progress(Progress::indeterminate(format!("⚠️ {}", msg)));
}
/// Add a non-critical error
pub fn add_non_critical_error(&self, error: impl Into<JobError>) {
let error_msg = error.into().to_string();
// Log to file if enabled
if let Some(logger) = &self.file_logger {
let _ = logger.log("ERROR", &error_msg);
}
self.progress(Progress::indeterminate(format!("{}", error_msg)));
// Increment error count
@@ -155,7 +174,23 @@ impl<'a> JobContext<'a> {
/// Log a message
pub fn log(&self, message: impl Into<String>) {
debug!(job_id = %self.id, "{}", message.into());
let msg = message.into();
debug!(job_id = %self.id, "{}", msg);
// Also log to file if enabled
if let Some(logger) = &self.file_logger {
let _ = logger.log("INFO", &msg);
}
}
/// Log a debug message
pub fn log_debug(&self, message: impl Into<String>) {
let msg = message.into();
debug!(job_id = %self.id, "{}", msg);
if let Some(logger) = &self.file_logger {
let _ = logger.log("DEBUG", &msg);
}
}
}

View File

@@ -5,17 +5,18 @@ use super::{
database::{self, JobDb},
error::{JobError, JobResult},
handle::JobHandle,
logger::FileJobLogger,
output::JobOutput,
progress::Progress,
traits::{Job, JobHandler, Resourceful},
types::{ErasedJob, JobId, JobMetrics, JobStatus},
};
use crate::library::Library;
use crate::{config::JobLoggingConfig, library::Library};
use async_trait::async_trait;
use sd_task_system::{ExecStatus, Interrupter, Task, TaskId};
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::{broadcast, mpsc, watch, Mutex};
use tracing::{debug, error, info};
use tracing::{debug, error, info, span, Level};
/// Executor that wraps a job for task system execution
pub struct JobExecutor<J: JobHandler> {
@@ -36,6 +37,9 @@ pub struct JobExecutorState {
pub networking: Option<Arc<crate::services::networking::NetworkingService>>,
pub volume_manager: Option<Arc<crate::volume::VolumeManager>>,
pub latest_progress: Arc<Mutex<Option<Progress>>>,
pub job_logging_config: Option<JobLoggingConfig>,
pub job_logs_dir: Option<PathBuf>,
pub file_logger: Option<Arc<super::logger::FileJobLogger>>,
}
impl<J: JobHandler> JobExecutor<J> {
@@ -50,7 +54,26 @@ impl<J: JobHandler> JobExecutor<J> {
checkpoint_handler: Arc<dyn CheckpointHandler>,
networking: Option<Arc<crate::services::networking::NetworkingService>>,
volume_manager: Option<Arc<crate::volume::VolumeManager>>,
job_logging_config: Option<JobLoggingConfig>,
job_logs_dir: Option<PathBuf>,
) -> Self {
// Create file logger if job logging is enabled
let file_logger = if let (Some(config), Some(logs_dir)) = (&job_logging_config, &job_logs_dir) {
let log_file = logs_dir.join(format!("{}.log", job_id));
match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) {
Ok(logger) => {
let _ = logger.log("INFO", &format!("Job {} ({}) starting", job_id, J::NAME));
Some(Arc::new(logger))
}
Err(e) => {
error!("Failed to create job logger: {}", e);
None
}
}
} else {
None
};
Self {
job,
state: JobExecutorState {
@@ -66,6 +89,9 @@ impl<J: JobHandler> JobExecutor<J> {
networking,
volume_manager,
latest_progress: Arc::new(Mutex::new(None)),
job_logging_config,
job_logs_dir,
file_logger,
},
}
}
@@ -115,6 +141,37 @@ impl<J: JobHandler> Task<JobError> for JobExecutor<J> {
}
async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, JobError> {
// Log job start
if let Some(logger) = &self.state.file_logger {
let _ = logger.log("INFO", &format!("Starting job {}: {}", self.state.job_id, J::NAME));
}
let result = self.run_inner(interrupter).await;
// Log job completion
if let Some(logger) = &self.state.file_logger {
match &result {
Ok(ExecStatus::Done(_)) => {
let _ = logger.log("INFO", &format!("Job {} completed successfully", self.state.job_id));
}
Ok(ExecStatus::Canceled) => {
let _ = logger.log("INFO", &format!("Job {} was cancelled", self.state.job_id));
}
Ok(ExecStatus::Paused) => {
let _ = logger.log("INFO", &format!("Job {} was paused", self.state.job_id));
}
Err(e) => {
let _ = logger.log("ERROR", &format!("Job {} failed: {}", self.state.job_id, e));
}
}
}
result
}
}
impl<J: JobHandler> JobExecutor<J> {
async fn run_inner(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, JobError> {
info!("Starting job {}: {}", self.state.job_id, J::NAME);
// Update status to running
@@ -139,6 +196,7 @@ impl<J: JobHandler> Task<JobError> for JobExecutor<J> {
child_handles: Arc::new(Mutex::new(Vec::new())),
networking: self.state.networking.clone(),
volume_manager: self.state.volume_manager.clone(),
file_logger: self.state.file_logger.clone(),
};
// Progress forwarding is handled by JobManager
@@ -307,9 +365,28 @@ impl<J: JobHandler + std::fmt::Debug> ErasedJob for JobExecutor<J> {
checkpoint_handler: std::sync::Arc<dyn CheckpointHandler>,
networking: Option<std::sync::Arc<crate::services::networking::NetworkingService>>,
volume_manager: Option<std::sync::Arc<crate::volume::VolumeManager>>,
job_logging_config: Option<crate::config::JobLoggingConfig>,
job_logs_dir: Option<std::path::PathBuf>,
) -> Box<dyn sd_task_system::Task<JobError>> {
// Update the executor's state with the new parameters
let mut executor = *self;
// Create file logger if job logging is enabled
let file_logger = if let (Some(config), Some(logs_dir)) = (&job_logging_config, &job_logs_dir) {
let log_file = logs_dir.join(format!("{}.log", job_id));
match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) {
Ok(logger) => {
let _ = logger.log("INFO", &format!("Job {} starting (via create_executor)", job_id));
Some(Arc::new(logger))
}
Err(e) => {
error!("Failed to create job logger: {}", e);
None
}
}
} else {
None
};
executor.state = JobExecutorState {
job_id,
library,
@@ -323,6 +400,9 @@ impl<J: JobHandler + std::fmt::Debug> ErasedJob for JobExecutor<J> {
networking,
volume_manager,
latest_progress: Arc::new(Mutex::new(None)),
job_logging_config,
job_logs_dir,
file_logger,
};
Box::new(executor)

View File

@@ -0,0 +1,314 @@
//! Job-specific logging implementation
//!
//! This module provides a custom tracing subscriber that captures logs
//! for individual jobs and writes them to separate log files.
use super::types::JobId;
use crate::config::JobLoggingConfig;
use std::{
fs::{File, OpenOptions},
io::{Seek, Write},
path::PathBuf,
sync::{Arc, Mutex},
};
use tracing::{
field::{Field, Visit},
span::{Attributes, Record},
Event, Id, Level, Metadata, Subscriber,
};
use tracing_subscriber::{
fmt::{
format::{self, FormatEvent, FormatFields},
FmtContext, FormattedFields,
},
registry::LookupSpan,
Layer,
};
/// A tracing layer that writes logs to a job-specific file
pub struct JobLogLayer {
    // Job this layer filters for; events from other jobs' spans are skipped.
    job_id: JobId,
    // Shared append-mode handle to the log file, guarded for concurrent writers.
    file: Arc<Mutex<File>>,
    // Full logging config; `include_debug` gates DEBUG/TRACE events.
    config: JobLoggingConfig,
    // Cached copy of config.max_file_size (0 = unlimited), checked on write.
    max_file_size: u64,
    // Running byte count of the file, used to enforce max_file_size.
    current_size: Arc<Mutex<u64>>,
}
impl JobLogLayer {
    /// Create a new job log layer appending to `log_path`.
    ///
    /// The parent directory is created if missing, so construction does not
    /// depend on `AppConfig::ensure_directories` having already run.
    ///
    /// # Errors
    /// Returns any I/O error from directory creation, opening the file, or
    /// reading its metadata.
    pub fn new(job_id: JobId, log_path: PathBuf, config: JobLoggingConfig) -> std::io::Result<Self> {
        // Ensure the log directory exists before trying to open the file.
        if let Some(parent) = log_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        // Create or append to the log file
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)?;
        // Seed the size counter from the existing file so the size cap also
        // accounts for content left over from previous runs.
        let current_size = file.metadata()?.len();
        Ok(Self {
            job_id,
            file: Arc::new(Mutex::new(file)),
            max_file_size: config.max_file_size,
            current_size: Arc::new(Mutex::new(current_size)),
            config,
        })
    }

    /// Check if this event should be logged based on job context.
    ///
    /// ERROR/WARN always pass; DEBUG/TRACE require `include_debug`; the
    /// remaining levels must originate from job-related modules.
    fn should_log(&self, metadata: &Metadata<'_>) -> bool {
        // Filter by log level. In `tracing`, more verbose levels compare
        // greater, so `> INFO` matches DEBUG and TRACE.
        if !self.config.include_debug && metadata.level() > &Level::INFO {
            return false;
        }
        // Always log ERROR and WARN
        if metadata.level() <= &Level::WARN {
            return true;
        }
        // For other levels, only log if it's from job-related modules
        let target = metadata.target();
        target.contains("job")
            || target.contains("executor")
            || target.contains("infrastructure::jobs")
            || target.contains("operations")
    }

    /// Write a log entry to the file, truncating the file first if the
    /// configured size cap (`max_file_size`, 0 = unlimited) would be exceeded.
    fn write_log(&self, message: String) -> std::io::Result<()> {
        // Surface a poisoned mutex as an I/O error instead of panicking
        // inside the logging path (logging must never abort a job).
        let poisoned =
            || std::io::Error::new(std::io::ErrorKind::Other, "job log lock poisoned");
        let mut file = self.file.lock().map_err(|_| poisoned())?;
        let mut size = self.current_size.lock().map_err(|_| poisoned())?;
        // Check file size limit
        if self.max_file_size > 0 && *size + message.len() as u64 > self.max_file_size {
            // File too large, truncate and start fresh
            file.set_len(0)?;
            file.seek(std::io::SeekFrom::Start(0))?;
            *size = 0;
            // Write truncation notice
            let notice = format!(
                "[{}] Log file truncated due to size limit\n",
                chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f")
            );
            file.write_all(notice.as_bytes())?;
            *size += notice.len() as u64;
        }
        // Write the log message
        file.write_all(message.as_bytes())?;
        file.flush()?;
        *size += message.len() as u64;
        Ok(())
    }
}
impl<S> Layer<S> for JobLogLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    // Intercepts every event on the subscriber; writes only those that pass
    // the level/target filter AND belong to this layer's job span chain.
    fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
        // Check if we should log this event
        if !self.should_log(event.metadata()) {
            return;
        }
        // Check if this event is from our job's span.
        // Events with no enclosing span are NOT filtered out — they fall
        // through to the write below.
        let current_span = ctx.event_span(event);
        if let Some(span) = current_span {
            // Look for job_id field in the span or its parents
            let mut found_job = false;
            let mut current = Some(span);
            while let Some(span) = current {
                // NOTE(review): relies on DefaultFields rendering fields as
                // "job_id=<value>" — confirm against the fmt layer in use.
                if let Some(fields) = span.extensions().get::<FormattedFields<format::DefaultFields>>() {
                    if fields.fields.contains(&format!("job_id={}", self.job_id)) {
                        found_job = true;
                        break;
                    }
                }
                current = span.parent();
            }
            // If this isn't from our job, skip it
            if !found_job {
                return;
            }
        }
        // Format the log message
        let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
        let level = event.metadata().level();
        let target = event.metadata().target();
        // Extract the message from the event
        let mut visitor = MessageVisitor::default();
        event.record(&mut visitor);
        let message = visitor.message;
        // Format: [timestamp] LEVEL target: message
        let formatted = format!("[{}] {:5} {}: {}\n", timestamp, level, target, message);
        // Write to file.
        // Failures are reported to stderr rather than propagated: logging
        // must never abort the job.
        if let Err(e) = self.write_log(formatted) {
            eprintln!("Failed to write to job log: {}", e);
        }
    }
    fn on_new_span(&self, _attrs: &Attributes<'_>, _id: &Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        // We don't need to do anything special for new spans
    }
}
/// Helper to extract message from event fields.
///
/// Collects the event's `message` field verbatim and appends any other
/// fields as a comma-separated `key=value` list.
#[derive(Default)]
struct MessageVisitor {
    message: String,
}

impl Visit for MessageVisitor {
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        match field.name() {
            // The canonical event text replaces anything gathered so far.
            "message" => self.message = format!("{:?}", value),
            name => {
                if !self.message.is_empty() {
                    self.message.push_str(", ");
                }
                self.message.push_str(&format!("{}={:?}", name, value));
            }
        }
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        match field.name() {
            "message" => self.message = value.to_string(),
            name => {
                if !self.message.is_empty() {
                    self.message.push_str(", ");
                }
                self.message.push_str(&format!("{}=\"{}\"", name, value));
            }
        }
    }
}
/// A simple file-based job logger
///
/// Unlike [`JobLogLayer`], this is invoked explicitly from job code rather
/// than hooking into the tracing subscriber.
pub struct FileJobLogger {
    // Job whose id is stamped on every log line.
    job_id: JobId,
    // Shared append-mode handle to the job's log file.
    file: Arc<Mutex<File>>,
    // `include_debug` gates DEBUG-level lines in `log`.
    config: JobLoggingConfig,
}
impl FileJobLogger {
    /// Create a logger that appends to `log_path`.
    ///
    /// The parent directory is created if missing — callers (e.g. the job
    /// executor) may construct the logger before the job-logs directory has
    /// been set up.
    ///
    /// # Errors
    /// Returns any I/O error from directory creation or opening the file.
    pub fn new(job_id: JobId, log_path: PathBuf, config: JobLoggingConfig) -> std::io::Result<Self> {
        if let Some(parent) = log_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)?;
        Ok(Self {
            job_id,
            file: Arc::new(Mutex::new(file)),
            config,
        })
    }

    /// Append one timestamped line: `[ts] LEVEL job_id: message`.
    ///
    /// DEBUG lines are silently dropped unless `include_debug` is set.
    ///
    /// # Errors
    /// Returns any I/O error from writing or flushing the file, or an error
    /// if the file mutex was poisoned.
    pub fn log(&self, level: &str, message: &str) -> std::io::Result<()> {
        if level == "DEBUG" && !self.config.include_debug {
            return Ok(());
        }
        // A poisoned lock means another writer panicked; surface it as an
        // I/O error instead of panicking inside the logging path.
        let mut file = self.file.lock().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, "job log lock poisoned")
        })?;
        writeln!(
            file,
            "[{}] {} {}: {}",
            chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
            level,
            self.job_id,
            message
        )?;
        file.flush()
    }
}
/// Create a job-specific logger that writes to a file.
///
/// Appends a "started" banner to `<log_dir>/<job_id>.log`, then builds a
/// [`JobLogLayer`] over that same file.
pub fn create_job_logger(
    job_id: JobId,
    log_dir: PathBuf,
    config: JobLoggingConfig,
) -> std::io::Result<JobLogLayer> {
    let log_file = log_dir.join(format!("{}.log", job_id));

    // Stamp the start banner before handing the file over to the layer;
    // the scope closes the handle again.
    {
        let mut header = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_file)?;
        writeln!(
            header,
            "[{}] === Job {} started ===",
            chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
            job_id
        )?;
        header.flush()?;
    }

    // Create the job log layer
    JobLogLayer::new(job_id, log_file, config)
}
/// Setup job logging for async execution.
///
/// Writes a "started" banner to the job's log file and returns a guard whose
/// `Drop` impl appends the matching "finished" banner, so the job lifecycle
/// is recorded even if the task is dropped.
///
/// # Errors
/// Returns any I/O error from opening or writing the log file.
pub fn setup_job_logging(
    job_id: JobId,
    log_dir: PathBuf,
    // Currently unused; kept so the signature can grow filtering later
    // without breaking callers.
    _config: JobLoggingConfig,
) -> std::io::Result<Option<impl Drop>> {
    // Create log file path
    let log_file = log_dir.join(format!("{}.log", job_id));
    // Write initial log entry directly
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_file)?;
    writeln!(
        file,
        "[{}] === Job {} started ===",
        chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
        job_id
    )?;
    file.flush()?;
    drop(file);
    // For now, we'll write logs directly in the job context
    // This avoids conflicts with existing tracing subscribers
    // Return a guard that will write the final log entry
    struct JobLoggingGuard {
        log_file: PathBuf,
        job_id: JobId,
    }
    impl Drop for JobLoggingGuard {
        fn drop(&mut self) {
            // Write final log entry (best-effort: errors ignored in Drop)
            if let Ok(mut file) = OpenOptions::new().append(true).open(&self.log_file) {
                let _ = writeln!(
                    file,
                    "[{}] === Job {} finished ===",
                    chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
                    self.job_id
                );
                let _ = file.flush();
            }
        }
    }
    Ok(Some(JobLoggingGuard {
        log_file,
        job_id,
    }))
}

View File

@@ -221,6 +221,8 @@ impl JobManager {
}),
networking,
volume_manager,
self.context.job_logging_config.clone(),
self.context.job_logs_dir.clone(),
);
// Create handle
@@ -458,6 +460,8 @@ impl JobManager {
}),
networking,
volume_manager,
self.context.job_logging_config.clone(),
self.context.job_logs_dir.clone(),
);
// Clone status_rx for cleanup task
@@ -914,6 +918,8 @@ impl JobManager {
}),
networking,
volume_manager,
self.context.job_logging_config.clone(),
self.context.job_logs_dir.clone(),
);
// Create handle
@@ -1164,6 +1170,8 @@ impl JobManager {
}),
networking,
volume_manager,
self.context.job_logging_config.clone(),
self.context.job_logs_dir.clone(),
);
// Create handle

View File

@@ -8,6 +8,7 @@ pub mod error;
pub mod executor;
pub mod generic_progress;
pub mod handle;
pub mod logger;
pub mod manager;
pub mod output;
pub mod progress;

View File

@@ -133,6 +133,8 @@ pub trait ErasedJob: Send + Sync + std::fmt::Debug + 'static {
>,
networking: Option<std::sync::Arc<crate::services::networking::NetworkingService>>,
volume_manager: Option<std::sync::Arc<crate::volume::VolumeManager>>,
job_logging_config: Option<crate::config::JobLoggingConfig>,
job_logs_dir: Option<std::path::PathBuf>,
) -> Box<dyn sd_task_system::Task<crate::infrastructure::jobs::error::JobError>>;
fn serialize_state(&self) -> Result<Vec<u8>, crate::infrastructure::jobs::error::JobError>;

View File

@@ -182,13 +182,25 @@ impl Core {
info!("Job types registered");
// 9. Create the context that will be shared with services
let context = Arc::new(CoreContext::new(
let mut context_inner = CoreContext::new(
events.clone(),
device.clone(),
libraries.clone(),
volumes.clone(),
library_key_manager.clone(),
));
);
// Set job logging configuration if enabled
let app_config = config.read().await;
if app_config.job_logging.enabled {
context_inner.set_job_logging(
app_config.job_logging.clone(),
app_config.job_logs_dir(),
);
}
drop(app_config);
let context = Arc::new(context_inner);
// 10. Auto-load all libraries with context for job manager initialization
info!("Loading existing libraries...");

View File

Binary file not shown.