fix(agentpool): close truncate-then-read race in agent_jobs.json persistence (#9811)
* fix(agentpool): close truncate-then-read race in agent_jobs.json persistence
Three call sites wrote and read agent_jobs.json (and agent_tasks.json)
through three independent mutexes:
- AgentJobService.ExecuteJob spawns go saveJobs(job) -> fileJobPersister
holding p.mu
- AgentJobService.SaveJobsToFile holding service.fileMutex
- AgentJobService.LoadJobsFromFile on a separate service instance holding
a different service.fileMutex
Nothing serialized those mutexes, and both writers used os.WriteFile, which
opens with O_TRUNC. A reader landing between the truncate and the write saw a
zero-byte file, which surfaced as `unexpected end of JSON input` at offset 0.
The macOS tests-apple job started hitting this consistently once the path
filter was removed from .github/workflows/test.yml and the file-mode race
test ran on every push (run 25823124797 was the first observed failure).
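
For illustration, a standalone sketch of that window, reduced to its two
ingredients. Nothing below is taken from the patch; the file name and payload
are invented:

    // race_sketch.go: hammer os.WriteFile against a concurrent reader.
    package main

    import (
        "fmt"
        "os"
        "path/filepath"
        "time"
    )

    func main() {
        path := filepath.Join(os.TempDir(), "agent_jobs_race.json")
        payload := []byte(`{"jobs":[{"id":"job-1"}]}`)
        _ = os.WriteFile(path, payload, 0600)

        // Writer: os.WriteFile opens with O_TRUNC, then writes.
        go func() {
            for {
                _ = os.WriteFile(path, payload, 0600)
            }
        }()

        // Reader: a read landing between truncate and write sees zero bytes,
        // which json.Unmarshal reports as "unexpected end of JSON input".
        deadline := time.Now().Add(2 * time.Second)
        for time.Now().Before(deadline) {
            if data, err := os.ReadFile(path); err == nil && len(data) == 0 {
                fmt.Println("observed a zero-byte read")
                return
            }
        }
        fmt.Println("no zero-byte read observed in 2s")
    }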
Two changes close the window:
1. fileJobPersister.saveTasksToFile / saveJobsToFile now write to a
same-directory temp file and os.Rename to the final path. rename(2) is
atomic on POSIX, so concurrent readers see either the prior contents or
the new contents and never a zero-byte window. The helper Syncs before
close so a crash mid-write leaves either the old file intact or the temp
behind (cleaned up on next save).
2. AgentJobService.{Load,Save}{Tasks,Jobs}{FromFile,ToFile} are collapsed
to thin wrappers around fileJobPersister, removing the duplicate write
path and the redundant service.fileMutex / service.tasksFile /
service.jobsFile fields. Within a single service all task/job I/O now
serializes on the persister's mutex; the atomic rename handles the
cross-instance case the tests exercise.
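
Condensed, change 1 amounts to the sketch below; the real helper is
writeFileAtomic in the diff further down, this version only trims the per-step
error wrapping and uses illustrative names:

    // atomic_write_sketch.go: same-directory temp file, fsync, then rename.
    package main

    import (
        "os"
        "path/filepath"
    )

    func writeAtomicSketch(path string, data []byte, perm os.FileMode) error {
        tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".tmp-*")
        if err != nil {
            return err
        }
        defer os.Remove(tmp.Name()) // harmless once the rename has succeeded

        if _, err := tmp.Write(data); err != nil {
            _ = tmp.Close()
            return err
        }
        if err := tmp.Chmod(perm); err != nil {
            _ = tmp.Close()
            return err
        }
        if err := tmp.Sync(); err != nil { // crash leaves old file or temp, never half a file
            _ = tmp.Close()
            return err
        }
        if err := tmp.Close(); err != nil {
            return err
        }
        return os.Rename(tmp.Name(), path) // atomic on POSIX: readers see old or new, never empty
    }

    func main() {
        _ = writeAtomicSketch(filepath.Join(os.TempDir(), "agent_jobs.json"), []byte("{}"), 0600)
    }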
Adds a regression test that hammers SaveJobsToFile and LoadJobsFromFile
concurrently for 500ms across two service instances on the same paths.
On master this reproduces `unexpected end of JSON input` on Linux within
~500ms; with the fix the suite ran under -until-it-fails for 30s (54 attempts,
all green).
Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
* refactor(agentpool): route service flush/load through JobPersister interface
The first cut of the race fix made AgentJobService.{Save,Load}{Tasks,Jobs}*
type-assert s.persister to *fileJobPersister so they could reach the
unexported saveTasksToFile / saveJobsToFile helpers. That defeats the
JobPersister interface: the service is back to reasoning about a concrete
implementation instead of an abstraction.
Promote the bulk-flush operations to the interface as FlushTasks / FlushJobs:
- fileJobPersister.FlushTasks/FlushJobs call the existing private helpers
(atomic temp+rename writes from the prior commit).
- dbJobPersister.FlushTasks/FlushJobs are no-ops because SaveTask/SaveJob
are already write-through to the database.
The service's four file-named methods now talk only to the interface:
LoadTasks/LoadJobs read through s.persister.LoadTasks/LoadJobs, and the
Save side calls FlushTasks/FlushJobs. The "FromFile"/"ToFile" suffixes
stay for backward compat with user_services.go and the existing tests,
but they no longer claim a file-only contract.
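
Condensed from the hunks below (signatures trimmed, not a compilable unit on
its own), the resulting shape is roughly:

    type JobPersister interface {
        // ... SaveTask / SaveJob / DeleteJob / LoadTasks / LoadJobs as before ...
        FlushTasks() error
        FlushJobs() error
    }

    // File-backed: bulk-rewrite the JSON files via the atomic helper.
    func (p *fileJobPersister) FlushTasks() error { return p.saveTasksToFile() }
    func (p *fileJobPersister) FlushJobs() error  { return p.saveJobsToFile() }

    // DB-backed: no-ops, SaveTask/SaveJob already wrote through.
    func (p *dbJobPersister) FlushTasks() error { return nil }
    func (p *dbJobPersister) FlushJobs() error  { return nil }

    // The service side only sees the interface.
    func (s *AgentJobService) SaveJobsToFile() error { return s.persister.FlushJobs() }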
Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
@@ -11,7 +11,6 @@ import (
 	"io"
 	"net"
 	"net/http"
-	"os"
 	"path/filepath"
 	"slices"
 	"strings"
@@ -46,8 +45,6 @@ type AgentJobService struct {
 	tasks *xsync.SyncedMap[string, schema.Task]
 	jobs *xsync.SyncedMap[string, schema.Job]
 	persister JobPersister
-	tasksFile string // Path to agent_tasks.json (kept for backward compat)
-	jobsFile string // Path to agent_jobs.json (kept for backward compat)
 	userID string // Scoping: empty for global (main service), set for per-user instances
 
 	// Job execution channel
@@ -70,9 +67,6 @@ type AgentJobService struct {
 	// Service lifecycle
 	ctx context.Context
 	cancel context.CancelFunc
-
-	// Mutex for file operations
-	fileMutex sync.Mutex
 }
 
 // DistributedDispatcher is the interface for distributed job dispatching via NATS.
@@ -220,8 +214,6 @@ func NewAgentJobServiceWithPaths(
 			tasksFile: tasksFile,
 			jobsFile: jobsFile,
 		},
-		tasksFile: tasksFile,
-		jobsFile: jobsFile,
 		jobQueue: make(chan JobExecution, 100), // Buffer for 100 jobs
 		cancellations: xsync.NewSyncedMap[string, context.CancelFunc](),
 		cronScheduler: cron.New(), // Support seconds in cron
@@ -230,127 +222,51 @@
 	}
 }
 
-// LoadTasksFromFile loads tasks from agent_tasks.json
+// LoadTasksFromFile loads tasks from the persister into the in-memory map
+// and schedules cron entries. Named "FromFile" for backward compat; in DB
+// mode it loads from the database.
 func (s *AgentJobService) LoadTasksFromFile() error {
-	if s.tasksFile == "" {
-		return nil // No file path configured
-	}
-
-	s.fileMutex.Lock()
-	defer s.fileMutex.Unlock()
-
-	if _, err := os.Stat(s.tasksFile); os.IsNotExist(err) {
-		xlog.Debug("agent_tasks.json not found, starting with empty tasks")
-		return nil
-	}
-
-	fileContent, err := os.ReadFile(s.tasksFile)
+	tasks, err := s.persister.LoadTasks(s.userID)
 	if err != nil {
-		return fmt.Errorf("failed to read tasks file: %w", err)
+		return err
 	}
-
-	var tasksFile schema.TasksFile
-	if err := json.Unmarshal(fileContent, &tasksFile); err != nil {
-		return fmt.Errorf("failed to parse tasks file: %w", err)
-	}
-
-	for _, task := range tasksFile.Tasks {
+	for _, task := range tasks {
 		s.tasks.Set(task.ID, task)
-		// Schedule cron if enabled and has cron expression
 		if task.Enabled && task.Cron != "" {
 			if err := s.ScheduleCronTask(task); err != nil {
 				xlog.Warn("Failed to schedule cron task on load", "error", err, "task_id", task.ID)
 			}
 		}
 	}
-
-	xlog.Info("Loaded tasks from file", "count", len(tasksFile.Tasks))
-
 	return nil
 }
 
-// SaveTasksToFile saves tasks to agent_tasks.json
+// SaveTasksToFile flushes the current tasks map via the persister. File
+// persister bulk-writes the JSON file atomically; DB persister no-ops
+// because per-task SaveTask calls already wrote through.
 func (s *AgentJobService) SaveTasksToFile() error {
-	if s.tasksFile == "" {
-		return nil // No file path configured
-	}
-
-	s.fileMutex.Lock()
-	defer s.fileMutex.Unlock()
-
-	tasksFile := schema.TasksFile{
-		Tasks: s.tasks.Values(),
-	}
-
-	fileContent, err := json.MarshalIndent(tasksFile, "", " ")
-	if err != nil {
-		return fmt.Errorf("failed to marshal tasks: %w", err)
-	}
-
-	if err := os.WriteFile(s.tasksFile, fileContent, 0600); err != nil {
-		return fmt.Errorf("failed to write tasks file: %w", err)
-	}
-
-	return nil
+	return s.persister.FlushTasks()
 }
 
-// LoadJobsFromFile loads jobs from agent_jobs.json
+// LoadJobsFromFile loads jobs from the persister into the in-memory map.
+// Named "FromFile" for backward compat; in DB mode it loads from the
+// database.
 func (s *AgentJobService) LoadJobsFromFile() error {
-	if s.jobsFile == "" {
-		return nil // No file path configured
-	}
-
-	s.fileMutex.Lock()
-	defer s.fileMutex.Unlock()
-
-	if _, err := os.Stat(s.jobsFile); os.IsNotExist(err) {
-		xlog.Debug("agent_jobs.json not found, starting with empty jobs")
-		return nil
-	}
-
-	fileContent, err := os.ReadFile(s.jobsFile)
+	jobs, err := s.persister.LoadJobs(s.userID)
 	if err != nil {
-		return fmt.Errorf("failed to read jobs file: %w", err)
+		return err
 	}
-
-	var jobsFile schema.JobsFile
-	if err := json.Unmarshal(fileContent, &jobsFile); err != nil {
-		return fmt.Errorf("failed to parse jobs file: %w", err)
-	}
-
-	// Load jobs into memory
-	for _, job := range jobsFile.Jobs {
+	for _, job := range jobs {
 		s.jobs.Set(job.ID, job)
 	}
-
-	xlog.Info("Loaded jobs from file", "count", len(jobsFile.Jobs))
 	return nil
 }
 
-// SaveJobsToFile saves jobs to agent_jobs.json
+// SaveJobsToFile flushes the current jobs map via the persister. File
+// persister bulk-writes the JSON file atomically; DB persister no-ops
+// because per-job SaveJob calls already wrote through.
 func (s *AgentJobService) SaveJobsToFile() error {
-	if s.jobsFile == "" {
-		return nil // No file path configured
-	}
-
-	s.fileMutex.Lock()
-	defer s.fileMutex.Unlock()
-
-	jobsFile := schema.JobsFile{
-		Jobs: s.jobs.Values(),
-		LastCleanup: time.Now(),
-	}
-
-	fileContent, err := json.MarshalIndent(jobsFile, "", " ")
-	if err != nil {
-		return fmt.Errorf("failed to marshal jobs: %w", err)
-	}
-
-	if err := os.WriteFile(s.jobsFile, fileContent, 0600); err != nil {
-		return fmt.Errorf("failed to write jobs file: %w", err)
-	}
-
-	return nil
+	return s.persister.FlushJobs()
 }
 
 // CreateTask creates a new task
@@ -4,6 +4,7 @@ import (
 	"context"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/mudler/LocalAI/core/config"
@@ -281,6 +282,71 @@ var _ = Describe("AgentJobService", func() {
 			Expect(err).NotTo(HaveOccurred())
 			Expect(retrieved.TaskID).To(Equal(taskID))
 		})
+
+		It("does not surface a partial file when saves and loads race", func() {
+			// Regression for the macOS-only CI flake where a concurrent
+			// LoadJobsFromFile landed between os.WriteFile's open(O_TRUNC)
+			// and write, yielding "unexpected end of JSON input" at offset 0.
+			// Atomic temp+rename in the persister eliminates the window.
+			task := schema.Task{
+				Name:    "Race Task",
+				Model:   "test-model",
+				Prompt:  "Test prompt",
+				Enabled: true,
+			}
+
+			taskID, err := service.CreateTask(task)
+			Expect(err).NotTo(HaveOccurred())
+
+			_, err = service.ExecuteJob(taskID, map[string]string{}, "test", nil)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(service.SaveJobsToFile()).To(Succeed())
+
+			newService := agentpool.NewAgentJobService(
+				appConfig,
+				modelLoader,
+				configLoader,
+				evaluator,
+			)
+
+			var wg sync.WaitGroup
+			deadline := time.Now().Add(500 * time.Millisecond)
+			readerErrs := make(chan error, 1024)
+
+			for range 4 {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for time.Now().Before(deadline) {
+						_ = service.SaveJobsToFile()
+					}
+				}()
+			}
+
+			for range 4 {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for time.Now().Before(deadline) {
+						if err := newService.LoadJobsFromFile(); err != nil {
+							readerErrs <- err
+							return
+						}
+					}
+				}()
+			}
+
+			wg.Wait()
+			close(readerErrs)
+
+			var firstErr error
+			for err := range readerErrs {
+				if firstErr == nil {
+					firstErr = err
+				}
+			}
+			Expect(firstErr).NotTo(HaveOccurred(), "concurrent load saw a partial/empty file")
+		})
 	})
 
 	Describe("Prompt templating", func() {
@@ -16,6 +16,12 @@ type JobPersister interface {
 	SaveJob(userID string, job schema.Job) error
 	DeleteJob(jobID string) error
 
+	// Bulk flush of the current in-memory state. File-backed persister
+	// rewrites the whole JSON file; DB-backed persister no-ops because
+	// SaveTask/SaveJob are already write-through.
+	FlushTasks() error
+	FlushJobs() error
+
 	// Authoritative reads — DB returns fresh data; file returns nil, nil
 	GetJob(jobID string) (*schema.Job, error)
 	ListJobs(userID, taskID, status string, limit int) ([]schema.Job, error)
@@ -32,6 +32,12 @@ func (p *dbJobPersister) DeleteJob(jobID string) error {
 	return p.store.DeleteJob(jobID)
 }
 
+// FlushTasks is a no-op: SaveTask already writes through to the DB.
+func (p *dbJobPersister) FlushTasks() error { return nil }
+
+// FlushJobs is a no-op: SaveJob already writes through to the DB.
+func (p *dbJobPersister) FlushJobs() error { return nil }
+
 func (p *dbJobPersister) GetJob(jobID string) (*schema.Job, error) {
 	rec, err := p.store.GetJob(jobID)
 	if err != nil {
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
@@ -41,6 +42,14 @@ func (p *fileJobPersister) DeleteJob(_ string) error {
 	return p.saveJobsToFile()
 }
 
+func (p *fileJobPersister) FlushTasks() error {
+	return p.saveTasksToFile()
+}
+
+func (p *fileJobPersister) FlushJobs() error {
+	return p.saveJobsToFile()
+}
+
 // GetJob returns nil — file persister has no authoritative reads.
 func (p *fileJobPersister) GetJob(_ string) (*schema.Job, error) {
 	return nil, nil
@@ -127,7 +136,7 @@ func (p *fileJobPersister) saveTasksToFile() error {
 		return fmt.Errorf("failed to marshal tasks: %w", err)
 	}
 
-	return os.WriteFile(p.tasksFile, data, 0600)
+	return writeFileAtomic(p.tasksFile, data, 0600)
 }
 
 // saveJobsToFile serializes the entire jobs map to the JSON file.
@@ -149,5 +158,45 @@ func (p *fileJobPersister) saveJobsToFile() error {
 		return fmt.Errorf("failed to marshal jobs: %w", err)
 	}
 
-	return os.WriteFile(p.jobsFile, data, 0600)
+	return writeFileAtomic(p.jobsFile, data, 0600)
 }
+
+// writeFileAtomic writes data to path via a same-directory temp file + rename.
+// os.WriteFile opens with O_TRUNC, so a concurrent reader can land between the
+// truncate and the write and see an empty file ("unexpected end of JSON input").
+// rename(2) is atomic on POSIX, so readers see either the prior contents or the
+// new contents and never a zero-byte window.
+func writeFileAtomic(path string, data []byte, perm os.FileMode) error {
+	dir := filepath.Dir(path)
+	tmp, err := os.CreateTemp(dir, filepath.Base(path)+".tmp-*")
+	if err != nil {
+		return fmt.Errorf("failed to create temp file: %w", err)
+	}
+	tmpPath := tmp.Name()
+	removeTmp := func() { _ = os.Remove(tmpPath) }
+
+	if _, err := tmp.Write(data); err != nil {
+		_ = tmp.Close()
+		removeTmp()
+		return fmt.Errorf("failed to write temp file: %w", err)
+	}
+	if err := tmp.Chmod(perm); err != nil {
+		_ = tmp.Close()
+		removeTmp()
+		return fmt.Errorf("failed to chmod temp file: %w", err)
+	}
+	if err := tmp.Sync(); err != nil {
+		_ = tmp.Close()
+		removeTmp()
+		return fmt.Errorf("failed to sync temp file: %w", err)
+	}
+	if err := tmp.Close(); err != nil {
+		removeTmp()
+		return fmt.Errorf("failed to close temp file: %w", err)
+	}
+	if err := os.Rename(tmpPath, path); err != nil {
+		removeTmp()
+		return fmt.Errorf("failed to rename temp file: %w", err)
+	}
+	return nil
+}