diff --git a/core/http/auth/db_sqlite.go b/core/http/auth/db_sqlite.go index 5c13ecf05..eecabe4a5 100644 --- a/core/http/auth/db_sqlite.go +++ b/core/http/auth/db_sqlite.go @@ -3,10 +3,51 @@ package auth import ( + "net/url" + "strings" + "gorm.io/driver/sqlite" "gorm.io/gorm" ) func openSQLiteDialector(path string) (gorm.Dialector, error) { - return sqlite.Open(path), nil + return sqlite.Open(buildSQLiteDSN(path)), nil +} + +// buildSQLiteDSN augments a SQLite file path with connection pragmas that make +// the auth DB resilient on slow or contended storage. +// +// - _busy_timeout=5000 makes SQLite retry for up to 5s on SQLITE_BUSY instead +// of failing immediately. Network-backed storage (SMB/CIFS/NFS, e.g. Azure +// Files) is prone to transient lock contention during migration (see #10506). +// - _txlock=immediate takes the write lock at BEGIN, avoiding deadlocks when a +// read transaction later upgrades to a write during AutoMigrate. +// +// We deliberately do NOT set WAL journal mode: WAL relies on a shared-memory +// mmap that does not work over SMB/NFS, which is exactly the failing case here. +// +// Caller-supplied values for either pragma are preserved. +func buildSQLiteDSN(path string) string { + base := path + rawQuery := "" + if i := strings.IndexByte(path, '?'); i >= 0 { + base = path[:i] + rawQuery = path[i+1:] + } + + values, err := url.ParseQuery(rawQuery) + if err != nil { + // An unparseable query string means a hand-crafted DSN we should not + // risk corrupting; leave it untouched. + return path + } + + if values.Get("_busy_timeout") == "" { + values.Set("_busy_timeout", "5000") + } + if values.Get("_txlock") == "" { + values.Set("_txlock", "immediate") + } + + return base + "?" + values.Encode() } diff --git a/core/http/auth/db_sqlite_test.go b/core/http/auth/db_sqlite_test.go new file mode 100644 index 000000000..f1dc4e404 --- /dev/null +++ b/core/http/auth/db_sqlite_test.go @@ -0,0 +1,57 @@ +//go:build auth + +package auth + +import ( + "net/url" + "strings" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// parseDSN splits a "base?query" DSN into its base and decoded query values so +// assertions don't depend on url.Values.Encode()'s key ordering. +func parseDSN(dsn string) (string, url.Values) { + base := dsn + rawQuery := "" + if i := strings.IndexByte(dsn, '?'); i >= 0 { + base = dsn[:i] + rawQuery = dsn[i+1:] + } + values, err := url.ParseQuery(rawQuery) + Expect(err).ToNot(HaveOccurred()) + return base, values +} + +var _ = Describe("buildSQLiteDSN", func() { + It("adds busy_timeout and txlock to a plain file path", func() { + base, values := parseDSN(buildSQLiteDSN("/data/database.db")) + Expect(base).To(Equal("/data/database.db")) + Expect(values.Get("_busy_timeout")).To(Equal("5000")) + Expect(values.Get("_txlock")).To(Equal("immediate")) + }) + + It("adds pragmas to an in-memory database", func() { + base, values := parseDSN(buildSQLiteDSN(":memory:")) + Expect(base).To(Equal(":memory:")) + Expect(values.Get("_busy_timeout")).To(Equal("5000")) + Expect(values.Get("_txlock")).To(Equal("immediate")) + }) + + It("preserves an existing query string", func() { + base, values := parseDSN(buildSQLiteDSN("/data/database.db?cache=shared")) + Expect(base).To(Equal("/data/database.db")) + Expect(values.Get("cache")).To(Equal("shared")) + Expect(values.Get("_busy_timeout")).To(Equal("5000")) + Expect(values.Get("_txlock")).To(Equal("immediate")) + }) + + It("does not override a caller-supplied busy_timeout or txlock", func() { + _, values := parseDSN(buildSQLiteDSN("/data/database.db?_busy_timeout=1000&_txlock=deferred")) + Expect(values["_busy_timeout"]).To(HaveLen(1), "_busy_timeout should not be duplicated") + Expect(values.Get("_busy_timeout")).To(Equal("1000")) + Expect(values["_txlock"]).To(HaveLen(1), "_txlock should not be duplicated") + Expect(values.Get("_txlock")).To(Equal("deferred")) + }) +}) diff --git a/core/services/advisorylock/advisorylock.go b/core/services/advisorylock/advisorylock.go index 6cc0afb2a..f51a6357e 100644 --- a/core/services/advisorylock/advisorylock.go +++ b/core/services/advisorylock/advisorylock.go @@ -4,14 +4,59 @@ import ( "context" "fmt" "hash/fnv" + "strings" + "sync" "gorm.io/gorm" ) -// TryWithLockCtx attempts to acquire a PostgreSQL advisory lock using the provided context. -// Returns (true, nil) if the lock was acquired and fn executed, (false, nil) if the lock -// was already held, or (false, error) on failure. +// localLocks holds one buffered channel (capacity 1) per lock key, used as an +// in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is +// effectively single-process, so serializing guarded sections within this +// process is sufficient - we cannot and need not coordinate across processes +// the way a PostgreSQL advisory lock does. +var ( + localLocksMu sync.Mutex + localLocks = map[int64]chan struct{}{} +) + +// localLockChan returns the per-key buffered channel, creating it on first use. +func localLockChan(key int64) chan struct{} { + localLocksMu.Lock() + defer localLocksMu.Unlock() + ch, ok := localLocks[key] + if !ok { + ch = make(chan struct{}, 1) + localLocks[key] = ch + } + return ch +} + +// isPostgres reports whether the gorm dialect is PostgreSQL. Anything else +// (SQLite and any non-postgres dialect) uses the in-process fallback, because +// the pg_* advisory lock functions only exist on PostgreSQL. +func isPostgres(db *gorm.DB) bool { + return strings.Contains(db.Dialector.Name(), "postgres") +} + +// TryWithLockCtx attempts to acquire a lock and run fn without blocking. +// Returns (true, nil) if the lock was acquired and fn executed, (false, nil) if +// the lock was already held, or (false, error) on failure. +// +// On PostgreSQL it uses pg_try_advisory_lock (cross-process). On other dialects +// (SQLite) it uses a non-blocking in-process lock keyed by key. func TryWithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) (bool, error) { + if !isPostgres(db) { + ch := localLockChan(key) + select { + case ch <- struct{}{}: + defer func() { <-ch }() + return true, fn() + default: + return false, nil + } + } + sqlDB, err := db.DB() if err != nil { return false, fmt.Errorf("get sql.DB: %w", err) @@ -50,9 +95,31 @@ func KeyFromString(s string) int64 { return int64(h.Sum64()>>1) | 0x100000000 } -// WithLockCtx is like WithLock but respects context cancellation. -// If ctx is cancelled while waiting for the lock, the function returns ctx.Err(). +// WithLockCtx acquires a lock for key, runs fn, then releases it, respecting +// context cancellation. If ctx is cancelled while waiting for the lock, the +// function returns ctx.Err(). +// +// On PostgreSQL it uses pg_advisory_lock (cross-process). On other dialects +// (SQLite) it falls back to a blocking in-process lock keyed by key, which is +// sufficient because a SQLite auth DB is effectively single-process. func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) error { + if !isPostgres(db) { + // Honor an already-cancelled context before attempting acquisition: + // select picks a ready case at random, so without this an already-free + // lock could be taken despite a cancelled ctx. + if err := ctx.Err(); err != nil { + return err + } + ch := localLockChan(key) + select { + case ch <- struct{}{}: + defer func() { <-ch }() + return fn() + case <-ctx.Done(): + return ctx.Err() + } + } + sqlDB, err := db.DB() if err != nil { return fmt.Errorf("advisorylock: getting sql.DB: %w", err) diff --git a/core/services/advisorylock/advisorylock_sqlite_test.go b/core/services/advisorylock/advisorylock_sqlite_test.go new file mode 100644 index 000000000..9e9b6abfd --- /dev/null +++ b/core/services/advisorylock/advisorylock_sqlite_test.go @@ -0,0 +1,129 @@ +package advisorylock + +import ( + "context" + "sync" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +// These specs run against an in-memory SQLite DB and therefore do NOT require +// Docker, unlike the PostgreSQL testcontainer specs. +var _ = Describe("AdvisoryLock (SQLite fallback)", Label("sqlite"), func() { + var db *gorm.DB + + BeforeEach(func() { + var err error + db, err = gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + Expect(err).ToNot(HaveOccurred()) + Expect(db.Dialector.Name()).To(ContainSubstring("sqlite")) + }) + + It("WithLockCtx executes fn and returns no error on SQLite", func() { + const lockKey int64 = 12001 + executed := false + + err := WithLockCtx(context.Background(), db, lockKey, func() error { + executed = true + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(executed).To(BeTrue(), "function should have run under the in-process lock") + }) + + It("WithLockCtx serializes concurrent goroutines on the same key", func() { + const lockKey int64 = 12002 + + var ( + mu sync.Mutex + maxRunning int32 + running int32 + concurrency int32 + ) + + var wg sync.WaitGroup + + for range 2 { + wg.Go(func() { + defer GinkgoRecover() + err := WithLockCtx(context.Background(), db, lockKey, func() error { + cur := atomic.AddInt32(&running, 1) + mu.Lock() + if cur > maxRunning { + maxRunning = cur + } + if cur > 1 { + atomic.AddInt32(&concurrency, 1) + } + mu.Unlock() + + time.Sleep(50 * time.Millisecond) + + atomic.AddInt32(&running, -1) + return nil + }) + Expect(err).ToNot(HaveOccurred()) + }) + } + + wg.Wait() + + Expect(maxRunning).To(BeNumerically("<=", 1), "expected max 1 goroutine inside lock at a time") + Expect(concurrency).To(BeZero(), "detected concurrent execution inside advisory lock") + }) + + It("WithLockCtx returns an error and does not run fn with an already-cancelled context", func() { + const lockKey int64 = 12003 + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := WithLockCtx(ctx, db, lockKey, func() error { + Fail("function should not run with a cancelled context") + return nil + }) + Expect(err).To(HaveOccurred()) + }) + + It("TryWithLockCtx returns (true, nil) when free and (false, nil) when held", func() { + const lockKey int64 = 12004 + + acquired, err := TryWithLockCtx(context.Background(), db, lockKey, func() error { + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(acquired).To(BeTrue(), "expected TryWithLockCtx to acquire the free lock") + + // Hold the lock in one goroutine while a concurrent TryWithLockCtx + // attempts to acquire the same key. + held := make(chan struct{}) + release := make(chan struct{}) + var wg sync.WaitGroup + wg.Go(func() { + defer GinkgoRecover() + ok, err := TryWithLockCtx(context.Background(), db, lockKey, func() error { + close(held) + <-release + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(ok).To(BeTrue()) + }) + + <-held + ok, err := TryWithLockCtx(context.Background(), db, lockKey, func() error { + Fail("function should not run while lock is held") + return nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(ok).To(BeFalse(), "expected TryWithLockCtx to fail to acquire a held lock") + + close(release) + wg.Wait() + }) +}) diff --git a/core/services/jobs/sqlite_e2e_test.go b/core/services/jobs/sqlite_e2e_test.go new file mode 100644 index 000000000..26cb5a669 --- /dev/null +++ b/core/services/jobs/sqlite_e2e_test.go @@ -0,0 +1,24 @@ +//go:build auth + +package jobs_test + +import ( + "github.com/mudler/LocalAI/core/http/auth" + "github.com/mudler/LocalAI/core/services/jobs" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// Reproduces the #10506 caller chain: auth.InitDB(sqlite) -> jobs.NewJobStore, +// which previously failed with "no such function: pg_advisory_lock". +var _ = Describe("NewJobStore on a SQLite auth DB (#10506)", func() { + It("migrates without pg_advisory_lock errors", func() { + db, err := auth.InitDB(":memory:") + Expect(err).ToNot(HaveOccurred()) + + store, err := jobs.NewJobStore(db) + Expect(err).ToNot(HaveOccurred()) + Expect(store).ToNot(BeNil()) + }) +}) diff --git a/docs/content/features/authentication.md b/docs/content/features/authentication.md index 35f3cc9ae..ffaa43b34 100644 --- a/docs/content/features/authentication.md +++ b/docs/content/features/authentication.md @@ -85,6 +85,8 @@ localai run | `LOCALAI_REGISTRATION_MODE` | `approval` | Registration mode: `open`, `approval`, or `invite` | | `LOCALAI_DISABLE_LOCAL_AUTH` | `false` | Disable local email/password registration and login (for OAuth/OIDC-only deployments) | +> **Note: network-backed storage.** File-based SQLite relies on POSIX file locking, which is unreliable over network filesystems (SMB/CIFS/NFS, e.g. Azure Files / Azure Container Apps shared volumes). On such storage the auth DB can fail to migrate with `database is locked`. Use PostgreSQL (`LOCALAI_AUTH_DATABASE_URL=postgres://...`) when the data directory lives on shared or network storage, or place `database.db` on a local volume. + ### Disabling Local Authentication If you want to enforce OAuth/OIDC-only login and prevent users from registering or logging in with email/password, set `LOCALAI_DISABLE_LOCAL_AUTH=true` (or pass `--disable-local-auth`):