Added support for pack index compaction (not pack file compaction, which is not implemented yet). It combines multiple pack index objects P* into one large object P, which loads more quickly.

This commit is contained in:
Jarek Kowalski
2017-09-09 09:30:06 -07:00
parent 6ef7005396
commit bb64238eba
7 changed files with 150 additions and 58 deletions

View File

@@ -0,0 +1,23 @@
package cli
import (
"time"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
var (
// optimizeCommand registers the "optimize" subcommand under the repository command group.
optimizeCommand = repositoryCommands.Command("optimize", "Optimize repository performance.")
// optimizeMinAge is the --min-age flag (default "24h"); only objects older than
// this are optimized, which keeps recently-written objects out of the operation.
optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of objects to optimize").Default("24h").Duration()
)
// runOptimizeCommand implements the "optimize" CLI command: it opens the
// repository and optimizes objects written before now minus the configured
// minimum age (--min-age).
//
// The unused parameter was previously named "context", which shadowed the
// standard library package name; use the blank identifier instead.
func runOptimizeCommand(_ *kingpin.ParseContext) error {
	rep := mustOpenRepository(nil)
	defer rep.Close()

	// Only objects older than the cutoff are touched, so concurrent writers
	// are not affected.
	return rep.Optimize(time.Now().Add(-*optimizeMinAge))
}
// init wires the optimize command to its handler at program startup.
func init() {
optimizeCommand.Action(runOptimizeCommand)
}

View File

@@ -129,18 +129,11 @@ func connect(ctx context.Context, st blob.Storage, creds auth.Credentials, optio
return nil, fmt.Errorf("unable to open object manager: %v", err)
}
r := &Repository{
return &Repository{
ObjectManager: om,
MetadataManager: mm,
Storage: st,
}
r.packMgr = &packManager{
objectManager: om,
packGroups: make(map[string]*packInfo),
}
return r, nil
}, nil
}
// Disconnect removes the specified configuration file and any local cache directories.

View File

@@ -25,6 +25,7 @@
const (
// parallelFetches is the number of goroutines used to fetch pack index blocks concurrently.
parallelFetches = 5
// parallelDeletes is the number of goroutines used to delete obsolete pack index blocks concurrently.
parallelDeletes = 20
)
var (

View File

@@ -9,6 +9,7 @@
"strings"
"sync"
"sync/atomic"
"time"
"github.com/kopia/kopia/blob"
"github.com/kopia/kopia/internal/config"
@@ -52,6 +53,16 @@ func (r *ObjectManager) Close() error {
return nil
}
// Optimize performs object optimizations to improve performance of future operations.
// The operation will not affect objects written after cutoffTime to prevent race conditions.
func (r *ObjectManager) Optimize(cutoffTime time.Time) error {
	// Currently the only optimization is pack index compaction; delegate
	// directly instead of wrapping the error check in an if/return-nil pair.
	return r.packMgr.Compact(cutoffTime)
}
// NewWriter creates an ObjectWriter for writing to the repository.
func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter {
w := &objectWriter{
@@ -174,6 +185,11 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt
}
}
r.packMgr = &packManager{
objectManager: r,
packGroups: make(map[string]*packInfo),
}
return r, nil
}

View File

@@ -7,6 +7,7 @@
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"reflect"
"runtime/debug"
@@ -177,6 +178,10 @@ func TestPackingSimple(t *testing.T) {
}
repo.Close()
for k, v := range data {
log.Printf("data[%v] = %v", k, string(v))
}
data, repo = setupTestWithData(t, data, func(n *NewRepositoryOptions) {
n.MaxPackFileLength = 10000
n.MaxPackedContentLength = 10000
@@ -420,7 +425,7 @@ func writeObject(t *testing.T, repo *Repository, data []byte, testCaseID string)
func verify(t *testing.T, repo *Repository, objectID ObjectID, expectedData []byte, testCaseID string) {
reader, err := repo.Open(objectID)
if err != nil {
t.Errorf("cannot get reader for %v: %v %v", testCaseID, err, string(debug.Stack()))
t.Errorf("cannot get reader for %v (%v): %v %v", testCaseID, objectID, err, string(debug.Stack()))
return
}

View File

@@ -1,15 +1,11 @@
package repo
import (
"bytes"
"encoding/json"
"io"
"sort"
"time"
)
const packIDPrefix = "K"
type packIndexes map[string]*packIndex
type packIndex struct {
@@ -31,27 +27,9 @@ func loadPackIndexes(r io.Reader) (packIndexes, error) {
func (i packIndexes) merge(other packIndexes) {
for packID, ndx := range other {
i[packID] = ndx
}
}
func loadMergedPackIndex(m map[string][]byte) (packIndexes, error) {
var names []string
for n := range m {
names = append(names, n)
}
sort.Strings(names)
merged := make(packIndexes)
for _, n := range names {
content := m[n]
pi, err := loadPackIndexes(bytes.NewReader(content))
if err != nil {
return nil, err
old := i[packID]
if old == nil || ndx.CreateTime.After(old.CreateTime) {
i[packID] = ndx
}
merged.merge(pi)
}
return merged, nil
}

View File

@@ -2,10 +2,12 @@
import (
"bytes"
"compress/gzip"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"strconv"
@@ -151,25 +153,30 @@ func (p *packManager) savePackIndexes() error {
return nil
}
var jb bytes.Buffer
if err := json.NewEncoder(&jb).Encode(p.pendingPackIndexes); err != nil {
return fmt.Errorf("can't encode pack index: %v", err)
}
return p.writePackIndexes(p.pendingPackIndexes)
}
// writePackIndexes serializes the given pack indexes as gzip-compressed JSON
// and stores them in a single repository object (with packing disabled so the
// index itself is never packed).
func (p *packManager) writePackIndexes(ndx packIndexes) error {
	w := p.objectManager.NewWriter(WriterOptions{
		disablePacking:  true,
		Description:     "pack index",
		BlockNamePrefix: packObjectPrefix,
		splitter:        newNeverSplitter(),
	})
	defer w.Close()

	zw := gzip.NewWriter(w)
	// BUG fix: encode the indexes passed by the caller. The previous code
	// encoded p.pendingPackIndexes, which made Compact write the wrong data
	// when it passed a merged index set.
	if err := json.NewEncoder(zw).Encode(ndx); err != nil {
		return fmt.Errorf("can't encode pack index: %v", err)
	}
	// Close flushes the remaining compressed data; its error must be checked
	// or a truncated index could be written silently.
	if err := zw.Close(); err != nil {
		return fmt.Errorf("can't close gzip stream: %v", err)
	}
	if _, err := w.Result(); err != nil {
		return fmt.Errorf("can't save pack index object: %v", err)
	}
	return nil
}
func (p *packManager) finishCurrentPackLocked() error {
for _, g := range p.packGroups {
if err := p.finishPackLocked(g); err != nil {
@@ -207,17 +214,7 @@ func (p *packManager) finishPackLocked(g *packInfo) error {
return nil
}
func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
p.mu.RLock()
pi := p.blockToIndex
p.mu.RUnlock()
if pi != nil {
return pi, nil
}
p.mu.Lock()
defer p.mu.Unlock()
func (p *packManager) loadMergedPackIndex(olderThan *time.Time) (map[string]*packIndex, []string, error) {
ch, cancel := p.objectManager.storage.ListBlocks(packObjectPrefix)
defer cancel()
@@ -228,8 +225,9 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
errors := make(chan error, parallelFetches)
var mu sync.Mutex
m := map[string][]byte{}
packIndexData := map[string][]byte{}
totalSize := 0
var blockIDs []string
for i := 0; i < parallelFetches; i++ {
wg.Add(1)
go func() {
@@ -241,6 +239,10 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
return
}
if olderThan != nil && b.TimeStamp.After(*olderThan) {
return
}
r, err := p.objectManager.Open(ObjectID{StorageBlock: b.BlockID})
if err != nil {
errors <- err
@@ -254,7 +256,8 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
}
mu.Lock()
m[fmt.Sprintf("%16x", b.TimeStamp.UnixNano())] = data
packIndexData[b.BlockID] = data
blockIDs = append(blockIDs, b.BlockID)
totalSize += len(data)
mu.Unlock()
}
@@ -266,14 +269,43 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
// Propagate async errors, if any.
for err := range errors {
return nil, err
return nil, nil, err
}
if false {
log.Printf("loaded %v pack indexes (%v bytes) in %v", len(m), totalSize, time.Since(t0))
log.Printf("loaded %v pack indexes (%v bytes) in %v", len(packIndexData), totalSize, time.Since(t0))
}
merged, err := loadMergedPackIndex(m)
merged := make(packIndexes)
for blockID, content := range packIndexData {
var r io.Reader = bytes.NewReader(content)
zr, err := gzip.NewReader(r)
if err != nil {
return nil, nil, fmt.Errorf("unable to read pack index from %q: %v", blockID, err)
}
pi, err := loadPackIndexes(zr)
if err != nil {
return nil, nil, err
}
merged.merge(pi)
}
return merged, blockIDs, nil
}
func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
p.mu.RLock()
pi := p.blockToIndex
p.mu.RUnlock()
if pi != nil {
return pi, nil
}
p.mu.Lock()
defer p.mu.Unlock()
merged, _, err := p.loadMergedPackIndex(nil)
if err != nil {
return nil, err
}
@@ -286,12 +318,56 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
}
p.blockToIndex = pi
// log.Printf("loaded pack index with %v entries", len(p.blockToIndex))
return pi, nil
}
// Compact merges all pack indexes created before cutoffTime into a single
// index object and then deletes the now-redundant source index blocks.
func (p *packManager) Compact(cutoffTime time.Time) error {
	merged, blockIDs, err := p.loadMergedPackIndex(&cutoffTime)
	if err != nil {
		return err
	}

	// Too few index blocks — compaction would not pay for itself yet.
	if len(blockIDs) < parallelFetches {
		return nil
	}

	if err := p.writePackIndexes(merged); err != nil {
		return err
	}

	// Remove the superseded index blocks using a bounded pool of workers.
	// Deletion failures are logged but not fatal: the merged index already
	// covers their contents.
	work := makeStringChannel(blockIDs)
	var wg sync.WaitGroup
	wg.Add(parallelDeletes)
	for worker := 0; worker < parallelDeletes; worker++ {
		go func() {
			defer wg.Done()
			for id := range work {
				if err := p.objectManager.storage.DeleteBlock(id); err != nil {
					log.Printf("warning: unable to delete %q: %v", id, err)
				}
			}
		}()
	}
	wg.Wait()
	return nil
}
// makeStringChannel returns a receive-only channel that yields every element
// of s in order; the channel is closed after the last element is delivered.
func makeStringChannel(s []string) <-chan string {
	out := make(chan string)
	go func() {
		for _, item := range s {
			out <- item
		}
		close(out)
	}()
	return out
}
func (p *packManager) newPackID() string {
id := make([]byte, 8)
rand.Read(id)