diff --git a/core/http/app.go b/core/http/app.go index 2cc9c78c6..fff8a3468 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -424,10 +424,23 @@ func API(application *application.Application) (*echo.Echo, error) { // Quantization routes quantizationMw := auth.RequireFeature(application.AuthDB(), auth.FeatureQuantization) + // In distributed mode pass the shared NATS client + PostgreSQL store so + // quantization jobs stay consistent across replicas (the SyncedMap broadcasts + // mutations and hydrates from the DB); standalone passes nil for both. + var quantNats messaging.MessagingClient + var quantStore *distributed.QuantStore + if d := application.Distributed(); d != nil { + quantNats = d.Nats + if d.DistStores != nil && d.DistStores.Quant != nil { + quantStore = d.DistStores.Quant + } + } qService := quantization.NewQuantizationService( application.ApplicationConfig(), application.ModelLoader(), application.ModelConfigLoader(), + quantNats, + quantStore, ) routes.RegisterQuantizationRoutes(e, qService, application.ApplicationConfig(), quantizationMw) diff --git a/core/services/distributed/init.go b/core/services/distributed/init.go index 0ccbe5969..ac28441e2 100644 --- a/core/services/distributed/init.go +++ b/core/services/distributed/init.go @@ -11,6 +11,7 @@ import ( type Stores struct { Gallery *GalleryStore FineTune *FineTuneStore + Quant *QuantStore Skills *SkillStore } @@ -26,15 +27,21 @@ func InitStores(db *gorm.DB) (*Stores, error) { return nil, fmt.Errorf("fine-tune store: %w", err) } + quant, err := NewQuantStore(db) + if err != nil { + return nil, fmt.Errorf("quantization store: %w", err) + } + skills, err := NewSkillStore(db) if err != nil { return nil, fmt.Errorf("skills store: %w", err) } - xlog.Info("Distributed stores initialized (Gallery, FineTune, Skills)") + xlog.Info("Distributed stores initialized (Gallery, FineTune, Quant, Skills)") return &Stores{ Gallery: gallery, FineTune: ft, + Quant: quant, Skills: skills, }, nil } diff --git a/core/services/distributed/quant.go b/core/services/distributed/quant.go new file mode 100644 index 000000000..cba032f4d --- /dev/null +++ b/core/services/distributed/quant.go @@ -0,0 +1,105 @@ +package distributed + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/mudler/LocalAI/core/services/advisorylock" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +// QuantJobRecord tracks quantization jobs in PostgreSQL. The columns mirror the +// API shape (schema.QuantizationJob); the structured Config and ExtraOptions are +// serialized into JSON text columns so a record fully reconstructs the job. +type QuantJobRecord struct { + ID string `gorm:"primaryKey;size:36" json:"id"` + UserID string `gorm:"index;size:36" json:"user_id,omitempty"` + Model string `gorm:"size:255" json:"model"` + Backend string `gorm:"size:64" json:"backend"` + ModelID string `gorm:"size:255" json:"model_id,omitempty"` + QuantizationType string `gorm:"size:32" json:"quantization_type"` + Status string `gorm:"index;size:32;default:queued" json:"status"` // queued, downloading, converting, quantizing, completed, failed, stopped + Message string `gorm:"type:text" json:"message,omitempty"` + OutputDir string `gorm:"size:512" json:"output_dir,omitempty"` + OutputFile string `gorm:"size:512" json:"output_file,omitempty"` + ConfigJSON string `gorm:"column:config;type:text" json:"-"` + ExtraOptsJSON string `gorm:"column:extra_options;type:text" json:"-"` + ImportStatus string `gorm:"size:32" json:"import_status,omitempty"` + ImportMessage string `gorm:"type:text" json:"import_message,omitempty"` + ImportModelName string `gorm:"size:255" json:"import_model_name,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +func (QuantJobRecord) TableName() string { return "quantization_jobs" } + +// QuantStore manages quantization job state in PostgreSQL. +type QuantStore struct { + db *gorm.DB +} + +// NewQuantStore creates a new QuantStore and auto-migrates. +// Uses a PostgreSQL advisory lock to prevent concurrent migration races +// when multiple instances (frontend + workers) start at the same time. +func NewQuantStore(db *gorm.DB) (*QuantStore, error) { + if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { + return db.AutoMigrate(&QuantJobRecord{}) + }); err != nil { + return nil, fmt.Errorf("migrating quantization_jobs: %w", err) + } + return &QuantStore{db: db}, nil +} + +// Create stores a new quantization job. +func (s *QuantStore) Create(job *QuantJobRecord) error { + if job.ID == "" { + job.ID = uuid.New().String() + } + job.CreatedAt = time.Now() + job.UpdatedAt = job.CreatedAt + return s.db.Create(job).Error +} + +// Get retrieves a quantization job by ID. +func (s *QuantStore) Get(id string) (*QuantJobRecord, error) { + var job QuantJobRecord + if err := s.db.First(&job, "id = ?", id).Error; err != nil { + return nil, err + } + return &job, nil +} + +// ListAll returns every quantization job across all users. The SyncedMap that +// backs QuantizationService is a single global map (the REST API filters by user +// at read time), so hydrate needs the full set. +func (s *QuantStore) ListAll() ([]QuantJobRecord, error) { + var jobs []QuantJobRecord + return jobs, s.db.Order("created_at DESC").Find(&jobs).Error +} + +// Upsert idempotently inserts or fully replaces a job row by primary key. The +// SyncedMap write-through path issues a single Set per mutation regardless of +// whether the job already exists, so it needs one create-or-update primitive +// (Create alone fails on a duplicate key). +func (s *QuantStore) Upsert(job *QuantJobRecord) error { + if job.ID == "" { + job.ID = uuid.New().String() + } + now := time.Now() + if job.CreatedAt.IsZero() { + job.CreatedAt = now + } + job.UpdatedAt = now + return s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(job).Error +} + +// Delete removes a quantization job. +func (s *QuantStore) Delete(id string) error { + return s.db.Where("id = ?", id).Delete(&QuantJobRecord{}).Error +} diff --git a/core/services/distributed/quant_test.go b/core/services/distributed/quant_test.go new file mode 100644 index 000000000..49ae483f9 --- /dev/null +++ b/core/services/distributed/quant_test.go @@ -0,0 +1,57 @@ +package distributed_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/testutil" +) + +var _ = Describe("QuantStore", func() { + var store *distributed.QuantStore + + BeforeEach(func() { + db := testutil.SetupTestDB() + var err error + store, err = distributed.NewQuantStore(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("ListAll", func() { + It("returns jobs across all users", func() { + Expect(store.Create(&distributed.QuantJobRecord{ID: "j1", UserID: "u1", Status: "queued"})).To(Succeed()) + Expect(store.Create(&distributed.QuantJobRecord{ID: "j2", UserID: "u2", Status: "queued"})).To(Succeed()) + + all, err := store.ListAll() + Expect(err).ToNot(HaveOccurred()) + Expect(all).To(HaveLen(2)) + }) + }) + + Describe("Upsert", func() { + It("inserts a new row", func() { + Expect(store.Upsert(&distributed.QuantJobRecord{ID: "up-1", UserID: "u1", Status: "queued"})).To(Succeed()) + + got, err := store.Get("up-1") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("queued")) + }) + + It("idempotently updates an existing row on a repeated key", func() { + Expect(store.Upsert(&distributed.QuantJobRecord{ID: "up-2", UserID: "u1", Status: "queued"})).To(Succeed()) + // Second Upsert with the same primary key must update, not error on a + // duplicate-key violation (this is the SyncedMap write-through contract). + Expect(store.Upsert(&distributed.QuantJobRecord{ID: "up-2", UserID: "u1", Status: "completed", Message: "done"})).To(Succeed()) + + got, err := store.Get("up-2") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("completed")) + Expect(got.Message).To(Equal("done")) + + all, err := store.ListAll() + Expect(err).ToNot(HaveOccurred()) + Expect(all).To(HaveLen(1), "upsert must not create a duplicate") + }) + }) +}) diff --git a/core/services/quantization/quantization_suite_test.go b/core/services/quantization/quantization_suite_test.go new file mode 100644 index 000000000..6fabcd2f5 --- /dev/null +++ b/core/services/quantization/quantization_suite_test.go @@ -0,0 +1,13 @@ +package quantization + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestQuantization(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Quantization Suite") +} diff --git a/core/services/quantization/service.go b/core/services/quantization/service.go index 64325b97c..cd9cbcead 100644 --- a/core/services/quantization/service.go +++ b/core/services/quantization/service.go @@ -17,6 +17,9 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery/importers" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" pb "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/utils" @@ -30,26 +33,63 @@ type QuantizationService struct { modelLoader *model.ModelLoader configLoader *config.ModelConfigLoader - mu sync.Mutex - jobs map[string]*schema.QuantizationJob + // mu serializes the read-modify-write of job values. The SyncedMap guards its + // own map structure, but a job is a pointer mutated in place (e.g. the import + // goroutine), so the service still needs a lock to keep those field updates and + // the subsequent Set atomic with respect to readers. + mu sync.Mutex + + // jobs is the cross-replica job store: an in-memory map kept consistent across + // replicas via NATS, optionally read-through to PostgreSQL in distributed mode. + jobs *syncstate.SyncedMap[string, *schema.QuantizationJob] } -// NewQuantizationService creates a new QuantizationService. +// NewQuantizationService creates a new QuantizationService. In distributed mode +// pass the shared NATS client and PostgreSQL store so jobs stay consistent across +// replicas; pass nil for both in standalone mode, where the disk Loader hydrates +// the map and there is nothing to broadcast. func NewQuantizationService( appConfig *config.ApplicationConfig, modelLoader *model.ModelLoader, configLoader *config.ModelConfigLoader, + nats messaging.MessagingClient, + store *distributed.QuantStore, ) *QuantizationService { s := &QuantizationService{ appConfig: appConfig, modelLoader: modelLoader, configLoader: configLoader, - jobs: make(map[string]*schema.QuantizationJob), } - s.loadAllJobs() + + // Only attach a Store interface when a concrete store exists, otherwise the + // SyncedMap would see a non-nil interface wrapping a nil pointer and try to + // hydrate/write through a nil DB. + var syncStore syncstate.Store[string, *schema.QuantizationJob] + if store != nil { + syncStore = &quantStoreAdapter{store: store} + } + + s.jobs = syncstate.New(syncstate.Config[string, *schema.QuantizationJob]{ + Name: "quant.jobs", + Key: func(j *schema.QuantizationJob) string { return j.ID }, + Nats: nats, + Store: syncStore, + Loader: s.loadJobsFromDisk, // ignored when Store is set (distributed mode) + }) + + // Hydrate + subscribe. A hydrate failure must not take the server down: log and + // continue degraded (standalone), mirroring the FineTune/OpCache wiring. + if err := s.jobs.Start(appConfig.Context); err != nil { + xlog.Warn("Quantization SyncedMap start failed; running degraded", "error", err) + } return s } +// Close releases the SyncedMap subscription and background workers. +func (s *QuantizationService) Close() error { + return s.jobs.Close() +} + // quantizationBaseDir returns the base directory for quantization job data. func (s *QuantizationService) quantizationBaseDir() string { return filepath.Join(s.appConfig.DataPath, "quantization") @@ -80,15 +120,18 @@ func (s *QuantizationService) saveJobState(job *schema.QuantizationJob) { } } -// loadAllJobs scans the quantization directory for persisted jobs and loads them. -func (s *QuantizationService) loadAllJobs() { +// loadJobsFromDisk scans the quantization directory for persisted jobs and +// returns them. It is the SyncedMap Loader used in standalone mode (no DB); the +// returned slice hydrates the map on Start. +func (s *QuantizationService) loadJobsFromDisk(_ context.Context) ([]*schema.QuantizationJob, error) { baseDir := s.quantizationBaseDir() entries, err := os.ReadDir(baseDir) if err != nil { - // Directory doesn't exist yet — that's fine - return + // Directory doesn't exist yet — that's fine, start empty. + return nil, nil } + var jobs []*schema.QuantizationJob for _, entry := range entries { if !entry.IsDir() { continue @@ -117,12 +160,13 @@ func (s *QuantizationService) loadAllJobs() { job.ImportMessage = "Server restarted while import was running" } - s.jobs[job.ID] = &job + jobs = append(jobs, &job) } - if len(s.jobs) > 0 { - xlog.Info("Loaded persisted quantization jobs", "count", len(s.jobs)) + if len(jobs) > 0 { + xlog.Info("Loaded persisted quantization jobs", "count", len(jobs)) } + return jobs, nil } // StartJob starts a new quantization job. @@ -188,7 +232,12 @@ func (s *QuantizationService) StartJob(ctx context.Context, userID string, req s CreatedAt: time.Now().UTC().Format(time.RFC3339), Config: &req, } - s.jobs[jobID] = job + // Set write-through persists to PostgreSQL (distributed) and broadcasts to + // peer replicas; the disk state.json is written separately for restart + // recovery / standalone hydrate. + if err := s.jobs.Set(ctx, job); err != nil { + return nil, fmt.Errorf("failed to persist job: %w", err) + } s.saveJobState(job) return &schema.QuantizationJobResponse{ @@ -203,7 +252,7 @@ func (s *QuantizationService) GetJob(userID, jobID string) (*schema.Quantization s.mu.Lock() defer s.mu.Unlock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { return nil, fmt.Errorf("job not found: %s", jobID) } @@ -219,7 +268,7 @@ func (s *QuantizationService) ListJobs(userID string) []*schema.QuantizationJob defer s.mu.Unlock() var result []*schema.QuantizationJob - for _, job := range s.jobs { + for _, job := range s.jobs.List() { if userID == "" || job.UserID == userID { result = append(result, job) } @@ -235,7 +284,7 @@ func (s *QuantizationService) ListJobs(userID string) []*schema.QuantizationJob // StopJob stops a running quantization job. func (s *QuantizationService) StopJob(ctx context.Context, userID, jobID string) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -256,6 +305,9 @@ func (s *QuantizationService) StopJob(ctx context.Context, userID, jobID string) s.mu.Lock() job.Status = "stopped" job.Message = "Quantization stopped by user" + if err := s.jobs.Set(ctx, job); err != nil { + xlog.Warn("Failed to persist stopped job", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() @@ -265,7 +317,7 @@ func (s *QuantizationService) StopJob(ctx context.Context, userID, jobID string) // DeleteJob removes a quantization job and its associated data from disk. func (s *QuantizationService) DeleteJob(userID, jobID string) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -289,7 +341,11 @@ func (s *QuantizationService) DeleteJob(userID, jobID string) error { } importModelName := job.ImportModelName - delete(s.jobs, jobID) + // Delete write-through removes the DB row (distributed) and broadcasts the + // removal to peer replicas. DeleteJob has no ctx, so use Background. + if err := s.jobs.Delete(context.Background(), jobID); err != nil { + xlog.Warn("Failed to delete job from store", "job_id", jobID, "error", err) + } s.mu.Unlock() // Remove job directory (state.json, output files) @@ -324,7 +380,7 @@ func (s *QuantizationService) DeleteJob(userID, jobID string) error { // StreamProgress opens a gRPC progress stream and calls the callback for each update. func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID string, callback func(event *schema.QuantizationProgressEvent)) error { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return fmt.Errorf("job not found: %s", jobID) @@ -353,7 +409,7 @@ func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID }, func(update *pb.QuantizationProgressUpdate) { // Update job status and persist s.mu.Lock() - if j, ok := s.jobs[jobID]; ok { + if j, ok := s.jobs.Get(jobID); ok { // Don't let progress updates overwrite terminal states isTerminal := j.Status == "stopped" || j.Status == "completed" || j.Status == "failed" if !isTerminal { @@ -365,6 +421,9 @@ func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID if update.OutputFile != "" { j.OutputFile = update.OutputFile } + if err := s.jobs.Set(ctx, j); err != nil { + xlog.Warn("Failed to persist progress update", "job_id", jobID, "error", err) + } s.saveJobState(j) } s.mu.Unlock() @@ -399,7 +458,7 @@ func sanitizeQuantModelName(s string) string { // ImportModel imports a quantized model into LocalAI asynchronously. func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID string, req schema.QuantizationImportRequest) (string, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return "", fmt.Errorf("job not found: %s", jobID) @@ -459,6 +518,9 @@ func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID str job.ImportStatus = "importing" job.ImportMessage = "" job.ImportModelName = "" + if err := s.jobs.Set(ctx, job); err != nil { + xlog.Warn("Failed to persist import start", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() @@ -514,10 +576,15 @@ func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID str xlog.Info("Quantized model imported and registered", "job_id", jobID, "model_name", modelName) + // Runs after the HTTP request returns, so use Background rather than the + // (now likely cancelled) request ctx for the write-through. s.mu.Lock() job.ImportStatus = "completed" job.ImportModelName = modelName job.ImportMessage = "" + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist import completion", "job_id", jobID, "error", err) + } s.saveJobState(job) s.mu.Unlock() }() @@ -525,10 +592,14 @@ func (s *QuantizationService) ImportModel(ctx context.Context, userID, jobID str return modelName, nil } -// setImportMessage updates the import message and persists the job state. +// setImportMessage updates the import message and persists the job state. Called +// from the background import goroutine, so it uses Background for write-through. func (s *QuantizationService) setImportMessage(job *schema.QuantizationJob, msg string) { s.mu.Lock() job.ImportMessage = msg + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist import message", "job_id", job.ID, "error", err) + } s.saveJobState(job) s.mu.Unlock() } @@ -539,6 +610,9 @@ func (s *QuantizationService) setImportFailed(job *schema.QuantizationJob, messa s.mu.Lock() job.ImportStatus = "failed" job.ImportMessage = message + if err := s.jobs.Set(context.Background(), job); err != nil { + xlog.Warn("Failed to persist import failure", "job_id", job.ID, "error", err) + } s.saveJobState(job) s.mu.Unlock() } @@ -546,7 +620,7 @@ func (s *QuantizationService) setImportFailed(job *schema.QuantizationJob, messa // GetOutputPath returns the path to the quantized model file and a download name. func (s *QuantizationService) GetOutputPath(userID, jobID string) (string, string, error) { s.mu.Lock() - job, ok := s.jobs[jobID] + job, ok := s.jobs.Get(jobID) if !ok { s.mu.Unlock() return "", "", fmt.Errorf("job not found: %s", jobID) diff --git a/core/services/quantization/service_test.go b/core/services/quantization/service_test.go new file mode 100644 index 000000000..665728614 --- /dev/null +++ b/core/services/quantization/service_test.go @@ -0,0 +1,187 @@ +package quantization + +// White-box tests (package quantization) so a spec can drive the service's +// internal SyncedMap the same way StartJob does (via jobs.Set) without standing +// up a quantization backend, then assert the cross-replica reads +// (GetJob/ListJobs) and the adapter conversions that keep REST responses +// byte-for-byte unchanged. + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// newTestService builds a standalone QuantizationService wired to the given bus. +// The model/config loaders are nil because the read/sync paths under test never +// touch them; the data dir is a throwaway temp dir so the disk Loader finds +// nothing. +func newTestService(bus *testutil.FakeBus) *QuantizationService { + appConfig := &config.ApplicationConfig{ + Context: context.Background(), + DataPath: GinkgoT().TempDir(), + } + return NewQuantizationService(appConfig, nil, nil, bus, nil) +} + +var _ = Describe("QuantizationService", func() { + ctx := context.Background() + + Describe("cross-replica job visibility", func() { + var ( + bus *testutil.FakeBus + a, b *QuantizationService + ) + + BeforeEach(func() { + // One shared bus, two replicas: exactly the distributed topology where a + // round-robin request may land on a replica that did not originate the + // change. + bus = testutil.NewFakeBus() + a = newTestService(bus) + b = newTestService(bus) + }) + + AfterEach(func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }) + + It("makes a job created on A visible via B's GetJob and ListJobs", func() { + job := &schema.QuantizationJob{ID: "job-1", UserID: "user-1", Status: "queued", CreatedAt: "2026-06-27T10:00:00Z"} + // StartJob persists via jobs.Set; drive that directly to avoid a backend. + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + + got, err := b.GetJob("user-1", "job-1") + Expect(err).ToNot(HaveOccurred(), "B must see a job A just created") + Expect(got.Status).To(Equal("queued")) + + listed := b.ListJobs("user-1") + Expect(listed).To(HaveLen(1)) + Expect(listed[0].ID).To(Equal("job-1")) + }) + + It("removes a job from B when it is deleted on A", func() { + job := &schema.QuantizationJob{ID: "job-2", UserID: "user-1", Status: "completed", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + _, err := b.GetJob("user-1", "job-2") + Expect(err).ToNot(HaveOccurred(), "precondition: B must have the job before the delete") + + Expect(a.jobs.Delete(ctx, "job-2")).To(Succeed()) + + _, err = b.GetJob("user-1", "job-2") + Expect(err).To(HaveOccurred(), "a delete on A must remove the job from B") + }) + + It("propagates a status update from A to B", func() { + job := &schema.QuantizationJob{ID: "job-3", UserID: "user-1", Status: "quantizing", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, job)).To(Succeed()) + + updated := &schema.QuantizationJob{ID: "job-3", UserID: "user-1", Status: "completed", CreatedAt: "2026-06-27T10:00:00Z"} + Expect(a.jobs.Set(ctx, updated)).To(Succeed()) + + got, err := b.GetJob("user-1", "job-3") + Expect(err).ToNot(HaveOccurred()) + Expect(got.Status).To(Equal("completed")) + }) + }) + + Describe("ListJobs", func() { + var svc *QuantizationService + + BeforeEach(func() { + svc = newTestService(testutil.NewFakeBus()) + }) + AfterEach(func() { Expect(svc.Close()).To(Succeed()) }) + + It("filters by user and sorts newest-first", func() { + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "old", UserID: "u1", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "new", UserID: "u1", CreatedAt: "2026-06-27T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "other", UserID: "u2", CreatedAt: "2026-06-26T10:00:00Z"})).To(Succeed()) + + jobs := svc.ListJobs("u1") + Expect(jobs).To(HaveLen(2), "only u1's jobs") + Expect(jobs[0].ID).To(Equal("new"), "newest first") + Expect(jobs[1].ID).To(Equal("old")) + }) + + It("returns every user's jobs when the userID filter is empty", func() { + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "a", UserID: "u1", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "b", UserID: "u2", CreatedAt: "2026-06-26T10:00:00Z"})).To(Succeed()) + + Expect(svc.ListJobs("")).To(HaveLen(2)) + }) + + It("rejects GetJob for a job owned by another user", func() { + Expect(svc.jobs.Set(ctx, &schema.QuantizationJob{ID: "x", UserID: "owner", CreatedAt: "2026-06-25T10:00:00Z"})).To(Succeed()) + + _, err := svc.GetJob("intruder", "x") + Expect(err).To(HaveOccurred(), "a different user must not read someone else's job") + }) + }) + + Describe("store adapter conversion", func() { + // The SyncedMap value type is *schema.QuantizationJob (the exact REST shape). + // These specs prove the DB adapter round-trips it losslessly, so hydrate and + // write-through in distributed mode keep responses unchanged. + It("round-trips a job through jobToRecord/recordToJob preserving the API shape", func() { + original := &schema.QuantizationJob{ + ID: "rt-1", + UserID: "user-1", + Model: "base-model", + Backend: "llama-cpp-quantization", + ModelID: "llama-cpp-quantization-quantize-rt-1", + QuantizationType: "q4_k_m", + Status: "completed", + Message: "done", + OutputDir: "/data/quantization/rt-1", + OutputFile: "/data/quantization/rt-1/model.gguf", + ExtraOptions: map[string]string{"hf_token": "secret"}, + CreatedAt: "2026-06-27T10:00:00Z", + ImportStatus: "completed", + ImportMessage: "", + ImportModelName: "base-model-q4_k_m-rt-1", + Config: &schema.QuantizationJobRequest{Model: "base-model", Backend: "llama-cpp-quantization", QuantizationType: "q4_k_m"}, + } + + rec := jobToRecord(original) + Expect(rec.ID).To(Equal("rt-1")) + Expect(rec.ConfigJSON).ToNot(BeEmpty(), "structured config must serialize into the JSON column") + Expect(rec.ExtraOptsJSON).ToNot(BeEmpty()) + + back := recordToJob(rec) + Expect(back.ID).To(Equal(original.ID)) + Expect(back.UserID).To(Equal(original.UserID)) + Expect(back.Model).To(Equal(original.Model)) + Expect(back.Backend).To(Equal(original.Backend)) + Expect(back.ModelID).To(Equal(original.ModelID)) + Expect(back.QuantizationType).To(Equal(original.QuantizationType)) + Expect(back.Status).To(Equal(original.Status)) + Expect(back.Message).To(Equal(original.Message)) + Expect(back.OutputDir).To(Equal(original.OutputDir)) + Expect(back.OutputFile).To(Equal(original.OutputFile)) + Expect(back.ImportStatus).To(Equal(original.ImportStatus)) + Expect(back.ImportModelName).To(Equal(original.ImportModelName)) + Expect(back.CreatedAt).To(Equal(original.CreatedAt)) + Expect(back.ExtraOptions).To(Equal(original.ExtraOptions)) + Expect(back.Config).ToNot(BeNil()) + Expect(back.Config.QuantizationType).To(Equal("q4_k_m")) + }) + }) + + Describe("compile-time adapter contract", func() { + It("satisfies syncstate.Store for *distributed.QuantStore", func() { + // Guards against drift between the adapter and the component interface; + // the var assertion in syncstore.go covers it at build time, this keeps + // the type referenced from a spec too. + var _ *distributed.QuantStore + Expect(&quantStoreAdapter{}).ToNot(BeNil()) + }) + }) +}) diff --git a/core/services/quantization/syncstore.go b/core/services/quantization/syncstore.go new file mode 100644 index 000000000..4201c5ca6 --- /dev/null +++ b/core/services/quantization/syncstore.go @@ -0,0 +1,114 @@ +package quantization + +import ( + "context" + "encoding/json" + "time" + + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/syncstate" +) + +// quantStoreAdapter bridges the distributed PostgreSQL QuantStore to the generic +// syncstate.Store the SyncedMap consumes. It is only wired in distributed mode; +// standalone leaves Store nil and hydrates from disk via a Loader instead. +// +// The SyncedMap value type is *schema.QuantizationJob (the exact shape the REST +// API returns) so reads need no conversion and the response JSON is provably +// unchanged. The adapter is the single place that translates between that API +// shape and the DB QuantJobRecord. +type quantStoreAdapter struct { + store *distributed.QuantStore +} + +// compile-time assertion that the adapter satisfies the component's Store. +var _ syncstate.Store[string, *schema.QuantizationJob] = (*quantStoreAdapter)(nil) + +func (a *quantStoreAdapter) List(_ context.Context) ([]*schema.QuantizationJob, error) { + records, err := a.store.ListAll() + if err != nil { + return nil, err + } + jobs := make([]*schema.QuantizationJob, 0, len(records)) + for i := range records { + jobs = append(jobs, recordToJob(&records[i])) + } + return jobs, nil +} + +func (a *quantStoreAdapter) Upsert(_ context.Context, job *schema.QuantizationJob) error { + return a.store.Upsert(jobToRecord(job)) +} + +func (a *quantStoreAdapter) Delete(_ context.Context, id string) error { + return a.store.Delete(id) +} + +// recordToJob maps a persisted DB record back to the API shape, reconstructing +// the structured Config / ExtraOptions from their JSON columns. +func recordToJob(r *distributed.QuantJobRecord) *schema.QuantizationJob { + job := &schema.QuantizationJob{ + ID: r.ID, + UserID: r.UserID, + Model: r.Model, + Backend: r.Backend, + ModelID: r.ModelID, + QuantizationType: r.QuantizationType, + Status: r.Status, + Message: r.Message, + OutputDir: r.OutputDir, + OutputFile: r.OutputFile, + ImportStatus: r.ImportStatus, + ImportMessage: r.ImportMessage, + ImportModelName: r.ImportModelName, + CreatedAt: r.CreatedAt.UTC().Format(time.RFC3339), + } + if r.ExtraOptsJSON != "" { + // Best-effort: a malformed column must not drop the whole job from the API. + _ = json.Unmarshal([]byte(r.ExtraOptsJSON), &job.ExtraOptions) + } + if r.ConfigJSON != "" { + var cfg schema.QuantizationJobRequest + if err := json.Unmarshal([]byte(r.ConfigJSON), &cfg); err == nil { + job.Config = &cfg + } + } + return job +} + +// jobToRecord maps the API shape to a DB record for write-through, serializing +// the structured Config / ExtraOptions into their JSON columns. CreatedAt is +// parsed back from the RFC3339 string the service stamps; an unparseable value is +// left zero so QuantStore.Upsert stamps "now". +func jobToRecord(job *schema.QuantizationJob) *distributed.QuantJobRecord { + rec := &distributed.QuantJobRecord{ + ID: job.ID, + UserID: job.UserID, + Model: job.Model, + Backend: job.Backend, + ModelID: job.ModelID, + QuantizationType: job.QuantizationType, + Status: job.Status, + Message: job.Message, + OutputDir: job.OutputDir, + OutputFile: job.OutputFile, + ImportStatus: job.ImportStatus, + ImportMessage: job.ImportMessage, + ImportModelName: job.ImportModelName, + } + if job.Config != nil { + if data, err := json.Marshal(job.Config); err == nil { + rec.ConfigJSON = string(data) + } + } + if job.ExtraOptions != nil { + if data, err := json.Marshal(job.ExtraOptions); err == nil { + rec.ExtraOptsJSON = string(data) + } + } + if t, err := time.Parse(time.RFC3339, job.CreatedAt); err == nil { + rec.CreatedAt = t + } + return rec +}