In distributed mode the React UI's "Upgrade All" button fanned every detected outdated backend out to every healthy backend node, including nodes that never had that backend installed. On heterogeneous clusters this surfaced as platform errors (e.g. mac-mini-m4 asked to upgrade cpu-insightface-development, which has no darwin/arm64 variant) and left forever-retrying pending_backend_ops rows.

DistributedBackendManager.UpgradeBackend now queries ListBackends() first, builds the target node-ID set from SystemBackend.Nodes, and only fans out to those nodes — every per-node primitive (adapter.InstallBackend, the pending-ops queue, BackendOpResult) is unchanged. enqueueAndDrainBackendOp gains an optional targetNodeIDs allowlist; Install/Delete keep their fan-to-everyone semantics by passing nil. If no node reports the backend installed, UpgradeBackend now returns a clear "not installed on any node" error instead of producing a stuck queue.

Adds Ginkgo coverage for the smart fan-out: a backend installed on a subset of nodes fans out only to those nodes; a backend installed on no node returns the new error and never sends a NATS install request.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
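Condensed from the DistributedBackendManager.UpgradeBackend implementation below, the new targeting step looks like this (galleriesJSON and apply are shown only by name; they are built exactly as in the function body):

    installed, err := d.ListBackends()
    if err != nil {
        return fmt.Errorf("failed to list cluster backends: %w", err)
    }
    entry, ok := installed[name]
    if !ok || len(entry.Nodes) == 0 {
        return fmt.Errorf("backend %q is not installed on any node", name)
    }
    targetNodeIDs := make(map[string]bool, len(entry.Nodes))
    for _, n := range entry.Nodes {
        targetNodeIDs[n.NodeID] = true
    }
    // nil allowlist = fan out to every node (Install/Delete keep this);
    // non-nil = visit only the nodes that already report the backend.
    result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, apply)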
package nodes

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "strings"

    "github.com/mudler/LocalAI/core/config"
    "github.com/mudler/LocalAI/core/gallery"
    "github.com/mudler/LocalAI/core/services/galleryop"
    "github.com/mudler/LocalAI/pkg/model"
    "github.com/mudler/LocalAI/pkg/system"
    "github.com/mudler/xlog"
    "github.com/nats-io/nats.go"
)

// DistributedModelManager wraps a local ModelManager and adds NATS fan-out
// for model deletion so worker nodes clean up stale files.
type DistributedModelManager struct {
    local   galleryop.ModelManager
    adapter *RemoteUnloaderAdapter
}

// NewDistributedModelManager creates a DistributedModelManager.
// Backend auto-install is disabled because the frontend node delegates
// inference to workers and never runs backends locally.
func NewDistributedModelManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter) *DistributedModelManager {
    local := galleryop.NewLocalModelManager(appConfig, ml)
    local.SetAutoInstallBackend(false)
    return &DistributedModelManager{
        local:   local,
        adapter: adapter,
    }
}

func (d *DistributedModelManager) DeleteModel(name string) error {
    err := d.local.DeleteModel(name)
    // Best-effort: fan out model.delete to worker nodes
    if rcErr := d.adapter.DeleteModelFiles(name); rcErr != nil {
        xlog.Warn("Failed to propagate model file deletion to workers", "model", name, "error", rcErr)
    }
    return err
}

func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryModel, gallery.ModelConfig], progressCb galleryop.ProgressCallback) error {
    return d.local.InstallModel(ctx, op, progressCb)
}

// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend lifecycle operations (install/upgrade/delete/list) so worker
// nodes stay in step with the frontend and clean up stale files.
type DistributedBackendManager struct {
    local            galleryop.BackendManager
    adapter          *RemoteUnloaderAdapter
    registry         *NodeRegistry
    backendGalleries []config.Gallery
    systemState      *system.SystemState
}

// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
    return &DistributedBackendManager{
        local:            galleryop.NewLocalBackendManager(appConfig, ml),
        adapter:          adapter,
        registry:         registry,
        backendGalleries: appConfig.BackendGalleries,
        systemState:      appConfig.SystemState,
    }
}

// NodeOpStatus is the per-node outcome of a backend lifecycle operation.
// Returned as part of BackendOpResult so the frontend can surface exactly
// what happened on each worker instead of a single joined error string.
type NodeOpStatus struct {
    NodeID   string `json:"node_id"`
    NodeName string `json:"node_name"`
    Status   string `json:"status"` // "success" | "queued" | "error"
    Error    string `json:"error,omitempty"`
}

// BackendOpResult aggregates per-node outcomes.
type BackendOpResult struct {
    Nodes []NodeOpStatus `json:"nodes"`
}

// Err returns a non-nil error aggregating per-node hard failures
// (Status == "error"). Queued nodes (waiting for reconciler retry) are not
// failures — surfacing them as errors would mislead users about durable
// intent. Used by Install/Upgrade/Delete so reply.Success=false from
// workers reaches NodeOpStatus.Error and the UI, instead of being silently
// dropped on the way up.
func (r BackendOpResult) Err() error {
    var failures []string
    for _, n := range r.Nodes {
        if n.Status == "error" {
            failures = append(failures, fmt.Sprintf("%s: %s", n.NodeName, n.Error))
        }
    }
    if len(failures) == 0 {
        return nil
    }
    return errors.New(strings.Join(failures, "; "))
}
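
// A caller that only needs a pass/fail signal collapses the per-node
// breakdown the way the error-only entry points below do (sketch):
//
//	result, err := d.enqueueAndDrainBackendOp(...)
//	if err != nil {
//		return err // listing the registry failed; nothing was attempted
//	}
//	return result.Err() // non-nil only if some node reported Status == "error"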

// enqueueAndDrainBackendOp is the shared scaffolding for
// delete/install/upgrade. Every non-pending node gets a pending_backend_ops
// row (intent is durable even if the node is offline). Currently-healthy
// nodes get an immediate attempt; success deletes the row, failure records
// the error and leaves the row for the reconciler to retry.
//
// `apply` is the NATS round-trip for one node. Returning an error keeps the
// row in the queue and marks the per-node status as "error"; returning nil
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued" — no attempt is made right now; the reconciler will pick it up
// when the node returns.
//
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it — such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
    allNodes, err := d.registry.List(ctx)
    if err != nil {
        return BackendOpResult{}, err
    }

    result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
    for _, node := range allNodes {
        // Pending nodes haven't been approved yet — no intent to apply.
        if node.Status == StatusPending {
            continue
        }
        // Backend lifecycle ops only make sense on backend-type workers.
        // Agent workers don't subscribe to backend.install/delete/list, so
        // enqueueing for them guarantees a forever-retrying row that the
        // reconciler can never drain. Silently skip — they aren't consumers.
        if node.NodeType != "" && node.NodeType != NodeTypeBackend {
            continue
        }
        if targetNodeIDs != nil && !targetNodeIDs[node.ID] {
            continue
        }
        if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
            xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
            result.Nodes = append(result.Nodes, NodeOpStatus{
                NodeID: node.ID, NodeName: node.Name, Status: "error",
                Error: fmt.Sprintf("enqueue failed: %v", err),
            })
            continue
        }

        if node.Status != StatusHealthy {
            // Intent is recorded; reconciler will retry when the node recovers.
            result.Nodes = append(result.Nodes, NodeOpStatus{
                NodeID: node.ID, NodeName: node.Name, Status: "queued",
                Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
            })
            continue
        }

        applyErr := apply(node)
        if applyErr == nil {
            // Find the row we just upserted and delete it; cheap but requires
            // a lookup since UpsertPendingBackendOp doesn't return the ID.
            if err := d.deletePendingRow(ctx, node.ID, backend, op); err != nil {
                xlog.Debug("Failed to clear pending backend op after success", "error", err)
            }
            result.Nodes = append(result.Nodes, NodeOpStatus{
                NodeID: node.ID, NodeName: node.Name, Status: "success",
            })
            continue
        }

        // Record failure for backoff. If it's an ErrNoResponders, the node's
        // gone AWOL — mark unhealthy so the router stops picking it too.
        errMsg := applyErr.Error()
        if errors.Is(applyErr, nats.ErrNoResponders) {
            xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
            d.registry.MarkUnhealthy(ctx, node.ID)
        }
        if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
            _ = d.registry.RecordPendingBackendOpFailure(ctx, id, errMsg)
        }
        result.Nodes = append(result.Nodes, NodeOpStatus{
            NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
        })
    }
    return result, nil
}
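
// On a mixed cluster a single call can therefore produce all three statuses
// at once. Illustrative values only — the node names and error strings below
// are made up:
//
//	BackendOpResult{Nodes: []NodeOpStatus{
//		{NodeID: "a", NodeName: "gpu-1", Status: "success"},
//		{NodeID: "b", NodeName: "mac-mini", Status: "queued", Error: "node offline, will retry when healthy"},
//		{NodeID: "c", NodeName: "edge-box", Status: "error", Error: "install failed: ..."},
//	}}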

// findPendingRow looks up the ID of a pending_backend_ops row by its
// composite key. Used to hand off to RecordPendingBackendOpFailure /
// DeletePendingBackendOp after UpsertPendingBackendOp upserts by the same
// composite key.
func (d *DistributedBackendManager) findPendingRow(ctx context.Context, nodeID, backend, op string) (uint, error) {
    var row PendingBackendOp
    if err := d.registry.db.WithContext(ctx).
        Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
        First(&row).Error; err != nil {
        return 0, err
    }
    return row.ID, nil
}

// deletePendingRow removes the queue row keyed by (nodeID, backend, op).
func (d *DistributedBackendManager) deletePendingRow(ctx context.Context, nodeID, backend, op string) error {
    return d.registry.db.WithContext(ctx).
        Where("node_id = ? AND backend = ? AND op = ?", nodeID, backend, op).
        Delete(&PendingBackendOp{}).Error
}

// DeleteBackend fans out backend deletion to every known node. The previous
// implementation silently skipped non-healthy nodes, which meant zombies
// reappeared once those nodes returned. Now the intent is durable — see
// enqueueAndDrainBackendOp — and the reconciler catches up later.
func (d *DistributedBackendManager) DeleteBackend(name string) error {
    // Local delete first (the frontend rarely has backends installed in
    // distributed mode, but the gallery operation still expects it; ignore
    // "not found", which is the common case).
    if err := d.local.DeleteBackend(name); err != nil {
        if !errors.Is(err, gallery.ErrBackendNotFound) {
            return err
        }
        xlog.Debug("Backend not found locally, will attempt deletion on workers", "backend", name)
    }

    ctx := context.Background()
    result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
        reply, err := d.adapter.DeleteBackend(node.ID, name)
        if err != nil {
            return err
        }
        if !reply.Success {
            return fmt.Errorf("delete failed: %s", reply.Error)
        }
        return nil
    })
    if err != nil {
        return err
    }
    return result.Err()
}

// DeleteBackendDetailed is the per-node-result variant called by the HTTP
// handler so the UI can render a per-node status drawer. DeleteBackend still
// returns error-only for callers that don't care about the node breakdown.
func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, name string) (BackendOpResult, error) {
    if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
        return BackendOpResult{}, err
    }
    return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
        reply, err := d.adapter.DeleteBackend(node.ID, name)
        if err != nil {
            return err
        }
        if !reply.Success {
            return fmt.Errorf("delete failed: %s", reply.Error)
        }
        return nil
    })
}

// ListBackends aggregates installed backends from all worker nodes, preserving
// per-node attribution. Each SystemBackend.Nodes entry records which node has
// the backend and the version/digest it reports. The top-level Metadata is
// populated from the first node seen so single-node-minded callers still work.
//
// Pending/offline/draining nodes are skipped because they aren't expected to
// answer NATS requests; unhealthy nodes are still queried — ErrNoResponders
// then marks them unhealthy and the loop continues.
func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, error) {
    result := make(gallery.SystemBackends)
    allNodes, err := d.registry.List(context.Background())
    if err != nil {
        return result, err
    }

    for _, node := range allNodes {
        if node.Status == StatusPending || node.Status == StatusOffline || node.Status == StatusDraining {
            continue
        }
        reply, err := d.adapter.ListBackends(node.ID)
        if err != nil {
            if errors.Is(err, nats.ErrNoResponders) {
                xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
                d.registry.MarkUnhealthy(context.Background(), node.ID)
                continue
            }
            xlog.Warn("Failed to list backends on worker", "node", node.Name, "error", err)
            continue
        }
        if reply.Error != "" {
            xlog.Warn("Worker returned error listing backends", "node", node.Name, "error", reply.Error)
            continue
        }
        for _, b := range reply.Backends {
            ref := gallery.NodeBackendRef{
                NodeID:      node.ID,
                NodeName:    node.Name,
                NodeStatus:  node.Status,
                Version:     b.Version,
                Digest:      b.Digest,
                URI:         b.URI,
                InstalledAt: b.InstalledAt,
            }
            entry, exists := result[b.Name]
            if !exists {
                entry = gallery.SystemBackend{
                    Name:     b.Name,
                    IsSystem: b.IsSystem,
                    IsMeta:   b.IsMeta,
                    Metadata: &gallery.BackendMetadata{
                        Name:        b.Name,
                        InstalledAt: b.InstalledAt,
                        GalleryURL:  b.GalleryURL,
                        Version:     b.Version,
                        URI:         b.URI,
                        Digest:      b.Digest,
                    },
                }
            }
            entry.Nodes = append(entry.Nodes, ref)
            result[b.Name] = entry
        }
    }
    return result, nil
}
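
// For two workers that both report the same backend, the aggregation looks
// roughly like this (illustrative names and versions; most fields elided):
//
//	result["llama-cpp"] = gallery.SystemBackend{
//		Name:     "llama-cpp",
//		Metadata: &gallery.BackendMetadata{Version: "v1.2"}, // from the first node seen
//		Nodes: []gallery.NodeBackendRef{
//			{NodeID: "a", NodeName: "gpu-1", Version: "v1.2"},
//			{NodeID: "b", NodeName: "gpu-2", Version: "v1.1"}, // version drift is visible per node
//		},
//	}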

// InstallBackend fans out installation through the pending-ops queue so
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success == false is treated as an error so the row stays for retry.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
    galleriesJSON, _ := json.Marshal(op.Galleries)
    backendName := op.GalleryElementName

    result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error {
        // Admin-driven backend install: not tied to a specific replica slot.
        // Pass replica 0 — the worker's processKey is "backend#0" when no
        // modelID is supplied, matching pre-PR4 behavior.
        reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
        if err != nil {
            return err
        }
        if !reply.Success {
            return fmt.Errorf("install failed: %s", reply.Error)
        }
        return nil
    })
    if err != nil {
        return err
    }
    return result.Err()
}

// UpgradeBackend reuses the install NATS subject (the worker re-downloads
// from the gallery). Unlike Install/Delete, upgrade only targets the nodes
// that already report this backend as installed — fanning out to every node
// would ask workers to "upgrade" something they never had, which fails at
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a
// linux-only backend) and leaves a forever-retrying pending_backend_ops row.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
    galleriesJSON, _ := json.Marshal(d.backendGalleries)

    installed, err := d.ListBackends()
    if err != nil {
        return fmt.Errorf("failed to list cluster backends: %w", err)
    }
    entry, ok := installed[name]
    if !ok || len(entry.Nodes) == 0 {
        return fmt.Errorf("backend %q is not installed on any node", name)
    }
    targetNodeIDs := make(map[string]bool, len(entry.Nodes))
    for _, n := range entry.Nodes {
        targetNodeIDs[n.NodeID] = true
    }

    result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
        reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0)
        if err != nil {
            return err
        }
        if !reply.Success {
            return fmt.Errorf("upgrade failed: %s", reply.Error)
        }
        return nil
    })
    if err != nil {
        return err
    }
    return result.Err()
}
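
// The Ginkgo coverage mentioned in the commit message pins down both sides
// of this behavior. A sketch of the two specs (the spec wording and the
// fakeAdapter helper are assumptions, not the actual test code):
//
//	It("upgrades only on nodes that report the backend installed", func() {
//		// nodes A and B report the backend; node C does not
//		Expect(mgr.UpgradeBackend(ctx, "some-backend", nil)).To(Succeed())
//		Expect(fakeAdapter.InstallRequests()).To(ConsistOf("A", "B")) // C is never asked
//	})
//
//	It("returns an error when no node has the backend", func() {
//		err := mgr.UpgradeBackend(ctx, "ghost-backend", nil)
//		Expect(err).To(MatchError(ContainSubstring("not installed on any node")))
//		Expect(fakeAdapter.InstallRequests()).To(BeEmpty()) // no NATS install request is ever sent
//	})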

// IsDistributed reports that installs from this manager fan out across the
// cluster. The HTTP layer reads this to gate hardware-specific installs on
// /api/backends/apply (which would otherwise silently land on every node).
func (d *DistributedBackendManager) IsDistributed() bool { return true }

// CheckUpgrades checks for available backend upgrades across the cluster.
//
// The previous implementation delegated to d.local, which called
// ListSystemBackends on the frontend — but in distributed mode the frontend
// has no backends installed locally, so the upgrade loop never ran and the UI
// never surfaced any upgrades. We now feed the cluster-wide aggregation
// (including per-node versions/digests) into gallery.CheckUpgradesAgainst so
// digest-based detection actually works and cluster drift is visible.
func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[string]gallery.UpgradeInfo, error) {
    installed, err := d.ListBackends()
    if err != nil {
        return nil, err
    }
    // systemState is used by AvailableBackends (gallery paths + meta-backend
    // resolution). The `installed` argument is what the old code got wrong —
    // it used to come from the empty frontend filesystem.
    return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
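
// A periodic caller might drive the upgrade flow from this map (sketch; the
// polling loop and any UI wiring around it are left out):
//
//	upgrades, err := mgr.CheckUpgrades(ctx)
//	if err != nil {
//		return err
//	}
//	for name := range upgrades {
//		// e.g. surface name in the UI, or: _ = mgr.UpgradeBackend(ctx, name, nil)
//	}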