diff --git a/core/services/agentpool/agent_jobs.go b/core/services/agentpool/agent_jobs.go index e552549ee..925436af6 100644 --- a/core/services/agentpool/agent_jobs.go +++ b/core/services/agentpool/agent_jobs.go @@ -11,7 +11,6 @@ import ( "io" "net" "net/http" - "os" "path/filepath" "slices" "strings" @@ -46,8 +45,6 @@ type AgentJobService struct { tasks *xsync.SyncedMap[string, schema.Task] jobs *xsync.SyncedMap[string, schema.Job] persister JobPersister - tasksFile string // Path to agent_tasks.json (kept for backward compat) - jobsFile string // Path to agent_jobs.json (kept for backward compat) userID string // Scoping: empty for global (main service), set for per-user instances // Job execution channel @@ -70,9 +67,6 @@ type AgentJobService struct { // Service lifecycle ctx context.Context cancel context.CancelFunc - - // Mutex for file operations - fileMutex sync.Mutex } // DistributedDispatcher is the interface for distributed job dispatching via NATS. @@ -220,8 +214,6 @@ func NewAgentJobServiceWithPaths( tasksFile: tasksFile, jobsFile: jobsFile, }, - tasksFile: tasksFile, - jobsFile: jobsFile, jobQueue: make(chan JobExecution, 100), // Buffer for 100 jobs cancellations: xsync.NewSyncedMap[string, context.CancelFunc](), cronScheduler: cron.New(), // Support seconds in cron @@ -230,127 +222,51 @@ func NewAgentJobServiceWithPaths( } } -// LoadTasksFromFile loads tasks from agent_tasks.json +// LoadTasksFromFile loads tasks from the persister into the in-memory map +// and schedules cron entries. Named "FromFile" for backward compat; in DB +// mode it loads from the database. func (s *AgentJobService) LoadTasksFromFile() error { - if s.tasksFile == "" { - return nil // No file path configured - } - - s.fileMutex.Lock() - defer s.fileMutex.Unlock() - - if _, err := os.Stat(s.tasksFile); os.IsNotExist(err) { - xlog.Debug("agent_tasks.json not found, starting with empty tasks") - return nil - } - - fileContent, err := os.ReadFile(s.tasksFile) + tasks, err := s.persister.LoadTasks(s.userID) if err != nil { - return fmt.Errorf("failed to read tasks file: %w", err) + return err } - - var tasksFile schema.TasksFile - if err := json.Unmarshal(fileContent, &tasksFile); err != nil { - return fmt.Errorf("failed to parse tasks file: %w", err) - } - - for _, task := range tasksFile.Tasks { + for _, task := range tasks { s.tasks.Set(task.ID, task) - // Schedule cron if enabled and has cron expression if task.Enabled && task.Cron != "" { if err := s.ScheduleCronTask(task); err != nil { xlog.Warn("Failed to schedule cron task on load", "error", err, "task_id", task.ID) } } } - - xlog.Info("Loaded tasks from file", "count", len(tasksFile.Tasks)) - return nil } -// SaveTasksToFile saves tasks to agent_tasks.json +// SaveTasksToFile flushes the current tasks map via the persister. File +// persister bulk-writes the JSON file atomically; DB persister no-ops +// because per-task SaveTask calls already wrote through. func (s *AgentJobService) SaveTasksToFile() error { - if s.tasksFile == "" { - return nil // No file path configured - } - - s.fileMutex.Lock() - defer s.fileMutex.Unlock() - - tasksFile := schema.TasksFile{ - Tasks: s.tasks.Values(), - } - - fileContent, err := json.MarshalIndent(tasksFile, "", " ") - if err != nil { - return fmt.Errorf("failed to marshal tasks: %w", err) - } - - if err := os.WriteFile(s.tasksFile, fileContent, 0600); err != nil { - return fmt.Errorf("failed to write tasks file: %w", err) - } - - return nil + return s.persister.FlushTasks() } -// LoadJobsFromFile loads jobs from agent_jobs.json +// LoadJobsFromFile loads jobs from the persister into the in-memory map. +// Named "FromFile" for backward compat; in DB mode it loads from the +// database. func (s *AgentJobService) LoadJobsFromFile() error { - if s.jobsFile == "" { - return nil // No file path configured - } - - s.fileMutex.Lock() - defer s.fileMutex.Unlock() - - if _, err := os.Stat(s.jobsFile); os.IsNotExist(err) { - xlog.Debug("agent_jobs.json not found, starting with empty jobs") - return nil - } - - fileContent, err := os.ReadFile(s.jobsFile) + jobs, err := s.persister.LoadJobs(s.userID) if err != nil { - return fmt.Errorf("failed to read jobs file: %w", err) + return err } - - var jobsFile schema.JobsFile - if err := json.Unmarshal(fileContent, &jobsFile); err != nil { - return fmt.Errorf("failed to parse jobs file: %w", err) - } - - // Load jobs into memory - for _, job := range jobsFile.Jobs { + for _, job := range jobs { s.jobs.Set(job.ID, job) } - - xlog.Info("Loaded jobs from file", "count", len(jobsFile.Jobs)) return nil } -// SaveJobsToFile saves jobs to agent_jobs.json +// SaveJobsToFile flushes the current jobs map via the persister. File +// persister bulk-writes the JSON file atomically; DB persister no-ops +// because per-job SaveJob calls already wrote through. func (s *AgentJobService) SaveJobsToFile() error { - if s.jobsFile == "" { - return nil // No file path configured - } - - s.fileMutex.Lock() - defer s.fileMutex.Unlock() - - jobsFile := schema.JobsFile{ - Jobs: s.jobs.Values(), - LastCleanup: time.Now(), - } - - fileContent, err := json.MarshalIndent(jobsFile, "", " ") - if err != nil { - return fmt.Errorf("failed to marshal jobs: %w", err) - } - - if err := os.WriteFile(s.jobsFile, fileContent, 0600); err != nil { - return fmt.Errorf("failed to write jobs file: %w", err) - } - - return nil + return s.persister.FlushJobs() } // CreateTask creates a new task diff --git a/core/services/agentpool/agent_jobs_test.go b/core/services/agentpool/agent_jobs_test.go index 1d94b77b3..966da699a 100644 --- a/core/services/agentpool/agent_jobs_test.go +++ b/core/services/agentpool/agent_jobs_test.go @@ -4,6 +4,7 @@ import ( "context" "os" "path/filepath" + "sync" "time" "github.com/mudler/LocalAI/core/config" @@ -281,6 +282,71 @@ var _ = Describe("AgentJobService", func() { Expect(err).NotTo(HaveOccurred()) Expect(retrieved.TaskID).To(Equal(taskID)) }) + + It("does not surface a partial file when saves and loads race", func() { + // Regression for the macOS-only CI flake where a concurrent + // LoadJobsFromFile landed between os.WriteFile's open(O_TRUNC) + // and write, yielding "unexpected end of JSON input" at offset 0. + // Atomic temp+rename in the persister eliminates the window. + task := schema.Task{ + Name: "Race Task", + Model: "test-model", + Prompt: "Test prompt", + Enabled: true, + } + + taskID, err := service.CreateTask(task) + Expect(err).NotTo(HaveOccurred()) + + _, err = service.ExecuteJob(taskID, map[string]string{}, "test", nil) + Expect(err).NotTo(HaveOccurred()) + Expect(service.SaveJobsToFile()).To(Succeed()) + + newService := agentpool.NewAgentJobService( + appConfig, + modelLoader, + configLoader, + evaluator, + ) + + var wg sync.WaitGroup + deadline := time.Now().Add(500 * time.Millisecond) + readerErrs := make(chan error, 1024) + + for range 4 { + wg.Add(1) + go func() { + defer wg.Done() + for time.Now().Before(deadline) { + _ = service.SaveJobsToFile() + } + }() + } + + for range 4 { + wg.Add(1) + go func() { + defer wg.Done() + for time.Now().Before(deadline) { + if err := newService.LoadJobsFromFile(); err != nil { + readerErrs <- err + return + } + } + }() + } + + wg.Wait() + close(readerErrs) + + var firstErr error + for err := range readerErrs { + if firstErr == nil { + firstErr = err + } + } + Expect(firstErr).NotTo(HaveOccurred(), "concurrent load saw a partial/empty file") + }) }) Describe("Prompt templating", func() { diff --git a/core/services/agentpool/job_persister.go b/core/services/agentpool/job_persister.go index bb9d31148..1f36f8a7b 100644 --- a/core/services/agentpool/job_persister.go +++ b/core/services/agentpool/job_persister.go @@ -16,6 +16,12 @@ type JobPersister interface { SaveJob(userID string, job schema.Job) error DeleteJob(jobID string) error + // Bulk flush of the current in-memory state. File-backed persister + // rewrites the whole JSON file; DB-backed persister no-ops because + // SaveTask/SaveJob are already write-through. + FlushTasks() error + FlushJobs() error + // Authoritative reads — DB returns fresh data; file returns nil, nil GetJob(jobID string) (*schema.Job, error) ListJobs(userID, taskID, status string, limit int) ([]schema.Job, error) diff --git a/core/services/agentpool/job_persister_db.go b/core/services/agentpool/job_persister_db.go index 000d9cb0f..643b80e69 100644 --- a/core/services/agentpool/job_persister_db.go +++ b/core/services/agentpool/job_persister_db.go @@ -32,6 +32,12 @@ func (p *dbJobPersister) DeleteJob(jobID string) error { return p.store.DeleteJob(jobID) } +// FlushTasks is a no-op: SaveTask already writes through to the DB. +func (p *dbJobPersister) FlushTasks() error { return nil } + +// FlushJobs is a no-op: SaveJob already writes through to the DB. +func (p *dbJobPersister) FlushJobs() error { return nil } + func (p *dbJobPersister) GetJob(jobID string) (*schema.Job, error) { rec, err := p.store.GetJob(jobID) if err != nil { diff --git a/core/services/agentpool/job_persister_file.go b/core/services/agentpool/job_persister_file.go index 5dea9da71..3087a2524 100644 --- a/core/services/agentpool/job_persister_file.go +++ b/core/services/agentpool/job_persister_file.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "sync" "time" @@ -41,6 +42,14 @@ func (p *fileJobPersister) DeleteJob(_ string) error { return p.saveJobsToFile() } +func (p *fileJobPersister) FlushTasks() error { + return p.saveTasksToFile() +} + +func (p *fileJobPersister) FlushJobs() error { + return p.saveJobsToFile() +} + // GetJob returns nil — file persister has no authoritative reads. func (p *fileJobPersister) GetJob(_ string) (*schema.Job, error) { return nil, nil @@ -127,7 +136,7 @@ func (p *fileJobPersister) saveTasksToFile() error { return fmt.Errorf("failed to marshal tasks: %w", err) } - return os.WriteFile(p.tasksFile, data, 0600) + return writeFileAtomic(p.tasksFile, data, 0600) } // saveJobsToFile serializes the entire jobs map to the JSON file. @@ -149,5 +158,45 @@ func (p *fileJobPersister) saveJobsToFile() error { return fmt.Errorf("failed to marshal jobs: %w", err) } - return os.WriteFile(p.jobsFile, data, 0600) + return writeFileAtomic(p.jobsFile, data, 0600) +} + +// writeFileAtomic writes data to path via a same-directory temp file + rename. +// os.WriteFile opens with O_TRUNC, so a concurrent reader can land between the +// truncate and the write and see an empty file ("unexpected end of JSON input"). +// rename(2) is atomic on POSIX, so readers see either the prior contents or the +// new contents and never a zero-byte window. +func writeFileAtomic(path string, data []byte, perm os.FileMode) error { + dir := filepath.Dir(path) + tmp, err := os.CreateTemp(dir, filepath.Base(path)+".tmp-*") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + tmpPath := tmp.Name() + removeTmp := func() { _ = os.Remove(tmpPath) } + + if _, err := tmp.Write(data); err != nil { + _ = tmp.Close() + removeTmp() + return fmt.Errorf("failed to write temp file: %w", err) + } + if err := tmp.Chmod(perm); err != nil { + _ = tmp.Close() + removeTmp() + return fmt.Errorf("failed to chmod temp file: %w", err) + } + if err := tmp.Sync(); err != nil { + _ = tmp.Close() + removeTmp() + return fmt.Errorf("failed to sync temp file: %w", err) + } + if err := tmp.Close(); err != nil { + removeTmp() + return fmt.Errorf("failed to close temp file: %w", err) + } + if err := os.Rename(tmpPath, path); err != nil { + removeTmp() + return fmt.Errorf("failed to rename temp file: %w", err) + } + return nil }