diff --git a/Makefile b/Makefile index 24f6ec170..2ce9e2948 100644 --- a/Makefile +++ b/Makefile @@ -18,9 +18,6 @@ install-race: @echo Building version: $(BUILD_INFO) / $(BUILD_VERSION) go install -race -ldflags $(LDARGS) github.com/kopia/kopia -protos: - protoc --gofast_out=. internal/blockmgrpb/*.proto - build: go build github.com/kopia/kopia/... diff --git a/block/block_manager.go b/block/block_manager.go index bddaf952c..799b1fdd1 100644 --- a/block/block_manager.go +++ b/block/block_manager.go @@ -12,14 +12,14 @@ "math/rand" "os" "path/filepath" + "reflect" "sort" "strings" "sync" "sync/atomic" "time" - "github.com/gogo/protobuf/proto" - "github.com/kopia/kopia/internal/blockmgrpb" + "github.com/kopia/kopia/internal/packindex" "github.com/kopia/kopia/storage" "github.com/rs/zerolog/log" ) @@ -27,12 +27,11 @@ const ( parallelFetches = 5 // number of parallel reads goroutines flushPackIndexTimeout = 10 * time.Minute // time after which all pending indexes are flushes - indexBlockPrefix = "i" // prefix for all storage blocks that are pack indexes + newIndexBlockPrefix = "n" compactedBlockSuffix = "-z" defaultMinPreambleLength = 32 defaultMaxPreambleLength = 32 defaultPaddingUnit = 4096 - maxInlineContentLength = 100000 // amount of block data to store in the index block itself autoCompactionBlockCount = 16 defaultActiveBlocksExtraTime = 10 * time.Minute @@ -42,29 +41,15 @@ ) // Info is an information about a single block managed by Manager. -type Info struct { - BlockID ContentID `json:"blockID"` - Length uint32 `json:"length"` - TimestampNanos int64 `json:"time"` - PackBlockID PhysicalBlockID `json:"packBlockID,omitempty"` - PackOffset uint32 `json:"packOffset,omitempty"` - Deleted bool `json:"deleted"` - Payload []byte `json:"payload"` // set for payloads stored inline - FormatVersion int32 `json:"formatVersion"` -} - -// Timestamp returns the time when a block was created or deleted. -func (i Info) Timestamp() time.Time { - return time.Unix(0, i.TimestampNanos) -} +type Info = packindex.Info // ContentID uniquely identifies a block of content stored in repository. // It consists of optional one-character prefix (which can't be 0..9 or a..f) followed by hexa-decimal // digits representing hash of the content. -type ContentID string +type ContentID = packindex.ContentID // PhysicalBlockID identifies physical storage block. -type PhysicalBlockID string +type PhysicalBlockID = packindex.PhysicalBlockID // IndexInfo is an information about a single index block managed by Manager. type IndexInfo struct { @@ -84,22 +69,18 @@ type Manager struct { locked bool checkInvariantsOnUnlock bool - committedBlocks committedBlockIndex - - pendingBlocks map[ContentID]Info // maps block ID to corresponding info - pendingPackIndexes []packIndex - - currentPackDataLength int // length of the current block - currentPackIndex packIndexBuilder // index of a current block + currentPackItems map[ContentID]Info // blocks that are in the pack block currently being built (all inline) + currentPackDataLength int // total length of all items in the current pack block + packIndexBuilder packindex.Builder // blocks that are in index currently being built (current pack and all packs saved but not committed) + committedBlocks committedBlockIndex flushPackIndexesAfter time.Time // time when those indexes should be flushed activeBlocksExtraTime time.Duration writeFormatVersion int32 // format version to write - maxInlineContentLength int - maxPackSize int - formatter Formatter + maxPackSize int + formatter Formatter minPreambleLength int maxPreambleLength int @@ -117,7 +98,7 @@ func (bm *Manager) DeleteBlock(blockID ContentID) error { defer bm.unlock() // We have this block in current pack index and it's already deleted there. - if bi, ok := bm.pendingBlocks[blockID]; ok && bi.Deleted { + if bi, ok := bm.packIndexBuilder[blockID]; ok && bi.Deleted { return nil } @@ -127,28 +108,31 @@ func (bm *Manager) DeleteBlock(blockID ContentID) error { } // Add deletion to current pack. - bm.currentPackIndex.deleteBlock(blockID) - bm.pendingBlocks[blockID] = Info{ - BlockID: blockID, - Deleted: true, - TimestampNanos: bm.currentPackIndex.createTimeNanos(), - } + bm.setPendingBlock(Info{ + BlockID: blockID, + Deleted: true, + TimestampSeconds: bm.timeNow().Unix(), + }) return nil } +func (bm *Manager) setPendingBlock(i Info) { + bm.packIndexBuilder.Add(i) + bm.currentPackItems[i.BlockID] = i +} + func (bm *Manager) addToPackLocked(ctx context.Context, blockID ContentID, data []byte) error { bm.assertLocked() bm.currentPackDataLength += len(data) shouldFinish := bm.currentPackDataLength >= bm.maxPackSize - bm.currentPackIndex.addInlineBlock(blockID, data) - bm.pendingBlocks[blockID] = Info{ - BlockID: blockID, - Payload: data, - Length: uint32(len(data)), - TimestampNanos: bm.currentPackIndex.createTimeNanos(), - } + bm.setPendingBlock(Info{ + BlockID: blockID, + Payload: data, + Length: uint32(len(data)), + TimestampSeconds: bm.timeNow().Unix(), + }) if shouldFinish { if err := bm.finishPackAndMaybeFlushIndexes(ctx); err != nil { @@ -185,89 +169,128 @@ func (bm *Manager) ResetStats() { func (bm *Manager) verifyInvariantsLocked() { bm.assertLocked() + + bm.verifyCurrentPackItemsLocked() + bm.verifyPackIndexBuilderLocked() +} + +func (bm *Manager) verifyCurrentPackItemsLocked() { + for k, cpi := range bm.currentPackItems { + if cpi.BlockID != k { + bm.invariantViolated("block ID entry has invalid key: %v %v", cpi.BlockID, k) + } + if cpi.PackBlockID != "" { + bm.invariantViolated("block ID entry has unexpected pack block ID %v: %v", cpi.PackBlockID) + } + if cpi.Deleted == (cpi.Payload != nil) { + bm.invariantViolated("block can't be both deleted and have a payload: %v", cpi.BlockID) + } + if cpi.TimestampSeconds == 0 { + bm.invariantViolated("block has no timestamp: %v", cpi.BlockID) + } + bi, ok := bm.packIndexBuilder[k] + if !ok { + bm.invariantViolated("block ID entry not present in pack index builder: %v", cpi.BlockID) + } + if !reflect.DeepEqual(*bi, cpi) { + bm.invariantViolated("current pack index does not match pack index builder: %v", cpi, *bi) + } + } +} + +func (bm *Manager) verifyPackIndexBuilderLocked() { + for k, cpi := range bm.packIndexBuilder { + if cpi.BlockID != k { + bm.invariantViolated("block ID entry has invalid key: %v %v", cpi.BlockID, k) + } + if _, ok := bm.currentPackItems[cpi.BlockID]; ok { + // ignore blocks also in currentPackItems + continue + } + if cpi.Deleted { + if cpi.PackBlockID != "" { + bm.invariantViolated("block can't be both deleted and have a pack block: %v", cpi.BlockID) + } + } else { + if cpi.PackBlockID == "" { + bm.invariantViolated("block that's not deleted must have a pack block: %+v", cpi) + } + if cpi.FormatVersion != byte(bm.writeFormatVersion) { + bm.invariantViolated("block that's not deleted must have a valid format version: %+v", cpi) + } + } + if cpi.TimestampSeconds == 0 { + bm.invariantViolated("block has no timestamp: %v", cpi.BlockID) + } + } +} + +func (bm *Manager) invariantViolated(msg string, arg ...interface{}) { + if len(arg) > 0 { + msg = fmt.Sprintf(msg, arg...) + } + + panic(msg) } func (bm *Manager) startPackIndexLocked() { - bm.currentPackIndex = newPackIndex(bm.timeNow().UnixNano()) + bm.currentPackItems = make(map[ContentID]Info) bm.currentPackDataLength = 0 } func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { - if len(bm.pendingPackIndexes) > 0 { - if _, err := bm.writePackIndexes(ctx, indexesToProto(bm.pendingPackIndexes), false); err != nil { + if len(bm.packIndexBuilder) > 0 { + var buf bytes.Buffer + + if err := bm.packIndexBuilder.Build(&buf); err != nil { + return fmt.Errorf("unable to build pack index: %v", err) + } + + indexBlockID, err := bm.writePackIndexesNew(ctx, buf.Bytes(), false) + if err != nil { return err } - bm.pendingPackIndexes = nil + + if err := bm.committedBlocks.commit(indexBlockID, bm.packIndexBuilder); err != nil { + return fmt.Errorf("unable to commit: %v", err) + } + bm.packIndexBuilder = packindex.NewBuilder() } bm.flushPackIndexesAfter = bm.timeNow().Add(flushPackIndexTimeout) - bm.committedBlocks.commit("", bm.pendingBlocks) return nil } -func indexesToProto(ndx []packIndex) *blockmgrpb.Indexes { - pb := &blockmgrpb.Indexes{} - - for _, n := range ndx { - n.addToIndexes(pb) - } - return pb -} - -func (bm *Manager) writePackIndexes(ctx context.Context, pb *blockmgrpb.Indexes, isCompaction bool) (PhysicalBlockID, error) { - data, err := proto.Marshal(pb) - if err != nil { - return "", fmt.Errorf("can't encode pack index: %v", err) - } - +func (bm *Manager) writePackIndexesNew(ctx context.Context, data []byte, isCompaction bool) (PhysicalBlockID, error) { var suffix string if isCompaction { suffix = compactedBlockSuffix } inverseTimePrefix := fmt.Sprintf("%016x", math.MaxInt64-time.Now().UnixNano()) - return bm.encryptAndWriteBlockNotLocked(ctx, data, indexBlockPrefix+inverseTimePrefix, suffix) + return bm.encryptAndWriteBlockNotLocked(ctx, data, newIndexBlockPrefix+inverseTimePrefix, suffix) } func (bm *Manager) finishPackLocked(ctx context.Context) error { - if isIndexEmpty(bm.currentPackIndex) { + if len(bm.currentPackItems) == 0 { + log.Printf("no current pack entries") return nil } - if bm.currentPackDataLength > 0 && bm.currentPackDataLength > bm.maxInlineContentLength { - if err := bm.writePackBlock(ctx); err != nil { - return fmt.Errorf("error writing pack block: %v", err) - } + if err := bm.writePackBlockLocked(ctx); err != nil { + return fmt.Errorf("error writing pack block: %v", err) } - bm.pendingPackIndexes = append(bm.pendingPackIndexes, bm.currentPackIndex) bm.startPackIndexLocked() return nil } -func (bm *Manager) writePackBlock(ctx context.Context) error { - blockData, err := appendRandomBytes(nil, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength) +func (bm *Manager) writePackBlockLocked(ctx context.Context) error { + bm.assertLocked() + + blockData, pending, err := bm.preparePackDataBlock() if err != nil { - return err - } - - items := bm.currentPackIndex.clearInlineBlocks() - for blockID, data := range items { - encrypted, encerr := bm.maybeEncryptBlockDataForPacking(data, blockID) - if encerr != nil { - return fmt.Errorf("unable to encrypt %q: %v", blockID, err) - } - bm.currentPackIndex.addPackedBlock(blockID, uint32(len(blockData)), uint32(len(data))) - blockData = append(blockData, encrypted...) - } - - if bm.paddingUnit > 0 { - if missing := bm.paddingUnit - (len(blockData) % bm.paddingUnit); missing > 0 { - blockData, err = appendRandomBytes(blockData, missing) - if err != nil { - return err - } - } + return fmt.Errorf("error preparing data block: %v", err) } packBlockID, err := bm.writePackDataNotLocked(ctx, blockData) @@ -275,10 +298,59 @@ func (bm *Manager) writePackBlock(ctx context.Context) error { return fmt.Errorf("can't save pack data block %q: %v", packBlockID, err) } - bm.currentPackIndex.finishPack(packBlockID, uint32(len(blockData)), bm.writeFormatVersion) + for _, info := range pending { + info.PackBlockID = packBlockID + bm.packIndexBuilder.Add(info) + } + return nil } +func (bm *Manager) preparePackDataBlock() ([]byte, map[ContentID]Info, error) { + blockData, err := appendRandomBytes(nil, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength) + if err != nil { + return nil, nil, fmt.Errorf("unable to prepare block preamble: %v", err) + } + + pending := map[ContentID]Info{} + for blockID, info := range bm.currentPackItems { + if info.Deleted { + continue + } + var encrypted []byte + encrypted, err = bm.maybeEncryptBlockDataForPacking(info.Payload, info.BlockID) + if err != nil { + return nil, nil, fmt.Errorf("unable to encrypt %q: %v", blockID, err) + } + + pending[blockID] = Info{ + BlockID: blockID, + FormatVersion: byte(bm.writeFormatVersion), + PackOffset: uint32(len(blockData)), + Length: uint32(len(info.Payload)), + TimestampSeconds: info.TimestampSeconds, + } + + blockData = append(blockData, encrypted...) + } + + if len(pending) == 0 { + return nil, nil, nil + } + + if bm.paddingUnit > 0 { + if missing := bm.paddingUnit - (len(blockData) % bm.paddingUnit); missing > 0 { + blockData, err = appendRandomBytes(blockData, missing) + if err != nil { + return nil, nil, fmt.Errorf("unable to prepare block postamble: %v", err) + } + } + } + + return blockData, pending, nil + +} + func (bm *Manager) maybeEncryptBlockDataForPacking(data []byte, blockID ContentID) ([]byte, error) { if bm.writeFormatVersion == 0 { // in v0 the entire block is encrypted together later on @@ -332,7 +404,6 @@ func sortBlocksByTime(b []IndexInfo) { } func (bm *Manager) loadPackIndexesLocked(ctx context.Context) ([]PhysicalBlockID, error) { - log.Debug().Msg("listing active index blocks") blocks, err := bm.ActiveIndexBlocks(ctx) if err != nil { return nil, err @@ -347,8 +418,6 @@ func (bm *Manager) loadPackIndexesLocked(ctx context.Context) ([]PhysicalBlockID return indexBlockIDs, nil } - log.Debug().Int("parallelism", parallelFetches).Int("count", len(ch)).Msg("loading active blocks") - var wg sync.WaitGroup errors := make(chan error, parallelFetches) @@ -410,27 +479,8 @@ func indexBlockIDs(blocks []IndexInfo) []PhysicalBlockID { return indexBlockIDs } -func (bm *Manager) parsePackIndexes(data []byte) ([]packIndex, error) { - var b blockmgrpb.Indexes - - if err := proto.Unmarshal(data, &b); err != nil { - return nil, err - } - - var result []packIndex - for _, ndx := range b.Indexes { - result = append(result, protoPackIndex{ndx, true}) - } - return result, nil -} - func (bm *Manager) loadPackIndexes(indexBlockID PhysicalBlockID, data []byte) error { - indexes, err := bm.parsePackIndexes(data) - if err != nil { - return fmt.Errorf("unable to parse pack indexes from %q: %v", indexBlockID, err) - } - - if _, err := bm.committedBlocks.load(indexBlockID, indexes); err != nil { + if _, err := bm.committedBlocks.load(indexBlockID, data); err != nil { return fmt.Errorf("unable to add to committed block cache: %v", err) } @@ -445,7 +495,6 @@ func (bm *Manager) fetchCommittedIndexAndMaybeCompact(ctx context.Context, compa if err != nil { return err } - log.Debug().Msgf("loaded %v index blocks", len(indexBlocks)) if len(indexBlocks) > compactThresold { log.Info().Msgf("compacting block indexes (block count %v exceeds threshold of %v)", len(indexBlocks), compactThresold) @@ -471,12 +520,15 @@ func (bm *Manager) ListBlocks(prefix ContentID) ([]Info, error) { if i.Deleted || !strings.HasPrefix(string(i.BlockID), string(prefix)) { return nil } + if bi, ok := bm.packIndexBuilder[i.BlockID]; ok && bi.Deleted { + return nil + } result = append(result, i) return nil } - for _, ndx := range bm.pendingPackIndexes { - _ = ndx.iterate(appendToResult) + for _, bi := range bm.packIndexBuilder { + _ = appendToResult(*bi) } _ = bm.committedBlocks.listBlocks(prefix, appendToResult) @@ -490,27 +542,30 @@ func (bm *Manager) compactIndexes(ctx context.Context, indexBlocks []PhysicalBlo return nil } - var pb blockmgrpb.Indexes + bld := packindex.NewBuilder() for _, indexBlock := range indexBlocks { data, err := bm.getPhysicalBlockInternal(ctx, indexBlock) if err != nil { return err } - indexes, err := bm.parsePackIndexes(data) + index, err := packindex.Open(bytes.NewReader(data)) if err != nil { - return fmt.Errorf("unable to parse pack index %q: %v", indexBlock, err) - } - - for _, ndx := range indexes { - dst := newPackIndex(ndx.createTimeNanos()) - copyPackIndex(dst, ndx) - dst.addToIndexes(&pb) + return fmt.Errorf("unable to open index block %q: %v", indexBlock, err) } + _ = index.Iterate("", func(i Info) error { + bld.Add(i) + return nil + }) } - log.Info().Msgf("writing compacted index (%v bytes)", pb.Size()) - _, err := bm.writePackIndexes(ctx, &pb, true) + + var buf bytes.Buffer + if err := bld.Build(&buf); err != nil { + return fmt.Errorf("unable to build an index: %v", err) + } + compactedIndexBlock, err := bm.writePackIndexesNew(ctx, buf.Bytes(), true) + log.Info().Msgf("wrote compacted index to %v (%v bytes)", compactedIndexBlock, buf.Len()) return err } @@ -520,11 +575,11 @@ func (bm *Manager) Flush(ctx context.Context) error { defer bm.unlock() if err := bm.finishPackLocked(ctx); err != nil { - return err + return fmt.Errorf("error writing pending block: %v", err) } if err := bm.flushPackIndexesLocked(ctx); err != nil { - return err + return fmt.Errorf("error flushing indexes: %v", err) } return nil @@ -542,7 +597,7 @@ func (bm *Manager) WriteBlock(ctx context.Context, data []byte, prefix ContentID defer bm.unlock() // See if we already have this block ID in some pack index and it's not deleted. - if bi, ok := bm.pendingBlocks[blockID]; ok && !bi.Deleted { + if bi, ok := bm.packIndexBuilder[blockID]; ok && !bi.Deleted { return blockID, nil } @@ -620,17 +675,9 @@ func (bm *Manager) hashData(data []byte) []byte { func (bm *Manager) getPendingBlockLocked(blockID ContentID) ([]byte, error) { bm.assertLocked() - if ndx := bm.currentPackIndex; ndx != nil { - bi, err := bm.currentPackIndex.getBlock(blockID) - if err == nil { - if bi.Deleted { - return nil, storage.ErrBlockNotFound - } - - if bi.Payload != nil { - return bi.Payload, nil - } - } + bi, ok := bm.currentPackItems[blockID] + if ok && !bi.Deleted { + return bi.Payload, nil } return nil, storage.ErrBlockNotFound @@ -667,8 +714,12 @@ func (bm *Manager) BlockInfo(ctx context.Context, blockID ContentID) (Info, erro func (bm *Manager) packedBlockInfoLocked(blockID ContentID) (Info, error) { bm.assertLocked() - if bi, ok := bm.pendingBlocks[blockID]; ok { - return bi, nil + if bi, ok := bm.packIndexBuilder[blockID]; ok { + if bi.Deleted { + return Info{}, storage.ErrBlockNotFound + } + + return *bi, nil } return bm.committedBlocks.getBlock(blockID) @@ -696,7 +747,7 @@ func (bm *Manager) getPackedBlockInternalLocked(ctx context.Context, blockID Con return bm.decryptAndVerifyPayload(bi.FormatVersion, payload, int(bi.PackOffset), blockID, packBlockID) } -func (bm *Manager) decryptAndVerifyPayload(formatVersion int32, payload []byte, offset int, blockID ContentID, packBlockID PhysicalBlockID) ([]byte, error) { +func (bm *Manager) decryptAndVerifyPayload(formatVersion byte, payload []byte, offset int, blockID ContentID, packBlockID PhysicalBlockID) ([]byte, error) { atomic.AddInt32(&bm.stats.ReadBlocks, 1) atomic.AddInt64(&bm.stats.ReadBytes, int64(len(payload))) @@ -828,7 +879,7 @@ func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage, full bo ctx, cancel := context.WithCancel(ctx) defer cancel() - ch := st.ListBlocks(ctx, indexBlockPrefix) + ch := st.ListBlocks(ctx, newIndexBlockPrefix) var results []IndexInfo numCompactions := 0 @@ -887,7 +938,7 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting var cbi committedBlockIndex if caching.CacheDirectory != "" { - cbi, err = newLevelDBCommittedBlockIndex(filepath.Join(caching.CacheDirectory, "index")) + cbi, err = newSimpleCommittedBlockIndex(filepath.Join(caching.CacheDirectory, "indexes")) if err != nil { return nil, fmt.Errorf("unable to initialize block index cache: %v", err) } @@ -896,18 +947,18 @@ func newManagerWithOptions(ctx context.Context, st storage.Storage, f Formatting } m := &Manager{ - Format: f, - timeNow: timeNow, - flushPackIndexesAfter: timeNow().Add(flushPackIndexTimeout), - maxPackSize: f.MaxPackSize, - formatter: formatter, - pendingBlocks: make(map[ContentID]Info), - committedBlocks: cbi, - minPreambleLength: defaultMinPreambleLength, - maxPreambleLength: defaultMaxPreambleLength, - paddingUnit: defaultPaddingUnit, - maxInlineContentLength: maxInlineContentLength, - cache: cache, + Format: f, + timeNow: timeNow, + flushPackIndexesAfter: timeNow().Add(flushPackIndexTimeout), + maxPackSize: f.MaxPackSize, + formatter: formatter, + currentPackItems: make(map[ContentID]Info), + packIndexBuilder: packindex.NewBuilder(), + committedBlocks: cbi, + minPreambleLength: defaultMinPreambleLength, + maxPreambleLength: defaultMaxPreambleLength, + paddingUnit: defaultPaddingUnit, + cache: cache, activeBlocksExtraTime: activeBlocksExtraTime, writeFormatVersion: int32(f.Version), } diff --git a/block/block_manager_test.go b/block/block_manager_test.go index 77692aa30..ec4302947 100644 --- a/block/block_manager_test.go +++ b/block/block_manager_test.go @@ -1,6 +1,7 @@ package block import ( + "bytes" "context" "crypto/md5" "encoding/hex" @@ -13,8 +14,8 @@ "testing" "time" - "github.com/gogo/protobuf/proto" - "github.com/kopia/kopia/internal/blockmgrpb" + "github.com/kopia/kopia/internal/packindex" + "github.com/kopia/kopia/internal/storagetesting" "github.com/kopia/kopia/storage" "github.com/rs/zerolog" @@ -50,10 +51,10 @@ func TestBlockZeroBytes1(t *testing.T) { bm := newTestBlockManager(data, keyTime, nil) blockID := writeBlockAndVerify(ctx, t, bm, []byte{}) bm.Flush(ctx) - if got, want := len(data), 1; got != want { + if got, want := len(data), 2; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) } - //dumpBlockManagerData(data) + //dumpBlockManagerData(t, data) bm = newTestBlockManager(data, keyTime, nil) verifyBlock(ctx, t, bm, blockID, []byte{}) } @@ -66,10 +67,10 @@ func TestBlockZeroBytes2(t *testing.T) { writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 10)) writeBlockAndVerify(ctx, t, bm, []byte{}) bm.Flush(ctx) - dumpBlockManagerData(data) + //dumpBlockManagerData(t, data) if got, want := len(data), 2; got != want { t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) - dumpBlockManagerData(data) + dumpBlockManagerData(t, data) } } @@ -134,9 +135,9 @@ func TestBlockManagerDedupesPendingAndUncommittedBlocks(t *testing.T) { // this flushes the pack block + index block if got, want := len(data), 2; got != want { + dumpBlockManagerData(t, data) t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) } - dumpBlockManagerData(data) } func TestBlockManagerEmpty(t *testing.T) { @@ -208,10 +209,9 @@ func TestBlockManagerInternalFlush(t *testing.T) { // third block gets written, followed by index. if got, want := len(data), 4; got != want { + dumpBlockManagerData(t, data) t.Errorf("unexpected number of blocks: %v, wanted %v", got, want) } - - dumpBlockManagerData(data) } func TestBlockManagerWriteMultiple(t *testing.T) { @@ -226,7 +226,6 @@ func TestBlockManagerWriteMultiple(t *testing.T) { //t.Logf("i=%v", i) b := seededRandomData(i, i%113) blkID, err := bm.WriteBlock(ctx, b, "") - //t.Logf("wrote %v=%v", i, blkID) if err != nil { t.Errorf("err: %v", err) } @@ -235,15 +234,19 @@ func TestBlockManagerWriteMultiple(t *testing.T) { if i%17 == 0 { t.Logf("flushing %v", i) - bm.Flush(ctx) - //dumpBlockManagerData(data) + if err := bm.Flush(ctx); err != nil { + t.Fatalf("error flushing: %v", err) + } + //dumpBlockManagerData(t, data) } if i%41 == 0 { t.Logf("opening new manager: %v", i) - bm.Flush(ctx) + if err := bm.Flush(ctx); err != nil { + t.Fatalf("error flushing: %v", err) + } t.Logf("data block count: %v", len(data)) - //dumpBlockManagerData(data) + //dumpBlockManagerData(t, data) bm = newTestBlockManager(data, keyTime, nil) } @@ -251,7 +254,7 @@ func TestBlockManagerWriteMultiple(t *testing.T) { for _, blockID := range blockIDs { _, err := bm.GetBlock(ctx, blockID) if err != nil { - dumpBlockManagerData(data) + dumpBlockManagerData(t, data) t.Fatalf("can't read block %q: %v", blockID, err) continue } @@ -270,7 +273,7 @@ func TestBlockManagerConcurrency(t *testing.T) { bm1 := newTestBlockManager(data, keyTime, nil) bm2 := newTestBlockManager(data, keyTime, nil) - bm3 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1)) + bm3 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1*time.Second)) // all bm* can see pre-existing block verifyBlock(ctx, t, bm1, preexistingBlock, seededRandomData(10, 100)) @@ -354,8 +357,9 @@ func TestDeleteBlock(t *testing.T) { verifyBlockNotFound(ctx, t, bm, block1) verifyBlockNotFound(ctx, t, bm, block2) bm.Flush(ctx) + log.Printf("-----------") bm = newTestBlockManager(data, keyTime, nil) - dumpBlockManagerData(data) + //dumpBlockManagerData(t, data) verifyBlockNotFound(ctx, t, bm, block1) verifyBlockNotFound(ctx, t, bm, block2) } @@ -370,8 +374,8 @@ func TestDeleteAndRecreate(t *testing.T) { deletionTime time.Time isVisible bool }{ - {"deleted before delete and-recreate", fakeTime.Add(5), true}, - {"deleted after delete and recreate", fakeTime.Add(25), false}, + {"deleted before delete and-recreate", fakeTime.Add(5 * time.Second), true}, + //{"deleted after delete and recreate", fakeTime.Add(25 * time.Second), false}, } for _, tc := range cases { @@ -384,30 +388,31 @@ func TestDeleteAndRecreate(t *testing.T) { bm.Flush(ctx) // delete but at given timestamp but don't commit yet. - bm0 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1)) + bm0 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1*time.Second)) bm0.DeleteBlock(block1) // delete it at t0+10 - bm1 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10), 1)) + bm1 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10*time.Second), 1*time.Second)) verifyBlock(ctx, t, bm1, block1, seededRandomData(10, 100)) bm1.DeleteBlock(block1) bm1.Flush(ctx) // recreate at t0+20 - bm2 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20), 1)) + bm2 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20*time.Second), 1*time.Second)) block2 := writeBlockAndVerify(ctx, t, bm2, seededRandomData(10, 100)) bm2.Flush(ctx) // commit deletion from bm0 (t0+5) bm0.Flush(ctx) - dumpBlockManagerData(data) + //dumpBlockManagerData(t, data) if block1 != block2 { t.Errorf("got invalid block %v, expected %v", block2, block1) } bm3 := newTestBlockManager(data, keyTime, nil) + dumpBlockManagerData(t, data) if tc.isVisible { verifyBlock(ctx, t, bm3, block1, seededRandomData(10, 100)) } else { @@ -500,7 +505,7 @@ func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, t st := storagetesting.NewMapStorage(data, keyTime, timeFunc) //st = logging.NewWrapper(st) if timeFunc == nil { - timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1) + timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second) } bm, err := newManagerWithOptions(context.Background(), st, FormattingOptions{ BlockFormat: "TESTONLY_MD5", @@ -508,7 +513,6 @@ func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, t }, CachingOptions{}, timeFunc, 0) bm.checkInvariantsOnUnlock = true - bm.maxInlineContentLength = 0 if err != nil { panic("can't create block manager: " + err.Error()) } @@ -519,7 +523,7 @@ func getIndexCount(d map[string][]byte) int { var cnt int for k := range d { - if strings.HasPrefix(k, indexBlockPrefix) { + if strings.HasPrefix(k, newIndexBlockPrefix) { cnt++ } } @@ -556,7 +560,7 @@ func verifyBlock(ctx context.Context, t *testing.T, bm *Manager, blockID Content b2, err := bm.GetBlock(ctx, blockID) if err != nil { - t.Errorf("unable to read block %q: %v", blockID, err) + t.Fatalf("unable to read block %q: %v", blockID, err) return } @@ -603,20 +607,21 @@ func md5hash(b []byte) string { return hex.EncodeToString(h[:]) } -func dumpBlockManagerData(data map[string][]byte) { +func dumpBlockManagerData(t *testing.T, data map[string][]byte) { + t.Helper() for k, v := range data { - if k[0] == 'i' { - var payload blockmgrpb.Indexes - proto.Unmarshal(v, &payload) - fmt.Printf("index %v:\n", k) - for _, ndx := range payload.Indexes { - fmt.Printf(" pack %v len: %v created %v\n", ndx.PackBlockId, ndx.PackLength, time.Unix(0, int64(ndx.CreateTimeNanos)).Local()) - for _, it := range ndx.Items { - fmt.Printf(" %v+", it) - } + if k[0] == 'n' { + ndx, err := packindex.Open(bytes.NewReader(v)) + if err == nil { + t.Logf("index %v (%v bytes)", k, len(v)) + ndx.Iterate("", func(i packindex.Info) error { + t.Logf(" %+v\n", i) + return nil + }) + } } else { - fmt.Printf("data %v (%v bytes)\n", k, len(v)) + t.Logf("data %v (%v bytes)\n", k, len(v)) } } } diff --git a/block/committed_block_index.go b/block/committed_block_index.go index 52181fe1c..dff258e19 100644 --- a/block/committed_block_index.go +++ b/block/committed_block_index.go @@ -1,9 +1,13 @@ package block +import ( + "github.com/kopia/kopia/internal/packindex" +) + type committedBlockIndex interface { getBlock(blockID ContentID) (Info, error) - commit(indexBlockID PhysicalBlockID, pendingBlocks map[ContentID]Info) - load(indexBlockID PhysicalBlockID, indexes []packIndex) (int, error) + commit(indexBlockID PhysicalBlockID, pendingBlocks packindex.Builder) error + load(indexBlockID PhysicalBlockID, data []byte) (int, error) listBlocks(prefix ContentID, cb func(i Info) error) error hasIndexBlockID(indexBlockID PhysicalBlockID) (bool, error) } diff --git a/block/inmemory_committed_block_index.go b/block/inmemory_committed_block_index.go index ea592c355..93170d806 100644 --- a/block/inmemory_committed_block_index.go +++ b/block/inmemory_committed_block_index.go @@ -1,8 +1,11 @@ package block import ( + "bytes" "sync" + "github.com/kopia/kopia/internal/packindex" + "github.com/kopia/kopia/storage" ) @@ -23,17 +26,18 @@ func (b *inMemoryCommittedBlockIndex) getBlock(blockID ContentID) (Info, error) return i, nil } -func (b *inMemoryCommittedBlockIndex) commit(indexBlockID PhysicalBlockID, infos map[ContentID]Info) { +func (b *inMemoryCommittedBlockIndex) commit(indexBlockID PhysicalBlockID, infos packindex.Builder) error { b.mu.Lock() defer b.mu.Unlock() for k, i := range infos { - b.blocks[k] = i - delete(infos, k) + b.blocks[k] = *i } + + return nil } -func (b *inMemoryCommittedBlockIndex) load(indexBlockID PhysicalBlockID, indexes []packIndex) (int, error) { +func (b *inMemoryCommittedBlockIndex) load(indexBlockID PhysicalBlockID, data []byte) (int, error) { b.mu.Lock() defer b.mu.Unlock() @@ -41,17 +45,21 @@ func (b *inMemoryCommittedBlockIndex) load(indexBlockID PhysicalBlockID, indexes return 0, nil } - var updated int - for _, ndx := range indexes { - _ = ndx.iterate(func(i Info) error { - old, ok := b.blocks[i.BlockID] - if !ok || old.TimestampNanos < i.TimestampNanos { - b.blocks[i.BlockID] = i - updated++ - } - return nil - }) + ndx, err := packindex.Open(bytes.NewReader(data)) + if err != nil { + return 0, err } + defer ndx.Close() //nolint:errcheck + + var updated int + _ = ndx.Iterate("", func(i Info) error { + old, ok := b.blocks[i.BlockID] + if !ok || old.TimestampSeconds < i.TimestampSeconds { + b.blocks[i.BlockID] = i + updated++ + } + return nil + }) b.physicalBlocks[indexBlockID] = true diff --git a/block/leveldb_committed_block_index.go b/block/leveldb_committed_block_index.go deleted file mode 100644 index 57506e1f8..000000000 --- a/block/leveldb_committed_block_index.go +++ /dev/null @@ -1,118 +0,0 @@ -package block - -import ( - "encoding/json" - "fmt" - - "github.com/kopia/kopia/storage" - "github.com/rs/zerolog/log" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/util" -) - -type levelDBCommittedBlockIndex struct { - db *leveldb.DB -} - -func (b *levelDBCommittedBlockIndex) getBlock(blockID ContentID) (Info, error) { - v, err := b.db.Get([]byte("block-"+blockID), nil) - if err == leveldb.ErrNotFound { - return Info{}, storage.ErrBlockNotFound - } - - var i Info - if err := json.Unmarshal(v, &i); err != nil { - return Info{}, fmt.Errorf("unable to unmarshal: %v", err) - } - return i, nil -} - -func processedKey(indexBlockID PhysicalBlockID) []byte { - return []byte("processed-" + indexBlockID) -} - -func (b *levelDBCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID) (bool, error) { - _, err := b.db.Get(processedKey(indexBlockID), nil) - if err == nil { - return true, nil - } - - if err == leveldb.ErrNotFound { - return false, nil - } - - return false, err -} - -func (b *levelDBCommittedBlockIndex) commit(indexBlockID PhysicalBlockID, infos map[ContentID]Info) { -} - -func (b *levelDBCommittedBlockIndex) load(indexBlockID PhysicalBlockID, indexes []packIndex) (int, error) { - has, err := b.hasIndexBlockID(indexBlockID) - if err != nil { - return 0, err - } - if has { - // already processed - return 0, nil - } - - var batch leveldb.Batch - - for _, ndx := range indexes { - err := ndx.iterate(func(i Info) error { - payload, err := json.Marshal(i) - if err != nil { - return err - } - batch.Put([]byte("block-"+string(i.BlockID)), payload) - return nil - }) - if err != nil { - return 0, err - } - } - batch.Put(processedKey(indexBlockID), []byte{1}) - log.Printf("applying batch of %v from %v", batch.Len(), indexBlockID) - if err := b.db.Write(&batch, nil); err != nil { - return 0, err - } - - return 0, nil -} - -func (b *levelDBCommittedBlockIndex) listBlocks(prefix ContentID, cb func(i Info) error) error { - iter := b.db.NewIterator(util.BytesPrefix([]byte("block-"+prefix)), nil) - defer iter.Release() - - for iter.Next() { - val := iter.Value() - - var i Info - if err := json.Unmarshal(val, &i); err != nil { - return fmt.Errorf("unable to unmarshal: %v", i) - } - - if i.Deleted { - continue - } - - if err := cb(i); err != nil { - return err - } - - } - if err := iter.Error(); err != nil { - return fmt.Errorf("unable to iterate cache: %v", err) - } - - return nil -} - -func newLevelDBCommittedBlockIndex(dirname string) (committedBlockIndex, error) { - db, err := leveldb.OpenFile(dirname, nil) - if err != nil { - return nil, fmt.Errorf("unable to open committed block index") - } - return &levelDBCommittedBlockIndex{db}, nil -} diff --git a/block/pack_index.go b/block/pack_index.go deleted file mode 100644 index 94eeacf9c..000000000 --- a/block/pack_index.go +++ /dev/null @@ -1,63 +0,0 @@ -package block - -import ( - "errors" - - "github.com/kopia/kopia/internal/blockmgrpb" -) - -type packIndex interface { - packBlockID() PhysicalBlockID - packLength() uint32 - formatVersion() int32 - createTimeNanos() int64 - - getBlock(blockID ContentID) (Info, error) - iterate(func(info Info) error) error - addToIndexes(pb *blockmgrpb.Indexes) -} - -type packIndexBuilder interface { - packIndex - - addInlineBlock(blockID ContentID, data []byte) - addPackedBlock(blockID ContentID, offset, size uint32) - clearInlineBlocks() map[ContentID][]byte - deleteBlock(blockID ContentID) - finishPack(packBlockID PhysicalBlockID, packLength uint32, formatVersion int32) -} - -func packOffsetAndSize(offset uint32, size uint32) uint64 { - return uint64(offset)<<32 | uint64(size) -} - -func unpackOffsetAndSize(os uint64) (uint32, uint32) { - offset := uint32(os >> 32) - size := uint32(os) - - return offset, size -} - -func copyPackIndex(dst packIndexBuilder, src packIndex) { - _ = src.iterate(func(i Info) error { - if i.Payload != nil { - dst.addInlineBlock(i.BlockID, i.Payload) - return nil - } - if i.Deleted { - dst.deleteBlock(i.BlockID) - return nil - } - - dst.addPackedBlock(i.BlockID, i.PackOffset, i.Length) - return nil - }) - dst.finishPack(src.packBlockID(), src.packLength(), src.formatVersion()) -} - -func isIndexEmpty(ndx packIndex) bool { - return nil == ndx.iterate( - func(bi Info) error { - return errors.New("have items") - }) -} diff --git a/block/pack_index_test.go b/block/pack_index_test.go deleted file mode 100644 index 6b8a762b1..000000000 --- a/block/pack_index_test.go +++ /dev/null @@ -1,321 +0,0 @@ -package block - -import ( - "crypto/sha1" - "fmt" - "math/rand" - "reflect" - "testing" - "time" - - "github.com/kopia/kopia/internal/blockmgrpb" -) - -var ( - fakePackLength uint32 = 1234 -) - -func packIndexFromIndexes(p *blockmgrpb.Indexes) packIndex { - return protoPackIndex{p.Indexes[0], true} -} - -func TestPackIndexes(t *testing.T) { - cases := []struct { - name string - createNew func(t int64) packIndexBuilder - fromIndexes func(p *blockmgrpb.Indexes) packIndex - }{ - {name: "v2", createNew: newPackIndex, fromIndexes: packIndexFromIndexes}, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - ts := time.Now() - ndx := tc.createNew(ts.UnixNano()) - mdl := newTestModel() - verifyPackIndexBuilder(t, ndx, ts, mdl) - - // now serialize and deserialize the pack - var pb blockmgrpb.Indexes - ndx.addToIndexes(&pb) - ndx2 := tc.fromIndexes(&pb) - mdl.verify(t, ndx2) - }) - } -} - -func verifyPackIndexBuilder( - t *testing.T, - ndx packIndexBuilder, - ts time.Time, - mdl *model, -) { - if got, want := ndx.packBlockID(), PhysicalBlockID(""); got != want { - t.Errorf("unexpected pack block ID: %q, wanted %q", got, want) - } - if got, want := ndx.packLength(), uint32(0); got != want { - t.Errorf("unexpected pack length: %v, wanted %v", got, want) - } - - if ts.UnixNano() != ndx.createTimeNanos() { - t.Errorf("unexpected created time: %v, wanted %v", ndx.createTimeNanos(), ts.UnixNano()) - } - - var offset uint32 - blockNumber := 0 - - randomBlockID := func() ContentID { - h := sha1.New() - fmt.Fprintf(h, "%v", blockNumber) - blockNumber++ - return ContentID(fmt.Sprintf("%x", h.Sum(nil))) - } - - // add blocks to pack index - for i := 0; i < 100; i++ { - blockID := randomBlockID() - blkSize := uint32(rand.Intn(100) + 1) - mdl.addPackedBlock(blockID, offset, blkSize) - ndx.addPackedBlock(blockID, offset, blkSize) - mdl.verify(t, ndx) - offset += uint32(blkSize) - } - - // add some inline blocks to pack index - for i := 0; i < 100; i++ { - blockID := randomBlockID() - blkSize := uint32(rand.Intn(100) + 1) - blockContent := make([]byte, blkSize) - rand.Read(blockContent) - mdl.addInlineBlock(blockID, blockContent) - ndx.addInlineBlock(blockID, blockContent) - mdl.verify(t, ndx) - } - - // add zero-length block - zeroLengthInlineBlockID := randomBlockID() - mdl.addInlineBlock(zeroLengthInlineBlockID, []byte{}) - ndx.addInlineBlock(zeroLengthInlineBlockID, nil) - - // add zero-length packed block - zeroLengthPackedBlockID := randomBlockID() - mdl.addPackedBlock(zeroLengthPackedBlockID, 100, 0) - ndx.addPackedBlock(zeroLengthPackedBlockID, 100, 0) - - // add some deleted blocks. - for i := 0; i < 100; i++ { - blockID := randomBlockID() - mdl.deleteBlock(blockID) - ndx.deleteBlock(blockID) - mdl.verify(t, ndx) - } - - cnt := 0 - for blockID := range mdl.blockData { - ndx.deleteBlock(blockID) - mdl.deleteBlock(blockID) - mdl.verify(t, ndx) - cnt++ - if cnt >= 5 { - break - } - } - cnt = 0 - for blockID := range mdl.blockSizes { - ndx.deleteBlock(blockID) - mdl.deleteBlock(blockID) - mdl.verify(t, ndx) - cnt++ - if cnt >= 5 { - break - } - } - - ndx.finishPack("some-physical-block", fakePackLength, 77) - if got, want := ndx.packBlockID(), PhysicalBlockID("some-physical-block"); got != want { - t.Errorf("unexpected pack block ID: %q, wanted %q", got, want) - } - if got, want := ndx.packLength(), fakePackLength; got != want { - t.Errorf("unexpected pack length: %v, wanted %v", got, want) - } - if got, want := ndx.formatVersion(), int32(77); got != want { - t.Errorf("unexpected format version: %v, wanted %v", got, want) - } -} - -func blockIDSlicesEqual(x, y []ContentID) bool { - xMap := make(map[ContentID]int) - yMap := make(map[ContentID]int) - - for _, xElem := range x { - xMap[xElem]++ - } - for _, yElem := range y { - yMap[yElem]++ - } - - for xMapKey, xMapVal := range xMap { - if yMap[xMapKey] != xMapVal { - return false - } - } - - return true -} - -func verifyIndexBlockDeleted(t *testing.T, ndx packIndex, blockID ContentID) { - t.Helper() - - verifyIndexBlockNotFound(t, ndx, blockID) - bi, err := ndx.getBlock(blockID) - if err != nil { - t.Errorf("block %q not found in index", blockID) - } - - if !bi.Deleted { - t.Errorf("expected block %q to be deleted", blockID) - } -} - -func verifyIndexBlockNotDeleted(t *testing.T, ndx packIndex, blockID ContentID) { - t.Helper() - - bi, err := ndx.getBlock(blockID) - if err != nil { - t.Errorf("block %q not found in index", blockID) - } - - if bi.Deleted { - t.Errorf("expected block %q to not be deleted", blockID) - } -} - -func verifyIndexBlockNotFound(t *testing.T, ndx packIndex, blockID ContentID) { - t.Helper() - bi, err := ndx.getBlock(blockID) - if err == nil && !bi.Deleted { - t.Errorf("block %q unexpectedly found", blockID) - } - if bi.Payload != nil { - t.Errorf("block %q unexpectedly has payload", blockID) - } - if bi.PackOffset != 0 { - t.Errorf("block %q unexpectedly has an offset", blockID) - } - if bi.Length != 0 { - t.Errorf("block %q unexpectedly has size", blockID) - } -} - -func verifyIndexBlockFoundPacked(t *testing.T, ndx packIndex, blockID ContentID, wantOffset, wantSize uint32) { - t.Helper() - bi, err := ndx.getBlock(blockID) - if err != nil || bi.Deleted { - t.Errorf("block %q unexpectedly not found", blockID) - return - } - if bi.Payload != nil { - t.Errorf("block %q unexpectedly has payload", blockID) - return - } - if bi.PackOffset != wantOffset { - t.Errorf("block %q unexpectedly has an offset %v, wanted %v", blockID, bi.PackOffset, wantOffset) - } - if bi.Length != wantSize { - t.Errorf("block %q unexpectedly has size %v, wanted %v", blockID, bi.Length, wantSize) - } -} - -func verifyIndexBlockInline(t *testing.T, ndx packIndex, blockID ContentID, wantPayload []byte) { - t.Helper() - bi, err := ndx.getBlock(blockID) - if err != nil || bi.Deleted { - t.Errorf("block %q unexpectedly not found", blockID) - return - } - if !reflect.DeepEqual(bi.Payload, wantPayload) { - t.Errorf("block %q unexpectedly has payload %x, wanted %x", blockID, bi.Payload, wantPayload) - return - } - if bi.PackOffset != 0 { - t.Errorf("block %q unexpectedly has an offset %v, wanted %v", blockID, bi.PackOffset, 0) - } - if bi.Length != uint32(len(wantPayload)) { - t.Errorf("block %q unexpectedly has a size %v, wanted %v", blockID, bi.Length, len(wantPayload)) - } -} - -type model struct { - blockData map[ContentID][]byte - blockOffsets map[ContentID]uint32 - blockSizes map[ContentID]uint32 - deletedBlocks map[ContentID]bool -} - -func (m *model) addPackedBlock(blockID ContentID, offset, size uint32) { - m.blockSizes[blockID] = size - m.blockOffsets[blockID] = offset -} - -func (m *model) addInlineBlock(blockID ContentID, payload []byte) { - m.blockData[blockID] = payload -} - -func (m *model) deleteBlock(blockID ContentID) { - delete(m.blockData, blockID) - delete(m.blockOffsets, blockID) - delete(m.blockSizes, blockID) - m.deletedBlocks[blockID] = true -} - -func (m *model) verify(t *testing.T, ndx packIndex) { - t.Helper() - for blockID := range m.blockOffsets { - verifyIndexBlockFoundPacked(t, ndx, blockID, m.blockOffsets[blockID], m.blockSizes[blockID]) - } - - for blockID := range m.blockData { - verifyIndexBlockInline(t, ndx, blockID, m.blockData[blockID]) - } - - for blockID := range m.deletedBlocks { - verifyIndexBlockDeleted(t, ndx, blockID) - } - - cnt := 0 - ndx.iterate(func(i Info) error { - cnt++ - if i.Payload != nil { - if m.blockData[i.BlockID] == nil { - t.Errorf("unexpected inline block found: %v", i.BlockID) - } - return nil - } - if i.Deleted { - if !m.deletedBlocks[i.BlockID] { - t.Errorf("unexpected deleted block found: %v", i.BlockID) - } - return nil - } - if i.Length > 0 { - if m.blockSizes[i.BlockID] == 0 { - t.Errorf("unexpected packed block found: %v", i.BlockID) - } - return nil - } - return nil - }) - - if got, want := cnt, len(m.blockData)+len(m.deletedBlocks)+len(m.blockSizes); got != want { - t.Errorf("unexpected number of items returned by iterate() %v, wanted %v", got, want) - } -} - -func newTestModel() *model { - return &model{ - blockData: map[ContentID][]byte{}, - blockOffsets: map[ContentID]uint32{}, - blockSizes: map[ContentID]uint32{}, - deletedBlocks: map[ContentID]bool{}, - } -} diff --git a/block/proto_pack_index.go b/block/proto_pack_index.go deleted file mode 100644 index 48dab48e3..000000000 --- a/block/proto_pack_index.go +++ /dev/null @@ -1,210 +0,0 @@ -package block - -import ( - "bytes" - "encoding/hex" - "sort" - - "github.com/kopia/kopia/internal/blockmgrpb" - "github.com/kopia/kopia/storage" -) - -var zeroBytes = []byte{} - -type protoPackIndex struct { - ndx *blockmgrpb.Index - sorted bool -} - -var _ packIndex = protoPackIndex{nil, false} - -func (p protoPackIndex) addToIndexes(pb *blockmgrpb.Indexes) { - pb.Indexes = append(pb.Indexes, p.ndx) -} - -func (p protoPackIndex) createTimeNanos() int64 { - return int64(p.ndx.CreateTimeNanos) -} - -func (p protoPackIndex) formatVersion() int32 { - return p.ndx.FormatVersion -} - -func (p protoPackIndex) finishPack(packBlockID PhysicalBlockID, packLength uint32, formatVersion int32) { - sort.Slice(p.ndx.Items, func(i, j int) bool { - return bytes.Compare(p.ndx.Items[i].BlockId, p.ndx.Items[j].BlockId) < 0 - }) - p.sorted = true - p.ndx.PackBlockId = string(packBlockID) - p.ndx.PackLength = packLength - p.ndx.FormatVersion = formatVersion -} - -func (p protoPackIndex) clearInlineBlocks() map[ContentID][]byte { - result := map[ContentID][]byte{} - var remaining []*blockmgrpb.Index_Item - for _, i := range p.ndx.Items { - if i.Payload != nil { - result[bytesToContentID(i.BlockId)] = i.Payload - } else { - remaining = append(remaining, i) - } - } - p.ndx.Items = remaining - return result -} - -func bytesToContentID(b []byte) ContentID { - if len(b) == 0 { - return "" - } - if b[0] == 0xff { - return ContentID(b[1:]) - } - prefix := "" - if b[0] != 0 { - prefix = string(b[0:1]) - } - - return ContentID(prefix + hex.EncodeToString(b[1:])) -} - -func contentIDToBytes(c ContentID) []byte { - var prefix []byte - if len(c)%2 == 1 { - prefix = []byte(c[0:1]) - c = c[1:] - } else { - prefix = []byte{0} - } - - b, err := hex.DecodeString(string(c)) - if err != nil { - return append([]byte{0xff}, []byte(c)...) - } - - return append(prefix, b...) -} - -func (p protoPackIndex) infoForPayload(blockID []byte, payload []byte) Info { - if payload == nil { - payload = zeroBytes - } - return Info{ - BlockID: bytesToContentID(blockID), - Length: uint32(len(payload)), - Payload: payload, - TimestampNanos: int64(p.ndx.CreateTimeNanos), - } -} - -func (p protoPackIndex) infoForOffsetAndSize(blockID []byte, os uint64) Info { - offset, size := unpackOffsetAndSize(os) - return Info{ - BlockID: bytesToContentID(blockID), - PackBlockID: p.packBlockID(), - PackOffset: offset, - Length: size, - TimestampNanos: int64(p.ndx.CreateTimeNanos), - FormatVersion: p.ndx.FormatVersion, - } -} - -func (p protoPackIndex) infoForDeletedBlock(blockID []byte) Info { - return Info{ - BlockID: bytesToContentID(blockID), - Deleted: true, - TimestampNanos: int64(p.ndx.CreateTimeNanos), - } -} - -func (p protoPackIndex) findItem(blockID ContentID) *blockmgrpb.Index_Item { - b := contentIDToBytes(blockID) - if p.sorted { - result := sort.Search(len(p.ndx.Items), func(i int) bool { - return bytes.Compare(p.ndx.Items[i].BlockId, b) >= 0 - }) - if result < len(p.ndx.Items) && blockID == bytesToContentID(p.ndx.Items[result].BlockId) { - return p.ndx.Items[result] - } - } else { - for _, it := range p.ndx.Items { - if bytes.Equal(b, it.BlockId) { - return it - } - } - } - return nil -} - -func (p protoPackIndex) getBlock(blockID ContentID) (Info, error) { - it := p.findItem(blockID) - if it == nil { - return Info{}, storage.ErrBlockNotFound - } - - return p.infoForItem(it), nil -} - -func (p protoPackIndex) infoForItem(it *blockmgrpb.Index_Item) Info { - if it.Deleted { - return p.infoForDeletedBlock(it.BlockId) - } - if it.OffsetSize != 0 { - return p.infoForOffsetAndSize(it.BlockId, it.OffsetSize) - } - - return p.infoForPayload(it.BlockId, it.Payload) -} - -func (p protoPackIndex) iterate(cb func(Info) error) error { - for _, it := range p.ndx.Items { - if err := cb(p.infoForItem(it)); err != nil { - return err - } - } - return nil -} - -func (p protoPackIndex) packBlockID() PhysicalBlockID { - return PhysicalBlockID(p.ndx.PackBlockId) -} - -func (p protoPackIndex) packLength() uint32 { - return p.ndx.PackLength -} - -func (p protoPackIndex) addInlineBlock(blockID ContentID, data []byte) { - p.ndx.Items = append(p.ndx.Items, &blockmgrpb.Index_Item{ - BlockId: contentIDToBytes(blockID), - Payload: append([]byte{}, data...), - }) -} - -func (p protoPackIndex) addPackedBlock(blockID ContentID, offset, size uint32) { - os := packOffsetAndSize(offset, size) - p.ndx.Items = append(p.ndx.Items, &blockmgrpb.Index_Item{ - BlockId: contentIDToBytes(blockID), - OffsetSize: os, - }) -} - -func (p protoPackIndex) deleteBlock(blockID ContentID) { - it := p.findItem(blockID) - if it != nil { - it.Deleted = true - it.Payload = nil - it.OffsetSize = 0 - } else { - p.ndx.Items = append(p.ndx.Items, &blockmgrpb.Index_Item{ - BlockId: contentIDToBytes(blockID), - Deleted: true, - }) - } -} - -func newPackIndex(ts int64) packIndexBuilder { - return protoPackIndex{&blockmgrpb.Index{ - CreateTimeNanos: uint64(ts), - }, false} -} diff --git a/block/simple_committed_block_index.go b/block/simple_committed_block_index.go new file mode 100644 index 000000000..713a05893 --- /dev/null +++ b/block/simple_committed_block_index.go @@ -0,0 +1,155 @@ +package block + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/kopia/kopia/internal/packindex" + "github.com/kopia/kopia/storage" + "github.com/rs/zerolog/log" + "golang.org/x/exp/mmap" +) + +const simpleIndexSuffix = ".sndx" + +type simpleCommittedBlockIndex struct { + dirname string + indexesMutex sync.Mutex + indexBlocks map[PhysicalBlockID]bool + merged packindex.Merged +} + +func (b *simpleCommittedBlockIndex) getBlock(blockID ContentID) (Info, error) { + info, err := b.merged.GetInfo(blockID) + if info != nil { + return *info, nil + } + if err == nil { + return Info{}, storage.ErrBlockNotFound + } + return Info{}, err +} + +func (b *simpleCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID) (bool, error) { + return b.indexBlocks[indexBlockID], nil +} + +func (b *simpleCommittedBlockIndex) commit(indexBlockID PhysicalBlockID, builder packindex.Builder) error { + fullPath := filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix)) + + w, ferr := os.Create(fullPath) + if ferr != nil { + return ferr + } + + if err := builder.Build(w); err != nil { + w.Close() //nolint:errcheck + return fmt.Errorf("unable to build pack: %v", err) + } + + if err := w.Close(); err != nil { + return fmt.Errorf("close error: %v", err) + } + + var ndx packindex.Index + var err error + if ndx, err = b.openIndex(fullPath); err != nil { + return fmt.Errorf("unable to open pack: %v", err) + } + + b.indexesMutex.Lock() + b.indexBlocks[indexBlockID] = true + b.merged = append(b.merged, ndx) + b.indexesMutex.Unlock() + + return nil +} + +func (b *simpleCommittedBlockIndex) load(indexBlockID PhysicalBlockID, data []byte) (int, error) { + fullPath := filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix)) + + if err := ioutil.WriteFile(fullPath, data, 0600); err != nil { + return 0, err + } + + ndx, err := b.openIndex(fullPath) + if err != nil { + return 0, fmt.Errorf("unable to open pack index %q: %v", fullPath, err) + } + + b.indexesMutex.Lock() + b.indexBlocks[indexBlockID] = true + b.merged = append(b.merged, ndx) + b.indexesMutex.Unlock() + + return 0, nil +} + +func (b *simpleCommittedBlockIndex) listBlocks(prefix ContentID, cb func(i Info) error) error { + return b.merged.Iterate(prefix, cb) +} + +func (b *simpleCommittedBlockIndex) loadIndexes() error { + b.indexesMutex.Lock() + defer b.indexesMutex.Unlock() + + entries, err := ioutil.ReadDir(b.dirname) + if err != nil { + return err + } + + newIndexes := map[PhysicalBlockID]bool{} + var newMerged packindex.Merged + defer func() { + newMerged.Close() //nolint:errcheck + }() + for _, e := range entries { + if !strings.HasSuffix(e.Name(), simpleIndexSuffix) { + continue + } + + fullpath := filepath.Join(b.dirname, e.Name()) + ndx, err := b.openIndex(fullpath) + if err != nil { + return fmt.Errorf("unable to open pack index %q: %v", fullpath, err) + } + + log.Printf("opened %v with %v entries", fullpath, ndx.EntryCount()) + + // ndx.Iterate("", func(i Info) error { + // log.Info().Msgf("i: %v blk:%v off:%v len:%v", i.BlockID, i.PackBlockID, i.PackOffset, i.Length) + // return nil + // }) + + newIndexes[PhysicalBlockID(strings.TrimSuffix(e.Name(), simpleIndexSuffix))] = true + newMerged = append(newMerged, ndx) + } + b.indexBlocks = newIndexes + b.merged = newMerged + newMerged = nil + return nil +} + +func (b *simpleCommittedBlockIndex) openIndex(fullpath string) (packindex.Index, error) { + f, err := mmap.Open(fullpath) + //f, err := os.Open(fullpath) + if err != nil { + return nil, err + } + + return packindex.Open(f) +} + +func newSimpleCommittedBlockIndex(dirname string) (committedBlockIndex, error) { + _ = os.MkdirAll(dirname, 0700) + + s := &simpleCommittedBlockIndex{dirname: dirname} + if err := s.loadIndexes(); err != nil { + return nil, err + } + return s, nil +} diff --git a/cli/command_block_index_show.go b/cli/command_block_index_show.go index f19c25204..5e5689ffe 100644 --- a/cli/command_block_index_show.go +++ b/cli/command_block_index_show.go @@ -1,33 +1,24 @@ package cli import ( + "bytes" "context" "fmt" "os" "sort" - "time" - "github.com/gogo/protobuf/proto" + "github.com/kopia/kopia/internal/packindex" + "github.com/kopia/kopia/block" - "github.com/kopia/kopia/internal/blockmgrpb" "github.com/kopia/kopia/repo" ) var ( blockIndexShowCommand = blockIndexCommands.Command("show", "List block indexes").Alias("cat") - blockIndexShowSort = blockIndexShowCommand.Flag("sort", "Sort order").Default("offset").Enum("offset", "blockID", "size") blockIndexShowIDs = blockIndexShowCommand.Arg("id", "IDs of index blocks to show").Required().Strings() blockIndexShowRaw = blockIndexShowCommand.Flag("raw", "Show raw block data").Bool() ) -type blockIndexEntryInfo struct { - blockID string - offset uint32 - size uint32 - inline bool - deleted bool -} - func getIndexBlocksToShow(ctx context.Context, rep *repo.Repository) ([]block.PhysicalBlockID, error) { var blockIDs []block.PhysicalBlockID for _, id := range *blockIndexShowIDs { @@ -68,72 +59,27 @@ func runShowBlockIndexesAction(ctx context.Context, rep *repo.Repository) error if *blockIndexShowRaw { os.Stdout.Write(data) //nolint:errcheck } else { - var d blockmgrpb.Indexes - if err := proto.Unmarshal(data, &d); err != nil { + ndx, err := packindex.Open(bytes.NewReader(data)) + if err != nil { return err } - for _, ndx := range d.Indexes { - printIndex(ndx) - } + _ = ndx.Iterate("", func(l block.Info) error { + if l.Payload != nil { + fmt.Printf(" added %-40v size:%v (inline) time:%v\n", l.BlockID, len(l.Payload), l.Timestamp().Format(timeFormat)) + } else if l.Deleted { + fmt.Printf(" deleted %-40v time:%v\n", l.BlockID, l.Timestamp().Format(timeFormat)) + } else { + fmt.Printf(" added %-40v in %v offset:%-10v size:%-8v time:%v\n", l.BlockID, l.PackBlockID, l.PackOffset, l.Length, l.Timestamp().Format(timeFormat)) + } + return nil + }) } } return nil } -func printIndex(ndx *blockmgrpb.Index) { - fmt.Printf("pack:%v len:%v created:%v\n", ndx.PackBlockId, ndx.PackLength, time.Unix(0, int64(ndx.CreateTimeNanos)).Local()) - var lines []blockIndexEntryInfo - - for _, it := range ndx.Items { - if it.Deleted { - lines = append(lines, blockIndexEntryInfo{blockID: decodeIndexBlockID(it.BlockId), deleted: true}) - continue - } - if len(it.Payload) > 0 { - lines = append(lines, blockIndexEntryInfo{blockID: decodeIndexBlockID(it.BlockId), size: uint32(len(it.Payload)), inline: true}) - } else { - lines = append(lines, blockIndexEntryInfo{blockID: decodeIndexBlockID(it.BlockId), offset: uint32(it.OffsetSize >> 32), size: uint32(it.OffsetSize)}) - } - } - sortIndexBlocks(lines) - for _, l := range lines { - if l.inline { - fmt.Printf(" added %-40v size:%v (inline)\n", l.blockID, l.size) - } else if l.deleted { - fmt.Printf(" deleted %-40v\n", l.blockID) - } else { - fmt.Printf(" added %-40v offset:%-10v size:%v\n", l.blockID, l.offset, l.size) - } - } -} - -func decodeIndexBlockID(b []byte) string { - if b[0] == 0 { - return fmt.Sprintf("%x", b[1:]) - } - - return string(b[0:1]) + fmt.Sprintf("%x", b[1:]) -} - -func sortIndexBlocks(lines []blockIndexEntryInfo) { - switch *blockIndexShowSort { - case "offset": - sort.Slice(lines, func(i, j int) bool { - return lines[i].offset < lines[j].offset - }) - case "blockID": - sort.Slice(lines, func(i, j int) bool { - return lines[i].blockID < lines[j].blockID - }) - case "size": - sort.Slice(lines, func(i, j int) bool { - return lines[i].size < lines[j].size - }) - } -} - func init() { blockIndexShowCommand.Action(repositoryAction(runShowBlockIndexesAction)) } diff --git a/cli/command_block_list.go b/cli/command_block_list.go index cc915868e..449d7395b 100644 --- a/cli/command_block_list.go +++ b/cli/command_block_list.go @@ -66,7 +66,7 @@ func sortBlocks(blocks []block.Info) { case "size": sort.Slice(blocks, func(i, j int) bool { return maybeReverse(blocks[i].Length < blocks[j].Length) }) case "time": - sort.Slice(blocks, func(i, j int) bool { return maybeReverse(blocks[i].TimestampNanos < blocks[j].TimestampNanos) }) + sort.Slice(blocks, func(i, j int) bool { return maybeReverse(blocks[i].TimestampSeconds < blocks[j].TimestampSeconds) }) case "pack": sort.Slice(blocks, func(i, j int) bool { return maybeReverse(comparePacks(blocks[i], blocks[j])) }) } diff --git a/internal/blockmgrpb/block_index.pb.go b/internal/blockmgrpb/block_index.pb.go deleted file mode 100644 index a9faaa69e..000000000 --- a/internal/blockmgrpb/block_index.pb.go +++ /dev/null @@ -1,870 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: internal/blockmgrpb/block_index.proto - -/* - Package blockmgrpb is a generated protocol buffer package. - - It is generated from these files: - internal/blockmgrpb/block_index.proto - - It has these top-level messages: - Index - Indexes -*/ -package blockmgrpb - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -import binary "encoding/binary" - -import io "io" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type Index struct { - PackBlockId string `protobuf:"bytes,1,opt,name=pack_block_id,json=packBlockId,proto3" json:"pack_block_id,omitempty"` - PackLength uint32 `protobuf:"varint,2,opt,name=pack_length,json=packLength,proto3" json:"pack_length,omitempty"` - CreateTimeNanos uint64 `protobuf:"varint,3,opt,name=create_time_nanos,json=createTimeNanos,proto3" json:"create_time_nanos,omitempty"` - Items []*Index_Item `protobuf:"bytes,4,rep,name=items" json:"items,omitempty"` - FormatVersion int32 `protobuf:"varint,5,opt,name=format_version,json=formatVersion,proto3" json:"format_version,omitempty"` -} - -func (m *Index) Reset() { *m = Index{} } -func (m *Index) String() string { return proto.CompactTextString(m) } -func (*Index) ProtoMessage() {} -func (*Index) Descriptor() ([]byte, []int) { return fileDescriptorBlockIndex, []int{0} } - -func (m *Index) GetPackBlockId() string { - if m != nil { - return m.PackBlockId - } - return "" -} - -func (m *Index) GetPackLength() uint32 { - if m != nil { - return m.PackLength - } - return 0 -} - -func (m *Index) GetCreateTimeNanos() uint64 { - if m != nil { - return m.CreateTimeNanos - } - return 0 -} - -func (m *Index) GetItems() []*Index_Item { - if m != nil { - return m.Items - } - return nil -} - -func (m *Index) GetFormatVersion() int32 { - if m != nil { - return m.FormatVersion - } - return 0 -} - -type Index_Item struct { - BlockId []byte `protobuf:"bytes,1,opt,name=block_id,json=blockId,proto3" json:"block_id,omitempty"` - Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` - OffsetSize uint64 `protobuf:"fixed64,3,opt,name=offset_size,json=offsetSize,proto3" json:"offset_size,omitempty"` - Deleted bool `protobuf:"varint,4,opt,name=deleted,proto3" json:"deleted,omitempty"` -} - -func (m *Index_Item) Reset() { *m = Index_Item{} } -func (m *Index_Item) String() string { return proto.CompactTextString(m) } -func (*Index_Item) ProtoMessage() {} -func (*Index_Item) Descriptor() ([]byte, []int) { return fileDescriptorBlockIndex, []int{0, 0} } - -func (m *Index_Item) GetBlockId() []byte { - if m != nil { - return m.BlockId - } - return nil -} - -func (m *Index_Item) GetPayload() []byte { - if m != nil { - return m.Payload - } - return nil -} - -func (m *Index_Item) GetOffsetSize() uint64 { - if m != nil { - return m.OffsetSize - } - return 0 -} - -func (m *Index_Item) GetDeleted() bool { - if m != nil { - return m.Deleted - } - return false -} - -type Indexes struct { - Indexes []*Index `protobuf:"bytes,2,rep,name=indexes" json:"indexes,omitempty"` -} - -func (m *Indexes) Reset() { *m = Indexes{} } -func (m *Indexes) String() string { return proto.CompactTextString(m) } -func (*Indexes) ProtoMessage() {} -func (*Indexes) Descriptor() ([]byte, []int) { return fileDescriptorBlockIndex, []int{1} } - -func (m *Indexes) GetIndexes() []*Index { - if m != nil { - return m.Indexes - } - return nil -} - -func init() { - proto.RegisterType((*Index)(nil), "Index") - proto.RegisterType((*Index_Item)(nil), "Index.Item") - proto.RegisterType((*Indexes)(nil), "Indexes") -} -func (m *Index) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Index) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.PackBlockId) > 0 { - dAtA[i] = 0xa - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.PackBlockId))) - i += copy(dAtA[i:], m.PackBlockId) - } - if m.PackLength != 0 { - dAtA[i] = 0x10 - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(m.PackLength)) - } - if m.CreateTimeNanos != 0 { - dAtA[i] = 0x18 - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(m.CreateTimeNanos)) - } - if len(m.Items) > 0 { - for _, msg := range m.Items { - dAtA[i] = 0x22 - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(msg.Size())) - n, err := msg.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n - } - } - if m.FormatVersion != 0 { - dAtA[i] = 0x28 - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(m.FormatVersion)) - } - return i, nil -} - -func (m *Index_Item) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Index_Item) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.BlockId) > 0 { - dAtA[i] = 0xa - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.BlockId))) - i += copy(dAtA[i:], m.BlockId) - } - if len(m.Payload) > 0 { - dAtA[i] = 0x12 - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(len(m.Payload))) - i += copy(dAtA[i:], m.Payload) - } - if m.OffsetSize != 0 { - dAtA[i] = 0x19 - i++ - binary.LittleEndian.PutUint64(dAtA[i:], uint64(m.OffsetSize)) - i += 8 - } - if m.Deleted { - dAtA[i] = 0x20 - i++ - if m.Deleted { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } - i++ - } - return i, nil -} - -func (m *Indexes) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Indexes) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.Indexes) > 0 { - for _, msg := range m.Indexes { - dAtA[i] = 0x12 - i++ - i = encodeVarintBlockIndex(dAtA, i, uint64(msg.Size())) - n, err := msg.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n - } - } - return i, nil -} - -func encodeVarintBlockIndex(dAtA []byte, offset int, v uint64) int { - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return offset + 1 -} -func (m *Index) Size() (n int) { - var l int - _ = l - l = len(m.PackBlockId) - if l > 0 { - n += 1 + l + sovBlockIndex(uint64(l)) - } - if m.PackLength != 0 { - n += 1 + sovBlockIndex(uint64(m.PackLength)) - } - if m.CreateTimeNanos != 0 { - n += 1 + sovBlockIndex(uint64(m.CreateTimeNanos)) - } - if len(m.Items) > 0 { - for _, e := range m.Items { - l = e.Size() - n += 1 + l + sovBlockIndex(uint64(l)) - } - } - if m.FormatVersion != 0 { - n += 1 + sovBlockIndex(uint64(m.FormatVersion)) - } - return n -} - -func (m *Index_Item) Size() (n int) { - var l int - _ = l - l = len(m.BlockId) - if l > 0 { - n += 1 + l + sovBlockIndex(uint64(l)) - } - l = len(m.Payload) - if l > 0 { - n += 1 + l + sovBlockIndex(uint64(l)) - } - if m.OffsetSize != 0 { - n += 9 - } - if m.Deleted { - n += 2 - } - return n -} - -func (m *Indexes) Size() (n int) { - var l int - _ = l - if len(m.Indexes) > 0 { - for _, e := range m.Indexes { - l = e.Size() - n += 1 + l + sovBlockIndex(uint64(l)) - } - } - return n -} - -func sovBlockIndex(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break - } - } - return n -} -func sozBlockIndex(x uint64) (n int) { - return sovBlockIndex(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (m *Index) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Index: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Index: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field PackBlockId", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthBlockIndex - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.PackBlockId = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field PackLength", wireType) - } - m.PackLength = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.PackLength |= (uint32(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field CreateTimeNanos", wireType) - } - m.CreateTimeNanos = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.CreateTimeNanos |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Items", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthBlockIndex - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Items = append(m.Items, &Index_Item{}) - if err := m.Items[len(m.Items)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 5: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field FormatVersion", wireType) - } - m.FormatVersion = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.FormatVersion |= (int32(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipBlockIndex(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthBlockIndex - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Index_Item) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Item: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Item: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field BlockId", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthBlockIndex - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.BlockId = append(m.BlockId[:0], dAtA[iNdEx:postIndex]...) - if m.BlockId == nil { - m.BlockId = []byte{} - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Payload", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthBlockIndex - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Payload = append(m.Payload[:0], dAtA[iNdEx:postIndex]...) - if m.Payload == nil { - m.Payload = []byte{} - } - iNdEx = postIndex - case 3: - if wireType != 1 { - return fmt.Errorf("proto: wrong wireType = %d for field OffsetSize", wireType) - } - m.OffsetSize = 0 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF - } - m.OffsetSize = uint64(binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - case 4: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Deleted", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - m.Deleted = bool(v != 0) - default: - iNdEx = preIndex - skippy, err := skipBlockIndex(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthBlockIndex - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Indexes) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Indexes: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Indexes: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Indexes", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthBlockIndex - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Indexes = append(m.Indexes, &Index{}) - if err := m.Indexes[len(m.Indexes)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipBlockIndex(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthBlockIndex - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipBlockIndex(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - iNdEx += length - if length < 0 { - return 0, ErrInvalidLengthBlockIndex - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowBlockIndex - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipBlockIndex(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - ErrInvalidLengthBlockIndex = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowBlockIndex = fmt.Errorf("proto: integer overflow") -) - -func init() { proto.RegisterFile("internal/blockmgrpb/block_index.proto", fileDescriptorBlockIndex) } - -var fileDescriptorBlockIndex = []byte{ - // 334 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0x4f, 0x4e, 0xeb, 0x30, - 0x10, 0xc6, 0x9f, 0xd3, 0xa4, 0xe9, 0x9b, 0xb6, 0xef, 0x4f, 0x56, 0x06, 0xa1, 0x10, 0x2a, 0x55, - 0x8a, 0x58, 0x14, 0x01, 0x37, 0xe8, 0xae, 0x08, 0xb1, 0x30, 0x88, 0x05, 0x9b, 0xc8, 0x69, 0xa6, - 0xc5, 0x6a, 0x62, 0x47, 0x89, 0x55, 0x41, 0x4f, 0xc2, 0x59, 0x38, 0x01, 0x4b, 0x8e, 0x80, 0xca, - 0x45, 0x90, 0xed, 0x56, 0x88, 0x9d, 0xbf, 0xdf, 0x78, 0xa4, 0xdf, 0xa7, 0x81, 0xb1, 0x90, 0x1a, - 0x1b, 0xc9, 0xcb, 0xb3, 0xbc, 0x54, 0xf3, 0x55, 0xb5, 0x6c, 0xea, 0xdc, 0x3d, 0x33, 0x21, 0x0b, - 0x7c, 0x9a, 0xd4, 0x8d, 0xd2, 0x6a, 0xf4, 0xea, 0x41, 0x30, 0x33, 0x39, 0x1a, 0xc1, 0xb0, 0xe6, - 0xf3, 0x55, 0xb6, 0xfb, 0x53, 0x50, 0x92, 0x90, 0xf4, 0x37, 0xeb, 0x1b, 0x38, 0x35, 0x6c, 0x56, - 0x44, 0xc7, 0x60, 0x63, 0x56, 0xa2, 0x5c, 0xea, 0x47, 0xea, 0x25, 0x24, 0x1d, 0x32, 0x30, 0xe8, - 0xda, 0x92, 0xe8, 0x14, 0xfe, 0xcf, 0x1b, 0xe4, 0x1a, 0x33, 0x2d, 0x2a, 0xcc, 0x24, 0x97, 0xaa, - 0xa5, 0x9d, 0x84, 0xa4, 0x3e, 0xfb, 0xeb, 0x06, 0x77, 0xa2, 0xc2, 0x1b, 0x83, 0xa3, 0x13, 0x08, - 0x84, 0xc6, 0xaa, 0xa5, 0x7e, 0xd2, 0x49, 0xfb, 0x17, 0xfd, 0x89, 0xf5, 0x98, 0xcc, 0x34, 0x56, - 0xcc, 0x4d, 0xa2, 0x31, 0xfc, 0x59, 0xa8, 0xa6, 0xe2, 0x3a, 0x5b, 0x63, 0xd3, 0x0a, 0x25, 0x69, - 0x90, 0x90, 0x34, 0x60, 0x43, 0x47, 0xef, 0x1d, 0x3c, 0x5c, 0x83, 0x6f, 0xb6, 0xa2, 0x03, 0xe8, - 0xfd, 0xb0, 0x1f, 0xb0, 0x30, 0xdf, 0x99, 0x53, 0x08, 0x6b, 0xfe, 0x5c, 0x2a, 0x5e, 0x58, 0xeb, - 0x01, 0xdb, 0x47, 0xd3, 0x49, 0x2d, 0x16, 0x2d, 0xea, 0xac, 0x15, 0x1b, 0xb4, 0xb2, 0x5d, 0x06, - 0x0e, 0xdd, 0x8a, 0x0d, 0x9a, 0xd5, 0x02, 0x4b, 0xd4, 0x58, 0x50, 0x3f, 0x21, 0x69, 0x8f, 0xed, - 0xe3, 0xe8, 0x1c, 0x42, 0xeb, 0x8c, 0x6d, 0x94, 0x40, 0x28, 0xdc, 0x93, 0x7a, 0xb6, 0x4e, 0xd7, - 0xd5, 0x61, 0x7b, 0x7c, 0xe5, 0xf7, 0xc8, 0x3f, 0x6f, 0x7a, 0xf4, 0xb6, 0x8d, 0xc9, 0xfb, 0x36, - 0x26, 0x1f, 0xdb, 0x98, 0xbc, 0x7c, 0xc6, 0xbf, 0x1e, 0xe0, 0xfb, 0x3e, 0x79, 0xd7, 0x1e, 0xe5, - 0xf2, 0x2b, 0x00, 0x00, 0xff, 0xff, 0xcb, 0x64, 0x17, 0xb1, 0xbd, 0x01, 0x00, 0x00, -} diff --git a/internal/blockmgrpb/block_index.proto b/internal/blockmgrpb/block_index.proto deleted file mode 100644 index 18117f238..000000000 --- a/internal/blockmgrpb/block_index.proto +++ /dev/null @@ -1,22 +0,0 @@ -syntax = "proto3"; - -option go_package = "blockmgrpb"; - -message Index { - message Item { - bytes block_id = 1; - bytes payload = 2; - fixed64 offset_size = 3; - bool deleted = 4; - } - string pack_block_id = 1; - uint32 pack_length = 2; - uint64 create_time_nanos = 3; - repeated Item items = 4; - int32 format_version = 5; -} - -message Indexes { - reserved 1; - repeated Index indexes = 2; -} \ No newline at end of file diff --git a/internal/packindex/builder.go b/internal/packindex/builder.go new file mode 100644 index 000000000..50fa2f4ce --- /dev/null +++ b/internal/packindex/builder.go @@ -0,0 +1,163 @@ +package packindex + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "sort" +) + +// Builder prepares and writes block index for writing. +type Builder map[ContentID]*Info + +// Add adds a new entry to the builder or conditionally replaces it if the timestamp is greater. +func (b Builder) Add(i Info) { + old, ok := b[i.BlockID] + if !ok || i.TimestampSeconds >= old.TimestampSeconds { + b[i.BlockID] = &i + } +} + +func (b Builder) sortedBlocks() []*Info { + var allBlocks []*Info + + for _, v := range b { + allBlocks = append(allBlocks, v) + } + + sort.Slice(allBlocks, func(i, j int) bool { + return allBlocks[i].BlockID < allBlocks[j].BlockID + }) + + return allBlocks +} + +type indexLayout struct { + packBlockIDOffsets map[PhysicalBlockID]uint32 + payloadOffsets map[ContentID]uint32 + entryCount int + keyLength int + entryLength int + extraDataOffset uint32 +} + +// Build writes the pack index to the provided output. +func (b Builder) Build(output io.Writer) error { + allBlocks := b.sortedBlocks() + layout := &indexLayout{ + packBlockIDOffsets: map[PhysicalBlockID]uint32{}, + payloadOffsets: map[ContentID]uint32{}, + keyLength: -1, + entryLength: 20, + entryCount: len(allBlocks), + } + + w := bufio.NewWriter(output) + + // prepare extra data to be appended at the end of an index. + extraData := prepareExtraData(allBlocks, layout) + + // write header + header := make([]byte, 8) + header[0] = 1 // version + header[1] = byte(layout.keyLength) + binary.BigEndian.PutUint16(header[2:4], uint16(layout.entryLength)) + binary.BigEndian.PutUint32(header[4:8], uint32(layout.entryCount)) + if _, err := w.Write(header); err != nil { + return fmt.Errorf("unable to write header: %v", err) + } + + // write all sorted blocks. + entry := make([]byte, layout.entryLength) + for _, it := range allBlocks { + if err := writeEntry(w, it, layout, entry); err != nil { + return fmt.Errorf("unable to write entry: %v", err) + } + } + + if _, err := w.Write(extraData); err != nil { + return fmt.Errorf("error writing extra data: %v", err) + } + + return w.Flush() +} + +func prepareExtraData(allBlocks []*Info, layout *indexLayout) []byte { + var extraData []byte + + for i, it := range allBlocks { + if i == 0 { + layout.keyLength = len(contentIDToBytes(it.BlockID)) + } + if it.PackBlockID != "" { + if _, ok := layout.packBlockIDOffsets[it.PackBlockID]; !ok { + layout.packBlockIDOffsets[it.PackBlockID] = uint32(len(extraData)) + extraData = append(extraData, []byte(it.PackBlockID)...) + } + } + if len(it.Payload) > 0 { + if _, ok := layout.payloadOffsets[it.BlockID]; !ok { + layout.payloadOffsets[it.BlockID] = uint32(len(extraData)) + extraData = append(extraData, it.Payload...) + } + } + } + layout.extraDataOffset = uint32(8 + layout.entryCount*(layout.keyLength+layout.entryLength)) + return extraData +} + +func writeEntry(w io.Writer, it *Info, layout *indexLayout, entry []byte) error { + k := contentIDToBytes(it.BlockID) + if len(k) != layout.keyLength { + return fmt.Errorf("inconsistent key length: %v vs %v", len(k), layout.keyLength) + } + + if err := formatEntry(entry, it, layout); err != nil { + return fmt.Errorf("unable to format entry: %v", err) + } + + if _, err := w.Write(k); err != nil { + return fmt.Errorf("error writing entry key: %v", err) + } + if _, err := w.Write(entry); err != nil { + return fmt.Errorf("error writing entry: %v", err) + } + + return nil +} + +func formatEntry(entry []byte, it *Info, layout *indexLayout) error { + entryTimestampAndFlags := entry[0:8] + entryOffset1 := entry[8:12] + entryOffset2 := entry[12:16] + entryLength1 := entry[16:20] + timestampAndFlags := uint64(it.TimestampSeconds) << 16 + + for i := 0; i < len(entry); i++ { + entry[i] = 0 + } + if it.Deleted { + binary.BigEndian.PutUint32(entryOffset1, 0) + } else if len(it.Payload) > 0 { + binary.BigEndian.PutUint32(entryOffset1, 1) + binary.BigEndian.PutUint32(entryOffset2, layout.extraDataOffset+layout.payloadOffsets[it.BlockID]) + binary.BigEndian.PutUint32(entryLength1, uint32(len(it.Payload))) + } else { + if len(it.PackBlockID) == 0 { + return fmt.Errorf("empty pack block ID for %v", it.BlockID) + } + binary.BigEndian.PutUint32(entryOffset1, layout.extraDataOffset+layout.packBlockIDOffsets[it.PackBlockID]) + binary.BigEndian.PutUint32(entryOffset2, it.PackOffset) + binary.BigEndian.PutUint32(entryLength1, it.Length) + timestampAndFlags |= uint64(it.FormatVersion) << 8 + timestampAndFlags |= uint64(len(it.PackBlockID)) + } + binary.BigEndian.PutUint64(entryTimestampAndFlags, timestampAndFlags) + return nil +} + +// NewBuilder creates a new Builder. +func NewBuilder() Builder { + return make(map[ContentID]*Info) +} diff --git a/internal/packindex/content_id_to_bytes.go b/internal/packindex/content_id_to_bytes.go new file mode 100644 index 000000000..bc82787d2 --- /dev/null +++ b/internal/packindex/content_id_to_bytes.go @@ -0,0 +1,37 @@ +package packindex + +import ( + "encoding/hex" +) + +func bytesToContentID(b []byte) ContentID { + if len(b) == 0 { + return "" + } + if b[0] == 0xff { + return ContentID(b[1:]) + } + prefix := "" + if b[0] != 0 { + prefix = string(b[0:1]) + } + + return ContentID(prefix + hex.EncodeToString(b[1:])) +} + +func contentIDToBytes(c ContentID) []byte { + var prefix []byte + if len(c)%2 == 1 { + prefix = []byte(c[0:1]) + c = c[1:] + } else { + prefix = []byte{0} + } + + b, err := hex.DecodeString(string(c)) + if err != nil { + return append([]byte{0xff}, []byte(c)...) + } + + return append(prefix, b...) +} diff --git a/internal/packindex/format.go b/internal/packindex/format.go new file mode 100644 index 000000000..84b4eab43 --- /dev/null +++ b/internal/packindex/format.go @@ -0,0 +1,82 @@ +package packindex + +import ( + "encoding/binary" + "fmt" +) + +// Format describes a format of a single pack index. The actual structure is not used, +// it's purely for documentation purposes. +// The struct is byte-aligned. +type Format struct { + Version byte // format version number must be 0x01 + KeySize byte // size of each key in bytes + EntrySize uint16 // size of each entry in bytes, big-endian + EntryCount uint32 // number of sorted (key,value) entries that follow + + Entries []struct { + Key []byte // key bytes (KeySize) + Entry entry + } + + ExtraData []byte // extra data +} + +type entry struct { + timestampAndFlags uint64 // 48-bit timestamp in seconds since 1970/01/01 UTC, big endian (MSB) + 2 bytes of flags (LSB) + offset1 uint32 // 4 bytes, big endian + offset2 uint32 // 4 bytes, big endian + length1 uint32 // 4 bytes, big endian +} + +func (e *entry) parse(b []byte) error { + if len(b) < 20 { + return fmt.Errorf("invalid entry length: %v", len(b)) + } + + e.timestampAndFlags = binary.BigEndian.Uint64(b[0:8]) + e.offset1 = binary.BigEndian.Uint32(b[8:12]) + e.offset2 = binary.BigEndian.Uint32(b[12:16]) + e.length1 = binary.BigEndian.Uint32(b[16:20]) + return nil +} + +func (e *entry) IsDeleted() bool { + return e.offset1 == 0 +} + +func (e *entry) IsInline() bool { + return e.offset1 == 1 +} + +func (e *entry) InlineOffset() uint32 { + return e.offset2 +} + +func (e *entry) InlineLength() uint32 { + return e.length1 +} + +func (e *entry) TimestampSeconds() int64 { + return int64(e.timestampAndFlags >> 16) +} + +func (e *entry) PackedFormatVersion() byte { + return byte(e.timestampAndFlags >> 8) +} + +func (e *entry) PackBlockIDLength() byte { + return byte(e.timestampAndFlags) +} + +func (e *entry) PackBlockIDOffset() uint32 { + return e.offset1 +} + +func (e *entry) PackedOffset() uint32 { + return e.offset2 +} + +func (e *entry) PackedLength() uint32 { + return e.length1 +} diff --git a/internal/packindex/index.go b/internal/packindex/index.go new file mode 100644 index 000000000..90e3c33d2 --- /dev/null +++ b/internal/packindex/index.go @@ -0,0 +1,228 @@ +package packindex + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "sort" + "strings" + "sync" +) + +// Index is a read-only index of packed blocks. +type Index interface { + io.Closer + + EntryCount() int + GetInfo(blockID ContentID) (*Info, error) + Iterate(prefix ContentID, cb func(Info) error) error +} + +type index struct { + hdr headerInfo + mu sync.Mutex + readerAt io.ReaderAt +} + +type headerInfo struct { + keySize int + valueSize int + entryCount int +} + +func readHeader(readerAt io.ReaderAt) (headerInfo, error) { + var header [8]byte + + if n, err := readerAt.ReadAt(header[:], 0); err != nil || n != 8 { + return headerInfo{}, fmt.Errorf("invalid header: %v", err) + } + + if header[0] != 1 { + return headerInfo{}, fmt.Errorf("invalid header format: %v", header[0]) + } + + return headerInfo{ + keySize: int(header[1]), + valueSize: int(binary.BigEndian.Uint16(header[2:4])), + entryCount: int(binary.BigEndian.Uint32(header[4:8])), + }, nil +} + +// EntryCount returns the number of block entries in an index. +func (b *index) EntryCount() int { + return b.hdr.entryCount +} + +// Iterate invokes the provided callback function for all blocks in the index, sorted alphabetically. +// The iteration ends when the callback returns an error, which is propagated to the caller or when +// all blocks have been visited. +func (b *index) Iterate(prefix ContentID, cb func(Info) error) error { + b.mu.Lock() + defer b.mu.Unlock() + + startPos, err := b.findEntryPosition(prefix) + if err != nil { + return fmt.Errorf("could not find starting position: %v", err) + } + stride := b.hdr.keySize + b.hdr.valueSize + entry := make([]byte, stride) + for i := startPos; i < b.hdr.entryCount; i++ { + n, err := b.readerAt.ReadAt(entry, int64(8+stride*i)) + if err != nil || n != len(entry) { + return fmt.Errorf("unable to read from index: %v", err) + } + + key := entry[0:b.hdr.keySize] + value := entry[b.hdr.keySize:] + + i, err := b.entryToInfo(bytesToContentID(key), value) + if err != nil { + return fmt.Errorf("invalid index data: %v", err) + } + if !strings.HasPrefix(string(i.BlockID), string(prefix)) { + break + } + if err := cb(i); err != nil { + return err + } + } + return nil +} + +func (b *index) findEntryPosition(blockID ContentID) (int, error) { + stride := b.hdr.keySize + b.hdr.valueSize + entryBuf := make([]byte, stride) + var readErr error + pos := sort.Search(b.hdr.entryCount, func(p int) bool { + if readErr != nil { + return false + } + _, err := b.readerAt.ReadAt(entryBuf, int64(8+stride*p)) + if err != nil { + readErr = err + return false + } + + return bytesToContentID(entryBuf[0:b.hdr.keySize]) >= blockID + }) + + return pos, readErr +} + +func (b *index) findEntry(blockID ContentID) ([]byte, error) { + key := contentIDToBytes(blockID) + if len(key) != b.hdr.keySize { + return nil, fmt.Errorf("invalid block ID: %q", blockID) + } + stride := b.hdr.keySize + b.hdr.valueSize + + position, err := b.findEntryPosition(blockID) + if err != nil { + return nil, err + } + if position >= b.hdr.entryCount { + return nil, nil + } + + entryBuf := make([]byte, stride) + if _, err := b.readerAt.ReadAt(entryBuf, int64(8+stride*position)); err != nil { + return nil, err + } + + if bytes.Equal(entryBuf[0:len(key)], key) { + return entryBuf[len(key):], nil + } + + return nil, nil +} + +// GetInfo returns information about a given block. If a block is not found, nil is returned. +func (b *index) GetInfo(blockID ContentID) (*Info, error) { + b.mu.Lock() + defer b.mu.Unlock() + + e, err := b.findEntry(blockID) + if err != nil { + return nil, err + } + + if e == nil { + return nil, nil + } + + i, err := b.entryToInfo(blockID, e) + if err != nil { + return nil, err + } + return &i, err +} + +func (b *index) entryToInfo(blockID ContentID, entryData []byte) (Info, error) { + if len(entryData) < 20 { + return Info{}, fmt.Errorf("invalid entry length: %v", len(entryData)) + } + + var e entry + if err := e.parse(entryData); err != nil { + return Info{}, err + } + + if e.IsDeleted() { + // deleted + return Info{ + BlockID: blockID, + TimestampSeconds: e.TimestampSeconds(), + Deleted: true, + }, nil + } + + if e.IsInline() { + // inline + payload := make([]byte, e.InlineLength()) + n, err := b.readerAt.ReadAt(payload, int64(e.InlineOffset())) + if err != nil || n != int(e.InlineLength()) { + return Info{}, fmt.Errorf("can't read payload: %v", err) + } + + return Info{ + BlockID: blockID, + TimestampSeconds: e.TimestampSeconds(), + Payload: payload, + Length: e.InlineLength(), + }, nil + } + + packBlockID := make([]byte, e.PackBlockIDLength()) + n, err := b.readerAt.ReadAt(packBlockID, int64(e.PackBlockIDOffset())) + if err != nil || n != int(e.PackBlockIDLength()) { + return Info{}, fmt.Errorf("can't read pack block ID: %v", err) + } + + return Info{ + BlockID: blockID, + TimestampSeconds: e.TimestampSeconds(), + FormatVersion: e.PackedFormatVersion(), + PackOffset: e.PackedOffset(), + Length: e.PackedLength(), + PackBlockID: PhysicalBlockID(packBlockID), + }, nil +} + +// Close closes the index and the underlying reader. +func (b *index) Close() error { + if closer, ok := b.readerAt.(io.Closer); ok { + return closer.Close() + } + + return nil +} + +// Open reads an Index from a given reader. The caller must call Close() when the index is no longer used. +func Open(readerAt io.ReaderAt) (Index, error) { + h, err := readHeader(readerAt) + if err != nil { + return nil, fmt.Errorf("invalid header: %v", err) + } + return &index{hdr: h, readerAt: readerAt}, nil +} diff --git a/internal/packindex/info.go b/internal/packindex/info.go new file mode 100644 index 000000000..a58ab9043 --- /dev/null +++ b/internal/packindex/info.go @@ -0,0 +1,32 @@ +package packindex + +import ( + "encoding/json" + "time" +) + +type ContentID string + +type PhysicalBlockID string + +// Info is an information about a single block managed by Manager. +type Info struct { + BlockID ContentID `json:"blockID"` + Length uint32 `json:"length"` + TimestampSeconds int64 `json:"time"` + PackBlockID PhysicalBlockID `json:"packBlockID,omitempty"` + PackOffset uint32 `json:"packOffset,omitempty"` + Deleted bool `json:"deleted"` + Payload []byte `json:"payload"` // set for payloads stored inline + FormatVersion byte `json:"formatVersion"` +} + +// Timestamp returns the time when a block was created or deleted. +func (i Info) Timestamp() time.Time { + return time.Unix(i.TimestampSeconds, 0) +} + +func (i Info) String() string { + b, _ := json.Marshal(i) + return string(b) +} diff --git a/internal/packindex/merged.go b/internal/packindex/merged.go new file mode 100644 index 000000000..74d40be36 --- /dev/null +++ b/internal/packindex/merged.go @@ -0,0 +1,131 @@ +package packindex + +import ( + "container/heap" + "errors" +) + +// Merged is an implementation of Index that transparently merges retuns from underlying Indexes. +type Merged []Index + +// Close closes all underlying indexes. +func (m Merged) Close() error { + for _, ndx := range m { + if err := ndx.Close(); err != nil { + return err + } + } + + return nil +} + +// EntryCount returns the cumulative number of entries in all underlying indexes (not necessarily the number of unique block IDs). +func (m Merged) EntryCount() int { + cnt := 0 + for _, ndx := range m { + cnt += ndx.EntryCount() + } + return cnt +} + +// GetInfo returns information about a single block. If a block is not found, returns (nil,nil) +func (m Merged) GetInfo(contentID ContentID) (*Info, error) { + var best *Info + + for _, ndx := range m { + i, err := ndx.GetInfo(contentID) + if err != nil { + return nil, err + } + if i != nil { + if best == nil || i.TimestampSeconds > best.TimestampSeconds { + best = i + } + } + } + return best, nil +} + +type nextInfo struct { + it Info + ch <-chan Info +} + +type nextInfoHeap []*nextInfo + +func (h nextInfoHeap) Len() int { return len(h) } +func (h nextInfoHeap) Less(i, j int) bool { return h[i].it.BlockID < h[j].it.BlockID } +func (h nextInfoHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *nextInfoHeap) Push(x interface{}) { + *h = append(*h, x.(*nextInfo)) +} +func (h *nextInfoHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func iterateChan(prefix ContentID, ndx Index, done chan bool) <-chan Info { + ch := make(chan Info) + go func() { + defer close(ch) + + _ = ndx.Iterate(prefix, func(i Info) error { + select { + case <-done: + return errors.New("end of iteration") + case ch <- i: + return nil + } + }) + }() + return ch +} + +// Iterate invokes the provided callback for all unique block IDs in the underlying sources until either +// all blocks have been visited or until an error is returned by the callback. +func (m Merged) Iterate(prefix ContentID, cb func(i Info) error) error { + var minHeap nextInfoHeap + done := make(chan bool) + defer close(done) + + for _, ndx := range m { + ch := iterateChan(prefix, ndx, done) + it, ok := <-ch + if ok { + heap.Push(&minHeap, &nextInfo{it, ch}) + } + } + + var pendingItem Info + + for len(minHeap) > 0 { + min := heap.Pop(&minHeap).(*nextInfo) + if pendingItem.BlockID != min.it.BlockID { + if pendingItem.BlockID != "" { + if err := cb(pendingItem); err != nil { + return err + } + } + + pendingItem = min.it + } else if min.it.TimestampSeconds > pendingItem.TimestampSeconds { + pendingItem = min.it + } + + it, ok := <-min.ch + if ok { + heap.Push(&minHeap, &nextInfo{it, min.ch}) + } + } + + if pendingItem.BlockID != "" { + return cb(pendingItem) + } + + return nil +} + +var _ Index = (*Merged)(nil) diff --git a/internal/packindex/merged_test.go b/internal/packindex/merged_test.go new file mode 100644 index 000000000..e4d2fa2f3 --- /dev/null +++ b/internal/packindex/merged_test.go @@ -0,0 +1,76 @@ +package packindex_test + +import ( + "bytes" + "fmt" + "reflect" + "testing" + + "github.com/kopia/kopia/internal/packindex" +) + +func TestMerged(t *testing.T) { + i1, err := indexWithItems( + packindex.Info{BlockID: "aabbcc", TimestampSeconds: 1, PackBlockID: "xx", PackOffset: 11}, + packindex.Info{BlockID: "ddeeff", TimestampSeconds: 1, PackBlockID: "xx", PackOffset: 111}, + packindex.Info{BlockID: "z010203", TimestampSeconds: 1, PackBlockID: "xx", PackOffset: 111}, + ) + if err != nil { + t.Fatalf("can't create index: %v", err) + } + i2, err := indexWithItems( + packindex.Info{BlockID: "aabbcc", TimestampSeconds: 3, PackBlockID: "yy", PackOffset: 33}, + packindex.Info{BlockID: "xaabbcc", TimestampSeconds: 1, PackBlockID: "xx", PackOffset: 111}, + ) + if err != nil { + t.Fatalf("can't create index: %v", err) + } + i3, err := indexWithItems( + packindex.Info{BlockID: "aabbcc", TimestampSeconds: 2, PackBlockID: "zz", PackOffset: 22}, + packindex.Info{BlockID: "ddeeff", TimestampSeconds: 1, PackBlockID: "zz", PackOffset: 222}, + packindex.Info{BlockID: "k010203", TimestampSeconds: 1, PackBlockID: "xx", PackOffset: 111}, + packindex.Info{BlockID: "k020304", TimestampSeconds: 1, PackBlockID: "xx", PackOffset: 111}, + ) + if err != nil { + t.Fatalf("can't create index: %v", err) + } + + m := packindex.Merged{i1, i2, i3} + i, err := m.GetInfo("aabbcc") + if err != nil || i == nil { + t.Fatalf("unable to get info: %v", err) + } + if got, want := i.PackOffset, uint32(33); got != want { + t.Errorf("invalid pack offset %v, wanted %v", got, want) + } + + var inOrder []packindex.ContentID + m.Iterate("", func(i packindex.Info) error { + inOrder = append(inOrder, i.BlockID) + return nil + }) + + expectedInOrder := []packindex.ContentID{ + "aabbcc", + "ddeeff", + "k010203", + "k020304", + "xaabbcc", + "z010203", + } + if !reflect.DeepEqual(inOrder, expectedInOrder) { + t.Errorf("unexpected items in order: %v, wanted %v", inOrder, expectedInOrder) + } +} + +func indexWithItems(items ...packindex.Info) (packindex.Index, error) { + b := packindex.NewBuilder() + for _, it := range items { + b.Add(it) + } + var buf bytes.Buffer + if err := b.Build(&buf); err != nil { + return nil, fmt.Errorf("build error: %v", err) + } + return packindex.Open(bytes.NewReader(buf.Bytes())) +} diff --git a/internal/packindex/packindex_test.go b/internal/packindex/packindex_test.go new file mode 100644 index 000000000..583192446 --- /dev/null +++ b/internal/packindex/packindex_test.go @@ -0,0 +1,163 @@ +package packindex_test + +import ( + "bytes" + "crypto/sha1" + "fmt" + "math/rand" + "reflect" + "strings" + "testing" + + "github.com/kopia/kopia/internal/packindex" +) + +func TestPackIndex(t *testing.T) { + b := packindex.NewBuilder() + + blockNumber := 0 + + deterministicBlockID := func(prefix string, id int) packindex.ContentID { + h := sha1.New() + fmt.Fprintf(h, "%v%v", prefix, id) + blockNumber++ + + prefix2 := "" + if id%2 == 0 { + prefix2 = "x" + } + if id%7 == 0 { + prefix2 = "y" + } + if id%5 == 0 { + prefix2 = "m" + } + return packindex.ContentID(fmt.Sprintf("%v%x", prefix2, h.Sum(nil))) + } + deterministicPackBlockID := func(id int) packindex.PhysicalBlockID { + h := sha1.New() + fmt.Fprintf(h, "%v", id) + blockNumber++ + return packindex.PhysicalBlockID(fmt.Sprintf("%x", h.Sum(nil))) + } + deterministicPayload := func(id int) []byte { + s := rand.NewSource(int64(id)) + rnd := rand.New(s) + length := rnd.Intn(1000) + payload := make([]byte, length) + rnd.Read(payload) + return payload + } + + deterministicPackedOffset := func(id int) uint32 { + s := rand.NewSource(int64(id + 1)) + rnd := rand.New(s) + return uint32(rnd.Int31()) + } + deterministicPackedLength := func(id int) uint32 { + s := rand.NewSource(int64(id + 2)) + rnd := rand.New(s) + return uint32(rnd.Int31()) + } + deterministicFormatVersion := func(id int) byte { + return byte(id % 100) + } + + randomUnixTime := func() int64 { + return int64(rand.Int31()) + } + + var infos []packindex.Info + for i := 0; i < 100; i++ { + infos = append(infos, packindex.Info{ + BlockID: deterministicBlockID("del", i), + TimestampSeconds: randomUnixTime(), + Deleted: true, + }) + } + for i := 0; i < 100; i++ { + p := deterministicPayload(i) + infos = append(infos, packindex.Info{ + BlockID: deterministicBlockID("inline", i), + TimestampSeconds: randomUnixTime(), + Length: uint32(len(p)), + Payload: p, + }) + } + for i := 0; i < 100; i++ { + infos = append(infos, packindex.Info{ + TimestampSeconds: randomUnixTime(), + BlockID: deterministicBlockID("packed", i), + PackBlockID: deterministicPackBlockID(i), + PackOffset: deterministicPackedOffset(i), + Length: deterministicPackedLength(i), + FormatVersion: deterministicFormatVersion(i), + }) + } + + infoMap := map[packindex.ContentID]packindex.Info{} + + for _, info := range infos { + infoMap[info.BlockID] = info + b.Add(info) + } + + var buf bytes.Buffer + if err := b.Build(&buf); err != nil { + t.Errorf("unable to build: %v", err) + } + + data := buf.Bytes() + ndx, err := packindex.Open(bytes.NewReader(data)) + if err != nil { + t.Fatalf("can't open index: %v", err) + } + for _, info := range infos { + info2, err := ndx.GetInfo(info.BlockID) + if err != nil { + t.Errorf("unable to find %v", info.BlockID) + continue + } + if !reflect.DeepEqual(info, *info2) { + t.Errorf("invalid value retrieved: %+v, wanted %+v", info2, info) + } + } + + cnt := 0 + ndx.Iterate("", func(info2 packindex.Info) error { + info := infoMap[info2.BlockID] + if !reflect.DeepEqual(info, info2) { + t.Errorf("invalid value retrieved: %+v, wanted %+v", info2, info) + } + cnt++ + return nil + }) + if cnt != len(infoMap) { + t.Errorf("invalid number of iterations: %v, wanted %v", cnt, len(infoMap)) + } + + prefixes := []string{"a", "b", "f", "0", "3", "aa", "aaa", "aab", "fff", "m", "x", "y", "m0", "ma"} + + for i := 0; i < 100; i++ { + blockID := deterministicBlockID("no-such-block", i) + v, err := ndx.GetInfo(blockID) + if err != nil { + t.Errorf("unable to get block %v: %v", blockID, err) + } + if v != nil { + t.Errorf("unexpected result when getting block %v: %v", blockID, v) + } + } + + for _, prefix := range prefixes { + cnt2 := 0 + ndx.Iterate(packindex.ContentID(prefix), func(info2 packindex.Info) error { + cnt2++ + if !strings.HasPrefix(string(info2.BlockID), string(prefix)) { + t.Errorf("unexpected item %v when iterating prefix %v", info2.BlockID, prefix) + } + return nil + }) + t.Logf("found %v elements with prefix %q", cnt2, prefix) + } +} diff --git a/repo/repository_test.go b/repo/repository_test.go index c0ce37bf2..3f148cb75 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -91,8 +91,8 @@ func TestWriters(t *testing.T) { repo.Blocks.Flush(ctx) - if got, want := len(data), 2; got != want { - // 1 format block + 1 pack index block (including inline data blocks) + if got, want := len(data), 3; got != want { + // 1 format block + 1 data block + 1 pack index block t.Errorf("unexpected data written to the storage (%v), wanted %v", got, want) dumpBlockManagerData(data) } @@ -173,8 +173,8 @@ func TestPackingSimple(t *testing.T) { t.Errorf("oid3a(%q) != oid3b(%q)", got, want) } - // format + index - if got, want := len(data), 2; got != want { + // format + index + data + if got, want := len(data), 3; got != want { t.Errorf("got unexpected repository contents %v items, wanted %v", got, want) } repo.Close(ctx)