mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-28 10:27:30 -04:00
* feat(distributed): add SyncedMap cross-replica in-memory state component

Introduce core/services/syncstate.SyncedMap[K,V]: a thread-safe in-memory map that keeps itself consistent across frontend replicas via NATS, with an optional pluggable durable Store and hydrate-from-source convergence.

Several features keep process-local state surfaced to the API (finetune/quant jobs, agent tasks, model configs) and each hand-wired the same in-memory + NATS broadcast + read-through-store legs - or forgot to, reintroducing cross-replica staleness. SyncedMap makes that consistency a configuration choice:

- local writes mutate the map, write through the Store, then broadcast a delta;
- the apply path is memory-only and never re-publishes or re-writes the Store (structural echo-loop guard, mirroring galleryop.mergeStatus);
- on Start and on NATS reconnect the map re-hydrates from the source (Store, else Loader); an optional periodic Reconcile repairs silent drift;
- standalone mode (nil NATS client) is a strict in-memory no-op.

Reconnect re-hydrate is wired via a new *messaging.Client.OnReconnect callback, consumed through an optional type-assertion so MessagingClient stays minimal. Adds messaging.SubjectSyncStateDelta and a reusable testutil.FakeBus (synchronous in-process MessagingClient with wildcard matching) for adopter tests.

Component only; service migrations follow in subsequent commits.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(finetune): back jobs with SyncedMap for cross-replica consistency

FineTuneService kept jobs in a process-local map and, although it wrote them to Postgres, ListJobs/GetJob never read the store back and the wired natsClient was never used - so in distributed mode a job created on one replica was invisible to the others. Replace the map and the dead client with a syncstate.SyncedMap keyed by job ID, value *schema.FineTuneJob (the exact REST shape, so responses are unchanged).

- Add a Store adapter (core/services/finetune/syncstore.go) over FineTuneStore, plus FineTuneStore.ListAll (global hydrate; per-user List kept) and an idempotent Upsert (create-or-update; Create alone fails on dup key).
- Writes go through SyncedMap.Set/Delete (write-through + broadcast); reads use List/Get. The on-disk state.json path becomes the standalone Loader, keeping single-node restart recovery (stale->stopped / exporting->failed fixups).
- Fold SetNATSClient/SetFineTuneStore into NewFineTuneService; app.go passes the distributed NATS client + store when distributed, nil otherwise.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(agentpool): back agent tasks with SyncedMap for cross-replica consistency

AgentJobService.ListTasks read the process-local tasks map only, while ListJobs already read through the DB persister + dispatcher NATS - so in distributed mode a task created on one replica was invisible to the others. Back tasks with a syncstate.SyncedMap keyed by task ID (value schema.Task, the exact REST shape); jobs are left untouched.

- Store adapter (task_syncstore.go) over the existing JobPersister (LoadTasks/SaveTask/DeleteTask); reads svc.persister/userID live so a persister swap needs no rebuild. No new persister methods required.
- Task reads -> SyncedMap.List/Get; create/update -> Set (write-through + broadcast); delete -> Delete. The file persister now owns its own task set so the write-through path does not re-enter the SyncedMap lock (deadlock guard).
- The distributed NATS client is not available at construction (start() precedes initDistributed), so it is injected via SetTaskSyncNATS, which rebuilds the still-empty map before Start/hydrate. Wired at the main, restart, and per-user (UserServicesManager) distributed sites.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(quantization): back jobs with SyncedMap + durable QuantStore

QuantizationService kept jobs in a process-local map persisted only to a local state.json, so in distributed mode jobs were neither visible across replicas nor durable cluster-wide. Back jobs with a syncstate.SyncedMap keyed by job ID (value *schema.QuantizationJob, the exact REST shape).

- New distributed.QuantStore (GORM, table quantization_jobs) mirroring FineTuneStore: Create/Get/ListAll/Upsert(idempotent)/Delete, registered for AutoMigrate via distributed.InitStores (Stores.Quant).
- New adapter (quantization/syncstore.go) over QuantStore implementing syncstate.Store, with record<->schema conversion.
- Reads go through List/Get, writes through Set/Delete (write-through + broadcast); state.json is kept as the standalone Loader for single-node restart recovery (stale-job fixups preserved).
- app.go passes the distributed NATS client + QuantStore when distributed, nil otherwise; Start/Close lifecycle mirrors finetune.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(syncstate): annotate gosec G118 false positive on lifeCtx

gosec flagged the WithCancel in Start as "cancellation function not called" because the returned cancel is stored on the struct rather than called/deferred in scope. It is invoked in Close (covered by tests), and lifeCtx must outlive Start to drive the reconnect/reconcile goroutines. Suppress the verified false positive with a justified #nosec G118.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* test(distributed): e2e two-replica SyncedMap sync over real NATS + Postgres

Adds the real-infrastructure counterpart to the fake-bus unit tests, in the existing distributed e2e suite (testcontainers NATS + PostgreSQL). Two SyncedMap instances stand in for two frontend replicas - each with its OWN NATS connection to a shared server and a SHARED Postgres store (the distributed-mode invariant) - and assert, over the wire:

- a create on replica A is observed by replica B;
- an update and a delete propagate A -> B (delete prunes, which a reload cannot);
- a late-joining replica recovers a job it never received a delta for, via store hydrate on Start (the at-most-once gap a fake bus cannot exercise);
- a local Set is written through to the shared Postgres store.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
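
The write path and apply path described in the first commit can be illustrated with a small in-process sketch. This is not the syncstate.SyncedMap implementation: Delta, Bus, Store, and Replica are hypothetical names invented for the example, the bus is a synchronous in-memory stand-in for NATS, and keys and values are plain strings. It only shows the shape of the idea: a local Set or Delete mutates memory, writes through the store, then broadcasts, while the apply path touches memory only, so a received delta can never trigger another publish or store write.

```go
package main

import "sync"

// Delta is a hypothetical wire message describing one change.
type Delta struct {
	Key, Value string
	Deleted    bool
}

// Bus is a hypothetical synchronous stand-in for the NATS client.
type Bus struct {
	mu   sync.Mutex
	subs []func(Delta)
}

func (b *Bus) Subscribe(fn func(Delta)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = append(b.subs, fn)
}

func (b *Bus) Publish(d Delta) {
	b.mu.Lock()
	subs := append([]func(Delta){}, b.subs...)
	b.mu.Unlock()
	for _, fn := range subs {
		fn(d)
	}
}

// Store is a hypothetical durable store shared by all replicas.
type Store struct {
	mu     sync.Mutex
	data   map[string]string
	writes int // counts Put/Remove calls, for the demonstration only
}

func NewStore() *Store { return &Store{data: map[string]string{}} }

func (s *Store) Put(k, v string) { s.mu.Lock(); s.data[k] = v; s.writes++; s.mu.Unlock() }
func (s *Store) Remove(k string) { s.mu.Lock(); delete(s.data, k); s.writes++; s.mu.Unlock() }
func (s *Store) Writes() int     { s.mu.Lock(); defer s.mu.Unlock(); return s.writes }

func (s *Store) Snapshot() map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]string, len(s.data))
	for k, v := range s.data {
		out[k] = v
	}
	return out
}

// Replica is one frontend's in-memory view.
type Replica struct {
	mu    sync.RWMutex
	data  map[string]string
	store *Store
	bus   *Bus
}

// NewReplica hydrates from the store, then subscribes to deltas.
func NewReplica(store *Store, bus *Bus) *Replica {
	r := &Replica{data: store.Snapshot(), store: store, bus: bus}
	bus.Subscribe(r.apply)
	return r
}

// Set is the local write path: memory, then store, then broadcast.
func (r *Replica) Set(k, v string) {
	r.mu.Lock()
	r.data[k] = v
	r.mu.Unlock()
	r.store.Put(k, v)
	r.bus.Publish(Delta{Key: k, Value: v})
}

// Delete follows the same three steps as Set.
func (r *Replica) Delete(k string) {
	r.mu.Lock()
	delete(r.data, k)
	r.mu.Unlock()
	r.store.Remove(k)
	r.bus.Publish(Delta{Key: k, Deleted: true})
}

// apply is memory-only: it never writes the store and never publishes.
func (r *Replica) apply(d Delta) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if d.Deleted {
		delete(r.data, d.Key)
		return
	}
	r.data[d.Key] = d.Value
}

func (r *Replica) Get(k string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.data[k]
	return v, ok
}
```

In this sketch a replica also receives its own delta, which is harmless because applying it is idempotent. A replica constructed after a Set was broadcast still sees the value, because construction hydrates from the shared store; that mirrors the late-joiner case the e2e commit describes.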
232 lines
6.5 KiB
Go
package agentpool

import (
	"sync"

	"github.com/mudler/LocalAGI/services/skills"
	"github.com/mudler/LocalAGI/webui/collections"
	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/services/jobs"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/templates"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/xlog"
)

// UserServicesManager lazily creates per-user service instances for
// collections, skills, and jobs.
type UserServicesManager struct {
	mu               sync.RWMutex
	storage          *UserScopedStorage
	appConfig        *config.ApplicationConfig
	modelLoader      *model.ModelLoader
	configLoader     *config.ModelConfigLoader
	evaluator        *templates.Evaluator
	collectionsCache map[string]collections.Backend
	skillsCache      map[string]*skills.Service
	jobsCache        map[string]*AgentJobService

	// Shared distributed backends (set once, inherited by per-user job services)
	jobDispatcher DistributedDispatcher
	jobDBStore    *jobs.JobStore
	// jobNats keeps per-user agent tasks consistent across replicas (nil in
	// standalone). Inherited by each per-user AgentJobService.
	jobNats messaging.MessagingClient
}

// NewUserServicesManager creates a new UserServicesManager.
func NewUserServicesManager(
	storage *UserScopedStorage,
	appConfig *config.ApplicationConfig,
	modelLoader *model.ModelLoader,
	configLoader *config.ModelConfigLoader,
	evaluator *templates.Evaluator,
) *UserServicesManager {
	return &UserServicesManager{
		storage:          storage,
		appConfig:        appConfig,
		modelLoader:      modelLoader,
		configLoader:     configLoader,
		evaluator:        evaluator,
		collectionsCache: make(map[string]collections.Backend),
		skillsCache:      make(map[string]*skills.Service),
		jobsCache:        make(map[string]*AgentJobService),
	}
}

// GetCollections returns the collections backend for a user, creating it lazily.
func (m *UserServicesManager) GetCollections(userID string) (collections.Backend, error) {
	m.mu.RLock()
	if backend, ok := m.collectionsCache[userID]; ok {
		m.mu.RUnlock()
		return backend, nil
	}
	m.mu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()

	// Double-check after acquiring write lock
	if backend, ok := m.collectionsCache[userID]; ok {
		return backend, nil
	}

	if err := m.storage.EnsureUserDirs(userID); err != nil {
		return nil, err
	}

	cfg := m.appConfig.AgentPool
	apiURL := cfg.APIURL
	if apiURL == "" {
		apiURL = "http://127.0.0.1:" + getPort(m.appConfig)
	}
	apiKey := cfg.APIKey
	if apiKey == "" && len(m.appConfig.ApiKeys) > 0 {
		apiKey = m.appConfig.ApiKeys[0]
	}

	collectionsCfg := &collections.Config{
		LLMAPIURL:        apiURL,
		LLMAPIKey:        apiKey,
		LLMModel:         cfg.DefaultModel,
		CollectionDBPath: m.storage.CollectionsDir(userID),
		FileAssets:       m.storage.AssetsDir(userID),
		VectorEngine:     cfg.VectorEngine,
		EmbeddingModel:   cfg.EmbeddingModel,
		MaxChunkingSize:  cfg.MaxChunkingSize,
		ChunkOverlap:     cfg.ChunkOverlap,
		DatabaseURL:      cfg.DatabaseURL,
	}

	backend, _ := collections.NewInProcessBackend(collectionsCfg)
	m.collectionsCache[userID] = backend
	return backend, nil
}

// GetSkills returns the skills service for a user, creating it lazily.
func (m *UserServicesManager) GetSkills(userID string) (*skills.Service, error) {
	m.mu.RLock()
	if svc, ok := m.skillsCache[userID]; ok {
		m.mu.RUnlock()
		return svc, nil
	}
	m.mu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()

	if svc, ok := m.skillsCache[userID]; ok {
		return svc, nil
	}

	if err := m.storage.EnsureUserDirs(userID); err != nil {
		return nil, err
	}

	skillsDir := m.storage.SkillsDir(userID)
	svc, err := skills.NewService(skillsDir)
	if err != nil {
		return nil, err
	}
	m.skillsCache[userID] = svc
	return svc, nil
}

// GetJobs returns the agent job service for a user, creating it lazily.
func (m *UserServicesManager) GetJobs(userID string) (*AgentJobService, error) {
	m.mu.RLock()
	if svc, ok := m.jobsCache[userID]; ok {
		m.mu.RUnlock()
		return svc, nil
	}
	m.mu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()

	if svc, ok := m.jobsCache[userID]; ok {
		return svc, nil
	}

	if err := m.storage.EnsureUserDirs(userID); err != nil {
		return nil, err
	}

	svc := NewAgentJobServiceWithPaths(
		m.appConfig,
		m.modelLoader,
		m.configLoader,
		m.evaluator,
		m.storage.TasksFile(userID),
		m.storage.JobsFile(userID),
	)
	// Set user ID for per-user DB scoping
	svc.SetUserID(userID)
	// Inherit distributed backends so per-user jobs go through NATS + DB
	if m.jobDispatcher != nil {
		svc.SetDistributedBackends(m.jobDispatcher)
	}
	// Inherit the NATS client so per-user tasks broadcast across replicas. Must be
	// set before the hydrate below (LoadFromDB / LoadTasksFromFile) so the tasks
	// SyncedMap is rebuilt with the client while it is still empty.
	svc.SetTaskSyncNATS(m.jobNats)
	if m.jobDBStore != nil {
		svc.SetDistributedJobStore(m.jobDBStore)
		// Load tasks/jobs from DB immediately (per-user services skip Start())
		svc.LoadFromDB()
	} else {
		// Load from per-user files
		if err := svc.LoadTasksFromFile(); err != nil {
			xlog.Warn("Failed to load tasks from file for user", "userID", userID, "error", err)
		}
		if err := svc.LoadJobsFromFile(); err != nil {
			xlog.Warn("Failed to load jobs from file for user", "userID", userID, "error", err)
		}
	}
	m.jobsCache[userID] = svc
	return svc, nil
}

// SetJobDispatcher sets the distributed dispatcher for per-user job services.
func (m *UserServicesManager) SetJobDispatcher(d DistributedDispatcher) {
	m.jobDispatcher = d
}

// SetJobDBStore sets the database-backed job store for per-user job services.
func (m *UserServicesManager) SetJobDBStore(s *jobs.JobStore) {
	m.jobDBStore = s
}

// SetJobSyncNATS sets the NATS client used to keep per-user agent tasks consistent
// across replicas.
func (m *UserServicesManager) SetJobSyncNATS(nats messaging.MessagingClient) {
	m.jobNats = nats
}

// ListAllUserIDs returns all user IDs that have scoped data directories.
func (m *UserServicesManager) ListAllUserIDs() ([]string, error) {
	return m.storage.ListUserDirs()
}

// getPort extracts the port from the API address config.
func getPort(appConfig *config.ApplicationConfig) string {
	addr := appConfig.APIAddress
	for i := len(addr) - 1; i >= 0; i-- {
		if addr[i] == ':' {
			return addr[i+1:]
		}
	}
	return addr
}

// StopAll stops all cached job services.
func (m *UserServicesManager) StopAll() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, svc := range m.jobsCache {
		if err := svc.Stop(); err != nil {
			xlog.Error("Failed to stop user job service", "error", err)
		}
	}
}