Files
LocalAI/core/services/nodes/managers_distributed.go
Ettore Di Giacinto 2da1a4d230 feat(distributed): per-node backend installation from the gallery
In distributed mode the Backends gallery used to fan every install out to
every worker — fine for auto-resolving (meta) backends like llama-cpp where
each node picks its own variant, but wrong for hardware-specific builds
like cpu-llama-cpp that would silently land on every GPU node.

Adds a node-targeted install path through the existing
POST /api/nodes/:id/backends/install plumbing, with two entry points:

- Backends gallery row gets a split-button in distributed mode. Auto-
  resolving keeps "Install on all nodes" as the primary; chevron menu
  opens the picker. Hardware-specific routes the primary directly to the
  picker — no fan-out path on the row.
- Nodes-page drawer gets a "+ Add backend" button that navigates to
  /app/backends?target=<node-id>; the gallery scopes itself to that node
  (banner, single per-row install button, Reinstall/Remove for already-
  installed). One gallery, two scopes — no second UI to maintain.

The picker (new NodeInstallPicker) shows a 3-state suitability column
(Compatible / Override / Installed), an auto-expanding variant override
disclosure that fires when selected nodes have no working GPU, parallel
per-node installs with inline status and Retry-failed-nodes, and a
mismatch confirm that names the consequence on the button itself.

A 409 fan-out guard on /api/backends/apply protects CLI/Terraform/script
users from the same footgun: hardware-specific installs in distributed
mode now return code "concrete_backend_requires_target" with a human-
readable error and a meta_alternative pointer.

The gallery list payload now surfaces capabilities, metaBackendFor and
per-row nodes (NodeBackendRef) so the picker and the new Nodes column
have everything they need without re-walking the gallery client-side.

GODEBUG=netdns=go is set on the compose services because the cgo DNS
resolver follows the container's nsswitch.conf to host systemd-resolved
(127.0.0.53), unreachable from inside the container; the pure-Go
resolver reads /etc/resolv.conf directly and uses Docker's embedded DNS.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude Code:claude-opus-4-7[1m] [Edit] [Bash] [Read] [Write]
2026-04-26 22:05:18 +00:00

390 lines
15 KiB
Go

package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
"github.com/nats-io/nats.go"
)
// DistributedModelManager wraps a local ModelManager and adds NATS fan-out
// for model deletion so worker nodes clean up stale files.
type DistributedModelManager struct {
local galleryop.ModelManager
adapter *RemoteUnloaderAdapter
}
// NewDistributedModelManager creates a DistributedModelManager.
// Backend auto-install is disabled because the frontend node delegates
// inference to workers and never runs backends locally.
func NewDistributedModelManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter) *DistributedModelManager {
local := galleryop.NewLocalModelManager(appConfig, ml)
local.SetAutoInstallBackend(false)
return &DistributedModelManager{
local: local,
adapter: adapter,
}
}
func (d *DistributedModelManager) DeleteModel(name string) error {
err := d.local.DeleteModel(name)
// Best-effort: fan out model.delete to worker nodes
if rcErr := d.adapter.DeleteModelFiles(name); rcErr != nil {
xlog.Warn("Failed to propagate model file deletion to workers", "model", name, "error", rcErr)
}
return err
}
func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryModel, gallery.ModelConfig], progressCb galleryop.ProgressCallback) error {
return d.local.InstallModel(ctx, op, progressCb)
}
// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
local galleryop.BackendManager
adapter *RemoteUnloaderAdapter
registry *NodeRegistry
backendGalleries []config.Gallery
systemState *system.SystemState
}
// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
return &DistributedBackendManager{
local: galleryop.NewLocalBackendManager(appConfig, ml),
adapter: adapter,
registry: registry,
backendGalleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
}
}
// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
type NodeOpStatus struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Status string `json:"status"` // "success" | "queued" | "error"
Error string `json:"error,omitempty"`
}
// BackendOpResult aggregates per-node outcomes.
type BackendOpResult struct {
Nodes []NodeOpStatus `json:"nodes"`
}
// Err returns a non-nil error aggregating per-node hard failures
// (Status == "error"). Queued nodes (waiting for reconciler retry) are not
// failures — surfacing them as errors would mislead users about durable
// intent. Used by Install/Upgrade/Delete so reply.Success=false from
// workers reaches OpStatus.Error and the UI, instead of being silently
// dropped on the way up.
func (r BackendOpResult) Err() error {
var failures []string
for _, n := range r.Nodes {
if n.Status == "error" {
failures = append(failures, fmt.Sprintf("%s: %s", n.NodeName, n.Error))
}
}
if len(failures) == 0 {
return nil
}
return errors.New(strings.Join(failures, "; "))
}
// enqueueAndDrainBackendOp is the shared scaffolding for
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
// row (intent is durable even if the node is offline). Currently-healthy
// nodes get an immediate attempt; success deletes the row, failure records
// the error and leaves the row for the reconciler to retry.
//
// `apply` is the NATS round-trip for one node. Returning an error keeps the
// row in the queue and marks the per-node status as "error"; returning nil
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued" — no attempt is made right now, reconciler will pick it up
// when the node returns.
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet — no intent to apply.
if node.Status == StatusPending {
continue
}
// Backend lifecycle ops only make sense on backend-type workers.
// Agent workers don't subscribe to backend.install/delete/list, so
// enqueueing for them guarantees a forever-retrying row that the
// reconciler can never drain. Silently skip — they aren't consumers.
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
})
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
})
continue
}
applyErr := apply(node)
if applyErr == nil {
// Find the row we just upserted and delete it; cheap but requires
// a lookup since UpsertPendingBackendOp doesn't return the ID.
if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
xlog.Debug("Failed to clear pending backend op after success", "error", err)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "success",
})
continue
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL — mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
}
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
})
}
return result, nil
}
// findPendingRow looks up the ID of a pending_backend_ops row by its
// composite key. Used to hand off to RecordPendingBackendOpFailure /
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
// composite key.
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
var row PendingBackendOp
if err := d.registry.db.WithContext(ctx).
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
First(&row).Error; err != nil {
return 0, err
}
return row.ID, nil
}
// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
return d.registry.db.WithContext(ctx).
Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
Delete(&PendingBackendOp{}).Error
}
// DeleteBackend fans out backend deletion to every known node. The previous
// implementation silently skipped non-healthy nodes, which meant zombies
// reappeared once those nodes returned. Now the intent is durable — see
// enqueueAndDrainBackendOp — and the reconciler catches up later.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
// Local delete first (frontend rarely has backends installed in
// distributed mode, but the gallery operation still expects it; ignore
// "not found" which is the common case).
if err := d.local.DeleteBackend(name); err != nil {
if !errors.Is(err, gallery.ErrBackendNotFound) {
return err
}
xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
}
ctx := context.Background()
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
}
if !reply.Success {
return fmt.Errorf("delete failed: %s", reply.Error)
}
return nil
})
if err != nil {
return err
}
return result.Err()
}
// DeleteBackendDetailed is the per-node-result variant called by the HTTP
// handler so the UI can render a per-node status drawer. DeleteBackend still
// returns error-only for callers that don't care about node breakdown.
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
}
if !reply.Success {
return fmt.Errorf("delete failed: %s", reply.Error)
}
return nil
})
}
// ListBackends aggregates installed backends from all worker nodes, preserving
// per-node attribution. Each SystemBackend.Nodes entry records which node has
// the backend and the version/digest it reports. The top-level Metadata is
// populated from the first node seen so single-node-minded callers still work.
//
// Pending/offline/draining nodes are skipped because they aren't expected to
// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders
// then marks them unhealthy and the loop continues.
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
result := make(gallery.SystemBackends)
allNodes, err := d.registry.List(context.Background())
if err != nil {
return result, err
}
for _, node := range allNodes {
if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining {
continue
}
reply, err := d.adapter.ListBackends(node.ID)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(context.Background(), node.ID)
continue
}
xlog.Warn("Failed to list backends on worker", "node", node.Name, "error", err)
continue
}
if reply.Error != "" {
xlog.Warn("Worker returned error listing backends", "node", node.Name, "error", reply.Error)
continue
}
for _, b := range reply.Backends {
ref := gallery.NodeBackendRef{
NodeID: node.ID,
NodeName: node.Name,
NodeStatus: node.Status,
Version: b.Version,
Digest: b.Digest,
URI: b.URI,
InstalledAt: b.InstalledAt,
}
entry, exists := result[b.Name]
if !exists {
entry = gallery.SystemBackend{
Name: b.Name,
IsSystem: b.IsSystem,
IsMeta: b.IsMeta,
Metadata: &gallery.BackendMetadata{
Name: b.Name,
InstalledAt: b.InstalledAt,
GalleryURL: b.GalleryURL,
Version: b.Version,
URI: b.URI,
Digest: b.Digest,
},
}
}
entry.Nodes = append(entry.Nodes, ref)
result[b.Name] = entry
}
}
return result, nil
}
// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(op.Galleries)
backendName := op.GalleryElementName
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias)
if err != nil {
return err
}
if !reply.Success {
return fmt.Errorf("install failed: %s", reply.Error)
}
return nil
})
if err != nil {
return err
}
return result.Err()
}
// UpgradeBackend reuses the install NATS subject (the worker re-downloads
// from the gallery). Same queue semantics as Install/Delete.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(d.backendGalleries)
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "")
if err != nil {
return err
}
if !reply.Success {
return fmt.Errorf("upgrade failed: %s", reply.Error)
}
return nil
})
if err != nil {
return err
}
return result.Err()
}
// IsDistributed reports that installs from this manager fan out across the
// cluster. The HTTP layer reads this to gate hardware-specific installs on
// /api/backends/apply (which would otherwise silently land on every node).
func (d *DistributedBackendManager) IsDistributed() bool { return true }
// CheckUpgrades checks for available backend upgrades across the cluster.
//
// The previous implementation delegated to d.local, which called
// ListSystemBackends on the frontend — but in distributed mode the frontend
// has no backends installed locally, so the upgrade loop never ran and the UI
// never surfaced any upgrades. We now feed the cluster-wide aggregation
// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
// digest-based detection actually works and cluster drift is visible.
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
installed, err := d.ListBackends()
if err != nil {
return nil, err
}
// systemState is used by AvailableBackends (gallery paths + meta-backend
// resolution). The `installed` argument is what the old code got wrong —
// it used to come from the empty frontend filesystem.
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}