Mirror of https://github.com/mudler/LocalAI.git
* fix(distributed): detect backend upgrades across worker nodes
Before this change `DistributedBackendManager.CheckUpgrades` delegated to the
local manager, which read backends from the frontend filesystem. In
distributed deployments the frontend has no backends installed locally —
they live on workers — so the upgrade-detection loop never ran and the UI
silently never surfaced upgrades even when the gallery advertised newer
versions or digests.
Worker-side: NATS backend.list reply now carries Version, URI and Digest
for each installed backend (read from metadata.json).
Frontend-side: DistributedBackendManager.ListBackends aggregates per-node
refs (name, status, version, digest) instead of deduping, and CheckUpgrades
feeds that aggregation into gallery.CheckUpgradesAgainst — a new entrypoint
factored out of CheckBackendUpgrades so both paths share the same core
logic.
Cluster drift policy: when per-node version/digest tuples disagree, the
backend is flagged upgradeable regardless of whether any single node
matches the gallery, and UpgradeInfo.NodeDrift enumerates the outliers so
operators can see *why* it is out of sync. The next upgrade-all realigns
the cluster.
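A minimal sketch of that drift rule, with illustrative type and function
names (the real logic lives in gallery.CheckUpgradesAgainst and
UpgradeInfo.NodeDrift, not in this snippet):

    package sketch

    // nodeRef is one node's view of an installed backend.
    type nodeRef struct {
        Node, Version, Digest string
    }

    // summarizeDrift reports whether per-node (version, digest) tuples disagree
    // and, if so, which nodes differ from the majority tuple.
    func summarizeDrift(refs []nodeRef) (drifted bool, outliers []string) {
        counts := map[[2]string]int{}
        for _, r := range refs {
            counts[[2]string{r.Version, r.Digest}]++
        }
        if len(counts) <= 1 {
            return false, nil // unanimous: no drift
        }
        // The majority tuple is the baseline; everything else is an outlier.
        var major [2]string
        best := -1
        for tuple, n := range counts {
            if n > best {
                best, major = n, tuple
            }
        }
        for _, r := range refs {
            if [2]string{r.Version, r.Digest} != major {
                outliers = append(outliers, r.Node)
            }
        }
        return true, outliers
    }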
Tests cover: drift detection, unanimous-match (no upgrade), and the
empty-installed-version path that the old distributed code silently
missed.
* feat(ui): surface backend upgrades in the System page
The System page (Manage.jsx) only showed updates as a tiny inline arrow,
so operators routinely missed them. Port the Backend Gallery's upgrade UX
so System speaks the same visual language:
- Yellow banner at the top of the Backends tab when upgrades are pending,
with an "Upgrade all" button (serial fan-out, matches the gallery) and a
"Updates only" filter toggle.
- Warning pill (↑ N) next to the tab label so the count is glanceable even
when the banner is scrolled out of view.
- Per-row labeled "Upgrade to vX.Y" button (replaces the icon-only button
that silently flipped semantics between Reinstall and Upgrade), plus an
"Update available" badge in the new Version column.
- New columns: Version (with upgrade + drift chips), Nodes (per-node
attribution badges in distributed mode, degrading to a compact
"on N nodes · M offline" chip once there are more than three nodes),
and Installed (relative time).
- System backends render a "Protected" chip instead of a bare "—" so rows
still align and the reason is obvious.
- Delete uses the softer btn-danger-ghost so rows don't scream red; the
ConfirmDialog still owns the "are you sure".
The upgrade checker also needed the same per-worker fix as the previous
commit: NewUpgradeChecker now takes a BackendManager getter so its
periodic runs call the distributed CheckUpgrades (which asks workers)
instead of the empty frontend filesystem. Without this the /api/backends/
upgrades endpoint stayed empty in distributed mode even with the protocol
change in place.
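A rough sketch of that getter-based wiring (interface and function names
here are illustrative assumptions, not LocalAI's actual API):

    package sketch

    import (
        "context"
        "time"
    )

    // BackendManager is whatever exposes CheckUpgrades; in distributed mode the
    // getter would return the manager that asks workers over NATS.
    type BackendManager interface {
        CheckUpgrades(ctx context.Context) error
    }

    // UpgradeChecker resolves the manager on every tick, so periodic runs pick
    // up a late-bound distributed manager instead of the empty local filesystem.
    type UpgradeChecker struct {
        getManager func() BackendManager
        interval   time.Duration
    }

    func NewUpgradeChecker(get func() BackendManager, every time.Duration) *UpgradeChecker {
        return &UpgradeChecker{getManager: get, interval: every}
    }

    func (c *UpgradeChecker) Run(ctx context.Context) {
        t := time.NewTicker(c.interval)
        defer t.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-t.C:
                if m := c.getManager(); m != nil {
                    _ = m.CheckUpgrades(ctx) // the real service would log this error
                }
            }
        }
    }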
New CSS primitives — .upgrade-banner, .tab-pill, .badge-row, .cell-stack,
.cell-mono, .cell-muted, .row-actions, .btn-danger-ghost — all live in
App.css so other pages can adopt them without duplicating styles.
* feat(ui): polish the Nodes page so it reads like a product
The Nodes page was the biggest visual liability in distributed mode.
Rework the main dashboard surfaces in place without changing behavior:
StatCards: uniform height (96px min), left accent bar colored by the
metric's semantic (success/warning/error/primary), icon lives in a
36x36 soft-tinted chip top-right, value is left-aligned and large.
Grid auto-fills so the row doesn't collapse on narrow viewports. This
replaces the previous thin-bordered boxes with inconsistent heights.
Table rows: expandable rows now show a chevron cue on the left (rotates
on expand) so users know rows open. Status cell became a dedicated chip
with an LED-style halo dot instead of a bare bullet. Action buttons gained
labels — "Approve", "Resume", "Drain" — so the icons aren't doing all
the semantic work; the destructive remove action uses the softer
btn-danger-ghost variant so rows don't scream red, with the ConfirmDialog
still owning the real "are you sure". Applied cell-mono/cell-muted
utility classes so label chips and addresses share one spacing/font
grammar instead of re-declaring inline styles everywhere.
Expanded drawer: empty states for Loaded Models and Installed Backends
now render as a proper drawer-empty card (dashed border, icon, one-line
hint) instead of a plain muted string that read like broken formatting.
Tabs: three inline-styled buttons became the shared .tab class so they
inherit focus ring, hover state, and the rest of the design system —
matches the System page.
"Add more workers" toggle turned into a .nodes-add-worker dashed-border
button labelled "Register a new worker" (action voice) instead of a
chevron + muted link that operators kept mistaking for broken text.
New shared CSS primitives carry over to other pages:
.stat-grid + .stat-card, .row-chevron, .node-status, .drawer-empty,
.nodes-add-worker.
* feat(distributed): durable backend fan-out + state reconciliation
Two connected problems handled together:
1) Backend delete/install/upgrade used to silently skip non-healthy nodes,
so a delete during an outage left a zombie on the offline node once it
returned. The fan-out now records intent in a new pending_backend_ops
table before attempting the NATS round-trip. Currently-healthy nodes
get an immediate attempt; everyone else is queued. Unique index on
(node_id, backend, op) means reissuing the same operation refreshes
next_retry_at instead of stacking duplicates.
2) Loaded-model state could drift from reality: a worker that OOM'd, got
killed, or restarted its backend process would leave behind a node_models
row claiming the model was still loaded, feeding ghost entries into the
/api/nodes/models listing and the router's scheduling decisions.
The existing ReplicaReconciler gains two new passes that run under a
fresh KeyStateReconciler advisory lock (non-blocking, so one wedged
frontend doesn't freeze the cluster):
- drainPendingBackendOps: retries queued ops on currently-healthy nodes
whose next_retry_at has passed. Success deletes the row; failure bumps
attempts and pushes next_retry_at out with exponential backoff
(30s → 15m cap; see the sketch after this list). ErrNoResponders also
marks the node unhealthy.
- probeLoadedModels: runs gRPC health checks against addresses the DB
thinks are loaded but hasn't seen touched within probeStaleAfter (2m).
Unreachable addresses are removed from the registry. A pluggable
ModelProber lets tests substitute a fake without standing up gRPC.
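For reference, the backoff math described above as a small self-contained
sketch; the 30s base and 15m cap come from the text, while the helper name
is made up:

    package sketch

    import "time"

    // nextRetryAt doubles the delay on every failed attempt, starting at 30s
    // and capping at 15m, then schedules the next try relative to now.
    func nextRetryAt(now time.Time, attempts int) time.Time {
        const (
            baseDelay = 30 * time.Second
            maxDelay  = 15 * time.Minute
        )
        delay := baseDelay
        for i := 0; i < attempts && delay < maxDelay; i++ {
            delay *= 2
        }
        if delay > maxDelay {
            delay = maxDelay
        }
        return now.Add(delay)
    }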
DistributedBackendManager exposes DeleteBackendDetailed so the HTTP
handler can surface per-node outcomes ("2 succeeded, 1 queued") to the
UI in a follow-up commit; the existing DeleteBackend still returns
error-only for callers that don't care about node breakdown.
Multi-frontend safety: the state pass uses advisorylock.TryWithLockCtx
on a new key so N frontends coordinate — the same pattern the health
monitor and replica reconciler already rely on. Single-node mode runs
both passes inline (adapter is nil, state drain is a no-op).
Tests cover the upsert semantics, backoff math, the probe removing an
unreachable model but keeping a reachable one, and filtering by
probeStaleAfter.
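The pluggable-prober seam mentioned above might look roughly like this;
the interface and names are assumptions, not the actual LocalAI types:

    package sketch

    import "context"

    // ModelProber answers whether a loaded-model address is still reachable.
    // Production would wire this to a gRPC health check; tests plug in a fake.
    type ModelProber interface {
        Probe(ctx context.Context, addr string) (alive bool, err error)
    }

    // fakeProber is the kind of stub a test could substitute for real gRPC.
    type fakeProber struct{ alive map[string]bool }

    func (f fakeProber) Probe(_ context.Context, addr string) (bool, error) {
        return f.alive[addr], nil
    }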
* feat(ui): show cluster distribution of models in the System page
When a frontend restarted in distributed mode, models that workers had
already loaded weren't visible until the operator clicked into each node
manually — the /api/models/capabilities endpoint only knew about
configs on the frontend's filesystem, not the registry-backed truth.
/api/models/capabilities now joins in ListAllLoadedModels() when the
registry is active, returning loaded_on[] with node id/name/state/status
for each model. Models that live in the registry but lack a local config
(the actual ghosts, not recovered from the frontend's file cache) still
surface with source="registry-only" so operators can see and persist
them; without that emission they'd be invisible to this frontend.
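The implied response shape, sketched with guessed field names rather than
the exact LocalAI structs:

    package sketch

    // LoadedOn sketches one entry of the loaded_on[] array described above.
    type LoadedOn struct {
        ID     string `json:"id"`
        Name   string `json:"name"`
        State  string `json:"state"`  // e.g. "loaded", "loading"
        Status string `json:"status"` // node health as the registry sees it
    }

    // ModelCapability sketches a per-model record from /api/models/capabilities.
    type ModelCapability struct {
        Name     string     `json:"name"`
        Source   string     `json:"source,omitempty"` // "registry-only" for ghosts with no local config
        LoadedOn []LoadedOn `json:"loaded_on,omitempty"`
    }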
Manage → Models replaces the old Running/Idle pill with a distribution
cell that lists the first three nodes the model is loaded on as chips
colored by state (green loaded, blue loading, amber anything else). On
wider clusters the remaining count collapses into a +N chip with a
title-attribute breakdown. Disabled / single-node behavior unchanged.
Adopted models get an extra "Adopted" ghost-icon chip with hover copy
explaining what it means and how to make it permanent.
Distributed mode also enables a 10s auto-refresh and a "Last synced Xs
ago" indicator next to the Update button so ghost rows drop off within
one reconcile tick after their owning process dies. Non-distributed
mode is untouched — no polling, no cell-stack, same old Running/Idle.
* feat(ui): NodeDistributionChip — shared per-node attribution component
Large clusters were going to break the Manage → Backends Nodes column:
the old inline logic rendered every node as a badge and would shred the
layout at >10 workers, plus the Manage → Models distribution cell had
copy-pasted its own slightly-different version.
NodeDistributionChip handles any cluster size with two render modes:
- small (≤3 nodes): inline chips of node names, colored by health.
- large: a single "on N nodes · M offline · K drift" summary chip;
clicking opens a Popover with a per-node table (name, status,
version, digest for backends; name, status, state for models).
Drift counting mirrors the backend's summarizeNodeDrift so the UI
number matches UpgradeInfo.NodeDrift. Digests are truncated to the
docker-style 12-char form with the full value preserved in the title.
Popover is a new general-purpose primitive: fixed positioning anchored
to the trigger, flips above when there's no room below, closes on
outside-click or Escape, returns focus to the trigger. Uses .card as
its surface so theming is inherited. Also useful for a future
labels-editor popup and the user menu.
Manage.jsx drops its duplicated inline Nodes-column + loaded_on cell
and uses the shared chip with context="backends" / "models"
respectively, deleting ~40 lines of ad-hoc logic.
* feat(ui): shared FilterBar across the System page tabs
The Backends gallery had a nice search + chip + toggle strip; the System
page had nothing, so the two surfaces felt like different apps. Lift the
pattern into a reusable FilterBar and wire both System tabs through it.
New component core/http/react-ui/src/components/FilterBar.jsx renders a
search input, a role="tablist" chip row (aria-selected for a11y), and
optional toggles / right slot. Chips support an optional `count` which
the System page uses to show "User 3", "Updates 1" etc.
System Models tab: search by id or backend; chips for
All/Running/Idle/Disabled/Pinned plus a conditional Distributed chip in
distributed mode. "Last synced" + Update button live in the right slot.
System Backends tab: search by name/alias/meta-backend-for; chips for
All/User/System/Meta plus conditional Updates / Offline-nodes chips
when relevant. The old ad-hoc "Updates only" toggle from the upgrade
banner folded into the Updates chip — one source of truth for that
filter. Offline chip only appears in distributed mode when at least
one backend has an unhealthy node, so the chip row stays quiet on
healthy clusters.
Filter state persists in URL query params (mq/mf/bq/bf) so deep links
and tab switches keep the operator's filter context instead of
resetting every time.
Also adds an "Adopted" distribution path: when a model in
/api/models/capabilities carries source="registry-only" (discovered on
a worker but not configured locally), the Models tab shows a ghost chip
labelled "Adopted" with hover copy explaining how to persist it — this
is what closes the loop on the ghost-model story end-to-end.
package cli

import (
	"cmp"
	"context"
	"encoding/json"
	"fmt"
	"maps"
	"net"
	"os"
	"os/signal"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	cliContext "github.com/mudler/LocalAI/core/cli/context"
	"github.com/mudler/LocalAI/core/cli/workerregistry"
	"github.com/mudler/LocalAI/core/config"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/services/nodes"
	"github.com/mudler/LocalAI/core/services/storage"
	grpc "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/LocalAI/pkg/model"
	"github.com/mudler/LocalAI/pkg/sanitize"
	"github.com/mudler/LocalAI/pkg/system"
	"github.com/mudler/LocalAI/pkg/xsysinfo"
	process "github.com/mudler/go-processmanager"
	"github.com/mudler/xlog"
)

// isPathAllowed checks if path is within one of the allowed directories.
func isPathAllowed(path string, allowedDirs []string) bool {
	absPath, err := filepath.Abs(path)
	if err != nil {
		return false
	}
	resolved, err := filepath.EvalSymlinks(absPath)
	if err != nil {
		// Path may not exist yet; use the absolute path
		resolved = absPath
	}
	for _, dir := range allowedDirs {
		absDir, err := filepath.Abs(dir)
		if err != nil {
			continue
		}
		if strings.HasPrefix(resolved, absDir+string(filepath.Separator)) || resolved == absDir {
			return true
		}
	}
	return false
}

// WorkerCMD starts a generic worker process for distributed mode.
// Workers are backend-agnostic — they wait for backend.install NATS events
// from the SmartRouter to install and start the required backend.
//
// NATS is required. The worker acts as a process supervisor:
// - Receives backend.install → installs backend from gallery, starts gRPC process, replies success
// - Receives backend.stop → stops the gRPC process
// - Receives stop → full shutdown (deregister + exit)
//
// Model loading (LoadModel) is always via direct gRPC — no NATS needed for that.
type WorkerCMD struct {
	// Primary address — the reachable address of this worker.
	// Host is used for advertise, port is the base for gRPC backends.
	// HTTP file transfer runs on port-1.
	Addr      string `env:"LOCALAI_ADDR" help:"Address where this worker is reachable (host:port). Port is base for gRPC backends, port-1 for HTTP." group:"server"`
	ServeAddr string `env:"LOCALAI_SERVE_ADDR" default:"0.0.0.0:50051" help:"(Advanced) gRPC base port bind address" group:"server" hidden:""`

	BackendsPath       string `env:"LOCALAI_BACKENDS_PATH,BACKENDS_PATH" type:"path" default:"${basepath}/backends" help:"Path containing backends" group:"server"`
	BackendsSystemPath string `env:"LOCALAI_BACKENDS_SYSTEM_PATH" type:"path" default:"/var/lib/local-ai/backends" help:"Path containing system backends" group:"server"`
	BackendGalleries   string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"server" default:"${backends}"`
	ModelsPath         string `env:"LOCALAI_MODELS_PATH,MODELS_PATH" type:"path" default:"${basepath}/models" help:"Path containing models" group:"server"`

	// HTTP file transfer
	HTTPAddr          string `env:"LOCALAI_HTTP_ADDR" default:"" help:"HTTP file transfer server address (default: gRPC port + 1)" group:"server" hidden:""`
	AdvertiseHTTPAddr string `env:"LOCALAI_ADVERTISE_HTTP_ADDR" help:"HTTP address the frontend uses to reach this node for file transfer" group:"server" hidden:""`

	// Registration (required)
	AdvertiseAddr     string `env:"LOCALAI_ADVERTISE_ADDR" help:"Address the frontend uses to reach this node (defaults to hostname:port from Addr)" group:"registration" hidden:""`
	RegisterTo        string `env:"LOCALAI_REGISTER_TO" required:"" help:"Frontend URL for registration" group:"registration"`
	NodeName          string `env:"LOCALAI_NODE_NAME" help:"Node name for registration (defaults to hostname)" group:"registration"`
	RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token for authenticating with the frontend" group:"registration"`
	HeartbeatInterval string `env:"LOCALAI_HEARTBEAT_INTERVAL" default:"10s" help:"Interval between heartbeats" group:"registration"`
	NodeLabels        string `env:"LOCALAI_NODE_LABELS" help:"Comma-separated key=value labels for this node (e.g. tier=fast,gpu=a100)" group:"registration"`

	// NATS (required)
	NatsURL string `env:"LOCALAI_NATS_URL" required:"" help:"NATS server URL" group:"distributed"`

	// S3 storage for distributed file transfer
	StorageURL       string `env:"LOCALAI_STORAGE_URL" help:"S3 endpoint URL" group:"distributed"`
	StorageBucket    string `env:"LOCALAI_STORAGE_BUCKET" help:"S3 bucket name" group:"distributed"`
	StorageRegion    string `env:"LOCALAI_STORAGE_REGION" help:"S3 region" group:"distributed"`
	StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key" group:"distributed"`
	StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret key" group:"distributed"`
}

func (cmd *WorkerCMD) Run(ctx *cliContext.Context) error {
	xlog.Info("Starting worker", "advertise", cmd.advertiseAddr(), "basePort", cmd.effectiveBasePort())

	systemState, err := system.GetSystemState(
		system.WithModelPath(cmd.ModelsPath),
		system.WithBackendPath(cmd.BackendsPath),
		system.WithBackendSystemPath(cmd.BackendsSystemPath),
	)
	if err != nil {
		return fmt.Errorf("getting system state: %w", err)
	}

	ml := model.NewModelLoader(systemState)
	ml.SetBackendLoggingEnabled(true)

	// Register already-installed backends
	gallery.RegisterBackends(systemState, ml)

	// Parse galleries config
	var galleries []config.Gallery
	if err := json.Unmarshal([]byte(cmd.BackendGalleries), &galleries); err != nil {
		xlog.Warn("Failed to parse backend galleries", "error", err)
	}

	// Self-registration with frontend (with retry)
	regClient := &workerregistry.RegistrationClient{
		FrontendURL:       cmd.RegisterTo,
		RegistrationToken: cmd.RegistrationToken,
	}

	registrationBody := cmd.registrationBody()
	nodeID, _, err := regClient.RegisterWithRetry(context.Background(), registrationBody, 10)
	if err != nil {
		return fmt.Errorf("failed to register with frontend: %w", err)
	}

	xlog.Info("Registered with frontend", "nodeID", nodeID, "frontend", cmd.RegisterTo)
	heartbeatInterval, err := time.ParseDuration(cmd.HeartbeatInterval)
	if err != nil && cmd.HeartbeatInterval != "" {
		xlog.Warn("invalid heartbeat interval, using default 10s", "input", cmd.HeartbeatInterval, "error", err)
	}
	heartbeatInterval = cmp.Or(heartbeatInterval, 10*time.Second)
	// Context cancelled on shutdown — used by heartbeat and other background goroutines
	shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
	defer shutdownCancel()

	// Start HTTP file transfer server
	httpAddr := cmd.resolveHTTPAddr()
	stagingDir := filepath.Join(cmd.ModelsPath, "..", "staging")
	dataDir := filepath.Join(cmd.ModelsPath, "..", "data")
	httpServer, err := nodes.StartFileTransferServer(httpAddr, stagingDir, cmd.ModelsPath, dataDir, cmd.RegistrationToken, config.DefaultMaxUploadSize, ml.BackendLogs())
	if err != nil {
		return fmt.Errorf("starting HTTP file transfer server: %w", err)
	}

	// Connect to NATS
	xlog.Info("Connecting to NATS", "url", sanitize.URL(cmd.NatsURL))
	natsClient, err := messaging.New(cmd.NatsURL)
	if err != nil {
		nodes.ShutdownFileTransferServer(httpServer)
		return fmt.Errorf("connecting to NATS: %w", err)
	}
	defer natsClient.Close()

	// Start heartbeat goroutine (after NATS is connected so IsConnected check works)
	go func() {
		ticker := time.NewTicker(heartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-shutdownCtx.Done():
				return
			case <-ticker.C:
				if !natsClient.IsConnected() {
					xlog.Warn("Skipping heartbeat: NATS disconnected")
					continue
				}
				body := cmd.heartbeatBody()
				if err := regClient.Heartbeat(shutdownCtx, nodeID, body); err != nil {
					xlog.Warn("Heartbeat failed", "error", err)
				}
			}
		}
	}()

	// Process supervisor — manages multiple backend gRPC processes on different ports
	basePort := cmd.effectiveBasePort()
	// Buffered so NATS stop handler can send without blocking
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// Set the registration token once before any backends are started
	if cmd.RegistrationToken != "" {
		os.Setenv(grpc.AuthTokenEnvVar, cmd.RegistrationToken)
	}

	supervisor := &backendSupervisor{
		cmd:         cmd,
		ml:          ml,
		systemState: systemState,
		galleries:   galleries,
		nodeID:      nodeID,
		nats:        natsClient,
		sigCh:       sigCh,
		processes:   make(map[string]*backendProcess),
		nextPort:    basePort,
	}
	supervisor.subscribeLifecycleEvents()

	// Subscribe to file staging NATS subjects if S3 is configured
	if cmd.StorageURL != "" {
		if err := cmd.subscribeFileStaging(natsClient, nodeID); err != nil {
			xlog.Error("Failed to subscribe to file staging subjects", "error", err)
		}
	}

	xlog.Info("Worker ready, waiting for backend.install events")
	<-sigCh

	xlog.Info("Shutting down worker")
	shutdownCancel() // stop heartbeat loop immediately
	regClient.GracefulDeregister(nodeID)
	supervisor.stopAllBackends()
	nodes.ShutdownFileTransferServer(httpServer)
	return nil
}

// subscribeFileStaging subscribes to NATS file staging subjects for this node.
func (cmd *WorkerCMD) subscribeFileStaging(natsClient messaging.MessagingClient, nodeID string) error {
	// Create FileManager with same S3 config as the frontend
	// TODO: propagate a caller-provided context once WorkerCMD carries one
	s3Store, err := storage.NewS3Store(context.Background(), storage.S3Config{
		Endpoint:        cmd.StorageURL,
		Region:          cmd.StorageRegion,
		Bucket:          cmd.StorageBucket,
		AccessKeyID:     cmd.StorageAccessKey,
		SecretAccessKey: cmd.StorageSecretKey,
		ForcePathStyle:  true,
	})
	if err != nil {
		return fmt.Errorf("initializing S3 store: %w", err)
	}

	cacheDir := filepath.Join(cmd.ModelsPath, "..", "cache")
	fm, err := storage.NewFileManager(s3Store, cacheDir)
	if err != nil {
		return fmt.Errorf("initializing file manager: %w", err)
	}

	// Subscribe: files.ensure — download S3 key to local, reply with local path
	natsClient.SubscribeReply(messaging.SubjectNodeFilesEnsure(nodeID), func(data []byte, reply func([]byte)) {
		var req struct {
			Key string `json:"key"`
		}
		if err := json.Unmarshal(data, &req); err != nil {
			replyJSON(reply, map[string]string{"error": "invalid request"})
			return
		}

		localPath, err := fm.Download(context.Background(), req.Key)
		if err != nil {
			xlog.Error("File ensure failed", "key", req.Key, "error", err)
			replyJSON(reply, map[string]string{"error": err.Error()})
			return
		}

		xlog.Debug("File ensured locally", "key", req.Key, "path", localPath)
		replyJSON(reply, map[string]string{"local_path": localPath})
	})

	// Subscribe: files.stage — upload local path to S3, reply with key
	natsClient.SubscribeReply(messaging.SubjectNodeFilesStage(nodeID), func(data []byte, reply func([]byte)) {
		var req struct {
			LocalPath string `json:"local_path"`
			Key       string `json:"key"`
		}
		if err := json.Unmarshal(data, &req); err != nil {
			replyJSON(reply, map[string]string{"error": "invalid request"})
			return
		}

		allowedDirs := []string{cacheDir}
		if cmd.ModelsPath != "" {
			allowedDirs = append(allowedDirs, cmd.ModelsPath)
		}
		if !isPathAllowed(req.LocalPath, allowedDirs) {
			replyJSON(reply, map[string]string{"error": "path outside allowed directories"})
			return
		}

		if err := fm.Upload(context.Background(), req.Key, req.LocalPath); err != nil {
			xlog.Error("File stage failed", "path", req.LocalPath, "key", req.Key, "error", err)
			replyJSON(reply, map[string]string{"error": err.Error()})
			return
		}

		xlog.Debug("File staged to S3", "path", req.LocalPath, "key", req.Key)
		replyJSON(reply, map[string]string{"key": req.Key})
	})

	// Subscribe: files.temp — allocate temp file, reply with local path
	natsClient.SubscribeReply(messaging.SubjectNodeFilesTemp(nodeID), func(data []byte, reply func([]byte)) {
		tmpDir := filepath.Join(cacheDir, "staging-tmp")
		if err := os.MkdirAll(tmpDir, 0750); err != nil {
			replyJSON(reply, map[string]string{"error": fmt.Sprintf("creating temp dir: %v", err)})
			return
		}

		f, err := os.CreateTemp(tmpDir, "localai-staging-*.tmp")
		if err != nil {
			replyJSON(reply, map[string]string{"error": fmt.Sprintf("creating temp file: %v", err)})
			return
		}
		localPath := f.Name()
		f.Close()

		xlog.Debug("Allocated temp file", "path", localPath)
		replyJSON(reply, map[string]string{"local_path": localPath})
	})

	// Subscribe: files.listdir — list files in a local directory, reply with relative paths
	natsClient.SubscribeReply(messaging.SubjectNodeFilesListDir(nodeID), func(data []byte, reply func([]byte)) {
		var req struct {
			KeyPrefix string `json:"key_prefix"`
		}
		if err := json.Unmarshal(data, &req); err != nil {
			replyJSON(reply, map[string]any{"error": "invalid request"})
			return
		}

		// Resolve key prefix to local directory
		dirPath := filepath.Join(cacheDir, req.KeyPrefix)
		if rel, ok := strings.CutPrefix(req.KeyPrefix, storage.ModelKeyPrefix); ok && cmd.ModelsPath != "" {
			dirPath = filepath.Join(cmd.ModelsPath, rel)
		} else if rel, ok := strings.CutPrefix(req.KeyPrefix, storage.DataKeyPrefix); ok {
			dirPath = filepath.Join(cacheDir, "..", "data", rel)
		}

		// Sanitize to prevent directory traversal via crafted key_prefix
		dirPath = filepath.Clean(dirPath)
		cleanCache := filepath.Clean(cacheDir)
		cleanModels := filepath.Clean(cmd.ModelsPath)
		cleanData := filepath.Clean(filepath.Join(cacheDir, "..", "data"))
		if !(strings.HasPrefix(dirPath, cleanCache+string(filepath.Separator)) ||
			dirPath == cleanCache ||
			(cleanModels != "." && strings.HasPrefix(dirPath, cleanModels+string(filepath.Separator))) ||
			dirPath == cleanModels ||
			strings.HasPrefix(dirPath, cleanData+string(filepath.Separator)) ||
			dirPath == cleanData) {
			replyJSON(reply, map[string]any{"error": "invalid key prefix"})
			return
		}

		var files []string
		filepath.WalkDir(dirPath, func(path string, d os.DirEntry, err error) error {
			if err != nil {
				return nil
			}
			if !d.IsDir() {
				rel, err := filepath.Rel(dirPath, path)
				if err == nil {
					files = append(files, rel)
				}
			}
			return nil
		})

		xlog.Debug("Listed remote dir", "keyPrefix", req.KeyPrefix, "dirPath", dirPath, "fileCount", len(files))
		replyJSON(reply, map[string]any{"files": files})
	})

	xlog.Info("Subscribed to file staging NATS subjects", "nodeID", nodeID)
	return nil
}

// replyJSON marshals v to JSON and calls the reply function.
func replyJSON(reply func([]byte), v any) {
	data, err := json.Marshal(v)
	if err != nil {
		xlog.Error("Failed to marshal NATS reply", "error", err)
		data = []byte(`{"error":"internal marshal error"}`)
	}
	reply(data)
}

// backendProcess represents a single gRPC backend process.
type backendProcess struct {
	proc    *process.Process
	backend string
	addr    string // gRPC address (host:port)
}

// backendSupervisor manages multiple backend gRPC processes on different ports.
// Each backend type (e.g., llama-cpp, bert-embeddings) gets its own process and port.
type backendSupervisor struct {
	cmd         *WorkerCMD
	ml          *model.ModelLoader
	systemState *system.SystemState
	galleries   []config.Gallery
	nodeID      string
	nats        messaging.MessagingClient
	sigCh       chan<- os.Signal // send shutdown signal instead of os.Exit

	mu        sync.Mutex
	processes map[string]*backendProcess // key: backend name
	nextPort  int                        // next available port for new backends
	freePorts []int                      // ports freed by stopBackend, reused before nextPort
}

// startBackend starts a gRPC backend process on a dynamically allocated port.
// Returns the gRPC address.
func (s *backendSupervisor) startBackend(backend, backendPath string) (string, error) {
	s.mu.Lock()

	// Already running?
	if bp, ok := s.processes[backend]; ok {
		if bp.proc != nil && bp.proc.IsAlive() {
			s.mu.Unlock()
			return bp.addr, nil
		}
		// Process died — clean up and restart
		xlog.Warn("Backend process died unexpectedly, restarting", "backend", backend)
		delete(s.processes, backend)
	}

	// Allocate port — recycle freed ports first, then grow upward from basePort
	var port int
	if len(s.freePorts) > 0 {
		port = s.freePorts[len(s.freePorts)-1]
		s.freePorts = s.freePorts[:len(s.freePorts)-1]
	} else {
		port = s.nextPort
		s.nextPort++
	}
	bindAddr := fmt.Sprintf("0.0.0.0:%d", port)
	clientAddr := fmt.Sprintf("127.0.0.1:%d", port)

	proc, err := s.ml.StartProcess(backendPath, backend, bindAddr)
	if err != nil {
		s.mu.Unlock()
		return "", fmt.Errorf("starting backend process: %w", err)
	}

	s.processes[backend] = &backendProcess{
		proc:    proc,
		backend: backend,
		addr:    clientAddr,
	}
	xlog.Info("Backend process started", "backend", backend, "addr", clientAddr)

	// Capture reference before unlocking for race-safe health check.
	// Another goroutine could stopBackend and recycle the port while we poll.
	bp := s.processes[backend]
	s.mu.Unlock()

	// Wait for the gRPC server to be ready
	client := grpc.NewClientWithToken(clientAddr, false, nil, false, s.cmd.RegistrationToken)
	for range 20 {
		time.Sleep(200 * time.Millisecond)
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		if ok, _ := client.HealthCheck(ctx); ok {
			cancel()
			// Verify the process wasn't stopped/replaced while health-checking
			s.mu.Lock()
			currentBP, exists := s.processes[backend]
			s.mu.Unlock()
			if !exists || currentBP != bp {
				return "", fmt.Errorf("backend %s was stopped during startup", backend)
			}
			xlog.Debug("Backend gRPC server is ready", "backend", backend, "addr", clientAddr)
			return clientAddr, nil
		}
		cancel()

		// Check if the process died (e.g. OOM, CUDA error, missing libs)
		if !proc.IsAlive() {
			stderrTail := readLastLinesFromFile(proc.StderrPath(), 20)
			xlog.Warn("Backend process died during startup", "backend", backend, "stderr", stderrTail)
			s.mu.Lock()
			delete(s.processes, backend)
			s.freePorts = append(s.freePorts, port)
			s.mu.Unlock()
			return "", fmt.Errorf("backend process %s died during startup. Last stderr:\n%s", backend, stderrTail)
		}
	}

	// Log stderr to help diagnose why the backend isn't responding
	stderrTail := readLastLinesFromFile(proc.StderrPath(), 20)
	xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr, "stderr", stderrTail)
	return clientAddr, nil
}

// stopBackend stops a specific backend's gRPC process.
func (s *backendSupervisor) stopBackend(backend string) {
	s.mu.Lock()
	bp, ok := s.processes[backend]
	if !ok || bp.proc == nil {
		s.mu.Unlock()
		return
	}
	// Clean up map and recycle port while holding lock
	delete(s.processes, backend)
	if _, portStr, err := net.SplitHostPort(bp.addr); err == nil {
		if p, err := strconv.Atoi(portStr); err == nil {
			s.freePorts = append(s.freePorts, p)
		}
	}
	s.mu.Unlock()

	// Network I/O outside the lock
	client := grpc.NewClientWithToken(bp.addr, false, nil, false, s.cmd.RegistrationToken)
	xlog.Debug("Calling Free() before stopping backend", "backend", backend)
	if err := client.Free(context.Background()); err != nil {
		xlog.Warn("Free() failed (best-effort)", "backend", backend, "error", err)
	}

	xlog.Info("Stopping backend process", "backend", backend, "addr", bp.addr)
	if err := bp.proc.Stop(); err != nil {
		xlog.Error("Error stopping backend process", "backend", backend, "error", err)
	}
}

// stopAllBackends stops all running backend processes.
func (s *backendSupervisor) stopAllBackends() {
	s.mu.Lock()
	backends := slices.Collect(maps.Keys(s.processes))
	s.mu.Unlock()

	for _, b := range backends {
		s.stopBackend(b)
	}
}

// readLastLinesFromFile reads the last n lines from a file.
// Returns an empty string if the file cannot be read.
func readLastLinesFromFile(path string, n int) string {
	data, err := os.ReadFile(path)
	if err != nil {
		return ""
	}
	lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
	if len(lines) > n {
		lines = lines[len(lines)-n:]
	}
	return strings.Join(lines, "\n")
}

// isRunning returns whether a specific backend process is currently running.
func (s *backendSupervisor) isRunning(backend string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	bp, ok := s.processes[backend]
	return ok && bp.proc != nil && bp.proc.IsAlive()
}

// getAddr returns the gRPC address for a running backend, or empty string.
func (s *backendSupervisor) getAddr(backend string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if bp, ok := s.processes[backend]; ok {
		return bp.addr
	}
	return ""
}

// installBackend handles the backend.install flow:
// 1. If already running for this model, return existing address
// 2. Install backend from gallery (if not already installed)
// 3. Find backend binary
// 4. Start gRPC process on a new port
// Returns the gRPC address of the backend process.
func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest) (string, error) {
	// Process key: use ModelID if provided (per-model process), else backend name
	processKey := req.ModelID
	if processKey == "" {
		processKey = req.Backend
	}

	// If already running for this model, return its address
	if addr := s.getAddr(processKey); addr != "" {
		xlog.Info("Backend already running for model", "backend", req.Backend, "model", req.ModelID, "addr", addr)
		return addr, nil
	}

	// Parse galleries from request (override local config if provided)
	galleries := s.galleries
	if req.BackendGalleries != "" {
		var reqGalleries []config.Gallery
		if err := json.Unmarshal([]byte(req.BackendGalleries), &reqGalleries); err == nil {
			galleries = reqGalleries
		}
	}

	// Try to find the backend binary
	backendPath := s.findBackend(req.Backend)
	if backendPath == "" {
		// Backend not found locally — try auto-installing from gallery
		xlog.Info("Backend not found locally, attempting gallery install", "backend", req.Backend)
		if err := gallery.InstallBackendFromGallery(
			context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, false,
		); err != nil {
			return "", fmt.Errorf("installing backend from gallery: %w", err)
		}
		// Re-register after install and retry
		gallery.RegisterBackends(s.systemState, s.ml)
		backendPath = s.findBackend(req.Backend)
	}

	if backendPath == "" {
		return "", fmt.Errorf("backend %q not found after install attempt", req.Backend)
	}

	xlog.Info("Found backend binary", "path", backendPath, "processKey", processKey)

	// Start the gRPC process on a new port (keyed by model, not just backend)
	return s.startBackend(processKey, backendPath)
}

// findBackend looks for the backend binary in the backends path and system path.
func (s *backendSupervisor) findBackend(backend string) string {
	candidates := []string{
		filepath.Join(s.cmd.BackendsPath, backend),
		filepath.Join(s.cmd.BackendsPath, backend, backend),
		filepath.Join(s.cmd.BackendsSystemPath, backend),
		filepath.Join(s.cmd.BackendsSystemPath, backend, backend),
	}
	if uri := s.ml.GetExternalBackend(backend); uri != "" {
		if fi, err := os.Stat(uri); err == nil && !fi.IsDir() {
			return uri
		}
	}
	for _, path := range candidates {
		fi, err := os.Stat(path)
		if err == nil && !fi.IsDir() {
			return path
		}
	}
	return ""
}

// subscribeLifecycleEvents subscribes to NATS backend lifecycle events.
func (s *backendSupervisor) subscribeLifecycleEvents() {
	// backend.install — install backend + start gRPC process (request-reply)
	s.nats.SubscribeReply(messaging.SubjectNodeBackendInstall(s.nodeID), func(data []byte, reply func([]byte)) {
		xlog.Info("Received NATS backend.install event")
		var req messaging.BackendInstallRequest
		if err := json.Unmarshal(data, &req); err != nil {
			resp := messaging.BackendInstallReply{Success: false, Error: fmt.Sprintf("invalid request: %v", err)}
			replyJSON(reply, resp)
			return
		}

		addr, err := s.installBackend(req)
		if err != nil {
			xlog.Error("Failed to install backend via NATS", "error", err)
			resp := messaging.BackendInstallReply{Success: false, Error: err.Error()}
			replyJSON(reply, resp)
			return
		}

		// Return the gRPC address so the router knows which port to use
		advertiseAddr := addr
		advAddr := s.cmd.advertiseAddr()
		if advAddr != addr { // only remap if advertise differs from bind
			_, port, _ := net.SplitHostPort(addr)
			advertiseHost, _, _ := net.SplitHostPort(advAddr)
			advertiseAddr = net.JoinHostPort(advertiseHost, port)
		}
		resp := messaging.BackendInstallReply{Success: true, Address: advertiseAddr}
		replyJSON(reply, resp)
	})

	// backend.stop — stop a specific backend process
	s.nats.Subscribe(messaging.SubjectNodeBackendStop(s.nodeID), func(data []byte) {
		// Try to parse backend name from payload; if empty, stop all
		var req struct {
			Backend string `json:"backend"`
		}
		if json.Unmarshal(data, &req) == nil && req.Backend != "" {
			xlog.Info("Received NATS backend.stop event", "backend", req.Backend)
			s.stopBackend(req.Backend)
		} else {
			xlog.Info("Received NATS backend.stop event (all)")
			s.stopAllBackends()
		}
	})

	// backend.delete — stop backend + delete files (request-reply)
	s.nats.SubscribeReply(messaging.SubjectNodeBackendDelete(s.nodeID), func(data []byte, reply func([]byte)) {
		var req messaging.BackendDeleteRequest
		if err := json.Unmarshal(data, &req); err != nil {
			resp := messaging.BackendDeleteReply{Success: false, Error: fmt.Sprintf("invalid request: %v", err)}
			replyJSON(reply, resp)
			return
		}
		xlog.Info("Received NATS backend.delete event", "backend", req.Backend)

		// Stop if running this backend
		if s.isRunning(req.Backend) {
			s.stopBackend(req.Backend)
		}

		// Delete the backend files
		if err := gallery.DeleteBackendFromSystem(s.systemState, req.Backend); err != nil {
			xlog.Warn("Failed to delete backend files", "backend", req.Backend, "error", err)
			resp := messaging.BackendDeleteReply{Success: false, Error: err.Error()}
			replyJSON(reply, resp)
			return
		}

		// Re-register backends after deletion
		gallery.RegisterBackends(s.systemState, s.ml)

		resp := messaging.BackendDeleteReply{Success: true}
		replyJSON(reply, resp)
	})

	// backend.list — list installed backends (request-reply)
	s.nats.SubscribeReply(messaging.SubjectNodeBackendList(s.nodeID), func(data []byte, reply func([]byte)) {
		xlog.Info("Received NATS backend.list event")
		backends, err := gallery.ListSystemBackends(s.systemState)
		if err != nil {
			resp := messaging.BackendListReply{Error: err.Error()}
			replyJSON(reply, resp)
			return
		}

		var infos []messaging.NodeBackendInfo
		for name, b := range backends {
			info := messaging.NodeBackendInfo{
				Name:     name,
				IsSystem: b.IsSystem,
				IsMeta:   b.IsMeta,
			}
			if b.Metadata != nil {
				info.InstalledAt = b.Metadata.InstalledAt
				info.GalleryURL = b.Metadata.GalleryURL
				info.Version = b.Metadata.Version
				info.URI = b.Metadata.URI
				info.Digest = b.Metadata.Digest
			}
			infos = append(infos, info)
		}

		resp := messaging.BackendListReply{Backends: infos}
		replyJSON(reply, resp)
	})

	// model.unload — call gRPC Free() to release GPU memory (request-reply)
	s.nats.SubscribeReply(messaging.SubjectNodeModelUnload(s.nodeID), func(data []byte, reply func([]byte)) {
		xlog.Info("Received NATS model.unload event")
		var req messaging.ModelUnloadRequest
		if err := json.Unmarshal(data, &req); err != nil {
			resp := messaging.ModelUnloadReply{Success: false, Error: fmt.Sprintf("invalid request: %v", err)}
			replyJSON(reply, resp)
			return
		}

		// Find the backend address for this model's backend type
		// The request includes an Address field if the router knows which process to target
		targetAddr := req.Address
		if targetAddr == "" {
			// Fallback: try all running backends
			s.mu.Lock()
			for _, bp := range s.processes {
				targetAddr = bp.addr
				break
			}
			s.mu.Unlock()
		}

		if targetAddr != "" {
			// Best-effort gRPC Free()
			client := grpc.NewClientWithToken(targetAddr, false, nil, false, s.cmd.RegistrationToken)
			if err := client.Free(context.Background()); err != nil {
				xlog.Warn("Free() failed during model.unload", "error", err, "addr", targetAddr)
			}
		}

		resp := messaging.ModelUnloadReply{Success: true}
		replyJSON(reply, resp)
	})

	// model.delete — remove model files from disk (request-reply)
	s.nats.SubscribeReply(messaging.SubjectNodeModelDelete(s.nodeID), func(data []byte, reply func([]byte)) {
		xlog.Info("Received NATS model.delete event")
		var req messaging.ModelDeleteRequest
		if err := json.Unmarshal(data, &req); err != nil {
			replyJSON(reply, messaging.ModelDeleteReply{Success: false, Error: "invalid request"})
			return
		}

		if err := gallery.DeleteStagedModelFiles(s.cmd.ModelsPath, req.ModelName); err != nil {
			xlog.Warn("Failed to delete model files", "model", req.ModelName, "error", err)
			replyJSON(reply, messaging.ModelDeleteReply{Success: false, Error: err.Error()})
			return
		}

		replyJSON(reply, messaging.ModelDeleteReply{Success: true})
	})

	// stop — trigger the normal shutdown path via sigCh so deferred cleanup runs
	s.nats.Subscribe(messaging.SubjectNodeStop(s.nodeID), func(data []byte) {
		xlog.Info("Received NATS stop event — signaling shutdown")
		select {
		case s.sigCh <- syscall.SIGTERM:
		default:
			xlog.Debug("Shutdown already signaled, ignoring duplicate stop")
		}
	})
}

// effectiveBasePort returns the port used as base for gRPC backend processes.
// Priority: Addr port → ServeAddr port → 50051
func (cmd *WorkerCMD) effectiveBasePort() int {
	for _, addr := range []string{cmd.Addr, cmd.ServeAddr} {
		if addr != "" {
			if _, portStr, ok := strings.Cut(addr, ":"); ok {
				if p, _ := strconv.Atoi(portStr); p > 0 {
					return p
				}
			}
		}
	}
	return 50051
}

// advertiseAddr returns the address the frontend should use to reach this node.
func (cmd *WorkerCMD) advertiseAddr() string {
	if cmd.AdvertiseAddr != "" {
		return cmd.AdvertiseAddr
	}
	if cmd.Addr != "" {
		return cmd.Addr
	}
	hostname, _ := os.Hostname()
	return fmt.Sprintf("%s:%d", cmp.Or(hostname, "localhost"), cmd.effectiveBasePort())
}

// resolveHTTPAddr returns the address to bind the HTTP file transfer server to.
// Uses basePort-1 so it doesn't conflict with dynamically allocated gRPC ports
// which grow upward from basePort.
func (cmd *WorkerCMD) resolveHTTPAddr() string {
	if cmd.HTTPAddr != "" {
		return cmd.HTTPAddr
	}
	return fmt.Sprintf("0.0.0.0:%d", cmd.effectiveBasePort()-1)
}

// advertiseHTTPAddr returns the HTTP address the frontend should use to reach
// this node for file transfer.
func (cmd *WorkerCMD) advertiseHTTPAddr() string {
	if cmd.AdvertiseHTTPAddr != "" {
		return cmd.AdvertiseHTTPAddr
	}
	advHost, _, _ := strings.Cut(cmd.advertiseAddr(), ":")
	httpPort := cmd.effectiveBasePort() - 1
	return fmt.Sprintf("%s:%d", advHost, httpPort)
}

// registrationBody builds the JSON body for node registration.
func (cmd *WorkerCMD) registrationBody() map[string]any {
	nodeName := cmd.NodeName
	if nodeName == "" {
		hostname, err := os.Hostname()
		if err != nil {
			nodeName = fmt.Sprintf("node-%d", os.Getpid())
		} else {
			nodeName = hostname
		}
	}

	// Detect GPU info for VRAM-aware scheduling
	totalVRAM, _ := xsysinfo.TotalAvailableVRAM()
	gpuVendor, _ := xsysinfo.DetectGPUVendor()

	body := map[string]any{
		"name":           nodeName,
		"address":        cmd.advertiseAddr(),
		"http_address":   cmd.advertiseHTTPAddr(),
		"total_vram":     totalVRAM,
		"available_vram": totalVRAM, // initially all VRAM is available
		"gpu_vendor":     gpuVendor,
	}

	// If no GPU detected, report system RAM so the scheduler/UI has capacity info
	if totalVRAM == 0 {
		if ramInfo, err := xsysinfo.GetSystemRAMInfo(); err == nil {
			body["total_ram"] = ramInfo.Total
			body["available_ram"] = ramInfo.Available
		}
	}
	if cmd.RegistrationToken != "" {
		body["token"] = cmd.RegistrationToken
	}

	// Parse and add static node labels
	if cmd.NodeLabels != "" {
		labels := make(map[string]string)
		for _, pair := range strings.Split(cmd.NodeLabels, ",") {
			pair = strings.TrimSpace(pair)
			if k, v, ok := strings.Cut(pair, "="); ok {
				labels[strings.TrimSpace(k)] = strings.TrimSpace(v)
			}
		}
		if len(labels) > 0 {
			body["labels"] = labels
		}
	}

	return body
}

// heartbeatBody returns the current VRAM/RAM stats for heartbeat payloads.
func (cmd *WorkerCMD) heartbeatBody() map[string]any {
	var availVRAM uint64
	aggregate := xsysinfo.GetGPUAggregateInfo()
	if aggregate.TotalVRAM > 0 {
		availVRAM = aggregate.FreeVRAM
	} else {
		// Fallback: report total as available (no usage tracking possible)
		availVRAM, _ = xsysinfo.TotalAvailableVRAM()
	}

	body := map[string]any{
		"available_vram": availVRAM,
	}

	// If no GPU, report system RAM usage instead
	if aggregate.TotalVRAM == 0 {
		if ramInfo, err := xsysinfo.GetSystemRAMInfo(); err == nil {
			body["available_ram"] = ramInfo.Available
		}
	}
	return body
}