Files
LocalAI/core/services/agent_pool.go
Ettore Di Giacinto d9c1db2b87 feat: add (experimental) fine-tuning support with TRL (#9088)
* feat: add fine-tuning endpoint

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(experimental): add fine-tuning endpoint and TRL support

This changeset defines new GRPC signatues for Fine tuning backends, and
add TRL backend as initial fine-tuning engine. This implementation also
supports exporting to GGUF and automatically importing it to LocalAI
after fine-tuning.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* commit TRL backend, stop by killing process

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* move fine-tune to generic features

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* add evals, reorder menu

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Fix tests

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-21 02:08:02 +01:00

2122 lines
62 KiB
Go

package services
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAGI/core/agent"
"github.com/mudler/LocalAGI/core/sse"
"github.com/mudler/LocalAGI/core/state"
coreTypes "github.com/mudler/LocalAGI/core/types"
agiServices "github.com/mudler/LocalAGI/services"
"github.com/mudler/LocalAGI/services/skills"
"github.com/mudler/LocalAGI/webui/collections"
"github.com/mudler/xlog"
skilldomain "github.com/mudler/skillserver/pkg/domain"
skillgit "github.com/mudler/skillserver/pkg/git"
"gorm.io/gorm"
)
// AgentPoolService wraps LocalAGI's AgentPool, Skills service, and collections backend
// to provide agentic capabilities integrated directly into LocalAI.
type AgentPoolService struct {
appConfig *config.ApplicationConfig
pool *state.AgentPool
skillsService *skills.Service
collectionsBackend collections.Backend
configMeta state.AgentConfigMeta
actionsConfig map[string]string
sharedState *coreTypes.AgentSharedState
stateDir string
outputsDir string
mu sync.Mutex
userServices *UserServicesManager
userStorage *UserScopedStorage
authDB *gorm.DB
}
func NewAgentPoolService(appConfig *config.ApplicationConfig) (*AgentPoolService, error) {
return &AgentPoolService{
appConfig: appConfig,
}, nil
}
func (s *AgentPoolService) Start(ctx context.Context) error {
cfg := s.appConfig.AgentPool
// API URL: use configured value, or derive self-referencing URL from LocalAI's address
apiURL := cfg.APIURL
if apiURL == "" {
_, port, err := net.SplitHostPort(s.appConfig.APIAddress)
if err != nil {
port = strings.TrimPrefix(s.appConfig.APIAddress, ":")
}
apiURL = "http://127.0.0.1:" + port
}
apiKey := cfg.APIKey
if apiKey == "" && len(s.appConfig.ApiKeys) > 0 {
apiKey = s.appConfig.ApiKeys[0]
}
// State dir: explicit config > DataPath > DynamicConfigsDir > fallback
stateDir := cfg.StateDir
if stateDir == "" {
stateDir = s.appConfig.DataPath
}
if stateDir == "" {
stateDir = s.appConfig.DynamicConfigsDir
}
if stateDir == "" {
stateDir = "agents"
}
if err := os.MkdirAll(stateDir, 0750); err != nil {
return fmt.Errorf("failed to create agent pool state dir: %w", err)
}
// Collections paths
collectionDBPath := cfg.CollectionDBPath
if collectionDBPath == "" {
collectionDBPath = filepath.Join(stateDir, "collections")
}
fileAssets := filepath.Join(stateDir, "assets")
// Skills service — always created since the agent pool calls GetSkillsPrompt unconditionally.
// When EnableSkills is false, the service still exists but the skills directory will be empty.
skillsSvc, err := skills.NewService(stateDir)
if err != nil {
xlog.Error("Failed to create skills service", "error", err)
}
s.skillsService = skillsSvc
// Actions config map — only set CustomActionsDir if non-empty to avoid
// "open : no such file or directory" errors
actionsConfig := map[string]string{
agiServices.ConfigStateDir: stateDir,
}
if cfg.CustomActionsDir != "" {
actionsConfig[agiServices.CustomActionsDir] = cfg.CustomActionsDir
}
// Create outputs subdirectory for action-generated files (PDFs, audio, etc.)
outputsDir := filepath.Join(stateDir, "outputs")
if err := os.MkdirAll(outputsDir, 0750); err != nil {
xlog.Error("Failed to create outputs directory", "path", outputsDir, "error", err)
}
s.actionsConfig = actionsConfig
s.stateDir = stateDir
s.outputsDir = outputsDir
s.sharedState = coreTypes.NewAgentSharedState(5 * time.Minute)
// Initialize user-scoped storage
dataDir := s.appConfig.DataPath
if dataDir == "" {
dataDir = s.appConfig.DynamicConfigsDir
}
s.userStorage = NewUserScopedStorage(stateDir, dataDir)
// Create the agent pool
pool, err := state.NewAgentPool(
cfg.DefaultModel,
cfg.MultimodalModel,
cfg.TranscriptionModel,
cfg.TranscriptionLanguage,
cfg.TTSModel,
apiURL,
apiKey,
stateDir,
agiServices.Actions(actionsConfig),
agiServices.Connectors,
agiServices.DynamicPrompts(actionsConfig),
agiServices.Filters,
cfg.Timeout,
cfg.EnableLogs,
skillsSvc,
)
if err != nil {
return fmt.Errorf("failed to create agent pool: %w", err)
}
s.pool = pool
// Create in-process collections backend and RAG provider directly
collectionsCfg := &collections.Config{
LLMAPIURL: apiURL,
LLMAPIKey: apiKey,
LLMModel: cfg.DefaultModel,
CollectionDBPath: collectionDBPath,
FileAssets: fileAssets,
VectorEngine: cfg.VectorEngine,
EmbeddingModel: cfg.EmbeddingModel,
MaxChunkingSize: cfg.MaxChunkingSize,
ChunkOverlap: cfg.ChunkOverlap,
DatabaseURL: cfg.DatabaseURL,
}
collectionsBackend, collectionsState := collections.NewInProcessBackend(collectionsCfg)
s.collectionsBackend = collectionsBackend
// Set up in-process RAG provider from collections state
embedded := collections.RAGProviderFromState(collectionsState)
pool.SetRAGProvider(func(collectionName, _, _ string) (agent.RAGDB, state.KBCompactionClient, bool) {
return embedded(collectionName)
})
// Build config metadata for UI
s.configMeta = state.NewAgentConfigMeta(
agiServices.ActionsConfigMeta(cfg.CustomActionsDir),
agiServices.ConnectorsConfigMeta(),
agiServices.DynamicPromptsConfigMeta(cfg.CustomActionsDir),
agiServices.FiltersConfigMeta(),
)
// Start all agents
if err := pool.StartAll(); err != nil {
xlog.Error("Failed to start agent pool", "error", err)
}
xlog.Info("Agent pool started", "stateDir", stateDir, "apiURL", apiURL)
return nil
}
func (s *AgentPoolService) Stop() {
if s.pool != nil {
s.pool.StopAll()
}
}
// Pool returns the underlying AgentPool.
func (s *AgentPoolService) Pool() *state.AgentPool {
return s.pool
}
// --- Agent CRUD ---
func (s *AgentPoolService) ListAgents() map[string]bool {
statuses := map[string]bool{}
agents := s.pool.List()
for _, a := range agents {
ag := s.pool.GetAgent(a)
if ag == nil {
continue
}
statuses[a] = !ag.Paused()
}
return statuses
}
func (s *AgentPoolService) CreateAgent(config *state.AgentConfig) error {
if config.Name == "" {
return fmt.Errorf("name is required")
}
return s.pool.CreateAgent(config.Name, config)
}
func (s *AgentPoolService) GetAgent(name string) *agent.Agent {
return s.pool.GetAgent(name)
}
func (s *AgentPoolService) GetAgentConfig(name string) *state.AgentConfig {
return s.pool.GetConfig(name)
}
func (s *AgentPoolService) UpdateAgent(name string, config *state.AgentConfig) error {
old := s.pool.GetConfig(name)
if old == nil {
return fmt.Errorf("agent not found: %s", name)
}
return s.pool.RecreateAgent(name, config)
}
func (s *AgentPoolService) DeleteAgent(name string) error {
return s.pool.Remove(name)
}
func (s *AgentPoolService) PauseAgent(name string) error {
ag := s.pool.GetAgent(name)
if ag == nil {
return fmt.Errorf("agent not found: %s", name)
}
ag.Pause()
return nil
}
func (s *AgentPoolService) ResumeAgent(name string) error {
ag := s.pool.GetAgent(name)
if ag == nil {
return fmt.Errorf("agent not found: %s", name)
}
ag.Resume()
return nil
}
func (s *AgentPoolService) GetAgentStatus(name string) *state.Status {
return s.pool.GetStatusHistory(name)
}
func (s *AgentPoolService) GetAgentObservables(name string) ([]coreTypes.Observable, error) {
ag := s.pool.GetAgent(name)
if ag == nil {
return nil, fmt.Errorf("agent not found: %s", name)
}
return ag.Observer().History(), nil
}
func (s *AgentPoolService) ClearAgentObservables(name string) error {
ag := s.pool.GetAgent(name)
if ag == nil {
return fmt.Errorf("agent not found: %s", name)
}
ag.Observer().ClearHistory()
return nil
}
// Chat sends a message to an agent and returns immediately. Responses come via SSE.
func (s *AgentPoolService) Chat(name, message string) (string, error) {
ag := s.pool.GetAgent(name)
if ag == nil {
return "", fmt.Errorf("agent not found: %s", name)
}
manager := s.pool.GetManager(name)
if manager == nil {
return "", fmt.Errorf("SSE manager not found for agent: %s", name)
}
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
// Send user message via SSE
userMsg, _ := json.Marshal(map[string]any{
"id": messageID + "-user",
"sender": "user",
"content": message,
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(userMsg)).WithEvent("json_message"))
// Send processing status
statusMsg, _ := json.Marshal(map[string]any{
"status": "processing",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(statusMsg)).WithEvent("json_message_status"))
// Process asynchronously
go func() {
response := ag.Ask(coreTypes.WithText(message))
if response == nil {
errMsg, _ := json.Marshal(map[string]any{
"error": "agent request failed or was cancelled",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
} else if response.Error != nil {
errMsg, _ := json.Marshal(map[string]any{
"error": response.Error.Error(),
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
} else {
// Collect metadata from all action states
metadata := map[string]any{}
for _, state := range response.State {
for k, v := range state.Metadata {
if existing, ok := metadata[k]; ok {
if existList, ok := existing.([]string); ok {
if newList, ok := v.([]string); ok {
metadata[k] = append(existList, newList...)
continue
}
}
}
metadata[k] = v
}
}
if len(metadata) > 0 {
// Extract userID from the agent key (format: "userID:agentName")
var chatUserID string
if parts := strings.SplitN(name, ":", 2); len(parts) == 2 {
chatUserID = parts[0]
}
s.collectAndCopyMetadata(metadata, chatUserID)
}
msg := map[string]any{
"id": messageID + "-agent",
"sender": "agent",
"content": response.Response,
"timestamp": time.Now().Format(time.RFC3339),
}
if len(metadata) > 0 {
msg["metadata"] = metadata
}
respMsg, _ := json.Marshal(msg)
manager.Send(sse.NewMessage(string(respMsg)).WithEvent("json_message"))
}
completedMsg, _ := json.Marshal(map[string]any{
"status": "completed",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(completedMsg)).WithEvent("json_message_status"))
}()
return messageID, nil
}
// userOutputsDir returns the per-user outputs directory, creating it if needed.
// If userID is empty, falls back to the shared outputs directory.
func (s *AgentPoolService) userOutputsDir(userID string) string {
if userID == "" {
return s.outputsDir
}
dir := filepath.Join(s.outputsDir, userID)
os.MkdirAll(dir, 0750)
return dir
}
// copyToOutputs copies a file into the per-user outputs directory and returns the new path.
// If the file is already inside the target dir, it returns the original path unchanged.
func (s *AgentPoolService) copyToOutputs(srcPath, userID string) (string, error) {
targetDir := s.userOutputsDir(userID)
srcClean := filepath.Clean(srcPath)
absTarget, _ := filepath.Abs(targetDir)
absSrc, _ := filepath.Abs(srcClean)
if strings.HasPrefix(absSrc, absTarget+string(os.PathSeparator)) {
return srcPath, nil
}
src, err := os.Open(srcClean)
if err != nil {
return "", err
}
defer src.Close()
dstPath := filepath.Join(targetDir, filepath.Base(srcClean))
dst, err := os.Create(dstPath)
if err != nil {
return "", err
}
defer dst.Close()
if _, err := io.Copy(dst, src); err != nil {
return "", err
}
return dstPath, nil
}
// collectAndCopyMetadata iterates all metadata keys and, for any value that is
// a []string of local file paths, copies those files into the per-user outputs
// directory so the file endpoint can serve them from a single confined location.
// Entries that are URLs (http/https) are left unchanged.
func (s *AgentPoolService) collectAndCopyMetadata(metadata map[string]any, userID string) {
for key, val := range metadata {
list, ok := val.([]string)
if !ok {
continue
}
updated := make([]string, 0, len(list))
for _, p := range list {
if strings.HasPrefix(p, "http://") || strings.HasPrefix(p, "https://") {
updated = append(updated, p)
continue
}
newPath, err := s.copyToOutputs(p, userID)
if err != nil {
xlog.Error("Failed to copy file to outputs", "src", p, "error", err)
updated = append(updated, p)
continue
}
updated = append(updated, newPath)
}
metadata[key] = updated
}
}
func (s *AgentPoolService) GetSSEManager(name string) sse.Manager {
return s.pool.GetManager(name)
}
func (s *AgentPoolService) GetConfigMeta() state.AgentConfigMeta {
return s.configMeta
}
func (s *AgentPoolService) AgentHubURL() string {
return s.appConfig.AgentPool.AgentHubURL
}
func (s *AgentPoolService) StateDir() string {
return s.stateDir
}
func (s *AgentPoolService) OutputsDir() string {
return s.outputsDir
}
// ExportAgent returns the agent config as JSON bytes.
func (s *AgentPoolService) ExportAgent(name string) ([]byte, error) {
cfg := s.pool.GetConfig(name)
if cfg == nil {
return nil, fmt.Errorf("agent not found: %s", name)
}
return json.MarshalIndent(cfg, "", " ")
}
// ImportAgent creates an agent from JSON config data.
func (s *AgentPoolService) ImportAgent(data []byte) error {
var cfg state.AgentConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("invalid agent config: %w", err)
}
if cfg.Name == "" {
return fmt.Errorf("agent name is required")
}
return s.pool.CreateAgent(cfg.Name, &cfg)
}
// --- Skills ---
func (s *AgentPoolService) SkillsService() *skills.Service {
return s.skillsService
}
func (s *AgentPoolService) GetSkillsConfig() map[string]any {
if s.skillsService == nil {
return nil
}
return map[string]any{"skills_dir": s.skillsService.GetSkillsDir()}
}
func (s *AgentPoolService) ListSkills() ([]skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
if mgr == nil {
return []skilldomain.Skill{}, nil
}
return nil, err
}
return mgr.ListSkills()
}
func (s *AgentPoolService) GetSkill(name string) (*skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
return mgr.ReadSkill(name)
}
func (s *AgentPoolService) SearchSkills(query string) ([]skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
return mgr.SearchSkills(query)
}
func (s *AgentPoolService) CreateSkill(name, description, content, license, compatibility, allowedTools string, metadata map[string]string) (*skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
if err := skilldomain.ValidateSkillName(name); err != nil {
return nil, err
}
skillsDir := fsManager.GetSkillsDir()
skillDir := filepath.Join(skillsDir, name)
if _, err := os.Stat(skillDir); err == nil {
return nil, fmt.Errorf("skill already exists")
}
if err := os.MkdirAll(skillDir, 0755); err != nil {
return nil, err
}
frontmatter := fmt.Sprintf("---\nname: %s\ndescription: %s\n", name, description)
if license != "" {
frontmatter += fmt.Sprintf("license: %s\n", license)
}
if compatibility != "" {
frontmatter += fmt.Sprintf("compatibility: %s\n", compatibility)
}
if len(metadata) > 0 {
frontmatter += "metadata:\n"
for k, v := range metadata {
frontmatter += fmt.Sprintf(" %s: %s\n", k, v)
}
}
if allowedTools != "" {
frontmatter += fmt.Sprintf("allowed-tools: %s\n", allowedTools)
}
frontmatter += "---\n\n"
if err := os.WriteFile(filepath.Join(skillDir, "SKILL.md"), []byte(frontmatter+content), 0644); err != nil {
os.RemoveAll(skillDir)
return nil, err
}
if err := mgr.RebuildIndex(); err != nil {
return nil, fmt.Errorf("failed to rebuild index: %w", err)
}
return mgr.ReadSkill(name)
}
func (s *AgentPoolService) UpdateSkill(name, description, content, license, compatibility, allowedTools string, metadata map[string]string) (*skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
existing, err := mgr.ReadSkill(name)
if err != nil {
return nil, fmt.Errorf("skill not found")
}
if existing.ReadOnly {
return nil, fmt.Errorf("cannot update read-only skill from git repository")
}
skillDir := filepath.Join(fsManager.GetSkillsDir(), name)
frontmatter := fmt.Sprintf("---\nname: %s\ndescription: %s\n", name, description)
if license != "" {
frontmatter += fmt.Sprintf("license: %s\n", license)
}
if compatibility != "" {
frontmatter += fmt.Sprintf("compatibility: %s\n", compatibility)
}
if len(metadata) > 0 {
frontmatter += "metadata:\n"
for k, v := range metadata {
frontmatter += fmt.Sprintf(" %s: %s\n", k, v)
}
}
if allowedTools != "" {
frontmatter += fmt.Sprintf("allowed-tools: %s\n", allowedTools)
}
frontmatter += "---\n\n"
if err := os.WriteFile(filepath.Join(skillDir, "SKILL.md"), []byte(frontmatter+content), 0644); err != nil {
return nil, err
}
if err := mgr.RebuildIndex(); err != nil {
return nil, fmt.Errorf("failed to rebuild index: %w", err)
}
return mgr.ReadSkill(name)
}
func (s *AgentPoolService) DeleteSkill(name string) error {
if s.skillsService == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return fmt.Errorf("unsupported manager type")
}
existing, err := mgr.ReadSkill(name)
if err != nil {
return fmt.Errorf("skill not found")
}
if existing.ReadOnly {
return fmt.Errorf("cannot delete read-only skill from git repository")
}
skillDir := filepath.Join(fsManager.GetSkillsDir(), name)
if err := os.RemoveAll(skillDir); err != nil {
return err
}
return mgr.RebuildIndex()
}
func (s *AgentPoolService) ExportSkill(name string) ([]byte, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
skill, err := mgr.ReadSkill(name)
if err != nil {
return nil, fmt.Errorf("skill not found")
}
return skilldomain.ExportSkill(skill.ID, fsManager.GetSkillsDir())
}
func (s *AgentPoolService) ImportSkill(archiveData []byte) (*skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
skillName, err := skilldomain.ImportSkill(archiveData, fsManager.GetSkillsDir())
if err != nil {
return nil, err
}
if err := mgr.RebuildIndex(); err != nil {
return nil, fmt.Errorf("failed to rebuild index: %w", err)
}
return mgr.ReadSkill(skillName)
}
// --- Skill Resources ---
func (s *AgentPoolService) ListSkillResources(skillName string) ([]skilldomain.SkillResource, *skilldomain.Skill, error) {
if s.skillsService == nil {
return nil, nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, nil, fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return nil, nil, fmt.Errorf("skill not found")
}
resources, err := mgr.ListSkillResources(skill.ID)
if err != nil {
return nil, nil, err
}
return resources, skill, nil
}
func (s *AgentPoolService) GetSkillResource(skillName, resourcePath string) (*skilldomain.ResourceContent, *skilldomain.SkillResource, error) {
if s.skillsService == nil {
return nil, nil, fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return nil, nil, fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return nil, nil, fmt.Errorf("skill not found")
}
info, err := mgr.GetSkillResourceInfo(skill.ID, resourcePath)
if err != nil {
return nil, nil, fmt.Errorf("resource not found")
}
content, err := mgr.ReadSkillResource(skill.ID, resourcePath)
if err != nil {
return nil, nil, err
}
return content, info, nil
}
func (s *AgentPoolService) CreateSkillResource(skillName, path string, data []byte) error {
if s.skillsService == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return fmt.Errorf("skill not found")
}
if skill.ReadOnly {
return fmt.Errorf("cannot add resources to read-only skill")
}
if err := skilldomain.ValidateResourcePath(path); err != nil {
return err
}
fullPath := filepath.Join(skill.SourcePath, path)
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
return err
}
return os.WriteFile(fullPath, data, 0644)
}
func (s *AgentPoolService) UpdateSkillResource(skillName, resourcePath, content string) error {
if s.skillsService == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return fmt.Errorf("skill not found")
}
if skill.ReadOnly {
return fmt.Errorf("cannot update resources in read-only skill")
}
if err := skilldomain.ValidateResourcePath(resourcePath); err != nil {
return err
}
fullPath := filepath.Join(skill.SourcePath, resourcePath)
return os.WriteFile(fullPath, []byte(content), 0644)
}
func (s *AgentPoolService) DeleteSkillResource(skillName, resourcePath string) error {
if s.skillsService == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return fmt.Errorf("skill not found")
}
if skill.ReadOnly {
return fmt.Errorf("cannot delete resources from read-only skill")
}
if err := skilldomain.ValidateResourcePath(resourcePath); err != nil {
return err
}
fullPath := filepath.Join(skill.SourcePath, resourcePath)
return os.Remove(fullPath)
}
// --- Git Repos ---
func (s *AgentPoolService) getSkillsDir() string {
if s.skillsService == nil {
return ""
}
return s.skillsService.GetSkillsDir()
}
type GitRepoInfo struct {
ID string `json:"id"`
URL string `json:"url"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
}
func (s *AgentPoolService) ListGitRepos() ([]GitRepoInfo, error) {
dir := s.getSkillsDir()
if dir == "" {
return []GitRepoInfo{}, nil
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
out := make([]GitRepoInfo, len(repos))
for i, r := range repos {
out[i] = GitRepoInfo{ID: r.ID, URL: r.URL, Name: r.Name, Enabled: r.Enabled}
}
return out, nil
}
func (s *AgentPoolService) AddGitRepo(repoURL string) (*GitRepoInfo, error) {
dir := s.getSkillsDir()
if dir == "" {
return nil, fmt.Errorf("skills directory not configured")
}
if !strings.HasPrefix(repoURL, "http://") && !strings.HasPrefix(repoURL, "https://") && !strings.HasPrefix(repoURL, "git@") {
return nil, fmt.Errorf("invalid URL format")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
for _, r := range repos {
if r.URL == repoURL {
return nil, fmt.Errorf("repository already exists")
}
}
newRepo := skillgit.GitRepoConfig{
ID: skillgit.GenerateID(repoURL),
URL: repoURL,
Name: skillgit.ExtractRepoName(repoURL),
Enabled: true,
}
repos = append(repos, newRepo)
if err := cm.SaveConfig(repos); err != nil {
return nil, err
}
// Background sync
go func() {
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return
}
syncer := skillgit.NewGitSyncer(dir, []string{repoURL}, mgr.RebuildIndex)
if err := syncer.Start(); err != nil {
xlog.Error("background sync failed", "url", repoURL, "error", err)
s.skillsService.RefreshManagerFromConfig()
return
}
syncer.Stop()
s.skillsService.RefreshManagerFromConfig()
}()
return &GitRepoInfo{ID: newRepo.ID, URL: newRepo.URL, Name: newRepo.Name, Enabled: newRepo.Enabled}, nil
}
func (s *AgentPoolService) UpdateGitRepo(id, repoURL string, enabled *bool) (*GitRepoInfo, error) {
dir := s.getSkillsDir()
if dir == "" {
return nil, fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
idx := -1
for i, r := range repos {
if r.ID == id {
idx = i
if repoURL != "" {
parsedURL, err := url.Parse(repoURL)
if err != nil || parsedURL.Scheme == "" {
return nil, fmt.Errorf("invalid repository URL")
}
repos[i].URL = repoURL
repos[i].Name = skillgit.ExtractRepoName(repoURL)
}
if enabled != nil {
repos[i].Enabled = *enabled
}
break
}
}
if idx < 0 {
return nil, fmt.Errorf("repository not found")
}
if err := cm.SaveConfig(repos); err != nil {
return nil, err
}
s.skillsService.RefreshManagerFromConfig()
r := repos[idx]
return &GitRepoInfo{ID: r.ID, URL: r.URL, Name: r.Name, Enabled: r.Enabled}, nil
}
func (s *AgentPoolService) DeleteGitRepo(id string) error {
dir := s.getSkillsDir()
if dir == "" {
return fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return err
}
var newRepos []skillgit.GitRepoConfig
var repoName string
for _, r := range repos {
if r.ID == id {
repoName = r.Name
} else {
newRepos = append(newRepos, r)
}
}
if len(newRepos) == len(repos) {
return fmt.Errorf("repository not found")
}
if err := cm.SaveConfig(newRepos); err != nil {
return err
}
if repoName != "" {
os.RemoveAll(filepath.Join(dir, repoName))
}
s.skillsService.RefreshManagerFromConfig()
return nil
}
func (s *AgentPoolService) SyncGitRepo(id string) error {
dir := s.getSkillsDir()
if dir == "" {
return fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return err
}
var repoURL string
for _, r := range repos {
if r.ID == id {
repoURL = r.URL
break
}
}
if repoURL == "" {
return fmt.Errorf("repository not found")
}
mgr, err := s.skillsService.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("manager not ready")
}
go func() {
syncer := skillgit.NewGitSyncer(dir, []string{repoURL}, mgr.RebuildIndex)
if err := syncer.Start(); err != nil {
xlog.Error("background sync failed", "id", id, "error", err)
s.skillsService.RefreshManagerFromConfig()
return
}
syncer.Stop()
s.skillsService.RefreshManagerFromConfig()
}()
return nil
}
func (s *AgentPoolService) ToggleGitRepo(id string) (*GitRepoInfo, error) {
dir := s.getSkillsDir()
if dir == "" {
return nil, fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
for i, r := range repos {
if r.ID == id {
repos[i].Enabled = !repos[i].Enabled
if err := cm.SaveConfig(repos); err != nil {
return nil, err
}
s.skillsService.RefreshManagerFromConfig()
return &GitRepoInfo{ID: repos[i].ID, URL: repos[i].URL, Name: repos[i].Name, Enabled: repos[i].Enabled}, nil
}
}
return nil, fmt.Errorf("repository not found")
}
// --- Collections ---
func (s *AgentPoolService) CollectionsBackend() collections.Backend {
return s.collectionsBackend
}
func (s *AgentPoolService) ListCollections() ([]string, error) {
return s.collectionsBackend.ListCollections()
}
func (s *AgentPoolService) CreateCollection(name string) error {
return s.collectionsBackend.CreateCollection(name)
}
func (s *AgentPoolService) UploadToCollection(collection, filename string, fileBody io.Reader) (string, error) {
return s.collectionsBackend.Upload(collection, filename, fileBody)
}
func (s *AgentPoolService) ListCollectionEntries(collection string) ([]string, error) {
return s.collectionsBackend.ListEntries(collection)
}
func (s *AgentPoolService) GetCollectionEntryContent(collection, entry string) (string, int, error) {
return s.collectionsBackend.GetEntryContent(collection, entry)
}
func (s *AgentPoolService) SearchCollection(collection, query string, maxResults int) ([]collections.SearchResult, error) {
return s.collectionsBackend.Search(collection, query, maxResults)
}
func (s *AgentPoolService) ResetCollection(collection string) error {
return s.collectionsBackend.Reset(collection)
}
func (s *AgentPoolService) DeleteCollectionEntry(collection, entry string) ([]string, error) {
return s.collectionsBackend.DeleteEntry(collection, entry)
}
func (s *AgentPoolService) AddCollectionSource(collection, sourceURL string, intervalMin int) error {
return s.collectionsBackend.AddSource(collection, sourceURL, intervalMin)
}
func (s *AgentPoolService) RemoveCollectionSource(collection, sourceURL string) error {
return s.collectionsBackend.RemoveSource(collection, sourceURL)
}
func (s *AgentPoolService) ListCollectionSources(collection string) ([]collections.SourceInfo, error) {
return s.collectionsBackend.ListSources(collection)
}
func (s *AgentPoolService) CollectionEntryExists(collection, entry string) bool {
return s.collectionsBackend.EntryExists(collection, entry)
}
func (s *AgentPoolService) GetCollectionEntryFilePath(collection, entry string) (string, error) {
return s.collectionsBackend.GetEntryFilePath(collection, entry)
}
// --- User Services ---
// SetUserServicesManager sets the user services manager for per-user scoping.
func (s *AgentPoolService) SetUserServicesManager(usm *UserServicesManager) {
s.userServices = usm
}
// UserStorage returns the user-scoped storage.
func (s *AgentPoolService) UserStorage() *UserScopedStorage {
return s.userStorage
}
// UserServicesManager returns the user services manager.
func (s *AgentPoolService) UserServicesManager() *UserServicesManager {
return s.userServices
}
// SetAuthDB sets the auth database for API key generation.
func (s *AgentPoolService) SetAuthDB(db *gorm.DB) {
s.authDB = db
}
// --- Admin Aggregation ---
// UserAgentInfo holds agent info for cross-user listing.
type UserAgentInfo struct {
Name string `json:"name"`
Active bool `json:"active"`
}
// ListAllAgentsGrouped returns all agents grouped by user ID.
// Keys without ":" go into the "" (root) group.
func (s *AgentPoolService) ListAllAgentsGrouped() map[string][]UserAgentInfo {
result := map[string][]UserAgentInfo{}
agents := s.pool.List()
for _, a := range agents {
ag := s.pool.GetAgent(a)
if ag == nil {
continue
}
userID := ""
name := a
if idx := strings.Index(a, ":"); idx >= 0 {
userID = a[:idx]
name = a[idx+1:]
}
result[userID] = append(result[userID], UserAgentInfo{
Name: name,
Active: !ag.Paused(),
})
}
return result
}
// --- ForUser Skills ---
// ListSkillsForUser lists skills for a specific user.
func (s *AgentPoolService) ListSkillsForUser(userID string) ([]skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
if mgr == nil {
return []skilldomain.Skill{}, nil
}
return nil, err
}
return mgr.ListSkills()
}
// GetSkillForUser gets a skill for a specific user.
func (s *AgentPoolService) GetSkillForUser(userID, name string) (*skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
return mgr.ReadSkill(name)
}
// SearchSkillsForUser searches skills for a specific user.
func (s *AgentPoolService) SearchSkillsForUser(userID, query string) ([]skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
return mgr.SearchSkills(query)
}
// CreateSkillForUser creates a skill for a specific user.
func (s *AgentPoolService) CreateSkillForUser(userID, name, description, content, license, compatibility, allowedTools string, metadata map[string]string) (*skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
if err := skilldomain.ValidateSkillName(name); err != nil {
return nil, err
}
skillsDir := fsManager.GetSkillsDir()
skillDir := filepath.Join(skillsDir, name)
if _, err := os.Stat(skillDir); err == nil {
return nil, fmt.Errorf("skill already exists")
}
if err := os.MkdirAll(skillDir, 0755); err != nil {
return nil, err
}
frontmatter := fmt.Sprintf("---\nname: %s\ndescription: %s\n", name, description)
if license != "" {
frontmatter += fmt.Sprintf("license: %s\n", license)
}
if compatibility != "" {
frontmatter += fmt.Sprintf("compatibility: %s\n", compatibility)
}
if len(metadata) > 0 {
frontmatter += "metadata:\n"
for k, v := range metadata {
frontmatter += fmt.Sprintf(" %s: %s\n", k, v)
}
}
if allowedTools != "" {
frontmatter += fmt.Sprintf("allowed-tools: %s\n", allowedTools)
}
frontmatter += "---\n\n"
if err := os.WriteFile(filepath.Join(skillDir, "SKILL.md"), []byte(frontmatter+content), 0644); err != nil {
os.RemoveAll(skillDir)
return nil, err
}
if err := mgr.RebuildIndex(); err != nil {
return nil, fmt.Errorf("failed to rebuild index: %w", err)
}
return mgr.ReadSkill(name)
}
// UpdateSkillForUser updates a skill for a specific user.
func (s *AgentPoolService) UpdateSkillForUser(userID, name, description, content, license, compatibility, allowedTools string, metadata map[string]string) (*skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
existing, err := mgr.ReadSkill(name)
if err != nil {
return nil, fmt.Errorf("skill not found")
}
if existing.ReadOnly {
return nil, fmt.Errorf("cannot update read-only skill from git repository")
}
skillDir := filepath.Join(fsManager.GetSkillsDir(), name)
frontmatter := fmt.Sprintf("---\nname: %s\ndescription: %s\n", name, description)
if license != "" {
frontmatter += fmt.Sprintf("license: %s\n", license)
}
if compatibility != "" {
frontmatter += fmt.Sprintf("compatibility: %s\n", compatibility)
}
if len(metadata) > 0 {
frontmatter += "metadata:\n"
for k, v := range metadata {
frontmatter += fmt.Sprintf(" %s: %s\n", k, v)
}
}
if allowedTools != "" {
frontmatter += fmt.Sprintf("allowed-tools: %s\n", allowedTools)
}
frontmatter += "---\n\n"
if err := os.WriteFile(filepath.Join(skillDir, "SKILL.md"), []byte(frontmatter+content), 0644); err != nil {
return nil, err
}
if err := mgr.RebuildIndex(); err != nil {
return nil, fmt.Errorf("failed to rebuild index: %w", err)
}
return mgr.ReadSkill(name)
}
// DeleteSkillForUser deletes a skill for a specific user.
func (s *AgentPoolService) DeleteSkillForUser(userID, name string) error {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return err
}
if svc == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return fmt.Errorf("unsupported manager type")
}
existing, err := mgr.ReadSkill(name)
if err != nil {
return fmt.Errorf("skill not found")
}
if existing.ReadOnly {
return fmt.Errorf("cannot delete read-only skill from git repository")
}
skillDir := filepath.Join(fsManager.GetSkillsDir(), name)
if err := os.RemoveAll(skillDir); err != nil {
return err
}
return mgr.RebuildIndex()
}
// ExportSkillForUser exports a skill for a specific user.
func (s *AgentPoolService) ExportSkillForUser(userID, name string) ([]byte, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
skill, err := mgr.ReadSkill(name)
if err != nil {
return nil, fmt.Errorf("skill not found")
}
return skilldomain.ExportSkill(skill.ID, fsManager.GetSkillsDir())
}
// ImportSkillForUser imports a skill for a specific user.
func (s *AgentPoolService) ImportSkillForUser(userID string, archiveData []byte) (*skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
if svc == nil {
return nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, fmt.Errorf("skills directory not configured")
}
fsManager, ok := mgr.(*skilldomain.FileSystemManager)
if !ok {
return nil, fmt.Errorf("unsupported manager type")
}
skillName, err := skilldomain.ImportSkill(archiveData, fsManager.GetSkillsDir())
if err != nil {
return nil, err
}
if err := mgr.RebuildIndex(); err != nil {
return nil, fmt.Errorf("failed to rebuild index: %w", err)
}
return mgr.ReadSkill(skillName)
}
// GetSkillsConfigForUser returns the skills config for a specific user.
func (s *AgentPoolService) GetSkillsConfigForUser(userID string) map[string]any {
svc, err := s.SkillsServiceForUser(userID)
if err != nil || svc == nil {
return nil
}
return map[string]any{"skills_dir": svc.GetSkillsDir()}
}
// --- ForUser Skill Resources ---
// ListSkillResourcesForUser lists resources for a user's skill.
func (s *AgentPoolService) ListSkillResourcesForUser(userID, skillName string) ([]skilldomain.SkillResource, *skilldomain.Skill, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, nil, err
}
if svc == nil {
return nil, nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, nil, fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return nil, nil, fmt.Errorf("skill not found")
}
resources, err := mgr.ListSkillResources(skill.ID)
if err != nil {
return nil, nil, err
}
return resources, skill, nil
}
// GetSkillResourceForUser gets a resource for a user's skill.
func (s *AgentPoolService) GetSkillResourceForUser(userID, skillName, resourcePath string) (*skilldomain.ResourceContent, *skilldomain.SkillResource, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, nil, err
}
if svc == nil {
return nil, nil, fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return nil, nil, fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return nil, nil, fmt.Errorf("skill not found")
}
info, err := mgr.GetSkillResourceInfo(skill.ID, resourcePath)
if err != nil {
return nil, nil, fmt.Errorf("resource not found")
}
content, err := mgr.ReadSkillResource(skill.ID, resourcePath)
if err != nil {
return nil, nil, err
}
return content, info, nil
}
// CreateSkillResourceForUser creates a resource for a user's skill.
func (s *AgentPoolService) CreateSkillResourceForUser(userID, skillName, path string, data []byte) error {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return err
}
if svc == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return fmt.Errorf("skill not found")
}
if skill.ReadOnly {
return fmt.Errorf("cannot add resources to read-only skill")
}
if err := skilldomain.ValidateResourcePath(path); err != nil {
return err
}
fullPath := filepath.Join(skill.SourcePath, path)
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
return err
}
return os.WriteFile(fullPath, data, 0644)
}
// UpdateSkillResourceForUser updates a resource for a user's skill.
func (s *AgentPoolService) UpdateSkillResourceForUser(userID, skillName, resourcePath, content string) error {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return err
}
if svc == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return fmt.Errorf("skill not found")
}
if skill.ReadOnly {
return fmt.Errorf("cannot update resources in read-only skill")
}
if err := skilldomain.ValidateResourcePath(resourcePath); err != nil {
return err
}
fullPath := filepath.Join(skill.SourcePath, resourcePath)
return os.WriteFile(fullPath, []byte(content), 0644)
}
// DeleteSkillResourceForUser deletes a resource for a user's skill.
func (s *AgentPoolService) DeleteSkillResourceForUser(userID, skillName, resourcePath string) error {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return err
}
if svc == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("skills directory not configured")
}
skill, err := mgr.ReadSkill(skillName)
if err != nil {
return fmt.Errorf("skill not found")
}
if skill.ReadOnly {
return fmt.Errorf("cannot delete resources from read-only skill")
}
if err := skilldomain.ValidateResourcePath(resourcePath); err != nil {
return err
}
fullPath := filepath.Join(skill.SourcePath, resourcePath)
return os.Remove(fullPath)
}
// --- ForUser Collections ---
// ListCollectionsForUser lists collections for a specific user.
func (s *AgentPoolService) ListCollectionsForUser(userID string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListCollections()
}
// CreateCollectionForUser creates a collection for a specific user.
func (s *AgentPoolService) CreateCollectionForUser(userID, name string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.CreateCollection(name)
}
// UploadToCollectionForUser uploads to a collection for a specific user.
func (s *AgentPoolService) UploadToCollectionForUser(userID, collection, filename string, fileBody io.Reader) (string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", err
}
return backend.Upload(collection, filename, fileBody)
}
// CollectionEntryExistsForUser checks if an entry exists in a user's collection.
func (s *AgentPoolService) CollectionEntryExistsForUser(userID, collection, entry string) bool {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return false
}
return backend.EntryExists(collection, entry)
}
// ListCollectionEntriesForUser lists entries in a user's collection.
func (s *AgentPoolService) ListCollectionEntriesForUser(userID, collection string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListEntries(collection)
}
// GetCollectionEntryContentForUser gets entry content for a user's collection.
func (s *AgentPoolService) GetCollectionEntryContentForUser(userID, collection, entry string) (string, int, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", 0, err
}
return backend.GetEntryContent(collection, entry)
}
// SearchCollectionForUser searches a user's collection.
func (s *AgentPoolService) SearchCollectionForUser(userID, collection, query string, maxResults int) ([]collections.SearchResult, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.Search(collection, query, maxResults)
}
// ResetCollectionForUser resets a user's collection.
func (s *AgentPoolService) ResetCollectionForUser(userID, collection string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.Reset(collection)
}
// DeleteCollectionEntryForUser deletes an entry from a user's collection.
func (s *AgentPoolService) DeleteCollectionEntryForUser(userID, collection, entry string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.DeleteEntry(collection, entry)
}
// AddCollectionSourceForUser adds a source to a user's collection.
func (s *AgentPoolService) AddCollectionSourceForUser(userID, collection, sourceURL string, intervalMin int) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.AddSource(collection, sourceURL, intervalMin)
}
// RemoveCollectionSourceForUser removes a source from a user's collection.
func (s *AgentPoolService) RemoveCollectionSourceForUser(userID, collection, sourceURL string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.RemoveSource(collection, sourceURL)
}
// ListCollectionSourcesForUser lists sources for a user's collection.
func (s *AgentPoolService) ListCollectionSourcesForUser(userID, collection string) ([]collections.SourceInfo, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListSources(collection)
}
// GetCollectionEntryFilePathForUser gets the file path for an entry in a user's collection.
func (s *AgentPoolService) GetCollectionEntryFilePathForUser(userID, collection, entry string) (string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", err
}
return backend.GetEntryFilePath(collection, entry)
}
// --- ForUser Git Repos ---
// getSkillsDirForUser returns the skills directory for a specific user.
func (s *AgentPoolService) getSkillsDirForUser(userID string) string {
svc, err := s.SkillsServiceForUser(userID)
if err != nil || svc == nil {
return ""
}
return svc.GetSkillsDir()
}
// ListGitReposForUser lists git repos for a specific user.
func (s *AgentPoolService) ListGitReposForUser(userID string) ([]GitRepoInfo, error) {
dir := s.getSkillsDirForUser(userID)
if dir == "" {
return []GitRepoInfo{}, nil
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
out := make([]GitRepoInfo, len(repos))
for i, r := range repos {
out[i] = GitRepoInfo{ID: r.ID, URL: r.URL, Name: r.Name, Enabled: r.Enabled}
}
return out, nil
}
// AddGitRepoForUser adds a git repo for a specific user.
func (s *AgentPoolService) AddGitRepoForUser(userID, repoURL string) (*GitRepoInfo, error) {
dir := s.getSkillsDirForUser(userID)
if dir == "" {
return nil, fmt.Errorf("skills directory not configured")
}
if !strings.HasPrefix(repoURL, "http://") && !strings.HasPrefix(repoURL, "https://") && !strings.HasPrefix(repoURL, "git@") {
return nil, fmt.Errorf("invalid URL format")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
for _, r := range repos {
if r.URL == repoURL {
return nil, fmt.Errorf("repository already exists")
}
}
newRepo := skillgit.GitRepoConfig{
ID: skillgit.GenerateID(repoURL),
URL: repoURL,
Name: skillgit.ExtractRepoName(repoURL),
Enabled: true,
}
repos = append(repos, newRepo)
if err := cm.SaveConfig(repos); err != nil {
return nil, err
}
go func() {
svc, err := s.SkillsServiceForUser(userID)
if err != nil || svc == nil {
return
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return
}
syncer := skillgit.NewGitSyncer(dir, []string{repoURL}, mgr.RebuildIndex)
if err := syncer.Start(); err != nil {
xlog.Error("background sync failed", "url", repoURL, "error", err)
svc.RefreshManagerFromConfig()
return
}
syncer.Stop()
svc.RefreshManagerFromConfig()
}()
return &GitRepoInfo{ID: newRepo.ID, URL: newRepo.URL, Name: newRepo.Name, Enabled: newRepo.Enabled}, nil
}
// UpdateGitRepoForUser updates a git repo for a specific user.
func (s *AgentPoolService) UpdateGitRepoForUser(userID, id, repoURL string, enabled *bool) (*GitRepoInfo, error) {
dir := s.getSkillsDirForUser(userID)
if dir == "" {
return nil, fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
idx := -1
for i, r := range repos {
if r.ID == id {
idx = i
if repoURL != "" {
parsedURL, err := url.Parse(repoURL)
if err != nil || parsedURL.Scheme == "" {
return nil, fmt.Errorf("invalid repository URL")
}
repos[i].URL = repoURL
repos[i].Name = skillgit.ExtractRepoName(repoURL)
}
if enabled != nil {
repos[i].Enabled = *enabled
}
break
}
}
if idx < 0 {
return nil, fmt.Errorf("repository not found")
}
if err := cm.SaveConfig(repos); err != nil {
return nil, err
}
svc, _ := s.SkillsServiceForUser(userID)
if svc != nil {
svc.RefreshManagerFromConfig()
}
r := repos[idx]
return &GitRepoInfo{ID: r.ID, URL: r.URL, Name: r.Name, Enabled: r.Enabled}, nil
}
// DeleteGitRepoForUser deletes a git repo for a specific user.
func (s *AgentPoolService) DeleteGitRepoForUser(userID, id string) error {
dir := s.getSkillsDirForUser(userID)
if dir == "" {
return fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return err
}
var newRepos []skillgit.GitRepoConfig
var repoName string
for _, r := range repos {
if r.ID == id {
repoName = r.Name
} else {
newRepos = append(newRepos, r)
}
}
if len(newRepos) == len(repos) {
return fmt.Errorf("repository not found")
}
if err := cm.SaveConfig(newRepos); err != nil {
return err
}
if repoName != "" {
os.RemoveAll(filepath.Join(dir, repoName))
}
svc, _ := s.SkillsServiceForUser(userID)
if svc != nil {
svc.RefreshManagerFromConfig()
}
return nil
}
// SyncGitRepoForUser syncs a git repo for a specific user.
func (s *AgentPoolService) SyncGitRepoForUser(userID, id string) error {
dir := s.getSkillsDirForUser(userID)
if dir == "" {
return fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return err
}
var repoURL string
for _, r := range repos {
if r.ID == id {
repoURL = r.URL
break
}
}
if repoURL == "" {
return fmt.Errorf("repository not found")
}
svc, err := s.SkillsServiceForUser(userID)
if err != nil || svc == nil {
return fmt.Errorf("skills service not available")
}
mgr, err := svc.GetManager()
if err != nil || mgr == nil {
return fmt.Errorf("manager not ready")
}
go func() {
syncer := skillgit.NewGitSyncer(dir, []string{repoURL}, mgr.RebuildIndex)
if err := syncer.Start(); err != nil {
xlog.Error("background sync failed", "id", id, "error", err)
svc.RefreshManagerFromConfig()
return
}
syncer.Stop()
svc.RefreshManagerFromConfig()
}()
return nil
}
// ToggleGitRepoForUser toggles a git repo for a specific user.
func (s *AgentPoolService) ToggleGitRepoForUser(userID, id string) (*GitRepoInfo, error) {
dir := s.getSkillsDirForUser(userID)
if dir == "" {
return nil, fmt.Errorf("skills directory not configured")
}
cm := skillgit.NewConfigManager(dir)
repos, err := cm.LoadConfig()
if err != nil {
return nil, err
}
for i, r := range repos {
if r.ID == id {
repos[i].Enabled = !repos[i].Enabled
if err := cm.SaveConfig(repos); err != nil {
return nil, err
}
svc, _ := s.SkillsServiceForUser(userID)
if svc != nil {
svc.RefreshManagerFromConfig()
}
return &GitRepoInfo{ID: repos[i].ID, URL: repos[i].URL, Name: repos[i].Name, Enabled: repos[i].Enabled}, nil
}
}
return nil, fmt.Errorf("repository not found")
}
// --- ForUser Agent Methods ---
// agentKey returns the namespaced key for an agent: "{userID}:{name}" or just "{name}" if no userID.
func agentKey(userID, name string) string {
if userID == "" {
return name
}
return userID + ":" + name
}
// ListAgentsForUser lists agents belonging to a specific user.
// If userID is empty, returns all agents (backward compat).
func (s *AgentPoolService) ListAgentsForUser(userID string) map[string]bool {
statuses := map[string]bool{}
agents := s.pool.List()
prefix := ""
if userID != "" {
prefix = userID + ":"
}
for _, a := range agents {
if userID != "" && !strings.HasPrefix(a, prefix) {
continue
}
ag := s.pool.GetAgent(a)
if ag == nil {
continue
}
displayName := a
if prefix != "" {
displayName = strings.TrimPrefix(a, prefix)
}
statuses[displayName] = !ag.Paused()
}
return statuses
}
// CreateAgentForUser creates an agent namespaced to a user.
// When auth is enabled and the agent config has no API key, a new user API key
// is auto-generated so the agent can authenticate against LocalAI's own API.
func (s *AgentPoolService) CreateAgentForUser(userID string, config *state.AgentConfig) error {
if err := ValidateAgentName(config.Name); err != nil {
return err
}
// Auto-generate a user API key when auth is active and none is specified
if s.authDB != nil && userID != "" && config.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.authDB, userID, "agent:"+config.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
config.APIKey = plaintext
xlog.Info("Auto-generated API key for agent", "agent", config.Name, "user", userID)
}
key := agentKey(userID, config.Name)
config.Name = key
return s.pool.CreateAgent(key, config)
}
// GetAgentForUser returns the agent for a user.
func (s *AgentPoolService) GetAgentForUser(userID, name string) *agent.Agent {
return s.pool.GetAgent(agentKey(userID, name))
}
// GetAgentConfigForUser returns the agent config for a user's agent.
func (s *AgentPoolService) GetAgentConfigForUser(userID, name string) *state.AgentConfig {
return s.pool.GetConfig(agentKey(userID, name))
}
// UpdateAgentForUser updates a user's agent.
func (s *AgentPoolService) UpdateAgentForUser(userID, name string, config *state.AgentConfig) error {
key := agentKey(userID, name)
old := s.pool.GetConfig(key)
if old == nil {
return fmt.Errorf("agent not found: %s", name)
}
// Auto-generate a user API key when auth is active and none is specified
if s.authDB != nil && userID != "" && config.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.authDB, userID, "agent:"+name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
config.APIKey = plaintext
}
config.Name = key
return s.pool.RecreateAgent(key, config)
}
// DeleteAgentForUser deletes a user's agent.
func (s *AgentPoolService) DeleteAgentForUser(userID, name string) error {
return s.pool.Remove(agentKey(userID, name))
}
// PauseAgentForUser pauses a user's agent.
func (s *AgentPoolService) PauseAgentForUser(userID, name string) error {
ag := s.pool.GetAgent(agentKey(userID, name))
if ag == nil {
return fmt.Errorf("agent not found: %s", name)
}
ag.Pause()
return nil
}
// ResumeAgentForUser resumes a user's agent.
func (s *AgentPoolService) ResumeAgentForUser(userID, name string) error {
ag := s.pool.GetAgent(agentKey(userID, name))
if ag == nil {
return fmt.Errorf("agent not found: %s", name)
}
ag.Resume()
return nil
}
// GetAgentStatusForUser returns the status of a user's agent.
func (s *AgentPoolService) GetAgentStatusForUser(userID, name string) *state.Status {
return s.pool.GetStatusHistory(agentKey(userID, name))
}
// GetAgentObservablesForUser returns observables for a user's agent.
func (s *AgentPoolService) GetAgentObservablesForUser(userID, name string) ([]coreTypes.Observable, error) {
ag := s.pool.GetAgent(agentKey(userID, name))
if ag == nil {
return nil, fmt.Errorf("agent not found: %s", name)
}
return ag.Observer().History(), nil
}
// ClearAgentObservablesForUser clears observables for a user's agent.
func (s *AgentPoolService) ClearAgentObservablesForUser(userID, name string) error {
ag := s.pool.GetAgent(agentKey(userID, name))
if ag == nil {
return fmt.Errorf("agent not found: %s", name)
}
ag.Observer().ClearHistory()
return nil
}
// ChatForUser sends a message to a user's agent.
func (s *AgentPoolService) ChatForUser(userID, name, message string) (string, error) {
return s.Chat(agentKey(userID, name), message)
}
// GetSSEManagerForUser returns the SSE manager for a user's agent.
func (s *AgentPoolService) GetSSEManagerForUser(userID, name string) sse.Manager {
return s.pool.GetManager(agentKey(userID, name))
}
// ExportAgentForUser exports a user's agent config.
func (s *AgentPoolService) ExportAgentForUser(userID, name string) ([]byte, error) {
return s.ExportAgent(agentKey(userID, name))
}
// ImportAgentForUser imports an agent for a user.
func (s *AgentPoolService) ImportAgentForUser(userID string, data []byte) error {
var cfg state.AgentConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("invalid agent config: %w", err)
}
if err := ValidateAgentName(cfg.Name); err != nil {
return err
}
// Auto-generate a user API key when auth is active and none is specified
if s.authDB != nil && userID != "" && cfg.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.authDB, userID, "agent:"+cfg.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
cfg.APIKey = plaintext
}
key := agentKey(userID, cfg.Name)
cfg.Name = key
return s.pool.CreateAgent(key, &cfg)
}
// --- ForUser Collections ---
// CollectionsBackendForUser returns the collections backend for a user.
func (s *AgentPoolService) CollectionsBackendForUser(userID string) (collections.Backend, error) {
if s.userServices == nil || userID == "" {
return s.collectionsBackend, nil
}
return s.userServices.GetCollections(userID)
}
// --- ForUser Skills ---
// SkillsServiceForUser returns the skills service for a user.
func (s *AgentPoolService) SkillsServiceForUser(userID string) (*skills.Service, error) {
if s.userServices == nil || userID == "" {
return s.skillsService, nil
}
return s.userServices.GetSkills(userID)
}
// --- ForUser Jobs ---
// JobServiceForUser returns the agent job service for a user.
func (s *AgentPoolService) JobServiceForUser(userID string) (*AgentJobService, error) {
if s.userServices == nil || userID == "" {
return nil, fmt.Errorf("no user services manager or empty user ID")
}
return s.userServices.GetJobs(userID)
}
// --- Actions ---
// ListAvailableActions returns the list of all available action type names.
func (s *AgentPoolService) ListAvailableActions() []string {
return agiServices.AvailableActions
}
// GetActionDefinition creates an action instance by name with the given config and returns its definition.
func (s *AgentPoolService) GetActionDefinition(actionName string, actionConfig map[string]string) (any, error) {
if actionConfig == nil {
actionConfig = map[string]string{}
}
a, err := agiServices.Action(actionName, "", actionConfig, s.pool, s.actionsConfig)
if err != nil {
return nil, err
}
return a.Definition(), nil
}
// ExecuteAction creates an action instance and runs it with the given params.
func (s *AgentPoolService) ExecuteAction(ctx context.Context, actionName string, actionConfig map[string]string, params coreTypes.ActionParams) (coreTypes.ActionResult, error) {
if actionConfig == nil {
actionConfig = map[string]string{}
}
a, err := agiServices.Action(actionName, "", actionConfig, s.pool, s.actionsConfig)
if err != nil {
return coreTypes.ActionResult{}, err
}
return a.Run(ctx, s.sharedState, params)
}