diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index ec783b283..f28200314 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/fs" "os" "path/filepath" "strings" @@ -932,13 +933,12 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op {"AudioPath", &opts.AudioPath}, } - // Count stageable files for progress tracking + // Count stageable files for progress tracking. Directory models expand to + // the number of files they contain, matching what stageDirectory uploads. totalFiles := 0 for _, f := range fields { if *f.val != "" { - if _, err := os.Stat(*f.val); err == nil { - totalFiles++ - } + totalFiles += countStageableFiles(*f.val) } } for _, adapter := range opts.LoraAdapters { @@ -969,8 +969,33 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op *f.val = "" continue } - fileIdx++ localPath := *f.val + + // Directory models (e.g. qwen3-tts-cpp ships its weights and tokenizer + // ggufs under one directory) can't be uploaded as a single file — the + // stager would open the directory and read its fd, failing with + // "is a directory" (EISDIR). Expand the directory and stage each + // contained file, then rewrite the field to the remote directory. + if fi, statErr := os.Stat(localPath); statErr == nil && fi.IsDir() { + remoteDir, dirErr := r.stageDirectory(ctx, node, trackingKey, localPath, keyMapper, &fileIdx, totalFiles) + if dirErr != nil { + if f.name == "ModelFile" { + xlog.Error("Failed to stage model directory for remote node", "node", node.Name, "field", f.name, "path", localPath, "error", dirErr) + return nil, fmt.Errorf("staging model file: %w", dirErr) + } + xlog.Warn("Failed to stage model directory, clearing field", "field", f.name, "path", localPath, "error", dirErr) + *f.val = "" + continue + } + *f.val = remoteDir + if f.name == "ModelFile" && opts.Model != "" { + opts.ModelPath = DeriveRemoteModelPath(remoteDir, opts.Model) + xlog.Debug("Derived remote ModelPath", "modelPath", opts.ModelPath) + } + continue + } + + fileIdx++ key := keyMapper.Key(localPath) // Attach progress callback to context for byte-level tracking @@ -1074,6 +1099,77 @@ func (r *SmartRouter) withStagingCallback(ctx context.Context, trackingKey, file }) } +// countStageableFiles returns the number of regular files a model path expands +// to for staging: 1 for a regular file, the contained file count for a +// directory, and 0 if the path does not exist. +func countStageableFiles(path string) int { + fi, err := os.Stat(path) + if err != nil { + return 0 + } + if !fi.IsDir() { + return 1 + } + n := 0 + _ = filepath.WalkDir(path, func(_ string, d fs.DirEntry, walkErr error) error { + if walkErr != nil { + return nil + } + if !d.IsDir() { + n++ + } + return nil + }) + return n +} + +// stageDirectory stages every file under a directory-based model (e.g. +// qwen3-tts-cpp, whose weights and tokenizer ggufs live in one directory). +// Each file is uploaded individually with a structure-preserving key; the +// returned path is the remote directory that contained them, suitable for the +// backend's ModelFile/ModelPath. fileIdx is advanced per staged file so the +// staging progress tracker stays accurate. +func (r *SmartRouter) stageDirectory(ctx context.Context, node *BackendNode, trackingKey, dir string, keyMapper *StagingKeyMapper, fileIdx *int, totalFiles int) (string, error) { + var remoteDir string + err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + if d.IsDir() { + return nil + } + + *fileIdx++ + fileName := filepath.Base(path) + stageCtx := r.withStagingCallback(ctx, trackingKey, fileName, *fileIdx, totalFiles) + xlog.Info("Staging file", "model", trackingKey, "node", node.Name, "field", "ModelDir", "file", fileName, "fileIndex", *fileIdx, "totalFiles", totalFiles) + + remoteFile, err := r.fileStager.EnsureRemote(stageCtx, node.ID, path, keyMapper.Key(path)) + if err != nil { + return fmt.Errorf("staging %s: %w", path, err) + } + r.stagingTracker.FileComplete(trackingKey, *fileIdx, totalFiles) + + // Every file under dir shares the same remote parent directory; derive + // it from this file's staged path and its path relative to dir. + rel, relErr := filepath.Rel(dir, path) + if relErr != nil { + return relErr + } + remoteDir = DeriveRemoteModelPath(remoteFile, rel) + + r.stageCompanionFiles(ctx, node, path, keyMapper.Key) + return nil + }) + if err != nil { + return "", err + } + if remoteDir == "" { + return "", fmt.Errorf("model directory %s contains no files", dir) + } + return remoteDir, nil +} + // stageCompanionFiles stages known companion files that exist alongside // localPath. For example, piper TTS implicitly loads ".onnx.json" next to // the ".onnx" model file. Errors are logged but not propagated. diff --git a/core/services/nodes/router_dirstage_test.go b/core/services/nodes/router_dirstage_test.go new file mode 100644 index 000000000..19c36759f --- /dev/null +++ b/core/services/nodes/router_dirstage_test.go @@ -0,0 +1,64 @@ +package nodes + +import ( + "context" + "os" + "path/filepath" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + pb "github.com/mudler/LocalAI/pkg/grpc/proto" +) + +// These tests cover staging of "directory models" — models whose ModelFile is a +// directory containing multiple files (e.g. qwen3-tts-cpp ships weights + +// tokenizer ggufs under one directory). The HTTP file stager uploads a single +// regular file per path, so a directory ModelFile must be expanded into its +// constituent files; otherwise the upload reads a directory fd and fails with +// "is a directory" (EISDIR) on remote NATS worker nodes. +var _ = Describe("stageModelFiles directory models", func() { + var ( + stager *fakeFileStager + router *SmartRouter + node *BackendNode + tmp string + modelID = "qwen3-tts-cpp" + ) + + BeforeEach(func() { + stager = &fakeFileStager{} + router = &SmartRouter{ + fileStager: stager, + stagingTracker: NewStagingTracker(), + } + node = &BackendNode{ID: "node-1", Name: "node-1", Address: "10.0.0.1:50051"} + tmp = GinkgoT().TempDir() + }) + + It("stages every file inside a directory ModelFile instead of the directory path", func() { + modelDir := filepath.Join(tmp, "models", modelID) + Expect(os.MkdirAll(modelDir, 0o755)).To(Succeed()) + weights := filepath.Join(modelDir, "qwen3-tts-0.6b-f16.gguf") + tokenizer := filepath.Join(modelDir, "qwen3-tts-tokenizer-f16.gguf") + Expect(os.WriteFile(weights, []byte("weights"), 0o644)).To(Succeed()) + Expect(os.WriteFile(tokenizer, []byte("tokenizer"), 0o644)).To(Succeed()) + + opts := &pb.ModelOptions{ + Model: modelID, + ModelFile: modelDir, + } + + _, err := router.stageModelFiles(context.Background(), node, opts, "track-key") + Expect(err).ToNot(HaveOccurred()) + + staged := make([]string, 0, len(stager.ensureCalls)) + for _, c := range stager.ensureCalls { + staged = append(staged, c.localPath) + } + // Each contained file is staged individually; the directory path itself + // is never handed to the stager (which would read a directory fd). + Expect(staged).To(ConsistOf(weights, tokenizer)) + Expect(staged).ToNot(ContainElement(modelDir)) + }) +})