Files
LocalAI/core/services/agents/store.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

219 lines
7.0 KiB
Go

package agents
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/mudler/LocalAI/core/services/advisorylock"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// AgentConfigRecord persists agent configuration in PostgreSQL.
type AgentConfigRecord struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
UserID string `gorm:"index;size:36" json:"user_id"`
Name string `gorm:"size:255;index" json:"name"`
ConfigJSON string `gorm:"column:config;type:text" json:"-"` // Full agent config as JSON
Status string `gorm:"size:32;default:active" json:"status"` // active, paused, deleted
LastRunAt *time.Time `json:"last_run_at,omitempty"` // Last autonomous/background run
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (AgentConfigRecord) TableName() string { return "agent_configs" }
const (
StatusActive = "active"
StatusPaused = "paused"
StatusDeleted = "deleted"
)
// AgentObservableRecord persists agent action traces (reasoning, tool calls, etc.).
type AgentObservableRecord struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
AgentName string `gorm:"index;size:255" json:"agent_name"`
EventType string `gorm:"size:64" json:"event_type"` // status, action, error
PayloadJSON string `gorm:"column:payload;type:text" json:"-"`
CreatedAt time.Time `gorm:"index" json:"created_at"`
}
func (AgentObservableRecord) TableName() string { return "agent_observables" }
// AgentStore provides PostgreSQL-backed persistence for agent state.
type AgentStore struct {
db *gorm.DB
}
// NewAgentStore creates a new AgentStore and auto-migrates the schema.
// Uses a PostgreSQL advisory lock to prevent concurrent migration races
// when multiple instances (frontend + workers) start at the same time.
func NewAgentStore(db *gorm.DB) (*AgentStore, error) {
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
return db.AutoMigrate(&AgentConfigRecord{}, &AgentObservableRecord{})
}); err != nil {
return nil, fmt.Errorf("migrating agent tables: %w", err)
}
s := &AgentStore{db: db}
s.db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_configs_user_name ON agent_configs(user_id, name)")
return s, nil
}
// --- Agent Config CRUD ---
// SaveConfig creates or updates an agent config.
func (s *AgentStore) SaveConfig(cfg *AgentConfigRecord) error {
cfg.UpdatedAt = time.Now()
if cfg.CreatedAt.IsZero() {
cfg.CreatedAt = cfg.UpdatedAt
}
// Use FirstOrCreate for atomic upsert: if a record with the same
// (user_id, name) already exists, update it in place; otherwise create.
var existing AgentConfigRecord
result := s.db.Where("user_id = ? AND name = ?", cfg.UserID, cfg.Name).
Attrs(AgentConfigRecord{ID: uuid.New().String()}).
FirstOrCreate(&existing)
if result.Error != nil {
return result.Error
}
// Preserve the original ID and creation timestamp.
cfg.ID = existing.ID
cfg.CreatedAt = existing.CreatedAt
return s.db.Model(&existing).Updates(cfg).Error
}
// GetConfig retrieves an agent config by user and name.
func (s *AgentStore) GetConfig(userID, name string) (*AgentConfigRecord, error) {
var cfg AgentConfigRecord
q := s.db.Where("name = ?", name)
if userID != "" {
q = q.Where("user_id = ?", userID)
}
if err := q.First(&cfg).Error; err != nil {
return nil, err
}
return &cfg, nil
}
// GetConfigByID retrieves an agent config by ID.
func (s *AgentStore) GetConfigByID(id string) (*AgentConfigRecord, error) {
var cfg AgentConfigRecord
if err := s.db.First(&cfg, "id = ?", id).Error; err != nil {
return nil, err
}
return &cfg, nil
}
// ListConfigs returns all agent configs for a user.
func (s *AgentStore) ListConfigs(userID string) ([]AgentConfigRecord, error) {
var configs []AgentConfigRecord
q := s.db.Where("status != ?", StatusDeleted).Order("name")
if userID != "" {
q = q.Where("user_id = ?", userID)
}
if err := q.Find(&configs).Error; err != nil {
return nil, err
}
return configs, nil
}
// DeleteConfig soft-deletes an agent config.
func (s *AgentStore) DeleteConfig(userID, name string) error {
return s.db.Model(&AgentConfigRecord{}).
Where("user_id = ? AND name = ?", userID, name).
Update("status", StatusDeleted).Error
}
// HardDeleteConfig permanently removes an agent config.
func (s *AgentStore) HardDeleteConfig(id string) error {
return s.db.Where("id = ?", id).Delete(&AgentConfigRecord{}).Error
}
// UpdateStatus updates the status of an agent (active, paused).
func (s *AgentStore) UpdateStatus(userID, name, status string) error {
return s.db.Model(&AgentConfigRecord{}).
Where("user_id = ? AND name = ?", userID, name).
Updates(map[string]any{"status": status, "updated_at": time.Now()}).Error
}
// --- Conversation History ---
// UpdateLastRun updates the last autonomous run timestamp.
func (s *AgentStore) UpdateLastRun(userID, name string) error {
now := time.Now()
return s.db.Model(&AgentConfigRecord{}).
Where("user_id = ? AND name = ?", userID, name).
Update("last_run_at", &now).Error
}
// --- Observables ---
// AppendObservable adds an observable event.
func (s *AgentStore) AppendObservable(obs *AgentObservableRecord) error {
if obs.ID == "" {
obs.ID = uuid.New().String()
}
if obs.CreatedAt.IsZero() {
obs.CreatedAt = time.Now()
}
return s.db.Create(obs).Error
}
// GetObservables retrieves observables for an agent.
func (s *AgentStore) GetObservables(agentName string, limit int) ([]AgentObservableRecord, error) {
var obs []AgentObservableRecord
q := s.db.Where("agent_name = ?", agentName).Order("created_at DESC")
if limit > 0 {
q = q.Limit(limit)
}
if err := q.Find(&obs).Error; err != nil {
return nil, err
}
return obs, nil
}
// ClearObservables deletes all observables for an agent.
func (s *AgentStore) ClearObservables(agentName string) error {
return s.db.Where("agent_name = ?", agentName).Delete(&AgentObservableRecord{}).Error
}
// DB returns the underlying database connection (for advisory locks etc.)
func (s *AgentStore) DB() *gorm.DB {
return s.db
}
// AgentConfigStoreInterface defines the operations for agent config persistence.
type AgentConfigStoreInterface interface {
SaveConfig(cfg *AgentConfigRecord) error
GetConfig(userID, name string) (*AgentConfigRecord, error)
ListConfigs(userID string) ([]AgentConfigRecord, error)
DeleteConfig(userID, name string) error
UpdateStatus(userID, name, status string) error
UpdateLastRun(userID, name string) error
}
// Compile-time interface compliance check.
var _ AgentConfigStoreInterface = (*AgentStore)(nil)
// --- Helpers ---
// NewAgentStoreFromURL creates an AgentStore by connecting to the given PostgreSQL URL.
func NewAgentStoreFromURL(dbURL string) (*AgentStore, error) {
db, err := gorm.Open(postgres.Open(dbURL), &gorm.Config{})
if err != nil {
return nil, fmt.Errorf("connecting to database: %w", err)
}
return NewAgentStore(db)
}
// ParseConfigJSON unmarshals a JSON string into an AgentConfig.
func ParseConfigJSON(configJSON string, cfg *AgentConfig) error {
return json.Unmarshal([]byte(configJSON), cfg)
}