From 169ff7563388ea4bace3f4d155074759c6931d35 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 20:25:13 +0000 Subject: [PATCH] feat(distributed): treat NATS install timeout as in-progress, not failure When a worker times out replying to backend.install but the install is still running on the worker, enqueueAndDrainBackendOp now reports a running_on_worker status and pushes NextRetryAt out by the install timeout so the reconciler does not immediately re-fire another install while the worker is still pulling the image. The pending_backend_ops row stays in place for the next reconciler pass to confirm via backend.list. InstallBackend wraps the result in galleryop.ErrWorkerStillInstalling so callers can branch (galleryop renders yellow in-progress instead of red error). UpgradeBackend uses the same wrap. Adds RemoteUnloaderAdapter.InstallTimeout() so the manager can push NextRetryAt by the configured timeout without reaching into a private field, and NodeRegistry.RecordPendingBackendOpInFlight as the soft cousin of RecordPendingBackendOpFailure. Also includes incidental gofmt-driven struct-field alignment in registry.go on lines unrelated to the change (touched files are re-formatted to canonical form per project policy). Signed-off-by: Ettore Di Giacinto --- core/services/nodes/managers_distributed.go | 59 +++++++++++++- .../nodes/managers_distributed_test.go | 28 +++++++ core/services/nodes/registry.go | 80 +++++++++++-------- core/services/nodes/unloader.go | 7 ++ 4 files changed, 139 insertions(+), 35 deletions(-) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index b5a92fcbb..94fee2997 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -173,8 +173,25 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context } // Record failure for backoff. If it's an ErrNoResponders, the node's - // gone AWOL — mark unhealthy so the router stops picking it too. + // gone AWOL - mark unhealthy so the router stops picking it too. errMsg := applyErr.Error() + + // Worker-still-installing is a "soft" failure: the worker is most + // likely still pulling the OCI image. Keep the row, push NextRetryAt + // out so the reconciler does not immediately re-fire another install + // while the worker is still busy, and report the in-progress state + // to the caller. The next reconciler pass / backend.list confirms + // the actual outcome. + if errors.Is(applyErr, galleryop.ErrWorkerStillInstalling) { + if id, err := d.findPendingRow(ctx, node.ID, backend, op); err == nil { + _ = d.registry.RecordPendingBackendOpInFlight(ctx, id, errMsg, d.adapter.InstallTimeout()) + } + result.Nodes = append(result.Nodes, NodeOpStatus{ + NodeID: node.ID, NodeName: node.Name, Status: "running_on_worker", Error: errMsg, + }) + continue + } + if errors.Is(applyErr, nats.ErrNoResponders) { xlog.Warn("No NATS responders for node, marking unhealthy", "node", node.Name, "nodeID", node.ID) d.registry.MarkUnhealthy(ctx, node.ID) @@ -361,7 +378,19 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall if err != nil { return err } - return result.Err() + if hardErr := result.Err(); hardErr != nil { + return hardErr + } + // No hard failures, but if at least one node reported running_on_worker, + // surface a wrapped ErrWorkerStillInstalling so galleryop can render a + // yellow in-progress state instead of green success. The reconciler + // will confirm the actual outcome on its next pass via backend.list. + for _, n := range result.Nodes { + if n.Status == "running_on_worker" { + return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes)) + } + } + return nil } // UpgradeBackend uses a separate NATS subject (backend.upgrade) so the slow @@ -417,7 +446,18 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str if err != nil { return err } - return result.Err() + if hardErr := result.Err(); hardErr != nil { + return hardErr + } + // Same in-progress surfacing as InstallBackend: a long-running worker + // upgrade that timed out the NATS round-trip must not be reported as + // green success. + for _, n := range result.Nodes { + if n.Status == "running_on_worker" { + return fmt.Errorf("%w: %s", galleryop.ErrWorkerStillInstalling, summarizeRunningOnWorker(result.Nodes)) + } + } + return nil } // IsDistributed reports that installs from this manager fan out across the @@ -443,3 +483,16 @@ func (d *DistributedBackendManager) CheckUpgrades(ctx context.Context) (map[stri // it used to come from the empty frontend filesystem. return gallery.CheckUpgradesAgainst(ctx, d.backendGalleries, d.systemState, installed) } + +// summarizeRunningOnWorker builds a short human-readable summary of which +// nodes are still installing in the background, for inclusion in the +// wrapped ErrWorkerStillInstalling error. +func summarizeRunningOnWorker(nodes []NodeOpStatus) string { + var names []string + for _, n := range nodes { + if n.Status == "running_on_worker" { + names = append(names, n.NodeName) + } + } + return strings.Join(names, ", ") +} diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index 10df15214..e30f2fe40 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -3,6 +3,7 @@ package nodes import ( "context" "encoding/json" + "errors" "runtime" "sync" "time" @@ -352,6 +353,33 @@ var _ = Describe("DistributedBackendManager", func() { Expect(mc.calls).To(BeEmpty()) }) }) + + Context("when InstallBackend times out on a worker", func() { + It("returns galleryop.ErrWorkerStillInstalling and keeps the queue row with NextRetryAt pushed out", func() { + n := registerHealthyBackend("slow", "10.0.0.1:50051") + + // Script a NATS timeout on the install subject. The adapter + // wraps this into galleryop.ErrWorkerStillInstalling, which + // the manager should treat as a soft failure. + mc.scriptErr(messaging.SubjectNodeBackendInstall(n.ID), nats.ErrTimeout) + + err := mgr.InstallBackend(ctx, op("vllm"), nil) + Expect(err).To(HaveOccurred()) + Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(), + "expected wrapped ErrWorkerStillInstalling, got %v", err) + + rows, err := registry.ListPendingBackendOps(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(rows).To(HaveLen(1)) + Expect(rows[0].Backend).To(Equal("vllm")) + // The adapter is configured with a 3m install timeout in this + // suite (NewRemoteUnloaderAdapter above). NextRetryAt should + // be ~now+3m; a > now+2m bound is safe-but-tight enough to + // catch the buggy short default (30s exponential backoff). + Expect(rows[0].NextRetryAt).To(BeTemporally(">", time.Now().Add(2*time.Minute)), + "NextRetryAt should be pushed to ~now+installTimeout, not the short default") + }) + }) }) Describe("UpgradeBackend", func() { diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 8849cf6e2..5869f8e6a 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -17,24 +17,24 @@ import ( // Workers are generic — they don't have a fixed backend type. // The SmartRouter dynamically installs backends via NATS backend.install events. type BackendNode struct { - ID string `gorm:"primaryKey;size:36" json:"id"` - Name string `gorm:"uniqueIndex;size:255" json:"name"` - NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent - Address string `gorm:"size:255" json:"address"` // host:port for gRPC - HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer - Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending - TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token - TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes - AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes + ID string `gorm:"primaryKey;size:36" json:"id"` + Name string `gorm:"uniqueIndex;size:255" json:"name"` + NodeType string `gorm:"size:32;default:backend" json:"node_type"` // backend, agent + Address string `gorm:"size:255" json:"address"` // host:port for gRPC + HTTPAddress string `gorm:"size:255" json:"http_address"` // host:port for HTTP file transfer + Status string `gorm:"size:32;default:registering" json:"status"` // registering, healthy, unhealthy, draining, pending + TokenHash string `gorm:"size:64" json:"-"` // SHA-256 of registration token + TotalVRAM uint64 `gorm:"column:total_vram" json:"total_vram"` // Total GPU VRAM in bytes + AvailableVRAM uint64 `gorm:"column:available_vram" json:"available_vram"` // Available GPU VRAM in bytes // ReservedVRAM is a soft, in-tick reservation deducted by the scheduler when // it picks this node to load a model. Workers reset it back to 0 on each // heartbeat (the worker is the source of truth for actual free VRAM); the // reservation is only here to keep two scheduling decisions within the // same heartbeat window from over-committing the same node. - ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"` - TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU) - AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes - GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown + ReservedVRAM uint64 `gorm:"column:reserved_vram;default:0" json:"reserved_vram"` + TotalRAM uint64 `gorm:"column:total_ram" json:"total_ram"` // Total system RAM in bytes (fallback when no GPU) + AvailableRAM uint64 `gorm:"column:available_ram" json:"available_ram"` // Available system RAM in bytes + GPUVendor string `gorm:"column:gpu_vendor;size:32" json:"gpu_vendor"` // nvidia, amd, intel, vulkan, unknown // MaxReplicasPerModel caps how many replicas of any one model can run on // this node concurrently. Default 1 preserves the historical "one // (node, model)" assumption; set higher (via worker --max-replicas-per-model) @@ -44,12 +44,12 @@ type BackendNode struct { // admin override. When true, the worker's CLI value is ignored on // re-registration so the override survives worker restarts. Cleared // by an explicit "reset to worker default" action. - MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"` - APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup) - AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup) - LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + MaxReplicasPerModelManuallySet bool `gorm:"column:max_replicas_per_model_manually_set;default:false" json:"max_replicas_per_model_manually_set"` + APIKeyID string `gorm:"size:36" json:"-"` // auto-provisioned API key ID (for cleanup) + AuthUserID string `gorm:"size:36" json:"-"` // auto-provisioned user ID (for cleanup) + LastHeartbeat time.Time `gorm:"column:last_heartbeat" json:"last_heartbeat"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } const ( @@ -79,17 +79,17 @@ const ( // gRPC Address (each replica is a separate worker process on its own port), // and its own InFlight counter. type NodeModel struct { - ID string `gorm:"primaryKey;size:36" json:"id"` - NodeID string `gorm:"index;size:36" json:"node_id"` - ModelName string `gorm:"index;size:255" json:"model_name"` - ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"` - Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process - State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle - InFlight int `json:"in_flight"` // number of active requests on this replica - LastUsed time.Time `json:"last_used"` - LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading - BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads - ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups + ID string `gorm:"primaryKey;size:36" json:"id"` + NodeID string `gorm:"index;size:36" json:"node_id"` + ModelName string `gorm:"index;size:255" json:"model_name"` + ReplicaIndex int `gorm:"column:replica_index;default:0;index" json:"replica_index"` + Address string `gorm:"size:255" json:"address"` // gRPC address for this replica's backend process + State string `gorm:"size:32;default:idle" json:"state"` // loading, loaded, unloading, idle + InFlight int `json:"in_flight"` // number of active requests on this replica + LastUsed time.Time `json:"last_used"` + LoadingBy string `gorm:"size:36" json:"loading_by,omitempty"` // frontend ID that triggered loading + BackendType string `gorm:"size:128" json:"backend_type,omitempty"` // e.g. "llama-cpp"; used by reconciler to replicate loads + ModelOptsBlob []byte `gorm:"type:bytea" json:"-"` // serialized pb.ModelOptions for replica scale-ups CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } @@ -1287,7 +1287,7 @@ func (r *NodeRegistry) UpdateMaxReplicasPerModel(ctx context.Context, nodeID str res := r.db.WithContext(ctx).Model(&BackendNode{}). Where("id = ?", nodeID). Updates(map[string]any{ - ColMaxReplicasPerModel: n, + ColMaxReplicasPerModel: n, "max_replicas_per_model_manually_set": true, }) if res.Error != nil { @@ -1460,7 +1460,7 @@ func (r *NodeRegistry) UpsertPendingBackendOp(ctx context.Context, nodeID, backe NextRetryAt: time.Now(), } return r.db.WithContext(ctx).Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}}, + Columns: []clause.Column{{Name: "node_id"}, {Name: "backend"}, {Name: "op"}}, DoUpdates: clause.AssignmentColumns([]string{"galleries", "next_retry_at"}), }).Create(&row).Error } @@ -1515,6 +1515,22 @@ func (r *NodeRegistry) RecordPendingBackendOpFailure(ctx context.Context, id uin }) } +// RecordPendingBackendOpInFlight is the "soft failure" cousin of +// RecordPendingBackendOpFailure. Used when a NATS install round-trip timed +// out but the worker is still installing in the background. Increments +// Attempts and stores the message in LastError, but pushes NextRetryAt out +// by `retryDelay` (typically the install timeout) so the reconciler does +// not immediately re-fire another install while the worker is still busy. +func (r *NodeRegistry) RecordPendingBackendOpInFlight(ctx context.Context, id uint, lastError string, retryDelay time.Duration) error { + return r.db.WithContext(ctx).Model(&PendingBackendOp{}). + Where("id = ?", id). + Updates(map[string]any{ + "attempts": gorm.Expr("attempts + 1"), + "last_error": lastError, + "next_retry_at": time.Now().Add(retryDelay), + }).Error +} + // backoffForAttempt is exponential from 30s doubling up to a 15m cap. The // reconciler tick is 30s so anything shorter would just re-fire immediately. func backoffForAttempt(attempts int) time.Duration { diff --git a/core/services/nodes/unloader.go b/core/services/nodes/unloader.go index dee967a9c..8bc8d5808 100644 --- a/core/services/nodes/unloader.go +++ b/core/services/nodes/unloader.go @@ -68,6 +68,13 @@ func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingCli } } +// InstallTimeout returns the configured backend.install round-trip timeout. +// Used by DistributedBackendManager to push NextRetryAt out by this duration +// when a worker times out replying but is still installing in the background. +func (a *RemoteUnloaderAdapter) InstallTimeout() time.Duration { + return a.installTimeout +} + // UnloadRemoteModel finds the node(s) hosting the given model and tells them // to stop their backend process via NATS backend.stop event. // The worker process handles: Free() → kill process.