From 14e38b59bf61104bd18d76a3bfd82bbcb570b171 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 14 Apr 2018 16:25:03 -0700 Subject: [PATCH] cleaned up how active block index is loaded --- block/block_cache.go | 2 +- block/block_manager.go | 132 +++++++----------------- block/block_manager_test.go | 2 +- block/content_id.go | 59 ----------- block/content_id_test.go | 63 ----------- block/local_storage_cache.go | 4 +- block/null_block_cache.go | 5 +- internal/storagetesting/map.go | 10 +- internal/storagetesting/map_test.go | 2 +- manifest/manifest_manager_test.go | 12 +-- repo/repository_test.go | 2 +- storage/logging/logging_storage_test.go | 2 +- 12 files changed, 59 insertions(+), 236 deletions(-) delete mode 100644 block/content_id.go delete mode 100644 block/content_id_test.go diff --git a/block/block_cache.go b/block/block_cache.go index c778bf550..947824844 100644 --- a/block/block_cache.go +++ b/block/block_cache.go @@ -13,7 +13,7 @@ type blockCache interface { getBlock(ctx context.Context, cacheKey string, physicalBlockID PhysicalBlockID, offset, length int64) ([]byte, error) putBlock(ctx context.Context, blockID PhysicalBlockID, data []byte) error - listIndexBlocks(ctx context.Context, full bool) ([]IndexInfo, error) + listIndexBlocks(ctx context.Context, full bool, extraTime time.Duration) ([]IndexInfo, error) close() error } diff --git a/block/block_manager.go b/block/block_manager.go index 9a053f2ca..7defcfd13 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -12,7 +12,6 @@ "math/rand" "os" "sort" - "strconv" "strings" "sync" "sync/atomic" @@ -34,13 +33,9 @@ defaultMaxPreambleLength = 32 defaultPaddingUnit = 4096 maxInlineContentLength = 100000 // amount of block data to store in the index block itself - - autoCompactionBlockCount = 16 - autoCompactionSafetyMargin = 5 * time.Minute // do not auto-compact if time since block was written is less than this + autoCompactionBlockCount = 16 ) -var zeroTime time.Time - // Info is an information about a single block managed by Manager. type Info struct { BlockID ContentID `json:"blockID"` @@ -50,6 +45,11 @@ type Info struct { PackOffset int64 `json:"packOffset,omitempty"` } +// ContentID uniquely identifies a block of content stored in repository. +// It consists of optional one-character prefix (which can't be 0..9 or a..f) followed by hexa-decimal +// digits representing hash of the content. +type ContentID string + // PhysicalBlockID identifies physical storage block. type PhysicalBlockID string @@ -79,6 +79,7 @@ type Manager struct { pendingPackIndexes []packIndex // pending indexes of blocks that have been saved. flushPackIndexesAfter time.Time // time when those indexes should be flushed + activeBlocksExtraTime time.Duration maxInlineContentLength int maxPackSize int @@ -105,13 +106,6 @@ func (bm *Manager) DeleteBlock(blockID ContentID) error { return nil } -func (bm *Manager) addToIndexLocked(blockID ContentID, ndx packIndexBuilder, offset, size uint32) { - bm.assertLocked() - - ndx.addPackedBlock(blockID, offset, size) - bm.blockIDToIndex[blockID] = ndx -} - func (bm *Manager) addToPackLocked(ctx context.Context, blockID ContentID, data []byte) error { bm.assertLocked() @@ -127,9 +121,10 @@ func (bm *Manager) addToPackLocked(ctx context.Context, blockID ContentID, data offset := len(bm.currentPackData) shouldFinish := offset+len(data) >= bm.maxPackSize - bm.currentPackData = append(bm.currentPackData, data...) - bm.addToIndexLocked(blockID, bm.currentPackIndex, uint32(offset), uint32(len(data))) + + bm.currentPackIndex.addPackedBlock(blockID, uint32(offset), uint32(len(data))) + bm.blockIDToIndex[blockID] = bm.currentPackIndex if shouldFinish { if err := bm.finishPackAndMaybeFlushIndexes(ctx); err != nil { @@ -228,7 +223,7 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { if false { log.Printf("saving %v pack indexes", len(bm.pendingPackIndexes)) } - if _, err := bm.writePackIndexes(ctx, bm.pendingPackIndexes, nil); err != nil { + if _, err := bm.writePackIndexes(ctx, bm.pendingPackIndexes, false); err != nil { return err } } @@ -238,7 +233,7 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { return nil } -func (bm *Manager) writePackIndexes(ctx context.Context, ndx []packIndex, replacesBlockBeforeTime *time.Time) (PhysicalBlockID, error) { +func (bm *Manager) writePackIndexes(ctx context.Context, ndx []packIndex, isCompaction bool) (PhysicalBlockID, error) { pb := &blockmgrpb.Indexes{} for _, n := range ndx { @@ -250,8 +245,8 @@ func (bm *Manager) writePackIndexes(ctx context.Context, ndx []packIndex, replac } var suffix string - if replacesBlockBeforeTime != nil { - suffix = fmt.Sprintf("%v%x", compactedBlockSuffix, replacesBlockBeforeTime.UnixNano()) + if isCompaction { + suffix = compactedBlockSuffix } inverseTimePrefix := fmt.Sprintf("%016x", math.MaxInt64-time.Now().UnixNano()) @@ -296,7 +291,7 @@ func (bm *Manager) finishPackLocked(ctx context.Context) error { // ListIndexBlocks returns the list of all index blocks, including inactive, sorted by time. func (bm *Manager) ListIndexBlocks(ctx context.Context) ([]IndexInfo, error) { - blocks, err := bm.cache.listIndexBlocks(ctx, true) + blocks, err := bm.cache.listIndexBlocks(ctx, true, 0) if err != nil { return nil, fmt.Errorf("error listing index blocks: %v", err) } @@ -307,7 +302,7 @@ func (bm *Manager) ListIndexBlocks(ctx context.Context) ([]IndexInfo, error) { // ActiveIndexBlocks returns the list of active index blocks, sorted by time. func (bm *Manager) ActiveIndexBlocks(ctx context.Context) ([]IndexInfo, error) { - blocks, err := bm.cache.listIndexBlocks(ctx, false) + blocks, err := bm.cache.listIndexBlocks(ctx, false, bm.activeBlocksExtraTime) if err != nil { return nil, err } @@ -315,20 +310,8 @@ func (bm *Manager) ActiveIndexBlocks(ctx context.Context) ([]IndexInfo, error) { return nil, nil } - cutoffTime, err := findLatestCompactedTimestamp(blocks) - if err != nil { - return nil, err - } - - var activeBlocks []IndexInfo - for _, b := range blocks { - if b.Timestamp.After(cutoffTime) { - activeBlocks = append(activeBlocks, b) - } - } - - sortBlocksByTime(activeBlocks) - return activeBlocks, nil + sortBlocksByTime(blocks) + return blocks, nil } func sortBlocksByTime(b []IndexInfo) { @@ -337,32 +320,15 @@ func sortBlocksByTime(b []IndexInfo) { }) } -func findLatestCompactedTimestamp(blocks []IndexInfo) (time.Time, error) { - // look for blocks that end with -ztimestamp - // find the latest such timestamp. - var latestTime time.Time - - for _, b := range blocks { - blk := b.BlockID - if ts, ok := getCompactedTimestamp(blk); ok { - if ts.After(latestTime) { - latestTime = ts - } - } - } - - return latestTime, nil -} - -func (bm *Manager) loadMergedPackIndexLocked(ctx context.Context) ([]packIndex, []PhysicalBlockID, time.Time, error) { +func (bm *Manager) loadMergedPackIndexLocked(ctx context.Context) ([]packIndex, []PhysicalBlockID, error) { log.Debug().Msg("listing active index blocks") blocks, err := bm.ActiveIndexBlocks(ctx) if err != nil { - return nil, nil, zeroTime, err + return nil, nil, err } if len(blocks) == 0 { - return nil, nil, zeroTime, nil + return nil, nil, nil } // add block IDs to the channel @@ -417,7 +383,7 @@ func (bm *Manager) loadMergedPackIndexLocked(ctx context.Context) ([]packIndex, // Propagate async errors, if any. for err := range errors { - return nil, nil, time.Now(), err + return nil, nil, err } var merged []packIndex @@ -425,24 +391,24 @@ func (bm *Manager) loadMergedPackIndexLocked(ctx context.Context) ([]packIndex, merged = append(merged, pi...) } - return merged, blockIDs, blocks[len(blocks)-1].Timestamp, nil + return merged, blockIDs, nil } func (bm *Manager) initializeIndexes(ctx context.Context) error { bm.lock() defer bm.unlock() - merged, blockIDs, latestBlockTime, err := bm.loadMergedPackIndexLocked(ctx) + merged, blockIDs, err := bm.loadMergedPackIndexLocked(ctx) if err != nil { return err } - log.Debug().Msgf("loaded %v index blocks with latest time %v", len(blockIDs), latestBlockTime.Local()) + log.Debug().Msgf("loaded %v index blocks", len(blockIDs)) bm.blockIDToIndex, bm.packBlockIDToIndex = dedupeBlockIDsAndIndex(merged) - if len(blockIDs) >= autoCompactionBlockCount && latestBlockTime.Before(time.Now().Add(-autoCompactionSafetyMargin)) { + if len(blockIDs) >= autoCompactionBlockCount { log.Debug().Msgf("auto compacting block indexes (block count %v exceeds threshold of %v)", len(blockIDs), autoCompactionBlockCount) merged = removeEmptyIndexes(merged) - if _, err := bm.writePackIndexes(ctx, merged, &latestBlockTime); err != nil { + if _, err := bm.writePackIndexes(ctx, merged, true); err != nil { return err } } @@ -489,12 +455,12 @@ func (bm *Manager) CompactIndexes(ctx context.Context) error { bm.lock() defer bm.unlock() - merged, indexBlocks, latestBlockTime, err := bm.loadMergedPackIndexLocked(ctx) + merged, indexBlocks, err := bm.loadMergedPackIndexLocked(ctx) if err != nil { return err } - if err := bm.compactIndexes(ctx, merged, indexBlocks, latestBlockTime); err != nil { + if err := bm.compactIndexes(ctx, merged, indexBlocks); err != nil { return err } @@ -546,15 +512,13 @@ func newInfo(blockID ContentID, ndx packIndex) (Info, error) { }, nil } -func (bm *Manager) compactIndexes(ctx context.Context, merged []packIndex, blockIDs []PhysicalBlockID, latestBlockTime time.Time) error { - dedupeBlockIDsAndIndex(merged) - merged = removeEmptyIndexes(merged) +func (bm *Manager) compactIndexes(ctx context.Context, merged []packIndex, blockIDs []PhysicalBlockID) error { if len(blockIDs) <= 1 { log.Printf("skipping index compaction - already compacted") return nil } - _, err := bm.writePackIndexes(ctx, merged, &latestBlockTime) + _, err := bm.writePackIndexes(ctx, merged, true) return err } @@ -620,22 +584,12 @@ func (bm *Manager) Repackage(ctx context.Context, maxLength uint64) error { bm.lock() defer bm.unlock() - merged, _, _, err := bm.loadMergedPackIndexLocked(ctx) - if err != nil { - return err - } - var toRepackage []packIndex var totalBytes uint64 - for _, m := range merged { - bi, ok := bm.packBlockIDToIndex[m.packBlockID()] - if !ok { - return fmt.Errorf("unable to get info on pack block %q", m.packBlockID()) - } - + for _, bi := range bm.packBlockIDToIndex { if bi.packLength() <= maxLength { - toRepackage = append(toRepackage, m) + toRepackage = append(toRepackage, bi) totalBytes += bi.packLength() } } @@ -910,8 +864,8 @@ type cachedList struct { // If 'full' is set to true, this function lists and returns all blocks, // if 'full' is false, the function returns only blocks from the last 2 compactions. // The list of blocks is not guaranteed to be sorted. -func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage, full bool) ([]IndexInfo, error) { - maxCompactions := 2 +func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage, full bool, extraTime time.Duration) ([]IndexInfo, error) { + maxCompactions := 1 if full { maxCompactions = math.MaxInt32 } @@ -941,10 +895,10 @@ func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage, full bo } results = append(results, ii) - if ts, ok := getCompactedTimestamp(ii.BlockID); ok { + if strings.Contains(string(ii.BlockID), compactedBlockSuffix) { numCompactions++ if numCompactions == maxCompactions { - timestampCutoff = ts.Add(-10 * time.Minute) + timestampCutoff = it.TimeStamp.Add(-extraTime) } } } @@ -1001,15 +955,3 @@ func newManagerWithTime(ctx context.Context, st storage.Storage, f FormattingOpt return m, nil } -func getCompactedTimestamp(blk PhysicalBlockID) (time.Time, bool) { - if p := strings.Index(string(blk), compactedBlockSuffix); p >= 0 { - unixNano, err := strconv.ParseInt(string(blk)[p+len(compactedBlockSuffix):], 16, 64) - if err != nil { - return time.Time{}, false - } - - return time.Unix(0, unixNano), true - } - - return time.Time{}, false -} diff --git a/block/block_manager_test.go b/block/block_manager_test.go index 7680ea6ba..4b836d815 100644 --- a/block/block_manager_test.go +++ b/block/block_manager_test.go @@ -482,7 +482,7 @@ func TestDeleteAndRecreate(t *testing.T) { } func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, timeFunc func() time.Time) *Manager { - st := storagetesting.NewMapStorage(data, keyTime) + st := storagetesting.NewMapStorage(data, keyTime, timeFunc) //st = logging.NewWrapper(st) if timeFunc == nil { timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1) diff --git a/block/content_id.go b/block/content_id.go deleted file mode 100644 index 089d8d8ec..000000000 --- a/block/content_id.go +++ /dev/null @@ -1,59 +0,0 @@ -package block - -import ( - "encoding/hex" - "fmt" -) - -// ContentID uniquely identifies a block of content stored in repository. -// It consists of optional one-character prefix (which can't be 0..9 or a..f) followed by hexa-decimal -// digits representing hash of the content. -type ContentID string - -func packContentID(c ContentID) ([]byte, error) { - if len(c) < 2 { - return nil, fmt.Errorf("invalid content ID: %q", c) - } - - var hexDigits ContentID - var prefix byte - - if !isHex(c[0]) { - hexDigits = c[1:] - prefix = c[0] - } else { - hexDigits = c - prefix = 0 - } - - result := make([]byte, 1+len(hexDigits)/2) - result[0] = prefix - if _, err := hex.Decode(result[1:], []byte(hexDigits)); err != nil { - return nil, fmt.Errorf("unable to decode content hash: %v", err) - } - - return result, nil -} - -func unpackContentID(b []byte) (ContentID, error) { - if len(b) <= 1 { - return "", fmt.Errorf("invalid content ID: %x", b) - } - - var prefix string - if b[0] != 0 { - prefix = string(b[0:1]) - } - return ContentID(prefix + hex.EncodeToString(b[1:])), nil -} - -func isHex(b byte) bool { - if b >= '0' && b <= '9' { - return true - } - if b >= 'a' && b <= 'f' { - return true - } - - return false -} diff --git a/block/content_id_test.go b/block/content_id_test.go deleted file mode 100644 index 05b550175..000000000 --- a/block/content_id_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package block - -import ( - "reflect" - "strings" - "testing" -) - -func TestContentID(t *testing.T) { - cases := []struct { - contentID ContentID - want []byte - }{ - {"abcdef", []byte{0x00, 0xab, 0xcd, 0xef}}, - {"zabcdef", []byte{0x7a, 0xab, 0xcd, 0xef}}, - {"iabcdef", []byte{0x69, 0xab, 0xcd, 0xef}}, - } - - for _, tc := range cases { - got, err := packContentID(tc.contentID) - if err != nil { - t.Errorf("unable to pack %q: %v", tc.contentID, err) - continue - } - - if !reflect.DeepEqual(got, tc.want) { - t.Errorf("invalid packed content for %q: %x wanted %x", tc.contentID, got, tc.want) - } - - rtt, err := unpackContentID(got) - if err != nil { - t.Errorf("unable to round-trip %q: %v", tc.contentID, err) - continue - } - - if rtt != tc.contentID { - t.Errorf("failed to round trip: %q, got %q", tc.contentID, rtt) - } - - } -} - -func TestInvalidContentID(t *testing.T) { - cases := []struct { - contentID ContentID - err string - }{ - {"", "invalid content ID"}, - {"a", "invalid content ID"}, - {"aabcdef", "odd length hex string"}, - } - - for _, tc := range cases { - _, err := packContentID(tc.contentID) - if err == nil { - t.Errorf("unexpected success when packing %q, wanted %v", tc.contentID, tc.err) - continue - } - if !strings.Contains(err.Error(), tc.err) { - t.Errorf("invalid error when packing %q: %v, wanted %q", tc.contentID, err, tc.err) - } - } -} diff --git a/block/local_storage_cache.go b/block/local_storage_cache.go index 04436ae67..5046c7497 100644 --- a/block/local_storage_cache.go +++ b/block/local_storage_cache.go @@ -80,7 +80,7 @@ func (c *localStorageCache) putBlock(ctx context.Context, blockID PhysicalBlockI return c.st.PutBlock(ctx, string(blockID), bytes.NewReader(data)) } -func (c *localStorageCache) listIndexBlocks(ctx context.Context, full bool) ([]IndexInfo, error) { +func (c *localStorageCache) listIndexBlocks(ctx context.Context, full bool, extraTime time.Duration) ([]IndexInfo, error) { var cachedListBlockID string if full { @@ -101,7 +101,7 @@ func (c *localStorageCache) listIndexBlocks(ctx context.Context, full bool) ([]I } log.Debug().Bool("full", full).Msg("listing index blocks from source") - blocks, err := listIndexBlocksFromStorage(ctx, c.st, full) + blocks, err := listIndexBlocksFromStorage(ctx, c.st, full, extraTime) if err == nil { c.saveListToCache(ctx, cachedListBlockID, &cachedList{ Blocks: blocks, diff --git a/block/null_block_cache.go b/block/null_block_cache.go index 50a5c75fd..edf7f5dd0 100644 --- a/block/null_block_cache.go +++ b/block/null_block_cache.go @@ -3,6 +3,7 @@ import ( "bytes" "context" + "time" "github.com/kopia/kopia/storage" ) @@ -19,8 +20,8 @@ func (c nullBlockCache) putBlock(ctx context.Context, blockID PhysicalBlockID, d return c.st.PutBlock(ctx, string(blockID), bytes.NewReader(data)) } -func (c nullBlockCache) listIndexBlocks(ctx context.Context, full bool) ([]IndexInfo, error) { - return listIndexBlocksFromStorage(ctx, c.st, full) +func (c nullBlockCache) listIndexBlocks(ctx context.Context, full bool, extraTime time.Duration) ([]IndexInfo, error) { + return listIndexBlocksFromStorage(ctx, c.st, full, extraTime) } func (c nullBlockCache) close() error { diff --git a/internal/storagetesting/map.go b/internal/storagetesting/map.go index 3c2bcd37e..56741a424 100644 --- a/internal/storagetesting/map.go +++ b/internal/storagetesting/map.go @@ -15,6 +15,7 @@ type mapStorage struct { data map[string][]byte keyTime map[string]time.Time + timeNow func() time.Time mutex sync.RWMutex } @@ -51,7 +52,7 @@ func (s *mapStorage) PutBlock(ctx context.Context, id string, r io.Reader) error return nil } - s.keyTime[id] = time.Now() + s.keyTime[id] = s.timeNow() s.data[id] = append([]byte{}, data...) return nil } @@ -107,9 +108,12 @@ func (s *mapStorage) ConnectionInfo() storage.ConnectionInfo { // NewMapStorage returns an implementation of Storage backed by the contents of given map. // Used primarily for testing. -func NewMapStorage(data map[string][]byte, keyTime map[string]time.Time) storage.Storage { +func NewMapStorage(data map[string][]byte, keyTime map[string]time.Time, timeNow func() time.Time) storage.Storage { if keyTime == nil { keyTime = make(map[string]time.Time) } - return &mapStorage{data: data, keyTime: keyTime} + if timeNow == nil { + timeNow = time.Now + } + return &mapStorage{data: data, keyTime: keyTime, timeNow: timeNow} } diff --git a/internal/storagetesting/map_test.go b/internal/storagetesting/map_test.go index 380a44fb6..238276cbd 100644 --- a/internal/storagetesting/map_test.go +++ b/internal/storagetesting/map_test.go @@ -7,7 +7,7 @@ func TestMapStorage(t *testing.T) { data := map[string][]byte{} - r := NewMapStorage(data, nil) + r := NewMapStorage(data, nil, nil) if r == nil { t.Errorf("unexpected result: %v", r) } diff --git a/manifest/manifest_manager_test.go b/manifest/manifest_manager_test.go index f9eee09cd..9cddf38f7 100644 --- a/manifest/manifest_manager_test.go +++ b/manifest/manifest_manager_test.go @@ -6,7 +6,6 @@ "reflect" "sort" "testing" - "time" "github.com/kopia/kopia/internal/storagetesting" @@ -16,8 +15,7 @@ func TestManifest(t *testing.T) { ctx := context.Background() data := map[string][]byte{} - keyTime := map[string]time.Time{} - mgr, setupErr := newManagerForTesting(ctx, t, data, keyTime) + mgr, setupErr := newManagerForTesting(ctx, t, data) if setupErr != nil { t.Fatalf("unable to open block manager: %v", setupErr) } @@ -71,7 +69,7 @@ func TestManifest(t *testing.T) { // flush underlying block manager and verify in new manifest manager. mgr.b.Flush(ctx) - mgr2, setupErr := newManagerForTesting(ctx, t, data, keyTime) + mgr2, setupErr := newManagerForTesting(ctx, t, data) if setupErr != nil { t.Fatalf("can't open block manager: %v", setupErr) } @@ -111,7 +109,7 @@ func TestManifest(t *testing.T) { mgr.b.Flush(ctx) - mgr3, err := newManagerForTesting(ctx, t, data, keyTime) + mgr3, err := newManagerForTesting(ctx, t, data) if err != nil { t.Fatalf("can't open manager: %v", err) } @@ -172,8 +170,8 @@ func verifyMatches(t *testing.T, mgr *Manager, labels map[string]string, expecte } } -func newManagerForTesting(ctx context.Context, t *testing.T, data map[string][]byte, keyTime map[string]time.Time) (*Manager, error) { - st := storagetesting.NewMapStorage(data, keyTime) +func newManagerForTesting(ctx context.Context, t *testing.T, data map[string][]byte) (*Manager, error) { + st := storagetesting.NewMapStorage(data, nil, nil) bm, err := block.NewManager(ctx, st, block.FormattingOptions{ BlockFormat: "TESTONLY_MD5", diff --git a/repo/repository_test.go b/repo/repository_test.go index e51694dcf..afb34e72b 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -28,7 +28,7 @@ func setupTest(t *testing.T, mods ...func(o *NewRepositoryOptions)) (map[string] } func setupTestWithData(t *testing.T, data map[string][]byte, keyTime map[string]time.Time, mods ...func(o *NewRepositoryOptions)) (map[string][]byte, map[string]time.Time, *Repository) { - st := storagetesting.NewMapStorage(data, keyTime) + st := storagetesting.NewMapStorage(data, keyTime, nil) creds, _ := auth.Password("foobarbazfoobarbaz") opt := &NewRepositoryOptions{ diff --git a/storage/logging/logging_storage_test.go b/storage/logging/logging_storage_test.go index 281673f40..a801cfc20 100644 --- a/storage/logging/logging_storage_test.go +++ b/storage/logging/logging_storage_test.go @@ -9,7 +9,7 @@ func TestLoggingStorage(t *testing.T) { data := map[string][]byte{} - r := NewWrapper(storagetesting.NewMapStorage(data, nil)) + r := NewWrapper(storagetesting.NewMapStorage(data, nil, nil)) if r == nil { t.Errorf("unexpected result: %v", r) }