mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-11 10:19:33 -04:00
* fix(distributed): detect backend upgrades across worker nodes
Before this change `DistributedBackendManager.CheckUpgrades` delegated to the
local manager, which read backends from the frontend filesystem. In
distributed deployments the frontend has no backends installed locally —
they live on workers — so the upgrade-detection loop never ran and the UI
silently never surfaced upgrades even when the gallery advertised newer
versions or digests.
Worker-side: NATS backend.list reply now carries Version, URI and Digest
for each installed backend (read from metadata.json).
Frontend-side: DistributedBackendManager.ListBackends aggregates per-node
refs (name, status, version, digest) instead of deduping, and CheckUpgrades
feeds that aggregation into gallery.CheckUpgradesAgainst — a new entrypoint
factored out of CheckBackendUpgrades so both paths share the same core
logic.
Cluster drift policy: when per-node version/digest tuples disagree, the
backend is flagged upgradeable regardless of whether any single node
matches the gallery, and UpgradeInfo.NodeDrift enumerates the outliers so
operators can see *why* it is out of sync. The next upgrade-all realigns
the cluster.
Tests cover: drift detection, unanimous-match (no upgrade), and the
empty-installed-version path that the old distributed code silently
missed.
* feat(ui): surface backend upgrades in the System page
The System page (Manage.jsx) only showed updates as a tiny inline arrow,
so operators routinely missed them. Port the Backend Gallery's upgrade UX
so System speaks the same visual language:
- Yellow banner at the top of the Backends tab when upgrades are pending,
with an "Upgrade all" button (serial fan-out, matches the gallery) and a
"Updates only" filter toggle.
- Warning pill (↑ N) next to the tab label so the count is glanceable even
when the banner is scrolled out of view.
- Per-row labeled "Upgrade to vX.Y" button (replaces the icon-only button
that silently flipped semantics between Reinstall and Upgrade), plus an
"Update available" badge in the new Version column.
- New columns: Version (with upgrade + drift chips), Nodes (per-node
attribution badges for distributed mode, degrading to a compact
"on N nodes · M offline" chip above three nodes), Installed (relative
time).
- System backends render a "Protected" chip instead of a bare "—" so rows
still align and the reason is obvious.
- Delete uses the softer btn-danger-ghost so rows don't scream red; the
ConfirmDialog still owns the "are you sure".
The upgrade checker also needed the same per-worker fix as the previous
commit: NewUpgradeChecker now takes a BackendManager getter so its
periodic runs call the distributed CheckUpgrades (which asks workers)
instead of the empty frontend filesystem. Without this the /api/backends/
upgrades endpoint stayed empty in distributed mode even with the protocol
change in place.
New CSS primitives — .upgrade-banner, .tab-pill, .badge-row, .cell-stack,
.cell-mono, .cell-muted, .row-actions, .btn-danger-ghost — all live in
App.css so other pages can adopt them without duplicating styles.
* feat(ui): polish the Nodes page so it reads like a product
The Nodes page was the biggest visual liability in distributed mode.
Rework the main dashboard surfaces in place without changing behavior:
StatCards: uniform height (96px min), left accent bar colored by the
metric's semantic (success/warning/error/primary), icon lives in a
36x36 soft-tinted chip top-right, value is left-aligned and large.
Grid auto-fills so the row doesn't collapse on narrow viewports. This
replaces the previous thin-bordered boxes with inconsistent heights.
Table rows: expandable rows now show a chevron cue on the left (rotates
on expand) so users know rows open. Status cell became a dedicated chip
with an LED-style halo dot instead of a bare bullet. Action buttons gained
labels — "Approve", "Resume", "Drain" — so the icons aren't doing all
the semantic work; the destructive remove action uses the softer
btn-danger-ghost variant so rows don't scream red, with the ConfirmDialog
still owning the real "are you sure". Applied cell-mono/cell-muted
utility classes so label chips and addresses share one spacing/font
grammar instead of re-declaring inline styles everywhere.
Expanded drawer: empty states for Loaded Models and Installed Backends
now render as a proper drawer-empty card (dashed border, icon, one-line
hint) instead of a plain muted string that read like broken formatting.
Tabs: three inline-styled buttons became the shared .tab class so they
inherit focus ring, hover state, and the rest of the design system —
matches the System page.
"Add more workers" toggle turned into a .nodes-add-worker dashed-border
button labelled "Register a new worker" (action voice) instead of a
chevron + muted link that operators kept mistaking for broken text.
New shared CSS primitives carry over to other pages:
.stat-grid + .stat-card, .row-chevron, .node-status, .drawer-empty,
.nodes-add-worker.
* feat(distributed): durable backend fan-out + state reconciliation
Two connected problems handled together:
1) Backend delete/install/upgrade used to silently skip non-healthy nodes,
so a delete during an outage left a zombie on the offline node once it
returned. The fan-out now records intent in a new pending_backend_ops
table before attempting the NATS round-trip. Currently-healthy nodes
get an immediate attempt; everyone else is queued. Unique index on
(node_id, backend, op) means reissuing the same operation refreshes
next_retry_at instead of stacking duplicates.
2) Loaded-model state could drift from reality: a worker OOM'd, got
killed, or restarted a backend process would leave a node_models row
claiming the model was still loaded, feeding ghost entries into the
/api/nodes/models listing and the router's scheduling decisions.
The existing ReplicaReconciler gains two new passes that run under a
fresh KeyStateReconciler advisory lock (non-blocking, so one wedged
frontend doesn't freeze the cluster):
- drainPendingBackendOps: retries queued ops whose next_retry_at has
passed on currently-healthy nodes. Success deletes the row; failure
bumps attempts and pushes next_retry_at out with exponential backoff
(30s → 15m cap). ErrNoResponders also marks the node unhealthy.
- probeLoadedModels: gRPC-HealthChecks addresses the DB thinks are
loaded but hasn't seen touched in the last probeStaleAfter (2m).
Unreachable addresses are removed from the registry. A pluggable
ModelProber lets tests substitute a fake without standing up gRPC.
DistributedBackendManager exposes DeleteBackendDetailed so the HTTP
handler can surface per-node outcomes ("2 succeeded, 1 queued") to the
UI in a follow-up commit; the existing DeleteBackend still returns
error-only for callers that don't care about node breakdown.
Multi-frontend safety: the state pass uses advisorylock.TryWithLockCtx
on a new key so N frontends coordinate — the same pattern the health
monitor and replica reconciler already rely on. Single-node mode runs
both passes inline (adapter is nil, state drain is a no-op).
Tests cover the upsert semantics, backoff math, the probe removing an
unreachable model but keeping a reachable one, and filtering by
probeStaleAfter.
* feat(ui): show cluster distribution of models in the System page
When a frontend restarted in distributed mode, models that workers had
already loaded weren't visible until the operator clicked into each node
manually — the /api/models/capabilities endpoint only knew about
configs on the frontend's filesystem, not the registry-backed truth.
/api/models/capabilities now joins in ListAllLoadedModels() when the
registry is active, returning loaded_on[] with node id/name/state/status
for each model. Models that live in the registry but lack a local config
(the actual ghosts, not recovered from the frontend's file cache) still
surface with source="registry-only" so operators can see and persist
them; without that emission they'd be invisible to this frontend.
Manage → Models replaces the old Running/Idle pill with a distribution
cell that lists the first three nodes the model is loaded on as chips
colored by state (green loaded, blue loading, amber anything else). On
wider clusters the remaining count collapses into a +N chip with a
title-attribute breakdown. Disabled / single-node behavior unchanged.
Adopted models get an extra "Adopted" ghost-icon chip with hover copy
explaining what it means and how to make it permanent.
Distributed mode also enables a 10s auto-refresh and a "Last synced Xs
ago" indicator next to the Update button so ghost rows drop off within
one reconcile tick after their owning process dies. Non-distributed
mode is untouched — no polling, no cell-stack, same old Running/Idle.
* feat(ui): NodeDistributionChip — shared per-node attribution component
Large clusters were going to break the Manage → Backends Nodes column:
the old inline logic rendered every node as a badge and would shred the
layout at >10 workers, plus the Manage → Models distribution cell had
copy-pasted its own slightly-different version.
NodeDistributionChip handles any cluster size with two render modes:
- small (≤3 nodes): inline chips of node names, colored by health.
- large: a single "on N nodes · M offline · K drift" summary chip;
clicking opens a Popover with a per-node table (name, status,
version, digest for backends; name, status, state for models).
Drift counting mirrors the backend's summarizeNodeDrift so the UI
number matches UpgradeInfo.NodeDrift. Digests are truncated to the
docker-style 12-char form with the full value preserved in the title.
Popover is a new general-purpose primitive: fixed positioning anchored
to the trigger, flips above when there's no room below, closes on
outside-click or Escape, returns focus to the trigger. Uses .card as
its surface so theming is inherited. Also useful for a future
labels-editor popup and the user menu.
Manage.jsx drops its duplicated inline Nodes-column + loaded_on cell
and uses the shared chip with context="backends" / "models"
respectively. Delete code removes ~40 lines of ad-hoc logic.
* feat(ui): shared FilterBar across the System page tabs
The Backends gallery had a nice search + chip + toggle strip; the System
page had nothing, so the two surfaces felt like different apps. Lift the
pattern into a reusable FilterBar and wire both System tabs through it.
New component core/http/react-ui/src/components/FilterBar.jsx renders a
search input, a role="tablist" chip row (aria-selected for a11y), and
optional toggles / right slot. Chips support an optional `count` which
the System page uses to show "User 3", "Updates 1" etc.
System Models tab: search by id or backend; chips for
All/Running/Idle/Disabled/Pinned plus a conditional Distributed chip in
distributed mode. "Last synced" + Update button live in the right slot.
System Backends tab: search by name/alias/meta-backend-for; chips for
All/User/System/Meta plus conditional Updates / Offline-nodes chips
when relevant. The old ad-hoc "Updates only" toggle from the upgrade
banner folded into the Updates chip — one source of truth for that
filter. Offline chip only appears in distributed mode when at least
one backend has an unhealthy node, so the chip row stays quiet on
healthy clusters.
Filter state persists in URL query params (mq/mf/bq/bf) so deep links
and tab switches keep the operator's filter context instead of
resetting every time.
Also adds an "Adopted" distribution path: when a model in
/api/models/capabilities carries source="registry-only" (discovered on
a worker but not configured locally), the Models tab shows a ghost chip
labelled "Adopted" with hover copy explaining how to persist it — this
is what closes the loop on the ghost-model story end-to-end.
1090 lines
42 KiB
Go
1090 lines
42 KiB
Go
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/mudler/LocalAI/core/services/advisorylock"
|
|
"github.com/mudler/xlog"
|
|
"gorm.io/gorm"
|
|
"gorm.io/gorm/clause"
|
|
)
|
|
|
|
// BackendNode represents a remote worker node.
|
|
// Workers are generic — they don't have a fixed backend type.
|
|
// The SmartRouter dynamically installs backends via NATS backend.install events.
|
|
type BackendNode struct {
|
|
ID string `gorm:"primaryKey;size:36" json:"id"`
|
|
Name string `gorm:"uniqueIndex;size:255" json:"name"`
|
|
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
|
|
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
|
|
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
|
|
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
|
|
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
|
|
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
|
|
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
|
|
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
|
|
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
|
|
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
|
|
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
|
|
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
|
|
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
}
|
|
|
|
const (
|
|
NodeTypeBackend = "backend"
|
|
NodeTypeAgent = "agent"
|
|
|
|
StatusHealthy = "healthy"
|
|
StatusPending = "pending"
|
|
StatusOffline = "offline"
|
|
StatusDraining = "draining"
|
|
StatusUnhealthy = "unhealthy"
|
|
|
|
// Column names (must match gorm:"column:" tags on BackendNode)
|
|
ColAvailableVRAM = "available_vram"
|
|
ColTotalVRAM = "total_vram"
|
|
ColAvailableRAM = "available_ram"
|
|
ColGPUVendor = "gpu_vendor"
|
|
ColLastHeartbeat = "last_heartbeat"
|
|
)
|
|
|
|
// NodeModel tracks which models are loaded on which nodes.
|
|
type NodeModel struct {
|
|
ID string `gorm:"primaryKey;size:36" json:"id"`
|
|
NodeID string `gorm:"index;size:36" json:"node_id"`
|
|
ModelName string `gorm:"index;size:255" json:"model_name"`
|
|
Address string `gorm:"size:255" json:"address"` // gRPC address for this model's backend process
|
|
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
|
|
InFlight int `json:"in_flight"` // number of active requests
|
|
LastUsed time.Time `json:"last_used"`
|
|
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
|
|
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
|
|
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
|
|
CreatedAt time.Time `json:"created_at"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
}
|
|
|
|
// NodeLabel is a key-value label on a node (like K8s labels).
|
|
type NodeLabel struct {
|
|
ID string `gorm:"primaryKey;size:36" json:"id"`
|
|
NodeID string `gorm:"uniqueIndex:idx_node_label;size:36" json:"node_id"`
|
|
Key string `gorm:"uniqueIndex:idx_node_label;size:128" json:"key"`
|
|
Value string `gorm:"size:255" json:"value"`
|
|
}
|
|
|
|
// ModelSchedulingConfig defines how a model should be scheduled across the cluster.
|
|
// All fields are optional:
|
|
// - NodeSelector only → constrain nodes, single replica
|
|
// - MinReplicas/MaxReplicas only → auto-scale on any node
|
|
// - Both → auto-scale on matching nodes
|
|
// - Neither → no-op (default behavior)
|
|
//
|
|
// Auto-scaling is enabled when MinReplicas > 0 or MaxReplicas > 0.
|
|
type ModelSchedulingConfig struct {
|
|
ID string `gorm:"primaryKey;size:36" json:"id"`
|
|
ModelName string `gorm:"uniqueIndex;size:255" json:"model_name"`
|
|
NodeSelector string `gorm:"type:text" json:"node_selector,omitempty"` // JSON {"key":"value",...}
|
|
MinReplicas int `gorm:"default:0" json:"min_replicas"`
|
|
MaxReplicas int `gorm:"default:0" json:"max_replicas"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
}
|
|
|
|
// NodeWithExtras extends BackendNode with computed fields for list views.
|
|
type NodeWithExtras struct {
|
|
BackendNode
|
|
ModelCount int `json:"model_count"`
|
|
InFlightCount int `json:"in_flight_count"`
|
|
Labels map[string]string `json:"labels,omitempty"`
|
|
}
|
|
|
|
// PendingBackendOp is a durable intent for a backend lifecycle operation
|
|
// (delete/install/upgrade) that needs to eventually apply on a specific node.
|
|
//
|
|
// Without this table, a backend delete against an offline node silently
|
|
// dropped: the frontend skipped the node, the node came back later with the
|
|
// backend still installed, and the operator saw a zombie. Now the intent is
|
|
// recorded regardless of node status; the state reconciler drains the queue
|
|
// whenever a node is healthy and removes the row on success. Reissuing the
|
|
// same operation while a row exists updates NextRetryAt instead of stacking
|
|
// duplicates (see the unique index).
|
|
type PendingBackendOp struct {
|
|
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
|
|
NodeID string `gorm:"index;size:36;not null;uniqueIndex:idx_pending_backend_op,priority:1" json:"node_id"`
|
|
Backend string `gorm:"index;size:255;not null;uniqueIndex:idx_pending_backend_op,priority:2" json:"backend"`
|
|
Op string `gorm:"size:16;not null;uniqueIndex:idx_pending_backend_op,priority:3" json:"op"`
|
|
Galleries []byte `gorm:"type:bytea" json:"-"` // serialized JSON for install/upgrade retries
|
|
Attempts int `gorm:"default:0" json:"attempts"`
|
|
LastError string `gorm:"type:text" json:"last_error,omitempty"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
NextRetryAt time.Time `gorm:"index" json:"next_retry_at"`
|
|
}
|
|
|
|
// Op constants mirror the operation names used by DistributedBackendManager
|
|
// so callers don't repeat stringly-typed values.
|
|
const (
|
|
OpBackendDelete = "delete"
|
|
OpBackendInstall = "install"
|
|
OpBackendUpgrade = "upgrade"
|
|
)
|
|
|
|
// NodeRegistry manages backend node registration and lookup in PostgreSQL.
|
|
type NodeRegistry struct {
|
|
db *gorm.DB
|
|
}
|
|
|
|
// NewNodeRegistry creates a NodeRegistry and auto-migrates the schema.
|
|
// Uses a PostgreSQL advisory lock to prevent concurrent migration races
|
|
// when multiple instances (frontend + workers) start at the same time.
|
|
func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) {
|
|
if err := advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
|
|
return db.AutoMigrate(&BackendNode{}, &NodeModel{}, &NodeLabel{}, &ModelSchedulingConfig{}, &PendingBackendOp{})
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("migrating node tables: %w", err)
|
|
}
|
|
return &NodeRegistry{db: db}, nil
|
|
}
|
|
|
|
// Register adds or updates a backend node.
|
|
// If autoApprove is true, the node goes directly to "healthy" status.
|
|
// If false, new nodes start in "pending" status and must be approved by an admin.
|
|
// On re-registration (same name), previously approved nodes return to "healthy";
|
|
// nodes that were never approved stay in "pending".
|
|
func (r *NodeRegistry) Register(ctx context.Context, node *BackendNode, autoApprove bool) error {
|
|
node.LastHeartbeat = time.Now()
|
|
|
|
// Try to find existing node by name
|
|
var existing BackendNode
|
|
err := r.db.WithContext(ctx).Where("name = ?", node.Name).First(&existing).Error
|
|
if err == nil {
|
|
// Re-registration (node restart): preserve ID, respect approval history
|
|
node.ID = existing.ID
|
|
if autoApprove || existing.Status != StatusPending {
|
|
// Auto-approve enabled, or node was previously approved — restore healthy
|
|
node.Status = StatusHealthy
|
|
} else {
|
|
// Node was never approved — keep pending
|
|
node.Status = StatusPending
|
|
}
|
|
if err := r.db.WithContext(ctx).Model(&existing).Updates(node).Error; err != nil {
|
|
return fmt.Errorf("updating node %s: %w", node.Name, err)
|
|
}
|
|
// Preserve auth references from existing record.
|
|
// GORM Updates(struct) skips zero-value fields, so the DB retains
|
|
// the old auth_user_id/api_key_id but the caller's struct is empty.
|
|
// Copy them back so the caller can revoke old credentials on re-registration.
|
|
if node.AuthUserID == "" {
|
|
node.AuthUserID = existing.AuthUserID
|
|
}
|
|
if node.APIKeyID == "" {
|
|
node.APIKeyID = existing.APIKeyID
|
|
}
|
|
// Clear stale model records — the node restarted and has nothing loaded
|
|
if err := r.db.WithContext(ctx).Where("node_id = ?", existing.ID).Delete(&NodeModel{}).Error; err != nil {
|
|
xlog.Warn("Failed to clear stale model records on re-register", "node", node.Name, "error", err)
|
|
}
|
|
} else if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
// Create new node
|
|
if node.ID == "" {
|
|
node.ID = uuid.New().String()
|
|
}
|
|
if autoApprove {
|
|
node.Status = StatusHealthy
|
|
} else {
|
|
node.Status = StatusPending
|
|
}
|
|
if err := r.db.WithContext(ctx).Create(node).Error; err != nil {
|
|
return fmt.Errorf("creating node %s: %w", node.Name, err)
|
|
}
|
|
} else {
|
|
return fmt.Errorf("looking up node %s: %w", node.Name, err)
|
|
}
|
|
|
|
xlog.Info("Node registered", "name", node.Name, "address", node.Address, "status", node.Status)
|
|
return nil
|
|
}
|
|
|
|
// UpdateAuthRefs stores the auto-provisioned user and API key IDs on a node.
|
|
func (r *NodeRegistry) UpdateAuthRefs(ctx context.Context, nodeID, authUserID, apiKeyID string) error {
|
|
return r.db.WithContext(ctx).Model(&BackendNode{}).Where("id = ?", nodeID).Updates(map[string]any{
|
|
"auth_user_id": authUserID,
|
|
"api_key_id": apiKeyID,
|
|
}).Error
|
|
}
|
|
|
|
// ApproveNode sets a pending node's status to healthy.
|
|
func (r *NodeRegistry) ApproveNode(ctx context.Context, nodeID string) error {
|
|
result := r.db.WithContext(ctx).Model(&BackendNode{}).
|
|
Where("id = ? AND status = ?", nodeID, StatusPending).
|
|
Update("status", StatusHealthy)
|
|
if result.Error != nil {
|
|
return fmt.Errorf("approving node %s: %w", nodeID, result.Error)
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
return fmt.Errorf("node %s not found or not in pending status", nodeID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// setStatus updates a node's status column in the database.
|
|
func (r *NodeRegistry) setStatus(ctx context.Context, nodeID, status string) error {
|
|
result := r.db.WithContext(ctx).Model(&BackendNode{}).
|
|
Where("id = ?", nodeID).Update("status", status)
|
|
if result.Error != nil {
|
|
return fmt.Errorf("setting node %s to %s: %w", nodeID, status, result.Error)
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
return fmt.Errorf("node %s not found", nodeID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarkOffline sets a node to offline status and clears its model records.
|
|
// Used on graceful shutdown — preserves the node row so re-registration
|
|
// can restore the previous approval status.
|
|
func (r *NodeRegistry) MarkOffline(ctx context.Context, nodeID string) error {
|
|
if err := r.setStatus(ctx, nodeID, StatusOffline); err != nil {
|
|
return err
|
|
}
|
|
// Clear model records — node is shutting down
|
|
if err := r.db.WithContext(ctx).Where("node_id = ?", nodeID).Delete(&NodeModel{}).Error; err != nil {
|
|
xlog.Warn("Failed to clear model records on offline", "node", nodeID, "error", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FindNodeWithVRAM returns healthy nodes with at least minBytes available VRAM,
|
|
// ordered idle-first then least-loaded.
|
|
func (r *NodeRegistry) FindNodeWithVRAM(ctx context.Context, minBytes uint64) (*BackendNode, error) {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
loadedModels := db.Model(&NodeModel{}).
|
|
Select("node_id").
|
|
Where("state = ?", "loaded").
|
|
Group("node_id")
|
|
|
|
subquery := db.Model(&NodeModel{}).
|
|
Select("node_id, COALESCE(SUM(in_flight), 0) as total_inflight").
|
|
Group("node_id")
|
|
|
|
// Try idle nodes with enough VRAM first, prefer the one with most free VRAM
|
|
var node BackendNode
|
|
err := db.Where("status = ? AND node_type = ? AND available_vram >= ? AND id NOT IN (?)", StatusHealthy, NodeTypeBackend, minBytes, loadedModels).
|
|
Order("available_vram DESC").
|
|
First(&node).Error
|
|
if err == nil {
|
|
return &node, nil
|
|
}
|
|
|
|
// Fall back to least-loaded nodes with enough VRAM, prefer most free VRAM as tiebreaker
|
|
err = db.Where("status = ? AND node_type = ? AND available_vram >= ?", StatusHealthy, NodeTypeBackend, minBytes).
|
|
Joins("LEFT JOIN (?) AS load ON load.node_id = backend_nodes.id", subquery).
|
|
Order("COALESCE(load.total_inflight, 0) ASC, backend_nodes.available_vram DESC").
|
|
First(&node).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("no healthy nodes with %d bytes available VRAM: %w", minBytes, err)
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// Deregister removes a backend node, its model associations, and any auto-provisioned auth credentials.
|
|
func (r *NodeRegistry) Deregister(ctx context.Context, nodeID string) error {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
var node BackendNode
|
|
if err := db.Where("id = ?", nodeID).First(&node).Error; err != nil {
|
|
return fmt.Errorf("node %s not found: %w", nodeID, err)
|
|
}
|
|
|
|
return db.Transaction(func(tx *gorm.DB) error {
|
|
if err := tx.Where("node_id = ?", nodeID).Delete(&NodeModel{}).Error; err != nil {
|
|
return fmt.Errorf("deleting node models for %s: %w", nodeID, err)
|
|
}
|
|
if err := tx.Where("id = ?", nodeID).Delete(&BackendNode{}).Error; err != nil {
|
|
return fmt.Errorf("deleting node %s: %w", nodeID, err)
|
|
}
|
|
// Clean up auto-provisioned auth user (cascades to API keys via FK)
|
|
if node.AuthUserID != "" {
|
|
if err := tx.Exec("DELETE FROM users WHERE id = ?", node.AuthUserID).Error; err != nil {
|
|
xlog.Warn("Failed to clean up agent worker user", "node", node.Name, "userID", node.AuthUserID, "error", err)
|
|
// non-fatal: don't rollback the whole deregistration for auth cleanup
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// HeartbeatUpdate contains optional fields to update on heartbeat.
|
|
type HeartbeatUpdate struct {
|
|
AvailableVRAM *uint64 `json:"available_vram,omitempty"`
|
|
TotalVRAM *uint64 `json:"total_vram,omitempty"`
|
|
AvailableRAM *uint64 `json:"available_ram,omitempty"`
|
|
GPUVendor string `json:"gpu_vendor,omitempty"`
|
|
}
|
|
|
|
// Heartbeat updates the heartbeat timestamp and status for a node.
|
|
// Nodes in "pending" or "offline" status stay in their current status —
|
|
// they must be approved or re-register respectively.
|
|
func (r *NodeRegistry) Heartbeat(ctx context.Context, nodeID string, update *HeartbeatUpdate) error {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
updates := map[string]any{
|
|
ColLastHeartbeat: time.Now(),
|
|
}
|
|
|
|
if update != nil {
|
|
if update.AvailableVRAM != nil {
|
|
updates[ColAvailableVRAM] = *update.AvailableVRAM
|
|
}
|
|
if update.TotalVRAM != nil {
|
|
updates[ColTotalVRAM] = *update.TotalVRAM
|
|
}
|
|
if update.AvailableRAM != nil {
|
|
updates[ColAvailableRAM] = *update.AvailableRAM
|
|
}
|
|
if update.GPUVendor != "" {
|
|
updates[ColGPUVendor] = update.GPUVendor
|
|
}
|
|
}
|
|
|
|
// Only update all fields (including status promotion) for active nodes.
|
|
// Pending and offline nodes must go through approval or re-registration.
|
|
result := db.Model(&BackendNode{}).
|
|
Where("id = ? AND status NOT IN ?", nodeID, []string{StatusPending, StatusOffline}).
|
|
Updates(updates)
|
|
if result.Error != nil {
|
|
return fmt.Errorf("heartbeat for %s: %w", nodeID, result.Error)
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
// May be pending or offline — still update heartbeat timestamp
|
|
result = db.Model(&BackendNode{}).Where("id = ?", nodeID).Update(ColLastHeartbeat, time.Now())
|
|
if result.Error != nil {
|
|
return fmt.Errorf("heartbeat for %s: %w", nodeID, result.Error)
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
return fmt.Errorf("node %s not found", nodeID)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List returns all registered nodes.
|
|
func (r *NodeRegistry) List(ctx context.Context) ([]BackendNode, error) {
|
|
var nodes []BackendNode
|
|
if err := r.db.WithContext(ctx).Order("name").Find(&nodes).Error; err != nil {
|
|
return nil, fmt.Errorf("listing nodes: %w", err)
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
// Get returns a single node by ID.
|
|
func (r *NodeRegistry) Get(ctx context.Context, nodeID string) (*BackendNode, error) {
|
|
var node BackendNode
|
|
if err := r.db.WithContext(ctx).First(&node, "id = ?", nodeID).Error; err != nil {
|
|
return nil, fmt.Errorf("getting node %s: %w", nodeID, err)
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// GetByName returns a single node by name.
|
|
func (r *NodeRegistry) GetByName(ctx context.Context, name string) (*BackendNode, error) {
|
|
var node BackendNode
|
|
if err := r.db.WithContext(ctx).First(&node, "name = ?", name).Error; err != nil {
|
|
return nil, fmt.Errorf("getting node by name %s: %w", name, err)
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// MarkUnhealthy sets a node status to unhealthy.
|
|
func (r *NodeRegistry) MarkUnhealthy(ctx context.Context, nodeID string) error {
|
|
return r.setStatus(ctx, nodeID, StatusUnhealthy)
|
|
}
|
|
|
|
// MarkHealthy sets a node status to healthy.
|
|
func (r *NodeRegistry) MarkHealthy(ctx context.Context, nodeID string) error {
|
|
return r.setStatus(ctx, nodeID, StatusHealthy)
|
|
}
|
|
|
|
// MarkDraining sets a node status to draining (no new requests).
|
|
func (r *NodeRegistry) MarkDraining(ctx context.Context, nodeID string) error {
|
|
return r.setStatus(ctx, nodeID, StatusDraining)
|
|
}
|
|
|
|
// FindStaleNodes returns nodes that haven't sent a heartbeat within the given threshold.
|
|
// Excludes unhealthy, offline, and pending nodes since they're not actively participating.
|
|
func (r *NodeRegistry) FindStaleNodes(ctx context.Context, threshold time.Duration) ([]BackendNode, error) {
|
|
var nodes []BackendNode
|
|
cutoff := time.Now().Add(-threshold)
|
|
if err := r.db.WithContext(ctx).Where("last_heartbeat < ? AND status NOT IN ?", cutoff,
|
|
[]string{StatusUnhealthy, StatusOffline, StatusPending}).
|
|
Find(&nodes).Error; err != nil {
|
|
return nil, fmt.Errorf("finding stale nodes: %w", err)
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
// --- NodeModel operations ---
|
|
|
|
// SetNodeModel records that a model is loaded on a node.
|
|
func (r *NodeRegistry) SetNodeModel(ctx context.Context, nodeID, modelName, state, address string, initialInFlight int) error {
|
|
now := time.Now()
|
|
// Use Attrs for creation-only fields (ID) and Assign for update-only fields.
|
|
// Attrs is applied only when creating a new record. Assign is applied on
|
|
// both create and update. This prevents overwriting the primary key on
|
|
// subsequent calls for the same node+model.
|
|
var nm NodeModel
|
|
result := r.db.WithContext(ctx).Where("node_id = ? AND model_name = ?", nodeID, modelName).
|
|
Attrs(NodeModel{ID: uuid.New().String(), NodeID: nodeID, ModelName: modelName}).
|
|
Assign(map[string]any{"address": address, "state": state, "last_used": now, "in_flight": initialInFlight}).
|
|
FirstOrCreate(&nm)
|
|
return result.Error
|
|
}
|
|
|
|
// SetNodeModelLoadInfo stores the backend type and serialized model options on
|
|
// an existing NodeModel record. This metadata is used by the reconciler to
|
|
// replicate model loads during scale-up.
|
|
func (r *NodeRegistry) SetNodeModelLoadInfo(ctx context.Context, nodeID, modelName, backendType string, optsBlob []byte) error {
|
|
return r.db.WithContext(ctx).Model(&NodeModel{}).
|
|
Where("node_id = ? AND model_name = ?", nodeID, modelName).
|
|
Updates(map[string]any{"backend_type": backendType, "model_opts_blob": optsBlob}).Error
|
|
}
|
|
|
|
// GetModelLoadInfo retrieves the stored backend type and serialized model
|
|
// options from any existing loaded replica. Returns gorm.ErrRecordNotFound
|
|
// if no replica has stored options.
|
|
func (r *NodeRegistry) GetModelLoadInfo(ctx context.Context, modelName string) (backendType string, optsBlob []byte, err error) {
|
|
var nm NodeModel
|
|
err = r.db.WithContext(ctx).
|
|
Where("model_name = ? AND state = ? AND model_opts_blob IS NOT NULL", modelName, "loaded").
|
|
First(&nm).Error
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
return nm.BackendType, nm.ModelOptsBlob, nil
|
|
}
|
|
|
|
// RemoveNodeModel removes a model association from a node.
|
|
func (r *NodeRegistry) RemoveNodeModel(ctx context.Context, nodeID, modelName string) error {
|
|
return r.db.WithContext(ctx).Where("node_id = ? AND model_name = ?", nodeID, modelName).
|
|
Delete(&NodeModel{}).Error
|
|
}
|
|
|
|
// FindNodesWithModel returns nodes that have the given model loaded.
|
|
func (r *NodeRegistry) FindNodesWithModel(ctx context.Context, modelName string) ([]BackendNode, error) {
|
|
var nodes []BackendNode
|
|
if err := r.db.WithContext(ctx).Joins("JOIN node_models ON node_models.node_id = backend_nodes.id").
|
|
Where("node_models.model_name = ? AND node_models.state = ? AND backend_nodes.status = ?",
|
|
modelName, "loaded", StatusHealthy).
|
|
Order("node_models.in_flight ASC").
|
|
Find(&nodes).Error; err != nil {
|
|
return nil, fmt.Errorf("finding nodes with model %s: %w", modelName, err)
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
// FindAndLockNodeWithModel atomically finds the least-loaded node with the given
|
|
// model loaded and increments its in-flight counter within a single transaction.
|
|
// The SELECT FOR UPDATE row lock prevents concurrent eviction from removing the
|
|
// NodeModel row between the find and increment operations.
|
|
func (r *NodeRegistry) FindAndLockNodeWithModel(ctx context.Context, modelName string) (*BackendNode, *NodeModel, error) {
|
|
var nm NodeModel
|
|
var node BackendNode
|
|
|
|
err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
// Order by in_flight ASC (least busy replica), then by available_vram DESC
|
|
// (prefer nodes with more free VRAM to spread load across the cluster).
|
|
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
|
|
Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
|
|
Where("node_models.model_name = ? AND node_models.state = ?", modelName, "loaded").
|
|
Order("node_models.in_flight ASC, backend_nodes.available_vram DESC").
|
|
First(&nm).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := tx.Model(&nm).Updates(map[string]any{
|
|
"in_flight": gorm.Expr("in_flight + 1"),
|
|
"last_used": time.Now(),
|
|
}).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := tx.Where("id = ? AND status = ?", nm.NodeID, StatusHealthy).
|
|
First(&node).Error; err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return &node, &nm, nil
|
|
}
|
|
|
|
// TouchNodeModel updates the last_used timestamp for LRU tracking.
|
|
func (r *NodeRegistry) TouchNodeModel(ctx context.Context, nodeID, modelName string) {
|
|
r.db.WithContext(ctx).Model(&NodeModel{}).Where("node_id = ? AND model_name = ?", nodeID, modelName).
|
|
Update("last_used", time.Now())
|
|
}
|
|
|
|
// GetNodeModel returns the NodeModel record for a specific node+model combination.
|
|
func (r *NodeRegistry) GetNodeModel(ctx context.Context, nodeID, modelName string) (*NodeModel, error) {
|
|
var nm NodeModel
|
|
err := r.db.WithContext(ctx).Where("node_id = ? AND model_name = ?", nodeID, modelName).First(&nm).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &nm, nil
|
|
}
|
|
|
|
// FindLeastLoadedNode returns the healthy node with the fewest in-flight requests.
|
|
func (r *NodeRegistry) FindLeastLoadedNode(ctx context.Context) (*BackendNode, error) {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
var node BackendNode
|
|
query := db.Where("status = ? AND node_type = ?", StatusHealthy, NodeTypeBackend)
|
|
// Order by total in-flight across all models on the node
|
|
subquery := db.Model(&NodeModel{}).
|
|
Select("node_id, COALESCE(SUM(in_flight), 0) as total_inflight").
|
|
Group("node_id")
|
|
|
|
err := query.Joins("LEFT JOIN (?) AS load ON load.node_id = backend_nodes.id", subquery).
|
|
Order("COALESCE(load.total_inflight, 0) ASC, backend_nodes.available_vram DESC").
|
|
First(&node).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("finding least loaded node: %w", err)
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// FindIdleNode returns a healthy node with zero in-flight requests and zero loaded models.
|
|
// Used by the scheduler to prefer truly idle nodes for new backend assignments.
|
|
func (r *NodeRegistry) FindIdleNode(ctx context.Context) (*BackendNode, error) {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
var node BackendNode
|
|
loadedModels := db.Model(&NodeModel{}).
|
|
Select("node_id").
|
|
Where("state = ?", "loaded").
|
|
Group("node_id")
|
|
err := db.Where("status = ? AND node_type = ? AND id NOT IN (?)", StatusHealthy, NodeTypeBackend, loadedModels).
|
|
Order("available_vram DESC").
|
|
First(&node).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// IncrementInFlight atomically increments the in-flight counter for a model on a node.
|
|
func (r *NodeRegistry) IncrementInFlight(ctx context.Context, nodeID, modelName string) error {
|
|
result := r.db.WithContext(ctx).Model(&NodeModel{}).
|
|
Where("node_id = ? AND model_name = ?", nodeID, modelName).
|
|
Updates(map[string]any{
|
|
"in_flight": gorm.Expr("in_flight + 1"),
|
|
"last_used": time.Now(),
|
|
})
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
return fmt.Errorf("node model %s/%s not found", nodeID, modelName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DecrementInFlight atomically decrements the in-flight counter for a model on a node.
|
|
func (r *NodeRegistry) DecrementInFlight(ctx context.Context, nodeID, modelName string) error {
|
|
result := r.db.WithContext(ctx).Model(&NodeModel{}).
|
|
Where("node_id = ? AND model_name = ? AND in_flight > 0", nodeID, modelName).
|
|
UpdateColumn("in_flight", gorm.Expr("in_flight - 1"))
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
xlog.Warn("DecrementInFlight: no matching row or already zero", "node", nodeID, "model", modelName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetNodeModels returns all models loaded on a given node.
|
|
func (r *NodeRegistry) GetNodeModels(ctx context.Context, nodeID string) ([]NodeModel, error) {
|
|
var models []NodeModel
|
|
if err := r.db.WithContext(ctx).Where("node_id = ?", nodeID).Find(&models).Error; err != nil {
|
|
return nil, fmt.Errorf("getting models for node %s: %w", nodeID, err)
|
|
}
|
|
return models, nil
|
|
}
|
|
|
|
// ListAllLoadedModels returns all models that are loaded on healthy nodes.
|
|
// Used by DistributedModelStore.Range() to discover models not in local cache.
|
|
func (r *NodeRegistry) ListAllLoadedModels(ctx context.Context) ([]NodeModel, error) {
|
|
var models []NodeModel
|
|
err := r.db.WithContext(ctx).Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
|
|
Where("node_models.state = ? AND backend_nodes.status = ?", "loaded", StatusHealthy).
|
|
Find(&models).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing all loaded models: %w", err)
|
|
}
|
|
return models, nil
|
|
}
|
|
|
|
// FindNodeForModel returns the first healthy node that has the given model loaded.
|
|
// Returns the node and true if found, nil and false otherwise.
|
|
func (r *NodeRegistry) FindNodeForModel(ctx context.Context, modelName string) (*BackendNode, bool) {
|
|
nodes, err := r.FindNodesWithModel(ctx, modelName)
|
|
if err != nil || len(nodes) == 0 {
|
|
return nil, false
|
|
}
|
|
return &nodes[0], true
|
|
}
|
|
|
|
// FindLRUModel returns the least-recently-used model on a node.
|
|
func (r *NodeRegistry) FindLRUModel(ctx context.Context, nodeID string) (*NodeModel, error) {
|
|
var nm NodeModel
|
|
err := r.db.WithContext(ctx).Where("node_id = ? AND state = ? AND in_flight = 0", nodeID, "loaded").
|
|
Order("last_used ASC").First(&nm).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("finding LRU model on node %s: %w", nodeID, err)
|
|
}
|
|
return &nm, nil
|
|
}
|
|
|
|
// FindGlobalLRUModelWithZeroInFlight returns the least-recently-used model
|
|
// across all healthy backend nodes that has zero in-flight requests.
|
|
// Used by the router for preemptive eviction when no node has free VRAM.
|
|
func (r *NodeRegistry) FindGlobalLRUModelWithZeroInFlight(ctx context.Context) (*NodeModel, error) {
|
|
var nm NodeModel
|
|
err := r.db.WithContext(ctx).Joins("JOIN backend_nodes ON backend_nodes.id = node_models.node_id").
|
|
Where("node_models.state = ? AND node_models.in_flight = 0 AND backend_nodes.status = ? AND backend_nodes.node_type = ?",
|
|
"loaded", StatusHealthy, NodeTypeBackend).
|
|
Order("node_models.last_used ASC").
|
|
First(&nm).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("no evictable model found: %w", err)
|
|
}
|
|
return &nm, nil
|
|
}
|
|
|
|
// --- NodeLabel operations ---
|
|
|
|
// SetNodeLabel upserts a single label on a node.
|
|
func (r *NodeRegistry) SetNodeLabel(ctx context.Context, nodeID, key, value string) error {
|
|
label := NodeLabel{
|
|
ID: uuid.New().String(),
|
|
NodeID: nodeID,
|
|
Key: key,
|
|
Value: value,
|
|
}
|
|
return r.db.WithContext(ctx).
|
|
Clauses(clause.OnConflict{
|
|
Columns: []clause.Column{{Name: "node_id"}, {Name: "key"}},
|
|
DoUpdates: clause.AssignmentColumns([]string{"value"}),
|
|
}).
|
|
Create(&label).Error
|
|
}
|
|
|
|
// SetNodeLabels replaces all labels for a node with the given map.
|
|
func (r *NodeRegistry) SetNodeLabels(ctx context.Context, nodeID string, labels map[string]string) error {
|
|
return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
if err := tx.Where("node_id = ?", nodeID).Delete(&NodeLabel{}).Error; err != nil {
|
|
return err
|
|
}
|
|
for k, v := range labels {
|
|
label := NodeLabel{ID: uuid.New().String(), NodeID: nodeID, Key: k, Value: v}
|
|
if err := tx.Create(&label).Error; err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// RemoveNodeLabel removes a single label from a node.
|
|
func (r *NodeRegistry) RemoveNodeLabel(ctx context.Context, nodeID, key string) error {
|
|
return r.db.WithContext(ctx).Where("node_id = ? AND key = ?", nodeID, key).Delete(&NodeLabel{}).Error
|
|
}
|
|
|
|
// GetNodeLabels returns all labels for a node.
|
|
func (r *NodeRegistry) GetNodeLabels(ctx context.Context, nodeID string) ([]NodeLabel, error) {
|
|
var labels []NodeLabel
|
|
err := r.db.WithContext(ctx).Where("node_id = ?", nodeID).Find(&labels).Error
|
|
return labels, err
|
|
}
|
|
|
|
// GetAllNodeLabelsMap returns all labels grouped by node ID.
|
|
func (r *NodeRegistry) GetAllNodeLabelsMap(ctx context.Context) (map[string]map[string]string, error) {
|
|
var labels []NodeLabel
|
|
if err := r.db.WithContext(ctx).Find(&labels).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
result := make(map[string]map[string]string)
|
|
for _, l := range labels {
|
|
if result[l.NodeID] == nil {
|
|
result[l.NodeID] = make(map[string]string)
|
|
}
|
|
result[l.NodeID][l.Key] = l.Value
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- Selector-based queries ---
|
|
|
|
// FindNodesBySelector returns healthy backend nodes matching ALL key-value pairs in the selector.
|
|
func (r *NodeRegistry) FindNodesBySelector(ctx context.Context, selector map[string]string) ([]BackendNode, error) {
|
|
if len(selector) == 0 {
|
|
// Empty selector matches all healthy backend nodes
|
|
var nodes []BackendNode
|
|
err := r.db.WithContext(ctx).Where("status = ? AND node_type = ?", StatusHealthy, NodeTypeBackend).Find(&nodes).Error
|
|
return nodes, err
|
|
}
|
|
|
|
db := r.db.WithContext(ctx).Where("status = ? AND node_type = ?", StatusHealthy, NodeTypeBackend)
|
|
for k, v := range selector {
|
|
db = db.Where("EXISTS (SELECT 1 FROM node_labels WHERE node_labels.node_id = backend_nodes.id AND node_labels.key = ? AND node_labels.value = ?)", k, v)
|
|
}
|
|
|
|
var nodes []BackendNode
|
|
err := db.Find(&nodes).Error
|
|
return nodes, err
|
|
}
|
|
|
|
// FindNodeWithVRAMFromSet is like FindNodeWithVRAM but restricted to the given node IDs.
|
|
func (r *NodeRegistry) FindNodeWithVRAMFromSet(ctx context.Context, minBytes uint64, nodeIDs []string) (*BackendNode, error) {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
loadedModels := db.Model(&NodeModel{}).
|
|
Select("node_id").
|
|
Where("state = ?", "loaded").
|
|
Group("node_id")
|
|
|
|
subquery := db.Model(&NodeModel{}).
|
|
Select("node_id, COALESCE(SUM(in_flight), 0) as total_inflight").
|
|
Group("node_id")
|
|
|
|
// Try idle nodes with enough VRAM first, prefer the one with most free VRAM
|
|
var node BackendNode
|
|
err := db.Where("status = ? AND node_type = ? AND available_vram >= ? AND id NOT IN (?) AND id IN ?", StatusHealthy, NodeTypeBackend, minBytes, loadedModels, nodeIDs).
|
|
Order("available_vram DESC").
|
|
First(&node).Error
|
|
if err == nil {
|
|
return &node, nil
|
|
}
|
|
|
|
// Fall back to least-loaded nodes with enough VRAM, prefer most free VRAM as tiebreaker
|
|
err = db.Where("status = ? AND node_type = ? AND available_vram >= ? AND backend_nodes.id IN ?", StatusHealthy, NodeTypeBackend, minBytes, nodeIDs).
|
|
Joins("LEFT JOIN (?) AS load ON load.node_id = backend_nodes.id", subquery).
|
|
Order("COALESCE(load.total_inflight, 0) ASC, backend_nodes.available_vram DESC").
|
|
First(&node).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("no healthy nodes in set with %d bytes available VRAM: %w", minBytes, err)
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// FindIdleNodeFromSet is like FindIdleNode but restricted to the given node IDs.
|
|
func (r *NodeRegistry) FindIdleNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error) {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
var node BackendNode
|
|
loadedModels := db.Model(&NodeModel{}).
|
|
Select("node_id").
|
|
Where("state = ?", "loaded").
|
|
Group("node_id")
|
|
err := db.Where("status = ? AND node_type = ? AND id NOT IN (?) AND id IN ?", StatusHealthy, NodeTypeBackend, loadedModels, nodeIDs).
|
|
Order("available_vram DESC").
|
|
First(&node).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// FindLeastLoadedNodeFromSet is like FindLeastLoadedNode but restricted to the given node IDs.
|
|
func (r *NodeRegistry) FindLeastLoadedNodeFromSet(ctx context.Context, nodeIDs []string) (*BackendNode, error) {
|
|
db := r.db.WithContext(ctx)
|
|
|
|
var node BackendNode
|
|
query := db.Where("status = ? AND node_type = ? AND backend_nodes.id IN ?", StatusHealthy, NodeTypeBackend, nodeIDs)
|
|
// Order by total in-flight across all models on the node
|
|
subquery := db.Model(&NodeModel{}).
|
|
Select("node_id, COALESCE(SUM(in_flight), 0) as total_inflight").
|
|
Group("node_id")
|
|
|
|
err := query.Joins("LEFT JOIN (?) AS load ON load.node_id = backend_nodes.id", subquery).
|
|
Order("COALESCE(load.total_inflight, 0) ASC, backend_nodes.available_vram DESC").
|
|
First(&node).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("finding least loaded node in set: %w", err)
|
|
}
|
|
return &node, nil
|
|
}
|
|
|
|
// --- ModelSchedulingConfig operations ---
|
|
|
|
// SetModelScheduling creates or updates a scheduling config for a model.
|
|
func (r *NodeRegistry) SetModelScheduling(ctx context.Context, config *ModelSchedulingConfig) error {
|
|
if config.ID == "" {
|
|
config.ID = uuid.New().String()
|
|
}
|
|
return r.db.WithContext(ctx).
|
|
Clauses(clause.OnConflict{
|
|
Columns: []clause.Column{{Name: "model_name"}},
|
|
DoUpdates: clause.AssignmentColumns([]string{"node_selector", "min_replicas", "max_replicas", "updated_at"}),
|
|
}).
|
|
Create(config).Error
|
|
}
|
|
|
|
// GetModelScheduling returns the scheduling config for a model, or nil if none exists.
|
|
func (r *NodeRegistry) GetModelScheduling(ctx context.Context, modelName string) (*ModelSchedulingConfig, error) {
|
|
var config ModelSchedulingConfig
|
|
err := r.db.WithContext(ctx).Where("model_name = ?", modelName).First(&config).Error
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &config, nil
|
|
}
|
|
|
|
// ListModelSchedulings returns all scheduling configs.
|
|
func (r *NodeRegistry) ListModelSchedulings(ctx context.Context) ([]ModelSchedulingConfig, error) {
|
|
var configs []ModelSchedulingConfig
|
|
err := r.db.WithContext(ctx).Order("model_name ASC").Find(&configs).Error
|
|
return configs, err
|
|
}
|
|
|
|
// ListAutoScalingConfigs returns scheduling configs where auto-scaling is enabled.
|
|
func (r *NodeRegistry) ListAutoScalingConfigs(ctx context.Context) ([]ModelSchedulingConfig, error) {
|
|
var configs []ModelSchedulingConfig
|
|
err := r.db.WithContext(ctx).Where("min_replicas > 0 OR max_replicas > 0").Find(&configs).Error
|
|
return configs, err
|
|
}
|
|
|
|
// DeleteModelScheduling removes a scheduling config by model name.
|
|
func (r *NodeRegistry) DeleteModelScheduling(ctx context.Context, modelName string) error {
|
|
return r.db.WithContext(ctx).Where("model_name = ?", modelName).Delete(&ModelSchedulingConfig{}).Error
|
|
}
|
|
|
|
// CountLoadedReplicas returns the number of loaded replicas for a model.
|
|
func (r *NodeRegistry) CountLoadedReplicas(ctx context.Context, modelName string) (int64, error) {
|
|
var count int64
|
|
err := r.db.WithContext(ctx).Model(&NodeModel{}).Where("model_name = ? AND state = ?", modelName, "loaded").Count(&count).Error
|
|
return count, err
|
|
}
|
|
|
|
// --- Composite queries ---
|
|
|
|
// ListWithExtras returns all nodes with model counts and labels.
|
|
func (r *NodeRegistry) ListWithExtras(ctx context.Context) ([]NodeWithExtras, error) {
|
|
// Get all nodes
|
|
var nodes []BackendNode
|
|
if err := r.db.WithContext(ctx).Order("name ASC").Find(&nodes).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get model counts per node
|
|
type modelCount struct {
|
|
NodeID string
|
|
Count int
|
|
}
|
|
var counts []modelCount
|
|
if err := r.db.WithContext(ctx).Model(&NodeModel{}).
|
|
Select("node_id, COUNT(*) as count").
|
|
Where("state = ?", "loaded").
|
|
Group("node_id").
|
|
Find(&counts).Error; err != nil {
|
|
xlog.Warn("ListWithExtras: failed to get model counts", "error", err)
|
|
}
|
|
|
|
countMap := make(map[string]int)
|
|
for _, c := range counts {
|
|
countMap[c.NodeID] = c.Count
|
|
}
|
|
|
|
// Get in-flight counts per node
|
|
type inFlightCount struct {
|
|
NodeID string
|
|
Total int
|
|
}
|
|
var inFlights []inFlightCount
|
|
if err := r.db.WithContext(ctx).Model(&NodeModel{}).
|
|
Select("node_id, COALESCE(SUM(in_flight), 0) as total").
|
|
Where("state IN ?", []string{"loaded", "unloading"}).
|
|
Group("node_id").
|
|
Find(&inFlights).Error; err != nil {
|
|
xlog.Warn("ListWithExtras: failed to get in-flight counts", "error", err)
|
|
}
|
|
|
|
inFlightMap := make(map[string]int)
|
|
for _, f := range inFlights {
|
|
inFlightMap[f.NodeID] = f.Total
|
|
}
|
|
|
|
// Get all labels
|
|
labelsMap, err := r.GetAllNodeLabelsMap(ctx)
|
|
if err != nil {
|
|
xlog.Warn("ListWithExtras: failed to get labels", "error", err)
|
|
}
|
|
|
|
// Build result
|
|
result := make([]NodeWithExtras, len(nodes))
|
|
for i, n := range nodes {
|
|
result[i] = NodeWithExtras{
|
|
BackendNode: n,
|
|
ModelCount: countMap[n.ID],
|
|
InFlightCount: inFlightMap[n.ID],
|
|
Labels: labelsMap[n.ID],
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// ApplyAutoLabels sets automatic labels based on node hardware info.
|
|
func (r *NodeRegistry) ApplyAutoLabels(ctx context.Context, nodeID string, node *BackendNode) {
|
|
if node.GPUVendor != "" {
|
|
_ = r.SetNodeLabel(ctx, nodeID, "gpu.vendor", node.GPUVendor)
|
|
}
|
|
if node.TotalVRAM > 0 {
|
|
gb := node.TotalVRAM / (1024 * 1024 * 1024)
|
|
var bucket string
|
|
switch {
|
|
case gb >= 80:
|
|
bucket = "80GB+"
|
|
case gb >= 48:
|
|
bucket = "48GB"
|
|
case gb >= 24:
|
|
bucket = "24GB"
|
|
case gb >= 16:
|
|
bucket = "16GB"
|
|
case gb >= 8:
|
|
bucket = "8GB"
|
|
default:
|
|
bucket = fmt.Sprintf("%dGB", gb)
|
|
}
|
|
_ = r.SetNodeLabel(ctx, nodeID, "gpu.vram", bucket)
|
|
}
|
|
if node.Name != "" {
|
|
_ = r.SetNodeLabel(ctx, nodeID, "node.name", node.Name)
|
|
}
|
|
}
|
|
|
|
// UpsertPendingBackendOp records or refreshes a pending backend operation for
|
|
// a node. If a row already exists for (nodeID, backend, op) we keep its
|
|
// Attempts/LastError but reset NextRetryAt to now, so reissuing the same
|
|
// delete/upgrade nudges it to the front of the queue instead of stacking a
|
|
// duplicate intent.
|
|
func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backend, op string, galleries []byte) error {
|
|
row := PendingBackendOp{
|
|
NodeID: nodeID,
|
|
Backend: backend,
|
|
Op: op,
|
|
Galleries: galleries,
|
|
NextRetryAt: time.Now(),
|
|
}
|
|
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
|
|
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
|
|
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
|
|
}).Create(&row).Error
|
|
}
|
|
|
|
// ListDuePendingBackendOps returns queued ops whose NextRetryAt has passed
|
|
// AND whose node is currently healthy. The reconciler drains this list; we
|
|
// filter by node status in the query so a tick doesn't hammer NATS for
|
|
// nodes that obviously can't answer.
|
|
func (r *NodeRegistry) ListDuePendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
|
|
var ops []PendingBackendOp
|
|
err := r.db.WithContext(ctx).
|
|
Joins("JOIN backend_nodes ON backend_nodes.id = pending_backend_ops.node_id").
|
|
Where("pending_backend_ops.next_retry_at <= ? AND backend_nodes.status = ?", time.Now(), StatusHealthy).
|
|
Order("pending_backend_ops.next_retry_at ASC").
|
|
Find(&ops).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing due pending backend ops: %w", err)
|
|
}
|
|
return ops, nil
|
|
}
|
|
|
|
// ListPendingBackendOps returns every queued row (for the UI "pending on N
|
|
// nodes" chip and the pre-delete ConfirmDialog).
|
|
func (r *NodeRegistry) ListPendingBackendOps(ctx context.Context) ([]PendingBackendOp, error) {
|
|
var ops []PendingBackendOp
|
|
if err := r.db.WithContext(ctx).Order("backend ASC, created_at ASC").Find(&ops).Error; err != nil {
|
|
return nil, fmt.Errorf("listing pending backend ops: %w", err)
|
|
}
|
|
return ops, nil
|
|
}
|
|
|
|
// DeletePendingBackendOp removes a queue row — called after the op succeeds.
|
|
func (r *NodeRegistry) DeletePendingBackendOp(ctx context.Context, id uint) error {
|
|
if err := r.db.WithContext(ctx).Delete(&PendingBackendOp{}, id).Error; err != nil {
|
|
return fmt.Errorf("deleting pending backend op %d: %w", id, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RecordPendingBackendOpFailure bumps Attempts, captures the error, and
|
|
// pushes NextRetryAt out with exponential backoff capped at 15 minutes.
|
|
func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uint, errMsg string) error {
|
|
return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
var row PendingBackendOp
|
|
if err := tx.First(&row, id).Error; err != nil {
|
|
return err
|
|
}
|
|
row.Attempts++
|
|
row.LastError = errMsg
|
|
row.NextRetryAt = time.Now().Add(backoffForAttempt(row.Attempts))
|
|
return tx.Save(&row).Error
|
|
})
|
|
}
|
|
|
|
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
|
|
// reconciler tick is 30s so anything shorter would just re-fire immediately.
|
|
func backoffForAttempt(attempts int) time.Duration {
|
|
const cap = 15 * time.Minute
|
|
base := 30 * time.Second
|
|
shift := attempts - 1
|
|
if shift < 0 {
|
|
shift = 0
|
|
}
|
|
if shift > 10 { // 2^10 * 30s already exceeds the cap
|
|
shift = 10
|
|
}
|
|
d := base << shift
|
|
if d > cap {
|
|
return cap
|
|
}
|
|
return d
|
|
}
|
|
|
|
// CountPendingBackendOpsByBackend returns a map of backend name to the count
|
|
// of pending rows. Used to decorate Manage → Backends with a "pending on N
|
|
// nodes" chip without exposing the full queue.
|
|
func (r *NodeRegistry) CountPendingBackendOpsByBackend(ctx context.Context) (map[string]int, error) {
|
|
type row struct {
|
|
Backend string
|
|
Count int
|
|
}
|
|
var rows []row
|
|
err := r.db.WithContext(ctx).Model(&PendingBackendOp{}).
|
|
Select("backend, COUNT(*) as count").
|
|
Group("backend").
|
|
Scan(&rows).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("counting pending backend ops: %w", err)
|
|
}
|
|
out := make(map[string]int, len(rows))
|
|
for _, r := range rows {
|
|
out[r.Backend] = r.Count
|
|
}
|
|
return out, nil
|
|
}
|