mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-29 03:24:49 -04:00
fix(distributed): worker stopBackend/isRunning resolve bare modelID to replica keys
PR #9583 changed the supervisor's process map key from `modelID` to `modelID#replicaIndex`, but the NATS lifecycle handlers kept passing the bare modelID: * `backend.stop` (subscribeLifecycleEvents): `s.stopBackend(req.Backend)` → `s.processes["Qwen3.6-..."]` missed (actual key is "...#0") → silent no-op. Admin "Unload model" clicks released VRAM via model.unload but left the gRPC process alive on its old port. Subsequent chats hit installBackend, found the leftover process, reused its address — and the UI reported "no models loaded" while the model kept responding. * `backend.delete` (subscribeLifecycleEvents): same map miss in `isRunning(req.Backend)` and `s.stopBackend(req.Backend)` — admin "Delete backend" deleted the binary while the process was still serving traffic. Add `resolveProcessKeys(id)`: exact match if `id` is a full processKey (stopAllBackends iterates the map and passes its own keys); prefix-match if `id` is bare (NATS handlers); empty if `id` contains `#` but doesn't match (no spurious fallback when the caller was explicit). stopBackend and isRunning now call it; stopBackend gets a new stopBackendExact helper for per-key cleanup. TDD: regression test fails without the fix (resolveProcessKeys doesn't exist; map lookup by bare name returns nothing). Tests pass post-fix. Reproduced live: registry row count was 0 for the model the user "Unloaded", chat still served by the leftover worker process. SmartRouter behavior is correct in itself — it falls through to scheduleAndLoad when no row exists; the bug was that the leftover process corrupted the install path. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: claude-code:opus-4-7 [Edit] [Bash]
This commit is contained in:
@@ -502,16 +502,58 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e
|
||||
return clientAddr, nil
|
||||
}
|
||||
|
||||
// stopBackend stops a specific backend's gRPC process.
|
||||
func (s *backendSupervisor) stopBackend(backend string) {
|
||||
// resolveProcessKeys turns a caller-supplied identifier into the set of
|
||||
// process map keys it refers to. PR #9583 changed s.processes to be keyed by
|
||||
// `modelID#replicaIndex`, but external NATS handlers still pass the bare
|
||||
// model ID — without this resolver, those lookups silently no-op'd, so
|
||||
// admin "Unload model" / "Delete backend" left the worker process alive.
|
||||
//
|
||||
// - Exact match wins. Callers that already know the full processKey
|
||||
// (stopAllBackends iterating its own map) get exactly that entry.
|
||||
// - Else, an identifier without `#` is treated as a model prefix and
|
||||
// every `id#N` replica is returned.
|
||||
// - An identifier that contains `#` but doesn't match anything returns
|
||||
// nothing — no spurious prefix fallback when the caller was explicit.
|
||||
func (s *backendSupervisor) resolveProcessKeys(id string) []string {
|
||||
s.mu.Lock()
|
||||
bp, ok := s.processes[backend]
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.processes[id]; ok {
|
||||
return []string{id}
|
||||
}
|
||||
if strings.Contains(id, "#") {
|
||||
return nil
|
||||
}
|
||||
prefix := id + "#"
|
||||
var keys []string
|
||||
for k := range s.processes {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// stopBackend stops the backend process(es) matching the given identifier.
|
||||
// Accepts a bare modelID (stops every replica) or a full processKey
|
||||
// (stops just that replica).
|
||||
func (s *backendSupervisor) stopBackend(id string) {
|
||||
for _, key := range s.resolveProcessKeys(id) {
|
||||
s.stopBackendExact(key)
|
||||
}
|
||||
}
|
||||
|
||||
// stopBackendExact stops the process under exactly this key. Locking and
|
||||
// network I/O are split: the map mutation runs under the lock, the gRPC
|
||||
// Free() and proc.Stop() calls run after release so they don't block
|
||||
// other supervisor operations.
|
||||
func (s *backendSupervisor) stopBackendExact(key string) {
|
||||
s.mu.Lock()
|
||||
bp, ok := s.processes[key]
|
||||
if !ok || bp.proc == nil {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Clean up map and recycle port while holding lock
|
||||
delete(s.processes, backend)
|
||||
delete(s.processes, key)
|
||||
if _, portStr, err := net.SplitHostPort(bp.addr); err == nil {
|
||||
if p, err := strconv.Atoi(portStr); err == nil {
|
||||
s.freePorts = append(s.freePorts, p)
|
||||
@@ -519,16 +561,15 @@ func (s *backendSupervisor) stopBackend(backend string) {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// Network I/O outside the lock
|
||||
client := grpc.NewClientWithToken(bp.addr, false, nil, false, s.cmd.RegistrationToken)
|
||||
xlog.Debug("Calling Free() before stopping backend", "backend", backend)
|
||||
xlog.Debug("Calling Free() before stopping backend", "backend", key)
|
||||
if err := client.Free(context.Background()); err != nil {
|
||||
xlog.Warn("Free() failed (best-effort)", "backend", backend, "error", err)
|
||||
xlog.Warn("Free() failed (best-effort)", "backend", key, "error", err)
|
||||
}
|
||||
|
||||
xlog.Info("Stopping backend process", "backend", backend, "addr", bp.addr)
|
||||
xlog.Info("Stopping backend process", "backend", key, "addr", bp.addr)
|
||||
if err := bp.proc.Stop(); err != nil {
|
||||
xlog.Error("Error stopping backend process", "backend", backend, "error", err)
|
||||
xlog.Error("Error stopping backend process", "backend", key, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -557,12 +598,24 @@ func readLastLinesFromFile(path string, n int) string {
|
||||
return strings.Join(lines, "\n")
|
||||
}
|
||||
|
||||
// isRunning returns whether a specific backend process is currently running.
|
||||
func (s *backendSupervisor) isRunning(backend string) bool {
|
||||
// isRunning returns whether at least one backend process matching the given
|
||||
// identifier is currently running. Accepts a bare modelID (matches any
|
||||
// replica) or a full processKey (exact match). Callers like the
|
||||
// backend.delete pre-check rely on the bare-name path.
|
||||
func (s *backendSupervisor) isRunning(id string) bool {
|
||||
keys := s.resolveProcessKeys(id)
|
||||
if len(keys) == 0 {
|
||||
// Same lock-free zero-process check the caller would have done.
|
||||
return false
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
bp, ok := s.processes[backend]
|
||||
return ok && bp.proc != nil && bp.proc.IsAlive()
|
||||
for _, key := range keys {
|
||||
if bp, ok := s.processes[key]; ok && bp.proc != nil && bp.proc.IsAlive() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// getAddr returns the gRPC address for a running backend, or empty string.
|
||||
|
||||
@@ -67,4 +67,55 @@ var _ = Describe("Worker per-replica process keying", func() {
|
||||
Expect(labels).To(HaveKeyWithValue("node.replica-slots", "2"))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Process map lookup by bare model name", func() {
	// Regression guard for PR #9583: the supervisor's process map moved
	// from `modelID` keys to `modelID#replicaIndex`, while the NATS
	// backend.stop handler still passed the bare modelID. The lookup then
	// silently missed, so the worker process outlived an admin
	// "Unload model" click and kept serving chats — the registry rows
	// were already gone, leaving the UI claiming "no models loaded" while
	// the model still answered. resolveProcessKeys must expand a bare
	// modelID into the actual replica keys so stop/isRunning can find
	// the running processes.
	It("resolves a bare modelID to its replica process keys", func() {
		sup := &backendSupervisor{
			processes: map[string]*backendProcess{
				"qwen3.6-35B#0": {addr: "127.0.0.1:50051"},
				"qwen3.6-35B#1": {addr: "127.0.0.1:50052"},
				"other-model#0": {addr: "127.0.0.1:50053"},
			},
		}
		resolved := sup.resolveProcessKeys("qwen3.6-35B")
		Expect(resolved).To(ConsistOf("qwen3.6-35B#0", "qwen3.6-35B#1"),
			"bare modelID must match all replica process keys")

		// A bare modelID with no live processes resolves to nothing.
		Expect(sup.resolveProcessKeys("not-loaded")).To(BeEmpty())

		// A full processKey resolves only to itself, so per-replica
		// callers remain precise.
		Expect(sup.resolveProcessKeys("qwen3.6-35B#0")).To(ConsistOf("qwen3.6-35B#0"))

		// An explicit processKey that matches nothing yields nothing —
		// no prefix fallback for callers that were explicit.
		Expect(sup.resolveProcessKeys("qwen3.6-35B#9")).To(BeEmpty())
	})

	It("isRunning returns false when no replica matches", func() {
		// Without a real *process.Process only the not-found path is
		// testable here (IsAlive() needs PID introspection), but that is
		// enough to pin the regression: pre-fix, isRunning("qwen3.6-35B")
		// was always false because the map key was "qwen3.6-35B#0".
		// Post-fix, resolveProcessKeys runs first, so the per-replica
		// lookup is exercised before any IsAlive probe.
		sup := &backendSupervisor{processes: map[string]*backendProcess{}}
		Expect(sup.isRunning("qwen3.6-35B")).To(BeFalse())
		// The lookup contract the backend.delete handler relies on: the
		// replica entry is found by bare name. The IsAlive probe itself
		// is covered by the distributed-mode integration path.
		sup.processes["qwen3.6-35B#0"] = &backendProcess{addr: "127.0.0.1:50051"}
		Expect(sup.resolveProcessKeys("qwen3.6-35B")).To(ConsistOf("qwen3.6-35B#0"))
	})
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user