fix(distributed): broadcast admin model-config changes across replicas (#10540)

In distributed mode the admin model endpoints (/models/edit, /models/import,
/models/toggle-state and the PATCH config-json endpoint) wrote the YAML to the
shared models dir but reloaded only the local replica's in-memory
ModelConfigLoader. With multiple frontend replicas behind one service, a save
landed on whichever replica handled the request; peers kept serving their stale
in-memory view, so a load-balanced request was a coin-flip between old and new
config (a created alias visible on one replica and missing on the other, an
edited alias target diverging, etc.).

The NATS cache-invalidation channel (SubjectCacheInvalidateModels +
OnModelsChanged) already existed for the gallery install/delete path; these
admin endpoints simply never published on it. Wire them up via a new
GalleryService.BroadcastModelsChanged helper (no-op in standalone mode).

Also fix delete propagation: LoadModelConfigsFromPath is additive and never
drops an entry whose file is gone, so the subscriber hook (which only reloaded
from disk) could not propagate a removal. ApplyRemoteChange now honors the
event op - pruning the element on "delete" and reloading otherwise - and shuts
down any running instance of the affected model so the new config takes effect.
This closes the same latent gap on the gallery delete path.


Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-06-27 01:36:57 +02:00
committed by GitHub
parent f98b0f1c1e
commit 64150ca7ab
13 changed files with 239 additions and 16 deletions

View File

@@ -16,6 +16,7 @@ import (
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/modeladmin"
"github.com/mudler/LocalAI/core/services/monitoring"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/routing/admission"
@@ -330,9 +331,14 @@ func New(opts ...config.AppOption) (*Application, error) {
gs := application.galleryService
sys := options.SystemState
cfgLoaderOpts := options.ToConfigLoaderOptions()
gs.OnModelsChanged = func(_ messaging.CacheInvalidateEvent) {
if err := application.ModelConfigLoader().LoadModelConfigsFromPath(sys.Model.ModelsPath, cfgLoaderOpts...); err != nil {
xlog.Warn("Failed to reload model configs after peer invalidation", "error", err)
gs.OnModelsChanged = func(evt messaging.CacheInvalidateEvent) {
// ApplyRemoteChange honors the op: a "delete" prunes the element
// (a reload-from-path is additive and cannot drop it), anything
// else reloads from disk; a named element's running instance is
// shut down so the new config takes effect. The originating
// replica reloads inline and never depends on this path.
if err := modeladmin.ApplyRemoteChange(application.ModelConfigLoader(), application.modelLoader, sys.Model.ModelsPath, evt, cfgLoaderOpts...); err != nil {
xlog.Warn("Failed to apply peer model config change", "error", err)
}
}
if err := application.galleryService.SubscribeBroadcasts(); err != nil {

View File

@@ -155,7 +155,7 @@ func AutocompleteEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, a
// @Param name path string true "Model name"
// @Success 200 {object} map[string]any "success message"
// @Router /api/models/config-json/{name} [patch]
func PatchConfigEndpoint(cl *config.ModelConfigLoader, _ *model.ModelLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc {
func PatchConfigEndpoint(cl *config.ModelConfigLoader, _ *model.ModelLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc {
svc := modeladmin.NewConfigService(cl, appConfig)
return func(c echo.Context) error {
modelName := c.Param("name")
@@ -173,6 +173,14 @@ func PatchConfigEndpoint(cl *config.ModelConfigLoader, _ *model.ModelLoader, app
if _, err := svc.PatchConfig(c.Request().Context(), modelName, patchMap); err != nil {
return c.JSON(httpStatusForModelAdminError(err), map[string]any{"error": err.Error()})
}
// Patch rewrites the config on disk and reloads only the local loader;
// tell peers to refresh so the change is consistent across replicas.
// No-op in standalone mode.
if gs != nil {
gs.BroadcastModelsChanged(modelName, "install")
}
return c.JSON(http.StatusOK, map[string]any{
"success": true,
"message": fmt.Sprintf("Model '%s' updated successfully", modelName),

View File

@@ -45,7 +45,7 @@ var _ = Describe("Config Metadata Endpoints", func() {
app = echo.New()
app.GET("/api/models/config-metadata", ConfigMetadataEndpoint())
app.GET("/api/models/config-metadata/autocomplete/:provider", AutocompleteEndpoint(configLoader, modelLoader, appConfig))
app.PATCH("/api/models/config-json/:name", PatchConfigEndpoint(configLoader, modelLoader, appConfig))
app.PATCH("/api/models/config-json/:name", PatchConfigEndpoint(configLoader, modelLoader, nil, appConfig))
})
AfterEach(func() {

View File

@@ -10,6 +10,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
httpUtils "github.com/mudler/LocalAI/core/http/middleware"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/modeladmin"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/model"
@@ -55,7 +56,7 @@ func GetEditModelPage(cl *config.ModelConfigLoader, appConfig *config.Applicatio
}
// EditModelEndpoint handles updating existing model configurations
func EditModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc {
func EditModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc {
svc := modeladmin.NewConfigService(cl, appConfig)
return func(c echo.Context) error {
modelName := c.Param("name")
@@ -70,6 +71,17 @@ func EditModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appC
if err != nil {
return c.JSON(httpStatusForModelAdminError(err), ModelResponse{Success: false, Error: err.Error()})
}
// Tell peer replicas to refresh their in-memory config: this endpoint
// only reloaded the local loader. A rename is a delete of the old name
// plus an install of the new one. No-op in standalone mode.
if gs != nil {
if result.Renamed {
gs.BroadcastModelsChanged(result.OldName, "delete")
}
gs.BroadcastModelsChanged(result.NewName, "install")
}
msg := fmt.Sprintf("Model '%s' updated successfully. Model has been reloaded with new configuration.", result.NewName)
if result.Renamed {
msg = fmt.Sprintf("Model '%s' renamed to '%s' and updated successfully.", result.OldName, result.NewName)

View File

@@ -56,7 +56,7 @@ var _ = Describe("Edit Model test", func() {
app := echo.New()
// Set up a simple renderer for the test
app.Renderer = &testRenderer{}
app.POST("/import-model", ImportModelEndpoint(modelConfigLoader, applicationConfig))
app.POST("/import-model", ImportModelEndpoint(modelConfigLoader, nil, applicationConfig))
app.GET("/edit-model/:name", GetEditModelPage(modelConfigLoader, applicationConfig))
requestBody := bytes.NewBufferString(`{"name": "foo", "backend": "foo", "model": "foo"}`)
@@ -106,7 +106,7 @@ var _ = Describe("Edit Model test", func() {
Expect(exists).To(BeTrue())
app := echo.New()
app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, applicationConfig))
app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, nil, applicationConfig))
newYAML := "name: newname\nbackend: llama\nmodel: foo\n"
req := httptest.NewRequest("POST", "/models/edit/oldname", bytes.NewBufferString(newYAML))
@@ -163,7 +163,7 @@ var _ = Describe("Edit Model test", func() {
Expect(modelConfigLoader.LoadModelConfigsFromPath(tempDir)).To(Succeed())
app := echo.New()
app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, applicationConfig))
app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, nil, applicationConfig))
req := httptest.NewRequest(
"POST",
@@ -204,7 +204,7 @@ var _ = Describe("Edit Model test", func() {
Expect(modelConfigLoader.LoadModelConfigsFromPath(tempDir)).To(Succeed())
app := echo.New()
app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, applicationConfig))
app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, nil, applicationConfig))
req := httptest.NewRequest(
"POST",

View File

@@ -125,7 +125,7 @@ func ImportModelURIEndpoint(cl *config.ModelConfigLoader, appConfig *config.Appl
}
// ImportModelEndpoint handles creating new model configurations
func ImportModelEndpoint(cl *config.ModelConfigLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc {
func ImportModelEndpoint(cl *config.ModelConfigLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc {
return func(c echo.Context) error {
// Get the raw body
body, err := io.ReadAll(c.Request().Body)
@@ -245,6 +245,13 @@ func ImportModelEndpoint(cl *config.ModelConfigLoader, appConfig *config.Applica
}
return c.JSON(http.StatusInternalServerError, response)
}
// Tell peer replicas to load the newly-created config from the shared
// models dir: this endpoint only reloaded the local loader. No-op in
// standalone mode.
if gs != nil {
gs.BroadcastModelsChanged(modelConfig.Name, "install")
}
// Return success response
response := ModelResponse{
Success: true,

View File

@@ -7,6 +7,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/modeladmin"
"github.com/mudler/LocalAI/pkg/model"
)
@@ -24,7 +25,7 @@ import (
// @Failure 404 {object} ModelResponse
// @Failure 500 {object} ModelResponse
// @Router /api/models/{name}/{action} [put]
func ToggleStateModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc {
func ToggleStateModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc {
svc := modeladmin.NewConfigService(cl, appConfig)
return func(c echo.Context) error {
modelName := c.Param("name")
@@ -36,6 +37,14 @@ func ToggleStateModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoade
if err != nil {
return c.JSON(httpStatusForModelAdminError(err), ModelResponse{Success: false, Error: err.Error()})
}
// Enabling/disabling rewrites the config on disk and reloads only the
// local loader; tell peers to refresh so the model's availability is
// consistent across replicas. No-op in standalone mode.
if gs != nil {
gs.BroadcastModelsChanged(modelName, "install")
}
msg := fmt.Sprintf("Model '%s' has been %sd successfully.", modelName, action)
if action == modeladmin.ActionDisable {
msg += " The model will not be loaded on demand until re-enabled."

View File

@@ -72,19 +72,19 @@ func RegisterLocalAIRoutes(router *echo.Echo,
router.POST("/backends/upgrades/check", backendGalleryEndpointService.CheckUpgradesEndpoint(), adminMiddleware)
router.POST("/backends/upgrade/:name", backendGalleryEndpointService.UpgradeBackendEndpoint(), adminMiddleware)
// Custom model import endpoint
router.POST("/models/import", localai.ImportModelEndpoint(cl, appConfig), adminMiddleware)
router.POST("/models/import", localai.ImportModelEndpoint(cl, galleryService, appConfig), adminMiddleware)
// URI model import endpoint
router.POST("/models/import-uri", localai.ImportModelURIEndpoint(cl, appConfig, galleryService, opcache), adminMiddleware)
// Custom model edit endpoint
router.POST("/models/edit/:name", localai.EditModelEndpoint(cl, ml, appConfig), adminMiddleware)
router.POST("/models/edit/:name", localai.EditModelEndpoint(cl, ml, galleryService, appConfig), adminMiddleware)
// List model aliases endpoint
router.GET("/api/aliases", localai.ListAliasesEndpoint(cl), adminMiddleware)
// Toggle model enable/disable endpoint
router.PUT("/models/toggle-state/:name/:action", localai.ToggleStateModelEndpoint(cl, ml, appConfig), adminMiddleware)
router.PUT("/models/toggle-state/:name/:action", localai.ToggleStateModelEndpoint(cl, ml, galleryService, appConfig), adminMiddleware)
// Toggle model pinned status endpoint
router.PUT("/models/toggle-pinned/:name/:action", localai.TogglePinnedModelEndpoint(cl, appConfig, func() {

View File

@@ -922,7 +922,7 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
app.GET("/api/models/config-metadata/autocomplete/:provider", localai.AutocompleteEndpoint(cl, ml, appConfig), adminMiddleware)
// PATCH config endpoint - partial update using nested JSON merge
app.PATCH("/api/models/config-json/:name", localai.PatchConfigEndpoint(cl, ml, appConfig), adminMiddleware)
app.PATCH("/api/models/config-json/:name", localai.PatchConfigEndpoint(cl, ml, galleryService, appConfig), adminMiddleware)
// VRAM estimation endpoint
app.POST("/api/models/vram-estimate", localai.VRAMEstimateEndpoint(cl, appConfig), adminMiddleware)

View File

@@ -404,6 +404,36 @@ var _ = Describe("GalleryService cache invalidation broadcasts", func() {
Element: "x", Op: "install",
})).To(Succeed())
})
It("BroadcastModelsChanged delivers the element and op to a peer's OnModelsChanged", func() {
var (
mu sync.Mutex
seen []messaging.CacheInvalidateEvent
)
svcB.OnModelsChanged = func(evt messaging.CacheInvalidateEvent) {
mu.Lock()
seen = append(seen, evt)
mu.Unlock()
}
Expect(svcA.SubscribeBroadcasts()).To(Succeed())
Expect(svcB.SubscribeBroadcasts()).To(Succeed())
// An admin edit on replica A must reach replica B over the same subject
// the gallery path uses, so B refreshes its in-memory config loader.
svcA.BroadcastModelsChanged("my-alias", "install")
mu.Lock()
defer mu.Unlock()
Expect(seen).To(ContainElement(messaging.CacheInvalidateEvent{
Element: "my-alias", Op: "install",
}))
})
It("BroadcastModelsChanged is a no-op when NATS is not wired (standalone)", func() {
standalone := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
// No SetNATSClient: must not panic and must simply do nothing.
Expect(func() { standalone.BroadcastModelsChanged("x", "delete") }).ToNot(Panic())
})
})
var _ = Describe("GalleryService PostgreSQL hydration", func() {

View File

@@ -201,6 +201,24 @@ func (g *GalleryService) publishCacheInvalidate(subject string, evt messaging.Ca
}
}
// BroadcastModelsChanged notifies peer replicas that a model config was
// created, edited, or removed out-of-band of the gallery install/delete
// channel (e.g. the admin /models/edit, /models/import and
// /models/toggle-state endpoints, which write the YAML and reload only the
// local in-memory loader). Peers receive it via OnModelsChanged and refresh
// their own ModelConfigLoader so a request load-balanced to any replica sees
// the same config. No-op in standalone mode (no NATS client).
//
// op is "install" for a create/edit (the element must be (re)loaded from
// disk) or "delete" for a removal (the element must be pruned from memory,
// which a reload-from-path cannot do because the loader is additive).
func (g *GalleryService) BroadcastModelsChanged(element, op string) {
g.publishCacheInvalidate(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{
Element: element,
Op: op,
})
}
// mergeStatus is the broadcast-side merge: it updates the in-memory map from
// a peer's GalleryProgressEvent without re-publishing to NATS or re-writing
// to PostgreSQL. UpdateStatus is the local-write entry point and does both;

View File

@@ -0,0 +1,53 @@
package modeladmin
import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/xlog"
)
// opDelete is the CacheInvalidateEvent.Op value the gallery delete path and the
// admin delete endpoint use; a delete must prune (a reload-from-path cannot).
const opDelete = "delete"
// ApplyRemoteChange refreshes this replica's in-memory model state from a peer
// replica's model-config change broadcast (messaging.CacheInvalidateEvent on
// SubjectCacheInvalidateModels). It is the subscriber-side counterpart to
// GalleryService.BroadcastModelsChanged.
//
// The op matters because LoadModelConfigsFromPath is additive: it loads every
// YAML on disk into the loader but never removes an entry whose file is gone.
// So a delete cannot be propagated by a plain reload - the deleted element must
// be explicitly pruned. Specifically:
//
// - op == "delete" with a named element: prune that element from the loader.
// - otherwise: reload all configs from disk (picks up creates and edits).
//
// In both cases, when an element is named, any running instance on this replica
// is shut down (best-effort) so the next request rebuilds it from the new
// config instead of serving the stale one - mirroring what the originating
// replica does on a local edit/delete.
//
// ml may be nil (no running instances to shut down). modelsPath and opts are
// forwarded to LoadModelConfigsFromPath.
func ApplyRemoteChange(cl *config.ModelConfigLoader, ml *model.ModelLoader, modelsPath string, evt messaging.CacheInvalidateEvent, opts ...config.ConfigLoaderOption) error {
if evt.Op == opDelete && evt.Element != "" {
cl.RemoveModelConfig(evt.Element)
} else if err := cl.LoadModelConfigsFromPath(modelsPath, opts...); err != nil {
return err
}
// Drop any running instance of the affected model so the next request
// rebuilds it from the refreshed config instead of serving the stale one.
// Best-effort: the model may not be loaded on this replica, which surfaces
// as a benign error here.
if ml != nil && evt.Element != "" {
if err := ml.ShutdownModel(evt.Element); err != nil {
xlog.Debug("ApplyRemoteChange: could not shut down model instance (likely not loaded)",
"model", evt.Element, "error", err)
}
}
return nil
}

View File

@@ -0,0 +1,80 @@
package modeladmin
import (
"os"
"path/filepath"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gopkg.in/yaml.v3"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/messaging"
)
var _ = Describe("ApplyRemoteChange", func() {
var (
dir string
loader *config.ModelConfigLoader
)
BeforeEach(func() {
dir = GinkgoT().TempDir()
loader = config.NewModelConfigLoader(dir)
})
writeYAML := func(name string, body map[string]any) {
body["name"] = name
data, err := yaml.Marshal(body)
Expect(err).ToNot(HaveOccurred())
Expect(os.WriteFile(filepath.Join(dir, name+".yaml"), data, 0644)).To(Succeed())
}
It("loads a peer-created config from disk on an install event", func() {
// Peer wrote the YAML to the shared models dir; this replica has not
// loaded it yet (empty in-memory loader).
writeYAML("peer-alias", map[string]any{"alias": "qwen"})
_, ok := loader.GetModelConfig("peer-alias")
Expect(ok).To(BeFalse(), "precondition: not yet in memory")
err := ApplyRemoteChange(loader, nil, dir, messaging.CacheInvalidateEvent{
Element: "peer-alias", Op: "install",
})
Expect(err).ToNot(HaveOccurred())
_, ok = loader.GetModelConfig("peer-alias")
Expect(ok).To(BeTrue(), "install event must reload the new config from disk")
})
It("prunes a peer-deleted config that a reload-from-path cannot drop", func() {
// Model is present in memory (loaded earlier) but its file is now gone
// from the shared dir. LoadModelConfigsFromPath is additive, so only an
// explicit prune can remove it - this is the cross-replica delete bug.
writeYAML("doomed", map[string]any{"alias": "qwen"})
Expect(loader.LoadModelConfigsFromPath(dir)).To(Succeed())
_, ok := loader.GetModelConfig("doomed")
Expect(ok).To(BeTrue(), "precondition: in memory")
Expect(os.Remove(filepath.Join(dir, "doomed.yaml"))).To(Succeed())
err := ApplyRemoteChange(loader, nil, dir, messaging.CacheInvalidateEvent{
Element: "doomed", Op: "delete",
})
Expect(err).ToNot(HaveOccurred())
_, ok = loader.GetModelConfig("doomed")
Expect(ok).To(BeFalse(), "delete event must prune the element from memory")
})
It("does a full reload when no element is named", func() {
writeYAML("m1", map[string]any{"alias": "qwen"})
writeYAML("m2", map[string]any{"alias": "qwen"})
err := ApplyRemoteChange(loader, nil, dir, messaging.CacheInvalidateEvent{})
Expect(err).ToNot(HaveOccurred())
_, ok1 := loader.GetModelConfig("m1")
_, ok2 := loader.GetModelConfig("m2")
Expect(ok1).To(BeTrue())
Expect(ok2).To(BeTrue())
})
})