refactored repo/ into separate github.com/kopia/repo/ git repository

This commit is contained in:
Jarek Kowalski
2018-10-26 20:40:57 -07:00
parent a824c96271
commit 327d8317d8
165 changed files with 118 additions and 10567 deletions

View File

@@ -12,7 +12,7 @@ builds:
- arm
- arm64
ldflags:
- -s -w -X "github.com/kopia/kopia/repo.BuildVersion={{.Version}}" -X "github.com/kopia/kopia/repo.BuildInfo={{.Commit}}"
- -s -w -X "github.com/kopia/repo.BuildVersion={{.Version}}" -X "github.com/kopia/repo.BuildInfo={{.Commit}}"
archive:
replacements:
darwin: macOS

View File

@@ -70,8 +70,8 @@ integration-tests:
KOPIA_EXE=$(CURDIR)/dist/integration/kopia go test -count=1 -timeout 90s -v github.com/kopia/kopia/tests/end_to_end_test
stress-test:
KOPIA_LONG_STRESS_TEST=1 go test -count=1 -timeout 200s github.com/kopia/kopia/repo/tests/stress_test
go test -count=1 -timeout 200s github.com/kopia/kopia/repo/tests/repository_stress_test
KOPIA_LONG_STRESS_TEST=1 go test -count=1 -timeout 200s github.com/kopia/repo/tests/stress_test
go test -count=1 -timeout 200s github.com/kopia/repo/tests/repository_stress_test
godoc:
godoc -http=:33333

View File

@@ -12,7 +12,7 @@
"github.com/jpillora/go-ogle-analytics"
"github.com/kopia/kopia/internal/ospath"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
const (

View File

@@ -10,9 +10,9 @@
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/storage"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -4,7 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -4,7 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
)
var (

View File

@@ -3,9 +3,9 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/storage"
)
var (

View File

@@ -1,84 +0,0 @@
package cli
import (
"bytes"
"context"
"fmt"
"os"
"sort"
"github.com/kopia/kopia/internal/packindex"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
)
// Command-line definition for "block index show" (alias "cat"):
// one or more index block IDs to display (getIndexBlocksToShow expands
// the special value "active" to all active index blocks), plus an
// optional --raw flag to dump undecoded bytes.
var (
	blockIndexShowCommand = blockIndexCommands.Command("show", "List block indexes").Alias("cat")
	blockIndexShowIDs     = blockIndexShowCommand.Arg("id", "IDs of index blocks to show").Required().Strings()
	blockIndexShowRaw     = blockIndexShowCommand.Flag("raw", "Show raw block data").Bool()
)
// getIndexBlocksToShow resolves the index block IDs requested on the
// command line. The single special argument "active" expands to all
// currently active index blocks, ordered from oldest to newest.
func getIndexBlocksToShow(ctx context.Context, rep *repo.Repository) ([]string, error) {
	ids := append([]string(nil), *blockIndexShowIDs...)

	if len(ids) != 1 || ids[0] != "active" {
		return ids, nil
	}

	blocks, err := rep.Blocks.IndexBlocks(ctx)
	if err != nil {
		return nil, err
	}

	// oldest first
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].Timestamp.Before(blocks[j].Timestamp)
	})

	ids = nil
	for _, bi := range blocks {
		ids = append(ids, bi.FileName)
	}

	return ids, nil
}
// runShowBlockIndexesAction prints the contents of the requested index
// blocks, either as raw bytes (--raw) or decoded entry-by-entry.
func runShowBlockIndexesAction(ctx context.Context, rep *repo.Repository) error {
	blockIDs, err := getIndexBlocksToShow(ctx, rep)
	if err != nil {
		return err
	}

	for _, blockID := range blockIDs {
		data, err := rep.Blocks.GetIndexBlock(ctx, blockID)
		if err != nil {
			return fmt.Errorf("can't read block %q: %v", blockID, err)
		}

		if *blockIndexShowRaw {
			os.Stdout.Write(data) //nolint:errcheck
			continue
		}

		fmt.Printf("%v (%v bytes):\n", blockID, len(data))
		ndx, err := packindex.Open(bytes.NewReader(data))
		if err != nil {
			return err
		}
		// best-effort listing; per-entry errors are not propagated here
		_ = ndx.Iterate("", func(l block.Info) error {
			action := "add"
			if l.Deleted {
				action = "del"
			}
			fmt.Printf(" %v %v %v %v %v+%v\n", action, l.BlockID, formatTimestamp(l.Timestamp()), l.PackFile, l.PackOffset, l.Length)
			return nil
		})
	}

	return nil
}
// init registers runShowBlockIndexesAction as the handler for the
// "block index show" subcommand. repositoryAction adapts it to the
// kingpin action signature (presumably supplying ctx and an open
// repository — defined elsewhere in this package).
func init() {
	blockIndexShowCommand.Action(repositoryAction(runShowBlockIndexesAction))
}

View File

@@ -5,8 +5,8 @@
"fmt"
"sort"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
)
var (

View File

@@ -6,8 +6,8 @@
"strings"
"sync"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
)
var (

View File

@@ -3,7 +3,7 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -4,7 +4,7 @@
"bytes"
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -7,8 +7,8 @@
"strconv"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
)
var (

View File

@@ -4,7 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -5,7 +5,7 @@
"fmt"
"os"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -6,7 +6,7 @@
"path/filepath"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
)
var (

View File

@@ -7,8 +7,8 @@
"strings"
"github.com/kopia/kopia/internal/diff"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
)
var (

View File

@@ -7,9 +7,9 @@
"strings"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
)
var (

View File

@@ -6,7 +6,7 @@
"sort"
"strings"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -3,7 +3,7 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -5,7 +5,7 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -7,8 +7,8 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/cachefs"
"github.com/kopia/kopia/fs/loggingfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
)
var (

View File

@@ -11,11 +11,11 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/parallelwork"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/object"
)
var (

View File

@@ -4,9 +4,9 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
func policyTargets(ctx context.Context, rep *repo.Repository, globalFlag *bool, targetsFlag *[]string) ([]snapshot.SourceInfo, error) {

View File

@@ -8,8 +8,8 @@
"strings"
"github.com/kopia/kopia/internal/editor"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
const policyEditHelpText = `

View File

@@ -5,8 +5,8 @@
"fmt"
"sort"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
var (

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
var (

View File

@@ -8,7 +8,7 @@
"strings"
"github.com/kopia/kopia/fs/ignorefs"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
)

View File

@@ -5,8 +5,8 @@
"fmt"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
var (

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
)
var (

View File

@@ -5,9 +5,9 @@
"fmt"
"time"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/storage"
"gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -8,11 +8,11 @@
"github.com/kopia/kopia/fs/ignorefs"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/object"
"github.com/kopia/repo/storage"
)
var (

View File

@@ -3,7 +3,7 @@
import (
"context"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -5,10 +5,10 @@
"fmt"
"github.com/kopia/kopia/internal/upload"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
)
var (

View File

@@ -10,7 +10,7 @@
"github.com/kopia/kopia/internal/scrubber"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -7,7 +7,7 @@
"time"
"github.com/kopia/kopia/internal/server"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -5,7 +5,7 @@
"io"
"os"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -12,9 +12,9 @@
"time"
"github.com/kopia/kopia/internal/upload"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
const (

View File

@@ -9,7 +9,7 @@
"github.com/kopia/kopia/fs/ignorefs"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/units"

View File

@@ -4,9 +4,9 @@
"context"
"sort"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo"
)
var (

View File

@@ -9,11 +9,11 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
)
var (

View File

@@ -4,7 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -4,8 +4,8 @@
"context"
"fmt"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/repo"
"github.com/kopia/repo/storage"
)
var (

View File

@@ -7,7 +7,7 @@
"io"
"os"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -12,7 +12,7 @@
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/fs/loggingfs"
"github.com/kopia/kopia/internal/ospath"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var (

View File

@@ -6,9 +6,9 @@
"strings"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
)
// ParseObjectID interprets the given ID string and returns corresponding object.ID.

View File

@@ -5,8 +5,8 @@
"os"
"strconv"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/repo/storage/filesystem"
"github.com/kopia/repo/storage"
"github.com/kopia/repo/storage/filesystem"
"gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/repo/storage/gcs"
"github.com/kopia/repo/storage"
"github.com/kopia/repo/storage/gcs"
"gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -4,7 +4,7 @@
"context"
"fmt"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/repo/storage"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/repo/storage/s3"
"github.com/kopia/repo/storage"
"github.com/kopia/repo/storage/s3"
"gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -3,8 +3,8 @@
import (
"context"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/repo/storage/webdav"
"github.com/kopia/repo/storage"
"github.com/kopia/repo/storage/webdav"
"gopkg.in/alecthomas/kingpin.v2"
)

View File

@@ -6,7 +6,7 @@
"os"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
)
var log = kopialogging.Logger("kopia/example")

View File

@@ -5,10 +5,10 @@
"fmt"
"os"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo/storage/filesystem"
"github.com/kopia/kopia/repo/storage/logging"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/storage/filesystem"
"github.com/kopia/repo/storage/logging"
)
const (

View File

@@ -6,8 +6,8 @@
"io/ioutil"
"os"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
)
func uploadRandomObject(ctx context.Context, r *repo.Repository, length int) (object.ID, error) {

View File

@@ -7,7 +7,7 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/repo/object"
)
var log = kopialogging.Logger("kopia/cachefs")

View File

@@ -11,8 +11,8 @@
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
)
var log = kopialogging.Logger("diff")

View File

@@ -2,7 +2,7 @@
import (
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/repo/object"
)
// Entry represents a directory entry as stored in JSON stream.

View File

@@ -1,7 +1,7 @@
// Package hashcache implements streaming cache of file hashes.
package hashcache
import "github.com/kopia/kopia/repo/object"
import "github.com/kopia/repo/object"
var hashCacheStreamType = "kopia:hashcache"

View File

@@ -7,7 +7,6 @@
var (
userSettingsDir string
userCacheDir string
userLogsDir string
)
@@ -16,16 +15,7 @@ func ConfigDir() string {
return filepath.Join(userSettingsDir, "kopia")
}
// CacheDir returns the directory where cache data (machine-local) needs to be stored.
func CacheDir() string {
return filepath.Join(userCacheDir, "kopia")
}
// LogsDir returns the directory where per-user logs should be written.
func LogsDir() string {
if userLogsDir == "" {
return filepath.Join(CacheDir(), "logs")
}
return filepath.Join(userLogsDir, "kopia")
}

View File

@@ -7,6 +7,5 @@
func init() {
userSettingsDir = filepath.Join(os.Getenv("HOME"), "Library", "Application Support")
userCacheDir = filepath.Join(os.Getenv("HOME"), "Library", "Caches")
userLogsDir = filepath.Join(os.Getenv("HOME"), "Library", "Logs")
}

View File

@@ -6,5 +6,5 @@
func init() {
userSettingsDir = os.Getenv("APPDATA")
userCacheDir = os.Getenv("LOCALAPPDATA")
userLogsDir = os.Getenv("LOCALAPPDATA")
}

View File

@@ -1,152 +0,0 @@
package packindex
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"sort"
)
// Builder prepares and writes block index for writing.
type Builder map[string]*Info

// Add adds a new entry to the builder or conditionally replaces it if the timestamp is greater.
func (b Builder) Add(i Info) {
	if prev, found := b[i.BlockID]; found && i.TimestampSeconds < prev.TimestampSeconds {
		// an entry with a strictly newer timestamp is already present; keep it.
		return
	}
	b[i.BlockID] = &i
}
// sortedBlocks returns all entries ordered by block ID, the on-disk
// ordering that Build writes.
func (b Builder) sortedBlocks() []*Info {
	var result []*Info
	for _, info := range b {
		result = append(result, info)
	}
	sort.Slice(result, func(x, y int) bool {
		return result[x].BlockID < result[y].BlockID
	})
	return result
}
// indexLayout captures the computed dimensions of an index being
// written: entry sizes and counts, plus where the variable-length
// extra data (pack file names) starts.
type indexLayout struct {
	packFileOffsets map[string]uint32 // offset of each pack file name within extraData
	entryCount      int               // number of (key, entry) pairs
	keyLength       int               // bytes per key; derived from the first block ID by prepareExtraData
	entryLength     int               // bytes per fixed-size entry value (20)
	extraDataOffset uint32            // absolute offset of extraData within the index
}
// Build writes the pack index to the provided output.
//
// Layout: an 8-byte header (version, key length, entry length, entry
// count), followed by entryCount fixed-size (key, entry) pairs sorted
// by block ID, followed by variable-length extra data holding the pack
// file names referenced by the entries.
func (b Builder) Build(output io.Writer) error {
	allBlocks := b.sortedBlocks()
	layout := &indexLayout{
		packFileOffsets: map[string]uint32{},
		keyLength:       -1, // filled in by prepareExtraData from the first block
		entryLength:     20,
		entryCount:      len(allBlocks),
	}

	w := bufio.NewWriter(output)

	// prepare extra data to be appended at the end of an index.
	extraData := prepareExtraData(allBlocks, layout)

	// write header
	header := make([]byte, 8)
	header[0] = 1 // version
	header[1] = byte(layout.keyLength)
	binary.BigEndian.PutUint16(header[2:4], uint16(layout.entryLength))
	binary.BigEndian.PutUint32(header[4:8], uint32(layout.entryCount))
	if _, err := w.Write(header); err != nil {
		return fmt.Errorf("unable to write header: %v", err)
	}

	// write all sorted blocks.
	entry := make([]byte, layout.entryLength)
	for _, it := range allBlocks {
		if err := writeEntry(w, it, layout, entry); err != nil {
			return fmt.Errorf("unable to write entry: %v", err)
		}
	}

	if _, err := w.Write(extraData); err != nil {
		return fmt.Errorf("error writing extra data: %v", err)
	}

	return w.Flush()
}
// prepareExtraData builds the variable-length tail of the index
// (deduplicated pack file names) and, as side effects, fills in
// layout.keyLength and layout.extraDataOffset.
//
// NOTE(review): keyLength is taken from the first (lowest) block ID
// only; writeEntry later rejects keys of any other length, so all
// block IDs in one index must encode to the same byte length. For an
// empty builder keyLength stays -1 (header byte 0xff) — confirm
// callers never build an empty index.
func prepareExtraData(allBlocks []*Info, layout *indexLayout) []byte {
	var extraData []byte

	for i, it := range allBlocks {
		if i == 0 {
			layout.keyLength = len(contentIDToBytes(it.BlockID))
		}
		if it.PackFile != "" {
			// store each pack file name once; entries reference it by offset.
			if _, ok := layout.packFileOffsets[it.PackFile]; !ok {
				layout.packFileOffsets[it.PackFile] = uint32(len(extraData))
				extraData = append(extraData, []byte(it.PackFile)...)
			}
		}
		if len(it.Payload) > 0 {
			panic("storing payloads in indexes is not supported")
		}
	}

	// extra data begins right after the header and all fixed-size entries.
	layout.extraDataOffset = uint32(8 + layout.entryCount*(layout.keyLength+layout.entryLength))
	return extraData
}
// writeEntry emits one fixed-size (key, entry) pair for it, formatting
// the value into the caller-provided scratch buffer entry.
func writeEntry(w io.Writer, it *Info, layout *indexLayout, entry []byte) error {
	key := contentIDToBytes(it.BlockID)
	if len(key) != layout.keyLength {
		return fmt.Errorf("inconsistent key length: %v vs %v", len(key), layout.keyLength)
	}

	if err := formatEntry(entry, it, layout); err != nil {
		return fmt.Errorf("unable to format entry: %v", err)
	}

	if _, err := w.Write(key); err != nil {
		return fmt.Errorf("error writing entry key: %v", err)
	}

	_, err := w.Write(entry)
	if err != nil {
		return fmt.Errorf("error writing entry: %v", err)
	}
	return nil
}
// formatEntry serializes it into the 20-byte entry buffer:
//
//	bytes  0-7:  big-endian uint64 packing timestamp (48 high bits),
//	             format version (next 8 bits) and pack file name
//	             length (low 8 bits)
//	bytes  8-11: absolute offset of the pack file name within the index
//	bytes 12-15: offset within the pack file (MSB set when deleted)
//	bytes 16-19: content length
func formatEntry(entry []byte, it *Info, layout *indexLayout) error {
	entryTimestampAndFlags := entry[0:8]
	entryPackFileOffset := entry[8:12]
	entryPackedOffset := entry[12:16]
	entryPackedLength := entry[16:20]
	timestampAndFlags := uint64(it.TimestampSeconds) << 16

	if len(it.PackFile) == 0 {
		return fmt.Errorf("empty pack block ID for %v", it.BlockID)
	}

	binary.BigEndian.PutUint32(entryPackFileOffset, layout.extraDataOffset+layout.packFileOffsets[it.PackFile])
	if it.Deleted {
		// deletion is encoded in the high bit of the packed offset.
		binary.BigEndian.PutUint32(entryPackedOffset, it.PackOffset|0x80000000)
	} else {
		binary.BigEndian.PutUint32(entryPackedOffset, it.PackOffset)
	}
	binary.BigEndian.PutUint32(entryPackedLength, it.Length)
	timestampAndFlags |= uint64(it.FormatVersion) << 8
	timestampAndFlags |= uint64(len(it.PackFile))
	binary.BigEndian.PutUint64(entryTimestampAndFlags, timestampAndFlags)
	return nil
}
// NewBuilder creates a new, empty Builder.
func NewBuilder() Builder {
	return Builder{}
}

View File

@@ -1,38 +0,0 @@
package packindex
import (
"encoding/hex"
)
// bytesToContentID converts the packed binary form of a content ID
// back to its string form. It is the inverse of contentIDToBytes.
func bytesToContentID(b []byte) string {
	switch {
	case len(b) == 0:
		return ""
	case b[0] == 0xff:
		// 0xff marks a raw (non-hex) ID stored verbatim.
		return string(b[1:])
	case b[0] == 0:
		// zero marker: no single-character prefix.
		return hex.EncodeToString(b[1:])
	default:
		return string(b[0:1]) + hex.EncodeToString(b[1:])
	}
}
// contentIDToBytes packs a content ID into its compact binary form:
// a one-byte prefix marker followed by the hex-decoded remainder.
// Non-hex IDs are stored verbatim behind a 0xff marker.
func contentIDToBytes(c string) []byte {
	prefix := []byte{0}
	rest := c
	if len(c)%2 == 1 {
		// odd length: the first character is a one-byte prefix.
		prefix = []byte(c[:1])
		rest = c[1:]
	}

	decoded, err := hex.DecodeString(rest)
	if err != nil {
		return append([]byte{0xff}, c...)
	}
	return append(prefix, decoded...)
}

View File

@@ -1,74 +0,0 @@
package packindex
import (
"encoding/binary"
"fmt"
)
// Format describes a format of a single pack index. The actual structure is not used,
// it's purely for documentation purposes.
// The struct is byte-aligned.
type Format struct {
	Version    byte   // format version number must be 0x01
	KeySize    byte   // size of each key in bytes
	EntrySize  uint16 // size of each entry in bytes, big-endian
	EntryCount uint32 // number of sorted (key,value) entries that follow

	// Entries is the list of (key, entry) pairs, sorted by key bytes.
	Entries []struct {
		Key   []byte // key bytes (KeySize)
		Entry entry
	}

	// ExtraData holds variable-length data (pack file names) referenced
	// by entries via absolute offsets within the index.
	ExtraData []byte // extra data
}
// entry is the fixed-size 20-byte value stored for each key.
type entry struct {
	// big endian:
	// 48 most significant bits - 48-bit timestamp in seconds since 1970/01/01 UTC
	// 8 bits - format version (currently == 1)
	// 8 least significant bits - length of pack block ID
	timestampAndFlags uint64
	packFileOffset    uint32 // 4 bytes, big endian, offset within index file where pack block ID begins
	packedOffset      uint32 // 4 bytes, big endian, offset within pack file where the contents begin; MSB = deleted
	packedLength      uint32 // 4 bytes, big endian, content length
}

// parse decodes the 20-byte big-endian wire representation into e.
func (e *entry) parse(b []byte) error {
	if len(b) < 20 {
		return fmt.Errorf("invalid entry length: %v", len(b))
	}

	*e = entry{
		timestampAndFlags: binary.BigEndian.Uint64(b[0:8]),
		packFileOffset:    binary.BigEndian.Uint32(b[8:12]),
		packedOffset:      binary.BigEndian.Uint32(b[12:16]),
		packedLength:      binary.BigEndian.Uint32(b[16:20]),
	}
	return nil
}

// IsDeleted reports whether the deletion flag (MSB of packedOffset) is set.
func (e *entry) IsDeleted() bool {
	return e.packedOffset&0x80000000 != 0
}

// TimestampSeconds returns the 48-bit timestamp in Unix seconds.
func (e *entry) TimestampSeconds() int64 {
	return int64(e.timestampAndFlags >> 16)
}

// PackedFormatVersion returns the format version byte.
func (e *entry) PackedFormatVersion() byte {
	return byte(e.timestampAndFlags >> 8)
}

// PackFileLength returns the length of the pack block ID in bytes.
func (e *entry) PackFileLength() byte {
	return byte(e.timestampAndFlags)
}

// PackFileOffset returns the absolute index offset of the pack block ID.
func (e *entry) PackFileOffset() uint32 {
	return e.packFileOffset
}

// PackedOffset returns the content offset with the deletion flag masked off.
func (e *entry) PackedOffset() uint32 {
	return e.packedOffset & 0x7fffffff
}

// PackedLength returns the content length in bytes.
func (e *entry) PackedLength() uint32 {
	return e.packedLength
}

View File

@@ -1,196 +0,0 @@
package packindex
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sort"
"strings"
)
// Index is a read-only index of packed blocks.
type Index interface {
	io.Closer

	// GetInfo returns metadata for the given block ID, or (nil, nil)
	// when the block is not present.
	GetInfo(blockID string) (*Info, error)

	// Iterate calls cb for each block whose ID starts with prefix, in
	// ascending block ID order, stopping early if cb returns an error.
	Iterate(prefix string, cb func(Info) error) error
}

// index is a file-backed Index implementation that reads entries on
// demand through an io.ReaderAt.
type index struct {
	hdr      headerInfo
	readerAt io.ReaderAt
}
type headerInfo struct {
keySize int
valueSize int
entryCount int
}
func readHeader(readerAt io.ReaderAt) (headerInfo, error) {
var header [8]byte
if n, err := readerAt.ReadAt(header[:], 0); err != nil || n != 8 {
return headerInfo{}, fmt.Errorf("invalid header: %v", err)
}
if header[0] != 1 {
return headerInfo{}, fmt.Errorf("invalid header format: %v", header[0])
}
hi := headerInfo{
keySize: int(header[1]),
valueSize: int(binary.BigEndian.Uint16(header[2:4])),
entryCount: int(binary.BigEndian.Uint32(header[4:8])),
}
if hi.keySize <= 1 || hi.valueSize < 0 || hi.entryCount < 0 {
return headerInfo{}, fmt.Errorf("invalid header")
}
return hi, nil
}
// Iterate invokes the provided callback function for all blocks in the index, sorted alphabetically.
// The iteration ends when the callback returns an error, which is propagated to the caller or when
// all blocks have been visited.
func (b *index) Iterate(prefix string, cb func(Info) error) error {
	startPos, err := b.findEntryPosition(prefix)
	if err != nil {
		return fmt.Errorf("could not find starting position: %v", err)
	}

	stride := b.hdr.keySize + b.hdr.valueSize
	entry := make([]byte, stride)

	for pos := startPos; pos < b.hdr.entryCount; pos++ {
		// entries are fixed-size records starting right after the 8-byte header.
		n, err := b.readerAt.ReadAt(entry, int64(8+stride*pos))
		if err != nil || n != len(entry) {
			return fmt.Errorf("unable to read from index: %v", err)
		}

		key := entry[0:b.hdr.keySize]
		value := entry[b.hdr.keySize:]

		// fix: the decoded Info previously shadowed the loop index "i";
		// renamed to keep the two clearly distinct.
		info, err := b.entryToInfo(bytesToContentID(key), value)
		if err != nil {
			return fmt.Errorf("invalid index data: %v", err)
		}
		if !strings.HasPrefix(info.BlockID, prefix) {
			// entries are sorted, so the first non-matching ID ends the prefix run.
			break
		}
		if err := cb(info); err != nil {
			return err
		}
	}
	return nil
}
// findEntryPosition returns the position of the first entry whose
// block ID is >= blockID, via binary search over the sorted entries.
func (b *index) findEntryPosition(blockID string) (int, error) {
	stride := b.hdr.keySize + b.hdr.valueSize
	buf := make([]byte, stride)

	var readErr error
	pos := sort.Search(b.hdr.entryCount, func(p int) bool {
		if readErr != nil {
			// a previous probe failed; short-circuit the search.
			return false
		}
		if _, err := b.readerAt.ReadAt(buf, int64(8+stride*p)); err != nil {
			readErr = err
			return false
		}
		return bytesToContentID(buf[0:b.hdr.keySize]) >= blockID
	})

	return pos, readErr
}
// findEntry locates the value bytes for blockID, returning (nil, nil)
// when the exact key is not present.
func (b *index) findEntry(blockID string) ([]byte, error) {
	key := contentIDToBytes(blockID)
	if len(key) != b.hdr.keySize {
		return nil, fmt.Errorf("invalid block ID: %q", blockID)
	}

	position, err := b.findEntryPosition(blockID)
	if err != nil {
		return nil, err
	}
	if position >= b.hdr.entryCount {
		// search ran past the last entry: not found.
		return nil, nil
	}

	stride := b.hdr.keySize + b.hdr.valueSize
	entryBuf := make([]byte, stride)
	if _, err := b.readerAt.ReadAt(entryBuf, int64(8+stride*position)); err != nil {
		return nil, err
	}

	if !bytes.Equal(entryBuf[0:len(key)], key) {
		// nearest entry has a different key: not found.
		return nil, nil
	}
	return entryBuf[len(key):], nil
}
// GetInfo returns information about a given block. If a block is not found, nil is returned.
func (b *index) GetInfo(blockID string) (*Info, error) {
	entryData, err := b.findEntry(blockID)
	if err != nil || entryData == nil {
		return nil, err
	}

	info, err := b.entryToInfo(blockID, entryData)
	if err != nil {
		return nil, err
	}
	return &info, nil
}
// entryToInfo decodes the fixed-size value bytes for blockID into an
// Info, resolving the pack file name with a second read at the
// absolute offset recorded in the entry.
func (b *index) entryToInfo(blockID string, entryData []byte) (Info, error) {
	if len(entryData) < 20 {
		return Info{}, fmt.Errorf("invalid entry length: %v", len(entryData))
	}

	var e entry
	if err := e.parse(entryData); err != nil {
		return Info{}, err
	}

	// the entry stores only (offset, length) of the pack file name;
	// read the name itself from the extra-data section of the index.
	packFile := make([]byte, e.PackFileLength())
	n, err := b.readerAt.ReadAt(packFile, int64(e.PackFileOffset()))
	if err != nil || n != int(e.PackFileLength()) {
		return Info{}, fmt.Errorf("can't read pack block ID: %v", err)
	}

	return Info{
		BlockID:          blockID,
		Deleted:          e.IsDeleted(),
		TimestampSeconds: e.TimestampSeconds(),
		FormatVersion:    e.PackedFormatVersion(),
		PackOffset:       e.PackedOffset(),
		Length:           e.PackedLength(),
		PackFile:         string(packFile),
	}, nil
}
// Close closes the index and the underlying reader.
func (b *index) Close() error {
	closer, ok := b.readerAt.(io.Closer)
	if !ok {
		// the reader owns no resources that need closing.
		return nil
	}
	return closer.Close()
}
// Open reads an Index from a given reader. The caller must call Close() when the index is no longer used.
func Open(readerAt io.ReaderAt) (Index, error) {
	hdr, err := readHeader(readerAt)
	if err != nil {
		return nil, fmt.Errorf("invalid header: %v", err)
	}
	return &index{hdr: hdr, readerAt: readerAt}, nil
}

View File

@@ -1,22 +0,0 @@
package packindex
import (
"time"
)
// Info is an information about a single block managed by Manager.
type Info struct {
	BlockID          string `json:"blockID"`              // content ID of the block
	Length           uint32 `json:"length"`               // length of the block contents in bytes
	TimestampSeconds int64  `json:"time"`                 // creation/deletion time, Unix seconds
	PackFile         string `json:"packFile,omitempty"`   // name of the pack file holding the block
	PackOffset       uint32 `json:"packOffset,omitempty"` // offset of the block within the pack file
	Deleted          bool   `json:"deleted"`              // true when this entry records a deletion
	Payload          []byte `json:"payload"` // set for payloads stored inline
	FormatVersion    byte   `json:"formatVersion"`        // pack format version
}

// Timestamp returns the time when a block was created or deleted.
func (i Info) Timestamp() time.Time {
	return time.Unix(i.TimestampSeconds, 0)
}

View File

@@ -1,132 +0,0 @@
package packindex
import (
"container/heap"
"errors"
)
// Merged is an Index implementation that transparently merges results
// from a set of underlying Indexes.
type Merged []Index

// Close closes all underlying indexes, stopping at the first error.
func (m Merged) Close() error {
	for _, ndx := range m {
		err := ndx.Close()
		if err != nil {
			return err
		}
	}
	return nil
}
// GetInfo returns information about a single block. If a block is not found, returns (nil,nil)
func (m Merged) GetInfo(contentID string) (*Info, error) {
	var best *Info
	for _, ndx := range m {
		candidate, err := ndx.GetInfo(contentID)
		if err != nil {
			return nil, err
		}
		if candidate == nil {
			continue
		}

		// prefer the newest entry; on a timestamp tie prefer a live
		// (non-deleted) entry over a deletion marker.
		switch {
		case best == nil:
			best = candidate
		case candidate.TimestampSeconds > best.TimestampSeconds:
			best = candidate
		case candidate.TimestampSeconds == best.TimestampSeconds && !candidate.Deleted:
			best = candidate
		}
	}
	return best, nil
}
// nextInfo pairs the most recently received Info with the channel that
// will produce its successors.
type nextInfo struct {
	it Info
	ch <-chan Info
}

// nextInfoHeap is a min-heap ordered by (BlockID, TimestampSeconds,
// live-before-deleted), used to merge several sorted iterations.
type nextInfoHeap []*nextInfo

func (h nextInfoHeap) Len() int { return len(h) }

func (h nextInfoHeap) Less(i, j int) bool {
	left, right := h[i].it, h[j].it
	if left.BlockID != right.BlockID {
		return left.BlockID < right.BlockID
	}
	if left.TimestampSeconds != right.TimestampSeconds {
		return left.TimestampSeconds < right.TimestampSeconds
	}
	return !left.Deleted
}

func (h nextInfoHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push appends x; invoked only via container/heap.
func (h *nextInfoHeap) Push(x interface{}) {
	*h = append(*h, x.(*nextInfo))
}

// Pop removes and returns the last element; invoked only via container/heap.
func (h *nextInfoHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	item := old[last]
	*h = old[:last]
	return item
}
// iterateChan adapts ndx.Iterate to a channel of Info values; closing
// done cancels the iteration via an error returned from the callback.
func iterateChan(prefix string, ndx Index, done chan bool) <-chan Info {
	ch := make(chan Info)
	go func() {
		defer close(ch)

		_ = ndx.Iterate(prefix, func(i Info) error {
			select {
			case ch <- i:
				return nil
			case <-done:
				return errors.New("end of iteration")
			}
		})
	}()
	return ch
}
// Iterate invokes the provided callback for all unique block IDs in the underlying sources until either
// all blocks have been visited or until an error is returned by the callback.
//
// Implementation: a k-way merge. Each source is drained through a
// channel (iterateChan) and the current heads are kept in a min-heap
// ordered by block ID; for duplicate IDs the entry with the highest
// timestamp wins.
func (m Merged) Iterate(prefix string, cb func(i Info) error) error {
	var minHeap nextInfoHeap
	done := make(chan bool)
	defer close(done) // stops the producer goroutines started below

	// prime the heap with the first item from every source.
	for _, ndx := range m {
		ch := iterateChan(prefix, ndx, done)
		it, ok := <-ch
		if ok {
			heap.Push(&minHeap, &nextInfo{it, ch})
		}
	}

	var pendingItem Info
	for len(minHeap) > 0 {
		min := heap.Pop(&minHeap).(*nextInfo)
		if pendingItem.BlockID != min.it.BlockID {
			// moved on to a new block ID; flush the previous winner.
			if pendingItem.BlockID != "" {
				if err := cb(pendingItem); err != nil {
					return err
				}
			}
			pendingItem = min.it
		} else if min.it.TimestampSeconds > pendingItem.TimestampSeconds {
			// same block ID with a newer timestamp wins.
			pendingItem = min.it
		}

		// refill the heap from the channel the popped item came from.
		it, ok := <-min.ch
		if ok {
			heap.Push(&minHeap, &nextInfo{it, min.ch})
		}
	}
	// flush the final pending item, if any.
	if pendingItem.BlockID != "" {
		return cb(pendingItem)
	}
	return nil
}

var _ Index = (*Merged)(nil)

View File

@@ -1,94 +0,0 @@
package packindex_test
import (
"bytes"
"fmt"
"reflect"
"testing"
"github.com/kopia/kopia/internal/packindex"
)
// TestMerged verifies that Merged prefers the entry with the highest
// timestamp (and a live entry over a deletion marker on ties) and
// iterates unique block IDs in sorted order across several indexes.
func TestMerged(t *testing.T) {
	i1, err := indexWithItems(
		packindex.Info{BlockID: "aabbcc", TimestampSeconds: 1, PackFile: "xx", PackOffset: 11},
		packindex.Info{BlockID: "ddeeff", TimestampSeconds: 1, PackFile: "xx", PackOffset: 111},
		packindex.Info{BlockID: "z010203", TimestampSeconds: 1, PackFile: "xx", PackOffset: 111},
		packindex.Info{BlockID: "de1e1e", TimestampSeconds: 4, PackFile: "xx", PackOffset: 111},
	)
	if err != nil {
		t.Fatalf("can't create index: %v", err)
	}
	i2, err := indexWithItems(
		packindex.Info{BlockID: "aabbcc", TimestampSeconds: 3, PackFile: "yy", PackOffset: 33},
		packindex.Info{BlockID: "xaabbcc", TimestampSeconds: 1, PackFile: "xx", PackOffset: 111},
		packindex.Info{BlockID: "de1e1e", TimestampSeconds: 4, PackFile: "xx", PackOffset: 222, Deleted: true},
	)
	if err != nil {
		t.Fatalf("can't create index: %v", err)
	}
	i3, err := indexWithItems(
		packindex.Info{BlockID: "aabbcc", TimestampSeconds: 2, PackFile: "zz", PackOffset: 22},
		packindex.Info{BlockID: "ddeeff", TimestampSeconds: 1, PackFile: "zz", PackOffset: 222},
		packindex.Info{BlockID: "k010203", TimestampSeconds: 1, PackFile: "xx", PackOffset: 111},
		packindex.Info{BlockID: "k020304", TimestampSeconds: 1, PackFile: "xx", PackOffset: 111},
	)
	if err != nil {
		t.Fatalf("can't create index: %v", err)
	}

	m := packindex.Merged{i1, i2, i3}

	i, err := m.GetInfo("aabbcc")
	if err != nil || i == nil {
		t.Fatalf("unable to get info: %v", err)
	}
	if got, want := i.PackOffset, uint32(33); got != want {
		t.Errorf("invalid pack offset %v, wanted %v", got, want)
	}

	var inOrder []string
	// fix: the error returned by Iterate was previously ignored.
	if err := m.Iterate("", func(i packindex.Info) error {
		inOrder = append(inOrder, i.BlockID)
		if i.BlockID == "de1e1e" && i.Deleted {
			t.Errorf("iteration preferred deleted block over non-deleted")
		}
		return nil
	}); err != nil {
		t.Fatalf("iterate error: %v", err)
	}

	// fix: guard against a nil result before dereferencing Deleted.
	if i, err := m.GetInfo("de1e1e"); err != nil {
		t.Errorf("error getting deleted block info: %v", err)
	} else if i == nil {
		t.Errorf("de1e1e not found")
	} else if i.Deleted {
		t.Errorf("GetInfo preferred deleted block over non-deleted")
	}

	expectedInOrder := []string{
		"aabbcc",
		"ddeeff",
		"de1e1e",
		"k010203",
		"k020304",
		"xaabbcc",
		"z010203",
	}
	if !reflect.DeepEqual(inOrder, expectedInOrder) {
		t.Errorf("unexpected items in order: %v, wanted %v", inOrder, expectedInOrder)
	}

	if err := m.Close(); err != nil {
		t.Errorf("unexpected error in Close(): %v", err)
	}
}
// indexWithItems builds and serializes an in-memory pack index containing the
// provided entries, then reopens it for reading.
func indexWithItems(items ...packindex.Info) (packindex.Index, error) {
	builder := packindex.NewBuilder()
	for _, item := range items {
		builder.Add(item)
	}

	var serialized bytes.Buffer
	if err := builder.Build(&serialized); err != nil {
		return nil, fmt.Errorf("build error: %v", err)
	}

	return packindex.Open(bytes.NewReader(serialized.Bytes()))
}

View File

@@ -1,26 +0,0 @@
package packindex
import "testing"
func TestRoundTrip(t *testing.T) {
cases := []string{
"",
"x",
"aa",
"xaa",
"xaaa",
"a1x",
}
for _, tc := range cases {
b := contentIDToBytes(tc)
got := bytesToContentID(b)
if got != tc {
t.Errorf("%q did not round trip, got %q, wanted %q", tc, got, tc)
}
}
if got, want := bytesToContentID(nil), ""; got != want {
t.Errorf("unexpected content id %v, want %v", got, want)
}
}

View File

@@ -1,237 +0,0 @@
package packindex_test
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"fmt"
"math/rand"
"reflect"
"strings"
"testing"
"github.com/kopia/kopia/internal/packindex"
)
// TestPackIndex exercises building, serializing, opening and querying a pack
// index, verifies that independently-constructed builders serialize to
// identical bytes, and fuzz-tests the parser against corrupted input.
func TestPackIndex(t *testing.T) {
	blockNumber := 0

	// deterministicBlockID produces a stable pseudo-random block ID; some IDs
	// get a single-character prefix to exercise odd-length ID handling.
	deterministicBlockID := func(prefix string, id int) string {
		h := sha1.New()
		fmt.Fprintf(h, "%v%v", prefix, id)
		blockNumber++

		prefix2 := ""
		if id%2 == 0 {
			prefix2 = "x"
		}
		if id%7 == 0 {
			prefix2 = "y"
		}
		if id%5 == 0 {
			prefix2 = "m"
		}
		return fmt.Sprintf("%v%x", prefix2, h.Sum(nil))
	}
	deterministicPackFile := func(id int) string {
		h := sha1.New()
		fmt.Fprintf(h, "%v", id)
		blockNumber++
		return fmt.Sprintf("%x", h.Sum(nil))
	}
	deterministicPackedOffset := func(id int) uint32 {
		s := rand.NewSource(int64(id + 1))
		rnd := rand.New(s)
		return uint32(rnd.Int31())
	}
	deterministicPackedLength := func(id int) uint32 {
		s := rand.NewSource(int64(id + 2))
		rnd := rand.New(s)
		return uint32(rnd.Int31())
	}
	deterministicFormatVersion := func(id int) byte {
		return byte(id % 100)
	}
	randomUnixTime := func() int64 {
		return int64(rand.Int31())
	}

	var infos []packindex.Info
	// deleted blocks with all information
	for i := 0; i < 100; i++ {
		infos = append(infos, packindex.Info{
			TimestampSeconds: randomUnixTime(),
			Deleted:          true,
			BlockID:          deterministicBlockID("deleted-packed", i),
			PackFile:         deterministicPackFile(i),
			PackOffset:       deterministicPackedOffset(i),
			Length:           deterministicPackedLength(i),
			FormatVersion:    deterministicFormatVersion(i),
		})
	}
	// non-deleted block
	for i := 0; i < 100; i++ {
		infos = append(infos, packindex.Info{
			TimestampSeconds: randomUnixTime(),
			BlockID:          deterministicBlockID("packed", i),
			PackFile:         deterministicPackFile(i),
			PackOffset:       deterministicPackedOffset(i),
			Length:           deterministicPackedLength(i),
			FormatVersion:    deterministicFormatVersion(i),
		})
	}

	infoMap := map[string]packindex.Info{}
	b1 := packindex.NewBuilder()
	b2 := packindex.NewBuilder()
	b3 := packindex.NewBuilder()

	for _, info := range infos {
		infoMap[info.BlockID] = info
		b1.Add(info)
		b2.Add(info)
		b3.Add(info)
	}

	var buf1 bytes.Buffer
	var buf2 bytes.Buffer
	var buf3 bytes.Buffer

	if err := b1.Build(&buf1); err != nil {
		t.Errorf("unable to build: %v", err)
	}
	// BUG FIX: previously b1 was built into all three buffers, so the
	// stability comparison below compared b1's output with itself and b2/b3
	// were never used. Build each builder so we actually verify that
	// independently-constructed builders produce identical output.
	if err := b2.Build(&buf2); err != nil {
		t.Errorf("unable to build: %v", err)
	}
	if err := b3.Build(&buf3); err != nil {
		t.Errorf("unable to build: %v", err)
	}

	data1 := buf1.Bytes()
	data2 := buf2.Bytes()
	data3 := buf3.Bytes()

	// hex.Dump already returns a printable string, so format with %v (the old
	// %x double-encoded the dump into unreadable hex).
	if !reflect.DeepEqual(data1, data2) {
		t.Errorf("builder output not stable:\n%v\nvs\n%v", hex.Dump(data1), hex.Dump(data2))
	}
	if !reflect.DeepEqual(data2, data3) {
		t.Errorf("builder output not stable:\n%v\nvs\n%v", hex.Dump(data2), hex.Dump(data3))
	}

	t.Run("FuzzTest", func(t *testing.T) {
		fuzzTestIndexOpen(t, data1)
	})

	ndx, err := packindex.Open(bytes.NewReader(data1))
	if err != nil {
		t.Fatalf("can't open index: %v", err)
	}
	defer ndx.Close()

	// every added entry must be retrievable and equal to the original
	for _, info := range infos {
		info2, err := ndx.GetInfo(info.BlockID)
		if err != nil {
			t.Errorf("unable to find %v", info.BlockID)
			continue
		}
		if !reflect.DeepEqual(info, *info2) {
			t.Errorf("invalid value retrieved: %+v, wanted %+v", info2, info)
		}
	}

	// full iteration must visit every entry exactly once
	cnt := 0
	ndx.Iterate("", func(info2 packindex.Info) error {
		info := infoMap[info2.BlockID]
		if !reflect.DeepEqual(info, info2) {
			t.Errorf("invalid value retrieved: %+v, wanted %+v", info2, info)
		}
		cnt++
		return nil
	})
	if cnt != len(infoMap) {
		t.Errorf("invalid number of iterations: %v, wanted %v", cnt, len(infoMap))
	}

	prefixes := []string{"a", "b", "f", "0", "3", "aa", "aaa", "aab", "fff", "m", "x", "y", "m0", "ma"}

	// lookups of non-existent blocks must return (nil, nil)
	for i := 0; i < 100; i++ {
		blockID := deterministicBlockID("no-such-block", i)
		v, err := ndx.GetInfo(blockID)
		if err != nil {
			t.Errorf("unable to get block %v: %v", blockID, err)
		}
		if v != nil {
			t.Errorf("unexpected result when getting block %v: %v", blockID, v)
		}
	}

	// prefix iteration must only return matching entries
	for _, prefix := range prefixes {
		cnt2 := 0
		ndx.Iterate(prefix, func(info2 packindex.Info) error {
			cnt2++
			if !strings.HasPrefix(info2.BlockID, prefix) {
				t.Errorf("unexpected item %v when iterating prefix %v", info2.BlockID, prefix)
			}
			return nil
		})
		t.Logf("found %v elements with prefix %q", cnt2, prefix)
	}
}
// fuzzTestIndexOpen feeds randomly corrupted variants of originalData to
// packindex.Open and exercises iteration and lookups on any index that still
// opens, to ensure malformed input never causes a panic.
func fuzzTestIndexOpen(t *testing.T, originalData []byte) {
	// fixed seed so failures are reproducible
	rnd := rand.New(rand.NewSource(12345))

	fuzzTest(rnd, originalData, 50000, func(d []byte) {
		ndx, err := packindex.Open(bytes.NewReader(d))
		if err != nil {
			// rejecting corrupted data is the expected outcome
			return
		}
		defer ndx.Close()

		seen := 0
		ndx.Iterate("", func(item packindex.Info) error {
			// exercise point lookups for the first few items only
			if seen < 10 {
				ndx.GetInfo(item.BlockID) // nolint:errcheck
			}
			seen++
			return nil
		})
	})
}
func fuzzTest(rnd *rand.Rand, originalData []byte, rounds int, callback func(d []byte)) {
for round := 0; round < rounds; round++ {
data := append([]byte(nil), originalData...)
// mutate small number of bytes
bytesToMutate := rnd.Intn(3)
for i := 0; i < bytesToMutate; i++ {
pos := rnd.Intn(len(data))
data[pos] = byte(rnd.Int())
}
sectionsToInsert := rnd.Intn(3)
for i := 0; i < sectionsToInsert; i++ {
pos := rnd.Intn(len(data))
insertedLength := rnd.Intn(20)
insertedData := make([]byte, insertedLength)
rnd.Read(insertedData)
data = append(append(append([]byte(nil), data[0:pos]...), insertedData...), data[pos:]...)
}
sectionsToDelete := rnd.Intn(3)
for i := 0; i < sectionsToDelete; i++ {
pos := rnd.Intn(len(data))
deletedLength := rnd.Intn(10)
if pos+deletedLength > len(data) {
continue
}
data = append(append([]byte(nil), data[0:pos]...), data[pos+deletedLength:]...)
}
callback(data)
}
}

View File

@@ -1,28 +0,0 @@
package packindex
// IsSubset returns true if all entries in index 'a' are contained in index 'b'.
// Both indexes are streamed in sorted BlockID order and merged in a single pass.
func IsSubset(a, b Index) bool {
	done := make(chan bool)
	defer close(done)

	aItems := iterateChan("", a, done)
	bItems := iterateChan("", b, done)

	for wanted := range aItems {
		got, ok := <-bItems
		// skip over 'b' entries that sort before the entry we are looking for
		for ok && got.BlockID < wanted.BlockID {
			got, ok = <-bItems
		}
		// either 'b' ran out of entries, or the matching entry is missing
		if !ok || got.BlockID != wanted.BlockID {
			return false
		}
	}
	return true
}

View File

@@ -1,60 +0,0 @@
package packindex_test
import (
"bytes"
"fmt"
"testing"
"github.com/kopia/kopia/internal/packindex"
)
// TestSubset verifies IsSubset for a variety of index pairs: empty indexes,
// disjoint sets, proper subsets and identical sets.
func TestSubset(t *testing.T) {
	for _, c := range []struct {
		subset, superset []string
		want             bool
	}{
		{[]string{}, []string{"aa"}, true},
		{[]string{}, []string{"aa", "bb"}, true},
		{[]string{"aa"}, []string{"aa"}, true},
		{[]string{"aa"}, []string{"bb"}, false},
		{[]string{"aa"}, []string{"aa", "bb"}, true},
		{[]string{"aa"}, []string{"aa", "bb", "cc"}, true},
		{[]string{"aa", "bb"}, []string{"bb", "cc"}, false},
		{[]string{"aa", "bb"}, []string{"aa"}, false},
		{[]string{"aa", "bb"}, []string{}, false},
		{[]string{"aa", "bb", "cc", "dd", "ee", "ff"}, []string{"aa", "bb", "cc", "dd", "ee", "ff"}, true},
		{[]string{"aa", "bb", "cc", "dd", "ee", "ff"}, []string{"aa", "bb", "cc", "dd", "ef", "ff"}, false},
		{[]string{"aa", "bb", "cc", "dd", "ee", "ff"}, []string{"aa", "bb", "cc", "dd", "ee", "ef", "ff"}, true},
	} {
		a, err := indexWithBlockIDs(c.subset)
		if err != nil {
			t.Fatalf("error building index: %v", err)
		}
		b, err := indexWithBlockIDs(c.superset)
		if err != nil {
			t.Fatalf("error building index: %v", err)
		}
		if got, want := packindex.IsSubset(a, b), c.want; got != want {
			t.Errorf("invalid value of IsSubset(%v,%v): %v, wanted %v", c.subset, c.superset, got, want)
		}
	}
}
// indexWithBlockIDs builds an index containing the given block IDs, all with
// identical placeholder pack information.
func indexWithBlockIDs(items []string) (packindex.Index, error) {
	builder := packindex.NewBuilder()
	for _, blockID := range items {
		builder.Add(packindex.Info{
			BlockID:    blockID,
			PackFile:   "x",
			PackOffset: 1,
			Length:     1,
		})
	}

	var serialized bytes.Buffer
	if err := builder.Build(&serialized); err != nil {
		return nil, fmt.Errorf("build error: %v", err)
	}

	return packindex.Open(bytes.NewReader(serialized.Bytes()))
}

View File

@@ -8,13 +8,11 @@
"path/filepath"
"testing"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/repo/storage/filesystem"
"github.com/kopia/repo"
"github.com/kopia/repo/block"
"github.com/kopia/repo/object"
"github.com/kopia/repo/storage"
"github.com/kopia/repo/storage/filesystem"
)
const masterPassword = "foobarbazfoobarbaz"

View File

@@ -1,44 +0,0 @@
// Package retry implements exponential retry policy.
package retry
import (
"fmt"
"time"
"github.com/kopia/kopia/internal/kopialogging"
)
// log is the package-level logger for retry diagnostics.
var log = kopialogging.Logger("kopia/retry")

const (
	maxAttempts             = 10               // maximum number of attempts before giving up
	retryInitialSleepAmount = 1 * time.Second  // delay before the first retry
	retryMaxSleepAmount     = 32 * time.Second // cap on the exponentially-growing delay
)

// AttemptFunc performs an attempt and returns a value (optional, may be nil) and an error.
type AttemptFunc func() (interface{}, error)

// IsRetriableFunc is a function that determines whether an error is retriable.
type IsRetriableFunc func(err error) bool
// WithExponentialBackoff runs the provided attempt until it succeeds, retrying on all errors that are
// deemed retriable by the provided function. The delay between retries grows exponentially up to
// a certain limit. If all attempts fail, the returned error includes the last attempt's error.
func WithExponentialBackoff(desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc) (interface{}, error) {
	sleepAmount := retryInitialSleepAmount
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		v, err := attempt()
		if !isRetriableError(err) {
			// success, or a permanent (non-retriable) failure - return as-is
			return v, err
		}
		lastErr = err
		log.Debugf("got error %v when %v (#%v), sleeping for %v before retrying", err, desc, i, sleepAmount)
		time.Sleep(sleepAmount)
		sleepAmount *= 2
		if sleepAmount > retryMaxSleepAmount {
			sleepAmount = retryMaxSleepAmount
		}
	}
	// BUG FIX: the last error was previously discarded here, which made
	// exhausted retries impossible to diagnose.
	return nil, fmt.Errorf("unable to complete %v despite %v retries: %v", desc, maxAttempts, lastErr)
}

View File

@@ -11,8 +11,8 @@
"github.com/bmizerany/pat"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/repo"
)
var log = kopialogging.Logger("kopia/server")

View File

@@ -3,9 +3,9 @@
import (
"time"
"github.com/kopia/kopia/repo/block"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/repo/block"
)
// StatusResponse is the response of 'status' HTTP API command.

View File

@@ -1,44 +0,0 @@
package throttle
import (
"io"
"net/http"
)
// throttlerPool wraps readers so that their reads are throttled against a
// shared bandwidth pool.
type throttlerPool interface {
	AddReader(io.ReadCloser) (io.ReadCloser, error)
}

// throttlingRoundTripper is an http.RoundTripper that routes request bodies
// through uploadPool and response bodies through downloadPool.
type throttlingRoundTripper struct {
	base         http.RoundTripper
	downloadPool throttlerPool // throttles response bodies; nil disables download throttling
	uploadPool   throttlerPool // throttles request bodies; nil disables upload throttling
}
// RoundTrip implements http.RoundTripper, wrapping the request body in the
// upload pool and the response body in the download pool when those pools are
// configured.
func (rt *throttlingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.Body != nil && rt.uploadPool != nil {
		wrapped, err := rt.uploadPool.AddReader(req.Body)
		if err != nil {
			return nil, err
		}
		req.Body = wrapped
	}

	resp, err := rt.base.RoundTrip(req)
	if err != nil || resp == nil || resp.Body == nil || rt.downloadPool == nil {
		// nothing to wrap - propagate the transport's result unchanged
		return resp, err
	}

	wrapped, err := rt.downloadPool.AddReader(resp.Body)
	if err != nil {
		// BUG FIX: previously a failed AddReader returned both a non-nil
		// response and a non-nil error (violating the RoundTripper contract)
		// and leaked the original body.
		resp.Body.Close() // nolint:errcheck
		return nil, err
	}
	resp.Body = wrapped
	return resp, nil
}
// NewRoundTripper returns http.RoundTripper that throttles upload and downloads.
func NewRoundTripper(base http.RoundTripper, downloadPool throttlerPool, uploadPool throttlerPool) http.RoundTripper {
	rt := &throttlingRoundTripper{
		base:         base,
		downloadPool: downloadPool,
		uploadPool:   uploadPool,
	}
	if rt.base == nil {
		// fall back to the default transport when no base is provided
		rt.base = http.DefaultTransport
	}
	return rt
}

View File

@@ -1,103 +0,0 @@
package throttle
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"testing"
)
// baseRoundTripper is a stub http.RoundTripper that returns pre-registered
// responses for known requests and an error for everything else.
type baseRoundTripper struct {
	responses map[*http.Request]*http.Response
}

// add registers a canned response for req and returns both for convenience.
func (rt *baseRoundTripper) add(req *http.Request, resp *http.Response) (*http.Request, *http.Response) {
	rt.responses[req] = resp
	return req, resp
}

// RoundTrip implements http.RoundTripper using the registered responses;
// unknown requests fail with a generic error.
func (rt *baseRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	resp := rt.responses[req]
	if resp != nil {
		return resp, nil
	}
	return nil, fmt.Errorf("error occurred")
}
// fakePool records every reader passed to AddReader without wrapping it,
// so tests can assert which bodies were routed through the pool.
type fakePool struct {
	readers []io.ReadCloser
}

// reset clears the recorded readers between test cases.
func (fp *fakePool) reset() {
	fp.readers = nil
}

// AddReader records r and returns it unchanged; it never fails.
func (fp *fakePool) AddReader(r io.ReadCloser) (io.ReadCloser, error) {
	fp.readers = append(fp.readers, r)
	return r, nil
}
// TestRoundTripper verifies that request bodies are routed through the upload
// pool and response bodies through the download pool, for each combination of
// present/absent bodies.
func TestRoundTripper(t *testing.T) {
	downloadBody := ioutil.NopCloser(bytes.NewReader([]byte("data1")))
	uploadBody := ioutil.NopCloser(bytes.NewReader([]byte("data1")))

	base := &baseRoundTripper{
		responses: make(map[*http.Request]*http.Response),
	}

	downloadPool := &fakePool{}
	uploadPool := &fakePool{}

	rt := NewRoundTripper(base, downloadPool, uploadPool)

	// Empty request (no request, no response) - neither pool should be touched.
	uploadPool.reset()
	downloadPool.reset()
	req1, resp1 := base.add(&http.Request{}, &http.Response{})
	resp, err := rt.RoundTrip(req1)
	if resp != resp1 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 0 || len(uploadPool.readers) != 0 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}

	// Upload request - only the upload pool should see a reader.
	uploadPool.reset()
	downloadPool.reset()
	req2, resp2 := base.add(&http.Request{
		Body: uploadBody,
	}, &http.Response{})
	resp, err = rt.RoundTrip(req2)
	if resp != resp2 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 0 || len(uploadPool.readers) != 1 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}

	// Download request - only the download pool should see a reader.
	uploadPool.reset()
	downloadPool.reset()
	req3, resp3 := base.add(&http.Request{}, &http.Response{Body: downloadBody})
	resp, err = rt.RoundTrip(req3)
	if resp != resp3 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 1 || len(uploadPool.readers) != 0 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}

	// Upload/Download request - both pools should see a reader.
	// NOTE(review): uploadBody/downloadBody were already consumed above;
	// the fake pool never reads them so reuse is harmless here.
	uploadPool.reset()
	downloadPool.reset()
	req4, resp4 := base.add(&http.Request{Body: uploadBody}, &http.Response{Body: downloadBody})
	resp, err = rt.RoundTrip(req4)
	if resp != resp4 || err != nil {
		t.Errorf("invalid response or error: %v", err)
	}
	if len(downloadPool.readers) != 1 || len(uploadPool.readers) != 1 {
		t.Errorf("invalid pool contents: %v %v", downloadPool.readers, uploadPool.readers)
	}
}

View File

@@ -20,8 +20,8 @@
"github.com/kopia/kopia/internal/dir"
"github.com/kopia/kopia/internal/hashcache"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
)

View File

@@ -10,10 +10,10 @@
"testing"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/repo/storage/filesystem"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/repo"
"github.com/kopia/repo/object"
"github.com/kopia/repo/storage/filesystem"
)
const masterPassword = "foofoofoofoofoofoofoofoo"

View File

@@ -24,7 +24,7 @@
"github.com/kopia/kopia/cli"
"github.com/kopia/kopia/internal/kopialogging"
"github.com/kopia/kopia/internal/ospath"
"github.com/kopia/kopia/repo"
"github.com/kopia/repo"
"gopkg.in/alecthomas/kingpin.v2"

View File

@@ -1,216 +0,0 @@
package block
import (
"container/heap"
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/kopia/kopia/repo/storage"
"github.com/kopia/kopia/repo/storage/filesystem"
)
const (
	// how often the background goroutine sweeps the cache
	sweepCacheFrequency = 1 * time.Minute
	// minimum age before a cache entry's timestamp is refreshed on read
	touchThreshold = 10 * time.Minute
)

// blockCache is a read-through cache of content blocks backed by a local
// storage, bounded to approximately maxSizeBytes by periodic sweeps that evict
// the oldest entries. Cached entries are authenticated with HMAC(hmacSecret).
type blockCache struct {
	st           storage.Storage // underlying (authoritative) storage
	cacheStorage storage.Storage // local cache storage; nil disables caching
	maxSizeBytes int64           // approximate upper bound on total cached bytes
	hmacSecret   []byte          // secret used to authenticate cached entries

	mu                 sync.Mutex
	lastTotalSizeBytes int64 // total size observed by the last sweep (guarded by mu)

	closed chan struct{} // closed by close() to stop the sweeper goroutine
}

// blockToucher is implemented by cache storage backends that can refresh a
// block's timestamp, used to approximate LRU ordering for eviction.
type blockToucher interface {
	TouchBlock(ctx context.Context, blockID string, threshold time.Duration) error
}
// adjustCacheKey rotates the single-character prefix of odd-length block IDs
// to the end of the key, so the leading characters (used as the top-level
// cache shard) are spread evenly across 256 values.
func adjustCacheKey(cacheKey string) string {
	if len(cacheKey)%2 == 0 {
		return cacheKey
	}
	return cacheKey[1:] + cacheKey[:1]
}
// getContentBlock returns the given byte range of a physical block, consulting
// the local cache first (when enabled) and falling back to the underlying
// storage. Successful reads from the underlying storage are written back to
// the cache with an HMAC so tampered entries can be detected later.
func (c *blockCache) getContentBlock(ctx context.Context, cacheKey string, physicalBlockID string, offset, length int64) ([]byte, error) {
	cacheKey = adjustCacheKey(cacheKey)

	useCache := shouldUseBlockCache(ctx) && c.cacheStorage != nil
	if useCache {
		if b := c.readAndVerifyCacheBlock(ctx, cacheKey); b != nil {
			return b, nil
		}
	}

	b, err := c.st.GetBlock(ctx, physicalBlockID, offset, length)
	if err == storage.ErrBlockNotFound {
		// not found in underlying storage
		return nil, err
	}

	if err == nil && useCache {
		// populate the cache; a failed write is logged but not fatal
		if puterr := c.cacheStorage.PutBlock(ctx, cacheKey, appendHMAC(b, c.hmacSecret)); puterr != nil {
			log.Warningf("unable to write cache item %v: %v", cacheKey, puterr)
		}
	}

	return b, err
}
// readAndVerifyCacheBlock fetches cacheKey from the cache storage and verifies
// its HMAC. It returns nil on any miss or verification failure; a valid hit
// also refreshes the entry's timestamp when the backend supports touching.
func (c *blockCache) readAndVerifyCacheBlock(ctx context.Context, cacheKey string) []byte {
	data, err := c.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
	if err != nil {
		// a plain miss is expected; anything else is worth a warning
		if err != storage.ErrBlockNotFound {
			log.Warningf("unable to read cache %v: %v", cacheKey, err)
		}
		return nil
	}

	data, err = verifyAndStripHMAC(data, c.hmacSecret)
	if err != nil {
		// ignore malformed blocks
		log.Warningf("malformed block %v: %v", cacheKey, err)
		return nil
	}

	// retrieved from cache and HMAC valid - refresh the timestamp if possible
	if t, ok := c.cacheStorage.(blockToucher); ok {
		t.TouchBlock(ctx, cacheKey, touchThreshold) //nolint:errcheck
	}
	return data
}
// close signals the background sweep goroutine to exit.
func (c *blockCache) close() {
	close(c.closed)
}
// sweepDirectoryPeriodically runs sweepDirectory every sweepCacheFrequency
// until close() is called. It runs on its own goroutine (started by
// newBlockCacheWithCacheStorage).
func (c *blockCache) sweepDirectoryPeriodically(ctx context.Context) {
	for {
		select {
		case <-c.closed:
			return

		case <-time.After(sweepCacheFrequency):
			// sweep failures are logged and the loop keeps running
			err := c.sweepDirectory(ctx)
			if err != nil {
				log.Warningf("blockCache sweep failed: %v", err)
			}
		}
	}
}
// A blockMetadataHeap implements heap.Interface and holds storage.BlockMetadata.
// It is a min-heap ordered by Timestamp, so the root is always the oldest entry.
type blockMetadataHeap []storage.BlockMetadata

func (h blockMetadataHeap) Len() int { return len(h) }

// Less orders entries oldest-first by timestamp.
func (h blockMetadataHeap) Less(i, j int) bool {
	return h[i].Timestamp.Before(h[j].Timestamp)
}

func (h blockMetadataHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x; call only via heap.Push.
func (h *blockMetadataHeap) Push(x interface{}) {
	*h = append(*h, x.(storage.BlockMetadata))
}

// Pop removes and returns the last element; call only via heap.Pop, which
// first swaps the root to the end, so this yields the oldest entry.
func (h *blockMetadataHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[0 : n-1]
	return item
}
// sweepDirectory enumerates all cache entries and evicts the oldest ones until
// the total retained size fits under maxSizeBytes. It also records the
// retained size in lastTotalSizeBytes. Safe for concurrent use (guarded by mu).
func (c *blockCache) sweepDirectory(ctx context.Context) (err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.cacheStorage == nil {
		return nil
	}

	t0 := time.Now()

	var h blockMetadataHeap
	var totalRetainedSize int64

	err = c.cacheStorage.ListBlocks(ctx, "", func(it storage.BlockMetadata) error {
		heap.Push(&h, it)
		totalRetainedSize += it.Length
		if totalRetainedSize > c.maxSizeBytes {
			oldest := heap.Pop(&h).(storage.BlockMetadata)
			// BUG FIX: previously this deleted it.BlockID (the block just
			// visited) instead of the oldest block popped from the heap,
			// so the cache evicted the wrong entries while accounting for
			// the oldest one's size.
			if delerr := c.cacheStorage.DeleteBlock(ctx, oldest.BlockID); delerr != nil {
				log.Warningf("unable to remove %v: %v", oldest.BlockID, delerr)
			} else {
				totalRetainedSize -= oldest.Length
			}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("error listing cache: %v", err)
	}

	if c.maxSizeBytes > 0 {
		log.Debugf("finished sweeping directory in %v and retained %v/%v bytes (%v %%)", time.Since(t0), totalRetainedSize, c.maxSizeBytes, 100*totalRetainedSize/c.maxSizeBytes)
	} else {
		// guard against division by zero when no size limit is configured
		log.Debugf("finished sweeping directory in %v and retained %v bytes", time.Since(t0), totalRetainedSize)
	}
	c.lastTotalSizeBytes = totalRetainedSize
	return nil
}
// newBlockCache constructs a blockCache on top of st. When caching is
// configured (positive size limit and a cache directory), cached blocks are
// stored in a filesystem-backed "blocks" subdirectory; otherwise the cache is
// a pass-through.
func newBlockCache(ctx context.Context, st storage.Storage, caching CachingOptions) (*blockCache, error) {
	var cacheStorage storage.Storage
	var err error

	if caching.MaxCacheSizeBytes > 0 && caching.CacheDirectory != "" {
		blockCacheDir := filepath.Join(caching.CacheDirectory, "blocks")

		// MkdirAll is a no-op when the directory already exists, avoiding the
		// Stat-then-create race of the previous implementation.
		if err = os.MkdirAll(blockCacheDir, 0700); err != nil {
			return nil, err
		}

		// BUG FIX: previously this passed context.Background(), silently
		// ignoring the caller-provided ctx.
		cacheStorage, err = filesystem.New(ctx, &filesystem.Options{
			Path:            blockCacheDir,
			DirectoryShards: []int{2},
		})
		if err != nil {
			return nil, err
		}
	}

	return newBlockCacheWithCacheStorage(ctx, st, cacheStorage, caching)
}
// newBlockCacheWithCacheStorage constructs a blockCache using the provided
// cache storage (which may be nil to disable caching), performs an initial
// sweep, and starts the periodic background sweeper.
func newBlockCacheWithCacheStorage(ctx context.Context, st, cacheStorage storage.Storage, caching CachingOptions) (*blockCache, error) {
	bc := &blockCache{
		st:           st,
		cacheStorage: cacheStorage,
		maxSizeBytes: caching.MaxCacheSizeBytes,
		hmacSecret:   append([]byte(nil), caching.HMACSecret...),
		closed:       make(chan struct{}),
	}

	// sweep once up front so we start within the size budget
	if err := bc.sweepDirectory(ctx); err != nil {
		return nil, err
	}
	go bc.sweepDirectoryPeriodically(ctx)

	return bc, nil
}

View File

@@ -1,130 +0,0 @@
package block
import (
"context"
"fmt"
"io/ioutil"
"os"
"reflect"
"sort"
"testing"
"github.com/kopia/kopia/repo/internal/storagetesting"
"github.com/kopia/kopia/repo/storage"
)
// newUnderlyingStorageForBlockCacheTesting returns an in-memory storage
// pre-populated with a single 10-byte block named "block-1".
func newUnderlyingStorageForBlockCacheTesting() storage.Storage {
	ctx := context.Background()
	data := map[string][]byte{}
	st := storagetesting.NewMapStorage(data, nil, nil)
	// error deliberately unchecked - test fixture setup; presumably the map
	// storage cannot fail here (confirm against storagetesting)
	st.PutBlock(ctx, "block-1", []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
	return st
}
// TestInMemoryBlockCache exercises the block cache with a map-based cache storage.
func TestInMemoryBlockCache(t *testing.T) {
	cacheData := map[string][]byte{}
	cacheStorage := storagetesting.NewMapStorage(cacheData, nil, nil)
	cache, err := newBlockCacheWithCacheStorage(context.Background(), newUnderlyingStorageForBlockCacheTesting(), cacheStorage, CachingOptions{
		MaxCacheSizeBytes: 10000,
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	verifyBlockCache(t, cache)
}
// TestDiskBlockCache exercises the block cache backed by a temporary directory
// on the local filesystem.
func TestDiskBlockCache(t *testing.T) {
	ctx := context.Background()

	tmpDir, err := ioutil.TempDir("", "kopia")
	if err != nil {
		t.Fatalf("error getting temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	cache, err := newBlockCache(ctx, newUnderlyingStorageForBlockCacheTesting(), CachingOptions{
		MaxCacheSizeBytes: 10000,
		CacheDirectory:    tmpDir,
	})
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	verifyBlockCache(t, cache)
}
// verifyBlockCache exercises read-through behavior, cache-key sharding and
// HMAC verification of cached data against the provided cache.
func verifyBlockCache(t *testing.T, cache *blockCache) {
	ctx := context.Background()
	defer cache.close()

	t.Run("GetContentBlock", func(t *testing.T) {
		cases := []struct {
			cacheKey        string
			physicalBlockID string
			offset          int64
			length          int64
			expected        []byte
			err             error
		}{
			// repeated cases exercise both the cache-miss and cache-hit paths
			{"xf0f0f1", "block-1", 1, 5, []byte{2, 3, 4, 5, 6}, nil},
			{"xf0f0f2", "block-1", 0, -1, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil},
			{"xf0f0f1", "block-1", 1, 5, []byte{2, 3, 4, 5, 6}, nil},
			{"xf0f0f2", "block-1", 0, -1, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil},
			{"xf0f0f3", "no-such-block", 0, -1, nil, storage.ErrBlockNotFound},
			{"xf0f0f4", "no-such-block", 10, 5, nil, storage.ErrBlockNotFound},
			{"f0f0f5", "block-1", 7, 10, []byte{8, 9, 10}, nil},
			{"xf0f0f6", "block-1", 11, 10, nil, fmt.Errorf("invalid offset")},
			{"xf0f0f6", "block-1", -1, 5, nil, fmt.Errorf("invalid offset")},
		}
		for _, tc := range cases {
			v, err := cache.getContentBlock(ctx, tc.cacheKey, tc.physicalBlockID, tc.offset, tc.length)
			if !reflect.DeepEqual(err, tc.err) {
				t.Errorf("unexpected error for %v: %+v, wanted %+v", tc.cacheKey, err, tc.err)
			}
			if !reflect.DeepEqual(v, tc.expected) {
				t.Errorf("unexpected data for %v: %x, wanted %x", tc.cacheKey, v, tc.expected)
			}
		}

		// odd-length cache keys are stored with their first character rotated
		// to the end (see adjustCacheKey); only successful reads are cached
		verifyStorageBlockList(t, cache.cacheStorage, "f0f0f1x", "f0f0f2x", "f0f0f5")
	})
	t.Run("DataCorruption", func(t *testing.T) {
		cacheKey := "f0f0f1x"
		d, err := cache.cacheStorage.GetBlock(ctx, cacheKey, 0, -1)
		if err != nil {
			t.Fatalf("unable to retrieve data from cache: %v", err)
		}

		// corrupt the data and write back
		d[0] ^= 1
		if err := cache.cacheStorage.PutBlock(ctx, cacheKey, d); err != nil {
			t.Fatalf("unable to write corrupted block: %v", err)
		}

		// the corrupted entry must fail HMAC verification and the read must
		// transparently fall back to the underlying storage
		v, err := cache.getContentBlock(ctx, "xf0f0f1", "block-1", 1, 5)
		if err != nil {
			t.Fatalf("error in getContentBlock: %v", err)
		}
		if got, want := v, []byte{2, 3, 4, 5, 6}; !reflect.DeepEqual(v, want) {
			t.Errorf("invalid result when reading corrupted data: %v, wanted %v", got, want)
		}
	})
}
// verifyStorageBlockList asserts that st contains exactly expectedBlocks
// (compared in sorted order).
func verifyStorageBlockList(t *testing.T, st storage.Storage, expectedBlocks ...string) {
	t.Helper()
	var foundBlocks []string
	// BUG FIX: the ListBlocks error was previously ignored, which could turn
	// a listing failure into a confusing "unexpected block list" message.
	if err := st.ListBlocks(context.Background(), "", func(bm storage.BlockMetadata) error {
		foundBlocks = append(foundBlocks, bm.BlockID)
		return nil
	}); err != nil {
		t.Errorf("error listing blocks: %v", err)
	}
	sort.Strings(foundBlocks)
	if !reflect.DeepEqual(foundBlocks, expectedBlocks) {
		t.Errorf("unexpected block list: %v, wanted %v", foundBlocks, expectedBlocks)
	}
}

View File

@@ -1,120 +0,0 @@
package block
import (
"crypto/aes"
"crypto/cipher"
"crypto/hmac" //nolint:gas
"crypto/sha256"
"fmt"
"hash"
"sort"
)
// Formatter performs data block ID computation and encryption of a block of data when storing object in a repository.
// The block ID returned by ComputeBlockID is also passed to Encrypt/Decrypt,
// where encrypting implementations use it as the IV.
type Formatter interface {
	// ComputeBlockID computes ID of the storage block for the specified block of data and returns it in ObjectID.
	ComputeBlockID(data []byte) []byte

	// Encrypt returns encrypted bytes corresponding to the given plaintext. Must not clobber the input slice.
	Encrypt(plainText []byte, blockID []byte) ([]byte, error)

	// Decrypt returns unencrypted bytes corresponding to the given ciphertext. Must not clobber the input slice.
	Decrypt(cipherText []byte, blockID []byte) ([]byte, error)
}
// digestFunction computes the digest (hash, optionally HMAC) of a given block of bytes.
type digestFunction func([]byte) []byte

// unencryptedFormat implements non-encrypted format: block IDs are keyed
// digests and Encrypt/Decrypt are copying no-ops.
type unencryptedFormat struct {
	digestFunc digestFunction
}

// ComputeBlockID returns the digest of data.
func (fi *unencryptedFormat) ComputeBlockID(data []byte) []byte {
	return fi.digestFunc(data)
}

// Encrypt is a no-op; it returns a copy of plainText so the input is never clobbered.
func (fi *unencryptedFormat) Encrypt(plainText []byte, blockID []byte) ([]byte, error) {
	return cloneBytes(plainText), nil
}

// Decrypt is a no-op; it returns a copy of cipherText so the input is never clobbered.
func (fi *unencryptedFormat) Decrypt(cipherText []byte, blockID []byte) ([]byte, error) {
	return cloneBytes(cipherText), nil
}
// syntheticIVEncryptionFormat implements encrypted format with single master AES key and StorageBlock==IV that's
// derived from HMAC-SHA256(content, secret).
type syntheticIVEncryptionFormat struct {
	digestFunc   digestFunction                         // derives the block ID (and thus the IV) from content
	createCipher func(key []byte) (cipher.Block, error) // block cipher constructor (e.g. aes.NewCipher)
	aesKey       []byte                                 // shared master encryption key
}

// ComputeBlockID returns the keyed digest of data; the result doubles as the encryption IV.
func (fi *syntheticIVEncryptionFormat) ComputeBlockID(data []byte) []byte {
	return fi.digestFunc(data)
}

// Encrypt encrypts plainText with the master key, using blockID as the IV.
func (fi *syntheticIVEncryptionFormat) Encrypt(plainText []byte, blockID []byte) ([]byte, error) {
	return symmetricEncrypt(fi.createCipher, fi.aesKey, blockID, plainText)
}

// Decrypt reverses Encrypt; CTR mode makes encryption and decryption the same operation.
func (fi *syntheticIVEncryptionFormat) Decrypt(cipherText []byte, blockID []byte) ([]byte, error) {
	return symmetricEncrypt(fi.createCipher, fi.aesKey, blockID, cipherText)
}
func symmetricEncrypt(createCipher func(key []byte) (cipher.Block, error), key []byte, iv []byte, b []byte) ([]byte, error) {
blockCipher, err := createCipher(key)
if err != nil {
return nil, err
}
ctr := cipher.NewCTR(blockCipher, iv[0:blockCipher.BlockSize()])
result := make([]byte, len(b))
ctr.XORKeyStream(result, b)
return result, nil
}
// SupportedFormats is a list of supported object formats including:
//
//	UNENCRYPTED_HMAC_SHA256_128       - unencrypted, block IDs are 128-bit (32 characters long)
//	UNENCRYPTED_HMAC_SHA256           - unencrypted, block IDs are 256-bit (64 characters long)
//	ENCRYPTED_HMAC_SHA256_AES256_SIV  - encrypted with AES-256 (shared key), IV==FOLD(HMAC-SHA256(content), 128)
var SupportedFormats []string

// FormatterFactories maps known block formatters to their factory functions.
var FormatterFactories map[string]func(f FormattingOptions) (Formatter, error)

func init() {
	FormatterFactories = map[string]func(f FormattingOptions) (Formatter, error){
		"UNENCRYPTED_HMAC_SHA256": func(f FormattingOptions) (Formatter, error) {
			return &unencryptedFormat{computeHMAC(sha256.New, f.HMACSecret, sha256.Size)}, nil
		},
		"UNENCRYPTED_HMAC_SHA256_128": func(f FormattingOptions) (Formatter, error) {
			return &unencryptedFormat{computeHMAC(sha256.New, f.HMACSecret, 16)}, nil
		},
		"ENCRYPTED_HMAC_SHA256_AES256_SIV": func(f FormattingOptions) (Formatter, error) {
			// encryption requires a full-length AES-256 master key
			if len(f.MasterKey) < 32 {
				return nil, fmt.Errorf("master key is not set")
			}
			return &syntheticIVEncryptionFormat{computeHMAC(sha256.New, f.HMACSecret, aes.BlockSize), aes.NewCipher, f.MasterKey}, nil
		},
	}

	// SupportedFormats is derived from the factory map, sorted for stable output.
	for formatName := range FormatterFactories {
		SupportedFormats = append(SupportedFormats, formatName)
	}
	sort.Strings(SupportedFormats)
}
// DefaultFormat is the block format that should be used by default when creating new repositories.
// It authenticates content via HMAC-SHA256-derived block IDs and encrypts it
// with AES-256 in CTR mode using a synthetic (content-derived) IV.
const DefaultFormat = "ENCRYPTED_HMAC_SHA256_AES256_SIV"
// computeHMAC returns a digestFunction that computes HMAC(hash, secret) of a given block of bytes and truncates results to the given size.
func computeHMAC(hf func() hash.Hash, secret []byte, truncate int) digestFunction {
	return func(data []byte) []byte {
		mac := hmac.New(hf, secret)
		mac.Write(data) // nolint:errcheck
		return mac.Sum(nil)[:truncate]
	}
}

View File

@@ -1,48 +0,0 @@
package block
import (
"bytes"
"crypto/rand"
"crypto/sha1"
"testing"
)
// TestFormatters verifies that every registered formatter round-trips data
// through Encrypt/Decrypt and produces block IDs sized in multiples of 16 bytes.
func TestFormatters(t *testing.T) {
	secret := []byte("secret")

	f := FormattingOptions{HMACSecret: secret, MasterKey: make([]byte, 32)}
	for k, v := range FormatterFactories {
		data := make([]byte, 100)
		rand.Read(data)
		// hash of the original plaintext, for comparison after the round trip
		h0 := sha1.Sum(data)

		of, err := v(f)
		if err != nil {
			t.Errorf("error creating object formatter for %v: %v", k, err)
			continue
		}

		t.Logf("testing %v", k)

		blockID := of.ComputeBlockID(data)
		cipherText, err := of.Encrypt(data, blockID)
		if err != nil || cipherText == nil {
			t.Errorf("invalid response from Encrypt: %v %v", cipherText, err)
		}

		plainText, err := of.Decrypt(cipherText, blockID)
		if err != nil || plainText == nil {
			t.Errorf("invalid response from Decrypt: %v %v", plainText, err)
		}

		h1 := sha1.Sum(plainText)

		if !bytes.Equal(h0[:], h1[:]) {
			t.Errorf("Encrypt()/Decrypt() does not round-trip: %x %x", h0, h1)
		}

		// all registered digests are truncated to multiples of 16 bytes
		if len(blockID)%16 != 0 {
			t.Errorf("block ID for %v not a multiple of 16: %v", k, blockID)
		}
	}
}

View File

@@ -1,10 +0,0 @@
package block
// FormattingOptions describes the rules for formatting blocks in repository.
// BlockFormat is expected to name one of the formatters registered in
// FormatterFactories (see SupportedFormats).
type FormattingOptions struct {
	Version     int    `json:"version,omitempty"`      // version number, must be "1"
	BlockFormat string `json:"objectFormat,omitempty"` // identifier of the block format
	HMACSecret  []byte `json:"secret,omitempty"`       // HMAC secret used to generate encryption keys
	MasterKey   []byte `json:"masterKey,omitempty"`    // master encryption key (SIV-mode encryption only)
	MaxPackSize int    `json:"maxPackSize,omitempty"`  // maximum size of a pack object
}

View File

@@ -1,227 +0,0 @@
package block
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"hash/crc32"
"reflect"
"github.com/kopia/kopia/internal/packindex"
)
// RecoverIndexFromPackFile attempts to recover index block entries from a given pack file.
// Pack file length may be provided (if known) to reduce the number of bytes that are read from the storage.
// When commit is true, recovered entries are also added to the pending pack index builder.
func (bm *Manager) RecoverIndexFromPackFile(ctx context.Context, packFile string, packFileLength int64, commit bool) ([]Info, error) {
	localIndexBytes, err := bm.readPackFileLocalIndex(ctx, packFile, packFileLength)
	if err != nil {
		return nil, err
	}

	ndx, err := packindex.Open(bytes.NewReader(localIndexBytes))
	if err != nil {
		// BUG FIX: include the underlying error; it was previously discarded,
		// making recovery failures impossible to diagnose.
		return nil, fmt.Errorf("unable to open index in file %v: %v", packFile, err)
	}

	var recovered []Info
	err = ndx.Iterate("", func(i Info) error {
		recovered = append(recovered, i)
		if commit {
			bm.packIndexBuilder.Add(i)
		}
		return nil
	})
	return recovered, err
}
// packBlockPostamble describes where the local index lives inside a pack file,
// plus the IV needed to decrypt it. It is serialized at the end of each pack
// file (see toBytes/findPostamble) to aid index recovery.
type packBlockPostamble struct {
	localIndexIV     []byte // IV for the local index
	localIndexOffset uint32 // byte offset of the local index within the pack file
	localIndexLength uint32 // length of the local index in bytes
}
// toBytes serializes the postamble as:
//
//	uvarint(version=1), uvarint(len(IV)), IV bytes,
//	uvarint(localIndexOffset), uvarint(localIndexLength),
//	4-byte big-endian CRC32 (IEEE) of all preceding bytes,
//	1 trailing byte holding the total length of everything before it.
//
// The trailing length byte lets findPostamble locate the structure at the very
// end of a pack file.
func (p *packBlockPostamble) toBytes() ([]byte, error) {
	// 4 varints + IV + 4 bytes of checksum + 1 byte of postamble length
	n := 0
	buf := make([]byte, 4*binary.MaxVarintLen64+len(p.localIndexIV)+4+1)
	n += binary.PutUvarint(buf[n:], uint64(1))                   // version flag
	n += binary.PutUvarint(buf[n:], uint64(len(p.localIndexIV))) // length of local index IV
	copy(buf[n:], p.localIndexIV)
	n += len(p.localIndexIV)
	n += binary.PutUvarint(buf[n:], uint64(p.localIndexOffset))
	n += binary.PutUvarint(buf[n:], uint64(p.localIndexLength))

	// checksum everything emitted so far
	checksum := crc32.ChecksumIEEE(buf[0:n])
	binary.BigEndian.PutUint32(buf[n:], checksum)
	n += 4

	// the length must fit in the single trailing byte
	if n > 255 {
		return nil, fmt.Errorf("postamble too long: %v", n)
	}
	buf[n] = byte(n)
	return buf[0 : n+1], nil
}
// findPostamble detects if a given block of bytes contains a possibly valid postamble, and returns it if so
// NOTE, even if this function returns a postamble, it should not be trusted to be correct, since it's not
// cryptographically signed. this is to facilitate data recovery.
func findPostamble(b []byte) *packBlockPostamble {
	if len(b) == 0 {
		// no postamble
		return nil
	}

	// length of postamble is the last byte
	postambleLength := int(b[len(b)-1])
	if postambleLength < 5 {
		// too short, must be at least 5 bytes (checksum + own length)
		return nil
	}

	// postamble body sits immediately before the trailing length byte
	postambleStart := len(b) - 1 - postambleLength
	postambleEnd := len(b) - 1
	if postambleStart < 0 {
		// invalid last byte
		return nil
	}

	// last 4 bytes of the body are the CRC32 of the rest
	postambleBytes := b[postambleStart:postambleEnd]
	payload, checksumBytes := postambleBytes[0:len(postambleBytes)-4], postambleBytes[len(postambleBytes)-4:]
	checksum := binary.BigEndian.Uint32(checksumBytes)
	validChecksum := crc32.ChecksumIEEE(payload)
	if checksum != validChecksum {
		// invalid checksum, not a valid postamble
		return nil
	}

	return decodePostamble(payload)
}
// decodePostamble parses the checksummed portion of a postamble (version flag,
// IV, offset, length) and returns the decoded structure, or nil if the payload
// is malformed or uses an unsupported version.
func decodePostamble(payload []byte) *packBlockPostamble {
	flags, n := binary.Uvarint(payload)
	if n <= 0 {
		// invalid flags
		return nil
	}
	if flags != 1 {
		// unsupported flag
		return nil
	}
	payload = payload[n:]
	ivLength, n := binary.Uvarint(payload)
	if n <= 0 {
		// invalid IV length varint
		return nil
	}
	payload = payload[n:]
	if ivLength > uint64(len(payload)) {
		// invalid IV length
		return nil
	}
	// NOTE: iv aliases the input payload; callers treat the result as read-only.
	iv := payload[0:ivLength]
	payload = payload[ivLength:]
	off, n := binary.Uvarint(payload)
	if n <= 0 {
		// invalid offset
		return nil
	}
	payload = payload[n:]
	length, n := binary.Uvarint(payload)
	if n <= 0 {
		// invalid length
		return nil
	}
	return &packBlockPostamble{
		localIndexIV:     iv,
		localIndexLength: uint32(length),
		localIndexOffset: uint32(off),
	}
}
// buildLocalIndex serializes the pending pack index entries into a byte slice
// suitable for embedding into a pack file.
func (bm *Manager) buildLocalIndex(pending packindex.Builder) ([]byte, error) {
	var out bytes.Buffer
	err := pending.Build(&out)
	if err != nil {
		return nil, fmt.Errorf("unable to build local index: %v", err)
	}
	return out.Bytes(), nil
}
// appendPackFileIndexRecoveryData appends data designed to help with recovery of pack index in case it gets damaged or lost.
// The layout appended to blockData is: encrypted local index, then the postamble describing
// where that index lives. Returns the extended slice.
func (bm *Manager) appendPackFileIndexRecoveryData(blockData []byte, pending packindex.Builder) ([]byte, error) {
	// build, encrypt and append local index
	localIndexOffset := len(blockData)
	localIndex, err := bm.buildLocalIndex(pending)
	if err != nil {
		return nil, err
	}
	// the IV is derived from the plaintext hash, so identical indexes encrypt identically
	localIndexIV := bm.hashData(localIndex)
	encryptedLocalIndex, err := bm.formatter.Encrypt(localIndex, localIndexIV)
	if err != nil {
		return nil, err
	}
	// NOTE: localIndexLength records the *plaintext* length; decryptAndVerify is expected
	// to handle any cipher expansion — TODO confirm against formatter implementations.
	postamble := packBlockPostamble{
		localIndexIV:     localIndexIV,
		localIndexOffset: uint32(localIndexOffset),
		localIndexLength: uint32(len(localIndex)),
	}
	blockData = append(blockData, encryptedLocalIndex...)
	postambleBytes, err := postamble.toBytes()
	if err != nil {
		return nil, err
	}
	blockData = append(blockData, postambleBytes...)
	// sanity check: the postamble we just wrote must round-trip through findPostamble;
	// failure here indicates a serialization bug, hence the hard stop.
	pa2 := findPostamble(blockData)
	if pa2 == nil {
		log.Fatalf("invalid postamble written, that could not be immediately decoded, it's a bug")
	}
	if !reflect.DeepEqual(postamble, *pa2) {
		log.Fatalf("postamble did not round-trip: %v %v", postamble, *pa2)
	}
	return blockData, nil
}
// readPackFileLocalIndex fetches the given pack file and extracts the encrypted local
// index that appendPackFileIndexRecoveryData embedded in it, returning the decrypted
// index bytes. packFileLength (the known file length, if any) is currently unused -
// the entire file is fetched.
func (bm *Manager) readPackFileLocalIndex(ctx context.Context, packFile string, packFileLength int64) ([]byte, error) {
	payload, err := bm.st.GetBlock(ctx, packFile, 0, -1)
	if err != nil {
		return nil, err
	}

	postamble := findPostamble(payload)
	if postamble == nil {
		return nil, fmt.Errorf("unable to find valid postamble in file %v", packFile)
	}

	// The postamble is not signed, so offset/length are untrusted. Perform the addition
	// in uint64 - adding the two uint32 values first could overflow and wrap around,
	// bypassing this bounds check and causing an out-of-range slice below.
	if uint64(postamble.localIndexOffset)+uint64(postamble.localIndexLength) > uint64(len(payload)) {
		// invalid offset/length
		return nil, fmt.Errorf("unable to find valid local index in file %v", packFile)
	}

	// Safe: bounds verified above. (A sub-slice of a non-nil payload is never nil,
	// so no additional nil check is needed.)
	encryptedLocalIndexBytes := payload[postamble.localIndexOffset : postamble.localIndexOffset+postamble.localIndexLength]

	localIndexBytes, err := bm.decryptAndVerify(encryptedLocalIndexBytes, postamble.localIndexIV)
	if err != nil {
		return nil, fmt.Errorf("unable to decrypt local index: %v", err)
	}

	return localIndexBytes, nil
}

View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,148 +0,0 @@
package block
import (
"bytes"
"context"
"fmt"
"time"
"github.com/kopia/kopia/internal/packindex"
)
// autoCompactionOptions are the defaults applied when compaction is triggered
// automatically (as opposed to an explicit user request).
var autoCompactionOptions = CompactOptions{
	MinSmallBlocks: 4 * parallelFetches,
	MaxSmallBlocks: 64,
}

// CompactOptions provides options for compaction
type CompactOptions struct {
	MinSmallBlocks       int           // do nothing while the number of small index blocks is below this
	MaxSmallBlocks       int           // upper bound on small index blocks; must be >= MinSmallBlocks
	AllBlocks            bool          // when set, compact every index block regardless of size
	SkipDeletedOlderThan time.Duration // drop deleted entries older than this during compaction (0 = keep all)
}
// CompactIndexes performs compaction of index blocks ensuring that # of small blocks is between minSmallBlockCount and maxSmallBlockCount
// CompactIndexes performs compaction of index blocks ensuring that # of small blocks is between minSmallBlockCount and maxSmallBlockCount
// Compaction failures are logged but not returned - the indexes remain readable either way.
func (bm *Manager) CompactIndexes(ctx context.Context, opt CompactOptions) error {
	log.Debugf("CompactIndexes(%+v)", opt)
	if opt.MaxSmallBlocks < opt.MinSmallBlocks {
		return fmt.Errorf("invalid block counts")
	}

	indexBlocks, _, err := bm.loadPackIndexesUnlocked(ctx)
	if err != nil {
		return fmt.Errorf("error loading indexes: %v", err)
	}

	blocksToCompact := bm.getBlocksToCompact(indexBlocks, opt)

	// best-effort: a failed compaction is only a warning, not an error for the caller
	if err := bm.compactAndDeleteIndexBlocks(ctx, blocksToCompact, opt); err != nil {
		log.Warningf("error performing quick compaction: %v", err)
	}

	return nil
}
// getBlocksToCompact selects which index blocks should be merged: either just the
// "very small" blocks (when they dominate and merging them keeps us above the minimum),
// or all non-compacted blocks. Returns nil when no compaction is needed.
func (bm *Manager) getBlocksToCompact(indexBlocks []IndexInfo, opt CompactOptions) []IndexInfo {
	var nonCompactedBlocks []IndexInfo
	var totalSizeNonCompactedBlocks int64

	// "very small" = under 5% of the max pack size
	var verySmallBlocks []IndexInfo
	var totalSizeVerySmallBlocks int64

	var mediumSizedBlocks []IndexInfo
	var totalSizeMediumSizedBlocks int64

	for _, b := range indexBlocks {
		// blocks already at/above max pack size are considered compacted, unless AllBlocks
		if b.Length > int64(bm.maxPackSize) && !opt.AllBlocks {
			continue
		}

		nonCompactedBlocks = append(nonCompactedBlocks, b)
		if b.Length < int64(bm.maxPackSize/20) {
			verySmallBlocks = append(verySmallBlocks, b)
			totalSizeVerySmallBlocks += b.Length
		} else {
			mediumSizedBlocks = append(mediumSizedBlocks, b)
			totalSizeMediumSizedBlocks += b.Length
		}
		totalSizeNonCompactedBlocks += b.Length
	}

	if len(nonCompactedBlocks) < opt.MinSmallBlocks {
		// current count is below min allowed - nothing to do
		formatLog.Debugf("no small blocks to compact")
		return nil
	}

	// prefer merging only the very small blocks when they are the majority and the
	// remaining medium blocks alone would stay under the minimum threshold
	if len(verySmallBlocks) > len(nonCompactedBlocks)/2 && len(mediumSizedBlocks)+1 < opt.MinSmallBlocks {
		formatLog.Debugf("compacting %v very small blocks", len(verySmallBlocks))
		return verySmallBlocks
	}

	formatLog.Debugf("compacting all %v non-compacted blocks", len(nonCompactedBlocks))
	return nonCompactedBlocks
}
// compactAndDeleteIndexBlocks merges the given index blocks into a single new index
// block and deletes the originals. No-op when there is at most one block to merge.
func (bm *Manager) compactAndDeleteIndexBlocks(ctx context.Context, indexBlocks []IndexInfo, opt CompactOptions) error {
	if len(indexBlocks) <= 1 {
		return nil
	}
	formatLog.Debugf("compacting %v blocks", len(indexBlocks))
	t0 := time.Now()
	bld := packindex.NewBuilder()
	for _, indexBlock := range indexBlocks {
		if err := bm.addIndexBlocksToBuilder(ctx, bld, indexBlock, opt); err != nil {
			return err
		}
	}
	var buf bytes.Buffer
	if err := bld.Build(&buf); err != nil {
		return fmt.Errorf("unable to build an index: %v", err)
	}
	compactedIndexBlock, err := bm.writePackIndexesNew(ctx, buf.Bytes())
	if err != nil {
		return fmt.Errorf("unable to write compacted indexes: %v", err)
	}
	formatLog.Debugf("wrote compacted index (%v bytes) in %v", compactedIndexBlock, time.Since(t0))
	// delete source blocks; failures are logged only, since the merged index
	// already contains their entries and leftovers are merely redundant
	for _, indexBlock := range indexBlocks {
		// the new index may coincide with one of the sources (identical content); keep it
		if indexBlock.FileName == compactedIndexBlock {
			continue
		}
		bm.listCache.deleteListCache(ctx)
		if err := bm.st.DeleteBlock(ctx, indexBlock.FileName); err != nil {
			log.Warningf("unable to delete compacted block %q: %v", indexBlock.FileName, err)
		}
	}
	return nil
}
// addIndexBlocksToBuilder reads one index block from storage and copies its entries
// into the builder, optionally skipping deleted entries older than opt.SkipDeletedOlderThan.
func (bm *Manager) addIndexBlocksToBuilder(ctx context.Context, bld packindex.Builder, indexBlock IndexInfo, opt CompactOptions) error {
	payload, err := bm.getPhysicalBlockInternal(ctx, indexBlock.FileName)
	if err != nil {
		return err
	}

	ndx, err := packindex.Open(bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("unable to open index block %q: %v", indexBlock, err)
	}

	_ = ndx.Iterate("", func(entry Info) error {
		dropDeleted := entry.Deleted &&
			opt.SkipDeletedOlderThan > 0 &&
			time.Since(entry.Timestamp()) > opt.SkipDeletedOlderThan
		if dropDeleted {
			log.Debugf("skipping block %v deleted at %v", entry.BlockID, entry.Timestamp())
			return nil
		}
		bld.Add(entry)
		return nil
	})

	return nil
}

View File

@@ -1,819 +0,0 @@
package block
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/kopia/kopia/internal/packindex"
"github.com/kopia/kopia/repo/internal/storagetesting"
"github.com/kopia/kopia/repo/storage"
logging "github.com/op/go-logging"
)
const (
	// small pack size so tests exercise pack rotation frequently
	maxPackSize = 2000
)

// fixed base time used by the fake clocks below, keeping tests deterministic
var fakeTime = time.Date(2017, 1, 1, 0, 0, 0, 0, time.UTC)

// HMAC secret shared by newTestBlockManager and hashValue
var hmacSecret = []byte{1, 2, 3}

func init() {
	// quiet default logging for tests; individual tests may raise the level
	logging.SetLevel(logging.INFO, "")
}
// TestBlockManagerEmptyFlush verifies that flushing with no writes stores nothing.
func TestBlockManagerEmptyFlush(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	bm.Flush(ctx)
	if got, want := len(data), 0; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
}

// TestBlockZeroBytes1 verifies that a zero-length block can be written, flushed
// and read back by a fresh manager.
func TestBlockZeroBytes1(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	blockID := writeBlockAndVerify(ctx, t, bm, []byte{})
	bm.Flush(ctx)
	// expect exactly pack block + index block
	if got, want := len(data), 2; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	dumpBlockManagerData(t, data)
	bm = newTestBlockManager(data, keyTime, nil)
	verifyBlock(ctx, t, bm, blockID, []byte{})
}

// TestBlockZeroBytes2 verifies that a zero-length block coexists with a regular one.
func TestBlockZeroBytes2(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 10))
	writeBlockAndVerify(ctx, t, bm, []byte{})
	bm.Flush(ctx)
	if got, want := len(data), 2; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
		dumpBlockManagerData(t, data)
	}
}

// TestBlockManagerSmallBlockWrites verifies that many small writes are batched
// into a single pack that is only persisted on Flush.
func TestBlockManagerSmallBlockWrites(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	for i := 0; i < 100; i++ {
		writeBlockAndVerify(ctx, t, bm, seededRandomData(i, 10))
	}
	// nothing persisted before flush
	if got, want := len(data), 0; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	bm.Flush(ctx)
	if got, want := len(data), 2; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
}

// TestBlockManagerDedupesPendingBlocks verifies that repeated writes of identical
// content are deduplicated while still pending.
func TestBlockManagerDedupesPendingBlocks(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	for i := 0; i < 100; i++ {
		writeBlockAndVerify(ctx, t, bm, seededRandomData(0, 999))
	}
	if got, want := len(data), 0; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	bm.Flush(ctx)
	if got, want := len(data), 2; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
}

// TestBlockManagerDedupesPendingAndUncommittedBlocks verifies dedup against
// blocks that were already written but not yet committed.
func TestBlockManagerDedupesPendingAndUncommittedBlocks(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	// no writes here, all data fits in a single pack.
	writeBlockAndVerify(ctx, t, bm, seededRandomData(0, 950))
	writeBlockAndVerify(ctx, t, bm, seededRandomData(1, 950))
	writeBlockAndVerify(ctx, t, bm, seededRandomData(2, 10))
	if got, want := len(data), 0; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	// no writes here
	writeBlockAndVerify(ctx, t, bm, seededRandomData(0, 950))
	writeBlockAndVerify(ctx, t, bm, seededRandomData(1, 950))
	writeBlockAndVerify(ctx, t, bm, seededRandomData(2, 10))
	if got, want := len(data), 0; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	bm.Flush(ctx)
	// this flushes the pack block + index block
	if got, want := len(data), 2; got != want {
		dumpBlockManagerData(t, data)
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
}

// TestBlockManagerEmpty verifies lookups of a non-existent block return ErrBlockNotFound.
func TestBlockManagerEmpty(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	noSuchBlockID := string(hashValue([]byte("foo")))
	b, err := bm.GetBlock(ctx, noSuchBlockID)
	if err != storage.ErrBlockNotFound {
		t.Errorf("unexpected error when getting non-existent block: %v, %v", b, err)
	}
	bi, err := bm.BlockInfo(ctx, noSuchBlockID)
	if err != storage.ErrBlockNotFound {
		t.Errorf("unexpected error when getting non-existent block info: %v, %v", bi, err)
	}
	if got, want := len(data), 0; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
}

// verifyActiveIndexBlockCount asserts the number of live index blocks.
func verifyActiveIndexBlockCount(ctx context.Context, t *testing.T, bm *Manager, expected int) {
	t.Helper()
	blks, err := bm.IndexBlocks(ctx)
	if err != nil {
		t.Errorf("error listing active index blocks: %v", err)
		return
	}
	if got, want := len(blks), expected; got != want {
		t.Errorf("unexpected number of active index blocks %v, expected %v (%v)", got, want, blks)
	}
}

// TestBlockManagerInternalFlush verifies that packs are written automatically once
// enough pending data accumulates, with the index only written on explicit Flush.
func TestBlockManagerInternalFlush(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	for i := 0; i < 100; i++ {
		b := make([]byte, 25)
		rand.Read(b)
		writeBlockAndVerify(ctx, t, bm, b)
	}
	// 1 data block written, but no index yet.
	if got, want := len(data), 1; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	// do it again - should be 2 blocks + 1000 bytes pending.
	for i := 0; i < 100; i++ {
		b := make([]byte, 25)
		rand.Read(b)
		writeBlockAndVerify(ctx, t, bm, b)
	}
	// 2 data blocks written, but no index yet.
	if got, want := len(data), 2; got != want {
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
	bm.Flush(ctx)
	// third block gets written, followed by index.
	if got, want := len(data), 4; got != want {
		dumpBlockManagerData(t, data)
		t.Errorf("unexpected number of blocks: %v, wanted %v", got, want)
	}
}
// TestBlockManagerWriteMultiple stress-tests a long sequence of writes with
// periodic flushes and manager re-opens, verifying that randomly chosen
// previously-written blocks remain readable throughout.
func TestBlockManagerWriteMultiple(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	timeFunc := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
	bm := newTestBlockManager(data, keyTime, timeFunc)
	var blockIDs []string
	for i := 0; i < 5000; i++ {
		b := seededRandomData(i, i%113)
		blkID, err := bm.WriteBlock(ctx, b, "")
		if err != nil {
			t.Errorf("err: %v", err)
		}
		blockIDs = append(blockIDs, blkID)
		// flush periodically
		if i%17 == 0 {
			if err := bm.Flush(ctx); err != nil {
				t.Fatalf("error flushing: %v", err)
			}
		}
		// periodically reopen a fresh manager over the same storage
		if i%41 == 0 {
			if err := bm.Flush(ctx); err != nil {
				t.Fatalf("error flushing: %v", err)
			}
			bm = newTestBlockManager(data, keyTime, timeFunc)
		}
		// spot-check a random previously written block
		pos := rand.Intn(len(blockIDs))
		if _, err := bm.GetBlock(ctx, blockIDs[pos]); err != nil {
			dumpBlockManagerData(t, data)
			// NOTE: the `continue` that used to follow this Fatalf was unreachable
			// (Fatalf stops the test goroutine) and has been removed.
			t.Fatalf("can't read block %q: %v", blockIDs[pos], err)
		}
	}
}
// This is regression test for a bug where we would corrupt data when encryption
// was done in place and clobbered pending data in memory.
func TestBlockManagerFailedToWritePack(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	st := storagetesting.NewMapStorage(data, keyTime, nil)
	// wrap the map storage so we can inject faults on PutBlock
	faulty := &storagetesting.FaultyStorage{
		Base: st,
	}
	st = faulty
	bm, err := newManagerWithOptions(context.Background(), st, FormattingOptions{
		Version:     1,
		BlockFormat: "ENCRYPTED_HMAC_SHA256_AES256_SIV",
		MaxPackSize: maxPackSize,
		HMACSecret:  []byte("foo"),
		MasterKey:   []byte("0123456789abcdef0123456789abcdef"),
	}, CachingOptions{}, fakeTimeNowFrozen(fakeTime))
	if err != nil {
		t.Fatalf("can't create bm: %v", err)
	}
	logging.SetLevel(logging.DEBUG, "faulty-storage")
	// first PutBlock fails; the write must still be readable from pending state
	faulty.Faults = map[string][]*storagetesting.Fault{
		"PutBlock": []*storagetesting.Fault{
			{Err: errors.New("booboo")},
		},
	}
	b1, err := bm.WriteBlock(ctx, seededRandomData(1, 10), "")
	if err != nil {
		t.Fatalf("can't create block: %v", err)
	}
	if err := bm.Flush(ctx); err != nil {
		t.Logf("expected flush error: %v", err)
	}
	// data must not have been clobbered by the failed (in-place) encryption
	verifyBlock(ctx, t, bm, b1, seededRandomData(1, 10))
}

// TestBlockManagerConcurrency verifies isolation between concurrently open
// managers (unflushed data is private) and that index compaction preserves
// visibility of all committed data.
func TestBlockManagerConcurrency(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	preexistingBlock := writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 100))
	bm.Flush(ctx)
	dumpBlockManagerData(t, data)
	bm1 := newTestBlockManager(data, keyTime, nil)
	bm2 := newTestBlockManager(data, keyTime, nil)
	bm3 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(1), 1*time.Second))
	// all bm* can see pre-existing block
	verifyBlock(ctx, t, bm1, preexistingBlock, seededRandomData(10, 100))
	verifyBlock(ctx, t, bm2, preexistingBlock, seededRandomData(10, 100))
	verifyBlock(ctx, t, bm3, preexistingBlock, seededRandomData(10, 100))
	// write the same block in all managers.
	sharedBlock := writeBlockAndVerify(ctx, t, bm1, seededRandomData(20, 100))
	writeBlockAndVerify(ctx, t, bm2, seededRandomData(20, 100))
	writeBlockAndVerify(ctx, t, bm3, seededRandomData(20, 100))
	// write unique block per manager.
	bm1block := writeBlockAndVerify(ctx, t, bm1, seededRandomData(31, 100))
	bm2block := writeBlockAndVerify(ctx, t, bm2, seededRandomData(32, 100))
	bm3block := writeBlockAndVerify(ctx, t, bm3, seededRandomData(33, 100))
	// make sure they can't see each other's unflushed blocks.
	verifyBlockNotFound(ctx, t, bm1, bm2block)
	verifyBlockNotFound(ctx, t, bm1, bm3block)
	verifyBlockNotFound(ctx, t, bm2, bm1block)
	verifyBlockNotFound(ctx, t, bm2, bm3block)
	verifyBlockNotFound(ctx, t, bm3, bm1block)
	verifyBlockNotFound(ctx, t, bm3, bm2block)
	// now flush all writers, they still can't see each others' data.
	bm1.Flush(ctx)
	bm2.Flush(ctx)
	bm3.Flush(ctx)
	verifyBlockNotFound(ctx, t, bm1, bm2block)
	verifyBlockNotFound(ctx, t, bm1, bm3block)
	verifyBlockNotFound(ctx, t, bm2, bm1block)
	verifyBlockNotFound(ctx, t, bm2, bm3block)
	verifyBlockNotFound(ctx, t, bm3, bm1block)
	verifyBlockNotFound(ctx, t, bm3, bm2block)
	// new block manager at this point can see all data.
	bm4 := newTestBlockManager(data, keyTime, nil)
	verifyBlock(ctx, t, bm4, preexistingBlock, seededRandomData(10, 100))
	verifyBlock(ctx, t, bm4, sharedBlock, seededRandomData(20, 100))
	verifyBlock(ctx, t, bm4, bm1block, seededRandomData(31, 100))
	verifyBlock(ctx, t, bm4, bm2block, seededRandomData(32, 100))
	verifyBlock(ctx, t, bm4, bm3block, seededRandomData(33, 100))
	if got, want := getIndexCount(data), 4; got != want {
		t.Errorf("unexpected index count before compaction: %v, wanted %v", got, want)
	}
	if err := bm4.CompactIndexes(ctx, CompactOptions{
		MinSmallBlocks: 1,
		MaxSmallBlocks: 1,
	}); err != nil {
		t.Errorf("compaction error: %v", err)
	}
	if got, want := getIndexCount(data), 1; got != want {
		t.Errorf("unexpected index count after compaction: %v, wanted %v", got, want)
	}
	// new block manager at this point can see all data.
	bm5 := newTestBlockManager(data, keyTime, nil)
	verifyBlock(ctx, t, bm5, preexistingBlock, seededRandomData(10, 100))
	verifyBlock(ctx, t, bm5, sharedBlock, seededRandomData(20, 100))
	verifyBlock(ctx, t, bm5, bm1block, seededRandomData(31, 100))
	verifyBlock(ctx, t, bm5, bm2block, seededRandomData(32, 100))
	verifyBlock(ctx, t, bm5, bm3block, seededRandomData(33, 100))
	if err := bm5.CompactIndexes(ctx, CompactOptions{
		MinSmallBlocks: 1,
		MaxSmallBlocks: 1,
	}); err != nil {
		t.Errorf("compaction error: %v", err)
	}
}
// TestDeleteBlock verifies that both flushed and pending blocks can be deleted
// and stay invisible after a flush and a manager re-open.
func TestDeleteBlock(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	// block1 is committed, block2 is still pending when deleted
	block1 := writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 100))
	bm.Flush(ctx)
	block2 := writeBlockAndVerify(ctx, t, bm, seededRandomData(11, 100))
	if err := bm.DeleteBlock(block1); err != nil {
		t.Errorf("unable to delete block %v: %v", block1, err)
	}
	// BUGFIX: this message previously reported block1 instead of block2
	if err := bm.DeleteBlock(block2); err != nil {
		t.Errorf("unable to delete block %v: %v", block2, err)
	}
	verifyBlockNotFound(ctx, t, bm, block1)
	verifyBlockNotFound(ctx, t, bm, block2)
	bm.Flush(ctx)
	log.Debugf("-----------")
	// deletions must persist across re-open
	bm = newTestBlockManager(data, keyTime, nil)
	verifyBlockNotFound(ctx, t, bm, block1)
	verifyBlockNotFound(ctx, t, bm, block2)
}
// TestRewriteNonDeleted verifies RewriteBlock keeps a live block readable across
// all combinations of flush/reopen between the steps.
func TestRewriteNonDeleted(t *testing.T) {
	const stepBehaviors = 3
	// perform a sequence WriteBlock() <action1> RewriteBlock() <action2> GetBlock()
	// where actionX can be (0=flush and reopen, 1=flush, 2=nothing)
	for action1 := 0; action1 < stepBehaviors; action1++ {
		for action2 := 0; action2 < stepBehaviors; action2++ {
			t.Run(fmt.Sprintf("case-%v-%v", action1, action2), func(t *testing.T) {
				ctx := context.Background()
				data := map[string][]byte{}
				keyTime := map[string]time.Time{}
				fakeNow := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
				bm := newTestBlockManager(data, keyTime, fakeNow)
				applyStep := func(action int) {
					switch action {
					case 0:
						t.Logf("flushing and reopening")
						bm.Flush(ctx)
						bm = newTestBlockManager(data, keyTime, fakeNow)
					case 1:
						t.Logf("flushing")
						bm.Flush(ctx)
					case 2:
						t.Logf("doing nothing")
					}
				}
				block1 := writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 100))
				applyStep(action1)
				bm.RewriteBlock(ctx, block1)
				applyStep(action2)
				verifyBlock(ctx, t, bm, block1, seededRandomData(10, 100))
				dumpBlockManagerData(t, data)
			})
		}
	}
}

// TestDisableFlush verifies that DisableIndexFlush nests: the index is only
// written once EnableIndexFlush has been called as many times as Disable.
func TestDisableFlush(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, nil)
	bm.DisableIndexFlush()
	bm.DisableIndexFlush()
	for i := 0; i < 500; i++ {
		writeBlockAndVerify(ctx, t, bm, seededRandomData(i, 100))
	}
	bm.Flush(ctx) // flush will not have effect
	bm.EnableIndexFlush()
	bm.Flush(ctx) // flush will not have effect
	bm.EnableIndexFlush()
	verifyActiveIndexBlockCount(ctx, t, bm, 0)
	bm.EnableIndexFlush()
	verifyActiveIndexBlockCount(ctx, t, bm, 0)
	bm.Flush(ctx) // flush will happen now
	verifyActiveIndexBlockCount(ctx, t, bm, 1)
}

// TestRewriteDeleted verifies that rewriting a deleted block does not
// resurrect it, across all flush/reopen combinations between steps.
func TestRewriteDeleted(t *testing.T) {
	const stepBehaviors = 3
	// perform a sequence WriteBlock() <action1> Delete() <action2> RewriteBlock() <action3> GetBlock()
	// where actionX can be (0=flush and reopen, 1=flush, 2=nothing)
	for action1 := 0; action1 < stepBehaviors; action1++ {
		for action2 := 0; action2 < stepBehaviors; action2++ {
			for action3 := 0; action3 < stepBehaviors; action3++ {
				t.Run(fmt.Sprintf("case-%v-%v-%v", action1, action2, action3), func(t *testing.T) {
					ctx := context.Background()
					data := map[string][]byte{}
					keyTime := map[string]time.Time{}
					fakeNow := fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
					bm := newTestBlockManager(data, keyTime, fakeNow)
					applyStep := func(action int) {
						switch action {
						case 0:
							t.Logf("flushing and reopening")
							bm.Flush(ctx)
							bm = newTestBlockManager(data, keyTime, fakeNow)
						case 1:
							t.Logf("flushing")
							bm.Flush(ctx)
						case 2:
							t.Logf("doing nothing")
						}
					}
					block1 := writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 100))
					applyStep(action1)
					bm.DeleteBlock(block1)
					applyStep(action2)
					bm.RewriteBlock(ctx, block1)
					applyStep(action3)
					verifyBlockNotFound(ctx, t, bm, block1)
					dumpBlockManagerData(t, data)
				})
			}
		}
	}
}

// TestDeleteAndRecreate verifies last-writer-wins semantics when a delete races
// with a delete-and-recreate performed by other managers.
func TestDeleteAndRecreate(t *testing.T) {
	ctx := context.Background()
	// simulate race between delete/recreate and delete
	// delete happens at t0+10, recreate at t0+20 and second delete time is parameterized.
	// depending on it, the second delete results will be visible.
	cases := []struct {
		desc         string
		deletionTime time.Time
		isVisible    bool
	}{
		{"deleted before delete and-recreate", fakeTime.Add(5 * time.Second), true},
		//{"deleted after delete and recreate", fakeTime.Add(25 * time.Second), false},
	}
	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			// write a block
			data := map[string][]byte{}
			keyTime := map[string]time.Time{}
			bm := newTestBlockManager(data, keyTime, fakeTimeNowFrozen(fakeTime))
			block1 := writeBlockAndVerify(ctx, t, bm, seededRandomData(10, 100))
			bm.Flush(ctx)
			// delete but at given timestamp but don't commit yet.
			bm0 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(tc.deletionTime, 1*time.Second))
			bm0.DeleteBlock(block1)
			// delete it at t0+10
			bm1 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(10*time.Second), 1*time.Second))
			verifyBlock(ctx, t, bm1, block1, seededRandomData(10, 100))
			bm1.DeleteBlock(block1)
			bm1.Flush(ctx)
			// recreate at t0+20
			bm2 := newTestBlockManager(data, keyTime, fakeTimeNowWithAutoAdvance(fakeTime.Add(20*time.Second), 1*time.Second))
			block2 := writeBlockAndVerify(ctx, t, bm2, seededRandomData(10, 100))
			bm2.Flush(ctx)
			// commit deletion from bm0 (t0+5)
			bm0.Flush(ctx)
			// same content must hash to the same block ID
			if block1 != block2 {
				t.Errorf("got invalid block %v, expected %v", block2, block1)
			}
			bm3 := newTestBlockManager(data, keyTime, nil)
			dumpBlockManagerData(t, data)
			if tc.isVisible {
				verifyBlock(ctx, t, bm3, block1, seededRandomData(10, 100))
			} else {
				verifyBlockNotFound(ctx, t, bm3, block1)
			}
		})
	}
}

// TestBlockWriteAliasing verifies that the manager does not retain a reference
// to the caller's buffer - mutating it after WriteBlock must not corrupt data.
func TestBlockWriteAliasing(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, fakeTimeNowFrozen(fakeTime))
	blockData := []byte{100, 0, 0}
	id1 := writeBlockAndVerify(ctx, t, bm, blockData)
	blockData[0] = 101
	id2 := writeBlockAndVerify(ctx, t, bm, blockData)
	bm.Flush(ctx)
	blockData[0] = 102
	id3 := writeBlockAndVerify(ctx, t, bm, blockData)
	blockData[0] = 103
	id4 := writeBlockAndVerify(ctx, t, bm, blockData)
	verifyBlock(ctx, t, bm, id1, []byte{100, 0, 0})
	verifyBlock(ctx, t, bm, id2, []byte{101, 0, 0})
	verifyBlock(ctx, t, bm, id3, []byte{102, 0, 0})
	verifyBlock(ctx, t, bm, id4, []byte{103, 0, 0})
}

// TestBlockReadAliasing verifies that mutating a buffer returned by GetBlock
// does not corrupt the manager's copy of the data.
func TestBlockReadAliasing(t *testing.T) {
	ctx := context.Background()
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	bm := newTestBlockManager(data, keyTime, fakeTimeNowFrozen(fakeTime))
	blockData := []byte{100, 0, 0}
	id1 := writeBlockAndVerify(ctx, t, bm, blockData)
	blockData2, err := bm.GetBlock(ctx, id1)
	if err != nil {
		t.Fatalf("can't get block data: %v", err)
	}
	blockData2[0]++
	verifyBlock(ctx, t, bm, id1, blockData)
	bm.Flush(ctx)
	verifyBlock(ctx, t, bm, id1, blockData)
}

// TestVersionCompatibility runs verifyVersionCompat for every supported write version.
func TestVersionCompatibility(t *testing.T) {
	for writeVer := minSupportedReadVersion; writeVer <= currentWriteVersion; writeVer++ {
		t.Run(fmt.Sprintf("version-%v", writeVer), func(t *testing.T) {
			verifyVersionCompat(t, writeVer)
		})
	}
}

// verifyVersionCompat writes data using the given format version and verifies
// that a manager using the current version can read, delete and compact it.
func verifyVersionCompat(t *testing.T, writeVersion int) {
	ctx := context.Background()
	// create block manager that writes 'writeVersion' and reads all versions >= minSupportedReadVersion
	data := map[string][]byte{}
	keyTime := map[string]time.Time{}
	mgr := newTestBlockManager(data, keyTime, nil)
	mgr.writeFormatVersion = int32(writeVersion)
	dataSet := map[string][]byte{}
	// sizes grow geometrically: 0, 2, 6, 14, ... up to ~3MB
	for i := 0; i < 3000000; i = (i + 1) * 2 {
		data := make([]byte, i)
		rand.Read(data)
		cid, err := mgr.WriteBlock(ctx, data, "")
		if err != nil {
			t.Fatalf("unable to write %v bytes: %v", len(data), err)
		}
		dataSet[cid] = data
	}
	verifyBlockManagerDataSet(ctx, t, mgr, dataSet)
	// delete random 3 items (map iteration order is random)
	cnt := 0
	for blockID := range dataSet {
		t.Logf("deleting %v", blockID)
		mgr.DeleteBlock(blockID)
		delete(dataSet, blockID)
		cnt++
		if cnt >= 3 {
			break
		}
	}
	if err := mgr.Flush(ctx); err != nil {
		t.Fatalf("failed to flush: %v", err)
	}
	// create new manager that reads and writes using new version.
	mgr = newTestBlockManager(data, keyTime, nil)
	// make sure we can read everything
	verifyBlockManagerDataSet(ctx, t, mgr, dataSet)
	if err := mgr.CompactIndexes(ctx, CompactOptions{
		MinSmallBlocks: 1,
		MaxSmallBlocks: 1,
	}); err != nil {
		t.Fatalf("unable to compact indexes: %v", err)
	}
	if err := mgr.Flush(ctx); err != nil {
		t.Fatalf("failed to flush: %v", err)
	}
	verifyBlockManagerDataSet(ctx, t, mgr, dataSet)
	// now open one more manager
	mgr = newTestBlockManager(data, keyTime, nil)
	verifyBlockManagerDataSet(ctx, t, mgr, dataSet)
}
// verifyBlockManagerDataSet reads every block in dataSet back from the manager
// and asserts the payload matches what was originally written.
func verifyBlockManagerDataSet(ctx context.Context, t *testing.T, mgr *Manager, dataSet map[string][]byte) {
	for blockID, originalPayload := range dataSet {
		v, err := mgr.GetBlock(ctx, blockID)
		if err != nil {
			t.Errorf("unable to read block %q: %v", blockID, err)
			continue
		}
		if !reflect.DeepEqual(v, originalPayload) {
			// BUGFIX: previously the payload (v) was printed where the block ID belonged
			t.Errorf("payload for %q does not match original: got %v, want %v", blockID, v, originalPayload)
		}
	}
}
// newTestBlockManager creates a Manager backed by in-memory map storage, using
// an unencrypted HMAC format and an auto-advancing fake clock unless timeFunc
// is supplied. Panics on failure since it is only used from tests.
func newTestBlockManager(data map[string][]byte, keyTime map[string]time.Time, timeFunc func() time.Time) *Manager {
	//st = logging.NewWrapper(st)
	if timeFunc == nil {
		timeFunc = fakeTimeNowWithAutoAdvance(fakeTime, 1*time.Second)
	}
	st := storagetesting.NewMapStorage(data, keyTime, timeFunc)
	bm, err := newManagerWithOptions(context.Background(), st, FormattingOptions{
		BlockFormat: "UNENCRYPTED_HMAC_SHA256",
		HMACSecret:  hmacSecret,
		MaxPackSize: maxPackSize,
	}, CachingOptions{}, timeFunc)
	if err != nil {
		panic("can't create block manager: " + err.Error())
	}
	// enable extra internal consistency checking in tests
	bm.checkInvariantsOnUnlock = true
	return bm
}

// getIndexCount counts the stored blobs whose key marks them as index blocks.
func getIndexCount(d map[string][]byte) int {
	var cnt int
	for k := range d {
		if strings.HasPrefix(k, newIndexBlockPrefix) {
			cnt++
		}
	}
	return cnt
}
// fakeTimeNowFrozen returns a clock function that always reports t
// (an auto-advancing clock with zero step never moves).
func fakeTimeNowFrozen(t time.Time) func() time.Time {
	const noAdvance = 0
	return fakeTimeNowWithAutoAdvance(t, noAdvance)
}
func fakeTimeNowWithAutoAdvance(t time.Time, dt time.Duration) func() time.Time {
var mu sync.Mutex
return func() time.Time {
mu.Lock()
defer mu.Unlock()
ret := t
t = t.Add(dt)
return ret
}
}
// verifyBlockNotFound asserts that GetBlock reports ErrBlockNotFound for blockID.
func verifyBlockNotFound(ctx context.Context, t *testing.T, bm *Manager, blockID string) {
	t.Helper()
	b, err := bm.GetBlock(ctx, blockID)
	if err != storage.ErrBlockNotFound {
		t.Errorf("unexpected response from GetBlock(%q), got %v,%v, expected %v", blockID, b, err, storage.ErrBlockNotFound)
	}
}

// verifyBlock asserts that blockID reads back as b and that BlockInfo reports
// the matching length.
func verifyBlock(ctx context.Context, t *testing.T, bm *Manager, blockID string, b []byte) {
	t.Helper()
	b2, err := bm.GetBlock(ctx, blockID)
	if err != nil {
		t.Errorf("unable to read block %q: %v", blockID, err)
		return
	}
	if got, want := b2, b; !reflect.DeepEqual(got, want) {
		t.Errorf("block %q data mismatch: got %x (nil:%v), wanted %x (nil:%v)", blockID, got, got == nil, want, want == nil)
	}
	bi, err := bm.BlockInfo(ctx, blockID)
	if err != nil {
		t.Errorf("error getting block info %q: %v", blockID, err)
	}
	if got, want := bi.Length, uint32(len(b)); got != want {
		t.Errorf("invalid block size for %q: %v, wanted %v", blockID, got, want)
	}
}

// writeBlockAndVerify writes b, checks that the returned ID equals the
// content hash, verifies it reads back correctly, and returns the ID.
func writeBlockAndVerify(ctx context.Context, t *testing.T, bm *Manager, b []byte) string {
	t.Helper()
	blockID, err := bm.WriteBlock(ctx, b, "")
	if err != nil {
		t.Errorf("err: %v", err)
	}
	if got, want := blockID, string(hashValue(b)); got != want {
		t.Errorf("invalid block ID for %x, got %v, want %v", b, got, want)
	}
	verifyBlock(ctx, t, bm, blockID, b)
	return blockID
}
// seededRandomData produces a deterministic pseudo-random byte slice of the
// given length; identical seeds always yield identical contents.
func seededRandomData(seed int, length int) []byte {
	out := make([]byte, length)
	src := rand.NewSource(int64(seed))
	rand.New(src).Read(out)
	return out
}
// hashValue computes the hex-encoded HMAC-SHA256 of b using the shared test
// secret - this matches the block IDs produced by the UNENCRYPTED_HMAC_SHA256 format.
func hashValue(b []byte) string {
	mac := hmac.New(sha256.New, hmacSecret)
	mac.Write(b) // nolint:errcheck
	digest := mac.Sum(nil)
	return hex.EncodeToString(digest)
}
// dumpBlockManagerData logs the contents of the simulated storage map for
// debugging: index blobs (keys starting with 'n') are opened and their entries
// listed; other blobs are logged with their size only.
func dumpBlockManagerData(t *testing.T, data map[string][]byte) {
	t.Helper()
	for k, v := range data {
		if k[0] == 'n' {
			ndx, err := packindex.Open(bytes.NewReader(v))
			if err == nil {
				t.Logf("index %v (%v bytes)", k, len(v))
				ndx.Iterate("", func(i packindex.Info) error {
					t.Logf("  %+v\n", i)
					return nil
				})
			}
		} else {
			t.Logf("data %v (%v bytes)\n", k, len(v))
		}
	}
}

View File

@@ -1,33 +0,0 @@
package block
import "crypto/hmac"
import "crypto/sha256"
import "errors"
// appendHMAC returns data with its HMAC-SHA256 (keyed by secret) appended,
// forming the layout that verifyAndStripHMAC expects.
func appendHMAC(data []byte, secret []byte) []byte {
	mac := hmac.New(sha256.New, secret)
	mac.Write(data) // nolint:errcheck
	// Sum appends the digest to its argument, so this returns data||HMAC(data)
	return mac.Sum(data)
}
// verifyAndStripHMAC checks the trailing HMAC-SHA256 of b (keyed by secret)
// and returns the payload without the signature, or an error if the input is
// too short or the signature does not match.
func verifyAndStripHMAC(b []byte, secret []byte) ([]byte, error) {
	if len(b) < sha256.Size {
		return nil, errors.New("invalid data - too short")
	}

	split := len(b) - sha256.Size
	payload, signature := b[:split], b[split:]

	mac := hmac.New(sha256.New, secret)
	mac.Write(payload) // nolint:errcheck
	expected := mac.Sum(nil)

	if len(signature) != len(expected) {
		return nil, errors.New("invalid signature length")
	}
	// constant-time comparison to avoid leaking signature bytes
	if !hmac.Equal(expected, signature) {
		return nil, errors.New("invalid data - corrupted")
	}
	return payload, nil
}

View File

@@ -1,10 +0,0 @@
package block
// CachingOptions specifies configuration of local cache.
type CachingOptions struct {
	// CacheDirectory is the directory where cached data is kept; empty disables on-disk caching.
	CacheDirectory string `json:"cacheDirectory,omitempty"`
	// MaxCacheSizeBytes is the maximum cache size in bytes.
	MaxCacheSizeBytes int64 `json:"maxCacheSize,omitempty"`
	// MaxListCacheDurationSec is how long (seconds) a cached index-block listing remains valid.
	MaxListCacheDurationSec int `json:"maxListCacheDuration,omitempty"`
	// IgnoreListCache, when set, causes any existing list cache file to be deleted on open.
	IgnoreListCache bool `json:"-"`
	// HMACSecret authenticates locally cached data; never persisted in JSON.
	HMACSecret []byte `json:"-"`
}

View File

@@ -1,139 +0,0 @@
package block
import (
"fmt"
"path/filepath"
"sync"
"github.com/kopia/kopia/internal/packindex"
"github.com/kopia/kopia/repo/storage"
)
// committedBlockIndex tracks the set of committed pack index blocks and
// answers block-info queries against their merged view.
type committedBlockIndex struct {
	// cache stores raw index blocks, on disk or in memory.
	cache committedBlockIndexCache
	// mu guards inUse and merged.
	mu sync.Mutex
	// inUse maps index block ID to its opened index, for the active set.
	inUse map[string]packindex.Index
	// merged is the combined view over all indexes in inUse.
	merged packindex.Merged
}
// committedBlockIndexCache stores raw committed index blocks and opens them
// as packindex.Index instances.
type committedBlockIndexCache interface {
	// hasIndexBlockID reports whether the given index block is already cached.
	hasIndexBlockID(indexBlockID string) (bool, error)
	// addBlockToCache stores the raw data of the given index block.
	addBlockToCache(indexBlockID string, data []byte) error
	// openIndex opens a previously cached index block for querying.
	openIndex(indexBlockID string) (packindex.Index, error)
	// expireUnused may remove cached blocks that are not in the used set.
	expireUnused(used []string) error
}
// getBlock looks up the info for blockID in the merged committed index.
// Returns storage.ErrBlockNotFound when the block is not present.
func (b *committedBlockIndex) getBlock(blockID string) (Info, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch info, err := b.merged.GetInfo(blockID); {
	case info != nil:
		return *info, nil
	case err != nil:
		return Info{}, err
	default:
		return Info{}, storage.ErrBlockNotFound
	}
}
// addBlock persists the given index block in the cache and, when use is true,
// opens it and adds it to the active merged index.
func (b *committedBlockIndex) addBlock(indexBlockID string, data []byte, use bool) error {
	if err := b.cache.addBlockToCache(indexBlockID, data); err != nil {
		return err
	}
	if !use {
		return nil
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.inUse[indexBlockID] != nil {
		// already opened and merged
		return nil
	}
	index, err := b.cache.openIndex(indexBlockID)
	if err != nil {
		return fmt.Errorf("unable to open pack index %q: %v", indexBlockID, err)
	}
	b.inUse[indexBlockID] = index
	b.merged = append(b.merged, index)
	return nil
}
// listBlocks invokes cb for each committed block whose ID has the given
// prefix. Iteration runs on a snapshot taken under the lock, so cb is called
// without holding the mutex.
func (b *committedBlockIndex) listBlocks(prefix string, cb func(i Info) error) error {
	b.mu.Lock()
	snapshot := make(packindex.Merged, len(b.merged))
	copy(snapshot, b.merged)
	b.mu.Unlock()
	return snapshot.Iterate(prefix, cb)
}
// packFilesChanged reports whether packFiles differs from the set of index
// files currently in use.
func (b *committedBlockIndex) packFilesChanged(packFiles []string) bool {
	if len(b.inUse) != len(packFiles) {
		return true
	}
	for _, f := range packFiles {
		if b.inUse[f] == nil {
			return true
		}
	}
	return false
}
// use switches the active index set to exactly packFiles, opening each file
// via the cache. Returns (false, nil) when the set is already active, and
// (true, nil) on a successful switch. On error the previous set stays active.
func (b *committedBlockIndex) use(packFiles []string) (bool, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.packFilesChanged(packFiles) {
		return false, nil
	}
	log.Debugf("set of index files has changed (had %v, now %v)", len(b.inUse), len(packFiles))
	var newMerged packindex.Merged
	newInUse := map[string]packindex.Index{}
	// If we fail part-way through, close whatever indexes were opened so far.
	// On the success path newMerged is set to nil below, making this a no-op.
	defer func() {
		newMerged.Close() //nolint:errcheck
	}()
	for _, e := range packFiles {
		ndx, err := b.cache.openIndex(e)
		if err != nil {
			return false, fmt.Errorf("unable to open pack index %q: %v", e, err)
		}
		newMerged = append(newMerged, ndx)
		newInUse[e] = ndx
	}
	b.merged = newMerged
	b.inUse = newInUse
	// Expiration of no-longer-used cache files is best-effort.
	if err := b.cache.expireUnused(packFiles); err != nil {
		log.Warningf("unable to expire unused block index files: %v", err)
	}
	// Prevent the deferred cleanup from closing the indexes now in use.
	newMerged = nil
	return true, nil
}
// newCommittedBlockIndex builds a committedBlockIndex backed by an on-disk
// cache under <CacheDirectory>/indexes when a cache directory is configured,
// or by an in-memory cache otherwise.
func newCommittedBlockIndex(caching CachingOptions) (*committedBlockIndex, error) {
	var cache committedBlockIndexCache
	if caching.CacheDirectory == "" {
		cache = &memoryCommittedBlockIndexCache{
			blocks: map[string]packindex.Index{},
		}
	} else {
		cache = &diskCommittedBlockIndexCache{filepath.Join(caching.CacheDirectory, "indexes")}
	}
	return &committedBlockIndex{
		cache: cache,
		inUse: map[string]packindex.Index{},
	}, nil
}

View File

@@ -1,135 +0,0 @@
package block
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
"github.com/kopia/kopia/internal/packindex"
"golang.org/x/exp/mmap"
)
const (
	// simpleIndexSuffix is the file-name extension used for cached index files.
	simpleIndexSuffix = ".sndx"
	// unusedCommittedBlockIndexCleanupTime is the minimum age before an unused
	// cached index file becomes eligible for deletion.
	unusedCommittedBlockIndexCleanupTime = 1 * time.Hour // delete unused committed index blocks after 1 hour
)
// diskCommittedBlockIndexCache stores committed index blocks as files in a
// local directory, one file per index block.
type diskCommittedBlockIndexCache struct {
	dirname string // directory holding cached index files
}
// indexBlockPath returns the on-disk path of the cached file for indexBlockID.
func (c *diskCommittedBlockIndexCache) indexBlockPath(indexBlockID string) string {
	filename := indexBlockID + simpleIndexSuffix
	return filepath.Join(c.dirname, filename)
}
// openIndex memory-maps the cached file for indexBlockID and opens it as a
// pack index.
func (c *diskCommittedBlockIndexCache) openIndex(indexBlockID string) (packindex.Index, error) {
	f, err := mmap.Open(c.indexBlockPath(indexBlockID))
	if err != nil {
		return nil, err
	}
	return packindex.Open(f)
}
// hasIndexBlockID reports whether a cached file for indexBlockID exists.
func (c *diskCommittedBlockIndexCache) hasIndexBlockID(indexBlockID string) (bool, error) {
	switch _, err := os.Stat(c.indexBlockPath(indexBlockID)); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}
// addBlockToCache writes data for the given index block into the cache
// directory, unless it is already present. The write goes through a unique
// temp file followed by rename so readers never observe partial data.
//
// Fix: if the rename fails (e.g. lost a race with a concurrent writer), the
// orphaned temp file is now removed instead of accumulating in the cache
// directory.
func (c *diskCommittedBlockIndexCache) addBlockToCache(indexBlockID string, data []byte) error {
	exists, err := c.hasIndexBlockID(indexBlockID)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}
	tmpFile, err := writeTempFileAtomic(c.dirname, data)
	if err != nil {
		return err
	}
	// rename() is atomic, so one process will succeed, but the other will fail
	if err := os.Rename(tmpFile, c.indexBlockPath(indexBlockID)); err != nil {
		// clean up the temp file left behind by the failed rename
		os.Remove(tmpFile) //nolint:errcheck
		// verify that the block exists
		exists, err := c.hasIndexBlockID(indexBlockID)
		if err != nil {
			return err
		}
		if !exists {
			return fmt.Errorf("unsuccessful index write of block %q", indexBlockID)
		}
	}
	return nil
}
// writeTempFileAtomic writes data to a freshly-created temp file in dirname
// (creating the directory if needed) and returns the temp file's name.
// Callers rename the result into its final location to get an atomic write.
//
// Fixes: on a write or close failure the temp file is now closed and removed
// instead of being leaked, and the close-error message includes the
// underlying error.
func writeTempFileAtomic(dirname string, data []byte) (string, error) {
	// write to a temp file to avoid race where two processes are writing at the same time.
	tf, err := ioutil.TempFile(dirname, "tmp")
	if err != nil {
		if os.IsNotExist(err) {
			// directory missing - create it and retry once
			os.MkdirAll(dirname, 0700) //nolint:errcheck
			tf, err = ioutil.TempFile(dirname, "tmp")
		}
	}
	if err != nil {
		return "", fmt.Errorf("can't create tmp file: %v", err)
	}
	if _, err := tf.Write(data); err != nil {
		tf.Close()           //nolint:errcheck
		os.Remove(tf.Name()) //nolint:errcheck
		return "", fmt.Errorf("can't write to temp file: %v", err)
	}
	if err := tf.Close(); err != nil {
		os.Remove(tf.Name()) //nolint:errcheck
		return "", fmt.Errorf("can't close tmp file: %v", err)
	}
	return tf.Name(), nil
}
// expireUnused deletes cached index files that are not in the used set and
// whose modification time is older than the cleanup threshold; newer unused
// files are kept to avoid deleting files another process may still need.
func (c *diskCommittedBlockIndexCache) expireUnused(used []string) error {
	entries, err := ioutil.ReadDir(c.dirname)
	if err != nil {
		return fmt.Errorf("can't list cache: %v", err)
	}
	unused := map[string]os.FileInfo{}
	for _, ent := range entries {
		name := ent.Name()
		if !strings.HasSuffix(name, simpleIndexSuffix) {
			continue
		}
		unused[strings.TrimSuffix(name, simpleIndexSuffix)] = ent
	}
	for _, u := range used {
		delete(unused, u)
	}
	for _, rem := range unused {
		if time.Since(rem.ModTime()) <= unusedCommittedBlockIndexCleanupTime {
			log.Debugf("keeping unused %v because it's too new %v", rem.Name(), rem.ModTime())
			continue
		}
		log.Debugf("removing unused %v %v", rem.Name(), rem.ModTime())
		if err := os.Remove(filepath.Join(c.dirname, rem.Name())); err != nil {
			log.Warningf("unable to remove unused index file: %v", err)
		}
	}
	return nil
}

View File

@@ -1,50 +0,0 @@
package block
import (
"bytes"
"fmt"
"sync"
"github.com/kopia/kopia/internal/packindex"
)
// memoryCommittedBlockIndexCache keeps opened committed index blocks in a
// process-local map; nothing is persisted across restarts.
type memoryCommittedBlockIndexCache struct {
	// mu guards blocks.
	mu sync.Mutex
	// blocks maps index block ID to its opened index.
	blocks map[string]packindex.Index
}
// hasIndexBlockID reports whether the given index block is cached in memory.
func (m *memoryCommittedBlockIndexCache) hasIndexBlockID(indexBlockID string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	found := m.blocks[indexBlockID] != nil
	return found, nil
}
// addBlockToCache parses data as a pack index and stores it in the in-memory
// map, replacing any existing entry for the same ID.
func (m *memoryCommittedBlockIndexCache) addBlockToCache(indexBlockID string, data []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	index, err := packindex.Open(bytes.NewReader(data))
	if err != nil {
		return err
	}
	m.blocks[indexBlockID] = index
	return nil
}
// openIndex returns the previously cached index for indexBlockID, or an error
// when it was never added.
func (m *memoryCommittedBlockIndexCache) openIndex(indexBlockID string) (packindex.Index, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ndx := m.blocks[indexBlockID]; ndx != nil {
		return ndx, nil
	}
	return nil, fmt.Errorf("block not found in cache: %v", indexBlockID)
}
// expireUnused implements committedBlockIndexCache. The in-memory cache lives
// only for the process lifetime, so there is nothing to clean up.
func (m *memoryCommittedBlockIndexCache) expireUnused(used []string) error {
	return nil
}

View File

@@ -1,34 +0,0 @@
package block
import "context"
// contextKey is an unexported key type for context values, preventing
// collisions with keys defined by other packages.
type contextKey string

// useBlockCacheContextKey carries the bool set by UsingBlockCache.
var useBlockCacheContextKey contextKey = "use-block-cache"

// useListCacheContextKey carries the bool set by UsingListCache.
var useListCacheContextKey contextKey = "use-list-cache"
// UsingBlockCache returns a derived context that tells the block manager
// whether to use the block cache (enabled=false bypasses it).
func UsingBlockCache(ctx context.Context, enabled bool) context.Context {
	return context.WithValue(ctx, useBlockCacheContextKey, enabled)
}
// UsingListCache returns a derived context that tells the block manager
// whether to use the list cache (enabled=false bypasses it).
func UsingListCache(ctx context.Context, enabled bool) context.Context {
	return context.WithValue(ctx, useListCacheContextKey, enabled)
}
// shouldUseBlockCache reports whether the block cache is enabled for ctx;
// it defaults to true when UsingBlockCache was never called.
func shouldUseBlockCache(ctx context.Context) bool {
	v, ok := ctx.Value(useBlockCacheContextKey).(bool)
	if !ok {
		return true
	}
	return v
}
// shouldUseListCache reports whether the list cache is enabled for ctx;
// it defaults to true when UsingListCache was never called.
func shouldUseListCache(ctx context.Context) bool {
	v, ok := ctx.Value(useListCacheContextKey).(bool)
	if !ok {
		return true
	}
	return v
}

View File

@@ -1,123 +0,0 @@
package block
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/kopia/kopia/repo/storage"
)
// listCache caches the result of listing index blocks from storage in a
// local file authenticated with an HMAC.
type listCache struct {
	// st is the underlying storage, listed on cache miss or expiry.
	st storage.Storage
	// cacheFile is the path of the cache file; empty disables caching.
	cacheFile string
	// listCacheDuration is how long a cached listing stays valid.
	listCacheDuration time.Duration
	// hmacSecret keys the HMAC protecting the cache file contents.
	hmacSecret []byte
}
// listIndexBlocks returns the list of index blocks, serving a fresh cached
// listing when available and otherwise listing from storage (refreshing the
// cache on success).
func (c *listCache) listIndexBlocks(ctx context.Context) ([]IndexInfo, error) {
	if c.cacheFile != "" {
		ci, err := c.readBlocksFromCache(ctx)
		if err == nil {
			// Serve from cache only while the entry is still fresh.
			expirationTime := ci.Timestamp.Add(c.listCacheDuration)
			if time.Now().Before(expirationTime) {
				log.Debugf("retrieved list of index blocks from cache")
				return ci.Blocks, nil
			}
		} else if err != storage.ErrBlockNotFound {
			// ErrBlockNotFound just means no usable cache; anything else is worth logging.
			log.Warningf("unable to open cache file: %v", err)
		}
	}
	blocks, err := listIndexBlocksFromStorage(ctx, c.st)
	if err == nil {
		// Best-effort refresh; save failures are logged inside saveListToCache.
		c.saveListToCache(ctx, &cachedList{
			Blocks: blocks,
			Timestamp: time.Now(),
		})
	}
	log.Debugf("found %v index blocks from source", len(blocks))
	return blocks, err
}
// saveListToCache persists the given block list to the local cache file,
// appending an HMAC for later verification. The cache is best-effort:
// failures are logged (or silently ignored for marshal errors).
func (c *listCache) saveListToCache(ctx context.Context, ci *cachedList) {
	if c.cacheFile == "" {
		return
	}
	log.Debugf("saving index blocks to cache: %v", len(ci.Blocks))
	if data, err := json.Marshal(ci); err == nil {
		// Write to a uniquely named temp file, then rename into place so
		// concurrent writers never leave a partially-written cache file.
		mySuffix := fmt.Sprintf(".tmp-%v-%v", os.Getpid(), time.Now().UnixNano())
		if err := ioutil.WriteFile(c.cacheFile+mySuffix, appendHMAC(data, c.hmacSecret), 0600); err != nil {
			log.Warningf("unable to write list cache: %v", err)
		}
		os.Rename(c.cacheFile+mySuffix, c.cacheFile) //nolint:errcheck
		// No-op when the rename succeeded; cleans up the temp file otherwise.
		os.Remove(c.cacheFile + mySuffix) //nolint:errcheck
	}
}
// deleteListCache removes the local list cache file, if caching is enabled.
func (c *listCache) deleteListCache(ctx context.Context) {
	if c.cacheFile == "" {
		return
	}
	os.Remove(c.cacheFile) //nolint:errcheck
}
// readBlocksFromCache loads and authenticates the cached block list.
// It returns storage.ErrBlockNotFound when the list cache is disabled via
// the context or the cache file does not exist.
func (c *listCache) readBlocksFromCache(ctx context.Context) (*cachedList, error) {
	if !shouldUseListCache(ctx) {
		return nil, storage.ErrBlockNotFound
	}
	ci := &cachedList{}
	data, err := ioutil.ReadFile(c.cacheFile)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, storage.ErrBlockNotFound
		}
		return nil, err
	}
	// Verify the HMAC before trusting the cached contents.
	data, err = verifyAndStripHMAC(data, c.hmacSecret)
	if err != nil {
		return nil, fmt.Errorf("invalid file %v: %v", c.cacheFile, err)
	}
	if err := json.Unmarshal(data, &ci); err != nil {
		return nil, fmt.Errorf("can't unmarshal cached list results: %v", err)
	}
	return ci, nil
}
// newListCache constructs a listCache for the given storage. When a cache
// directory is configured it is created if missing and the cache file lives
// under it; otherwise caching is disabled. IgnoreListCache deletes any
// existing cache file up front.
func newListCache(ctx context.Context, st storage.Storage, caching CachingOptions) (*listCache, error) {
	cacheFilePath := ""
	if caching.CacheDirectory != "" {
		if _, err := os.Stat(caching.CacheDirectory); os.IsNotExist(err) {
			if err := os.MkdirAll(caching.CacheDirectory, 0700); err != nil {
				return nil, err
			}
		}
		cacheFilePath = filepath.Join(caching.CacheDirectory, "list")
	}
	c := &listCache{
		st: st,
		cacheFile: cacheFilePath,
		hmacSecret: caching.HMACSecret,
		listCacheDuration: time.Duration(caching.MaxListCacheDurationSec) * time.Second,
	}
	if caching.IgnoreListCache {
		c.deleteListCache(ctx)
	}
	return c, nil
}

View File

@@ -1,25 +0,0 @@
package block
// Stats exposes statistics about block operation.
// The alignment note below suggests fields are accessed atomically - confirm
// with callers before relying on it.
type Stats struct {
	// Keep int64 fields first to ensure they get aligned to at least 64-bit boundaries
	// which is required for atomic access on ARM and x86-32.
	// Cumulative byte counts per operation type.
	ReadBytes int64 `json:"readBytes,omitempty"`
	WrittenBytes int64 `json:"writtenBytes,omitempty"`
	DecryptedBytes int64 `json:"decryptedBytes,omitempty"`
	EncryptedBytes int64 `json:"encryptedBytes,omitempty"`
	HashedBytes int64 `json:"hashedBytes,omitempty"`
	// Block counters per operation type.
	ReadBlocks int32 `json:"readBlocks,omitempty"`
	WrittenBlocks int32 `json:"writtenBlocks,omitempty"`
	CheckedBlocks int32 `json:"checkedBlocks,omitempty"`
	HashedBlocks int32 `json:"hashedBlocks,omitempty"`
	InvalidBlocks int32 `json:"invalidBlocks,omitempty"`
	PresentBlocks int32 `json:"presentBlocks,omitempty"`
	ValidBlocks int32 `json:"validBlocks,omitempty"`
}
// Reset clears all repository statistics.
func (s *Stats) Reset() {
	var zero Stats
	*s = zero
}

Some files were not shown because too many files have changed in this diff Show More