mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-16 12:59:33 -04:00
fix: use advisory lock for upgrade checker in distributed mode
In distributed mode with multiple frontend instances, use PostgreSQL advisory lock (KeyBackendUpgradeCheck) so only one instance runs periodic upgrade checks and auto-upgrades. Prevents duplicate upgrade operations across replicas. Standalone mode is unchanged (simple ticker loop).
This commit is contained in:
@@ -86,6 +86,15 @@ func (a *Application) UpgradeChecker() *UpgradeChecker {
|
||||
return a.upgradeChecker
|
||||
}
|
||||
|
||||
// distributedDB returns the PostgreSQL database for distributed coordination,
|
||||
// or nil in standalone mode.
|
||||
func (a *Application) distributedDB() *gorm.DB {
|
||||
if a.distributed != nil {
|
||||
return a.authDB
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Application) AgentPoolService() *agentpool.AgentPoolService {
|
||||
return a.agentPoolService.Load()
|
||||
}
|
||||
|
||||
@@ -231,9 +231,11 @@ func New(opts ...config.AppOption) (*Application, error) {
|
||||
xlog.Error("error registering external backends", "error", err)
|
||||
}
|
||||
|
||||
// Start background upgrade checker for backends
|
||||
// Start background upgrade checker for backends.
|
||||
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
|
||||
// instance runs periodic checks (avoids duplicate upgrades across replicas).
|
||||
if len(options.BackendGalleries) > 0 {
|
||||
uc := NewUpgradeChecker(options, application.ModelLoader())
|
||||
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB())
|
||||
application.upgradeChecker = uc
|
||||
go uc.Run(options.Context)
|
||||
}
|
||||
|
||||
@@ -7,18 +7,25 @@ import (
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/gallery"
|
||||
"github.com/mudler/LocalAI/core/services/advisorylock"
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
"github.com/mudler/xlog"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// UpgradeChecker periodically checks for backend upgrades and optionally
|
||||
// auto-upgrades them. It caches the last check results for API queries.
|
||||
//
|
||||
// In standalone mode it runs a simple ticker loop.
|
||||
// In distributed mode it uses a PostgreSQL advisory lock so that only one
|
||||
// frontend instance performs periodic checks and auto-upgrades at a time.
|
||||
type UpgradeChecker struct {
|
||||
appConfig *config.ApplicationConfig
|
||||
modelLoader *model.ModelLoader
|
||||
galleries []config.Gallery
|
||||
systemState *system.SystemState
|
||||
db *gorm.DB // non-nil in distributed mode
|
||||
|
||||
checkInterval time.Duration
|
||||
stop chan struct{}
|
||||
@@ -31,12 +38,15 @@ type UpgradeChecker struct {
|
||||
}
|
||||
|
||||
// NewUpgradeChecker creates a new UpgradeChecker service.
|
||||
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader) *UpgradeChecker {
|
||||
// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
|
||||
// (uses advisory locks so only one instance runs periodic checks).
|
||||
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker {
|
||||
return &UpgradeChecker{
|
||||
appConfig: appConfig,
|
||||
modelLoader: ml,
|
||||
galleries: appConfig.BackendGalleries,
|
||||
systemState: appConfig.SystemState,
|
||||
db: db,
|
||||
checkInterval: 6 * time.Hour,
|
||||
stop: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
@@ -47,6 +57,10 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade
|
||||
|
||||
// Run starts the upgrade checker loop. It waits 30 seconds after startup,
|
||||
// performs an initial check, then re-checks every 6 hours.
|
||||
//
|
||||
// In distributed mode, periodic checks are guarded by a PostgreSQL advisory
|
||||
// lock so only one frontend instance runs them. On-demand triggers (TriggerCheck)
|
||||
// and the initial check always run locally for fast API response cache warming.
|
||||
func (uc *UpgradeChecker) Run(ctx context.Context) {
|
||||
defer close(uc.done)
|
||||
|
||||
@@ -59,23 +73,44 @@ func (uc *UpgradeChecker) Run(ctx context.Context) {
|
||||
case <-time.After(30 * time.Second):
|
||||
}
|
||||
|
||||
// First check
|
||||
// First check always runs locally (to warm the cache on this instance)
|
||||
uc.runCheck(ctx)
|
||||
|
||||
// Periodic loop
|
||||
ticker := time.NewTicker(uc.checkInterval)
|
||||
defer ticker.Stop()
|
||||
if uc.db != nil {
|
||||
// Distributed mode: use advisory lock for periodic checks.
|
||||
// RunLeaderLoop ticks every checkInterval; only the lock holder executes.
|
||||
go advisorylock.RunLeaderLoop(ctx, uc.db, advisorylock.KeyBackendUpgradeCheck, uc.checkInterval, func() {
|
||||
uc.runCheck(ctx)
|
||||
})
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-uc.stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
uc.runCheck(ctx)
|
||||
case <-uc.triggerCh:
|
||||
uc.runCheck(ctx)
|
||||
// Still listen for on-demand triggers (from API / settings change)
|
||||
// and stop signal — these run on every instance.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-uc.stop:
|
||||
return
|
||||
case <-uc.triggerCh:
|
||||
uc.runCheck(ctx)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Standalone mode: simple ticker loop
|
||||
ticker := time.NewTicker(uc.checkInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-uc.stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
uc.runCheck(ctx)
|
||||
case <-uc.triggerCh:
|
||||
uc.runCheck(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -86,7 +121,7 @@ func (uc *UpgradeChecker) Shutdown() {
|
||||
<-uc.done
|
||||
}
|
||||
|
||||
// TriggerCheck forces an immediate upgrade check.
|
||||
// TriggerCheck forces an immediate upgrade check on this instance.
|
||||
func (uc *UpgradeChecker) TriggerCheck() {
|
||||
select {
|
||||
case uc.triggerCh <- struct{}{}:
|
||||
|
||||
@@ -9,5 +9,6 @@ const (
|
||||
KeyGalleryDedup int64 = 102
|
||||
KeyAgentScheduler int64 = 103
|
||||
KeyHealthCheck int64 = 104
|
||||
KeySchemaMigrate int64 = 105
|
||||
KeySchemaMigrate int64 = 105
|
||||
KeyBackendUpgradeCheck int64 = 106
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user