From 5dd755fcecc9e567486394ec691c55a7488eb3cf Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 25 Aug 2018 15:04:07 -0700 Subject: [PATCH] added support for index recovery from pack files, work in progress --- block/block_index_recovery.go | 224 +++++++++++++++++++++++++++++ block/block_manager.go | 74 +++++----- cli/command_block_index_recover.go | 60 ++++++++ 3 files changed, 322 insertions(+), 36 deletions(-) create mode 100644 block/block_index_recovery.go create mode 100644 cli/command_block_index_recover.go diff --git a/block/block_index_recovery.go b/block/block_index_recovery.go new file mode 100644 index 000000000..06a1839e1 --- /dev/null +++ b/block/block_index_recovery.go @@ -0,0 +1,224 @@ +package block + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "hash/crc32" + "reflect" + + "github.com/kopia/kopia/internal/packindex" +) + +// RecoverIndexFromPackFile attempts to recover index block entries from a given pack file. +// Pack file length may be provided (if known) to reduce the number of bytes that are read from the storage. 
+func (bm *Manager) RecoverIndexFromPackFile(ctx context.Context, packFile string, packFileLength int64, commit bool) ([]Info, error) {
+	localIndexBytes, err := bm.readPackFileLocalIndex(ctx, packFile, packFileLength)
+	if err != nil {
+		return nil, err
+	}
+
+	ndx, err := packindex.Open(bytes.NewReader(localIndexBytes))
+	if err != nil {
+		// keep the underlying cause, otherwise a damaged local index cannot be diagnosed
+		return nil, fmt.Errorf("unable to open index in file %v: %v", packFile, err)
+	}
+
+	var recovered []Info
+
+	// NOTE(review): bm.packIndexBuilder is also mutated by writePackBlockLocked, which asserts that the
+	// manager lock is held - confirm whether committing from here needs the same lock.
+	err = ndx.Iterate("", func(i Info) error {
+		log.Debugf("recovering %v", i)
+		recovered = append(recovered, i)
+		if commit {
+			bm.packIndexBuilder.Add(i)
+		}
+		return nil
+	})
+	if err != nil {
+		// do not report a partially-iterated index as a successful recovery
+		// (entries seen before the failure may already have been added when commit is set)
+		return nil, fmt.Errorf("unable to iterate index in file %v: %v", packFile, err)
+	}
+
+	return recovered, nil
+}
+
+// packBlockPostamble describes where the local index is stored inside a pack file.
+// It is serialized by toBytes and parsed back by findPostamble.
+type packBlockPostamble struct {
+	localIndexIV     []byte // IV passed to the formatter when encrypting/decrypting the local index
+	localIndexOffset uint32 // offset of the local index within the pack file
+	localIndexLength uint32 // number of bytes the local index occupies in the pack file
+}
+
+// toBytes serializes the postamble as:
+//
+//	uvarint(1) | uvarint(len(IV)) | IV | uvarint(offset) | uvarint(length) | CRC32-IEEE (big endian) | total
+//
+// where the checksum covers all bytes before it and total is a single byte holding the number of
+// bytes that precede it (checksum included). An error is returned when that number does not fit in a byte.
+func (p *packBlockPostamble) toBytes() ([]byte, error) {
+	// 4 varints + IV + 4 bytes of checksum + 1 byte of postamble length
+	n := 0
+	buf := make([]byte, 4*binary.MaxVarintLen64+len(p.localIndexIV)+4+1)
+
+	n += binary.PutUvarint(buf[n:], uint64(1))                   // version flag
+	n += binary.PutUvarint(buf[n:], uint64(len(p.localIndexIV))) // length of local index IV
+	copy(buf[n:], p.localIndexIV)
+	n += len(p.localIndexIV)
+	n += binary.PutUvarint(buf[n:], uint64(p.localIndexOffset))
+	n += binary.PutUvarint(buf[n:], uint64(p.localIndexLength))
+
+	checksum := crc32.ChecksumIEEE(buf[0:n])
+	binary.BigEndian.PutUint32(buf[n:], checksum)
+	n += 4
+	if n > 255 {
+		// the length must be representable in the single trailing byte
+		return nil, fmt.Errorf("postamble too long: %v", n)
+	}
+
+	buf[n] = byte(n)
+	return buf[0 : n+1], nil
+}
+
+// findPostamble detects if a given block of bytes contains a possibly valid postamble, and returns it if so
+// NOTE, even if this function returns a postamble, it should not be trusted to be correct, since it's not
+// cryptographically signed. this is to facilitate data recovery.
+func findPostamble(b []byte) *packBlockPostamble {
+	// offset and length are stored as uint32 in packBlockPostamble
+	const maxUint32 = 1<<32 - 1
+
+	if len(b) == 0 {
+		// no postamble
+		return nil
+	}
+
+	// length of postamble is the last byte
+	postambleLength := int(b[len(b)-1])
+	if postambleLength < 5 {
+		// too short, must be at least 5 bytes (checksum + at least one byte of payload)
+		return nil
+	}
+	postambleStart := len(b) - 1 - postambleLength
+	postambleEnd := len(b) - 1
+	if postambleStart < 0 {
+		// invalid last byte
+		return nil
+	}
+
+	// the last 4 bytes of the postamble are the checksum of everything before them
+	postambleBytes := b[postambleStart:postambleEnd]
+	payload, checksumBytes := postambleBytes[0:len(postambleBytes)-4], postambleBytes[len(postambleBytes)-4:]
+	checksum := binary.BigEndian.Uint32(checksumBytes)
+	validChecksum := crc32.ChecksumIEEE(payload)
+
+	if checksum != validChecksum {
+		// invalid checksum, not a valid postamble
+		return nil
+	}
+
+	flags, n := binary.Uvarint(payload)
+	if n <= 0 {
+		// invalid flags
+		return nil
+	}
+	if flags != 1 {
+		// unsupported flag
+		return nil
+	}
+	payload = payload[n:]
+
+	ivLength, n := binary.Uvarint(payload)
+	if n <= 0 {
+		// invalid IV length
+		return nil
+	}
+	payload = payload[n:]
+	if ivLength > uint64(len(payload)) {
+		// IV length points past the end of the payload
+		return nil
+	}
+
+	// NOTE: the returned IV aliases b, it is not copied
+	iv := payload[0:ivLength]
+	payload = payload[ivLength:]
+
+	off, n := binary.Uvarint(payload)
+	if n <= 0 {
+		// invalid offset
+		return nil
+	}
+	payload = payload[n:]
+
+	length, n := binary.Uvarint(payload)
+	if n <= 0 {
+		// invalid length
+		return nil
+	}
+
+	if off > maxUint32 || length > maxUint32 {
+		// values that do not fit in 32 bits would be silently truncated by the conversions below
+		// and end up describing an unrelated region of the pack file
+		return nil
+	}
+
+	return &packBlockPostamble{
+		localIndexIV:     iv,
+		localIndexLength: uint32(length),
+		localIndexOffset: uint32(off),
+	}
+}
+
+// buildLocalIndex serializes the provided index builder into a byte slice.
+func (bm *Manager) buildLocalIndex(pending packindex.Builder) ([]byte, error) {
+	var buf bytes.Buffer
+	if err := pending.Build(&buf); err != nil {
+		return nil, fmt.Errorf("unable to build local index: %v", err)
+	}
+
+	return buf.Bytes(), nil
+}
+
+// appendPackFileIndexRecoveryData appends data designed to help with recovery of pack index in case it gets damaged or lost.
+func (bm *Manager) appendPackFileIndexRecoveryData(blockData []byte, pending packindex.Builder) ([]byte, error) {
+	// build, encrypt and append local index
+	localIndexOffset := len(blockData)
+	localIndex, err := bm.buildLocalIndex(pending)
+	if err != nil {
+		return nil, err
+	}
+
+	localIndexIV := bm.hashData(localIndex)
+	encryptedLocalIndex, err := bm.formatter.Encrypt(localIndex, localIndexIV)
+	if err != nil {
+		return nil, err
+	}
+
+	postamble := packBlockPostamble{
+		localIndexIV:     localIndexIV,
+		localIndexOffset: uint32(localIndexOffset),
+		// record the size of what is actually stored in the pack file: the reader slices exactly this
+		// many bytes and decrypts them, so the plaintext length is only right when encryption
+		// preserves the length
+		localIndexLength: uint32(len(encryptedLocalIndex)),
+	}
+
+	blockData = append(blockData, encryptedLocalIndex...)
+	postambleBytes, err := postamble.toBytes()
+	if err != nil {
+		return nil, err
+	}
+
+	blockData = append(blockData, postambleBytes...)
+
+	// sanity check: the postamble just written must be decodable and identical to the original.
+	// Failures are reported to the caller instead of terminating the whole process.
+	pa2 := findPostamble(blockData)
+	if pa2 == nil {
+		return nil, fmt.Errorf("invalid postamble written, that could not be immediately decoded, it's a bug")
+	}
+
+	if !reflect.DeepEqual(postamble, *pa2) {
+		return nil, fmt.Errorf("postamble did not round-trip: %v %v", postamble, *pa2)
+	}
+
+	return blockData, nil
+}
+
+// readPackFileLocalIndex reads the given pack file, locates its postamble and returns the decrypted
+// and verified local index stored in it.
+func (bm *Manager) readPackFileLocalIndex(ctx context.Context, packFile string, packFileLength int64) ([]byte, error) {
+	// NOTE(review): packFileLength is not used yet; the block is requested with offset 0 and length -1,
+	// presumably meaning the entire pack file - confirm against the storage API.
+	payload, err := bm.st.GetBlock(ctx, packFile, 0, -1)
+	if err != nil {
+		return nil, err
+	}
+
+	postamble := findPostamble(payload)
+	if postamble == nil {
+		return nil, fmt.Errorf("unable to find valid postamble in file %v", packFile)
+	}
+
+	// The postamble is not authenticated, so offset and length are untrusted. Do the bounds arithmetic
+	// in 64 bits: adding the two uint32 values first can wrap around, pass the check and then panic
+	// in the slice expression below.
+	start := uint64(postamble.localIndexOffset)
+	end := start + uint64(postamble.localIndexLength)
+	if end > uint64(len(payload)) {
+		// invalid offset/length
+		return nil, fmt.Errorf("unable to find valid local index in file %v", packFile)
+	}
+
+	encryptedLocalIndexBytes := payload[start:end]
+	if len(encryptedLocalIndexBytes) == 0 {
+		// a sub-slice of a non-nil slice is never nil, so test the length instead
+		return nil, fmt.Errorf("unable to find valid local index in file %v", packFile)
+	}
+
+	localIndexBytes, err := bm.decryptAndVerify(encryptedLocalIndexBytes, postamble.localIndexIV)
+	if err != nil {
+		return nil, fmt.Errorf("unable to decrypt local index: %v", err)
+	}
+
+	return localIndexBytes, nil
+}
diff --git a/block/block_manager.go b/block/block_manager.go
index 75d813f8c..2d60f8ecc 100644
--- a/block/block_manager.go
+++ b/block/block_manager.go
@@ -329,59 +329,65 @@ func (bm *Manager) finishPackLocked(ctx context.Context) error {
 func (bm *Manager) writePackBlockLocked(ctx context.Context) error {
 	bm.assertLocked()
 
-	blockData, pending, err := bm.preparePackDataBlock()
+	blockID := make([]byte, 16)
+	if _, err := cryptorand.Read(blockID); err != nil {
+		return fmt.Errorf("unable to read crypto bytes: %v", err)
+	}
+
+	packFile := fmt.Sprintf("%v%x", PackBlockPrefix, blockID)
+
+	blockData, packFileIndex, err := bm.preparePackDataBlock(packFile)
 	if err != nil {
 		return fmt.Errorf("error preparing data block: %v", err)
 	}
 
-	packFile, err := bm.writePackDataNotLocked(ctx, blockData)
-	if err != nil {
+	if err := bm.writePackFileNotLocked(ctx, packFile, blockData); err != nil {
 		return fmt.Errorf("can't save pack data block: %v", err)
 	}
 
 	formatLog.Debugf("wrote pack file: %v", packFile)
-
-	for _, info := range pending {
-		info.PackFile = packFile
-		bm.packIndexBuilder.Add(info)
+	for _, info := range packFileIndex {
+		bm.packIndexBuilder.Add(*info)
 	}
 
 	return nil
 }
 
-func (bm *Manager) preparePackDataBlock() ([]byte, map[string]Info, error) {
+func (bm *Manager) preparePackDataBlock(packFile string) ([]byte, packindex.Builder, error) {
 	formatLog.Debugf("preparing block data with %v items", len(bm.currentPackItems))
+
 	blockData, err := appendRandomBytes(nil, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength)
 	if err != nil {
 		return nil, nil, fmt.Errorf("unable to prepare block preamble: %v", err)
 	}
 
-	pending := map[string]Info{}
+	packFileIndex := packindex.Builder{}
 	for blockID, info := range bm.currentPackItems {
 		if info.Payload == nil {
 			continue
 		}
-		var encrypted []byte
-		encrypted, err = bm.maybeEncryptBlockDataForPacking(info.Payload, info.BlockID)
+
+		encrypted, err := bm.maybeEncryptBlockDataForPacking(info.Payload, info.BlockID)
 		if err != nil {
 			return nil, nil, fmt.Errorf("unable to encrypt %q: %v", blockID, err)
 		}
 
 		formatLog.Debugf("adding %v length=%v deleted=%v", blockID, len(info.Payload), info.Deleted)
 
-		pending[blockID] = Info{
+		packFileIndex.Add(Info{
 			BlockID:          blockID,
 			Deleted:          info.Deleted,
 			FormatVersion:    byte(bm.writeFormatVersion),
+			PackFile:         packFile,
 			PackOffset:       uint32(len(blockData)),
 			Length:           uint32(len(info.Payload)),
 			TimestampSeconds: info.TimestampSeconds,
-		}
+		})
 
 		blockData = append(blockData, encrypted...)
 	}
 
-	if len(pending) == 0 {
+	if len(packFileIndex) == 0 {
 		return nil, nil, nil
 	}
 
@@ -393,10 +399,12 @@ func (bm *Manager) preparePackDataBlock() ([]byte, map[string]Info, error) {
 			}
 		}
 	}
-	formatLog.Debugf("finished block %v bytes", len(blockData))
 
-	return blockData, pending, nil
+	origBlockLength := len(blockData)
+	blockData, err = bm.appendPackFileIndexRecoveryData(blockData, packFileIndex)
 
+	formatLog.Debugf("finished block %v bytes (%v bytes index)", len(blockData), len(blockData)-origBlockLength)
+	return blockData, packFileIndex, err
 }
 
 func (bm *Manager) maybeEncryptBlockDataForPacking(data []byte, blockID string) ([]byte, error) {
@@ -768,22 +776,11 @@ func validatePrefix(prefix string) error {
 	return fmt.Errorf("invalid prefix, must be a empty or single letter between 'g' and 'z'")
 }
 
-func (bm *Manager) writePackDataNotLocked(ctx context.Context, data []byte) (string, error) {
-	blockID := make([]byte, 16)
-	if _, err := cryptorand.Read(blockID); err != nil {
-		return "", fmt.Errorf("unable to read crypto bytes: %v", err)
-	}
-
-	physicalBlockID := fmt.Sprintf("%v%x", PackBlockPrefix, blockID)
-
+func (bm *Manager) writePackFileNotLocked(ctx context.Context, packFile string, data []byte) error {
 	atomic.AddInt32(&bm.stats.WrittenBlocks, 1)
 	atomic.AddInt64(&bm.stats.WrittenBytes, int64(len(data)))
 	bm.listCache.deleteListCache(ctx)
-	if err := bm.st.PutBlock(ctx, physicalBlockID, data); err != nil {
-		return "", err
-	}
-
-	return physicalBlockID, nil
+	return bm.st.PutBlock(ctx, packFile, data)
 }
 
 func (bm *Manager) encryptAndWriteBlockNotLocked(ctx context.Context, data []byte, prefix string) (string, error) {
@@ -933,20 +930,25 @@ func (bm *Manager) getBlockContentsUnlocked(ctx context.Context, bi Info) ([]byt
 		return nil, err
 	}
 
-	payload, err = bm.formatter.Decrypt(payload, iv)
+	decrypted, err := bm.decryptAndVerify(payload, iv)
+	if err != nil {
+		return nil, fmt.Errorf("invalid checksum at %v offset %v length %v: %v", bi.PackFile, bi.PackOffset, len(payload), err)
+	}
+
+	return decrypted, nil
+}
+
+func (bm *Manager) decryptAndVerify(encrypted []byte, iv []byte) ([]byte, error) {
+	decrypted, err := bm.formatter.Decrypt(encrypted, iv)
 	if err != nil {
 		return nil, err
 	}
 
-	atomic.AddInt64(&bm.stats.DecryptedBytes, int64(len(payload)))
+	atomic.AddInt64(&bm.stats.DecryptedBytes, int64(len(decrypted)))
 
 	// Since the encryption key is a function of data, we must be able to generate exactly the same key
 	// after decrypting the content. This serves as a checksum.
-	if err := bm.verifyChecksum(payload, iv); err != nil {
-		return nil, fmt.Errorf("invalid checksum at %v offset %v length %v: %v", bi.PackFile, bi.PackOffset, len(payload), err)
-	}
-
-	return payload, nil
+	return decrypted, bm.verifyChecksum(decrypted, iv)
 }
 
 func (bm *Manager) getPhysicalBlockInternal(ctx context.Context, blockID string) ([]byte, error) {
diff --git a/cli/command_block_index_recover.go b/cli/command_block_index_recover.go
new file mode 100644
index 000000000..c0bb49963
--- /dev/null
+++ b/cli/command_block_index_recover.go
@@ -0,0 +1,60 @@
+package cli
+
+import (
+	"context"
+
+	"github.com/kopia/kopia/block"
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/storage"
+)
+
+var (
+	blockIndexRecoverCommand  = blockIndexCommands.Command("recover", "Recover block indexes from pack blocks")
+	blockIndexRecoverPackFile = blockIndexRecoverCommand.Flag("file", "Names of pack files to recover (default=all packs)").Strings()
+	blockIndexRecoverCommit   = blockIndexRecoverCommand.Flag("commit", "Commit recovered blocks").Bool()
+)
+
+func runRecoverBlockIndexesAction(ctx context.Context, rep *repo.Repository) error {
+	var totalCount int
+
+	defer func() {
+		if totalCount == 0 {
+			log.Noticef("No blocks recovered.")
+			return
+		}
+
+		if !*blockIndexRecoverCommit {
+			log.Noticef("Found %v blocks to recover, but not committed. Re-run with --commit", totalCount)
+		} else {
+			log.Noticef("Recovered %v blocks.", totalCount)
+		}
+	}()
+
+	if len(*blockIndexRecoverPackFile) == 0 {
+		return rep.Storage.ListBlocks(ctx, block.PackBlockPrefix, func(bm storage.BlockMetadata) error {
+			recoverIndexFromSinglePackFile(ctx, rep, bm.BlockID, bm.Length, &totalCount)
+			return nil
+		})
+	}
+
+	for _, packFile := range *blockIndexRecoverPackFile {
+		recoverIndexFromSinglePackFile(ctx, rep, packFile, 0, &totalCount)
+	}
+
+	return nil
+}
+
+func recoverIndexFromSinglePackFile(ctx context.Context, rep *repo.Repository, packFileName string, length int64, totalCount *int) {
+	recovered, err := rep.Blocks.RecoverIndexFromPackFile(ctx, packFileName, length, *blockIndexRecoverCommit)
+	if err != nil {
+		log.Warningf("unable to recover index from %v: %v", packFileName, err)
+		return
+	}
+
+	*totalCount += len(recovered)
+	log.Infof("Recovered %v entries from %v (commit=%v)", len(recovered), packFileName, *blockIndexRecoverCommit)
+}
+
+func init() {
+	blockIndexRecoverCommand.Action(repositoryAction(runRecoverBlockIndexesAction))
+}