feat(p2p): edgevpn generic-channel publisher and handler for affinity sync

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-01 22:22:59 +00:00
parent 91fc26ff75
commit ce8b97edf2
3 changed files with 130 additions and 0 deletions

81
core/p2p/affinity_sync.go Normal file
View File

@@ -0,0 +1,81 @@
package p2p
import (
"encoding/json"
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/edgevpn/pkg/blockchain"
"github.com/mudler/edgevpn/pkg/hub"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/xlog"
)
// affinitySubjectKey is the hub.Message annotation carrying the logical subject
// (observe vs invalidate) so the receiver can dispatch the way a NATS subject
// would. The generic channel has no subject routing, so we carry it ourselves.
const affinitySubjectKey = "subject"
// genericChannelPublisher adapts an edgevpn node's generic broadcast channel to
// the prefixcache publisher interface (Publish(subject, v)). It lets a
// federation server reuse prefixcache.Sync for cross-server affinity coherence
// without NATS: each event is JSON-encoded into a hub.Message and broadcast over
// the generic channel (not the slow blockchain ledger).
type genericChannelPublisher struct {
node *node.Node
}
// Publish satisfies prefixcache's (unexported) publisher interface structurally.
func (p *genericChannelPublisher) Publish(subject string, v any) error {
payload, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("marshalling affinity event: %w", err)
}
return p.node.PublishMessage(&hub.Message{
Message: string(payload),
Annotations: map[string]interface{}{affinitySubjectKey: subject},
})
}
// applyAffinityMessage decodes a generic-channel affinity message and applies it
// to sync WITHOUT re-broadcasting (ApplyObserve/ApplyInvalidate). now is the
// local clock so TTL is measured per server. Unknown subjects, malformed
// payloads, and nil inputs are ignored (debug-logged), never fatal.
func applyAffinityMessage(sync *prefixcache.Sync, m *hub.Message, now time.Time) {
if sync == nil || m == nil {
return
}
subject, _ := m.Annotations[affinitySubjectKey].(string)
switch subject {
case messaging.SubjectPrefixCacheObserve:
var ev messaging.PrefixCacheObserveEvent
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
xlog.Debug("affinity: bad observe payload", "error", err)
return
}
sync.ApplyObserve(ev, now)
case messaging.SubjectPrefixCacheInvalidate:
var ev messaging.PrefixCacheInvalidateEvent
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
xlog.Debug("affinity: bad invalidate payload", "error", err)
return
}
sync.ApplyInvalidate(ev)
default:
// Other generic-channel traffic; not ours.
}
}
// affinityHandler returns the edgevpn generic-channel handler that applies remote
// affinity events to this server's index. It is registered at node construction
// (handlers cannot be added after Start) and reads fs.prefixSync lazily, which is
// safe because messages only arrive after Start, by which point Start has wired
// fs.prefixSync.
func (fs *FederatedServer) affinityHandler() node.Handler {
return func(_ *blockchain.Ledger, m *hub.Message, _ chan *hub.Message) error {
applyAffinityMessage(fs.prefixSync, m, time.Now())
return nil
}
}

View File

@@ -0,0 +1,48 @@
package p2p
import (
"encoding/json"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/edgevpn/pkg/hub"
)
var _ = Describe("applyAffinityMessage", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
observeMsg := func(ev messaging.PrefixCacheObserveEvent) *hub.Message {
payload, _ := json.Marshal(ev)
return &hub.Message{
Message: string(payload),
Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve},
}
}
It("applies a peer observe so the local index resolves the warm peer", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
sync := prefixcache.NewSync(idx, nil)
chain := prefixcache.ExtractChain("m1", "a fairly long shared system prompt body for the prefix chain", cfg)
applyAffinityMessage(sync, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m1", Chain: chain, NodeID: "warm", Replica: 0}), ref)
d := idx.Decide("m1", chain, []prefixcache.ReplicaKey{{NodeID: "warm"}, {NodeID: "cold"}}, ref)
Expect(d.HasHot).To(BeTrue())
Expect(d.Hot.NodeID).To(Equal("warm"))
})
It("ignores malformed, unknown-subject, and nil inputs without panicking", func() {
sync := prefixcache.NewSync(prefixcache.NewIndex(prefixcache.DefaultConfig()), nil)
applyAffinityMessage(sync, &hub.Message{Message: "not-json", Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}}, ref)
applyAffinityMessage(sync, &hub.Message{Message: "{}", Annotations: map[string]interface{}{affinitySubjectKey: "some.other.subject"}}, ref)
applyAffinityMessage(sync, &hub.Message{Message: "{}"}, ref)
applyAffinityMessage(nil, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m"}), ref)
applyAffinityMessage(sync, nil, ref)
Expect(true).To(BeTrue())
})
})

View File

@@ -32,6 +32,7 @@ type FederatedServer struct {
bodyLimit int64 // max request body bytes (0 = unlimited)
prefixCfg prefixcache.Config
prefixIndex *prefixcache.Index
prefixSync *prefixcache.Sync
}
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64) *FederatedServer {