Files
LocalAI/core/services/storage/s3.go
Ettore Di Giacinto 59108fbe32 feat: add distributed mode (#9124)
* feat: add distributed mode (experimental)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix data races, mutexes, transactions

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix events and tool stream in agent chat

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* use ginkgo

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cron): compute correctly time boundaries avoiding re-triggering

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not flood of healthy checks

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* do not list obvious backends as text backends

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* tests fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactoring and consolidation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop redundant healthcheck

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* enhancements, refactorings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-03-30 00:47:27 +02:00

165 lines
4.3 KiB
Go

package storage
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// S3Store implements ObjectStore backed by an S3-compatible service (AWS S3 or MinIO).
type S3Store struct {
client *s3.Client
bucket string
}
// S3Config holds S3 connection parameters.
type S3Config struct {
Endpoint string // e.g. "http://minio:9000" (empty for AWS S3)
Region string // e.g. "us-east-1"
Bucket string
AccessKeyID string
SecretAccessKey string
ForcePathStyle bool // required for MinIO
}
// NewS3Store creates a new S3-backed ObjectStore.
func NewS3Store(ctx context.Context, cfg S3Config) (*S3Store, error) {
var opts []func(*awsconfig.LoadOptions) error
if cfg.Region != "" {
opts = append(opts, awsconfig.WithRegion(cfg.Region))
} else {
opts = append(opts, awsconfig.WithRegion("us-east-1"))
}
if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
opts = append(opts, awsconfig.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
))
}
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("loading AWS config: %w", err)
}
var s3Opts []func(*s3.Options)
if cfg.Endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = aws.String(cfg.Endpoint)
o.UsePathStyle = cfg.ForcePathStyle
})
}
client := s3.NewFromConfig(awsCfg, s3Opts...)
return &S3Store{
client: client,
bucket: cfg.Bucket,
}, nil
}
func (s *S3Store) Put(ctx context.Context, key string, r io.Reader) error {
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: r,
Metadata: map[string]string{
"created-at": time.Now().UTC().Format(time.RFC3339),
},
})
if err != nil {
return fmt.Errorf("putting object %s: %w", key, err)
}
return nil
}
func (s *S3Store) Get(ctx context.Context, key string) (io.ReadCloser, error) {
out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, fmt.Errorf("getting object %s: %w", key, err)
}
return out.Body, nil
}
func (s *S3Store) Head(ctx context.Context, key string) (*ObjectMeta, error) {
out, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, fmt.Errorf("head %s: %w", key, err)
}
return &ObjectMeta{
Key: key,
Size: aws.ToInt64(out.ContentLength),
LastModified: aws.ToTime(out.LastModified),
Metadata: out.Metadata,
}, nil
}
func (s *S3Store) Exists(ctx context.Context, key string) (bool, error) {
_, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
var notFound *types.NotFound
if errors.As(err, &notFound) {
return false, nil
}
// Could also be a NoSuchKey wrapped differently; check the error message
var noSuchKey *types.NoSuchKey
if errors.As(err, &noSuchKey) {
return false, nil
}
return false, fmt.Errorf("checking existence of %s: %w", key, err)
}
return true, nil
}
func (s *S3Store) Delete(ctx context.Context, key string) error {
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
return fmt.Errorf("deleting object %s: %w", key, err)
}
return nil
}
// Close implements io.Closer. The underlying AWS S3 client does not hold
// persistent connections that need explicit cleanup, so this is a no-op.
func (s *S3Store) Close() error { return nil }
func (s *S3Store) List(ctx context.Context, prefix string) ([]string, error) {
var keys []string
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
Bucket: aws.String(s.bucket),
Prefix: aws.String(prefix),
})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("listing objects with prefix %s: %w", prefix, err)
}
for _, obj := range page.Contents {
keys = append(keys, *obj.Key)
}
}
return keys, nil
}