mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-11 02:07:27 -04:00
* dev knowledge.go structure Signed-off-by: Pete Chen <petechentw@gmail.com> * feat(agents): append KB source citations to responses Render structured KB citations as a Sources block after agent responses, linking each source to the existing raw collection entry endpoint. Keep long-term memory writes on the original model response so citation blocks do not get stored back into the knowledge base. Tested with: go test ./core/services/agents Assisted-by: Codex:gpt-5 Signed-off-by: Pete Chen <petechentw@gmail.com> * Collect KB citations from tool searches Signed-off-by: Pete Chen <petechentw@gmail.com> * fix(agents): append KB sources in local chats Apply the shared KB citation post-processing to standalone LocalAGI chat responses so the React agent chat receives the same clickable Sources block as the native executor path. Also fix the run target to use the current cmd/local-ai entrypoint. Assisted-by: Codex:gpt-5 Signed-off-by: Pete Chen <petechentw@gmail.com> --------- Signed-off-by: Pete Chen <petechentw@gmail.com> Co-authored-by: shihyunhuang <shihyunhuang88@gmail.com> Co-authored-by: TLoE419 <tloemizuchizu@gmail.com> Co-authored-by: Ching Kao <0980124jim@gmail.com>
1172 lines
38 KiB
Go
1172 lines
38 KiB
Go
package agentpool
|
|
|
|
import (
|
|
"cmp"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mudler/LocalAI/core/config"
|
|
"github.com/mudler/LocalAI/core/http/auth"
|
|
"github.com/mudler/LocalAI/core/services/agents"
|
|
"github.com/mudler/LocalAI/core/services/distributed"
|
|
"github.com/mudler/LocalAI/core/services/messaging"
|
|
skillsManager "github.com/mudler/LocalAI/core/services/skills"
|
|
|
|
"github.com/mudler/LocalAGI/core/agent"
|
|
"github.com/mudler/LocalAGI/core/sse"
|
|
"github.com/mudler/LocalAGI/core/state"
|
|
coreTypes "github.com/mudler/LocalAGI/core/types"
|
|
agiServices "github.com/mudler/LocalAGI/services"
|
|
"github.com/mudler/LocalAGI/services/skills"
|
|
"github.com/mudler/LocalAGI/webui/collections"
|
|
"github.com/mudler/xlog"
|
|
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// localAGICore manages the in-process LocalAGI agent pool (standalone mode only).
|
|
type localAGICore struct {
|
|
pool *state.AgentPool
|
|
skillsService *skills.Service
|
|
configMeta state.AgentConfigMeta
|
|
sharedState *coreTypes.AgentSharedState
|
|
actionsConfig map[string]string
|
|
}
|
|
|
|
// distributedBridge connects to the NATS-based distributed agent system.
|
|
type distributedBridge struct {
|
|
natsClient messaging.Publisher // NATS client for distributed agent execution
|
|
agentStore *agents.AgentStore // PostgreSQL agent config store
|
|
eventBridge AgentEventBridge // Event bridge for SSE + persistence
|
|
skillStore *distributed.SkillStore // PostgreSQL skill metadata (distributed mode)
|
|
dispatcher agents.Dispatcher // Native dispatcher (distributed or local)
|
|
}
|
|
|
|
// userManager handles per-user services, storage, and auth.
|
|
type userManager struct {
|
|
userServices *UserServicesManager
|
|
userStorage *UserScopedStorage
|
|
authDB *gorm.DB
|
|
}
|
|
|
|
// AgentPoolService wraps LocalAGI's AgentPool, Skills service, and collections backend
|
|
// to provide agentic capabilities integrated directly into LocalAI.
|
|
type AgentPoolService struct {
|
|
appConfig *config.ApplicationConfig
|
|
collectionsBackend collections.Backend
|
|
configBackend AgentConfigBackend // Abstracts local vs distributed agent operations
|
|
localAGI localAGICore
|
|
distributed distributedBridge
|
|
users userManager
|
|
stateDir string
|
|
outputsDir string
|
|
apiURL string // Resolved API URL for agent execution
|
|
apiKey string // Resolved API key for agent execution
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// AgentEventBridge is the interface for event publishing needed by AgentPoolService.
|
|
type AgentEventBridge interface {
|
|
PublishMessage(agentName, userID, sender, content, messageID string) error
|
|
PublishStatus(agentName, userID, status string) error
|
|
PublishStreamEvent(agentName, userID string, data map[string]any) error
|
|
RegisterCancel(key string, cancel context.CancelFunc)
|
|
DeregisterCancel(key string)
|
|
}
|
|
|
|
// AgentConfigStore is the interface for agent config persistence.
|
|
type AgentConfigStore interface {
|
|
SaveConfig(cfg *agents.AgentConfigRecord) error
|
|
GetConfig(userID, name string) (*agents.AgentConfigRecord, error)
|
|
ListConfigs(userID string) ([]agents.AgentConfigRecord, error)
|
|
DeleteConfig(userID, name string) error
|
|
UpdateStatus(userID, name, status string) error
|
|
UpdateLastRun(userID, name string) error
|
|
}
|
|
|
|
// AgentPoolOptions holds optional dependencies for AgentPoolService.
|
|
// Zero values are fine — the service degrades gracefully without them.
|
|
type AgentPoolOptions struct {
|
|
AuthDB *gorm.DB
|
|
SkillStore *distributed.SkillStore
|
|
NATSClient messaging.Publisher
|
|
EventBridge AgentEventBridge
|
|
AgentStore *agents.AgentStore
|
|
}
|
|
|
|
func NewAgentPoolService(appConfig *config.ApplicationConfig, opts ...AgentPoolOptions) (*AgentPoolService, error) {
|
|
svc := &AgentPoolService{
|
|
appConfig: appConfig,
|
|
}
|
|
if len(opts) > 0 {
|
|
o := opts[0]
|
|
if o.AuthDB != nil {
|
|
svc.users.authDB = o.AuthDB
|
|
}
|
|
if o.SkillStore != nil {
|
|
svc.distributed.skillStore = o.SkillStore
|
|
}
|
|
if o.NATSClient != nil {
|
|
svc.distributed.natsClient = o.NATSClient
|
|
}
|
|
if o.EventBridge != nil {
|
|
svc.distributed.eventBridge = o.EventBridge
|
|
}
|
|
if o.AgentStore != nil {
|
|
svc.distributed.agentStore = o.AgentStore
|
|
}
|
|
}
|
|
return svc, nil
|
|
}
|
|
|
|
func (s *AgentPoolService) Start(ctx context.Context) error {
|
|
cfg := s.appConfig.AgentPool
|
|
|
|
// API URL: use configured value, or derive self-referencing URL from LocalAI's address
|
|
apiURL := cfg.APIURL
|
|
if apiURL == "" {
|
|
_, port, err := net.SplitHostPort(s.appConfig.APIAddress)
|
|
if err != nil {
|
|
port = strings.TrimPrefix(s.appConfig.APIAddress, ":")
|
|
}
|
|
apiURL = "http://127.0.0.1:" + port
|
|
}
|
|
apiKey := cfg.APIKey
|
|
if apiKey == "" && len(s.appConfig.ApiKeys) > 0 {
|
|
apiKey = s.appConfig.ApiKeys[0]
|
|
}
|
|
|
|
s.apiURL = apiURL
|
|
s.apiKey = apiKey
|
|
|
|
// Distributed mode: use native executor + NATSDispatcher.
|
|
// No LocalAGI pool, no collections, no skills service — all stateless.
|
|
if s.distributed.natsClient != nil {
|
|
return s.startDistributed(ctx, apiURL, apiKey)
|
|
}
|
|
|
|
// Standalone mode: use LocalAGI pool (backward compat)
|
|
return s.startLocalAGI(ctx, cfg, apiURL, apiKey)
|
|
}
|
|
|
|
func (s *AgentPoolService) buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets string) *collections.Config {
|
|
cfg := s.appConfig.AgentPool
|
|
return &collections.Config{
|
|
LLMAPIURL: apiURL,
|
|
LLMAPIKey: apiKey,
|
|
LLMModel: cfg.DefaultModel,
|
|
CollectionDBPath: collectionDBPath,
|
|
FileAssets: fileAssets,
|
|
VectorEngine: cfg.VectorEngine,
|
|
EmbeddingModel: cfg.EmbeddingModel,
|
|
MaxChunkingSize: cfg.MaxChunkingSize,
|
|
ChunkOverlap: cfg.ChunkOverlap,
|
|
DatabaseURL: cfg.DatabaseURL,
|
|
}
|
|
}
|
|
|
|
// startDistributed initializes the native agent executor with NATS dispatcher.
|
|
// No LocalAGI pool is created — agent execution is stateless.
|
|
// Skills and collections are still initialized for the frontend UI.
|
|
func (s *AgentPoolService) startDistributed(ctx context.Context, apiURL, apiKey string) error {
|
|
cfg := s.appConfig.AgentPool
|
|
|
|
// State dir for skills and outputs
|
|
stateDir := cmp.Or(cfg.StateDir, s.appConfig.DataPath, s.appConfig.DynamicConfigsDir, "agents")
|
|
if err := os.MkdirAll(stateDir, 0750); err != nil {
|
|
xlog.Warn("Failed to create agent state dir", "error", err)
|
|
}
|
|
s.stateDir = stateDir
|
|
|
|
// Outputs directory
|
|
outputsDir := filepath.Join(stateDir, "outputs")
|
|
if err := os.MkdirAll(outputsDir, 0750); err != nil {
|
|
xlog.Warn("Failed to create outputs directory", "error", err)
|
|
}
|
|
s.outputsDir = outputsDir
|
|
|
|
// Skills service — same as standalone, filesystem-based
|
|
skillsSvc, err := skills.NewService(stateDir)
|
|
if err != nil {
|
|
xlog.Warn("Failed to create skills service in distributed mode", "error", err)
|
|
} else {
|
|
s.localAGI.skillsService = skillsSvc
|
|
}
|
|
|
|
// Collections backend — same as standalone, in-process
|
|
collectionDBPath := cfg.CollectionDBPath
|
|
if collectionDBPath == "" {
|
|
collectionDBPath = filepath.Join(stateDir, "collections")
|
|
}
|
|
fileAssets := filepath.Join(stateDir, "assets")
|
|
|
|
collectionsBackend, _ := collections.NewInProcessBackend(s.buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets))
|
|
s.collectionsBackend = collectionsBackend
|
|
|
|
// User-scoped storage
|
|
dataDir := cmp.Or(s.appConfig.DataPath, s.appConfig.DynamicConfigsDir)
|
|
s.users.userStorage = NewUserScopedStorage(stateDir, dataDir)
|
|
|
|
// Start the background agent scheduler on the frontend.
|
|
// It needs DB access to list configs and update LastRunAt — the worker doesn't have DB.
|
|
// The advisory lock ensures only one frontend instance runs the scheduler.
|
|
if s.users.authDB != nil && s.distributed.natsClient != nil && s.distributed.agentStore != nil {
|
|
var schedulerOpts []agents.AgentSchedulerOpt
|
|
if s.distributed.skillStore != nil {
|
|
schedulerOpts = append(schedulerOpts, agents.WithSchedulerSkillProvider(s.buildSkillProvider()))
|
|
}
|
|
scheduler := agents.NewAgentScheduler(
|
|
s.users.authDB,
|
|
s.distributed.natsClient,
|
|
s.distributed.agentStore,
|
|
messaging.SubjectAgentExecute,
|
|
schedulerOpts...,
|
|
)
|
|
go scheduler.Start(ctx)
|
|
}
|
|
|
|
// Wire the distributed config backend
|
|
s.configBackend = newDistributedAgentConfigBackend(s, s.distributed.agentStore)
|
|
|
|
xlog.Info("Agent pool started in distributed mode (frontend dispatcher only)", "apiURL", apiURL, "stateDir", stateDir)
|
|
return nil
|
|
}
|
|
|
|
// startLocalAGI initializes the full LocalAGI pool for standalone mode.
|
|
func (s *AgentPoolService) startLocalAGI(_ context.Context, cfg config.AgentPoolConfig, apiURL, apiKey string) error {
|
|
// State dir: explicit config > DataPath > DynamicConfigsDir > fallback
|
|
stateDir := cmp.Or(cfg.StateDir, s.appConfig.DataPath, s.appConfig.DynamicConfigsDir, "agents")
|
|
if err := os.MkdirAll(stateDir, 0750); err != nil {
|
|
return fmt.Errorf("failed to create agent pool state dir: %w", err)
|
|
}
|
|
|
|
// Collections paths
|
|
collectionDBPath := cfg.CollectionDBPath
|
|
if collectionDBPath == "" {
|
|
collectionDBPath = filepath.Join(stateDir, "collections")
|
|
}
|
|
fileAssets := filepath.Join(stateDir, "assets")
|
|
|
|
// Skills service
|
|
skillsSvc, err := skills.NewService(stateDir)
|
|
if err != nil {
|
|
xlog.Error("Failed to create skills service", "error", err)
|
|
}
|
|
s.localAGI.skillsService = skillsSvc
|
|
|
|
// Actions config map
|
|
actionsConfig := map[string]string{
|
|
agiServices.ConfigStateDir: stateDir,
|
|
}
|
|
if cfg.CustomActionsDir != "" {
|
|
actionsConfig[agiServices.CustomActionsDir] = cfg.CustomActionsDir
|
|
}
|
|
|
|
// Create outputs subdirectory
|
|
outputsDir := filepath.Join(stateDir, "outputs")
|
|
if err := os.MkdirAll(outputsDir, 0750); err != nil {
|
|
xlog.Error("Failed to create outputs directory", "path", outputsDir, "error", err)
|
|
}
|
|
|
|
s.localAGI.actionsConfig = actionsConfig
|
|
s.stateDir = stateDir
|
|
s.outputsDir = outputsDir
|
|
s.localAGI.sharedState = coreTypes.NewAgentSharedState(5 * time.Minute)
|
|
|
|
// Initialize user-scoped storage
|
|
dataDir := cmp.Or(s.appConfig.DataPath, s.appConfig.DynamicConfigsDir)
|
|
s.users.userStorage = NewUserScopedStorage(stateDir, dataDir)
|
|
|
|
// Create the agent pool
|
|
pool, err := state.NewAgentPool(
|
|
cfg.DefaultModel,
|
|
cfg.MultimodalModel,
|
|
cfg.TranscriptionModel,
|
|
cfg.TranscriptionLanguage,
|
|
cfg.TTSModel,
|
|
apiURL,
|
|
apiKey,
|
|
stateDir,
|
|
agiServices.Actions(actionsConfig),
|
|
agiServices.Connectors,
|
|
agiServices.DynamicPrompts(actionsConfig),
|
|
agiServices.Filters,
|
|
cfg.Timeout,
|
|
cfg.EnableLogs,
|
|
skillsSvc,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create agent pool: %w", err)
|
|
}
|
|
s.localAGI.pool = pool
|
|
|
|
// Create in-process collections backend and RAG provider
|
|
collectionsCfg := s.buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets)
|
|
collectionsBackend, collectionsState := collections.NewInProcessBackend(collectionsCfg)
|
|
s.collectionsBackend = collectionsBackend
|
|
|
|
embedded := collections.RAGProviderFromState(collectionsState)
|
|
pool.SetRAGProvider(func(collectionName, _, _ string) (agent.RAGDB, state.KBCompactionClient, bool) {
|
|
return embedded(collectionName)
|
|
})
|
|
|
|
// Build config metadata for UI
|
|
s.localAGI.configMeta = state.NewAgentConfigMeta(
|
|
agiServices.ActionsConfigMeta(cfg.CustomActionsDir),
|
|
agiServices.ConnectorsConfigMeta(),
|
|
agiServices.DynamicPromptsConfigMeta(cfg.CustomActionsDir),
|
|
agiServices.FiltersConfigMeta(),
|
|
)
|
|
|
|
// Start all agents
|
|
if err := pool.StartAll(); err != nil {
|
|
xlog.Error("Failed to start agent pool", "error", err)
|
|
}
|
|
|
|
// Wire the local config backend
|
|
s.configBackend = newLocalAgentConfigBackend(s)
|
|
|
|
xlog.Info("Agent pool started (standalone/LocalAGI mode)", "stateDir", stateDir, "apiURL", apiURL)
|
|
return nil
|
|
}
|
|
|
|
func (s *AgentPoolService) Stop() {
|
|
if s.configBackend != nil {
|
|
s.configBackend.Stop()
|
|
}
|
|
}
|
|
|
|
// ConfigBackend returns the underlying AgentConfigBackend.
|
|
func (s *AgentPoolService) ConfigBackend() AgentConfigBackend {
|
|
return s.configBackend
|
|
}
|
|
|
|
// APIURL returns the resolved API URL for agent execution.
|
|
func (s *AgentPoolService) APIURL() string {
|
|
return s.apiURL
|
|
}
|
|
|
|
// APIKey returns the resolved API key for agent execution.
|
|
func (s *AgentPoolService) APIKey() string {
|
|
return s.apiKey
|
|
}
|
|
|
|
// Pool returns the underlying AgentPool.
|
|
func (s *AgentPoolService) Pool() *state.AgentPool {
|
|
return s.localAGI.pool
|
|
}
|
|
|
|
// SetNATSClient sets the NATS client for distributed agent execution.
|
|
// Deprecated: prefer passing NATSClient via AgentPoolOptions at construction time.
|
|
func (s *AgentPoolService) SetNATSClient(nc messaging.Publisher) {
|
|
s.distributed.natsClient = nc
|
|
}
|
|
|
|
// SetEventBridge sets the event bridge for distributed SSE + persistence.
|
|
// Deprecated: prefer passing EventBridge via AgentPoolOptions at construction time.
|
|
func (s *AgentPoolService) SetEventBridge(eb AgentEventBridge) {
|
|
s.distributed.eventBridge = eb
|
|
}
|
|
|
|
// SetAgentStore sets the PostgreSQL agent config store.
|
|
// Deprecated: prefer passing AgentStore via AgentPoolOptions at construction time.
|
|
func (s *AgentPoolService) SetAgentStore(store *agents.AgentStore) {
|
|
s.distributed.agentStore = store
|
|
}
|
|
|
|
// Agent execution in distributed mode is handled by the dedicated agent-worker process
|
|
// using the NATSDispatcher from core/services/agents/dispatcher.go.
|
|
// The frontend only dispatches chat events to NATS via dispatchChat().
|
|
|
|
// --- Agent CRUD ---
|
|
|
|
func (s *AgentPoolService) GetAgent(name string) *agent.Agent {
|
|
// GetAgent is used by the responses interceptor to check if a model name
|
|
// is an agent. It uses the raw pool key (no userID prefix).
|
|
return s.configBackend.GetAgent("", name)
|
|
}
|
|
|
|
// Chat sends a message to an agent and returns immediately. Responses come via SSE.
|
|
func (s *AgentPoolService) Chat(name, message string) (string, error) {
|
|
ag := s.localAGI.pool.GetAgent(name)
|
|
if ag == nil {
|
|
return "", fmt.Errorf("%w: %s", ErrAgentNotFound, name)
|
|
}
|
|
manager := s.localAGI.pool.GetManager(name)
|
|
if manager == nil {
|
|
return "", fmt.Errorf("SSE manager not found for agent: %s", name)
|
|
}
|
|
|
|
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
|
|
|
|
// Send user message via SSE
|
|
userMsg, _ := json.Marshal(map[string]any{
|
|
"id": messageID + "-user",
|
|
"sender": "user",
|
|
"content": message,
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
})
|
|
manager.Send(sse.NewMessage(string(userMsg)).WithEvent("json_message"))
|
|
|
|
// Send processing status
|
|
statusMsg, _ := json.Marshal(map[string]any{
|
|
"status": "processing",
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
})
|
|
manager.Send(sse.NewMessage(string(statusMsg)).WithEvent("json_message_status"))
|
|
|
|
// Process asynchronously
|
|
go func() {
|
|
response := ag.Ask(coreTypes.WithText(message))
|
|
|
|
if response == nil {
|
|
errMsg, _ := json.Marshal(map[string]any{
|
|
"error": "agent request failed or was cancelled",
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
})
|
|
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
|
|
} else if response.Error != nil {
|
|
errMsg, _ := json.Marshal(map[string]any{
|
|
"error": response.Error.Error(),
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
})
|
|
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
|
|
} else {
|
|
// Collect metadata from all action states
|
|
metadata := map[string]any{}
|
|
for _, state := range response.State {
|
|
for k, v := range state.Metadata {
|
|
if existing, ok := metadata[k]; ok {
|
|
if existList, ok := existing.([]string); ok {
|
|
if newList, ok := v.([]string); ok {
|
|
metadata[k] = append(existList, newList...)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
metadata[k] = v
|
|
}
|
|
}
|
|
|
|
if len(metadata) > 0 {
|
|
// Extract userID from the agent key (format: "userID:agentName")
|
|
var chatUserID string
|
|
if uid, _, ok := strings.Cut(name, ":"); ok {
|
|
chatUserID = uid
|
|
}
|
|
s.collectAndCopyMetadata(metadata, chatUserID)
|
|
}
|
|
|
|
content := s.appendLocalAGIKBCitations(response.Response, name, message, response.State)
|
|
msg := map[string]any{
|
|
"id": messageID + "-agent",
|
|
"sender": "agent",
|
|
"content": content,
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
}
|
|
if len(metadata) > 0 {
|
|
msg["metadata"] = metadata
|
|
}
|
|
respMsg, _ := json.Marshal(msg)
|
|
manager.Send(sse.NewMessage(string(respMsg)).WithEvent("json_message"))
|
|
}
|
|
|
|
completedMsg, _ := json.Marshal(map[string]any{
|
|
"status": "completed",
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
})
|
|
manager.Send(sse.NewMessage(string(completedMsg)).WithEvent("json_message_status"))
|
|
}()
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
func (s *AgentPoolService) appendLocalAGIKBCitations(response, agentKey, message string, states []coreTypes.ActionState) string {
|
|
if strings.TrimSpace(response) == "" {
|
|
return response
|
|
}
|
|
|
|
userID, collection := splitAgentKey(agentKey)
|
|
cfg := s.localAGI.pool.GetConfig(agentKey)
|
|
if cfg == nil || !cfg.EnableKnowledgeBase {
|
|
return response
|
|
}
|
|
|
|
citations := kbCitationsFromActionStates(states)
|
|
if len(citations) == 0 && cfg.KBAutoSearch {
|
|
maxResults := cfg.KnowledgeBaseResults
|
|
if maxResults <= 0 {
|
|
maxResults = 5
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
kbResult := agents.KBAutoSearchPrompt(ctx, s.apiURL, s.apiKey, collection, message, maxResults, userID)
|
|
citations = kbResult.Citations
|
|
}
|
|
|
|
return agents.AppendKBCitations(response, collection, userID, citations)
|
|
}
|
|
|
|
func splitAgentKey(agentKey string) (userID, name string) {
|
|
if uid, n, ok := strings.Cut(agentKey, ":"); ok {
|
|
return uid, n
|
|
}
|
|
return "", agentKey
|
|
}
|
|
|
|
func kbCitationsFromActionStates(states []coreTypes.ActionState) []agents.KBCitation {
|
|
var citations []agents.KBCitation
|
|
for _, state := range states {
|
|
citations = append(citations, kbCitationsFromMetadata(state.Metadata)...)
|
|
}
|
|
return citations
|
|
}
|
|
|
|
func kbCitationsFromMetadata(metadata map[string]any) []agents.KBCitation {
|
|
if len(metadata) == 0 {
|
|
return nil
|
|
}
|
|
|
|
fileName := metadata["file_name"]
|
|
source := metadata["source"]
|
|
if fileName == nil && source == nil {
|
|
return nil
|
|
}
|
|
|
|
citation := agents.KBCitation{
|
|
FileName: metadataString(fileName),
|
|
EntryKey: metadataString(source),
|
|
}
|
|
if citation.FileName == "" && citation.EntryKey == "" {
|
|
return nil
|
|
}
|
|
return []agents.KBCitation{citation}
|
|
}
|
|
|
|
func metadataString(value any) string {
|
|
switch v := value.(type) {
|
|
case string:
|
|
return v
|
|
case fmt.Stringer:
|
|
return v.String()
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
// userOutputsDir returns the per-user outputs directory, creating it if needed.
|
|
// If userID is empty, falls back to the shared outputs directory.
|
|
func (s *AgentPoolService) userOutputsDir(userID string) string {
|
|
if userID == "" {
|
|
return s.outputsDir
|
|
}
|
|
dir := filepath.Join(s.outputsDir, userID)
|
|
os.MkdirAll(dir, 0750)
|
|
return dir
|
|
}
|
|
|
|
// copyToOutputs copies a file into the per-user outputs directory and returns the new path.
|
|
// If the file is already inside the target dir, it returns the original path unchanged.
|
|
func (s *AgentPoolService) copyToOutputs(srcPath, userID string) (string, error) {
|
|
targetDir := s.userOutputsDir(userID)
|
|
srcClean := filepath.Clean(srcPath)
|
|
absTarget, _ := filepath.Abs(targetDir)
|
|
absSrc, _ := filepath.Abs(srcClean)
|
|
if strings.HasPrefix(absSrc, absTarget+string(os.PathSeparator)) {
|
|
return srcPath, nil
|
|
}
|
|
|
|
src, err := os.Open(srcClean)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer src.Close()
|
|
|
|
dstPath := filepath.Join(targetDir, filepath.Base(srcClean))
|
|
dst, err := os.Create(dstPath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer dst.Close()
|
|
|
|
if _, err := io.Copy(dst, src); err != nil {
|
|
return "", err
|
|
}
|
|
return dstPath, nil
|
|
}
|
|
|
|
// collectAndCopyMetadata iterates all metadata keys and, for any value that is
|
|
// a []string of local file paths, copies those files into the per-user outputs
|
|
// directory so the file endpoint can serve them from a single confined location.
|
|
// Entries that are URLs (http/https) are left unchanged.
|
|
func (s *AgentPoolService) collectAndCopyMetadata(metadata map[string]any, userID string) {
|
|
for key, val := range metadata {
|
|
list, ok := val.([]string)
|
|
if !ok {
|
|
continue
|
|
}
|
|
updated := make([]string, 0, len(list))
|
|
for _, p := range list {
|
|
if strings.HasPrefix(p, "http://") || strings.HasPrefix(p, "https://") {
|
|
updated = append(updated, p)
|
|
continue
|
|
}
|
|
newPath, err := s.copyToOutputs(p, userID)
|
|
if err != nil {
|
|
xlog.Error("Failed to copy file to outputs", "src", p, "error", err)
|
|
updated = append(updated, p)
|
|
continue
|
|
}
|
|
updated = append(updated, newPath)
|
|
}
|
|
metadata[key] = updated
|
|
}
|
|
}
|
|
|
|
func (s *AgentPoolService) GetConfigMeta() state.AgentConfigMeta {
|
|
return s.localAGI.configMeta
|
|
}
|
|
|
|
// GetConfigMetaResult returns the config metadata via the backend, which handles
|
|
// local vs distributed differences (LocalAGI metadata vs native static metadata).
|
|
func (s *AgentPoolService) GetConfigMetaResult() AgentConfigMetaResult {
|
|
return s.configBackend.GetConfigMeta()
|
|
}
|
|
|
|
func (s *AgentPoolService) AgentHubURL() string {
|
|
return s.appConfig.AgentPool.AgentHubURL
|
|
}
|
|
|
|
func (s *AgentPoolService) StateDir() string {
|
|
return s.stateDir
|
|
}
|
|
|
|
func (s *AgentPoolService) OutputsDir() string {
|
|
return s.outputsDir
|
|
}
|
|
|
|
// ExportAgent returns the agent config as JSON bytes.
|
|
func (s *AgentPoolService) ExportAgent(name string) ([]byte, error) {
|
|
// Extract userID and agent name from the key (format: "userID:agentName")
|
|
userID := ""
|
|
agentName := name
|
|
if u, a, ok := strings.Cut(name, ":"); ok {
|
|
userID = u
|
|
agentName = a
|
|
}
|
|
return s.configBackend.ExportConfig(userID, agentName)
|
|
}
|
|
|
|
// --- User Services ---
|
|
|
|
// SetUserServicesManager sets the user services manager for per-user scoping.
|
|
func (s *AgentPoolService) SetUserServicesManager(usm *UserServicesManager) {
|
|
s.users.userServices = usm
|
|
}
|
|
|
|
// UserStorage returns the user-scoped storage.
|
|
func (s *AgentPoolService) UserStorage() *UserScopedStorage {
|
|
return s.users.userStorage
|
|
}
|
|
|
|
// UserServicesManager returns the user services manager.
|
|
func (s *AgentPoolService) UserServicesManager() *UserServicesManager {
|
|
return s.users.userServices
|
|
}
|
|
|
|
// SetAuthDB sets the auth database for API key generation.
|
|
// Deprecated: prefer passing AuthDB via AgentPoolOptions at construction time.
|
|
func (s *AgentPoolService) SetAuthDB(db *gorm.DB) {
|
|
s.users.authDB = db
|
|
}
|
|
|
|
// SetSkillStore sets the distributed skill store for persisting skill metadata to PostgreSQL.
|
|
// Deprecated: prefer passing SkillStore via AgentPoolOptions at construction time.
|
|
func (s *AgentPoolService) SetSkillStore(store *distributed.SkillStore) {
|
|
s.distributed.skillStore = store
|
|
}
|
|
|
|
// --- Admin Aggregation ---
|
|
|
|
// UserAgentInfo holds agent info for cross-user listing.
|
|
type UserAgentInfo struct {
|
|
Name string `json:"name"`
|
|
Active bool `json:"active"`
|
|
}
|
|
|
|
// ListAllAgentsGrouped returns all agents grouped by user ID.
|
|
// Keys without ":" go into the "" (root) group.
|
|
func (s *AgentPoolService) ListAllAgentsGrouped() map[string][]UserAgentInfo {
|
|
return s.configBackend.ListAllGrouped()
|
|
}
|
|
|
|
// --- ForUser Collections ---
|
|
|
|
// ListCollectionsForUser lists collections for a specific user.
|
|
func (s *AgentPoolService) ListCollectionsForUser(userID string) ([]string, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return backend.ListCollections()
|
|
}
|
|
|
|
// CreateCollectionForUser creates a collection for a specific user.
|
|
func (s *AgentPoolService) CreateCollectionForUser(userID, name string) error {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return backend.CreateCollection(name)
|
|
}
|
|
|
|
// ensureCollectionForUser creates a collection for the user if it doesn't already exist.
|
|
func (s *AgentPoolService) ensureCollectionForUser(userID, name string) error {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
collections, err := backend.ListCollections()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if slices.Contains(collections, name) {
|
|
return nil
|
|
}
|
|
return backend.CreateCollection(name)
|
|
}
|
|
|
|
// UploadToCollectionForUser uploads to a collection for a specific user.
|
|
// The filename arrives from a multipart upload; the vendored backend may or
|
|
// may not sanitise it, so strip any directory components at the boundary.
|
|
func (s *AgentPoolService) UploadToCollectionForUser(userID, collection, filename string, fileBody io.Reader) (string, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
base := filepath.Base(filename)
|
|
if base == "." || base == ".." || base == "/" || base == "" {
|
|
return "", fmt.Errorf("invalid filename")
|
|
}
|
|
return backend.Upload(collection, base, fileBody)
|
|
}
|
|
|
|
// CollectionEntryExistsForUser checks if an entry exists in a user's collection.
|
|
func (s *AgentPoolService) CollectionEntryExistsForUser(userID, collection, entry string) bool {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return backend.EntryExists(collection, entry)
|
|
}
|
|
|
|
// ListCollectionEntriesForUser lists entries in a user's collection.
|
|
func (s *AgentPoolService) ListCollectionEntriesForUser(userID, collection string) ([]string, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return backend.ListEntries(collection)
|
|
}
|
|
|
|
// GetCollectionEntryContentForUser gets entry content for a user's collection.
|
|
func (s *AgentPoolService) GetCollectionEntryContentForUser(userID, collection, entry string) (string, int, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return "", 0, err
|
|
}
|
|
return backend.GetEntryContent(collection, entry)
|
|
}
|
|
|
|
// SearchCollectionForUser searches a user's collection.
|
|
func (s *AgentPoolService) SearchCollectionForUser(userID, collection, query string, maxResults int) ([]collections.SearchResult, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return backend.Search(collection, query, maxResults)
|
|
}
|
|
|
|
// ResetCollectionForUser resets a user's collection.
|
|
func (s *AgentPoolService) ResetCollectionForUser(userID, collection string) error {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return backend.Reset(collection)
|
|
}
|
|
|
|
// DeleteCollectionEntryForUser deletes an entry from a user's collection.
|
|
func (s *AgentPoolService) DeleteCollectionEntryForUser(userID, collection, entry string) ([]string, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return backend.DeleteEntry(collection, entry)
|
|
}
|
|
|
|
// AddCollectionSourceForUser adds a source to a user's collection.
|
|
func (s *AgentPoolService) AddCollectionSourceForUser(userID, collection, sourceURL string, intervalMin int) error {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return backend.AddSource(collection, sourceURL, intervalMin)
|
|
}
|
|
|
|
// RemoveCollectionSourceForUser removes a source from a user's collection.
|
|
func (s *AgentPoolService) RemoveCollectionSourceForUser(userID, collection, sourceURL string) error {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return backend.RemoveSource(collection, sourceURL)
|
|
}
|
|
|
|
// ListCollectionSourcesForUser lists sources for a user's collection.
|
|
func (s *AgentPoolService) ListCollectionSourcesForUser(userID, collection string) ([]collections.SourceInfo, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return backend.ListSources(collection)
|
|
}
|
|
|
|
// GetCollectionEntryFilePathForUser gets the file path for an entry in a user's collection.
|
|
func (s *AgentPoolService) GetCollectionEntryFilePathForUser(userID, collection, entry string) (string, error) {
|
|
backend, err := s.CollectionsBackendForUser(userID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return backend.GetEntryFilePath(collection, entry)
|
|
}
|
|
|
|
// --- ForUser Agent Methods ---
|
|
|
|
// ListAgentsForUser lists agents belonging to a specific user.
|
|
// If userID is empty, returns all agents (backward compat).
|
|
func (s *AgentPoolService) ListAgentsForUser(userID string) map[string]bool {
|
|
return s.configBackend.ListAgents(userID)
|
|
}
|
|
|
|
// CreateAgentForUser creates an agent namespaced to a user.
|
|
// When auth is enabled and the agent config has no API key, a new user API key
|
|
// is auto-generated so the agent can authenticate against LocalAI's own API.
|
|
func (s *AgentPoolService) CreateAgentForUser(userID string, config *state.AgentConfig) error {
|
|
if err := ValidateAgentName(config.Name); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Auto-generate a user API key when auth is active and none is specified
|
|
if s.users.authDB != nil && userID != "" && config.APIKey == "" {
|
|
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+config.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create API key for agent: %w", err)
|
|
}
|
|
config.APIKey = plaintext
|
|
xlog.Info("Auto-generated API key for agent", "agent", config.Name, "user", userID)
|
|
}
|
|
|
|
if err := s.configBackend.SaveConfig(userID, config); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Auto-create collection when knowledge base or long-term memory is enabled
|
|
if config.EnableKnowledgeBase || config.LongTermMemory {
|
|
if err := s.ensureCollectionForUser(userID, config.Name); err != nil {
|
|
xlog.Warn("Failed to auto-create collection for agent", "agent", config.Name, "error", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetAgentForUser returns the agent for a user.
|
|
// Returns nil in distributed mode where agents don't run in-process.
|
|
func (s *AgentPoolService) GetAgentForUser(userID, name string) *agent.Agent {
|
|
return s.configBackend.GetAgent(userID, name)
|
|
}
|
|
|
|
// GetAgentConfigForUser returns the agent config for a user's agent.
|
|
func (s *AgentPoolService) GetAgentConfigForUser(userID, name string) *state.AgentConfig {
|
|
return s.configBackend.GetConfig(userID, name)
|
|
}
|
|
|
|
// UpdateAgentForUser updates a user's agent.
|
|
func (s *AgentPoolService) UpdateAgentForUser(userID, name string, config *state.AgentConfig) error {
|
|
// Auto-generate a user API key when auth is active and none is specified
|
|
if s.users.authDB != nil && userID != "" && config.APIKey == "" {
|
|
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create API key for agent: %w", err)
|
|
}
|
|
config.APIKey = plaintext
|
|
}
|
|
|
|
if err := s.configBackend.UpdateConfig(userID, name, config); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Auto-create collection when knowledge base or long-term memory is enabled
|
|
if config.EnableKnowledgeBase || config.LongTermMemory {
|
|
if err := s.ensureCollectionForUser(userID, config.Name); err != nil {
|
|
xlog.Warn("Failed to auto-create collection for agent", "agent", config.Name, "error", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteAgentForUser deletes a user's agent.
|
|
func (s *AgentPoolService) DeleteAgentForUser(userID, name string) error {
|
|
return s.configBackend.DeleteConfig(userID, name)
|
|
}
|
|
|
|
// PauseAgentForUser pauses a user's agent.
|
|
func (s *AgentPoolService) PauseAgentForUser(userID, name string) error {
|
|
return s.configBackend.SetStatus(userID, name, "paused")
|
|
}
|
|
|
|
// ResumeAgentForUser resumes a user's agent.
|
|
func (s *AgentPoolService) ResumeAgentForUser(userID, name string) error {
|
|
return s.configBackend.SetStatus(userID, name, "active")
|
|
}
|
|
|
|
// GetAgentStatusForUser returns the status of a user's agent.
|
|
// Returns nil in distributed mode where status is not tracked in-process.
|
|
func (s *AgentPoolService) GetAgentStatusForUser(userID, name string) *state.Status {
|
|
return s.configBackend.GetStatus(userID, name)
|
|
}
|
|
|
|
// GetAgentObservablesForUser returns observables for a user's agent as raw JSON entries.
|
|
func (s *AgentPoolService) GetAgentObservablesForUser(userID, name string) ([]json.RawMessage, error) {
|
|
return s.configBackend.GetObservables(userID, name)
|
|
}
|
|
|
|
// ClearAgentObservablesForUser clears observables for a user's agent.
|
|
func (s *AgentPoolService) ClearAgentObservablesForUser(userID, name string) error {
|
|
return s.configBackend.ClearObservables(userID, name)
|
|
}
|
|
|
|
// ChatForUser sends a message to a user's agent.
|
|
func (s *AgentPoolService) ChatForUser(userID, name, message string) (string, error) {
|
|
return s.configBackend.Chat(userID, name, message)
|
|
}
|
|
|
|
// dispatchChat publishes a chat event to the NATS agent execution queue.
|
|
// The event is enriched with the full agent config and resolved skills so that
|
|
// the worker does not need direct database access.
|
|
func (s *AgentPoolService) dispatchChat(userID, name, message string) (string, error) {
|
|
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
|
|
|
|
// Send user message to SSE immediately so the UI shows it right away
|
|
if s.distributed.eventBridge != nil {
|
|
agentName := name
|
|
s.distributed.eventBridge.PublishMessage(agentName, userID, "user", message, messageID+"-user")
|
|
s.distributed.eventBridge.PublishStatus(agentName, userID, "processing")
|
|
}
|
|
|
|
// Load config from DB to embed in the NATS payload
|
|
var cfg *agents.AgentConfig
|
|
if s.distributed.agentStore != nil {
|
|
rec, err := s.distributed.agentStore.GetConfig(userID, name)
|
|
if err != nil {
|
|
return "", fmt.Errorf("agent config not found: %w", err)
|
|
}
|
|
var c agents.AgentConfig
|
|
if err := agents.ParseConfigJSON(rec.ConfigJSON, &c); err != nil {
|
|
return "", fmt.Errorf("invalid agent config: %w", err)
|
|
}
|
|
cfg = &c
|
|
}
|
|
|
|
// Load skills if enabled — uses SkillManager which reads from filesystem/PostgreSQL
|
|
var skills []agents.SkillInfo
|
|
if cfg != nil && cfg.EnableSkills {
|
|
if loaded, err := s.loadSkillsForUser(userID); err == nil {
|
|
skills = loaded
|
|
}
|
|
}
|
|
|
|
evt := agents.AgentChatEvent{
|
|
AgentName: name,
|
|
UserID: userID,
|
|
Message: message,
|
|
MessageID: messageID,
|
|
Role: "user",
|
|
Config: cfg,
|
|
Skills: skills,
|
|
}
|
|
if err := s.distributed.natsClient.Publish(messaging.SubjectAgentExecute, evt); err != nil {
|
|
return "", fmt.Errorf("failed to dispatch agent chat: %w", err)
|
|
}
|
|
return messageID, nil
|
|
}
|
|
|
|
// GetSSEManagerForUser returns the SSE manager for a user's agent.
|
|
// Returns nil in distributed mode where SSE is handled by EventBridge.
|
|
func (s *AgentPoolService) GetSSEManagerForUser(userID, name string) sse.Manager {
|
|
return s.configBackend.GetSSEManager(userID, name)
|
|
}
|
|
|
|
// ExportAgentForUser exports a user's agent config.
|
|
func (s *AgentPoolService) ExportAgentForUser(userID, name string) ([]byte, error) {
|
|
return s.ExportAgent(agents.AgentKey(userID, name))
|
|
}
|
|
|
|
// ImportAgentForUser imports an agent for a user.
|
|
func (s *AgentPoolService) ImportAgentForUser(userID string, data []byte) error {
|
|
var cfg state.AgentConfig
|
|
if err := json.Unmarshal(data, &cfg); err != nil {
|
|
return fmt.Errorf("invalid agent config: %w", err)
|
|
}
|
|
if err := ValidateAgentName(cfg.Name); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Auto-generate a user API key when auth is active and none is specified
|
|
if s.users.authDB != nil && userID != "" && cfg.APIKey == "" {
|
|
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+cfg.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create API key for agent: %w", err)
|
|
}
|
|
cfg.APIKey = plaintext
|
|
}
|
|
|
|
return s.configBackend.ImportConfig(userID, &cfg)
|
|
}
|
|
|
|
// --- ForUser Collections ---
|
|
|
|
// CollectionsBackendForUser returns the collections backend for a user.
|
|
func (s *AgentPoolService) CollectionsBackendForUser(userID string) (collections.Backend, error) {
|
|
if s.users.userServices == nil || userID == "" {
|
|
if s.collectionsBackend == nil {
|
|
return nil, fmt.Errorf("collections not available in distributed mode")
|
|
}
|
|
return s.collectionsBackend, nil
|
|
}
|
|
return s.users.userServices.GetCollections(userID)
|
|
}
|
|
|
|
// --- ForUser Skills ---
|
|
|
|
// SkillsServiceForUser returns the skills service for a user.
|
|
func (s *AgentPoolService) SkillsServiceForUser(userID string) (*skills.Service, error) {
|
|
if s.users.userServices == nil || userID == "" {
|
|
if s.localAGI.skillsService == nil {
|
|
return nil, fmt.Errorf("skills service not available")
|
|
}
|
|
return s.localAGI.skillsService, nil
|
|
}
|
|
return s.users.userServices.GetSkills(userID)
|
|
}
|
|
|
|
// SkillManagerForUser returns a SkillManager for a specific user.
|
|
// In distributed mode, returns a DistributedManager that syncs to PostgreSQL.
|
|
// In standalone mode, returns a FilesystemManager.
|
|
func (s *AgentPoolService) SkillManagerForUser(userID string) (skillsManager.Manager, error) {
|
|
svc, err := s.SkillsServiceForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fs := skillsManager.NewFilesystemManager(svc)
|
|
|
|
// In distributed mode, wrap with PostgreSQL sync
|
|
if s.distributed.skillStore != nil {
|
|
return skillsManager.NewDistributedManager(fs, s.distributed.skillStore, userID), nil
|
|
}
|
|
return fs, nil
|
|
}
|
|
|
|
// --- ForUser Jobs ---
|
|
|
|
// JobServiceForUser returns the agent job service for a user.
|
|
func (s *AgentPoolService) JobServiceForUser(userID string) (*AgentJobService, error) {
|
|
if s.users.userServices == nil || userID == "" {
|
|
return nil, fmt.Errorf("no user services manager or empty user ID")
|
|
}
|
|
return s.users.userServices.GetJobs(userID)
|
|
}
|
|
|
|
// --- Actions ---
|
|
|
|
// ListAvailableActions returns the list of all available action type names.
|
|
// In distributed mode, returns an empty list (actions are configured as MCP tools per agent).
|
|
func (s *AgentPoolService) ListAvailableActions() []string {
|
|
return s.configBackend.ListAvailableActions()
|
|
}
|
|
|
|
// GetActionDefinition creates an action instance by name with the given config and returns its definition.
|
|
func (s *AgentPoolService) GetActionDefinition(actionName string, actionConfig map[string]string) (any, error) {
|
|
if actionConfig == nil {
|
|
actionConfig = map[string]string{}
|
|
}
|
|
a, err := agiServices.Action(actionName, "", actionConfig, s.localAGI.pool, s.localAGI.actionsConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return a.Definition(), nil
|
|
}
|
|
|
|
// ExecuteAction creates an action instance and runs it with the given params.
|
|
func (s *AgentPoolService) ExecuteAction(ctx context.Context, actionName string, actionConfig map[string]string, params coreTypes.ActionParams) (coreTypes.ActionResult, error) {
|
|
if actionConfig == nil {
|
|
actionConfig = map[string]string{}
|
|
}
|
|
a, err := agiServices.Action(actionName, "", actionConfig, s.localAGI.pool, s.localAGI.actionsConfig)
|
|
if err != nil {
|
|
return coreTypes.ActionResult{}, err
|
|
}
|
|
return a.Run(ctx, s.localAGI.sharedState, params)
|
|
}
|
|
|
|
// loadSkillsForUser loads full skill info (name, description, content) for a user.
|
|
// Used by dispatchChat and the scheduler to enrich NATS events.
|
|
func (s *AgentPoolService) loadSkillsForUser(userID string) ([]agents.SkillInfo, error) {
|
|
mgr, err := s.SkillManagerForUser(userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
allSkills, err := mgr.List()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var skills []agents.SkillInfo
|
|
for _, sk := range allSkills {
|
|
desc := ""
|
|
if sk.Metadata != nil && sk.Metadata.Description != "" {
|
|
desc = sk.Metadata.Description
|
|
}
|
|
if desc == "" {
|
|
d := sk.Content
|
|
if len(d) > 200 {
|
|
d = d[:200] + "..."
|
|
}
|
|
desc = d
|
|
}
|
|
skills = append(skills, agents.SkillInfo{
|
|
Name: sk.Name,
|
|
Description: desc,
|
|
Content: sk.Content,
|
|
})
|
|
}
|
|
return skills, nil
|
|
}
|
|
|
|
// buildSkillProvider returns a SkillContentProvider closure for the scheduler.
|
|
func (s *AgentPoolService) buildSkillProvider() agents.SkillContentProvider {
|
|
return func(userID string) ([]agents.SkillInfo, error) {
|
|
return s.loadSkillsForUser(userID)
|
|
}
|
|
}
|