LocalAI/core/services/agentpool/agent_pool.go
Richard Palethorpe 670259ce43 chore: Security hardening (#9719)
* fix(http): close 0.0.0.0/[::] SSRF bypass in /api/cors-proxy

The CORS proxy carried its own private-network blocklist (RFC 1918 + a
handful of IPv6 ranges) instead of using the same classification as
pkg/utils/urlfetch.go. The hand-rolled list missed 0.0.0.0/8 and ::/128,
both of which Linux routes to localhost — so any user with FeatureMCP
(default-on for new users) could reach LocalAI's own listener and any
other service bound to 0.0.0.0:port via:

  GET /api/cors-proxy?url=http://0.0.0.0:8080/...
  GET /api/cors-proxy?url=http://[::]:8080/...

Replace the custom check with utils.IsPublicIP (Go stdlib IsLoopback /
IsLinkLocalUnicast / IsPrivate / IsUnspecified, plus IPv4-mapped IPv6
unmasking) and add an upfront hostname rejection for localhost, *.local,
and the cloud metadata aliases so split-horizon DNS can't paper over the
IP check.

The IP-pinning DialContext is unchanged: the validated IP from the
single resolution is reused for the connection, so DNS rebinding still
cannot swap a public answer for a private one between validate and dial.

Regression tests cover 0.0.0.0, 0.0.0.0:PORT, [::], ::ffff:127.0.0.1,
::ffff:10.0.0.1, file://, gopher://, ftp://, localhost, 127.0.0.1,
10.0.0.1, 169.254.169.254, metadata.google.internal.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(downloader): verify SHA before promoting temp file to final path

DownloadFileWithContext renamed the .partial file to its final name
*before* checking the streamed SHA, so a hash mismatch returned an
error but left the tampered file at filePath. Subsequent code that
operated on filePath (a backend launcher, a YAML loader, a re-download
that finds the file already present and skips) would consume the
attacker-supplied bytes.

Reorder: verify the streamed hash first, remove the .partial on
mismatch, then rename. The streamed hash is computed during io.Copy
so no second read is needed.

While here, raise the empty-SHA case from a Debug log to a Warn so
"this download had no integrity check" is visible at the default log
level. Backend installs currently pass through with no digest; the
warning makes that footprint observable without changing behaviour.

Regression test asserts os.IsNotExist on the destination after a
deliberate SHA mismatch.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(auth): require email_verified for OIDC admin promotion

extractOIDCUserInfo read the ID token's "email" claim but never
inspected "email_verified". With LOCALAI_ADMIN_EMAIL set, an attacker
who could register on the configured OIDC IdP under that email (some
IdPs accept self-supplied unverified emails) inherited admin role:

  - first login:  AssignRole(tx, email, adminEmail) → RoleAdmin
  - re-login:     MaybePromote(db, user, adminEmail) → flip to RoleAdmin

Add EmailVerified to oauthUserInfo, parse email_verified from the OIDC
claims (default false on absence so an IdP that omits the claim cannot
short-circuit the gate), and substitute "" for the role-decision email
when verified=false via emailForRoleDecision. The user record still
stores the unverified email for display.

GitHub's path defaults EmailVerified=true: GitHub only returns a public
profile email after verification, and fetchGitHubPrimaryEmail explicitly
filters to Verified=true.

Regression tests cover both the helper contract and integration with
AssignRole, including the bootstrap "first user" branch that would
otherwise mask the gate.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(cli): refuse public bind when no auth backend is configured

When neither an auth DB nor a static API key is set, the auth
middleware passes every request through. That is fine for a developer
laptop, a home LAN, or a Tailnet — the network itself is the trust
boundary. It is not fine on a public IP, where every model install,
settings change, and admin endpoint becomes reachable from the
internet.

Refuse to start in that exact configuration. Loopback, RFC 1918,
RFC 4193 ULA, link-local, and RFC 6598 CGNAT (Tailscale's default
range) all count as trusted; wildcard binds (`:port`, `0.0.0.0`,
`[::]`) are accepted only when every host interface is in one of those
ranges. Hostnames are resolved and treated as trusted only when every
answer is.

A new --allow-insecure-public-bind / LOCALAI_ALLOW_INSECURE_PUBLIC_BIND
flag opts out for deployments that gate access externally (a reverse
proxy enforcing auth, a mesh ACL, etc.). The error message lists this
plus the three constructive alternatives (bind a private interface,
enable --auth, set --api-keys).

The interface enumeration goes through a package-level interfaceAddrsFn
var so tests can simulate cloud-VM, home-LAN, Tailscale-only, and
enumeration-failure topologies without poking at the real network
stack.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* test(http): regression-test the localai_assistant admin gate

ChatEndpoint already rejects metadata.localai_assistant=true from a
non-admin caller, but the gate was open-coded inline with no direct
test coverage. The chat route is FeatureChat-gated (default-on), and
the assistant's in-process MCP server can install/delete models and
edit configs — the wrong handler change would silently turn the LLM
into a confused deputy.

Extract the gate into requireAssistantAccess(c, authEnabled) and pin
its behaviour: auth disabled is a no-op, unauthenticated is 403,
RoleUser is 403, RoleAdmin and the synthetic legacy-key admin are
admitted.

No behaviour change in the production path.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* test(http): assert every API route is auth-classified

The auth middleware classifies path prefixes (/api/, /v1/, /models/,
etc.) as protected and treats anything else as a static-asset
passthrough. A new endpoint shipped under a brand-new prefix — or a
new path that simply isn't on the prefix allowlist — would be
reachable anonymously.

Walk every route registered by API() with auth enabled and a fresh
in-memory database (no users, no keys), and assert each API-prefixed
route returns 401 / 404 / 405 to an anonymous request. Public surfaces
(/api/auth/*, /api/branding, /api/node/* token-authenticated routes,
/healthz, branding asset server, generated-content server, static
assets) are explicit allowlist entries with comments justifying them.

Build-tagged 'auth' so it runs against the SQLite-backed auth DB
(matches the existing auth suite).

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* test(http): pin agent endpoint per-user isolation contract

agents.go's getUserID / effectiveUserID / canImpersonateUser /
wantsAllUsers helpers are the single trust boundary for cross-user
access on agent, agent-jobs, collections, and skills routes. A
regression there is the difference between "regular user reads their
own data" and "regular user reads anyone's data via ?user_id=victim".

Lock in the contract:
  - effectiveUserID ignores ?user_id= for unauthenticated and RoleUser
  - effectiveUserID honours it for RoleAdmin and ProviderAgentWorker
  - wantsAllUsers requires admin AND the literal "true" string
  - canImpersonateUser is admin OR agent-worker, never plain RoleUser

No production change — this commit only adds tests.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(downloader): drop redundant stat in removePartialFile

The stat-then-remove pattern is a TOCTOU window and a wasted syscall —
os.Remove already returns ErrNotExist for the missing-file case, so trust
that and treat it as a no-op.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(http): redact secrets from trace buffer and distribution-token logs

The /api/traces buffer captured Authorization, Cookie, Set-Cookie, and
API-key headers verbatim from every request when tracing was enabled. The
endpoint is admin-only but the buffer is reachable via any heap-style
introspection and the captured tokens otherwise outlive the request.
Strip those header values at capture time. Body redaction is left to a
follow-up — the prompts are usually the operator's own and JSON-walking
is invasive.

Distribution tokens were also logged in plaintext from
core/explorer/discovery.go; logs forward to syslog/journald and outlive
the token. Redact those to a short prefix/suffix instead.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(auth): rate-limit OAuth callbacks separately from password endpoints

The shared 5/min/IP limit on auth endpoints is right for password-style
flows but too tight for OAuth callbacks: corporate SSO funnels many real
users through one outbound IP and would trip the limit. Add a separate
60/min/IP limiter for /api/auth/{github,oidc}/callback so callbacks are
bounded against floods without breaking shared-IP deployments.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(gallery): verify backend tarball sha256 when set in gallery entry

GalleryBackend gained an optional sha256 field; the install path now
threads it through to the existing downloader hash-verify (which already
streams, verifies, and rolls back on mismatch). Galleries without sha256
keep working; the empty-SHA path still emits the existing
"downloading without integrity check" warning.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* test(http): pin CSRF coverage on multipart endpoints

The CSRF middleware in app.go is global (e.Use) so it covers every
multipart upload route — branding assets, fine-tune datasets, audio
transforms, agent collections. Pin that contract: cross-site multipart
POSTs are rejected; same-origin / same-site / API-key clients are not.
Also pins the SameSite=Lax fallback path the skipper relies on when
Sec-Fetch-Site is absent.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(http): XSS hardening — CSP headers, safe href, base-href escape, SVG sandbox

Several closely related XSS-prevention changes spanning the SPA shell, the
React UI, and the branding asset server:

- New SecurityHeaders middleware sets CSP, X-Content-Type-Options,
  X-Frame-Options, and Referrer-Policy on every response. The CSP keeps
  script-src permissive because the Vite bundle relies on inline + eval'd
  scripts; tightening that requires moving to a nonce-based policy.

- The <base href> injection in the SPA shell escaped attacker-controllable
  Host / X-Forwarded-Host headers — a single quote in the host header
  broke out of the attribute. Pass through SecureBaseHref (html.EscapeString).

- Three React sinks rendering untrusted content via dangerouslySetInnerHTML
  switch to text-node rendering with whiteSpace: pre-wrap: user message
  bodies in Chat.jsx and AgentChat.jsx, and the agent activity log in
  AgentChat.jsx. The hand-rolled escape on the agent user-message variant
  is replaced by the same plain-text path.

- New safeHref util collapses non-allowlisted URI schemes (most
  importantly javascript:) to '#'. Applied to gallery `<a href={url}>`
  links in Models / Backends / Manage and to canvas artifact links —
  these come from gallery JSON or assistant tool calls and must be treated
  as untrusted.

- The branding asset server attaches a sandbox CSP plus same-origin CORP
  to .svg responses. The React UI loads logos via <img>, but the same URL
  is also reachable via direct navigation; this prevents script
  execution if a hostile SVG slipped past upload validation.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(http): bound HTTP server with read-header and idle timeouts

A net/http server with no timeouts is trivially Slowloris-able and leaks
idle keep-alive connections. Set ReadHeaderTimeout (30s) to plug the
slow-headers attack and IdleTimeout (120s) to cap keep-alive sockets.

ReadTimeout and WriteTimeout stay at 0 because request bodies can be
multi-GB model uploads and SSE / chat completions stream for many
minutes; operators who need tighter per-request bounds should terminate
slow clients at a reverse proxy.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* test(auth): pin PUT /api/auth/profile field-tampering contract

The handler uses an explicit local body struct (only name and avatar_url)
plus a gorm Updates(map) with a column allowlist, so an attacker posting
{"role":"admin","email":"...","password_hash":"..."} can't mass-assign
those fields. Lock that down with a regression test so a future
"let's just c.Bind(&user)" refactor breaks loudly.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(services): strip directory components from multipart upload filenames

UploadDataset and UploadToCollectionForUser took the raw multipart
file.Filename and joined it into a destination path. The fine-tune
upload was incidentally safe because of a UUID prefix that fused any
leading '..' to a literal segment, but the protection is fragile.
UploadToCollectionForUser handed the filename to a vendored backend
without sanitising at all.

Strip to filepath.Base at both boundaries and reject the trivial
unsafe values ("", ".", "..", "/").

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(react-ui): validate persisted MCP server entries on load

localStorage is shared across same-origin pages; an XSS that lands once
can poison persisted MCP server config to attempt header injection or
to feed a non-http URL into the fetch path on subsequent loads.
Validate every entry: types must match, URL must parse with http(s)
scheme, header keys/values must be control-char-free. Drop anything
that doesn't fit.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(http): close X-Forwarded-Prefix open redirect

The reverse-proxy support concatenated X-Forwarded-Prefix into the
redirect target without validation, so a forged header value of
"//evil.com" turned the SPA-shell redirect helper at /, /browse, and
/browse/* into a 301 to //evil.com/app. The path-strip middleware had
the same shape on its prefix-trailing-slash redirect.

Add SafeForwardedPrefix at the middleware boundary: must start with
a single '/', no protocol-relative '//' opener, no scheme, no
backslash, no control characters. Apply at both consumers; misconfig
trips the validator and the header is dropped.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(http): refuse wildcard CORS when LOCALAI_CORS=true with empty allowlist

When LOCALAI_CORS=true but LOCALAI_CORS_ALLOW_ORIGINS was empty, Echo's
CORSWithConfig saw an empty allow-list and fell back to its default
AllowOrigins=["*"]. An operator who flipped the strict-CORS feature
flag without populating the list got the opposite of what they asked
for. Echo never sets Allow-Credentials: true so this isn't directly
exploitable (cookies aren't sent under wildcard CORS), but the
misconfiguration trap is worth closing. Skip the registration and warn.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(auth): zxcvbn password strength check with user-acknowledged override

The previous policy was len < 8, which let through "Password1" and the
rest of the credential-stuffing corpus. LocalAI has no second factor
yet, so the bar needs to sit higher.

Add ValidatePasswordStrength using github.com/timbutler/zxcvbn (an
actively-maintained fork of the trustelem port; v1.0.4, April 2024):
- min 12 chars, max 72 (bcrypt's truncation point)
- reject NUL bytes (some bcrypt callers truncate at the first NUL)
- require zxcvbn score >= 3 ("safely unguessable, ~10^8 guesses to
  break"); the hint list ["localai", "local-ai", "admin"] penalises
  passwords built from the app's own branding

zxcvbn produces false positives sometimes (a strong-looking password
that happens to match a dictionary word) and operators occasionally
need to set a known-weak password (kiosk demos, CI rigs). Add an
acknowledgement path: PasswordPolicy{AllowWeak: true} skips the
entropy check while still enforcing the hard rules. The structured
PasswordErrorResponse marks weak-password rejections as Overridable
so the UI can surface a "use this anyway" checkbox.

Wired through register, self-service password change, and admin
password reset on both the server and the React UI.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(react-ui): drop HTML5 minLength on new-password inputs

minLength={12} on the new-password input let the browser block the
form submit silently before any JS or network call ran. The browser
focused the field, showed a brief native tooltip, and that was that —
no toast, no fetch, no clue. Reproducible by typing fewer than 12
chars on the second password change of a session.

The JS-level length check in handleSubmit already shows a toast and
the server rejects with a structured error, so the HTML5 attribute
was redundant defence anyway. Drop it.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(react-ui): bundle Geist fonts locally instead of fetching from Google

The new CSP correctly refused to apply styles from
fonts.googleapis.com because style-src is locked to 'self' and
'unsafe-inline'. Loosening the CSP would defeat its purpose; the
right fix is to stop reaching out to a third-party CDN for fonts on
every page load.

Add @fontsource-variable/geist and @fontsource-variable/geist-mono as
npm deps and import them once at boot. Drop the <link rel="preconnect">
and external stylesheet from index.html.

Side benefit: no third-party tracking via Referer / IP on every UI
load, no failure mode when offline / behind a captive portal.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* fix(react-ui): refresh i18n strings to reflect 12-char password minimum

The translations still said "at least 8 characters" everywhere — the
client-side toast on a too-short password change told the user the
wrong floor. Update tooShort and newPasswordPlaceholder /
newPasswordDescription across all five locales (en, es, it, de,
zh-CN) to match the real ValidatePasswordStrength rule.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

* feat(auth): make password length-floor overridable like the entropy check

The 12-char minimum was a policy choice, not a technical invariant —
only "non-empty", "<= 72 bytes", and "no NUL bytes" are real bcrypt
constraints. Treating length-12 as a hard rule was inconsistent with
the entropy check (already overridable) and friction for use cases
where the account is just a name on a session, not a security
boundary (single-user kiosk, CI rig, lab demo).

Restructure ValidatePasswordStrength:
- Hard rules (always enforced): non-empty, <= MaxPasswordLength, no NUL byte
- Policy rules (skipped when AllowWeak=true): length >= 12, zxcvbn score >= 3

PasswordError now marks password_too_short as Overridable too. The
React forms generalised from `error_code === 'password_too_weak'` to
`overridable === true`, and the JS-side preflight length checks were
removed (server is source of truth, returns the same checkbox flow).

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Richard Palethorpe <io@richiejp.com>

---------

Signed-off-by: Richard Palethorpe <io@richiejp.com>
2026-05-08 16:25:45 +02:00


package agentpool

import (
    "cmp"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net"
    "os"
    "path/filepath"
    "slices"
    "strings"
    "sync"
    "time"

    "github.com/mudler/LocalAI/core/config"
    "github.com/mudler/LocalAI/core/http/auth"
    "github.com/mudler/LocalAI/core/services/agents"
    "github.com/mudler/LocalAI/core/services/distributed"
    "github.com/mudler/LocalAI/core/services/messaging"
    skillsManager "github.com/mudler/LocalAI/core/services/skills"

    "github.com/mudler/LocalAGI/core/agent"
    "github.com/mudler/LocalAGI/core/sse"
    "github.com/mudler/LocalAGI/core/state"
    coreTypes "github.com/mudler/LocalAGI/core/types"
    agiServices "github.com/mudler/LocalAGI/services"
    "github.com/mudler/LocalAGI/services/skills"
    "github.com/mudler/LocalAGI/webui/collections"

    "github.com/mudler/xlog"
    "gorm.io/gorm"
)
// localAGICore manages the in-process LocalAGI agent pool (standalone mode only).
type localAGICore struct {
    pool          *state.AgentPool
    skillsService *skills.Service
    configMeta    state.AgentConfigMeta
    sharedState   *coreTypes.AgentSharedState
    actionsConfig map[string]string
}

// distributedBridge connects to the NATS-based distributed agent system.
type distributedBridge struct {
    natsClient  messaging.Publisher     // NATS client for distributed agent execution
    agentStore  *agents.AgentStore      // PostgreSQL agent config store
    eventBridge AgentEventBridge        // Event bridge for SSE + persistence
    skillStore  *distributed.SkillStore // PostgreSQL skill metadata (distributed mode)
    dispatcher  agents.Dispatcher       // Native dispatcher (distributed or local)
}

// userManager handles per-user services, storage, and auth.
type userManager struct {
    userServices *UserServicesManager
    userStorage  *UserScopedStorage
    authDB       *gorm.DB
}

// AgentPoolService wraps LocalAGI's AgentPool, Skills service, and collections backend
// to provide agentic capabilities integrated directly into LocalAI.
type AgentPoolService struct {
    appConfig          *config.ApplicationConfig
    collectionsBackend collections.Backend
    configBackend      AgentConfigBackend // Abstracts local vs distributed agent operations

    localAGI    localAGICore
    distributed distributedBridge
    users       userManager

    stateDir   string
    outputsDir string
    apiURL     string // Resolved API URL for agent execution
    apiKey     string // Resolved API key for agent execution

    mu sync.Mutex
}

// AgentEventBridge is the interface for event publishing needed by AgentPoolService.
type AgentEventBridge interface {
    PublishMessage(agentName, userID, sender, content, messageID string) error
    PublishStatus(agentName, userID, status string) error
    PublishStreamEvent(agentName, userID string, data map[string]any) error
    RegisterCancel(key string, cancel context.CancelFunc)
    DeregisterCancel(key string)
}

// AgentConfigStore is the interface for agent config persistence.
type AgentConfigStore interface {
    SaveConfig(cfg *agents.AgentConfigRecord) error
    GetConfig(userID, name string) (*agents.AgentConfigRecord, error)
    ListConfigs(userID string) ([]agents.AgentConfigRecord, error)
    DeleteConfig(userID, name string) error
    UpdateStatus(userID, name, status string) error
    UpdateLastRun(userID, name string) error
}

// AgentPoolOptions holds optional dependencies for AgentPoolService.
// Zero values are fine — the service degrades gracefully without them.
type AgentPoolOptions struct {
    AuthDB      *gorm.DB
    SkillStore  *distributed.SkillStore
    NATSClient  messaging.Publisher
    EventBridge AgentEventBridge
    AgentStore  *agents.AgentStore
}
func NewAgentPoolService(appConfig *config.ApplicationConfig, opts ...AgentPoolOptions) (*AgentPoolService, error) {
    svc := &AgentPoolService{
        appConfig: appConfig,
    }
    if len(opts) > 0 {
        o := opts[0]
        if o.AuthDB != nil {
            svc.users.authDB = o.AuthDB
        }
        if o.SkillStore != nil {
            svc.distributed.skillStore = o.SkillStore
        }
        if o.NATSClient != nil {
            svc.distributed.natsClient = o.NATSClient
        }
        if o.EventBridge != nil {
            svc.distributed.eventBridge = o.EventBridge
        }
        if o.AgentStore != nil {
            svc.distributed.agentStore = o.AgentStore
        }
    }
    return svc, nil
}

func (s *AgentPoolService) Start(ctx context.Context) error {
    cfg := s.appConfig.AgentPool

    // API URL: use configured value, or derive self-referencing URL from LocalAI's address
    apiURL := cfg.APIURL
    if apiURL == "" {
        _, port, err := net.SplitHostPort(s.appConfig.APIAddress)
        if err != nil {
            port = strings.TrimPrefix(s.appConfig.APIAddress, ":")
        }
        apiURL = "http://127.0.0.1:" + port
    }
    apiKey := cfg.APIKey
    if apiKey == "" && len(s.appConfig.ApiKeys) > 0 {
        apiKey = s.appConfig.ApiKeys[0]
    }
    s.apiURL = apiURL
    s.apiKey = apiKey

    // Distributed mode: use native executor + NATSDispatcher.
    // No LocalAGI pool, no collections, no skills service — all stateless.
    if s.distributed.natsClient != nil {
        return s.startDistributed(ctx, apiURL, apiKey)
    }

    // Standalone mode: use LocalAGI pool (backward compat)
    return s.startLocalAGI(ctx, cfg, apiURL, apiKey)
}

func (s *AgentPoolService) buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets string) *collections.Config {
    cfg := s.appConfig.AgentPool
    return &collections.Config{
        LLMAPIURL:        apiURL,
        LLMAPIKey:        apiKey,
        LLMModel:         cfg.DefaultModel,
        CollectionDBPath: collectionDBPath,
        FileAssets:       fileAssets,
        VectorEngine:     cfg.VectorEngine,
        EmbeddingModel:   cfg.EmbeddingModel,
        MaxChunkingSize:  cfg.MaxChunkingSize,
        ChunkOverlap:     cfg.ChunkOverlap,
        DatabaseURL:      cfg.DatabaseURL,
    }
}
// startDistributed initializes the native agent executor with NATS dispatcher.
// No LocalAGI pool is created — agent execution is stateless.
// Skills and collections are still initialized for the frontend UI.
func (s *AgentPoolService) startDistributed(ctx context.Context, apiURL, apiKey string) error {
    cfg := s.appConfig.AgentPool

    // State dir for skills and outputs
    stateDir := cmp.Or(cfg.StateDir, s.appConfig.DataPath, s.appConfig.DynamicConfigsDir, "agents")
    if err := os.MkdirAll(stateDir, 0750); err != nil {
        xlog.Warn("Failed to create agent state dir", "error", err)
    }
    s.stateDir = stateDir

    // Outputs directory
    outputsDir := filepath.Join(stateDir, "outputs")
    if err := os.MkdirAll(outputsDir, 0750); err != nil {
        xlog.Warn("Failed to create outputs directory", "error", err)
    }
    s.outputsDir = outputsDir

    // Skills service — same as standalone, filesystem-based
    skillsSvc, err := skills.NewService(stateDir)
    if err != nil {
        xlog.Warn("Failed to create skills service in distributed mode", "error", err)
    } else {
        s.localAGI.skillsService = skillsSvc
    }

    // Collections backend — same as standalone, in-process
    collectionDBPath := cfg.CollectionDBPath
    if collectionDBPath == "" {
        collectionDBPath = filepath.Join(stateDir, "collections")
    }
    fileAssets := filepath.Join(stateDir, "assets")
    collectionsBackend, _ := collections.NewInProcessBackend(s.buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets))
    s.collectionsBackend = collectionsBackend

    // User-scoped storage
    dataDir := cmp.Or(s.appConfig.DataPath, s.appConfig.DynamicConfigsDir)
    s.users.userStorage = NewUserScopedStorage(stateDir, dataDir)

    // Start the background agent scheduler on the frontend.
    // It needs DB access to list configs and update LastRunAt — the worker doesn't have DB.
    // The advisory lock ensures only one frontend instance runs the scheduler.
    if s.users.authDB != nil && s.distributed.natsClient != nil && s.distributed.agentStore != nil {
        var schedulerOpts []agents.AgentSchedulerOpt
        if s.distributed.skillStore != nil {
            schedulerOpts = append(schedulerOpts, agents.WithSchedulerSkillProvider(s.buildSkillProvider()))
        }
        scheduler := agents.NewAgentScheduler(
            s.users.authDB,
            s.distributed.natsClient,
            s.distributed.agentStore,
            messaging.SubjectAgentExecute,
            schedulerOpts...,
        )
        go scheduler.Start(ctx)
    }

    // Wire the distributed config backend
    s.configBackend = newDistributedAgentConfigBackend(s, s.distributed.agentStore)
    xlog.Info("Agent pool started in distributed mode (frontend dispatcher only)", "apiURL", apiURL, "stateDir", stateDir)
    return nil
}
// startLocalAGI initializes the full LocalAGI pool for standalone mode.
func (s *AgentPoolService) startLocalAGI(_ context.Context, cfg config.AgentPoolConfig, apiURL, apiKey string) error {
    // State dir: explicit config > DataPath > DynamicConfigsDir > fallback
    stateDir := cmp.Or(cfg.StateDir, s.appConfig.DataPath, s.appConfig.DynamicConfigsDir, "agents")
    if err := os.MkdirAll(stateDir, 0750); err != nil {
        return fmt.Errorf("failed to create agent pool state dir: %w", err)
    }

    // Collections paths
    collectionDBPath := cfg.CollectionDBPath
    if collectionDBPath == "" {
        collectionDBPath = filepath.Join(stateDir, "collections")
    }
    fileAssets := filepath.Join(stateDir, "assets")

    // Skills service
    skillsSvc, err := skills.NewService(stateDir)
    if err != nil {
        xlog.Error("Failed to create skills service", "error", err)
    }
    s.localAGI.skillsService = skillsSvc

    // Actions config map
    actionsConfig := map[string]string{
        agiServices.ConfigStateDir: stateDir,
    }
    if cfg.CustomActionsDir != "" {
        actionsConfig[agiServices.CustomActionsDir] = cfg.CustomActionsDir
    }

    // Create outputs subdirectory
    outputsDir := filepath.Join(stateDir, "outputs")
    if err := os.MkdirAll(outputsDir, 0750); err != nil {
        xlog.Error("Failed to create outputs directory", "path", outputsDir, "error", err)
    }

    s.localAGI.actionsConfig = actionsConfig
    s.stateDir = stateDir
    s.outputsDir = outputsDir
    s.localAGI.sharedState = coreTypes.NewAgentSharedState(5 * time.Minute)

    // Initialize user-scoped storage
    dataDir := cmp.Or(s.appConfig.DataPath, s.appConfig.DynamicConfigsDir)
    s.users.userStorage = NewUserScopedStorage(stateDir, dataDir)

    // Create the agent pool
    pool, err := state.NewAgentPool(
        cfg.DefaultModel,
        cfg.MultimodalModel,
        cfg.TranscriptionModel,
        cfg.TranscriptionLanguage,
        cfg.TTSModel,
        apiURL,
        apiKey,
        stateDir,
        agiServices.Actions(actionsConfig),
        agiServices.Connectors,
        agiServices.DynamicPrompts(actionsConfig),
        agiServices.Filters,
        cfg.Timeout,
        cfg.EnableLogs,
        skillsSvc,
    )
    if err != nil {
        return fmt.Errorf("failed to create agent pool: %w", err)
    }
    s.localAGI.pool = pool

    // Create in-process collections backend and RAG provider
    collectionsCfg := s.buildCollectionsConfig(apiURL, apiKey, collectionDBPath, fileAssets)
    collectionsBackend, collectionsState := collections.NewInProcessBackend(collectionsCfg)
    s.collectionsBackend = collectionsBackend
    embedded := collections.RAGProviderFromState(collectionsState)
    pool.SetRAGProvider(func(collectionName, _, _ string) (agent.RAGDB, state.KBCompactionClient, bool) {
        return embedded(collectionName)
    })

    // Build config metadata for UI
    s.localAGI.configMeta = state.NewAgentConfigMeta(
        agiServices.ActionsConfigMeta(cfg.CustomActionsDir),
        agiServices.ConnectorsConfigMeta(),
        agiServices.DynamicPromptsConfigMeta(cfg.CustomActionsDir),
        agiServices.FiltersConfigMeta(),
    )

    // Start all agents
    if err := pool.StartAll(); err != nil {
        xlog.Error("Failed to start agent pool", "error", err)
    }

    // Wire the local config backend
    s.configBackend = newLocalAgentConfigBackend(s)
    xlog.Info("Agent pool started (standalone/LocalAGI mode)", "stateDir", stateDir, "apiURL", apiURL)
    return nil
}
func (s *AgentPoolService) Stop() {
if s.configBackend != nil {
s.configBackend.Stop()
}
}
// ConfigBackend returns the underlying AgentConfigBackend.
func (s *AgentPoolService) ConfigBackend() AgentConfigBackend {
return s.configBackend
}
// APIURL returns the resolved API URL for agent execution.
func (s *AgentPoolService) APIURL() string {
return s.apiURL
}
// APIKey returns the resolved API key for agent execution.
func (s *AgentPoolService) APIKey() string {
return s.apiKey
}
// Pool returns the underlying AgentPool.
func (s *AgentPoolService) Pool() *state.AgentPool {
return s.localAGI.pool
}
// SetNATSClient sets the NATS client for distributed agent execution.
// Deprecated: prefer passing NATSClient via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetNATSClient(nc messaging.Publisher) {
s.distributed.natsClient = nc
}
// SetEventBridge sets the event bridge for distributed SSE + persistence.
// Deprecated: prefer passing EventBridge via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetEventBridge(eb AgentEventBridge) {
s.distributed.eventBridge = eb
}
// SetAgentStore sets the PostgreSQL agent config store.
// Deprecated: prefer passing AgentStore via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetAgentStore(store *agents.AgentStore) {
s.distributed.agentStore = store
}
// Agent execution in distributed mode is handled by the dedicated agent-worker process
// using the NATSDispatcher from core/services/agents/dispatcher.go.
// The frontend only dispatches chat events to NATS via dispatchChat().
// --- Agent CRUD ---
func (s *AgentPoolService) GetAgent(name string) *agent.Agent {
// GetAgent is used by the responses interceptor to check if a model name
// is an agent. It uses the raw pool key (no userID prefix).
return s.configBackend.GetAgent("", name)
}
// Chat sends a message to an agent and returns a message ID immediately; the
// agent's response is delivered asynchronously over the agent's SSE stream.
func (s *AgentPoolService) Chat(name, message string) (string, error) {
ag := s.localAGI.pool.GetAgent(name)
if ag == nil {
return "", fmt.Errorf("%w: %s", ErrAgentNotFound, name)
}
manager := s.localAGI.pool.GetManager(name)
if manager == nil {
return "", fmt.Errorf("SSE manager not found for agent: %s", name)
}
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
// Send user message via SSE
userMsg, _ := json.Marshal(map[string]any{
"id": messageID + "-user",
"sender": "user",
"content": message,
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(userMsg)).WithEvent("json_message"))
// Send processing status
statusMsg, _ := json.Marshal(map[string]any{
"status": "processing",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(statusMsg)).WithEvent("json_message_status"))
// Process asynchronously
go func() {
response := ag.Ask(coreTypes.WithText(message))
if response == nil {
errMsg, _ := json.Marshal(map[string]any{
"error": "agent request failed or was cancelled",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
} else if response.Error != nil {
errMsg, _ := json.Marshal(map[string]any{
"error": response.Error.Error(),
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(errMsg)).WithEvent("json_error"))
} else {
// Collect metadata from all action states
metadata := map[string]any{}
			for _, st := range response.State { // "st", not "state": avoid shadowing the state package
				for k, v := range st.Metadata {
if existing, ok := metadata[k]; ok {
if existList, ok := existing.([]string); ok {
if newList, ok := v.([]string); ok {
metadata[k] = append(existList, newList...)
continue
}
}
}
metadata[k] = v
}
}
if len(metadata) > 0 {
// Extract userID from the agent key (format: "userID:agentName")
var chatUserID string
if uid, _, ok := strings.Cut(name, ":"); ok {
chatUserID = uid
}
s.collectAndCopyMetadata(metadata, chatUserID)
}
msg := map[string]any{
"id": messageID + "-agent",
"sender": "agent",
"content": response.Response,
"timestamp": time.Now().Format(time.RFC3339),
}
if len(metadata) > 0 {
msg["metadata"] = metadata
}
respMsg, _ := json.Marshal(msg)
manager.Send(sse.NewMessage(string(respMsg)).WithEvent("json_message"))
}
completedMsg, _ := json.Marshal(map[string]any{
"status": "completed",
"timestamp": time.Now().Format(time.RFC3339),
})
manager.Send(sse.NewMessage(string(completedMsg)).WithEvent("json_message_status"))
}()
return messageID, nil
}
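// The asynchronous flow above, sketched from a caller's side (svc and the
// agent key are illustrative, not part of this API):
//
//	id, _ := svc.Chat("user123:assistant", "summarise my notes")
//	// id is only a correlation token. The user echo, a "processing" status,
//	// the agent reply (or a json_error), and a final "completed" status all
//	// arrive on the agent's SSE manager as json_message,
//	// json_message_status and json_error events.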
// userOutputsDir returns the per-user outputs directory, creating it if needed.
// If userID is empty, falls back to the shared outputs directory.
func (s *AgentPoolService) userOutputsDir(userID string) string {
if userID == "" {
return s.outputsDir
}
	dir := filepath.Join(s.outputsDir, userID)
	if err := os.MkdirAll(dir, 0750); err != nil {
		xlog.Error("Failed to create user outputs directory", "dir", dir, "error", err)
	}
	return dir
}
// copyToOutputs copies a file into the per-user outputs directory and returns the new path.
// If the file is already inside the target dir, it returns the original path unchanged.
func (s *AgentPoolService) copyToOutputs(srcPath, userID string) (string, error) {
targetDir := s.userOutputsDir(userID)
	srcClean := filepath.Clean(srcPath)
	absTarget, err := filepath.Abs(targetDir)
	if err != nil {
		return "", err
	}
	absSrc, err := filepath.Abs(srcClean)
	if err != nil {
		return "", err
	}
	if strings.HasPrefix(absSrc, absTarget+string(os.PathSeparator)) {
		return srcPath, nil
	}
src, err := os.Open(srcClean)
if err != nil {
return "", err
}
defer src.Close()
dstPath := filepath.Join(targetDir, filepath.Base(srcClean))
	dst, err := os.Create(dstPath)
	if err != nil {
		return "", err
	}
	defer dst.Close()
	if _, err := io.Copy(dst, src); err != nil {
		return "", err
	}
	// Close explicitly so a write error surfaced at close time is not
	// silently dropped by the deferred Close.
	if err := dst.Close(); err != nil {
		return "", err
	}
	return dstPath, nil
}
// collectAndCopyMetadata iterates all metadata keys and, for any value that is
// a []string of local file paths, copies those files into the per-user outputs
// directory so the file endpoint can serve them from a single confined location.
// Entries that are URLs (http/https) are left unchanged.
func (s *AgentPoolService) collectAndCopyMetadata(metadata map[string]any, userID string) {
for key, val := range metadata {
list, ok := val.([]string)
if !ok {
continue
}
updated := make([]string, 0, len(list))
for _, p := range list {
if strings.HasPrefix(p, "http://") || strings.HasPrefix(p, "https://") {
updated = append(updated, p)
continue
}
newPath, err := s.copyToOutputs(p, userID)
if err != nil {
xlog.Error("Failed to copy file to outputs", "src", p, "error", err)
updated = append(updated, p)
continue
}
updated = append(updated, newPath)
}
metadata[key] = updated
}
}
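// Sketch of the rewrite performed by collectAndCopyMetadata (paths are
// illustrative):
//
//	metadata := map[string]any{
//		"generated_files": []string{"/tmp/report.pdf", "https://example.com/a.png"},
//	}
//	s.collectAndCopyMetadata(metadata, "user123")
//	// "/tmp/report.pdf" is copied under <outputsDir>/user123/ and that
//	// entry is rewritten to the new path; the https:// URL passes through
//	// untouched.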
func (s *AgentPoolService) GetConfigMeta() state.AgentConfigMeta {
return s.localAGI.configMeta
}
// GetConfigMetaResult returns the config metadata via the backend, which handles
// local vs distributed differences (LocalAGI metadata vs native static metadata).
func (s *AgentPoolService) GetConfigMetaResult() AgentConfigMetaResult {
return s.configBackend.GetConfigMeta()
}
func (s *AgentPoolService) AgentHubURL() string {
return s.appConfig.AgentPool.AgentHubURL
}
func (s *AgentPoolService) StateDir() string {
return s.stateDir
}
func (s *AgentPoolService) OutputsDir() string {
return s.outputsDir
}
// ExportAgent returns the agent config as JSON bytes.
func (s *AgentPoolService) ExportAgent(name string) ([]byte, error) {
// Extract userID and agent name from the key (format: "userID:agentName")
userID := ""
agentName := name
if u, a, ok := strings.Cut(name, ":"); ok {
userID = u
agentName = a
}
return s.configBackend.ExportConfig(userID, agentName)
}
// --- User Services ---
// SetUserServicesManager sets the user services manager for per-user scoping.
func (s *AgentPoolService) SetUserServicesManager(usm *UserServicesManager) {
s.users.userServices = usm
}
// UserStorage returns the user-scoped storage.
func (s *AgentPoolService) UserStorage() *UserScopedStorage {
return s.users.userStorage
}
// UserServicesManager returns the user services manager.
func (s *AgentPoolService) UserServicesManager() *UserServicesManager {
return s.users.userServices
}
// SetAuthDB sets the auth database for API key generation.
// Deprecated: prefer passing AuthDB via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetAuthDB(db *gorm.DB) {
s.users.authDB = db
}
// SetSkillStore sets the distributed skill store for persisting skill metadata to PostgreSQL.
// Deprecated: prefer passing SkillStore via AgentPoolOptions at construction time.
func (s *AgentPoolService) SetSkillStore(store *distributed.SkillStore) {
s.distributed.skillStore = store
}
// --- Admin Aggregation ---
// UserAgentInfo holds agent info for cross-user listing.
type UserAgentInfo struct {
Name string `json:"name"`
Active bool `json:"active"`
}
// ListAllAgentsGrouped returns all agents grouped by user ID.
// Keys without ":" go into the "" (root) group.
func (s *AgentPoolService) ListAllAgentsGrouped() map[string][]UserAgentInfo {
return s.configBackend.ListAllGrouped()
}
// --- ForUser Collections ---
// ListCollectionsForUser lists collections for a specific user.
func (s *AgentPoolService) ListCollectionsForUser(userID string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListCollections()
}
// CreateCollectionForUser creates a collection for a specific user.
func (s *AgentPoolService) CreateCollectionForUser(userID, name string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.CreateCollection(name)
}
// ensureCollectionForUser creates a collection for the user if it doesn't already exist.
func (s *AgentPoolService) ensureCollectionForUser(userID, name string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
	existing, err := backend.ListCollections() // "existing", not "collections": avoid shadowing the collections package
	if err != nil {
		return err
	}
	if slices.Contains(existing, name) {
		return nil
	}
	return backend.CreateCollection(name)
}
// UploadToCollectionForUser uploads to a collection for a specific user.
// The filename arrives from a multipart upload; the vendored backend may or
// may not sanitise it, so strip any directory components at the boundary.
func (s *AgentPoolService) UploadToCollectionForUser(userID, collection, filename string, fileBody io.Reader) (string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", err
}
base := filepath.Base(filename)
if base == "." || base == ".." || base == "/" || base == "" {
return "", fmt.Errorf("invalid filename")
}
return backend.Upload(collection, base, fileBody)
}
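// For example, a traversal attempt in the multipart filename never reaches
// the backend (hypothetical caller):
//
//	_, err := svc.UploadToCollectionForUser("user123", "docs", "../../etc/passwd", body)
//	// The backend sees only the base name "passwd".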
// CollectionEntryExistsForUser checks if an entry exists in a user's collection.
func (s *AgentPoolService) CollectionEntryExistsForUser(userID, collection, entry string) bool {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return false
}
return backend.EntryExists(collection, entry)
}
// ListCollectionEntriesForUser lists entries in a user's collection.
func (s *AgentPoolService) ListCollectionEntriesForUser(userID, collection string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListEntries(collection)
}
// GetCollectionEntryContentForUser gets entry content for a user's collection.
func (s *AgentPoolService) GetCollectionEntryContentForUser(userID, collection, entry string) (string, int, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", 0, err
}
return backend.GetEntryContent(collection, entry)
}
// SearchCollectionForUser searches a user's collection.
func (s *AgentPoolService) SearchCollectionForUser(userID, collection, query string, maxResults int) ([]collections.SearchResult, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.Search(collection, query, maxResults)
}
// ResetCollectionForUser resets a user's collection.
func (s *AgentPoolService) ResetCollectionForUser(userID, collection string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.Reset(collection)
}
// DeleteCollectionEntryForUser deletes an entry from a user's collection.
func (s *AgentPoolService) DeleteCollectionEntryForUser(userID, collection, entry string) ([]string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.DeleteEntry(collection, entry)
}
// AddCollectionSourceForUser adds a source to a user's collection.
func (s *AgentPoolService) AddCollectionSourceForUser(userID, collection, sourceURL string, intervalMin int) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.AddSource(collection, sourceURL, intervalMin)
}
// RemoveCollectionSourceForUser removes a source from a user's collection.
func (s *AgentPoolService) RemoveCollectionSourceForUser(userID, collection, sourceURL string) error {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return err
}
return backend.RemoveSource(collection, sourceURL)
}
// ListCollectionSourcesForUser lists sources for a user's collection.
func (s *AgentPoolService) ListCollectionSourcesForUser(userID, collection string) ([]collections.SourceInfo, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return nil, err
}
return backend.ListSources(collection)
}
// GetCollectionEntryFilePathForUser gets the file path for an entry in a user's collection.
func (s *AgentPoolService) GetCollectionEntryFilePathForUser(userID, collection, entry string) (string, error) {
backend, err := s.CollectionsBackendForUser(userID)
if err != nil {
return "", err
}
return backend.GetEntryFilePath(collection, entry)
}
// --- ForUser Agent Methods ---
// ListAgentsForUser lists agents belonging to a specific user.
// If userID is empty, returns all agents (for backward compatibility).
func (s *AgentPoolService) ListAgentsForUser(userID string) map[string]bool {
return s.configBackend.ListAgents(userID)
}
// CreateAgentForUser creates an agent namespaced to a user.
// When auth is enabled and the agent config has no API key, a new user API key
// is auto-generated so the agent can authenticate against LocalAI's own API.
func (s *AgentPoolService) CreateAgentForUser(userID string, config *state.AgentConfig) error {
if err := ValidateAgentName(config.Name); err != nil {
return err
}
// Auto-generate a user API key when auth is active and none is specified
if s.users.authDB != nil && userID != "" && config.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+config.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
config.APIKey = plaintext
xlog.Info("Auto-generated API key for agent", "agent", config.Name, "user", userID)
}
if err := s.configBackend.SaveConfig(userID, config); err != nil {
return err
}
// Auto-create collection when knowledge base or long-term memory is enabled
if config.EnableKnowledgeBase || config.LongTermMemory {
if err := s.ensureCollectionForUser(userID, config.Name); err != nil {
xlog.Warn("Failed to auto-create collection for agent", "agent", config.Name, "error", err)
}
}
return nil
}
// GetAgentForUser returns the agent for a user.
// Returns nil in distributed mode where agents don't run in-process.
func (s *AgentPoolService) GetAgentForUser(userID, name string) *agent.Agent {
return s.configBackend.GetAgent(userID, name)
}
// GetAgentConfigForUser returns the agent config for a user's agent.
func (s *AgentPoolService) GetAgentConfigForUser(userID, name string) *state.AgentConfig {
return s.configBackend.GetConfig(userID, name)
}
// UpdateAgentForUser updates a user's agent.
func (s *AgentPoolService) UpdateAgentForUser(userID, name string, config *state.AgentConfig) error {
// Auto-generate a user API key when auth is active and none is specified
if s.users.authDB != nil && userID != "" && config.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
config.APIKey = plaintext
}
if err := s.configBackend.UpdateConfig(userID, name, config); err != nil {
return err
}
// Auto-create collection when knowledge base or long-term memory is enabled
if config.EnableKnowledgeBase || config.LongTermMemory {
if err := s.ensureCollectionForUser(userID, config.Name); err != nil {
xlog.Warn("Failed to auto-create collection for agent", "agent", config.Name, "error", err)
}
}
return nil
}
// DeleteAgentForUser deletes a user's agent.
func (s *AgentPoolService) DeleteAgentForUser(userID, name string) error {
return s.configBackend.DeleteConfig(userID, name)
}
// PauseAgentForUser pauses a user's agent.
func (s *AgentPoolService) PauseAgentForUser(userID, name string) error {
return s.configBackend.SetStatus(userID, name, "paused")
}
// ResumeAgentForUser resumes a user's agent.
func (s *AgentPoolService) ResumeAgentForUser(userID, name string) error {
return s.configBackend.SetStatus(userID, name, "active")
}
// GetAgentStatusForUser returns the status of a user's agent.
// Returns nil in distributed mode where status is not tracked in-process.
func (s *AgentPoolService) GetAgentStatusForUser(userID, name string) *state.Status {
return s.configBackend.GetStatus(userID, name)
}
// GetAgentObservablesForUser returns observables for a user's agent as raw JSON entries.
func (s *AgentPoolService) GetAgentObservablesForUser(userID, name string) ([]json.RawMessage, error) {
return s.configBackend.GetObservables(userID, name)
}
// ClearAgentObservablesForUser clears observables for a user's agent.
func (s *AgentPoolService) ClearAgentObservablesForUser(userID, name string) error {
return s.configBackend.ClearObservables(userID, name)
}
// ChatForUser sends a message to a user's agent.
func (s *AgentPoolService) ChatForUser(userID, name, message string) (string, error) {
return s.configBackend.Chat(userID, name, message)
}
// dispatchChat publishes a chat event to the NATS agent execution queue.
// The event is enriched with the full agent config and resolved skills so that
// the worker does not need direct database access.
func (s *AgentPoolService) dispatchChat(userID, name, message string) (string, error) {
messageID := fmt.Sprintf("%d", time.Now().UnixNano())
// Send user message to SSE immediately so the UI shows it right away
	if s.distributed.eventBridge != nil {
		s.distributed.eventBridge.PublishMessage(name, userID, "user", message, messageID+"-user")
		s.distributed.eventBridge.PublishStatus(name, userID, "processing")
	}
// Load config from DB to embed in the NATS payload
var cfg *agents.AgentConfig
if s.distributed.agentStore != nil {
rec, err := s.distributed.agentStore.GetConfig(userID, name)
if err != nil {
return "", fmt.Errorf("agent config not found: %w", err)
}
var c agents.AgentConfig
if err := agents.ParseConfigJSON(rec.ConfigJSON, &c); err != nil {
return "", fmt.Errorf("invalid agent config: %w", err)
}
cfg = &c
}
// Load skills if enabled — uses SkillManager which reads from filesystem/PostgreSQL
var skills []agents.SkillInfo
if cfg != nil && cfg.EnableSkills {
if loaded, err := s.loadSkillsForUser(userID); err == nil {
skills = loaded
}
}
evt := agents.AgentChatEvent{
AgentName: name,
UserID: userID,
Message: message,
MessageID: messageID,
Role: "user",
Config: cfg,
Skills: skills,
}
	if s.distributed.natsClient == nil {
		return "", fmt.Errorf("NATS client not configured for distributed chat dispatch")
	}
	if err := s.distributed.natsClient.Publish(messaging.SubjectAgentExecute, evt); err != nil {
return "", fmt.Errorf("failed to dispatch agent chat: %w", err)
}
return messageID, nil
}
// GetSSEManagerForUser returns the SSE manager for a user's agent.
// Returns nil in distributed mode where SSE is handled by EventBridge.
func (s *AgentPoolService) GetSSEManagerForUser(userID, name string) sse.Manager {
return s.configBackend.GetSSEManager(userID, name)
}
// ExportAgentForUser exports a user's agent config.
func (s *AgentPoolService) ExportAgentForUser(userID, name string) ([]byte, error) {
return s.ExportAgent(agents.AgentKey(userID, name))
}
// ImportAgentForUser imports an agent for a user.
func (s *AgentPoolService) ImportAgentForUser(userID string, data []byte) error {
var cfg state.AgentConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("invalid agent config: %w", err)
}
if err := ValidateAgentName(cfg.Name); err != nil {
return err
}
// Auto-generate a user API key when auth is active and none is specified
if s.users.authDB != nil && userID != "" && cfg.APIKey == "" {
plaintext, _, err := auth.CreateAPIKey(s.users.authDB, userID, "agent:"+cfg.Name, "user", s.appConfig.Auth.APIKeyHMACSecret, nil)
if err != nil {
return fmt.Errorf("failed to create API key for agent: %w", err)
}
cfg.APIKey = plaintext
}
return s.configBackend.ImportConfig(userID, &cfg)
}
// --- ForUser Collections ---
// CollectionsBackendForUser returns the collections backend for a user.
func (s *AgentPoolService) CollectionsBackendForUser(userID string) (collections.Backend, error) {
if s.users.userServices == nil || userID == "" {
if s.collectionsBackend == nil {
return nil, fmt.Errorf("collections not available in distributed mode")
}
return s.collectionsBackend, nil
}
return s.users.userServices.GetCollections(userID)
}
// --- ForUser Skills ---
// SkillsServiceForUser returns the skills service for a user.
func (s *AgentPoolService) SkillsServiceForUser(userID string) (*skills.Service, error) {
if s.users.userServices == nil || userID == "" {
if s.localAGI.skillsService == nil {
return nil, fmt.Errorf("skills service not available")
}
return s.localAGI.skillsService, nil
}
return s.users.userServices.GetSkills(userID)
}
// SkillManagerForUser returns a SkillManager for a specific user.
// In distributed mode, returns a DistributedManager that syncs to PostgreSQL.
// In standalone mode, returns a FilesystemManager.
func (s *AgentPoolService) SkillManagerForUser(userID string) (skillsManager.Manager, error) {
svc, err := s.SkillsServiceForUser(userID)
if err != nil {
return nil, err
}
fs := skillsManager.NewFilesystemManager(svc)
// In distributed mode, wrap with PostgreSQL sync
if s.distributed.skillStore != nil {
return skillsManager.NewDistributedManager(fs, s.distributed.skillStore, userID), nil
}
return fs, nil
}
// --- ForUser Jobs ---
// JobServiceForUser returns the agent job service for a user.
func (s *AgentPoolService) JobServiceForUser(userID string) (*AgentJobService, error) {
if s.users.userServices == nil || userID == "" {
return nil, fmt.Errorf("no user services manager or empty user ID")
}
return s.users.userServices.GetJobs(userID)
}
// --- Actions ---
// ListAvailableActions returns the list of all available action type names.
// In distributed mode, returns an empty list (actions are configured as MCP tools per agent).
func (s *AgentPoolService) ListAvailableActions() []string {
return s.configBackend.ListAvailableActions()
}
// GetActionDefinition creates an action instance by name with the given config and returns its definition.
func (s *AgentPoolService) GetActionDefinition(actionName string, actionConfig map[string]string) (any, error) {
if actionConfig == nil {
actionConfig = map[string]string{}
}
a, err := agiServices.Action(actionName, "", actionConfig, s.localAGI.pool, s.localAGI.actionsConfig)
if err != nil {
return nil, err
}
return a.Definition(), nil
}
// ExecuteAction creates an action instance and runs it with the given params.
func (s *AgentPoolService) ExecuteAction(ctx context.Context, actionName string, actionConfig map[string]string, params coreTypes.ActionParams) (coreTypes.ActionResult, error) {
if actionConfig == nil {
actionConfig = map[string]string{}
}
a, err := agiServices.Action(actionName, "", actionConfig, s.localAGI.pool, s.localAGI.actionsConfig)
if err != nil {
return coreTypes.ActionResult{}, err
}
return a.Run(ctx, s.localAGI.sharedState, params)
}
// loadSkillsForUser loads full skill info (name, description, content) for a user.
// Used by dispatchChat and the scheduler to enrich NATS events.
func (s *AgentPoolService) loadSkillsForUser(userID string) ([]agents.SkillInfo, error) {
mgr, err := s.SkillManagerForUser(userID)
if err != nil {
return nil, err
}
allSkills, err := mgr.List()
if err != nil {
return nil, err
}
	var infos []agents.SkillInfo // "infos", not "skills": avoid shadowing the skills package
	for _, sk := range allSkills {
		desc := ""
		if sk.Metadata != nil && sk.Metadata.Description != "" {
			desc = sk.Metadata.Description
		}
		if desc == "" {
			d := sk.Content
			if len(d) > 200 {
				d = d[:200] + "..."
			}
			desc = d
		}
		infos = append(infos, agents.SkillInfo{
			Name:        sk.Name,
			Description: desc,
			Content:     sk.Content,
		})
	}
	return infos, nil
}
// buildSkillProvider returns a SkillContentProvider closure for the scheduler.
func (s *AgentPoolService) buildSkillProvider() agents.SkillContentProvider {
return func(userID string) ([]agents.SkillInfo, error) {
return s.loadSkillsForUser(userID)
}
}