LocalAI/core/application/startup.go
LocalAI [bot] a0f3e26245 fix(distributed): make admin backend installs resilient and observable (#9958)
* feat(distributed): add configurable NATS backend install/upgrade timeouts

Adds BackendInstallTimeout and BackendUpgradeTimeout to DistributedConfig
with 15m defaults, following the existing MCPToolTimeout / WorkerWaitTimeout
pattern. These will replace the hardcoded literals in RemoteUnloaderAdapter
so admin-driven backend installs across the cluster survive long OCI image
pulls that previously timed out at 3m.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* style(distributed): gofmt alignment after timeout fields

Re-aligns the Validate() negative-duration map and the Default* const
block so the new BackendInstall/UpgradeTimeout entries do not leave
the surrounding columns mis-padded.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(cli): surface LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT and _UPGRADE_TIMEOUT

Parses the two new env vars on the run CLI and threads them through the
existing AppOption builder so DistributedConfig picks them up. Invalid
duration strings now fail loudly at startup rather than silently falling
back to the default.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): inject NATS install/upgrade timeouts into RemoteUnloaderAdapter

Removes the hardcoded 3m / 15m literals from RemoteUnloaderAdapter and
threads in DistributedConfig.BackendInstallTimeoutOrDefault() and
BackendUpgradeTimeoutOrDefault() at construction. Install now defaults
to 15m (was 3m); cold OCI image pulls on Jetson Wi-Fi routinely blew
past the old ceiling. Scripted messaging client captures the timeout
so tests can assert the configured value actually reaches the NATS
request.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): introduce galleryop.ErrWorkerStillInstalling sentinel

When the NATS request-reply for backend.install (or .upgrade) times out
the worker is almost always still pulling the OCI image. Wrap the timeout
in a typed sentinel so the manager above can distinguish "worker hung"
from "worker still working" and leave the pending_backend_ops row in
place for the reconciler to confirm via backend.list.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): treat NATS install timeout as in-progress, not failure

When a worker times out replying to backend.install but the install is
still running on the worker, enqueueAndDrainBackendOp now reports a
running_on_worker status and pushes NextRetryAt out by the install
timeout so the reconciler does not immediately re-fire another install
while the worker is still pulling the image. The pending_backend_ops
row stays in place for the next reconciler pass to confirm via
backend.list.

InstallBackend wraps the result in galleryop.ErrWorkerStillInstalling
so callers can branch (galleryop renders yellow in-progress instead of
red error). UpgradeBackend uses the same wrap.

Adds RemoteUnloaderAdapter.InstallTimeout() so the manager can push
NextRetryAt by the configured timeout without reaching into a private
field, and NodeRegistry.RecordPendingBackendOpInFlight as the soft
cousin of RecordPendingBackendOpFailure.

Also includes incidental gofmt-driven struct-field alignment in
registry.go on lines unrelated to the change (touched files are
re-formatted to canonical form per project policy).

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed): don't increment Attempts on in-flight install timeout

An in-flight timeout (worker still pulling the OCI image) is not a
failed attempt, it's a delayed one. Incrementing Attempts let
genuinely-progressing slow installs (e.g. 30 GB CUDA images on Wi-Fi)
trip the reconciler's maxPendingBackendOpAttempts cap and dead-letter
the queue row while the worker was still legitimately working.

RecordPendingBackendOpInFlight now only updates LastError and NextRetryAt.
Also documents "running_on_worker" in the NodeOpStatus.Status enum
comment so Task 6 implementers see the full surface.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): surface ErrWorkerStillInstalling as non-error OpStatus

When the distributed backend manager returns an error that wraps
ErrWorkerStillInstalling, backendHandler now completes the op with a
"still installing in background" message rather than marking it as a
red failure. Admin UI sees a yellow in-progress state; reconciler
confirms completion on its next pass.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): end-to-end install-timeout-then-reconcile

Wires Task 1-6 end-to-end so any seam mismatch surfaces in CI rather
than during a real cluster install. NATS times out, the queue row
stays alive with running_on_worker status, the worker eventually
reports the backend installed via backend.list, the manager surfaces
it via ListBackends.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): document LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT / _UPGRADE_TIMEOUT

Add the two new operator-tunable env vars to the Frontend Configuration
table in the distributed-mode docs. Explains the 15m default, when to
raise it (slow links pulling multi-GB OCI images), and the new
"still installing in background" admin-UI state when the round-trip
times out but the worker is still working.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): clear pending install rows when backend.list confirms

DistributedBackendManager.ListBackends now proactively clears
pending_backend_ops install rows whose (nodeID, backend) is reported
installed by backend.list. Operator UI updates immediately instead of
waiting up to installTimeout (default 15m) for the next reconciler
tick after NextRetryAt.

Only install rows are cleared; upgrade and delete intents are not
satisfied by presence in backend.list and continue to drain through
their normal reconciler paths.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(messaging): add BackendInstallProgressEvent wire type and subject

New NATS subject nodes.<nodeID>.backend.install.<opID>.progress lets the
worker publish transient progress events (file, current/total bytes,
percentage, phase) while a long-running install pulls its OCI image.
BackendInstallRequest gains an optional OpID field so the worker knows
which subject to publish on.

Transient pub/sub (not JetStream): the install reply remains ground
truth for success/failure; dropped progress events are tolerable.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* style(messaging): drop em-dash from BackendInstallProgress test comment

Per project convention (no em-dashes anywhere). Comment substance is
unchanged.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): worker publishes debounced install progress over NATS

When BackendInstallRequest.OpID is set, the worker's backend.install
handler wires a debounced publisher (250ms window) into the gallery
download callback. Each tick becomes a BackendInstallProgressEvent on
nodes.<nodeID>.backend.install.<opID>.progress; the publisher always
emits a final event on Flush so the UI sees the terminal percentage.

Old masters that do not set OpID continue to run silent installs: no
behavior change for them. Lock ordering: the publisher releases its
mutex before calling messaging.Publish so a slow network never stalls
the install loop.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): RemoteUnloaderAdapter subscribes to install progress

InstallBackend gains opID + onProgress parameters. When both are set,
the adapter subscribes to nodes.<nodeID>.backend.install.<opID>.progress
BEFORE publishing the install request, decodes each message into the
caller's onProgress callback in a goroutine (so a slow callback never
stalls the NATS reader thread), and unsubscribes after RequestJSON
returns.

When onProgress is nil OR opID is empty (the reconciler retry path),
subscription is skipped entirely - silent installs cost nothing extra.

Subscribe failure is logged at Warn and the install proceeds without
progress streaming; the NATS round-trip still owns terminal status.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): forward backend install progress into galleryop OpStatus

DistributedBackendManager.InstallBackend now passes the gallery op ID
and a progress bridge into the adapter call. Each
BackendInstallProgressEvent from the worker becomes a
galleryop.ProgressCallback tick - which the existing backendHandler
already turns into OpStatus.UpdateStatus, so the admin UI/SSE polling
sees per-byte progress for distributed installs without any UI-side
change.

UpgradeBackend is intentionally left silent for now: its wire request
(BackendUpgradeRequest) does not carry OpID, and rolling-update
fallback is the rarer path. Will be picked up in a follow-up if the
worker upgrade path also gets a progress channel.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(distributed): InstallBackend tolerates silent (pre-Phase-2) workers

A worker on pre-Phase-2 code never publishes progress events. The new
master subscribes optimistically; this spec pins that a silent worker
still produces a green install with no progressCb ticks. The install
reply is the source of truth for terminal state; the progress stream
is a best-effort UX enrichment.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): document install progress streaming

Note the new nodes.<nodeID>.backend.install.<opID>.progress subject and
the silent-worker compatibility behavior so operators know to expect
real-time progress and what happens on a mixed-version cluster.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): note progress-event ordering trade-off in InstallBackend

Document near the goroutine dispatch why ordering at the consumer is
best-effort, why it rarely matters in practice (worker debounce >>
goroutine jitter), and what a future hardening pass would look like
(Seq field + stale-by-seq drop). Stops the next reader from accidentally
"fixing" the goroutine pool away.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): add NodeProgress + OpStatus.Nodes for per-node breakdown

Adds the data model the UI needs to render an expandable per-node
breakdown of a fanned-out backend install. NodeProgress carries node
identity (ID + name), per-node status (queued / running_on_worker /
success / error / downloading), the current file + bytes + percentage
from the Phase 2 progress stream, and any per-node error.

OpStatus.Nodes is the slice the /api/operations handler will surface
in a follow-up.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): UpdateNodeProgress merges per-node ticks by NodeID

GalleryService.UpdateNodeProgress(opID, nodeID, np) merges a NodeProgress
into OpStatus.Nodes (keyed by NodeID, no duplicates) and mirrors the
latest tick into the aggregate Progress / FileName /
DownloadedFileSize / TotalFileSize fields so the legacy single-bar
OperationsBar view keeps working unchanged alongside the new per-node
breakdown.

Concurrent-safe via the existing g.Mutex.
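
The merge-by-NodeID step could look roughly like this. The struct is a simplified, hypothetical version of NodeProgress (only NodeID and Status are named in the text above), and locking is omitted; the real method runs under the service mutex.

```go
package main

import "fmt"

// NodeProgress is a simplified stand-in for the galleryop type.
type NodeProgress struct {
	NodeID     string
	Status     string
	Percentage float64
}

// mergeNodeProgress replaces the entry with the same NodeID, or appends
// a new one, so the slice never holds duplicates for a node.
func mergeNodeProgress(nodes []NodeProgress, np NodeProgress) []NodeProgress {
	for i := range nodes {
		if nodes[i].NodeID == np.NodeID {
			nodes[i] = np
			return nodes
		}
	}
	return append(nodes, np)
}

func main() {
	var nodes []NodeProgress
	nodes = mergeNodeProgress(nodes, NodeProgress{NodeID: "a", Status: "downloading", Percentage: 10})
	nodes = mergeNodeProgress(nodes, NodeProgress{NodeID: "b", Status: "queued"})
	nodes = mergeNodeProgress(nodes, NodeProgress{NodeID: "a", Status: "downloading", Percentage: 55})
	fmt.Println(len(nodes), nodes[0].Percentage)
}
```

A linear scan is adequate here because the slice has one entry per worker node.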

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(distributed): write per-node OpStatus entries during install fan-out

DistributedBackendManager now accepts a nodeProgressSink and feeds it
two streams:

1. enqueueAndDrainBackendOp emits a per-node terminal entry on each
   status it appends to BackendOpResult (queued, success, error,
   running_on_worker). The opID is threaded through the function so
   the sink gets the right gallery op identity.

2. The install apply closure fans each BackendInstallProgressEvent
   into the sink as a downloading entry, alongside the legacy
   progressCb path so the aggregate single-bar view stays correct.

Production wiring passes the GalleryService (which implements
UpdateNodeProgress via Task 2) as the sink. Single-node tests pass
nil. DeleteBackend and UpgradeBackend pass an empty opID so the
sink path no-ops for ops that aren't gallery-tracked the same way
as Install.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(operations): expose per-node breakdown on /api/operations

When an operation's OpStatus has Nodes entries (populated by the
Phase 4 progress sink wiring), surface them as a "nodes" array on the
/api/operations response, sorted by node_name for stable rendering.

Backward compatible: legacy clients ignore the field; ops without any
node entries (single-node mode, model installs) omit the array entirely
thanks to the empty-slice guard.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ui): per-node breakdown in OperationsBar

When an install op fans out to more than one worker, the operations
bar now shows a "N nodes" chevron that expands into a per-node list.
Each row carries the node's status (color-coded pill), the current
file being downloaded, byte counts, percentage, and a thin per-node
progress bar. Yellow "Worker busy" pill marks running_on_worker
status with a tooltip explaining the NATS round-trip timed out but
the worker is still installing in the background.

Backward compatible: ops without a nodes field (legacy or single-node
mode) render as before. State for expand/collapse is local to the
component, keyed by jobID/id - reload starts collapsed.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): document per-node breakdown in the operations bar

Adds a short subsection covering the expandable "N nodes" chevron in
the OperationsBar admin UI, the meaning of each status pill, and
how it relates to the /api/operations nodes array.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(galleryop): UpdateStatus preserves Nodes when caller sends none

Real-world bug surfaced by the Phase 4 multi-worker smoke test: the
nodes[] array in /api/operations flickered between a single node at a
time on a 2-worker install. Root cause: the Phase 2 progress bridge
also calls the legacy progressCb -> UpdateStatus(&OpStatus{...}) on
every tick. UpdateStatus then overwrote the entire status pointer,
wiping the Nodes slice that UpdateNodeProgress had just merged in.

Fix: in UpdateStatus, if the incoming op has an empty Nodes slice,
carry forward the previous status's Nodes before storing. Callers
that explicitly populate Nodes still win (their slice replaces the
prior one, no merge across the two code paths).

Two regression specs added pinning both directions of the contract.
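
The carry-forward rule can be sketched like this. The store type is a hypothetical reduction of the gallery service; only the names UpdateStatus, OpStatus and Nodes come from the text above.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeProgress and OpStatus are simplified stand-ins for the galleryop types.
type NodeProgress struct {
	NodeID string
}

type OpStatus struct {
	Progress float64
	Nodes    []NodeProgress
}

// statusStore is a hypothetical, minimal holder of per-op status pointers.
type statusStore struct {
	mu       sync.Mutex
	statuses map[string]*OpStatus
}

// UpdateStatus stores op, carrying forward the previous Nodes when the
// caller did not supply any. A caller that sets Nodes replaces them.
func (s *statusStore) UpdateStatus(id string, op *OpStatus) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if prev, ok := s.statuses[id]; ok && len(op.Nodes) == 0 {
		op.Nodes = prev.Nodes
	}
	s.statuses[id] = op
}

func main() {
	s := &statusStore{statuses: map[string]*OpStatus{}}
	s.UpdateStatus("op-1", &OpStatus{Nodes: []NodeProgress{{NodeID: "a"}, {NodeID: "b"}}})
	s.UpdateStatus("op-1", &OpStatus{Progress: 42}) // legacy tick without Nodes
	fmt.Println(len(s.statuses["op-1"].Nodes))
}
```

The guard keys on an empty slice, so the legacy single-bar path and the per-node path can both write the same status without erasing each other's data.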

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(distributed): strip implementation details from user-facing docs

Trim the new install/upgrade timeout rows and the install-progress
sections to focus on what the operator sees and tunes. Drops:

- the NATS subject names and pub/sub mechanics
- "round-trip" / reconciler / backend.list jargon
- /api/operations polling cadence
- "pre-2026-05-22" version references

Reframes the breakdown text around the admin UI (Operations Bar,
chevron, status pills, "Worker busy" tooltip). Implementation context
lives in the agent notes and code comments.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(config): move DistributedConfig.Validate flag names to constants

The negative-duration check map was a wall of literal kebab-case
strings that had to stay in sync with the kong-derived CLI flag names
manually. Move them to a Flag* const block alongside the existing
Default* block so a rename of either the Go field or the CLI naming
convention forces a compile error rather than silent drift.

Sole consumer today is Validate; the constants are exported so future
operator-facing surfaces (e.g. error messages on other validation
paths) can reference them by name instead of repeating the literals.

Tests pin both the literal values (so a future "let's just rename
this" doesn't accidentally regress the CLI flag) and the negative-
duration error message for the new BackendInstall / BackendUpgrade
fields.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed): extract NodeStatus and Phase enums to constants

Sweep for the same literal-string-as-identifier pattern called out on
the Validate flag names: the per-node install status enum
("queued" | "downloading" | "running_on_worker" | "success" | "error")
appeared as raw literals across managers_distributed.go (10+ sites,
including 3 separate `n.Status == "running_on_worker"` checks),
operation.go, and the test suite. Same shape for the Phase enum
("resolving" | "downloading" | "extracting" | "starting") in the
worker-side progress publisher.

Promote both to exported const blocks:

- galleryop.NodeStatus{Queued,Downloading,RunningOnWorker,Success,Error}
  shared between galleryop.NodeProgress.Status (the wire field) and
  nodes.NodeOpStatus.Status (the in-process per-node summary)
- messaging.Phase{Resolving,Downloading,Extracting,Starting}
  shared between the worker publisher and any future consumer that
  needs to switch on phase

Tests pin both the literal values (so a future "let's just rename" doesn't
silently change the JSON wire) and use the constants in setup (so the
producer side stays drift-protected). Wire-format assertions on the
/api/operations JSON output keep their literals deliberately, so the
constant value can never silently diverge from what the UI receives.

Out of scope for this PR (separate cleanup): the finetune and
quantization job-status enums have the same anti-pattern with 14+
literal sites each, but predate this PR's work.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-23 12:35:44 +02:00

package application
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/storage"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/sanitize"
"github.com/mudler/LocalAI/pkg/xsysinfo"
"github.com/mudler/xlog"
)
func New(opts ...config.AppOption) (*Application, error) {
options := config.NewApplicationConfig(opts...)
// Store a copy of the startup config (from env vars, before file loading)
// This is used to determine if settings came from env vars vs file
startupConfigCopy := *options
application := newApplication(options)
application.startupConfig = &startupConfigCopy
xlog.Info("Starting LocalAI", "threads", options.Threads, "modelsPath", options.SystemState.Model.ModelsPath)
xlog.Info("LocalAI version", "version", internal.PrintableVersion())
if err := application.start(); err != nil {
return nil, err
}
caps, err := xsysinfo.CPUCapabilities()
if err == nil {
xlog.Debug("CPU capabilities", "capabilities", caps)
}
gpus, err := xsysinfo.GPUs()
if err == nil {
xlog.Debug("GPU count", "count", len(gpus))
for _, gpu := range gpus {
xlog.Debug("GPU", "gpu", gpu.String())
}
}
// Make sure directories exist
if options.SystemState.Model.ModelsPath == "" {
return nil, fmt.Errorf("models path cannot be empty")
}
err = os.MkdirAll(options.SystemState.Model.ModelsPath, 0750)
if err != nil {
return nil, fmt.Errorf("unable to create ModelPath: %q", err)
}
if options.GeneratedContentDir != "" {
err := os.MkdirAll(options.GeneratedContentDir, 0750)
if err != nil {
return nil, fmt.Errorf("unable to create GeneratedContentDir: %q", err)
}
}
if options.UploadDir != "" {
err := os.MkdirAll(options.UploadDir, 0750)
if err != nil {
return nil, fmt.Errorf("unable to create UploadDir: %q", err)
}
}
// Create and migrate data directory
if options.DataPath != "" {
if err := os.MkdirAll(options.DataPath, 0750); err != nil {
return nil, fmt.Errorf("unable to create DataPath: %q", err)
}
// Migrate data from DynamicConfigsDir to DataPath if needed
if options.DynamicConfigsDir != "" && options.DataPath != options.DynamicConfigsDir {
migrateDataFiles(options.DynamicConfigsDir, options.DataPath)
}
}
// Initialize auth database if auth is enabled
if options.Auth.Enabled {
// Auto-generate HMAC secret if not provided
if options.Auth.APIKeyHMACSecret == "" {
secretFile := filepath.Join(options.DataPath, ".hmac_secret")
secret, err := loadOrGenerateHMACSecret(secretFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize HMAC secret: %w", err)
}
options.Auth.APIKeyHMACSecret = secret
}
authDB, err := auth.InitDB(options.Auth.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("failed to initialize auth database: %w", err)
}
application.authDB = authDB
xlog.Info("Auth enabled", "database", sanitize.URL(options.Auth.DatabaseURL))
// Start session and expired API key cleanup goroutine
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-options.Context.Done():
return
case <-ticker.C:
if err := auth.CleanExpiredSessions(authDB); err != nil {
xlog.Error("failed to clean expired sessions", "error", err)
}
if err := auth.CleanExpiredAPIKeys(authDB); err != nil {
xlog.Error("failed to clean expired API keys", "error", err)
}
}
}
}()
}
// Wire JobStore for DB-backed task/job persistence whenever auth DB is available.
// This ensures tasks and jobs survive restarts in both single-node and distributed modes.
if application.authDB != nil && application.agentJobService != nil {
dbJobStore, err := jobs.NewJobStore(application.authDB)
if err != nil {
xlog.Error("Failed to create job store for auth DB", "error", err)
} else {
application.agentJobService.SetDistributedJobStore(dbJobStore)
}
}
// Initialize distributed mode services (NATS, object storage, node registry)
distSvc, err := initDistributed(options, application.authDB, application.ModelConfigLoader())
if err != nil {
return nil, fmt.Errorf("distributed mode initialization failed: %w", err)
}
if distSvc != nil {
application.distributed = distSvc
// Wire remote model unloader so ShutdownModel works for remote nodes
// Uses NATS to tell serve-backend nodes to Free + kill their backend process
application.modelLoader.SetRemoteUnloader(distSvc.Unloader)
// Wire ModelRouter so grpcModel() delegates to SmartRouter in distributed mode
application.modelLoader.SetModelRouter(distSvc.ModelAdapter.AsModelRouter())
// Wire DistributedModelStore so shutdown/list/watchdog can find remote models
distStore := nodes.NewDistributedModelStore(
model.NewInMemoryModelStore(),
distSvc.Registry,
)
application.modelLoader.SetModelStore(distStore)
// Start health monitor
distSvc.Health.Start(options.Context)
// Start replica reconciler for auto-scaling model replicas
if distSvc.Reconciler != nil {
go distSvc.Reconciler.Run(options.Context)
}
// In distributed mode, MCP CI jobs are executed by agent workers (not the frontend)
// because the frontend can't create MCP sessions (e.g., stdio servers using docker).
// The dispatcher still subscribes to jobs.new for persistence (result/progress subs)
// but does NOT set a workerFn — agent workers consume jobs from the same NATS queue.
// Wire model config loader so job events include model config for agent workers
distSvc.Dispatcher.SetModelConfigLoader(application.backendLoader)
// Start job dispatcher — abort startup if it fails, as jobs would be accepted but never dispatched
if err := distSvc.Dispatcher.Start(options.Context); err != nil {
return nil, fmt.Errorf("starting job dispatcher: %w", err)
}
// Start ephemeral file cleanup
storage.StartEphemeralCleanup(options.Context, distSvc.FileMgr, 0, 0)
// Wire distributed backends into AgentJobService (before Start)
if application.agentJobService != nil {
application.agentJobService.SetDistributedBackends(distSvc.Dispatcher)
application.agentJobService.SetDistributedJobStore(distSvc.JobStore)
}
// Wire skill store into AgentPoolService (wired at pool start time via closure)
// The actual wiring happens in StartAgentPool since the pool doesn't exist yet.
// Wire NATS and gallery store into GalleryService for cross-instance progress/cancel
if application.galleryService != nil {
application.galleryService.SetNATSClient(distSvc.Nats)
if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
// Clean up stale in-progress operations from previous crashed instances
if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
xlog.Warn("Failed to clean stale gallery operations", "error", err)
}
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
}
// Wire distributed model/backend managers so delete propagates to workers
application.galleryService.SetModelManager(
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
)
application.galleryService.SetBackendManager(
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService),
)
}
}
// Start AgentJobService (after distributed wiring so it knows whether to use local or NATS)
if application.agentJobService != nil {
if err := application.agentJobService.Start(options.Context); err != nil {
return nil, fmt.Errorf("starting agent job service: %w", err)
}
}
if err := coreStartup.InstallModels(options.Context, application.GalleryService(), options.Galleries, options.BackendGalleries, options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.RequireBackendIntegrity, nil, options.ModelsURL...); err != nil {
xlog.Error("error installing models", "error", err)
}
for _, backend := range options.ExternalBackends {
if err := galleryop.InstallExternalBackend(options.Context, options.BackendGalleries, options.SystemState, application.ModelLoader(), nil, backend, "", "", options.RequireBackendIntegrity); err != nil {
xlog.Error("error installing external backend", "error", err)
}
}
configLoaderOpts := options.ToConfigLoaderOptions()
if err := application.ModelConfigLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil {
xlog.Error("error loading config files", "error", err)
}
if err := gallery.RegisterBackends(options.SystemState, application.ModelLoader()); err != nil {
xlog.Error("error registering external backends", "error", err)
}
// Start background upgrade checker for backends.
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
// instance runs periodic checks (avoids duplicate upgrades across replicas).
if len(options.BackendGalleries) > 0 {
// Pass a lazy getter for the backend manager so the checker always
// uses the active one — DistributedBackendManager is swapped in above
// and asks workers for their installed backends, which is what
// upgrade detection needs in distributed mode.
bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
application.upgradeChecker = uc
// Refresh the upgrade cache the moment a backend op finishes — otherwise
// the UI keeps showing a just-upgraded backend as upgradeable until the
// next 6-hour tick. TriggerCheck is non-blocking.
if gs := application.GalleryService(); gs != nil {
gs.OnBackendOpCompleted = uc.TriggerCheck
}
go uc.Run(options.Context)
}
// Wire gallery generation counter into VRAM caches so they invalidate
// when gallery data refreshes instead of using a fixed TTL.
vram.SetGalleryGenerationFunc(gallery.GalleryGeneration)
if options.ConfigFile != "" {
if err := application.ModelConfigLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil {
xlog.Error("error loading config file", "error", err)
}
}
if err := application.ModelConfigLoader().Preload(options.SystemState.Model.ModelsPath); err != nil {
xlog.Error("error downloading models", "error", err)
}
if options.PreloadJSONModels != "" {
if err := galleryop.ApplyGalleryFromString(options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.Galleries, options.BackendGalleries, options.PreloadJSONModels, options.RequireBackendIntegrity); err != nil {
return nil, err
}
}
if options.PreloadModelsFromPath != "" {
if err := galleryop.ApplyGalleryFromFile(options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.Galleries, options.BackendGalleries, options.PreloadModelsFromPath, options.RequireBackendIntegrity); err != nil {
return nil, err
}
}
if options.Debug {
for _, v := range application.ModelConfigLoader().GetAllModelsConfigs() {
xlog.Debug("Model", "name", v.Name, "config", v)
}
}
// Load runtime settings from file if DynamicConfigsDir is set
// This applies file settings with env var precedence (env vars take priority)
// Note: startupConfigCopy was already created above, so it has the original env var values
if options.DynamicConfigsDir != "" {
loadRuntimeSettingsFromFile(options)
}
application.ModelLoader().SetBackendLoggingEnabled(options.EnableBackendLogging)
// turn off any process that was started by GRPC if the context is canceled
go func() {
<-options.Context.Done()
xlog.Debug("Context canceled, shutting down")
application.distributed.Shutdown()
err := application.ModelLoader().StopAllGRPC()
if err != nil {
xlog.Error("error while stopping all grpc backends", "error", err)
}
}()
// Initialize watchdog with current settings (after loading from file)
initializeWatchdog(application, options)
if options.LoadToMemory != nil && !options.SingleBackend {
for _, m := range options.LoadToMemory {
cfg, err := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(m, options)
if err != nil {
return nil, err
}
xlog.Debug("Auto loading model into memory from file", "model", m, "file", cfg.Model)
o := backend.ModelOptions(*cfg, options)
var backendErr error
_, backendErr = application.ModelLoader().Load(o...)
if backendErr != nil {
return nil, backendErr
}
}
}
// Watch the configuration directory
startWatcher(options)
xlog.Info("core/startup process completed!")
return application, nil
}
func startWatcher(options *config.ApplicationConfig) {
if options.DynamicConfigsDir == "" {
// No need to start the watcher if the directory is not set
return
}
if _, err := os.Stat(options.DynamicConfigsDir); err != nil {
if os.IsNotExist(err) {
// We try to create the directory if it does not exist and was specified
if err := os.MkdirAll(options.DynamicConfigsDir, 0700); err != nil {
xlog.Error("failed creating DynamicConfigsDir", "error", err)
}
} else {
// something else happened, we log the error and don't start the watcher
xlog.Error("failed to read DynamicConfigsDir, watcher will not be started", "error", err)
return
}
}
configHandler := newConfigFileHandler(options)
if err := configHandler.Watch(); err != nil {
xlog.Error("failed creating watcher", "error", err)
}
}
// loadRuntimeSettingsFromFile loads settings from runtime_settings.json, giving
// env vars precedence over values from the file.
// Call order in New(): NewApplicationConfig() applies the AppOptions (built from
// env vars in run.go) first, then loadRuntimeSettingsFromFile() runs, then
// initializeWatchdog(). So by the time this function runs, options already holds
// any values that came from env vars.
// The defaults are not stored separately, so whether a value came from an env var
// can only be inferred by comparing it against the default.
// Runtime changes are handled by the file watcher handler, which compares
// against startupAppConfig.
func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
settingsFile := filepath.Join(options.DynamicConfigsDir, "runtime_settings.json")
fileContent, err := os.ReadFile(settingsFile)
if err != nil {
if os.IsNotExist(err) {
xlog.Debug("runtime_settings.json not found, using defaults")
return
}
xlog.Warn("failed to read runtime_settings.json", "error", err)
return
}
var settings config.RuntimeSettings
if err := json.Unmarshal(fileContent, &settings); err != nil {
xlog.Warn("failed to parse runtime_settings.json", "error", err)
return
}
// At this point, options already has values from env vars (via AppOptions in run.go).
// To avoid env var duplication, we determine if env vars were set by checking if
// current values differ from defaults. Defaults are: false for bools, 0 for durations.
// If current value is at default, it likely wasn't set from env var, so we can apply file.
// If current value is non-default, it was likely set from env var, so we preserve it.
// Note: This means env vars explicitly setting to false/0 won't be distinguishable from defaults,
// but that's an acceptable limitation to avoid env var duplication.
if settings.WatchdogIdleEnabled != nil {
// Only apply if current value is default (false), suggesting it wasn't set from env var
if !options.WatchDogIdle {
options.WatchDogIdle = *settings.WatchdogIdleEnabled
if options.WatchDogIdle {
options.WatchDog = true
}
}
}
if settings.WatchdogBusyEnabled != nil {
if !options.WatchDogBusy {
options.WatchDogBusy = *settings.WatchdogBusyEnabled
if options.WatchDogBusy {
options.WatchDog = true
}
}
}
if settings.WatchdogIdleTimeout != nil {
// Only apply if current value is default (0), suggesting it wasn't set from env var
if options.WatchDogIdleTimeout == 0 {
dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout)
if err == nil {
options.WatchDogIdleTimeout = dur
} else {
xlog.Warn("invalid watchdog idle timeout in runtime_settings.json", "error", err, "timeout", *settings.WatchdogIdleTimeout)
}
}
}
if settings.WatchdogBusyTimeout != nil {
if options.WatchDogBusyTimeout == 0 {
dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout)
if err == nil {
options.WatchDogBusyTimeout = dur
} else {
xlog.Warn("invalid watchdog busy timeout in runtime_settings.json", "error", err, "timeout", *settings.WatchdogBusyTimeout)
}
}
}
if settings.WatchdogInterval != nil {
if options.WatchDogInterval == 0 {
dur, err := time.ParseDuration(*settings.WatchdogInterval)
if err == nil {
options.WatchDogInterval = dur
} else {
xlog.Warn("invalid watchdog interval in runtime_settings.json", "error", err, "interval", *settings.WatchdogInterval)
options.WatchDogInterval = model.DefaultWatchdogInterval
}
}
}
// Handle MaxActiveBackends (new) and SingleBackend (deprecated)
if settings.MaxActiveBackends != nil {
// Only apply if current value is default (0), suggesting it wasn't set from env var
if options.MaxActiveBackends == 0 {
options.MaxActiveBackends = *settings.MaxActiveBackends
// For backward compatibility, also set SingleBackend if MaxActiveBackends == 1
options.SingleBackend = (*settings.MaxActiveBackends == 1)
}
} else if settings.SingleBackend != nil {
// Legacy: SingleBackend maps to MaxActiveBackends = 1
if !options.SingleBackend {
options.SingleBackend = *settings.SingleBackend
if *settings.SingleBackend {
options.MaxActiveBackends = 1
}
}
}
if settings.MemoryReclaimerEnabled != nil {
// Only apply if current value is default (false), suggesting it wasn't set from env var
if !options.MemoryReclaimerEnabled {
options.MemoryReclaimerEnabled = *settings.MemoryReclaimerEnabled
if options.MemoryReclaimerEnabled {
options.WatchDog = true // Memory reclaimer requires watchdog
}
}
}
if settings.MemoryReclaimerThreshold != nil {
// Only apply if current value is default (0), suggesting it wasn't set from env var
if options.MemoryReclaimerThreshold == 0 {
options.MemoryReclaimerThreshold = *settings.MemoryReclaimerThreshold
}
}
if settings.ForceEvictionWhenBusy != nil {
// Only apply if current value is default (false), suggesting it wasn't set from env var
if !options.ForceEvictionWhenBusy {
options.ForceEvictionWhenBusy = *settings.ForceEvictionWhenBusy
}
}
if settings.LRUEvictionMaxRetries != nil {
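// Only apply if the current value is 0 (unset), suggesting it wasn't set from an env var.
// NOTE: the guard tests 0, not the documented default of 30; if that default is
// seeded before this point, the file value is never applied.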
if options.LRUEvictionMaxRetries == 0 {
options.LRUEvictionMaxRetries = *settings.LRUEvictionMaxRetries
}
}
if settings.LRUEvictionRetryInterval != nil {
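// Only apply if the current value is 0 (unset), suggesting it wasn't set from an env var.
// NOTE: the guard tests 0, not the documented default of 1s; if that default is
// seeded before this point, the file value is never applied.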
if options.LRUEvictionRetryInterval == 0 {
dur, err := time.ParseDuration(*settings.LRUEvictionRetryInterval)
if err == nil {
options.LRUEvictionRetryInterval = dur
} else {
xlog.Warn("invalid LRU eviction retry interval in runtime_settings.json", "error", err, "interval", *settings.LRUEvictionRetryInterval)
}
}
}
if settings.AgentJobRetentionDays != nil {
// Only apply if current value is default (0), suggesting it wasn't set from env var
if options.AgentJobRetentionDays == 0 {
options.AgentJobRetentionDays = *settings.AgentJobRetentionDays
}
}
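// The file's generic WatchdogEnabled flag is consulted only when neither the
// idle nor the busy check is enabled. It can turn the watchdog on, never off.
if !options.WatchDogIdle && !options.WatchDogBusy {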
if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled {
options.WatchDog = true
}
}
// P2P settings
if settings.P2PToken != nil {
if options.P2PToken == "" {
options.P2PToken = *settings.P2PToken
}
}
if settings.P2PNetworkID != nil {
if options.P2PNetworkID == "" {
options.P2PNetworkID = *settings.P2PNetworkID
}
}
if settings.Federated != nil {
if !options.Federated {
options.Federated = *settings.Federated
}
}
if settings.EnableBackendLogging != nil {
if !options.EnableBackendLogging {
options.EnableBackendLogging = *settings.EnableBackendLogging
}
}
// Tracing settings
if settings.EnableTracing != nil {
if !options.EnableTracing {
options.EnableTracing = *settings.EnableTracing
}
}
if settings.TracingMaxItems != nil {
if options.TracingMaxItems == 0 {
options.TracingMaxItems = *settings.TracingMaxItems
}
}
if settings.TracingMaxBodyBytes != nil {
// Allow the on-disk setting to override the CLI/env default. The
// startup default is non-zero (see NewApplicationConfig), so a plain
// `== 0` guard like the others would never trigger; we instead respect
// any value the file specifies. 0 in the file means "uncapped".
options.TracingMaxBodyBytes = *settings.TracingMaxBodyBytes
}
// Branding / whitelabeling. There are no env vars for these — the file is
// the only source — so apply unconditionally. Without this block a server
// restart silently drops the configured instance name, tagline, and asset
// filenames.
if settings.InstanceName != nil {
options.Branding.InstanceName = *settings.InstanceName
}
if settings.InstanceTagline != nil {
options.Branding.InstanceTagline = *settings.InstanceTagline
}
if settings.LogoFile != nil {
options.Branding.LogoFile = *settings.LogoFile
}
if settings.LogoHorizontalFile != nil {
options.Branding.LogoHorizontalFile = *settings.LogoHorizontalFile
}
if settings.FaviconFile != nil {
options.Branding.FaviconFile = *settings.FaviconFile
}
// Backend upgrade flags
if settings.AutoUpgradeBackends != nil {
if !options.AutoUpgradeBackends {
options.AutoUpgradeBackends = *settings.AutoUpgradeBackends
}
}
if settings.PreferDevelopmentBackends != nil {
if !options.PreferDevelopmentBackends {
options.PreferDevelopmentBackends = *settings.PreferDevelopmentBackends
}
}
// LocalAI Assistant — file-stored as the negation (LocalAIAssistantEnabled).
// Default is enabled (DisableLocalAIAssistant=false). Apply the file value
// unless env explicitly disabled the assistant (DisableLocalAIAssistant=true).
if settings.LocalAIAssistantEnabled != nil {
if !options.DisableLocalAIAssistant {
options.DisableLocalAIAssistant = !*settings.LocalAIAssistantEnabled
}
}
// Open Responses TTL. Default is 0 (no expiration). Treat the on-disk
// "0"/empty as "no expiration" — a no-op since options is already 0 —
// and parse anything else as a duration.
if settings.OpenResponsesStoreTTL != nil && options.OpenResponsesStoreTTL == 0 {
v := *settings.OpenResponsesStoreTTL
if v != "0" && v != "" {
if dur, err := time.ParseDuration(v); err == nil {
options.OpenResponsesStoreTTL = dur
} else {
xlog.Warn("invalid open_responses_store_ttl in runtime_settings.json", "error", err, "ttl", v)
}
}
}
// Agent Pool. NewApplicationConfig seeds non-zero defaults for some of
// these fields (Enabled=true, EmbeddingModel="granite-embedding-107m-
// multilingual", MaxChunkingSize=400). The "if at default, apply file"
// gate uses each field's actual default literal so file values can
// override the bootstrap default while still letting an env-set value
// (e.g. WithAgentPoolEmbeddingModel from a flag) win.
if settings.AgentPoolEnabled != nil && options.AgentPool.Enabled {
options.AgentPool.Enabled = *settings.AgentPoolEnabled
}
if settings.AgentPoolDefaultModel != nil && options.AgentPool.DefaultModel == "" {
options.AgentPool.DefaultModel = *settings.AgentPoolDefaultModel
}
if settings.AgentPoolEmbeddingModel != nil {
if options.AgentPool.EmbeddingModel == "" || options.AgentPool.EmbeddingModel == "granite-embedding-107m-multilingual" {
options.AgentPool.EmbeddingModel = *settings.AgentPoolEmbeddingModel
}
}
if settings.AgentPoolMaxChunkingSize != nil {
if options.AgentPool.MaxChunkingSize == 0 || options.AgentPool.MaxChunkingSize == 400 {
options.AgentPool.MaxChunkingSize = *settings.AgentPoolMaxChunkingSize
}
}
if settings.AgentPoolChunkOverlap != nil && options.AgentPool.ChunkOverlap == 0 {
options.AgentPool.ChunkOverlap = *settings.AgentPoolChunkOverlap
}
if settings.AgentPoolEnableLogs != nil && !options.AgentPool.EnableLogs {
options.AgentPool.EnableLogs = *settings.AgentPoolEnableLogs
}
if settings.AgentPoolCollectionDBPath != nil && options.AgentPool.CollectionDBPath == "" {
options.AgentPool.CollectionDBPath = *settings.AgentPoolCollectionDBPath
}
if settings.AgentPoolVectorEngine != nil {
// Default is "chromem"; treat both that and empty as "not env-set".
if options.AgentPool.VectorEngine == "" || options.AgentPool.VectorEngine == "chromem" {
options.AgentPool.VectorEngine = *settings.AgentPoolVectorEngine
}
}
if settings.AgentPoolDatabaseURL != nil && options.AgentPool.DatabaseURL == "" {
options.AgentPool.DatabaseURL = *settings.AgentPoolDatabaseURL
}
if settings.AgentPoolAgentHubURL != nil {
// Default is "https://agenthub.localai.io"; treat both that and empty
// as "not env-set".
if options.AgentPool.AgentHubURL == "" || options.AgentPool.AgentHubURL == "https://agenthub.localai.io" {
options.AgentPool.AgentHubURL = *settings.AgentPoolAgentHubURL
}
}
xlog.Debug("Runtime settings loaded from runtime_settings.json")
}
// initializeWatchdog initializes the watchdog with current ApplicationConfig settings
func initializeWatchdog(application *Application, options *config.ApplicationConfig) {
// Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend)
lruLimit := options.GetEffectiveMaxActiveBackends()
// Create watchdog if enabled OR if LRU limit is set OR if memory reclaimer is enabled
if options.WatchDog || lruLimit > 0 || options.MemoryReclaimerEnabled {
wd := model.NewWatchDog(
model.WithProcessManager(application.ModelLoader()),
model.WithBusyTimeout(options.WatchDogBusyTimeout),
model.WithIdleTimeout(options.WatchDogIdleTimeout),
model.WithWatchdogInterval(options.WatchDogInterval),
model.WithBusyCheck(options.WatchDogBusy),
model.WithIdleCheck(options.WatchDogIdle),
model.WithLRULimit(lruLimit),
model.WithMemoryReclaimer(options.MemoryReclaimerEnabled, options.MemoryReclaimerThreshold),
model.WithForceEvictionWhenBusy(options.ForceEvictionWhenBusy),
)
application.ModelLoader().SetWatchDog(wd)
// Initialize ModelLoader LRU eviction retry settings
application.ModelLoader().SetLRUEvictionRetrySettings(
options.LRUEvictionMaxRetries,
options.LRUEvictionRetryInterval,
)
// Sync per-model state from configs to the watchdog. Without this,
// `pinned: true` and `concurrency_groups:` are only honored after a
// settings-driven RestartWatchdog and never at boot.
application.SyncPinnedModelsToWatchdog()
application.SyncModelGroupsToWatchdog()
// Start watchdog goroutine if any periodic checks are enabled
// LRU eviction doesn't need the Run() loop - it's triggered on model load
// But memory reclaimer needs the Run() loop for periodic checking
if options.WatchDogBusy || options.WatchDogIdle || options.MemoryReclaimerEnabled {
go wd.Run()
}
go func() {
<-options.Context.Done()
xlog.Debug("Context canceled, shutting down")
wd.Shutdown()
}()
}
}
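// loadOrGenerateHMACSecret loads an HMAC secret from the given file path.
// If the file cannot be read or holds fewer than 32 bytes, it generates
// 32 random bytes, hex-encodes them (64 characters), and writes the result
// to path with mode 0600, replacing any existing content.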
func loadOrGenerateHMACSecret(path string) (string, error) {
data, err := os.ReadFile(path)
if err == nil {
secret := string(data)
if len(secret) >= 32 {
return secret, nil
}
}
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", fmt.Errorf("failed to generate HMAC secret: %w", err)
}
secret := hex.EncodeToString(b)
if err := os.WriteFile(path, []byte(secret), 0600); err != nil {
return "", fmt.Errorf("failed to persist HMAC secret: %w", err)
}
xlog.Info("Generated new HMAC secret for API key hashing", "path", path)
return secret, nil
}
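// migrateDataFiles moves persistent data files and directories from the old
// config directory to the new data directory. It only moves items that exist
// in srcDir but not in dstDir. Items that cannot be renamed are left in place
// and a warning is logged.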
func migrateDataFiles(srcDir, dstDir string) {
// Files and directories to migrate
items := []string{
"agent_tasks.json",
"agent_jobs.json",
"collections",
"assets",
}
migrated := false
for _, item := range items {
srcPath := filepath.Join(srcDir, item)
dstPath := filepath.Join(dstDir, item)
// Only migrate if source exists and destination does not
if _, err := os.Stat(srcPath); os.IsNotExist(err) {
continue
}
if _, err := os.Stat(dstPath); err == nil {
continue // destination already exists, skip
}
if err := os.Rename(srcPath, dstPath); err != nil {
// os.Rename can fail, for example across filesystems. There is no copy
// fallback: the item stays in place and the user is asked to move it manually.
xlog.Warn("Failed to migrate data file", "src", srcPath, "dst", dstPath, "error", err)
xlog.Warn("Data file remains in old location, please move manually", "src", srcPath, "dst", dstPath)
continue
}
migrated = true
xlog.Info("Migrated data file to new data path", "src", srcPath, "dst", dstPath)
}
if migrated {
xlog.Info("Data migration complete", "from", srcDir, "to", dstDir)
}
}