mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
256 lines
7.4 KiB
Go
256 lines
7.4 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/mudler/xlog"
|
|
"golang.org/x/sync/singleflight"
|
|
)
|
|
|
|
// FileManager provides a unified file access layer that abstracts over
// local filesystem and object storage (S3). In distributed mode, files
// are stored in S3 with local caching on each node. In single-node mode,
// it operates directly on the filesystem.
//
// Instances are created with NewFileManager; every method uses a pointer
// receiver.
type FileManager struct {
	// store is the backing object store. A nil store means no object
	// storage is configured; methods check for nil before using it.
	store ObjectStore
	// cacheDir is the root directory under which downloaded objects are
	// cached, laid out by key.
	cacheDir string // local cache directory for downloaded files
	// flight deduplicates concurrent Download calls for the same key.
	flight singleflight.Group
}
|
|
|
|
// NewFileManager creates a new FileManager.
|
|
// If store is nil, all operations fall through to local filesystem only.
|
|
func NewFileManager(store ObjectStore, cacheDir string) (*FileManager, error) {
|
|
if cacheDir != "" {
|
|
if err := os.MkdirAll(cacheDir, 0750); err != nil {
|
|
return nil, fmt.Errorf("creating cache directory %s: %w", cacheDir, err)
|
|
}
|
|
}
|
|
return &FileManager{
|
|
store: store,
|
|
cacheDir: cacheDir,
|
|
}, nil
|
|
}
|
|
|
|
// Upload stores a file in object storage under the given key.
|
|
// The file is read from the local path.
|
|
func (fm *FileManager) Upload(ctx context.Context, key, localPath string) error {
|
|
if fm.store == nil {
|
|
return nil // no-op in single-node mode
|
|
}
|
|
|
|
f, err := os.Open(localPath)
|
|
if err != nil {
|
|
return fmt.Errorf("opening %s for upload: %w", localPath, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
if err := fm.store.Put(ctx, key, f); err != nil {
|
|
return fmt.Errorf("uploading %s to %s: %w", localPath, key, err)
|
|
}
|
|
|
|
xlog.Debug("Uploaded file to object storage", "key", key, "localPath", localPath)
|
|
return nil
|
|
}
|
|
|
|
// Download retrieves a file from object storage and caches it locally.
|
|
// Returns the local file path. If the file is already cached, returns immediately.
|
|
func (fm *FileManager) Download(ctx context.Context, key string) (string, error) {
|
|
if fm.store == nil {
|
|
return "", fmt.Errorf("no object store configured")
|
|
}
|
|
|
|
localPath, err := fm.cachePath(key)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Fast path: check local cache without any locking
|
|
if _, err := os.Stat(localPath); err == nil {
|
|
xlog.Debug("File found in local cache", "key", key, "path", localPath)
|
|
return localPath, nil
|
|
}
|
|
|
|
// singleflight deduplicates concurrent downloads for the same key
|
|
v, err, _ := fm.flight.Do(key, func() (any, error) {
|
|
// Re-check cache (another goroutine may have just finished)
|
|
if _, err := os.Stat(localPath); err == nil {
|
|
return localPath, nil
|
|
}
|
|
|
|
r, err := fm.store.Get(ctx, key)
|
|
if err != nil {
|
|
return "", fmt.Errorf("downloading %s: %w", key, err)
|
|
}
|
|
defer r.Close()
|
|
|
|
if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil {
|
|
return "", fmt.Errorf("creating cache dir for %s: %w", key, err)
|
|
}
|
|
|
|
tmpPath := localPath + ".tmp"
|
|
f, err := os.Create(tmpPath)
|
|
if err != nil {
|
|
return "", fmt.Errorf("creating temp file for %s: %w", key, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
if _, err := io.Copy(f, r); err != nil {
|
|
os.Remove(tmpPath)
|
|
return "", fmt.Errorf("writing %s to cache: %w", key, err)
|
|
}
|
|
|
|
if err := os.Rename(tmpPath, localPath); err != nil {
|
|
os.Remove(tmpPath)
|
|
return "", fmt.Errorf("renaming temp file for %s: %w", key, err)
|
|
}
|
|
|
|
xlog.Debug("Downloaded file from object storage", "key", key, "path", localPath)
|
|
return localPath, nil
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
path, ok := v.(string)
|
|
if !ok {
|
|
return "", fmt.Errorf("unexpected singleflight result type %T", v)
|
|
}
|
|
return path, nil
|
|
}
|
|
|
|
// Head returns metadata about an object in storage without downloading it.
|
|
func (fm *FileManager) Head(ctx context.Context, key string) (*ObjectMeta, error) {
|
|
if fm.store == nil {
|
|
return nil, fmt.Errorf("no object store configured")
|
|
}
|
|
return fm.store.Head(ctx, key)
|
|
}
|
|
|
|
// Exists checks if a file exists in object storage.
|
|
func (fm *FileManager) Exists(ctx context.Context, key string) (bool, error) {
|
|
if fm.store == nil {
|
|
return false, nil
|
|
}
|
|
return fm.store.Exists(ctx, key)
|
|
}
|
|
|
|
// Delete removes a file from object storage and the local cache.
|
|
func (fm *FileManager) Delete(ctx context.Context, key string) error {
|
|
if fm.store == nil {
|
|
return nil
|
|
}
|
|
|
|
// Remove from local cache
|
|
localPath, err := fm.cachePath(key)
|
|
if err == nil {
|
|
os.Remove(localPath)
|
|
}
|
|
|
|
return fm.store.Delete(ctx, key)
|
|
}
|
|
|
|
// List returns keys matching the given prefix from object storage.
|
|
func (fm *FileManager) List(ctx context.Context, prefix string) ([]string, error) {
|
|
if fm.store == nil {
|
|
return nil, nil
|
|
}
|
|
return fm.store.List(ctx, prefix)
|
|
}
|
|
|
|
// CacheExists checks if a file is in the local cache.
|
|
func (fm *FileManager) CacheExists(key string) bool {
|
|
p, err := fm.cachePath(key)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
_, err = os.Stat(p)
|
|
return err == nil
|
|
}
|
|
|
|
// CachePath returns the local cache path for a key.
|
|
// Returns an error if the key would escape the cache directory.
|
|
func (fm *FileManager) CachePath(key string) (string, error) {
|
|
return fm.cachePath(key)
|
|
}
|
|
|
|
// EvictCache removes a file from the local cache (but keeps it in object storage).
|
|
func (fm *FileManager) EvictCache(key string) error {
|
|
p, err := fm.cachePath(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return os.Remove(p)
|
|
}
|
|
|
|
// IsConfigured returns true if an object store is configured.
|
|
func (fm *FileManager) IsConfigured() bool {
|
|
return fm.store != nil
|
|
}
|
|
|
|
func (fm *FileManager) cachePath(key string) (string, error) {
|
|
// Convert key to safe filesystem path
|
|
safe := strings.ReplaceAll(key, "/", string(filepath.Separator))
|
|
full := filepath.Clean(filepath.Join(fm.cacheDir, safe))
|
|
if !strings.HasPrefix(full, filepath.Clean(fm.cacheDir)+string(filepath.Separator)) {
|
|
return "", fmt.Errorf("key %q escapes cache directory", key)
|
|
}
|
|
return full, nil
|
|
}
|
|
|
|
// EphemeralKey builds the object storage key for an ephemeral (per-request)
// file. The layout is "ephemeral/<category>/<requestID>/<filename>"; the
// arguments are inserted verbatim.
func EphemeralKey(requestID, category, filename string) string {
	key := "ephemeral/" + category
	key += "/" + requestID
	key += "/" + filename
	return key
}
|
|
|
|
// --- Namespace helpers for organizing files in object storage ---
|
|
|
|
// ModelKeyPrefix is the key prefix used for model files in object storage
// and HTTP file transfer routing. It includes the trailing slash; ModelKey
// prepends it to a model name.
const ModelKeyPrefix = "models/"
|
|
|
|
// DataKeyPrefix is the key prefix used for data files (e.g. quantization output)
// in object storage and HTTP file transfer routing. It includes the trailing
// slash; DataKey prepends it to a data file name.
const DataKeyPrefix = "data/"
|
|
|
|
// ModelKey returns the object storage key for a model file.
|
|
func ModelKey(modelName string) string {
|
|
return ModelKeyPrefix + modelName
|
|
}
|
|
|
|
// DataKey returns the object storage key for a data file.
|
|
func DataKey(name string) string {
|
|
return DataKeyPrefix + name
|
|
}
|
|
|
|
// UserAssetKey builds the object storage key for a user asset, laid out as
// "users/<userID>/assets/<filename>". The arguments are inserted verbatim.
func UserAssetKey(userID, filename string) string {
	userRoot := "users/" + userID
	return userRoot + "/assets/" + filename
}
|
|
|
|
// UserOutputKey builds the object storage key for a user output file, laid
// out as "users/<userID>/outputs/<filename>". The arguments are inserted
// verbatim.
func UserOutputKey(userID, filename string) string {
	userRoot := "users/" + userID
	return userRoot + "/outputs/" + filename
}
|
|
|
|
// FineTuneDatasetKey builds the object storage key for a fine-tune dataset,
// laid out as "finetune/datasets/<jobID>/<filename>". The arguments are
// inserted verbatim.
func FineTuneDatasetKey(jobID, filename string) string {
	jobDir := "finetune/datasets/" + jobID
	return jobDir + "/" + filename
}
|
|
|
|
// FineTuneCheckpointKey builds the object storage key for a fine-tune
// checkpoint, laid out as "finetune/<jobID>/checkpoints/<checkpoint>". The
// arguments are inserted verbatim.
func FineTuneCheckpointKey(jobID, checkpoint string) string {
	jobDir := "finetune/" + jobID
	return jobDir + "/checkpoints/" + checkpoint
}
|
|
|
|
// SkillKey builds the object storage key for a skill file, laid out as
// "skills/<scope>/<skillName>/<filename>". The scope is userID when it is
// non-empty and the literal "global" otherwise.
func SkillKey(userID, skillName, filename string) string {
	scope := userID
	if scope == "" {
		scope = "global"
	}
	return "skills/" + scope + "/" + skillName + "/" + filename
}
|