breaking format change: removed support for groups in block manager; this produces bigger, tightly packed blocks

align pack blocks to 4096 bytes and insert random preamble
This commit is contained in:
Jarek Kowalski
2018-02-03 18:33:47 -08:00
parent 7e16471a21
commit 4564bc704f
21 changed files with 352 additions and 804 deletions

View File

@@ -1,8 +1,11 @@
package block
import (
cryptorand "crypto/rand"
"fmt"
"io"
"math"
"math/rand"
"sort"
"strconv"
"strings"
@@ -18,23 +21,18 @@
)
const (
parallelFetches = 5 // number of parallel read goroutines
flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushed
packBlockPrefix = "P" // prefix for all storage blocks that are pack indexes
nonPackedObjectsPackGroup = "raw" // ID of pack group that stores non-packed objects that don't belong to any group
packObjectsPackGroup = "packs" // ID of pack group that stores pack blocks themselves
compactedBlockSuffix = "-z"
maxIndexBlockUploadTime = 1 * time.Minute
maxNonPackedBlocksPerPackIndex = 200
parallelFetches = 5 // number of parallel read goroutines
flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushed
indexBlockPrefix = "I" // prefix for all storage blocks that are pack indexes
compactedBlockSuffix = "-z"
maxIndexBlockUploadTime = 1 * time.Minute
defaultMinPreambleLength = 32
defaultMaxPreambleLength = 32
defaultPaddingUnit = 4096
)
var zeroTime time.Time
type packInfo struct {
currentPackData []byte
currentPackIndex *blockmgrpb.Index
}
type blockLocation struct {
packIndex int
objectIndex int
@@ -45,7 +43,6 @@ type Info struct {
BlockID string `json:"blockID"`
Length int64 `json:"length"`
Timestamp time.Time `json:"time"`
PackGroup string `json:"packGroup,omitempty"`
PackBlockID string `json:"packBlockID,omitempty"`
PackOffset int64 `json:"packOffset,omitempty"`
}
@@ -58,19 +55,25 @@ type Manager struct {
cache blockCache
mu sync.Mutex
locked bool
groupToBlockToIndex map[string]map[string]*blockmgrpb.Index
mu sync.Mutex
locked bool
indexLoaded bool
blockIDToIndex map[string]*blockmgrpb.Index
packBlockIDToIndex map[string]*blockmgrpb.Index
pendingPackIndexes []*blockmgrpb.Index
flushPackIndexesAfter time.Time
openPackGroups map[string]*packInfo
currentPackData []byte
currentPackIndex *blockmgrpb.Index
maxPackedContentLength int
maxPackSize int
formatter Formatter
timeNow func() time.Time
minPreambleLength int
maxPreambleLength int
paddingUnit int
timeNow func() time.Time
}
// DeleteBlock marks the given blockID as deleted.
@@ -86,52 +89,12 @@ func (bm *Manager) DeleteBlock(blockID string) error {
return fmt.Errorf("can't load pack index: %v", err)
}
// delete from all indexes
for _, m := range bm.groupToBlockToIndex {
delete(m, blockID)
}
for _, m := range bm.openPackGroups {
if ndx := m.currentPackIndex; ndx != nil {
delete(ndx.Items, blockID)
}
}
g := bm.ensurePackGroupLocked("", true)
g.currentPackIndex.DeletedItems = append(g.currentPackIndex.DeletedItems, blockID)
delete(bm.blockIDToIndex, blockID)
delete(bm.currentPackIndex.Items, blockID)
bm.currentPackIndex.DeletedItems = append(bm.currentPackIndex.DeletedItems, blockID)
return nil
}
func (bm *Manager) registerUnpackedBlock(packGroupID string, blockID string, dataLength int64) error {
bm.lock()
defer bm.unlock()
g := bm.registerUnpackedBlockLockedNoFlush(packGroupID, blockID, dataLength)
if bm.timeNow().After(bm.flushPackIndexesAfter) || len(g.currentPackIndex.Items) > maxNonPackedBlocksPerPackIndex {
if err := bm.finishPackAndMaybeFlushIndexes(g); err != nil {
return err
}
}
return nil
}
func (bm *Manager) registerUnpackedBlockLockedNoFlush(groupID string, blockID string, dataLength int64) *packInfo {
bm.assertLocked()
g := bm.ensurePackGroupLocked(groupID, true)
// See if we already have this block ID in an unpacked pack group.
ndx := bm.groupToBlockToIndex[groupID][blockID]
if ndx != nil {
return g
}
bm.addToIndexLocked(groupID, blockID, g.currentPackIndex, packOffsetAndSize(0, uint32(dataLength)))
return g
}
func packOffsetAndSize(offset uint32, size uint32) uint64 {
return uint64(offset)<<32 | uint64(size)
}
@@ -143,20 +106,14 @@ func unpackOffsetAndSize(os uint64) (uint32, uint32) {
return offset, size
}
func (bm *Manager) addToIndexLocked(groupID, blockID string, ndx *blockmgrpb.Index, packedOffsetAndSize uint64) {
func (bm *Manager) addToIndexLocked(blockID string, ndx *blockmgrpb.Index, packedOffsetAndSize uint64) {
bm.assertLocked()
m := bm.groupToBlockToIndex[groupID]
if m == nil {
m = make(map[string]*blockmgrpb.Index)
bm.groupToBlockToIndex[groupID] = m
}
ndx.Items[blockID] = packedOffsetAndSize
m[blockID] = ndx
bm.blockIDToIndex[blockID] = ndx
}
func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte, force bool) error {
func (bm *Manager) addToPackLocked(blockID string, data []byte, force bool) error {
bm.assertLocked()
if err := bm.ensurePackIndexesLoaded(); err != nil {
@@ -165,21 +122,29 @@ func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte
if !force {
// See if we already have this block ID in the pack.
if _, ok := bm.groupToBlockToIndex[packGroup][blockID]; ok {
if _, ok := bm.blockIDToIndex[blockID]; ok {
return nil
}
}
g := bm.ensurePackGroupLocked(packGroup, false)
if len(bm.currentPackData) == 0 && bm.maxPreambleLength > 0 {
preambleLength := rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1) + bm.minPreambleLength
preamble := make([]byte, preambleLength, preambleLength+len(data))
if _, err := io.ReadFull(cryptorand.Reader, preamble); err != nil {
return err
}
offset := len(g.currentPackData)
bm.currentPackData = preamble
}
offset := len(bm.currentPackData)
shouldFinish := offset+len(data) >= bm.maxPackSize
g.currentPackData = append(g.currentPackData, data...)
bm.addToIndexLocked(packGroup, blockID, g.currentPackIndex, packOffsetAndSize(uint32(offset), uint32(len(data))))
bm.currentPackData = append(bm.currentPackData, data...)
bm.addToIndexLocked(blockID, bm.currentPackIndex, packOffsetAndSize(uint32(offset), uint32(len(data))))
if shouldFinish {
if err := bm.finishPackAndMaybeFlushIndexes(g); err != nil {
if err := bm.finishPackAndMaybeFlushIndexes(); err != nil {
return err
}
}
@@ -187,8 +152,8 @@ func (bm *Manager) addToPackLocked(packGroup string, blockID string, data []byte
return nil
}
func (bm *Manager) finishPackAndMaybeFlushIndexes(g *packInfo) error {
if err := bm.finishPackLocked(g); err != nil {
func (bm *Manager) finishPackAndMaybeFlushIndexes() error {
if err := bm.finishPackLocked(); err != nil {
return err
}
@@ -216,32 +181,12 @@ func (bm *Manager) close() error {
return bm.cache.close()
}
func (bm *Manager) ensurePackGroupLocked(packGroup string, unpacked bool) *packInfo {
var suffix string
if unpacked {
suffix = ":unpacked"
func (bm *Manager) startPackIndexLocked() {
bm.currentPackIndex = &blockmgrpb.Index{
Items: make(map[string]uint64),
CreateTimeNanos: uint64(bm.timeNow().UnixNano()),
}
g := bm.openPackGroups[packGroup+suffix]
if g == nil {
g = &packInfo{}
bm.openPackGroups[packGroup+suffix] = g
}
if g.currentPackIndex == nil {
g.currentPackIndex = &blockmgrpb.Index{
Items: make(map[string]uint64),
PackGroup: packGroup,
CreateTimeNanos: bm.timeNow().UnixNano(),
}
if unpacked {
g.currentPackData = nil
} else {
g.currentPackData = []byte{}
}
}
return g
bm.currentPackData = []byte{}
}
func (bm *Manager) flushPackIndexesLocked() error {
@@ -274,51 +219,46 @@ func (bm *Manager) writePackIndexes(ndx []*blockmgrpb.Index, replacesBlockBefore
inverseTimePrefix := fmt.Sprintf("%016x", math.MaxInt64-time.Now().UnixNano())
return bm.writeUnpackedBlockNotLocked(data, packBlockPrefix+inverseTimePrefix, suffix, true)
return bm.writeUnpackedBlockNotLocked(data, indexBlockPrefix+inverseTimePrefix, suffix, true)
}
func (bm *Manager) finishAllOpenPacksLocked() error {
// finish non-pack groups first.
for _, g := range bm.openPackGroups {
if g.currentPackIndex != nil && g.currentPackIndex.PackGroup != packObjectsPackGroup {
if err := bm.finishPackLocked(g); err != nil {
return err
}
}
}
// finish pack groups at the very end.
for _, g := range bm.openPackGroups {
if g.currentPackIndex != nil && g.currentPackIndex.PackGroup == packObjectsPackGroup {
if err := bm.finishPackLocked(g); err != nil {
return err
}
if bm.currentPackIndex != nil {
if err := bm.finishPackLocked(); err != nil {
return err
}
}
return nil
}
func (bm *Manager) finishPackLocked(g *packInfo) error {
if g.currentPackIndex == nil {
return nil
}
if len(g.currentPackIndex.Items)+len(g.currentPackIndex.DeletedItems) > 0 {
if g.currentPackData != nil {
dataLength := len(g.currentPackData)
blockID, err := bm.writeUnpackedBlockNotLocked(g.currentPackData, "", "", true)
func (bm *Manager) finishPackLocked() error {
if len(bm.currentPackIndex.Items)+len(bm.currentPackIndex.DeletedItems) > 0 {
if bm.currentPackData != nil {
if bm.paddingUnit > 0 {
if missing := bm.paddingUnit - (len(bm.currentPackData) % bm.paddingUnit); missing > 0 {
postamble := make([]byte, missing)
if _, err := io.ReadFull(cryptorand.Reader, postamble); err != nil {
return fmt.Errorf("can't allocate random bytes for postamble: %v", err)
}
bm.currentPackData = append(bm.currentPackData, postamble...)
}
}
blockID, err := bm.writeUnpackedBlockNotLocked(bm.currentPackData, "", "", true)
if err != nil {
return fmt.Errorf("can't save pack data block %q: %v", blockID, err)
}
bm.registerUnpackedBlockLockedNoFlush(packObjectsPackGroup, blockID, int64(dataLength))
g.currentPackIndex.PackBlockId = blockID
bm.currentPackIndex.PackBlockId = blockID
bm.packBlockIDToIndex[bm.currentPackIndex.PackBlockId] = bm.currentPackIndex
bm.currentPackIndex.PackLength = uint64(len(bm.currentPackData))
}
bm.pendingPackIndexes = append(bm.pendingPackIndexes, g.currentPackIndex)
bm.pendingPackIndexes = append(bm.pendingPackIndexes, bm.currentPackIndex)
}
g.currentPackData = g.currentPackData[:0]
g.currentPackIndex = nil
bm.startPackIndexLocked()
return nil
}
@@ -460,8 +400,7 @@ func (bm *Manager) loadMergedPackIndexLocked() ([]*blockmgrpb.Index, []string, t
func (bm *Manager) ensurePackIndexesLoaded() error {
bm.assertLocked()
pi := bm.groupToBlockToIndex
if pi != nil {
if bm.indexLoaded {
return nil
}
@@ -470,46 +409,38 @@ func (bm *Manager) ensurePackIndexesLoaded() error {
return err
}
bm.groupToBlockToIndex = dedupeBlockIDsAndIndex(merged)
bm.indexLoaded = true
bm.blockIDToIndex, bm.packBlockIDToIndex = dedupeBlockIDsAndIndex(merged)
totalBlocks := len(bm.blockIDToIndex)
totalBlocks := 0
for _, v := range bm.groupToBlockToIndex {
totalBlocks += len(v)
}
log.Debug().Int("groups", len(bm.groupToBlockToIndex)).Int("blocks", totalBlocks).Msgf("loaded indexes")
log.Debug().Int("blocks", totalBlocks).Msgf("loaded indexes")
return nil
}
func dedupeBlockIDsAndIndex(ndx []*blockmgrpb.Index) map[string]map[string]*blockmgrpb.Index {
func dedupeBlockIDsAndIndex(ndx []*blockmgrpb.Index) (blockToIndex, packToIndex map[string]*blockmgrpb.Index) {
sort.Slice(ndx, func(i, j int) bool {
return ndx[i].CreateTimeNanos < ndx[j].CreateTimeNanos
})
pi := make(map[string]map[string]*blockmgrpb.Index)
blockToIndex = make(map[string]*blockmgrpb.Index)
packToIndex = make(map[string]*blockmgrpb.Index)
for _, pck := range ndx {
g := pi[pck.PackGroup]
if g == nil {
g = make(map[string]*blockmgrpb.Index)
pi[pck.PackGroup] = g
}
packToIndex[pck.PackBlockId] = pck
for blockID := range pck.Items {
if o := g[blockID]; o != nil {
if o := blockToIndex[blockID]; o != nil {
// this pack is same or newer.
delete(o.Items, blockID)
}
g[blockID] = pck
blockToIndex[blockID] = pck
}
for _, deletedBlockID := range pck.DeletedItems {
for _, m := range pi {
delete(m, deletedBlockID)
}
delete(blockToIndex, deletedBlockID)
}
}
return pi
return
}
func removeEmptyIndexes(ndx []*blockmgrpb.Index) []*blockmgrpb.Index {
@@ -523,64 +454,6 @@ func removeEmptyIndexes(ndx []*blockmgrpb.Index) []*blockmgrpb.Index {
return res
}
func (bm *Manager) regroupPacksAndUnpacked(ndx []*blockmgrpb.Index) []*blockmgrpb.Index {
var res []*blockmgrpb.Index
allPacks := &blockmgrpb.Index{
Items: map[string]uint64{},
PackGroup: packObjectsPackGroup,
CreateTimeNanos: bm.timeNow().UnixNano(),
}
allNonPacked := &blockmgrpb.Index{
Items: map[string]uint64{},
PackGroup: nonPackedObjectsPackGroup,
CreateTimeNanos: bm.timeNow().UnixNano(),
}
inUsePackBlocks := map[string]bool{}
// Iterate through all indexes, build merged index of all packs and all non-packed items.
for _, n := range ndx {
if n.PackGroup == packObjectsPackGroup {
for i, o := range n.Items {
allPacks.Items[i] = o
}
continue
}
if n.PackGroup == nonPackedObjectsPackGroup {
for i, o := range n.Items {
allNonPacked.Items[i] = o
}
continue
}
if n.PackBlockId != "" {
inUsePackBlocks[n.PackBlockId] = true
}
res = append(res, n)
}
// Now delete all pack blocks that are not in use.
for k := range allPacks.Items {
if !inUsePackBlocks[k] {
delete(allPacks.Items, k)
}
}
if len(allPacks.Items) > 0 {
res = append(res, allPacks)
}
if len(allNonPacked.Items) > 0 {
res = append(res, allNonPacked)
}
return res
}
// CompactIndexes performs compaction of index blocks.
func (bm *Manager) CompactIndexes() error {
bm.lock()
@@ -599,7 +472,7 @@ func (bm *Manager) CompactIndexes() error {
}
// ListBlocks returns the metadata about blocks with a given prefix and kind.
func (bm *Manager) ListBlocks(prefix string, kind string) ([]Info, error) {
func (bm *Manager) ListBlocks(prefix string) ([]Info, error) {
bm.lock()
defer bm.unlock()
@@ -609,57 +482,12 @@ func (bm *Manager) ListBlocks(prefix string, kind string) ([]Info, error) {
return nil, fmt.Errorf("can't load pack index: %v", err)
}
packBlockIDs := map[string]bool{}
for _, blockToIndex := range bm.groupToBlockToIndex {
for _, b := range blockToIndex {
packBlockIDs[b.PackBlockId] = true
}
}
var blockMatches func(Info, *blockmgrpb.Index) bool
switch kind {
case "all":
blockMatches = func(Info, *blockmgrpb.Index) bool { return true }
case "logical": // blocks that are not pack blocks
blockMatches = func(b Info, _ *blockmgrpb.Index) bool {
return !packBlockIDs[b.BlockID]
for b, ndx := range bm.blockIDToIndex {
if !strings.HasPrefix(b, prefix) {
continue
}
case "packs": // blocks that are pack blocks
blockMatches = func(b Info, _ *blockmgrpb.Index) bool {
return packBlockIDs[b.BlockID]
}
case "packed": // blocks that are packed
blockMatches = func(b Info, ndx *blockmgrpb.Index) bool {
return ndx.PackBlockId != ""
}
case "nonpacked": // blocks that are not packed
blockMatches = func(b Info, ndx *blockmgrpb.Index) bool {
return ndx.PackBlockId == ""
}
default:
blockMatches = func(Info, *blockmgrpb.Index) bool { return false }
}
for _, blockToIndex := range bm.groupToBlockToIndex {
for b, ndx := range blockToIndex {
if !strings.HasPrefix(b, prefix) {
continue
}
nfo := newInfo(b, ndx)
if !blockMatches(nfo, ndx) {
continue
}
result = append(result, nfo)
}
result = append(result, newInfo(b, ndx))
}
return result, nil
@@ -670,35 +498,15 @@ func newInfo(blockID string, ndx *blockmgrpb.Index) Info {
return Info{
BlockID: blockID,
Length: int64(size),
Timestamp: time.Unix(0, ndx.CreateTimeNanos),
PackGroup: ndx.PackGroup,
Timestamp: time.Unix(0, int64(ndx.CreateTimeNanos)),
PackBlockID: ndx.PackBlockId,
PackOffset: int64(offset),
}
}
// ListGroupBlocks returns the list of blocks in the specified group (in random order).
func (bm *Manager) ListGroupBlocks(groupID string) ([]Info, error) {
bm.lock()
defer bm.unlock()
var result []Info
if err := bm.ensurePackIndexesLoaded(); err != nil {
return nil, fmt.Errorf("can't load pack index: %v", err)
}
for blockID, ndx := range bm.groupToBlockToIndex[groupID] {
result = append(result, newInfo(blockID, ndx))
}
return result, nil
}
func (bm *Manager) compactIndexes(merged []*blockmgrpb.Index, blockIDs []string, latestBlockTime time.Time) error {
dedupeBlockIDsAndIndex(merged)
merged = removeEmptyIndexes(merged)
merged = bm.regroupPacksAndUnpacked(merged)
if len(blockIDs) <= 1 {
log.Printf("skipping index compaction - already compacted")
return nil
@@ -742,37 +550,20 @@ func (bm *Manager) Flush() error {
// WriteBlock saves a given block of data to a pack group with a provided name and returns a blockID
// that's based on the contents of data written.
func (bm *Manager) WriteBlock(groupID string, data []byte) (string, error) {
if bm.maxPackedContentLength > 0 && len(data) <= bm.maxPackedContentLength {
blockID := bm.hashData(data)
func (bm *Manager) WriteBlock(data []byte, prefix string) (string, error) {
blockID := prefix + bm.hashData(data)
bm.lock()
defer bm.unlock()
err := bm.addToPackLocked(groupID, blockID, data, false)
return blockID, err
}
blockID, err := bm.writeUnpackedBlockNotLocked(data, "", "", false)
if err != nil {
return "", err
}
bm.registerUnpackedBlock(nonPackedObjectsPackGroup, blockID, int64(len(data)))
if groupID != "" {
bm.registerUnpackedBlock(groupID, blockID, int64(len(data)))
}
return blockID, nil
}
// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size.
func (bm *Manager) Repackage(groupID string, maxLength int64) error {
bm.lock()
defer bm.unlock()
if groupID == "" || groupID == nonPackedObjectsPackGroup || groupID == packObjectsPackGroup {
return fmt.Errorf("invalid group ID: %q", groupID)
}
err := bm.addToPackLocked(blockID, data, false)
return blockID, err
}
// Repackage reorganizes all pack blocks belonging to a given group that are not bigger than given size.
func (bm *Manager) Repackage(maxLength uint64) error {
bm.lock()
defer bm.unlock()
if err := bm.ensurePackIndexesLoaded(); err != nil {
return err
@@ -784,23 +575,22 @@ func (bm *Manager) Repackage(groupID string, maxLength int64) error {
}
var toRepackage []*blockmgrpb.Index
var totalBytes int64
var totalBytes uint64
for _, m := range merged {
if m.PackGroup == groupID && m.PackBlockId != "" {
bi, err := bm.blockInfoLocked(m.PackBlockId)
if err != nil {
return fmt.Errorf("unable to get info on block %q: %v", m.PackBlockId, err)
}
bi, ok := bm.packBlockIDToIndex[m.PackBlockId]
if !ok {
return fmt.Errorf("unable to get info on pack block %q", m.PackBlockId)
}
if bi.Length <= maxLength {
toRepackage = append(toRepackage, m)
totalBytes += bi.Length
}
if bi.PackLength <= maxLength {
toRepackage = append(toRepackage, m)
totalBytes += bi.PackLength
}
}
log.Printf("%v blocks to re-package (%v total bytes)", len(toRepackage), totalBytes)
done := map[string]bool{}
for _, m := range toRepackage {
data, err := bm.getBlockInternalLocked(m.PackBlockId)
@@ -809,11 +599,15 @@ func (bm *Manager) Repackage(groupID string, maxLength int64) error {
}
for blockID, os := range m.Items {
if done[blockID] {
continue
}
done[blockID] = true
log.Printf("re-packaging: %v %v", blockID, os)
offset, size := unpackOffsetAndSize(os)
blockData := data[offset : offset+size]
if err := bm.addToPackLocked(groupID, blockID, blockData, true); err != nil {
if err := bm.addToPackLocked(blockID, blockData, true); err != nil {
return fmt.Errorf("unable to re-package %q: %v", blockID, err)
}
}
@@ -868,18 +662,15 @@ func (bm *Manager) hashData(data []byte) string {
func (bm *Manager) getPendingBlockLocked(blockID string) ([]byte, error) {
bm.assertLocked()
for _, p := range bm.openPackGroups {
if ndx := p.currentPackIndex; ndx != nil {
if p.currentPackData == nil {
continue
}
if ndx := bm.currentPackIndex; ndx != nil {
if bm.currentPackData != nil {
if blk, ok := ndx.Items[blockID]; ok {
offset, size := unpackOffsetAndSize(blk)
return p.currentPackData[offset : offset+size], nil
return bm.currentPackData[offset : offset+size], nil
}
}
}
return nil, storage.ErrBlockNotFound
}
@@ -914,26 +705,30 @@ func (bm *Manager) BlockInfo(blockID string) (Info, error) {
func (bm *Manager) findIndexForBlockLocked(blockID string) *blockmgrpb.Index {
bm.assertLocked()
if ndx := bm.groupToBlockToIndex[""][blockID]; ndx != nil {
if ndx := bm.blockIDToIndex[blockID]; ndx != nil {
return ndx
}
for _, v := range bm.groupToBlockToIndex {
if ndx := v[blockID]; ndx != nil {
return ndx
}
}
return nil
}
func (bm *Manager) blockInfoLocked(blockID string) (Info, error) {
if strings.HasPrefix(blockID, packBlockPrefix) {
if strings.HasPrefix(blockID, indexBlockPrefix) {
return Info{}, nil
}
bm.assertLocked()
if ndx, ok := bm.packBlockIDToIndex[blockID]; ok {
// pack block
return Info{
BlockID: blockID,
Timestamp: time.Unix(0, int64(ndx.CreateTimeNanos)),
PackOffset: 0,
Length: int64(ndx.PackLength),
}, nil
}
ndx := bm.findIndexForBlockLocked(blockID)
if ndx == nil {
return Info{}, storage.ErrBlockNotFound
@@ -943,8 +738,7 @@ func (bm *Manager) blockInfoLocked(blockID string) (Info, error) {
return Info{
BlockID: blockID,
PackGroup: ndx.PackGroup,
Timestamp: time.Unix(0, ndx.CreateTimeNanos),
Timestamp: time.Unix(0, int64(ndx.CreateTimeNanos)),
PackBlockID: ndx.PackBlockId,
PackOffset: int64(offset),
Length: int64(size),
@@ -1033,7 +827,7 @@ func listIndexBlocksFromStorage(st storage.Storage, full bool) ([]Info, error) {
maxCompactions = math.MaxInt32
}
ch, cancel := st.ListBlocks(packBlockPrefix)
ch, cancel := st.ListBlocks(indexBlockPrefix)
defer cancel()
var results []Info
@@ -1068,6 +862,10 @@ func listIndexBlocksFromStorage(st storage.Storage, full bool) ([]Info, error) {
// NewManager creates new block manager with given packing options and a formatter.
func NewManager(st storage.Storage, f FormattingOptions, caching CachingOptions) (*Manager, error) {
return newManagerWithTime(st, f, caching, time.Now)
}
func newManagerWithTime(st storage.Storage, f FormattingOptions, caching CachingOptions, timeNow func() time.Time) (*Manager, error) {
sf := FormatterFactories[f.BlockFormat]
if sf == nil {
return nil, fmt.Errorf("unsupported block format: %v", f.BlockFormat)
@@ -1078,20 +876,27 @@ func NewManager(st storage.Storage, f FormattingOptions, caching CachingOptions)
return nil, err
}
return &Manager{
m := &Manager{
Format: f,
openPackGroups: make(map[string]*packInfo),
timeNow: time.Now,
flushPackIndexesAfter: time.Now().Add(flushPackIndexTimeout),
timeNow: timeNow,
flushPackIndexesAfter: timeNow().Add(flushPackIndexTimeout),
pendingPackIndexes: nil,
maxPackedContentLength: f.MaxPackedContentLength,
maxPackSize: f.MaxPackSize,
formatter: formatter,
blockIDToIndex: make(map[string]*blockmgrpb.Index),
packBlockIDToIndex: make(map[string]*blockmgrpb.Index),
minPreambleLength: defaultMinPreambleLength,
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
cache: newBlockCache(st, caching),
}, nil
}
}
m.startPackIndexLocked()
return m, nil
}
func getCompactedTimestamp(blk string) (time.Time, bool) {
if p := strings.Index(blk, compactedBlockSuffix); p >= 0 {
unixNano, err := strconv.ParseInt(blk[p+len(compactedBlockSuffix):], 16, 64)

View File

@@ -5,14 +5,17 @@
"encoding/hex"
"fmt"
"math/rand"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/kopia/kopia/internal/blockmgrpb"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/kopia/kopia/internal/storagetesting"
@@ -26,10 +29,15 @@
var fakeTime = time.Date(2017, 1, 1, 0, 0, 0, 0, time.UTC)
func init() {
//zerolog.SetGlobalLevel(zerolog.InfoLevel)
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}
func TestBlockManagerEmptyFlush(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
bm.Flush()
if got, want := len(data), 0; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
@@ -39,8 +47,8 @@ func TestBlockManagerEmptyFlush(t *testing.T) {
func TestBlockZeroBytes1(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
writeBlockAndVerify(t, bm, "", []byte{})
bm := newTestBlockManager(data, keyTime, nil)
writeBlockAndVerify(t, bm, []byte{})
bm.Flush()
if got, want := len(data), 2; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
@@ -51,9 +59,9 @@ func TestBlockZeroBytes1(t *testing.T) {
func TestBlockZeroBytes2(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
writeBlockAndVerify(t, bm, "", seededRandomData(10, 10))
writeBlockAndVerify(t, bm, "", []byte{})
bm := newTestBlockManager(data, keyTime, nil)
writeBlockAndVerify(t, bm, seededRandomData(10, 10))
writeBlockAndVerify(t, bm, []byte{})
bm.Flush()
dumpBlockManagerData(data)
if got, want := len(data), 2; got != want {
@@ -65,10 +73,10 @@ func TestBlockZeroBytes2(t *testing.T) {
func TestBlockManagerSmallBlockWrites(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
for i := 0; i < 100; i++ {
writeBlockAndVerify(t, bm, "", seededRandomData(i, 10))
writeBlockAndVerify(t, bm, seededRandomData(i, 10))
}
if got, want := len(data), 0; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
@@ -79,37 +87,13 @@ func TestBlockManagerSmallBlockWrites(t *testing.T) {
}
}
func TestBlockManagerUnpackedBlockWrites(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
for i := 0; i < 100; i++ {
writeBlockAndVerify(t, bm, "", seededRandomData(i, 1001))
}
log.Printf("writing again")
// make sure deduping works.
for i := 0; i < 100; i++ {
writeBlockAndVerify(t, bm, "", seededRandomData(i, 1001))
}
t.Logf("finished writing again")
if got, want := len(data), 100; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
}
bm.Flush()
if got, want := len(data), 101; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
}
}
func TestBlockManagerDedupesPendingBlocks(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
for i := 0; i < 100; i++ {
writeBlockAndVerify(t, bm, "", seededRandomData(0, maxPackedContentLength-1))
writeBlockAndVerify(t, bm, seededRandomData(0, maxPackedContentLength-1))
}
if got, want := len(data), 0; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
@@ -123,23 +107,26 @@ func TestBlockManagerDedupesPendingBlocks(t *testing.T) {
func TestBlockManagerDedupesPendingAndUncommittedBlocks(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
writeBlockAndVerify(t, bm, "", seededRandomData(0, 999))
writeBlockAndVerify(t, bm, "", seededRandomData(1, 999))
writeBlockAndVerify(t, bm, "", seededRandomData(2, 10))
if got, want := len(data), 1; got != want {
// no writes here, all data fits in a single pack.
writeBlockAndVerify(t, bm, seededRandomData(0, 950))
writeBlockAndVerify(t, bm, seededRandomData(1, 950))
writeBlockAndVerify(t, bm, seededRandomData(2, 10))
if got, want := len(data), 0; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
}
// no writes
writeBlockAndVerify(t, bm, "", seededRandomData(0, 999))
writeBlockAndVerify(t, bm, "", seededRandomData(1, 999))
writeBlockAndVerify(t, bm, "", seededRandomData(2, 10))
if got, want := len(data), 1; got != want {
// no writes here
writeBlockAndVerify(t, bm, seededRandomData(0, 950))
writeBlockAndVerify(t, bm, seededRandomData(1, 950))
writeBlockAndVerify(t, bm, seededRandomData(2, 10))
if got, want := len(data), 0; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
}
bm.Flush()
// this flushes the pack block + index block
if got, want := len(data), 2; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
}
@@ -149,7 +136,7 @@ func TestBlockManagerDedupesPendingAndUncommittedBlocks(t *testing.T) {
func TestBlockManagerEmpty(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
noSuchBlockID := md5hash([]byte("foo"))
@@ -168,90 +155,25 @@ func TestBlockManagerEmpty(t *testing.T) {
}
}
func TestBlockManagerPackIdentialToRawObject(t *testing.T) {
data0 := []byte{}
data1 := seededRandomData(1, 600)
data2 := seededRandomData(2, 600)
data3 := append(append([]byte(nil), data1...), data2...)
b0 := md5hash(data0)
b1 := md5hash(data1)
b2 := md5hash(data2)
b3 := md5hash(data3)
t.Logf("data0 hash: %v", b0)
t.Logf("data1 hash: %v", b1)
t.Logf("data2 hash: %v", b2)
t.Logf("data3 hash: %v", b3)
cases := []struct {
ordering [][]byte
expectedBlockCount int
}{
{ordering: [][]byte{data1, data2, data3, data0}, expectedBlockCount: 2},
{ordering: [][]byte{data0, data1, data2, data3}, expectedBlockCount: 2},
{ordering: [][]byte{data1, data0, data2, data3}, expectedBlockCount: 2},
{ordering: [][]byte{data0, data1, data0, data2, data3}, expectedBlockCount: 2},
{ordering: [][]byte{data0, data1, data0, data2, data3, data0}, expectedBlockCount: 2},
{ordering: [][]byte{data1, data0, data2, nil, data0, data3}, expectedBlockCount: 3},
{ordering: [][]byte{data1, data2, nil, data0, data3}, expectedBlockCount: 4},
{ordering: [][]byte{data3, nil, data1, data2, data0}, expectedBlockCount: 3},
{ordering: [][]byte{data3, data1, data2, data0}, expectedBlockCount: 2},
{ordering: [][]byte{data3, data0, data1, data2}, expectedBlockCount: 2},
{ordering: [][]byte{data3, data1, data0, data2}, expectedBlockCount: 2},
{ordering: [][]byte{data3, data0, data1, data0, data2, data0}, expectedBlockCount: 2},
}
for i, tc := range cases {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
t.Run(fmt.Sprintf("case-%v", i), func(t *testing.T) {
for _, b := range tc.ordering {
if b == nil {
bm.Flush()
continue
}
t.Logf("writing %v", md5hash(b))
writeBlockAndVerify(t, bm, "some-group", b)
}
verifyBlock(t, bm, b0, data0)
verifyBlock(t, bm, b1, data1)
verifyBlock(t, bm, b2, data2)
verifyBlock(t, bm, b3, data3)
bm.Flush()
dumpBlockManagerData(data)
verifyBlock(t, bm, b0, data0)
verifyBlock(t, bm, b1, data1)
verifyBlock(t, bm, b2, data2)
verifyBlock(t, bm, b3, data3)
bm.Flush()
// 2 data blocks written.
if got, want := len(data), tc.expectedBlockCount; got != want {
t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
}
})
}
}
func TestBlockManagerRepack(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
// disable preamble, postamble and padding, so that each pack block is identical to its contents
bm.maxPreambleLength = 0
bm.minPreambleLength = 0
bm.paddingUnit = 0
d1 := seededRandomData(1, 10)
d2 := seededRandomData(2, 20)
d3 := seededRandomData(3, 30)
writeBlockAndVerify(t, bm, "g1", d1)
writeBlockAndVerify(t, bm, d1)
bm.Flush()
writeBlockAndVerify(t, bm, "g1", d2)
writeBlockAndVerify(t, bm, d2)
bm.Flush()
writeBlockAndVerify(t, bm, "g1", d3)
writeBlockAndVerify(t, bm, d3)
bm.Flush()
// 3 data blocks, 3 index blocks.
@@ -259,7 +181,10 @@ func TestBlockManagerRepack(t *testing.T) {
t.Errorf("unexpected block count: %v, wanted %v", got, want)
}
if err := bm.Repackage("g1", 5); err != nil {
log.Printf("before repackage")
dumpBlockManagerData(data)
if err := bm.Repackage(5); err != nil {
t.Errorf("repackage failure: %v", err)
}
bm.Flush()
@@ -269,12 +194,13 @@ func TestBlockManagerRepack(t *testing.T) {
t.Errorf("unexpected block count: %v, wanted %v", got, want)
}
setFakeTime(bm, fakeTime.Add(1*time.Second))
bm.timeNow = fakeTimeNowFrozen(fakeTime.Add(1 * time.Second))
if err := bm.Repackage("g1", 30); err != nil {
if err := bm.Repackage(30); err != nil {
t.Errorf("repackage failure: %v", err)
}
bm.Flush()
log.Printf("after repackage")
dumpBlockManagerData(data)
@@ -310,12 +236,12 @@ func verifyActiveIndexBlockCount(t *testing.T, bm *Manager, expected int) {
func TestBlockManagerInternalFlush(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
for i := 0; i < 100; i++ {
b := make([]byte, 25)
rand.Read(b)
writeBlockAndVerify(t, bm, "", b)
writeBlockAndVerify(t, bm, b)
}
// 1 data block written, but no index yet.
@@ -327,7 +253,7 @@ func TestBlockManagerInternalFlush(t *testing.T) {
for i := 0; i < 100; i++ {
b := make([]byte, 25)
rand.Read(b)
writeBlockAndVerify(t, bm, "", b)
writeBlockAndVerify(t, bm, b)
}
// 2 data blocks written, but no index yet.
@@ -348,7 +274,7 @@ func TestBlockManagerInternalFlush(t *testing.T) {
func TestBlockManagerWriteMultiple(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
bm := newTestBlockManager(data, keyTime, nil)
var blockIDs []string
@@ -356,7 +282,7 @@ func TestBlockManagerWriteMultiple(t *testing.T) {
//t.Logf("i=%v", i)
b := seededRandomData(i, i%113)
//t.Logf("writing block #%v with %x", i, b)
blkID, err := bm.WriteBlock("", b)
blkID, err := bm.WriteBlock(b, "")
//t.Logf("wrote %v=%v", i, blkID)
if err != nil {
t.Errorf("err: %v", err)
@@ -375,7 +301,7 @@ func TestBlockManagerWriteMultiple(t *testing.T) {
bm.Flush()
t.Logf("data block count: %v", len(data))
//dumpBlockManagerData(data)
bm = newTestBlockManager(data, keyTime)
bm = newTestBlockManager(data, keyTime, nil)
}
}
@@ -388,84 +314,16 @@ func TestBlockManagerWriteMultiple(t *testing.T) {
}
}
func TestBlockManagerListGroups(t *testing.T) {
blockSizes := []int{10, 1500}
for _, blockSize := range blockSizes {
blockSize := blockSize
t.Run(fmt.Sprintf("block-size-%v", blockSize), func(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
data1 := seededRandomData(1, blockSize)
data2 := seededRandomData(2, blockSize)
data3 := seededRandomData(3, blockSize)
writeBlockAndVerify(t, bm, "group1", data1)
writeBlockAndVerify(t, bm, "group1", data2)
writeBlockAndVerify(t, bm, "group1", data3)
writeBlockAndVerify(t, bm, "group2", data1)
writeBlockAndVerify(t, bm, "group2", data3)
writeBlockAndVerify(t, bm, "group3", data1)
writeBlockAndVerify(t, bm, "group3", data2)
writeBlockAndVerify(t, bm, "group4", data2)
writeBlockAndVerify(t, bm, "group4", data3)
verifyGroupListContains(t, bm, "group1", md5hash(data1), md5hash(data2), md5hash(data3))
verifyGroupListContains(t, bm, "group2", md5hash(data1), md5hash(data3))
verifyGroupListContains(t, bm, "group3", md5hash(data1), md5hash(data2))
verifyGroupListContains(t, bm, "group4", md5hash(data2), md5hash(data3))
bm.Flush()
data1b := seededRandomData(11, blockSize)
data2b := seededRandomData(12, blockSize)
data3b := seededRandomData(13, blockSize)
bm = newTestBlockManager(data, keyTime)
writeBlockAndVerify(t, bm, "group1", data1b)
writeBlockAndVerify(t, bm, "group1", data2b)
writeBlockAndVerify(t, bm, "group1", data3b)
writeBlockAndVerify(t, bm, "group2", data1b)
writeBlockAndVerify(t, bm, "group2", data3b)
writeBlockAndVerify(t, bm, "group3", data1b)
writeBlockAndVerify(t, bm, "group3", data2b)
writeBlockAndVerify(t, bm, "group4", data2b)
writeBlockAndVerify(t, bm, "group4", data3b)
verifyGroupListContains(t, bm, "group1", md5hash(data1), md5hash(data2), md5hash(data3), md5hash(data1b), md5hash(data2b), md5hash(data3b))
verifyGroupListContains(t, bm, "group2", md5hash(data1), md5hash(data3), md5hash(data1b), md5hash(data3b))
verifyGroupListContains(t, bm, "group3", md5hash(data1), md5hash(data2), md5hash(data1b), md5hash(data2b))
verifyGroupListContains(t, bm, "group4", md5hash(data2), md5hash(data3), md5hash(data2b), md5hash(data3b))
bm.Flush()
bm = newTestBlockManager(data, keyTime)
verifyGroupListContains(t, bm, "group1", md5hash(data1), md5hash(data2), md5hash(data3), md5hash(data1b), md5hash(data2b), md5hash(data3b))
verifyGroupListContains(t, bm, "group2", md5hash(data1), md5hash(data3), md5hash(data1b), md5hash(data3b))
verifyGroupListContains(t, bm, "group3", md5hash(data1), md5hash(data2), md5hash(data1b), md5hash(data2b))
verifyGroupListContains(t, bm, "group4", md5hash(data2), md5hash(data3), md5hash(data2b), md5hash(data3b))
dumpBlockManagerData(data)
})
}
}
func TestBlockManagerConcurrency(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
preexistingBlock := writeBlockAndVerify(t, bm, "", seededRandomData(10, 100))
bm := newTestBlockManager(data, keyTime, nil)
preexistingBlock := writeBlockAndVerify(t, bm, seededRandomData(10, 100))
bm.Flush()
bm1 := newTestBlockManager(data, keyTime)
bm2 := newTestBlockManager(data, keyTime)
bm3 := newTestBlockManager(data, keyTime)
setFakeTime(bm3, fakeTime.Add(1))
bm1 := newTestBlockManager(data, keyTime, nil)
bm2 := newTestBlockManager(data, keyTime, nil)
bm3 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1))
// all bm* can see pre-existing block
verifyBlock(t, bm1, preexistingBlock, seededRandomData(10, 100))
@@ -473,14 +331,14 @@ func TestBlockManagerConcurrency(t *testing.T) {
verifyBlock(t, bm3, preexistingBlock, seededRandomData(10, 100))
// write the same block in all managers.
sharedBlock := writeBlockAndVerify(t, bm1, "", seededRandomData(20, 100))
writeBlockAndVerify(t, bm2, "", seededRandomData(20, 100))
writeBlockAndVerify(t, bm3, "", seededRandomData(20, 100))
sharedBlock := writeBlockAndVerify(t, bm1, seededRandomData(20, 100))
writeBlockAndVerify(t, bm2, seededRandomData(20, 100))
writeBlockAndVerify(t, bm3, seededRandomData(20, 100))
// write unique block per manager.
bm1block := writeBlockAndVerify(t, bm1, "", seededRandomData(31, 100))
bm2block := writeBlockAndVerify(t, bm2, "", seededRandomData(32, 100))
bm3block := writeBlockAndVerify(t, bm3, "", seededRandomData(33, 100))
bm1block := writeBlockAndVerify(t, bm1, seededRandomData(31, 100))
bm2block := writeBlockAndVerify(t, bm2, seededRandomData(32, 100))
bm3block := writeBlockAndVerify(t, bm3, seededRandomData(33, 100))
// make sure they can't see each other's unflushed blocks.
verifyBlockNotFound(t, bm1, bm2block)
@@ -502,7 +360,7 @@ func TestBlockManagerConcurrency(t *testing.T) {
verifyBlockNotFound(t, bm3, bm2block)
// new block manager at this point can see all data.
bm4 := newTestBlockManager(data, keyTime)
bm4 := newTestBlockManager(data, keyTime, nil)
verifyBlock(t, bm4, preexistingBlock, seededRandomData(10, 100))
verifyBlock(t, bm4, sharedBlock, seededRandomData(20, 100))
verifyBlock(t, bm4, bm1block, seededRandomData(31, 100))
@@ -521,7 +379,7 @@ func TestBlockManagerConcurrency(t *testing.T) {
}
// new block manager at this point can see all data.
bm5 := newTestBlockManager(data, keyTime)
bm5 := newTestBlockManager(data, keyTime, nil)
verifyBlock(t, bm5, preexistingBlock, seededRandomData(10, 100))
verifyBlock(t, bm5, sharedBlock, seededRandomData(20, 100))
verifyBlock(t, bm5, bm1block, seededRandomData(31, 100))
@@ -535,11 +393,10 @@ func TestBlockManagerConcurrency(t *testing.T) {
func TestDeleteBlock(t *testing.T) {
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
setFakeTimeWithAutoAdvance(bm, fakeTime, 1)
block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100))
bm := newTestBlockManager(data, keyTime, nil)
block1 := writeBlockAndVerify(t, bm, seededRandomData(10, 100))
bm.Flush()
block2 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(11, 100))
block2 := writeBlockAndVerify(t, bm, seededRandomData(11, 100))
if err := bm.DeleteBlock(block1); err != nil {
t.Errorf("unable to delete block: %v", block1)
}
@@ -549,7 +406,7 @@ func TestDeleteBlock(t *testing.T) {
verifyBlockNotFound(t, bm, block1)
verifyBlockNotFound(t, bm, block2)
bm.Flush()
bm = newTestBlockManager(data, keyTime)
bm = newTestBlockManager(data, keyTime, nil)
dumpBlockManagerData(data)
verifyBlockNotFound(t, bm, block1)
verifyBlockNotFound(t, bm, block2)
@@ -573,37 +430,35 @@ func TestDeleteAndRecreate(t *testing.T) {
// write a block
data := map[string][]byte{}
keyTime := map[string]time.Time{}
bm := newTestBlockManager(data, keyTime)
setFakeTimeWithAutoAdvance(bm, fakeTime, 1)
block1 := writeBlockAndVerify(t, bm, "some-group", seededRandomData(10, 100))
bm := newTestBlockManager(data, keyTime, fakeTimeNowFrozen(fakeTime))
block1 := writeBlockAndVerify(t, bm, seededRandomData(10, 100))
bm.Flush()
// delete but at given timestamp but don't commit yet.
bm0 := newTestBlockManager(data, keyTime)
setFakeTimeWithAutoAdvance(bm0, tc.deletionTime, 1)
bm0 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1))
bm0.DeleteBlock(block1)
// delete it at t0+10
bm1 := newTestBlockManager(data, keyTime)
setFakeTimeWithAutoAdvance(bm1, fakeTime.Add(10), 1)
bm1 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10), 1))
verifyBlock(t, bm1, block1, seededRandomData(10, 100))
bm1.DeleteBlock(block1)
bm1.Flush()
// recreate at t0+20
bm2 := newTestBlockManager(data, keyTime)
setFakeTimeWithAutoAdvance(bm2, fakeTime.Add(20), 1)
block2 := writeBlockAndVerify(t, bm2, "some-group", seededRandomData(10, 100))
bm2 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20), 1))
block2 := writeBlockAndVerify(t, bm2, seededRandomData(10, 100))
bm2.Flush()
// commit deletion from bm0 (t0+5)
bm0.Flush()
dumpBlockManagerData(data)
if block1 != block2 {
t.Errorf("got invalid block %v, expected %v", block2, block1)
}
bm3 := newTestBlockManager(data, keyTime)
bm3 := newTestBlockManager(data, keyTime, nil)
if tc.isVisible {
verifyBlock(t, bm3, block1, seededRandomData(10, 100))
} else {
@@ -613,19 +468,20 @@ func TestDeleteAndRecreate(t *testing.T) {
}
}
func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time) *Manager {
func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, timeFunc func() time.Time) *Manager {
st := storagetesting.NewMapStorage(data, keyTime)
//st = logging.NewWrapper(st)
bm, err := NewManager(st, FormattingOptions{
if timeFunc == nil {
timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1)
}
bm, err := newManagerWithTime(st, FormattingOptions{
BlockFormat: "TESTONLY_MD5",
MaxPackedContentLength: maxPackedContentLength,
MaxPackSize: maxPackSize,
}, CachingOptions{})
}, CachingOptions{}, timeFunc)
if err != nil {
panic("can't create block manager: " + err.Error())
}
setFakeTime(bm, fakeTime)
return bm
}
@@ -633,7 +489,7 @@ func getIndexCount(d map[string][]byte) int {
var cnt int
for k := range d {
if strings.HasPrefix(k, packBlockPrefix) {
if strings.HasPrefix(k, indexBlockPrefix) {
cnt++
}
}
@@ -641,12 +497,15 @@ func getIndexCount(d map[string][]byte) int {
return cnt
}
func setFakeTime(bm *Manager, t time.Time) {
bm.timeNow = func() time.Time { return t }
func fakeTimeNowFrozen(t time.Time) func() time.Time {
return fakeTimeNowWithAutoAdvance(t, 0)
}
func setFakeTimeWithAutoAdvance(bm *Manager, t time.Time, dt time.Duration) {
bm.timeNow = func() time.Time {
func fakeTimeNowWithAutoAdvance(t time.Time, dt time.Duration) func() time.Time {
var mu sync.Mutex
return func() time.Time {
mu.Lock()
defer mu.Unlock()
ret := t
t = t.Add(dt)
return ret
@@ -685,10 +544,10 @@ func verifyBlock(t *testing.T, bm *Manager, blockID string, b []byte) {
}
}
func writeBlockAndVerify(t *testing.T, bm *Manager, packGroup string, b []byte) string {
func writeBlockAndVerify(t *testing.T, bm *Manager, b []byte) string {
t.Helper()
blockID, err := bm.WriteBlock(packGroup, b)
blockID, err := bm.WriteBlock(b, "")
if err != nil {
t.Errorf("err: %v", err)
}
@@ -716,33 +575,22 @@ func md5hash(b []byte) string {
func dumpBlockManagerData(data map[string][]byte) {
for k, v := range data {
if k[0] == 'P' {
if k[0] == 'I' {
var payload blockmgrpb.Indexes
proto.Unmarshal(v, &payload)
log.Printf("data[%v] = %v", k, proto.MarshalTextString(&payload))
fmt.Printf("index %v:\n", k)
for _, ndx := range payload.Indexes {
fmt.Printf(" pack %v len: %v created %v\n", ndx.PackBlockId, ndx.PackLength, time.Unix(0, int64(ndx.CreateTimeNanos)).Local())
for blk, os := range ndx.Items {
off, size := unpackOffsetAndSize(os)
fmt.Printf(" block[%v]={offset:%v size:%v}\n", blk, off, size)
}
for _, del := range ndx.DeletedItems {
fmt.Printf(" deleted %v\n", del)
}
}
} else {
log.Printf("data[%v] = %v bytes", k, len(v))
fmt.Printf("data %v (%v bytes)\n", k, len(v))
}
}
}
func verifyGroupListContains(t *testing.T, bm *Manager, groupID string, expected ...string) {
got := map[string]bool{}
want := map[string]bool{}
blks, err := bm.ListGroupBlocks(groupID)
if err != nil {
t.Errorf("error listing blocks: %v", err)
return
}
for _, a := range blks {
got[a.BlockID] = true
}
for _, e := range expected {
want[e] = true
}
if !reflect.DeepEqual(got, want) {
t.Errorf("unexpected contents of group %q: %v, wanted %v", groupID, got, want)
}
}

View File

@@ -158,7 +158,7 @@ func (c *diskBlockCache) readBlocksFromCacheFile(f *os.File) ([]Info, error) {
func (c *diskBlockCache) readBlocksFromSource(maxCompactions int) ([]Info, error) {
var blocks []Info
ch, cancel := c.st.ListBlocks(packBlockPrefix)
ch, cancel := c.st.ListBlocks(indexBlockPrefix)
defer cancel()
numCompactions := 0

View File

@@ -1,21 +1,15 @@
package block
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/kopia/kopia/internal/blockmgrpb"
)
type packIndexes []*packIndex
type offsetAndSize struct {
offset uint32
size uint32
@@ -52,50 +46,7 @@ func (o *offsetAndSize) UnmarshalJSON(b []byte) error {
return nil
}
type packIndex struct {
PackBlockID string `json:"packBlock,omitempty"`
PackGroup string `json:"packGroup,omitempty"`
CreateTime time.Time `json:"createTime"`
Items map[string]offsetAndSize `json:"items"`
DeletedItems []string `json:"deletedItems,omitempty"`
}
func loadPackIndexesLegacy(r io.Reader) ([]*blockmgrpb.Index, error) {
var pi packIndexes
if err := json.NewDecoder(r).Decode(&pi); err != nil {
return nil, err
}
var result []*blockmgrpb.Index
for _, v := range pi {
result = append(result, convertLegacyIndex(v))
}
return result, nil
}
func convertLegacyIndex(pi *packIndex) *blockmgrpb.Index {
res := &blockmgrpb.Index{
CreateTimeNanos: pi.CreateTime.UnixNano(),
DeletedItems: pi.DeletedItems,
PackBlockId: pi.PackBlockID,
PackGroup: pi.PackGroup,
}
if len(pi.Items) > 0 {
res.Items = make(map[string]uint64)
for k, v := range pi.Items {
res.Items[k] = packOffsetAndSize(v.offset, v.size)
}
}
return res
}
func loadPackIndexesNew(data []byte) ([]*blockmgrpb.Index, error) {
func loadPackIndexes(data []byte) ([]*blockmgrpb.Index, error) {
var b blockmgrpb.Indexes
if err := proto.Unmarshal(data, &b); err != nil {
@@ -104,12 +55,3 @@ func loadPackIndexesNew(data []byte) ([]*blockmgrpb.Index, error) {
return b.Indexes, nil
}
func loadPackIndexes(data []byte) ([]*blockmgrpb.Index, error) {
gz, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return loadPackIndexesNew(data)
}
return loadPackIndexesLegacy(gz)
}

View File

@@ -9,7 +9,6 @@
snapshotCommands = app.Command("snapshot", "Commands to manipulate snapshots.").Alias("snap")
policyCommands = app.Command("policy", "Commands to manipulate snapshotting policies.").Alias("policies")
metadataCommands = app.Command("metadata", "Low-level commands to manipulate metadata items.").Alias("md")
manifestCommands = app.Command("manifest", "Low-level commands to manipulate manifest items.")
objectCommands = app.Command("object", "Commands to manipulate objects in repository.").Alias("obj")
blockCommands = app.Command("block", "Commands to manipulate virtual blocks in repository.").Alias("blk")

View File

@@ -10,8 +10,6 @@
var (
blockListCommand = blockCommands.Command("list", "List blocks").Alias("ls")
blockListKind = blockListCommand.Flag("kind", "Block kind").Default("all").Enum("all", "physical", "packed", "nonpacked", "packs")
blockListGroup = blockListCommand.Flag("group", "List blocks belonging to a given group").String()
blockListLong = blockListCommand.Flag("long", "Long output").Short('l').Bool()
blockListPrefix = blockListCommand.Flag("prefix", "Prefix").String()
blockListSort = blockListCommand.Flag("sort", "Sort order").Default("name").Enum("name", "size", "time", "none", "pack")
@@ -23,13 +21,7 @@ func runListBlocksAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
var blocks []block.Info
var err error
if *blockListGroup != "" {
blocks, err = rep.Blocks.ListGroupBlocks(*blockListGroup)
} else {
blocks, err = rep.Blocks.ListBlocks(*blockListPrefix, *blockListKind)
}
blocks, err := rep.Blocks.ListBlocks(*blockListPrefix)
if err != nil {
return err
}
@@ -61,14 +53,10 @@ func runListBlocksAction(context *kingpin.ParseContext) error {
uniquePacks[b.PackBlockID] = true
}
if *blockListLong {
grp := b.PackGroup
if grp == "" {
grp = "default"
}
if b.PackBlockID != "" {
fmt.Printf("%-34v %10v %v %v in %v offset %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat), grp, b.PackBlockID, b.PackOffset)
fmt.Printf("%-34v %10v %v in %v offset %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat), b.PackBlockID, b.PackOffset)
} else {
fmt.Printf("%-34v %10v %v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat), grp)
fmt.Printf("%-34v %10v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormat))
}
} else {
fmt.Printf("%v\n", b.BlockID)

View File

@@ -6,15 +6,14 @@
var (
blockRepackCommand = blockCommands.Command("repack", "Repackage small blocks into bigger ones")
blockRepackGroup = blockRepackCommand.Flag("group", "Group to repack").Default("DIR").String()
blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Int64()
blockRepackSizeThreshold = blockRepackCommand.Flag("max-size", "Max size of block to re-pack").Default("500000").Uint64()
)
func runBlockRepackAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
if err := rep.Blocks.Repackage(*blockRepackGroup, *blockRepackSizeThreshold); err != nil {
if err := rep.Blocks.Repackage(*blockRepackSizeThreshold); err != nil {
return err
}

View File

@@ -12,7 +12,6 @@
var (
blockStatsCommand = blockCommands.Command("stats", "Block statistics")
blockStatsKind = blockStatsCommand.Flag("kind", "Kinds of blocks").Default("logical").Enum("all", "logical", "physical", "packed", "nonpacked", "packs")
blockStatsRaw = blockStatsCommand.Flag("raw", "Raw numbers").Short('r').Bool()
blockStatsGroup = blockStatsCommand.Flag("group", "Display stats about blocks belonging to a given group").String()
)
@@ -21,13 +20,7 @@ func runBlockStatsAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
var blocks []block.Info
var err error
if *blockStatsGroup != "" {
blocks, err = rep.Blocks.ListGroupBlocks(*blockStatsGroup)
} else {
blocks, err = rep.Blocks.ListBlocks("", *blockStatsKind)
}
blocks, err := rep.Blocks.ListBlocks("")
if err != nil {
return err
}
@@ -54,7 +47,7 @@ func runBlockStatsAction(context *kingpin.ParseContext) error {
}
}
fmt.Printf("Block statistics (%v)\n", *blockStatsKind)
fmt.Printf("Block statistics\n")
if len(blocks) == 0 {
return nil
}

View File

@@ -15,7 +15,7 @@ func runListObjectsAction(context *kingpin.ParseContext) error {
rep := mustOpenRepository(nil)
defer rep.Close()
info, err := rep.Blocks.ListBlocks(*objectListPrefix, "all")
info, err := rep.Blocks.ListBlocks(*objectListPrefix)
if err != nil {
return err
}

View File

@@ -22,8 +22,6 @@
createAvgBlockSize = createCommand.Flag("avg-block-size", "Average size of a data block.").PlaceHolder("KB").Default("10240").Int()
createMaxBlockSize = createCommand.Flag("max-block-size", "Maximum size of a data block.").PlaceHolder("KB").Default("20480").Int()
createMaxPackedContentLength = createCommand.Flag("max-packed-file-size", "Minimum size of a file to include in a pack.").PlaceHolder("KB").Default("4096").Int()
createOverwrite = createCommand.Flag("overwrite", "Overwrite existing data (DANGEROUS).").Bool()
createOnly = createCommand.Flag("create-only", "Create repository, but don't connect to it.").Short('c').Bool()
)
@@ -41,8 +39,6 @@ func newRepositoryOptionsFromFlags() *repo.NewRepositoryOptions {
MinBlockSize: *createMinBlockSize * 1024,
AvgBlockSize: *createAvgBlockSize * 1024,
MaxBlockSize: *createMaxBlockSize * 1024,
MaxPackedContentLength: *createMaxPackedContentLength * 1024,
}
}

View File

@@ -32,8 +32,8 @@
type Index struct {
PackBlockId string `protobuf:"bytes,1,opt,name=pack_block_id,json=packBlockId,proto3" json:"pack_block_id,omitempty"`
PackGroup string `protobuf:"bytes,2,opt,name=pack_group,json=packGroup,proto3" json:"pack_group,omitempty"`
CreateTimeNanos int64 `protobuf:"varint,3,opt,name=create_time_nanos,json=createTimeNanos,proto3" json:"create_time_nanos,omitempty"`
PackLength uint64 `protobuf:"varint,2,opt,name=pack_length,json=packLength,proto3" json:"pack_length,omitempty"`
CreateTimeNanos uint64 `protobuf:"varint,3,opt,name=create_time_nanos,json=createTimeNanos,proto3" json:"create_time_nanos,omitempty"`
Items map[string]uint64 `protobuf:"bytes,4,rep,name=items" json:"items,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
DeletedItems []string `protobuf:"bytes,5,rep,name=deleted_items,json=deletedItems" json:"deleted_items,omitempty"`
}
@@ -50,14 +50,14 @@ func (m *Index) GetPackBlockId() string {
return ""
}
func (m *Index) GetPackGroup() string {
func (m *Index) GetPackLength() uint64 {
if m != nil {
return m.PackGroup
return m.PackLength
}
return ""
return 0
}
func (m *Index) GetCreateTimeNanos() int64 {
func (m *Index) GetCreateTimeNanos() uint64 {
if m != nil {
return m.CreateTimeNanos
}
@@ -119,11 +119,10 @@ func (m *Index) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.PackBlockId)))
i += copy(dAtA[i:], m.PackBlockId)
}
if len(m.PackGroup) > 0 {
dAtA[i] = 0x12
if m.PackLength != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.PackGroup)))
i += copy(dAtA[i:], m.PackGroup)
i = encodeVarintBlockIndex(dAtA, i, uint64(m.PackLength))
}
if m.CreateTimeNanos != 0 {
dAtA[i] = 0x18
@@ -210,9 +209,8 @@ func (m *Index) Size() (n int) {
if l > 0 {
n += 1 + l + sovBlockIndex(uint64(l))
}
l = len(m.PackGroup)
if l > 0 {
n += 1 + l + sovBlockIndex(uint64(l))
if m.PackLength != 0 {
n += 1 + sovBlockIndex(uint64(m.PackLength))
}
if m.CreateTimeNanos != 0 {
n += 1 + sovBlockIndex(uint64(m.CreateTimeNanos))
@@ -318,10 +316,10 @@ func (m *Index) Unmarshal(dAtA []byte) error {
m.PackBlockId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PackGroup", wireType)
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PackLength", wireType)
}
var stringLen uint64
m.PackLength = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowBlockIndex
@@ -331,21 +329,11 @@ func (m *Index) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
m.PackLength |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthBlockIndex
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.PackGroup = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field CreateTimeNanos", wireType)
@@ -360,7 +348,7 @@ func (m *Index) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
m.CreateTimeNanos |= (int64(b) & 0x7F) << shift
m.CreateTimeNanos |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
@@ -711,24 +699,24 @@ func skipBlockIndex(dAtA []byte) (n int, err error) {
func init() { proto.RegisterFile("internal/blockmgrpb/block_index.proto", fileDescriptorBlockIndex) }
var fileDescriptorBlockIndex = []byte{
// 293 bytes of a gzipped FileDescriptorProto
// 292 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x90, 0xcd, 0x4a, 0xc3, 0x40,
0x14, 0x85, 0x9d, 0xa6, 0x69, 0xe9, 0xad, 0x45, 0x3b, 0xb8, 0x08, 0xa2, 0x21, 0x54, 0xc4, 0xa0,
0x50, 0x41, 0x37, 0xc5, 0x65, 0x41, 0x24, 0x1b, 0x17, 0x83, 0x2b, 0x37, 0x21, 0x3f, 0x97, 0x32,
0x24, 0x99, 0x84, 0xc9, 0x54, 0xec, 0x9b, 0xf8, 0x48, 0x2e, 0x7d, 0x04, 0x89, 0xef, 0x21, 0x32,
0x33, 0x91, 0xee, 0xce, 0xfd, 0xee, 0x39, 0xcc, 0x99, 0x0b, 0x97, 0x5c, 0x28, 0x94, 0x22, 0x29,
0x6f, 0xd3, 0xb2, 0xce, 0x8a, 0x6a, 0x23, 0x9b, 0xd4, 0xca, 0x98, 0x8b, 0x1c, 0xdf, 0x97, 0x8d,
0xac, 0x55, 0xbd, 0xf8, 0x25, 0xe0, 0x46, 0x7a, 0xa6, 0x0b, 0x98, 0x35, 0x49, 0x56, 0xc4, 0xbd,
0x27, 0xf7, 0x48, 0x40, 0xc2, 0x09, 0x9b, 0x6a, 0xb8, 0xd6, 0x2c, 0xca, 0xe9, 0x39, 0x80, 0xf1,
0x6c, 0x64, 0xbd, 0x6d, 0xbc, 0x81, 0x31, 0x4c, 0x34, 0x79, 0xd2, 0x80, 0x5e, 0xc3, 0x3c, 0x93,
0x98, 0x28, 0x8c, 0x15, 0xaf, 0x30, 0x16, 0x89, 0xa8, 0x5b, 0xcf, 0x09, 0x48, 0xe8, 0xb0, 0x23,
0xbb, 0x78, 0xe1, 0x15, 0x3e, 0x6b, 0x4c, 0xaf, 0xc0, 0xe5, 0x0a, 0xab, 0xd6, 0x1b, 0x06, 0x4e,
0x38, 0xbd, 0x9b, 0x2f, 0x4d, 0x8b, 0x65, 0xa4, 0xd9, 0xa3, 0x50, 0x72, 0xc7, 0xec, 0x9e, 0x5e,
0xc0, 0x2c, 0xc7, 0x12, 0x15, 0xe6, 0xb1, 0x0d, 0xb8, 0x81, 0x13, 0x4e, 0xd8, 0x61, 0x0f, 0x4d,
0xe0, 0x74, 0x05, 0xb0, 0x4f, 0xd2, 0x63, 0x70, 0x0a, 0xdc, 0xf5, 0x1f, 0xd0, 0x92, 0x9e, 0x80,
0xfb, 0x96, 0x94, 0x5b, 0x34, 0x9d, 0x87, 0xcc, 0x0e, 0x0f, 0x83, 0x15, 0x59, 0xdc, 0xc0, 0xd8,
0xbc, 0x8c, 0x2d, 0x0d, 0x60, 0xcc, 0xad, 0xf4, 0x88, 0x29, 0x35, 0xb2, 0xa5, 0xd8, 0x3f, 0x5e,
0x9f, 0x7d, 0x76, 0x3e, 0xf9, 0xea, 0x7c, 0xf2, 0xdd, 0xf9, 0xe4, 0xe3, 0xc7, 0x3f, 0x78, 0x85,
0xfd, 0x75, 0xd3, 0x91, 0x39, 0xe9, 0xfd, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc4, 0xf5, 0x68,
0xbf, 0x7b, 0x01, 0x00, 0x00,
0x14, 0x85, 0x9d, 0xa6, 0x69, 0xe9, 0xad, 0x45, 0x3b, 0xb8, 0x08, 0x22, 0x31, 0x44, 0xc4, 0xa0,
0x10, 0x41, 0x37, 0xc5, 0x65, 0xc1, 0x45, 0x40, 0x5c, 0x04, 0x57, 0x6e, 0x86, 0xfc, 0x5c, 0xea,
0x90, 0x64, 0x12, 0x92, 0x51, 0xec, 0xce, 0xc7, 0xf0, 0x91, 0x5c, 0xfa, 0x08, 0x12, 0x5f, 0x44,
0x66, 0x26, 0xd2, 0xdd, 0x39, 0xdf, 0xbd, 0x87, 0x39, 0x73, 0xe1, 0x9c, 0x0b, 0x89, 0xad, 0x48,
0xca, 0xeb, 0xb4, 0xac, 0xb3, 0xa2, 0xda, 0xb4, 0x4d, 0x6a, 0x24, 0xe3, 0x22, 0xc7, 0xf7, 0xb0,
0x69, 0x6b, 0x59, 0xfb, 0x1f, 0x23, 0xb0, 0x23, 0xe5, 0xa9, 0x0f, 0x8b, 0x26, 0xc9, 0x0a, 0x36,
0xec, 0xe4, 0x0e, 0xf1, 0x48, 0x30, 0x8b, 0xe7, 0x0a, 0xae, 0x15, 0x8b, 0x72, 0x7a, 0x0a, 0xda,
0xb2, 0x12, 0xc5, 0x46, 0xbe, 0x38, 0x23, 0x8f, 0x04, 0xe3, 0x18, 0x14, 0x7a, 0xd0, 0x84, 0x5e,
0xc2, 0x32, 0x6b, 0x31, 0x91, 0xc8, 0x24, 0xaf, 0x90, 0x89, 0x44, 0xd4, 0x9d, 0x63, 0xe9, 0xb5,
0x03, 0x33, 0x78, 0xe2, 0x15, 0x3e, 0x2a, 0x4c, 0x2f, 0xc0, 0xe6, 0x12, 0xab, 0xce, 0x19, 0x7b,
0x56, 0x30, 0xbf, 0x59, 0x86, 0xba, 0x47, 0x18, 0x29, 0x76, 0x2f, 0x64, 0xbb, 0x8d, 0xcd, 0x9c,
0x9e, 0xc1, 0x22, 0xc7, 0x12, 0x25, 0xe6, 0xcc, 0x04, 0x6c, 0xcf, 0x0a, 0x66, 0xf1, 0xfe, 0x00,
0x75, 0xe0, 0x78, 0x05, 0xb0, 0x4b, 0xd2, 0x43, 0xb0, 0x0a, 0xdc, 0x0e, 0x5f, 0x50, 0x92, 0x1e,
0x81, 0xfd, 0x96, 0x94, 0xaf, 0x38, 0x94, 0x36, 0xe6, 0x6e, 0xb4, 0x22, 0xfe, 0x15, 0x4c, 0xf5,
0xcb, 0xd8, 0x51, 0x0f, 0xa6, 0xdc, 0x48, 0x87, 0xe8, 0x52, 0x13, 0x53, 0x2a, 0xfe, 0xc7, 0xeb,
0x93, 0xaf, 0xde, 0x25, 0xdf, 0xbd, 0x4b, 0x7e, 0x7a, 0x97, 0x7c, 0xfe, 0xba, 0x7b, 0xcf, 0xb0,
0xbb, 0x6f, 0x3a, 0xd1, 0x47, 0xbd, 0xfd, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x26, 0xd1, 0xd9, 0xf1,
0x7d, 0x01, 0x00, 0x00,
}

View File

@@ -4,8 +4,8 @@ option go_package = "blockmgrpb";
message Index {
string pack_block_id = 1;
string pack_group = 2;
int64 create_time_nanos = 3;
uint64 pack_length = 2;
uint64 create_time_nanos = 3;
map<string, uint64> items = 4;
repeated string deleted_items = 5;
}

View File

@@ -19,7 +19,7 @@
// ErrNotFound is returned when the metadata item is not found.
var ErrNotFound = errors.New("not found")
const manifestGroupID = "manifests"
const manifestBlockPrefix = "M"
// Manager organizes JSON manifests of various kinds, including snapshot manifests
type Manager struct {
@@ -175,15 +175,11 @@ func (m *Manager) flushPendingEntriesLocked() (string, error) {
gz.Flush()
gz.Close()
blockID, err := m.b.WriteBlock(manifestGroupID, buf.Bytes())
blockID, err := m.b.WriteBlock(buf.Bytes(), manifestBlockPrefix)
if err != nil {
return "", err
}
if err := m.b.Flush(); err != nil {
return "", err
}
m.pendingEntries = nil
return blockID, nil
}
@@ -212,12 +208,13 @@ func (m *Manager) load() error {
m.entries = map[string]*manifestEntry{}
log.Debug().Str("group", manifestGroupID).Msg("listing manifest group blocks")
blocks, err := m.b.ListGroupBlocks(manifestGroupID)
log.Debug().Msg("listing manifest blocks")
blocks, err := m.b.ListBlocks(manifestBlockPrefix)
if err != nil {
return fmt.Errorf("unable to list manifest blocks: %v", err)
}
log.Printf("loaded %v blocks", len(blocks))
return m.loadManifestBlocks(blocks)
}

View File

@@ -67,7 +67,8 @@ func TestManifest(t *testing.T) {
verifyItem(t, mgr, id2, labels2, item2)
verifyItem(t, mgr, id3, labels3, item3)
// verify in new manager
// flush underlying block manager and verify in new manifest manager.
mgr.b.Flush()
mgr2, err := newManagerForTesting(t, data, keyTime)
if err != nil {
t.Fatalf("can't open block manager: %v", err)
@@ -98,9 +99,9 @@ func TestManifest(t *testing.T) {
t.Errorf("can't compact: %v", err)
}
blks, err := mgr.b.ListGroupBlocks(manifestGroupID)
blks, err := mgr.b.ListBlocks(manifestBlockPrefix)
if err != nil {
t.Errorf("unable to list manifest group blocks: %v", err)
t.Errorf("unable to list manifest blocks: %v", err)
}
if got, want := len(blks), 1; got != want {
t.Errorf("unexpected number of blocks: %v, want %v", got, want)

View File

@@ -25,7 +25,7 @@ type Reader interface {
type blockManager interface {
BlockInfo(blockID string) (block.Info, error)
GetBlock(blockID string) ([]byte, error)
WriteBlock(packGroup string, data []byte) (string, error)
WriteBlock(data []byte, prefix string) (string, error)
Flush() error
}
@@ -57,7 +57,7 @@ func (om *Manager) NewWriter(opt WriterOptions) Writer {
repo: om,
splitter: om.newSplitter(),
description: opt.Description,
packGroup: opt.PackGroup,
blockPrefix: opt.BlockPrefix,
}
if opt.splitter != nil {
@@ -171,7 +171,7 @@ func (om *Manager) verifyObjectInternal(oid ID, blocks *blockTracker) (int64, er
// ok to be used.
func (om *Manager) Flush() error {
om.writeBackWG.Wait()
return om.blockMgr.Flush()
return nil
}
func nullTrace(message string, args ...interface{}) {

View File

@@ -37,10 +37,10 @@ func (f *fakeBlockManager) GetBlock(blockID string) ([]byte, error) {
return nil, storage.ErrBlockNotFound
}
func (f *fakeBlockManager) WriteBlock(groupID string, data []byte) (string, error) {
func (f *fakeBlockManager) WriteBlock(data []byte, prefix string) (string, error) {
h := md5.New()
h.Write(data)
blockID := hex.EncodeToString(h.Sum(nil))
blockID := prefix + hex.EncodeToString(h.Sum(nil))
f.mu.Lock()
defer f.mu.Unlock()

View File

@@ -54,8 +54,8 @@ type objectWriter struct {
description string
splitter objectSplitter
packGroup string
splitter objectSplitter
blockPrefix string
pendingBlocksWG sync.WaitGroup
@@ -97,7 +97,7 @@ func (w *objectWriter) flushBuffer() error {
w.buffer.Reset()
do := func() {
blockID, err := w.repo.blockMgr.WriteBlock(w.packGroup, b2.Bytes())
blockID, err := w.repo.blockMgr.WriteBlock(b2.Bytes(), w.blockPrefix)
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, blockID, length)
if err != nil {
w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err))
@@ -146,7 +146,7 @@ func (w *objectWriter) Result() (ID, error) {
repo: w.repo,
description: "LIST(" + w.description + ")",
splitter: w.repo.newSplitter(),
packGroup: w.packGroup,
blockPrefix: w.blockPrefix,
}
jw := jsonstream.NewWriter(iw, indirectStreamType)
@@ -164,7 +164,7 @@ func (w *objectWriter) Result() (ID, error) {
// WriterOptions can be passed to Repository.NewWriter()
type WriterOptions struct {
Description string
PackGroup string
BlockPrefix string
splitter objectSplitter
}

View File

@@ -34,11 +34,10 @@ type NewRepositoryOptions struct {
ObjectHMACSecret []byte // force the use of particular object HMAC secret
ObjectEncryptionKey []byte // force the use of particular object encryption key
Splitter string // splitter used to break objects into storage blocks
MinBlockSize int // minimum block size used with dynamic splitter
AvgBlockSize int // approximate size of storage block (used with dynamic splitter)
MaxBlockSize int // maximum size of storage block
MaxPackedContentLength int // maximum size of object to be considered for storage in a pack
Splitter string // splitter used to break objects into storage blocks
MinBlockSize int // minimum block size used with dynamic splitter
AvgBlockSize int // approximate size of storage block (used with dynamic splitter)
MaxBlockSize int // maximum size of storage block
// test-only
noHMAC bool // disable HMAC
@@ -84,12 +83,11 @@ func formatBlockFromOptions(opt *NewRepositoryOptions) *formatBlock {
func repositoryObjectFormatFromOptions(opt *NewRepositoryOptions) *config.RepositoryObjectFormat {
f := &config.RepositoryObjectFormat{
FormattingOptions: block.FormattingOptions{
Version: 1,
BlockFormat: applyDefaultString(opt.BlockFormat, block.DefaultFormat),
HMACSecret: applyDefaultRandomBytes(opt.ObjectHMACSecret, 32),
MasterKey: applyDefaultRandomBytes(opt.ObjectEncryptionKey, 32),
MaxPackedContentLength: applyDefaultInt(opt.MaxPackedContentLength, 4<<20), // 4 MB
MaxPackSize: applyDefaultInt(opt.MaxBlockSize, 20<<20), // 20 MB
Version: 1,
BlockFormat: applyDefaultString(opt.BlockFormat, block.DefaultFormat),
HMACSecret: applyDefaultRandomBytes(opt.ObjectHMACSecret, 32),
MasterKey: applyDefaultRandomBytes(opt.ObjectEncryptionKey, 32),
MaxPackSize: applyDefaultInt(opt.MaxBlockSize, 20<<20), // 20 MB
},
Splitter: applyDefaultString(opt.Splitter, object.DefaultSplitter),
MaxBlockSize: applyDefaultInt(opt.MaxBlockSize, 20<<20), // 20MiB

View File

@@ -29,6 +29,9 @@ func (r *Repository) Close() error {
if err := r.Objects.Close(); err != nil {
return err
}
if err := r.Blocks.Flush(); err != nil {
return err
}
if err := r.Storage.Close(); err != nil {
return err
}
@@ -40,5 +43,9 @@ func (r *Repository) Flush() error {
if err := r.Manifests.Flush(); err != nil {
return err
}
return r.Objects.Flush()
if err := r.Objects.Flush(); err != nil {
return err
}
return r.Blocks.Flush()
}

View File

@@ -16,12 +16,10 @@
"github.com/rs/zerolog/log"
"github.com/kopia/kopia/block"
"github.com/kopia/kopia/auth"
"github.com/kopia/kopia/object"
"github.com/kopia/kopia/block"
"github.com/kopia/kopia/internal/storagetesting"
"github.com/kopia/kopia/object"
"github.com/kopia/kopia/storage"
)
@@ -38,8 +36,6 @@ func setupTestWithData(t *testing.T, data map[string][]byte, keyTime map[string]
Splitter: "FIXED",
BlockFormat: "TESTONLY_MD5",
MetadataEncryptionAlgorithm: "NONE",
MaxPackedContentLength: -1,
noHMAC: true,
}
@@ -91,6 +87,8 @@ func TestWriters(t *testing.T) {
t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String())
}
repo.Blocks.Flush()
if got, want := len(data), 3; got != want {
// 1 format block + 1 data block + 1 pack index block
t.Errorf("unexpected data written to the storage (%v), wanted %v: %v", len(data), 3, data)
@@ -134,7 +132,6 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) {
func TestPackingSimple(t *testing.T) {
data, keyTime, repo := setupTest(t, func(n *NewRepositoryOptions) {
n.MaxPackedContentLength = 10000
})
content1 := "hello, how do you do?"
@@ -146,8 +143,6 @@ func TestPackingSimple(t *testing.T) {
oid2a := writeObject(t, repo, []byte(content2), "packed-object-2a")
oid2b := writeObject(t, repo, []byte(content2), "packed-object-2b")
repo.Objects.Flush()
oid3a := writeObject(t, repo, []byte(content3), "packed-object-3a")
oid3b := writeObject(t, repo, []byte(content3), "packed-object-3b")
verify(t, repo, oid1a, []byte(content1), "packed-object-1")
@@ -156,6 +151,7 @@ func TestPackingSimple(t *testing.T) {
oid1c := writeObject(t, repo, []byte(content1), "packed-object-1c")
repo.Objects.Flush()
repo.Blocks.Flush()
if got, want := oid1a.String(), oid1b.String(); got != want {
t.Errorf("oid1a(%q) != oid1b(%q)", got, want)
@@ -173,20 +169,13 @@ func TestPackingSimple(t *testing.T) {
t.Errorf("oid3a(%q) != oid3b(%q)", got, want)
}
if got, want := len(data), 1+4; got != want {
// format + index + pack
if got, want := len(data), 3; got != want {
t.Errorf("got unexpected repository contents %v items, wanted %v", got, want)
for k, v := range data {
t.Logf("%v => %v", k, string(v))
}
}
repo.Close()
for k, v := range data {
log.Printf("data[%v] = %v", k, string(v))
}
data, _, repo = setupTestWithData(t, data, keyTime, func(n *NewRepositoryOptions) {
n.MaxPackedContentLength = 10000
})
verify(t, repo, oid1a, []byte(content1), "packed-object-1")
@@ -197,7 +186,6 @@ func TestPackingSimple(t *testing.T) {
t.Errorf("optimize error: %v", err)
}
data, _, repo = setupTestWithData(t, data, keyTime, func(n *NewRepositoryOptions) {
n.MaxPackedContentLength = 10000
})
verify(t, repo, oid1a, []byte(content1), "packed-object-1")
@@ -208,7 +196,6 @@ func TestPackingSimple(t *testing.T) {
t.Errorf("optimize error: %v", err)
}
data, _, repo = setupTestWithData(t, data, keyTime, func(n *NewRepositoryOptions) {
n.MaxPackedContentLength = 10000
})
verify(t, repo, oid1a, []byte(content1), "packed-object-1")

View File

@@ -224,7 +224,7 @@ func (u *Uploader) uploadDir(dir fs.Directory) (object.ID, object.ID, error) {
mw := u.repo.Objects.NewWriter(object.WriterOptions{
Description: "HASHCACHE:" + dir.Metadata().Name,
PackGroup: "HC",
BlockPrefix: "H",
})
defer mw.Close()
u.cacheWriter = hashcache.NewWriter(mw)
@@ -265,7 +265,7 @@ func uploadDirInternal(
writer := u.repo.Objects.NewWriter(object.WriterOptions{
Description: "DIR:" + relativePath,
PackGroup: "DIR",
BlockPrefix: "",
})
dw := dir.NewWriter(writer)