mirror of
https://github.com/kopia/kopia.git
synced 2026-03-27 10:32:08 -04:00
* sharded: plumbed through blob.PutOptions * blob: removed blob.Storage.SetTime() method This was only used for `kopia repo sync-to` and got replaced with an equivalent blob.PutOptions.SetTime, which wehn set to non-zero time will attempt to set the modification time on a file. Since some providers don't support changing modification time, we are able to emulate it using per-blob metadata (on B2, Azure and GCS), sadly S3 is still unsupported, because it does not support returning metadata in list results. Also added PutOptions.GetTime, which when set to not nil, will populate the provided variable with actual time that got assigned to the blob. Added tests that verify that each provider supports GetTime and SetTime according to this spec. * blob: additional test coverage for filesystem storage * blob: added PutBlobAndGetMetadata() helper and used where appropriate * fixed test failures * pr feedback * Update repo/blob/azure/azure_storage.go Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com> * Update repo/blob/filesystem/filesystem_storage.go Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com> * Update repo/blob/filesystem/filesystem_storage.go Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com> * blobtesting: fixed object_locking_map.go * blobtesting: removed SetTime from ObjectLockingMap Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
293 lines
6.6 KiB
Go
293 lines
6.6 KiB
Go
package blobtesting
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"math"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/kopia/kopia/internal/clock"
|
|
"github.com/kopia/kopia/internal/gather"
|
|
"github.com/kopia/kopia/internal/iocopy"
|
|
"github.com/kopia/kopia/repo/blob"
|
|
"github.com/kopia/kopia/repo/logging"
|
|
)
|
|
|
|
var eventuallyConsistentLog = logging.Module("eventually-consistent")
|
|
|
|
const ecCacheDuration = 5 * time.Second
|
|
|
|
// ecFrontendCache is an instance of cache, which simulates cloud storage frontend
|
|
// with its own in-memory state. This causes eventual consistency when a client uses
|
|
// different instances of the cache for read and writes.
|
|
type ecFrontendCache struct {
|
|
mu sync.Mutex
|
|
cachedEntries map[blob.ID]*ecCacheEntry
|
|
}
|
|
|
|
func (c *ecFrontendCache) get(id blob.ID) *ecCacheEntry {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
e := c.cachedEntries[id]
|
|
if e != nil && !e.isValid() {
|
|
c.sweepLocked()
|
|
return nil
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
func (c *ecFrontendCache) put(id blob.ID, data []byte) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
c.sweepLocked()
|
|
|
|
if c.cachedEntries == nil {
|
|
c.cachedEntries = map[blob.ID]*ecCacheEntry{}
|
|
}
|
|
|
|
if data != nil {
|
|
// clone data before storage
|
|
data = append([]byte(nil), data...)
|
|
}
|
|
|
|
c.cachedEntries[id] = &ecCacheEntry{
|
|
accessTime: clock.Now(),
|
|
data: data,
|
|
}
|
|
}
|
|
|
|
func (c *ecFrontendCache) sweepLocked() {
|
|
for k, v := range c.cachedEntries {
|
|
if !v.isValid() {
|
|
delete(c.cachedEntries, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
type ecCacheEntry struct {
|
|
data []byte
|
|
|
|
accessTime time.Time
|
|
}
|
|
|
|
func (e *ecCacheEntry) isValid() bool {
|
|
return clock.Now().Sub(e.accessTime) < ecCacheDuration
|
|
}
|
|
|
|
type eventuallyConsistentStorage struct {
|
|
mu sync.Mutex
|
|
|
|
recentlyDeleted sync.Map
|
|
listSettleTime time.Duration
|
|
|
|
caches []*ecFrontendCache
|
|
realStorage blob.Storage
|
|
timeNow func() time.Time
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) randomFrontendCache() *ecFrontendCache {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
n := rand.Intn(len(s.caches))
|
|
|
|
if s.caches[n] == nil {
|
|
s.caches[n] = &ecFrontendCache{}
|
|
}
|
|
|
|
return s.caches[n]
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error {
|
|
// don't bother caching partial reads
|
|
if length >= 0 {
|
|
return s.realStorage.GetBlob(ctx, id, offset, length, output)
|
|
}
|
|
|
|
c := s.randomFrontendCache()
|
|
|
|
// see if the frontend has the blob cached
|
|
e := c.get(id)
|
|
if e != nil {
|
|
if e.data == nil {
|
|
return blob.ErrBlobNotFound
|
|
}
|
|
|
|
if _, err := output.Write(e.data); err != nil {
|
|
return errors.Wrap(err, "error appending to output")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var buf gather.WriteBuffer
|
|
|
|
// fetch from the underlying storage.
|
|
err := s.realStorage.GetBlob(ctx, id, offset, length, &buf)
|
|
if err != nil {
|
|
if errors.Is(err, blob.ErrBlobNotFound) {
|
|
c.put(id, nil)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
c.put(id, buf.ToByteSlice())
|
|
|
|
return iocopy.JustCopy(output, buf.Bytes().Reader())
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) GetMetadata(ctx context.Context, id blob.ID) (blob.Metadata, error) {
|
|
c := s.randomFrontendCache()
|
|
|
|
// see if the frontend has cached blob deleted/not exists
|
|
e := c.get(id)
|
|
if e != nil {
|
|
if e.data == nil {
|
|
return blob.Metadata{}, blob.ErrBlobNotFound
|
|
}
|
|
}
|
|
|
|
// fetch from the underlying storage.
|
|
return s.realStorage.GetMetadata(ctx, id)
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) PutBlob(ctx context.Context, id blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
|
if err := s.realStorage.PutBlob(ctx, id, data, opts); err != nil {
|
|
return err
|
|
}
|
|
|
|
d, err := io.ReadAll(data.Reader())
|
|
if err != nil {
|
|
return errors.Wrap(err, "invalid data")
|
|
}
|
|
|
|
// add to frontend cache
|
|
s.randomFrontendCache().put(id, d)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
|
|
s.randomFrontendCache().put(id, nil)
|
|
|
|
// capture metadata before deleting
|
|
md, err := s.realStorage.GetMetadata(ctx, id)
|
|
|
|
if errors.Is(err, blob.ErrBlobNotFound) {
|
|
return blob.ErrBlobNotFound
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.realStorage.DeleteBlob(ctx, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
md.Timestamp = s.timeNow()
|
|
s.recentlyDeleted.Store(id, md)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) shouldApplyInconsistency(ctx context.Context, age time.Duration, desc string) bool {
|
|
if age < 0 {
|
|
age = -age
|
|
}
|
|
|
|
if age >= s.listSettleTime {
|
|
return false
|
|
}
|
|
|
|
x := age.Seconds() / s.listSettleTime.Seconds() // [0..1)
|
|
|
|
// y=1-(x^0.3) is:
|
|
// about 50% probability of inconsistency after 10% of listSettleTime
|
|
// about 25% probability of inconsistency after 40% of listSettleTime
|
|
// about 10% probability of inconsistency after 67% of listSettleTime
|
|
// about 1% probability of inconsistency after 95% of listSettleTime
|
|
|
|
const power = 0.3
|
|
|
|
prob := 1 - math.Pow(x, power)
|
|
|
|
if rand.Float64() < prob {
|
|
eventuallyConsistentLog(ctx).Debugf("applying inconsistency %v (probability %v)", desc, prob)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error {
|
|
now := s.timeNow()
|
|
|
|
if err := s.realStorage.ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
|
|
if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "hide recently created "+string(bm.BlobID)) {
|
|
return nil
|
|
}
|
|
|
|
return callback(bm)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
var resultErr error
|
|
|
|
// process recently deleted items and resurrect them with some probability
|
|
s.recentlyDeleted.Range(func(key, value interface{}) bool {
|
|
blobID := key.(blob.ID)
|
|
if !strings.HasPrefix(string(blobID), string(prefix)) {
|
|
return true
|
|
}
|
|
|
|
bm := value.(blob.Metadata)
|
|
if age := now.Sub(bm.Timestamp); s.shouldApplyInconsistency(ctx, age, "resurrect recently deleted "+string(bm.BlobID)) {
|
|
if resultErr = callback(bm); resultErr != nil {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
})
|
|
|
|
return resultErr
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) Close(ctx context.Context) error {
|
|
return s.realStorage.Close(ctx)
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) ConnectionInfo() blob.ConnectionInfo {
|
|
return s.realStorage.ConnectionInfo()
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) DisplayName() string {
|
|
return s.realStorage.DisplayName()
|
|
}
|
|
|
|
func (s *eventuallyConsistentStorage) FlushCaches(ctx context.Context) error {
|
|
return s.realStorage.FlushCaches(ctx)
|
|
}
|
|
|
|
// NewEventuallyConsistentStorage returns an eventually-consistent storage wrapper on top
|
|
// of provided storage.
|
|
func NewEventuallyConsistentStorage(st blob.Storage, listSettleTime time.Duration, timeNow func() time.Time) blob.Storage {
|
|
return &eventuallyConsistentStorage{
|
|
realStorage: st,
|
|
caches: make([]*ecFrontendCache, 4),
|
|
listSettleTime: listSettleTime,
|
|
timeNow: timeNow,
|
|
}
|
|
}
|