From 4564bc704f235e893a2797fd6b5d21c03aa09471 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 3 Feb 2018 18:33:47 -0800 Subject: [PATCH] breaking format change: removed support for groups in block manager, this produces bigger, tightly packed blocks align pack blocks to 4096 bytes and insert random preamble --- block/block_manager.go | 497 ++++++++------------------ block/block_manager_test.go | 370 ++++++------------- block/disk_block_cache.go | 2 +- block/pack_index.go | 60 +--- cli/app.go | 1 - cli/command_block_list.go | 18 +- cli/command_block_repack.go | 5 +- cli/command_block_stats.go | 11 +- cli/command_object_ls.go | 2 +- cli/command_repository_create.go | 4 - internal/blockmgrpb/block_index.pb.go | 82 ++--- internal/blockmgrpb/block_index.proto | 4 +- manifest/manifest_manager.go | 13 +- manifest/manifest_manager_test.go | 7 +- object/object_manager.go | 6 +- object/object_manager_test.go | 4 +- object/object_writer.go | 10 +- repo/initialize.go | 20 +- repo/repository.go | 9 +- repo/repository_test.go | 27 +- snapshot/upload.go | 4 +- 21 files changed, 352 insertions(+), 804 deletions(-) diff --git a/block/block_manager.go b/block/block_manager.go index 076face8d..904a97949 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -1,8 +1,11 @@ package block import ( + cryptorand "crypto/rand" "fmt" + "io" "math" + "math/rand" "sort" "strconv" "strings" @@ -18,23 +21,18 @@ ) const ( - parallelFetches = 5 // number of parallel reads goroutines - flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushes - packBlockPrefix = "P" // prefix for all storage blocks that are pack indexes - nonPackedObjectsPackGroup = "raw" // ID of pack group that stores non-packed objects that don't belong to any group - packObjectsPackGroup = "packs" // ID of pack group that stores pack blocks themselves - compactedBlockSuffix = "-z" - maxIndexBlockUploadTime = 1 * time.Minute - maxNonPackedBlocksPerPackIndex = 200 + parallelFetches = 5 // number of parallel reads goroutines + flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushes + indexBlockPrefix = "I" // prefix for all storage blocks that are pack indexes + compactedBlockSuffix = "-z" + maxIndexBlockUploadTime = 1 * time.Minute + defaultMinPreambleLength = 32 + defaultMaxPreambleLength = 32 + defaultPaddingUnit = 4096 ) var zeroTime time.Time -type packInfo struct { - currentPackData []byte - currentPackIndex *blockmgrpb.Index -} - type blockLocation struct { packIndex int objectIndex int @@ -45,7 +43,6 @@ type Info struct { BlockID string `json:"blockID"` Length int64 `json:"length"` Timestamp time.Time `json:"time"` - PackGroup string `json:"packGroup,omitempty"` PackBlockID string `json:"packBlockID,omitempty"` PackOffset int64 `json:"packOffset,omitempty"` } @@ -58,19 +55,25 @@ type Manager struct { cache blockCache - mu sync.Mutex - locked bool - groupToBlockToIndex map[string]map[string]*blockmgrpb.Index - + mu sync.Mutex + locked bool + indexLoaded bool + blockIDToIndex map[string]*blockmgrpb.Index + packBlockIDToIndex map[string]*blockmgrpb.Index pendingPackIndexes []*blockmgrpb.Index flushPackIndexesAfter time.Time - openPackGroups map[string]*packInfo + currentPackData []byte + currentPackIndex *blockmgrpb.Index + maxPackedContentLength int maxPackSize int formatter Formatter - timeNow func() time.Time + minPreambleLength int + maxPreambleLength int + paddingUnit int + timeNow func() time.Time } // DeleteBlock marks the given blockID as deleted. @@ -86,52 +89,12 @@ func (bm *Manager) DeleteBlock(blockID string) error { return fmt.Errorf("can't load pack index: %v", err) } - // delete from all indexes - for _, m := range bm.groupToBlockToIndex { - delete(m, blockID) - } - - for _, m := range bm.openPackGroups { - if ndx := m.currentPackIndex; ndx != nil { - delete(ndx.Items, blockID) - } - } - - g := bm.ensurePackGroupLocked("", true) - g.currentPackIndex.DeletedItems = append(g.currentPackIndex.DeletedItems, blockID) + delete(bm.blockIDToIndex, blockID) + delete(bm.currentPackIndex.Items, blockID) + bm.currentPackIndex.DeletedItems = append(bm.currentPackIndex.DeletedItems, blockID) return nil } -func (bm *Manager) registerUnpackedBlock(packGroupID string, blockID string, dataLength int64) error { - bm.lock() - defer bm.unlock() - - g := bm.registerUnpackedBlockLockedNoFlush(packGroupID, blockID, dataLength) - - if bm.timeNow().After(bm.flushPackIndexesAfter) || len(g.currentPackIndex.Items) > maxNonPackedBlocksPerPackIndex { - if err := bm.finishPackAndMaybeFlushIndexes(g); err != nil { - return err - } - } - - return nil -} - -func (bm *Manager) registerUnpackedBlockLockedNoFlush(groupID string, blockID string, dataLength int64) *packInfo { - bm.assertLocked() - - g := bm.ensurePackGroupLocked(groupID, true) - - // See if we already have this block ID in an unpacked pack group. - ndx := bm.groupToBlockToIndex[groupID][blockID] - if ndx != nil { - return g - } - - bm.addToIndexLocked(groupID, blockID, g.currentPackIndex, packOffsetAndSize(0, uint32(dataLength))) - return g -} - func packOffsetAndSize(offset uint32, size uint32) uint64 { return uint64(offset)<<32 | uint64(size) } @@ -143,20 +106,14 @@ func unpackOffsetAndSize(os uint64) (uint32, uint32) { return offset, size } -func (bm *Manager) addToIndexLocked(groupID, blockID string, ndx *blockmgrpb.Index, packedOffsetAndSize uint64) { +func (bm *Manager) addToIndexLocked(blockID string, ndx *blockmgrpb.Index, packedOffsetAndSize uint64) { bm.assertLocked() - m := bm.groupToBlockToIndex[groupID] - if m == nil { - m = make(map[string]*blockmgrpb.Index) - bm.groupToBlockToIndex[groupID] = m - } - ndx.Items[blockID] = packedOffsetAndSize - m[blockID] = ndx + bm.blockIDToIndex[blockID] = ndx } -func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte, force bool) error { +func (bm *Manager) addToPackLocked(blockID string, data []byte, force bool) error { bm.assertLocked() if err := bm.ensurePackIndexesLoaded(); err != nil { @@ -165,21 +122,29 @@ func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte if !force { // See if we already have this block ID in the pack. - if _, ok := bm.groupToBlockToIndex[packGroup][blockID]; ok { + if _, ok := bm.blockIDToIndex[blockID]; ok { return nil } } - g := bm.ensurePackGroupLocked(packGroup, false) + if len(bm.currentPackData) == 0 && bm.maxPreambleLength > 0 { + preambleLength := rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1) + bm.minPreambleLength + preamble := make([]byte, preambleLength, preambleLength+len(data)) + if _, err := io.ReadFull(cryptorand.Reader, preamble); err != nil { + return err + } - offset := len(g.currentPackData) + bm.currentPackData = preamble + } + + offset := len(bm.currentPackData) shouldFinish := offset+len(data) >= bm.maxPackSize - g.currentPackData = append(g.currentPackData, data...) - bm.addToIndexLocked(packGroup, blockID, g.currentPackIndex, packOffsetAndSize(uint32(offset), uint32(len(data)))) + bm.currentPackData = append(bm.currentPackData, data...) + bm.addToIndexLocked(blockID, bm.currentPackIndex, packOffsetAndSize(uint32(offset), uint32(len(data)))) if shouldFinish { - if err := bm.finishPackAndMaybeFlushIndexes(g); err != nil { + if err := bm.finishPackAndMaybeFlushIndexes(); err != nil { return err } } @@ -187,8 +152,8 @@ func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte return nil } -func (bm *Manager) finishPackAndMaybeFlushIndexes(g *packInfo) error { - if err := bm.finishPackLocked(g); err != nil { +func (bm *Manager) finishPackAndMaybeFlushIndexes() error { + if err := bm.finishPackLocked(); err != nil { return err } @@ -216,32 +181,12 @@ func (bm *Manager) close() error { return bm.cache.close() } -func (bm *Manager) ensurePackGroupLocked(packGroup string, unpacked bool) *packInfo { - var suffix string - - if unpacked { - suffix = ":unpacked" +func (bm *Manager) startPackIndexLocked() { + bm.currentPackIndex = &blockmgrpb.Index{ + Items: make(map[string]uint64), + CreateTimeNanos: uint64(bm.timeNow().UnixNano()), } - g := bm.openPackGroups[packGroup+suffix] - if g == nil { - g = &packInfo{} - bm.openPackGroups[packGroup+suffix] = g - } - - if g.currentPackIndex == nil { - g.currentPackIndex = &blockmgrpb.Index{ - Items: make(map[string]uint64), - PackGroup: packGroup, - CreateTimeNanos: bm.timeNow().UnixNano(), - } - if unpacked { - g.currentPackData = nil - } else { - g.currentPackData = []byte{} - } - } - - return g + bm.currentPackData = []byte{} } func (bm *Manager) flushPackIndexesLocked() error { @@ -274,51 +219,46 @@ func (bm *Manager) writePackIndexes(ndx []*blockmgrpb.Index, replacesBlockBefore inverseTimePrefix := fmt.Sprintf("%016x", math.MaxInt64-time.Now().UnixNano()) - return bm.writeUnpackedBlockNotLocked(data, packBlockPrefix+inverseTimePrefix, suffix, true) + return bm.writeUnpackedBlockNotLocked(data, indexBlockPrefix+inverseTimePrefix, suffix, true) } func (bm *Manager) finishAllOpenPacksLocked() error { // finish non-pack groups first. - for _, g := range bm.openPackGroups { - if g.currentPackIndex != nil && g.currentPackIndex.PackGroup != packObjectsPackGroup { - if err := bm.finishPackLocked(g); err != nil { - return err - } - } - } - // finish pack groups at the very end. - for _, g := range bm.openPackGroups { - if g.currentPackIndex != nil && g.currentPackIndex.PackGroup == packObjectsPackGroup { - if err := bm.finishPackLocked(g); err != nil { - return err - } + if bm.currentPackIndex != nil { + if err := bm.finishPackLocked(); err != nil { + return err } } return nil } -func (bm *Manager) finishPackLocked(g *packInfo) error { - if g.currentPackIndex == nil { - return nil - } - - if len(g.currentPackIndex.Items)+len(g.currentPackIndex.DeletedItems) > 0 { - if g.currentPackData != nil { - dataLength := len(g.currentPackData) - blockID, err := bm.writeUnpackedBlockNotLocked(g.currentPackData, "", "", true) +func (bm *Manager) finishPackLocked() error { + if len(bm.currentPackIndex.Items)+len(bm.currentPackIndex.DeletedItems) > 0 { + if bm.currentPackData != nil { + if bm.paddingUnit > 0 { + if missing := bm.paddingUnit - (len(bm.currentPackData) % bm.paddingUnit); missing > 0 { + postamble := make([]byte, missing) + if _, err := io.ReadFull(cryptorand.Reader, postamble); err != nil { + return fmt.Errorf("can't allocate random bytes for postamble: %v", err) + } + bm.currentPackData = append(bm.currentPackData, postamble...) + } + } + blockID, err := bm.writeUnpackedBlockNotLocked(bm.currentPackData, "", "", true) if err != nil { return fmt.Errorf("can't save pack data block %q: %v", blockID, err) } - bm.registerUnpackedBlockLockedNoFlush(packObjectsPackGroup, blockID, int64(dataLength)) - g.currentPackIndex.PackBlockId = blockID + bm.currentPackIndex.PackBlockId = blockID + bm.packBlockIDToIndex[bm.currentPackIndex.PackBlockId] = bm.currentPackIndex + bm.currentPackIndex.PackLength = uint64(len(bm.currentPackData)) } - bm.pendingPackIndexes = append(bm.pendingPackIndexes, g.currentPackIndex) + bm.pendingPackIndexes = append(bm.pendingPackIndexes, bm.currentPackIndex) } - g.currentPackData = g.currentPackData[:0] - g.currentPackIndex = nil + + bm.startPackIndexLocked() return nil } @@ -460,8 +400,7 @@ func (bm *Manager) loadMergedPackIndexLocked() ([]*blockmgrpb.Index, []string, t func (bm *Manager) ensurePackIndexesLoaded() error { bm.assertLocked() - pi := bm.groupToBlockToIndex - if pi != nil { + if bm.indexLoaded { return nil } @@ -470,46 +409,38 @@ func (bm *Manager) ensurePackIndexesLoaded() error { return err } - bm.groupToBlockToIndex = dedupeBlockIDsAndIndex(merged) + bm.indexLoaded = true + bm.blockIDToIndex, bm.packBlockIDToIndex = dedupeBlockIDsAndIndex(merged) + totalBlocks := len(bm.blockIDToIndex) - totalBlocks := 0 - for _, v := range bm.groupToBlockToIndex { - totalBlocks += len(v) - } - - log.Debug().Int("groups", len(bm.groupToBlockToIndex)).Int("blocks", totalBlocks).Msgf("loaded indexes") + log.Debug().Int("blocks", totalBlocks).Msgf("loaded indexes") return nil } -func dedupeBlockIDsAndIndex(ndx []*blockmgrpb.Index) map[string]map[string]*blockmgrpb.Index { +func dedupeBlockIDsAndIndex(ndx []*blockmgrpb.Index) (blockToIndex, packToIndex map[string]*blockmgrpb.Index) { sort.Slice(ndx, func(i, j int) bool { return ndx[i].CreateTimeNanos < ndx[j].CreateTimeNanos }) - pi := make(map[string]map[string]*blockmgrpb.Index) + blockToIndex = make(map[string]*blockmgrpb.Index) + packToIndex = make(map[string]*blockmgrpb.Index) for _, pck := range ndx { - g := pi[pck.PackGroup] - if g == nil { - g = make(map[string]*blockmgrpb.Index) - pi[pck.PackGroup] = g - } + packToIndex[pck.PackBlockId] = pck for blockID := range pck.Items { - if o := g[blockID]; o != nil { + if o := blockToIndex[blockID]; o != nil { // this pack is same or newer. delete(o.Items, blockID) } - g[blockID] = pck + blockToIndex[blockID] = pck } for _, deletedBlockID := range pck.DeletedItems { - for _, m := range pi { - delete(m, deletedBlockID) - } + delete(blockToIndex, deletedBlockID) } } - return pi + return } func removeEmptyIndexes(ndx []*blockmgrpb.Index) []*blockmgrpb.Index { @@ -523,64 +454,6 @@ func removeEmptyIndexes(ndx []*blockmgrpb.Index) []*blockmgrpb.Index { return res } -func (bm *Manager) regroupPacksAndUnpacked(ndx []*blockmgrpb.Index) []*blockmgrpb.Index { - var res []*blockmgrpb.Index - - allPacks := &blockmgrpb.Index{ - Items: map[string]uint64{}, - PackGroup: packObjectsPackGroup, - CreateTimeNanos: bm.timeNow().UnixNano(), - } - - allNonPacked := &blockmgrpb.Index{ - Items: map[string]uint64{}, - PackGroup: nonPackedObjectsPackGroup, - CreateTimeNanos: bm.timeNow().UnixNano(), - } - - inUsePackBlocks := map[string]bool{} - - // Iterate through all indexes, build merged index of all packs and all non-packed items. - for _, n := range ndx { - if n.PackGroup == packObjectsPackGroup { - for i, o := range n.Items { - allPacks.Items[i] = o - } - continue - } - - if n.PackGroup == nonPackedObjectsPackGroup { - for i, o := range n.Items { - allNonPacked.Items[i] = o - } - continue - } - - if n.PackBlockId != "" { - inUsePackBlocks[n.PackBlockId] = true - } - - res = append(res, n) - } - - // Now delete all pack blocks that are not in use. - for k := range allPacks.Items { - if !inUsePackBlocks[k] { - delete(allPacks.Items, k) - } - } - - if len(allPacks.Items) > 0 { - res = append(res, allPacks) - } - - if len(allNonPacked.Items) > 0 { - res = append(res, allNonPacked) - } - - return res -} - // CompactIndexes performs compaction of index blocks. func (bm *Manager) CompactIndexes() error { bm.lock() @@ -599,7 +472,7 @@ func (bm *Manager) CompactIndexes() error { } // ListBlocks returns the metadata about blocks with a given prefix and kind. -func (bm *Manager) ListBlocks(prefix string, kind string) ([]Info, error) { +func (bm *Manager) ListBlocks(prefix string) ([]Info, error) { bm.lock() defer bm.unlock() @@ -609,57 +482,12 @@ func (bm *Manager) ListBlocks(prefix string, kind string) ([]Info, error) { return nil, fmt.Errorf("can't load pack index: %v", err) } - packBlockIDs := map[string]bool{} - for _, blockToIndex := range bm.groupToBlockToIndex { - for _, b := range blockToIndex { - packBlockIDs[b.PackBlockId] = true - } - } - - var blockMatches func(Info, *blockmgrpb.Index) bool - - switch kind { - case "all": - blockMatches = func(Info, *blockmgrpb.Index) bool { return true } - - case "logical": // blocks that are not pack blocks - blockMatches = func(b Info, _ *blockmgrpb.Index) bool { - return !packBlockIDs[b.BlockID] + for b, ndx := range bm.blockIDToIndex { + if !strings.HasPrefix(b, prefix) { + continue } - case "packs": // blocks that are pack blocks - blockMatches = func(b Info, _ *blockmgrpb.Index) bool { - return packBlockIDs[b.BlockID] - } - - case "packed": // blocks that are packed - blockMatches = func(b Info, ndx *blockmgrpb.Index) bool { - return ndx.PackBlockId != "" - } - - case "nonpacked": // blocks that are not packed - blockMatches = func(b Info, ndx *blockmgrpb.Index) bool { - return ndx.PackBlockId == "" - } - - default: - blockMatches = func(Info, *blockmgrpb.Index) bool { return false } - } - - for _, blockToIndex := range bm.groupToBlockToIndex { - for b, ndx := range blockToIndex { - if !strings.HasPrefix(b, prefix) { - continue - } - - nfo := newInfo(b, ndx) - - if !blockMatches(nfo, ndx) { - continue - } - - result = append(result, nfo) - } + result = append(result, newInfo(b, ndx)) } return result, nil @@ -670,35 +498,15 @@ func newInfo(blockID string, ndx *blockmgrpb.Index) Info { return Info{ BlockID: blockID, Length: int64(size), - Timestamp: time.Unix(0, ndx.CreateTimeNanos), - PackGroup: ndx.PackGroup, + Timestamp: time.Unix(0, int64(ndx.CreateTimeNanos)), PackBlockID: ndx.PackBlockId, PackOffset: int64(offset), } } -// ListGroupBlocks returns the list of blocks in the specified group (in random order). -func (bm *Manager) ListGroupBlocks(groupID string) ([]Info, error) { - bm.lock() - defer bm.unlock() - - var result []Info - - if err := bm.ensurePackIndexesLoaded(); err != nil { - return nil, fmt.Errorf("can't load pack index: %v", err) - } - - for blockID, ndx := range bm.groupToBlockToIndex[groupID] { - result = append(result, newInfo(blockID, ndx)) - } - - return result, nil -} - func (bm *Manager) compactIndexes(merged []*blockmgrpb.Index, blockIDs []string, latestBlockTime time.Time) error { dedupeBlockIDsAndIndex(merged) merged = removeEmptyIndexes(merged) - merged = bm.regroupPacksAndUnpacked(merged) if len(blockIDs) <= 1 { log.Printf("skipping index compaction - already compacted") return nil @@ -742,37 +550,20 @@ func (bm *Manager) Flush() error { // WriteBlock saves a given block of data to a pack group with a provided name and returns a blockID // that's based on the contents of data written. -func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) { - if bm.maxPackedContentLength > 0 && len(data) <= bm.maxPackedContentLength { - blockID := bm.hashData(data) +func (bm *Manager) WriteBlock(data []byte, prefix string) (string, error) { + blockID := prefix + bm.hashData(data) - bm.lock() - defer bm.unlock() - - err := bm.addToPackLocked(groupID, blockID, data, false) - return blockID, err - } - - blockID, err := bm.writeUnpackedBlockNotLocked(data, "", "", false) - if err != nil { - return "", err - } - - bm.registerUnpackedBlock(nonPackedObjectsPackGroup, blockID, int64(len(data))) - if groupID != "" { - bm.registerUnpackedBlock(groupID, blockID, int64(len(data))) - } - return blockID, nil -} - -// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size. -func (bm *Manager) Repackage(groupID string, maxLength int64) error { bm.lock() defer bm.unlock() - if groupID == "" || groupID == nonPackedObjectsPackGroup || groupID == packObjectsPackGroup { - return fmt.Errorf("invalid group ID: %q", groupID) - } + err := bm.addToPackLocked(blockID, data, false) + return blockID, err +} + +// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size. +func (bm *Manager) Repackage(maxLength uint64) error { + bm.lock() + defer bm.unlock() if err := bm.ensurePackIndexesLoaded(); err != nil { return err @@ -784,23 +575,22 @@ func (bm *Manager) Repackage(groupID string, maxLength int64) error { } var toRepackage []*blockmgrpb.Index - var totalBytes int64 + var totalBytes uint64 for _, m := range merged { - if m.PackGroup == groupID && m.PackBlockId != "" { - bi, err := bm.blockInfoLocked(m.PackBlockId) - if err != nil { - return fmt.Errorf("unable to get info on block %q: %v", m.PackBlockId, err) - } + bi, ok := bm.packBlockIDToIndex[m.PackBlockId] + if !ok { + return fmt.Errorf("unable to get info on pack block %q", m.PackBlockId) + } - if bi.Length <= maxLength { - toRepackage = append(toRepackage, m) - totalBytes += bi.Length - } + if bi.PackLength <= maxLength { + toRepackage = append(toRepackage, m) + totalBytes += bi.PackLength } } log.Printf("%v blocks to re-package (%v total bytes)", len(toRepackage), totalBytes) + done := map[string]bool{} for _, m := range toRepackage { data, err := bm.getBlockInternalLocked(m.PackBlockId) @@ -809,11 +599,15 @@ func (bm *Manager) Repackage(groupID string, maxLength int64) error { } for blockID, os := range m.Items { + if done[blockID] { + continue + } + done[blockID] = true log.Printf("re-packaging: %v %v", blockID, os) offset, size := unpackOffsetAndSize(os) blockData := data[offset : offset+size] - if err := bm.addToPackLocked(groupID, blockID, blockData, true); err != nil { + if err := bm.addToPackLocked(blockID, blockData, true); err != nil { return fmt.Errorf("unable to re-package %q: %v", blockID, err) } } @@ -868,18 +662,15 @@ func (bm *Manager) hashData(data []byte) string { func (bm *Manager) getPendingBlockLocked(blockID string) ([]byte, error) { bm.assertLocked() - for _, p := range bm.openPackGroups { - if ndx := p.currentPackIndex; ndx != nil { - if p.currentPackData == nil { - continue - } - + if ndx := bm.currentPackIndex; ndx != nil { + if bm.currentPackData != nil { if blk, ok := ndx.Items[blockID]; ok { offset, size := unpackOffsetAndSize(blk) - return p.currentPackData[offset : offset+size], nil + return bm.currentPackData[offset : offset+size], nil } } } + return nil, storage.ErrBlockNotFound } @@ -914,26 +705,30 @@ func (bm *Manager) BlockInfo(blockID string) (Info, error) { func (bm *Manager) findIndexForBlockLocked(blockID string) *blockmgrpb.Index { bm.assertLocked() - if ndx := bm.groupToBlockToIndex[""][blockID]; ndx != nil { + if ndx := bm.blockIDToIndex[blockID]; ndx != nil { return ndx } - for _, v := range bm.groupToBlockToIndex { - if ndx := v[blockID]; ndx != nil { - return ndx - } - } - return nil } func (bm *Manager) blockInfoLocked(blockID string) (Info, error) { - if strings.HasPrefix(blockID, packBlockPrefix) { + if strings.HasPrefix(blockID, indexBlockPrefix) { return Info{}, nil } bm.assertLocked() + if ndx, ok := bm.packBlockIDToIndex[blockID]; ok { + // pack block + return Info{ + BlockID: blockID, + Timestamp: time.Unix(0, int64(ndx.CreateTimeNanos)), + PackOffset: 0, + Length: int64(ndx.PackLength), + }, nil + } + ndx := bm.findIndexForBlockLocked(blockID) if ndx == nil { return Info{}, storage.ErrBlockNotFound @@ -943,8 +738,7 @@ func (bm *Manager) blockInfoLocked(blockID string) (Info, error) { return Info{ BlockID: blockID, - PackGroup: ndx.PackGroup, - Timestamp: time.Unix(0, ndx.CreateTimeNanos), + Timestamp: time.Unix(0, int64(ndx.CreateTimeNanos)), PackBlockID: ndx.PackBlockId, PackOffset: int64(offset), Length: int64(size), @@ -1033,7 +827,7 @@ func listIndexBlocksFromStorage(st storage.Storage, full bool) ([]Info, error) { maxCompactions = math.MaxInt32 } - ch, cancel := st.ListBlocks(packBlockPrefix) + ch, cancel := st.ListBlocks(indexBlockPrefix) defer cancel() var results []Info @@ -1068,6 +862,10 @@ func listIndexBlocksFromStorage(st storage.Storage, full bool) ([]Info, error) { // NewManager creates new block manager with given packing options and a formatter. func NewManager(st storage.Storage, f FormattingOptions, caching CachingOptions) (*Manager, error) { + return newManagerWithTime(st, f, caching, time.Now) +} + +func newManagerWithTime(st storage.Storage, f FormattingOptions, caching CachingOptions, timeNow func() time.Time) (*Manager, error) { sf := FormatterFactories[f.BlockFormat] if sf == nil { return nil, fmt.Errorf("unsupported block format: %v", f.BlockFormat) @@ -1078,20 +876,27 @@ func NewManager(st storage.Storage, f FormattingOptions, caching CachingOptions) return nil, err } - return &Manager{ + m := &Manager{ Format: f, - openPackGroups: make(map[string]*packInfo), - timeNow: time.Now, - flushPackIndexesAfter: time.Now().Add(flushPackIndexTimeout), + timeNow: timeNow, + flushPackIndexesAfter: timeNow().Add(flushPackIndexTimeout), pendingPackIndexes: nil, maxPackedContentLength: f.MaxPackedContentLength, maxPackSize: f.MaxPackSize, formatter: formatter, + blockIDToIndex: make(map[string]*blockmgrpb.Index), + packBlockIDToIndex: make(map[string]*blockmgrpb.Index), + minPreambleLength: defaultMinPreambleLength, + maxPreambleLength: defaultMaxPreambleLength, + paddingUnit: defaultPaddingUnit, cache: newBlockCache(st, caching), - }, nil -} + } + m.startPackIndexLocked() + + return m, nil +} func getCompactedTimestamp(blk string) (time.Time, bool) { if p := strings.Index(blk, compactedBlockSuffix); p >= 0 { unixNano, err := strconv.ParseInt(blk[p+len(compactedBlockSuffix):], 16, 64) diff --git a/block/block_manager_test.go b/block/block_manager_test.go index fbc7b9312..aa21f13b5 100644 --- a/block/block_manager_test.go +++ b/block/block_manager_test.go @@ -5,14 +5,17 @@ "encoding/hex" "fmt" "math/rand" + "os" "reflect" "strings" + "sync" "testing" "time" "github.com/gogo/protobuf/proto" "github.com/kopia/kopia/internal/blockmgrpb" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/kopia/kopia/internal/storagetesting" @@ -26,10 +29,15 @@ var fakeTime = time.Date(2017, 1, 1, 0, 0, 0, 0, time.UTC) +func init() { + //zerolog.SetGlobalLevel(zerolog.InfoLevel) + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) +} + func TestBlockManagerEmptyFlush(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) bm.Flush() if got, want := len(data), 0; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) @@ -39,8 +47,8 @@ func TestBlockManagerEmptyFlush(t *testing.T) { func TestBlockZeroBytes1(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - writeBlockAndVerify(t, bm, "", []byte{}) + bm := newTestBlockManager(data, keyTime, nil) + writeBlockAndVerify(t, bm, []byte{}) bm.Flush() if got, want := len(data), 2; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) @@ -51,9 +59,9 @@ func TestBlockZeroBytes1(t *testing.T) { func TestBlockZeroBytes2(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - writeBlockAndVerify(t, bm, "", seededRandomData(10, 10)) - writeBlockAndVerify(t, bm, "", []byte{}) + bm := newTestBlockManager(data, keyTime, nil) + writeBlockAndVerify(t, bm, seededRandomData(10, 10)) + writeBlockAndVerify(t, bm, []byte{}) bm.Flush() dumpBlockManagerData(data) if got, want := len(data), 2; got != want { @@ -65,10 +73,10 @@ func TestBlockZeroBytes2(t *testing.T) { func TestBlockManagerSmallBlockWrites(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) for i := 0; i < 100; i++ { - writeBlockAndVerify(t, bm, "", seededRandomData(i, 10)) + writeBlockAndVerify(t, bm, seededRandomData(i, 10)) } if got, want := len(data), 0; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) @@ -79,37 +87,13 @@ func TestBlockManagerSmallBlockWrites(t *testing.T) { } } -func TestBlockManagerUnpackedBlockWrites(t *testing.T) { - data := map[string][]byte{} - keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - - for i := 0; i < 100; i++ { - writeBlockAndVerify(t, bm, "", seededRandomData(i, 1001)) - } - - log.Printf("writing again") - // make sure deduping works. - for i := 0; i < 100; i++ { - writeBlockAndVerify(t, bm, "", seededRandomData(i, 1001)) - } - t.Logf("finished writing again") - if got, want := len(data), 100; got != want { - t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) - } - bm.Flush() - if got, want := len(data), 101; got != want { - t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) - } -} - func TestBlockManagerDedupesPendingBlocks(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) for i := 0; i < 100; i++ { - writeBlockAndVerify(t, bm, "", seededRandomData(0, maxPackedContentLength-1)) + writeBlockAndVerify(t, bm, seededRandomData(0, maxPackedContentLength-1)) } if got, want := len(data), 0; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) @@ -123,23 +107,26 @@ func TestBlockManagerDedupesPendingBlocks(t *testing.T) { func TestBlockManagerDedupesPendingAndUncommittedBlocks(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) - writeBlockAndVerify(t, bm, "", seededRandomData(0, 999)) - writeBlockAndVerify(t, bm, "", seededRandomData(1, 999)) - writeBlockAndVerify(t, bm, "", seededRandomData(2, 10)) - if got, want := len(data), 1; got != want { + // no writes here, all data fits in a single pack. + writeBlockAndVerify(t, bm, seededRandomData(0, 950)) + writeBlockAndVerify(t, bm, seededRandomData(1, 950)) + writeBlockAndVerify(t, bm, seededRandomData(2, 10)) + if got, want := len(data), 0; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) } - // no writes - writeBlockAndVerify(t, bm, "", seededRandomData(0, 999)) - writeBlockAndVerify(t, bm, "", seededRandomData(1, 999)) - writeBlockAndVerify(t, bm, "", seededRandomData(2, 10)) - if got, want := len(data), 1; got != want { + // no writes here + writeBlockAndVerify(t, bm, seededRandomData(0, 950)) + writeBlockAndVerify(t, bm, seededRandomData(1, 950)) + writeBlockAndVerify(t, bm, seededRandomData(2, 10)) + if got, want := len(data), 0; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) } bm.Flush() + + // this flushes the pack block + index block if got, want := len(data), 2; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) } @@ -149,7 +136,7 @@ func TestBlockManagerDedupesPendingAndUncommittedBlocks(t *testing.T) { func TestBlockManagerEmpty(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) noSuchBlockID := md5hash([]byte("foo")) @@ -168,90 +155,25 @@ func TestBlockManagerEmpty(t *testing.T) { } } -func TestBlockManagerPackIdentialToRawObject(t *testing.T) { - data0 := []byte{} - data1 := seededRandomData(1, 600) - data2 := seededRandomData(2, 600) - data3 := append(append([]byte(nil), data1...), data2...) - - b0 := md5hash(data0) - b1 := md5hash(data1) - b2 := md5hash(data2) - b3 := md5hash(data3) - - t.Logf("data0 hash: %v", b0) - t.Logf("data1 hash: %v", b1) - t.Logf("data2 hash: %v", b2) - t.Logf("data3 hash: %v", b3) - - cases := []struct { - ordering [][]byte - expectedBlockCount int - }{ - {ordering: [][]byte{data1, data2, data3, data0}, expectedBlockCount: 2}, - {ordering: [][]byte{data0, data1, data2, data3}, expectedBlockCount: 2}, - {ordering: [][]byte{data1, data0, data2, data3}, expectedBlockCount: 2}, - {ordering: [][]byte{data0, data1, data0, data2, data3}, expectedBlockCount: 2}, - {ordering: [][]byte{data0, data1, data0, data2, data3, data0}, expectedBlockCount: 2}, - {ordering: [][]byte{data1, data0, data2, nil, data0, data3}, expectedBlockCount: 3}, - {ordering: [][]byte{data1, data2, nil, data0, data3}, expectedBlockCount: 4}, - {ordering: [][]byte{data3, nil, data1, data2, data0}, expectedBlockCount: 3}, - {ordering: [][]byte{data3, data1, data2, data0}, expectedBlockCount: 2}, - {ordering: [][]byte{data3, data0, data1, data2}, expectedBlockCount: 2}, - {ordering: [][]byte{data3, data1, data0, data2}, expectedBlockCount: 2}, - {ordering: [][]byte{data3, data0, data1, data0, data2, data0}, expectedBlockCount: 2}, - } - - for i, tc := range cases { - data := map[string][]byte{} - keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - - t.Run(fmt.Sprintf("case-%v", i), func(t *testing.T) { - for _, b := range tc.ordering { - if b == nil { - bm.Flush() - continue - } - - t.Logf("writing %v", md5hash(b)) - writeBlockAndVerify(t, bm, "some-group", b) - } - - verifyBlock(t, bm, b0, data0) - verifyBlock(t, bm, b1, data1) - verifyBlock(t, bm, b2, data2) - verifyBlock(t, bm, b3, data3) - bm.Flush() - dumpBlockManagerData(data) - verifyBlock(t, bm, b0, data0) - verifyBlock(t, bm, b1, data1) - verifyBlock(t, bm, b2, data2) - verifyBlock(t, bm, b3, data3) - bm.Flush() - - // 2 data blocks written. - if got, want := len(data), tc.expectedBlockCount; got != want { - t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) - } - }) - } -} - func TestBlockManagerRepack(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) + + // disable preamble, postamble and padding, so that each pack block is identical to its contents + bm.maxPreambleLength = 0 + bm.minPreambleLength = 0 + bm.paddingUnit = 0 d1 := seededRandomData(1, 10) d2 := seededRandomData(2, 20) d3 := seededRandomData(3, 30) - writeBlockAndVerify(t, bm, "g1", d1) + writeBlockAndVerify(t, bm, d1) bm.Flush() - writeBlockAndVerify(t, bm, "g1", d2) + writeBlockAndVerify(t, bm, d2) bm.Flush() - writeBlockAndVerify(t, bm, "g1", d3) + writeBlockAndVerify(t, bm, d3) bm.Flush() // 3 data blocks, 3 index blocks. @@ -259,7 +181,10 @@ func TestBlockManagerRepack(t *testing.T) { t.Errorf("unexpected block count: %v, wanted %v", got, want) } - if err := bm.Repackage("g1", 5); err != nil { + log.Printf("before repackage") + dumpBlockManagerData(data) + + if err := bm.Repackage(5); err != nil { t.Errorf("repackage failure: %v", err) } bm.Flush() @@ -269,12 +194,13 @@ func TestBlockManagerRepack(t *testing.T) { t.Errorf("unexpected block count: %v, wanted %v", got, want) } - setFakeTime(bm, fakeTime.Add(1*time.Second)) + bm.timeNow = fakeTimeNowFrozen(fakeTime.Add(1 * time.Second)) - if err := bm.Repackage("g1", 30); err != nil { + if err := bm.Repackage(30); err != nil { t.Errorf("repackage failure: %v", err) } bm.Flush() + log.Printf("after repackage") dumpBlockManagerData(data) @@ -310,12 +236,12 @@ func verifyActiveIndexBlockCount(t *testing.T, bm *Manager, expected int) { func TestBlockManagerInternalFlush(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) for i := 0; i < 100; i++ { b := make([]byte, 25) rand.Read(b) - writeBlockAndVerify(t, bm, "", b) + writeBlockAndVerify(t, bm, b) } // 1 data block written, but no index yet. @@ -327,7 +253,7 @@ func TestBlockManagerInternalFlush(t *testing.T) { for i := 0; i < 100; i++ { b := make([]byte, 25) rand.Read(b) - writeBlockAndVerify(t, bm, "", b) + writeBlockAndVerify(t, bm, b) } // 2 data blocks written, but no index yet. @@ -348,7 +274,7 @@ func TestBlockManagerInternalFlush(t *testing.T) { func TestBlockManagerWriteMultiple(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) + bm := newTestBlockManager(data, keyTime, nil) var blockIDs []string @@ -356,7 +282,7 @@ func TestBlockManagerWriteMultiple(t *testing.T) { //t.Logf("i=%v", i) b := seededRandomData(i, i%113) //t.Logf("writing block #%v with %x", i, b) - blkID, err := bm.WriteBlock("", b) + blkID, err := bm.WriteBlock(b, "") //t.Logf("wrote %v=%v", i, blkID) if err != nil { t.Errorf("err: %v", err) @@ -375,7 +301,7 @@ func TestBlockManagerWriteMultiple(t *testing.T) { bm.Flush() t.Logf("data block count: %v", len(data)) //dumpBlockManagerData(data) - bm = newTestBlockManager(data, keyTime) + bm = newTestBlockManager(data, keyTime, nil) } } @@ -388,84 +314,16 @@ func TestBlockManagerWriteMultiple(t *testing.T) { } } -func TestBlockManagerListGroups(t *testing.T) { - blockSizes := []int{10, 1500} - - for _, blockSize := range blockSizes { - blockSize := blockSize - t.Run(fmt.Sprintf("block-size-%v", blockSize), func(t *testing.T) { - data := map[string][]byte{} - keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - data1 := seededRandomData(1, blockSize) - data2 := seededRandomData(2, blockSize) - data3 := seededRandomData(3, blockSize) - - writeBlockAndVerify(t, bm, "group1", data1) - writeBlockAndVerify(t, bm, "group1", data2) - writeBlockAndVerify(t, bm, "group1", data3) - - writeBlockAndVerify(t, bm, "group2", data1) - writeBlockAndVerify(t, bm, "group2", data3) - - writeBlockAndVerify(t, bm, "group3", data1) - writeBlockAndVerify(t, bm, "group3", data2) - - writeBlockAndVerify(t, bm, "group4", data2) - writeBlockAndVerify(t, bm, "group4", data3) - - verifyGroupListContains(t, bm, "group1", md5hash(data1), md5hash(data2), md5hash(data3)) - verifyGroupListContains(t, bm, "group2", md5hash(data1), md5hash(data3)) - verifyGroupListContains(t, bm, "group3", md5hash(data1), md5hash(data2)) - verifyGroupListContains(t, bm, "group4", md5hash(data2), md5hash(data3)) - - bm.Flush() - - data1b := seededRandomData(11, blockSize) - data2b := seededRandomData(12, blockSize) - data3b := seededRandomData(13, blockSize) - - bm = newTestBlockManager(data, keyTime) - writeBlockAndVerify(t, bm, "group1", data1b) - writeBlockAndVerify(t, bm, "group1", data2b) - writeBlockAndVerify(t, bm, "group1", data3b) - - writeBlockAndVerify(t, bm, "group2", data1b) - writeBlockAndVerify(t, bm, "group2", data3b) - - writeBlockAndVerify(t, bm, "group3", data1b) - writeBlockAndVerify(t, bm, "group3", data2b) - - writeBlockAndVerify(t, bm, "group4", data2b) - writeBlockAndVerify(t, bm, "group4", data3b) - - verifyGroupListContains(t, bm, "group1", md5hash(data1), md5hash(data2), md5hash(data3), md5hash(data1b), md5hash(data2b), md5hash(data3b)) - verifyGroupListContains(t, bm, "group2", md5hash(data1), md5hash(data3), md5hash(data1b), md5hash(data3b)) - verifyGroupListContains(t, bm, "group3", md5hash(data1), md5hash(data2), md5hash(data1b), md5hash(data2b)) - verifyGroupListContains(t, bm, "group4", md5hash(data2), md5hash(data3), md5hash(data2b), md5hash(data3b)) - bm.Flush() - bm = newTestBlockManager(data, keyTime) - verifyGroupListContains(t, bm, "group1", md5hash(data1), md5hash(data2), md5hash(data3), md5hash(data1b), md5hash(data2b), md5hash(data3b)) - verifyGroupListContains(t, bm, "group2", md5hash(data1), md5hash(data3), md5hash(data1b), md5hash(data3b)) - verifyGroupListContains(t, bm, "group3", md5hash(data1), md5hash(data2), md5hash(data1b), md5hash(data2b)) - verifyGroupListContains(t, bm, "group4", md5hash(data2), md5hash(data3), md5hash(data2b), md5hash(data3b)) - - dumpBlockManagerData(data) - }) - } -} - func TestBlockManagerConcurrency(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - preexistingBlock := writeBlockAndVerify(t, bm, "", seededRandomData(10, 100)) + bm := newTestBlockManager(data, keyTime, nil) + preexistingBlock := writeBlockAndVerify(t, bm, seededRandomData(10, 100)) bm.Flush() - bm1 := newTestBlockManager(data, keyTime) - bm2 := newTestBlockManager(data, keyTime) - bm3 := newTestBlockManager(data, keyTime) - setFakeTime(bm3, fakeTime.Add(1)) + bm1 := newTestBlockManager(data, keyTime, nil) + bm2 := newTestBlockManager(data, keyTime, nil) + bm3 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1)) // all bm* can see pre-existing block verifyBlock(t, bm1, preexistingBlock, seededRandomData(10, 100)) @@ -473,14 +331,14 @@ func TestBlockManagerConcurrency(t *testing.T) { verifyBlock(t, bm3, preexistingBlock, seededRandomData(10, 100)) // write the same block in all managers. - sharedBlock := writeBlockAndVerify(t, bm1, "", seededRandomData(20, 100)) - writeBlockAndVerify(t, bm2, "", seededRandomData(20, 100)) - writeBlockAndVerify(t, bm3, "", seededRandomData(20, 100)) + sharedBlock := writeBlockAndVerify(t, bm1, seededRandomData(20, 100)) + writeBlockAndVerify(t, bm2, seededRandomData(20, 100)) + writeBlockAndVerify(t, bm3, seededRandomData(20, 100)) // write unique block per manager. - bm1block := writeBlockAndVerify(t, bm1, "", seededRandomData(31, 100)) - bm2block := writeBlockAndVerify(t, bm2, "", seededRandomData(32, 100)) - bm3block := writeBlockAndVerify(t, bm3, "", seededRandomData(33, 100)) + bm1block := writeBlockAndVerify(t, bm1, seededRandomData(31, 100)) + bm2block := writeBlockAndVerify(t, bm2, seededRandomData(32, 100)) + bm3block := writeBlockAndVerify(t, bm3, seededRandomData(33, 100)) // make sure they can't see each other's unflushed blocks. verifyBlockNotFound(t, bm1, bm2block) @@ -502,7 +360,7 @@ func TestBlockManagerConcurrency(t *testing.T) { verifyBlockNotFound(t, bm3, bm2block) // new block manager at this point can see all data. - bm4 := newTestBlockManager(data, keyTime) + bm4 := newTestBlockManager(data, keyTime, nil) verifyBlock(t, bm4, preexistingBlock, seededRandomData(10, 100)) verifyBlock(t, bm4, sharedBlock, seededRandomData(20, 100)) verifyBlock(t, bm4, bm1block, seededRandomData(31, 100)) @@ -521,7 +379,7 @@ func TestBlockManagerConcurrency(t *testing.T) { } // new block manager at this point can see all data. - bm5 := newTestBlockManager(data, keyTime) + bm5 := newTestBlockManager(data, keyTime, nil) verifyBlock(t, bm5, preexistingBlock, seededRandomData(10, 100)) verifyBlock(t, bm5, sharedBlock, seededRandomData(20, 100)) verifyBlock(t, bm5, bm1block, seededRandomData(31, 100)) @@ -535,11 +393,10 @@ func TestBlockManagerConcurrency(t *testing.T) { func TestDeleteBlock(t *testing.T) { data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - setFakeTimeWithAutoAdvance(bm, fakeTime, 1) - block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100)) + bm := newTestBlockManager(data, keyTime, nil) + block1 := writeBlockAndVerify(t, bm, seededRandomData(10, 100)) bm.Flush() - block2 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(11, 100)) + block2 := writeBlockAndVerify(t, bm, seededRandomData(11, 100)) if err := bm.DeleteBlock(block1); err != nil { t.Errorf("unable to delete block: %v", block1) } @@ -549,7 +406,7 @@ func TestDeleteBlock(t *testing.T) { verifyBlockNotFound(t, bm, block1) verifyBlockNotFound(t, bm, block2) bm.Flush() - bm = newTestBlockManager(data, keyTime) + bm = newTestBlockManager(data, keyTime, nil) dumpBlockManagerData(data) verifyBlockNotFound(t, bm, block1) verifyBlockNotFound(t, bm, block2) @@ -573,37 +430,35 @@ func TestDeleteAndRecreate(t *testing.T) { // write a block data := map[string][]byte{} keyTime := map[string]time.Time{} - bm := newTestBlockManager(data, keyTime) - setFakeTimeWithAutoAdvance(bm, fakeTime, 1) - block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100)) + bm := newTestBlockManager(data, keyTime, fakeTimeNowFrozen(fakeTime)) + block1 := writeBlockAndVerify(t, bm, seededRandomData(10, 100)) bm.Flush() // delete but at given timestamp but don't commit yet. - bm0 := newTestBlockManager(data, keyTime) - setFakeTimeWithAutoAdvance(bm0, tc.deletionTime, 1) + bm0 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1)) bm0.DeleteBlock(block1) // delete it at t0+10 - bm1 := newTestBlockManager(data, keyTime) - setFakeTimeWithAutoAdvance(bm1, fakeTime.Add(10), 1) + bm1 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10), 1)) verifyBlock(t, bm1, block1, seededRandomData(10, 100)) bm1.DeleteBlock(block1) bm1.Flush() // recreate at t0+20 - bm2 := newTestBlockManager(data, keyTime) - setFakeTimeWithAutoAdvance(bm2, fakeTime.Add(20), 1) - block2 := writeBlockAndVerify(t, bm2, "some-group", seededRandomData(10, 100)) + bm2 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20), 1)) + block2 := writeBlockAndVerify(t, bm2, seededRandomData(10, 100)) bm2.Flush() // commit deletion from bm0 (t0+5) bm0.Flush() + dumpBlockManagerData(data) + if block1 != block2 { t.Errorf("got invalid block %v, expected %v", block2, block1) } - bm3 := newTestBlockManager(data, keyTime) + bm3 := newTestBlockManager(data, keyTime, nil) if tc.isVisible { verifyBlock(t, bm3, block1, seededRandomData(10, 100)) } else { @@ -613,19 +468,20 @@ func TestDeleteAndRecreate(t *testing.T) { } } -func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time) *Manager { +func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, timeFunc func() time.Time) *Manager { st := storagetesting.NewMapStorage(data, keyTime) //st = logging.NewWrapper(st) - bm, err := NewManager(st, FormattingOptions{ + if timeFunc == nil { + timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1) + } + bm, err := newManagerWithTime(st, FormattingOptions{ BlockFormat: "TESTONLY_MD5", MaxPackedContentLength: maxPackedContentLength, MaxPackSize: maxPackSize, - }, CachingOptions{}) + }, CachingOptions{}, timeFunc) if err != nil { panic("can't create block manager: " + err.Error()) } - - setFakeTime(bm, fakeTime) return bm } @@ -633,7 +489,7 @@ func getIndexCount(d map[string][]byte) int { var cnt int for k := range d { - if strings.HasPrefix(k, packBlockPrefix) { + if strings.HasPrefix(k, indexBlockPrefix) { cnt++ } } @@ -641,12 +497,15 @@ func getIndexCount(d map[string][]byte) int { return cnt } -func setFakeTime(bm *Manager, t time.Time) { - bm.timeNow = func() time.Time { return t } +func fakeTimeNowFrozen(t time.Time) func() time.Time { + return fakeTimeNowWithAutoAdvance(t, 0) } -func setFakeTimeWithAutoAdvance(bm *Manager, t time.Time, dt time.Duration) { - bm.timeNow = func() time.Time { +func fakeTimeNowWithAutoAdvance(t time.Time, dt time.Duration) func() time.Time { + var mu sync.Mutex + return func() time.Time { + mu.Lock() + defer mu.Unlock() ret := t t = t.Add(dt) return ret @@ -685,10 +544,10 @@ func verifyBlock(t *testing.T, bm *Manager, blockID string, b []byte) { } } -func writeBlockAndVerify(t *testing.T, bm *Manager, packGroup string, b []byte) string { +func writeBlockAndVerify(t *testing.T, bm *Manager, b []byte) string { t.Helper() - blockID, err := bm.WriteBlock(packGroup, b) + blockID, err := bm.WriteBlock(b, "") if err != nil { t.Errorf("err: %v", err) } @@ -716,33 +575,22 @@ func md5hash(b []byte) string { func dumpBlockManagerData(data map[string][]byte) { for k, v := range data { - if k[0] == 'P' { + if k[0] == 'I' { var payload blockmgrpb.Indexes proto.Unmarshal(v, &payload) - log.Printf("data[%v] = %v", k, proto.MarshalTextString(&payload)) + fmt.Printf("index %v:\n", k) + for _, ndx := range payload.Indexes { + fmt.Printf(" pack %v len: %v created %v\n", ndx.PackBlockId, ndx.PackLength, time.Unix(0, int64(ndx.CreateTimeNanos)).Local()) + for blk, os := range ndx.Items { + off, size := unpackOffsetAndSize(os) + fmt.Printf(" block[%v]={offset:%v size:%v}\n", blk, off, size) + } + for _, del := range ndx.DeletedItems { + fmt.Printf(" deleted %v\n", del) + } + } } else { - log.Printf("data[%v] = %v bytes", k, len(v)) + fmt.Printf("data %v (%v bytes)\n", k, len(v)) } } } - -func verifyGroupListContains(t *testing.T, bm *Manager, groupID string, expected ...string) { - got := map[string]bool{} - want := map[string]bool{} - blks, err := bm.ListGroupBlocks(groupID) - if err != nil { - t.Errorf("error listing blocks: %v", err) - return - } - for _, a := range blks { - got[a.BlockID] = true - } - - for _, e := range expected { - want[e] = true - } - - if !reflect.DeepEqual(got, want) { - t.Errorf("unexpected contents of group %q: %v, wanted %v", groupID, got, want) - } -} diff --git a/block/disk_block_cache.go b/block/disk_block_cache.go index e0a900849..a5af5e4c1 100644 --- a/block/disk_block_cache.go +++ b/block/disk_block_cache.go @@ -158,7 +158,7 @@ func (c *diskBlockCache) readBlocksFromCacheFile(f *os.File) ([]Info, error) { func (c *diskBlockCache) readBlocksFromSource(maxCompactions int) ([]Info, error) { var blocks []Info - ch, cancel := c.st.ListBlocks(packBlockPrefix) + ch, cancel := c.st.ListBlocks(indexBlockPrefix) defer cancel() numCompactions := 0 diff --git a/block/pack_index.go b/block/pack_index.go index 6d06e6707..22cacc0b0 100644 --- a/block/pack_index.go +++ b/block/pack_index.go @@ -1,21 +1,15 @@ package block import ( - "bytes" - "compress/gzip" "encoding/json" "fmt" - "io" "strconv" "strings" - "time" "github.com/golang/protobuf/proto" "github.com/kopia/kopia/internal/blockmgrpb" ) -type packIndexes []*packIndex - type offsetAndSize struct { offset uint32 size uint32 @@ -52,50 +46,7 @@ func (o *offsetAndSize) UnmarshalJSON(b []byte) error { return nil } -type packIndex struct { - PackBlockID string `json:"packBlock,omitempty"` - PackGroup string `json:"packGroup,omitempty"` - CreateTime time.Time `json:"createTime"` - Items map[string]offsetAndSize `json:"items"` - DeletedItems []string `json:"deletedItems,omitempty"` -} - -func loadPackIndexesLegacy(r io.Reader) ([]*blockmgrpb.Index, error) { - var pi packIndexes - - if err := json.NewDecoder(r).Decode(&pi); err != nil { - return nil, err - } - - var result []*blockmgrpb.Index - - for _, v := range pi { - result = append(result, convertLegacyIndex(v)) - } - - return result, nil -} - -func convertLegacyIndex(pi *packIndex) *blockmgrpb.Index { - res := &blockmgrpb.Index{ - CreateTimeNanos: pi.CreateTime.UnixNano(), - DeletedItems: pi.DeletedItems, - PackBlockId: pi.PackBlockID, - PackGroup: pi.PackGroup, - } - - if len(pi.Items) > 0 { - res.Items = make(map[string]uint64) - - for k, v := range pi.Items { - res.Items[k] = packOffsetAndSize(v.offset, v.size) - } - } - - return res -} - -func loadPackIndexesNew(data []byte) ([]*blockmgrpb.Index, error) { +func loadPackIndexes(data []byte) ([]*blockmgrpb.Index, error) { var b blockmgrpb.Indexes if err := proto.Unmarshal(data, &b); err != nil { @@ -104,12 +55,3 @@ func loadPackIndexesNew(data []byte) ([]*blockmgrpb.Index, error) { return b.Indexes, nil } - -func loadPackIndexes(data []byte) ([]*blockmgrpb.Index, error) { - gz, err := gzip.NewReader(bytes.NewReader(data)) - if err != nil { - return loadPackIndexesNew(data) - } - - return loadPackIndexesLegacy(gz) -} diff --git a/cli/app.go b/cli/app.go index 1ce9972f9..e942ee87d 100644 --- a/cli/app.go +++ b/cli/app.go @@ -9,7 +9,6 @@ snapshotCommands = app.Command("snapshot", "Commands to manipulate snapshots.").Alias("snap") policyCommands = app.Command("policy", "Commands to manipulate snapshotting policies.").Alias("policies") - metadataCommands = app.Command("metadata", "Low-level commands to manipulate metadata items.").Alias("md") manifestCommands = app.Command("manifest", "Low-level commands to manipulate manifest items.") objectCommands = app.Command("object", "Commands to manipulate objects in repository.").Alias("obj") blockCommands = app.Command("block", "Commands to manipulate virtual blocks in repository.").Alias("blk") diff --git a/cli/command_block_list.go b/cli/command_block_list.go index 163da005b..2b8592c01 100644 --- a/cli/command_block_list.go +++ b/cli/command_block_list.go @@ -10,8 +10,6 @@ var ( blockListCommand = blockCommands.Command("list", "List blocks").Alias("ls") - blockListKind = blockListCommand.Flag("kind", "Block kind").Default("all").Enum("all", "physical", "packed", "nonpacked", "packs") - blockListGroup = blockListCommand.Flag("group", "List blocks belonging to a given group").String() blockListLong = blockListCommand.Flag("long", "Long output").Short('l').Bool() blockListPrefix = blockListCommand.Flag("prefix", "Prefix").String() blockListSort = blockListCommand.Flag("sort", "Sort order").Default("name").Enum("name", "size", "time", "none", "pack") @@ -23,13 +21,7 @@ func runListBlocksAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - var blocks []block.Info - var err error - if *blockListGroup != "" { - blocks, err = rep.Blocks.ListGroupBlocks(*blockListGroup) - } else { - blocks, err = rep.Blocks.ListBlocks(*blockListPrefix, *blockListKind) - } + blocks, err := rep.Blocks.ListBlocks(*blockListPrefix) if err != nil { return err } @@ -61,14 +53,10 @@ func runListBlocksAction(context *kingpin.ParseContext) error { uniquePacks[b.PackBlockID] = true } if *blockListLong { - grp := b.PackGroup - if grp == "" { - grp = "default" - } if b.PackBlockID != "" { - fmt.Printf("%-34v %10v %v %v in %v offset %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat), grp, b.PackBlockID, b.PackOffset) + fmt.Printf("%-34v %10v %v in %v offset %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat), b.PackBlockID, b.PackOffset) } else { - fmt.Printf("%-34v %10v %v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat), grp) + fmt.Printf("%-34v %10v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat)) } } else { fmt.Printf("%v\n", b.BlockID) diff --git a/cli/command_block_repack.go b/cli/command_block_repack.go index c9c725607..a4c8b112a 100644 --- a/cli/command_block_repack.go +++ b/cli/command_block_repack.go @@ -6,15 +6,14 @@ var ( blockRepackCommand = blockCommands.Command("repack", "Repackage small blocks into bigger ones") - blockRepackGroup = blockRepackCommand.Flag("group", "Group to repack").Default("DIR").String() - blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Int64() + blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Uint64() ) func runBlockRepackAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold); err != nil { + if err := rep.Blocks.Repackage(*blockRepackSizeThreshold); err != nil { return err } diff --git a/cli/command_block_stats.go b/cli/command_block_stats.go index 735ba2900..118fa432d 100644 --- a/cli/command_block_stats.go +++ b/cli/command_block_stats.go @@ -12,7 +12,6 @@ var ( blockStatsCommand = blockCommands.Command("stats", "Block statistics") - blockStatsKind = blockStatsCommand.Flag("kind", "Kinds of blocks").Default("logical").Enum("all", "logical", "physical", "packed", "nonpacked", "packs") blockStatsRaw = blockStatsCommand.Flag("raw", "Raw numbers").Short('r').Bool() blockStatsGroup = blockStatsCommand.Flag("group", "Display stats about blocks belonging to a given group").String() ) @@ -21,13 +20,7 @@ func runBlockStatsAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - var blocks []block.Info - var err error - if *blockStatsGroup != "" { - blocks, err = rep.Blocks.ListGroupBlocks(*blockStatsGroup) - } else { - blocks, err = rep.Blocks.ListBlocks("", *blockStatsKind) - } + blocks, err := rep.Blocks.ListBlocks("") if err != nil { return err } @@ -54,7 +47,7 @@ func runBlockStatsAction(context *kingpin.ParseContext) error { } } - fmt.Printf("Block statistics (%v)\n", *blockStatsKind) + fmt.Printf("Block statistics\n") if len(blocks) == 0 { return nil } diff --git a/cli/command_object_ls.go b/cli/command_object_ls.go index eac301e5b..79b6f2795 100644 --- a/cli/command_object_ls.go +++ b/cli/command_object_ls.go @@ -15,7 +15,7 @@ func runListObjectsAction(context *kingpin.ParseContext) error { rep := mustOpenRepository(nil) defer rep.Close() - info, err := rep.Blocks.ListBlocks(*objectListPrefix, "all") + info, err := rep.Blocks.ListBlocks(*objectListPrefix) if err != nil { return err } diff --git a/cli/command_repository_create.go b/cli/command_repository_create.go index 2e19fb87d..cec5f3067 100644 --- a/cli/command_repository_create.go +++ b/cli/command_repository_create.go @@ -22,8 +22,6 @@ createAvgBlockSize = createCommand.Flag("avg-block-size", "Average size of a data block.").PlaceHolder("KB").Default("10240").Int() createMaxBlockSize = createCommand.Flag("max-block-size", "Maximum size of a data block.").PlaceHolder("KB").Default("20480").Int() - createMaxPackedContentLength = createCommand.Flag("max-packed-file-size", "Minimum size of a file to include in a pack.").PlaceHolder("KB").Default("4096").Int() - createOverwrite = createCommand.Flag("overwrite", "Overwrite existing data (DANGEROUS).").Bool() createOnly = createCommand.Flag("create-only", "Create repository, but don't connect to it.").Short('c').Bool() ) @@ -41,8 +39,6 @@ func newRepositoryOptionsFromFlags() *repo.NewRepositoryOptions { MinBlockSize: *createMinBlockSize * 1024, AvgBlockSize: *createAvgBlockSize * 1024, MaxBlockSize: *createMaxBlockSize * 1024, - - MaxPackedContentLength: *createMaxPackedContentLength * 1024, } } diff --git a/internal/blockmgrpb/block_index.pb.go b/internal/blockmgrpb/block_index.pb.go index 84a06b4e4..cd870f444 100644 --- a/internal/blockmgrpb/block_index.pb.go +++ b/internal/blockmgrpb/block_index.pb.go @@ -32,8 +32,8 @@ type Index struct { PackBlockId string `protobuf:"bytes,1,opt,name=pack_block_id,json=packBlockId,proto3" json:"pack_block_id,omitempty"` - PackGroup string `protobuf:"bytes,2,opt,name=pack_group,json=packGroup,proto3" json:"pack_group,omitempty"` - CreateTimeNanos int64 `protobuf:"varint,3,opt,name=create_time_nanos,json=createTimeNanos,proto3" json:"create_time_nanos,omitempty"` + PackLength uint64 `protobuf:"varint,2,opt,name=pack_length,json=packLength,proto3" json:"pack_length,omitempty"` + CreateTimeNanos uint64 `protobuf:"varint,3,opt,name=create_time_nanos,json=createTimeNanos,proto3" json:"create_time_nanos,omitempty"` Items map[string]uint64 `protobuf:"bytes,4,rep,name=items" json:"items,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` DeletedItems []string `protobuf:"bytes,5,rep,name=deleted_items,json=deletedItems" json:"deleted_items,omitempty"` } @@ -50,14 +50,14 @@ func (m *Index) GetPackBlockId() string { return "" } -func (m *Index) GetPackGroup() string { +func (m *Index) GetPackLength() uint64 { if m != nil { - return m.PackGroup + return m.PackLength } - return "" + return 0 } -func (m *Index) GetCreateTimeNanos() int64 { +func (m *Index) GetCreateTimeNanos() uint64 { if m != nil { return m.CreateTimeNanos } @@ -119,11 +119,10 @@ func (m *Index) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.PackBlockId))) i += copy(dAtA[i:], m.PackBlockId) } - if len(m.PackGroup) > 0 { - dAtA[i] = 0x12 + if m.PackLength != 0 { + dAtA[i] = 0x10 i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.PackGroup))) - i += copy(dAtA[i:], m.PackGroup) + i = encodeVarintBlockIndex(dAtA, i, uint64(m.PackLength)) } if m.CreateTimeNanos != 0 { dAtA[i] = 0x18 @@ -210,9 +209,8 @@ func (m *Index) Size() (n int) { if l > 0 { n += 1 + l + sovBlockIndex(uint64(l)) } - l = len(m.PackGroup) - if l > 0 { - n += 1 + l + sovBlockIndex(uint64(l)) + if m.PackLength != 0 { + n += 1 + sovBlockIndex(uint64(m.PackLength)) } if m.CreateTimeNanos != 0 { n += 1 + sovBlockIndex(uint64(m.CreateTimeNanos)) @@ -318,10 +316,10 @@ func (m *Index) Unmarshal(dAtA []byte) error { m.PackBlockId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field PackGroup", wireType) + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PackLength", wireType) } - var stringLen uint64 + m.PackLength = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowBlockIndex @@ -331,21 +329,11 @@ func (m *Index) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + m.PackLength |= (uint64(b) & 0x7F) << shift if b < 0x80 { break } } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthBlockIndex - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.PackGroup = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field CreateTimeNanos", wireType) @@ -360,7 +348,7 @@ func (m *Index) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.CreateTimeNanos |= (int64(b) & 0x7F) << shift + m.CreateTimeNanos |= (uint64(b) & 0x7F) << shift if b < 0x80 { break } @@ -711,24 +699,24 @@ func skipBlockIndex(dAtA []byte) (n int, err error) { func init() { proto.RegisterFile("internal/blockmgrpb/block_index.proto", fileDescriptorBlockIndex) } var fileDescriptorBlockIndex = []byte{ - // 293 bytes of a gzipped FileDescriptorProto + // 292 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x90, 0xcd, 0x4a, 0xc3, 0x40, - 0x14, 0x85, 0x9d, 0xa6, 0x69, 0xe9, 0xad, 0x45, 0x3b, 0xb8, 0x08, 0xa2, 0x21, 0x54, 0xc4, 0xa0, - 0x50, 0x41, 0x37, 0xc5, 0x65, 0x41, 0x24, 0x1b, 0x17, 0x83, 0x2b, 0x37, 0x21, 0x3f, 0x97, 0x32, - 0x24, 0x99, 0x84, 0xc9, 0x54, 0xec, 0x9b, 0xf8, 0x48, 0x2e, 0x7d, 0x04, 0x89, 0xef, 0x21, 0x32, - 0x33, 0x91, 0xee, 0xce, 0xfd, 0xee, 0x39, 0xcc, 0x99, 0x0b, 0x97, 0x5c, 0x28, 0x94, 0x22, 0x29, - 0x6f, 0xd3, 0xb2, 0xce, 0x8a, 0x6a, 0x23, 0x9b, 0xd4, 0xca, 0x98, 0x8b, 0x1c, 0xdf, 0x97, 0x8d, - 0xac, 0x55, 0xbd, 0xf8, 0x25, 0xe0, 0x46, 0x7a, 0xa6, 0x0b, 0x98, 0x35, 0x49, 0x56, 0xc4, 0xbd, - 0x27, 0xf7, 0x48, 0x40, 0xc2, 0x09, 0x9b, 0x6a, 0xb8, 0xd6, 0x2c, 0xca, 0xe9, 0x39, 0x80, 0xf1, - 0x6c, 0x64, 0xbd, 0x6d, 0xbc, 0x81, 0x31, 0x4c, 0x34, 0x79, 0xd2, 0x80, 0x5e, 0xc3, 0x3c, 0x93, - 0x98, 0x28, 0x8c, 0x15, 0xaf, 0x30, 0x16, 0x89, 0xa8, 0x5b, 0xcf, 0x09, 0x48, 0xe8, 0xb0, 0x23, - 0xbb, 0x78, 0xe1, 0x15, 0x3e, 0x6b, 0x4c, 0xaf, 0xc0, 0xe5, 0x0a, 0xab, 0xd6, 0x1b, 0x06, 0x4e, - 0x38, 0xbd, 0x9b, 0x2f, 0x4d, 0x8b, 0x65, 0xa4, 0xd9, 0xa3, 0x50, 0x72, 0xc7, 0xec, 0x9e, 0x5e, - 0xc0, 0x2c, 0xc7, 0x12, 0x15, 0xe6, 0xb1, 0x0d, 0xb8, 0x81, 0x13, 0x4e, 0xd8, 0x61, 0x0f, 0x4d, - 0xe0, 0x74, 0x05, 0xb0, 0x4f, 0xd2, 0x63, 0x70, 0x0a, 0xdc, 0xf5, 0x1f, 0xd0, 0x92, 0x9e, 0x80, - 0xfb, 0x96, 0x94, 0x5b, 0x34, 0x9d, 0x87, 0xcc, 0x0e, 0x0f, 0x83, 0x15, 0x59, 0xdc, 0xc0, 0xd8, - 0xbc, 0x8c, 0x2d, 0x0d, 0x60, 0xcc, 0xad, 0xf4, 0x88, 0x29, 0x35, 0xb2, 0xa5, 0xd8, 0x3f, 0x5e, - 0x9f, 0x7d, 0x76, 0x3e, 0xf9, 0xea, 0x7c, 0xf2, 0xdd, 0xf9, 0xe4, 0xe3, 0xc7, 0x3f, 0x78, 0x85, - 0xfd, 0x75, 0xd3, 0x91, 0x39, 0xe9, 0xfd, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc4, 0xf5, 0x68, - 0xbf, 0x7b, 0x01, 0x00, 0x00, + 0x14, 0x85, 0x9d, 0xa6, 0x69, 0xe9, 0xad, 0x45, 0x3b, 0xb8, 0x08, 0x22, 0x31, 0x44, 0xc4, 0xa0, + 0x10, 0x41, 0x37, 0xc5, 0x65, 0xc1, 0x45, 0x40, 0x5c, 0x04, 0x57, 0x6e, 0x86, 0xfc, 0x5c, 0xea, + 0x90, 0x64, 0x12, 0x92, 0x51, 0xec, 0xce, 0xc7, 0xf0, 0x91, 0x5c, 0xfa, 0x08, 0x12, 0x5f, 0x44, + 0x66, 0x26, 0xd2, 0xdd, 0x39, 0xdf, 0xbd, 0x87, 0x39, 0x73, 0xe1, 0x9c, 0x0b, 0x89, 0xad, 0x48, + 0xca, 0xeb, 0xb4, 0xac, 0xb3, 0xa2, 0xda, 0xb4, 0x4d, 0x6a, 0x24, 0xe3, 0x22, 0xc7, 0xf7, 0xb0, + 0x69, 0x6b, 0x59, 0xfb, 0x1f, 0x23, 0xb0, 0x23, 0xe5, 0xa9, 0x0f, 0x8b, 0x26, 0xc9, 0x0a, 0x36, + 0xec, 0xe4, 0x0e, 0xf1, 0x48, 0x30, 0x8b, 0xe7, 0x0a, 0xae, 0x15, 0x8b, 0x72, 0x7a, 0x0a, 0xda, + 0xb2, 0x12, 0xc5, 0x46, 0xbe, 0x38, 0x23, 0x8f, 0x04, 0xe3, 0x18, 0x14, 0x7a, 0xd0, 0x84, 0x5e, + 0xc2, 0x32, 0x6b, 0x31, 0x91, 0xc8, 0x24, 0xaf, 0x90, 0x89, 0x44, 0xd4, 0x9d, 0x63, 0xe9, 0xb5, + 0x03, 0x33, 0x78, 0xe2, 0x15, 0x3e, 0x2a, 0x4c, 0x2f, 0xc0, 0xe6, 0x12, 0xab, 0xce, 0x19, 0x7b, + 0x56, 0x30, 0xbf, 0x59, 0x86, 0xba, 0x47, 0x18, 0x29, 0x76, 0x2f, 0x64, 0xbb, 0x8d, 0xcd, 0x9c, + 0x9e, 0xc1, 0x22, 0xc7, 0x12, 0x25, 0xe6, 0xcc, 0x04, 0x6c, 0xcf, 0x0a, 0x66, 0xf1, 0xfe, 0x00, + 0x75, 0xe0, 0x78, 0x05, 0xb0, 0x4b, 0xd2, 0x43, 0xb0, 0x0a, 0xdc, 0x0e, 0x5f, 0x50, 0x92, 0x1e, + 0x81, 0xfd, 0x96, 0x94, 0xaf, 0x38, 0x94, 0x36, 0xe6, 0x6e, 0xb4, 0x22, 0xfe, 0x15, 0x4c, 0xf5, + 0xcb, 0xd8, 0x51, 0x0f, 0xa6, 0xdc, 0x48, 0x87, 0xe8, 0x52, 0x13, 0x53, 0x2a, 0xfe, 0xc7, 0xeb, + 0x93, 0xaf, 0xde, 0x25, 0xdf, 0xbd, 0x4b, 0x7e, 0x7a, 0x97, 0x7c, 0xfe, 0xba, 0x7b, 0xcf, 0xb0, + 0xbb, 0x6f, 0x3a, 0xd1, 0x47, 0xbd, 0xfd, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x26, 0xd1, 0xd9, 0xf1, + 0x7d, 0x01, 0x00, 0x00, } diff --git a/internal/blockmgrpb/block_index.proto b/internal/blockmgrpb/block_index.proto index 298a5c85e..a1eeba71d 100644 --- a/internal/blockmgrpb/block_index.proto +++ b/internal/blockmgrpb/block_index.proto @@ -4,8 +4,8 @@ option go_package = "blockmgrpb"; message Index { string pack_block_id = 1; - string pack_group = 2; - int64 create_time_nanos = 3; + uint64 pack_length = 2; + uint64 create_time_nanos = 3; map items = 4; repeated string deleted_items = 5; } diff --git a/manifest/manifest_manager.go b/manifest/manifest_manager.go index b498b9dde..5fb0a4917 100644 --- a/manifest/manifest_manager.go +++ b/manifest/manifest_manager.go @@ -19,7 +19,7 @@ // ErrNotFound is returned when the metadata item is not found. var ErrNotFound = errors.New("not found") -const manifestGroupID = "manifests" +const manifestBlockPrefix = "M" // Manager organizes JSON manifests of various kinds, including snapshot manifests type Manager struct { @@ -175,15 +175,11 @@ func (m *Manager) flushPendingEntriesLocked() (string, error) { gz.Flush() gz.Close() - blockID, err := m.b.WriteBlock(manifestGroupID, buf.Bytes()) + blockID, err := m.b.WriteBlock(buf.Bytes(), manifestBlockPrefix) if err != nil { return "", err } - if err := m.b.Flush(); err != nil { - return "", err - } - m.pendingEntries = nil return blockID, nil } @@ -212,12 +208,13 @@ func (m *Manager) load() error { m.entries = map[string]*manifestEntry{} - log.Debug().Str("group", manifestGroupID).Msg("listing manifest group blocks") - blocks, err := m.b.ListGroupBlocks(manifestGroupID) + log.Debug().Msg("listing manifest blocks") + blocks, err := m.b.ListBlocks(manifestBlockPrefix) if err != nil { return fmt.Errorf("unable to list manifest blocks: %v", err) } + log.Printf("loaded %v blocks", len(blocks)) return m.loadManifestBlocks(blocks) } diff --git a/manifest/manifest_manager_test.go b/manifest/manifest_manager_test.go index fa395bc73..db1aa43b4 100644 --- a/manifest/manifest_manager_test.go +++ b/manifest/manifest_manager_test.go @@ -67,7 +67,8 @@ func TestManifest(t *testing.T) { verifyItem(t, mgr, id2, labels2, item2) verifyItem(t, mgr, id3, labels3, item3) - // verify in new manager + // flush underlying block manager and verify in new manifest manager. + mgr.b.Flush() mgr2, err := newManagerForTesting(t, data, keyTime) if err != nil { t.Fatalf("can't open block manager: %v", err) @@ -98,9 +99,9 @@ func TestManifest(t *testing.T) { t.Errorf("can't compact: %v", err) } - blks, err := mgr.b.ListGroupBlocks(manifestGroupID) + blks, err := mgr.b.ListBlocks(manifestBlockPrefix) if err != nil { - t.Errorf("unable to list manifest group blocks: %v", err) + t.Errorf("unable to list manifest blocks: %v", err) } if got, want := len(blks), 1; got != want { t.Errorf("unexpected number of blocks: %v, want %v", got, want) diff --git a/object/object_manager.go b/object/object_manager.go index f67b00131..04430fbfc 100644 --- a/object/object_manager.go +++ b/object/object_manager.go @@ -25,7 +25,7 @@ type Reader interface { type blockManager interface { BlockInfo(blockID string) (block.Info, error) GetBlock(blockID string) ([]byte, error) - WriteBlock(packGroup string, data []byte) (string, error) + WriteBlock(data []byte, prefix string) (string, error) Flush() error } @@ -57,7 +57,7 @@ func (om *Manager) NewWriter(opt WriterOptions) Writer { repo: om, splitter: om.newSplitter(), description: opt.Description, - packGroup: opt.PackGroup, + blockPrefix: opt.BlockPrefix, } if opt.splitter != nil { @@ -171,7 +171,7 @@ func (om *Manager) verifyObjectInternal(oid ID, blocks *blockTracker) (int64, er // ok to be used. func (om *Manager) Flush() error { om.writeBackWG.Wait() - return om.blockMgr.Flush() + return nil } func nullTrace(message string, args ...interface{}) { diff --git a/object/object_manager_test.go b/object/object_manager_test.go index 418a79d7d..930e79bb4 100644 --- a/object/object_manager_test.go +++ b/object/object_manager_test.go @@ -37,10 +37,10 @@ func (f *fakeBlockManager) GetBlock(blockID string) ([]byte, error) { return nil, storage.ErrBlockNotFound } -func (f *fakeBlockManager) WriteBlock(groupID string, data []byte) (string, error) { +func (f *fakeBlockManager) WriteBlock(data []byte, prefix string) (string, error) { h := md5.New() h.Write(data) - blockID := hex.EncodeToString(h.Sum(nil)) + blockID := prefix + hex.EncodeToString(h.Sum(nil)) f.mu.Lock() defer f.mu.Unlock() diff --git a/object/object_writer.go b/object/object_writer.go index 9429a5fb3..18ab431a7 100644 --- a/object/object_writer.go +++ b/object/object_writer.go @@ -54,8 +54,8 @@ type objectWriter struct { description string - splitter objectSplitter - packGroup string + splitter objectSplitter + blockPrefix string pendingBlocksWG sync.WaitGroup @@ -97,7 +97,7 @@ func (w *objectWriter) flushBuffer() error { w.buffer.Reset() do := func() { - blockID, err := w.repo.blockMgr.WriteBlock(w.packGroup, b2.Bytes()) + blockID, err := w.repo.blockMgr.WriteBlock(b2.Bytes(), w.blockPrefix) w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, blockID, length) if err != nil { w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err)) @@ -146,7 +146,7 @@ func (w *objectWriter) Result() (ID, error) { repo: w.repo, description: "LIST(" + w.description + ")", splitter: w.repo.newSplitter(), - packGroup: w.packGroup, + blockPrefix: w.blockPrefix, } jw := jsonstream.NewWriter(iw, indirectStreamType) @@ -164,7 +164,7 @@ func (w *objectWriter) Result() (ID, error) { // WriterOptions can be passed to Repository.NewWriter() type WriterOptions struct { Description string - PackGroup string + BlockPrefix string splitter objectSplitter } diff --git a/repo/initialize.go b/repo/initialize.go index 470d4e1e2..2962c6bd1 100644 --- a/repo/initialize.go +++ b/repo/initialize.go @@ -34,11 +34,10 @@ type NewRepositoryOptions struct { ObjectHMACSecret []byte // force the use of particular object HMAC secret ObjectEncryptionKey []byte // force the use of particular object encryption key - Splitter string // splitter used to break objects into storage blocks - MinBlockSize int // minimum block size used with dynamic splitter - AvgBlockSize int // approximate size of storage block (used with dynamic splitter) - MaxBlockSize int // maximum size of storage block - MaxPackedContentLength int // maximum size of object to be considered for storage in a pack + Splitter string // splitter used to break objects into storage blocks + MinBlockSize int // minimum block size used with dynamic splitter + AvgBlockSize int // approximate size of storage block (used with dynamic splitter) + MaxBlockSize int // maximum size of storage block // test-only noHMAC bool // disable HMAC @@ -84,12 +83,11 @@ func formatBlockFromOptions(opt *NewRepositoryOptions) *formatBlock { func repositoryObjectFormatFromOptions(opt *NewRepositoryOptions) *config.RepositoryObjectFormat { f := &config.RepositoryObjectFormat{ FormattingOptions: block.FormattingOptions{ - Version: 1, - BlockFormat: applyDefaultString(opt.BlockFormat, block.DefaultFormat), - HMACSecret: applyDefaultRandomBytes(opt.ObjectHMACSecret, 32), - MasterKey: applyDefaultRandomBytes(opt.ObjectEncryptionKey, 32), - MaxPackedContentLength: applyDefaultInt(opt.MaxPackedContentLength, 4<<20), // 4 MB - MaxPackSize: applyDefaultInt(opt.MaxBlockSize, 20<<20), // 20 MB + Version: 1, + BlockFormat: applyDefaultString(opt.BlockFormat, block.DefaultFormat), + HMACSecret: applyDefaultRandomBytes(opt.ObjectHMACSecret, 32), + MasterKey: applyDefaultRandomBytes(opt.ObjectEncryptionKey, 32), + MaxPackSize: applyDefaultInt(opt.MaxBlockSize, 20<<20), // 20 MB }, Splitter: applyDefaultString(opt.Splitter, object.DefaultSplitter), MaxBlockSize: applyDefaultInt(opt.MaxBlockSize, 20<<20), // 20MiB diff --git a/repo/repository.go b/repo/repository.go index c31efb03e..d5b50795c 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -29,6 +29,9 @@ func (r *Repository) Close() error { if err := r.Objects.Close(); err != nil { return err } + if err := r.Blocks.Flush(); err != nil { + return err + } if err := r.Storage.Close(); err != nil { return err } @@ -40,5 +43,9 @@ func (r *Repository) Flush() error { if err := r.Manifests.Flush(); err != nil { return err } - return r.Objects.Flush() + if err := r.Objects.Flush(); err != nil { + return err + } + + return r.Blocks.Flush() } diff --git a/repo/repository_test.go b/repo/repository_test.go index 04c603509..106ce42de 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -16,12 +16,10 @@ "github.com/rs/zerolog/log" - "github.com/kopia/kopia/block" - "github.com/kopia/kopia/auth" - "github.com/kopia/kopia/object" - + "github.com/kopia/kopia/block" "github.com/kopia/kopia/internal/storagetesting" + "github.com/kopia/kopia/object" "github.com/kopia/kopia/storage" ) @@ -38,8 +36,6 @@ func setupTestWithData(t *testing.T, data map[string][]byte, keyTime map[string] Splitter: "FIXED", BlockFormat: "TESTONLY_MD5", MetadataEncryptionAlgorithm: "NONE", - MaxPackedContentLength: -1, - noHMAC: true, } @@ -91,6 +87,8 @@ func TestWriters(t *testing.T) { t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String()) } + repo.Blocks.Flush() + if got, want := len(data), 3; got != want { // 1 format block + 1 data block + 1 pack index block t.Errorf("unexpected data written to the storage (%v), wanted %v: %v", len(data), 3, data) @@ -134,7 +132,6 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) { func TestPackingSimple(t *testing.T) { data, keyTime, repo := setupTest(t, func(n *NewRepositoryOptions) { - n.MaxPackedContentLength = 10000 }) content1 := "hello, how do you do?" @@ -146,8 +143,6 @@ func TestPackingSimple(t *testing.T) { oid2a := writeObject(t, repo, []byte(content2), "packed-object-2a") oid2b := writeObject(t, repo, []byte(content2), "packed-object-2b") - repo.Objects.Flush() - oid3a := writeObject(t, repo, []byte(content3), "packed-object-3a") oid3b := writeObject(t, repo, []byte(content3), "packed-object-3b") verify(t, repo, oid1a, []byte(content1), "packed-object-1") @@ -156,6 +151,7 @@ func TestPackingSimple(t *testing.T) { oid1c := writeObject(t, repo, []byte(content1), "packed-object-1c") repo.Objects.Flush() + repo.Blocks.Flush() if got, want := oid1a.String(), oid1b.String(); got != want { t.Errorf("oid1a(%q) != oid1b(%q)", got, want) @@ -173,20 +169,13 @@ func TestPackingSimple(t *testing.T) { t.Errorf("oid3a(%q) != oid3b(%q)", got, want) } - if got, want := len(data), 1+4; got != want { + // format + index + pack + if got, want := len(data), 3; got != want { t.Errorf("got unexpected repository contents %v items, wanted %v", got, want) - for k, v := range data { - t.Logf("%v => %v", k, string(v)) - } } repo.Close() - for k, v := range data { - log.Printf("data[%v] = %v", k, string(v)) - } - data, _, repo = setupTestWithData(t, data, keyTime, func(n *NewRepositoryOptions) { - n.MaxPackedContentLength = 10000 }) verify(t, repo, oid1a, []byte(content1), "packed-object-1") @@ -197,7 +186,6 @@ func TestPackingSimple(t *testing.T) { t.Errorf("optimize error: %v", err) } data, _, repo = setupTestWithData(t, data, keyTime, func(n *NewRepositoryOptions) { - n.MaxPackedContentLength = 10000 }) verify(t, repo, oid1a, []byte(content1), "packed-object-1") @@ -208,7 +196,6 @@ func TestPackingSimple(t *testing.T) { t.Errorf("optimize error: %v", err) } data, _, repo = setupTestWithData(t, data, keyTime, func(n *NewRepositoryOptions) { - n.MaxPackedContentLength = 10000 }) verify(t, repo, oid1a, []byte(content1), "packed-object-1") diff --git a/snapshot/upload.go b/snapshot/upload.go index 9067b974a..2844e46ae 100644 --- a/snapshot/upload.go +++ b/snapshot/upload.go @@ -224,7 +224,7 @@ func (u *Uploader) uploadDir(dir fs.Directory) (object.ID, object.ID, error) { mw := u.repo.Objects.NewWriter(object.WriterOptions{ Description: "HASHCACHE:" + dir.Metadata().Name, - PackGroup: "HC", + BlockPrefix: "H", }) defer mw.Close() u.cacheWriter = hashcache.NewWriter(mw) @@ -265,7 +265,7 @@ func uploadDirInternal( writer := u.repo.Objects.NewWriter(object.WriterOptions{ Description: "DIR:" + relativePath, - PackGroup: "DIR", + BlockPrefix: "", }) dw := dir.NewWriter(writer)