diff --git a/core/application/upgrade_checker.go b/core/application/upgrade_checker.go index 3b2d94544..6b8d69a2d 100644 --- a/core/application/upgrade_checker.go +++ b/core/application/upgrade_checker.go @@ -199,13 +199,27 @@ func (uc *UpgradeChecker) runCheck(ctx context.Context) { } } - // Auto-upgrade if enabled + // Auto-upgrade if enabled. Route through the active BackendManager so + // distributed-mode upgrades fan out to workers via NATS — calling + // gallery.UpgradeBackend directly would look up the backend on the + // frontend filesystem, which is empty in distributed mode and produces + // "backend not found" while the cluster still reports an upgrade. if uc.appConfig.AutoUpgradeBackends { + var bm galleryop.BackendManager + if uc.backendManagerFn != nil { + bm = uc.backendManagerFn() + } for name, info := range upgrades { xlog.Info("Auto-upgrading backend", "backend", name, "from", info.InstalledVersion, "to", info.AvailableVersion) - if err := gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader, - uc.galleries, name, nil); err != nil { + var err error + if bm != nil { + err = bm.UpgradeBackend(ctx, name, nil) + } else { + err = gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader, + uc.galleries, name, nil) + } + if err != nil { xlog.Error("Failed to auto-upgrade backend", "backend", name, "error", err) } else { @@ -213,8 +227,16 @@ func (uc *UpgradeChecker) runCheck(ctx context.Context) { "version", info.AvailableVersion) } } - // Re-check to update cache after upgrades - if freshUpgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState); err == nil { + // Re-check to update cache after upgrades. Route through the same + // BackendManager so distributed mode reflects the worker view. + var freshUpgrades map[string]gallery.UpgradeInfo + var freshErr error + if bm != nil { + freshUpgrades, freshErr = bm.CheckUpgrades(ctx) + } else { + freshUpgrades, freshErr = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState) + } + if freshErr == nil { uc.mu.Lock() uc.lastUpgrades = freshUpgrades uc.mu.Unlock() diff --git a/core/cli/worker.go b/core/cli/worker.go index 368143ae0..032e80a74 100644 --- a/core/cli/worker.go +++ b/core/cli/worker.go @@ -465,10 +465,20 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e bp := s.processes[backend] s.mu.Unlock() - // Wait for the gRPC server to be ready + // Wait for the gRPC server to be ready before reporting success. + // Slow nodes (Jetson Orin doing first-boot CUDA init, large CGO libs) + // can take 10-15s before the gRPC port accepts connections; the previous + // 4s window made the worker reply Success on a not-yet-listening port, + // which manifested upstream as "connect: connection refused" on the + // frontend's first LoadModel dial. client := grpc.NewClientWithToken(clientAddr, false, nil, false, s.cmd.RegistrationToken) - for range 20 { - time.Sleep(200 * time.Millisecond) + const ( + readinessPollInterval = 200 * time.Millisecond + readinessTimeout = 30 * time.Second + ) + deadline := time.Now().Add(readinessTimeout) + for time.Now().Before(deadline) { + time.Sleep(readinessPollInterval) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) if ok, _ := client.HealthCheck(ctx); ok { cancel() @@ -496,10 +506,23 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e } } - // Log stderr to help diagnose why the backend isn't responding + // Readiness deadline exceeded. Returning success here would leave the + // frontend with an unbound address (it dials, gets ECONNREFUSED, and + // the operator sees a misleading "connection refused" instead of the + // real cause). Stop the half-started process, recycle the port, and + // surface the failure to the caller with the backend's stderr tail. stderrTail := readLastLinesFromFile(proc.StderrPath(), 20) - xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr, "stderr", stderrTail) - return clientAddr, nil + xlog.Error("Backend gRPC server not ready before deadline; aborting install", "backend", backend, "addr", clientAddr, "timeout", readinessTimeout, "stderr", stderrTail) + if killErr := proc.Stop(); killErr != nil { + xlog.Warn("Failed to stop unready backend process", "backend", backend, "error", killErr) + } + s.mu.Lock() + if cur, ok := s.processes[backend]; ok && cur == bp { + delete(s.processes, backend) + s.freePorts = append(s.freePorts, port) + } + s.mu.Unlock() + return "", fmt.Errorf("backend %s did not become ready within %s. Last stderr:\n%s", backend, readinessTimeout, stderrTail) } // resolveProcessKeys turns a caller-supplied identifier into the set of diff --git a/core/services/nodes/health.go b/core/services/nodes/health.go index dd0e903f1..ad570fd81 100644 --- a/core/services/nodes/health.go +++ b/core/services/nodes/health.go @@ -126,6 +126,12 @@ func (hm *HealthMonitor) doCheckAll(ctx context.Context) { // Workers (both backend and agent) send HTTP heartbeats to the frontend. // If the heartbeat is stale, the worker is presumed down. if time.Since(node.LastHeartbeat) > hm.staleThreshold { + // Skip nodes already marked offline/unhealthy — re-marking them + // every cycle floods the log with the same WARN+INFO pair for + // nodes the operator has intentionally taken down. + if node.Status == StatusOffline || node.Status == StatusUnhealthy { + continue + } xlog.Warn("Node heartbeat stale", "node", node.Name, "lastHeartbeat", node.LastHeartbeat) if hm.autoOffline { xlog.Info("Marking stale node offline", "node", node.Name) diff --git a/go.mod b/go.mod index 132b9fa91..62edd83a4 100644 --- a/go.mod +++ b/go.mod @@ -167,8 +167,8 @@ require ( github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/mudler/LocalAGI v0.0.0-20260415165142-3369136c7380 - github.com/mudler/localrecall v0.5.9-0.20260415164846-8ad831f840fc // indirect + github.com/mudler/LocalAGI v0.0.0-20260504165100-e83bf515d010 + github.com/mudler/localrecall v0.5.10-0.20260504162944-6138c1f535ab // indirect github.com/mudler/skillserver v0.0.6 github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/oxffaa/gopher-parse-sitemap v0.0.0-20191021113419-005d2eb1def4 // indirect diff --git a/go.sum b/go.sum index 53f9f2961..46ccc4170 100644 --- a/go.sum +++ b/go.sum @@ -703,8 +703,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/mudler/LocalAGI v0.0.0-20260415165142-3369136c7380 h1:gSS535c1MO3IRSUIWJT1xzZjT4lZBsqtHpptXvrEsmw= -github.com/mudler/LocalAGI v0.0.0-20260415165142-3369136c7380/go.mod h1:rD7G70wl+5zlpvNF13iZBpAuat8LsiJFn678z3Kxleo= +github.com/mudler/LocalAGI v0.0.0-20260504165100-e83bf515d010 h1:b5MBD3gq+H/tN2dVFqkFI6CvSrBUnmvdGPl6ivtSrSc= +github.com/mudler/LocalAGI v0.0.0-20260504165100-e83bf515d010/go.mod h1:QOB+zg2jARzslqhy2c/59CW2Kcp0JEHOiNIDeCRFP2s= github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU= github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4= github.com/mudler/edgevpn v0.31.1 h1:7qegiDWd0kAg6ljhNHxqvp8hbo/6BbzSdbb7/2WZfiY= @@ -713,8 +713,8 @@ github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc/go.mod h1:O7SwdSWMilAWhBZMK9N9Y/oBDyMMzshE3ju8Xkexwig= github.com/mudler/go-processmanager v0.1.1 h1:c/1NRZOZpW8HuFv9RhBG57nQu1oDMRomEHedwBFMlrw= github.com/mudler/go-processmanager v0.1.1/go.mod h1:h6kmHUZeafr+k5hRYpGLMzJFH4hItHffgpRo2QIkP+o= -github.com/mudler/localrecall v0.5.9-0.20260415164846-8ad831f840fc h1:p1ucQ2rbU4mhG2Xl1Emg5Q6QCYCjI+fvMF9KTek/+sY= -github.com/mudler/localrecall v0.5.9-0.20260415164846-8ad831f840fc/go.mod h1:xuPtgL9zUyiQLmspYzO3kaboYrGbWmwi8BQPt1aCAcs= +github.com/mudler/localrecall v0.5.10-0.20260504162944-6138c1f535ab h1:U6MWVv9Xgb56JTIL4DfsZftSig/LeJA+yizlyw8fq24= +github.com/mudler/localrecall v0.5.10-0.20260504162944-6138c1f535ab/go.mod h1:xuPtgL9zUyiQLmspYzO3kaboYrGbWmwi8BQPt1aCAcs= github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8 h1:Ry8RiWy8fZ6Ff4E7dPmjRsBrnHOnPeOOj2LhCgyjQu0= github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8/go.mod h1:EA8Ashhd56o32qN7ouPKFSRUs/Z+LrRCF4v6R2Oarm8= github.com/mudler/skillserver v0.0.6 h1:ixz6wUekLdTmbnpAavCkTydDF6UdXAG3ncYufSPK9G0=