mirror of
https://github.com/kopia/kopia.git
synced 2026-01-28 00:08:04 -05:00
made packing the default, replaced FinishPacking with Flush()
This commit is contained in:
@@ -49,8 +49,7 @@ type ObjectManager struct {
|
||||
func (r *ObjectManager) Close() error {
|
||||
r.writeBackWG.Wait()
|
||||
r.blockSizeCache.close()
|
||||
|
||||
return nil
|
||||
return r.Flush()
|
||||
}
|
||||
|
||||
// Optimize performs object optimizations to improve performance of future operations.
|
||||
@@ -208,13 +207,10 @@ func (r *ObjectManager) verifyObjectInternal(oid ObjectID, blocks *blockTracker)
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// BeginPacking enables creation of pack files.
|
||||
func (r *ObjectManager) BeginPacking() error {
|
||||
return r.packMgr.begin()
|
||||
}
|
||||
|
||||
// FinishPacking closes any pending pack files. Once this method returns
|
||||
func (r *ObjectManager) FinishPacking() error {
|
||||
// Flush closes any pending pack files. Once this method returns, ObjectIDs returned by ObjectManager are
|
||||
// ok to be used.
|
||||
func (r *ObjectManager) Flush() error {
|
||||
r.writeBackWG.Wait()
|
||||
return r.packMgr.finishPacking()
|
||||
}
|
||||
|
||||
@@ -262,10 +258,7 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt
|
||||
}
|
||||
}
|
||||
|
||||
r.packMgr = &packManager{
|
||||
objectManager: r,
|
||||
packGroups: make(map[string]*packInfo),
|
||||
}
|
||||
r.packMgr = newPackManager(r)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
@@ -285,7 +278,7 @@ func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buff
|
||||
atomic.AddInt32(&r.stats.HashedBlocks, 1)
|
||||
atomic.AddInt64(&r.stats.HashedBytes, int64(len(data)))
|
||||
|
||||
if !isPackInternalObject && r.packMgr.enabled() && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
|
||||
if !isPackInternalObject && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
|
||||
packOID, err := r.packMgr.AddToPack(packGroup, prefix+objectID.StorageBlock, data)
|
||||
return packOID, err
|
||||
}
|
||||
|
||||
@@ -129,20 +129,12 @@ func TestPackingSimple(t *testing.T) {
|
||||
content2 := "hi, how are you?"
|
||||
content3 := "thank you!"
|
||||
|
||||
if err := repo.Objects.BeginPacking(); err != nil {
|
||||
t.Fatalf("error in BeginPacking: %v", err)
|
||||
}
|
||||
|
||||
oid1a := writeObject(t, repo, []byte(content1), "packed-object-1a")
|
||||
oid1b := writeObject(t, repo, []byte(content1), "packed-object-1b")
|
||||
oid2a := writeObject(t, repo, []byte(content2), "packed-object-2a")
|
||||
oid2b := writeObject(t, repo, []byte(content2), "packed-object-2b")
|
||||
|
||||
repo.Objects.FinishPacking()
|
||||
|
||||
if err := repo.Objects.BeginPacking(); err != nil {
|
||||
t.Fatalf("error in BeginPacking: %v", err)
|
||||
}
|
||||
repo.Objects.Flush()
|
||||
|
||||
oid3a := writeObject(t, repo, []byte(content3), "packed-object-3a")
|
||||
oid3b := writeObject(t, repo, []byte(content3), "packed-object-3b")
|
||||
@@ -151,7 +143,7 @@ func TestPackingSimple(t *testing.T) {
|
||||
oid2c := writeObject(t, repo, []byte(content2), "packed-object-2c")
|
||||
oid1c := writeObject(t, repo, []byte(content1), "packed-object-1c")
|
||||
|
||||
repo.Objects.FinishPacking()
|
||||
repo.Objects.Flush()
|
||||
|
||||
if got, want := oid1a.String(), oid1b.String(); got != want {
|
||||
t.Errorf("oid1a(%q) != oid1b(%q)", got, want)
|
||||
|
||||
@@ -47,13 +47,6 @@ type packManager struct {
|
||||
packGroups map[string]*packInfo
|
||||
}
|
||||
|
||||
func (p *packManager) enabled() bool {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
return p.pendingPackIndexes != nil
|
||||
}
|
||||
|
||||
func (p *packManager) blockIDToPackSection(blockID string) (ObjectIDSection, bool, error) {
|
||||
if strings.HasPrefix(blockID, packObjectPrefix) {
|
||||
return ObjectIDSection{}, false, nil
|
||||
@@ -99,23 +92,12 @@ func (p *packManager) blockIDToPackSection(blockID string) (ObjectIDSection, boo
|
||||
return ObjectIDSection{}, false, fmt.Errorf("invalid pack index for %q", blockID)
|
||||
}
|
||||
|
||||
func (p *packManager) begin() error {
|
||||
p.ensurePackIndexesLoaded()
|
||||
p.flushPackIndexesAfter = time.Now().Add(flushPackIndexTimeout)
|
||||
p.pendingPackIndexes = make(packIndexes)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *packManager) RegisterNonPackedBlock(blockID string, dataLength int, isInternal bool) error {
|
||||
if !isInternal {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
}
|
||||
|
||||
if p.pendingPackIndexes == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
g := p.ensurePackGroupLocked(unpackedObjectsPackGroup)
|
||||
g.currentPackIndex.Items[blockID] = fmt.Sprintf("0+%v", dataLength)
|
||||
|
||||
@@ -129,6 +111,8 @@ func (p *packManager) RegisterNonPackedBlock(blockID string, dataLength int, isI
|
||||
}
|
||||
|
||||
func (p *packManager) AddToPack(packGroup string, blockID string, data []byte) (ObjectID, error) {
|
||||
p.ensurePackIndexesLoaded()
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
@@ -207,7 +191,6 @@ func (p *packManager) finishPacking() error {
|
||||
return err
|
||||
}
|
||||
|
||||
p.pendingPackIndexes = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -281,6 +264,7 @@ func (p *packManager) finishPackLocked(g *packInfo) error {
|
||||
|
||||
g.currentPackIndex.PackObject = oid.String()
|
||||
}
|
||||
|
||||
g.currentPackIndex = nil
|
||||
|
||||
return nil
|
||||
@@ -455,3 +439,12 @@ func (p *packManager) Flush() error {
|
||||
|
||||
return p.finishCurrentPackLocked()
|
||||
}
|
||||
|
||||
func newPackManager(om *ObjectManager) *packManager {
|
||||
return &packManager{
|
||||
objectManager: om,
|
||||
packGroups: make(map[string]*packInfo),
|
||||
flushPackIndexesAfter: time.Now().Add(flushPackIndexTimeout),
|
||||
pendingPackIndexes: make(packIndexes),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,8 +79,7 @@ func (r *Repository) Close() error {
|
||||
|
||||
// Flush waits for all in-flight writes to complete.
|
||||
func (r *Repository) Flush() error {
|
||||
r.Objects.writeBackWG.Wait()
|
||||
return nil
|
||||
return r.Objects.Flush()
|
||||
}
|
||||
|
||||
// ResetStats resets all repository-wide statistics to zero values.
|
||||
|
||||
@@ -219,10 +219,6 @@ func (u *Uploader) uploadFile(file fs.File) (repo.ObjectID, error) {
|
||||
func (u *Uploader) uploadDir(dir fs.Directory) (repo.ObjectID, repo.ObjectID, error) {
|
||||
var err error
|
||||
|
||||
if err := u.repo.Objects.BeginPacking(); err != nil {
|
||||
return repo.NullObjectID, repo.NullObjectID, err
|
||||
}
|
||||
|
||||
mw := u.repo.Objects.NewWriter(repo.WriterOptions{
|
||||
Description: "HASHCACHE:" + dir.Metadata().Name,
|
||||
BlockNamePrefix: "H",
|
||||
@@ -244,8 +240,8 @@ func (u *Uploader) uploadDir(dir fs.Directory) (repo.ObjectID, repo.ObjectID, er
|
||||
}
|
||||
|
||||
hcid, err := mw.Result()
|
||||
if err := u.repo.Objects.FinishPacking(); err != nil {
|
||||
return repo.NullObjectID, repo.NullObjectID, fmt.Errorf("can't finish packing: %v", err)
|
||||
if err := u.repo.Objects.Flush(); err != nil {
|
||||
return repo.NullObjectID, repo.NullObjectID, fmt.Errorf("can't flush pending objects: %v", err)
|
||||
}
|
||||
return oid, hcid, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user