mirror of
https://github.com/syncthing/syncthing.git
synced 2025-12-23 22:18:14 -05:00
chore(blobs): generalised blob storage
This commit is contained in:
@@ -26,9 +26,11 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"github.com/syncthing/syncthing/internal/blob"
|
||||
"github.com/syncthing/syncthing/internal/blob/azureblob"
|
||||
"github.com/syncthing/syncthing/internal/blob/s3"
|
||||
"github.com/syncthing/syncthing/lib/build"
|
||||
"github.com/syncthing/syncthing/lib/geoip"
|
||||
"github.com/syncthing/syncthing/lib/s3"
|
||||
"github.com/syncthing/syncthing/lib/ur/contract"
|
||||
)
|
||||
|
||||
@@ -40,11 +42,15 @@ type CLI struct {
|
||||
DumpFile string `env:"UR_DUMP_FILE" default:"reports.jsons.gz"`
|
||||
DumpInterval time.Duration `env:"UR_DUMP_INTERVAL" default:"5m"`
|
||||
|
||||
S3Endpoint string `name:"s3-endpoint" hidden:"true" env:"UR_S3_ENDPOINT"`
|
||||
S3Region string `name:"s3-region" hidden:"true" env:"UR_S3_REGION"`
|
||||
S3Bucket string `name:"s3-bucket" hidden:"true" env:"UR_S3_BUCKET"`
|
||||
S3AccessKeyID string `name:"s3-access-key-id" hidden:"true" env:"UR_S3_ACCESS_KEY_ID"`
|
||||
S3SecretKey string `name:"s3-secret-key" hidden:"true" env:"UR_S3_SECRET_KEY"`
|
||||
S3Endpoint string `name:"s3-endpoint" env:"UR_S3_ENDPOINT"`
|
||||
S3Region string `name:"s3-region" env:"UR_S3_REGION"`
|
||||
S3Bucket string `name:"s3-bucket" env:"UR_S3_BUCKET"`
|
||||
S3AccessKeyID string `name:"s3-access-key-id" env:"UR_S3_ACCESS_KEY_ID"`
|
||||
S3SecretKey string `name:"s3-secret-key" env:"UR_S3_SECRET_KEY"`
|
||||
|
||||
AzureBlobAccount string `name:"azure-blob-account" env:"UR_AZUREBLOB_ACCOUNT"`
|
||||
AzureBlobKey string `name:"azure-blob-key" env:"UR_AZUREBLOB_KEY"`
|
||||
AzureBlobContainer string `name:"azure-blob-container" env:"UR_AZUREBLOB_CONTAINER"`
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -120,19 +126,25 @@ func (cli *CLI) Run() error {
|
||||
go geo.Serve(context.TODO())
|
||||
}
|
||||
|
||||
// s3
|
||||
// Blob storage
|
||||
|
||||
var s3sess *s3.Session
|
||||
var blobs blob.Store
|
||||
if cli.S3Endpoint != "" {
|
||||
s3sess, err = s3.NewSession(cli.S3Endpoint, cli.S3Region, cli.S3Bucket, cli.S3AccessKeyID, cli.S3SecretKey)
|
||||
blobs, err = s3.NewSession(cli.S3Endpoint, cli.S3Region, cli.S3Bucket, cli.S3AccessKeyID, cli.S3SecretKey)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create S3 session", "error", err)
|
||||
return err
|
||||
}
|
||||
} else if cli.AzureBlobAccount != "" {
|
||||
blobs, err = azureblob.NewBlobStore(cli.AzureBlobAccount, cli.AzureBlobKey, cli.AzureBlobContainer)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create Azure blob store", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := os.Stat(cli.DumpFile); err != nil && s3sess != nil {
|
||||
if err := cli.downloadDumpFile(s3sess); err != nil {
|
||||
if _, err := os.Stat(cli.DumpFile); err != nil && blobs != nil {
|
||||
if err := cli.downloadDumpFile(blobs); err != nil {
|
||||
slog.Error("Failed to download dump file", "error", err)
|
||||
}
|
||||
}
|
||||
@@ -154,7 +166,7 @@ func (cli *CLI) Run() error {
|
||||
|
||||
go func() {
|
||||
for range time.Tick(cli.DumpInterval) {
|
||||
if err := cli.saveDumpFile(srv, s3sess); err != nil {
|
||||
if err := cli.saveDumpFile(srv, blobs); err != nil {
|
||||
slog.Error("Failed to write dump file", "error", err)
|
||||
}
|
||||
}
|
||||
@@ -193,8 +205,8 @@ func (cli *CLI) Run() error {
|
||||
return metricsSrv.Serve(urListener)
|
||||
}
|
||||
|
||||
func (cli *CLI) downloadDumpFile(s3sess *s3.Session) error {
|
||||
latestKey, err := s3sess.LatestKey()
|
||||
func (cli *CLI) downloadDumpFile(blobs blob.Store) error {
|
||||
latestKey, err := blobs.LatestKey(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("list latest S3 key: %w", err)
|
||||
}
|
||||
@@ -202,7 +214,7 @@ func (cli *CLI) downloadDumpFile(s3sess *s3.Session) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("create dump file: %w", err)
|
||||
}
|
||||
if err := s3sess.Download(fd, latestKey); err != nil {
|
||||
if err := blobs.Download(context.Background(), latestKey, fd); err != nil {
|
||||
_ = fd.Close()
|
||||
return fmt.Errorf("download dump file: %w", err)
|
||||
}
|
||||
@@ -213,7 +225,7 @@ func (cli *CLI) downloadDumpFile(s3sess *s3.Session) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *CLI) saveDumpFile(srv *server, s3sess *s3.Session) error {
|
||||
func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error {
|
||||
fd, err := os.Create(cli.DumpFile + ".tmp")
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating dump file: %w", err)
|
||||
@@ -234,13 +246,13 @@ func (cli *CLI) saveDumpFile(srv *server, s3sess *s3.Session) error {
|
||||
}
|
||||
slog.Info("Dump file saved")
|
||||
|
||||
if s3sess != nil {
|
||||
if blobs != nil {
|
||||
key := fmt.Sprintf("reports-%s.jsons.gz", time.Now().UTC().Format("2006-01-02"))
|
||||
fd, err := os.Open(cli.DumpFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening dump file: %w", err)
|
||||
}
|
||||
if err := s3sess.Upload(fd, key); err != nil {
|
||||
if err := blobs.Upload(context.Background(), key, fd); err != nil {
|
||||
return fmt.Errorf("uploading dump file: %w", err)
|
||||
}
|
||||
_ = fd.Close()
|
||||
|
||||
@@ -24,11 +24,11 @@ import (
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/blob"
|
||||
"github.com/syncthing/syncthing/internal/gen/discosrv"
|
||||
"github.com/syncthing/syncthing/internal/protoutil"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/rand"
|
||||
"github.com/syncthing/syncthing/lib/s3"
|
||||
)
|
||||
|
||||
type clock interface {
|
||||
@@ -51,12 +51,12 @@ type inMemoryStore struct {
|
||||
m *xsync.MapOf[protocol.DeviceID, *discosrv.DatabaseRecord]
|
||||
dir string
|
||||
flushInterval time.Duration
|
||||
s3 *s3.Session
|
||||
blobs blob.Store
|
||||
objKey string
|
||||
clock clock
|
||||
}
|
||||
|
||||
func newInMemoryStore(dir string, flushInterval time.Duration, s3sess *s3.Session) *inMemoryStore {
|
||||
func newInMemoryStore(dir string, flushInterval time.Duration, blobs blob.Store) *inMemoryStore {
|
||||
hn, err := os.Hostname()
|
||||
if err != nil {
|
||||
hn = rand.String(8)
|
||||
@@ -65,16 +65,16 @@ func newInMemoryStore(dir string, flushInterval time.Duration, s3sess *s3.Sessio
|
||||
m: xsync.NewMapOf[protocol.DeviceID, *discosrv.DatabaseRecord](),
|
||||
dir: dir,
|
||||
flushInterval: flushInterval,
|
||||
s3: s3sess,
|
||||
blobs: blobs,
|
||||
objKey: hn + ".db",
|
||||
clock: defaultClock{},
|
||||
}
|
||||
nr, err := s.read()
|
||||
if os.IsNotExist(err) && s3sess != nil {
|
||||
// Try to read from AWS
|
||||
latestKey, cerr := s3sess.LatestKey()
|
||||
if os.IsNotExist(err) && blobs != nil {
|
||||
// Try to read from blob storage
|
||||
latestKey, cerr := blobs.LatestKey(context.Background())
|
||||
if cerr != nil {
|
||||
log.Println("Error finding database from S3:", cerr)
|
||||
log.Println("Error finding database from blob storage:", cerr)
|
||||
return s
|
||||
}
|
||||
fd, cerr := os.Create(path.Join(s.dir, "records.db"))
|
||||
@@ -82,8 +82,8 @@ func newInMemoryStore(dir string, flushInterval time.Duration, s3sess *s3.Sessio
|
||||
log.Println("Error creating database file:", cerr)
|
||||
return s
|
||||
}
|
||||
if cerr := s3sess.Download(fd, latestKey); cerr != nil {
|
||||
log.Printf("Error downloading database from S3: %v", cerr)
|
||||
if cerr := blobs.Download(context.Background(), latestKey, fd); cerr != nil {
|
||||
log.Printf("Error downloading database from blob storage: %v", cerr)
|
||||
}
|
||||
_ = fd.Close()
|
||||
nr, err = s.read()
|
||||
@@ -310,16 +310,16 @@ func (s *inMemoryStore) write() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Upload to S3
|
||||
if s.s3 != nil {
|
||||
// Upload to blob storage
|
||||
if s.blobs != nil {
|
||||
fd, err = os.Open(dbf)
|
||||
if err != nil {
|
||||
log.Printf("Error uploading database to S3: %v", err)
|
||||
log.Printf("Error uploading database to blob storage: %v", err)
|
||||
return nil
|
||||
}
|
||||
defer fd.Close()
|
||||
if err := s.s3.Upload(fd, s.objKey); err != nil {
|
||||
log.Printf("Error uploading database to S3: %v", err)
|
||||
if err := s.blobs.Upload(context.Background(), s.objKey, fd); err != nil {
|
||||
log.Printf("Error uploading database to blob storage: %v", err)
|
||||
}
|
||||
log.Println("Finished uploading database")
|
||||
}
|
||||
|
||||
@@ -21,11 +21,13 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/thejerf/suture/v4"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/blob"
|
||||
"github.com/syncthing/syncthing/internal/blob/azureblob"
|
||||
"github.com/syncthing/syncthing/internal/blob/s3"
|
||||
_ "github.com/syncthing/syncthing/lib/automaxprocs"
|
||||
"github.com/syncthing/syncthing/lib/build"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/rand"
|
||||
"github.com/syncthing/syncthing/lib/s3"
|
||||
"github.com/syncthing/syncthing/lib/tlsutil"
|
||||
)
|
||||
|
||||
@@ -74,6 +76,10 @@ type CLI struct {
|
||||
DBS3AccessKeyID string `name:"db-s3-access-key-id" group:"Database (S3 backup)" hidden:"true" help:"S3 access key ID for database" env:"DISCOVERY_DB_S3_ACCESS_KEY_ID"`
|
||||
DBS3SecretKey string `name:"db-s3-secret-key" group:"Database (S3 backup)" hidden:"true" help:"S3 secret key for database" env:"DISCOVERY_DB_S3_SECRET_KEY"`
|
||||
|
||||
DBAzureBlobAccount string `name:"db-azure-blob-account" env:"DISCOVERY_DB_AZUREBLOB_ACCOUNT"`
|
||||
DBAzureBlobKey string `name:"db-azure-blob-key" env:"DISCOVERY_DB_AZUREBLOB_KEY"`
|
||||
DBAzureBlobContainer string `name:"db-azure-blob-container" env:"DISCOVERY_DB_AZUREBLOB_CONTAINER"`
|
||||
|
||||
AMQPAddress string `group:"AMQP replication" hidden:"true" help:"Address to AMQP broker" env:"DISCOVERY_AMQP_ADDRESS"`
|
||||
|
||||
Debug bool `short:"d" help:"Print debug output" env:"DISCOVERY_DEBUG"`
|
||||
@@ -117,18 +123,20 @@ func main() {
|
||||
Timeout: 2 * time.Minute,
|
||||
})
|
||||
|
||||
// If configured, use S3 for database backups.
|
||||
var s3c *s3.Session
|
||||
// If configured, use blob storage for database backups.
|
||||
var blobs blob.Store
|
||||
var err error
|
||||
if cli.DBS3Endpoint != "" {
|
||||
var err error
|
||||
s3c, err = s3.NewSession(cli.DBS3Endpoint, cli.DBS3Region, cli.DBS3Bucket, cli.DBS3AccessKeyID, cli.DBS3SecretKey)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create S3 session: %v", err)
|
||||
}
|
||||
blobs, err = s3.NewSession(cli.DBS3Endpoint, cli.DBS3Region, cli.DBS3Bucket, cli.DBS3AccessKeyID, cli.DBS3SecretKey)
|
||||
} else if cli.DBAzureBlobAccount != "" {
|
||||
blobs, err = azureblob.NewBlobStore(cli.DBAzureBlobAccount, cli.DBAzureBlobKey, cli.DBAzureBlobContainer)
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create blob store: %v", err)
|
||||
}
|
||||
|
||||
// Start the database.
|
||||
db := newInMemoryStore(cli.DBDir, cli.DBFlushInterval, s3c)
|
||||
db := newInMemoryStore(cli.DBDir, cli.DBFlushInterval, blobs)
|
||||
main.Add(db)
|
||||
|
||||
// If we have an AMQP broker for replication, start that
|
||||
|
||||
3
go.mod
3
go.mod
@@ -4,6 +4,7 @@ go 1.23.0
|
||||
|
||||
require (
|
||||
github.com/AudriusButkevicius/recli v0.0.7-0.20220911121932-d000ce8fbf0f
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0
|
||||
github.com/alecthomas/kong v1.10.0
|
||||
github.com/aws/aws-sdk-go v1.55.6
|
||||
github.com/calmh/incontainer v1.0.0
|
||||
@@ -51,6 +52,8 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
|
||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
|
||||
20
go.sum
20
go.sum
@@ -1,7 +1,19 @@
|
||||
github.com/AudriusButkevicius/recli v0.0.7-0.20220911121932-d000ce8fbf0f h1:GmH5lT+moM7PbAJFBq57nH9WJ+wRnBXr/tyaYWbSAx8=
|
||||
github.com/AudriusButkevicius/recli v0.0.7-0.20220911121932-d000ce8fbf0f/go.mod h1:Nhfib1j/VFnLrXL9cHgA+/n2O6P5THuWelOnbfPNd78=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 h1:UXT0o77lXQrikd1kgwIPQOUect7EoR/+sbP4wQKdzxM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0/go.mod h1:cTvi54pg19DoT07ekoeMgE/taAwNtCShVeZqA+Iv2xI=
|
||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
|
||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 h1:kYRSnvJju5gYVyhkij+RTJ/VR6QIUaCfWeaFm2ycsjQ=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
|
||||
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
||||
github.com/alecthomas/assert/v2 v2.11.0 h1:2Q9r3ki8+JYXvGsDyBXwH3LcJ+WK5D0gc5E8vS6K3D0=
|
||||
github.com/alecthomas/assert/v2 v2.11.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
|
||||
@@ -65,6 +77,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
|
||||
github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E=
|
||||
github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0=
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
|
||||
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
@@ -176,6 +190,8 @@ github.com/oschwald/maxminddb-golang v1.13.1 h1:G3wwjdN9JmIK2o/ermkHM+98oX5fS+k5
|
||||
github.com/oschwald/maxminddb-golang v1.13.1/go.mod h1:K4pgV9N/GcK694KSTmVSDTODk4IsCNThNdTmnaBZ/F8=
|
||||
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
|
||||
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
|
||||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
@@ -205,8 +221,8 @@ github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8A
|
||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/riywo/loginshell v0.0.0-20200815045211-7d26008be1ab h1:ZjX6I48eZSFetPb41dHudEyVr5v953N15TsNZXlkcWY=
|
||||
github.com/riywo/loginshell v0.0.0-20200815045211-7d26008be1ab/go.mod h1:/PfPXh0EntGc3QAAyUaviy4S9tzy4Zp0e2ilq4voC6E=
|
||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8=
|
||||
|
||||
74
internal/blob/azureblob/azureblob.go
Normal file
74
internal/blob/azureblob/azureblob.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package azureblob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
stblob "github.com/syncthing/syncthing/internal/blob"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||
)
|
||||
|
||||
var _ stblob.Store = (*BlobStore)(nil)
|
||||
|
||||
type BlobStore struct {
|
||||
client *azblob.Client
|
||||
container string
|
||||
}
|
||||
|
||||
func NewBlobStore(accountName, accountKey, containerName string) (*BlobStore, error) {
|
||||
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
url := "https://" + accountName + ".blob.core.windows.net/"
|
||||
sc, err := azblob.NewClientWithSharedKeyCredential(url, credential, &azblob.ClientOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// This errors when the container already exists, which we ignore.
|
||||
_, _ = sc.CreateContainer(context.Background(), containerName, &container.CreateOptions{})
|
||||
return &BlobStore{
|
||||
client: sc,
|
||||
container: containerName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *BlobStore) Upload(ctx context.Context, key string, data io.Reader) error {
|
||||
_, err := a.client.UploadStream(ctx, a.container, key, data, &blockblob.UploadStreamOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *BlobStore) Download(ctx context.Context, key string, w stblob.Writer) error {
|
||||
resp, err := a.client.DownloadStream(ctx, a.container, key, &blob.DownloadStreamOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
_, err = io.Copy(w, resp.Body)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *BlobStore) LatestKey(ctx context.Context) (string, error) {
|
||||
opts := &azblob.ListBlobsFlatOptions{}
|
||||
pager := a.client.NewListBlobsFlatPager(a.container, opts)
|
||||
var latest string
|
||||
var lastModified time.Time
|
||||
for pager.More() {
|
||||
page, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, blob := range page.Segment.BlobItems {
|
||||
if latest == "" || blob.Properties.LastModified.After(lastModified) {
|
||||
latest = *blob.Name
|
||||
lastModified = *blob.Properties.LastModified
|
||||
}
|
||||
}
|
||||
}
|
||||
return latest, nil
|
||||
}
|
||||
17
internal/blob/interface.go
Normal file
17
internal/blob/interface.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package blob
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
Upload(ctx context.Context, key string, r io.Reader) error
|
||||
Download(ctx context.Context, key string, w Writer) error
|
||||
LatestKey(ctx context.Context) (string, error)
|
||||
}
|
||||
|
||||
type Writer interface {
|
||||
io.Writer
|
||||
io.WriterAt
|
||||
}
|
||||
@@ -7,6 +7,7 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
@@ -15,8 +16,11 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/syncthing/syncthing/internal/blob"
|
||||
)
|
||||
|
||||
var _ blob.Store = (*Session)(nil)
|
||||
|
||||
type Session struct {
|
||||
bucket string
|
||||
s3sess *session.Session
|
||||
@@ -40,7 +44,7 @@ func NewSession(endpoint, region, bucket, accessKeyID, secretKey string) (*Sessi
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Session) Upload(r io.Reader, key string) error {
|
||||
func (s *Session) Upload(_ context.Context, key string, r io.Reader) error {
|
||||
uploader := s3manager.NewUploader(s.s3sess)
|
||||
_, err := uploader.Upload(&s3manager.UploadInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
@@ -50,7 +54,31 @@ func (s *Session) Upload(r io.Reader, key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) List(fn func(*Object) bool) error {
|
||||
func (s *Session) Download(_ context.Context, key string, w blob.Writer) error {
|
||||
downloader := s3manager.NewDownloader(s.s3sess)
|
||||
_, err := downloader.Download(w, &s3.GetObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) LatestKey(_ context.Context) (string, error) {
|
||||
var latestKey string
|
||||
var lastModified time.Time
|
||||
if err := s.list(func(obj *Object) bool {
|
||||
if latestKey == "" || obj.LastModified.After(lastModified) {
|
||||
latestKey = *obj.Key
|
||||
lastModified = *obj.LastModified
|
||||
}
|
||||
return true
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return latestKey, nil
|
||||
}
|
||||
|
||||
func (s *Session) list(fn func(*Object) bool) error {
|
||||
svc := s3.New(s.s3sess)
|
||||
|
||||
opts := &s3.ListObjectsV2Input{
|
||||
@@ -76,27 +104,3 @@ func (s *Session) List(fn func(*Object) bool) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) LatestKey() (string, error) {
|
||||
var latestKey string
|
||||
var lastModified time.Time
|
||||
if err := s.List(func(obj *Object) bool {
|
||||
if latestKey == "" || obj.LastModified.After(lastModified) {
|
||||
latestKey = *obj.Key
|
||||
lastModified = *obj.LastModified
|
||||
}
|
||||
return true
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return latestKey, nil
|
||||
}
|
||||
|
||||
func (s *Session) Download(w io.WriterAt, key string) error {
|
||||
downloader := s3manager.NewDownloader(s.s3sess)
|
||||
_, err := downloader.Download(w, &s3.GetObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user