Compare commits

...

18 Commits

Author SHA1 Message Date
Ettore Di Giacinto
c410cd7253 test(ui): repoint page-render-smoke from /app/nodes to /app/cluster
The /app/nodes route now redirects to the merged Cluster page, so the
render-smoke spec (added on master) asserted a stale URL. Point it at
/app/cluster, which keeps render-smoke coverage on the new page.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-03 08:22:59 +00:00
Ettore Di Giacinto
4e993332af Merge remote-tracking branch 'origin/master' into worktree-clusterrouting-phase2
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-03 08:22:59 +00:00
Ettore Di Giacinto
450376d22f test(ui): repoint node-backend-actions spec to /app/cluster
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 23:25:54 +00:00
Ettore Di Giacinto
8180fddc05 test(ui): e2e specs for merged Cluster page and old-route redirects
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 23:21:02 +00:00
Ettore Di Giacinto
5033457f57 refactor(ui): repoint internal links from /app/nodes to /app/cluster
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 23:17:43 +00:00
Ettore Di Giacinto
d88758282a feat(ui): route /app/cluster, redirect /app/p2p and /app/nodes, single sidebar entry
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 23:13:44 +00:00
Ettore Di Giacinto
a0c7cecddd feat(ui): add Cluster page composing distributed and swarm sections
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 23:07:11 +00:00
Ettore Di Giacinto
bc42374d8a feat(ui): add useP2PMode hook and embedded prop to Nodes and P2P pages
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 23:03:42 +00:00
Ettore Di Giacinto
ec2a0645dd feat(p2p): optional cross-server prefix-affinity sync over the generic channel
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 22:28:21 +00:00
Ettore Di Giacinto
ce8b97edf2 feat(p2p): edgevpn generic-channel publisher and handler for affinity sync
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 22:22:59 +00:00
Ettore Di Giacinto
91fc26ff75 refactor(p2p): dedupe forwarded Connection header and drop unused extractModel param
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 22:07:10 +00:00
Ettore Di Giacinto
8df0bb683b feat(p2p): L7 HTTP-terminating federation proxy with model + prefix-affinity routing
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 21:58:41 +00:00
Ettore Di Giacinto
8ec536a34c feat(p2p): affinity-aware peer selection and federation body-limit flag
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 21:45:13 +00:00
Ettore Di Giacinto
14b57aa343 feat(p2p): model-filtered federation candidates and request model extraction
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 21:37:13 +00:00
Ettore Di Giacinto
288d732af7 feat(p2p): gossip each peer's model set for model-aware federation
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 21:34:08 +00:00
Ettore Di Giacinto
ed38609d51 feat(routing): add load-guarded PickWithAffinity for prefix-cache routing
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 21:29:20 +00:00
Ettore Di Giacinto
7768b35696 feat(p2p): route federation with shared clusterrouting policy (load + VRAM)
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 08:03:39 +00:00
Ettore Di Giacinto
830f818c58 feat(p2p): gossip free VRAM per node and add testable online check
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-01 07:54:42 +00:00
42 changed files with 1215 additions and 138 deletions

View File

@@ -53,9 +53,21 @@ func (a *Application) StartP2P() error {
return err
}
// modelsFn reports the model names this instance currently serves so the
// federation proxy can route a request only to peers that have the
// requested model. It is re-evaluated on every announce tick.
modelsFn := func() []string {
cfgs := a.ModelConfigLoader().GetAllModelsConfigs()
names := make([]string, 0, len(cfgs))
for _, c := range cfgs {
names = append(names, c.Name)
}
return names
}
// Here a new node is created and started
// and a service is exposed by the node
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID))
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), modelsFn)
if err != nil {
return err
}

View File

@@ -14,12 +14,14 @@ type FederatedCLI struct {
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-size limit in megabytes" group:"api"`
AffinitySync bool `env:"LOCALAI_FEDERATED_AFFINITY_SYNC,FEDERATED_AFFINITY_SYNC" default:"false" help:"Broadcast prefix-cache affinity observations to other federation servers over the p2p generic channel (enable on every federation server that should cohere)" group:"p2p"`
}
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
warnDeprecatedFlags()
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker, int64(f.UploadLimit)*1024*1024, f.AffinitySync)
c, cancel := context.WithCancel(context.Background())

View File

@@ -62,7 +62,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}
_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID))
_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil)
if err != nil {
return err
}
@@ -104,7 +104,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID))
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil)
if err != nil {
return err
}

View File

@@ -81,7 +81,7 @@ func (r *P2PMLX) Run(ctx *cliContext.Context) error {
}
}()
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID))
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID), nil)
if err != nil {
return err
}

View File

@@ -0,0 +1,37 @@
import { test, expect } from './coverage-fixtures.js'
// The Cluster page composes two capability sections: "Distributed (NATS)" (the
// former Nodes page) and "Swarm (p2p)" (the former P2P page). Each section only
// mounts when its mode is enabled — distributed when /api/nodes answers OK, swarm
// when a non-empty p2p network token is present. We mock those probes so the page
// renders against the standalone ui-test-server without NATS / p2p running.
async function mockDistributedOnly(page) {
await page.route('**/api/nodes', (route) => {
route.fulfill({ status: 200, contentType: 'application/json', body: '[]' })
})
await page.route('**/api/nodes/scheduling', (route) => {
route.fulfill({ status: 200, contentType: 'application/json', body: '[]' })
})
// Swarm disabled: token probe fails, so the swarm section stays hidden.
await page.route('**/api/p2p/token', (route) => {
route.fulfill({ status: 503, contentType: 'text/plain', body: '' })
})
}
test.describe('Cluster page', () => {
test('shows the page title', async ({ page }) => {
await mockDistributedOnly(page)
await page.goto('/app/cluster')
await expect(page).toHaveURL(/\/app\/cluster$/)
await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible()
})
test('shows the distributed section when /api/nodes responds', async ({ page }) => {
await mockDistributedOnly(page)
await page.goto('/app/cluster')
await expect(page).toHaveURL(/\/app\/cluster$/)
// The distributed capability section is titled "Distributed (NATS)".
await expect(page.getByText(/Distributed \(NATS\)/i)).toBeVisible()
})
})

View File

@@ -23,4 +23,11 @@ test.describe('Navigation', () => {
await expect(page).toHaveURL(/\/app\/traces/)
await expect(page.getByRole('heading', { name: 'Traces', exact: true })).toBeVisible()
})
test('old cluster routes redirect to /app/cluster', async ({ page }) => {
await page.goto('/app/p2p')
await expect(page).toHaveURL(/\/app\/cluster$/)
await page.goto('/app/nodes')
await expect(page).toHaveURL(/\/app\/cluster$/)
})
})

View File

@@ -81,7 +81,7 @@ async function mockDistributedNodes(page, { onDelete } = {}) {
}
async function expandNodeAndWaitForBackends(page) {
await page.goto('/app/nodes')
await page.goto('/app/cluster')
// Click the row to expand it. The chevron toggle and the row both work,
// but clicking the name cell is the most user-like.
await page.getByText(NODE_NAME).first().click()

View File

@@ -1,26 +1,65 @@
import { test, expect } from './coverage-fixtures.js'
// P2P (Swarm) admin page — renders in the no-auth test harness (isAdmin).
test.describe('P2P page', () => {
test.beforeEach(async ({ page }) => {
// The standalone P2P (Swarm) page was merged into the Cluster page: /app/p2p now
// redirects to /app/cluster, and the p2p content lives under the "Swarm (p2p)"
// section. That section only mounts when p2p is enabled (a network token is
// present), so we mock /api/p2p/token to return a non-empty token and assert the
// swarm content renders under the cluster page.
const P2P_TOKEN = 'test-network-token'
async function mockSwarmEnabled(page) {
await page.route('**/api/p2p/token', (route) => {
route.fulfill({
status: 200,
contentType: 'text/plain',
body: P2P_TOKEN,
})
})
await page.route('**/api/p2p/workers', (route) => {
route.fulfill({ status: 200, contentType: 'application/json', body: '{"nodes":[]}' })
})
await page.route('**/api/p2p/federation', (route) => {
route.fulfill({ status: 200, contentType: 'application/json', body: '{"nodes":[]}' })
})
await page.route('**/api/p2p/stats', (route) => {
route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify({
llama_cpp_workers: { online: 0, total: 0 },
federated: { online: 0, total: 0 },
mlx_workers: { online: 0, total: 0 },
}),
})
})
// The cluster page also probes /api/nodes for the distributed section; keep it
// failing (distributed disabled) so only the swarm section renders here.
await page.route('**/api/nodes', (route) => {
route.fulfill({ status: 503, contentType: 'application/json', body: '{}' })
})
}
test.describe('P2P (Swarm) section on the Cluster page', () => {
test('the old /app/p2p route lands on the cluster page', async ({ page }) => {
await mockSwarmEnabled(page)
// /app/p2p redirects to /app/cluster.
await page.goto('/app/p2p')
await expect(page).toHaveURL(/\/app\/cluster$/)
await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible()
})
test('renders the P2P distribution overview and capability cards', async ({ page }) => {
await expect(page).toHaveURL(/\/app\/p2p$/)
await expect(page.getByRole('heading', { name: /P2P Distribution Not Enabled/i })).toBeVisible()
await expect(page.getByRole('heading', { name: 'Instance Federation' })).toBeVisible()
await expect(page.getByRole('heading', { name: 'Model Sharding' })).toBeVisible()
await expect(page.getByRole('heading', { name: 'Resource Sharing' })).toBeVisible()
await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible()
})
test('renders the Swarm (p2p) section when p2p is enabled', async ({ page }) => {
await mockSwarmEnabled(page)
await page.goto('/app/cluster')
await expect(page).toHaveURL(/\/app\/cluster$/)
test('hardware selector offers build targets and responds to selection', async ({ page }) => {
const cpu = page.getByRole('button').filter({ hasText: /^CPU$/ })
const cuda = page.getByRole('button').filter({ hasText: /^CUDA 12$/ })
await expect(cpu).toBeVisible()
await expect(cuda).toBeVisible()
await cuda.click() // selecting a build target must not break the page
await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible()
// The collapsible swarm section is titled "Swarm (p2p)".
await expect(page.getByText(/Swarm \(p2p\)/i)).toBeVisible()
// The enabled p2p content (Network Token panel + the federation / sharding
// tabs) is rendered inside the swarm section.
await expect(page.getByRole('heading', { name: /Network Token/i })).toBeVisible()
await expect(page.getByText('Federation', { exact: true })).toBeVisible()
await expect(page.getByText('Model Sharding', { exact: true })).toBeVisible()
})
})

View File

@@ -20,7 +20,7 @@ const PAGES = [
['/app/manage', 'Manage'],
['/app/backends', 'Backends'],
['/app/settings', 'Settings'],
['/app/nodes', 'Nodes'],
['/app/cluster', 'Cluster'],
['/app/face', 'Face recognition'],
['/app/voice', 'Voice recognition'],
['/app/fine-tune', 'Fine-tuning'],

View File

@@ -58,5 +58,21 @@
"explorer": {
"title": "Explorer",
"subtitle": "Dateien und Konfiguration durchsuchen"
},
"cluster": {
"title": "Cluster",
"subtitle": "Verteilte und Peer-to-Peer-Knoten, die diese Instanz bedienen",
"empty": "Es ist kein verteiltes oder p2p-Clustering aktiviert. Starte LocalAI im verteilten oder föderierten/p2p-Modus, um hier Cluster-Knoten zu verwalten.",
"distributed": {
"title": "Verteilt (NATS)"
},
"swarm": {
"title": "Swarm (p2p)"
},
"summary": {
"nodes": "Verteilte Knoten",
"inFlight": "Laufende Anfragen",
"peers": "Swarm-Peers online"
}
}
}

View File

@@ -40,6 +40,7 @@
"traces": "Traces",
"nodes": "Knoten",
"swarm": "Swarm",
"cluster": "Cluster",
"system": "System",
"settings": "Einstellungen",
"api": "API"

View File

@@ -81,5 +81,21 @@
"explorer": {
"title": "Explorer",
"subtitle": "Browse files and configuration"
},
"cluster": {
"title": "Cluster",
"subtitle": "Distributed and peer-to-peer nodes serving this instance",
"empty": "No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.",
"distributed": {
"title": "Distributed (NATS)"
},
"swarm": {
"title": "Swarm (p2p)"
},
"summary": {
"nodes": "Distributed nodes",
"inFlight": "In-flight requests",
"peers": "Swarm peers online"
}
}
}

View File

@@ -41,6 +41,7 @@
"traces": "Traces",
"nodes": "Nodes",
"swarm": "Swarm",
"cluster": "Cluster",
"system": "System",
"settings": "Settings",
"api": "API"

View File

@@ -58,5 +58,21 @@
"explorer": {
"title": "Explorador",
"subtitle": "Explora archivos y configuración"
},
"cluster": {
"title": "Clúster",
"subtitle": "Nodos distribuidos y entre pares que sirven a esta instancia",
"empty": "No hay clustering distribuido ni p2p habilitado. Inicia LocalAI en modo distribuido o federado/p2p para gestionar aquí los nodos del clúster.",
"distributed": {
"title": "Distribuido (NATS)"
},
"swarm": {
"title": "Swarm (p2p)"
},
"summary": {
"nodes": "Nodos distribuidos",
"inFlight": "Solicitudes en curso",
"peers": "Pares de Swarm en línea"
}
}
}

View File

@@ -40,6 +40,7 @@
"traces": "Trazas",
"nodes": "Nodos",
"swarm": "Swarm",
"cluster": "Clúster",
"system": "Sistema",
"settings": "Configuración",
"api": "API"

View File

@@ -58,5 +58,21 @@
"explorer": {
"title": "Esplora risorse",
"subtitle": "Sfoglia file e configurazioni"
},
"cluster": {
"title": "Cluster",
"subtitle": "Nodi distribuiti e peer-to-peer al servizio di questa istanza",
"empty": "Nessun clustering distribuito o p2p è abilitato. Avvia LocalAI in modalità distribuita o federata/p2p per gestire qui i nodi del cluster.",
"distributed": {
"title": "Distribuito (NATS)"
},
"swarm": {
"title": "Swarm (p2p)"
},
"summary": {
"nodes": "Nodi distribuiti",
"inFlight": "Richieste in corso",
"peers": "Peer Swarm online"
}
}
}

View File

@@ -40,6 +40,7 @@
"traces": "Tracce",
"nodes": "Nodi",
"swarm": "Swarm",
"cluster": "Cluster",
"system": "Sistema",
"settings": "Impostazioni",
"api": "API"

View File

@@ -58,5 +58,21 @@
"explorer": {
"title": "资源浏览器",
"subtitle": "浏览文件和配置"
},
"cluster": {
"title": "集群",
"subtitle": "为此实例提供服务的分布式和点对点节点",
"empty": "未启用分布式或 p2p 集群。请以分布式或联邦/p2p 模式启动 LocalAI以便在此管理集群节点。",
"distributed": {
"title": "分布式 (NATS)"
},
"swarm": {
"title": "Swarm (p2p)"
},
"summary": {
"nodes": "分布式节点",
"inFlight": "进行中的请求",
"peers": "在线 Swarm 节点"
}
}
}

View File

@@ -40,6 +40,7 @@
"traces": "追踪",
"nodes": "节点",
"swarm": "Swarm",
"cluster": "集群",
"system": "系统",
"settings": "设置",
"api": "API"

View File

@@ -0,0 +1,27 @@
import { useState } from 'react'
// ClusterSection is a collapsible, titled container for one capability area of
// the Cluster page (Distributed / Swarm). Default expanded.
export default function ClusterSection({ icon, title, subtitle, defaultOpen = true, children }) {
const [open, setOpen] = useState(defaultOpen)
return (
<section className="card" style={{ marginBottom: 'var(--spacing-lg)' }}>
<button
type="button"
aria-expanded={open}
onClick={() => setOpen((o) => !o)}
style={{
display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)',
width: '100%', padding: 'var(--spacing-md)', background: 'none',
border: 'none', cursor: 'pointer', textAlign: 'left', color: 'inherit',
}}
>
<i className={`fas fa-chevron-${open ? 'down' : 'right'}`} style={{ width: '1rem', color: 'var(--color-text-muted)' }} />
{icon && <i className={icon} style={{ color: 'var(--color-primary)' }} />}
<span style={{ fontWeight: 600 }}>{title}</span>
{subtitle && <span style={{ marginLeft: 'auto', color: 'var(--color-text-muted)', fontSize: '0.875rem' }}>{subtitle}</span>}
</button>
{open && <div style={{ padding: '0 var(--spacing-md) var(--spacing-md)' }}>{children}</div>}
</section>
)
}

View File

@@ -0,0 +1,44 @@
import { useEffect, useState } from 'react'
import { useTranslation } from 'react-i18next'
import StatCard from './StatCard'
import { nodesApi, p2pApi } from '../utils/api'
// ClusterSummary shows merged totals across both transports. Self-contained
// (own lightweight fetch) so the page composes without lifting state out of the
// two large section components.
export default function ClusterSummary({ distributedEnabled, p2pEnabled }) {
const { t } = useTranslation('admin')
const [nats, setNats] = useState({ nodes: 0, inFlight: 0 })
const [swarm, setSwarm] = useState({ online: 0, total: 0 })
useEffect(() => {
let active = true
async function load() {
if (distributedEnabled) {
try {
const list = await nodesApi.list()
const nodes = Array.isArray(list) ? list : (list?.nodes ?? [])
if (active) setNats({ nodes: nodes.length, inFlight: nodes.reduce((a, n) => a + (n.in_flight_count || 0), 0) })
} catch { /* leave zeros */ }
}
if (p2pEnabled) {
try {
const stats = await p2pApi.getStats()
const online = (stats?.federated?.online || 0) + (stats?.llama_cpp_workers?.online || 0) + (stats?.mlx_workers?.online || 0)
const total = (stats?.federated?.total || 0) + (stats?.llama_cpp_workers?.total || 0) + (stats?.mlx_workers?.total || 0)
if (active) setSwarm({ online, total })
} catch { /* leave zeros */ }
}
}
load()
return () => { active = false }
}, [distributedEnabled, p2pEnabled])
return (
<div className="stat-grid" style={{ marginBottom: 'var(--spacing-lg)' }}>
{distributedEnabled && <StatCard icon="fas fa-network-wired" label={t('cluster.summary.nodes', 'Distributed nodes')} value={nats.nodes} />}
{distributedEnabled && <StatCard icon="fas fa-bolt" label={t('cluster.summary.inFlight', 'In-flight requests')} value={nats.inFlight} />}
{p2pEnabled && <StatCard icon="fas fa-circle-nodes" label={t('cluster.summary.peers', 'Swarm peers online')} value={`${swarm.online}/${swarm.total}`} />}
</div>
)
}

View File

@@ -479,7 +479,7 @@ export default function NodeInstallPicker({
Approve pending workers or register new ones.
{pendingCount > 0 && ` (${pendingCount} awaiting approval.)`}
</p>
<a className="btn btn-secondary btn-sm" href="/app/nodes">
<a className="btn btn-secondary btn-sm" href="/app/cluster">
<i className="fas fa-network-wired" /> Manage nodes
</a>
</div>
@@ -672,7 +672,7 @@ export default function NodeInstallPicker({
{pendingCount > 0 && (
<p style={{ fontSize: '0.75rem', color: 'var(--color-text-muted)', marginTop: 0, marginBottom: 'var(--spacing-sm)' }}>
+{pendingCount} awaiting approval <a href="/app/nodes" style={{ color: 'var(--color-primary)' }}>approve from Nodes</a>.
+{pendingCount} awaiting approval <a href="/app/cluster" style={{ color: 'var(--color-primary)' }}>approve from Nodes</a>.
</p>
)}

View File

@@ -75,8 +75,7 @@ const sections = [
{ path: '/app/middleware', icon: 'fas fa-shield-halved', labelKey: 'items.middleware', adminOnly: true },
{ path: '/app/backends', icon: 'fas fa-server', labelKey: 'items.backends', adminOnly: true },
{ path: '/app/traces', icon: 'fas fa-chart-line', labelKey: 'items.traces', adminOnly: true },
{ path: '/app/nodes', icon: 'fas fa-network-wired', labelKey: 'items.nodes', adminOnly: true, feature: 'distributed' },
{ path: '/app/p2p', icon: 'fas fa-circle-nodes', labelKey: 'items.swarm', adminOnly: true },
{ path: '/app/cluster', icon: 'fas fa-network-wired', labelKey: 'items.cluster', adminOnly: true },
{ path: '/app/manage', icon: 'fas fa-desktop', labelKey: 'items.system', adminOnly: true },
{ path: '/app/settings', icon: 'fas fa-cog', labelKey: 'items.settings', adminOnly: true },
],

View File

@@ -0,0 +1,31 @@
import { useState, useEffect, useCallback } from 'react'
import { p2pApi } from '../utils/api'
// useP2PMode reports whether p2p / swarm mode is available, mirroring
// useDistributedMode. Availability is "a network token exists" (the same signal
// the standalone P2P page used). One-shot probe on mount plus a manual refetch.
//
// Returns:
// enabled — true when a non-empty network token is present
// loading — true until the first probe completes
// refetch — manual trigger to re-run the probe
export function useP2PMode() {
const [enabled, setEnabled] = useState(false)
const [loading, setLoading] = useState(true)
const probe = useCallback(async () => {
setLoading(true)
try {
const token = await p2pApi.getToken()
setEnabled(!!(token && String(token).trim()))
} catch {
setEnabled(false)
} finally {
setLoading(false)
}
}, [])
useEffect(() => { probe() }, [probe])
return { enabled, loading, refetch: probe }
}

View File

@@ -339,7 +339,7 @@ function DistributedBackendLogsResolver({ modelId, fromTimestamp }) {
<h2 className="empty-state-title">Model not loaded on any worker</h2>
<p className="empty-state-text">
<span style={{ fontFamily: 'var(--font-mono)' }}>{modelId}</span> isn't currently loaded on any node in the cluster.
Check the <Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
Check the <Link to="/app/cluster" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
</p>
</div>
</div>

View File

@@ -49,7 +49,7 @@ export default function Backends() {
// whenever splitMenuFor changes to a different row index.
const splitMenuAnchorRef = useRef(null)
// Target-node mode: set when navigated from /app/nodes via "+ Add backend".
// Target-node mode: set when navigated from /app/cluster via "+ Add backend".
// The gallery page header banners the scope; rows collapse their split-button
// to a single Install-on-this-node action; manual install posts to the
// per-node endpoint.
@@ -323,7 +323,7 @@ export default function Backends() {
return (
<div className="page page--wide">
{/* Target-node banner: when this gallery is scoped to one node via
?target=<id> (entered from /app/nodes), show the scope clearly and
?target=<id> (entered from /app/cluster), show the scope clearly and
give a fast way to clear it. Visually a primary-tinted strip so the
user knows they're in a special mode without it feeling alarming. */}
{targetNode && (

View File

@@ -0,0 +1,45 @@
import { useTranslation } from 'react-i18next'
import { useDistributedMode } from '../hooks/useDistributedMode'
import { useP2PMode } from '../hooks/useP2PMode'
import ClusterSection from '../components/ClusterSection'
import ClusterSummary from '../components/ClusterSummary'
import Nodes from './Nodes'
import P2P from './P2P'
export default function Cluster() {
const { t } = useTranslation('admin')
const distributed = useDistributedMode()
const p2p = useP2PMode()
const loading = distributed.loading || p2p.loading
const nothingEnabled = !loading && !distributed.enabled && !p2p.enabled
return (
<div className="page page--wide">
<div className="page-header">
<h1 className="page-title"><i className="fas fa-network-wired" /> {t('cluster.title', 'Cluster')}</h1>
<p className="page-subtitle">{t('cluster.subtitle', 'Distributed and peer-to-peer nodes serving this instance')}</p>
</div>
{!loading && <ClusterSummary distributedEnabled={distributed.enabled} p2pEnabled={p2p.enabled} />}
{distributed.enabled && (
<ClusterSection icon="fas fa-network-wired" title={t('cluster.distributed.title', 'Distributed (NATS)')} defaultOpen>
<Nodes embedded />
</ClusterSection>
)}
{p2p.enabled && (
<ClusterSection icon="fas fa-circle-nodes" title={t('cluster.swarm.title', 'Swarm (p2p)')} defaultOpen={!distributed.enabled}>
<P2P embedded />
</ClusterSection>
)}
{nothingEnabled && (
<div className="card" style={{ padding: 'var(--spacing-lg)', textAlign: 'center', color: 'var(--color-text-muted)' }}>
{t('cluster.empty', 'No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.')}
</div>
)}
</div>
)
}

View File

@@ -162,7 +162,7 @@ export default function NodeBackendLogs() {
<h2 className="empty-state-title">No node/model selected</h2>
<p className="empty-state-text">
View backend logs from the{' '}
<Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link>.
<Link to="/app/cluster" style={{ color: 'var(--color-primary)' }}>Nodes page</Link>.
</p>
</div>
</div>
@@ -220,7 +220,7 @@ export default function NodeBackendLogs() {
</h1>
<p className="page-subtitle" style={{ marginTop: 'var(--spacing-xs)' }}>
Backend logs from node <strong>{nodeName || nodeId}</strong>
{' '}<Link to="/app/nodes" style={{ color: 'var(--color-primary)', fontSize: '0.8125rem' }}>(back to nodes)</Link>
{' '}<Link to="/app/cluster" style={{ color: 'var(--color-primary)', fontSize: '0.8125rem' }}>(back to nodes)</Link>
</p>
</div>
</div>

View File

@@ -689,7 +689,7 @@ function SchedulingForm({ onSave, onCancel }) {
)
}
export default function Nodes() {
export default function Nodes({ embedded = false }) {
const { addToast } = useOutletContext()
const navigate = useNavigate()
const { t } = useTranslation('admin')
@@ -983,16 +983,18 @@ export default function Nodes() {
const pending = filteredNodes.filter(n => n.status === 'pending').length
return (
<div className="page page--wide">
<div className="page-header">
<h1 className="page-title">
<i className="fas fa-network-wired" style={{ marginRight: 'var(--spacing-sm)' }} />
{t('nodes.title')}
</h1>
<p className="page-subtitle">
{t('nodes.subtitle')}
</p>
</div>
<div className={embedded ? '' : 'page page--wide'}>
{!embedded && (
<div className="page-header">
<h1 className="page-title">
<i className="fas fa-network-wired" style={{ marginRight: 'var(--spacing-sm)' }} />
{t('nodes.title')}
</h1>
<p className="page-subtitle">
{t('nodes.subtitle')}
</p>
</div>
)}
{/* Tabs */}
<div className="tabs" style={{ marginBottom: 'var(--spacing-lg)' }}>

View File

@@ -102,7 +102,7 @@ function StepNumber({ n, bg, color }) {
)
}
export default function P2P() {
export default function P2P({ embedded = false }) {
const { addToast } = useOutletContext()
const { t } = useTranslation('admin')
const [workers, setWorkers] = useState([])
@@ -172,7 +172,7 @@ export default function P2P() {
if (loading) {
return (
<div className="page page--narrow" style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
<div className={embedded ? '' : 'page page--narrow'} style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
<LoadingSpinner size="lg" />
</div>
)
@@ -181,7 +181,7 @@ export default function P2P() {
// ── P2P Disabled ──
if (!enabled) {
return (
<div className="page page--narrow">
<div className={embedded ? '' : 'page page--narrow'}>
<div style={{ textAlign: 'center', padding: 'var(--spacing-xl) 0' }}>
<i className="fas fa-network-wired" style={{ fontSize: '3rem', color: 'var(--color-primary)', marginBottom: 'var(--spacing-md)' }} />
<h1 style={{ fontSize: '1.5rem', fontWeight: 600, marginBottom: 'var(--spacing-sm)' }}>
@@ -294,21 +294,23 @@ export default function P2P() {
const mlxTotal = stats.mlx_workers?.total ?? 0
return (
<div className="page page--narrow">
<div className="page-header">
<h1 className="page-title">
<i className="fas fa-circle-nodes" style={{ marginRight: 'var(--spacing-sm)' }} />
{t('p2p.title')}
</h1>
<p className="page-subtitle">
{t('p2p.subtitle')}
{' '}
<a href="https://localai.io/features/distribute/" target="_blank" rel="noopener noreferrer"
style={{ color: 'var(--color-primary)' }}>
<i className="fas fa-circle-info" />
</a>
</p>
</div>
<div className={embedded ? '' : 'page page--narrow'}>
{!embedded && (
<div className="page-header">
<h1 className="page-title">
<i className="fas fa-circle-nodes" style={{ marginRight: 'var(--spacing-sm)' }} />
{t('p2p.title')}
</h1>
<p className="page-subtitle">
{t('p2p.subtitle')}
{' '}
<a href="https://localai.io/features/distribute/" target="_blank" rel="noopener noreferrer"
style={{ color: 'var(--color-primary)' }}>
<i className="fas fa-circle-info" />
</a>
</p>
</div>
)}
{/* Network Token */}
<div style={{

View File

@@ -45,7 +45,7 @@ const Talk = page('talk', () => import('./pages/Talk'))
const Backends = page('backends', () => import('./pages/Backends'))
const Settings = page('settings', () => import('./pages/Settings'))
const Traces = page('traces', () => import('./pages/Traces'))
const P2P = page('p2p', () => import('./pages/P2P'))
const Cluster = page('cluster', () => import('./pages/Cluster'))
const Agents = page('agents', () => import('./pages/Agents'))
const AgentCreate = page(null, () => import('./pages/AgentCreate'))
const AgentChat = page(null, () => import('./pages/AgentChat'))
@@ -68,7 +68,6 @@ const Quantize = page('quantize', () => import('./pages/Quantize'))
const Studio = page('studio', () => import('./pages/Studio'))
const FaceRecognition = page('face', () => import('./pages/FaceRecognition'))
const VoiceRecognition = page('voice', () => import('./pages/VoiceRecognition'))
const Nodes = page('nodes', () => import('./pages/Nodes'))
const NodeBackendLogs = page(null, () => import('./pages/NodeBackendLogs'))
const NotFound = page(null, () => import('./pages/NotFound'))
const Usage = page('usage', () => import('./pages/Usage'))
@@ -120,8 +119,9 @@ const appChildren = [
{ path: 'settings', element: <Admin><Settings /></Admin> },
{ path: 'traces', element: <Admin><Traces /></Admin> },
{ path: 'backend-logs/:modelId', element: <Admin><BackendLogs /></Admin> },
{ path: 'p2p', element: <Admin><P2P /></Admin> },
{ path: 'nodes', element: <Admin><Nodes /></Admin> },
{ path: 'cluster', element: <Admin><Cluster /></Admin> },
{ path: 'p2p', element: <Navigate to="/app/cluster" replace /> },
{ path: 'nodes', element: <Navigate to="/app/cluster" replace /> },
{ path: 'node-backend-logs/:nodeId/:modelId', element: <Admin><NodeBackendLogs /></Admin> },
{ path: 'agents', element: <Feature feature="agents"><Agents /></Feature> },
{ path: 'agents/new', element: <Feature feature="agents"><AgentCreate /></Feature> },

81
core/p2p/affinity_sync.go Normal file
View File

@@ -0,0 +1,81 @@
package p2p
import (
"encoding/json"
"fmt"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/edgevpn/pkg/blockchain"
"github.com/mudler/edgevpn/pkg/hub"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/xlog"
)
// affinitySubjectKey is the hub.Message annotation carrying the logical subject
// (observe vs invalidate) so the receiver can dispatch the way a NATS subject
// would. The generic channel has no subject routing, so we carry it ourselves.
const affinitySubjectKey = "subject"
// genericChannelPublisher adapts an edgevpn node's generic broadcast channel to
// the prefixcache publisher interface (Publish(subject, v)). It lets a
// federation server reuse prefixcache.Sync for cross-server affinity coherence
// without NATS: each event is JSON-encoded into a hub.Message and broadcast over
// the generic channel (not the slow blockchain ledger).
type genericChannelPublisher struct {
node *node.Node
}
// Publish satisfies prefixcache's (unexported) publisher interface structurally.
func (p *genericChannelPublisher) Publish(subject string, v any) error {
payload, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("marshalling affinity event: %w", err)
}
return p.node.PublishMessage(&hub.Message{
Message: string(payload),
Annotations: map[string]interface{}{affinitySubjectKey: subject},
})
}
// applyAffinityMessage decodes a generic-channel affinity message and applies it
// to sync WITHOUT re-broadcasting (ApplyObserve/ApplyInvalidate). now is the
// local clock so TTL is measured per server. Unknown subjects, malformed
// payloads, and nil inputs are ignored (debug-logged), never fatal.
func applyAffinityMessage(sync *prefixcache.Sync, m *hub.Message, now time.Time) {
if sync == nil || m == nil {
return
}
subject, _ := m.Annotations[affinitySubjectKey].(string)
switch subject {
case messaging.SubjectPrefixCacheObserve:
var ev messaging.PrefixCacheObserveEvent
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
xlog.Debug("affinity: bad observe payload", "error", err)
return
}
sync.ApplyObserve(ev, now)
case messaging.SubjectPrefixCacheInvalidate:
var ev messaging.PrefixCacheInvalidateEvent
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
xlog.Debug("affinity: bad invalidate payload", "error", err)
return
}
sync.ApplyInvalidate(ev)
default:
// Other generic-channel traffic; not ours.
}
}
// affinityHandler returns the edgevpn generic-channel handler that applies remote
// affinity events to this server's index. It is registered at node construction
// (handlers cannot be added after Start) and reads fs.prefixSync lazily, which is
// safe because messages only arrive after Start, by which point Start has wired
// fs.prefixSync.
func (fs *FederatedServer) affinityHandler() node.Handler {
return func(_ *blockchain.Ledger, m *hub.Message, _ chan *hub.Message) error {
applyAffinityMessage(fs.prefixSync, m, time.Now())
return nil
}
}

View File

@@ -0,0 +1,48 @@
package p2p
import (
"encoding/json"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/edgevpn/pkg/hub"
)
var _ = Describe("applyAffinityMessage", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
observeMsg := func(ev messaging.PrefixCacheObserveEvent) *hub.Message {
payload, _ := json.Marshal(ev)
return &hub.Message{
Message: string(payload),
Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve},
}
}
It("applies a peer observe so the local index resolves the warm peer", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
sync := prefixcache.NewSync(idx, nil)
chain := prefixcache.ExtractChain("m1", "a fairly long shared system prompt body for the prefix chain", cfg)
applyAffinityMessage(sync, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m1", Chain: chain, NodeID: "warm", Replica: 0}), ref)
d := idx.Decide("m1", chain, []prefixcache.ReplicaKey{{NodeID: "warm"}, {NodeID: "cold"}}, ref)
Expect(d.HasHot).To(BeTrue())
Expect(d.Hot.NodeID).To(Equal("warm"))
})
It("ignores malformed, unknown-subject, and nil inputs without panicking", func() {
sync := prefixcache.NewSync(prefixcache.NewIndex(prefixcache.DefaultConfig()), nil)
applyAffinityMessage(sync, &hub.Message{Message: "not-json", Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}}, ref)
applyAffinityMessage(sync, &hub.Message{Message: "{}", Annotations: map[string]interface{}{affinitySubjectKey: "some.other.subject"}}, ref)
applyAffinityMessage(sync, &hub.Message{Message: "{}"}, ref)
applyAffinityMessage(nil, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m"}), ref)
applyAffinityMessage(sync, nil, ref)
Expect(true).To(BeTrue())
})
})

View File

@@ -1,10 +1,17 @@
package p2p
import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"strings"
"sync"
"time"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/LocalAI/pkg/clusterrouting"
"github.com/mudler/xlog"
)
@@ -23,16 +30,29 @@ type FederatedServer struct {
requestTable map[string]int
loadBalanced bool
workerTarget string
bodyLimit int64 // max request body bytes (0 = unlimited)
prefixCfg prefixcache.Config
prefixIndex *prefixcache.Index
prefixSync *prefixcache.Sync
prefixProvider prefixcache.Provider // Index (sync off) or Sync (sync on)
syncAffinity bool
}
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64, syncAffinity bool) *FederatedServer {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
workerTarget: workerTarget,
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
workerTarget: workerTarget,
bodyLimit: bodyLimit,
prefixCfg: cfg,
prefixIndex: idx,
prefixProvider: idx,
syncAffinity: syncAffinity,
}
}
@@ -74,28 +94,141 @@ func (fs *FederatedServer) syncTableStatus() {
}
}
func (fs *FederatedServer) SelectLeastUsedServer() string {
fs.syncTableStatus()
// buildFederatedCandidates maps the currently-online federated peers into the
// shared routing policy's candidate form, optionally filtered to peers that can
// serve model. A peer with a non-empty advertised model set that lacks model is
// excluded; a peer with an empty set is treated as "unknown" and stays eligible
// (so older peers and mid-convergence peers are not starved). When model is "",
// no model filtering is applied. InFlight comes from the per-peer request
// counter; AvailableVRAM from the gossiped NodeData; LastUsed is left zero.
func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time, model string) []clusterrouting.ReplicaCandidate {
candidates := make([]clusterrouting.ReplicaCandidate, 0, len(nodes))
for _, nd := range nodes {
if !nd.IsOnlineAt(now) {
continue
}
if !servesModel(nd, model) {
continue
}
candidates = append(candidates, clusterrouting.ReplicaCandidate{
NodeID: nd.ID,
InFlight: requestTable[nd.ID],
AvailableVRAM: nd.AvailableVRAM,
})
}
return candidates
}
fs.Lock()
defer fs.Unlock()
xlog.Debug("SelectLeastUsedServer()", "request_table", fs.requestTable)
// cycle over requestTable and find the entry with the lower number
// if there are multiple entries with the same number, select one randomly
// if there are no entries, return an empty string
var min int
var minKey string
for k, v := range fs.requestTable {
if min == 0 || v < min {
min = v
minKey = k
// servesModel reports whether nd is eligible to serve model. An empty model
// means "no filter". An empty advertised set means "unknown" and is eligible.
func servesModel(nd schema.NodeData, model string) bool {
if model == "" || len(nd.Models) == 0 {
return true
}
for _, m := range nd.Models {
if m == model {
return true
}
}
xlog.Debug("Selected tunnel", "tunnel", minKey, "requests_served", min, "request_table", fs.requestTable)
return false
}
return minKey
// extractModel best-effort resolves the target model of a buffered request,
// cheapest source first: an explicit query value, then the JSON body "model"
// field. Returns "" when it cannot be determined (for example a multipart or
// websocket request), in which case the caller routes by load/affinity only.
func extractModel(queryModel string, body []byte) string {
if strings.TrimSpace(queryModel) != "" {
return queryModel
}
if len(body) == 0 {
return ""
}
var probe struct {
Model string `json:"model"`
}
if err := json.Unmarshal(body, &probe); err != nil {
return ""
}
return probe.Model
}
// affinityPreferred returns the peer the prefix index considers warm for this
// chain, or "" when there is no match strong enough among the candidates. It
// reuses prefixcache's per-model radix-tree Decide; the final load-guarded pick
// is done by clusterrouting.PickWithAffinity so the VRAM tier is preserved.
func affinityPreferred(idx prefixcache.Provider, model string, chain []uint64, candidates []clusterrouting.ReplicaCandidate, cfg prefixcache.Config, now time.Time) string {
if idx == nil || len(chain) == 0 || len(candidates) == 0 {
return ""
}
keys := make([]prefixcache.ReplicaKey, 0, len(candidates))
for _, c := range candidates {
keys = append(keys, prefixcache.ReplicaKey{NodeID: c.NodeID})
}
d := idx.Decide(model, chain, keys, now)
if d.HasHot && d.MatchRatio >= cfg.MinPrefixMatch {
return d.Hot.NodeID
}
return ""
}
// selectPeer chooses the federated peer to serve a request for model with the
// given raw body. It filters candidates by model, computes the prefix chain,
// consults the affinity index, and makes the final load+VRAM-aware pick. It
// returns the chosen peer ID and the chain (so the caller can Observe after a
// successful forward). An empty model and nil body degrade to load+VRAM only.
// Returns "" when no eligible peer is online.
func (fs *FederatedServer) selectPeer(model string, body []byte, now time.Time) (string, []uint64) {
fs.syncTableStatus()
nodes := GetAvailableNodes(fs.service)
// Snapshot candidates under the lock (it only guards requestTable), then
// release before the prefix hashing and tree walk, which are lock-free
// (candidates is a copy; prefixIndex/prefixCfg are set once at construction).
fs.Lock()
candidates := buildFederatedCandidates(nodes, fs.requestTable, now, model)
fs.Unlock()
if len(candidates) == 0 {
return "", nil
}
var chain []uint64
preferred := ""
if fs.prefixProvider != nil && model != "" && len(body) > 0 {
chain = prefixcache.ExtractChain(model, string(body), fs.prefixCfg)
preferred = affinityPreferred(fs.prefixProvider, model, chain, candidates, fs.prefixCfg, now)
}
best := clusterrouting.PickWithAffinity(candidates, preferred, fs.prefixCfg.BalanceAbsThreshold)
if best == nil {
return "", chain
}
return best.NodeID, chain
}
// observeServed records that peerID served the given chain for model, so the
// next request sharing that prefix is routed back to the same warm peer.
func (fs *FederatedServer) observeServed(model string, chain []uint64, peerID string, now time.Time) {
if fs.prefixProvider == nil || len(chain) == 0 || peerID == "" || model == "" {
return
}
fs.prefixProvider.Observe(model, chain, prefixcache.ReplicaKey{NodeID: peerID}, now)
}
// evictLoop periodically sweeps expired affinity entries so the in-memory tree
// does not grow unbounded. Runs for the lifetime of the proxy.
func (fs *FederatedServer) evictLoop(ctx context.Context) {
interval := fs.prefixCfg.TTL / 2
if interval <= 0 {
interval = time.Minute
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case now := <-t.C:
fs.prefixProvider.Evict(now)
}
}
}
func (fs *FederatedServer) RecordRequest(nodeID string) {

View File

@@ -1,22 +1,81 @@
package p2p
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/xlog"
)
// ErrBodyTooLarge is returned by readRequest when the buffered request body
// exceeds the configured limit. The proxy turns it into a 413 response.
var ErrBodyTooLarge = errors.New("request body exceeds limit")
// readRequest parses a single HTTP request from r and buffers its body (so the
// body can both be inspected for the model/prefix and replayed to the chosen
// peer). limit caps the buffered body in bytes; 0 means unlimited. A body over
// the cap returns ErrBodyTooLarge. The returned request has its body replaced
// with the buffered bytes and RequestURI cleared so it can be re-serialized
// with req.Write to the peer stream.
func readRequest(r *bufio.Reader, limit int64) (*http.Request, []byte, error) {
req, err := http.ReadRequest(r)
if err != nil {
return nil, nil, err
}
var body []byte
if req.Body != nil {
reader := io.Reader(req.Body)
if limit > 0 {
reader = io.LimitReader(req.Body, limit+1)
}
body, err = io.ReadAll(reader)
_ = req.Body.Close()
if err != nil {
return nil, nil, err
}
if limit > 0 && int64(len(body)) > limit {
return nil, nil, ErrBodyTooLarge
}
}
req.Body = io.NopCloser(bytes.NewReader(body))
req.ContentLength = int64(len(body))
req.RequestURI = ""
return req, body, nil
}
// isWebsocketUpgrade reports whether req is a websocket handshake, which must be
// forwarded as a raw bidirectional duplex (not request/streamed-response) and
// is not body-capped or model-routed.
func isWebsocketUpgrade(req *http.Request) bool {
return strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") &&
strings.EqualFold(req.Header.Get("Upgrade"), "websocket")
}
func (f *FederatedServer) Start(ctx context.Context) error {
n, err := NewNode(f.p2ptoken)
var extraOpts []node.Option
if f.syncAffinity {
extraOpts = append(extraOpts, node.EnableGenericHub, node.GenericChannelHandlers(f.affinityHandler()))
}
n, err := NewNode(f.p2ptoken, extraOpts...)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
if f.syncAffinity {
f.prefixSync = prefixcache.NewSync(f.prefixIndex, &genericChannelPublisher{node: n})
f.prefixProvider = f.prefixSync
xlog.Info("Federation affinity sync enabled (generic channel)")
}
err = n.Start(ctx)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
@@ -28,6 +87,8 @@ func (f *FederatedServer) Start(ctx context.Context) error {
return err
}
go f.evictLoop(ctx)
return f.proxy(ctx, n)
}
@@ -62,40 +123,60 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
continue
}
// Handle connections in a new goroutine, forwarding to the p2p service
// Handle connections in a new goroutine, terminating HTTP and
// forwarding the request to the chosen p2p peer.
go func() {
workerID := ""
if fs.workerTarget != "" {
workerID = fs.workerTarget
} else if fs.loadBalanced {
xlog.Debug("Load balancing request")
workerID = fs.SelectLeastUsedServer()
if workerID == "" {
xlog.Debug("Least used server not found, selecting random")
workerID = fs.RandomServer()
br := bufio.NewReader(conn)
req, body, err := readRequest(br, fs.bodyLimit)
if err != nil {
if err == ErrBodyTooLarge {
fs.sendHTMLResponse(conn, 413, "Request body too large")
return
}
} else {
workerID = fs.RandomServer()
}
if workerID == "" {
xlog.Error("No available nodes yet")
fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect")
xlog.Error("Failed to read request", "error", err)
_ = conn.Close()
return
}
upgrade := isWebsocketUpgrade(req)
now := time.Now()
var (
workerID string
model string
chain []uint64
)
switch {
case fs.workerTarget != "":
workerID = fs.workerTarget
case !fs.loadBalanced:
// Explicit random mode (the RandomWorker flag): keep the
// historical random pick, no model/affinity routing.
workerID = fs.RandomServer()
case upgrade:
// Websocket: no readable model; route by load only.
workerID, _ = fs.selectPeer("", nil, now)
default:
model = extractModel(req.URL.Query().Get("model"), body)
workerID, chain = fs.selectPeer(model, body, now)
}
if workerID == "" {
fs.sendHTMLResponse(conn, 503, "No federated peer available for this request")
return
}
xlog.Debug("Selected node", "node", workerID)
nodeData, exists := GetNode(fs.service, workerID)
if !exists {
xlog.Error("Node not found", "node", workerID)
fs.sendHTMLResponse(conn, 404, "Node not found")
return
}
proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
if fs.loadBalanced {
fs.RecordRequest(workerID)
proxyHTTPToPeer(ctx, node, nodeData.ServiceID, conn, req, upgrade)
fs.RecordRequest(workerID)
if !upgrade {
fs.observeServed(model, chain, workerID, now)
}
}()
}
@@ -132,6 +213,8 @@ func getHTTPStatusText(statusCode int) string {
switch statusCode {
case 503:
return "Service Unavailable"
case 413:
return "Request Entity Too Large"
case 404:
return "Not Found"
case 200:

196
core/p2p/federated_test.go Normal file
View File

@@ -0,0 +1,196 @@
package p2p
import (
"bufio"
"strings"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
"github.com/mudler/LocalAI/pkg/clusterrouting"
)
var _ = Describe("buildFederatedCandidates", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
onlineSeen := ref.Add(-10 * time.Second)
offlineSeen := ref.Add(-2 * time.Minute)
It("excludes offline nodes", func() {
nodes := []schema.NodeData{
{ID: "online", LastSeen: onlineSeen},
{ID: "offline", LastSeen: offlineSeen},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
Expect(cands).To(HaveLen(1))
Expect(cands[0].NodeID).To(Equal("online"))
})
It("maps the request counter to InFlight and defaults missing entries to zero", func() {
nodes := []schema.NodeData{
{ID: "a", LastSeen: onlineSeen},
{ID: "b", LastSeen: onlineSeen},
}
cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref, "")
byID := map[string]int{}
for _, c := range cands {
byID[c.NodeID] = c.InFlight
}
Expect(byID["a"]).To(Equal(4))
Expect(byID["b"]).To(Equal(0))
})
It("carries gossiped AvailableVRAM into the candidate", func() {
nodes := []schema.NodeData{
{ID: "gpu", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
Expect(cands[0].AvailableVRAM).To(Equal(uint64(24_000_000_000)))
})
It("produces candidates the shared policy ranks by least in-flight then most VRAM", func() {
// busy-big has the most VRAM but is busy, so it must lose. Among the two
// idle peers, the one with more free VRAM wins (VRAM breaks the tie).
nodes := []schema.NodeData{
{ID: "busy-big", LastSeen: onlineSeen, AvailableVRAM: 80_000_000_000},
{ID: "idle-small", LastSeen: onlineSeen, AvailableVRAM: 8_000_000_000},
{ID: "idle-big", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
}
cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref, "")
best := clusterrouting.PickBestReplica(cands)
Expect(best).ToNot(BeNil())
Expect(best.NodeID).To(Equal("idle-big"))
})
})
var _ = Describe("model-aware candidate building", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
seen := ref.Add(-10 * time.Second)
It("keeps peers that advertise the requested model", func() {
nodes := []schema.NodeData{
{ID: "has", LastSeen: seen, Models: []string{"m1", "m2"}},
{ID: "hasnot", LastSeen: seen, Models: []string{"other"}},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1")
Expect(cands).To(HaveLen(1))
Expect(cands[0].NodeID).To(Equal("has"))
})
It("keeps peers with an empty (unknown) model set eligible for any model", func() {
nodes := []schema.NodeData{
{ID: "unknown", LastSeen: seen, Models: nil},
{ID: "hasnot", LastSeen: seen, Models: []string{"other"}},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1")
Expect(cands).To(HaveLen(1))
Expect(cands[0].NodeID).To(Equal("unknown"))
})
It("does not filter when the requested model is empty", func() {
nodes := []schema.NodeData{
{ID: "a", LastSeen: seen, Models: []string{"x"}},
{ID: "b", LastSeen: seen, Models: []string{"y"}},
}
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
Expect(cands).To(HaveLen(2))
})
})
var _ = Describe("extractModel", func() {
It("reads the JSON body model field", func() {
body := []byte(`{"model":"llama-3","messages":[]}`)
Expect(extractModel("", body)).To(Equal("llama-3"))
})
It("prefers a path/query model over the body", func() {
body := []byte(`{"model":"frombody"}`)
Expect(extractModel("fromquery", body)).To(Equal("fromquery"))
})
It("returns empty when no model is present", func() {
Expect(extractModel("", []byte(`{"messages":[]}`))).To(Equal(""))
})
It("returns empty on non-JSON / unparseable body without panicking", func() {
Expect(extractModel("", []byte("--multipart-boundary--"))).To(Equal(""))
})
})
var _ = Describe("affinityPreferred", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
It("returns the warm peer once a chain has been observed for it", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
idx.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
})
It("returns empty when no chain has been observed", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal(""))
})
It("returns empty for a nil index or empty chain", func() {
cfg := prefixcache.DefaultConfig()
Expect(affinityPreferred(nil, "m1", []uint64{1}, nil, cfg, ref)).To(Equal(""))
idx := prefixcache.NewIndex(cfg)
Expect(affinityPreferred(idx, "m1", nil, nil, cfg, ref)).To(Equal(""))
})
})
var _ = Describe("L7 request handling", func() {
It("reads a buffered request and its body under the cap", func() {
raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 28\r\n\r\n" +
`{"model":"m1","messages":[]}`
req, body, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
Expect(err).ToNot(HaveOccurred())
Expect(req.URL.Path).To(Equal("/v1/chat/completions"))
Expect(string(body)).To(ContainSubstring(`"model":"m1"`))
})
It("rejects a body over the cap with ErrBodyTooLarge", func() {
big := strings.Repeat("a", 200)
raw := "POST /x HTTP/1.1\r\nHost: x\r\nContent-Length: 200\r\n\r\n" + big
_, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 64)
Expect(err).To(MatchError(ErrBodyTooLarge))
})
It("detects a websocket upgrade request", func() {
raw := "GET /v1/realtime HTTP/1.1\r\nHost: x\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n"
req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
Expect(err).ToNot(HaveOccurred())
Expect(isWebsocketUpgrade(req)).To(BeTrue())
})
It("does not flag a normal POST as a websocket upgrade", func() {
raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 2\r\n\r\n{}"
req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
Expect(err).ToNot(HaveOccurred())
Expect(isWebsocketUpgrade(req)).To(BeFalse())
})
})
var _ = Describe("affinityPreferred with a sync provider", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
It("returns the warm peer when the provider is a Sync wrapping the index", func() {
cfg := prefixcache.DefaultConfig()
idx := prefixcache.NewIndex(cfg)
sync := prefixcache.NewSync(idx, nil)
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"a long shared system prompt for affinity"}]}`, cfg)
sync.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
Expect(affinityPreferred(sync, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
})
})

View File

@@ -6,15 +6,18 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/pkg/utils"
"github.com/mudler/LocalAI/pkg/xsysinfo"
"github.com/mudler/edgevpn/pkg/config"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
@@ -86,37 +89,39 @@ func nodeAnnounce(ctx context.Context, node *node.Node) {
)
}
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
ledger, _ := node.Ledger()
// openPeerStream resolves serviceID to its advertised peer in the services
// ledger and opens a libp2p stream to that peer over the service protocol.
// Returns the stream or an error describing which lookup step failed.
func openPeerStream(ctx context.Context, n *node.Node, serviceID string) (network.Stream, error) {
ledger, _ := n.Ledger()
// Retrieve current ID for ip in the blockchain
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
service := &types.Service{}
existingValue.Unmarshal(service)
// If mismatch, update the blockchain
if !found {
zlog.Error("Service not found on blockchain")
conn.Close()
// ll.Debugf("service '%s' not found on blockchain", serviceID)
return
return nil, errors.New("service not found on blockchain")
}
// Decode the Peer
d, err := peer.Decode(service.PeerID)
if err != nil {
zlog.Error("cannot decode peer")
conn.Close()
// ll.Debugf("could not decode peer '%s'", service.PeerID)
return
return nil, fmt.Errorf("cannot decode peer: %w", err)
}
// Open a stream
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
stream, err := n.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
if err != nil {
zlog.Error("cannot open stream peer", "error", err)
return nil, fmt.Errorf("cannot open stream peer: %w", err)
}
return stream, nil
}
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
stream, err := openPeerStream(ctx, node, serviceID)
if err != nil {
zlog.Error("Could not open peer stream", "error", err)
conn.Close()
// ll.Debugf("could not open stream '%s'", err.Error())
return
}
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
@@ -130,6 +135,45 @@ func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string,
conn.Close()
}
// proxyHTTPToPeer forwards an already-parsed HTTP request to the chosen peer
// over a libp2p stream and streams the response back to conn. When duplex is
// true (a websocket upgrade) it runs a bidirectional copy after writing the
// request, so post-101 frames flow both ways. The response is never buffered,
// so SSE keeps flowing.
func proxyHTTPToPeer(ctx context.Context, n *node.Node, serviceID string, conn net.Conn, req *http.Request, duplex bool) {
stream, err := openPeerStream(ctx, n, serviceID)
if err != nil {
zlog.Error("Could not open peer stream", "error", err)
_ = conn.Close()
return
}
// Force the peer to close after responding so the one-way io.Copy below
// terminates. Without this the peer keeps the HTTP/1.1 connection alive and
// io.Copy(conn, stream) blocks forever, leaking the goroutine, conn, and
// stream. Websocket upgrades keep keep-alive: their duplex copy owns the
// lifetime.
req.Header.Del("Connection")
req.Close = !duplex
if err := req.Write(stream); err != nil {
zlog.Error("Could not write request to peer", "error", err)
_ = stream.Close()
_ = conn.Close()
return
}
if duplex {
closer := make(chan struct{}, 2)
go copyStream(closer, stream, conn)
go copyStream(closer, conn, stream)
<-closer
_ = stream.Close()
_ = conn.Close()
return
}
_, _ = io.Copy(conn, stream)
_ = stream.Close()
_ = conn.Close()
}
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
zlog.Info("Allocating service", "service", service, "address", listenAddr)
// Open local port for listening
@@ -311,7 +355,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv
}
// This is the P2P worker main
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
func ExposeService(ctx context.Context, host, port, token, servicesID string, modelsFn func() []string) (*node.Node, error) {
if servicesID == "" {
servicesID = defaultServicesID
}
@@ -347,10 +391,16 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
20*time.Second,
func() {
updatedMap := map[string]any{}
var models []string
if modelsFn != nil {
models = modelsFn()
}
updatedMap[name] = &schema.NodeData{
Name: name,
LastSeen: time.Now(),
ID: nodeID(name),
Name: name,
LastSeen: time.Now(),
ID: nodeID(name),
AvailableVRAM: xsysinfo.GetGPUAggregateInfo().FreeVRAM,
Models: models,
}
ledger.Add(servicesID, updatedMap)
},
@@ -359,11 +409,12 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
return n, err
}
func NewNode(token string) (*node.Node, error) {
func NewNode(token string, extraOpts ...node.Option) (*node.Node, error) {
nodeOpts, err := newNodeOpts(token)
if err != nil {
return nil, err
}
nodeOpts = append(nodeOpts, extraOpts...)
n, err := node.New(nodeOpts...)
if err != nil {

View File

@@ -0,0 +1,13 @@
package p2p
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestP2P(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "P2P Suite")
}

View File

@@ -121,18 +121,38 @@ type StoresFindResponse struct {
Similarities []float32 `json:"similarities" yaml:"similarities"`
}
// NodeOnlineWindow is how long after its last announce a node still counts as
// online. Nodes re-announce into the edgevpn ledger every 20s (see core/p2p
// ExposeService), so 40s tolerates a single missed announce.
const NodeOnlineWindow = 40 * time.Second
type NodeData struct {
Name string
ID string
TunnelAddress string
ServiceID string
LastSeen time.Time
// AvailableVRAM is the node's free GPU VRAM in bytes at its last announce,
// gossiped so federation selection can prefer peers with more headroom.
// Zero for CPU-only nodes and for peers on an older version that does not
// publish it; the routing policy treats zero as the lowest VRAM tier.
AvailableVRAM uint64
// Models is the set of model names this peer currently serves, gossiped so
// the federation proxy can route a request only to peers that have the
// requested model. Empty means "unknown" (an older peer, or one that has
// not loaded any model yet) and is treated as eligible for any model so a
// mixed-version swarm is not starved.
Models []string
}
func (d NodeData) IsOnline() bool {
now := time.Now()
// if the node was seen in the last 40 seconds, it's online
return now.Sub(d.LastSeen) < 40*time.Second
return d.IsOnlineAt(time.Now())
}
// IsOnlineAt reports whether the node counts as online relative to now. It is
// split from IsOnline so selection logic can be exercised with a fixed clock.
func (d NodeData) IsOnlineAt(now time.Time) bool {
return now.Sub(d.LastSeen) < NodeOnlineWindow
}
type P2PNodesResponse struct {

View File

@@ -0,0 +1,39 @@
package schema_test
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/schema"
)
var _ = Describe("NodeData", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
It("is online within the online window", func() {
nd := schema.NodeData{LastSeen: ref.Add(-30 * time.Second)}
Expect(nd.IsOnlineAt(ref)).To(BeTrue())
})
It("is offline once the online window has elapsed", func() {
nd := schema.NodeData{LastSeen: ref.Add(-50 * time.Second)}
Expect(nd.IsOnlineAt(ref)).To(BeFalse())
})
It("treats exactly the window boundary as offline (strict less-than)", func() {
nd := schema.NodeData{LastSeen: ref.Add(-schema.NodeOnlineWindow)}
Expect(nd.IsOnlineAt(ref)).To(BeFalse())
})
It("carries AvailableVRAM in bytes", func() {
nd := schema.NodeData{AvailableVRAM: 8_000_000_000}
Expect(nd.AvailableVRAM).To(Equal(uint64(8_000_000_000)))
})
It("carries the advertised model set", func() {
nd := schema.NodeData{Models: []string{"llama-3", "qwen"}}
Expect(nd.Models).To(ConsistOf("llama-3", "qwen"))
})
})

View File

@@ -0,0 +1,33 @@
package clusterrouting
// PickWithAffinity prefers the candidate whose NodeID equals preferredNodeID
// when that candidate's in-flight count is within slack of the least-loaded
// candidate; otherwise it falls back to PickBestReplica (least in-flight, then
// oldest last-used, then most free VRAM). This keeps a warm prefix-cache peer
// sticky without letting it become a hot-spot: once it is more than slack
// requests busier than the least-loaded peer, load wins. With an empty
// preferredNodeID, or a preferred node not in the set, it is exactly
// PickBestReplica. slack mirrors prefixcache's BalanceAbsThreshold.
func PickWithAffinity(candidates []ReplicaCandidate, preferredNodeID string, slack int) *ReplicaCandidate {
if len(candidates) == 0 {
return nil
}
if preferredNodeID == "" {
return PickBestReplica(candidates)
}
var preferred *ReplicaCandidate
minInFlight := candidates[0].InFlight
for i := range candidates {
c := &candidates[i]
if c.InFlight < minInFlight {
minInFlight = c.InFlight
}
if c.NodeID == preferredNodeID {
preferred = c
}
}
if preferred != nil && preferred.InFlight <= minInFlight+slack {
return preferred
}
return PickBestReplica(candidates)
}

View File

@@ -0,0 +1,48 @@
package clusterrouting
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("PickWithAffinity", func() {
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
It("returns nil for an empty candidate list", func() {
Expect(PickWithAffinity(nil, "x", 2)).To(BeNil())
})
It("falls back to PickBestReplica when no preferred node is given", func() {
cs := []ReplicaCandidate{
{NodeID: "busy", InFlight: 3, LastUsed: ref, AvailableVRAM: 80},
{NodeID: "idle", InFlight: 0, LastUsed: ref, AvailableVRAM: 8},
}
Expect(PickWithAffinity(cs, "", 2).NodeID).To(Equal("idle"))
})
It("honors the preferred node when it is within the in-flight slack of the least-loaded", func() {
cs := []ReplicaCandidate{
{NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80},
{NodeID: "warm", InFlight: 2, LastUsed: ref, AvailableVRAM: 8},
}
Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("warm"))
})
It("ignores the preferred node when it is beyond the slack and falls back to load+VRAM", func() {
cs := []ReplicaCandidate{
{NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80},
{NodeID: "warm", InFlight: 5, LastUsed: ref, AvailableVRAM: 8},
}
Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("cold"))
})
It("falls back to load+VRAM when the preferred node is not among the candidates", func() {
cs := []ReplicaCandidate{
{NodeID: "a", InFlight: 1, LastUsed: ref, AvailableVRAM: 8},
{NodeID: "b", InFlight: 1, LastUsed: ref, AvailableVRAM: 24},
}
Expect(PickWithAffinity(cs, "ghost", 2).NodeID).To(Equal("b"))
})
})