From e76243e55096e9ede5f9cd39706f41b70a1e46e1 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 10 Aug 2017 18:08:34 -0700 Subject: [PATCH] introduced multiple packs per upload - directory data is stored together, which allows is to be cached together speeding up browsing of backups --- repo/object_manager.go | 5 +- repo/object_writer.go | 7 ++- repo/pack_index.go | 1 + repo/pack_manager.go | 109 ++++++++++++++++++++++++++--------------- snapshot/upload.go | 2 + 5 files changed, 82 insertions(+), 42 deletions(-) diff --git a/repo/object_manager.go b/repo/object_manager.go index 2ec81c27a..a37734aa7 100644 --- a/repo/object_manager.go +++ b/repo/object_manager.go @@ -54,6 +54,7 @@ func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter { description: opt.Description, prefix: opt.BlockNamePrefix, disablePacking: opt.disablePacking, + packGroup: opt.PackGroup, } if opt.splitter != nil { @@ -203,7 +204,7 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt // hashEncryptAndWriteMaybeAsync computes hash of a given buffer, optionally encrypts and writes it to storage. // The write is not guaranteed to complete synchronously in case write-back is used, but by the time // Repository.Close() returns all writes are guaranteed be over. -func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) { +func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(packGroup string, buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) { var data []byte if buffer != nil { data = buffer.Bytes() @@ -220,7 +221,7 @@ func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, pref atomic.AddInt64(&r.stats.HashedBytes, int64(len(data))) if !disablePacking && r.packMgr.enabled() && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength { - packOID, err := r.packMgr.AddToPack(prefix+objectID.StorageBlock, data) + packOID, err := r.packMgr.AddToPack(packGroup, prefix+objectID.StorageBlock, data) return packOID, err } diff --git a/repo/object_writer.go b/repo/object_writer.go index 1463b8fca..5c59c298f 100644 --- a/repo/object_writer.go +++ b/repo/object_writer.go @@ -56,6 +56,7 @@ type objectWriter struct { splitter objectSplitter disablePacking bool + packGroup string } func (w *objectWriter) Close() error { @@ -94,7 +95,7 @@ func (w *objectWriter) flushBuffer(force bool) error { w.buffer.WriteTo(&b2) w.buffer.Reset() - objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(&b2, w.prefix, w.disablePacking) + objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(w.packGroup, &b2, w.prefix, w.disablePacking) if err != nil { return fmt.Errorf( "error when flushing chunk %d of %s: %#v", @@ -115,6 +116,9 @@ func (w *objectWriter) flushBuffer(force bool) error { description: "LIST(" + w.description + ")", blockTracker: w.blockTracker, splitter: w.repo.newSplitter(), + + disablePacking: w.disablePacking, + packGroup: w.packGroup, } w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType) w.listCurrentPos = 0 @@ -167,6 +171,7 @@ func (w *objectWriter) StorageBlocks() []string { type WriterOptions struct { BlockNamePrefix string Description string + PackGroup string splitter objectSplitter disablePacking bool diff --git a/repo/pack_index.go b/repo/pack_index.go index 35552630c..86a7254ea 100644 --- a/repo/pack_index.go +++ b/repo/pack_index.go @@ -18,6 +18,7 @@ type packedObjectID struct { type packIndex struct { PackObject string `json:"packObject"` + PackGroup string `json:"packGroup"` Items map[string]string `json:"items"` } diff --git a/repo/pack_manager.go b/repo/pack_manager.go index 4cd6a8bd0..8f23973d2 100644 --- a/repo/pack_manager.go +++ b/repo/pack_manager.go @@ -6,13 +6,18 @@ "encoding/hex" "encoding/json" "fmt" - "math" "sync" "time" "github.com/kopia/kopia/blob" ) +type packInfo struct { + currentPackData bytes.Buffer + currentPackIndex *packIndex + currentPackID string +} + type packManager struct { metadataManager *MetadataManager objectManager *ObjectManager @@ -22,10 +27,9 @@ type packManager struct { packIndexes packIndexes blockIDToPackedObjectID map[string]ObjectID - currentPackData bytes.Buffer - currentPackIndexes packIndexes - currentPackIndex *packIndex - currentPackID string + + currentPackIndexes packIndexes + packGroups map[string]*packInfo } func (p *packManager) enabled() bool { @@ -60,7 +64,7 @@ func (p *packManager) begin() error { return nil } -func (p *packManager) AddToPack(blockID string, data []byte) (ObjectID, error) { +func (p *packManager) AddToPack(packGroup string, blockID string, data []byte) (ObjectID, error) { p.mu.Lock() defer p.mu.Unlock() @@ -69,28 +73,33 @@ func (p *packManager) AddToPack(blockID string, data []byte) (ObjectID, error) { return oid, nil } - //log.Printf("%q not found in %v", blockID, p.blockIDToPackedObjectID) - - if p.currentPackIndex == nil { - p.currentPackIndex = &packIndex{ - Items: make(map[string]string), - } - p.currentPackID = p.newPackID() - p.currentPackIndexes[p.currentPackID] = p.currentPackIndex - p.currentPackData.Reset() + g := p.packGroups[packGroup] + if g == nil { + g = &packInfo{} + p.packGroups[packGroup] = g } - offset := p.currentPackData.Len() - p.currentPackData.Write(data) - p.currentPackIndex.Items[blockID] = fmt.Sprintf("%v+%v", int64(offset), int64(len(data))) + if g.currentPackIndex == nil { + g.currentPackIndex = &packIndex{ + Items: make(map[string]string), + PackGroup: packGroup, + } + g.currentPackID = p.newPackID() + p.currentPackIndexes[g.currentPackID] = g.currentPackIndex + g.currentPackData.Reset() + } - if p.currentPackData.Len() >= p.objectManager.format.MaxPackFileLength { + offset := g.currentPackData.Len() + g.currentPackData.Write(data) + g.currentPackIndex.Items[blockID] = fmt.Sprintf("%v+%v", int64(offset), int64(len(data))) + + if g.currentPackData.Len() >= p.objectManager.format.MaxPackFileLength { if err := p.finishCurrentPackLocked(); err != nil { return NullObjectID, err } } - packedID := ObjectID{StorageBlock: blockID, PackID: p.currentPackID} + packedID := ObjectID{StorageBlock: blockID, PackID: g.currentPackID} p.blockIDToPackedObjectID[blockID] = packedID return packedID, nil } @@ -103,6 +112,10 @@ func (p *packManager) finishPacking() error { return err } + if err := p.savePackIndexes(); err != nil { + return err + } + pi := p.currentPackIndexes if p.packIndexes != nil { p.packIndexes.merge(pi) @@ -113,8 +126,39 @@ func (p *packManager) finishPacking() error { return nil } +func (p *packManager) savePackIndexes() error { + if len(p.currentPackIndexes) == 0 { + return nil + } + + var jb bytes.Buffer + if err := json.NewEncoder(&jb).Encode(p.currentPackIndexes); err != nil { + return fmt.Errorf("can't encode pack index: %v", err) + } + + // save pack indexes + uniqueID := make([]byte, 16) + rand.Read(uniqueID) + itemID := fmt.Sprintf("%v%016x.%x", packIDPrefix, time.Now().UnixNano(), uniqueID) + if err := p.metadataManager.PutMetadata(itemID, jb.Bytes()); err != nil { + return fmt.Errorf("can't save pack index %q: %v", itemID, err) + } + + return nil +} + func (p *packManager) finishCurrentPackLocked() error { - if p.currentPackIndex == nil { + for _, g := range p.packGroups { + if err := p.finishPackLocked(g); err != nil { + return err + } + } + + return nil +} + +func (p *packManager) finishPackLocked(g *packInfo) error { + if g.currentPackIndex == nil { return nil } w := p.objectManager.NewWriter(WriterOptions{ @@ -123,32 +167,18 @@ func (p *packManager) finishCurrentPackLocked() error { }) defer w.Close() - if _, err := p.currentPackData.WriteTo(w); err != nil { + if _, err := g.currentPackData.WriteTo(w); err != nil { return fmt.Errorf("unable to write pack: %v", err) } - p.currentPackData.Reset() + g.currentPackData.Reset() oid, err := w.Result(true) if err != nil { return fmt.Errorf("can't save pack data: %v", err) } - p.currentPackIndex.PackObject = oid.String() - p.currentPackIndex = nil - - var jb bytes.Buffer - if err := json.NewEncoder(&jb).Encode(p.currentPackIndexes); err != nil { - return fmt.Errorf("can't encode pack index: %v", err) - } - - // save pack index - uniqueID := make([]byte, 8) - rand.Read(uniqueID) - ts := math.MaxInt64 - time.Now().UnixNano() - itemID := fmt.Sprintf("%v%v.%016x.%x", packIDPrefix, p.currentPackID, ts, uniqueID) - if err := p.metadataManager.PutMetadata(itemID, jb.Bytes()); err != nil { - return fmt.Errorf("can't save pack index %q: %v", itemID, err) - } + g.currentPackIndex.PackObject = oid.String() + g.currentPackIndex = nil return nil } @@ -196,5 +226,6 @@ func (r *Repository) initPackManager() { r.packMgr = &packManager{ objectManager: r.ObjectManager, metadataManager: r.MetadataManager, + packGroups: make(map[string]*packInfo), } } diff --git a/snapshot/upload.go b/snapshot/upload.go index 49c55357e..51ecdd3b0 100644 --- a/snapshot/upload.go +++ b/snapshot/upload.go @@ -225,6 +225,7 @@ func uploadDir(u *uploadContext, dir fs.Directory) (repo.ObjectID, repo.ObjectID mw := u.repo.NewWriter(repo.WriterOptions{ Description: "HASHCACHE:" + dir.Metadata().Name, BlockNamePrefix: "H", + PackGroup: "HC", }) defer mw.Close() u.cacheWriter = hashcache.NewWriter(mw) @@ -261,6 +262,7 @@ func uploadDirInternal( writer := u.repo.NewWriter(repo.WriterOptions{ Description: "DIR:" + relativePath, + PackGroup: "DIR", }) dw := dir.NewWriter(writer)