diff --git a/core/services/worker/config.go b/core/services/worker/config.go
index 500f1355f..97d2a6582 100644
--- a/core/services/worker/config.go
+++ b/core/services/worker/config.go
@@ -25,9 +25,20 @@ type Config struct {
 	BackendsPath string `env:"LOCALAI_BACKENDS_PATH,BACKENDS_PATH" type:"path" default:"${basepath}/backends" help:"Path containing backends" group:"server"`
 	BackendsSystemPath string `env:"LOCALAI_BACKENDS_SYSTEM_PATH" type:"path" default:"/var/lib/local-ai/backends" help:"Path containing system backends" group:"server"`
 	BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"server" default:"${backends}"`
+	Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of model galleries (used to resolve --prefetch-models on boot)" group:"server" default:"${galleries}"`
 	ModelsPath string `env:"LOCALAI_MODELS_PATH,MODELS_PATH" type:"path" default:"${basepath}/models" help:"Path containing models" group:"server"`
 	RequireBackendIntegrity bool `env:"LOCALAI_REQUIRE_BACKEND_INTEGRITY,REQUIRE_BACKEND_INTEGRITY" help:"If true, reject backend installs without a configured signature verification policy (OCI URIs) or SHA256 (tarball/HTTP URIs)." group:"hardening" default:"false"`
 
+	// PrefetchModels lets a worker download gallery model artifacts (GGUFs, etc.)
+	// from its own outbound internet at boot, instead of waiting for the master to
+	// stream them over the cluster network at first-inference time. Useful when the
+	// cluster-internal path is slow (slirp/circuit-relay, CGNAT) but outbound NAT
+	// works fine. Resolution reuses the same gallery installer the master uses, so
+	// the on-disk /models layout is identical. Errors are non-fatal — if the gallery
+	// is unreachable on boot, the worker logs a warning and starts the NATS loop
+	// anyway; the master can still push the file on demand (existing behaviour).
+	PrefetchModels []string `env:"LOCALAI_PREFETCH_MODELS,PREFETCH_MODELS" help:"Comma-separated gallery model IDs to download from LOCALAI_GALLERIES at worker boot (e.g. 'llama-3.2-1b-instruct,phi-3-mini-4k'). Skipped if already on disk and SHA matches." group:"server"`
+
 	// HTTP file transfer
 	HTTPAddr string `env:"LOCALAI_HTTP_ADDR" default:"" help:"HTTP file transfer server address (default: gRPC port + 1)" group:"server" hidden:""`
 	AdvertiseHTTPAddr string `env:"LOCALAI_ADVERTISE_HTTP_ADDR" help:"HTTP address the frontend uses to reach this node for file transfer" group:"server" hidden:""`
diff --git a/core/services/worker/prefetch.go b/core/services/worker/prefetch.go
new file mode 100644
index 000000000..4aec36a99
--- /dev/null
+++ b/core/services/worker/prefetch.go
@@ -0,0 +1,156 @@
+package worker
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	"github.com/mudler/LocalAI/core/config"
+	"github.com/mudler/LocalAI/core/gallery"
+	"github.com/mudler/LocalAI/pkg/model"
+	"github.com/mudler/LocalAI/pkg/system"
+	"github.com/mudler/xlog"
+)
+
+// modelInstaller is the subset of gallery.InstallModelFromGallery the prefetch
+// loop needs. Carved out as a function value so tests can substitute a fake
+// installer without touching the real gallery — and so we don't duplicate the
+// full install pipeline (URL resolution, SHA verification, idempotent skip,
+// config-file write) which already lives in core/gallery/models.go.
+type modelInstaller func(
+	ctx context.Context,
+	modelGalleries, backendGalleries []config.Gallery,
+	systemState *system.SystemState,
+	modelLoader *model.ModelLoader,
+	name string,
+) error
+
+// realModelInstaller is the production binding to gallery.InstallModelFromGallery.
+// prefetchModels falls back to it when the caller passes a nil installer;
+// tests inject a fake through that parameter instead of reassigning this var.
+var realModelInstaller modelInstaller = func(
+	ctx context.Context,
+	modelGalleries, backendGalleries []config.Gallery,
+	systemState *system.SystemState,
+	modelLoader *model.ModelLoader,
+	name string,
+) error {
+	// enforceScan=false: workers fetch from the same gallery the master already
+	// trusts, and the master would have scanned at install time anyway.
+	// autoloadBackendGalleries=false: the worker installs backends on demand via
+	// backend.install NATS events; prefetching the backend here would race the
+	// supervisor's own install path and double-trigger gallery work.
+	// requireBackendIntegrity=false: same reason — we're not installing a backend.
+	return gallery.InstallModelFromGallery(
+		ctx,
+		modelGalleries,
+		backendGalleries,
+		systemState,
+		modelLoader,
+		name,
+		gallery.GalleryModel{},
+		nil, /* downloadStatus: silent on the worker; master is the UX surface */
+		false /* enforceScan */, false /* autoloadBackendGalleries */, false, /* requireBackendIntegrity */
+	)
+}
+
+// prefetchModels resolves each configured gallery ID against the model gallery
+// and downloads the artifact into the worker's /models. It is called once at
+// worker startup, BEFORE the NATS lifecycle subscription, so that the steady
+// state has the file already on disk and the master never needs to stream it.
+//
+// Errors are intentionally non-fatal: on a fresh worker with no outbound
+// connectivity (or a misconfigured gallery JSON), we want the worker to still
+// register and serve traffic — the master will fall back to pushing files
+// on-demand over NATS/HTTP, which is the pre-existing behavior. Per-model
+// failures are logged at warn level and the loop continues with the next ID.
+//
+// Cancellation: ctx is checked before each install. Once it is done the loop
+// stops and the remaining models are skipped, rather than handing a dead
+// context to the installer once per model.
+//
+// Idempotency comes for free from pkg/downloader.URI.DownloadFileWithContext:
+// it stats the target path, hashes it if a SHA is configured, and short-circuits
+// on a match. So restarts against a populated PVC are effectively no-ops.
+func prefetchModels(
+	ctx context.Context,
+	cfg *Config,
+	systemState *system.SystemState,
+	ml *model.ModelLoader,
+	backendGalleries []config.Gallery,
+	installer modelInstaller,
+) {
+	models := normalizePrefetchList(cfg.PrefetchModels)
+	if len(models) == 0 {
+		return
+	}
+
+	modelGalleries, err := parseModelGalleries(cfg.Galleries)
+	if err != nil {
+		// Without a model-gallery config we cannot resolve gallery IDs. Warn
+		// and let the worker proceed — the master can still push files later.
+		xlog.Warn("Skipping model prefetch: invalid LOCALAI_GALLERIES", "error", err)
+		return
+	}
+	if len(modelGalleries) == 0 {
+		xlog.Warn("Skipping model prefetch: no model galleries configured (set LOCALAI_GALLERIES)", "models", models)
+		return
+	}
+
+	if installer == nil {
+		installer = realModelInstaller
+	}
+
+	xlog.Info("Prefetching models from gallery before entering NATS loop", "count", len(models), "models", models)
+	for _, name := range models {
+		// Honor cancellation between installs. Without this check a done
+		// context is handed to every remaining install, and each resulting
+		// failure is misreported as "master will push on demand".
+		if err := ctx.Err(); err != nil {
+			xlog.Warn("Model prefetch interrupted; skipping remaining models", "error", err)
+			return
+		}
+		xlog.Info("Prefetching model", "model", name)
+		if err := installer(ctx, modelGalleries, backendGalleries, systemState, ml, name); err != nil {
+			// Non-fatal: master can still push the file on demand. We log
+			// loudly so an operator can spot a misconfigured gallery ID or
+			// a missing outbound route without the worker crash-looping.
+			xlog.Warn("Model prefetch failed; master will push on demand", "model", name, "error", err)
+			continue
+		}
+		xlog.Info("Prefetched model", "model", name)
+	}
+}
+
+// normalizePrefetchList trims whitespace and drops empty entries. kong already
+// splits comma-separated env values into []string, but callers using the CLI
+// flag repeatedly (or pasting whitespace) can produce stragglers we don't want
+// to ship into the gallery resolver as "" or " ".
+func normalizePrefetchList(in []string) []string {
+	out := make([]string, 0, len(in))
+	for _, s := range in {
+		s = strings.TrimSpace(s)
+		if s == "" {
+			continue
+		}
+		out = append(out, s)
+	}
+	return out
+}
+
+// parseModelGalleries parses the JSON-encoded LOCALAI_GALLERIES value the same
+// way the master does. Returns an empty slice (not nil) and nil error when the
+// input is empty, so callers can treat "" as "not configured" without a
+// secondary check.
+func parseModelGalleries(raw string) ([]config.Gallery, error) {
+	raw = strings.TrimSpace(raw)
+	if raw == "" {
+		return []config.Gallery{}, nil
+	}
+	var galleries []config.Gallery
+	if err := json.Unmarshal([]byte(raw), &galleries); err != nil {
+		return nil, fmt.Errorf("parsing model galleries JSON: %w", err)
+	}
+	return galleries, nil
+}
diff --git a/core/services/worker/prefetch_test.go b/core/services/worker/prefetch_test.go
new file mode 100644
index 000000000..025984080
--- /dev/null
+++ b/core/services/worker/prefetch_test.go
@@ -0,0 +1,232 @@
+package worker
+
+import (
+	"context"
+	"errors"
+	"os"
+	"path/filepath"
+	"sync"
+
+	"github.com/mudler/LocalAI/core/config"
+	"github.com/mudler/LocalAI/pkg/model"
+	"github.com/mudler/LocalAI/pkg/system"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+// fakeInstaller records every call and lets each test choose the behaviour
+// for a given model name. Modelled on the real modelInstaller signature so
+// the prefetch loop can't accidentally widen its dependency on the gallery
+// package without breaking the contract here too.
+type fakeInstaller struct {
+	mu       sync.Mutex
+	calls    []string
+	behavior map[string]error // model name -> error to return; missing key = nil (success)
+}
+
+func newFakeInstaller() *fakeInstaller {
+	return &fakeInstaller{behavior: map[string]error{}}
+}
+
+func (f *fakeInstaller) install(
+	_ context.Context,
+	_ []config.Gallery,
+	_ []config.Gallery,
+	_ *system.SystemState,
+	_ *model.ModelLoader,
+	name string,
+) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.calls = append(f.calls, name)
+	return f.behavior[name]
+}
+
+func (f *fakeInstaller) recorded() []string {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	out := make([]string, len(f.calls))
+	copy(out, f.calls)
+	return out
+}
+
+// minimalSystemState yields a SystemState pointed at a fresh tempdir. The
+// prefetch loop never touches it directly (the fake installer ignores it),
+// but the production signature requires a non-nil value and we want the
+// tests to mirror the real wiring as closely as possible.
+func minimalSystemState(modelsDir string) *system.SystemState {
+	ss, err := system.GetSystemState(system.WithModelPath(modelsDir))
+	Expect(err).ToNot(HaveOccurred())
+	return ss
+}
+
+var _ = Describe("prefetchModels", func() {
+	var (
+		tmp       string
+		ss        *system.SystemState
+		galleries = `[{"name":"localai","url":"file:///dev/null"}]`
+	)
+
+	BeforeEach(func() {
+		var err error
+		tmp, err = os.MkdirTemp("", "worker-prefetch-test")
+		Expect(err).ToNot(HaveOccurred())
+		ss = minimalSystemState(tmp)
+	})
+
+	AfterEach(func() {
+		_ = os.RemoveAll(tmp)
+	})
+
+	It("installs every configured model before returning (happy path)", func() {
+		f := newFakeInstaller()
+		cfg := &Config{
+			PrefetchModels: []string{"llama-3.2-1b", "phi-3-mini"},
+			Galleries:      galleries,
+			ModelsPath:     tmp,
+		}
+
+		prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
+
+		Expect(f.recorded()).To(Equal([]string{"llama-3.2-1b", "phi-3-mini"}))
+	})
+
+	It("is a no-op when the file is already present on disk (idempotency contract)", func() {
+		// Drop a fake artifact where the gallery would have placed it; the
+		// real installer's downloader stats the path and short-circuits when
+		// the SHA matches (or any time SHA is unset). We simulate that here
+		// by having the fake installer record whether the file exists before
+		// returning success; the real downloader is NOT exercised by this test.
+		modelFile := filepath.Join(tmp, "llama-3.2-1b.gguf")
+		Expect(os.WriteFile(modelFile, []byte("already-here"), 0o600)).To(Succeed())
+
+		var installerCalls int
+		var sawFile bool
+		fake := func(
+			_ context.Context,
+			_, _ []config.Gallery,
+			_ *system.SystemState,
+			_ *model.ModelLoader,
+			_ string,
+		) error {
+			installerCalls++
+			if _, err := os.Stat(modelFile); err == nil {
+				sawFile = true
+			}
+			// Real installer returns nil on cache hit. Mirror that.
+			return nil
+		}
+
+		cfg := &Config{
+			PrefetchModels: []string{"llama-3.2-1b"},
+			Galleries:      galleries,
+			ModelsPath:     tmp,
+		}
+		prefetchModels(context.Background(), cfg, ss, nil, nil, fake)
+
+		Expect(installerCalls).To(Equal(1), "installer is always invoked; its downloader handles the skip")
+		Expect(sawFile).To(BeTrue(), "fake installer saw the pre-existing artifact, matching the restart-on-warm-PVC path")
+
+		// File untouched.
+		data, err := os.ReadFile(modelFile)
+		Expect(err).ToNot(HaveOccurred())
+		Expect(string(data)).To(Equal("already-here"))
+	})
+
+	It("logs and continues when an individual install fails (non-fatal contract)", func() {
+		f := newFakeInstaller()
+		f.behavior["broken"] = errors.New("gallery unreachable")
+
+		cfg := &Config{
+			PrefetchModels: []string{"good-1", "broken", "good-2"},
+			Galleries:      galleries,
+			ModelsPath:     tmp,
+		}
+
+		// Function must return normally despite the middle install failing.
+		Expect(func() {
+			prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
+		}).ToNot(Panic())
+
+		// Subsequent models still get attempted — failures don't short-circuit.
+		Expect(f.recorded()).To(Equal([]string{"good-1", "broken", "good-2"}))
+	})
+
+	It("does nothing when PrefetchModels is empty", func() {
+		f := newFakeInstaller()
+		cfg := &Config{ModelsPath: tmp, Galleries: galleries}
+		prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
+		Expect(f.recorded()).To(BeEmpty())
+	})
+
+	It("trims whitespace and skips empty entries", func() {
+		f := newFakeInstaller()
+		cfg := &Config{
+			PrefetchModels: []string{" llama ", "", " ", "phi"},
+			Galleries:      galleries,
+			ModelsPath:     tmp,
+		}
+		prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
+		Expect(f.recorded()).To(Equal([]string{"llama", "phi"}))
+	})
+
+	It("skips prefetch when LOCALAI_GALLERIES is malformed (non-fatal)", func() {
+		f := newFakeInstaller()
+		cfg := &Config{
+			PrefetchModels: []string{"llama"},
+			Galleries:      `not-json`,
+			ModelsPath:     tmp,
+		}
+		prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
+		Expect(f.recorded()).To(BeEmpty(), "no installer call when galleries can't be parsed")
+	})
+
+	It("skips prefetch when no galleries are configured (non-fatal)", func() {
+		f := newFakeInstaller()
+		cfg := &Config{
+			PrefetchModels: []string{"llama"},
+			Galleries:      `[]`,
+			ModelsPath:     tmp,
+		}
+		prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
+		Expect(f.recorded()).To(BeEmpty())
+	})
+})
+
+var _ = Describe("parseModelGalleries", func() {
+	It("returns empty slice on empty input", func() {
+		g, err := parseModelGalleries("")
+		Expect(err).ToNot(HaveOccurred())
+		Expect(g).To(BeEmpty())
+	})
+
+	It("returns empty slice on whitespace-only input", func() {
+		g, err := parseModelGalleries(" \n\t ")
+		Expect(err).ToNot(HaveOccurred())
+		Expect(g).To(BeEmpty())
+	})
+
+	It("parses a valid JSON list", func() {
+		g, err := parseModelGalleries(`[{"name":"localai","url":"github:mudler/LocalAI/gallery/index.yaml@master"}]`)
+		Expect(err).ToNot(HaveOccurred())
+		Expect(g).To(HaveLen(1))
+		Expect(g[0].Name).To(Equal("localai"))
+	})
+
+	It("returns an error on malformed JSON", func() {
+		_, err := parseModelGalleries(`{this is not json`)
+		Expect(err).To(HaveOccurred())
+	})
+})
+
+var _ = Describe("normalizePrefetchList", func() {
+	It("drops empty and whitespace-only entries", func() {
+		got := normalizePrefetchList([]string{"a", "", " ", " b ", "\tc\n"})
+		Expect(got).To(Equal([]string{"a", "b", "c"}))
+	})
+
+	It("returns empty slice on empty input", func() {
+		Expect(normalizePrefetchList(nil)).To(BeEmpty())
+		Expect(normalizePrefetchList([]string{})).To(BeEmpty())
+	})
+})
diff --git a/core/services/worker/worker.go b/core/services/worker/worker.go
index 744e368ac..ff03d7b55 100644
--- a/core/services/worker/worker.go
+++ b/core/services/worker/worker.go
@@ -50,6 +50,22 @@ func Run(ctx *cliContext.Context, cfg *Config) error {
 		xlog.Warn("Failed to parse backend galleries", "error", err)
 	}
 
+	// Prefetch gallery models over the worker's outbound internet before we
+	// start accepting backend.install events. Non-fatal on every failure path:
+	// if the gallery is unreachable, an ID is unknown, or LOCALAI_GALLERIES is
+	// malformed, the worker still starts and the master can push files on
+	// demand (existing fallback behaviour).
+	//
+	// This runs BEFORE self-registration on purpose: a worker that is still
+	// warming its cache is not yet announced as ready. The trade-off is that
+	// a large download delays registration; the fast no-op path on a hot PVC
+	// keeps restarts cheap.
+	//
+	// NOTE(review): context.Background() means an in-flight download cannot be
+	// cancelled on shutdown; pass the worker's cancellable context here if
+	// Run has one in scope — TODO confirm.
+	prefetchModels(context.Background(), cfg, systemState, ml, galleries, nil)
+
 	// Self-registration with frontend (with retry)
 	regClient := &workerregistry.RegistrationClient{
 		FrontendURL: cfg.RegisterTo,