diff --git a/core/application/application.go b/core/application/application.go
index 4af5ac9bb..9be613ab7 100644
--- a/core/application/application.go
+++ b/core/application/application.go
@@ -86,6 +86,15 @@ func (a *Application) UpgradeChecker() *UpgradeChecker {
 	return a.upgradeChecker
 }
 
+// distributedDB returns the PostgreSQL database for distributed coordination,
+// or nil in standalone mode.
+func (a *Application) distributedDB() *gorm.DB {
+	if a.distributed != nil {
+		return a.authDB
+	}
+	return nil
+}
+
 func (a *Application) AgentPoolService() *agentpool.AgentPoolService {
 	return a.agentPoolService.Load()
 }
diff --git a/core/application/startup.go b/core/application/startup.go
index d3aa9dcd0..a03f17bd2 100644
--- a/core/application/startup.go
+++ b/core/application/startup.go
@@ -231,9 +231,11 @@ func New(opts ...config.AppOption) (*Application, error) {
 		xlog.Error("error registering external backends", "error", err)
 	}
 
-	// Start background upgrade checker for backends
+	// Start background upgrade checker for backends.
+	// In distributed mode, uses PostgreSQL advisory lock so only one frontend
+	// instance runs periodic checks (avoids duplicate upgrades across replicas).
 	if len(options.BackendGalleries) > 0 {
-		uc := NewUpgradeChecker(options, application.ModelLoader())
+		uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB())
 		application.upgradeChecker = uc
 		go uc.Run(options.Context)
 	}
diff --git a/core/application/upgrade_checker.go b/core/application/upgrade_checker.go
index 774b15fa7..94fb3f6c7 100644
--- a/core/application/upgrade_checker.go
+++ b/core/application/upgrade_checker.go
@@ -7,18 +7,25 @@ import (
 
 	"github.com/mudler/LocalAI/core/config"
 	"github.com/mudler/LocalAI/core/gallery"
+	"github.com/mudler/LocalAI/core/services/advisorylock"
 	"github.com/mudler/LocalAI/pkg/model"
 	"github.com/mudler/LocalAI/pkg/system"
 	"github.com/mudler/xlog"
+	"gorm.io/gorm"
 )
 
 // UpgradeChecker periodically checks for backend upgrades and optionally
 // auto-upgrades them. It caches the last check results for API queries.
+//
+// In standalone mode it runs a simple ticker loop.
+// In distributed mode it uses a PostgreSQL advisory lock so that only one
+// frontend instance performs periodic checks and auto-upgrades at a time.
 type UpgradeChecker struct {
 	appConfig   *config.ApplicationConfig
 	modelLoader *model.ModelLoader
 	galleries   []config.Gallery
 	systemState *system.SystemState
+	db          *gorm.DB // non-nil in distributed mode
 
 	checkInterval time.Duration
 	stop          chan struct{}
@@ -31,12 +38,15 @@ type UpgradeChecker struct {
 }
 
 // NewUpgradeChecker creates a new UpgradeChecker service.
-func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader) *UpgradeChecker {
+// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
+// (uses advisory locks so only one instance runs periodic checks).
+func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker {
 	return &UpgradeChecker{
 		appConfig:     appConfig,
 		modelLoader:   ml,
 		galleries:     appConfig.BackendGalleries,
 		systemState:   appConfig.SystemState,
+		db:            db,
 		checkInterval: 6 * time.Hour,
 		stop:          make(chan struct{}),
 		done:          make(chan struct{}),
@@ -47,6 +57,10 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade
 
 // Run starts the upgrade checker loop. It waits 30 seconds after startup,
 // performs an initial check, then re-checks every 6 hours.
+//
+// In distributed mode, periodic checks are guarded by a PostgreSQL advisory
+// lock so only one frontend instance runs them. On-demand triggers (TriggerCheck)
+// and the initial check always run locally for fast API response cache warming.
 func (uc *UpgradeChecker) Run(ctx context.Context) {
 	defer close(uc.done)
 
@@ -59,23 +73,48 @@ func (uc *UpgradeChecker) Run(ctx context.Context) {
 	case <-time.After(30 * time.Second):
 	}
 
-	// First check
+	// First check always runs locally (to warm the cache on this instance)
 	uc.runCheck(ctx)
 
-	// Periodic loop
-	ticker := time.NewTicker(uc.checkInterval)
-	defer ticker.Stop()
+	if uc.db != nil {
+		// Distributed mode: use advisory lock for periodic checks.
+		// RunLeaderLoop ticks every checkInterval; only the lock holder executes.
+		// The loop gets its own cancelable context so that returning from Run on
+		// uc.stop (which does not cancel ctx) also ends the periodic checks.
+		leaderCtx, cancelLeader := context.WithCancel(ctx)
+		defer cancelLeader()
+		go advisorylock.RunLeaderLoop(leaderCtx, uc.db, advisorylock.KeyBackendUpgradeCheck, uc.checkInterval, func() {
+			uc.runCheck(ctx)
+		})
 
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-uc.stop:
-			return
-		case <-ticker.C:
-			uc.runCheck(ctx)
-		case <-uc.triggerCh:
-			uc.runCheck(ctx)
+		// Still listen for on-demand triggers (from API / settings change)
+		// and stop signal — these run on every instance.
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-uc.stop:
+				return
+			case <-uc.triggerCh:
+				uc.runCheck(ctx)
+			}
+		}
+	} else {
+		// Standalone mode: simple ticker loop
+		ticker := time.NewTicker(uc.checkInterval)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-uc.stop:
+				return
+			case <-ticker.C:
+				uc.runCheck(ctx)
+			case <-uc.triggerCh:
+				uc.runCheck(ctx)
+			}
 		}
 	}
 }
@@ -86,7 +125,7 @@ func (uc *UpgradeChecker) Shutdown() {
 	<-uc.done
 }
 
-// TriggerCheck forces an immediate upgrade check.
+// TriggerCheck forces an immediate upgrade check on this instance.
 func (uc *UpgradeChecker) TriggerCheck() {
 	select {
 	case uc.triggerCh <- struct{}{}:
diff --git a/core/services/advisorylock/keys.go b/core/services/advisorylock/keys.go
index ebca65932..d5378a5d1 100644
--- a/core/services/advisorylock/keys.go
+++ b/core/services/advisorylock/keys.go
@@ -9,5 +9,6 @@ const (
 	KeyGalleryDedup   int64 = 102
 	KeyAgentScheduler int64 = 103
 	KeyHealthCheck    int64 = 104
-	KeySchemaMigrate  int64 = 105
+	KeySchemaMigrate       int64 = 105
+	KeyBackendUpgradeCheck int64 = 106
 )