mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-17 13:10:23 -04:00
* fix(distributed): honor NodeSelector in cached-replica lookup, stop empty-backend reconciler scaleups
Two distinct bugs were causing tight retry loops in the distributed scheduler:
1. FindAndLockNodeWithModel ignored the model's NodeSelector. When a model
was loaded on multiple nodes and only some matched the current selector,
the function returned the lowest-in_flight node — even one the selector
excluded. Route()'s post-check then fell through to scheduleNewModel,
which targeted the matching node where the model was already at
MaxReplicasPerModel capacity. Eviction couldn't help (the only loaded
model on that node was the one being requested, and it was busy), so
every request looped through "evicting LRU" → "all models busy".
Fix: thread an optional candidateNodeIDs filter through
FindAndLockNodeWithModel. Route() resolves the selector once via a new
resolveSelectorCandidates helper and passes the matching IDs to both
the cached-replica lookup and scheduleNewModel. The same helper
replaces the inline selector block in scheduleNewModel.
2. ScheduleAndLoadModel (reconciler scale-up path) fell back to
scheduleNewModel with backendType="" when no replica had ever been
loaded for a model. The worker rejected the resulting backend.install
("backend name is empty") on every reconciler tick (~30s).
Fix: remove the broken fallback. When GetModelLoadInfo has nothing
stored, return a clear error instead of firing a doomed NATS install.
The reconciler's existing scale-up failure log surfaces it once per
tick; the model auto-replicates as soon as Route() serves it once and
stores load info.
Also downgrade the post-LoadModel-failure StopGRPC error to Debug — that
cleanup attempt usually hits "model not found" because LoadModel failed
before registering the process, and the outer "Failed to load model"
error already carries the real reason.
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: claude-code:claude-opus-4-7 [Read] [Edit] [Bash]
* test(distributed): cover selector-aware FindAndLockNodeWithModel and reconciler scaleup guard
Two regression tests for the bugs fixed in the previous commit:
1. FindAndLockNodeWithModel — registry-level integration tests verify the
candidateNodeIDs filter:
- Returns the included node even when an excluded node has lower
in_flight (the original selector-mismatch loop scenario).
- Returns not-found when the model is loaded only on excluded nodes,
forcing Route() to fall through to a fresh schedule instead of
reusing the excluded replica.
2. ScheduleAndLoadModel — mock-based test verifies the reconciler scale-up
path returns an error and does NOT fire backend.install when no replica
has been loaded yet. fakeUnloader gains an installCalls slice so this
negative assertion is direct.
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: claude-code:claude-opus-4-7 [Read] [Edit] [Bash]
---------
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
107 lines
5.6 KiB
Go
107 lines
5.6 KiB
Go
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
grpc "github.com/mudler/LocalAI/pkg/grpc"
|
|
)
|
|
|
|
// ModelRouter is used by SmartRouter for routing decisions and model lifecycle.
|
|
type ModelRouter interface {
|
|
FindAndLockNodeWithModel(ctx context.Context, modelName string, candidateNodeIDs []string) (*BackendNode, *NodeModel, error)
|
|
DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
|
|
IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
|
|
RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
|
|
TouchNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int)
|
|
SetNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int, state, address string, initialInFlight int) error
|
|
SetNodeModelLoadInfo(ctx context.Context, nodeID, modelName string, replicaIndex int, backendType string, optsBlob []byte) error
|
|
GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error)
|
|
NextFreeReplicaIndex(ctx context.Context, nodeID, modelName string, maxSlots int) (int, error)
|
|
CountReplicasOnNode(ctx context.Context, nodeID, modelName string) (int, error)
|
|
FindNodeWithVRAM(ctx context.Context, minBytes uint64) (*BackendNode, error)
|
|
FindIdleNode(ctx context.Context) (*BackendNode, error)
|
|
FindLeastLoadedNode(ctx context.Context) (*BackendNode, error)
|
|
FindGlobalLRUModelWithZeroInFlight(ctx context.Context) (*NodeModel, error)
|
|
FindLRUModel(ctx context.Context, nodeID string) (*NodeModel, error)
|
|
Get(ctx context.Context, nodeID string) (*BackendNode, error)
|
|
GetModelScheduling(ctx context.Context, modelName string) (*ModelSchedulingConfig, error)
|
|
FindNodesBySelector(ctx context.Context, selector map[string]string) ([]BackendNode, error)
|
|
FindNodesWithFreeSlot(ctx context.Context, modelName string, candidateNodeIDs []string) ([]BackendNode, error)
|
|
ReserveVRAM(ctx context.Context, nodeID string, bytes uint64) error
|
|
ReleaseVRAM(ctx context.Context, nodeID string, bytes uint64) error
|
|
FindNodeWithVRAMFromSet(ctx context.Context, minBytes uint64, nodeIDs []string) (*BackendNode, error)
|
|
FindIdleNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
|
|
FindLeastLoadedNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error)
|
|
GetNodeLabels(ctx context.Context, nodeID string) ([]NodeLabel, error)
|
|
}
|
|
|
|
// NodeHealthStore is used by HealthMonitor for node status management.
|
|
type NodeHealthStore interface {
|
|
List(ctx context.Context) ([]BackendNode, error)
|
|
GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
|
|
MarkOffline(ctx context.Context, nodeID string) error
|
|
MarkUnhealthy(ctx context.Context, nodeID string) error
|
|
MarkHealthy(ctx context.Context, nodeID string) error
|
|
Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
|
|
FindStaleNodes(ctx context.Context, threshold time.Duration) ([]BackendNode, error)
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
|
|
}
|
|
|
|
// ModelLocator is used by RemoteUnloaderAdapter for model discovery.
|
|
type ModelLocator interface {
|
|
FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error)
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
|
|
RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
|
|
}
|
|
|
|
// ModelLookup is used by DistributedModelStore for model existence queries.
|
|
type ModelLookup interface {
|
|
FindNodeForModel(ctx context.Context, modelName string) (*BackendNode, bool)
|
|
ListAllLoadedModels(ctx context.Context) ([]NodeModel, error)
|
|
Get(ctx context.Context, nodeID string) (*BackendNode, error)
|
|
}
|
|
|
|
// InFlightTracker is the request-counting surface consumed by
// InFlightTrackingClient. Both methods address a replica by node ID, model
// name and replica index.
type InFlightTracker interface {
	// IncrementInFlight bumps the in-flight count for the addressed replica.
	IncrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error

	// DecrementInFlight lowers the in-flight count for the addressed replica.
	DecrementInFlight(ctx context.Context, nodeID, modelName string, replicaIndex int) error
}
|
|
|
|
// NodeManager is used by HTTP endpoints for node registration and lifecycle.
|
|
type NodeManager interface {
|
|
Register(ctx context.Context, node *BackendNode, autoApprove bool) error
|
|
Get(ctx context.Context, nodeID string) (*BackendNode, error)
|
|
GetByName(ctx context.Context, name string) (*BackendNode, error)
|
|
List(ctx context.Context) ([]BackendNode, error)
|
|
Deregister(ctx context.Context, nodeID string) error
|
|
ApproveNode(ctx context.Context, nodeID string) error
|
|
MarkOffline(ctx context.Context, nodeID string) error
|
|
MarkDraining(ctx context.Context, nodeID string) error
|
|
MarkHealthy(ctx context.Context, nodeID string) error
|
|
Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error
|
|
GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error)
|
|
UpdateAuthRefs(ctx context.Context, nodeID, authUserID, apiKeyID string) error
|
|
RemoveNodeModel(ctx context.Context, nodeID, modelName string, replicaIndex int) error
|
|
RemoveAllNodeModelReplicas(ctx context.Context, nodeID, modelName string) error
|
|
}
|
|
|
|
// BackendClientFactory creates gRPC backend clients.
|
|
type BackendClientFactory interface {
|
|
NewClient(address string, parallel bool) grpc.Backend
|
|
}
|
|
|
|
// tokenClientFactory is the default BackendClientFactory. It builds gRPC
// clients and, when configured with one, attaches a bearer token for
// distributed auth.
type tokenClientFactory struct {
	// token is the optional bearer token; the empty string means clients are
	// created without one.
	token string
}
|
|
|
|
func (f *tokenClientFactory) NewClient(address string, parallel bool) grpc.Backend {
|
|
if f.token != "" {
|
|
return grpc.NewClientWithToken(address, parallel, nil, false, f.token)
|
|
}
|
|
return grpc.NewClient(address, parallel, nil, false)
|
|
}
|