mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
219 lines
7.0 KiB
Go
219 lines
7.0 KiB
Go
package agents
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/mudler/LocalAI/core/services/advisorylock"
|
|
"gorm.io/driver/postgres"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// AgentConfigRecord persists agent configuration in PostgreSQL.
|
|
type AgentConfigRecord struct {
|
|
ID string `gorm:"primaryKey;size:36" json:"id"`
|
|
UserID string `gorm:"index;size:36" json:"user_id"`
|
|
Name string `gorm:"size:255;index" json:"name"`
|
|
ConfigJSON string `gorm:"column:config;type:text" json:"-"` // Full agent config as JSON
|
|
Status string `gorm:"size:32;default:active" json:"status"` // active, paused, deleted
|
|
LastRunAt *time.Time `json:"last_run_at,omitempty"` // Last autonomous/background run
|
|
CreatedAt time.Time `json:"created_at"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
}
|
|
|
|
func (AgentConfigRecord) TableName() string { return "agent_configs" }
|
|
|
|
const (
|
|
StatusActive = "active"
|
|
StatusPaused = "paused"
|
|
StatusDeleted = "deleted"
|
|
)
|
|
|
|
// AgentObservableRecord persists agent action traces (reasoning, tool calls, etc.).
|
|
type AgentObservableRecord struct {
|
|
ID string `gorm:"primaryKey;size:36" json:"id"`
|
|
AgentName string `gorm:"index;size:255" json:"agent_name"`
|
|
EventType string `gorm:"size:64" json:"event_type"` // status, action, error
|
|
PayloadJSON string `gorm:"column:payload;type:text" json:"-"`
|
|
CreatedAt time.Time `gorm:"index" json:"created_at"`
|
|
}
|
|
|
|
func (AgentObservableRecord) TableName() string { return "agent_observables" }
|
|
|
|
// AgentStore provides PostgreSQL-backed persistence for agent state.
|
|
type AgentStore struct {
|
|
db *gorm.DB
|
|
}
|
|
|
|
// NewAgentStore creates a new AgentStore and auto-migrates the schema.
|
|
// Uses a PostgreSQL advisory lock to prevent concurrent migration races
|
|
// when multiple instances (frontend + workers) start at the same time.
|
|
func NewAgentStore(db *gorm.DB) (*AgentStore, error) {
|
|
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
|
|
return db.AutoMigrate(&AgentConfigRecord{}, &AgentObservableRecord{})
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("migrating agent tables: %w", err)
|
|
}
|
|
s := &AgentStore{db: db}
|
|
s.db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_configs_user_name ON agent_configs(user_id, name)")
|
|
return s, nil
|
|
}
|
|
|
|
// --- Agent Config CRUD ---
|
|
|
|
// SaveConfig creates or updates an agent config.
|
|
func (s *AgentStore) SaveConfig(cfg *AgentConfigRecord) error {
|
|
cfg.UpdatedAt = time.Now()
|
|
if cfg.CreatedAt.IsZero() {
|
|
cfg.CreatedAt = cfg.UpdatedAt
|
|
}
|
|
|
|
// Use FirstOrCreate for atomic upsert: if a record with the same
|
|
// (user_id, name) already exists, update it in place; otherwise create.
|
|
var existing AgentConfigRecord
|
|
result := s.db.Where("user_id = ? AND name = ?", cfg.UserID, cfg.Name).
|
|
Attrs(AgentConfigRecord{ID: uuid.New().String()}).
|
|
FirstOrCreate(&existing)
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
|
|
// Preserve the original ID and creation timestamp.
|
|
cfg.ID = existing.ID
|
|
cfg.CreatedAt = existing.CreatedAt
|
|
|
|
return s.db.Model(&existing).Updates(cfg).Error
|
|
}
|
|
|
|
// GetConfig retrieves an agent config by user and name.
|
|
func (s *AgentStore) GetConfig(userID, name string) (*AgentConfigRecord, error) {
|
|
var cfg AgentConfigRecord
|
|
q := s.db.Where("name = ?", name)
|
|
if userID != "" {
|
|
q = q.Where("user_id = ?", userID)
|
|
}
|
|
if err := q.First(&cfg).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
return &cfg, nil
|
|
}
|
|
|
|
// GetConfigByID retrieves an agent config by ID.
|
|
func (s *AgentStore) GetConfigByID(id string) (*AgentConfigRecord, error) {
|
|
var cfg AgentConfigRecord
|
|
if err := s.db.First(&cfg, "id = ?", id).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
return &cfg, nil
|
|
}
|
|
|
|
// ListConfigs returns all agent configs for a user.
|
|
func (s *AgentStore) ListConfigs(userID string) ([]AgentConfigRecord, error) {
|
|
var configs []AgentConfigRecord
|
|
q := s.db.Where("status != ?", StatusDeleted).Order("name")
|
|
if userID != "" {
|
|
q = q.Where("user_id = ?", userID)
|
|
}
|
|
if err := q.Find(&configs).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
return configs, nil
|
|
}
|
|
|
|
// DeleteConfig soft-deletes an agent config.
|
|
func (s *AgentStore) DeleteConfig(userID, name string) error {
|
|
return s.db.Model(&AgentConfigRecord{}).
|
|
Where("user_id = ? AND name = ?", userID, name).
|
|
Update("status", StatusDeleted).Error
|
|
}
|
|
|
|
// HardDeleteConfig permanently removes an agent config.
|
|
func (s *AgentStore) HardDeleteConfig(id string) error {
|
|
return s.db.Where("id = ?", id).Delete(&AgentConfigRecord{}).Error
|
|
}
|
|
|
|
// UpdateStatus updates the status of an agent (active, paused).
|
|
func (s *AgentStore) UpdateStatus(userID, name, status string) error {
|
|
return s.db.Model(&AgentConfigRecord{}).
|
|
Where("user_id = ? AND name = ?", userID, name).
|
|
Updates(map[string]any{"status": status, "updated_at": time.Now()}).Error
|
|
}
|
|
|
|
// --- Conversation History ---
|
|
|
|
// UpdateLastRun updates the last autonomous run timestamp.
|
|
func (s *AgentStore) UpdateLastRun(userID, name string) error {
|
|
now := time.Now()
|
|
return s.db.Model(&AgentConfigRecord{}).
|
|
Where("user_id = ? AND name = ?", userID, name).
|
|
Update("last_run_at", &now).Error
|
|
}
|
|
|
|
// --- Observables ---
|
|
|
|
// AppendObservable adds an observable event.
|
|
func (s *AgentStore) AppendObservable(obs *AgentObservableRecord) error {
|
|
if obs.ID == "" {
|
|
obs.ID = uuid.New().String()
|
|
}
|
|
if obs.CreatedAt.IsZero() {
|
|
obs.CreatedAt = time.Now()
|
|
}
|
|
return s.db.Create(obs).Error
|
|
}
|
|
|
|
// GetObservables retrieves observables for an agent.
|
|
func (s *AgentStore) GetObservables(agentName string, limit int) ([]AgentObservableRecord, error) {
|
|
var obs []AgentObservableRecord
|
|
q := s.db.Where("agent_name = ?", agentName).Order("created_at DESC")
|
|
if limit > 0 {
|
|
q = q.Limit(limit)
|
|
}
|
|
if err := q.Find(&obs).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
return obs, nil
|
|
}
|
|
|
|
// ClearObservables deletes all observables for an agent.
|
|
func (s *AgentStore) ClearObservables(agentName string) error {
|
|
return s.db.Where("agent_name = ?", agentName).Delete(&AgentObservableRecord{}).Error
|
|
}
|
|
|
|
// DB returns the underlying database connection (for advisory locks etc.)
|
|
func (s *AgentStore) DB() *gorm.DB {
|
|
return s.db
|
|
}
|
|
|
|
// AgentConfigStoreInterface defines the operations for agent config persistence.
|
|
type AgentConfigStoreInterface interface {
|
|
SaveConfig(cfg *AgentConfigRecord) error
|
|
GetConfig(userID, name string) (*AgentConfigRecord, error)
|
|
ListConfigs(userID string) ([]AgentConfigRecord, error)
|
|
DeleteConfig(userID, name string) error
|
|
UpdateStatus(userID, name, status string) error
|
|
UpdateLastRun(userID, name string) error
|
|
}
|
|
|
|
// Compile-time interface compliance check.
|
|
var _ AgentConfigStoreInterface = (*AgentStore)(nil)
|
|
|
|
// --- Helpers ---
|
|
|
|
// NewAgentStoreFromURL creates an AgentStore by connecting to the given PostgreSQL URL.
|
|
func NewAgentStoreFromURL(dbURL string) (*AgentStore, error) {
|
|
db, err := gorm.Open(postgres.Open(dbURL), &gorm.Config{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to database: %w", err)
|
|
}
|
|
return NewAgentStore(db)
|
|
}
|
|
|
|
// ParseConfigJSON unmarshals a JSON string into an AgentConfig.
|
|
func ParseConfigJSON(configJSON string, cfg *AgentConfig) error {
|
|
return json.Unmarshal([]byte(configJSON), cfg)
|
|
}
|