mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-30 12:08:13 -04:00
When installing a backend with a custom OCI URI in distributed mode, the URI was captured in ManagementOp.ExternalURI by the HTTP handler but never forwarded to workers. BackendInstallRequest had no URI field, so workers fell through to the gallery lookup and failed with "no backend found with name <custom-name>". Add URI/Name/Alias fields to BackendInstallRequest and thread them from ManagementOp through DistributedBackendManager.InstallBackend() and the RemoteUnloaderAdapter. On the worker side, route to InstallExternalBackend when URI is set instead of InstallBackendFromGallery. Update all remaining InstallBackend call sites (UpgradeBackend, reconciler pending-op drain, router auto-install) to pass empty strings for the new params. Assisted-by: Claude Code:claude-sonnet-4-6 Signed-off-by: Russell Sim <rsl@simopolis.xyz>
165 lines
6.6 KiB
Go
165 lines
6.6 KiB
Go
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/mudler/LocalAI/core/services/messaging"
|
|
"github.com/mudler/xlog"
|
|
)
|
|
|
|
// backendStopRequest is the JSON body of a fire-and-forget backend.stop
// message. It carries a single field, serialized under the key "backend".
type backendStopRequest struct {
	// Backend names the backend the receiver should stop.
	Backend string `json:"backend"`
}
|
|
|
|
// NodeCommandSender abstracts NATS-based commands to worker nodes.
|
|
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
|
|
type NodeCommandSender interface {
|
|
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string) (*messaging.BackendInstallReply, error)
|
|
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
|
|
ListBackends(nodeID string) (*messaging.BackendListReply, error)
|
|
StopBackend(nodeID, backend string) error
|
|
UnloadModelOnNode(nodeID, modelName string) error
|
|
}
|
|
|
|
// RemoteUnloaderAdapter implements NodeCommandSender and model.RemoteModelUnloader
|
|
// by publishing NATS events for backend process lifecycle. The worker process
|
|
// subscribes and handles the actual process start/stop.
|
|
//
|
|
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
|
|
// over NATS for remote nodes.
|
|
type RemoteUnloaderAdapter struct {
|
|
registry ModelLocator
|
|
nats messaging.MessagingClient
|
|
}
|
|
|
|
// NewRemoteUnloaderAdapter creates a new adapter.
|
|
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
|
|
return &RemoteUnloaderAdapter{
|
|
registry: registry,
|
|
nats: nats,
|
|
}
|
|
}
|
|
|
|
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
|
|
// to stop their backend process via NATS backend.stop event.
|
|
// The worker process handles: Free() → kill process.
|
|
// This is called by ModelLoader.deleteProcess() when process == nil (remote model).
|
|
func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
|
|
ctx := context.Background()
|
|
nodes, err := a.registry.FindNodesWithModel(ctx, modelName)
|
|
if err != nil || len(nodes) == 0 {
|
|
xlog.Debug("No remote nodes found with model", "model", modelName)
|
|
return nil
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
xlog.Info("Sending NATS backend.stop to node", "model", modelName, "node", node.Name, "nodeID", node.ID)
|
|
if err := a.StopBackend(node.ID, modelName); err != nil {
|
|
xlog.Warn("Failed to send backend.stop", "node", node.Name, "error", err)
|
|
continue
|
|
}
|
|
// Remove model from registry — the node will handle the actual cleanup
|
|
a.registry.RemoveNodeModel(ctx, node.ID, modelName)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InstallBackend sends a backend.install request-reply to a worker node.
|
|
// The worker installs the backend from gallery (if not already installed),
|
|
// starts the gRPC process, and replies when ready.
|
|
// Timeout: 5 minutes (gallery install can take a while).
|
|
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string) (*messaging.BackendInstallReply, error) {
|
|
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
|
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID)
|
|
|
|
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
|
Backend: backendType,
|
|
ModelID: modelID,
|
|
BackendGalleries: galleriesJSON,
|
|
URI: uri,
|
|
Name: name,
|
|
Alias: alias,
|
|
}, 5*time.Minute)
|
|
}
|
|
|
|
// ListBackends queries a worker node for its installed backends via NATS request-reply.
|
|
func (a *RemoteUnloaderAdapter) ListBackends(nodeID string) (*messaging.BackendListReply, error) {
|
|
subject := messaging.SubjectNodeBackendList(nodeID)
|
|
xlog.Debug("Sending NATS backend.list", "nodeID", nodeID)
|
|
|
|
return messaging.RequestJSON[messaging.BackendListRequest, messaging.BackendListReply](a.nats, subject, messaging.BackendListRequest{}, 30*time.Second)
|
|
}
|
|
|
|
// StopBackend tells a worker node to stop a specific gRPC backend process.
|
|
// If backend is empty, the worker stops ALL backends.
|
|
// The node stays registered and can receive another InstallBackend later.
|
|
func (a *RemoteUnloaderAdapter) StopBackend(nodeID, backend string) error {
|
|
subject := messaging.SubjectNodeBackendStop(nodeID)
|
|
if backend == "" {
|
|
return a.nats.Publish(subject, nil)
|
|
}
|
|
req := struct {
|
|
Backend string `json:"backend"`
|
|
}{Backend: backend}
|
|
return a.nats.Publish(subject, req)
|
|
}
|
|
|
|
// DeleteBackend tells a worker node to delete a backend (stop + remove files).
|
|
func (a *RemoteUnloaderAdapter) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) {
|
|
subject := messaging.SubjectNodeBackendDelete(nodeID)
|
|
xlog.Info("Sending NATS backend.delete", "nodeID", nodeID, "backend", backendName)
|
|
|
|
return messaging.RequestJSON[messaging.BackendDeleteRequest, messaging.BackendDeleteReply](a.nats, subject, messaging.BackendDeleteRequest{Backend: backendName}, 2*time.Minute)
|
|
}
|
|
|
|
// UnloadModelOnNode sends a model.unload request to a specific node.
|
|
// The worker calls gRPC Free() to release GPU memory.
|
|
func (a *RemoteUnloaderAdapter) UnloadModelOnNode(nodeID, modelName string) error {
|
|
subject := messaging.SubjectNodeModelUnload(nodeID)
|
|
xlog.Info("Sending NATS model.unload", "nodeID", nodeID, "model", modelName)
|
|
|
|
reply, err := messaging.RequestJSON[messaging.ModelUnloadRequest, messaging.ModelUnloadReply](a.nats, subject, messaging.ModelUnloadRequest{ModelName: modelName}, 30*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !reply.Success {
|
|
return fmt.Errorf("model.unload on node %s: %s", nodeID, reply.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteModelFiles sends model.delete to all nodes that have the model cached.
|
|
// This removes model files from worker disks.
|
|
func (a *RemoteUnloaderAdapter) DeleteModelFiles(modelName string) error {
|
|
nodes, err := a.registry.FindNodesWithModel(context.Background(), modelName)
|
|
if err != nil || len(nodes) == 0 {
|
|
xlog.Debug("No nodes with model for file deletion", "model", modelName)
|
|
return nil
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
subject := messaging.SubjectNodeModelDelete(node.ID)
|
|
xlog.Info("Sending NATS model.delete", "nodeID", node.ID, "model", modelName)
|
|
|
|
reply, err := messaging.RequestJSON[messaging.ModelDeleteRequest, messaging.ModelDeleteReply](a.nats, subject, messaging.ModelDeleteRequest{ModelName: modelName}, 30*time.Second)
|
|
if err != nil {
|
|
xlog.Warn("model.delete failed on node", "node", node.Name, "error", err)
|
|
continue
|
|
}
|
|
if !reply.Success {
|
|
xlog.Warn("model.delete failed on node", "node", node.Name, "error", reply.Error)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StopNode tells a worker node to shut down entirely (deregister + exit).
|
|
func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
|
|
subject := messaging.SubjectNodeStop(nodeID)
|
|
return a.nats.Publish(subject, nil)
|
|
}
|