mirror of
https://github.com/mudler/LocalAI.git
synced 2026-04-01 05:36:49 -04:00
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
165 lines
4.3 KiB
Go
165 lines
4.3 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
)
|
|
|
|
// S3Store implements ObjectStore backed by an S3-compatible service (AWS S3 or MinIO).
type S3Store struct {
	client *s3.Client // configured S3 API client used for all bucket operations
	bucket string     // name of the single bucket every method operates on
}
|
|
|
|
// S3Config holds S3 connection parameters.
type S3Config struct {
	Endpoint        string // custom endpoint, e.g. "http://minio:9000" (leave empty for AWS S3)
	Region          string // e.g. "us-east-1"; defaults to "us-east-1" when empty
	Bucket          string // bucket all store operations target
	AccessKeyID     string // static credential; when empty, the SDK's default credential chain is used
	SecretAccessKey string // static credential paired with AccessKeyID
	ForcePathStyle  bool   // use path-style addressing; required for MinIO
}
|
|
|
|
// NewS3Store creates a new S3-backed ObjectStore.
|
|
func NewS3Store(ctx context.Context, cfg S3Config) (*S3Store, error) {
|
|
var opts []func(*awsconfig.LoadOptions) error
|
|
|
|
if cfg.Region != "" {
|
|
opts = append(opts, awsconfig.WithRegion(cfg.Region))
|
|
} else {
|
|
opts = append(opts, awsconfig.WithRegion("us-east-1"))
|
|
}
|
|
|
|
if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
|
|
opts = append(opts, awsconfig.WithCredentialsProvider(
|
|
credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
|
|
))
|
|
}
|
|
|
|
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("loading AWS config: %w", err)
|
|
}
|
|
|
|
var s3Opts []func(*s3.Options)
|
|
if cfg.Endpoint != "" {
|
|
s3Opts = append(s3Opts, func(o *s3.Options) {
|
|
o.BaseEndpoint = aws.String(cfg.Endpoint)
|
|
o.UsePathStyle = cfg.ForcePathStyle
|
|
})
|
|
}
|
|
|
|
client := s3.NewFromConfig(awsCfg, s3Opts...)
|
|
|
|
return &S3Store{
|
|
client: client,
|
|
bucket: cfg.Bucket,
|
|
}, nil
|
|
}
|
|
|
|
func (s *S3Store) Put(ctx context.Context, key string, r io.Reader) error {
|
|
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
Body: r,
|
|
Metadata: map[string]string{
|
|
"created-at": time.Now().UTC().Format(time.RFC3339),
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("putting object %s: %w", key, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *S3Store) Get(ctx context.Context, key string) (io.ReadCloser, error) {
|
|
out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting object %s: %w", key, err)
|
|
}
|
|
return out.Body, nil
|
|
}
|
|
|
|
func (s *S3Store) Head(ctx context.Context, key string) (*ObjectMeta, error) {
|
|
out, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("head %s: %w", key, err)
|
|
}
|
|
return &ObjectMeta{
|
|
Key: key,
|
|
Size: aws.ToInt64(out.ContentLength),
|
|
LastModified: aws.ToTime(out.LastModified),
|
|
Metadata: out.Metadata,
|
|
}, nil
|
|
}
|
|
|
|
func (s *S3Store) Exists(ctx context.Context, key string) (bool, error) {
|
|
_, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
var notFound *types.NotFound
|
|
if errors.As(err, ¬Found) {
|
|
return false, nil
|
|
}
|
|
// Could also be a NoSuchKey wrapped differently; check the error message
|
|
var noSuchKey *types.NoSuchKey
|
|
if errors.As(err, &noSuchKey) {
|
|
return false, nil
|
|
}
|
|
return false, fmt.Errorf("checking existence of %s: %w", key, err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (s *S3Store) Delete(ctx context.Context, key string) error {
|
|
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("deleting object %s: %w", key, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close implements io.Closer so S3Store satisfies interfaces that expect
// explicit cleanup. The underlying AWS S3 client does not hold persistent
// connections that need explicit teardown, so this is a no-op and always
// returns nil.
func (s *S3Store) Close() error { return nil }
|
|
|
|
func (s *S3Store) List(ctx context.Context, prefix string) ([]string, error) {
|
|
var keys []string
|
|
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(s.bucket),
|
|
Prefix: aws.String(prefix),
|
|
})
|
|
|
|
for paginator.HasMorePages() {
|
|
page, err := paginator.NextPage(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing objects with prefix %s: %w", prefix, err)
|
|
}
|
|
for _, obj := range page.Contents {
|
|
keys = append(keys, *obj.Key)
|
|
}
|
|
}
|
|
return keys, nil
|
|
}
|