fixed race condition when multiple kopia instances are downloading indexes in parallel

This commit is contained in:
Jarek Kowalski
2018-05-20 10:29:11 -07:00
parent e36d02c59b
commit 80a3bdfd4a
2 changed files with 61 additions and 22 deletions

View File

@@ -18,9 +18,8 @@
type simpleCommittedBlockIndex struct {
dirname string
mu sync.Mutex
indexBlocks map[PhysicalBlockID]bool
merged packindex.Merged
mu sync.Mutex
merged packindex.Merged
}
func (b *simpleCommittedBlockIndex) getBlock(blockID string) (Info, error) {
@@ -41,32 +40,78 @@ func (b *simpleCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID
b.mu.Lock()
defer b.mu.Unlock()
return b.indexBlocks[indexBlockID], nil
_, err := os.Stat(b.indexBlockPath(indexBlockID))
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
func (b *simpleCommittedBlockIndex) addBlock(indexBlockID PhysicalBlockID, data []byte, use bool) error {
fullPath := filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix))
func (b *simpleCommittedBlockIndex) indexBlockPath(indexBlockID PhysicalBlockID) string {
return filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix))
}
if err := ioutil.WriteFile(fullPath, data, 0600); err != nil {
func (b *simpleCommittedBlockIndex) addBlockToCache(indexBlockID PhysicalBlockID, data []byte) error {
exists, err := b.hasIndexBlockID(indexBlockID)
if err != nil {
return err
}
b.mu.Lock()
defer b.mu.Unlock()
if exists {
return nil
}
b.indexBlocks[indexBlockID] = true
// write to a temp file to avoid race where two processes are writing at the same time.
tf, err := ioutil.TempFile(b.dirname, "index")
if err != nil {
return fmt.Errorf("can't create tmp file: %v", err)
}
defer os.Remove(tf.Name()) //nolint:errcheck
if _, err := tf.Write(data); err != nil {
return fmt.Errorf("can't write to temp file: %v", err)
}
if err := tf.Close(); err != nil {
return fmt.Errorf("can't close tmp file")
}
// rename() is atomic, so one process will succeed, but the other will fail
if err := os.Rename(tf.Name(), b.indexBlockPath(indexBlockID)); err != nil {
// verify that the block exists
exists, err := b.hasIndexBlockID(indexBlockID)
if err != nil {
return err
}
if !exists {
return fmt.Errorf("unsuccessful index write of block %q", indexBlockID)
}
}
return nil
}
func (b *simpleCommittedBlockIndex) addBlock(indexBlockID PhysicalBlockID, data []byte, use bool) error {
if err := b.addBlockToCache(indexBlockID, data); err != nil {
return err
}
if !use {
return nil
}
ndx, err := b.openIndex(fullPath)
b.mu.Lock()
defer b.mu.Unlock()
ndx, err := b.openIndex(b.indexBlockPath(indexBlockID))
if err != nil {
return fmt.Errorf("unable to open pack index %q: %v", fullPath, err)
return fmt.Errorf("unable to open pack index %q: %v", indexBlockID, err)
}
b.merged = append(b.merged, ndx)
return nil
}
@@ -80,7 +125,6 @@ func (b *simpleCommittedBlockIndex) listBlocks(prefix string, cb func(i Info) er
func (b *simpleCommittedBlockIndex) openIndex(fullpath string) (packindex.Index, error) {
f, err := mmap.Open(fullpath)
//f, err := os.Open(fullpath)
if err != nil {
return nil, err
}
@@ -92,7 +136,6 @@ func (b *simpleCommittedBlockIndex) use(packBlockIDs []PhysicalBlockID) error {
b.mu.Lock()
defer b.mu.Unlock()
newIndexes := map[PhysicalBlockID]bool{}
var newMerged packindex.Merged
defer func() {
newMerged.Close() //nolint:errcheck
@@ -105,10 +148,8 @@ func (b *simpleCommittedBlockIndex) use(packBlockIDs []PhysicalBlockID) error {
}
log.Printf("opened %v with %v entries", fullpath, ndx.EntryCount())
newIndexes[e] = true
newMerged = append(newMerged, ndx)
}
b.indexBlocks = newIndexes
b.merged = newMerged
newMerged = nil
return nil
@@ -118,8 +159,7 @@ func newSimpleCommittedBlockIndex(dirname string) (committedBlockIndex, error) {
_ = os.MkdirAll(dirname, 0700)
s := &simpleCommittedBlockIndex{
dirname: dirname,
indexBlocks: map[PhysicalBlockID]bool{},
dirname: dirname,
}
return s, nil
}

View File

@@ -3,13 +3,12 @@
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/kopia/kopia/repo"
"github.com/rs/zerolog/log"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot"
@@ -121,7 +120,7 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, pol *sna
switch entry := entry.(type) {
case fs.Directory:
if !*snapshotEstimateQuiet {
log.Printf("Scanning %q...\n", relativePath)
fmt.Fprintln(os.Stderr, "Scanning", relativePath)
}
children, err := entry.Readdir(ctx)
if err != nil {