LocalAI/core/services/nodes/managers_distributed.go
Ettore Di Giacinto 8ab0744458 feat: backend versioning, upgrade detection and auto-upgrade (#9315)
* feat: add backend versioning data model foundation

Add Version, URI, and Digest fields to BackendMetadata for tracking
installed backend versions and enabling upgrade detection. Add Version
field to GalleryBackend. Add UpgradeAvailable/AvailableVersion fields
to SystemBackend. Implement GetImageDigest() for lightweight OCI digest
lookups via remote.Head. Record version, URI, and digest at install time
in InstallBackend() and propagate version through meta backends.
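
A minimal sketch of that kind of HEAD-based digest lookup, assuming the github.com/google/go-containerregistry client and an illustrative function name rather than the actual GetImageDigest() signature:

// Sketch only: a HEAD-based digest lookup with go-containerregistry,
// approximating the idea behind GetImageDigest(); not the exact LocalAI code.
package sketch

import (
	"fmt"

	"github.com/google/go-containerregistry/pkg/name"
	"github.com/google/go-containerregistry/pkg/v1/remote"
)

// getImageDigest resolves the digest of an OCI image without pulling layers.
func getImageDigest(uri string) (string, error) {
	ref, err := name.ParseReference(uri)
	if err != nil {
		return "", fmt.Errorf("parsing reference %q: %w", uri, err)
	}
	// remote.Head only fetches the manifest descriptor, so the lookup stays
	// lightweight compared to pulling the image.
	desc, err := remote.Head(ref)
	if err != nil {
		return "", fmt.Errorf("resolving digest for %q: %w", uri, err)
	}
	return desc.Digest.String(), nil
}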

* feat: add backend upgrade detection and execution logic

Add CheckBackendUpgrades() to compare installed backend versions/digests
against gallery entries, and UpgradeBackend() to perform atomic upgrades
with backup-based rollback on failure. Builds on the data model changes
above (Version/URI/Digest fields, GetImageDigest).
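
A rough sketch of the backup-based rollback pattern; the directory layout and helper names are hypothetical, not the actual UpgradeBackend() code:

// Illustrative backup/rollback pattern; paths and helpers are hypothetical.
package sketch

import (
	"errors"
	"fmt"
	"os"
)

func upgradeWithRollback(backendDir string, install func() error) error {
	backupDir := backendDir + ".bak"
	// Set the current installation aside before touching anything.
	if err := os.Rename(backendDir, backupDir); err != nil {
		return fmt.Errorf("backing up %s: %w", backendDir, err)
	}
	if err := install(); err != nil {
		// Roll back: drop the partial install and restore the backup.
		os.RemoveAll(backendDir)
		if restoreErr := os.Rename(backupDir, backendDir); restoreErr != nil {
			return errors.Join(err, fmt.Errorf("rollback failed: %w", restoreErr))
		}
		return err
	}
	// Success: the backup is no longer needed.
	return os.RemoveAll(backupDir)
}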

* feat: add AutoUpgradeBackends config and runtime settings

Add configuration and runtime settings for backend auto-upgrade:
- RuntimeSettings field for dynamic config via API/JSON
- ApplicationConfig field, option func, and roundtrip conversion (sketched after this list)
- CLI flag with LOCALAI_AUTO_UPGRADE_BACKENDS env var
- Config file watcher support for runtime_settings.json
- Tests for ToRuntimeSettings, ApplyRuntimeSettings, and roundtrip
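
A sketch of what the settings roundtrip boils down to; the struct shape, JSON keys, and file handling here are assumptions based on the description above, not LocalAI's actual types:

// Hypothetical shape of the runtime settings roundtrip; the real
// RuntimeSettings/ApplicationConfig types and JSON keys may differ.
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

type runtimeSettings struct {
	AutoUpgradeBackends bool `json:"auto_upgrade_backends"`
}

func main() {
	// Settings loaded from runtime_settings.json (or pushed via the API)
	// are applied onto the running config...
	data, err := os.ReadFile("runtime_settings.json")
	if err != nil {
		fmt.Println("no runtime settings file:", err)
		return
	}
	var rs runtimeSettings
	if err := json.Unmarshal(data, &rs); err != nil {
		panic(err)
	}

	// ...and can be serialized back out, which is what the roundtrip
	// tests exercise (ToRuntimeSettings / ApplyRuntimeSettings).
	out, _ := json.MarshalIndent(rs, "", "  ")
	fmt.Println(string(out))
}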

* feat(ui): add backend version display and upgrade support

- Add upgrade check/trigger API endpoints to config and api module
- Backends page: version badge, upgrade indicator, upgrade button
- Manage page: version in metadata, context-aware upgrade/reinstall button
- Settings page: auto-upgrade backends toggle

* feat: add upgrade checker service, API endpoints, and CLI command

- UpgradeChecker background service: checks every 6h, auto-upgrades when enabled
- API endpoints: GET /backends/upgrades, POST /backends/upgrades/check, POST /backends/upgrade/:name (usage sketched below)
- CLI: `localai backends upgrade` command, version display in `backends list`
- BackendManager interface: add UpgradeBackend and CheckUpgrades methods
- Wire upgrade op through GalleryService backend handler
- Distributed mode: fan-out upgrade to worker nodes via NATS
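
For reference, exercising these endpoints from a client is plain HTTP; the sketch below assumes LocalAI's default listen address and uses the route paths as listed (deployments that mount the API under /api, as in the e2e tests further down, would prefix accordingly):

// Hypothetical client calls against the endpoints listed above.
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	base := "http://localhost:8080" // assumed default LocalAI address

	// Trigger a fresh upgrade check (POST /backends/upgrade/:name would
	// similarly start an upgrade job for a single backend).
	resp, err := http.Post(base+"/backends/upgrades/check", "application/json", strings.NewReader("{}"))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// Read which backends have upgrades available.
	resp, err = http.Get(base + "/backends/upgrades")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))
}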

* fix: use advisory lock for upgrade checker in distributed mode

In distributed mode with multiple frontend instances, use PostgreSQL
advisory lock (KeyBackendUpgradeCheck) so only one instance runs
periodic upgrade checks and auto-upgrades. Prevents duplicate
upgrade operations across replicas.

Standalone mode is unchanged (simple ticker loop).
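
A minimal sketch of that advisory-lock guard, assuming a plain database/sql handle; the lock key value and surrounding wiring are illustrative, not the actual KeyBackendUpgradeCheck plumbing:

// Minimal pg_try_advisory_lock pattern; key and wiring are hypothetical.
package sketch

import (
	"context"
	"database/sql"
)

const keyBackendUpgradeCheck int64 = 424242 // hypothetical lock key

// runExclusive executes fn only if this instance wins the advisory lock,
// so concurrent replicas skip the run instead of duplicating it.
func runExclusive(ctx context.Context, db *sql.DB, fn func(context.Context) error) error {
	// Advisory locks are per-session, so pin a single connection for the
	// lock/unlock pair instead of going through the pool twice.
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	var acquired bool
	if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", keyBackendUpgradeCheck).Scan(&acquired); err != nil {
		return err
	}
	if !acquired {
		return nil // another replica is already running the check
	}
	defer conn.ExecContext(context.Background(), "SELECT pg_advisory_unlock($1)", keyBackendUpgradeCheck)

	return fn(ctx)
}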

* test: add e2e tests for backend upgrade API

- Test GET /api/backends/upgrades returns 200 (even with no upgrade checker)
- Test POST /api/backends/upgrade/:name accepts request and returns job ID
- Test full upgrade flow: trigger upgrade via API, wait for job completion,
  verify run.sh updated to v2 and metadata.json has version 2.0.0
- Test POST /api/backends/upgrades/check returns 200
- Fix nil check for applicationInstance in upgrade API routes
2026-04-11 22:31:15 +02:00

217 lines · 7.6 KiB · Go

package nodes

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/xlog"
	"github.com/nats-io/nats.go"
)

// DistributedModelManager wraps a local ModelManager and adds NATS fan-out
// for model deletion so worker nodes clean up stale files.
type DistributedModelManager struct {
	local   galleryop.ModelManager
	adapter *RemoteUnloaderAdapter
}

// NewDistributedModelManager creates a DistributedModelManager.
// Backend auto-install is disabled because the frontend node delegates
// inference to workers and never runs backends locally.
func NewDistributedModelManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter) *DistributedModelManager {
	local := galleryop.NewLocalModelManager(appConfig, ml)
	local.SetAutoInstallBackend(false)
	return &DistributedModelManager{
		local:   local,
		adapter: adapter,
	}
}

// DeleteModel deletes the model locally and propagates the file deletion to
// worker nodes.
func (d *DistributedModelManager) DeleteModel(name string) error {
	err := d.local.DeleteModel(name)
	// Best-effort: fan out model.delete to worker nodes
	if rcErr := d.adapter.DeleteModelFiles(name); rcErr != nil {
		xlog.Warn("Failed to propagate model file deletion to workers", "model", name, "error", rcErr)
	}
	return err
}

// InstallModel delegates model installation to the local model manager.
func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryModel, gallery.ModelConfig], progressCb galleryop.ProgressCallback) error {
	return d.local.InstallModel(ctx, op, progressCb)
}

// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
	local            galleryop.BackendManager
	adapter          *RemoteUnloaderAdapter
	registry         *NodeRegistry
	backendGalleries []config.Gallery
}

// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
	return &DistributedBackendManager{
		local:            galleryop.NewLocalBackendManager(appConfig, ml),
		adapter:          adapter,
		registry:         registry,
		backendGalleries: appConfig.BackendGalleries,
	}
}

// DeleteBackend removes the backend locally (if present) and fans out the
// deletion to all healthy worker nodes.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
	// Try local deletion but ignore "not found" errors — in distributed mode
	// the frontend node typically doesn't have backends installed locally;
	// they only exist on worker nodes.
	if err := d.local.DeleteBackend(name); err != nil {
		if !errors.Is(err, gallery.ErrBackendNotFound) {
			return err
		}
		xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
	}
	// Fan out backend.delete to all healthy nodes
	allNodes, listErr := d.registry.List(context.Background())
	if listErr != nil {
		xlog.Warn("Failed to list nodes for backend deletion fan-out", "error", listErr)
		return listErr
	}
	var errs []error
	for _, node := range allNodes {
		if node.Status != StatusHealthy {
			continue
		}
		if _, delErr := d.adapter.DeleteBackend(node.ID, name); delErr != nil {
			if errors.Is(delErr, nats.ErrNoResponders) {
				// Node's NATS subscription is gone — likely restarted with a new ID.
				// Mark it unhealthy so future fan-outs skip it.
				xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
				d.registry.MarkUnhealthy(context.Background(), node.ID)
				continue
			}
			xlog.Warn("Failed to propagate backend deletion to worker", "node", node.Name, "backend", name, "error", delErr)
			errs = append(errs, fmt.Errorf("node %s: %w", node.Name, delErr))
		}
	}
	return errors.Join(errs...)
}

// ListBackends aggregates installed backends from all healthy worker nodes.
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
	result := make(gallery.SystemBackends)
	allNodes, err := d.registry.List(context.Background())
	if err != nil {
		return result, err
	}
	for _, node := range allNodes {
		if node.Status != StatusHealthy {
			continue
		}
		reply, err := d.adapter.ListBackends(node.ID)
		if err != nil {
			if errors.Is(err, nats.ErrNoResponders) {
				xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
				d.registry.MarkUnhealthy(context.Background(), node.ID)
				continue
			}
			xlog.Warn("Failed to list backends on worker", "node", node.Name, "error", err)
			continue
		}
		if reply.Error != "" {
			xlog.Warn("Worker returned error listing backends", "node", node.Name, "error", reply.Error)
			continue
		}
		for _, b := range reply.Backends {
			if _, exists := result[b.Name]; !exists {
				result[b.Name] = gallery.SystemBackend{
					Name:     b.Name,
					IsSystem: b.IsSystem,
					IsMeta:   b.IsMeta,
					Metadata: &gallery.BackendMetadata{
						InstalledAt: b.InstalledAt,
						GalleryURL:  b.GalleryURL,
					},
				}
			}
		}
	}
	return result, nil
}

// InstallBackend fans out backend installation to all healthy worker nodes.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
	allNodes, err := d.registry.List(context.Background())
	if err != nil {
		return err
	}
	galleriesJSON, _ := json.Marshal(op.Galleries)
	backendName := op.GalleryElementName
	for _, node := range allNodes {
		if node.Status != StatusHealthy {
			continue
		}
		reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON))
		if err != nil {
			if errors.Is(err, nats.ErrNoResponders) {
				xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
				d.registry.MarkUnhealthy(context.Background(), node.ID)
				continue
			}
			xlog.Warn("Failed to install backend on worker", "node", node.Name, "backend", backendName, "error", err)
			continue
		}
		if !reply.Success {
			xlog.Warn("Backend install failed on worker", "node", node.Name, "backend", backendName, "error", reply.Error)
		}
	}
	return nil
}

// UpgradeBackend fans out a backend upgrade to all healthy worker nodes.
// TODO: Add dedicated NATS subject for upgrade (currently reuses install with force flag)
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
	allNodes, err := d.registry.List(context.Background())
	if err != nil {
		return err
	}
	galleriesJSON, _ := json.Marshal(d.backendGalleries)
	var errs []error
	for _, node := range allNodes {
		if node.Status != StatusHealthy {
			continue
		}
		// Reuse install endpoint which will re-download the backend (force mode)
		reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON))
		if err != nil {
			if errors.Is(err, nats.ErrNoResponders) {
				xlog.Warn("No NATS responders for node during upgrade, marking unhealthy", "node", node.Name, "nodeID", node.ID)
				d.registry.MarkUnhealthy(context.Background(), node.ID)
				continue
			}
			errs = append(errs, fmt.Errorf("node %s: %w", node.Name, err))
			continue
		}
		if !reply.Success {
			errs = append(errs, fmt.Errorf("node %s: %s", node.Name, reply.Error))
		}
	}
	return errors.Join(errs...)
}

// CheckUpgrades checks for available backend upgrades.
// Gallery comparison is global (not per-node), so we delegate to the local manager.
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
	return d.local.CheckUpgrades(ctx)
}