diff --git a/core/http/react-ui/src/pages/Nodes.jsx b/core/http/react-ui/src/pages/Nodes.jsx
index 8326d81bc..bc6383b72 100644
--- a/core/http/react-ui/src/pages/Nodes.jsx
+++ b/core/http/react-ui/src/pages/Nodes.jsx
@@ -1198,12 +1198,38 @@ export default function Nodes() {
- {models.map(m => {
- const stCfg = modelStateConfig[m.state] || modelStateConfig.idle
- return (
-
+ {(() => {
+ // Pre-compute per-model replica counts so the disambiguation
+ // pill only renders when this node actually hosts >1 replica
+ // of the same model. Single-replica deployments stay clean.
+ const replicaCounts = {}
+ models.forEach(m => { replicaCounts[m.model_name] = (replicaCounts[m.model_name] || 0) + 1 })
+ return models.map(m => {
+ const stCfg = modelStateConfig[m.state] || modelStateConfig.idle
+ const showReplica = (replicaCounts[m.model_name] || 0) > 1
+ // Per-replica process key — what the worker stores logs under and what the
+ // store's GetLines/Subscribe match on for replica-scoped filtering.
+ const processKey = `${m.model_name}#${m.replica_index ?? 0}`
+ return (
+
|
{m.model_name}
+ {showReplica && (
+
+ rep {m.replica_index ?? 0}
+
+ )}
|
{
e.preventDefault()
- navigate(`/app/node-backend-logs/${node.id}/${encodeURIComponent(m.model_name)}`)
+ // Send the replica-scoped process key (modelName#replicaIndex).
+ // The worker's BackendLogStore returns only this replica's lines
+ // when given the full key; a future "merged" toggle in the logs
+ // page can navigate to the bare modelName URL to use aggregation.
+ navigate(`/app/node-backend-logs/${node.id}/${encodeURIComponent(processKey)}`)
}}
style={{ fontSize: '0.75rem', color: 'var(--color-primary)' }}
- title="View backend logs"
+ title={showReplica ? `View backend logs for replica ${m.replica_index ?? 0}` : 'View backend logs'}
>
@@ -1249,7 +1279,8 @@ export default function Nodes() {
|
)
- })}
+ })
+ })()}
)}
diff --git a/pkg/model/backend_log_store.go b/pkg/model/backend_log_store.go
index c20b387d8..720460c22 100644
--- a/pkg/model/backend_log_store.go
+++ b/pkg/model/backend_log_store.go
@@ -2,12 +2,19 @@ package model
import (
"sort"
+ "strings"
"sync"
"time"
"github.com/emirpasic/gods/v2/queues/circularbuffer"
)
+// replicaSeparator separates a model ID from the replica index in the
+// supervisor's process key (e.g. "qwen3-0.6b#0"). Mirrored from the
+// worker's buildProcessKey — duplicated as a constant here to keep this
+// package free of CLI imports.
+const replicaSeparator = "#"
+
// BackendLogLine represents a single line of output from a backend process.
type BackendLogLine struct {
Timestamp time.Time `json:"timestamp"`
@@ -88,29 +95,79 @@ func (s *BackendLogStore) AppendLine(modelID, stream, text string) {
}
// GetLines returns a copy of all log lines for a model, or an empty slice.
+//
+// When modelID contains no replica suffix (no `#`), it's treated as a model
+// prefix and the lines from all `modelID#N` replicas are merged in
+// timestamp order. This keeps the existing per-model logs UI working in
+// distributed mode after the worker started using `modelID#replicaIndex`
+// as its process key (multi-replica refactor) — the UI asks for "qwen3-0.6b"
+// and gets the union of all replicas' logs.
+//
+// When modelID contains a `#` (e.g. "qwen3-0.6b#0"), it's treated as an
+// exact process key for per-replica filtering by callers that need it.
func (s *BackendLogStore) GetLines(modelID string) []BackendLogLine {
s.mu.RLock()
- buf, ok := s.buffers[modelID]
+ exactBuf, exactOK := s.buffers[modelID]
s.mu.RUnlock()
- if !ok {
+
+ // Exact match — single key. Caller knew the full process key.
+ if exactOK {
+ exactBuf.mu.Lock()
+ lines := exactBuf.queue.Values()
+ exactBuf.mu.Unlock()
+ return lines
+ }
+
+ // No exact match: aggregate any replicas if modelID looks like a model prefix.
+ if strings.Contains(modelID, replicaSeparator) {
return []BackendLogLine{}
}
- buf.mu.Lock()
- lines := buf.queue.Values()
- buf.mu.Unlock()
- return lines
-}
-
-// ListModels returns a sorted list of model IDs that have log buffers.
-func (s *BackendLogStore) ListModels() []string {
+ prefix := modelID + replicaSeparator
+ var matching []*backendLogBuffer
s.mu.RLock()
- models := make([]string, 0, len(s.buffers))
- for id := range s.buffers {
- models = append(models, id)
+ for k, b := range s.buffers {
+ if strings.HasPrefix(k, prefix) {
+ matching = append(matching, b)
+ }
}
s.mu.RUnlock()
+ if len(matching) == 0 {
+ return []BackendLogLine{}
+ }
+
+ // Merge the per-replica buffers and sort by timestamp so the operator
+ // sees a single coherent timeline rather than per-replica blocks.
+ var merged []BackendLogLine
+ for _, b := range matching {
+ b.mu.Lock()
+ merged = append(merged, b.queue.Values()...)
+ b.mu.Unlock()
+ }
+ sort.SliceStable(merged, func(i, j int) bool { return merged[i].Timestamp.Before(merged[j].Timestamp) })
+ return merged
+}
+
+// ListModels returns a sorted list of model IDs that have log buffers.
+// Replica suffixes (`#N`) are stripped and the result is deduplicated, so
+// callers see one entry per loaded model regardless of replica count.
+func (s *BackendLogStore) ListModels() []string {
+ s.mu.RLock()
+ seen := make(map[string]struct{}, len(s.buffers))
+ for id := range s.buffers {
+ base := id
+ if i := strings.Index(id, replicaSeparator); i >= 0 {
+ base = id[:i]
+ }
+ seen[base] = struct{}{}
+ }
+ s.mu.RUnlock()
+
+ models := make([]string, 0, len(seen))
+ for id := range seen {
+ models = append(models, id)
+ }
sort.Strings(models)
return models
}
@@ -145,23 +202,107 @@ func (s *BackendLogStore) Remove(modelID string) {
// Subscribe returns a channel that receives new log lines for the given model
// in real-time, plus an unsubscribe function. The channel has a buffer of 100
// lines to absorb short bursts without blocking the writer.
+//
+// Like GetLines, a modelID without a `#` separator subscribes to every
+// matching `modelID#N` replica buffer that exists at subscribe time, so the
+// stream merges all replicas. Subscribers are NOT auto-attached to replicas
+// that come up later — callers needing dynamic membership should resubscribe.
func (s *BackendLogStore) Subscribe(modelID string) (chan BackendLogLine, func()) {
ch := make(chan BackendLogLine, 100)
- buf := s.getOrCreateBuffer(modelID)
- buf.mu.Lock()
- id := buf.nextSubID
- buf.nextSubID++
- buf.subscribers[id] = ch
- buf.mu.Unlock()
+ // Per-replica caller (full process key) — exact subscription.
+ if strings.Contains(modelID, replicaSeparator) {
+ buf := s.getOrCreateBuffer(modelID)
+ buf.mu.Lock()
+ id := buf.nextSubID
+ buf.nextSubID++
+ buf.subscribers[id] = ch
+ buf.mu.Unlock()
+ unsubscribe := func() {
+ buf.mu.Lock()
+ if _, exists := buf.subscribers[id]; exists {
+ delete(buf.subscribers, id)
+ close(ch)
+ }
+ buf.mu.Unlock()
+ }
+ return ch, unsubscribe
+ }
+
+ // Aggregated caller: subscribe to the bare-modelID buffer (for back-compat
+ // with single-replica writers that still write to the un-suffixed key) AND
+ // to every existing `modelID#N` replica buffer. Each per-buffer subscription
+ // receives lines into its own channel; we fan them in to `ch` here.
+ type subRef struct {
+ buf *backendLogBuffer
+ id int
+ ch chan BackendLogLine
+ }
+ var refs []subRef
+
+ subscribe := func(buf *backendLogBuffer) {
+ bufCh := make(chan BackendLogLine, 100)
+ buf.mu.Lock()
+ id := buf.nextSubID
+ buf.nextSubID++
+ buf.subscribers[id] = bufCh
+ buf.mu.Unlock()
+ refs = append(refs, subRef{buf: buf, id: id, ch: bufCh})
+ }
+
+ if buf, ok := func() (*backendLogBuffer, bool) {
+ s.mu.RLock()
+ b, ok := s.buffers[modelID]
+ s.mu.RUnlock()
+ return b, ok
+ }(); ok {
+ subscribe(buf)
+ }
+
+ prefix := modelID + replicaSeparator
+ s.mu.RLock()
+ for k, b := range s.buffers {
+ if strings.HasPrefix(k, prefix) {
+ subscribe(b)
+ }
+ }
+ s.mu.RUnlock()
+
+ // Fan-in goroutine: forward every per-buffer channel into the merged
+ // channel until all source channels close, then close the merged channel.
+ if len(refs) == 0 {
+ // No source buffers yet: still return a channel so callers don't crash;
+ // sync.OnceFunc makes a double-unsubscribe safe (no double-close panic).
+ unsubscribe := sync.OnceFunc(func() { close(ch) })
+ return ch, unsubscribe
+ }
+
+ var fanWG sync.WaitGroup
+ closeOnce := sync.OnceFunc(func() { close(ch) })
+ for _, r := range refs {
+ fanWG.Add(1)
+ go func(c chan BackendLogLine) {
+ defer fanWG.Done()
+ for line := range c {
+ select {
+ case ch <- line:
+ default: // drop on slow consumer to match non-aggregated behavior
+ }
+ }
+ }(r.ch)
+ }
+ go func() { fanWG.Wait(); closeOnce() }()
 unsubscribe := func() {
- buf.mu.Lock()
- if _, exists := buf.subscribers[id]; exists {
- delete(buf.subscribers, id)
- close(ch)
+ for _, r := range refs {
+ r.buf.mu.Lock()
+ if c, exists := r.buf.subscribers[r.id]; exists {
+ delete(r.buf.subscribers, r.id)
+ close(c) // closes the per-buffer source channel; fan-in goroutine exits
+ }
+ r.buf.mu.Unlock()
 }
- buf.mu.Unlock()
+ // Do NOT close ch here: fan-in goroutines may still be draining buffered lines from the just-closed sources, and a send on a closed channel panics even inside select; the fanWG waiter closes ch once every source drains.
 }
return ch, unsubscribe
diff --git a/pkg/model/backend_log_store_test.go b/pkg/model/backend_log_store_test.go
new file mode 100644
index 000000000..eceff0b30
--- /dev/null
+++ b/pkg/model/backend_log_store_test.go
@@ -0,0 +1,140 @@
+package model
+
+import (
+ "reflect"
+ "sort"
+ "testing"
+ "time"
+)
+
+// TestGetLines_PrefixAggregation pins the multi-replica behavior added when
+// the worker's process key changed from `modelID` to `modelID#replicaIndex`.
+// The frontend still asks for logs of `qwen3-0.6b`, but the actual buffers
+// live under `qwen3-0.6b#0` and `qwen3-0.6b#1` — without aggregation,
+// operators see no logs in distributed mode.
+func TestGetLines_PrefixAggregation(t *testing.T) {
+ s := NewBackendLogStore(100)
+
+ // Two replicas of the same model, plus a different model that should
+ // never leak in. AppendLine timestamps via time.Now(), so add small
+ // sleeps so the merged order is deterministic.
+ s.AppendLine("qwen3-0.6b#0", "stderr", "r0-line-1")
+ time.Sleep(2 * time.Millisecond)
+ s.AppendLine("qwen3-0.6b#1", "stderr", "r1-line-1")
+ time.Sleep(2 * time.Millisecond)
+ s.AppendLine("qwen3-0.6b#0", "stdout", "r0-line-2")
+ time.Sleep(2 * time.Millisecond)
+ s.AppendLine("other-model#0", "stderr", "should-not-appear")
+
+ got := s.GetLines("qwen3-0.6b")
+ var texts []string
+ for _, l := range got {
+ texts = append(texts, l.Text)
+ }
+ want := []string{"r0-line-1", "r1-line-1", "r0-line-2"}
+ if !reflect.DeepEqual(texts, want) {
+ t.Fatalf("aggregated texts = %v, want %v", texts, want)
+ }
+
+ // Per-replica filtering: full process key returns only that replica.
+ r0 := s.GetLines("qwen3-0.6b#0")
+ if len(r0) != 2 {
+ t.Fatalf("replica 0 should have 2 lines, got %d", len(r0))
+ }
+ for _, l := range r0 {
+ if l.Text == "r1-line-1" {
+ t.Fatalf("replica 0 must not include replica 1's lines")
+ }
+ }
+
+ // No matching replica: empty slice (not nil; existing callers rely on len()).
+ if got := s.GetLines("never-loaded-model"); len(got) != 0 {
+ t.Fatalf("unknown model should yield empty slice, got %v", got)
+ }
+}
+
+// TestListModels_DedupReplicas confirms the /v1/backend-logs listing shows
+// one entry per model, not one per replica — operators don't think about
+// replica indexes; they pick a model.
+func TestListModels_DedupReplicas(t *testing.T) {
+ s := NewBackendLogStore(100)
+ s.AppendLine("model-a#0", "stderr", "x")
+ s.AppendLine("model-a#1", "stderr", "y")
+ s.AppendLine("model-b#0", "stderr", "z")
+ s.AppendLine("model-c", "stderr", "no-replica-suffix") // back-compat for non-distributed
+
+ got := s.ListModels()
+ sort.Strings(got)
+ want := []string{"model-a", "model-b", "model-c"}
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("ListModels = %v, want %v", got, want)
+ }
+}
+
+// TestSubscribe_AggregatesAcrossReplicas confirms the WebSocket streaming
+// path (the live tail UI) receives lines from every replica when the
+// caller subscribes by bare modelID.
+func TestSubscribe_AggregatesAcrossReplicas(t *testing.T) {
+ s := NewBackendLogStore(100)
+
+ // Pre-create both replica buffers so Subscribe can find them.
+ s.AppendLine("model-a#0", "stderr", "preload-r0")
+ s.AppendLine("model-a#1", "stderr", "preload-r1")
+
+ ch, unsubscribe := s.Subscribe("model-a")
+ defer unsubscribe()
+
+ // Emit one line per replica after subscribing.
+ s.AppendLine("model-a#0", "stderr", "live-r0")
+ s.AppendLine("model-a#1", "stderr", "live-r1")
+ // Different model — must not appear.
+ s.AppendLine("model-b#0", "stderr", "leak-check")
+
+ seen := map[string]bool{}
+ deadline := time.After(500 * time.Millisecond)
+ for len(seen) < 2 {
+ select {
+ case line, ok := <-ch:
+ if !ok {
+ t.Fatalf("subscribe channel closed early; saw %v", seen)
+ }
+ seen[line.Text] = true
+ if line.Text == "leak-check" {
+ t.Fatalf("subscribe leaked a line from a different model")
+ }
+ case <-deadline:
+ t.Fatalf("timed out waiting for fan-in lines; saw %v", seen)
+ }
+ }
+ if !seen["live-r0"] || !seen["live-r1"] {
+ t.Fatalf("missing live lines from replicas: saw %v", seen)
+ }
+}
+
+// TestSubscribe_PerReplicaFilter pins that callers passing the full process
+// key get only that replica — useful for a future per-replica logs view.
+func TestSubscribe_PerReplicaFilter(t *testing.T) {
+ s := NewBackendLogStore(100)
+
+ ch, unsubscribe := s.Subscribe("model-a#0")
+ defer unsubscribe()
+
+ s.AppendLine("model-a#0", "stderr", "wanted")
+ s.AppendLine("model-a#1", "stderr", "unwanted")
+
+ select {
+ case line := <-ch:
+ if line.Text != "wanted" {
+ t.Fatalf("expected line from replica 0, got %q", line.Text)
+ }
+ case <-time.After(500 * time.Millisecond):
+ t.Fatalf("no line received from replica-scoped subscription")
+ }
+
+ // Drain quickly: confirm replica 1 didn't leak in.
+ select {
+ case line := <-ch:
+ t.Fatalf("replica-scoped sub leaked line from replica 1: %q", line.Text)
+ case <-time.After(50 * time.Millisecond):
+ }
+}