Files
LocalAI/core/services/agentpool/agent_pool.go
Pete d2e6b93369 feat(agents): surface KB source citations in RAG responses (#10228)
* dev knowledge.go structure

Signed-off-by: Pete Chen <petechentw@gmail.com>

* feat(agents): append KB source citations to responses

Render structured KB citations as a Sources block after agent responses, linking each source to the existing raw collection entry endpoint.

Keep long-term memory writes on the original model response so citation blocks do not get stored back into the knowledge base.

Tested with: go test ./core/services/agents

Assisted-by: Codex:gpt-5
Signed-off-by: Pete Chen <petechentw@gmail.com>

* Collect KB citations from tool searches

Signed-off-by: Pete Chen <petechentw@gmail.com>

* fix(agents): append KB sources in local chats

Apply the shared KB citation post-processing to standalone LocalAGI chat responses so the React agent chat receives the same clickable Sources block as the native executor path. Also fix the run target to use the current cmd/local-ai entrypoint.

Assisted-by: Codex:gpt-5
Signed-off-by: Pete Chen <petechentw@gmail.com>

---------

Signed-off-by: Pete Chen <petechentw@gmail.com>
Co-authored-by: shihyunhuang <shihyunhuang88@gmail.com>
Co-authored-by: TLoE419 <tloemizuchizu@gmail.com>
Co-authored-by: Ching Kao <0980124jim@gmail.com>
2026-06-09 16:32:56 +02:00

1172 lines
38 KiB
Go

package agentpool
import (
"cmp"
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path/filepath"
"slices"
"strings"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/services/agents"
"github.com/mudler/LocalAI/core/services/distributed"
"github.com/mudler/LocalAI/core/services/messaging"
skillsManager "github.com/mudler/LocalAI/core/services/skills"
"github.com/mudler/LocalAGI/core/agent"
"github.com/mudler/LocalAGI/core/sse"
"github.com/mudler/LocalAGI/core/state"
coreTypes "github.com/mudler/LocalAGI/core/types"
agiServices "github.com/mudler/LocalAGI/services"
"github.com/mudler/LocalAGI/services/skills"
"github.com/mudler/LocalAGI/webui/collections"
"github.com/mudler/xlog"
"gorm.io/gorm"
)
// localAGICore manages the in-process LocalAGI agent pool (standalone mode only).
type localAGICore struct {
pool *state.AgentPool
skillsService *skills.Service
configMeta state.AgentConfigMeta
sharedState *coreTypes.AgentSharedState
actionsConfig map[string]string
}
// distributedBridge connects to the NATS-based distributed agent system.
type distributedBridge struct {
natsClient messaging.Publisher // NATS client for distributed agent execution
agentStore *agents.AgentStore // PostgreSQL agent config store
eventBridge AgentEventBridge // Event bridge for SSE + persistence
skillStore *distributed.SkillStore // PostgreSQL skill metadata (distributed mode)
dispatcher agents.Dispatcher // Native dispatcher (distributed or local)
}
// userManager handles per-user services, storage, and auth.
type userManager struct {
userServices *UserServicesManager
userStorage *UserScopedStorage
authDB *gorm.DB
}
// AgentPoolService wraps LocalAGI's AgentPool, Skills service, and collections backend
// to provide agentic capabilities integrated directly into LocalAI.
type AgentPoolService struct {
appConfig *config.ApplicationConfig
collectionsBackend collections.Backend
configBackend AgentConfigBackend // Abstracts local vs distributed agent operations
localAGI localAGICore
distributed distributedBridge
users userManager
stateDir string
outputsDir string
apiURL string // Resolved API URL for agent execution
apiKey string // Resolved API key for agent execution
mu sync.Mutex
}
// AgentEventBridge is the interface for event publishing needed by AgentPoolService.
type AgentEventBridge interface {
PublishMessage(agentName, userID, sender, content, messageID string) error
PublishStatus(agentName, userID, status string) error
PublishStreamEvent(agentName, userID string, data map[string]any) error
RegisterCancel(key string, cancel context.CancelFunc)
DeregisterCancel(key string)
}
// AgentConfigStore is the interface for agent config persistence.
type AgentConfigStore interface {
SaveConfig(cfg *agents.AgentConfigRecord) error
GetConfig(userID, name string) (*agents.AgentConfigRecord, error)
ListConfigs(userID string) ([]agents.AgentConfigRecord, error)
DeleteConfig(userID, name string) error
UpdateStatus(userID, name, status string) error
UpdateLastRun(userID, name string) error
}
// AgentPoolOptions holds optional dependencies for AgentPoolService.
// Zero values are fine — the service degrades gracefully without them.
type AgentPoolOptions struct {
AuthDB *gorm.DB
SkillStore *distributed.SkillStore
NATSClient messaging.Publisher
EventBridge AgentEventBridge
AgentStore *agents.AgentStore
}
func NewAgentPoolService(appConfig *config.ApplicationConfig, opts ...AgentPoolOptions) (*AgentPoolService, error) {
svc := &AgentPoolService{
appConfig: appConfig,
}
if len(opts) > 0 {
o := opts[0]
if o.AuthDB != nil {
svc.users.authDB = o.AuthDB
}
if o.SkillStore != nil {
svc.distributed.skillStore = o.SkillStore
}
if o.NATSClient != nil {
svc.distributed.natsClient = o.NATSClient
}
if o.EventBridge != nil {
svc.distributed.eventBridge = o.EventBridge
}
if o.AgentStore != nil {
svc.distributed.agentStore = o.AgentStore
}
}
return svc, nil
}
func (s *AgentPoolService) Start(ctx context.Context) error {
cfg := s.appConfig.AgentPool
// API URL: use configured value, or derive self-referencing URL from LocalAI's address
apiURL := cfg.APIURL
if apiURL == "" {
_, port, err := net.SplitHostPort(s.appConfig.APIAddress)
if err != nil {
port = strings.TrimPrefix(s.appConfig.APIAddress, ":")
}
apiURL = "http://127.0.0.1:" + port
}
apiKey := cfg.APIKey
if apiKey == "" && len(s.appConfig.ApiKeys) > 0 {
apiKey = s.appConfig.ApiKeys[0]
}
s.apiURL = apiURL
s.apiKey = apiKey
// Distributed mode: use native executor + NATSDispatcher.
// No LocalAGI pool, no collections, no skills service — all stateless.
if s.distributed.natsClient != nil {
return s.startDistributed(ctx, apiURL, apiKey)
}
// Standalone mode: use LocalAGI pool (backward compat)
return s.startLocalAGI(ctx, cfg, apiURL, apiKey)
}
func (s *AgentPoolService) buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets string) *collections.Config {
cfg := s.appConfig.AgentPool
return &collections.Config{
LLMAPIURL: apiURL,
LLMAPIKey: apiKey,
LLMModel: cfg.DefaultModel,
CollectionDBPath: collectionDBPath,
FileAssets: fileAssets,
VectorEngine: cfg.VectorEngine,
EmbeddingModel: cfg.EmbeddingModel,
MaxChunkingSize: cfg.MaxChunkingSize,
ChunkOverlap: cfg.ChunkOverlap,
DatabaseURL: cfg.DatabaseURL,
}
}
// startDistributed initializes the native agent executor with NATS dispatcher.
// No LocalAGI pool is created — agent execution is stateless.
// Skills and collections are still initialized for the frontend UI.
func (s *AgentPoolService) startDistributed(ctx context.Context, apiURL, apiKey string) error {
cfg := s.appConfig.AgentPool
// State dir for skills and outputs
stateDir := cmp.Or(cfg.StateDir, s.appConfig.DataPath, s.appConfig.DynamicConfigsDir, "agents")
if err := os.MkdirAll(stateDir, 0750); err != nil {
xlog.Warn("Failed to create agent state dir", "error", err)
}
s.stateDir = stateDir
// Outputs directory
outputsDir := filepath.Join(stateDir, "outputs")
if err := os.MkdirAll(outputsDir, 0750); err != nil {
xlog.Warn("Failed to create outputs directory", "error", err)
}
s.outputsDir = outputsDir
// Skills service — same as standalone, filesystem-based
skillsSvc, err := skills.NewService(stateDir)
if err != nil {
xlog.Warn("Failed to create skills service in distributed mode", "error", err)
} else {
s.localAGI.skillsService = skillsSvc
}
// Collections backend — same as standalone, in-process
collectionDBPath := cfg.CollectionDBPath
if collectionDBPath == "" {
collectionDBPath = filepath.Join(stateDir, "collections")
}
fileAssets := filepath.Join(stateDir, "assets")
collectionsBackend, _ := collections.NewInProcessBackend(s.buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets))
s.collectionsBackend = collectionsBackend
// User-scoped storage
dataDir := cmp.Or(s.appConfig.DataPath, s.appConfig.DynamicConfigsDir)
s.users.userStorage = NewUserScopedStorage(stateDir, dataDir)
// Start the background agent scheduler on the frontend.
// It needs DB access to list configs and update LastRunAt — the worker doesn't have DB.
// The advisory lock ensures only one frontend instance runs the scheduler.
if s.users.authDB != nil && s.distributed.natsClient != nil && s.distributed.agentStore != nil {
var schedulerOpts []agents.AgentSchedulerOpt
if s.distributed.skillStore != nil {
schedulerOpts = append(schedulerOpts, agents.WithSchedulerSkillProvider(s.buildSkillProvider()))
}
scheduler := agents.NewAgentScheduler(
s.users.authDB,
s.distributed.natsClient,
s.distributed.agentStore,
messaging.SubjectAgentExecute,
schedulerOpts...,
)
go scheduler.Start(ctx)
}
// Wire the distributed config backend
s.configBackend = newDistributedAgentConfigBackend(s, s.distributed.agentStore)
xlog.Info("Agent pool started in distributed mode (frontend dispatcher only)", "apiURL", apiURL, "stateDir", stateDir)
return nil
}
// startLocalAGI initializes the full LocalAGI pool for standalone mode.
func (s *AgentPoolService) startLocalAGI(_ context.Context, cfg config.AgentPoolConfig, apiURL, apiKey string) error {
// State dir: explicit config > DataPath > DynamicConfigsDir > fallback
stateDir := cmp.Or(cfg.StateDir, s.appConfig.DataPath, s.appConfig.DynamicConfigsDir, "agents")
if err := os.MkdirAll(stateDir, 0750); err != nil {
return fmt.Errorf("failed to create agent pool state dir: %w", err)
}
// Collections paths
collectionDBPath := cfg.CollectionDBPath
if collectionDBPath == "" {
collectionDBPath = filepath.Join(stateDir, "collections")
}
fileAssets := filepath.Join(stateDir, "assets")
// Skills service
skillsSvc, err := skills.NewService(stateDir)
if err != nil {
xlog.Error("Failed to create skills service", "error", err)
}
s.localAGI.skillsService = skillsSvc
// Actions config map
actionsConfig := map[string]string{
agiServices.ConfigStateDir: stateDir,
}
if cfg.CustomActionsDir != "" {
actionsConfig[agiServices.CustomActionsDir] = cfg.CustomActionsDir
}
// Create outputs subdirectory
outputsDir := filepath.Join(stateDir, "outputs")
if err := os.MkdirAll(outputsDir, 0750); err != nil {
xlog.Error("Failed to create outputs directory", "path", outputsDir, "error", err)
}
s.localAGI.actionsConfig = actionsConfig
s.stateDir = stateDir
s.outputsDir = outputsDir
s.localAGI.sharedState = coreTypes.NewAgentSharedState(5 * time.Minute)
// Initialize user-scoped storage
dataDir := cmp.Or(s.appConfig.DataPath, s.appConfig.DynamicConfigsDir)
s.users.userStorage = NewUserScopedStorage(stateDir, dataDir)
// Create the agent pool
pool, err := state.NewAgentPool(
cfg.DefaultModel,
cfg.MultimodalModel,
cfg.TranscriptionModel,
cfg.TranscriptionLanguage,
cfg.TTSModel,
apiURL,
apiKey,
stateDir,
agiServices.Actions(actionsConfig),
agiServices.Connectors,
agiServices.DynamicPrompts(actionsConfig),
agiServices.Filters,
cfg.Timeout,
cfg.EnableLogs,
skillsSvc,
)
if err != nil {
return fmt.Errorf("failed to create agent pool: %w", err)
}
s.localAGI.pool = pool
// Create in-process collections backend and RAG provider
collectionsCfg := s.buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets)
collectionsBackend, collectionsState := collections.NewInProcessBackend(collectionsCfg)
s.collectionsBackend = collectionsBackend
embedded := collections.RAGProviderFromState(collectionsState)
pool.SetRAGProvider(func(collectionName, _, _ string) (agent.RAGDB, state.KBCompactionClient, bool) {
return embedded(collectionName)
})
// Build config metadata for UI
s.localAGI.configMeta = state.NewAgentConfigMeta(
agiServices.ActionsConfigMeta(cfg.CustomActionsDir),
agiServices.ConnectorsConfigMeta(),
agiServices.DynamicPromptsConfigMeta(cfg.CustomActionsDir),
agiServices.FiltersConfigMeta(),
)
// Start all agents
if err := pool.StartAll(); err != nil {
xlog.Error("Failed to start agent pool", "error", err)
}
// Wire the local config backend
s.configBackend = newLocalAgentConfigBackend(s)
xlog.Info("Agent pool started (standalone/LocalAGI mode)", "stateDir", stateDir, "apiURL", apiURL)
return nil
}
func (s *AgentPoolService) Stop() {
if s.configBackend != nil {
s.configBackend.Stop()
}
}
// ConfigBackend returns the underlying AgentConfigBackend.
func (s *AgentPoolService) ConfigBackend() AgentConfigBackend {
return s.configBackend
}
// APIURL returns the resolved API URL for agent execution.
func (s *AgentPoolService) APIURL() string {
return s.apiURL
}
// APIKey returns the resolved API key for agent execution.
func (s *AgentPoolService) APIKey() string {
return s.apiKey
}
// Pool returns the underlying AgentPool.
func (s *AgentPoolService) Pool() *state.AgentPool {
return s.localAGI.pool
}
// SetNATSClient sets the NATS client for distributed agent execution.
// Deprecated: prefer passing NATSClient via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetNATSClient(nc messaging.Publisher) {
s.distributed.natsClient = nc
}
// SetEventBridge sets the event bridge for distributed SSE + persistence.
// Deprecated: prefer passing EventBridge via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetEventBridge(eb AgentEventBridge) {
s.distributed.eventBridge = eb
}
// SetAgentStore sets the PostgreSQL agent config store.
// Deprecated: prefer passing AgentStore via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetAgentStore(store *agents.AgentStore) {
s.distributed.agentStore = store
}
// Agent execution in distributed mode is handled by the dedicated agent-worker process
// using the NATSDispatcher from core/services/agents/dispatcher.go.
// The frontend only dispatches chat events to NATS via dispatchChat().
// --- Agent CRUD ---
func (s *AgentPoolService) GetAgent(name string) *agent.Agent {
// GetAgent is used by the responses interceptor to check if a model name
// is an agent. It uses the raw pool key (no userID prefix).
return s.configBackend.GetAgent("", name)
}
// Chat sends a message to an agent and returns immediately. Responses come via SSE.
func (s *AgentPoolService) Chat(name, message string) (string, error) {
ag := s.localAGI.pool.GetAgent(name)
if ag == nil {
return "", fmt.Errorf("%w: %s", ErrAgentNotFound, name)
}
manager := s.localAGI.pool.GetManager(name)
if manager == nil {
return "", fmt.Errorf("SSE manager not found for agent: %s", name)
}
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
// Send user message via SSE
userMsg, _ := json.Marshal(map[string]any{
"id": messageID + "-user",
"sender": "user",
"content": message,
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(userMsg)).WithEvent("json_message"))
// Send processing status
statusMsg, _ := json.Marshal(map[string]any{
"status": "processing",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(statusMsg)).WithEvent("json_message_status"))
// Process asynchronously
go func() {
response := ag.Ask(coreTypes.WithText(message))
if response == nil {
errMsg, _ := json.Marshal(map[string]any{
"error": "agent request failed or was cancelled",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
} else if response.Error != nil {
errMsg, _ := json.Marshal(map[string]any{
"error": response.Error.Error(),
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
} else {
// Collect metadata from all action states
metadata := map[string]any{}
for _, state := range response.State {
for k, v := range state.Metadata {
if existing, ok := metadata[k]; ok {
if existList, ok := existing.([]string); ok {
if newList, ok := v.([]string); ok {
metadata[k] = append(existList, newList...)
continue
}
}
}
metadata[k] = v
}
}
if len(metadata) > 0 {
// Extract userID from the agent key (format: "userID:agentName")
var chatUserID string
if uid, _, ok := strings.Cut(name, ":"); ok {
chatUserID = uid
}
s.collectAndCopyMetadata(metadata, chatUserID)
}
content := s.appendLocalAGIKBCitations(response.Response, name, message, response.State)
msg := map[string]any{
"id": messageID + "-agent",
"sender": "agent",
"content": content,
"timestamp": time.Now().Format(time.RFC3339),
}
if len(metadata) > 0 {
msg["metadata"] = metadata
}
respMsg, _ := json.Marshal(msg)
manager.Send(sse.NewMessage(string(respMsg)).WithEvent("json_message"))
}
completedMsg, _ := json.Marshal(map[string]any{
"status": "completed",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(completedMsg)).WithEvent("json_message_status"))
}()
return messageID, nil
}
func (s *AgentPoolService) appendLocalAGIKBCitations(response, agentKey, message string, states []coreTypes.ActionState) string {
if strings.TrimSpace(response) == "" {
return response
}
userID, collection := splitAgentKey(agentKey)
cfg := s.localAGI.pool.GetConfig(agentKey)
if cfg == nil || !cfg.EnableKnowledgeBase {
return response
}
citations := kbCitationsFromActionStates(states)
if len(citations) == 0 && cfg.KBAutoSearch {
maxResults := cfg.KnowledgeBaseResults
if maxResults <= 0 {
maxResults = 5
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
kbResult := agents.KBAutoSearchPrompt(ctx, s.apiURL, s.apiKey, collection, message, maxResults, userID)
citations = kbResult.Citations
}
return agents.AppendKBCitations(response, collection, userID, citations)
}
func splitAgentKey(agentKey string) (userID, name string) {
if uid, n, ok := strings.Cut(agentKey, ":"); ok {
return uid, n
}
return "", agentKey
}
func kbCitationsFromActionStates(states []coreTypes.ActionState) []agents.KBCitation {
var citations []agents.KBCitation
for _, state := range states {
citations = append(citations, kbCitationsFromMetadata(state.Metadata)...)
}
return citations
}
func kbCitationsFromMetadata(metadata map[string]any) []agents.KBCitation {
if len(metadata) == 0 {
return nil
}
fileName := metadata["file_name"]
source := metadata["source"]
if fileName == nil && source == nil {
return nil
}
citation := agents.KBCitation{
FileName: metadataString(fileName),
EntryKey: metadataString(source),
}
if citation.FileName == "" && citation.EntryKey == "" {
return nil
}
return []agents.KBCitation{citation}
}
func metadataString(value any) string {
switch v := value.(type) {
case string:
return v
case fmt.Stringer:
return v.String()
default:
return ""
}
}
// userOutputsDir returns the per-user outputs directory, creating it if needed.
// If userID is empty, falls back to the shared outputs directory.
func (s *AgentPoolService) userOutputsDir(userID string) string {
if userID == "" {
return s.outputsDir
}
dir := filepath.Join(s.outputsDir, userID)
os.MkdirAll(dir, 0750)
return dir
}
// copyToOutputs copies a file into the per-user outputs directory and returns the new path.
// If the file is already inside the target dir, it returns the original path unchanged.
func (s *AgentPoolService) copyToOutputs(srcPath, userID string) (string, error) {
targetDir := s.userOutputsDir(userID)
srcClean := filepath.Clean(srcPath)
absTarget, _ := filepath.Abs(targetDir)
absSrc, _ := filepath.Abs(srcClean)
if strings.HasPrefix(absSrc, absTarget+string(os.PathSeparator)) {
return srcPath, nil
}
src, err := os.Open(srcClean)
if err != nil {
return "", err
}
defer src.Close()
dstPath := filepath.Join(targetDir, filepath.Base(srcClean))
dst, err := os.Create(dstPath)
if err != nil {
return "", err
}
defer dst.Close()
if _, err := io.Copy(dst, src); err != nil {
return "", err
}
return dstPath, nil
}
// collectAndCopyMetadata iterates all metadata keys and, for any value that is
// a []string of local file paths, copies those files into the per-user outputs
// directory so the file endpoint can serve them from a single confined location.
// Entries that are URLs (http/https) are left unchanged.
func (s *AgentPoolService) collectAndCopyMetadata(metadata map[string]any, userID string) {
for key, val := range metadata {
list, ok := val.([]string)
if !ok {
continue
}
updated := make([]string, 0, len(list))
for _, p := range list {
if strings.HasPrefix(p, "http://") || strings.HasPrefix(p, "https://") {
updated = append(updated, p)
continue
}
newPath, err := s.copyToOutputs(p, userID)
if err != nil {
xlog.Error("Failed to copy file to outputs", "src", p, "error", err)
updated = append(updated, p)
continue
}
updated = append(updated, newPath)
}
metadata[key] = updated
}
}
func (s *AgentPoolService) GetConfigMeta() state.AgentConfigMeta {
return s.localAGI.configMeta
}
// GetConfigMetaResult returns the config metadata via the backend, which handles
// local vs distributed differences (LocalAGI metadata vs native static metadata).
func (s *AgentPoolService) GetConfigMetaResult() AgentConfigMetaResult {
return s.configBackend.GetConfigMeta()
}
func (s *AgentPoolService) AgentHubURL() string {
return s.appConfig.AgentPool.AgentHubURL
}
func (s *AgentPoolService) StateDir() string {
return s.stateDir
}
func (s *AgentPoolService) OutputsDir() string {
return s.outputsDir
}
// ExportAgent returns the agent config as JSON bytes.
func (s *AgentPoolService) ExportAgent(name string) ([]byte, error) {
// Extract userID and agent name from the key (format: "userID:agentName")
userID := ""
agentName := name
if u, a, ok := strings.Cut(name, ":"); ok {
userID = u
agentName = a
}
return s.configBackend.ExportConfig(userID, agentName)
}
// --- User Services ---
// SetUserServicesManager sets the user services manager for per-user scoping.
func (s *AgentPoolService) SetUserServicesManager(usm *UserServicesManager) {
s.users.userServices = usm
}
// UserStorage returns the user-scoped storage.
func (s *AgentPoolService) UserStorage() *UserScopedStorage {
return s.users.userStorage
}
// UserServicesManager returns the user services manager.
func (s *AgentPoolService) UserServicesManager() *UserServicesManager {
return s.users.userServices
}
// SetAuthDB sets the auth database for API key generation.
// Deprecated: prefer passing AuthDB via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetAuthDB(db *gorm.DB) {
s.users.authDB = db
}
// SetSkillStore sets the distributed skill store for persisting skill metadata to PostgreSQL.
// Deprecated: prefer passing SkillStore via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetSkillStore(store *distributed.SkillStore) {
s.distributed.skillStore = store
}
// --- Admin Aggregation ---
// UserAgentInfo holds agent info for cross-user listing.
type UserAgentInfo struct {
Name string `json:"name"`
Active bool `json:"active"`
}
// ListAllAgentsGrouped returns all agents grouped by user ID.
// Keys without ":" go into the "" (root) group.
func (s *AgentPoolService) ListAllAgentsGrouped() map[string][]UserAgentInfo {
return s.configBackend.ListAllGrouped()
}
// --- ForUser Collections ---
// ListCollectionsForUser lists collections for a specific user.
func (s *AgentPoolService) ListCollectionsForUser(userID string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListCollections()
}
// CreateCollectionForUser creates a collection for a specific user.
func (s *AgentPoolService) CreateCollectionForUser(userID, name string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.CreateCollection(name)
}
// ensureCollectionForUser creates a collection for the user if it doesn't already exist.
func (s *AgentPoolService) ensureCollectionForUser(userID, name string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
collections, err := backend.ListCollections()
if err != nil {
return err
}
if slices.Contains(collections, name) {
return nil
}
return backend.CreateCollection(name)
}
// UploadToCollectionForUser uploads to a collection for a specific user.
// The filename arrives from a multipart upload; the vendored backend may or
// may not sanitise it, so strip any directory components at the boundary.
func (s *AgentPoolService) UploadToCollectionForUser(userID, collection, filename string, fileBody io.Reader) (string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", err
}
base := filepath.Base(filename)
if base == "." || base == ".." || base == "/" || base == "" {
return "", fmt.Errorf("invalid filename")
}
return backend.Upload(collection, base, fileBody)
}
// CollectionEntryExistsForUser checks if an entry exists in a user's collection.
func (s *AgentPoolService) CollectionEntryExistsForUser(userID, collection, entry string) bool {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return false
}
return backend.EntryExists(collection, entry)
}
// ListCollectionEntriesForUser lists entries in a user's collection.
func (s *AgentPoolService) ListCollectionEntriesForUser(userID, collection string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListEntries(collection)
}
// GetCollectionEntryContentForUser gets entry content for a user's collection.
func (s *AgentPoolService) GetCollectionEntryContentForUser(userID, collection, entry string) (string, int, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", 0, err
}
return backend.GetEntryContent(collection, entry)
}
// SearchCollectionForUser searches a user's collection.
func (s *AgentPoolService) SearchCollectionForUser(userID, collection, query string, maxResults int) ([]collections.SearchResult, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.Search(collection, query, maxResults)
}
// ResetCollectionForUser resets a user's collection.
func (s *AgentPoolService) ResetCollectionForUser(userID, collection string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.Reset(collection)
}
// DeleteCollectionEntryForUser deletes an entry from a user's collection.
func (s *AgentPoolService) DeleteCollectionEntryForUser(userID, collection, entry string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.DeleteEntry(collection, entry)
}
// AddCollectionSourceForUser adds a source to a user's collection.
func (s *AgentPoolService) AddCollectionSourceForUser(userID, collection, sourceURL string, intervalMin int) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.AddSource(collection, sourceURL, intervalMin)
}
// RemoveCollectionSourceForUser removes a source from a user's collection.
func (s *AgentPoolService) RemoveCollectionSourceForUser(userID, collection, sourceURL string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.RemoveSource(collection, sourceURL)
}
// ListCollectionSourcesForUser lists sources for a user's collection.
func (s *AgentPoolService) ListCollectionSourcesForUser(userID, collection string) ([]collections.SourceInfo, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListSources(collection)
}
// GetCollectionEntryFilePathForUser gets the file path for an entry in a user's collection.
func (s *AgentPoolService) GetCollectionEntryFilePathForUser(userID, collection, entry string) (string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", err
}
return backend.GetEntryFilePath(collection, entry)
}
// --- ForUser Agent Methods ---
// ListAgentsForUser lists agents belonging to a specific user.
// If userID is empty, returns all agents (backward compat).
func (s *AgentPoolService) ListAgentsForUser(userID string) map[string]bool {
return s.configBackend.ListAgents(userID)
}
// CreateAgentForUser creates an agent namespaced to a user.
// When auth is enabled and the agent config has no API key, a new user API key
// is auto-generated so the agent can authenticate against LocalAI's own API.
func (s *AgentPoolService) CreateAgentForUser(userID string, config *state.AgentConfig) error {
if err := ValidateAgentName(config.Name); err != nil {
return err
}
// Auto-generate a user API key when auth is active and none is specified
if s.users.authDB != nil && userID != "" && config.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+config.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
config.APIKey = plaintext
xlog.Info("Auto-generated API key for agent", "agent", config.Name, "user", userID)
}
if err := s.configBackend.SaveConfig(userID, config); err != nil {
return err
}
// Auto-create collection when knowledge base or long-term memory is enabled
if config.EnableKnowledgeBase || config.LongTermMemory {
if err := s.ensureCollectionForUser(userID, config.Name); err != nil {
xlog.Warn("Failed to auto-create collection for agent", "agent", config.Name, "error", err)
}
}
return nil
}
// GetAgentForUser returns the agent for a user.
// Returns nil in distributed mode where agents don't run in-process.
func (s *AgentPoolService) GetAgentForUser(userID, name string) *agent.Agent {
return s.configBackend.GetAgent(userID, name)
}
// GetAgentConfigForUser returns the agent config for a user's agent.
func (s *AgentPoolService) GetAgentConfigForUser(userID, name string) *state.AgentConfig {
return s.configBackend.GetConfig(userID, name)
}
// UpdateAgentForUser updates a user's agent.
func (s *AgentPoolService) UpdateAgentForUser(userID, name string, config *state.AgentConfig) error {
// Auto-generate a user API key when auth is active and none is specified
if s.users.authDB != nil && userID != "" && config.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
config.APIKey = plaintext
}
if err := s.configBackend.UpdateConfig(userID, name, config); err != nil {
return err
}
// Auto-create collection when knowledge base or long-term memory is enabled
if config.EnableKnowledgeBase || config.LongTermMemory {
if err := s.ensureCollectionForUser(userID, config.Name); err != nil {
xlog.Warn("Failed to auto-create collection for agent", "agent", config.Name, "error", err)
}
}
return nil
}
// DeleteAgentForUser deletes a user's agent.
func (s *AgentPoolService) DeleteAgentForUser(userID, name string) error {
return s.configBackend.DeleteConfig(userID, name)
}
// PauseAgentForUser pauses a user's agent.
func (s *AgentPoolService) PauseAgentForUser(userID, name string) error {
return s.configBackend.SetStatus(userID, name, "paused")
}
// ResumeAgentForUser resumes a user's agent.
func (s *AgentPoolService) ResumeAgentForUser(userID, name string) error {
return s.configBackend.SetStatus(userID, name, "active")
}
// GetAgentStatusForUser returns the status of a user's agent.
// Returns nil in distributed mode where status is not tracked in-process.
func (s *AgentPoolService) GetAgentStatusForUser(userID, name string) *state.Status {
return s.configBackend.GetStatus(userID, name)
}
// GetAgentObservablesForUser returns observables for a user's agent as raw JSON entries.
func (s *AgentPoolService) GetAgentObservablesForUser(userID, name string) ([]json.RawMessage, error) {
return s.configBackend.GetObservables(userID, name)
}
// ClearAgentObservablesForUser clears observables for a user's agent.
func (s *AgentPoolService) ClearAgentObservablesForUser(userID, name string) error {
return s.configBackend.ClearObservables(userID, name)
}
// ChatForUser sends a message to a user's agent.
func (s *AgentPoolService) ChatForUser(userID, name, message string) (string, error) {
return s.configBackend.Chat(userID, name, message)
}
// dispatchChat publishes a chat event to the NATS agent execution queue.
// The event is enriched with the full agent config and resolved skills so that
// the worker does not need direct database access.
func (s *AgentPoolService) dispatchChat(userID, name, message string) (string, error) {
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
// Send user message to SSE immediately so the UI shows it right away
if s.distributed.eventBridge != nil {
agentName := name
s.distributed.eventBridge.PublishMessage(agentName, userID, "user", message, messageID+"-user")
s.distributed.eventBridge.PublishStatus(agentName, userID, "processing")
}
// Load config from DB to embed in the NATS payload
var cfg *agents.AgentConfig
if s.distributed.agentStore != nil {
rec, err := s.distributed.agentStore.GetConfig(userID, name)
if err != nil {
return "", fmt.Errorf("agent config not found: %w", err)
}
var c agents.AgentConfig
if err := agents.ParseConfigJSON(rec.ConfigJSON, &c); err != nil {
return "", fmt.Errorf("invalid agent config: %w", err)
}
cfg = &c
}
// Load skills if enabled — uses SkillManager which reads from filesystem/PostgreSQL
var skills []agents.SkillInfo
if cfg != nil && cfg.EnableSkills {
if loaded, err := s.loadSkillsForUser(userID); err == nil {
skills = loaded
}
}
evt := agents.AgentChatEvent{
AgentName: name,
UserID: userID,
Message: message,
MessageID: messageID,
Role: "user",
Config: cfg,
Skills: skills,
}
if err := s.distributed.natsClient.Publish(messaging.SubjectAgentExecute, evt); err != nil {
return "", fmt.Errorf("failed to dispatch agent chat: %w", err)
}
return messageID, nil
}
// GetSSEManagerForUser returns the SSE manager for a user's agent.
// Returns nil in distributed mode where SSE is handled by EventBridge.
func (s *AgentPoolService) GetSSEManagerForUser(userID, name string) sse.Manager {
return s.configBackend.GetSSEManager(userID, name)
}
// ExportAgentForUser exports a user's agent config.
func (s *AgentPoolService) ExportAgentForUser(userID, name string) ([]byte, error) {
return s.ExportAgent(agents.AgentKey(userID, name))
}
// ImportAgentForUser imports an agent for a user.
func (s *AgentPoolService) ImportAgentForUser(userID string, data []byte) error {
var cfg state.AgentConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("invalid agent config: %w", err)
}
if err := ValidateAgentName(cfg.Name); err != nil {
return err
}
// Auto-generate a user API key when auth is active and none is specified
if s.users.authDB != nil && userID != "" && cfg.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+cfg.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
cfg.APIKey = plaintext
}
return s.configBackend.ImportConfig(userID, &cfg)
}
// --- ForUser Collections ---
// CollectionsBackendForUser returns the collections backend for a user.
func (s *AgentPoolService) CollectionsBackendForUser(userID string) (collections.Backend, error) {
if s.users.userServices == nil || userID == "" {
if s.collectionsBackend == nil {
return nil, fmt.Errorf("collections not available in distributed mode")
}
return s.collectionsBackend, nil
}
return s.users.userServices.GetCollections(userID)
}
// --- ForUser Skills ---
// SkillsServiceForUser returns the skills service for a user.
func (s *AgentPoolService) SkillsServiceForUser(userID string) (*skills.Service, error) {
if s.users.userServices == nil || userID == "" {
if s.localAGI.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
return s.localAGI.skillsService, nil
}
return s.users.userServices.GetSkills(userID)
}
// SkillManagerForUser returns a SkillManager for a specific user.
// In distributed mode, returns a DistributedManager that syncs to PostgreSQL.
// In standalone mode, returns a FilesystemManager.
func (s *AgentPoolService) SkillManagerForUser(userID string) (skillsManager.Manager, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
fs := skillsManager.NewFilesystemManager(svc)
// In distributed mode, wrap with PostgreSQL sync
if s.distributed.skillStore != nil {
return skillsManager.NewDistributedManager(fs, s.distributed.skillStore, userID), nil
}
return fs, nil
}
// --- ForUser Jobs ---
// JobServiceForUser returns the agent job service for a user.
func (s *AgentPoolService) JobServiceForUser(userID string) (*AgentJobService, error) {
if s.users.userServices == nil || userID == "" {
return nil, fmt.Errorf("no user services manager or empty user ID")
}
return s.users.userServices.GetJobs(userID)
}
// --- Actions ---
// ListAvailableActions returns the list of all available action type names.
// In distributed mode, returns an empty list (actions are configured as MCP tools per agent).
func (s *AgentPoolService) ListAvailableActions() []string {
return s.configBackend.ListAvailableActions()
}
// GetActionDefinition creates an action instance by name with the given config and returns its definition.
func (s *AgentPoolService) GetActionDefinition(actionName string, actionConfig map[string]string) (any, error) {
if actionConfig == nil {
actionConfig = map[string]string{}
}
a, err := agiServices.Action(actionName, "", actionConfig, s.localAGI.pool, s.localAGI.actionsConfig)
if err != nil {
return nil, err
}
return a.Definition(), nil
}
// ExecuteAction creates an action instance and runs it with the given params.
func (s *AgentPoolService) ExecuteAction(ctx context.Context, actionName string, actionConfig map[string]string, params coreTypes.ActionParams) (coreTypes.ActionResult, error) {
if actionConfig == nil {
actionConfig = map[string]string{}
}
a, err := agiServices.Action(actionName, "", actionConfig, s.localAGI.pool, s.localAGI.actionsConfig)
if err != nil {
return coreTypes.ActionResult{}, err
}
return a.Run(ctx, s.localAGI.sharedState, params)
}
// loadSkillsForUser loads full skill info (name, description, content) for a user.
// Used by dispatchChat and the scheduler to enrich NATS events.
func (s *AgentPoolService) loadSkillsForUser(userID string) ([]agents.SkillInfo, error) {
mgr, err := s.SkillManagerForUser(userID)
if err != nil {
return nil, err
}
allSkills, err := mgr.List()
if err != nil {
return nil, err
}
var skills []agents.SkillInfo
for _, sk := range allSkills {
desc := ""
if sk.Metadata != nil && sk.Metadata.Description != "" {
desc = sk.Metadata.Description
}
if desc == "" {
d := sk.Content
if len(d) > 200 {
d = d[:200] + "..."
}
desc = d
}
skills = append(skills, agents.SkillInfo{
Name: sk.Name,
Description: desc,
Content: sk.Content,
})
}
return skills, nil
}
// buildSkillProvider returns a SkillContentProvider closure for the scheduler.
func (s *AgentPoolService) buildSkillProvider() agents.SkillContentProvider {
return func(userID string) ([]agents.SkillInfo, error) {
return s.loadSkillsForUser(userID)
}
}