mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-06 15:56:06 -04:00
Compare commits
21 Commits
dependabot
...
feat/p2p-f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c410cd7253 | ||
|
|
4e993332af | ||
|
|
42e51894c3 | ||
|
|
d9ae6481fb | ||
|
|
f1c495a748 | ||
|
|
450376d22f | ||
|
|
8180fddc05 | ||
|
|
5033457f57 | ||
|
|
d88758282a | ||
|
|
a0c7cecddd | ||
|
|
bc42374d8a | ||
|
|
ec2a0645dd | ||
|
|
ce8b97edf2 | ||
|
|
91fc26ff75 | ||
|
|
8df0bb683b | ||
|
|
8ec536a34c | ||
|
|
14b57aa343 | ||
|
|
288d732af7 | ||
|
|
ed38609d51 | ||
|
|
7768b35696 | ||
|
|
830f818c58 |
@@ -1,6 +1,6 @@
|
||||
# parakeet-cpp backend Makefile.
|
||||
#
|
||||
# Upstream pin lives below as PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543
|
||||
# Upstream pin lives below as PARAKEET_VERSION?=9edf17c3ada66e0f881dcff155492867db7ac4cf
|
||||
# (.github/bump_deps.sh) can find and update it - matches the
|
||||
# whisper.cpp / ds4 / vibevoice-cpp convention.
|
||||
#
|
||||
@@ -15,7 +15,7 @@
|
||||
# That's what the L0 smoke test uses. The default target below does the
|
||||
# proper clone-at-pin + cmake build so CI doesn't need a side-checkout.
|
||||
|
||||
PARAKEET_VERSION?=8a7c48209d7882a7ce79a6b306270e4703194543
|
||||
PARAKEET_VERSION?=9edf17c3ada66e0f881dcff155492867db7ac4cf
|
||||
PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp
|
||||
|
||||
GOCMD?=go
|
||||
|
||||
@@ -53,9 +53,21 @@ func (a *Application) StartP2P() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// modelsFn reports the model names this instance currently serves so the
|
||||
// federation proxy can route a request only to peers that have the
|
||||
// requested model. It is re-evaluated on every announce tick.
|
||||
modelsFn := func() []string {
|
||||
cfgs := a.ModelConfigLoader().GetAllModelsConfigs()
|
||||
names := make([]string, 0, len(cfgs))
|
||||
for _, c := range cfgs {
|
||||
names = append(names, c.Name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// Here a new node is created and started
|
||||
// and a service is exposed by the node
|
||||
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID))
|
||||
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), modelsFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -14,12 +14,14 @@ type FederatedCLI struct {
|
||||
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
|
||||
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
|
||||
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
|
||||
UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-size limit in megabytes" group:"api"`
|
||||
AffinitySync bool `env:"LOCALAI_FEDERATED_AFFINITY_SYNC,FEDERATED_AFFINITY_SYNC" default:"false" help:"Broadcast prefix-cache affinity observations to other federation servers over the p2p generic channel (enable on every federation server that should cohere)" group:"p2p"`
|
||||
}
|
||||
|
||||
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
|
||||
warnDeprecatedFlags()
|
||||
|
||||
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
|
||||
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker, int64(f.UploadLimit)*1024*1024, f.AffinitySync)
|
||||
|
||||
c, cancel := context.WithCancel(context.Background())
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||
p = r.RunnerPort
|
||||
}
|
||||
|
||||
_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID))
|
||||
_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -104,7 +104,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID))
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.LlamaCPPWorkerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ func (r *P2PMLX) Run(ctx *cliContext.Context) error {
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID))
|
||||
_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.MLXWorkerID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
37
core/http/react-ui/e2e/cluster.spec.js
Normal file
37
core/http/react-ui/e2e/cluster.spec.js
Normal file
@@ -0,0 +1,37 @@
|
||||
import { test, expect } from './coverage-fixtures.js'
|
||||
|
||||
// The Cluster page composes two capability sections: "Distributed (NATS)" (the
|
||||
// former Nodes page) and "Swarm (p2p)" (the former P2P page). Each section only
|
||||
// mounts when its mode is enabled — distributed when /api/nodes answers OK, swarm
|
||||
// when a non-empty p2p network token is present. We mock those probes so the page
|
||||
// renders against the standalone ui-test-server without NATS / p2p running.
|
||||
|
||||
async function mockDistributedOnly(page) {
|
||||
await page.route('**/api/nodes', (route) => {
|
||||
route.fulfill({ status: 200, contentType: 'application/json', body: '[]' })
|
||||
})
|
||||
await page.route('**/api/nodes/scheduling', (route) => {
|
||||
route.fulfill({ status: 200, contentType: 'application/json', body: '[]' })
|
||||
})
|
||||
// Swarm disabled: token probe fails, so the swarm section stays hidden.
|
||||
await page.route('**/api/p2p/token', (route) => {
|
||||
route.fulfill({ status: 503, contentType: 'text/plain', body: '' })
|
||||
})
|
||||
}
|
||||
|
||||
test.describe('Cluster page', () => {
|
||||
test('shows the page title', async ({ page }) => {
|
||||
await mockDistributedOnly(page)
|
||||
await page.goto('/app/cluster')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible()
|
||||
})
|
||||
|
||||
test('shows the distributed section when /api/nodes responds', async ({ page }) => {
|
||||
await mockDistributedOnly(page)
|
||||
await page.goto('/app/cluster')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
// The distributed capability section is titled "Distributed (NATS)".
|
||||
await expect(page.getByText(/Distributed \(NATS\)/i)).toBeVisible()
|
||||
})
|
||||
})
|
||||
@@ -23,4 +23,11 @@ test.describe('Navigation', () => {
|
||||
await expect(page).toHaveURL(/\/app\/traces/)
|
||||
await expect(page.getByRole('heading', { name: 'Traces', exact: true })).toBeVisible()
|
||||
})
|
||||
|
||||
test('old cluster routes redirect to /app/cluster', async ({ page }) => {
|
||||
await page.goto('/app/p2p')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
await page.goto('/app/nodes')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -81,7 +81,7 @@ async function mockDistributedNodes(page, { onDelete } = {}) {
|
||||
}
|
||||
|
||||
async function expandNodeAndWaitForBackends(page) {
|
||||
await page.goto('/app/nodes')
|
||||
await page.goto('/app/cluster')
|
||||
// Click the row to expand it. The chevron toggle and the row both work,
|
||||
// but clicking the name cell is the most user-like.
|
||||
await page.getByText(NODE_NAME).first().click()
|
||||
|
||||
@@ -1,26 +1,65 @@
|
||||
import { test, expect } from './coverage-fixtures.js'
|
||||
|
||||
// P2P (Swarm) admin page — renders in the no-auth test harness (isAdmin).
|
||||
test.describe('P2P page', () => {
|
||||
test.beforeEach(async ({ page }) => {
|
||||
// The standalone P2P (Swarm) page was merged into the Cluster page: /app/p2p now
|
||||
// redirects to /app/cluster, and the p2p content lives under the "Swarm (p2p)"
|
||||
// section. That section only mounts when p2p is enabled (a network token is
|
||||
// present), so we mock /api/p2p/token to return a non-empty token and assert the
|
||||
// swarm content renders under the cluster page.
|
||||
const P2P_TOKEN = 'test-network-token'
|
||||
|
||||
async function mockSwarmEnabled(page) {
|
||||
await page.route('**/api/p2p/token', (route) => {
|
||||
route.fulfill({
|
||||
status: 200,
|
||||
contentType: 'text/plain',
|
||||
body: P2P_TOKEN,
|
||||
})
|
||||
})
|
||||
await page.route('**/api/p2p/workers', (route) => {
|
||||
route.fulfill({ status: 200, contentType: 'application/json', body: '{"nodes":[]}' })
|
||||
})
|
||||
await page.route('**/api/p2p/federation', (route) => {
|
||||
route.fulfill({ status: 200, contentType: 'application/json', body: '{"nodes":[]}' })
|
||||
})
|
||||
await page.route('**/api/p2p/stats', (route) => {
|
||||
route.fulfill({
|
||||
status: 200,
|
||||
contentType: 'application/json',
|
||||
body: JSON.stringify({
|
||||
llama_cpp_workers: { online: 0, total: 0 },
|
||||
federated: { online: 0, total: 0 },
|
||||
mlx_workers: { online: 0, total: 0 },
|
||||
}),
|
||||
})
|
||||
})
|
||||
// The cluster page also probes /api/nodes for the distributed section; keep it
|
||||
// failing (distributed disabled) so only the swarm section renders here.
|
||||
await page.route('**/api/nodes', (route) => {
|
||||
route.fulfill({ status: 503, contentType: 'application/json', body: '{}' })
|
||||
})
|
||||
}
|
||||
|
||||
test.describe('P2P (Swarm) section on the Cluster page', () => {
|
||||
test('the old /app/p2p route lands on the cluster page', async ({ page }) => {
|
||||
await mockSwarmEnabled(page)
|
||||
// /app/p2p redirects to /app/cluster.
|
||||
await page.goto('/app/p2p')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
await expect(page.getByRole('heading', { name: /Cluster/i })).toBeVisible()
|
||||
})
|
||||
|
||||
test('renders the P2P distribution overview and capability cards', async ({ page }) => {
|
||||
await expect(page).toHaveURL(/\/app\/p2p$/)
|
||||
await expect(page.getByRole('heading', { name: /P2P Distribution Not Enabled/i })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: 'Instance Federation' })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: 'Model Sharding' })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: 'Resource Sharing' })).toBeVisible()
|
||||
await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible()
|
||||
})
|
||||
test('renders the Swarm (p2p) section when p2p is enabled', async ({ page }) => {
|
||||
await mockSwarmEnabled(page)
|
||||
await page.goto('/app/cluster')
|
||||
await expect(page).toHaveURL(/\/app\/cluster$/)
|
||||
|
||||
test('hardware selector offers build targets and responds to selection', async ({ page }) => {
|
||||
const cpu = page.getByRole('button').filter({ hasText: /^CPU$/ })
|
||||
const cuda = page.getByRole('button').filter({ hasText: /^CUDA 12$/ })
|
||||
await expect(cpu).toBeVisible()
|
||||
await expect(cuda).toBeVisible()
|
||||
await cuda.click() // selecting a build target must not break the page
|
||||
await expect(page.getByRole('heading', { name: /How to Enable P2P/i })).toBeVisible()
|
||||
// The collapsible swarm section is titled "Swarm (p2p)".
|
||||
await expect(page.getByText(/Swarm \(p2p\)/i)).toBeVisible()
|
||||
|
||||
// The enabled p2p content (Network Token panel + the federation / sharding
|
||||
// tabs) is rendered inside the swarm section.
|
||||
await expect(page.getByRole('heading', { name: /Network Token/i })).toBeVisible()
|
||||
await expect(page.getByText('Federation', { exact: true })).toBeVisible()
|
||||
await expect(page.getByText('Model Sharding', { exact: true })).toBeVisible()
|
||||
})
|
||||
})
|
||||
|
||||
@@ -20,7 +20,7 @@ const PAGES = [
|
||||
['/app/manage', 'Manage'],
|
||||
['/app/backends', 'Backends'],
|
||||
['/app/settings', 'Settings'],
|
||||
['/app/nodes', 'Nodes'],
|
||||
['/app/cluster', 'Cluster'],
|
||||
['/app/face', 'Face recognition'],
|
||||
['/app/voice', 'Voice recognition'],
|
||||
['/app/fine-tune', 'Fine-tuning'],
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "Explorer",
|
||||
"subtitle": "Dateien und Konfiguration durchsuchen"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Cluster",
|
||||
"subtitle": "Verteilte und Peer-to-Peer-Knoten, die diese Instanz bedienen",
|
||||
"empty": "Es ist kein verteiltes oder p2p-Clustering aktiviert. Starte LocalAI im verteilten oder föderierten/p2p-Modus, um hier Cluster-Knoten zu verwalten.",
|
||||
"distributed": {
|
||||
"title": "Verteilt (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Verteilte Knoten",
|
||||
"inFlight": "Laufende Anfragen",
|
||||
"peers": "Swarm-Peers online"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "Traces",
|
||||
"nodes": "Knoten",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Cluster",
|
||||
"system": "System",
|
||||
"settings": "Einstellungen",
|
||||
"api": "API"
|
||||
|
||||
@@ -81,5 +81,21 @@
|
||||
"explorer": {
|
||||
"title": "Explorer",
|
||||
"subtitle": "Browse files and configuration"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Cluster",
|
||||
"subtitle": "Distributed and peer-to-peer nodes serving this instance",
|
||||
"empty": "No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.",
|
||||
"distributed": {
|
||||
"title": "Distributed (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Distributed nodes",
|
||||
"inFlight": "In-flight requests",
|
||||
"peers": "Swarm peers online"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
"traces": "Traces",
|
||||
"nodes": "Nodes",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Cluster",
|
||||
"system": "System",
|
||||
"settings": "Settings",
|
||||
"api": "API"
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "Explorador",
|
||||
"subtitle": "Explora archivos y configuración"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Clúster",
|
||||
"subtitle": "Nodos distribuidos y entre pares que sirven a esta instancia",
|
||||
"empty": "No hay clustering distribuido ni p2p habilitado. Inicia LocalAI en modo distribuido o federado/p2p para gestionar aquí los nodos del clúster.",
|
||||
"distributed": {
|
||||
"title": "Distribuido (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Nodos distribuidos",
|
||||
"inFlight": "Solicitudes en curso",
|
||||
"peers": "Pares de Swarm en línea"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "Trazas",
|
||||
"nodes": "Nodos",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Clúster",
|
||||
"system": "Sistema",
|
||||
"settings": "Configuración",
|
||||
"api": "API"
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "Esplora risorse",
|
||||
"subtitle": "Sfoglia file e configurazioni"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "Cluster",
|
||||
"subtitle": "Nodi distribuiti e peer-to-peer al servizio di questa istanza",
|
||||
"empty": "Nessun clustering distribuito o p2p è abilitato. Avvia LocalAI in modalità distribuita o federata/p2p per gestire qui i nodi del cluster.",
|
||||
"distributed": {
|
||||
"title": "Distribuito (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "Nodi distribuiti",
|
||||
"inFlight": "Richieste in corso",
|
||||
"peers": "Peer Swarm online"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "Tracce",
|
||||
"nodes": "Nodi",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "Cluster",
|
||||
"system": "Sistema",
|
||||
"settings": "Impostazioni",
|
||||
"api": "API"
|
||||
|
||||
@@ -58,5 +58,21 @@
|
||||
"explorer": {
|
||||
"title": "资源浏览器",
|
||||
"subtitle": "浏览文件和配置"
|
||||
},
|
||||
"cluster": {
|
||||
"title": "集群",
|
||||
"subtitle": "为此实例提供服务的分布式和点对点节点",
|
||||
"empty": "未启用分布式或 p2p 集群。请以分布式或联邦/p2p 模式启动 LocalAI,以便在此管理集群节点。",
|
||||
"distributed": {
|
||||
"title": "分布式 (NATS)"
|
||||
},
|
||||
"swarm": {
|
||||
"title": "Swarm (p2p)"
|
||||
},
|
||||
"summary": {
|
||||
"nodes": "分布式节点",
|
||||
"inFlight": "进行中的请求",
|
||||
"peers": "在线 Swarm 节点"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
"traces": "追踪",
|
||||
"nodes": "节点",
|
||||
"swarm": "Swarm",
|
||||
"cluster": "集群",
|
||||
"system": "系统",
|
||||
"settings": "设置",
|
||||
"api": "API"
|
||||
|
||||
27
core/http/react-ui/src/components/ClusterSection.jsx
Normal file
27
core/http/react-ui/src/components/ClusterSection.jsx
Normal file
@@ -0,0 +1,27 @@
|
||||
import { useState } from 'react'
|
||||
|
||||
// ClusterSection is a collapsible, titled container for one capability area of
|
||||
// the Cluster page (Distributed / Swarm). Default expanded.
|
||||
export default function ClusterSection({ icon, title, subtitle, defaultOpen = true, children }) {
|
||||
const [open, setOpen] = useState(defaultOpen)
|
||||
return (
|
||||
<section className="card" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
<button
|
||||
type="button"
|
||||
aria-expanded={open}
|
||||
onClick={() => setOpen((o) => !o)}
|
||||
style={{
|
||||
display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)',
|
||||
width: '100%', padding: 'var(--spacing-md)', background: 'none',
|
||||
border: 'none', cursor: 'pointer', textAlign: 'left', color: 'inherit',
|
||||
}}
|
||||
>
|
||||
<i className={`fas fa-chevron-${open ? 'down' : 'right'}`} style={{ width: '1rem', color: 'var(--color-text-muted)' }} />
|
||||
{icon && <i className={icon} style={{ color: 'var(--color-primary)' }} />}
|
||||
<span style={{ fontWeight: 600 }}>{title}</span>
|
||||
{subtitle && <span style={{ marginLeft: 'auto', color: 'var(--color-text-muted)', fontSize: '0.875rem' }}>{subtitle}</span>}
|
||||
</button>
|
||||
{open && <div style={{ padding: '0 var(--spacing-md) var(--spacing-md)' }}>{children}</div>}
|
||||
</section>
|
||||
)
|
||||
}
|
||||
44
core/http/react-ui/src/components/ClusterSummary.jsx
Normal file
44
core/http/react-ui/src/components/ClusterSummary.jsx
Normal file
@@ -0,0 +1,44 @@
|
||||
import { useEffect, useState } from 'react'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import StatCard from './StatCard'
|
||||
import { nodesApi, p2pApi } from '../utils/api'
|
||||
|
||||
// ClusterSummary shows merged totals across both transports. Self-contained
|
||||
// (own lightweight fetch) so the page composes without lifting state out of the
|
||||
// two large section components.
|
||||
export default function ClusterSummary({ distributedEnabled, p2pEnabled }) {
|
||||
const { t } = useTranslation('admin')
|
||||
const [nats, setNats] = useState({ nodes: 0, inFlight: 0 })
|
||||
const [swarm, setSwarm] = useState({ online: 0, total: 0 })
|
||||
|
||||
useEffect(() => {
|
||||
let active = true
|
||||
async function load() {
|
||||
if (distributedEnabled) {
|
||||
try {
|
||||
const list = await nodesApi.list()
|
||||
const nodes = Array.isArray(list) ? list : (list?.nodes ?? [])
|
||||
if (active) setNats({ nodes: nodes.length, inFlight: nodes.reduce((a, n) => a + (n.in_flight_count || 0), 0) })
|
||||
} catch { /* leave zeros */ }
|
||||
}
|
||||
if (p2pEnabled) {
|
||||
try {
|
||||
const stats = await p2pApi.getStats()
|
||||
const online = (stats?.federated?.online || 0) + (stats?.llama_cpp_workers?.online || 0) + (stats?.mlx_workers?.online || 0)
|
||||
const total = (stats?.federated?.total || 0) + (stats?.llama_cpp_workers?.total || 0) + (stats?.mlx_workers?.total || 0)
|
||||
if (active) setSwarm({ online, total })
|
||||
} catch { /* leave zeros */ }
|
||||
}
|
||||
}
|
||||
load()
|
||||
return () => { active = false }
|
||||
}, [distributedEnabled, p2pEnabled])
|
||||
|
||||
return (
|
||||
<div className="stat-grid" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
{distributedEnabled && <StatCard icon="fas fa-network-wired" label={t('cluster.summary.nodes', 'Distributed nodes')} value={nats.nodes} />}
|
||||
{distributedEnabled && <StatCard icon="fas fa-bolt" label={t('cluster.summary.inFlight', 'In-flight requests')} value={nats.inFlight} />}
|
||||
{p2pEnabled && <StatCard icon="fas fa-circle-nodes" label={t('cluster.summary.peers', 'Swarm peers online')} value={`${swarm.online}/${swarm.total}`} />}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
@@ -479,7 +479,7 @@ export default function NodeInstallPicker({
|
||||
Approve pending workers or register new ones.
|
||||
{pendingCount > 0 && ` (${pendingCount} awaiting approval.)`}
|
||||
</p>
|
||||
<a className="btn btn-secondary btn-sm" href="/app/nodes">
|
||||
<a className="btn btn-secondary btn-sm" href="/app/cluster">
|
||||
<i className="fas fa-network-wired" /> Manage nodes
|
||||
</a>
|
||||
</div>
|
||||
@@ -672,7 +672,7 @@ export default function NodeInstallPicker({
|
||||
|
||||
{pendingCount > 0 && (
|
||||
<p style={{ fontSize: '0.75rem', color: 'var(--color-text-muted)', marginTop: 0, marginBottom: 'var(--spacing-sm)' }}>
|
||||
+{pendingCount} awaiting approval — <a href="/app/nodes" style={{ color: 'var(--color-primary)' }}>approve from Nodes</a>.
|
||||
+{pendingCount} awaiting approval — <a href="/app/cluster" style={{ color: 'var(--color-primary)' }}>approve from Nodes</a>.
|
||||
</p>
|
||||
)}
|
||||
|
||||
|
||||
@@ -75,8 +75,7 @@ const sections = [
|
||||
{ path: '/app/middleware', icon: 'fas fa-shield-halved', labelKey: 'items.middleware', adminOnly: true },
|
||||
{ path: '/app/backends', icon: 'fas fa-server', labelKey: 'items.backends', adminOnly: true },
|
||||
{ path: '/app/traces', icon: 'fas fa-chart-line', labelKey: 'items.traces', adminOnly: true },
|
||||
{ path: '/app/nodes', icon: 'fas fa-network-wired', labelKey: 'items.nodes', adminOnly: true, feature: 'distributed' },
|
||||
{ path: '/app/p2p', icon: 'fas fa-circle-nodes', labelKey: 'items.swarm', adminOnly: true },
|
||||
{ path: '/app/cluster', icon: 'fas fa-network-wired', labelKey: 'items.cluster', adminOnly: true },
|
||||
{ path: '/app/manage', icon: 'fas fa-desktop', labelKey: 'items.system', adminOnly: true },
|
||||
{ path: '/app/settings', icon: 'fas fa-cog', labelKey: 'items.settings', adminOnly: true },
|
||||
],
|
||||
|
||||
31
core/http/react-ui/src/hooks/useP2PMode.js
vendored
Normal file
31
core/http/react-ui/src/hooks/useP2PMode.js
vendored
Normal file
@@ -0,0 +1,31 @@
|
||||
import { useState, useEffect, useCallback } from 'react'
|
||||
import { p2pApi } from '../utils/api'
|
||||
|
||||
// useP2PMode reports whether p2p / swarm mode is available, mirroring
|
||||
// useDistributedMode. Availability is "a network token exists" (the same signal
|
||||
// the standalone P2P page used). One-shot probe on mount plus a manual refetch.
|
||||
//
|
||||
// Returns:
|
||||
// enabled — true when a non-empty network token is present
|
||||
// loading — true until the first probe completes
|
||||
// refetch — manual trigger to re-run the probe
|
||||
export function useP2PMode() {
|
||||
const [enabled, setEnabled] = useState(false)
|
||||
const [loading, setLoading] = useState(true)
|
||||
|
||||
const probe = useCallback(async () => {
|
||||
setLoading(true)
|
||||
try {
|
||||
const token = await p2pApi.getToken()
|
||||
setEnabled(!!(token && String(token).trim()))
|
||||
} catch {
|
||||
setEnabled(false)
|
||||
} finally {
|
||||
setLoading(false)
|
||||
}
|
||||
}, [])
|
||||
|
||||
useEffect(() => { probe() }, [probe])
|
||||
|
||||
return { enabled, loading, refetch: probe }
|
||||
}
|
||||
@@ -339,7 +339,7 @@ function DistributedBackendLogsResolver({ modelId, fromTimestamp }) {
|
||||
<h2 className="empty-state-title">Model not loaded on any worker</h2>
|
||||
<p className="empty-state-text">
|
||||
<span style={{ fontFamily: 'var(--font-mono)' }}>{modelId}</span> isn't currently loaded on any node in the cluster.
|
||||
Check the <Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
|
||||
Check the <Link to="/app/cluster" style={{ color: 'var(--color-primary)' }}>Nodes page</Link> to see which models are running where.
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -49,7 +49,7 @@ export default function Backends() {
|
||||
// whenever splitMenuFor changes to a different row index.
|
||||
const splitMenuAnchorRef = useRef(null)
|
||||
|
||||
// Target-node mode: set when navigated from /app/nodes via "+ Add backend".
|
||||
// Target-node mode: set when navigated from /app/cluster via "+ Add backend".
|
||||
// The gallery page header banners the scope; rows collapse their split-button
|
||||
// to a single Install-on-this-node action; manual install posts to the
|
||||
// per-node endpoint.
|
||||
@@ -323,7 +323,7 @@ export default function Backends() {
|
||||
return (
|
||||
<div className="page page--wide">
|
||||
{/* Target-node banner: when this gallery is scoped to one node via
|
||||
?target=<id> (entered from /app/nodes), show the scope clearly and
|
||||
?target=<id> (entered from /app/cluster), show the scope clearly and
|
||||
give a fast way to clear it. Visually a primary-tinted strip so the
|
||||
user knows they're in a special mode without it feeling alarming. */}
|
||||
{targetNode && (
|
||||
|
||||
45
core/http/react-ui/src/pages/Cluster.jsx
Normal file
45
core/http/react-ui/src/pages/Cluster.jsx
Normal file
@@ -0,0 +1,45 @@
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import { useDistributedMode } from '../hooks/useDistributedMode'
|
||||
import { useP2PMode } from '../hooks/useP2PMode'
|
||||
import ClusterSection from '../components/ClusterSection'
|
||||
import ClusterSummary from '../components/ClusterSummary'
|
||||
import Nodes from './Nodes'
|
||||
import P2P from './P2P'
|
||||
|
||||
export default function Cluster() {
|
||||
const { t } = useTranslation('admin')
|
||||
const distributed = useDistributedMode()
|
||||
const p2p = useP2PMode()
|
||||
|
||||
const loading = distributed.loading || p2p.loading
|
||||
const nothingEnabled = !loading && !distributed.enabled && !p2p.enabled
|
||||
|
||||
return (
|
||||
<div className="page page--wide">
|
||||
<div className="page-header">
|
||||
<h1 className="page-title"><i className="fas fa-network-wired" /> {t('cluster.title', 'Cluster')}</h1>
|
||||
<p className="page-subtitle">{t('cluster.subtitle', 'Distributed and peer-to-peer nodes serving this instance')}</p>
|
||||
</div>
|
||||
|
||||
{!loading && <ClusterSummary distributedEnabled={distributed.enabled} p2pEnabled={p2p.enabled} />}
|
||||
|
||||
{distributed.enabled && (
|
||||
<ClusterSection icon="fas fa-network-wired" title={t('cluster.distributed.title', 'Distributed (NATS)')} defaultOpen>
|
||||
<Nodes embedded />
|
||||
</ClusterSection>
|
||||
)}
|
||||
|
||||
{p2p.enabled && (
|
||||
<ClusterSection icon="fas fa-circle-nodes" title={t('cluster.swarm.title', 'Swarm (p2p)')} defaultOpen={!distributed.enabled}>
|
||||
<P2P embedded />
|
||||
</ClusterSection>
|
||||
)}
|
||||
|
||||
{nothingEnabled && (
|
||||
<div className="card" style={{ padding: 'var(--spacing-lg)', textAlign: 'center', color: 'var(--color-text-muted)' }}>
|
||||
{t('cluster.empty', 'No distributed or p2p clustering is enabled. Start LocalAI in distributed or federated/p2p mode to manage cluster nodes here.')}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
@@ -162,7 +162,7 @@ export default function NodeBackendLogs() {
|
||||
<h2 className="empty-state-title">No node/model selected</h2>
|
||||
<p className="empty-state-text">
|
||||
View backend logs from the{' '}
|
||||
<Link to="/app/nodes" style={{ color: 'var(--color-primary)' }}>Nodes page</Link>.
|
||||
<Link to="/app/cluster" style={{ color: 'var(--color-primary)' }}>Nodes page</Link>.
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
@@ -220,7 +220,7 @@ export default function NodeBackendLogs() {
|
||||
</h1>
|
||||
<p className="page-subtitle" style={{ marginTop: 'var(--spacing-xs)' }}>
|
||||
Backend logs from node <strong>{nodeName || nodeId}</strong>
|
||||
{' '}<Link to="/app/nodes" style={{ color: 'var(--color-primary)', fontSize: '0.8125rem' }}>(back to nodes)</Link>
|
||||
{' '}<Link to="/app/cluster" style={{ color: 'var(--color-primary)', fontSize: '0.8125rem' }}>(back to nodes)</Link>
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -689,7 +689,7 @@ function SchedulingForm({ onSave, onCancel }) {
|
||||
)
|
||||
}
|
||||
|
||||
export default function Nodes() {
|
||||
export default function Nodes({ embedded = false }) {
|
||||
const { addToast } = useOutletContext()
|
||||
const navigate = useNavigate()
|
||||
const { t } = useTranslation('admin')
|
||||
@@ -983,16 +983,18 @@ export default function Nodes() {
|
||||
const pending = filteredNodes.filter(n => n.status === 'pending').length
|
||||
|
||||
return (
|
||||
<div className="page page--wide">
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-network-wired" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('nodes.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('nodes.subtitle')}
|
||||
</p>
|
||||
</div>
|
||||
<div className={embedded ? '' : 'page page--wide'}>
|
||||
{!embedded && (
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-network-wired" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('nodes.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('nodes.subtitle')}
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Tabs */}
|
||||
<div className="tabs" style={{ marginBottom: 'var(--spacing-lg)' }}>
|
||||
|
||||
@@ -102,7 +102,7 @@ function StepNumber({ n, bg, color }) {
|
||||
)
|
||||
}
|
||||
|
||||
export default function P2P() {
|
||||
export default function P2P({ embedded = false }) {
|
||||
const { addToast } = useOutletContext()
|
||||
const { t } = useTranslation('admin')
|
||||
const [workers, setWorkers] = useState([])
|
||||
@@ -172,7 +172,7 @@ export default function P2P() {
|
||||
|
||||
if (loading) {
|
||||
return (
|
||||
<div className="page page--narrow" style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
|
||||
<div className={embedded ? '' : 'page page--narrow'} style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
|
||||
<LoadingSpinner size="lg" />
|
||||
</div>
|
||||
)
|
||||
@@ -181,7 +181,7 @@ export default function P2P() {
|
||||
// ── P2P Disabled ──
|
||||
if (!enabled) {
|
||||
return (
|
||||
<div className="page page--narrow">
|
||||
<div className={embedded ? '' : 'page page--narrow'}>
|
||||
<div style={{ textAlign: 'center', padding: 'var(--spacing-xl) 0' }}>
|
||||
<i className="fas fa-network-wired" style={{ fontSize: '3rem', color: 'var(--color-primary)', marginBottom: 'var(--spacing-md)' }} />
|
||||
<h1 style={{ fontSize: '1.5rem', fontWeight: 600, marginBottom: 'var(--spacing-sm)' }}>
|
||||
@@ -294,21 +294,23 @@ export default function P2P() {
|
||||
const mlxTotal = stats.mlx_workers?.total ?? 0
|
||||
|
||||
return (
|
||||
<div className="page page--narrow">
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-circle-nodes" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('p2p.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('p2p.subtitle')}
|
||||
{' '}
|
||||
<a href="https://localai.io/features/distribute/" target="_blank" rel="noopener noreferrer"
|
||||
style={{ color: 'var(--color-primary)' }}>
|
||||
<i className="fas fa-circle-info" />
|
||||
</a>
|
||||
</p>
|
||||
</div>
|
||||
<div className={embedded ? '' : 'page page--narrow'}>
|
||||
{!embedded && (
|
||||
<div className="page-header">
|
||||
<h1 className="page-title">
|
||||
<i className="fas fa-circle-nodes" style={{ marginRight: 'var(--spacing-sm)' }} />
|
||||
{t('p2p.title')}
|
||||
</h1>
|
||||
<p className="page-subtitle">
|
||||
{t('p2p.subtitle')}
|
||||
{' '}
|
||||
<a href="https://localai.io/features/distribute/" target="_blank" rel="noopener noreferrer"
|
||||
style={{ color: 'var(--color-primary)' }}>
|
||||
<i className="fas fa-circle-info" />
|
||||
</a>
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Network Token */}
|
||||
<div style={{
|
||||
|
||||
@@ -45,7 +45,7 @@ const Talk = page('talk', () => import('./pages/Talk'))
|
||||
const Backends = page('backends', () => import('./pages/Backends'))
|
||||
const Settings = page('settings', () => import('./pages/Settings'))
|
||||
const Traces = page('traces', () => import('./pages/Traces'))
|
||||
const P2P = page('p2p', () => import('./pages/P2P'))
|
||||
const Cluster = page('cluster', () => import('./pages/Cluster'))
|
||||
const Agents = page('agents', () => import('./pages/Agents'))
|
||||
const AgentCreate = page(null, () => import('./pages/AgentCreate'))
|
||||
const AgentChat = page(null, () => import('./pages/AgentChat'))
|
||||
@@ -68,7 +68,6 @@ const Quantize = page('quantize', () => import('./pages/Quantize'))
|
||||
const Studio = page('studio', () => import('./pages/Studio'))
|
||||
const FaceRecognition = page('face', () => import('./pages/FaceRecognition'))
|
||||
const VoiceRecognition = page('voice', () => import('./pages/VoiceRecognition'))
|
||||
const Nodes = page('nodes', () => import('./pages/Nodes'))
|
||||
const NodeBackendLogs = page(null, () => import('./pages/NodeBackendLogs'))
|
||||
const NotFound = page(null, () => import('./pages/NotFound'))
|
||||
const Usage = page('usage', () => import('./pages/Usage'))
|
||||
@@ -120,8 +119,9 @@ const appChildren = [
|
||||
{ path: 'settings', element: <Admin><Settings /></Admin> },
|
||||
{ path: 'traces', element: <Admin><Traces /></Admin> },
|
||||
{ path: 'backend-logs/:modelId', element: <Admin><BackendLogs /></Admin> },
|
||||
{ path: 'p2p', element: <Admin><P2P /></Admin> },
|
||||
{ path: 'nodes', element: <Admin><Nodes /></Admin> },
|
||||
{ path: 'cluster', element: <Admin><Cluster /></Admin> },
|
||||
{ path: 'p2p', element: <Navigate to="/app/cluster" replace /> },
|
||||
{ path: 'nodes', element: <Navigate to="/app/cluster" replace /> },
|
||||
{ path: 'node-backend-logs/:nodeId/:modelId', element: <Admin><NodeBackendLogs /></Admin> },
|
||||
{ path: 'agents', element: <Feature feature="agents"><Agents /></Feature> },
|
||||
{ path: 'agents/new', element: <Feature feature="agents"><AgentCreate /></Feature> },
|
||||
|
||||
81
core/p2p/affinity_sync.go
Normal file
81
core/p2p/affinity_sync.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/edgevpn/pkg/blockchain"
|
||||
"github.com/mudler/edgevpn/pkg/hub"
|
||||
"github.com/mudler/edgevpn/pkg/node"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
// affinitySubjectKey is the hub.Message annotation carrying the logical subject
|
||||
// (observe vs invalidate) so the receiver can dispatch the way a NATS subject
|
||||
// would. The generic channel has no subject routing, so we carry it ourselves.
|
||||
const affinitySubjectKey = "subject"
|
||||
|
||||
// genericChannelPublisher adapts an edgevpn node's generic broadcast channel to
|
||||
// the prefixcache publisher interface (Publish(subject, v)). It lets a
|
||||
// federation server reuse prefixcache.Sync for cross-server affinity coherence
|
||||
// without NATS: each event is JSON-encoded into a hub.Message and broadcast over
|
||||
// the generic channel (not the slow blockchain ledger).
|
||||
type genericChannelPublisher struct {
|
||||
node *node.Node
|
||||
}
|
||||
|
||||
// Publish satisfies prefixcache's (unexported) publisher interface structurally.
|
||||
func (p *genericChannelPublisher) Publish(subject string, v any) error {
|
||||
payload, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshalling affinity event: %w", err)
|
||||
}
|
||||
return p.node.PublishMessage(&hub.Message{
|
||||
Message: string(payload),
|
||||
Annotations: map[string]interface{}{affinitySubjectKey: subject},
|
||||
})
|
||||
}
|
||||
|
||||
// applyAffinityMessage decodes a generic-channel affinity message and applies it
|
||||
// to sync WITHOUT re-broadcasting (ApplyObserve/ApplyInvalidate). now is the
|
||||
// local clock so TTL is measured per server. Unknown subjects, malformed
|
||||
// payloads, and nil inputs are ignored (debug-logged), never fatal.
|
||||
func applyAffinityMessage(sync *prefixcache.Sync, m *hub.Message, now time.Time) {
|
||||
if sync == nil || m == nil {
|
||||
return
|
||||
}
|
||||
subject, _ := m.Annotations[affinitySubjectKey].(string)
|
||||
switch subject {
|
||||
case messaging.SubjectPrefixCacheObserve:
|
||||
var ev messaging.PrefixCacheObserveEvent
|
||||
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
|
||||
xlog.Debug("affinity: bad observe payload", "error", err)
|
||||
return
|
||||
}
|
||||
sync.ApplyObserve(ev, now)
|
||||
case messaging.SubjectPrefixCacheInvalidate:
|
||||
var ev messaging.PrefixCacheInvalidateEvent
|
||||
if err := json.Unmarshal([]byte(m.Message), &ev); err != nil {
|
||||
xlog.Debug("affinity: bad invalidate payload", "error", err)
|
||||
return
|
||||
}
|
||||
sync.ApplyInvalidate(ev)
|
||||
default:
|
||||
// Other generic-channel traffic; not ours.
|
||||
}
|
||||
}
|
||||
|
||||
// affinityHandler returns the edgevpn generic-channel handler that applies remote
|
||||
// affinity events to this server's index. It is registered at node construction
|
||||
// (handlers cannot be added after Start) and reads fs.prefixSync lazily, which is
|
||||
// safe because messages only arrive after Start, by which point Start has wired
|
||||
// fs.prefixSync.
|
||||
func (fs *FederatedServer) affinityHandler() node.Handler {
|
||||
return func(_ *blockchain.Ledger, m *hub.Message, _ chan *hub.Message) error {
|
||||
applyAffinityMessage(fs.prefixSync, m, time.Now())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
48
core/p2p/affinity_sync_test.go
Normal file
48
core/p2p/affinity_sync_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/edgevpn/pkg/hub"
|
||||
)
|
||||
|
||||
var _ = Describe("applyAffinityMessage", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
observeMsg := func(ev messaging.PrefixCacheObserveEvent) *hub.Message {
|
||||
payload, _ := json.Marshal(ev)
|
||||
return &hub.Message{
|
||||
Message: string(payload),
|
||||
Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve},
|
||||
}
|
||||
}
|
||||
|
||||
It("applies a peer observe so the local index resolves the warm peer", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
sync := prefixcache.NewSync(idx, nil)
|
||||
chain := prefixcache.ExtractChain("m1", "a fairly long shared system prompt body for the prefix chain", cfg)
|
||||
|
||||
applyAffinityMessage(sync, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m1", Chain: chain, NodeID: "warm", Replica: 0}), ref)
|
||||
|
||||
d := idx.Decide("m1", chain, []prefixcache.ReplicaKey{{NodeID: "warm"}, {NodeID: "cold"}}, ref)
|
||||
Expect(d.HasHot).To(BeTrue())
|
||||
Expect(d.Hot.NodeID).To(Equal("warm"))
|
||||
})
|
||||
|
||||
It("ignores malformed, unknown-subject, and nil inputs without panicking", func() {
|
||||
sync := prefixcache.NewSync(prefixcache.NewIndex(prefixcache.DefaultConfig()), nil)
|
||||
applyAffinityMessage(sync, &hub.Message{Message: "not-json", Annotations: map[string]interface{}{affinitySubjectKey: messaging.SubjectPrefixCacheObserve}}, ref)
|
||||
applyAffinityMessage(sync, &hub.Message{Message: "{}", Annotations: map[string]interface{}{affinitySubjectKey: "some.other.subject"}}, ref)
|
||||
applyAffinityMessage(sync, &hub.Message{Message: "{}"}, ref)
|
||||
applyAffinityMessage(nil, observeMsg(messaging.PrefixCacheObserveEvent{Model: "m"}), ref)
|
||||
applyAffinityMessage(sync, nil, ref)
|
||||
Expect(true).To(BeTrue())
|
||||
})
|
||||
})
|
||||
@@ -1,10 +1,17 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/LocalAI/pkg/clusterrouting"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
@@ -23,16 +30,29 @@ type FederatedServer struct {
|
||||
requestTable map[string]int
|
||||
loadBalanced bool
|
||||
workerTarget string
|
||||
bodyLimit int64 // max request body bytes (0 = unlimited)
|
||||
prefixCfg prefixcache.Config
|
||||
prefixIndex *prefixcache.Index
|
||||
prefixSync *prefixcache.Sync
|
||||
prefixProvider prefixcache.Provider // Index (sync off) or Sync (sync on)
|
||||
syncAffinity bool
|
||||
}
|
||||
|
||||
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
|
||||
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string, bodyLimit int64, syncAffinity bool) *FederatedServer {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
return &FederatedServer{
|
||||
listenAddr: listenAddr,
|
||||
service: service,
|
||||
p2ptoken: p2pToken,
|
||||
requestTable: map[string]int{},
|
||||
loadBalanced: loadBalanced,
|
||||
workerTarget: workerTarget,
|
||||
listenAddr: listenAddr,
|
||||
service: service,
|
||||
p2ptoken: p2pToken,
|
||||
requestTable: map[string]int{},
|
||||
loadBalanced: loadBalanced,
|
||||
workerTarget: workerTarget,
|
||||
bodyLimit: bodyLimit,
|
||||
prefixCfg: cfg,
|
||||
prefixIndex: idx,
|
||||
prefixProvider: idx,
|
||||
syncAffinity: syncAffinity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,28 +94,141 @@ func (fs *FederatedServer) syncTableStatus() {
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FederatedServer) SelectLeastUsedServer() string {
|
||||
fs.syncTableStatus()
|
||||
// buildFederatedCandidates maps the currently-online federated peers into the
|
||||
// shared routing policy's candidate form, optionally filtered to peers that can
|
||||
// serve model. A peer with a non-empty advertised model set that lacks model is
|
||||
// excluded; a peer with an empty set is treated as "unknown" and stays eligible
|
||||
// (so older peers and mid-convergence peers are not starved). When model is "",
|
||||
// no model filtering is applied. InFlight comes from the per-peer request
|
||||
// counter; AvailableVRAM from the gossiped NodeData; LastUsed is left zero.
|
||||
func buildFederatedCandidates(nodes []schema.NodeData, requestTable map[string]int, now time.Time, model string) []clusterrouting.ReplicaCandidate {
|
||||
candidates := make([]clusterrouting.ReplicaCandidate, 0, len(nodes))
|
||||
for _, nd := range nodes {
|
||||
if !nd.IsOnlineAt(now) {
|
||||
continue
|
||||
}
|
||||
if !servesModel(nd, model) {
|
||||
continue
|
||||
}
|
||||
candidates = append(candidates, clusterrouting.ReplicaCandidate{
|
||||
NodeID: nd.ID,
|
||||
InFlight: requestTable[nd.ID],
|
||||
AvailableVRAM: nd.AvailableVRAM,
|
||||
})
|
||||
}
|
||||
return candidates
|
||||
}
|
||||
|
||||
fs.Lock()
|
||||
defer fs.Unlock()
|
||||
|
||||
xlog.Debug("SelectLeastUsedServer()", "request_table", fs.requestTable)
|
||||
|
||||
// cycle over requestTable and find the entry with the lower number
|
||||
// if there are multiple entries with the same number, select one randomly
|
||||
// if there are no entries, return an empty string
|
||||
var min int
|
||||
var minKey string
|
||||
for k, v := range fs.requestTable {
|
||||
if min == 0 || v < min {
|
||||
min = v
|
||||
minKey = k
|
||||
// servesModel reports whether nd is eligible to serve model. An empty model
|
||||
// means "no filter". An empty advertised set means "unknown" and is eligible.
|
||||
func servesModel(nd schema.NodeData, model string) bool {
|
||||
if model == "" || len(nd.Models) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, m := range nd.Models {
|
||||
if m == model {
|
||||
return true
|
||||
}
|
||||
}
|
||||
xlog.Debug("Selected tunnel", "tunnel", minKey, "requests_served", min, "request_table", fs.requestTable)
|
||||
return false
|
||||
}
|
||||
|
||||
return minKey
|
||||
// extractModel best-effort resolves the target model of a buffered request.
// A non-blank queryModel wins; otherwise the top-level "model" field of a JSON
// body is used. Surrounding whitespace is stripped from the result so it can
// be compared directly against advertised model names.
//
// It returns "" when no model can be determined: blank query and an empty,
// non-JSON, or model-less body.
func extractModel(queryModel string, body []byte) string {
	// Return the trimmed value: the blank check and the result must agree.
	if name := strings.TrimSpace(queryModel); name != "" {
		return name
	}
	if len(body) == 0 {
		return ""
	}
	var probe struct {
		Model string `json:"model"`
	}
	if err := json.Unmarshal(body, &probe); err != nil {
		// Not JSON (or not an object): the model is simply unknown.
		return ""
	}
	return strings.TrimSpace(probe.Model)
}
|
||||
|
||||
// affinityPreferred returns the peer the prefix index considers warm for this
|
||||
// chain, or "" when there is no match strong enough among the candidates. It
|
||||
// reuses prefixcache's per-model radix-tree Decide; the final load-guarded pick
|
||||
// is done by clusterrouting.PickWithAffinity so the VRAM tier is preserved.
|
||||
func affinityPreferred(idx prefixcache.Provider, model string, chain []uint64, candidates []clusterrouting.ReplicaCandidate, cfg prefixcache.Config, now time.Time) string {
|
||||
if idx == nil || len(chain) == 0 || len(candidates) == 0 {
|
||||
return ""
|
||||
}
|
||||
keys := make([]prefixcache.ReplicaKey, 0, len(candidates))
|
||||
for _, c := range candidates {
|
||||
keys = append(keys, prefixcache.ReplicaKey{NodeID: c.NodeID})
|
||||
}
|
||||
d := idx.Decide(model, chain, keys, now)
|
||||
if d.HasHot && d.MatchRatio >= cfg.MinPrefixMatch {
|
||||
return d.Hot.NodeID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// selectPeer chooses the federated peer to serve a request for model with the
|
||||
// given raw body. It filters candidates by model, computes the prefix chain,
|
||||
// consults the affinity index, and makes the final load+VRAM-aware pick. It
|
||||
// returns the chosen peer ID and the chain (so the caller can Observe after a
|
||||
// successful forward). An empty model and nil body degrade to load+VRAM only.
|
||||
// Returns "" when no eligible peer is online.
|
||||
func (fs *FederatedServer) selectPeer(model string, body []byte, now time.Time) (string, []uint64) {
|
||||
fs.syncTableStatus()
|
||||
nodes := GetAvailableNodes(fs.service)
|
||||
// Snapshot candidates under the lock (it only guards requestTable), then
|
||||
// release before the prefix hashing and tree walk, which are lock-free
|
||||
// (candidates is a copy; prefixIndex/prefixCfg are set once at construction).
|
||||
fs.Lock()
|
||||
candidates := buildFederatedCandidates(nodes, fs.requestTable, now, model)
|
||||
fs.Unlock()
|
||||
if len(candidates) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
var chain []uint64
|
||||
preferred := ""
|
||||
if fs.prefixProvider != nil && model != "" && len(body) > 0 {
|
||||
chain = prefixcache.ExtractChain(model, string(body), fs.prefixCfg)
|
||||
preferred = affinityPreferred(fs.prefixProvider, model, chain, candidates, fs.prefixCfg, now)
|
||||
}
|
||||
best := clusterrouting.PickWithAffinity(candidates, preferred, fs.prefixCfg.BalanceAbsThreshold)
|
||||
if best == nil {
|
||||
return "", chain
|
||||
}
|
||||
return best.NodeID, chain
|
||||
}
|
||||
|
||||
// observeServed records that peerID served the given chain for model, so the
|
||||
// next request sharing that prefix is routed back to the same warm peer.
|
||||
func (fs *FederatedServer) observeServed(model string, chain []uint64, peerID string, now time.Time) {
|
||||
if fs.prefixProvider == nil || len(chain) == 0 || peerID == "" || model == "" {
|
||||
return
|
||||
}
|
||||
fs.prefixProvider.Observe(model, chain, prefixcache.ReplicaKey{NodeID: peerID}, now)
|
||||
}
|
||||
|
||||
// evictLoop periodically sweeps expired affinity entries so the in-memory tree
|
||||
// does not grow unbounded. Runs for the lifetime of the proxy.
|
||||
func (fs *FederatedServer) evictLoop(ctx context.Context) {
|
||||
interval := fs.prefixCfg.TTL / 2
|
||||
if interval <= 0 {
|
||||
interval = time.Minute
|
||||
}
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case now := <-t.C:
|
||||
fs.prefixProvider.Evict(now)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *FederatedServer) RecordRequest(nodeID string) {
|
||||
|
||||
@@ -1,22 +1,81 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/edgevpn/pkg/node"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
// ErrBodyTooLarge is returned by readRequest when the request body exceeds the
// configured limit.
var ErrBodyTooLarge = errors.New("request body exceeds limit")

// readRequest parses a single HTTP request from r and buffers its body so the
// body can be inspected and then replayed.
//
// limit caps the buffered body in bytes; 0 means unlimited. A request whose
// declared Content-Length, or whose actual body, exceeds the cap yields
// ErrBodyTooLarge. On that path the unread remainder of the body is left
// untouched on r, so the caller can answer immediately instead of waiting for
// the client to finish sending.
//
// On success the returned request has its Body replaced with a reader over the
// buffered bytes, ContentLength set to the buffered length, and RequestURI
// cleared; the buffered bytes are also returned.
func readRequest(r *bufio.Reader, limit int64) (*http.Request, []byte, error) {
	req, err := http.ReadRequest(r)
	if err != nil {
		return nil, nil, err
	}

	// Cheap rejection: the client already told us the body is too big.
	if limit > 0 && req.ContentLength > limit {
		return nil, nil, ErrBodyTooLarge
	}

	var body []byte
	if req.Body != nil {
		src := io.Reader(req.Body)
		if limit > 0 {
			// Read one byte past the cap so "exactly at the cap" and "over the
			// cap" can be told apart.
			src = io.LimitReader(req.Body, limit+1)
		}
		body, err = io.ReadAll(src)
		if err != nil {
			return nil, nil, fmt.Errorf("reading request body: %w", err)
		}
		if limit > 0 && int64(len(body)) > limit {
			// Deliberately no Close here: closing a partially read request
			// body makes net/http consume whatever is left of it.
			return nil, nil, ErrBodyTooLarge
		}
		_ = req.Body.Close()
	}

	req.Body = io.NopCloser(bytes.NewReader(body))
	req.ContentLength = int64(len(body))
	req.RequestURI = ""
	return req, body, nil
}
|
||||
|
||||
// isWebsocketUpgrade reports whether req is a websocket handshake: its Upgrade
// header equals "websocket" (case-insensitive) and one of its Connection
// header tokens equals "upgrade" (case-insensitive).
//
// Connection is examined token by token across every header line, so
// "keep-alive, Upgrade" and a Connection header split over several lines both
// match, while a token that merely contains the word (e.g. "no-upgrade") does
// not.
func isWebsocketUpgrade(req *http.Request) bool {
	if !strings.EqualFold(req.Header.Get("Upgrade"), "websocket") {
		return false
	}
	for _, line := range req.Header.Values("Connection") {
		for _, token := range strings.Split(line, ",") {
			if strings.EqualFold(strings.TrimSpace(token), "upgrade") {
				return true
			}
		}
	}
	return false
}
|
||||
|
||||
func (f *FederatedServer) Start(ctx context.Context) error {
|
||||
n, err := NewNode(f.p2ptoken)
|
||||
var extraOpts []node.Option
|
||||
if f.syncAffinity {
|
||||
extraOpts = append(extraOpts, node.EnableGenericHub, node.GenericChannelHandlers(f.affinityHandler()))
|
||||
}
|
||||
n, err := NewNode(f.p2ptoken, extraOpts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating a new node: %w", err)
|
||||
}
|
||||
if f.syncAffinity {
|
||||
f.prefixSync = prefixcache.NewSync(f.prefixIndex, &genericChannelPublisher{node: n})
|
||||
f.prefixProvider = f.prefixSync
|
||||
xlog.Info("Federation affinity sync enabled (generic channel)")
|
||||
}
|
||||
err = n.Start(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating a new node: %w", err)
|
||||
@@ -28,6 +87,8 @@ func (f *FederatedServer) Start(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
go f.evictLoop(ctx)
|
||||
|
||||
return f.proxy(ctx, n)
|
||||
}
|
||||
|
||||
@@ -62,40 +123,60 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle connections in a new goroutine, forwarding to the p2p service
|
||||
// Handle connections in a new goroutine, terminating HTTP and
|
||||
// forwarding the request to the chosen p2p peer.
|
||||
go func() {
|
||||
workerID := ""
|
||||
if fs.workerTarget != "" {
|
||||
workerID = fs.workerTarget
|
||||
} else if fs.loadBalanced {
|
||||
xlog.Debug("Load balancing request")
|
||||
|
||||
workerID = fs.SelectLeastUsedServer()
|
||||
if workerID == "" {
|
||||
xlog.Debug("Least used server not found, selecting random")
|
||||
workerID = fs.RandomServer()
|
||||
br := bufio.NewReader(conn)
|
||||
req, body, err := readRequest(br, fs.bodyLimit)
|
||||
if err != nil {
|
||||
if err == ErrBodyTooLarge {
|
||||
fs.sendHTMLResponse(conn, 413, "Request body too large")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
workerID = fs.RandomServer()
|
||||
}
|
||||
|
||||
if workerID == "" {
|
||||
xlog.Error("No available nodes yet")
|
||||
fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect")
|
||||
xlog.Error("Failed to read request", "error", err)
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
upgrade := isWebsocketUpgrade(req)
|
||||
|
||||
now := time.Now()
|
||||
var (
|
||||
workerID string
|
||||
model string
|
||||
chain []uint64
|
||||
)
|
||||
switch {
|
||||
case fs.workerTarget != "":
|
||||
workerID = fs.workerTarget
|
||||
case !fs.loadBalanced:
|
||||
// Explicit random mode (the RandomWorker flag): keep the
|
||||
// historical random pick, no model/affinity routing.
|
||||
workerID = fs.RandomServer()
|
||||
case upgrade:
|
||||
// Websocket: no readable model; route by load only.
|
||||
workerID, _ = fs.selectPeer("", nil, now)
|
||||
default:
|
||||
model = extractModel(req.URL.Query().Get("model"), body)
|
||||
workerID, chain = fs.selectPeer(model, body, now)
|
||||
}
|
||||
|
||||
if workerID == "" {
|
||||
fs.sendHTMLResponse(conn, 503, "No federated peer available for this request")
|
||||
return
|
||||
}
|
||||
|
||||
xlog.Debug("Selected node", "node", workerID)
|
||||
nodeData, exists := GetNode(fs.service, workerID)
|
||||
if !exists {
|
||||
xlog.Error("Node not found", "node", workerID)
|
||||
fs.sendHTMLResponse(conn, 404, "Node not found")
|
||||
return
|
||||
}
|
||||
|
||||
proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
|
||||
if fs.loadBalanced {
|
||||
fs.RecordRequest(workerID)
|
||||
proxyHTTPToPeer(ctx, node, nodeData.ServiceID, conn, req, upgrade)
|
||||
|
||||
fs.RecordRequest(workerID)
|
||||
if !upgrade {
|
||||
fs.observeServed(model, chain, workerID, now)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -132,6 +213,8 @@ func getHTTPStatusText(statusCode int) string {
|
||||
switch statusCode {
|
||||
case 503:
|
||||
return "Service Unavailable"
|
||||
case 413:
|
||||
return "Request Entity Too Large"
|
||||
case 404:
|
||||
return "Not Found"
|
||||
case 200:
|
||||
|
||||
196
core/p2p/federated_test.go
Normal file
196
core/p2p/federated_test.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/core/services/nodes/prefixcache"
|
||||
"github.com/mudler/LocalAI/pkg/clusterrouting"
|
||||
)
|
||||
|
||||
var _ = Describe("buildFederatedCandidates", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
onlineSeen := ref.Add(-10 * time.Second)
|
||||
offlineSeen := ref.Add(-2 * time.Minute)
|
||||
|
||||
It("excludes offline nodes", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "online", LastSeen: onlineSeen},
|
||||
{ID: "offline", LastSeen: offlineSeen},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
|
||||
Expect(cands).To(HaveLen(1))
|
||||
Expect(cands[0].NodeID).To(Equal("online"))
|
||||
})
|
||||
|
||||
It("maps the request counter to InFlight and defaults missing entries to zero", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "a", LastSeen: onlineSeen},
|
||||
{ID: "b", LastSeen: onlineSeen},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{"a": 4}, ref, "")
|
||||
byID := map[string]int{}
|
||||
for _, c := range cands {
|
||||
byID[c.NodeID] = c.InFlight
|
||||
}
|
||||
Expect(byID["a"]).To(Equal(4))
|
||||
Expect(byID["b"]).To(Equal(0))
|
||||
})
|
||||
|
||||
It("carries gossiped AvailableVRAM into the candidate", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "gpu", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
|
||||
Expect(cands[0].AvailableVRAM).To(Equal(uint64(24_000_000_000)))
|
||||
})
|
||||
|
||||
It("produces candidates the shared policy ranks by least in-flight then most VRAM", func() {
|
||||
// busy-big has the most VRAM but is busy, so it must lose. Among the two
|
||||
// idle peers, the one with more free VRAM wins (VRAM breaks the tie).
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "busy-big", LastSeen: onlineSeen, AvailableVRAM: 80_000_000_000},
|
||||
{ID: "idle-small", LastSeen: onlineSeen, AvailableVRAM: 8_000_000_000},
|
||||
{ID: "idle-big", LastSeen: onlineSeen, AvailableVRAM: 24_000_000_000},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{"busy-big": 3}, ref, "")
|
||||
best := clusterrouting.PickBestReplica(cands)
|
||||
Expect(best).ToNot(BeNil())
|
||||
Expect(best.NodeID).To(Equal("idle-big"))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("model-aware candidate building", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
seen := ref.Add(-10 * time.Second)
|
||||
|
||||
It("keeps peers that advertise the requested model", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "has", LastSeen: seen, Models: []string{"m1", "m2"}},
|
||||
{ID: "hasnot", LastSeen: seen, Models: []string{"other"}},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1")
|
||||
Expect(cands).To(HaveLen(1))
|
||||
Expect(cands[0].NodeID).To(Equal("has"))
|
||||
})
|
||||
|
||||
It("keeps peers with an empty (unknown) model set eligible for any model", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "unknown", LastSeen: seen, Models: nil},
|
||||
{ID: "hasnot", LastSeen: seen, Models: []string{"other"}},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "m1")
|
||||
Expect(cands).To(HaveLen(1))
|
||||
Expect(cands[0].NodeID).To(Equal("unknown"))
|
||||
})
|
||||
|
||||
It("does not filter when the requested model is empty", func() {
|
||||
nodes := []schema.NodeData{
|
||||
{ID: "a", LastSeen: seen, Models: []string{"x"}},
|
||||
{ID: "b", LastSeen: seen, Models: []string{"y"}},
|
||||
}
|
||||
cands := buildFederatedCandidates(nodes, map[string]int{}, ref, "")
|
||||
Expect(cands).To(HaveLen(2))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("extractModel", func() {
|
||||
It("reads the JSON body model field", func() {
|
||||
body := []byte(`{"model":"llama-3","messages":[]}`)
|
||||
Expect(extractModel("", body)).To(Equal("llama-3"))
|
||||
})
|
||||
|
||||
It("prefers a path/query model over the body", func() {
|
||||
body := []byte(`{"model":"frombody"}`)
|
||||
Expect(extractModel("fromquery", body)).To(Equal("fromquery"))
|
||||
})
|
||||
|
||||
It("returns empty when no model is present", func() {
|
||||
Expect(extractModel("", []byte(`{"messages":[]}`))).To(Equal(""))
|
||||
})
|
||||
|
||||
It("returns empty on non-JSON / unparseable body without panicking", func() {
|
||||
Expect(extractModel("", []byte("--multipart-boundary--"))).To(Equal(""))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("affinityPreferred", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("returns the warm peer once a chain has been observed for it", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
|
||||
idx.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
|
||||
|
||||
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
|
||||
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
|
||||
})
|
||||
|
||||
It("returns empty when no chain has been observed", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"hello world this is a fairly long shared system prompt"}]}`, cfg)
|
||||
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
|
||||
Expect(affinityPreferred(idx, "m1", chain, cands, cfg, ref)).To(Equal(""))
|
||||
})
|
||||
|
||||
It("returns empty for a nil index or empty chain", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
Expect(affinityPreferred(nil, "m1", []uint64{1}, nil, cfg, ref)).To(Equal(""))
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
Expect(affinityPreferred(idx, "m1", nil, nil, cfg, ref)).To(Equal(""))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("L7 request handling", func() {
|
||||
It("reads a buffered request and its body under the cap", func() {
|
||||
raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 28\r\n\r\n" +
|
||||
`{"model":"m1","messages":[]}`
|
||||
req, body, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(req.URL.Path).To(Equal("/v1/chat/completions"))
|
||||
Expect(string(body)).To(ContainSubstring(`"model":"m1"`))
|
||||
})
|
||||
|
||||
It("rejects a body over the cap with ErrBodyTooLarge", func() {
|
||||
big := strings.Repeat("a", 200)
|
||||
raw := "POST /x HTTP/1.1\r\nHost: x\r\nContent-Length: 200\r\n\r\n" + big
|
||||
_, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 64)
|
||||
Expect(err).To(MatchError(ErrBodyTooLarge))
|
||||
})
|
||||
|
||||
It("detects a websocket upgrade request", func() {
|
||||
raw := "GET /v1/realtime HTTP/1.1\r\nHost: x\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n"
|
||||
req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(isWebsocketUpgrade(req)).To(BeTrue())
|
||||
})
|
||||
|
||||
It("does not flag a normal POST as a websocket upgrade", func() {
|
||||
raw := "POST /v1/chat/completions HTTP/1.1\r\nHost: x\r\nContent-Length: 2\r\n\r\n{}"
|
||||
req, _, err := readRequest(bufio.NewReader(strings.NewReader(raw)), 1024)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(isWebsocketUpgrade(req)).To(BeFalse())
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("affinityPreferred with a sync provider", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("returns the warm peer when the provider is a Sync wrapping the index", func() {
|
||||
cfg := prefixcache.DefaultConfig()
|
||||
idx := prefixcache.NewIndex(cfg)
|
||||
sync := prefixcache.NewSync(idx, nil)
|
||||
chain := prefixcache.ExtractChain("m1", `{"model":"m1","messages":[{"role":"system","content":"a long shared system prompt for affinity"}]}`, cfg)
|
||||
sync.Observe("m1", chain, prefixcache.ReplicaKey{NodeID: "warm"}, ref)
|
||||
|
||||
cands := []clusterrouting.ReplicaCandidate{{NodeID: "warm"}, {NodeID: "cold"}}
|
||||
Expect(affinityPreferred(sync, "m1", chain, cands, cfg, ref)).To(Equal("warm"))
|
||||
})
|
||||
})
|
||||
@@ -6,15 +6,18 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/pkg/utils"
|
||||
"github.com/mudler/LocalAI/pkg/xsysinfo"
|
||||
"github.com/mudler/edgevpn/pkg/config"
|
||||
"github.com/mudler/edgevpn/pkg/node"
|
||||
"github.com/mudler/edgevpn/pkg/protocol"
|
||||
@@ -86,37 +89,39 @@ func nodeAnnounce(ctx context.Context, node *node.Node) {
|
||||
)
|
||||
}
|
||||
|
||||
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
|
||||
ledger, _ := node.Ledger()
|
||||
// openPeerStream resolves serviceID to its advertised peer in the services
|
||||
// ledger and opens a libp2p stream to that peer over the service protocol.
|
||||
// Returns the stream or an error describing which lookup step failed.
|
||||
func openPeerStream(ctx context.Context, n *node.Node, serviceID string) (network.Stream, error) {
|
||||
ledger, _ := n.Ledger()
|
||||
// Retrieve current ID for ip in the blockchain
|
||||
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
|
||||
service := &types.Service{}
|
||||
existingValue.Unmarshal(service)
|
||||
// If mismatch, update the blockchain
|
||||
if !found {
|
||||
zlog.Error("Service not found on blockchain")
|
||||
conn.Close()
|
||||
// ll.Debugf("service '%s' not found on blockchain", serviceID)
|
||||
return
|
||||
return nil, errors.New("service not found on blockchain")
|
||||
}
|
||||
|
||||
// Decode the Peer
|
||||
d, err := peer.Decode(service.PeerID)
|
||||
if err != nil {
|
||||
zlog.Error("cannot decode peer")
|
||||
|
||||
conn.Close()
|
||||
// ll.Debugf("could not decode peer '%s'", service.PeerID)
|
||||
return
|
||||
return nil, fmt.Errorf("cannot decode peer: %w", err)
|
||||
}
|
||||
|
||||
// Open a stream
|
||||
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
|
||||
stream, err := n.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
|
||||
if err != nil {
|
||||
zlog.Error("cannot open stream peer", "error", err)
|
||||
return nil, fmt.Errorf("cannot open stream peer: %w", err)
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
|
||||
stream, err := openPeerStream(ctx, node, serviceID)
|
||||
if err != nil {
|
||||
zlog.Error("Could not open peer stream", "error", err)
|
||||
conn.Close()
|
||||
// ll.Debugf("could not open stream '%s'", err.Error())
|
||||
return
|
||||
}
|
||||
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
|
||||
@@ -130,6 +135,45 @@ func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string,
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// proxyHTTPToPeer forwards an already-parsed HTTP request to the chosen peer
|
||||
// over a libp2p stream and streams the response back to conn. When duplex is
|
||||
// true (a websocket upgrade) it runs a bidirectional copy after writing the
|
||||
// request, so post-101 frames flow both ways. The response is never buffered,
|
||||
// so SSE keeps flowing.
|
||||
func proxyHTTPToPeer(ctx context.Context, n *node.Node, serviceID string, conn net.Conn, req *http.Request, duplex bool) {
|
||||
stream, err := openPeerStream(ctx, n, serviceID)
|
||||
if err != nil {
|
||||
zlog.Error("Could not open peer stream", "error", err)
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
// Force the peer to close after responding so the one-way io.Copy below
|
||||
// terminates. Without this the peer keeps the HTTP/1.1 connection alive and
|
||||
// io.Copy(conn, stream) blocks forever, leaking the goroutine, conn, and
|
||||
// stream. Websocket upgrades keep keep-alive: their duplex copy owns the
|
||||
// lifetime.
|
||||
req.Header.Del("Connection")
|
||||
req.Close = !duplex
|
||||
if err := req.Write(stream); err != nil {
|
||||
zlog.Error("Could not write request to peer", "error", err)
|
||||
_ = stream.Close()
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
if duplex {
|
||||
closer := make(chan struct{}, 2)
|
||||
go copyStream(closer, stream, conn)
|
||||
go copyStream(closer, conn, stream)
|
||||
<-closer
|
||||
_ = stream.Close()
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
_, _ = io.Copy(conn, stream)
|
||||
_ = stream.Close()
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
|
||||
zlog.Info("Allocating service", "service", service, "address", listenAddr)
|
||||
// Open local port for listening
|
||||
@@ -311,7 +355,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv
|
||||
}
|
||||
|
||||
// This is the P2P worker main
|
||||
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
|
||||
func ExposeService(ctx context.Context, host, port, token, servicesID string, modelsFn func() []string) (*node.Node, error) {
|
||||
if servicesID == "" {
|
||||
servicesID = defaultServicesID
|
||||
}
|
||||
@@ -347,10 +391,16 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
|
||||
20*time.Second,
|
||||
func() {
|
||||
updatedMap := map[string]any{}
|
||||
var models []string
|
||||
if modelsFn != nil {
|
||||
models = modelsFn()
|
||||
}
|
||||
updatedMap[name] = &schema.NodeData{
|
||||
Name: name,
|
||||
LastSeen: time.Now(),
|
||||
ID: nodeID(name),
|
||||
Name: name,
|
||||
LastSeen: time.Now(),
|
||||
ID: nodeID(name),
|
||||
AvailableVRAM: xsysinfo.GetGPUAggregateInfo().FreeVRAM,
|
||||
Models: models,
|
||||
}
|
||||
ledger.Add(servicesID, updatedMap)
|
||||
},
|
||||
@@ -359,11 +409,12 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (*
|
||||
return n, err
|
||||
}
|
||||
|
||||
func NewNode(token string) (*node.Node, error) {
|
||||
func NewNode(token string, extraOpts ...node.Option) (*node.Node, error) {
|
||||
nodeOpts, err := newNodeOpts(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeOpts = append(nodeOpts, extraOpts...)
|
||||
|
||||
n, err := node.New(nodeOpts...)
|
||||
if err != nil {
|
||||
|
||||
13
core/p2p/p2p_suite_test.go
Normal file
13
core/p2p/p2p_suite_test.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// TestP2P is the `go test` entry point for this package's Ginkgo specs: it
// routes Gomega assertion failures to Ginkgo's Fail handler and then runs the
// registered specs under the suite name "P2P Suite".
func TestP2P(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "P2P Suite")
|
||||
}
|
||||
@@ -121,18 +121,38 @@ type StoresFindResponse struct {
|
||||
Similarities []float32 `json:"similarities" yaml:"similarities"`
|
||||
}
|
||||
|
||||
// NodeOnlineWindow is how long after its last announce a node still counts as
// online. Nodes re-announce into the edgevpn ledger every 20s (see core/p2p
// ExposeService), so 40s tolerates a single missed announce.
const NodeOnlineWindow = 40 * time.Second

// NodeData is the per-node record gossiped between peers: identity, when the
// node last announced itself, and the capacity/model hints used for routing.
type NodeData struct {
	// Name is the node's announced name.
	Name string
	// ID is the node identifier published alongside Name.
	ID string
	// TunnelAddress and ServiceID locate the node's tunnelled service.
	// NOTE(review): exact semantics are defined by the p2p package — confirm there.
	TunnelAddress string
	ServiceID     string
	// LastSeen is the time of the node's most recent announce; IsOnlineAt
	// compares it against NodeOnlineWindow.
	LastSeen time.Time
	// AvailableVRAM is the node's free GPU VRAM in bytes at its last announce,
	// gossiped so federation selection can prefer peers with more headroom.
	// Zero for CPU-only nodes and for peers on an older version that does not
	// publish it; the routing policy treats zero as the lowest VRAM tier.
	AvailableVRAM uint64
	// Models is the set of model names this peer currently serves, gossiped so
	// the federation proxy can route a request only to peers that have the
	// requested model. Empty means "unknown" (an older peer, or one that has
	// not loaded any model yet) and is treated as eligible for any model so a
	// mixed-version swarm is not starved.
	Models []string
}

// IsOnline reports whether the node counts as online right now, i.e. whether
// its last announce is less than NodeOnlineWindow old on the wall clock.
func (d NodeData) IsOnline() bool {
	return d.IsOnlineAt(time.Now())
}

// IsOnlineAt reports whether the node counts as online relative to now. It is
// split from IsOnline so selection logic can be exercised with a fixed clock.
// The comparison is strict: a node last seen exactly NodeOnlineWindow ago is
// offline.
func (d NodeData) IsOnlineAt(now time.Time) bool {
	return now.Sub(d.LastSeen) < NodeOnlineWindow
}
|
||||
|
||||
type P2PNodesResponse struct {
|
||||
|
||||
39
core/schema/nodedata_test.go
Normal file
39
core/schema/nodedata_test.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package schema_test
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
)
|
||||
|
||||
var _ = Describe("NodeData", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("is online within the online window", func() {
|
||||
nd := schema.NodeData{LastSeen: ref.Add(-30 * time.Second)}
|
||||
Expect(nd.IsOnlineAt(ref)).To(BeTrue())
|
||||
})
|
||||
|
||||
It("is offline once the online window has elapsed", func() {
|
||||
nd := schema.NodeData{LastSeen: ref.Add(-50 * time.Second)}
|
||||
Expect(nd.IsOnlineAt(ref)).To(BeFalse())
|
||||
})
|
||||
|
||||
It("treats exactly the window boundary as offline (strict less-than)", func() {
|
||||
nd := schema.NodeData{LastSeen: ref.Add(-schema.NodeOnlineWindow)}
|
||||
Expect(nd.IsOnlineAt(ref)).To(BeFalse())
|
||||
})
|
||||
|
||||
It("carries AvailableVRAM in bytes", func() {
|
||||
nd := schema.NodeData{AvailableVRAM: 8_000_000_000}
|
||||
Expect(nd.AvailableVRAM).To(Equal(uint64(8_000_000_000)))
|
||||
})
|
||||
|
||||
It("carries the advertised model set", func() {
|
||||
nd := schema.NodeData{Models: []string{"llama-3", "qwen"}}
|
||||
Expect(nd.Models).To(ConsistOf("llama-3", "qwen"))
|
||||
})
|
||||
})
|
||||
42
go.mod
42
go.mod
@@ -37,14 +37,14 @@ require (
|
||||
github.com/microcosm-cc/bluemonday v1.0.27
|
||||
github.com/modelcontextprotocol/go-sdk v1.5.0
|
||||
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b
|
||||
github.com/mudler/edgevpn v0.32.2
|
||||
github.com/mudler/edgevpn v0.34.0
|
||||
github.com/mudler/go-processmanager v0.1.1
|
||||
github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8
|
||||
github.com/mudler/xlog v0.0.6
|
||||
github.com/nats-io/nats.go v1.52.0
|
||||
github.com/ollama/ollama v0.20.4
|
||||
github.com/onsi/ginkgo/v2 v2.29.0
|
||||
github.com/onsi/gomega v1.40.0
|
||||
github.com/onsi/gomega v1.41.0
|
||||
github.com/openai/openai-go/v3 v3.26.0
|
||||
github.com/otiai10/copy v1.14.1
|
||||
github.com/otiai10/openaigo v1.7.0
|
||||
@@ -63,10 +63,10 @@ require (
|
||||
github.com/testcontainers/testcontainers-go/modules/nats v0.42.0
|
||||
github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0
|
||||
github.com/timbutler/zxcvbn v1.0.4
|
||||
go.opentelemetry.io/otel v1.43.0
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.65.0
|
||||
go.opentelemetry.io/otel/metric v1.43.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0
|
||||
go.opentelemetry.io/otel v1.44.0
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.66.0
|
||||
go.opentelemetry.io/otel/metric v1.44.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.44.0
|
||||
google.golang.org/grpc v1.80.0
|
||||
google.golang.org/protobuf v1.36.11
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
@@ -123,7 +123,7 @@ require (
|
||||
github.com/go-openapi/validate v0.25.1 // indirect
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
||||
github.com/google/certificate-transparency-go v1.3.2 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
|
||||
github.com/in-toto/attestation v1.1.2 // indirect
|
||||
github.com/in-toto/in-toto-golang v0.9.0 // indirect
|
||||
github.com/invopop/jsonschema v0.13.0 // indirect
|
||||
@@ -155,7 +155,7 @@ require (
|
||||
github.com/transparency-dev/merkle v0.0.2 // indirect
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
|
||||
go.mongodb.org/mongo-driver v1.17.6 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
|
||||
sigs.k8s.io/yaml v1.6.0 // indirect
|
||||
)
|
||||
|
||||
@@ -325,7 +325,7 @@ require (
|
||||
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
|
||||
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 // indirect
|
||||
go.uber.org/mock v0.5.2 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.4
|
||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||
@@ -351,7 +351,7 @@ require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/c-robinson/iplib v1.0.8 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/containerd/cgroups v1.1.0 // indirect
|
||||
github.com/containerd/continuity v0.4.4 // indirect
|
||||
github.com/containerd/errdefs v1.0.0 // indirect
|
||||
@@ -392,10 +392,10 @@ require (
|
||||
github.com/henvic/httpretty v0.1.4 // indirect
|
||||
github.com/huandu/xstrings v1.5.0 // indirect
|
||||
github.com/huin/goupnp v1.3.0 // indirect
|
||||
github.com/ipfs/boxo v0.37.0 // indirect
|
||||
github.com/ipfs/boxo v0.39.0 // indirect
|
||||
github.com/ipfs/go-cid v0.6.1 // indirect
|
||||
github.com/ipfs/go-datastore v0.9.1 // indirect
|
||||
github.com/ipfs/go-log/v2 v2.9.1 // indirect
|
||||
github.com/ipfs/go-log/v2 v2.9.2 // indirect
|
||||
github.com/ipld/go-ipld-prime v0.23.0 // indirect
|
||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||
github.com/jaypipes/pcidb v1.1.1 // indirect
|
||||
@@ -407,9 +407,9 @@ require (
|
||||
github.com/libp2p/go-cidranger v1.1.0 // indirect
|
||||
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
|
||||
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.39.0 // indirect
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.40.0 // indirect
|
||||
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
|
||||
github.com/libp2p/go-libp2p-pubsub v0.15.0 // indirect
|
||||
github.com/libp2p/go-libp2p-pubsub v0.16.0 // indirect
|
||||
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
|
||||
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect
|
||||
github.com/libp2p/go-msgio v0.3.0 // indirect
|
||||
@@ -421,7 +421,7 @@ require (
|
||||
github.com/mailru/easyjson v0.9.0 // indirect
|
||||
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mattn/go-isatty v0.0.22 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.17 // indirect
|
||||
github.com/miekg/dns v1.1.72 // indirect
|
||||
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
|
||||
@@ -487,25 +487,25 @@ require (
|
||||
github.com/yuin/goldmark-emoji v1.0.6 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.43.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.43.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.44.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.44.0 // indirect
|
||||
go.uber.org/dig v1.19.0 // indirect
|
||||
go.uber.org/fx v1.24.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.27.1 // indirect
|
||||
go.uber.org/zap v1.28.0 // indirect
|
||||
golang.org/x/crypto v0.51.0
|
||||
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect
|
||||
golang.org/x/mod v0.35.0 // indirect
|
||||
golang.org/x/sync v0.20.0
|
||||
golang.org/x/sys v0.44.0 // indirect
|
||||
golang.org/x/sys v0.45.0 // indirect
|
||||
golang.org/x/term v0.43.0
|
||||
golang.org/x/text v0.37.0 // indirect
|
||||
golang.org/x/tools v0.44.0 // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb // indirect
|
||||
golang.zx2c4.com/wireguard/windows v0.5.3 // indirect
|
||||
golang.zx2c4.com/wireguard/windows v0.6.1 // indirect
|
||||
gonum.org/v1/gonum v0.17.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
howett.net/plist v1.0.2-0.20250314012144-ee69052608d9 // indirect
|
||||
|
||||
86
go.sum
86
go.sum
@@ -649,8 +649,8 @@ github.com/gpustack/gguf-parser-go v0.24.0/go.mod h1:y4TwTtDqFWTK+xvprOjRUh+dowg
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
|
||||
github.com/hack-pad/go-indexeddb v0.3.2 h1:DTqeJJYc1usa45Q5r52t01KhvlSN02+Oq+tQbSBI91A=
|
||||
github.com/hack-pad/go-indexeddb v0.3.2/go.mod h1:QvfTevpDVlkfomY498LhstjwbPW6QC4VC/lxYb0Kom0=
|
||||
github.com/hack-pad/safejs v0.1.0 h1:qPS6vjreAqh2amUqj4WNG1zIw7qlRQJ9K10eDKMCnE8=
|
||||
@@ -722,8 +722,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
|
||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||
github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E=
|
||||
github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
|
||||
github.com/ipfs/boxo v0.37.0 h1:2E3mZvydMI2t5IkAgtkmZ3sGsld0oS7o3I+xyzDk6uI=
|
||||
github.com/ipfs/boxo v0.37.0/go.mod h1:8yyiRn54F2CsW13n0zwXEPrVsZix/gFj9SYIRYMZ6KE=
|
||||
github.com/ipfs/boxo v0.39.0 h1:u9jLf5pLx5SWROXjHtj8VMvv+iDlMbiTyZ/vVTQ4VhI=
|
||||
github.com/ipfs/boxo v0.39.0/go.mod h1:k9YCvMjytFguMHndEiGdCGMMj4b7CkdOT44vtgAxOdk=
|
||||
github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk=
|
||||
github.com/ipfs/go-block-format v0.2.3/go.mod h1:WJaQmPAKhD3LspLixqlqNFxiZ3BZ3xgqxxoSR/76pnA=
|
||||
github.com/ipfs/go-cid v0.6.1 h1:T5TnNb08+ueovG76Z5gx1L4Y7QOaGTXHg1F6raWFxIc=
|
||||
@@ -735,10 +735,10 @@ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46U
|
||||
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
|
||||
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
|
||||
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
|
||||
github.com/ipfs/go-log/v2 v2.9.1 h1:3JXwHWU31dsCpvQ+7asz6/QsFJHqFr4gLgQ0FWteujk=
|
||||
github.com/ipfs/go-log/v2 v2.9.1/go.mod h1:evFx7sBiohUN3AG12mXlZBw5hacBQld3ZPHrowlJYoo=
|
||||
github.com/ipfs/go-test v0.2.3 h1:Z/jXNAReQFtCYyn7bsv/ZqUwS6E7iIcSpJ2CuzCvnrc=
|
||||
github.com/ipfs/go-test v0.2.3/go.mod h1:QW8vSKkwYvWFwIZQLGQXdkt9Ud76eQXRQ9Ao2H+cA1o=
|
||||
github.com/ipfs/go-log/v2 v2.9.2 h1:O/5BB0elpkRILvT24rCJ5976wWd7u0nJ436T3rdYdc4=
|
||||
github.com/ipfs/go-log/v2 v2.9.2/go.mod h1:RziRwwXWhndlk8L75RnEe0zeAYaq2heKtEMc3jqUov0=
|
||||
github.com/ipfs/go-test v0.3.0 h1:0Y4Uve3tp9HI+2lIJjfOliOrOgv/YpXg/l1y3P4DEYE=
|
||||
github.com/ipfs/go-test v0.3.0/go.mod h1:JK+U8pRpATZb7lsYNSJlCj3WYB3cFfWIbI6nWRM/GFk=
|
||||
github.com/ipld/go-ipld-prime v0.23.0 h1:csqdPZH60BsTC+AZrv7fpa27v+09I/oTqyHYYYE27eE=
|
||||
github.com/ipld/go-ipld-prime v0.23.0/go.mod h1:46YCFSFNFBJHPjB0pfMuv7Ly7df2eChpkpyPo5SE0bA=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
@@ -839,12 +839,12 @@ github.com/libp2p/go-libp2p v0.48.0 h1:h2BrLAgrj7X8bEN05K7qmrjpNHYA+6tnsGRdprjTn
|
||||
github.com/libp2p/go-libp2p v0.48.0/go.mod h1:Q1fBZNdmC2Hf82husCTfkKJVfHm2we5zk+NWmOGEmWk=
|
||||
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
|
||||
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.39.0 h1:mww38eBYiUvdsu+Xl/GLlBC0Aa8M+5HAwvafkFOygAM=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.39.0/go.mod h1:Po2JugFEkDq9Vig/JXtc153ntOi0q58o4j7IuITCOVs=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.40.0 h1:as8U7Y1RX9CTKCBiFBHWKZ6tSS+rE+6WNz+H1+M+wbo=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.40.0/go.mod h1:iLUjII47u3/HjxyhucI2lhsl29lrzlAs/ym16+H40jE=
|
||||
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
|
||||
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.15.0 h1:cG7Cng2BT82WttmPFMi50gDNV+58K626m/wR00vGL1o=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.15.0/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.16.0 h1:j7G2C8kJwkcAQqYR7Wmq3d75d3Sgw/N0Hhiv0dVx7OY=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.16.0/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
|
||||
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=
|
||||
github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E=
|
||||
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI=
|
||||
@@ -885,8 +885,8 @@ github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stg
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4=
|
||||
github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4=
|
||||
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
|
||||
github.com/mattn/go-runewidth v0.0.17 h1:78v8ZlW0bP43XfmAfPsdXcoNCelfMHsDmd/pkENfrjQ=
|
||||
@@ -972,8 +972,8 @@ github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87 h1:az+2umaD/sT1rRv
|
||||
github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87/go.mod h1:x77p9W1zKZr+W+UcEwg8/qdp00p4XXOI69wE7WlXZc0=
|
||||
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU=
|
||||
github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
|
||||
github.com/mudler/edgevpn v0.32.2 h1:umTPyyZgkom/A81Bk4HbP0p1ZSEU5EFPW3Bg+YPxI8A=
|
||||
github.com/mudler/edgevpn v0.32.2/go.mod h1:UaMc8MORbcRsAjuO5gVJj9Bn3Nq2AP5U9NTb6epVyv8=
|
||||
github.com/mudler/edgevpn v0.34.0 h1:qDrD/rCPFY/FdURbXudIZWihVKY4VOX3nMn3CcbeQEU=
|
||||
github.com/mudler/edgevpn v0.34.0/go.mod h1:yki7uMi5LR9gSMrw8PdPieuxsrk8BLV2Ui7VBEmbbIA=
|
||||
github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX28EkpdAyL4yQVnJMrbquKospjHyA=
|
||||
github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc/go.mod h1:O7SwdSWMilAWhBZMK9N9Y/oBDyMMzshE3ju8Xkexwig=
|
||||
github.com/mudler/go-processmanager v0.1.1 h1:c/1NRZOZpW8HuFv9RhBG57nQu1oDMRomEHedwBFMlrw=
|
||||
@@ -1044,8 +1044,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
|
||||
github.com/onsi/ginkgo/v2 v2.29.0 h1:rfh+ZFjgJhYWRoIqVf3Uwx/W20yLrcrE2h2GmYVRaag=
|
||||
github.com/onsi/ginkgo/v2 v2.29.0/go.mod h1:+aXOY+vzZ5mu2iI2HpTZUPmM//oQfsNFX6gU9kNcA44=
|
||||
github.com/onsi/gomega v1.40.0 h1:Vtol0e1MghCD2ZVIilPDIg44XSL9l2QAn8ZNaljWcJc=
|
||||
github.com/onsi/gomega v1.40.0/go.mod h1:M/Uqpu/8qTjtzCLUA2zJHX9Iilrau25x1PdoSRbWh5A=
|
||||
github.com/onsi/gomega v1.41.0 h1:OwKp4pXNgVxf6sCplzYo794OFNuoL2q2SBMU5NSWOjA=
|
||||
github.com/onsi/gomega v1.41.0/go.mod h1:M/Uqpu/8qTjtzCLUA2zJHX9Iilrau25x1PdoSRbWh5A=
|
||||
github.com/openai/openai-go/v3 v3.26.0 h1:bRt6H/ozMNt/dDkN4gobnLqaEGrRGBzmbVs0xxJEnQE=
|
||||
github.com/openai/openai-go/v3 v3.26.0/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
@@ -1417,20 +1417,22 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo=
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0/go.mod h1:c7hN3ddxs/z6q9xwvfLPk+UHlWRQyaeR1LdgfL/66l0=
|
||||
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
|
||||
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.65.0 h1:jOveH/b4lU9HT7y+Gfamf18BqlOuz2PWEvs8yM7Q6XE=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.65.0/go.mod h1:i1P8pcumauPtUI4YNopea1dhzEMuEqWP1xoUZDylLHo=
|
||||
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
|
||||
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
|
||||
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
|
||||
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
|
||||
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
|
||||
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg=
|
||||
go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU=
|
||||
go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.66.0 h1:vkrK8PAznv2NKt2r+kdu252ccGzkEqLc2aSXbQIALYQ=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.66.0/go.mod h1:V/UB6D3vMF/UBOL5igAsAYnk1nG/bzYYTzvsB16cy7o=
|
||||
go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc=
|
||||
go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo=
|
||||
go.opentelemetry.io/otel/metric/x v0.66.0 h1:YkCrx1zLOChi9ZcZ6euupOcsgzbVlec7D/xoEU1+cTA=
|
||||
go.opentelemetry.io/otel/metric/x v0.66.0/go.mod h1:d1+BDj9t96do0/1LoU1ayfCv79ZgNE41qbhBvnMOBZk=
|
||||
go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58=
|
||||
go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA=
|
||||
go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk=
|
||||
go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE=
|
||||
go.starlark.net v0.0.0-20250417143717-f57e51f710eb h1:zOg9DxxrorEmgGUr5UPdCEwKqiqG0MlZciuCuA3XiDE=
|
||||
go.starlark.net v0.0.0-20250417143717-f57e51f710eb/go.mod h1:YKMCv9b1WrfWmeqdV5MAuEHWsu5iC+fe6kYl2sQjdI8=
|
||||
go.step.sm/crypto v0.74.0 h1:/APBEv45yYR4qQFg47HA8w1nesIGcxh44pGyQNw6JRA=
|
||||
@@ -1452,8 +1454,8 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8
|
||||
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
|
||||
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
|
||||
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
|
||||
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
|
||||
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
|
||||
go.uber.org/zap v1.28.0 h1:IZzaP1Fv73/T/pBMLk4VutPl36uNC+OSUh3JLG3FIjo=
|
||||
go.uber.org/zap v1.28.0/go.mod h1:rDLpOi171uODNm/mxFcuYWxDsqWSAVkFdX4XojSKg/Q=
|
||||
go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
|
||||
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
|
||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||
@@ -1674,8 +1676,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
|
||||
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
|
||||
golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa h1:efT73AJZfAAUV7SOip6pWGkwJDzIGiKBZGVzHYa+ve4=
|
||||
golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa/go.mod h1:kHjTxDEnAu6/Nl9lDkzjWpR+bmKfxeiRuSDlsMb70gE=
|
||||
@@ -1785,8 +1787,8 @@ golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 h1:B82qJJgjvYKsXS9jeu
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2/go.mod h1:deeaetjYA+DHMHg+sMSMI58GrEteJUUzzw7en6TJQcI=
|
||||
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb h1:whnFRlWMcXI9d+ZbWg+4sHnLp52d5yiIPUxMBSt4X9A=
|
||||
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb/go.mod h1:rpwXGsirqLqN2L0JDJQlwOboGHmptD5ZD6T2VmcqhTw=
|
||||
golang.zx2c4.com/wireguard/windows v0.5.3 h1:On6j2Rpn3OEMXqBq00QEDC7bWSZrPIHKIus8eIuExIE=
|
||||
golang.zx2c4.com/wireguard/windows v0.5.3/go.mod h1:9TEe8TJmtwyQebdFwAkEWOPr3prrtqm+REGFifP60hI=
|
||||
golang.zx2c4.com/wireguard/windows v0.6.1 h1:XMaKojH1Hs/raMrmnir4n35nTvzvWj7NmSYzHn2F4qU=
|
||||
golang.zx2c4.com/wireguard/windows v0.6.1/go.mod h1:04aqInu5GYuTFvMuDw/rKBAF7mHrltW/3rekpfbbZDM=
|
||||
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
||||
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
||||
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
|
||||
@@ -1865,10 +1867,10 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE
|
||||
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
|
||||
google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9 h1:LvZVVaPE0JSqL+ZWb6ErZfnEOKIqqFWUJE2D0fObSmc=
|
||||
google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9/go.mod h1:QFOrLhdAe2PsTp3vQY4quuLKTi9j3XG3r6JPPaw7MSc=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
||||
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
|
||||
|
||||
33
pkg/clusterrouting/affinity.go
Normal file
33
pkg/clusterrouting/affinity.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package clusterrouting
|
||||
|
||||
// PickWithAffinity prefers the candidate whose NodeID equals preferredNodeID
|
||||
// when that candidate's in-flight count is within slack of the least-loaded
|
||||
// candidate; otherwise it falls back to PickBestReplica (least in-flight, then
|
||||
// oldest last-used, then most free VRAM). This keeps a warm prefix-cache peer
|
||||
// sticky without letting it become a hot-spot: once it is more than slack
|
||||
// requests busier than the least-loaded peer, load wins. With an empty
|
||||
// preferredNodeID, or a preferred node not in the set, it is exactly
|
||||
// PickBestReplica. slack mirrors prefixcache's BalanceAbsThreshold.
|
||||
func PickWithAffinity(candidates []ReplicaCandidate, preferredNodeID string, slack int) *ReplicaCandidate {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
if preferredNodeID == "" {
|
||||
return PickBestReplica(candidates)
|
||||
}
|
||||
var preferred *ReplicaCandidate
|
||||
minInFlight := candidates[0].InFlight
|
||||
for i := range candidates {
|
||||
c := &candidates[i]
|
||||
if c.InFlight < minInFlight {
|
||||
minInFlight = c.InFlight
|
||||
}
|
||||
if c.NodeID == preferredNodeID {
|
||||
preferred = c
|
||||
}
|
||||
}
|
||||
if preferred != nil && preferred.InFlight <= minInFlight+slack {
|
||||
return preferred
|
||||
}
|
||||
return PickBestReplica(candidates)
|
||||
}
|
||||
48
pkg/clusterrouting/affinity_test.go
Normal file
48
pkg/clusterrouting/affinity_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package clusterrouting
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("PickWithAffinity", func() {
|
||||
ref := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
It("returns nil for an empty candidate list", func() {
|
||||
Expect(PickWithAffinity(nil, "x", 2)).To(BeNil())
|
||||
})
|
||||
|
||||
It("falls back to PickBestReplica when no preferred node is given", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "busy", InFlight: 3, LastUsed: ref, AvailableVRAM: 80},
|
||||
{NodeID: "idle", InFlight: 0, LastUsed: ref, AvailableVRAM: 8},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "", 2).NodeID).To(Equal("idle"))
|
||||
})
|
||||
|
||||
It("honors the preferred node when it is within the in-flight slack of the least-loaded", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80},
|
||||
{NodeID: "warm", InFlight: 2, LastUsed: ref, AvailableVRAM: 8},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("warm"))
|
||||
})
|
||||
|
||||
It("ignores the preferred node when it is beyond the slack and falls back to load+VRAM", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "cold", InFlight: 0, LastUsed: ref, AvailableVRAM: 80},
|
||||
{NodeID: "warm", InFlight: 5, LastUsed: ref, AvailableVRAM: 8},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "warm", 2).NodeID).To(Equal("cold"))
|
||||
})
|
||||
|
||||
It("falls back to load+VRAM when the preferred node is not among the candidates", func() {
|
||||
cs := []ReplicaCandidate{
|
||||
{NodeID: "a", InFlight: 1, LastUsed: ref, AvailableVRAM: 8},
|
||||
{NodeID: "b", InFlight: 1, LastUsed: ref, AvailableVRAM: 24},
|
||||
}
|
||||
Expect(PickWithAffinity(cs, "ghost", 2).NodeID).To(Equal("b"))
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user