diff --git a/block/simple_committed_block_index.go b/block/simple_committed_block_index.go index 211fcdd28..d43fc8064 100644 --- a/block/simple_committed_block_index.go +++ b/block/simple_committed_block_index.go @@ -18,9 +18,8 @@ type simpleCommittedBlockIndex struct { dirname string - mu sync.Mutex - indexBlocks map[PhysicalBlockID]bool - merged packindex.Merged + mu sync.Mutex + merged packindex.Merged } func (b *simpleCommittedBlockIndex) getBlock(blockID string) (Info, error) { @@ -41,32 +40,78 @@ func (b *simpleCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID b.mu.Lock() defer b.mu.Unlock() - return b.indexBlocks[indexBlockID], nil + _, err := os.Stat(b.indexBlockPath(indexBlockID)) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + + return false, err } -func (b *simpleCommittedBlockIndex) addBlock(indexBlockID PhysicalBlockID, data []byte, use bool) error { - fullPath := filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix)) +func (b *simpleCommittedBlockIndex) indexBlockPath(indexBlockID PhysicalBlockID) string { + return filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix)) +} - if err := ioutil.WriteFile(fullPath, data, 0600); err != nil { +func (b *simpleCommittedBlockIndex) addBlockToCache(indexBlockID PhysicalBlockID, data []byte) error { + exists, err := b.hasIndexBlockID(indexBlockID) + if err != nil { return err } - b.mu.Lock() - defer b.mu.Unlock() + if exists { + return nil + } - b.indexBlocks[indexBlockID] = true + // write to a temp file to avoid race where two processes are writing at the same time. + tf, err := ioutil.TempFile(b.dirname, "index") + if err != nil { + return fmt.Errorf("can't create tmp file: %v", err) + } + defer os.Remove(tf.Name()) //nolint:errcheck + + if _, err := tf.Write(data); err != nil { + return fmt.Errorf("can't write to temp file: %v", err) + } + if err := tf.Close(); err != nil { + return fmt.Errorf("can't close tmp file") + } + + // rename() is atomic, so one process will succeed, but the other will fail + if err := os.Rename(tf.Name(), b.indexBlockPath(indexBlockID)); err != nil { + // verify that the block exists + exists, err := b.hasIndexBlockID(indexBlockID) + if err != nil { + return err + } + if !exists { + return fmt.Errorf("unsuccessful index write of block %q", indexBlockID) + } + } + + return nil +} + +func (b *simpleCommittedBlockIndex) addBlock(indexBlockID PhysicalBlockID, data []byte, use bool) error { + if err := b.addBlockToCache(indexBlockID, data); err != nil { + return err + } if !use { return nil } - ndx, err := b.openIndex(fullPath) + b.mu.Lock() + defer b.mu.Unlock() + + ndx, err := b.openIndex(b.indexBlockPath(indexBlockID)) if err != nil { - return fmt.Errorf("unable to open pack index %q: %v", fullPath, err) + return fmt.Errorf("unable to open pack index %q: %v", indexBlockID, err) } b.merged = append(b.merged, ndx) - return nil } @@ -80,7 +125,6 @@ func (b *simpleCommittedBlockIndex) listBlocks(prefix string, cb func(i Info) er func (b *simpleCommittedBlockIndex) openIndex(fullpath string) (packindex.Index, error) { f, err := mmap.Open(fullpath) - //f, err := os.Open(fullpath) if err != nil { return nil, err } @@ -92,7 +136,6 @@ func (b *simpleCommittedBlockIndex) use(packBlockIDs []PhysicalBlockID) error { b.mu.Lock() defer b.mu.Unlock() - newIndexes := map[PhysicalBlockID]bool{} var newMerged packindex.Merged defer func() { newMerged.Close() //nolint:errcheck @@ -105,10 +148,8 @@ func (b *simpleCommittedBlockIndex) use(packBlockIDs []PhysicalBlockID) error { } log.Printf("opened %v with %v entries", fullpath, ndx.EntryCount()) - newIndexes[e] = true newMerged = append(newMerged, ndx) } - b.indexBlocks = newIndexes b.merged = newMerged newMerged = nil return nil @@ -118,8 +159,7 @@ func newSimpleCommittedBlockIndex(dirname string) (committedBlockIndex, error) { _ = os.MkdirAll(dirname, 0700) s := &simpleCommittedBlockIndex{ - dirname: dirname, - indexBlocks: map[PhysicalBlockID]bool{}, + dirname: dirname, } return s, nil } diff --git a/cli/command_snapshot_estimate.go b/cli/command_snapshot_estimate.go index 80b4d5c72..ff6493568 100644 --- a/cli/command_snapshot_estimate.go +++ b/cli/command_snapshot_estimate.go @@ -3,13 +3,12 @@ import ( "context" "fmt" + "os" "path/filepath" "time" "github.com/kopia/kopia/repo" - "github.com/rs/zerolog/log" - "github.com/kopia/kopia/fs" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/snapshot" @@ -121,7 +120,7 @@ func estimate(ctx context.Context, relativePath string, entry fs.Entry, pol *sna switch entry := entry.(type) { case fs.Directory: if !*snapshotEstimateQuiet { - log.Printf("Scanning %q...\n", relativePath) + fmt.Fprintln(os.Stderr, "Scanning", relativePath) } children, err := entry.Readdir(ctx) if err != nil {