mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-06 15:56:06 -04:00
Compare commits
18 Commits
feat/realt
...
feat/p2p-f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c410cd7253 | ||
|
|
4e993332af | ||
|
|
450376d22f | ||
|
|
8180fddc05 | ||
|
|
5033457f57 | ||
|
|
d88758282a | ||
|
|
a0c7cecddd | ||
|
|
bc42374d8a | ||
|
|
ec2a0645dd | ||
|
|
ce8b97edf2 | ||
|
|
91fc26ff75 | ||
|
|
8df0bb683b | ||
|
|
8ec536a34c | ||
|
|
14b57aa343 | ||
|
|
288d732af7 | ||
|
|
ed38609d51 | ||
|
|
7768b35696 | ||
|
|
830f818c58 |
@@ -53,9 +53,21 @@ func (a *Application) StartP2P() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// modelsFn reports the model names this instance currently serves so the
|
||||
// federation proxy can route a request only to peers that have the
|
||||
// requested model. It is re-evaluated on every announce tick.
|
||||
modelsFn := func() []string {
|
||||
cfgs := a.ModelConfigLoader().GetAllModelsConfigs()
|
||||
names := make([]string, 0, len(cfgs))
|
||||
for _, c := range cfgs {
|
||||
names = append(names, c.Name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// Here a new node is created and started
|
||||
// and a service is exposed by the node
|
||||
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID))
|
||||
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), modelsFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -14,12 +14,14 @@ type FederatedCLI struct {
|
||||
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
|
||||
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
|
||||
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
|
||||
UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-size limit in megabytes" group:"api"`
|
||||
AffinitySync bool `env:"LOCALAI_FEDERATED_AFFINITY_SYNC,FEDERATED_AFFINITY_SYNC" default:"false" help:"Broadcast prefix-cache affinity observations to other federation servers over the p2p generic channel (enable on every federation server that should cohere)" group:"p2p"`
|
||||
}
|
||||
|
||||
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
|
||||
warnDeprecatedFlags()
|
||||
|
||||
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
|
||||
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker, int64(f.UploadLimit)*1024*1024, f.AffinitySync)
|
||||
|
||||
c, cancel := context.WithCancel(context.Background())
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||
p = r.RunnerPort
|
||||
}
|
||||
|
||||
_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID))
|
||||
_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -104,7 +104,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID))
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ func (r *P2PMLX) Run(ctx *cliContext.Context) error {
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID))
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
37
core/http/react-ui/e2e/cluster.spec.js
Normal file
37
core/http/react-ui/e2e/cluster.spec.js
Normal file
@@ -0,0 +1,37 @@
|
||||
import { test, expect } from './coverage-fixtures.js'
|
||||
|
||||
// The Cluster page composes two capability sections: "Distributed (NATS)" (the
|
||||
// former Nodes page) and "Swarm (p2p)" (the former P2P page). Each section only
|
||||
// mounts when its mode is enabled — distributed when /api/nodes answers OK, swarm
|
||||
// when a non-empty p2p network token is present. We mock those probes so the page
|
||||
// renders against the standalone ui-test-server without NATS / p2p running.
|
||||
|
||||
/**
 * Stubs the Cluster page probes so that only the distributed section is enabled.
 *
 * Registers three route handlers on `page`, in this order:
 *   - `/api/nodes` and `/api/nodes/scheduling` answer 200 with an empty JSON list
 *   - `/api/p2p/token` answers 503 with an empty body
 *
 * @param page Playwright page the routes are registered on.
 */
async function mockDistributedOnly(page) {
  const emptyJsonList = { status: 200, contentType: 'application/json', body: '[]' }
  const stubs = [
    ['**/api/nodes', emptyJsonList],
    ['**/api/nodes/scheduling', emptyJsonList],
    // Swarm disabled: token probe fails, so the swarm section stays hidden.
    ['**/api/p2p/token', { status: 503, contentType: 'text/plain', body: '' }],
  ]

  for (const [pattern, response] of stubs) {
    await page.route(pattern, (route) => {
      // Hand each request its own copy of the canned response.
      route.fulfill({ ...response })
    })
  }
}
|
||||
|
||||
test.describe('Cluster page', () => {
  // Every test starts on /app/cluster with only the distributed probes
  // answering OK (see mockDistributedOnly).
  test.beforeEach(async ({ page }) => {
    await mockDistributedOnly(page)
    await page.goto('/app/cluster')
    await expect(page).toHaveURL(/\/app\/cluster$/)
  })

  test('shows the page title', async ({ page }) => {
    const heading = page.getByRole('heading', { name: /Cluster/i })
    await expect(heading).toBeVisible()
  })

  test('shows the distributed section when /api/nodes responds', async ({ page }) => {
    // The distributed capability section is titled "Distributed (NATS)".
    const sectionTitle = page.getByText(/Distributed \(NATS\)/i)
    await expect(sectionTitle).toBeVisible()
  })
})
|
||||
@@ -23,4 +23,11 @@ test.describe('Navigation', () => {
|
||||
await expect(page).toHaveURL(/\/app\/traces/)
|
||||
await expect(page.getByRole('heading', { name: 'Traces', exact: true })).toBeVisible()
|
||||
})
|
||||
|
||||
test('old cluster routes redirect to /app/cluster', async ({ page }) => {
|
||||
await page.goto('/app/p2p')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
await page.goto('/app/nodes')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -81,7 +81,7 @@ async function mockDistributedNodes(page, { onDelete } = {}) {
|
||||
}
|
||||
|
||||
async function expandNodeAndWaitForBackends(page) {
|
||||
await page.goto('/app/nodes')
|
||||
await page.goto('/app/cluster')
|
||||
// Click the row to expand it. The chevron toggle and the row both work,
|
||||
// but clicking the name cell is the most user-like.
|
||||
await page.getByText(NODE_NAME).first().click()
|
||||
|
||||
@@ -1,26 +1,65 @@
|
||||
import { test, expect } from './coverage-fixtures.js'
|
||||
|
||||
// P2P (Swarm) admin page — renders in the no-auth test harness (isAdmin).
|
||||
test.describe('P2P page', () => {
|
||||
test.beforeEach(async ({ page }) => {
|
||||
// The standalone P2P (Swarm) page was merged into the Cluster page: /app/p2p now
|
||||
// redirects to /app/cluster, and the p2p content lives under the "Swarm (p2p)"
|
||||
// section. That section only mounts when p2p is enabled (a network token is
|
||||
// present), so we mock /api/p2p/token to return a non-empty token and assert the
|
||||
// swarm content renders under the cluster page.
|
||||
const P2P_TOKEN = 'test-network-token'
|
||||
|
||||
/**
 * Stubs the Cluster page probes so that only the swarm (p2p) section is enabled.
 *
 * The token endpoint answers with the non-empty P2P_TOKEN, the p2p worker,
 * federation and stats endpoints answer with empty/zeroed payloads, and
 * /api/nodes answers 503.
 *
 * @param page Playwright page the routes are registered on.
 */
async function mockSwarmEnabled(page) {
  // Builds a route handler that always answers with the given response.
  const respondWith = (response) => (route) => {
    route.fulfill(response)
  }
  const jsonResponse = (status, body) => ({ status, contentType: 'application/json', body })
  const noPeers = { online: 0, total: 0 }
  const emptyStats = JSON.stringify({
    llama_cpp_workers: noPeers,
    federated: noPeers,
    mlx_workers: noPeers,
  })

  await page.route('**/api/p2p/token', respondWith({
    status: 200,
    contentType: 'text/plain',
    body: P2P_TOKEN,
  }))
  await page.route('**/api/p2p/workers', respondWith(jsonResponse(200, '{"nodes":[]}')))
  await page.route('**/api/p2p/federation', respondWith(jsonResponse(200, '{"nodes":[]}')))
  await page.route('**/api/p2p/stats', respondWith(jsonResponse(200, emptyStats)))

  // The cluster page also probes /api/nodes for the distributed section; keep it
  // failing (distributed disabled) so only the swarm section renders here.
  await page.route('**/api/nodes', respondWith(jsonResponse(503, '{}')))
}
|
||||
|
||||
test.describe('P2P (Swarm) section on the Cluster page', () => {
|
||||
test('the old /app/p2p route lands on the cluster page', async ({ page }) => {
|
||||
await mockSwarmEnabled(page)
|
||||
// /app/p2p redirects to /app/cluster.
|
||||
await page.goto('/app/p2p')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible()
|
||||
})
|
||||
|
||||
test('renders the P2P distribution overview and capability cards', async ({ page }) => {
|
||||
await expect(page).toHaveURL(/\/app\/p2p$/)
|
||||
await expect(page.getByRole('heading', { name: /P2P Distribution Not Enabled/i })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: 'Instance Federation' })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: 'Model Sharding' })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: 'Resource Sharing' })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible()
|
||||
})
|
||||
test('renders the Swarm (p2p) section when p2p is enabled', async ({ page }) => {
|
||||
await mockSwarmEnabled(page)
|
||||
await page.goto('/app/cluster')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
|
||||
test('hardware selector offers build targets and responds to selection', async ({ page }) => {
|
||||
const cpu = page.getByRole('button').filter({ hasText: /^CPU$/ })
|
||||
const cuda = page.getByRole('button').filter({ hasText: /^CUDA 12$/ })
|
||||
await expect(cpu).toBeVisible()
|
||||
await expect(cuda).toBeVisible()
|
||||
await cuda.click() // selecting a build target must not break the page
|
||||
await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible()
|
||||
// The collapsible swarm section is titled "Swarm (p2p)".
|
||||
await expect(page.getByText(/Swarm \(p2p\)/i)).toBeVisible()
|
||||
|
||||
// The enabled p2p content (Network Token panel + the federation / sharding
|
||||
// tabs) is rendered inside the swarm section.
|
||||
await expect(page.getByRole('heading', { name: /Network Token/i })).toBeVisible()
|
||||
await expect(page.getByText('Federation', { exact: true })).toBeVisible()
|
||||
await expect(page.getByText('Model Sharding', { exact: true })).toBeVisible()
|
||||
})
|
||||
})
|
||||
|
||||
@@ -20,7 +20,7 @@ const PAGES = [
|
||||
['/app/manage', 'Manage'],
|
||||
['/app/backends', 'Backends'],
|
||||
['/app/settings', 'Settings'],
|
||||
['/app/nodes', 'Nodes'],
|
||||
['/app/cluster', 'Cluster'],
|
||||
['/app/face', 'Face recognition'],
|
||||
['/app/voice', 'Voice recognition'],
|
||||
['/app/fine-tune', 'Fine-tuning'],
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "Explorer",
|
||||
"subtitle": "Dateien und Konfiguration durchsuchen"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Cluster",
|
||||
"subtitle": "Verteilte und Peer-to-Peer-Knoten, die diese Instanz bedienen",
|
||||
"empty": "Es ist kein verteiltes oder p2p-Clustering aktiviert. Starte LocalAI im verteilten oder föderierten/p2p-Modus, um hier Cluster-Knoten zu verwalten.",
|
||||
"distributed": {
|
||||
"title": "Verteilt (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Verteilte Knoten",
|
||||
"inFlight": "Laufende Anfragen",
|
||||
"peers": "Swarm-Peers online"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "Traces",
|
||||
"nodes": "Knoten",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Cluster",
|
||||
"system": "System",
|
||||
"settings": "Einstellungen",
|
||||
"api": "API"
|
||||
|
||||
@@ -81,5 +81,21 @@
|
||||
"explorer": {
|
||||
"title": "Explorer",
|
||||
"subtitle": "Browse files and configuration"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Cluster",
|
||||
"subtitle": "Distributed and peer-to-peer nodes serving this instance",
|
||||
"empty": "No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.",
|
||||
"distributed": {
|
||||
"title": "Distributed (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Distributed nodes",
|
||||
"inFlight": "In-flight requests",
|
||||
"peers": "Swarm peers online"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
"traces": "Traces",
|
||||
"nodes": "Nodes",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Cluster",
|
||||
"system": "System",
|
||||
"settings": "Settings",
|
||||
"api": "API"
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "Explorador",
|
||||
"subtitle": "Explora archivos y configuración"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Clúster",
|
||||
"subtitle": "Nodos distribuidos y entre pares que sirven a esta instancia",
|
||||
"empty": "No hay clustering distribuido ni p2p habilitado. Inicia LocalAI en modo distribuido o federado/p2p para gestionar aquí los nodos del clúster.",
|
||||
"distributed": {
|
||||
"title": "Distribuido (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Nodos distribuidos",
|
||||
"inFlight": "Solicitudes en curso",
|
||||
"peers": "Pares de Swarm en línea"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "Trazas",
|
||||
"nodes": "Nodos",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Clúster",
|
||||
"system": "Sistema",
|
||||
"settings": "Configuración",
|
||||
"api": "API"
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "Esplora risorse",
|
||||
"subtitle": "Sfoglia file e configurazioni"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Cluster",
|
||||
"subtitle": "Nodi distribuiti e peer-to-peer al servizio di questa istanza",
|
||||
"empty": "Nessun clustering distribuito o p2p è abilitato. Avvia LocalAI in modalità distribuita o federata/p2p per gestire qui i nodi del cluster.",
|
||||
"distributed": {
|
||||
"title": "Distribuito (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Nodi distribuiti",
|
||||
"inFlight": "Richieste in corso",
|
||||
"peers": "Peer Swarm online"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "Tracce",
|
||||
"nodes": "Nodi",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Cluster",
|
||||
"system": "Sistema",
|
||||
"settings": "Impostazioni",
|
||||
"api": "API"
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "资源浏览器",
|
||||
"subtitle": "浏览文件和配置"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "集群",
|
||||
"subtitle": "为此实例提供服务的分布式和点对点节点",
|
||||
"empty": "未启用分布式或 p2p 集群。请以分布式或联邦/p2p 模式启动 LocalAI,以便在此管理集群节点。",
|
||||
"distributed": {
|
||||
"title": "分布式 (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "分布式节点",
|
||||
"inFlight": "进行中的请求",
|
||||
"peers": "在线 Swarm 节点"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "追踪",
|
||||
"nodes": "节点",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "集群",
|
||||
"system": "系统",
|
||||
"settings": "设置",
|
||||
"api": "API"
|
||||
|
||||
27
core/http/react-ui/src/components/ClusterSection.jsx
Normal file
27
core/http/react-ui/src/components/ClusterSection.jsx
Normal file
@@ -0,0 +1,27 @@
|
||||
import { useState } from 'react'
|
||||
|
||||
// ClusterSection is a collapsible, titled container for one capability area of
|
||||
// the Cluster page (Distributed / Swarm). Default expanded.
|
||||
export default function ClusterSection({ icon, title, subtitle, defaultOpen = true, children }) {
|
||||
const [open, setOpen] = useState(defaultOpen)
|
||||
return (
|
||||
<section className="card" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
<button
|
||||
type="button"
|
||||
aria-expanded={open}
|
||||
onClick={() => setOpen((o) => !o)}
|
||||
style={{
|
||||
display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)',
|
||||
width: '100%', padding: 'var(--spacing-md)', background: 'none',
|
||||
border: 'none', cursor: 'pointer', textAlign: 'left', color: 'inherit',
|
||||
}}
|
||||
>
|
||||
<i className={`fas fa-chevron-${open ? 'down' : 'right'}`} style={{ width: '1rem', color: 'var(--color-text-muted)' }} />
|
||||
{icon && <i className={icon} style={{ color: 'var(--color-primary)' }} />}
|
||||
<span style={{ fontWeight: 600 }}>{title}</span>
|
||||
{subtitle && <span style={{ marginLeft: 'auto', color: 'var(--color-text-muted)', fontSize: '0.875rem' }}>{subtitle}</span>}
|
||||
</button>
|
||||
{open && <div style={{ padding: '0 var(--spacing-md) var(--spacing-md)' }}>{children}</div>}
|
||||
</section>
|
||||
)
|
||||
}
|
||||
44
core/http/react-ui/src/components/ClusterSummary.jsx
Normal file
44
core/http/react-ui/src/components/ClusterSummary.jsx
Normal file
@@ -0,0 +1,44 @@
|
||||
import { useEffect, useState } from 'react'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import StatCard from './StatCard'
|
||||
import { nodesApi, p2pApi } from '../utils/api'
|
||||
|
||||
// ClusterSummary shows merged totals across both transports. Self-contained
|
||||
// (own lightweight fetch) so the page composes without lifting state out of the
|
||||
// two large section components.
|
||||
export default function ClusterSummary({ distributedEnabled, p2pEnabled }) {
|
||||
const { t } = useTranslation('admin')
|
||||
const [nats, setNats] = useState({ nodes: 0, inFlight: 0 })
|
||||
const [swarm, setSwarm] = useState({ online: 0, total: 0 })
|
||||
|
||||
useEffect(() => {
|
||||
let active = true
|
||||
async function load() {
|
||||
if (distributedEnabled) {
|
||||
try {
|
||||
const list = await nodesApi.list()
|
||||
const nodes = Array.isArray(list) ? list : (list?.nodes ?? [])
|
||||
if (active) setNats({ nodes: nodes.length, inFlight: nodes.reduce((a, n) => a + (n.in_flight_count || 0), 0) })
|
||||
} catch { /* leave zeros */ }
|
||||
}
|
||||
if (p2pEnabled) {
|
||||
try {
|
||||
const stats = await p2pApi.getStats()
|
||||
const online = (stats?.federated?.online || 0) + (stats?.llama_cpp_workers?.online || 0) + (stats?.mlx_workers?.online || 0)
|
||||
const total = (stats?.federated?.total || 0) + (stats?.llama_cpp_workers?.total || 0) + (stats?.mlx_workers?.total || 0)
|
||||
if (active) setSwarm({ online, total })
|
||||
} catch { /* leave zeros */ }
|
||||
}
|
||||
}
|
||||
load()
|
||||
return () => { active = false }
|
||||
}, [distributedEnabled, p2pEnabled])
|
||||
|
||||
return (
|
||||
<div className="stat-grid" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
{distributedEnabled && <StatCard icon="fas fa-network-wired" label={t('cluster.summary.nodes', 'Distributed nodes')} value={nats.nodes} />}
|
||||
{distributedEnabled && <StatCard icon="fas fa-bolt" label={t('cluster.summary.inFlight', 'In-flight requests')} value={nats.inFlight} />}
|
||||
{p2pEnabled && <StatCard icon="fas fa-circle-nodes" label={t('cluster.summary.peers', 'Swarm peers online')} value={`${swarm.online}/${swarm.total}`} />}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
@@ -479,7 +479,7 @@ export default function NodeInstallPicker({
|
||||
Approve pending workers or register new ones.
|
||||
{pendingCount > 0 && ` (${pendingCount} awaiting approval.)`}
|
||||
</p>
|
||||
<a className="btn btn-secondary btn-sm" href="/app/nodes">
|
||||
<a className="btn btn-secondary btn-sm" href="/app/cluster">
|
||||
<i className="fas fa-network-wired" /> Manage nodes
|
||||
</a>
|
||||
</div>
|
||||
@@ -672,7 +672,7 @@ export default function NodeInstallPicker({
|
||||
|
||||
{pendingCount > 0 && (
|
||||
<p style={{ fontSize: '0.75rem', color: 'var(--color-text-muted)', marginTop: 0, marginBottom: 'var(--spacing-sm)' }}>
|
||||
+{pendingCount} awaiting approval — <a href="/app/nodes" style={{ color: 'var(--color-primary)' }}>approve from Nodes</a>.
|
||||
+{pendingCount} awaiting approval — <a href="/app/cluster" style={{ color: 'var(--color-primary)' }}>approve from Nodes</a>.
|
||||
</p>
|
||||
)}
|
||||
|
||||
|
||||
@@ -75,8 +75,7 @@ const sections = [
|
||||
{ path: '/app/middleware', icon: 'fas fa-shield-halved', labelKey: 'items.middleware', adminOnly: true },
|
||||
{ path: '/app/backends', icon: 'fas fa-server', labelKey: 'items.backends', adminOnly: true },
|
||||
{ path: '/app/traces', icon: 'fas fa-chart-line', labelKey: 'items.traces', adminOnly: true },
|
||||
{ path: '/app/nodes', icon: 'fas fa-network-wired', labelKey: 'items.nodes', adminOnly: true, feature: 'distributed' },
|
||||
{ path: '/app/p2p', icon: 'fas fa-circle-nodes', labelKey: 'items.swarm', adminOnly: true },
|
||||
{ path: '/app/cluster', icon: 'fas fa-network-wired', labelKey: 'items.cluster', adminOnly: true },
|
||||
{ path: '/app/manage', icon: 'fas fa-desktop', labelKey: 'items.system', adminOnly: true },
|
||||
{ path: '/app/settings', icon: 'fas fa-cog', labelKey: 'items.settings', adminOnly: true },
|
||||
],
|
||||
|
||||
31
core/http/react-ui/src/hooks/useP2PMode.js
vendored
Normal file
31
core/http/react-ui/src/hooks/useP2PMode.js
vendored
Normal file
@@ -0,0 +1,31 @@
|
||||
import { useState, useEffect, useCallback } from 'react'
|
||||
import { p2pApi } from '../utils/api'
|
||||
|
||||
// useP2PMode reports whether p2p / swarm mode is available, mirroring
// useDistributedMode. "Available" means the token endpoint returned a network
// token that is non-empty after trimming (the same signal the standalone P2P
// page used). The probe runs once on mount and can be re-run on demand.
//
// Returns:
//   enabled — true when a non-empty network token is present
//   loading — true while a probe is in progress (starts out true)
//   refetch — manual trigger to re-run the probe
export function useP2PMode() {
  const [enabled, setEnabled] = useState(false)
  const [loading, setLoading] = useState(true)

  const probe = useCallback(async () => {
    setLoading(true)

    let hasToken
    try {
      const token = await p2pApi.getToken()
      hasToken = Boolean(token && String(token).trim())
    } catch {
      // A failed probe is treated the same as "no token".
      hasToken = false
    }

    setEnabled(hasToken)
    setLoading(false)
  }, [])

  useEffect(() => {
    probe()
  }, [probe])

  return { enabled, loading, refetch: probe }
}
|
||||
@@ -339,7 +339,7 @@ function DistributedBackendLogsResolver({ modelId, fromTimestamp }) {
|
||||
<h2 className="empty-state-title">Model not loaded on any worker</h2>
|
||||
<p className="empty-state-text">
|
||||
<span style={{ fontFamily: 'var(--font-mono)' }}>{modelId}</span> isn't currently loaded on any node in the cluster.
|
||||
Check the <Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
|
||||
Check the <Link to="/app/cluster" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -49,7 +49,7 @@ export default function Backends() {
|
||||
// whenever splitMenuFor changes to a different row index.
|
||||
const splitMenuAnchorRef = useRef(null)
|
||||
|
||||
// Target-node mode: set when navigated from /app/nodes via "+ Add backend".
|
||||
// Target-node mode: set when navigated from /app/cluster via "+ Add backend".
|
||||
// The gallery page header banners the scope; rows collapse their split-button
|
||||
// to a single Install-on-this-node action; manual install posts to the
|
||||
// per-node endpoint.
|
||||
@@ -323,7 +323,7 @@ export default function Backends() {
|
||||
return (
|
||||
<div className="page page--wide">
|
||||
{/* Target-node banner: when this gallery is scoped to one node via
|
||||
?target=<id> (entered from /app/nodes), show the scope clearly and
|
||||
?target=<id> (entered from /app/cluster), show the scope clearly and
|
||||
give a fast way to clear it. Visually a primary-tinted strip so the
|
||||
user knows they're in a special mode without it feeling alarming. */}
|
||||
{targetNode && (
|
||||
|
||||
45
core/http/react-ui/src/pages/Cluster.jsx
Normal file
45
core/http/react-ui/src/pages/Cluster.jsx
Normal file
@@ -0,0 +1,45 @@
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import { useDistributedMode } from '../hooks/useDistributedMode'
|
||||
import { useP2PMode } from '../hooks/useP2PMode'
|
||||
import ClusterSection from '../components/ClusterSection'
|
||||
import ClusterSummary from '../components/ClusterSummary'
|
||||
import Nodes from './Nodes'
|
||||
import P2P from './P2P'
|
||||
|
||||
export default function Cluster() {
|
||||
const { t } = useTranslation('admin')
|
||||
const distributed = useDistributedMode()
|
||||
const p2p = useP2PMode()
|
||||
|
||||
const loading = distributed.loading || p2p.loading
|
||||
const nothingEnabled = !loading && !distributed.enabled && !p2p.enabled
|
||||
|
||||
return (
|
||||
<div className="page page--wide">
|
||||
<div className="page-header">
|
||||
<h1 className="page-title"><i className="fas fa-network-wired" /> {t('cluster.title', 'Cluster')}</h1>
|
||||
<p className="page-subtitle">{t('cluster.subtitle', 'Distributed and peer-to-peer nodes serving this instance')}</p>
|
||||
</div>
|
||||
|
||||
{!loading && <ClusterSummary distributedEnabled={distributed.enabled} p2pEnabled={p2p.enabled} />}
|
||||
|
||||
{distributed.enabled && (
|
||||
<ClusterSection icon="fas fa-network-wired" title={t('cluster.distributed.title', 'Distributed (NATS)')} defaultOpen>
|
||||
<Nodes embedded />
|
||||
</ClusterSection>
|
||||
)}
|
||||
|
||||
{p2p.enabled && (
|
||||
<ClusterSection icon="fas fa-circle-nodes" title={t('cluster.swarm.title', 'Swarm (p2p)')} defaultOpen={!distributed.enabled}>
|
||||
<P2P embedded />
|
||||
</ClusterSection>
|
||||
)}
|
||||
|
||||
{nothingEnabled && (
|
||||
<div className="card" style={{ padding: 'var(--spacing-lg)', textAlign: 'center', color: 'var(--color-text-muted)' }}>
|
||||
{t('cluster.empty', 'No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.')}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
@@ -162,7 +162,7 @@ export default function NodeBackendLogs() {
|
||||
<h2 className="empty-state-title">No node/model selected</h2>
|
||||
<p className="empty-state-text">
|
||||
View backend logs from the{' '}
|
||||
<Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link>.
|
||||
<Link to="/app/cluster" style={{ color: 'var(--color-primary)' }}>Nodes page</Link>.
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
@@ -220,7 +220,7 @@ export default function NodeBackendLogs() {
|
||||
</h1>
|
||||
<p className="page-subtitle" style={{ marginTop: 'var(--spacing-xs)' }}>
|
||||
Backend logs from node <strong>{nodeName || nodeId}</strong>
|
||||
{' '}<Link to="/app/nodes" style={{ color: 'var(--color-primary)', fontSize: '0.8125rem' }}>(back to nodes)</Link>
|
||||
{' '}<Link to="/app/cluster" style={{ color: 'var(--color-primary)', fontSize: '0.8125rem' }}>(back to nodes)</Link>
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -689,7 +689,7 @@ function SchedulingForm({ onSave, onCancel }) {
|
||||
)
|
||||
}
|
||||
|
||||
export default function Nodes() {
|
||||
export default function Nodes({ embedded = false }) {
|
||||
const { addToast } = useOutletContext()
|
||||
const navigate = useNavigate()
|
||||
const { t } = useTranslation('admin')
|
||||
@@ -983,16 +983,18 @@ export default function Nodes() {
|
||||
const pending = filteredNodes.filter(n => n.status === 'pending').length
|
||||
|
||||
return (
|
||||
<div className="page page--wide">
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-network-wired" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('nodes.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('nodes.subtitle')}
|
||||
</p>
|
||||
</div>
|
||||
<div className={embedded ? '' : 'page page--wide'}>
|
||||
{!embedded && (
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-network-wired" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('nodes.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('nodes.subtitle')}
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Tabs */}
|
||||
<div className="tabs" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
|
||||
@@ -102,7 +102,7 @@ function StepNumber({ n, bg, color }) {
|
||||
)
|
||||
}
|
||||
|
||||
export default function P2P() {
|
||||
export default function P2P({ embedded = false }) {
|
||||
const { addToast } = useOutletContext()
|
||||
const { t } = useTranslation('admin')
|
||||
const [workers, setWorkers] = useState([])
|
||||
@@ -172,7 +172,7 @@ export default function P2P() {
|
||||
|
||||
if (loading) {
|
||||
return (
|
||||
<div className="page page--narrow" style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
|
||||
<div className={embedded ? '' : 'page page--narrow'} style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
|
||||
<LoadingSpinner size="lg" />
|
||||
</div>
|
||||
)
|
||||
@@ -181,7 +181,7 @@ export default function P2P() {
|
||||
// ── P2P Disabled ──
|
||||
if (!enabled) {
|
||||
return (
|
||||
<div className="page page--narrow">
|
||||
<div className={embedded ? '' : 'page page--narrow'}>
|
||||
<div style={{ textAlign: 'center', padding: 'var(--spacing-xl) 0' }}>
|
||||
<i className="fas fa-network-wired" style={{ fontSize: '3rem', color: 'var(--color-primary)', marginBottom: 'var(--spacing-md)' }} />
|
||||
<h1 style={{ fontSize: '1.5rem', fontWeight: 600, marginBottom: 'var(--spacing-sm)' }}>
|
||||
@@ -294,21 +294,23 @@ export default function P2P() {
|
||||
const mlxTotal = stats.mlx_workers?.total ?? 0
|
||||
|
||||
return (
|
||||
<div className="page page--narrow">
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-circle-nodes" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('p2p.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('p2p.subtitle')}
|
||||
{' '}
|
||||
<a href="https://localai.io/features/distribute/" target="_blank" rel="noopener noreferrer"
|
||||
style={{ color: 'var(--color-primary)' }}>
|
||||
<i className="fas fa-circle-info" />
|
||||
</a>
|
||||
</p>
|
||||
</div>
|
||||
<div className={embedded ? '' : 'page page--narrow'}>
|
||||
{!embedded && (
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-circle-nodes" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('p2p.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('p2p.subtitle')}
|
||||
{' '}
|
||||
<a href="https://localai.io/features/distribute/" target="_blank" rel="noopener noreferrer"
|
||||
style={{ color: 'var(--color-primary)' }}>
|
||||
<i className="fas fa-circle-info" />
|
||||
</a>
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Network Token */}
|
||||
<div style={{
|
||||
|
||||
@@ -45,7 +45,7 @@ const Talk = page('talk', () => import('./pages/Talk'))
|
||||
const Backends = page('backends', () => import('./pages/Backends'))
|
||||
const Settings = page('settings', () => import('./pages/Settings'))
|
||||
const Traces = page('traces', () => import('./pages/Traces'))
|
||||
const P2P = page('p2p', () => import('./pages/P2P'))
|
||||
const Cluster = page('cluster', () => import('./pages/Cluster'))
|
||||
const Agents = page('agents', () => import('./pages/Agents'))
|
||||
const AgentCreate = page(null, () => import('./pages/AgentCreate'))
|
||||
const AgentChat = page(null, () => import('./pages/AgentChat'))
|
||||
@@ -68,7 +68,6 @@ const Quantize = page('quantize', () => import('./pages/Quantize'))
|
||||
const Studio = page('studio', () => import('./pages/Studio'))
|
||||
const FaceRecognition = page('face', () => import('./pages/FaceRecognition'))
|
||||
const VoiceRecognition = page('voice', () => import('./pages/VoiceRecognition'))
|
||||
const Nodes = page('nodes', () => import('./pages/Nodes'))
|
||||
const NodeBackendLogs = page(null, () => import('./pages/NodeBackendLogs'))
|
||||
const NotFound = page(null, () => import('./pages/NotFound'))
|
||||
const Usage = page('usage', () => import('./pages/Usage'))
|
||||
@@ -120,8 +119,9 @@ const appChildren = [
|
||||
{ path: 'settings', element: <Admin><Settings /></Admin> },
|
||||
{ path: 'traces', element: <Admin><Traces /></Admin> },
|
||||
{ path: 'backend-logs/:modelId', element: <Admin><BackendLogs /></Admin> },
|
||||
{ path: 'p2p', element: <Admin><P2P /></Admin> },
|
||||
{ path: 'nodes', element: <Admin><Nodes /></Admin> },
|
||||
{ path: 'cluster', element: <Admin><Cluster /></Admin> },
|
||||
{ path: 'p2p', element: <Navigate to="/app/cluster" replace /> },
|
||||
{ path: 'nodes', element: <Navigate to="/app/cluster" replace /> },
|
||||
{ path: 'node-backend-logs/:nodeId/:modelId', element: <Admin><NodeBackendLogs /></Admin> },
|
||||
{ path: 'agents', element: <Feature feature="agents"><Agents /></Feature> },
|
||||
{ path: 'agents/new', element: <Feature feature="agents"><AgentCreate /></Feature> },
|
||||
|
||||
81
core/p2p/affinity_sync.go
Normal file
81
core/p2p/affinity_sync.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/edgevpn/pkg/blockchain"
|
||||
"github.com/mudler/edgevpn/pkg/hub"
|
||||
"github.com/mudler/edgevpn/pkg/node"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
// affinitySubjectKey is the hub.Message annotation carrying the logical subject
|
||||
// (observe vs invalidate) so the receiver can dispatch the way a NATS subject
|
||||
// would. The generic channel has no subject routing, so we carry it ourselves.
|
||||
const affinitySubjectKey = "subject"
|
||||
|
||||
// genericChannelPublisher adapts an edgevpn node's generic broadcast channel to
|
||||
// the prefixcache publisher interface (Publish(subject, v)). It lets a
|
||||
// federation server reuse prefixcache.Sync for cross-server affinity coherence
|
||||
// without NATS: each event is JSON-encoded into a hub.Message and broadcast over
|
||||
// the generic channel (not the slow blockchain ledger).
|
||||
type genericChannelPublisher struct {
|
||||
node *node.Node
|
||||
}
|
||||
|
||||
// Publish satisfies prefixcache's (unexported) publisher interface structurally.
|
||||
func (p *genericChannelPublisher) Publish(subject string, v any) error {
|
||||
payload, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshalling affinity event: %w", err)
|
||||
}
|
||||
return p.node.PublishMessage(&hub.Message{
|
||||
Message: string(payload),
|
||||
Annotations: map[string]interface{}{affinitySubjectKey: subject},
|
||||
})
|
||||
}
|
||||
|
||||
// applyAffinityMessage decodes a generic-channel affinity message and applies it
|
||||
// to sync WITHOUT re-broadcasting (ApplyObserve/ApplyInvalidate). now is the
|
||||
// local clock so TTL is measured per server. Unknown subjects, malformed
|
||||
// payloads, and nil inputs are ignored (debug-logged), never fatal.
|
||||
func applyAffinityMessage(sync *prefixcache.Sync, m *hub.Message, now time.Time) {
|
||||
if sync == nil || m == nil {
|
||||
return
|
||||
}
|
||||
subject, _ := m.Annotations[affinitySubjectKey].(string)
|
||||
switch subject {
|
||||
case messaging.SubjectPrefixCacheObserve:
|
||||
var ev messaging.PrefixCacheObserveEvent
|
||||
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
|
||||
xlog.Debug("affinity: bad observe payload", "error", err)
|
||||
return
|
||||
}
|
||||
sync.ApplyObserve(ev, now)
|
||||
case messaging.SubjectPrefixCacheInvalidate:
|
||||
var ev messaging.PrefixCacheInvalidateEvent
|
||||
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
|
||||
xlog.Debug("affinity: bad invalidate payload", "error", err)
|
||||
return
|
||||
}
|
||||
sync.ApplyInvalidate(ev)
|
||||
default:
|
||||
// Other generic-channel traffic; not ours.
|
||||
}
|
||||
}
|
||||
|
||||
// affinityHandler returns the edgevpn generic-channel handler that applies remote
|
||||
// affinity events to this server's index. It is registered at node construction
|
||||
// (handlers cannot be added after Start) and reads fs.prefixSync lazily, which is
|
||||
// safe because messages only arrive after Start, by which point Start has wired
|
||||
// fs.prefixSync.
|
||||
func (fs *FederatedServer) affinityHandler() node.Handler {
|
||||
return func(_ *blockchain.Ledger, m *hub.Message, _ chan *hub.Message) error {
|
||||
applyAffinityMessage(fs.prefixSync, m, time.Now())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
48
core/p2p/affinity_sync_test.go
Normal file
48
core/p2p/affinity_sync_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/edgevpn/pkg/hub"
|
||||
)
|
||||
|
||||
var _ = Describe("applyAffinityMessage", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
observeMsg := func(ev messaging.PrefixCacheObserveEvent) *hub.Message {
|
||||
payload, _ := json.Marshal(ev)
|
||||
return &hub.Message{
|
||||
Message: string(payload),
|
||||
Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve},
|
||||
}
|
||||
}
|
||||
|
||||
It("applies a peer observe so the local index resolves the warm peer", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
sync := prefixcache.NewSync(idx, nil)
|
||||
chain := prefixcache.ExtractChain("m1", "a fairly long shared system prompt body for the prefix chain", cfg)
|
||||
|
||||
applyAffinityMessage(sync, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m1", Chain: chain, NodeID: "warm", Replica: 0}), ref)
|
||||
|
||||
d := idx.Decide("m1", chain, []prefixcache.ReplicaKey{{NodeID: "warm"}, {NodeID: "cold"}}, ref)
|
||||
Expect(d.HasHot).To(BeTrue())
|
||||
Expect(d.Hot.NodeID).To(Equal("warm"))
|
||||
})
|
||||
|
||||
It("ignores malformed, unknown-subject, and nil inputs without panicking", func() {
|
||||
sync := prefixcache.NewSync(prefixcache.NewIndex(prefixcache.DefaultConfig()), nil)
|
||||
applyAffinityMessage(sync, &hub.Message{Message: "not-json", Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}}, ref)
|
||||
applyAffinityMessage(sync, &hub.Message{Message: "{}", Annotations: map[string]interface{}{affinitySubjectKey: "some.other.subject"}}, ref)
|
||||
applyAffinityMessage(sync, &hub.Message{Message: "{}"}, ref)
|
||||
applyAffinityMessage(nil, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m"}), ref)
|
||||
applyAffinityMessage(sync, nil, ref)
|
||||
Expect(true).To(BeTrue())
|
||||
})
|
||||
})
|
||||
@@ -1,10 +1,17 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/LocalAI/pkg/clusterrouting"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
@@ -23,16 +30,29 @@ type FederatedServer struct {
|
||||
requestTable map[string]int
|
||||
loadBalanced bool
|
||||
workerTarget string
|
||||
bodyLimit int64 // max request body bytes (0 = unlimited)
|
||||
prefixCfg prefixcache.Config
|
||||
prefixIndex *prefixcache.Index
|
||||
prefixSync *prefixcache.Sync
|
||||
prefixProvider prefixcache.Provider // Index (sync off) or Sync (sync on)
|
||||
syncAffinity bool
|
||||
}
|
||||
|
||||
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
|
||||
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64, syncAffinity bool) *FederatedServer {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
return &FederatedServer{
|
||||
listenAddr: listenAddr,
|
||||
service: service,
|
||||
p2ptoken: p2pToken,
|
||||
requestTable: map[string]int{},
|
||||
loadBalanced: loadBalanced,
|
||||
workerTarget: workerTarget,
|
||||
listenAddr: listenAddr,
|
||||
service: service,
|
||||
p2ptoken: p2pToken,
|
||||
requestTable: map[string]int{},
|
||||
loadBalanced: loadBalanced,
|
||||
workerTarget: workerTarget,
|
||||
bodyLimit: bodyLimit,
|
||||
prefixCfg: cfg,
|
||||
prefixIndex: idx,
|
||||
prefixProvider: idx,
|
||||
syncAffinity: syncAffinity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,28 +94,141 @@ func (fs *FederatedServer) syncTableStatus() {
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FederatedServer) SelectLeastUsedServer() string {
|
||||
fs.syncTableStatus()
|
||||
// buildFederatedCandidates maps the currently-online federated peers into the
|
||||
// shared routing policy's candidate form, optionally filtered to peers that can
|
||||
// serve model. A peer with a non-empty advertised model set that lacks model is
|
||||
// excluded; a peer with an empty set is treated as "unknown" and stays eligible
|
||||
// (so older peers and mid-convergence peers are not starved). When model is "",
|
||||
// no model filtering is applied. InFlight comes from the per-peer request
|
||||
// counter; AvailableVRAM from the gossiped NodeData; LastUsed is left zero.
|
||||
func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time, model string) []clusterrouting.ReplicaCandidate {
|
||||
candidates := make([]clusterrouting.ReplicaCandidate, 0, len(nodes))
|
||||
for _, nd := range nodes {
|
||||
if !nd.IsOnlineAt(now) {
|
||||
continue
|
||||
}
|
||||
if !servesModel(nd, model) {
|
||||
continue
|
||||
}
|
||||
candidates = append(candidates, clusterrouting.ReplicaCandidate{
|
||||
NodeID: nd.ID,
|
||||
InFlight: requestTable[nd.ID],
|
||||
AvailableVRAM: nd.AvailableVRAM,
|
||||
})
|
||||
}
|
||||
return candidates
|
||||
}
|
||||
|
||||
fs.Lock()
|
||||
defer fs.Unlock()
|
||||
|
||||
xlog.Debug("SelectLeastUsedServer()", "request_table", fs.requestTable)
|
||||
|
||||
// cycle over requestTable and find the entry with the lower number
|
||||
// if there are multiple entries with the same number, select one randomly
|
||||
// if there are no entries, return an empty string
|
||||
var min int
|
||||
var minKey string
|
||||
for k, v := range fs.requestTable {
|
||||
if min == 0 || v < min {
|
||||
min = v
|
||||
minKey = k
|
||||
// servesModel reports whether nd is eligible to serve model. An empty model
|
||||
// means "no filter". An empty advertised set means "unknown" and is eligible.
|
||||
func servesModel(nd schema.NodeData, model string) bool {
|
||||
if model == "" || len(nd.Models) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, m := range nd.Models {
|
||||
if m == model {
|
||||
return true
|
||||
}
|
||||
}
|
||||
xlog.Debug("Selected tunnel", "tunnel", minKey, "requests_served", min, "request_table", fs.requestTable)
|
||||
return false
|
||||
}
|
||||
|
||||
return minKey
|
||||
// extractModel best-effort resolves the target model of a buffered request,
// cheapest source first: an explicit query value, then the "model" field of a
// JSON body.
//
// Surrounding whitespace is stripped from whichever source wins, so a padded
// name (for example "?model=%20m1") still compares equal to the model names
// peers advertise. A query value that is blank after trimming is treated as
// absent and the body is consulted instead.
//
// It returns "" when the model cannot be determined: no query value together
// with an empty body, a body that is not valid JSON (for example multipart), or
// a JSON body without a usable "model" string. The caller then routes by
// load/affinity only.
func extractModel(queryModel string, body []byte) string {
	if name := strings.TrimSpace(queryModel); name != "" {
		return name
	}
	if len(body) == 0 {
		return ""
	}
	// Decode only the field we need; every other key in the body is ignored.
	var probe struct {
		Model string `json:"model"`
	}
	if err := json.Unmarshal(body, &probe); err != nil {
		return ""
	}
	return strings.TrimSpace(probe.Model)
}
|
||||
|
||||
// affinityPreferred returns the peer the prefix index considers warm for this
|
||||
// chain, or "" when there is no match strong enough among the candidates. It
|
||||
// reuses prefixcache's per-model radix-tree Decide; the final load-guarded pick
|
||||
// is done by clusterrouting.PickWithAffinity so the VRAM tier is preserved.
|
||||
func affinityPreferred(idx prefixcache.Provider, model string, chain []uint64, candidates []clusterrouting.ReplicaCandidate, cfg prefixcache.Config, now time.Time) string {
|
||||
if idx == nil || len(chain) == 0 || len(candidates) == 0 {
|
||||
return ""
|
||||
}
|
||||
keys := make([]prefixcache.ReplicaKey, 0, len(candidates))
|
||||
for _, c := range candidates {
|
||||
keys = append(keys, prefixcache.ReplicaKey{NodeID: c.NodeID})
|
||||
}
|
||||
d := idx.Decide(model, chain, keys, now)
|
||||
if d.HasHot && d.MatchRatio >= cfg.MinPrefixMatch {
|
||||
return d.Hot.NodeID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// selectPeer chooses the federated peer to serve a request for model with the
|
||||
// given raw body. It filters candidates by model, computes the prefix chain,
|
||||
// consults the affinity index, and makes the final load+VRAM-aware pick. It
|
||||
// returns the chosen peer ID and the chain (so the caller can Observe after a
|
||||
// successful forward). An empty model and nil body degrade to load+VRAM only.
|
||||
// Returns "" when no eligible peer is online.
|
||||
func (fs *FederatedServer) selectPeer(model string, body []byte, now time.Time) (string, []uint64) {
|
||||
fs.syncTableStatus()
|
||||
nodes := GetAvailableNodes(fs.service)
|
||||
// Snapshot candidates under the lock (it only guards requestTable), then
|
||||
// release before the prefix hashing and tree walk, which are lock-free
|
||||
// (candidates is a copy; prefixIndex/prefixCfg are set once at construction).
|
||||
fs.Lock()
|
||||
candidates := buildFederatedCandidates(nodes, fs.requestTable, now, model)
|
||||
fs.Unlock()
|
||||
if len(candidates) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
var chain []uint64
|
||||
preferred := ""
|
||||
if fs.prefixProvider != nil && model != "" && len(body) > 0 {
|
||||
chain = prefixcache.ExtractChain(model, string(body), fs.prefixCfg)
|
||||
preferred = affinityPreferred(fs.prefixProvider, model, chain, candidates, fs.prefixCfg, now)
|
||||
}
|
||||
best := clusterrouting.PickWithAffinity(candidates, preferred, fs.prefixCfg.BalanceAbsThreshold)
|
||||
if best == nil {
|
||||
return "", chain
|
||||
}
|
||||
return best.NodeID, chain
|
||||
}
|
||||
|
||||
// observeServed records that peerID served the given chain for model, so the
|
||||
// next request sharing that prefix is routed back to the same warm peer.
|
||||
func (fs *FederatedServer) observeServed(model string, chain []uint64, peerID string, now time.Time) {
|
||||
if fs.prefixProvider == nil || len(chain) == 0 || peerID == "" || model == "" {
|
||||
return
|
||||
}
|
||||
fs.prefixProvider.Observe(model, chain, prefixcache.ReplicaKey{NodeID: peerID}, now)
|
||||
}
|
||||
|
||||
// evictLoop periodically sweeps expired affinity entries so the in-memory tree
|
||||
// does not grow unbounded. Runs for the lifetime of the proxy.
|
||||
func (fs *FederatedServer) evictLoop(ctx context.Context) {
|
||||
interval := fs.prefixCfg.TTL / 2
|
||||
if interval <= 0 {
|
||||
interval = time.Minute
|
||||
}
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case now := <-t.C:
|
||||
fs.prefixProvider.Evict(now)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FederatedServer) RecordRequest(nodeID string) {
|
||||
|
||||
@@ -1,22 +1,81 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/edgevpn/pkg/node"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
// ErrBodyTooLarge is the sentinel readRequest returns when the request body is
// larger than the configured cap. The proxy maps it to a 413 response.
var ErrBodyTooLarge = errors.New("request body exceeds limit")

// readRequest reads exactly one HTTP request from r and buffers its whole body
// in memory, so the body can be inspected (model, prefix) and later replayed to
// the selected peer.
//
// limit is the maximum number of body bytes accepted; 0 (or a negative value)
// disables the cap. A body longer than limit yields ErrBodyTooLarge. Parse and
// read errors are returned unchanged, and on any error the request and body
// results are nil.
//
// On success the returned request has Body replaced by a fresh reader over the
// buffered bytes, ContentLength set to the buffered length, and RequestURI
// cleared so it can be re-serialized with req.Write onto the peer stream. The
// second result is the same buffered body.
func readRequest(r *bufio.Reader, limit int64) (*http.Request, []byte, error) {
	req, err := http.ReadRequest(r)
	if err != nil {
		return nil, nil, err
	}

	capped := limit > 0
	var payload []byte
	if src := req.Body; src != nil {
		var in io.Reader = src
		if capped {
			// Read one byte past the cap so "exactly at the cap" and "over
			// the cap" can be told apart.
			in = io.LimitReader(src, limit+1)
		}
		data, readErr := io.ReadAll(in)
		_ = src.Close() // best effort; the read result decides the outcome
		switch {
		case readErr != nil:
			return nil, nil, readErr
		case capped && int64(len(data)) > limit:
			return nil, nil, ErrBodyTooLarge
		}
		payload = data
	}

	req.Body = io.NopCloser(bytes.NewReader(payload))
	req.ContentLength = int64(len(payload))
	req.RequestURI = ""
	return req, payload, nil
}
|
||||
|
||||
// isWebsocketUpgrade reports whether req is a websocket handshake, i.e. its
// Connection header lists the "upgrade" option and its Upgrade header lists
// "websocket" (both compared case-insensitively). Such requests must be
// forwarded as a raw bidirectional duplex (not request/streamed-response) and
// are not body-capped or model-routed.
//
// Both headers are treated as comma-separated token lists that may be spread
// over several field lines, so a handshake sent as "Connection: keep-alive"
// plus a separate "Connection: Upgrade" line is still recognised, and an
// unrelated option that merely contains the substring "upgrade" is not.
func isWebsocketUpgrade(req *http.Request) bool {
	return headerHasToken(req.Header, "Connection", "upgrade") &&
		headerHasToken(req.Header, "Upgrade", "websocket")
}

// headerHasToken reports whether any value of header name in h contains token
// as one element of its comma-separated list, ignoring case and surrounding
// whitespace.
func headerHasToken(h http.Header, name, token string) bool {
	for _, line := range h.Values(name) {
		for _, field := range strings.Split(line, ",") {
			if strings.EqualFold(strings.TrimSpace(field), token) {
				return true
			}
		}
	}
	return false
}
|
||||
|
||||
func (f *FederatedServer) Start(ctx context.Context) error {
|
||||
n, err := NewNode(f.p2ptoken)
|
||||
var extraOpts []node.Option
|
||||
if f.syncAffinity {
|
||||
extraOpts = append(extraOpts, node.EnableGenericHub, node.GenericChannelHandlers(f.affinityHandler()))
|
||||
}
|
||||
n, err := NewNode(f.p2ptoken, extraOpts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating a new node: %w", err)
|
||||
}
|
||||
if f.syncAffinity {
|
||||
f.prefixSync = prefixcache.NewSync(f.prefixIndex, &genericChannelPublisher{node: n})
|
||||
f.prefixProvider = f.prefixSync
|
||||
xlog.Info("Federation affinity sync enabled (generic channel)")
|
||||
}
|
||||
err = n.Start(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating a new node: %w", err)
|
||||
@@ -28,6 +87,8 @@ func (f *FederatedServer) Start(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
go f.evictLoop(ctx)
|
||||
|
||||
return f.proxy(ctx, n)
|
||||
}
|
||||
|
||||
@@ -62,40 +123,60 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle connections in a new goroutine, forwarding to the p2p service
|
||||
// Handle connections in a new goroutine, terminating HTTP and
|
||||
// forwarding the request to the chosen p2p peer.
|
||||
go func() {
|
||||
workerID := ""
|
||||
if fs.workerTarget != "" {
|
||||
workerID = fs.workerTarget
|
||||
} else if fs.loadBalanced {
|
||||
xlog.Debug("Load balancing request")
|
||||
|
||||
workerID = fs.SelectLeastUsedServer()
|
||||
if workerID == "" {
|
||||
xlog.Debug("Least used server not found, selecting random")
|
||||
workerID = fs.RandomServer()
|
||||
br := bufio.NewReader(conn)
|
||||
req, body, err := readRequest(br, fs.bodyLimit)
|
||||
if err != nil {
|
||||
if err == ErrBodyTooLarge {
|
||||
fs.sendHTMLResponse(conn, 413, "Request body too large")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
workerID = fs.RandomServer()
|
||||
}
|
||||
|
||||
if workerID == "" {
|
||||
xlog.Error("No available nodes yet")
|
||||
fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect")
|
||||
xlog.Error("Failed to read request", "error", err)
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
upgrade := isWebsocketUpgrade(req)
|
||||
|
||||
now := time.Now()
|
||||
var (
|
||||
workerID string
|
||||
model string
|
||||
chain []uint64
|
||||
)
|
||||
switch {
|
||||
case fs.workerTarget != "":
|
||||
workerID = fs.workerTarget
|
||||
case !fs.loadBalanced:
|
||||
// Explicit random mode (the RandomWorker flag): keep the
|
||||
// historical random pick, no model/affinity routing.
|
||||
workerID = fs.RandomServer()
|
||||
case upgrade:
|
||||
// Websocket: no readable model; route by load only.
|
||||
workerID, _ = fs.selectPeer("", nil, now)
|
||||
default:
|
||||
model = extractModel(req.URL.Query().Get("model"), body)
|
||||
workerID, chain = fs.selectPeer(model, body, now)
|
||||
}
|
||||
|
||||
if workerID == "" {
|
||||
fs.sendHTMLResponse(conn, 503, "No federated peer available for this request")
|
||||
return
|
||||
}
|
||||
|
||||
xlog.Debug("Selected node", "node", workerID)
|
||||
nodeData, exists := GetNode(fs.service, workerID)
|
||||
if !exists {
|
||||
xlog.Error("Node not found", "node", workerID)
|
||||
fs.sendHTMLResponse(conn, 404, "Node not found")
|
||||
return
|
||||
}
|
||||
|
||||
proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
|
||||
if fs.loadBalanced {
|
||||
fs.RecordRequest(workerID)
|
||||
proxyHTTPToPeer(ctx, node, nodeData.ServiceID, conn, req, upgrade)
|
||||
|
||||
fs.RecordRequest(workerID)
|
||||
if !upgrade {
|
||||
fs.observeServed(model, chain, workerID, now)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -132,6 +213,8 @@ func getHTTPStatusText(statusCode int) string {
|
||||
switch statusCode {
|
||||
case 503:
|
||||
return "Service Unavailable"
|
||||
case 413:
|
||||
return "Request Entity Too Large"
|
||||
case 404:
|
||||
return "Not Found"
|
||||
case 200:
|
||||
|
||||
196
core/p2p/federated_test.go
Normal file
196
core/p2p/federated_test.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/LocalAI/pkg/clusterrouting"
|
||||
)
|
||||
|
||||
var _ = Describe("buildFederatedCandidates", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
onlineSeen := ref.Add(-10 * time.Second)
|
||||
offlineSeen := ref.Add(-2 * time.Minute)
|
||||
|
||||
It("excludes offline nodes", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "online", LastSeen: onlineSeen},
|
||||
{ID: "offline", LastSeen: offlineSeen},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
|
||||
Expect(cands).To(HaveLen(1))
|
||||
Expect(cands[0].NodeID).To(Equal("online"))
|
||||
})
|
||||
|
||||
It("maps the request counter to InFlight and defaults missing entries to zero", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "a", LastSeen: onlineSeen},
|
||||
{ID: "b", LastSeen: onlineSeen},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref, "")
|
||||
byID := map[string]int{}
|
||||
for _, c := range cands {
|
||||
byID[c.NodeID] = c.InFlight
|
||||
}
|
||||
Expect(byID["a"]).To(Equal(4))
|
||||
Expect(byID["b"]).To(Equal(0))
|
||||
})
|
||||
|
||||
It("carries gossiped AvailableVRAM into the candidate", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "gpu", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
|
||||
Expect(cands[0].AvailableVRAM).To(Equal(uint64(24_000_000_000)))
|
||||
})
|
||||
|
||||
It("produces candidates the shared policy ranks by least in-flight then most VRAM", func() {
|
||||
// busy-big has the most VRAM but is busy, so it must lose. Among the two
|
||||
// idle peers, the one with more free VRAM wins (VRAM breaks the tie).
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "busy-big", LastSeen: onlineSeen, AvailableVRAM: 80_000_000_000},
|
||||
{ID: "idle-small", LastSeen: onlineSeen, AvailableVRAM: 8_000_000_000},
|
||||
{ID: "idle-big", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref, "")
|
||||
best := clusterrouting.PickBestReplica(cands)
|
||||
Expect(best).ToNot(BeNil())
|
||||
Expect(best.NodeID).To(Equal("idle-big"))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("model-aware candidate building", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
seen := ref.Add(-10 * time.Second)
|
||||
|
||||
It("keeps peers that advertise the requested model", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "has", LastSeen: seen, Models: []string{"m1", "m2"}},
|
||||
{ID: "hasnot", LastSeen: seen, Models: []string{"other"}},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1")
|
||||
Expect(cands).To(HaveLen(1))
|
||||
Expect(cands[0].NodeID).To(Equal("has"))
|
||||
})
|
||||
|
||||
It("keeps peers with an empty (unknown) model set eligible for any model", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "unknown", LastSeen: seen, Models: nil},
|
||||
{ID: "hasnot", LastSeen: seen, Models: []string{"other"}},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1")
|
||||
Expect(cands).To(HaveLen(1))
|
||||
Expect(cands[0].NodeID).To(Equal("unknown"))
|
||||
})
|
||||
|
||||
It("does not filter when the requested model is empty", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "a", LastSeen: seen, Models: []string{"x"}},
|
||||
{ID: "b", LastSeen: seen, Models: []string{"y"}},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
|
||||
Expect(cands).To(HaveLen(2))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("extractModel", func() {
|
||||
It("reads the JSON body model field", func() {
|
||||
body := []byte(`{"model":"llama-3","messages":[]}`)
|
||||
Expect(extractModel("", body)).To(Equal("llama-3"))
|
||||
})
|
||||
|
||||
It("prefers a path/query model over the body", func() {
|
||||
body := []byte(`{"model":"frombody"}`)
|
||||
Expect(extractModel("fromquery", body)).To(Equal("fromquery"))
|
||||
})
|
||||
|
||||
It("returns empty when no model is present", func() {
|
||||
Expect(extractModel("", []byte(`{"messages":[]}`))).To(Equal(""))
|
||||
})
|
||||
|
||||
It("returns empty on non-JSON / unparseable body without panicking", func() {
|
||||
Expect(extractModel("", []byte("--multipart-boundary--"))).To(Equal(""))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("affinityPreferred", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("returns the warm peer once a chain has been observed for it", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
|
||||
idx.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
|
||||
|
||||
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
|
||||
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
|
||||
})
|
||||
|
||||
It("returns empty when no chain has been observed", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
|
||||
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
|
||||
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal(""))
|
||||
})
|
||||
|
||||
It("returns empty for a nil index or empty chain", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
Expect(affinityPreferred(nil, "m1", []uint64{1}, nil, cfg, ref)).To(Equal(""))
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
Expect(affinityPreferred(idx, "m1", nil, nil, cfg, ref)).To(Equal(""))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("L7 request handling", func() {
|
||||
It("reads a buffered request and its body under the cap", func() {
|
||||
raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 28\r\n\r\n" +
|
||||
`{"model":"m1","messages":[]}`
|
||||
req, body, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(req.URL.Path).To(Equal("/v1/chat/completions"))
|
||||
Expect(string(body)).To(ContainSubstring(`"model":"m1"`))
|
||||
})
|
||||
|
||||
It("rejects a body over the cap with ErrBodyTooLarge", func() {
|
||||
big := strings.Repeat("a", 200)
|
||||
raw := "POST /x HTTP/1.1\r\nHost: x\r\nContent-Length: 200\r\n\r\n" + big
|
||||
_, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 64)
|
||||
Expect(err).To(MatchError(ErrBodyTooLarge))
|
||||
})
|
||||
|
||||
It("detects a websocket upgrade request", func() {
|
||||
raw := "GET /v1/realtime HTTP/1.1\r\nHost: x\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n"
|
||||
req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(isWebsocketUpgrade(req)).To(BeTrue())
|
||||
})
|
||||
|
||||
It("does not flag a normal POST as a websocket upgrade", func() {
|
||||
raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 2\r\n\r\n{}"
|
||||
req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(isWebsocketUpgrade(req)).To(BeFalse())
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("affinityPreferred with a sync provider", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("returns the warm peer when the provider is a Sync wrapping the index", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
sync := prefixcache.NewSync(idx, nil)
|
||||
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"a long shared system prompt for affinity"}]}`, cfg)
|
||||
sync.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
|
||||
|
||||
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
|
||||
Expect(affinityPreferred(sync, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
|
||||
})
|
||||
})
|
||||
@@ -6,15 +6,18 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/pkg/utils"
|
||||
"github.com/mudler/LocalAI/pkg/xsysinfo"
|
||||
"github.com/mudler/edgevpn/pkg/config"
|
||||
"github.com/mudler/edgevpn/pkg/node"
|
||||
"github.com/mudler/edgevpn/pkg/protocol"
|
||||
@@ -86,37 +89,39 @@ func nodeAnnounce(ctx context.Context, node *node.Node) {
|
||||
)
|
||||
}
|
||||
|
||||
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
|
||||
ledger, _ := node.Ledger()
|
||||
// openPeerStream resolves serviceID to its advertised peer in the services
|
||||
// ledger and opens a libp2p stream to that peer over the service protocol.
|
||||
// Returns the stream or an error describing which lookup step failed.
|
||||
func openPeerStream(ctx context.Context, n *node.Node, serviceID string) (network.Stream, error) {
|
||||
ledger, _ := n.Ledger()
|
||||
// Retrieve current ID for ip in the blockchain
|
||||
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
|
||||
service := &types.Service{}
|
||||
existingValue.Unmarshal(service)
|
||||
// If mismatch, update the blockchain
|
||||
if !found {
|
||||
zlog.Error("Service not found on blockchain")
|
||||
conn.Close()
|
||||
// ll.Debugf("service '%s' not found on blockchain", serviceID)
|
||||
return
|
||||
return nil, errors.New("service not found on blockchain")
|
||||
}
|
||||
|
||||
// Decode the Peer
|
||||
d, err := peer.Decode(service.PeerID)
|
||||
if err != nil {
|
||||
zlog.Error("cannot decode peer")
|
||||
|
||||
conn.Close()
|
||||
// ll.Debugf("could not decode peer '%s'", service.PeerID)
|
||||
return
|
||||
return nil, fmt.Errorf("cannot decode peer: %w", err)
|
||||
}
|
||||
|
||||
// Open a stream
|
||||
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
|
||||
stream, err := n.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
|
||||
if err != nil {
|
||||
zlog.Error("cannot open stream peer", "error", err)
|
||||
return nil, fmt.Errorf("cannot open stream peer: %w", err)
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
|
||||
stream, err := openPeerStream(ctx, node, serviceID)
|
||||
if err != nil {
|
||||
zlog.Error("Could not open peer stream", "error", err)
|
||||
conn.Close()
|
||||
// ll.Debugf("could not open stream '%s'", err.Error())
|
||||
return
|
||||
}
|
||||
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
|
||||
@@ -130,6 +135,45 @@ func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string,
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// proxyHTTPToPeer forwards an already-parsed HTTP request to the chosen peer
|
||||
// over a libp2p stream and streams the response back to conn. When duplex is
|
||||
// true (a websocket upgrade) it runs a bidirectional copy after writing the
|
||||
// request, so post-101 frames flow both ways. The response is never buffered,
|
||||
// so SSE keeps flowing.
|
||||
func proxyHTTPToPeer(ctx context.Context, n *node.Node, serviceID string, conn net.Conn, req *http.Request, duplex bool) {
|
||||
stream, err := openPeerStream(ctx, n, serviceID)
|
||||
if err != nil {
|
||||
zlog.Error("Could not open peer stream", "error", err)
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
// Force the peer to close after responding so the one-way io.Copy below
|
||||
// terminates. Without this the peer keeps the HTTP/1.1 connection alive and
|
||||
// io.Copy(conn, stream) blocks forever, leaking the goroutine, conn, and
|
||||
// stream. Websocket upgrades keep keep-alive: their duplex copy owns the
|
||||
// lifetime.
|
||||
req.Header.Del("Connection")
|
||||
req.Close = !duplex
|
||||
if err := req.Write(stream); err != nil {
|
||||
zlog.Error("Could not write request to peer", "error", err)
|
||||
_ = stream.Close()
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
if duplex {
|
||||
closer := make(chan struct{}, 2)
|
||||
go copyStream(closer, stream, conn)
|
||||
go copyStream(closer, conn, stream)
|
||||
<-closer
|
||||
_ = stream.Close()
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
_, _ = io.Copy(conn, stream)
|
||||
_ = stream.Close()
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
|
||||
zlog.Info("Allocating service", "service", service, "address", listenAddr)
|
||||
// Open local port for listening
|
||||
@@ -311,7 +355,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv
|
||||
}
|
||||
|
||||
// This is the P2P worker main
|
||||
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
|
||||
func ExposeService(ctx context.Context, host, port, token, servicesID string, modelsFn func() []string) (*node.Node, error) {
|
||||
if servicesID == "" {
|
||||
servicesID = defaultServicesID
|
||||
}
|
||||
@@ -347,10 +391,16 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
|
||||
20*time.Second,
|
||||
func() {
|
||||
updatedMap := map[string]any{}
|
||||
var models []string
|
||||
if modelsFn != nil {
|
||||
models = modelsFn()
|
||||
}
|
||||
updatedMap[name] = &schema.NodeData{
|
||||
Name: name,
|
||||
LastSeen: time.Now(),
|
||||
ID: nodeID(name),
|
||||
Name: name,
|
||||
LastSeen: time.Now(),
|
||||
ID: nodeID(name),
|
||||
AvailableVRAM: xsysinfo.GetGPUAggregateInfo().FreeVRAM,
|
||||
Models: models,
|
||||
}
|
||||
ledger.Add(servicesID, updatedMap)
|
||||
},
|
||||
@@ -359,11 +409,12 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
|
||||
return n, err
|
||||
}
|
||||
|
||||
func NewNode(token string) (*node.Node, error) {
|
||||
func NewNode(token string, extraOpts ...node.Option) (*node.Node, error) {
|
||||
nodeOpts, err := newNodeOpts(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeOpts = append(nodeOpts, extraOpts...)
|
||||
|
||||
n, err := node.New(nodeOpts...)
|
||||
if err != nil {
|
||||
|
||||
13
core/p2p/p2p_suite_test.go
Normal file
13
core/p2p/p2p_suite_test.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func TestP2P(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "P2P Suite")
|
||||
}
|
||||
@@ -121,18 +121,38 @@ type StoresFindResponse struct {
|
||||
Similarities []float32 `json:"similarities" yaml:"similarities"`
|
||||
}
|
||||
|
||||
// NodeOnlineWindow is how long after its last announce a node still counts as
|
||||
// online. Nodes re-announce into the edgevpn ledger every 20s (see core/p2p
|
||||
// ExposeService), so 40s tolerates a single missed announce.
|
||||
const NodeOnlineWindow = 40 * time.Second
|
||||
|
||||
// NodeData is the record describing a single p2p node, including the
// capacity and model information it gossips to its peers.
type NodeData struct {
	// Name and ID identify the node.
	Name, ID string
	// TunnelAddress and ServiceID are connection details for the node.
	// NOTE(review): their exact semantics are set by callers not shown here.
	TunnelAddress, ServiceID string
	// LastSeen is the time of the node's most recent announce; IsOnlineAt
	// compares it against NodeOnlineWindow.
	LastSeen time.Time
	// AvailableVRAM is the free GPU VRAM, in bytes, reported at the last
	// announce. Federation selection uses it to favour peers with more
	// headroom. CPU-only nodes and peers running an older version that does
	// not publish it report zero, which the routing policy ranks as the
	// lowest VRAM tier.
	AvailableVRAM uint64
	// Models lists the model names the peer currently serves, letting the
	// federation proxy send a request only to peers that have the requested
	// model. An empty list means "unknown" (an older peer, or one with no
	// model loaded yet); such peers stay eligible for any model so that a
	// mixed-version swarm is not starved.
	Models []string
}
|
||||
|
||||
func (d NodeData) IsOnline() bool {
|
||||
now := time.Now()
|
||||
// if the node was seen in the last 40 seconds, it's online
|
||||
return now.Sub(d.LastSeen) < 40*time.Second
|
||||
return d.IsOnlineAt(time.Now())
|
||||
}
|
||||
|
||||
// IsOnlineAt reports whether the node counts as online relative to now. It is
|
||||
// split from IsOnline so selection logic can be exercised with a fixed clock.
|
||||
func (d NodeData) IsOnlineAt(now time.Time) bool {
|
||||
return now.Sub(d.LastSeen) < NodeOnlineWindow
|
||||
}
|
||||
|
||||
type P2PNodesResponse struct {
|
||||
|
||||
39
core/schema/nodedata_test.go
Normal file
39
core/schema/nodedata_test.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package schema_test
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
)
|
||||
|
||||
var _ = Describe("NodeData", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("is online within the online window", func() {
|
||||
nd := schema.NodeData{LastSeen: ref.Add(-30 * time.Second)}
|
||||
Expect(nd.IsOnlineAt(ref)).To(BeTrue())
|
||||
})
|
||||
|
||||
It("is offline once the online window has elapsed", func() {
|
||||
nd := schema.NodeData{LastSeen: ref.Add(-50 * time.Second)}
|
||||
Expect(nd.IsOnlineAt(ref)).To(BeFalse())
|
||||
})
|
||||
|
||||
It("treats exactly the window boundary as offline (strict less-than)", func() {
|
||||
nd := schema.NodeData{LastSeen: ref.Add(-schema.NodeOnlineWindow)}
|
||||
Expect(nd.IsOnlineAt(ref)).To(BeFalse())
|
||||
})
|
||||
|
||||
It("carries AvailableVRAM in bytes", func() {
|
||||
nd := schema.NodeData{AvailableVRAM: 8_000_000_000}
|
||||
Expect(nd.AvailableVRAM).To(Equal(uint64(8_000_000_000)))
|
||||
})
|
||||
|
||||
It("carries the advertised model set", func() {
|
||||
nd := schema.NodeData{Models: []string{"llama-3", "qwen"}}
|
||||
Expect(nd.Models).To(ConsistOf("llama-3", "qwen"))
|
||||
})
|
||||
})
|
||||
33
pkg/clusterrouting/affinity.go
Normal file
33
pkg/clusterrouting/affinity.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package clusterrouting
|
||||
|
||||
// PickWithAffinity prefers the candidate whose NodeID equals preferredNodeID
|
||||
// when that candidate's in-flight count is within slack of the least-loaded
|
||||
// candidate; otherwise it falls back to PickBestReplica (least in-flight, then
|
||||
// oldest last-used, then most free VRAM). This keeps a warm prefix-cache peer
|
||||
// sticky without letting it become a hot-spot: once it is more than slack
|
||||
// requests busier than the least-loaded peer, load wins. With an empty
|
||||
// preferredNodeID, or a preferred node not in the set, it is exactly
|
||||
// PickBestReplica. slack mirrors prefixcache's BalanceAbsThreshold.
|
||||
func PickWithAffinity(candidates []ReplicaCandidate, preferredNodeID string, slack int) *ReplicaCandidate {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
if preferredNodeID == "" {
|
||||
return PickBestReplica(candidates)
|
||||
}
|
||||
var preferred *ReplicaCandidate
|
||||
minInFlight := candidates[0].InFlight
|
||||
for i := range candidates {
|
||||
c := &candidates[i]
|
||||
if c.InFlight < minInFlight {
|
||||
minInFlight = c.InFlight
|
||||
}
|
||||
if c.NodeID == preferredNodeID {
|
||||
preferred = c
|
||||
}
|
||||
}
|
||||
if preferred != nil && preferred.InFlight <= minInFlight+slack {
|
||||
return preferred
|
||||
}
|
||||
return PickBestReplica(candidates)
|
||||
}
|
||||
48
pkg/clusterrouting/affinity_test.go
Normal file
48
pkg/clusterrouting/affinity_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package clusterrouting
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("PickWithAffinity", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("returns nil for an empty candidate list", func() {
|
||||
Expect(PickWithAffinity(nil, "x", 2)).To(BeNil())
|
||||
})
|
||||
|
||||
It("falls back to PickBestReplica when no preferred node is given", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "busy", InFlight: 3, LastUsed: ref, AvailableVRAM: 80},
|
||||
{NodeID: "idle", InFlight: 0, LastUsed: ref, AvailableVRAM: 8},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "", 2).NodeID).To(Equal("idle"))
|
||||
})
|
||||
|
||||
It("honors the preferred node when it is within the in-flight slack of the least-loaded", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80},
|
||||
{NodeID: "warm", InFlight: 2, LastUsed: ref, AvailableVRAM: 8},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("warm"))
|
||||
})
|
||||
|
||||
It("ignores the preferred node when it is beyond the slack and falls back to load+VRAM", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80},
|
||||
{NodeID: "warm", InFlight: 5, LastUsed: ref, AvailableVRAM: 8},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("cold"))
|
||||
})
|
||||
|
||||
It("falls back to load+VRAM when the preferred node is not among the candidates", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "a", InFlight: 1, LastUsed: ref, AvailableVRAM: 8},
|
||||
{NodeID: "b", InFlight: 1, LastUsed: ref, AvailableVRAM: 24},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "ghost", 2).NodeID).To(Equal("b"))
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user