changed block.PhysicalBlockID to a string

This commit is contained in:
Jarek Kowalski
2018-06-07 18:42:22 -07:00
parent 6ee5c81e21
commit fc618b0c66
16 changed files with 71 additions and 77 deletions

View File

@@ -11,11 +11,11 @@
)
type blockCache interface {
getBlock(ctx context.Context, cacheKey string, physicalBlockID PhysicalBlockID, offset, length int64) ([]byte, error)
putBlock(ctx context.Context, blockID PhysicalBlockID, data []byte) error
getBlock(ctx context.Context, cacheKey string, physicalBlockID string, offset, length int64) ([]byte, error)
putBlock(ctx context.Context, blockID string, data []byte) error
listIndexBlocks(ctx context.Context) ([]IndexInfo, error)
deleteListCache(ctx context.Context)
deleteBlock(ctx context.Context, blockID PhysicalBlockID) error
deleteBlock(ctx context.Context, blockID string) error
close() error
}

View File

@@ -48,14 +48,11 @@
// Info is an information about a single block managed by Manager.
type Info = packindex.Info
// PhysicalBlockID identifies physical storage block.
type PhysicalBlockID = packindex.PhysicalBlockID
// IndexInfo is an information about a single index block managed by Manager.
type IndexInfo struct {
BlockID PhysicalBlockID `json:"blockID"`
Length int64 `json:"length"`
Timestamp time.Time `json:"time"`
FileName string
Length int64
Timestamp time.Time
}
// Manager manages storage blocks at a low level with encryption, deduplication and packaging.
@@ -281,7 +278,7 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error {
return nil
}
func (bm *Manager) writePackIndexesNew(ctx context.Context, data []byte) (PhysicalBlockID, error) {
func (bm *Manager) writePackIndexesNew(ctx context.Context, data []byte) (string, error) {
return bm.encryptAndWriteBlockNotLocked(ctx, data, newIndexBlockPrefix)
}
@@ -426,9 +423,9 @@ func (bm *Manager) loadPackIndexesLocked(ctx context.Context) ([]IndexInfo, erro
err = bm.tryLoadPackIndexBlocksLocked(ctx, blocks)
if err == nil {
var blockIDs []PhysicalBlockID
var blockIDs []string
for _, b := range blocks {
blockIDs = append(blockIDs, b.BlockID)
blockIDs = append(blockIDs, b.FileName)
}
if err = bm.committedBlocks.use(blockIDs); err != nil {
return nil, err
@@ -488,18 +485,18 @@ func (bm *Manager) tryLoadPackIndexBlocksLocked(ctx context.Context, blocks []In
}
// unprocessedIndexBlocks returns a closed channel filled with block IDs that are not in committedBlocks cache.
func (bm *Manager) unprocessedIndexBlocks(blocks []IndexInfo) (<-chan PhysicalBlockID, error) {
ch := make(chan PhysicalBlockID, len(blocks))
func (bm *Manager) unprocessedIndexBlocks(blocks []IndexInfo) (<-chan string, error) {
ch := make(chan string, len(blocks))
for _, block := range blocks {
has, err := bm.committedBlocks.hasIndexBlockID(block.BlockID)
has, err := bm.committedBlocks.hasIndexBlockID(block.FileName)
if err != nil {
return nil, err
}
if has {
log.Printf("index block %q already in cache, skipping", block.BlockID)
log.Printf("index block %q already in cache, skipping", block.FileName)
continue
}
ch <- block.BlockID
ch <- block.FileName
}
close(ch)
return ch, nil
@@ -631,7 +628,7 @@ func (bm *Manager) compactAndDeleteIndexBlocks(ctx context.Context, indexBlocks
bld := packindex.NewBuilder()
for _, indexBlock := range indexBlocks {
data, err := bm.getPhysicalBlockInternal(ctx, indexBlock.BlockID)
data, err := bm.getPhysicalBlockInternal(ctx, indexBlock.FileName)
if err != nil {
return err
}
@@ -660,12 +657,12 @@ func (bm *Manager) compactAndDeleteIndexBlocks(ctx context.Context, indexBlocks
log.Debug().Msgf("wrote compacted index (%v bytes) in %v", compactedIndexBlock, time.Since(t0))
for _, indexBlock := range indexBlocks {
if indexBlock.BlockID == compactedIndexBlock {
if indexBlock.FileName == compactedIndexBlock {
continue
}
if err := bm.cache.deleteBlock(ctx, indexBlock.BlockID); err != nil {
log.Warn().Msgf("unable to delete compacted block %q: %v", indexBlock.BlockID, err)
if err := bm.cache.deleteBlock(ctx, indexBlock.FileName); err != nil {
log.Warn().Msgf("unable to delete compacted block %q: %v", indexBlock.FileName, err)
}
}
@@ -747,13 +744,13 @@ func validatePrefix(prefix string) error {
return fmt.Errorf("invalid prefix, must be a empty or single letter between 'g' and 'z'")
}
func (bm *Manager) writePackDataNotLocked(ctx context.Context, data []byte) (PhysicalBlockID, error) {
func (bm *Manager) writePackDataNotLocked(ctx context.Context, data []byte) (string, error) {
blockID := make([]byte, 16)
if _, err := cryptorand.Read(blockID); err != nil {
return "", fmt.Errorf("unable to read crypto bytes: %v", err)
}
physicalBlockID := PhysicalBlockID(fmt.Sprintf("%v%x", PackBlockPrefix, blockID))
physicalBlockID := string(fmt.Sprintf("%v%x", PackBlockPrefix, blockID))
atomic.AddInt32(&bm.stats.WrittenBlocks, 1)
atomic.AddInt64(&bm.stats.WrittenBytes, int64(len(data)))
@@ -764,9 +761,9 @@ func (bm *Manager) writePackDataNotLocked(ctx context.Context, data []byte) (Phy
return physicalBlockID, nil
}
func (bm *Manager) encryptAndWriteBlockNotLocked(ctx context.Context, data []byte, prefix string) (PhysicalBlockID, error) {
func (bm *Manager) encryptAndWriteBlockNotLocked(ctx context.Context, data []byte, prefix string) (string, error) {
hash := bm.hashData(data)
physicalBlockID := PhysicalBlockID(prefix + hex.EncodeToString(hash))
physicalBlockID := string(prefix + hex.EncodeToString(hash))
// Encrypt the block in-place.
atomic.AddInt64(&bm.stats.EncryptedBytes, int64(len(data)))
@@ -817,7 +814,7 @@ func (bm *Manager) GetBlock(ctx context.Context, blockID string) ([]byte, error)
}
// GetIndexBlock gets the contents of a given index block. If the block is not found returns blob.ErrBlockNotFound.
func (bm *Manager) GetIndexBlock(ctx context.Context, blockID PhysicalBlockID) ([]byte, error) {
func (bm *Manager) GetIndexBlock(ctx context.Context, blockID string) ([]byte, error) {
bm.lock()
defer bm.unlock()
@@ -877,7 +874,7 @@ func (bm *Manager) getPackedBlockInternalLocked(ctx context.Context, blockID str
return d, bi.Deleted, nil
}
func (bm *Manager) decryptAndVerifyPayload(formatVersion byte, payload []byte, offset int, blockID string, packFile PhysicalBlockID) ([]byte, error) {
func (bm *Manager) decryptAndVerifyPayload(formatVersion byte, payload []byte, offset int, blockID string, packFile string) ([]byte, error) {
atomic.AddInt32(&bm.stats.ReadBlocks, 1)
atomic.AddInt64(&bm.stats.ReadBytes, int64(len(payload)))
@@ -902,7 +899,7 @@ func (bm *Manager) decryptAndVerifyPayload(formatVersion byte, payload []byte, o
return payload, nil
}
func (bm *Manager) getPhysicalBlockInternal(ctx context.Context, blockID PhysicalBlockID) ([]byte, error) {
func (bm *Manager) getPhysicalBlockInternal(ctx context.Context, blockID string) ([]byte, error) {
payload, err := bm.cache.getBlock(ctx, string(blockID), blockID, 0, -1)
if err != nil {
return nil, err
@@ -935,7 +932,7 @@ func getPackedBlockIV(blockID string) ([]byte, error) {
return hex.DecodeString(blockID[len(blockID)-(aes.BlockSize*2):])
}
func getPhysicalBlockIV(b PhysicalBlockID) ([]byte, error) {
func getPhysicalBlockIV(b string) ([]byte, error) {
s := string(b)
if p := strings.Index(s, "-"); p >= 0 {
s = s[0:p]
@@ -996,7 +993,7 @@ func listIndexBlocksFromStorage(ctx context.Context, st storage.Storage) ([]Inde
}
ii := IndexInfo{
BlockID: PhysicalBlockID(it.BlockID),
FileName: string(it.BlockID),
Timestamp: it.TimeStamp,
Length: it.Length,
}

View File

@@ -8,14 +8,14 @@ type committedBlockIndex interface {
getBlock(blockID string) (Info, error)
listBlocks(prefix string, cb func(i Info) error) error
addBlock(indexBlockID PhysicalBlockID, indexData []byte, use bool) error
hasIndexBlockID(indexBlockID PhysicalBlockID) (bool, error)
addBlock(indexBlockID string, indexData []byte, use bool) error
hasIndexBlockID(indexBlockID string) (bool, error)
use(indexBlockIDs []PhysicalBlockID) error
use(indexBlockIDs []string) error
}
func newCommittedBlockIndex() committedBlockIndex {
return &inMemoryCommittedBlockIndex{
physicalBlocks: make(map[PhysicalBlockID]packindex.Index),
physicalBlocks: make(map[string]packindex.Index),
}
}

View File

@@ -12,7 +12,7 @@
type inMemoryCommittedBlockIndex struct {
mu sync.Mutex
indexes packindex.Merged
physicalBlocks map[PhysicalBlockID]packindex.Index
physicalBlocks map[string]packindex.Index
}
func (b *inMemoryCommittedBlockIndex) getBlock(blockID string) (Info, error) {
@@ -32,7 +32,7 @@ func (b *inMemoryCommittedBlockIndex) getBlock(blockID string) (Info, error) {
}
func (b *inMemoryCommittedBlockIndex) addBlock(indexBlockID PhysicalBlockID, data []byte, use bool) error {
func (b *inMemoryCommittedBlockIndex) addBlock(indexBlockID string, data []byte, use bool) error {
b.mu.Lock()
defer b.mu.Unlock()
@@ -58,14 +58,14 @@ func (b *inMemoryCommittedBlockIndex) listBlocks(prefix string, cb func(i Info)
return b.indexes.Iterate(prefix, cb)
}
func (b *inMemoryCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID) (bool, error) {
func (b *inMemoryCommittedBlockIndex) hasIndexBlockID(indexBlockID string) (bool, error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.physicalBlocks[indexBlockID] != nil, nil
}
func (b *inMemoryCommittedBlockIndex) use(packFiles []PhysicalBlockID) error {
func (b *inMemoryCommittedBlockIndex) use(packFiles []string) error {
b.mu.Lock()
defer b.mu.Unlock()

View File

@@ -36,7 +36,7 @@ type localStorageCache struct {
closed chan struct{}
}
func (c *localStorageCache) getBlock(ctx context.Context, cacheKey string, physicalBlockID PhysicalBlockID, offset, length int64) ([]byte, error) {
func (c *localStorageCache) getBlock(ctx context.Context, cacheKey string, physicalBlockID string, offset, length int64) ([]byte, error) {
b, err := c.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
if err == nil {
b, err = c.verifyHMAC(b)
@@ -74,12 +74,12 @@ func (c *localStorageCache) writeToCacheBestEffort(ctx context.Context, cacheKey
}
}
func (c *localStorageCache) putBlock(ctx context.Context, blockID PhysicalBlockID, data []byte) error {
func (c *localStorageCache) putBlock(ctx context.Context, blockID string, data []byte) error {
c.deleteListCache(ctx)
return c.st.PutBlock(ctx, string(blockID), bytes.NewReader(data))
}
func (c *localStorageCache) deleteBlock(ctx context.Context, blockID PhysicalBlockID) error {
func (c *localStorageCache) deleteBlock(ctx context.Context, blockID string) error {
c.deleteListCache(ctx)
return c.st.DeleteBlock(ctx, string(blockID))
}

View File

@@ -11,11 +11,11 @@ type nullBlockCache struct {
st storage.Storage
}
func (c nullBlockCache) getBlock(ctx context.Context, cacheKey string, blockID PhysicalBlockID, offset, length int64) ([]byte, error) {
func (c nullBlockCache) getBlock(ctx context.Context, cacheKey string, blockID string, offset, length int64) ([]byte, error) {
return c.st.GetBlock(ctx, string(blockID), offset, length)
}
func (c nullBlockCache) putBlock(ctx context.Context, blockID PhysicalBlockID, data []byte) error {
func (c nullBlockCache) putBlock(ctx context.Context, blockID string, data []byte) error {
return c.st.PutBlock(ctx, string(blockID), bytes.NewReader(data))
}
@@ -26,7 +26,7 @@ func (c nullBlockCache) listIndexBlocks(ctx context.Context) ([]IndexInfo, error
func (c nullBlockCache) deleteListCache(ctx context.Context) {
}
func (c nullBlockCache) deleteBlock(ctx context.Context, blockID PhysicalBlockID) error {
func (c nullBlockCache) deleteBlock(ctx context.Context, blockID string) error {
return c.st.DeleteBlock(ctx, string(blockID))
}

View File

@@ -41,7 +41,7 @@ func (b *simpleCommittedBlockIndex) getBlock(blockID string) (Info, error) {
return Info{}, err
}
func (b *simpleCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID) (bool, error) {
func (b *simpleCommittedBlockIndex) hasIndexBlockID(indexBlockID string) (bool, error) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -56,11 +56,11 @@ func (b *simpleCommittedBlockIndex) hasIndexBlockID(indexBlockID PhysicalBlockID
return false, err
}
func (b *simpleCommittedBlockIndex) indexBlockPath(indexBlockID PhysicalBlockID) string {
func (b *simpleCommittedBlockIndex) indexBlockPath(indexBlockID string) string {
return filepath.Join(b.dirname, string(indexBlockID+simpleIndexSuffix))
}
func (b *simpleCommittedBlockIndex) addBlockToCache(indexBlockID PhysicalBlockID, data []byte) error {
func (b *simpleCommittedBlockIndex) addBlockToCache(indexBlockID string, data []byte) error {
exists, err := b.hasIndexBlockID(indexBlockID)
if err != nil {
return err
@@ -99,7 +99,7 @@ func (b *simpleCommittedBlockIndex) addBlockToCache(indexBlockID PhysicalBlockID
return nil
}
func (b *simpleCommittedBlockIndex) addBlock(indexBlockID PhysicalBlockID, data []byte, use bool) error {
func (b *simpleCommittedBlockIndex) addBlock(indexBlockID string, data []byte, use bool) error {
if err := b.addBlockToCache(indexBlockID, data); err != nil {
return err
}
@@ -137,7 +137,7 @@ func (b *simpleCommittedBlockIndex) openIndex(fullpath string) (packindex.Index,
return packindex.Open(f)
}
func (b *simpleCommittedBlockIndex) use(packFiles []PhysicalBlockID) error {
func (b *simpleCommittedBlockIndex) use(packFiles []string) error {
b.mu.Lock()
defer b.mu.Unlock()

View File

@@ -42,7 +42,7 @@ func runBlockGarbageCollectAction(ctx context.Context, rep *repo.Repository) err
allPackBlocks++
u := usedPackBlocks[block.PhysicalBlockID(bi.BlockID)]
u := usedPackBlocks[string(bi.BlockID)]
if u > 0 {
log.Printf("pack %v, in use by %v blocks", bi.BlockID, u)
continue
@@ -77,8 +77,8 @@ func runBlockGarbageCollectAction(ctx context.Context, rep *repo.Repository) err
return nil
}
func findPackBlocksInUse(infos []block.Info) map[block.PhysicalBlockID]int {
packUsage := map[block.PhysicalBlockID]int{}
func findPackBlocksInUse(infos []block.Info) map[string]int {
packUsage := map[string]int{}
for _, bi := range infos {
packUsage[bi.PackFile]++

View File

@@ -20,7 +20,7 @@ func runListBlockIndexesAction(ctx context.Context, rep *repo.Repository) error
}
for _, b := range blks {
fmt.Printf("%-70v %10v %v\n", b.BlockID, b.Length, b.Timestamp.Local().Format(timeFormatPrecise))
fmt.Printf("%-70v %10v %v\n", b.FileName, b.Length, b.Timestamp.Local().Format(timeFormatPrecise))
}
if *blockIndexListSummary {

View File

@@ -19,10 +19,10 @@
blockIndexShowRaw = blockIndexShowCommand.Flag("raw", "Show raw block data").Bool()
)
func getIndexBlocksToShow(ctx context.Context, rep *repo.Repository) ([]block.PhysicalBlockID, error) {
var blockIDs []block.PhysicalBlockID
func getIndexBlocksToShow(ctx context.Context, rep *repo.Repository) ([]string, error) {
var blockIDs []string
for _, id := range *blockIndexShowIDs {
blockIDs = append(blockIDs, block.PhysicalBlockID(id))
blockIDs = append(blockIDs, string(id))
}
if len(blockIDs) == 1 && blockIDs[0] == "active" {
@@ -37,7 +37,7 @@ func getIndexBlocksToShow(ctx context.Context, rep *repo.Repository) ([]block.Ph
blockIDs = nil
for _, bi := range b {
blockIDs = append(blockIDs, bi.BlockID)
blockIDs = append(blockIDs, bi.FileName)
}
}

View File

@@ -29,7 +29,7 @@ func runListBlocksAction(ctx context.Context, rep *repo.Repository) error {
var count int
var totalSize int64
uniquePacks := map[block.PhysicalBlockID]bool{}
uniquePacks := map[string]bool{}
for _, b := range blocks {
totalSize += int64(b.Length)
count++

View File

@@ -104,14 +104,14 @@ func getBlocksToRewrite(ctx context.Context, rep *repo.Repository) ([]block.Info
return result, nil
}
func findShortPackBlocks(infos []block.Info, threshold uint32) (map[block.PhysicalBlockID]bool, error) {
packUsage := map[block.PhysicalBlockID]uint32{}
func findShortPackBlocks(infos []block.Info, threshold uint32) (map[string]bool, error) {
packUsage := map[string]uint32{}
for _, bi := range infos {
packUsage[bi.PackFile] += bi.Length
}
shortPackBlocks := map[block.PhysicalBlockID]bool{}
shortPackBlocks := map[string]bool{}
for packFile, usage := range packUsage {
if usage < threshold {

View File

@@ -34,7 +34,7 @@ func (b Builder) sortedBlocks() []*Info {
}
type indexLayout struct {
packFileOffsets map[PhysicalBlockID]uint32
packFileOffsets map[string]uint32
entryCount int
keyLength int
entryLength int
@@ -45,7 +45,7 @@ type indexLayout struct {
func (b Builder) Build(output io.Writer) error {
allBlocks := b.sortedBlocks()
layout := &indexLayout{
packFileOffsets: map[PhysicalBlockID]uint32{},
packFileOffsets: map[string]uint32{},
keyLength: -1,
entryLength: 20,
entryCount: len(allBlocks),

View File

@@ -181,7 +181,7 @@ func (b *index) entryToInfo(blockID string, entryData []byte) (Info, error) {
FormatVersion: e.PackedFormatVersion(),
PackOffset: e.PackedOffset(),
Length: e.PackedLength(),
PackFile: PhysicalBlockID(packFile),
PackFile: string(packFile),
}, nil
}

View File

@@ -5,19 +5,16 @@
"time"
)
// PhysicalBlockID is the name of a physical block in storage.
type PhysicalBlockID string
// Info is an information about a single block managed by Manager.
type Info struct {
BlockID string `json:"blockID"`
Length uint32 `json:"length"`
TimestampSeconds int64 `json:"time"`
PackFile PhysicalBlockID `json:"packFile,omitempty"`
PackOffset uint32 `json:"packOffset,omitempty"`
Deleted bool `json:"deleted"`
Payload []byte `json:"payload"` // set for payloads stored inline
FormatVersion byte `json:"formatVersion"`
BlockID string `json:"blockID"`
Length uint32 `json:"length"`
TimestampSeconds int64 `json:"time"`
PackFile string `json:"packFile,omitempty"`
PackOffset uint32 `json:"packOffset,omitempty"`
Deleted bool `json:"deleted"`
Payload []byte `json:"payload"` // set for payloads stored inline
FormatVersion byte `json:"formatVersion"`
}
// Timestamp returns the time when a block was created or deleted.

View File

@@ -33,11 +33,11 @@ func TestPackIndex(t *testing.T) {
}
return string(fmt.Sprintf("%v%x", prefix2, h.Sum(nil)))
}
deterministicPackFile := func(id int) packindex.PhysicalBlockID {
deterministicPackFile := func(id int) string {
h := sha1.New()
fmt.Fprintf(h, "%v", id)
blockNumber++
return packindex.PhysicalBlockID(fmt.Sprintf("%x", h.Sum(nil)))
return string(fmt.Sprintf("%x", h.Sum(nil)))
}
deterministicPackedOffset := func(id int) uint32 {