Files
LocalAI/core/services/messaging/subjects.go
LocalAI [bot] 569d9bbd9e fix(distributed): broadcast file-staging progress across replicas (#10440)
File-staging progress lived only in the SmartRouter's in-memory
StagingTracker on the replica performing the transfer. In a multi-replica
deployment behind a round-robin load balancer, a /api/operations poll
that lands on any other replica saw no staging row, so the progress
("processing file ... Total ... Current ...") flickered in and out as
polls rotated between frontends.

Mirror the pattern already used for gallery-install progress: the origin
replica broadcasts staging ticks over NATS (SubjectStagingProgress, a
new staging.<model>.progress subject), and peers merge them via
ApplyRemote (SubscribeBroadcasts on the wildcard). Byte-level ticks are
leading-edge debounced (~1/s); Start/FileComplete/Complete always
publish. A locally-owned op stays authoritative so the origin's own echo
and stray peer events can't clobber it, and mirrored remote ops expire
after a TTL so a missed Done event can't leave a phantom row. The UI read
path (StagingTracker.GetAll) is unchanged.


Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-22 09:28:07 +02:00

414 lines
18 KiB
Go

package messaging
import "strings"
// subjectTokenReplacer rewrites every character that must not appear inside a
// single NATS subject token to '-'. It is built once at package level instead
// of on every call: the character set never changes, and strings.Replacer is
// safe for concurrent use by multiple goroutines.
var subjectTokenReplacer = strings.NewReplacer(
	".", "-", // hierarchy delimiter: would split one token into several
	"*", "-", // single-token wildcard
	">", "-", // multi-token (tail) wildcard
	" ", "-", // whitespace is not allowed anywhere in a subject
	"\t", "-",
	"\n", "-",
	"\r", "-", // a CR would also corrupt the CRLF-terminated protocol line
)

// sanitizeSubjectToken replaces NATS-reserved characters in a subject token so
// that a caller-supplied identifier (model name, job ID, user ID, ...) always
// occupies exactly one token and can never act as a wildcard.
//
// NATS uses '.' as the hierarchy delimiter and '*'/'>' as wildcards, and does
// not permit whitespace in subjects; each such character becomes '-'. The
// mapping is many-to-one ("a.b" and "a-b" yield the same token), and an empty
// input is returned unchanged.
func sanitizeSubjectToken(s string) string {
	return subjectTokenReplacer.Replace(s)
}
// NATS subject constants for the distributed architecture.
// Subjects follow the notetaker pattern: <entity>.<action>.

// Job distribution subjects are consumed through queue groups: delivery is
// load-balanced, so only one consumer in the group receives each message.
const (
	// SubjectJobsNew is the queue subject for new jobs.
	SubjectJobsNew = "jobs.new"

	// SubjectMCPCIJobsNew is the queue subject for new MCP CI jobs.
	SubjectMCPCIJobsNew = "jobs.mcp-ci.new"

	// SubjectAgentExecute is the queue subject for agent execution.
	SubjectAgentExecute = "agent.execute"

	// QueueWorkers is the queue-group name used by workers.
	QueueWorkers = "workers"
)
// Status updates use plain pub/sub: every subscriber receives every message,
// which is what bridging to SSE streams needs. These subjects are
// parameterized by ID (e.g. SubjectAgentEvents("myagent", "user1")), so only
// their leading prefixes are declared here.
const (
	subjectAgentEventsPrefix = "agent."    // agent SSE events
	subjectJobProgressPrefix = "jobs."     // job progress and results
	subjectFineTunePrefix    = "finetune." // fine-tune progress
	subjectGalleryPrefix     = "gallery."  // gallery download progress
)
// SubjectAgentEvents returns the NATS subject for agent SSE events.
func SubjectAgentEvents(agentName, userID string) string {
if userID == "" {
userID = "anonymous"
}
return subjectAgentEventsPrefix + sanitizeSubjectToken(agentName) + ".events." + sanitizeSubjectToken(userID)
}
// SubjectJobProgress returns the NATS subject for job progress updates.
func SubjectJobProgress(jobID string) string {
return subjectJobProgressPrefix + sanitizeSubjectToken(jobID) + ".progress"
}
// SubjectJobResult returns the NATS subject for the final job result (terminal state).
func SubjectJobResult(jobID string) string {
return subjectJobProgressPrefix + sanitizeSubjectToken(jobID) + ".result"
}
// MCP tool execution uses NATS request-reply and is load-balanced across
// agent workers.
const (
	// SubjectMCPToolExecute is the request subject for MCP tool execution.
	SubjectMCPToolExecute = "mcp.tools.execute"

	// SubjectMCPDiscovery is the request subject for MCP discovery.
	SubjectMCPDiscovery = "mcp.discovery"

	// QueueAgentWorkers is the queue-group name used by agent workers.
	QueueAgentWorkers = "agent-workers"
)
// SubjectFineTuneProgress returns the NATS subject for fine-tune progress.
func SubjectFineTuneProgress(jobID string) string {
return subjectFineTunePrefix + sanitizeSubjectToken(jobID) + ".progress"
}
// SubjectGalleryProgress returns the NATS subject for gallery download progress.
func SubjectGalleryProgress(opID string) string {
return subjectGalleryPrefix + sanitizeSubjectToken(opID) + ".progress"
}
// subjectStagingPrefix is the leading token of every file-staging subject.
const subjectStagingPrefix = "staging."

// SubjectStagingProgressWildcard ("staging.*.progress") matches every
// replica's staging-progress broadcasts, so a peer can mirror staging ops it
// did not originate. It is derived from subjectStagingPrefix so the concrete
// and wildcard subjects share one source of truth.
const SubjectStagingProgressWildcard = subjectStagingPrefix + "*" + ".progress"

// SubjectStagingProgress returns the NATS subject on which a frontend replica
// publishes file-staging progress for modelID: "staging.<modelID>.progress".
//
// Staging progress otherwise lives only in per-process state (the
// SmartRouter's in-memory StagingTracker). Without this broadcast, a
// /api/operations poll that round-robins onto a replica other than the one
// running the staging op sees nothing, and the progress row flickers in
// multi-replica deployments. Peers subscribe to the wildcard and merge.
func SubjectStagingProgress(modelID string) string {
	token := sanitizeSubjectToken(modelID)
	return subjectStagingPrefix + token + ".progress"
}
// SubjectGalleryOpStart and SubjectGalleryOpEnd broadcast the lifecycle of the
// in-memory OpCache between frontend replicas. A replica publishes on Start
// when an admin admits a new install/delete, and on End when an operation is
// dismissed. Peers apply both events so their OpCache stays in sync without
// querying PostgreSQL on every UI poll.
const (
	SubjectGalleryOpStart = "gallery.opcache.start"
	SubjectGalleryOpEnd   = "gallery.opcache.end"
)
// Control signals use pub/sub for targeted cancellation. Each prefix below is
// followed by the target's sanitized ID and a ".cancel" suffix.
const (
	subjectJobCancelPrefix      = "jobs."
	subjectAgentCancelPrefix    = "agent."
	subjectFineTuneCancelPrefix = "finetune."
	subjectGalleryCancelPrefix  = "gallery."
)
// Wildcard subjects let a single subscription match one action for every ID.
// NATS's '*' matches exactly one token, so these rely on IDs being sanitized
// into a single token when the concrete subjects are built.
const (
	// Job subjects.
	SubjectJobCancelWildcard   = "jobs.*.cancel"
	SubjectJobResultWildcard   = "jobs.*.result"
	SubjectJobProgressWildcard = "jobs.*.progress"

	// Agent subjects.
	SubjectAgentCancelWildcard = "agent.*.cancel"

	// Gallery subjects.
	SubjectGalleryCancelWildcard   = "gallery.*.cancel"
	SubjectGalleryProgressWildcard = "gallery.*.progress"
)
// SubjectJobCancel returns the NATS subject to cancel a running job.
func SubjectJobCancel(jobID string) string {
return subjectJobCancelPrefix + sanitizeSubjectToken(jobID) + ".cancel"
}
// SubjectAgentCancel returns the NATS subject to cancel agent execution.
func SubjectAgentCancel(agentID string) string {
return subjectAgentCancelPrefix + sanitizeSubjectToken(agentID) + ".cancel"
}
// SubjectFineTuneCancel returns the NATS subject to stop fine-tuning.
func SubjectFineTuneCancel(jobID string) string {
return subjectFineTuneCancelPrefix + sanitizeSubjectToken(jobID) + ".cancel"
}
// SubjectGalleryCancel returns the NATS subject to cancel a gallery download.
func SubjectGalleryCancel(opID string) string {
return subjectGalleryCancelPrefix + sanitizeSubjectToken(opID) + ".cancel"
}
// Node backend lifecycle (pub/sub, targeted at specific nodes).
//
// These subjects drive the lifecycle of the backend *process* on a
// serve-backend node, mirroring the local ModelLoader's startProcess() and
// deleteProcess(). Loading a model (the LoadModel gRPC call) goes directly to
// the node's gRPC address, just as in local mode, so it needs no NATS subject.
const subjectNodePrefix = "nodes."
// SubjectNodeBackendInstall tells a worker node to install a backend and start its gRPC process.
// Uses NATS request-reply: the SmartRouter sends the request, the worker installs
// the backend from gallery (if not already installed), starts the gRPC process,
// and replies when ready.
func SubjectNodeBackendInstall(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.install"
}
// BackendInstallRequest is the payload of a backend.install NATS request.
type BackendInstallRequest struct {
	Backend          string `json:"backend"`
	ModelID          string `json:"model_id,omitempty"`
	BackendGalleries string `json:"backend_galleries,omitempty"`

	// URI is set for external installs (OCI image, URL, or path). When it is
	// non-empty the worker routes to InstallExternalBackend rather than doing
	// a gallery lookup.
	URI   string `json:"uri,omitempty"`
	Name  string `json:"name,omitempty"`
	Alias string `json:"alias,omitempty"`

	// ReplicaIndex selects the slot on the worker that this load occupies, so
	// two concurrent backend.install requests for the same model end up on
	// distinct gRPC processes and ports. Workers that predate this field treat
	// it as 0 (single-replica behavior). That cannot collide, because the
	// controller never asks for replica > 0 on a node whose
	// MaxReplicasPerModel is 1.
	ReplicaIndex int32 `json:"replica_index,omitempty"`

	// Force stays on the wire only for backward compatibility with masters
	// from before 2026-05-08, which did not know about backend.upgrade. New
	// callers MUST send to SubjectNodeBackendUpgrade instead. Workers still
	// honor Force=true here so a rolling update (new master, old worker) keeps
	// working. The master's install fallback also uses it when backend.upgrade
	// returns nats.ErrNoResponders.
	Force bool `json:"force,omitempty"`

	// OpID identifies the admin-side operation. When it is non-empty, the
	// worker publishes BackendInstallProgressEvent values to
	// SubjectNodeBackendInstallProgress(nodeID, OpID) while the install runs,
	// debounced to roughly 250ms. It is empty for reconciler-driven retries
	// that need no streamed progress.
	OpID string `json:"op_id,omitempty"`
}

// BackendInstallReply is the response to a backend.install NATS request.
type BackendInstallReply struct {
	Success bool   `json:"success"`
	Address string `json:"address,omitempty"` // gRPC address (host:port) of the backend process
	Error   string `json:"error,omitempty"`
}
// SubjectNodeBackendUpgrade tells a worker node to force-reinstall a backend
// from the gallery, stop every running process for that backend, and restart.
// Uses NATS request-reply with a long deadline (gallery image pulls can take
// many minutes on slow links). Routine model loads use SubjectNodeBackendInstall
// instead — this subject exists so the slow path doesn't head-of-line-block
// the fast one through a shared subscription goroutine.
func SubjectNodeBackendUpgrade(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.upgrade"
}
// BackendUpgradeRequest is the payload of a backend.upgrade NATS request.
//
// It is deliberately a strict subset of BackendInstallRequest. There is no
// Force field, because sending to the upgrade subject already means "force".
// There is no ModelID, because an upgrade is backend-scoped: every replica
// using the binary is stopped before the reinstall, and each replica is
// restarted on its next routine load.
type BackendUpgradeRequest struct {
	Backend          string `json:"backend"`
	BackendGalleries string `json:"backend_galleries,omitempty"`
	URI              string `json:"uri,omitempty"`
	Name             string `json:"name,omitempty"`
	Alias            string `json:"alias,omitempty"`

	// ReplicaIndex is informational: an upgrade stops all replicas regardless.
	// The field exists so future per-replica metadata (for example progress
	// scoped to a slot) can use the same wire type without a v3 type.
	ReplicaIndex int32 `json:"replica_index,omitempty"`

	// OpID identifies the admin-side operation. When it is non-empty, the
	// worker publishes BackendInstallProgressEvent values to
	// SubjectNodeBackendInstallProgress(nodeID, OpID) while the
	// force-reinstall runs. This lets the master stream per-node upgrade
	// progress exactly as it does for installs. An upgrade IS a
	// force-reinstall, so the install-progress subject is reused and no new
	// NATS permission or rolling-update compatibility surface is added.
	// Legacy callers leave it empty.
	OpID string `json:"op_id,omitempty"`
}

// BackendUpgradeReply mirrors BackendInstallReply without Address. An upgrade
// does not start a process, so there is no port to advertise; the next
// routine load re-binds via backend.install and learns the new address.
type BackendUpgradeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}
// SubjectNodeBackendList queries a worker node for its installed backends.
// Uses NATS request-reply.
func SubjectNodeBackendList(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.list"
}
// BackendListRequest is the (empty) payload of a backend.list NATS request.
type BackendListRequest struct{}

// BackendListReply is the response to a backend.list NATS request.
type BackendListReply struct {
	Backends []NodeBackendInfo `json:"backends"`
	Error    string            `json:"error,omitempty"`
}

// NodeBackendInfo describes one backend installed on a worker node.
type NodeBackendInfo struct {
	Name        string `json:"name"`
	IsSystem    bool   `json:"is_system"`
	IsMeta      bool   `json:"is_meta"`
	InstalledAt string `json:"installed_at,omitempty"`
	GalleryURL  string `json:"gallery_url,omitempty"`

	// Version, URI and Digest make cluster-wide upgrade detection possible.
	// Without them the frontend cannot tell whether the installed OCI image
	// matches the gallery entry, and available upgrades never show up.
	Version string `json:"version,omitempty"`
	URI     string `json:"uri,omitempty"`
	Digest  string `json:"digest,omitempty"`
}
// SubjectNodeBackendStop tells a worker node to stop its gRPC backend process.
// Equivalent to the local deleteProcess(). The node will:
// 1. Best-effort Free() via gRPC
// 2. Kill the backend process
// 3. Can be restarted via another backend.start event.
func SubjectNodeBackendStop(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.stop"
}
// SubjectNodeBackendDelete tells a worker node to delete a backend (stop + remove files).
// Uses NATS request-reply.
func SubjectNodeBackendDelete(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.delete"
}
// BackendDeleteRequest is the payload of a backend.delete NATS request; it
// names the backend to remove.
type BackendDeleteRequest struct {
	Backend string `json:"backend"`
}

// BackendDeleteReply is the response to a backend.delete NATS request.
type BackendDeleteReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}
// SubjectNodeModelUnload tells a worker node to unload a model (gRPC Free) without killing the backend.
// Uses NATS request-reply.
func SubjectNodeModelUnload(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".model.unload"
}
// ModelUnloadRequest is the payload of a model.unload NATS request.
type ModelUnloadRequest struct {
	ModelName string `json:"model_name"`
	Address   string `json:"address,omitempty"` // gRPC address of the backend process to unload from
}

// ModelUnloadReply is the response to a model.unload NATS request.
type ModelUnloadReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}
// SubjectNodeModelDelete tells a worker node to delete model files from disk.
// Uses NATS request-reply.
func SubjectNodeModelDelete(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".model.delete"
}
// ModelDeleteRequest is the payload of a model.delete NATS request; it names
// the model whose files should be removed.
type ModelDeleteRequest struct {
	ModelName string `json:"model_name"`
}

// ModelDeleteReply is the response to a model.delete NATS request.
type ModelDeleteReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}
// SubjectNodeStop tells a serve-backend node to shut down entirely
// (deregister + exit). The node will not restart the backend process.
func SubjectNodeStop(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".stop"
}
// File Staging (Request-Reply — targeted to specific nodes)
// These subjects use request-reply for synchronous file operations.
// SubjectNodeFilesEnsure tells a serve-backend node to download an S3 key to its local cache.
// Reply: {local_path, error}
func SubjectNodeFilesEnsure(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".files.ensure"
}
// SubjectNodeFilesStage tells a serve-backend node to upload a local file to S3.
// Reply: {key, error}
func SubjectNodeFilesStage(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".files.stage"
}
// SubjectNodeFilesTemp tells a serve-backend node to allocate a temp file.
// Reply: {local_path, error}
func SubjectNodeFilesTemp(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".files.temp"
}
// SubjectNodeFilesListDir tells a serve-backend node to list files in a directory.
// Reply: {files: [...], error}
func SubjectNodeFilesListDir(nodeID string) string {
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".files.listdir"
}
// Cache invalidation subjects (pub/sub, broadcast to every instance).
const (
	// SubjectCacheInvalidateSkills is the broadcast subject for skill cache
	// invalidation.
	SubjectCacheInvalidateSkills = "cache.invalidate.skills"

	// SubjectCacheInvalidateModels is broadcast by the replica that finished a
	// model install/delete. Peers subscribe and re-run
	// ModelConfigLoader.LoadModelConfigsFromPath, so a chat completion routed
	// to another replica can still find the newly installed model.
	SubjectCacheInvalidateModels = "cache.invalidate.models"

	// SubjectCacheInvalidateBackends is broadcast after a backend
	// install/upgrade/delete. Peers retrigger their UpgradeChecker so the
	// 6-hour upgrade-available cache is refreshed on every replica, not only
	// on the one that handled the request.
	SubjectCacheInvalidateBackends = "cache.invalidate.backends"
)
// CacheInvalidateEvent is the payload of a cache-invalidation broadcast.
type CacheInvalidateEvent struct {
	// Element names the specific model or backend that changed, when known.
	// An empty Element means the whole set was touched and a full reload is
	// needed.
	Element string `json:"element,omitempty"`

	// Op is the kind of change: "install", "delete" or "upgrade".
	Op string `json:"op,omitempty"`
}
// SubjectCacheInvalidateCollection returns the NATS subject that invalidates
// the cache of the named collection: "cache.invalidate.collections.<name>".
func SubjectCacheInvalidateCollection(name string) string {
	const prefix = "cache.invalidate.collections."
	return prefix + sanitizeSubjectToken(name)
}
// Prefix-cache routing sync (pub/sub, broadcast to all frontends).
//
// Frontends share their prefix-cache observations, so a request routed to any
// replica benefits from prefix affinity that another replica already learned.
// Like the OpCache live sync, this is plain NATS Core pub/sub without
// JetStream.
const (
	// SubjectPrefixCacheObserve is the broadcast subject for prefix-cache
	// observations.
	SubjectPrefixCacheObserve = "prefixcache.observe"

	// SubjectPrefixCacheInvalidate is the broadcast subject for prefix-cache
	// invalidations.
	SubjectPrefixCacheInvalidate = "prefixcache.invalidate"
)
// PrefixCacheObserveEvent announces that replica (NodeID, Replica) served a
// request for Model whose prefix chain ends at the given hashes.
//
// Chain is the full hash chain, from shallowest to deepest, so peers can
// insert the same path. Affinity is tracked per replica (a backend process
// with its own KV cache), not per node, so the replica index is included and
// peers attribute the observation to that same replica.
type PrefixCacheObserveEvent struct {
	Model   string   `json:"model"`
	Chain   []uint64 `json:"chain"`
	NodeID  string   `json:"node_id"`
	Replica int      `json:"replica"`
}

// PrefixCacheInvalidateEvent tells peers to drop prefix-cache entries.
//
// With Replica >= 0 it targets the single replica (Model, NodeID, Replica).
// With Replica < 0 it targets every replica of (Model, NodeID), for example
// when a whole node goes offline.
type PrefixCacheInvalidateEvent struct {
	Model   string `json:"model"`
	NodeID  string `json:"node_id"`
	Replica int    `json:"replica"`
}