feat(p2p): affinity-aware peer selection and federation body-limit flag

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-01 21:42:01 +00:00
parent 14b57aa343
commit 8ec536a34c
3 changed files with 100 additions and 2 deletions

View File

@@ -14,12 +14,13 @@ type FederatedCLI struct {
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-size limit in megabytes" group:"api"`
}
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
warnDeprecatedFlags()
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker, int64(f.UploadLimit)*1024*1024)
c, cancel := context.WithCancel(context.Background())

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/LocalAI/pkg/clusterrouting"
"github.com/mudler/xlog"
)
@@ -28,9 +29,13 @@ type FederatedServer struct {
requestTable map[string]int
loadBalanced bool
workerTarget string
bodyLimit int64 // max request body bytes (0 = unlimited)
prefixCfg prefixcache.Config
prefixIndex *prefixcache.Index
}
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64) *FederatedServer {
cfg := prefixcache.DefaultConfig()
return &FederatedServer{
listenAddr: listenAddr,
service: service,
@@ -38,6 +43,9 @@ func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
workerTarget: workerTarget,
bodyLimit: bodyLimit,
prefixCfg: cfg,
prefixIndex: prefixcache.NewIndex(cfg),
}
}
@@ -159,6 +167,65 @@ func (fs *FederatedServer) SelectBestServer() string {
return best.NodeID
}
// affinityPreferred returns the peer the prefix index considers warm for this
// chain, or "" when there is no match strong enough among the candidates. It
// reuses prefixcache's per-model radix-tree Decide; the final load-guarded pick
// is done by clusterrouting.PickWithAffinity so the VRAM tier is preserved.
func affinityPreferred(idx *prefixcache.Index, model string, chain []uint64, candidates []clusterrouting.ReplicaCandidate, cfg prefixcache.Config, now time.Time) string {
if idx == nil || len(chain) == 0 || len(candidates) == 0 {
return ""
}
keys := make([]prefixcache.ReplicaKey, 0, len(candidates))
for _, c := range candidates {
keys = append(keys, prefixcache.ReplicaKey{NodeID: c.NodeID})
}
d := idx.Decide(model, chain, keys, now)
if d.HasHot && d.MatchRatio >= cfg.MinPrefixMatch {
return d.Hot.NodeID
}
return ""
}
// selectPeer chooses the federated peer to serve a request for model with the
// given raw body. It filters candidates by model, computes the prefix chain,
// consults the affinity index, and makes the final load+VRAM-aware pick. It
// returns the chosen peer ID and the chain (so the caller can Observe after a
// successful forward). An empty model and nil body degrade to load+VRAM only.
// Returns "" when no eligible peer is online.
func (fs *FederatedServer) selectPeer(model string, body []byte, now time.Time) (string, []uint64) {
fs.syncTableStatus()
nodes := GetAvailableNodes(fs.service)
// Snapshot candidates under the lock (it only guards requestTable), then
// release before the prefix hashing and tree walk, which are lock-free
// (candidates is a copy; prefixIndex/prefixCfg are set once at construction).
fs.Lock()
candidates := buildFederatedCandidates(nodes, fs.requestTable, now, model)
fs.Unlock()
if len(candidates) == 0 {
return "", nil
}
var chain []uint64
preferred := ""
if fs.prefixIndex != nil && model != "" && len(body) > 0 {
chain = prefixcache.ExtractChain(model, string(body), fs.prefixCfg)
preferred = affinityPreferred(fs.prefixIndex, model, chain, candidates, fs.prefixCfg, now)
}
best := clusterrouting.PickWithAffinity(candidates, preferred, fs.prefixCfg.BalanceAbsThreshold)
if best == nil {
return "", chain
}
return best.NodeID, chain
}
// observeServed records that peerID served the given chain for model, so the
// next request sharing that prefix is routed back to the same warm peer.
func (fs *FederatedServer) observeServed(model string, chain []uint64, peerID string, now time.Time) {
if fs.prefixIndex == nil || len(chain) == 0 || peerID == "" || model == "" {
return
}
fs.prefixIndex.Observe(model, chain, prefixcache.ReplicaKey{NodeID: peerID}, now)
}
func (fs *FederatedServer) RecordRequest(nodeID string) {
fs.Lock()
defer fs.Unlock()

View File

@@ -7,6 +7,7 @@ import (
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/LocalAI/pkg/clusterrouting"
)
@@ -115,3 +116,32 @@ var _ = Describe("extractModel", func() {
Expect(extractModel("/x", "", []byte("--multipart-boundary--"))).To(Equal(""))
})
})
var _ = Describe("affinityPreferred", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
It("returns the warm peer once a chain has been observed for it", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
idx.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
})
It("returns empty when no chain has been observed", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal(""))
})
It("returns empty for a nil index or empty chain", func() {
cfg := prefixcache.DefaultConfig()
Expect(affinityPreferred(nil, "m1", []uint64{1}, nil, cfg, ref)).To(Equal(""))
idx := prefixcache.NewIndex(cfg)
Expect(affinityPreferred(idx, "m1", nil, nil, cfg, ref)).To(Equal(""))
})
})