// Package distributed_test contains Phase 0 ("Foundation") integration tests
// for distributed mode: application-config validation, the NATS messaging
// client, the filesystem ObjectStore adapter, and PostgreSQL advisory locks.
//
// These tests rely on the shared test infrastructure (SetupInfra) to provide
// live NATS and PostgreSQL endpoints.
package distributed_test

import (
	"bytes"
	"context"
	"io"
	"sync/atomic"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/services/advisorylock"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/services/storage"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	pgdriver "gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
)

var _ = Describe("Phase 0: Foundation", Label("Distributed"), func() {
	var infra *TestInfra

	BeforeEach(func() {
		infra = SetupInfra("localai_test")
	})

	Context("Distributed mode validation", func() {
		It("should reject --distributed without PostgreSQL configured", func() {
			appCfg := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithNatsURL(infra.NatsURL),
				// No auth/PostgreSQL configured
			)
			Expect(appCfg.Distributed.Enabled).To(BeTrue())
			// Auth not enabled → validation should fail
			Expect(appCfg.Auth.Enabled).To(BeFalse())
		})

		It("should reject --distributed without NATS configured", func() {
			appCfg := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithAuthEnabled(true),
				config.WithAuthDatabaseURL(infra.PGURL),
				// No NATS URL
			)
			Expect(appCfg.Distributed.NatsURL).To(BeEmpty())
		})

		It("should accept valid distributed configuration", func() {
			appCfg := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithAuthEnabled(true),
				config.WithAuthDatabaseURL(infra.PGURL),
				config.WithNatsURL(infra.NatsURL),
			)
			Expect(appCfg.Distributed.Enabled).To(BeTrue())
			Expect(appCfg.Auth.Enabled).To(BeTrue())
			Expect(appCfg.Distributed.NatsURL).To(Equal(infra.NatsURL))
		})

		It("should generate unique frontend ID on startup", func() {
			cfg1 := config.NewApplicationConfig(config.EnableDistributed)
			cfg2 := config.NewApplicationConfig(config.EnableDistributed)

			// IDs are empty until initDistributed generates them,
			// but if set via env, they should be preserved
			cfg3 := config.NewApplicationConfig(
				config.EnableDistributed,
				config.WithDistributedInstanceID("my-pod-1"),
			)
			Expect(cfg3.Distributed.InstanceID).To(Equal("my-pod-1"))

			// Default is empty — filled in at startup
			Expect(cfg1.Distributed.InstanceID).To(BeEmpty())
			Expect(cfg2.Distributed.InstanceID).To(BeEmpty())
		})

		It("should start in single-node mode without --distributed", func() {
			appCfg := config.NewApplicationConfig()
			Expect(appCfg.Distributed.Enabled).To(BeFalse())
		})
	})

	Context("NATS client", func() {
		It("should connect, publish, and subscribe", func() {
			client, err := messaging.New(infra.NatsURL)
			Expect(err).ToNot(HaveOccurred())
			defer client.Close()

			Expect(client.IsConnected()).To(BeTrue())

			received := make(chan []byte, 1)
			sub, err := client.Subscribe("test.subject", func(data []byte) {
				received <- data
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub.Unsubscribe()

			// Flush to ensure the subscription is registered server-side
			// before we publish.
			FlushNATS(client)

			err = client.Publish("test.subject", map[string]string{"msg": "hello"})
			Expect(err).ToNot(HaveOccurred())

			Eventually(received, "5s").Should(Receive())
		})

		It("should support queue subscriptions for load balancing", func() {
			client, err := messaging.New(infra.NatsURL)
			Expect(err).ToNot(HaveOccurred())
			defer client.Close()

			var worker1Count, worker2Count atomic.Int32

			sub1, err := client.QueueSubscribe("test.queue", "workers", func(data []byte) {
				worker1Count.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub1.Unsubscribe()

			sub2, err := client.QueueSubscribe("test.queue", "workers", func(data []byte) {
				worker2Count.Add(1)
			})
			Expect(err).ToNot(HaveOccurred())
			defer sub2.Unsubscribe()

			FlushNATS(client)

			// Publish multiple messages
			for i := range 10 {
				err = client.Publish("test.queue", map[string]int{"n": i})
				Expect(err).ToNot(HaveOccurred())
			}

			// Wait for all messages to be processed across both workers.
			Eventually(func() int32 {
				return worker1Count.Load() + worker2Count.Load()
			}, "5s").Should(Equal(int32(10)))

			// NOTE: queue groups deliver each message to exactly one member,
			// so the total is exactly 10 with no duplicates. With only 10
			// messages the per-worker split may be uneven (a worker may even
			// receive zero), so we deliberately assert only the total here.
			Expect(worker1Count.Load() + worker2Count.Load()).To(Equal(int32(10)))
		})

		It("should reconnect after disconnect", func() {
			client, err := messaging.New(infra.NatsURL)
			Expect(err).ToNot(HaveOccurred())
			defer client.Close()

			Expect(client.IsConnected()).To(BeTrue())

			// The reconnect behavior is tested implicitly by the RetryOnFailedConnect option.
			// A full reconnect test would require stopping/restarting the NATS container.
		})
	})

	Context("ObjectStore filesystem adapter", func() {
		var store *storage.FilesystemStore

		BeforeEach(func() {
			var err error
			store, err = storage.NewFilesystemStore(GinkgoT().TempDir())
			Expect(err).ToNot(HaveOccurred())
		})

		It("should Put/Get/Delete", func() {
			ctx := context.Background()

			// Put
			data := []byte("hello world")
			err := store.Put(ctx, "test/file.txt", bytes.NewReader(data))
			Expect(err).ToNot(HaveOccurred())

			// Exists
			exists, err := store.Exists(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			Expect(exists).To(BeTrue())

			// Get
			r, err := store.Get(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			got, err := io.ReadAll(r)
			r.Close()
			Expect(err).ToNot(HaveOccurred())
			Expect(string(got)).To(Equal("hello world"))

			// List
			keys, err := store.List(ctx, "test")
			Expect(err).ToNot(HaveOccurred())
			Expect(keys).To(ContainElement("test/file.txt"))

			// Delete
			err = store.Delete(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			exists, err = store.Exists(ctx, "test/file.txt")
			Expect(err).ToNot(HaveOccurred())
			Expect(exists).To(BeFalse())
		})
	})

	Context("Advisory locks", func() {
		var db *gorm.DB

		BeforeEach(func() {
			var err error
			db, err = gorm.Open(pgdriver.Open(infra.PGURL), &gorm.Config{
				Logger: logger.Default.LogMode(logger.Silent),
			})
			Expect(err).ToNot(HaveOccurred())
		})

		It("should acquire and release advisory lock", func() {
			executed := false
			acquired, err := advisorylock.TryWithLockCtx(context.Background(), db, 42, func() error {
				executed = true
				return nil
			})
			Expect(err).ToNot(HaveOccurred())
			Expect(acquired).To(BeTrue())
			Expect(executed).To(BeTrue())
		})

		It("should prevent concurrent acquisition", func() {
			// Session-level advisory locks are held per connection, so use two
			// dedicated sql.Conn to guarantee two distinct sessions.
			sqlDB, err := db.DB()
			Expect(err).ToNot(HaveOccurred())

			conn1, err := sqlDB.Conn(context.Background())
			Expect(err).ToNot(HaveOccurred())
			defer conn1.Close()

			conn2, err := sqlDB.Conn(context.Background())
			Expect(err).ToNot(HaveOccurred())
			defer conn2.Close()

			// Acquire on conn1
			var acquired bool
			err = conn1.QueryRowContext(context.Background(),
				"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&acquired)
			Expect(err).ToNot(HaveOccurred())
			Expect(acquired).To(BeTrue())

			// conn2 should NOT be able to acquire the same lock
			var otherAcquired bool
			err = conn2.QueryRowContext(context.Background(),
				"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&otherAcquired)
			Expect(err).ToNot(HaveOccurred())
			Expect(otherAcquired).To(BeFalse())

			// Release on conn1. Check the error: a failed unlock would make
			// the re-acquire assertion below meaningless.
			_, err = conn1.ExecContext(context.Background(),
				"SELECT pg_advisory_unlock($1)", int64(43))
			Expect(err).ToNot(HaveOccurred())

			// Now conn2 should be able to acquire
			err = conn2.QueryRowContext(context.Background(),
				"SELECT pg_try_advisory_lock($1)", int64(43)).Scan(&otherAcquired)
			Expect(err).ToNot(HaveOccurred())
			Expect(otherAcquired).To(BeTrue())

			// Clean up: release conn2's lock so later tests see a free lock ID.
			_, err = conn2.ExecContext(context.Background(),
				"SELECT pg_advisory_unlock($1)", int64(43))
			Expect(err).ToNot(HaveOccurred())
		})

		It("should support WithLockCtx for scoped locking", func() {
			executed := false
			err := advisorylock.WithLockCtx(context.Background(), db, 44, func() error {
				executed = true
				return nil
			})
			Expect(err).ToNot(HaveOccurred())
			Expect(executed).To(BeTrue())
		})
	})
})