Files
LocalAI/core/services/nodes/interfaces.go
LocalAI [bot] 858257eaf0 fix(distributed): self-heal stale 'model not loaded' routing (#10181)
* fix(distributed): self-heal stale 'model not loaded' routing

In distributed mode the registry can list a model as loaded on a node
while the worker has evicted it (autonomous LRU eviction, an out-of-band
unload, etc.) yet the backend process survives. The router's cached-node
check only verifies the process is alive (probeHealth), so it routes there
and inference fails with "<backend>: model not loaded" — and stays broken
until the controller restarts and rebuilds its registry.

InFlightTrackingClient now reconciles this: when a tracked inference call
returns a model-not-loaded error, it drops the stale replica row
(RemoveNodeModel) so the next request reloads the model on a healthy node
instead of routing back to the evicted one. The original error is returned
unchanged; only the registry is corrected.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(distributed): typed model-not-loaded error via gRPC status code

Replace the controller-side error-string match with a shared, code-aware
helper. Go error types don't survive the gRPC boundary, so the signal is
carried as a status code (FailedPrecondition):

- pkg/grpc/grpcerrors: ModelNotLoaded(backend) constructor +
  IsModelNotLoaded(err) checker (status-code first, message fallback for
  backends not yet migrated).
- InFlightTrackingClient.reconcile now uses grpcerrors.IsModelNotLoaded.
- Migrate the Go backends that emit this error (parakeet-cpp, cloud-proxy,
  rfdetr-cpp) to the typed constructor.

Acting on a false positive is harmless (the model is just reloaded).

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-05 09:01:36 +02:00

121 lines
6.5 KiB
Go

package nodes
import (
"context"
"time"
grpc "github.com/mudler/LocalAI/pkg/grpc"
)
// ModelRouter is the dependency SmartRouter relies on for routing decisions
// and model lifecycle management. Methods are grouped below by the concern
// their names and signatures indicate; precise semantics are defined by the
// implementation.
type ModelRouter interface {
	// Locating nodes and replicas for a model.
	FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string, pref *RoutePreference) (*BackendNode, *NodeModel, error)
	FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error)
	FindNodesWithFreeSlot(ctx context.Context, modelName string, candidateNodeIDs []string) ([]BackendNode, error)
	LoadedReplicaStats(ctx context.Context, modelName string, candidateNodeIDs []string) ([]ReplicaCandidate, error)

	// In-flight counters, addressed by (nodeID, modelName, replicaIndex).
	IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
	DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error

	// Replica rows for a model on a node.
	SetNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int, state, address string, initialInFlight int) error
	TouchNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int)
	RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
	RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
	NextFreeReplicaIndex(ctx context.Context, nodeID, modelName string, maxSlots int) (int, error)
	CountReplicasOnNode(ctx context.Context, nodeID, modelName string) (int, error)

	// Load info: a backend type plus an options blob carried as bytes.
	SetNodeModelLoadInfo(ctx context.Context, nodeID, modelName string, replicaIndex int, backendType string, optsBlob []byte) error
	UpsertModelLoadInfo(ctx context.Context, modelName, backendType string, optsBlob []byte) error
	GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error)

	// Per-model scheduling configuration.
	GetModelScheduling(ctx context.Context, modelName string) (*ModelSchedulingConfig, error)

	// Node lookup by ID, labels, or selector.
	Get(ctx context.Context, nodeID string) (*BackendNode, error)
	GetNodeLabels(ctx context.Context, nodeID string) ([]NodeLabel, error)
	FindNodesBySelector(ctx context.Context, selector map[string]string) ([]BackendNode, error)

	// Node selection with no candidate set in the signature.
	FindNodeWithVRAM(ctx context.Context, minBytes uint64) (*BackendNode, error)
	FindIdleNode(ctx context.Context) (*BackendNode, error)
	FindLeastLoadedNode(ctx context.Context) (*BackendNode, error)

	// Node selection variants that take an explicit nodeIDs set.
	FindNodeWithVRAMFromSet(ctx context.Context, minBytes uint64, nodeIDs []string) (*BackendNode, error)
	FindIdleNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
	FindLeastLoadedNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)

	// VRAM reservation accounting for a node.
	ReserveVRAM(ctx context.Context, nodeID string, bytes uint64) error
	ReleaseVRAM(ctx context.Context, nodeID string, bytes uint64) error

	// LRU model lookups, global and per node.
	FindGlobalLRUModelWithZeroInFlight(ctx context.Context) (*NodeModel, error)
	FindLRUModel(ctx context.Context, nodeID string) (*NodeModel, error)
}
// ConcurrencyConflictResolver returns the names of configured models that
// share at least one concurrency group with the given model. It is satisfied
// by *config.ModelConfigLoader and lets the SmartRouter make group-aware
// placement decisions without importing the config package's full surface.
type ConcurrencyConflictResolver interface {
	// GetModelsConflictingWith returns the names of the configured models that
	// share at least one concurrency group with modelName.
	// NOTE(review): whether modelName itself can appear in the result, and
	// whether "no conflicts" is nil or an empty slice, is not visible from
	// this file — confirm against the implementation.
	GetModelsConflictingWith(modelName string) []string
}
// NodeHealthStore is the storage surface HealthMonitor uses to manage node
// status.
type NodeHealthStore interface {
	// Enumerating nodes and the models recorded for a node.
	List(ctx context.Context) ([]BackendNode, error)
	FindStaleNodes(ctx context.Context, threshold time.Duration) ([]BackendNode, error)
	GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)

	// Heartbeats and status transitions for a single node.
	Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
	MarkHealthy(ctx context.Context, nodeID string) error
	MarkUnhealthy(ctx context.Context, nodeID string) error
	MarkOffline(ctx context.Context, nodeID string) error

	// Dropping one replica row, addressed by (nodeID, modelName, replicaIndex).
	RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
}
// ModelLocator is used by RemoteUnloaderAdapter for model discovery.
type ModelLocator interface {
	// FindNodesWithModel returns the nodes associated with modelName.
	// NOTE(review): presumably these are the nodes where the registry lists
	// the model as loaded — confirm against the implementation.
	FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error)
	// RemoveNodeModel drops the single replica row addressed by
	// (nodeID, modelName, replicaIndex).
	RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
	// RemoveAllNodeModelReplicas takes no replica index; by its name it drops
	// every replica row of modelName on nodeID — confirm against the
	// implementation.
	RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
}
// ModelLookup is used by DistributedModelStore for model existence queries.
type ModelLookup interface {
	// FindNodeForModel looks up a node for modelName.
	// NOTE(review): the bool presumably reports whether a node was found;
	// there is no error return, so lookup failures are not distinguishable
	// from "not found" at this interface — confirm against the implementation.
	FindNodeForModel(ctx context.Context, modelName string) (*BackendNode, bool)
	// ListAllLoadedModels returns NodeModel entries; by its name, one per
	// loaded model replica across nodes — confirm against the implementation.
	ListAllLoadedModels(ctx context.Context) ([]NodeModel, error)
	// Get returns the node identified by nodeID.
	Get(ctx context.Context, nodeID string) (*BackendNode, error)
}
// InFlightTracker is used by InFlightTrackingClient for request counting.
type InFlightTracker interface {
	// IncrementInFlight bumps the in-flight request count for the replica
	// addressed by (nodeID, modelName, replicaIndex).
	IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
	// DecrementInFlight is the counterpart of IncrementInFlight for the same
	// (nodeID, modelName, replicaIndex) replica.
	DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
	// RemoveNodeModel drops a stale replica row so the next request reloads the
	// model instead of routing back to a node where it is no longer loaded.
	// InFlightTrackingClient invokes it when a tracked inference call returns
	// a model-not-loaded error; the original error is still returned to the
	// caller.
	RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
}
// NodeManager is the surface the HTTP endpoints use for node registration and
// lifecycle handling.
type NodeManager interface {
	// Registration, approval, and removal of nodes.
	Register(ctx context.Context, node *BackendNode, autoApprove bool) error
	ApproveNode(ctx context.Context, nodeID string) error
	Deregister(ctx context.Context, nodeID string) error
	UpdateAuthRefs(ctx context.Context, nodeID, authUserID, apiKeyID string) error

	// Node lookup by ID, by name, or as a full listing.
	Get(ctx context.Context, nodeID string) (*BackendNode, error)
	GetByName(ctx context.Context, name string) (*BackendNode, error)
	List(ctx context.Context) ([]BackendNode, error)

	// Heartbeats and status transitions for a single node.
	Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
	MarkHealthy(ctx context.Context, nodeID string) error
	MarkDraining(ctx context.Context, nodeID string) error
	MarkOffline(ctx context.Context, nodeID string) error

	// Models recorded for a node, and removal of their replica rows.
	GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
	RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
	RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
}
// BackendClientFactory creates gRPC backend clients.
type BackendClientFactory interface {
	// NewClient returns a grpc.Backend for the backend at address. The default
	// implementation in this file forwards parallel unchanged to the pkg/grpc
	// client constructors; its meaning is defined there.
	NewClient(address string, parallel bool) grpc.Backend
}
// tokenClientFactory is the default BackendClientFactory that creates gRPC
// clients with an optional bearer token for distributed auth.
type tokenClientFactory struct {
	// token is the bearer token handed to grpc.NewClientWithToken. When it is
	// empty, NewClient builds a client without a token instead.
	token string
}
// NewClient builds a gRPC backend client for address. With no token
// configured it uses the plain grpc.NewClient constructor; otherwise the token
// is passed along through grpc.NewClientWithToken. The trailing nil and false
// arguments are forwarded as-is in both cases; their meaning is defined by
// pkg/grpc.
func (f *tokenClientFactory) NewClient(address string, parallel bool) grpc.Backend {
	// Guard clause: the token-less path needs no extra argument.
	if f.token == "" {
		return grpc.NewClient(address, parallel, nil, false)
	}
	return grpc.NewClientWithToken(address, parallel, nil, false, f.token)
}