Files
LocalAI/core/services/nodes/unloader.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

162 lines
6.5 KiB
Go

package nodes
import (
"context"
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/xlog"
)
// backendStopRequest is the request payload for backend.stop (fire-and-forget).
// It carries a single string, serialized under the JSON key "backend".
//
// NOTE(review): nothing in the visible part of this file references this type;
// StopBackend builds an anonymous struct with the same field and tag instead.
// Confirm whether the type is used elsewhere in the package.
type backendStopRequest struct {
// Backend is the value sent under the "backend" key.
Backend string `json:"backend"`
}
// NodeCommandSender abstracts NATS-based commands to worker nodes.
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
//
// Every method addresses a single worker node, identified by nodeID.
type NodeCommandSender interface {
// InstallBackend asks the node to install a backend and returns the node's reply.
InstallBackend(nodeID, backendType, modelID, galleriesJSON string) (*messaging.BackendInstallReply, error)
// DeleteBackend asks the node to delete the named backend and returns the node's reply.
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
// ListBackends queries the node for its installed backends.
ListBackends(nodeID string) (*messaging.BackendListReply, error)
// StopBackend tells the node to stop a backend; no reply payload is returned.
StopBackend(nodeID, backend string) error
// UnloadModelOnNode asks the node to unload the named model.
UnloadModelOnNode(nodeID, modelName string) error
}
// RemoteUnloaderAdapter implements NodeCommandSender and model.RemoteModelUnloader
// by publishing NATS events for backend process lifecycle. The worker process
// subscribes and handles the actual process start/stop.
//
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
// registry is queried for the nodes hosting a model and updated when a
// model is removed from a node.
registry ModelLocator
// nats is the messaging client used for both fire-and-forget publishes
// and request-reply calls to worker nodes.
nats messaging.MessagingClient
}
// NewRemoteUnloaderAdapter builds a RemoteUnloaderAdapter that resolves model
// locations through registry and talks to worker nodes through nats.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
	adapter := new(RemoteUnloaderAdapter)
	adapter.registry = registry
	adapter.nats = nats
	return adapter
}
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.
// This is called by ModelLoader.deleteProcess() when process == nil (remote model).
//
// The operation is best-effort and always returns nil:
//   - a registry lookup failure is logged as a warning (it is no longer
//     reported as "no nodes found") and the unload is skipped;
//   - a node whose backend.stop publish fails is logged and skipped, and its
//     registry entry is left in place;
//   - for every node that was notified, the model is removed from the registry.
func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
	ctx := context.Background()

	nodes, err := a.registry.FindNodesWithModel(ctx, modelName)
	if err != nil {
		// Keep the best-effort contract, but surface the real cause instead of
		// hiding it behind the "no nodes" debug message.
		xlog.Warn("Failed to look up remote nodes with model", "model", modelName, "error", err)
		return nil
	}
	if len(nodes) == 0 {
		xlog.Debug("No remote nodes found with model", "model", modelName)
		return nil
	}

	for _, node := range nodes {
		xlog.Info("Sending NATS backend.stop to node", "model", modelName, "node", node.Name, "nodeID", node.ID)
		if err := a.StopBackend(node.ID, modelName); err != nil {
			xlog.Warn("Failed to send backend.stop", "node", node.Name, "error", err)
			continue
		}
		// Remove model from registry — the node will handle the actual cleanup
		a.registry.RemoveNodeModel(ctx, node.ID, modelName)
	}
	return nil
}
// InstallBackend issues a backend.install request to the worker node nodeID
// and waits for its reply.
// Per the protocol, the worker installs the backend from the gallery (if it is
// not already installed), starts the gRPC process, and replies when ready.
// The request times out after 5 minutes, since a gallery install can be slow.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON string) (*messaging.BackendInstallReply, error) {
	const installTimeout = 5 * time.Minute

	subject := messaging.SubjectNodeBackendInstall(nodeID)
	xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID)

	req := messaging.BackendInstallRequest{
		Backend:          backendType,
		ModelID:          modelID,
		BackendGalleries: galleriesJSON,
	}
	return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](
		a.nats, subject, req, installTimeout,
	)
}
// ListBackends asks the worker node nodeID for its installed backends using a
// NATS request-reply call with a 30 second timeout.
func (a *RemoteUnloaderAdapter) ListBackends(nodeID string) (*messaging.BackendListReply, error) {
	const listTimeout = 30 * time.Second

	subject := messaging.SubjectNodeBackendList(nodeID)
	xlog.Debug("Sending NATS backend.list", "nodeID", nodeID)

	req := messaging.BackendListRequest{}
	return messaging.RequestJSON[messaging.BackendListRequest, messaging.BackendListReply](
		a.nats, subject, req, listTimeout,
	)
}
// StopBackend publishes a fire-and-forget backend.stop message to the worker
// node nodeID, asking it to stop a specific gRPC backend process.
// When backend is empty a nil payload is published, which the worker
// interprets as "stop ALL backends".
// The node stays registered and can receive another InstallBackend later.
func (a *RemoteUnloaderAdapter) StopBackend(nodeID, backend string) error {
	subject := messaging.SubjectNodeBackendStop(nodeID)

	if backend != "" {
		// Named backend: send it under the "backend" JSON key.
		payload := struct {
			Backend string `json:"backend"`
		}{Backend: backend}
		return a.nats.Publish(subject, payload)
	}

	// No backend given: publish without a payload.
	return a.nats.Publish(subject, nil)
}
// DeleteBackend issues a backend.delete request to the worker node nodeID,
// asking it to delete the backend (stop + remove files), and waits up to
// 2 minutes for the reply.
func (a *RemoteUnloaderAdapter) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) {
	const deleteTimeout = 2 * time.Minute

	subject := messaging.SubjectNodeBackendDelete(nodeID)
	xlog.Info("Sending NATS backend.delete", "nodeID", nodeID, "backend", backendName)

	req := messaging.BackendDeleteRequest{Backend: backendName}
	return messaging.RequestJSON[messaging.BackendDeleteRequest, messaging.BackendDeleteReply](
		a.nats, subject, req, deleteTimeout,
	)
}
// UnloadModelOnNode issues a model.unload request to the worker node nodeID
// and waits up to 30 seconds for the reply.
// The worker calls gRPC Free() to release GPU memory.
//
// It returns the transport error if the request itself fails, or an error
// built from the reply's Error field when the worker reports failure.
func (a *RemoteUnloaderAdapter) UnloadModelOnNode(nodeID, modelName string) error {
	subject := messaging.SubjectNodeModelUnload(nodeID)
	xlog.Info("Sending NATS model.unload", "nodeID", nodeID, "model", modelName)

	req := messaging.ModelUnloadRequest{ModelName: modelName}
	reply, err := messaging.RequestJSON[messaging.ModelUnloadRequest, messaging.ModelUnloadReply](
		a.nats, subject, req, 30*time.Second,
	)

	switch {
	case err != nil:
		// The request did not complete; reply must not be inspected.
		return err
	case !reply.Success:
		return fmt.Errorf("model.unload on node %s: %s", nodeID, reply.Error)
	default:
		return nil
	}
}
// DeleteModelFiles sends model.delete to all nodes that have the model cached.
// This removes model files from worker disks.
//
// The operation is best-effort and always returns nil:
//   - a registry lookup failure is logged as a warning (it is no longer
//     reported as "no nodes with model") and nothing is sent;
//   - a failed request, or a reply with Success == false, is logged for that
//     node and the remaining nodes are still processed.
//
// Each request waits up to 30 seconds for the node's reply.
func (a *RemoteUnloaderAdapter) DeleteModelFiles(modelName string) error {
	nodes, err := a.registry.FindNodesWithModel(context.Background(), modelName)
	if err != nil {
		// Keep the best-effort contract, but surface the real cause instead of
		// hiding it behind the "no nodes" debug message.
		xlog.Warn("Failed to look up nodes with model for file deletion", "model", modelName, "error", err)
		return nil
	}
	if len(nodes) == 0 {
		xlog.Debug("No nodes with model for file deletion", "model", modelName)
		return nil
	}

	req := messaging.ModelDeleteRequest{ModelName: modelName}
	for _, node := range nodes {
		subject := messaging.SubjectNodeModelDelete(node.ID)
		xlog.Info("Sending NATS model.delete", "nodeID", node.ID, "model", modelName)

		reply, err := messaging.RequestJSON[messaging.ModelDeleteRequest, messaging.ModelDeleteReply](
			a.nats, subject, req, 30*time.Second,
		)
		if err != nil {
			xlog.Warn("model.delete failed on node", "node", node.Name, "error", err)
			continue
		}
		if !reply.Success {
			xlog.Warn("model.delete failed on node", "node", node.Name, "error", reply.Error)
		}
	}
	return nil
}
// StopNode publishes a payload-less message on the node's stop subject,
// telling the worker node to shut down entirely (deregister + exit).
// It returns whatever error the publish reports.
func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
	return a.nats.Publish(messaging.SubjectNodeStop(nodeID), nil)
}