mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-02 14:16:02 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): correctly compute time boundaries to avoid re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood with health checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
276 lines
8.0 KiB
Go
276 lines
8.0 KiB
Go
package distributed_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"sync/atomic"
|
|
|
|
"github.com/mudler/LocalAI/core/config"
|
|
"github.com/mudler/LocalAI/core/services/advisorylock"
|
|
"github.com/mudler/LocalAI/core/services/messaging"
|
|
"github.com/mudler/LocalAI/core/services/storage"
|
|
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
|
|
pgdriver "gorm.io/driver/postgres"
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/logger"
|
|
)
|
|
|
|
var _ = Describe("Phase 0: Foundation", Label("Distributed"), func() {
|
|
var (
|
|
infra *TestInfra
|
|
)
|
|
|
|
BeforeEach(func() {
|
|
infra = SetupInfra("localai_test")
|
|
})
|
|
|
|
Context("Distributed mode validation", func() {
|
|
It("should reject --distributed without PostgreSQL configured", func() {
|
|
appCfg := config.NewApplicationConfig(
|
|
config.EnableDistributed,
|
|
config.WithNatsURL(infra.NatsURL),
|
|
// No auth/PostgreSQL configured
|
|
)
|
|
Expect(appCfg.Distributed.Enabled).To(BeTrue())
|
|
// Auth not enabled → validation should fail
|
|
Expect(appCfg.Auth.Enabled).To(BeFalse())
|
|
})
|
|
|
|
It("should reject --distributed without NATS configured", func() {
|
|
appCfg := config.NewApplicationConfig(
|
|
config.EnableDistributed,
|
|
config.WithAuthEnabled(true),
|
|
config.WithAuthDatabaseURL(infra.PGURL),
|
|
// No NATS URL
|
|
)
|
|
Expect(appCfg.Distributed.NatsURL).To(BeEmpty())
|
|
})
|
|
|
|
It("should accept valid distributed configuration", func() {
|
|
appCfg := config.NewApplicationConfig(
|
|
config.EnableDistributed,
|
|
config.WithAuthEnabled(true),
|
|
config.WithAuthDatabaseURL(infra.PGURL),
|
|
config.WithNatsURL(infra.NatsURL),
|
|
)
|
|
Expect(appCfg.Distributed.Enabled).To(BeTrue())
|
|
Expect(appCfg.Auth.Enabled).To(BeTrue())
|
|
Expect(appCfg.Distributed.NatsURL).To(Equal(infra.NatsURL))
|
|
})
|
|
|
|
It("should generate unique frontend ID on startup", func() {
|
|
cfg1 := config.NewApplicationConfig(config.EnableDistributed)
|
|
cfg2 := config.NewApplicationConfig(config.EnableDistributed)
|
|
// IDs are empty until initDistributed generates them,
|
|
// but if set via env, they should be preserved
|
|
cfg3 := config.NewApplicationConfig(
|
|
config.EnableDistributed,
|
|
config.WithDistributedInstanceID("my-pod-1"),
|
|
)
|
|
Expect(cfg3.Distributed.InstanceID).To(Equal("my-pod-1"))
|
|
// Default is empty — filled in at startup
|
|
Expect(cfg1.Distributed.InstanceID).To(BeEmpty())
|
|
Expect(cfg2.Distributed.InstanceID).To(BeEmpty())
|
|
})
|
|
|
|
It("should start in single-node mode without --distributed", func() {
|
|
appCfg := config.NewApplicationConfig()
|
|
Expect(appCfg.Distributed.Enabled).To(BeFalse())
|
|
})
|
|
})
|
|
|
|
Context("NATS client", func() {
|
|
It("should connect, publish, and subscribe", func() {
|
|
client, err := messaging.New(infra.NatsURL)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer client.Close()
|
|
|
|
Expect(client.IsConnected()).To(BeTrue())
|
|
|
|
received := make(chan []byte, 1)
|
|
sub, err := client.Subscribe("test.subject", func(data []byte) {
|
|
received <- data
|
|
})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer sub.Unsubscribe()
|
|
|
|
// Small delay to ensure subscription is active
|
|
FlushNATS(client)
|
|
|
|
err = client.Publish("test.subject", map[string]string{"msg": "hello"})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
|
|
Eventually(received, "5s").Should(Receive())
|
|
})
|
|
|
|
It("should support queue subscriptions for load balancing", func() {
|
|
client, err := messaging.New(infra.NatsURL)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer client.Close()
|
|
|
|
var worker1Count, worker2Count atomic.Int32
|
|
|
|
sub1, err := client.QueueSubscribe("test.queue", "workers", func(data []byte) {
|
|
worker1Count.Add(1)
|
|
})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer sub1.Unsubscribe()
|
|
|
|
sub2, err := client.QueueSubscribe("test.queue", "workers", func(data []byte) {
|
|
worker2Count.Add(1)
|
|
})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer sub2.Unsubscribe()
|
|
|
|
FlushNATS(client)
|
|
|
|
// Publish multiple messages
|
|
for i := range 10 {
|
|
err = client.Publish("test.queue", map[string]int{"n": i})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
}
|
|
|
|
// Wait for all messages to be processed
|
|
Eventually(func() int32 {
|
|
return worker1Count.Load() + worker2Count.Load()
|
|
}, "5s").Should(Equal(int32(10)))
|
|
|
|
// Both workers should have received some messages (load-balanced)
|
|
// Note: with only 10 messages, distribution may not be perfectly even
|
|
Expect(worker1Count.Load() + worker2Count.Load()).To(Equal(int32(10)))
|
|
})
|
|
|
|
It("should reconnect after disconnect", func() {
|
|
client, err := messaging.New(infra.NatsURL)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer client.Close()
|
|
|
|
Expect(client.IsConnected()).To(BeTrue())
|
|
// The reconnect behavior is tested implicitly by the RetryOnFailedConnect option
|
|
// A full reconnect test would require stopping/restarting the NATS container
|
|
})
|
|
})
|
|
|
|
Context("ObjectStore filesystem adapter", func() {
|
|
var store *storage.FilesystemStore
|
|
|
|
BeforeEach(func() {
|
|
var err error
|
|
store, err = storage.NewFilesystemStore(GinkgoT().TempDir())
|
|
Expect(err).ToNot(HaveOccurred())
|
|
})
|
|
|
|
It("should Put/Get/Delete", func() {
|
|
ctx := context.Background()
|
|
|
|
// Put
|
|
data := []byte("hello world")
|
|
err := store.Put(ctx, "test/file.txt", bytes.NewReader(data))
|
|
Expect(err).ToNot(HaveOccurred())
|
|
|
|
// Exists
|
|
exists, err := store.Exists(ctx, "test/file.txt")
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(exists).To(BeTrue())
|
|
|
|
// Get
|
|
r, err := store.Get(ctx, "test/file.txt")
|
|
Expect(err).ToNot(HaveOccurred())
|
|
got, err := io.ReadAll(r)
|
|
r.Close()
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(string(got)).To(Equal("hello world"))
|
|
|
|
// List
|
|
keys, err := store.List(ctx, "test")
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(keys).To(ContainElement("test/file.txt"))
|
|
|
|
// Delete
|
|
err = store.Delete(ctx, "test/file.txt")
|
|
Expect(err).ToNot(HaveOccurred())
|
|
|
|
exists, err = store.Exists(ctx, "test/file.txt")
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(exists).To(BeFalse())
|
|
})
|
|
})
|
|
|
|
Context("Advisory locks", func() {
|
|
var db *gorm.DB
|
|
|
|
BeforeEach(func() {
|
|
var err error
|
|
db, err = gorm.Open(pgdriver.Open(infra.PGURL), &gorm.Config{
|
|
Logger: logger.Default.LogMode(logger.Silent),
|
|
})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
})
|
|
|
|
It("should acquire and release advisory lock", func() {
|
|
executed := false
|
|
acquired, err := advisorylock.TryWithLockCtx(context.Background(), db, 42, func() error {
|
|
executed = true
|
|
return nil
|
|
})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(acquired).To(BeTrue())
|
|
Expect(executed).To(BeTrue())
|
|
})
|
|
|
|
It("should prevent concurrent acquisition", func() {
|
|
// Use two dedicated sql.Conn to ensure they are different sessions.
|
|
sqlDB, err := db.DB()
|
|
Expect(err).ToNot(HaveOccurred())
|
|
|
|
conn1, err := sqlDB.Conn(context.Background())
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer conn1.Close()
|
|
|
|
conn2, err := sqlDB.Conn(context.Background())
|
|
Expect(err).ToNot(HaveOccurred())
|
|
defer conn2.Close()
|
|
|
|
// Acquire on conn1
|
|
var acquired bool
|
|
err = conn1.QueryRowContext(context.Background(),
|
|
"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&acquired)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(acquired).To(BeTrue())
|
|
|
|
// conn2 should NOT be able to acquire the same lock
|
|
var otherAcquired bool
|
|
err = conn2.QueryRowContext(context.Background(),
|
|
"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&otherAcquired)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(otherAcquired).To(BeFalse())
|
|
|
|
// Release on conn1
|
|
conn1.ExecContext(context.Background(), "SELECT pg_advisory_unlock($1)", int64(43))
|
|
|
|
// Now conn2 should be able to acquire
|
|
err = conn2.QueryRowContext(context.Background(),
|
|
"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&otherAcquired)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(otherAcquired).To(BeTrue())
|
|
|
|
// Clean up
|
|
conn2.ExecContext(context.Background(), "SELECT pg_advisory_unlock($1)", int64(43))
|
|
})
|
|
|
|
It("should support WithLockCtx for scoped locking", func() {
|
|
executed := false
|
|
err := advisorylock.WithLockCtx(context.Background(), db, 44, func() error {
|
|
executed = true
|
|
return nil
|
|
})
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(executed).To(BeTrue())
|
|
})
|
|
})
|
|
})
|