fix(distributed): stage directory-based models to remote nodes (#10175)

Distributed file-staging treated every model path field (ModelFile, etc.)
as a single regular file: it os.Open'd the path and streamed its fd as the
HTTP PUT body. For directory-based models — e.g. qwen3-tts-cpp, whose
weights and tokenizer ggufs live under one directory referenced by
parameters.model — opening the directory succeeds but reading its fd
returns EISDIR, so routing the model to a remote NATS worker failed with
"read /models/<model>: is a directory". Single-file models were unaffected,
so only multi-file pipelines (e.g. the realtime TTS stage) broke.

stageModelFiles now detects a directory path field and stages each
contained file individually (via the new stageDirectory helper), preserving
structure with the existing StagingKeyMapper and rewriting the field to the
remote directory (deriving ModelPath as before). countStageableFiles makes
the progress total count a directory's files so the staging tracker stays
accurate.

Assisted-by: Claude:claude-opus-4-8 go vet

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-06-04 18:05:38 +02:00
committed by GitHub
parent 994063ba9a
commit 92726f7631
2 changed files with 165 additions and 5 deletions

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
@@ -932,13 +933,12 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op
{"AudioPath", &opts.AudioPath},
}
// Count stageable files for progress tracking
// Count stageable files for progress tracking. Directory models expand to
// the number of files they contain, matching what stageDirectory uploads.
totalFiles := 0
for _, f := range fields {
if *f.val != "" {
if _, err := os.Stat(*f.val); err == nil {
totalFiles++
}
totalFiles += countStageableFiles(*f.val)
}
}
for _, adapter := range opts.LoraAdapters {
@@ -969,8 +969,33 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op
*f.val = ""
continue
}
fileIdx++
localPath := *f.val
// Directory models (e.g. qwen3-tts-cpp ships its weights and tokenizer
// ggufs under one directory) can't be uploaded as a single file — the
// stager would open the directory and read its fd, failing with
// "is a directory" (EISDIR). Expand the directory and stage each
// contained file, then rewrite the field to the remote directory.
if fi, statErr := os.Stat(localPath); statErr == nil && fi.IsDir() {
remoteDir, dirErr := r.stageDirectory(ctx, node, trackingKey, localPath, keyMapper, &fileIdx, totalFiles)
if dirErr != nil {
if f.name == "ModelFile" {
xlog.Error("Failed to stage model directory for remote node", "node", node.Name, "field", f.name, "path", localPath, "error", dirErr)
return nil, fmt.Errorf("staging model file: %w", dirErr)
}
xlog.Warn("Failed to stage model directory, clearing field", "field", f.name, "path", localPath, "error", dirErr)
*f.val = ""
continue
}
*f.val = remoteDir
if f.name == "ModelFile" && opts.Model != "" {
opts.ModelPath = DeriveRemoteModelPath(remoteDir, opts.Model)
xlog.Debug("Derived remote ModelPath", "modelPath", opts.ModelPath)
}
continue
}
fileIdx++
key := keyMapper.Key(localPath)
// Attach progress callback to context for byte-level tracking
@@ -1074,6 +1099,77 @@ func (r *SmartRouter) withStagingCallback(ctx context.Context, trackingKey, file
})
}
// countStageableFiles returns the number of regular files a model path expands
// to for staging: 1 for a regular file, the contained file count for a
// directory, and 0 if the path does not exist.
func countStageableFiles(path string) int {
fi, err := os.Stat(path)
if err != nil {
return 0
}
if !fi.IsDir() {
return 1
}
n := 0
_ = filepath.WalkDir(path, func(_ string, d fs.DirEntry, walkErr error) error {
if walkErr != nil {
return nil
}
if !d.IsDir() {
n++
}
return nil
})
return n
}
// stageDirectory stages every file under a directory-based model (e.g.
// qwen3-tts-cpp, whose weights and tokenizer ggufs live in one directory).
// Each file is uploaded individually with a structure-preserving key; the
// returned path is the remote directory that contained them, suitable for the
// backend's ModelFile/ModelPath. fileIdx is advanced per staged file so the
// staging progress tracker stays accurate.
func (r *SmartRouter) stageDirectory(ctx context.Context, node *BackendNode, trackingKey, dir string, keyMapper *StagingKeyMapper, fileIdx *int, totalFiles int) (string, error) {
var remoteDir string
err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, walkErr error) error {
if walkErr != nil {
return walkErr
}
if d.IsDir() {
return nil
}
*fileIdx++
fileName := filepath.Base(path)
stageCtx := r.withStagingCallback(ctx, trackingKey, fileName, *fileIdx, totalFiles)
xlog.Info("Staging file", "model", trackingKey, "node", node.Name, "field", "ModelDir", "file", fileName, "fileIndex", *fileIdx, "totalFiles", totalFiles)
remoteFile, err := r.fileStager.EnsureRemote(stageCtx, node.ID, path, keyMapper.Key(path))
if err != nil {
return fmt.Errorf("staging %s: %w", path, err)
}
r.stagingTracker.FileComplete(trackingKey, *fileIdx, totalFiles)
// Every file under dir shares the same remote parent directory; derive
// it from this file's staged path and its path relative to dir.
rel, relErr := filepath.Rel(dir, path)
if relErr != nil {
return relErr
}
remoteDir = DeriveRemoteModelPath(remoteFile, rel)
r.stageCompanionFiles(ctx, node, path, keyMapper.Key)
return nil
})
if err != nil {
return "", err
}
if remoteDir == "" {
return "", fmt.Errorf("model directory %s contains no files", dir)
}
return remoteDir, nil
}
// stageCompanionFiles stages known companion files that exist alongside
// localPath. For example, piper TTS implicitly loads ".onnx.json" next to
// the ".onnx" model file. Errors are logged but not propagated.

View File

@@ -0,0 +1,64 @@
package nodes
import (
"context"
"os"
"path/filepath"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
)
// These tests cover staging of "directory models" — models whose ModelFile is a
// directory containing multiple files (e.g. qwen3-tts-cpp ships weights +
// tokenizer ggufs under one directory). The HTTP file stager uploads a single
// regular file per path, so a directory ModelFile must be expanded into its
// constituent files; otherwise the upload reads a directory fd and fails with
// "is a directory" (EISDIR) on remote NATS worker nodes.
var _ = Describe("stageModelFiles directory models", func() {
var (
stager *fakeFileStager
router *SmartRouter
node *BackendNode
tmp string
modelID = "qwen3-tts-cpp"
)
BeforeEach(func() {
stager = &fakeFileStager{}
router = &SmartRouter{
fileStager: stager,
stagingTracker: NewStagingTracker(),
}
node = &BackendNode{ID: "node-1", Name: "node-1", Address: "10.0.0.1:50051"}
tmp = GinkgoT().TempDir()
})
It("stages every file inside a directory ModelFile instead of the directory path", func() {
modelDir := filepath.Join(tmp, "models", modelID)
Expect(os.MkdirAll(modelDir, 0o755)).To(Succeed())
weights := filepath.Join(modelDir, "qwen3-tts-0.6b-f16.gguf")
tokenizer := filepath.Join(modelDir, "qwen3-tts-tokenizer-f16.gguf")
Expect(os.WriteFile(weights, []byte("weights"), 0o644)).To(Succeed())
Expect(os.WriteFile(tokenizer, []byte("tokenizer"), 0o644)).To(Succeed())
opts := &pb.ModelOptions{
Model: modelID,
ModelFile: modelDir,
}
_, err := router.stageModelFiles(context.Background(), node, opts, "track-key")
Expect(err).ToNot(HaveOccurred())
staged := make([]string, 0, len(stager.ensureCalls))
for _, c := range stager.ensureCalls {
staged = append(staged, c.localPath)
}
// Each contained file is staged individually; the directory path itself
// is never handed to the stager (which would read a directory fd).
Expect(staged).To(ConsistOf(weights, tokenizer))
Expect(staged).ToNot(ContainElement(modelDir))
})
})