introduced multiple packs per upload - directory data is stored together, which allows is to be cached together speeding up browsing of backups

This commit is contained in:
Jarek Kowalski
2017-08-10 18:08:34 -07:00
parent 036ce15a88
commit e76243e550
5 changed files with 82 additions and 42 deletions

View File

@@ -54,6 +54,7 @@ func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter {
description: opt.Description,
prefix: opt.BlockNamePrefix,
disablePacking: opt.disablePacking,
packGroup: opt.PackGroup,
}
if opt.splitter != nil {
@@ -203,7 +204,7 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt
// hashEncryptAndWriteMaybeAsync computes hash of a given buffer, optionally encrypts and writes it to storage.
// The write is not guaranteed to complete synchronously in case write-back is used, but by the time
// Repository.Close() returns all writes are guaranteed be over.
func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) {
func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(packGroup string, buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) {
var data []byte
if buffer != nil {
data = buffer.Bytes()
@@ -220,7 +221,7 @@ func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, pref
atomic.AddInt64(&r.stats.HashedBytes, int64(len(data)))
if !disablePacking && r.packMgr.enabled() && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
packOID, err := r.packMgr.AddToPack(prefix+objectID.StorageBlock, data)
packOID, err := r.packMgr.AddToPack(packGroup, prefix+objectID.StorageBlock, data)
return packOID, err
}

View File

@@ -56,6 +56,7 @@ type objectWriter struct {
splitter objectSplitter
disablePacking bool
packGroup string
}
func (w *objectWriter) Close() error {
@@ -94,7 +95,7 @@ func (w *objectWriter) flushBuffer(force bool) error {
w.buffer.WriteTo(&b2)
w.buffer.Reset()
objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(&b2, w.prefix, w.disablePacking)
objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(w.packGroup, &b2, w.prefix, w.disablePacking)
if err != nil {
return fmt.Errorf(
"error when flushing chunk %d of %s: %#v",
@@ -115,6 +116,9 @@ func (w *objectWriter) flushBuffer(force bool) error {
description: "LIST(" + w.description + ")",
blockTracker: w.blockTracker,
splitter: w.repo.newSplitter(),
disablePacking: w.disablePacking,
packGroup: w.packGroup,
}
w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType)
w.listCurrentPos = 0
@@ -167,6 +171,7 @@ func (w *objectWriter) StorageBlocks() []string {
type WriterOptions struct {
BlockNamePrefix string
Description string
PackGroup string
splitter objectSplitter
disablePacking bool

View File

@@ -18,6 +18,7 @@ type packedObjectID struct {
type packIndex struct {
PackObject string `json:"packObject"`
PackGroup string `json:"packGroup"`
Items map[string]string `json:"items"`
}

View File

@@ -6,13 +6,18 @@
"encoding/hex"
"encoding/json"
"fmt"
"math"
"sync"
"time"
"github.com/kopia/kopia/blob"
)
type packInfo struct {
currentPackData bytes.Buffer
currentPackIndex *packIndex
currentPackID string
}
type packManager struct {
metadataManager *MetadataManager
objectManager *ObjectManager
@@ -22,10 +27,9 @@ type packManager struct {
packIndexes packIndexes
blockIDToPackedObjectID map[string]ObjectID
currentPackData bytes.Buffer
currentPackIndexes packIndexes
currentPackIndex *packIndex
currentPackID string
currentPackIndexes packIndexes
packGroups map[string]*packInfo
}
func (p *packManager) enabled() bool {
@@ -60,7 +64,7 @@ func (p *packManager) begin() error {
return nil
}
func (p *packManager) AddToPack(blockID string, data []byte) (ObjectID, error) {
func (p *packManager) AddToPack(packGroup string, blockID string, data []byte) (ObjectID, error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -69,28 +73,33 @@ func (p *packManager) AddToPack(blockID string, data []byte) (ObjectID, error) {
return oid, nil
}
//log.Printf("%q not found in %v", blockID, p.blockIDToPackedObjectID)
if p.currentPackIndex == nil {
p.currentPackIndex = &packIndex{
Items: make(map[string]string),
}
p.currentPackID = p.newPackID()
p.currentPackIndexes[p.currentPackID] = p.currentPackIndex
p.currentPackData.Reset()
g := p.packGroups[packGroup]
if g == nil {
g = &packInfo{}
p.packGroups[packGroup] = g
}
offset := p.currentPackData.Len()
p.currentPackData.Write(data)
p.currentPackIndex.Items[blockID] = fmt.Sprintf("%v+%v", int64(offset), int64(len(data)))
if g.currentPackIndex == nil {
g.currentPackIndex = &packIndex{
Items: make(map[string]string),
PackGroup: packGroup,
}
g.currentPackID = p.newPackID()
p.currentPackIndexes[g.currentPackID] = g.currentPackIndex
g.currentPackData.Reset()
}
if p.currentPackData.Len() >= p.objectManager.format.MaxPackFileLength {
offset := g.currentPackData.Len()
g.currentPackData.Write(data)
g.currentPackIndex.Items[blockID] = fmt.Sprintf("%v+%v", int64(offset), int64(len(data)))
if g.currentPackData.Len() >= p.objectManager.format.MaxPackFileLength {
if err := p.finishCurrentPackLocked(); err != nil {
return NullObjectID, err
}
}
packedID := ObjectID{StorageBlock: blockID, PackID: p.currentPackID}
packedID := ObjectID{StorageBlock: blockID, PackID: g.currentPackID}
p.blockIDToPackedObjectID[blockID] = packedID
return packedID, nil
}
@@ -103,6 +112,10 @@ func (p *packManager) finishPacking() error {
return err
}
if err := p.savePackIndexes(); err != nil {
return err
}
pi := p.currentPackIndexes
if p.packIndexes != nil {
p.packIndexes.merge(pi)
@@ -113,8 +126,39 @@ func (p *packManager) finishPacking() error {
return nil
}
func (p *packManager) savePackIndexes() error {
if len(p.currentPackIndexes) == 0 {
return nil
}
var jb bytes.Buffer
if err := json.NewEncoder(&jb).Encode(p.currentPackIndexes); err != nil {
return fmt.Errorf("can't encode pack index: %v", err)
}
// save pack indexes
uniqueID := make([]byte, 16)
rand.Read(uniqueID)
itemID := fmt.Sprintf("%v%016x.%x", packIDPrefix, time.Now().UnixNano(), uniqueID)
if err := p.metadataManager.PutMetadata(itemID, jb.Bytes()); err != nil {
return fmt.Errorf("can't save pack index %q: %v", itemID, err)
}
return nil
}
func (p *packManager) finishCurrentPackLocked() error {
if p.currentPackIndex == nil {
for _, g := range p.packGroups {
if err := p.finishPackLocked(g); err != nil {
return err
}
}
return nil
}
func (p *packManager) finishPackLocked(g *packInfo) error {
if g.currentPackIndex == nil {
return nil
}
w := p.objectManager.NewWriter(WriterOptions{
@@ -123,32 +167,18 @@ func (p *packManager) finishCurrentPackLocked() error {
})
defer w.Close()
if _, err := p.currentPackData.WriteTo(w); err != nil {
if _, err := g.currentPackData.WriteTo(w); err != nil {
return fmt.Errorf("unable to write pack: %v", err)
}
p.currentPackData.Reset()
g.currentPackData.Reset()
oid, err := w.Result(true)
if err != nil {
return fmt.Errorf("can't save pack data: %v", err)
}
p.currentPackIndex.PackObject = oid.String()
p.currentPackIndex = nil
var jb bytes.Buffer
if err := json.NewEncoder(&jb).Encode(p.currentPackIndexes); err != nil {
return fmt.Errorf("can't encode pack index: %v", err)
}
// save pack index
uniqueID := make([]byte, 8)
rand.Read(uniqueID)
ts := math.MaxInt64 - time.Now().UnixNano()
itemID := fmt.Sprintf("%v%v.%016x.%x", packIDPrefix, p.currentPackID, ts, uniqueID)
if err := p.metadataManager.PutMetadata(itemID, jb.Bytes()); err != nil {
return fmt.Errorf("can't save pack index %q: %v", itemID, err)
}
g.currentPackIndex.PackObject = oid.String()
g.currentPackIndex = nil
return nil
}
@@ -196,5 +226,6 @@ func (r *Repository) initPackManager() {
r.packMgr = &packManager{
objectManager: r.ObjectManager,
metadataManager: r.MetadataManager,
packGroups: make(map[string]*packInfo),
}
}

View File

@@ -225,6 +225,7 @@ func uploadDir(u *uploadContext, dir fs.Directory) (repo.ObjectID, repo.ObjectID
mw := u.repo.NewWriter(repo.WriterOptions{
Description: "HASHCACHE:" + dir.Metadata().Name,
BlockNamePrefix: "H",
PackGroup: "HC",
})
defer mw.Close()
u.cacheWriter = hashcache.NewWriter(mw)
@@ -261,6 +262,7 @@ func uploadDirInternal(
writer := u.repo.NewWriter(repo.WriterOptions{
Description: "DIR:" + relativePath,
PackGroup: "DIR",
})
dw := dir.NewWriter(writer)