added support for index recovery from pack files, work in progress

This commit is contained in:
Jarek Kowalski
2018-08-25 15:04:07 -07:00
parent 98aa71c02a
commit 5dd755fcec
3 changed files with 322 additions and 36 deletions

View File

@@ -0,0 +1,224 @@
package block
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"hash/crc32"
"reflect"
"github.com/kopia/kopia/internal/packindex"
)
// RecoverIndexFromPackFile attempts to recover index block entries from a given pack file.
// Pack file length may be provided (if known) to reduce the number of bytes that are read from the storage.
func (bm *Manager) RecoverIndexFromPackFile(ctx context.Context, packFile string, packFileLength int64, commit bool) ([]Info, error) {
localIndexBytes, err := bm.readPackFileLocalIndex(ctx, packFile, packFileLength)
if err != nil {
return nil, err
}
ndx, err := packindex.Open(bytes.NewReader(localIndexBytes))
if err != nil {
return nil, fmt.Errorf("unable to open index in file %v", packFile)
}
var recovered []Info
ndx.Iterate("", func(i Info) error {
log.Debugf("recovering %v", i)
recovered = append(recovered, i)
if commit {
bm.packIndexBuilder.Add(i)
}
return nil
})
return recovered, nil
}
type packBlockPostamble struct {
localIndexIV []byte
localIndexOffset uint32
localIndexLength uint32
}
func (p *packBlockPostamble) toBytes() ([]byte, error) {
// 4 varints + IV + 4 bytes of checksum + 1 byte of postamble length
n := 0
buf := make([]byte, 4*binary.MaxVarintLen64+len(p.localIndexIV)+4+1)
n += binary.PutUvarint(buf[n:], uint64(1)) // version flag
n += binary.PutUvarint(buf[n:], uint64(len(p.localIndexIV))) // length of local index IV
copy(buf[n:], p.localIndexIV)
n += len(p.localIndexIV)
n += binary.PutUvarint(buf[n:], uint64(p.localIndexOffset))
n += binary.PutUvarint(buf[n:], uint64(p.localIndexLength))
checksum := crc32.ChecksumIEEE(buf[0:n])
binary.BigEndian.PutUint32(buf[n:], checksum)
n += 4
if n > 255 {
return nil, fmt.Errorf("postamble too long: %v", n)
}
buf[n] = byte(n)
return buf[0 : n+1], nil
}
// findPostamble detects if a given block of bytes contains a possibly valid postamble, and returns it if so
// NOTE, even if this function returns a postamble, it should not be trusted to be correct, since it's not
// cryptographically signed. this is to facilitate data recovery.
func findPostamble(b []byte) *packBlockPostamble {
if len(b) == 0 {
// no postamble
return nil
}
// length of postamble is the last byte
postambleLength := int(b[len(b)-1])
if postambleLength < 5 {
// too short, must be at least 5 bytes (checksum + own length)
return nil
}
postambleStart := len(b) - 1 - postambleLength
postambleEnd := len(b) - 1
if postambleStart < 0 {
// invalid last byte
return nil
}
postambleBytes := b[postambleStart:postambleEnd]
payload, checksumBytes := postambleBytes[0:len(postambleBytes)-4], postambleBytes[len(postambleBytes)-4:]
checksum := binary.BigEndian.Uint32(checksumBytes)
validChecksum := crc32.ChecksumIEEE(payload)
if checksum != validChecksum {
// invalid checksum, not a valid postamble
return nil
}
flags, n := binary.Uvarint(payload)
if n <= 0 {
// invalid flags
return nil
}
if flags != 1 {
// unsupported flag
return nil
}
payload = payload[n:]
ivLength, n := binary.Uvarint(payload)
if n <= 0 {
// invalid flags
return nil
}
payload = payload[n:]
if ivLength > uint64(len(payload)) {
// invalid IV length
return nil
}
iv := payload[0:ivLength]
payload = payload[ivLength:]
off, n := binary.Uvarint(payload)
if n <= 0 {
// invalid offset
return nil
}
payload = payload[n:]
length, n := binary.Uvarint(payload)
if n <= 0 {
// invalid offset
return nil
}
return &packBlockPostamble{
localIndexIV: iv,
localIndexLength: uint32(length),
localIndexOffset: uint32(off),
}
}
func (bm *Manager) buildLocalIndex(pending packindex.Builder) ([]byte, error) {
var buf bytes.Buffer
if err := pending.Build(&buf); err != nil {
return nil, fmt.Errorf("unable to build local index: %v", err)
}
return buf.Bytes(), nil
}
// appendPackFileIndexRecoveryData appends data designed to help with recovery of pack index in case it gets damaged or lost.
func (bm *Manager) appendPackFileIndexRecoveryData(blockData []byte, pending packindex.Builder) ([]byte, error) {
// build, encrypt and append local index
localIndexOffset := len(blockData)
localIndex, err := bm.buildLocalIndex(pending)
if err != nil {
return nil, err
}
localIndexIV := bm.hashData(localIndex)
encryptedLocalIndex, err := bm.formatter.Encrypt(localIndex, localIndexIV)
if err != nil {
return nil, err
}
postamble := packBlockPostamble{
localIndexIV: localIndexIV,
localIndexOffset: uint32(localIndexOffset),
localIndexLength: uint32(len(localIndex)),
}
blockData = append(blockData, encryptedLocalIndex...)
postambleBytes, err := postamble.toBytes()
if err != nil {
return nil, err
}
blockData = append(blockData, postambleBytes...)
pa2 := findPostamble(blockData)
if pa2 == nil {
log.Fatalf("invalid postamble written, that could not be immediately decoded, it's a bug")
}
if !reflect.DeepEqual(postamble, *pa2) {
log.Fatalf("postamble did not round-trip: %v %v", postamble, *pa2)
}
return blockData, nil
}
func (bm *Manager) readPackFileLocalIndex(ctx context.Context, packFile string, packFileLength int64) ([]byte, error) {
payload, err := bm.st.GetBlock(ctx, packFile, 0, -1)
if err != nil {
return nil, err
}
postamble := findPostamble(payload)
if postamble == nil {
return nil, fmt.Errorf("unable to find valid postamble in file %v", packFile)
}
if uint64(postamble.localIndexOffset+postamble.localIndexLength) > uint64(len(payload)) {
// invalid offset/length
return nil, fmt.Errorf("unable to find valid local index in file %v", packFile)
}
encryptedLocalIndexBytes := payload[postamble.localIndexOffset : postamble.localIndexOffset+postamble.localIndexLength]
if encryptedLocalIndexBytes == nil {
return nil, fmt.Errorf("unable to find valid local index in file %v", packFile)
}
localIndexBytes, err := bm.decryptAndVerify(encryptedLocalIndexBytes, postamble.localIndexIV)
if err != nil {
return nil, fmt.Errorf("unable to decrypt local index: %v", err)
}
return localIndexBytes, nil
}

View File

@@ -329,59 +329,65 @@ func (bm *Manager) finishPackLocked(ctx context.Context) error {
func (bm *Manager) writePackBlockLocked(ctx context.Context) error {
bm.assertLocked()
blockData, pending, err := bm.preparePackDataBlock()
blockID := make([]byte, 16)
if _, err := cryptorand.Read(blockID); err != nil {
return fmt.Errorf("unable to read crypto bytes: %v", err)
}
packFile := fmt.Sprintf("%v%x", PackBlockPrefix, blockID)
blockData, packFileIndex, err := bm.preparePackDataBlock(packFile)
if err != nil {
return fmt.Errorf("error preparing data block: %v", err)
}
packFile, err := bm.writePackDataNotLocked(ctx, blockData)
if err != nil {
if err := bm.writePackFileNotLocked(ctx, packFile, blockData); err != nil {
return fmt.Errorf("can't save pack data block: %v", err)
}
formatLog.Debugf("wrote pack file: %v", packFile)
for _, info := range pending {
info.PackFile = packFile
bm.packIndexBuilder.Add(info)
for _, info := range packFileIndex {
bm.packIndexBuilder.Add(*info)
}
return nil
}
func (bm *Manager) preparePackDataBlock() ([]byte, map[string]Info, error) {
func (bm *Manager) preparePackDataBlock(packFile string) ([]byte, packindex.Builder, error) {
formatLog.Debugf("preparing block data with %v items", len(bm.currentPackItems))
blockData, err := appendRandomBytes(nil, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength)
if err != nil {
return nil, nil, fmt.Errorf("unable to prepare block preamble: %v", err)
}
pending := map[string]Info{}
packFileIndex := packindex.Builder{}
for blockID, info := range bm.currentPackItems {
if info.Payload == nil {
continue
}
var encrypted []byte
encrypted, err = bm.maybeEncryptBlockDataForPacking(info.Payload, info.BlockID)
encrypted, err := bm.maybeEncryptBlockDataForPacking(info.Payload, info.BlockID)
if err != nil {
return nil, nil, fmt.Errorf("unable to encrypt %q: %v", blockID, err)
}
formatLog.Debugf("adding %v length=%v deleted=%v", blockID, len(info.Payload), info.Deleted)
pending[blockID] = Info{
packFileIndex.Add(Info{
BlockID: blockID,
Deleted: info.Deleted,
FormatVersion: byte(bm.writeFormatVersion),
PackFile: packFile,
PackOffset: uint32(len(blockData)),
Length: uint32(len(info.Payload)),
TimestampSeconds: info.TimestampSeconds,
}
})
blockData = append(blockData, encrypted...)
}
if len(pending) == 0 {
if len(packFileIndex) == 0 {
return nil, nil, nil
}
@@ -393,10 +399,12 @@ func (bm *Manager) preparePackDataBlock() ([]byte, map[string]Info, error) {
}
}
}
formatLog.Debugf("finished block %v bytes", len(blockData))
return blockData, pending, nil
origBlockLength := len(blockData)
blockData, err = bm.appendPackFileIndexRecoveryData(blockData, packFileIndex)
formatLog.Debugf("finished block %v bytes (%v bytes index)", len(blockData), len(blockData)-origBlockLength)
return blockData, packFileIndex, err
}
func (bm *Manager) maybeEncryptBlockDataForPacking(data []byte, blockID string) ([]byte, error) {
@@ -768,22 +776,11 @@ func validatePrefix(prefix string) error {
return fmt.Errorf("invalid prefix, must be a empty or single letter between 'g' and 'z'")
}
func (bm *Manager) writePackDataNotLocked(ctx context.Context, data []byte) (string, error) {
blockID := make([]byte, 16)
if _, err := cryptorand.Read(blockID); err != nil {
return "", fmt.Errorf("unable to read crypto bytes: %v", err)
}
physicalBlockID := fmt.Sprintf("%v%x", PackBlockPrefix, blockID)
func (bm *Manager) writePackFileNotLocked(ctx context.Context, packFile string, data []byte) error {
atomic.AddInt32(&bm.stats.WrittenBlocks, 1)
atomic.AddInt64(&bm.stats.WrittenBytes, int64(len(data)))
bm.listCache.deleteListCache(ctx)
if err := bm.st.PutBlock(ctx, physicalBlockID, data); err != nil {
return "", err
}
return physicalBlockID, nil
return bm.st.PutBlock(ctx, packFile, data)
}
func (bm *Manager) encryptAndWriteBlockNotLocked(ctx context.Context, data []byte, prefix string) (string, error) {
@@ -933,20 +930,25 @@ func (bm *Manager) getBlockContentsUnlocked(ctx context.Context, bi Info) ([]byt
return nil, err
}
payload, err = bm.formatter.Decrypt(payload, iv)
decrypted, err := bm.decryptAndVerify(payload, iv)
if err != nil {
return nil, fmt.Errorf("invalid checksum at %v offset %v length %v: %v", bi.PackFile, bi.PackOffset, len(payload), err)
}
return decrypted, nil
}
func (bm *Manager) decryptAndVerify(encrypted []byte, iv []byte) ([]byte, error) {
decrypted, err := bm.formatter.Decrypt(encrypted, iv)
if err != nil {
return nil, err
}
atomic.AddInt64(&bm.stats.DecryptedBytes, int64(len(payload)))
atomic.AddInt64(&bm.stats.DecryptedBytes, int64(len(decrypted)))
// Since the encryption key is a function of data, we must be able to generate exactly the same key
// after decrypting the content. This serves as a checksum.
if err := bm.verifyChecksum(payload, iv); err != nil {
return nil, fmt.Errorf("invalid checksum at %v offset %v length %v: %v", bi.PackFile, bi.PackOffset, len(payload), err)
}
return payload, nil
return decrypted, bm.verifyChecksum(decrypted, iv)
}
func (bm *Manager) getPhysicalBlockInternal(ctx context.Context, blockID string) ([]byte, error) {

View File

@@ -0,0 +1,60 @@
package cli
import (
"context"
"github.com/kopia/kopia/block"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/storage"
)
var (
blockIndexRecoverCommand = blockIndexCommands.Command("recover", "Recover block indexes from pack blocks")
blockIndexRecoverPackFile = blockIndexRecoverCommand.Flag("file", "Names of pack files to recover (default=all packs)").Strings()
blockIndexRecoverCommit = blockIndexRecoverCommand.Flag("commit", "Commit recovered blocks").Bool()
)
func runRecoverBlockIndexesAction(ctx context.Context, rep *repo.Repository) error {
var totalCount int
defer func() {
if totalCount == 0 {
log.Noticef("No blocks recovered.")
return
}
if !*blockIndexRecoverCommit {
log.Noticef("Found %v blocks to recover, but not committed. Re-run with --commit", totalCount)
} else {
log.Noticef("Recovered %v blocks.", totalCount)
}
}()
if len(*blockIndexRecoverPackFile) == 0 {
return rep.Storage.ListBlocks(ctx, block.PackBlockPrefix, func(bm storage.BlockMetadata) error {
recoverIndexFromSinglePackFile(ctx, rep, bm.BlockID, bm.Length, &totalCount)
return nil
})
}
for _, packFile := range *blockIndexRecoverPackFile {
recoverIndexFromSinglePackFile(ctx, rep, packFile, 0, &totalCount)
}
return nil
}
func recoverIndexFromSinglePackFile(ctx context.Context, rep *repo.Repository, packFileName string, length int64, totalCount *int) {
recovered, err := rep.Blocks.RecoverIndexFromPackFile(ctx, packFileName, length, *blockIndexRecoverCommit)
if err != nil {
log.Warningf("unable to recover index from %v: %v", packFileName, err)
return
}
*totalCount += len(recovered)
log.Infof("Recovered %v entries from %v (commit=%v)", len(recovered), packFileName, *blockIndexRecoverCommit)
}
func init() {
blockIndexRecoverCommand.Action(repositoryAction(runRecoverBlockIndexesAction))
}