fix(nodes): make per-node backend install async via gallery job queue (#9928)

* feat(galleryop): add TargetNodeID to ManagementOp for single-node installs

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): add NodeScopedKey helpers for per-node opcache rows

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(galleryop): use strings.Cut for NodeScopedKey parsing, reject empty nodeID

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(nodes): scope DistributedBackendManager.InstallBackend to single node via TargetNodeID

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(http): make /api/nodes/:id/backends/install async via gallery service job queue

The handler previously called unloader.InstallBackend synchronously and
blocked the browser for up to 3 minutes waiting on the NATS reply. It now
enqueues a TargetNodeID-scoped ManagementOp on BackendGalleryChannel and
returns HTTP 202 + jobID immediately, matching /api/backends/install/:id.

The opcache key is built via NodeScopedKey(nodeID, backend) so concurrent
installs of the same backend across different nodes do not stomp each
other. galleryService/opcache/appConfig are threaded through
RegisterNodeAdminRoutes for this.

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(http): log malformed backend_galleries override and stop test drain goroutine

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(api): expose nodeID for node-scoped backend ops in /api/operations

Node-scoped backend installs land in opcache under "node:<nodeID>:<backend>"
keys. Without splitting that prefix back out, the operations panel renders
the full key as the display name and has no structured way to label which
worker an install is targeting. Detect the prefix, surface nodeID as its own
response field, and reduce the display name back to the bare backend slug.
Bare (non-scoped) ops are left untouched so legacy installs do not gain a
misleading empty nodeID.

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(react-ui): poll job status for node-targeted backend installs

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(react-ui): make NodeInstallPicker state updates pure and surface cancellations as errors

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(react-ui): clarify async semantics in handleInstallOnTarget

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(http): use statusUrl casing for node install response to match codebase precedent

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-05-21 22:25:53 +02:00
committed by GitHub
parent 05e8e1e9f4
commit a39e025d64
12 changed files with 592 additions and 62 deletions

View File

@@ -407,7 +407,7 @@ func API(application *application.Application) (*echo.Echo, error) {
}
}
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
// Distributed SSE routes (job progress + agent events via NATS)
if d := application.Distributed(); d != nil {

View File

@@ -16,8 +16,11 @@ import (
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/xlog"
"gorm.io/gorm"
@@ -381,14 +384,24 @@ func ResumeNodeEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
}
}
// InstallBackendOnNodeEndpoint triggers backend installation on a worker node via NATS.
// InstallBackendOnNodeEndpoint triggers backend installation on a worker node.
// Async: enqueues a ManagementOp on the gallery service channel and returns a
// jobID immediately. The gallery service worker goroutine drives the actual
// install via DistributedBackendManager.InstallBackend, which honors the op's
// TargetNodeID to scope the fan-out to one node. The UI polls /api/backends/job/:uid
// for progress, mirroring /api/backends/install/:id.
//
// Backend can be either a gallery ID (resolved against BackendGalleries) or a
// direct URI install (URI + Name + optional Alias) same shape as the
// direct URI install (URI + Name + optional Alias) - same shape as the
// standalone /api/backends/install-external path, just scoped to one node.
func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.HandlerFunc {
//
// The legacy unloader argument is retained for signature symmetry with
// DeleteBackendOnNodeEndpoint / ListBackendsOnNodeEndpoint but is no longer
// used here - the async path goes through galleryService.
func InstallBackendOnNodeEndpoint(_ nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig) echo.HandlerFunc {
return func(c echo.Context) error {
if unloader == nil {
return c.JSON(http.StatusServiceUnavailable, nodeError(http.StatusServiceUnavailable, "NATS not configured"))
if galleryService == nil {
return c.JSON(http.StatusServiceUnavailable, nodeError(http.StatusServiceUnavailable, "gallery service not configured"))
}
nodeID := c.Param("id")
var req struct {
@@ -401,25 +414,65 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "invalid request body"))
}
// Either a gallery backend name or a direct URI must be supplied.
if req.Backend == "" && req.URI == "" {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "backend name or uri required"))
}
// Admin-driven backend install: not tied to a specific replica slot
// (no model is being loaded). Pass replica 0 to match the worker's
// admin process-key convention (`backend#0`). The worker's fast path
// takes over if the backend is already running — upgrades go through
// the dedicated /api/backends/upgrade path on backend.upgrade.
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
jobUUID, err := uuid.NewUUID()
if err != nil {
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to generate job id"))
}
if !reply.Success {
xlog.Error("Backend install failed on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", reply.Error)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "backend installation failed"))
jobID := jobUUID.String()
// Cache key: for gallery installs, use the backend slug; for URI
// installs prefer the provided Name (falling back to URI). All keys
// are node-scoped so concurrent installs of the same backend on
// different nodes do not stomp each other in opcache.
backendKey := req.Backend
if backendKey == "" {
backendKey = req.Name
if backendKey == "" {
backendKey = req.URI
}
}
return c.JSON(http.StatusOK, map[string]string{"message": "backend installed"})
cacheKey := galleryop.NodeScopedKey(nodeID, backendKey)
opcache.SetBackend(cacheKey, jobID)
// Optional caller-supplied galleries override. Mirrors the standalone
// install path so an admin can point at a private gallery.
galleries := appConfig.BackendGalleries
if req.BackendGalleries != "" {
var custom []config.Gallery
if err := json.Unmarshal([]byte(req.BackendGalleries), &custom); err != nil {
xlog.Warn("Ignoring malformed backend_galleries override; falling back to configured galleries", "error", err, "nodeID", nodeID)
} else if len(custom) > 0 {
galleries = custom
}
}
ctx, cancelFunc := context.WithCancel(context.Background())
op := galleryop.ManagementOp[gallery.GalleryBackend, any]{
ID: jobID,
GalleryElementName: req.Backend,
Galleries: galleries,
TargetNodeID: nodeID,
ExternalURI: req.URI,
ExternalName: req.Name,
ExternalAlias: req.Alias,
Context: ctx,
CancelFunc: cancelFunc,
}
galleryService.StoreCancellation(jobID, cancelFunc)
go func() {
galleryService.BackendGalleryChannel <- op
}()
xlog.Info("Node-scoped backend install dispatched", "node", nodeID, "backend", req.Backend, "uri", req.URI, "jobID", jobID)
return c.JSON(http.StatusAccepted, map[string]string{
"jobID": jobID,
"statusUrl": "/api/backends/job/" + jobID,
"message": "backend installation started",
})
}
}

View File

@@ -0,0 +1,123 @@
package localai_test
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"github.com/labstack/echo/v4"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
)
// InstallBackendOnNodeEndpoint became async to stop blocking the browser on
// the 3-minute NATS reply timeout. These specs lock in the new contract:
// HTTP 202 with a jobID, a ManagementOp enqueued on the gallery channel, and
// an opcache entry keyed by NodeScopedKey so concurrent installs of the same
// backend on different nodes do not stomp each other.
var _ = Describe("InstallBackendOnNodeEndpoint async behavior", func() {
var (
e *echo.Echo
galleryService *galleryop.GalleryService
opcache *galleryop.OpCache
appCfg *config.ApplicationConfig
dispatched chan galleryop.ManagementOp[gallery.GalleryBackend, any]
done chan struct{}
drainExited chan struct{}
)
BeforeEach(func() {
e = echo.New()
appCfg = &config.ApplicationConfig{
BackendGalleries: []config.Gallery{{Name: "test-gallery", URL: "http://example.com"}},
}
galleryService = galleryop.NewGalleryService(appCfg, nil)
opcache = galleryop.NewOpCache(galleryService)
// Drain the gallery channel into a buffered side channel so the
// handler's `go func() { ch <- op }()` send does not block waiting
// for the real worker (which is not running in this unit test).
dispatched = make(chan galleryop.ManagementOp[gallery.GalleryBackend, any], 4)
done = make(chan struct{})
drainExited = make(chan struct{})
go func() {
defer close(drainExited)
for {
select {
case op := <-galleryService.BackendGalleryChannel:
dispatched <- op
case <-done:
return
}
}
}()
})
AfterEach(func() {
// Signal the drain goroutine to exit. We do NOT close
// BackendGalleryChannel: the handler's dispatch goroutine may still
// be pending (specs that don't Eventually-Receive), and a send on a
// closed channel panics. Signalling via `done` lets the drain
// goroutine return without touching the gallery channel.
close(done)
Eventually(drainExited, "2s").Should(BeClosed())
})
It("returns 202 with a jobID and dispatches a TargetNodeID-scoped op", func() {
body := `{"backend": "llama-cpp"}`
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")
handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusAccepted))
var resp map[string]any
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp["jobID"]).To(BeAssignableToTypeOf(""))
Expect(resp["jobID"].(string)).ToNot(BeEmpty())
Expect(resp["message"]).To(Equal("backend installation started"))
Eventually(dispatched, "2s").Should(Receive())
Expect(opcache.Exists(galleryop.NodeScopedKey("node-xyz", "llama-cpp"))).To(BeTrue())
Expect(opcache.IsBackendOp(galleryop.NodeScopedKey("node-xyz", "llama-cpp"))).To(BeTrue())
})
It("returns 400 when neither backend nor uri is supplied", func() {
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(`{}`))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")
handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
})
It("accepts a direct URI install and uses the name as the cache key", func() {
body := `{"uri": "oci://example.com/custom-backend:v1", "name": "custom"}`
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")
handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusAccepted))
Expect(opcache.Exists(galleryop.NodeScopedKey("node-xyz", "custom"))).To(BeTrue())
})
})

View File

@@ -1,7 +1,7 @@
import { useState, useMemo, useEffect, useRef } from 'react'
import Modal from './Modal'
import SearchableSelect from './SearchableSelect'
import { nodesApi } from '../utils/api'
import { nodesApi, backendsApi } from '../utils/api'
// NodeInstallPicker is the single multi-node install surface used both from
// the Backends gallery split-button and from the "Install on more nodes" `+`
@@ -240,6 +240,37 @@ export default function NodeInstallPicker({
}
const clearSelection = () => setSelected(new Set())
// pollJob resolves with { done: true, error?: string } once a single job
// completes, fails, or is cancelled. Bounded by a hard wall-clock cap so a
// stuck worker eventually surfaces in the UI as "Failed" instead of
// spinning forever.
const pollJob = (jobID) => new Promise((resolve) => {
const POLL_INTERVAL_MS = 1500
const HARD_CAP_MS = 6 * 60 * 1000 // 6 min - generous for a fresh worker download
const startedAt = Date.now()
const tick = async () => {
try {
const status = await backendsApi.getJob(jobID)
if (status?.completed) { resolve({ done: true }); return }
if (status?.error) { resolve({ done: true, error: status.error }); return }
if (status?.processed && !status?.completed) {
resolve({ done: true, error: status.error || 'install did not complete' })
return
}
} catch (err) {
resolve({ done: true, error: err?.message || 'polling failed' })
return
}
if (Date.now() - startedAt > HARD_CAP_MS) {
resolve({ done: true, error: 'timed out waiting for install to finish' })
return
}
setTimeout(tick, POLL_INTERVAL_MS)
}
tick()
})
const submit = async () => {
if (selected.size === 0 || submitting) return
if (counts.overrides > 0 && !showMismatchConfirm) {
@@ -255,38 +286,68 @@ export default function NodeInstallPicker({
return next
})
const results = await Promise.allSettled(ids.map(id =>
// Phase 1: dispatch all installs in parallel. Each POST returns immediately
// with { jobID } now that the handler is async.
const dispatchResults = await Promise.allSettled(ids.map(id =>
nodesApi.installBackend(id, effectiveBackendName)
.then(r => ({ id, ok: true, message: r?.message }))
.catch(err => ({ id, ok: false, error: err?.message || 'install failed' }))
.then(r => ({ id, ok: true, jobID: r?.jobID }))
.catch(err => ({ id, ok: false, error: err?.message || 'dispatch failed' }))
))
let successCount = 0, failCount = 0
setPerNode(prev => {
const next = { ...prev }
for (const r of results) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok) {
next[v.id] = { status: 'done' }
successCount++
} else {
next[v.id] = { status: 'error', error: v.error }
failCount++
}
// Classify dispatch results synchronously OUTSIDE the setter. React may
// invoke a functional state updater more than once (StrictMode dev double
// invoke, concurrent rendering replay): building the jobs array inside
// the closure would duplicate entries and re-poll the same job.
const jobs = []
const dispatchPatch = {}
for (const r of dispatchResults) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok && v.jobID) {
dispatchPatch[v.id] = { status: 'installing', jobID: v.jobID }
jobs.push({ nodeID: v.id, jobID: v.jobID })
} else {
dispatchPatch[v.id] = { status: 'error', error: v.error || 'dispatch failed' }
}
return next
}
setPerNode(prev => ({ ...prev, ...dispatchPatch }))
// Phase 2: poll each job. Promise.all resolves when the last job settles;
// intermediate updates flip per-row state via the setPerNode inside pollJob.
await Promise.all(jobs.map(async ({ nodeID, jobID }) => {
const result = await pollJob(jobID)
setPerNode(prev => {
const next = { ...prev }
if (result.error) {
next[nodeID] = { status: 'error', error: result.error, jobID }
} else {
next[nodeID] = { status: 'done', jobID }
}
return next
})
}))
// Phase 3: summary toast + onComplete. Read latest state via functional setter.
let successCount = 0
let failCount = 0
setPerNode(prev => {
for (const v of Object.values(prev)) {
if (v.status === 'done') successCount++
else if (v.status === 'error') failCount++
}
return prev
})
setSubmitting(false)
if (successCount > 0 && onComplete) onComplete()
if (failCount === 0) {
if (failCount === 0 && successCount > 0) {
addToast?.(`Installed on ${successCount} node${successCount === 1 ? '' : 's'}`, 'success')
setTimeout(() => onClose?.(), 800)
} else if (successCount === 0) {
} else if (successCount === 0 && failCount > 0) {
addToast?.(`Install failed on all ${failCount} node${failCount === 1 ? '' : 's'}`, 'error')
} else {
} else if (successCount > 0 && failCount > 0) {
addToast?.(`Installed on ${successCount}, failed on ${failCount}`, 'warning')
}
}
@@ -297,32 +358,58 @@ export default function NodeInstallPicker({
.map(([id]) => id)
if (failedIds.length === 0) return
setSelected(new Set(failedIds))
// Replace state for failed rows so they show "installing" again, not stale errors.
setPerNode(prev => {
const next = { ...prev }
failedIds.forEach(id => { next[id] = { status: 'installing' } })
return next
})
setSubmitting(true)
const results = await Promise.allSettled(failedIds.map(id =>
const dispatchResults = await Promise.allSettled(failedIds.map(id =>
nodesApi.installBackend(id, effectiveBackendName)
.then(r => ({ id, ok: true, message: r?.message }))
.catch(err => ({ id, ok: false, error: err?.message || 'install failed' }))
.then(r => ({ id, ok: true, jobID: r?.jobID }))
.catch(err => ({ id, ok: false, error: err?.message || 'dispatch failed' }))
))
// Same precaution as in submit(): classify outside the functional setter
// so a replayed updater can't push duplicate jobs into the polling list.
const jobs = []
const dispatchPatch = {}
for (const r of dispatchResults) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok && v.jobID) {
dispatchPatch[v.id] = { status: 'installing', jobID: v.jobID }
jobs.push({ nodeID: v.id, jobID: v.jobID })
} else {
dispatchPatch[v.id] = { status: 'error', error: v.error || 'dispatch failed' }
}
}
setPerNode(prev => ({ ...prev, ...dispatchPatch }))
await Promise.all(jobs.map(async ({ nodeID, jobID }) => {
const result = await pollJob(jobID)
setPerNode(prev => {
const next = { ...prev }
if (result.error) next[nodeID] = { status: 'error', error: result.error, jobID }
else next[nodeID] = { status: 'done', jobID }
return next
})
}))
setSubmitting(false)
let successCount = 0, failCount = 0
setPerNode(prev => {
const next = { ...prev }
for (const r of results) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok) { next[v.id] = { status: 'done' }; successCount++ }
else { next[v.id] = { status: 'error', error: v.error }; failCount++ }
for (const id of failedIds) {
const v = prev[id]
if (v?.status === 'done') successCount++
else if (v?.status === 'error') failCount++
}
return next
return prev
})
setSubmitting(false)
if (successCount > 0 && onComplete) onComplete()
if (failCount === 0) {
if (failCount === 0 && successCount > 0) {
addToast?.(`Installed on ${successCount} node${successCount === 1 ? '' : 's'}`, 'success')
setTimeout(() => onClose?.(), 800)
}

View File

@@ -179,16 +179,19 @@ export default function Backends() {
// Install a single gallery backend on a specific node, used in target-node
// mode (the URL has ?target=<node-id> set from the Nodes page entry point).
// The handler is async - we dispatch and let the global Operations panel
// surface progress; no need to await completion here.
const handleInstallOnTarget = async (id) => {
if (!targetNode) return
try {
await nodesApi.installBackend(targetNode.id, id)
addToast(`Installing ${id} on ${targetNode.name}`, 'info')
// Per-node install is request-reply, not part of the global jobs feed —
// refetch to reflect the new Nodes column state.
setTimeout(() => { fetchBackends(); refetchNodes() }, 600)
addToast(`Installing ${id} on ${targetNode.name}...`, 'info')
// The install runs async via the gallery job queue. Refetch shortly so
// the Nodes column reflects "installing" state; the Operations panel
// tracks the actual progress until completion.
setTimeout(() => { fetchBackends(); refetchNodes() }, 1200)
} catch (err) {
addToast(`Install failed on ${targetNode.name}: ${err.message}`, 'error')
addToast(`Install dispatch failed on ${targetNode.name}: ${err.message}`, 'error')
}
}

View File

@@ -6,7 +6,9 @@ import (
"strings"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"gorm.io/gorm"
)
@@ -53,7 +55,12 @@ func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, r
// RegisterNodeAdminRoutes registers /api/nodes/ endpoints used by admins
// (list, get, get models, drain, delete, approve, backend management). Protected by admin middleware.
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string) {
//
// galleryService/opcache/appConfig are threaded in for the async node-scoped
// backend install path (POST /:id/backends/install). That handler enqueues a
// ManagementOp on the gallery channel rather than blocking on a NATS reply, so
// the browser gets HTTP 202 + jobID immediately instead of waiting up to 3 minutes.
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string) {
if registry == nil {
return
}
@@ -78,7 +85,7 @@ func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloade
// Backend management on workers
admin.GET("/:id/backends", localai.ListBackendsOnNodeEndpoint(unloader))
admin.POST("/:id/backends/install", localai.InstallBackendOnNodeEndpoint(unloader))
admin.POST("/:id/backends/install", localai.InstallBackendOnNodeEndpoint(unloader, galleryService, opcache, appConfig))
admin.POST("/:id/backends/delete", localai.DeleteBackendOnNodeEndpoint(unloader))
// Model management on workers

View File

@@ -214,6 +214,17 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
}
}
// Node-scoped backend ops (from /api/nodes/:id/backends/install)
// carry the nodeID inside the opcache key as "node:<nodeID>:<backend>".
// Pull it back out so the operations panel can label which node the
// install is targeting, and so the display name is just the backend
// slug instead of the full prefixed key.
scopedNodeID := ""
if nodeID, backend, ok := galleryop.ParseNodeScopedKey(galleryID); ok {
scopedNodeID = nodeID
galleryID = backend
}
// Extract display name (remove repo prefix if exists)
displayName := galleryID
if strings.Contains(galleryID, "@") {
@@ -237,6 +248,12 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
"cancellable": isCancellable,
"message": message,
}
// Only attach nodeID when this op was node-scoped: an empty string
// would mislead the UI into rendering a node attribution that never
// existed in the first place.
if scopedNodeID != "" {
opData["nodeID"] = scopedNodeID
}
if status != nil && status.Error != nil {
opData["error"] = status.Error.Error()
}

View File

@@ -0,0 +1,98 @@
package routes_test
import (
"encoding/json"
"net/http"
"net/http/httptest"
"github.com/labstack/echo/v4"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/application"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/routes"
"github.com/mudler/LocalAI/core/services/galleryop"
)
// These specs guard the contract between the opcache (which stores
// node-scoped backend installs under a "node:<nodeID>:<backend>" key) and the
// /api/operations response surface the React UI polls. Without nodeID
// extraction the panel would show the raw prefixed key and have no way to
// label which worker an install is targeting.
var _ = Describe("/api/operations with node-scoped backend ops", func() {
// We pass a zero-value *application.Application because the handler's
// distributed-services branch guards on a nil check on the returned
// *DistributedServices, which is nil for a fresh Application{}.
noopMw := func(next echo.HandlerFunc) echo.HandlerFunc { return next }
It("emits nodeID and the un-prefixed backend name for keys built by NodeScopedKey", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
key := galleryop.NodeScopedKey("worker-7", "llama-cpp")
opcache.SetBackend(key, "job-uuid-123")
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
// The handler wraps operations in {"operations": [...]}.
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == "job-uuid-123" {
found = op
break
}
}
Expect(found).ToNot(BeNil(), "node-scoped op should appear in /api/operations")
Expect(found["nodeID"]).To(Equal("worker-7"))
Expect(found["name"]).To(Equal("llama-cpp"))
Expect(found["isBackend"]).To(Equal(true))
})
It("does not emit nodeID for non-node-scoped backend ops", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
// Legacy/global install path: bare backend name as the opcache key.
opcache.SetBackend("llama-cpp", "job-uuid-456")
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == "job-uuid-456" {
found = op
break
}
}
Expect(found).ToNot(BeNil())
// Critical: bare ops must NOT gain a misleading empty nodeID field.
Expect(found).ToNot(HaveKey("nodeID"), "non-node-scoped ops must NOT carry a nodeID field")
Expect(found["name"]).To(Equal("llama-cpp"))
})
})

View File

@@ -196,4 +196,60 @@ var _ = Describe("ManagementOp with External Backend", func() {
Expect(op.ExternalName).To(Equal("test-backend"))
Expect(op.ExternalAlias).To(Equal("test-alias"))
})
Context("TargetNodeID field", func() {
It("defaults to empty string", func() {
op := galleryop.ManagementOp[string, string]{
ExternalURI: "oci://example.com/backend:latest",
}
Expect(op.TargetNodeID).To(BeEmpty())
})
It("preserves TargetNodeID across a channel send", func() {
ch := make(chan galleryop.ManagementOp[string, string], 1)
ch <- galleryop.ManagementOp[string, string]{
GalleryElementName: "llama-cpp",
TargetNodeID: "node-abc-123",
}
received := <-ch
Expect(received.TargetNodeID).To(Equal("node-abc-123"))
Expect(received.GalleryElementName).To(Equal("llama-cpp"))
})
})
Describe("NodeScopedKey", func() {
It("builds a unique key per (nodeID, backend) pair", func() {
Expect(galleryop.NodeScopedKey("node-a", "llama-cpp")).To(Equal("node:node-a:llama-cpp"))
Expect(galleryop.NodeScopedKey("node-b", "llama-cpp")).To(Equal("node:node-b:llama-cpp"))
Expect(galleryop.NodeScopedKey("node-a", "vllm")).To(Equal("node:node-a:vllm"))
})
It("handles backend names containing colons", func() {
// Gallery IDs sometimes look like "official@llama-cpp"; nodeIDs are UUIDs
// without colons, but the backend slug may contain anything. Splitting on
// the first colon after the prefix MUST yield the full backend back.
key := galleryop.NodeScopedKey("node-1", "official@llama-cpp:v2")
node, backend, ok := galleryop.ParseNodeScopedKey(key)
Expect(ok).To(BeTrue())
Expect(node).To(Equal("node-1"))
Expect(backend).To(Equal("official@llama-cpp:v2"))
})
It("rejects keys without the node prefix", func() {
_, _, ok := galleryop.ParseNodeScopedKey("llama-cpp")
Expect(ok).To(BeFalse())
_, _, ok = galleryop.ParseNodeScopedKey("official@llama-cpp")
Expect(ok).To(BeFalse())
})
It("rejects malformed node-prefixed keys", func() {
_, _, ok := galleryop.ParseNodeScopedKey("node:only-one-segment")
Expect(ok).To(BeFalse())
})
It("rejects keys with an empty nodeID segment", func() {
_, _, ok := galleryop.ParseNodeScopedKey("node::llama-cpp")
Expect(ok).To(BeFalse())
})
})
})

View File

@@ -2,6 +2,7 @@ package galleryop
import (
"context"
"strings"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/pkg/xsync"
@@ -30,6 +31,12 @@ type ManagementOp[T any, E any] struct {
ExternalName string // Custom name for the backend
ExternalAlias string // Custom alias for the backend
// TargetNodeID scopes a backend install/upgrade to a single worker node.
// Empty means fan out to every healthy backend node (the previous behavior).
// Set by InstallBackendOnNodeEndpoint so an admin can install a hardware-specific
// build on one node without touching the rest of the cluster.
TargetNodeID string
// Upgrade is true if this is an upgrade operation (not a fresh install)
Upgrade bool
}
@@ -115,3 +122,31 @@ func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
return processingModelsData, taskTypes
}
// NodeScopedKeyPrefix is the opcache key prefix used by InstallBackendOnNodeEndpoint
// so per-node installs do not collide on the bare backend name. Format:
// "node:<nodeID>:<backend>". Read by /api/operations to extract nodeID for the UI.
const NodeScopedKeyPrefix = "node:"
// NodeScopedKey returns the opcache key for a node-scoped backend operation.
// The prefix lets ParseNodeScopedKey detach the nodeID back out so the
// operations endpoint can surface it without storing nodeID separately.
func NodeScopedKey(nodeID, backend string) string {
return NodeScopedKeyPrefix + nodeID + ":" + backend
}
// ParseNodeScopedKey extracts (nodeID, backend) from a key built by NodeScopedKey.
// Returns ok=false for keys that lack the prefix or are missing the nodeID or
// backend segment. Backend names containing colons are preserved because we
// split on the first colon after the prefix only.
func ParseNodeScopedKey(key string) (nodeID, backend string, ok bool) {
rest, hasPrefix := strings.CutPrefix(key, NodeScopedKeyPrefix)
if !hasPrefix {
return "", "", false
}
nodeID, backend, ok = strings.Cut(rest, ":")
if !ok || nodeID == "" || backend == "" {
return "", "", false
}
return nodeID, backend, true
}

View File

@@ -331,13 +331,23 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
//
// When op.TargetNodeID is set, only that node is visited - the same allowlist
// path UpgradeBackend uses. Empty TargetNodeID preserves the original fan-out
// behavior so the periodic reconciler and /api/backends/install/:id keep
// working unchanged.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(op.Galleries)
backendName := op.GalleryElementName
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error {
var targetNodeIDs map[string]bool
if op.TargetNodeID != "" {
targetNodeIDs = map[string]bool{op.TargetNodeID: true}
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 the worker's processKey is "backend#0" when no
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
if err != nil {

View File

@@ -311,6 +311,47 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(mgr.InstallBackend(ctx, op("vllm-development"), nil)).To(Succeed())
})
})
Context("when op.TargetNodeID is set to a healthy node", func() {
It("installs only on that node, leaving the others untouched", func() {
target := registerHealthyBackend("worker-target", "10.0.0.1:50051")
other := registerHealthyBackend("worker-other", "10.0.0.2:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(target.ID),
messaging.BackendInstallReply{Success: true, Address: "10.0.0.1:50100"})
// No reply scripted for `other`: if InstallBackend fans out
// to it, the fakeNoRespondersErr default would surface and
// the test would fail.
targetedOp := &galleryop.ManagementOp[gallery.GalleryBackend, any]{
GalleryElementName: "llama-cpp",
TargetNodeID: target.ID,
}
Expect(mgr.InstallBackend(ctx, targetedOp, nil)).To(Succeed())
mc.mu.Lock()
defer mc.mu.Unlock()
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Subject).To(Equal(messaging.SubjectNodeBackendInstall(target.ID)))
Expect(mc.calls[0].Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(other.ID)))
})
})
Context("when op.TargetNodeID is set to a node that does not exist", func() {
It("returns nil without sending any NATS request", func() {
registerHealthyBackend("worker-a", "10.0.0.1:50051")
ghostOp := &galleryop.ManagementOp[gallery.GalleryBackend, any]{
GalleryElementName: "llama-cpp",
TargetNodeID: "this-id-does-not-exist",
}
Expect(mgr.InstallBackend(ctx, ghostOp, nil)).To(Succeed())
mc.mu.Lock()
defer mc.mu.Unlock()
Expect(mc.calls).To(BeEmpty())
})
})
})
Describe("UpgradeBackend", func() {