From ee00a10836eed4004cbe5e5664886b1a706988e1 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sat, 11 Apr 2026 08:24:03 +0000 Subject: [PATCH] fix: use advisory lock for upgrade checker in distributed mode In distributed mode with multiple frontend instances, use PostgreSQL advisory lock (KeyBackendUpgradeCheck) so only one instance runs periodic upgrade checks and auto-upgrades. Prevents duplicate upgrade operations across replicas. Standalone mode is unchanged (simple ticker loop). --- core/application/application.go | 9 ++++ core/application/startup.go | 6 ++- core/application/upgrade_checker.go | 67 ++++++++++++++++++++++------- core/services/advisorylock/keys.go | 3 +- 4 files changed, 66 insertions(+), 19 deletions(-) diff --git a/core/application/application.go b/core/application/application.go index 4af5ac9bb..9be613ab7 100644 --- a/core/application/application.go +++ b/core/application/application.go @@ -86,6 +86,15 @@ func (a *Application) UpgradeChecker() *UpgradeChecker { return a.upgradeChecker } +// distributedDB returns the PostgreSQL database for distributed coordination, +// or nil in standalone mode. +func (a *Application) distributedDB() *gorm.DB { + if a.distributed != nil { + return a.authDB + } + return nil +} + func (a *Application) AgentPoolService() *agentpool.AgentPoolService { return a.agentPoolService.Load() } diff --git a/core/application/startup.go b/core/application/startup.go index d3aa9dcd0..a03f17bd2 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -231,9 +231,11 @@ func New(opts ...config.AppOption) (*Application, error) { xlog.Error("error registering external backends", "error", err) } - // Start background upgrade checker for backends + // Start background upgrade checker for backends. + // In distributed mode, uses PostgreSQL advisory lock so only one frontend + // instance runs periodic checks (avoids duplicate upgrades across replicas). if len(options.BackendGalleries) > 0 { - uc := NewUpgradeChecker(options, application.ModelLoader()) + uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB()) application.upgradeChecker = uc go uc.Run(options.Context) } diff --git a/core/application/upgrade_checker.go b/core/application/upgrade_checker.go index 774b15fa7..94fb3f6c7 100644 --- a/core/application/upgrade_checker.go +++ b/core/application/upgrade_checker.go @@ -7,18 +7,25 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/services/advisorylock" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/xlog" + "gorm.io/gorm" ) // UpgradeChecker periodically checks for backend upgrades and optionally // auto-upgrades them. It caches the last check results for API queries. +// +// In standalone mode it runs a simple ticker loop. +// In distributed mode it uses a PostgreSQL advisory lock so that only one +// frontend instance performs periodic checks and auto-upgrades at a time. type UpgradeChecker struct { appConfig *config.ApplicationConfig modelLoader *model.ModelLoader galleries []config.Gallery systemState *system.SystemState + db *gorm.DB // non-nil in distributed mode checkInterval time.Duration stop chan struct{} @@ -31,12 +38,15 @@ type UpgradeChecker struct { } // NewUpgradeChecker creates a new UpgradeChecker service. -func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader) *UpgradeChecker { +// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode +// (uses advisory locks so only one instance runs periodic checks). +func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker { return &UpgradeChecker{ appConfig: appConfig, modelLoader: ml, galleries: appConfig.BackendGalleries, systemState: appConfig.SystemState, + db: db, checkInterval: 6 * time.Hour, stop: make(chan struct{}), done: make(chan struct{}), @@ -47,6 +57,10 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade // Run starts the upgrade checker loop. It waits 30 seconds after startup, // performs an initial check, then re-checks every 6 hours. +// +// In distributed mode, periodic checks are guarded by a PostgreSQL advisory +// lock so only one frontend instance runs them. On-demand triggers (TriggerCheck) +// and the initial check always run locally for fast API response cache warming. func (uc *UpgradeChecker) Run(ctx context.Context) { defer close(uc.done) @@ -59,23 +73,44 @@ func (uc *UpgradeChecker) Run(ctx context.Context) { case <-time.After(30 * time.Second): } - // First check + // First check always runs locally (to warm the cache on this instance) uc.runCheck(ctx) - // Periodic loop - ticker := time.NewTicker(uc.checkInterval) - defer ticker.Stop() + if uc.db != nil { + // Distributed mode: use advisory lock for periodic checks. + // RunLeaderLoop ticks every checkInterval; only the lock holder executes. + go advisorylock.RunLeaderLoop(ctx, uc.db, advisorylock.KeyBackendUpgradeCheck, uc.checkInterval, func() { + uc.runCheck(ctx) + }) - for { - select { - case <-ctx.Done(): - return - case <-uc.stop: - return - case <-ticker.C: - uc.runCheck(ctx) - case <-uc.triggerCh: - uc.runCheck(ctx) + // Still listen for on-demand triggers (from API / settings change) + // and stop signal — these run on every instance. + for { + select { + case <-ctx.Done(): + return + case <-uc.stop: + return + case <-uc.triggerCh: + uc.runCheck(ctx) + } + } + } else { + // Standalone mode: simple ticker loop + ticker := time.NewTicker(uc.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-uc.stop: + return + case <-ticker.C: + uc.runCheck(ctx) + case <-uc.triggerCh: + uc.runCheck(ctx) + } } } } @@ -86,7 +121,7 @@ func (uc *UpgradeChecker) Shutdown() { <-uc.done } -// TriggerCheck forces an immediate upgrade check. +// TriggerCheck forces an immediate upgrade check on this instance. func (uc *UpgradeChecker) TriggerCheck() { select { case uc.triggerCh <- struct{}{}: diff --git a/core/services/advisorylock/keys.go b/core/services/advisorylock/keys.go index ebca65932..d5378a5d1 100644 --- a/core/services/advisorylock/keys.go +++ b/core/services/advisorylock/keys.go @@ -9,5 +9,6 @@ const ( KeyGalleryDedup int64 = 102 KeyAgentScheduler int64 = 103 KeyHealthCheck int64 = 104 - KeySchemaMigrate int64 = 105 + KeySchemaMigrate int64 = 105 + KeyBackendUpgradeCheck int64 = 106 )