mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
102 lines
2.5 KiB
Go
102 lines
2.5 KiB
Go
package storage
|
|
|
|
import (
|
|
"cmp"
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mudler/xlog"
|
|
)
|
|
|
|
// StartEphemeralCleanup starts a background goroutine that periodically
|
|
// deletes old ephemeral keys from object storage. Ephemeral keys are
|
|
// used for per-request file transfers and should be cleaned up after
|
|
// a TTL to protect against leaked keys from crashes.
|
|
func StartEphemeralCleanup(ctx context.Context, fm *FileManager, ttl time.Duration, interval time.Duration) {
|
|
if fm == nil || !fm.IsConfigured() {
|
|
return
|
|
}
|
|
|
|
ttl = cmp.Or(ttl, 1*time.Hour)
|
|
interval = cmp.Or(interval, 15*time.Minute)
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
cleanEphemeralKeys(ctx, fm, ttl)
|
|
}
|
|
}
|
|
}()
|
|
|
|
xlog.Info("Ephemeral file cleanup started", "ttl", ttl, "interval", interval)
|
|
}
|
|
|
|
// cleanEphemeralKeys lists and deletes ephemeral keys older than TTL.
|
|
// It uses object store metadata (Head) to determine age, so any instance
|
|
// in a multi-node deployment can clean up orphaned keys — even those
|
|
// uploaded by a different (or crashed) instance.
|
|
func cleanEphemeralKeys(ctx context.Context, fm *FileManager, ttl time.Duration) {
|
|
keys, err := fm.List(ctx, "ephemeral/")
|
|
if err != nil {
|
|
xlog.Warn("Ephemeral cleanup: failed to list keys", "error", err)
|
|
return
|
|
}
|
|
|
|
if len(keys) == 0 {
|
|
return
|
|
}
|
|
|
|
cutoff := time.Now().Add(-ttl)
|
|
deleted := 0
|
|
|
|
for _, key := range keys {
|
|
if !strings.HasPrefix(key, "ephemeral/") {
|
|
continue
|
|
}
|
|
|
|
created, err := objectCreatedAt(ctx, fm, key)
|
|
if err != nil {
|
|
xlog.Warn("Ephemeral cleanup: failed to head object", "key", key, "error", err)
|
|
continue
|
|
}
|
|
|
|
if created.Before(cutoff) {
|
|
if err := fm.Delete(ctx, key); err != nil {
|
|
xlog.Warn("Ephemeral cleanup: failed to delete", "key", key, "error", err)
|
|
} else {
|
|
deleted++
|
|
}
|
|
}
|
|
}
|
|
|
|
if deleted > 0 {
|
|
xlog.Info("Ephemeral cleanup: deleted old keys", "count", deleted)
|
|
}
|
|
}
|
|
|
|
// objectCreatedAt returns the creation time of an object. It first checks
|
|
// the "created-at" user metadata (set by S3Store.Put), and falls back to
|
|
// the object's LastModified timestamp if the metadata is absent (e.g. for
|
|
// objects written before this change or by external tools).
|
|
func objectCreatedAt(ctx context.Context, fm *FileManager, key string) (time.Time, error) {
|
|
meta, err := fm.Head(ctx, key)
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
|
|
if v, ok := meta.Metadata["created-at"]; ok {
|
|
if t, err := time.Parse(time.RFC3339, v); err == nil {
|
|
return t, nil
|
|
}
|
|
}
|
|
|
|
return meta.LastModified, nil
|
|
}
|