feat(p2p): model-filtered federation candidates and request model extraction

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-01 21:37:13 +00:00
parent 288d732af7
commit 14b57aa343
2 changed files with 105 additions and 11 deletions

View File

@@ -1,8 +1,10 @@
package p2p
import (
"encoding/json"
"fmt"
"math/rand/v2"
"strings"
"sync"
"time"
@@ -78,17 +80,21 @@ func (fs *FederatedServer) syncTableStatus() {
}
// buildFederatedCandidates maps the currently-online federated peers into the
// shared routing policy's candidate form. InFlight comes from the per-peer
// request counter (lower means fewer requests routed there); AvailableVRAM
// comes from the gossiped NodeData. LastUsed is intentionally left zero:
// federation has no per-peer last-used clock, and the request counter already
// spreads load, so the VRAM tier deterministically breaks in-flight ties.
func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time) []clusterrouting.ReplicaCandidate {
// shared routing policy's candidate form, optionally filtered to peers that can
// serve model. A peer with a non-empty advertised model set that lacks model is
// excluded; a peer with an empty set is treated as "unknown" and stays eligible
// (so older peers and mid-convergence peers are not starved). When model is "",
// no model filtering is applied. InFlight comes from the per-peer request
// counter; AvailableVRAM from the gossiped NodeData; LastUsed is left zero.
func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time, model string) []clusterrouting.ReplicaCandidate {
candidates := make([]clusterrouting.ReplicaCandidate, 0, len(nodes))
for _, nd := range nodes {
if !nd.IsOnlineAt(now) {
continue
}
if !servesModel(nd, model) {
continue
}
candidates = append(candidates, clusterrouting.ReplicaCandidate{
NodeID: nd.ID,
InFlight: requestTable[nd.ID],
@@ -98,6 +104,40 @@ func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]i
return candidates
}
// servesModel decides whether nd may be offered a request for model.
//
// Two cases short-circuit to true: an empty model (the caller asked for no
// filtering) and an empty advertised set on nd (the peer's models are unknown,
// so it is not ruled out). Otherwise nd is eligible only when one of its
// advertised entries is exactly equal to model.
func servesModel(nd schema.NodeData, model string) bool {
	switch {
	case model == "":
		// No filter requested.
		return true
	case len(nd.Models) == 0:
		// Nothing advertised: treat as unknown rather than as "serves nothing".
		return true
	}
	for i := range nd.Models {
		if nd.Models[i] == model {
			return true
		}
	}
	return false
}
// extractModel best-effort resolves the target model of a buffered request,
// cheapest source first: an explicit query value, then the JSON body "model"
// field. Surrounding whitespace is stripped from whichever value is found, so
// the result can be compared exactly against advertised model names.
//
// path is part of the signature but is not consulted.
//
// Returns "" when the model cannot be determined (for example a multipart or
// websocket request, a body that is not valid JSON, or a "model" field that is
// missing, blank, or not a JSON string), in which case the caller routes by
// load/affinity only.
func extractModel(path, queryModel string, body []byte) string {
	// Return the trimmed value, not the raw one: the emptiness check already
	// ignores padding, and a padded name would never equal an advertised model.
	if m := strings.TrimSpace(queryModel); m != "" {
		return m
	}
	if len(body) == 0 {
		return ""
	}
	// Decode only the field we care about; everything else in the body is
	// skipped by the decoder.
	var probe struct {
		Model string `json:"model"`
	}
	if err := json.Unmarshal(body, &probe); err != nil {
		// Deliberately swallowed: an unparseable body just means "unknown model".
		return ""
	}
	return strings.TrimSpace(probe.Model)
}
// SelectBestServer picks the online federated peer to serve the next request
// using the shared cluster-routing policy (least in-flight, then most free
// VRAM). Returns "" when no peer is online.
@@ -109,7 +149,7 @@ func (fs *FederatedServer) SelectBestServer() string {
nodes := GetAvailableNodes(fs.service)
fs.Lock()
defer fs.Unlock()
candidates := buildFederatedCandidates(nodes, fs.requestTable, time.Now())
candidates := buildFederatedCandidates(nodes, fs.requestTable, time.Now(), "")
best := clusterrouting.PickBestReplica(candidates)
if best == nil {
xlog.Debug("No online federated peers to select", "request_table", fs.requestTable)

View File

@@ -20,7 +20,7 @@ var _ = Describe("buildFederatedCandidates", func() {
{ID: "online", LastSeen: onlineSeen},
{ID: "offline", LastSeen: offlineSeen},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref)
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
Expect(cands).To(HaveLen(1))
Expect(cands[0].NodeID).To(Equal("online"))
})
@@ -30,7 +30,7 @@ var _ = Describe("buildFederatedCandidates", func() {
{ID: "a", LastSeen: onlineSeen},
{ID: "b", LastSeen: onlineSeen},
}
cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref)
cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref, "")
byID := map[string]int{}
for _, c := range cands {
byID[c.NodeID] = c.InFlight
@@ -43,7 +43,7 @@ var _ = Describe("buildFederatedCandidates", func() {
nodes := []schema.NodeData{
{ID: "gpu", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref)
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
Expect(cands[0].AvailableVRAM).To(Equal(uint64(24_000_000_000)))
})
@@ -55,9 +55,63 @@ var _ = Describe("buildFederatedCandidates", func() {
{ID: "idle-small", LastSeen: onlineSeen, AvailableVRAM: 8_000_000_000},
{ID: "idle-big", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
}
cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref)
cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref, "")
best := clusterrouting.PickBestReplica(cands)
Expect(best).ToNot(BeNil())
Expect(best.NodeID).To(Equal("idle-big"))
})
})
// Specs for the model filter of buildFederatedCandidates: which peers survive
// when a model name is (or is not) supplied.
var _ = Describe("model-aware candidate building", func() {
	// Fixed reference clock; every peer below was last seen 10s before it, and
	// the specs expect such peers to pass the online check.
	now := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	lastSeen := now.Add(-10 * time.Second)

	// build runs the function under test with a fresh, empty request table.
	build := func(peers []schema.NodeData, model string) []clusterrouting.ReplicaCandidate {
		return buildFederatedCandidates(peers, map[string]int{}, now, model)
	}

	It("keeps peers that advertise the requested model", func() {
		peers := []schema.NodeData{
			{ID: "has", LastSeen: lastSeen, Models: []string{"m1", "m2"}},
			{ID: "hasnot", LastSeen: lastSeen, Models: []string{"other"}},
		}

		got := build(peers, "m1")

		Expect(got).To(HaveLen(1))
		Expect(got[0].NodeID).To(Equal("has"))
	})

	It("keeps peers with an empty (unknown) model set eligible for any model", func() {
		peers := []schema.NodeData{
			{ID: "unknown", LastSeen: lastSeen, Models: nil},
			{ID: "hasnot", LastSeen: lastSeen, Models: []string{"other"}},
		}

		got := build(peers, "m1")

		Expect(got).To(HaveLen(1))
		Expect(got[0].NodeID).To(Equal("unknown"))
	})

	It("does not filter when the requested model is empty", func() {
		peers := []schema.NodeData{
			{ID: "a", LastSeen: lastSeen, Models: []string{"x"}},
			{ID: "b", LastSeen: lastSeen, Models: []string{"y"}},
		}

		got := build(peers, "")

		Expect(got).To(HaveLen(2))
	})
})
// Specs for extractModel: source precedence (query before body) and the
// empty-string result when no model can be resolved.
var _ = Describe("extractModel", func() {
	It("reads the JSON body model field", func() {
		payload := []byte(`{"model":"llama-3","messages":[]}`)

		got := extractModel("/v1/chat/completions", "", payload)

		Expect(got).To(Equal("llama-3"))
	})

	It("prefers a path/query model over the body", func() {
		payload := []byte(`{"model":"frombody"}`)

		got := extractModel("/x", "fromquery", payload)

		Expect(got).To(Equal("fromquery"))
	})

	It("returns empty when no model is present", func() {
		payload := []byte(`{"messages":[]}`)

		got := extractModel("/x", "", payload)

		Expect(got).To(Equal(""))
	})

	It("returns empty on non-JSON / unparseable body without panicking", func() {
		payload := []byte("--multipart-boundary--")

		got := extractModel("/x", "", payload)

		Expect(got).To(Equal(""))
	})
})