diff --git a/core/application/startup.go b/core/application/startup.go index 1e5a7a73b..a71f8d0ea 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -16,6 +16,7 @@ import ( "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/jobs" "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/modeladmin" "github.com/mudler/LocalAI/core/services/monitoring" "github.com/mudler/LocalAI/core/services/nodes" "github.com/mudler/LocalAI/core/services/routing/admission" @@ -330,9 +331,14 @@ func New(opts ...config.AppOption) (*Application, error) { gs := application.galleryService sys := options.SystemState cfgLoaderOpts := options.ToConfigLoaderOptions() - gs.OnModelsChanged = func(_ messaging.CacheInvalidateEvent) { - if err := application.ModelConfigLoader().LoadModelConfigsFromPath(sys.Model.ModelsPath, cfgLoaderOpts...); err != nil { - xlog.Warn("Failed to reload model configs after peer invalidation", "error", err) + gs.OnModelsChanged = func(evt messaging.CacheInvalidateEvent) { + // ApplyRemoteChange honors the op: a "delete" prunes the element + // (a reload-from-path is additive and cannot drop it), anything + // else reloads from disk; a named element's running instance is + // shut down so the new config takes effect. The originating + // replica reloads inline and never depends on this path. + if err := modeladmin.ApplyRemoteChange(application.ModelConfigLoader(), application.modelLoader, sys.Model.ModelsPath, evt, cfgLoaderOpts...); err != nil { + xlog.Warn("Failed to apply peer model config change", "error", err) } } if err := application.galleryService.SubscribeBroadcasts(); err != nil { diff --git a/core/http/endpoints/localai/config_meta.go b/core/http/endpoints/localai/config_meta.go index b45720b78..3db694512 100644 --- a/core/http/endpoints/localai/config_meta.go +++ b/core/http/endpoints/localai/config_meta.go @@ -155,7 +155,7 @@ func AutocompleteEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, a // @Param name path string true "Model name" // @Success 200 {object} map[string]any "success message" // @Router /api/models/config-json/{name} [patch] -func PatchConfigEndpoint(cl *config.ModelConfigLoader, _ *model.ModelLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc { +func PatchConfigEndpoint(cl *config.ModelConfigLoader, _ *model.ModelLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc { svc := modeladmin.NewConfigService(cl, appConfig) return func(c echo.Context) error { modelName := c.Param("name") @@ -173,6 +173,14 @@ func PatchConfigEndpoint(cl *config.ModelConfigLoader, _ *model.ModelLoader, app if _, err := svc.PatchConfig(c.Request().Context(), modelName, patchMap); err != nil { return c.JSON(httpStatusForModelAdminError(err), map[string]any{"error": err.Error()}) } + + // Patch rewrites the config on disk and reloads only the local loader; + // tell peers to refresh so the change is consistent across replicas. + // No-op in standalone mode. + if gs != nil { + gs.BroadcastModelsChanged(modelName, "install") + } + return c.JSON(http.StatusOK, map[string]any{ "success": true, "message": fmt.Sprintf("Model '%s' updated successfully", modelName), diff --git a/core/http/endpoints/localai/config_meta_test.go b/core/http/endpoints/localai/config_meta_test.go index f56c14b00..e60f7e08d 100644 --- a/core/http/endpoints/localai/config_meta_test.go +++ b/core/http/endpoints/localai/config_meta_test.go @@ -45,7 +45,7 @@ var _ = Describe("Config Metadata Endpoints", func() { app = echo.New() app.GET("/api/models/config-metadata", ConfigMetadataEndpoint()) app.GET("/api/models/config-metadata/autocomplete/:provider", AutocompleteEndpoint(configLoader, modelLoader, appConfig)) - app.PATCH("/api/models/config-json/:name", PatchConfigEndpoint(configLoader, modelLoader, appConfig)) + app.PATCH("/api/models/config-json/:name", PatchConfigEndpoint(configLoader, modelLoader, nil, appConfig)) }) AfterEach(func() { diff --git a/core/http/endpoints/localai/edit_model.go b/core/http/endpoints/localai/edit_model.go index 4cc0477fb..5dd573751 100644 --- a/core/http/endpoints/localai/edit_model.go +++ b/core/http/endpoints/localai/edit_model.go @@ -10,6 +10,7 @@ import ( "github.com/labstack/echo/v4" "github.com/mudler/LocalAI/core/config" httpUtils "github.com/mudler/LocalAI/core/http/middleware" + "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/modeladmin" "github.com/mudler/LocalAI/internal" "github.com/mudler/LocalAI/pkg/model" @@ -55,7 +56,7 @@ func GetEditModelPage(cl *config.ModelConfigLoader, appConfig *config.Applicatio } // EditModelEndpoint handles updating existing model configurations -func EditModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc { +func EditModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc { svc := modeladmin.NewConfigService(cl, appConfig) return func(c echo.Context) error { modelName := c.Param("name") @@ -70,6 +71,17 @@ func EditModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appC if err != nil { return c.JSON(httpStatusForModelAdminError(err), ModelResponse{Success: false, Error: err.Error()}) } + + // Tell peer replicas to refresh their in-memory config: this endpoint + // only reloaded the local loader. A rename is a delete of the old name + // plus an install of the new one. No-op in standalone mode. + if gs != nil { + if result.Renamed { + gs.BroadcastModelsChanged(result.OldName, "delete") + } + gs.BroadcastModelsChanged(result.NewName, "install") + } + msg := fmt.Sprintf("Model '%s' updated successfully. Model has been reloaded with new configuration.", result.NewName) if result.Renamed { msg = fmt.Sprintf("Model '%s' renamed to '%s' and updated successfully.", result.OldName, result.NewName) diff --git a/core/http/endpoints/localai/edit_model_test.go b/core/http/endpoints/localai/edit_model_test.go index 55328dc39..54ad2d5ec 100644 --- a/core/http/endpoints/localai/edit_model_test.go +++ b/core/http/endpoints/localai/edit_model_test.go @@ -56,7 +56,7 @@ var _ = Describe("Edit Model test", func() { app := echo.New() // Set up a simple renderer for the test app.Renderer = &testRenderer{} - app.POST("/import-model", ImportModelEndpoint(modelConfigLoader, applicationConfig)) + app.POST("/import-model", ImportModelEndpoint(modelConfigLoader, nil, applicationConfig)) app.GET("/edit-model/:name", GetEditModelPage(modelConfigLoader, applicationConfig)) requestBody := bytes.NewBufferString(`{"name": "foo", "backend": "foo", "model": "foo"}`) @@ -106,7 +106,7 @@ var _ = Describe("Edit Model test", func() { Expect(exists).To(BeTrue()) app := echo.New() - app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, applicationConfig)) + app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, nil, applicationConfig)) newYAML := "name: newname\nbackend: llama\nmodel: foo\n" req := httptest.NewRequest("POST", "/models/edit/oldname", bytes.NewBufferString(newYAML)) @@ -163,7 +163,7 @@ var _ = Describe("Edit Model test", func() { Expect(modelConfigLoader.LoadModelConfigsFromPath(tempDir)).To(Succeed()) app := echo.New() - app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, applicationConfig)) + app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, nil, applicationConfig)) req := httptest.NewRequest( "POST", @@ -204,7 +204,7 @@ var _ = Describe("Edit Model test", func() { Expect(modelConfigLoader.LoadModelConfigsFromPath(tempDir)).To(Succeed()) app := echo.New() - app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, applicationConfig)) + app.POST("/models/edit/:name", EditModelEndpoint(modelConfigLoader, modelLoader, nil, applicationConfig)) req := httptest.NewRequest( "POST", diff --git a/core/http/endpoints/localai/import_model.go b/core/http/endpoints/localai/import_model.go index 54a80a9cc..21b7673da 100644 --- a/core/http/endpoints/localai/import_model.go +++ b/core/http/endpoints/localai/import_model.go @@ -125,7 +125,7 @@ func ImportModelURIEndpoint(cl *config.ModelConfigLoader, appConfig *config.Appl } // ImportModelEndpoint handles creating new model configurations -func ImportModelEndpoint(cl *config.ModelConfigLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc { +func ImportModelEndpoint(cl *config.ModelConfigLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc { return func(c echo.Context) error { // Get the raw body body, err := io.ReadAll(c.Request().Body) @@ -245,6 +245,13 @@ func ImportModelEndpoint(cl *config.ModelConfigLoader, appConfig *config.Applica } return c.JSON(http.StatusInternalServerError, response) } + // Tell peer replicas to load the newly-created config from the shared + // models dir: this endpoint only reloaded the local loader. No-op in + // standalone mode. + if gs != nil { + gs.BroadcastModelsChanged(modelConfig.Name, "install") + } + // Return success response response := ModelResponse{ Success: true, diff --git a/core/http/endpoints/localai/toggle_model.go b/core/http/endpoints/localai/toggle_model.go index e4e71ca64..545fdc8af 100644 --- a/core/http/endpoints/localai/toggle_model.go +++ b/core/http/endpoints/localai/toggle_model.go @@ -7,6 +7,7 @@ import ( "github.com/labstack/echo/v4" "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/modeladmin" "github.com/mudler/LocalAI/pkg/model" ) @@ -24,7 +25,7 @@ import ( // @Failure 404 {object} ModelResponse // @Failure 500 {object} ModelResponse // @Router /api/models/{name}/{action} [put] -func ToggleStateModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc { +func ToggleStateModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, gs *galleryop.GalleryService, appConfig *config.ApplicationConfig) echo.HandlerFunc { svc := modeladmin.NewConfigService(cl, appConfig) return func(c echo.Context) error { modelName := c.Param("name") @@ -36,6 +37,14 @@ func ToggleStateModelEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoade if err != nil { return c.JSON(httpStatusForModelAdminError(err), ModelResponse{Success: false, Error: err.Error()}) } + + // Enabling/disabling rewrites the config on disk and reloads only the + // local loader; tell peers to refresh so the model's availability is + // consistent across replicas. No-op in standalone mode. + if gs != nil { + gs.BroadcastModelsChanged(modelName, "install") + } + msg := fmt.Sprintf("Model '%s' has been %sd successfully.", modelName, action) if action == modeladmin.ActionDisable { msg += " The model will not be loaded on demand until re-enabled." diff --git a/core/http/routes/localai.go b/core/http/routes/localai.go index 212f379f0..763623a7f 100644 --- a/core/http/routes/localai.go +++ b/core/http/routes/localai.go @@ -72,19 +72,19 @@ func RegisterLocalAIRoutes(router *echo.Echo, router.POST("/backends/upgrades/check", backendGalleryEndpointService.CheckUpgradesEndpoint(), adminMiddleware) router.POST("/backends/upgrade/:name", backendGalleryEndpointService.UpgradeBackendEndpoint(), adminMiddleware) // Custom model import endpoint - router.POST("/models/import", localai.ImportModelEndpoint(cl, appConfig), adminMiddleware) + router.POST("/models/import", localai.ImportModelEndpoint(cl, galleryService, appConfig), adminMiddleware) // URI model import endpoint router.POST("/models/import-uri", localai.ImportModelURIEndpoint(cl, appConfig, galleryService, opcache), adminMiddleware) // Custom model edit endpoint - router.POST("/models/edit/:name", localai.EditModelEndpoint(cl, ml, appConfig), adminMiddleware) + router.POST("/models/edit/:name", localai.EditModelEndpoint(cl, ml, galleryService, appConfig), adminMiddleware) // List model aliases endpoint router.GET("/api/aliases", localai.ListAliasesEndpoint(cl), adminMiddleware) // Toggle model enable/disable endpoint - router.PUT("/models/toggle-state/:name/:action", localai.ToggleStateModelEndpoint(cl, ml, appConfig), adminMiddleware) + router.PUT("/models/toggle-state/:name/:action", localai.ToggleStateModelEndpoint(cl, ml, galleryService, appConfig), adminMiddleware) // Toggle model pinned status endpoint router.PUT("/models/toggle-pinned/:name/:action", localai.TogglePinnedModelEndpoint(cl, appConfig, func() { diff --git a/core/http/routes/ui_api.go b/core/http/routes/ui_api.go index e26894273..d9c99c6b9 100644 --- a/core/http/routes/ui_api.go +++ b/core/http/routes/ui_api.go @@ -922,7 +922,7 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model app.GET("/api/models/config-metadata/autocomplete/:provider", localai.AutocompleteEndpoint(cl, ml, appConfig), adminMiddleware) // PATCH config endpoint - partial update using nested JSON merge - app.PATCH("/api/models/config-json/:name", localai.PatchConfigEndpoint(cl, ml, appConfig), adminMiddleware) + app.PATCH("/api/models/config-json/:name", localai.PatchConfigEndpoint(cl, ml, galleryService, appConfig), adminMiddleware) // VRAM estimation endpoint app.POST("/api/models/vram-estimate", localai.VRAMEstimateEndpoint(cl, appConfig), adminMiddleware) diff --git a/core/services/galleryop/distributed_sync_test.go b/core/services/galleryop/distributed_sync_test.go index 7c1087de8..71a96c7ae 100644 --- a/core/services/galleryop/distributed_sync_test.go +++ b/core/services/galleryop/distributed_sync_test.go @@ -404,6 +404,36 @@ var _ = Describe("GalleryService cache invalidation broadcasts", func() { Element: "x", Op: "install", })).To(Succeed()) }) + + It("BroadcastModelsChanged delivers the element and op to a peer's OnModelsChanged", func() { + var ( + mu sync.Mutex + seen []messaging.CacheInvalidateEvent + ) + svcB.OnModelsChanged = func(evt messaging.CacheInvalidateEvent) { + mu.Lock() + seen = append(seen, evt) + mu.Unlock() + } + Expect(svcA.SubscribeBroadcasts()).To(Succeed()) + Expect(svcB.SubscribeBroadcasts()).To(Succeed()) + + // An admin edit on replica A must reach replica B over the same subject + // the gallery path uses, so B refreshes its in-memory config loader. + svcA.BroadcastModelsChanged("my-alias", "install") + + mu.Lock() + defer mu.Unlock() + Expect(seen).To(ContainElement(messaging.CacheInvalidateEvent{ + Element: "my-alias", Op: "install", + })) + }) + + It("BroadcastModelsChanged is a no-op when NATS is not wired (standalone)", func() { + standalone := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + // No SetNATSClient: must not panic and must simply do nothing. + Expect(func() { standalone.BroadcastModelsChanged("x", "delete") }).ToNot(Panic()) + }) }) var _ = Describe("GalleryService PostgreSQL hydration", func() { diff --git a/core/services/galleryop/service.go b/core/services/galleryop/service.go index d01d9cc19..abe399088 100644 --- a/core/services/galleryop/service.go +++ b/core/services/galleryop/service.go @@ -201,6 +201,24 @@ func (g *GalleryService) publishCacheInvalidate(subject string, evt messaging.Ca } } +// BroadcastModelsChanged notifies peer replicas that a model config was +// created, edited, or removed out-of-band of the gallery install/delete +// channel (e.g. the admin /models/edit, /models/import and +// /models/toggle-state endpoints, which write the YAML and reload only the +// local in-memory loader). Peers receive it via OnModelsChanged and refresh +// their own ModelConfigLoader so a request load-balanced to any replica sees +// the same config. No-op in standalone mode (no NATS client). +// +// op is "install" for a create/edit (the element must be (re)loaded from +// disk) or "delete" for a removal (the element must be pruned from memory, +// which a reload-from-path cannot do because the loader is additive). +func (g *GalleryService) BroadcastModelsChanged(element, op string) { + g.publishCacheInvalidate(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{ + Element: element, + Op: op, + }) +} + // mergeStatus is the broadcast-side merge: it updates the in-memory map from // a peer's GalleryProgressEvent without re-publishing to NATS or re-writing // to PostgreSQL. UpdateStatus is the local-write entry point and does both; diff --git a/core/services/modeladmin/remote_sync.go b/core/services/modeladmin/remote_sync.go new file mode 100644 index 000000000..5acf5bf9a --- /dev/null +++ b/core/services/modeladmin/remote_sync.go @@ -0,0 +1,53 @@ +package modeladmin + +import ( + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/pkg/model" + + "github.com/mudler/xlog" +) + +// opDelete is the CacheInvalidateEvent.Op value the gallery delete path and the +// admin delete endpoint use; a delete must prune (a reload-from-path cannot). +const opDelete = "delete" + +// ApplyRemoteChange refreshes this replica's in-memory model state from a peer +// replica's model-config change broadcast (messaging.CacheInvalidateEvent on +// SubjectCacheInvalidateModels). It is the subscriber-side counterpart to +// GalleryService.BroadcastModelsChanged. +// +// The op matters because LoadModelConfigsFromPath is additive: it loads every +// YAML on disk into the loader but never removes an entry whose file is gone. +// So a delete cannot be propagated by a plain reload - the deleted element must +// be explicitly pruned. Specifically: +// +// - op == "delete" with a named element: prune that element from the loader. +// - otherwise: reload all configs from disk (picks up creates and edits). +// +// In both cases, when an element is named, any running instance on this replica +// is shut down (best-effort) so the next request rebuilds it from the new +// config instead of serving the stale one - mirroring what the originating +// replica does on a local edit/delete. +// +// ml may be nil (no running instances to shut down). modelsPath and opts are +// forwarded to LoadModelConfigsFromPath. +func ApplyRemoteChange(cl *config.ModelConfigLoader, ml *model.ModelLoader, modelsPath string, evt messaging.CacheInvalidateEvent, opts ...config.ConfigLoaderOption) error { + if evt.Op == opDelete && evt.Element != "" { + cl.RemoveModelConfig(evt.Element) + } else if err := cl.LoadModelConfigsFromPath(modelsPath, opts...); err != nil { + return err + } + + // Drop any running instance of the affected model so the next request + // rebuilds it from the refreshed config instead of serving the stale one. + // Best-effort: the model may not be loaded on this replica, which surfaces + // as a benign error here. + if ml != nil && evt.Element != "" { + if err := ml.ShutdownModel(evt.Element); err != nil { + xlog.Debug("ApplyRemoteChange: could not shut down model instance (likely not loaded)", + "model", evt.Element, "error", err) + } + } + return nil +} diff --git a/core/services/modeladmin/remote_sync_test.go b/core/services/modeladmin/remote_sync_test.go new file mode 100644 index 000000000..df4907a02 --- /dev/null +++ b/core/services/modeladmin/remote_sync_test.go @@ -0,0 +1,80 @@ +package modeladmin + +import ( + "os" + "path/filepath" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "gopkg.in/yaml.v3" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services/messaging" +) + +var _ = Describe("ApplyRemoteChange", func() { + var ( + dir string + loader *config.ModelConfigLoader + ) + + BeforeEach(func() { + dir = GinkgoT().TempDir() + loader = config.NewModelConfigLoader(dir) + }) + + writeYAML := func(name string, body map[string]any) { + body["name"] = name + data, err := yaml.Marshal(body) + Expect(err).ToNot(HaveOccurred()) + Expect(os.WriteFile(filepath.Join(dir, name+".yaml"), data, 0644)).To(Succeed()) + } + + It("loads a peer-created config from disk on an install event", func() { + // Peer wrote the YAML to the shared models dir; this replica has not + // loaded it yet (empty in-memory loader). + writeYAML("peer-alias", map[string]any{"alias": "qwen"}) + _, ok := loader.GetModelConfig("peer-alias") + Expect(ok).To(BeFalse(), "precondition: not yet in memory") + + err := ApplyRemoteChange(loader, nil, dir, messaging.CacheInvalidateEvent{ + Element: "peer-alias", Op: "install", + }) + Expect(err).ToNot(HaveOccurred()) + + _, ok = loader.GetModelConfig("peer-alias") + Expect(ok).To(BeTrue(), "install event must reload the new config from disk") + }) + + It("prunes a peer-deleted config that a reload-from-path cannot drop", func() { + // Model is present in memory (loaded earlier) but its file is now gone + // from the shared dir. LoadModelConfigsFromPath is additive, so only an + // explicit prune can remove it - this is the cross-replica delete bug. + writeYAML("doomed", map[string]any{"alias": "qwen"}) + Expect(loader.LoadModelConfigsFromPath(dir)).To(Succeed()) + _, ok := loader.GetModelConfig("doomed") + Expect(ok).To(BeTrue(), "precondition: in memory") + Expect(os.Remove(filepath.Join(dir, "doomed.yaml"))).To(Succeed()) + + err := ApplyRemoteChange(loader, nil, dir, messaging.CacheInvalidateEvent{ + Element: "doomed", Op: "delete", + }) + Expect(err).ToNot(HaveOccurred()) + + _, ok = loader.GetModelConfig("doomed") + Expect(ok).To(BeFalse(), "delete event must prune the element from memory") + }) + + It("does a full reload when no element is named", func() { + writeYAML("m1", map[string]any{"alias": "qwen"}) + writeYAML("m2", map[string]any{"alias": "qwen"}) + + err := ApplyRemoteChange(loader, nil, dir, messaging.CacheInvalidateEvent{}) + Expect(err).ToNot(HaveOccurred()) + + _, ok1 := loader.GetModelConfig("m1") + _, ok2 := loader.GetModelConfig("m2") + Expect(ok1).To(BeTrue()) + Expect(ok2).To(BeTrue()) + }) +})