From d7d7721eae1d5730b45d1f617e2cf6735e2cabfb Mon Sep 17 00:00:00 2001
From: "LocalAI [bot]" <139863280+localai-bot@users.noreply.github.com>
Date: Sat, 27 Jun 2026 23:23:51 +0200
Subject: [PATCH] feat(distributed): SyncedMap component + migrate
 finetune/quant/agent-tasks to cross-replica state (#10542)

* feat(distributed): add SyncedMap cross-replica in-memory state component

Introduce core/services/syncstate.SyncedMap[K,V]: a thread-safe in-memory map
that keeps itself consistent across frontend replicas via NATS, with an
optional pluggable durable Store and hydrate-from-source convergence.

Several features keep process-local state surfaced to the API (finetune/quant
jobs, agent tasks, model configs) and each hand-wired the same in-memory +
NATS broadcast + read-through-store legs - or forgot to, reintroducing
cross-replica staleness. SyncedMap makes that consistency a configuration
choice:

- local writes mutate the map, write through the Store, then broadcast a
  delta;
- the apply path is memory-only and never re-publishes or re-writes the Store
  (structural echo-loop guard, mirroring galleryop.mergeStatus);
- on Start and on NATS reconnect the map re-hydrates from the source (Store,
  else Loader); an optional periodic Reconcile repairs silent drift;
- standalone mode (nil NATS client) is a strict in-memory no-op.

Reconnect re-hydrate is wired via a new *messaging.Client.OnReconnect
callback, consumed through an optional type-assertion so MessagingClient
stays minimal.

Adds messaging.SubjectSyncStateDelta and a reusable testutil.FakeBus
(synchronous in-process MessagingClient with wildcard matching) for adopter
tests.

Component only; service migrations follow in subsequent commits.
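
The write path and the apply path described above can be sketched as follows. This is a minimal illustration, not the SyncedMap implementation: the `delta`, `bus`, and `replica` types are hypothetical stand-ins (an in-process bus instead of NATS, a plain map instead of the durable Store), and the demo is single-goroutine on purpose.

```go
package main

import (
	"fmt"
	"sync"
)

// delta is a hypothetical wire message; the real payload is not shown here.
type delta struct {
	Key     string
	Value   string
	Deleted bool
}

// bus is a hypothetical synchronous in-process stand-in for the NATS client.
type bus struct {
	mu        sync.Mutex
	subs      []func(delta)
	published int
}

func (b *bus) subscribe(fn func(delta)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = append(b.subs, fn)
}

func (b *bus) publish(d delta) {
	b.mu.Lock()
	b.published++
	subs := append([]func(delta){}, b.subs...)
	b.mu.Unlock()
	for _, fn := range subs {
		fn(d) // delivered to every subscriber, including the sender
	}
}

// replica holds one in-memory view plus a reference to the shared store.
type replica struct {
	mu    sync.RWMutex
	items map[string]string
	store map[string]string // shared durable store stand-in (assumption)
	bus   *bus
}

func newReplica(b *bus, store map[string]string) *replica {
	r := &replica{items: map[string]string{}, store: store, bus: b}
	b.subscribe(r.apply)
	return r
}

// set is the local write path: memory, then store, then broadcast.
func (r *replica) set(k, v string) {
	r.mu.Lock()
	r.items[k] = v
	r.store[k] = v
	r.mu.Unlock()
	r.bus.publish(delta{Key: k, Value: v})
}

// apply is the remote path: memory only. It never publishes and never
// writes the store, which is what prevents an echo loop.
func (r *replica) apply(d delta) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if d.Deleted {
		delete(r.items, d.Key)
		return
	}
	r.items[d.Key] = d.Value
}

func main() {
	b := &bus{}
	store := map[string]string{}
	a, c := newReplica(b, store), newReplica(b, store)
	a.set("job-1", "running")
	fmt.Println(c.items["job-1"], b.published) // prints "running 1"
}
```

Because `apply` has no code path back to `publish`, the guard is structural rather than a flag that a caller could forget to set.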
Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(finetune): back jobs with SyncedMap for cross-replica consistency

FineTuneService kept jobs in a process-local map and, although it wrote them
to Postgres, ListJobs/GetJob never read the store back and the wired
natsClient was never used - so in distributed mode a job created on one
replica was invisible to the others.

Replace the map and the dead client with a syncstate.SyncedMap keyed by job
ID, value *schema.FineTuneJob (the exact REST shape, so responses are
unchanged).

- Add a Store adapter (core/services/finetune/syncstore.go) over
  FineTuneStore, plus FineTuneStore.ListAll (global hydrate; per-user List
  kept) and an idempotent Upsert (create-or-update; Create alone fails on dup
  key).
- Writes go through SyncedMap.Set/Delete (write-through + broadcast); reads
  use List/Get. The on-disk state.json path becomes the standalone Loader,
  keeping single-node restart recovery (stale->stopped / exporting->failed
  fixups).
- Fold SetNATSClient/SetFineTuneStore into NewFineTuneService; app.go passes
  the distributed NATS client + store when distributed, nil otherwise.

Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(agentpool): back agent tasks with SyncedMap for cross-replica consistency

AgentJobService.ListTasks read the process-local tasks map only, while
ListJobs already read through the DB persister + dispatcher NATS - so in
distributed mode a task created on one replica was invisible to the others.

Back tasks with a syncstate.SyncedMap keyed by task ID (value schema.Task,
the exact REST shape); jobs are left untouched.

- Store adapter (task_syncstore.go) over the existing JobPersister
  (LoadTasks/SaveTask/DeleteTask); reads svc.persister/userID live so a
  persister swap needs no rebuild. No new persister methods required.
- Task reads -> SyncedMap.List/Get; create/update -> Set (write-through +
  broadcast); delete -> Delete. The file persister now owns its own task set
  so the write-through path does not re-enter the SyncedMap lock (deadlock
  guard).
- The distributed NATS client is not available at construction (start()
  precedes initDistributed), so it is injected via SetTaskSyncNATS, which
  rebuilds the still-empty map before Start/hydrate. Wired at the main,
  restart, and per-user (UserServicesManager) distributed sites.

Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(quantization): back jobs with SyncedMap + durable QuantStore

QuantizationService kept jobs in a process-local map persisted only to a
local state.json, so in distributed mode jobs were neither visible across
replicas nor durable cluster-wide.

Back jobs with a syncstate.SyncedMap keyed by job ID (value
*schema.QuantizationJob, the exact REST shape).

- New distributed.QuantStore (GORM, table quantization_jobs) mirroring
  FineTuneStore: Create/Get/ListAll/Upsert(idempotent)/Delete, registered for
  AutoMigrate via distributed.InitStores (Stores.Quant).
- New adapter (quantization/syncstore.go) over QuantStore implementing
  syncstate.Store, with record<->schema conversion.
- Reads go through List/Get, writes through Set/Delete (write-through +
  broadcast); state.json is kept as the standalone Loader for single-node
  restart recovery (stale-job fixups preserved).
- app.go passes the distributed NATS client + QuantStore when distributed,
  nil otherwise; Start/Close lifecycle mirrors finetune.

Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(syncstate): annotate gosec G118 false positive on lifeCtx

gosec flagged the WithCancel in Start as "cancellation function not called"
because the returned cancel is stored on the struct rather than
called/deferred in scope. It is invoked in Close (covered by tests), and
lifeCtx must outlive Start to drive the reconnect/reconcile goroutines.
Suppress the verified false positive with a justified #nosec G118.
Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* test(distributed): e2e two-replica SyncedMap sync over real NATS + Postgres

Adds the real-infrastructure counterpart to the fake-bus unit tests, in the
existing distributed e2e suite (testcontainers NATS + PostgreSQL). Two
SyncedMap instances stand in for two frontend replicas - each with its OWN
NATS connection to a shared server and a SHARED Postgres store (the
distributed-mode invariant) - and assert, over the wire:

- a create on replica A is observed by replica B;
- an update and a delete propagate A -> B (delete prunes, which a reload
  cannot);
- a late-joining replica recovers a job it never received a delta for, via
  store hydrate on Start (the at-most-once gap a fake bus cannot exercise);
- a local Set is written through to the shared Postgres store.

Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto
Co-authored-by: Ettore Di Giacinto
---
 core/application/agent_jobs.go | 2 +
 core/application/application.go | 4 +
 core/application/startup.go | 3 +
 core/http/app.go | 34 +-
 core/services/agentpool/agent_jobs.go | 150 +++++----
 core/services/agentpool/job_persister_file.go | 57 +++-
 core/services/agentpool/job_persister_test.go | 18 +-
 core/services/agentpool/task_sync_test.go | 152 +++++++++
 core/services/agentpool/task_syncstore.go | 47 +++
 core/services/agentpool/user_services.go | 14 +
 core/services/distributed/finetune.go | 29 ++
 .../distributed/finetune_suite_test.go | 13 +
 core/services/distributed/finetune_test.go | 61 ++++
 core/services/distributed/init.go | 9 +-
 core/services/distributed/quant.go | 105 +++++++
 core/services/distributed/quant_test.go | 57 ++++
 core/services/finetune/finetune_suite_test.go | 13 +
 core/services/finetune/service.go | 168 +++++-----
 core/services/finetune/service_test.go | 185 +++++++++++
 core/services/finetune/syncstore.go | 114 +++++++
core/services/messaging/client.go | 41 ++- core/services/messaging/subjects.go | 14 + .../quantization/quantization_suite_test.go | 13 + core/services/quantization/service.go | 120 ++++++-- core/services/quantization/service_test.go | 187 +++++++++++ core/services/quantization/syncstore.go | 114 +++++++ core/services/syncstate/syncstate.go | 289 +++++++++++++++++ .../syncstate/syncstate_suite_test.go | 13 + core/services/syncstate/syncstate_test.go | 291 ++++++++++++++++++ core/services/testutil/fakebus.go | 160 ++++++++++ .../distributed/syncstate_distributed_test.go | 161 ++++++++++ 31 files changed, 2450 insertions(+), 188 deletions(-) create mode 100644 core/services/agentpool/task_sync_test.go create mode 100644 core/services/agentpool/task_syncstore.go create mode 100644 core/services/distributed/finetune_suite_test.go create mode 100644 core/services/distributed/finetune_test.go create mode 100644 core/services/distributed/quant.go create mode 100644 core/services/distributed/quant_test.go create mode 100644 core/services/finetune/finetune_suite_test.go create mode 100644 core/services/finetune/service_test.go create mode 100644 core/services/finetune/syncstore.go create mode 100644 core/services/quantization/quantization_suite_test.go create mode 100644 core/services/quantization/service_test.go create mode 100644 core/services/quantization/syncstore.go create mode 100644 core/services/syncstate/syncstate.go create mode 100644 core/services/syncstate/syncstate_suite_test.go create mode 100644 core/services/syncstate/syncstate_test.go create mode 100644 core/services/testutil/fakebus.go create mode 100644 tests/e2e/distributed/syncstate_distributed_test.go diff --git a/core/application/agent_jobs.go b/core/application/agent_jobs.go index b7cfb20a3..f380b0750 100644 --- a/core/application/agent_jobs.go +++ b/core/application/agent_jobs.go @@ -37,6 +37,8 @@ func (a *Application) RestartAgentJobService() error { if d.JobStore != nil { 
agentJobService.SetDistributedJobStore(d.JobStore) } + // Keep agent tasks consistent across replicas (same client the dispatcher uses). + agentJobService.SetTaskSyncNATS(d.Nats) } // Start the service diff --git a/core/application/application.go b/core/application/application.go index 9bbf26bb8..52f8618f1 100644 --- a/core/application/application.go +++ b/core/application/application.go @@ -604,6 +604,10 @@ func (a *Application) StartAgentPool() { usm.SetJobDBStore(s) } } + // Keep per-user agent tasks consistent across replicas (nil in standalone). + if d := a.Distributed(); d != nil { + usm.SetJobSyncNATS(d.Nats) + } aps.SetUserServicesManager(usm) a.agentPoolService.Store(aps) diff --git a/core/application/startup.go b/core/application/startup.go index a71f8d0ea..25d965834 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -280,6 +280,9 @@ func New(opts ...config.AppOption) (*Application, error) { if application.agentJobService != nil { application.agentJobService.SetDistributedBackends(distSvc.Dispatcher) application.agentJobService.SetDistributedJobStore(distSvc.JobStore) + // Keep agent tasks consistent across replicas (jobs already sync via the + // dispatcher + DB read-through). Same NATS client the dispatcher uses. + application.agentJobService.SetTaskSyncNATS(distSvc.Nats) } // Wire skill store into AgentPoolService (wired at pool start time via closure) // The actual wiring happens in StartAgentPool since the pool doesn't exist yet. 
diff --git a/core/http/app.go b/core/http/app.go index ee5cd99eb..fff8a3468 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -23,8 +23,10 @@ import ( "github.com/mudler/LocalAI/core/application" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" "github.com/mudler/LocalAI/core/services/finetune" "github.com/mudler/LocalAI/core/services/galleryop" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/core/services/nodes" "github.com/mudler/LocalAI/core/services/quantization" @@ -400,25 +402,45 @@ func API(application *application.Application) (*echo.Echo, error) { routes.RegisterAgentPoolRoutes(e, application, agentsMw, skillsMw, collectionsMw) // Fine-tuning routes fineTuningMw := auth.RequireFeature(application.AuthDB(), auth.FeatureFineTuning) + // In distributed mode pass the shared NATS client + PostgreSQL store so + // fine-tune jobs stay consistent across replicas (the SyncedMap broadcasts + // mutations and hydrates from the DB); standalone passes nil for both. 
+ var ftNats messaging.MessagingClient + var ftStore *distributed.FineTuneStore + if d := application.Distributed(); d != nil { + ftNats = d.Nats + if d.DistStores != nil && d.DistStores.FineTune != nil { + ftStore = d.DistStores.FineTune + } + } ftService := finetune.NewFineTuneService( application.ApplicationConfig(), application.ModelLoader(), application.ModelConfigLoader(), + ftNats, + ftStore, ) - if d := application.Distributed(); d != nil { - ftService.SetNATSClient(d.Nats) - if d.DistStores != nil && d.DistStores.FineTune != nil { - ftService.SetFineTuneStore(d.DistStores.FineTune) - } - } routes.RegisterFineTuningRoutes(e, ftService, application.ApplicationConfig(), fineTuningMw) // Quantization routes quantizationMw := auth.RequireFeature(application.AuthDB(), auth.FeatureQuantization) + // In distributed mode pass the shared NATS client + PostgreSQL store so + // quantization jobs stay consistent across replicas (the SyncedMap broadcasts + // mutations and hydrates from the DB); standalone passes nil for both. 
+ var quantNats messaging.MessagingClient + var quantStore *distributed.QuantStore + if d := application.Distributed(); d != nil { + quantNats = d.Nats + if d.DistStores != nil && d.DistStores.Quant != nil { + quantStore = d.DistStores.Quant + } + } qService := quantization.NewQuantizationService( application.ApplicationConfig(), application.ModelLoader(), application.ModelConfigLoader(), + quantNats, + quantStore, ) routes.RegisterQuantizationRoutes(e, qService, application.ApplicationConfig(), quantizationMw) diff --git a/core/services/agentpool/agent_jobs.go b/core/services/agentpool/agent_jobs.go index 8d9e82b8e..59850981a 100644 --- a/core/services/agentpool/agent_jobs.go +++ b/core/services/agentpool/agent_jobs.go @@ -30,6 +30,8 @@ import ( mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp" "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/core/services/jobs" + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" "github.com/mudler/LocalAI/core/templates" "github.com/mudler/LocalAI/pkg/httpclient" "github.com/mudler/LocalAI/pkg/model" @@ -43,8 +45,18 @@ type AgentJobService struct { configLoader *config.ModelConfigLoader evaluator *templates.Evaluator + // tasks is the cross-replica task store: an in-memory map kept consistent + // across replicas via NATS, with read-through to the configured persister + // (file in standalone, PostgreSQL in distributed). Unlike jobs - which already + // converge via the dispatcher + DB read-through - tasks previously read + // in-memory only, so ListTasks went stale on non-originating replicas. + tasks *syncstate.SyncedMap[string, schema.Task] + // taskNats is the distributed NATS client backing the tasks SyncedMap. It is + // not available at construction time, so it is injected via SetTaskSyncNATS + // during distributed wiring; nil keeps tasks in-memory-only (standalone). 
+ taskNats messaging.MessagingClient + // Storage (in-memory primary, persister for secondary persistence) - tasks *xsync.SyncedMap[string, schema.Task] jobs *xsync.SyncedMap[string, schema.Job] persister JobPersister userID string // Scoping: empty for global (main service), set for per-user instances @@ -96,6 +108,31 @@ func (s *AgentJobService) SetDistributedJobStore(store *jobs.JobStore) { s.persister = &dbJobPersister{store: store} } +// SetTaskSyncNATS wires the distributed NATS client used to keep agent *tasks* +// consistent across replicas (jobs already converge via the dispatcher + DB +// read-through, so they are left untouched). The client is not available when the +// service is constructed, so it is injected here during distributed wiring and the +// tasks SyncedMap is rebuilt to pick it up. It is always called before Start / +// hydrate, while the map is still empty, so rebuilding loses no state. Passing nil +// (standalone) keeps the map in-memory-only with no broadcast. +func (s *AgentJobService) SetTaskSyncNATS(nats messaging.MessagingClient) { + s.taskNats = nats + s.buildTasksMap() +} + +// buildTasksMap (re)constructs the cross-replica tasks SyncedMap from the current +// taskNats. The Store adapter reads s.persister/s.userID live, so a persister swap +// (SetDistributedJobStore) needs no rebuild; only the NATS client, fixed at +// New-time, forces one - hence SetTaskSyncNATS calls this. +func (s *AgentJobService) buildTasksMap() { + s.tasks = syncstate.New(syncstate.Config[string, schema.Task]{ + Name: "agent.tasks", + Key: func(t schema.Task) string { return t.ID }, + Nats: s.taskNats, + Store: &taskStoreAdapter{svc: s}, + }) +} + // Dispatcher returns the distributed dispatcher (nil if not in distributed mode). 
func (s *AgentJobService) Dispatcher() DistributedDispatcher { return s.dispatcher @@ -106,13 +143,6 @@ func (s *AgentJobService) DBStore() *jobs.JobStore { return s.rawDBStore } -// saveTasks persists tasks via the configured persister (file or DB). -func (s *AgentJobService) saveTasks(task schema.Task) { - if err := s.persister.SaveTask(s.userID, task); err != nil { - xlog.Warn("Failed to persist task", "error", err, "task_id", task.ID) - } -} - // saveJobs persists jobs via the configured persister (file or DB). func (s *AgentJobService) saveJobs(job schema.Job) { if err := s.persister.SaveJob(s.userID, job); err != nil { @@ -129,18 +159,8 @@ func (s *AgentJobService) LoadFromDB() { // loadFromPersister loads tasks and jobs from the configured persister into memory. func (s *AgentJobService) loadFromPersister() { - if tasks, err := s.persister.LoadTasks(s.userID); err != nil { + if err := s.hydrateTasks(s.appConfig.Context); err != nil { xlog.Warn("Failed to load tasks from persister", "error", err) - } else { - for _, task := range tasks { - s.tasks.Set(task.ID, task) - if task.Enabled && task.Cron != "" { - if err := s.ScheduleCronTask(task); err != nil { - xlog.Warn("Failed to schedule cron task on load", "error", err, "task_id", task.ID) - } - } - } - xlog.Info("Loaded tasks from persister", "count", len(tasks)) } if loadedJobs, err := s.persister.LoadJobs(s.userID); err != nil { @@ -153,6 +173,27 @@ func (s *AgentJobService) loadFromPersister() { } } +// hydrateTasks loads tasks into the cross-replica SyncedMap and (re)schedules +// cron entries for enabled tasks. Hydration goes through the SyncedMap's Store +// read-through (Start), not Set, so it neither re-persists nor re-broadcasts the +// loaded tasks. Each service instance hydrates exactly once: the main service via +// Start -> loadFromPersister, per-user services via LoadFromDB or LoadTasksFromFile. 
+func (s *AgentJobService) hydrateTasks(ctx context.Context) error { + if err := s.tasks.Start(ctx); err != nil { + return err + } + tasks := s.tasks.List() + for _, task := range tasks { + if task.Enabled && task.Cron != "" { + if err := s.ScheduleCronTask(task); err != nil { + xlog.Warn("Failed to schedule cron task on load", "error", err, "task_id", task.ID) + } + } + } + xlog.Info("Loaded tasks from persister", "count", len(tasks)) + return nil +} + // JobExecution represents a job to be executed type JobExecution struct { Job schema.Job @@ -200,21 +241,19 @@ func NewAgentJobServiceWithPaths( ) *AgentJobService { retentionDays := cmp.Or(appConfig.AgentJobRetentionDays, 30) - tasks := xsync.NewSyncedMap[string, schema.Task]() jobsMap := xsync.NewSyncedMap[string, schema.Job]() - return &AgentJobService{ + s := &AgentJobService{ appConfig: appConfig, modelLoader: modelLoader, configLoader: configLoader, evaluator: evaluator, - tasks: tasks, jobs: jobsMap, persister: &fileJobPersister{ - tasks: tasks, jobs: jobsMap, tasksFile: tasksFile, jobsFile: jobsFile, + taskSet: make(map[string]schema.Task), }, jobQueue: make(chan JobExecution, 100), // Buffer for 100 jobs cancellations: xsync.NewSyncedMap[string, context.CancelFunc](), @@ -222,25 +261,17 @@ func NewAgentJobServiceWithPaths( cronEntries: xsync.NewSyncedMap[string, cron.EntryID](), retentionDays: retentionDays, } + // Build the cross-replica tasks map standalone (nil NATS); SetTaskSyncNATS + // rebuilds it with the distributed client once that is available, before Start. + s.buildTasksMap() + return s } // LoadTasksFromFile loads tasks from the persister into the in-memory map // and schedules cron entries. Named "FromFile" for backward compat; in DB // mode it loads from the database. 
func (s *AgentJobService) LoadTasksFromFile() error { - tasks, err := s.persister.LoadTasks(s.userID) - if err != nil { - return err - } - for _, task := range tasks { - s.tasks.Set(task.ID, task) - if task.Enabled && task.Cron != "" { - if err := s.ScheduleCronTask(task); err != nil { - xlog.Warn("Failed to schedule cron task on load", "error", err, "task_id", task.ID) - } - } - } - return nil + return s.hydrateTasks(s.appConfig.Context) } // SaveTasksToFile flushes the current tasks map via the persister. File @@ -293,8 +324,12 @@ func (s *AgentJobService) CreateTask(task schema.Task) (string, error) { task.Enabled = true // Default to enabled } - // Store task - s.tasks.Set(id, task) + // Store task: Set updates the in-memory map, write-throughs to the persister + // (file or DB), and broadcasts the create to peer replicas. Background ctx + // because CreateTask carries no request ctx (mirrors the finetune service). + if err := s.tasks.Set(context.Background(), task); err != nil { + return "", fmt.Errorf("failed to persist task: %w", err) + } // Schedule cron if enabled and has cron expression if task.Enabled && task.Cron != "" { @@ -303,16 +338,15 @@ func (s *AgentJobService) CreateTask(task schema.Task) (string, error) { } } - s.saveTasks(task) return id, nil } // UpdateTask updates an existing task func (s *AgentJobService) UpdateTask(id string, task schema.Task) error { - if !s.tasks.Exists(id) { + existing, ok := s.tasks.Get(id) + if !ok { return fmt.Errorf("%w: %s", ErrTaskNotFound, id) } - existing := s.tasks.Get(id) // Preserve ID and CreatedAt task.ID = id @@ -324,8 +358,10 @@ func (s *AgentJobService) UpdateTask(id string, task schema.Task) error { s.UnscheduleCronTask(id) } - // Store updated task - s.tasks.Set(id, task) + // Store updated task: write-through + broadcast (see CreateTask). 
+ if err := s.tasks.Set(context.Background(), task); err != nil { + return fmt.Errorf("failed to persist task: %w", err) + } // Schedule new cron if enabled and has cron expression if task.Enabled && task.Cron != "" { @@ -334,24 +370,22 @@ func (s *AgentJobService) UpdateTask(id string, task schema.Task) error { } } - s.saveTasks(task) return nil } // DeleteTask deletes a task func (s *AgentJobService) DeleteTask(id string) error { - if !s.tasks.Exists(id) { + if _, ok := s.tasks.Get(id); !ok { return fmt.Errorf("%w: %s", ErrTaskNotFound, id) } // Unschedule cron s.UnscheduleCronTask(id) - // Remove from memory - s.tasks.Delete(id) - - if err := s.persister.DeleteTask(id); err != nil { - xlog.Warn("Failed to delete task from persister", "error", err, "task_id", id) + // Delete removes from the in-memory map, deletes from the persister, and + // broadcasts the removal to peer replicas. + if err := s.tasks.Delete(context.Background(), id); err != nil { + xlog.Warn("Failed to delete task from store", "error", err, "task_id", id) } return nil @@ -359,8 +393,8 @@ func (s *AgentJobService) DeleteTask(id string) error { // GetTask retrieves a task by ID func (s *AgentJobService) GetTask(id string) (*schema.Task, error) { - task := s.tasks.Get(id) - if task.ID == "" { + task, ok := s.tasks.Get(id) + if !ok { return nil, fmt.Errorf("%w: %s", ErrTaskNotFound, id) } return &task, nil @@ -368,7 +402,7 @@ func (s *AgentJobService) GetTask(id string) (*schema.Task, error) { // ListTasks returns all tasks, sorted by creation date (newest first) func (s *AgentJobService) ListTasks() []schema.Task { - tasks := s.tasks.Values() + tasks := s.tasks.List() // Sort by CreatedAt descending (newest first), then by Name for stability slices.SortFunc(tasks, func(a, b schema.Task) int { if a.CreatedAt.Equal(b.CreatedAt) { @@ -397,8 +431,8 @@ func (s *AgentJobService) buildPrompt(templateStr string, params map[string]stri // ExecuteJob creates and queues a job for execution // multimedia can 
be nil for backward compatibility func (s *AgentJobService) ExecuteJob(taskID string, params map[string]string, triggeredBy string, multimedia *schema.MultimediaAttachment) (string, error) { - task := s.tasks.Get(taskID) - if task.ID == "" { + task, ok := s.tasks.Get(taskID) + if !ok { return "", fmt.Errorf("%w: %s", ErrTaskNotFound, taskID) } @@ -1451,6 +1485,12 @@ func (s *AgentJobService) Stop() error { if s.cronScheduler != nil { s.cronScheduler.Stop() } + // Release the tasks SyncedMap subscription / background workers. + if s.tasks != nil { + if err := s.tasks.Close(); err != nil { + xlog.Warn("Error closing tasks sync map", "error", err) + } + } xlog.Info("AgentJobService stopped") return nil } diff --git a/core/services/agentpool/job_persister_file.go b/core/services/agentpool/job_persister_file.go index 3087a2524..b161c442b 100644 --- a/core/services/agentpool/job_persister_file.go +++ b/core/services/agentpool/job_persister_file.go @@ -14,24 +14,38 @@ import ( ) // fileJobPersister persists tasks and jobs to JSON files. -// It holds references to the service's syncmaps and serializes the entire -// map contents on each save (bulk write). Reads at runtime return nil -// (the in-memory map is the authoritative source); LoadTasks/LoadJobs -// are used only at startup to bootstrap the syncmaps. +// +// Jobs serialize the service's in-memory jobs syncmap on each save (bulk write). +// Tasks are kept in this persister's own taskSet map instead: the tasks SyncedMap +// calls SaveTask/DeleteTask while holding its internal lock (write-through), so +// reading back the SyncedMap here would re-enter that lock and deadlock. The +// self-contained taskSet, seeded by LoadTasks, lets a per-task write rewrite the +// whole bulk file without touching the SyncedMap. +// +// Runtime reads (GetJob/ListJobs) return nil (the in-memory state is the +// authoritative source); LoadTasks/LoadJobs bootstrap state at startup. 
type fileJobPersister struct { - tasks *xsync.SyncedMap[string, schema.Task] jobs *xsync.SyncedMap[string, schema.Job] tasksFile string jobsFile string mu sync.Mutex + // taskSet is the persister's own view of all tasks, seeded by LoadTasks and + // updated by SaveTask/DeleteTask. The bulk JSON file is rewritten from it. + taskSet map[string]schema.Task } -func (p *fileJobPersister) SaveTask(_ string, _ schema.Task) error { - return p.saveTasksToFile() +func (p *fileJobPersister) SaveTask(_ string, task schema.Task) error { + p.mu.Lock() + defer p.mu.Unlock() + p.taskSet[task.ID] = task + return p.writeTasksLocked() } -func (p *fileJobPersister) DeleteTask(_ string) error { - return p.saveTasksToFile() +func (p *fileJobPersister) DeleteTask(taskID string) error { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.taskSet, taskID) + return p.writeTasksLocked() } func (p *fileJobPersister) SaveJob(_ string, _ schema.Job) error { @@ -43,7 +57,9 @@ func (p *fileJobPersister) DeleteJob(_ string) error { } func (p *fileJobPersister) FlushTasks() error { - return p.saveTasksToFile() + p.mu.Lock() + defer p.mu.Unlock() + return p.writeTasksLocked() } func (p *fileJobPersister) FlushJobs() error { @@ -83,6 +99,12 @@ func (p *fileJobPersister) LoadTasks(_ string) ([]schema.Task, error) { return nil, fmt.Errorf("failed to parse tasks file: %w", err) } + // Seed the in-memory set so subsequent per-task SaveTask/DeleteTask merge into + // (rather than overwrite) the persisted tasks when the bulk file is rewritten. + for _, t := range tf.Tasks { + p.taskSet[t.ID] = t + } + xlog.Info("Loaded tasks from file", "count", len(tf.Tasks)) return tf.Tasks, nil } @@ -118,19 +140,20 @@ func (p *fileJobPersister) CleanupOldJobs(_ time.Duration) (int64, error) { return 0, nil // cleanup handled via in-memory filtering } -// saveTasksToFile serializes the entire tasks map to the JSON file. 
-func (p *fileJobPersister) saveTasksToFile() error { +// writeTasksLocked serializes the persister's task set to the JSON file. Callers +// must hold p.mu. +func (p *fileJobPersister) writeTasksLocked() error { if p.tasksFile == "" { return nil } - p.mu.Lock() - defer p.mu.Unlock() - - tf := schema.TasksFile{ - Tasks: p.tasks.Values(), + tasks := make([]schema.Task, 0, len(p.taskSet)) + for _, t := range p.taskSet { + tasks = append(tasks, t) } + tf := schema.TasksFile{Tasks: tasks} + data, err := json.MarshalIndent(tf, "", " ") if err != nil { return fmt.Errorf("failed to marshal tasks: %w", err) diff --git a/core/services/agentpool/job_persister_test.go b/core/services/agentpool/job_persister_test.go index 919eb4a66..646104db6 100644 --- a/core/services/agentpool/job_persister_test.go +++ b/core/services/agentpool/job_persister_test.go @@ -20,28 +20,26 @@ var _ = Describe("JobPersister", func() { Context("fileJobPersister", func() { var ( p *fileJobPersister - tasks *xsync.SyncedMap[string, schema.Task] jobsMap *xsync.SyncedMap[string, schema.Job] tmpDir string ) BeforeEach(func() { tmpDir = GinkgoT().TempDir() - tasks = xsync.NewSyncedMap[string, schema.Task]() jobsMap = xsync.NewSyncedMap[string, schema.Job]() p = &fileJobPersister{ - tasks: tasks, jobs: jobsMap, tasksFile: filepath.Join(tmpDir, "tasks.json"), jobsFile: filepath.Join(tmpDir, "jobs.json"), + // taskSet is the persister's own task view (decoupled from the tasks + // SyncedMap to avoid re-entering its lock during write-through). 
+ taskSet: make(map[string]schema.Task), } }) It("SaveTask writes all tasks to file", func() { - tasks.Set("t1", schema.Task{ID: "t1", Name: "Task One", Model: "m", Prompt: "p"}) - tasks.Set("t2", schema.Task{ID: "t2", Name: "Task Two", Model: "m", Prompt: "p"}) - - Expect(p.SaveTask("", schema.Task{})).To(Succeed()) + Expect(p.SaveTask("", schema.Task{ID: "t1", Name: "Task One", Model: "m", Prompt: "p"})).To(Succeed()) + Expect(p.SaveTask("", schema.Task{ID: "t2", Name: "Task Two", Model: "m", Prompt: "p"})).To(Succeed()) // Verify file contents data, err := os.ReadFile(p.tasksFile) @@ -52,11 +50,9 @@ var _ = Describe("JobPersister", func() { }) It("DeleteTask writes updated tasks to file", func() { - tasks.Set("t1", schema.Task{ID: "t1", Name: "Keep"}) - tasks.Set("t2", schema.Task{ID: "t2", Name: "Delete"}) + Expect(p.SaveTask("", schema.Task{ID: "t1", Name: "Keep"})).To(Succeed()) + Expect(p.SaveTask("", schema.Task{ID: "t2", Name: "Delete"})).To(Succeed()) - // Simulate deletion from memory (caller does this before calling persister) - tasks.Delete("t2") Expect(p.DeleteTask("t2")).To(Succeed()) data, err := os.ReadFile(p.tasksFile) diff --git a/core/services/agentpool/task_sync_test.go b/core/services/agentpool/task_sync_test.go new file mode 100644 index 000000000..d42a197f6 --- /dev/null +++ b/core/services/agentpool/task_sync_test.go @@ -0,0 +1,152 @@ +package agentpool + +// White-box tests (package agentpool) so a spec can build two AgentJobService +// instances sharing one in-memory bus and assert that agent *tasks* converge +// across replicas - the bug this migration fixes (ListTasks used to read +// in-memory only, so a task created on replica A was invisible on replica B). +// Jobs are deliberately untouched here: they already converge via the dispatcher +// + DB read-through. + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" + "github.com/mudler/LocalAI/core/services/testutil" + "github.com/mudler/LocalAI/pkg/system" +) + +// newTaskSyncService builds an AgentJobService wired to the given bus and a +// throwaway data dir (so the file persister has somewhere to write). Model/config +// loaders are nil because the task sync paths under test never touch them. +func newTaskSyncService(bus messaging.MessagingClient) *AgentJobService { + tmpDir := GinkgoT().TempDir() + sysState := &system.SystemState{} + sysState.Model.ModelsPath = tmpDir + appConfig := config.NewApplicationConfig( + config.WithDynamicConfigDir(tmpDir), + config.WithContext(context.Background()), + ) + appConfig.SystemState = sysState + + svc := NewAgentJobServiceWithPaths(appConfig, nil, nil, nil, + // Distinct per-replica files so the file persister write-through never + // crosses replicas: convergence here must be proven via the bus alone. + tmpDir+"/tasks.json", tmpDir+"/jobs.json") + svc.SetTaskSyncNATS(bus) + return svc +} + +var _ = Describe("AgentJobService task cross-replica sync", func() { + Describe("two replicas sharing one bus", func() { + var ( + bus *testutil.FakeBus + a, b *AgentJobService + ) + + BeforeEach(func() { + // One shared bus, two replicas: exactly the distributed topology where a + // round-robin request may land on a replica that did not originate the + // change. + bus = testutil.NewFakeBus() + a = newTaskSyncService(bus) + b = newTaskSyncService(bus) + // Start hydrates (empty here) and subscribes both replicas to deltas. 
+ Expect(a.Start(context.Background())).To(Succeed()) + Expect(b.Start(context.Background())).To(Succeed()) + }) + + AfterEach(func() { + Expect(a.Stop()).To(Succeed()) + Expect(b.Stop()).To(Succeed()) + }) + + It("makes a task created on A visible via B's GetTask and ListTasks", func() { + id, err := a.CreateTask(schema.Task{Name: "Shared", Model: "m", Prompt: "p"}) + Expect(err).NotTo(HaveOccurred()) + + got, err := b.GetTask(id) + Expect(err).NotTo(HaveOccurred(), "B must see a task A just created") + Expect(got.Name).To(Equal("Shared")) + + listed := b.ListTasks() + Expect(listed).To(HaveLen(1)) + Expect(listed[0].ID).To(Equal(id)) + }) + + It("propagates a task update from A to B", func() { + id, err := a.CreateTask(schema.Task{Name: "Before", Model: "m", Prompt: "p"}) + Expect(err).NotTo(HaveOccurred()) + + Expect(a.UpdateTask(id, schema.Task{Name: "After", Model: "m", Prompt: "p"})).To(Succeed()) + + got, err := b.GetTask(id) + Expect(err).NotTo(HaveOccurred()) + Expect(got.Name).To(Equal("After"), "an update on A must be visible on B") + }) + + It("removes a task from B when it is deleted on A", func() { + id, err := a.CreateTask(schema.Task{Name: "Doomed", Model: "m", Prompt: "p"}) + Expect(err).NotTo(HaveOccurred()) + _, err = b.GetTask(id) + Expect(err).NotTo(HaveOccurred(), "precondition: B must have the task before the delete") + + Expect(a.DeleteTask(id)).To(Succeed()) + + _, err = b.GetTask(id) + Expect(err).To(HaveOccurred(), "a delete on A must remove the task from B") + Expect(b.ListTasks()).To(BeEmpty()) + }) + + It("does not re-broadcast a delta it received (echo-loop guard)", func() { + subject := messaging.SubjectSyncStateDelta("agent.tasks") + + _, err := a.CreateTask(schema.Task{Name: "Once", Model: "m", Prompt: "p"}) + Expect(err).NotTo(HaveOccurred()) + + // Exactly one publish: A's create. B applies it without re-publishing, + // otherwise this would be 2+ and a real bus would storm. 
+ Expect(bus.PublishCount(subject)).To(Equal(1)) + }) + }) + + Describe("ListTasks ordering and scoping", func() { + var svc *AgentJobService + + BeforeEach(func() { + svc = newTaskSyncService(testutil.NewFakeBus()) + Expect(svc.Start(context.Background())).To(Succeed()) + }) + AfterEach(func() { Expect(svc.Stop()).To(Succeed()) }) + + It("sorts newest-first, breaking ties by name", func() { + // CreateTask stamps CreatedAt with time.Now(); space them out so ordering + // is deterministic rather than relying on the sub-millisecond gap. + oldID, err := svc.CreateTask(schema.Task{Name: "Old", Model: "m", Prompt: "p"}) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(5 * time.Millisecond) + newID, err := svc.CreateTask(schema.Task{Name: "New", Model: "m", Prompt: "p"}) + Expect(err).NotTo(HaveOccurred()) + + listed := svc.ListTasks() + Expect(listed).To(HaveLen(2)) + Expect(listed[0].ID).To(Equal(newID), "newest first") + Expect(listed[1].ID).To(Equal(oldID)) + }) + }) + + Describe("compile-time adapter contract", func() { + It("satisfies syncstate.Store for tasks", func() { + // Mirrors the var assertion in task_syncstore.go; keeps the type + // referenced from a spec so drift surfaces here too. + var _ syncstate.Store[string, schema.Task] = (*taskStoreAdapter)(nil) + Expect(&taskStoreAdapter{}).ToNot(BeNil()) + }) + }) +}) diff --git a/core/services/agentpool/task_syncstore.go b/core/services/agentpool/task_syncstore.go new file mode 100644 index 000000000..ef8f3f8cc --- /dev/null +++ b/core/services/agentpool/task_syncstore.go @@ -0,0 +1,47 @@ +package agentpool + +import ( + "context" + + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/syncstate" +) + +// taskStoreAdapter bridges the existing JobPersister (file- or DB-backed) to the +// generic syncstate.Store the tasks SyncedMap consumes. 
Only tasks are migrated: +// jobs already converge across replicas via the dispatcher (NATS) plus the DB +// read-through in ListJobs/GetJob, whereas ListTasks read in-memory only and so +// went stale on replicas that did not originate the change. +// +// The adapter reads svc.persister and svc.userID live (rather than capturing +// them) because both are configured by setters - SetDistributedJobStore swaps the +// file persister for the DB one, SetUserID scopes per-user queries - AFTER the +// service, and thus this adapter, is constructed. Reading them at call time means +// the SyncedMap never has to be rebuilt when the persister is swapped. +// +// The SyncedMap value type is schema.Task: the exact shape ListTasks returns, so +// reads need no conversion and REST responses are provably unchanged. +type taskStoreAdapter struct { + svc *AgentJobService +} + +// compile-time assertion that the adapter satisfies the component's Store. +var _ syncstate.Store[string, schema.Task] = (*taskStoreAdapter)(nil) + +// List hydrates the map from durable storage on Start/reconnect: the file's task +// list (standalone) or every task row (DB / distributed). +func (a *taskStoreAdapter) List(_ context.Context) ([]schema.Task, error) { + return a.svc.persister.LoadTasks(a.svc.userID) +} + +// Upsert write-through persists a single task created/updated locally; the +// SyncedMap then broadcasts the delta to peers. +func (a *taskStoreAdapter) Upsert(_ context.Context, task schema.Task) error { + return a.svc.persister.SaveTask(a.svc.userID, task) +} + +// Delete write-through removes a task locally; the SyncedMap then broadcasts the +// removal to peers. 
+func (a *taskStoreAdapter) Delete(_ context.Context, id string) error { + return a.svc.persister.DeleteTask(id) +} diff --git a/core/services/agentpool/user_services.go b/core/services/agentpool/user_services.go index db30e25ad..56d19e0fc 100644 --- a/core/services/agentpool/user_services.go +++ b/core/services/agentpool/user_services.go @@ -7,6 +7,7 @@ import ( "github.com/mudler/LocalAGI/webui/collections" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/services/jobs" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/core/templates" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/xlog" @@ -28,6 +29,9 @@ type UserServicesManager struct { // Shared distributed backends (set once, inherited by per-user job services) jobDispatcher DistributedDispatcher jobDBStore *jobs.JobStore + // jobNats keeps per-user agent tasks consistent across replicas (nil in + // standalone). Inherited by each per-user AgentJobService. + jobNats messaging.MessagingClient } // NewUserServicesManager creates a new UserServicesManager. @@ -162,6 +166,10 @@ func (m *UserServicesManager) GetJobs(userID string) (*AgentJobService, error) { if m.jobDispatcher != nil { svc.SetDistributedBackends(m.jobDispatcher) } + // Inherit the NATS client so per-user tasks broadcast across replicas. Must be + // set before the hydrate below (LoadFromDB / LoadTasksFromFile) so the tasks + // SyncedMap is rebuilt with the client while it is still empty. + svc.SetTaskSyncNATS(m.jobNats) if m.jobDBStore != nil { svc.SetDistributedJobStore(m.jobDBStore) // Load tasks/jobs from DB immediately (per-user services skip Start()) @@ -189,6 +197,12 @@ func (m *UserServicesManager) SetJobDBStore(s *jobs.JobStore) { m.jobDBStore = s } +// SetJobSyncNATS sets the NATS client used to keep per-user agent tasks consistent +// across replicas. 
+func (m *UserServicesManager) SetJobSyncNATS(nats messaging.MessagingClient) { + m.jobNats = nats +} + // ListAllUserIDs returns all user IDs that have scoped data directories. func (m *UserServicesManager) ListAllUserIDs() ([]string, error) { return m.storage.ListUserDirs() diff --git a/core/services/distributed/finetune.go b/core/services/distributed/finetune.go index 49144c32d..1f4cfeb8f 100644 --- a/core/services/distributed/finetune.go +++ b/core/services/distributed/finetune.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/mudler/LocalAI/core/services/advisorylock" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // FineTuneJobRecord tracks fine-tune jobs in PostgreSQL. @@ -80,6 +81,34 @@ func (s *FineTuneStore) List(userID string) ([]FineTuneJobRecord, error) { return jobs, q.Find(&jobs).Error } +// ListAll returns every fine-tune job across all users. The SyncedMap that backs +// FineTuneService is a single global map (the REST API filters by user at read +// time), so hydrate needs the full set rather than the per-user List above. +func (s *FineTuneStore) ListAll() ([]FineTuneJobRecord, error) { + var jobs []FineTuneJobRecord + return jobs, s.db.Order("created_at DESC").Find(&jobs).Error +} + +// Upsert idempotently inserts or fully replaces a job row by primary key. The +// SyncedMap write-through path issues a single Set per mutation regardless of +// whether the job already exists, so it needs one create-or-update primitive +// (Create alone fails on a duplicate key, UpdateStatus alone misses new rows and +// only touches a few columns). +func (s *FineTuneStore) Upsert(job *FineTuneJobRecord) error { + if job.ID == "" { + job.ID = uuid.New().String() + } + now := time.Now() + if job.CreatedAt.IsZero() { + job.CreatedAt = now + } + job.UpdatedAt = now + return s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(job).Error +} + // UpdateStatus updates the status and message of a fine-tune job. 
func (s *FineTuneStore) UpdateStatus(id, status, message string) error { return s.db.Model(&FineTuneJobRecord{}).Where("id = ?", id).Updates(map[string]any{ diff --git a/core/services/distributed/finetune_suite_test.go b/core/services/distributed/finetune_suite_test.go new file mode 100644 index 000000000..87add73fd --- /dev/null +++ b/core/services/distributed/finetune_suite_test.go @@ -0,0 +1,13 @@ +package distributed_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestDistributed(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Distributed Suite") +} diff --git a/core/services/distributed/finetune_test.go b/core/services/distributed/finetune_test.go new file mode 100644 index 000000000..cf92b5cf4 --- /dev/null +++ b/core/services/distributed/finetune_test.go @@ -0,0 +1,61 @@ +package distributed_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/testutil" +) + +var _ = Describe("FineTuneStore", func() { + var store *distributed.FineTuneStore + + BeforeEach(func() { + db := testutil.SetupTestDB() + var err error + store, err = distributed.NewFineTuneStore(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("ListAll", func() { + It("returns jobs across all users (unlike per-user List)", func() { + Expect(store.Create(&distributed.FineTuneJobRecord{ID: "j1", UserID: "u1", Status: "queued"})).To(Succeed()) + Expect(store.Create(&distributed.FineTuneJobRecord{ID: "j2", UserID: "u2", Status: "queued"})).To(Succeed()) + + all, err := store.ListAll() + Expect(err).ToNot(HaveOccurred()) + Expect(all).To(HaveLen(2)) + + perUser, err := store.List("u1") + Expect(err).ToNot(HaveOccurred()) + Expect(perUser).To(HaveLen(1), "List stays per-user") + }) + }) + + Describe("Upsert", func() { + It("inserts a new row", func() { + Expect(store.Upsert(&distributed.FineTuneJobRecord{ID: 
"up-1", UserID: "u1", Status: "queued"})).To(Succeed()) + + got, err := store.Get("up-1") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("queued")) + }) + + It("idempotently updates an existing row on a repeated key", func() { + Expect(store.Upsert(&distributed.FineTuneJobRecord{ID: "up-2", UserID: "u1", Status: "queued"})).To(Succeed()) + // Second Upsert with the same primary key must update, not error on a + // duplicate-key violation (this is the SyncedMap write-through contract). + Expect(store.Upsert(&distributed.FineTuneJobRecord{ID: "up-2", UserID: "u1", Status: "completed", Message: "done"})).To(Succeed()) + + got, err := store.Get("up-2") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("completed")) + Expect(got.Message).To(Equal("done")) + + all, err := store.ListAll() + Expect(err).ToNot(HaveOccurred()) + Expect(all).To(HaveLen(1), "upsert must not create a duplicate") + }) + }) +}) diff --git a/core/services/distributed/init.go b/core/services/distributed/init.go index 0ccbe5969..ac28441e2 100644 --- a/core/services/distributed/init.go +++ b/core/services/distributed/init.go @@ -11,6 +11,7 @@ import ( type Stores struct { Gallery *GalleryStore FineTune *FineTuneStore + Quant *QuantStore Skills *SkillStore } @@ -26,15 +27,21 @@ func InitStores(db *gorm.DB) (*Stores, error) { return nil, fmt.Errorf("fine-tune store: %w", err) } + quant, err := NewQuantStore(db) + if err != nil { + return nil, fmt.Errorf("quantization store: %w", err) + } + skills, err := NewSkillStore(db) if err != nil { return nil, fmt.Errorf("skills store: %w", err) } - xlog.Info("Distributed stores initialized (Gallery, FineTune, Skills)") + xlog.Info("Distributed stores initialized (Gallery, FineTune, Quant, Skills)") return &Stores{ Gallery: gallery, FineTune: ft, + Quant: quant, Skills: skills, }, nil } diff --git a/core/services/distributed/quant.go b/core/services/distributed/quant.go new file mode 100644 index 000000000..cba032f4d --- 
/dev/null +++ b/core/services/distributed/quant.go @@ -0,0 +1,105 @@ +package distributed + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/mudler/LocalAI/core/services/advisorylock" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// QuantJobRecord tracks quantization jobs in PostgreSQL. The columns mirror the +// API shape (schema.QuantizationJob); the structured Config and ExtraOptions are +// serialized into JSON text columns so a record fully reconstructs the job. +type QuantJobRecord struct { + ID string `gorm:"primaryKey;size:36" json:"id"` + UserID string `gorm:"index;size:36" json:"user_id,omitempty"` + Model string `gorm:"size:255" json:"model"` + Backend string `gorm:"size:64" json:"backend"` + ModelID string `gorm:"size:255" json:"model_id,omitempty"` + QuantizationType string `gorm:"size:32" json:"quantization_type"` + Status string `gorm:"index;size:32;default:queued" json:"status"` // queued, downloading, converting, quantizing, completed, failed, stopped + Message string `gorm:"type:text" json:"message,omitempty"` + OutputDir string `gorm:"size:512" json:"output_dir,omitempty"` + OutputFile string `gorm:"size:512" json:"output_file,omitempty"` + ConfigJSON string `gorm:"column:config;type:text" json:"-"` + ExtraOptsJSON string `gorm:"column:extra_options;type:text" json:"-"` + ImportStatus string `gorm:"size:32" json:"import_status,omitempty"` + ImportMessage string `gorm:"type:text" json:"import_message,omitempty"` + ImportModelName string `gorm:"size:255" json:"import_model_name,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +func (QuantJobRecord) TableName() string { return "quantization_jobs" } + +// QuantStore manages quantization job state in PostgreSQL. +type QuantStore struct { + db *gorm.DB +} + +// NewQuantStore creates a new QuantStore and auto-migrates. 
+// Uses a PostgreSQL advisory lock to prevent concurrent migration races +// when multiple instances (frontend + workers) start at the same time. +func NewQuantStore(db *gorm.DB) (*QuantStore, error) { + if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { + return db.AutoMigrate(&QuantJobRecord{}) + }); err != nil { + return nil, fmt.Errorf("migrating quantization_jobs: %w", err) + } + return &QuantStore{db: db}, nil +} + +// Create stores a new quantization job. +func (s *QuantStore) Create(job *QuantJobRecord) error { + if job.ID == "" { + job.ID = uuid.New().String() + } + job.CreatedAt = time.Now() + job.UpdatedAt = job.CreatedAt + return s.db.Create(job).Error +} + +// Get retrieves a quantization job by ID. +func (s *QuantStore) Get(id string) (*QuantJobRecord, error) { + var job QuantJobRecord + if err := s.db.First(&job, "id = ?", id).Error; err != nil { + return nil, err + } + return &job, nil +} + +// ListAll returns every quantization job across all users. The SyncedMap that +// backs QuantizationService is a single global map (the REST API filters by user +// at read time), so hydrate needs the full set. +func (s *QuantStore) ListAll() ([]QuantJobRecord, error) { + var jobs []QuantJobRecord + return jobs, s.db.Order("created_at DESC").Find(&jobs).Error +} + +// Upsert idempotently inserts or fully replaces a job row by primary key. The +// SyncedMap write-through path issues a single Set per mutation regardless of +// whether the job already exists, so it needs one create-or-update primitive +// (Create alone fails on a duplicate key). 
+func (s *QuantStore) Upsert(job *QuantJobRecord) error { + if job.ID == "" { + job.ID = uuid.New().String() + } + now := time.Now() + if job.CreatedAt.IsZero() { + job.CreatedAt = now + } + job.UpdatedAt = now + return s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(job).Error +} + +// Delete removes a quantization job. +func (s *QuantStore) Delete(id string) error { + return s.db.Where("id = ?", id).Delete(&QuantJobRecord{}).Error +} diff --git a/core/services/distributed/quant_test.go b/core/services/distributed/quant_test.go new file mode 100644 index 000000000..49ae483f9 --- /dev/null +++ b/core/services/distributed/quant_test.go @@ -0,0 +1,57 @@ +package distributed_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/testutil" +) + +var _ = Describe("QuantStore", func() { + var store *distributed.QuantStore + + BeforeEach(func() { + db := testutil.SetupTestDB() + var err error + store, err = distributed.NewQuantStore(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("ListAll", func() { + It("returns jobs across all users", func() { + Expect(store.Create(&distributed.QuantJobRecord{ID: "j1", UserID: "u1", Status: "queued"})).To(Succeed()) + Expect(store.Create(&distributed.QuantJobRecord{ID: "j2", UserID: "u2", Status: "queued"})).To(Succeed()) + + all, err := store.ListAll() + Expect(err).ToNot(HaveOccurred()) + Expect(all).To(HaveLen(2)) + }) + }) + + Describe("Upsert", func() { + It("inserts a new row", func() { + Expect(store.Upsert(&distributed.QuantJobRecord{ID: "up-1", UserID: "u1", Status: "queued"})).To(Succeed()) + + got, err := store.Get("up-1") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("queued")) + }) + + It("idempotently updates an existing row on a repeated key", func() { + Expect(store.Upsert(&distributed.QuantJobRecord{ID: "up-2", 
UserID: "u1", Status: "queued"})).To(Succeed()) + // Second Upsert with the same primary key must update, not error on a + // duplicate-key violation (this is the SyncedMap write-through contract). + Expect(store.Upsert(&distributed.QuantJobRecord{ID: "up-2", UserID: "u1", Status: "completed", Message: "done"})).To(Succeed()) + + got, err := store.Get("up-2") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("completed")) + Expect(got.Message).To(Equal("done")) + + all, err := store.ListAll() + Expect(err).ToNot(HaveOccurred()) + Expect(all).To(HaveLen(1), "upsert must not create a duplicate") + }) + }) +}) diff --git a/core/services/finetune/finetune_suite_test.go b/core/services/finetune/finetune_suite_test.go new file mode 100644 index 000000000..fe7deb994 --- /dev/null +++ b/core/services/finetune/finetune_suite_test.go @@ -0,0 +1,13 @@ +package finetune + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestFinetune(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Finetune Suite") +} diff --git a/core/services/finetune/service.go b/core/services/finetune/service.go index 84d50d80e..3e2431df2 100644 --- a/core/services/finetune/service.go +++ b/core/services/finetune/service.go @@ -19,6 +19,7 @@ import ( "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/core/services/distributed" "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" pb "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/utils" @@ -32,44 +33,63 @@ type FineTuneService struct { modelLoader *model.ModelLoader configLoader *config.ModelConfigLoader - mu sync.Mutex - jobs map[string]*schema.FineTuneJob + // mu serializes the read-modify-write of job values. The SyncedMap guards its + // own map structure, but a job is a pointer mutated in place (e.g. 
the export + // goroutine), so the service still needs a lock to keep those field updates + // and the subsequent Set atomic with respect to readers. + mu sync.Mutex - // Distributed mode (nil when not in distributed mode) - natsClient messaging.Publisher - fineTuneStore *distributed.FineTuneStore + // jobs is the cross-replica job store: an in-memory map kept consistent across + // replicas via NATS, optionally read-through to PostgreSQL in distributed mode. + jobs *syncstate.SyncedMap[string, *schema.FineTuneJob] } -// SetNATSClient sets the NATS client for distributed progress publishing. -func (s *FineTuneService) SetNATSClient(nc messaging.Publisher) { - s.mu.Lock() - defer s.mu.Unlock() - s.natsClient = nc -} - -// SetFineTuneStore sets the PostgreSQL fine-tune store for distributed persistence. -func (s *FineTuneService) SetFineTuneStore(store *distributed.FineTuneStore) { - s.mu.Lock() - defer s.mu.Unlock() - s.fineTuneStore = store -} - -// NewFineTuneService creates a new FineTuneService. +// NewFineTuneService creates a new FineTuneService. In distributed mode pass the +// shared NATS client and PostgreSQL store so jobs stay consistent across +// replicas; pass nil for both in standalone mode, where the disk Loader hydrates +// the map and there is nothing to broadcast. func NewFineTuneService( appConfig *config.ApplicationConfig, modelLoader *model.ModelLoader, configLoader *config.ModelConfigLoader, + nats messaging.MessagingClient, + store *distributed.FineTuneStore, ) *FineTuneService { s := &FineTuneService{ appConfig: appConfig, modelLoader: modelLoader, configLoader: configLoader, - jobs: make(map[string]*schema.FineTuneJob), } - s.loadAllJobs() + + // Only attach a Store interface when a concrete store exists, otherwise the + // SyncedMap would see a non-nil interface wrapping a nil pointer and try to + // hydrate/write through a nil DB. 
+ var syncStore syncstate.Store[string, *schema.FineTuneJob] + if store != nil { + syncStore = &fineTuneStoreAdapter{store: store} + } + + s.jobs = syncstate.New(syncstate.Config[string, *schema.FineTuneJob]{ + Name: "finetune.jobs", + Key: func(j *schema.FineTuneJob) string { return j.ID }, + Nats: nats, + Store: syncStore, + Loader: s.loadJobsFromDisk, // ignored when Store is set (distributed mode) + }) + + // Hydrate + subscribe. A hydrate failure must not take the server down: log + // and continue degraded (standalone), mirroring the OpCache wiring. + if err := s.jobs.Start(appConfig.Context); err != nil { + xlog.Warn("FineTune SyncedMap start failed; running degraded", "error", err) + } return s } +// Close releases the SyncedMap subscription and background workers. +func (s *FineTuneService) Close() error { + return s.jobs.Close() +} + // fineTuneBaseDir returns the base directory for fine-tune job data. func (s *FineTuneService) fineTuneBaseDir() string { return filepath.Join(s.appConfig.DataPath, "fine-tune") @@ -100,15 +120,18 @@ func (s *FineTuneService) saveJobState(job *schema.FineTuneJob) { } } -// loadAllJobs scans the fine-tune directory for persisted jobs and loads them. -func (s *FineTuneService) loadAllJobs() { +// loadJobsFromDisk scans the fine-tune directory for persisted jobs and returns +// them. It is the SyncedMap Loader used in standalone mode (no DB); the returned +// slice hydrates the map on Start. +func (s *FineTuneService) loadJobsFromDisk(_ context.Context) ([]*schema.FineTuneJob, error) { baseDir := s.fineTuneBaseDir() entries, err := os.ReadDir(baseDir) if err != nil { - // Directory doesn't exist yet — that's fine - return + // Directory doesn't exist yet — that's fine, start empty. 
+ return nil, nil } + var jobs []*schema.FineTuneJob for _, entry := range entries { if !entry.IsDir() { continue @@ -137,12 +160,13 @@ func (s *FineTuneService) loadAllJobs() { job.ExportMessage = "Server restarted while export was running" } - s.jobs[job.ID] = &job + jobs = append(jobs, &job) } - if len(s.jobs) > 0 { - xlog.Info("Loaded persisted fine-tune jobs", "count", len(s.jobs)) + if len(jobs) > 0 { + xlog.Info("Loaded persisted fine-tune jobs", "count", len(jobs)) } + return jobs, nil } // StartJob starts a new fine-tuning job. @@ -236,27 +260,13 @@ func (s *FineTuneService) StartJob(ctx context.Context, userID string, req schem CreatedAt: time.Now().UTC().Format(time.RFC3339), Config: &req, } - s.jobs[jobID] = job - s.saveJobState(job) - - // Persist to PostgreSQL in distributed mode - if s.fineTuneStore != nil { - configJSON, _ := json.Marshal(req) - extraJSON, _ := json.Marshal(req.ExtraOptions) - s.fineTuneStore.Create(&distributed.FineTuneJobRecord{ - ID: jobID, - UserID: userID, - Model: req.Model, - Backend: backendName, - ModelID: modelID, - TrainingType: req.TrainingType, - TrainingMethod: req.TrainingMethod, - Status: "queued", - OutputDir: outputDir, - ConfigJSON: string(configJSON), - ExtraOptsJSON: string(extraJSON), - }) + // Set write-through persists to PostgreSQL (distributed) and broadcasts to + // peer replicas; the disk state.json is written separately for restart + // recovery / standalone hydrate. 
+ if err := s.jobs.Set(ctx, job); err != nil { + return nil, fmt.Errorf("failed to persist job: %w", err) } + s.saveJobState(job) return &schema.FineTuneJobResponse{ ID: jobID, @@ -270,7 +280,7 @@ func (s *FineTuneService) GetJob(userID, jobID string) (*schema.FineTuneJob, err s.mu.Lock() defer s.mu.Unlock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { return nil, fmt.Errorf("job not found: %s", jobID) } @@ -286,7 +296,7 @@ func (s *FineTuneService) ListJobs(userID string) []*schema.FineTuneJob { defer s.mu.Unlock() var result []*schema.FineTuneJob - for _, job := range s.jobs { + for _, job := range s.jobs.List() { if userID == "" || job.UserID == userID { result = append(result, job) } @@ -302,7 +312,7 @@ func (s *FineTuneService) ListJobs(userID string) []*schema.FineTuneJob { // StopJob stops a running fine-tuning job. func (s *FineTuneService) StopJob(ctx context.Context, userID, jobID string, saveCheckpoint bool) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -323,10 +333,10 @@ func (s *FineTuneService) StopJob(ctx context.Context, userID, jobID string, sav s.mu.Lock() job.Status = "stopped" job.Message = "Training stopped by user" - s.saveJobState(job) - if s.fineTuneStore != nil { - s.fineTuneStore.UpdateStatus(jobID, "stopped", "Training stopped by user") + if err := s.jobs.Set(ctx, job); err != nil { + xlog.Warn("Failed to persist stopped job", "job_id", jobID, "error", err) } + s.saveJobState(job) s.mu.Unlock() return nil @@ -335,7 +345,7 @@ func (s *FineTuneService) StopJob(ctx context.Context, userID, jobID string, sav // DeleteJob removes a fine-tuning job and its associated data from disk. 
func (s *FineTuneService) DeleteJob(userID, jobID string) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -360,9 +370,10 @@ func (s *FineTuneService) DeleteJob(userID, jobID string) error { } exportModelName := job.ExportModelName - delete(s.jobs, jobID) - if s.fineTuneStore != nil { - s.fineTuneStore.Delete(jobID) + // Delete write-through removes the DB row (distributed) and broadcasts the + // removal to peer replicas. DeleteJob has no ctx, so use Background. + if err := s.jobs.Delete(context.Background(), jobID); err != nil { + xlog.Warn("Failed to delete job from store", "job_id", jobID, "error", err) } s.mu.Unlock() @@ -398,7 +409,7 @@ func (s *FineTuneService) DeleteJob(userID, jobID string) error { // StreamProgress opens a gRPC progress stream and calls the callback for each update. func (s *FineTuneService) StreamProgress(ctx context.Context, userID, jobID string, callback func(event *schema.FineTuneProgressEvent)) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -427,7 +438,7 @@ func (s *FineTuneService) StreamProgress(ctx context.Context, userID, jobID stri }, func(update *pb.FineTuneProgressUpdate) { // Update job status and persist s.mu.Lock() - if j, ok := s.jobs[jobID]; ok { + if j, ok := s.jobs.Get(jobID); ok { // Don't let progress updates overwrite terminal states isTerminal := j.Status == "stopped" || j.Status == "completed" || j.Status == "failed" if !isTerminal { @@ -436,10 +447,10 @@ func (s *FineTuneService) StreamProgress(ctx context.Context, userID, jobID stri if update.Message != "" { j.Message = update.Message } - s.saveJobState(j) - if s.fineTuneStore != nil { - s.fineTuneStore.UpdateStatus(jobID, j.Status, j.Message) + if err := s.jobs.Set(ctx, j); err != nil { + xlog.Warn("Failed to persist progress update", "job_id", jobID, "error", 
err) } + s.saveJobState(j) } s.mu.Unlock() @@ -474,7 +485,7 @@ func (s *FineTuneService) StreamProgress(ctx context.Context, userID, jobID stri // ListCheckpoints lists checkpoints for a job. func (s *FineTuneService) ListCheckpoints(ctx context.Context, userID, jobID string) ([]*pb.CheckpointInfo, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return nil, fmt.Errorf("job not found: %s", jobID) @@ -520,7 +531,7 @@ func sanitizeModelName(s string) string { // ExportModel starts an async model export from a checkpoint and returns the intended model name immediately. func (s *FineTuneService) ExportModel(ctx context.Context, userID, jobID string, req schema.ExportRequest) (string, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return "", fmt.Errorf("job not found: %s", jobID) @@ -572,6 +583,9 @@ func (s *FineTuneService) ExportModel(ctx context.Context, userID, jobID string, job.ExportStatus = "exporting" job.ExportMessage = "" job.ExportModelName = "" + if err := s.jobs.Set(ctx, job); err != nil { + xlog.Warn("Failed to persist export start", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() @@ -662,24 +676,30 @@ func (s *FineTuneService) ExportModel(ctx context.Context, userID, jobID string, xlog.Info("Model exported and registered", "job_id", jobID, "model_name", modelName, "format", req.ExportFormat) + // Runs after the HTTP request returns, so use Background rather than the + // (now likely cancelled) request ctx for the write-through. 
s.mu.Lock() job.ExportStatus = "completed" job.ExportModelName = modelName job.ExportMessage = "" - s.saveJobState(job) - if s.fineTuneStore != nil { - s.fineTuneStore.UpdateExportStatus(jobID, "completed", "", modelName) + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist export completion", "job_id", jobID, "error", err) } + s.saveJobState(job) s.mu.Unlock() }() return modelName, nil } -// setExportMessage updates the export message and persists the job state. +// setExportMessage updates the export message and persists the job state. Called +// from the background export goroutine, so it uses Background for write-through. func (s *FineTuneService) setExportMessage(job *schema.FineTuneJob, msg string) { s.mu.Lock() job.ExportMessage = msg + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist export message", "job_id", job.ID, "error", err) + } s.saveJobState(job) s.mu.Unlock() } @@ -687,7 +707,7 @@ func (s *FineTuneService) setExportMessage(job *schema.FineTuneJob, msg string) // GetExportedModelPath returns the path to the exported model directory and its name. 
func (s *FineTuneService) GetExportedModelPath(userID, jobID string) (string, string, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return "", "", fmt.Errorf("job not found: %s", jobID) @@ -723,10 +743,10 @@ func (s *FineTuneService) setExportFailed(job *schema.FineTuneJob, message strin s.mu.Lock() job.ExportStatus = "failed" job.ExportMessage = message - s.saveJobState(job) - if s.fineTuneStore != nil { - s.fineTuneStore.UpdateExportStatus(job.ID, "failed", message, "") + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist export failure", "job_id", job.ID, "error", err) } + s.saveJobState(job) s.mu.Unlock() } diff --git a/core/services/finetune/service_test.go b/core/services/finetune/service_test.go new file mode 100644 index 000000000..dc7c53290 --- /dev/null +++ b/core/services/finetune/service_test.go @@ -0,0 +1,185 @@ +package finetune + +// White-box tests (package finetune) so a spec can drive the service's internal +// SyncedMap the same way StartJob does (via jobs.Set) without standing up a +// training backend, then assert the cross-replica reads (GetJob/ListJobs) and +// the adapter conversions that keep REST responses byte-for-byte unchanged. + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// newTestService builds a standalone FineTuneService wired to the given bus. The +// model/config loaders are nil because the read/sync paths under test never touch +// them; the data dir is a throwaway temp dir so the disk Loader finds nothing. 
+func newTestService(bus *testutil.FakeBus) *FineTuneService { + appConfig := &config.ApplicationConfig{ + Context: context.Background(), + DataPath: GinkgoT().TempDir(), + } + return NewFineTuneService(appConfig, nil, nil, bus, nil) +} + +var _ = Describe("FineTuneService", func() { + ctx := context.Background() + + Describe("cross-replica job visibility", func() { + var ( + bus *testutil.FakeBus + a, b *FineTuneService + ) + + BeforeEach(func() { + // One shared bus, two replicas: exactly the distributed topology where + // a round-robin request may land on a replica that did not originate + // the change. + bus = testutil.NewFakeBus() + a = newTestService(bus) + b = newTestService(bus) + }) + + AfterEach(func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }) + + It("makes a job created on A visible via B's GetJob and ListJobs", func() { + job := &schema.FineTuneJob{ID: "job-1", UserID: "user-1", Status: "queued", CreatedAt: "2026-06-27T10:00:00Z"} + // StartJob persists via jobs.Set; drive that directly to avoid a backend. 
+ Expect(a.jobs.Set(ctx, job)).To(Succeed()) + + got, err := b.GetJob("user-1", "job-1") + Expect(err).ToNot(HaveOccurred(), "B must see a job A just created") + Expect(got.Status).To(Equal("queued")) + + listed := b.ListJobs("user-1") + Expect(listed).To(HaveLen(1)) + Expect(listed[0].ID).To(Equal("job-1")) + }) + + It("removes a job from B when it is deleted on A", func() { + job := &schema.FineTuneJob{ID: "job-2", UserID: "user-1", Status: "completed", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + _, err := b.GetJob("user-1", "job-2") + Expect(err).ToNot(HaveOccurred(), "precondition: B must have the job before the delete") + + Expect(a.jobs.Delete(ctx, "job-2")).To(Succeed()) + + _, err = b.GetJob("user-1", "job-2") + Expect(err).To(HaveOccurred(), "a delete on A must remove the job from B") + }) + + It("propagates a status update from A to B", func() { + job := &schema.FineTuneJob{ID: "job-3", UserID: "user-1", Status: "training", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + + updated := &schema.FineTuneJob{ID: "job-3", UserID: "user-1", Status: "completed", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, updated)).To(Succeed()) + + got, err := b.GetJob("user-1", "job-3") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("completed")) + }) + }) + + Describe("ListJobs", func() { + var svc *FineTuneService + + BeforeEach(func() { + svc = newTestService(testutil.NewFakeBus()) + }) + AfterEach(func() { Expect(svc.Close()).To(Succeed()) }) + + It("filters by user and sorts newest-first", func() { + Expect(svc.jobs.Set(ctx, &schema.FineTuneJob{ID: "old", UserID: "u1", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.FineTuneJob{ID: "new", UserID: "u1", CreatedAt: "2026-06-27T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.FineTuneJob{ID: "other", UserID: "u2", CreatedAt: "2026-06-26T10:00:00Z"})).To(Succeed()) + + 
jobs := svc.ListJobs("u1") + Expect(jobs).To(HaveLen(2), "only u1's jobs") + Expect(jobs[0].ID).To(Equal("new"), "newest first") + Expect(jobs[1].ID).To(Equal("old")) + }) + + It("returns every user's jobs when the userID filter is empty", func() { + Expect(svc.jobs.Set(ctx, &schema.FineTuneJob{ID: "a", UserID: "u1", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.FineTuneJob{ID: "b", UserID: "u2", CreatedAt: "2026-06-26T10:00:00Z"})).To(Succeed()) + + Expect(svc.ListJobs("")).To(HaveLen(2)) + }) + + It("rejects GetJob for a job owned by another user", func() { + Expect(svc.jobs.Set(ctx, &schema.FineTuneJob{ID: "x", UserID: "owner", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + + _, err := svc.GetJob("intruder", "x") + Expect(err).To(HaveOccurred(), "a different user must not read someone else's job") + }) + }) + + Describe("store adapter conversion", func() { + // The SyncedMap value type is *schema.FineTuneJob (the exact REST shape). + // These specs prove the DB adapter round-trips it losslessly, so hydrate + // and write-through in distributed mode keep responses unchanged. 
+ It("round-trips a job through jobToRecord/recordToJob preserving the API shape", func() { + original := &schema.FineTuneJob{ + ID: "rt-1", + UserID: "user-1", + Model: "base-model", + Backend: "trl", + ModelID: "trl-finetune-rt-1", + TrainingType: "lora", + TrainingMethod: "sft", + Status: "completed", + Message: "done", + OutputDir: "/data/fine-tune/rt-1", + ExtraOptions: map[string]string{"hf_token": "secret"}, + CreatedAt: "2026-06-27T10:00:00Z", + ExportStatus: "completed", + ExportMessage: "", + ExportModelName: "base-model-ft-rt-1", + Config: &schema.FineTuneJobRequest{Model: "base-model", Backend: "trl", DatasetSource: "data.jsonl"}, + } + + rec := jobToRecord(original) + Expect(rec.ID).To(Equal("rt-1")) + Expect(rec.ConfigJSON).ToNot(BeEmpty(), "structured config must serialize into the JSON column") + Expect(rec.ExtraOptsJSON).ToNot(BeEmpty()) + + back := recordToJob(rec) + Expect(back.ID).To(Equal(original.ID)) + Expect(back.UserID).To(Equal(original.UserID)) + Expect(back.Model).To(Equal(original.Model)) + Expect(back.Backend).To(Equal(original.Backend)) + Expect(back.ModelID).To(Equal(original.ModelID)) + Expect(back.TrainingType).To(Equal(original.TrainingType)) + Expect(back.TrainingMethod).To(Equal(original.TrainingMethod)) + Expect(back.Status).To(Equal(original.Status)) + Expect(back.Message).To(Equal(original.Message)) + Expect(back.OutputDir).To(Equal(original.OutputDir)) + Expect(back.ExportStatus).To(Equal(original.ExportStatus)) + Expect(back.ExportModelName).To(Equal(original.ExportModelName)) + Expect(back.CreatedAt).To(Equal(original.CreatedAt)) + Expect(back.ExtraOptions).To(Equal(original.ExtraOptions)) + Expect(back.Config).ToNot(BeNil()) + Expect(back.Config.DatasetSource).To(Equal("data.jsonl")) + }) + }) + + Describe("compile-time adapter contract", func() { + It("satisfies syncstate.Store for *distributed.FineTuneStore", func() { + // Guards against drift between the adapter and the component interface; + // the var assertion in 
syncstore.go covers it at build time, this keeps + // the type referenced from a spec too. + var _ *distributed.FineTuneStore + Expect(&fineTuneStoreAdapter{}).ToNot(BeNil()) + }) + }) +}) diff --git a/core/services/finetune/syncstore.go b/core/services/finetune/syncstore.go new file mode 100644 index 000000000..e5bd8239c --- /dev/null +++ b/core/services/finetune/syncstore.go @@ -0,0 +1,114 @@ +package finetune + +import ( + "context" + "encoding/json" + "time" + + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/syncstate" +) + +// fineTuneStoreAdapter bridges the distributed PostgreSQL FineTuneStore to the +// generic syncstate.Store the SyncedMap consumes. It is only wired in distributed +// mode; standalone leaves Store nil and hydrates from disk via a Loader instead. +// +// The SyncedMap value type is *schema.FineTuneJob (the exact shape the REST API +// returns) so reads need no conversion and the response JSON is provably +// unchanged. The adapter is the single place that translates between that API +// shape and the DB FineTuneJobRecord. +type fineTuneStoreAdapter struct { + store *distributed.FineTuneStore +} + +// compile-time assertion that the adapter satisfies the component's Store. 
+var _ syncstate.Store[string, *schema.FineTuneJob] = (*fineTuneStoreAdapter)(nil) + +func (a *fineTuneStoreAdapter) List(_ context.Context) ([]*schema.FineTuneJob, error) { + records, err := a.store.ListAll() + if err != nil { + return nil, err + } + jobs := make([]*schema.FineTuneJob, 0, len(records)) + for i := range records { + jobs = append(jobs, recordToJob(&records[i])) + } + return jobs, nil +} + +func (a *fineTuneStoreAdapter) Upsert(_ context.Context, job *schema.FineTuneJob) error { + return a.store.Upsert(jobToRecord(job)) +} + +func (a *fineTuneStoreAdapter) Delete(_ context.Context, id string) error { + return a.store.Delete(id) +} + +// recordToJob maps a persisted DB record back to the API shape, reconstructing +// the structured Config / ExtraOptions from their JSON columns. +func recordToJob(r *distributed.FineTuneJobRecord) *schema.FineTuneJob { + job := &schema.FineTuneJob{ + ID: r.ID, + UserID: r.UserID, + Model: r.Model, + Backend: r.Backend, + ModelID: r.ModelID, + TrainingType: r.TrainingType, + TrainingMethod: r.TrainingMethod, + Status: r.Status, + Message: r.Message, + OutputDir: r.OutputDir, + ExportStatus: r.ExportStatus, + ExportMessage: r.ExportMessage, + ExportModelName: r.ExportModelName, + CreatedAt: r.CreatedAt.UTC().Format(time.RFC3339), + } + if r.ExtraOptsJSON != "" { + // Best-effort: a malformed column must not drop the whole job from the API. + _ = json.Unmarshal([]byte(r.ExtraOptsJSON), &job.ExtraOptions) + } + if r.ConfigJSON != "" { + var cfg schema.FineTuneJobRequest + if err := json.Unmarshal([]byte(r.ConfigJSON), &cfg); err == nil { + job.Config = &cfg + } + } + return job +} + +// jobToRecord maps the API shape to a DB record for write-through, serializing +// the structured Config / ExtraOptions into their JSON columns. CreatedAt is +// parsed back from the RFC3339 string the service stamps; an unparseable value +// is left zero so FineTuneStore.Upsert stamps "now". 
+func jobToRecord(job *schema.FineTuneJob) *distributed.FineTuneJobRecord { + rec := &distributed.FineTuneJobRecord{ + ID: job.ID, + UserID: job.UserID, + Model: job.Model, + Backend: job.Backend, + ModelID: job.ModelID, + TrainingType: job.TrainingType, + TrainingMethod: job.TrainingMethod, + Status: job.Status, + Message: job.Message, + OutputDir: job.OutputDir, + ExportStatus: job.ExportStatus, + ExportMessage: job.ExportMessage, + ExportModelName: job.ExportModelName, + } + if job.Config != nil { + if data, err := json.Marshal(job.Config); err == nil { + rec.ConfigJSON = string(data) + } + } + if job.ExtraOptions != nil { + if data, err := json.Marshal(job.ExtraOptions); err == nil { + rec.ExtraOptsJSON = string(data) + } + } + if t, err := time.Parse(time.RFC3339, job.CreatedAt); err == nil { + rec.CreatedAt = t + } + return rec +} diff --git a/core/services/messaging/client.go b/core/services/messaging/client.go index 31257f1fd..e01c7d9ca 100644 --- a/core/services/messaging/client.go +++ b/core/services/messaging/client.go @@ -22,6 +22,14 @@ const subscribeConfirmTimeout = 5 * time.Second type Client struct { conn *nats.Conn mu sync.RWMutex + + // reconnectCbs are invoked after the underlying connection is + // re-established. nats.go transparently resubscribes existing + // subscriptions on reconnect, but it cannot know that a consumer kept + // derived in-memory state (e.g. syncstate.SyncedMap) that may have drifted + // while the link was down — these callbacks let such consumers re-hydrate. + cbMu sync.Mutex + reconnectCbs []func() } // New creates a new NATS client with auto-reconnect. @@ -31,6 +39,10 @@ func New(url string, opts ...Option) (*Client, error) { o(&cfg) } + // Allocate the client up front so the reconnect handler closure can reach + // it; conn is populated after nats.Connect succeeds below. 
+ c := &Client{} + natsOpts := []nats.Option{ nats.RetryOnFailedConnect(true), nats.MaxReconnects(-1), @@ -41,6 +53,7 @@ func New(url string, opts ...Option) (*Client, error) { }), nats.ReconnectHandler(func(_ *nats.Conn) { xlog.Info("NATS reconnected") + c.runReconnectCallbacks() }), nats.ClosedHandler(func(_ *nats.Conn) { xlog.Info("NATS connection closed") @@ -103,7 +116,33 @@ func New(url string, opts ...Option) (*Client, error) { return nil, fmt.Errorf("connecting to NATS at %s: %w", sanitize.URL(url), err) } - return &Client{conn: nc}, nil + c.conn = nc + return c, nil +} + +// OnReconnect registers a callback invoked after the NATS connection is +// re-established. It is consumed via an optional interface type-assertion +// (interface{ OnReconnect(func()) }) rather than being added to MessagingClient, +// so the messaging abstraction stays minimal and standalone/test clients are not +// forced to implement reconnect semantics. A nil callback is ignored. +func (c *Client) OnReconnect(cb func()) { + if cb == nil { + return + } + c.cbMu.Lock() + c.reconnectCbs = append(c.reconnectCbs, cb) + c.cbMu.Unlock() +} + +// runReconnectCallbacks invokes registered reconnect callbacks. It copies the +// slice under the lock so a callback that (re)registers cannot deadlock. +func (c *Client) runReconnectCallbacks() { + c.cbMu.Lock() + cbs := append([]func(){}, c.reconnectCbs...) + c.cbMu.Unlock() + for _, cb := range cbs { + cb() + } } // Publish marshals data as JSON and publishes it to the given subject. diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 7d099460c..d2b11f535 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -380,6 +380,20 @@ func SubjectCacheInvalidateCollection(name string) string { return "cache.invalidate.collections." 
+ sanitizeSubjectToken(name) } +// SyncedMap State Sync (Pub/Sub — broadcast to all frontends) +// +// The reusable syncstate.SyncedMap component publishes a {op,key,value} delta on +// this subject whenever a replica mutates a piece of cross-replica in-memory +// state. Peers subscribe and apply the delta to their own map, so a round-robin +// API request that lands on a replica which did not originate the change still +// sees it. Convergence on (re)connect is done by re-hydrating from the durable +// source, so no request/reply snapshot subject is needed here. +func SubjectSyncStateDelta(name string) string { + return subjectSyncStatePrefix + sanitizeSubjectToken(name) + ".delta" +} + +const subjectSyncStatePrefix = "state." + // Prefix-Cache Routing Sync (Pub/Sub - broadcast to all frontends) // // Frontends share prefix-cache observations so a request routed to any replica diff --git a/core/services/quantization/quantization_suite_test.go b/core/services/quantization/quantization_suite_test.go new file mode 100644 index 000000000..6fabcd2f5 --- /dev/null +++ b/core/services/quantization/quantization_suite_test.go @@ -0,0 +1,13 @@ +package quantization + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestQuantization(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Quantization Suite") +} diff --git a/core/services/quantization/service.go b/core/services/quantization/service.go index 64325b97c..cd9cbcead 100644 --- a/core/services/quantization/service.go +++ b/core/services/quantization/service.go @@ -17,6 +17,9 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery/importers" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" pb "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/utils" @@ -30,26 +33,63 @@ type QuantizationService struct { modelLoader *model.ModelLoader configLoader *config.ModelConfigLoader - mu sync.Mutex - jobs map[string]*schema.QuantizationJob + // mu serializes the read-modify-write of job values. The SyncedMap guards its + // own map structure, but a job is a pointer mutated in place (e.g. the import + // goroutine), so the service still needs a lock to keep those field updates and + // the subsequent Set atomic with respect to readers. + mu sync.Mutex + + // jobs is the cross-replica job store: an in-memory map kept consistent across + // replicas via NATS, optionally read-through to PostgreSQL in distributed mode. + jobs *syncstate.SyncedMap[string, *schema.QuantizationJob] } -// NewQuantizationService creates a new QuantizationService. +// NewQuantizationService creates a new QuantizationService. In distributed mode +// pass the shared NATS client and PostgreSQL store so jobs stay consistent across +// replicas; pass nil for both in standalone mode, where the disk Loader hydrates +// the map and there is nothing to broadcast. 
func NewQuantizationService( appConfig *config.ApplicationConfig, modelLoader *model.ModelLoader, configLoader *config.ModelConfigLoader, + nats messaging.MessagingClient, + store *distributed.QuantStore, ) *QuantizationService { s := &QuantizationService{ appConfig: appConfig, modelLoader: modelLoader, configLoader: configLoader, - jobs: make(map[string]*schema.QuantizationJob), } - s.loadAllJobs() + + // Only attach a Store interface when a concrete store exists, otherwise the + // SyncedMap would see a non-nil interface wrapping a nil pointer and try to + // hydrate/write through a nil DB. + var syncStore syncstate.Store[string, *schema.QuantizationJob] + if store != nil { + syncStore = &quantStoreAdapter{store: store} + } + + s.jobs = syncstate.New(syncstate.Config[string, *schema.QuantizationJob]{ + Name: "quant.jobs", + Key: func(j *schema.QuantizationJob) string { return j.ID }, + Nats: nats, + Store: syncStore, + Loader: s.loadJobsFromDisk, // ignored when Store is set (distributed mode) + }) + + // Hydrate + subscribe. A hydrate failure must not take the server down: log and + // continue degraded (standalone), mirroring the FineTune/OpCache wiring. + if err := s.jobs.Start(appConfig.Context); err != nil { + xlog.Warn("Quantization SyncedMap start failed; running degraded", "error", err) + } return s } +// Close releases the SyncedMap subscription and background workers. +func (s *QuantizationService) Close() error { + return s.jobs.Close() +} + // quantizationBaseDir returns the base directory for quantization job data. func (s *QuantizationService) quantizationBaseDir() string { return filepath.Join(s.appConfig.DataPath, "quantization") @@ -80,15 +120,18 @@ func (s *QuantizationService) saveJobState(job *schema.QuantizationJob) { } } -// loadAllJobs scans the quantization directory for persisted jobs and loads them. 
-func (s *QuantizationService) loadAllJobs() { +// loadJobsFromDisk scans the quantization directory for persisted jobs and +// returns them. It is the SyncedMap Loader used in standalone mode (no DB); the +// returned slice hydrates the map on Start. +func (s *QuantizationService) loadJobsFromDisk(_ context.Context) ([]*schema.QuantizationJob, error) { baseDir := s.quantizationBaseDir() entries, err := os.ReadDir(baseDir) if err != nil { - // Directory doesn't exist yet — that's fine - return + // Directory doesn't exist yet — that's fine, start empty. + return nil, nil } + var jobs []*schema.QuantizationJob for _, entry := range entries { if !entry.IsDir() { continue @@ -117,12 +160,13 @@ func (s *QuantizationService) loadAllJobs() { job.ImportMessage = "Server restarted while import was running" } - s.jobs[job.ID] = &job + jobs = append(jobs, &job) } - if len(s.jobs) > 0 { - xlog.Info("Loaded persisted quantization jobs", "count", len(s.jobs)) + if len(jobs) > 0 { + xlog.Info("Loaded persisted quantization jobs", "count", len(jobs)) } + return jobs, nil } // StartJob starts a new quantization job. @@ -188,7 +232,12 @@ func (s *QuantizationService) StartJob(ctx context.Context, userID string, req s CreatedAt: time.Now().UTC().Format(time.RFC3339), Config: &req, } - s.jobs[jobID] = job + // Set write-through persists to PostgreSQL (distributed) and broadcasts to + // peer replicas; the disk state.json is written separately for restart + // recovery / standalone hydrate. 
+ if err := s.jobs.Set(ctx, job); err != nil { + return nil, fmt.Errorf("failed to persist job: %w", err) + } s.saveJobState(job) return &schema.QuantizationJobResponse{ @@ -203,7 +252,7 @@ func (s *QuantizationService) GetJob(userID, jobID string) (*schema.Quantization s.mu.Lock() defer s.mu.Unlock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { return nil, fmt.Errorf("job not found: %s", jobID) } @@ -219,7 +268,7 @@ func (s *QuantizationService) ListJobs(userID string) []*schema.QuantizationJob defer s.mu.Unlock() var result []*schema.QuantizationJob - for _, job := range s.jobs { + for _, job := range s.jobs.List() { if userID == "" || job.UserID == userID { result = append(result, job) } @@ -235,7 +284,7 @@ func (s *QuantizationService) ListJobs(userID string) []*schema.QuantizationJob // StopJob stops a running quantization job. func (s *QuantizationService) StopJob(ctx context.Context, userID, jobID string) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -256,6 +305,9 @@ func (s *QuantizationService) StopJob(ctx context.Context, userID, jobID string) s.mu.Lock() job.Status = "stopped" job.Message = "Quantization stopped by user" + if err := s.jobs.Set(ctx, job); err != nil { + xlog.Warn("Failed to persist stopped job", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() @@ -265,7 +317,7 @@ func (s *QuantizationService) StopJob(ctx context.Context, userID, jobID string) // DeleteJob removes a quantization job and its associated data from disk. 
func (s *QuantizationService) DeleteJob(userID, jobID string) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -289,7 +341,11 @@ func (s *QuantizationService) DeleteJob(userID, jobID string) error { } importModelName := job.ImportModelName - delete(s.jobs, jobID) + // Delete write-through removes the DB row (distributed) and broadcasts the + // removal to peer replicas. DeleteJob has no ctx, so use Background. + if err := s.jobs.Delete(context.Background(), jobID); err != nil { + xlog.Warn("Failed to delete job from store", "job_id", jobID, "error", err) + } s.mu.Unlock() // Remove job directory (state.json, output files) @@ -324,7 +380,7 @@ func (s *QuantizationService) DeleteJob(userID, jobID string) error { // StreamProgress opens a gRPC progress stream and calls the callback for each update. func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID string, callback func(event *schema.QuantizationProgressEvent)) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -353,7 +409,7 @@ func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID }, func(update *pb.QuantizationProgressUpdate) { // Update job status and persist s.mu.Lock() - if j, ok := s.jobs[jobID]; ok { + if j, ok := s.jobs.Get(jobID); ok { // Don't let progress updates overwrite terminal states isTerminal := j.Status == "stopped" || j.Status == "completed" || j.Status == "failed" if !isTerminal { @@ -365,6 +421,9 @@ func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID if update.OutputFile != "" { j.OutputFile = update.OutputFile } + if err := s.jobs.Set(ctx, j); err != nil { + xlog.Warn("Failed to persist progress update", "job_id", jobID, "error", err) + } s.saveJobState(j) } s.mu.Unlock() @@ -399,7 +458,7 @@ func 
sanitizeQuantModelName(s string) string { // ImportModel imports a quantized model into LocalAI asynchronously. func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID string, req schema.QuantizationImportRequest) (string, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return "", fmt.Errorf("job not found: %s", jobID) @@ -459,6 +518,9 @@ func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID str job.ImportStatus = "importing" job.ImportMessage = "" job.ImportModelName = "" + if err := s.jobs.Set(ctx, job); err != nil { + xlog.Warn("Failed to persist import start", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() @@ -514,10 +576,15 @@ func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID str xlog.Info("Quantized model imported and registered", "job_id", jobID, "model_name", modelName) + // Runs after the HTTP request returns, so use Background rather than the + // (now likely cancelled) request ctx for the write-through. s.mu.Lock() job.ImportStatus = "completed" job.ImportModelName = modelName job.ImportMessage = "" + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist import completion", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() }() @@ -525,10 +592,14 @@ func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID str return modelName, nil } -// setImportMessage updates the import message and persists the job state. +// setImportMessage updates the import message and persists the job state. Called +// from the background import goroutine, so it uses Background for write-through. 
func (s *QuantizationService) setImportMessage(job *schema.QuantizationJob, msg string) { s.mu.Lock() job.ImportMessage = msg + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist import message", "job_id", job.ID, "error", err) + } s.saveJobState(job) s.mu.Unlock() } @@ -539,6 +610,9 @@ func (s *QuantizationService) setImportFailed(job *schema.QuantizationJob, messa s.mu.Lock() job.ImportStatus = "failed" job.ImportMessage = message + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist import failure", "job_id", job.ID, "error", err) + } s.saveJobState(job) s.mu.Unlock() } @@ -546,7 +620,7 @@ func (s *QuantizationService) setImportFailed(job *schema.QuantizationJob, messa // GetOutputPath returns the path to the quantized model file and a download name. func (s *QuantizationService) GetOutputPath(userID, jobID string) (string, string, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return "", "", fmt.Errorf("job not found: %s", jobID) diff --git a/core/services/quantization/service_test.go b/core/services/quantization/service_test.go new file mode 100644 index 000000000..665728614 --- /dev/null +++ b/core/services/quantization/service_test.go @@ -0,0 +1,187 @@ +package quantization + +// White-box tests (package quantization) so a spec can drive the service's +// internal SyncedMap the same way StartJob does (via jobs.Set) without standing +// up a quantization backend, then assert the cross-replica reads +// (GetJob/ListJobs) and the adapter conversions that keep REST responses +// byte-for-byte unchanged. + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// newTestService builds a standalone QuantizationService wired to the given bus. +// The model/config loaders are nil because the read/sync paths under test never +// touch them; the data dir is a throwaway temp dir so the disk Loader finds +// nothing. +func newTestService(bus *testutil.FakeBus) *QuantizationService { + appConfig := &config.ApplicationConfig{ + Context: context.Background(), + DataPath: GinkgoT().TempDir(), + } + return NewQuantizationService(appConfig, nil, nil, bus, nil) +} + +var _ = Describe("QuantizationService", func() { + ctx := context.Background() + + Describe("cross-replica job visibility", func() { + var ( + bus *testutil.FakeBus + a, b *QuantizationService + ) + + BeforeEach(func() { + // One shared bus, two replicas: exactly the distributed topology where a + // round-robin request may land on a replica that did not originate the + // change. + bus = testutil.NewFakeBus() + a = newTestService(bus) + b = newTestService(bus) + }) + + AfterEach(func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }) + + It("makes a job created on A visible via B's GetJob and ListJobs", func() { + job := &schema.QuantizationJob{ID: "job-1", UserID: "user-1", Status: "queued", CreatedAt: "2026-06-27T10:00:00Z"} + // StartJob persists via jobs.Set; drive that directly to avoid a backend. 
+ Expect(a.jobs.Set(ctx, job)).To(Succeed()) + + got, err := b.GetJob("user-1", "job-1") + Expect(err).ToNot(HaveOccurred(), "B must see a job A just created") + Expect(got.Status).To(Equal("queued")) + + listed := b.ListJobs("user-1") + Expect(listed).To(HaveLen(1)) + Expect(listed[0].ID).To(Equal("job-1")) + }) + + It("removes a job from B when it is deleted on A", func() { + job := &schema.QuantizationJob{ID: "job-2", UserID: "user-1", Status: "completed", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + _, err := b.GetJob("user-1", "job-2") + Expect(err).ToNot(HaveOccurred(), "precondition: B must have the job before the delete") + + Expect(a.jobs.Delete(ctx, "job-2")).To(Succeed()) + + _, err = b.GetJob("user-1", "job-2") + Expect(err).To(HaveOccurred(), "a delete on A must remove the job from B") + }) + + It("propagates a status update from A to B", func() { + job := &schema.QuantizationJob{ID: "job-3", UserID: "user-1", Status: "quantizing", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + + updated := &schema.QuantizationJob{ID: "job-3", UserID: "user-1", Status: "completed", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, updated)).To(Succeed()) + + got, err := b.GetJob("user-1", "job-3") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("completed")) + }) + }) + + Describe("ListJobs", func() { + var svc *QuantizationService + + BeforeEach(func() { + svc = newTestService(testutil.NewFakeBus()) + }) + AfterEach(func() { Expect(svc.Close()).To(Succeed()) }) + + It("filters by user and sorts newest-first", func() { + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "old", UserID: "u1", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "new", UserID: "u1", CreatedAt: "2026-06-27T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "other", UserID: "u2", CreatedAt: 
"2026-06-26T10:00:00Z"})).To(Succeed()) + + jobs := svc.ListJobs("u1") + Expect(jobs).To(HaveLen(2), "only u1's jobs") + Expect(jobs[0].ID).To(Equal("new"), "newest first") + Expect(jobs[1].ID).To(Equal("old")) + }) + + It("returns every user's jobs when the userID filter is empty", func() { + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "a", UserID: "u1", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "b", UserID: "u2", CreatedAt: "2026-06-26T10:00:00Z"})).To(Succeed()) + + Expect(svc.ListJobs("")).To(HaveLen(2)) + }) + + It("rejects GetJob for a job owned by another user", func() { + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "x", UserID: "owner", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + + _, err := svc.GetJob("intruder", "x") + Expect(err).To(HaveOccurred(), "a different user must not read someone else's job") + }) + }) + + Describe("store adapter conversion", func() { + // The SyncedMap value type is *schema.QuantizationJob (the exact REST shape). + // These specs prove the DB adapter round-trips it losslessly, so hydrate and + // write-through in distributed mode keep responses unchanged. 
+ It("round-trips a job through jobToRecord/recordToJob preserving the API shape", func() { + original := &schema.QuantizationJob{ + ID: "rt-1", + UserID: "user-1", + Model: "base-model", + Backend: "llama-cpp-quantization", + ModelID: "llama-cpp-quantization-quantize-rt-1", + QuantizationType: "q4_k_m", + Status: "completed", + Message: "done", + OutputDir: "/data/quantization/rt-1", + OutputFile: "/data/quantization/rt-1/model.gguf", + ExtraOptions: map[string]string{"hf_token": "secret"}, + CreatedAt: "2026-06-27T10:00:00Z", + ImportStatus: "completed", + ImportMessage: "", + ImportModelName: "base-model-q4_k_m-rt-1", + Config: &schema.QuantizationJobRequest{Model: "base-model", Backend: "llama-cpp-quantization", QuantizationType: "q4_k_m"}, + } + + rec := jobToRecord(original) + Expect(rec.ID).To(Equal("rt-1")) + Expect(rec.ConfigJSON).ToNot(BeEmpty(), "structured config must serialize into the JSON column") + Expect(rec.ExtraOptsJSON).ToNot(BeEmpty()) + + back := recordToJob(rec) + Expect(back.ID).To(Equal(original.ID)) + Expect(back.UserID).To(Equal(original.UserID)) + Expect(back.Model).To(Equal(original.Model)) + Expect(back.Backend).To(Equal(original.Backend)) + Expect(back.ModelID).To(Equal(original.ModelID)) + Expect(back.QuantizationType).To(Equal(original.QuantizationType)) + Expect(back.Status).To(Equal(original.Status)) + Expect(back.Message).To(Equal(original.Message)) + Expect(back.OutputDir).To(Equal(original.OutputDir)) + Expect(back.OutputFile).To(Equal(original.OutputFile)) + Expect(back.ImportStatus).To(Equal(original.ImportStatus)) + Expect(back.ImportModelName).To(Equal(original.ImportModelName)) + Expect(back.CreatedAt).To(Equal(original.CreatedAt)) + Expect(back.ExtraOptions).To(Equal(original.ExtraOptions)) + Expect(back.Config).ToNot(BeNil()) + Expect(back.Config.QuantizationType).To(Equal("q4_k_m")) + }) + }) + + Describe("compile-time adapter contract", func() { + It("satisfies syncstate.Store for *distributed.QuantStore", func() { + 
// Guards against drift between the adapter and the component interface; + // the var assertion in syncstore.go covers it at build time, this keeps + // the type referenced from a spec too. + var _ *distributed.QuantStore + Expect(&quantStoreAdapter{}).ToNot(BeNil()) + }) + }) +}) diff --git a/core/services/quantization/syncstore.go b/core/services/quantization/syncstore.go new file mode 100644 index 000000000..4201c5ca6 --- /dev/null +++ b/core/services/quantization/syncstore.go @@ -0,0 +1,114 @@ +package quantization + +import ( + "context" + "encoding/json" + "time" + + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/syncstate" +) + +// quantStoreAdapter bridges the distributed PostgreSQL QuantStore to the generic +// syncstate.Store the SyncedMap consumes. It is only wired in distributed mode; +// standalone leaves Store nil and hydrates from disk via a Loader instead. +// +// The SyncedMap value type is *schema.QuantizationJob (the exact shape the REST +// API returns) so reads need no conversion and the response JSON is provably +// unchanged. The adapter is the single place that translates between that API +// shape and the DB QuantJobRecord. +type quantStoreAdapter struct { + store *distributed.QuantStore +} + +// compile-time assertion that the adapter satisfies the component's Store. 
+var _ syncstate.Store[string, *schema.QuantizationJob] = (*quantStoreAdapter)(nil) + +func (a *quantStoreAdapter) List(_ context.Context) ([]*schema.QuantizationJob, error) { + records, err := a.store.ListAll() + if err != nil { + return nil, err + } + jobs := make([]*schema.QuantizationJob, 0, len(records)) + for i := range records { + jobs = append(jobs, recordToJob(&records[i])) + } + return jobs, nil +} + +func (a *quantStoreAdapter) Upsert(_ context.Context, job *schema.QuantizationJob) error { + return a.store.Upsert(jobToRecord(job)) +} + +func (a *quantStoreAdapter) Delete(_ context.Context, id string) error { + return a.store.Delete(id) +} + +// recordToJob maps a persisted DB record back to the API shape, reconstructing +// the structured Config / ExtraOptions from their JSON columns. +func recordToJob(r *distributed.QuantJobRecord) *schema.QuantizationJob { + job := &schema.QuantizationJob{ + ID: r.ID, + UserID: r.UserID, + Model: r.Model, + Backend: r.Backend, + ModelID: r.ModelID, + QuantizationType: r.QuantizationType, + Status: r.Status, + Message: r.Message, + OutputDir: r.OutputDir, + OutputFile: r.OutputFile, + ImportStatus: r.ImportStatus, + ImportMessage: r.ImportMessage, + ImportModelName: r.ImportModelName, + CreatedAt: r.CreatedAt.UTC().Format(time.RFC3339), + } + if r.ExtraOptsJSON != "" { + // Best-effort: a malformed column must not drop the whole job from the API. + _ = json.Unmarshal([]byte(r.ExtraOptsJSON), &job.ExtraOptions) + } + if r.ConfigJSON != "" { + var cfg schema.QuantizationJobRequest + if err := json.Unmarshal([]byte(r.ConfigJSON), &cfg); err == nil { + job.Config = &cfg + } + } + return job +} + +// jobToRecord maps the API shape to a DB record for write-through, serializing +// the structured Config / ExtraOptions into their JSON columns. CreatedAt is +// parsed back from the RFC3339 string the service stamps; an unparseable value is +// left zero so QuantStore.Upsert stamps "now". 
+func jobToRecord(job *schema.QuantizationJob) *distributed.QuantJobRecord { + rec := &distributed.QuantJobRecord{ + ID: job.ID, + UserID: job.UserID, + Model: job.Model, + Backend: job.Backend, + ModelID: job.ModelID, + QuantizationType: job.QuantizationType, + Status: job.Status, + Message: job.Message, + OutputDir: job.OutputDir, + OutputFile: job.OutputFile, + ImportStatus: job.ImportStatus, + ImportMessage: job.ImportMessage, + ImportModelName: job.ImportModelName, + } + if job.Config != nil { + if data, err := json.Marshal(job.Config); err == nil { + rec.ConfigJSON = string(data) + } + } + if job.ExtraOptions != nil { + if data, err := json.Marshal(job.ExtraOptions); err == nil { + rec.ExtraOptsJSON = string(data) + } + } + if t, err := time.Parse(time.RFC3339, job.CreatedAt); err == nil { + rec.CreatedAt = t + } + return rec +} diff --git a/core/services/syncstate/syncstate.go b/core/services/syncstate/syncstate.go new file mode 100644 index 000000000..809177d40 --- /dev/null +++ b/core/services/syncstate/syncstate.go @@ -0,0 +1,289 @@ +// Package syncstate provides SyncedMap, a reusable cross-replica in-memory map. +// +// LocalAI in distributed mode runs multiple frontend replicas behind a +// round-robin load balancer. Several features keep process-local in-memory state +// that is surfaced to the HTTP/UI API; without cross-replica sync a poll that +// lands on a replica which did not originate a change sees stale or missing data. +// SyncedMap collapses the three legs each feature otherwise hand-wires - an +// in-memory map, a NATS broadcast/apply path, and optional durable read-through - +// into one well-tested component so cross-replica consistency is a configuration +// choice rather than a bespoke re-implementation. +package syncstate + +import ( + "context" + "sync" + "time" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/xlog" +) + +// Op values carried on the wire and passed to OnApply. 
+const ( + opSet = "set" + opDelete = "delete" +) + +// Store is optional durable backing for a SyncedMap. In distributed mode it is a +// single shared DB, so the apply path (a delta received from a peer) updates +// memory only and never re-writes the Store. +type Store[K comparable, V any] interface { + List(ctx context.Context) ([]V, error) + Upsert(ctx context.Context, v V) error + Delete(ctx context.Context, k K) error +} + +// Config configures a SyncedMap. +type Config[K comparable, V any] struct { + Name string // subject namespace, e.g. "finetune.jobs" + Key func(V) K // extract the key from a value + Nats messaging.MessagingClient // nil => standalone: in-memory only, no broadcast/subscribe + Store Store[K, V] // optional read-through persistence + Loader func(ctx context.Context) ([]V, error) // source when there is no Store (e.g. disk reload) + OnApply func(op string, k K, v V) // optional hook after an applied change (e.g. ShutdownModel) + Reconcile time.Duration // optional periodic re-hydrate; 0 = off +} + +// delta is the JSON wire envelope broadcast on every local mutation. Value is +// omitempty so a delete carries only op+key. +type delta[K comparable, V any] struct { + Op string `json:"op"` + Key K `json:"key"` + Value V `json:"value,omitempty"` +} + +// SyncedMap is a cross-replica in-memory map. A local write (Set/Delete) updates +// memory, the optional durable Store, then broadcasts a delta to peers. A peer's +// delta updates memory only and fires OnApply - it never re-broadcasts and never +// writes the Store. That structural split is the echo-loop guard (same pattern as +// galleryop.mergeStatus / OpCache.applyStart): receiving your own broadcast just +// re-applies an idempotent value to memory, so there is no storm and no +// double-write. 
+type SyncedMap[K comparable, V any] struct { + cfg Config[K, V] + + mu sync.RWMutex + data map[K]V + + sub Subscription + + // lifeCtx outlives Start's argument: a reconnect callback or reconcile tick + // can fire long after Start returns, so they must not be tied to a ctx the + // caller may cancel. Close cancels it. + lifeCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// Subscription is the subset of messaging.Subscription the component holds onto. +type Subscription = messaging.Subscription + +// New constructs a SyncedMap. Call Start to hydrate and begin syncing. +func New[K comparable, V any](cfg Config[K, V]) *SyncedMap[K, V] { + return &SyncedMap[K, V]{cfg: cfg, data: make(map[K]V)} +} + +func (m *SyncedMap[K, V]) subject() string { + return messaging.SubjectSyncStateDelta(m.cfg.Name) +} + +// Start hydrates from the source, subscribes for peer deltas, registers a +// reconnect re-hydrate (when the client supports it), and starts the optional +// reconcile ticker. +func (m *SyncedMap[K, V]) Start(ctx context.Context) error { + if err := m.hydrate(ctx); err != nil { + return err + } + + // The cancel func is stored on the struct and invoked in Close (covered by + // tests); lifeCtx must outlive Start to drive the reconnect/reconcile + // goroutines, so it cannot be cancelled or deferred within this scope. + m.lifeCtx, m.cancel = context.WithCancel(context.Background()) // #nosec G118 -- cancel is invoked in Close() + + if m.cfg.Nats != nil { + sub, err := messaging.SubscribeJSON(m.cfg.Nats, m.subject(), m.apply) + if err != nil { + return err + } + m.sub = sub + + // nats.go transparently resubscribes on reconnect, but it cannot know we + // kept derived in-memory state that may have drifted while the link was + // down, so re-hydrate from the durable source. Detected via an optional + // interface so MessagingClient itself stays minimal; standalone/test + // clients without the method simply fall back to the reconcile ticker. 
+ if r, ok := m.cfg.Nats.(interface{ OnReconnect(func()) }); ok { + r.OnReconnect(func() { + if err := m.hydrate(m.lifeCtx); err != nil { + xlog.Warn("syncstate: reconnect re-hydrate failed", "name", m.cfg.Name, "error", err) + } + }) + } + } + + if m.cfg.Reconcile > 0 { + m.wg.Add(1) + go m.reconcileLoop() + } + return nil +} + +// Close unsubscribes and stops the reconcile ticker. +func (m *SyncedMap[K, V]) Close() error { + if m.cancel != nil { + m.cancel() + } + m.wg.Wait() + if m.sub != nil { + return m.sub.Unsubscribe() + } + return nil +} + +// Set updates the value locally, writes through the Store, then broadcasts. +// Per the data-flow contract the Store write happens under the lock so memory and +// durable state move together; the broadcast is best-effort after unlocking. +func (m *SyncedMap[K, V]) Set(ctx context.Context, v V) error { + k := m.cfg.Key(v) + m.mu.Lock() + m.data[k] = v + if m.cfg.Store != nil { + if err := m.cfg.Store.Upsert(ctx, v); err != nil { + m.mu.Unlock() + return err + } + } + m.mu.Unlock() + m.publish(opSet, k, v) + return nil +} + +// Delete removes the key locally, deletes it from the Store, then broadcasts. +func (m *SyncedMap[K, V]) Delete(ctx context.Context, k K) error { + m.mu.Lock() + delete(m.data, k) + if m.cfg.Store != nil { + if err := m.cfg.Store.Delete(ctx, k); err != nil { + m.mu.Unlock() + return err + } + } + m.mu.Unlock() + var zero V + m.publish(opDelete, k, zero) + return nil +} + +// Get returns the value for k and whether it was present. +func (m *SyncedMap[K, V]) Get(k K) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + v, ok := m.data[k] + return v, ok +} + +// List returns a snapshot slice of all values. +func (m *SyncedMap[K, V]) List() []V { + m.mu.RLock() + defer m.mu.RUnlock() + out := make([]V, 0, len(m.data)) + for _, v := range m.data { + out = append(out, v) + } + return out +} + +// Snapshot returns a copy of the underlying map. 
+func (m *SyncedMap[K, V]) Snapshot() map[K]V { + m.mu.RLock() + defer m.mu.RUnlock() + out := make(map[K]V, len(m.data)) + for k, v := range m.data { + out[k] = v + } + return out +} + +// publish broadcasts a delta. Standalone (nil Nats) is a strict no-op. +func (m *SyncedMap[K, V]) publish(op string, k K, v V) { + if m.cfg.Nats == nil { + return + } + if err := m.cfg.Nats.Publish(m.subject(), delta[K, V]{Op: op, Key: k, Value: v}); err != nil { + xlog.Warn("syncstate: failed to broadcast delta", "name", m.cfg.Name, "op", op, "error", err) + } +} + +// apply handles a peer's delta: memory-only update plus OnApply. It deliberately +// never writes the Store nor re-publishes - that is the echo-loop guard. +func (m *SyncedMap[K, V]) apply(d delta[K, V]) { + switch d.Op { + case opSet: + m.mu.Lock() + m.data[d.Key] = d.Value + m.mu.Unlock() + case opDelete: + m.mu.Lock() + delete(m.data, d.Key) + m.mu.Unlock() + default: + xlog.Warn("syncstate: ignoring delta with unknown op", "name", m.cfg.Name, "op", d.Op) + return + } + if m.cfg.OnApply != nil { + m.cfg.OnApply(d.Op, d.Key, d.Value) + } +} + +// hydrate replaces the whole map from the durable source: Store if present, else +// Loader. With neither, a late joiner starts empty and catches up via deltas +// (acceptable only for ephemeral state). +func (m *SyncedMap[K, V]) hydrate(ctx context.Context) error { + var ( + vals []V + err error + ) + switch { + case m.cfg.Store != nil: + vals, err = m.cfg.Store.List(ctx) + case m.cfg.Loader != nil: + vals, err = m.cfg.Loader(ctx) + default: + return nil + } + if err != nil { + return err + } + m.replaceAll(vals) + return nil +} + +// replaceAll atomically swaps the map contents for the given values, keyed via +// cfg.Key. 
+func (m *SyncedMap[K, V]) replaceAll(vals []V) { + next := make(map[K]V, len(vals)) + for _, v := range vals { + next[m.cfg.Key(v)] = v + } + m.mu.Lock() + m.data = next + m.mu.Unlock() +} + +// reconcileLoop periodically re-hydrates to repair silent drift (missed deltas). +func (m *SyncedMap[K, V]) reconcileLoop() { + defer m.wg.Done() + t := time.NewTicker(m.cfg.Reconcile) + defer t.Stop() + for { + select { + case <-m.lifeCtx.Done(): + return + case <-t.C: + if err := m.hydrate(m.lifeCtx); err != nil { + xlog.Warn("syncstate: reconcile re-hydrate failed", "name", m.cfg.Name, "error", err) + } + } + } +} diff --git a/core/services/syncstate/syncstate_suite_test.go b/core/services/syncstate/syncstate_suite_test.go new file mode 100644 index 000000000..b4f025c9e --- /dev/null +++ b/core/services/syncstate/syncstate_suite_test.go @@ -0,0 +1,13 @@ +package syncstate_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSyncstate(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Syncstate Suite") +} diff --git a/core/services/syncstate/syncstate_test.go b/core/services/syncstate/syncstate_test.go new file mode 100644 index 000000000..1e31db41b --- /dev/null +++ b/core/services/syncstate/syncstate_test.go @@ -0,0 +1,291 @@ +package syncstate_test + +import ( + "context" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// job is a minimal JSON-serializable value stand-in for the real cross-replica +// records (finetune/quant/agent jobs) the component is built for. 
+type job struct { + ID string `json:"id"` + Status string `json:"status"` +} + +func jobKey(j *job) string { return j.ID } + +const stateName = "test.jobs" + +func deltaSubject() string { return messaging.SubjectSyncStateDelta(stateName) } + +// fakeStore is an in-memory Store that records call counts so specs can assert +// the write-through-vs-apply split (local writes hit the Store; applied deltas +// must not). +type fakeStore struct { + mu sync.Mutex + data map[string]*job + upsertCalls int + deleteCalls int + listCalls int +} + +func newFakeStore(seed ...*job) *fakeStore { + s := &fakeStore{data: map[string]*job{}} + for _, j := range seed { + s.data[j.ID] = j + } + return s +} + +func (s *fakeStore) List(_ context.Context) ([]*job, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.listCalls++ + out := make([]*job, 0, len(s.data)) + for _, j := range s.data { + out = append(out, j) + } + return out, nil +} + +func (s *fakeStore) Upsert(_ context.Context, j *job) error { + s.mu.Lock() + defer s.mu.Unlock() + s.upsertCalls++ + s.data[j.ID] = j + return nil +} + +func (s *fakeStore) Delete(_ context.Context, k string) error { + s.mu.Lock() + defer s.mu.Unlock() + s.deleteCalls++ + delete(s.data, k) + return nil +} + +// add simulates a peer replica writing to the shared DB out-of-band (e.g. while +// this replica was partitioned), so a re-hydrate can be observed to pick it up. 
+func (s *fakeStore) add(j *job) { + s.mu.Lock() + defer s.mu.Unlock() + s.data[j.ID] = j +} + +func (s *fakeStore) counts() (upsert, del, list int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.upsertCalls, s.deleteCalls, s.listCalls +} + +var _ = Describe("SyncedMap", func() { + ctx := context.Background() + + Describe("cross-replica delta propagation", func() { + var ( + bus *testutil.FakeBus + a, b *syncstate.SyncedMap[string, *job] + ) + + BeforeEach(func() { + bus = testutil.NewFakeBus() + a = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + b = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + }) + + AfterEach(func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }) + + It("propagates a Set on A to B", func() { + Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed()) + + got, ok := b.Get("1") + Expect(ok).To(BeTrue(), "replica B should see the value A just set") + Expect(got.Status).To(Equal("running")) + }) + + It("prunes a Delete on A from B", func() { + Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed()) + _, present := b.Get("1") + Expect(present).To(BeTrue(), "precondition: B must have the value before the delete") + + Expect(a.Delete(ctx, "1")).To(Succeed()) + + _, ok := b.Get("1") + Expect(ok).To(BeFalse(), "a delete on A must remove the key from B") + }) + }) + + Describe("hydration", func() { + It("hydrates on Start from a preloaded Store", func() { + store := newFakeStore(&job{ID: "x", Status: "done"}) + m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Store: store}) + Expect(m.Start(ctx)).To(Succeed()) + + got, ok := m.Get("x") + Expect(ok).To(BeTrue(), "Start must populate the map from the Store") + Expect(got.Status).To(Equal("done")) + }) + + It("uses the Loader when Store is nil", func() { + m := 
syncstate.New(syncstate.Config[string, *job]{ + Name: stateName, + Key: jobKey, + Loader: func(_ context.Context) ([]*job, error) { + return []*job{{ID: "l", Status: "loaded"}}, nil + }, + }) + Expect(m.Start(ctx)).To(Succeed()) + + got, ok := m.Get("l") + Expect(ok).To(BeTrue(), "Loader output must hydrate the map when there is no Store") + Expect(got.Status).To(Equal("loaded")) + }) + }) + + Describe("echo-loop guard", func() { + It("applies its own broadcast once and does not re-publish", func() { + bus := testutil.NewFakeBus() + a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + defer func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }() + + Expect(a.Set(ctx, &job{ID: "e", Status: "running"})).To(Succeed()) + + // One local write must produce exactly one broadcast: A and B both + // receive it and apply to memory, but the apply path never re-publishes. 
+ Expect(bus.PublishCount(deltaSubject())).To(Equal(1), + "the apply path must not re-broadcast, otherwise replicas storm") + Expect(a.List()).To(HaveLen(1), "A must not double-store its own echo") + _, ok := b.Get("e") + Expect(ok).To(BeTrue()) + }) + }) + + Describe("Store write-through vs apply", func() { + It("writes the Store on local Set/Delete but not on an applied delta", func() { + bus := testutil.NewFakeBus() + storeA := newFakeStore() + storeB := newFakeStore() + a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeA}) + b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeB}) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + defer func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }() + + Expect(a.Set(ctx, &job{ID: "w", Status: "running"})).To(Succeed()) + + upA, _, _ := storeA.counts() + upB, _, _ := storeB.counts() + Expect(upA).To(Equal(1), "local Set must write through to its own Store") + Expect(upB).To(Equal(0), "the apply path must never write the peer's Store") + + Expect(a.Delete(ctx, "w")).To(Succeed()) + _, delA, _ := storeA.counts() + _, delB, _ := storeB.counts() + Expect(delA).To(Equal(1), "local Delete must delete from its own Store") + Expect(delB).To(Equal(0), "the apply path must never delete from the peer's Store") + }) + }) + + Describe("OnApply hook", func() { + It("fires with the correct op and key on an applied delta", func() { + bus := testutil.NewFakeBus() + var ( + mu sync.Mutex + ops []string + keys []string + ) + a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + b := syncstate.New(syncstate.Config[string, *job]{ + Name: stateName, Key: jobKey, Nats: bus, + OnApply: func(op string, k string, _ *job) { + mu.Lock() + ops = append(ops, op) + keys = append(keys, k) + mu.Unlock() + }, + }) + Expect(a.Start(ctx)).To(Succeed()) + 
Expect(b.Start(ctx)).To(Succeed()) + defer func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }() + + Expect(a.Set(ctx, &job{ID: "o", Status: "running"})).To(Succeed()) + Expect(a.Delete(ctx, "o")).To(Succeed()) + + mu.Lock() + defer mu.Unlock() + Expect(ops).To(Equal([]string{"set", "delete"})) + Expect(keys).To(Equal([]string{"o", "o"})) + }) + }) + + Describe("standalone (nil Nats)", func() { + It("works in-memory with no panic and nothing to broadcast", func() { + m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey}) + Expect(m.Start(ctx)).To(Succeed()) + defer func() { Expect(m.Close()).To(Succeed()) }() + + Expect(func() { + Expect(m.Set(ctx, &job{ID: "s", Status: "running"})).To(Succeed()) + }).ToNot(Panic()) + + got, ok := m.Get("s") + Expect(ok).To(BeTrue()) + Expect(got.Status).To(Equal("running")) + Expect(m.List()).To(HaveLen(1)) + Expect(m.Snapshot()).To(HaveKey("s")) + + Expect(m.Delete(ctx, "s")).To(Succeed()) + _, ok = m.Get("s") + Expect(ok).To(BeFalse()) + }) + }) + + Describe("reconnect re-hydrate", func() { + It("re-reads the source when the messaging client reconnects", func() { + bus := testutil.NewFakeBus() + store := newFakeStore(&job{ID: "init", Status: "running"}) + m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: store}) + Expect(m.Start(ctx)).To(Succeed()) + defer func() { Expect(m.Close()).To(Succeed()) }() + + _, ok := m.Get("init") + Expect(ok).To(BeTrue()) + + // A peer writes to the shared DB while we are unaware (no delta seen). 
+ store.add(&job{ID: "late", Status: "running"}) + _, ok = m.Get("late") + Expect(ok).To(BeFalse(), "the new row should not appear before a re-hydrate") + + bus.TriggerReconnect() + + _, ok = m.Get("late") + Expect(ok).To(BeTrue(), "reconnect must re-hydrate from the source and pick up drift") + _, _, list := store.counts() + Expect(list).To(Equal(2), "exactly one Start hydrate plus one reconnect re-hydrate") + }) + }) +}) diff --git a/core/services/testutil/fakebus.go b/core/services/testutil/fakebus.go new file mode 100644 index 000000000..7452d810f --- /dev/null +++ b/core/services/testutil/fakebus.go @@ -0,0 +1,160 @@ +package testutil + +import ( + "encoding/json" + "strings" + "sync" + "time" + + "github.com/mudler/LocalAI/core/services/messaging" +) + +// FakeBus is an in-memory messaging.MessagingClient that delivers each published +// message synchronously to every registered subscriber whose subject filter +// matches, including NATS-style wildcard subjects (`*` matches exactly one +// token). +// +// Synchronous delivery keeps specs deterministic: the moment Publish returns, +// every matching subscriber's handler has already run, so the spec body can read +// the resulting state without polling. It is the shared test double for every +// cross-replica-sync adopter (gallery, syncstate, ...) so they exercise the same +// delivery semantics. It deliberately depends only on the standard library and +// the messaging package — no test framework — so it is importable anywhere. +type FakeBus struct { + mu sync.Mutex + subs []fakeBusSub + // publishCounts records how many messages were published per subject, so a + // spec can assert the echo-loop guard (an applied delta must not re-publish). + publishCounts map[string]int + + // reconnectCbs back the optional OnReconnect/TriggerReconnect pair, letting a + // spec exercise the component's reconnect re-hydrate path without a real + // NATS server. 
+ reconnectCbs []func() +} + +type fakeBusSub struct { + subject string + handler func([]byte) +} + +// NewFakeBus returns a ready-to-use in-memory bus. +func NewFakeBus() *FakeBus { + return &FakeBus{publishCounts: map[string]int{}} +} + +// subjectMatches reports whether a subscription filter matches a concrete +// subject, honoring the single-token `*` wildcard used by NATS. +func subjectMatches(filter, subject string) bool { + if filter == subject { + return true + } + fp := strings.Split(filter, ".") + sp := strings.Split(subject, ".") + if len(fp) != len(sp) { + return false + } + for i := range fp { + if fp[i] == "*" { + continue + } + if fp[i] != sp[i] { + return false + } + } + return true +} + +// Publish marshals data as JSON and delivers it synchronously to every matching +// subscriber. +func (b *FakeBus) Publish(subject string, data any) error { + payload, err := json.Marshal(data) + if err != nil { + return err + } + b.mu.Lock() + b.publishCounts[subject]++ + subs := append([]fakeBusSub(nil), b.subs...) + b.mu.Unlock() + for _, s := range subs { + if subjectMatches(s.subject, subject) { + s.handler(payload) + } + } + return nil +} + +// PublishCount returns how many messages were published on the exact subject. +func (b *FakeBus) PublishCount(subject string) int { + b.mu.Lock() + defer b.mu.Unlock() + return b.publishCounts[subject] +} + +type fakeBusSubscription struct { + bus *FakeBus + subRef fakeBusSub +} + +func (s *fakeBusSubscription) Unsubscribe() error { + s.bus.mu.Lock() + defer s.bus.mu.Unlock() + for i, candidate := range s.bus.subs { + if candidate.subject == s.subRef.subject { + s.bus.subs = append(s.bus.subs[:i], s.bus.subs[i+1:]...) 
+			return nil
+		}
+	}
+	return nil
+}
+
+func (b *FakeBus) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
+	sub := fakeBusSub{subject: subject, handler: handler}
+	b.mu.Lock()
+	b.subs = append(b.subs, sub)
+	b.mu.Unlock()
+	return &fakeBusSubscription{bus: b, subRef: sub}, nil
+}
+
+func (b *FakeBus) QueueSubscribe(subject, _ string, handler func([]byte)) (messaging.Subscription, error) {
+	return b.Subscribe(subject, handler)
+}
+
+func (b *FakeBus) QueueSubscribeReply(string, string, func([]byte, func([]byte))) (messaging.Subscription, error) {
+	return &fakeBusSubscription{bus: b}, nil
+}
+
+func (b *FakeBus) SubscribeReply(string, func([]byte, func([]byte))) (messaging.Subscription, error) {
+	return &fakeBusSubscription{bus: b}, nil
+}
+
+func (b *FakeBus) Request(string, []byte, time.Duration) ([]byte, error) {
+	return nil, nil
+}
+
+func (b *FakeBus) IsConnected() bool { return true }
+func (b *FakeBus) Close()            {}
+
+// OnReconnect mirrors *messaging.Client.OnReconnect so a spec can drive the
+// component's reconnect re-hydrate path. The component detects this method via an
+// optional interface assertion; implementing it here keeps the fake a faithful
+// stand-in for the concrete client.
+func (b *FakeBus) OnReconnect(cb func()) {
+	if cb == nil {
+		return
+	}
+	b.mu.Lock()
+	b.reconnectCbs = append(b.reconnectCbs, cb)
+	b.mu.Unlock()
+}
+
+// TriggerReconnect runs every registered reconnect callback, simulating a NATS
+// reconnect event.
+func (b *FakeBus) TriggerReconnect() {
+	b.mu.Lock()
+	cbs := append([]func(){}, b.reconnectCbs...)
+	b.mu.Unlock()
+	for _, cb := range cbs {
+		cb()
+	}
+}
diff --git a/tests/e2e/distributed/syncstate_distributed_test.go b/tests/e2e/distributed/syncstate_distributed_test.go
new file mode 100644
index 000000000..acd2797e6
--- /dev/null
+++ b/tests/e2e/distributed/syncstate_distributed_test.go
@@ -0,0 +1,161 @@
+package distributed_test
+
+import (
+	"context"
+
+	"github.com/mudler/LocalAI/core/services/distributed"
+	"github.com/mudler/LocalAI/core/services/messaging"
+	"github.com/mudler/LocalAI/core/services/syncstate"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	pgdriver "gorm.io/driver/postgres"
+	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
+)
+
+// ftSyncStore adapts the real FineTuneStore to syncstate.Store, exactly as the
+// finetune service does in production. Defined here (rather than reusing the
+// service's unexported adapter) so the e2e exercises the store + component over
+// real infrastructure without pulling in backend execution.
+type ftSyncStore struct{ s *distributed.FineTuneStore }
+
+func (a ftSyncStore) List(_ context.Context) ([]*distributed.FineTuneJobRecord, error) {
+	recs, err := a.s.ListAll()
+	if err != nil {
+		return nil, err
+	}
+	out := make([]*distributed.FineTuneJobRecord, len(recs))
+	for i := range recs {
+		r := recs[i]
+		out[i] = &r
+	}
+	return out, nil
+}
+
+func (a ftSyncStore) Upsert(_ context.Context, r *distributed.FineTuneJobRecord) error {
+	return a.s.Upsert(r)
+}
+
+func (a ftSyncStore) Delete(_ context.Context, k string) error { return a.s.Delete(k) }
+
+// This suite is the real-infrastructure counterpart to the fake-bus unit tests:
+// two SyncedMap instances stand in for two LocalAI frontend replicas, each with
+// its OWN NATS connection to a shared NATS server and a SHARED PostgreSQL store -
+// the exact distributed-mode invariant (single shared DB, per-replica process
+// state). It proves the delta path works over the wire and that a late-joining
+// replica recovers via store hydrate (the at-most-once gap a fake bus cannot
+// exercise).
+var _ = Describe("SyncedMap two-replica sync over real NATS", Label("Distributed"), func() {
+	var (
+		infra   *TestInfra
+		ftStore *distributed.FineTuneStore
+	)
+
+	BeforeEach(func() {
+		infra = SetupInfra("localai_syncstate_dist_test")
+
+		db, err := gorm.Open(pgdriver.Open(infra.PGURL), &gorm.Config{
+			Logger: logger.Default.LogMode(logger.Silent),
+		})
+		Expect(err).ToNot(HaveOccurred())
+
+		ftStore, err = distributed.NewFineTuneStore(db)
+		Expect(err).ToNot(HaveOccurred())
+	})
+
+	// newReplica builds an independent "replica": its own NATS client to the
+	// shared server plus a SyncedMap over the shared store, started (hydrate +
+	// subscribe) and cleaned up automatically.
+	newReplica := func() *syncstate.SyncedMap[string, *distributed.FineTuneJobRecord] {
+		GinkgoHelper()
+		nc, err := messaging.New(infra.NatsURL)
+		Expect(err).ToNot(HaveOccurred())
+
+		sm := syncstate.New(syncstate.Config[string, *distributed.FineTuneJobRecord]{
+			Name:  "finetune.jobs",
+			Key:   func(r *distributed.FineTuneJobRecord) string { return r.ID },
+			Nats:  nc,
+			Store: ftSyncStore{s: ftStore},
+		})
+		Expect(sm.Start(infra.Ctx)).To(Succeed())
+		FlushNATS(nc) // ensure the subscription is registered server-side before any publish
+		DeferCleanup(func() {
+			_ = sm.Close()
+			nc.Close()
+		})
+		return sm
+	}
+
+	rec := func(id, status string) *distributed.FineTuneJobRecord {
+		return &distributed.FineTuneJobRecord{
+			ID: id, UserID: "u1", Model: "m", Backend: "b",
+			TrainingType: "lora", TrainingMethod: "sft", Status: status,
+		}
+	}
+
+	It("propagates a create from replica A to replica B over the wire", func() {
+		a := newReplica()
+		b := newReplica()
+
+		Expect(a.Set(infra.Ctx, rec("job-1", "queued"))).To(Succeed())
+
+		Eventually(func() bool { _, ok := b.Get("job-1"); return ok }, "10s", "50ms").
+			Should(BeTrue(), "replica B must observe the job created on A via NATS")
+
+		got, ok := b.Get("job-1")
+		Expect(ok).To(BeTrue())
+		Expect(got.Status).To(Equal("queued"))
+	})
+
+	It("propagates an update and a delete across replicas", func() {
+		a := newReplica()
+		b := newReplica()
+
+		Expect(a.Set(infra.Ctx, rec("job-2", "queued"))).To(Succeed())
+		Eventually(func() bool { _, ok := b.Get("job-2"); return ok }, "10s", "50ms").Should(BeTrue())
+
+		// Update on A -> B reflects the new status.
+		Expect(a.Set(infra.Ctx, rec("job-2", "training"))).To(Succeed())
+		Eventually(func() string {
+			if r, ok := b.Get("job-2"); ok {
+				return r.Status
+			}
+			return ""
+		}, "10s", "50ms").Should(Equal("training"))
+
+		// Delete on A -> B prunes (a reload-from-path could not do this).
+		Expect(a.Delete(infra.Ctx, "job-2")).To(Succeed())
+		Eventually(func() bool { _, ok := b.Get("job-2"); return ok }, "10s", "50ms").
+			Should(BeFalse(), "replica B must drop the job deleted on A")
+	})
+
+	It("hydrates a late-joining replica from the shared store (missed-delta recovery)", func() {
+		a := newReplica()
+
+		// Written (and broadcast) BEFORE replica C exists, so C can never receive
+		// the delta - it can only learn the job by hydrating from shared Postgres
+		// on Start. This is the at-most-once gap a fake bus cannot exercise.
+		Expect(a.Set(infra.Ctx, rec("job-3", "completed"))).To(Succeed())
+		Eventually(func() (*distributed.FineTuneJobRecord, error) { return ftStore.Get("job-3") }, "10s", "50ms").
+			ShouldNot(BeNil(), "write-through must reach the shared store first")
+
+		c := newReplica() // joins late; Start() hydrates from the store synchronously
+
+		got, ok := c.Get("job-3")
+		Expect(ok).To(BeTrue(), "late replica must recover the job via store hydrate, not a delta")
+		Expect(got.Status).To(Equal("completed"))
+	})
+
+	It("write-through persists a local Set to the shared PostgreSQL store", func() {
+		a := newReplica()
+
+		Expect(a.Set(infra.Ctx, rec("job-4", "queued"))).To(Succeed())
+
+		persisted, err := ftStore.Get("job-4")
+		Expect(err).ToNot(HaveOccurred())
+		Expect(persisted.ID).To(Equal("job-4"))
+		Expect(persisted.Status).To(Equal("queued"))
+	})
+})