feat(distributed): treat NATS install timeout as in-progress, not failure

When a worker times out replying to backend.install but the install is
still running on the worker, enqueueAndDrainBackendOp now reports a
running_on_worker status and pushes NextRetryAt out by the install
timeout so the reconciler does not immediately re-fire another install
while the worker is still pulling the image. The pending_backend_ops
row stays in place for the next reconciler pass to confirm via
backend.list.

InstallBackend wraps the result in galleryop.ErrWorkerStillInstalling
so callers can branch (galleryop renders yellow in-progress instead of
red error). UpgradeBackend uses the same wrap.

Adds RemoteUnloaderAdapter.InstallTimeout() so the manager can push
NextRetryAt by the configured timeout without reaching into a private
field, and NodeRegistry.RecordPendingBackendOpInFlight as the soft
cousin of RecordPendingBackendOpFailure.

Also includes incidental gofmt-driven struct-field alignment in
registry.go on lines unrelated to the change (touched files are
re-formatted to canonical form per project policy).

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-22 20:25:13 +00:00
parent 4f89882057
commit 169ff75633
4 changed files with 139 additions and 35 deletions

View File

@@ -173,8 +173,25 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
}
// Record failure for backoff. If it's an ErrNoResponders, the node's
// gone AWOL mark unhealthy so the router stops picking it too.
// gone AWOL - mark unhealthy so the router stops picking it too.
errMsg := applyErr.Error()
// Worker-still-installing is a "soft" failure: the worker is most
// likely still pulling the OCI image. Keep the row, push NextRetryAt
// out so the reconciler does not immediately re-fire another install
// while the worker is still busy, and report the in-progress state
// to the caller. The next reconciler pass / backend.list confirms
// the actual outcome.
if errors.Is(applyErr, galleryop.ErrWorkerStillInstalling) {
if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil {
_ = d.registry.RecordPendingBackendOpInFlight(ctx, id, errMsg, d.adapter.InstallTimeout())
}
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "running_on_worker", Error: errMsg,
})
continue
}
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID)
d.registry.MarkUnhealthy(ctx, node.ID)
@@ -361,7 +378,19 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
if err != nil {
return err
}
return result.Err()
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// No hard failures, but if at least one node reported running_on_worker,
// surface a wrapped ErrWorkerStillInstalling so galleryop can render a
// yellow in-progress state instead of green success. The reconciler
// will confirm the actual outcome on its next pass via backend.list.
for _, n := range result.Nodes {
if n.Status == "running_on_worker" {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
}
// UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow
@@ -417,7 +446,18 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
if err != nil {
return err
}
return result.Err()
if hardErr := result.Err(); hardErr != nil {
return hardErr
}
// Same in-progress surfacing as InstallBackend: a long-running worker
// upgrade that timed out the NATS round-trip must not be reported as
// green success.
for _, n := range result.Nodes {
if n.Status == "running_on_worker" {
return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes))
}
}
return nil
}
// IsDistributed reports that installs from this manager fan out across the
@@ -443,3 +483,16 @@ func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[stri
// it used to come from the empty frontend filesystem.
return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed)
}
// summarizeRunningOnWorker builds a short human-readable summary of which
// nodes are still installing in the background, for inclusion in the
// wrapped ErrWorkerStillInstalling error.
func summarizeRunningOnWorker(nodes []NodeOpStatus) string {
var names []string
for _, n := range nodes {
if n.Status == "running_on_worker" {
names = append(names, n.NodeName)
}
}
return strings.Join(names, ", ")
}

View File

@@ -3,6 +3,7 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"runtime"
"sync"
"time"
@@ -352,6 +353,33 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(mc.calls).To(BeEmpty())
})
})
Context("when InstallBackend times out on a worker", func() {
It("returns galleryop.ErrWorkerStillInstalling and keeps the queue row with NextRetryAt pushed out", func() {
n := registerHealthyBackend("slow", "10.0.0.1:50051")
// Script a NATS timeout on the install subject. The adapter
// wraps this into galleryop.ErrWorkerStillInstalling, which
// the manager should treat as a soft failure.
mc.scriptErr(messaging.SubjectNodeBackendInstall(n.ID), nats.ErrTimeout)
err := mgr.InstallBackend(ctx, op("vllm"), nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
rows, err := registry.ListPendingBackendOps(ctx)
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(HaveLen(1))
Expect(rows[0].Backend).To(Equal("vllm"))
// The adapter is configured with a 3m install timeout in this
// suite (NewRemoteUnloaderAdapter above). NextRetryAt should
// be ~now+3m; a > now+2m bound is safe-but-tight enough to
// catch the buggy short default (30s exponential backoff).
Expect(rows[0].NextRetryAt).To(BeTemporally(">", time.Now().Add(2*time.Minute)),
"NextRetryAt should be pushed to ~now+installTimeout, not the short default")
})
})
})
Describe("UpgradeBackend", func() {

View File

@@ -17,24 +17,24 @@ import (
// Workers are generic — they don't have a fixed backend type.
// The SmartRouter dynamically installs backends via NATS backend.install events.
type BackendNode struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
ID string `gorm:"primaryKey;size:36" json:"id"`
Name string `gorm:"uniqueIndex;size:255" json:"name"`
NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent
Address string `gorm:"size:255" json:"address"` // host:port for gRPC
HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer
Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending
TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token
TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes
AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes
// ReservedVRAM is a soft, in-tick reservation deducted by the scheduler when
// it picks this node to load a model. Workers reset it back to 0 on each
// heartbeat (the worker is the source of truth for actual free VRAM); the
// reservation is only here to keep two scheduling decisions within the
// same heartbeat window from over-committing the same node.
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"`
TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU)
AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes
GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown
// MaxReplicasPerModel caps how many replicas of any one model can run on
// this node concurrently. Default 1 preserves the historical "one
// (node, model)" assumption; set higher (via worker --max-replicas-per-model)
@@ -44,12 +44,12 @@ type BackendNode struct {
// admin override. When true, the worker's CLI value is ignored on
// re-registration so the override survives worker restarts. Cleared
// by an explicit "reset to worker default" action.
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"`
APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup)
AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup)
LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
const (
@@ -79,17 +79,17 @@ const (
// gRPC Address (each replica is a separate worker process on its own port),
// and its own InFlight counter.
type NodeModel struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
ID string `gorm:"primaryKey;size:36" json:"id"`
NodeID string `gorm:"index;size:36" json:"node_id"`
ModelName string `gorm:"index;size:255" json:"model_name"`
ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"`
Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process
State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle
InFlight int `json:"in_flight"` // number of active requests on this replica
LastUsed time.Time `json:"last_used"`
LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading
BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads
ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
@@ -1287,7 +1287,7 @@ func (r *NodeRegistry) UpdateMaxReplicasPerModel(ctx context.Context, nodeID str
res := r.db.WithContext(ctx).Model(&BackendNode{}).
Where("id = ?", nodeID).
Updates(map[string]any{
ColMaxReplicasPerModel: n,
ColMaxReplicasPerModel: n,
"max_replicas_per_model_manually_set": true,
})
if res.Error != nil {
@@ -1460,7 +1460,7 @@ func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backe
NextRetryAt: time.Now(),
}
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}},
DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}),
}).Create(&row).Error
}
@@ -1515,6 +1515,22 @@ func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uin
})
}
// RecordPendingBackendOpInFlight is the "soft failure" cousin of
// RecordPendingBackendOpFailure. Used when a NATS install round-trip timed
// out but the worker is still installing in the background. Increments
// Attempts and stores the message in LastError, but pushes NextRetryAt out
// by `retryDelay` (typically the install timeout) so the reconciler does
// not immediately re-fire another install while the worker is still busy.
func (r *NodeRegistry) RecordPendingBackendOpInFlight(ctx context.Context, id uint, lastError string, retryDelay time.Duration) error {
return r.db.WithContext(ctx).Model(&PendingBackendOp{}).
Where("id = ?", id).
Updates(map[string]any{
"attempts": gorm.Expr("attempts + 1"),
"last_error": lastError,
"next_retry_at": time.Now().Add(retryDelay),
}).Error
}
// backoffForAttempt is exponential from 30s doubling up to a 15m cap. The
// reconciler tick is 30s so anything shorter would just re-fire immediately.
func backoffForAttempt(attempts int) time.Duration {

View File

@@ -68,6 +68,13 @@ func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingCli
}
}
// InstallTimeout returns the configured backend.install round-trip timeout.
// Used by DistributedBackendManager to push NextRetryAt out by this duration
// when a worker times out replying but is still installing in the background.
func (a *RemoteUnloaderAdapter) InstallTimeout() time.Duration {
return a.installTimeout
}
// UnloadRemoteModel finds the node(s) hosting the given model and tells them
// to stop their backend process via NATS backend.stop event.
// The worker process handles: Free() → kill process.