mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
162 lines
6.5 KiB
Go
162 lines
6.5 KiB
Go
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/mudler/LocalAI/core/services/messaging"
|
|
"github.com/mudler/xlog"
|
|
)
|
|
|
|
// backendStopRequest is the request payload for backend.stop (fire-and-forget).
// It is serialized to JSON and published on the node's backend.stop subject;
// the worker decodes it to learn which backend process to terminate.
type backendStopRequest struct {
	// Backend is the name of the backend whose process should be stopped.
	Backend string `json:"backend"`
}
|
|
|
|
// NodeCommandSender abstracts NATS-based commands to worker nodes.
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
type NodeCommandSender interface {
	// InstallBackend asks a node to install and start a backend, blocking until the node replies.
	InstallBackend(nodeID, backendType, modelID, galleriesJSON string) (*messaging.BackendInstallReply, error)
	// DeleteBackend asks a node to stop a backend and remove its files.
	DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
	// ListBackends queries a node for its installed backends.
	ListBackends(nodeID string) (*messaging.BackendListReply, error)
	// StopBackend stops one backend process on a node; an empty backend stops all of them.
	StopBackend(nodeID, backend string) error
	// UnloadModelOnNode asks a node to free the resources held by a loaded model.
	UnloadModelOnNode(nodeID, modelName string) error
}
|
|
|
|
// RemoteUnloaderAdapter implements NodeCommandSender and model.RemoteModelUnloader
// by publishing NATS events for backend process lifecycle. The worker process
// subscribes and handles the actual process start/stop.
//
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
	// registry locates which nodes currently host a given model.
	registry ModelLocator
	// nats carries both fire-and-forget publishes and request-reply exchanges.
	nats messaging.MessagingClient
}
|
|
|
|
// NewRemoteUnloaderAdapter creates a new adapter.
|
|
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
|
|
return &RemoteUnloaderAdapter{
|
|
registry: registry,
|
|
nats: nats,
|
|
}
|
|
}
|
|
|
|
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
|
|
// to stop their backend process via NATS backend.stop event.
|
|
// The worker process handles: Free() → kill process.
|
|
// This is called by ModelLoader.deleteProcess() when process == nil (remote model).
|
|
func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
|
|
ctx := context.Background()
|
|
nodes, err := a.registry.FindNodesWithModel(ctx, modelName)
|
|
if err != nil || len(nodes) == 0 {
|
|
xlog.Debug("No remote nodes found with model", "model", modelName)
|
|
return nil
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
xlog.Info("Sending NATS backend.stop to node", "model", modelName, "node", node.Name, "nodeID", node.ID)
|
|
if err := a.StopBackend(node.ID, modelName); err != nil {
|
|
xlog.Warn("Failed to send backend.stop", "node", node.Name, "error", err)
|
|
continue
|
|
}
|
|
// Remove model from registry — the node will handle the actual cleanup
|
|
a.registry.RemoveNodeModel(ctx, node.ID, modelName)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InstallBackend sends a backend.install request-reply to a worker node.
|
|
// The worker installs the backend from gallery (if not already installed),
|
|
// starts the gRPC process, and replies when ready.
|
|
// Timeout: 5 minutes (gallery install can take a while).
|
|
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON string) (*messaging.BackendInstallReply, error) {
|
|
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
|
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID)
|
|
|
|
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
|
Backend: backendType,
|
|
ModelID: modelID,
|
|
BackendGalleries: galleriesJSON,
|
|
}, 5*time.Minute)
|
|
}
|
|
|
|
// ListBackends queries a worker node for its installed backends via NATS request-reply.
|
|
func (a *RemoteUnloaderAdapter) ListBackends(nodeID string) (*messaging.BackendListReply, error) {
|
|
subject := messaging.SubjectNodeBackendList(nodeID)
|
|
xlog.Debug("Sending NATS backend.list", "nodeID", nodeID)
|
|
|
|
return messaging.RequestJSON[messaging.BackendListRequest, messaging.BackendListReply](a.nats, subject, messaging.BackendListRequest{}, 30*time.Second)
|
|
}
|
|
|
|
// StopBackend tells a worker node to stop a specific gRPC backend process.
|
|
// If backend is empty, the worker stops ALL backends.
|
|
// The node stays registered and can receive another InstallBackend later.
|
|
func (a *RemoteUnloaderAdapter) StopBackend(nodeID, backend string) error {
|
|
subject := messaging.SubjectNodeBackendStop(nodeID)
|
|
if backend == "" {
|
|
return a.nats.Publish(subject, nil)
|
|
}
|
|
req := struct {
|
|
Backend string `json:"backend"`
|
|
}{Backend: backend}
|
|
return a.nats.Publish(subject, req)
|
|
}
|
|
|
|
// DeleteBackend tells a worker node to delete a backend (stop + remove files).
|
|
func (a *RemoteUnloaderAdapter) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) {
|
|
subject := messaging.SubjectNodeBackendDelete(nodeID)
|
|
xlog.Info("Sending NATS backend.delete", "nodeID", nodeID, "backend", backendName)
|
|
|
|
return messaging.RequestJSON[messaging.BackendDeleteRequest, messaging.BackendDeleteReply](a.nats, subject, messaging.BackendDeleteRequest{Backend: backendName}, 2*time.Minute)
|
|
}
|
|
|
|
// UnloadModelOnNode sends a model.unload request to a specific node.
|
|
// The worker calls gRPC Free() to release GPU memory.
|
|
func (a *RemoteUnloaderAdapter) UnloadModelOnNode(nodeID, modelName string) error {
|
|
subject := messaging.SubjectNodeModelUnload(nodeID)
|
|
xlog.Info("Sending NATS model.unload", "nodeID", nodeID, "model", modelName)
|
|
|
|
reply, err := messaging.RequestJSON[messaging.ModelUnloadRequest, messaging.ModelUnloadReply](a.nats, subject, messaging.ModelUnloadRequest{ModelName: modelName}, 30*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !reply.Success {
|
|
return fmt.Errorf("model.unload on node %s: %s", nodeID, reply.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteModelFiles sends model.delete to all nodes that have the model cached.
|
|
// This removes model files from worker disks.
|
|
func (a *RemoteUnloaderAdapter) DeleteModelFiles(modelName string) error {
|
|
nodes, err := a.registry.FindNodesWithModel(context.Background(), modelName)
|
|
if err != nil || len(nodes) == 0 {
|
|
xlog.Debug("No nodes with model for file deletion", "model", modelName)
|
|
return nil
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
subject := messaging.SubjectNodeModelDelete(node.ID)
|
|
xlog.Info("Sending NATS model.delete", "nodeID", node.ID, "model", modelName)
|
|
|
|
reply, err := messaging.RequestJSON[messaging.ModelDeleteRequest, messaging.ModelDeleteReply](a.nats, subject, messaging.ModelDeleteRequest{ModelName: modelName}, 30*time.Second)
|
|
if err != nil {
|
|
xlog.Warn("model.delete failed on node", "node", node.Name, "error", err)
|
|
continue
|
|
}
|
|
if !reply.Success {
|
|
xlog.Warn("model.delete failed on node", "node", node.Name, "error", reply.Error)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StopNode tells a worker node to shut down entirely (deregister + exit).
|
|
func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
|
|
subject := messaging.SubjectNodeStop(nodeID)
|
|
return a.nats.Publish(subject, nil)
|
|
}
|