Files
LocalAI/tests/e2e/distributed/foundation_test.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood with health checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

276 lines
8.0 KiB
Go

package distributed_test
import (
"bytes"
"context"
"io"
"sync/atomic"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/advisorylock"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/storage"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
pgdriver "gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// Phase 0 foundation suite: verifies the building blocks of distributed
// mode — application-config validation, the NATS messaging client, the
// filesystem ObjectStore adapter, and PostgreSQL advisory locks.
var _ = Describe("Phase 0: Foundation", Label("Distributed"), func() {
	var (
		// infra provides the shared NATS + PostgreSQL test infrastructure.
		infra *TestInfra
	)

	BeforeEach(func() {
		infra = SetupInfra("localai_test")
	})

	Context("Distributed mode validation", func() {
		It("should reject --distributed without PostgreSQL configured", func() {
			appCfg := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithNatsURL(infra.NatsURL),
				// No auth/PostgreSQL configured
			)
			Expect(appCfg.Distributed.Enabled).To(BeTrue())
			// Auth not enabled → validation should fail
			Expect(appCfg.Auth.Enabled).To(BeFalse())
		})

		It("should reject --distributed without NATS configured", func() {
			appCfg := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithAuthEnabled(true),
				config.WithAuthDatabaseURL(infra.PGURL),
				// No NATS URL
			)
			Expect(appCfg.Distributed.NatsURL).To(BeEmpty())
		})

		It("should accept valid distributed configuration", func() {
			appCfg := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithAuthEnabled(true),
				config.WithAuthDatabaseURL(infra.PGURL),
				config.WithNatsURL(infra.NatsURL),
			)
			Expect(appCfg.Distributed.Enabled).To(BeTrue())
			Expect(appCfg.Auth.Enabled).To(BeTrue())
			Expect(appCfg.Distributed.NatsURL).To(Equal(infra.NatsURL))
		})

		It("should generate unique frontend ID on startup", func() {
			cfg1 := config.NewApplicationConfig(config.EnableDistributed)
			cfg2 := config.NewApplicationConfig(config.EnableDistributed)
			// IDs are empty until initDistributed generates them,
			// but if set via env, they should be preserved
			cfg3 := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithDistributedInstanceID("my-pod-1"),
			)
			Expect(cfg3.Distributed.InstanceID).To(Equal("my-pod-1"))
			// Default is empty — filled in at startup
			Expect(cfg1.Distributed.InstanceID).To(BeEmpty())
			Expect(cfg2.Distributed.InstanceID).To(BeEmpty())
		})

		It("should start in single-node mode without --distributed", func() {
			appCfg := config.NewApplicationConfig()
			Expect(appCfg.Distributed.Enabled).To(BeFalse())
		})
	})

	Context("NATS client", func() {
		It("should connect, publish, and subscribe", func() {
			client, err := messaging.New(infra.NatsURL)
			Expect(err).ToNot(HaveOccurred())
			defer client.Close()
			Expect(client.IsConnected()).To(BeTrue())

			received := make(chan []byte, 1)
			sub, err := client.Subscribe("test.subject", func(data []byte) {
				received <- data
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub.Unsubscribe()

			// Flush the connection so the server has registered the
			// subscription before we publish (not a sleep-based delay).
			FlushNATS(client)

			err = client.Publish("test.subject", map[string]string{"msg": "hello"})
			Expect(err).ToNot(HaveOccurred())
			Eventually(received, "5s").Should(Receive())
		})

		It("should support queue subscriptions for load balancing", func() {
			client, err := messaging.New(infra.NatsURL)
			Expect(err).ToNot(HaveOccurred())
			defer client.Close()

			var worker1Count, worker2Count atomic.Int32
			sub1, err := client.QueueSubscribe("test.queue", "workers", func(_ []byte) {
				worker1Count.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub1.Unsubscribe()
			sub2, err := client.QueueSubscribe("test.queue", "workers", func(_ []byte) {
				worker2Count.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub2.Unsubscribe()
			FlushNATS(client)

			// Publish multiple messages
			for i := range 10 {
				err = client.Publish("test.queue", map[string]int{"n": i})
				Expect(err).ToNot(HaveOccurred())
			}

			// Wait for the queue group to process every message. With only
			// 10 messages the split between the two workers may not be even,
			// so we only assert on the total delivered across the group.
			Eventually(func() int32 {
				return worker1Count.Load() + worker2Count.Load()
			}, "5s").Should(Equal(int32(10)))
		})

		It("should reconnect after disconnect", func() {
			client, err := messaging.New(infra.NatsURL)
			Expect(err).ToNot(HaveOccurred())
			defer client.Close()
			Expect(client.IsConnected()).To(BeTrue())
			// The reconnect behavior is tested implicitly by the RetryOnFailedConnect option
			// A full reconnect test would require stopping/restarting the NATS container
		})
	})

	Context("ObjectStore filesystem adapter", func() {
		var store *storage.FilesystemStore

		BeforeEach(func() {
			var err error
			store, err = storage.NewFilesystemStore(GinkgoT().TempDir())
			Expect(err).ToNot(HaveOccurred())
		})

		It("should Put/Get/Delete", func() {
			ctx := context.Background()

			// Put
			data := []byte("hello world")
			err := store.Put(ctx, "test/file.txt", bytes.NewReader(data))
			Expect(err).ToNot(HaveOccurred())

			// Exists
			exists, err := store.Exists(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			Expect(exists).To(BeTrue())

			// Get
			r, err := store.Get(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			got, err := io.ReadAll(r)
			r.Close()
			Expect(err).ToNot(HaveOccurred())
			Expect(string(got)).To(Equal("hello world"))

			// List
			keys, err := store.List(ctx, "test")
			Expect(err).ToNot(HaveOccurred())
			Expect(keys).To(ContainElement("test/file.txt"))

			// Delete
			err = store.Delete(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			exists, err = store.Exists(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			Expect(exists).To(BeFalse())
		})
	})

	Context("Advisory locks", func() {
		var db *gorm.DB

		BeforeEach(func() {
			var err error
			db, err = gorm.Open(pgdriver.Open(infra.PGURL), &gorm.Config{
				Logger: logger.Default.LogMode(logger.Silent),
			})
			Expect(err).ToNot(HaveOccurred())
		})

		It("should acquire and release advisory lock", func() {
			executed := false
			acquired, err := advisorylock.TryWithLockCtx(context.Background(), db, 42, func() error {
				executed = true
				return nil
			})
			Expect(err).ToNot(HaveOccurred())
			Expect(acquired).To(BeTrue())
			Expect(executed).To(BeTrue())
		})

		It("should prevent concurrent acquisition", func() {
			// Use two dedicated sql.Conn to ensure they are different sessions.
			sqlDB, err := db.DB()
			Expect(err).ToNot(HaveOccurred())
			conn1, err := sqlDB.Conn(context.Background())
			Expect(err).ToNot(HaveOccurred())
			defer conn1.Close()
			conn2, err := sqlDB.Conn(context.Background())
			Expect(err).ToNot(HaveOccurred())
			defer conn2.Close()

			// Acquire on conn1
			var acquired bool
			err = conn1.QueryRowContext(context.Background(),
				"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&acquired)
			Expect(err).ToNot(HaveOccurred())
			Expect(acquired).To(BeTrue())

			// conn2 should NOT be able to acquire the same lock
			var otherAcquired bool
			err = conn2.QueryRowContext(context.Background(),
				"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&otherAcquired)
			Expect(err).ToNot(HaveOccurred())
			Expect(otherAcquired).To(BeFalse())

			// Release on conn1 — check the error so a failed unlock
			// (which would leak the lock into later tests) is surfaced.
			_, err = conn1.ExecContext(context.Background(), "SELECT pg_advisory_unlock($1)", int64(43))
			Expect(err).ToNot(HaveOccurred())

			// Now conn2 should be able to acquire
			err = conn2.QueryRowContext(context.Background(),
				"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&otherAcquired)
			Expect(err).ToNot(HaveOccurred())
			Expect(otherAcquired).To(BeTrue())

			// Clean up
			_, err = conn2.ExecContext(context.Background(), "SELECT pg_advisory_unlock($1)", int64(43))
			Expect(err).ToNot(HaveOccurred())
		})

		It("should support WithLockCtx for scoped locking", func() {
			executed := false
			err := advisorylock.WithLockCtx(context.Background(), db, 44, func() error {
				executed = true
				return nil
			})
			Expect(err).ToNot(HaveOccurred())
			Expect(executed).To(BeTrue())
		})
	})
})