mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-31 12:07:45 -04:00
feat(worker): add LOCALAI_PREFETCH_MODELS for boot-time gallery prefetch (#10108)
In LocalAI distributed mode the master streams a model GGUF to a worker on first inference. On bandwidth-constrained cluster networks (libp2p circuit-v2 relays under NAT, double-NAT residential, slow overlays) that transfer can be slow or unreliable — meanwhile each worker's outbound internet is usually fine. LOCALAI_PREFETCH_MODELS lets the operator name gallery model IDs to download at worker boot, BEFORE the worker subscribes to backend.install events. Reuses gallery.InstallModelFromGallery so the on-disk /models layout matches what the master would have pushed, and the master can still push files on demand if the gallery is unreachable at boot (prefetch is non-fatal on every error path). The installer is wrapped in a function-value indirection so tests can swap a fake without touching the real gallery; production never reassigns the binding. Assisted-by: Claude:claude-opus-4-7 Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -25,9 +25,20 @@ type Config struct {
|
||||
BackendsPath string `env:"LOCALAI_BACKENDS_PATH,BACKENDS_PATH" type:"path" default:"${basepath}/backends" help:"Path containing backends" group:"server"`
|
||||
BackendsSystemPath string `env:"LOCALAI_BACKENDS_SYSTEM_PATH" type:"path" default:"/var/lib/local-ai/backends" help:"Path containing system backends" group:"server"`
|
||||
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"server" default:"${backends}"`
|
||||
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of model galleries (used to resolve --prefetch-models on boot)" group:"server" default:"${galleries}"`
|
||||
ModelsPath string `env:"LOCALAI_MODELS_PATH,MODELS_PATH" type:"path" default:"${basepath}/models" help:"Path containing models" group:"server"`
|
||||
RequireBackendIntegrity bool `env:"LOCALAI_REQUIRE_BACKEND_INTEGRITY,REQUIRE_BACKEND_INTEGRITY" help:"If true, reject backend installs without a configured signature verification policy (OCI URIs) or SHA256 (tarball/HTTP URIs)." group:"hardening" default:"false"`
|
||||
|
||||
// PrefetchModels lets a worker download gallery model artifacts (GGUFs, etc.)
|
||||
// from its own outbound internet at boot, instead of waiting for the master to
|
||||
// stream them over the cluster network at first-inference time. Useful when the
|
||||
// cluster-internal path is slow (slirp/circuit-relay, CGNAT) but outbound NAT
|
||||
// works fine. Resolution reuses the same gallery installer the master uses, so
|
||||
// the on-disk /models layout is identical. Errors are non-fatal — if the gallery
|
||||
// is unreachable on boot, the worker logs a warning and starts the NATS loop
|
||||
// anyway; the master can still push the file on demand (existing behaviour).
|
||||
PrefetchModels []string `env:"LOCALAI_PREFETCH_MODELS,PREFETCH_MODELS" help:"Comma-separated gallery model IDs to download from LOCALAI_GALLERIES at worker boot (e.g. 'llama-3.2-1b-instruct,phi-3-mini-4k'). Skipped if already on disk and SHA matches." group:"server"`
|
||||
|
||||
// HTTP file transfer
|
||||
HTTPAddr string `env:"LOCALAI_HTTP_ADDR" default:"" help:"HTTP file transfer server address (default: gRPC port + 1)" group:"server" hidden:""`
|
||||
AdvertiseHTTPAddr string `env:"LOCALAI_ADVERTISE_HTTP_ADDR" help:"HTTP address the frontend uses to reach this node for file transfer" group:"server" hidden:""`
|
||||
|
||||
145
core/services/worker/prefetch.go
Normal file
145
core/services/worker/prefetch.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/gallery"
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
// modelInstaller is the subset of gallery.InstallModelFromGallery the prefetch
|
||||
// loop needs. Carved out as a function value so tests can substitute a fake
|
||||
// installer without touching the real gallery — and so we don't duplicate the
|
||||
// full install pipeline (URL resolution, SHA verification, idempotent skip,
|
||||
// config-file write) which already lives in core/gallery/models.go.
|
||||
type modelInstaller func(
|
||||
ctx context.Context,
|
||||
modelGalleries, backendGalleries []config.Gallery,
|
||||
systemState *system.SystemState,
|
||||
modelLoader *model.ModelLoader,
|
||||
name string,
|
||||
) error
|
||||
|
||||
// realModelInstaller is the production binding to gallery.InstallModelFromGallery.
|
||||
// Kept as a package-level var so tests can swap it; production code never
|
||||
// reassigns it.
|
||||
var realModelInstaller modelInstaller = func(
|
||||
ctx context.Context,
|
||||
modelGalleries, backendGalleries []config.Gallery,
|
||||
systemState *system.SystemState,
|
||||
modelLoader *model.ModelLoader,
|
||||
name string,
|
||||
) error {
|
||||
// enforceScan=false: workers fetch from the same gallery the master already
|
||||
// trusts, and the master would have scanned at install time anyway.
|
||||
// autoloadBackendGalleries=false: the worker installs backends on demand via
|
||||
// backend.install NATS events; prefetching the backend here would race the
|
||||
// supervisor's own install path and double-trigger gallery work.
|
||||
// requireBackendIntegrity=false: same reason — we're not installing a backend.
|
||||
return gallery.InstallModelFromGallery(
|
||||
ctx,
|
||||
modelGalleries,
|
||||
backendGalleries,
|
||||
systemState,
|
||||
modelLoader,
|
||||
name,
|
||||
gallery.GalleryModel{},
|
||||
nil, /* downloadStatus: silent on the worker; master is the UX surface */
|
||||
false /* enforceScan */, false /* autoloadBackendGalleries */, false, /* requireBackendIntegrity */
|
||||
)
|
||||
}
|
||||
|
||||
// prefetchModels resolves each configured gallery ID against the model gallery
|
||||
// and downloads the artifact into the worker's /models. It is called once at
|
||||
// worker startup, BEFORE the NATS lifecycle subscription, so that the steady
|
||||
// state has the file already on disk and the master never needs to stream it.
|
||||
//
|
||||
// Errors are intentionally non-fatal: on a fresh worker with no outbound
|
||||
// connectivity (or a misconfigured gallery JSON), we want the worker to still
|
||||
// register and serve traffic — the master will fall back to pushing files
|
||||
// on-demand over NATS/HTTP, which is the pre-existing behavior. Per-model
|
||||
// failures are logged at warn level and the loop continues with the next ID.
|
||||
//
|
||||
// Idempotency comes for free from pkg/downloader.URI.DownloadFileWithContext:
|
||||
// it stats the target path, hashes it if a SHA is configured, and short-circuits
|
||||
// on a match. So restarts against a populated PVC are effectively no-ops.
|
||||
func prefetchModels(
|
||||
ctx context.Context,
|
||||
cfg *Config,
|
||||
systemState *system.SystemState,
|
||||
ml *model.ModelLoader,
|
||||
backendGalleries []config.Gallery,
|
||||
installer modelInstaller,
|
||||
) {
|
||||
models := normalizePrefetchList(cfg.PrefetchModels)
|
||||
if len(models) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
modelGalleries, err := parseModelGalleries(cfg.Galleries)
|
||||
if err != nil {
|
||||
// Without a model-gallery config we cannot resolve gallery IDs. Warn
|
||||
// and let the worker proceed — the master can still push files later.
|
||||
xlog.Warn("Skipping model prefetch: invalid LOCALAI_GALLERIES", "error", err)
|
||||
return
|
||||
}
|
||||
if len(modelGalleries) == 0 {
|
||||
xlog.Warn("Skipping model prefetch: no model galleries configured (set LOCALAI_GALLERIES)", "models", models)
|
||||
return
|
||||
}
|
||||
|
||||
if installer == nil {
|
||||
installer = realModelInstaller
|
||||
}
|
||||
|
||||
xlog.Info("Prefetching models from gallery before entering NATS loop", "count", len(models), "models", models)
|
||||
for _, name := range models {
|
||||
xlog.Info("Prefetching model", "model", name)
|
||||
if err := installer(ctx, modelGalleries, backendGalleries, systemState, ml, name); err != nil {
|
||||
// Non-fatal: master can still push the file on demand. We log
|
||||
// loudly so an operator can spot a misconfigured gallery ID or
|
||||
// a missing outbound route without the worker crash-looping.
|
||||
xlog.Warn("Model prefetch failed; master will push on demand", "model", name, "error", err)
|
||||
continue
|
||||
}
|
||||
xlog.Info("Prefetched model", "model", name)
|
||||
}
|
||||
}
|
||||
|
||||
// normalizePrefetchList trims whitespace and drops empty entries. kong already
|
||||
// splits comma-separated env values into []string, but callers using the CLI
|
||||
// flag repeatedly (or pasting whitespace) can produce stragglers we don't want
|
||||
// to ship into the gallery resolver as "" or " ".
|
||||
func normalizePrefetchList(in []string) []string {
|
||||
out := make([]string, 0, len(in))
|
||||
for _, s := range in {
|
||||
s = strings.TrimSpace(s)
|
||||
if s == "" {
|
||||
continue
|
||||
}
|
||||
out = append(out, s)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// parseModelGalleries parses the JSON-encoded LOCALAI_GALLERIES value the same
|
||||
// way the master does. Returns an empty slice (not nil) and nil error when the
|
||||
// input is empty, so callers can treat "" as "not configured" without a
|
||||
// secondary check.
|
||||
func parseModelGalleries(raw string) ([]config.Gallery, error) {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" {
|
||||
return []config.Gallery{}, nil
|
||||
}
|
||||
var galleries []config.Gallery
|
||||
if err := json.Unmarshal([]byte(raw), &galleries); err != nil {
|
||||
return nil, fmt.Errorf("parsing model galleries JSON: %w", err)
|
||||
}
|
||||
return galleries, nil
|
||||
}
|
||||
232
core/services/worker/prefetch_test.go
Normal file
232
core/services/worker/prefetch_test.go
Normal file
@@ -0,0 +1,232 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/pkg/model"
|
||||
"github.com/mudler/LocalAI/pkg/system"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// fakeInstaller records every call and lets each test choose the behaviour
|
||||
// for a given model name. Modelled on the real modelInstaller signature so
|
||||
// the prefetch loop can't accidentally widen its dependency on the gallery
|
||||
// package without breaking the contract here too.
|
||||
type fakeInstaller struct {
|
||||
mu sync.Mutex
|
||||
calls []string
|
||||
behavior map[string]error // model name -> error to return; missing key = nil (success)
|
||||
}
|
||||
|
||||
func newFakeInstaller() *fakeInstaller {
|
||||
return &fakeInstaller{behavior: map[string]error{}}
|
||||
}
|
||||
|
||||
func (f *fakeInstaller) install(
|
||||
_ context.Context,
|
||||
_ []config.Gallery,
|
||||
_ []config.Gallery,
|
||||
_ *system.SystemState,
|
||||
_ *model.ModelLoader,
|
||||
name string,
|
||||
) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.calls = append(f.calls, name)
|
||||
return f.behavior[name]
|
||||
}
|
||||
|
||||
func (f *fakeInstaller) recorded() []string {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
out := make([]string, len(f.calls))
|
||||
copy(out, f.calls)
|
||||
return out
|
||||
}
|
||||
|
||||
// minimalSystemState yields a SystemState pointed at a fresh tempdir. The
|
||||
// prefetch loop never touches it directly (the fake installer ignores it),
|
||||
// but the production signature requires a non-nil value and we want the
|
||||
// tests to mirror the real wiring as closely as possible.
|
||||
func minimalSystemState(modelsDir string) *system.SystemState {
|
||||
ss, err := system.GetSystemState(system.WithModelPath(modelsDir))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
return ss
|
||||
}
|
||||
|
||||
var _ = Describe("prefetchModels", func() {
|
||||
var (
|
||||
tmp string
|
||||
ss *system.SystemState
|
||||
galleries = `[{"name":"localai","url":"file:///dev/null"}]`
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
tmp, err = os.MkdirTemp("", "worker-prefetch-test")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
ss = minimalSystemState(tmp)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
_ = os.RemoveAll(tmp)
|
||||
})
|
||||
|
||||
It("installs every configured model before returning (happy path)", func() {
|
||||
f := newFakeInstaller()
|
||||
cfg := &Config{
|
||||
PrefetchModels: []string{"llama-3.2-1b", "phi-3-mini"},
|
||||
Galleries: galleries,
|
||||
ModelsPath: tmp,
|
||||
}
|
||||
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
|
||||
|
||||
Expect(f.recorded()).To(Equal([]string{"llama-3.2-1b", "phi-3-mini"}))
|
||||
})
|
||||
|
||||
It("is a no-op when the file is already present on disk (idempotency contract)", func() {
|
||||
// Drop a fake artifact where the gallery would have placed it; the
|
||||
// real installer's downloader stats the path and short-circuits when
|
||||
// the SHA matches (or any time SHA is unset). We simulate that here
|
||||
// by having the fake installer assert the file exists before claiming
|
||||
// success — exactly the invariant we rely on for restart-against-PVC.
|
||||
modelFile := filepath.Join(tmp, "llama-3.2-1b.gguf")
|
||||
Expect(os.WriteFile(modelFile, []byte("already-here"), 0o600)).To(Succeed())
|
||||
|
||||
var installerCalls int
|
||||
var sawFile bool
|
||||
fake := func(
|
||||
_ context.Context,
|
||||
_, _ []config.Gallery,
|
||||
_ *system.SystemState,
|
||||
_ *model.ModelLoader,
|
||||
_ string,
|
||||
) error {
|
||||
installerCalls++
|
||||
if _, err := os.Stat(modelFile); err == nil {
|
||||
sawFile = true
|
||||
}
|
||||
// Real installer returns nil on cache hit. Mirror that.
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
PrefetchModels: []string{"llama-3.2-1b"},
|
||||
Galleries: galleries,
|
||||
ModelsPath: tmp,
|
||||
}
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, fake)
|
||||
|
||||
Expect(installerCalls).To(Equal(1), "installer is always invoked; its downloader handles the skip")
|
||||
Expect(sawFile).To(BeTrue(), "fake installer saw the pre-existing artifact, matching the restart-on-warm-PVC path")
|
||||
|
||||
// File untouched.
|
||||
data, err := os.ReadFile(modelFile)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(string(data)).To(Equal("already-here"))
|
||||
})
|
||||
|
||||
It("logs and continues when an individual install fails (non-fatal contract)", func() {
|
||||
f := newFakeInstaller()
|
||||
f.behavior["broken"] = errors.New("gallery unreachable")
|
||||
|
||||
cfg := &Config{
|
||||
PrefetchModels: []string{"good-1", "broken", "good-2"},
|
||||
Galleries: galleries,
|
||||
ModelsPath: tmp,
|
||||
}
|
||||
|
||||
// Function must return normally despite the middle install failing.
|
||||
Expect(func() {
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
|
||||
}).ToNot(Panic())
|
||||
|
||||
// Subsequent models still get attempted — failures don't short-circuit.
|
||||
Expect(f.recorded()).To(Equal([]string{"good-1", "broken", "good-2"}))
|
||||
})
|
||||
|
||||
It("does nothing when PrefetchModels is empty", func() {
|
||||
f := newFakeInstaller()
|
||||
cfg := &Config{ModelsPath: tmp, Galleries: galleries}
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
|
||||
Expect(f.recorded()).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("trims whitespace and skips empty entries", func() {
|
||||
f := newFakeInstaller()
|
||||
cfg := &Config{
|
||||
PrefetchModels: []string{" llama ", "", " ", "phi"},
|
||||
Galleries: galleries,
|
||||
ModelsPath: tmp,
|
||||
}
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
|
||||
Expect(f.recorded()).To(Equal([]string{"llama", "phi"}))
|
||||
})
|
||||
|
||||
It("skips prefetch when LOCALAI_GALLERIES is malformed (non-fatal)", func() {
|
||||
f := newFakeInstaller()
|
||||
cfg := &Config{
|
||||
PrefetchModels: []string{"llama"},
|
||||
Galleries: `not-json`,
|
||||
ModelsPath: tmp,
|
||||
}
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
|
||||
Expect(f.recorded()).To(BeEmpty(), "no installer call when galleries can't be parsed")
|
||||
})
|
||||
|
||||
It("skips prefetch when no galleries are configured (non-fatal)", func() {
|
||||
f := newFakeInstaller()
|
||||
cfg := &Config{
|
||||
PrefetchModels: []string{"llama"},
|
||||
Galleries: `[]`,
|
||||
ModelsPath: tmp,
|
||||
}
|
||||
prefetchModels(context.Background(), cfg, ss, nil, nil, f.install)
|
||||
Expect(f.recorded()).To(BeEmpty())
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("parseModelGalleries", func() {
|
||||
It("returns empty slice on empty input", func() {
|
||||
g, err := parseModelGalleries("")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(g).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("returns empty slice on whitespace-only input", func() {
|
||||
g, err := parseModelGalleries(" \n\t ")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(g).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("parses a valid JSON list", func() {
|
||||
g, err := parseModelGalleries(`[{"name":"localai","url":"github:mudler/LocalAI/gallery/index.yaml@master"}]`)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(g).To(HaveLen(1))
|
||||
Expect(g[0].Name).To(Equal("localai"))
|
||||
})
|
||||
|
||||
It("returns an error on malformed JSON", func() {
|
||||
_, err := parseModelGalleries(`{this is not json`)
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("normalizePrefetchList", func() {
|
||||
It("drops empty and whitespace-only entries", func() {
|
||||
got := normalizePrefetchList([]string{"a", "", " ", " b ", "\tc\n"})
|
||||
Expect(got).To(Equal([]string{"a", "b", "c"}))
|
||||
})
|
||||
|
||||
It("returns empty slice on empty input", func() {
|
||||
Expect(normalizePrefetchList(nil)).To(BeEmpty())
|
||||
Expect(normalizePrefetchList([]string{})).To(BeEmpty())
|
||||
})
|
||||
})
|
||||
@@ -50,6 +50,17 @@ func Run(ctx *cliContext.Context, cfg *Config) error {
|
||||
xlog.Warn("Failed to parse backend galleries", "error", err)
|
||||
}
|
||||
|
||||
// Prefetch gallery models over the worker's outbound internet before we
|
||||
// start accepting backend.install events. Non-fatal on every failure path:
|
||||
// if the gallery is unreachable, an ID is unknown, or LOCALAI_GALLERIES is
|
||||
// malformed, the worker still starts and the master can push files on
|
||||
// demand (existing fallback behaviour). Placed BEFORE registration so a
|
||||
// large download doesn't delay heartbeat — registration happens after.
|
||||
// Actually: keep it before registration so a worker that's still warming
|
||||
// the cache isn't yet announced as ready. The fast no-op path on a hot
|
||||
// PVC keeps restarts cheap.
|
||||
prefetchModels(context.Background(), cfg, systemState, ml, galleries, nil)
|
||||
|
||||
// Self-registration with frontend (with retry)
|
||||
regClient := &workerregistry.RegistrationClient{
|
||||
FrontendURL: cfg.RegisterTo,
|
||||
|
||||
Reference in New Issue
Block a user