Files
LocalAI/core/services/nodes/unloader_test.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

257 lines
7.9 KiB
Go

package nodes
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
)
// --- Fakes ---
// fakeModelLocator implements ModelLocator with configurable node lists.
type fakeModelLocator struct {
nodes []BackendNode
findErr error
removedPairs []modelNodePair // records RemoveNodeModel calls
}
type modelNodePair struct {
nodeID string
modelName string
}
func (f *fakeModelLocator) FindNodesWithModel(_ context.Context, _ string) ([]BackendNode, error) {
return f.nodes, f.findErr
}
func (f *fakeModelLocator) RemoveNodeModel(_ context.Context, nodeID, modelName string) error {
f.removedPairs = append(f.removedPairs, modelNodePair{nodeID, modelName})
return nil
}
// fakeMessagingClient implements messaging.MessagingClient, recording Publish
// and Request calls so we can assert on subjects and payloads.
type fakeMessagingClient struct {
mu sync.Mutex
published []publishCall
publishErr error // error to return from Publish
requestReply []byte
requestErr error
requestCalls []requestCall
}
type publishCall struct {
Subject string
Data []byte
}
type requestCall struct {
Subject string
Data []byte
}
func (f *fakeMessagingClient) Publish(subject string, data any) error {
f.mu.Lock()
defer f.mu.Unlock()
var raw []byte
if data != nil {
var err error
raw, err = json.Marshal(data)
if err != nil {
return err
}
}
f.published = append(f.published, publishCall{Subject: subject, Data: raw})
return f.publishErr
}
func (f *fakeMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) {
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) {
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) QueueSubscribeReply(_ string, _ string, _ func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data})
return f.requestReply, f.requestErr
}
func (f *fakeMessagingClient) IsConnected() bool { return true }
func (f *fakeMessagingClient) Close() {}
type fakeSubscription struct{}
func (f *fakeSubscription) Unsubscribe() error { return nil }
// --- Tests ---
var _ = Describe("RemoteUnloaderAdapter", func() {
var (
locator *fakeModelLocator
mc *fakeMessagingClient
adapter *RemoteUnloaderAdapter
)
BeforeEach(func() {
locator = &fakeModelLocator{}
mc = &fakeMessagingClient{}
adapter = NewRemoteUnloaderAdapter(locator, mc)
})
Describe("UnloadRemoteModel", func() {
It("with no nodes returns nil", func() {
locator.nodes = nil
Expect(adapter.UnloadRemoteModel("my-model")).To(Succeed())
Expect(mc.published).To(BeEmpty())
})
It("broadcasts to all nodes with model", func() {
locator.nodes = []BackendNode{
{ID: "node-1", Name: "worker-1"},
{ID: "node-2", Name: "worker-2"},
}
Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())
// Should have published a StopBackend for each node.
Expect(mc.published).To(HaveLen(2))
Expect(mc.published[0].Subject).To(Equal(messaging.SubjectNodeBackendStop("node-1")))
Expect(mc.published[1].Subject).To(Equal(messaging.SubjectNodeBackendStop("node-2")))
// Should have removed the model from each node in the registry.
Expect(locator.removedPairs).To(HaveLen(2))
Expect(locator.removedPairs[0]).To(Equal(modelNodePair{"node-1", "llama"}))
Expect(locator.removedPairs[1]).To(Equal(modelNodePair{"node-2", "llama"}))
})
It("continues when one node fails", func() {
locator.nodes = []BackendNode{
{ID: "node-fail", Name: "worker-fail"},
{ID: "node-ok", Name: "worker-ok"},
}
// Use a messaging client that fails the first Publish call only.
failOnce := &failOnceMessagingClient{inner: mc, failOn: 0}
adapter = NewRemoteUnloaderAdapter(locator, failOnce)
Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())
// The second node should still have been processed.
// The first node's StopBackend errored, so RemoveNodeModel was NOT called for it.
// The second node's StopBackend succeeded, so RemoveNodeModel WAS called.
Expect(locator.removedPairs).To(HaveLen(1))
Expect(locator.removedPairs[0].nodeID).To(Equal("node-ok"))
})
})
Describe("StopBackend", func() {
It("with empty backend publishes nil payload", func() {
Expect(adapter.StopBackend("node-1", "")).To(Succeed())
Expect(mc.published).To(HaveLen(1))
Expect(mc.published[0].Subject).To(Equal(messaging.SubjectNodeBackendStop("node-1")))
Expect(mc.published[0].Data).To(BeNil())
})
It("with backend name publishes JSON", func() {
Expect(adapter.StopBackend("node-1", "llama-backend")).To(Succeed())
Expect(mc.published).To(HaveLen(1))
var payload struct {
Backend string `json:"backend"`
}
Expect(json.Unmarshal(mc.published[0].Data, &payload)).To(Succeed())
Expect(payload.Backend).To(Equal("llama-backend"))
})
})
Describe("StopNode", func() {
It("publishes to correct subject", func() {
Expect(adapter.StopNode("node-abc")).To(Succeed())
Expect(mc.published).To(HaveLen(1))
Expect(mc.published[0].Subject).To(Equal(messaging.SubjectNodeStop("node-abc")))
Expect(mc.published[0].Data).To(BeNil())
})
})
Describe("DeleteModelFiles", func() {
It("with no nodes returns nil", func() {
locator.nodes = nil
Expect(adapter.DeleteModelFiles("my-model")).To(Succeed())
})
It("continues on failure", func() {
locator.nodes = []BackendNode{
{ID: "node-1", Name: "w1"},
{ID: "node-2", Name: "w2"},
}
// Request will fail for all calls.
mc.requestErr = fmt.Errorf("timeout")
Expect(adapter.DeleteModelFiles("my-model")).To(Succeed())
// Both nodes attempted.
Expect(mc.requestCalls).To(HaveLen(2))
Expect(mc.requestCalls[0].Subject).To(Equal(messaging.SubjectNodeModelDelete("node-1")))
Expect(mc.requestCalls[1].Subject).To(Equal(messaging.SubjectNodeModelDelete("node-2")))
})
})
})
// failOnceMessagingClient wraps fakeMessagingClient but fails the Publish call
// at index failOn (0-based) and succeeds all others.
type failOnceMessagingClient struct {
inner *fakeMessagingClient
failOn int
callIdx int
mu sync.Mutex
}
func (f *failOnceMessagingClient) Publish(subject string, data any) error {
f.mu.Lock()
idx := f.callIdx
f.callIdx++
f.mu.Unlock()
if idx == f.failOn {
return fmt.Errorf("simulated failure")
}
return f.inner.Publish(subject, data)
}
func (f *failOnceMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
return f.inner.Subscribe(subject, handler)
}
func (f *failOnceMessagingClient) QueueSubscribe(subject, queue string, handler func([]byte)) (messaging.Subscription, error) {
return f.inner.QueueSubscribe(subject, queue, handler)
}
func (f *failOnceMessagingClient) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
return f.inner.QueueSubscribeReply(subject, queue, handler)
}
func (f *failOnceMessagingClient) SubscribeReply(subject string, handler func(data []byte, reply func([]byte))) (messaging.Subscription, error) {
return f.inner.SubscribeReply(subject, handler)
}
func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
return f.inner.Request(subject, data, timeout)
}
func (f *failOnceMessagingClient) IsConnected() bool { return true }
func (f *failOnceMessagingClient) Close() {}