mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-01 12:42:55 -04:00
refactor(routing): extract replica picker into pkg/clusterrouting (#10123)
Move ReplicaCandidate and PickBestReplica out of core/services/nodes (which depends on gorm) into a new dependency-light leaf package pkg/clusterrouting, so the p2p federation server can later share the same replica-selection policy without pulling in a database driver. core/services/nodes keeps a type alias and a thin delegator, so every existing reference (the LoadedReplicaStats interface method, the ReplicaCandidate row conversion in registry.go, and the SQL policy-mirror test) compiles and behaves unchanged. This is a pure, behavior-preserving refactor: the full nodes suite, including the policy-mirror spec that pins the SQL ORDER BY to PickBestReplica, stays green.

Assisted-by: Claude Code:claude-opus-4-8
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -1,69 +1,17 @@
|
||||
package nodes
|
||||
|
||||
import "time"
|
||||
import "github.com/mudler/LocalAI/pkg/clusterrouting"
|
||||
|
||||
// ReplicaCandidate is the minimum view of a loaded model replica needed to
|
||||
// apply the routing policy. It is intentionally decoupled from the gorm models
|
||||
// (BackendNode, NodeModel) so the same picker can run against fresh DB rows
|
||||
// (SmartRouter.Route → FindAndLockNodeWithModel) and against an in-memory
|
||||
// snapshot (the per-frontend rotating cache flagged in pkg/model — see TODO
|
||||
// below).
|
||||
type ReplicaCandidate struct {
|
||||
NodeID string
|
||||
Address string
|
||||
ReplicaIndex int
|
||||
InFlight int
|
||||
LastUsed time.Time
|
||||
AvailableVRAM uint64
|
||||
}
|
||||
// ReplicaCandidate aliases the canonical type in pkg/clusterrouting. The policy
|
||||
// implementation moved there so the p2p federation server can share it without
|
||||
// importing this package (which pulls in gorm). Because this is a type alias,
|
||||
// existing references such as the LoadedReplicaStats interface method and the
|
||||
// ReplicaCandidate(rw) row conversion in registry.go remain valid unchanged.
|
||||
type ReplicaCandidate = clusterrouting.ReplicaCandidate
|
||||
|
||||
// PickBestReplica is the single source of truth for which loaded replica of a
|
||||
// model serves the next request.
|
||||
//
|
||||
// Policy (ordered tiers, first non-tie wins):
|
||||
// 1. Least in-flight wins — primary load-balancing signal.
|
||||
// 2. Oldest last_used wins — round-robin between equally-loaded replicas.
|
||||
// Every successful pick refreshes last_used (in FindAndLockNodeWithModel's
|
||||
// transaction and in TouchNodeModel on cache hits), so the "oldest" tier
|
||||
// naturally rotates through the candidate set without a separate cursor.
|
||||
// 3. Largest available_vram wins — cold-start tiebreaker for replicas that
|
||||
// have never been picked (identical last_used).
|
||||
//
|
||||
// Two callers must agree on this policy:
|
||||
//
|
||||
// - SmartRouter.Route, via the SQL ORDER BY in FindAndLockNodeWithModel
|
||||
// (registry.go). That query MUST mirror this function — TestPickerSQLMirror
|
||||
// asserts both sides agree on a representative dataset.
|
||||
//
|
||||
// - The per-frontend rotating-replica cache (NOT YET IMPLEMENTED — see
|
||||
// pkg/model/loader.go and pkg/model/initializers.go for the integration
|
||||
// point). When that cache lands, it will call PickBestReplica against an
|
||||
// in-memory snapshot using locally-tracked in-flight counters and skip the
|
||||
// per-request DB round-trip.
|
||||
//
|
||||
// Returns nil when the candidate list is empty. Does not allocate.
|
||||
// PickBestReplica delegates to the canonical implementation in pkg/clusterrouting.
|
||||
// The SQL ORDER BY in FindAndLockNodeWithModel (registry.go) must mirror it; the
|
||||
// "policy mirror" spec in registry_test.go asserts they agree.
|
||||
func PickBestReplica(candidates []ReplicaCandidate) *ReplicaCandidate {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
best := &candidates[0]
|
||||
for i := 1; i < len(candidates); i++ {
|
||||
c := &candidates[i]
|
||||
if betterReplica(c, best) {
|
||||
best = c
|
||||
}
|
||||
}
|
||||
return best
|
||||
}
|
||||
|
||||
// betterReplica reports whether candidate a is preferred over candidate b
|
||||
// under the policy documented on PickBestReplica.
|
||||
func betterReplica(a, b *ReplicaCandidate) bool {
|
||||
if a.InFlight != b.InFlight {
|
||||
return a.InFlight < b.InFlight
|
||||
}
|
||||
if !a.LastUsed.Equal(b.LastUsed) {
|
||||
return a.LastUsed.Before(b.LastUsed)
|
||||
}
|
||||
return a.AvailableVRAM > b.AvailableVRAM
|
||||
return clusterrouting.PickBestReplica(candidates)
|
||||
}
|
||||
|
||||
13
pkg/clusterrouting/clusterrouting_suite_test.go
Normal file
13
pkg/clusterrouting/clusterrouting_suite_test.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package clusterrouting
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// TestClusterRouting is the go test entry point for this package's spec suite.
func TestClusterRouting(t *testing.T) {
|
||||
RegisterFailHandler(Fail) // route assertion failures to Fail (both names come from the dot-imports above)
|
||||
RunSpecs(t, "ClusterRouting Suite") // run the registered specs under this suite name
|
||||
}
|
||||
66
pkg/clusterrouting/replica.go
Normal file
66
pkg/clusterrouting/replica.go
Normal file
@@ -0,0 +1,66 @@
|
||||
// Package clusterrouting holds the transport-agnostic replica selection policy
|
||||
// shared by the NATS distributed mode (core/services/nodes) and the p2p
|
||||
// federation server (core/p2p). It deliberately depends on nothing heavier than
|
||||
// the standard library so either transport can import it without pulling in a
|
||||
// database driver or message bus.
|
||||
package clusterrouting
|
||||
|
||||
import "time"
|
||||
|
||||
// ReplicaCandidate is the minimum view of a loaded model replica needed to
|
||||
// apply the routing policy. It is intentionally decoupled from any storage
|
||||
// model (gorm rows on the NATS side, gossiped NodeData on the p2p side) so the
|
||||
// same picker runs against fresh DB rows, an in-memory snapshot, or p2p gossip.
|
||||
type ReplicaCandidate struct {
|
||||
NodeID string // not read by betterReplica
|
||||
Address string // not read by betterReplica
|
||||
ReplicaIndex int // not read by betterReplica
|
||||
InFlight int // tier 1 of the policy: the lower value is preferred
|
||||
LastUsed time.Time // tier 2: the earlier time is preferred when InFlight ties
|
||||
AvailableVRAM uint64 // tier 3: the larger value is preferred when LastUsed also ties
|
||||
}
|
||||
|
||||
// PickBestReplica is the single source of truth for which loaded replica of a
|
||||
// model serves the next request.
|
||||
//
|
||||
// Policy (ordered tiers, first non-tie wins):
|
||||
// 1. Least in-flight wins: primary load-balancing signal.
|
||||
// 2. Oldest last_used wins: round-robin between equally-loaded replicas.
|
||||
// Every successful pick refreshes last_used (in the NATS
|
||||
// FindAndLockNodeWithModel transaction and in TouchNodeModel on cache
|
||||
// hits), so the "oldest" tier naturally rotates through the candidate set
|
||||
// without a separate cursor.
|
||||
// 3. Largest available_vram wins: cold-start tiebreaker for replicas that
|
||||
// have never been picked (identical last_used).
|
||||
//
|
||||
// The NATS SQL ORDER BY in FindAndLockNodeWithModel (core/services/nodes/registry.go) MUST mirror
|
||||
// this function; core/services/nodes/registry_test.go's "agrees with PickBestReplica on a seeded
|
||||
// dataset (policy mirror)" spec asserts both sides agree on a representative
|
||||
// dataset and fails fast if they drift.
|
||||
//
|
||||
// Returns nil when the candidate list is empty. Does not allocate.
|
||||
func PickBestReplica(candidates []ReplicaCandidate) *ReplicaCandidate {
|
||||
if len(candidates) == 0 { // nothing to choose from
|
||||
return nil
|
||||
}
|
||||
best := &candidates[0] // pointer into the caller's slice, not a copy of the element
|
||||
for i := 1; i < len(candidates); i++ {
|
||||
c := &candidates[i]
|
||||
if betterReplica(c, best) { // strict preference only: on a full tie the earlier element stays
|
||||
best = c
|
||||
}
|
||||
}
|
||||
return best
|
||||
}
|
||||
|
||||
// betterReplica reports whether candidate a is preferred over candidate b
|
||||
// under the policy documented on PickBestReplica.
|
||||
func betterReplica(a, b *ReplicaCandidate) bool {
|
||||
if a.InFlight != b.InFlight { // tier 1: decided by in-flight count alone when it differs
|
||||
return a.InFlight < b.InFlight
|
||||
}
|
||||
if !a.LastUsed.Equal(b.LastUsed) { // tier 2: reached only when InFlight ties
|
||||
return a.LastUsed.Before(b.LastUsed)
|
||||
}
|
||||
return a.AvailableVRAM > b.AvailableVRAM // tier 3: strict, so fully tied candidates report false
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package nodes
|
||||
package clusterrouting
|
||||
|
||||
import (
|
||||
"time"
|
||||
@@ -36,7 +36,7 @@ var _ = Describe("PickBestReplica", func() {
|
||||
|
||||
It("uses oldest last_used as the tiebreaker when in_flight ties", func() {
|
||||
// All three tied on in_flight=0. Without last_used, available_vram
|
||||
// would pin every pick to the fattest node — the exact bug
|
||||
// would pin every pick to the fattest node: the exact bug
|
||||
// that "fix(distributed): round-robin replicas of the same model" addressed.
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "fat-recent", InFlight: 0, LastUsed: ref.Add(2 * time.Second), AvailableVRAM: 24_000_000_000},
|
||||
@@ -47,7 +47,7 @@ var _ = Describe("PickBestReplica", func() {
|
||||
})
|
||||
|
||||
It("uses largest available_vram as the final tiebreaker", func() {
|
||||
// in_flight tied AND last_used tied — pick the largest GPU.
|
||||
// in_flight tied AND last_used tied: pick the largest GPU.
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "small", InFlight: 0, LastUsed: ref, AvailableVRAM: 8_000_000_000},
|
||||
{NodeID: "fat", InFlight: 0, LastUsed: ref, AvailableVRAM: 24_000_000_000},
|
||||
Reference in New Issue
Block a user