Compare commits

...

7 Commits

Author SHA1 Message Date
Ettore Di Giacinto
9d896a37b2 Merge branch 'master' into fix/distributed-model-load-lock-timeout 2026-06-30 22:34:00 +02:00
Tai An
057dee956a fix(launcher): keep data/config under ~/.localai (#10610) (#10613)
The launcher starts the server with run --models-path/--backends-path but
leaves --data-path and the dynamic config dir unset, so the server falls
back to its /data and /configuration defaults.
 is kong.ExpandPath("."), i.e. the launcher process CWD
(commonly the user's home root), producing ~/data and ~/configuration
outside ~/.localai and an agent-pool stateDir under ~/data.

Pass --data-path and --localai-config-dir explicitly, rooted at the
launcher's own data directory (GetDataPath() -> ~/.localai), so data and
config stay consistent with --models-path/--backends-path.
2026-06-30 22:14:59 +02:00
Adira
4ec39bb776 fix(watchdog): don't log optional Free() as an error when backend returns Unimplemented (#10602) (#10607)
* fix(watchdog): don't log optional Free() as an error when backend returns Unimplemented (#10602)

When the watchdog evicts a model, deleteProcess calls the backend's gRPC
Free() to release VRAM before stopping the process. Free is optional:
backends that don't override it -- the generated UnimplementedBackendServer
stub, many Python/external backends, or a federation proxy in distributed
mode -- return gRPC Unimplemented. That is expected, not a failure: VRAM is
reclaimed when the local process is stopped, or by the remote unloader for
remote backends. Logging it as "WARN Error freeing GPU resources" made a
benign, optional RPC look like a fault (the alarming line in #10602, seen
in distributed mode where the model is remote and Free hits a stub).

Treat gRPC Unimplemented from Free() as a no-op logged at Debug; genuine
failures still Warn. Free() is still attempted for every backend, so any
backend that does implement it is unaffected.

Add a reusable grpcerrors.IsUnimplemented helper following the package's
existing code-based detection idiom (prefer the typed status code, fall
back to the message across non-gRPC boundaries), with table tests.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Adira Denis Muhando <dennisadira@gmail.com>

* fix(watchdog): log a non-Unimplemented Free() failure at error level

Per review: now that the expected gRPC Unimplemented case is split out and
logged at Debug, any remaining Free() error is a genuine failure to release
VRAM, so surface it at error level instead of warn.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Adira Denis Muhando <dennisadira@gmail.com>

---------

Signed-off-by: Adira Denis Muhando <dennisadira@gmail.com>
2026-06-30 22:14:01 +02:00
Ettore Di Giacinto
25ecb9f015 fix(gallery): use Q8_0 for lfm2.5-8b-a1b to fix poor tool-call quality
The Q4_K_M quant degraded tool-call reliability for LFM2.5-8B-A1B.
Switch the gallery entry to the Q8_0 GGUF (sha256 verified via HF
x-linked-etag) while keeping the native jinja tool-parsing config.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
2026-06-30 17:46:20 +00:00
LocalAI [bot]
2be495f9c0 fix(kokoros): implement AudioTranscriptionLive trait stub (#10612)
The backend.proto AudioTranscriptionLive bidirectional streaming RPC added
new required trait items (AudioTranscriptionLiveStream + audio_transcription_live)
on the generated Backend trait. The kokoros (TTS) backend did not implement
them, breaking its release build with E0046 (missing trait items).

kokoros is text-to-speech and has no live-ASR support, so stub the method to
return UNIMPLEMENTED, mirroring the existing audio_transcription_stream stub.


Assisted-by: Claude:claude-opus-4-8 [Claude Code]

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-06-30 19:38:41 +02:00
Ettore Di Giacinto
2972165e53 fix(distributed): bound advisory-lock wait instead of disabling lock_timeout
Setting lock_timeout = 0 to override a deployment's short global lock_timeout
meant "wait forever" server-side. Safe for SmartRouter.Route (its loadCtx now
carries the model-load ceiling) but unsafe for the schema-migration callers
that pass context.Background(): a holder whose session never releases would
hang them indefinitely.

Derive the server-side lock_timeout from the caller's context instead: its
remaining budget plus a margin (so the Go context's cancellation still wins
with a clean error and the server bound is only a backstop), or a finite
30m backstop when the context has no deadline. Never zero - "wait forever"
is no longer possible, while a deployment's hostile short lock_timeout is
still overridden so legitimate cross-replica waits don't fail with 55P03.

Added a spec proving a deadline-less waiter gives up at the (shrunk) backstop
rather than hanging.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
2026-06-30 07:23:12 +00:00
Ettore Di Giacinto
a73516f9b4 fix(distributed): don't let a dead worker pin the model-load advisory lock
In distributed mode a chat request could fail with:

  failed to route model with internal loader: routing model ...:
  loading model ...: advisorylock: acquiring lock <id>:
  ERROR: canceling statement due to lock timeout (SQLSTATE 55P03)

Root cause is two independent defects in the cross-replica model-load path:

1. SmartRouter.Route holds a per-model PostgreSQL advisory lock for the whole
   cold-load sequence, which includes installBackendOnNode -> InstallBackend,
   a NATS request-reply with a 15m deadline (DefaultBackendInstallTimeout) that
   ignored ctx. When the chosen worker died mid-install, the holder sat on the
   lock for up to 15m. The detached loadCtx (WithoutCancel) had no deadline, so
   nothing capped the hold.

2. The acquiring statement, pg_advisory_lock(), is subject to any deployment
   global lock_timeout. A common operator setting (e.g. 10s) aborts the wait
   with SQLSTATE 55P03, so every other replica's request for that model hard
   -errored instead of waiting for the in-progress load and reusing it. For the
   ~15m window the model was effectively unroutable.

Fixes:

- advisorylock.WithLockCtx (postgres): SET lock_timeout = 0 on its dedicated
  connection (RESET before it returns to the pool) so the Go context, not a
  deployment-wide GUC, governs how long we wait. Waiters now block and then
  re-check, reusing the model another replica just loaded.

- SmartRouter: bound the detached loadCtx with a single ModelLoadCeiling so the
  lock is always released in bounded time even if a sub-step wedges. Default is
  the configured backend.install deadline + 10m (staging + LoadModel margin),
  so a legitimately slow load is never cut.

- installBackendOnNode: use singleflight.DoChan + select on ctx.Done() so the
  install wait honors cancellation; the ceiling can then actually free a caller
  pinned behind a dead worker. The shared install still coalesces via
  singleflight.

Reproduced both defects as failing tests first (a real 55P03 against a
testcontainer with a short lock_timeout; a wedged install that blocks Route)
and confirmed green.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
2026-06-29 22:51:17 +00:00
11 changed files with 291 additions and 15 deletions

View File

@@ -351,6 +351,16 @@ impl Backend for KokorosService {
Err(Status::unimplemented("Not supported")) Err(Status::unimplemented("Not supported"))
} }
type AudioTranscriptionLiveStream =
ReceiverStream<Result<backend::TranscriptLiveResponse, Status>>;
async fn audio_transcription_live(
&self,
_: Request<tonic::Streaming<backend::TranscriptLiveRequest>>,
) -> Result<Response<Self::AudioTranscriptionLiveStream>, Status> {
Err(Status::unimplemented("Not supported"))
}
async fn diarize( async fn diarize(
&self, &self,
_: Request<backend::DiarizeRequest>, _: Request<backend::DiarizeRequest>,

View File

@@ -207,12 +207,20 @@ func (l *Launcher) StartLocalAI() error {
} }
// Build command arguments // Build command arguments
dataPath := l.GetDataPath()
args := []string{ args := []string{
"run", "run",
"--models-path", l.config.ModelsPath, "--models-path", l.config.ModelsPath,
"--backends-path", l.config.BackendsPath, "--backends-path", l.config.BackendsPath,
"--address", l.config.Address, "--address", l.config.Address,
"--log-level", l.config.LogLevel, "--log-level", l.config.LogLevel,
// Keep persistent data and dynamic config under the launcher's data
// directory (~/.localai) rather than letting the server resolve them
// to ${basepath}/{data,configuration}. ${basepath} expands to the
// launcher process's CWD (often the user's home root), which puts
// ~/data and ~/configuration outside ~/.localai. See #10610.
"--data-path", filepath.Join(dataPath, "data"),
"--localai-config-dir", filepath.Join(dataPath, "configuration"),
} }
l.localaiCmd = exec.CommandContext(l.ctx, binaryPath, args...) l.localaiCmd = exec.CommandContext(l.ctx, binaryPath, args...)

View File

@@ -356,6 +356,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
PrefixConfig: prefixCfg, PrefixConfig: prefixCfg,
Pressure: pressure, Pressure: pressure,
SharedModels: cfg.Distributed.SharedModels, SharedModels: cfg.Distributed.SharedModels,
// Cap how long a cold load may hold the per-model advisory lock: the
// configured backend.install deadline plus a margin for file staging and
// the remote LoadModel. Derived from the install timeout so raising it
// (for slow links pulling multi-GB images) widens the ceiling too,
// instead of letting the static default cut a legitimately slow load.
ModelLoadCeiling: cfg.Distributed.BackendInstallTimeoutOrDefault() + 10*time.Minute,
}) })
// Wire staging-progress broadcasting so file-staging shows up on every // Wire staging-progress broadcasting so file-staging shows up on every

View File

@@ -6,10 +6,39 @@ import (
"hash/fnv" "hash/fnv"
"strings" "strings"
"sync" "sync"
"time"
"gorm.io/gorm" "gorm.io/gorm"
) )
// advisoryLockWaitBackstop bounds, server-side, how long we will wait to
// acquire a blocking advisory lock when the caller's context carries no
// deadline (e.g. a startup schema migration using context.Background()). It
// only exists so such a caller cannot hang forever behind a holder whose
// session never releases the lock; it is far longer than any legitimate
// guarded section. A var (not const) so tests can shrink it.
var advisoryLockWaitBackstop = 30 * time.Minute
// advisoryLockTimeoutMargin is added to a context's remaining budget when
// deriving the server-side lock_timeout, so the Go context's own (cleaner)
// cancellation fires first and the server bound is only ever a backstop.
const advisoryLockTimeoutMargin = 30 * time.Second
// advisoryLockWaitBudget returns the server-side lock_timeout to use for a
// blocking acquire: the caller context's remaining time plus a margin (so the
// Go context still governs), or the backstop when the context has no deadline.
// Never returns zero - "wait forever" must not be possible.
func advisoryLockWaitBudget(ctx context.Context) time.Duration {
if dl, ok := ctx.Deadline(); ok {
budget := time.Until(dl) + advisoryLockTimeoutMargin
if budget < time.Second {
budget = time.Second
}
return budget
}
return advisoryLockWaitBackstop
}
// localLocks holds one buffered channel (capacity 1) per lock key, used as an // localLocks holds one buffered channel (capacity 1) per lock key, used as an
// in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is // in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is
// effectively single-process, so serializing guarded sections within this // effectively single-process, so serializing guarded sections within this
@@ -130,6 +159,27 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e
} }
defer conn.Close() defer conn.Close()
// Override any deployment-wide lock_timeout on this dedicated connection.
// Operators commonly set a short global lock_timeout (on the role or
// database) to bound ordinary row-lock waits. Applied to the blocking
// pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns
// LocalAI's intentional cross-replica "wait your turn, then re-check"
// coordination into a hard error for the caller (e.g. a chat request that
// just wanted to reuse a model another replica is loading).
//
// We do NOT disable it outright (lock_timeout = 0 would wait forever, which
// is unsafe for the schema-migration callers that pass context.Background()).
// Instead we set a bound derived from the caller's context: its remaining
// budget plus a margin so the Go context's cancellation wins with a clean
// error, or a finite backstop when the context has no deadline.
waitBudget := advisoryLockWaitBudget(ctx)
if _, err := conn.ExecContext(ctx,
fmt.Sprintf("SET lock_timeout = %d", waitBudget.Milliseconds())); err != nil {
return fmt.Errorf("advisorylock: setting lock_timeout: %w", err)
}
// Restore the session default before this pooled connection is reused.
defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }()
if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil { if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil {
return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err) return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err)
} }

View File

@@ -158,6 +158,87 @@ var _ = Describe("AdvisoryLock", func() {
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
It("waits out a short server-side lock_timeout instead of failing with 55P03", func() {
const lockKey int64 = 703
// Reproduce the production deployment that triggered this: a short
// global lock_timeout set on the database. Without the fix, a waiter
// blocked on pg_advisory_lock() is aborted by the server after this
// window and surfaces SQLSTATE 55P03 ("canceling statement due to
// lock timeout") to the caller instead of waiting for its turn.
Expect(db.Exec("ALTER DATABASE testdb SET lock_timeout = '300ms'").Error).ToNot(HaveOccurred())
sqlDB, err := db.DB()
Expect(err).ToNot(HaveOccurred())
// Drop pooled connections so subsequent ones reconnect and inherit
// the new database-level lock_timeout default.
sqlDB.SetMaxIdleConns(0)
holding := make(chan struct{})
released := make(chan struct{})
go func() {
defer GinkgoRecover()
herr := WithLockCtx(context.Background(), db, lockKey, func() error {
close(holding)
// Hold well past the 300ms server lock_timeout.
time.Sleep(1 * time.Second)
return nil
})
Expect(herr).ToNot(HaveOccurred())
close(released)
}()
<-holding // ensure the holder owns the lock before we contend
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
executed := false
start := time.Now()
werr := WithLockCtx(ctx, db, lockKey, func() error {
executed = true
return nil
})
Expect(werr).ToNot(HaveOccurred(),
"waiter should wait out the in-progress hold, not fail with lock_timeout (55P03)")
Expect(executed).To(BeTrue())
Expect(time.Since(start)).To(BeNumerically(">=", 400*time.Millisecond),
"waiter should have actually waited for the holder to release")
<-released
})
It("bounds a deadline-less waiter with the backstop instead of waiting forever", func() {
const lockKey int64 = 704
// A caller with no context deadline (e.g. startup schema migration
// passing context.Background()) must not hang forever if the holder
// never releases. Shrink the backstop so the test is fast.
origBackstop := advisoryLockWaitBackstop
advisoryLockWaitBackstop = 500 * time.Millisecond
DeferCleanup(func() { advisoryLockWaitBackstop = origBackstop })
holding := make(chan struct{})
release := make(chan struct{})
go func() {
defer GinkgoRecover()
_ = WithLockCtx(context.Background(), db, lockKey, func() error {
close(holding)
<-release // hold until the test releases us
return nil
})
}()
defer close(release)
<-holding
start := time.Now()
err := WithLockCtx(context.Background(), db, lockKey, func() error {
Fail("waiter should not have acquired the still-held lock")
return nil
})
Expect(err).To(HaveOccurred(), "deadline-less waiter should give up at the backstop, not hang")
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second),
"backstop must cap the wait well under the test timeout")
})
It("serializes concurrent WithLockCtx on same key", func() { It("serializes concurrent WithLockCtx on same key", func() {
const lockKey int64 = 702 const lockKey int64 = 702

View File

@@ -68,6 +68,13 @@ type SmartRouterOptions struct {
// the absolute model paths untouched so the worker loads them directly from // the absolute model paths untouched so the worker loads them directly from
// the shared volume (#10556). See config.DistributedConfig.SharedModels. // the shared volume (#10556). See config.DistributedConfig.SharedModels.
SharedModels bool SharedModels bool
// ModelLoadCeiling is the hard upper bound on how long a single cold-load
// attempt (node selection -> backend install -> file staging -> LoadModel)
// may run while holding the per-model advisory lock. It backstops every
// sub-step's own timeout so a wedged worker can never pin the lock - and
// every other replica's request for that model - indefinitely. Zero selects
// defaultModelLoadCeiling.
ModelLoadCeiling time.Duration
} }
// SmartRouter routes inference requests to the best available backend node. // SmartRouter routes inference requests to the best available backend node.
@@ -101,8 +108,18 @@ type SmartRouter struct {
// sharedModels skips file staging when all nodes mount the same models // sharedModels skips file staging when all nodes mount the same models
// directory at the same path (see SmartRouterOptions.SharedModels). // directory at the same path (see SmartRouterOptions.SharedModels).
sharedModels bool sharedModels bool
// modelLoadCeiling bounds how long a cold load may hold the per-model
// advisory lock (see SmartRouterOptions.ModelLoadCeiling).
modelLoadCeiling time.Duration
} }
// defaultModelLoadCeiling is the fallback hold ceiling for a cold model load.
// It must comfortably exceed the slowest legitimate load - a multi-GB backend
// install (DefaultBackendInstallTimeout, 15m) plus staging and the remote
// LoadModel (5m) - so it never cuts a real load short; it only ever fires when
// a step is genuinely wedged (e.g. a worker that died mid-install).
const defaultModelLoadCeiling = 25 * time.Minute
// probeCacheTTL is how long a successful gRPC HealthCheck on a backend is // probeCacheTTL is how long a successful gRPC HealthCheck on a backend is
// trusted before the next request re-probes. Matches healthCheckTTL in // trusted before the next request re-probes. Matches healthCheckTTL in
// pkg/model/model.go so the single-process and distributed paths share a // pkg/model/model.go so the single-process and distributed paths share a
@@ -117,6 +134,10 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
if factory == nil { if factory == nil {
factory = &tokenClientFactory{token: opts.AuthToken} factory = &tokenClientFactory{token: opts.AuthToken}
} }
ceiling := opts.ModelLoadCeiling
if ceiling <= 0 {
ceiling = defaultModelLoadCeiling
}
return &SmartRouter{ return &SmartRouter{
registry: registry, registry: registry,
unloader: opts.Unloader, unloader: opts.Unloader,
@@ -131,6 +152,7 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
prefixConfig: opts.PrefixConfig, prefixConfig: opts.PrefixConfig,
pressure: opts.Pressure, pressure: opts.Pressure,
sharedModels: opts.SharedModels, sharedModels: opts.SharedModels,
modelLoadCeiling: ceiling,
} }
} }
@@ -383,11 +405,19 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
// the request context. If staging were bound to it, the multi-GB upload // the request context. If staging were bound to it, the multi-GB upload
// aborts with "context canceled" mid-transfer and large models can never // aborts with "context canceled" mid-transfer and large models can never
// finish staging (the model-load outage). WithoutCancel keeps the request's // finish staging (the model-load outage). WithoutCancel keeps the request's
// values (prefix chain, etc.) but drops its cancellation/deadline. Each // values (prefix chain, etc.) but drops its cancellation/deadline.
// long step still has its own bound (the file stager's resume budget, //
// LoadModel's 5m timeout), and the per-model advisory lock below de-dupes // Detaching from the caller is necessary, but it must not be unbounded: the
// concurrent loaders across replicas. // load runs while holding the per-model advisory lock, and a worker that
loadCtx := context.WithoutCancel(ctx) // dies mid-install (its backend.install never replies) would otherwise pin
// that lock (and every other replica's request for the same model) until
// the NATS install deadline alone expires. Re-impose a single hard ceiling
// over the whole sequence so the lock is always released in bounded time,
// even if a sub-step wedges. Each long step still has its own (tighter)
// bound; this only backstops them. The per-model advisory lock below
// de-dupes concurrent loaders across replicas.
loadCtx, cancelLoad := context.WithTimeout(context.WithoutCancel(ctx), r.modelLoadCeiling)
defer cancelLoad()
loadModel := func(ctx context.Context) (*RouteResult, error) { loadModel := func(ctx context.Context) (*RouteResult, error) {
// Re-check after acquiring lock — another request may have loaded it // Re-check after acquiring lock — another request may have loaded it
node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref) node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref)
@@ -916,7 +946,14 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
} }
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex) key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
v, err, _ := r.installFlight.Do(key, func() (any, error) { // DoChan rather than Do so this wait honors ctx cancellation. InstallBackend
// blocks for its full NATS deadline (15m by default) when a worker accepts
// the request but never replies (e.g. it died mid-install). Without ctx
// awareness the caller (holding the per-model advisory lock) would sit there
// the whole time; here a cancelled ctx (typically the model-load ceiling)
// frees the caller promptly. The shared install keeps running in the
// background and still coalesces other callers via singleflight.
resCh := r.installFlight.DoChan(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil) reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
if err != nil { if err != nil {
return "", err return "", err
@@ -931,10 +968,15 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
} }
return addr, nil return addr, nil
}) })
if err != nil { select {
return "", err case <-ctx.Done():
return "", ctx.Err()
case res := <-resCh:
if res.Err != nil {
return "", res.Err
}
return res.Val.(string), nil
} }
return v.(string), nil
} }
func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend { func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend {

View File

@@ -493,6 +493,44 @@ var _ = Describe("SmartRouter", func() {
Expect(result.Node.ID).To(Equal("n3")) Expect(result.Node.ID).To(Equal("n3"))
}) })
}) })
Context("worker wedges mid-install (dead node holding the lock)", func() {
It("aborts the load at the ModelLoadCeiling instead of blocking forever", func() {
// Simulate the production incident: the chosen worker accepts the
// backend.install but never replies (it died), so InstallBackend
// would otherwise block for its full NATS deadline (15m by
// default) while pinning the per-model advisory lock. Route must
// give up at the ceiling so the lock is released promptly.
reg.findAndLockErr = errors.New("not found")
reg.findIdleNode = &BackendNode{ID: "n4", Name: "dead-node", Address: "10.0.0.4:50051"}
block := make(chan struct{})
defer close(block) // let the background install goroutine drain at test end
unloader.installHook = func() { <-block }
router := NewSmartRouter(reg, SmartRouterOptions{
Unloader: unloader,
ClientFactory: factory,
ModelLoadCeiling: 200 * time.Millisecond,
})
done := make(chan error, 1)
start := time.Now()
go func() {
defer GinkgoRecover()
_, err := router.Route(context.Background(), "wedged-model",
"models/wedged.gguf", "llama-cpp",
&pb.ModelOptions{Model: "models/wedged.gguf"}, false)
done <- err
}()
var routeErr error
Eventually(done, 5*time.Second).Should(Receive(&routeErr),
"Route must not block on a wedged install past the ceiling")
Expect(routeErr).To(HaveOccurred())
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second))
})
})
}) })
Describe("scheduleNewModel (mock-based, via Route)", func() { Describe("scheduleNewModel (mock-based, via Route)", func() {

View File

@@ -1716,7 +1716,7 @@
- use_jinja:true - use_jinja:true
parameters: parameters:
min_p: 0.15 min_p: 0.15
model: llama-cpp/models/LFM2.5-8B-A1B-GGUF/LFM2.5-8B-A1B-Q4_K_M.gguf model: llama-cpp/models/LFM2.5-8B-A1B-GGUF/LFM2.5-8B-A1B-Q8_0.gguf
repeat_penalty: 1.05 repeat_penalty: 1.05
temperature: 0.1 temperature: 0.1
top_k: 50 top_k: 50
@@ -1724,9 +1724,9 @@
template: template:
use_tokenizer_template: true use_tokenizer_template: true
files: files:
- filename: llama-cpp/models/LFM2.5-8B-A1B-GGUF/LFM2.5-8B-A1B-Q4_K_M.gguf - filename: llama-cpp/models/LFM2.5-8B-A1B-GGUF/LFM2.5-8B-A1B-Q8_0.gguf
uri: https://huggingface.co/LiquidAI/LFM2.5-8B-A1B-GGUF/resolve/main/LFM2.5-8B-A1B-Q4_K_M.gguf uri: https://huggingface.co/LiquidAI/LFM2.5-8B-A1B-GGUF/resolve/main/LFM2.5-8B-A1B-Q8_0.gguf
sha256: 4923ec14f06b968b74d663e5949867d2d9c3bf13a20b8be1a9f9af39989b2bb0 sha256: 33ab3b8ce6a964fb8ebac89360c9b3cf72c4fa418d5e4c0a94d46883124d5c02
- name: "qwopus3.5-9b-coder-mtp" - name: "qwopus3.5-9b-coder-mtp"
url: "github:mudler/LocalAI/gallery/virtual.yaml@master" url: "github:mudler/LocalAI/gallery/virtual.yaml@master"
urls: urls:

View File

@@ -58,6 +58,23 @@ func IsLiveTranscriptionUnsupported(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "unimplemented") return strings.Contains(strings.ToLower(err.Error()), "unimplemented")
} }
// IsUnimplemented reports whether err is a gRPC Unimplemented status — the
// signal a backend gives for an RPC it does not implement. The generated
// UnimplementedBackendServer stub returns exactly this for any RPC a backend
// (e.g. a Python or external backend) has not overridden, so callers can treat
// an optional RPC as a no-op rather than a failure. Prefers the typed status
// code and falls back to the message for paths that lose the status (e.g. errors
// wrapped across non-gRPC boundaries).
func IsUnimplemented(err error) bool {
if err == nil {
return false
}
if status.Code(err) == codes.Unimplemented {
return true
}
return strings.Contains(strings.ToLower(err.Error()), "unimplemented")
}
// StreamTranscriptionUnsupported returns the canonical error a backend returns // StreamTranscriptionUnsupported returns the canonical error a backend returns
// when it (or the loaded model) cannot serve the server-streaming // when it (or the loaded model) cannot serve the server-streaming
// AudioTranscriptionStream RPC. It carries codes.Unimplemented like the live // AudioTranscriptionStream RPC. It carries codes.Unimplemented like the live

View File

@@ -55,6 +55,18 @@ var _ = Describe("grpcerrors", func() {
Expect(grpcerrors.IsModelNotLoaded(err)).To(BeFalse()) Expect(grpcerrors.IsModelNotLoaded(err)).To(BeFalse())
}) })
DescribeTable("IsUnimplemented",
func(err error, want bool) {
Expect(grpcerrors.IsUnimplemented(err)).To(Equal(want))
},
Entry("nil", nil, false),
Entry("typed code", status.Error(codes.Unimplemented, "method Free not implemented"), true),
Entry("stale stub message (Unknown code)", errors.New("rpc error: code = Unimplemented desc = "), true),
Entry("unrelated error", errors.New("context deadline exceeded"), false),
Entry("unrelated grpc code", status.Error(codes.Unavailable, "connection refused"), false),
Entry("model not loaded is NOT unimplemented", grpcerrors.ModelNotLoaded("parakeet-cpp"), false),
)
It("StreamTranscriptionUnsupported carries Unimplemented and is not ModelNotLoaded", func() { It("StreamTranscriptionUnsupported carries Unimplemented and is not ModelNotLoaded", func() {
err := grpcerrors.StreamTranscriptionUnsupported("parakeet-cpp", "not a streaming model") err := grpcerrors.StreamTranscriptionUnsupported("parakeet-cpp", "not a streaming model")
Expect(status.Code(err)).To(Equal(codes.Unimplemented)) Expect(status.Code(err)).To(Equal(codes.Unimplemented))

View File

@@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
"github.com/mudler/LocalAI/pkg/signals" "github.com/mudler/LocalAI/pkg/signals"
process "github.com/mudler/go-processmanager" process "github.com/mudler/go-processmanager"
"github.com/mudler/xlog" "github.com/mudler/xlog"
@@ -52,10 +53,21 @@ func (ml *ModelLoader) deleteProcess(s string) error {
hook(s) hook(s)
} }
// Free GPU resources before stopping the process to ensure VRAM is released // Free GPU resources before stopping the process to ensure VRAM is released.
// Free is optional: backends that don't override it (the generated stub, many
// Python/external backends, or a federation proxy in distributed mode) return
// gRPC Unimplemented. That is expected, not a failure — VRAM is reclaimed when
// the process is stopped below, or by the remote unloader for remote backends —
// so don't surface it as an error.
xlog.Debug("Calling Free() to release GPU resources", "model", s) xlog.Debug("Calling Free() to release GPU resources", "model", s)
if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil { if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil {
xlog.Warn("Error freeing GPU resources", "error", err, "model", s) if grpcerrors.IsUnimplemented(err) {
xlog.Debug("Backend does not implement Free(); GPU release handled on process stop", "model", s)
} else {
// Now that the expected Unimplemented case is filtered out above, a
// remaining error is a genuine failure to release VRAM — surface it.
xlog.Error("Error freeing GPU resources", "error", err, "model", s)
}
} }
process := model.Process() process := model.Process()