Compare commits

...

10 Commits

Author SHA1 Message Date
LocalAI [bot]
6942713d85 chore: ⬆️ Update leejet/stable-diffusion.cpp to 3a8788cb7d74f185d6b18688e9563015524ecaf5 (#9933)
⬆️ Update leejet/stable-diffusion.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-22 00:31:19 +02:00
LocalAI [bot]
0cf52c44d4 chore: ⬆️ Update ggml-org/whisper.cpp to 8443cf05e3fa8ce1b32348e1bcbcf8fc31f7f3ae (#9929)
⬆️ Update ggml-org/whisper.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-21 23:24:01 +02:00
LocalAI [bot]
0d34cf7cbd chore: ⬆️ Update ikawrakow/ik_llama.cpp to 48a55f74e4c6e2aeda363dd386c1ac9170a0af71 (#9930)
⬆️ Update ikawrakow/ik_llama.cpp

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
2026-05-21 23:23:37 +02:00
LocalAI [bot]
f0cb02afb8 feat(usage): attribute Sources rows to user accounts in admin view (#9935)
The merged feature (#9920) let admins see per-API-key and per-source
totals but did not surface which user owned each key, and lumped
every user's Web UI traffic into a single global Web UI row. This
makes the admin Sources tab properly per-user attributable:

- KeyTotal gains UserID + UserName, populated from the snapshot the
  usage middleware already records. The by_key roll-up now groups by
  (api_key_id, api_key_name, user_id, user_name).
- New SourceTotals.ByUserSource roll-up groups (source, user_id,
  user_name) for sources without a key identity (web, legacy). Only
  populated on the admin path (includeLegacy=true); the non-admin
  endpoint stays unchanged for backwards compatibility.
- SourcesTable accepts showUserColumn={isAdmin}; admin view renders
  a User column, makes the search match user name/id, and expands
  Web UI / legacy pseudo-rows from the global aggregate to one row
  per user using by_user_source.

Refs: #9862

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-21 23:23:06 +02:00
LocalAI [bot]
a39e025d64 fix(nodes): make per-node backend install async via gallery job queue (#9928)
* feat(galleryop): add TargetNodeID to ManagementOp for single-node installs

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(galleryop): add NodeScopedKey helpers for per-node opcache rows

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(galleryop): use strings.Cut for NodeScopedKey parsing, reject empty nodeID

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(nodes): scope DistributedBackendManager.InstallBackend to single node via TargetNodeID

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(http): make /api/nodes/:id/backends/install async via gallery service job queue

The handler previously called unloader.InstallBackend synchronously and
blocked the browser for up to 3 minutes waiting on the NATS reply. It now
enqueues a TargetNodeID-scoped ManagementOp on BackendGalleryChannel and
returns HTTP 202 + jobID immediately, matching /api/backends/install/:id.

The opcache key is built via NodeScopedKey(nodeID, backend) so concurrent
installs of the same backend across different nodes do not stomp each
other. galleryService/opcache/appConfig are threaded through
RegisterNodeAdminRoutes for this.

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(http): log malformed backend_galleries override and stop test drain goroutine

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(api): expose nodeID for node-scoped backend ops in /api/operations

Node-scoped backend installs land in opcache under "node:<nodeID>:<backend>"
keys. Without splitting that prefix back out, the operations panel renders
the full key as the display name and has no structured way to label which
worker an install is targeting. Detect the prefix, surface nodeID as its own
response field, and reduce the display name back to the bare backend slug.
Bare (non-scoped) ops are left untouched so legacy installs do not gain a
misleading empty nodeID.

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(react-ui): poll job status for node-targeted backend installs

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(react-ui): make NodeInstallPicker state updates pure and surface cancellations as errors

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(react-ui): clarify async semantics in handleInstallOnTarget

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(http): use statusUrl casing for node install response to match codebase precedent

Assisted-by: Claude:opus-4-7 [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-21 22:25:53 +02:00
Ettore Di Giacinto
05e8e1e9f4 ci(images): publish chronologically-orderable master-<epoch>-<sha> tags
The existing master push pipeline produces `master` (rolling) and
`sha-<short>` tags. Neither is orderable by build time, so downstream
GitOps that want to auto-bump to the newest master build (e.g. Flux
ImagePolicy) can't pick the latest from the tag list — alphabetical
sort over hex shas is effectively random, and the rolling `master`
tag can't be referenced as an immutable bump target.

Add a third tag of the form `master-<epoch>-<sha>` (Unix epoch in
seconds + short sha), gated on default-branch pushes via metadata-
action's `is_default_branch` predicate. The sha is retained for
traceability; the epoch makes the tags numerically orderable, so a
Flux ImagePolicy like

  filterTags:
    pattern: '^master-(?P<ts>[0-9]+)-[a-f0-9]+$'
    extract: '$ts'
  policy:
    numerical:
      order: asc

will reliably bump to the newest master build.

Applied to both image_build.yml (OCI labels stay consistent) and
image_merge.yml (the actual tag publisher via buildx imagetools).
2026-05-21 17:18:30 +00:00
Rin
a7f6cc8956 [utils] Fail immediately on extraction errors (#9926)
utils: fail immediately on extraction errors

Setting ContinueOnError to false ensures that ExtractArchive does not
leave the model or backend directory in an inconsistent state if a
partial failure occurs. This improves robustness against malformed
archives or unexpected I/O issues during installation.

Signed-off-by: RinZ27 <222222878+RinZ27@users.noreply.github.com>
2026-05-21 19:00:33 +02:00
LocalAI [bot]
f15b9178ec feat(usage): track and visualise usage per API key (#9920)
* feat(usage): add Source, APIKeyID, APIKeyName columns to UsageRecord

Adds three additive columns plus UsageSource* constants. The columns
are auto-migrated by InitDB. APIKeyID is a nullable foreign reference
to UserAPIKey.ID; APIKeyName is snapshotted on each row so revoked
keys keep showing their name in history.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(usage): backfill Source on pre-feature usage rows

InitDB now classifies any pre-existing usage_record with an empty
source: 'legacy-api-key' user -> legacy, everything else -> web.
The backfill is idempotent (only touches NULL/empty rows).

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(usage): add GetUserUsageBySource aggregator

Groups by (bucket, source, api_key_id, api_key_name). Filters out
legacy by default. Returns both per-bucket detail and roll-ups
(by_source, by_key sorted desc and capped at 200, grand_total).

The MAX(created_at) projection is iterated via Rows().Scan into a
string column and parsed manually because the SQLite driver surfaces
the aggregated timestamp as a string, which database/sql refuses to
scan directly into time.Time. Postgres returns a real timestamp; the
same string path handles its RFC3339 form too.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(usage): log Rows() errors and assert LastUsed in tests

Adds rows.Err() and Rows() open-failure logging in
computeSourceTotals so silent data drops surface in logs. Logs on
parseLastUsedString format misses for the same reason. Strengthens
the snapshot-survival test to assert LastUsed is a recent timestamp,
locking the SQLite time-string parser behaviour.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(usage): add admin GetAllUsageBySource with filters and truncation

Optional user_id and api_key_id filters (composed with AND). Legacy
bucket is included for admin callers. truncated=true when more than
200 distinct keys would be in the by_key roll-up.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(auth): plumb auth_source and auth_apikey through Echo context

tryAuthenticate now sets auth_source on every successful branch
(web for session/Bearer-session, apikey for Bearer-key/x-api-key/
token-cookie, legacy for legacy env key match). For named-key
branches it also stores the resolved *UserAPIKey under auth_apikey
so downstream middlewares can snapshot id+name without re-validating.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(auth): expand tryAuthenticate godoc and cover Bearer-session branch

Documents all three context-keys side effects (auth_source,
auth_apikey, _auth_session) plus the split of responsibilities with
the parent Middleware. Adds a test for the Bearer-as-session-token
classification so future regressions there fail loudly.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(usage): UsageMiddleware records source + snapshots key name

Reads auth_source and auth_apikey from the Echo context (set by
auth.Middleware in the previous task). Snapshots UserAPIKey.ID and
Name onto each row so revoked keys remain readable in history.
Falls back to source=web when no auth_source is set (auth disabled
or unrecognised path).

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(usage): add /api/auth/usage/sources and admin variant

Self endpoint filters legacy server-side; admin endpoint includes
legacy and accepts user_id + api_key_id filters. Response includes
buckets, totals.{by_source, by_key, grand_total}, and a truncated
flag set when the per-key roll-up was capped at 200.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(routes): mark test mirror handlers as keep-in-sync with production

The newTestAuthApp helper duplicates production route handlers
inline because it cannot use RegisterAuthRoutes (which requires a
*application.Application). Naming the source path on each mirror
makes the drift contract explicit for future maintainers.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ui): add usageApi.getMySources/getAdminSources + i18n strings

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ui): add Sources tab skeleton with data fetch

Adds Usage page tab that fetches /api/auth/usage/sources (or the
admin variant). Renders raw totals plus a placeholder key list;
real visualisations land in subsequent commits. Restructures the
existing tab button block so Models and Sources are visible to
non-admins (Users remains admin-only).

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ui): source mix ribbon + searchable/sortable sources table

Replaces the SourcesTab placeholder rendering with two reusable
components: SourceMixRibbon (one segmented bar per source class)
and SourcesTable (search + sort + revoked-key dim). Pulls the
current API key list to detect revoked keys.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(ui): skip revoked-key detection until the key list is known

existingKeyIds defaulted to an empty Set, which made every live
api_key row render as (revoked) during the brief window before
apiKeysApi.list() resolved, and permanently after a fetch failure.
Use null as the unknown state and suppress the revoked badge until
the parent provides a real Set.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(ui): top-N stacked time chart and drill-in chip for Sources tab

Top 7 sources by total tokens get distinct colours; the rest roll up
into 'Other'. Clicking a row in the SourcesTable dims everything
except that series in the chart; the chip is the canonical clear.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* docs(usage): document per-API-key Sources tab and endpoints

Extends features/authentication.md Usage Tracking section with:
- A 'Sources' tab description and source-class taxonomy
- Endpoint documentation for /api/auth/usage/sources and the
  admin variant
- Response shape example with by_source / by_key / grand_total
- Migration note about pre-feature row backfill

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(usage): silence errcheck on deferred rows.Close

CI errcheck flagged the bare 'defer rows.Close()' in
computeSourceTotals. Wrap in a closure that discards the close
error explicitly; an error here is non-actionable since we have
already drained the rows and logged any iteration failure.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(usage): bound batcher intake and add Shutdown/FlushNow hooks

The pre-existing usage batcher had no cap on its add() path; the
usageMaxPending=5000 constant only guarded the re-queue path after
a failed write, leaving memory growth unbounded if the DB fell
behind. This commit:

- Adds the cap to add() so saturation drops new records (rate-limited
  warn at 1/1024) instead of growing unbounded.
- Raises usageMaxPending to 50000 to absorb realistic inference bursts.
- Replaces the package-level batcher global with a mutex-guarded pair
  plus a currentBatcher() accessor so Init / Shutdown cycles are
  race-free.
- Adds ShutdownUsageRecorder() for graceful drain on process exit
  (not yet wired into app shutdown, just published).
- Adds FlushNow() for deterministic tests; the middleware suite no
  longer needs 6s sleeps per spec and now runs in ~50ms instead of 18s.
- Re-queue on failed flush is now cap-aware: prepends as much of the
  failed batch as fits alongside concurrent arrivals, instead of
  dropping the whole batch when full.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(usage): drain usage batcher on graceful shutdown

Registers ShutdownUsageRecorder with the existing
signals.RegisterGracefulTerminationHandler so SIGINT/SIGTERM
synchronously flushes any in-memory usage records before the
process exits. Without this, up to one flush interval (5s) of
recorded usage was lost when LocalAI restarted.

Refs: #9862
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-21 16:34:02 +02:00
LocalAI [bot]
959de86761 feat(llama-cpp): make server-side prompt cache work by default (#9925)
Aligns LocalAI's llama-cpp gRPC backend with upstream's auto-on prompt
cache path so repeated system prompts (agents, OpenAI/Anthropic-compatible
CLIs, coding assistants) skip prefill on subsequent calls without any
YAML changes. Reported in #9921.

Upstream's server enables `kv_unified=true` (and bumps `n_parallel` to 4)
when slot count is auto, which unlocks `cache_idle_slots`. LocalAI
hardcodes `n_parallel=1` and so far also hardcoded `kv_unified=false`,
which silently force-disables idle-slot saving at server init. The host
prompt cache was allocated but never written across requests.

Changes in backend/cpp/llama-cpp/grpc-server.cpp:
- params.kv_unified: false -> true (single-slot path now benefits from
  the prompt cache; users can opt out with `kv_unified:false`)
- params.n_ctx_checkpoints: 8 -> 32 (match upstream default)
- params.cache_idle_slots = true initialized explicitly (upstream default)
- params.checkpoint_every_nt = 8192 initialized explicitly (upstream default)
- New option parsers: cache_idle_slots / idle_slots_cache,
  checkpoint_every_nt / checkpoint_every_n_tokens

Docs:
- features/text-generation.md: fix misleading `cache_ram` description
  (it's the host-side prompt cache, not the KV cache), document the
  kv_unified + cache_ram + cache_idle_slots interaction, add rows for
  the two newly-exposed options, and add a worked example for the
  agent/CLI workload from the issue.
- advanced/model-configuration.md: mark the legacy `prompt_cache_path`
  / `prompt_cache_all` / `prompt_cache_ro` YAML fields as unused by the
  llama-cpp gRPC backend (they target upstream's CLI completion tool
  and are not consumed by grpc-server.cpp) and point readers at the
  new prompt-cache explainer.

Closes #9921

Assisted-by: claude:opus-4.7

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-21 16:31:48 +02:00
LocalAI [bot]
4c234abc2c refactor(agents): bump skillserver, drop redundant Name from list_skills output (#9916)
refactor(agents): bump skillserver, drop redundant Name from list_skills/search_skills

skillserver's list_skills MCP tool used to ship every entry with name=""
(field was commented out), while search_skills populated it - two tools
with inconsistent shape for the same data. skill.Name and skill.ID are
populated from the same source string anyway (the directory name), so
returning both was pure duplication.

Bumps github.com/mudler/skillserver to a7317cb, which drops the Name
field from both SkillInfo and SearchResult and leaves ID as the single
canonical identifier (already what read_skill consumes).

Adds core/services/skills/skills_mcp_test.go, a regression that drives
the LocalAI FilesystemManager through an in-process MCP session and
asserts a newly-created skill is visible by ID on the still-open session.

This is a cleanup, not the root cause of #9868 - the reporter likely
sees something deeper than a cosmetic JSON shape issue.

Assisted-by: Claude:claude-opus-4-7 [Claude Code]

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-21 14:45:53 +02:00
41 changed files with 2783 additions and 134 deletions

View File

@@ -106,6 +106,7 @@ jobs:
type=ref,event=branch
type=semver,pattern={{raw}}
type=sha
type=raw,value={{branch}}-{{date 'X'}}-{{sha}},enable={{is_default_branch}}
flavor: |
latest=${{ inputs.tag-latest }}
suffix=${{ inputs.tag-suffix }},onlatest=true

View File

@@ -80,6 +80,7 @@ jobs:
type=ref,event=branch
type=semver,pattern={{raw}}
type=sha
type=raw,value={{branch}}-{{date 'X'}}-{{sha}},enable={{is_default_branch}}
flavor: |
latest=${{ inputs.tag-latest }}
suffix=${{ inputs.tag-suffix }},onlatest=true

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=11a1fea9e291f12ce2c803a9d7812c30ca806bcf
IK_LLAMA_VERSION?=48a55f74e4c6e2aeda363dd386c1ac9170a0af71
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -517,10 +517,27 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
params.warmup = true;
// no_op_offload: disable host tensor op offload (default: false)
params.no_op_offload = false;
// kv_unified: enable unified KV cache (default: false)
params.kv_unified = false;
// n_ctx_checkpoints: max context checkpoints per slot (default: 8)
params.n_ctx_checkpoints = 8;
// kv_unified: enable unified KV cache. Upstream's server auto-enables this
// when the slot count is auto (-np <0), bumping n_parallel to 4 alongside.
// LocalAI keeps n_parallel=1 by default, which would skip that auto path
// and leave kv_unified=false. We flip the default to true here so the
// server-side prompt cache (cache_idle_slots) is actually usable on the
// single-slot path that LocalAI ships with: without it, idle slots are
// never persisted across requests and the prompt cache is dead weight.
// Users can opt out with `options: [ "kv_unified:false" ]`.
params.kv_unified = true;
// n_ctx_checkpoints: max context checkpoints per slot. Match upstream's
// default (32); the previous LocalAI-specific 8 was unnecessarily tight
// and limits partial-prefix recovery without a clear memory rationale.
params.n_ctx_checkpoints = 32;
// cache_idle_slots: save and clear idle slot KV to the prompt cache on
// task switch. Upstream default is true; the server auto-disables it if
// kv_unified=false or cache_ram_mib=0, so flipping kv_unified above is
// what actually unlocks it.
params.cache_idle_slots = true;
// checkpoint_every_nt: create a context checkpoint every N tokens during
// prefill (-1 disables). Match upstream's default (8192).
params.checkpoint_every_nt = 8192;
// decode options. Options are in form optname:optvale, or if booleans only optname.
for (int i = 0; i < request->options_size(); i++) {
@@ -679,7 +696,29 @@ static void params_parse(server_context& /*ctx_server*/, const backend::ModelOpt
try {
params.n_ctx_checkpoints = std::stoi(optval_str);
} catch (const std::exception& e) {
// If conversion fails, keep default value (8)
// If conversion fails, keep default value (32)
}
}
// --- server-side idle-slot prompt cache toggle (upstream --cache-idle-slots) ---
// Saves the slot's KV state into the host-side prompt cache on task
// switch so a later request with the same prefix can warm-load it.
// Auto-disabled by the server if kv_unified=false or cache_ram=0.
} else if (!strcmp(optname, "cache_idle_slots") || !strcmp(optname, "idle_slots_cache")) {
if (optval_str == "true" || optval_str == "1" || optval_str == "yes" || optval_str == "on" || optval_str == "enabled") {
params.cache_idle_slots = true;
} else if (optval_str == "false" || optval_str == "0" || optval_str == "no" || optval_str == "off" || optval_str == "disabled") {
params.cache_idle_slots = false;
}
// --- prefill checkpoint cadence (upstream -cpent / --checkpoint-every-n-tokens) ---
// -1 disables checkpointing during prefill.
} else if (!strcmp(optname, "checkpoint_every_nt") || !strcmp(optname, "checkpoint_every_n_tokens")) {
if (optval != NULL) {
try {
params.checkpoint_every_nt = std::stoi(optval_str);
} catch (const std::exception& e) {
// If conversion fails, keep default value (8192)
}
}

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=5b0267e941cade15bd80089d89838795d9f4baa6
STABLEDIFFUSION_GGML_VERSION?=3a8788cb7d74f185d6b18688e9563015524ecaf5
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=afa2ea544fb4b0448916b4a31ecd33c8685bd482
WHISPER_CPP_VERSION?=8443cf05e3fa8ce1b32348e1bcbcf8fc31f7f3ae
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -28,6 +28,7 @@ import (
"github.com/mudler/LocalAI/core/services/monitoring"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/quantization"
"github.com/mudler/LocalAI/pkg/signals"
"github.com/mudler/xlog"
)
@@ -267,9 +268,12 @@ func API(application *application.Application) (*echo.Echo, error) {
e.Static("/generated-videos", videoPath)
}
// Initialize usage recording when auth DB is available
// Initialize usage recording when auth DB is available, and ensure the
// batcher drains its in-memory queue on graceful shutdown so the last
// few seconds of usage don't disappear when the process exits.
if application.AuthDB() != nil {
httpMiddleware.InitUsageRecorder(application.AuthDB())
signals.RegisterGracefulTerminationHandler(httpMiddleware.ShutdownUsageRecorder)
}
// Auth is applied to _all_ endpoints. Filtering out endpoints to bypass is
@@ -403,7 +407,7 @@ func API(application *application.Application) (*echo.Echo, error) {
}
}
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
// Distributed SSE routes (job progress + agent events via NATS)
if d := application.Distributed(); d != nil {

View File

@@ -38,9 +38,15 @@ func InitDB(databaseURL string) (*gorm.DB, error) {
}
// Backfill: users created before the provider column existed have an empty
// provider treat them as local accounts so the UI can identify them.
// provider - treat them as local accounts so the UI can identify them.
db.Exec("UPDATE users SET provider = ? WHERE provider = '' OR provider IS NULL", ProviderLocal)
// Backfill: pre-feature usage_records have no source column. Classify them so the
// new per-source aggregators include them.
if err := BackfillUsageSource(db); err != nil {
return nil, fmt.Errorf("failed to backfill usage source: %w", err)
}
// Create composite index on users(provider, subject) for fast OAuth lookups
if err := db.Exec("CREATE INDEX IF NOT EXISTS idx_users_provider_subject ON users(provider, subject)").Error; err != nil {
// Ignore error on postgres if index already exists

View File

@@ -16,8 +16,10 @@ import (
)
const (
contextKeyUser = "auth_user"
contextKeyRole = "auth_role"
contextKeyUser = "auth_user"
contextKeyRole = "auth_role"
contextKeyAPIKey = "auth_apikey"
contextKeySource = "auth_source"
)
// Middleware returns an Echo middleware that handles authentication.
@@ -75,6 +77,7 @@ func Middleware(db *gorm.DB, appConfig *config.ApplicationConfig) echo.Middlewar
}
c.Set(contextKeyUser, syntheticUser)
c.Set(contextKeyRole, RoleAdmin)
c.Set(contextKeySource, UsageSourceLegacy)
authenticated = true
}
}
@@ -213,6 +216,20 @@ func GetUserRole(c echo.Context) string {
return role
}
// GetAPIKey returns the resolved API key from the echo context, or nil.
// Nil for session-cookie and legacy-env-key authentication.
func GetAPIKey(c echo.Context) *UserAPIKey {
k, _ := c.Get(contextKeyAPIKey).(*UserAPIKey)
return k
}
// GetSource returns the request's authentication source: UsageSourceAPIKey,
// UsageSourceWeb, UsageSourceLegacy, or empty if no authentication was performed.
func GetSource(c echo.Context) string {
s, _ := c.Get(contextKeySource).(string)
return s
}
// RequireRouteFeature returns a global middleware that checks the user has access
// to the feature required by the matched route. It uses the RouteFeatureRegistry
// to look up the required feature for each route pattern + HTTP method.
@@ -421,47 +438,67 @@ func RequireQuota(db *gorm.DB) echo.MiddlewareFunc {
}
// tryAuthenticate attempts to authenticate the request using the database.
//
// On success it returns the user and, as a side effect, sets the following
// values on the Echo context:
// - contextKeySource ("auth_source"): always set, one of UsageSourceWeb /
// UsageSourceAPIKey. UsageSourceLegacy is set elsewhere by the parent
// Middleware when a legacy env key matches.
// - contextKeyAPIKey ("auth_apikey"): set to the resolved *UserAPIKey for
// named-key branches (Bearer, x-api-key, xi-api-key, token cookie).
// - "_auth_session": session record, used by Middleware to drive cookie
// rotation. Only set on the session-cookie branch.
//
// contextKeyUser and contextKeyRole are populated by the parent Middleware
// after this function returns.
func tryAuthenticate(c echo.Context, db *gorm.DB, appConfig *config.ApplicationConfig) *User {
hmacSecret := appConfig.Auth.APIKeyHMACSecret
// a. Session cookie
// a. Session cookie -> web UI
if cookie, err := c.Cookie(sessionCookie); err == nil && cookie.Value != "" {
if user, session := ValidateSession(db, cookie.Value, hmacSecret); user != nil {
// Store session for rotation check in middleware
c.Set("_auth_session", session)
c.Set(contextKeySource, UsageSourceWeb)
return user
}
}
// b. Authorization: Bearer token
// b. Authorization: Bearer
authHeader := c.Request().Header.Get("Authorization")
if strings.HasPrefix(authHeader, "Bearer ") {
token := strings.TrimPrefix(authHeader, "Bearer ")
// Try as session ID first
// b1. Session token via Bearer -> still web UI
if user, _ := ValidateSession(db, token, hmacSecret); user != nil {
c.Set(contextKeySource, UsageSourceWeb)
return user
}
// Try as user API key
// b2. Named API key
if key, err := ValidateAPIKey(db, token, hmacSecret); err == nil {
c.Set(contextKeySource, UsageSourceAPIKey)
c.Set(contextKeyAPIKey, key)
return &key.User
}
}
// c. x-api-key / xi-api-key headers
// c. x-api-key / xi-api-key -> named API key
for _, header := range []string{"x-api-key", "xi-api-key"} {
if key := c.Request().Header.Get(header); key != "" {
if apiKey, err := ValidateAPIKey(db, key, hmacSecret); err == nil {
if k := c.Request().Header.Get(header); k != "" {
if apiKey, err := ValidateAPIKey(db, k, hmacSecret); err == nil {
c.Set(contextKeySource, UsageSourceAPIKey)
c.Set(contextKeyAPIKey, apiKey)
return &apiKey.User
}
}
}
// d. token cookie (legacy)
// d. token cookie -> named API key
if cookie, err := c.Cookie("token"); err == nil && cookie.Value != "" {
// Try as user API key
if key, err := ValidateAPIKey(db, cookie.Value, hmacSecret); err == nil {
c.Set(contextKeySource, UsageSourceAPIKey)
c.Set(contextKeyAPIKey, key)
return &key.User
}
}

View File

@@ -303,4 +303,122 @@ var _ = Describe("Auth Middleware", func() {
}
})
})
Describe("auth context plumbing for usage source", func() {
// probeApp builds a minimal echo app with the auth middleware and a single
// "/probe" route that captures the user, source, and apikey from context.
type probe struct {
user *auth.User
source string
key *auth.UserAPIKey
}
probeApp := func(db *gorm.DB, appConfig *config.ApplicationConfig, p *probe) *echo.Echo {
e := echo.New()
e.Use(auth.Middleware(db, appConfig))
e.GET("/probe", func(c echo.Context) error {
p.user = auth.GetUser(c)
p.source = auth.GetSource(c)
p.key = auth.GetAPIKey(c)
return c.NoContent(http.StatusOK)
})
return e
}
It("session cookie sets source=web, apikey=nil", func() {
db := testDB()
appConfig := config.NewApplicationConfig()
user := createTestUser(db, "alice@example.com", auth.RoleUser, auth.ProviderLocal)
token := createTestSession(db, user.ID)
var p probe
app := probeApp(db, appConfig, &p)
rec := doRequest(app, http.MethodGet, "/probe", withSessionCookie(token))
Expect(rec.Code).To(Equal(http.StatusOK))
Expect(p.user).ToNot(BeNil())
Expect(p.user.ID).To(Equal(user.ID))
Expect(p.source).To(Equal(auth.UsageSourceWeb))
Expect(p.key).To(BeNil())
})
It("Bearer session token sets source=web, apikey=nil", func() {
db := testDB()
appConfig := config.NewApplicationConfig()
user := createTestUser(db, "alice@example.com", auth.RoleUser, auth.ProviderLocal)
token := createTestSession(db, user.ID)
var p probe
app := probeApp(db, appConfig, &p)
rec := doRequest(app, http.MethodGet, "/probe", withBearerToken(token))
Expect(rec.Code).To(Equal(http.StatusOK))
Expect(p.user).ToNot(BeNil())
Expect(p.user.ID).To(Equal(user.ID))
Expect(p.source).To(Equal(auth.UsageSourceWeb))
Expect(p.key).To(BeNil())
})
It("Bearer API key sets source=apikey and exposes the resolved *UserAPIKey", func() {
db := testDB()
appConfig := config.NewApplicationConfig()
user := createTestUser(db, "alice@example.com", auth.RoleUser, auth.ProviderLocal)
plaintext, key, err := auth.CreateAPIKey(db, user.ID, "ci", auth.RoleUser, appConfig.Auth.APIKeyHMACSecret, nil)
Expect(err).ToNot(HaveOccurred())
var p probe
app := probeApp(db, appConfig, &p)
rec := doRequest(app, http.MethodGet, "/probe", withBearerToken(plaintext))
Expect(rec.Code).To(Equal(http.StatusOK))
Expect(p.source).To(Equal(auth.UsageSourceAPIKey))
Expect(p.key).ToNot(BeNil())
Expect(p.key.ID).To(Equal(key.ID))
})
It("x-api-key header sets source=apikey", func() {
db := testDB()
appConfig := config.NewApplicationConfig()
user := createTestUser(db, "alice@example.com", auth.RoleUser, auth.ProviderLocal)
plaintext, _, err := auth.CreateAPIKey(db, user.ID, "ci", auth.RoleUser, appConfig.Auth.APIKeyHMACSecret, nil)
Expect(err).ToNot(HaveOccurred())
var p probe
app := probeApp(db, appConfig, &p)
rec := doRequest(app, http.MethodGet, "/probe", withXApiKey(plaintext))
Expect(rec.Code).To(Equal(http.StatusOK))
Expect(p.source).To(Equal(auth.UsageSourceAPIKey))
Expect(p.key).ToNot(BeNil())
})
It("token cookie sets source=apikey", func() {
db := testDB()
appConfig := config.NewApplicationConfig()
user := createTestUser(db, "alice@example.com", auth.RoleUser, auth.ProviderLocal)
plaintext, _, err := auth.CreateAPIKey(db, user.ID, "ci", auth.RoleUser, appConfig.Auth.APIKeyHMACSecret, nil)
Expect(err).ToNot(HaveOccurred())
var p probe
app := probeApp(db, appConfig, &p)
rec := doRequest(app, http.MethodGet, "/probe", withTokenCookie(plaintext))
Expect(rec.Code).To(Equal(http.StatusOK))
Expect(p.source).To(Equal(auth.UsageSourceAPIKey))
Expect(p.key).ToNot(BeNil())
})
It("legacy env key sets source=legacy, apikey=nil", func() {
db := testDB()
appConfig := config.NewApplicationConfig()
appConfig.ApiKeys = []string{"legacy-secret"}
var p probe
app := probeApp(db, appConfig, &p)
rec := doRequest(app, http.MethodGet, "/probe", withBearerToken("legacy-secret"))
Expect(rec.Code).To(Equal(http.StatusOK))
Expect(p.source).To(Equal(auth.UsageSourceLegacy))
Expect(p.key).To(BeNil())
})
})
})

View File

@@ -5,14 +5,31 @@ import (
"strings"
"time"
"github.com/mudler/xlog"
"gorm.io/gorm"
)
// Source classification for a UsageRecord.
const (
UsageSourceAPIKey = "apikey" // request authenticated with a named UserAPIKey
UsageSourceWeb = "web" // request authenticated with a session cookie (web UI)
UsageSourceLegacy = "legacy" // request authenticated with an env-configured legacy key
)
// UsageRecord represents a single API request's token usage.
type UsageRecord struct {
ID uint `gorm:"primaryKey;autoIncrement"`
UserID string `gorm:"size:36;index:idx_usage_user_time"`
UserName string `gorm:"size:255"`
ID uint `gorm:"primaryKey;autoIncrement"`
UserID string `gorm:"size:36;index:idx_usage_user_time"`
UserName string `gorm:"size:255"`
// Source classifies how the request authenticated. One of UsageSource* constants.
// Empty for pre-feature rows until the InitDB backfill runs.
Source string `gorm:"size:16;index:idx_usage_source"`
// APIKeyID is the UserAPIKey.ID when Source == UsageSourceAPIKey. Nil otherwise.
APIKeyID *string `gorm:"size:36;index:idx_usage_apikey"`
// APIKeyName is a snapshot of UserAPIKey.Name at write time. Survives key deletion.
APIKeyName string `gorm:"size:255"`
Model string `gorm:"size:255;index"`
Endpoint string `gorm:"size:255"`
PromptTokens int64
@@ -30,9 +47,12 @@ func RecordUsage(db *gorm.DB, record *UsageRecord) error {
// UsageBucket is an aggregated time bucket for the dashboard.
type UsageBucket struct {
Bucket string `json:"bucket"`
Model string `json:"model"`
Model string `json:"model,omitempty"`
UserID string `json:"user_id,omitempty"`
UserName string `json:"user_name,omitempty"`
Source string `json:"source,omitempty"`
APIKeyID string `json:"api_key_id,omitempty"`
APIKeyName string `json:"api_key_name,omitempty"`
PromptTokens int64 `json:"prompt_tokens"`
CompletionTokens int64 `json:"completion_tokens"`
TotalTokens int64 `json:"total_tokens"`
@@ -119,6 +139,28 @@ func GetUserUsage(db *gorm.DB, userID, period string) ([]UsageBucket, error) {
return buckets, nil
}
// BackfillUsageSource sets the Source column on pre-feature usage rows.
// Idempotent: only touches rows where source is NULL or empty.
// - rows whose user_id == "legacy-api-key" -> UsageSourceLegacy
// - everything else -> UsageSourceWeb
func BackfillUsageSource(db *gorm.DB) error {
// Legacy first (more specific predicate)
if err := db.Exec(
`UPDATE usage_records SET source = ? WHERE (source IS NULL OR source = '') AND user_id = ?`,
UsageSourceLegacy, "legacy-api-key",
).Error; err != nil {
return fmt.Errorf("backfill legacy usage source: %w", err)
}
// Everything else -> web
if err := db.Exec(
`UPDATE usage_records SET source = ? WHERE (source IS NULL OR source = '')`,
UsageSourceWeb,
).Error; err != nil {
return fmt.Errorf("backfill web usage source: %w", err)
}
return nil
}
// GetAllUsage returns aggregated usage for all users (admin). Optional userID filter.
func GetAllUsage(db *gorm.DB, period, userID string) ([]UsageBucket, error) {
sqlite := isSQLiteDB(db)
@@ -149,3 +191,257 @@ func GetAllUsage(db *gorm.DB, period, userID string) ([]UsageBucket, error) {
}
return buckets, nil
}
// TotalsEntry is a token+request roll-up.
type TotalsEntry struct {
Tokens int64 `json:"tokens"`
Requests int64 `json:"requests"`
}
// KeyTotal is the per-key roll-up returned by sources endpoints. UserID and
// UserName are snapshotted from the UsageRecord so revoked-and-deleted keys
// still carry their owner attribution in admin views.
type KeyTotal struct {
APIKeyID string `json:"api_key_id"`
APIKeyName string `json:"api_key_name"`
UserID string `json:"user_id"`
UserName string `json:"user_name"`
Tokens int64 `json:"tokens"`
Requests int64 `json:"requests"`
LastUsed time.Time `json:"last_used"`
}
// UserSourceTotal is a per-(user, source) roll-up for sources that don't carry
// a named API key identity (web, legacy). It exists so admin views can show
// which user generated each block of Web UI / legacy traffic; the per-apikey
// breakdown for source=apikey already lives in KeyTotal.
type UserSourceTotal struct {
Source string `json:"source"`
UserID string `json:"user_id"`
UserName string `json:"user_name"`
Tokens int64 `json:"tokens"`
Requests int64 `json:"requests"`
}
// SourceTotals summarises a per-source breakdown.
type SourceTotals struct {
BySource map[string]TotalsEntry `json:"by_source"`
ByKey []KeyTotal `json:"by_key"` // server-sorted desc by tokens, capped
ByUserSource []UserSourceTotal `json:"by_user_source,omitempty"` // populated only when includeLegacy=true
GrandTotal TotalsEntry `json:"grand_total"`
}
const maxKeyTotals = 200
// GetUserUsageBySource returns per-source aggregated usage for one user. Legacy
// is excluded by design (visible to admins only via the admin variant).
func GetUserUsageBySource(db *gorm.DB, userID, period string) ([]UsageBucket, SourceTotals, error) {
sqlite := isSQLiteDB(db)
since, dateFmt := periodToWindow(period, sqlite)
bucketExpr := fmt.Sprintf("%s as bucket", dateFmt)
query := db.Model(&UsageRecord{}).
Select(bucketExpr+", source, COALESCE(api_key_id, '') as api_key_id, api_key_name, "+
"SUM(prompt_tokens) as prompt_tokens, "+
"SUM(completion_tokens) as completion_tokens, "+
"SUM(total_tokens) as total_tokens, "+
"COUNT(*) as request_count").
Where("user_id = ?", userID).
Where("source <> ?", UsageSourceLegacy).
Group("bucket, source, api_key_id, api_key_name").
Order("bucket ASC")
if !since.IsZero() {
query = query.Where("created_at >= ?", since)
}
var buckets []UsageBucket
if err := query.Find(&buckets).Error; err != nil {
return nil, SourceTotals{}, err
}
totals := computeSourceTotals(db, userID, "", since, false)
return buckets, totals, nil
}
// computeSourceTotals rolls up by_source / by_key / grand_total.
// userID/apiKeyID are optional filters. includeLegacy controls whether the
// legacy bucket is exposed (admin-only).
func computeSourceTotals(db *gorm.DB, userID, apiKeyID string, since time.Time, includeLegacy bool) SourceTotals {
totals := SourceTotals{BySource: map[string]TotalsEntry{}}
bySourceQ := db.Model(&UsageRecord{}).
Select("source, SUM(total_tokens) as tokens, COUNT(*) as requests").
Group("source")
bySourceQ = applyFilters(bySourceQ, userID, apiKeyID, since, includeLegacy)
var bySourceRows []struct {
Source string
Tokens int64
Requests int64
}
if err := bySourceQ.Scan(&bySourceRows).Error; err != nil {
xlog.Warn("computeSourceTotals: by-source Scan failed", "error", err)
return totals
}
for _, r := range bySourceRows {
totals.BySource[r.Source] = TotalsEntry{Tokens: r.Tokens, Requests: r.Requests}
totals.GrandTotal.Tokens += r.Tokens
totals.GrandTotal.Requests += r.Requests
}
byKeyQ := db.Model(&UsageRecord{}).
Select("COALESCE(api_key_id, '') as api_key_id, api_key_name, "+
"user_id, user_name, "+
"SUM(total_tokens) as tokens, COUNT(*) as requests, MAX(created_at) as last_used").
Where("api_key_id IS NOT NULL AND api_key_id <> ''").
Group("api_key_id, api_key_name, user_id, user_name").
Order("tokens DESC").
Limit(maxKeyTotals)
byKeyQ = applyFilters(byKeyQ, userID, apiKeyID, since, includeLegacy)
// Iterate Rows() manually because MAX(created_at) is returned as a string by
// the SQLite driver, and Go's database/sql refuses to scan that into
// *time.Time. Postgres returns a proper timestamp. We accept both shapes
// via a Rows.Scan into a string column, then parse uniformly.
rows, err := byKeyQ.Rows()
if err != nil {
xlog.Warn("computeSourceTotals: by-key Rows() failed", "error", err)
} else {
defer func() { _ = rows.Close() }()
out := make([]KeyTotal, 0)
for rows.Next() {
var (
apiKeyID, apiKeyName, userIDCol, userName, lastUsedRaw string
tokens, requests int64
)
if scanErr := rows.Scan(&apiKeyID, &apiKeyName, &userIDCol, &userName, &tokens, &requests, &lastUsedRaw); scanErr != nil {
continue
}
out = append(out, KeyTotal{
APIKeyID: apiKeyID,
APIKeyName: apiKeyName,
UserID: userIDCol,
UserName: userName,
Tokens: tokens,
Requests: requests,
LastUsed: parseLastUsedString(lastUsedRaw),
})
}
if rerr := rows.Err(); rerr != nil {
xlog.Warn("computeSourceTotals: by-key rows iteration failed", "error", rerr)
}
totals.ByKey = out
}
// by_user_source: only populated for admin callers (includeLegacy=true) so
// they can attribute Web UI / legacy traffic to specific users. Per-apikey
// rows already carry user info via KeyTotal above, so this query only
// covers source != apikey.
if includeLegacy {
byUserSourceQ := db.Model(&UsageRecord{}).
Select("source, user_id, user_name, "+
"SUM(total_tokens) as tokens, COUNT(*) as requests").
Where("source <> ?", UsageSourceAPIKey).
Group("source, user_id, user_name").
Order("tokens DESC")
byUserSourceQ = applyFilters(byUserSourceQ, userID, apiKeyID, since, includeLegacy)
var byUserSourceRows []UserSourceTotal
if scanErr := byUserSourceQ.Scan(&byUserSourceRows).Error; scanErr != nil {
xlog.Warn("computeSourceTotals: by-user-source Scan failed", "error", scanErr)
} else {
totals.ByUserSource = byUserSourceRows
}
}
return totals
}
// parseLastUsedString converts the textual MAX(created_at) value returned by
// SQLite (or any driver that surfaces the timestamp as a string) into a
// time.Time. Returns the zero time on parse failure.
func parseLastUsedString(s string) time.Time {
if s == "" {
return time.Time{}
}
// GORM's SQLite driver emits Go's default time formatting. Try the formats
// it commonly produces, falling back to RFC3339Nano.
layouts := []string{
"2006-01-02 15:04:05.999999999 -0700 MST",
"2006-01-02 15:04:05.999999999-07:00",
"2006-01-02 15:04:05.999999999",
"2006-01-02 15:04:05",
time.RFC3339Nano,
time.RFC3339,
}
for _, layout := range layouts {
if t, err := time.Parse(layout, s); err == nil {
return t
}
}
xlog.Warn("parseLastUsedString: unrecognised format", "value", s)
return time.Time{}
}
// GetAllUsageBySource is the admin variant of GetUserUsageBySource.
// Optional filters: userID and apiKeyID. Legacy is included.
// truncated == true iff the per-key roll-up was capped at maxKeyTotals.
func GetAllUsageBySource(db *gorm.DB, period, userID, apiKeyID string) ([]UsageBucket, SourceTotals, bool, error) {
sqlite := isSQLiteDB(db)
since, dateFmt := periodToWindow(period, sqlite)
bucketExpr := fmt.Sprintf("%s as bucket", dateFmt)
query := db.Model(&UsageRecord{}).
Select(bucketExpr+", source, COALESCE(api_key_id, '') as api_key_id, api_key_name, "+
"user_id, user_name, "+
"SUM(prompt_tokens) as prompt_tokens, "+
"SUM(completion_tokens) as completion_tokens, "+
"SUM(total_tokens) as total_tokens, "+
"COUNT(*) as request_count").
Group("bucket, source, api_key_id, api_key_name, user_id, user_name").
Order("bucket ASC")
query = applyFilters(query, userID, apiKeyID, since, true)
var buckets []UsageBucket
if err := query.Find(&buckets).Error; err != nil {
return nil, SourceTotals{}, false, err
}
totals := computeSourceTotals(db, userID, apiKeyID, since, true)
// Count distinct api_key_ids matching the filters. If > maxKeyTotals,
// the by_key slice was capped and we signal truncation to the caller.
truncated := false
var distinct int64
countQ := applyFilters(
db.Model(&UsageRecord{}).
Distinct("api_key_id").
Where("api_key_id IS NOT NULL AND api_key_id <> ''"),
userID, apiKeyID, since, true,
)
if err := countQ.Count(&distinct).Error; err != nil {
xlog.Warn("GetAllUsageBySource: distinct api_key_id count failed", "error", err)
} else {
truncated = distinct > maxKeyTotals
}
return buckets, totals, truncated, nil
}
func applyFilters(q *gorm.DB, userID, apiKeyID string, since time.Time, includeLegacy bool) *gorm.DB {
if userID != "" {
q = q.Where("user_id = ?", userID)
}
if apiKeyID != "" {
q = q.Where("api_key_id = ?", apiKeyID)
}
if !since.IsZero() {
q = q.Where("created_at >= ?", since)
}
if !includeLegacy {
q = q.Where("source <> ?", UsageSourceLegacy)
}
return q
}

View File

@@ -3,11 +3,13 @@
package auth_test
import (
"fmt"
"time"
"github.com/mudler/LocalAI/core/http/auth"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gorm.io/gorm"
)
var _ = Describe("Usage", func() {
@@ -158,4 +160,275 @@ var _ = Describe("Usage", func() {
}
})
})
Describe("Usage source backfill", func() {
It("backfills 'web' for pre-feature rows", func() {
db := testDB()
rawDB, err := db.DB()
Expect(err).ToNot(HaveOccurred())
_, err = rawDB.Exec(
`INSERT INTO usage_records (user_id, source, model, created_at, total_tokens, prompt_tokens, completion_tokens, duration) VALUES (?, '', ?, ?, 0, 0, 0, 0)`,
"user-x", "gpt-4", time.Now())
Expect(err).ToNot(HaveOccurred())
Expect(auth.BackfillUsageSource(db)).To(Succeed())
var loaded auth.UsageRecord
Expect(db.Where("user_id = ?", "user-x").First(&loaded).Error).To(Succeed())
Expect(loaded.Source).To(Equal(auth.UsageSourceWeb))
})
It("backfills 'legacy' for pre-feature rows with legacy-api-key user_id", func() {
db := testDB()
rawDB, err := db.DB()
Expect(err).ToNot(HaveOccurred())
_, err = rawDB.Exec(
`INSERT INTO usage_records (user_id, source, model, created_at, total_tokens, prompt_tokens, completion_tokens, duration) VALUES (?, '', ?, ?, 0, 0, 0, 0)`,
"legacy-api-key", "gpt-4", time.Now())
Expect(err).ToNot(HaveOccurred())
Expect(auth.BackfillUsageSource(db)).To(Succeed())
var loaded auth.UsageRecord
Expect(db.Where("user_id = ?", "legacy-api-key").First(&loaded).Error).To(Succeed())
Expect(loaded.Source).To(Equal(auth.UsageSourceLegacy))
})
It("is idempotent on re-run", func() {
db := testDB()
Expect(auth.BackfillUsageSource(db)).To(Succeed())
Expect(auth.BackfillUsageSource(db)).To(Succeed())
})
})
Describe("UsageRecord with source fields", func() {
It("persists Source, APIKeyID, APIKeyName", func() {
db := testDB()
keyID := "key-uuid-1"
record := &auth.UsageRecord{
UserID: "user-1",
UserName: "Test User",
Source: auth.UsageSourceAPIKey,
APIKeyID: &keyID,
APIKeyName: "ci-runner",
Model: "gpt-4",
Endpoint: "/v1/chat/completions",
TotalTokens: 150,
CreatedAt: time.Now(),
}
Expect(auth.RecordUsage(db, record)).To(Succeed())
var loaded auth.UsageRecord
Expect(db.First(&loaded, record.ID).Error).To(Succeed())
Expect(loaded.Source).To(Equal(auth.UsageSourceAPIKey))
Expect(loaded.APIKeyID).ToNot(BeNil())
Expect(*loaded.APIKeyID).To(Equal("key-uuid-1"))
Expect(loaded.APIKeyName).To(Equal("ci-runner"))
})
It("allows nil APIKeyID for web/legacy sources", func() {
db := testDB()
record := &auth.UsageRecord{
UserID: "user-1",
Source: auth.UsageSourceWeb,
Model: "gpt-4",
CreatedAt: time.Now(),
}
Expect(auth.RecordUsage(db, record)).To(Succeed())
var loaded auth.UsageRecord
Expect(db.First(&loaded, record.ID).Error).To(Succeed())
Expect(loaded.Source).To(Equal(auth.UsageSourceWeb))
Expect(loaded.APIKeyID).To(BeNil())
Expect(loaded.APIKeyName).To(BeEmpty())
})
})
Describe("GetUserUsageBySource", func() {
insert := func(db *gorm.DB, userID, source, keyID, keyName string, tokens int64, when time.Time) {
rec := &auth.UsageRecord{
UserID: userID,
Source: source,
Model: "gpt-4",
TotalTokens: tokens,
CreatedAt: when,
}
if keyID != "" {
rec.APIKeyID = &keyID
rec.APIKeyName = keyName
}
Expect(auth.RecordUsage(db, rec)).To(Succeed())
}
It("returns only the caller's rows, never legacy", func() {
db := testDB()
now := time.Now()
insert(db, "alice", auth.UsageSourceAPIKey, "k1", "ci", 100, now)
insert(db, "alice", auth.UsageSourceWeb, "", "", 50, now)
insert(db, "alice", auth.UsageSourceLegacy, "", "", 30, now)
insert(db, "bob", auth.UsageSourceAPIKey, "k2", "bobk", 90, now)
buckets, totals, err := auth.GetUserUsageBySource(db, "alice", "month")
Expect(err).ToNot(HaveOccurred())
for _, b := range buckets {
Expect(b.UserID).To(Or(BeEmpty(), Equal("alice")))
Expect(b.Source).ToNot(Equal(auth.UsageSourceLegacy))
}
Expect(totals.GrandTotal.Tokens).To(Equal(int64(150)))
Expect(totals.BySource[auth.UsageSourceAPIKey].Tokens).To(Equal(int64(100)))
Expect(totals.BySource[auth.UsageSourceWeb].Tokens).To(Equal(int64(50)))
_, hasLegacy := totals.BySource[auth.UsageSourceLegacy]
Expect(hasLegacy).To(BeFalse())
})
It("snapshots survive key deletion", func() {
db := testDB()
now := time.Now()
insert(db, "alice", auth.UsageSourceAPIKey, "deleted-key", "old-name", 42, now)
_, totals, err := auth.GetUserUsageBySource(db, "alice", "month")
Expect(err).ToNot(HaveOccurred())
Expect(totals.ByKey).To(HaveLen(1))
Expect(totals.ByKey[0].APIKeyName).To(Equal("old-name"))
Expect(totals.ByKey[0].APIKeyID).To(Equal("deleted-key"))
Expect(totals.ByKey[0].LastUsed).ToNot(BeZero())
Expect(totals.ByKey[0].LastUsed).To(BeTemporally("~", now, 2*time.Second))
})
})
Describe("GetAllUsageBySource", func() {
insert := func(db *gorm.DB, userID, source, keyID string, tokens int64) {
rec := &auth.UsageRecord{
UserID: userID,
Source: source,
Model: "gpt-4",
TotalTokens: tokens,
CreatedAt: time.Now(),
}
if keyID != "" {
rec.APIKeyID = &keyID
rec.APIKeyName = "name-" + keyID
}
Expect(auth.RecordUsage(db, rec)).To(Succeed())
}
It("includes legacy for admins", func() {
db := testDB()
insert(db, "alice", auth.UsageSourceAPIKey, "k1", 10)
insert(db, "legacy-api-key", auth.UsageSourceLegacy, "", 5)
_, totals, _, err := auth.GetAllUsageBySource(db, "month", "", "")
Expect(err).ToNot(HaveOccurred())
Expect(totals.BySource).To(HaveKey(auth.UsageSourceLegacy))
Expect(totals.BySource[auth.UsageSourceLegacy].Tokens).To(Equal(int64(5)))
})
It("filters by user_id AND api_key_id", func() {
db := testDB()
insert(db, "alice", auth.UsageSourceAPIKey, "k1", 10)
insert(db, "alice", auth.UsageSourceAPIKey, "k2", 20)
insert(db, "bob", auth.UsageSourceAPIKey, "k3", 30)
_, totals, _, err := auth.GetAllUsageBySource(db, "month", "alice", "k2")
Expect(err).ToNot(HaveOccurred())
Expect(totals.GrandTotal.Tokens).To(Equal(int64(20)))
})
It("sets truncated=true when by_key exceeds the cap", func() {
db := testDB()
for i := 0; i < 210; i++ {
insert(db, "alice", auth.UsageSourceAPIKey, fmt.Sprintf("key-%03d", i), int64(210-i))
}
_, totals, truncated, err := auth.GetAllUsageBySource(db, "month", "", "")
Expect(err).ToNot(HaveOccurred())
Expect(truncated).To(BeTrue())
Expect(totals.ByKey).To(HaveLen(200))
Expect(totals.ByKey[0].Tokens > totals.ByKey[199].Tokens).To(BeTrue())
})
// insertNamed records a row with explicit user_id, user_name, source,
// and optional api key snapshot. Used by the user-attribution tests
// below which the older insert helper can't express.
insertNamed := func(db *gorm.DB, userID, userName, source, keyID, keyName string, tokens int64) {
rec := &auth.UsageRecord{
UserID: userID,
UserName: userName,
Source: source,
Model: "gpt-4",
TotalTokens: tokens,
CreatedAt: time.Now(),
}
if keyID != "" {
rec.APIKeyID = &keyID
rec.APIKeyName = keyName
}
Expect(auth.RecordUsage(db, rec)).To(Succeed())
}
It("attributes each KeyTotal to its owner user", func() {
db := testDB()
insertNamed(db, "alice", "Alice", auth.UsageSourceAPIKey, "k1", "ci-runner", 100)
insertNamed(db, "bob", "Bob", auth.UsageSourceAPIKey, "k2", "lap", 50)
_, totals, _, err := auth.GetAllUsageBySource(db, "month", "", "")
Expect(err).ToNot(HaveOccurred())
Expect(totals.ByKey).To(HaveLen(2))
byID := map[string]auth.KeyTotal{}
for _, k := range totals.ByKey {
byID[k.APIKeyID] = k
}
Expect(byID["k1"].UserID).To(Equal("alice"))
Expect(byID["k1"].UserName).To(Equal("Alice"))
Expect(byID["k2"].UserID).To(Equal("bob"))
Expect(byID["k2"].UserName).To(Equal("Bob"))
})
It("breaks Web UI and legacy traffic out per user in by_user_source for admin", func() {
db := testDB()
// Alice and Bob both have Web UI traffic; a synthetic legacy user
// also contributes. ByUserSource should expose one row per
// (source, user) pair, never for source=apikey.
insertNamed(db, "alice", "Alice", auth.UsageSourceWeb, "", "", 30)
insertNamed(db, "bob", "Bob", auth.UsageSourceWeb, "", "", 70)
insertNamed(db, "legacy-api-key", "API Key User", auth.UsageSourceLegacy, "", "", 10)
insertNamed(db, "alice", "Alice", auth.UsageSourceAPIKey, "k1", "ci-runner", 5)
_, totals, _, err := auth.GetAllUsageBySource(db, "month", "", "")
Expect(err).ToNot(HaveOccurred())
Expect(totals.ByUserSource).ToNot(BeEmpty())
for _, r := range totals.ByUserSource {
Expect(r.Source).ToNot(Equal(auth.UsageSourceAPIKey))
}
webByUser := map[string]int64{}
legacyByUser := map[string]int64{}
for _, r := range totals.ByUserSource {
switch r.Source {
case auth.UsageSourceWeb:
webByUser[r.UserID] = r.Tokens
case auth.UsageSourceLegacy:
legacyByUser[r.UserID] = r.Tokens
}
}
Expect(webByUser["alice"]).To(Equal(int64(30)))
Expect(webByUser["bob"]).To(Equal(int64(70)))
Expect(legacyByUser["legacy-api-key"]).To(Equal(int64(10)))
})
It("does NOT populate by_user_source in the non-admin path", func() {
db := testDB()
insertNamed(db, "alice", "Alice", auth.UsageSourceWeb, "", "", 30)
_, totals, err := auth.GetUserUsageBySource(db, "alice", "month")
Expect(err).ToNot(HaveOccurred())
// Non-admin path uses includeLegacy=false, so by_user_source stays nil.
Expect(totals.ByUserSource).To(BeNil())
})
})
})

View File

@@ -16,8 +16,11 @@ import (
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/xlog"
"gorm.io/gorm"
@@ -381,14 +384,24 @@ func ResumeNodeEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
}
}
// InstallBackendOnNodeEndpoint triggers backend installation on a worker node via NATS.
// InstallBackendOnNodeEndpoint triggers backend installation on a worker node.
// Async: enqueues a ManagementOp on the gallery service channel and returns a
// jobID immediately. The gallery service worker goroutine drives the actual
// install via DistributedBackendManager.InstallBackend, which honors the op's
// TargetNodeID to scope the fan-out to one node. The UI polls /api/backends/job/:uid
// for progress, mirroring /api/backends/install/:id.
//
// Backend can be either a gallery ID (resolved against BackendGalleries) or a
// direct URI install (URI + Name + optional Alias) same shape as the
// direct URI install (URI + Name + optional Alias) - same shape as the
// standalone /api/backends/install-external path, just scoped to one node.
func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.HandlerFunc {
//
// The legacy unloader argument is retained for signature symmetry with
// DeleteBackendOnNodeEndpoint / ListBackendsOnNodeEndpoint but is no longer
// used here - the async path goes through galleryService.
func InstallBackendOnNodeEndpoint(_ nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig) echo.HandlerFunc {
return func(c echo.Context) error {
if unloader == nil {
return c.JSON(http.StatusServiceUnavailable, nodeError(http.StatusServiceUnavailable, "NATS not configured"))
if galleryService == nil {
return c.JSON(http.StatusServiceUnavailable, nodeError(http.StatusServiceUnavailable, "gallery service not configured"))
}
nodeID := c.Param("id")
var req struct {
@@ -401,25 +414,65 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "invalid request body"))
}
// Either a gallery backend name or a direct URI must be supplied.
if req.Backend == "" && req.URI == "" {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "backend name or uri required"))
}
// Admin-driven backend install: not tied to a specific replica slot
// (no model is being loaded). Pass replica 0 to match the worker's
// admin process-key convention (`backend#0`). The worker's fast path
// takes over if the backend is already running — upgrades go through
// the dedicated /api/backends/upgrade path on backend.upgrade.
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
jobUUID, err := uuid.NewUUID()
if err != nil {
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to generate job id"))
}
if !reply.Success {
xlog.Error("Backend install failed on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", reply.Error)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "backend installation failed"))
jobID := jobUUID.String()
// Cache key: for gallery installs, use the backend slug; for URI
// installs prefer the provided Name (falling back to URI). All keys
// are node-scoped so concurrent installs of the same backend on
// different nodes do not stomp each other in opcache.
backendKey := req.Backend
if backendKey == "" {
backendKey = req.Name
if backendKey == "" {
backendKey = req.URI
}
}
return c.JSON(http.StatusOK, map[string]string{"message": "backend installed"})
cacheKey := galleryop.NodeScopedKey(nodeID, backendKey)
opcache.SetBackend(cacheKey, jobID)
// Optional caller-supplied galleries override. Mirrors the standalone
// install path so an admin can point at a private gallery.
galleries := appConfig.BackendGalleries
if req.BackendGalleries != "" {
var custom []config.Gallery
if err := json.Unmarshal([]byte(req.BackendGalleries), &custom); err != nil {
xlog.Warn("Ignoring malformed backend_galleries override; falling back to configured galleries", "error", err, "nodeID", nodeID)
} else if len(custom) > 0 {
galleries = custom
}
}
ctx, cancelFunc := context.WithCancel(context.Background())
op := galleryop.ManagementOp[gallery.GalleryBackend, any]{
ID: jobID,
GalleryElementName: req.Backend,
Galleries: galleries,
TargetNodeID: nodeID,
ExternalURI: req.URI,
ExternalName: req.Name,
ExternalAlias: req.Alias,
Context: ctx,
CancelFunc: cancelFunc,
}
galleryService.StoreCancellation(jobID, cancelFunc)
go func() {
galleryService.BackendGalleryChannel <- op
}()
xlog.Info("Node-scoped backend install dispatched", "node", nodeID, "backend", req.Backend, "uri", req.URI, "jobID", jobID)
return c.JSON(http.StatusAccepted, map[string]string{
"jobID": jobID,
"statusUrl": "/api/backends/job/" + jobID,
"message": "backend installation started",
})
}
}

View File

@@ -0,0 +1,123 @@
package localai_test
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"github.com/labstack/echo/v4"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
)
// InstallBackendOnNodeEndpoint became async to stop blocking the browser on
// the 3-minute NATS reply timeout. These specs lock in the new contract:
// HTTP 202 with a jobID, a ManagementOp enqueued on the gallery channel, and
// an opcache entry keyed by NodeScopedKey so concurrent installs of the same
// backend on different nodes do not stomp each other.
var _ = Describe("InstallBackendOnNodeEndpoint async behavior", func() {
var (
e *echo.Echo
galleryService *galleryop.GalleryService
opcache *galleryop.OpCache
appCfg *config.ApplicationConfig
dispatched chan galleryop.ManagementOp[gallery.GalleryBackend, any]
done chan struct{}
drainExited chan struct{}
)
BeforeEach(func() {
e = echo.New()
appCfg = &config.ApplicationConfig{
BackendGalleries: []config.Gallery{{Name: "test-gallery", URL: "http://example.com"}},
}
galleryService = galleryop.NewGalleryService(appCfg, nil)
opcache = galleryop.NewOpCache(galleryService)
// Drain the gallery channel into a buffered side channel so the
// handler's `go func() { ch <- op }()` send does not block waiting
// for the real worker (which is not running in this unit test).
dispatched = make(chan galleryop.ManagementOp[gallery.GalleryBackend, any], 4)
done = make(chan struct{})
drainExited = make(chan struct{})
go func() {
defer close(drainExited)
for {
select {
case op := <-galleryService.BackendGalleryChannel:
dispatched <- op
case <-done:
return
}
}
}()
})
AfterEach(func() {
// Signal the drain goroutine to exit. We do NOT close
// BackendGalleryChannel: the handler's dispatch goroutine may still
// be pending (specs that don't Eventually-Receive), and a send on a
// closed channel panics. Signalling via `done` lets the drain
// goroutine return without touching the gallery channel.
close(done)
Eventually(drainExited, "2s").Should(BeClosed())
})
It("returns 202 with a jobID and dispatches a TargetNodeID-scoped op", func() {
body := `{"backend": "llama-cpp"}`
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")
handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusAccepted))
var resp map[string]any
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp["jobID"]).To(BeAssignableToTypeOf(""))
Expect(resp["jobID"].(string)).ToNot(BeEmpty())
Expect(resp["message"]).To(Equal("backend installation started"))
Eventually(dispatched, "2s").Should(Receive())
Expect(opcache.Exists(galleryop.NodeScopedKey("node-xyz", "llama-cpp"))).To(BeTrue())
Expect(opcache.IsBackendOp(galleryop.NodeScopedKey("node-xyz", "llama-cpp"))).To(BeTrue())
})
It("returns 400 when neither backend nor uri is supplied", func() {
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(`{}`))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")
handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
})
It("accepts a direct URI install and uses the name as the cache key", func() {
body := `{"uri": "oci://example.com/custom-backend:v1", "name": "custom"}`
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")
handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusAccepted))
Expect(opcache.Exists(galleryop.NodeScopedKey("node-xyz", "custom"))).To(BeTrue())
})
})

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/labstack/echo/v4"
@@ -14,18 +15,37 @@ import (
const (
usageFlushInterval = 5 * time.Second
usageMaxPending = 5000
// usageMaxPending bounds the in-memory queue. Sized for bursty inference
// traffic on a self-hosted instance with a slow or unavailable DB.
usageMaxPending = 50000
)
// usageBatcher accumulates usage records and flushes them to the DB periodically.
type usageBatcher struct {
mu sync.Mutex
pending []*auth.UsageRecord
db *gorm.DB
mu sync.Mutex
pending []*auth.UsageRecord
db *gorm.DB
stop chan struct{}
done chan struct{}
stopOnce sync.Once
}
// droppedRecords counts records discarded because the in-memory queue was full.
// Used to rate-limit the warn log so a sustained outage doesn't flood it.
var droppedRecords atomic.Uint64
func (b *usageBatcher) add(r *auth.UsageRecord) {
b.mu.Lock()
if len(b.pending) >= usageMaxPending {
b.mu.Unlock()
// Rate-limit: one warn per 1024 drops keeps the log readable.
n := droppedRecords.Add(1)
if n&1023 == 1 {
xlog.Warn("usage batcher full, dropping record",
"cap", usageMaxPending, "total_dropped", n)
}
return
}
b.pending = append(b.pending, r)
b.mu.Unlock()
}
@@ -42,31 +62,102 @@ func (b *usageBatcher) flush() {
if err := b.db.Create(&batch).Error; err != nil {
xlog.Error("Failed to flush usage batch", "count", len(batch), "error", err)
// Re-queue failed records with a cap to avoid unbounded growth
// Cap-aware re-queue: prepend as much of the failed batch as fits
// alongside any records added concurrently with the failed write.
b.mu.Lock()
if len(b.pending) < usageMaxPending {
b.pending = append(batch, b.pending...)
room := usageMaxPending - len(b.pending)
if room > 0 {
if room > len(batch) {
room = len(batch)
}
b.pending = append(batch[:room], b.pending...)
}
b.mu.Unlock()
}
}
var batcher *usageBatcher
func (b *usageBatcher) run() {
defer close(b.done)
ticker := time.NewTicker(usageFlushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
b.flush()
case <-b.stop:
b.flush() // final drain
return
}
}
}
func (b *usageBatcher) shutdown() {
b.stopOnce.Do(func() {
close(b.stop)
<-b.done
})
}
// The package-level batcher is guarded by batcherMu so Init / Shutdown cycles
// (the test pattern) don't race against UsageMiddleware reads.
var (
batcherMu sync.RWMutex
batcher *usageBatcher
)
func currentBatcher() *usageBatcher {
batcherMu.RLock()
defer batcherMu.RUnlock()
return batcher
}
// InitUsageRecorder starts a background goroutine that periodically flushes
// accumulated usage records to the database.
// accumulated usage records to the database. Calling it more than once
// shuts down the previous batcher first so its goroutine doesn't leak.
func InitUsageRecorder(db *gorm.DB) {
if db == nil {
return
}
batcher = &usageBatcher{db: db}
go func() {
ticker := time.NewTicker(usageFlushInterval)
defer ticker.Stop()
for range ticker.C {
batcher.flush()
}
}()
batcherMu.Lock()
old := batcher
batcher = nil
batcherMu.Unlock()
if old != nil {
old.shutdown()
}
b := &usageBatcher{
db: db,
stop: make(chan struct{}),
done: make(chan struct{}),
}
batcherMu.Lock()
batcher = b
batcherMu.Unlock()
go b.run()
}
// ShutdownUsageRecorder stops the background flusher and synchronously drains
// pending records once. Safe to call multiple times. Not yet wired into the
// application lifecycle; intended for graceful process exit and tests.
func ShutdownUsageRecorder() {
batcherMu.Lock()
b := batcher
batcher = nil
batcherMu.Unlock()
if b != nil {
b.shutdown()
}
}
// FlushNow synchronously flushes any pending usage records. Intended for tests
// that need deterministic behaviour without waiting for the ticker.
func FlushNow() {
if b := currentBatcher(); b != nil {
b.flush()
}
}
// usageResponseBody is the minimal structure we need from the response JSON.
@@ -84,7 +175,8 @@ type usageResponseBody struct {
func UsageMiddleware(db *gorm.DB) echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
if db == nil || batcher == nil {
b := currentBatcher()
if db == nil || b == nil {
return next(c)
}
@@ -149,9 +241,17 @@ func UsageMiddleware(db *gorm.DB) echo.MiddlewareFunc {
return handlerErr
}
source := auth.GetSource(c)
if source == "" {
// Auth disabled or unrecognised path: classify as web so the row is still
// bucketable rather than silently dropped from per-source aggregates.
source = auth.UsageSourceWeb
}
record := &auth.UsageRecord{
UserID: user.ID,
UserName: user.Name,
Source: source,
Model: resp.Model,
Endpoint: c.Request().URL.Path,
PromptTokens: resp.Usage.PromptTokens,
@@ -161,7 +261,13 @@ func UsageMiddleware(db *gorm.DB) echo.MiddlewareFunc {
CreatedAt: startTime,
}
batcher.add(record)
if key := auth.GetAPIKey(c); key != nil {
id := key.ID
record.APIKeyID = &id
record.APIKeyName = key.Name
}
b.add(record)
return handlerErr
}

View File

@@ -0,0 +1,140 @@
//go:build auth
package middleware_test
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/http/middleware"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gorm.io/gorm"
)
// testAuthDB returns a fresh in-memory SQLite auth DB.
func testAuthDB() *gorm.DB {
db, err := auth.InitDB(":memory:")
if err != nil {
panic(err)
}
return db
}
var _ = Describe("UsageMiddleware", func() {
var (
e *echo.Echo
db *gorm.DB
)
BeforeEach(func() {
db = testAuthDB()
e = echo.New()
middleware.InitUsageRecorder(db)
})
AfterEach(func() {
middleware.ShutdownUsageRecorder()
})
okHandler := func(c echo.Context) error {
body, _ := json.Marshal(map[string]any{
"model": "gpt-4",
"usage": map[string]int{
"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15,
},
})
c.Response().Header().Set("Content-Type", "application/json")
c.Response().WriteHeader(http.StatusOK)
_, _ = c.Response().Write(body)
return nil
}
// FlushNow drains pending records synchronously, replacing the 6s sleep
// that was previously needed to wait for the batcher's ticker.
flush := middleware.FlushNow
It("records source=web when auth_source is web", func() {
e.POST("/v1/chat/completions", okHandler, func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.Set("auth_user", &auth.User{ID: "alice", Name: "Alice"})
c.Set("auth_source", auth.UsageSourceWeb)
return next(c)
}
}, middleware.UsageMiddleware(db))
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewReader([]byte(`{}`)))
e.ServeHTTP(httptest.NewRecorder(), req)
flush()
var rec auth.UsageRecord
Expect(db.Where("user_id = ?", "alice").First(&rec).Error).To(Succeed())
Expect(rec.Source).To(Equal(auth.UsageSourceWeb))
Expect(rec.APIKeyID).To(BeNil())
Expect(rec.APIKeyName).To(BeEmpty())
})
It("records source=apikey with snapshotted name when auth_apikey is set", func() {
e.POST("/v1/chat/completions", okHandler, func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.Set("auth_user", &auth.User{ID: "alice", Name: "Alice"})
c.Set("auth_source", auth.UsageSourceAPIKey)
c.Set("auth_apikey", &auth.UserAPIKey{ID: "key-1", Name: "ci-runner"})
return next(c)
}
}, middleware.UsageMiddleware(db))
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewReader([]byte(`{}`)))
e.ServeHTTP(httptest.NewRecorder(), req)
flush()
var rec auth.UsageRecord
Expect(db.Where("user_id = ?", "alice").First(&rec).Error).To(Succeed())
Expect(rec.Source).To(Equal(auth.UsageSourceAPIKey))
Expect(rec.APIKeyID).ToNot(BeNil())
Expect(*rec.APIKeyID).To(Equal("key-1"))
Expect(rec.APIKeyName).To(Equal("ci-runner"))
})
It("FlushNow drains pending records synchronously", func() {
e.POST("/v1/chat/completions", okHandler, func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.Set("auth_user", &auth.User{ID: "carol", Name: "Carol"})
c.Set("auth_source", auth.UsageSourceWeb)
return next(c)
}
}, middleware.UsageMiddleware(db))
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewReader([]byte(`{}`)))
e.ServeHTTP(httptest.NewRecorder(), req)
// No sleep: FlushNow should drain immediately.
middleware.FlushNow()
var rec auth.UsageRecord
Expect(db.Where("user_id = ?", "carol").First(&rec).Error).To(Succeed())
Expect(rec.Source).To(Equal(auth.UsageSourceWeb))
})
It("falls back to source=web when auth_source is empty", func() {
e.POST("/v1/chat/completions", okHandler, func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.Set("auth_user", &auth.User{ID: "alice", Name: "Alice"})
// no auth_source set
return next(c)
}
}, middleware.UsageMiddleware(db))
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewReader([]byte(`{}`)))
e.ServeHTTP(httptest.NewRecorder(), req)
flush()
var rec auth.UsageRecord
Expect(db.Where("user_id = ?", "alice").First(&rec).Error).To(Succeed())
Expect(rec.Source).To(Equal(auth.UsageSourceWeb))
})
})

View File

@@ -53,7 +53,30 @@
},
"usage": {
"title": "Usage",
"subtitle": "API token usage statistics"
"subtitle": "API token usage statistics",
"sources": {
"tab": "Sources",
"mixTitle": "Source mix",
"ribbonAria": "{{apikey}}% API keys, {{web}}% Web UI, {{legacy}}% Legacy",
"topSources": "Top sources over time",
"searchPlaceholder": "Search by name or prefix",
"sortBy": "Sort",
"sortTokens": "Tokens",
"sortRequests": "Requests",
"sortLastUsed": "Last used",
"sortName": "Name",
"sortUser": "User",
"webUI": "Web UI",
"legacy": "Legacy",
"revoked": "revoked",
"filteredTo": "Filtered to: {{name}}",
"clearFilter": "Clear filter",
"other": "Other ({{count}})",
"noTrafficShort": "No requests in this period.",
"noKeysYet": "Once requests come in, you'll see them broken down here.",
"createKey": "Create your first API key",
"truncatedWarning": "Showing top 200 keys. Apply a filter to narrow further."
}
},
"explorer": {
"title": "Explorer",

View File

@@ -1,7 +1,7 @@
import { useState, useMemo, useEffect, useRef } from 'react'
import Modal from './Modal'
import SearchableSelect from './SearchableSelect'
import { nodesApi } from '../utils/api'
import { nodesApi, backendsApi } from '../utils/api'
// NodeInstallPicker is the single multi-node install surface used both from
// the Backends gallery split-button and from the "Install on more nodes" `+`
@@ -240,6 +240,37 @@ export default function NodeInstallPicker({
}
const clearSelection = () => setSelected(new Set())
// pollJob resolves with { done: true, error?: string } once a single job
// completes, fails, or is cancelled. Bounded by a hard wall-clock cap so a
// stuck worker eventually surfaces in the UI as "Failed" instead of
// spinning forever.
const pollJob = (jobID) => new Promise((resolve) => {
const POLL_INTERVAL_MS = 1500
const HARD_CAP_MS = 6 * 60 * 1000 // 6 min - generous for a fresh worker download
const startedAt = Date.now()
const tick = async () => {
try {
const status = await backendsApi.getJob(jobID)
if (status?.completed) { resolve({ done: true }); return }
if (status?.error) { resolve({ done: true, error: status.error }); return }
if (status?.processed && !status?.completed) {
resolve({ done: true, error: status.error || 'install did not complete' })
return
}
} catch (err) {
resolve({ done: true, error: err?.message || 'polling failed' })
return
}
if (Date.now() - startedAt > HARD_CAP_MS) {
resolve({ done: true, error: 'timed out waiting for install to finish' })
return
}
setTimeout(tick, POLL_INTERVAL_MS)
}
tick()
})
const submit = async () => {
if (selected.size === 0 || submitting) return
if (counts.overrides > 0 && !showMismatchConfirm) {
@@ -255,38 +286,68 @@ export default function NodeInstallPicker({
return next
})
const results = await Promise.allSettled(ids.map(id =>
// Phase 1: dispatch all installs in parallel. Each POST returns immediately
// with { jobID } now that the handler is async.
const dispatchResults = await Promise.allSettled(ids.map(id =>
nodesApi.installBackend(id, effectiveBackendName)
.then(r => ({ id, ok: true, message: r?.message }))
.catch(err => ({ id, ok: false, error: err?.message || 'install failed' }))
.then(r => ({ id, ok: true, jobID: r?.jobID }))
.catch(err => ({ id, ok: false, error: err?.message || 'dispatch failed' }))
))
let successCount = 0, failCount = 0
setPerNode(prev => {
const next = { ...prev }
for (const r of results) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok) {
next[v.id] = { status: 'done' }
successCount++
} else {
next[v.id] = { status: 'error', error: v.error }
failCount++
}
// Classify dispatch results synchronously OUTSIDE the setter. React may
// invoke a functional state updater more than once (StrictMode dev double
// invoke, concurrent rendering replay): building the jobs array inside
// the closure would duplicate entries and re-poll the same job.
const jobs = []
const dispatchPatch = {}
for (const r of dispatchResults) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok && v.jobID) {
dispatchPatch[v.id] = { status: 'installing', jobID: v.jobID }
jobs.push({ nodeID: v.id, jobID: v.jobID })
} else {
dispatchPatch[v.id] = { status: 'error', error: v.error || 'dispatch failed' }
}
return next
}
setPerNode(prev => ({ ...prev, ...dispatchPatch }))
// Phase 2: poll each job. Promise.all resolves when the last job settles;
// intermediate updates flip per-row state via the setPerNode inside pollJob.
await Promise.all(jobs.map(async ({ nodeID, jobID }) => {
const result = await pollJob(jobID)
setPerNode(prev => {
const next = { ...prev }
if (result.error) {
next[nodeID] = { status: 'error', error: result.error, jobID }
} else {
next[nodeID] = { status: 'done', jobID }
}
return next
})
}))
// Phase 3: summary toast + onComplete. Read latest state via functional setter.
let successCount = 0
let failCount = 0
setPerNode(prev => {
for (const v of Object.values(prev)) {
if (v.status === 'done') successCount++
else if (v.status === 'error') failCount++
}
return prev
})
setSubmitting(false)
if (successCount > 0 && onComplete) onComplete()
if (failCount === 0) {
if (failCount === 0 && successCount > 0) {
addToast?.(`Installed on ${successCount} node${successCount === 1 ? '' : 's'}`, 'success')
setTimeout(() => onClose?.(), 800)
} else if (successCount === 0) {
} else if (successCount === 0 && failCount > 0) {
addToast?.(`Install failed on all ${failCount} node${failCount === 1 ? '' : 's'}`, 'error')
} else {
} else if (successCount > 0 && failCount > 0) {
addToast?.(`Installed on ${successCount}, failed on ${failCount}`, 'warning')
}
}
@@ -297,32 +358,58 @@ export default function NodeInstallPicker({
.map(([id]) => id)
if (failedIds.length === 0) return
setSelected(new Set(failedIds))
// Replace state for failed rows so they show "installing" again, not stale errors.
setPerNode(prev => {
const next = { ...prev }
failedIds.forEach(id => { next[id] = { status: 'installing' } })
return next
})
setSubmitting(true)
const results = await Promise.allSettled(failedIds.map(id =>
const dispatchResults = await Promise.allSettled(failedIds.map(id =>
nodesApi.installBackend(id, effectiveBackendName)
.then(r => ({ id, ok: true, message: r?.message }))
.catch(err => ({ id, ok: false, error: err?.message || 'install failed' }))
.then(r => ({ id, ok: true, jobID: r?.jobID }))
.catch(err => ({ id, ok: false, error: err?.message || 'dispatch failed' }))
))
// Same precaution as in submit(): classify outside the functional setter
// so a replayed updater can't push duplicate jobs into the polling list.
const jobs = []
const dispatchPatch = {}
for (const r of dispatchResults) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok && v.jobID) {
dispatchPatch[v.id] = { status: 'installing', jobID: v.jobID }
jobs.push({ nodeID: v.id, jobID: v.jobID })
} else {
dispatchPatch[v.id] = { status: 'error', error: v.error || 'dispatch failed' }
}
}
setPerNode(prev => ({ ...prev, ...dispatchPatch }))
await Promise.all(jobs.map(async ({ nodeID, jobID }) => {
const result = await pollJob(jobID)
setPerNode(prev => {
const next = { ...prev }
if (result.error) next[nodeID] = { status: 'error', error: result.error, jobID }
else next[nodeID] = { status: 'done', jobID }
return next
})
}))
setSubmitting(false)
let successCount = 0, failCount = 0
setPerNode(prev => {
const next = { ...prev }
for (const r of results) {
if (r.status !== 'fulfilled') continue
const v = r.value
if (v.ok) { next[v.id] = { status: 'done' }; successCount++ }
else { next[v.id] = { status: 'error', error: v.error }; failCount++ }
for (const id of failedIds) {
const v = prev[id]
if (v?.status === 'done') successCount++
else if (v?.status === 'error') failCount++
}
return next
return prev
})
setSubmitting(false)
if (successCount > 0 && onComplete) onComplete()
if (failCount === 0) {
if (failCount === 0 && successCount > 0) {
addToast?.(`Installed on ${successCount} node${successCount === 1 ? '' : 's'}`, 'success')
setTimeout(() => onClose?.(), 800)
}

View File

@@ -179,16 +179,19 @@ export default function Backends() {
// Install a single gallery backend on a specific node, used in target-node
// mode (the URL has ?target=<node-id> set from the Nodes page entry point).
// The handler is async - we dispatch and let the global Operations panel
// surface progress; no need to await completion here.
const handleInstallOnTarget = async (id) => {
if (!targetNode) return
try {
await nodesApi.installBackend(targetNode.id, id)
addToast(`Installing ${id} on ${targetNode.name}`, 'info')
// Per-node install is request-reply, not part of the global jobs feed —
// refetch to reflect the new Nodes column state.
setTimeout(() => { fetchBackends(); refetchNodes() }, 600)
addToast(`Installing ${id} on ${targetNode.name}...`, 'info')
// The install runs async via the gallery job queue. Refetch shortly so
// the Nodes column reflects "installing" state; the Operations panel
// tracks the actual progress until completion.
setTimeout(() => { fetchBackends(); refetchNodes() }, 1200)
} catch (err) {
addToast(`Install failed on ${targetNode.name}: ${err.message}`, 'error')
addToast(`Install dispatch failed on ${targetNode.name}: ${err.message}`, 'error')
}
}

View File

@@ -4,6 +4,7 @@ import { useTranslation } from 'react-i18next'
import { useAuth } from '../context/AuthContext'
import { apiUrl } from '../utils/basePath'
import LoadingSpinner from '../components/LoadingSpinner'
import SourcesTab from './Usage/SourcesTab'
const PERIODS = [
{ key: 'day', label: 'Day' },
@@ -724,23 +725,27 @@ export default function Usage() {
{p.label}
</button>
))}
<div style={{ width: 1, height: 20, background: 'var(--color-border-subtle)', margin: '0 var(--spacing-xs)' }} />
<button
className={`btn btn-sm ${activeTab === 'models' ? 'btn-primary' : 'btn-secondary'}`}
onClick={() => setActiveTab('models')}
>
<i className="fas fa-cube" style={{ fontSize: '0.7rem' }} /> Models
</button>
{isAdmin && (
<>
<div style={{ width: 1, height: 20, background: 'var(--color-border-subtle)', margin: '0 var(--spacing-xs)' }} />
<button
className={`btn btn-sm ${activeTab === 'models' ? 'btn-primary' : 'btn-secondary'}`}
onClick={() => setActiveTab('models')}
>
<i className="fas fa-cube" style={{ fontSize: '0.7rem' }} /> Models
</button>
<button
className={`btn btn-sm ${activeTab === 'users' ? 'btn-primary' : 'btn-secondary'}`}
onClick={() => setActiveTab('users')}
>
<i className="fas fa-users" style={{ fontSize: '0.7rem' }} /> Users
</button>
</>
<button
className={`btn btn-sm ${activeTab === 'users' ? 'btn-primary' : 'btn-secondary'}`}
onClick={() => setActiveTab('users')}
>
<i className="fas fa-users" style={{ fontSize: '0.7rem' }} /> Users
</button>
)}
<button
className={`btn btn-sm ${activeTab === 'sources' ? 'btn-primary' : 'btn-secondary'}`}
onClick={() => setActiveTab('sources')}
>
<i className="fas fa-key" style={{ fontSize: '0.7rem' }} /> {t('usage.sources.tab')}
</button>
<div style={{ flex: 1 }} />
<button className="btn btn-secondary btn-sm" onClick={fetchUsage} disabled={loading} style={{ gap: 4 }}>
<i className={`fas fa-rotate${loading ? ' fa-spin' : ''}`} /> Refresh
@@ -884,6 +889,10 @@ export default function Usage() {
</div>
)
)}
{activeTab === 'sources' && (
<SourcesTab period={period} adminUserId={selectedUserId} />
)}
</>
)}
</div>

View File

@@ -0,0 +1,83 @@
import { useTranslation } from 'react-i18next'
const SEGMENT_COLORS = {
apikey: 'var(--color-primary)',
web: 'var(--color-info, #3b82f6)',
legacy: 'var(--color-warning, #f59e0b)',
}
// SourceMixRibbon renders one segmented horizontal bar showing the share of
// tokens by source class (apikey / web / legacy). Clicking a segment invokes
// onSelectSourceClass with the segment key so the parent can filter the view.
//
// Props:
// bySource: { apikey?: {tokens, requests}, web?: {...}, legacy?: {...} }
// keyCount: number of distinct API keys in the dataset (for the legend)
// onSelectSourceClass: (cls: 'apikey'|'web'|'legacy') => void (optional)
export default function SourceMixRibbon({ bySource = {}, keyCount = 0, onSelectSourceClass }) {
const { t } = useTranslation('admin')
const apikey = (bySource.apikey?.tokens) || 0
const web = (bySource.web?.tokens) || 0
const legacy = (bySource.legacy?.tokens) || 0
const total = apikey + web + legacy || 1
const pct = (n) => Math.round((n / total) * 100)
const apiPct = pct(apikey)
const webPct = pct(web)
const legacyPct = pct(legacy)
const segments = [
{ key: 'apikey', label: `${apiPct}% API keys (${keyCount})`, pct: apiPct, color: SEGMENT_COLORS.apikey },
{ key: 'web', label: `${webPct}% ${t('usage.sources.webUI')}`, pct: webPct, color: SEGMENT_COLORS.web },
{ key: 'legacy', label: `${legacyPct}% ${t('usage.sources.legacy')}`, pct: legacyPct, color: SEGMENT_COLORS.legacy },
].filter((s) => s.pct > 0)
return (
<div
role="group"
aria-label={t('usage.sources.ribbonAria', { apikey: apiPct, web: webPct, legacy: legacyPct })}
style={{ display: 'flex', flexDirection: 'column', gap: 'var(--spacing-xs)' }}
>
<div style={{ fontSize: '0.875rem', fontWeight: 600, color: 'var(--color-text-primary)' }}>
{t('usage.sources.mixTitle')}
</div>
<div
style={{
display: 'flex',
height: 12,
borderRadius: 'var(--radius-sm)',
overflow: 'hidden',
border: '1px solid var(--color-border-subtle)',
}}
>
{segments.map((s) => (
<button
key={s.key}
type="button"
onClick={() => onSelectSourceClass?.(s.key)}
aria-label={s.label}
style={{
width: `${s.pct}%`,
background: s.color,
border: 'none',
padding: 0,
cursor: onSelectSourceClass ? 'pointer' : 'default',
}}
/>
))}
</div>
<div style={{ display: 'flex', flexWrap: 'wrap', gap: 'var(--spacing-sm)', fontSize: '0.75rem' }}>
{segments.map((s) => (
<span key={s.key} style={{ display: 'inline-flex', alignItems: 'center', gap: 6 }}>
<span
style={{ width: 10, height: 10, borderRadius: 2, background: s.color, display: 'inline-block' }}
aria-hidden
/>
{s.label}
</span>
))}
</div>
</div>
)
}

View File

@@ -0,0 +1,147 @@
import { useMemo } from 'react'
import { useTranslation } from 'react-i18next'
const TOP_N = 7
// Distinct, accessible-ish series colors that read on both light and dark themes.
const SERIES_COLORS = [
'var(--color-primary)',
'var(--color-success, #10b981)',
'var(--color-warning, #f59e0b)',
'var(--color-info, #3b82f6)',
'var(--color-danger, #ef4444)',
'#a855f7',
'#ec4899',
]
const OTHER_COLOR = 'var(--color-text-muted, #94a3b8)'
function identityFor(bucket) {
return bucket.api_key_id || bucket.source || 'unknown'
}
// buckets: UsageBucket[] from /api/auth/usage/sources (server-sorted ASC by bucket)
// selectedKey: 'web' | 'legacy' | api_key_id | null
// totals: SourceTotals (for the "Other (count)" legend label)
export default function SourceTimeChart({ buckets = [], selectedKey, totals }) {
const { t } = useTranslation('admin')
// Find the top-N identities by total tokens across the period.
const topIds = useMemo(() => {
const sums = new Map()
for (const b of buckets) {
const id = identityFor(b)
sums.set(id, (sums.get(id) || 0) + (b.total_tokens || 0))
}
return [...sums.entries()]
.sort((a, b) => b[1] - a[1])
.slice(0, TOP_N)
.map(([id]) => id)
}, [buckets])
const topSet = useMemo(() => new Set(topIds), [topIds])
// Resolve a display label for an identity (api_key_id -> snapshotted name, or source name).
const labelByIdentity = useMemo(() => {
const m = new Map()
for (const b of buckets) {
const id = identityFor(b)
if (m.has(id)) continue
if (b.source === 'web') { m.set(id, t('usage.sources.webUI')); continue }
if (b.source === 'legacy') { m.set(id, t('usage.sources.legacy')); continue }
m.set(id, b.api_key_name || b.api_key_id || id)
}
return m
}, [buckets, t])
// Build a dense per-bucket row, splitting top-N vs Other.
const series = useMemo(() => {
const byBucket = new Map()
for (const b of buckets) {
const id = identityFor(b)
const seriesId = topSet.has(id) ? id : '__other__'
const row = byBucket.get(b.bucket) || { bucket: b.bucket, total: 0 }
row[seriesId] = (row[seriesId] || 0) + (b.total_tokens || 0)
row.total += b.total_tokens || 0
byBucket.set(b.bucket, row)
}
return [...byBucket.values()]
}, [buckets, topSet])
const max = useMemo(
() => series.reduce((m, r) => Math.max(m, r.total), 0) || 1,
[series]
)
const seriesIds = [...topIds, '__other__']
const colorOf = (id) =>
id === '__other__'
? OTHER_COLOR
: SERIES_COLORS[topIds.indexOf(id) % SERIES_COLORS.length]
const labelOfId = (id) => {
if (id === '__other__') return null // computed inline (need count)
return labelByIdentity.get(id) || id
}
const otherCount = Math.max(0, (totals?.by_key?.length || 0) - TOP_N)
// SVG geometry: 24px wide per bar (2px gap), 100px tall, viewBox stretches with bar count.
const barWidth = 20
const barGap = 4
const slotWidth = barWidth + barGap
const height = 100
const width = Math.max(series.length * slotWidth, 200)
return (
<div style={{ display: 'flex', flexDirection: 'column', gap: 'var(--spacing-xs)' }}>
<div style={{ fontSize: '0.875rem', fontWeight: 600, color: 'var(--color-text-primary)' }}>
{t('usage.sources.topSources')}
</div>
<svg
viewBox={`0 0 ${width} ${height}`}
preserveAspectRatio="none"
style={{ width: '100%', height: 160, display: 'block' }}
aria-hidden
>
{series.map((row, i) => {
let y = height
return (
<g key={row.bucket} transform={`translate(${i * slotWidth}, 0)`}>
{seriesIds.map(id => {
const v = row[id] || 0
if (!v) return null
const h = (v / max) * height
y -= h
const dim = selectedKey && selectedKey !== id ? 0.25 : 1
const title = id === '__other__'
? t('usage.sources.other', { count: otherCount })
: labelOfId(id)
return (
<rect
key={id}
x={barGap / 2} y={y}
width={barWidth} height={h}
fill={colorOf(id)} opacity={dim}
>
<title>{`${row.bucket} - ${title}: ${v.toLocaleString()}`}</title>
</rect>
)
})}
</g>
)
})}
</svg>
<div style={{ display: 'flex', flexWrap: 'wrap', gap: 'var(--spacing-sm)', fontSize: '0.75rem' }}>
{seriesIds.map(id => (
<span key={id} style={{ display: 'inline-flex', alignItems: 'center', gap: 6 }}>
<span style={{ width: 10, height: 10, borderRadius: 2, background: colorOf(id), display: 'inline-block' }} aria-hidden />
{id === '__other__'
? t('usage.sources.other', { count: otherCount })
: labelOfId(id)}
</span>
))}
</div>
</div>
)
}

View File

@@ -0,0 +1,176 @@
import { useEffect, useState } from 'react'
import { useTranslation } from 'react-i18next'
import { usageApi, apiKeysApi } from '../../utils/api'
import { useAuth } from '../../context/AuthContext'
import LoadingSpinner from '../../components/LoadingSpinner'
import SourceMixRibbon from './SourceMixRibbon'
import SourcesTable from './SourcesTable'
import SourceTimeChart from './SourceTimeChart'
const EMPTY_DATA = {
buckets: [],
totals: { by_source: {}, by_key: [], grand_total: { tokens: 0, requests: 0 } },
truncated: false,
}
// Resolve a human label for the currently selected key (web/legacy class or api_key_id).
function labelForSelected(totals, selectedKey, t) {
if (!selectedKey) return ''
if (selectedKey === 'web') return t('usage.sources.webUI')
if (selectedKey === 'legacy') return t('usage.sources.legacy')
const row = (totals?.by_key || []).find(k => k.api_key_id === selectedKey)
return row ? (row.api_key_name || selectedKey) : selectedKey
}
// SourcesTab fetches and renders per-source / per-API-key usage breakdown.
// Task 10 replaces the raw JSON / list placeholders with SourceMixRibbon and
// SourcesTable. Task 11 will add the time chart and drill-in chip.
export default function SourcesTab({ period, adminUserId }) {
const { t } = useTranslation('admin')
const { isAdmin } = useAuth()
const [data, setData] = useState(EMPTY_DATA)
const [loading, setLoading] = useState(false)
const [error, setError] = useState(null)
const [selectedKey, setSelectedKey] = useState(null)
const [search, setSearch] = useState('')
const [sortKey, setSortKey] = useState('tokens')
// Pull the current set of API key ids so the table can mark unknown keys as
// revoked. null = "don't know yet" so the table won't dim live keys during
// the fetch or after a failure.
const [existingKeyIds, setExistingKeyIds] = useState(null)
useEffect(() => {
apiKeysApi
.list()
.then((resp) => {
const list = Array.isArray(resp) ? resp : (resp?.keys || [])
setExistingKeyIds(new Set(list.map((k) => k.id)))
})
.catch(() => { /* leave existingKeyIds null so revoked detection is skipped */ })
}, [])
useEffect(() => {
let cancelled = false
setLoading(true)
setError(null)
const p = isAdmin
? usageApi.getAdminSources(period, adminUserId)
: usageApi.getMySources(period)
p
.then((d) => { if (!cancelled) setData(d || EMPTY_DATA) })
.catch((e) => { if (!cancelled) setError(e) })
.finally(() => { if (!cancelled) setLoading(false) })
return () => { cancelled = true }
}, [isAdmin, period, adminUserId])
const totals = data.totals || EMPTY_DATA.totals
const buckets = data.buckets || EMPTY_DATA.buckets
const grandT = totals.grand_total || { tokens: 0, requests: 0 }
const truncated = data.truncated || false
const isEmpty = !loading && (grandT.tokens || 0) === 0 && (grandT.requests || 0) === 0
if (loading) {
return (
<div style={{ display: 'flex', justifyContent: 'center', padding: 'var(--spacing-xl)' }}>
<LoadingSpinner size="lg" />
</div>
)
}
if (error) {
return (
<div className="empty-state">
<div className="empty-state-icon"><i className="fas fa-triangle-exclamation" /></div>
<h2 className="empty-state-title">Failed to load</h2>
<p className="empty-state-text">{String(error.message || error)}</p>
</div>
)
}
if (isEmpty) {
return (
<div className="empty-state">
<div className="empty-state-icon"><i className="fas fa-key" /></div>
<h2 className="empty-state-title">{t('usage.sources.noTrafficShort')}</h2>
<p className="empty-state-text">{t('usage.sources.noKeysYet')}</p>
</div>
)
}
return (
<div style={{ display: 'flex', flexDirection: 'column', gap: 'var(--spacing-md)' }}>
<div className="card" style={{ padding: 'var(--spacing-md)' }}>
<SourceMixRibbon
bySource={totals.by_source}
keyCount={(totals.by_key || []).length}
onSelectSourceClass={(cls) => setSelectedKey(cls)}
/>
</div>
{selectedKey && (
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--spacing-xs)' }}>
<span
style={{
display: 'inline-flex',
alignItems: 'center',
gap: 'var(--spacing-xs)',
padding: 'calc(var(--spacing-xs) / 2) var(--spacing-sm)',
background: 'var(--color-bg-secondary)',
color: 'var(--color-text-primary)',
fontSize: '0.75rem',
borderRadius: 'var(--radius-sm)',
border: '1px solid var(--color-border-subtle)',
}}
>
<i className="fas fa-filter" style={{ fontSize: '0.6875rem', color: 'var(--color-text-muted)' }} aria-hidden />
{t('usage.sources.filteredTo', { name: labelForSelected(totals, selectedKey, t) })}
<button
type="button"
onClick={() => setSelectedKey(null)}
aria-label={t('usage.sources.clearFilter')}
style={{
appearance: 'none',
background: 'transparent',
border: 'none',
color: 'var(--color-text-muted)',
cursor: 'pointer',
padding: 0,
fontSize: '0.875rem',
lineHeight: 1,
}}
>
<i className="fas fa-xmark" />
</button>
</span>
</div>
)}
<div className="card" style={{ padding: 'var(--spacing-md)' }}>
<SourceTimeChart buckets={buckets} selectedKey={selectedKey} totals={totals} />
</div>
<div className="card" style={{ padding: 'var(--spacing-md)' }}>
<SourcesTable
totals={totals}
selectedKey={selectedKey}
onSelectKey={setSelectedKey}
search={search}
setSearch={setSearch}
sortKey={sortKey}
setSortKey={setSortKey}
existingKeyIds={existingKeyIds}
showUserColumn={isAdmin}
/>
</div>
{truncated && (
<div style={{ fontSize: '0.75rem', color: 'var(--color-warning)' }}>
{t('usage.sources.truncatedWarning')}
</div>
)}
</div>
)
}

View File

@@ -0,0 +1,245 @@
import { useMemo } from 'react'
import { useTranslation } from 'react-i18next'
const SORT_FNS = {
tokens: (a, b) => (b.tokens || 0) - (a.tokens || 0),
requests: (a, b) => (b.requests || 0) - (a.requests || 0),
last_used: (a, b) => new Date(b.last_used || 0).getTime() - new Date(a.last_used || 0).getTime(),
name: (a, b) => (a.name || '').localeCompare(b.name || ''),
user: (a, b) => (a.userName || '').localeCompare(b.userName || ''),
}
function formatTokens(n) {
if (!n) return '0'
if (n >= 1_000_000) return (n / 1_000_000).toFixed(1) + 'M'
if (n >= 1_000) return (n / 1_000).toFixed(1) + 'k'
return String(n)
}
function formatRelative(iso) {
if (!iso) return '-'
const t = new Date(iso).getTime()
if (Number.isNaN(t) || t <= 0) return '-'
const diff = Date.now() - t
if (diff < 60_000) return 'just now'
if (diff < 3_600_000) return Math.round(diff / 60_000) + 'm ago'
if (diff < 86_400_000) return Math.round(diff / 3_600_000) + 'h ago'
return Math.round(diff / 86_400_000) + 'd ago'
}
// SourcesTable is the searchable, sortable list of key totals plus pseudo-rows
// for the web UI and legacy (unkeyed) source classes. Clicking a row selects
// it; the parent decides what to do with the selection (the drill-in panel
// will be wired in Task 11).
//
// Props:
// totals: SourceTotals payload (from /api/auth/usage/sources)
// selectedKey: currently-selected row id (api_key_id | 'web' | 'legacy' | null)
// onSelectKey: (id|null) => void
// search / setSearch: free-text filter state lifted to the parent
// sortKey / setSortKey: sort column state lifted to the parent
// existingKeyIds: Set<string> of current (non-revoked) api key ids, or null
// when the parent hasn't yet learned which keys exist. Null suppresses the
// revoked badge entirely so live keys aren't dimmed during the fetch or
// after a failure.
// showUserColumn: render the User column. Admin views set this true so the
// reader can attribute each key (and each Web UI row) to its owner.
export default function SourcesTable({
totals,
selectedKey,
onSelectKey,
search,
setSearch,
sortKey,
setSortKey,
existingKeyIds = null,
showUserColumn = false,
}) {
const { t } = useTranslation('admin')
const rows = useMemo(() => {
const named = (totals?.by_key || []).map((k) => ({
kind: 'apikey',
id: k.api_key_id,
name: k.api_key_name || k.api_key_id,
userID: k.user_id || '',
userName: k.user_name || '',
prefix: '',
tokens: k.tokens,
requests: k.requests,
last_used: k.last_used,
revoked: existingKeyIds != null && !existingKeyIds.has(k.api_key_id),
}))
// Pseudo-rows for sources that don't have a named key identity.
// In admin view (showUserColumn=true), prefer the per-user breakdown
// from totals.by_user_source so each user's Web UI / legacy traffic
// gets its own row. Otherwise fall back to the global by_source aggregate.
let unkeyed = []
if (showUserColumn && Array.isArray(totals?.by_user_source) && totals.by_user_source.length > 0) {
unkeyed = totals.by_user_source.map((r) => ({
kind: r.source,
id: r.source + ':' + (r.user_id || ''),
name: r.source === 'legacy' ? t('usage.sources.legacy') : t('usage.sources.webUI'),
userID: r.user_id || '',
userName: r.user_name || '',
prefix: '-',
tokens: r.tokens,
requests: r.requests,
}))
} else {
if (totals?.by_source?.web) {
unkeyed.push({
kind: 'web',
id: 'web',
name: t('usage.sources.webUI'),
userID: '',
userName: '',
prefix: '-',
tokens: totals.by_source.web.tokens,
requests: totals.by_source.web.requests,
})
}
if (totals?.by_source?.legacy) {
unkeyed.push({
kind: 'legacy',
id: 'legacy',
name: t('usage.sources.legacy'),
userID: '',
userName: '',
prefix: '-',
tokens: totals.by_source.legacy.tokens,
requests: totals.by_source.legacy.requests,
})
}
}
return [...named, ...unkeyed]
}, [totals, existingKeyIds, showUserColumn, t])
const filtered = useMemo(() => {
const q = (search || '').trim().toLowerCase()
const list = q
? rows.filter((r) =>
(r.name || '').toLowerCase().includes(q) ||
(r.prefix || '').toLowerCase().includes(q) ||
(r.userName || '').toLowerCase().includes(q) ||
(r.userID || '').toLowerCase().includes(q)
)
: rows
return [...list].sort(SORT_FNS[sortKey] || SORT_FNS.tokens)
}, [rows, search, sortKey])
const iconFor = (kind) =>
kind === 'apikey' ? 'fas fa-key' : kind === 'web' ? 'fas fa-globe' : 'fas fa-gear'
return (
<div style={{ display: 'flex', flexDirection: 'column', gap: 'var(--spacing-sm)' }}>
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--spacing-sm)', flexWrap: 'wrap' }}>
<input
type="search"
value={search}
onChange={(e) => setSearch(e.target.value)}
placeholder={t('usage.sources.searchPlaceholder')}
aria-label={t('usage.sources.searchPlaceholder')}
style={{
flex: '1 1 12rem',
minWidth: 160,
padding: 'var(--spacing-xs) var(--spacing-sm)',
border: '1px solid var(--color-border-subtle)',
borderRadius: 'var(--radius-sm)',
background: 'var(--color-bg-primary)',
color: 'var(--color-text-primary)',
}}
/>
<label style={{ display: 'inline-flex', alignItems: 'center', gap: 6, fontSize: '0.75rem' }}>
{t('usage.sources.sortBy')}:
<select
value={sortKey}
onChange={(e) => setSortKey(e.target.value)}
style={{
padding: 'calc(var(--spacing-xs) / 2) var(--spacing-xs)',
border: '1px solid var(--color-border-subtle)',
borderRadius: 'var(--radius-sm)',
background: 'var(--color-bg-primary)',
color: 'var(--color-text-primary)',
}}
>
<option value="tokens">{t('usage.sources.sortTokens')}</option>
<option value="requests">{t('usage.sources.sortRequests')}</option>
<option value="last_used">{t('usage.sources.sortLastUsed')}</option>
<option value="name">{t('usage.sources.sortName')}</option>
{showUserColumn && <option value="user">{t('usage.sources.sortUser')}</option>}
</select>
</label>
</div>
<div className="table-container">
<table className="table">
<thead>
<tr>
<th>{t('usage.sources.sortName')}</th>
{showUserColumn && <th style={{ width: 180 }}>{t('usage.sources.sortUser')}</th>}
<th style={{ width: 110 }}>Prefix</th>
<th style={{ width: 100, textAlign: 'right' }}>{t('usage.sources.sortRequests')}</th>
<th style={{ width: 100, textAlign: 'right' }}>{t('usage.sources.sortTokens')}</th>
<th style={{ width: 120, textAlign: 'right' }}>{t('usage.sources.sortLastUsed')}</th>
</tr>
</thead>
<tbody>
{filtered.map((r) => {
const isSel = selectedKey === r.id
return (
<tr
key={r.id}
onClick={() => onSelectKey?.(isSel ? null : r.id)}
style={{
cursor: 'pointer',
background: isSel ? 'var(--color-bg-secondary)' : undefined,
opacity: r.revoked ? 0.5 : 1,
}}
>
<td>
<span style={{ display: 'inline-flex', alignItems: 'center', gap: 8 }}>
<i
className={iconFor(r.kind)}
style={{ color: 'var(--color-text-muted)', fontSize: '0.8125rem' }}
/>
<span>{r.name}</span>
{r.revoked && (
<span
style={{
fontSize: '0.6875rem',
textTransform: 'uppercase',
color: 'var(--color-text-muted)',
}}
>
({t('usage.sources.revoked')})
</span>
)}
</span>
</td>
{showUserColumn && (
<td style={{ color: 'var(--color-text-secondary)', fontSize: '0.8125rem' }}>
{r.userName || r.userID || '-'}
</td>
)}
<td style={{ color: 'var(--color-text-muted)', fontSize: '0.75rem' }}>{r.prefix || '-'}</td>
<td style={{ textAlign: 'right', fontFamily: 'var(--font-mono)' }}>
{Number(r.requests || 0).toLocaleString()}
</td>
<td style={{ textAlign: 'right', fontFamily: 'var(--font-mono)' }}>
{formatTokens(r.tokens || 0)}
</td>
<td style={{ textAlign: 'right', fontSize: '0.75rem', color: 'var(--color-text-muted)' }}>
{formatRelative(r.last_used)}
</td>
</tr>
)
})}
</tbody>
</table>
</div>
</div>
)
}

View File

@@ -422,6 +422,14 @@ export const usageApi = {
if (userId) url += `&user_id=${encodeURIComponent(userId)}`
return fetchJSON(url)
},
getMySources: (period) =>
fetchJSON(`/api/auth/usage/sources?period=${period || 'month'}`),
getAdminSources: (period, userId, apiKeyId) => {
let url = `/api/auth/admin/usage/sources?period=${period || 'month'}`
if (userId) url += `&user_id=${encodeURIComponent(userId)}`
if (apiKeyId) url += `&api_key_id=${encodeURIComponent(apiKeyId)}`
return fetchJSON(url)
},
getMyQuotas: () => fetchJSON('/api/auth/quota'),
}

View File

@@ -789,6 +789,30 @@ func RegisterAuthRoutes(e *echo.Echo, app *application.Application) {
})
})
// GET /api/auth/usage/sources - caller's per-source breakdown (no legacy)
e.GET("/api/auth/usage/sources", func(c echo.Context) error {
user := auth.GetUser(c)
if user == nil {
return c.JSON(http.StatusUnauthorized, map[string]string{"error": "not authenticated"})
}
period := c.QueryParam("period")
if period == "" {
period = "month"
}
buckets, totals, err := auth.GetUserUsageBySource(db, user.ID, period)
if err != nil {
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "failed to get usage"})
}
return c.JSON(http.StatusOK, map[string]any{
"buckets": buckets,
"totals": totals,
"truncated": false,
})
})
// Admin endpoints
adminMw := auth.RequireAdmin()
@@ -1104,6 +1128,27 @@ func RegisterAuthRoutes(e *echo.Echo, app *application.Application) {
})
}, adminMw)
// GET /api/auth/admin/usage/sources - all users' per-source breakdown (admin only)
e.GET("/api/auth/admin/usage/sources", func(c echo.Context) error {
period := c.QueryParam("period")
if period == "" {
period = "month"
}
userID := c.QueryParam("user_id")
apiKeyID := c.QueryParam("api_key_id")
buckets, totals, truncated, err := auth.GetAllUsageBySource(db, period, userID, apiKeyID)
if err != nil {
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "failed to get usage"})
}
return c.JSON(http.StatusOK, map[string]any{
"buckets": buckets,
"totals": totals,
"truncated": truncated,
})
}, adminMw)
// --- Invite management endpoints ---
// POST /api/auth/admin/invites - create invite (admin only)

View File

@@ -286,6 +286,45 @@ func newTestAuthApp(db *gorm.DB, appConfig *config.ApplicationConfig) *echo.Echo
return c.JSON(http.StatusOK, map[string]string{"message": "user deleted"})
}, adminMw)
// Mirror of production handler in routes/auth.go GET /api/auth/usage/sources.
// Keep this body in sync with the real handler; this test app cannot call
// RegisterAuthRoutes because it needs a *application.Application.
e.GET("/api/auth/usage/sources", func(c echo.Context) error {
user := auth.GetUser(c)
if user == nil {
return c.JSON(http.StatusUnauthorized, map[string]string{"error": "not authenticated"})
}
period := c.QueryParam("period")
if period == "" {
period = "month"
}
buckets, totals, err := auth.GetUserUsageBySource(db, user.ID, period)
if err != nil {
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "failed to get usage"})
}
return c.JSON(http.StatusOK, map[string]any{
"buckets": buckets, "totals": totals, "truncated": false,
})
})
// Mirror of production handler in routes/auth.go GET /api/auth/admin/usage/sources.
// Keep this body in sync with the real handler.
e.GET("/api/auth/admin/usage/sources", func(c echo.Context) error {
period := c.QueryParam("period")
if period == "" {
period = "month"
}
userID := c.QueryParam("user_id")
apiKeyID := c.QueryParam("api_key_id")
buckets, totals, truncated, err := auth.GetAllUsageBySource(db, period, userID, apiKeyID)
if err != nil {
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "failed to get usage"})
}
return c.JSON(http.StatusOK, map[string]any{
"buckets": buckets, "totals": totals, "truncated": truncated,
})
}, adminMw)
// Regular API endpoint for testing
e.POST("/v1/chat/completions", func(c echo.Context) error {
return c.String(http.StatusOK, "ok")
@@ -931,4 +970,110 @@ var _ = Describe("Auth Routes", Label("auth"), func() {
Expect(providers).To(ContainElement(auth.ProviderGitHub))
})
})
Describe("GET /api/auth/usage/sources", func() {
It("returns only the caller's data, never legacy", func() {
app := newTestAuthApp(db, appConfig)
alice := createRouteTestUser(db, "alice@example.com", auth.RoleUser)
aliceToken, err := auth.CreateSession(db, alice.ID, "")
Expect(err).ToNot(HaveOccurred())
keyID := "k-alice"
now := time.Now()
Expect(auth.RecordUsage(db, &auth.UsageRecord{
UserID: alice.ID, Source: auth.UsageSourceAPIKey,
APIKeyID: &keyID, APIKeyName: "alice-key",
Model: "gpt-4", TotalTokens: 100, CreatedAt: now,
})).To(Succeed())
Expect(auth.RecordUsage(db, &auth.UsageRecord{
UserID: alice.ID, Source: auth.UsageSourceWeb,
Model: "gpt-4", TotalTokens: 50, CreatedAt: now,
})).To(Succeed())
Expect(auth.RecordUsage(db, &auth.UsageRecord{
UserID: "legacy-api-key", Source: auth.UsageSourceLegacy,
Model: "gpt-4", TotalTokens: 30, CreatedAt: now,
})).To(Succeed())
rec := doAuthRequest(app, http.MethodGet, "/api/auth/usage/sources?period=month", nil, withSession(aliceToken))
Expect(rec.Code).To(Equal(http.StatusOK))
var resp struct {
Buckets []auth.UsageBucket `json:"buckets"`
Totals auth.SourceTotals `json:"totals"`
Truncated bool `json:"truncated"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
_, hasLegacy := resp.Totals.BySource[auth.UsageSourceLegacy]
Expect(hasLegacy).To(BeFalse())
Expect(resp.Totals.GrandTotal.Tokens).To(Equal(int64(150)))
Expect(resp.Truncated).To(BeFalse())
})
It("returns 401 when unauthenticated", func() {
app := newTestAuthApp(db, appConfig)
// Without a session cookie or bearer token, the global auth middleware
// should refuse the request before our handler runs.
rec := doAuthRequest(app, http.MethodGet, "/api/auth/usage/sources?period=month", nil)
Expect(rec.Code).To(Equal(http.StatusUnauthorized))
})
})
Describe("GET /api/auth/admin/usage/sources", func() {
It("returns 403 for non-admin", func() {
app := newTestAuthApp(db, appConfig)
alice := createRouteTestUser(db, "alice@example.com", auth.RoleUser)
aliceToken, _ := auth.CreateSession(db, alice.ID, "")
rec := doAuthRequest(app, http.MethodGet, "/api/auth/admin/usage/sources?period=month", nil, withSession(aliceToken))
Expect(rec.Code).To(Equal(http.StatusForbidden))
})
It("returns legacy bucket for admin and applies api_key_id filter", func() {
app := newTestAuthApp(db, appConfig)
admin := createRouteTestUser(db, "admin@example.com", auth.RoleAdmin)
adminToken, _ := auth.CreateSession(db, admin.ID, "")
k1 := "k1"
k2 := "k2"
now := time.Now()
Expect(auth.RecordUsage(db, &auth.UsageRecord{UserID: "alice", Source: auth.UsageSourceAPIKey, APIKeyID: &k1, APIKeyName: "ci", Model: "gpt-4", TotalTokens: 10, CreatedAt: now})).To(Succeed())
Expect(auth.RecordUsage(db, &auth.UsageRecord{UserID: "alice", Source: auth.UsageSourceAPIKey, APIKeyID: &k2, APIKeyName: "lap", Model: "gpt-4", TotalTokens: 20, CreatedAt: now})).To(Succeed())
Expect(auth.RecordUsage(db, &auth.UsageRecord{UserID: "legacy-api-key", Source: auth.UsageSourceLegacy, Model: "gpt-4", TotalTokens: 5, CreatedAt: now})).To(Succeed())
rec := doAuthRequest(app, http.MethodGet,
"/api/auth/admin/usage/sources?period=month&api_key_id=k2", nil, withSession(adminToken))
Expect(rec.Code).To(Equal(http.StatusOK))
var resp struct {
Totals auth.SourceTotals `json:"totals"`
Truncated bool `json:"truncated"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp.Totals.GrandTotal.Tokens).To(Equal(int64(20)))
})
It("includes legacy in by_source for admin with no filter", func() {
app := newTestAuthApp(db, appConfig)
admin := createRouteTestUser(db, "admin@example.com", auth.RoleAdmin)
adminToken, _ := auth.CreateSession(db, admin.ID, "")
now := time.Now()
Expect(auth.RecordUsage(db, &auth.UsageRecord{UserID: "legacy-api-key", Source: auth.UsageSourceLegacy, Model: "gpt-4", TotalTokens: 7, CreatedAt: now})).To(Succeed())
rec := doAuthRequest(app, http.MethodGet, "/api/auth/admin/usage/sources?period=month", nil, withSession(adminToken))
Expect(rec.Code).To(Equal(http.StatusOK))
var resp struct {
Totals auth.SourceTotals `json:"totals"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp.Totals.BySource).To(HaveKey(auth.UsageSourceLegacy))
Expect(resp.Totals.BySource[auth.UsageSourceLegacy].Tokens).To(Equal(int64(7)))
})
})
})

View File

@@ -6,7 +6,9 @@ import (
"strings"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"gorm.io/gorm"
)
@@ -53,7 +55,12 @@ func RegisterNodeSelfServiceRoutes(e *echo.Echo, registry *nodes.NodeRegistry, r
// RegisterNodeAdminRoutes registers /api/nodes/ endpoints used by admins
// (list, get, get models, drain, delete, approve, backend management). Protected by admin middleware.
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string) {
//
// galleryService/opcache/appConfig are threaded in for the async node-scoped
// backend install path (POST /:id/backends/install). That handler enqueues a
// ManagementOp on the gallery channel rather than blocking on a NATS reply, so
// the browser gets HTTP 202 + jobID immediately instead of waiting up to 3 minutes.
func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloader nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig, adminMw echo.MiddlewareFunc, authDB *gorm.DB, hmacSecret string, registrationToken string) {
if registry == nil {
return
}
@@ -78,7 +85,7 @@ func RegisterNodeAdminRoutes(e *echo.Echo, registry *nodes.NodeRegistry, unloade
// Backend management on workers
admin.GET("/:id/backends", localai.ListBackendsOnNodeEndpoint(unloader))
admin.POST("/:id/backends/install", localai.InstallBackendOnNodeEndpoint(unloader))
admin.POST("/:id/backends/install", localai.InstallBackendOnNodeEndpoint(unloader, galleryService, opcache, appConfig))
admin.POST("/:id/backends/delete", localai.DeleteBackendOnNodeEndpoint(unloader))
// Model management on workers

View File

@@ -214,6 +214,17 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
}
}
// Node-scoped backend ops (from /api/nodes/:id/backends/install)
// carry the nodeID inside the opcache key as "node:<nodeID>:<backend>".
// Pull it back out so the operations panel can label which node the
// install is targeting, and so the display name is just the backend
// slug instead of the full prefixed key.
scopedNodeID := ""
if nodeID, backend, ok := galleryop.ParseNodeScopedKey(galleryID); ok {
scopedNodeID = nodeID
galleryID = backend
}
// Extract display name (remove repo prefix if exists)
displayName := galleryID
if strings.Contains(galleryID, "@") {
@@ -237,6 +248,12 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
"cancellable": isCancellable,
"message": message,
}
// Only attach nodeID when this op was node-scoped: an empty string
// would mislead the UI into rendering a node attribution that never
// existed in the first place.
if scopedNodeID != "" {
opData["nodeID"] = scopedNodeID
}
if status != nil && status.Error != nil {
opData["error"] = status.Error.Error()
}

View File

@@ -0,0 +1,98 @@
package routes_test
import (
"encoding/json"
"net/http"
"net/http/httptest"
"github.com/labstack/echo/v4"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/application"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http/routes"
"github.com/mudler/LocalAI/core/services/galleryop"
)
// These specs guard the contract between the opcache (which stores
// node-scoped backend installs under a "node:<nodeID>:<backend>" key) and the
// /api/operations response surface the React UI polls. Without nodeID
// extraction the panel would show the raw prefixed key and have no way to
// label which worker an install is targeting.
var _ = Describe("/api/operations with node-scoped backend ops", func() {
// We pass a zero-value *application.Application because the handler's
// distributed-services branch guards on a nil check on the returned
// *DistributedServices, which is nil for a fresh Application{}.
noopMw := func(next echo.HandlerFunc) echo.HandlerFunc { return next }
It("emits nodeID and the un-prefixed backend name for keys built by NodeScopedKey", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
key := galleryop.NodeScopedKey("worker-7", "llama-cpp")
opcache.SetBackend(key, "job-uuid-123")
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
// The handler wraps operations in {"operations": [...]}.
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == "job-uuid-123" {
found = op
break
}
}
Expect(found).ToNot(BeNil(), "node-scoped op should appear in /api/operations")
Expect(found["nodeID"]).To(Equal("worker-7"))
Expect(found["name"]).To(Equal("llama-cpp"))
Expect(found["isBackend"]).To(Equal(true))
})
It("does not emit nodeID for non-node-scoped backend ops", func() {
appCfg := &config.ApplicationConfig{}
galleryService := galleryop.NewGalleryService(appCfg, nil)
opcache := galleryop.NewOpCache(galleryService)
// Legacy/global install path: bare backend name as the opcache key.
opcache.SetBackend("llama-cpp", "job-uuid-456")
e := echo.New()
routes.RegisterUIAPIRoutes(e, nil, nil, appCfg, galleryService, opcache, &application.Application{}, noopMw)
req := httptest.NewRequest(http.MethodGet, "/api/operations", nil)
rec := httptest.NewRecorder()
e.ServeHTTP(rec, req)
Expect(rec.Code).To(Equal(http.StatusOK))
var envelope struct {
Operations []map[string]any `json:"operations"`
}
Expect(json.Unmarshal(rec.Body.Bytes(), &envelope)).To(Succeed())
var found map[string]any
for _, op := range envelope.Operations {
if op["jobID"] == "job-uuid-456" {
found = op
break
}
}
Expect(found).ToNot(BeNil())
// Critical: bare ops must NOT gain a misleading empty nodeID field.
Expect(found).ToNot(HaveKey("nodeID"), "non-node-scoped ops must NOT carry a nodeID field")
Expect(found["name"]).To(Equal("llama-cpp"))
})
})

View File

@@ -196,4 +196,60 @@ var _ = Describe("ManagementOp with External Backend", func() {
Expect(op.ExternalName).To(Equal("test-backend"))
Expect(op.ExternalAlias).To(Equal("test-alias"))
})
Context("TargetNodeID field", func() {
It("defaults to empty string", func() {
op := galleryop.ManagementOp[string, string]{
ExternalURI: "oci://example.com/backend:latest",
}
Expect(op.TargetNodeID).To(BeEmpty())
})
It("preserves TargetNodeID across a channel send", func() {
ch := make(chan galleryop.ManagementOp[string, string], 1)
ch <- galleryop.ManagementOp[string, string]{
GalleryElementName: "llama-cpp",
TargetNodeID: "node-abc-123",
}
received := <-ch
Expect(received.TargetNodeID).To(Equal("node-abc-123"))
Expect(received.GalleryElementName).To(Equal("llama-cpp"))
})
})
Describe("NodeScopedKey", func() {
It("builds a unique key per (nodeID, backend) pair", func() {
Expect(galleryop.NodeScopedKey("node-a", "llama-cpp")).To(Equal("node:node-a:llama-cpp"))
Expect(galleryop.NodeScopedKey("node-b", "llama-cpp")).To(Equal("node:node-b:llama-cpp"))
Expect(galleryop.NodeScopedKey("node-a", "vllm")).To(Equal("node:node-a:vllm"))
})
It("handles backend names containing colons", func() {
// Gallery IDs sometimes look like "official@llama-cpp"; nodeIDs are UUIDs
// without colons, but the backend slug may contain anything. Splitting on
// the first colon after the prefix MUST yield the full backend back.
key := galleryop.NodeScopedKey("node-1", "official@llama-cpp:v2")
node, backend, ok := galleryop.ParseNodeScopedKey(key)
Expect(ok).To(BeTrue())
Expect(node).To(Equal("node-1"))
Expect(backend).To(Equal("official@llama-cpp:v2"))
})
It("rejects keys without the node prefix", func() {
_, _, ok := galleryop.ParseNodeScopedKey("llama-cpp")
Expect(ok).To(BeFalse())
_, _, ok = galleryop.ParseNodeScopedKey("official@llama-cpp")
Expect(ok).To(BeFalse())
})
It("rejects malformed node-prefixed keys", func() {
_, _, ok := galleryop.ParseNodeScopedKey("node:only-one-segment")
Expect(ok).To(BeFalse())
})
It("rejects keys with an empty nodeID segment", func() {
_, _, ok := galleryop.ParseNodeScopedKey("node::llama-cpp")
Expect(ok).To(BeFalse())
})
})
})

View File

@@ -2,6 +2,7 @@ package galleryop
import (
"context"
"strings"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/pkg/xsync"
@@ -30,6 +31,12 @@ type ManagementOp[T any, E any] struct {
ExternalName string // Custom name for the backend
ExternalAlias string // Custom alias for the backend
// TargetNodeID scopes a backend install/upgrade to a single worker node.
// Empty means fan out to every healthy backend node (the previous behavior).
// Set by InstallBackendOnNodeEndpoint so an admin can install a hardware-specific
// build on one node without touching the rest of the cluster.
TargetNodeID string
// Upgrade is true if this is an upgrade operation (not a fresh install)
Upgrade bool
}
@@ -115,3 +122,31 @@ func (m *OpCache) GetStatus() (map[string]string, map[string]string) {
return processingModelsData, taskTypes
}
// NodeScopedKeyPrefix is the opcache key prefix used by InstallBackendOnNodeEndpoint
// so per-node installs do not collide on the bare backend name. Format:
// "node:<nodeID>:<backend>". Read by /api/operations to extract nodeID for the UI.
const NodeScopedKeyPrefix = "node:"
// NodeScopedKey returns the opcache key for a node-scoped backend operation.
// The prefix lets ParseNodeScopedKey detach the nodeID back out so the
// operations endpoint can surface it without storing nodeID separately.
func NodeScopedKey(nodeID, backend string) string {
return NodeScopedKeyPrefix + nodeID + ":" + backend
}
// ParseNodeScopedKey extracts (nodeID, backend) from a key built by NodeScopedKey.
// Returns ok=false for keys that lack the prefix or are missing the nodeID or
// backend segment. Backend names containing colons are preserved because we
// split on the first colon after the prefix only.
func ParseNodeScopedKey(key string) (nodeID, backend string, ok bool) {
rest, hasPrefix := strings.CutPrefix(key, NodeScopedKeyPrefix)
if !hasPrefix {
return "", "", false
}
nodeID, backend, ok = strings.Cut(rest, ":")
if !ok || nodeID == "" || backend == "" {
return "", "", false
}
return nodeID, backend, true
}

View File

@@ -331,13 +331,23 @@ func (d *DistributedBackendManager) ListBackends() (gallery.SystemBackends, erro
// non-healthy nodes get retried when they come back instead of being silently
// skipped. Reply success from the NATS round-trip deletes the queue row;
// reply.Success==false is treated as an error so the row stays for retry.
//
// When op.TargetNodeID is set, only that node is visited - the same allowlist
// path UpgradeBackend uses. Empty TargetNodeID preserves the original fan-out
// behavior so the periodic reconciler and /api/backends/install/:id keep
// working unchanged.
func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *galleryop.ManagementOp[gallery.GalleryBackend, any], progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(op.Galleries)
backendName := op.GalleryElementName
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error {
var targetNodeIDs map[string]bool
if op.TargetNodeID != "" {
targetNodeIDs = map[string]bool{op.TargetNodeID: true}
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 the worker's processKey is "backend#0" when no
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
if err != nil {

View File

@@ -311,6 +311,47 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(mgr.InstallBackend(ctx, op("vllm-development"), nil)).To(Succeed())
})
})
Context("when op.TargetNodeID is set to a healthy node", func() {
It("installs only on that node, leaving the others untouched", func() {
target := registerHealthyBackend("worker-target", "10.0.0.1:50051")
other := registerHealthyBackend("worker-other", "10.0.0.2:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(target.ID),
messaging.BackendInstallReply{Success: true, Address: "10.0.0.1:50100"})
// No reply scripted for `other`: if InstallBackend fans out
// to it, the fakeNoRespondersErr default would surface and
// the test would fail.
targetedOp := &galleryop.ManagementOp[gallery.GalleryBackend, any]{
GalleryElementName: "llama-cpp",
TargetNodeID: target.ID,
}
Expect(mgr.InstallBackend(ctx, targetedOp, nil)).To(Succeed())
mc.mu.Lock()
defer mc.mu.Unlock()
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Subject).To(Equal(messaging.SubjectNodeBackendInstall(target.ID)))
Expect(mc.calls[0].Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(other.ID)))
})
})
Context("when op.TargetNodeID is set to a node that does not exist", func() {
It("returns nil without sending any NATS request", func() {
registerHealthyBackend("worker-a", "10.0.0.1:50051")
ghostOp := &galleryop.ManagementOp[gallery.GalleryBackend, any]{
GalleryElementName: "llama-cpp",
TargetNodeID: "this-id-does-not-exist",
}
Expect(mgr.InstallBackend(ctx, ghostOp, nil)).To(Succeed())
mc.mu.Lock()
defer mc.mu.Unlock()
Expect(mc.calls).To(BeEmpty())
})
})
})
Describe("UpgradeBackend", func() {

View File

@@ -0,0 +1,115 @@
package skills_test
import (
"context"
"encoding/json"
"os"
"testing"
"time"
"github.com/modelcontextprotocol/go-sdk/mcp"
agiSkills "github.com/mudler/LocalAGI/services/skills"
localskills "github.com/mudler/LocalAI/core/services/skills"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSkillsMCP(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Skills MCP test")
}
// listSkillsResult mirrors the output struct of skillserver's list_skills tool.
type listSkillsResult struct {
Skills []struct {
ID string `json:"id"`
Description string `json:"description,omitempty"`
} `json:"skills"`
}
// Exercises the same wire the agent uses at runtime: open an in-process
// MCP session via LocalAGI's skills.Service, create a skill through the
// LocalAI FilesystemManager, then list_skills on the still-open session.
// Guards against regressions in the manager <-> MCP session lifecycle
// (e.g. cached manager not picking up newly-created skills).
var _ = Describe("Skills exposed to agent via MCP", func() {
var (
stateDir string
svc *agiSkills.Service
ctx context.Context
cancel context.CancelFunc
)
BeforeEach(func() {
var err error
stateDir, err = os.MkdirTemp("", "skills-mcp-test")
Expect(err).NotTo(HaveOccurred())
// Create the LocalAGI skills service (this is what AgentPoolService wires
// into LocalAGI's state.NewAgentPool for MCP session exposure).
svc, err = agiSkills.NewService(stateDir)
Expect(err).NotTo(HaveOccurred())
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
})
AfterEach(func() {
cancel()
Expect(os.RemoveAll(stateDir)).To(Succeed())
})
It("returns a skill created after the MCP session was established", func() {
// Open the MCP session first — this is what the agent does at startup
// with EnableSkills=true, before any skill might exist.
session, err := svc.GetMCPSession(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(session).NotTo(BeNil())
res, err := session.CallTool(ctx, &mcp.CallToolParams{Name: "list_skills"})
Expect(err).NotTo(HaveOccurred())
Expect(res.IsError).To(BeFalse())
var initial listSkillsResult
Expect(decodeMCPText(res, &initial)).To(Succeed())
Expect(initial.Skills).To(BeEmpty(), "no skills should exist initially")
// Create a skill via the LocalAI FilesystemManager — same code path the
// /api/agents/skills POST endpoint takes.
mgr := localskills.NewFilesystemManager(svc)
_, err = mgr.Create("talk-like-pirate", "Talk like a pirate", "Speak in pirate-style.", "", "", "", nil)
Expect(err).NotTo(HaveOccurred())
// Re-list via the SAME already-open session: the manager is shared,
// so a freshly-created skill must be visible without re-attaching.
res, err = session.CallTool(ctx, &mcp.CallToolParams{Name: "list_skills"})
Expect(err).NotTo(HaveOccurred())
Expect(res.IsError).To(BeFalse())
var got listSkillsResult
Expect(decodeMCPText(res, &got)).To(Succeed())
ids := make([]string, 0, len(got.Skills))
for _, s := range got.Skills {
ids = append(ids, s.ID)
}
Expect(ids).To(ContainElement("talk-like-pirate"))
})
})
func mcpText(res *mcp.CallToolResult) string {
text := ""
for _, c := range res.Content {
if tc, ok := c.(*mcp.TextContent); ok {
text += tc.Text
}
}
return text
}
func decodeMCPText(res *mcp.CallToolResult, out any) error {
text := mcpText(res)
if text == "" {
return nil
}
return json.Unmarshal([]byte(text), out)
}

View File

@@ -444,11 +444,15 @@ These llama.cpp options are passed through the `options:` array.
### Prompt Caching
The recommended way to enable prompt caching for the `llama-cpp` backend is the **server-side prompt cache** controlled by `cache_ram` / `kv_unified` / `cache_idle_slots` in the `options:` array (see [llama.cpp backend options]({{%relref "features/text-generation#server-side-prompt-cache-repeated-system-prompts" %}})). It's on by default since LocalAI v4.3 and is what gives repeated system prompts a near-zero prefill on the second call.
The fields below come from upstream llama.cpp's **CLI completion tool** and are passed through to the gRPC backend for compatibility, but the gRPC server itself does not consume them: keep them empty unless you're targeting a non-llama-cpp backend that reads them.
| Field | Type | Description |
|-------|------|-------------|
| `prompt_cache_path` | string | Path to store prompt cache (relative to models directory) |
| `prompt_cache_all` | bool | Cache all prompts automatically |
| `prompt_cache_ro` | bool | Read-only prompt cache |
| `prompt_cache_path` | string | (legacy / unused by llama-cpp gRPC server) Path to a file-backed prompt cache for upstream's CLI completion tool. |
| `prompt_cache_all` | bool | (legacy / unused by llama-cpp gRPC server) |
| `prompt_cache_ro` | bool | (legacy / unused by llama-cpp gRPC server) |
### Text Processing

View File

@@ -253,10 +253,12 @@ User API keys inherit the creating user's role. Admin keys grant admin access; u
| `GET` | `/api/auth/api-keys` | List user's API keys | Yes |
| `DELETE` | `/api/auth/api-keys/:id` | Revoke API key | Yes |
| `GET` | `/api/auth/usage` | User's own usage stats | Yes |
| `GET` | `/api/auth/usage/sources` | User's own per-API-key / per-source breakdown | Yes |
| `GET` | `/api/auth/admin/users` | List all users | Admin |
| `PUT` | `/api/auth/admin/users/:id/role` | Change user role | Admin |
| `DELETE` | `/api/auth/admin/users/:id` | Delete user | Admin |
| `GET` | `/api/auth/admin/usage` | All users' usage stats | Admin |
| `GET` | `/api/auth/admin/usage/sources` | All users' per-API-key / per-source breakdown | Admin |
| `POST` | `/api/auth/admin/invites` | Create invite link | Admin |
| `GET` | `/api/auth/admin/invites` | List all invites | Admin |
| `DELETE` | `/api/auth/admin/invites/:id` | Revoke unused invite | Admin |
@@ -327,10 +329,79 @@ curl "http://localhost:8080/api/auth/admin/usage?period=month&user_id=<user-id>"
### Usage Dashboard
The web UI Usage page provides:
- **Period selector** switch between day, week, month, and all-time views
- **Summary cards** total requests, prompt tokens, completion tokens, total tokens
- **By Model table** per-model breakdown with visual usage bars
- **By User table** (admin only) per-user breakdown across all models
- **Period selector** - switch between day, week, month, and all-time views
- **Summary cards** - total requests, prompt tokens, completion tokens, total tokens
- **By Model table** - per-model breakdown with visual usage bars
- **By User table** (admin only) - per-user breakdown across all models
- **Sources tab** - per-API-key and per-source breakdown (described below)
### Per-API-key Breakdown
The **Sources** tab on the Usage page surfaces a third dimension of the same data: traffic broken down by API key and by request source. Three source classes are tracked:
- **API key** - request authenticated with a named user API key (`Authorization: Bearer lai-...`, `x-api-key`, or `token` cookie). Each key shows up with its label (snapshotted at write time, so revoked keys still display the original name).
- **Web UI** - request authenticated with a browser session cookie.
- **Legacy** - request authenticated with an env-configured `LOCALAI_API_KEY`. Visible to admins only.
The Sources tab is visible to every authenticated user. Non-admins see only their own keys plus their own Web UI traffic (legacy is filtered server-side). Admins see every key from every user.
The tab is laid out as:
- A **source mix ribbon** showing the percentage split across the three classes.
- A **top-N + Other stacked time chart** (top 7 sources by total tokens; the rest roll up).
- A **searchable, sortable table** of every key plus the Web UI and Legacy pseudo-rows. Click a row to filter the chart to that source.
#### Endpoints
| Method | Path | Auth | Description |
|--------|------|------|-------------|
| `GET` | `/api/auth/usage/sources` | Self | Caller's per-source breakdown. Excludes legacy. |
| `GET` | `/api/auth/admin/usage/sources` | Admin | All users' per-source breakdown. Accepts `user_id` and `api_key_id` filters. Includes legacy. |
Both endpoints accept the same `period` parameter (`day`, `week`, `month`, `all`) as `/api/auth/usage`.
```bash
# Your own per-source usage for the last week
curl "http://localhost:8080/api/auth/usage/sources?period=week" \
-H "Authorization: Bearer <key>"
# Admin: filter to a single API key across all users
curl "http://localhost:8080/api/auth/admin/usage/sources?period=month&api_key_id=<key-id>" \
-H "Authorization: Bearer <admin-key>"
```
**Response shape:**
```json
{
"buckets": [
{ "bucket": "2026-05-19", "source": "apikey",
"api_key_id": "uuid", "api_key_name": "ci-runner",
"total_tokens": 20000, "request_count": 142, "...": "..." },
{ "bucket": "2026-05-19", "source": "web",
"total_tokens": 300, "request_count": 11, "...": "..." }
],
"totals": {
"by_source": {
"apikey": { "tokens": 1234567, "requests": 8420 },
"web": { "tokens": 92000, "requests": 211 }
},
"by_key": [
{ "api_key_id": "uuid", "api_key_name": "ci-runner",
"tokens": 2100000, "requests": 8420,
"last_used": "2026-05-20T12:34:56Z" }
],
"grand_total": { "tokens": 1334777, "requests": 8645 }
},
"truncated": false
}
```
The `by_key` list is server-sorted by tokens descending and capped at 200 entries. When more keys would qualify, the response sets `"truncated": true` so the UI can show a notice.
#### Migration of pre-feature data
Usage rows recorded before this feature have no `source` column. On startup, `InitDB` backfills them as `legacy` when the synthetic `legacy-api-key` user_id was used, and `web` for everything else. The migration is idempotent; existing aggregations remain correct after the upgrade.
## Combining Auth Modes

View File

@@ -499,7 +499,7 @@ The `llama.cpp` backend supports additional configuration options that can be sp
|--------|------|-------------|---------|
| `use_jinja` or `jinja` | boolean | Enable Jinja2 template processing for chat templates. When enabled, the backend uses Jinja2-based chat templates from the model for formatting messages. | `use_jinja:true` |
| `context_shift` | boolean | Enable context shifting, which allows the model to dynamically adjust context window usage. | `context_shift:true` |
| `cache_ram` | integer | Set the maximum RAM cache size in MiB for KV cache. Use `-1` for unlimited (default). | `cache_ram:2048` |
| `cache_ram` | integer | Size budget in MiB for the **server-side prompt cache** (a host-RAM store of idle slot KV states that's reloaded on a prompt-prefix hit, see [upstream PR #16391](https://github.com/ggml-org/llama.cpp/pull/16391)). Default: `-1` (no limit). `0` disables the prompt cache entirely. Together with `kv_unified` and `cache_idle_slots` this is what makes a repeated system prompt skip prefill on subsequent calls. | `cache_ram:4096` |
| `parallel` or `n_parallel` | integer | Enable parallel request processing. When set to a value greater than 1, enables continuous batching for handling multiple requests concurrently. | `parallel:4` |
| `grpc_servers` or `rpc_servers` | string | Comma-separated list of gRPC server addresses for distributed inference. Allows distributing workload across multiple llama.cpp workers. | `grpc_servers:localhost:50051,localhost:50052` |
| `fit_params` or `fit` | boolean | Enable auto-adjustment of model/context parameters to fit available device memory. Default: `true`. | `fit_params:true` |
@@ -512,8 +512,10 @@ The `llama.cpp` backend supports additional configuration options that can be sp
| `check_tensors` | boolean | Validate tensor data for invalid values during model loading. Default: `false`. | `check_tensors:true` |
| `warmup` | boolean | Enable warmup run after model loading. Default: `true`. | `warmup:false` |
| `no_op_offload` | boolean | Disable offloading host tensor operations to device. Default: `false`. | `no_op_offload:true` |
| `kv_unified` or `unified_kv` | boolean | Enable unified KV cache. Default: `false`. | `kv_unified:true` |
| `n_ctx_checkpoints` or `ctx_checkpoints` | integer | Maximum number of context checkpoints per slot. Default: `8`. | `ctx_checkpoints:4` |
| `kv_unified` or `unified_kv` | boolean | Use a single unified KV buffer shared across all sequences. Default: `true` (LocalAI override; upstream defaults to `false` but auto-enables it when slot count is auto). **Required for `cache_idle_slots` to work**: without it the server force-disables idle-slot saving at init, and the prompt cache is never written across requests. | `kv_unified:false` |
| `cache_idle_slots` or `idle_slots_cache` | boolean | On a new task, save the previous slot's KV state into the prompt cache (and clear the slot) so a later request with the same prefix can warm-load it. Default: `true`. Auto-disabled by the server if `kv_unified=false` or `cache_ram=0`. | `cache_idle_slots:false` |
| `n_ctx_checkpoints` or `ctx_checkpoints` | integer | Maximum number of context checkpoints per slot (used for partial-prefix recovery, e.g. SWA). Default: `32`. | `ctx_checkpoints:16` |
| `checkpoint_every_nt` or `checkpoint_every_n_tokens` | integer | Create a context checkpoint every N tokens during prefill. `-1` disables checkpointing. Default: `8192`. | `checkpoint_every_nt:4096` |
| `split_mode` or `sm` | string | How to split the model across multiple GPUs: `none` (single GPU only), `layer` (default — split layers and KV across GPUs), `row` (split rows across GPUs), `tensor` (experimental tensor parallelism — requires `flash_attention: true`, no KV-cache quantization, manually set `context_size`, and a llama.cpp build that includes [#19378](https://github.com/ggml-org/llama.cpp/pull/19378)). | `split_mode:tensor` |
**Example configuration with options:**
@@ -535,6 +537,27 @@ options:
**Note:** The `parallel` option can also be set via the `LLAMACPP_PARALLEL` environment variable, and `grpc_servers` can be set via the `LLAMACPP_GRPC_SERVERS` environment variable. Options specified in the YAML file take precedence over environment variables.
##### Server-side prompt cache (repeated system prompts)
Agents, coding assistants, and Anthropic/OpenAI-compatible CLIs typically resend the same large system prompt on every turn. The llama.cpp server can short-circuit prefill for the matching prefix by stashing idle slot KV states in host RAM and reloading them on a hit. Three settings interact:
| Setting | Default | Role |
|---|---|---|
| `cache_ram:N` | `-1` (no limit) | Allocates the host-side prompt cache. `0` disables it. |
| `kv_unified:true` | `true` | Single unified KV buffer (**prerequisite** for idle-slot saving). |
| `cache_idle_slots:true` | `true` | Persists the idle slot's KV into the prompt cache on task switch. |
All three are on by default since LocalAI v4.3, so the prompt cache works out of the box for the common single-slot setup. If you're on an older release, or you've explicitly disabled one of them, add the following to recover the behaviour:
```yaml
options:
- cache_ram:4096 # or -1 for no limit
- kv_unified:true
- cache_idle_slots:true
```
Set `cache_ram:0` to opt out of the prompt cache entirely (saves host RAM at the cost of re-prefilling repeated prompts).
#### Reference
- [llama](https://github.com/ggerganov/llama.cpp)

2
go.mod
View File

@@ -220,7 +220,7 @@ require (
github.com/mschoch/smat v0.2.0 // indirect
github.com/mudler/LocalAGI v0.0.0-20260508125235-37810d918a87
github.com/mudler/localrecall v0.6.1-0.20260507074622-a7724fef6f81 // indirect
github.com/mudler/skillserver v0.0.6
github.com/mudler/skillserver v0.0.7-0.20260520220837-a7317cbf9145
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/oxffaa/gopher-parse-sitemap v0.0.0-20191021113419-005d2eb1def4 // indirect
github.com/philippgille/chromem-go v0.7.0 // indirect

4
go.sum
View File

@@ -984,6 +984,10 @@ github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8 h1:Ry8RiWy8fZ6Ff4E7d
github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8/go.mod h1:EA8Ashhd56o32qN7ouPKFSRUs/Z+LrRCF4v6R2Oarm8=
github.com/mudler/skillserver v0.0.6 h1:ixz6wUekLdTmbnpAavCkTydDF6UdXAG3ncYufSPK9G0=
github.com/mudler/skillserver v0.0.6/go.mod h1:z3yFhcL9bSykmmh6xgGu0hyoItd4CnxgtWMEWw8uFJU=
github.com/mudler/skillserver v0.0.7-0.20260520212528-3dae7f041b1e h1:ryXE1UEzGhLkDFYuaxJ0fZ6fg4l++TWfMCTJ1E7bYS8=
github.com/mudler/skillserver v0.0.7-0.20260520212528-3dae7f041b1e/go.mod h1:z3yFhcL9bSykmmh6xgGu0hyoItd4CnxgtWMEWw8uFJU=
github.com/mudler/skillserver v0.0.7-0.20260520220837-a7317cbf9145 h1:z59tA3IDYPt71nzH1jpxeaA1LuDw8aZfpTQFNU43Zb8=
github.com/mudler/skillserver v0.0.7-0.20260520220837-a7317cbf9145/go.mod h1:z3yFhcL9bSykmmh6xgGu0hyoItd4CnxgtWMEWw8uFJU=
github.com/mudler/water v0.0.0-20250808092830-dd90dcf09025 h1:WFLP5FHInarYGXi6B/Ze204x7Xy6q/I4nCZnWEyPHK0=
github.com/mudler/water v0.0.0-20250808092830-dd90dcf09025/go.mod h1:QuIFdRstyGJt+MTTkWY+mtD7U6xwjOR6SwKUjmLZtR4=
github.com/mudler/xlog v0.0.6 h1:3nBV4THK8kY0Y8FDXXvWAnuAJoOyO7EAXteJeAoHUC0=

View File

@@ -36,7 +36,7 @@ func ExtractArchive(archive, dst string) error {
OverwriteExisting: true,
MkdirAll: true,
ImplicitTopLevelFolder: false,
ContinueOnError: true,
ContinueOnError: false,
}
switch v := uaIface.(type) {