Files
LocalAI/core/services/distributed/gallery.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

137 lines
5.1 KiB
Go

package distributed
import (
"fmt"
"time"
"github.com/google/uuid"
"gorm.io/gorm"
)
// GalleryOperationRecord tracks model/backend download operations in PostgreSQL.
type GalleryOperationRecord struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
UserID string `gorm:"index;size:36" json:"user_id,omitempty"`
GalleryElementName string `gorm:"size:255" json:"gallery_element_name"`
OpType string `gorm:"size:32" json:"op_type"` // "model_install", "model_delete", "backend_install"
Status string `gorm:"size:32;default:pending" json:"status"` // pending, downloading, processing, completed, failed, cancelled
Progress float64 `json:"progress"` // 0.0 to 1.0
Message string `gorm:"type:text" json:"message,omitempty"`
Error string `gorm:"type:text" json:"error,omitempty"`
FileName string `gorm:"size:512" json:"file_name,omitempty"`
TotalFileSize string `gorm:"size:32" json:"total_file_size,omitempty"`
DownloadedFileSize string `gorm:"size:32" json:"downloaded_file_size,omitempty"`
FrontendID string `gorm:"size:36" json:"frontend_id,omitempty"` // which instance is processing
Cancellable bool `json:"cancellable"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (GalleryOperationRecord) TableName() string { return "gallery_operations" }
// GalleryStore manages gallery operation state in PostgreSQL.
type GalleryStore struct {
db *gorm.DB
}
// NewGalleryStore creates a new GalleryStore and auto-migrates.
func NewGalleryStore(db *gorm.DB) (*GalleryStore, error) {
if err := db.AutoMigrate(&GalleryOperationRecord{}); err != nil {
return nil, fmt.Errorf("migrating gallery_operations: %w", err)
}
return &GalleryStore{db: db}, nil
}
// Create stores a new gallery operation.
func (s *GalleryStore) Create(op *GalleryOperationRecord) error {
if op.ID == "" {
op.ID = uuid.New().String()
}
op.CreatedAt = time.Now()
op.UpdatedAt = op.CreatedAt
return s.db.Create(op).Error
}
// UpdateProgress updates progress for an operation.
func (s *GalleryStore) UpdateProgress(id string, progress float64, message, downloadedSize string) error {
return s.db.Model(&GalleryOperationRecord{}).Where("id = ?", id).Updates(map[string]any{
"progress": progress,
"message": message,
"downloaded_file_size": downloadedSize,
"updated_at": time.Now(),
}).Error
}
// UpdateStatus updates the status of an operation.
func (s *GalleryStore) UpdateStatus(id, status, errMsg string) error {
updates := map[string]any{
"status": status,
"updated_at": time.Now(),
}
if errMsg != "" {
updates["error"] = errMsg
}
return s.db.Model(&GalleryOperationRecord{}).Where("id = ?", id).Updates(updates).Error
}
// Get retrieves an operation by ID.
func (s *GalleryStore) Get(id string) (*GalleryOperationRecord, error) {
var op GalleryOperationRecord
if err := s.db.First(&op, "id = ?", id).Error; err != nil {
return nil, err
}
return &op, nil
}
// List returns all operations, optionally filtered by status.
func (s *GalleryStore) List(status string) ([]GalleryOperationRecord, error) {
var ops []GalleryOperationRecord
q := s.db.Order("created_at DESC")
if status != "" {
q = q.Where("status = ?", status)
}
return ops, q.Find(&ops).Error
}
// FindDuplicate checks if another instance is already downloading the same element.
// Only considers records updated within the last 30 minutes as active — older
// in-progress records are assumed to be stale (crashed instance).
func (s *GalleryStore) FindDuplicate(elementName string) (*GalleryOperationRecord, error) {
var op GalleryOperationRecord
staleCutoff := time.Now().Add(-30 * time.Minute)
err := s.db.Where("gallery_element_name = ? AND status IN ? AND updated_at > ?", elementName,
[]string{"pending", "downloading", "processing"}, staleCutoff).First(&op).Error
if err != nil {
return nil, err
}
return &op, nil
}
// Cancel marks an operation as cancelled.
func (s *GalleryStore) Cancel(id string) error {
return s.UpdateStatus(id, "cancelled", "")
}
// CleanStale marks abandoned in-progress operations as failed.
// Should be called on startup to recover from crashed instances that
// left records in pending/downloading/processing state.
func (s *GalleryStore) CleanStale(age time.Duration) error {
cutoff := time.Now().Add(-age)
return s.db.Model(&GalleryOperationRecord{}).
Where("updated_at < ? AND status IN ?", cutoff,
[]string{"pending", "downloading", "processing"}).
Updates(map[string]any{
"status": "failed",
"error": "stale operation cleaned up on startup",
"updated_at": time.Now(),
}).Error
}
// CleanOld removes operations older than the given duration.
func (s *GalleryStore) CleanOld(retention time.Duration) error {
cutoff := time.Now().Add(-retention)
return s.db.Where("created_at < ? AND status IN ?", cutoff,
[]string{"completed", "failed", "cancelled"}).
Delete(&GalleryOperationRecord{}).Error
}