mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-31 12:07:45 -04:00
* feat(distributed): add configurable NATS backend install/upgrade timeouts Adds BackendInstallTimeout and BackendUpgradeTimeout to DistributedConfig with 15m defaults, following the existing MCPToolTimeout / WorkerWaitTimeout pattern. These will replace the hardcoded literals in RemoteUnloaderAdapter so admin-driven backend installs across the cluster survive long OCI image pulls that previously timed out at 3m. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * style(distributed): gofmt alignment after timeout fields Re-aligns the Validate() negative-duration map and the Default* const block so the new BackendInstall/UpgradeTimeout entries do not leave the surrounding columns mis-padded. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(cli): surface LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT and _UPGRADE_TIMEOUT Parses the two new env vars on the run CLI and threads them through the existing AppOption builder so DistributedConfig picks them up. Invalid duration strings now fail loudly at startup rather than silently falling back to the default. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): inject NATS install/upgrade timeouts into RemoteUnloaderAdapter Removes the hardcoded 3m / 15m literals from RemoteUnloaderAdapter and threads in DistributedConfig.BackendInstallTimeoutOrDefault() and BackendUpgradeTimeoutOrDefault() at construction. Install now defaults to 15m (was 3m); cold OCI image pulls on Jetson Wi-Fi routinely blew past the old ceiling. Scripted messaging client captures the timeout so tests can assert the configured value actually reaches the NATS request. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): introduce galleryop.ErrWorkerStillInstalling sentinel When the NATS request-reply for backend.install (or .upgrade) times out the worker is almost always still pulling the OCI image. 
Wrap the timeout in a typed sentinel so the manager above can distinguish "worker hung" from "worker still working" and leave the pending_backend_ops row in place for the reconciler to confirm via backend.list. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): treat NATS install timeout as in-progress, not failure When a worker times out replying to backend.install but the install is still running on the worker, enqueueAndDrainBackendOp now reports a running_on_worker status and pushes NextRetryAt out by the install timeout so the reconciler does not immediately re-fire another install while the worker is still pulling the image. The pending_backend_ops row stays in place for the next reconciler pass to confirm via backend.list. InstallBackend wraps the result in galleryop.ErrWorkerStillInstalling so callers can branch (galleryop renders yellow in-progress instead of red error). UpgradeBackend uses the same wrap. Adds RemoteUnloaderAdapter.InstallTimeout() so the manager can push NextRetryAt by the configured timeout without reaching into a private field, and NodeRegistry.RecordPendingBackendOpInFlight as the soft cousin of RecordPendingBackendOpFailure. Also includes incidental gofmt-driven struct-field alignment in registry.go on lines unrelated to the change (touched files are re-formatted to canonical form per project policy). Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(distributed): don't increment Attempts on in-flight install timeout An in-flight timeout (worker still pulling the OCI image) is not a failed attempt, it's a delayed one. Incrementing Attempts let genuinely-progressing slow installs (e.g. 30 GB CUDA images on Wi-Fi) trip the reconciler's maxPendingBackendOpAttempts cap and dead-letter the queue row while the worker was still legitimately working. RecordPendingBackendOpInFlight now only updates LastError and NextRetryAt. 
Also documents "running_on_worker" in the NodeOpStatus.Status enum comment so Task 6 implementers see the full surface. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(galleryop): surface ErrWorkerStillInstalling as non-error OpStatus When the distributed backend manager returns an error that wraps ErrWorkerStillInstalling, backendHandler now completes the op with a "still installing in background" message rather than marking it as a red failure. Admin UI sees a yellow in-progress state; reconciler confirms completion on its next pass. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * test(distributed): end-to-end install-timeout-then-reconcile Wires Task 1-6 end-to-end so any seam mismatch surfaces in CI rather than during a real cluster install. NATS times out, the queue row stays alive with running_on_worker status, the worker eventually reports the backend installed via backend.list, the manager surfaces it via ListBackends. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * docs(distributed): document LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT / _UPGRADE_TIMEOUT Add the two new operator-tunable env vars to the Frontend Configuration table in the distributed-mode docs. Explains the 15m default, when to raise it (slow links pulling multi-GB OCI images), and the new "still installing in background" admin-UI state when the round-trip times out but the worker is still working. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): clear pending install rows when backend.list confirms DistributedBackendManager.ListBackends now proactively clears pending_backend_ops install rows whose (nodeID, backend) is reported installed by backend.list. Operator UI updates immediately instead of waiting up to installTimeout (default 15m) for the next reconciler tick after NextRetryAt. 
Only install rows are cleared; upgrade and delete intents are not satisfied by presence in backend.list and continue to drain through their normal reconciler paths. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(messaging): add BackendInstallProgressEvent wire type and subject New NATS subject nodes.<nodeID>.backend.install.<opID>.progress lets the worker publish transient progress events (file, current/total bytes, percentage, phase) while a long-running install pulls its OCI image. BackendInstallRequest gains an optional OpID field so the worker knows which subject to publish on. Transient pub/sub (not JetStream): the install reply remains ground truth for success/failure; dropped progress events are tolerable. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * style(messaging): drop em-dash from BackendInstallProgress test comment Per project convention (no em-dashes anywhere). Comment substance is unchanged. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): worker publishes debounced install progress over NATS When BackendInstallRequest.OpID is set, the worker's backend.install handler wires a debounced publisher (250ms window) into the gallery download callback. Each tick becomes a BackendInstallProgressEvent on nodes.<nodeID>.backend.install.<opID>.progress; the publisher always emits a final event on Flush so the UI sees the terminal percentage. Old masters that do not set OpID continue to run silent installs: no behavior change for them. Lock ordering: the publisher releases its mutex before calling messaging.Publish so a slow network never stalls the install loop. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): RemoteUnloaderAdapter subscribes to install progress InstallBackend gains opID + onProgress parameters. 
When both are set, the adapter subscribes to nodes.<nodeID>.backend.install.<opID>.progress BEFORE publishing the install request, decodes each message into the caller's onProgress callback in a goroutine (so a slow callback never stalls the NATS reader thread), and unsubscribes after RequestJSON returns. When onProgress is nil OR opID is empty (the reconciler retry path), subscription is skipped entirely - silent installs cost nothing extra. Subscribe failure is logged at Warn and the install proceeds without progress streaming; the NATS round-trip still owns terminal status. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): forward backend install progress into galleryop OpStatus DistributedBackendManager.InstallBackend now passes the gallery op ID and a progress bridge into the adapter call. Each BackendInstallProgressEvent from the worker becomes a galleryop.ProgressCallback tick - which the existing backendHandler already turns into OpStatus.UpdateStatus, so the admin UI/SSE polling sees per-byte progress for distributed installs without any UI-side change. UpgradeBackend is intentionally left silent for now: its wire request (BackendUpgradeRequest) does not carry OpID, and rolling-update fallback is the rarer path. Will be picked up in a follow-up if the worker upgrade path also gets a progress channel. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * test(distributed): InstallBackend tolerates silent (pre-Phase-2) workers A worker on pre-Phase-2 code never publishes progress events. The new master subscribes optimistically; this spec pins that a silent worker still produces a green install with no progressCb ticks. The install reply is the source of truth for terminal state; the progress stream is a best-effort UX enrichment. 
Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * docs(distributed): document install progress streaming Note the new nodes.<nodeID>.backend.install.<opID>.progress subject and the silent-worker compatibility behavior so operators know to expect real-time progress and what happens on a mixed-version cluster. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * docs(distributed): note progress-event ordering trade-off in InstallBackend Document near the goroutine dispatch why ordering at the consumer is best-effort, why it rarely matters in practice (worker debounce >> goroutine jitter), and what a future hardening pass would look like (Seq field + stale-by-seq drop). Stops the next reader from accidentally "fixing" the goroutine pool away. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(galleryop): add NodeProgress + OpStatus.Nodes for per-node breakdown Adds the data model the UI needs to render an expandable per-node breakdown of a fanned-out backend install. NodeProgress carries node identity (ID + name), per-node status (queued / running_on_worker / success / error / downloading), the current file + bytes + percentage from the Phase 2 progress stream, and any per-node error. OpStatus.Nodes is the slice the /api/operations handler will surface in a follow-up. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(galleryop): UpdateNodeProgress merges per-node ticks by NodeID GalleryService.UpdateNodeProgress(opID, nodeID, np) merges a NodeProgress into OpStatus.Nodes (keyed by NodeID, no duplicates) and mirrors the latest tick into the aggregate Progress / FileName / DownloadedFileSize / TotalFileSize fields so the legacy single-bar OperationsBar view keeps working unchanged alongside the new per-node breakdown. Concurrent-safe via the existing g.Mutex. 
Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(distributed): write per-node OpStatus entries during install fan-out DistributedBackendManager now accepts a nodeProgressSink and feeds it two streams: 1. enqueueAndDrainBackendOp emits a per-node terminal entry on each status it appends to BackendOpResult (queued, success, error, running_on_worker). The opID is threaded through the function so the sink gets the right gallery op identity. 2. The install apply closure fans each BackendInstallProgressEvent into the sink as a downloading entry, alongside the legacy progressCb path so the aggregate single-bar view stays correct. Production wiring passes the GalleryService (which implements UpdateNodeProgress via Task 2) as the sink. Single-node tests pass nil. DeleteBackend and UpgradeBackend pass an empty opID so the sink path no-ops for ops that aren't gallery-tracked the same way as Install. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(operations): expose per-node breakdown on /api/operations When an operation's OpStatus has Nodes entries (populated by the Phase 4 progress sink wiring), surface them as a "nodes" array on the /api/operations response, sorted by node_name for stable rendering. Backward compatible: legacy clients ignore the field; ops without any node entries (single-node mode, model installs) omit the array entirely thanks to the empty-slice guard. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * feat(ui): per-node breakdown in OperationsBar When an install op fans out to more than one worker, the operations bar now shows a "N nodes" chevron that expands into a per-node list. Each row carries the node's status (color-coded pill), the current file being downloaded, byte counts, percentage, and a thin per-node progress bar. Yellow "Worker busy" pill marks running_on_worker status with a tooltip explaining the NATS round-trip timed out but the worker is still installing in the background. 
Backward compatible: ops without a nodes field (legacy or single-node mode) render as before. State for expand/collapse is local to the component, keyed by jobID/id - reload starts collapsed. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * docs(distributed): document per-node breakdown in the operations bar Adds a short subsection covering the expandable "N nodes" chevron in the OperationsBar admin UI, the meaning of each status pill, and how it relates to the /api/operations nodes array. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(galleryop): UpdateStatus preserves Nodes when caller sends none Real-world bug surfaced by the Phase 4 multi-worker smoke test: the nodes[] array in /api/operations flickered between a single node at a time on a 2-worker install. Root cause: the Phase 2 progress bridge also calls the legacy progressCb -> UpdateStatus(&OpStatus{...}) on every tick. UpdateStatus then overwrote the entire status pointer, wiping the Nodes slice that UpdateNodeProgress had just merged in. Fix: in UpdateStatus, if the incoming op has an empty Nodes slice, carry forward the previous status's Nodes before storing. Callers that explicitly populate Nodes still win (their slice replaces the prior one, no merge across the two code paths). Two regression specs added pinning both directions of the contract. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * docs(distributed): strip implementation details from user-facing docs Trim the new install/upgrade timeout rows and the install-progress sections to focus on what the operator sees and tunes. Drops: - the NATS subject names and pub/sub mechanics - "round-trip" / reconciler / backend.list jargon - /api/operations polling cadence - "pre-2026-05-22" version references Reframes the breakdown text around the admin UI (Operations Bar, chevron, status pills, "Worker busy" tooltip). Implementation context lives in the agent notes and code comments. 
Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactor(config): move DistributedConfig.Validate flag names to constants The negative-duration check map was a wall of literal kebab-case strings that had to stay in sync with the kong-derived CLI flag names manually. Move them to a Flag* const block alongside the existing Default* block so a rename of either the Go field or the CLI naming convention forces a compile error rather than silent drift. Sole consumer today is Validate; the constants are exported so future operator-facing surfaces (e.g. error messages on other validation paths) can reference them by name instead of repeating the literals. Tests pin both the literal values (so a future "let's just rename this" doesn't accidentally regress the CLI flag) and the negative- duration error message for the new BackendInstall / BackendUpgrade fields. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactor(distributed): extract NodeStatus and Phase enums to constants Sweep for the same literal-string-as-identifier pattern called out on the Validate flag names: the per-node install status enum ("queued" | "downloading" | "running_on_worker" | "success" | "error") appeared as raw literals across managers_distributed.go (10+ sites, including 3 separate `n.Status == "running_on_worker"` checks), operation.go, and the test suite. Same shape for the Phase enum ("resolving" | "downloading" | "extracting" | "starting") in the worker-side progress publisher. 
Promote both to exported const blocks: - galleryop.NodeStatus{Queued,Downloading,RunningOnWorker,Success,Error} shared between galleryop.NodeProgress.Status (the wire field) and nodes.NodeOpStatus.Status (the in-process per-node summary) - messaging.Phase{Resolving,Downloading,Extracting,Starting} shared between the worker publisher and any future consumer that needs to switch on phase Tests pin both the literal values (so a future "let's just rename" doesn't silently change the JSON wire) and use the constants in setup (so the producer side stays drift-protected). Wire-format assertions on the /api/operations JSON output keep their literals deliberately, so the constant value can never silently diverge from what the UI receives. Out of scope for this PR (separate cleanup): the finetune and quantization job-status enums have the same anti-pattern with 14+ literal sites each, but predate this PR's work. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
780 lines
30 KiB
Go
package application

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/mudler/LocalAI/core/backend"
	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/http/auth"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/core/services/jobs"
	"github.com/mudler/LocalAI/core/services/nodes"
	"github.com/mudler/LocalAI/core/services/storage"
	coreStartup "github.com/mudler/LocalAI/core/startup"
	"github.com/mudler/LocalAI/internal"
	"github.com/mudler/LocalAI/pkg/vram"

	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/LocalAI/pkg/sanitize"
	"github.com/mudler/LocalAI/pkg/xsysinfo"
	"github.com/mudler/xlog"
)

func New(opts ...config.AppOption) (*Application, error) {
	options := config.NewApplicationConfig(opts...)

	// Store a copy of the startup config (from env vars, before file loading)
	// This is used to determine if settings came from env vars vs file
	startupConfigCopy := *options
	application := newApplication(options)
	application.startupConfig = &startupConfigCopy

	xlog.Info("Starting LocalAI", "threads", options.Threads, "modelsPath", options.SystemState.Model.ModelsPath)
	xlog.Info("LocalAI version", "version", internal.PrintableVersion())

	if err := application.start(); err != nil {
		return nil, err
	}

	caps, err := xsysinfo.CPUCapabilities()
	if err == nil {
		xlog.Debug("CPU capabilities", "capabilities", caps)
	}
	gpus, err := xsysinfo.GPUs()
	if err == nil {
		xlog.Debug("GPU count", "count", len(gpus))
		for _, gpu := range gpus {
			xlog.Debug("GPU", "gpu", gpu.String())
		}
	}

	// Make sure directories exist
	if options.SystemState.Model.ModelsPath == "" {
		return nil, fmt.Errorf("models path cannot be empty")
	}

	err = os.MkdirAll(options.SystemState.Model.ModelsPath, 0750)
	if err != nil {
		return nil, fmt.Errorf("unable to create ModelPath: %q", err)
	}
	if options.GeneratedContentDir != "" {
		err := os.MkdirAll(options.GeneratedContentDir, 0750)
		if err != nil {
			return nil, fmt.Errorf("unable to create ImageDir: %q", err)
		}
	}
	if options.UploadDir != "" {
		err := os.MkdirAll(options.UploadDir, 0750)
		if err != nil {
			return nil, fmt.Errorf("unable to create UploadDir: %q", err)
		}
	}

	// Create and migrate data directory
	if options.DataPath != "" {
		if err := os.MkdirAll(options.DataPath, 0750); err != nil {
			return nil, fmt.Errorf("unable to create DataPath: %q", err)
		}
		// Migrate data from DynamicConfigsDir to DataPath if needed
		if options.DynamicConfigsDir != "" && options.DataPath != options.DynamicConfigsDir {
			migrateDataFiles(options.DynamicConfigsDir, options.DataPath)
		}
	}

	// Initialize auth database if auth is enabled
	if options.Auth.Enabled {
		// Auto-generate HMAC secret if not provided
		if options.Auth.APIKeyHMACSecret == "" {
			secretFile := filepath.Join(options.DataPath, ".hmac_secret")
			secret, err := loadOrGenerateHMACSecret(secretFile)
			if err != nil {
				return nil, fmt.Errorf("failed to initialize HMAC secret: %w", err)
			}
			options.Auth.APIKeyHMACSecret = secret
		}

		authDB, err := auth.InitDB(options.Auth.DatabaseURL)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize auth database: %w", err)
		}
		application.authDB = authDB
		xlog.Info("Auth enabled", "database", sanitize.URL(options.Auth.DatabaseURL))

		// Start session and expired API key cleanup goroutine
		go func() {
			ticker := time.NewTicker(1 * time.Hour)
			defer ticker.Stop()
			for {
				select {
				case <-options.Context.Done():
					return
				case <-ticker.C:
					if err := auth.CleanExpiredSessions(authDB); err != nil {
						xlog.Error("failed to clean expired sessions", "error", err)
					}
					if err := auth.CleanExpiredAPIKeys(authDB); err != nil {
						xlog.Error("failed to clean expired API keys", "error", err)
					}
				}
			}
		}()
	}

	// Wire JobStore for DB-backed task/job persistence whenever auth DB is available.
	// This ensures tasks and jobs survive restarts in both single-node and distributed modes.
	if application.authDB != nil && application.agentJobService != nil {
		dbJobStore, err := jobs.NewJobStore(application.authDB)
		if err != nil {
			xlog.Error("Failed to create job store for auth DB", "error", err)
		} else {
			application.agentJobService.SetDistributedJobStore(dbJobStore)
		}
	}

	// Initialize distributed mode services (NATS, object storage, node registry)
	distSvc, err := initDistributed(options, application.authDB, application.ModelConfigLoader())
	if err != nil {
		return nil, fmt.Errorf("distributed mode initialization failed: %w", err)
	}
	if distSvc != nil {
		application.distributed = distSvc
		// Wire remote model unloader so ShutdownModel works for remote nodes
		// Uses NATS to tell serve-backend nodes to Free + kill their backend process
		application.modelLoader.SetRemoteUnloader(distSvc.Unloader)
		// Wire ModelRouter so grpcModel() delegates to SmartRouter in distributed mode
		application.modelLoader.SetModelRouter(distSvc.ModelAdapter.AsModelRouter())
		// Wire DistributedModelStore so shutdown/list/watchdog can find remote models
		distStore := nodes.NewDistributedModelStore(
			model.NewInMemoryModelStore(),
			distSvc.Registry,
		)
		application.modelLoader.SetModelStore(distStore)
		// Start health monitor
		distSvc.Health.Start(options.Context)
		// Start replica reconciler for auto-scaling model replicas
		if distSvc.Reconciler != nil {
			go distSvc.Reconciler.Run(options.Context)
		}
		// In distributed mode, MCP CI jobs are executed by agent workers (not the frontend)
		// because the frontend can't create MCP sessions (e.g., stdio servers using docker).
		// The dispatcher still subscribes to jobs.new for persistence (result/progress subs)
		// but does NOT set a workerFn; agent workers consume jobs from the same NATS queue.

		// Wire model config loader so job events include model config for agent workers
		distSvc.Dispatcher.SetModelConfigLoader(application.backendLoader)

		// Start job dispatcher; abort startup if it fails, as jobs would be accepted but never dispatched
		if err := distSvc.Dispatcher.Start(options.Context); err != nil {
			return nil, fmt.Errorf("starting job dispatcher: %w", err)
		}
		// Start ephemeral file cleanup
		storage.StartEphemeralCleanup(options.Context, distSvc.FileMgr, 0, 0)
		// Wire distributed backends into AgentJobService (before Start)
		if application.agentJobService != nil {
			application.agentJobService.SetDistributedBackends(distSvc.Dispatcher)
			application.agentJobService.SetDistributedJobStore(distSvc.JobStore)
		}
		// Wire skill store into AgentPoolService (wired at pool start time via closure)
		// The actual wiring happens in StartAgentPool since the pool doesn't exist yet.

		// Wire NATS and gallery store into GalleryService for cross-instance progress/cancel
		if application.galleryService != nil {
			application.galleryService.SetNATSClient(distSvc.Nats)
			if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
				// Clean up stale in-progress operations from previous crashed instances
				if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
					xlog.Warn("Failed to clean stale gallery operations", "error", err)
				}
				application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
			}
			// Wire distributed model/backend managers so delete propagates to workers
			application.galleryService.SetModelManager(
				nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
			)
			application.galleryService.SetBackendManager(
				nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService),
			)
		}
	}

	// Start AgentJobService (after distributed wiring so it knows whether to use local or NATS)
	if application.agentJobService != nil {
		if err := application.agentJobService.Start(options.Context); err != nil {
			return nil, fmt.Errorf("starting agent job service: %w", err)
		}
	}

	if err := coreStartup.InstallModels(options.Context, application.GalleryService(), options.Galleries, options.BackendGalleries, options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.RequireBackendIntegrity, nil, options.ModelsURL...); err != nil {
		xlog.Error("error installing models", "error", err)
	}

	for _, backend := range options.ExternalBackends {
		if err := galleryop.InstallExternalBackend(options.Context, options.BackendGalleries, options.SystemState, application.ModelLoader(), nil, backend, "", "", options.RequireBackendIntegrity); err != nil {
			xlog.Error("error installing external backend", "error", err)
		}
	}

	configLoaderOpts := options.ToConfigLoaderOptions()

	if err := application.ModelConfigLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil {
		xlog.Error("error loading config files", "error", err)
	}

	if err := gallery.RegisterBackends(options.SystemState, application.ModelLoader()); err != nil {
		xlog.Error("error registering external backends", "error", err)
	}

	// Start background upgrade checker for backends.
	// In distributed mode, uses PostgreSQL advisory lock so only one frontend
	// instance runs periodic checks (avoids duplicate upgrades across replicas).
	if len(options.BackendGalleries) > 0 {
		// Pass a lazy getter for the backend manager so the checker always
		// uses the active one: DistributedBackendManager is swapped in above
		// and asks workers for their installed backends, which is what
		// upgrade detection needs in distributed mode.
		bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
		uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
		application.upgradeChecker = uc
		// Refresh the upgrade cache the moment a backend op finishes; otherwise
		// the UI keeps showing a just-upgraded backend as upgradeable until the
		// next 6-hour tick. TriggerCheck is non-blocking.
		if gs := application.GalleryService(); gs != nil {
			gs.OnBackendOpCompleted = uc.TriggerCheck
		}
		go uc.Run(options.Context)
	}

	// Wire gallery generation counter into VRAM caches so they invalidate
	// when gallery data refreshes instead of using a fixed TTL.
	vram.SetGalleryGenerationFunc(gallery.GalleryGeneration)

	if options.ConfigFile != "" {
		if err := application.ModelConfigLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil {
			xlog.Error("error loading config file", "error", err)
		}
	}

	if err := application.ModelConfigLoader().Preload(options.SystemState.Model.ModelsPath); err != nil {
		xlog.Error("error downloading models", "error", err)
	}

	if options.PreloadJSONModels != "" {
		if err := galleryop.ApplyGalleryFromString(options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.Galleries, options.BackendGalleries, options.PreloadJSONModels, options.RequireBackendIntegrity); err != nil {
			return nil, err
		}
	}

	if options.PreloadModelsFromPath != "" {
		if err := galleryop.ApplyGalleryFromFile(options.SystemState, application.ModelLoader(), options.EnforcePredownloadScans, options.AutoloadBackendGalleries, options.Galleries, options.BackendGalleries, options.PreloadModelsFromPath, options.RequireBackendIntegrity); err != nil {
			return nil, err
		}
	}

	if options.Debug {
		for _, v := range application.ModelConfigLoader().GetAllModelsConfigs() {
			xlog.Debug("Model", "name", v.Name, "config", v)
		}
	}

	// Load runtime settings from file if DynamicConfigsDir is set
	// This applies file settings with env var precedence (env vars take priority)
	// Note: startupConfigCopy was already created above, so it has the original env var values
	if options.DynamicConfigsDir != "" {
		loadRuntimeSettingsFromFile(options)
	}

	application.ModelLoader().SetBackendLoggingEnabled(options.EnableBackendLogging)

	// turn off any process that was started by GRPC if the context is canceled
	go func() {
		<-options.Context.Done()
		xlog.Debug("Context canceled, shutting down")
		application.distributed.Shutdown()
		err := application.ModelLoader().StopAllGRPC()
		if err != nil {
			xlog.Error("error while stopping all grpc backends", "error", err)
		}
	}()

	// Initialize watchdog with current settings (after loading from file)
	initializeWatchdog(application, options)

	if options.LoadToMemory != nil && !options.SingleBackend {
		for _, m := range options.LoadToMemory {
			cfg, err := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(m, options)
			if err != nil {
				return nil, err
			}

			xlog.Debug("Auto loading model into memory from file", "model", m, "file", cfg.Model)

			o := backend.ModelOptions(*cfg, options)

			var backendErr error
			_, backendErr = application.ModelLoader().Load(o...)
			if backendErr != nil {
				return nil, backendErr
			}
		}
	}

	// Watch the configuration directory
	startWatcher(options)

	xlog.Info("core/startup process completed!")
	return application, nil
}

func startWatcher(options *config.ApplicationConfig) {
	if options.DynamicConfigsDir == "" {
		// No need to start the watcher if the directory is not set
		return
	}

	if _, err := os.Stat(options.DynamicConfigsDir); err != nil {
		if os.IsNotExist(err) {
			// We try to create the directory if it does not exist and was specified
			if err := os.MkdirAll(options.DynamicConfigsDir, 0700); err != nil {
				xlog.Error("failed creating DynamicConfigsDir", "error", err)
			}
		} else {
			// something else happened, we log the error and don't start the watcher
			xlog.Error("failed to read DynamicConfigsDir, watcher will not be started", "error", err)
			return
		}
	}

	configHandler := newConfigFileHandler(options)
	if err := configHandler.Watch(); err != nil {
		xlog.Error("failed creating watcher", "error", err)
	}
}

// loadRuntimeSettingsFromFile applies settings from runtime_settings.json in
// DynamicConfigsDir, giving env vars precedence over the file.
//
// Call order: New() -> loadRuntimeSettingsFromFile() -> initializeWatchdog().
// AppOptions (which carry the env var values set in run.go) are applied in
// NewApplicationConfig(), which runs first, so options already holds any
// env-provided values when this function is called. The defaults are not
// stored separately, so a field that still holds its default value is treated
// as "not set from env" and only then receives the file value.
//
// Runtime changes to the file are handled separately by the file watcher
// handler, which compares against startupAppConfig.
func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
	settingsFile := filepath.Join(options.DynamicConfigsDir, "runtime_settings.json")
	fileContent, err := os.ReadFile(settingsFile)
	if err != nil {
		if os.IsNotExist(err) {
			xlog.Debug("runtime_settings.json not found, using defaults")
			return
		}
		xlog.Warn("failed to read runtime_settings.json", "error", err)
		return
	}

	var settings config.RuntimeSettings

	if err := json.Unmarshal(fileContent, &settings); err != nil {
		xlog.Warn("failed to parse runtime_settings.json", "error", err)
		return
	}

	// At this point, options already has values from env vars (via AppOptions in run.go).
	// To avoid env var duplication, we determine if env vars were set by checking if
	// current values differ from defaults. Defaults are: false for bools, 0 for durations.
	// If current value is at default, it likely wasn't set from env var, so we can apply file.
	// If current value is non-default, it was likely set from env var, so we preserve it.
	// Note: This means env vars explicitly setting to false/0 won't be distinguishable from defaults,
	// but that's an acceptable limitation to avoid env var duplication.

	if settings.WatchdogIdleEnabled != nil {
		// Only apply if current value is default (false), suggesting it wasn't set from env var
		if !options.WatchDogIdle {
			options.WatchDogIdle = *settings.WatchdogIdleEnabled
			if options.WatchDogIdle {
				options.WatchDog = true
			}
		}
	}
	if settings.WatchdogBusyEnabled != nil {
		if !options.WatchDogBusy {
			options.WatchDogBusy = *settings.WatchdogBusyEnabled
			if options.WatchDogBusy {
				options.WatchDog = true
			}
		}
	}
	if settings.WatchdogIdleTimeout != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var
		if options.WatchDogIdleTimeout == 0 {
			dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout)
			if err == nil {
				options.WatchDogIdleTimeout = dur
			} else {
				xlog.Warn("invalid watchdog idle timeout in runtime_settings.json", "error", err, "timeout", *settings.WatchdogIdleTimeout)
			}
		}
	}
	if settings.WatchdogBusyTimeout != nil {
		if options.WatchDogBusyTimeout == 0 {
			dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout)
			if err == nil {
				options.WatchDogBusyTimeout = dur
			} else {
				xlog.Warn("invalid watchdog busy timeout in runtime_settings.json", "error", err, "timeout", *settings.WatchdogBusyTimeout)
			}
		}
	}
	if settings.WatchdogInterval != nil {
		if options.WatchDogInterval == 0 {
			dur, err := time.ParseDuration(*settings.WatchdogInterval)
			if err == nil {
				options.WatchDogInterval = dur
			} else {
				xlog.Warn("invalid watchdog interval in runtime_settings.json", "error", err, "interval", *settings.WatchdogInterval)
				options.WatchDogInterval = model.DefaultWatchdogInterval
			}
		}
	}
	// Handle MaxActiveBackends (new) and SingleBackend (deprecated)
	if settings.MaxActiveBackends != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var
		if options.MaxActiveBackends == 0 {
			options.MaxActiveBackends = *settings.MaxActiveBackends
			// For backward compatibility, also set SingleBackend if MaxActiveBackends == 1
			options.SingleBackend = (*settings.MaxActiveBackends == 1)
		}
	} else if settings.SingleBackend != nil {
		// Legacy: SingleBackend maps to MaxActiveBackends = 1
		if !options.SingleBackend {
			options.SingleBackend = *settings.SingleBackend
			if *settings.SingleBackend {
				options.MaxActiveBackends = 1
			}
		}
	}
	if settings.MemoryReclaimerEnabled != nil {
		// Only apply if current value is default (false), suggesting it wasn't set from env var
		if !options.MemoryReclaimerEnabled {
			options.MemoryReclaimerEnabled = *settings.MemoryReclaimerEnabled
			if options.MemoryReclaimerEnabled {
				options.WatchDog = true // Memory reclaimer requires watchdog
			}
		}
	}
	if settings.MemoryReclaimerThreshold != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var
		if options.MemoryReclaimerThreshold == 0 {
			options.MemoryReclaimerThreshold = *settings.MemoryReclaimerThreshold
		}
	}
	if settings.ForceEvictionWhenBusy != nil {
		// Only apply if current value is default (false), suggesting it wasn't set from env var
		if !options.ForceEvictionWhenBusy {
			options.ForceEvictionWhenBusy = *settings.ForceEvictionWhenBusy
		}
	}
	if settings.LRUEvictionMaxRetries != nil {
		// Only apply if the current value is 0. NOTE: the documented default is 30;
		// if the startup config seeds that non-zero default, this guard never
		// triggers and the file value is ignored (compare TracingMaxBodyBytes below).
		if options.LRUEvictionMaxRetries == 0 {
			options.LRUEvictionMaxRetries = *settings.LRUEvictionMaxRetries
		}
	}
	if settings.LRUEvictionRetryInterval != nil {
		// Only apply if the current value is 0. NOTE: the documented default is 1s;
		// the same caveat as for LRUEvictionMaxRetries applies.
		if options.LRUEvictionRetryInterval == 0 {
			dur, err := time.ParseDuration(*settings.LRUEvictionRetryInterval)
			if err == nil {
				options.LRUEvictionRetryInterval = dur
			} else {
				xlog.Warn("invalid LRU eviction retry interval in runtime_settings.json", "error", err, "interval", *settings.LRUEvictionRetryInterval)
			}
		}
	}
	if settings.AgentJobRetentionDays != nil {
		// Only apply if current value is default (0), suggesting it wasn't set from env var
		if options.AgentJobRetentionDays == 0 {
			options.AgentJobRetentionDays = *settings.AgentJobRetentionDays
		}
	}
	if !options.WatchDogIdle && !options.WatchDogBusy {
		if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled {
			options.WatchDog = true
		}
	}

	// P2P settings
	if settings.P2PToken != nil {
		if options.P2PToken == "" {
			options.P2PToken = *settings.P2PToken
		}
	}
	if settings.P2PNetworkID != nil {
		if options.P2PNetworkID == "" {
			options.P2PNetworkID = *settings.P2PNetworkID
		}
	}
	if settings.Federated != nil {
		if !options.Federated {
			options.Federated = *settings.Federated
		}
	}

	if settings.EnableBackendLogging != nil {
		if !options.EnableBackendLogging {
			options.EnableBackendLogging = *settings.EnableBackendLogging
		}
	}

	// Tracing settings
	if settings.EnableTracing != nil {
		if !options.EnableTracing {
			options.EnableTracing = *settings.EnableTracing
		}
	}
	if settings.TracingMaxItems != nil {
		if options.TracingMaxItems == 0 {
			options.TracingMaxItems = *settings.TracingMaxItems
		}
	}
	if settings.TracingMaxBodyBytes != nil {
		// Allow the on-disk setting to override the CLI/env default. The
		// startup default is non-zero (see NewApplicationConfig), so a plain
		// `== 0` guard like the others would never trigger; we instead respect
		// any value the file specifies. 0 in the file means "uncapped".
		options.TracingMaxBodyBytes = *settings.TracingMaxBodyBytes
	}

	// Branding / whitelabeling. There are no env vars for these (the file is
	// the only source), so apply unconditionally. Without this block a server
	// restart silently drops the configured instance name, tagline, and asset
	// filenames.
	if settings.InstanceName != nil {
		options.Branding.InstanceName = *settings.InstanceName
	}
	if settings.InstanceTagline != nil {
		options.Branding.InstanceTagline = *settings.InstanceTagline
	}
	if settings.LogoFile != nil {
		options.Branding.LogoFile = *settings.LogoFile
	}
	if settings.LogoHorizontalFile != nil {
		options.Branding.LogoHorizontalFile = *settings.LogoHorizontalFile
	}
	if settings.FaviconFile != nil {
		options.Branding.FaviconFile = *settings.FaviconFile
	}

	// Backend upgrade flags
	if settings.AutoUpgradeBackends != nil {
		if !options.AutoUpgradeBackends {
			options.AutoUpgradeBackends = *settings.AutoUpgradeBackends
		}
	}
	if settings.PreferDevelopmentBackends != nil {
		if !options.PreferDevelopmentBackends {
			options.PreferDevelopmentBackends = *settings.PreferDevelopmentBackends
		}
	}

	// LocalAI Assistant: file-stored as the negation (LocalAIAssistantEnabled).
	// Default is enabled (DisableLocalAIAssistant=false). Apply the file value
	// unless env explicitly disabled the assistant (DisableLocalAIAssistant=true).
	if settings.LocalAIAssistantEnabled != nil {
		if !options.DisableLocalAIAssistant {
			options.DisableLocalAIAssistant = !*settings.LocalAIAssistantEnabled
		}
	}

	// Open Responses TTL. Default is 0 (no expiration). Treat the on-disk
	// "0"/empty as "no expiration" (a no-op since options is already 0)
	// and parse anything else as a duration.
	if settings.OpenResponsesStoreTTL != nil && options.OpenResponsesStoreTTL == 0 {
		v := *settings.OpenResponsesStoreTTL
		if v != "0" && v != "" {
			if dur, err := time.ParseDuration(v); err == nil {
				options.OpenResponsesStoreTTL = dur
			} else {
				xlog.Warn("invalid open_responses_store_ttl in runtime_settings.json", "error", err, "ttl", v)
			}
		}
	}

	// Agent Pool. NewApplicationConfig seeds non-zero defaults for some of
	// these fields (Enabled=true,
	// EmbeddingModel="granite-embedding-107m-multilingual",
	// MaxChunkingSize=400). The "if at default, apply file" gate uses each
	// field's actual default literal so file values can override the
	// bootstrap default while still letting an env-set value (e.g.
	// WithAgentPoolEmbeddingModel from a flag) win.
	if settings.AgentPoolEnabled != nil && options.AgentPool.Enabled {
		options.AgentPool.Enabled = *settings.AgentPoolEnabled
	}
	if settings.AgentPoolDefaultModel != nil && options.AgentPool.DefaultModel == "" {
		options.AgentPool.DefaultModel = *settings.AgentPoolDefaultModel
	}
	if settings.AgentPoolEmbeddingModel != nil {
		if options.AgentPool.EmbeddingModel == "" || options.AgentPool.EmbeddingModel == "granite-embedding-107m-multilingual" {
			options.AgentPool.EmbeddingModel = *settings.AgentPoolEmbeddingModel
		}
	}
	if settings.AgentPoolMaxChunkingSize != nil {
		if options.AgentPool.MaxChunkingSize == 0 || options.AgentPool.MaxChunkingSize == 400 {
			options.AgentPool.MaxChunkingSize = *settings.AgentPoolMaxChunkingSize
		}
	}
	if settings.AgentPoolChunkOverlap != nil && options.AgentPool.ChunkOverlap == 0 {
		options.AgentPool.ChunkOverlap = *settings.AgentPoolChunkOverlap
	}
	if settings.AgentPoolEnableLogs != nil && !options.AgentPool.EnableLogs {
		options.AgentPool.EnableLogs = *settings.AgentPoolEnableLogs
	}
	if settings.AgentPoolCollectionDBPath != nil && options.AgentPool.CollectionDBPath == "" {
		options.AgentPool.CollectionDBPath = *settings.AgentPoolCollectionDBPath
	}
	if settings.AgentPoolVectorEngine != nil {
		// Default is "chromem"; treat both that and empty as "not env-set".
		if options.AgentPool.VectorEngine == "" || options.AgentPool.VectorEngine == "chromem" {
			options.AgentPool.VectorEngine = *settings.AgentPoolVectorEngine
		}
	}
	if settings.AgentPoolDatabaseURL != nil && options.AgentPool.DatabaseURL == "" {
		options.AgentPool.DatabaseURL = *settings.AgentPoolDatabaseURL
	}
	if settings.AgentPoolAgentHubURL != nil {
		// Default is "https://agenthub.localai.io"; treat both that and empty
		// as "not env-set".
		if options.AgentPool.AgentHubURL == "" || options.AgentPool.AgentHubURL == "https://agenthub.localai.io" {
			options.AgentPool.AgentHubURL = *settings.AgentPoolAgentHubURL
		}
	}

	xlog.Debug("Runtime settings loaded from runtime_settings.json")
}

// initializeWatchdog initializes the watchdog with current ApplicationConfig settings
func initializeWatchdog(application *Application, options *config.ApplicationConfig) {
	// Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend)
	lruLimit := options.GetEffectiveMaxActiveBackends()

	// Create watchdog if enabled OR if LRU limit is set OR if memory reclaimer is enabled
	if options.WatchDog || lruLimit > 0 || options.MemoryReclaimerEnabled {
		wd := model.NewWatchDog(
			model.WithProcessManager(application.ModelLoader()),
			model.WithBusyTimeout(options.WatchDogBusyTimeout),
			model.WithIdleTimeout(options.WatchDogIdleTimeout),
			model.WithWatchdogInterval(options.WatchDogInterval),
			model.WithBusyCheck(options.WatchDogBusy),
			model.WithIdleCheck(options.WatchDogIdle),
			model.WithLRULimit(lruLimit),
			model.WithMemoryReclaimer(options.MemoryReclaimerEnabled, options.MemoryReclaimerThreshold),
			model.WithForceEvictionWhenBusy(options.ForceEvictionWhenBusy),
		)
		application.ModelLoader().SetWatchDog(wd)

		// Initialize ModelLoader LRU eviction retry settings
		application.ModelLoader().SetLRUEvictionRetrySettings(
			options.LRUEvictionMaxRetries,
			options.LRUEvictionRetryInterval,
		)

		// Sync per-model state from configs to the watchdog. Without this,
		// `pinned: true` and `concurrency_groups:` are only honored after a
		// settings-driven RestartWatchdog and never at boot.
		application.SyncPinnedModelsToWatchdog()
		application.SyncModelGroupsToWatchdog()

		// Start watchdog goroutine if any periodic checks are enabled
		// LRU eviction doesn't need the Run() loop - it's triggered on model load
		// But memory reclaimer needs the Run() loop for periodic checking
		if options.WatchDogBusy || options.WatchDogIdle || options.MemoryReclaimerEnabled {
			go wd.Run()
		}

		go func() {
			<-options.Context.Done()
			xlog.Debug("Context canceled, shutting down")
			wd.Shutdown()
		}()
	}
}

// loadOrGenerateHMACSecret loads an HMAC secret from the given file path,
// or generates a random 32-byte secret and persists it if the file doesn't exist.
func loadOrGenerateHMACSecret(path string) (string, error) {
	data, err := os.ReadFile(path)
	if err == nil {
		secret := string(data)
		if len(secret) >= 32 {
			return secret, nil
		}
	}

	b := make([]byte, 32)
	if _, err := rand.Read(b); err != nil {
		return "", fmt.Errorf("failed to generate HMAC secret: %w", err)
	}
	secret := hex.EncodeToString(b)

	if err := os.WriteFile(path, []byte(secret), 0600); err != nil {
		return "", fmt.Errorf("failed to persist HMAC secret: %w", err)
	}

	xlog.Info("Generated new HMAC secret for API key hashing", "path", path)
	return secret, nil
}

// migrateDataFiles moves persistent data files from the old config directory
// to the new data directory. Only moves files that exist in src but not in dst.
// Items that cannot be renamed (e.g. across filesystems) are left in place.
func migrateDataFiles(srcDir, dstDir string) {
	// Files and directories to migrate
	items := []string{
		"agent_tasks.json",
		"agent_jobs.json",
		"collections",
		"assets",
	}

	migrated := false
	for _, item := range items {
		srcPath := filepath.Join(srcDir, item)
		dstPath := filepath.Join(dstDir, item)

		// Only migrate if source exists and destination does not
		if _, err := os.Stat(srcPath); os.IsNotExist(err) {
			continue
		}
		if _, err := os.Stat(dstPath); err == nil {
			continue // destination already exists, skip
		}

		if err := os.Rename(srcPath, dstPath); err != nil {
			// os.Rename fails across filesystems. No copy is attempted: the
			// item is left in place and the user is asked to move it manually.
			xlog.Warn("Failed to migrate data file", "src", srcPath, "dst", dstPath, "error", err)
			xlog.Warn("Data file remains in old location, please move manually", "src", srcPath, "dst", dstPath)
			continue
		}
		migrated = true
		xlog.Info("Migrated data file to new data path", "src", srcPath, "dst", dstPath)
	}

	if migrated {
		xlog.Info("Data migration complete", "from", srcDir, "to", dstDir)
	}
}