diff --git a/core/application/application.go b/core/application/application.go
index 7a34279c9..29e05b6d1 100644
--- a/core/application/application.go
+++ b/core/application/application.go
@@ -90,6 +90,8 @@ type Application struct {
 	// LocalAI Assistant in-process MCP server. nil when DisableLocalAIAssistant
 	// is set; otherwise initialised in start() after galleryService.
 	localAIAssistant *mcpTools.LocalAIAssistantHolder
+
+	shutdownOnce sync.Once
 }
 
 func newApplication(appConfig *config.ApplicationConfig) *Application {
@@ -320,6 +322,33 @@ func (a *Application) IsDistributed() bool {
 	return a.distributed != nil
 }
 
+// Shutdown stops distributed services and backend gRPC processes
+// synchronously on the caller's stack. The context-cancel goroutine wired
+// in New also calls Shutdown, but asynchronously, which races test-binary
+// exit and CLI shutdown — orphaning spawned mock-backend / llama.cpp / etc.
+// children to init. Callers that need a guarantee that cleanup has
+// finished before they proceed (AfterSuite/AfterEach, signal handlers)
+// must call this.
+//
+// Safe to call multiple times and from multiple goroutines: sync.Once runs
+// the cleanup exactly once, and a concurrent caller blocks until that run
+// has finished. Only the call that performs the cleanup returns the
+// StopAllGRPC error; every other call returns nil.
+func (a *Application) Shutdown() error {
+	var err error
+	a.shutdownOnce.Do(func() {
+		// Guard like modelLoader below: distributed may be nil
+		// (IsDistributed reports exactly that case).
+		if a.distributed != nil {
+			a.distributed.Shutdown()
+		}
+		if a.modelLoader != nil {
+			err = a.modelLoader.StopAllGRPC()
+		}
+	})
+	return err
+}
+
 // waitForHealthyWorker blocks until at least one healthy backend worker is registered.
 // This prevents the agent pool from failing during startup when workers haven't connected yet.
func (a *Application) waitForHealthyWorker() { diff --git a/core/application/startup.go b/core/application/startup.go index 9fb6519aa..be559479f 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -449,13 +449,15 @@ func New(opts ...config.AppOption) (*Application, error) { application.ModelLoader().SetBackendLoggingEnabled(options.EnableBackendLogging) - // turn off any process that was started by GRPC if the context is canceled + // Safety-net cleanup if the application context is cancelled without + // the caller invoking Shutdown directly. This is fire-and-forget — it + // races binary exit and is unreliable in tests; the deterministic path + // is application.Shutdown(), which Shutdown's sync.Once dedupes with + // this goroutine. go func() { <-options.Context.Done() xlog.Debug("Context canceled, shutting down") - application.distributed.Shutdown() - err := application.ModelLoader().StopAllGRPC() - if err != nil { + if err := application.Shutdown(); err != nil { xlog.Error("error while stopping all grpc backends", "error", err) } }() diff --git a/core/cli/run.go b/core/cli/run.go index 725fc0939..09a58971b 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -577,12 +577,8 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { } signals.RegisterGracefulTerminationHandler(func() { - if err := app.ModelLoader().StopAllGRPC(); err != nil { - xlog.Error("error while stopping all grpc backends", "error", err) - } - // Clean up distributed services (idempotent — safe if already called) - if d := app.Distributed(); d != nil { - d.Shutdown() + if err := app.Shutdown(); err != nil { + xlog.Error("error while shutting down application", "error", err) } }) diff --git a/core/http/app_test.go b/core/http/app_test.go index bd7fa501e..735edaf1c 100644 --- a/core/http/app_test.go +++ b/core/http/app_test.go @@ -308,6 +308,11 @@ var _ = Describe("API test", func() { var cancel context.CancelFunc var tmpdir string var modelDir string + // localAIApp 
captures the Application so AfterEach can synchronously + // stop the spawned gRPC backend processes. application.New cancels + // them asynchronously on context cancel, which races with test-binary + // exit and leaks mock-backend children to init. + var localAIApp *application.Application commonOpts := []config.AppOption{ config.WithDebug(true), @@ -736,14 +741,14 @@ parameters: ) Expect(err).ToNot(HaveOccurred()) - application, err := application.New( + localAIApp, err = application.New( append(commonOpts, config.WithContext(c), config.WithSystemState(systemState), )...) Expect(err).ToNot(HaveOccurred()) - application.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath) - app, err = API(application) + localAIApp.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath) + app, err = API(localAIApp) Expect(err).ToNot(HaveOccurred()) go func() { if err := app.Start("127.0.0.1:9090"); err != nil && err != http.ErrServerClosed { @@ -765,6 +770,11 @@ parameters: }, "2m").ShouldNot(HaveOccurred()) }) AfterEach(func() { + // Synchronous shutdown — context-cancel cleanup is async and races + // test-binary exit, orphaning mock-backend children to init. 
+ if localAIApp != nil { + _ = localAIApp.Shutdown() + } cancel() if app != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -976,15 +986,15 @@ parameters: ) Expect(err).ToNot(HaveOccurred()) - application, err := application.New( + localAIApp, err = application.New( append(commonOpts, config.WithContext(c), config.WithSystemState(systemState), config.WithConfigFile(configFile))..., ) Expect(err).ToNot(HaveOccurred()) - application.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath) - app, err = API(application) + localAIApp.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath) + app, err = API(localAIApp) Expect(err).ToNot(HaveOccurred()) go func() { @@ -1005,6 +1015,11 @@ parameters: }, "2m").ShouldNot(HaveOccurred()) }) AfterEach(func() { + // Synchronous shutdown — context-cancel cleanup is async and races + // test-binary exit, orphaning mock-backend children to init. + if localAIApp != nil { + _ = localAIApp.Shutdown() + } cancel() if app != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/tests/e2e/e2e_suite_test.go b/tests/e2e/e2e_suite_test.go index 65bb9b852..49e21f417 100644 --- a/tests/e2e/e2e_suite_test.go +++ b/tests/e2e/e2e_suite_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/labstack/echo/v4" - "github.com/mudler/LocalAI/core/application" + localaiapp "github.com/mudler/LocalAI/core/application" "github.com/mudler/LocalAI/core/config" httpapi "github.com/mudler/LocalAI/core/http" "github.com/mudler/LocalAI/pkg/system" @@ -41,6 +41,7 @@ var ( cloudProxyPath string mcpServerURL string mcpServerShutdown func() + localAIApp *localaiapp.Application // Cloud-proxy fake upstreams. Live for the whole suite so the four // cloud-proxy model YAMLs can point at their URLs at startup time. 
@@ -390,7 +391,7 @@ var _ = BeforeSuite(func() { // Create application instance (GeneratedContentDir so sound-generation/TTS can write files the handler sends) generatedDir := filepath.Join(tmpDir, "generated") Expect(os.MkdirAll(generatedDir, 0750)).To(Succeed()) - application, err := application.New( + localAIApp, err = localaiapp.New( config.WithContext(appCtx), config.WithSystemState(systemState), config.WithDebug(true), @@ -399,14 +400,14 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) // Register mock backend (always available for non-realtime tests). - application.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath) - application.ModelLoader().SetExternalBackend("opus", mockBackendPath) + localAIApp.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath) + localAIApp.ModelLoader().SetExternalBackend("opus", mockBackendPath) if cloudProxyPath != "" { - application.ModelLoader().SetExternalBackend("cloud-proxy", cloudProxyPath) + localAIApp.ModelLoader().SetExternalBackend("cloud-proxy", cloudProxyPath) } // Create HTTP app - app, err = httpapi.API(application) + app, err = httpapi.API(localAIApp) Expect(err).ToNot(HaveOccurred()) // Get free port @@ -436,6 +437,14 @@ var _ = BeforeSuite(func() { }) var _ = AfterSuite(func() { + // Synchronous shutdown — the context-cancel goroutine in application.New + // runs the same cleanup asynchronously, which races test-binary exit and + // orphans spawned mock-backend children to init. + if localAIApp != nil { + if err := localAIApp.Shutdown(); err != nil { + xlog.Error("error shutting down application", "error", err) + } + } if appCancel != nil { appCancel() }