Files
kopia/repo/content/committed_content_index_disk_cache.go
Julio López e1dce8a049 fix(repository): sync index blob file before closing (#5292)
- Sync index blob file before closing
- Ensure file is closed even when there are errors
- Remove temp file on error
- Make writeTempFileAtomic testable
- Test writeTempFile atomic
2026-04-09 10:26:02 -07:00

150 lines
3.7 KiB
Go

package content
import (
"context"
"os"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/blobparam"
"github.com/kopia/kopia/internal/contentlog"
"github.com/kopia/kopia/internal/contentlog/logparam"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
)
const (
simpleIndexSuffix = ".sndx"
)
type diskCommittedContentIndexCache struct {
dirname string
timeNow func() time.Time
v1PerContentOverhead func() int
log *contentlog.Logger
minSweepAge time.Duration
}
func (c *diskCommittedContentIndexCache) indexBlobPath(indexBlobID blob.ID) string {
return filepath.Join(c.dirname, string(indexBlobID)+simpleIndexSuffix)
}
func (c *diskCommittedContentIndexCache) openIndex(ctx context.Context, indexBlobID blob.ID) (index.Index, error) {
fullpath := c.indexBlobPath(indexBlobID)
f, closeMmap, err := c.mmapFile(ctx, fullpath)
if err != nil {
return nil, err
}
ndx, err := index.Open(f, closeMmap, c.v1PerContentOverhead)
if err != nil {
closeMmap() //nolint:errcheck
return nil, errors.Wrapf(err, "error opening index from %v", indexBlobID)
}
return ndx, nil
}
func (c *diskCommittedContentIndexCache) hasIndexBlobID(_ context.Context, indexBlobID blob.ID) (bool, error) {
_, err := os.Stat(c.indexBlobPath(indexBlobID))
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, errors.Wrapf(err, "error checking %v", indexBlobID)
}
func (c *diskCommittedContentIndexCache) addContentToCache(ctx context.Context, indexBlobID blob.ID, data gather.Bytes) error {
exists, err := c.hasIndexBlobID(ctx, indexBlobID)
if err != nil {
return err
}
if exists {
return nil
}
tmpFile, err := writeTempFileAtomic(c.dirname, data.ToByteSlice())
if err != nil {
return err
}
// rename() is atomic, so one process will succeed, but the other will fail
if err := os.Rename(tmpFile, c.indexBlobPath(indexBlobID)); err != nil {
// verify that the content exists
exists, err := c.hasIndexBlobID(ctx, indexBlobID)
if err != nil {
return err
}
if !exists {
return errors.Errorf("unsuccessful index write of content %q", indexBlobID)
}
}
return nil
}
func (c *diskCommittedContentIndexCache) expireUnused(ctx context.Context, used []blob.ID) error {
contentlog.Log2(ctx, c.log, "expireUnused",
blobparam.BlobIDList("except", used),
logparam.Duration("minSweepAge", c.minSweepAge))
entries, err := os.ReadDir(c.dirname)
if err != nil {
return errors.Wrap(err, "can't list cache")
}
remaining := map[blob.ID]os.FileInfo{}
for _, ent := range entries {
fi, err := ent.Info()
if os.IsNotExist(err) {
// we lost the race, the file was deleted since it was listed.
continue
}
if err != nil {
return errors.Wrap(err, "failed to read file info")
}
if n, ok := strings.CutSuffix(ent.Name(), simpleIndexSuffix); ok {
remaining[blob.ID(n)] = fi
}
}
for _, u := range used {
delete(remaining, u)
}
for _, rem := range remaining {
if c.timeNow().Sub(rem.ModTime()) > c.minSweepAge {
contentlog.Log2(ctx, c.log, "removing unused",
logparam.String("name", rem.Name()),
logparam.Time("mtime", rem.ModTime()))
if err := os.Remove(filepath.Join(c.dirname, rem.Name())); err != nil {
contentlog.Log1(ctx, c.log,
"unable to remove unused index file",
logparam.Error("err", err))
}
} else {
contentlog.Log3(ctx, c.log, "keeping unused index because it's too new",
logparam.String("name", rem.Name()),
logparam.Time("mtime", rem.ModTime()),
logparam.Duration("threshold", c.minSweepAge))
}
}
return nil
}