Mirror of https://github.com/mudler/LocalAI.git (synced 2026-05-17 04:56:52 -04:00)
* fix(nodes/health): skip stale-marking already-offline nodes

  The health monitor re-emitted "Node heartbeat stale" + "Marking stale node offline" + MarkOffline on every cycle for nodes that were already in the offline (or unhealthy) state. For an operator-stopped node this flooded the logs with the same WARN+INFO pair every check interval.

  Skip the staleness branch when the node is already StatusOffline / StatusUnhealthy — the state is already what we'd write, so neither the log lines nor the DB update carry information.

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(worker): wait for backend gRPC bind before replying to backend.install

  The backend supervisor used to wait up to 4s (20 × 200ms) for the backend's gRPC server to answer a HealthCheck, then log a warning and reply Success with the bind address anyway. On slower nodes (a Jetson Orin doing first-boot CUDA init, large CGO library load) the gRPC listener wasn't up yet, so the frontend's first LoadModel dial returned "connect: connection refused" and the operator chased a phantom network issue instead of a startup-timing one.

  Two changes:

  - Bump the readiness window to 30s. CUDA init on Orin/Thor first boot measures in seconds, not milliseconds.
  - On deadline-exceeded, stop the half-started process, recycle the port, and return an error with the backend's stderr tail. The frontend now gets a real failure with diagnostic context instead of a misleading ECONNREFUSED on a downstream dial.

  Process death during the wait window keeps its existing fast-fail path.

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(distributed): route auto-upgrade through BackendManager + bump LocalAGI/LocalRecall

  Two distributed-mode bugs that surfaced together in the orchestrator logs:

  1. Auto-upgrade always failed with "backend not found". UpgradeChecker correctly routed CheckUpgrades through the active BackendManager (so the frontend aggregates worker state), but the auto-upgrade branch right below called gallery.UpgradeBackend directly with the frontend's SystemState. In distributed mode the frontend has no backends installed locally, so ListSystemBackends returned empty and Get(name) failed for every reported upgrade. Auto-upgrade now also goes through BackendManager.UpgradeBackend, which fans out to workers via NATS.

  2. Embedding-load failure on a remote node crashed the orchestrator. When RAG init lazily called NewPersistentPostgresCollection and the remote embedding worker was unreachable, LocalRecall called os.Exit(1) inside the constructor, killing the orchestrator pod. LocalRecall now returns errors instead, LocalAGI surfaces them as a nil collection, and the existing RAGProviderFromState path returns (nil, nil, false) — the same code path the agent pool already takes when no RAG is configured. The orchestrator stays up; chat requests degrade to "no RAG available" until the embedding worker recovers.

  Bumps:
  - github.com/mudler/LocalAGI → e83bf515d010
  - github.com/mudler/localrecall → 6138c1f535ab

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
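The second fix above describes the readiness wait but does not show it; the following is a minimal sketch of that pattern, not the actual supervisor code. All identifiers (waitReady, healthCheck, kill, stderrTail) are hypothetical stand-ins for the supervisor's internals, and the port-recycling step mentioned in the message is omitted.

// Hypothetical sketch of the readiness wait described above; not the real
// LocalAI supervisor code. healthCheck stands in for the backend's gRPC
// HealthCheck call; kill and stderrTail stand in for process handling.
package supervisor

import (
	"context"
	"fmt"
	"time"
)

func waitReady(ctx context.Context, healthCheck func(context.Context) error,
	kill func() error, stderrTail func() string) error {
	deadline := time.After(30 * time.Second) // bumped from ~4s (20 × 200ms)
	tick := time.NewTicker(200 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline:
			// Deadline exceeded: stop the half-started process and return a
			// real error carrying the backend's stderr tail, instead of
			// replying Success with an address nothing is listening on yet.
			_ = kill()
			return fmt.Errorf("backend gRPC server not ready after 30s: %s", stderrTail())
		case <-tick.C:
			if healthCheck(ctx) == nil {
				return nil // listener is up; safe to reply with the bind address
			}
		}
	}
}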
246 lines · 7.4 KiB · Go
package application

import (
	"context"
	"sync"
	"time"

	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/advisorylock"
	"github.com/mudler/LocalAI/core/services/galleryop"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/LocalAI/pkg/system"
	"github.com/mudler/xlog"
	"gorm.io/gorm"
)

// UpgradeChecker periodically checks for backend upgrades and optionally
// auto-upgrades them. It caches the last check results for API queries.
//
// In standalone mode it runs a simple ticker loop.
// In distributed mode it uses a PostgreSQL advisory lock so that only one
// frontend instance performs periodic checks and auto-upgrades at a time.
type UpgradeChecker struct {
	appConfig   *config.ApplicationConfig
	modelLoader *model.ModelLoader
	galleries   []config.Gallery
	systemState *system.SystemState
	db          *gorm.DB // non-nil in distributed mode
	// backendManagerFn lazily returns the current backend manager (may be
	// swapped from Local to Distributed after startup). Pulled through each
	// check so the UpgradeChecker uses whichever is active. In distributed
	// mode this ensures CheckUpgrades asks workers instead of the (empty)
	// frontend filesystem — fixing the bug where upgrades never surfaced.
	backendManagerFn func() galleryop.BackendManager

	checkInterval time.Duration
	stop          chan struct{}
	done          chan struct{}
	triggerCh     chan struct{}

	mu            sync.RWMutex
	lastUpgrades  map[string]gallery.UpgradeInfo
	lastCheckTime time.Time
}

// NewUpgradeChecker creates a new UpgradeChecker service.
// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
// (uses advisory locks so only one instance runs periodic checks).
// backendManagerFn is optional; when set, CheckUpgrades is routed through
// the active backend manager — required in distributed mode so the check
// aggregates from workers rather than the empty frontend filesystem.
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker {
	return &UpgradeChecker{
		appConfig:        appConfig,
		modelLoader:      ml,
		galleries:        appConfig.BackendGalleries,
		systemState:      appConfig.SystemState,
		db:               db,
		backendManagerFn: backendManagerFn,
		checkInterval:    6 * time.Hour,
		stop:             make(chan struct{}),
		done:             make(chan struct{}),
		triggerCh:        make(chan struct{}, 1),
		lastUpgrades:     make(map[string]gallery.UpgradeInfo),
	}
}

// Run starts the upgrade checker loop. It waits 10 seconds after startup,
// performs an initial check, then re-checks every 6 hours.
//
// In distributed mode, periodic checks are guarded by a PostgreSQL advisory
// lock so only one frontend instance runs them. On-demand triggers (TriggerCheck)
// and the initial check always run locally for fast API response cache warming.
func (uc *UpgradeChecker) Run(ctx context.Context) {
	defer close(uc.done)

	// Initial delay: don't slow down startup. Short enough that operators
	// don't stare at an empty upgrade banner for long; long enough that
	// workers have registered and reported their installed backends.
	initialDelay := 10 * time.Second
	select {
	case <-ctx.Done():
		return
	case <-uc.stop:
		return
	case <-time.After(initialDelay):
	}

	// First check always runs locally (to warm the cache on this instance)
	uc.runCheck(ctx)

	if uc.db != nil {
		// Distributed mode: use advisory lock for periodic checks.
		// RunLeaderLoop ticks every checkInterval; only the lock holder executes.
		go advisorylock.RunLeaderLoop(ctx, uc.db, advisorylock.KeyBackendUpgradeCheck, uc.checkInterval, func() {
			uc.runCheck(ctx)
		})

		// Still listen for on-demand triggers (from API / settings change)
		// and stop signal — these run on every instance.
		for {
			select {
			case <-ctx.Done():
				return
			case <-uc.stop:
				return
			case <-uc.triggerCh:
				uc.runCheck(ctx)
			}
		}
	} else {
		// Standalone mode: simple ticker loop
		ticker := time.NewTicker(uc.checkInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-uc.stop:
				return
			case <-ticker.C:
				uc.runCheck(ctx)
			case <-uc.triggerCh:
				uc.runCheck(ctx)
			}
		}
	}
}

// Shutdown stops the upgrade checker loop.
func (uc *UpgradeChecker) Shutdown() {
	close(uc.stop)
	<-uc.done
}

// TriggerCheck forces an immediate upgrade check on this instance.
func (uc *UpgradeChecker) TriggerCheck() {
	select {
	case uc.triggerCh <- struct{}{}:
	default:
		// Already triggered, skip
	}
}

// GetAvailableUpgrades returns the cached upgrade check results.
func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo {
	uc.mu.RLock()
	defer uc.mu.RUnlock()

	// Return a copy to avoid races
	result := make(map[string]gallery.UpgradeInfo, len(uc.lastUpgrades))
	for k, v := range uc.lastUpgrades {
		result[k] = v
	}
	return result
}

func (uc *UpgradeChecker) runCheck(ctx context.Context) {
	var (
		upgrades map[string]gallery.UpgradeInfo
		err      error
	)
	if uc.backendManagerFn != nil {
		if bm := uc.backendManagerFn(); bm != nil {
			upgrades, err = bm.CheckUpgrades(ctx)
		}
	}
	if upgrades == nil && err == nil {
		upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
	}

	uc.mu.Lock()
	uc.lastCheckTime = time.Now()
	if err != nil {
		xlog.Debug("Backend upgrade check failed", "error", err)
		uc.mu.Unlock()
		return
	}
	uc.lastUpgrades = upgrades
	uc.mu.Unlock()

	if len(upgrades) == 0 {
		xlog.Debug("All backends up to date")
		return
	}

	// Log available upgrades
	for name, info := range upgrades {
		if info.AvailableVersion != "" {
			xlog.Info("Backend upgrade available",
				"backend", name,
				"installed", info.InstalledVersion,
				"available", info.AvailableVersion)
		} else {
			xlog.Info("Backend upgrade available (new build)",
				"backend", name)
		}
	}

	// Auto-upgrade if enabled. Route through the active BackendManager so
	// distributed-mode upgrades fan out to workers via NATS — calling
	// gallery.UpgradeBackend directly would look up the backend on the
	// frontend filesystem, which is empty in distributed mode and produces
	// "backend not found" while the cluster still reports an upgrade.
	if uc.appConfig.AutoUpgradeBackends {
		var bm galleryop.BackendManager
		if uc.backendManagerFn != nil {
			bm = uc.backendManagerFn()
		}
		for name, info := range upgrades {
			xlog.Info("Auto-upgrading backend", "backend", name,
				"from", info.InstalledVersion, "to", info.AvailableVersion)
			var err error
			if bm != nil {
				err = bm.UpgradeBackend(ctx, name, nil)
			} else {
				err = gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader,
					uc.galleries, name, nil)
			}
			if err != nil {
				xlog.Error("Failed to auto-upgrade backend",
					"backend", name, "error", err)
			} else {
				xlog.Info("Backend upgraded successfully", "backend", name,
					"version", info.AvailableVersion)
			}
		}
		// Re-check to update cache after upgrades. Route through the same
		// BackendManager so distributed mode reflects the worker view.
		var freshUpgrades map[string]gallery.UpgradeInfo
		var freshErr error
		if bm != nil {
			freshUpgrades, freshErr = bm.CheckUpgrades(ctx)
		} else {
			freshUpgrades, freshErr = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
		}
		if freshErr == nil {
			uc.mu.Lock()
			uc.lastUpgrades = freshUpgrades
			uc.mu.Unlock()
		}
	}
}
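For reference, a minimal wiring sketch of the API defined above, written as if it lived in the same package; it is not part of the actual LocalAI startup code, and appConfig/ml are assumed to be constructed elsewhere. The nil db and nil backendManagerFn select standalone mode per the constructor doc.

// Hypothetical usage sketch of UpgradeChecker (not real LocalAI wiring).
func startUpgradeChecker(ctx context.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader) func() {
	checker := NewUpgradeChecker(appConfig, ml, nil, nil) // nil db => standalone mode
	go checker.Run(ctx)

	// Callers can force an immediate re-check, e.g. after a settings change.
	checker.TriggerCheck()

	// Cached results stay empty until the first check completes.
	_ = checker.GetAvailableUpgrades()

	// The returned stop func halts the loop and waits for it to exit.
	return checker.Shutdown
}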