Files
LocalAI/core/services/distributed/finetune.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

106 lines
3.9 KiB
Go

package distributed
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/mudler/LocalAI/core/services/advisorylock"
"gorm.io/gorm"
)
// FineTuneJobRecord tracks fine-tune jobs in PostgreSQL.
type FineTuneJobRecord struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
UserID string `gorm:"index;size:36" json:"user_id,omitempty"`
Model string `gorm:"size:255" json:"model"`
Backend string `gorm:"size:64" json:"backend"`
ModelID string `gorm:"size:255" json:"model_id,omitempty"`
TrainingType string `gorm:"size:32" json:"training_type"` // lora, loha, lokr, full
TrainingMethod string `gorm:"size:32" json:"training_method"` // sft, dpo, grpo, etc.
Status string `gorm:"index;size:32;default:queued" json:"status"` // queued, loading_model, training, saving, completed, failed, stopped
Message string `gorm:"type:text" json:"message,omitempty"`
OutputDir string `gorm:"size:512" json:"output_dir,omitempty"`
ConfigJSON string `gorm:"column:config;type:text" json:"-"`
ExtraOptsJSON string `gorm:"column:extra_options;type:text" json:"-"`
BackendNodeID string `gorm:"size:36" json:"backend_node_id,omitempty"` // which backend node runs it
ExportStatus string `gorm:"size:32" json:"export_status,omitempty"`
ExportMessage string `gorm:"type:text" json:"export_message,omitempty"`
ExportModelName string `gorm:"size:255" json:"export_model_name,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (FineTuneJobRecord) TableName() string { return "finetune_jobs" }
// FineTuneStore manages fine-tune job state in PostgreSQL.
type FineTuneStore struct {
db *gorm.DB
}
// NewFineTuneStore creates a new FineTuneStore and auto-migrates.
// Uses a PostgreSQL advisory lock to prevent concurrent migration races
// when multiple instances (frontend + workers) start at the same time.
func NewFineTuneStore(db *gorm.DB) (*FineTuneStore, error) {
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
return db.AutoMigrate(&FineTuneJobRecord{})
}); err != nil {
return nil, fmt.Errorf("migrating finetune_jobs: %w", err)
}
return &FineTuneStore{db: db}, nil
}
// Create stores a new fine-tune job.
func (s *FineTuneStore) Create(job *FineTuneJobRecord) error {
if job.ID == "" {
job.ID = uuid.New().String()
}
job.CreatedAt = time.Now()
job.UpdatedAt = job.CreatedAt
return s.db.Create(job).Error
}
// Get retrieves a fine-tune job by ID.
func (s *FineTuneStore) Get(id string) (*FineTuneJobRecord, error) {
var job FineTuneJobRecord
if err := s.db.First(&job, "id = ?", id).Error; err != nil {
return nil, err
}
return &job, nil
}
// List returns fine-tune jobs, optionally filtered by user.
func (s *FineTuneStore) List(userID string) ([]FineTuneJobRecord, error) {
var jobs []FineTuneJobRecord
q := s.db.Order("created_at DESC")
if userID != "" {
q = q.Where("user_id = ?", userID)
}
return jobs, q.Find(&jobs).Error
}
// UpdateStatus updates the status and message of a fine-tune job.
func (s *FineTuneStore) UpdateStatus(id, status, message string) error {
return s.db.Model(&FineTuneJobRecord{}).Where("id = ?", id).Updates(map[string]any{
"status": status,
"message": message,
"updated_at": time.Now(),
}).Error
}
// UpdateExportStatus updates the export state of a fine-tune job.
func (s *FineTuneStore) UpdateExportStatus(id, status, message, modelName string) error {
return s.db.Model(&FineTuneJobRecord{}).Where("id = ?", id).Updates(map[string]any{
"export_status": status,
"export_message": message,
"export_model_name": modelName,
"updated_at": time.Now(),
}).Error
}
// Delete removes a fine-tune job.
func (s *FineTuneStore) Delete(id string) error {
return s.db.Where("id = ?", id).Delete(&FineTuneJobRecord{}).Error
}