diff --git a/cli/command_repository_optimize.go b/cli/command_repository_optimize.go new file mode 100644 index 000000000..1fc578713 --- /dev/null +++ b/cli/command_repository_optimize.go @@ -0,0 +1,23 @@ +package cli + +import ( + "time" + + kingpin "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + optimizeCommand = repositoryCommands.Command("optimize", "Optimize repository performance.") + optimizeMinAge = optimizeCommand.Flag("min-age", "Minimum age of objects to optimize").Default("24h").Duration() +) + +func runOptimizeCommand(context *kingpin.ParseContext) error { + rep := mustOpenRepository(nil) + defer rep.Close() + + return rep.Optimize(time.Now().Add(-*optimizeMinAge)) +} + +func init() { + optimizeCommand.Action(runOptimizeCommand) +} diff --git a/repo/connection.go b/repo/connection.go index eb68b844a..67dfdb9ca 100644 --- a/repo/connection.go +++ b/repo/connection.go @@ -129,18 +129,11 @@ func connect(ctx context.Context, st blob.Storage, creds auth.Credentials, optio return nil, fmt.Errorf("unable to open object manager: %v", err) } - r := &Repository{ + return &Repository{ ObjectManager: om, MetadataManager: mm, Storage: st, - } - - r.packMgr = &packManager{ - objectManager: om, - packGroups: make(map[string]*packInfo), - } - - return r, nil + }, nil } // Disconnect removes the specified configuration file and any local cache directories. 
diff --git a/repo/metadata_manager.go b/repo/metadata_manager.go index 8a62f79db..1b6b4435b 100644 --- a/repo/metadata_manager.go +++ b/repo/metadata_manager.go @@ -25,6 +25,7 @@ const ( parallelFetches = 5 + parallelDeletes = 20 ) var ( diff --git a/repo/object_manager.go b/repo/object_manager.go index 3352efbd9..309c54bcf 100644 --- a/repo/object_manager.go +++ b/repo/object_manager.go @@ -9,6 +9,7 @@ "strings" "sync" "sync/atomic" + "time" "github.com/kopia/kopia/blob" "github.com/kopia/kopia/internal/config" @@ -52,6 +53,16 @@ func (r *ObjectManager) Close() error { return nil } +// Optimize performs object optimizations to improve performance of future operations. +// The operation will not affect objects written after cutoffTime to prevent race conditions. +func (r *ObjectManager) Optimize(cutoffTime time.Time) error { + if err := r.packMgr.Compact(cutoffTime); err != nil { + return err + } + + return nil +} + // NewWriter creates an ObjectWriter for writing to the repository. func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter { w := &objectWriter{ @@ -174,6 +185,11 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt } } + r.packMgr = &packManager{ + objectManager: r, + packGroups: make(map[string]*packInfo), + } + return r, nil } diff --git a/repo/object_manager_test.go b/repo/object_manager_test.go index 055c04005..caad72bf6 100644 --- a/repo/object_manager_test.go +++ b/repo/object_manager_test.go @@ -7,6 +7,7 @@ "fmt" "io" "io/ioutil" + "log" "math/rand" "reflect" "runtime/debug" @@ -177,6 +178,10 @@ func TestPackingSimple(t *testing.T) { } repo.Close() + for k, v := range data { + log.Printf("data[%v] = %v", k, string(v)) + } + data, repo = setupTestWithData(t, data, func(n *NewRepositoryOptions) { n.MaxPackFileLength = 10000 n.MaxPackedContentLength = 10000 @@ -420,7 +425,7 @@ func writeObject(t *testing.T, repo *Repository, data []byte, testCaseID string) func verify(t *testing.T, repo *Repository, 
objectID ObjectID, expectedData []byte, testCaseID string) { reader, err := repo.Open(objectID) if err != nil { - t.Errorf("cannot get reader for %v: %v %v", testCaseID, err, string(debug.Stack())) + t.Errorf("cannot get reader for %v (%v): %v %v", testCaseID, objectID, err, string(debug.Stack())) return } diff --git a/repo/pack_index.go b/repo/pack_index.go index fe9893e8b..dd84c93bb 100644 --- a/repo/pack_index.go +++ b/repo/pack_index.go @@ -1,15 +1,11 @@ package repo import ( - "bytes" "encoding/json" "io" - "sort" "time" ) -const packIDPrefix = "K" - type packIndexes map[string]*packIndex type packIndex struct { @@ -31,27 +27,9 @@ func loadPackIndexes(r io.Reader) (packIndexes, error) { func (i packIndexes) merge(other packIndexes) { for packID, ndx := range other { - i[packID] = ndx - } -} - -func loadMergedPackIndex(m map[string][]byte) (packIndexes, error) { - var names []string - for n := range m { - names = append(names, n) - } - - sort.Strings(names) - - merged := make(packIndexes) - for _, n := range names { - content := m[n] - pi, err := loadPackIndexes(bytes.NewReader(content)) - if err != nil { - return nil, err + old := i[packID] + if old == nil || ndx.CreateTime.After(old.CreateTime) { + i[packID] = ndx } - merged.merge(pi) } - - return merged, nil } diff --git a/repo/pack_manager.go b/repo/pack_manager.go index 245d120b8..8e830783f 100644 --- a/repo/pack_manager.go +++ b/repo/pack_manager.go @@ -2,10 +2,12 @@ import ( "bytes" + "compress/gzip" "crypto/rand" "encoding/hex" "encoding/json" "fmt" + "io" "io/ioutil" "log" "strconv" @@ -151,25 +153,30 @@ func (p *packManager) savePackIndexes() error { return nil } - var jb bytes.Buffer - if err := json.NewEncoder(&jb).Encode(p.pendingPackIndexes); err != nil { - return fmt.Errorf("can't encode pack index: %v", err) - } + return p.writePackIndexes(p.pendingPackIndexes) +} +func (p *packManager) writePackIndexes(ndx packIndexes) error { w := p.objectManager.NewWriter(WriterOptions{ disablePacking: true, 
+ Description: "pack index", BlockNamePrefix: packObjectPrefix, splitter: newNeverSplitter(), }) + defer w.Close() + + zw := gzip.NewWriter(w) + if err := json.NewEncoder(zw).Encode(p.pendingPackIndexes); err != nil { + return fmt.Errorf("can't encode pack index: %v", err) + } + zw.Close() - w.Write(jb.Bytes()) if _, err := w.Result(); err != nil { return fmt.Errorf("can't save pack index object: %v", err) } return nil } - func (p *packManager) finishCurrentPackLocked() error { for _, g := range p.packGroups { if err := p.finishPackLocked(g); err != nil { @@ -207,17 +214,7 @@ func (p *packManager) finishPackLocked(g *packInfo) error { return nil } -func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { - p.mu.RLock() - pi := p.blockToIndex - p.mu.RUnlock() - if pi != nil { - return pi, nil - } - - p.mu.Lock() - defer p.mu.Unlock() - +func (p *packManager) loadMergedPackIndex(olderThan *time.Time) (map[string]*packIndex, []string, error) { ch, cancel := p.objectManager.storage.ListBlocks(packObjectPrefix) defer cancel() @@ -228,8 +225,9 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { errors := make(chan error, parallelFetches) var mu sync.Mutex - m := map[string][]byte{} + packIndexData := map[string][]byte{} totalSize := 0 + var blockIDs []string for i := 0; i < parallelFetches; i++ { wg.Add(1) go func() { @@ -241,6 +239,10 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { return } + if olderThan != nil && b.TimeStamp.After(*olderThan) { + return + } + r, err := p.objectManager.Open(ObjectID{StorageBlock: b.BlockID}) if err != nil { errors <- err @@ -254,7 +256,8 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { } mu.Lock() - m[fmt.Sprintf("%16x", b.TimeStamp.UnixNano())] = data + packIndexData[b.BlockID] = data + blockIDs = append(blockIDs, b.BlockID) totalSize += len(data) mu.Unlock() } @@ -266,14 +269,43 @@ func (p *packManager) 
ensurePackIndexesLoaded() (map[string]*packIndex, error) { // Propagate async errors, if any. for err := range errors { - return nil, err + return nil, nil, err } if false { - log.Printf("loaded %v pack indexes (%v bytes) in %v", len(m), totalSize, time.Since(t0)) + log.Printf("loaded %v pack indexes (%v bytes) in %v", len(packIndexData), totalSize, time.Since(t0)) } - merged, err := loadMergedPackIndex(m) + merged := make(packIndexes) + for blockID, content := range packIndexData { + var r io.Reader = bytes.NewReader(content) + zr, err := gzip.NewReader(r) + if err != nil { + return nil, nil, fmt.Errorf("unable to read pack index from %q: %v", blockID, err) + } + + pi, err := loadPackIndexes(zr) + if err != nil { + return nil, nil, err + } + merged.merge(pi) + } + + return merged, blockIDs, nil +} + +func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { + p.mu.RLock() + pi := p.blockToIndex + p.mu.RUnlock() + if pi != nil { + return pi, nil + } + + p.mu.Lock() + defer p.mu.Unlock() + + merged, _, err := p.loadMergedPackIndex(nil) if err != nil { return nil, err } @@ -286,12 +318,56 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { } p.blockToIndex = pi - // log.Printf("loaded pack index with %v entries", len(p.blockToIndex)) return pi, nil } +func (p *packManager) Compact(cutoffTime time.Time) error { + merged, blockIDs, err := p.loadMergedPackIndex(&cutoffTime) + if err != nil { + return err + } + + if len(blockIDs) < parallelFetches { + return nil + } + + if err := p.writePackIndexes(merged); err != nil { + return err + } + + ch := makeStringChannel(blockIDs) + var wg sync.WaitGroup + + for i := 0; i < parallelDeletes; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for blockID := range ch { + if err := p.objectManager.storage.DeleteBlock(blockID); err != nil { + log.Printf("warning: unable to delete %q: %v", blockID, err) + } + } + }(i) + } + wg.Wait() + return nil +} + +func 
makeStringChannel(s []string) <-chan string { + ch := make(chan string) + go func() { + defer close(ch) + + for _, v := range s { + ch <- v + } + }() + return ch +} + func (p *packManager) newPackID() string { id := make([]byte, 8) rand.Read(id)