feat(repository): added Set and Map backed by custom on-disk hashtable (#2364)

* feat(repository): added bigmap Set and Map backed by custom on-disk hashtable

* additional test coverage for corner cases

* profile flags

* exclude bigmapbench from code coverage

* refactored based on conversation with Julio

The concern was that values stored on disk are not encrypted.

* bigmap.Map - implements encryption of values
* bigmap.Set - stores keys only, so no encryption necessary
* bigmap.internalMap - used as backing structure for Map and Set

* implemented benchmark with values
This commit is contained in:
Jarek Kowalski
2022-09-10 08:47:53 -07:00
committed by GitHub
parent 64c2fcd481
commit 28ce29eab4
8 changed files with 1276 additions and 0 deletions

View File

@@ -25,4 +25,5 @@ ignore:
- internal/repotesting/
- internal/fshasher/
- internal/blobtesting/
- internal/bigmap/bigmapbench/

View File

@@ -0,0 +1,476 @@
// Package bigmap implements a custom hashmap data structure where keys and values are binary
// and keys are meant to be well-distributed hashes, such as content IDs, object IDs, etc.
//
// Unlike regular maps, this map only supports adding and getting elements (no iteration
// or deletion); in exchange it is much more efficient in terms of memory usage.
//
// Data for the hash table is stored in large contiguous memory blocks (for small sets)
// and automatically spills over to memory-mapped files for larger sets using only 8 bytes
// per key in RAM.
package bigmap
import (
"bytes"
"context"
"encoding/binary"
"os"
"path/filepath"
"sync"
"github.com/edsrzf/mmap-go"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/logging"
)
// Defaults applied by newInternalMapWithOptions when the corresponding Options field is zero,
// plus fixed limits of the on-disk/in-memory record format.
const (
defaultNumMemorySegments = 8 // number of segments to keep in RAM
defaultMemorySegmentSize = 18 * 1e6 // 18MB enough to store >1M 16-17-byte keys
defaultFileSegmentSize = 1024 << 20 // 1 GiB
// default Options.InitialSizeLogarithm; the initial slot count is
// tableSizesPrimes[InitialSizeLogarithm-minSizeLogarithm].
defaultInitialSizeLogarithm = 20
// permission bits used when creating the files backing memory-mapped segments.
mmapFileMode = 0o600
// grow hash table above this percentage utilization, higher values (close to 100) will be very slow,
// smaller values will waste memory.
defaultLoadFactorPercentage = 75
minKeyLength = 4 // first 4 bytes of each key are used as uint32 hash value
// the key length is stored in a single byte in front of each key.
maxKeyLength = 255
// logarithm corresponding to tableSizesPrimes[0].
minSizeLogarithm = 8
)
// log is the logger used by this package.
var log = logging.Module("bigmap")
// Options provides options for the internalMap.
//
// Any field left at its zero value is replaced with the corresponding default by
// newInternalMapWithOptions.
type Options struct {
LoadFactorPercentage int // grow the size of the hash table when this percentage full
NumMemorySegments int // number of segments to keep in RAM
MemorySegmentSize int64 // size of a single memory segment
FileSegmentSize int // size of a single file segment, defaults to 1 GiB
// must be greater than minSizeLogarithm, otherwise construction fails with "invalid initial size".
InitialSizeLogarithm int // logarithm of the initial size of the hash table, default - 20
}
// internalMap is a custom hashtable implementation using https://en.wikipedia.org/wiki/Double_hashing
// that stores all (key,value) pairs densely packed in segments of fixed size to minimize data
// fragmentation.
//
// The hash table itself (slots) only holds 8-byte references into the segments; keys and
// values live in the segments and are never moved once written.
type internalMap struct {
hasValues bool // +checklocksignore
opts Options // +checklocksignore
mu sync.RWMutex
// +checklocks:mu
tableSizeIndex int // index into tableSizesPrimes such that tableSizesPrimes[tableSizeIndex] == len(slots)
// +checklocks:mu
segments []mmap.MMap // ([keyLength][key][valueLength][value])*
// +checklocks:mu
slots []entry // current hash table slots
// +checklocks:mu
count int // number of elements in the hash tables
// +checklocks:mu
h2Prime uint64 // prime < len(slots)
// functions invoked by Close() in reverse registration order to release files and mappings.
// +checklocks:mu
cleanups []func()
// directory holding files backing memory-mapped segments, created lazily on first use.
// +checklocks:mu
tempDir string
}
// The list of prime numbers close to 2^N from https://primes.utm.edu/lists/2small/0bit.html
//
// Element i is a prime slightly below 2^(i+minSizeLogarithm). The hash table uses element
// tableSizeIndex as the number of slots and the previous element as the modulus of the
// secondary hash.
//nolint:gochecknoglobals
var tableSizesPrimes = []uint64{
1<<8 - 59, // minSizeLogarithm
1<<9 - 55,
1<<10 - 57,
1<<11 - 61,
1<<12 - 77,
1<<13 - 91,
1<<14 - 111,
1<<15 - 135,
1<<16 - 123,
1<<17 - 99,
1<<18 - 93,
1<<19 - 87, // initial H2 prime when using defaultInitialSizeLogarithm
1<<20 - 185, // initial number of hashtable slots when using defaultInitialSizeLogarithm
1<<21 - 129,
1<<22 - 123,
1<<23 - 159,
1<<24 - 167,
1<<25 - 183,
1<<26 - 135,
1<<27 - 235,
1<<28 - 273,
1<<29 - 133,
1<<30 - 173,
1<<31 - 171,
1<<32 - 267,
1<<33 - 355,
1<<34 - 281,
1<<35 - 325,
1<<36 - 233,
1<<37 - 375,
1<<38 - 257,
1<<39 - 301,
1<<40 - 437,
1<<41 - 139,
1<<42 - 227,
1<<43 - 369,
1<<44 - 377,
1<<45 - 229,
1<<46 - 311,
1<<47 - 649,
1<<48 - 257,
1<<49 - 339,
1<<50 - 233,
1<<51 - 439,
1<<52 - 395,
1<<53 - 421,
1<<54 - 327,
1<<55 - 267,
1<<56 - 195,
1<<57 - 423,
1<<58 - 251,
1<<59 - 871,
1<<60 - 453,
1<<61 - 465,
1<<62 - 273,
1<<63 - 471,
}
// entry is a single hash table slot referencing a record stored in one of the segments.
// The zero value represents an empty slot.
type entry struct {
segment uint32 // 0-empty, otherwise index into internalMap.segments+1
offset uint32 // offset within a segment
}
// Contains returns true if the provided key is in the map.
//
// Keys shorter than minKeyLength can never be stored (PutIfAbsent rejects them), so they are
// reported as absent instead of triggering an index-out-of-range panic while hashing.
func (m *internalMap) Contains(key []byte) bool {
	if len(key) < minKeyLength {
		return false
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	// findSlot returns either the slot holding the key or the first empty slot on its probe path.
	return m.slots[m.findSlot(key)].segment != 0
}
// Get gets the value associated with a given key. It is appended to the provided buffer.
//
// The second result reports whether the key is present. When the map was created without
// values, a present key yields (nil, true). Keys shorter than minKeyLength can never be stored,
// so they are reported as absent instead of triggering an index-out-of-range panic while hashing.
func (m *internalMap) Get(buf, key []byte) ([]byte, bool) {
	if len(key) < minKeyLength {
		return nil, false
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	e := m.slots[m.findSlot(key)]
	if e.segment == 0 {
		return nil, false
	}

	if !m.hasValues {
		return nil, true
	}

	data := m.segments[e.segment-1] // 1-indexed

	// record layout: [keyLength byte][key][valueLength uvarint][value]
	koff := e.offset
	off := koff + uint32(data[koff]) + 1

	vlen, vlenLen := binary.Uvarint(data[off:])
	start := off + uint32(vlenLen)

	return append(buf, data[start:start+uint32(vlen)]...), true
}
// hashValue returns the primary hash of the key: its first 8 bytes interpreted as a
// big-endian integer, or only the first 4 bytes when the key is shorter than 8 bytes.
func (m *internalMap) hashValue(key []byte) uint64 {
	if len(key) >= 8 { //nolint:gomnd
		return binary.BigEndian.Uint64(key)
	}

	return uint64(binary.BigEndian.Uint32(key))
}
// h2 returns the secondary hash value used for double hashing: bytes 8..15 of the key as a
// big-endian integer, or 1 when the key is shorter than 16 bytes.
func (m *internalMap) h2(key []byte) uint64 {
	const secondaryHashOffset = 8

	if len(key) >= 16 { //nolint:gomnd
		return binary.BigEndian.Uint64(key[secondaryHashOffset:])
	}

	// use linear scan.
	return 1
}
// keyEquals reports whether the record referenced by e stores exactly the provided key.
// +checklocksread:m.mu
func (m *internalMap) keyEquals(e entry, key []byte) bool {
	seg := m.segments[e.segment-1] // 1-indexed

	// the record starts with a single length byte followed by the key bytes.
	keyStart := e.offset + 1
	keyEnd := keyStart + uint32(seg[e.offset])

	return bytes.Equal(key, seg[keyStart:keyEnd])
}
// findSlotInSlice probes the provided slots using double hashing and returns the index of
// either the slot that already references the key or the first empty slot on its probe path.
// +checklocksread:m.mu
func (m *internalMap) findSlotInSlice(key []byte, slots []entry, h2Prime uint64) uint64 {
	tableSize := uint64(len(slots))
	pos := m.hashValue(key) % tableSize

	step := m.h2(key) % h2Prime
	if step == 0 {
		// a zero step would probe the same slot forever.
		step = 1
	}

	for {
		candidate := slots[pos]
		if candidate.segment == 0 || m.keyEquals(candidate, key) {
			return pos
		}

		pos = (pos + step) % tableSize
	}
}
// findSlot looks up the key in the current hash table and returns the index of the slot that
// either references the key or is the first empty slot on the key's probe path.
// +checklocksread:m.mu
func (m *internalMap) findSlot(key []byte) uint64 {
return m.findSlotInSlice(key, m.slots, m.h2Prime)
}
// growLocked replaces the hash table with a new one of the provided size and re-inserts all
// records by walking every segment from the beginning. The previous table size becomes
// the modulus of the secondary hash.
// +checklocks:m.mu
func (m *internalMap) growLocked(newSize uint64) {
	prevSize := uint64(len(m.slots))
	rehashed := make([]entry, newSize)

	for segIndex, seg := range m.segments {
		// records are densely packed; a zero key length marks the end of written data.
		for pos := 0; pos < len(seg) && seg[pos] != 0; {
			recordStart := pos
			keyEnd := recordStart + 1 + int(seg[recordStart])
			key := seg[recordStart+1 : keyEnd]
			pos = keyEnd

			if m.hasValues {
				// skip over the varint-encoded length and the value itself.
				valueLen, lenBytes := binary.Uvarint(seg[pos:])
				pos += lenBytes + int(valueLen)
			}

			target := m.findSlotInSlice(key, rehashed, prevSize)
			rehashed[target] = entry{segment: uint32(segIndex) + 1, offset: uint32(recordStart)}
		}
	}

	m.slots = rehashed
	m.h2Prime = prevSize
}
// PutIfAbsent conditionally adds the provided (key, value) to the map if the provided key is absent.
//
// It returns true when the pair was added and false when the key was already present (the
// existing value is left untouched). It panics when the key is shorter than minKeyLength or
// longer than maxKeyLength, or when a non-empty value is passed to a map created without values.
func (m *internalMap) PutIfAbsent(ctx context.Context, key, value []byte) bool {
m.mu.Lock()
defer m.mu.Unlock()
if len(key) < minKeyLength {
panic("key too short")
}
if len(key) > maxKeyLength {
panic("key too long")
}
if !m.hasValues && len(value) > 0 {
panic("values are not enabled")
}
slot := m.findSlot(key)
if m.slots[slot].segment != 0 {
return false
}
// grow the table before inserting once the load factor is reached; the target slot must be
// recomputed because all slot indexes change.
if m.count >= len(m.slots)*m.opts.LoadFactorPercentage/100 {
m.tableSizeIndex++
m.growLocked(tableSizesPrimes[m.tableSizeIndex])
slot = m.findSlot(key)
}
// upper bound of the record size: length byte, key, largest possible length prefix and value.
payloadLength := 1 + len(key) + binary.MaxVarintLen32 + len(value)
current := m.segments[len(m.segments)-1]
// ensure that the key we're adding fits in the segment, start a new segment if not enough room.
if len(current)+payloadLength > cap(current) {
current = m.newSegment(ctx)
m.segments = append(m.segments, current)
}
// the record is appended at the current end of the segment.
koff := uint32(len(current))
current = append(current, byte(len(key)))
current = append(current, key...)
// append the value
if m.hasValues {
var vlen [binary.MaxVarintLen32]byte
n := binary.PutUvarint(vlen[:], uint64(len(value)))
current = append(current, vlen[0:n]...)
current = append(current, value...)
}
// store the extended slice header back, append() returned a new one.
m.segments[len(m.segments)-1] = current
m.count++
m.slots[slot] = entry{
segment: uint32(len(m.segments)), // this is 1-based, 0==empty slot
offset: koff,
}
return true
}
// newMemoryMappedSegment creates a new file of FileSegmentSize bytes, maps it read-write and
// returns the mapping as an empty slice whose capacity is the size of the mapped region.
// Unmapping is registered as a cleanup executed by Close().
// +checklocks:m.mu
func (m *internalMap) newMemoryMappedSegment(ctx context.Context) (mmap.MMap, error) {
	const noFlags = 0

	backingFile, err := m.maybeCreateMappedFile(ctx)
	if err != nil {
		return nil, err
	}

	region, err := mmap.MapRegion(backingFile, m.opts.FileSegmentSize, mmap.RDWR, noFlags, 0)
	if err != nil {
		return nil, errors.Wrap(err, "unable to map region")
	}

	unmapRegion := func() {
		if unmapErr := region.Unmap(); unmapErr != nil {
			log(ctx).Warnf("unable to unmap memory region: %v", unmapErr)
		}
	}

	// registered after the file cleanup, so it runs before the file is closed and removed.
	m.cleanups = append(m.cleanups, unmapRegion)

	return region[:0], nil
}
// maybeCreateMappedFile creates a new uniquely-named file of FileSegmentSize bytes in the
// map's temporary directory (creating the directory on first use) and registers a cleanup
// that closes and removes the file.
// +checklocks:m.mu
func (m *internalMap) maybeCreateMappedFile(ctx context.Context) (*os.File, error) {
	if m.tempDir == "" {
		dir, err := os.MkdirTemp("", "kopia-map")
		if err != nil {
			return nil, errors.Wrap(err, "unable to create temp directory")
		}

		m.tempDir = dir
	}

	segmentPath := filepath.Join(m.tempDir, uuid.NewString())

	segmentFile, err := os.OpenFile(segmentPath, os.O_CREATE|os.O_EXCL|os.O_RDWR, mmapFileMode) //nolint:gosec
	if err != nil {
		return nil, errors.Wrap(err, "unable to create memory-mapped file")
	}

	// extend the file to its final size so that the whole region can be mapped.
	if truncErr := segmentFile.Truncate(int64(m.opts.FileSegmentSize)); truncErr != nil {
		closeAndRemoveFile(ctx, segmentFile, segmentPath)

		return nil, errors.Wrap(truncErr, "unable to truncate memory-mapped file")
	}

	m.cleanups = append(m.cleanups, func() {
		closeAndRemoveFile(ctx, segmentFile, segmentPath)
	})

	return segmentFile, nil
}
// closeAndRemoveFile closes the provided file and deletes it from disk. Failures are only
// logged as warnings; removal is attempted even when closing fails.
func closeAndRemoveFile(ctx context.Context, f *os.File, fname string) {
	closeErr := f.Close()
	if closeErr != nil {
		log(ctx).Warnf("unable to close segment file: %v", closeErr)
	}

	removeErr := os.Remove(fname)
	if removeErr != nil {
		log(ctx).Warnf("unable to remove segment file: %v", removeErr)
	}
}
// newSegment returns a new empty segment. Once NumMemorySegments segments exist, new segments
// are memory-mapped files; if mapping fails a warning is logged and an in-memory segment of
// MemorySegmentSize bytes is used instead.
// +checklocks:m.mu
func (m *internalMap) newSegment(ctx context.Context) mmap.MMap {
	if len(m.segments) >= m.opts.NumMemorySegments {
		mapped, err := m.newMemoryMappedSegment(ctx)
		if err != nil {
			log(ctx).Warnf("unable to create memory-mapped segment: %v", err)
		}

		if mapped != nil {
			return mapped
		}
	}

	return make([]byte, 0, m.opts.MemorySegmentSize)
}
// Close releases all resources associated with a map: memory mappings, segment files and the
// temporary directory holding them. Failures are logged as warnings. The map must not be
// used after it has been closed.
func (m *internalMap) Close(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// run cleanups newest-first so that each region is unmapped before its backing file is
	// closed and removed.
	for i := len(m.cleanups) - 1; i >= 0; i-- {
		m.cleanups[i]()
	}

	m.cleanups = nil
	m.segments = nil

	if m.tempDir != "" {
		// best-effort like the other cleanups, but surface the failure instead of hiding it.
		if err := os.RemoveAll(m.tempDir); err != nil {
			log(ctx).Warnf("unable to remove temp directory: %v", err)
		}

		// forget the directory so that a repeated Close() does not try to remove it again.
		m.tempDir = ""
	}
}
// newInternalMap creates new internalMap that stores values and uses default options.
func newInternalMap(ctx context.Context) (*internalMap, error) {
return newInternalMapWithOptions(ctx, true, nil)
}
// newInternalMapWithOptions creates a new instance of internalMap.
//
// hasValues selects whether records carry a value in addition to the key. opts may be nil;
// any zero field is replaced with its default. The provided Options are copied and never
// modified. An error is returned when InitialSizeLogarithm does not select a valid entry of
// tableSizesPrimes that also has a smaller prime before it.
func newInternalMapWithOptions(ctx context.Context, hasValues bool, opts *Options) (*internalMap, error) {
	// work on a private copy so that defaults are not written into the caller's struct.
	var o Options
	if opts != nil {
		o = *opts
	}

	if o.LoadFactorPercentage == 0 {
		o.LoadFactorPercentage = defaultLoadFactorPercentage
	}

	if o.MemorySegmentSize == 0 {
		o.MemorySegmentSize = defaultMemorySegmentSize
	}

	if o.NumMemorySegments == 0 {
		o.NumMemorySegments = defaultNumMemorySegments
	}

	if o.FileSegmentSize == 0 {
		o.FileSegmentSize = defaultFileSegmentSize
	}

	if o.InitialSizeLogarithm == 0 {
		o.InitialSizeLogarithm = defaultInitialSizeLogarithm
	}

	// index 0 is not allowed because the secondary hash needs the preceding (smaller) prime;
	// indexes past the end of the table would panic below.
	tableSizeIndex := o.InitialSizeLogarithm - minSizeLogarithm
	if tableSizeIndex < 1 || tableSizeIndex >= len(tableSizesPrimes) {
		return nil, errors.Errorf("invalid initial size")
	}

	m := &internalMap{
		hasValues:      hasValues,
		opts:           o,
		tableSizeIndex: tableSizeIndex,
		h2Prime:        tableSizesPrimes[tableSizeIndex-1], // h2 prime < number of slots
		slots:          make([]entry, tableSizesPrimes[tableSizeIndex]),
	}

	// the map always has at least one segment so that PutIfAbsent can append to the last one.
	m.mu.Lock()
	m.segments = append(m.segments, m.newSegment(ctx))
	m.mu.Unlock()

	return m, nil
}

View File

@@ -0,0 +1,290 @@
package bigmap
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"hash"
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/testlogging"
)
// TestInternalMap verifies basic Get/PutIfAbsent behavior for two keys of different lengths.
func TestInternalMap(t *testing.T) {
	ctx := testlogging.Context(t)

	m, err := newInternalMap(ctx)
	require.NoError(t, err)

	defer m.Close(ctx)

	var (
		shortKey = []byte("key1")
		longKey  = []byte("longerkey2")
		shortVal = []byte("val1")
		longVal  = []byte("val2")
	)

	requireMissing := func(key []byte) {
		got, found := m.Get(nil, key)
		require.Nil(t, got)
		require.False(t, found)
	}

	requireStored := func(key, want []byte) {
		got, found := m.Get(nil, key)
		require.True(t, found)
		require.Equal(t, want, got)
	}

	requireMissing(shortKey)

	m.PutIfAbsent(ctx, shortKey, shortVal)
	requireStored(shortKey, shortVal)
	requireMissing(longKey)

	m.PutIfAbsent(ctx, longKey, longVal)
	requireStored(longKey, longVal)

	// the first key must be unaffected by the second insertion.
	requireStored(shortKey, shortVal)
}
// TestGrowingMap inserts 20K keys into a small map so that the hash table grows repeatedly
// and segments spill from memory to memory-mapped files, verifying after each insertion that
// an older key is still readable and a future key is absent.
func TestGrowingMap(t *testing.T) {
	ctx := testlogging.Context(t)

	impl, err := newInternalMapWithOptions(ctx, true, &Options{
		InitialSizeLogarithm: 9,
		NumMemorySegments:    3,
		MemorySegmentSize:    1000,
		FileSegmentSize:      4 << 20,
	})
	require.NoError(t, err)

	defer impl.Close(ctx)

	h := sha256.New()

	// insert 20K hashes
	for i := 0; i < 20000; i++ {
		var keybuf, valbuf, valbuf2 [sha256.Size]byte

		k := sha256Key(h, keybuf[:0], i)
		v := sha256Key(h, valbuf[:0], i+3)

		require.True(t, impl.PutIfAbsent(ctx, k, v))

		// ensure that previously written key is still there.
		pkindex := i / 2
		pk := sha256Key(h, keybuf[:0], pkindex)
		require.True(t, impl.Contains(pk))

		pv, ok := impl.Get(valbuf2[:0], pk)
		require.True(t, ok)
		// expected value goes first so that failure messages are labeled correctly.
		require.Equal(t, sha256Key(h, valbuf[:0], pkindex+3), pv)

		// ensure that key not written yet is not there.
		nk := sha256Key(h, keybuf[:0], i+1)
		require.False(t, impl.Contains(nk))

		_, ok2 := impl.Get(valbuf2[:0], nk)
		require.False(t, ok2)
	}
}
// TestGrowingSet inserts 20K keys into a small set, checking after every insertion that a
// repeated Put is rejected, an older key is still present and a future key is absent.
func TestGrowingSet(t *testing.T) {
	const numKeys = 20000

	ctx := testlogging.Context(t)

	set, err := NewSetWithOptions(ctx, &Options{
		InitialSizeLogarithm: 9,
		NumMemorySegments:    3,
		MemorySegmentSize:    1000,
		FileSegmentSize:      4 << 20,
	})
	require.NoError(t, err)

	defer set.Close(ctx)

	var (
		hasher  = sha256.New()
		scratch [sha256.Size]byte
	)

	// insert 20K hashes
	for i := 0; i < numKeys; i++ {
		current := sha256Key(hasher, scratch[:0], i)
		require.True(t, set.Put(ctx, current))
		require.False(t, set.Put(ctx, current))

		// ensure that previously written key is still there.
		earlier := sha256Key(hasher, scratch[:0], i/2)
		require.True(t, set.Contains(earlier))

		// ensure that key not written yet is not there.
		upcoming := sha256Key(hasher, scratch[:0], i+1)
		require.False(t, set.Contains(upcoming))
	}
}
// sha256Key resets h, hashes the 8-byte little-endian encoding of i and returns the digest
// appended to out.
func sha256Key(h hash.Hash, out []byte, i int) []byte {
	// generate key=sha256(i) without allocations.
	var encoded [8]byte

	binary.LittleEndian.PutUint64(encoded[:], uint64(i))

	h.Reset()
	h.Write(encoded[:])

	return h.Sum(out)
}
// BenchmarkInternalMap_NoValue benchmarks an internalMap created without value support,
// using an empty value for every key.
func BenchmarkInternalMap_NoValue(b *testing.B) {
ctx := testlogging.Context(b)
m, err := newInternalMapWithOptions(ctx, false, nil)
require.NoError(b, err)
defer m.Close(ctx)
benchmarkInternalMap(b, m, []byte{})
}
// BenchmarkInternalMap_WithValue benchmarks an internalMap with default options, storing a
// 3-byte value for every key.
func BenchmarkInternalMap_WithValue(b *testing.B) {
ctx := testlogging.Context(b)
m, err := newInternalMap(ctx)
require.NoError(b, err)
defer m.Close(ctx)
benchmarkInternalMap(b, m, []byte{1, 2, 3})
}
// benchmarkInternalMap inserts b.N keys (key i is sha256(i)) with the provided value and then
// reads all of them back four times, requiring every key to be found.
//nolint:thelper
func benchmarkInternalMap(b *testing.B, m *internalMap, someVal []byte) {
	const readPasses = 4

	ctx := testlogging.Context(b)

	b.ResetTimer()

	var (
		hasher     = sha256.New()
		counter    [8]byte
		keyStorage [sha256.Size]byte
	)

	// keyFor generates key=sha256(i) without allocations; the result aliases keyStorage.
	keyFor := func(i int) []byte {
		hasher.Reset()
		binary.LittleEndian.PutUint64(counter[:], uint64(i))
		hasher.Write(counter[:])

		return hasher.Sum(keyStorage[:0])
	}

	for i := 0; i < b.N; i++ {
		m.PutIfAbsent(ctx, keyFor(i), someVal)
	}

	scratch := make([]byte, 10)

	for pass := 0; pass < readPasses; pass++ {
		for i := 0; i < b.N; i++ {
			_, found := m.Get(scratch[:0], keyFor(i))
			require.True(b, found)
		}
	}
}
// BenchmarkSyncMap_NoValue runs the sync.Map baseline benchmark with an empty value.
func BenchmarkSyncMap_NoValue(b *testing.B) {
benchmarkSyncMap(b, []byte{})
}
// BenchmarkSyncMap_WithValue runs the sync.Map baseline benchmark with a 3-byte value.
func BenchmarkSyncMap_WithValue(b *testing.B) {
someVal := []byte{1, 2, 3}
benchmarkSyncMap(b, someVal)
}
// benchmarkSyncMap is the sync.Map counterpart of the bigmap benchmarks: it stores b.N keys
// (key i is sha256(i) converted to string) with a copy of the provided value and then loads
// all of them four times, requiring each stored value to equal someVal.
//nolint:thelper
func benchmarkSyncMap(b *testing.B, someVal []byte) {
	const readPasses = 4

	var (
		store      sync.Map
		hasher     = sha256.New()
		counter    [8]byte
		keyStorage [sha256.Size]byte
	)

	// keyFor generates key=sha256(i), reusing keyStorage for the digest.
	keyFor := func(i int) string {
		hasher.Reset()
		binary.LittleEndian.PutUint64(counter[:], uint64(i))
		hasher.Write(counter[:])

		return string(hasher.Sum(keyStorage[:0]))
	}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		store.Store(keyFor(i), append([]byte{}, someVal...))
	}

	for pass := 0; pass < readPasses; pass++ {
		for i := 0; i < b.N; i++ {
			loaded, found := store.Load(keyFor(i))
			require.True(b, found)
			require.Equal(b, someVal, loaded)
		}
	}
}
func TestErrors(t *testing.T) {
ctx := testlogging.Context(t)
_, err := newInternalMapWithOptions(ctx, true, &Options{
InitialSizeLogarithm: 8,
})
require.ErrorContains(t, err, "invalid initial size")
}
// TestPanics verifies that Put panics for keys that are too short or too long.
func TestPanics(t *testing.T) {
	ctx := testlogging.Context(t)

	m, err := NewSet(ctx)
	require.NoError(t, err)

	// release the set like every other test does.
	defer m.Close(ctx)

	// too short keys
	require.Panics(t, func() { m.Put(ctx, nil) })
	require.Panics(t, func() { m.Put(ctx, []byte{1}) })

	// too long key
	require.Panics(t, func() { m.Put(ctx, bytes.Repeat([]byte{1}, 256)) })
}
// TestMapWithoutValue verifies that a map created without value support accepts empty values,
// panics on non-empty ones and reports stored keys with a nil value.
func TestMapWithoutValue(t *testing.T) {
	ctx := testlogging.Context(t)

	// this is a corner case, it's possible to create a map that can only support zero-length values.
	m, err := newInternalMapWithOptions(ctx, false, nil)
	require.NoError(t, err)

	// release the map like every other test does.
	defer m.Close(ctx)

	key := []byte{1, 2, 3, 4}

	m.PutIfAbsent(ctx, key, nil)
	require.Panics(t, func() { m.PutIfAbsent(ctx, key, []byte{3, 4, 5}) })

	v, ok := m.Get(nil, key)
	require.True(t, ok)
	require.Nil(t, v)
}

View File

@@ -0,0 +1,126 @@
package bigmap
import (
"context"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/binary"
"sync/atomic"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
)
const (
// number of bytes stored in front of every encrypted value and passed to the AEAD as nonce;
// NewMapWithOptions verifies that it matches the AEAD's nonce size.
aesNonceSize = 12
// length of the random key generated for each Map.
aesKeySize = 32 // AES-256
)
// Map is a wrapper around internalMap that adds encryption.
//
// Non-empty values are sealed with AES-GCM using a random per-Map key before being handed to
// the inner map; keys are stored as-is and used as additional authenticated data.
type Map struct {
aead cipher.AEAD
inner *internalMap
// counter incremented atomically to derive a distinct nonce for every encrypted value.
nextNonce *uint64
}
// PutIfAbsent adds the element to the map, returns true if the element was added (as opposed to having
// existed before).
//
// Empty values are stored without encryption (there is nothing to protect). Non-empty values
// are stored as [nonce][ciphertext+tag], with the key as additional authenticated data.
func (s *Map) PutIfAbsent(ctx context.Context, key, value []byte) bool {
	const nonceCounterSize = 8 // bytes of the nonce occupied by the counter

	if len(value) == 0 {
		return s.inner.PutIfAbsent(ctx, key, nil)
	}

	var tmp gather.WriteBuffer
	defer tmp.Close()

	buf := tmp.MakeContiguous(aesNonceSize + len(value) + s.aead.Overhead())
	nonce, input := buf[0:aesNonceSize], buf[aesNonceSize:aesNonceSize+len(value)]

	copy(input, value)

	nonceVal := atomic.AddUint64(s.nextNonce, 1)
	if nonceVal == 0 {
		panic("nonce counter wrapped beyond 64 bits, that should never happen")
	}

	binary.BigEndian.PutUint64(nonce, nonceVal)

	// the nonce is stored in clear text next to the ciphertext; explicitly zero the bytes not
	// covered by the counter so that stale contents of the scratch buffer are never written out.
	// NOTE(review): assumes MakeContiguous may return a recycled, non-zeroed buffer - confirm.
	for i := nonceCounterSize; i < aesNonceSize; i++ {
		nonce[i] = 0
	}

	// encrypt in place: buf has room for the authentication tag after the value.
	s.aead.Seal(input[:0], nonce, input, key)

	return s.inner.PutIfAbsent(ctx, key, buf)
}
// Get gets the element from the map and appends the value to the provided buffer.
//
// ok reports whether the key is present. When the key is present but its value cannot be
// decrypted, the result is (nil, true, err). Existing contents of output are preserved and
// the decrypted value follows them.
func (s *Map) Get(ctx context.Context, output, key []byte) (result []byte, ok bool, err error) {
	stored, found := s.inner.Get(output, key)
	if !found {
		return nil, false, nil
	}

	// the inner map appended [nonce][ciphertext] after whatever the caller already had in
	// output, so only the part past that prefix must be decrypted. Map always creates its
	// inner map with values enabled, so stored is never shorter than output.
	prefixLen := len(output)

	plaintext, err := s.decrypt(key, stored[prefixLen:])
	if err != nil {
		return nil, true, err
	}

	if prefixLen == 0 {
		return plaintext, true, nil
	}

	// decryption happened in place right after the nonce; move the plaintext down so that it
	// directly follows the caller's data (copy handles the overlapping ranges).
	n := copy(stored[prefixLen:], plaintext)

	return stored[:prefixLen+n], true, nil
}
// decrypt decrypts a stored value of the form [nonce][ciphertext+tag] in place, using key as
// additional authenticated data, and returns the plaintext (which aliases buf). An empty buf
// represents an empty value and yields (nil, nil).
func (s *Map) decrypt(key, buf []byte) ([]byte, error) {
	if len(buf) == 0 {
		return nil, nil
	}

	// anything shorter cannot contain a nonce and an authentication tag; report it as an error
	// instead of panicking while slicing.
	if len(buf) < aesNonceSize+s.aead.Overhead() {
		return nil, errors.Errorf("unable to decrypt content: encrypted value too short (%v bytes)", len(buf))
	}

	nonce, input := buf[:aesNonceSize], buf[aesNonceSize:]

	result, err := s.aead.Open(input[:0], nonce, input, key)
	if err != nil {
		return nil, errors.Wrap(err, "unable to decrypt content")
	}

	return result, nil
}
// Contains returns true if a given key exists in the map.
func (s *Map) Contains(key []byte) bool {
return s.inner.Contains(key)
}
// Close releases resources associated with the map.
func (s *Map) Close(ctx context.Context) {
s.inner.Close(ctx)
}
// NewMap creates new Map using default options.
func NewMap(ctx context.Context) (*Map, error) {
return NewMapWithOptions(ctx, nil)
}
// NewMapWithOptions creates new Map with options.
//
// opt may be nil, in which case defaults are used. Values are encrypted with AES-256-GCM
// using a key that is randomly generated for each Map and only kept in memory.
func NewMapWithOptions(ctx context.Context, opt *Options) (*Map, error) {
	inner, err := newInternalMapWithOptions(ctx, true, opt)
	if err != nil {
		return nil, err
	}

	aead, err := newMapAEAD()
	if err != nil {
		// do not leave the inner map's resources behind when the Map cannot be constructed.
		inner.Close(ctx)

		return nil, err
	}

	return &Map{
		aead:      aead,
		inner:     inner,
		nextNonce: new(uint64),
	}, nil
}

// newMapAEAD returns an AES-GCM AEAD initialized with a freshly generated random key.
func newMapAEAD() (cipher.AEAD, error) {
	key := make([]byte, aesKeySize)
	if _, err := rand.Read(key); err != nil {
		return nil, errors.Wrap(err, "error initializing map key")
	}

	enc, err := aes.NewCipher(key)
	if err != nil {
		return nil, errors.Wrap(err, "error initializing map cipher")
	}

	aead, err := cipher.NewGCM(enc)
	if err != nil {
		return nil, errors.Wrap(err, "error initializing map AEAD")
	}

	// the stored record layout reserves exactly aesNonceSize bytes for the nonce.
	if aead.NonceSize() != aesNonceSize {
		return nil, errors.Errorf("unexpected nonce size: %v, expected %v", aead.NonceSize(), aesNonceSize)
	}

	return aead, nil
}

View File

@@ -0,0 +1,130 @@
package bigmap_test
import (
"crypto/sha256"
"encoding/binary"
"hash"
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/internal/testlogging"
)
// TestGrowingMap inserts 20K keys into a small encrypted map so that the hash table grows
// repeatedly and segments spill to memory-mapped files, verifying after each insertion that
// an older key still decrypts to its value and a future key is absent.
func TestGrowingMap(t *testing.T) {
	ctx := testlogging.Context(t)

	impl, err := bigmap.NewMapWithOptions(ctx, &bigmap.Options{
		InitialSizeLogarithm: 9,
		NumMemorySegments:    3,
		MemorySegmentSize:    1000,
		FileSegmentSize:      4 << 20,
	})
	require.NoError(t, err)

	defer impl.Close(ctx)

	h := sha256.New()

	// insert 20K hashes
	for i := 0; i < 20000; i++ {
		var keybuf, valbuf, valbuf2 [sha256.Size]byte

		k := sha256Key(h, keybuf[:0], i)
		v := sha256Key(h, valbuf[:0], i+3)

		require.True(t, impl.PutIfAbsent(ctx, k, v))

		// ensure that previously written key is still there.
		pkindex := i / 2
		pk := sha256Key(h, keybuf[:0], pkindex)
		require.True(t, impl.Contains(pk))

		pv, ok, err := impl.Get(ctx, valbuf2[:0], pk)
		require.NoError(t, err)
		require.True(t, ok)
		// expected value goes first so that failure messages are labeled correctly.
		require.Equal(t, sha256Key(h, valbuf[:0], pkindex+3), pv)

		// ensure that key not written yet is not there.
		nk := sha256Key(h, keybuf[:0], i+1)
		require.False(t, impl.Contains(nk))

		_, ok2, err := impl.Get(ctx, valbuf2[:0], nk)
		require.NoError(t, err)
		require.False(t, ok2)
	}
}
// sha256Key resets h, hashes the 8-byte little-endian encoding of i and returns the digest
// appended to out.
func sha256Key(h hash.Hash, out []byte, i int) []byte {
	// generate key=sha256(i) without allocations.
	var encoded [8]byte

	binary.LittleEndian.PutUint64(encoded[:], uint64(i))

	h.Reset()
	h.Write(encoded[:])

	return h.Sum(out)
}
// BenchmarkMap_NoValue benchmarks bigmap.Map with an empty value for every key.
func BenchmarkMap_NoValue(b *testing.B) {
ctx := testlogging.Context(b)
m, err := bigmap.NewMapWithOptions(ctx, nil)
require.NoError(b, err)
defer m.Close(ctx)
benchmarkMap(b, m, []byte{})
}
// BenchmarkMap_WithValue benchmarks bigmap.Map with a 3-byte value for every key.
func BenchmarkMap_WithValue(b *testing.B) {
ctx := testlogging.Context(b)
m, err := bigmap.NewMap(ctx)
require.NoError(b, err)
defer m.Close(ctx)
benchmarkMap(b, m, []byte{1, 2, 3})
}
// benchmarkMap inserts b.N keys (key i is sha256(i)) with the provided value and then reads
// all of them back four times, requiring every lookup to succeed without error.
//nolint:thelper
func benchmarkMap(b *testing.B, m *bigmap.Map, someVal []byte) {
	const readPasses = 4

	ctx := testlogging.Context(b)

	b.ResetTimer()

	var (
		hasher     = sha256.New()
		counter    [8]byte
		keyStorage [sha256.Size]byte
	)

	// keyFor generates key=sha256(i) without allocations; the result aliases keyStorage.
	keyFor := func(i int) []byte {
		hasher.Reset()
		binary.LittleEndian.PutUint64(counter[:], uint64(i))
		hasher.Write(counter[:])

		return hasher.Sum(keyStorage[:0])
	}

	for i := 0; i < b.N; i++ {
		m.PutIfAbsent(ctx, keyFor(i), someVal)
	}

	scratch := make([]byte, 10)

	for pass := 0; pass < readPasses; pass++ {
		for i := 0; i < b.N; i++ {
			_, found, err := m.Get(ctx, scratch[:0], keyFor(i))
			require.NoError(b, err)
			require.True(b, found)
		}
	}
}

View File

@@ -0,0 +1,39 @@
package bigmap
import "context"
// Set is a wrapper around internalMap that only supports Put() and Contains().
// It stores keys only (no values), so nothing is encrypted.
type Set struct {
inner *internalMap
}
// Put adds the element to a set, returns true if the element was added (as opposed to having
// existed before). It panics when the key length is outside of the range supported by the
// underlying map.
func (s *Set) Put(ctx context.Context, key []byte) bool {
return s.inner.PutIfAbsent(ctx, key, nil)
}
// Contains returns true if a given key exists in the set.
// It delegates directly to the underlying map.
func (s *Set) Contains(key []byte) bool {
return s.inner.Contains(key)
}
// Close releases resources associated with the set by closing the underlying map.
func (s *Set) Close(ctx context.Context) {
s.inner.Close(ctx)
}
// NewSet creates new Set using default options.
func NewSet(ctx context.Context) (*Set, error) {
return NewSetWithOptions(ctx, nil)
}
// NewSetWithOptions creates new Set with options. The backing map is created without value
// support; opt may be nil to use defaults.
func NewSetWithOptions(ctx context.Context, opt *Options) (*Set, error) {
	backing, err := newInternalMapWithOptions(ctx, false, opt)
	if err == nil {
		return &Set{inner: backing}, nil
	}

	return nil, err
}

View File

@@ -0,0 +1,101 @@
package bigmap_test
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/internal/testlogging"
)
// TestGrowingSet inserts 20K keys into a small set, checking after every insertion that a
// repeated Put is rejected, an older key is still present and a future key is absent.
func TestGrowingSet(t *testing.T) {
	const numKeys = 20000

	ctx := testlogging.Context(t)

	set, err := bigmap.NewSetWithOptions(ctx, &bigmap.Options{
		InitialSizeLogarithm: 9,
		NumMemorySegments:    3,
		MemorySegmentSize:    1000,
		FileSegmentSize:      4 << 20,
	})
	require.NoError(t, err)

	defer set.Close(ctx)

	var (
		hasher  = sha256.New()
		scratch [sha256.Size]byte
	)

	// insert 20K hashes
	for i := 0; i < numKeys; i++ {
		current := sha256Key(hasher, scratch[:0], i)
		require.True(t, set.Put(ctx, current))
		require.False(t, set.Put(ctx, current))

		// ensure that previously written key is still there.
		earlier := sha256Key(hasher, scratch[:0], i/2)
		require.True(t, set.Contains(earlier))

		// ensure that key not written yet is not there.
		upcoming := sha256Key(hasher, scratch[:0], i+1)
		require.False(t, set.Contains(upcoming))
	}
}
// BenchmarkSet inserts b.N keys (key i is sha256(i)) into a default Set and then checks
// membership of all of them four times.
func BenchmarkSet(b *testing.B) {
	const readPasses = 4

	ctx := testlogging.Context(b)

	set, err := bigmap.NewSet(ctx)
	require.NoError(b, err)

	defer set.Close(ctx)

	b.ResetTimer()

	var (
		hasher     = sha256.New()
		counter    [8]byte
		keyStorage [sha256.Size]byte
	)

	// keyFor generates key=sha256(i) without allocations; the result aliases keyStorage.
	keyFor := func(i int) []byte {
		hasher.Reset()
		binary.LittleEndian.PutUint64(counter[:], uint64(i))
		hasher.Write(counter[:])

		return hasher.Sum(keyStorage[:0])
	}

	for i := 0; i < b.N; i++ {
		set.Put(ctx, keyFor(i))
	}

	for pass := 0; pass < readPasses; pass++ {
		for i := 0; i < b.N; i++ {
			require.True(b, set.Contains(keyFor(i)))
		}
	}
}
// TestSetPanics verifies that Put panics for keys that are too short or too long.
func TestSetPanics(t *testing.T) {
	ctx := testlogging.Context(t)

	m, err := bigmap.NewSet(ctx)
	require.NoError(t, err)

	// release the set like every other test does.
	defer m.Close(ctx)

	// too short keys
	require.Panics(t, func() { m.Put(ctx, nil) })
	require.Panics(t, func() { m.Put(ctx, []byte{1}) })

	// too long key
	require.Panics(t, func() { m.Put(ctx, bytes.Repeat([]byte{1}, 256)) })
}

View File

@@ -0,0 +1,113 @@
package main
import (
"context"
"crypto/sha256"
"encoding/binary"
"flag"
"fmt"
"os"
"runtime"
"sync"
"time"
"github.com/pkg/profile"
"github.com/kopia/kopia/internal/bigmap"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/repo/logging"
)
// Values accepted by the -impl flag.
const (
implSyncMap = 0 // benchmark sync.Map as a baseline
implMapWithEmptyValue = 1 // benchmark bigmap.Map storing nil values
implMapWithValues = 2 // benchmark bigmap.Map storing the key as the value
)
// Command-line flags.
//nolint:gochecknoglobals
var (
impl = flag.Int("impl", implMapWithEmptyValue, "Select implementation")
// profiling is only enabled when profileDir is set in addition to profileCPU/profileMemory.
profileDir = flag.String("profile-dir", "", "Profile directory")
profileCPU = flag.Bool("profile-cpu", false, "Profile CPU")
profileMemory = flag.Bool("profile-memory", false, "Profile RAM")
)
// main runs the benchmark and exits with a non-zero status when it could not be set up.
func main() {
	os.Exit(run())
}

// run inserts 300M keys (key i is sha256(i)) into the implementation selected by -impl,
// printing heap usage and throughput every 1M keys. It returns the process exit code; it is
// separate from main so that deferred cleanups (profiles, map) run before the process exits.
func run() int {
	flag.Parse()

	ctx := logging.WithLogger(context.Background(), logging.ToWriter(os.Stderr))

	var (
		bm     *bigmap.Map
		sm     *sync.Map
		num    [8]byte
		keyBuf [sha256.Size]byte
		ms0    runtime.MemStats
	)

	if *profileDir != "" {
		pp := profile.ProfilePath(*profileDir)

		if *profileCPU {
			defer profile.Start(pp, profile.CPUProfile).Stop()
		}

		if *profileMemory {
			defer profile.Start(pp, profile.MemProfile).Stop()
		}
	}

	switch *impl {
	case implSyncMap:
		sm = &sync.Map{}

		fmt.Println("using sync.Map")

	case implMapWithEmptyValue, implMapWithValues:
		if *impl == implMapWithEmptyValue {
			fmt.Println("using bigmap.Map without values")
		} else {
			fmt.Println("using bigmap.Map with values")
		}

		var err error

		bm, err = bigmap.NewMapWithOptions(ctx, &bigmap.Options{})
		if err != nil {
			fmt.Fprintf(os.Stderr, "unable to create bigmap.Map: %v\n", err)
			return 1
		}

		// remove memory-mapped segment files that the map spills to for large key counts.
		defer bm.Close(ctx)

	default:
		fmt.Fprintf(os.Stderr, "unknown implementation: %v\n", *impl)
		return 1
	}

	h := sha256.New()

	runtime.ReadMemStats(&ms0)

	t0 := clock.Now()

	for i := 0; i < 300_000_000; i++ {
		if i%1_000_000 == 0 && i > 0 {
			var ms runtime.MemStats

			runtime.ReadMemStats(&ms)

			// HeapAlloc can drop below the baseline after a GC; avoid unsigned wrap-around.
			var alloc uint64
			if ms.HeapAlloc > ms0.HeapAlloc {
				alloc = ms.HeapAlloc - ms0.HeapAlloc
			}

			// use the exact elapsed time for the rate; the truncated value is only for display
			// and would be zero during the first second.
			elapsed := clock.Now().Sub(t0)

			fmt.Printf("elapsed %v count: %v M bytes: %v MB bytes/item: %v Mitems/sec: %.1f\n",
				elapsed.Truncate(time.Second),
				float64(i)/1e6,
				alloc/1e6,
				alloc/uint64(i),
				float64(i)/elapsed.Seconds()/1e6)
		}

		// generate key=sha256(i) without allocations.
		h.Reset()
		binary.LittleEndian.PutUint64(num[:], uint64(i))
		h.Write(num[:])
		h.Sum(keyBuf[:0])

		switch *impl {
		case implSyncMap:
			sm.LoadOrStore(keyBuf, nil)
		case implMapWithEmptyValue:
			bm.PutIfAbsent(ctx, keyBuf[:], nil)
		case implMapWithValues:
			bm.PutIfAbsent(ctx, keyBuf[:], keyBuf[:])
		}
	}

	return 0
}