Mirror of https://github.com/mudler/LocalAI.git
The multi-replica refactor (PR #9583) changed the worker's process key from `modelID` to `modelID#replicaIndex`, but the BackendLogStore kept the bare-modelID lookup. Result: every distributed deployment lost backend logs in the Nodes UI — single-replica too, since even the default capacity of 1 produces a `#0` suffix.

Two changes wired together:

* pkg/model: BackendLogStore.GetLines/Subscribe now treat a modelID without `#` as a model prefix and merge across all `modelID#N` replica buffers (timestamp-sorted for GetLines; fan-in for Subscribe). Calls with a full `modelID#N` key resolve exactly. ListModels strips replica suffixes and deduplicates so the listing surfaces one entry per loaded model.

* react-ui: per-replica log streams as the default. The Loaded Models table disambiguates each row with a `rep N` pill (only when the node hosts >1 replica of a model). Each row's "View logs" link routes to the per-replica process key so operators see only that replica's output. The logs page renders the replica context as a chip in the title and surfaces a segmented control — `Replica 0 / 1 / … / All merged` — when the model has multiple replicas; the merged segment uses the bare-modelID URL (delegating to the store's prefix aggregation) for the side-by-side comparison case. Single-replica deployments see no extra UI.

Tests added first (TDD): the regression set in backend_log_store_test.go reproduces the bug at the exact failure point — GetLines/ListModels/Subscribe assertions all fail against the broken code, all pass against the fix. TestSubscribe_PerReplicaFilter pins the exact-key path so a future change can't silently break it.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: claude-code:opus-4-7
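A minimal caller sketch of the lookup semantics described above (hypothetical snippet, not part of the commit; the import path and model names are assumed for illustration):

```go
package main

import (
	"fmt"

	"github.com/mudler/LocalAI/pkg/model" // import path assumed from the repo layout
)

func main() {
	store := model.NewBackendLogStore(1000)

	// The worker now writes under full process keys: modelID#replicaIndex.
	store.AppendLine("qwen3-0.6b#0", "stdout", "replica 0 ready")
	store.AppendLine("qwen3-0.6b#1", "stderr", "replica 1 ready")

	// Bare model ID: prefix aggregation, merged in timestamp order.
	fmt.Println(len(store.GetLines("qwen3-0.6b"))) // 2

	// Full process key: exact per-replica filtering.
	fmt.Println(len(store.GetLines("qwen3-0.6b#1"))) // 1

	// Replica suffixes are stripped and deduplicated in the listing.
	fmt.Println(store.ListModels()) // [qwen3-0.6b]
}
```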
310 lines
8.4 KiB
Go
package model

import (
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/emirpasic/gods/v2/queues/circularbuffer"
)

// replicaSeparator separates a model ID from the replica index in the
// supervisor's process key (e.g. "qwen3-0.6b#0"). Mirrored from the
// worker's buildProcessKey — duplicated as a constant here to keep this
// package free of CLI imports.
const replicaSeparator = "#"

// BackendLogLine represents a single line of output from a backend process.
type BackendLogLine struct {
	Timestamp time.Time `json:"timestamp"`
	Stream    string    `json:"stream"` // "stdout" or "stderr"
	Text      string    `json:"text"`
}

// backendLogBuffer wraps a circular buffer for a single model's logs
// and tracks subscribers for real-time streaming.
type backendLogBuffer struct {
	mu          sync.Mutex
	queue       *circularbuffer.Queue[BackendLogLine]
	subscribers map[int]chan BackendLogLine
	nextSubID   int
}

// BackendLogStore stores per-model backend process output in circular buffers
// and supports real-time subscriptions for WebSocket streaming.
type BackendLogStore struct {
	mu       sync.RWMutex // protects the buffers map only
	buffers  map[string]*backendLogBuffer
	maxLines int
}

// NewBackendLogStore creates a new BackendLogStore with a maximum number of
// lines retained per model.
func NewBackendLogStore(maxLinesPerModel int) *BackendLogStore {
	if maxLinesPerModel <= 0 {
		maxLinesPerModel = 1000
	}
	return &BackendLogStore{
		buffers:  make(map[string]*backendLogBuffer),
		maxLines: maxLinesPerModel,
	}
}

// getOrCreateBuffer returns the buffer for modelID, creating it if needed.
func (s *BackendLogStore) getOrCreateBuffer(modelID string) *backendLogBuffer {
	s.mu.RLock()
	buf, ok := s.buffers[modelID]
	s.mu.RUnlock()
	if ok {
		return buf
	}

	s.mu.Lock()
	buf, ok = s.buffers[modelID]
	if !ok {
		buf = &backendLogBuffer{
			queue:       circularbuffer.New[BackendLogLine](s.maxLines),
			subscribers: make(map[int]chan BackendLogLine),
		}
		s.buffers[modelID] = buf
	}
	s.mu.Unlock()
	return buf
}

// AppendLine adds a log line for the given model. The buffer is lazily created.
// All active subscribers for this model are notified (non-blocking).
func (s *BackendLogStore) AppendLine(modelID, stream, text string) {
	line := BackendLogLine{
		Timestamp: time.Now(),
		Stream:    stream,
		Text:      text,
	}

	buf := s.getOrCreateBuffer(modelID)
	buf.mu.Lock()
	buf.queue.Enqueue(line)
	for _, ch := range buf.subscribers {
		select {
		case ch <- line:
		default:
		}
	}
	buf.mu.Unlock()
}

// GetLines returns a copy of all log lines for a model, or an empty slice.
//
// When modelID contains no replica suffix (no `#`), it's treated as a model
// prefix and the lines from all `modelID#N` replicas are merged in
// timestamp order. This keeps the existing per-model logs UI working in
// distributed mode after the worker started using `modelID#replicaIndex`
// as its process key (multi-replica refactor) — the UI asks for "qwen3-0.6b"
// and gets the union of all replicas' logs.
//
// When modelID contains a `#` (e.g. "qwen3-0.6b#0"), it's treated as an
// exact process key for per-replica filtering by callers that need it.
func (s *BackendLogStore) GetLines(modelID string) []BackendLogLine {
	s.mu.RLock()
	exactBuf, exactOK := s.buffers[modelID]
	s.mu.RUnlock()

	// Exact match — single key. Caller knew the full process key.
	if exactOK {
		exactBuf.mu.Lock()
		lines := exactBuf.queue.Values()
		exactBuf.mu.Unlock()
		return lines
	}

	// No exact match: aggregate any replicas if modelID looks like a model prefix.
	if strings.Contains(modelID, replicaSeparator) {
		return []BackendLogLine{}
	}

	prefix := modelID + replicaSeparator
	var matching []*backendLogBuffer
	s.mu.RLock()
	for k, b := range s.buffers {
		if strings.HasPrefix(k, prefix) {
			matching = append(matching, b)
		}
	}
	s.mu.RUnlock()

	if len(matching) == 0 {
		return []BackendLogLine{}
	}

	// Merge the per-replica buffers and sort by timestamp so the operator
	// sees a single coherent timeline rather than per-replica blocks.
	var merged []BackendLogLine
	for _, b := range matching {
		b.mu.Lock()
		merged = append(merged, b.queue.Values()...)
		b.mu.Unlock()
	}
	sort.SliceStable(merged, func(i, j int) bool { return merged[i].Timestamp.Before(merged[j].Timestamp) })
	return merged
}

// ListModels returns a sorted list of model IDs that have log buffers.
// Replica suffixes (`#N`) are stripped and the result is deduplicated, so
// callers see one entry per loaded model regardless of replica count.
func (s *BackendLogStore) ListModels() []string {
	s.mu.RLock()
	seen := make(map[string]struct{}, len(s.buffers))
	for id := range s.buffers {
		base := id
		if i := strings.Index(id, replicaSeparator); i >= 0 {
			base = id[:i]
		}
		seen[base] = struct{}{}
	}
	s.mu.RUnlock()

	models := make([]string, 0, len(seen))
	for id := range seen {
		models = append(models, id)
	}
	sort.Strings(models)
	return models
}

// Clear removes all log lines for a model but keeps the buffer entry.
func (s *BackendLogStore) Clear(modelID string) {
	s.mu.RLock()
	buf, ok := s.buffers[modelID]
	s.mu.RUnlock()
	if !ok {
		return
	}
	buf.mu.Lock()
	buf.queue.Clear()
	buf.mu.Unlock()
}

// Remove deletes the buffer entry for a model entirely.
func (s *BackendLogStore) Remove(modelID string) {
	s.mu.Lock()
	if buf, ok := s.buffers[modelID]; ok {
		buf.mu.Lock()
		for _, ch := range buf.subscribers {
			close(ch)
		}
		buf.mu.Unlock()
		delete(s.buffers, modelID)
	}
	s.mu.Unlock()
}

// Subscribe returns a channel that receives new log lines for the given model
// in real-time, plus an unsubscribe function. The channel has a buffer of 100
// lines to absorb short bursts without blocking the writer.
//
// Like GetLines, a modelID without a `#` separator subscribes to every
// matching `modelID#N` replica buffer that exists at subscribe time, so the
// stream merges all replicas. Subscribers are NOT auto-attached to replicas
// that come up later — callers needing dynamic membership should resubscribe.
func (s *BackendLogStore) Subscribe(modelID string) (chan BackendLogLine, func()) {
	ch := make(chan BackendLogLine, 100)

	// Per-replica caller (full process key) — exact subscription.
	if strings.Contains(modelID, replicaSeparator) {
		buf := s.getOrCreateBuffer(modelID)
		buf.mu.Lock()
		id := buf.nextSubID
		buf.nextSubID++
		buf.subscribers[id] = ch
		buf.mu.Unlock()
		unsubscribe := func() {
			buf.mu.Lock()
			if _, exists := buf.subscribers[id]; exists {
				delete(buf.subscribers, id)
				close(ch)
			}
			buf.mu.Unlock()
		}
		return ch, unsubscribe
	}

	// Aggregated caller: subscribe to the bare-modelID buffer (for back-compat
	// with single-replica writers that still write to the un-suffixed key) AND
	// to every existing `modelID#N` replica buffer. Each per-buffer subscription
	// receives lines into its own channel; we fan them in to `ch` here.
	type subRef struct {
		buf *backendLogBuffer
		id  int
		ch  chan BackendLogLine
	}
	var refs []subRef

	subscribe := func(buf *backendLogBuffer) {
		bufCh := make(chan BackendLogLine, 100)
		buf.mu.Lock()
		id := buf.nextSubID
		buf.nextSubID++
		buf.subscribers[id] = bufCh
		buf.mu.Unlock()
		refs = append(refs, subRef{buf: buf, id: id, ch: bufCh})
	}

	if buf, ok := func() (*backendLogBuffer, bool) {
		s.mu.RLock()
		b, ok := s.buffers[modelID]
		s.mu.RUnlock()
		return b, ok
	}(); ok {
		subscribe(buf)
	}

	prefix := modelID + replicaSeparator
	s.mu.RLock()
	for k, b := range s.buffers {
		if strings.HasPrefix(k, prefix) {
			subscribe(b)
		}
	}
	s.mu.RUnlock()

	// Fan-in goroutine: forward every per-buffer channel into the merged
	// channel until all source channels close, then close the merged channel.
	if len(refs) == 0 {
		// No source buffers yet: still return a channel so callers don't crash;
		// it'll close on unsubscribe.
		unsubscribe := func() { close(ch) }
		return ch, unsubscribe
	}

	var fanWG sync.WaitGroup
	closeOnce := sync.OnceFunc(func() { close(ch) })
	for _, r := range refs {
		fanWG.Add(1)
		go func(c chan BackendLogLine) {
			defer fanWG.Done()
			for line := range c {
				select {
				case ch <- line:
				default: // drop on slow consumer to match non-aggregated behavior
				}
			}
		}(r.ch)
	}
	go func() { fanWG.Wait(); closeOnce() }()

	unsubscribe := func() {
		for _, r := range refs {
			r.buf.mu.Lock()
			if c, exists := r.buf.subscribers[r.id]; exists {
				delete(r.buf.subscribers, r.id)
				close(c) // closes the per-buffer source channel; fan-in goroutine exits
			}
			r.buf.mu.Unlock()
		}
		closeOnce()
	}

	return ch, unsubscribe
}
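For context, a hedged usage sketch of the aggregated Subscribe path documented above; the consumer function, package name, and stop channel are hypothetical, and only the store API defined in this file is assumed:

```go
package consumer // hypothetical consumer package, not part of this commit

import (
	"fmt"
	"time"

	"github.com/mudler/LocalAI/pkg/model" // import path assumed from the repo layout
)

// streamLogs subscribes with the bare model ID, so lines from every
// "modelID#N" replica buffer that exists at subscribe time are fanned into
// one channel; a full "modelID#N" key would follow a single replica instead.
func streamLogs(store *model.BackendLogStore, modelID string, stop <-chan struct{}) {
	ch, unsubscribe := store.Subscribe(modelID)
	defer unsubscribe()

	for {
		select {
		case line, ok := <-ch:
			if !ok {
				return // all source buffers closed (e.g. the model was removed)
			}
			fmt.Printf("%s [%s] %s\n", line.Timestamp.Format(time.RFC3339), line.Stream, line.Text)
		case <-stop:
			return // the deferred unsubscribe detaches from each replica buffer and closes ch
		}
	}
}
```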