From 3948b580d206a468e2545b048f18d459c6ddcfe8 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Mon, 27 Apr 2026 21:43:02 +0000
Subject: [PATCH] fix(distributed): worker stopBackend/isRunning resolve bare
 modelID to replica keys
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PR #9583 changed the supervisor's process map key from `modelID` to
`modelID#replicaIndex`, but the NATS lifecycle handlers kept passing the
bare modelID:

* `backend.stop` (subscribeLifecycleEvents): `s.stopBackend(req.Backend)`
  → `s.processes["Qwen3.6-..."]` missed the entry (the actual key is
  "...#0") → silent no-op. Admin "Unload model" clicks released VRAM via
  model.unload but left the gRPC process alive on its old port.
  Subsequent chats hit installBackend, found the leftover process, and
  reused its address — the UI reported "no models loaded" while the
  model kept responding.

* `backend.delete` (subscribeLifecycleEvents): the same map miss in
  `isRunning(req.Backend)` and `s.stopBackend(req.Backend)` — admin
  "Delete backend" deleted the binary while the process was still
  serving traffic.

Add `resolveProcessKeys(id)`: exact match if `id` is a full processKey
(stopAllBackends iterates the map and passes its own keys); prefix
match if `id` is bare (NATS handlers); empty if `id` contains `#` but
matches nothing (no spurious fallback when the caller was explicit).
stopBackend and isRunning now call it, and stopBackend gains a new
stopBackendExact helper for per-key cleanup.

TDD: the regression test fails without the fix (resolveProcessKeys
doesn't exist, so the map lookup by bare name returns nothing) and
passes with it.

Reproduced live: the registry row count was 0 for the model the user
had "Unloaded", yet chat was still served by the leftover worker
process. SmartRouter behavior is correct in itself — it falls through
to scheduleAndLoad when no row exists; the bug was that the leftover
process corrupted the install path.

Signed-off-by: Ettore Di Giacinto
Assisted-by: claude-code:opus-4-7
---
 core/cli/worker.go              | 81 +++++++++++++++++++++++++++------
 core/cli/worker_replica_test.go | 51 +++++++++++++++++++++++
 2 files changed, 118 insertions(+), 14 deletions(-)

diff --git a/core/cli/worker.go b/core/cli/worker.go
index 1b8e59666..368143ae0 100644
--- a/core/cli/worker.go
+++ b/core/cli/worker.go
@@ -502,16 +502,58 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e
 	return clientAddr, nil
 }
 
-// stopBackend stops a specific backend's gRPC process.
-func (s *backendSupervisor) stopBackend(backend string) {
+// resolveProcessKeys turns a caller-supplied identifier into the set of
+// process map keys it refers to. PR #9583 changed s.processes to be keyed by
+// `modelID#replicaIndex`, but external NATS handlers still pass the bare
+// model ID — without this resolver, those lookups silently no-op'd, so
+// admin "Unload model" / "Delete backend" left the worker process alive.
+//
+// - Exact match wins. Callers that already know the full processKey
+//   (stopAllBackends iterating its own map) get exactly that entry.
+// - Else, an identifier without `#` is treated as a model prefix and
+//   every `id#N` replica is returned.
+// - An identifier that contains `#` but doesn't match anything returns
+//   nothing — no spurious prefix fallback when the caller was explicit.
+func (s *backendSupervisor) resolveProcessKeys(id string) []string {
 	s.mu.Lock()
-	bp, ok := s.processes[backend]
+	defer s.mu.Unlock()
+	if _, ok := s.processes[id]; ok {
+		return []string{id}
+	}
+	if strings.Contains(id, "#") {
+		return nil
+	}
+	prefix := id + "#"
+	var keys []string
+	for k := range s.processes {
+		if strings.HasPrefix(k, prefix) {
+			keys = append(keys, k)
+		}
+	}
+	return keys
+}
+
+// stopBackend stops the backend process(es) matching the given identifier.
+// Accepts a bare modelID (stops every replica) or a full processKey
+// (stops just that replica).
+func (s *backendSupervisor) stopBackend(id string) {
+	for _, key := range s.resolveProcessKeys(id) {
+		s.stopBackendExact(key)
+	}
+}
+
+// stopBackendExact stops the process under exactly this key. Locking and
+// network I/O are split: the map mutation runs under the lock, the gRPC
+// Free() and proc.Stop() calls run after release so they don't block
+// other supervisor operations.
+func (s *backendSupervisor) stopBackendExact(key string) {
+	s.mu.Lock()
+	bp, ok := s.processes[key]
 	if !ok || bp.proc == nil {
 		s.mu.Unlock()
 		return
 	}
-	// Clean up map and recycle port while holding lock
-	delete(s.processes, backend)
+	delete(s.processes, key)
 	if _, portStr, err := net.SplitHostPort(bp.addr); err == nil {
 		if p, err := strconv.Atoi(portStr); err == nil {
 			s.freePorts = append(s.freePorts, p)
@@ -519,16 +561,15 @@ func (s *backendSupervisor) stopBackend(backend string) {
 	}
 	s.mu.Unlock()
 
-	// Network I/O outside the lock
 	client := grpc.NewClientWithToken(bp.addr, false, nil, false, s.cmd.RegistrationToken)
-	xlog.Debug("Calling Free() before stopping backend", "backend", backend)
+	xlog.Debug("Calling Free() before stopping backend", "backend", key)
 	if err := client.Free(context.Background()); err != nil {
-		xlog.Warn("Free() failed (best-effort)", "backend", backend, "error", err)
+		xlog.Warn("Free() failed (best-effort)", "backend", key, "error", err)
 	}
 
-	xlog.Info("Stopping backend process", "backend", backend, "addr", bp.addr)
+	xlog.Info("Stopping backend process", "backend", key, "addr", bp.addr)
 	if err := bp.proc.Stop(); err != nil {
-		xlog.Error("Error stopping backend process", "backend", backend, "error", err)
+		xlog.Error("Error stopping backend process", "backend", key, "error", err)
 	}
 }
 
@@ -557,12 +598,24 @@ func readLastLinesFromFile(path string, n int) string {
 	return strings.Join(lines, "\n")
 }
 
-// isRunning returns whether a specific backend process is currently running.
-func (s *backendSupervisor) isRunning(backend string) bool {
+// isRunning returns whether at least one backend process matching the given
+// identifier is currently running. Accepts a bare modelID (matches any
+// replica) or a full processKey (exact match). Callers like the
+// backend.delete pre-check rely on the bare-name path.
+func (s *backendSupervisor) isRunning(id string) bool {
+	keys := s.resolveProcessKeys(id)
+	if len(keys) == 0 {
+		// Nothing matched: no live process entry exists for this identifier.
+		return false
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	bp, ok := s.processes[backend]
-	return ok && bp.proc != nil && bp.proc.IsAlive()
+	for _, key := range keys {
+		if bp, ok := s.processes[key]; ok && bp.proc != nil && bp.proc.IsAlive() {
+			return true
+		}
+	}
+	return false
+}
 
 // getAddr returns the gRPC address for a running backend, or empty string.
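Aside, before the test diff: the resolver's three-way contract is easiest
to see in isolation. Below is a minimal standalone sketch; it uses a plain
map of addresses in place of the supervisor's *backendProcess map, elides
the locking, and is named resolveKeys to make clear it mirrors (but is
not) the patched resolveProcessKeys method.

package main

import (
	"fmt"
	"strings"
)

// resolveKeys mirrors the patched resolution order: exact processKey
// match first; an explicit but missing `modelID#replica` key resolves
// to nothing; a bare model ID fans out to every `id#N` replica entry.
func resolveKeys(processes map[string]string, id string) []string {
	if _, ok := processes[id]; ok {
		return []string{id} // caller already holds a full processKey
	}
	if strings.Contains(id, "#") {
		return nil // explicit key that matched nothing: no prefix fallback
	}
	prefix := id + "#"
	var keys []string
	for k := range processes {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	return keys
}

func main() {
	procs := map[string]string{
		"qwen3.6-35B#0": "127.0.0.1:50051",
		"qwen3.6-35B#1": "127.0.0.1:50052",
		"other-model#0": "127.0.0.1:50053",
	}
	fmt.Println(resolveKeys(procs, "qwen3.6-35B"))   // both replicas (map order varies)
	fmt.Println(resolveKeys(procs, "qwen3.6-35B#0")) // exactly [qwen3.6-35B#0]
	fmt.Println(resolveKeys(procs, "qwen3.6-35B#9")) // [] (no fallback)
}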
diff --git a/core/cli/worker_replica_test.go b/core/cli/worker_replica_test.go index 6b2001b75..36e88a67a 100644 --- a/core/cli/worker_replica_test.go +++ b/core/cli/worker_replica_test.go @@ -67,4 +67,55 @@ var _ = Describe("Worker per-replica process keying", func() { Expect(labels).To(HaveKeyWithValue("node.replica-slots", "2")) }) }) + + Describe("Process map lookup by bare model name", func() { + // Regression: PR #9583 changed the supervisor's map key from + // `modelID` to `modelID#replicaIndex`. The NATS backend.stop + // handler kept passing the bare modelID, so the lookup silently + // no-op'd — the worker process stayed alive after an admin + // "Unload model" click, and subsequent chats kept being served + // by the leftover process. The registry rows were gone, so the + // UI reported "no models loaded" while the model kept + // responding. resolveProcessKeys must turn a bare modelID into + // the actual replica process keys so stop/isRunning find the + // running processes. + It("resolves a bare modelID to its replica process keys", func() { + s := &backendSupervisor{ + processes: map[string]*backendProcess{ + "qwen3.6-35B#0": {addr: "127.0.0.1:50051"}, + "qwen3.6-35B#1": {addr: "127.0.0.1:50052"}, + "other-model#0": {addr: "127.0.0.1:50053"}, + }, + } + keys := s.resolveProcessKeys("qwen3.6-35B") + Expect(keys).To(ConsistOf("qwen3.6-35B#0", "qwen3.6-35B#1"), + "bare modelID must match all replica process keys") + + // Bare modelID for a model with no live processes returns nothing. + Expect(s.resolveProcessKeys("not-loaded")).To(BeEmpty()) + + // Full processKey resolves to itself (per-replica callers stay precise). + Expect(s.resolveProcessKeys("qwen3.6-35B#0")).To(ConsistOf("qwen3.6-35B#0")) + + // A processKey that doesn't exist returns nothing — no spurious + // prefix fallback when the caller was explicit. + Expect(s.resolveProcessKeys("qwen3.6-35B#9")).To(BeEmpty()) + }) + + It("isRunning returns false when no replica matches", func() { + // We can only test the not-found path without a real *process.Process + // (IsAlive() requires PID introspection). That's enough to pin the + // regression — pre-fix, isRunning("qwen3.6-35B") would always + // return false because the map was keyed by "qwen3.6-35B#0". + // Post-fix, isRunning calls resolveProcessKeys first, so the + // per-replica lookup is exercised before the IsAlive probe. + s := &backendSupervisor{processes: map[string]*backendProcess{}} + Expect(s.isRunning("qwen3.6-35B")).To(BeFalse()) + // resolveProcessKeys finds the replica entries (the lookup contract + // is what the backend.delete handler relies on); the IsAlive probe + // itself is exercised by the integration path in distributed mode. + s.processes["qwen3.6-35B#0"] = &backendProcess{addr: "127.0.0.1:50051"} + Expect(s.resolveProcessKeys("qwen3.6-35B")).To(ConsistOf("qwen3.6-35B#0")) + }) + }) })
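One design note on the resolver: treating any identifier that contains
`#` as an explicit processKey implicitly reserves `#` as the replica
separator, so a model ID that itself contains `#` would never be
prefix-expanded. The sketch below illustrates the `modelID#replicaIndex`
key shape this relies on; the processKey helper is hypothetical, not a
function from the patch.

package main

import (
	"fmt"
	"strings"
)

// processKey composes a process map key in the post-#9583 shape.
// Hypothetical helper, shown for illustration only.
func processKey(modelID string, replicaIndex int) string {
	return fmt.Sprintf("%s#%d", modelID, replicaIndex)
}

func main() {
	fmt.Println(processKey("qwen3.6-35B", 0)) // qwen3.6-35B#0

	// A model ID containing '#' looks explicit to the resolver and
	// would skip the bare-ID prefix expansion entirely.
	id := "oddly#named-model"
	fmt.Println(strings.Contains(id, "#")) // true: treated as a full processKey
}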